import pika
2import yaml
3import subprocess
4import logging
5import logging.config
6import operator
7import json
8import ConfigParser
9import pipeline
10import utils
11#from ceilometer import pipeline
12from collections import OrderedDict
13
14
class UnsortableList(list):
    """List subclass whose sort() is a deliberate no-op.

    yaml's dict representer sorts key/value pairs via list.sort();
    handing it one of these keeps the pairs in their current order.
    """

    def sort(self, *args, **kwargs):
        # Intentionally ignore every argument and keep the current order.
        return None
18
class UnsortableOrderedDict(OrderedDict):
    """OrderedDict whose items() come back wrapped in an UnsortableList.

    Because the returned list refuses to sort, yaml dumps the keys in
    insertion order instead of alphabetically.
    """

    def items(self, *args, **kwargs):
        pairs = OrderedDict.items(self, *args, **kwargs)
        return UnsortableList(pairs)
22
23#yaml.add_representer(UnsortableOrderedDict, yaml.representer.SafeRepresenter.represent_dict)
24
25
# Scratch path where an updated pipeline config is written and validated
# before it replaces the real /etc/ceilometer/pipeline.yaml.
tmp_pipeline_conf = "/tmp/pipeline.yaml"
27
28'''
29LEVELS = {'DEBUG': logging.DEBUG,
30 'INFO': logging.INFO,
31 'WARNING': logging.WARNING,
32 'ERROR': logging.ERROR,
33 'CRITICAL': logging.CRITICAL}
34
35_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
36'''
def get_source_info(meter, interval=6):
    """Build a ceilometer pipeline *source* entry for a single meter.

    Args:
        meter: meter name, e.g. "cpu_util".
        interval: polling interval in seconds (default 6, the previously
            hard-coded value; now parameterized for callers that need a
            different rate).

    Returns:
        Tuple ``(source_info, sink_name)`` where ``source_info`` is the
        dict to append to the pipeline's "sources" list and ``sink_name``
        is the derived name of the matching sink ("<meter>_sink").
    """
    sink_name = meter + "_sink"
    meter_name = meter + "_name"
    source_info = {'interval': interval,
                   'meters': [meter],
                   'name': meter_name,
                   'sinks': [sink_name]}
    logging.debug("* new source_info :%s", source_info)
    return (source_info, sink_name)
43
def get_sink_info(meter, sink_name, target):
    """Build a ceilometer pipeline *sink* entry for a single meter.

    Args:
        meter: meter name (unused in the dict itself, kept for symmetry
            with get_source_info).
        sink_name: name of the sink, as produced by get_source_info.
        target: extra publisher URL appended after the default
            'notifier://' publisher.

    Returns:
        The sink dict to append to the pipeline's "sinks" list.
    """
    sink_info = {'publishers': ['notifier://', target],
                 'transformers': None,
                 'name': sink_name}
    # Fixed copy-paste slip: this log line previously said "source_info".
    logging.debug("* new sink_info :%s", sink_info)
    return sink_info
48
def restart_ceilometer_services():
    """Restart the ceilometer services listed in pipeline_agent.conf.

    Reads the comma-separated ``RABBITMQ/Ceilometer_service`` option and
    issues ``service <name> restart`` for each entry.

    Returns:
        True when the config was read and every restart command was
        issued; False on a config error or if launching any command
        fails. Note: the exit status of the service command itself is
        not checked (unchanged from the original behavior).
    """
    try:
        config = ConfigParser.ConfigParser()
        config.read('pipeline_agent.conf')
        services = config.get('RABBITMQ', 'Ceilometer_service')
        # Strip whitespace so "svc-a, svc-b" parses correctly, and drop
        # empty entries from trailing commas.
        service_names = [name.strip() for name in services.split(",") if name.strip()]
    except Exception as e:
        logging.error("* Error in confing file:%s", e.__str__())
        return False
    for service_name in service_names:
        command = ['service', service_name, 'restart']
        logging.debug("Executing: %s command", command)
        # shell=False: the list form is exec'd directly (sudo-safe, no
        # shell injection via service names).
        try:
            subprocess.call(command, shell=False)
        except Exception as e:
            logging.error("* %s command execution failed with error %s",
                          command, e.__str__())
            return False
    return True
69
def check_meter_with_pipeline_cfg(pipeline_cfg_file, meter=None, target=None):
    """Validate a pipeline config file by building a pipeline manager.

    ``meter`` and ``target`` are accepted for interface compatibility but
    are not consulted; validation is done purely by attempting to set up
    a pipeline manager from the file.

    Returns:
        True when the file is accepted, False when setup raises.
    """
    valid = True
    try:
        pipeline._setup_pipeline_manager(pipeline_cfg_file, None)
    except Exception as exc:
        logging.error("Got Exception: %s", exc.__str__())
        valid = False
    return valid
78
79
def callback(ch, method, properties, msg):
    """Handle one pub/sub message requesting a meter subscription change.

    Expects ``msg`` to be a JSON object with keys:
        sub_info: a meter name, or a list of meter names
        target:   publisher/target URL to add or remove
        action:   "ADD" or "DEL"

    Each meter is applied to /etc/ceilometer/pipeline.yaml via
    update_pipeline_yaml(); the ceilometer services are restarted when at
    least one meter was actually updated.

    Returns:
        False on any parsing/processing error; otherwise None (pika
        ignores the return value).

    Changes vs. original: removed the dead read/parse of pipeline.yaml
    whose result was never used (update_pipeline_yaml re-reads the file
    itself); replaced ``reduce(operator.or_, ...)`` — which raises
    TypeError on an empty list and needs functools on Python 3 — with
    the equivalent ``any()``; ``type(x) is list`` -> ``isinstance``.
    """
    logging.debug(" [x] Received %r", msg)
    try:
        json_msg = json.loads(msg)
        meter = json_msg['sub_info']
        publisher = json_msg['target']
        flag = json_msg['action']
        update_status = []
        # sub_info may be a single meter name or a list of them.
        if isinstance(meter, list):
            logging.debug("Metere is a list ... Need to handle it ")
            for meter_info in meter:
                update_status.append(update_pipeline_yaml(meter_info, publisher, flag))
        else:
            update_status.append(update_pipeline_yaml(meter, publisher, flag))

        # Restart only if at least one update really changed the config.
        if any(update_status):
            if not restart_ceilometer_services():
                logging.error("Error in restarting ceilometer services")
                return False
    except Exception as e:
        logging.error("Got exception:%s in parsing message", e.__str__())
        return False
110
111
112
113
114
def update_pipeline_yaml(meter,publisher,flag):
    """Apply one meter/publisher change to /etc/ceilometer/pipeline.yaml.

    Validates the current file, applies the change in memory via the
    ``utils`` helpers, writes the result to /tmp/pipeline.yaml, validates
    that copy, and only then overwrites the real config.

    Args:
        meter: meter name to add or remove.
        publisher: publisher/target URL for that meter.
        flag: "ADD" to insert the mapping, "DEL" to remove it.

    Returns:
        True when the config was updated and committed; False on any
        parse/validation/config error.
        NOTE(review): when ``flag`` is neither "ADD" nor "DEL", or the
        utils helper does not return True, the function falls through
        and implicitly returns None (falsy) — confirm callers only test
        truthiness.
    """
    logging.debug("meter name:%s",meter)
    logging.debug("publisher or target name:%s",publisher)

    orig_pipeline_conf = "/etc/ceilometer/pipeline.yaml"
    ''' Parsing orginal pipeline yaml file '''
    try :
        with open (orig_pipeline_conf, 'r') as fap:
            data = fap.read()
            pipeline_cfg = yaml.safe_load(data)
        logging.debug("Pipeline config: %s", pipeline_cfg)

        ''' Chcking parsing errors '''

        # Refuse to touch a config that does not validate as-is.
        if not check_meter_with_pipeline_cfg(orig_pipeline_conf) :
            logging.error("Original pipeline.yaml parsing failed")
            return False
        else :
            status = None
            # utils helpers mutate pipeline_cfg in place and report
            # whether anything actually changed.
            if flag == "ADD" :
                status = utils.update_conf_to_pipe_line_cfg(meter,publisher,pipeline_cfg)
            elif flag == "DEL" :
                status = utils.delete_conf_from_pipe_line_cfg(meter,publisher,pipeline_cfg)

            if status == True :
                # Stage to /tmp first so a bad update never clobbers the
                # live config.
                tmp_pipeline_conf = "/tmp/pipeline.yaml"
                with open(tmp_pipeline_conf, "w") as f:
                    yaml.safe_dump( pipeline_cfg, f ,default_flow_style=False)
                if check_meter_with_pipeline_cfg(tmp_pipeline_conf,meter,publisher) :
                    logging.debug("Tmp pipeline.yaml parsed sucessfully,coping it as orig")
                    with open(orig_pipeline_conf, "w") as f:
                        yaml.safe_dump( pipeline_cfg, f ,default_flow_style=False)
                    return True
                else :
                    logging.info("Retaining original conf,as update meter info has errors")
                    return False
    except Exception as e:
        logging.error("* Error in confing file:%s",e.__str__())
        return False
154
155
def msg_queue_listner():
    """Connect to RabbitMQ and consume subscription messages forever.

    Reads connection credentials from the ``RABBITMQ`` section of
    pipeline_agent.conf, configures logging from the same file, binds an
    exclusive queue to the ``pubsub`` fanout exchange, and dispatches
    every message to callback(). Blocks in start_consuming() until
    interrupted.
    """
    try:
        config = ConfigParser.ConfigParser()
        config.read('pipeline_agent.conf')
        rabbitmq_username = config.get('RABBITMQ','Rabbitmq_username')
        rabbitmq_passwd = config.get('RABBITMQ','Rabbitmq_passwd')
        rabbitmq_host = config.get('RABBITMQ','Rabbitmq_host')
        rabbitmq_port = int ( config.get('RABBITMQ','Rabbitmq_port') )
        '''
        log_level = config.get('LOGGING','level')
        log_file = config.get('LOGGING','filename')

        level = LEVELS.get(log_level, logging.NOTSET)
        logging.basicConfig(filename=log_file,format='%(asctime)s %(filename)s %(levelname)s %(message)s',\
        datefmt=_DEFAULT_LOG_DATE_FORMAT,level=level)
        '''
        # Logging handlers/formatters come from the same conf file.
        logging.config.fileConfig('pipeline_agent.conf', disable_existing_loggers=False)
    except Exception as e:
        # NOTE(review): on a config error we log and fall through to the
        # function end — the listener simply never starts.
        logging.error("* Error in confing file:%s",e.__str__())
    else :
        logging.debug("*------------------Rabbit MQ Server Info---------")
        logging.debug("rabbitmq_username:%s",rabbitmq_username)
        logging.debug("rabbitmq_passwd:%s",rabbitmq_passwd)
        logging.debug("rabbitmq_host:%s",rabbitmq_host)
        logging.debug("rabbitmq_port:%s",rabbitmq_port)
        credentials = pika.PlainCredentials(rabbitmq_username,rabbitmq_passwd)
        parameters = pika.ConnectionParameters(rabbitmq_host,
                                               rabbitmq_port,
                                               '/',
                                               credentials)
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        #channel.queue_declare(queue='pubsub')
        # NOTE(review): 'type=' and the (callback, queue, no_ack) calling
        # conventions are the legacy pika 0.x API — newer pika renamed
        # these (exchange_type=, auto_ack=); confirm installed version.
        channel.exchange_declare(exchange='pubsub',
                                 type='fanout')

        # Exclusive queue: broker-named, deleted when this consumer drops.
        result = channel.queue_declare(exclusive=True)
        queue_name = result.method.queue

        channel.queue_bind(exchange='pubsub',
                           queue=queue_name)
        logging.debug("[*] Waiting for messages. To exit press CTRL+C")

        # no_ack=True: messages are not acknowledged, so a crash mid-
        # callback loses the message.
        channel.basic_consume(callback,
                              queue=queue_name,
                              no_ack=True)
        channel.start_consuming()
204
if __name__ == "__main__":
    #logging.debug("* Starting pipeline agent module")
    # Entry point: blocks forever consuming pipeline-update messages.
    msg_queue_listner()
208