Matteo Scandolo | eb0d11c | 2017-08-08 13:05:26 -0700 | [diff] [blame^] | 1 | |
| 2 | # Copyright 2017-present Open Networking Foundation |
| 3 | # |
| 4 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 5 | # you may not use this file except in compliance with the License. |
| 6 | # You may obtain a copy of the License at |
| 7 | # |
| 8 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 9 | # |
| 10 | # Unless required by applicable law or agreed to in writing, software |
| 11 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 12 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 13 | # See the License for the specific language governing permissions and |
| 14 | # limitations under the License. |
| 15 | |
| 16 | |
rdudyala | b086cf3 | 2016-08-11 00:07:45 -0400 | [diff] [blame] | 17 | import pika |
| 18 | import yaml |
| 19 | import subprocess |
| 20 | import logging |
| 21 | import logging.config |
| 22 | import operator |
| 23 | import json |
| 24 | import ConfigParser |
| 25 | import pipeline |
| 26 | import utils |
| 27 | #from ceilometer import pipeline |
| 28 | from collections import OrderedDict |
| 29 | |
| 30 | |
class UnsortableList(list):
    """A list whose ``sort`` does nothing, so its element order never changes."""

    def sort(self, *args, **kwargs):
        """Accept any sort arguments and ignore them; the list is untouched."""
        return None
| 34 | |
class UnsortableOrderedDict(OrderedDict):
    """OrderedDict whose ``items()`` result ignores in-place sorting.

    Presumably meant for serializers that call ``items().sort()`` (see the
    disabled ``yaml.add_representer`` call in this file) -- TODO confirm.
    """

    def items(self, *args, **kwargs):
        """Return the parent's items wrapped in an ``UnsortableList``."""
        pairs = super(UnsortableOrderedDict, self).items(*args, **kwargs)
        return UnsortableList(pairs)
| 38 | |
| 39 | #yaml.add_representer(UnsortableOrderedDict, yaml.representer.SafeRepresenter.represent_dict) |
| 40 | |
| 41 | |
# Path of the temporary pipeline file (the same path update_pipeline_yaml()
# writes its candidate configuration to before validating it).
tmp_pipeline_conf = "/tmp/pipeline.yaml"

# Disabled hand-rolled logging setup, kept for reference; logging is configured
# from pipeline_agent.conf through logging.config.fileConfig instead.
#
# LEVELS = {'DEBUG': logging.DEBUG,
#           'INFO': logging.INFO,
#           'WARNING': logging.WARNING,
#           'ERROR': logging.ERROR,
#           'CRITICAL': logging.CRITICAL}
#
# _DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
def get_source_info(meter):
    """Build the pipeline source entry for a single meter.

    :param meter: meter name; also used to derive the source and sink names.
    :returns: ``(source_info, sink_name)`` where ``source_info`` is a dict
        with interval 6, the meter as its only entry in ``meters``, the name
        ``<meter>_name`` and ``<meter>_sink`` as its only sink.
    """
    sink_name = meter + "_sink"
    source_info = {
        'interval': 6,
        'meters': [meter],
        'name': meter + "_name",
        'sinks': [sink_name],
    }
    logging.debug("* new source_info :%s", source_info)
    return source_info, sink_name
| 59 | |
def get_sink_info(meter, sink_name, target):
    """Build the pipeline sink entry named *sink_name*.

    :param meter: not used by this function; kept so existing callers that
        pass it keep working.
    :param sink_name: value stored under the ``'name'`` key.
    :param target: second publisher entry, placed after ``'notifier://'``.
    :returns: dict with the keys ``publishers``, ``transformers`` (always
        None) and ``name``.
    """
    sink_info = {
        'publishers': ['notifier://', target],
        'transformers': None,
        'name': sink_name,
    }
    # The message used to say "source_info", which mislabelled sink entries
    # in the debug log.
    logging.debug("* new sink_info :%s", sink_info)
    return sink_info
| 64 | |
def restart_ceilometer_services(config_file='pipeline_agent.conf'):
    """Restart every service listed in the agent configuration.

    The services are read from the comma-separated ``Ceilometer_service``
    option of the ``[RABBITMQ]`` section and each one is restarted with
    ``service <name> restart``.

    :param config_file: path of the agent configuration file; defaults to
        the ``pipeline_agent.conf`` this function has always read.
    :returns: True when every listed service restarted with exit status 0
        (also when no service is listed); False on a configuration error,
        when a command cannot be executed, or when it exits non-zero.
    """
    try:
        config = ConfigParser.ConfigParser()
        config.read(config_file)
        raw_services = config.get('RABBITMQ', 'Ceilometer_service')
    except Exception as e:
        logging.error("* Error in confing file:%s", e)
        return False

    # Tolerate "a, b" and stray/trailing commas: surrounding blanks are not
    # part of a service name and empty entries are not services.
    stripped_names = [name.strip() for name in raw_services.split(",")]
    service_names = [name for name in stripped_names if name]

    for service_name in service_names:
        command = ['service', service_name, 'restart']
        logging.debug("Executing: %s command", command)
        try:
            # shell=False (argument list, no shell) for sudo to work.
            exit_status = subprocess.call(command, shell=False)
        except Exception as e:
            logging.error("* %s command execution failed with error %s",
                          command, e)
            return False
        # subprocess.call() reports a failed restart through its return
        # value, not an exception, so it has to be checked explicitly.
        if exit_status != 0:
            logging.error("* %s command exited with status %s",
                          command, exit_status)
            return False
    return True
| 85 | |
def check_meter_with_pipeline_cfg(pipeline_cfg_file, meter=None, target=None):
    """Report whether *pipeline_cfg_file* loads through the pipeline manager.

    :param pipeline_cfg_file: path handed to
        ``pipeline._setup_pipeline_manager``.
    :param meter: accepted but not used by the check.
    :param target: accepted but not used by the check.
    :returns: True when the pipeline manager setup raised nothing, False
        (after logging the error) when it raised any ``Exception``.
    """
    try:
        pipeline._setup_pipeline_manager(pipeline_cfg_file, None)
    except Exception as exc:
        logging.error("Got Exception: %s", str(exc))
        return False
    else:
        return True
| 94 | |
| 95 | |
def callback(ch, method, properties, msg):
    """Handle one pub/sub message: update pipeline.yaml, then restart services.

    *msg* must be a JSON object with the keys ``sub_info`` (one meter or a
    list of meters), ``target`` (the publisher) and ``action`` (forwarded as
    the flag of ``update_pipeline_yaml``). The ceilometer services are
    restarted when at least one meter update reported success.

    :param ch: unused; part of the consumer callback signature.
    :param method: unused; part of the consumer callback signature.
    :param properties: unused; part of the consumer callback signature.
    :param msg: message body to decode.
    :returns: False when the message cannot be processed or the service
        restart fails; None otherwise.
    """
    logging.debug(" [x] Received %r", msg)
    # pipeline.yaml is no longer read here: update_pipeline_yaml() loads and
    # logs it for every meter, and the extra unguarded open() could raise out
    # of the callback when the file was missing.
    try:
        json_msg = json.loads(msg)
        meter = json_msg['sub_info']
        publisher = json_msg['target']
        flag = json_msg['action']

        if isinstance(meter, list):
            logging.debug("Metere is a list ... Need to handle it ")
            meters = meter
        else:
            meters = [meter]
        update_status = [update_pipeline_yaml(meter_info, publisher, flag)
                         for meter_info in meters]

        # any() rather than reduce(operator.or_, ...): the reduce form raised
        # TypeError for an empty meter list and for a None status mixed with
        # a bool, which skipped the restart after a successful update.
        if any(update_status):
            if not restart_ceilometer_services():
                logging.error("Error in restarting ceilometer services")
                return False
    except Exception as e:
        # Boundary of message handling: one bad message must not stop the
        # consumer loop, so log it and give up on this message only.
        logging.error("Got exception:%s in parsing message", e)
        return False
| 126 | |
| 127 | |
| 128 | |
| 129 | |
| 130 | |
def update_pipeline_yaml(meter, publisher, flag):
    """Apply one ADD/DEL request to /etc/ceilometer/pipeline.yaml.

    The current file is loaded and validated, the change is applied in
    memory through the ``utils`` helpers, the result is written to
    /tmp/pipeline.yaml and validated again, and only then written over the
    original file.

    :param meter: meter name forwarded to the ``utils`` helper.
    :param publisher: publisher/target forwarded to the ``utils`` helper.
    :param flag: ``"ADD"`` or ``"DEL"``; any other value is rejected.
    :returns: True only when the original file was rewritten. False in every
        other case: validation failure, unsupported action, a helper that
        reported no change, or any exception. (The no-change and
        unsupported-action cases used to fall through and return None.)
    """
    logging.debug("meter name:%s", meter)
    logging.debug("publisher or target name:%s", publisher)

    orig_pipeline_conf = "/etc/ceilometer/pipeline.yaml"
    # NOTE(review): fixed, predictable path under /tmp -- consider a private
    # temporary file if this agent runs with elevated privileges.
    tmp_pipeline_conf = "/tmp/pipeline.yaml"

    try:
        # Parse the original pipeline yaml file.
        with open(orig_pipeline_conf, 'r') as fap:
            data = fap.read()
            pipeline_cfg = yaml.safe_load(data)
        logging.debug("Pipeline config: %s", pipeline_cfg)

        # Refuse to touch a file the pipeline manager cannot load as it is.
        if not check_meter_with_pipeline_cfg(orig_pipeline_conf):
            logging.error("Original pipeline.yaml parsing failed")
            return False

        if flag == "ADD":
            status = utils.update_conf_to_pipe_line_cfg(
                meter, publisher, pipeline_cfg)
        elif flag == "DEL":
            status = utils.delete_conf_from_pipe_line_cfg(
                meter, publisher, pipeline_cfg)
        else:
            logging.error("Unsupported action:%s", flag)
            return False

        # Same "== True" comparison as before (the helpers' return types are
        # not visible here); a helper that changed nothing means no rewrite.
        if not status == True:  # noqa: E712
            return False

        # Validate the candidate configuration before replacing the original.
        with open(tmp_pipeline_conf, "w") as f:
            yaml.safe_dump(pipeline_cfg, f, default_flow_style=False)
        if not check_meter_with_pipeline_cfg(tmp_pipeline_conf, meter, publisher):
            logging.info("Retaining original conf,as update meter info has errors")
            return False

        logging.debug("Tmp pipeline.yaml parsed sucessfully,coping it as orig")
        with open(orig_pipeline_conf, "w") as f:
            yaml.safe_dump(pipeline_cfg, f, default_flow_style=False)
        return True
    except Exception as e:
        logging.error("* Error in confing file:%s", e)
        return False
| 170 | |
| 171 | |
def msg_queue_listner():
    """Connect to RabbitMQ and dispatch ``pubsub`` messages to ``callback``.

    Connection settings (``Rabbitmq_username``, ``Rabbitmq_passwd``,
    ``Rabbitmq_host``, ``Rabbitmq_port``) come from the ``[RABBITMQ]``
    section of ``pipeline_agent.conf``; the same file configures logging.
    An exclusive, server-named queue is bound to the ``pubsub`` fanout
    exchange and consumed without acknowledgements until consuming stops.

    :returns: None. A configuration error is logged and the function returns
        without connecting.
    """
    try:
        config = ConfigParser.ConfigParser()
        config.read('pipeline_agent.conf')
        rabbitmq_username = config.get('RABBITMQ', 'Rabbitmq_username')
        rabbitmq_passwd = config.get('RABBITMQ', 'Rabbitmq_passwd')
        rabbitmq_host = config.get('RABBITMQ', 'Rabbitmq_host')
        rabbitmq_port = int(config.get('RABBITMQ', 'Rabbitmq_port'))
        logging.config.fileConfig('pipeline_agent.conf',
                                  disable_existing_loggers=False)
    except Exception as e:
        logging.error("* Error in confing file:%s", e)
        return

    logging.debug("*------------------Rabbit MQ Server Info---------")
    logging.debug("rabbitmq_username:%s", rabbitmq_username)
    # Never write the real password to the log.
    logging.debug("rabbitmq_passwd:%s", "********")
    logging.debug("rabbitmq_host:%s", rabbitmq_host)
    logging.debug("rabbitmq_port:%s", rabbitmq_port)

    credentials = pika.PlainCredentials(rabbitmq_username, rabbitmq_passwd)
    parameters = pika.ConnectionParameters(rabbitmq_host,
                                           rabbitmq_port,
                                           '/',
                                           credentials)
    connection = pika.BlockingConnection(parameters)
    try:
        channel = connection.channel()
        # NOTE(review): the keyword names below (type=, no_ack=, callback as
        # first argument) are kept exactly as the agent has always used them;
        # confirm against the installed pika version before upgrading.
        channel.exchange_declare(exchange='pubsub',
                                 type='fanout')

        result = channel.queue_declare(exclusive=True)
        queue_name = result.method.queue

        channel.queue_bind(exchange='pubsub',
                           queue=queue_name)
        logging.debug("[*] Waiting for messages. To exit press CTRL+C")

        channel.basic_consume(callback,
                              queue=queue_name,
                              no_ack=True)
        channel.start_consuming()
    finally:
        # Release the broker connection however consuming ended (CTRL+C,
        # broker error, ...); skip close() on an already-closed connection so
        # the original exception is not masked.
        if connection.is_open:
            connection.close()
| 220 | |
if __name__ == "__main__":
    # Script entry point: run the message-queue listener.
    msg_queue_listner()
| 224 | |