blob: dc158eb68434fb6f51eedf4c8518ec3453849b98 [file] [log] [blame]
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pika
import yaml
import subprocess
import logging
import logging.config
import operator
import json
import ConfigParser
import pipeline
import utils
#from ceilometer import pipeline
from collections import OrderedDict
class UnsortableList(list):
    """A list whose ``sort`` does nothing, so element order never changes."""

    def sort(self, *args, **kwargs):
        """Accept any sorting arguments and leave the list untouched."""
        return None
class UnsortableOrderedDict(OrderedDict):
    """OrderedDict whose ``items`` result is wrapped in an UnsortableList."""

    def items(self, *args, **kwargs):
        """Return the parent class's items wrapped in an UnsortableList."""
        ordered_pairs = OrderedDict.items(self, *args, **kwargs)
        return UnsortableList(ordered_pairs)
#yaml.add_representer(UnsortableOrderedDict, yaml.representer.SafeRepresenter.represent_dict)
# Path of the scratch file for a candidate pipeline configuration.
# update_pipeline_yaml() assigns its own local with the same name and value.
tmp_pipeline_conf = "/tmp/pipeline.yaml"
# The string literal below holds a disabled logging setup (level-name map and
# date format). As a bare expression statement it has no effect at runtime.
'''
LEVELS = {'DEBUG': logging.DEBUG,
'INFO': logging.INFO,
'WARNING': logging.WARNING,
'ERROR': logging.ERROR,
'CRITICAL': logging.CRITICAL}
_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
'''
def get_source_info(meter):
    """Build the pipeline source entry for ``meter`` and name its sink.

    Args:
        meter: Meter name; it is concatenated with string suffixes, so it
            must be a string.

    Returns:
        A ``(source_info, sink_name)`` tuple. ``source_info`` is a dict with
        ``interval`` 6, ``meters`` holding just ``meter``, ``name`` set to
        ``<meter>_name`` and ``sinks`` holding ``<meter>_sink``, which is
        also returned as ``sink_name``.
    """
    sink_label = meter + "_sink"
    source_label = meter + "_name"
    entry = {
        'interval': 6,
        'meters': [meter],
        'name': source_label,
        'sinks': [sink_label],
    }
    logging.debug("* new source_info :%s", entry)
    return entry, sink_label
def get_sink_info(meter, sink_name, target):
    """Build the pipeline sink entry that publishes to ``target``.

    Args:
        meter: Not used by this function; kept so existing callers that pass
            it keep working.
        sink_name: Value stored under the entry's ``name`` key.
        target: Publisher URL appended after the fixed ``notifier://``
            publisher.

    Returns:
        A dict with ``publishers`` (``['notifier://', target]``),
        ``transformers`` (``None``) and ``name`` (``sink_name``).
    """
    sink_info = {
        'publishers': ['notifier://', target],
        'transformers': None,
        'name': sink_name,
    }
    # Label the message as sink_info: this logs the sink entry, not a source.
    logging.debug("* new sink_info :%s", sink_info)
    return sink_info
def restart_ceilometer_services():
    """Restart every service listed in the agent configuration.

    Reads the comma-separated ``Ceilometer_service`` option from the
    ``RABBITMQ`` section of ``pipeline_agent.conf`` (resolved against the
    current working directory) and runs ``service <name> restart`` for each
    entry.

    Returns:
        True when every listed service restarted with exit status 0 (also
        when the list is empty). False when the configuration could not be
        read, a command could not be launched, or any restart exited with a
        non-zero status.
    """
    try:
        config = ConfigParser.ConfigParser()
        config.read('pipeline_agent.conf')
        configured = config.get('RABBITMQ', 'Ceilometer_service')
    except Exception as e:
        logging.error("* Error in confing file:%s", e.__str__())
        return False

    # Tolerate "a, b" style lists and stray commas: strip surrounding
    # whitespace and drop empty names instead of passing them to `service`.
    service_names = [name.strip() for name in configured.split(",")]
    service_names = [name for name in service_names if name]

    all_restarted = True
    for service_name in service_names:
        command = ['service', service_name, 'restart']
        logging.debug("Executing: %s command", command)
        try:
            # shell=False for sudo to work: the argument list is executed
            # directly rather than through a shell.
            exit_code = subprocess.call(command, shell=False)
        except Exception as e:
            logging.error("* %s command execution failed with error %s",
                          command, e.__str__())
            return False
        if exit_code != 0:
            # A non-zero exit means the restart failed; keep going with the
            # remaining services but report the overall failure.
            logging.error("* %s command exited with status %s",
                          command, exit_code)
            all_restarted = False
    return all_restarted
def check_meter_with_pipeline_cfg(pipeline_cfg_file, meter=None, target=None):
    """Report whether the pipeline manager accepts ``pipeline_cfg_file``.

    Args:
        pipeline_cfg_file: Path handed to
            ``pipeline._setup_pipeline_manager`` (with ``None`` as the
            second argument).
        meter: Accepted but not used here.
        target: Accepted but not used here.

    Returns:
        True when the setup call completes without raising; False, after
        logging the error, when it raises any ``Exception``.
    """
    try:
        pipeline._setup_pipeline_manager(pipeline_cfg_file, None)
    except Exception as err:
        logging.error("Got Exception: %s", err.__str__())
        is_valid = False
    else:
        is_valid = True
    return is_valid
def callback(ch, method, properties, msg):
    """Handle one pub/sub message by updating the pipeline for its meters.

    ``msg`` is parsed as JSON and must provide ``sub_info`` (one meter or a
    list of meters), ``target`` and ``action``. Each meter is passed to
    ``update_pipeline_yaml`` together with the target and the action, and
    the ceilometer services are restarted once if at least one update
    reported success.

    Args:
        ch: Supplied by the consumer; not used.
        method: Supplied by the consumer; not used.
        properties: Supplied by the consumer; not used.
        msg: JSON-encoded message body.

    Returns:
        False when the message could not be processed or the service
        restart failed; None otherwise.
    """
    logging.debug(" [x] Received %r", msg)
    # The pipeline file is deliberately not read here: update_pipeline_yaml
    # loads and logs it itself, inside its own error handling, so an
    # unreadable file cannot raise out of this consumer callback.
    try:
        json_msg = json.loads(msg)
        meter = json_msg['sub_info']
        publisher = json_msg['target']
        flag = json_msg['action']
        if isinstance(meter, list):
            logging.debug("Metere is a list ... Need to handle it ")
            meters = meter
        else:
            meters = [meter]
        update_status = [
            update_pipeline_yaml(meter_info, publisher, flag)
            for meter_info in meters
        ]
        # any() rather than folding with operator.or_: it copes with an
        # empty meter list and with None results, both of which made the
        # fold raise TypeError and get reported as a parsing error.
        if any(update_status):
            if not restart_ceilometer_services():
                logging.error("Error in restarting ceilometer services")
                return False
    except Exception as e:
        logging.error("Got exception:%s in parsing message", e.__str__())
        return False
def update_pipeline_yaml(meter, publisher, flag):
    """Add or remove a meter/publisher pair in the ceilometer pipeline file.

    The current ``/etc/ceilometer/pipeline.yaml`` is loaded and checked with
    ``check_meter_with_pipeline_cfg``. The change is applied in memory by
    the ``utils`` helper matching ``flag``, written to
    ``/tmp/pipeline.yaml`` and checked again. Only when that check passes is
    the original file overwritten.

    Args:
        meter: Meter name forwarded to the ``utils`` helper.
        publisher: Publisher/target forwarded to the ``utils`` helper.
        flag: ``"ADD"`` to add the pair, ``"DEL"`` to remove it.

    Returns:
        True only when the original pipeline file was rewritten. False in
        every other case: unsupported ``flag``, helper reported no change,
        either validation failed, or any exception was raised.
    """
    logging.debug("meter name:%s", meter)
    logging.debug("publisher or target name:%s", publisher)
    orig_pipeline_conf = "/etc/ceilometer/pipeline.yaml"
    tmp_pipeline_conf = "/tmp/pipeline.yaml"
    try:
        # Parse the original pipeline yaml file.
        with open(orig_pipeline_conf, 'r') as fap:
            data = fap.read()
            pipeline_cfg = yaml.safe_load(data)
        logging.debug("Pipeline config: %s", pipeline_cfg)

        # Refuse to edit a file the pipeline manager cannot load as it is.
        if not check_meter_with_pipeline_cfg(orig_pipeline_conf):
            logging.error("Original pipeline.yaml parsing failed")
            return False

        if flag == "ADD":
            status = utils.update_conf_to_pipe_line_cfg(
                meter, publisher, pipeline_cfg)
        elif flag == "DEL":
            status = utils.delete_conf_from_pipe_line_cfg(
                meter, publisher, pipeline_cfg)
        else:
            logging.error("Unsupported action:%s, expected ADD or DEL", flag)
            return False

        # The helpers' return contract is not visible from this module, so
        # the original equality test against True is kept as is.
        if not status == True:  # noqa: E712
            # Nothing was changed in memory; return an explicit False rather
            # than falling off the end and handing None to the caller.
            return False

        # Validate the candidate configuration from a scratch file before
        # touching the real one.
        with open(tmp_pipeline_conf, "w") as f:
            yaml.safe_dump(pipeline_cfg, f, default_flow_style=False)
        if not check_meter_with_pipeline_cfg(tmp_pipeline_conf, meter, publisher):
            logging.info("Retaining original conf,as update meter info has errors")
            return False

        logging.debug("Tmp pipeline.yaml parsed sucessfully,coping it as orig")
        with open(orig_pipeline_conf, "w") as f:
            yaml.safe_dump(pipeline_cfg, f, default_flow_style=False)
        return True
    except Exception as e:
        logging.error("* Error in confing file:%s", e.__str__())
        return False
def msg_queue_listner():
    """Consume messages from the ``pubsub`` fanout exchange until stopped.

    RabbitMQ credentials, host and port are read from the ``RABBITMQ``
    section of ``pipeline_agent.conf`` (resolved against the current working
    directory), and logging is configured from the same file. If any of that
    fails the error is logged and the function returns without connecting.

    Otherwise a blocking connection is opened, an exclusive server-named
    queue is bound to the ``pubsub`` exchange, and every message is passed
    to ``callback``. The call blocks in ``start_consuming``; the connection
    is closed when that call returns or raises.
    """
    try:
        config = ConfigParser.ConfigParser()
        config.read('pipeline_agent.conf')
        rabbitmq_username = config.get('RABBITMQ', 'Rabbitmq_username')
        rabbitmq_passwd = config.get('RABBITMQ', 'Rabbitmq_passwd')
        rabbitmq_host = config.get('RABBITMQ', 'Rabbitmq_host')
        rabbitmq_port = int(config.get('RABBITMQ', 'Rabbitmq_port'))
        logging.config.fileConfig('pipeline_agent.conf',
                                  disable_existing_loggers=False)
    except Exception as e:
        logging.error("* Error in confing file:%s", e.__str__())
        return

    logging.debug("*------------------Rabbit MQ Server Info---------")
    logging.debug("rabbitmq_username:%s", rabbitmq_username)
    # Never write the secret itself to the log; a fixed mask is logged so
    # the line still shows that a password was read.
    logging.debug("rabbitmq_passwd:%s", "********")
    logging.debug("rabbitmq_host:%s", rabbitmq_host)
    logging.debug("rabbitmq_port:%s", rabbitmq_port)

    credentials = pika.PlainCredentials(rabbitmq_username, rabbitmq_passwd)
    parameters = pika.ConnectionParameters(rabbitmq_host,
                                           rabbitmq_port,
                                           '/',
                                           credentials)
    connection = pika.BlockingConnection(parameters)
    try:
        channel = connection.channel()
        # NOTE(review): the keyword names below (type=, no_ack=) and the
        # callback-first basic_consume call are kept exactly as written;
        # confirm them against the installed pika release before upgrading.
        channel.exchange_declare(exchange='pubsub',
                                 type='fanout')
        result = channel.queue_declare(exclusive=True)
        queue_name = result.method.queue
        channel.queue_bind(exchange='pubsub',
                           queue=queue_name)
        logging.debug("[*] Waiting for messages. To exit press CTRL+C")
        channel.basic_consume(callback,
                              queue=queue_name,
                              no_ack=True)
        channel.start_consuming()
    finally:
        # Release the broker connection on CTRL+C or any setup/consume
        # failure; the exception itself still propagates to the caller.
        if connection.is_open:
            connection.close()
if __name__ == "__main__":
    # Script entry point: start listening on the message queue.
    msg_queue_listner()