| import six |
| import uuid |
| import datetime |
| from kombu.connection import BrokerConnection |
| from kombu.messaging import Exchange, Queue, Consumer, Producer |
| import subprocess |
| import re |
| import time, threading |
| import sys, getopt |
| import logging |
| |
| |
| logfile = "vcpe_stats_notifier.log" |
| level=logging.INFO |
| logger=logging.getLogger('vcpe_stats_notifier') |
| logger.setLevel(level) |
| handler=logging.handlers.RotatingFileHandler(logfile,maxBytes=1000000, backupCount=1) |
| logger.addHandler(handler) |
| |
| def extract_dns_stats_from_all_vcpes(): |
| p = subprocess.Popen('docker ps', shell=True, stdout=subprocess.PIPE) |
| firstline = True |
| dockercontainers = {} |
| while True: |
| out = p.stdout.readline() |
| if out == '' and p.poll() != None: |
| break |
| if out != '': |
| if firstline is True: |
| firstline = False |
| else: |
| fields = out.split() |
| container_fields = {} |
| container_fields['id'] = fields[0] |
| dockercontainers[fields[-1]] = container_fields |
| for k,v in dockercontainers.iteritems(): |
| cmd = 'docker exec ' + v['id'] + ' killall -10 dnsmasq' |
| p = subprocess.Popen (cmd, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE) |
| (output, error) = p.communicate() |
| if error: |
| logger.error("killall dnsmasq command failed with error = %s",error) |
| continue |
| cmd = 'docker exec ' + v['id'] + ' tail -7 /var/log/syslog' |
| p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE) |
| (output, error) = p.communicate() |
| if error: |
| logger.error("tail on dnsmasq log command failed with error = %s",error) |
| continue |
| log_list = output.splitlines() |
| i = 0 |
| while i < len(log_list): |
| m = re.search('(?<=:\scache size\s)(\S*)(?=,\s),\s(\S*)(?=/)/(\S*)(?=\scache insertions re-used unexpired cache entries)', log_list[i]) |
| if m == None: |
| i = i+1 |
| continue; |
| v['cache_size'] = m.group(1) |
| v['replaced_unexpired_entries'] = m.group(2) |
| v['total_inserted_entries'] = m.group(3) |
| i = i+1 |
| m = re.search('(?<=:\squeries forwarded\s)(\S*)(?=,),\squeries answered locally\s(\S*)(?=$)', log_list[i]) |
| v['queries_forwarded'] = m.group(1) |
| v['queries_answered_locally'] = m.group(2) |
| break; |
| i = i+2 |
| v['server_stats'] = [] |
| while i < len(log_list): |
| m = re.search('(?<=:\sserver\s)(\S*)(?=#)#\d*:\squeries sent\s(\S*)(?=,),\sretried or failed\s(\S*)(?=$)', log_list[i]) |
| if m == None: |
| i = i+1 |
| continue |
| dns_server = {} |
| dns_server['id'] = m.group(1) |
| dns_server['queries_sent'] = m.group(2) |
| dns_server['queries_failed'] = m.group(3) |
| v['server_stats'].append(dns_server) |
| i = i+1 |
| return dockercontainers |
| |
| |
| keystone_tenant_id='3a397e70f64e4e40b69b6266c634d9d0' |
| keystone_user_id='1e3ce043029547f1a61c1996d1a531a2' |
| rabbit_user='openstack' |
| rabbit_password='80608318c273f348a7c3' |
| rabbit_host='10.11.10.1' |
| vcpeservice_rabbit_exchange='vcpeservice' |
| cpe_publisher_id='vcpe_publisher' |
| |
| producer = None |
| |
| def setup_rabbit_mq_channel(): |
| global producer |
| global rabbit_user, rabbit_password, rabbit_host, vcpeservice_rabbit_exchange,cpe_publisher_id |
| vcpeservice_exchange = Exchange(vcpeservice_rabbit_exchange, "topic", durable=False) |
| # connections/channels |
| connection = BrokerConnection(rabbit_host, rabbit_user, rabbit_password) |
| logger.info('Connection to RabbitMQ server successful') |
| channel = connection.channel() |
| # produce |
| producer = Producer(channel, exchange=vcpeservice_exchange, routing_key='notifications.info') |
| p = subprocess.Popen('hostname', shell=True, stdout=subprocess.PIPE) |
| (hostname, error) = p.communicate() |
| cpe_publisher_id = cpe_publisher_id + '_on_' + hostname |
| logger.info('cpe_publisher_id=%s',cpe_publisher_id) |
| |
| def publish_cpe_stats(): |
| global producer |
| global keystone_tenant_id, keystone_user_id, cpe_publisher_id |
| cpe_container_stats = extract_dns_stats_from_all_vcpes() |
| |
| for k,v in cpe_container_stats.iteritems(): |
| msg = {'event_type': 'vcpe', |
| 'message_id':six.text_type(uuid.uuid4()), |
| 'publisher_id': cpe_publisher_id, |
| 'timestamp':datetime.datetime.now().isoformat(), |
| 'priority':'INFO', |
| 'payload': {'vcpe_id':k, |
| 'user_id':keystone_user_id, |
| 'tenant_id':keystone_tenant_id |
| } |
| } |
| producer.publish(msg) |
| if 'cache_size' in v: |
| msg = {'event_type': 'vcpe.dns.cache.size', |
| 'message_id':six.text_type(uuid.uuid4()), |
| 'publisher_id': cpe_publisher_id, |
| 'timestamp':datetime.datetime.now().isoformat(), |
| 'priority':'INFO', |
| 'payload': {'vcpe_id':k, |
| 'user_id':keystone_user_id, |
| 'tenant_id':keystone_tenant_id, |
| 'cache_size':v['cache_size'] |
| } |
| } |
| producer.publish(msg) |
| if 'total_inserted_entries' in v: |
| msg = {'event_type': 'vcpe.dns.total_inserted_entries', |
| 'message_id':six.text_type(uuid.uuid4()), |
| 'publisher_id': cpe_publisher_id, |
| 'timestamp':datetime.datetime.now().isoformat(), |
| 'priority':'INFO', |
| 'payload': {'vcpe_id':k, |
| 'user_id':keystone_user_id, |
| 'tenant_id':keystone_tenant_id, |
| 'total_inserted_entries':v['total_inserted_entries'] |
| } |
| } |
| producer.publish(msg) |
| if 'replaced_unexpired_entries' in v: |
| msg = {'event_type': 'vcpe.dns.replaced_unexpired_entries', |
| 'message_id':six.text_type(uuid.uuid4()), |
| 'publisher_id': cpe_publisher_id, |
| 'timestamp':datetime.datetime.now().isoformat(), |
| 'priority':'INFO', |
| 'payload': {'vcpe_id':k, |
| 'user_id':keystone_user_id, |
| 'tenant_id':keystone_tenant_id, |
| 'replaced_unexpired_entries':v['replaced_unexpired_entries'] |
| } |
| } |
| producer.publish(msg) |
| |
| if 'queries_forwarded' in v: |
| msg = {'event_type': 'vcpe.dns.queries_forwarded', |
| 'message_id':six.text_type(uuid.uuid4()), |
| 'publisher_id': cpe_publisher_id, |
| 'timestamp':datetime.datetime.now().isoformat(), |
| 'priority':'INFO', |
| 'payload': {'vcpe_id':k, |
| 'user_id':keystone_user_id, |
| 'tenant_id':keystone_tenant_id, |
| 'queries_forwarded':v['queries_forwarded'] |
| } |
| } |
| producer.publish(msg) |
| |
| if 'queries_answered_locally' in v: |
| msg = {'event_type': 'vcpe.dns.queries_answered_locally', |
| 'message_id':six.text_type(uuid.uuid4()), |
| 'publisher_id': cpe_publisher_id, |
| 'timestamp':datetime.datetime.now().isoformat(), |
| 'priority':'INFO', |
| 'payload': {'vcpe_id':k, |
| 'user_id':keystone_user_id, |
| 'tenant_id':keystone_tenant_id, |
| 'queries_answered_locally':v['queries_answered_locally'] |
| } |
| } |
| producer.publish(msg) |
| |
| if 'server_stats' in v: |
| for server in v['server_stats']: |
| msg = {'event_type': 'vcpe.dns.server.queries_sent', |
| 'message_id':six.text_type(uuid.uuid4()), |
| 'publisher_id': cpe_publisher_id, |
| 'timestamp':datetime.datetime.now().isoformat(), |
| 'priority':'INFO', |
| 'payload': {'vcpe_id':k, |
| 'user_id':keystone_user_id, |
| 'tenant_id':keystone_tenant_id, |
| 'upstream_server':server['id'], |
| 'queries_sent':server['queries_sent'] |
| } |
| } |
| producer.publish(msg) |
| |
| msg = {'event_type': 'vcpe.dns.server.queries_failed', |
| 'message_id':six.text_type(uuid.uuid4()), |
| 'publisher_id': cpe_publisher_id, |
| 'timestamp':datetime.datetime.now().isoformat(), |
| 'priority':'INFO', |
| 'payload': {'vcpe_id':k, |
| 'user_id':keystone_user_id, |
| 'tenant_id':keystone_tenant_id, |
| 'upstream_server':server['id'], |
| 'queries_failed':server['queries_failed'] |
| } |
| } |
| producer.publish(msg) |
| |
| def periodic_publish(): |
| publish_cpe_stats() |
| #Publish every 5minutes |
| threading.Timer(300, periodic_publish).start() |
| |
| def main(argv): |
| global keystone_tenant_id, keystone_user_id, rabbit_user, rabbit_password, rabbit_host, vcpeservice_rabbit_exchange |
| try: |
| opts, args = getopt.getopt(argv,"",["keystone_tenant_id=","keystone_user_id=","rabbit_host=","rabbit_user=","rabbit_password=","vcpeservice_rabbit_exchange="]) |
| except getopt.GetoptError: |
| print 'vcpe_stats_notifier.py keystone_tenant_id=<keystone_tenant_id> keystone_user_id=<keystone_user_id> rabbit_host=<IP addr> rabbit_user=<user> rabbit_password=<password> vcpeservice_rabbit_exchange=<exchange name>' |
| sys.exit(2) |
| for opt, arg in opts: |
| if opt in ("--keystone_tenant_id"): |
| keystone_tenant_id = arg |
| elif opt in ("--keystone_user_id"): |
| keystone_user_id = arg |
| elif opt in ("--rabbit_user"): |
| rabbit_user = arg |
| elif opt in ("--rabbit_password"): |
| rabbit_password = arg |
| elif opt in ("--rabbit_host"): |
| rabbit_host = arg |
| elif opt in ("--vcpeservice_rabbit_exchange"): |
| vcpeservice_rabbit_exchange = arg |
| logger.info("vcpe_stats_notifier args:keystone_tenant_id=%s keystone_user_id=%s rabbit_user=%s rabbit_host=%s vcpeservice_rabbit_exchange=%s",keystone_tenant_id,keystone_user_id,rabbit_user,rabbit_host,vcpeservice_rabbit_exchange) |
| setup_rabbit_mq_channel() |
| periodic_publish() |
| |
| if __name__ == "__main__": |
| main(sys.argv[1:]) |