import abc
import fnmatch
import logging
import os

import six
import yaml


class PipelineException(Exception):
    def __init__(self, message, pipeline_cfg):
        self.msg = message
        self.pipeline_cfg = pipeline_cfg

    def __str__(self):
        return 'Pipeline %s: %s' % (self.pipeline_cfg, self.msg)


class Source(object):
    """Represents a source of samples or events."""

    def __init__(self, cfg):
        self.cfg = cfg

        try:
            self.name = cfg['name']
            self.sinks = cfg.get('sinks')
        except KeyError as err:
            raise PipelineException(
                "Required field %s not specified" % err.args[0], cfg)

    def __str__(self):
        return self.name

    def check_sinks(self, sinks):
        if not self.sinks:
            raise PipelineException(
                "No sink defined in source %s" % self,
                self.cfg)
        for sink in self.sinks:
            if sink not in sinks:
                raise PipelineException(
                    "Dangling sink %s from source %s" % (sink, self),
                    self.cfg)

    def check_source_filtering(self, data, d_type):
        """Source data rules checking:

        - At least one meaningful datapoint exists
        - Included and excluded types can't co-exist in the same pipeline
        - An included meter and a wildcard can't co-exist in the same pipeline
        """
        if not data:
            raise PipelineException('No %s specified' % d_type, self.cfg)

        if ([x for x in data if x[0] not in '!*'] and
                [x for x in data if x[0] == '!']):
            raise PipelineException(
                'Both included and excluded %s specified' % d_type,
                self.cfg)

        if '*' in data and [x for x in data if x[0] not in '!*']:
            raise PipelineException(
                'Included %s specified with wildcard' % d_type,
                self.cfg)
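
    # Illustrative checks (hypothetical meter lists, not from the original
    # source):
    #   ['cpu', 'disk.*']   -> passes
    #   ['cpu', '!disk.*']  -> raises: included and excluded entries mixed
    #   ['*', 'cpu']        -> raises: wildcard plus explicit inclusion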

    @staticmethod
    def is_supported(dataset, data_name):
        # Support wildcards like storage.* and !disk.*
        # Negations are checked first; the order is deny, then allow
        if any(fnmatch.fnmatch(data_name, datapoint[1:])
               for datapoint in dataset if datapoint[0] == '!'):
            return False

        if any(fnmatch.fnmatch(data_name, datapoint)
               for datapoint in dataset if datapoint[0] != '!'):
            return True

        # If we only have negations, we assume the default is allow
        return all(datapoint.startswith('!') for datapoint in dataset)
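
    # Sketch of the deny/allow semantics above (illustrative values, not
    # asserted by the original source):
    #   is_supported(['storage.*'], 'storage.objects')  -> True   (allowed)
    #   is_supported(['!disk.*'], 'disk.read.bytes')    -> False  (denied)
    #   is_supported(['!disk.*'], 'cpu')                -> True   (deny-only
    #       dataset falls through to the default allow)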


@six.add_metaclass(abc.ABCMeta)
class Pipeline(object):
    """Represents a coupling between a sink and a corresponding source."""

    def __init__(self, source, sink):
        self.source = source
        self.sink = sink
        self.name = str(self)

    def __str__(self):
        return (self.source.name if self.source.name == self.sink.name
                else '%s:%s' % (self.source.name, self.sink.name))


class SamplePipeline(Pipeline):
    """Represents a pipeline for Samples."""

    def get_interval(self):
        return self.source.interval


class SampleSource(Source):
    """Represents a source of samples.

    In effect it is a set of pollsters and/or notification handlers emitting
    samples for a set of matching meters. Each source encapsulates meter name
    matching, polling interval determination, optional resource enumeration or
    discovery, and mapping to one or more sinks for publication.
    """
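
    # A sketch of the decoupled-format entry this class parses (names and
    # values are placeholders, not from the original source):
    #
    #   - name: meter_source
    #     interval: 600
    #     meters: ['*']
    #     sinks: ['meter_sink']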

    def __init__(self, cfg):
        super(SampleSource, self).__init__(cfg)
        try:
            try:
                self.interval = int(cfg['interval'])
            except ValueError:
                raise PipelineException("Invalid interval value", cfg)
            # Support 'counters' for backward compatibility
            self.meters = cfg.get('meters', cfg.get('counters'))
        except KeyError as err:
            raise PipelineException(
                "Required field %s not specified" % err.args[0], cfg)
        if self.interval <= 0:
            raise PipelineException("Interval value should be > 0", cfg)

        self.resources = cfg.get('resources') or []
        if not isinstance(self.resources, list):
            raise PipelineException("Resources should be a list", cfg)

        self.discovery = cfg.get('discovery') or []
        if not isinstance(self.discovery, list):
            raise PipelineException("Discovery should be a list", cfg)
        self.check_source_filtering(self.meters, 'meters')

    def support_meter(self, meter_name):
        return self.is_supported(self.meters, meter_name)


class Sink(object):

    def __init__(self, cfg, transformer_manager):
        self.cfg = cfg

        try:
            self.name = cfg['name']
            # It's legal to have no transformer specified
            self.transformer_cfg = cfg.get('transformers') or []
        except KeyError as err:
            raise PipelineException(
                "Required field %s not specified" % err.args[0], cfg)

        if not cfg.get('publishers'):
            raise PipelineException("No publisher specified", cfg)


class SampleSink(Sink):
    def Testfun(self):
        pass


SAMPLE_TYPE = {'pipeline': SamplePipeline,
               'source': SampleSource,
               'sink': SampleSink}


class PipelineManager(object):

    def __init__(self, cfg, transformer_manager, p_type=SAMPLE_TYPE):
        self.pipelines = []
        if 'sources' in cfg or 'sinks' in cfg:
            if not ('sources' in cfg and 'sinks' in cfg):
                raise PipelineException("Both sources & sinks are required",
                                        cfg)
            logging.info("Detected decoupled pipeline config format: %s", cfg)
            sources = [p_type['source'](s) for s in cfg.get('sources', [])]
            sinks = {}
            for s in cfg.get('sinks', []):
                if s['name'] in sinks:
                    raise PipelineException("Duplicated sink names: %s" %
                                            s['name'], self)
                else:
                    sinks[s['name']] = p_type['sink'](s, transformer_manager)
            for source in sources:
                source.check_sinks(sinks)
                for target in source.sinks:
                    pipe = p_type['pipeline'](source, sinks[target])
                    if pipe.name in [p.name for p in self.pipelines]:
                        raise PipelineException(
                            "Duplicate pipeline name: %s. Ensure pipeline"
                            " names are unique. (name is the source and sink"
                            " names combined)" % pipe.name, cfg)
                    else:
                        self.pipelines.append(pipe)
        else:
            logging.warning("Detected deprecated pipeline config format")
            for pipedef in cfg:
                source = p_type['source'](pipedef)
                sink = p_type['sink'](pipedef, transformer_manager)
                pipe = p_type['pipeline'](source, sink)
                if pipe.name in [p.name for p in self.pipelines]:
                    raise PipelineException(
                        "Duplicate pipeline name: %s. Ensure pipeline"
                        " names are unique" % pipe.name, cfg)
                else:
                    self.pipelines.append(pipe)
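
    # For reference, the deprecated coupled format handled by the else-branch
    # above is a flat list of pipeline definitions, e.g. (sketch; names and
    # values are placeholders):
    #
    #   - name: meter_pipeline
    #     interval: 600
    #     meters: ['*']
    #     transformers: []
    #     publishers: ['notifier://']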


def _setup_pipeline_manager(cfg_file, transformer_manager, p_type=SAMPLE_TYPE):
    if not os.path.exists(cfg_file):
        logging.error("Pipeline config file %s does not exist", cfg_file)
        return False
    logging.debug("Pipeline config file: %s", cfg_file)

    with open(cfg_file) as fap:
        data = fap.read()

    pipeline_cfg = yaml.safe_load(data)

    logging.info("Pipeline config: %s", pipeline_cfg)

    return PipelineManager(pipeline_cfg, transformer_manager, p_type)
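

# Minimal usage sketch: load a pipeline definition and list the resulting
# pipelines. The 'pipeline.yaml' path and the absent (None) transformer
# manager are placeholder assumptions, not part of the original module.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)
    manager = _setup_pipeline_manager('pipeline.yaml', None)
    if manager:
        for pipe in manager.pipelines:
            logging.info("loaded pipeline: %s", pipe.name)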