Adding CORD-specific ceilometer changes to the monitoring repository
- ceilometer custom notification plugins for ONOS, vSG, vOLT and Infra layers
- ceilometer publish/subscribe module
- ceilometer dynamic pipeline config module
- ceilometer UDP proxy
- ceilometer custom image (ceilometer v2/v3 versions, kafka_installer, startup scripts)

Change-Id: Ie2ab8ce89cdadbd1fb4dc54ee15e46f8cc8c4c18
diff --git a/xos/synchronizer/ceilometer/pipeline_agent_module/README b/xos/synchronizer/ceilometer/pipeline_agent_module/README
new file mode 100644
index 0000000..a37ccc3
--- /dev/null
+++ b/xos/synchronizer/ceilometer/pipeline_agent_module/README
@@ -0,0 +1,41 @@
+Dynamic Pipeline-Agent Module:
+1. Packages:
+        pika
+        yaml
+        subprocess
+        logging
+        operator
+        json
+        ConfigParser
+
+ The pika package can be installed using the command:
+  -> pip install pika
+ The remaining packages come with the default OS/Python installation or can be
+ installed using the command:
+ -> sudo apt-get install <package-name>
+
+2. Files:
+a. utils.py: Utility functions for parsing and updating pipeline.yaml.
+b. pipeline.yaml: Sample pipeline.yaml file with the minimum source and sink information tags.
+c. pipeline.py: Validates the pipeline.yaml configuration.
+d. pipeline_agent.py: Main module file; listens on the RabbitMQ exchange "pubsub".
+e. pipeline_agent.conf: Conf file containing the following information:
+                       i. RabbitMQ server details (host, port, username, passwd)
+                       ii. Logging info (logging level, file name)
+                       iii. Ceilometer services to be restarted after pipeline.yaml changes
+f. README
+
+3. To run the module:
+  ->sudo python pipeline_agent.py
+
+4. Format to send a conf msg to the module:
+  i. For updating the conf:
+    -> msg = {"sub_info": sub_info, "target": target, "action": "ADD"}
+  ii. For deleting the conf:
+    -> msg = {"sub_info": sub_info, "target": target, "action": "DEL"}
+
+     Both msgs must be in JSON format and sent to the same rabbitmq-server where
+     pipeline_agent.py is running, on the "pubsub" exchange (see the publish sketch below).
+     ex:
+        sub_info = ["cpu_util", "memory"]
+        target = "kafka://1.2.3.2:18"
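+
+5. Example publisher (a minimal sketch; it assumes the RabbitMQ host, port and
+   credentials configured in pipeline_agent.conf -- replace them with your own):
+
+     import json
+     import pika
+
+     # Connect to the RabbitMQ server that pipeline_agent.py listens on.
+     credentials = pika.PlainCredentials('openstack', '<rabbitmq-passwd>')
+     parameters = pika.ConnectionParameters('10.11.10.1', 5672, '/', credentials)
+     connection = pika.BlockingConnection(parameters)
+     channel = connection.channel()
+
+     # The agent binds its queue to the fanout exchange "pubsub".
+     channel.exchange_declare(exchange='pubsub', type='fanout')
+
+     # Ask the agent to start publishing cpu_util samples to a kafka target.
+     msg = {"sub_info": ["cpu_util"], "target": "kafka://1.2.3.2:18", "action": "ADD"}
+     channel.basic_publish(exchange='pubsub', routing_key='', body=json.dumps(msg))
+     connection.close()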
diff --git a/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline.py b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline.py
new file mode 100644
index 0000000..17690f9
--- /dev/null
+++ b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline.py
@@ -0,0 +1,229 @@
+import abc
+import fnmatch
+import six
+import yaml
+import os
+import logging
+
+class PipelineException(Exception):
+    def __init__(self, message, pipeline_cfg):
+        self.msg = message
+        self.pipeline_cfg = pipeline_cfg
+
+    def __str__(self):
+        return 'Pipeline %s: %s' % (self.pipeline_cfg, self.msg)
+
+
+class Source(object):
+    """Represents a source of samples or events."""
+
+    def __init__(self, cfg):
+        self.cfg = cfg
+
+        try:
+            self.name = cfg['name']
+            self.sinks = cfg.get('sinks')
+        except KeyError as err:
+            raise PipelineException(
+                "Required field %s not specified" % err.args[0], cfg)
+
+    def __str__(self):
+        return self.name
+
+    def check_sinks(self, sinks):
+        if not self.sinks:
+            raise PipelineException(
+                "No sink defined in source %s" % self,
+                self.cfg)
+        for sink in self.sinks:
+            if sink not in sinks:
+                raise PipelineException(
+                    "Dangling sink %s from source %s" % (sink, self),
+                    self.cfg)
+    def check_source_filtering(self, data, d_type):
+        """Source data rules checking
+
+        - At least one meaningful datapoint exist
+        - Included type and excluded type can't co-exist on the same pipeline
+        - Included type meter and wildcard can't co-exist at same pipeline
+        """
+        if not data:
+            raise PipelineException('No %s specified' % d_type, self.cfg)
+
+        if ([x for x in data if x[0] not in '!*'] and
+           [x for x in data if x[0] == '!']):
+            raise PipelineException(
+                'Both included and excluded %s specified' % d_type,
+                self.cfg)
+
+        if '*' in data and [x for x in data if x[0] not in '!*']:
+            raise PipelineException(
+                'Included %s specified with wildcard' % d_type,
+                self.cfg)
+
+    @staticmethod
+    def is_supported(dataset, data_name):
+        # Support wildcard like storage.* and !disk.*
+        # Start with negation, we consider that the order is deny, allow
+        if any(fnmatch.fnmatch(data_name, datapoint[1:])
+               for datapoint in dataset if datapoint[0] == '!'):
+            return False
+
+        if any(fnmatch.fnmatch(data_name, datapoint)
+               for datapoint in dataset if datapoint[0] != '!'):
+            return True
+
+        # if we only have negation, we suppose the default is allow
+        return all(datapoint.startswith('!') for datapoint in dataset)
+
+
+@six.add_metaclass(abc.ABCMeta)
+class Pipeline(object):
+    """Represents a coupling between a sink and a corresponding source."""
+
+    def __init__(self, source, sink):
+        self.source = source
+        self.sink = sink
+        self.name = str(self)
+
+    def __str__(self):
+        return (self.source.name if self.source.name == self.sink.name
+                else '%s:%s' % (self.source.name, self.sink.name))
+
+
+class SamplePipeline(Pipeline):
+    """Represents a pipeline for Samples."""
+
+    def get_interval(self):
+        return self.source.interval
+
+class SampleSource(Source):
+    """Represents a source of samples.
+
+    In effect it is a set of pollsters and/or notification handlers emitting
+    samples for a set of matching meters. Each source encapsulates meter name
+    matching, polling interval determination, optional resource enumeration or
+    discovery, and mapping to one or more sinks for publication.
+    """
+
+    def __init__(self, cfg):
+        super(SampleSource, self).__init__(cfg)
+        try:
+            try:
+                self.interval = int(cfg['interval'])
+            except ValueError:
+                raise PipelineException("Invalid interval value", cfg)
+            # Support 'counters' for backward compatibility
+            self.meters = cfg.get('meters', cfg.get('counters'))
+        except KeyError as err:
+            raise PipelineException(
+                "Required field %s not specified" % err.args[0], cfg)
+        if self.interval <= 0:
+            raise PipelineException("Interval value should be > 0", cfg)
+
+        self.resources = cfg.get('resources') or []
+        if not isinstance(self.resources, list):
+            raise PipelineException("Resources should be a list", cfg)
+
+        self.discovery = cfg.get('discovery') or []
+        if not isinstance(self.discovery, list):
+            raise PipelineException("Discovery should be a list", cfg)
+        self.check_source_filtering(self.meters, 'meters')
+
+    def support_meter(self, meter_name):
+        return self.is_supported(self.meters, meter_name)
+
+
+class Sink(object):
+
+    def __init__(self, cfg, transformer_manager):
+        self.cfg = cfg
+
+        try:
+            self.name = cfg['name']
+            # It's legal to have no transformer specified
+            self.transformer_cfg = cfg.get('transformers') or []
+        except KeyError as err:
+            raise PipelineException(
+                "Required field %s not specified" % err.args[0], cfg)
+
+        if not cfg.get('publishers'):
+            raise PipelineException("No publisher specified", cfg)
+
+      
+
+class SampleSink(Sink):
+    def Testfun(self):
+        pass
+      
+
+SAMPLE_TYPE = {'pipeline': SamplePipeline,
+               'source': SampleSource,
+               'sink': SampleSink}
+
+
+class PipelineManager(object):
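+    """Builds and validates Pipeline objects from a parsed pipeline.yaml.
+
+    Supports both the decoupled sources/sinks format and the deprecated flat
+    format; raises PipelineException on an invalid configuration.
+    """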
+
+    def __init__(self, cfg, transformer_manager, p_type=SAMPLE_TYPE):
+        self.pipelines = []
+        if 'sources' in cfg or 'sinks' in cfg:
+            if not ('sources' in cfg and 'sinks' in cfg):
+                raise PipelineException("Both sources & sinks are required",
+                                        cfg)
+            #LOG.info(_('detected decoupled pipeline config format'))
+            logging.info("detected decoupled pipeline config format %s",cfg)
+            sources = [p_type['source'](s) for s in cfg.get('sources', [])]
+            sinks = {}
+            for s in cfg.get('sinks', []):
+                if s['name'] in sinks:
+                    raise PipelineException("Duplicated sink names: %s" %
+                                            s['name'], self)
+                else:
+                    sinks[s['name']] = p_type['sink'](s, transformer_manager)
+            for source in sources:
+                source.check_sinks(sinks)
+                for target in source.sinks:
+                    pipe = p_type['pipeline'](source, sinks[target])
+                    if pipe.name in [p.name for p in self.pipelines]:
+                        raise PipelineException(
+                            "Duplicate pipeline name: %s. Ensure pipeline"
+                            " names are unique. (name is the source and sink"
+                            " names combined)" % pipe.name, cfg)
+                    else:
+                        self.pipelines.append(pipe)
+        else:
+            #LOG.warning(_('detected deprecated pipeline config format'))
+            logging.warning("detected deprecated pipeline config format")
+            for pipedef in cfg:
+                source = p_type['source'](pipedef)
+                sink = p_type['sink'](pipedef, transformer_manager)
+                pipe = p_type['pipeline'](source, sink)
+                if pipe.name in [p.name for p in self.pipelines]:
+                    raise PipelineException(
+                        "Duplicate pipeline name: %s. Ensure pipeline"
+                        " names are unique" % pipe.name, cfg)
+                else:
+                    self.pipelines.append(pipe)
+     
+
+def _setup_pipeline_manager(cfg_file, transformer_manager, p_type=SAMPLE_TYPE):
+    if not os.path.exists(cfg_file):
+        #cfg_file = cfg.CONF.find_file(cfg_file)
+        logging.error("Pipeline config file %s does not exist", cfg_file)
+        return False
+
+    ##LOG.debug(_("Pipeline config file: %s"), cfg_file)
+    logging.debug("Pipeline config file: %s", cfg_file)
+
+    with open(cfg_file) as fap:
+        data = fap.read()
+
+    pipeline_cfg = yaml.safe_load(data)
+
+    ##LOG.info(_("Pipeline config: %s"), pipeline_cfg)
+    logging.info("Pipeline config: %s", pipeline_cfg)
+
+    return PipelineManager(pipeline_cfg,
+                           None, p_type)
+    
+
diff --git a/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline.yaml b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline.yaml
new file mode 100644
index 0000000..505471a
--- /dev/null
+++ b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline.yaml
@@ -0,0 +1,13 @@
+---
+sources:
+    - name: meter_source
+      interval: 600
+      meters:
+          - "*"
+      sinks:
+          - meter_sink
+sinks:
+    - name: meter_sink
+      transformers:
+      publishers:
+          - notifier://
diff --git a/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.conf b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.conf
new file mode 100644
index 0000000..9d811f6
--- /dev/null
+++ b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.conf
@@ -0,0 +1,32 @@
+#[LOGGING]
+#level = DEBUG 
+#filename = pipeline_agent.log
+
+[RABBITMQ]
+Rabbitmq_username = openstack
+Rabbitmq_passwd = 4815196be370811224fe
+Rabbitmq_host = 10.11.10.1
+Rabbitmq_port = 5672
+Ceilometer_service = ceilometer-agent-central,ceilometer-alarm-evaluator,ceilometer-api 
+
+[loggers]
+keys=root
+
+[handlers]
+keys=logfile
+
+[formatters]
+keys=logfileformatter
+
+[logger_root]
+level=INFO
+handlers=logfile
+
+[formatter_logfileformatter]
+format=%(asctime)s %(filename)s %(levelname)s %(message)s
+
+[handler_logfile]
+class=handlers.RotatingFileHandler
+level=NOTSET
+args=('pipeline_agent.log','a',1000000,100)
+formatter=logfileformatter
diff --git a/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.log b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.log
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.log
diff --git a/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.py b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.py
new file mode 100644
index 0000000..a751b8e
--- /dev/null
+++ b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.py
@@ -0,0 +1,208 @@
+import pika
+import yaml
+import subprocess
+import logging
+import logging.config
+import operator
+import json
+import ConfigParser
+import pipeline
+import utils
+#from ceilometer import pipeline
+from collections import  OrderedDict
+
+
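+# Helper classes that stop yaml.dump from sorting dictionary keys when
+# pipeline.yaml is re-written; they take effect only if the yaml.add_representer()
+# call below is uncommented.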
+class UnsortableList(list):
+    def sort(self, *args, **kwargs):
+        pass
+
+class UnsortableOrderedDict(OrderedDict):
+    def items(self, *args, **kwargs):
+        return UnsortableList(OrderedDict.items(self, *args, **kwargs))
+
+#yaml.add_representer(UnsortableOrderedDict, yaml.representer.SafeRepresenter.represent_dict)
+
+
+tmp_pipeline_conf = "/tmp/pipeline.yaml"
+
+'''
+LEVELS = {'DEBUG': logging.DEBUG,
+          'INFO': logging.INFO,
+          'WARNING': logging.WARNING,
+          'ERROR': logging.ERROR,
+          'CRITICAL': logging.CRITICAL}
+
+_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
+'''
+def get_source_info(meter):
+    sink_name = meter + "_sink"
+    meter_name = meter+"_name"   
+    source_info = {'interval': 6,'meters': [meter],'name': meter_name,'sinks':[sink_name]}
+    logging.debug("* new source_info :%s",source_info)
+    return (source_info,sink_name)
+
+def get_sink_info(meter,sink_name,target):
+    sink_info = {'publishers':['notifier://',target],'transformers':None ,'name': sink_name}
+    logging.debug("* new sink_info :%s",sink_info)
+    return sink_info
+
+def restart_ceilometer_services():
+    try : 
+       config = ConfigParser.ConfigParser()
+       config.read('pipeline_agent.conf')
+       services = config.get('RABBITMQ','Ceilometer_service')
+       service = services.split(",")
+    except Exception as e:
+        logging.error("* Error in config file:%s",e.__str__())
+        return False
+    else :
+        for service_name in service:
+            command = ['service',service_name, 'restart']
+            logging.debug("Executing: %s command",command)
+            #shell=FALSE for sudo to work.
+            try :
+                subprocess.call(command, shell=False)
+            except Exception as e:
+                logging.error("* %s command execution failed with error %s",command,e.__str__())
+                return False
+    return True 
+   
+def check_meter_with_pipeline_cfg(pipeline_cfg_file,meter=None,target=None):
+    #import pdb;pdb.set_trace() 
+    try :
+        pipeline._setup_pipeline_manager(pipeline_cfg_file,None)
+    except Exception as e:
+        logging.error ("Got Exception: %s",e.__str__())
+        return False 
+    return True
+   
+
+def callback(ch, method, properties, msg):
+    logging.debug(" [x] Received %r",msg)
+    #import pdb; pdb.set_trace()
+    #yaml.add_representer(UnsortableOrderedDict, yaml.representer.SafeRepresenter.represent_dict)
+    orig_pipeline_conf = "/etc/ceilometer/pipeline.yaml"
+    with open (orig_pipeline_conf, 'r') as fap:
+         data = fap.read()
+         pipeline_cfg = yaml.safe_load(data)
+    logging.debug("Pipeline config: %s", pipeline_cfg)
+
+    try : 
+        json_msg = json.loads(msg)
+        meter = json_msg['sub_info']
+        publisher = json_msg['target']
+        flag = json_msg['action']
+        update_status = []  
+        if type(meter) is list:
+            logging.debug("Meter is a list ... need to handle each entry")
+            for meter_info in meter :
+                update_status.append(update_pipeline_yaml(meter_info,publisher,flag))
+        else :
+             update_status.append(update_pipeline_yaml(meter,publisher,flag))
+ 
+        if reduce(operator.or_,  update_status):
+            if not restart_ceilometer_services():
+                logging.error("Error in restarting ceilometer services")
+                return False
+    except Exception as e :
+        logging.error("Got exception:%s in parsing message",e.__str__())
+        return False
+
+   
+
+
+ 
+def update_pipeline_yaml(meter,publisher,flag):
+    logging.debug("meter name:%s",meter)
+    logging.debug("publisher or target name:%s",publisher)
+
+    orig_pipeline_conf = "/etc/ceilometer/pipeline.yaml"
+    ''' Parsing original pipeline yaml file '''
+    try :
+         with open (orig_pipeline_conf, 'r') as fap:
+             data = fap.read()
+             pipeline_cfg = yaml.safe_load(data)
+         logging.debug("Pipeline config: %s", pipeline_cfg)
+   
+         ''' Checking parsing errors '''
+    
+         if not check_meter_with_pipeline_cfg(orig_pipeline_conf) :
+             logging.error("Original pipeline.yaml parsing failed")
+             return False
+         else :
+             status = None
+             if flag == "ADD" :
+                 status = utils.update_conf_to_pipe_line_cfg(meter,publisher,pipeline_cfg)
+             elif flag == "DEL" :
+                 status = utils.delete_conf_from_pipe_line_cfg(meter,publisher,pipeline_cfg)
+       
+             if status == True : 
+                 tmp_pipeline_conf = "/tmp/pipeline.yaml"
+                 with open(tmp_pipeline_conf, "w") as f:
+                      yaml.safe_dump( pipeline_cfg, f ,default_flow_style=False)
+                 if check_meter_with_pipeline_cfg(tmp_pipeline_conf,meter,publisher) :
+                      logging.debug("Tmp pipeline.yaml parsed successfully, copying it as orig")
+                      with open(orig_pipeline_conf, "w") as f:
+                          yaml.safe_dump( pipeline_cfg, f ,default_flow_style=False)
+                      return True
+                 else :
+                      logging.info("Retaining original conf, as the updated meter info has errors")
+                      return False     
+    except Exception as e:
+        logging.error("* Error in config file:%s",e.__str__())
+        return False
+
+ 
+def msg_queue_listener():
+    
+    try:
+        config = ConfigParser.ConfigParser()
+        config.read('pipeline_agent.conf')
+        rabbitmq_username = config.get('RABBITMQ','Rabbitmq_username')
+        rabbitmq_passwd = config.get('RABBITMQ','Rabbitmq_passwd')
+        rabbitmq_host = config.get('RABBITMQ','Rabbitmq_host')
+        rabbitmq_port = int ( config.get('RABBITMQ','Rabbitmq_port') )
+        '''
+        log_level    = config.get('LOGGING','level')
+        log_file       = config.get('LOGGING','filename')
+ 
+        level = LEVELS.get(log_level, logging.NOTSET)
+        logging.basicConfig(filename=log_file,format='%(asctime)s %(filename)s %(levelname)s %(message)s',\
+                    datefmt=_DEFAULT_LOG_DATE_FORMAT,level=level)
+        '''
+        logging.config.fileConfig('pipeline_agent.conf', disable_existing_loggers=False)  
+    except Exception as e:
+        logging.error("* Error in config file:%s",e.__str__())
+    else :
+        logging.debug("*------------------Rabbit MQ Server Info---------")
+        logging.debug("rabbitmq_username:%s",rabbitmq_username)
+        logging.debug("rabbitmq_passwd:%s",rabbitmq_passwd)
+        logging.debug("rabbitmq_host:%s",rabbitmq_host)
+        logging.debug("rabbitmq_port:%s",rabbitmq_port)
+        credentials = pika.PlainCredentials(rabbitmq_username,rabbitmq_passwd)
+        parameters = pika.ConnectionParameters(rabbitmq_host,
+                                               rabbitmq_port,
+                                               '/',
+                                               credentials)
+        connection = pika.BlockingConnection(parameters)
+        channel = connection.channel()
+        #channel.queue_declare(queue='pubsub')
+        channel.exchange_declare(exchange='pubsub',
+                         type='fanout')
+
+        result = channel.queue_declare(exclusive=True)
+        queue_name = result.method.queue
+
+        channel.queue_bind(exchange='pubsub',
+                    queue=queue_name)
+        logging.debug("[*] Waiting for messages. To exit press CTRL+C")
+
+        channel.basic_consume(callback,
+                              queue=queue_name,
+                              no_ack=True)
+        channel.start_consuming()
+
+if __name__ == "__main__":
+    #logging.debug("* Starting pipeline agent module")
+    msg_queue_listener()
+
diff --git a/xos/synchronizer/ceilometer/pipeline_agent_module/utils.py b/xos/synchronizer/ceilometer/pipeline_agent_module/utils.py
new file mode 100644
index 0000000..3aa4865
--- /dev/null
+++ b/xos/synchronizer/ceilometer/pipeline_agent_module/utils.py
@@ -0,0 +1,404 @@
+import yaml
+import random
+import string
+import logging
+import fnmatch
+
+def main():
+    orig_pipeline_conf = "/etc/ceilometer/pipeline.yaml"
+    with open (orig_pipeline_conf, 'r') as fap:
+        data = fap.read()
+        pipeline_cfg = yaml.safe_load(data)
+    return pipeline_cfg
+
+def build_meter_list():
+    ''' function to extract the existing meter list from pipeline.yaml'''
+    orig_pipeline_conf = "/etc/ceilometer/pipeline.yaml"
+    with open (orig_pipeline_conf, 'r') as fap:
+         data = fap.read()
+         pipeline_cfg = yaml.safe_load(data)
+    source_cfg = pipeline_cfg['sources']
+    meter_list=[]
+    for i in source_cfg:
+         meter_list.append(i['meters'])
+    
+    return meter_list
+
+def get_sink_name_from_publisher(publisher,pipeline_cfg):
+    sink_cfg = pipeline_cfg['sinks']
+    ''' Iterating over the list of publishers to get sink name'''
+    try :
+        for sinks in sink_cfg:
+            pub_list = sinks.get('publishers')
+            try :
+                k = pub_list.index(publisher)
+                return sinks.get('name') 
+            except Exception as e:
+                #print ("Got Exception",e.__str__())
+                continue   
+    except Exception as e:
+        return None
+
+def get_source_name_from_meter(meter,pipeline_cfg):
+    source_cfg = pipeline_cfg['sources']
+    ''' Iterating over the list of meters to get the source name'''
+    try :  
+        for sources in source_cfg:
+            meter_list = sources.get('meters')
+            try :
+                k = meter_list.index(meter)
+                return sources.get('name')
+            except Exception as e:
+                #print ("Got Exception",e.__str__())
+                continue   
+    except Exception as e:
+        return None
+
+def get_source_name_from_with_meter_patter_match(meter,pipeline_cfg):
+    ''' Iterating over the list of meters for a wildcard match to get the source name'''
+    source_cfg = pipeline_cfg['sources']
+    try :
+        for sources in source_cfg:
+            meter_list = sources.get('meters')
+            for k in meter_list:
+                if k[0] == "*":
+                    logging.warning("Ignoring wild card meter(*) case ")
+                    continue
+                if fnmatch.fnmatch(k,meter):
+                    logging.debug("substring match")
+                    return (sources.get('name'),"superset",k)
+                if fnmatch.fnmatch(meter,k):
+                    logging.debug("input is super match")
+                    return (sources.get('name'),"subset",k)
+    except Exception as e:
+        return None,None,None
+
+    return None,None,None
+
+def get_source_name_from_sink_name(sink_name,pipeline_cfg):
+    ''' iterating over the list of sources to get the source name for the given sink'''
+    source_cfg = pipeline_cfg['sources']
+    try :
+        for sources in source_cfg:
+            sink_list = sources.get("sinks")
+            try :
+                k = sink_list.index(sink_name)
+            #sources.get("meters").append("m2")
+                return sources.get("name")
+            except Exception as e:
+                continue
+    except Exception as e:
+        return None
+
+def get_sink_name_from_source_name(source_name,pipeline_cfg):
+    ''' iterating over the list of sources to get the sinks of the given source'''
+    source_cfg = pipeline_cfg['sources']
+    try :
+        for sources in source_cfg:
+            try :
+                if  sources.get("name") == source_name: 
+                    return sources.get("sinks")
+            except Exception as e:
+                continue
+    except Exception as e:
+        return None 
+
+def add_meter_to_source(meter_name,source_name,pipeline_cfg):
+    ''' iterating over the list of sources to add meter to the matching source'''
+    source_cfg = pipeline_cfg['sources']
+    try :
+        for sources in source_cfg:
+            try :
+                if  sources.get("name") == source_name: 
+                    sources.get("meters").append(meter_name)
+                    return True 
+            except Exception as e:
+                continue
+    except Exception as e:
+        return False
+
+def get_meter_list_from_source(source_name,pipeline_cfg):
+    ''' iterating over the list of sources to get meters under the given source'''
+    source_cfg = pipeline_cfg['sources']
+    try :
+        for sources in source_cfg:
+            try :
+                if  sources.get("name") == source_name:
+                    return sources.get("meters")
+            except Exception as e:
+                continue
+    except Exception as e:
+        return None
+
+def get_publisher_list_from_sink(sink_name,pipeline_cfg):
+    sink_cfg = pipeline_cfg['sinks']
+    ''' Iterating over the list of sinks to build publishers list '''
+    publisher_list = []
+    try :
+        for sinks in sink_cfg:
+            try :
+                for j in sink_name:
+                    if j == sinks.get("name"):
+                        publisher_list.append(sinks.get("publishers"))
+                        return publisher_list
+            except Exception as e:
+                #print ("Got Exception",e.__str__())
+                continue   
+    except Exception as e:
+        return None
+
+def get_publisher_list_from_sinkname(sink_name,pipeline_cfg):
+    sink_cfg = pipeline_cfg['sinks']
+    ''' Iterating over the list of sinks to build publishers list '''
+    try :
+        for sinks in sink_cfg:
+            pub_list = sinks.get('publishers')
+            try :
+                 if sink_name == sinks.get("name"):
+                     return pub_list   
+            except Exception as e:
+                #print ("Got Exception",e.__str__())
+                continue
+    except Exception as e:
+        return None
+
+
+def delete_meter_from_source(meter_name,source_name,pipeline_cfg) :
+    ''' function to delete meter for the given source '''
+    source_cfg = pipeline_cfg['sources']
+    try :
+        for sources in source_cfg:
+            try :
+                if  sources.get("name") == source_name:
+                    meter_list = sources.get('meters')
+                    try :
+                       meter_index = meter_list.index(meter_name)
+                       logging.debug("meter name is present at index:%s",meter_index)
+                       if len(meter_list) == 1 and meter_index == 0:
+                           logging.debug("Only one meter exists removing entire source entry")
+                           source_cfg.remove(sources)
+                       else :
+                           meter_list.pop(meter_index)
+                       return True     
+                    except Exception as e:
+                        continue
+            except Exception as e:
+                continue
+    except Exception as e:
+        return False 
+
+def delete_publisher_from_sink(publisher,sink_name,pipeline_cfg):
+    sink_cfg = pipeline_cfg['sinks']
+    ''' Iterating over the list of publishers '''
+    try :
+        for sinks in sink_cfg:
+            pub_list = sinks.get('publishers')
+            #print pub_list
+            try :
+                if sink_name == sinks.get("name"):
+                    k = pub_list.index(publisher)
+                    pub_list.pop(k)
+                    #print k
+                    return True 
+            except Exception as e:
+                #print ("Got Exception",e.__str__())
+                continue   
+    except Exception as e:
+        return None
+
+def delete_sink_from_pipeline(sink_name,pipeline_cfg):
+    sink_cfg = pipeline_cfg['sinks']
+    try :
+        for sinks in sink_cfg:
+            if sink_name == sinks.get("name"):
+                sink_cfg.remove(sinks)
+                return True
+    except Exception as e:
+        return False 
+
+def add_publisher_to_sink(publisher_name,sink_name,pipeline_cfg):
+    sink_cfg = pipeline_cfg['sinks']
+    try :
+        for sinks in sink_cfg:
+            if sink_name == sinks.get("name"):
+                sinks.get('publishers').append(publisher_name)
+                return True
+    except Exception as e:
+        return None 
+
+def get_source_info(meter):
+    name = ''.join(random.choice(string.ascii_lowercase) for _ in range(9))
+    sink_name = name + "_sink"
+    source_name = name + "_source"
+    source_info = {'interval': 6,'meters': [meter],'name': source_name,'sinks':[sink_name]}
+    logging.debug("* new source_info :%s",source_info)
+    return (source_info,sink_name)
+
+def get_sink_info(meter,sink_name,target):
+    sink_info = {'publishers':['notifier://',target],'transformers':None ,'name': sink_name}
+    logging.debug("* new sink_info :%s",sink_info)
+    return sink_info
+
+def delete_conf_from_pipe_line_cfg(meter,publisher,pipeline_cfg):
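+    # Remove the (meter, publisher) pair: drop the meter from its source; if it was the
+    # source's only meter, also drop the publisher from the sink, and remove the sink
+    # itself once no source references it any longer.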
+    #import pdb;pdb.set_trace()
+   
+    sink_name = get_sink_name_from_publisher(publisher,pipeline_cfg)
+    source_name = get_source_name_from_meter(meter,pipeline_cfg)
+      
+    if sink_name is None or source_name is None:
+       logging.error("Either the sink or the source name does not exist in pipeline.yaml")
+       return False
+   
+    meter_list = get_meter_list_from_source(source_name,pipeline_cfg)
+   
+    temp_meter_list = []
+   
+    for j in meter_list:
+        temp_meter_list.append(j)
+  
+    pub_list = get_publisher_list_from_sinkname(sink_name,pipeline_cfg)
+    if len(pub_list) > 2 and  len(temp_meter_list) == 1:
+        if delete_publisher_from_sink(publisher,sink_name,pipeline_cfg):
+            return True
+        else:
+            return False    
+  
+    if delete_meter_from_source(meter,source_name,pipeline_cfg) :
+        if len(temp_meter_list) == 1:
+            if delete_publisher_from_sink(publisher,sink_name,pipeline_cfg) :
+                if get_source_name_from_sink_name(sink_name,pipeline_cfg) is None:
+                    delete_sink_from_pipeline(sink_name,pipeline_cfg)  
+                return True
+            else :
+                return False 
+        return True         
+    return False
+    
+
+def update_sink_aggrgation(meter,publisher,source_name,matching_meter,meter_match,pipeline_cfg):
+    ''' Build new source and sink '''
+    new_source_info,new_sink_name = get_source_info(meter)
+    new_sink_info = get_sink_info(meter,new_sink_name,publisher)
+
+    meter_list = get_meter_list_from_source(source_name,pipeline_cfg)
+    sink_name = get_sink_name_from_source_name(source_name,pipeline_cfg)
+    publisher_list = get_publisher_list_from_sink(sink_name,pipeline_cfg)
+    for i in publisher_list:
+        for j in i:
+            #print j
+            if j not in new_sink_info.get("publishers") :
+                new_sink_info.get("publishers").append(j)
+                #print new_sink_info
+
+    cfg_source = pipeline_cfg['sources']
+    cfg_sink = pipeline_cfg['sinks']
+    if meter_match == "superset" :
+        new_source_info.get("meters").append("!"+ matching_meter)
+    elif meter_match == "subset" :
+        ''' here need to get list of meters with sub-string match '''
+        add_meter_to_source("!"+meter,source_name,pipeline_cfg)
+        add_publisher_to_sink(publisher,sink_name,pipeline_cfg)
+
+    logging.debug("-----------  Before Updating Meter Info ------------------")
+    logging.debug("%s",pipeline_cfg)
+
+    ''' Updating source and sink info '''
+    cfg_source.append(new_source_info)
+    cfg_sink.append(new_sink_info)
+    logging.debug("-----------  After Updating Meter Info --------------------")
+    logging.debug("%s",pipeline_cfg)
+
+def update_conf_to_pipe_line_cfg(meter,publisher,pipeline_cfg):
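+    # Cases handled below:
+    #  1. Neither the publisher nor the meter exists -> append a new source/sink pair.
+    #  2. Meter exists in a multi-meter source       -> split it out into its own
+    #     source/sink, copying the publishers of the existing sink.
+    #  3. Meter is the only meter of its source      -> add the publisher to that
+    #     source's sinks.
+    #  4. Publisher exists but the meter does not    -> add the meter to the source
+    #     feeding that sink.
+    #  5. Both already exist                         -> nothing to do, return False.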
+    #import pdb;pdb.set_trace()
+    sink_name = get_sink_name_from_publisher(publisher,pipeline_cfg)
+    source_name = get_source_name_from_meter(meter,pipeline_cfg)
+    if sink_name is None :
+        logging.debug("No Sink exists with the given Publisher")
+        if source_name is None:
+            ''' Commenting the code related to wild card '''
+            '''
+            pattern_source_name,pattern,matching_meter = get_source_name_from_with_meter_patter_match(meter,pipeline_cfg)
+            if pattern_source_name is not None:
+                if pattern == "superset" :
+                    #add_meter_to_source("!"+meter,pattern_source_name,pipeline_cfg)
+                    update_sink_aggrgation(meter,publisher,pattern_source_name,matching_meter,"superset",pipeline_cfg)
+                    #print pipeline_cfg
+                    return True 
+                if pattern == "subset" :
+                   update_sink_aggrgation(meter,publisher,pattern_source_name,matching_meter,"subset",pipeline_cfg)
+                   return True    
+            ''' 
+            source_info,sink_name = get_source_info(meter)
+            sink_info = get_sink_info(meter,sink_name,publisher)
+  
+            cfg_source = pipeline_cfg['sources']
+            cfg_sink = pipeline_cfg['sinks']
+
+            logging.debug("-----------  Before Updating Meter Info ------------------")
+            logging.debug("%s",pipeline_cfg)
+
+            ''' Updating source and sink info '''
+            cfg_source.append(source_info)
+            cfg_sink.append(sink_info)
+            logging.debug("-----------  After Updating Meter Info --------------------")
+            logging.debug("%s",pipeline_cfg)
+            return True
+        else :
+             logging.debug("Meter already exists in the conf file under source name:%s ",source_name)
+             meter_list = get_meter_list_from_source(source_name,pipeline_cfg)
+             publisher_list=[]
+             if len(meter_list) > 1:
+                sink_name = get_sink_name_from_source_name(source_name,pipeline_cfg)
+                '''
+                if type(sink_name) is list :
+                    for sinkname in sink_name:    
+                        publisher_list.append(get_publisher_list_from_sink(sinkname,pipeline_cfg))
+                else :
+                     publisher_list.append(get_publisher_list_from_sink(sink_name,pipeline_cfg))
+                ''' 
+                publisher_list = get_publisher_list_from_sink(sink_name,pipeline_cfg)
+                new_source_info,new_sink_name = get_source_info(meter)
+                new_sink_info = get_sink_info(meter,new_sink_name,publisher)
+                for i in publisher_list:
+                     for j in i:
+                          #print j
+                          if j not in new_sink_info.get("publishers") :
+                              new_sink_info.get("publishers").append(j)
+                cfg_source = pipeline_cfg['sources']
+                cfg_sink = pipeline_cfg['sinks']
+
+                logging.debug("-----------  Before Updating Meter Info ------------------")
+                logging.debug("%s",pipeline_cfg)
+
+                ''' Updating source and sink info '''
+                cfg_source.append(new_source_info)
+                cfg_sink.append(new_sink_info)
+                logging.debug("-----------  After Updating Meter Info --------------------")
+                logging.debug("%s",pipeline_cfg)
+                delete_meter_from_source(meter,source_name,pipeline_cfg)
+                logging.debug("%s",pipeline_cfg)
+                return True
+             else :
+                  logging.debug ("Source already exists for this meter, adding publisher to it .....:%s",source_name)
+                  sink_name_list = get_sink_name_from_source_name(source_name,pipeline_cfg)
+                  for sink_name in sink_name_list :
+                      add_publisher_to_sink(publisher,sink_name,pipeline_cfg)
+                  return True    
+                  #print pipeline_cfg
+    else :
+         logging.debug ("Publisher already exists under sink:%s",sink_name)
+         if get_source_name_from_meter(meter,pipeline_cfg) is not None:
+             logging.debug("Both meter and publisher already exist in the conf file")
+             logging.debug("Update request is not successful")
+             return False
+         else :
+             source_name = get_source_name_from_sink_name(sink_name,pipeline_cfg) 
+             logging.debug ("Need to add meter to already existing source which \
+                    has this publisher under one of its sink")
+             #print source_name
+             if add_meter_to_source(meter,source_name,pipeline_cfg):
+                 logging.debug("Meter added successfully")
+                 return True
+