# blob: 59ee92686a703b5860e38de1d9ed906b2f3ccc80 [file] [log] [blame]
import sys, getopt
import socket
import requests
import urllib2
import json
import msgpack
import collections
import time, thread, threading
from flask import request, Request, jsonify
from flask import Flask
from flask import make_response
# Flask application that exposes the collected auto-scale data over HTTP.
app = Flask(__name__)

# project id -> per-project state (slice, service, resources, counters, alarm).
projects_map = {}
# tenant (project) id -> {'service': ..., 'slice': ...} loaded from XOS.
xos_tenant_info_map = {}
# instance uuid -> {'instance_name': ...} loaded from XOS.
xos_instances_info_map = {}

#gethostname() -- return the current hostname
#gethostbyname() -- map a hostname to its IP number
hostname = None
# Both addresses stay unset until parse_args() reads the command line.
xos_ip = None
kafka_ip = xos_ip
xos_port = 9000
# Names of the services that may be scaled; replaced by --supported-services.
# Defined here so the functions that declare it ``global`` never hit a
# NameError when the option is omitted.
supported_services = []

print("----------------------------------------------------------------------")
print("xos_ip: %s" % (xos_ip,))
print("kfka_ip: %s" % (kafka_ip,))
print("----------------------------------------------------------------------")

# Selects the transport used to receive ceilometer notifications.
use_kafka = True
if use_kafka:
    import kafka
    import kafka_broker
    from oslo_utils import netutils
    #from kafka import TopicPartition
else:
    # Local address/port of the UDP listener used when Kafka is disabled.
    UDP_IP = "0.0.0.0"
    UDP_PORT = 12346
@app.route('/autoscaledata',methods=['GET'])
def autoscaledata():
    """Return the state of every tracked project as a JSON array."""
    payload = json.dumps(list(projects_map.values()))
    reply = app.make_response(payload)
    reply.mimetype = "application/json"
    return reply
def acquire_xos_monitoring_channel():
    """Fetch the first XOS monitoring channel and wait until it is usable.

    Polls XOS until the channel reports a ``ceilometer_url``, then probes
    that URL until the proxy answers (an HTTP error response also counts as
    "reachable").

    Returns:
        The monitoring channel dict, or None when XOS lists no channels.
    """
    global xos_ip
    url = "http://" + xos_ip + ":" + str(xos_port) +"/api/tenant/monitoring/monitoringchannel/"
    print(url)
    admin_auth=("padmin@vicci.org", "letmein") # use your XOS username and password
    monitoring_channels = requests.get(url, auth=admin_auth).json()
    if not monitoring_channels:
        print('No monitoring channels for this user...')
        return None

    monitoring_channel = monitoring_channels[0]
    while not monitoring_channel['ceilometer_url']:
        print('Waiting for monitoring channel create')
        # The original called a bare sleep(), which is not defined in this
        # module; only ``time`` is imported.
        time.sleep(0.5)
        monitoring_channel = requests.get(url, auth=admin_auth).json()[0]

    while True:
        print('Waiting for ceilometer proxy URL %s is available' % monitoring_channel['ceilometer_url'])
        try:
            response = urllib2.urlopen(monitoring_channel['ceilometer_url'],timeout=1)
        except urllib2.HTTPError as e:
            # The server answered, so the proxy is up even if it disliked
            # the request. HTTPError must be caught before URLError because
            # it is a subclass of it.
            print('HTTP error %s' % e.reason)
            break
        except urllib2.URLError as e:
            print('URL error %s' % e.reason)
            # Pause before retrying so a refused connection does not turn
            # this loop into a busy spin.
            time.sleep(0.5)
        else:
            # Only reachability matters; release the connection.
            response.close()
            break
    return monitoring_channel
def print_samples():
    """Print the buffered CPU samples of every supported project to stdout."""
    global supported_services
    print("")
    print("")
    for project, info in list(projects_map.items()):
        xos_service = info['service']
        if not (xos_service and xos_service in supported_services):
            print("Skipped scale evaluation for project=%s and service =%s" % (project, xos_service))
            continue
        # Fall back to the raw project id when no slice name is known.
        slice_label = info['slice'] or project
        print("service=%s slice=%s, alarm_state=%s lcount=%s ucount=%s" % (info['service'], slice_label, info['alarm'], info['lthreadshold_count'], info['uthreadshold_count']))
        for resource, details in list(info['resources'].items()):
            instance_info = details['xos_instance_info']
            # Prefer the XOS instance name; otherwise show the resource id.
            print("resource=%s" % (instance_info['instance_name'] if instance_info else resource))
            for entry in details['queue']:
                print(" time=%s val=%s" % (entry['timestamp'], entry['counter_volume']))
def periodic_print():
    """Print the current samples, then schedule the next dump."""
    print_samples()
    # Re-arm: run this function again 20 seconds from now.
    next_run = threading.Timer(20, periodic_print)
    next_run.start()
# Aggregate CPU utilisation above which the evaluator starts counting towards
# a scale-up.
CPU_UPPER_THRESHOLD = 80 #80%
# Aggregate CPU utilisation below which the evaluator starts counting towards
# a scale-down.
CPU_LOWER_THRESHOLD = 30 #30%
# Threshold-breach count at which the evaluator raises the scale alarm.
CPU_THRESHOLD_REPEAT = 3
# Values stored in a project's 'alarm' field by the threshold evaluator.
INITIAL_STATE = 'normal_config'
SCALE_UP_EVALUATION = 'scale_up_eval'
SCALE_DOWN_EVALUATION = 'scale_down_eval'
SCALE_UP_ALARM = 'scale_up'
SCALE_DOWN_ALARM = 'scale_down'
def loadAllXosTenantInfo():
    """Refresh xos_tenant_info_map from the XOS controller-slice listing.

    For every controller slice the referenced slice (and its service, when
    one is set) is fetched and stored under the slice's tenant id.
    """
    global xos_ip
    print("Loading all XOS tenant info")
    admin_auth = ("padmin@vicci.org", "letmein") # use your XOS username and password
    url = "".join(["http://", xos_ip, ":", str(xos_port), "/xos/controllerslices/"])
    for cslice in requests.get(url, auth=admin_auth).json():
        slice_info = requests.get(cslice['slice'], auth=admin_auth).json()
        slice_name = slice_info['humanReadableName']
        # A slice is not required to belong to a service.
        service_name = None
        if slice_info['service']:
            service_info = requests.get(slice_info['service'], auth=admin_auth).json()
            service_name = service_info['humanReadableName']
        tenant_id = cslice['tenant_id']
        xos_tenant_info_map[tenant_id] = {'service':service_name, 'slice':slice_name}
        print("SRIKANTH: Project: %s Service:%s Slice:%s" % (tenant_id,service_name,slice_name))
def loadAllXosInstanceInfo():
    """Refresh xos_instances_info_map from the XOS instance listing."""
    global xos_ip
    print("Loading all XOS instance info")
    admin_auth = ("padmin@vicci.org", "letmein") # use your XOS username and password
    url = "".join(["http://", xos_ip, ":", str(xos_port), "/xos/instances/"])
    listing = requests.get(url, auth=admin_auth).json()
    for entry in listing:
        # Only the instance name is kept, keyed by the instance uuid.
        name_record = {'instance_name':entry['instance_name']}
        xos_instances_info_map[entry['instance_uuid']] = name_record
def getXosTenantInfo(project):
    """Return the cached XOS tenant info for *project*, reloading once on a miss.

    Returns None (after printing a notice) when XOS still has no entry for
    the project after the reload.
    """
    cached = xos_tenant_info_map.get(project)
    if cached:
        return cached
    # Unknown project: refresh the whole map from XOS and look again.
    loadAllXosTenantInfo()
    refreshed = xos_tenant_info_map.get(project)
    if not refreshed:
        print("Project %s has no associated XOS slice" % project)
    return refreshed
def getXosInstanceInfo(resource):
    """Return the cached XOS instance info for *resource*, reloading once on a miss.

    Returns None (after printing a notice) when XOS still has no entry for
    the resource after the reload.
    """
    cached = xos_instances_info_map.get(resource)
    if cached:
        return cached
    # Unknown resource: refresh the whole map from XOS and look again.
    loadAllXosInstanceInfo()
    refreshed = xos_instances_info_map.get(resource)
    if not refreshed:
        print("Resource %s has no associated XOS instance" % resource)
    return refreshed
def handle_adjust_scale(project, adjust, min_instances=1, max_instances=2):
    """Ask XOS to scale the service behind *project* up or down by one instance.

    Args:
        project: Project id; must be a key of ``projects_map``.
        adjust: Either ``'up'`` or ``'down'``; anything else is rejected.
        min_instances: Scale-down is refused at or below this instance count.
            Defaults to 1, the limit previously hard-coded here.
        max_instances: Scale-up is refused at or above this instance count.
            Defaults to 2, the limit previously hard-coded here.

    Returns:
        None. Progress and refusals are reported on stdout.
    """
    global xos_ip, supported_services
    if adjust not in ('up', 'down'):
        print("SRIKANTH: Invalid adjust value %s " % adjust)
        return
    xos_service = projects_map[project]['service']
    xos_slice = projects_map[project]['slice']
    if not xos_service or not xos_slice:
        print("Can not handle adjust_scale for Project %s because not associated with any service or slice" % project)
        return
    if xos_service not in supported_services:
        print("SRIKANTH: Scaling is not supported for this service %s...Ignoring adjust_scale request" % xos_service)
        return
    # The number of tracked resources stands in for the running instance count.
    current_instances = len(projects_map[project]['resources'])
    if adjust == 'down' and current_instances <= min_instances:
        print("%s is running with already minimum instances and can not scale down further " % xos_service)
        return
    if adjust == 'up' and current_instances >= max_instances:
        print("%s is running with already maximum instances and can not scale up further " % xos_service)
        return
    # Computed once so the log line and the request cannot disagree.
    new_instances = current_instances + 1 if adjust == 'up' else current_instances - 1
    print("SCALE %s for Project %s, Slice=%s, Service=%s from current=%d to new=%d" % (adjust, project, xos_slice, xos_service, current_instances, new_instances))
    query_params = {'service':xos_service, 'slice_hint':xos_slice, 'scale':new_instances}
    url = "http://" + xos_ip + ":" + str(xos_port) + "/api/tenant/monitoring/dashboard/serviceadjustscale/"
    admin_auth=("padmin@vicci.org", "letmein") # use your XOS username and password
    response = requests.get(url, params=query_params, auth=admin_auth).json()
    print("SRIKANTH: XOS adjust_scale response: %s" % (response,))
def _latest_average_cpu_util(resources):
    """Return the mean of each resource's newest cpu_util sample, or None if there are none."""
    latest = [res['queue'][-1]['counter_volume']
              for res in resources.values() if res.get('queue')]
    if not latest:
        return None
    # float() keeps Python 2 from truncating when the volumes are integers.
    return float(sum(latest)) / len(latest)


def _evaluate_project_cpu(project):
    """Advance one project's alarm state from its latest average CPU utilisation."""
    state = projects_map[project]
    aggregate_cpu_util = _latest_average_cpu_util(state['resources'])
    if aggregate_cpu_util is None:
        # No samples to judge (for example every instance was deleted).
        # Dividing by the empty resource count used to raise
        # ZeroDivisionError here.
        return

    if aggregate_cpu_util > CPU_UPPER_THRESHOLD:
        direction = 'up'
        counter, opposite = 'uthreadshold_count', 'lthreadshold_count'
        evaluating, alarm = SCALE_UP_EVALUATION, SCALE_UP_ALARM
    elif aggregate_cpu_util < CPU_LOWER_THRESHOLD:
        direction = 'down'
        counter, opposite = 'lthreadshold_count', 'uthreadshold_count'
        evaluating, alarm = SCALE_DOWN_EVALUATION, SCALE_DOWN_ALARM
    else:
        # Back inside the normal band: forget any breaches seen so far.
        state['uthreadshold_count'] = 0
        state['lthreadshold_count'] = 0
        state['alarm'] = INITIAL_STATE
        return

    # Only consecutive breaches in the same direction accumulate. Any other
    # starting state restarts the count at 1, and the opposite counter is
    # cleared so stale values cannot trigger an early alarm when the load
    # oscillates between the two thresholds.
    if state['alarm'] == evaluating:
        state[counter] += 1
    else:
        state[counter] = 1
    state[opposite] = 0
    state['alarm'] = evaluating
    if state[counter] >= CPU_THRESHOLD_REPEAT:
        state['alarm'] = alarm
        handle_adjust_scale(project, direction)


def periodic_cpu_threshold_evaluator():
    """Evaluate CPU thresholds for every supported project, then re-arm a 20 s timer.

    Projects without a service, or whose service is not listed in
    ``supported_services``, are skipped. A failure while evaluating one
    project is reported and does not stop the others, and the timer is
    re-armed even when the pass fails.
    """
    global supported_services
    try:
        for project in list(projects_map.keys()):
            xos_service = projects_map[project]['service']
            if (not xos_service) or (xos_service not in supported_services):
                continue
            try:
                _evaluate_project_cpu(project)
            except Exception as e:
                # Boundary of a periodic job: report and carry on so one bad
                # project (or a failed XOS request) cannot halt auto-scaling.
                print("AUTO_SCALE evaluation failed for project %s: %s" % (project, e))
    finally:
        threading.Timer(20, periodic_cpu_threshold_evaluator).start()
def read_notification_from_ceilometer_over_kafka(parse_target):
    """Consume ceilometer notifications from Kafka and process each one.

    Args:
        parse_target: Split Kafka target URL, passed unchanged to
            ``kafka_broker.KafkaBrokerPublisher``.

    A message that cannot be decoded or processed is reported and skipped.
    Errors from creating the consumer or iterating it end the loop after
    being reported.
    """
    # Wrapped in a 1-tuple because a split URL is itself a tuple and would
    # otherwise be unpacked by the % operator.
    print("Kafka target: %s" % (parse_target,))
    try:
        kafka_publisher = kafka_broker.KafkaBrokerPublisher(parse_target)
        for message in kafka_publisher.kafka_consumer:
            try:
                process_notification_from_ceilometer(json.loads(message.value))
            except Exception as e:
                # One bad message must not stop consumption of the rest.
                print("AUTO_SCALE Exception: %s" % (e,))
    except Exception as e:
        print("AUTO_SCALE Exception: %s" % (e,))
# Disabled alternative to read_notification_from_ceilometer_over_kafka() that
# uses kafka.KafkaConsumer directly. It sits inside a bare string literal, so
# it is never executed.
'''
def read_notification_from_ceilometer_over_kafka(host,port,topic):
print "Kafka target" , host, port, topic
try :
consumer=kafka.KafkaConsumer(bootstrap_servers=["%s:%s" % (host,port)])
#consumer.assign([TopicPartition(topic,0)])
consumer.set_topic_partitions(topic)
consumer.seek_to_end()
for message in consumer:
print message.value
#logging.debug("%s",message.value)
process_notification_from_ceilometer(json.loads(message.value))
#status = process_ceilometer_message(json.loads(message.value),message.value)
#print status
except Exception as e:
print "AUTO_SCALE Exception:",e
'''
def read_notification_from_ceilometer(host,port):
    """Receive msgpack-encoded samples over UDP forever and process each one.

    Decoding or processing errors are printed and the loop continues.
    """
    listener = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
    listener.bind((host, port))
    while True:
        datagram, _sender = listener.recvfrom(64000)
        try:
            decoded = msgpack.loads(datagram, encoding='utf-8')
            process_notification_from_ceilometer(decoded)
        except Exception as err:
            print(err)
def _forget_deleted_instance(sample):
    """Remove a deleted instance from its project's tracked resources."""
    project_id = sample['project_id']
    resource_id = sample['resource_id']
    xosTenantInfo = getXosTenantInfo(project_id)
    xosResourceInfo = getXosInstanceInfo(resource_id)
    # getXosTenantInfo() returns None for projects XOS does not know about;
    # subscripting it unconditionally used to raise TypeError and left the
    # resource in place.
    slice_name = xosTenantInfo['slice'] if xosTenantInfo else None
    print("SRIKANTH: Project %s Instance %s is getting deleted" % (slice_name if slice_name else project_id, xosResourceInfo))
    project = projects_map.get(project_id)
    if project is not None:
        project['resources'].pop(resource_id, None)


def process_notification_from_ceilometer(sample, max_samples=10):
    """Fold one ceilometer sample into ``projects_map``.

    ``instance`` samples whose event type contains ``delete`` remove the
    resource from its project; every other ``instance`` sample is ignored.
    ``cpu_util`` samples are appended to the per-resource queue, creating the
    project and resource entries on first sight. All other meters are ignored.

    Args:
        sample: Decoded ceilometer sample dict.
        max_samples: Number of most recent cpu_util samples kept per
            resource. Defaults to 10, the length previously hard-coded here.
    """
    counter_name = sample['counter_name']
    if counter_name == 'instance':
        # Tolerate instance samples that carry no event type.
        metadata = sample.get('resource_metadata') or {}
        if 'delete' in (metadata.get('event_type') or ''):
            _forget_deleted_instance(sample)
        return
    if counter_name != 'cpu_util':
        return

    project_id = sample['project_id']
    resource_id = sample['resource_id']
    if project_id not in projects_map:
        xosTenantInfo = getXosTenantInfo(project_id)
        # Build the entry completely before publishing it, so a failed tenant
        # lookup cannot leave a half-initialised project behind.
        projects_map[project_id] = {
            'project_id': project_id,
            'slice': (xosTenantInfo['slice'] if xosTenantInfo else None),
            'service': (xosTenantInfo['service'] if xosTenantInfo else None),
            'resources': {},
            'uthreadshold_count': 0,
            'lthreadshold_count': 0,
            'alarm': INITIAL_STATE,
        }
    resource_map = projects_map[project_id]['resources']
    if resource_id not in resource_map:
        resource_map[resource_id] = {
            'xos_instance_info': getXosInstanceInfo(resource_id),
            'queue': [],
        }

    # Keep only the fields the evaluator and the printer read.
    entry = {'counter_name':sample['counter_name'],
             'project_id':project_id,
             'resource_id':resource_id,
             'timestamp':sample['timestamp'],
             'counter_unit':sample['counter_unit'],
             'counter_volume':sample['counter_volume']}
    # A bounded deque drops the oldest samples; the result is stored back as
    # a plain list so it stays JSON-serialisable.
    window = collections.deque(resource_map[resource_id]['queue'], maxlen=max_samples)
    window.append(entry)
    resource_map[resource_id]['queue'] = list(window)
def setup_webserver(webserver_host='0.0.0.0', webserver_port=9991, debug=False):
    """Run the Flask app; blocks until the server stops.

    Args:
        webserver_host: Interface to bind. Defaults to all interfaces, as
            before.
        webserver_port: TCP port to listen on. Defaults to 9991, as before.
        debug: Enable Flask debug mode. Off by default: the server listens on
            every interface, and the interactive debugger that debug mode
            turns on lets a client run arbitrary code.

    The former try/except around the two constant assignments was removed: it
    could never trigger, and its handler referenced ``logging``, which this
    module does not import.
    """
    app.run(host=webserver_host, port=webserver_port, debug=debug, use_reloader=False)
def parse_args(argv):
    """Parse command-line options into the module-level configuration globals.

    Recognised options: ``-x/--xos-ip``, ``-k/--kafka-ip`` and
    ``-s/--supported-services`` (comma-separated service names).

    Args:
        argv: Argument list without the program name.

    Exits with status 2 (after printing usage) on an unknown option or when
    either IP address is missing.
    """
    global xos_ip, kafka_ip, supported_services
    usage = 'xos_auto_scaling_app.py --xos-ip=<IP> --kafka-ip=<> [--supported-services=<comma seperated service name>]'
    # The option is optional: make sure the global exists even when it is
    # omitted, otherwise the summary print below raises NameError.
    globals().setdefault('supported_services', [])
    try:
        opts, _args = getopt.getopt(argv,"k:s:x:",["kafka-ip=","supported-services=","xos-ip="])
    except getopt.GetoptError:
        print(usage)
        sys.exit(2)
    for opt, arg in opts:
        # Real tuples: ``opt in ("--xos-ip")`` was a substring test on a
        # plain string and matched the short options only by coincidence.
        if opt in ("-x", "--xos-ip"):
            xos_ip = arg
        elif opt in ("-k", "--kafka-ip"):
            kafka_ip = arg
        elif opt in ("-s", "--supported-services"):
            # Ignore blanks around and between the commas.
            supported_services = [name.strip() for name in arg.split(',') if name.strip()]
    if not xos_ip or not kafka_ip:
        print(usage)
        sys.exit(2)
    print('xos-ip=%s, kafka-ip=%s, supported-services=%s' % (xos_ip, kafka_ip, supported_services))
def main(argv):
    """Start the auto-scaling application.

    Parses the options, waits for the XOS monitoring channel, preloads the
    tenant and instance caches, starts the notification reader thread,
    subscribes to the ``cpu_util`` and ``instance`` meters, and finally
    starts the periodic jobs and the web server. Returns early when the
    monitoring channel is missing or a subscription is rejected.
    """
    global xos_ip, kafka_ip
    parse_args(argv)
    monitoring_channel = acquire_xos_monitoring_channel()
    if not monitoring_channel:
        print('XOS monitoring_channel is not created... Create it before using this app')
        return
    loadAllXosTenantInfo()
    loadAllXosInstanceInfo()
    ceilometer_url = monitoring_channel['ceilometer_url']

    # Start the reader first, then work out where ceilometer should publish.
    if use_kafka:
        target = "kafka://" + kafka_ip + ":9092?topic=auto-scale"
        parse_target = netutils.urlsplit(target)
        thread.start_new(read_notification_from_ceilometer_over_kafka, (parse_target,))
        #thread.start_new(read_notification_from_ceilometer_over_kafka, (xos_ip,"9092","auto-scale",))
    else:
        thread.start_new(read_notification_from_ceilometer,(UDP_IP,UDP_PORT,))
        target = "udp://" + xos_ip + ":12346"

    subscribe_url = ceilometer_url + 'v2/subscribe'
    #TODO: Fix the need for different app_id for each subscription from the same application
    subscriptions = (
        ("cpu_util", "xos_auto_scale",
         'Ceilometer meter "cpu_util" Subscription status:%s',
         'Ceilometer meter "cpu_util" Subscription unsuccessful...Exiting'),
        ("instance", "xos_auto_scale2",
         'Ceilometer meter "instance" Subscription status:%s',
         'Ceilometer meter "instance"Subscription unsuccessful...Exiting'),
    )
    for meter, app_id, status_format, failure_message in subscriptions:
        subscribe_data = {"sub_info":meter,"app_id":app_id,"target":target}
        response = requests.post(subscribe_url, data=json.dumps(subscribe_data))
        print(status_format % response.text)
        #TODO: Fix the typo in 'sucess'
        if 'sucess' not in response.text and 'already exists' not in response.text:
            print(failure_message)
            return

    periodic_cpu_threshold_evaluator()
    periodic_print()
    setup_webserver()


if __name__ == "__main__":
    main(sys.argv[1:])