rdudyala | b086cf3 | 2016-08-11 00:07:45 -0400 | [diff] [blame] | 1 | import pika |
| 2 | import yaml |
| 3 | import subprocess |
| 4 | import logging |
| 5 | import logging.config |
| 6 | import operator |
| 7 | import json |
| 8 | import ConfigParser |
| 9 | import pipeline |
| 10 | import utils |
| 11 | #from ceilometer import pipeline |
| 12 | from collections import OrderedDict |
| 13 | |
| 14 | |
class UnsortableList(list):
    """A ``list`` whose ``sort()`` is a no-op, so element order never changes."""

    def sort(self, *_args, **_kwargs):
        # Every argument is accepted and ignored; the contents stay untouched.
        return None
| 18 | |
class UnsortableOrderedDict(OrderedDict):
    """``OrderedDict`` whose ``items()`` result is wrapped in ``UnsortableList``."""

    def items(self, *args, **kwargs):
        # Fetch the pairs from the base class, then wrap them so that a later
        # ``.sort()`` call on the returned list does nothing.
        pairs = OrderedDict.items(self, *args, **kwargs)
        return UnsortableList(pairs)
| 22 | |
| 23 | #yaml.add_representer(UnsortableOrderedDict, yaml.representer.SafeRepresenter.represent_dict) |
| 24 | |
| 25 | |
# Path of the temporary pipeline file.
tmp_pipeline_conf = "/tmp/pipeline.yaml"

# Disabled manual logging setup, kept for reference only:
#
# LEVELS = {'DEBUG': logging.DEBUG,
#           'INFO': logging.INFO,
#           'WARNING': logging.WARNING,
#           'ERROR': logging.ERROR,
#           'CRITICAL': logging.CRITICAL}
#
# _DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
def get_source_info(meter):
    """Build a pipeline source entry for ``meter``.

    Returns:
        A ``(source_info, sink_name)`` tuple: the source dictionary (fixed
        interval of 6, the meter itself, a ``<meter>_name`` name and a single
        ``<meter>_sink`` sink) and the sink name on its own.
    """
    sink_name = meter + "_sink"
    source_info = dict(
        interval=6,
        meters=[meter],
        name=meter + "_name",
        sinks=[sink_name],
    )
    logging.debug("* new source_info :%s", source_info)
    return source_info, sink_name
| 43 | |
def get_sink_info(meter, sink_name, target):
    """Build a pipeline sink entry that publishes to ``target``.

    Args:
        meter: Accepted for symmetry with ``get_source_info``; not used here.
        sink_name: Value stored under the ``name`` key.
        target: Publisher URL appended after the fixed ``notifier://`` entry.

    Returns:
        The sink dictionary with ``publishers``, ``transformers`` (always
        ``None``) and ``name`` keys.
    """
    sink_info = {
        'publishers': ['notifier://', target],
        'transformers': None,
        'name': sink_name,
    }
    # The message previously said "source_info" although it logs the sink.
    logging.debug("* new sink_info :%s", sink_info)
    return sink_info
| 48 | |
def restart_ceilometer_services():
    """Restart every service listed in ``pipeline_agent.conf``.

    The comma-separated ``Ceilometer_service`` option of the ``RABBITMQ``
    section names the services; each one is restarted by running
    ``service <name> restart``.

    Returns:
        True when every restart command ran and exited with status 0.
        False when the configuration could not be read, a command could not
        be executed, or a command exited with a non-zero status.
    """
    try:
        config = ConfigParser.ConfigParser()
        config.read('pipeline_agent.conf')
        raw_services = config.get('RABBITMQ', 'Ceilometer_service')
    except Exception as e:
        logging.error("* Error in confing file:%s", e)
        return False

    # Tolerate "a, b" style lists and stray commas: surrounding whitespace
    # would otherwise become part of the service name.
    service_names = [name.strip() for name in raw_services.split(",")
                     if name.strip()]

    for service_name in service_names:
        command = ['service', service_name, 'restart']
        logging.debug("Executing: %s command", command)
        try:
            # shell=False for sudo to work.
            exit_code = subprocess.call(command, shell=False)
        except Exception as e:
            logging.error("* %s command execution failed with error %s",
                          command, e)
            return False
        # subprocess.call() reports a failed restart through its return
        # value, not through an exception, so it has to be checked.
        if exit_code != 0:
            logging.error("* %s command exited with status %s",
                          command, exit_code)
            return False
    return True
| 69 | |
def check_meter_with_pipeline_cfg(pipeline_cfg_file, meter=None, target=None):
    """Report whether ``pipeline_cfg_file`` loads into a pipeline manager.

    ``meter`` and ``target`` are accepted but not used by this function.

    Returns:
        True when ``pipeline._setup_pipeline_manager`` completes without
        raising; False (after logging the error) otherwise.
    """
    is_valid = True
    try:
        pipeline._setup_pipeline_manager(pipeline_cfg_file, None)
    except Exception as err:
        logging.error("Got Exception: %s", err.__str__())
        is_valid = False
    return is_valid
| 78 | |
| 79 | |
def callback(ch, method, properties, msg):
    """Handle one message delivered by the RabbitMQ consumer.

    ``msg`` must be a JSON object with the keys ``sub_info`` (a meter name or
    a list of meter names), ``target`` (the publisher) and ``action`` (passed
    as the flag to ``update_pipeline_yaml``). Every meter is applied to the
    pipeline configuration, and the ceilometer services are restarted when at
    least one update reported success.

    Args:
        ch: Channel object supplied by pika; not used.
        method: Delivery metadata supplied by pika; not used.
        properties: Message properties supplied by pika; not used.
        msg: Message body (JSON text).

    Returns:
        False when the message cannot be processed or the services fail to
        restart; None otherwise.
    """
    logging.debug(" [x] Received %r", msg)

    # The pipeline file is read (and logged) by update_pipeline_yaml() itself.
    # Reading it here as well was redundant, and an I/O error at this point
    # would have escaped the handler.
    try:
        json_msg = json.loads(msg)
        meter = json_msg['sub_info']
        publisher = json_msg['target']
        flag = json_msg['action']

        if isinstance(meter, list):
            logging.debug("Metere is a list ... Need to handle it ")
            meters = meter
        else:
            meters = [meter]

        update_status = [update_pipeline_yaml(meter_info, publisher, flag)
                         for meter_info in meters]

        # any() copes with an empty list and with non-boolean results; the
        # former reduce(operator.or_, ...) raised TypeError in both cases and
        # thereby skipped a restart that was actually needed.
        if any(update_status):
            if not restart_ceilometer_services():
                logging.error("Error in restarting ceilometer services")
                return False
    except Exception as e:
        # Broad on purpose: a bad message must not take the consumer down.
        logging.error("Got exception:%s in parsing message", e)
        return False
| 110 | |
| 111 | |
| 112 | |
| 113 | |
| 114 | |
def update_pipeline_yaml(meter, publisher, flag):
    """Add or remove a meter/publisher pair in ceilometer's pipeline.yaml.

    The current file is loaded and checked, the change is applied through
    the ``utils`` helpers, and the result is first written to a temporary
    file. Only when that temporary file passes the same check is the original
    file overwritten.

    Args:
        meter: Meter name handed to the ``utils`` helper.
        publisher: Publisher/target handed to the ``utils`` helper.
        flag: ``"ADD"`` to add the pair, ``"DEL"`` to remove it.

    Returns:
        True only when the updated configuration was validated and written
        back; False in every other case (unknown flag, helper reported no
        change, validation failure, or any exception).
    """
    logging.debug("meter name:%s", meter)
    logging.debug("publisher or target name:%s", publisher)

    orig_pipeline_conf = "/etc/ceilometer/pipeline.yaml"
    tmp_pipeline_conf = "/tmp/pipeline.yaml"

    try:
        # Parse the original pipeline yaml file.
        with open(orig_pipeline_conf, 'r') as fap:
            pipeline_cfg = yaml.safe_load(fap.read())
        logging.debug("Pipeline config: %s", pipeline_cfg)

        # Refuse to modify a file that does not load cleanly to begin with.
        if not check_meter_with_pipeline_cfg(orig_pipeline_conf):
            logging.error("Original pipeline.yaml parsing failed")
            return False

        if flag == "ADD":
            status = utils.update_conf_to_pipe_line_cfg(
                meter, publisher, pipeline_cfg)
        elif flag == "DEL":
            status = utils.delete_conf_from_pipe_line_cfg(
                meter, publisher, pipeline_cfg)
        else:
            logging.error("Unsupported action:%s for meter:%s", flag, meter)
            return False

        # Equality with True is kept exactly as before because the return
        # contract of the utils helpers is not visible from this module.
        if not (status == True):  # noqa: E712
            # Previously this path fell off the end and returned None.
            return False

        # Validate the candidate configuration before touching the original.
        with open(tmp_pipeline_conf, "w") as f:
            yaml.safe_dump(pipeline_cfg, f, default_flow_style=False)

        if not check_meter_with_pipeline_cfg(tmp_pipeline_conf, meter,
                                             publisher):
            logging.info("Retaining original conf,as update meter info has errors")
            return False

        logging.debug("Tmp pipeline.yaml parsed sucessfully,coping it as orig")
        with open(orig_pipeline_conf, "w") as f:
            yaml.safe_dump(pipeline_cfg, f, default_flow_style=False)
        return True
    except Exception as e:
        logging.error("* Error in confing file:%s", e)
        return False
| 154 | |
| 155 | |
def msg_queue_listner():
    """Connect to RabbitMQ and feed ``pubsub`` messages to ``callback``.

    Connection settings (``Rabbitmq_username``, ``Rabbitmq_passwd``,
    ``Rabbitmq_host``, ``Rabbitmq_port``) are read from the ``RABBITMQ``
    section of ``pipeline_agent.conf``; the same file configures logging via
    ``logging.config.fileConfig``. The function declares the ``pubsub``
    fanout exchange, binds an exclusive server-named queue to it and then
    blocks in ``start_consuming()``.

    Returns:
        None. When the configuration cannot be read the error is logged and
        the function returns without connecting.
    """
    try:
        config = ConfigParser.ConfigParser()
        config.read('pipeline_agent.conf')
        rabbitmq_username = config.get('RABBITMQ', 'Rabbitmq_username')
        rabbitmq_passwd = config.get('RABBITMQ', 'Rabbitmq_passwd')
        rabbitmq_host = config.get('RABBITMQ', 'Rabbitmq_host')
        rabbitmq_port = int(config.get('RABBITMQ', 'Rabbitmq_port'))
        logging.config.fileConfig('pipeline_agent.conf',
                                  disable_existing_loggers=False)
    except Exception as e:
        logging.error("* Error in confing file:%s", e)
        return

    logging.debug("*------------------Rabbit MQ Server Info---------")
    logging.debug("rabbitmq_username:%s", rabbitmq_username)
    # Never write the credential itself to the log.
    logging.debug("rabbitmq_passwd:%s", "********")
    logging.debug("rabbitmq_host:%s", rabbitmq_host)
    logging.debug("rabbitmq_port:%s", rabbitmq_port)

    credentials = pika.PlainCredentials(rabbitmq_username, rabbitmq_passwd)
    parameters = pika.ConnectionParameters(rabbitmq_host,
                                           rabbitmq_port,
                                           '/',
                                           credentials)
    connection = pika.BlockingConnection(parameters)
    try:
        channel = connection.channel()
        # NOTE(review): `type=`, the positional callback and `no_ack=` are
        # the pre-1.0 pika spellings used by this file; confirm the installed
        # pika version before changing them.
        channel.exchange_declare(exchange='pubsub',
                                 type='fanout')

        result = channel.queue_declare(exclusive=True)
        queue_name = result.method.queue

        channel.queue_bind(exchange='pubsub',
                           queue=queue_name)
        logging.debug("[*] Waiting for messages. To exit press CTRL+C")

        channel.basic_consume(callback,
                              queue=queue_name,
                              no_ack=True)
        channel.start_consuming()
    finally:
        # Release the broker connection however consuming ends (CTRL+C,
        # broker error, setup failure); the exception still propagates.
        if connection.is_open:
            connection.close()
| 204 | |
# Script entry point: start the message-queue listener.
if __name__ == '__main__':
    msg_queue_listner()
| 208 | |