blob: 19d7bf17ea0ee4e7d07b388e565f78d9819a03a8 [file] [log] [blame]
Matteo Scandoloeb0d11c2017-08-08 13:05:26 -07001
2# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16
rdudyalab086cf32016-08-11 00:07:45 -040017#!/usr/bin/python
18import socket,thread
19import sys
20import msgpack
21import fnmatch
22import operator
23import logging
24import logging.handlers
25import logging.config
26import ConfigParser
27import json
28from oslo_utils import units
29from oslo_utils import netutils
30from pubrecords import *
31import kafka
32import kafka_broker
33
34from flask import request, Request, jsonify
35from flask import Flask
36from flask import make_response
37app = Flask(__name__)
38
39COMPARATORS = {
40 'gt': operator.gt,
41 'lt': operator.lt,
42 'ge': operator.ge,
43 'le': operator.le,
44 'eq': operator.eq,
45 'ne': operator.ne,
46}
47
48LEVELS = {'DEBUG': logging.DEBUG,
49 'INFO': logging.INFO,
50 'WARNING': logging.WARNING,
51 'ERROR': logging.ERROR,
52 'CRITICAL': logging.CRITICAL}
53
54_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
55
56''' Stores all the subscribed meter's list '''
57meter_list = []
58''' Stores meter to app-id mapping '''
59meter_dict = {}
60
61@app.route('/subscribe',methods=['POST','SUB'])
62def subscribe():
63 try :
64 app_id = request.json['app_id']
65 target = request.json['target']
66 sub_info = request.json['sub_info']
67
68 try :
69 validate_sub_info(sub_info)
70 except Exception as e:
71 logging.error("* %s",e.__str__())
72 return e.__str__()
73
74 ''' Flag to Update pipeling cfg file '''
75 config = ConfigParser.ConfigParser()
76 config.read('pub_sub.conf')
77 if config.get('RABBITMQ','UpdateConfMgmt') == "True" :
78 update_pipeline_conf(sub_info,target,app_id,"ADD")
79 else:
80 logging.warning("Update Conf Mgmt flag is disabled,enable the flag to update Conf Mgmt")
81
82 if not 'query' in request.json.keys():
83 logging.info("query request is not provided by user")
84 query = None
85 else:
86 query = request.json['query']
87 for i in range(len(query)):
88 if not 'field' in query[i].keys():
89 err_str = "Query field"
90 raise Exception (err_str)
91 if not 'op' in query[i].keys():
92 err_str = "Query op"
93 raise Exception (err_str)
94 if not 'value' in query[i].keys():
95 err_str = "Query value"
96 raise Exception (err_str)
97 except Exception as e:
98 err_str = "KeyError: Parsing subscription request " + e.__str__() + "\n"
99 logging.error("* KeyError: Parsing subscription request :%s",e.__str__())
100 return err_str
101
102 parse_target=netutils.urlsplit(target)
103 if not parse_target.netloc:
104 err_str = "Error:Invalid target format"
105 logging.error("* Invalid target format")
106 return err_str
107
108 status = ""
109 if parse_target.scheme == "udp" or parse_target.scheme == "kafka":
110 host,port=netutils.parse_host_port(parse_target.netloc)
111 scheme = parse_target.scheme
112 app_ip = host
113 app_port = port
114
115 if host == None or port == None :
116 err_str = "* Error: Invalid IP Address format"
117 logging.error("* Invalid IP Address format")
118 return err_str
119
120 subscription_info = sub_info
121 sub_info_filter = query
122 logging.info("Creating subscription for app:%s for meters:%s with filters:%s and target:%s",app_id, subscription_info, sub_info_filter, target)
123 subscrip_obj=subinfo(scheme,app_id,app_ip,app_port,subscription_info,sub_info_filter,target)
124 status = subscrip_obj.update_subinfo()
125 subinfo.print_subinfo()
126
127 if parse_target.scheme == "file" :
128 pass
129 return status
130
131@app.route('/unsubscribe',methods=['POST','UNSUB'])
132def unsubscribe():
133 try :
134 app_id = request.json['app_id']
135 sub_info,target = subinfo.get_subinfo(app_id)
136 if sub_info is None or target is None:
137 err_str = "No subscription exists with app id: " + app_id + "\n"
138 logging.error("* No subscription exists with app id:%s ",app_id)
139 return err_str
140 else:
141 ''' Flag to Update pipeling cfg file '''
142 config = ConfigParser.ConfigParser()
143 config.read('pub_sub.conf')
144 if config.get('RABBITMQ','UpdateConfMgmt') == "True" :
145 update_pipeline_conf(sub_info,target,app_id,"DEL")
146 else:
147 logging.warning("Update Conf Mgmt flag is disabled,enable the flag to update Conf Mgmt")
148 #update_pipeline_conf(sub_info,target,"DEL")
149 subinfo.delete_subinfo(app_id)
150 except Exception as e:
151 logging.error("* %s",e.__str__())
152 return e.__str__()
153 return "UnSubscrition is sucessful! \n"
154
155@app.errorhandler(404)
156def not_found(error):
157 return make_response(jsonify({'error': 'Not found'}), 404)
158
159def print_subscribed_meter_list():
160 logging.debug("-------------------------------------------------")
161 #print (meter_list)
162 logging.debug("meter_list:%s",meter_list)
163 logging.debug("meter_dict:%s",meter_dict)
164 #print (meter_dict)
165 logging.debug("-------------------------------------------------")
166
167def validate_sub_info(sub_info):
168 if type(sub_info) is not list:
169 sub_info = [sub_info]
170 for meter in sub_info:
171 if meter.startswith("*") or meter.startswith("!"):
172 err_str = "Given meter is not supported:" + meter + "\n"
173 logging.error("* Given meter is not supported:%s",meter)
174 raise Exception (err_str)
175
176def update_meter_dict(meterinfo,app_id):
177 try :
178 if type(meterinfo) == list:
179 for meter in meterinfo:
180 if meter_dict.get(meter) is None:
181 meter_dict[meter] = [app_id]
182 elif app_id not in meter_dict.get(meter):
183 meter_dict.get(meter).append(app_id)
184 else:
185 if meter_dict.get(meterinfo) is None:
186 meter_dict[meterinfo] = [app_id]
187 elif app_id not in meter_dict.get(meterinfo):
188 meter_dict.get(meterinfo).append(app_id)
189 except Exception as e:
190 logging.error("* %s",e.__str__())
191
192def check_send_msg_confmgmt_del(sub_info,app_id):
193 temp_sub_info=[]
194 temm_meter_info = None
195 if len(meter_list) == 0:
196 #print("No subscription exists")
197 logging.info("No subscription exists")
198 return False,None
199 if type(sub_info) == list:
200 for meterinfo in sub_info:
201 if meter_dict.get(meterinfo) is None:
202 logging.warning("%s meter doesn't exist in the meter dict",meterinfo)
203 continue
204 if app_id in meter_dict.get(meterinfo):
205 if len(meter_dict.get(meterinfo)) == 1:
206 #print "Only single app is subscribing this meter"
207 logging.info("Only single app is subscribing this meter")
208 del meter_dict[meterinfo]
209 temp_sub_info.append(meterinfo)
210 if meterinfo in meter_list:
211 meter_list.remove(meterinfo)
212 else:
213 meter_dict.get(meterinfo).remove(app_id)
214 return True,temp_sub_info
215 else :
216 if meter_dict.get(sub_info) is None:
217 logging.warning("%s meter doesn't exist in the meter dict",sub_info)
218 return False,None
219 if app_id in meter_dict.get(sub_info):
220 if len(meter_dict.get(sub_info)) == 1:
221 #print "Only single app is subscribing this meter"
222 logging.info("Only single app is subscribing this meter")
223 del meter_dict[sub_info]
224 if sub_info in meter_list:
225 meter_list.remove(sub_info)
226 return True,sub_info
227 else:
228 meter_dict.get(sub_info).remove(app_id)
229 return False,None
230
231def check_send_msg_confmgmt_add(sub_info,app_id):
232 temp_sub_info=[]
233 update_meter_dict(sub_info,app_id)
234 #import pdb;pdb.set_trace()
235 if len(meter_list) == 0:
236 logging.info("No subinfo exits")
237 if type(sub_info) == list:
238 for j in sub_info:
239 meter_list.append(j)
240 return True,sub_info
241 else :
242 meter_list.append(sub_info)
243 return True,sub_info
244 if type(sub_info) == list:
245 for j in sub_info:
246 if j in meter_list:
247 #print ("meter already exists",j)
248 logging.info("meter already exist:%s",j)
249 continue
250 else :
251 temp_sub_info.append(j)
252 meter_list.append(j)
253 if temp_sub_info is not None:
254 return True,temp_sub_info
255 else :
256 return False,None
257 else :
258 if sub_info not in meter_list:
259 meter_list.append(sub_info)
260 #print ("subscription for meter doesn't exist",sub_info)
261 logging.warning("subscription for meter doesn't exist:%s",sub_info)
262 return True,sub_info
263 else :
264 #print ("subscription already exist for ",sub_info)
265 logging.info("subscription already exist for:%s ",sub_info)
266 return False,sub_info
267
268def update_pipeline_conf(sub_info,target,app_id,flag):
269 import pika
270
271 logging.debug("* sub_info:%s",sub_info)
272 logging.debug("* target:%s",target)
273
274 #msg={"sub_info":sub_info,"target":target,"action":flag}
275
276 #json_msg=json.dumps(msg)
277 #msg="image"
278 meter_sub_info = None
279 if flag == "ADD":
280 status,meter_sub_info=check_send_msg_confmgmt_add(sub_info,app_id)
281 if status == False or meter_sub_info == None or meter_sub_info == []:
282 logging.warning("%s is already subscribed with the conf mgmt")
283 return
284 elif flag == "DEL":
285 status,meter_sub_info=check_send_msg_confmgmt_del(sub_info,app_id)
286 if status == False or meter_sub_info == None or meter_sub_info == []:
287 logging.warning("%s is already unsubscribed with the conf mgmt")
288 return
289 try :
290 config = ConfigParser.ConfigParser()
291 config.read('pub_sub.conf')
292 rabbitmq_username = config.get('RABBITMQ','Rabbitmq_username')
293 rabbitmq_passwd = config.get('RABBITMQ','Rabbitmq_passwd')
294 rabbitmq_host = config.get('RABBITMQ','Rabbitmq_host')
295 rabbitmq_port = int ( config.get('RABBITMQ','Rabbitmq_port') )
296
297 ceilometer_client_info = config.get('CLIENT','target')
298 #msg={"sub_info":sub_info,"target":ceilometer_client_info,"action":flag}
299 msg={"sub_info":meter_sub_info,"target":ceilometer_client_info,"action":flag}
300 #print msg
301 json_msg=json.dumps(msg)
302
303 credentials = pika.PlainCredentials(rabbitmq_username,rabbitmq_passwd)
304 parameters = pika.ConnectionParameters(rabbitmq_host,
305 rabbitmq_port,
306 '/',
307 credentials)
308 connection = pika.BlockingConnection(parameters)
309 properties = pika.BasicProperties(content_type = "application/json")
310 channel = connection.channel()
311 channel.exchange_declare(exchange='pubsub',
312 type='fanout')
313
314 channel.basic_publish(exchange='pubsub',
315 routing_key='',
316 properties = properties,
317 body=json_msg)
318 logging.debug(" [x] %s Sent",msg)
319 logging.info(" [x] %s Sent",msg)
320 connection.close()
321 except Exception as e:
322 logging.error("Error:%s",e.__str__())
323
324def read_notification_from_ceilometer(host,port):
325 UDP_IP = host
326 UDP_PORT = port
327
328 logging.debug("* Sarting UDP Client on ip:%s , port:%d",UDP_IP,UDP_PORT)
329 udp = socket.socket(socket.AF_INET, # Internet
330 socket.SOCK_DGRAM) # UDP
331 udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
332
333 udp.bind((UDP_IP, UDP_PORT))
334
335 while True:
336 #print thread.get_ident()
337 #logging.debug("thread.get_ident():%s", thread.get_ident())
338 data, source = udp.recvfrom(64 * units.Ki)
339 sample = msgpack.loads(data, encoding='utf-8')
340 #logging.debug("* -------------------------------------------------------")
341 logging.debug("%s",sample)
342 #print(sample)
343 for obj in sub_info:
344 msg_list = []
345 if obj.scheme == "udp" :
346 if type(obj.subscription_info) is list:
347 for info in obj.subscription_info:
348 msg_list.append(fnmatch.fnmatch(sample['counter_name'],info))
349 else :
350 msg_list.append(fnmatch.fnmatch(sample['counter_name'],obj.subscription_info))
351 try:
352 if reduce(operator.or_, msg_list):
353 host = obj.ipaddress
354 port = int(obj.portno)
355 l=[]
356 #logging.debug("* -------------------------------------------------------")
357 if obj.sub_info_filter is None:
358 try:
359 logging.debug("* Sending data without query over UDP for host:%s and port:%s",host,port)
360 udp.sendto(data,(host,port))
361 except Exception as e:
362 logging.error ("Unable to send sample over UDP for %s and %s,%s",host,port,e.__str__())
363 ret_str = ("Unable to send sample over UDP for %s and %s,%s")%(host,port,e.__str__())
364 continue
365 for i in range(len(obj.sub_info_filter)):
366 if obj.sub_info_filter[i]['op'] in COMPARATORS:
367 op = COMPARATORS[obj.sub_info_filter[i]['op']]
368 logging.debug("* obj.sub_info_filter[i]['value']:%s",obj.sub_info_filter[i]['value'])
369 logging.debug("* obj.sub_info_filter[i]['field']:%s",obj.sub_info_filter[i]['field'])
370 l.append(op(obj.sub_info_filter[i]['value'],sample[obj.sub_info_filter[i]['field']]))
371 logging.info("* Logical and of Query %s",l)
372 else:
373 logging.deubg("* Not a valid operator ignoring app_id:%s",obj.app_id)
374 l.append(False)
375 logging.info("* Logical and of Query %s",l)
376 if reduce(operator.and_, l):
377 try:
378 logging.debug("* Sending data over UDP for host:%s and port:%s",host,port)
379 udp.sendto(data,(host,port))
380 except Exception:
381 logging.error ("Unable to send sample over UDP for %s and %s ",host,port)
382 ret_str = ("Unable to send sample over UDP for %s and %s ")%(host,port)
383 else :
384 logging.warning("* No Notification found with the given subscription")
385 else :
386 logging.warning("* No valid subscrition found for %s",obj.app_id)
387 except Exception as e:
388 logging.error("Key_Error:%s ",e.__str__())
389 ret_str = ("Key_Error:%s \n")% e.__str__()
390
391def read_notification_from_ceilometer_over_udp(host,port):
392 UDP_IP = host
393 UDP_PORT = port
394
395 logging.debug("* Sarting UDP Client on ip:%s , port:%d",UDP_IP,UDP_PORT)
396 udp = socket.socket(socket.AF_INET, # Internet
397 socket.SOCK_DGRAM) # UDP
398 udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
399
400 udp.bind((UDP_IP, UDP_PORT))
401
402 while True:
403 #print thread.get_ident()
404 #logging.debug("thread.get_ident():%s", thread.get_ident())
405 data, source = udp.recvfrom(64 * units.Ki)
406 sample = msgpack.loads(data, encoding='utf-8')
407 status = process_ceilometer_message(sample,data)
408
409def read_notification_from_ceilometer_over_kafka(parse_target):
410 logging.info("Kafka target:%s",parse_target)
411 try :
412 kafka_publisher=kafka_broker.KafkaBrokerPublisher(parse_target)
413 for message in kafka_publisher.kafka_consumer:
414 #print message.value
415 #logging.debug("%s",message.value)
416 #logging.info("%s",message.value)
417 status = process_ceilometer_message(json.loads(message.value),message.value)
418 #print status
419 except Exception as e:
420 logging.error("Error in Kafka setup:%s ",e.__str__())
421
422def process_ceilometer_message(sample,data):
423 logging.debug("%s",sample)
424 #logging.info("%s",sample)
425 if len(sub_info) < 1:
426 #print "No subscription exists"
427 return
428 for obj in sub_info:
429 #import pdb;pdb.set_trace()
430 msg_list = []
431 if type(obj.subscription_info) is list:
432 for info in obj.subscription_info:
433 msg_list.append(fnmatch.fnmatch(sample['counter_name'],info))
434 else :
435 msg_list.append(fnmatch.fnmatch(sample['counter_name'],obj.subscription_info))
436 try:
437 if reduce(operator.or_, msg_list):
438 '''
439 kafka_publisher = None
440 if obj.scheme == "kafka" :
441 parse_target=netutils.urlsplit(obj.target)
442 try :
443 kafka_publisher=kafka_broker.KafkaBrokerPublisher(parse_target)
444 except Exception as e:
445 logging.error("* Error in connecting kafka broker:%s",e.__str__())
446 # return False
447 continue
448 '''
449 host = obj.ipaddress
450 port = int(obj.portno)
451 l=[]
452 logging.debug("* -------------------------------------------------------")
453 if obj.sub_info_filter is None:
454 try:
455 if obj.scheme == "udp" :
456 #logging.debug("* Sending data without query over UDP for host:%s and port:%s",host,port)
457 #logging.info("* Sending data without query over UDP for host:%s and port:%s",host,port)
458 #udp = socket.socket(socket.AF_INET, # Internet
459 # socket.SOCK_DGRAM) # UDP
460 #udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
461 obj.udp.sendto(data,(host,port))
462 #return True
463 continue
464 elif obj.scheme == "kafka" :
465 #logging.debug("* Sending data over kafka for host:%s and port:%s and topec:%s",host,port,kafka_publisher.topic)
466 #logging.info("* Sending data over kafka for host:%s and port:%s and topec:%s",host,port,kafka_publisher.topic)
467 obj.kafka_publisher._send(sample)
468 #return True
469 continue
470 except Exception as e:
471 logging.error ("Unable to send sample over UDP/kafka for %s and %s,%s",host,port,e.__str__())
472 ret_str = ("Unable to send sample over UDP for %s and %s,%s ")%(host,port,e.__str__())
473 #return False
474 continue
475 for i in range(len(obj.sub_info_filter)):
476 if obj.sub_info_filter[i]['op'] in COMPARATORS:
477 op = COMPARATORS[obj.sub_info_filter[i]['op']]
478 #logging.debug("* obj.sub_info_filter[i]['value']:%s",obj.sub_info_filter[i]['value'])
479 #logging.debug("* obj.sub_info_filter[i]['field']:%s",obj.sub_info_filter[i]['field'])
480 l.append(op(obj.sub_info_filter[i]['value'],sample[obj.sub_info_filter[i]['field']]))
481 #logging.info("* Logical and of Query %s",l)
482 else:
483 logging.info("* Not a valid operator ignoring app_id:%s",obj.app_id)
484 l.append(False)
485 #logging.info("* Logical and of Query %s",l)
486 if reduce(operator.or_, l):
487 try:
488 if obj.scheme == "udp" :
489 logging.debug("* Sending data over UDP for host:%s and port:%s",host,port)
490 #udp = socket.socket(socket.AF_INET, # Internet
491 # socket.SOCK_DGRAM) # UDP
492 #udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
493 obj.udp.sendto(data,(host,port))
494 #return True
495 continue
496 elif obj.scheme == "kafka" :
497 logging.debug("* Sending data over kafka for host:%s and port:%s and topec:%s",host,port,obj.kafka_publisher.topic)
498 obj.kafka_publisher._send(sample)
499 #return True
500 continue
501 except Exception:
502 logging.error ("Unable to send sample over UDP/Kafka for %s and %s ",host,port)
503 ret_str = ("Unable to send sample over UDP/Kafka for %s and %s ")%(host,port)
504 #return False
505 continue
506 else :
507 logging.debug("* No Notification found with the given subscription")
508 continue
509 else :
510 logging.debug("* No matching subscrition found for %s",sample['counter_name'])
511 continue
512 except Exception as e:
513 logging.error("Key_Error:%s ",e.__str__())
514 ret_str = ("Key_Error:%s \n")%e.__str__()
515 #return False
516 continue
517
518def initialize(ceilometer_client):
519 logging.debug("Ceilometer client info:%s",ceilometer_client)
520 parse_target=netutils.urlsplit(ceilometer_client)
521 if not parse_target.netloc:
522 err_str = "Error:Invalid client format"
523 logging.error("* Invalid client format")
524 return err_str
525 if parse_target.scheme == "udp" :
526 host,port=netutils.parse_host_port(parse_target.netloc)
527 scheme = parse_target.scheme
528 app_ip = host
529 app_port = port
530 if host == None or port == None :
531 err_str = "* Error: Invalid IP Address format"
532 logging.error("* Invalid IP Address format")
533 return err_str
534 thread.start_new(read_notification_from_ceilometer_over_udp,(host,port,))
535 elif parse_target.scheme == "kafka" :
536 thread.start_new(read_notification_from_ceilometer_over_kafka,(parse_target,))
537
538
539if __name__ == "__main__":
540
541 try:
542 config = ConfigParser.ConfigParser()
543 config.read('pub_sub.conf')
544 webserver_host = config.get('WEB_SERVER','webserver_host')
545 webserver_port = int (config.get('WEB_SERVER','webserver_port'))
546 # client_host = config.get('CLIENT','client_host')
547 # client_port = int (config.get('CLIENT','client_port'))
548 ceilometer_client_info = config.get('CLIENT','target')
549 '''
550 log_level = config.get('LOGGING','level')
551 log_file = config.get('LOGGING','filename')
552 maxbytes = int (config.get('LOGGING','maxbytes'))
553 backupcount = int (config.get('LOGGING','backupcount'))
554 level = LEVELS.get(log_level, logging.NOTSET)
555 '''
556 logging.config.fileConfig('pub_sub.conf', disable_existing_loggers=False)
557 '''
558 logging.basicConfig(filename=log_file,format='%(asctime)s %(levelname)s %(message)s',\
559 datefmt=_DEFAULT_LOG_DATE_FORMAT,level=level)
560
561 # create rotating file handler
562
563 rfh = logging.handlers.RotatingFileHandler(
564 log_file, encoding='utf8', maxBytes=maxbytes,
565 backupCount=backupcount,delay=0)
566 logging.getLogger().addHandler(rfh)
567 '''
568
569 except Exception as e:
570 print("* Error in config file:",e.__str__())
571 #logging.error("* Error in confing file:%s",e.__str__())
572 else:
573 #initialize(client_host,client_port)
574 initialize(ceilometer_client_info)
575 app.run(host=webserver_host,port=webserver_port,debug=False)