Adding CORD specific ceilometer changes to monitoring repository
- ceilometer custom notification plugins for ONOS, vSG, vOLT and Infra layers
- ceilometer publish/subscribe module
- ceilometer dynamic pipeline config module
- ceilometer UDP proxy
- ceilometer Custom Image(ceilometer -v2 -v3 versions,kafka_installer,startup scripts)
Change-Id: Ie2ab8ce89cdadbd1fb4dc54ee15e46f8cc8c4c18
diff --git a/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/kafka_broker.py b/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/kafka_broker.py
new file mode 100644
index 0000000..ce495fc
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/kafka_broker.py
@@ -0,0 +1,164 @@
+#
+# Copyright 2015 Cisco Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import json
+
+import kafka
+from oslo_config import cfg
+from oslo_utils import netutils
+from six.moves.urllib import parse as urlparse
+import logging as LOG
+
+
+class KafkaBrokerPublisher():
+ def __init__(self, parsed_url):
+ self.kafka_client = None
+ self.kafka_server = None
+
+ self.host, self.port = netutils.parse_host_port(
+ parsed_url.netloc, default_port=9092)
+
+ self.local_queue = []
+
+ params = urlparse.parse_qs(parsed_url.query)
+ self.topic = params.get('topic', ['ceilometer'])[-1]
+ self.policy = params.get('policy', ['default'])[-1]
+ self.max_queue_length = int(params.get(
+ 'max_queue_length', [1024])[-1])
+ self.max_retry = int(params.get('max_retry', [100])[-1])
+
+ if self.policy in ['default', 'drop', 'queue']:
+ LOG.info(('Publishing policy set to %s') % self.policy)
+ else:
+ LOG.warn(('Publishing policy is unknown (%s) force to default')
+ % self.policy)
+ self.policy = 'default'
+
+ try:
+ self._get_client()
+ self._get_server()
+ except Exception as e:
+ LOG.exception("Failed to connect to Kafka service: %s", e)
+
+ def publish_samples(self, context, samples):
+ """Send a metering message for kafka broker.
+
+ :param context: Execution context from the service or RPC call
+ :param samples: Samples from pipeline after transformation
+ """
+ samples_list = [
+ utils.meter_message_from_counter(
+ sample, cfg.CONF.publisher.telemetry_secret)
+ for sample in samples
+ ]
+
+ self.local_queue.append(samples_list)
+
+ try:
+ self._check_kafka_connection()
+ except Exception as e:
+ raise e
+
+ self.flush()
+
+ def flush(self):
+ queue = self.local_queue
+ self.local_queue = self._process_queue(queue)
+ if self.policy == 'queue':
+ self._check_queue_length()
+
+ def publish_events(self, context, events):
+ """Send an event message for kafka broker.
+
+ :param context: Execution context from the service or RPC call
+ :param events: events from pipeline after transformation
+ """
+ events_list = [utils.message_from_event(
+ event, cfg.CONF.publisher.telemetry_secret) for event in events]
+
+ self.local_queue.append(events_list)
+
+ try:
+ self._check_kafka_connection()
+ except Exception as e:
+ raise e
+
+ self.flush()
+
+ def _process_queue(self, queue):
+ current_retry = 0
+ while queue:
+ data = queue[0]
+ try:
+ self._send(data)
+ except Exception:
+ LOG.warn(("Failed to publish %d datum"),
+ sum([len(d) for d in queue]))
+ if self.policy == 'queue':
+ return queue
+ elif self.policy == 'drop':
+ return []
+ current_retry += 1
+ if current_retry >= self.max_retry:
+ self.local_queue = []
+ LOG.exception(("Failed to retry to send sample data "
+ "with max_retry times"))
+ raise
+ else:
+ queue.pop(0)
+ return []
+
+ def _check_queue_length(self):
+ queue_length = len(self.local_queue)
+ if queue_length > self.max_queue_length > 0:
+ diff = queue_length - self.max_queue_length
+ self.local_queue = self.local_queue[diff:]
+ LOG.warn(("Kafka Publisher max local queue length is exceeded, "
+ "dropping %d oldest data") % diff)
+
+ def _check_kafka_connection(self):
+ try:
+ self._get_client()
+ except Exception as e:
+ LOG.exception(_LE("Failed to connect to Kafka service: %s"), e)
+
+ if self.policy == 'queue':
+ self._check_queue_length()
+ else:
+ self.local_queue = []
+ raise Exception('Kafka Client is not available, '
+ 'please restart Kafka client')
+
+ def _get_client(self):
+ if not self.kafka_client:
+ self.kafka_client = kafka.KafkaClient(
+ "%s:%s" % (self.host, self.port))
+ self.kafka_producer = kafka.SimpleProducer(self.kafka_client)
+
+ def _get_server(self):
+ if not self.kafka_server:
+ self.kafka_server = kafka.KafkaClient(
+ "%s:%s" % (self.host, self.port))
+ self.kafka_consumer = kafka.KafkaConsumer(self.topic,bootstrap_servers = ["%s:%s" % (self.host, self.port)])
+
+
+ def _send(self, data):
+ #for d in data:
+ try:
+ self.kafka_producer.send_messages(
+ self.topic, json.dumps(data))
+ except Exception as e:
+ LOG.exception(("Failed to send sample data: %s"), e)
+ raise
diff --git a/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/kafka_client.py b/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/kafka_client.py
new file mode 100644
index 0000000..4d7cff0
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/kafka_client.py
@@ -0,0 +1,20 @@
+import kafka
+import kafka_broker
+from oslo_utils import netutils
+import logging
+
+def read_notification_from_ceilometer_over_kafka(parse_target):
+ logging.info("Kafka target:%s",parse_target)
+ try :
+ kafka_publisher=kafka_broker.KafkaBrokerPublisher(parse_target)
+ for message in kafka_publisher.kafka_consumer:
+ #print message.value
+ logging.info("%s",message.value)
+ #print status
+ except Exception as e:
+ logging.error("Error in Kafka setup:%s ",e.__str__())
+
+ceilometer_client="kafka://10.11.10.1:9092?topic=test"
+logging.basicConfig(format='%(asctime)s %(filename)s %(levelname)s %(message)s',filename='kafka_client.log',level=logging.INFO)
+parse_target=netutils.urlsplit(ceilometer_client)
+read_notification_from_ceilometer_over_kafka(parse_target)
diff --git a/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/udp_client_cpu.py b/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/udp_client_cpu.py
new file mode 100644
index 0000000..1c30d63
--- /dev/null
+++ b/xos/synchronizer/ceilometer/ceilometer_pub_sub/test/udp_client_cpu.py
@@ -0,0 +1,22 @@
+import socket
+import msgpack
+from oslo_utils import units
+import logging
+UDP_IP = "10.11.10.1"
+UDP_PORT = 5006
+
+logging.basicConfig(format='%(asctime)s %(filename)s %(levelname)s %(message)s',filename='udp_client.log',level=logging.INFO)
+udp = socket.socket(socket.AF_INET, # Internet
+ socket.SOCK_DGRAM) # UDP
+udp.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+udp.bind((UDP_IP, UDP_PORT))
+while True:
+ data, source = udp.recvfrom(64 * units.Ki)
+ #print data
+ #try:
+ sample = msgpack.loads(data, encoding='utf-8')
+ logging.info("%s",sample)
+ print sample
+ #except Exception:
+ #logging.info("%s",sample)
+ # print ("UDP: Cannot decode data sent by %s"), source