Adding CORD specific ceilometer changes to monitoring repository
- ceilometer custom notification plugins for ONOS, vSG, vOLT and Infra layers
- ceilometer publish/subscribe module
- ceilometer dynamic pipeline config module
- ceilometer UDP proxy
- ceilometer custom image (ceilometer -v2 and -v3 versions, kafka_installer, startup scripts)
Change-Id: Ie2ab8ce89cdadbd1fb4dc54ee15e46f8cc8c4c18
diff --git a/xos/synchronizer/ceilometer/ceilometer_pub_sub/sub_main.py b/xos/synchronizer/ceilometer/ceilometer_pub_sub/sub_main.py
new file mode 100644
index 0000000..e7ebcf4
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer_pub_sub/sub_main.py
@@ -0,0 +1,559 @@
+#!/usr/bin/python
+import socket,thread
+import sys
+import msgpack
+import fnmatch
+import operator
+import logging
+import logging.handlers
+import logging.config
+import ConfigParser
+import json
+from oslo_utils import units
+from oslo_utils import netutils
+from pubrecords import *
+import kafka
+import kafka_broker
+
+from flask import request, Request, jsonify
+from flask import Flask
+from flask import make_response
+app = Flask(__name__)
+
+COMPARATORS = {
+ 'gt': operator.gt,
+ 'lt': operator.lt,
+ 'ge': operator.ge,
+ 'le': operator.le,
+ 'eq': operator.eq,
+ 'ne': operator.ne,
+}
+
+LEVELS = {'DEBUG': logging.DEBUG,
+ 'INFO': logging.INFO,
+ 'WARNING': logging.WARNING,
+ 'ERROR': logging.ERROR,
+ 'CRITICAL': logging.CRITICAL}
+
+_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
+
+''' Stores all the subscribed meter's list '''
+meter_list = []
+''' Stores meter to app-id mapping '''
+meter_dict = {}
+
+@app.route('/subscribe',methods=['POST','SUB'])
+def subscribe():
+ try :
+ app_id = request.json['app_id']
+ target = request.json['target']
+ sub_info = request.json['sub_info']
+
+ try :
+ validate_sub_info(sub_info)
+ except Exception as e:
+ logging.error("* %s",e.__str__())
+ return e.__str__()
+
+ ''' Flag to Update pipeling cfg file '''
+ config = ConfigParser.ConfigParser()
+ config.read('pub_sub.conf')
+ if config.get('RABBITMQ','UpdateConfMgmt') == "True" :
+ update_pipeline_conf(sub_info,target,app_id,"ADD")
+ else:
+ logging.warning("Update Conf Mgmt flag is disabled,enable the flag to update Conf Mgmt")
+
+ if not 'query' in request.json.keys():
+ logging.info("query request is not provided by user")
+ query = None
+ else:
+ query = request.json['query']
+ for i in range(len(query)):
+ if not 'field' in query[i].keys():
+ err_str = "Query field"
+ raise Exception (err_str)
+ if not 'op' in query[i].keys():
+ err_str = "Query op"
+ raise Exception (err_str)
+ if not 'value' in query[i].keys():
+ err_str = "Query value"
+ raise Exception (err_str)
+ except Exception as e:
+ err_str = "KeyError: Parsing subscription request " + e.__str__() + "\n"
+ logging.error("* KeyError: Parsing subscription request :%s",e.__str__())
+ return err_str
+
+ parse_target=netutils.urlsplit(target)
+ if not parse_target.netloc:
+ err_str = "Error:Invalid target format"
+ logging.error("* Invalid target format")
+ return err_str
+
+ status = ""
+ if parse_target.scheme == "udp" or parse_target.scheme == "kafka":
+ host,port=netutils.parse_host_port(parse_target.netloc)
+ scheme = parse_target.scheme
+ app_ip = host
+ app_port = port
+
+ if host == None or port == None :
+ err_str = "* Error: Invalid IP Address format"
+ logging.error("* Invalid IP Address format")
+ return err_str
+
+ subscription_info = sub_info
+ sub_info_filter = query
+ logging.info("Creating subscription for app:%s for meters:%s with filters:%s and target:%s",app_id, subscription_info, sub_info_filter, target)
+ subscrip_obj=subinfo(scheme,app_id,app_ip,app_port,subscription_info,sub_info_filter,target)
+ status = subscrip_obj.update_subinfo()
+ subinfo.print_subinfo()
+
+ if parse_target.scheme == "file" :
+ pass
+ return status
+
+@app.route('/unsubscribe',methods=['POST','UNSUB'])
+def unsubscribe():
+ try :
+ app_id = request.json['app_id']
+ sub_info,target = subinfo.get_subinfo(app_id)
+ if sub_info is None or target is None:
+ err_str = "No subscription exists with app id: " + app_id + "\n"
+ logging.error("* No subscription exists with app id:%s ",app_id)
+ return err_str
+ else:
+ ''' Flag to Update pipeling cfg file '''
+ config = ConfigParser.ConfigParser()
+ config.read('pub_sub.conf')
+ if config.get('RABBITMQ','UpdateConfMgmt') == "True" :
+ update_pipeline_conf(sub_info,target,app_id,"DEL")
+ else:
+ logging.warning("Update Conf Mgmt flag is disabled,enable the flag to update Conf Mgmt")
+ #update_pipeline_conf(sub_info,target,"DEL")
+ subinfo.delete_subinfo(app_id)
+ except Exception as e:
+ logging.error("* %s",e.__str__())
+ return e.__str__()
+ return "UnSubscrition is sucessful! \n"
+
+@app.errorhandler(404)
+def not_found(error):
+ return make_response(jsonify({'error': 'Not found'}), 404)
+
+def print_subscribed_meter_list():
+ logging.debug("-------------------------------------------------")
+ #print (meter_list)
+ logging.debug("meter_list:%s",meter_list)
+ logging.debug("meter_dict:%s",meter_dict)
+ #print (meter_dict)
+ logging.debug("-------------------------------------------------")
+
+def validate_sub_info(sub_info):
+ if type(sub_info) is not list:
+ sub_info = [sub_info]
+ for meter in sub_info:
+ if meter.startswith("*") or meter.startswith("!"):
+ err_str = "Given meter is not supported:" + meter + "\n"
+ logging.error("* Given meter is not supported:%s",meter)
+ raise Exception (err_str)
+
+def update_meter_dict(meterinfo,app_id):
+ try :
+ if type(meterinfo) == list:
+ for meter in meterinfo:
+ if meter_dict.get(meter) is None:
+ meter_dict[meter] = [app_id]
+ elif app_id not in meter_dict.get(meter):
+ meter_dict.get(meter).append(app_id)
+ else:
+ if meter_dict.get(meterinfo) is None:
+ meter_dict[meterinfo] = [app_id]
+ elif app_id not in meter_dict.get(meterinfo):
+ meter_dict.get(meterinfo).append(app_id)
+ except Exception as e:
+ logging.error("* %s",e.__str__())
+
+def check_send_msg_confmgmt_del(sub_info,app_id):
+ temp_sub_info=[]
+ temm_meter_info = None
+ if len(meter_list) == 0:
+ #print("No subscription exists")
+ logging.info("No subscription exists")
+ return False,None
+ if type(sub_info) == list:
+ for meterinfo in sub_info:
+ if meter_dict.get(meterinfo) is None:
+ logging.warning("%s meter doesn't exist in the meter dict",meterinfo)
+ continue
+ if app_id in meter_dict.get(meterinfo):
+ if len(meter_dict.get(meterinfo)) == 1:
+ #print "Only single app is subscribing this meter"
+ logging.info("Only single app is subscribing this meter")
+ del meter_dict[meterinfo]
+ temp_sub_info.append(meterinfo)
+ if meterinfo in meter_list:
+ meter_list.remove(meterinfo)
+ else:
+ meter_dict.get(meterinfo).remove(app_id)
+ return True,temp_sub_info
+ else :
+ if meter_dict.get(sub_info) is None:
+ logging.warning("%s meter doesn't exist in the meter dict",sub_info)
+ return False,None
+ if app_id in meter_dict.get(sub_info):
+ if len(meter_dict.get(sub_info)) == 1:
+ #print "Only single app is subscribing this meter"
+ logging.info("Only single app is subscribing this meter")
+ del meter_dict[sub_info]
+ if sub_info in meter_list:
+ meter_list.remove(sub_info)
+ return True,sub_info
+ else:
+ meter_dict.get(sub_info).remove(app_id)
+ return False,None
+
+def check_send_msg_confmgmt_add(sub_info,app_id):
+ temp_sub_info=[]
+ update_meter_dict(sub_info,app_id)
+ #import pdb;pdb.set_trace()
+ if len(meter_list) == 0:
+ logging.info("No subinfo exits")
+ if type(sub_info) == list:
+ for j in sub_info:
+ meter_list.append(j)
+ return True,sub_info
+ else :
+ meter_list.append(sub_info)
+ return True,sub_info
+ if type(sub_info) == list:
+ for j in sub_info:
+ if j in meter_list:
+ #print ("meter already exists",j)
+ logging.info("meter already exist:%s",j)
+ continue
+ else :
+ temp_sub_info.append(j)
+ meter_list.append(j)
+ if temp_sub_info is not None:
+ return True,temp_sub_info
+ else :
+ return False,None
+ else :
+ if sub_info not in meter_list:
+ meter_list.append(sub_info)
+ #print ("subscription for meter doesn't exist",sub_info)
+ logging.warning("subscription for meter doesn't exist:%s",sub_info)
+ return True,sub_info
+ else :
+ #print ("subscription already exist for ",sub_info)
+ logging.info("subscription already exist for:%s ",sub_info)
+ return False,sub_info
+
+def update_pipeline_conf(sub_info,target,app_id,flag):
+ import pika
+
+ logging.debug("* sub_info:%s",sub_info)
+ logging.debug("* target:%s",target)
+
+ #msg={"sub_info":sub_info,"target":target,"action":flag}
+
+ #json_msg=json.dumps(msg)
+ #msg="image"
+ meter_sub_info = None
+ if flag == "ADD":
+ status,meter_sub_info=check_send_msg_confmgmt_add(sub_info,app_id)
+ if status == False or meter_sub_info == None or meter_sub_info == []:
+ logging.warning("%s is already subscribed with the conf mgmt")
+ return
+ elif flag == "DEL":
+ status,meter_sub_info=check_send_msg_confmgmt_del(sub_info,app_id)
+ if status == False or meter_sub_info == None or meter_sub_info == []:
+ logging.warning("%s is already unsubscribed with the conf mgmt")
+ return
+ try :
+ config = ConfigParser.ConfigParser()
+ config.read('pub_sub.conf')
+ rabbitmq_username = config.get('RABBITMQ','Rabbitmq_username')
+ rabbitmq_passwd = config.get('RABBITMQ','Rabbitmq_passwd')
+ rabbitmq_host = config.get('RABBITMQ','Rabbitmq_host')
+ rabbitmq_port = int ( config.get('RABBITMQ','Rabbitmq_port') )
+
+ ceilometer_client_info = config.get('CLIENT','target')
+ #msg={"sub_info":sub_info,"target":ceilometer_client_info,"action":flag}
+ msg={"sub_info":meter_sub_info,"target":ceilometer_client_info,"action":flag}
+ #print msg
+ json_msg=json.dumps(msg)
+
+ credentials = pika.PlainCredentials(rabbitmq_username,rabbitmq_passwd)
+ parameters = pika.ConnectionParameters(rabbitmq_host,
+ rabbitmq_port,
+ '/',
+ credentials)
+ connection = pika.BlockingConnection(parameters)
+ properties = pika.BasicProperties(content_type = "application/json")
+ channel = connection.channel()
+ channel.exchange_declare(exchange='pubsub',
+ type='fanout')
+
+ channel.basic_publish(exchange='pubsub',
+ routing_key='',
+ properties = properties,
+ body=json_msg)
+ logging.debug(" [x] %s Sent",msg)
+ logging.info(" [x] %s Sent",msg)
+ connection.close()
+ except Exception as e:
+ logging.error("Error:%s",e.__str__())
+
+def read_notification_from_ceilometer(host,port):
+ UDP_IP = host
+ UDP_PORT = port
+
+ logging.debug("* Sarting UDP Client on ip:%s , port:%d",UDP_IP,UDP_PORT)
+ udp = socket.socket(socket.AF_INET, # Internet
+ socket.SOCK_DGRAM) # UDP
+ udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+ udp.bind((UDP_IP, UDP_PORT))
+
+ while True:
+ #print thread.get_ident()
+ #logging.debug("thread.get_ident():%s", thread.get_ident())
+ data, source = udp.recvfrom(64 * units.Ki)
+ sample = msgpack.loads(data, encoding='utf-8')
+ #logging.debug("* -------------------------------------------------------")
+ logging.debug("%s",sample)
+ #print(sample)
+ for obj in sub_info:
+ msg_list = []
+ if obj.scheme == "udp" :
+ if type(obj.subscription_info) is list:
+ for info in obj.subscription_info:
+ msg_list.append(fnmatch.fnmatch(sample['counter_name'],info))
+ else :
+ msg_list.append(fnmatch.fnmatch(sample['counter_name'],obj.subscription_info))
+ try:
+ if reduce(operator.or_, msg_list):
+ host = obj.ipaddress
+ port = int(obj.portno)
+ l=[]
+ #logging.debug("* -------------------------------------------------------")
+ if obj.sub_info_filter is None:
+ try:
+ logging.debug("* Sending data without query over UDP for host:%s and port:%s",host,port)
+ udp.sendto(data,(host,port))
+ except Exception as e:
+ logging.error ("Unable to send sample over UDP for %s and %s,%s",host,port,e.__str__())
+ ret_str = ("Unable to send sample over UDP for %s and %s,%s")%(host,port,e.__str__())
+ continue
+ for i in range(len(obj.sub_info_filter)):
+ if obj.sub_info_filter[i]['op'] in COMPARATORS:
+ op = COMPARATORS[obj.sub_info_filter[i]['op']]
+ logging.debug("* obj.sub_info_filter[i]['value']:%s",obj.sub_info_filter[i]['value'])
+ logging.debug("* obj.sub_info_filter[i]['field']:%s",obj.sub_info_filter[i]['field'])
+ l.append(op(obj.sub_info_filter[i]['value'],sample[obj.sub_info_filter[i]['field']]))
+ logging.info("* Logical and of Query %s",l)
+ else:
+ logging.deubg("* Not a valid operator ignoring app_id:%s",obj.app_id)
+ l.append(False)
+ logging.info("* Logical and of Query %s",l)
+ if reduce(operator.and_, l):
+ try:
+ logging.debug("* Sending data over UDP for host:%s and port:%s",host,port)
+ udp.sendto(data,(host,port))
+ except Exception:
+ logging.error ("Unable to send sample over UDP for %s and %s ",host,port)
+ ret_str = ("Unable to send sample over UDP for %s and %s ")%(host,port)
+ else :
+ logging.warning("* No Notification found with the given subscription")
+ else :
+ logging.warning("* No valid subscrition found for %s",obj.app_id)
+ except Exception as e:
+ logging.error("Key_Error:%s ",e.__str__())
+ ret_str = ("Key_Error:%s \n")% e.__str__()
+
+def read_notification_from_ceilometer_over_udp(host,port):
+ UDP_IP = host
+ UDP_PORT = port
+
+ logging.debug("* Sarting UDP Client on ip:%s , port:%d",UDP_IP,UDP_PORT)
+ udp = socket.socket(socket.AF_INET, # Internet
+ socket.SOCK_DGRAM) # UDP
+ udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+ udp.bind((UDP_IP, UDP_PORT))
+
+ while True:
+ #print thread.get_ident()
+ #logging.debug("thread.get_ident():%s", thread.get_ident())
+ data, source = udp.recvfrom(64 * units.Ki)
+ sample = msgpack.loads(data, encoding='utf-8')
+ status = process_ceilometer_message(sample,data)
+
+def read_notification_from_ceilometer_over_kafka(parse_target):
+ logging.info("Kafka target:%s",parse_target)
+ try :
+ kafka_publisher=kafka_broker.KafkaBrokerPublisher(parse_target)
+ for message in kafka_publisher.kafka_consumer:
+ #print message.value
+ #logging.debug("%s",message.value)
+ #logging.info("%s",message.value)
+ status = process_ceilometer_message(json.loads(message.value),message.value)
+ #print status
+ except Exception as e:
+ logging.error("Error in Kafka setup:%s ",e.__str__())
+
+def process_ceilometer_message(sample,data):
+ logging.debug("%s",sample)
+ #logging.info("%s",sample)
+ if len(sub_info) < 1:
+ #print "No subscription exists"
+ return
+ for obj in sub_info:
+ #import pdb;pdb.set_trace()
+ msg_list = []
+ if type(obj.subscription_info) is list:
+ for info in obj.subscription_info:
+ msg_list.append(fnmatch.fnmatch(sample['counter_name'],info))
+ else :
+ msg_list.append(fnmatch.fnmatch(sample['counter_name'],obj.subscription_info))
+ try:
+ if reduce(operator.or_, msg_list):
+ '''
+ kafka_publisher = None
+ if obj.scheme == "kafka" :
+ parse_target=netutils.urlsplit(obj.target)
+ try :
+ kafka_publisher=kafka_broker.KafkaBrokerPublisher(parse_target)
+ except Exception as e:
+ logging.error("* Error in connecting kafka broker:%s",e.__str__())
+ # return False
+ continue
+ '''
+ host = obj.ipaddress
+ port = int(obj.portno)
+ l=[]
+ logging.debug("* -------------------------------------------------------")
+ if obj.sub_info_filter is None:
+ try:
+ if obj.scheme == "udp" :
+ #logging.debug("* Sending data without query over UDP for host:%s and port:%s",host,port)
+ #logging.info("* Sending data without query over UDP for host:%s and port:%s",host,port)
+ #udp = socket.socket(socket.AF_INET, # Internet
+ # socket.SOCK_DGRAM) # UDP
+ #udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ obj.udp.sendto(data,(host,port))
+ #return True
+ continue
+ elif obj.scheme == "kafka" :
+ #logging.debug("* Sending data over kafka for host:%s and port:%s and topec:%s",host,port,kafka_publisher.topic)
+ #logging.info("* Sending data over kafka for host:%s and port:%s and topec:%s",host,port,kafka_publisher.topic)
+ obj.kafka_publisher._send(sample)
+ #return True
+ continue
+ except Exception as e:
+ logging.error ("Unable to send sample over UDP/kafka for %s and %s,%s",host,port,e.__str__())
+ ret_str = ("Unable to send sample over UDP for %s and %s,%s ")%(host,port,e.__str__())
+ #return False
+ continue
+ for i in range(len(obj.sub_info_filter)):
+ if obj.sub_info_filter[i]['op'] in COMPARATORS:
+ op = COMPARATORS[obj.sub_info_filter[i]['op']]
+ #logging.debug("* obj.sub_info_filter[i]['value']:%s",obj.sub_info_filter[i]['value'])
+ #logging.debug("* obj.sub_info_filter[i]['field']:%s",obj.sub_info_filter[i]['field'])
+ l.append(op(obj.sub_info_filter[i]['value'],sample[obj.sub_info_filter[i]['field']]))
+ #logging.info("* Logical and of Query %s",l)
+ else:
+ logging.info("* Not a valid operator ignoring app_id:%s",obj.app_id)
+ l.append(False)
+ #logging.info("* Logical and of Query %s",l)
+ if reduce(operator.or_, l):
+ try:
+ if obj.scheme == "udp" :
+ logging.debug("* Sending data over UDP for host:%s and port:%s",host,port)
+ #udp = socket.socket(socket.AF_INET, # Internet
+ # socket.SOCK_DGRAM) # UDP
+ #udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ obj.udp.sendto(data,(host,port))
+ #return True
+ continue
+ elif obj.scheme == "kafka" :
+ logging.debug("* Sending data over kafka for host:%s and port:%s and topec:%s",host,port,obj.kafka_publisher.topic)
+ obj.kafka_publisher._send(sample)
+ #return True
+ continue
+ except Exception:
+ logging.error ("Unable to send sample over UDP/Kafka for %s and %s ",host,port)
+ ret_str = ("Unable to send sample over UDP/Kafka for %s and %s ")%(host,port)
+ #return False
+ continue
+ else :
+ logging.debug("* No Notification found with the given subscription")
+ continue
+ else :
+ logging.debug("* No matching subscrition found for %s",sample['counter_name'])
+ continue
+ except Exception as e:
+ logging.error("Key_Error:%s ",e.__str__())
+ ret_str = ("Key_Error:%s \n")%e.__str__()
+ #return False
+ continue
+
+def initialize(ceilometer_client):
+ logging.debug("Ceilometer client info:%s",ceilometer_client)
+ parse_target=netutils.urlsplit(ceilometer_client)
+ if not parse_target.netloc:
+ err_str = "Error:Invalid client format"
+ logging.error("* Invalid client format")
+ return err_str
+ if parse_target.scheme == "udp" :
+ host,port=netutils.parse_host_port(parse_target.netloc)
+ scheme = parse_target.scheme
+ app_ip = host
+ app_port = port
+ if host == None or port == None :
+ err_str = "* Error: Invalid IP Address format"
+ logging.error("* Invalid IP Address format")
+ return err_str
+ thread.start_new(read_notification_from_ceilometer_over_udp,(host,port,))
+ elif parse_target.scheme == "kafka" :
+ thread.start_new(read_notification_from_ceilometer_over_kafka,(parse_target,))
+
+
+if __name__ == "__main__":
+
+ try:
+ config = ConfigParser.ConfigParser()
+ config.read('pub_sub.conf')
+ webserver_host = config.get('WEB_SERVER','webserver_host')
+ webserver_port = int (config.get('WEB_SERVER','webserver_port'))
+ # client_host = config.get('CLIENT','client_host')
+ # client_port = int (config.get('CLIENT','client_port'))
+ ceilometer_client_info = config.get('CLIENT','target')
+ '''
+ log_level = config.get('LOGGING','level')
+ log_file = config.get('LOGGING','filename')
+ maxbytes = int (config.get('LOGGING','maxbytes'))
+ backupcount = int (config.get('LOGGING','backupcount'))
+ level = LEVELS.get(log_level, logging.NOTSET)
+ '''
+ logging.config.fileConfig('pub_sub.conf', disable_existing_loggers=False)
+ '''
+ logging.basicConfig(filename=log_file,format='%(asctime)s %(levelname)s %(message)s',\
+ datefmt=_DEFAULT_LOG_DATE_FORMAT,level=level)
+
+ # create rotating file handler
+
+ rfh = logging.handlers.RotatingFileHandler(
+ log_file, encoding='utf8', maxBytes=maxbytes,
+ backupCount=backupcount,delay=0)
+ logging.getLogger().addHandler(rfh)
+ '''
+
+ except Exception as e:
+ print("* Error in config file:",e.__str__())
+ #logging.error("* Error in confing file:%s",e.__str__())
+ else:
+ #initialize(client_host,client_port)
+ initialize(ceilometer_client_info)
+ app.run(host=webserver_host,port=webserver_port,debug=False)