# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

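# Pipeline agent: listens on a RabbitMQ 'pubsub' fanout exchange for
# subscription messages and adds or removes meters in Ceilometer's
# /etc/ceilometer/pipeline.yaml, restarting the Ceilometer services when
# the file changes.
#
# It expects a pipeline_agent.conf next to the script; a minimal sketch
# (section and option names taken from the config.get() calls below,
# values illustrative only):
#
#   [RABBITMQ]
#   Rabbitmq_username = guest
#   Rabbitmq_passwd = guest
#   Rabbitmq_host = 127.0.0.1
#   Rabbitmq_port = 5672
#   Ceilometer_service = ceilometer-agent-notification,ceilometer-collector
#
# plus the standard logging.config.fileConfig() sections ([loggers],
# [handlers], [formatters]).
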
import pika
import yaml
import subprocess
import logging
import logging.config
import operator
import json
import ConfigParser
import pipeline
import utils
#from ceilometer import pipeline
from collections import OrderedDict


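# Helpers (unused in the active code path) meant to defeat yaml's key
# sorting so a dumped pipeline config keeps its original key order; they
# pair with the yaml.add_representer() call kept commented out below.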
class UnsortableList(list):
    def sort(self, *args, **kwargs):
        pass


class UnsortableOrderedDict(OrderedDict):
    def items(self, *args, **kwargs):
        return UnsortableList(OrderedDict.items(self, *args, **kwargs))

#yaml.add_representer(UnsortableOrderedDict, yaml.representer.SafeRepresenter.represent_dict)


tmp_pipeline_conf = "/tmp/pipeline.yaml"

'''
LEVELS = {'DEBUG': logging.DEBUG,
          'INFO': logging.INFO,
          'WARNING': logging.WARNING,
          'ERROR': logging.ERROR,
          'CRITICAL': logging.CRITICAL}

_DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
'''
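
# Build a new 'sources' entry for pipeline.yaml: poll a single meter on a
# fixed 6-second interval and route it to a matching '<meter>_sink'.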
def get_source_info(meter):
    sink_name = meter + "_sink"
    meter_name = meter + "_name"
    source_info = {'interval': 6, 'meters': [meter], 'name': meter_name, 'sinks': [sink_name]}
    logging.debug("* new source_info: %s", source_info)
    return (source_info, sink_name)

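# Build the matching 'sinks' entry: publish both to the local notifier bus
# and to the subscriber-supplied target URL, with no transformers.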
def get_sink_info(meter, sink_name, target):
    sink_info = {'publishers': ['notifier://', target], 'transformers': None, 'name': sink_name}
    logging.debug("* new sink_info: %s", sink_info)
    return sink_info

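# Restart every Ceilometer service listed in pipeline_agent.conf so a
# rewritten pipeline.yaml is picked up.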
def restart_ceilometer_services():
    try:
        config = ConfigParser.ConfigParser()
        config.read('pipeline_agent.conf')
        services = config.get('RABBITMQ', 'Ceilometer_service')
        service = services.split(",")
    except Exception as e:
        logging.error("* Error in config file: %s", e.__str__())
        return False
    else:
        for service_name in service:
            command = ['service', service_name, 'restart']
            logging.debug("Executing: %s command", command)
            # shell=False for sudo to work.
            try:
                subprocess.call(command, shell=False)
            except Exception as e:
                logging.error("* %s command execution failed with error %s", command, e.__str__())
                return False
    return True

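# Sanity-check a pipeline.yaml by asking the local pipeline module to build
# a pipeline manager from it; any exception marks the file as invalid.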
def check_meter_with_pipeline_cfg(pipeline_cfg_file, meter=None, target=None):
    try:
        pipeline._setup_pipeline_manager(pipeline_cfg_file, None)
    except Exception as e:
        logging.error("Got exception: %s", e.__str__())
        return False
    return True


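# RabbitMQ message handler. The payload is JSON of the form (values
# illustrative): {"sub_info": <meter name or list of names>,
# "target": <publisher URL>, "action": "ADD" | "DEL"}.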
def callback(ch, method, properties, msg):
    logging.debug(" [x] Received %r", msg)
    orig_pipeline_conf = "/etc/ceilometer/pipeline.yaml"
    with open(orig_pipeline_conf, 'r') as fap:
        data = fap.read()
        pipeline_cfg = yaml.safe_load(data)
    logging.debug("Pipeline config: %s", pipeline_cfg)

    try:
        json_msg = json.loads(msg)
        meter = json_msg['sub_info']
        publisher = json_msg['target']
        flag = json_msg['action']
        update_status = []
        if type(meter) is list:
            logging.debug("Meter is a list ... handling each entry")
            for meter_info in meter:
                update_status.append(update_pipeline_yaml(meter_info, publisher, flag))
        else:
            update_status.append(update_pipeline_yaml(meter, publisher, flag))

        # Restart services only if at least one meter update succeeded.
        if reduce(operator.or_, update_status):
            if not restart_ceilometer_services():
                logging.error("Error in restarting ceilometer services")
                return False
    except Exception as e:
        logging.error("Got exception: %s in parsing message", e.__str__())
        return False


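# Apply one ADD/DEL update: rewrite the config in /tmp first, validate it,
# and only then overwrite /etc/ceilometer/pipeline.yaml.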
def update_pipeline_yaml(meter, publisher, flag):
    logging.debug("meter name: %s", meter)
    logging.debug("publisher or target name: %s", publisher)

    orig_pipeline_conf = "/etc/ceilometer/pipeline.yaml"
    ''' Parsing original pipeline yaml file '''
    try:
        with open(orig_pipeline_conf, 'r') as fap:
            data = fap.read()
            pipeline_cfg = yaml.safe_load(data)
        logging.debug("Pipeline config: %s", pipeline_cfg)

        ''' Checking parsing errors '''

        if not check_meter_with_pipeline_cfg(orig_pipeline_conf):
            logging.error("Original pipeline.yaml parsing failed")
            return False
        else:
            status = None
            if flag == "ADD":
                status = utils.update_conf_to_pipe_line_cfg(meter, publisher, pipeline_cfg)
            elif flag == "DEL":
                status = utils.delete_conf_from_pipe_line_cfg(meter, publisher, pipeline_cfg)

            if status == True:
                tmp_pipeline_conf = "/tmp/pipeline.yaml"
                with open(tmp_pipeline_conf, "w") as f:
                    yaml.safe_dump(pipeline_cfg, f, default_flow_style=False)
                if check_meter_with_pipeline_cfg(tmp_pipeline_conf, meter, publisher):
                    logging.debug("Tmp pipeline.yaml parsed successfully, copying it as orig")
                    with open(orig_pipeline_conf, "w") as f:
                        yaml.safe_dump(pipeline_cfg, f, default_flow_style=False)
                    return True
                else:
                    logging.info("Retaining original conf, as updated meter info has errors")
                    return False
            # Always return a bool: callers reduce the results with
            # operator.or_, which raises TypeError on an implicit None.
            return False
    except Exception as e:
        logging.error("* Error in config file: %s", e.__str__())
        return False


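# Connect to RabbitMQ with the credentials from pipeline_agent.conf, bind
# an exclusive queue to the 'pubsub' fanout exchange, and consume forever.
# Note: written against the pre-1.0 pika API (basic_consume(callback, ...),
# no_ack=, exchange type=).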
def msg_queue_listner():

    try:
        config = ConfigParser.ConfigParser()
        config.read('pipeline_agent.conf')
        rabbitmq_username = config.get('RABBITMQ', 'Rabbitmq_username')
        rabbitmq_passwd = config.get('RABBITMQ', 'Rabbitmq_passwd')
        rabbitmq_host = config.get('RABBITMQ', 'Rabbitmq_host')
        rabbitmq_port = int(config.get('RABBITMQ', 'Rabbitmq_port'))
        '''
        log_level = config.get('LOGGING', 'level')
        log_file = config.get('LOGGING', 'filename')

        level = LEVELS.get(log_level, logging.NOTSET)
        logging.basicConfig(filename=log_file, format='%(asctime)s %(filename)s %(levelname)s %(message)s',
                            datefmt=_DEFAULT_LOG_DATE_FORMAT, level=level)
        '''
        logging.config.fileConfig('pipeline_agent.conf', disable_existing_loggers=False)
    except Exception as e:
        logging.error("* Error in config file: %s", e.__str__())
    else:
        logging.debug("*------------------Rabbit MQ Server Info---------")
        logging.debug("rabbitmq_username: %s", rabbitmq_username)
        logging.debug("rabbitmq_passwd: %s", rabbitmq_passwd)
        logging.debug("rabbitmq_host: %s", rabbitmq_host)
        logging.debug("rabbitmq_port: %s", rabbitmq_port)
        credentials = pika.PlainCredentials(rabbitmq_username, rabbitmq_passwd)
        parameters = pika.ConnectionParameters(rabbitmq_host,
                                               rabbitmq_port,
                                               '/',
                                               credentials)
        connection = pika.BlockingConnection(parameters)
        channel = connection.channel()
        #channel.queue_declare(queue='pubsub')
        channel.exchange_declare(exchange='pubsub',
                                 type='fanout')

        result = channel.queue_declare(exclusive=True)
        queue_name = result.method.queue

        channel.queue_bind(exchange='pubsub',
                           queue=queue_name)
        logging.debug("[*] Waiting for messages. To exit press CTRL+C")

        channel.basic_consume(callback,
                              queue=queue_name,
                              no_ack=True)
        channel.start_consuming()


if __name__ == "__main__":
    #logging.debug("* Starting pipeline agent module")
    msg_queue_listner()