
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import datetime
import getopt
import logging
import logging.handlers
import os
import re
import subprocess
import sys
import threading
import time
import uuid

import six
from kombu.connection import BrokerConnection
from kombu.messaging import Exchange, Queue, Consumer, Producer


# Module-wide logger writing to a size-capped rotating file
# (1,000,000 bytes per file, one backup kept).
logfile = "veg_stats_notifier.log"
level = logging.INFO
logger = logging.getLogger('veg_stats_notifier')
logger.setLevel(level)
# create formatter
formatter = logging.Formatter("%(asctime)s;%(levelname)s;%(message)s")
# RotatingFileHandler lives in the logging.handlers submodule, which has to
# be imported explicitly; "import logging" alone does not load it.
handler = logging.handlers.RotatingFileHandler(
    logfile, maxBytes=1000000, backupCount=1)
# add formatter to handler
handler.setFormatter(formatter)
logger.addHandler(handler)

def get_all_docker_containers():
    """Return the docker containers listed by ``docker ps``, keyed by name.

    Runs ``docker ps --no-trunc`` and parses its table output.  The first
    output line is the column header and is skipped.  For every other
    non-blank line the first whitespace-separated field is used as the
    container id and the last field as the container name.

    Returns:
        dict mapping container name -> ``{'id': <container id>}``.  The
        dict is empty when the command prints nothing on stdout.
    """
    # universal_newlines=True yields text (not bytes) on Python 2 and 3, so
    # the parsed ids/names are plain strings on both.
    p = subprocess.Popen('docker ps --no-trunc', shell=True,
                         stdout=subprocess.PIPE, universal_newlines=True)
    # communicate() drains stdout and waits for the child, replacing the
    # readline()/poll() loop that never terminated when stdout was bytes.
    out, _ = p.communicate()

    dockercontainers = {}
    for line in out.splitlines()[1:]:
        fields = line.split()
        if not fields:
            # Blank line: nothing to index.
            continue
        dockercontainers[fields[-1]] = {'id': fields[0]}
    return dockercontainers

def extract_compute_stats_from_all_vegs(dockercontainers):
    """Add CPU, memory and network statistics to every container entry.

    For each container this runs ``sudo docker stats --no-stream=true <id>``
    and stores ``cpu_util``, ``memory`` and ``memory_usage`` from its output,
    then runs ``ifconfig`` for ``eth0`` and ``eth1`` inside the container and
    stores the RX/TX byte and packet counters under ``network_stats``.

    Args:
        dockercontainers: dict of container name -> dict holding at least
            an ``'id'`` key.  The inner dicts are updated in place.

    Returns:
        The same ``dockercontainers`` dict, so the result can be assigned by
        the caller in the same way as the DNS statistics extractor's.
    """
    # (result key, pattern) pairs matched against the ifconfig output.
    counter_patterns = (
        ('rx_bytes', r"RX bytes:(\d+)"),
        ('tx_bytes', r"TX bytes:(\d+)"),
        ('rx_packets', r"RX packets:(\d+)"),
        ('tx_packets', r"TX packets:(\d+)"),
    )

    for v in dockercontainers.values():
        cmd = 'sudo docker stats --no-stream=true ' + v['id']
        # Text mode keeps the parsing identical on Python 2 and 3.
        p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                             universal_newlines=True)
        out, _ = p.communicate()
        # First line is the column header.
        for line in out.splitlines()[1:]:
            fields = line.split()
            # Expected layout:
            # ['CONTAINER_ID', 'CPU%', 'MEMUSE', 'UNITS', '/', 'MEMLIMIT',
            #  'UNITS', 'MEM%', 'NET I/O', 'UNITS', '/', 'NET I/O LIMIT',
            #  'UNITS', 'BLOCK I/O', 'UNITS', '/', 'BLOCK I/O LIMIT', 'UNITS']
            if len(fields) < 7:
                # Too short to carry the memory columns; skip instead of
                # raising IndexError.
                continue
            # Drop the trailing '%' from the CPU column.
            v['cpu_util'] = fields[1][:-1]
            # GB values are scaled by 1000 (presumably to MB -- TODO confirm
            # the unit the consumers expect); other units pass through.
            if fields[6] == 'GB':
                v['memory'] = str(float(fields[5]) * 1000)
            else:
                v['memory'] = fields[5]
            if fields[3] == 'GB':
                v['memory_usage'] = str(float(fields[2]) * 1000)
            else:
                v['memory_usage'] = fields[2]

        v['network_stats'] = []
        for intf in ['eth0', 'eth1']:
            cmd = 'sudo docker exec ' + v['id'] + ' ifconfig ' + intf
            p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE,
                                 universal_newlines=True)
            out, _ = p.communicate()
            if not out:
                continue
            intf_stats = {}
            for key, pattern in counter_patterns:
                m = re.search(pattern, out)
                if m:
                    intf_stats[key] = m.group(1)
            # Only report interfaces for which at least one counter matched.
            if intf_stats:
                intf_stats['intf'] = intf
                v['network_stats'].append(intf_stats)
    return dockercontainers

# Patterns for the dnsmasq statistics lines searched for in the syslog tail.
_DNSMASQ_CACHE_RE = re.compile(
    r'(?<=:\scache size\s)(\S*)(?=,\s),\s(\S*)(?=/)/(\S*)'
    r'(?=\scache insertions re-used unexpired cache entries)')
_DNSMASQ_QUERIES_RE = re.compile(
    r'(?<=:\squeries forwarded\s)(\S*)(?=,),\squeries answered locally\s(\S*)(?=$)')
_DNSMASQ_SERVER_RE = re.compile(
    r'(?<=:\sserver\s)(\S*)(?=#)#\d*:\squeries sent\s(\S*)(?=,),'
    r'\sretried or failed\s(\S*)(?=$)')


def _parse_dnsmasq_stats(log_list):
    """Extract dnsmasq statistics from a list of log lines.

    Returns a dict that always holds ``server_stats`` (a possibly empty
    list) and, when the matching lines are present, ``cache_size``,
    ``replaced_unexpired_entries``, ``total_inserted_entries``,
    ``queries_forwarded`` and ``queries_answered_locally``.
    """
    stats = {}
    # Index of the first line that may hold per-server statistics.  When no
    # cache-size line is found there is nothing to scan.
    next_line = len(log_list)
    for i, line in enumerate(log_list):
        m = _DNSMASQ_CACHE_RE.search(line)
        if m is None:
            continue
        stats['cache_size'] = m.group(1)
        stats['replaced_unexpired_entries'] = m.group(2)
        stats['total_inserted_entries'] = m.group(3)
        next_line = i + 1
        # The query counters are expected on the line right after the cache
        # line; tolerate a missing or different line instead of raising.
        if next_line < len(log_list):
            m = _DNSMASQ_QUERIES_RE.search(log_list[next_line])
            if m is not None:
                stats['queries_forwarded'] = m.group(1)
                stats['queries_answered_locally'] = m.group(2)
                next_line += 1
        break

    # Non-matching lines are skipped, so scanning starts directly after the
    # lines consumed above rather than blindly skipping an extra line.
    stats['server_stats'] = []
    for line in log_list[next_line:]:
        m = _DNSMASQ_SERVER_RE.search(line)
        if m is None:
            continue
        stats['server_stats'].append({
            'id': m.group(1),
            'queries_sent': m.group(2),
            'queries_failed': m.group(3),
        })
    return stats


def extract_dns_stats_from_all_vegs(dockercontainers):
    """Add dnsmasq statistics to every container entry.

    For each container this sends signal 10 to dnsmasq inside the container
    (SIGUSR1 on Linux, which makes dnsmasq write its statistics to the
    system log), reads the last 7 lines of ``/var/log/syslog`` and parses
    the statistics out of them.  A container whose commands write anything
    to stderr is logged and left without DNS statistics.

    Args:
        dockercontainers: dict of container name -> dict holding at least
            an ``'id'`` key.  The inner dicts are updated in place.

    Returns:
        The same ``dockercontainers`` dict.
    """
    for v in dockercontainers.values():
        cmd = 'docker exec ' + v['id'] + ' killall -10 dnsmasq'
        p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE,
                             stdout=subprocess.PIPE, universal_newlines=True)
        (output, error) = p.communicate()
        if error:
            logger.error("killall dnsmasq command failed with error = %s", error)
            continue

        cmd = 'docker exec ' + v['id'] + ' tail -7 /var/log/syslog'
        # stderr must be piped here too, otherwise communicate() returns
        # None for it and the error check below can never fire.
        p = subprocess.Popen(cmd, shell=True, stderr=subprocess.PIPE,
                             stdout=subprocess.PIPE, universal_newlines=True)
        (output, error) = p.communicate()
        if error:
            logger.error("tail on dnsmasq log command failed with error = %s", error)
            continue

        v.update(_parse_dnsmasq_stats(output.splitlines()))
    return dockercontainers


# Default settings.  main() replaces the keystone_* and rabbit_* values and
# the exchange name with the matching command-line options when given.
# NOTE(review): the defaults below include a hard-coded RabbitMQ password;
# consider supplying it only via --rabbit_password.
keystone_tenant_id = '3a397e70f64e4e40b69b6266c634d9d0'
keystone_user_id = '1e3ce043029547f1a61c1996d1a531a2'
rabbit_user = 'openstack'
rabbit_password = '80608318c273f348a7c3'
rabbit_host = '10.11.10.1'
vegservice_rabbit_exchange = 'vegservice'
# Base publisher id; setup_rabbit_mq_channel() appends '_on_<hostname>'.
veg_publisher_id = 'veg_publisher'

# kombu Producer used for publishing; created by setup_rabbit_mq_channel().
producer = None

def setup_rabbit_mq_channel():
    """Create the module-level RabbitMQ producer and finalize the publisher id.

    Builds a non-durable topic exchange named by
    ``vegservice_rabbit_exchange``, opens a channel to ``rabbit_host`` with
    the configured credentials, and stores a ``Producer`` publishing with
    routing key ``notifications.info`` in the global ``producer``.  The
    output of the ``hostname`` command is appended to the global
    ``veg_publisher_id`` as ``'_on_<hostname>'``.
    """
    global producer
    global veg_publisher_id
    vegservice_exchange = Exchange(vegservice_rabbit_exchange, "topic", durable=False)
    # connections/channels
    connection = BrokerConnection(rabbit_host, rabbit_user, rabbit_password)
    channel = connection.channel()
    # Logged only after the channel exists.  NOTE(review): kombu connections
    # are believed to be established lazily, so channel() is the first call
    # that would fail on an unreachable broker -- confirm against kombu docs.
    logger.info('Connection to RabbitMQ server successful')
    # produce
    producer = Producer(channel, exchange=vegservice_exchange, routing_key='notifications.info')
    # Text mode so the hostname can be concatenated to a str on Python 2 and 3.
    p = subprocess.Popen('hostname', shell=True, stdout=subprocess.PIPE,
                         universal_newlines=True)
    (hostname, error) = p.communicate()
    # strip() removes the trailing newline printed by `hostname`, which would
    # otherwise become part of every published publisher_id.
    veg_publisher_id = veg_publisher_id + '_on_' + hostname.strip()
    logger.info('veg_publisher_id=%s', veg_publisher_id)

def _publish_veg_event(event_type, payload):
    """Wrap ``payload`` in a notification envelope, publish it and log it."""
    msg = {'event_type': event_type,
           'message_id': six.text_type(uuid.uuid4()),
           'publisher_id': veg_publisher_id,
           'timestamp': datetime.datetime.now().isoformat(),
           'priority': 'INFO',
           'payload': payload
           }
    producer.publish(msg)
    logger.debug('Publishing %s event: %s', event_type, msg)


def publish_veg_stats():
    """Collect statistics for all containers and publish them as events.

    For every container returned by ``get_all_docker_containers()`` this
    publishes, through the global ``producer``:

    * one ``veg`` event identifying the container;
    * one ``veg.compute.stats`` event when any CPU/memory/network value was
      collected;
    * one ``veg.dns.*`` event per DNS counter that was collected;
    * per upstream DNS server, one ``veg.dns.server.queries_sent`` and one
      ``veg.dns.server.queries_failed`` event.

    Every payload carries ``veg_id`` (the container name), ``user_id`` and
    ``tenant_id``.
    """
    logger.debug('publish_veg_stats invoked')

    dockercontainers = get_all_docker_containers()
    # The compute extractor annotates the container dicts in place; the DNS
    # extractor does the same and returns the dict that is iterated below.
    extract_compute_stats_from_all_vegs(dockercontainers)
    veg_container_dns_stats = extract_dns_stats_from_all_vegs(dockercontainers)

    # (event type, key in the container dict) for the single-value DNS events.
    dns_events = (
        ('veg.dns.cache.size', 'cache_size'),
        ('veg.dns.total_inserted_entries', 'total_inserted_entries'),
        ('veg.dns.replaced_unexpired_entries', 'replaced_unexpired_entries'),
        ('veg.dns.queries_forwarded', 'queries_forwarded'),
        ('veg.dns.queries_answered_locally', 'queries_answered_locally'),
    )

    for k, v in veg_container_dns_stats.items():
        # Fields shared by every payload for this container.
        base_payload = {'veg_id': k,
                        'user_id': keystone_user_id,
                        'tenant_id': keystone_tenant_id
                        }
        _publish_veg_event('veg', dict(base_payload))

        compute_payload = {}
        for key in ('cpu_util', 'memory', 'memory_usage'):
            if key in v:
                compute_payload[key] = v[key]
        # An empty network_stats list is treated like a missing one.
        if v.get('network_stats'):
            compute_payload['network_stats'] = v['network_stats']
        if compute_payload:
            compute_payload.update(base_payload)
            # Logged under its own event type (it used to be mislabelled as
            # veg.dns.cache.size in the debug log).
            _publish_veg_event('veg.compute.stats', compute_payload)

        for event_type, key in dns_events:
            if key in v:
                payload = dict(base_payload)
                payload[key] = v[key]
                _publish_veg_event(event_type, payload)

        if 'server_stats' in v:
            for server in v['server_stats']:
                for key in ('queries_sent', 'queries_failed'):
                    payload = dict(base_payload)
                    payload['upstream_server'] = server['id']
                    payload[key] = server[key]
                    _publish_veg_event('veg.dns.server.' + key, payload)

def periodic_publish(interval=300):
    """Publish the statistics now and schedule the next run.

    Args:
        interval: seconds to wait before the next publish; defaults to 300
            (5 minutes).

    A failure in one publish cycle is logged and does not stop the schedule;
    without this, a single exception would prevent the next timer from ever
    being started.
    """
    try:
        publish_veg_stats()
    except Exception:
        logger.exception('publish_veg_stats failed; retrying in %s seconds', interval)
    # Each run arms a one-shot timer for the next run.
    threading.Timer(interval, periodic_publish, [interval]).start()

def main(argv):
    """Parse the command line, connect to RabbitMQ and start publishing.

    Args:
        argv: command-line arguments without the program name.  Recognized
            long options (each takes a value): ``--keystone_tenant_id``,
            ``--keystone_user_id``, ``--rabbit_host``, ``--rabbit_user``,
            ``--rabbit_password`` and ``--vegservice_rabbit_exchange``.
            Each one overrides the module-level setting of the same name.

    Exits with status 2 after printing a usage line when the options cannot
    be parsed.
    """
    global keystone_tenant_id, keystone_user_id, rabbit_user, rabbit_password, rabbit_host, vegservice_rabbit_exchange
    try:
        opts, args = getopt.getopt(argv, "", ["keystone_tenant_id=", "keystone_user_id=", "rabbit_host=", "rabbit_user=", "rabbit_password=", "vegservice_rabbit_exchange="])
    except getopt.GetoptError:
        # Options are shown with the leading '--' that getopt requires.
        print('veg_stats_notifier.py --keystone_tenant_id=<keystone_tenant_id> --keystone_user_id=<keystone_user_id> --rabbit_host=<IP addr> --rabbit_user=<user> --rabbit_password=<password> --vegservice_rabbit_exchange=<exchange name>')
        sys.exit(2)
    for opt, arg in opts:
        # Exact comparison; `opt in ("--name")` was a substring test on a
        # plain string, not a tuple membership test.
        if opt == "--keystone_tenant_id":
            keystone_tenant_id = arg
        elif opt == "--keystone_user_id":
            keystone_user_id = arg
        elif opt == "--rabbit_user":
            rabbit_user = arg
        elif opt == "--rabbit_password":
            rabbit_password = arg
        elif opt == "--rabbit_host":
            rabbit_host = arg
        elif opt == "--vegservice_rabbit_exchange":
            vegservice_rabbit_exchange = arg
    # The password is left out of the log line.
    logger.info("veg_stats_notifier args:keystone_tenant_id=%s keystone_user_id=%s rabbit_user=%s rabbit_host=%s vegservice_rabbit_exchange=%s", keystone_tenant_id, keystone_user_id, rabbit_user, rabbit_host, vegservice_rabbit_exchange)
    setup_rabbit_mq_channel()
    periodic_publish()

# Script entry point: pass the arguments after the program name to main().
if __name__ == "__main__":
    main(sys.argv[1:])