Code changes include:
1. Added new file kafka_broker.py to communicate with the Kafka server.
2. Added code to accept xos_ip and kafka_ip as command-line arguments (--xos-ip, --kafka-ip).
3. Replaced the hardcoded IP with the IP retrieved in step 2 (xos_ip).
Change-Id: I1254ecc5560b2ef781035aa86a778fac94d524a8
diff --git a/auto-scale/xos_auto_scaling_app.py b/auto-scale/xos_auto_scaling_app.py
index 848ccc0..9ec0103 100644
--- a/auto-scale/xos_auto_scaling_app.py
+++ b/auto-scale/xos_auto_scaling_app.py
@@ -1,3 +1,4 @@
+import sys, getopt
import socket
import requests
import urllib2
@@ -15,11 +16,25 @@
xos_tenant_info_map = {}
xos_instances_info_map = {}
+#gethostname() -- return the current hostname
+#gethostbyname() -- map a hostname to its IP number
+hostname = None
+xos_ip = None
+kafka_ip = xos_ip
+xos_port = 9000
+
+print "----------------------------------------------------------------------"
+print "xos_ip:",xos_ip
+print "kfka_ip:",kafka_ip
+print "----------------------------------------------------------------------"
+
use_kafka = True
if use_kafka:
import kafka
- from kafka import TopicPartition
+ import kafka_broker
+ from oslo_utils import netutils
+ #from kafka import TopicPartition
else:
UDP_IP = "0.0.0.0"
UDP_PORT = 12346
@@ -31,30 +46,32 @@
return response
def acquire_xos_monitoring_channel():
- url = "http://ctl:9999/xoslib/monitoringchannel/"
+ global xos_ip
+
+ url = "http://" + xos_ip + ":" + str(xos_port) +"/api/tenant/monitoring/monitoringchannel/"
+ print url
admin_auth=("padmin@vicci.org", "letmein") # use your XOS username and password
monitoring_channels = requests.get(url, auth=admin_auth).json()
ceilometer_url = None
if not monitoring_channels:
- print 'SRIKANTH: No monitoring channels for this user...'
+ print 'No monitoring channels for this user...'
return None
else:
monitoring_channel = monitoring_channels[0]
while not monitoring_channel['ceilometer_url']:
- print 'SRIKANTH: Waiting for monitoring channel create'
+ print 'Waiting for monitoring channel create'
sleep(0.5)
monitoring_channel = requests.get(url, auth=admin_auth).json()[0]
- #TODO: Wait until URL is completely UP
while True:
- print 'SRIKANTH: Waiting for ceilometer proxy URL %s is available' % monitoring_channel['ceilometer_url']
+ print 'Waiting for ceilometer proxy URL %s is available' % monitoring_channel['ceilometer_url']
try:
response = urllib2.urlopen(monitoring_channel['ceilometer_url'],timeout=1)
break
except urllib2.HTTPError, e:
- print 'SRIKANTH: HTTP error %s' % e.reason
+ print 'HTTP error %s' % e.reason
break
except urllib2.URLError, e:
- print 'SRIKANTH: URL error %s' % e.reason
+ print 'URL error %s' % e.reason
pass
return monitoring_channel
@@ -84,8 +101,10 @@
SCALE_DOWN_ALARM = 'scale_down'
def loadAllXosTenantInfo():
- print "SRIKANTH: Loading all XOS tenant info"
- url = "http://ctl:9999/xos/controllerslices/"
+ global xos_ip
+
+ print "Loading all XOS tenant info"
+ url = "http://" + xos_ip + ":" + str(xos_port) +"/xos/controllerslices/"
admin_auth=("padmin@vicci.org", "letmein") # use your XOS username and password
controller_slices = requests.get(url, auth=admin_auth).json()
for cslice in controller_slices:
@@ -100,8 +119,10 @@
print "SRIKANTH: Project: %s Service:%s Slice:%s" % (cslice['tenant_id'],service_name,slice_name)
def loadAllXosInstanceInfo():
- print "SRIKANTH: Loading all XOS instance info"
- url = "http://ctl:9999/xos/instances/"
+ global xos_ip
+
+ print "Loading all XOS instance info"
+ url = "http://" + xos_ip + ":" + str(xos_port) +"/xos/instances/"
admin_auth=("padmin@vicci.org", "letmein") # use your XOS username and password
xos_instances = requests.get(url, auth=admin_auth).json()
for instance in xos_instances:
@@ -115,7 +136,7 @@
loadAllXosTenantInfo()
xos_tenant_info = xos_tenant_info_map.get(project, None)
if not xos_tenant_info:
- print "SRIKANTH: Project %s has no associated XOS slice" % project
+ print "Project %s has no associated XOS slice" % project
return xos_tenant_info
def getXosInstanceInfo(resource):
@@ -126,29 +147,31 @@
loadAllXosInstanceInfo()
xos_instance_info = xos_instances_info_map.get(resource, None)
if not xos_instance_info:
- print "SRIKANTH: Resource %s has no associated XOS instance" % project
+ print "Resource %s has no associated XOS instance" % project
return xos_instance_info
def handle_adjust_scale(project, adjust):
+ global xos_ip
+
if (adjust != 'up') and (adjust != 'down'):
print "SRIKANTH: Invalid adjust value %s " % adjust
return
current_instances = len(projects_map[project]['resources'].keys())
if (current_instances <=1 and adjust == 'down'):
- print "SRIKANTH: %s is running with already minimum instances and can not scale down further " % project
+ print "%s is running with already minimum instances and can not scale down further " % project
return
if (current_instances >=2 and adjust == 'up'):
- print "SRIKANTH: %s is running with already maximum instances and can not scale up further " % project
+ print "%s is running with already maximum instances and can not scale up further " % project
return
#xos_tenant = getXosTenantInfo(project)
xos_service = projects_map[project]['service']
xos_slice = projects_map[project]['slice']
if not xos_service or not xos_slice:
- print "SRIKANTH: Can not handle adjust_scale for Project %s because not associated with any service or slice" % project
+ print "Can not handle adjust_scale for Project %s because not associated with any service or slice" % project
return
- print "SRIKANTH: SCALE %s for Project %s, Slice=%s, Service=%s from current=%d to new=%d" % (adjust, project, xos_slice, xos_service, current_instances, current_instances+1 if (adjust=='up') else current_instances-1)
+ print "SCALE %s for Project %s, Slice=%s, Service=%s from current=%d to new=%d" % (adjust, project, xos_slice, xos_service, current_instances, current_instances+1 if (adjust=='up') else current_instances-1)
query_params = {'service':xos_service, 'slice_hint':xos_slice, 'scale':current_instances+1 if (adjust=='up') else current_instances-1}
- url = "http://ctl:9999/xoslib/serviceadjustscale/"
+ url = "http://" + xos_ip + ":" + str(xos_port) + "/xoslib/serviceadjustscale/"
admin_auth=("padmin@vicci.org", "letmein") # use your XOS username and password
response = requests.get(url, params=query_params, auth=admin_auth).json()
print "SRIKANTH: XOS adjust_scale response: %s" % response
@@ -204,20 +227,34 @@
projects_map[project]['alarm'] = INITIAL_STATE
threading.Timer(20, periodic_cpu_threshold_evaluator).start()
+def read_notification_from_ceilometer_over_kafka(parse_target):
+ print("Kafka target:",parse_target)
+ try :
+ kafka_publisher=kafka_broker.KafkaBrokerPublisher(parse_target)
+ for message in kafka_publisher.kafka_consumer:
+ #print message.value
+ #logging.info("%s",message.value)
+ process_notification_from_ceilometer(json.loads(message.value))
+ #print status
+ except Exception as e:
+ print "AUTO_SCALE Exception:",e
+'''
def read_notification_from_ceilometer_over_kafka(host,port,topic):
print "Kafka target" , host, port, topic
try :
consumer=kafka.KafkaConsumer(bootstrap_servers=["%s:%s" % (host,port)])
- consumer.assign([TopicPartition(topic,0)])
+ #consumer.assign([TopicPartition(topic,0)])
+ consumer.set_topic_partitions(topic)
consumer.seek_to_end()
for message in consumer:
- #print message.value
+ print message.value
#logging.debug("%s",message.value)
process_notification_from_ceilometer(json.loads(message.value))
#status = process_ceilometer_message(json.loads(message.value),message.value)
#print status
except Exception as e:
print "AUTO_SCALE Exception:",e
+'''
def read_notification_from_ceilometer(host,port):
udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
@@ -295,39 +332,68 @@
else:
app.run(host=webserver_host,port=webserver_port,debug=True, use_reloader=False)
+def parse_args(argv):
+ global xos_ip, kafka_ip
-def main():
+ try:
+ opts, args = getopt.getopt(argv,"k:x:",["kafka-ip=","xos-ip="])
+ except getopt.GetoptError:
+ print 'xos_auto_scaling_app.py --xos-ip=<IP> --kafka-ip=<>'
+ sys.exit(2)
+
+ for opt, arg in opts:
+ if opt in ("--xos-ip"):
+ xos_ip = arg
+ elif opt in ("--kafka-ip"):
+ kafka_ip = arg
+
+ if not xos_ip or not kafka_ip:
+ print 'xos_auto_scaling_app.py --xos-ip=<IP> --kafka-ip=<>'
+ sys.exit(2)
+
+
+def main(argv):
+ global xos_ip, kafka_ip
+
+ parse_args(argv)
monitoring_channel = acquire_xos_monitoring_channel()
if not monitoring_channel:
- print 'SRIKANTH: XOS monitoring_channel is not created... Create it before using this app'
+ print 'XOS monitoring_channel is not created... Create it before using this app'
return
loadAllXosTenantInfo()
loadAllXosInstanceInfo()
ceilometer_url = monitoring_channel['ceilometer_url']
if use_kafka:
- thread.start_new(read_notification_from_ceilometer_over_kafka, ("10.11.10.1","9092","auto-scale",))
- subscribe_data = {"sub_info":"cpu_util","app_id":"xos_auto_scale","target":"kafka://10.11.10.1:9092?topic=auto-scale"}
+ url = "kafka://" + kafka_ip + ":9092?topic=auto-scale"
+ parse_target=netutils.urlsplit(url)
+ thread.start_new(read_notification_from_ceilometer_over_kafka, (parse_target,))
+ #thread.start_new(read_notification_from_ceilometer_over_kafka, (xos_ip,"9092","auto-scale",))
+ subscribe_data = {"sub_info":"cpu_util","app_id":"xos_auto_scale","target":"kafka://" + kafka_ip + ":9092?topic=auto-scale"}
else:
thread.start_new(read_notification_from_ceilometer,(UDP_IP,UDP_PORT,))
- subscribe_data = {"sub_info":"cpu_util","app_id":"xos_auto_scale","target":"udp://10.11.10.1:12346"}
+ subscribe_data = {"sub_info":"cpu_util","app_id":"xos_auto_scale","target":"udp://" + xos_ip + ":12346"}
subscribe_url = ceilometer_url + 'v2/subscribe'
response = requests.post(subscribe_url, data=json.dumps(subscribe_data))
- print 'SRIKANTH: Ceilometer meter "cpu_util" Subscription status:%s' % response.text
+ print 'Ceilometer meter "cpu_util" Subscription status:%s' % response.text
#TODO: Fix the typo in 'sucess'
if (not 'sucess' in response.text) and (not 'already exists' in response.text):
- print 'SRIKANTH: Ceilometer meter "cpu_util" Subscription unsuccessful...Exiting'
+ print 'Ceilometer meter "cpu_util" Subscription unsuccessful...Exiting'
return
- subscribe_data = {"sub_info":"instance","app_id":"xos_auto_scale2","target":"udp://10.11.10.1:12346"}
+ #TODO: Fix the need for different app_id for each subscription from the same application
+ if use_kafka:
+ subscribe_data = {"sub_info":"instance","app_id":"xos_auto_scale2","target":"kafka://" + kafka_ip + ":9092?topic=auto-scale"}
+ else:
+ subscribe_data = {"sub_info":"instance","app_id":"xos_auto_scale2","target":"udp://" + xos_ip + ":12346"}
subscribe_url = ceilometer_url + 'v2/subscribe'
response = requests.post(subscribe_url, data=json.dumps(subscribe_data))
- print 'SRIKANTH: Ceilometer meter "instance" Subscription status:%s' % response.text
+ print 'Ceilometer meter "instance" Subscription status:%s' % response.text
#TODO: Fix the typo in 'sucess'
if (not 'sucess' in response.text) and (not 'already exists' in response.text):
- print 'SRIKANTH: Ceilometer meter "instance"Subscription unsuccessful...Exiting'
+ print 'Ceilometer meter "instance"Subscription unsuccessful...Exiting'
return
periodic_cpu_threshold_evaluator()
periodic_print()
setup_webserver()
if __name__ == "__main__":
- main()
+ main(sys.argv[1:])