Adding CORD specific ceilometer changes to monitoring repository
- ceilometer custom notification plugins for ONOS, vSG, vOLT and Infra layers
- ceilometer publish/subscribe module
- ceilometer dynamic pipeline config module
- ceilometer UDP proxy
- ceilometer Custom Image (ceilometer -v2 and -v3 versions, kafka_installer, startup scripts)

Change-Id: Ie2ab8ce89cdadbd1fb4dc54ee15e46f8cc8c4c18
diff --git a/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.py b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.py
new file mode 100644
index 0000000..a751b8e
--- /dev/null
+++ b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.py
@@ -0,0 +1,208 @@
+import pika
+import yaml
+import subprocess
+import logging
+import logging.config
+import operator
+import json
+import ConfigParser
+import pipeline
+import utils
+#from ceilometer import pipeline
+from collections import  OrderedDict
+
+
+class UnsortableList(list):
+    def sort(self, *args, **kwargs):
+        """Do nothing: this list ignores every request to sort it."""
+
+class UnsortableOrderedDict(OrderedDict):
+    """OrderedDict whose items() result is wrapped in an UnsortableList."""
+    # NOTE(review): presumably for yaml.add_representer(..., SafeRepresenter.represent_dict); that call is disabled - confirm.
+    def items(self, *args, **kwargs):
+        ordered_pairs = OrderedDict.items(self, *args, **kwargs)
+        return UnsortableList(ordered_pairs)
+
+tmp_pipeline_conf = "/tmp/pipeline.yaml"  # scratch path where an edited pipeline config is staged
+# The string literal below is disabled logging setup; it is evaluated and discarded, never run as code.
+'''
+LEVELS = {'DEBUG': logging.DEBUG,
+          'INFO': logging.INFO,
+          'WARNING': logging.WARNING,
+          'ERROR': logging.ERROR,
+          'CRITICAL': logging.CRITICAL}
+
+_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
+'''
+def get_source_info(meter):
+    """Build the pipeline source entry for `meter`; returns (source_dict, sink_name)."""
+    sink = meter + "_sink"
+    source = dict(interval=6, meters=[meter], name=meter + "_name", sinks=[sink])
+    logging.debug("* new source_info :%s", source)
+    return source, sink
+
+def get_sink_info(meter,sink_name,target):
+    """Build the sink entry `sink_name` publishing to notifier:// and `target`; `meter` is unused."""
+    sink_info = {'publishers': ['notifier://', target], 'transformers': None, 'name': sink_name}
+    logging.debug("* new sink_info :%s", sink_info)
+    return sink_info
+def restart_ceilometer_services():
+    """Restart the services listed in pipeline_agent.conf; True only if every restart exits 0."""
+    try:
+        config = ConfigParser.ConfigParser()
+        config.read('pipeline_agent.conf')
+        services = config.get('RABBITMQ','Ceilometer_service').split(",")
+    except Exception as e:
+        logging.error("* Error in confing file:%s",e.__str__())
+        return False
+    for service_name in [name.strip() for name in services if name.strip()]:
+        command = ['service', service_name, 'restart']
+        logging.debug("Executing: %s command",command)
+        try:
+            outcome = subprocess.call(command, shell=False)  # shell=False for sudo to work
+        except Exception as e:
+            outcome = e.__str__()
+        if outcome != 0:  # non-zero exit status, or the error text when the launch itself failed
+            logging.error("* %s command execution failed with error %s",command,outcome)
+            return False
+    return True
+
+def check_meter_with_pipeline_cfg(pipeline_cfg_file,meter=None,target=None):
+    """Validate a pipeline file: True if pipeline._setup_pipeline_manager loads it, False (logged) if it raises."""
+    is_valid = True
+    try:
+        pipeline._setup_pipeline_manager(pipeline_cfg_file, None)
+    except Exception as err:
+        logging.error("Got Exception: %s", err.__str__())
+        is_valid = False
+    return is_valid
+
+def callback(ch, method, properties, msg):
+    """Handle one pubsub message: apply its meter changes and restart ceilometer if needed.
+
+    `msg` is a JSON document with keys 'sub_info' (a meter name or a list of
+    them), 'target' (the publisher) and 'action' (passed through to
+    update_pipeline_yaml as its flag). `ch`, `method` and `properties` are unused.
+
+    Returns False when the message cannot be processed or the service
+    restart fails; otherwise returns None.
+    """
+    logging.debug(" [x] Received %r",msg)
+    # The pipeline file itself is read inside update_pipeline_yaml, not here.
+    try:
+        json_msg = json.loads(msg)
+        meter = json_msg['sub_info']
+        publisher = json_msg['target']
+        flag = json_msg['action']
+        if isinstance(meter, list):
+            logging.debug("Metere is a list ... Need to handle it ")
+            meters = meter
+        else:
+            meters = [meter]
+        update_status = [update_pipeline_yaml(meter_info,publisher,flag) for meter_info in meters]
+        # any() copes with an empty list and with None results, both of which
+        # made reduce(operator.or_, ...) raise TypeError and skip the restart.
+        if any(update_status) and not restart_ceilometer_services():
+            logging.error("Error in restarting ceilometer services")
+            return False
+    except Exception as e:
+        logging.error("Got exception:%s in parsing message",e.__str__())
+        return False
+   
+
+
+ 
+def update_pipeline_yaml(meter,publisher,flag):
+    """Apply one "ADD"/"DEL" request for `meter`/`publisher` to /etc/ceilometer/pipeline.yaml.
+
+    The edited config is written to tmp_pipeline_conf and validated before it
+    replaces the original. Returns True only if the original file was
+    rewritten; every other outcome, including an unknown flag, returns False.
+    """
+    logging.debug("meter name:%s",meter)
+    logging.debug("publisher or target name:%s",publisher)
+    orig_pipeline_conf = "/etc/ceilometer/pipeline.yaml"
+    try:
+        with open(orig_pipeline_conf, 'r') as fap:
+            pipeline_cfg = yaml.safe_load(fap.read())
+        logging.debug("Pipeline config: %s", pipeline_cfg)
+        if not check_meter_with_pipeline_cfg(orig_pipeline_conf):
+            logging.error("Original pipeline.yaml parsing failed")
+            return False
+        if flag == "ADD":
+            status = utils.update_conf_to_pipe_line_cfg(meter,publisher,pipeline_cfg)
+        elif flag == "DEL":
+            status = utils.delete_conf_from_pipe_line_cfg(meter,publisher,pipeline_cfg)
+        else:
+            logging.error("Unsupported action:%s",flag)
+            return False
+        if status != True:
+            # Helper did not return True: answer False explicitly instead of an implicit None.
+            return False
+        with open(tmp_pipeline_conf, "w") as f:
+            yaml.safe_dump(pipeline_cfg, f, default_flow_style=False)
+        if not check_meter_with_pipeline_cfg(tmp_pipeline_conf,meter,publisher):
+            logging.info("Retaining original conf,as update meter info has errors")
+            return False
+        logging.debug("Tmp pipeline.yaml parsed sucessfully,coping it as orig")
+        with open(orig_pipeline_conf, "w") as f:
+            yaml.safe_dump(pipeline_cfg, f, default_flow_style=False)
+        return True
+    except Exception as e:
+        logging.error("* Error in confing file:%s",e.__str__())
+        return False
+
+ 
+def msg_queue_listner():
+    """Consume messages from the 'pubsub' fanout exchange and pass each one to callback().
+
+    RabbitMQ settings come from the [RABBITMQ] section of pipeline_agent.conf,
+    which also supplies the logging configuration. If the file cannot be used
+    the error is logged and the function returns without connecting.
+    Otherwise this call blocks in start_consuming() until interrupted.
+    """
+    try:
+        config = ConfigParser.ConfigParser()
+        config.read('pipeline_agent.conf')
+        rabbitmq_username = config.get('RABBITMQ','Rabbitmq_username')
+        rabbitmq_passwd = config.get('RABBITMQ','Rabbitmq_passwd')
+        rabbitmq_host = config.get('RABBITMQ','Rabbitmq_host')
+        rabbitmq_port = int ( config.get('RABBITMQ','Rabbitmq_port') )
+        logging.config.fileConfig('pipeline_agent.conf', disable_existing_loggers=False)
+    except Exception as e:
+        logging.error("* Error in confing file:%s",e.__str__())
+        return
+    logging.debug("*------------------Rabbit MQ Server Info---------")
+    logging.debug("rabbitmq_username:%s",rabbitmq_username)
+    # The password is deliberately kept out of the log.
+    logging.debug("rabbitmq_host:%s",rabbitmq_host)
+    logging.debug("rabbitmq_port:%s",rabbitmq_port)
+    credentials = pika.PlainCredentials(rabbitmq_username,rabbitmq_passwd)
+    parameters = pika.ConnectionParameters(rabbitmq_host,
+                                           rabbitmq_port,
+                                           '/',
+                                           credentials)
+    connection = pika.BlockingConnection(parameters)
+    try:
+        channel = connection.channel()
+        channel.exchange_declare(exchange='pubsub',
+                                 type='fanout')
+        # Unnamed exclusive queue; the name actually assigned is read back from the declare result.
+        result = channel.queue_declare(exclusive=True)
+        queue_name = result.method.queue
+        channel.queue_bind(exchange='pubsub',
+                           queue=queue_name)
+        logging.debug("[*] Waiting for messages. To exit press CTRL+C")
+        channel.basic_consume(callback,
+                              queue=queue_name,
+                              no_ack=True)
+        channel.start_consuming()
+    finally:
+        # Release the broker connection however the consume loop ends (CTRL+C included).
+        if connection.is_open:
+            connection.close()
+
+if __name__ == "__main__":
+    # Run as a script: start the listener, which blocks while consuming messages.
+    msg_queue_listner()
+