import six
import uuid
import datetime
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer, Producer
import subprocess
import re
import time, threading
import sys, getopt
import logging
import logging.handlers
import os


logfile = "veg_stats_notifier.log"
level = logging.INFO
logger = logging.getLogger('veg_stats_notifier')
logger.setLevel(level)
# create formatter
formatter = logging.Formatter("%(asctime)s;%(levelname)s;%(message)s")
handler = logging.handlers.RotatingFileHandler(logfile, maxBytes=1000000, backupCount=1)
# add formatter to handler
handler.setFormatter(formatter)
logger.addHandler(handler)

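# Discover the docker containers running on this node by parsing the output of
# 'docker ps --no-trunc'. Returns a dict keyed by container name, where each
# value holds the untruncated container id.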
def get_all_docker_containers():
    p = subprocess.Popen('docker ps --no-trunc', shell=True, stdout=subprocess.PIPE)
    firstline = True
    dockercontainers = {}
    while True:
        out = p.stdout.readline()
        if out == '' and p.poll() is not None:
            break
        if out != '':
            if firstline is True:
                firstline = False
            else:
                fields = out.split()
                container_fields = {}
                container_fields['id'] = fields[0]
                dockercontainers[fields[-1]] = container_fields
    return dockercontainers

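# Collect CPU and memory figures for every container via 'docker stats
# --no-stream', then gather per-interface byte/packet counters by running
# 'ifconfig' inside the container for eth0 and eth1. Results are stored in
# place on each entry of the dockercontainers dict.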
def extract_compute_stats_from_all_vegs(dockercontainers):
    for k, v in dockercontainers.iteritems():
        cmd = 'sudo docker stats --no-stream=true ' + v['id']
        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
        firstline = True
        while True:
            out = p.stdout.readline()
            if out == '' and p.poll() is not None:
                break
            if out != '':
                if firstline is True:
                    firstline = False
                else:
                    fields = out.split()
                    #['CONTAINER_ID', 'CPU%', 'MEMUSE', 'UNITS', '/', 'MEMLIMIT', 'UNITS', 'MEM%', 'NET I/O', 'UNITS', '/', 'NET I/O LIMIT', 'UNITS', 'BLOCK I/O', 'UNITS', '/', 'BLOCK I/O LIMIT', 'UNITS']
                    v['cpu_util'] = fields[1][:-1]
                    if fields[6] == 'GB':
                        v['memory'] = str(float(fields[5]) * 1000)
                    else:
                        v['memory'] = fields[5]
                    if fields[3] == 'GB':
                        v['memory_usage'] = str(float(fields[2]) * 1000)
                    else:
                        v['memory_usage'] = fields[2]
        v['network_stats'] = []
        for intf in ['eth0', 'eth1']:
            cmd = 'sudo docker exec ' + v['id'] + ' ifconfig ' + intf
            p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
            out, err = p.communicate()
            if out:
                intf_stats = {}
                m = re.search("RX bytes:(\d+)", str(out))
                if m:
                    intf_stats['rx_bytes'] = m.group(1)
                m = re.search("TX bytes:(\d+)", str(out))
                if m:
                    intf_stats['tx_bytes'] = m.group(1)
                m = re.search("RX packets:(\d+)", str(out))
                if m:
                    intf_stats['rx_packets'] = m.group(1)
                m = re.search("TX packets:(\d+)", str(out))
                if m:
                    intf_stats['tx_packets'] = m.group(1)
                if intf_stats:
                    intf_stats['intf'] = intf
                    v['network_stats'].append(intf_stats)

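# Harvest dnsmasq statistics from each container: sending SIGUSR1
# ('killall -10 dnsmasq') makes dnsmasq dump its statistics to syslog, which
# are then read back with 'tail -7 /var/log/syslog' and parsed with regular
# expressions (cache size, cache insertions, forwarded/locally answered
# queries and per-upstream-server counters).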
def extract_dns_stats_from_all_vegs(dockercontainers):
    for k, v in dockercontainers.iteritems():
        cmd = 'docker exec ' + v['id'] + ' killall -10 dnsmasq'
        p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
        (output, error) = p.communicate()
        if error:
            logger.error("killall dnsmasq command failed with error = %s", error)
            continue
        cmd = 'docker exec ' + v['id'] + ' tail -7 /var/log/syslog'
        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
        (output, error) = p.communicate()
        if error:
            logger.error("tail on dnsmasq log command failed with error = %s", error)
            continue
        log_list = output.splitlines()
        i = 0
        while i < len(log_list):
            m = re.search('(?<=:\scache size\s)(\S*)(?=,\s),\s(\S*)(?=/)/(\S*)(?=\scache insertions re-used unexpired cache entries)', log_list[i])
            if m is None:
                i = i + 1
                continue
            v['cache_size'] = m.group(1)
            v['replaced_unexpired_entries'] = m.group(2)
            v['total_inserted_entries'] = m.group(3)
            i = i + 1
            m = re.search('(?<=:\squeries forwarded\s)(\S*)(?=,),\squeries answered locally\s(\S*)(?=$)', log_list[i])
            v['queries_forwarded'] = m.group(1)
            v['queries_answered_locally'] = m.group(2)
            break
        i = i + 2
        v['server_stats'] = []
        while i < len(log_list):
            m = re.search('(?<=:\sserver\s)(\S*)(?=#)#\d*:\squeries sent\s(\S*)(?=,),\sretried or failed\s(\S*)(?=$)', log_list[i])
            if m is None:
                i = i + 1
                continue
            dns_server = {}
            dns_server['id'] = m.group(1)
            dns_server['queries_sent'] = m.group(2)
            dns_server['queries_failed'] = m.group(3)
            v['server_stats'].append(dns_server)
            i = i + 1
    return dockercontainers


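# Default configuration; each of these can be overridden on the command line
# (see main()).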
keystone_tenant_id = '3a397e70f64e4e40b69b6266c634d9d0'
keystone_user_id = '1e3ce043029547f1a61c1996d1a531a2'
rabbit_user = 'openstack'
rabbit_password = '80608318c273f348a7c3'
rabbit_host = '10.11.10.1'
vegservice_rabbit_exchange = 'vegservice'
veg_publisher_id = 'veg_publisher'

producer = None

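# Open a connection to the RabbitMQ broker and create a topic producer bound
# to the vegservice exchange with routing key 'notifications.info'. The local
# hostname is appended to veg_publisher_id so events can be traced back to the
# node that emitted them.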
def setup_rabbit_mq_channel():
    global producer
    global rabbit_user, rabbit_password, rabbit_host, vegservice_rabbit_exchange, veg_publisher_id
    vegservice_exchange = Exchange(vegservice_rabbit_exchange, "topic", durable=False)
    # connections/channels
    connection = BrokerConnection(rabbit_host, rabbit_user, rabbit_password)
    logger.info('Connection to RabbitMQ server successful')
    channel = connection.channel()
    # producer
    producer = Producer(channel, exchange=vegservice_exchange, routing_key='notifications.info')
    p = subprocess.Popen('hostname', shell=True, stdout=subprocess.PIPE)
    (hostname, error) = p.communicate()
    # strip the trailing newline emitted by the hostname command
    veg_publisher_id = veg_publisher_id + '_on_' + hostname.strip()
    logger.info('veg_publisher_id=%s', veg_publisher_id)

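# Gather compute and DNS statistics for every container (both helpers update
# the dockercontainers dict in place) and publish one notification per metric
# on the RabbitMQ exchange. Every payload carries the veg_id (container name)
# plus the configured keystone user and tenant ids.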
def publish_veg_stats():
    global producer
    global keystone_tenant_id, keystone_user_id, veg_publisher_id

    logger.debug('publish_veg_stats invoked')

    dockercontainers = get_all_docker_containers()
    veg_container_compute_stats = extract_compute_stats_from_all_vegs(dockercontainers)
    veg_container_dns_stats = extract_dns_stats_from_all_vegs(dockercontainers)

    for k, v in veg_container_dns_stats.iteritems():
        msg = {'event_type': 'veg',
               'message_id': six.text_type(uuid.uuid4()),
               'publisher_id': veg_publisher_id,
               'timestamp': datetime.datetime.now().isoformat(),
               'priority': 'INFO',
               'payload': {'veg_id': k,
                           'user_id': keystone_user_id,
                           'tenant_id': keystone_tenant_id
                          }
              }
        producer.publish(msg)
        logger.debug('Publishing veg event: %s', msg)

        compute_payload = {}
        if 'cpu_util' in v:
            compute_payload['cpu_util'] = v['cpu_util']
        if 'memory' in v:
            compute_payload['memory'] = v['memory']
        if 'memory_usage' in v:
            compute_payload['memory_usage'] = v['memory_usage']
        if ('network_stats' in v) and (v['network_stats']):
            compute_payload['network_stats'] = v['network_stats']
        if compute_payload:
            compute_payload['veg_id'] = k
            compute_payload['user_id'] = keystone_user_id
            compute_payload['tenant_id'] = keystone_tenant_id
            msg = {'event_type': 'veg.compute.stats',
                   'message_id': six.text_type(uuid.uuid4()),
                   'publisher_id': veg_publisher_id,
                   'timestamp': datetime.datetime.now().isoformat(),
                   'priority': 'INFO',
                   'payload': compute_payload
                  }
            producer.publish(msg)
            logger.debug('Publishing veg.compute.stats event: %s', msg)

        if 'cache_size' in v:
            msg = {'event_type': 'veg.dns.cache.size',
                   'message_id': six.text_type(uuid.uuid4()),
                   'publisher_id': veg_publisher_id,
                   'timestamp': datetime.datetime.now().isoformat(),
                   'priority': 'INFO',
                   'payload': {'veg_id': k,
                               'user_id': keystone_user_id,
                               'tenant_id': keystone_tenant_id,
                               'cache_size': v['cache_size']
                              }
                  }
            producer.publish(msg)
            logger.debug('Publishing veg.dns.cache.size event: %s', msg)

        if 'total_inserted_entries' in v:
            msg = {'event_type': 'veg.dns.total_inserted_entries',
                   'message_id': six.text_type(uuid.uuid4()),
                   'publisher_id': veg_publisher_id,
                   'timestamp': datetime.datetime.now().isoformat(),
                   'priority': 'INFO',
                   'payload': {'veg_id': k,
                               'user_id': keystone_user_id,
                               'tenant_id': keystone_tenant_id,
                               'total_inserted_entries': v['total_inserted_entries']
                              }
                  }
            producer.publish(msg)
            logger.debug('Publishing veg.dns.total_inserted_entries event: %s', msg)

        if 'replaced_unexpired_entries' in v:
            msg = {'event_type': 'veg.dns.replaced_unexpired_entries',
                   'message_id': six.text_type(uuid.uuid4()),
                   'publisher_id': veg_publisher_id,
                   'timestamp': datetime.datetime.now().isoformat(),
                   'priority': 'INFO',
                   'payload': {'veg_id': k,
                               'user_id': keystone_user_id,
                               'tenant_id': keystone_tenant_id,
                               'replaced_unexpired_entries': v['replaced_unexpired_entries']
                              }
                  }
            producer.publish(msg)
            logger.debug('Publishing veg.dns.replaced_unexpired_entries event: %s', msg)

        if 'queries_forwarded' in v:
            msg = {'event_type': 'veg.dns.queries_forwarded',
                   'message_id': six.text_type(uuid.uuid4()),
                   'publisher_id': veg_publisher_id,
                   'timestamp': datetime.datetime.now().isoformat(),
                   'priority': 'INFO',
                   'payload': {'veg_id': k,
                               'user_id': keystone_user_id,
                               'tenant_id': keystone_tenant_id,
                               'queries_forwarded': v['queries_forwarded']
                              }
                  }
            producer.publish(msg)
            logger.debug('Publishing veg.dns.queries_forwarded event: %s', msg)

        if 'queries_answered_locally' in v:
            msg = {'event_type': 'veg.dns.queries_answered_locally',
                   'message_id': six.text_type(uuid.uuid4()),
                   'publisher_id': veg_publisher_id,
                   'timestamp': datetime.datetime.now().isoformat(),
                   'priority': 'INFO',
                   'payload': {'veg_id': k,
                               'user_id': keystone_user_id,
                               'tenant_id': keystone_tenant_id,
                               'queries_answered_locally': v['queries_answered_locally']
                              }
                  }
            producer.publish(msg)
            logger.debug('Publishing veg.dns.queries_answered_locally event: %s', msg)

        if 'server_stats' in v:
            for server in v['server_stats']:
                msg = {'event_type': 'veg.dns.server.queries_sent',
                       'message_id': six.text_type(uuid.uuid4()),
                       'publisher_id': veg_publisher_id,
                       'timestamp': datetime.datetime.now().isoformat(),
                       'priority': 'INFO',
                       'payload': {'veg_id': k,
                                   'user_id': keystone_user_id,
                                   'tenant_id': keystone_tenant_id,
                                   'upstream_server': server['id'],
                                   'queries_sent': server['queries_sent']
                                  }
                      }
                producer.publish(msg)
                logger.debug('Publishing veg.dns.server.queries_sent event: %s', msg)

                msg = {'event_type': 'veg.dns.server.queries_failed',
                       'message_id': six.text_type(uuid.uuid4()),
                       'publisher_id': veg_publisher_id,
                       'timestamp': datetime.datetime.now().isoformat(),
                       'priority': 'INFO',
                       'payload': {'veg_id': k,
                                   'user_id': keystone_user_id,
                                   'tenant_id': keystone_tenant_id,
                                   'upstream_server': server['id'],
                                   'queries_failed': server['queries_failed']
                                  }
                      }
                producer.publish(msg)
                logger.debug('Publishing veg.dns.server.queries_failed event: %s', msg)

def periodic_publish():
    publish_veg_stats()
    # Publish every 5 minutes
    threading.Timer(300, periodic_publish).start()

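# Parse the long-form command-line options (all optional; the defaults above
# are used otherwise), set up the RabbitMQ producer and start the periodic
# publishing loop, e.g.:
#   python veg_stats_notifier.py --rabbit_host=10.11.10.1 --rabbit_user=openstack \
#       --rabbit_password=<password> --vegservice_rabbit_exchange=vegservice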
def main(argv):
    global keystone_tenant_id, keystone_user_id, rabbit_user, rabbit_password, rabbit_host, vegservice_rabbit_exchange
    try:
        opts, args = getopt.getopt(argv, "", ["keystone_tenant_id=", "keystone_user_id=", "rabbit_host=", "rabbit_user=", "rabbit_password=", "vegservice_rabbit_exchange="])
    except getopt.GetoptError:
        print 'veg_stats_notifier.py keystone_tenant_id=<keystone_tenant_id> keystone_user_id=<keystone_user_id> rabbit_host=<IP addr> rabbit_user=<user> rabbit_password=<password> vegservice_rabbit_exchange=<exchange name>'
        sys.exit(2)
    for opt, arg in opts:
        if opt == "--keystone_tenant_id":
            keystone_tenant_id = arg
        elif opt == "--keystone_user_id":
            keystone_user_id = arg
        elif opt == "--rabbit_user":
            rabbit_user = arg
        elif opt == "--rabbit_password":
            rabbit_password = arg
        elif opt == "--rabbit_host":
            rabbit_host = arg
        elif opt == "--vegservice_rabbit_exchange":
            vegservice_rabbit_exchange = arg
    logger.info("veg_stats_notifier args:keystone_tenant_id=%s keystone_user_id=%s rabbit_user=%s rabbit_host=%s vegservice_rabbit_exchange=%s", keystone_tenant_id, keystone_user_id, rabbit_user, rabbit_host, vegservice_rabbit_exchange)
    setup_rabbit_mq_channel()
    periodic_publish()

if __name__ == "__main__":
    main(sys.argv[1:])