Adding CORD specific ceilometer changes to monitoring repository
- ceilometer custom notification plugins for ONOS, vSG, vOLT and Infra layers
- ceilometer publish/subscribe module
- ceilometer dynamic pipeline config module
- ceilometer UDP proxy
- ceilometer Custom Image(ceilometer -v2 -v3 versions,kafka_installer,startup scripts)

Change-Id: Ie2ab8ce89cdadbd1fb4dc54ee15e46f8cc8c4c18
diff --git a/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline.py b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline.py
new file mode 100644
index 0000000..17690f9
--- /dev/null
+++ b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline.py
@@ -0,0 +1,229 @@
+import abc
+import six
+import yaml
+import os
+import logging
+
+class PipelineException(Exception):
+    def __init__(self, message, pipeline_cfg):
+        self.msg = message
+        self.pipeline_cfg = pipeline_cfg
+
+    def __str__(self):
+        return 'Pipeline %s: %s' % (self.pipeline_cfg, self.msg)
+
+
+class Source(object):
+    """Represents a source of samples or events."""
+
+    def __init__(self, cfg):
+        self.cfg = cfg
+
+        try:
+            self.name = cfg['name']
+            self.sinks = cfg.get('sinks')
+        except KeyError as err:
+            raise PipelineException(
+                "Required field %s not specified" % err.args[0], cfg)
+
+    def __str__(self):
+        return self.name
+
+    def check_sinks(self, sinks):
+        if not self.sinks:
+            raise PipelineException(
+                "No sink defined in source %s" % self,
+                self.cfg)
+        for sink in self.sinks:
+            if sink not in sinks:
+                raise PipelineException(
+                    "Dangling sink %s from source %s" % (sink, self),
+                    self.cfg)
+    def check_source_filtering(self, data, d_type):
+        """Source data rules checking
+
+        - At least one meaningful datapoint exist
+        - Included type and excluded type can't co-exist on the same pipeline
+        - Included type meter and wildcard can't co-exist at same pipeline
+        """
+        if not data:
+            raise PipelineException('No %s specified' % d_type, self.cfg)
+
+        if ([x for x in data if x[0] not in '!*'] and
+           [x for x in data if x[0] == '!']):
+            raise PipelineException(
+                'Both included and excluded %s specified' % d_type,
+                self.cfg)
+
+        if '*' in data and [x for x in data if x[0] not in '!*']:
+            raise PipelineException(
+                'Included %s specified with wildcard' % d_type,
+                self.cfg)
+
+    @staticmethod
+    def is_supported(dataset, data_name):
+        # Support wildcard like storage.* and !disk.*
+        # Start with negation, we consider that the order is deny, allow
+        if any(fnmatch.fnmatch(data_name, datapoint[1:])
+               for datapoint in dataset if datapoint[0] == '!'):
+            return False
+
+        if any(fnmatch.fnmatch(data_name, datapoint)
+               for datapoint in dataset if datapoint[0] != '!'):
+            return True
+
+        # if we only have negation, we suppose the default is allow
+        return all(datapoint.startswith('!') for datapoint in dataset)
+
+
+@six.add_metaclass(abc.ABCMeta)
+class Pipeline(object):
+    """Represents a coupling between a sink and a corresponding source."""
+
+    def __init__(self, source, sink):
+        self.source = source
+        self.sink = sink
+        self.name = str(self)
+
+    def __str__(self):
+        return (self.source.name if self.source.name == self.sink.name
+                else '%s:%s' % (self.source.name, self.sink.name))
+
+
+class SamplePipeline(Pipeline):
+    """Represents a pipeline for Samples."""
+
+    def get_interval(self):
+        return self.source.interval
+
+class SampleSource(Source):
+    """Represents a source of samples.
+
+    In effect it is a set of pollsters and/or notification handlers emitting
+    samples for a set of matching meters. Each source encapsulates meter name
+    matching, polling interval determination, optional resource enumeration or
+    discovery, and mapping to one or more sinks for publication.
+    """
+
+    def __init__(self, cfg):
+        super(SampleSource, self).__init__(cfg)
+        try:
+            try:
+                self.interval = int(cfg['interval'])
+            except ValueError:
+                raise PipelineException("Invalid interval value", cfg)
+            # Support 'counters' for backward compatibility
+            self.meters = cfg.get('meters', cfg.get('counters'))
+        except KeyError as err:
+            raise PipelineException(
+                "Required field %s not specified" % err.args[0], cfg)
+        if self.interval <= 0:
+            raise PipelineException("Interval value should > 0", cfg)
+
+        self.resources = cfg.get('resources') or []
+        if not isinstance(self.resources, list):
+            raise PipelineException("Resources should be a list", cfg)
+
+        self.discovery = cfg.get('discovery') or []
+        if not isinstance(self.discovery, list):
+            raise PipelineException("Discovery should be a list", cfg)
+        self.check_source_filtering(self.meters, 'meters')
+
+    def support_meter(self, meter_name):
+        return self.is_supported(self.meters, meter_name)
+
+
+class Sink(object):
+
+    def __init__(self, cfg, transformer_manager):
+        self.cfg = cfg
+
+        try:
+            self.name = cfg['name']
+            # It's legal to have no transformer specified
+            self.transformer_cfg = cfg.get('transformers') or []
+        except KeyError as err:
+            raise PipelineException(
+                "Required field %s not specified" % err.args[0], cfg)
+
+        if not cfg.get('publishers'):
+            raise PipelineException("No publisher specified", cfg)
+
+      
+
+class SampleSink(Sink):
+    def Testfun(self):
+        pass
+      
+
+SAMPLE_TYPE = {'pipeline': SamplePipeline,
+               'source': SampleSource,
+               'sink': SampleSink}
+
+
+class PipelineManager(object):
+
+     def __init__(self, cfg, transformer_manager, p_type=SAMPLE_TYPE):
+        self.pipelines = []
+        if 'sources' in cfg or 'sinks' in cfg:
+            if not ('sources' in cfg and 'sinks' in cfg):
+                raise PipelineException("Both sources & sinks are required",
+                                        cfg)
+            #LOG.info(_('detected decoupled pipeline config format'))
+            logging.info("detected decoupled pipeline config format %s",cfg)
+            sources = [p_type['source'](s) for s in cfg.get('sources', [])]
+            sinks = {}
+            for s in cfg.get('sinks', []):
+                if s['name'] in sinks:
+                    raise PipelineException("Duplicated sink names: %s" %
+                                            s['name'], self)
+                else:
+                    sinks[s['name']] = p_type['sink'](s, transformer_manager)
+            for source in sources:
+                source.check_sinks(sinks)
+                for target in source.sinks:
+                    pipe = p_type['pipeline'](source, sinks[target])
+                    if pipe.name in [p.name for p in self.pipelines]:
+                        raise PipelineException(
+                            "Duplicate pipeline name: %s. Ensure pipeline"
+                            " names are unique. (name is the source and sink"
+                            " names combined)" % pipe.name, cfg)
+                    else:
+                        self.pipelines.append(pipe)
+        else:
+            #LOG.warning(_('detected deprecated pipeline config format'))
+            logging.warning("detected deprecated pipeline config format")
+            for pipedef in cfg:
+                source = p_type['source'](pipedef)
+                sink = p_type['sink'](pipedef, transformer_manager)
+                pipe = p_type['pipeline'](source, sink)
+                if pipe.name in [p.name for p in self.pipelines]:
+                    raise PipelineException(
+                        "Duplicate pipeline name: %s. Ensure pipeline"
+                        " names are unique" % pipe.name, cfg)
+                else:
+                    self.pipelines.append(pipe)
+     
+
+def _setup_pipeline_manager(cfg_file, transformer_manager, p_type=SAMPLE_TYPE):
+    if not os.path.exists(cfg_file):
+        #cfg_file = cfg.CONF.find_file(cfg_file)
+        print "File doesn't exists"   
+        return False
+
+    ##LOG.debug(_("Pipeline config file: %s"), cfg_file)
+    logging.debug("Pipeline config file: %s", cfg_file)
+
+    with open(cfg_file) as fap:
+        data = fap.read()
+
+    pipeline_cfg = yaml.safe_load(data)
+     
+    ##LOG.info(_("Pipeline config: %s"), pipeline_cfg)
+    logging.info("Pipeline config: %s", pipeline_cfg)
+    logging.debug("Pipeline config: %s", pipeline_cfg)
+      
+    return PipelineManager(pipeline_cfg,
+                           None, p_type)
+    
+