# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#!/usr/bin/python
import socket
import thread
import sys
import msgpack
import fnmatch
import operator
import logging
import logging.handlers
import logging.config
import ConfigParser
import json
from oslo_utils import units
from oslo_utils import netutils
from pubrecords import *
import kafka
import kafka_broker
from flask import request, Request, jsonify
from flask import Flask
from flask import make_response
app = Flask(__name__)
COMPARATORS = {
'gt': operator.gt,
'lt': operator.lt,
'ge': operator.ge,
'le': operator.le,
'eq': operator.eq,
'ne': operator.ne,
}
LEVELS = {'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARNING': logging.WARNING,
'ERROR': logging.ERROR,
'CRITICAL': logging.CRITICAL}
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
''' Stores the list of all subscribed meters '''
meter_list = []
''' Maps each meter to the list of subscribed app_ids '''
meter_dict = {}
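# The /subscribe endpoint expects a JSON body with 'app_id', 'target' and
# 'sub_info' keys plus an optional 'query' list of {field, op, value} filters.
# A hedged, illustrative request (host, port and meter values are examples only):
#   curl -X POST -H "Content-Type: application/json" http://localhost:4455/subscribe \
#        -d '{"app_id": "app1", "target": "udp://10.11.10.1:5006",
#             "sub_info": ["cpu_util"],
#             "query": [{"field": "resource_id", "op": "eq", "value": "vm-1"}]}'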
@app.route('/subscribe',methods=['POST','SUB'])
def subscribe():
try :
app_id = request.json['app_id']
target = request.json['target']
sub_info = request.json['sub_info']
try :
validate_sub_info(sub_info)
except Exception as e:
logging.error("* %s",e.__str__())
return e.__str__()
''' Flag to update the pipeline cfg file '''
config = ConfigParser.ConfigParser()
config.read('pub_sub.conf')
if config.get('RABBITMQ','UpdateConfMgmt') == "True" :
update_pipeline_conf(sub_info,target,app_id,"ADD")
else:
logging.warning("Update Conf Mgmt flag is disabled,enable the flag to update Conf Mgmt")
if 'query' not in request.json:
logging.info("query request is not provided by user")
query = None
else:
query = request.json['query']
for i in range(len(query)):
if 'field' not in query[i]:
err_str = "Query 'field' key is missing"
raise Exception (err_str)
if 'op' not in query[i]:
err_str = "Query 'op' key is missing"
raise Exception (err_str)
if 'value' not in query[i]:
err_str = "Query 'value' key is missing"
raise Exception (err_str)
except Exception as e:
err_str = "KeyError: Parsing subscription request " + e.__str__() + "\n"
logging.error("* KeyError: Parsing subscription request :%s",e.__str__())
return err_str
parse_target=netutils.urlsplit(target)
if not parse_target.netloc:
err_str = "Error:Invalid target format"
logging.error("* Invalid target format")
return err_str
status = ""
if parse_target.scheme == "udp" or parse_target.scheme == "kafka":
host,port=netutils.parse_host_port(parse_target.netloc)
scheme = parse_target.scheme
app_ip = host
app_port = port
if host is None or port is None :
err_str = "* Error: Invalid IP Address format"
logging.error("* Invalid IP Address format")
return err_str
subscription_info = sub_info
sub_info_filter = query
logging.info("Creating subscription for app:%s for meters:%s with filters:%s and target:%s",app_id, subscription_info, sub_info_filter, target)
subscrip_obj=subinfo(scheme,app_id,app_ip,app_port,subscription_info,sub_info_filter,target)
status = subscrip_obj.update_subinfo()
subinfo.print_subinfo()
if parse_target.scheme == "file" :
pass
return status
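# The /unsubscribe endpoint only needs the 'app_id' that was used when
# subscribing. Illustrative request (example values):
#   curl -X POST -H "Content-Type: application/json" http://localhost:4455/unsubscribe \
#        -d '{"app_id": "app1"}'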
@app.route('/unsubscribe',methods=['POST','UNSUB'])
def unsubscribe():
try :
app_id = request.json['app_id']
sub_info,target = subinfo.get_subinfo(app_id)
if sub_info is None or target is None:
err_str = "No subscription exists with app id: " + app_id + "\n"
logging.error("* No subscription exists with app id:%s ",app_id)
return err_str
else:
''' Flag to update the pipeline cfg file '''
config = ConfigParser.ConfigParser()
config.read('pub_sub.conf')
if config.get('RABBITMQ','UpdateConfMgmt') == "True" :
update_pipeline_conf(sub_info,target,app_id,"DEL")
else:
logging.warning("Update Conf Mgmt flag is disabled,enable the flag to update Conf Mgmt")
#update_pipeline_conf(sub_info,target,"DEL")
subinfo.delete_subinfo(app_id)
except Exception as e:
logging.error("* %s",e.__str__())
return e.__str__()
return "UnSubscrition is sucessful! \n"
@app.errorhandler(404)
def not_found(error):
return make_response(jsonify({'error': 'Not found'}), 404)
def print_subscribed_meter_list():
logging.debug("-------------------------------------------------")
#print (meter_list)
logging.debug("meter_list:%s",meter_list)
logging.debug("meter_dict:%s",meter_dict)
#print (meter_dict)
logging.debug("-------------------------------------------------")
def validate_sub_info(sub_info):
if type(sub_info) is not list:
sub_info = [sub_info]
for meter in sub_info:
if meter.startswith("*") or meter.startswith("!"):
err_str = "Given meter is not supported:" + meter + "\n"
logging.error("* Given meter is not supported:%s",meter)
raise Exception (err_str)
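# update_meter_dict() maintains the meter -> [app_id, ...] mapping that is used
# to decide when the config-management side has to be told about a meter.
# Illustrative state after two apps subscribe to the same meter:
#   meter_dict == {"cpu_util": ["app1", "app2"]}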
def update_meter_dict(meterinfo,app_id):
try :
if type(meterinfo) == list:
for meter in meterinfo:
if meter_dict.get(meter) is None:
meter_dict[meter] = [app_id]
elif app_id not in meter_dict.get(meter):
meter_dict.get(meter).append(app_id)
else:
if meter_dict.get(meterinfo) is None:
meter_dict[meterinfo] = [app_id]
elif app_id not in meter_dict.get(meterinfo):
meter_dict.get(meterinfo).append(app_id)
except Exception as e:
logging.error("* %s",e.__str__())
def check_send_msg_confmgmt_del(sub_info,app_id):
temp_sub_info=[]
if len(meter_list) == 0:
#print("No subscription exists")
logging.info("No subscription exists")
return False,None
if type(sub_info) == list:
for meterinfo in sub_info:
if meter_dict.get(meterinfo) is None:
logging.warning("%s meter doesn't exist in the meter dict",meterinfo)
continue
if app_id in meter_dict.get(meterinfo):
if len(meter_dict.get(meterinfo)) == 1:
#print "Only single app is subscribing this meter"
logging.info("Only single app is subscribing this meter")
del meter_dict[meterinfo]
temp_sub_info.append(meterinfo)
if meterinfo in meter_list:
meter_list.remove(meterinfo)
else:
meter_dict.get(meterinfo).remove(app_id)
return True,temp_sub_info
else :
if meter_dict.get(sub_info) is None:
logging.warning("%s meter doesn't exist in the meter dict",sub_info)
return False,None
if app_id in meter_dict.get(sub_info):
if len(meter_dict.get(sub_info)) == 1:
#print "Only single app is subscribing this meter"
logging.info("Only single app is subscribing this meter")
del meter_dict[sub_info]
if sub_info in meter_list:
meter_list.remove(sub_info)
return True,sub_info
else:
meter_dict.get(sub_info).remove(app_id)
return False,None
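# check_send_msg_confmgmt_add() is the "ADD" counterpart: it records the app_id
# against each requested meter and reports back only the meters that are not
# yet in meter_list, so the config-management side is notified once per meter.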
def check_send_msg_confmgmt_add(sub_info,app_id):
temp_sub_info=[]
update_meter_dict(sub_info,app_id)
#import pdb;pdb.set_trace()
if len(meter_list) == 0:
logging.info("No subinfo exits")
if type(sub_info) == list:
for j in sub_info:
meter_list.append(j)
return True,sub_info
else :
meter_list.append(sub_info)
return True,sub_info
if type(sub_info) == list:
for j in sub_info:
if j in meter_list:
#print ("meter already exists",j)
logging.info("meter already exist:%s",j)
continue
else :
temp_sub_info.append(j)
meter_list.append(j)
if temp_sub_info:
return True,temp_sub_info
else :
return False,None
else :
if sub_info not in meter_list:
meter_list.append(sub_info)
#print ("subscription for meter doesn't exist",sub_info)
logging.warning("subscription for meter doesn't exist:%s",sub_info)
return True,sub_info
else :
#print ("subscription already exist for ",sub_info)
logging.info("subscription already exist for:%s ",sub_info)
return False,sub_info
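# update_pipeline_conf() publishes the subscription change on the RabbitMQ
# 'pubsub' fanout exchange so that the config-management side can update the
# ceilometer pipeline. The JSON body carries the affected meters, the
# ceilometer client target from pub_sub.conf and the "ADD"/"DEL" action,
# e.g. (illustrative values):
#   {"sub_info": ["cpu_util"], "target": "udp://10.11.10.1:5004", "action": "ADD"}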
def update_pipeline_conf(sub_info,target,app_id,flag):
import pika
logging.debug("* sub_info:%s",sub_info)
logging.debug("* target:%s",target)
#msg={"sub_info":sub_info,"target":target,"action":flag}
#json_msg=json.dumps(msg)
#msg="image"
meter_sub_info = None
if flag == "ADD":
status,meter_sub_info=check_send_msg_confmgmt_add(sub_info,app_id)
if not status or not meter_sub_info:
logging.warning("%s is already subscribed with the conf mgmt",sub_info)
return
elif flag == "DEL":
status,meter_sub_info=check_send_msg_confmgmt_del(sub_info,app_id)
if not status or not meter_sub_info:
logging.warning("%s is already unsubscribed from the conf mgmt",sub_info)
return
try :
config = ConfigParser.ConfigParser()
config.read('pub_sub.conf')
rabbitmq_username = config.get('RABBITMQ','Rabbitmq_username')
rabbitmq_passwd = config.get('RABBITMQ','Rabbitmq_passwd')
rabbitmq_host = config.get('RABBITMQ','Rabbitmq_host')
rabbitmq_port = int ( config.get('RABBITMQ','Rabbitmq_port') )
ceilometer_client_info = config.get('CLIENT','target')
#msg={"sub_info":sub_info,"target":ceilometer_client_info,"action":flag}
msg={"sub_info":meter_sub_info,"target":ceilometer_client_info,"action":flag}
#print msg
json_msg=json.dumps(msg)
credentials = pika.PlainCredentials(rabbitmq_username,rabbitmq_passwd)
parameters = pika.ConnectionParameters(rabbitmq_host,
rabbitmq_port,
'/',
credentials)
connection = pika.BlockingConnection(parameters)
properties = pika.BasicProperties(content_type = "application/json")
channel = connection.channel()
channel.exchange_declare(exchange='pubsub',
type='fanout')
channel.basic_publish(exchange='pubsub',
routing_key='',
properties = properties,
body=json_msg)
logging.debug(" [x] %s Sent",msg)
logging.info(" [x] %s Sent",msg)
connection.close()
except Exception as e:
logging.error("Error:%s",e.__str__())
def read_notification_from_ceilometer(host,port):
UDP_IP = host
UDP_PORT = port
logging.debug("* Sarting UDP Client on ip:%s , port:%d",UDP_IP,UDP_PORT)
udp = socket.socket(socket.AF_INET, # Internet
socket.SOCK_DGRAM) # UDP
udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
udp.bind((UDP_IP, UDP_PORT))
while True:
#print thread.get_ident()
#logging.debug("thread.get_ident():%s", thread.get_ident())
data, source = udp.recvfrom(64 * units.Ki)
sample = msgpack.loads(data, encoding='utf-8')
#logging.debug("* -------------------------------------------------------")
logging.debug("%s",sample)
#print(sample)
for obj in sub_info:
msg_list = []
if obj.scheme == "udp" :
if type(obj.subscription_info) is list:
for info in obj.subscription_info:
msg_list.append(fnmatch.fnmatch(sample['counter_name'],info))
else :
msg_list.append(fnmatch.fnmatch(sample['counter_name'],obj.subscription_info))
try:
if reduce(operator.or_, msg_list):
host = obj.ipaddress
port = int(obj.portno)
l=[]
#logging.debug("* -------------------------------------------------------")
if obj.sub_info_filter is None:
try:
logging.debug("* Sending data without query over UDP for host:%s and port:%s",host,port)
udp.sendto(data,(host,port))
except Exception as e:
logging.error ("Unable to send sample over UDP for %s and %s,%s",host,port,e.__str__())
ret_str = ("Unable to send sample over UDP for %s and %s,%s")%(host,port,e.__str__())
continue
for i in range(len(obj.sub_info_filter)):
if obj.sub_info_filter[i]['op'] in COMPARATORS:
op = COMPARATORS[obj.sub_info_filter[i]['op']]
logging.debug("* obj.sub_info_filter[i]['value']:%s",obj.sub_info_filter[i]['value'])
logging.debug("* obj.sub_info_filter[i]['field']:%s",obj.sub_info_filter[i]['field'])
l.append(op(obj.sub_info_filter[i]['value'],sample[obj.sub_info_filter[i]['field']]))
logging.info("* Logical and of Query %s",l)
else:
logging.debug("* Not a valid operator, ignoring app_id:%s",obj.app_id)
l.append(False)
logging.info("* Logical and of Query %s",l)
if reduce(operator.and_, l):
try:
logging.debug("* Sending data over UDP for host:%s and port:%s",host,port)
udp.sendto(data,(host,port))
except Exception:
logging.error ("Unable to send sample over UDP for %s and %s ",host,port)
ret_str = ("Unable to send sample over UDP for %s and %s ")%(host,port)
else :
logging.warning("* No Notification found with the given subscription")
else :
logging.warning("* No valid subscrition found for %s",obj.app_id)
except Exception as e:
logging.error("Key_Error:%s ",e.__str__())
ret_str = ("Key_Error:%s \n")% e.__str__()
def read_notification_from_ceilometer_over_udp(host,port):
UDP_IP = host
UDP_PORT = port
logging.debug("* Sarting UDP Client on ip:%s , port:%d",UDP_IP,UDP_PORT)
udp = socket.socket(socket.AF_INET, # Internet
socket.SOCK_DGRAM) # UDP
udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
udp.bind((UDP_IP, UDP_PORT))
while True:
#print thread.get_ident()
#logging.debug("thread.get_ident():%s", thread.get_ident())
data, source = udp.recvfrom(64 * units.Ki)
sample = msgpack.loads(data, encoding='utf-8')
status = process_ceilometer_message(sample,data)
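# read_notification_from_ceilometer_over_kafka() consumes JSON samples from the
# kafka target configured in pub_sub.conf and hands each one to
# process_ceilometer_message(), mirroring the UDP path above.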
def read_notification_from_ceilometer_over_kafka(parse_target):
logging.info("Kafka target:%s",parse_target)
try :
kafka_publisher=kafka_broker.KafkaBrokerPublisher(parse_target)
for message in kafka_publisher.kafka_consumer:
#print message.value
#logging.debug("%s",message.value)
#logging.info("%s",message.value)
status = process_ceilometer_message(json.loads(message.value),message.value)
#print status
except Exception as e:
logging.error("Error in Kafka setup:%s ",e.__str__())
def process_ceilometer_message(sample,data):
logging.debug("%s",sample)
#logging.info("%s",sample)
if len(sub_info) < 1:
#print "No subscription exists"
return
for obj in sub_info:
#import pdb;pdb.set_trace()
msg_list = []
if type(obj.subscription_info) is list:
for info in obj.subscription_info:
msg_list.append(fnmatch.fnmatch(sample['counter_name'],info))
else :
msg_list.append(fnmatch.fnmatch(sample['counter_name'],obj.subscription_info))
try:
if reduce(operator.or_, msg_list):
'''
kafka_publisher = None
if obj.scheme == "kafka" :
parse_target=netutils.urlsplit(obj.target)
try :
kafka_publisher=kafka_broker.KafkaBrokerPublisher(parse_target)
except Exception as e:
logging.error("* Error in connecting kafka broker:%s",e.__str__())
# return False
continue
'''
host = obj.ipaddress
port = int(obj.portno)
l=[]
logging.debug("* -------------------------------------------------------")
if obj.sub_info_filter is None:
try:
if obj.scheme == "udp" :
#logging.debug("* Sending data without query over UDP for host:%s and port:%s",host,port)
#logging.info("* Sending data without query over UDP for host:%s and port:%s",host,port)
#udp = socket.socket(socket.AF_INET, # Internet
# socket.SOCK_DGRAM) # UDP
#udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
obj.udp.sendto(data,(host,port))
#return True
continue
elif obj.scheme == "kafka" :
#logging.debug("* Sending data over kafka for host:%s and port:%s and topec:%s",host,port,kafka_publisher.topic)
#logging.info("* Sending data over kafka for host:%s and port:%s and topec:%s",host,port,kafka_publisher.topic)
obj.kafka_publisher._send(sample)
#return True
continue
except Exception as e:
logging.error ("Unable to send sample over UDP/kafka for %s and %s,%s",host,port,e.__str__())
ret_str = ("Unable to send sample over UDP for %s and %s,%s ")%(host,port,e.__str__())
#return False
continue
for i in range(len(obj.sub_info_filter)):
if obj.sub_info_filter[i]['op'] in COMPARATORS:
op = COMPARATORS[obj.sub_info_filter[i]['op']]
#logging.debug("* obj.sub_info_filter[i]['value']:%s",obj.sub_info_filter[i]['value'])
#logging.debug("* obj.sub_info_filter[i]['field']:%s",obj.sub_info_filter[i]['field'])
l.append(op(obj.sub_info_filter[i]['value'],sample[obj.sub_info_filter[i]['field']]))
#logging.info("* Logical and of Query %s",l)
else:
logging.info("* Not a valid operator ignoring app_id:%s",obj.app_id)
l.append(False)
#logging.info("* Logical and of Query %s",l)
if reduce(operator.or_, l):
try:
if obj.scheme == "udp" :
logging.debug("* Sending data over UDP for host:%s and port:%s",host,port)
#udp = socket.socket(socket.AF_INET, # Internet
# socket.SOCK_DGRAM) # UDP
#udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
obj.udp.sendto(data,(host,port))
#return True
continue
elif obj.scheme == "kafka" :
logging.debug("* Sending data over kafka for host:%s and port:%s and topec:%s",host,port,obj.kafka_publisher.topic)
obj.kafka_publisher._send(sample)
#return True
continue
except Exception:
logging.error ("Unable to send sample over UDP/Kafka for %s and %s ",host,port)
ret_str = ("Unable to send sample over UDP/Kafka for %s and %s ")%(host,port)
#return False
continue
else :
logging.debug("* No Notification found with the given subscription")
continue
else :
logging.debug("* No matching subscrition found for %s",sample['counter_name'])
continue
except Exception as e:
logging.error("Key_Error:%s ",e.__str__())
ret_str = ("Key_Error:%s \n")%e.__str__()
#return False
continue
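# initialize() parses the ceilometer client target from pub_sub.conf and starts
# a background listener thread for it, e.g. (illustrative value) a target of
# "udp://10.11.10.1:5004" starts the UDP listener, while a kafka:// target
# starts the kafka consumer.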
def initialize(ceilometer_client):
logging.debug("Ceilometer client info:%s",ceilometer_client)
parse_target=netutils.urlsplit(ceilometer_client)
if not parse_target.netloc:
err_str = "Error:Invalid client format"
logging.error("* Invalid client format")
return err_str
if parse_target.scheme == "udp" :
host,port=netutils.parse_host_port(parse_target.netloc)
scheme = parse_target.scheme
app_ip = host
app_port = port
if host is None or port is None :
err_str = "* Error: Invalid IP Address format"
logging.error("* Invalid IP Address format")
return err_str
thread.start_new(read_notification_from_ceilometer_over_udp,(host,port,))
elif parse_target.scheme == "kafka" :
thread.start_new(read_notification_from_ceilometer_over_kafka,(parse_target,))
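# A minimal pub_sub.conf sketch, assuming only the sections and keys read in
# this file; all values are illustrative:
#   [RABBITMQ]
#   Rabbitmq_username = guest
#   Rabbitmq_passwd = guest
#   Rabbitmq_host = localhost
#   Rabbitmq_port = 5672
#   UpdateConfMgmt = True
#   [CLIENT]
#   target = udp://10.11.10.1:5004
#   [WEB_SERVER]
#   webserver_host = 0.0.0.0
#   webserver_port = 4455
#   [LOGGING]
#   level = DEBUG
#   filename = pub_sub.log
# logging.config.fileConfig() below additionally expects the standard
# [loggers], [handlers] and [formatters] sections in the same file.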
if __name__ == "__main__":
try:
config = ConfigParser.ConfigParser()
config.read('pub_sub.conf')
webserver_host = config.get('WEB_SERVER','webserver_host')
webserver_port = int (config.get('WEB_SERVER','webserver_port'))
# client_host = config.get('CLIENT','client_host')
# client_port = int (config.get('CLIENT','client_port'))
ceilometer_client_info = config.get('CLIENT','target')
'''
log_level = config.get('LOGGING','level')
log_file = config.get('LOGGING','filename')
maxbytes = int (config.get('LOGGING','maxbytes'))
backupcount = int (config.get('LOGGING','backupcount'))
level = LEVELS.get(log_level, logging.NOTSET)
'''
logging.config.fileConfig('pub_sub.conf', disable_existing_loggers=False)
'''
logging.basicConfig(filename=log_file,format='%(asctime)s %(levelname)s %(message)s',\
datefmt=_DEFAULT_LOG_DATE_FORMAT,level=level)
# create rotating file handler
rfh = logging.handlers.RotatingFileHandler(
log_file, encoding='utf8', maxBytes=maxbytes,
backupCount=backupcount,delay=0)
logging.getLogger().addHandler(rfh)
'''
except Exception as e:
print("* Error in config file:",e.__str__())
#logging.error("* Error in confing file:%s",e.__str__())
else:
#initialize(client_host,client_port)
initialize(ceilometer_client_info)
app.run(host=webserver_host,port=webserver_port,debug=False)