blob: 17690f9b5ec53730d898e7efbb1c2adc7d4537b4 [file] [log] [blame]
rdudyalab086cf32016-08-11 00:07:45 -04001import abc
2import six
3import yaml
4import os
5import logging
6
7class PipelineException(Exception):
8 def __init__(self, message, pipeline_cfg):
9 self.msg = message
10 self.pipeline_cfg = pipeline_cfg
11
12 def __str__(self):
13 return 'Pipeline %s: %s' % (self.pipeline_cfg, self.msg)
14
15
16class Source(object):
17 """Represents a source of samples or events."""
18
19 def __init__(self, cfg):
20 self.cfg = cfg
21
22 try:
23 self.name = cfg['name']
24 self.sinks = cfg.get('sinks')
25 except KeyError as err:
26 raise PipelineException(
27 "Required field %s not specified" % err.args[0], cfg)
28
29 def __str__(self):
30 return self.name
31
32 def check_sinks(self, sinks):
33 if not self.sinks:
34 raise PipelineException(
35 "No sink defined in source %s" % self,
36 self.cfg)
37 for sink in self.sinks:
38 if sink not in sinks:
39 raise PipelineException(
40 "Dangling sink %s from source %s" % (sink, self),
41 self.cfg)
42 def check_source_filtering(self, data, d_type):
43 """Source data rules checking
44
45 - At least one meaningful datapoint exist
46 - Included type and excluded type can't co-exist on the same pipeline
47 - Included type meter and wildcard can't co-exist at same pipeline
48 """
49 if not data:
50 raise PipelineException('No %s specified' % d_type, self.cfg)
51
52 if ([x for x in data if x[0] not in '!*'] and
53 [x for x in data if x[0] == '!']):
54 raise PipelineException(
55 'Both included and excluded %s specified' % d_type,
56 self.cfg)
57
58 if '*' in data and [x for x in data if x[0] not in '!*']:
59 raise PipelineException(
60 'Included %s specified with wildcard' % d_type,
61 self.cfg)
62
63 @staticmethod
64 def is_supported(dataset, data_name):
65 # Support wildcard like storage.* and !disk.*
66 # Start with negation, we consider that the order is deny, allow
67 if any(fnmatch.fnmatch(data_name, datapoint[1:])
68 for datapoint in dataset if datapoint[0] == '!'):
69 return False
70
71 if any(fnmatch.fnmatch(data_name, datapoint)
72 for datapoint in dataset if datapoint[0] != '!'):
73 return True
74
75 # if we only have negation, we suppose the default is allow
76 return all(datapoint.startswith('!') for datapoint in dataset)
77
78
79@six.add_metaclass(abc.ABCMeta)
80class Pipeline(object):
81 """Represents a coupling between a sink and a corresponding source."""
82
83 def __init__(self, source, sink):
84 self.source = source
85 self.sink = sink
86 self.name = str(self)
87
88 def __str__(self):
89 return (self.source.name if self.source.name == self.sink.name
90 else '%s:%s' % (self.source.name, self.sink.name))
91
92
93class SamplePipeline(Pipeline):
94 """Represents a pipeline for Samples."""
95
96 def get_interval(self):
97 return self.source.interval
98
99class SampleSource(Source):
100 """Represents a source of samples.
101
102 In effect it is a set of pollsters and/or notification handlers emitting
103 samples for a set of matching meters. Each source encapsulates meter name
104 matching, polling interval determination, optional resource enumeration or
105 discovery, and mapping to one or more sinks for publication.
106 """
107
108 def __init__(self, cfg):
109 super(SampleSource, self).__init__(cfg)
110 try:
111 try:
112 self.interval = int(cfg['interval'])
113 except ValueError:
114 raise PipelineException("Invalid interval value", cfg)
115 # Support 'counters' for backward compatibility
116 self.meters = cfg.get('meters', cfg.get('counters'))
117 except KeyError as err:
118 raise PipelineException(
119 "Required field %s not specified" % err.args[0], cfg)
120 if self.interval <= 0:
121 raise PipelineException("Interval value should > 0", cfg)
122
123 self.resources = cfg.get('resources') or []
124 if not isinstance(self.resources, list):
125 raise PipelineException("Resources should be a list", cfg)
126
127 self.discovery = cfg.get('discovery') or []
128 if not isinstance(self.discovery, list):
129 raise PipelineException("Discovery should be a list", cfg)
130 self.check_source_filtering(self.meters, 'meters')
131
132 def support_meter(self, meter_name):
133 return self.is_supported(self.meters, meter_name)
134
135
136class Sink(object):
137
138 def __init__(self, cfg, transformer_manager):
139 self.cfg = cfg
140
141 try:
142 self.name = cfg['name']
143 # It's legal to have no transformer specified
144 self.transformer_cfg = cfg.get('transformers') or []
145 except KeyError as err:
146 raise PipelineException(
147 "Required field %s not specified" % err.args[0], cfg)
148
149 if not cfg.get('publishers'):
150 raise PipelineException("No publisher specified", cfg)
151
152
153
154class SampleSink(Sink):
155 def Testfun(self):
156 pass
157
158
159SAMPLE_TYPE = {'pipeline': SamplePipeline,
160 'source': SampleSource,
161 'sink': SampleSink}
162
163
164class PipelineManager(object):
165
166 def __init__(self, cfg, transformer_manager, p_type=SAMPLE_TYPE):
167 self.pipelines = []
168 if 'sources' in cfg or 'sinks' in cfg:
169 if not ('sources' in cfg and 'sinks' in cfg):
170 raise PipelineException("Both sources & sinks are required",
171 cfg)
172 #LOG.info(_('detected decoupled pipeline config format'))
173 logging.info("detected decoupled pipeline config format %s",cfg)
174 sources = [p_type['source'](s) for s in cfg.get('sources', [])]
175 sinks = {}
176 for s in cfg.get('sinks', []):
177 if s['name'] in sinks:
178 raise PipelineException("Duplicated sink names: %s" %
179 s['name'], self)
180 else:
181 sinks[s['name']] = p_type['sink'](s, transformer_manager)
182 for source in sources:
183 source.check_sinks(sinks)
184 for target in source.sinks:
185 pipe = p_type['pipeline'](source, sinks[target])
186 if pipe.name in [p.name for p in self.pipelines]:
187 raise PipelineException(
188 "Duplicate pipeline name: %s. Ensure pipeline"
189 " names are unique. (name is the source and sink"
190 " names combined)" % pipe.name, cfg)
191 else:
192 self.pipelines.append(pipe)
193 else:
194 #LOG.warning(_('detected deprecated pipeline config format'))
195 logging.warning("detected deprecated pipeline config format")
196 for pipedef in cfg:
197 source = p_type['source'](pipedef)
198 sink = p_type['sink'](pipedef, transformer_manager)
199 pipe = p_type['pipeline'](source, sink)
200 if pipe.name in [p.name for p in self.pipelines]:
201 raise PipelineException(
202 "Duplicate pipeline name: %s. Ensure pipeline"
203 " names are unique" % pipe.name, cfg)
204 else:
205 self.pipelines.append(pipe)
206
207
208def _setup_pipeline_manager(cfg_file, transformer_manager, p_type=SAMPLE_TYPE):
209 if not os.path.exists(cfg_file):
210 #cfg_file = cfg.CONF.find_file(cfg_file)
211 print "File doesn't exists"
212 return False
213
214 ##LOG.debug(_("Pipeline config file: %s"), cfg_file)
215 logging.debug("Pipeline config file: %s", cfg_file)
216
217 with open(cfg_file) as fap:
218 data = fap.read()
219
220 pipeline_cfg = yaml.safe_load(data)
221
222 ##LOG.info(_("Pipeline config: %s"), pipeline_cfg)
223 logging.info("Pipeline config: %s", pipeline_cfg)
224 logging.debug("Pipeline config: %s", pipeline_cfg)
225
226 return PipelineManager(pipeline_cfg,
227 None, p_type)
228
229