blob: 68ba4cf81f8095062371690ad71fb47c6965866e [file] [log] [blame]
Srikanth Vavilapalli7da4b712015-12-14 00:52:57 -05001import socket
2import requests
3import urllib2
4import json
5import msgpack
6import collections
7import time, thread, threading
8
Srikanth Vavilapalli8d363f12015-12-16 14:55:30 -05009from flask import request, Request, jsonify
10from flask import Flask
11from flask import make_response
# Flask application object; routes in this module are registered on it.
app = Flask(__name__)

# Per-project monitoring state, keyed by the 'project_id' of incoming samples.
projects_map = {}
# Cache filled by loadAllXosTenantInfo(): tenant id -> {'service', 'slice'}.
xos_tenant_info_map = {}
# Cache filled by loadAllXosInstanceInfo(): instance uuid -> {'instance_name'}.
xos_instances_info_map = {}

# Selects the notification transport in main(): Kafka when True, UDP otherwise.
use_kafka = True

# host:port used to build every XOS REST URL in this module.
XOS_ENDPOINT = '130.127.133.58:9999'
# Kafka broker and topic the Kafka reader consumes from.
KAFKA_SERVER_IP = '130.127.133.58'
KAFKA_SERVER_PORT = '9092'
KAFKA_TOPIC = 'auto-scale'
# Target URL sent in the subscribe requests when Kafka is in use.
LOCAL_KAFKA_TARGET_URL = 'kafka://'+KAFKA_SERVER_IP+':'+KAFKA_SERVER_PORT+'?topic='+KAFKA_TOPIC

if use_kafka:
    # kafka is only imported when the Kafka transport is selected.
    import kafka
    from kafka import TopicPartition
else:
    # Address the UDP reader binds to; only defined for the UDP transport.
    UDP_IP = "0.0.0.0"
    UDP_PORT = 12346
Srikanth Vavilapalli7da4b712015-12-14 00:52:57 -050032
@app.route('/autoscaledata', methods=['GET'])
def autoscaledata():
    """Serve the monitoring state of every tracked project as JSON.

    The values of ``projects_map`` are serialised into a JSON array and
    returned with the ``application/json`` mimetype.
    """
    payload = json.dumps(list(projects_map.values()))
    reply = app.make_response(payload)
    reply.mimetype = "application/json"
    return reply
38
def acquire_xos_monitoring_channel():
    """Fetch the first XOS monitoring channel and wait until it is usable.

    The channel list is requested from XOS.  The first channel is polled
    until its ``ceilometer_url`` field is populated, and that URL is then
    probed until it answers (an HTTP error status also counts as an answer).

    Returns:
        The monitoring channel record decoded from the XOS JSON reply, or
        ``None`` when XOS reports no monitoring channels for this user.
    """
    url = "http://" + XOS_ENDPOINT + "/api/tenant/ceilometer/monitoringchannel/"
    admin_auth = ("padmin@vicci.org", "letmein")  # use your XOS username and password
    monitoring_channels = requests.get(url, auth=admin_auth).json()
    if not monitoring_channels:
        print('SRIKANTH: No monitoring channels for this user...')
        return None

    monitoring_channel = monitoring_channels[0]
    while not monitoring_channel['ceilometer_url']:
        print('SRIKANTH: Waiting for monitoring channel create')
        # Only the time module is imported, so sleep must be qualified.
        time.sleep(0.5)
        monitoring_channel = requests.get(url, auth=admin_auth).json()[0]

    # TODO: Wait until URL is completely UP
    while True:
        print('SRIKANTH: Waiting for ceilometer proxy URL %s is available' % monitoring_channel['ceilometer_url'])
        try:
            # Only reachability matters; close the response straight away.
            urllib2.urlopen(monitoring_channel['ceilometer_url'], timeout=1).close()
            break
        except urllib2.HTTPError as e:
            # The server replied, even if with an error status, so stop waiting.
            print('SRIKANTH: HTTP error %s' % e.reason)
            break
        except urllib2.URLError as e:
            print('SRIKANTH: URL error %s' % e.reason)
            # Pause before retrying so a refused connection does not spin.
            time.sleep(0.5)
    return monitoring_channel
66
def print_samples():
    """Print every tracked project, its resources and their queued samples.

    Projects are labelled with their slice name when one is recorded and
    with the project id otherwise.  Resources are labelled with the XOS
    instance name when instance info is recorded and with the resource id
    otherwise.
    """
    print("")
    print("")
    for project_id, entry in list(projects_map.items()):
        service_label = entry['service']
        slice_label = entry['slice'] if entry['slice'] else project_id
        print("service=%s slice=%s, alarm_state=%s" % (service_label, slice_label, entry['alarm']))
        for resource_id, resource_entry in list(entry['resources'].items()):
            instance_info = resource_entry['xos_instance_info']
            if instance_info:
                resource_label = instance_info['instance_name']
            else:
                resource_label = resource_id
            print("resource=%s" % resource_label)
            for queued in resource_entry['queue']:
                print(" time=%s val=%s" % (queued['timestamp'], queued['counter_volume']))
76
def periodic_print():
    """Print the collected samples, then schedule the next print.

    Each call arms a ``threading.Timer`` that invokes this function again
    20 seconds later.
    """
    print_samples()
    # Re-arm: the next dump runs 20 seconds from now.
    next_run = threading.Timer(20, periodic_print)
    next_run.start()
Srikanth Vavilapalli7da4b712015-12-14 00:52:57 -050081
82
# Bounds the aggregate CPU utilisation of a project is compared against in
# periodic_cpu_threshold_evaluator().
CPU_UPPER_THRESHOLD = 80 #80%
CPU_LOWER_THRESHOLD = 30 #30%
# Breach count at which the evaluator raises a scale alarm.
CPU_THRESHOLD_REPEAT = 3
# Values stored under a project's 'alarm' key.
INITIAL_STATE = 'normal_config'
SCALE_UP_EVALUATION = 'scale_up_eval'
SCALE_DOWN_EVALUATION = 'scale_down_eval'
SCALE_UP_ALARM = 'scale_up'
SCALE_DOWN_ALARM = 'scale_down'
91
def loadAllXosTenantInfo():
    """Fill ``xos_tenant_info_map`` from the XOS controller slices.

    For every controller slice returned by XOS the referenced slice is
    fetched and, when the slice names a service, that service as well.  The
    map entry for the controller slice's ``tenant_id`` is set to
    ``{'service': <service name or None>, 'slice': <slice name>}``.
    """
    print("SRIKANTH: Loading all XOS tenant info")
    admin_auth = ("padmin@vicci.org", "letmein")  # use your XOS username and password
    listing_url = "http://" + XOS_ENDPOINT + "/xos/controllerslices/"
    for cslice in requests.get(listing_url, auth=admin_auth).json():
        slice_record = requests.get(cslice['slice'], auth=admin_auth).json()
        slice_name = slice_record['humanReadableName']
        # A slice without a service is recorded with service None.
        service_name = None
        if slice_record['service']:
            service_record = requests.get(slice_record['service'], auth=admin_auth).json()
            service_name = service_record['humanReadableName']
        tenant_info = {'service': service_name, 'slice': slice_name}
        xos_tenant_info_map[cslice['tenant_id']] = tenant_info
        print("SRIKANTH: Project: %s Service:%s Slice:%s" % (cslice['tenant_id'], service_name, slice_name))
107
def loadAllXosInstanceInfo():
    """Fill ``xos_instances_info_map`` from the XOS instance listing.

    Each listed instance is stored under its ``instance_uuid`` as
    ``{'instance_name': <name>}``.
    """
    print("SRIKANTH: Loading all XOS instance info")
    admin_auth = ("padmin@vicci.org", "letmein")  # use your XOS username and password
    instances_url = "http://" + XOS_ENDPOINT + "/xos/instances/"
    reply = requests.get(instances_url, auth=admin_auth)
    for record in reply.json():
        info = {'instance_name': record['instance_name']}
        xos_instances_info_map[record['instance_uuid']] = info
115
def getXosTenantInfo(project):
    """Return the XOS tenant info cached for *project*, reloading on a miss.

    Args:
        project: key looked up in ``xos_tenant_info_map``.

    Returns:
        The cached tenant info, or ``None`` when the project is still
        unknown after one reload of the tenant cache.
    """
    cached = xos_tenant_info_map.get(project)
    if cached:
        return cached
    # Cache miss: refresh from XOS and look once more.
    loadAllXosTenantInfo()
    refreshed = xos_tenant_info_map.get(project)
    if not refreshed:
        print("SRIKANTH: Project %s has no associated XOS slice" % project)
    return refreshed
126
def getXosInstanceInfo(resource):
    """Return the XOS instance info cached for *resource*, reloading on a miss.

    Args:
        resource: key looked up in ``xos_instances_info_map``.

    Returns:
        The cached instance info, or ``None`` when the resource is still
        unknown after one reload of the instance cache.
    """
    xos_instance_info = xos_instances_info_map.get(resource)
    if xos_instance_info:
        return xos_instance_info
    # Cache miss: refresh from XOS and look once more.
    loadAllXosInstanceInfo()
    xos_instance_info = xos_instances_info_map.get(resource)
    if not xos_instance_info:
        # Report the resource that was looked up; this function has no
        # project in scope.
        print("SRIKANTH: Resource %s has no associated XOS instance" % resource)
    return xos_instance_info
Srikanth Vavilapalli7da4b712015-12-14 00:52:57 -0500137
def handle_adjust_scale(project, adjust, min_instances=1, max_instances=2):
    """Ask XOS to scale the service behind *project* up or down by one.

    Args:
        project: key of the project in ``projects_map``.
        adjust: ``'up'`` or ``'down'``; any other value is rejected.
        min_instances: instance count at or below which scale-down is refused.
        max_instances: instance count at or above which scale-up is refused.

    The request is skipped, with a printed reason, when *adjust* is invalid,
    when the instance bound has been reached, or when the project has no
    service or slice recorded.
    """
    if adjust not in ('up', 'down'):
        print("SRIKANTH: Invalid adjust value %s " % adjust)
        return
    project_entry = projects_map[project]
    # The number of tracked resources is taken as the current instance count.
    current_instances = len(project_entry['resources'])
    if adjust == 'down' and current_instances <= min_instances:
        print("SRIKANTH: %s is running with already minimum instances and can not scale down further " % project)
        return
    if adjust == 'up' and current_instances >= max_instances:
        print("SRIKANTH: %s is running with already maximum instances and can not scale up further " % project)
        return
    xos_service = project_entry['service']
    xos_slice = project_entry['slice']
    if not xos_service or not xos_slice:
        print("SRIKANTH: Can not handle adjust_scale for Project %s because not associated with any service or slice" % project)
        return
    # Computed once so the log line and the request always agree.
    target_instances = current_instances + 1 if adjust == 'up' else current_instances - 1
    print("SRIKANTH: SCALE %s for Project %s, Slice=%s, Service=%s from current=%d to new=%d" % (adjust, project, xos_slice, xos_service, current_instances, target_instances))
    query_params = {'service': xos_service, 'slice_hint': xos_slice, 'scale': target_instances}
    url = "http://" + XOS_ENDPOINT + "/xoslib/serviceadjustscale/"
    admin_auth = ("padmin@vicci.org", "letmein")  # use your XOS username and password
    response = requests.get(url, params=query_params, auth=admin_auth).json()
    print("SRIKANTH: XOS adjust_scale response: %s" % response)
161
def _latest_aggregate_cpu_util(project_entry):
    """Return the mean of each resource's newest sample, or None if there is none."""
    latest = [resource['queue'][-1]['counter_volume']
              for resource in project_entry['resources'].values()
              if resource.get('queue')]
    if not latest:
        return None
    # float() avoids integer division when the volumes are ints.
    return float(sum(latest)) / len(latest)


def _advance_alarm_state(project_entry, cpu_util):
    """Update the project's counters and alarm; return 'up', 'down' or None."""
    if cpu_util > CPU_UPPER_THRESHOLD:
        adjust = 'up'
        count_key, other_key = 'uthreadshold_count', 'lthreadshold_count'
        eval_state, alarm_state = SCALE_UP_EVALUATION, SCALE_UP_ALARM
    elif cpu_util < CPU_LOWER_THRESHOLD:
        adjust = 'down'
        count_key, other_key = 'lthreadshold_count', 'uthreadshold_count'
        eval_state, alarm_state = SCALE_DOWN_EVALUATION, SCALE_DOWN_ALARM
    else:
        # Back inside the normal band: forget any breach streak.
        project_entry['uthreadshold_count'] = 0
        project_entry['lthreadshold_count'] = 0
        project_entry['alarm'] = INITIAL_STATE
        return None

    # Count only consecutive breaches in the same direction: continue the
    # streak when already evaluating this direction, otherwise start at 1.
    if project_entry['alarm'] == eval_state:
        project_entry[count_key] += 1
    else:
        project_entry[count_key] = 1
    project_entry[other_key] = 0

    if project_entry[count_key] >= CPU_THRESHOLD_REPEAT:
        project_entry['alarm'] = alarm_state
        return adjust
    project_entry['alarm'] = eval_state
    return None


def periodic_cpu_threshold_evaluator():
    """Evaluate every project's CPU utilisation, then schedule the next run.

    For each project the newest ``counter_volume`` of every resource is
    averaged and compared with ``CPU_UPPER_THRESHOLD`` and
    ``CPU_LOWER_THRESHOLD``.  After ``CPU_THRESHOLD_REPEAT`` consecutive
    breaches in one direction the project's alarm is raised and
    ``handle_adjust_scale`` is called.  Projects with no samples are skipped.

    A failure while evaluating one project is printed and does not prevent
    the other projects from being evaluated or the timer from being re-armed
    (20 seconds).
    """
    for project in list(projects_map.keys()):
        try:
            project_entry = projects_map[project]
            aggregate_cpu_util = _latest_aggregate_cpu_util(project_entry)
            if aggregate_cpu_util is None:
                # No resources or samples left, e.g. all instances deleted.
                continue
            adjust = _advance_alarm_state(project_entry, aggregate_cpu_util)
            if adjust:
                handle_adjust_scale(project, adjust)
        except Exception as e:
            # An uncaught error here would skip the re-arm below and stop
            # all future evaluations.
            print("SRIKANTH: CPU threshold evaluation failed for Project %s: %s" % (project, e))
    threading.Timer(20, periodic_cpu_threshold_evaluator).start()
Srikanth Vavilapalli7da4b712015-12-14 00:52:57 -0500212
def read_notification_from_ceilometer_over_kafka(host, port, topic):
    """Consume notifications from partition 0 of a Kafka topic.

    Args:
        host: Kafka bootstrap server host.
        port: Kafka bootstrap server port.
        topic: topic to read; the consumer starts from the end of partition 0.

    Each message value is decoded as JSON and passed to
    ``process_notification_from_ceilometer``.  A failure on one message is
    printed and consumption continues.  A failure in the consumer itself is
    printed and ends the function.
    """
    print("Kafka target %s %s %s" % (host, port, topic))
    try:
        consumer = kafka.KafkaConsumer(bootstrap_servers=["%s:%s" % (host, port)])
        consumer.assign([TopicPartition(topic, 0)])
        consumer.seek_to_end()
        for message in consumer:
            # Guard each message separately, as the UDP reader does, so one
            # malformed notification cannot stop the consumer.
            try:
                process_notification_from_ceilometer(json.loads(message.value))
            except Exception as e:
                print("AUTO_SCALE Exception: %s" % (e,))
    except Exception as e:
        print("AUTO_SCALE Exception: %s" % (e,))
227
def read_notification_from_ceilometer(host, port):
    """Receive msgpack-encoded notifications over UDP, forever.

    Args:
        host: local address to bind the UDP socket to.
        port: local port to bind the UDP socket to.

    Every datagram is unpacked and passed to
    ``process_notification_from_ceilometer``.  An error raised while
    unpacking or processing a datagram is printed and the loop continues.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # UDP
    listener.bind((host, port))

    while True:
        datagram, _sender = listener.recvfrom(64000)
        try:
            decoded = msgpack.loads(datagram, encoding='utf-8')
            process_notification_from_ceilometer(decoded)
        except Exception as e:
            print(e)
239
def process_notification_from_ceilometer(sample):
    """Fold one notification into ``projects_map``.

    Args:
        sample: notification dict.  ``counter_name`` is always read; the
            other keys are read only for the counters handled here.

    * ``instance`` notifications whose ``resource_metadata['event_type']``
      contains ``'delete'`` remove the resource from its project.  Other
      ``instance`` notifications are ignored.
    * ``cpu_util`` notifications are appended to the resource's queue, which
      keeps the 10 newest samples.  Project and resource entries are created
      on first sight.
    * Every other counter is ignored.
    """
    max_queued_samples = 10
    counter_name = sample['counter_name']

    if counter_name == 'instance':
        metadata = sample['resource_metadata']
        if 'event_type' in metadata and 'delete' in metadata['event_type']:
            project_id = sample['project_id']
            resource_id = sample['resource_id']
            # The tenant lookup returns None for unknown projects; fall back
            # to the project id in that case.
            xosTenantInfo = getXosTenantInfo(project_id) or {}
            xosResourceInfo = getXosInstanceInfo(resource_id)
            print("SRIKANTH: Project %s Instance %s is getting deleted" % (xosTenantInfo.get('slice') or project_id, xosResourceInfo))
            project_entry = projects_map.get(project_id)
            if project_entry is not None:
                project_entry['resources'].pop(resource_id, None)
        return
    if counter_name != 'cpu_util':
        return

    project_id = sample['project_id']
    resource_id = sample['resource_id']

    project_entry = projects_map.get(project_id)
    if project_entry is None:
        # Build the whole entry before publishing it, so a failed lookup
        # cannot leave a half-initialised project in projects_map.
        xosTenantInfo = getXosTenantInfo(project_id) or {}
        project_entry = {
            'project_id': project_id,
            'slice': xosTenantInfo.get('slice'),
            'service': xosTenantInfo.get('service'),
            'resources': {},
            'uthreadshold_count': 0,
            'lthreadshold_count': 0,
            'alarm': INITIAL_STATE,
        }
        projects_map[project_id] = project_entry

    resource_map = project_entry['resources']
    resource_entry = resource_map.get(resource_id)
    if resource_entry is None:
        # Same reasoning: resolve the instance info first, then insert.
        resource_entry = {
            'xos_instance_info': getXosInstanceInfo(resource_id),
            'queue': [],
        }
        resource_map[resource_id] = resource_entry

    # Keep only the fields that are stored for each sample.
    trimmed = {'counter_name': counter_name,
               'project_id': project_id,
               'resource_id': resource_id,
               'timestamp': sample['timestamp'],
               'counter_unit': sample['counter_unit'],
               'counter_volume': sample['counter_volume']}
    # The queue stays a plain list and is replaced, not mutated in place.
    resource_entry['queue'] = (resource_entry['queue'] + [trimmed])[-max_queued_samples:]
Srikanth Vavilapalli7da4b712015-12-14 00:52:57 -0500279
def setup_webserver():
    """Start the Flask web server on 0.0.0.0:9991.

    Host and port are currently hard-coded; the commented-out lines sketch
    reading them from a configuration file.  ``app.run`` is only called when
    the configuration step raised no exception.
    """
    try:
        #config = ConfigParser.ConfigParser()
        #config.read('pub_sub.conf')
        #webserver_host = config.get('WEB_SERVER','webserver_host')
        #webserver_port = int (config.get('WEB_SERVER','webserver_port'))
        #client_host = config.get('CLIENT','client_host')
        #client_port = int (config.get('CLIENT','client_port'))

        #log_level = config.get('LOGGING','level')
        #log_file = config.get('LOGGING','filename')

        #level = LEVELS.get(log_level, logging.NOTSET)
        #logging.basicConfig(filename=log_file,format='%(asctime)s %(levelname)s %(message)s',\
        #            datefmt=_DEFAULT_LOG_DATE_FORMAT,level=level)
        webserver_host = '0.0.0.0'
        webserver_port = 9991

    except Exception as e:
        # logging is not imported at module level; import it here so the
        # error path does not itself fail with a NameError.
        import logging
        print("* Error in config file: %s" % (e,))
        logging.error("* Error in confing file:%s", e.__str__())
    else:
        app.run(host=webserver_host, port=webserver_port, debug=True, use_reloader=False)
303
304
def _subscribe_to_meter(ceilometer_url, meter, app_id, target):
    """POST one meter subscription; return True when it is in place."""
    subscribe_data = {"sub_info": meter, "app_id": app_id, "target": target}
    subscribe_url = ceilometer_url + 'v2/subscribe'
    response = requests.post(subscribe_url, data=json.dumps(subscribe_data))
    print('SRIKANTH: Ceilometer meter "%s" Subscription status:%s' % (meter, response.text))
    #TODO: Fix the typo in 'sucess'
    # NOTE(review): 'sucess' presumably mirrors the spelling in the
    # subscribe reply -- confirm before changing it.
    if ('sucess' not in response.text) and ('already exists' not in response.text):
        print('SRIKANTH: Ceilometer meter "%s" Subscription unsuccessful...Exiting' % meter)
        return False
    return True


def main():
    """Wire up the auto-scaling application and run its web server.

    Steps: acquire the XOS monitoring channel, load the XOS tenant and
    instance caches, start the notification reader thread (Kafka or UDP,
    chosen by ``use_kafka``), subscribe to the ``cpu_util`` and ``instance``
    meters, start the periodic evaluator and printer, and finally run the
    web server.  Returns early when no monitoring channel exists or when a
    subscription is refused.
    """
    monitoring_channel = acquire_xos_monitoring_channel()
    if not monitoring_channel:
        print('SRIKANTH: XOS monitoring_channel is not created... Create it before using this app')
        return
    loadAllXosTenantInfo()
    loadAllXosInstanceInfo()
    ceilometer_url = monitoring_channel['ceilometer_url']

    if use_kafka:
        thread.start_new(read_notification_from_ceilometer_over_kafka, (KAFKA_SERVER_IP, KAFKA_SERVER_PORT, KAFKA_TOPIC,))
        target = LOCAL_KAFKA_TARGET_URL
    else:
        thread.start_new(read_notification_from_ceilometer, (UDP_IP, UDP_PORT,))
        target = "udp://10.11.10.1:12346"

    # Both meters go to the same target; each uses its own app id.
    subscriptions = (("cpu_util", "xos_auto_scale"), ("instance", "xos_auto_scale2"))
    for meter, app_id in subscriptions:
        if not _subscribe_to_meter(ceilometer_url, meter, app_id, target):
            return

    periodic_cpu_threshold_evaluator()
    periodic_print()
    setup_webserver()
Srikanth Vavilapalli7da4b712015-12-14 00:52:57 -0500340
# Start the application only when this file is executed as a script.
if __name__ == "__main__":
    main()