Adding CORD-specific ceilometer changes to the monitoring repository
- ceilometer custom notification plugins for ONOS, vSG, vOLT and Infra layers
- ceilometer publish/subscribe module
- ceilometer dynamic pipeline config module
- ceilometer UDP proxy
- ceilometer Custom Image (ceilometer -v2 -v3 versions, kafka_installer, startup scripts)
Change-Id: Ie2ab8ce89cdadbd1fb4dc54ee15e46f8cc8c4c18
diff --git a/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline.py b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline.py
new file mode 100644
index 0000000..17690f9
--- /dev/null
+++ b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline.py
@@ -0,0 +1,229 @@
+import abc
+import fnmatch
+import logging
+import os
+
+import six
+import yaml
+
+class PipelineException(Exception):
+    """Error raised when a pipeline definition is invalid.
+
+    :param message: human-readable description of the problem.
+    :param pipeline_cfg: the configuration fragment the problem was found
+        in; it is echoed in the string form of the exception.
+    """
+
+    def __init__(self, message, pipeline_cfg):
+        # Chain to Exception so ``args`` carries both values on every
+        # interpreter (Python 2 leaves it empty otherwise); repr() and
+        # pickling rely on ``args`` to rebuild the exception.
+        super(PipelineException, self).__init__(message, pipeline_cfg)
+        self.msg = message
+        self.pipeline_cfg = pipeline_cfg
+
+    def __str__(self):
+        return 'Pipeline %s: %s' % (self.pipeline_cfg, self.msg)
+
+
+class Source(object):
+    """Represents a source of samples or events.
+
+    :param cfg: mapping describing the source. ``name`` is required;
+        ``sinks`` is read if present.
+    :raises PipelineException: if ``name`` is missing from ``cfg``.
+    """
+
+    def __init__(self, cfg):
+        self.cfg = cfg
+
+        try:
+            self.name = cfg['name']
+        except KeyError as err:
+            raise PipelineException(
+                "Required field %s not specified" % err.args[0], cfg)
+        # A missing 'sinks' entry is tolerated here; check_sinks() is the
+        # place where a source without sinks gets rejected.
+        self.sinks = cfg.get('sinks')
+
+    def __str__(self):
+        return self.name
+
+    def check_sinks(self, sinks):
+        """Verify that every sink named by this source is defined.
+
+        :param sinks: container of known sink names (tested with ``in``).
+        :raises PipelineException: if the source lists no sinks, or names
+            a sink that is not in ``sinks``.
+        """
+        if not self.sinks:
+            raise PipelineException(
+                "No sink defined in source %s" % self,
+                self.cfg)
+        for sink in self.sinks:
+            if sink not in sinks:
+                raise PipelineException(
+                    "Dangling sink %s from source %s" % (sink, self),
+                    self.cfg)
+
+    def check_source_filtering(self, data, d_type):
+        """Source data rules checking
+
+        - At least one meaningful datapoint exist
+        - Included type and excluded type can't co-exist on the same pipeline
+        - Included type meter and wildcard can't co-exist at same pipeline
+
+        :param data: list of name patterns; a leading ``!`` marks an
+            exclusion and ``*`` is the wildcard.
+        :param d_type: label for the kind of data, used in error messages.
+        :raises PipelineException: if any of the rules above is violated.
+        """
+        if not data:
+            raise PipelineException('No %s specified' % d_type, self.cfg)
+
+        # startswith() instead of x[0] so that an empty entry cannot raise
+        # IndexError; such an entry simply counts as an included name.
+        included = [x for x in data if not x.startswith(('!', '*'))]
+        excluded = [x for x in data if x.startswith('!')]
+
+        if included and excluded:
+            raise PipelineException(
+                'Both included and excluded %s specified' % d_type,
+                self.cfg)
+
+        if '*' in data and included:
+            raise PipelineException(
+                'Included %s specified with wildcard' % d_type,
+                self.cfg)
+
+    @staticmethod
+    def is_supported(dataset, data_name):
+        """Tell whether ``data_name`` is accepted by the patterns in ``dataset``.
+
+        :param dataset: list of fnmatch-style patterns; entries starting
+            with ``!`` are exclusions.
+        :param data_name: the name to test.
+        :returns: False if an exclusion matches, True if an inclusion
+            matches; otherwise True only when every entry is an exclusion.
+        """
+        # Support wildcard like storage.* and !disk.*
+        # Start with negation, we consider that the order is deny, allow
+        if any(fnmatch.fnmatch(data_name, datapoint[1:])
+               for datapoint in dataset if datapoint.startswith('!')):
+            return False
+
+        if any(fnmatch.fnmatch(data_name, datapoint)
+               for datapoint in dataset if not datapoint.startswith('!')):
+            return True
+
+        # if we only have negation, we suppose the default is allow
+        return all(datapoint.startswith('!') for datapoint in dataset)
+
+
+@six.add_metaclass(abc.ABCMeta)
+class Pipeline(object):
+    """Pairs one source with the sink it feeds.
+
+    ``name`` is computed once, at construction, from the string form of
+    the pipeline.
+    """
+
+    def __init__(self, source, sink):
+        self.source, self.sink = source, sink
+        self.name = str(self)
+
+    def __str__(self):
+        # A source and sink sharing one name collapse to that name;
+        # otherwise both names are joined with a colon.
+        source_name = self.source.name
+        sink_name = self.sink.name
+        if source_name == sink_name:
+            return source_name
+        return '%s:%s' % (source_name, sink_name)
+
+
+class SamplePipeline(Pipeline):
+    """Pipeline specialisation that carries samples."""
+
+    def get_interval(self):
+        """Return the ``interval`` attribute of this pipeline's source."""
+        source = self.source
+        return source.interval
+
+class SampleSource(Source):
+    """Represents a source of samples.
+
+    In effect it is a set of pollsters and/or notification handlers emitting
+    samples for a set of matching meters. Each source encapsulates meter name
+    matching, polling interval determination, optional resource enumeration or
+    discovery, and mapping to one or more sinks for publication.
+
+    :param cfg: mapping describing the source. ``interval`` is required and
+        must convert to a positive integer; ``meters`` (or the legacy
+        ``counters``) must satisfy the source filtering rules; ``resources``
+        and ``discovery``, when given, must be lists.
+    :raises PipelineException: if any of those conditions is not met.
+    """
+
+    def __init__(self, cfg):
+        super(SampleSource, self).__init__(cfg)
+        try:
+            raw_interval = cfg['interval']
+        except KeyError as err:
+            raise PipelineException(
+                "Required field %s not specified" % err.args[0], cfg)
+        try:
+            self.interval = int(raw_interval)
+        except (TypeError, ValueError):
+            # TypeError covers values int() cannot take at all, such as
+            # None or a list; ValueError covers non-numeric strings.
+            raise PipelineException("Invalid interval value", cfg)
+        if self.interval <= 0:
+            raise PipelineException("Interval value should > 0", cfg)
+
+        # Support 'counters' for backward compatibility
+        self.meters = cfg.get('meters', cfg.get('counters'))
+
+        self.resources = cfg.get('resources') or []
+        if not isinstance(self.resources, list):
+            raise PipelineException("Resources should be a list", cfg)
+
+        self.discovery = cfg.get('discovery') or []
+        if not isinstance(self.discovery, list):
+            raise PipelineException("Discovery should be a list", cfg)
+        self.check_source_filtering(self.meters, 'meters')
+
+    def support_meter(self, meter_name):
+        """Tell whether ``meter_name`` is accepted by this source's meters."""
+        return self.is_supported(self.meters, meter_name)
+
+
+class Sink(object):
+    """Validated holder for one sink definition.
+
+    :param cfg: mapping describing the sink. ``name`` and a non-empty
+        ``publishers`` entry are required; ``transformers`` is optional.
+    :param transformer_manager: accepted but not used by this class.
+    :raises PipelineException: if ``name`` is missing or no publisher is
+        given.
+    """
+
+    def __init__(self, cfg, transformer_manager):
+        self.cfg = cfg
+
+        try:
+            sink_name = cfg['name']
+        except KeyError as err:
+            raise PipelineException(
+                "Required field %s not specified" % err.args[0], cfg)
+        self.name = sink_name
+
+        # It's legal to have no transformer specified
+        transformers = cfg.get('transformers')
+        self.transformer_cfg = transformers if transformers else []
+
+        publishers = cfg.get('publishers')
+        if not publishers:
+            raise PipelineException("No publisher specified", cfg)
+
+
+
+class SampleSink(Sink):
+    """Sink specialisation used for sample pipelines."""
+
+    def Testfun(self):
+        """Do nothing; the method has an empty body."""
+        return None
+
+
+# Classes used for each role when building sample pipelines; this is the
+# default ``p_type`` of PipelineManager and _setup_pipeline_manager.
+SAMPLE_TYPE = {
+    'pipeline': SamplePipeline,
+    'source': SampleSource,
+    'sink': SampleSink,
+}
+
+
+class PipelineManager(object):
+    """Builds the list of pipelines described by a parsed configuration.
+
+    Two layouts are accepted:
+
+    - decoupled: a mapping holding both ``sources`` and ``sinks``; one
+      pipeline is created for every (source, sink name) pair a source
+      lists.
+    - deprecated: an iterable of definitions, each of which is used to
+      build both the source and the sink of a single pipeline.
+
+    The result is available as the ``pipelines`` list.
+
+    :param cfg: the parsed pipeline configuration.
+    :param transformer_manager: passed unchanged to each sink constructor.
+    :param p_type: mapping with the classes to instantiate under the keys
+        ``'pipeline'``, ``'source'`` and ``'sink'``.
+    :raises PipelineException: if the configuration is missing, only one
+        of ``sources``/``sinks`` is present, a sink or pipeline name is
+        duplicated, or a source/sink definition is rejected.
+    """
+
+    def __init__(self, cfg, transformer_manager, p_type=SAMPLE_TYPE):
+        self.pipelines = []
+        if cfg is None:
+            # Fail with the module's own error type rather than with a
+            # TypeError from the membership tests below.
+            raise PipelineException("Pipeline configuration is empty", cfg)
+
+        # Pipeline names already taken; a set keeps each uniqueness check
+        # to a single lookup.
+        seen_names = set()
+        if 'sources' in cfg or 'sinks' in cfg:
+            if not ('sources' in cfg and 'sinks' in cfg):
+                raise PipelineException("Both sources & sinks are required",
+                                        cfg)
+            logging.info("detected decoupled pipeline config format %s", cfg)
+            sources = [p_type['source'](s) for s in cfg.get('sources', [])]
+            sinks = {}
+            for s in cfg.get('sinks', []):
+                # .get() so that a nameless definition reaches the sink
+                # class (Sink raises PipelineException for it) instead of
+                # failing here with a bare KeyError.
+                sink_name = s.get('name')
+                if sink_name in sinks:
+                    raise PipelineException("Duplicated sink names: %s" %
+                                            sink_name, cfg)
+                sinks[sink_name] = p_type['sink'](s, transformer_manager)
+            for source in sources:
+                source.check_sinks(sinks)
+                for target in source.sinks:
+                    pipe = p_type['pipeline'](source, sinks[target])
+                    if pipe.name in seen_names:
+                        raise PipelineException(
+                            "Duplicate pipeline name: %s. Ensure pipeline"
+                            " names are unique. (name is the source and sink"
+                            " names combined)" % pipe.name, cfg)
+                    seen_names.add(pipe.name)
+                    self.pipelines.append(pipe)
+        else:
+            logging.warning("detected deprecated pipeline config format")
+            for pipedef in cfg:
+                source = p_type['source'](pipedef)
+                sink = p_type['sink'](pipedef, transformer_manager)
+                pipe = p_type['pipeline'](source, sink)
+                if pipe.name in seen_names:
+                    raise PipelineException(
+                        "Duplicate pipeline name: %s. Ensure pipeline"
+                        " names are unique" % pipe.name, cfg)
+                seen_names.add(pipe.name)
+                self.pipelines.append(pipe)
+
+
+def _setup_pipeline_manager(cfg_file, transformer_manager, p_type=SAMPLE_TYPE):
+    """Load a YAML pipeline definition file and build its PipelineManager.
+
+    :param cfg_file: path of the YAML pipeline definition file.
+    :param transformer_manager: forwarded to PipelineManager.
+    :param p_type: mapping of the classes to instantiate, forwarded to
+        PipelineManager.
+    :returns: a PipelineManager for the file's content, or False when
+        ``cfg_file`` does not exist.
+    """
+    if not os.path.exists(cfg_file):
+        # Reported through logging, like every other message in this
+        # module; the former print statement was Python 2-only syntax.
+        logging.error("Pipeline config file %s doesn't exist", cfg_file)
+        return False
+
+    logging.debug("Pipeline config file: %s", cfg_file)
+
+    with open(cfg_file) as fap:
+        pipeline_cfg = yaml.safe_load(fap)
+
+    logging.info("Pipeline config: %s", pipeline_cfg)
+
+    # Hand over the caller's transformer_manager instead of discarding it.
+    return PipelineManager(pipeline_cfg, transformer_manager, p_type)
+
+