Even bus connected to Kafka publisher
Change-Id: I44e3d732f9689bfa7b46e274eb062b825645d450
diff --git a/voltha/main.py b/voltha/main.py
index 18ed7ad..6911ae4 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -33,6 +33,7 @@
from voltha.adapters.loader import AdapterLoader
from voltha.coordinator import Coordinator
from voltha.core.core import VolthaCore
+from voltha.northbound.diagnostics import Diagnostics
from voltha.northbound.grpc.grpc_server import VolthaGrpcServer
from voltha.northbound.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
from voltha.northbound.rest.health_check import init_rest_service
@@ -275,7 +276,11 @@
yield registry.register(
'kafka_proxy',
- KafkaProxy(self.args.consul, self.args.kafka)
+ KafkaProxy(
+ self.args.consul,
+ self.args.kafka,
+ config=self.config.get('kafka-proxy', {})
+ )
).start()
yield registry.register(
@@ -297,6 +302,11 @@
AdapterLoader(config=self.config.get('adapter_loader', {}))
).start()
+ yield registry.register(
+ 'diag',
+ Diagnostics(config=self.config.get('diagnostics', {}))
+ ).start()
+
self.log.info('started-internal-services')
except Exception as e:
diff --git a/voltha/northbound/diagnostics.py b/voltha/northbound/diagnostics.py
new file mode 100644
index 0000000..1395d2d
--- /dev/null
+++ b/voltha/northbound/diagnostics.py
@@ -0,0 +1,75 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Voltha internal diagnostics
+"""
+import structlog
+import time
+import gc
+
+from simplejson import dumps
+from twisted.internet.defer import Deferred
+from twisted.internet.task import LoopingCall
+from twisted.internet import reactor
+from zope.interface import implementer
+
+from common.event_bus import EventBusClient
+from voltha.registry import IComponent, registry
+
+log = structlog.get_logger()
+
+
+@implementer(IComponent)
+class Diagnostics(object):
+
+ def __init__(self, config):
+ self.config = config
+ self.periodic_check_interval = config.get('periodic_check_interval', 15)
+ self.periodic_checks = None
+ self.event_bus = EventBusClient()
+ self.instance_id = registry('main').get_args().instance_id
+
+ def start(self):
+ log.debug('starting')
+ self.periodic_checks = LoopingCall(self.run_periodic_checks)
+ self.periodic_checks.start(self.periodic_check_interval)
+ log.info('started')
+ return self
+
+ def stop(self):
+ log.debug('stopping')
+ if self.periodic_checks is not None:
+ self.periodic_checks.stop()
+ log.info('stopped')
+
+ def run_periodic_checks(self):
+
+ ts = time.time(), # TODO switch to '2016.12.10T10:12:32Z' format?
+
+ backlog = dict(
+ ts=ts,
+ object='voltha.{}'.format(self.instance_id),
+ metric='internal-backlog',
+ value=len(gc.get_referrers(Deferred))
+ )
+
+ self.event_bus.publish('kpis', dumps(backlog))
+
+ log.debug('periodic-check', ts=ts)
+
+ Deferred()
diff --git a/voltha/northbound/kafka/event_bus_publisher.py b/voltha/northbound/kafka/event_bus_publisher.py
new file mode 100644
index 0000000..6cd7083
--- /dev/null
+++ b/voltha/northbound/kafka/event_bus_publisher.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A gateway between the internal event bus and the Kafka publisher proxy
+to publish select topics and messages posted to the Voltha-internal event
+bus toward the external world.
+"""
+import structlog
+
+from common.event_bus import EventBusClient
+
+log = structlog.get_logger()
+
+
+class EventBusPublisher(object):
+
+ def __init__(self, kafka_proxy, config):
+ self.kafka_proxy = kafka_proxy
+ self.config = config
+ self.topic_mappings = config.get('topic_mappings', {})
+ self.event_bus = EventBusClient()
+
+ def start(self):
+ log.debug('starting')
+ self._setup_subscriptions(self.topic_mappings)
+ log.info('started')
+ return self
+
+ def stop(self):
+ log.debug('stopping')
+ log.info('stopped')
+
+ def _setup_subscriptions(self, mappings):
+
+ for event_bus_topic, mapping in mappings.iteritems():
+
+ kafka_topic = mapping.get('kafka_topic', None)
+
+ if kafka_topic is None:
+ log.error('no-kafka-topic-in-config',
+ event_bus_topic=event_bus_topic,
+ mapping=mapping)
+ continue
+
+ self.event_bus.subscribe(
+ event_bus_topic,
+ # to avoid Python late-binding to the last registered
+ # kafka_topic, we force instant binding with the default arg
+ lambda _, m, k=kafka_topic: self.forward(k, m))
+
+ log.info('event-to-kafka', kafka_topic=kafka_topic,
+ event_bus_topic=event_bus_topic)
+
+ def forward(self, kafka_topic, msg):
+ self.kafka_proxy.send_message(kafka_topic, msg)
+
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index 341f9b1..854531d 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -20,10 +20,11 @@
)
from afkak.producer import Producer as _kafkaProducer
from structlog import get_logger
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import inlineCallbacks, returnValue
from zope.interface import implementer
from common.utils.consulhelpers import get_endpoint_from_consul
+from voltha.northbound.kafka.event_bus_publisher import EventBusPublisher
from voltha.registry import IComponent
log = get_logger()
@@ -36,9 +37,12 @@
"""
_kafka_instance = None
- def __init__(self, consul_endpoint='localhost:8500',
+ def __init__(self,
+ consul_endpoint='localhost:8500',
kafka_endpoint='localhost:9092',
- ack_timeout=1000, max_req_attempts=10):
+ ack_timeout=1000,
+ max_req_attempts=10,
+ config={}):
# return an exception if the object already exist
if KafkaProxy._kafka_instance:
@@ -51,15 +55,20 @@
self.max_req_attempts = max_req_attempts
self.consul_endpoint = consul_endpoint
self.kafka_endpoint = kafka_endpoint
+ self.config = config
self.kclient = None
self.kproducer = None
+ self.event_bus_publisher = None
+ @inlineCallbacks
def start(self):
log.debug('starting')
self._get_kafka_producer()
KafkaProxy._kafka_instance = self
+ self.event_bus_publisher = yield EventBusPublisher(
+ self, self.config.get('event_bus_publisher', {})).start()
log.info('started')
- return self
+ returnValue(self)
def stop(self):
log.debug('stopping')
diff --git a/voltha/voltha.yml b/voltha/voltha.yml
index e5d7e18..a8d8119 100644
--- a/voltha/voltha.yml
+++ b/voltha/voltha.yml
@@ -59,3 +59,16 @@
leader:
workload_track_error_to_prevent_flood: 1
members_track_error_to_prevent_flood: 1
+
+kafka-proxy:
+ event_bus_publisher:
+ topic_mappings:
+ 'model-change-events':
+ kafka_topic: 'voltha.events'
+ filters: [null]
+ 'alarms':
+ kafka_topic: 'voltha.alarms'
+ filters: [null]
+ 'kpis':
+ kafka_topic: 'voltha.kpis'
+ filters: [null]