Code changes includes:
1.Added ner file kafka_broker.py to communicate with kafka server.
2.Added code to accept xos_ip and kafka_ip ad arguments :
3.Replace hardcoaded ip with the ip retrived from the step 2(xos_ip).

Change-Id: I1254ecc5560b2ef781035aa86a778fac94d524a8
diff --git a/auto-scale/README.md b/auto-scale/README.md
index cb11620..90e6c2c 100644
--- a/auto-scale/README.md
+++ b/auto-scale/README.md
@@ -7,9 +7,21 @@
 
 #How to
 
-Ensure that the CORD config is installed and then run:
+Before running the application, install the following packages
 
-`python xos_auto_scaling_app.py`
+sudo apt-get update
+sudo apt-get install python-pip python-dev build-essential
+sudo pip install --upgrade pip
+sudo pip install --upgrade virtualenv
+sudo apt-get install msgpack-python
+sudo pip install Flask
+sudo pip install kafka-python
+sudo pip install oslo.utils
+sudo pip install Babel (if required)
+
+Ensure that XOS REST IP address is accessible from where this application is installed and then run:
+
+`python xos_auto_scaling_app.py --xos-ip=<xos-ip> --kafka-ip=<monitoringservice-kafka-ip>`
 
 This command will start the autoscaling application and start REST server on 9991 port.
 
@@ -36,36 +48,6 @@
       sinks:
           - cpu_sink
 ```
-3b) Also ensure the publisher in "cpu_sink" contains the following URL "udp://"IP of Ceilometer PUB-SUB":5004" as shown below.<br/>
-```
-    - name: cpu_sink
-      transformers:
-          - name: "rate_of_change"
-            parameters:
-                target:
-                    name: "cpu_util"
-                    unit: "%"
-                    type: "gauge"
-                    scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
-      publishers:
-          - notifier://
-```
-
-To:
-```
-    - name: cpu_sink
-      transformers:
-          - name: "rate_of_change"
-            parameters:
-                target:
-                    name: "cpu_util"
-                    unit: "%"
-                    type: "gauge"
-                    scale: "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
-      publishers:
-          - notifier://
-          - udp://10.11.10.1:5004
-```
 4) sudo service ceilometer-agent-compute restart<br/>
 5) With this change, the autoscaling application should start receiving the CPU utilization samples every 60 seconds<br/>
 6) The REST API to retrieve the cpu utilization samples from autoscaling application: http://<app_ip>:9991/autoscaledata 
diff --git a/auto-scale/kafka_broker.py b/auto-scale/kafka_broker.py
new file mode 100644
index 0000000..22604ef
--- /dev/null
+++ b/auto-scale/kafka_broker.py
@@ -0,0 +1,75 @@
+#
+# Copyright 2015 Cisco Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+
+import kafka
+from oslo_utils import netutils
+from six.moves.urllib import parse as urlparse
+import logging as LOG
+
+
+class KafkaBrokerPublisher():
+    def __init__(self, parsed_url):
+        self.kafka_client = None
+        self.kafka_server = None
+        self.kafka_consumer = None
+
+        self.host, self.port = netutils.parse_host_port(
+            parsed_url.netloc, default_port=9092)
+
+        self.local_queue = []
+
+        params = urlparse.parse_qs(parsed_url.query)
+        self.topic = params.get('topic', ['ceilometer'])[-1]
+        self.policy = params.get('policy', ['default'])[-1]
+        self.max_queue_length = int(params.get(
+            'max_queue_length', [1024])[-1])
+        self.max_retry = int(params.get('max_retry', [100])[-1])
+
+        if self.policy in ['default', 'drop', 'queue']:
+            LOG.info(('Publishing policy set to %s') % self.policy)
+        else:
+            LOG.warn(('Publishing policy is unknown (%s) force to default')
+                     % self.policy)
+            self.policy = 'default'
+
+        try:
+            self._get_client()
+            self._get_server()
+        except Exception as e:
+            LOG.exception("Failed to connect to Kafka service: %s", e)
+
+    def _get_client(self):
+        if not self.kafka_client:
+            self.kafka_client = kafka.KafkaClient(
+                "%s:%s" % (self.host, self.port))
+            self.kafka_producer = kafka.SimpleProducer(self.kafka_client)
+    
+    def _get_server(self):
+        if not self.kafka_server:
+           self.kafka_server = kafka.KafkaClient(
+                "%s:%s" % (self.host, self.port))
+           self.kafka_consumer = kafka.KafkaConsumer(self.topic,bootstrap_servers = ["%s:%s" % (self.host, self.port)])
+
+
+    def _send(self, data):
+        #for d in data:
+            try:
+                self.kafka_producer.send_messages(
+                    self.topic, json.dumps(data))
+            except Exception as e:
+                LOG.exception(("Failed to send sample data: %s"), e)
+                raise
diff --git a/auto-scale/xos_auto_scaling_app.py b/auto-scale/xos_auto_scaling_app.py
index 848ccc0..9ec0103 100644
--- a/auto-scale/xos_auto_scaling_app.py
+++ b/auto-scale/xos_auto_scaling_app.py
@@ -1,3 +1,4 @@
+import sys, getopt
 import socket
 import requests
 import urllib2
@@ -15,11 +16,25 @@
 xos_tenant_info_map = {}
 xos_instances_info_map = {}
 
+#gethostname() -- return the current hostname
+#gethostbyname() -- map a hostname to its IP number
+hostname = None
+xos_ip = None
+kafka_ip = xos_ip
+xos_port = 9000
+
+print "----------------------------------------------------------------------"
+print "xos_ip:",xos_ip
+print "kfka_ip:",kafka_ip
+print "----------------------------------------------------------------------"
+
 use_kafka = True
 
 if use_kafka:
    import kafka
-   from kafka import TopicPartition
+   import kafka_broker
+   from oslo_utils import netutils
+   #from kafka import TopicPartition
 else:
    UDP_IP = "0.0.0.0"
    UDP_PORT = 12346
@@ -31,30 +46,32 @@
     return response
 
 def acquire_xos_monitoring_channel():
-    url = "http://ctl:9999/xoslib/monitoringchannel/"
+    global xos_ip
+
+    url = "http://" + xos_ip + ":" + str(xos_port) +"/api/tenant/monitoring/monitoringchannel/"
+    print url
     admin_auth=("padmin@vicci.org", "letmein")   # use your XOS username and password
     monitoring_channels = requests.get(url, auth=admin_auth).json()
     ceilometer_url = None
     if not monitoring_channels:
-        print 'SRIKANTH: No monitoring channels for this user...'
+        print 'No monitoring channels for this user...'
         return None
     else:
         monitoring_channel = monitoring_channels[0]
     while not monitoring_channel['ceilometer_url']:
-         print 'SRIKANTH: Waiting for monitoring channel create'
+         print 'Waiting for monitoring channel create'
          sleep(0.5)
          monitoring_channel = requests.get(url, auth=admin_auth).json()[0]
-    #TODO: Wait until URL is completely UP
     while True:
-        print 'SRIKANTH: Waiting for ceilometer proxy URL %s is available' % monitoring_channel['ceilometer_url']
+        print 'Waiting for ceilometer proxy URL %s is available' % monitoring_channel['ceilometer_url']
         try:
             response = urllib2.urlopen(monitoring_channel['ceilometer_url'],timeout=1)
             break
         except urllib2.HTTPError, e:
-            print 'SRIKANTH: HTTP error %s' % e.reason
+            print 'HTTP error %s' % e.reason
             break
         except urllib2.URLError, e:
-            print 'SRIKANTH: URL error %s' % e.reason
+            print 'URL error %s' % e.reason
             pass
     return monitoring_channel
 
@@ -84,8 +101,10 @@
 SCALE_DOWN_ALARM = 'scale_down'
 
 def loadAllXosTenantInfo():
-    print "SRIKANTH: Loading all XOS tenant info"
-    url = "http://ctl:9999/xos/controllerslices/"
+    global xos_ip
+
+    print "Loading all XOS tenant info"
+    url = "http://" + xos_ip + ":" + str(xos_port) +"/xos/controllerslices/"
     admin_auth=("padmin@vicci.org", "letmein")   # use your XOS username and password
     controller_slices = requests.get(url, auth=admin_auth).json()
     for cslice in controller_slices:
@@ -100,8 +119,10 @@
          print "SRIKANTH: Project: %s Service:%s Slice:%s" % (cslice['tenant_id'],service_name,slice_name)
 
 def loadAllXosInstanceInfo():
-    print "SRIKANTH: Loading all XOS instance info"
-    url = "http://ctl:9999/xos/instances/"
+    global xos_ip
+
+    print "Loading all XOS instance info"
+    url = "http://" + xos_ip + ":" + str(xos_port) +"/xos/instances/"
     admin_auth=("padmin@vicci.org", "letmein")   # use your XOS username and password
     xos_instances = requests.get(url, auth=admin_auth).json()
     for instance in xos_instances:
@@ -115,7 +136,7 @@
         loadAllXosTenantInfo()
         xos_tenant_info = xos_tenant_info_map.get(project, None)
         if not xos_tenant_info:
-            print "SRIKANTH: Project %s has no associated XOS slice" % project
+            print "Project %s has no associated XOS slice" % project
         return xos_tenant_info
 
 def getXosInstanceInfo(resource):
@@ -126,29 +147,31 @@
         loadAllXosInstanceInfo()
         xos_instance_info = xos_instances_info_map.get(resource, None)
         if not xos_instance_info:
-            print "SRIKANTH: Resource %s has no associated XOS instance" % project
+            print "Resource %s has no associated XOS instance" % project
         return xos_instance_info
 
 def handle_adjust_scale(project, adjust):
+    global xos_ip
+
     if (adjust != 'up') and (adjust != 'down'):
         print "SRIKANTH: Invalid adjust value %s " % adjust
         return
     current_instances = len(projects_map[project]['resources'].keys())
     if (current_instances <=1 and adjust == 'down'):
-        print "SRIKANTH: %s is running with already minimum instances and can not scale down further " % project
+        print "%s is running with already minimum instances and can not scale down further " % project
         return
     if (current_instances >=2 and adjust == 'up'):
-        print "SRIKANTH: %s is running with already maximum instances and can not scale up further " % project
+        print "%s is running with already maximum instances and can not scale up further " % project
         return
     #xos_tenant = getXosTenantInfo(project)
     xos_service = projects_map[project]['service']
     xos_slice = projects_map[project]['slice']
     if not xos_service or not xos_slice: 
-        print "SRIKANTH: Can not handle adjust_scale for Project %s because not associated with any service or slice" % project
+        print "Can not handle adjust_scale for Project %s because not associated with any service or slice" % project
         return
-    print "SRIKANTH: SCALE %s for Project %s, Slice=%s, Service=%s from current=%d to new=%d" % (adjust, project, xos_slice, xos_service, current_instances, current_instances+1 if (adjust=='up') else current_instances-1)
+    print "SCALE %s for Project %s, Slice=%s, Service=%s from current=%d to new=%d" % (adjust, project, xos_slice, xos_service, current_instances, current_instances+1 if (adjust=='up') else current_instances-1)
     query_params = {'service':xos_service, 'slice_hint':xos_slice, 'scale':current_instances+1 if (adjust=='up') else current_instances-1}
-    url = "http://ctl:9999/xoslib/serviceadjustscale/"
+    url = "http://" + xos_ip + ":" + str(xos_port) + "/xoslib/serviceadjustscale/"
     admin_auth=("padmin@vicci.org", "letmein")   # use your XOS username and password
     response = requests.get(url, params=query_params, auth=admin_auth).json()
     print "SRIKANTH: XOS adjust_scale response: %s" % response
@@ -204,20 +227,34 @@
                   projects_map[project]['alarm'] = INITIAL_STATE
      threading.Timer(20, periodic_cpu_threshold_evaluator).start()
 
+def read_notification_from_ceilometer_over_kafka(parse_target):
+    print("Kafka target:",parse_target)
+    try :
+        kafka_publisher=kafka_broker.KafkaBrokerPublisher(parse_target)
+        for message in kafka_publisher.kafka_consumer:
+            #print message.value
+            #logging.info("%s",message.value)
+            process_notification_from_ceilometer(json.loads(message.value))
+            #print status
+    except Exception as e:
+        print "AUTO_SCALE Exception:",e
+'''
 def read_notification_from_ceilometer_over_kafka(host,port,topic):
     print "Kafka target" , host, port, topic
     try :
         consumer=kafka.KafkaConsumer(bootstrap_servers=["%s:%s" % (host,port)])
-        consumer.assign([TopicPartition(topic,0)])
+        #consumer.assign([TopicPartition(topic,0)])
+        consumer.set_topic_partitions(topic)    
         consumer.seek_to_end()
         for message in consumer:
-            #print message.value
+            print message.value
             #logging.debug("%s",message.value)
             process_notification_from_ceilometer(json.loads(message.value))
             #status = process_ceilometer_message(json.loads(message.value),message.value)
             #print status
     except Exception as e:
         print "AUTO_SCALE Exception:",e
+'''
 
 def read_notification_from_ceilometer(host,port):
    udp = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) # UDP
@@ -295,39 +332,68 @@
     else: 
         app.run(host=webserver_host,port=webserver_port,debug=True, use_reloader=False)
 
+def parse_args(argv):
+   global xos_ip, kafka_ip
 
-def main():
+   try:
+      opts, args = getopt.getopt(argv,"k:x:",["kafka-ip=","xos-ip="])
+   except getopt.GetoptError:
+      print 'xos_auto_scaling_app.py --xos-ip=<IP> --kafka-ip=<>'
+      sys.exit(2)
+
+   for opt, arg in opts:
+      if opt in ("--xos-ip"):
+         xos_ip = arg
+      elif opt in ("--kafka-ip"):
+         kafka_ip = arg
+
+   if not xos_ip or not kafka_ip:
+      print 'xos_auto_scaling_app.py --xos-ip=<IP> --kafka-ip=<>'
+      sys.exit(2)
+
+
+def main(argv):
+   global xos_ip, kafka_ip
+
+   parse_args(argv)
    monitoring_channel = acquire_xos_monitoring_channel()
    if not monitoring_channel:
-        print 'SRIKANTH: XOS monitoring_channel is not created... Create it before using this app'
+        print 'XOS monitoring_channel is not created... Create it before using this app'
         return
    loadAllXosTenantInfo()
    loadAllXosInstanceInfo()
    ceilometer_url = monitoring_channel['ceilometer_url']
    if use_kafka:
-       thread.start_new(read_notification_from_ceilometer_over_kafka, ("10.11.10.1","9092","auto-scale",))
-       subscribe_data = {"sub_info":"cpu_util","app_id":"xos_auto_scale","target":"kafka://10.11.10.1:9092?topic=auto-scale"}
+       url = "kafka://" + kafka_ip + ":9092?topic=auto-scale"
+       parse_target=netutils.urlsplit(url) 
+       thread.start_new(read_notification_from_ceilometer_over_kafka, (parse_target,))
+       #thread.start_new(read_notification_from_ceilometer_over_kafka, (xos_ip,"9092","auto-scale",))
+       subscribe_data = {"sub_info":"cpu_util","app_id":"xos_auto_scale","target":"kafka://" + kafka_ip + ":9092?topic=auto-scale"}
    else:
        thread.start_new(read_notification_from_ceilometer,(UDP_IP,UDP_PORT,))
-       subscribe_data = {"sub_info":"cpu_util","app_id":"xos_auto_scale","target":"udp://10.11.10.1:12346"}
+       subscribe_data = {"sub_info":"cpu_util","app_id":"xos_auto_scale","target":"udp://" + xos_ip + ":12346"}
    subscribe_url = ceilometer_url + 'v2/subscribe'
    response = requests.post(subscribe_url, data=json.dumps(subscribe_data))
-   print 'SRIKANTH: Ceilometer meter "cpu_util" Subscription status:%s' % response.text
+   print 'Ceilometer meter "cpu_util" Subscription status:%s' % response.text
    #TODO: Fix the typo in 'sucess'
    if (not 'sucess' in response.text) and (not 'already exists' in response.text):
-       print 'SRIKANTH: Ceilometer meter "cpu_util" Subscription unsuccessful...Exiting'
+       print 'Ceilometer meter "cpu_util" Subscription unsuccessful...Exiting'
        return
-   subscribe_data = {"sub_info":"instance","app_id":"xos_auto_scale2","target":"udp://10.11.10.1:12346"}
+   #TODO: Fix the need for different app_id for each subscription from the same application
+   if use_kafka:
+       subscribe_data = {"sub_info":"instance","app_id":"xos_auto_scale2","target":"kafka://" + kafka_ip + ":9092?topic=auto-scale"}
+   else:
+       subscribe_data = {"sub_info":"instance","app_id":"xos_auto_scale2","target":"udp://" + xos_ip + ":12346"}
    subscribe_url = ceilometer_url + 'v2/subscribe'
    response = requests.post(subscribe_url, data=json.dumps(subscribe_data))
-   print 'SRIKANTH: Ceilometer meter "instance" Subscription status:%s' % response.text
+   print 'Ceilometer meter "instance" Subscription status:%s' % response.text
    #TODO: Fix the typo in 'sucess'
    if (not 'sucess' in response.text) and (not 'already exists' in response.text):
-       print 'SRIKANTH: Ceilometer meter "instance"Subscription unsuccessful...Exiting'
+       print 'Ceilometer meter "instance"Subscription unsuccessful...Exiting'
        return
    periodic_cpu_threshold_evaluator()
    periodic_print()
    setup_webserver()
 
 if __name__ == "__main__":
-   main()
+   main(sys.argv[1:])