# blob: 848ccc062b28a60785a2df434ae5f575c5f41f68 [file] [log] [blame]
# Matteo Scandolo bf14f88 2016-06-02 10:01:34 -0700
import collections
import json
import socket
import thread
import threading
import time
import traceback
import urllib2

import msgpack
import requests
from flask import Flask, Request, jsonify, make_response, request
app = Flask(__name__)

# Module-level caches shared between the notification reader thread, the
# periodic timers and the Flask endpoint.
projects_map = {}            # OpenStack project_id -> autoscale state of that project
xos_tenant_info_map = {}     # controller-slice tenant_id -> {'service': ..., 'slice': ...}
xos_instances_info_map = {}  # XOS instance_uuid -> {'instance_name': ...}

# Receive Ceilometer notifications from Kafka when True, from a UDP socket otherwise.
use_kafka = True

if not use_kafka:
    UDP_IP = "0.0.0.0"
    UDP_PORT = 12346
else:
    import kafka
    from kafka import TopicPartition
26
@app.route('/autoscaledata', methods=['GET'])
def autoscaledata():
    """Serve the state of every tracked project as a JSON list."""
    payload = json.dumps(list(projects_map.values()))
    resp = app.make_response(payload)
    resp.mimetype = "application/json"
    return resp
32
def acquire_xos_monitoring_channel():
    """Fetch this user's XOS monitoring channel and wait until its proxy answers.

    Returns the first monitoring-channel dict listed by XOS once its
    'ceilometer_url' is filled in and reachable, or None when XOS lists no
    monitoring channels for this user.
    """
    url = "http://ctl:9999/xoslib/monitoringchannel/"
    admin_auth = ("padmin@vicci.org", "letmein")  # use your XOS username and password
    monitoring_channels = requests.get(url, auth=admin_auth).json()
    if not monitoring_channels:
        print('SRIKANTH: No monitoring channels for this user...')
        return None

    monitoring_channel = monitoring_channels[0]
    # XOS fills in the URL asynchronously; poll until it is set.
    while not monitoring_channel['ceilometer_url']:
        print('SRIKANTH: Waiting for monitoring channel create')
        time.sleep(0.5)
        monitoring_channel = requests.get(url, auth=admin_auth).json()[0]

    # Poll the proxy until it accepts connections. Any HTTP response, even an
    # error status, proves the server is up, so HTTPError also ends the wait.
    # HTTPError subclasses URLError and must be caught first.
    ceilometer_url = monitoring_channel['ceilometer_url']
    while True:
        print('SRIKANTH: Waiting for ceilometer proxy URL %s is available' % ceilometer_url)
        try:
            response = urllib2.urlopen(ceilometer_url, timeout=1)
        except urllib2.HTTPError as e:
            print('SRIKANTH: HTTP error %s' % e.reason)
            break
        except (urllib2.URLError, socket.timeout) as e:
            print('SRIKANTH: URL error %s' % getattr(e, 'reason', e))
            # A refused connection fails immediately; back off instead of spinning.
            time.sleep(0.5)
        else:
            response.close()
            break
    return monitoring_channel
60
def print_samples():
    """Dump every project's resources and their queued CPU samples to stdout."""
    print("")
    print("")
    for project_id in list(projects_map.keys()):
        project = projects_map[project_id]
        slice_label = project['slice'] or project_id
        print("service=%s slice=%s, alarm_state=%s" % (project['service'], slice_label, project['alarm']))
        resources = project['resources']
        for resource_id in list(resources.keys()):
            resource = resources[resource_id]
            instance_info = resource['xos_instance_info']
            print("resource=%s" % (instance_info['instance_name'] if instance_info else resource_id))
            for entry in resource['queue']:
                print(" time=%s val=%s" % (entry['timestamp'], entry['counter_volume']))
70
def periodic_print(interval=20):
    """Print all collected samples now, then again every `interval` seconds.

    :param interval: seconds between prints (default 20).

    The next run is scheduled in a ``finally`` clause, so an exception raised
    while printing (for example a map modified concurrently by the
    notification reader) does not stop future prints.
    """
    try:
        print_samples()
    finally:
        threading.Timer(interval, periodic_print, args=(interval,)).start()
75
76
# CPU utilisation thresholds (percent) used by the periodic evaluator.
CPU_UPPER_THRESHOLD = 80
CPU_LOWER_THRESHOLD = 30
# Number of evaluations beyond a threshold required before scaling.
CPU_THRESHOLD_REPEAT = 3

# Values stored in projects_map[<project>]['alarm'].
INITIAL_STATE = 'normal_config'
SCALE_UP_EVALUATION = 'scale_up_eval'
SCALE_DOWN_EVALUATION = 'scale_down_eval'
SCALE_UP_ALARM = 'scale_up'
SCALE_DOWN_ALARM = 'scale_down'
85
def loadAllXosTenantInfo():
    """Refresh xos_tenant_info_map from the XOS controller-slice list.

    Each controller slice's tenant_id is mapped to a dict holding the slice
    name and the owning service name (None when the slice has no service).
    """
    print("SRIKANTH: Loading all XOS tenant info")
    xos_auth = ("padmin@vicci.org", "letmein")  # use your XOS username and password
    controller_slices = requests.get("http://ctl:9999/xos/controllerslices/", auth=xos_auth).json()
    for controller_slice in controller_slices:
        xos_slice = requests.get(controller_slice['slice'], auth=xos_auth).json()
        slice_name = xos_slice['humanReadableName']
        service_name = None
        if xos_slice['service']:
            xos_service = requests.get(xos_slice['service'], auth=xos_auth).json()
            service_name = xos_service['humanReadableName']
        tenant_id = controller_slice['tenant_id']
        xos_tenant_info_map[tenant_id] = {'service': service_name, 'slice': slice_name}
        print("SRIKANTH: Project: %s Service:%s Slice:%s" % (tenant_id, service_name, slice_name))
101
def loadAllXosInstanceInfo():
    """Refresh xos_instances_info_map from the XOS instance list, keyed by instance_uuid."""
    print("SRIKANTH: Loading all XOS instance info")
    xos_auth = ("padmin@vicci.org", "letmein")  # use your XOS username and password
    instances = requests.get("http://ctl:9999/xos/instances/", auth=xos_auth).json()
    for inst in instances:
        uuid = inst['instance_uuid']
        xos_instances_info_map[uuid] = {'instance_name': inst['instance_name']}
109
def getXosTenantInfo(project):
    """Return the cached XOS {'service', 'slice'} info for an OpenStack project id.

    On a cache miss the whole map is reloaded from XOS once. Returns None,
    after printing a notice, if the project is still unknown.
    """
    cached = xos_tenant_info_map.get(project)
    if cached:
        return cached
    loadAllXosTenantInfo()
    cached = xos_tenant_info_map.get(project)
    if not cached:
        print("SRIKANTH: Project %s has no associated XOS slice" % project)
    return cached
120
def getXosInstanceInfo(resource):
    """Return the cached XOS {'instance_name'} info for an OpenStack resource id.

    On a cache miss the whole map is reloaded from XOS once. Returns None,
    after printing a notice, if the resource is still unknown.
    """
    xos_instance_info = xos_instances_info_map.get(resource)
    if xos_instance_info:
        return xos_instance_info
    loadAllXosInstanceInfo()
    xos_instance_info = xos_instances_info_map.get(resource)
    if not xos_instance_info:
        # Report the id we failed to resolve; this is the function's own argument.
        print("SRIKANTH: Resource %s has no associated XOS instance" % resource)
    return xos_instance_info
131
def handle_adjust_scale(project, adjust, min_instances=1, max_instances=2):
    """Ask XOS to grow or shrink the slice backing `project` by one instance.

    :param project: OpenStack project id, a key of projects_map.
    :param adjust: 'up' or 'down'; any other value is reported and ignored.
    :param min_instances: refuse to scale down when at or below this count.
    :param max_instances: refuse to scale up when at or above this count.

    The request is sent to XOS's serviceadjustscale endpoint. Network and
    JSON-decoding failures are printed rather than raised, so a transient
    XOS error does not propagate into the periodic evaluator calling this.
    """
    if adjust not in ('up', 'down'):
        print("SRIKANTH: Invalid adjust value %s " % adjust)
        return
    current_instances = len(projects_map[project]['resources'])
    if adjust == 'down' and current_instances <= min_instances:
        print("SRIKANTH: %s is running with already minimum instances and can not scale down further " % project)
        return
    if adjust == 'up' and current_instances >= max_instances:
        print("SRIKANTH: %s is running with already maximum instances and can not scale up further " % project)
        return
    xos_service = projects_map[project]['service']
    xos_slice = projects_map[project]['slice']
    if not xos_service or not xos_slice:
        print("SRIKANTH: Can not handle adjust_scale for Project %s because not associated with any service or slice" % project)
        return
    new_instances = current_instances + 1 if adjust == 'up' else current_instances - 1
    print("SRIKANTH: SCALE %s for Project %s, Slice=%s, Service=%s from current=%d to new=%d" % (adjust, project, xos_slice, xos_service, current_instances, new_instances))
    query_params = {'service': xos_service, 'slice_hint': xos_slice, 'scale': new_instances}
    url = "http://ctl:9999/xoslib/serviceadjustscale/"
    admin_auth = ("padmin@vicci.org", "letmein")  # use your XOS username and password
    try:
        response = requests.get(url, params=query_params, auth=admin_auth).json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers a non-JSON reply body.
        print("SRIKANTH: XOS adjust_scale request failed: %s" % e)
        return
    print("SRIKANTH: XOS adjust_scale response: %s" % response)
155
def _evaluate_project_cpu(project):
    """Advance one project's CPU alarm state machine by one evaluation.

    Averages the newest cpu_util sample of each of the project's resources.
    It then updates the entry's 'alarm', 'uthreadshold_count' and
    'lthreadshold_count'. CPU_THRESHOLD_REPEAT consecutive averages beyond a
    threshold set the matching *_ALARM state and call handle_adjust_scale.
    """
    entry = projects_map[project]
    latest = [resource['queue'][-1]['counter_volume']
              for resource in list(entry['resources'].values())
              if resource['queue']]
    if not latest:
        # Every instance of the project was deleted. There is nothing to
        # average, and dividing by zero would abort the whole round.
        return
    aggregate_cpu_util = float(sum(latest)) / len(latest)

    if aggregate_cpu_util > CPU_UPPER_THRESHOLD:
        # Extend a running scale-up evaluation; any other state starts a new
        # one, so the count only ever reflects consecutive high averages.
        if entry['alarm'] == SCALE_UP_EVALUATION:
            entry['uthreadshold_count'] += 1
        else:
            entry['uthreadshold_count'] = 1
        entry['lthreadshold_count'] = 0
        if entry['uthreadshold_count'] >= CPU_THRESHOLD_REPEAT:
            entry['alarm'] = SCALE_UP_ALARM
            handle_adjust_scale(project, 'up')
        else:
            entry['alarm'] = SCALE_UP_EVALUATION
    elif aggregate_cpu_util < CPU_LOWER_THRESHOLD:
        # Mirror image of the branch above, for scale-down.
        if entry['alarm'] == SCALE_DOWN_EVALUATION:
            entry['lthreadshold_count'] += 1
        else:
            entry['lthreadshold_count'] = 1
        entry['uthreadshold_count'] = 0
        if entry['lthreadshold_count'] >= CPU_THRESHOLD_REPEAT:
            entry['alarm'] = SCALE_DOWN_ALARM
            handle_adjust_scale(project, 'down')
        else:
            entry['alarm'] = SCALE_DOWN_EVALUATION
    else:
        entry['uthreadshold_count'] = 0
        entry['lthreadshold_count'] = 0
        entry['alarm'] = INITIAL_STATE


def periodic_cpu_threshold_evaluator():
    """Evaluate every project's CPU load against the thresholds, then re-arm.

    Runs _evaluate_project_cpu for each project in projects_map and schedules
    itself again 20 seconds later with threading.Timer. A failure for one
    project is printed and does not skip the others. The timer is re-armed
    in ``finally``, so one bad round cannot stop autoscaling for good.
    """
    try:
        for project in list(projects_map.keys()):
            try:
                _evaluate_project_cpu(project)
            except Exception as e:
                print("SRIKANTH: CPU evaluation failed for project %s: %s" % (project, e))
    finally:
        threading.Timer(20, periodic_cpu_threshold_evaluator).start()
206
def read_notification_from_ceilometer_over_kafka(host, port, topic):
    """Consume Ceilometer notifications from partition 0 of a Kafka topic.

    :param host: Kafka bootstrap host.
    :param port: Kafka bootstrap port.
    :param topic: topic to read; only partition 0 is assigned.

    Starts at the current end of the partition, so older messages are
    skipped. Each message value is JSON-decoded and passed to
    process_notification_from_ceilometer. A message that fails to decode or
    process is reported with a traceback and skipped. Only a failure of the
    consumer itself ends the reader.
    """
    print("Kafka target %s %s %s" % (host, port, topic))
    try:
        consumer = kafka.KafkaConsumer(bootstrap_servers=["%s:%s" % (host, port)])
        consumer.assign([TopicPartition(topic, 0)])
        consumer.seek_to_end()
        for message in consumer:
            try:
                process_notification_from_ceilometer(json.loads(message.value))
            except Exception as e:
                print("AUTO_SCALE: failed to process message: %s" % e)
                traceback.print_exc()
    except Exception as e:
        print("AUTO_SCALE Exception: %s" % e)
        traceback.print_exc()
221
def read_notification_from_ceilometer(host, port):
    """Receive msgpack-encoded Ceilometer samples over UDP forever.

    Binds a UDP socket to (host, port). Each datagram of up to 64000 bytes is
    decoded and passed to process_notification_from_ceilometer. Decoding or
    processing errors are printed and the loop continues.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # UDP
    sock.bind((host, port))
    while True:
        payload, _sender = sock.recvfrom(64000)
        try:
            process_notification_from_ceilometer(msgpack.loads(payload, encoding='utf-8'))
        except Exception as err:
            print(err)
233
def process_notification_from_ceilometer(sample):
    """Fold one Ceilometer notification sample into projects_map.

    * 'instance' samples whose resource_metadata event_type contains
      'delete' remove that resource from its project. Other 'instance'
      samples are ignored.
    * 'cpu_util' samples are appended to their resource's queue, which keeps
      only the 10 newest entries. Project and resource entries are created
      on first sight and annotated with XOS slice/service and instance info.
    * Samples of any other meter are ignored.

    :param sample: decoded notification dict. Always reads counter_name,
        project_id and resource_id. Depending on the meter it also reads
        resource_metadata, timestamp, counter_unit and counter_volume.
    """
    counter_name = sample['counter_name']
    project_id = sample['project_id']
    resource_id = sample['resource_id']

    if counter_name == 'instance':
        event_type = (sample.get('resource_metadata') or {}).get('event_type') or ''
        if 'delete' in event_type:
            xos_tenant_info = getXosTenantInfo(project_id)
            xos_resource_info = getXosInstanceInfo(resource_id)
            # getXosTenantInfo returns None for projects without an XOS slice.
            slice_label = (xos_tenant_info or {}).get('slice') or project_id
            print("SRIKANTH: Project %s Instance %s is getting deleted" % (slice_label, xos_resource_info))
            project_entry = projects_map.get(project_id)
            if project_entry is not None:
                project_entry['resources'].pop(resource_id, None)
        # Instance lifecycle samples never enter the CPU sample queues.
        return
    if counter_name != 'cpu_util':
        return

    queued = {'counter_name': counter_name,
              'project_id': project_id,
              'resource_id': resource_id,
              'timestamp': sample['timestamp'],
              'counter_unit': sample['counter_unit'],
              'counter_volume': sample['counter_volume']}

    project_entry = projects_map.get(project_id)
    if project_entry is None:
        xos_tenant_info = getXosTenantInfo(project_id) or {}
        # Build the entry completely before publishing it. The periodic timers
        # and the web endpoint read projects_map from other threads and expect
        # every key below to exist.
        project_entry = {'project_id': project_id,
                         'slice': xos_tenant_info.get('slice'),
                         'service': xos_tenant_info.get('service'),
                         'resources': {},
                         'uthreadshold_count': 0,
                         'lthreadshold_count': 0,
                         'alarm': INITIAL_STATE}
        projects_map[project_id] = project_entry

    resources = project_entry['resources']
    resource_entry = resources.get(resource_id)
    if resource_entry is None:
        resources[resource_id] = {'xos_instance_info': getXosInstanceInfo(resource_id),
                                  'queue': [queued]}
    else:
        # Publish a new list holding the newest 10 samples, rather than
        # mutating the old one, so readers iterating the previous queue are
        # unaffected.
        resource_entry['queue'] = (resource_entry['queue'] + [queued])[-10:]
273
def setup_webserver(host='0.0.0.0', port=9991, debug=True):
    """Run the Flask app (serving /autoscaledata); blocks until the server stops.

    :param host: interface to bind; the default listens on all interfaces.
    :param port: TCP port to listen on.
    :param debug: forwarded to app.run. NOTE(review): Flask's debug mode turns
        on the Werkzeug interactive debugger, which Flask's docs say must not
        be exposed in production. With host='0.0.0.0', consider passing
        debug=False.
    """
    # use_reloader=False keeps debug mode from restarting the process, which
    # would presumably re-run main() and its background threads.
    app.run(host=host, port=port, debug=debug, use_reloader=False)
297
298
def _subscribe_meter(ceilometer_url, meter, app_id, target):
    """Subscribe `app_id` to Ceilometer notifications for `meter`, delivered to `target`.

    POSTs to the proxy's v2/subscribe endpoint and prints the reply. Returns
    True when the reply reports success or an already existing subscription.
    """
    subscribe_data = {"sub_info": meter, "app_id": app_id, "target": target}
    response = requests.post(ceilometer_url + 'v2/subscribe', data=json.dumps(subscribe_data))
    print('SRIKANTH: Ceilometer meter "%s" Subscription status:%s' % (meter, response.text))
    # 'sucess' (sic) is the token this check has always looked for, presumably
    # the server's spelling. The correct spelling is accepted too, in case the
    # server gets fixed.
    return any(token in response.text for token in ('sucess', 'success', 'already exists'))


def main():
    """Wire the autoscaler together and run it; blocks in the web server.

    Steps:
    1. Wait for the XOS monitoring channel.
    2. Preload the XOS tenant and instance caches.
    3. Start the notification reader thread (Kafka or UDP, per use_kafka).
    4. Subscribe to the 'cpu_util' and 'instance' meters.
    5. Start the periodic evaluator and printer.
    6. Serve /autoscaledata.
    """
    monitoring_channel = acquire_xos_monitoring_channel()
    if not monitoring_channel:
        print('SRIKANTH: XOS monitoring_channel is not created... Create it before using this app')
        return
    loadAllXosTenantInfo()
    loadAllXosInstanceInfo()
    ceilometer_url = monitoring_channel['ceilometer_url']
    if use_kafka:
        kafka_host, kafka_port, kafka_topic = "10.11.10.1", "9092", "auto-scale"
        thread.start_new_thread(read_notification_from_ceilometer_over_kafka,
                                (kafka_host, kafka_port, kafka_topic))
        target = "kafka://%s:%s?topic=%s" % (kafka_host, kafka_port, kafka_topic)
    else:
        thread.start_new_thread(read_notification_from_ceilometer, (UDP_IP, UDP_PORT))
        target = "udp://10.11.10.1:%s" % UDP_PORT
    # Both meters go to the reader started above. It is the only consumer,
    # and it also handles the 'instance' deletion events.
    if not _subscribe_meter(ceilometer_url, "cpu_util", "xos_auto_scale", target):
        print('SRIKANTH: Ceilometer meter "cpu_util" Subscription unsuccessful...Exiting')
        return
    if not _subscribe_meter(ceilometer_url, "instance", "xos_auto_scale2", target):
        print('SRIKANTH: Ceilometer meter "instance"Subscription unsuccessful...Exiting')
        return
    periodic_cpu_threshold_evaluator()
    periodic_print()
    setup_webserver()
331
if __name__ == '__main__':
    # Only start the autoscaler when run as a script, not when imported.
    main()