blob: 5783a066b9ff6597f4d6428e49f648b75db32607 [file] [log] [blame]
Matteo Scandoloeb0d11c2017-08-08 13:05:26 -07001
2# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16
import abc
import fnmatch
import logging
import os

import six
import yaml
22
class PipelineException(Exception):
    """Raised when a pipeline configuration is invalid or inconsistent."""

    def __init__(self, message, pipeline_cfg):
        # Keep both pieces so __str__ can show the offending configuration.
        self.msg = message
        self.pipeline_cfg = pipeline_cfg

    def __str__(self):
        return 'Pipeline {0}: {1}'.format(self.pipeline_cfg, self.msg)
30
31
class Source(object):
    """Represents a source of samples or events.

    A source is configured from a dict with a required 'name' and an
    optional 'sinks' list naming the sinks it publishes to.
    """

    def __init__(self, cfg):
        self.cfg = cfg

        try:
            self.name = cfg['name']
            # 'sinks' may legitimately be absent here; check_sinks()
            # raises later if it is never provided.
            self.sinks = cfg.get('sinks')
        except KeyError as err:
            raise PipelineException(
                "Required field %s not specified" % err.args[0], cfg)

    def __str__(self):
        return self.name

    def check_sinks(self, sinks):
        """Validate that every sink named by this source exists in *sinks*.

        :param sinks: mapping of sink name -> sink object
        :raises PipelineException: if no sinks are defined, or a named
            sink is not present in *sinks*.
        """
        if not self.sinks:
            raise PipelineException(
                "No sink defined in source %s" % self,
                self.cfg)
        for sink in self.sinks:
            if sink not in sinks:
                raise PipelineException(
                    "Dangling sink %s from source %s" % (sink, self),
                    self.cfg)

    def check_source_filtering(self, data, d_type):
        """Source data rules checking

        - At least one meaningful datapoint exist
        - Included type and excluded type can't co-exist on the same pipeline
        - Included type meter and wildcard can't co-exist at same pipeline
        """
        if not data:
            raise PipelineException('No %s specified' % d_type, self.cfg)

        if ([x for x in data if x[0] not in '!*'] and
                [x for x in data if x[0] == '!']):
            raise PipelineException(
                'Both included and excluded %s specified' % d_type,
                self.cfg)

        if '*' in data and [x for x in data if x[0] not in '!*']:
            raise PipelineException(
                'Included %s specified with wildcard' % d_type,
                self.cfg)

    @staticmethod
    def is_supported(dataset, data_name):
        """Return True if *data_name* is admitted by the *dataset* rules.

        Supports wildcards like storage.* and exclusions like !disk.*.
        Exclusions are evaluated first (deny before allow); requires the
        module-level ``import fnmatch``.
        """
        if any(fnmatch.fnmatch(data_name, datapoint[1:])
               for datapoint in dataset if datapoint[0] == '!'):
            return False

        if any(fnmatch.fnmatch(data_name, datapoint)
               for datapoint in dataset if datapoint[0] != '!'):
            return True

        # if we only have negation, we suppose the default is allow
        return all(datapoint.startswith('!') for datapoint in dataset)
93
94
@six.add_metaclass(abc.ABCMeta)
class Pipeline(object):
    """Represents a coupling between a sink and a corresponding source.

    The pipeline name is the source name alone when source and sink share
    a name, otherwise "<source>:<sink>".
    """

    def __init__(self, source, sink):
        self.source = source
        self.sink = sink
        self.name = str(self)

    def __str__(self):
        if self.source.name == self.sink.name:
            return self.source.name
        return '%s:%s' % (self.source.name, self.sink.name)
107
108
class SamplePipeline(Pipeline):
    """Pipeline specialization for Samples."""

    def get_interval(self):
        """Return the polling interval of the attached source."""
        return self.source.interval
114
class SampleSource(Source):
    """Represents a source of samples.

    In effect it is a set of pollsters and/or notification handlers emitting
    samples for a set of matching meters. Each source encapsulates meter name
    matching, polling interval determination, optional resource enumeration or
    discovery, and mapping to one or more sinks for publication.
    """

    def __init__(self, cfg):
        super(SampleSource, self).__init__(cfg)
        try:
            try:
                self.interval = int(cfg['interval'])
            except (ValueError, TypeError):
                # TypeError covers values int() cannot cast at all
                # (e.g. a list or None); report it the same way as a
                # non-numeric string instead of crashing.
                raise PipelineException("Invalid interval value", cfg)
            # Support 'counters' for backward compatibility
            self.meters = cfg.get('meters', cfg.get('counters'))
        except KeyError as err:
            raise PipelineException(
                "Required field %s not specified" % err.args[0], cfg)
        if self.interval <= 0:
            raise PipelineException("Interval value should > 0", cfg)

        # Missing/None 'resources' and 'discovery' normalize to [].
        self.resources = cfg.get('resources') or []
        if not isinstance(self.resources, list):
            raise PipelineException("Resources should be a list", cfg)

        self.discovery = cfg.get('discovery') or []
        if not isinstance(self.discovery, list):
            raise PipelineException("Discovery should be a list", cfg)

        self.check_source_filtering(self.meters, 'meters')

    def support_meter(self, meter_name):
        """Return True if *meter_name* matches this source's meter rules."""
        return self.is_supported(self.meters, meter_name)
150
151
class Sink(object):
    """A named publication endpoint configured from a dict.

    Requires 'name' and a non-empty 'publishers' list; 'transformers'
    is optional and normalized to an empty list when absent.
    """

    def __init__(self, cfg, transformer_manager):
        # Keep the raw configuration around for error reporting.
        self.cfg = cfg

        try:
            name = cfg['name']
        except KeyError as err:
            raise PipelineException(
                "Required field %s not specified" % err.args[0], cfg)
        self.name = name

        # It's legal to have no transformer specified
        self.transformer_cfg = cfg.get('transformers') or []

        if not cfg.get('publishers'):
            raise PipelineException("No publisher specified", cfg)
167
168
169
class SampleSink(Sink):
    """Sink specialization for sample pipelines."""

    def Testfun(self):
        """Placeholder hook; intentionally does nothing."""
        pass
173
174
# Default factory mapping used by PipelineManager to build sample pipelines.
SAMPLE_TYPE = {
    'pipeline': SamplePipeline,
    'source': SampleSource,
    'sink': SampleSink,
}
178
179
class PipelineManager(object):
    """Builds Pipeline objects from a pipeline configuration.

    Accepts either the decoupled format (a dict with 'sources' and 'sinks'
    keys) or the deprecated format (a list where each entry defines both a
    source and a sink). Built pipelines are collected in ``self.pipelines``.
    """

    def __init__(self, cfg, transformer_manager, p_type=SAMPLE_TYPE):
        """Build pipelines from *cfg*.

        :param cfg: parsed pipeline configuration (dict or list)
        :param transformer_manager: passed through to each sink constructor
        :param p_type: factory mapping with 'pipeline'/'source'/'sink' keys
        :raises PipelineException: on any structural problem in *cfg*
        """
        self.pipelines = []
        if 'sources' in cfg or 'sinks' in cfg:
            if not ('sources' in cfg and 'sinks' in cfg):
                raise PipelineException("Both sources & sinks are required",
                                        cfg)
            logging.info("detected decoupled pipeline config format %s", cfg)
            sources = [p_type['source'](s) for s in cfg.get('sources', [])]
            sinks = {}
            for s in cfg.get('sinks', []):
                if s['name'] in sinks:
                    # Pass cfg (not self) so the exception message shows
                    # the offending configuration, consistent with every
                    # other raise in this class.
                    raise PipelineException("Duplicated sink names: %s" %
                                            s['name'], cfg)
                sinks[s['name']] = p_type['sink'](s, transformer_manager)
            # Track seen names in a set instead of rescanning the
            # pipeline list on every iteration.
            seen_names = set()
            for source in sources:
                source.check_sinks(sinks)
                for target in source.sinks:
                    pipe = p_type['pipeline'](source, sinks[target])
                    if pipe.name in seen_names:
                        raise PipelineException(
                            "Duplicate pipeline name: %s. Ensure pipeline"
                            " names are unique. (name is the source and sink"
                            " names combined)" % pipe.name, cfg)
                    seen_names.add(pipe.name)
                    self.pipelines.append(pipe)
        else:
            logging.warning("detected deprecated pipeline config format")
            seen_names = set()
            for pipedef in cfg:
                source = p_type['source'](pipedef)
                sink = p_type['sink'](pipedef, transformer_manager)
                pipe = p_type['pipeline'](source, sink)
                if pipe.name in seen_names:
                    raise PipelineException(
                        "Duplicate pipeline name: %s. Ensure pipeline"
                        " names are unique" % pipe.name, cfg)
                seen_names.add(pipe.name)
                self.pipelines.append(pipe)
222
223
def _setup_pipeline_manager(cfg_file, transformer_manager, p_type=SAMPLE_TYPE):
    """Load a YAML pipeline definition file and build a PipelineManager.

    :param cfg_file: path to the pipeline YAML file
    :param transformer_manager: forwarded to PipelineManager/sinks
    :param p_type: factory mapping, defaults to SAMPLE_TYPE
    :returns: a PipelineManager, or False if *cfg_file* does not exist
    """
    if not os.path.exists(cfg_file):
        # Was a Python-2 `print` statement (a syntax error under Python 3);
        # use the module's logging convention instead.
        logging.error("Pipeline config file %s does not exist", cfg_file)
        return False

    logging.debug("Pipeline config file: %s", cfg_file)

    with open(cfg_file) as fap:
        data = fap.read()

    # safe_load: never evaluate arbitrary YAML tags from a config file.
    pipeline_cfg = yaml.safe_load(data)

    logging.info("Pipeline config: %s", pipeline_cfg)

    # Forward the caller's transformer_manager instead of discarding it
    # (the original hard-coded None here).
    return PipelineManager(pipeline_cfg, transformer_manager, p_type)
244
245