blob: 4d2cc76162c2b607cc504ebb09d70b591dd769f1 [file] [log] [blame]
Scott Baker761e1062016-06-20 17:18:17 -07001import six
2import uuid
3import datetime
4from kombu.connection import BrokerConnection
5from kombu.messaging import Exchange, Queue, Consumer, Producer
6import subprocess
7import re
8import time, threading
9import sys, getopt
10import logging
11import os
12
13
14logfile = "vcpe_stats_notifier.log"
15level=logging.INFO
16logger=logging.getLogger('vcpe_stats_notifier')
17logger.setLevel(level)
18# create formatter
19formatter = logging.Formatter("%(asctime)s;%(levelname)s;%(message)s")
20handler=logging.handlers.RotatingFileHandler(logfile,maxBytes=1000000, backupCount=1)
21# add formatter to handler
22handler.setFormatter(formatter)
23logger.addHandler(handler)
24
25def get_all_docker_containers():
26 p = subprocess.Popen('docker ps --no-trunc', shell=True, stdout=subprocess.PIPE)
27 firstline = True
28 dockercontainers = {}
29 while True:
30 out = p.stdout.readline()
31 if out == '' and p.poll() != None:
32 break
33 if out != '':
34 if firstline is True:
35 firstline = False
36 else:
37 fields = out.split()
38 container_fields = {}
39 container_fields['id'] = fields[0]
40 dockercontainers[fields[-1]] = container_fields
41 return dockercontainers
42
43def extract_compute_stats_from_all_vcpes(dockercontainers):
44 for k,v in dockercontainers.iteritems():
45 cmd = 'sudo docker stats --no-stream=true ' + v['id']
46 p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
47 firstline = True
48 while True:
49 out = p.stdout.readline()
50 if out == '' and p.poll() != None:
51 break
52 if out != '':
53 if firstline is True:
54 firstline = False
55 else:
56 fields = out.split()
57 #['CONTAINER_ID', 'CPU%', 'MEMUSE', 'UNITS', '/', 'MEMLIMIT', 'UNITS', 'MEM%', 'NET I/O', 'UNITS', '/', 'NET I/O LIMIT', 'UNITS', 'BLOCK I/O', 'UNITS', '/', 'BLOCK I/O LIMIT', 'UNITS']
58 v['cpu_util'] = fields[1][:-1]
59 if fields[6] == 'GB':
60 v['memory'] = str(float(fields[5]) * 1000)
61 else:
62 v['memory'] = fields[5]
63 if fields[3] == 'GB':
64 v['memory_usage'] = str(float(fields[2]) * 1000)
65 else:
66 v['memory_usage'] = fields[2]
67 v['network_stats'] = []
68 for intf in ['eth0', 'eth1']:
69 cmd = 'sudo docker exec ' + v['id'] + ' ifconfig ' + intf
70 p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
71 out,err = p.communicate()
72 if out:
73 intf_stats = {}
74 m = re.search("RX bytes:(\d+)", str(out))
75 if m:
76 intf_stats['rx_bytes'] = m.group(1)
77 m = re.search("TX bytes:(\d+)", str(out))
78 if m:
79 intf_stats['tx_bytes'] = m.group(1)
80 m = re.search("RX packets:(\d+)", str(out))
81 if m:
82 intf_stats['rx_packets'] = m.group(1)
83 m = re.search("TX packets:(\d+)", str(out))
84 if m:
85 intf_stats['tx_packets'] = m.group(1)
86 if intf_stats:
87 intf_stats['intf'] = intf
88 v['network_stats'].append(intf_stats)
89
90def extract_dns_stats_from_all_vcpes(dockercontainers):
91 for k,v in dockercontainers.iteritems():
92 cmd = 'docker exec ' + v['id'] + ' killall -10 dnsmasq'
93 p = subprocess.Popen (cmd, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
94 (output, error) = p.communicate()
95 if error:
96 logger.error("killall dnsmasq command failed with error = %s",error)
97 continue
98 cmd = 'docker exec ' + v['id'] + ' tail -7 /var/log/syslog'
99 p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
100 (output, error) = p.communicate()
101 if error:
102 logger.error("tail on dnsmasq log command failed with error = %s",error)
103 continue
104 log_list = output.splitlines()
105 i = 0
106 while i < len(log_list):
107 m = re.search('(?<=:\scache size\s)(\S*)(?=,\s),\s(\S*)(?=/)/(\S*)(?=\scache insertions re-used unexpired cache entries)', log_list[i])
108 if m == None:
109 i = i+1
110 continue;
111 v['cache_size'] = m.group(1)
112 v['replaced_unexpired_entries'] = m.group(2)
113 v['total_inserted_entries'] = m.group(3)
114 i = i+1
115 m = re.search('(?<=:\squeries forwarded\s)(\S*)(?=,),\squeries answered locally\s(\S*)(?=$)', log_list[i])
116 v['queries_forwarded'] = m.group(1)
117 v['queries_answered_locally'] = m.group(2)
118 break;
119 i = i+2
120 v['server_stats'] = []
121 while i < len(log_list):
122 m = re.search('(?<=:\sserver\s)(\S*)(?=#)#\d*:\squeries sent\s(\S*)(?=,),\sretried or failed\s(\S*)(?=$)', log_list[i])
123 if m == None:
124 i = i+1
125 continue
126 dns_server = {}
127 dns_server['id'] = m.group(1)
128 dns_server['queries_sent'] = m.group(2)
129 dns_server['queries_failed'] = m.group(3)
130 v['server_stats'].append(dns_server)
131 i = i+1
132 return dockercontainers
133
134
135keystone_tenant_id='3a397e70f64e4e40b69b6266c634d9d0'
136keystone_user_id='1e3ce043029547f1a61c1996d1a531a2'
137rabbit_user='openstack'
138rabbit_password='80608318c273f348a7c3'
139rabbit_host='10.11.10.1'
140vcpeservice_rabbit_exchange='vcpeservice'
141cpe_publisher_id='vcpe_publisher'
142
143producer = None
144
145def setup_rabbit_mq_channel():
146 global producer
147 global rabbit_user, rabbit_password, rabbit_host, vcpeservice_rabbit_exchange,cpe_publisher_id
148 vcpeservice_exchange = Exchange(vcpeservice_rabbit_exchange, "topic", durable=False)
149 # connections/channels
150 connection = BrokerConnection(rabbit_host, rabbit_user, rabbit_password)
151 logger.info('Connection to RabbitMQ server successful')
152 channel = connection.channel()
153 # produce
154 producer = Producer(channel, exchange=vcpeservice_exchange, routing_key='notifications.info')
155 p = subprocess.Popen('hostname', shell=True, stdout=subprocess.PIPE)
156 (hostname, error) = p.communicate()
157 cpe_publisher_id = cpe_publisher_id + '_on_' + hostname
158 logger.info('cpe_publisher_id=%s',cpe_publisher_id)
159
160def publish_cpe_stats():
161 global producer
162 global keystone_tenant_id, keystone_user_id, cpe_publisher_id
163
164 logger.debug('publish_cpe_stats invoked')
165
166 dockercontainers = get_all_docker_containers()
167 cpe_container_compute_stats = extract_compute_stats_from_all_vcpes(dockercontainers)
168 cpe_container_dns_stats = extract_dns_stats_from_all_vcpes(dockercontainers)
169
170 for k,v in cpe_container_dns_stats.iteritems():
171 msg = {'event_type': 'vcpe',
172 'message_id':six.text_type(uuid.uuid4()),
173 'publisher_id': cpe_publisher_id,
174 'timestamp':datetime.datetime.now().isoformat(),
175 'priority':'INFO',
176 'payload': {'vcpe_id':k,
177 'user_id':keystone_user_id,
178 'tenant_id':keystone_tenant_id
179 }
180 }
181 producer.publish(msg)
182 logger.debug('Publishing vcpe event: %s', msg)
183
184 compute_payload = {}
185 if 'cpu_util' in v:
186 compute_payload['cpu_util']= v['cpu_util']
187 if 'memory' in v:
188 compute_payload['memory']= v['memory']
189 if 'memory_usage' in v:
190 compute_payload['memory_usage']= v['memory_usage']
191 if ('network_stats' in v) and (v['network_stats']):
192 compute_payload['network_stats']= v['network_stats']
193 if compute_payload:
194 compute_payload['vcpe_id'] = k
195 compute_payload['user_id'] = keystone_user_id
196 compute_payload['tenant_id'] = keystone_tenant_id
197 msg = {'event_type': 'vcpe.compute.stats',
198 'message_id':six.text_type(uuid.uuid4()),
199 'publisher_id': cpe_publisher_id,
200 'timestamp':datetime.datetime.now().isoformat(),
201 'priority':'INFO',
202 'payload': compute_payload
203 }
204 producer.publish(msg)
205 logger.debug('Publishing vcpe.dns.cache.size event: %s', msg)
206
207 if 'cache_size' in v:
208 msg = {'event_type': 'vcpe.dns.cache.size',
209 'message_id':six.text_type(uuid.uuid4()),
210 'publisher_id': cpe_publisher_id,
211 'timestamp':datetime.datetime.now().isoformat(),
212 'priority':'INFO',
213 'payload': {'vcpe_id':k,
214 'user_id':keystone_user_id,
215 'tenant_id':keystone_tenant_id,
216 'cache_size':v['cache_size']
217 }
218 }
219 producer.publish(msg)
220 logger.debug('Publishing vcpe.dns.cache.size event: %s', msg)
221
222 if 'total_inserted_entries' in v:
223 msg = {'event_type': 'vcpe.dns.total_inserted_entries',
224 'message_id':six.text_type(uuid.uuid4()),
225 'publisher_id': cpe_publisher_id,
226 'timestamp':datetime.datetime.now().isoformat(),
227 'priority':'INFO',
228 'payload': {'vcpe_id':k,
229 'user_id':keystone_user_id,
230 'tenant_id':keystone_tenant_id,
231 'total_inserted_entries':v['total_inserted_entries']
232 }
233 }
234 producer.publish(msg)
235 logger.debug('Publishing vcpe.dns.total_inserted_entries event: %s', msg)
236
237 if 'replaced_unexpired_entries' in v:
238 msg = {'event_type': 'vcpe.dns.replaced_unexpired_entries',
239 'message_id':six.text_type(uuid.uuid4()),
240 'publisher_id': cpe_publisher_id,
241 'timestamp':datetime.datetime.now().isoformat(),
242 'priority':'INFO',
243 'payload': {'vcpe_id':k,
244 'user_id':keystone_user_id,
245 'tenant_id':keystone_tenant_id,
246 'replaced_unexpired_entries':v['replaced_unexpired_entries']
247 }
248 }
249 producer.publish(msg)
250 logger.debug('Publishing vcpe.dns.replaced_unexpired_entries event: %s', msg)
251
252 if 'queries_forwarded' in v:
253 msg = {'event_type': 'vcpe.dns.queries_forwarded',
254 'message_id':six.text_type(uuid.uuid4()),
255 'publisher_id': cpe_publisher_id,
256 'timestamp':datetime.datetime.now().isoformat(),
257 'priority':'INFO',
258 'payload': {'vcpe_id':k,
259 'user_id':keystone_user_id,
260 'tenant_id':keystone_tenant_id,
261 'queries_forwarded':v['queries_forwarded']
262 }
263 }
264 producer.publish(msg)
265 logger.debug('Publishing vcpe.dns.queries_forwarded event: %s', msg)
266
267 if 'queries_answered_locally' in v:
268 msg = {'event_type': 'vcpe.dns.queries_answered_locally',
269 'message_id':six.text_type(uuid.uuid4()),
270 'publisher_id': cpe_publisher_id,
271 'timestamp':datetime.datetime.now().isoformat(),
272 'priority':'INFO',
273 'payload': {'vcpe_id':k,
274 'user_id':keystone_user_id,
275 'tenant_id':keystone_tenant_id,
276 'queries_answered_locally':v['queries_answered_locally']
277 }
278 }
279 producer.publish(msg)
280 logger.debug('Publishing vcpe.dns.queries_answered_locally event: %s', msg)
281
282 if 'server_stats' in v:
283 for server in v['server_stats']:
284 msg = {'event_type': 'vcpe.dns.server.queries_sent',
285 'message_id':six.text_type(uuid.uuid4()),
286 'publisher_id': cpe_publisher_id,
287 'timestamp':datetime.datetime.now().isoformat(),
288 'priority':'INFO',
289 'payload': {'vcpe_id':k,
290 'user_id':keystone_user_id,
291 'tenant_id':keystone_tenant_id,
292 'upstream_server':server['id'],
293 'queries_sent':server['queries_sent']
294 }
295 }
296 producer.publish(msg)
297 logger.debug('Publishing vcpe.dns.server.queries_sent event: %s', msg)
298
299 msg = {'event_type': 'vcpe.dns.server.queries_failed',
300 'message_id':six.text_type(uuid.uuid4()),
301 'publisher_id': cpe_publisher_id,
302 'timestamp':datetime.datetime.now().isoformat(),
303 'priority':'INFO',
304 'payload': {'vcpe_id':k,
305 'user_id':keystone_user_id,
306 'tenant_id':keystone_tenant_id,
307 'upstream_server':server['id'],
308 'queries_failed':server['queries_failed']
309 }
310 }
311 producer.publish(msg)
312 logger.debug('Publishing vcpe.dns.server.queries_failed event: %s', msg)
313
314def periodic_publish():
315 publish_cpe_stats()
316 #Publish every 5minutes
317 threading.Timer(300, periodic_publish).start()
318
319def main(argv):
320 global keystone_tenant_id, keystone_user_id, rabbit_user, rabbit_password, rabbit_host, vcpeservice_rabbit_exchange
321 try:
322 opts, args = getopt.getopt(argv,"",["keystone_tenant_id=","keystone_user_id=","rabbit_host=","rabbit_user=","rabbit_password=","vcpeservice_rabbit_exchange="])
323 except getopt.GetoptError:
324 print 'vcpe_stats_notifier.py keystone_tenant_id=<keystone_tenant_id> keystone_user_id=<keystone_user_id> rabbit_host=<IP addr> rabbit_user=<user> rabbit_password=<password> vcpeservice_rabbit_exchange=<exchange name>'
325 sys.exit(2)
326 for opt, arg in opts:
327 if opt in ("--keystone_tenant_id"):
328 keystone_tenant_id = arg
329 elif opt in ("--keystone_user_id"):
330 keystone_user_id = arg
331 elif opt in ("--rabbit_user"):
332 rabbit_user = arg
333 elif opt in ("--rabbit_password"):
334 rabbit_password = arg
335 elif opt in ("--rabbit_host"):
336 rabbit_host = arg
337 elif opt in ("--vcpeservice_rabbit_exchange"):
338 vcpeservice_rabbit_exchange = arg
339 logger.info("vcpe_stats_notifier args:keystone_tenant_id=%s keystone_user_id=%s rabbit_user=%s rabbit_host=%s vcpeservice_rabbit_exchange=%s",keystone_tenant_id,keystone_user_id,rabbit_user,rabbit_host,vcpeservice_rabbit_exchange)
340 setup_rabbit_mq_channel()
341 periodic_publish()
342
343if __name__ == "__main__":
344 main(sys.argv[1:])