blob: 8d21ba2e651105486f3c64e0b7eff3f6dfc721f0 [file] [log] [blame]
Matteo Scandoloaca86652017-08-08 13:05:27 -07001
2# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16
Scott Baker761e1062016-06-20 17:18:17 -070017import six
18import uuid
19import datetime
20from kombu.connection import BrokerConnection
21from kombu.messaging import Exchange, Queue, Consumer, Producer
22import subprocess
23import re
24import time, threading
25import sys, getopt
26import logging
27import os
28
29
30logfile = "vcpe_stats_notifier.log"
31level=logging.INFO
32logger=logging.getLogger('vcpe_stats_notifier')
33logger.setLevel(level)
34# create formatter
35formatter = logging.Formatter("%(asctime)s;%(levelname)s;%(message)s")
36handler=logging.handlers.RotatingFileHandler(logfile,maxBytes=1000000, backupCount=1)
37# add formatter to handler
38handler.setFormatter(formatter)
39logger.addHandler(handler)
40
41def get_all_docker_containers():
42 p = subprocess.Popen('docker ps --no-trunc', shell=True, stdout=subprocess.PIPE)
43 firstline = True
44 dockercontainers = {}
45 while True:
46 out = p.stdout.readline()
47 if out == '' and p.poll() != None:
48 break
49 if out != '':
50 if firstline is True:
51 firstline = False
52 else:
53 fields = out.split()
54 container_fields = {}
55 container_fields['id'] = fields[0]
56 dockercontainers[fields[-1]] = container_fields
57 return dockercontainers
58
59def extract_compute_stats_from_all_vcpes(dockercontainers):
60 for k,v in dockercontainers.iteritems():
61 cmd = 'sudo docker stats --no-stream=true ' + v['id']
62 p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
63 firstline = True
64 while True:
65 out = p.stdout.readline()
66 if out == '' and p.poll() != None:
67 break
68 if out != '':
69 if firstline is True:
70 firstline = False
71 else:
72 fields = out.split()
73 #['CONTAINER_ID', 'CPU%', 'MEMUSE', 'UNITS', '/', 'MEMLIMIT', 'UNITS', 'MEM%', 'NET I/O', 'UNITS', '/', 'NET I/O LIMIT', 'UNITS', 'BLOCK I/O', 'UNITS', '/', 'BLOCK I/O LIMIT', 'UNITS']
74 v['cpu_util'] = fields[1][:-1]
75 if fields[6] == 'GB':
76 v['memory'] = str(float(fields[5]) * 1000)
77 else:
78 v['memory'] = fields[5]
79 if fields[3] == 'GB':
80 v['memory_usage'] = str(float(fields[2]) * 1000)
81 else:
82 v['memory_usage'] = fields[2]
83 v['network_stats'] = []
84 for intf in ['eth0', 'eth1']:
85 cmd = 'sudo docker exec ' + v['id'] + ' ifconfig ' + intf
86 p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
87 out,err = p.communicate()
88 if out:
89 intf_stats = {}
90 m = re.search("RX bytes:(\d+)", str(out))
91 if m:
92 intf_stats['rx_bytes'] = m.group(1)
93 m = re.search("TX bytes:(\d+)", str(out))
94 if m:
95 intf_stats['tx_bytes'] = m.group(1)
96 m = re.search("RX packets:(\d+)", str(out))
97 if m:
98 intf_stats['rx_packets'] = m.group(1)
99 m = re.search("TX packets:(\d+)", str(out))
100 if m:
101 intf_stats['tx_packets'] = m.group(1)
102 if intf_stats:
103 intf_stats['intf'] = intf
104 v['network_stats'].append(intf_stats)
105
106def extract_dns_stats_from_all_vcpes(dockercontainers):
107 for k,v in dockercontainers.iteritems():
108 cmd = 'docker exec ' + v['id'] + ' killall -10 dnsmasq'
109 p = subprocess.Popen (cmd, shell=True, stderr=subprocess.PIPE, stdout=subprocess.PIPE)
110 (output, error) = p.communicate()
111 if error:
112 logger.error("killall dnsmasq command failed with error = %s",error)
113 continue
114 cmd = 'docker exec ' + v['id'] + ' tail -7 /var/log/syslog'
115 p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE)
116 (output, error) = p.communicate()
117 if error:
118 logger.error("tail on dnsmasq log command failed with error = %s",error)
119 continue
120 log_list = output.splitlines()
121 i = 0
122 while i < len(log_list):
123 m = re.search('(?<=:\scache size\s)(\S*)(?=,\s),\s(\S*)(?=/)/(\S*)(?=\scache insertions re-used unexpired cache entries)', log_list[i])
124 if m == None:
125 i = i+1
126 continue;
127 v['cache_size'] = m.group(1)
128 v['replaced_unexpired_entries'] = m.group(2)
129 v['total_inserted_entries'] = m.group(3)
130 i = i+1
131 m = re.search('(?<=:\squeries forwarded\s)(\S*)(?=,),\squeries answered locally\s(\S*)(?=$)', log_list[i])
132 v['queries_forwarded'] = m.group(1)
133 v['queries_answered_locally'] = m.group(2)
134 break;
135 i = i+2
136 v['server_stats'] = []
137 while i < len(log_list):
138 m = re.search('(?<=:\sserver\s)(\S*)(?=#)#\d*:\squeries sent\s(\S*)(?=,),\sretried or failed\s(\S*)(?=$)', log_list[i])
139 if m == None:
140 i = i+1
141 continue
142 dns_server = {}
143 dns_server['id'] = m.group(1)
144 dns_server['queries_sent'] = m.group(2)
145 dns_server['queries_failed'] = m.group(3)
146 v['server_stats'].append(dns_server)
147 i = i+1
148 return dockercontainers
149
150
151keystone_tenant_id='3a397e70f64e4e40b69b6266c634d9d0'
152keystone_user_id='1e3ce043029547f1a61c1996d1a531a2'
153rabbit_user='openstack'
154rabbit_password='80608318c273f348a7c3'
155rabbit_host='10.11.10.1'
156vcpeservice_rabbit_exchange='vcpeservice'
157cpe_publisher_id='vcpe_publisher'
158
159producer = None
160
161def setup_rabbit_mq_channel():
162 global producer
163 global rabbit_user, rabbit_password, rabbit_host, vcpeservice_rabbit_exchange,cpe_publisher_id
164 vcpeservice_exchange = Exchange(vcpeservice_rabbit_exchange, "topic", durable=False)
165 # connections/channels
166 connection = BrokerConnection(rabbit_host, rabbit_user, rabbit_password)
167 logger.info('Connection to RabbitMQ server successful')
168 channel = connection.channel()
169 # produce
170 producer = Producer(channel, exchange=vcpeservice_exchange, routing_key='notifications.info')
171 p = subprocess.Popen('hostname', shell=True, stdout=subprocess.PIPE)
172 (hostname, error) = p.communicate()
173 cpe_publisher_id = cpe_publisher_id + '_on_' + hostname
174 logger.info('cpe_publisher_id=%s',cpe_publisher_id)
175
176def publish_cpe_stats():
177 global producer
178 global keystone_tenant_id, keystone_user_id, cpe_publisher_id
179
180 logger.debug('publish_cpe_stats invoked')
181
182 dockercontainers = get_all_docker_containers()
183 cpe_container_compute_stats = extract_compute_stats_from_all_vcpes(dockercontainers)
184 cpe_container_dns_stats = extract_dns_stats_from_all_vcpes(dockercontainers)
185
186 for k,v in cpe_container_dns_stats.iteritems():
187 msg = {'event_type': 'vcpe',
188 'message_id':six.text_type(uuid.uuid4()),
189 'publisher_id': cpe_publisher_id,
190 'timestamp':datetime.datetime.now().isoformat(),
191 'priority':'INFO',
192 'payload': {'vcpe_id':k,
193 'user_id':keystone_user_id,
194 'tenant_id':keystone_tenant_id
195 }
196 }
197 producer.publish(msg)
198 logger.debug('Publishing vcpe event: %s', msg)
199
200 compute_payload = {}
201 if 'cpu_util' in v:
202 compute_payload['cpu_util']= v['cpu_util']
203 if 'memory' in v:
204 compute_payload['memory']= v['memory']
205 if 'memory_usage' in v:
206 compute_payload['memory_usage']= v['memory_usage']
207 if ('network_stats' in v) and (v['network_stats']):
208 compute_payload['network_stats']= v['network_stats']
209 if compute_payload:
210 compute_payload['vcpe_id'] = k
211 compute_payload['user_id'] = keystone_user_id
212 compute_payload['tenant_id'] = keystone_tenant_id
213 msg = {'event_type': 'vcpe.compute.stats',
214 'message_id':six.text_type(uuid.uuid4()),
215 'publisher_id': cpe_publisher_id,
216 'timestamp':datetime.datetime.now().isoformat(),
217 'priority':'INFO',
218 'payload': compute_payload
219 }
220 producer.publish(msg)
221 logger.debug('Publishing vcpe.dns.cache.size event: %s', msg)
222
223 if 'cache_size' in v:
224 msg = {'event_type': 'vcpe.dns.cache.size',
225 'message_id':six.text_type(uuid.uuid4()),
226 'publisher_id': cpe_publisher_id,
227 'timestamp':datetime.datetime.now().isoformat(),
228 'priority':'INFO',
229 'payload': {'vcpe_id':k,
230 'user_id':keystone_user_id,
231 'tenant_id':keystone_tenant_id,
232 'cache_size':v['cache_size']
233 }
234 }
235 producer.publish(msg)
236 logger.debug('Publishing vcpe.dns.cache.size event: %s', msg)
237
238 if 'total_inserted_entries' in v:
239 msg = {'event_type': 'vcpe.dns.total_inserted_entries',
240 'message_id':six.text_type(uuid.uuid4()),
241 'publisher_id': cpe_publisher_id,
242 'timestamp':datetime.datetime.now().isoformat(),
243 'priority':'INFO',
244 'payload': {'vcpe_id':k,
245 'user_id':keystone_user_id,
246 'tenant_id':keystone_tenant_id,
247 'total_inserted_entries':v['total_inserted_entries']
248 }
249 }
250 producer.publish(msg)
251 logger.debug('Publishing vcpe.dns.total_inserted_entries event: %s', msg)
252
253 if 'replaced_unexpired_entries' in v:
254 msg = {'event_type': 'vcpe.dns.replaced_unexpired_entries',
255 'message_id':six.text_type(uuid.uuid4()),
256 'publisher_id': cpe_publisher_id,
257 'timestamp':datetime.datetime.now().isoformat(),
258 'priority':'INFO',
259 'payload': {'vcpe_id':k,
260 'user_id':keystone_user_id,
261 'tenant_id':keystone_tenant_id,
262 'replaced_unexpired_entries':v['replaced_unexpired_entries']
263 }
264 }
265 producer.publish(msg)
266 logger.debug('Publishing vcpe.dns.replaced_unexpired_entries event: %s', msg)
267
268 if 'queries_forwarded' in v:
269 msg = {'event_type': 'vcpe.dns.queries_forwarded',
270 'message_id':six.text_type(uuid.uuid4()),
271 'publisher_id': cpe_publisher_id,
272 'timestamp':datetime.datetime.now().isoformat(),
273 'priority':'INFO',
274 'payload': {'vcpe_id':k,
275 'user_id':keystone_user_id,
276 'tenant_id':keystone_tenant_id,
277 'queries_forwarded':v['queries_forwarded']
278 }
279 }
280 producer.publish(msg)
281 logger.debug('Publishing vcpe.dns.queries_forwarded event: %s', msg)
282
283 if 'queries_answered_locally' in v:
284 msg = {'event_type': 'vcpe.dns.queries_answered_locally',
285 'message_id':six.text_type(uuid.uuid4()),
286 'publisher_id': cpe_publisher_id,
287 'timestamp':datetime.datetime.now().isoformat(),
288 'priority':'INFO',
289 'payload': {'vcpe_id':k,
290 'user_id':keystone_user_id,
291 'tenant_id':keystone_tenant_id,
292 'queries_answered_locally':v['queries_answered_locally']
293 }
294 }
295 producer.publish(msg)
296 logger.debug('Publishing vcpe.dns.queries_answered_locally event: %s', msg)
297
298 if 'server_stats' in v:
299 for server in v['server_stats']:
300 msg = {'event_type': 'vcpe.dns.server.queries_sent',
301 'message_id':six.text_type(uuid.uuid4()),
302 'publisher_id': cpe_publisher_id,
303 'timestamp':datetime.datetime.now().isoformat(),
304 'priority':'INFO',
305 'payload': {'vcpe_id':k,
306 'user_id':keystone_user_id,
307 'tenant_id':keystone_tenant_id,
308 'upstream_server':server['id'],
309 'queries_sent':server['queries_sent']
310 }
311 }
312 producer.publish(msg)
313 logger.debug('Publishing vcpe.dns.server.queries_sent event: %s', msg)
314
315 msg = {'event_type': 'vcpe.dns.server.queries_failed',
316 'message_id':six.text_type(uuid.uuid4()),
317 'publisher_id': cpe_publisher_id,
318 'timestamp':datetime.datetime.now().isoformat(),
319 'priority':'INFO',
320 'payload': {'vcpe_id':k,
321 'user_id':keystone_user_id,
322 'tenant_id':keystone_tenant_id,
323 'upstream_server':server['id'],
324 'queries_failed':server['queries_failed']
325 }
326 }
327 producer.publish(msg)
328 logger.debug('Publishing vcpe.dns.server.queries_failed event: %s', msg)
329
330def periodic_publish():
331 publish_cpe_stats()
332 #Publish every 5minutes
333 threading.Timer(300, periodic_publish).start()
334
335def main(argv):
336 global keystone_tenant_id, keystone_user_id, rabbit_user, rabbit_password, rabbit_host, vcpeservice_rabbit_exchange
337 try:
338 opts, args = getopt.getopt(argv,"",["keystone_tenant_id=","keystone_user_id=","rabbit_host=","rabbit_user=","rabbit_password=","vcpeservice_rabbit_exchange="])
339 except getopt.GetoptError:
340 print 'vcpe_stats_notifier.py keystone_tenant_id=<keystone_tenant_id> keystone_user_id=<keystone_user_id> rabbit_host=<IP addr> rabbit_user=<user> rabbit_password=<password> vcpeservice_rabbit_exchange=<exchange name>'
341 sys.exit(2)
342 for opt, arg in opts:
343 if opt in ("--keystone_tenant_id"):
344 keystone_tenant_id = arg
345 elif opt in ("--keystone_user_id"):
346 keystone_user_id = arg
347 elif opt in ("--rabbit_user"):
348 rabbit_user = arg
349 elif opt in ("--rabbit_password"):
350 rabbit_password = arg
351 elif opt in ("--rabbit_host"):
352 rabbit_host = arg
353 elif opt in ("--vcpeservice_rabbit_exchange"):
354 vcpeservice_rabbit_exchange = arg
355 logger.info("vcpe_stats_notifier args:keystone_tenant_id=%s keystone_user_id=%s rabbit_user=%s rabbit_host=%s vcpeservice_rabbit_exchange=%s",keystone_tenant_id,keystone_user_id,rabbit_user,rabbit_host,vcpeservice_rabbit_exchange)
356 setup_rabbit_mq_channel()
357 periodic_publish()
358
359if __name__ == "__main__":
360 main(sys.argv[1:])