blob: 5783a066b9ff6597f4d6428e49f648b75db32607 [file] [log] [blame]
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import abc
import six
import yaml
import os
import logging
class PipelineException(Exception):
def __init__(self, message, pipeline_cfg):
self.msg = message
self.pipeline_cfg = pipeline_cfg
def __str__(self):
return 'Pipeline %s: %s' % (self.pipeline_cfg, self.msg)
class Source(object):
"""Represents a source of samples or events."""
def __init__(self, cfg):
self.cfg = cfg
try:
self.name = cfg['name']
self.sinks = cfg.get('sinks')
except KeyError as err:
raise PipelineException(
"Required field %s not specified" % err.args[0], cfg)
def __str__(self):
return self.name
def check_sinks(self, sinks):
if not self.sinks:
raise PipelineException(
"No sink defined in source %s" % self,
self.cfg)
for sink in self.sinks:
if sink not in sinks:
raise PipelineException(
"Dangling sink %s from source %s" % (sink, self),
self.cfg)
def check_source_filtering(self, data, d_type):
"""Source data rules checking
- At least one meaningful datapoint exist
- Included type and excluded type can't co-exist on the same pipeline
- Included type meter and wildcard can't co-exist at same pipeline
"""
if not data:
raise PipelineException('No %s specified' % d_type, self.cfg)
if ([x for x in data if x[0] not in '!*'] and
[x for x in data if x[0] == '!']):
raise PipelineException(
'Both included and excluded %s specified' % d_type,
self.cfg)
if '*' in data and [x for x in data if x[0] not in '!*']:
raise PipelineException(
'Included %s specified with wildcard' % d_type,
self.cfg)
@staticmethod
def is_supported(dataset, data_name):
# Support wildcard like storage.* and !disk.*
# Start with negation, we consider that the order is deny, allow
if any(fnmatch.fnmatch(data_name, datapoint[1:])
for datapoint in dataset if datapoint[0] == '!'):
return False
if any(fnmatch.fnmatch(data_name, datapoint)
for datapoint in dataset if datapoint[0] != '!'):
return True
# if we only have negation, we suppose the default is allow
return all(datapoint.startswith('!') for datapoint in dataset)
@six.add_metaclass(abc.ABCMeta)
class Pipeline(object):
"""Represents a coupling between a sink and a corresponding source."""
def __init__(self, source, sink):
self.source = source
self.sink = sink
self.name = str(self)
def __str__(self):
return (self.source.name if self.source.name == self.sink.name
else '%s:%s' % (self.source.name, self.sink.name))
class SamplePipeline(Pipeline):
"""Represents a pipeline for Samples."""
def get_interval(self):
return self.source.interval
class SampleSource(Source):
"""Represents a source of samples.
In effect it is a set of pollsters and/or notification handlers emitting
samples for a set of matching meters. Each source encapsulates meter name
matching, polling interval determination, optional resource enumeration or
discovery, and mapping to one or more sinks for publication.
"""
def __init__(self, cfg):
super(SampleSource, self).__init__(cfg)
try:
try:
self.interval = int(cfg['interval'])
except ValueError:
raise PipelineException("Invalid interval value", cfg)
# Support 'counters' for backward compatibility
self.meters = cfg.get('meters', cfg.get('counters'))
except KeyError as err:
raise PipelineException(
"Required field %s not specified" % err.args[0], cfg)
if self.interval <= 0:
raise PipelineException("Interval value should > 0", cfg)
self.resources = cfg.get('resources') or []
if not isinstance(self.resources, list):
raise PipelineException("Resources should be a list", cfg)
self.discovery = cfg.get('discovery') or []
if not isinstance(self.discovery, list):
raise PipelineException("Discovery should be a list", cfg)
self.check_source_filtering(self.meters, 'meters')
def support_meter(self, meter_name):
return self.is_supported(self.meters, meter_name)
class Sink(object):
def __init__(self, cfg, transformer_manager):
self.cfg = cfg
try:
self.name = cfg['name']
# It's legal to have no transformer specified
self.transformer_cfg = cfg.get('transformers') or []
except KeyError as err:
raise PipelineException(
"Required field %s not specified" % err.args[0], cfg)
if not cfg.get('publishers'):
raise PipelineException("No publisher specified", cfg)
class SampleSink(Sink):
def Testfun(self):
pass
SAMPLE_TYPE = {'pipeline': SamplePipeline,
'source': SampleSource,
'sink': SampleSink}
class PipelineManager(object):
def __init__(self, cfg, transformer_manager, p_type=SAMPLE_TYPE):
self.pipelines = []
if 'sources' in cfg or 'sinks' in cfg:
if not ('sources' in cfg and 'sinks' in cfg):
raise PipelineException("Both sources & sinks are required",
cfg)
#LOG.info(_('detected decoupled pipeline config format'))
logging.info("detected decoupled pipeline config format %s",cfg)
sources = [p_type['source'](s) for s in cfg.get('sources', [])]
sinks = {}
for s in cfg.get('sinks', []):
if s['name'] in sinks:
raise PipelineException("Duplicated sink names: %s" %
s['name'], self)
else:
sinks[s['name']] = p_type['sink'](s, transformer_manager)
for source in sources:
source.check_sinks(sinks)
for target in source.sinks:
pipe = p_type['pipeline'](source, sinks[target])
if pipe.name in [p.name for p in self.pipelines]:
raise PipelineException(
"Duplicate pipeline name: %s. Ensure pipeline"
" names are unique. (name is the source and sink"
" names combined)" % pipe.name, cfg)
else:
self.pipelines.append(pipe)
else:
#LOG.warning(_('detected deprecated pipeline config format'))
logging.warning("detected deprecated pipeline config format")
for pipedef in cfg:
source = p_type['source'](pipedef)
sink = p_type['sink'](pipedef, transformer_manager)
pipe = p_type['pipeline'](source, sink)
if pipe.name in [p.name for p in self.pipelines]:
raise PipelineException(
"Duplicate pipeline name: %s. Ensure pipeline"
" names are unique" % pipe.name, cfg)
else:
self.pipelines.append(pipe)
def _setup_pipeline_manager(cfg_file, transformer_manager, p_type=SAMPLE_TYPE):
if not os.path.exists(cfg_file):
#cfg_file = cfg.CONF.find_file(cfg_file)
print "File doesn't exists"
return False
##LOG.debug(_("Pipeline config file: %s"), cfg_file)
logging.debug("Pipeline config file: %s", cfg_file)
with open(cfg_file) as fap:
data = fap.read()
pipeline_cfg = yaml.safe_load(data)
##LOG.info(_("Pipeline config: %s"), pipeline_cfg)
logging.info("Pipeline config: %s", pipeline_cfg)
logging.debug("Pipeline config: %s", pipeline_cfg)
return PipelineManager(pipeline_cfg,
None, p_type)