Ceilometer PUB/SUB support and xos Auto-scaling changes (partial)
diff --git a/xos/configurations/cord/ceilometer_pub_sub.tar.gz b/xos/configurations/cord/ceilometer_pub_sub.tar.gz
new file mode 100644
index 0000000..eb88a2b
--- /dev/null
+++ b/xos/configurations/cord/ceilometer_pub_sub.tar.gz
Binary files differ
diff --git a/xos/configurations/cord/install_ceilometer_patch.sh b/xos/configurations/cord/install_ceilometer_patch.sh
index d6b4056..77aa05b 100755
--- a/xos/configurations/cord/install_ceilometer_patch.sh
+++ b/xos/configurations/cord/install_ceilometer_patch.sh
@@ -3,9 +3,9 @@
exit 0
fi
echo "Verifying if all the required files are present"
-if [ ! -f openstack_ceilometer_patch.tar.gz ];
+if [ ! -f openstack_ceilometer_patch.tar.gz ] || [ ! -f ceilometer_pub_sub.tar.gz ];
then
- echo "File openstack_ceilometer_patch.tar.gz not found"
+ echo "File openstack_ceilometer_patch.tar.gz or ceilometer_pub_sub.tar.gz not found"
exit 1
fi
echo "Copying the ceilometer patch files to /usr/lib/python2.7/dist-packages/ceilometer"
@@ -20,3 +20,7 @@
sudo service ceilometer-agent-notification restart
echo "Restarting ceilometer-agent-central"
sudo service ceilometer-agent-central restart
+tar -xzf ceilometer_pub_sub.tar.gz
+echo "Starting Ceilometer PUB/SUB service"
+cd ceilometer_pub_sub
+python sub_main.py &
diff --git a/xos/core/xoslib/methods/ceilometerview.py b/xos/core/xoslib/methods/ceilometerview.py
index d02e716..5e0ac35 100644
--- a/xos/core/xoslib/methods/ceilometerview.py
+++ b/xos/core/xoslib/methods/ceilometerview.py
@@ -152,6 +152,23 @@
else:
return list(set(a) - set(b))
+def get_resource_map(request, ceilometer_url):
+ resource_map = {}
+ try:
+ resources = resource_list(request, ceilometer_url=ceilometer_url)
+ for r in resources:
+ if 'display_name' in r['metadata']:
+ name = r['metadata']['display_name']
+ elif 'name' in r['metadata']:
+ name = r['metadata']['name']
+ else:
+ name = r['resource_id']
+ resource_map[r['resource_id']] = name
+ except requests.exceptions.RequestException as e:
+ raise e
+
+ return resource_map
+
class Meters(object):
"""Class for listing of available meters.
@@ -163,11 +180,12 @@
"""
- def __init__(self, request=None, ceilometer_meter_list=None, ceilometer_url=None, tenant_map=None):
+ def __init__(self, request=None, ceilometer_meter_list=None, ceilometer_url=None, tenant_map=None, resource_map=None):
# Storing the request.
self._request = request
self.ceilometer_url = ceilometer_url
self.tenant_map = tenant_map
+ self.resource_map = resource_map
# Storing the Ceilometer meter list
if ceilometer_meter_list:
@@ -382,21 +400,23 @@
if meter_info:
label = meter_info["label"]
description = meter_info["description"]
- meter_type = meter_info["type"]
+ meter_category = meter_info["type"]
else:
label = ""
description = ""
- meter_type = "Other"
+ meter_category = "Other"
for meter in meter_candidates:
meter["label"] = label
meter["description"] = description
- meter["type"] = meter_type
+ meter["category"] = meter_category
if meter["project_id"] in self.tenant_map.keys():
meter["slice"] = self.tenant_map[meter["project_id"]]["slice"]
meter["service"] = self.tenant_map[meter["project_id"]]["service"]
else:
meter["slice"] = meter["project_id"]
meter["service"] = "Other"
+ if meter["resource_id"] in self.resource_map.keys():
+ meter["resource_name"] = self.resource_map[meter["resource_id"]]
self._cached_meters[meter_name] = meter_candidates
@@ -929,7 +949,8 @@
if (not tenant_ceilometer_url):
raise XOSMissingField("Tenant ceilometer URL is missing")
tenant_map = getTenantControllerTenantMap(request.user)
- meters = Meters(request, ceilometer_url=tenant_ceilometer_url, tenant_map=tenant_map)
+ resource_map = get_resource_map(request, ceilometer_url=tenant_ceilometer_url)
+ meters = Meters(request, ceilometer_url=tenant_ceilometer_url, tenant_map=tenant_map, resource_map=resource_map)
services = {
_('Nova'): meters.list_nova(),
_('Neutron'): meters.list_neutron(),
@@ -997,7 +1018,8 @@
return Response(row)
#Statistics query for all meter
- meters = Meters(request, ceilometer_url=tenant_ceilometer_url, tenant_map=tenant_map)
+ resource_map = get_resource_map(request, ceilometer_url=tenant_ceilometer_url)
+ meters = Meters(request, ceilometer_url=tenant_ceilometer_url, tenant_map=tenant_map, resource_map=resource_map)
services = {
_('Nova'): meters.list_nova(),
_('Neutron'): meters.list_neutron(),
@@ -1017,11 +1039,13 @@
statistic = statistics[-1]
row = {"name": 'none',
"slice": meter["slice"],
+ "project_id": meter["project_id"],
"service": meter["service"],
"resource_id": meter["resource_id"],
+ "resource_name": meter["resource_name"],
"meter": meter["name"],
"description": meter["description"],
- "type": service,
+ "category": service,
"time": statistic["period_end"],
"value": statistic["avg"],
"unit": meter["unit"]}
@@ -1055,3 +1079,23 @@
samples = sample_list(request, meter_name,
ceilometer_url=tenant_ceilometer_url, query=query, limit=limit)
return Response(samples)
+
+class ServiceAdjustScale(APIView):
+ method_kind = "list"
+ method_name = "serviceadjustscale"
+
+ def get(self, request, format=None):
+ if (not request.user.is_authenticated()) or (not request.user.is_admin()):
+ raise PermissionDenied("You must be authenticated admin user in order to use this API")
+ service = request.QUERY_PARAMS.get('service', None)
+ slice_hint = request.QUERY_PARAMS.get('slice_hint', None)
+ scale = request.QUERY_PARAMS.get('scale', None)
+ if not service or not slice_hint or not scale:
+ raise XOSMissingField("Mandatory fields missing")
+ services = Service.select_by_user(request.user)
+ logger.info('SRIKANTH: Services for this user %(services)s' % {'services':services})
+ if not services or (not services.get(name=service)):
+ raise XOSMissingField("Service not found")
+ service = services.get(name=service)
+ service.adjust_scale(slice_hint, scale)
+ return Response("Success")
diff --git a/xos/observers/monitoring_channel/templates/ceilometer_proxy_server.py b/xos/observers/monitoring_channel/templates/ceilometer_proxy_server.py
index 9329cfa..62f0804 100644
--- a/xos/observers/monitoring_channel/templates/ceilometer_proxy_server.py
+++ b/xos/observers/monitoring_channel/templates/ceilometer_proxy_server.py
@@ -5,6 +5,8 @@
import json
from ceilometerclient import client
import logging
+import urllib
+import urllib2
from wsgilog import WsgiLog
web.config.debug=False
@@ -49,6 +51,7 @@
r'^/v2/meters/(?P<meter_name>[A-Za-z0-9_:.\-]+)/statistics$', 'statistics_list',
r'^/v2/samples$', 'sample_list',
r'^/v2/resources$', 'resource_list',
+ r'^/v2/subscribe$', 'pubsub_handler',
)
app = web.application(urls, globals())
@@ -245,5 +248,36 @@
resources.extend(results)
return json.dumps([ob._info for ob in resources])
+class pubsub_handler:
+ def POST(self):
+ global config
+ parse_ceilometer_proxy_config()
+ data_str = unicode(web.data(),'iso-8859-1')
+ post_data = json.loads(data_str)
+ final_query=[]
+ for (k,v) in config.items('allowed_tenants'):
+ query = make_query(tenant_id=k)
+ final_query.extend(query)
+ if not final_query:
+ raise Exception("Not allowed to subscribe to any meters")
+ post_data["query"] = final_query
+ #TODO: The PUB/SUB url needs to be read from config
+ put_request = urllib2.Request("http://10.11.10.1:4455/subscribe", json.dumps(post_data))
+ put_request.get_method = lambda: 'SUB'
+ put_request.add_header('Content-Type', 'application/json')
+ response = urllib2.urlopen(put_request)
+ response_text = response.read()
+ return json.dumps(response_text)
+
+ def DELETE(self):
+ data_str = web.data()
+ #TODO: The PUB/SUB url needs to be read from config
+ put_request = urllib2.Request("http://10.11.10.1:4455/unsubscribe", data_str)
+ put_request.get_method = lambda: 'UNSUB'
+ put_request.add_header('Content-Type', 'application/json')
+ response = urllib2.urlopen(put_request)
+ response_text = response.read()
+ return json.dumps(response_text)
+
if __name__ == "__main__":
app.run(FileLog)
diff --git a/xos/observers/monitoring_channel/templates/ceilometer_pub_sub_consumer.py b/xos/observers/monitoring_channel/templates/ceilometer_pub_sub_consumer.py
new file mode 100644
index 0000000..ecbabb9
--- /dev/null
+++ b/xos/observers/monitoring_channel/templates/ceilometer_pub_sub_consumer.py
@@ -0,0 +1,212 @@
+import socket
+import requests
+import urllib2
+import json
+import msgpack
+import collections
+import time, thread, threading
+
+projects_map = {}
+
+UDP_IP = "0.0.0.0"
+UDP_PORT = 12346
+
+def acquire_xos_monitoring_channel():
+ url = "http://ctl:9999/xoslib/monitoringchannel/"
+ admin_auth=("padmin@vicci.org", "letmein") # use your XOS username and password
+ monitoring_channels = requests.get(url, auth=admin_auth).json()
+ ceilometer_url = None
+ if not monitoring_channels:
+ print 'SRIKANTH: No monitoring channels for this user...'
+ return None
+ else:
+ monitoring_channel = monitoring_channels[0]
+ while not monitoring_channel['ceilometer_url']:
+ print 'SRIKANTH: Waiting for monitoring channel create'
+ sleep(0.5)
+ monitoring_channel = requests.get(url, auth=admin_auth).json()[0]
+ #TODO: Wait until URL is completely UP
+ while True:
+ print 'SRIKANTH: Waiting for ceilometer proxy URL %s is available' % monitoring_channel['ceilometer_url']
+ try:
+ response = urllib2.urlopen(monitoring_channel['ceilometer_url'],timeout=1)
+ break
+ except urllib2.HTTPError, e:
+ print 'SRIKANTH: HTTP error %s' % e.reason
+ break
+ except urllib2.URLError, e:
+ print 'SRIKANTH: URL error %(reason)s' % e.reason
+ pass
+ return monitoring_channel
+
+def print_samples():
+ print ""
+ print ""
+ for project in projects_map.keys():
+ print "project=%s, alarm_state=%s" % (project, projects_map[project]['alarm'])
+ for resource in projects_map[project]['resources'].keys():
+ print "resource=%s" % resource
+ for i in projects_map[project]['resources'][resource]:
+ print " time=%s val=%s" % ( i['timestamp'],i['counter_volume'])
+
+def periodic_print():
+ print_samples()
+ #Print every 1minute
+ threading.Timer(60, periodic_print).start()
+
+
+CPU_UPPER_THRESHOLD = 80 #80%
+CPU_LOWER_THRESHOLD = 30 #30%
+CPU_THRESHOLD_REPEAT = 3
+INITIAL_STATE = 'normal_config'
+SCALE_UP_EVALUATION = 'scale_up_eval'
+SCALE_DOWN_EVALUATION = 'scale_down_eval'
+SCALE_UP_ALARM = 'scale_up'
+SCALE_DOWN_ALARM = 'scale_down'
+
+def getXosTenantInfo(project):
+ print "SRIKANTH: Getting XOS info for openstack Project %s" % project
+ url = "http://ctl:9999/xos/controllerslices/"
+ admin_auth=("padmin@vicci.org", "letmein") # use your XOS username and password
+ controller_slices = requests.get(url, auth=admin_auth).json()
+ for cslice in controller_slices:
+ if cslice['tenant_id'] == project:
+ print "SRIKANTH: Matching controller_slice=%s" % cslice['humanReadableName']
+ slice = requests.get(cslice['slice'], auth=admin_auth).json()
+ slice_name = slice['humanReadableName']
+ print "SRIKANTH: Matching slice=%s" % slice_name
+ service = requests.get(slice['service'], auth=admin_auth).json()
+ service_name = service['humanReadableName']
+ print "SRIKANTH: Matching service=%s" % service_name
+ return {'service':service_name, 'slice':slice_name}
+ logger.warn("SRIKANTH: Project %(project)s has no associated XOS slice" % {'project':project})
+ return None
+
+def handle_adjust_scale(project, adjust):
+ if (adjust != 'up') and (adjust != 'down'):
+ print "SRIKANTH: Invalid adjust value %s " % adjust
+ return
+ current_instances = len(projects_map[project]['resources'].keys())
+ if (current_instances <=1 and adjust == 'down'):
+ print "SRIKANTH: %s is running with already minimum instances and can not scale down further " % project
+ return
+ if (current_instances >=2 and adjust == 'up'):
+ print "SRIKANTH: %s is running with already maximum instances and can not scale up further " % project
+ return
+ xos_tenant = getXosTenantInfo(project)
+ if not xos_tenant:
+ print "SRIKANTH: Can not handle adjust_scale for Project %s because not associated with any slice" % project
+ return
+ xos_service = xos_tenant['service']
+ xos_slice = xos_tenant['slice']
+ if not xos_service or not xos_slice:
+ print "SRIKANTH: Can not handle adjust_scale for Project %s because not associated with any service or slice" % project
+ return
+ print "SRIKANTH: SCALE %s for Project %s, Slice=%s, Service=%s from current=%d to new=%d" % (adjust, project, xos_slice, xos_service, current_instances, current_instances+1 if (adjust=='up') else current_instances-1)
+ query_params = {'service':xos_service, 'slice_hint':xos_slice, 'scale':current_instances+1 if (adjust=='up') else current_instances-1}
+ url = "http://ctl:9999/xoslib/serviceadjustscale/"
+ admin_auth=("padmin@vicci.org", "letmein") # use your XOS username and password
+ response = requests.get(url, params=query_params, auth=admin_auth).json()
+ print "SRIKANTH: XOS adjust_scale response: %s" % response
+
+def periodic_cpu_threshold_evaluator():
+ for project in projects_map.keys():
+ aggregate_cpu_util = sum([resource_queue[-1]['counter_volume'] \
+ for resource_queue in projects_map[project]['resources'].values()]) \
+ /len(projects_map[project]['resources'].keys())
+
+ if (projects_map[project]['alarm'] == INITIAL_STATE or
+ projects_map[project]['alarm'] == SCALE_UP_ALARM or
+ projects_map[project]['alarm'] == SCALE_DOWN_ALARM):
+ if aggregate_cpu_util > CPU_UPPER_THRESHOLD:
+ projects_map[project]['uthreadshold_count'] = 1
+ projects_map[project]['alarm'] = SCALE_UP_EVALUATION
+ if projects_map[project]['uthreadshold_count'] >= CPU_THRESHOLD_REPEAT:
+ projects_map[project]['alarm'] = SCALE_UP_ALARM
+ handle_adjust_scale(project, 'up')
+ elif aggregate_cpu_util < CPU_LOWER_THRESHOLD:
+ projects_map[project]['lthreadshold_count'] = 1
+ projects_map[project]['alarm'] = SCALE_DOWN_EVALUATION
+ if projects_map[project]['lthreadshold_count'] >= CPU_THRESHOLD_REPEAT:
+ projects_map[project]['alarm'] = SCALE_DOWN_ALARM
+ handle_adjust_scale(project, 'down')
+ else:
+ projects_map[project]['uthreadshold_count'] = 0
+ projects_map[project]['lthreadshold_count'] = 0
+ projects_map[project]['alarm'] = INITIAL_STATE
+ elif projects_map[project]['alarm'] == SCALE_UP_EVALUATION:
+ if aggregate_cpu_util > CPU_UPPER_THRESHOLD:
+ projects_map[project]['uthreadshold_count'] += 1
+ if projects_map[project]['uthreadshold_count'] >= CPU_THRESHOLD_REPEAT:
+ projects_map[project]['alarm'] = SCALE_UP_ALARM
+ handle_adjust_scale(project, 'up')
+ elif aggregate_cpu_util < CPU_LOWER_THRESHOLD:
+ projects_map[project]['lthreadshold_count'] += 1
+ projects_map[project]['alarm'] = SCALE_DOWN_EVALUATION
+ else:
+ projects_map[project]['uthreadshold_count'] = 0
+ projects_map[project]['alarm'] = INITIAL_STATE
+ elif projects_map[project]['alarm'] == SCALE_DOWN_EVALUATION:
+ if aggregate_cpu_util < CPU_LOWER_THRESHOLD:
+ projects_map[project]['lthreadshold_count'] += 1
+ if projects_map[project]['lthreadshold_count'] >= CPU_THRESHOLD_REPEAT:
+ projects_map[project]['alarm'] = SCALE_DOWN_ALARM
+ handle_adjust_scale(project, 'down')
+ elif aggregate_cpu_util > CPU_UPPER_THRESHOLD:
+ projects_map[project]['uthreadshold_count'] += 1
+ projects_map[project]['alarm'] = SCALE_UP_EVALUATION
+ else:
+ projects_map[project]['lthreadshold_count'] = 0
+ projects_map[project]['alarm'] = INITIAL_STATE
+ threading.Timer(60, periodic_cpu_threshold_evaluator).start()
+
+def read_notification_from_ceilometer(host,port):
+ udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
+ udp.bind((host, port))
+
+ while True:
+ data, source = udp.recvfrom(64000)
+ try:
+ sample = msgpack.loads(data, encoding='utf-8')
+ if sample['counter_name'] != 'cpu_util':
+ continue
+ if sample['project_id'] not in projects_map.keys():
+ projects_map[sample['project_id']] = {}
+ projects_map[sample['project_id']]['resources'] = {}
+ projects_map[sample['project_id']]['uthreadshold_count'] = 0
+ projects_map[sample['project_id']]['lthreadshold_count'] = 0
+ projects_map[sample['project_id']]['alarm'] = INITIAL_STATE
+ resource_map = projects_map[sample['project_id']]['resources']
+ if sample['resource_id'] not in resource_map.keys():
+ resource_map[sample['resource_id']] = collections.deque(maxlen=10)
+ samples_map = resource_map[sample['resource_id']]
+ sample = {'counter_name':sample['counter_name'],
+ 'project_id':sample['project_id'],
+ 'resource_id':sample['resource_id'],
+ 'timestamp':sample['timestamp'],
+ 'counter_unit':sample['counter_unit'],
+ 'counter_volume':sample['counter_volume']}
+ samples_map.append(sample)
+ except Exception as e:
+ print e
+
+def main():
+ monitoring_channel = acquire_xos_monitoring_channel()
+ if not monitoring_channel:
+ print 'SRIKANTH: XOS monitoring_channel is not created... Create it before using this app'
+ return
+ thread.start_new(read_notification_from_ceilometer,(UDP_IP,UDP_PORT,))
+ ceilometer_url = monitoring_channel['ceilometer_url']
+ subscribe_data = {"sub_info":"cpu_util","app_id":"xos_auto_scale","target":"udp://10.11.10.1:12346"}
+ subscribe_url = ceilometer_url + 'v2/subscribe'
+ response = requests.post(subscribe_url, data=json.dumps(subscribe_data))
+ print 'SRIKANTH: Ceilometer Subscription status:%s' % response.text
+ #TODO: Fix the typo in 'sucess'
+ if (not 'sucess' in response.text) and (not 'already exists' in response.text):
+ print 'SRIKANTH: Ceilometer Subscription unsuccessful...Exiting'
+ return
+ periodic_cpu_threshold_evaluator()
+ periodic_print()
+
+if __name__ == "__main__":
+ main()