blob: ecbabb925466956cf76ebf4f6b285540dfb20b0a [file] [log] [blame]
Srikanth Vavilapalli7da4b712015-12-14 00:52:57 -05001import socket
2import requests
3import urllib2
4import json
5import msgpack
6import collections
7import time, thread, threading
8
# Shared per-project state, keyed by OpenStack project_id. Each value is a dict
# with 'resources' (resource_id -> deque of recent cpu_util samples),
# 'uthreadshold_count', 'lthreadshold_count' and 'alarm'. It is filled by
# read_notification_from_ceilometer (UDP listener thread) and read by the
# periodic evaluator / printer timers.
projects_map = dict()

# Local address and port the UDP listener binds to for Ceilometer notifications.
UDP_IP, UDP_PORT = "0.0.0.0", 12346
13
def acquire_xos_monitoring_channel():
    """Fetch the user's XOS monitoring channel and wait until it is usable.

    Polls the XOS ``monitoringchannel`` endpoint until the first channel has a
    non-empty ``ceilometer_url``, then polls that URL until it answers HTTP.

    Returns:
        The first monitoring-channel dict returned by XOS, or ``None`` when
        the user has no monitoring channel (initially or while waiting).
    """
    url = "http://ctl:9999/xoslib/monitoringchannel/"
    admin_auth = ("padmin@vicci.org", "letmein")  # use your XOS username and password

    # Wait until XOS has filled in the channel's ceilometer_url. The list is
    # re-checked on every poll so a channel deleted meanwhile ends the wait.
    while True:
        monitoring_channels = requests.get(url, auth=admin_auth).json()
        if not monitoring_channels:
            print('SRIKANTH: No monitoring channels for this user...')
            return None
        monitoring_channel = monitoring_channels[0]
        if monitoring_channel['ceilometer_url']:
            break
        print('SRIKANTH: Waiting for monitoring channel create')
        time.sleep(0.5)

    ceilometer_url = monitoring_channel['ceilometer_url']
    # Wait until the Ceilometer proxy URL actually answers.
    while True:
        print('SRIKANTH: Waiting for ceilometer proxy URL %s is available' % ceilometer_url)
        try:
            response = urllib2.urlopen(ceilometer_url, timeout=1)
        except urllib2.HTTPError as e:
            # HTTPError subclasses URLError, so it is caught first. Any HTTP
            # status, even an error one, proves the proxy is up and answering.
            print('SRIKANTH: HTTP error %s' % (e.reason,))
            break
        except urllib2.URLError as e:
            # e.reason may be a string or an exception object, so it is
            # formatted with a plain %s rather than as a mapping.
            print('SRIKANTH: URL error %s' % (e.reason,))
        except socket.error as e:
            # A timeout while reading the response can surface as
            # socket.timeout (a socket.error) instead of URLError.
            print('SRIKANTH: Socket error %s' % (e,))
        else:
            response.close()
            break
        # Back off before retrying so an unreachable proxy is not hammered.
        time.sleep(0.5)
    return monitoring_channel
41
def print_samples():
    """Dump the buffered cpu_util samples of every project to stdout.

    Prints two blank lines, then each project with its alarm state. Under each
    project it prints every resource, and under each resource the timestamp
    and value of every buffered sample.
    """
    print("")
    print("")
    # Iterate over snapshots: the UDP listener thread keeps adding projects,
    # resources and samples while this runs, and iterating a deque that is
    # appended to concurrently raises RuntimeError.
    for project in list(projects_map):
        project_state = projects_map[project]
        print("project=%s, alarm_state=%s" % (project, project_state['alarm']))
        resources = project_state['resources']
        for resource in list(resources):
            print("resource=%s" % resource)
            for i in list(resources[resource]):
                print(" time=%s val=%s" % (i['timestamp'], i['counter_volume']))
51
def periodic_print():
    """Print the sample buffer now and then every 60 seconds.

    The next run is armed in a ``finally`` so a single failing dump does not
    stop periodic printing. The exception itself still propagates and is
    reported.
    """
    try:
        print_samples()
    finally:
        # Print every 1 minute.
        threading.Timer(60, periodic_print).start()
56
57
# Average cpu_util (percent) above which a project is considered overloaded.
CPU_UPPER_THRESHOLD = 80
# Average cpu_util (percent) below which a project is considered underloaded.
CPU_LOWER_THRESHOLD = 30
# Consecutive evaluation passes past a threshold needed to raise an alarm.
CPU_THRESHOLD_REPEAT = 3

# Alarm states stored in projects_map[project]['alarm'].
INITIAL_STATE = 'normal_config'            # CPU within thresholds
SCALE_UP_EVALUATION = 'scale_up_eval'      # above upper threshold, counting passes
SCALE_DOWN_EVALUATION = 'scale_down_eval'  # below lower threshold, counting passes
SCALE_UP_ALARM = 'scale_up'                # scale-up has just been requested
SCALE_DOWN_ALARM = 'scale_down'            # scale-down has just been requested
66
def getXosTenantInfo(project):
    """Map an OpenStack project id to the names of its XOS service and slice.

    Args:
        project: OpenStack project id. It is compared against each XOS
            controller slice's ``tenant_id``.

    Returns:
        ``{'service': <service name>, 'slice': <slice name>}`` for the first
        matching controller slice, or ``None`` when no controller slice
        belongs to ``project``.
    """
    print("SRIKANTH: Getting XOS info for openstack Project %s" % project)
    url = "http://ctl:9999/xos/controllerslices/"
    admin_auth = ("padmin@vicci.org", "letmein")  # use your XOS username and password
    controller_slices = requests.get(url, auth=admin_auth).json()
    for cslice in controller_slices:
        if cslice['tenant_id'] != project:
            continue
        print("SRIKANTH: Matching controller_slice=%s" % cslice['humanReadableName'])
        # The 'slice' and 'service' fields are fetched directly as URLs.
        xos_slice = requests.get(cslice['slice'], auth=admin_auth).json()
        slice_name = xos_slice['humanReadableName']
        print("SRIKANTH: Matching slice=%s" % slice_name)
        service = requests.get(xos_slice['service'], auth=admin_auth).json()
        service_name = service['humanReadableName']
        print("SRIKANTH: Matching service=%s" % service_name)
        return {'service': service_name, 'slice': slice_name}
    # This file defines no logger; report via print like every other message.
    print("SRIKANTH: Project %(project)s has no associated XOS slice" % {'project': project})
    return None
84
def handle_adjust_scale(project, adjust):
    """Ask XOS to grow or shrink ``project``'s slice by one instance.

    The current instance count is the number of resources recorded for the
    project in ``projects_map``. Requests are refused below 1 or above 2
    instances.

    NOTE(review): nothing in this file ever removes a resource from
    ``projects_map``, so a VM deleted by a scale-down presumably keeps being
    counted here. Confirm before relying on repeated scaling.

    Args:
        project: OpenStack project id (a key of ``projects_map``).
        adjust: ``'up'`` or ``'down'``. Any other value is reported and ignored.
    """
    if adjust not in ('up', 'down'):
        print("SRIKANTH: Invalid adjust value %s " % adjust)
        return
    current_instances = len(projects_map[project]['resources'])
    if current_instances <= 1 and adjust == 'down':
        print("SRIKANTH: %s is running with already minimum instances and can not scale down further " % project)
        return
    if current_instances >= 2 and adjust == 'up':
        print("SRIKANTH: %s is running with already maximum instances and can not scale up further " % project)
        return
    xos_tenant = getXosTenantInfo(project)
    if not xos_tenant:
        print("SRIKANTH: Can not handle adjust_scale for Project %s because not associated with any slice" % project)
        return
    xos_service = xos_tenant['service']
    xos_slice = xos_tenant['slice']
    if not xos_service or not xos_slice:
        print("SRIKANTH: Can not handle adjust_scale for Project %s because not associated with any service or slice" % project)
        return
    new_instances = current_instances + 1 if adjust == 'up' else current_instances - 1
    print("SRIKANTH: SCALE %s for Project %s, Slice=%s, Service=%s from current=%d to new=%d" % (adjust, project, xos_slice, xos_service, current_instances, new_instances))
    query_params = {'service': xos_service, 'slice_hint': xos_slice, 'scale': new_instances}
    url = "http://ctl:9999/xoslib/serviceadjustscale/"
    admin_auth = ("padmin@vicci.org", "letmein")  # use your XOS username and password
    response = requests.get(url, params=query_params, auth=admin_auth)
    try:
        body = response.json()
    except ValueError:
        # Non-JSON reply (e.g. an error page): report the raw text instead of
        # raising into the caller's timer thread.
        body = response.text
    print("SRIKANTH: XOS adjust_scale response: %s" % (body,))
111
def _evaluate_project_cpu(project):
    """Advance one project's alarm state machine from its latest CPU samples.

    The project's load is the mean of the newest ``counter_volume`` of each
    resource that has at least one sample. Every streak (up or down) starts
    from a count of 1, so an alarm needs ``CPU_THRESHOLD_REPEAT`` consecutive
    passes on the same side of the thresholds.
    """
    state = projects_map[project]
    # Snapshot, and skip empty deques: the listener thread may publish a
    # project or resource before its first sample is stored.
    latest = [samples[-1]['counter_volume']
              for samples in list(state['resources'].values()) if samples]
    if not latest:
        return
    aggregate_cpu_util = sum(latest) / len(latest)

    high = aggregate_cpu_util > CPU_UPPER_THRESHOLD
    low = aggregate_cpu_util < CPU_LOWER_THRESHOLD
    alarm = state['alarm']

    if alarm == SCALE_UP_EVALUATION and high:
        state['uthreadshold_count'] += 1
    elif alarm == SCALE_DOWN_EVALUATION and low:
        state['lthreadshold_count'] += 1
    elif high:
        # Start a fresh upward streak. Counters left over from an earlier
        # alarm must not shorten it.
        state['uthreadshold_count'] = 1
        state['lthreadshold_count'] = 0
        state['alarm'] = SCALE_UP_EVALUATION
    elif low:
        state['lthreadshold_count'] = 1
        state['uthreadshold_count'] = 0
        state['alarm'] = SCALE_DOWN_EVALUATION
    else:
        state['uthreadshold_count'] = 0
        state['lthreadshold_count'] = 0
        state['alarm'] = INITIAL_STATE
        return

    if (state['alarm'] == SCALE_UP_EVALUATION and
            state['uthreadshold_count'] >= CPU_THRESHOLD_REPEAT):
        state['alarm'] = SCALE_UP_ALARM
        handle_adjust_scale(project, 'up')
    elif (state['alarm'] == SCALE_DOWN_EVALUATION and
            state['lthreadshold_count'] >= CPU_THRESHOLD_REPEAT):
        state['alarm'] = SCALE_DOWN_ALARM
        handle_adjust_scale(project, 'down')


def periodic_cpu_threshold_evaluator():
    """Evaluate CPU thresholds for every project, then re-arm in 60 seconds.

    The alarm state machine for each project works as follows:

    * Average above CPU_UPPER_THRESHOLD for CPU_THRESHOLD_REPEAT consecutive
      passes moves it to SCALE_UP_ALARM and calls handle_adjust_scale 'up'.
    * Average below CPU_LOWER_THRESHOLD for CPU_THRESHOLD_REPEAT consecutive
      passes moves it to SCALE_DOWN_ALARM and calls handle_adjust_scale 'down'.
    * Anything in between resets it to INITIAL_STATE.

    A failure on one project is reported and does not skip the others or stop
    the periodic schedule.
    """
    try:
        # list() snapshots the keys; the listener thread adds projects.
        for project in list(projects_map):
            try:
                _evaluate_project_cpu(project)
            except Exception as e:
                print("SRIKANTH: CPU threshold evaluation failed for project %s: %s" % (project, e))
    finally:
        threading.Timer(60, periodic_cpu_threshold_evaluator).start()
162
def read_notification_from_ceilometer(host, port):
    """Receive Ceilometer samples over UDP forever and buffer the cpu_util ones.

    Each datagram is decoded with msgpack. Samples whose ``counter_name`` is
    ``cpu_util`` are reduced to a few fields and appended to
    ``projects_map[project_id]['resources'][resource_id]``, a deque keeping
    the 10 most recent samples. Datagrams that fail to decode or lack a field
    are reported and skipped.

    Args:
        host: local address to bind.
        port: local UDP port to bind.
    """
    udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)  # UDP
    try:
        udp.bind((host, port))
        while True:
            data, _ = udp.recvfrom(64000)
            try:
                # NOTE(review): msgpack >= 1.0 removed ``encoding=`` (use
                # ``raw=False``); confirm the deployed msgpack version.
                sample = msgpack.loads(data, encoding='utf-8')
                if sample['counter_name'] != 'cpu_util':
                    continue
                project_id = sample['project_id']
                resource_id = sample['resource_id']
                # Extract every field first, so a malformed sample is
                # rejected before any entry is created for it.
                record = {'counter_name': sample['counter_name'],
                          'project_id': project_id,
                          'resource_id': resource_id,
                          'timestamp': sample['timestamp'],
                          'counter_unit': sample['counter_unit'],
                          'counter_volume': sample['counter_volume']}
                project_state = projects_map.get(project_id)
                if project_state is None:
                    # Build the entry completely before publishing it, so the
                    # timer threads never see a half-initialised project.
                    project_state = {'resources': {},
                                     'uthreadshold_count': 0,
                                     'lthreadshold_count': 0,
                                     'alarm': INITIAL_STATE}
                    projects_map[project_id] = project_state
                resource_map = project_state['resources']
                samples = resource_map.get(resource_id)
                if samples is None:
                    # Publish the deque only once it holds a sample, so
                    # readers never index into an empty deque.
                    resource_map[resource_id] = collections.deque([record], maxlen=10)
                else:
                    samples.append(record)
            except Exception as e:
                # Best effort: one bad datagram must not stop the listener.
                print(e)
    finally:
        udp.close()
192
def main():
    """Start the XOS CPU auto-scaling app.

    Steps:

    1. Wait for the XOS monitoring channel.
    2. Start the UDP listener thread.
    3. Subscribe to cpu_util notifications through the channel's Ceilometer
       proxy.
    4. Start the periodic threshold evaluator and sample printer.

    It returns early, after printing why, when there is no channel or the
    subscription is rejected.
    """
    monitoring_channel = acquire_xos_monitoring_channel()
    if not monitoring_channel:
        print('SRIKANTH: XOS monitoring_channel is not created... Create it before using this app')
        return
    # Listen before subscribing so that early notifications are not lost.
    thread.start_new_thread(read_notification_from_ceilometer, (UDP_IP, UDP_PORT))
    ceilometer_url = monitoring_channel['ceilometer_url']
    # NOTE(review): 10.11.10.1 is hard-coded as this host's address as reached
    # from the Ceilometer proxy; confirm for the deployment.
    subscribe_data = {"sub_info": "cpu_util",
                      "app_id": "xos_auto_scale",
                      "target": "udp://10.11.10.1:%d" % UDP_PORT}
    # Works whether or not ceilometer_url ends with a slash.
    subscribe_url = ceilometer_url.rstrip('/') + '/v2/subscribe'
    response = requests.post(subscribe_url, data=json.dumps(subscribe_data))
    print('SRIKANTH: Ceilometer Subscription status:%s' % response.text)
    # The subscription endpoint appears to report success as 'sucess' (sic).
    # The correct spelling is accepted too, so a server-side fix does not
    # break this check.
    accepted_markers = ('sucess', 'success', 'already exists')
    if not any(marker in response.text for marker in accepted_markers):
        print('SRIKANTH: Ceilometer Subscription unsuccessful...Exiting')
        return
    periodic_cpu_threshold_evaluator()
    periodic_print()


if __name__ == "__main__":
    main()