Code changes include:
1. Added new file kafka_broker.py to communicate with the Kafka server.
2. Added code to accept xos_ip and kafka_ip as arguments.
3. Replaced the hardcoded IP with the IP retrieved from step 2 (xos_ip).
Change-Id: I1254ecc5560b2ef781035aa86a778fac94d524a8
diff --git a/auto-scale/kafka_broker.py b/auto-scale/kafka_broker.py
new file mode 100644
index 0000000..22604ef
--- /dev/null
+++ b/auto-scale/kafka_broker.py
@@ -0,0 +1,75 @@
+#
+# Copyright 2015 Cisco Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+
+import kafka
+from oslo_utils import netutils
+from six.moves.urllib import parse as urlparse
+import logging as LOG
+
+
+class KafkaBrokerPublisher():  # Kafka helper; despite the name it builds both a producer and a consumer
+    def __init__(self, parsed_url):  # parsed_url: urlparse result, e.g. kafka://host:9092/?topic=x&policy=default
+        self.kafka_client = None  # lazily created client for the producer side
+        self.kafka_server = None  # separate client instance for the consumer side
+        self.kafka_consumer = None
+
+        self.host, self.port = netutils.parse_host_port(
+            parsed_url.netloc, default_port=9092)  # 9092: standard Kafka broker port
+
+        self.local_queue = []  # NOTE(review): created but never used by _send — confirm intent
+
+        params = urlparse.parse_qs(parsed_url.query)
+        self.topic = params.get('topic', ['ceilometer'])[-1]  # last value wins for repeated params
+        self.policy = params.get('policy', ['default'])[-1]
+        self.max_queue_length = int(params.get(
+            'max_queue_length', [1024])[-1])
+        self.max_retry = int(params.get('max_retry', [100])[-1])  # NOTE(review): unused in this file
+
+        if self.policy in ['default', 'drop', 'queue']:
+            LOG.info(('Publishing policy set to %s') % self.policy)
+        else:
+            LOG.warn(('Publishing policy is unknown (%s) force to default')
+                     % self.policy)
+            self.policy = 'default'  # unknown policies fall back to 'default'
+
+        try:
+            self._get_client()
+            self._get_server()
+        except Exception as e:  # connection failure is logged but not fatal to construction
+            LOG.exception("Failed to connect to Kafka service: %s", e)
+
+    def _get_client(self):  # create producer-side client and SimpleProducer (idempotent)
+        if not self.kafka_client:
+            self.kafka_client = kafka.KafkaClient(
+                "%s:%s" % (self.host, self.port))
+            self.kafka_producer = kafka.SimpleProducer(self.kafka_client)
+
+    def _get_server(self):  # create consumer-side client and KafkaConsumer (idempotent)
+        if not self.kafka_server:
+            self.kafka_server = kafka.KafkaClient(
+                "%s:%s" % (self.host, self.port))
+            self.kafka_consumer = kafka.KafkaConsumer(self.topic,bootstrap_servers = ["%s:%s" % (self.host, self.port)])
+
+
+    def _send(self, data):  # JSON-serialize data and publish it to self.topic
+        #for d in data:
+        try:
+            self.kafka_producer.send_messages(
+                self.topic, json.dumps(data))
+        except Exception as e:
+            LOG.exception(("Failed to send sample data: %s"), e)
+            raise  # re-raise so the caller can apply its own retry/drop policy