Adding CORD-specific ceilometer changes to the monitoring repository
- ceilometer custom notification plugins for the ONOS, vSG, vOLT and Infra layers
- ceilometer publish/subscribe module
- ceilometer dynamic pipeline config module
- ceilometer UDP proxy
- ceilometer custom image (ceilometer v2/v3 versions, kafka_installer, startup scripts)
Change-Id: Ie2ab8ce89cdadbd1fb4dc54ee15e46f8cc8c4c18
diff --git a/xos/synchronizer/ceilometer/ceilometer_pub_sub/README b/xos/synchronizer/ceilometer/ceilometer_pub_sub/README
new file mode 100644
index 0000000..dbd6a5f
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer_pub_sub/README
@@ -0,0 +1,70 @@
+
+Subscribe-Publish Framework:
+1.Command to install the Flask web server framework:
+ sudo pip install Flask
+
+ Along with Flask, the following Python modules are needed (msgpack and
+ oslo_utils are installed via pip; fnmatch, operator, logging and
+ ConfigParser are part of the Python 2 standard library):
+   msgpack
+   fnmatch
+   operator
+   logging
+   oslo_utils
+   ConfigParser
+
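+ A one-shot install of the non-stdlib dependencies might look like this
+ (the exact pip package names are assumed, not taken from this repo; pika
+ is needed by sub_main.py for RabbitMQ):
+   sudo pip install Flask msgpack-python oslo.utils kafka-python pika
+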
+2.Files: i.sub_main.py
+ ii.pubrecords.py
+ iii.pub_sub.conf
+
+3.Command to start the server:
+ #python sub_main.py
+4.Command for subscription:
+  i.app_id:Application ID; must be unique.
+  ii.target:
+      udp and kafka targets are supported.
+      a.udp:<ip:portno>
+      b.kafka:<kafkaip:kafkaport>
+  iii.sub_info:Meters to subscribe to, e.g. cpu_util or a wildcard such as cpu_*.
+      It can be given as a single meter or a list of meters.
+  iv.query:
+      The following information needs to be provided as part of the query:
+      a.field:fields such as user_id, project_id etc.
+      b.op:"eq","gt","lt" etc.
+      c.value:value of the field.
+ Example:
+ curl -i -H "Content-Type: application/json" -X POST -d '{"app_id":"10","target":"udp://10.11.10.1:5006","sub_info":"cpu_util","query":[{"field":"user_id","op":"eq","value":"e1271a86bd4e413c87248baf2e5f01e0"},{"field":"project_id","op":"eq","value":"b1a3bf16d2014b47be9aefea88087318"},{"field":"resource_id","op":"eq","value":"658cd03f-d0f0-4f55-9f48-39e7222a8646"}]}' -L http://10.11.10.1:4455/subscribe
+ curl -i -H "Content-Type: application/json" -X POST -d '{"app_id":"10","target":"udp://10.11.10.1:5006", "sub_info":["cpu_util", "memory"],"query":[{"field":"user_id","op":"eq","value":"e1271a86bd4e413c87248baf2e5f01e0"},{"field":"project_id","op":"eq","value":"b1a3bf16d2014b47be9aefea88087318"},{"field":"resource_id","op":"eq","value":"658cd03f-d0f0-4f55-9f48-39e7222a8646"}]}' -L http://10.11.10.1:4455/subscribe
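+
+ The same request can be issued from Python; a minimal sketch using the
+ third-party "requests" package (assumed to be installed via pip):
+
+   import json
+   import requests
+   sub = {"app_id": "10", "target": "udp://10.11.10.1:5006",
+          "sub_info": "cpu_util",
+          "query": [{"field": "user_id", "op": "eq",
+                     "value": "e1271a86bd4e413c87248baf2e5f01e0"}]}
+   resp = requests.post("http://10.11.10.1:4455/subscribe",
+                        data=json.dumps(sub),
+                        headers={"Content-Type": "application/json"})
+   print(resp.text)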
+
+5.Command for unsubscription:
+  For unsubscription, only the app_id is needed.
+ curl -i -H "Content-Type: application/json" -X POST -d '{"app_id":"10"}' http://10.11.10.1:4455/unsubscribe
+
+6.Running Kafka on the server where the pub-sub module is running:
+ i.Download the kafka from:
+ #https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
+ http://apache.arvixe.com/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
+ ii.install java
+ sudo apt-get update
+ sudo apt-get install default-jre
+  iii.Install the kafka-python client package:
+ sudo easy_install pip
+ sudo pip install kafka-python
+ iv.tar -xzf kafka_2.11-0.9.0.0.tgz
+ v. Start the zookeeper server:
+ bin/zookeeper-server-start.sh config/zookeeper.properties
+ vi.Start Kafka Server :
+ bin/kafka-server-start.sh config/server.properties
+  vii.To read messages from kafka on a topic named "test":
+ bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
+  viii.Before configuring the kafka:// publisher in ceilometer:
+       install kafka on both the controller node and the compute nodes, then
+       restart the ceilometer-agent-notification, ceilometer-agent-compute and ceilometer-agent-central daemons.
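+       Once kafka is reachable, a sink in /etc/ceilometer/pipeline.yaml can
+       publish to it with an entry like the following (host and topic here
+       are illustrative):
+           publishers:
+               - kafka://localhost:9092?topic=ceilometer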
+
+7.[Optional]Install Kafka-web-console (GUI)
+ i.wget https://github.com/adamfokken/kafka-web-console/archive/topic-add-remove.zip
+ ii.unzip it
+ iii.wget http://downloads.typesafe.com/typesafe-activator/1.3.2/typesafe-activator-1.3.2-minimal.zip
+ iv.unzip it and add it to the system path so you can execute the activator command that it provides.
+ v.Install javac if required: sudo apt-get install openjdk-7-jdk
+ vi.cd kafka-web-console-topic-add-remove
+ vii.activator start -DapplyEvolutions.default=true
+  viii.Point your browser at the kafka web-console port (9000) and register the zookeeper.
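+
+8.[Optional]Verifying delivery:
+  The scripts under test/ in this module can act as simple sinks:
+  test/udp_client_cpu.py listens on a UDP port and prints the decoded
+  samples, and test/kafka_client.py tails a kafka topic.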
diff --git a/xos/synchronizer/ceilometer/ceilometer_pub_sub/kafka_broker.py b/xos/synchronizer/ceilometer/ceilometer_pub_sub/kafka_broker.py
new file mode 100644
index 0000000..778486b
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer_pub_sub/kafka_broker.py
@@ -0,0 +1,167 @@
+#
+# Copyright 2015 Cisco Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+
+import kafka
+from kafka import TopicPartition
+from oslo_config import cfg
+from oslo_utils import netutils
+from six.moves.urllib import parse as urlparse
+import logging as LOG
+
+# Used by publish_samples()/publish_events() to serialize samples and events
+from ceilometer.publisher import utils
+
+
+class KafkaBrokerPublisher():
+ def __init__(self, parsed_url):
+ self.kafka_client = None
+ self.kafka_server = None
+
+ self.host, self.port = netutils.parse_host_port(
+ parsed_url.netloc, default_port=9092)
+
+ self.local_queue = []
+
+ params = urlparse.parse_qs(parsed_url.query)
+ self.topic = params.get('topic', ['ceilometer'])[-1]
+ self.policy = params.get('policy', ['default'])[-1]
+ self.max_queue_length = int(params.get(
+ 'max_queue_length', [1024])[-1])
+ self.max_retry = int(params.get('max_retry', [100])[-1])
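+        # Example (illustrative) target URL accepted by this parsing:
+        #   kafka://localhost:9092?topic=ceilometer&policy=queue&max_queue_length=1024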
+
+ if self.policy in ['default', 'drop', 'queue']:
+ LOG.info(('Publishing policy set to %s') % self.policy)
+ else:
+ LOG.warn(('Publishing policy is unknown (%s) force to default')
+ % self.policy)
+ self.policy = 'default'
+
+ try:
+ self._get_client()
+ self._get_server()
+ except Exception as e:
+ LOG.exception("Failed to connect to Kafka service: %s", e)
+
+ def publish_samples(self, context, samples):
+ """Send a metering message for kafka broker.
+
+ :param context: Execution context from the service or RPC call
+ :param samples: Samples from pipeline after transformation
+ """
+ samples_list = [
+ utils.meter_message_from_counter(
+ sample, cfg.CONF.publisher.telemetry_secret)
+ for sample in samples
+ ]
+
+ self.local_queue.append(samples_list)
+
+ try:
+ self._check_kafka_connection()
+ except Exception as e:
+ raise e
+
+ self.flush()
+
+ def flush(self):
+ queue = self.local_queue
+ self.local_queue = self._process_queue(queue)
+ if self.policy == 'queue':
+ self._check_queue_length()
+
+ def publish_events(self, context, events):
+ """Send an event message for kafka broker.
+
+ :param context: Execution context from the service or RPC call
+ :param events: events from pipeline after transformation
+ """
+ events_list = [utils.message_from_event(
+ event, cfg.CONF.publisher.telemetry_secret) for event in events]
+
+ self.local_queue.append(events_list)
+
+ try:
+ self._check_kafka_connection()
+ except Exception as e:
+ raise e
+
+ self.flush()
+
+ def _process_queue(self, queue):
+ current_retry = 0
+ while queue:
+ data = queue[0]
+ try:
+ self._send(data)
+ except Exception:
+ LOG.warn(("Failed to publish %d datum"),
+ sum([len(d) for d in queue]))
+ if self.policy == 'queue':
+ return queue
+ elif self.policy == 'drop':
+ return []
+ current_retry += 1
+ if current_retry >= self.max_retry:
+ self.local_queue = []
+ LOG.exception(("Failed to retry to send sample data "
+ "with max_retry times"))
+ raise
+ else:
+ queue.pop(0)
+ return []
+
+ def _check_queue_length(self):
+ queue_length = len(self.local_queue)
+ if queue_length > self.max_queue_length > 0:
+ diff = queue_length - self.max_queue_length
+ self.local_queue = self.local_queue[diff:]
+ LOG.warn(("Kafka Publisher max local queue length is exceeded, "
+ "dropping %d oldest data") % diff)
+
+ def _check_kafka_connection(self):
+ try:
+ self._get_client()
+ except Exception as e:
+            LOG.exception("Failed to connect to Kafka service: %s", e)
+
+ if self.policy == 'queue':
+ self._check_queue_length()
+ else:
+ self.local_queue = []
+ raise Exception('Kafka Client is not available, '
+ 'please restart Kafka client')
+
+ def _get_client(self):
+ if not self.kafka_client:
+ self.kafka_client = kafka.KafkaClient(
+ "%s:%s" % (self.host, self.port))
+ self.kafka_producer = kafka.SimpleProducer(self.kafka_client)
+
+ def _get_server(self):
+ if not self.kafka_server:
+ self.kafka_server = kafka.KafkaClient(
+ "%s:%s" % (self.host, self.port))
+            # Consume only the configured topic, starting at the tail so old
+            # messages are not replayed.
+            self.kafka_consumer = kafka.KafkaConsumer(
+                bootstrap_servers=["%s:%s" % (self.host, self.port)])
+            self.kafka_consumer.assign([TopicPartition(self.topic, 0)])
+            self.kafka_consumer.seek_to_end()
+
+ def _send(self, data):
+ #for d in data:
+ try:
+ self.kafka_producer.send_messages(
+ self.topic, json.dumps(data))
+ except Exception as e:
+ LOG.exception(("Failed to send sample data: %s"), e)
+ raise
diff --git a/xos/synchronizer/ceilometer/ceilometer_pub_sub/pub_sub.conf b/xos/synchronizer/ceilometer/ceilometer_pub_sub/pub_sub.conf
new file mode 100644
index 0000000..6998903
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer_pub_sub/pub_sub.conf
@@ -0,0 +1,44 @@
+#[LOGGING]
+#level = INFO
+#filename = pub_sub.log
+#maxbytes = 1000000
+#backupcount = 5
+
+[WEB_SERVER]
+webserver_host = localhost
+webserver_port = 4455
+
+[CLIENT]
+target = kafka://localhost:9092?topic=ceilometer
+#target = udp://10.11.10.1:5004/
+
+[RABBITMQ]
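+# When UpdateConfMgmt is True, subscribe/unsubscribe requests are relayed as
+# JSON to the 'pubsub' RabbitMQ exchange (see update_pipeline_conf in
+# sub_main.py) so the ceilometer pipeline config can be updated dynamically.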
+#UpdateConfMgmt = True
+UpdateConfMgmt = False
+Rabbitmq_username = openstack
+Rabbitmq_passwd = password
+Rabbitmq_host = localhost
+Rabbitmq_port = 5672
+
+[loggers]
+keys=root
+
+[handlers]
+keys=logfile
+
+[formatters]
+keys=logfileformatter
+
+[logger_root]
+level=INFO
+handlers=logfile
+
+[formatter_logfileformatter]
format=%(asctime)s %(filename)s %(levelname)s %(message)s
+
+[handler_logfile]
+class=handlers.RotatingFileHandler
+level=NOTSET
+args=('pub_sub.log','a',1000000,5)
+formatter=logfileformatter
+
diff --git a/xos/synchronizer/ceilometer/ceilometer_pub_sub/pubrecords.py b/xos/synchronizer/ceilometer/ceilometer_pub_sub/pubrecords.py
new file mode 100644
index 0000000..bca62b8
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer_pub_sub/pubrecords.py
@@ -0,0 +1,95 @@
+#!/usr/bin/python
+import socket
+from oslo_utils import units
+from oslo_utils import netutils
+import kafka
+import kafka_broker
+import fnmatch
+import logging
+import copy
+
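+# Global in-memory registry of active subscriptions; every subinfo instance
+# accepted by update_subinfo() is appended here and scanned for each sample.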
+sub_info=[]
+class subinfo:
+ def __init__(self,scheme,app_id,app_ip,app_port,subscription_info,sub_info_filter,target):
+ logging.debug("* Updating subscription_info ")
+ self.scheme = scheme
+ self.app_id = app_id
+ self.ipaddress = app_ip
+ self.portno = app_port
+ self.subscription_info = subscription_info
+ self.sub_info_filter = sub_info_filter
+ self.target = target
+
+ if scheme == "kafka":
+ ''' Creating kafka publisher to send message over kafka '''
+ parse_target = netutils.urlsplit(target)
+ self.kafka_publisher = kafka_broker.KafkaBrokerPublisher(parse_target)
+ elif scheme == "udp":
+ ''' Creating UDP socket to send message over UDP '''
+ self.udp = socket.socket(socket.AF_INET, # Internet
+ socket.SOCK_DGRAM) # UDP
+ self.udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+ def update_subinfo(self):
+ logging.info("* inside %s",self.update_subinfo.__name__)
+ if not sub_info:
+            logging.debug("* ----------- List is empty -------------")
+            sub_info.append(self)
+            logging.debug("* Subscription is successful")
+            return "Subscription is successful \n"
+ for obj in sub_info:
+ if obj.app_id == self.app_id :
+ # obj.subscription_info=self.subscription_info
+ sub_info.remove(obj)
+ sub_info.append(self)
+ logging.warning("* entry already exists so overwriting this subscription \n")
+ return "entry already exists so overwriting this subscription \n"
+ sub_info.append(self)
+        return "Subscription is successful \n"
+
+ @staticmethod
+ def delete_subinfo(app_id):
+ logging.info("* inside %s",subinfo.delete_subinfo.__name__)
+ Flag = False
+ for obj in sub_info:
+ if obj.app_id == app_id :
+ sub_info.remove(obj)
+ Flag = True
+                logging.debug("* Un-Subscription is successful")
+                return "Un-Subscription is successful \n"
+ if not Flag :
+ err_str = "No subscription exists with app id: " + app_id + "\n"
+ logging.error("* No subscription exists with app id:%s ",app_id)
+ raise Exception (err_str)
+
+ @staticmethod
+ def print_subinfo():
+ logging.info("* inside %s",subinfo.print_subinfo.__name__)
+ for obj in sub_info:
+ logging.debug("* ------------------------------------------------")
+ logging.debug("* scheme:%s",obj.scheme)
+ logging.debug("* app_id:%s",obj.app_id)
+ logging.debug("* portno:%s",obj.portno )
+ logging.debug("* ipaddress:%s",obj.ipaddress)
+ logging.debug("* subscription_info:%s",obj.subscription_info)
+ logging.debug("* sub_info_filter:%s",obj.sub_info_filter)
+ logging.debug("* target:%s",obj.target)
+ logging.debug("* ------------------------------------------------")
+ @staticmethod
+ def get_subinfo(app_id):
+ logging.info("* inside %s",subinfo.get_subinfo.__name__)
+ for obj in sub_info:
+ if obj.app_id == app_id :
+ return obj.subscription_info,obj.target
+ return (None,None)
+
+
+ @staticmethod
+ def get_sub_list(notif_subscription_info):
+        logging.info("* inside %s",subinfo.get_sub_list.__name__)
+ sub_list=[]
+ for obj in sub_info:
+ if obj.subscription_info == notif_subscription_info:
+ sub_list.append(obj)
+ return sub_list
diff --git a/xos/synchronizer/ceilometer/ceilometer_pub_sub/sub_main.py b/xos/synchronizer/ceilometer/ceilometer_pub_sub/sub_main.py
new file mode 100644
index 0000000..e7ebcf4
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer_pub_sub/sub_main.py
@@ -0,0 +1,559 @@
+#!/usr/bin/python
+import socket,thread
+import sys
+import msgpack
+import fnmatch
+import operator
+import logging
+import logging.handlers
+import logging.config
+import ConfigParser
+import json
+from oslo_utils import units
+from oslo_utils import netutils
+from pubrecords import *
+import kafka
+import kafka_broker
+
+from flask import request, Request, jsonify
+from flask import Flask
+from flask import make_response
+app = Flask(__name__)
+
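+# Maps the "op" strings accepted in subscription queries to the Python
+# comparison functions applied when filtering samples.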
+COMPARATORS = {
+ 'gt': operator.gt,
+ 'lt': operator.lt,
+ 'ge': operator.ge,
+ 'le': operator.le,
+ 'eq': operator.eq,
+ 'ne': operator.ne,
+}
+
+LEVELS = {'DEBUG': logging.DEBUG,
+ 'INFO': logging.INFO,
+ 'WARNING': logging.WARNING,
+ 'ERROR': logging.ERROR,
+ 'CRITICAL': logging.CRITICAL}
+
+_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
+
+''' Stores all the subscribed meter's list '''
+meter_list = []
+''' Stores meter to app-id mapping '''
+meter_dict = {}
+
+@app.route('/subscribe',methods=['POST','SUB'])
+def subscribe():
+ try :
+ app_id = request.json['app_id']
+ target = request.json['target']
+ sub_info = request.json['sub_info']
+
+ try :
+ validate_sub_info(sub_info)
+ except Exception as e:
+ logging.error("* %s",e.__str__())
+ return e.__str__()
+
+        ''' Flag to update the pipeline cfg file '''
+        config = ConfigParser.ConfigParser()
+        config.read('pub_sub.conf')
+        if config.get('RABBITMQ','UpdateConfMgmt') == "True" :
+            update_pipeline_conf(sub_info,target,app_id,"ADD")
+        else:
+            logging.warning("Update Conf Mgmt flag is disabled, enable the flag to update Conf Mgmt")
+
+ if not 'query' in request.json.keys():
+ logging.info("query request is not provided by user")
+ query = None
+ else:
+ query = request.json['query']
+ for i in range(len(query)):
+                if not 'field' in query[i].keys():
+                    err_str = "Query is missing the 'field' key"
+                    raise Exception (err_str)
+                if not 'op' in query[i].keys():
+                    err_str = "Query is missing the 'op' key"
+                    raise Exception (err_str)
+                if not 'value' in query[i].keys():
+                    err_str = "Query is missing the 'value' key"
+                    raise Exception (err_str)
+ except Exception as e:
+ err_str = "KeyError: Parsing subscription request " + e.__str__() + "\n"
+ logging.error("* KeyError: Parsing subscription request :%s",e.__str__())
+ return err_str
+
+ parse_target=netutils.urlsplit(target)
+ if not parse_target.netloc:
+ err_str = "Error:Invalid target format"
+ logging.error("* Invalid target format")
+ return err_str
+
+ status = ""
+ if parse_target.scheme == "udp" or parse_target.scheme == "kafka":
+ host,port=netutils.parse_host_port(parse_target.netloc)
+ scheme = parse_target.scheme
+ app_ip = host
+ app_port = port
+
+ if host == None or port == None :
+ err_str = "* Error: Invalid IP Address format"
+ logging.error("* Invalid IP Address format")
+ return err_str
+
+ subscription_info = sub_info
+ sub_info_filter = query
+ logging.info("Creating subscription for app:%s for meters:%s with filters:%s and target:%s",app_id, subscription_info, sub_info_filter, target)
+ subscrip_obj=subinfo(scheme,app_id,app_ip,app_port,subscription_info,sub_info_filter,target)
+ status = subscrip_obj.update_subinfo()
+ subinfo.print_subinfo()
+
+ if parse_target.scheme == "file" :
+ pass
+ return status
+
+@app.route('/unsubscribe',methods=['POST','UNSUB'])
+def unsubscribe():
+ try :
+ app_id = request.json['app_id']
+ sub_info,target = subinfo.get_subinfo(app_id)
+ if sub_info is None or target is None:
+ err_str = "No subscription exists with app id: " + app_id + "\n"
+ logging.error("* No subscription exists with app id:%s ",app_id)
+ return err_str
+ else:
+            ''' Flag to update the pipeline cfg file '''
+            config = ConfigParser.ConfigParser()
+            config.read('pub_sub.conf')
+            if config.get('RABBITMQ','UpdateConfMgmt') == "True" :
+                update_pipeline_conf(sub_info,target,app_id,"DEL")
+            else:
+                logging.warning("Update Conf Mgmt flag is disabled, enable the flag to update Conf Mgmt")
+ #update_pipeline_conf(sub_info,target,"DEL")
+ subinfo.delete_subinfo(app_id)
+ except Exception as e:
+ logging.error("* %s",e.__str__())
+ return e.__str__()
+    return "Unsubscription is successful! \n"
+
+@app.errorhandler(404)
+def not_found(error):
+ return make_response(jsonify({'error': 'Not found'}), 404)
+
+def print_subscribed_meter_list():
+ logging.debug("-------------------------------------------------")
+ #print (meter_list)
+ logging.debug("meter_list:%s",meter_list)
+ logging.debug("meter_dict:%s",meter_dict)
+ #print (meter_dict)
+ logging.debug("-------------------------------------------------")
+
+def validate_sub_info(sub_info):
+ if type(sub_info) is not list:
+ sub_info = [sub_info]
+ for meter in sub_info:
+ if meter.startswith("*") or meter.startswith("!"):
+ err_str = "Given meter is not supported:" + meter + "\n"
+ logging.error("* Given meter is not supported:%s",meter)
+ raise Exception (err_str)
+
+def update_meter_dict(meterinfo,app_id):
+ try :
+ if type(meterinfo) == list:
+ for meter in meterinfo:
+ if meter_dict.get(meter) is None:
+ meter_dict[meter] = [app_id]
+ elif app_id not in meter_dict.get(meter):
+ meter_dict.get(meter).append(app_id)
+ else:
+ if meter_dict.get(meterinfo) is None:
+ meter_dict[meterinfo] = [app_id]
+ elif app_id not in meter_dict.get(meterinfo):
+ meter_dict.get(meterinfo).append(app_id)
+ except Exception as e:
+ logging.error("* %s",e.__str__())
+
+def check_send_msg_confmgmt_del(sub_info,app_id):
+ temp_sub_info=[]
+ if len(meter_list) == 0:
+ #print("No subscription exists")
+ logging.info("No subscription exists")
+ return False,None
+ if type(sub_info) == list:
+ for meterinfo in sub_info:
+ if meter_dict.get(meterinfo) is None:
+ logging.warning("%s meter doesn't exist in the meter dict",meterinfo)
+ continue
+ if app_id in meter_dict.get(meterinfo):
+ if len(meter_dict.get(meterinfo)) == 1:
+ #print "Only single app is subscribing this meter"
+ logging.info("Only single app is subscribing this meter")
+ del meter_dict[meterinfo]
+ temp_sub_info.append(meterinfo)
+ if meterinfo in meter_list:
+ meter_list.remove(meterinfo)
+ else:
+ meter_dict.get(meterinfo).remove(app_id)
+ return True,temp_sub_info
+ else :
+ if meter_dict.get(sub_info) is None:
+ logging.warning("%s meter doesn't exist in the meter dict",sub_info)
+ return False,None
+ if app_id in meter_dict.get(sub_info):
+ if len(meter_dict.get(sub_info)) == 1:
+ #print "Only single app is subscribing this meter"
+ logging.info("Only single app is subscribing this meter")
+ del meter_dict[sub_info]
+ if sub_info in meter_list:
+ meter_list.remove(sub_info)
+ return True,sub_info
+ else:
+ meter_dict.get(sub_info).remove(app_id)
+ return False,None
+
+def check_send_msg_confmgmt_add(sub_info,app_id):
+ temp_sub_info=[]
+ update_meter_dict(sub_info,app_id)
+ #import pdb;pdb.set_trace()
+ if len(meter_list) == 0:
+        logging.info("No subinfo exists")
+ if type(sub_info) == list:
+ for j in sub_info:
+ meter_list.append(j)
+ return True,sub_info
+ else :
+ meter_list.append(sub_info)
+ return True,sub_info
+ if type(sub_info) == list:
+ for j in sub_info:
+ if j in meter_list:
+ #print ("meter already exists",j)
+                logging.info("meter already exists:%s",j)
+ continue
+ else :
+ temp_sub_info.append(j)
+ meter_list.append(j)
+ if temp_sub_info is not None:
+ return True,temp_sub_info
+ else :
+ return False,None
+ else :
+ if sub_info not in meter_list:
+ meter_list.append(sub_info)
+ #print ("subscription for meter doesn't exist",sub_info)
+ logging.warning("subscription for meter doesn't exist:%s",sub_info)
+ return True,sub_info
+ else :
+ #print ("subscription already exist for ",sub_info)
+            logging.info("subscription already exists for:%s ",sub_info)
+ return False,sub_info
+
+def update_pipeline_conf(sub_info,target,app_id,flag):
+ import pika
+
+ logging.debug("* sub_info:%s",sub_info)
+ logging.debug("* target:%s",target)
+
+ #msg={"sub_info":sub_info,"target":target,"action":flag}
+
+ #json_msg=json.dumps(msg)
+ #msg="image"
+ meter_sub_info = None
+ if flag == "ADD":
+ status,meter_sub_info=check_send_msg_confmgmt_add(sub_info,app_id)
+ if status == False or meter_sub_info == None or meter_sub_info == []:
+            logging.warning("%s is already subscribed with the conf mgmt",sub_info)
+ return
+ elif flag == "DEL":
+ status,meter_sub_info=check_send_msg_confmgmt_del(sub_info,app_id)
+ if status == False or meter_sub_info == None or meter_sub_info == []:
+            logging.warning("%s is already unsubscribed with the conf mgmt",sub_info)
+ return
+ try :
+ config = ConfigParser.ConfigParser()
+ config.read('pub_sub.conf')
+ rabbitmq_username = config.get('RABBITMQ','Rabbitmq_username')
+ rabbitmq_passwd = config.get('RABBITMQ','Rabbitmq_passwd')
+ rabbitmq_host = config.get('RABBITMQ','Rabbitmq_host')
+ rabbitmq_port = int ( config.get('RABBITMQ','Rabbitmq_port') )
+
+ ceilometer_client_info = config.get('CLIENT','target')
+ #msg={"sub_info":sub_info,"target":ceilometer_client_info,"action":flag}
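+        # Message contract with the pipeline-config agent, as constructed
+        # below: {"sub_info": <meter or list>, "target": <publisher URL>,
+        #         "action": "ADD" | "DEL"}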
+ msg={"sub_info":meter_sub_info,"target":ceilometer_client_info,"action":flag}
+ #print msg
+ json_msg=json.dumps(msg)
+
+ credentials = pika.PlainCredentials(rabbitmq_username,rabbitmq_passwd)
+ parameters = pika.ConnectionParameters(rabbitmq_host,
+ rabbitmq_port,
+ '/',
+ credentials)
+ connection = pika.BlockingConnection(parameters)
+ properties = pika.BasicProperties(content_type = "application/json")
+ channel = connection.channel()
+ channel.exchange_declare(exchange='pubsub',
+ type='fanout')
+
+ channel.basic_publish(exchange='pubsub',
+ routing_key='',
+ properties = properties,
+ body=json_msg)
+ logging.debug(" [x] %s Sent",msg)
+ logging.info(" [x] %s Sent",msg)
+ connection.close()
+ except Exception as e:
+ logging.error("Error:%s",e.__str__())
+
+def read_notification_from_ceilometer(host,port):
+ UDP_IP = host
+ UDP_PORT = port
+
+    logging.debug("* Starting UDP Client on ip:%s , port:%d",UDP_IP,UDP_PORT)
+ udp = socket.socket(socket.AF_INET, # Internet
+ socket.SOCK_DGRAM) # UDP
+ udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+ udp.bind((UDP_IP, UDP_PORT))
+
+ while True:
+ #print thread.get_ident()
+ #logging.debug("thread.get_ident():%s", thread.get_ident())
+ data, source = udp.recvfrom(64 * units.Ki)
+ sample = msgpack.loads(data, encoding='utf-8')
+ #logging.debug("* -------------------------------------------------------")
+ logging.debug("%s",sample)
+ #print(sample)
+ for obj in sub_info:
+ msg_list = []
+ if obj.scheme == "udp" :
+ if type(obj.subscription_info) is list:
+ for info in obj.subscription_info:
+ msg_list.append(fnmatch.fnmatch(sample['counter_name'],info))
+ else :
+ msg_list.append(fnmatch.fnmatch(sample['counter_name'],obj.subscription_info))
+ try:
+ if reduce(operator.or_, msg_list):
+ host = obj.ipaddress
+ port = int(obj.portno)
+ l=[]
+ #logging.debug("* -------------------------------------------------------")
+ if obj.sub_info_filter is None:
+ try:
+ logging.debug("* Sending data without query over UDP for host:%s and port:%s",host,port)
+ udp.sendto(data,(host,port))
+ except Exception as e:
+ logging.error ("Unable to send sample over UDP for %s and %s,%s",host,port,e.__str__())
+ ret_str = ("Unable to send sample over UDP for %s and %s,%s")%(host,port,e.__str__())
+ continue
+ for i in range(len(obj.sub_info_filter)):
+ if obj.sub_info_filter[i]['op'] in COMPARATORS:
+ op = COMPARATORS[obj.sub_info_filter[i]['op']]
+ logging.debug("* obj.sub_info_filter[i]['value']:%s",obj.sub_info_filter[i]['value'])
+ logging.debug("* obj.sub_info_filter[i]['field']:%s",obj.sub_info_filter[i]['field'])
+ l.append(op(obj.sub_info_filter[i]['value'],sample[obj.sub_info_filter[i]['field']]))
+ logging.info("* Logical and of Query %s",l)
+ else:
+                          logging.debug("* Not a valid operator ignoring app_id:%s",obj.app_id)
+ l.append(False)
+ logging.info("* Logical and of Query %s",l)
+ if reduce(operator.and_, l):
+ try:
+ logging.debug("* Sending data over UDP for host:%s and port:%s",host,port)
+ udp.sendto(data,(host,port))
+ except Exception:
+ logging.error ("Unable to send sample over UDP for %s and %s ",host,port)
+ ret_str = ("Unable to send sample over UDP for %s and %s ")%(host,port)
+ else :
+ logging.warning("* No Notification found with the given subscription")
+ else :
+                  logging.warning("* No valid subscription found for %s",obj.app_id)
+ except Exception as e:
+ logging.error("Key_Error:%s ",e.__str__())
+ ret_str = ("Key_Error:%s \n")% e.__str__()
+
+def read_notification_from_ceilometer_over_udp(host,port):
+ UDP_IP = host
+ UDP_PORT = port
+
+    logging.debug("* Starting UDP Client on ip:%s , port:%d",UDP_IP,UDP_PORT)
+ udp = socket.socket(socket.AF_INET, # Internet
+ socket.SOCK_DGRAM) # UDP
+ udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+ udp.bind((UDP_IP, UDP_PORT))
+
+ while True:
+ #print thread.get_ident()
+ #logging.debug("thread.get_ident():%s", thread.get_ident())
+ data, source = udp.recvfrom(64 * units.Ki)
+ sample = msgpack.loads(data, encoding='utf-8')
+ status = process_ceilometer_message(sample,data)
+
+def read_notification_from_ceilometer_over_kafka(parse_target):
+ logging.info("Kafka target:%s",parse_target)
+ try :
+ kafka_publisher=kafka_broker.KafkaBrokerPublisher(parse_target)
+ for message in kafka_publisher.kafka_consumer:
+ #print message.value
+ #logging.debug("%s",message.value)
+ #logging.info("%s",message.value)
+ status = process_ceilometer_message(json.loads(message.value),message.value)
+ #print status
+ except Exception as e:
+ logging.error("Error in Kafka setup:%s ",e.__str__())
+
+def process_ceilometer_message(sample,data):
+ logging.debug("%s",sample)
+ #logging.info("%s",sample)
+ if len(sub_info) < 1:
+ #print "No subscription exists"
+ return
+ for obj in sub_info:
+ #import pdb;pdb.set_trace()
+ msg_list = []
+ if type(obj.subscription_info) is list:
+ for info in obj.subscription_info:
+ msg_list.append(fnmatch.fnmatch(sample['counter_name'],info))
+ else :
+ msg_list.append(fnmatch.fnmatch(sample['counter_name'],obj.subscription_info))
+ try:
+ if reduce(operator.or_, msg_list):
+ '''
+ kafka_publisher = None
+ if obj.scheme == "kafka" :
+ parse_target=netutils.urlsplit(obj.target)
+ try :
+ kafka_publisher=kafka_broker.KafkaBrokerPublisher(parse_target)
+ except Exception as e:
+ logging.error("* Error in connecting kafka broker:%s",e.__str__())
+ # return False
+ continue
+ '''
+ host = obj.ipaddress
+ port = int(obj.portno)
+ l=[]
+ logging.debug("* -------------------------------------------------------")
+ if obj.sub_info_filter is None:
+ try:
+ if obj.scheme == "udp" :
+ #logging.debug("* Sending data without query over UDP for host:%s and port:%s",host,port)
+ #logging.info("* Sending data without query over UDP for host:%s and port:%s",host,port)
+ #udp = socket.socket(socket.AF_INET, # Internet
+ # socket.SOCK_DGRAM) # UDP
+ #udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ obj.udp.sendto(data,(host,port))
+ #return True
+ continue
+ elif obj.scheme == "kafka" :
+ #logging.debug("* Sending data over kafka for host:%s and port:%s and topec:%s",host,port,kafka_publisher.topic)
+ #logging.info("* Sending data over kafka for host:%s and port:%s and topec:%s",host,port,kafka_publisher.topic)
+ obj.kafka_publisher._send(sample)
+ #return True
+ continue
+ except Exception as e:
+ logging.error ("Unable to send sample over UDP/kafka for %s and %s,%s",host,port,e.__str__())
+ ret_str = ("Unable to send sample over UDP for %s and %s,%s ")%(host,port,e.__str__())
+ #return False
+ continue
+ for i in range(len(obj.sub_info_filter)):
+ if obj.sub_info_filter[i]['op'] in COMPARATORS:
+ op = COMPARATORS[obj.sub_info_filter[i]['op']]
+ #logging.debug("* obj.sub_info_filter[i]['value']:%s",obj.sub_info_filter[i]['value'])
+ #logging.debug("* obj.sub_info_filter[i]['field']:%s",obj.sub_info_filter[i]['field'])
+ l.append(op(obj.sub_info_filter[i]['value'],sample[obj.sub_info_filter[i]['field']]))
+ #logging.info("* Logical and of Query %s",l)
+ else:
+ logging.info("* Not a valid operator ignoring app_id:%s",obj.app_id)
+ l.append(False)
+ #logging.info("* Logical and of Query %s",l)
+ if reduce(operator.or_, l):
+ try:
+ if obj.scheme == "udp" :
+ logging.debug("* Sending data over UDP for host:%s and port:%s",host,port)
+ #udp = socket.socket(socket.AF_INET, # Internet
+ # socket.SOCK_DGRAM) # UDP
+ #udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ obj.udp.sendto(data,(host,port))
+ #return True
+ continue
+ elif obj.scheme == "kafka" :
+                              logging.debug("* Sending data over kafka for host:%s and port:%s and topic:%s",host,port,obj.kafka_publisher.topic)
+ obj.kafka_publisher._send(sample)
+ #return True
+ continue
+ except Exception:
+ logging.error ("Unable to send sample over UDP/Kafka for %s and %s ",host,port)
+ ret_str = ("Unable to send sample over UDP/Kafka for %s and %s ")%(host,port)
+ #return False
+ continue
+ else :
+ logging.debug("* No Notification found with the given subscription")
+ continue
+ else :
+            logging.debug("* No matching subscription found for %s",sample['counter_name'])
+ continue
+ except Exception as e:
+ logging.error("Key_Error:%s ",e.__str__())
+ ret_str = ("Key_Error:%s \n")%e.__str__()
+ #return False
+ continue
+
+def initialize(ceilometer_client):
+ logging.debug("Ceilometer client info:%s",ceilometer_client)
+ parse_target=netutils.urlsplit(ceilometer_client)
+ if not parse_target.netloc:
+ err_str = "Error:Invalid client format"
+ logging.error("* Invalid client format")
+ return err_str
+ if parse_target.scheme == "udp" :
+ host,port=netutils.parse_host_port(parse_target.netloc)
+ scheme = parse_target.scheme
+ app_ip = host
+ app_port = port
+ if host == None or port == None :
+ err_str = "* Error: Invalid IP Address format"
+ logging.error("* Invalid IP Address format")
+ return err_str
+ thread.start_new(read_notification_from_ceilometer_over_udp,(host,port,))
+ elif parse_target.scheme == "kafka" :
+ thread.start_new(read_notification_from_ceilometer_over_kafka,(parse_target,))
+
+
+if __name__ == "__main__":
+
+ try:
+ config = ConfigParser.ConfigParser()
+ config.read('pub_sub.conf')
+ webserver_host = config.get('WEB_SERVER','webserver_host')
+ webserver_port = int (config.get('WEB_SERVER','webserver_port'))
+ # client_host = config.get('CLIENT','client_host')
+ # client_port = int (config.get('CLIENT','client_port'))
+ ceilometer_client_info = config.get('CLIENT','target')
+ '''
+ log_level = config.get('LOGGING','level')
+ log_file = config.get('LOGGING','filename')
+ maxbytes = int (config.get('LOGGING','maxbytes'))
+ backupcount = int (config.get('LOGGING','backupcount'))
+ level = LEVELS.get(log_level, logging.NOTSET)
+ '''
+ logging.config.fileConfig('pub_sub.conf', disable_existing_loggers=False)
+ '''
+ logging.basicConfig(filename=log_file,format='%(asctime)s %(levelname)s %(message)s',\
+ datefmt=_DEFAULT_LOG_DATE_FORMAT,level=level)
+
+ # create rotating file handler
+
+ rfh = logging.handlers.RotatingFileHandler(
+ log_file, encoding='utf8', maxBytes=maxbytes,
+ backupCount=backupcount,delay=0)
+ logging.getLogger().addHandler(rfh)
+ '''
+
+ except Exception as e:
+ print("* Error in config file:",e.__str__())
+ #logging.error("* Error in confing file:%s",e.__str__())
+ else:
+ #initialize(client_host,client_port)
+ initialize(ceilometer_client_info)
+ app.run(host=webserver_host,port=webserver_port,debug=False)
diff --git a/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/kafka_broker.py b/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/kafka_broker.py
new file mode 100644
index 0000000..ce495fc
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/kafka_broker.py
@@ -0,0 +1,164 @@
+#
+# Copyright 2015 Cisco Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+
+import kafka
+from oslo_config import cfg
+from oslo_utils import netutils
+from six.moves.urllib import parse as urlparse
+import logging as LOG
+
+# Used by publish_samples()/publish_events() to serialize samples and events
+from ceilometer.publisher import utils
+
+
+class KafkaBrokerPublisher():
+ def __init__(self, parsed_url):
+ self.kafka_client = None
+ self.kafka_server = None
+
+ self.host, self.port = netutils.parse_host_port(
+ parsed_url.netloc, default_port=9092)
+
+ self.local_queue = []
+
+ params = urlparse.parse_qs(parsed_url.query)
+ self.topic = params.get('topic', ['ceilometer'])[-1]
+ self.policy = params.get('policy', ['default'])[-1]
+ self.max_queue_length = int(params.get(
+ 'max_queue_length', [1024])[-1])
+ self.max_retry = int(params.get('max_retry', [100])[-1])
+
+ if self.policy in ['default', 'drop', 'queue']:
+ LOG.info(('Publishing policy set to %s') % self.policy)
+ else:
+ LOG.warn(('Publishing policy is unknown (%s) force to default')
+ % self.policy)
+ self.policy = 'default'
+
+ try:
+ self._get_client()
+ self._get_server()
+ except Exception as e:
+ LOG.exception("Failed to connect to Kafka service: %s", e)
+
+ def publish_samples(self, context, samples):
+ """Send a metering message for kafka broker.
+
+ :param context: Execution context from the service or RPC call
+ :param samples: Samples from pipeline after transformation
+ """
+ samples_list = [
+ utils.meter_message_from_counter(
+ sample, cfg.CONF.publisher.telemetry_secret)
+ for sample in samples
+ ]
+
+ self.local_queue.append(samples_list)
+
+ try:
+ self._check_kafka_connection()
+ except Exception as e:
+ raise e
+
+ self.flush()
+
+ def flush(self):
+ queue = self.local_queue
+ self.local_queue = self._process_queue(queue)
+ if self.policy == 'queue':
+ self._check_queue_length()
+
+ def publish_events(self, context, events):
+ """Send an event message for kafka broker.
+
+ :param context: Execution context from the service or RPC call
+ :param events: events from pipeline after transformation
+ """
+ events_list = [utils.message_from_event(
+ event, cfg.CONF.publisher.telemetry_secret) for event in events]
+
+ self.local_queue.append(events_list)
+
+ try:
+ self._check_kafka_connection()
+ except Exception as e:
+ raise e
+
+ self.flush()
+
+ def _process_queue(self, queue):
+ current_retry = 0
+ while queue:
+ data = queue[0]
+ try:
+ self._send(data)
+ except Exception:
+ LOG.warn(("Failed to publish %d datum"),
+ sum([len(d) for d in queue]))
+ if self.policy == 'queue':
+ return queue
+ elif self.policy == 'drop':
+ return []
+ current_retry += 1
+ if current_retry >= self.max_retry:
+ self.local_queue = []
+ LOG.exception(("Failed to retry to send sample data "
+ "with max_retry times"))
+ raise
+ else:
+ queue.pop(0)
+ return []
+
+ def _check_queue_length(self):
+ queue_length = len(self.local_queue)
+ if queue_length > self.max_queue_length > 0:
+ diff = queue_length - self.max_queue_length
+ self.local_queue = self.local_queue[diff:]
+ LOG.warn(("Kafka Publisher max local queue length is exceeded, "
+ "dropping %d oldest data") % diff)
+
+ def _check_kafka_connection(self):
+ try:
+ self._get_client()
+ except Exception as e:
+            LOG.exception("Failed to connect to Kafka service: %s", e)
+
+ if self.policy == 'queue':
+ self._check_queue_length()
+ else:
+ self.local_queue = []
+ raise Exception('Kafka Client is not available, '
+ 'please restart Kafka client')
+
+ def _get_client(self):
+ if not self.kafka_client:
+ self.kafka_client = kafka.KafkaClient(
+ "%s:%s" % (self.host, self.port))
+ self.kafka_producer = kafka.SimpleProducer(self.kafka_client)
+
+ def _get_server(self):
+ if not self.kafka_server:
+ self.kafka_server = kafka.KafkaClient(
+ "%s:%s" % (self.host, self.port))
+ self.kafka_consumer = kafka.KafkaConsumer(self.topic,bootstrap_servers = ["%s:%s" % (self.host, self.port)])
+
+
+ def _send(self, data):
+ #for d in data:
+ try:
+ self.kafka_producer.send_messages(
+ self.topic, json.dumps(data))
+ except Exception as e:
+ LOG.exception(("Failed to send sample data: %s"), e)
+ raise
diff --git a/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/kafka_client.py b/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/kafka_client.py
new file mode 100644
index 0000000..4d7cff0
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/kafka_client.py
@@ -0,0 +1,20 @@
+import kafka
+import kafka_broker
+from oslo_utils import netutils
+import logging
+
+def read_notification_from_ceilometer_over_kafka(parse_target):
+ logging.info("Kafka target:%s",parse_target)
+ try :
+ kafka_publisher=kafka_broker.KafkaBrokerPublisher(parse_target)
+ for message in kafka_publisher.kafka_consumer:
+ #print message.value
+ logging.info("%s",message.value)
+ #print status
+ except Exception as e:
+ logging.error("Error in Kafka setup:%s ",e.__str__())
+
+ceilometer_client="kafka://10.11.10.1:9092?topic=test"
+logging.basicConfig(format='%(asctime)s %(filename)s %(levelname)s %(message)s',filename='kafka_client.log',level=logging.INFO)
+parse_target=netutils.urlsplit(ceilometer_client)
+read_notification_from_ceilometer_over_kafka(parse_target)
diff --git a/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/udp_client_cpu.py b/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/udp_client_cpu.py
new file mode 100644
index 0000000..1c30d63
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/udp_client_cpu.py
@@ -0,0 +1,22 @@
+import socket
+import msgpack
+from oslo_utils import units
+import logging
+UDP_IP = "10.11.10.1"
+UDP_PORT = 5006
+
+logging.basicConfig(format='%(asctime)s %(filename)s %(levelname)s %(message)s',filename='udp_client.log',level=logging.INFO)
+udp = socket.socket(socket.AF_INET, # Internet
+ socket.SOCK_DGRAM) # UDP
+udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+udp.bind((UDP_IP, UDP_PORT))
+while True:
+ data, source = udp.recvfrom(64 * units.Ki)
+    try:
+        sample = msgpack.loads(data, encoding='utf-8')
+        logging.info("%s",sample)
+        print(sample)
+    except Exception:
+        print("UDP: Cannot decode data sent by %s" % (source,))