blob: e7ebcf46b12cd04ac8b345994ae3926f03a764ba [file] [log] [blame]
rdudyalab086cf32016-08-11 00:07:45 -04001#!/usr/bin/python
2import socket,thread
3import sys
4import msgpack
5import fnmatch
6import operator
7import logging
8import logging.handlers
9import logging.config
10import ConfigParser
11import json
12from oslo_utils import units
13from oslo_utils import netutils
14from pubrecords import *
15import kafka
16import kafka_broker
17
18from flask import request, Request, jsonify
19from flask import Flask
20from flask import make_response
21app = Flask(__name__)
22
23COMPARATORS = {
24 'gt': operator.gt,
25 'lt': operator.lt,
26 'ge': operator.ge,
27 'le': operator.le,
28 'eq': operator.eq,
29 'ne': operator.ne,
30}
31
32LEVELS = {'DEBUG': logging.DEBUG,
33 'INFO': logging.INFO,
34 'WARNING': logging.WARNING,
35 'ERROR': logging.ERROR,
36 'CRITICAL': logging.CRITICAL}
37
38_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
39
40''' Stores all the subscribed meter's list '''
41meter_list = []
42''' Stores meter to app-id mapping '''
43meter_dict = {}
44
45@app.route('/subscribe',methods=['POST','SUB'])
46def subscribe():
47 try :
48 app_id = request.json['app_id']
49 target = request.json['target']
50 sub_info = request.json['sub_info']
51
52 try :
53 validate_sub_info(sub_info)
54 except Exception as e:
55 logging.error("* %s",e.__str__())
56 return e.__str__()
57
58 ''' Flag to Update pipeling cfg file '''
59 config = ConfigParser.ConfigParser()
60 config.read('pub_sub.conf')
61 if config.get('RABBITMQ','UpdateConfMgmt') == "True" :
62 update_pipeline_conf(sub_info,target,app_id,"ADD")
63 else:
64 logging.warning("Update Conf Mgmt flag is disabled,enable the flag to update Conf Mgmt")
65
66 if not 'query' in request.json.keys():
67 logging.info("query request is not provided by user")
68 query = None
69 else:
70 query = request.json['query']
71 for i in range(len(query)):
72 if not 'field' in query[i].keys():
73 err_str = "Query field"
74 raise Exception (err_str)
75 if not 'op' in query[i].keys():
76 err_str = "Query op"
77 raise Exception (err_str)
78 if not 'value' in query[i].keys():
79 err_str = "Query value"
80 raise Exception (err_str)
81 except Exception as e:
82 err_str = "KeyError: Parsing subscription request " + e.__str__() + "\n"
83 logging.error("* KeyError: Parsing subscription request :%s",e.__str__())
84 return err_str
85
86 parse_target=netutils.urlsplit(target)
87 if not parse_target.netloc:
88 err_str = "Error:Invalid target format"
89 logging.error("* Invalid target format")
90 return err_str
91
92 status = ""
93 if parse_target.scheme == "udp" or parse_target.scheme == "kafka":
94 host,port=netutils.parse_host_port(parse_target.netloc)
95 scheme = parse_target.scheme
96 app_ip = host
97 app_port = port
98
99 if host == None or port == None :
100 err_str = "* Error: Invalid IP Address format"
101 logging.error("* Invalid IP Address format")
102 return err_str
103
104 subscription_info = sub_info
105 sub_info_filter = query
106 logging.info("Creating subscription for app:%s for meters:%s with filters:%s and target:%s",app_id, subscription_info, sub_info_filter, target)
107 subscrip_obj=subinfo(scheme,app_id,app_ip,app_port,subscription_info,sub_info_filter,target)
108 status = subscrip_obj.update_subinfo()
109 subinfo.print_subinfo()
110
111 if parse_target.scheme == "file" :
112 pass
113 return status
114
115@app.route('/unsubscribe',methods=['POST','UNSUB'])
116def unsubscribe():
117 try :
118 app_id = request.json['app_id']
119 sub_info,target = subinfo.get_subinfo(app_id)
120 if sub_info is None or target is None:
121 err_str = "No subscription exists with app id: " + app_id + "\n"
122 logging.error("* No subscription exists with app id:%s ",app_id)
123 return err_str
124 else:
125 ''' Flag to Update pipeling cfg file '''
126 config = ConfigParser.ConfigParser()
127 config.read('pub_sub.conf')
128 if config.get('RABBITMQ','UpdateConfMgmt') == "True" :
129 update_pipeline_conf(sub_info,target,app_id,"DEL")
130 else:
131 logging.warning("Update Conf Mgmt flag is disabled,enable the flag to update Conf Mgmt")
132 #update_pipeline_conf(sub_info,target,"DEL")
133 subinfo.delete_subinfo(app_id)
134 except Exception as e:
135 logging.error("* %s",e.__str__())
136 return e.__str__()
137 return "UnSubscrition is sucessful! \n"
138
139@app.errorhandler(404)
140def not_found(error):
141 return make_response(jsonify({'error': 'Not found'}), 404)
142
143def print_subscribed_meter_list():
144 logging.debug("-------------------------------------------------")
145 #print (meter_list)
146 logging.debug("meter_list:%s",meter_list)
147 logging.debug("meter_dict:%s",meter_dict)
148 #print (meter_dict)
149 logging.debug("-------------------------------------------------")
150
151def validate_sub_info(sub_info):
152 if type(sub_info) is not list:
153 sub_info = [sub_info]
154 for meter in sub_info:
155 if meter.startswith("*") or meter.startswith("!"):
156 err_str = "Given meter is not supported:" + meter + "\n"
157 logging.error("* Given meter is not supported:%s",meter)
158 raise Exception (err_str)
159
160def update_meter_dict(meterinfo,app_id):
161 try :
162 if type(meterinfo) == list:
163 for meter in meterinfo:
164 if meter_dict.get(meter) is None:
165 meter_dict[meter] = [app_id]
166 elif app_id not in meter_dict.get(meter):
167 meter_dict.get(meter).append(app_id)
168 else:
169 if meter_dict.get(meterinfo) is None:
170 meter_dict[meterinfo] = [app_id]
171 elif app_id not in meter_dict.get(meterinfo):
172 meter_dict.get(meterinfo).append(app_id)
173 except Exception as e:
174 logging.error("* %s",e.__str__())
175
176def check_send_msg_confmgmt_del(sub_info,app_id):
177 temp_sub_info=[]
178 temm_meter_info = None
179 if len(meter_list) == 0:
180 #print("No subscription exists")
181 logging.info("No subscription exists")
182 return False,None
183 if type(sub_info) == list:
184 for meterinfo in sub_info:
185 if meter_dict.get(meterinfo) is None:
186 logging.warning("%s meter doesn't exist in the meter dict",meterinfo)
187 continue
188 if app_id in meter_dict.get(meterinfo):
189 if len(meter_dict.get(meterinfo)) == 1:
190 #print "Only single app is subscribing this meter"
191 logging.info("Only single app is subscribing this meter")
192 del meter_dict[meterinfo]
193 temp_sub_info.append(meterinfo)
194 if meterinfo in meter_list:
195 meter_list.remove(meterinfo)
196 else:
197 meter_dict.get(meterinfo).remove(app_id)
198 return True,temp_sub_info
199 else :
200 if meter_dict.get(sub_info) is None:
201 logging.warning("%s meter doesn't exist in the meter dict",sub_info)
202 return False,None
203 if app_id in meter_dict.get(sub_info):
204 if len(meter_dict.get(sub_info)) == 1:
205 #print "Only single app is subscribing this meter"
206 logging.info("Only single app is subscribing this meter")
207 del meter_dict[sub_info]
208 if sub_info in meter_list:
209 meter_list.remove(sub_info)
210 return True,sub_info
211 else:
212 meter_dict.get(sub_info).remove(app_id)
213 return False,None
214
215def check_send_msg_confmgmt_add(sub_info,app_id):
216 temp_sub_info=[]
217 update_meter_dict(sub_info,app_id)
218 #import pdb;pdb.set_trace()
219 if len(meter_list) == 0:
220 logging.info("No subinfo exits")
221 if type(sub_info) == list:
222 for j in sub_info:
223 meter_list.append(j)
224 return True,sub_info
225 else :
226 meter_list.append(sub_info)
227 return True,sub_info
228 if type(sub_info) == list:
229 for j in sub_info:
230 if j in meter_list:
231 #print ("meter already exists",j)
232 logging.info("meter already exist:%s",j)
233 continue
234 else :
235 temp_sub_info.append(j)
236 meter_list.append(j)
237 if temp_sub_info is not None:
238 return True,temp_sub_info
239 else :
240 return False,None
241 else :
242 if sub_info not in meter_list:
243 meter_list.append(sub_info)
244 #print ("subscription for meter doesn't exist",sub_info)
245 logging.warning("subscription for meter doesn't exist:%s",sub_info)
246 return True,sub_info
247 else :
248 #print ("subscription already exist for ",sub_info)
249 logging.info("subscription already exist for:%s ",sub_info)
250 return False,sub_info
251
252def update_pipeline_conf(sub_info,target,app_id,flag):
253 import pika
254
255 logging.debug("* sub_info:%s",sub_info)
256 logging.debug("* target:%s",target)
257
258 #msg={"sub_info":sub_info,"target":target,"action":flag}
259
260 #json_msg=json.dumps(msg)
261 #msg="image"
262 meter_sub_info = None
263 if flag == "ADD":
264 status,meter_sub_info=check_send_msg_confmgmt_add(sub_info,app_id)
265 if status == False or meter_sub_info == None or meter_sub_info == []:
266 logging.warning("%s is already subscribed with the conf mgmt")
267 return
268 elif flag == "DEL":
269 status,meter_sub_info=check_send_msg_confmgmt_del(sub_info,app_id)
270 if status == False or meter_sub_info == None or meter_sub_info == []:
271 logging.warning("%s is already unsubscribed with the conf mgmt")
272 return
273 try :
274 config = ConfigParser.ConfigParser()
275 config.read('pub_sub.conf')
276 rabbitmq_username = config.get('RABBITMQ','Rabbitmq_username')
277 rabbitmq_passwd = config.get('RABBITMQ','Rabbitmq_passwd')
278 rabbitmq_host = config.get('RABBITMQ','Rabbitmq_host')
279 rabbitmq_port = int ( config.get('RABBITMQ','Rabbitmq_port') )
280
281 ceilometer_client_info = config.get('CLIENT','target')
282 #msg={"sub_info":sub_info,"target":ceilometer_client_info,"action":flag}
283 msg={"sub_info":meter_sub_info,"target":ceilometer_client_info,"action":flag}
284 #print msg
285 json_msg=json.dumps(msg)
286
287 credentials = pika.PlainCredentials(rabbitmq_username,rabbitmq_passwd)
288 parameters = pika.ConnectionParameters(rabbitmq_host,
289 rabbitmq_port,
290 '/',
291 credentials)
292 connection = pika.BlockingConnection(parameters)
293 properties = pika.BasicProperties(content_type = "application/json")
294 channel = connection.channel()
295 channel.exchange_declare(exchange='pubsub',
296 type='fanout')
297
298 channel.basic_publish(exchange='pubsub',
299 routing_key='',
300 properties = properties,
301 body=json_msg)
302 logging.debug(" [x] %s Sent",msg)
303 logging.info(" [x] %s Sent",msg)
304 connection.close()
305 except Exception as e:
306 logging.error("Error:%s",e.__str__())
307
308def read_notification_from_ceilometer(host,port):
309 UDP_IP = host
310 UDP_PORT = port
311
312 logging.debug("* Sarting UDP Client on ip:%s , port:%d",UDP_IP,UDP_PORT)
313 udp = socket.socket(socket.AF_INET, # Internet
314 socket.SOCK_DGRAM) # UDP
315 udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
316
317 udp.bind((UDP_IP, UDP_PORT))
318
319 while True:
320 #print thread.get_ident()
321 #logging.debug("thread.get_ident():%s", thread.get_ident())
322 data, source = udp.recvfrom(64 * units.Ki)
323 sample = msgpack.loads(data, encoding='utf-8')
324 #logging.debug("* -------------------------------------------------------")
325 logging.debug("%s",sample)
326 #print(sample)
327 for obj in sub_info:
328 msg_list = []
329 if obj.scheme == "udp" :
330 if type(obj.subscription_info) is list:
331 for info in obj.subscription_info:
332 msg_list.append(fnmatch.fnmatch(sample['counter_name'],info))
333 else :
334 msg_list.append(fnmatch.fnmatch(sample['counter_name'],obj.subscription_info))
335 try:
336 if reduce(operator.or_, msg_list):
337 host = obj.ipaddress
338 port = int(obj.portno)
339 l=[]
340 #logging.debug("* -------------------------------------------------------")
341 if obj.sub_info_filter is None:
342 try:
343 logging.debug("* Sending data without query over UDP for host:%s and port:%s",host,port)
344 udp.sendto(data,(host,port))
345 except Exception as e:
346 logging.error ("Unable to send sample over UDP for %s and %s,%s",host,port,e.__str__())
347 ret_str = ("Unable to send sample over UDP for %s and %s,%s")%(host,port,e.__str__())
348 continue
349 for i in range(len(obj.sub_info_filter)):
350 if obj.sub_info_filter[i]['op'] in COMPARATORS:
351 op = COMPARATORS[obj.sub_info_filter[i]['op']]
352 logging.debug("* obj.sub_info_filter[i]['value']:%s",obj.sub_info_filter[i]['value'])
353 logging.debug("* obj.sub_info_filter[i]['field']:%s",obj.sub_info_filter[i]['field'])
354 l.append(op(obj.sub_info_filter[i]['value'],sample[obj.sub_info_filter[i]['field']]))
355 logging.info("* Logical and of Query %s",l)
356 else:
357 logging.deubg("* Not a valid operator ignoring app_id:%s",obj.app_id)
358 l.append(False)
359 logging.info("* Logical and of Query %s",l)
360 if reduce(operator.and_, l):
361 try:
362 logging.debug("* Sending data over UDP for host:%s and port:%s",host,port)
363 udp.sendto(data,(host,port))
364 except Exception:
365 logging.error ("Unable to send sample over UDP for %s and %s ",host,port)
366 ret_str = ("Unable to send sample over UDP for %s and %s ")%(host,port)
367 else :
368 logging.warning("* No Notification found with the given subscription")
369 else :
370 logging.warning("* No valid subscrition found for %s",obj.app_id)
371 except Exception as e:
372 logging.error("Key_Error:%s ",e.__str__())
373 ret_str = ("Key_Error:%s \n")% e.__str__()
374
375def read_notification_from_ceilometer_over_udp(host,port):
376 UDP_IP = host
377 UDP_PORT = port
378
379 logging.debug("* Sarting UDP Client on ip:%s , port:%d",UDP_IP,UDP_PORT)
380 udp = socket.socket(socket.AF_INET, # Internet
381 socket.SOCK_DGRAM) # UDP
382 udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
383
384 udp.bind((UDP_IP, UDP_PORT))
385
386 while True:
387 #print thread.get_ident()
388 #logging.debug("thread.get_ident():%s", thread.get_ident())
389 data, source = udp.recvfrom(64 * units.Ki)
390 sample = msgpack.loads(data, encoding='utf-8')
391 status = process_ceilometer_message(sample,data)
392
393def read_notification_from_ceilometer_over_kafka(parse_target):
394 logging.info("Kafka target:%s",parse_target)
395 try :
396 kafka_publisher=kafka_broker.KafkaBrokerPublisher(parse_target)
397 for message in kafka_publisher.kafka_consumer:
398 #print message.value
399 #logging.debug("%s",message.value)
400 #logging.info("%s",message.value)
401 status = process_ceilometer_message(json.loads(message.value),message.value)
402 #print status
403 except Exception as e:
404 logging.error("Error in Kafka setup:%s ",e.__str__())
405
406def process_ceilometer_message(sample,data):
407 logging.debug("%s",sample)
408 #logging.info("%s",sample)
409 if len(sub_info) < 1:
410 #print "No subscription exists"
411 return
412 for obj in sub_info:
413 #import pdb;pdb.set_trace()
414 msg_list = []
415 if type(obj.subscription_info) is list:
416 for info in obj.subscription_info:
417 msg_list.append(fnmatch.fnmatch(sample['counter_name'],info))
418 else :
419 msg_list.append(fnmatch.fnmatch(sample['counter_name'],obj.subscription_info))
420 try:
421 if reduce(operator.or_, msg_list):
422 '''
423 kafka_publisher = None
424 if obj.scheme == "kafka" :
425 parse_target=netutils.urlsplit(obj.target)
426 try :
427 kafka_publisher=kafka_broker.KafkaBrokerPublisher(parse_target)
428 except Exception as e:
429 logging.error("* Error in connecting kafka broker:%s",e.__str__())
430 # return False
431 continue
432 '''
433 host = obj.ipaddress
434 port = int(obj.portno)
435 l=[]
436 logging.debug("* -------------------------------------------------------")
437 if obj.sub_info_filter is None:
438 try:
439 if obj.scheme == "udp" :
440 #logging.debug("* Sending data without query over UDP for host:%s and port:%s",host,port)
441 #logging.info("* Sending data without query over UDP for host:%s and port:%s",host,port)
442 #udp = socket.socket(socket.AF_INET, # Internet
443 # socket.SOCK_DGRAM) # UDP
444 #udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
445 obj.udp.sendto(data,(host,port))
446 #return True
447 continue
448 elif obj.scheme == "kafka" :
449 #logging.debug("* Sending data over kafka for host:%s and port:%s and topec:%s",host,port,kafka_publisher.topic)
450 #logging.info("* Sending data over kafka for host:%s and port:%s and topec:%s",host,port,kafka_publisher.topic)
451 obj.kafka_publisher._send(sample)
452 #return True
453 continue
454 except Exception as e:
455 logging.error ("Unable to send sample over UDP/kafka for %s and %s,%s",host,port,e.__str__())
456 ret_str = ("Unable to send sample over UDP for %s and %s,%s ")%(host,port,e.__str__())
457 #return False
458 continue
459 for i in range(len(obj.sub_info_filter)):
460 if obj.sub_info_filter[i]['op'] in COMPARATORS:
461 op = COMPARATORS[obj.sub_info_filter[i]['op']]
462 #logging.debug("* obj.sub_info_filter[i]['value']:%s",obj.sub_info_filter[i]['value'])
463 #logging.debug("* obj.sub_info_filter[i]['field']:%s",obj.sub_info_filter[i]['field'])
464 l.append(op(obj.sub_info_filter[i]['value'],sample[obj.sub_info_filter[i]['field']]))
465 #logging.info("* Logical and of Query %s",l)
466 else:
467 logging.info("* Not a valid operator ignoring app_id:%s",obj.app_id)
468 l.append(False)
469 #logging.info("* Logical and of Query %s",l)
470 if reduce(operator.or_, l):
471 try:
472 if obj.scheme == "udp" :
473 logging.debug("* Sending data over UDP for host:%s and port:%s",host,port)
474 #udp = socket.socket(socket.AF_INET, # Internet
475 # socket.SOCK_DGRAM) # UDP
476 #udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
477 obj.udp.sendto(data,(host,port))
478 #return True
479 continue
480 elif obj.scheme == "kafka" :
481 logging.debug("* Sending data over kafka for host:%s and port:%s and topec:%s",host,port,obj.kafka_publisher.topic)
482 obj.kafka_publisher._send(sample)
483 #return True
484 continue
485 except Exception:
486 logging.error ("Unable to send sample over UDP/Kafka for %s and %s ",host,port)
487 ret_str = ("Unable to send sample over UDP/Kafka for %s and %s ")%(host,port)
488 #return False
489 continue
490 else :
491 logging.debug("* No Notification found with the given subscription")
492 continue
493 else :
494 logging.debug("* No matching subscrition found for %s",sample['counter_name'])
495 continue
496 except Exception as e:
497 logging.error("Key_Error:%s ",e.__str__())
498 ret_str = ("Key_Error:%s \n")%e.__str__()
499 #return False
500 continue
501
502def initialize(ceilometer_client):
503 logging.debug("Ceilometer client info:%s",ceilometer_client)
504 parse_target=netutils.urlsplit(ceilometer_client)
505 if not parse_target.netloc:
506 err_str = "Error:Invalid client format"
507 logging.error("* Invalid client format")
508 return err_str
509 if parse_target.scheme == "udp" :
510 host,port=netutils.parse_host_port(parse_target.netloc)
511 scheme = parse_target.scheme
512 app_ip = host
513 app_port = port
514 if host == None or port == None :
515 err_str = "* Error: Invalid IP Address format"
516 logging.error("* Invalid IP Address format")
517 return err_str
518 thread.start_new(read_notification_from_ceilometer_over_udp,(host,port,))
519 elif parse_target.scheme == "kafka" :
520 thread.start_new(read_notification_from_ceilometer_over_kafka,(parse_target,))
521
522
523if __name__ == "__main__":
524
525 try:
526 config = ConfigParser.ConfigParser()
527 config.read('pub_sub.conf')
528 webserver_host = config.get('WEB_SERVER','webserver_host')
529 webserver_port = int (config.get('WEB_SERVER','webserver_port'))
530 # client_host = config.get('CLIENT','client_host')
531 # client_port = int (config.get('CLIENT','client_port'))
532 ceilometer_client_info = config.get('CLIENT','target')
533 '''
534 log_level = config.get('LOGGING','level')
535 log_file = config.get('LOGGING','filename')
536 maxbytes = int (config.get('LOGGING','maxbytes'))
537 backupcount = int (config.get('LOGGING','backupcount'))
538 level = LEVELS.get(log_level, logging.NOTSET)
539 '''
540 logging.config.fileConfig('pub_sub.conf', disable_existing_loggers=False)
541 '''
542 logging.basicConfig(filename=log_file,format='%(asctime)s %(levelname)s %(message)s',\
543 datefmt=_DEFAULT_LOG_DATE_FORMAT,level=level)
544
545 # create rotating file handler
546
547 rfh = logging.handlers.RotatingFileHandler(
548 log_file, encoding='utf8', maxBytes=maxbytes,
549 backupCount=backupcount,delay=0)
550 logging.getLogger().addHandler(rfh)
551 '''
552
553 except Exception as e:
554 print("* Error in config file:",e.__str__())
555 #logging.error("* Error in confing file:%s",e.__str__())
556 else:
557 #initialize(client_host,client_port)
558 initialize(ceilometer_client_info)
559 app.run(host=webserver_host,port=webserver_port,debug=False)