move ceilometer service over from xos repo
Change-Id: I9402876545cf552675cd0a836e02e2c72fea5f6b
diff --git a/xos/synchronizer/templates/Dockerfile.monitoring_channel b/xos/synchronizer/templates/Dockerfile.monitoring_channel
new file mode 100644
index 0000000..45defb8
--- /dev/null
+++ b/xos/synchronizer/templates/Dockerfile.monitoring_channel
@@ -0,0 +1,26 @@
+FROM ubuntu:14.04.2
+MAINTAINER Andy Bavier <acb@cs.princeton.edu>
+
+# XXX Workaround for docker bug:
+# https://github.com/docker/docker/issues/6345
+# Kernel 3.15 breaks docker, use the line below as a workaround
+# until there is a fix
+RUN ln -s -f /bin/true /usr/bin/chfn
+# XXX End workaround
+
+# Install.
+RUN apt-get update && apt-get install -y \
+ python-pip \
+ python-dev
+
+RUN pip install web.py
+RUN pip install wsgilog
+RUN pip install python-ceilometerclient
+RUN mkdir -p /usr/local/share
+ADD ceilometer_proxy_server.py /usr/local/share/
+RUN chmod +x /usr/local/share/ceilometer_proxy_server.py
+ADD start_ceilometer_proxy /usr/local/sbin/
+RUN chmod +x /usr/local/sbin/start_ceilometer_proxy
+EXPOSE 8000
+WORKDIR /usr/local/share
+CMD /usr/local/sbin/start_ceilometer_proxy
diff --git a/xos/synchronizer/templates/Dockerfile.sflowpubsub b/xos/synchronizer/templates/Dockerfile.sflowpubsub
new file mode 100644
index 0000000..c9025ee
--- /dev/null
+++ b/xos/synchronizer/templates/Dockerfile.sflowpubsub
@@ -0,0 +1,22 @@
+FROM ubuntu:14.04.2
+MAINTAINER Andy Bavier <acb@cs.princeton.edu>
+
+# XXX Workaround for docker bug:
+# https://github.com/docker/docker/issues/6345
+# Kernel 3.15 breaks docker, use the line below as a workaround
+# until there is a fix
+RUN ln -s -f /bin/true /usr/bin/chfn
+# XXX End workaround
+
+# Install.
+RUN apt-get update && apt-get install -y \
+ python-pip \
+ python-dev
+
+RUN pip install Flask
+RUN mkdir -p /usr/local/share/
+ADD sflow_pub_sub /usr/local/share/sflow_pub_sub
+RUN chmod +x /usr/local/share/sflow_pub_sub/sflow_pub_sub_main.py
+RUN chmod +x /usr/local/share/sflow_pub_sub/start_sflow_pub_sub
+WORKDIR /usr/local/share/sflow_pub_sub/
+CMD /usr/local/share/sflow_pub_sub/start_sflow_pub_sub
diff --git a/xos/synchronizer/templates/ceilometer_proxy_config.j2 b/xos/synchronizer/templates/ceilometer_proxy_config.j2
new file mode 100644
index 0000000..bd6c521
--- /dev/null
+++ b/xos/synchronizer/templates/ceilometer_proxy_config.j2
@@ -0,0 +1,17 @@
+# This file is autogenerated by the monitoring-channel observer.
+# It contains a list of attributes to be used by the ceilometer proxy web server.
+# syntax: key=value
+
+[default]
+auth_url={{ auth_url }}
+admin_user={{ admin_user }}
+admin_tenant={{ admin_tenant }}
+admin_password={{ admin_password }}
+ceilometer_pub_sub_url={{ ceilometer_pub_sub_url }}
+
+[allowed_tenants]
+{% if allowed_tenant_ids %}
+{% for tenant_id in allowed_tenant_ids %}
+{{ tenant_id }}
+{% endfor %}
+{% endif %}
diff --git a/xos/synchronizer/templates/ceilometer_proxy_server.py b/xos/synchronizer/templates/ceilometer_proxy_server.py
new file mode 100644
index 0000000..c81b941
--- /dev/null
+++ b/xos/synchronizer/templates/ceilometer_proxy_server.py
@@ -0,0 +1,294 @@
+#!/usr/bin/env python
+import web
+import ConfigParser
+import io
+import json
+from ceilometerclient import client
+import logging
+import urllib
+import urllib2
+from urlparse import urlparse
+from wsgilog import WsgiLog
+
+web.config.debug=False
+
+logfile = "ceilometer_proxy_server.log"
+level=logging.INFO
+logger=logging.getLogger('ceilometer_proxy_server')
+logger.setLevel(level)
+handler=logging.handlers.RotatingFileHandler(logfile,maxBytes=1000000, backupCount=1)
+logger.addHandler(handler)
+
+class FileLog(WsgiLog):
+ def __init__(self, application):
+ WsgiLog.__init__(
+ self,
+ application,
+ logformat = '%(message)s',
+ tofile = True,
+ toprint = True,
+ prnlevel = level,
+ file = logfile,
+ backups =1
+ )
+ def __call__(self, environ, start_response):
+ def hstart_response(status, response_headers, *args):
+ out = start_response(status, response_headers, *args)
+ try:
+ logline=environ["SERVER_PROTOCOL"]+" "+environ["REQUEST_METHOD"]+" "+environ["REQUEST_URI"]+" - "+status
+ except err:
+ logline="Could not log <%s> due to err <%s>" % (str(environ), err)
+ logger.info(logline)
+
+ return out
+
+ return super(FileLog, self).__call__(environ, hstart_response)
+
+#TODOs:
+#-See if we can avoid using python-ceilometerclient and instead use the REST calls directly with AuthToken
+#
+urls = (
+ r'^/v2/meters$', 'meter_list',
+ r'^/v2/meters/(?P<meter_name>[A-Za-z0-9_:.\-]+)/statistics$', 'statistics_list',
+ r'^/v2/samples$', 'sample_list',
+ r'^/v2/resources$', 'resource_list',
+ r'^/v2/subscribe$', 'pubsub_handler',
+)
+
+app = web.application(urls, globals())
+
+config = None
+ceilometer_client = None
+
+
+def parse_ceilometer_proxy_config():
+ global config
+ config = ConfigParser.RawConfigParser(allow_no_value=True)
+ config.read('ceilometer_proxy_config')
+
+def ceilometerclient():
+ global config, ceilometer_client
+ if ceilometer_client:
+ return ceilometer_client
+
+ if not config:
+ parse_ceilometer_proxy_config()
+
+ keystone = {}
+ keystone['os_username']=config.get('default','admin_user')
+ keystone['os_password']=config.get('default','admin_password')
+ keystone['os_auth_url']=config.get('default','auth_url')
+ keystone['os_tenant_name']=config.get('default','admin_tenant')
+ ceilometer_client = client.get_client(2,**keystone)
+ logger.info('ceilometer get_client is successful')
+ return ceilometer_client
+
+def make_query(user_id=None, tenant_id=None, resource_id=None,
+ user_ids=None, tenant_ids=None, resource_ids=None):
+ """Returns query built from given parameters.
+
+ This query can be then used for querying resources, meters and
+ statistics.
+
+ :Parameters:
+ - `user_id`: user_id, has a priority over list of ids
+ - `tenant_id`: tenant_id, has a priority over list of ids
+ - `resource_id`: resource_id, has a priority over list of ids
+ - `user_ids`: list of user_ids
+ - `tenant_ids`: list of tenant_ids
+ - `resource_ids`: list of resource_ids
+ """
+ user_ids = user_ids or []
+ tenant_ids = tenant_ids or []
+ resource_ids = resource_ids or []
+
+ query = []
+ if user_id:
+ user_ids = [user_id]
+ for u_id in user_ids:
+ query.append({"field": "user_id", "op": "eq", "value": u_id})
+
+ if tenant_id:
+ tenant_ids = [tenant_id]
+ for t_id in tenant_ids:
+ query.append({"field": "project_id", "op": "eq", "value": t_id})
+
+ if resource_id:
+ resource_ids = [resource_id]
+ for r_id in resource_ids:
+ query.append({"field": "resource_id", "op": "eq", "value": r_id})
+
+ return query
+
+def filter_query_params(query_params):
+ new_query=[]
+ i=0
+ user_specified_tenants=[]
+ for field in query_params['q.field']:
+ if (field != 'project_id') and (field != 'project'):
+ query = {}
+ query['field']=field
+ if query_params['q.op'][i] != '':
+ query['op']=query_params['q.op'][i]
+ query['value']=query_params['q.value'][i]
+ new_query.append(query)
+ else:
+ user_specified_tenants.append(query_params['q.value'][i])
+ i=i+1
+ return new_query,user_specified_tenants
+
+class meter_list:
+ def GET(self):
+ global config
+ keyword_args = {
+ "q.field": [],
+ "q.op": [],
+ "q.type": [],
+ "q.value": [],
+ }
+ query_params = web.input(**keyword_args)
+ new_query, user_specified_tenants = filter_query_params(query_params)
+
+ client = ceilometerclient()
+ meters=[]
+ for (k,v) in config.items('allowed_tenants'):
+ if user_specified_tenants and (k not in user_specified_tenants):
+ continue
+ final_query=[]
+ final_query.extend(new_query)
+ query = make_query(tenant_id=k)
+ final_query.extend(query)
+ logger.debug('final query=%s',final_query)
+ results = client.meters.list(q=final_query)
+ meters.extend(results)
+ return json.dumps([ob._info for ob in meters])
+
+class statistics_list:
+ def GET(self, meter_name):
+ global config
+ keyword_args = {
+ "q.field": [],
+ "q.op": [],
+ "q.type": [],
+ "q.value": [],
+ "period": None
+ }
+ query_params = web.input(**keyword_args)
+ new_query, user_specified_tenants = filter_query_params(query_params)
+
+ client = ceilometerclient()
+ period = query_params.period
+ statistics = []
+ for (k,v) in config.items('allowed_tenants'):
+ if user_specified_tenants and (k not in user_specified_tenants):
+ continue
+ final_query=[]
+ final_query.extend(new_query)
+ query = make_query(tenant_id=k)
+ final_query.extend(query)
+ logger.debug('final query=%s',final_query)
+ results = client.statistics.list(meter_name=meter_name, q=final_query, period=period)
+ statistics.extend(results)
+ return json.dumps([ob._info for ob in statistics])
+
+class sample_list:
+ def GET(self):
+ global config
+ keyword_args = {
+ "q.field": [],
+ "q.op": [],
+ "q.type": [],
+ "q.value": [],
+ "limit": None,
+ }
+ query_params = web.input(**keyword_args)
+ new_query, user_specified_tenants = filter_query_params(query_params)
+
+ client = ceilometerclient()
+ limit=query_params.limit
+ samples=[]
+ for (k,v) in config.items('allowed_tenants'):
+ if user_specified_tenants and (k not in user_specified_tenants):
+ continue
+ final_query=[]
+ final_query.extend(new_query)
+ query = make_query(tenant_id=k)
+ final_query.extend(query)
+ logger.debug('final query=%s',final_query)
+ results = client.new_samples.list(q=final_query,limit=limit)
+ samples.extend(results)
+ return json.dumps([ob._info for ob in samples])
+
+class resource_list:
+ def GET(self):
+ global config
+ keyword_args = {
+ "q.field": [],
+ "q.op": [],
+ "q.type": [],
+ "q.value": [],
+ "limit": None,
+ "links": None,
+ }
+ query_params = web.input(**keyword_args)
+ new_query, user_specified_tenants = filter_query_params(query_params)
+
+ client = ceilometerclient()
+ limit=query_params.limit
+ links=query_params.links
+ resources=[]
+ for (k,v) in config.items('allowed_tenants'):
+ if user_specified_tenants and (k not in user_specified_tenants):
+ continue
+ final_query=[]
+ final_query.extend(new_query)
+ query = make_query(tenant_id=k)
+ final_query.extend(query)
+ logger.debug('final query=%s',final_query)
+ results = client.resources.list(q=final_query, limit=limit, links=links)
+ resources.extend(results)
+ return json.dumps([ob._info for ob in resources])
+
+class pubsub_handler:
+ def POST(self):
+ global config
+ parse_ceilometer_proxy_config()
+ ceilometer_pub_sub_url = config.get('default', 'ceilometer_pub_sub_url')
+ url = urlparse(ceilometer_pub_sub_url)
+ if (not url.scheme) or (not url.netloc):
+ raise Exception("Ceilometer PUB/SUB URL not set")
+ ceilometer_pub_sub_url = url.scheme + "://" + url.netloc + "/subscribe"
+ data_str = unicode(web.data(),'iso-8859-1')
+ post_data = json.loads(data_str)
+ final_query=[]
+ for (k,v) in config.items('allowed_tenants'):
+ query = make_query(tenant_id=k)
+ final_query.extend(query)
+ if not final_query:
+ raise Exception("Not allowed to subscribe to any meters")
+ post_data["query"] = final_query
+ #TODO: The PUB/SUB url needs to be read from config
+ put_request = urllib2.Request(ceilometer_pub_sub_url, json.dumps(post_data))
+ put_request.get_method = lambda: 'SUB'
+ put_request.add_header('Content-Type', 'application/json')
+ response = urllib2.urlopen(put_request)
+ response_text = response.read()
+ return json.dumps(response_text)
+
+ def DELETE(self):
+ ceilometer_pub_sub_url = config.get('default', 'ceilometer_pub_sub_url')
+ url = urlparse(ceilometer_pub_sub_url)
+ if (not url.scheme) or (not url.netloc):
+ raise Exception("Ceilometer PUB/SUB URL not set")
+ ceilometer_pub_sub_url = url.scheme + "://" + url.netloc + "/unsubscribe"
+ data_str = web.data()
+ #TODO: The PUB/SUB url needs to be read from config
+ put_request = urllib2.Request(ceilometer_pub_sub_url, data_str)
+ put_request.get_method = lambda: 'UNSUB'
+ put_request.add_header('Content-Type', 'application/json')
+ response = urllib2.urlopen(put_request)
+ response_text = response.read()
+ return json.dumps(response_text)
+
+if __name__ == "__main__":
+ app.run(FileLog)
diff --git a/xos/synchronizer/templates/monitoring-channel.conf.j2 b/xos/synchronizer/templates/monitoring-channel.conf.j2
new file mode 100644
index 0000000..eb937ac
--- /dev/null
+++ b/xos/synchronizer/templates/monitoring-channel.conf.j2
@@ -0,0 +1,10 @@
+# Upstart script for Monitoring channel
+description "Upstart script for Monitoring channel container"
+author "andy@onlab.us"
+start on filesystem and started docker
+stop on runlevel [!2345]
+respawn
+
+script
+ /usr/local/sbin/start-monitoring-channel-{{ unique_id }}.sh
+end script
diff --git a/xos/synchronizer/templates/sflow_pub_sub/README b/xos/synchronizer/templates/sflow_pub_sub/README
new file mode 100644
index 0000000..ee8ad9b
--- /dev/null
+++ b/xos/synchronizer/templates/sflow_pub_sub/README
@@ -0,0 +1,37 @@
+
+Subscribe-Publish Framework:
+1.Command to install the Flask web server framework.
+ sudo pip install Flask
+
+ Along with flask we need the following packages:
+ msgpack
+ fnmatch
+ operator
+ logging
+ oslo_utils
+ ConfigParser
+
+2.Files: i.sub_main.py
+ ii.pubrecords.py
+ iii.pub_sub.conf
+
+3.Command to start the server:
+ #python sub_main.py
+4.Command for subscription:
+ i.app_id:Application ID,should be unique.
+ ii.target:
+ Presently only udp is supported.
+ a.udp:<ip:portno>
+ b.kafka:<kafkaip:kafkaport>
+ iii.sub_info:Subscription notifications, e.g. cpu_util,cpu_*
+ iv.query:
+ The following information must be provided as part of the query.
+ a.field:fields such as user id, project id, etc.
+ b.op:"eq","gt","lt" etc.,
+ c.value:value of the field.
+ Example:
+ curl -i -H "Content-Type: application/json" -X SUB -d '{"app_id":"10","target":"udp://10.11.10.1:5006","sub_info":"cpu_util","query":[{"field":"user_id","op":"eq","value":"e1271a86bd4e413c87248baf2e5f01e0"},{"field":"project_id","op":"eq","value":"b1a3bf16d2014b47be9aefea88087318"},{"field":"resource_id","op":"eq","value":"658cd03f-d0f0-4f55-9f48-39e7222a8646"}]}' -L http://10.11.10.1:4455/subscribe
+
+5.Command for unsubscription:
+ For unsubscription only the app_id is needed.
+ curl -i -H "Content-Type: application/json" -X UNSUB -d '{"app_id":"10"}' http://10.11.10.1:4455/unsubscribe
diff --git a/xos/synchronizer/templates/sflow_pub_sub/sample_sflow_pub_sub.conf_sample b/xos/synchronizer/templates/sflow_pub_sub/sample_sflow_pub_sub.conf_sample
new file mode 100644
index 0000000..40b5bf5
--- /dev/null
+++ b/xos/synchronizer/templates/sflow_pub_sub/sample_sflow_pub_sub.conf_sample
@@ -0,0 +1,11 @@
+[LOGGING]
+level = DEBUG
+filename = sflow_pub_sub.log
+
+[WEB_SERVER]
+webserver_host = 0.0.0.0
+webserver_port = 33333
+
+[SFLOW]
+listening_ip_addr = 0.0.0.0
+listening_port = 6343
diff --git a/xos/synchronizer/templates/sflow_pub_sub/sflow_pub_sub_config.j2 b/xos/synchronizer/templates/sflow_pub_sub/sflow_pub_sub_config.j2
new file mode 100644
index 0000000..1c5c88c
--- /dev/null
+++ b/xos/synchronizer/templates/sflow_pub_sub/sflow_pub_sub_config.j2
@@ -0,0 +1,15 @@
+# This file is autogenerated by the sflow service synchronizer.
+# It contains a list of attributes to be used by the sflow service.
+# syntax: key=value
+
+[LOGGING]
+level = DEBUG
+filename = sflow_pub_sub.log
+
+[WEB_SERVER]
+webserver_host = 0.0.0.0
+webserver_port = {{ sflow_api_port }}
+
+[SFLOW]
+listening_ip_addr = 0.0.0.0
+listening_port = {{ sflow_port }}
diff --git a/xos/synchronizer/templates/sflow_pub_sub/sflow_pub_sub_main.py b/xos/synchronizer/templates/sflow_pub_sub/sflow_pub_sub_main.py
new file mode 100644
index 0000000..1276721
--- /dev/null
+++ b/xos/synchronizer/templates/sflow_pub_sub/sflow_pub_sub_main.py
@@ -0,0 +1,136 @@
+#!/usr/bin/python
+import socket,thread
+import sys
+import fnmatch
+import operator
+import logging
+import ConfigParser
+from urlparse import urlparse
+from sflow_sub_records import *
+
+from flask import request, Request, jsonify
+from flask import Flask
+from flask import make_response
+app = Flask(__name__)
+
+COMPARATORS = {
+ 'gt': operator.gt,
+ 'lt': operator.lt,
+ 'ge': operator.ge,
+ 'le': operator.le,
+ 'eq': operator.eq,
+ 'ne': operator.ne,
+}
+
+LEVELS = {'DEBUG': logging.DEBUG,
+ 'INFO': logging.INFO,
+ 'WARNING': logging.WARNING,
+ 'ERROR': logging.ERROR,
+ 'CRITICAL': logging.CRITICAL}
+
+_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
+
+@app.route('/subscribe',methods=['POST'])
+def subscribe():
+ logging.debug(" SUB data:%s",request.data)
+ target = request.data
+ parse_target=urlparse(target)
+ if not parse_target.netloc:
+ err_str = "Error:Invalid target format"
+ logging.error("* Invalid target format")
+ return err_str
+
+ status = ""
+ if parse_target.scheme == "udp" :
+ host=parse_target.hostname
+ port=parse_target.port
+ scheme = parse_target.scheme
+ app_ip = host
+ app_port = port
+
+ if host == None or port == None :
+ err_str = "* Error: Invalid IP Address format"
+ logging.error("* Invalid IP Address format")
+ return err_str
+
+ subscrip_obj=sflow_sub_record(scheme,None,app_ip,app_port,None,None)
+ status = add_sflow_sub_record(subscrip_obj)
+ print_sflow_sub_records()
+
+ if parse_target.scheme == "kafka" :
+ pass
+ if parse_target.scheme == "file" :
+ pass
+ return status
+
+@app.route('/unsubscribe',methods=['POST'])
+def unsubscribe():
+ try :
+ target = request.data
+ parse_target=urlparse(target)
+ if not parse_target.netloc:
+ err_str = "Error:Invalid target format"
+ logging.error("* Invalid target format")
+ return err_str
+
+ status = ""
+ if parse_target.scheme == "udp" :
+ host=parse_target.hostname
+ port=parse_target.port
+ scheme = parse_target.scheme
+ app_ip = host
+ app_port = port
+
+ delete_sflow_sub_record(app_ip, app_port)
+ except Exception as e:
+ logging.error("* %s",e.__str__())
+ return e.__str__()
+ return "UnSubscrition is sucessful! \n"
+
+@app.errorhandler(404)
+def not_found(error):
+ return make_response(jsonify({'error': 'Not found'}), 404)
+
+def sflow_recv(host,port):
+ udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
+ udp.bind((host, port))
+ logging.info("Started sflow receive thread on %s:%s",host, str(port))
+
+ while True:
+ data, source = udp.recvfrom(64000)
+ for obj in sflow_sub_database:
+ target_host = obj.ipaddress
+ target_port = int(obj.portno)
+ try:
+ logging.debug("Replicating the sFlow data to:%s:%s",target_host, str(target_port))
+ udp.sendto(data,(target_host,target_port))
+ except Exception:
+ logging.error ("Unable to send sFlow data to target %s:%s ",target_host,str(target_port))
+ logging.warn("Exiting sflow receive thread")
+
+
+def initialize(host,port):
+ thread.start_new(sflow_recv,(host,port,))
+
+if __name__ == "__main__":
+
+ try:
+ config = ConfigParser.ConfigParser()
+ config.read('sflow_pub_sub.conf')
+ webserver_host = config.get('WEB_SERVER','webserver_host')
+ webserver_port = int (config.get('WEB_SERVER','webserver_port'))
+ sflow_listening_ip_addr = config.get('SFLOW','listening_ip_addr')
+ sflow_listening_port = int (config.get('SFLOW','listening_port'))
+
+ log_level = config.get('LOGGING','level')
+ log_file = config.get('LOGGING','filename')
+
+ level = LEVELS.get(log_level, logging.NOTSET)
+ logging.basicConfig(filename=log_file,format='%(asctime)s %(levelname)s %(message)s',\
+ datefmt=_DEFAULT_LOG_DATE_FORMAT,level=level)
+ except Exception as e:
+ print("* Error in config file:",e.__str__())
+ logging.error("* Error in confing file:%s",e.__str__())
+ else:
+ initialize(sflow_listening_ip_addr,sflow_listening_port)
+ app.run(host=webserver_host,port=webserver_port,debug=False)
diff --git a/xos/synchronizer/templates/sflow_pub_sub/sflow_sub_records.py b/xos/synchronizer/templates/sflow_pub_sub/sflow_sub_records.py
new file mode 100644
index 0000000..f8b0038
--- /dev/null
+++ b/xos/synchronizer/templates/sflow_pub_sub/sflow_sub_records.py
@@ -0,0 +1,63 @@
+#!/usr/bin/python
+import fnmatch
+import logging
+
+class sflow_sub_record:
+ def __init__(self,scheme,app_id,app_ip,app_port,subscription_info,sub_info_filter):
+ logging.debug("* Updating subscription_info ")
+ self.scheme = scheme
+ self.app_id = app_id
+ self.ipaddress = app_ip
+ self.portno = app_port
+ self.subscription_info = subscription_info
+ self.sub_info_filter = sub_info_filter
+
+sflow_sub_database=[]
+def add_sflow_sub_record(record):
+ logging.info("* inside %s",add_sflow_sub_record.__name__)
+ if not sflow_sub_database:
+ logging.debug("* -----------List is EMpty -------------")
+ sflow_sub_database.append(record)
+ logging.debug("* Subscription is sucessful")
+ return "Subscription is sucessful \n"
+ for x in sflow_sub_database:
+ if (record.ipaddress == x.ipaddress) and (record.portno == x.portno) :
+ logging.warning("* entry already exists\n")
+ return "entry already exists \n"
+ sflow_sub_database.append(record)
+ return "Subscription is sucessful \n"
+
+def delete_sflow_sub_record(ip,port):
+ logging.info("* inside %s",delete_sflow_sub_record.__name__)
+ Flag = False
+ for x in sflow_sub_database:
+ if (ip == x.ipaddress) and (port == x.portno) :
+ sflow_sub_database.remove(x)
+ Flag = True
+ logging.debug("* Un-Subscription is sucessful")
+ return "Un-Subscription is sucessful \n"
+ if not Flag :
+ err_str = "No subscription exists with target: udp://" + ip + ":" + str(port) + "\n"
+ logging.error(err_str)
+ raise Exception (err_str)
+
+def print_sflow_sub_records():
+ logging.info("* inside %s",print_sflow_sub_records.__name__)
+ for obj in sflow_sub_database:
+ logging.debug("* ------------------------------------------------")
+ logging.debug("* scheme:%s",obj.scheme)
+ logging.debug("* app_id:%s",obj.app_id)
+ logging.debug("* portno:%s",obj.portno )
+ logging.debug("* ipaddress:%s",obj.ipaddress)
+ logging.debug("* portno:%s",obj.portno)
+ logging.debug("* subscription_info:%s",obj.subscription_info)
+ logging.debug("* sub_info_filter:%s",obj.sub_info_filter)
+ logging.debug("* ------------------------------------------------")
+
+def get_sflow_sub_records(notif_subscription_info):
+ logging.info("* inside %s",get_sflow_sub_records.__name__)
+ sub_list=[]
+ for obj in sflow_sub_database:
+ if obj.subscription_info == notif_subscription_info:
+ sub_list.append(obj)
+ return sub_list
diff --git a/xos/synchronizer/templates/sflow_pub_sub/start_sflow_pub_sub b/xos/synchronizer/templates/sflow_pub_sub/start_sflow_pub_sub
new file mode 100644
index 0000000..e2edda2
--- /dev/null
+++ b/xos/synchronizer/templates/sflow_pub_sub/start_sflow_pub_sub
@@ -0,0 +1 @@
+/usr/local/share/sflow_pub_sub/sflow_pub_sub_main.py
diff --git a/xos/synchronizer/templates/start-monitoring-channel.sh.j2 b/xos/synchronizer/templates/start-monitoring-channel.sh.j2
new file mode 100755
index 0000000..4486985
--- /dev/null
+++ b/xos/synchronizer/templates/start-monitoring-channel.sh.j2
@@ -0,0 +1,49 @@
+#!/bin/bash
+
+function mac_to_iface {
+    local mac=$1  # MAC to look up; local so it does not leak into the caller
+    ifconfig | grep -- "$mac" | awk '{print $1}' | grep -v '\.'
+}
+
+function generate_mac_from_ip {
+    local ip=$1  # dotted IPv4 address; its four fields become the last four MAC bytes
+    printf "02:42:%02x:%02x:%02x:%02x\n" $(echo "$ip" | awk -F '.' '{print $1, $2, $3, $4}')
+}
+
+iptables -L > /dev/null
+ip6tables -L > /dev/null
+
+MONITORING_CHANNEL=monitoring-channel-{{ unique_id }}
+HEADNODEFLATLANIP={{ headnode_flat_lan_ip }}
+HOST_FORWARDING_PORT_FOR_CEILOMETER={{ ceilometer_host_port }}
+
+docker inspect $MONITORING_CHANNEL > /dev/null 2>&1
+if [ "$?" == 1 ]
+then
+ #sudo docker build -t monitoring-channel -f Dockerfile.monitoring_channel .
+ #sudo docker pull srikanthvavila/monitoring-channel
+if [ -z "$HEADNODEFLATLANIP" ] || [ "$HEADNODEFLATLANIP" == "None" ]
+then
+# docker run -d --name=$MONITORING_CHANNEL --privileged=true -p $HOST_FORWARDING_PORT_FOR_CEILOMETER:8000 -v /usr/local/share/monitoring-channel-{{ unique_id }}_ceilometer_proxy_config:/usr/local/share/ceilometer_proxy_config srikanthvavila/monitoring-channel
+ docker run -d --name=$MONITORING_CHANNEL --privileged=true -p $HOST_FORWARDING_PORT_FOR_CEILOMETER:8000 srikanthvavila/monitoring-channel
+else
+# docker run -d --name=$MONITORING_CHANNEL --add-host="ctl:$HEADNODEFLATLANIP" --privileged=true -p $HOST_FORWARDING_PORT_FOR_CEILOMETER:8000 -v /usr/local/share/monitoring-channel-{{ unique_id }}_ceilometer_proxy_config:/usr/local/share/ceilometer_proxy_config srikanthvavila/monitoring-channel
+ docker run -d --name=$MONITORING_CHANNEL --add-host="ctl:$HEADNODEFLATLANIP" --privileged=true -p $HOST_FORWARDING_PORT_FOR_CEILOMETER:8000 srikanthvavila/monitoring-channel
+fi
+else
+ docker start $MONITORING_CHANNEL
+fi
+
+# Set up networking via pipework
+#SHARED_LAN_IFACE=$( mac_to_iface {{ shared_lan_mac }} )
+#docker exec $MONITORING_CHANNEL ifconfig eth0 >> /dev/null || pipework $SHARED_LAN_IFACE -i eth0 $MONITORING_CHANNEL 192.168.0.1/24
+
+# Make sure VM's eth0 (hpc_client) has no IP address
+#ifconfig $HPC_IFACE 0.0.0.0
+
+# Now copy ceilometer proxy configuration to container
+#cat /usr/local/share/monitoring-channel-{{ unique_id }}_ceilometer_proxy_config | sudo docker exec -i $MONITORING_CHANNEL bash -c 'cat > /usr/local/share/ceilometer_proxy_config'
+docker cp /usr/local/share/monitoring-channel-{{ unique_id }}_ceilometer_proxy_config $MONITORING_CHANNEL:/usr/local/share/ceilometer_proxy_config
+
+# Attach to container
+docker start -a $MONITORING_CHANNEL
diff --git a/xos/synchronizer/templates/start_ceilometer_proxy b/xos/synchronizer/templates/start_ceilometer_proxy
new file mode 100644
index 0000000..ddaa9c8
--- /dev/null
+++ b/xos/synchronizer/templates/start_ceilometer_proxy
@@ -0,0 +1 @@
+/usr/local/share/ceilometer_proxy_server.py 8000