Kafka transport support for Auto-scaling application
diff --git a/applications/auto-scale/xos_auto_scaling_app.py b/applications/auto-scale/xos_auto_scaling_app.py
index ee51bb3..848ccc0 100644
--- a/applications/auto-scale/xos_auto_scaling_app.py
+++ b/applications/auto-scale/xos_auto_scaling_app.py
@@ -15,8 +15,14 @@
 xos_tenant_info_map = {}
 xos_instances_info_map = {}
 
-UDP_IP = "0.0.0.0"
-UDP_PORT = 12346
+use_kafka = True  # transport switch: True = consume from Kafka, False = listen on UDP
+
+if use_kafka:
+   import kafka  # Kafka client names are imported only when the Kafka transport is selected
+   from kafka import TopicPartition
+else:
+   UDP_IP = "0.0.0.0"  # UDP bind address and port; defined only when Kafka is disabled
+   UDP_PORT = 12346
 
 @app.route('/autoscaledata',methods=['GET'])
 def autoscaledata():
@@ -198,6 +204,21 @@
                   projects_map[project]['alarm'] = INITIAL_STATE
      threading.Timer(20, periodic_cpu_threshold_evaluator).start()
 
+def read_notification_from_ceilometer_over_kafka(host,port,topic):
+    # Consume ceilometer samples from partition 0 of `topic`, starting at the log end.
+    print "Kafka target" , host, port, topic
+    try :
+        consumer=kafka.KafkaConsumer(bootstrap_servers=["%s:%s" % (host,port)])
+        consumer.assign([TopicPartition(topic,0)])
+        consumer.seek_to_end()
+        for message in consumer:
+            try:
+                process_notification_from_ceilometer(json.loads(message.value))
+            except Exception as e:
+                print "AUTO_SCALE Exception:",e  # bad sample: log it and keep consuming
+    except Exception as e:
+        print "AUTO_SCALE Exception:",e
+
 def read_notification_from_ceilometer(host,port):
    udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
    udp.bind((host, port))
@@ -206,19 +227,24 @@
       data, source = udp.recvfrom(64000)
       try:
          sample = msgpack.loads(data, encoding='utf-8')
+         process_notification_from_ceilometer(sample)
+      except Exception as e:
+         print e
+
+def process_notification_from_ceilometer(sample):
          if sample['counter_name'] == 'instance':
              if 'delete' in sample['resource_metadata']['event_type']:
 	          xosTenantInfo = getXosTenantInfo(sample['project_id'])
                   xosResourceInfo = getXosInstanceInfo(sample['resource_id'])
                   print "SRIKANTH: Project %s Instance %s is getting deleted" % (xosTenantInfo['slice'] if xosTenantInfo['slice'] else sample['project_id'],xosResourceInfo) 
                   if sample['project_id'] not in projects_map.keys():
-                       continue
+                       return
                   if sample['resource_id'] not in projects_map[sample['project_id']]['resources'].keys():
-                       continue
+                       return
                   projects_map[sample['project_id']]['resources'].pop(sample['resource_id'], None)
-             continue
+             return
          elif sample['counter_name'] != 'cpu_util':
-              continue
+              return
          if sample['project_id'] not in projects_map.keys():
               projects_map[sample['project_id']] = {}
 	      xosTenantInfo = getXosTenantInfo(sample['project_id'])
@@ -244,8 +270,6 @@
          deque = collections.deque(samples_queue, maxlen=10)
          deque.append(sample)
          resource_map[sample['resource_id']]['queue'] = list(deque)
-      except Exception as e:
-         print e
 
 def setup_webserver():
     try:
@@ -279,9 +303,13 @@
         return
    loadAllXosTenantInfo()
    loadAllXosInstanceInfo()
-   thread.start_new(read_notification_from_ceilometer,(UDP_IP,UDP_PORT,))
    ceilometer_url = monitoring_channel['ceilometer_url']
-   subscribe_data = {"sub_info":"cpu_util","app_id":"xos_auto_scale","target":"udp://10.11.10.1:12346"}
+   if use_kafka:
+       thread.start_new(read_notification_from_ceilometer_over_kafka, ("10.11.10.1","9092","auto-scale",))
+       subscribe_data = {"sub_info":"cpu_util","app_id":"xos_auto_scale","target":"kafka://10.11.10.1:9092?topic=auto-scale"}
+   else:
+       thread.start_new(read_notification_from_ceilometer,(UDP_IP,UDP_PORT,))
+       subscribe_data = {"sub_info":"cpu_util","app_id":"xos_auto_scale","target":"udp://10.11.10.1:12346"}
    subscribe_url = ceilometer_url + 'v2/subscribe'
    response = requests.post(subscribe_url, data=json.dumps(subscribe_data))
    print 'SRIKANTH: Ceilometer meter "cpu_util" Subscription status:%s' % response.text