move ceilometer service over from xos repo
+Subscribe-Publish Frame Work:
+1.Command to Install Flask Webserver frame work.
+ sudo pip install Flask
+ Along with flask we need the following packages:
+ msgpack
+ fnmatch
+ operator
+ logging
+ oslo_utils
+ ConfigParser
+ iii.pub_sub.conf
+3.Command to start the server:
+ #python
+4.Command for subscription:
+ i.app_id:Application ID,should be unique.
+ Presently only udp is supported.
+ a.udp:<ip:portno>
+ b.kafka:<kafkaip:kafkaport>
+ iii.sub_info:Sunscription notifications.ex:cpu_util,cpu_*
+ iv.query:
+ Below information need to provide as part of query.
+ a.field:fileds like user id ,porject id etc.,
+ b.op:"eq","gt","lt" etc.,
+ c.value:value of the fileds.
+ Example:
+ curl -i -H "Content-Type: application/json" -X SUB -d '{"app_id":"10","target":"udp://","sub_info":"cpu_util","query":[{"field":"user_id","op":"eq","value":"e1271a86bd4e413c87248baf2e5f01e0"},{"field":"project_id","op":"eq","value":"b1a3bf16d2014b47be9aefea88087318"},{"field":"resource_id","op":"eq","value":"658cd03f-d0f0-4f55-9f48-39e7222a8646"}]}' -L
+5.Command for unsunscription:
+ For unsubcription only appid will be needed.
+ curl -i -H "Content-Type: application/json" -X UNSUB -d '{"app_id":"10"}'
+level = DEBUG
+filename = sflow_pub_sub.log
+webserver_host =
+webserver_port = 33333
+listening_ip_addr =
+listening_port = 6343
+# This file autogenerated by sflow service synchronizer
+# It contains a list of attributes to be used by sflow service
+# syntax: key=value
+level = DEBUG
+filename = sflow_pub_sub.log
+webserver_host =
+webserver_port = {{ sflow_api_port }}
+listening_ip_addr =
+listening_port = {{ sflow_port }}
+import socket,thread
+import sys
+import fnmatch
+import operator
+import logging
+import ConfigParser
+from urlparse import urlparse
+from sflow_sub_records import *
+from flask import request, Request, jsonify
+from flask import Flask
+from flask import make_response
+app = Flask(__name__)
+ 'gt':,
+ 'lt':,
+ 'ge':,
+ 'le': operator.le,
+ 'eq': operator.eq,
+ 'ne':,
+LEVELS = {'DEBUG': logging.DEBUG,
+ 'INFO': logging.INFO,
+ 'WARNING': logging.WARNING,
+ 'ERROR': logging.ERROR,
+def subscribe():
+ logging.debug(" SUB data:%s",
+ target =
+ parse_target=urlparse(target)
+ if not parse_target.netloc:
+ err_str = "Error:Invalid target format"
+ logging.error("* Invalid target format")
+ return err_str
+ status = ""
+ if parse_target.scheme == "udp" :
+ host=parse_target.hostname
+ port=parse_target.port
+ scheme = parse_target.scheme
+ app_ip = host
+ app_port = port
+ if host == None or port == None :
+ err_str = "* Error: Invalid IP Address format"
+ logging.error("* Invalid IP Address format")
+ return err_str
+ subscrip_obj=sflow_sub_record(scheme,None,app_ip,app_port,None,None)
+ status = add_sflow_sub_record(subscrip_obj)
+ print_sflow_sub_records()
+ if parse_target.scheme == "kafka" :
+ pass
+ if parse_target.scheme == "file" :
+ pass
+ return status
+def unsubscribe():
+ try :
+ target =
+ parse_target=urlparse(target)
+ if not parse_target.netloc:
+ err_str = "Error:Invalid target format"
+ logging.error("* Invalid target format")
+ return err_str
+ status = ""
+ if parse_target.scheme == "udp" :
+ host=parse_target.hostname
+ port=parse_target.port
+ scheme = parse_target.scheme
+ app_ip = host
+ app_port = port
+ delete_sflow_sub_record(app_ip, app_port)
+ except Exception as e:
+ logging.error("* %s",e.__str__())
+ return e.__str__()
+ return "UnSubscrition is sucessful! \n"
+def not_found(error):
+ return make_response(jsonify({'error': 'Not found'}), 404)
+def sflow_recv(host,port):
+ udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
+ udp.bind((host, port))
+"Started sflow receive thread on %s:%s",host, str(port))
+ while True:
+ data, source = udp.recvfrom(64000)
+ for obj in sflow_sub_database:
+ target_host = obj.ipaddress
+ target_port = int(obj.portno)
+ try:
+ logging.debug("Replicating the sFlow data to:%s:%s",target_host, str(target_port))
+ udp.sendto(data,(target_host,target_port))
+ except Exception:
+ logging.error ("Unable to send sFlow data to target %s:%s ",target_host,str(target_port))
+ logging.warn("Exiting sflow receive thread")
+def initialize(host,port):
+ thread.start_new(sflow_recv,(host,port,))
+if __name__ == "__main__":
+ try:
+ config = ConfigParser.ConfigParser()
+ webserver_host = config.get('WEB_SERVER','webserver_host')
+ webserver_port = int (config.get('WEB_SERVER','webserver_port'))
+ sflow_listening_ip_addr = config.get('SFLOW','listening_ip_addr')
+ sflow_listening_port = int (config.get('SFLOW','listening_port'))
+ log_level = config.get('LOGGING','level')
+ log_file = config.get('LOGGING','filename')
+ level = LEVELS.get(log_level, logging.NOTSET)
+ logging.basicConfig(filename=log_file,format='%(asctime)s %(levelname)s %(message)s',\
+ datefmt=_DEFAULT_LOG_DATE_FORMAT,level=level)
+ except Exception as e:
+ print("* Error in config file:",e.__str__())
+ logging.error("* Error in confing file:%s",e.__str__())
+ else:
+ initialize(sflow_listening_ip_addr,sflow_listening_port)
+import fnmatch
+import logging
+class sflow_sub_record:
+ def __init__(self,scheme,app_id,app_ip,app_port,subscription_info,sub_info_filter):
+ logging.debug("* Updating subscription_info ")
+ self.scheme = scheme
+ self.app_id = app_id
+ self.ipaddress = app_ip
+ self.portno = app_port
+ self.subscription_info = subscription_info
+ self.sub_info_filter = sub_info_filter
+def add_sflow_sub_record(record):
+"* inside %s",add_sflow_sub_record.__name__)
+ if not sflow_sub_database:
+ logging.debug("* -----------List is EMpty -------------")
+ sflow_sub_database.append(record)
+ logging.debug("* Subscription is sucessful")
+ return "Subscription is sucessful \n"
+ for x in sflow_sub_database:
+ if (record.ipaddress == x.ipaddress) and (record.portno == x.portno) :
+ logging.warning("* entry already exists\n")
+ return "entry already exists \n"
+ sflow_sub_database.append(record)
+ return "Subscription is sucessful \n"
+def delete_sflow_sub_record(ip,port):
+"* inside %s",delete_sflow_sub_record.__name__)
+ Flag = False
+ for x in sflow_sub_database:
+ if (ip == x.ipaddress) and (port == x.portno) :
+ sflow_sub_database.remove(x)
+ Flag = True
+ logging.debug("* Un-Subscription is sucessful")
+ return "Un-Subscription is sucessful \n"
+ if not Flag :
+ err_str = "No subscription exists with target: udp://" + ip + ":" + str(port) + "\n"
+ logging.error(err_str)
+ raise Exception (err_str)
+def print_sflow_sub_records():
+"* inside %s",print_sflow_sub_records.__name__)
+ for obj in sflow_sub_database:
+ logging.debug("* ------------------------------------------------")
+ logging.debug("* scheme:%s",obj.scheme)
+ logging.debug("* app_id:%s",obj.app_id)
+ logging.debug("* portno:%s",obj.portno )
+ logging.debug("* ipaddress:%s",obj.ipaddress)
+ logging.debug("* portno:%s",obj.portno)
+ logging.debug("* subscription_info:%s",obj.subscription_info)
+ logging.debug("* sub_info_filter:%s",obj.sub_info_filter)
+ logging.debug("* ------------------------------------------------")
+def get_sflow_sub_records(notif_subscription_info):
+"* inside %s",get_sflow_sub_records.__name__)
+ sub_list=[]
+ for obj in sflow_sub_database:
+ if obj.subscription_info == notif_subscription_info:
+ sub_list.append(obj)
+ return sub_list
