Adding CORD specific ceilometer changes to monitoring repository
- ceilometer custom notification plugins for ONOS, vSG, vOLT and Infra layers
- ceilometer publish/subscribe module
- ceilometer dynamic pipeline config module
- ceilometer UDP proxy
- ceilometer Custom Image(ceilometer -v2 -v3 versions,kafka_installer,startup scripts)

Change-Id: Ie2ab8ce89cdadbd1fb4dc54ee15e46f8cc8c4c18
diff --git a/xos/synchronizer/ceilometer/ceilometer_pub_sub/README b/xos/synchronizer/ceilometer/ceilometer_pub_sub/README
new file mode 100644
index 0000000..dbd6a5f
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer_pub_sub/README
@@ -0,0 +1,70 @@
+
+Subscribe-Publish Framework:
+1.Command to install the Flask web server framework:
+  sudo pip install Flask
+
+  Along with flask we need the following packages:
+   msgpack
+   fnmatch
+   operator
+   logging
+   oslo_utils
+   ConfigParser
+ 
+2.Files: i.sub_main.py
+         ii.pubrecords.py
+         iii.pub_sub.conf
+
+3.Command to start the server:
+    #python sub_main.py
+4.Command for subscription:
+      i.app_id:Application ID,should be unique.
+      ii.target:
+           udp and kafka targets are supported.
+           a.udp://<ip>:<portno>
+           b.kafka://<kafkaip>:<kafkaport>
+      iii.sub_info:Subscription notifications, e.g. cpu_util, cpu_*
+           It can be given as single input or list.
+      iv.query:
+         The following information needs to be provided as part of the query.
+         a.field:fields like user id, project id, etc.
+         b.op:"eq","gt","lt" etc.
+         c.value:value of the field.
+     Example:
+  		 curl -i -H "Content-Type: application/json" -X POST -d '{"app_id":"10","target":"udp://10.11.10.1:5006","sub_info":"cpu_util","query":[{"field":"user_id","op":"eq","value":"e1271a86bd4e413c87248baf2e5f01e0"},{"field":"project_id","op":"eq","value":"b1a3bf16d2014b47be9aefea88087318"},{"field":"resource_id","op":"eq","value":"658cd03f-d0f0-4f55-9f48-39e7222a8646"}]}' -L http://10.11.10.1:4455/subscribe
+           curl -i -H "Content-Type: application/json" -X POST -d '{"app_id":"10","target":"udp://10.11.10.1:5006", "sub_info":["cpu_util", "memory"],"query":[{"field":"user_id","op":"eq","value":"e1271a86bd4e413c87248baf2e5f01e0"},{"field":"project_id","op":"eq","value":"b1a3bf16d2014b47be9aefea88087318"},{"field":"resource_id","op":"eq","value":"658cd03f-d0f0-4f55-9f48-39e7222a8646"}]}' -L http://10.11.10.1:4455/subscribe
+
+5.Command for unsubscription:
+    For unsubscription only the app_id is needed.
+    curl -i -H "Content-Type: application/json" -X POST -d '{"app_id":"10"}' http://10.11.10.1:4455/unsubscribe
+
+6.Running Kafka on the server where the pub-sub module is running:
+  i.Download the kafka from:
+     #https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
+     http://apache.arvixe.com/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz
+  ii.install java
+     sudo apt-get update
+     sudo apt-get install default-jre
+  iii. install kafka package
+     sudo easy_install pip
+     sudo pip install kafka-python
+  iv.tar -xzf kafka_2.11-0.9.0.0.tgz
+  v. Start the zookeeper server:
+      bin/zookeeper-server-start.sh config/zookeeper.properties
+  vi.Start Kafka Server :
+      bin/kafka-server-start.sh config/server.properties
+  vii.To read messages from kafka on a topic test :
+     bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
+  viii.Before configuring kafka:// publisher in ceilometer:
+     install kafka on both controller node and compute nodes
+     Restart the ceilometer-agent-notification, ceilometer-agent-compute, ceilometer-agent-central daemons
+
+7.[Optional]Install Kafka-web-console (GUI)
+  i.wget https://github.com/adamfokken/kafka-web-console/archive/topic-add-remove.zip
+  ii.unzip it
+  iii.wget http://downloads.typesafe.com/typesafe-activator/1.3.2/typesafe-activator-1.3.2-minimal.zip
+  iv.unzip it and add it to the system path so you can execute the activator command that it provides.
+  v.Install javac if required: sudo apt-get install openjdk-7-jdk
+  vi.cd kafka-web-console-topic-add-remove
+  vii.activator start -DapplyEvolutions.default=true
+  viii.Point your browser to the kafka web-console port (9000) and register the zookeeper
diff --git a/xos/synchronizer/ceilometer/ceilometer_pub_sub/kafka_broker.py b/xos/synchronizer/ceilometer/ceilometer_pub_sub/kafka_broker.py
new file mode 100644
index 0000000..778486b
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer_pub_sub/kafka_broker.py
@@ -0,0 +1,167 @@
+#
+# Copyright 2015 Cisco Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+
+import kafka
+from kafka import TopicPartition
+from oslo_config import cfg
+from oslo_utils import netutils
+from six.moves.urllib import parse as urlparse
+import logging as LOG
+
+
+class KafkaBrokerPublisher():
+    def __init__(self, parsed_url):
+        self.kafka_client = None
+        self.kafka_server = None
+
+        self.host, self.port = netutils.parse_host_port(
+            parsed_url.netloc, default_port=9092)
+
+        self.local_queue = []
+
+        params = urlparse.parse_qs(parsed_url.query)
+        self.topic = params.get('topic', ['ceilometer'])[-1]
+        self.policy = params.get('policy', ['default'])[-1]
+        self.max_queue_length = int(params.get(
+            'max_queue_length', [1024])[-1])
+        self.max_retry = int(params.get('max_retry', [100])[-1])
+
+        if self.policy in ['default', 'drop', 'queue']:
+            LOG.info(('Publishing policy set to %s') % self.policy)
+        else:
+            LOG.warn(('Publishing policy is unknown (%s) force to default')
+                     % self.policy)
+            self.policy = 'default'
+
+        try:
+            self._get_client()
+            self._get_server()
+        except Exception as e:
+            LOG.exception("Failed to connect to Kafka service: %s", e)
+
+    def publish_samples(self, context, samples):
+        """Send a metering message for kafka broker.
+
+        :param context: Execution context from the service or RPC call
+        :param samples: Samples from pipeline after transformation
+        """
+        samples_list = [
+            utils.meter_message_from_counter(
+                sample, cfg.CONF.publisher.telemetry_secret)
+            for sample in samples
+        ]
+        # NOTE(review): `utils` is not imported in this module, so the lines above raise NameError - confirm the intended import.
+        self.local_queue.append(samples_list)
+
+        try:
+            self._check_kafka_connection()
+        except Exception as e:
+            raise e
+
+        self.flush()
+
+    def flush(self):
+        queue = self.local_queue
+        self.local_queue = self._process_queue(queue)
+        if self.policy == 'queue':
+            self._check_queue_length()
+
+    def publish_events(self, context, events):
+        """Send an event message for kafka broker.
+
+        :param context: Execution context from the service or RPC call
+        :param events: events from pipeline after transformation
+        """
+        events_list = [utils.message_from_event(
+            event, cfg.CONF.publisher.telemetry_secret) for event in events]
+        # NOTE(review): `utils` is not imported in this module, so the lines above raise NameError - confirm the intended import.
+        self.local_queue.append(events_list)
+
+        try:
+            self._check_kafka_connection()
+        except Exception as e:
+            raise e
+
+        self.flush()
+
+    def _process_queue(self, queue):
+        current_retry = 0
+        while queue:
+            data = queue[0]
+            try:
+                self._send(data)
+            except Exception:
+                LOG.warn(("Failed to publish %d datum"),
+                         sum([len(d) for d in queue]))
+                if self.policy == 'queue':
+                    return queue
+                elif self.policy == 'drop':
+                    return []
+                current_retry += 1
+                if current_retry >= self.max_retry:
+                    self.local_queue = []
+                    LOG.exception(("Failed to retry to send sample data "
+                                      "with max_retry times"))
+                    raise
+            else:
+                queue.pop(0)
+        return []
+
+    def _check_queue_length(self):
+        queue_length = len(self.local_queue)
+        if queue_length > self.max_queue_length > 0:
+            diff = queue_length - self.max_queue_length
+            self.local_queue = self.local_queue[diff:]
+            LOG.warn(("Kafka Publisher max local queue length is exceeded, "
+                     "dropping %d oldest data") % diff)
+
+    def _check_kafka_connection(self):
+        """Ensure a Kafka client exists; on failure trim or clear the local queue and raise."""
+        try:
+            self._get_client()
+        except Exception as e:
+            LOG.exception("Failed to connect to Kafka service: %s", e)
+            if self.policy == 'queue':
+                self._check_queue_length()
+            else:
+                self.local_queue = []
+            raise Exception('Kafka Client is not available, '
+                            'please restart Kafka client')
+
+    def _get_client(self):
+        if not self.kafka_client:
+            self.kafka_client = kafka.KafkaClient(
+                "%s:%s" % (self.host, self.port))
+            self.kafka_producer = kafka.SimpleProducer(self.kafka_client)
+    
+    def _get_server(self):
+        if not self.kafka_server:
+           self.kafka_server = kafka.KafkaClient(
+                "%s:%s" % (self.host, self.port))
+           #self.kafka_consumer = kafka.KafkaConsumer(self.topic,bootstrap_servers = ["%s:%s" % (self.host, self.port)])
+           self.kafka_consumer=kafka.KafkaConsumer(bootstrap_servers=["%s:%s" % (self.host,self.port)])
+           self.kafka_consumer.assign([TopicPartition(self.topic,0)])
+           self.kafka_consumer.seek_to_end()
+
+    def _send(self, data):
+        #for d in data:
+            try:
+                self.kafka_producer.send_messages(
+                    self.topic, json.dumps(data))
+            except Exception as e:
+                LOG.exception(("Failed to send sample data: %s"), e)
+                raise
diff --git a/xos/synchronizer/ceilometer/ceilometer_pub_sub/pub_sub.conf b/xos/synchronizer/ceilometer/ceilometer_pub_sub/pub_sub.conf
new file mode 100644
index 0000000..6998903
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer_pub_sub/pub_sub.conf
@@ -0,0 +1,44 @@
+#[LOGGING]
+#level = INFO
+#filename = pub_sub.log
+#maxbytes = 1000000
+#backupcount = 5
+
+[WEB_SERVER]
+webserver_host = localhost
+webserver_port = 4455 
+
+[CLIENT]
+target = kafka://localhost:9092?topic=ceilometer
+#target = udp://10.11.10.1:5004/
+
+[RABBITMQ]
+#UpdateConfMgmt = True
+UpdateConfMgmt = False
+Rabbitmq_username = openstack
+Rabbitmq_passwd = password 
+Rabbitmq_host = localhost
+Rabbitmq_port = 5672
+
+[loggers]
+keys=root
+
+[handlers]
+keys=logfile
+
+[formatters]
+keys=logfileformatter
+
+[logger_root]
+level=INFO
+handlers=logfile
+
+[formatter_logfileformatter]
+format='%(asctime)s %(filename)s %(levelname)s %(message)s'
+
+[handler_logfile]
+class=handlers.RotatingFileHandler
+level=NOTSET
+args=('pub_sub.log','a',1000000,5)
+formatter=logfileformatter
+
diff --git a/xos/synchronizer/ceilometer/ceilometer_pub_sub/pubrecords.py b/xos/synchronizer/ceilometer/ceilometer_pub_sub/pubrecords.py
new file mode 100644
index 0000000..bca62b8
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer_pub_sub/pubrecords.py
@@ -0,0 +1,95 @@
+#!/usr/bin/python
+import socket
+from oslo_utils import units
+from oslo_utils import netutils
+import kafka
+import kafka_broker
+import fnmatch
+import logging
+import copy
+
+sub_info=[] 
+class subinfo:
+    def __init__(self,scheme,app_id,app_ip,app_port,subscription_info,sub_info_filter,target):
+        logging.debug("* Updating subscription_info ") 
+        self.scheme = scheme
+        self.app_id = app_id
+        self.ipaddress = app_ip 
+        self.portno = app_port 
+        self.subscription_info = subscription_info
+        self.sub_info_filter = sub_info_filter
+        self.target = target
+        
+        if scheme == "kafka":
+            ''' Creating kafka publisher to send message over kafka '''
+            parse_target = netutils.urlsplit(target)
+            self.kafka_publisher = kafka_broker.KafkaBrokerPublisher(parse_target)
+        elif scheme == "udp":
+            ''' Creating UDP socket to send message over UDP '''
+            self.udp = socket.socket(socket.AF_INET, # Internet
+                                     socket.SOCK_DGRAM) # UDP
+            self.udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)   
+
+    def update_subinfo(self):
+        logging.info("* inside %s",self.update_subinfo.__name__)
+        if not sub_info:
+            logging.debug("* -----------List is EMpty -------------") 
+            sub_info.append(self)
+            logging.debug("* Subscription is sucessful") 
+            return "Subscription is sucessful \n" 
+        for obj in sub_info:
+            if obj.app_id == self.app_id :
+               # obj.subscription_info=self.subscription_info
+                sub_info.remove(obj)
+                sub_info.append(self)
+                logging.warning("* entry already exists so overwriting this subscription \n")
+                return "entry already exists so overwriting this subscription \n" 
+        sub_info.append(self)
+        return "Subscription is sucessful \n"
+ 
+    @staticmethod
+    def delete_subinfo(app_id):
+        logging.info("* inside %s",subinfo.delete_subinfo.__name__)
+        Flag = False 
+        for obj in sub_info:
+            if obj.app_id == app_id : 
+                    sub_info.remove(obj)
+                    Flag = True
+                    logging.debug("* Un-Subscription is sucessful") 
+                    return "Un-Subscription is sucessful \n"
+        if not Flag :
+           err_str = "No subscription exists with app id: " + app_id + "\n"
+           logging.error("* No subscription exists with app id:%s ",app_id)
+           raise Exception (err_str)
+       
+    @staticmethod
+    def print_subinfo():
+        logging.info("* inside %s",subinfo.print_subinfo.__name__)
+        for obj in sub_info:
+            logging.debug("* ------------------------------------------------") 
+            logging.debug("* scheme:%s",obj.scheme)  
+            logging.debug("* app_id:%s",obj.app_id)
+            logging.debug("* portno:%s",obj.portno ) 
+            logging.debug("* ipaddress:%s",obj.ipaddress)  
+            logging.debug("* subscription_info:%s",obj.subscription_info)
+            logging.debug("* sub_info_filter:%s",obj.sub_info_filter)
+            logging.debug("* target:%s",obj.target)
+            logging.debug("* ------------------------------------------------")
+    @staticmethod
+    def get_subinfo(app_id):
+        logging.info("* inside %s",subinfo.get_subinfo.__name__)
+        Flag = False
+        for obj in sub_info:
+            if obj.app_id == app_id :
+                    return obj.subscription_info,obj.target
+        return (None,None)
+       
+ 
+    @staticmethod
+    def get_sub_list(notif_subscription_info):
+        """Return every registered subscription whose subscription_info
+        equals notif_subscription_info (empty list when none match)."""
+        # The original logged subinfo.get_sublist, an attribute that does not exist.
+        logging.info("* inside %s",subinfo.get_sub_list.__name__)
+        return [obj for obj in sub_info
+                if obj.subscription_info == notif_subscription_info]
diff --git a/xos/synchronizer/ceilometer/ceilometer_pub_sub/sub_main.py b/xos/synchronizer/ceilometer/ceilometer_pub_sub/sub_main.py
new file mode 100644
index 0000000..e7ebcf4
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer_pub_sub/sub_main.py
@@ -0,0 +1,559 @@
+#!/usr/bin/python
+import socket,thread
+import sys
+import msgpack
+import fnmatch
+import operator
+import logging
+import logging.handlers
+import logging.config
+import ConfigParser
+import json
+from oslo_utils import units
+from oslo_utils import netutils
+from pubrecords import *
+import kafka
+import kafka_broker
+
+from flask import request, Request, jsonify
+from flask import Flask
+from flask import make_response
+app = Flask(__name__)
+
+COMPARATORS = {
+    'gt': operator.gt,
+    'lt': operator.lt,
+    'ge': operator.ge,
+    'le': operator.le,
+    'eq': operator.eq,
+    'ne': operator.ne,
+}
+
+LEVELS = {'DEBUG': logging.DEBUG,
+          'INFO': logging.INFO,
+          'WARNING': logging.WARNING,
+          'ERROR': logging.ERROR,
+          'CRITICAL': logging.CRITICAL}
+
+_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
+
+''' Stores all the subscribed meter's list '''
+meter_list = []
+''' Stores meter to app-id mapping '''
+meter_dict = {}
+
+@app.route('/subscribe',methods=['POST','SUB'])
+def subscribe():
+    try :
+        app_id = request.json['app_id']
+        target = request.json['target']
+        sub_info = request.json['sub_info']
+
+        try :
+            validate_sub_info(sub_info)
+        except Exception as e:
+            logging.error("* %s",e.__str__())
+            return e.__str__()
+
+        ''' Flag to Update pipeling cfg file '''
+        config = ConfigParser.ConfigParser()
+        config.read('pub_sub.conf')
+        if config.get('RABBITMQ','UpdateConfMgmt') == "True" : 
+            update_pipeline_conf(sub_info,target,app_id,"ADD")
+        else:
+            logging.warning("Update Conf Mgmt flag is disabled,enable the flag to  update Conf Mgmt")
+
+        if not 'query' in request.json.keys():
+            logging.info("query request is not provided by user")
+            query = None 
+        else:
+             query = request.json['query']
+             for i in range(len(query)):
+                 if not 'field' in query[i].keys():
+                     err_str = "Query field"
+                     raise Exception (err_str)
+                 if not 'op' in query[i].keys():
+                     err_str = "Query op"
+                     raise Exception (err_str)
+                 if not 'value' in query[i].keys():
+                     err_str = "Query value" 
+                     raise Exception (err_str)
+    except Exception as e:
+        err_str = "KeyError: Parsing subscription request " + e.__str__() + "\n"
+        logging.error("* KeyError: Parsing subscription request :%s",e.__str__())  
+        return err_str 
+
+    parse_target=netutils.urlsplit(target)
+    if not parse_target.netloc:
+        err_str = "Error:Invalid target format"
+        logging.error("* Invalid target format")
+        return err_str 
+
+    status = "" 
+    if parse_target.scheme == "udp" or  parse_target.scheme == "kafka":
+         host,port=netutils.parse_host_port(parse_target.netloc)
+         scheme = parse_target.scheme
+         app_ip = host 
+         app_port = port
+ 
+         if host == None or port == None :
+             err_str = "* Error: Invalid IP Address format"
+             logging.error("* Invalid IP Address format")
+             return err_str
+  
+         subscription_info = sub_info
+         sub_info_filter = query 
+         logging.info("Creating subscription for app:%s for meters:%s with filters:%s and target:%s",app_id, subscription_info, sub_info_filter, target)
+         subscrip_obj=subinfo(scheme,app_id,app_ip,app_port,subscription_info,sub_info_filter,target)
+         status = subscrip_obj.update_subinfo()
+         subinfo.print_subinfo()
+
+    if parse_target.scheme == "file" :
+         pass
+    return status 
+
+@app.route('/unsubscribe',methods=['POST','UNSUB'])
+def unsubscribe():
+    try :  
+        app_id = request.json['app_id']
+        sub_info,target = subinfo.get_subinfo(app_id)
+        if sub_info is None or target is None:
+            err_str = "No subscription exists with app id: " + app_id + "\n"
+            logging.error("* No subscription exists with app id:%s ",app_id)
+            return err_str 
+        else:
+            ''' Flag to Update pipeling cfg file '''
+            config = ConfigParser.ConfigParser()
+            config.read('pub_sub.conf')
+            if config.get('RABBITMQ','UpdateConfMgmt') == "True" :
+                update_pipeline_conf(sub_info,target,app_id,"DEL")
+            else:
+                logging.warning("Update Conf Mgmt flag is disabled,enable the flag to  update Conf Mgmt")
+            #update_pipeline_conf(sub_info,target,"DEL")
+            subinfo.delete_subinfo(app_id)
+    except Exception as e:
+         logging.error("* %s",e.__str__())
+         return e.__str__()
+    return "UnSubscrition is sucessful! \n"
+
+@app.errorhandler(404)
+def not_found(error):
+    return make_response(jsonify({'error': 'Not found'}), 404)
+
+def print_subscribed_meter_list():
+    logging.debug("-------------------------------------------------")
+    #print (meter_list)
+    logging.debug("meter_list:%s",meter_list)
+    logging.debug("meter_dict:%s",meter_dict)
+    #print (meter_dict)
+    logging.debug("-------------------------------------------------")
+
+def validate_sub_info(sub_info):
+    if type(sub_info) is not list:
+        sub_info = [sub_info]
+    for meter in sub_info:
+        if meter.startswith("*") or meter.startswith("!"):
+            err_str = "Given meter is not supported:" + meter + "\n"
+            logging.error("* Given meter is not supported:%s",meter)
+            raise Exception (err_str)
+
+def update_meter_dict(meterinfo,app_id):
+    try :
+         if type(meterinfo) == list:
+             for meter in meterinfo:  
+                 if meter_dict.get(meter) is None:
+                     meter_dict[meter] = [app_id]
+                 elif app_id not in meter_dict.get(meter):
+                     meter_dict.get(meter).append(app_id)
+         else:
+             if meter_dict.get(meterinfo) is None:
+                 meter_dict[meterinfo] = [app_id]
+             elif app_id not in meter_dict.get(meterinfo):
+                 meter_dict.get(meterinfo).append(app_id)
+    except Exception as e:
+         logging.error("* %s",e.__str__())
+
+def check_send_msg_confmgmt_del(sub_info,app_id):
+    temp_sub_info=[]
+    temm_meter_info = None
+    if len(meter_list) == 0:
+        #print("No subscription exists")
+        logging.info("No subscription exists")
+        return False,None
+    if type(sub_info) == list:
+       for meterinfo in sub_info:
+           if meter_dict.get(meterinfo) is None:
+              logging.warning("%s meter doesn't exist in the meter dict",meterinfo)
+              continue 
+           if app_id in meter_dict.get(meterinfo):
+               if len(meter_dict.get(meterinfo)) == 1:
+                   #print "Only single app is subscribing this meter"
+                   logging.info("Only single app is subscribing this meter")
+                   del meter_dict[meterinfo]
+                   temp_sub_info.append(meterinfo)
+                   if meterinfo in meter_list:
+                       meter_list.remove(meterinfo)
+               else:
+                   meter_dict.get(meterinfo).remove(app_id)  
+       return True,temp_sub_info 
+    else :
+         if meter_dict.get(sub_info) is None:
+              logging.warning("%s meter doesn't exist in the meter dict",sub_info)
+              return False,None 
+         if app_id in meter_dict.get(sub_info):
+             if len(meter_dict.get(sub_info)) == 1:
+                  #print "Only single app is subscribing this meter"
+                  logging.info("Only single app is subscribing this meter")
+                  del meter_dict[sub_info]
+                  if sub_info in meter_list:
+                     meter_list.remove(sub_info)
+                  return True,sub_info   
+             else:
+                 meter_dict.get(sub_info).remove(app_id)
+    return False,None 
+     
+def check_send_msg_confmgmt_add(sub_info,app_id):
+    """Register app_id for the meters and add unseen meters to meter_list.
+
+    Returns (True, meters_to_announce) when meter_list was empty or gained a
+    meter; otherwise (False, None) for a list and (False, sub_info) for a name."""
+    temp_sub_info=[]
+    update_meter_dict(sub_info,app_id)
+    if len(meter_list) == 0:
+        logging.info("No subinfo exits")
+        if type(sub_info) == list:
+            for j in sub_info:
+                meter_list.append(j)
+            return True,sub_info
+        else :
+            meter_list.append(sub_info)
+            return True,sub_info
+    if type(sub_info) == list:
+        for j in sub_info:
+            if j in meter_list:
+                logging.info("meter already exist:%s",j)
+                continue
+            else :
+                 temp_sub_info.append(j)
+                 meter_list.append(j)
+        if temp_sub_info:
+            return True,temp_sub_info
+        else :
+            return False,None
+    else :
+         if sub_info not in meter_list:
+             meter_list.append(sub_info)
+             logging.warning("subscription for  meter doesn't exist:%s",sub_info)
+             return True,sub_info
+         else :
+             logging.info("subscription already exist for:%s ",sub_info)
+             return False,sub_info
+
+def update_pipeline_conf(sub_info,target,app_id,flag):
+    """Publish an ADD/DEL meter update to the 'pubsub' RabbitMQ fanout exchange.
+
+    Nothing is sent when the meter bookkeeping reports no change; errors while
+    reading pub_sub.conf or publishing are logged, not raised.
+    """
+    import pika
+
+    logging.debug("* sub_info:%s",sub_info)
+    logging.debug("* target:%s",target)
+    meter_sub_info = None
+    if flag == "ADD":
+       status,meter_sub_info=check_send_msg_confmgmt_add(sub_info,app_id)
+       if status == False or meter_sub_info == None or meter_sub_info == []:
+           # The %s placeholder previously had no argument and was logged literally.
+           logging.warning("%s is already subscribed with the conf mgmt",sub_info)
+           return
+    elif flag == "DEL":
+       status,meter_sub_info=check_send_msg_confmgmt_del(sub_info,app_id)
+       if status == False or meter_sub_info == None or meter_sub_info == []:
+           logging.warning("%s is already unsubscribed with the conf mgmt",sub_info)
+           return
+    try :
+        config = ConfigParser.ConfigParser()
+        config.read('pub_sub.conf')
+        rabbitmq_username = config.get('RABBITMQ','Rabbitmq_username')
+        rabbitmq_passwd = config.get('RABBITMQ','Rabbitmq_passwd')
+        rabbitmq_host = config.get('RABBITMQ','Rabbitmq_host')
+        rabbitmq_port = int ( config.get('RABBITMQ','Rabbitmq_port') )
+        # The message carries the [CLIENT] target from pub_sub.conf, not the `target` argument.
+        ceilometer_client_info = config.get('CLIENT','target')
+        msg={"sub_info":meter_sub_info,"target":ceilometer_client_info,"action":flag}
+        json_msg=json.dumps(msg)
+        credentials = pika.PlainCredentials(rabbitmq_username,rabbitmq_passwd)
+        parameters = pika.ConnectionParameters(rabbitmq_host,
+                                               rabbitmq_port,
+                                               '/',
+                                               credentials)
+        connection = pika.BlockingConnection(parameters)
+        try :
+            properties = pika.BasicProperties(content_type = "application/json")
+            channel = connection.channel()
+            channel.exchange_declare(exchange='pubsub',
+                                     type='fanout')
+            channel.basic_publish(exchange='pubsub',
+                                  routing_key='',
+                                  properties = properties,
+                                  body=json_msg)
+            logging.debug(" [x] %s Sent",msg)
+            logging.info(" [x] %s Sent",msg)
+        finally :
+            # Close the connection even when declaring or publishing fails.
+            connection.close()
+    except Exception as e:
+           logging.error("Error:%s",e.__str__())
+  
+def read_notification_from_ceilometer(host,port):
+     """Receive msgpack samples over UDP on (host, port) and relay them.
+
+     Each datagram is re-sent unchanged to every "udp" subscription whose
+     meter pattern matches sample['counter_name'] and whose query filters
+     all evaluate true. Loops forever."""
+     UDP_IP = host
+     UDP_PORT = port
+
+     logging.debug("* Sarting UDP Client on ip:%s , port:%d",UDP_IP,UDP_PORT)
+     udp = socket.socket(socket.AF_INET, # Internet
+                          socket.SOCK_DGRAM) # UDP
+     udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+     udp.bind((UDP_IP, UDP_PORT))
+
+     while True:
+            data, source = udp.recvfrom(64 * units.Ki)
+            sample = msgpack.loads(data, encoding='utf-8')
+            logging.debug("%s",sample)
+            for obj in sub_info:
+                msg_list = []
+                if obj.scheme == "udp" :
+                    if type(obj.subscription_info) is list:
+                        for info in obj.subscription_info:
+                            msg_list.append(fnmatch.fnmatch(sample['counter_name'],info))
+                    else :
+                        msg_list.append(fnmatch.fnmatch(sample['counter_name'],obj.subscription_info))
+                    try:
+                        if reduce(operator.or_, msg_list):
+                            host = obj.ipaddress
+                            port = int(obj.portno)
+                            l=[]
+                            if obj.sub_info_filter is None:
+                                try:
+                                    logging.debug("* Sending data without query over UDP for host:%s and port:%s",host,port)
+                                    udp.sendto(data,(host,port))
+                                except Exception as e:
+                                    logging.error ("Unable to send sample over UDP for %s and %s,%s",host,port,e.__str__())
+                                    ret_str = ("Unable to send sample over UDP for %s and %s,%s")%(host,port,e.__str__())
+                                continue
+                            for i in range(len(obj.sub_info_filter)):
+                                if obj.sub_info_filter[i]['op'] in COMPARATORS:
+                                    op = COMPARATORS[obj.sub_info_filter[i]['op']]
+                                    logging.debug("* obj.sub_info_filter[i]['value']:%s",obj.sub_info_filter[i]['value'])
+                                    logging.debug("* obj.sub_info_filter[i]['field']:%s",obj.sub_info_filter[i]['field'])
+                                    l.append(op(obj.sub_info_filter[i]['value'],sample[obj.sub_info_filter[i]['field']]))
+                                    logging.info("* Logical and of Query %s",l)
+                                else:
+                                    logging.debug("* Not a valid operator ignoring app_id:%s",obj.app_id)
+                                    l.append(False)
+                                    logging.info("* Logical and of Query %s",l)
+                            if reduce(operator.and_, l):
+                                try:
+                                    logging.debug("* Sending data over UDP for host:%s and port:%s",host,port)
+                                    udp.sendto(data,(host,port))
+                                except Exception:
+                                    logging.error ("Unable to send sample over UDP for %s and %s ",host,port)
+                                    ret_str = ("Unable to send sample over UDP for %s and %s ")%(host,port)
+                            else :
+                                 logging.warning("* No Notification found with the given subscription")
+                        else :
+                            logging.warning("* No valid subscrition found for %s",obj.app_id)
+                    except Exception as e:
+                       logging.error("Key_Error:%s ",e.__str__())
+                       ret_str = ("Key_Error:%s \n")% e.__str__()
+
+def read_notification_from_ceilometer_over_udp(host,port):
+    UDP_IP = host
+    UDP_PORT = port
+
+    logging.debug("* Sarting UDP Client on ip:%s , port:%d",UDP_IP,UDP_PORT)
+    udp = socket.socket(socket.AF_INET, # Internet
+                          socket.SOCK_DGRAM) # UDP
+    udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+    udp.bind((UDP_IP, UDP_PORT))
+
+    while True:
+        #print thread.get_ident()
+        #logging.debug("thread.get_ident():%s", thread.get_ident())
+        data, source = udp.recvfrom(64 * units.Ki)
+        sample = msgpack.loads(data, encoding='utf-8')
+        status = process_ceilometer_message(sample,data)
+  
+def read_notification_from_ceilometer_over_kafka(parse_target):
+    logging.info("Kafka target:%s",parse_target)
+    try :
+        kafka_publisher=kafka_broker.KafkaBrokerPublisher(parse_target)
+        for message in kafka_publisher.kafka_consumer:
+            #print message.value
+            #logging.debug("%s",message.value)
+            #logging.info("%s",message.value)
+            status = process_ceilometer_message(json.loads(message.value),message.value)
+            #print status
+    except Exception as e:
+        logging.error("Error in Kafka setup:%s ",e.__str__())
+
+def process_ceilometer_message(sample,data):
+    logging.debug("%s",sample)
+    #logging.info("%s",sample)
+    if len(sub_info) < 1:
+        #print  "No subscription exists"
+        return
+    for obj in sub_info:
+         #import pdb;pdb.set_trace()
+         msg_list = []
+         if type(obj.subscription_info) is list:
+             for info in obj.subscription_info:
+                 msg_list.append(fnmatch.fnmatch(sample['counter_name'],info))
+         else :
+             msg_list.append(fnmatch.fnmatch(sample['counter_name'],obj.subscription_info))
+         try:
+             if reduce(operator.or_, msg_list):
+                 ''' 
+                 kafka_publisher = None
+                 if obj.scheme == "kafka" :
+		    parse_target=netutils.urlsplit(obj.target)
+	            try :
+		        kafka_publisher=kafka_broker.KafkaBrokerPublisher(parse_target)
+                    except Exception as e:
+                        logging.error("* Error in connecting kafka broker:%s",e.__str__())
+                       # return False
+                        continue 
+                 '''
+                 host = obj.ipaddress
+                 port = int(obj.portno)
+                 l=[]
+                 logging.debug("* -------------------------------------------------------")
+                 if obj.sub_info_filter is None:
+                     try:
+                         if obj.scheme == "udp" :
+                              #logging.debug("* Sending data without query over UDP for host:%s and port:%s",host,port)
+                              #logging.info("* Sending data without query over UDP for host:%s and port:%s",host,port)
+                              #udp = socket.socket(socket.AF_INET, # Internet
+                              #                     socket.SOCK_DGRAM) # UDP
+                              #udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
+                              obj.udp.sendto(data,(host,port))
+                              #return True
+                              continue
+                         elif obj.scheme == "kafka" :
+                              #logging.debug("* Sending data over kafka for host:%s and port:%s and topec:%s",host,port,kafka_publisher.topic)
+                              #logging.info("* Sending data over kafka for host:%s and port:%s and topec:%s",host,port,kafka_publisher.topic)
+                              obj.kafka_publisher._send(sample)  
+                              #return True
+                              continue                                  
+                     except Exception as e:
+                          logging.error ("Unable to send sample over UDP/kafka for %s and %s,%s",host,port,e.__str__())
+                          ret_str = ("Unable to send sample over UDP for %s and %s,%s ")%(host,port,e.__str__())
+                          #return False
+                          continue 
+                 for i in range(len(obj.sub_info_filter)):
+                     if obj.sub_info_filter[i]['op'] in COMPARATORS:
+                          op = COMPARATORS[obj.sub_info_filter[i]['op']]
+                          #logging.debug("* obj.sub_info_filter[i]['value']:%s",obj.sub_info_filter[i]['value'])
+                          #logging.debug("* obj.sub_info_filter[i]['field']:%s",obj.sub_info_filter[i]['field'])
+                          l.append(op(obj.sub_info_filter[i]['value'],sample[obj.sub_info_filter[i]['field']]))
+                          #logging.info("* Logical and of Query %s",l)
+                     else:
+                          logging.info("* Not a valid operator ignoring app_id:%s",obj.app_id)
+                          l.append(False)
+                          #logging.info("* Logical and of Query %s",l)
+                 if reduce(operator.or_, l):
+                     try:
+                         if obj.scheme == "udp" :
+                              logging.debug("* Sending data over UDP for host:%s and port:%s",host,port)
+                              #udp = socket.socket(socket.AF_INET, # Internet
+                              #                    socket.SOCK_DGRAM) # UDP
+                              #udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+                              obj.udp.sendto(data,(host,port))
+                              #return True
+                              continue
+                         elif obj.scheme == "kafka" :
+                              logging.debug("* Sending data over kafka for host:%s and port:%s and topec:%s",host,port,obj.kafka_publisher.topic)
+                              obj.kafka_publisher._send(sample)  
+                              #return True
+                              continue                                  
+                     except Exception:
+                         logging.error ("Unable to send sample over UDP/Kafka for %s and %s ",host,port)
+                         ret_str = ("Unable to send sample over UDP/Kafka for %s and %s ")%(host,port)
+                         #return False
+                         continue   
+                 else :
+		       logging.debug("* No Notification found with the given subscription")
+                       continue
+             else :
+                  logging.debug("* No matching subscrition found for %s",sample['counter_name'])
+                  continue
+         except Exception as e:
+             logging.error("Key_Error:%s ",e.__str__())
+             ret_str = ("Key_Error:%s \n")%e.__str__()
+             #return False
+             continue
+
+def initialize(ceilometer_client):
+     logging.debug("Ceilometer client info:%s",ceilometer_client)
+     parse_target=netutils.urlsplit(ceilometer_client)
+     if not parse_target.netloc:
+        err_str = "Error:Invalid client format"
+        logging.error("* Invalid client format")
+        return err_str
+     if parse_target.scheme == "udp" :
+         host,port=netutils.parse_host_port(parse_target.netloc)
+         scheme = parse_target.scheme
+         app_ip = host
+         app_port = port
+         if host == None or port == None :
+             err_str = "* Error: Invalid IP Address format"
+             logging.error("* Invalid IP Address format")
+             return err_str
+         thread.start_new(read_notification_from_ceilometer_over_udp,(host,port,))
+     elif parse_target.scheme == "kafka" :
+         thread.start_new(read_notification_from_ceilometer_over_kafka,(parse_target,))
+     
+        
+if __name__ == "__main__":
+    # Read web-server, client-target and logging settings from pub_sub.conf.
+    try:
+        config = ConfigParser.ConfigParser()
+        config.read('pub_sub.conf')
+        webserver_host = config.get('WEB_SERVER','webserver_host')
+        webserver_port = int (config.get('WEB_SERVER','webserver_port'))
+        # [CLIENT] target is the ceilometer feed URL that is handed to
+        # initialize() below (a udp or kafka target).
+        ceilometer_client_info = config.get('CLIENT','target')
+        '''  
+        log_level      = config.get('LOGGING','level')
+        log_file       = config.get('LOGGING','filename')
+        maxbytes       = int (config.get('LOGGING','maxbytes'))
+        backupcount    = int (config.get('LOGGING','backupcount'))
+        level = LEVELS.get(log_level, logging.NOTSET)
+        '''
+        logging.config.fileConfig('pub_sub.conf', disable_existing_loggers=False)
+        ''' 
+        logging.basicConfig(filename=log_file,format='%(asctime)s %(levelname)s %(message)s',\
+                    datefmt=_DEFAULT_LOG_DATE_FORMAT,level=level)
+
+        # create rotating file handler
+        
+        rfh = logging.handlers.RotatingFileHandler(
+                 log_file, encoding='utf8', maxBytes=maxbytes,
+                 backupCount=backupcount,delay=0)
+        logging.getLogger().addHandler(rfh)
+        '''
+         
+    except Exception as e:
+        print("* Error in config file:",e.__str__())
+        # print, not logging: the failure may precede logging.config.fileConfig.
+    else: 
+        # Start the ceilometer reader, then run the web server.
+        initialize(ceilometer_client_info)
+        app.run(host=webserver_host,port=webserver_port,debug=False)
diff --git a/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/kafka_broker.py b/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/kafka_broker.py
new file mode 100644
index 0000000..ce495fc
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/kafka_broker.py
@@ -0,0 +1,164 @@
+#
+# Copyright 2015 Cisco Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+
+import kafka
+from oslo_config import cfg
+from oslo_utils import netutils
+from six.moves.urllib import parse as urlparse
+import logging as LOG
+
+
+class KafkaBrokerPublisher():
+    def __init__(self, parsed_url):
+        self.kafka_client = None
+        self.kafka_server = None
+
+        self.host, self.port = netutils.parse_host_port(
+            parsed_url.netloc, default_port=9092)
+
+        self.local_queue = []
+
+        params = urlparse.parse_qs(parsed_url.query)
+        self.topic = params.get('topic', ['ceilometer'])[-1]
+        self.policy = params.get('policy', ['default'])[-1]
+        self.max_queue_length = int(params.get(
+            'max_queue_length', [1024])[-1])
+        self.max_retry = int(params.get('max_retry', [100])[-1])
+
+        if self.policy in ['default', 'drop', 'queue']:
+            LOG.info(('Publishing policy set to %s') % self.policy)
+        else:
+            LOG.warn(('Publishing policy is unknown (%s) force to default')
+                     % self.policy)
+            self.policy = 'default'
+
+        try:
+            self._get_client()
+            self._get_server()
+        except Exception as e:
+            LOG.exception("Failed to connect to Kafka service: %s", e)
+
+    def publish_samples(self, context, samples):
+        """Send a metering message for kafka broker.
+
+        :param context: Execution context from the service or RPC call
+        :param samples: Samples from pipeline after transformation
+        """
+        samples_list = [
+            utils.meter_message_from_counter(
+                sample, cfg.CONF.publisher.telemetry_secret)
+            for sample in samples
+        ]
+
+        self.local_queue.append(samples_list)
+
+        try:
+            self._check_kafka_connection()
+        except Exception as e:
+            raise e
+
+        self.flush()
+
+    def flush(self):
+        queue = self.local_queue
+        self.local_queue = self._process_queue(queue)
+        if self.policy == 'queue':
+            self._check_queue_length()
+
+    def publish_events(self, context, events):
+        """Send an event message for kafka broker.
+
+        :param context: Execution context from the service or RPC call
+        :param events: events from pipeline after transformation
+        """
+        events_list = [utils.message_from_event(
+            event, cfg.CONF.publisher.telemetry_secret) for event in events]
+
+        self.local_queue.append(events_list)
+
+        try:
+            self._check_kafka_connection()
+        except Exception as e:
+            raise e
+
+        self.flush()
+
+    def _process_queue(self, queue):
+        current_retry = 0
+        while queue:
+            data = queue[0]
+            try:
+                self._send(data)
+            except Exception:
+                LOG.warn(("Failed to publish %d datum"),
+                         sum([len(d) for d in queue]))
+                if self.policy == 'queue':
+                    return queue
+                elif self.policy == 'drop':
+                    return []
+                current_retry += 1
+                if current_retry >= self.max_retry:
+                    self.local_queue = []
+                    LOG.exception(("Failed to retry to send sample data "
+                                      "with max_retry times"))
+                    raise
+            else:
+                queue.pop(0)
+        return []
+
+    def _check_queue_length(self):
+        queue_length = len(self.local_queue)
+        if queue_length > self.max_queue_length > 0:
+            diff = queue_length - self.max_queue_length
+            self.local_queue = self.local_queue[diff:]
+            LOG.warn(("Kafka Publisher max local queue length is exceeded, "
+                     "dropping %d oldest data") % diff)
+
+    def _check_kafka_connection(self):
+        try:
+            self._get_client()
+        except Exception as e:
+            LOG.exception(_LE("Failed to connect to Kafka service: %s"), e)
+
+            if self.policy == 'queue':
+                self._check_queue_length()
+            else:
+                self.local_queue = []
+            raise Exception('Kafka Client is not available, '
+                            'please restart Kafka client')
+
+    def _get_client(self):
+        if not self.kafka_client:
+            self.kafka_client = kafka.KafkaClient(
+                "%s:%s" % (self.host, self.port))
+            self.kafka_producer = kafka.SimpleProducer(self.kafka_client)
+    
+    def _get_server(self):
+        if not self.kafka_server:
+           self.kafka_server = kafka.KafkaClient(
+                "%s:%s" % (self.host, self.port))
+           self.kafka_consumer = kafka.KafkaConsumer(self.topic,bootstrap_servers = ["%s:%s" % (self.host, self.port)])
+
+
+    def _send(self, data):
+        #for d in data:
+            try:
+                self.kafka_producer.send_messages(
+                    self.topic, json.dumps(data))
+            except Exception as e:
+                LOG.exception(("Failed to send sample data: %s"), e)
+                raise
diff --git a/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/kafka_client.py b/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/kafka_client.py
new file mode 100644
index 0000000..4d7cff0
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/kafka_client.py
@@ -0,0 +1,20 @@
+import kafka
+import kafka_broker
+from oslo_utils import netutils
+import logging
+
+def read_notification_from_ceilometer_over_kafka(parse_target):
+    logging.info("Kafka target:%s",parse_target)
+    try :
+        kafka_publisher=kafka_broker.KafkaBrokerPublisher(parse_target)
+        for message in kafka_publisher.kafka_consumer:
+            #print message.value
+            logging.info("%s",message.value)
+            #print status
+    except Exception as e:
+        logging.error("Error in Kafka setup:%s ",e.__str__())
+
+ceilometer_client="kafka://10.11.10.1:9092?topic=test"  # hard-coded broker address and topic for this test client
+logging.basicConfig(format='%(asctime)s %(filename)s %(levelname)s %(message)s',filename='kafka_client.log',level=logging.INFO)
+parse_target=netutils.urlsplit(ceilometer_client)
+read_notification_from_ceilometer_over_kafka(parse_target)  # runs at module load; consumed messages go to kafka_client.log
diff --git a/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/udp_client_cpu.py b/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/udp_client_cpu.py
new file mode 100644
index 0000000..1c30d63
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/udp_client_cpu.py
@@ -0,0 +1,22 @@
+import socket
+import msgpack
+from oslo_utils import units
+import logging
+UDP_IP = "10.11.10.1"
+UDP_PORT = 5006
+
+logging.basicConfig(format='%(asctime)s %(filename)s %(levelname)s %(message)s',filename='udp_client.log',level=logging.INFO)
+udp = socket.socket(socket.AF_INET, # Internet
+                     socket.SOCK_DGRAM) # UDP
+udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+udp.bind((UDP_IP, UDP_PORT))
+while True:
+     data, source = udp.recvfrom(64 * units.Ki)
+     #print data
+     #try:
+     sample = msgpack.loads(data, encoding='utf-8')
+     logging.info("%s",sample)
+     print sample
+     #except Exception:
+         #logging.info("%s",sample)
+     #    print ("UDP: Cannot decode data sent by %s"), source