Matteo Scandolo | eb0d11c | 2017-08-08 13:05:26 -0700 | [diff] [blame] | 1 | |
| 2 | # Copyright 2017-present Open Networking Foundation |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | |
| 16 | |
rdudyala | b086cf3 | 2016-08-11 00:07:45 -0400 | [diff] [blame] | 17 | import abc |
| 18 | import six |
| 19 | import yaml |
| 20 | import os |
| 21 | import logging |
| 22 | |
class PipelineException(Exception):
    """Error raised when a pipeline definition is invalid.

    :param message: human-readable description of the problem.
    :param pipeline_cfg: the configuration object the problem was found in;
        it is embedded in the string form of the exception.
    """

    def __init__(self, message, pipeline_cfg):
        # Forward the arguments to Exception so that ``args``, ``repr()`` and
        # pickling carry the same information as ``str()``.
        super(PipelineException, self).__init__(message, pipeline_cfg)
        self.msg = message
        self.pipeline_cfg = pipeline_cfg

    def __str__(self):
        return 'Pipeline %s: %s' % (self.pipeline_cfg, self.msg)
| 30 | |
| 31 | |
class Source(object):
    """Represents a source of samples or events."""

    def __init__(self, cfg):
        """Store ``cfg`` and read the source name and sink list from it.

        :param cfg: mapping describing the source; ``name`` is required,
            ``sinks`` is optional at this point (see ``check_sinks``).
        :raises PipelineException: if ``name`` is missing from ``cfg``.
        """
        self.cfg = cfg

        try:
            self.name = cfg['name']
            self.sinks = cfg.get('sinks')
        except KeyError as err:
            raise PipelineException(
                "Required field %s not specified" % err.args[0], cfg)

    def __str__(self):
        return self.name

    def check_sinks(self, sinks):
        """Verify that every sink named by this source is known.

        :param sinks: container of known sink names (tested with ``in``).
        :raises PipelineException: if this source names no sinks, or names
            a sink that is not in ``sinks``.
        """
        if not self.sinks:
            raise PipelineException(
                "No sink defined in source %s" % self,
                self.cfg)
        for sink in self.sinks:
            if sink not in sinks:
                raise PipelineException(
                    "Dangling sink %s from source %s" % (sink, self),
                    self.cfg)

    def check_source_filtering(self, data, d_type):
        """Source data rules checking

        - At least one meaningful datapoint exist
        - Included type and excluded type can't co-exist on the same pipeline
        - Included type meter and wildcard can't co-exist at same pipeline

        :param data: sequence of datapoint patterns; a leading ``!`` marks
            an exclusion and a leading ``*`` a wildcard.
        :param d_type: label for the kind of data, used only in messages.
        :raises PipelineException: if any of the rules above is violated.
        """
        if not data:
            raise PipelineException('No %s specified' % d_type, self.cfg)

        # Plain inclusions are entries starting with neither '!' nor '*'.
        included = [x for x in data if x[0] not in '!*']
        excluded = [x for x in data if x[0] == '!']

        if included and excluded:
            raise PipelineException(
                'Both included and excluded %s specified' % d_type,
                self.cfg)

        if '*' in data and included:
            raise PipelineException(
                'Included %s specified with wildcard' % d_type,
                self.cfg)

    @staticmethod
    def is_supported(dataset, data_name):
        """Tell whether ``data_name`` is selected by the patterns in ``dataset``.

        :param dataset: iterable of glob patterns, optionally prefixed with
            ``!`` to negate them.
        :param data_name: the name to test.
        :returns: False if a negated pattern matches; True if a plain pattern
            matches; otherwise True only when every pattern is a negation.
        """
        # Imported locally: this function needs fnmatch, and nothing visible
        # at module level imports it.
        import fnmatch

        # Support wildcard like storage.* and !disk.*
        # Start with negation, we consider that the order is deny, allow
        if any(fnmatch.fnmatch(data_name, datapoint[1:])
               for datapoint in dataset if datapoint[0] == '!'):
            return False

        if any(fnmatch.fnmatch(data_name, datapoint)
               for datapoint in dataset if datapoint[0] != '!'):
            return True

        # if we only have negation, we suppose the default is allow
        return all(datapoint.startswith('!') for datapoint in dataset)
| 93 | |
| 94 | |
@six.add_metaclass(abc.ABCMeta)
class Pipeline(object):
    """Couples one source with the sink it feeds.

    The pipeline's ``name`` is derived from the names of its two ends and
    is computed once, at construction time.
    """

    def __init__(self, source, sink):
        self.source = source
        self.sink = sink
        self.name = str(self)

    def __str__(self):
        # A source and sink sharing a name collapse to that single name;
        # otherwise the two names are joined with a colon.
        source_name = self.source.name
        sink_name = self.sink.name
        if source_name == sink_name:
            return source_name
        return '%s:%s' % (source_name, sink_name)
| 107 | |
| 108 | |
class SamplePipeline(Pipeline):
    """Pipeline specialisation used for samples."""

    def get_interval(self):
        """Return the ``interval`` attribute of this pipeline's source."""
        source = self.source
        return source.interval
| 114 | |
class SampleSource(Source):
    """Represents a source of samples.

    In effect it is a set of pollsters and/or notification handlers emitting
    samples for a set of matching meters. Each source encapsulates meter name
    matching, polling interval determination, optional resource enumeration or
    discovery, and mapping to one or more sinks for publication.
    """

    def __init__(self, cfg):
        """Validate and store the sample-specific fields of ``cfg``.

        :param cfg: mapping describing the source. ``interval`` is required
            and must convert to a positive integer; ``meters`` (or the legacy
            ``counters``) must satisfy ``check_source_filtering``;
            ``resources`` and ``discovery`` are optional lists.
        :raises PipelineException: if a required field is missing or any
            field fails validation.
        """
        super(SampleSource, self).__init__(cfg)
        try:
            raw_interval = cfg['interval']
        except KeyError as err:
            raise PipelineException(
                "Required field %s not specified" % err.args[0], cfg)

        try:
            self.interval = int(raw_interval)
        except (TypeError, ValueError):
            # TypeError covers values such as None or a list, which int()
            # rejects with a different exception than unparsable strings.
            raise PipelineException("Invalid interval value", cfg)

        # Support 'counters' for backward compatibility
        self.meters = cfg.get('meters', cfg.get('counters'))

        if self.interval <= 0:
            raise PipelineException("Interval value should > 0", cfg)

        self.resources = cfg.get('resources') or []
        if not isinstance(self.resources, list):
            raise PipelineException("Resources should be a list", cfg)

        self.discovery = cfg.get('discovery') or []
        if not isinstance(self.discovery, list):
            raise PipelineException("Discovery should be a list", cfg)
        self.check_source_filtering(self.meters, 'meters')

    def support_meter(self, meter_name):
        """Return whether ``meter_name`` is selected by this source's meters."""
        return self.is_supported(self.meters, meter_name)
| 150 | |
| 151 | |
class Sink(object):
    """Holds the validated configuration of a single sink.

    ``name`` and at least one entry under ``publishers`` are mandatory;
    ``transformers`` is optional and defaults to an empty list.
    """

    def __init__(self, cfg, transformer_manager):
        # transformer_manager is accepted for interface compatibility; this
        # constructor does not use it.
        self.cfg = cfg

        try:
            sink_name = cfg['name']
        except KeyError as missing:
            reason = "Required field %s not specified" % missing.args[0]
            raise PipelineException(reason, cfg)
        self.name = sink_name

        # It's legal to have no transformer specified
        transformers = cfg.get('transformers')
        self.transformer_cfg = transformers if transformers else []

        publishers = cfg.get('publishers')
        if not publishers:
            raise PipelineException("No publisher specified", cfg)
| 167 | |
| 168 | |
| 169 | |
class SampleSink(Sink):
    """Sink specialisation used for sample pipelines."""

    def Testfun(self):
        """Do nothing and return None."""
        return None
| 173 | |
| 174 | |
# Maps each pipeline role to the class used to build it for samples.
SAMPLE_TYPE = dict(
    pipeline=SamplePipeline,
    source=SampleSource,
    sink=SampleSink,
)
| 178 | |
| 179 | |
class PipelineManager(object):
    """Builds the list of pipelines described by a parsed configuration.

    Two layouts are accepted: the decoupled one, a mapping with both
    ``sources`` and ``sinks`` keys, and the deprecated one, an iterable of
    combined definitions that each describe a source and its sink together.
    """

    def __init__(self, cfg, transformer_manager, p_type=SAMPLE_TYPE):
        """Create every pipeline defined in ``cfg``.

        :param cfg: the parsed pipeline configuration.
        :param transformer_manager: passed through to each sink constructor.
        :param p_type: mapping with ``pipeline``, ``source`` and ``sink``
            entries naming the classes to instantiate.
        :raises PipelineException: if only one of ``sources``/``sinks`` is
            present, a sink name is duplicated, or two pipelines end up with
            the same name. The source and sink constructors may raise it too.
        """
        self.pipelines = []
        # Names already registered, kept in a set for constant-time lookup.
        seen_names = set()

        if 'sources' in cfg or 'sinks' in cfg:
            if not ('sources' in cfg and 'sinks' in cfg):
                raise PipelineException("Both sources & sinks are required",
                                        cfg)
            logging.info("detected decoupled pipeline config format %s", cfg)
            sources = [p_type['source'](s) for s in cfg.get('sources', [])]
            sinks = {}
            for s in cfg.get('sinks', []):
                if s['name'] in sinks:
                    # Report against the configuration, like every other
                    # error raised here, rather than the manager object.
                    raise PipelineException("Duplicated sink names: %s" %
                                            s['name'], cfg)
                sinks[s['name']] = p_type['sink'](s, transformer_manager)
            for source in sources:
                source.check_sinks(sinks)
                for target in source.sinks:
                    pipe = p_type['pipeline'](source, sinks[target])
                    if pipe.name in seen_names:
                        raise PipelineException(
                            "Duplicate pipeline name: %s. Ensure pipeline"
                            " names are unique. (name is the source and sink"
                            " names combined)" % pipe.name, cfg)
                    seen_names.add(pipe.name)
                    self.pipelines.append(pipe)
        else:
            logging.warning("detected deprecated pipeline config format")
            for pipedef in cfg:
                # In the deprecated layout one definition carries both the
                # source and the sink fields.
                source = p_type['source'](pipedef)
                sink = p_type['sink'](pipedef, transformer_manager)
                pipe = p_type['pipeline'](source, sink)
                if pipe.name in seen_names:
                    raise PipelineException(
                        "Duplicate pipeline name: %s. Ensure pipeline"
                        " names are unique" % pipe.name, cfg)
                seen_names.add(pipe.name)
                self.pipelines.append(pipe)
| 222 | |
| 223 | |
def _setup_pipeline_manager(cfg_file, transformer_manager, p_type=SAMPLE_TYPE):
    """Load a YAML pipeline definition and build a PipelineManager from it.

    :param cfg_file: path of the YAML pipeline configuration file.
    :param transformer_manager: accepted for interface compatibility; see the
        review note at the end of the function body.
    :param p_type: mapping of pipeline roles to classes, passed through to
        PipelineManager.
    :returns: a PipelineManager, or False if ``cfg_file`` does not exist.
    """
    if not os.path.exists(cfg_file):
        # A single parenthesised argument keeps this valid on both Python 2
        # and Python 3; the bare print statement is a syntax error on 3.
        print("File doesn't exists")
        return False

    logging.debug("Pipeline config file: %s", cfg_file)

    with open(cfg_file) as fap:
        data = fap.read()

    pipeline_cfg = yaml.safe_load(data)

    logging.info("Pipeline config: %s", pipeline_cfg)
    logging.debug("Pipeline config: %s", pipeline_cfg)

    # NOTE(review): None is passed instead of transformer_manager, so the
    # argument is ignored here. Confirm whether this is intentional.
    return PipelineManager(pipeline_cfg,
                           None, p_type)
| 244 | |
| 245 | |