Add CORD-specific ceilometer changes to the monitoring repository

- ceilometer custom notification plugins for the ONOS, vSG, vOLT and infrastructure layers
- ceilometer publish/subscribe module
- ceilometer dynamic pipeline config module
- ceilometer UDP proxy
- ceilometer custom image (ceilometer v2/v3 versions, kafka_installer, startup scripts)
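
The dynamic pipeline config module is driven by subscription messages published
on a 'pubsub' fanout exchange. A representative message (field names taken from
the pipeline agent below; meter and target values are illustrative only) is:

    {"sub_info": "cpu_util", "target": "udp://10.0.0.1:5006", "action": "ADD"}
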
Change-Id: Ie2ab8ce89cdadbd1fb4dc54ee15e46f8cc8c4c18
diff --git a/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.py b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.py
new file mode 100644
index 0000000..a751b8e
--- /dev/null
+++ b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.py
@@ -0,0 +1,208 @@
+import pika
+import yaml
+import subprocess
+import logging
+import logging.config
+import operator
+import json
+import ConfigParser
+import pipeline
+import utils
+#from ceilometer import pipeline
+from collections import OrderedDict
+
+
+class UnsortableList(list):
+ def sort(self, *args, **kwargs):
+ pass
+
+class UnsortableOrderedDict(OrderedDict):
+ def items(self, *args, **kwargs):
+ return UnsortableList(OrderedDict.items(self, *args, **kwargs))
+
+#yaml.add_representer(UnsortableOrderedDict, yaml.representer.SafeRepresenter.represent_dict)
+
+
+tmp_pipeline_conf = "/tmp/pipeline.yaml"
+
+'''
+LEVELS = {'DEBUG': logging.DEBUG,
+ 'INFO': logging.INFO,
+ 'WARNING': logging.WARNING,
+ 'ERROR': logging.ERROR,
+ 'CRITICAL': logging.CRITICAL}
+
+_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
+'''
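+# Build a new pipeline 'sources' entry for a meter; the matching sink is named
+# "<meter>_sink" so each subscribed meter gets its own source/sink pair.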
+def get_source_info(meter):
+ sink_name = meter + "_sink"
+ meter_name = meter+"_name"
+    source_info = {'interval': 6, 'meters': [meter], 'name': meter_name, 'sinks': [sink_name]}
+    logging.debug("* new source_info: %s", source_info)
+ return (source_info,sink_name)
+
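+# Build the matching 'sinks' entry: publish both to the default notifier and to
+# the subscriber's target, with no transformers.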
+def get_sink_info(meter,sink_name,target):
+    sink_info = {'publishers': ['notifier://', target], 'transformers': None, 'name': sink_name}
+    logging.debug("* new sink_info: %s", sink_info)
+ return sink_info
+
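+# Restart the ceilometer services listed under Ceilometer_service in
+# pipeline_agent.conf so the updated pipeline.yaml takes effect.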
+def restart_ceilometer_services():
+ try :
+ config = ConfigParser.ConfigParser()
+ config.read('pipeline_agent.conf')
+ services = config.get('RABBITMQ','Ceilometer_service')
+ service = services.split(",")
+ except Exception as e:
+ logging.error("* Error in confing file:%s",e.__str__())
+ return False
+ else :
+ for service_name in service:
+            command = ['service', service_name, 'restart']
+            logging.debug("Executing: %s command", command)
+            # shell=False runs the command list directly, without invoking a shell.
+ try :
+ subprocess.call(command, shell=False)
+ except Exception as e:
+ logging.error("* %s command execution failed with error %s",command,e.__str__())
+ return False
+ return True
+
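+# Validate a pipeline.yaml file by asking ceilometer's pipeline module to build
+# a pipeline manager from it; any exception means the file is unusable.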
+def check_meter_with_pipeline_cfg(pipeline_cfg_file,meter=None,target=None):
+ try :
+ pipeline._setup_pipeline_manager(pipeline_cfg_file,None)
+ except Exception as e:
+ logging.error ("Got Exception: %s",e.__str__())
+ return False
+ return True
+
+
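+# RabbitMQ consumer callback. The message body is expected to be a JSON object
+# with 'sub_info' (a meter name or list of meter names), 'target' (the publisher
+# to add or remove) and 'action' ("ADD" or "DEL"), e.g. (illustrative values):
+#   {"sub_info": "cpu_util", "target": "udp://10.0.0.1:5006", "action": "ADD"}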
+def callback(ch, method, properties, msg):
+ logging.debug(" [x] Received %r",msg)
+ #yaml.add_representer(UnsortableOrderedDict, yaml.representer.SafeRepresenter.represent_dict)
+ orig_pipeline_conf = "/etc/ceilometer/pipeline.yaml"
+ with open (orig_pipeline_conf, 'r') as fap:
+ data = fap.read()
+ pipeline_cfg = yaml.safe_load(data)
+ logging.debug("Pipeline config: %s", pipeline_cfg)
+
+ try :
+ json_msg = json.loads(msg)
+ meter = json_msg['sub_info']
+ publisher = json_msg['target']
+ flag = json_msg['action']
+ update_status = []
+        if isinstance(meter, list):
+            logging.debug("Meter is a list; updating the pipeline for each entry")
+ for meter_info in meter :
+ update_status.append(update_pipeline_yaml(meter_info,publisher,flag))
+ else :
+ update_status.append(update_pipeline_yaml(meter,publisher,flag))
+
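+        # Restart the ceilometer services only if at least one update changed pipeline.yaml.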
+ if reduce(operator.or_, update_status):
+ if not restart_ceilometer_services():
+ logging.error("Error in restarting ceilometer services")
+ return False
+ except Exception as e :
+ logging.error("Got exception:%s in parsing message",e.__str__())
+ return False
+
+
+
+
+
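+# Add or remove a (meter, publisher) pair in /etc/ceilometer/pipeline.yaml.
+# The modified config is written to /tmp/pipeline.yaml and re-validated first;
+# only if it parses cleanly is it copied over the original file.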
+def update_pipeline_yaml(meter,publisher,flag):
+ logging.debug("meter name:%s",meter)
+ logging.debug("publisher or target name:%s",publisher)
+
+ orig_pipeline_conf = "/etc/ceilometer/pipeline.yaml"
+    ''' Parse the original pipeline yaml file '''
+ try :
+ with open (orig_pipeline_conf, 'r') as fap:
+ data = fap.read()
+ pipeline_cfg = yaml.safe_load(data)
+ logging.debug("Pipeline config: %s", pipeline_cfg)
+
+        ''' Check for parsing errors '''
+
+ if not check_meter_with_pipeline_cfg(orig_pipeline_conf) :
+ logging.error("Original pipeline.yaml parsing failed")
+ return False
+ else :
+ status = None
+ if flag == "ADD" :
+ status = utils.update_conf_to_pipe_line_cfg(meter,publisher,pipeline_cfg)
+ elif flag == "DEL" :
+ status = utils.delete_conf_from_pipe_line_cfg(meter,publisher,pipeline_cfg)
+
+ if status == True :
+ tmp_pipeline_conf = "/tmp/pipeline.yaml"
+ with open(tmp_pipeline_conf, "w") as f:
+ yaml.safe_dump( pipeline_cfg, f ,default_flow_style=False)
+ if check_meter_with_pipeline_cfg(tmp_pipeline_conf,meter,publisher) :
+                    logging.debug("Tmp pipeline.yaml parsed successfully, copying it over the original")
+ with open(orig_pipeline_conf, "w") as f:
+ yaml.safe_dump( pipeline_cfg, f ,default_flow_style=False)
+ return True
+ else :
+                    logging.info("Retaining original conf, as the updated meter info has errors")
+ return False
+ except Exception as e:
+ logging.error("* Error in confing file:%s",e.__str__())
+ return False
+
+
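+# Read RabbitMQ and logging settings from pipeline_agent.conf, bind an exclusive
+# queue to the 'pubsub' fanout exchange and consume subscription messages until
+# interrupted.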
+def msg_queue_listener():
+
+ try:
+ config = ConfigParser.ConfigParser()
+ config.read('pipeline_agent.conf')
+ rabbitmq_username = config.get('RABBITMQ','Rabbitmq_username')
+ rabbitmq_passwd = config.get('RABBITMQ','Rabbitmq_passwd')
+ rabbitmq_host = config.get('RABBITMQ','Rabbitmq_host')
+ rabbitmq_port = int ( config.get('RABBITMQ','Rabbitmq_port') )
+ '''
+ log_level = config.get('LOGGING','level')
+ log_file = config.get('LOGGING','filename')
+
+ level = LEVELS.get(log_level, logging.NOTSET)
+ logging.basicConfig(filename=log_file,format='%(asctime)s %(filename)s %(levelname)s %(message)s',\
+ datefmt=_DEFAULT_LOG_DATE_FORMAT,level=level)
+ '''
+ logging.config.fileConfig('pipeline_agent.conf', disable_existing_loggers=False)
+ except Exception as e:
+ logging.error("* Error in confing file:%s",e.__str__())
+ else :
+ logging.debug("*------------------Rabbit MQ Server Info---------")
+ logging.debug("rabbitmq_username:%s",rabbitmq_username)
+ logging.debug("rabbitmq_passwd:%s",rabbitmq_passwd)
+ logging.debug("rabbitmq_host:%s",rabbitmq_host)
+ logging.debug("rabbitmq_port:%s",rabbitmq_port)
+ credentials = pika.PlainCredentials(rabbitmq_username,rabbitmq_passwd)
+ parameters = pika.ConnectionParameters(rabbitmq_host,
+ rabbitmq_port,
+ '/',
+ credentials)
+ connection = pika.BlockingConnection(parameters)
+ channel = connection.channel()
+ #channel.queue_declare(queue='pubsub')
+ channel.exchange_declare(exchange='pubsub',
+ type='fanout')
+
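+        # Each agent instance gets its own exclusive, auto-named queue bound to the
+        # fanout exchange, so every instance receives every subscription message.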
+ result = channel.queue_declare(exclusive=True)
+ queue_name = result.method.queue
+
+ channel.queue_bind(exchange='pubsub',
+ queue=queue_name)
+ logging.debug("[*] Waiting for messages. To exit press CTRL+C")
+
+ channel.basic_consume(callback,
+ queue=queue_name,
+ no_ack=True)
+ channel.start_consuming()
+
+if __name__ == "__main__":
+ #logging.debug("* Starting pipeline agent module")
+    msg_queue_listener()
+