Even bus connected to Kafka publisher

Change-Id: I44e3d732f9689bfa7b46e274eb062b825645d450
diff --git a/voltha/main.py b/voltha/main.py
index 18ed7ad..6911ae4 100755
--- a/voltha/main.py
+++ b/voltha/main.py
@@ -33,6 +33,7 @@
 from voltha.adapters.loader import AdapterLoader
 from voltha.coordinator import Coordinator
 from voltha.core.core import VolthaCore
+from voltha.northbound.diagnostics import Diagnostics
 from voltha.northbound.grpc.grpc_server import VolthaGrpcServer
 from voltha.northbound.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
 from voltha.northbound.rest.health_check import init_rest_service
@@ -275,7 +276,11 @@
 
             yield registry.register(
                 'kafka_proxy',
-                KafkaProxy(self.args.consul, self.args.kafka)
+                KafkaProxy(
+                    self.args.consul,
+                    self.args.kafka,
+                    config=self.config.get('kafka-proxy', {})
+                )
             ).start()
 
             yield registry.register(
@@ -297,6 +302,11 @@
                 AdapterLoader(config=self.config.get('adapter_loader', {}))
             ).start()
 
+            yield registry.register(
+                'diag',
+                Diagnostics(config=self.config.get('diagnostics', {}))
+            ).start()
+
             self.log.info('started-internal-services')
 
         except Exception as e:
diff --git a/voltha/northbound/diagnostics.py b/voltha/northbound/diagnostics.py
new file mode 100644
index 0000000..1395d2d
--- /dev/null
+++ b/voltha/northbound/diagnostics.py
@@ -0,0 +1,75 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Voltha internal diagnostics
+"""
+import structlog
+import time
+import gc
+
+from simplejson import dumps
+from twisted.internet.defer import Deferred
+from twisted.internet.task import LoopingCall
+from twisted.internet import reactor
+from zope.interface import implementer
+
+from common.event_bus import EventBusClient
+from voltha.registry import IComponent, registry
+
+log = structlog.get_logger()
+
+
+@implementer(IComponent)
+class Diagnostics(object):
+
+    def __init__(self, config):
+        self.config = config
+        self.periodic_check_interval = config.get('periodic_check_interval', 15)
+        self.periodic_checks = None
+        self.event_bus = EventBusClient()
+        self.instance_id = registry('main').get_args().instance_id
+
+    def start(self):
+        log.debug('starting')
+        self.periodic_checks = LoopingCall(self.run_periodic_checks)
+        self.periodic_checks.start(self.periodic_check_interval)
+        log.info('started')
+        return self
+
+    def stop(self):
+        log.debug('stopping')
+        if self.periodic_checks is not None:
+            self.periodic_checks.stop()
+        log.info('stopped')
+
+    def run_periodic_checks(self):
+
+        ts = time.time(),  # TODO switch to '2016.12.10T10:12:32Z' format?
+
+        backlog = dict(
+            ts=ts,
+            object='voltha.{}'.format(self.instance_id),
+            metric='internal-backlog',
+            value=len(gc.get_referrers(Deferred))
+        )
+
+        self.event_bus.publish('kpis', dumps(backlog))
+
+        log.debug('periodic-check', ts=ts)
+
+        Deferred()
diff --git a/voltha/northbound/kafka/event_bus_publisher.py b/voltha/northbound/kafka/event_bus_publisher.py
new file mode 100644
index 0000000..6cd7083
--- /dev/null
+++ b/voltha/northbound/kafka/event_bus_publisher.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A gateway between the internal event bus and the Kafka publisher proxy
+to publish select topics and messages posted to the Voltha-internal event
+bus toward the external world.
+"""
+import structlog
+
+from common.event_bus import EventBusClient
+
+log = structlog.get_logger()
+
+
+class EventBusPublisher(object):
+
+    def __init__(self, kafka_proxy, config):
+        self.kafka_proxy = kafka_proxy
+        self.config = config
+        self.topic_mappings = config.get('topic_mappings', {})
+        self.event_bus = EventBusClient()
+
+    def start(self):
+        log.debug('starting')
+        self._setup_subscriptions(self.topic_mappings)
+        log.info('started')
+        return self
+
+    def stop(self):
+        log.debug('stopping')
+        log.info('stopped')
+
+    def _setup_subscriptions(self, mappings):
+
+        for event_bus_topic, mapping in mappings.iteritems():
+
+            kafka_topic = mapping.get('kafka_topic', None)
+
+            if kafka_topic is None:
+                log.error('no-kafka-topic-in-config',
+                          event_bus_topic=event_bus_topic,
+                          mapping=mapping)
+                continue
+
+            self.event_bus.subscribe(
+                event_bus_topic,
+                # to avoid Python late-binding to the last registered
+                # kafka_topic, we force instant binding with the default arg
+                lambda _, m, k=kafka_topic: self.forward(k, m))
+
+            log.info('event-to-kafka', kafka_topic=kafka_topic,
+                     event_bus_topic=event_bus_topic)
+
+    def forward(self, kafka_topic, msg):
+        self.kafka_proxy.send_message(kafka_topic, msg)
+
diff --git a/voltha/northbound/kafka/kafka_proxy.py b/voltha/northbound/kafka/kafka_proxy.py
index 341f9b1..854531d 100644
--- a/voltha/northbound/kafka/kafka_proxy.py
+++ b/voltha/northbound/kafka/kafka_proxy.py
@@ -20,10 +20,11 @@
 )
 from afkak.producer import Producer as _kafkaProducer
 from structlog import get_logger
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import inlineCallbacks, returnValue
 from zope.interface import implementer
 
 from common.utils.consulhelpers import get_endpoint_from_consul
+from voltha.northbound.kafka.event_bus_publisher import EventBusPublisher
 from voltha.registry import IComponent
 
 log = get_logger()
@@ -36,9 +37,12 @@
     """
     _kafka_instance = None
 
-    def __init__(self, consul_endpoint='localhost:8500',
+    def __init__(self,
+                 consul_endpoint='localhost:8500',
                  kafka_endpoint='localhost:9092',
-                 ack_timeout=1000, max_req_attempts=10):
+                 ack_timeout=1000,
+                 max_req_attempts=10,
+                 config={}):
 
         # return an exception if the object already exist
         if KafkaProxy._kafka_instance:
@@ -51,15 +55,20 @@
         self.max_req_attempts = max_req_attempts
         self.consul_endpoint = consul_endpoint
         self.kafka_endpoint = kafka_endpoint
+        self.config = config
         self.kclient = None
         self.kproducer = None
+        self.event_bus_publisher = None
 
+    @inlineCallbacks
     def start(self):
         log.debug('starting')
         self._get_kafka_producer()
         KafkaProxy._kafka_instance = self
+        self.event_bus_publisher = yield EventBusPublisher(
+            self, self.config.get('event_bus_publisher', {})).start()
         log.info('started')
-        return self
+        returnValue(self)
 
     def stop(self):
         log.debug('stopping')
diff --git a/voltha/voltha.yml b/voltha/voltha.yml
index e5d7e18..a8d8119 100644
--- a/voltha/voltha.yml
+++ b/voltha/voltha.yml
@@ -59,3 +59,16 @@
 leader:
     workload_track_error_to_prevent_flood: 1
     members_track_error_to_prevent_flood: 1
+
+kafka-proxy:
+    event_bus_publisher:
+        topic_mappings:
+            'model-change-events':
+                kafka_topic: 'voltha.events'
+                filters:     [null]
+            'alarms':
+                kafka_topic: 'voltha.alarms'
+                filters:     [null]
+            'kpis':
+                kafka_topic: 'voltha.kpis'
+                filters:     [null]