Adding CORD specific ceilometer changes to monitoring repository
- ceilometer custom notification plugins for ONOS, vSG, vOLT and Infra layers
- ceilometer publish/subscribe module
- ceilometer dynamic pipeline config module
- ceilometer UDP proxy
- ceilometer Custom Image(ceilometer -v2 -v3 versions,kafka_installer,startup scripts)
Change-Id: Ie2ab8ce89cdadbd1fb4dc54ee15e46f8cc8c4c18
diff --git a/xos/synchronizer/ceilometer/pipeline_agent_module/README b/xos/synchronizer/ceilometer/pipeline_agent_module/README
new file mode 100644
index 0000000..a37ccc3
--- /dev/null
+++ b/xos/synchronizer/ceilometer/pipeline_agent_module/README
@@ -0,0 +1,41 @@
+Dynamic Pipeline-Agent Module:
+1.Packages :
+ pika
+ yaml
+ subprocess
+ logging
+ operator
+ json
+ ConfigParser
+
+ package can be installed using the command:
+ -> pip install pika
+ Remaing packages will come by default with OS package or can be installed
+ using command
+ -> sudo apt-get <package-name>
+
+2.Files:
+a. utils.py: Consists of utility function for parsing and updating pipeline.yaml
+b. pipeline.yaml:Sample pipeline.yaml file with minimum source and sink information tags,
+c. pipeline.py: Does validation of pipeline.yaml configuration.
+d. pipeline_agent.py: Main file of the module while will listen on Rabbitmq exchange "pubsub"
+e. pipeline_agent.conf : Conf file should consist of the following information:
+ i.Rabbitmq server datails(host,port,username,passwd)
+ ii.LOGGING info(logging level,file name)
+ iii.Ceilometer services needed to be restarted after pipeline.yaml changes.
+f. README
+
+3.To run the module:
+ ->sudo python pipeline_agent.py
+
+4.Format to send conf msg to the module:
+ i.For updating conf :
+ -> msg={"sub_info":sub_info,"target":target,"action":"ADD"}
+ ii.for deleting conf :
+ -> msg={"sub_info":sub_info,"target":target,"action":"DEL"}
+
+ The above two msgs should be in json fomrat and should send to same rabbitmq-server where pipeline_agent.py is running
+ with "pubsub" exchage.
+ ex:
+ sub_info = ["cpu_util", "memory"]
+ target = "kafka://1.2.3.2:18"
diff --git a/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline.py b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline.py
new file mode 100644
index 0000000..17690f9
--- /dev/null
+++ b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline.py
@@ -0,0 +1,229 @@
+import abc
+import six
+import yaml
+import os
+import logging
+
+class PipelineException(Exception):
+ def __init__(self, message, pipeline_cfg):
+ self.msg = message
+ self.pipeline_cfg = pipeline_cfg
+
+ def __str__(self):
+ return 'Pipeline %s: %s' % (self.pipeline_cfg, self.msg)
+
+
+class Source(object):
+ """Represents a source of samples or events."""
+
+ def __init__(self, cfg):
+ self.cfg = cfg
+
+ try:
+ self.name = cfg['name']
+ self.sinks = cfg.get('sinks')
+ except KeyError as err:
+ raise PipelineException(
+ "Required field %s not specified" % err.args[0], cfg)
+
+ def __str__(self):
+ return self.name
+
+ def check_sinks(self, sinks):
+ if not self.sinks:
+ raise PipelineException(
+ "No sink defined in source %s" % self,
+ self.cfg)
+ for sink in self.sinks:
+ if sink not in sinks:
+ raise PipelineException(
+ "Dangling sink %s from source %s" % (sink, self),
+ self.cfg)
+ def check_source_filtering(self, data, d_type):
+ """Source data rules checking
+
+ - At least one meaningful datapoint exist
+ - Included type and excluded type can't co-exist on the same pipeline
+ - Included type meter and wildcard can't co-exist at same pipeline
+ """
+ if not data:
+ raise PipelineException('No %s specified' % d_type, self.cfg)
+
+ if ([x for x in data if x[0] not in '!*'] and
+ [x for x in data if x[0] == '!']):
+ raise PipelineException(
+ 'Both included and excluded %s specified' % d_type,
+ self.cfg)
+
+ if '*' in data and [x for x in data if x[0] not in '!*']:
+ raise PipelineException(
+ 'Included %s specified with wildcard' % d_type,
+ self.cfg)
+
+ @staticmethod
+ def is_supported(dataset, data_name):
+ # Support wildcard like storage.* and !disk.*
+ # Start with negation, we consider that the order is deny, allow
+ if any(fnmatch.fnmatch(data_name, datapoint[1:])
+ for datapoint in dataset if datapoint[0] == '!'):
+ return False
+
+ if any(fnmatch.fnmatch(data_name, datapoint)
+ for datapoint in dataset if datapoint[0] != '!'):
+ return True
+
+ # if we only have negation, we suppose the default is allow
+ return all(datapoint.startswith('!') for datapoint in dataset)
+
+
+@six.add_metaclass(abc.ABCMeta)
+class Pipeline(object):
+ """Represents a coupling between a sink and a corresponding source."""
+
+ def __init__(self, source, sink):
+ self.source = source
+ self.sink = sink
+ self.name = str(self)
+
+ def __str__(self):
+ return (self.source.name if self.source.name == self.sink.name
+ else '%s:%s' % (self.source.name, self.sink.name))
+
+
+class SamplePipeline(Pipeline):
+ """Represents a pipeline for Samples."""
+
+ def get_interval(self):
+ return self.source.interval
+
+class SampleSource(Source):
+ """Represents a source of samples.
+
+ In effect it is a set of pollsters and/or notification handlers emitting
+ samples for a set of matching meters. Each source encapsulates meter name
+ matching, polling interval determination, optional resource enumeration or
+ discovery, and mapping to one or more sinks for publication.
+ """
+
+ def __init__(self, cfg):
+ super(SampleSource, self).__init__(cfg)
+ try:
+ try:
+ self.interval = int(cfg['interval'])
+ except ValueError:
+ raise PipelineException("Invalid interval value", cfg)
+ # Support 'counters' for backward compatibility
+ self.meters = cfg.get('meters', cfg.get('counters'))
+ except KeyError as err:
+ raise PipelineException(
+ "Required field %s not specified" % err.args[0], cfg)
+ if self.interval <= 0:
+ raise PipelineException("Interval value should > 0", cfg)
+
+ self.resources = cfg.get('resources') or []
+ if not isinstance(self.resources, list):
+ raise PipelineException("Resources should be a list", cfg)
+
+ self.discovery = cfg.get('discovery') or []
+ if not isinstance(self.discovery, list):
+ raise PipelineException("Discovery should be a list", cfg)
+ self.check_source_filtering(self.meters, 'meters')
+
+ def support_meter(self, meter_name):
+ return self.is_supported(self.meters, meter_name)
+
+
+class Sink(object):
+
+ def __init__(self, cfg, transformer_manager):
+ self.cfg = cfg
+
+ try:
+ self.name = cfg['name']
+ # It's legal to have no transformer specified
+ self.transformer_cfg = cfg.get('transformers') or []
+ except KeyError as err:
+ raise PipelineException(
+ "Required field %s not specified" % err.args[0], cfg)
+
+ if not cfg.get('publishers'):
+ raise PipelineException("No publisher specified", cfg)
+
+
+
+class SampleSink(Sink):
+ def Testfun(self):
+ pass
+
+
+SAMPLE_TYPE = {'pipeline': SamplePipeline,
+ 'source': SampleSource,
+ 'sink': SampleSink}
+
+
+class PipelineManager(object):
+
+ def __init__(self, cfg, transformer_manager, p_type=SAMPLE_TYPE):
+ self.pipelines = []
+ if 'sources' in cfg or 'sinks' in cfg:
+ if not ('sources' in cfg and 'sinks' in cfg):
+ raise PipelineException("Both sources & sinks are required",
+ cfg)
+ #LOG.info(_('detected decoupled pipeline config format'))
+ logging.info("detected decoupled pipeline config format %s",cfg)
+ sources = [p_type['source'](s) for s in cfg.get('sources', [])]
+ sinks = {}
+ for s in cfg.get('sinks', []):
+ if s['name'] in sinks:
+ raise PipelineException("Duplicated sink names: %s" %
+ s['name'], self)
+ else:
+ sinks[s['name']] = p_type['sink'](s, transformer_manager)
+ for source in sources:
+ source.check_sinks(sinks)
+ for target in source.sinks:
+ pipe = p_type['pipeline'](source, sinks[target])
+ if pipe.name in [p.name for p in self.pipelines]:
+ raise PipelineException(
+ "Duplicate pipeline name: %s. Ensure pipeline"
+ " names are unique. (name is the source and sink"
+ " names combined)" % pipe.name, cfg)
+ else:
+ self.pipelines.append(pipe)
+ else:
+ #LOG.warning(_('detected deprecated pipeline config format'))
+ logging.warning("detected deprecated pipeline config format")
+ for pipedef in cfg:
+ source = p_type['source'](pipedef)
+ sink = p_type['sink'](pipedef, transformer_manager)
+ pipe = p_type['pipeline'](source, sink)
+ if pipe.name in [p.name for p in self.pipelines]:
+ raise PipelineException(
+ "Duplicate pipeline name: %s. Ensure pipeline"
+ " names are unique" % pipe.name, cfg)
+ else:
+ self.pipelines.append(pipe)
+
+
+def _setup_pipeline_manager(cfg_file, transformer_manager, p_type=SAMPLE_TYPE):
+ if not os.path.exists(cfg_file):
+ #cfg_file = cfg.CONF.find_file(cfg_file)
+ print "File doesn't exists"
+ return False
+
+ ##LOG.debug(_("Pipeline config file: %s"), cfg_file)
+ logging.debug("Pipeline config file: %s", cfg_file)
+
+ with open(cfg_file) as fap:
+ data = fap.read()
+
+ pipeline_cfg = yaml.safe_load(data)
+
+ ##LOG.info(_("Pipeline config: %s"), pipeline_cfg)
+ logging.info("Pipeline config: %s", pipeline_cfg)
+ logging.debug("Pipeline config: %s", pipeline_cfg)
+
+ return PipelineManager(pipeline_cfg,
+ None, p_type)
+
+
diff --git a/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline.yaml b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline.yaml
new file mode 100644
index 0000000..505471a
--- /dev/null
+++ b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline.yaml
@@ -0,0 +1,13 @@
+---
+sources:
+ - name: meter_source
+ interval: 600
+ meters:
+ - "*"
+ sinks:
+ - meter_sink
+sinks:
+ - name: meter_sink
+ transformers:
+ publishers:
+ - notifier://
diff --git a/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.conf b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.conf
new file mode 100644
index 0000000..9d811f6
--- /dev/null
+++ b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.conf
@@ -0,0 +1,32 @@
+#[LOGGING]
+#level = DEBUG
+#filename = pipeline_agent.log
+
+[RABBITMQ]
+Rabbitmq_username = openstack
+Rabbitmq_passwd = 4815196be370811224fe
+Rabbitmq_host = 10.11.10.1
+Rabbitmq_port = 5672
+Ceilometer_service = ceilometer-agent-central,ceilometer-alarm-evaluator,ceilometer-api
+
+[loggers]
+keys=root
+
+[handlers]
+keys=logfile
+
+[formatters]
+keys=logfileformatter
+
+[logger_root]
+level=INFO
+handlers=logfile
+
+[formatter_logfileformatter]
+format='%(asctime)s %(filename)s %(levelname)s %(message)s'
+
+[handler_logfile]
+class=handlers.RotatingFileHandler
+level=NOTSET
+args=('pipeline_agent.log','a',1000000,100)
+formatter=logfileformatter
diff --git a/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.log b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.log
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.log
diff --git a/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.py b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.py
new file mode 100644
index 0000000..a751b8e
--- /dev/null
+++ b/xos/synchronizer/ceilometer/pipeline_agent_module/pipeline_agent.py
@@ -0,0 +1,208 @@
+import pika
+import yaml
+import subprocess
+import logging
+import logging.config
+import operator
+import json
+import ConfigParser
+import pipeline
+import utils
+#from ceilometer import pipeline
+from collections import OrderedDict
+
+
+class UnsortableList(list):
+ def sort(self, *args, **kwargs):
+ pass
+
+class UnsortableOrderedDict(OrderedDict):
+ def items(self, *args, **kwargs):
+ return UnsortableList(OrderedDict.items(self, *args, **kwargs))
+
+#yaml.add_representer(UnsortableOrderedDict, yaml.representer.SafeRepresenter.represent_dict)
+
+
+tmp_pipeline_conf = "/tmp/pipeline.yaml"
+
+'''
+LEVELS = {'DEBUG': logging.DEBUG,
+ 'INFO': logging.INFO,
+ 'WARNING': logging.WARNING,
+ 'ERROR': logging.ERROR,
+ 'CRITICAL': logging.CRITICAL}
+
+_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
+'''
+def get_source_info(meter):
+ sink_name = meter + "_sink"
+ meter_name = meter+"_name"
+ source_info = {'interval': 6,'meters': [meter],'name': meter_name,'sinks':[sink_name]}
+ logging.debug("* new source_info :%s",source_info)
+ return (source_info,sink_name)
+
+def get_sink_info(meter,sink_name,target):
+ sink_info = {'publishers':['notifier://',target],'transformers':None ,'name': sink_name}
+ logging.debug("* new source_info :%s",sink_info)
+ return sink_info
+
+def restart_ceilometer_services():
+ try :
+ config = ConfigParser.ConfigParser()
+ config.read('pipeline_agent.conf')
+ services = config.get('RABBITMQ','Ceilometer_service')
+ service = services.split(",")
+ except Exception as e:
+ logging.error("* Error in confing file:%s",e.__str__())
+ return False
+ else :
+ for service_name in service:
+ command = ['service',service_name, 'restart'];
+ logging.debug("Executing: %s command",command)
+ #shell=FALSE for sudo to work.
+ try :
+ subprocess.call(command, shell=False)
+ except Exception as e:
+ logging.error("* %s command execution failed with error %s",command,e.__str__())
+ return False
+ return True
+
+def check_meter_with_pipeline_cfg(pipeline_cfg_file,meter=None,target=None):
+ #import pdb;pdb.set_trace()
+ try :
+ pipeline._setup_pipeline_manager(pipeline_cfg_file,None)
+ except Exception as e:
+ logging.error ("Got Exception: %s",e.__str__())
+ return False
+ return True
+
+
+def callback(ch, method, properties, msg):
+ logging.debug(" [x] Received %r",msg)
+ #import pdb; pdb.set_trace()
+ #yaml.add_representer(UnsortableOrderedDict, yaml.representer.SafeRepresenter.represent_dict)
+ orig_pipeline_conf = "/etc/ceilometer/pipeline.yaml"
+ with open (orig_pipeline_conf, 'r') as fap:
+ data = fap.read()
+ pipeline_cfg = yaml.safe_load(data)
+ logging.debug("Pipeline config: %s", pipeline_cfg)
+
+ try :
+ json_msg = json.loads(msg)
+ meter = json_msg['sub_info']
+ publisher = json_msg['target']
+ flag = json_msg['action']
+ update_status = []
+ if type(meter) is list:
+ logging.debug("Metere is a list ... Need to handle it ")
+ for meter_info in meter :
+ update_status.append(update_pipeline_yaml(meter_info,publisher,flag))
+ else :
+ update_status.append(update_pipeline_yaml(meter,publisher,flag))
+
+ if reduce(operator.or_, update_status):
+ if not restart_ceilometer_services():
+ logging.error("Error in restarting ceilometer services")
+ return False
+ except Exception as e :
+ logging.error("Got exception:%s in parsing message",e.__str__())
+ return False
+
+
+
+
+
+def update_pipeline_yaml(meter,publisher,flag):
+ logging.debug("meter name:%s",meter)
+ logging.debug("publisher or target name:%s",publisher)
+
+ orig_pipeline_conf = "/etc/ceilometer/pipeline.yaml"
+ ''' Parsing orginal pipeline yaml file '''
+ try :
+ with open (orig_pipeline_conf, 'r') as fap:
+ data = fap.read()
+ pipeline_cfg = yaml.safe_load(data)
+ logging.debug("Pipeline config: %s", pipeline_cfg)
+
+ ''' Chcking parsing errors '''
+
+ if not check_meter_with_pipeline_cfg(orig_pipeline_conf) :
+ logging.error("Original pipeline.yaml parsing failed")
+ return False
+ else :
+ status = None
+ if flag == "ADD" :
+ status = utils.update_conf_to_pipe_line_cfg(meter,publisher,pipeline_cfg)
+ elif flag == "DEL" :
+ status = utils.delete_conf_from_pipe_line_cfg(meter,publisher,pipeline_cfg)
+
+ if status == True :
+ tmp_pipeline_conf = "/tmp/pipeline.yaml"
+ with open(tmp_pipeline_conf, "w") as f:
+ yaml.safe_dump( pipeline_cfg, f ,default_flow_style=False)
+ if check_meter_with_pipeline_cfg(tmp_pipeline_conf,meter,publisher) :
+ logging.debug("Tmp pipeline.yaml parsed sucessfully,coping it as orig")
+ with open(orig_pipeline_conf, "w") as f:
+ yaml.safe_dump( pipeline_cfg, f ,default_flow_style=False)
+ return True
+ else :
+ logging.info("Retaining original conf,as update meter info has errors")
+ return False
+ except Exception as e:
+ logging.error("* Error in confing file:%s",e.__str__())
+ return False
+
+
+def msg_queue_listner():
+
+ try:
+ config = ConfigParser.ConfigParser()
+ config.read('pipeline_agent.conf')
+ rabbitmq_username = config.get('RABBITMQ','Rabbitmq_username')
+ rabbitmq_passwd = config.get('RABBITMQ','Rabbitmq_passwd')
+ rabbitmq_host = config.get('RABBITMQ','Rabbitmq_host')
+ rabbitmq_port = int ( config.get('RABBITMQ','Rabbitmq_port') )
+ '''
+ log_level = config.get('LOGGING','level')
+ log_file = config.get('LOGGING','filename')
+
+ level = LEVELS.get(log_level, logging.NOTSET)
+ logging.basicConfig(filename=log_file,format='%(asctime)s %(filename)s %(levelname)s %(message)s',\
+ datefmt=_DEFAULT_LOG_DATE_FORMAT,level=level)
+ '''
+ logging.config.fileConfig('pipeline_agent.conf', disable_existing_loggers=False)
+ except Exception as e:
+ logging.error("* Error in confing file:%s",e.__str__())
+ else :
+ logging.debug("*------------------Rabbit MQ Server Info---------")
+ logging.debug("rabbitmq_username:%s",rabbitmq_username)
+ logging.debug("rabbitmq_passwd:%s",rabbitmq_passwd)
+ logging.debug("rabbitmq_host:%s",rabbitmq_host)
+ logging.debug("rabbitmq_port:%s",rabbitmq_port)
+ credentials = pika.PlainCredentials(rabbitmq_username,rabbitmq_passwd)
+ parameters = pika.ConnectionParameters(rabbitmq_host,
+ rabbitmq_port,
+ '/',
+ credentials)
+ connection = pika.BlockingConnection(parameters)
+ channel = connection.channel()
+ #channel.queue_declare(queue='pubsub')
+ channel.exchange_declare(exchange='pubsub',
+ type='fanout')
+
+ result = channel.queue_declare(exclusive=True)
+ queue_name = result.method.queue
+
+ channel.queue_bind(exchange='pubsub',
+ queue=queue_name)
+ logging.debug("[*] Waiting for messages. To exit press CTRL+C")
+
+ channel.basic_consume(callback,
+ queue=queue_name,
+ no_ack=True)
+ channel.start_consuming()
+
+if __name__ == "__main__":
+ #logging.debug("* Starting pipeline agent module")
+ msg_queue_listner()
+
diff --git a/xos/synchronizer/ceilometer/pipeline_agent_module/utils.py b/xos/synchronizer/ceilometer/pipeline_agent_module/utils.py
new file mode 100644
index 0000000..3aa4865
--- /dev/null
+++ b/xos/synchronizer/ceilometer/pipeline_agent_module/utils.py
@@ -0,0 +1,404 @@
+import yaml
+import random
+import string
+import logging
+import fnmatch
+
+def main():
+ orig_pipeline_conf = "/etc/ceilometer/pipeline.yaml"
+ with open (orig_pipeline_conf, 'r') as fap:
+ data = fap.read()
+ pipeline_cfg = yaml.safe_load(data)
+ return pipeline_cfg
+
+def build_meter_list():
+ ''' function to exiting meter list from pipeline.yaml'''
+ orig_pipeline_conf = "/etc/ceilometer/pipeline.yaml"
+ with open (orig_pipeline_conf, 'r') as fap:
+ data = fap.read()
+ pipeline_cfg = yaml.safe_load(data)
+ source_cfg = pipeline_cfg['sources']
+ meter_list=[]
+ for i in source_cfg:
+ meter_list.append(i['meters'])
+
+ return meter_list
+
+def get_sink_name_from_publisher(publisher,pipeline_cfg):
+ sink_cfg = pipeline_cfg['sinks']
+ ''' Iterating over the list of publishers to get sink name'''
+ try :
+ for sinks in sink_cfg:
+ pub_list = sinks.get('publishers')
+ try :
+ k = pub_list.index(publisher)
+ return sinks.get('name')
+ except Exception as e:
+ #print ("Got Exception",e.__str__())
+ continue
+ except Exception as e:
+ return None
+
+def get_source_name_from_meter(meter,pipeline_cfg):
+ source_cfg = pipeline_cfg['sources']
+ ''' Iternating over the list of meters to get source name'''
+ try :
+ for sources in source_cfg:
+ meter_list = sources.get('meters')
+ try :
+ k = meter_list.index(meter)
+ return sources.get('name')
+ except Exception as e:
+ #print ("Got Exception",e.__str__())
+ continue
+ except Exception as e:
+ return None
+
+def get_source_name_from_with_meter_patter_match(meter,pipeline_cfg):
+ ''' Iternating over the list of meters for wildcard match to get source name'''
+ source_cfg = pipeline_cfg['sources']
+ try :
+ for sources in source_cfg:
+ meter_list = sources.get('meters')
+ for k in meter_list:
+ if k[0] == "*":
+ logging.warning("Ignoring wild card meter(*) case ")
+ continue
+ if fnmatch.fnmatch(k,meter):
+ logging.debug("substring match")
+ return (sources.get('name'),"superset",k)
+ if fnmatch.fnmatch(meter,k):
+ logging.debug("input is super match")
+ return (sources.get('name'),"subset",k)
+ except Exception as e:
+ return None,None,None
+
+ return None,None,None
+
+def get_source_name_from_sink_name(sink_name,pipeline_cfg):
+ ''' iterating over list of sources to get sink name'''
+ source_cfg = pipeline_cfg['sources']
+ try :
+ for sources in source_cfg:
+ sink_list = sources.get("sinks")
+ try :
+ k = sink_list.index(sink_name)
+ #sources.get("meters").append("m2")
+ return sources.get("name")
+ except Exception as e:
+ continue
+ except Exception as e:
+ return None
+
+def get_sink_name_from_source_name(source_name,pipeline_cfg):
+ ''' iterating over list of sinks to get sink name'''
+ source_cfg = pipeline_cfg['sources']
+ try :
+ for sources in source_cfg:
+ try :
+ if sources.get("name") == source_name:
+ return sources.get("sinks")
+ except Exception as e:
+ continue
+ except Exception as e:
+ return None
+
+def add_meter_to_source(meter_name,source_name,pipeline_cfg):
+ ''' iterating over the list of sources to add meter to the matching source'''
+ source_cfg = pipeline_cfg['sources']
+ try :
+ for sources in source_cfg:
+ try :
+ if sources.get("name") == source_name:
+ sources.get("meters").append(meter_name)
+ return True
+ except Exception as e:
+ continue
+ except Exception as e:
+ return False
+
+def get_meter_list_from_source(source_name,pipeline_cfg):
+ ''' iterating over the list of sources to get meters under the given source'''
+ source_cfg = pipeline_cfg['sources']
+ try :
+ for sources in source_cfg:
+ try :
+ if sources.get("name") == source_name:
+ return sources.get("meters")
+ except Exception as e:
+ continue
+ except Exception as e:
+ return None
+
+def get_publisher_list_from_sink(sink_name,pipeline_cfg):
+ sink_cfg = pipeline_cfg['sinks']
+ ''' Iterating over the list of sinks to build publishers list '''
+ publisher_list = []
+ try :
+ for sinks in sink_cfg:
+ try :
+ for j in sink_name:
+ if j == sinks.get("name"):
+ publisher_list.append(sinks.get("publishers"))
+ return publisher_list
+ except Exception as e:
+ #print ("Got Exception",e.__str__())
+ continue
+ except Exception as e:
+ return None
+
+def get_publisher_list_from_sinkname(sink_name,pipeline_cfg):
+ sink_cfg = pipeline_cfg['sinks']
+ ''' Iterating over the list of sinks to build publishers list '''
+ try :
+ for sinks in sink_cfg:
+ pub_list = sinks.get('publishers')
+ try :
+ if sink_name == sinks.get("name"):
+ return pub_list
+ except Exception as e:
+ #print ("Got Exception",e.__str__())
+ continue
+ except Exception as e:
+ return None
+
+
+def delete_meter_from_source(meter_name,source_name,pipeline_cfg) :
+ ''' function to delete meter for the given source '''
+ source_cfg = pipeline_cfg['sources']
+ try :
+ for sources in source_cfg:
+ try :
+ if sources.get("name") == source_name:
+ meter_list = sources.get('meters')
+ try :
+ meter_index = meter_list.index(meter_name)
+ logging.debug("meter name is present at index:%s",meter_index)
+ if len(meter_list) == 1 and meter_index == 0:
+ logging.debug("Only one meter exists removing entire source entry")
+ source_cfg.remove(sources)
+ else :
+ meter_list.pop(meter_index)
+ return True
+ except Exception as e:
+ continue
+ except Exception as e:
+ continue
+ except Exception as e:
+ return False
+
+def delete_publisher_from_sink(publisher,sink_name,pipeline_cfg):
+ sink_cfg = pipeline_cfg['sinks']
+ ''' Iterating over the list of publishers '''
+ try :
+ for sinks in sink_cfg:
+ pub_list = sinks.get('publishers')
+ #print pub_list
+ try :
+ if sink_name == sinks.get("name"):
+ k = pub_list.index(publisher)
+ pub_list.pop(k)
+ #print k
+ return True
+ except Exception as e:
+ #print ("Got Exception",e.__str__())
+ continue
+ except Exception as e:
+ return None
+
+def delete_sink_from_pipeline(sink_name,pipeline_cfg):
+ sink_cfg = pipeline_cfg['sinks']
+ try :
+ for sinks in sink_cfg:
+ if sink_name == sinks.get("name"):
+ sink_cfg.remove(sinks)
+ return True
+ except Exception as e:
+ return False
+
+def add_publisher_to_sink(publisher_name,sink_name,pipeline_cfg):
+ sink_cfg = pipeline_cfg['sinks']
+ try :
+ for sinks in sink_cfg:
+ if sink_name == sinks.get("name"):
+ sinks.get('publishers').append(publisher_name)
+ return True
+ except Exception as e:
+ return None
+
+def get_source_info(meter):
+ name = ''.join(random.choice(string.ascii_lowercase) for _ in range(9))
+ sink_name = name + "_sink"
+ meter_name = name + "_source"
+ source_info = {'interval': 6,'meters': [meter],'name': meter_name,'sinks':[sink_name]}
+ logging.debug("* new source_info :%s",source_info)
+ return (source_info,sink_name)
+
+def get_sink_info(meter,sink_name,target):
+ sink_info = {'publishers':['notifier://',target],'transformers':None ,'name': sink_name}
+ logging.debug("* new source_info :%s",sink_info)
+ return sink_info
+
+def delete_conf_from_pipe_line_cfg(meter,publisher,pipeline_cfg):
+ #import pdb;pdb.set_trace()
+
+ sink_name = get_sink_name_from_publisher(publisher,pipeline_cfg)
+ source_name = get_source_name_from_meter(meter,pipeline_cfg)
+
+ if sink_name is None or source_name is None:
+ logging.error("Either sink or source name Exists in the pipeline.yaml")
+ return False
+
+ meter_list = get_meter_list_from_source(source_name,pipeline_cfg)
+
+ temp_meter_list = []
+
+ for j in meter_list:
+ temp_meter_list.append(j)
+
+ pub_list = get_publisher_list_from_sinkname(sink_name,pipeline_cfg)
+ if len(pub_list) > 2 and len(temp_meter_list) == 1:
+ if delete_publisher_from_sink(publisher,sink_name,pipeline_cfg):
+ return True
+ else:
+ return False
+
+ if delete_meter_from_source(meter,source_name,pipeline_cfg) :
+ if len(temp_meter_list) == 1:
+ if delete_publisher_from_sink(publisher,sink_name,pipeline_cfg) :
+ if get_source_name_from_sink_name(sink_name,pipeline_cfg) is None:
+ delete_sink_from_pipeline(sink_name,pipeline_cfg)
+ return True
+ else :
+ return False
+ return True
+ return False
+
+
+def update_sink_aggrgation(meter,publisher,source_name,matching_meter,meter_match,pipeline_cfg):
+ ''' Build new source and sink '''
+ new_source_info,new_sink_name = get_source_info(meter)
+ new_sink_info = get_sink_info(meter,new_sink_name,publisher)
+
+ meter_list = get_meter_list_from_source(source_name,pipeline_cfg)
+ sink_name = get_sink_name_from_source_name(source_name,pipeline_cfg)
+ publisher_list = get_publisher_list_from_sink(sink_name,pipeline_cfg)
+ for i in publisher_list:
+ for j in i:
+ #print j
+ if j not in new_sink_info.get("publishers") :
+ new_sink_info.get("publishers").append(j)
+ #print new_sink_info
+
+ cfg_source = pipeline_cfg['sources']
+ cfg_sink = pipeline_cfg['sinks']
+ if meter_match == "superset" :
+ new_source_info.get("meters").append("!"+ matching_meter)
+ elif meter_match == "subset" :
+ ''' here need to get list of meters with sub-string match '''
+ add_meter_to_source("!"+meter,source_name,pipeline_cfg)
+ add_publisher_to_sink(publisher,sink_name,pipeline_cfg)
+
+ logging.debug("----------- Before Updating Meter Info ------------------")
+ logging.debug("%s",pipeline_cfg)
+
+ ''' Updating source and sink info '''
+ cfg_source.append(new_source_info)
+ cfg_sink.append(new_sink_info)
+ logging.debug("----------- After Updating Meter Info --------------------")
+ logging.debug("%s",pipeline_cfg)
+
+def update_conf_to_pipe_line_cfg(meter,publisher,pipeline_cfg):
+ #import pdb;pdb.set_trace()
+ sink_name = get_sink_name_from_publisher(publisher,pipeline_cfg)
+ source_name = get_source_name_from_meter(meter,pipeline_cfg)
+ if sink_name is None :
+ logging.debug("No Sink exists with the given Publisher")
+ if source_name is None:
+ ''' Commenting the code related t owild card '''
+ '''
+ pattern_source_name,pattern,matching_meter = get_source_name_from_with_meter_patter_match(meter,pipeline_cfg)
+ if pattern_source_name is not None:
+ if pattern == "superset" :
+ #add_meter_to_source("!"+meter,pattern_source_name,pipeline_cfg)
+ update_sink_aggrgation(meter,publisher,pattern_source_name,matching_meter,"superset",pipeline_cfg)
+ #print pipeline_cfg
+ return True
+ if pattern == "subset" :
+ update_sink_aggrgation(meter,publisher,pattern_source_name,matching_meter,"subset",pipeline_cfg)
+ return True
+ '''
+ source_info,sink_name = get_source_info(meter)
+ sink_info = get_sink_info(meter,sink_name,publisher)
+
+ cfg_source = pipeline_cfg['sources']
+ cfg_sink = pipeline_cfg['sinks']
+
+ logging.debug("----------- Before Updating Meter Info ------------------")
+ logging.debug("%s",pipeline_cfg)
+
+ ''' Updating source and sink info '''
+ cfg_source.append(source_info)
+ cfg_sink.append(sink_info)
+ logging.debug("----------- After Updating Meter Info --------------------")
+ logging.debug("%s",pipeline_cfg)
+ return True
+ else :
+ logging.debug("Meter already exists in the conf file under source name:%s ",source_name)
+ meter_list = get_meter_list_from_source(source_name,pipeline_cfg)
+ publisher_list=[]
+ if len(meter_list) > 1:
+ sink_name = get_sink_name_from_source_name(source_name,pipeline_cfg)
+ '''
+ if type(sink_name) is list :
+ for sinkname in sink_name:
+ publisher_list.append(get_publisher_list_from_sink(sinkname,pipeline_cfg))
+ else :
+ publisher_list.append(get_publisher_list_from_sink(sink_name,pipeline_cfg))
+ '''
+ publisher_list = get_publisher_list_from_sink(sink_name,pipeline_cfg)
+ new_source_info,new_sink_name = get_source_info(meter)
+ new_sink_info = get_sink_info(meter,new_sink_name,publisher)
+ for i in publisher_list:
+ for j in i:
+ #print j
+ if j not in new_sink_info.get("publishers") :
+ new_sink_info.get("publishers").append(j)
+ cfg_source = pipeline_cfg['sources']
+ cfg_sink = pipeline_cfg['sinks']
+
+ logging.debug("----------- Before Updating Meter Info ------------------")
+ logging.debug("%s",pipeline_cfg)
+
+ ''' Updating source and sink info '''
+ cfg_source.append(new_source_info)
+ cfg_sink.append(new_sink_info)
+ logging.debug("----------- After Updating Meter Info --------------------")
+ logging.debug("%s",pipeline_cfg)
+ delete_meter_from_source(meter,source_name,pipeline_cfg)
+ logging.debug("%s",pipeline_cfg)
+ return True
+ else :
+ logging.debug ("Source already exists for this meter add publisher to it .....:%s",source_name)
+ sink_name_list = get_sink_name_from_source_name(source_name,pipeline_cfg)
+ for sink_name in sink_name_list :
+ add_publisher_to_sink(publisher,sink_name,pipeline_cfg)
+ return True
+ #print pipeline_cfg
+ else :
+ logging.debug ("Publisher already exists under sink:%s",sink_name)
+ if get_source_name_from_meter(meter,pipeline_cfg) is not None:
+ logging.debug("Both meter and publisher already exists in the conf file")
+ logging.debug( "Update request is not sucessful")
+ return False
+ else :
+ source_name = get_source_name_from_sink_name(sink_name,pipeline_cfg)
+ logging.debug ("Need to add meter to already existing source which \
+ has this publisher under one of its sink")
+ #print source_name
+ if add_meter_to_source(meter,source_name,pipeline_cfg):
+ logging.debug("Meter added sucessfully")
+ return True
+
+
+