VOL-1451 Initial checkin of openonu build

Produced a docker container capable of building and running
openonu/brcm_openomci_onu.  Copied over the current ONU code
and resolved all imports by copying them into the local source tree.

Change-Id: Ib9785d37afc65b7d32ecf74aee2456352626e2b6
diff --git a/python/kafka/__init__.py b/python/kafka/__init__.py
new file mode 100644
index 0000000..58aca1e
--- /dev/null
+++ b/python/kafka/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/python/kafka/adapter_proxy.py b/python/kafka/adapter_proxy.py
new file mode 100644
index 0000000..ddb11da
--- /dev/null
+++ b/python/kafka/adapter_proxy.py
@@ -0,0 +1,110 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Agent that acts as a gateway between adapters.
+"""
+
+import structlog
+from uuid import uuid4
+from twisted.internet.defer import inlineCallbacks, returnValue
+from container_proxy import ContainerProxy
+from voltha.protos import third_party
+from voltha.protos.inter_container_pb2 import InterAdapterHeader, \
+    InterAdapterMessage
+import time
+
+_ = third_party
+log = structlog.get_logger()
+
+
+class AdapterProxy(ContainerProxy):
+
+    def __init__(self, kafka_proxy, core_topic, my_listening_topic):
+        super(AdapterProxy, self).__init__(kafka_proxy,
+                                           core_topic,
+                                           my_listening_topic)
+
+    def _to_string(self, unicode_str):
+        if unicode_str is not None:
+            if type(unicode_str) == unicode:
+                return unicode_str.encode('ascii', 'ignore')
+            else:
+                return unicode_str
+        else:
+            return ""
+
+    @ContainerProxy.wrap_request(None)
+    @inlineCallbacks
+    def send_inter_adapter_message(self,
+                                   msg,
+                                   type,
+                                   from_adapter,
+                                   to_adapter,
+                                   to_device_id=None,
+                                   proxy_device_id=None,
+                                   message_id=None):
+        """
+        Sends a message directly to an adapter. This is typically used to send
+        proxied messages from one adapter to another.  An initial ACK response
+        is sent back to the invoking adapter.  If there is a subsequent
+        response to be sent back (async) then the adapter receiving this
+        request will use this same API to send back the async response.
+        :param msg: protobuf message to send
+        :param type: InterAdapterMessageType of the message to send
+        :param from_adapter: Name of the adapter making the request.
+        :param to_adapter: Name of the remote adapter.
+        :param to_device_id: The ID of the device for which the message is
+        intended. If it is None then the message is not intended for a
+        specific device.  Its interpretation is adapter specific.
+        :param proxy_device_id: The ID of the device which will proxy that
+        message. If it's None then there is no specific device to proxy the
+        message.  Its interpretation is adapter specific.
+        :param message_id: A unique number for this transaction that the
+        adapter may use to correlate a request and an async response.
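+
+        Example (an illustrative sketch; the message type value and all
+        names below are hypothetical):
+
+            res = yield self.send_inter_adapter_message(
+                msg=omci_msg,  # any protobuf message
+                type=InterAdapterMessageType.OMCI_REQUEST,
+                from_adapter="adapter_a",
+                to_adapter="adapter_b",
+                to_device_id="device-1234")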
+        """
+
+        try:
+            # validate params
+            assert msg
+            assert from_adapter
+            assert to_adapter
+
+            # Build the inter adapter message
+            h = InterAdapterHeader()
+            h.type = type
+            h.from_topic = self._to_string(from_adapter)
+            h.to_topic = self._to_string(to_adapter)
+            h.to_device_id = self._to_string(to_device_id)
+            h.proxy_device_id = self._to_string(proxy_device_id)
+
+            if message_id:
+                h.id = self._to_string(message_id)
+            else:
+                h.id = uuid4().hex
+
+            h.timestamp = int(round(time.time() * 1000))
+            iaMsg = InterAdapterMessage()
+            iaMsg.header.CopyFrom(h)
+            iaMsg.body.Pack(msg)
+
+            log.debug("sending-inter-adapter-message", header=iaMsg.header)
+            res = yield self.invoke(rpc="process_inter_adapter_message",
+                                    to_topic=iaMsg.header.to_topic,
+                                    msg=iaMsg)
+            returnValue(res)
+        except Exception as e:
+            log.exception("error-sending-request", e=e)
diff --git a/python/kafka/adapter_request_facade.py b/python/kafka/adapter_request_facade.py
new file mode 100644
index 0000000..7bf41e5
--- /dev/null
+++ b/python/kafka/adapter_request_facade.py
@@ -0,0 +1,337 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+This facade handles Kafka-formatted messages from the Core, strips the
+Kafka formatting, and forwards the request to the concrete handler.
+"""
+import structlog
+from twisted.internet.defer import inlineCallbacks
+from zope.interface import implementer
+from twisted.internet import reactor
+
+from afkak.consumer import OFFSET_LATEST, OFFSET_EARLIEST
+from voltha.adapters.interface import IAdapterInterface
+from voltha.protos.inter_container_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
+from voltha.protos.device_pb2 import Device, ImageDownload
+from voltha.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
+    FlowGroupChanges, ofp_packet_out
+from kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
+    get_messaging_proxy, KAFKA_OFFSET_LATEST, KAFKA_OFFSET_EARLIEST
+
+log = structlog.get_logger()
+
+class MacAddressError(BaseException):
+    def __init__(self, error):
+        self.error = error
+
+
+class IDError(BaseException):
+    def __init__(self, error):
+        self.error = error
+
+
+@implementer(IAdapterInterface)
+class AdapterRequestFacade(object):
+    """
+    Gate-keeper between CORE and device adapters.
+
+    On one side it interacts with Core's internal model and update/dispatch
+    mechanisms.
+
+    On the other side, it interacts with the adapters' standard interface as
+    defined in IAdapterInterface.
+    """
+
+    def __init__(self, adapter):
+        self.adapter = adapter
+
+    def start(self):
+        log.debug('starting')
+
+    def stop(self):
+        log.debug('stopping')
+
+    @inlineCallbacks
+    def createKafkaDeviceTopic(self, deviceId):
+        log.debug("subscribing-to-topic", device_id=deviceId)
+        kafka_proxy = get_messaging_proxy()
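+        # The per-device topic is the default topic suffixed with the
+        # device id, e.g. (illustrative values) a default topic "openonu"
+        # and device id "abc12" yield "openonu_abc12".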
+        device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
+        # yield kafka_proxy.create_topic(topic=device_topic)
+        yield kafka_proxy.subscribe(topic=device_topic,
+                                    group_id=device_topic,
+                                    target_cls=self,
+                                    offset=KAFKA_OFFSET_EARLIEST)
+        log.debug("subscribed-to-topic", topic=device_topic)
+
+    def adopt_device(self, device):
+        d = Device()
+        if device:
+            device.Unpack(d)
+
+            # Start the creation of a device specific topic to handle all
+            # subsequent requests from the Core. This adapter instance will
+            # handle all requests for that device.
+            reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
+
+            result = self.adapter.adopt_device(d)
+            return True, result
+        else:
+            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                reason="device-invalid")
+
+    def get_ofp_device_info(self, device):
+        d = Device()
+        if device:
+            device.Unpack(d)
+            return True, self.adapter.get_ofp_device_info(d)
+        else:
+            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                reason="device-invalid")
+
+    def get_ofp_port_info(self, device, port_no):
+        d = Device()
+        if device:
+            device.Unpack(d)
+        else:
+            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                reason="device-invalid")
+        p = IntType()
+        if port_no:
+            port_no.Unpack(p)
+        else:
+            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                reason="port-no-invalid")
+
+        return True, self.adapter.get_ofp_port_info(d, p.val)
+
+    def reconcile_device(self, device):
+        return self.adapter.reconcile_device(device)
+
+    def abandon_device(self, device):
+        return self.adapter.abandon_device(device)
+
+    def disable_device(self, device):
+        d = Device()
+        if device:
+            device.Unpack(d)
+            return True, self.adapter.disable_device(d)
+        else:
+            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                reason="device-invalid")
+
+    def reenable_device(self, device):
+        d = Device()
+        if device:
+            device.Unpack(d)
+            return True, self.adapter.reenable_device(d)
+        else:
+            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                reason="device-invalid")
+
+    def reboot_device(self, device):
+        d = Device()
+        if device:
+            device.Unpack(d)
+            return (True, self.adapter.reboot_device(d))
+        else:
+            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                reason="device-invalid")
+
+    def download_image(self, device, request):
+        d = Device()
+        if device:
+            device.Unpack(d)
+        else:
+            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                reason="device-invalid")
+        img = ImageDownload()
+        if request:
+            request.Unpack(img)
+        else:
+            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                reason="port-no-invalid")
+
+        return True, self.adapter.download_image(device, request)
+
+    def get_image_download_status(self, device, request):
+        d = Device()
+        if device:
+            device.Unpack(d)
+        else:
+            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                reason="device-invalid")
+        img = ImageDownload()
+        if request:
+            request.Unpack(img)
+        else:
+            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                reason="port-no-invalid")
+
+        return True, self.adapter.get_image_download_status(device, request)
+
+    def cancel_image_download(self, device, request):
+        d = Device()
+        if device:
+            device.Unpack(d)
+        else:
+            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                reason="device-invalid")
+        img = ImageDownload()
+        if request:
+            request.Unpack(img)
+        else:
+            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                reason="port-no-invalid")
+
+        return True, self.adapter.cancel_image_download(device, request)
+
+    def activate_image_update(self, device, request):
+        d = Device()
+        if device:
+            device.Unpack(d)
+        else:
+            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                reason="device-invalid")
+        img = ImageDownload()
+        if request:
+            request.Unpack(img)
+        else:
+            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                reason="port-no-invalid")
+
+        return True, self.adapter.activate_image_update(device, request)
+
+    def revert_image_update(self, device, request):
+        d = Device()
+        if device:
+            device.Unpack(d)
+        else:
+            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                reason="device-invalid")
+        img = ImageDownload()
+        if request:
+            request.Unpack(img)
+        else:
+            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                reason="port-no-invalid")
+
+        return True, self.adapter.revert_image_update(device, request)
+
+
+    def self_test(self, device):
+        return self.adapter.self_test_device(device)
+
+    def delete_device(self, device):
+        d = Device()
+        if device:
+            device.Unpack(d)
+            result = self.adapter.delete_device(d)
+
+            # Before we return, delete the device specific topic as we will
+            # no longer receive requests from the Core for that device.  The
+            # separator must match the one used when the topic was created
+            # in createKafkaDeviceTopic ("_").
+            kafka_proxy = get_messaging_proxy()
+            device_topic = kafka_proxy.get_default_topic() + "_" + d.id
+            kafka_proxy.unsubscribe(topic=device_topic)
+
+            return (True, result)
+        else:
+            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                reason="device-invalid")
+
+    def get_device_details(self, device):
+        return self.adapter.get_device_details(device)
+
+    def update_flows_bulk(self, device, flows, groups):
+        d = Device()
+        if device:
+            device.Unpack(d)
+        else:
+            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                reason="device-invalid")
+        f = Flows()
+        if flows:
+            flows.Unpack(f)
+
+        g = FlowGroups()
+        if groups:
+            groups.Unpack(g)
+
+        return (True, self.adapter.update_flows_bulk(d, f, g))
+
+    def update_flows_incrementally(self, device, flow_changes, group_changes):
+        d = Device()
+        if device:
+            device.Unpack(d)
+        else:
+            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                reason="device-invalid")
+        f = FlowChanges()
+        if flow_changes:
+            flow_changes.Unpack(f)
+
+        g = FlowGroupChanges()
+        if group_changes:
+            group_changes.Unpack(g)
+
+        return (True, self.adapter.update_flows_incrementally(d, f, g))
+
+    def suppress_alarm(self, filter):
+        return self.adapter.suppress_alarm(filter)
+
+    def unsuppress_alarm(self, filter):
+        return self.adapter.unsuppress_alarm(filter)
+
+    def process_inter_adapter_message(self, msg):
+        m = InterAdapterMessage()
+        if msg:
+            msg.Unpack(m)
+        else:
+            return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                reason="msg-invalid")
+
+        return (True, self.adapter.process_inter_adapter_message(m))
+
+
+    def receive_packet_out(self, deviceId, outPort, packet):
+        try:
+            d_id = StrType()
+            if deviceId:
+                deviceId.Unpack(d_id)
+            else:
+                return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                    reason="deviceid-invalid")
+
+            op = IntType()
+            if outPort:
+                outPort.Unpack(op)
+            else:
+                return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                    reason="outport-invalid")
+
+            p = ofp_packet_out()
+            if packet:
+                packet.Unpack(p)
+            else:
+                return False, Error(code=ErrorCode.INVALID_PARAMETERS,
+                                    reason="packet-invalid")
+
+            return (True, self.adapter.receive_packet_out(d_id.val, op.val, p))
+        except Exception as e:
+            log.exception("error-processing-receive_packet_out", e=e)
+
diff --git a/python/kafka/container_proxy.py b/python/kafka/container_proxy.py
new file mode 100644
index 0000000..d7f18b4
--- /dev/null
+++ b/python/kafka/container_proxy.py
@@ -0,0 +1,133 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+The base class for all proxies that communicate over the Kafka bus.
+"""
+
+import structlog
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.python import failure
+from zope.interface import implementer
+
+from common.utils.deferred_utils import DeferredWithTimeout, \
+    TimeOutError
+from voltha.core.registry import IComponent
+
+log = structlog.get_logger()
+
+
+class KafkaMessagingError(BaseException):
+    def __init__(self, error):
+        self.error = error
+
+
+@implementer(IComponent)
+class ContainerProxy(object):
+
+    def __init__(self, kafka_proxy, core_topic, my_listening_topic):
+        self.kafka_proxy = kafka_proxy
+        self.listening_topic = my_listening_topic
+        self.core_topic = core_topic
+        self.default_timeout = 3
+
+    def start(self):
+        log.info('started')
+
+        return self
+
+    def stop(self):
+        log.info('stopped')
+
+    @classmethod
+    def wrap_request(cls, return_cls):
+        def real_wrapper(func):
+            @inlineCallbacks
+            def wrapper(*args, **kw):
+                try:
+                    (success, d) = yield func(*args, **kw)
+                    if success:
+                        log.debug("successful-response", func=func, val=d)
+                        if return_cls is not None:
+                            rc = return_cls()
+                            if d is not None:
+                                d.Unpack(rc)
+                            returnValue(rc)
+                        else:
+                            log.debug("successful-response-none", func=func,
+                                      val=None)
+                            returnValue(None)
+                    else:
+                        log.warn("unsuccessful-request", func=func, args=args,
+                                 kw=kw)
+                        returnValue(d)
+                except Exception as e:
+                    log.exception("request-wrapper-exception", func=func, e=e)
+                    raise
+
+            return wrapper
+
+        return real_wrapper
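+
+    # Usage sketch (see CoreProxy.get_device for a real instance): wrapping
+    # a method whose result unpacks as (success, packed_msg) lets callers
+    # receive an unpacked protobuf instance directly:
+    #
+    #     @ContainerProxy.wrap_request(Device)
+    #     @inlineCallbacks
+    #     def get_device(self, device_id):
+    #         res = yield self.invoke(rpc="GetDevice", device_id=device_id)
+    #         returnValue(res)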
+
+    @inlineCallbacks
+    def invoke(self, rpc, to_topic=None, reply_topic=None, **kwargs):
+        @inlineCallbacks
+        def _send_request(rpc, m_callback, to_topic, reply_topic, **kwargs):
+            try:
+                log.debug("sending-request",
+                          rpc=rpc,
+                          to_topic=to_topic,
+                          reply_topic=reply_topic)
+                if to_topic is None:
+                    to_topic = self.core_topic
+                if reply_topic is None:
+                    reply_topic = self.listening_topic
+                result = yield self.kafka_proxy.send_request(rpc=rpc,
+                                                             to_topic=to_topic,
+                                                             reply_topic=reply_topic,
+                                                             callback=None,
+                                                             **kwargs)
+                if not m_callback.called:
+                    m_callback.callback(result)
+                else:
+                    log.debug('timeout-already-occurred', rpc=rpc)
+            except Exception as e:
+                log.exception("Failure-sending-request", rpc=rpc, kw=kwargs)
+                if not m_callback.called:
+                    m_callback.errback(failure.Failure())
+
+        # We are going to resend the request on the to_topic if there is a
+        # timeout error. This time the timeout will be longer.  If the second
+        # request times out then we will send the request to the default
+        # core_topic.
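+        # For example, with default_timeout=3 this yields attempt 1 (3s)
+        # and attempt 2 (6s) on to_topic, then attempt 3 (3s) on the
+        # default core_topic.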
+        timeouts = [self.default_timeout,
+                    self.default_timeout*2,
+                    self.default_timeout]
+        retry = 0
+        max_retry = 2
+        for timeout in timeouts:
+            cb = DeferredWithTimeout(timeout=timeout)
+            _send_request(rpc, cb, to_topic, reply_topic, **kwargs)
+            try:
+                res = yield cb
+                returnValue(res)
+            except TimeOutError as e:
+                log.warn('invoke-timeout', e=e)
+                if retry == max_retry:
+                    raise e
+                retry += 1
+                if retry == max_retry:
+                    to_topic = self.core_topic
diff --git a/python/kafka/core_proxy.py b/python/kafka/core_proxy.py
new file mode 100644
index 0000000..4f4579b
--- /dev/null
+++ b/python/kafka/core_proxy.py
@@ -0,0 +1,344 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Agent to play gateway between CORE and an adapter.
+"""
+import structlog
+from google.protobuf.message import Message
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+from container_proxy import ContainerProxy
+from voltha.protos.common_pb2 import ID, ConnectStatus, OperStatus
+from voltha.protos.inter_container_pb2 import StrType, BoolType, IntType, Packet
+from voltha.protos.device_pb2 import Device, Ports
+from voltha.protos.voltha_pb2 import CoreInstance
+
+log = structlog.get_logger()
+
+
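+# Joins its arguments with "_" to form a sub-topic name; for example
+# (illustrative values) createSubTopic("rwcore", "abc12") returns
+# "rwcore_abc12".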
+def createSubTopic(*args):
+    return '_'.join(args)
+
+class CoreProxy(ContainerProxy):
+
+    def __init__(self, kafka_proxy, core_topic, my_listening_topic):
+        super(CoreProxy, self).__init__(kafka_proxy, core_topic,
+                                        my_listening_topic)
+
+    @ContainerProxy.wrap_request(CoreInstance)
+    @inlineCallbacks
+    def register(self, adapter, deviceTypes):
+        log.debug("register")
+        try:
+            res = yield self.invoke(rpc="Register",
+                                    adapter=adapter,
+                                    deviceTypes=deviceTypes)
+            log.info("registration-returned", res=res)
+            returnValue(res)
+        except Exception as e:
+            log.exception("registration-exception", e=e)
+            raise
+
+    @ContainerProxy.wrap_request(Device)
+    @inlineCallbacks
+    def get_device(self, device_id):
+        log.debug("get-device")
+        id = ID()
+        id.id = device_id
+        # Once we have a device being managed, all communications between the
+        # the adapter and the core occurs over a topic associated with that
+        # device
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
+        res = yield self.invoke(rpc="GetDevice",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
+                                device_id=id)
+        returnValue(res)
+
+    @ContainerProxy.wrap_request(Device)
+    @inlineCallbacks
+    def get_child_device(self, parent_device_id, **kwargs):
+        raise NotImplementedError()
+
+    @ContainerProxy.wrap_request(Ports)
+    @inlineCallbacks
+    def get_ports(self, device_id, port_type):
+        id = ID()
+        id.id = device_id
+        p_type = IntType()
+        p_type.val = port_type
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
+        res = yield self.invoke(rpc="GetPorts",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
+                                device_id=id,
+                                port_type=p_type)
+        returnValue(res)
+
+    def get_child_devices(self, parent_device_id):
+        raise NotImplementedError()
+
+    def get_child_device_with_proxy_address(self, proxy_address):
+        raise NotImplementedError()
+
+    def _to_proto(self, **kwargs):
+        encoded = {}
+        for k, v in kwargs.iteritems():
+            if isinstance(v, Message):
+                encoded[k] = v
+            elif type(v) == int:
+                i_proto = IntType()
+                i_proto.val = v
+                encoded[k] = i_proto
+            elif type(v) == str:
+                s_proto = StrType()
+                s_proto.val = v
+                encoded[k] = s_proto
+            elif type(v) == bool:
+                b_proto = BoolType()
+                b_proto.val = v
+                encoded[k] = b_proto
+        return encoded
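+
+    # For example (illustrative values), _to_proto(serial="AB", port=1,
+    # up=True) returns {"serial": StrType(val="AB"), "port": IntType(val=1),
+    # "up": BoolType(val=True)}; values of any other type are silently
+    # dropped.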
+
+    @ContainerProxy.wrap_request(None)
+    @inlineCallbacks
+    def child_device_detected(self,
+                              parent_device_id,
+                              parent_port_no,
+                              child_device_type,
+                              channel_id,
+                              **kw):
+        id = ID()
+        id.id = parent_device_id
+        ppn = IntType()
+        ppn.val = parent_port_no
+        cdt = StrType()
+        cdt.val = child_device_type
+        channel = IntType()
+        channel.val = channel_id
+        to_topic = createSubTopic(self.core_topic, parent_device_id)
+        reply_topic = createSubTopic(self.listening_topic, parent_device_id)
+        args = self._to_proto(**kw)
+        res = yield self.invoke(rpc="ChildDeviceDetected",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
+                                parent_device_id=id,
+                                parent_port_no=ppn,
+                                child_device_type=cdt,
+                                channel_id=channel,
+                                **args)
+        returnValue(res)
+
+    @ContainerProxy.wrap_request(None)
+    @inlineCallbacks
+    def device_update(self, device):
+        log.debug("device_update")
+        to_topic = createSubTopic(self.core_topic, device.id)
+        reply_topic = createSubTopic(self.listening_topic, device.id)
+        res = yield self.invoke(rpc="DeviceUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
+                                device=device)
+        returnValue(res)
+
+    def child_device_removed(self, parent_device_id, child_device_id):
+        raise NotImplementedError()
+
+    @ContainerProxy.wrap_request(None)
+    @inlineCallbacks
+    def device_state_update(self, device_id,
+                            oper_status=None,
+                            connect_status=None):
+        id = ID()
+        id.id = device_id
+        o_status = IntType()
+        if oper_status or oper_status == OperStatus.UNKNOWN:
+            o_status.val = oper_status
+        else:
+            o_status.val = -1
+        c_status = IntType()
+        if connect_status or connect_status == ConnectStatus.UNKNOWN:
+            c_status.val = connect_status
+        else:
+            c_status.val = -1
+
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
+        res = yield self.invoke(rpc="DeviceStateUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
+                                device_id=id,
+                                oper_status=o_status,
+                                connect_status=c_status)
+        returnValue(res)
+
+    @ContainerProxy.wrap_request(None)
+    @inlineCallbacks
+    def children_state_update(self, device_id,
+                              oper_status=None,
+                              connect_status=None):
+        id = ID()
+        id.id = device_id
+        o_status = IntType()
+        if oper_status or oper_status == OperStatus.UNKNOWN:
+            o_status.val = oper_status
+        else:
+            o_status.val = -1
+        c_status = IntType()
+        if connect_status or connect_status == ConnectStatus.UNKNOWN:
+            c_status.val = connect_status
+        else:
+            c_status.val = -1
+
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
+        res = yield self.invoke(rpc="ChildrenStateUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
+                                device_id=id,
+                                oper_status=o_status,
+                                connect_status=c_status)
+        returnValue(res)
+
+    @ContainerProxy.wrap_request(None)
+    @inlineCallbacks
+    def port_state_update(self,
+                          device_id,
+                          port_type,
+                          port_no,
+                          oper_status):
+        id = ID()
+        id.id = device_id
+        pt = IntType()
+        pt.val = port_type
+        pNo = IntType()
+        pNo.val = port_no
+        o_status = IntType()
+        o_status.val = oper_status
+
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
+        res = yield self.invoke(rpc="PortStateUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
+                                device_id=id,
+                                port_type=pt,
+                                port_no=pNo,
+                                oper_status=o_status)
+        returnValue(res)
+
+    @ContainerProxy.wrap_request(None)
+    @inlineCallbacks
+    def child_devices_state_update(self, parent_device_id,
+                                   oper_status=None,
+                                   connect_status=None):
+
+        id = ID()
+        id.id = parent_device_id
+        o_status = IntType()
+        if oper_status or oper_status == OperStatus.UNKNOWN:
+            o_status.val = oper_status
+        else:
+            o_status.val = -1
+        c_status = IntType()
+        if connect_status or connect_status == ConnectStatus.UNKNOWN:
+            c_status.val = connect_status
+        else:
+            c_status.val = -1
+
+        to_topic = createSubTopic(self.core_topic, parent_device_id)
+        reply_topic = createSubTopic(self.listening_topic, parent_device_id)
+        res = yield self.invoke(rpc="child_devices_state_update",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
+                                parent_device_id=id,
+                                oper_status=o_status,
+                                connect_status=c_status)
+        returnValue(res)
+
+    def child_devices_removed(self, parent_device_id):
+        raise NotImplementedError()
+
+    @ContainerProxy.wrap_request(None)
+    @inlineCallbacks
+    def device_pm_config_update(self, device_pm_config, init=False):
+        log.debug("device_pm_config_update")
+        b = BoolType()
+        b.val = init
+        to_topic = createSubTopic(self.core_topic, device_pm_config.id)
+        reply_topic = createSubTopic(self.listening_topic, device_pm_config.id)
+        res = yield self.invoke(rpc="DevicePMConfigUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
+                                device_pm_config=device_pm_config,
+                                init=b)
+        returnValue(res)
+
+    @ContainerProxy.wrap_request(None)
+    @inlineCallbacks
+    def port_created(self, device_id, port):
+        log.debug("port_created")
+        proto_id = ID()
+        proto_id.id = device_id
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
+        res = yield self.invoke(rpc="PortCreated",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
+                                device_id=proto_id,
+                                port=port)
+        returnValue(res)
+
+    def port_removed(self, device_id, port):
+        raise NotImplementedError()
+
+    def ports_enabled(self, device_id):
+        raise NotImplementedError()
+
+    def ports_disabled(self, device_id):
+        raise NotImplementedError()
+
+    def ports_oper_status_update(self, device_id, oper_status):
+        raise NotImplementedError()
+
+    def image_download_update(self, img_dnld):
+        raise NotImplementedError()
+
+    def image_download_deleted(self, img_dnld):
+        raise NotImplementedError()
+
+    @ContainerProxy.wrap_request(None)
+    @inlineCallbacks
+    def send_packet_in(self, device_id, port, packet):
+        log.debug("send_packet_in", device_id=device_id)
+        proto_id = ID()
+        proto_id.id = device_id
+        p = IntType()
+        p.val = port
+        pac = Packet()
+        pac.payload = packet
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
+        res = yield self.invoke(rpc="PacketIn",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
+                                device_id=proto_id,
+                                port=p,
+                                packet=pac)
+        returnValue(res)
diff --git a/python/kafka/event_bus_publisher.py b/python/kafka/event_bus_publisher.py
new file mode 100644
index 0000000..8020bf5
--- /dev/null
+++ b/python/kafka/event_bus_publisher.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A gateway between the internal event bus and the Kafka publisher proxy.
+It forwards selected topics and messages posted on the Voltha-internal
+event bus to the external world.
+"""
+import structlog
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.message import Message
+from simplejson import dumps
+
+from common.event_bus import EventBusClient
+
+log = structlog.get_logger()
+
+
+class EventBusPublisher(object):
+
+    def __init__(self, kafka_proxy, config):
+        self.kafka_proxy = kafka_proxy
+        self.config = config
+        self.topic_mappings = config.get('topic_mappings', {})
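+        # topic_mappings maps an event bus topic to a dict that carries
+        # the destination kafka topic, e.g. (illustrative):
+        #     {'kpis': {'kafka_topic': 'voltha.kpis'}}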
+        self.event_bus = EventBusClient()
+        self.subscriptions = None
+
+    def start(self):
+        log.debug('starting')
+        self.subscriptions = list()
+        self._setup_subscriptions(self.topic_mappings)
+        log.info('started')
+        return self
+
+    def stop(self):
+        try:
+            log.debug('stopping-event-bus')
+            if self.subscriptions:
+                for subscription in self.subscriptions:
+                    self.event_bus.unsubscribe(subscription)
+            log.info('stopped-event-bus')
+        except Exception as e:
+            log.exception('failed-stopping-event-bus', e=e)
+            return
+
+    def _setup_subscriptions(self, mappings):
+
+        for event_bus_topic, mapping in mappings.iteritems():
+
+            kafka_topic = mapping.get('kafka_topic', None)
+
+            if kafka_topic is None:
+                log.error('no-kafka-topic-in-config',
+                          event_bus_topic=event_bus_topic,
+                          mapping=mapping)
+                continue
+
+            self.subscriptions.append(self.event_bus.subscribe(
+                event_bus_topic,
+                # to avoid Python late-binding to the last registered
+                # kafka_topic, we force instant binding with the default arg
+                lambda _, m, k=kafka_topic: self.forward(k, m)))
+
+            log.info('event-to-kafka', kafka_topic=kafka_topic,
+                     event_bus_topic=event_bus_topic)
+
+    def forward(self, kafka_topic, msg):
+        try:
+            # convert to JSON string if msg is a protobuf msg
+            if isinstance(msg, Message):
+                msg = dumps(MessageToDict(msg, True, True))
+            log.debug('forward-event-bus-publisher')
+            self.kafka_proxy.send_message(kafka_topic, msg)
+        except Exception as e:
+            log.exception('failed-forward-event-bus-publisher', e=e)
+
diff --git a/python/kafka/kafka_inter_container_library.py b/python/kafka/kafka_inter_container_library.py
new file mode 100644
index 0000000..cf51684
--- /dev/null
+++ b/python/kafka/kafka_inter_container_library.py
@@ -0,0 +1,570 @@
+#!/usr/bin/env python
+
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+from uuid import uuid4
+
+import structlog
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, \
+    DeferredQueue, gatherResults
+from zope.interface import implementer
+
+from common.utils import asleep
+from voltha.core.registry import IComponent
+from kafka_proxy import KafkaProxy, get_kafka_proxy
+from voltha.protos.inter_container_pb2 import MessageType, Argument, \
+    InterContainerRequestBody, InterContainerMessage, Header, \
+    InterContainerResponseBody
+
+log = structlog.get_logger()
+
+KAFKA_OFFSET_LATEST = 'latest'
+KAFKA_OFFSET_EARLIEST = 'earliest'
+
+
+class KafkaMessagingError(BaseException):
+    def __init__(self, error):
+        self.error = error
+
+
+@implementer(IComponent)
+class IKafkaMessagingProxy(object):
+    _kafka_messaging_instance = None
+
+    def __init__(self,
+                 kafka_host_port,
+                 kv_store,
+                 default_topic,
+                 group_id_prefix,
+                 target_cls):
+        """
+        Initialize the kafka proxy.  This is a singleton (may change to
+        non-singleton if performance is better)
+        :param kafka_host_port: Kafka host and port
+        :param kv_store: Key-Value store
+        :param default_topic: Default topic to subscribe to
+        :param group_id_prefix: Prefix used to build the default consumer
+        group id
+        :param target_cls: target class - a method of that class is invoked
+        when a message is received on the default_topic
+        """
+        # Raise an exception if an instance already exists
+        if IKafkaMessagingProxy._kafka_messaging_instance:
+            raise Exception('Singleton-exist')
+
+        log.debug("Initializing-KafkaProxy")
+        self.kafka_host_port = kafka_host_port
+        self.kv_store = kv_store
+        self.default_topic = default_topic
+        self.default_group_id = "_".join((group_id_prefix, default_topic))
+        self.target_cls = target_cls
+        self.topic_target_cls_map = {}
+        self.topic_callback_map = {}
+        self.subscribers = {}
+        self.kafka_proxy = None
+        self.transaction_id_deferred_map = {}
+        self.received_msg_queue = DeferredQueue()
+        self.stopped = False
+
+        self.init_time = 0
+        self.init_received_time = 0
+
+        self.init_resp_time = 0
+        self.init_received_resp_time = 0
+
+        self.num_messages = 0
+        self.total_time = 0
+        self.num_responses = 0
+        self.total_time_responses = 0
+        log.debug("KafkaProxy-initialized")
+
+    def start(self):
+        try:
+            log.debug("KafkaProxy-starting")
+
+            # Get the kafka proxy instance.  If it does not exist then
+            # create it
+            self.kafka_proxy = get_kafka_proxy()
+            if self.kafka_proxy is None:
+                KafkaProxy(kafka_endpoint=self.kafka_host_port).start()
+                self.kafka_proxy = get_kafka_proxy()
+
+            # Subscribe the default topic and target_cls
+            self.topic_target_cls_map[self.default_topic] = self.target_cls
+
+            # Start the queue to handle incoming messages
+            reactor.callLater(0, self._received_message_processing_loop)
+
+            # Subscribe using the default topic and default group id.  Whenever
+            # a message is received on that topic then the target_cls will be
+            # invoked.
+            reactor.callLater(0, self.subscribe,
+                              topic=self.default_topic,
+                              target_cls=self.target_cls,
+                              group_id=self.default_group_id)
+
+            # Setup the singleton instance
+            IKafkaMessagingProxy._kafka_messaging_instance = self
+            log.debug("KafkaProxy-started")
+        except Exception as e:
+            log.exception("Failed-to-start-proxy", e=e)
+
+    def stop(self):
+        """
+        Invoked to stop the kafka proxy
+        :return: None on success, Exception on failure
+        """
+        log.debug("Stopping-messaging-proxy ...")
+        try:
+            # Stop the kafka proxy.  This will stop all the consumers
+            # and producers
+            self.stopped = True
+            self.kafka_proxy.stop()
+            log.debug("Messaging-proxy-stopped.")
+        except Exception as e:
+            log.exception("Exception-when-stopping-messaging-proxy:", e=e)
+
+    def get_target_cls(self):
+        return self.target_cls
+
+    def get_default_topic(self):
+        return self.default_topic
+
+    @inlineCallbacks
+    def _subscribe_group_consumer(self, group_id, topic, offset, callback=None,
+                                  target_cls=None):
+        try:
+            log.debug("subscribing-to-topic-start", topic=topic)
+            yield self.kafka_proxy.subscribe(topic,
+                                             self._enqueue_received_group_message,
+                                             group_id, offset)
+
+            if target_cls is not None and callback is None:
+                # Scenario #1
+                if topic not in self.topic_target_cls_map:
+                    self.topic_target_cls_map[topic] = target_cls
+            elif target_cls is None and callback is not None:
+                # Scenario #2
+                log.debug("custom-callback", topic=topic,
+                          callback_map=self.topic_callback_map)
+                if topic not in self.topic_callback_map:
+                    self.topic_callback_map[topic] = [callback]
+                else:
+                    self.topic_callback_map[topic].extend([callback])
+            else:
+                log.warn("invalid-parameters")
+
+            returnValue(True)
+        except Exception as e:
+            log.exception("Exception-during-subscription", e=e)
+            returnValue(False)
+
+    @inlineCallbacks
+    def subscribe(self, topic, callback=None, target_cls=None,
+                  max_retry=3, group_id=None, offset=KAFKA_OFFSET_LATEST):
+        """
+        Scenario 1:  invoked to subscribe to a specific topic with a
+        target_cls to invoke when a message is received on that topic.  This
+        handles the case of request/response where this library performs the
+        heavy lifting. In this case callback must be None
+
+        Scenario 2:  invoked to subscribe to a specific topic with a
+        specific callback to invoke when a message is received on that topic.
+        This handles the case where the caller wants to process the message
+        received itself. In this case target_cls must be None
+
+        :param topic: topic to subscribe to
+        :param callback: Callback to invoke when a message is received on
+        the topic. Exactly one of callback or target_cls must be provided
+        :param target_cls:  Target class to use when a message is
+        received on the topic. There can only be 1 target_cls per topic.
+        Exactly one of callback or target_cls must be provided
+        :param max_retry:  the number of retries before reporting failure
+        to subscribe.  This caters for scenario where the kafka topic is not
+        ready.
+        :param group_id:  The ID of the group the consumer is subscribing to
+        :param offset: The topic offset on the kafka bus from where message consumption will start
+        :return: True on success, False on failure
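+
+        Example (an illustrative sketch; topic and variable names are
+        hypothetical):
+
+            # Scenario 1: request/response handled by this library
+            yield proxy.subscribe(topic="openonu_abc12",
+                                  group_id="openonu_abc12",
+                                  target_cls=request_facade)
+
+            # Scenario 2: the caller consumes raw messages itself
+            yield proxy.subscribe(topic="voltha.events",
+                                  callback=my_callback)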
+        """
+        RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
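+        # Each retry waits a little longer, from 0.05s up to 5s; retries
+        # beyond the end of the table keep using the last entry.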
+
+        def _backoff(msg, retries):
+            wait_time = RETRY_BACKOFF[min(retries,
+                                          len(RETRY_BACKOFF) - 1)]
+            log.info(msg, retry_in=wait_time)
+            return asleep.asleep(wait_time)
+
+        log.debug("subscribing", topic=topic, group_id=group_id,
+                  callback=callback, target=target_cls)
+
+        retry = 0
+        subscribed = False
+        if group_id is None:
+            group_id = self.default_group_id
+        while not subscribed:
+            subscribed = yield self._subscribe_group_consumer(group_id, topic,
+                                                              callback=callback,
+                                                              target_cls=target_cls,
+                                                              offset=offset)
+            if subscribed:
+                returnValue(True)
+            elif retry > max_retry:
+                returnValue(False)
+            else:
+                yield _backoff("subscription-not-complete", retry)
+                retry += 1
+
+    def unsubscribe(self, topic, callback=None, target_cls=None):
+        """
+        Invoked to unsubscribe from a topic
+        :param topic: topic to unsubscribe from
+        :param callback:  the callback used when subscribing to the topic, if any
+        :param target_cls: the target class used when subscribing to the topic, if any
+        :return: None on success or Exception on failure
+        """
+        log.debug("Unsubscribing-to-topic", topic=topic)
+
+        try:
+            self.kafka_proxy.unsubscribe(topic,
+                                         self._enqueue_received_group_message)
+
+            if callback is None and target_cls is None:
+                log.error("both-call-and-target-cls-cannot-be-none",
+                          topic=topic)
+                raise KafkaMessagingError(
+                    error="both-call-and-target-cls-cannot-be-none")
+
+            if target_cls is not None and topic in self.topic_target_cls_map:
+                del self.topic_target_cls_map[topic]
+
+            if callback is not None and topic in self.topic_callback_map:
+                index = 0
+                for cb in self.topic_callback_map[topic]:
+                    if cb == callback:
+                        break
+                    index += 1
+                if index < len(self.topic_callback_map[topic]):
+                    self.topic_callback_map[topic].pop(index)
+
+                if len(self.topic_callback_map[topic]) == 0:
+                    del self.topic_callback_map[topic]
+        except Exception as e:
+            log.exception("Exception-when-unsubscribing-to-topic", topic=topic,
+                          e=e)
+            return e
+
+    @inlineCallbacks
+    def _enqueue_received_group_message(self, msg):
+        """
+        Internal method to continuously queue all received messages
+        irrespective of topic
+        :param msg: Received message
+        :return: None on success, Exception on failure
+        """
+        try:
+            log.debug("received-msg", msg=msg)
+            yield self.received_msg_queue.put(msg)
+        except Exception as e:
+            log.exception("Failed-enqueueing-received-message", e=e)
+
+    @inlineCallbacks
+    def _received_message_processing_loop(self):
+        """
+        Internal method to continuously process all received messages one
+        at a time
+        :return: None on success, Exception on failure
+        """
+        while True:
+            try:
+                message = yield self.received_msg_queue.get()
+                yield self._process_message(message)
+                if self.stopped:
+                    break
+            except Exception as e:
+                log.exception("Failed-dequeueing-received-message", e=e)
+
+    def _to_string(self, unicode_str):
+        if unicode_str is not None:
+            if type(unicode_str) == unicode:
+                return unicode_str.encode('ascii', 'ignore')
+            else:
+                return unicode_str
+        else:
+            return None
+
+    def _format_request(self,
+                        rpc,
+                        to_topic,
+                        reply_topic,
+                        **kwargs):
+        """
+        Format a request to send over kafka
+        :param rpc: Requested remote API
+        :param to_topic: Topic to send the request
+        :param reply_topic: Topic to receive the resulting response, if any
+        :param kwargs: Dictionary of key-value pairs to pass as arguments to
+        the remote rpc API.
+        :return: An InterContainerMessage on success or None on
+        failure
+        """
+        try:
+            transaction_id = uuid4().hex
+            request = InterContainerMessage()
+            request_body = InterContainerRequestBody()
+            request.header.id = transaction_id
+            request.header.type = MessageType.Value("REQUEST")
+            request.header.from_topic = reply_topic
+            request.header.to_topic = to_topic
+
+            response_required = False
+            if reply_topic:
+                request_body.reply_to_topic = reply_topic
+                request_body.response_required = True
+                response_required = True
+
+            request.header.timestamp = int(round(time.time() * 1000))
+            request_body.rpc = rpc
+            for a, b in kwargs.iteritems():
+                arg = Argument()
+                arg.key = a
+                try:
+                    arg.value.Pack(b)
+                    request_body.args.extend([arg])
+                except Exception as e:
+                    log.exception("Failed-parsing-value", e=e)
+            request.body.Pack(request_body)
+            return request, transaction_id, response_required
+        except Exception as e:
+            log.exception("formatting-request-failed",
+                          rpc=rpc,
+                          to_topic=to_topic,
+                          reply_topic=reply_topic,
+                          args=kwargs)
+            return None, None, None
+
+    def _format_response(self, msg_header, msg_body, status):
+        """
+        Format a response
+        :param msg_header: The header portion of a received request
+        :param msg_body: The response body
+        :param status: True if this represents a successful response
+        :return: an InterContainerMessage message type
+        """
+        try:
+            assert isinstance(msg_header, Header)
+            response = InterContainerMessage()
+            response_body = InterContainerResponseBody()
+            response.header.id = msg_header.id
+            response.header.timestamp = int(
+                round(time.time() * 1000))
+            response.header.type = MessageType.Value("RESPONSE")
+            response.header.from_topic = msg_header.to_topic
+            response.header.to_topic = msg_header.from_topic
+            if msg_body is not None:
+                response_body.result.Pack(msg_body)
+            response_body.success = status
+            response.body.Pack(response_body)
+            return response
+        except Exception as e:
+            log.exception("formatting-response-failed", header=msg_header,
+                          body=msg_body, status=status, e=e)
+            return None
+
+    def _parse_response(self, msg):
+        try:
+            message = InterContainerMessage()
+            message.ParseFromString(msg)
+            resp = InterContainerResponseBody()
+            if message.body.Is(InterContainerResponseBody.DESCRIPTOR):
+                message.body.Unpack(resp)
+            else:
+                log.debug("unsupported-msg", msg_type=type(message.body))
+                return None
+            log.debug("parsed-response", input=message, output=resp)
+            return resp
+        except Exception as e:
+            log.exception("parsing-response-failed", msg=msg, e=e)
+            return None
+
+    @inlineCallbacks
+    def _process_message(self, m):
+        """
+        Default internal method invoked for every batch of messages received
+        from Kafka.
+        """
+
+        def _toDict(args):
+            """
+            Convert a repeatable Argument type into a python dictionary
+            :param args: Repeatable core_adapter.Argument type
+            :return: a python dictionary
+            """
+            if args is None:
+                return None
+            result = {}
+            for arg in args:
+                assert isinstance(arg, Argument)
+                result[arg.key] = arg.value
+            return result
+
+        try:
+            val = m.value()
+
+            # Go over customized callbacks first
+            m_topic = m.topic()
+            if m_topic in self.topic_callback_map:
+                for c in self.topic_callback_map[m_topic]:
+                    yield c(val)
+
+            #  Check whether we need to process request/response scenario
+            if m_topic not in self.topic_target_cls_map:
+                return
+
+            # Process request/response scenario
+            message = InterContainerMessage()
+            message.ParseFromString(val)
+
+            if message.header.type == MessageType.Value("REQUEST"):
+                # Get the target class for that specific topic
+                targeted_topic = self._to_string(message.header.to_topic)
+                msg_body = InterContainerRequestBody()
+                if message.body.Is(InterContainerRequestBody.DESCRIPTOR):
+                    message.body.Unpack(msg_body)
+                else:
+                    log.debug("unsupported-msg", msg_type=type(message.body))
+                    return
+                if targeted_topic in self.topic_target_cls_map:
+                    if msg_body.args:
+                        log.debug("message-body-args-present", body=msg_body)
+                        (status, res) = yield getattr(
+                            self.topic_target_cls_map[targeted_topic],
+                            self._to_string(msg_body.rpc))(
+                            **_toDict(msg_body.args))
+                    else:
+                        log.debug("message-body-args-absent", body=msg_body,
+                                  rpc=msg_body.rpc)
+                        (status, res) = yield getattr(
+                            self.topic_target_cls_map[targeted_topic],
+                            self._to_string(msg_body.rpc))()
+                    if msg_body.response_required:
+                        response = self._format_response(
+                            msg_header=message.header,
+                            msg_body=res,
+                            status=status,
+                        )
+                        if response is not None:
+                            res_topic = self._to_string(
+                                response.header.to_topic)
+                            self._send_kafka_message(res_topic, response)
+                            log.debug("Response-sent",
+                                      response=response.body,
+                                      to_topic=res_topic)
+            elif message.header.type == MessageType.Value("RESPONSE"):
+                trns_id = self._to_string(message.header.id)
+                if trns_id in self.transaction_id_deferred_map:
+                    resp = self._parse_response(val)
+                    self.transaction_id_deferred_map[trns_id].callback(resp)
+            else:
+                log.error("!!INVALID-TRANSACTION-TYPE!!")
+
+        except Exception as e:
+            log.exception("Failed-to-process-message", message=m, e=e)
+
+    @inlineCallbacks
+    def _send_kafka_message(self, topic, msg):
+        try:
+            yield self.kafka_proxy.send_message(topic, msg.SerializeToString())
+        except Exception as e:
+            log.exception("Failed-sending-message", message=msg, e=e)
+
+    @inlineCallbacks
+    def send_request(self,
+                     rpc,
+                     to_topic,
+                     reply_topic=None,
+                     callback=None,
+                     **kwargs):
+        """
+        Invoked to send a message to a remote container and receive a
+        response if required.
+        :param rpc: The remote API to invoke
+        :param to_topic: Send the message to this kafka topic
+        :param reply_topic: If not None then a response is expected on this
+        topic.  If set to None then no response is required.
+        :param callback: Callback to invoke when a response is received.
+        :param kwargs: Key-value pairs representing arguments to pass to the
+        rpc remote API.
+        :return: None if no response is required; otherwise the response is
+        either delivered via the callback or returned as a tuple of
+        (status, return_cls)
+        """
+        try:
+            # Ensure all strings are not unicode encoded
+            rpc = self._to_string(rpc)
+            to_topic = self._to_string(to_topic)
+            reply_topic = self._to_string(reply_topic)
+
+            request, transaction_id, response_required = \
+                self._format_request(
+                    rpc=rpc,
+                    to_topic=to_topic,
+                    reply_topic=reply_topic,
+                    **kwargs)
+
+            if request is None:
+                return
+
+            # Add the transaction to the transaction map before sending the
+            # request.  This will guarantee the eventual response will be
+            # processed.
+            wait_for_result = None
+            if response_required:
+                wait_for_result = Deferred()
+                self.transaction_id_deferred_map[
+                    self._to_string(request.header.id)] = wait_for_result
+
+            yield self._send_kafka_message(to_topic, request)
+            log.debug("message-sent", to_topic=to_topic,
+                      from_topic=reply_topic)
+
+            if response_required:
+                res = yield wait_for_result
+
+                if res is None or not res.success:
+                    # Remove the transaction from the map even on failure
+                    # so the deferred entry does not leak.
+                    del self.transaction_id_deferred_map[transaction_id]
+                    raise KafkaMessagingError(
+                        error="Failed-response:{}".format(res))
+
+                # Remove the transaction from the transaction map
+                del self.transaction_id_deferred_map[transaction_id]
+
+                log.debug("send-message-response", rpc=rpc, result=res)
+
+                if callback:
+                    callback((res.success, res.result))
+                else:
+                    returnValue((res.success, res.result))
+        except Exception as e:
+            log.exception("Exception-sending-request", e=e)
+            raise KafkaMessagingError(error=e)
+
+
+# Common method to get the singleton instance of the kafka proxy class
+def get_messaging_proxy():
+    return IKafkaMessagingProxy._kafka_messaging_instance
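+
+# Example lookup (a sketch; the rpc and topic names are assumptions):
+#     proxy = get_messaging_proxy()
+#     if proxy is not None:
+#         yield proxy.send_request(rpc="health_check", to_topic="rwcore")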
diff --git a/python/kafka/kafka_proxy.py b/python/kafka/kafka_proxy.py
new file mode 100644
index 0000000..64da9a8
--- /dev/null
+++ b/python/kafka/kafka_proxy.py
@@ -0,0 +1,338 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+
+from confluent_kafka import Producer as _kafkaProducer
+from structlog import get_logger
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.threads import deferToThread
+from zope.interface import implementer
+
+from common.utils.consulhelpers import get_endpoint_from_consul
+from event_bus_publisher import EventBusPublisher
+from voltha.core.registry import IComponent
+from confluent_kafka import Consumer, KafkaError
+import threading
+
+log = get_logger()
+
+
+@implementer(IComponent)
+class KafkaProxy(object):
+    """
+    This is a singleton proxy kafka class that hides the kafka client
+    details.  This proxy uses confluent-kafka-python as the kafka client.
+    Since that client is not a Twisted client, requests to it are wrapped
+    with twisted.internet.threads.deferToThread to avoid blocking the
+    Twisted event loop.
+    """
+    _kafka_instance = None
+
+    def __init__(self,
+                 consul_endpoint='localhost:8500',
+                 kafka_endpoint='localhost:9092',
+                 ack_timeout=1000,
+                 max_req_attempts=10,
+                 consumer_poll_timeout=10,
+                 config=None):
+
+        # Raise an exception if a singleton instance already exists
+        if KafkaProxy._kafka_instance:
+            raise Exception('Singleton exists for :{}'.format(KafkaProxy))
+
+        log.debug('initializing', endpoint=kafka_endpoint)
+        self.ack_timeout = ack_timeout
+        self.max_req_attempts = max_req_attempts
+        self.consul_endpoint = consul_endpoint
+        self.kafka_endpoint = kafka_endpoint
+        self.config = config if config is not None else {}
+        self.kclient = None
+        self.kproducer = None
+        self.event_bus_publisher = None
+        self.stopping = False
+        self.faulty = False
+        self.consumer_poll_timeout = consumer_poll_timeout
+        self.topic_consumer_map = {}
+        self.topic_callbacks_map = {}
+        self.topic_any_map_lock = threading.Lock()
+        log.debug('initialized', endpoint=kafka_endpoint)
+
+    @inlineCallbacks
+    def start(self):
+        log.debug('starting')
+        self._get_kafka_producer()
+        KafkaProxy._kafka_instance = self
+        self.event_bus_publisher = yield EventBusPublisher(
+            self, self.config.get('event_bus_publisher', {})).start()
+        log.info('started')
+        self.faulty = False
+        self.stopping = False
+        returnValue(self)
+
+    @inlineCallbacks
+    def stop(self):
+        try:
+            log.debug('stopping-kafka-proxy')
+            self.stopping = True
+            try:
+                if self.kclient:
+                    yield self.kclient.close()
+                    self.kclient = None
+                    log.debug('stopped-kclient-kafka-proxy')
+            except Exception as e:
+                log.exception('failed-stopped-kclient-kafka-proxy', e=e)
+
+            try:
+                if self.kproducer:
+                    yield self.kproducer.flush()
+                    self.kproducer = None
+                    log.debug('stopped-kproducer-kafka-proxy')
+            except Exception as e:
+                log.exception('failed-stopped-kproducer-kafka-proxy', e=e)
+
+            # Stop all consumers
+            try:
+                self.topic_any_map_lock.acquire()
+                log.debug('stopping-consumers-kafka-proxy')
+                for _, c in self.topic_consumer_map.iteritems():
+                    yield deferToThread(c.close)
+                self.topic_consumer_map.clear()
+                self.topic_callbacks_map.clear()
+                log.debug('stopped-consumers-kafka-proxy')
+            except Exception as e:
+                log.exception('failed-stopped-consumers-kafka-proxy', e=e)
+            finally:
+                self.topic_any_map_lock.release()
+                log.debug('stopping-consumers-kafka-proxy-released-lock')
+
+            # try:
+            #    if self.event_bus_publisher:
+            #        yield self.event_bus_publisher.stop()
+            #        self.event_bus_publisher = None
+            #        log.debug('stopped-event-bus-publisher-kafka-proxy')
+            # except Exception, e:
+            #    log.debug('failed-stopped-event-bus-publisher-kafka-proxy')
+            #    pass
+
+            log.debug('stopped-kafka-proxy')
+
+        except Exception as e:
+            self.kclient = None
+            self.kproducer = None
+            # self.event_bus_publisher = None
+            log.exception('failed-stopped-kafka-proxy', e=e)
+
+    def _get_kafka_producer(self):
+        try:
+            if self.kafka_endpoint.startswith('@'):
+                try:
+                    _k_endpoint = get_endpoint_from_consul(
+                        self.consul_endpoint, self.kafka_endpoint[1:])
+                    log.debug('found-kafka-service', endpoint=_k_endpoint)
+                except Exception as e:
+                    log.exception('no-kafka-service-in-consul', e=e)
+                    self.kproducer = None
+                    self.kclient = None
+                    return
+            else:
+                _k_endpoint = self.kafka_endpoint
+            self.kproducer = _kafkaProducer(
+                {'bootstrap.servers': _k_endpoint}
+            )
+        except Exception as e:
+            log.exception('failed-get-kafka-producer', e=e)
+            return
+
+    @inlineCallbacks
+    def _wait_for_messages(self, consumer, topic):
+        while True:
+            try:
+                msg = yield deferToThread(consumer.poll,
+                                          self.consumer_poll_timeout)
+
+                if self.stopping:
+                    log.debug("stop-request-recieved", topic=topic)
+                    break
+
+                if msg is None:
+                    continue
+                if msg.error():
+                    # This typically is received when there are no more messages
+                    # to read from kafka. Ignore.
+                    continue
+
+                # Invoke callbacks
+                for cb in self.topic_callbacks_map[topic]:
+                    yield cb(msg)
+            except Exception as e:
+                log.debug("exception-receiving-msg", topic=topic, e=e)
+
+    @inlineCallbacks
+    def subscribe(self, topic, callback, groupId, offset='latest'):
+        """
+        subscribe allows a caller to subscribe to a given kafka topic.  This
+        API always creates a group consumer.
+        :param topic - the topic to subscribe to
+        :param callback - the callback to invoke whenever a message is
+        received on that topic
+        :param groupId - the groupId for this consumer.  In the current
+        implementation there is a one-to-one mapping between a topic and a
+        groupId.  In other words, once a groupId is used for a given topic,
+        another groupId cannot be created for the same topic.
+        :param offset: the kafka offset from where the consumer will start
+        consuming messages
+        """
+        try:
+            self.topic_any_map_lock.acquire()
+            if topic in self.topic_consumer_map:
+                # Just add the callback
+                if topic in self.topic_callbacks_map:
+                    self.topic_callbacks_map[topic].append(callback)
+                else:
+                    self.topic_callbacks_map[topic] = [callback]
+                return
+
+            # Create consumer for that topic
+            c = Consumer({
+                'bootstrap.servers': self.kafka_endpoint,
+                'group.id': groupId,
+                'auto.offset.reset': offset
+            })
+            yield deferToThread(c.subscribe, [topic])
+            self.topic_consumer_map[topic] = c
+            self.topic_callbacks_map[topic] = [callback]
+            # Start the consumer
+            reactor.callLater(0, self._wait_for_messages, c, topic)
+        except Exception as e:
+            log.exception("topic-subscription-error", e=e)
+        finally:
+            self.topic_any_map_lock.release()
+
+    @inlineCallbacks
+    def unsubscribe(self, topic, callback):
+        """
+        Unsubscribe from a given topic.  Since there may be multiple callers
+        consuming from the same topic, the callback is used as a
+        differentiator to ensure only the relevant caller is unsubscribed.
+        The kafka consumer is closed when no callbacks remain for the topic.
+        :param topic: topic to unsubscribe from
+        :param callback: callback the caller used when subscribing to the
+        topic.  If multiple callers have subscribed to a topic using the same
+        callback then the first callback on the list will be removed.
+        :return: None
+        """
+        try:
+            self.topic_any_map_lock.acquire()
+            log.debug("unsubscribing-to-topic", topic=topic)
+            if topic in self.topic_callbacks_map:
+                if callback in self.topic_callbacks_map[topic]:
+                    self.topic_callbacks_map[topic].remove(callback)
+
+                if len(self.topic_callbacks_map[topic]) == 0:
+                    # Stop the consumer
+                    if topic in self.topic_consumer_map:
+                        yield deferToThread(
+                            self.topic_consumer_map[topic].close)
+                        del self.topic_consumer_map[topic]
+                    del self.topic_callbacks_map[topic]
+                    log.debug("unsubscribed-to-topic", topic=topic)
+                else:
+                    log.debug("consumers-for-topic-still-exist", topic=topic,
+                              num=len(self.topic_callbacks_map[topic]))
+        except Exception as e:
+            log.exception("topic-unsubscription-error", e=e)
+        finally:
+            self.topic_any_map_lock.release()
+            log.debug("unsubscribing-to-topic-release-lock", topic=topic)
+
+    @inlineCallbacks
+    def send_message(self, topic, msg, key=None):
+        assert topic is not None
+        assert msg is not None
+
+        # First check whether we have a kafka producer.  If there is none,
+        # try to get one - this happens only when we look up the kafka
+        # service from consul.
+        try:
+            if self.faulty is False:
+
+                if self.kproducer is None:
+                    self._get_kafka_producer()
+                    # Let the next message request retry if this still fails
+                    if self.kproducer is None:
+                        log.error('no-kafka-producer',
+                                  endpoint=self.kafka_endpoint)
+                        return
+
+                log.debug('sending-kafka-msg', topic=topic, kafka_msg=msg)
+
+                if self.kproducer is not None and \
+                        self.event_bus_publisher and self.faulty is False:
+                    d = deferToThread(self.kproducer.produce, topic, msg, key)
+                    yield d
+                    log.debug('sent-kafka-msg', topic=topic, kafka_msg=msg)
+                    # send a lightweight poll to avoid an exception after 100k messages.
+                    d1 = deferToThread(self.kproducer.poll, 0)
+                    yield d1
+                else:
+                    return
+
+        except Exception as e:
+            self.faulty = True
+            log.error('failed-to-send-kafka-msg', topic=topic, kafka_msg=msg,
+                      e=e)
+
+            # Stop the proxy so the producer is recreated on the next send.
+            # This is needed if the kafka docker went down and came back up
+            # with a different port number.
+            if self.stopping is False:
+                log.debug('stopping-kafka-proxy')
+                try:
+                    self.stopping = True
+                    self.stop()
+                    self.stopping = False
+                    self.faulty = False
+                    log.debug('stopped-kafka-proxy')
+                except Exception as e:
+                    log.exception('failed-stopping-kafka-proxy', e=e)
+            else:
+                log.info('already-stopping-kafka-proxy')
+
+            return
+
+    def is_faulty(self):
+        return self.faulty
+
+
+# Common method to get the singleton instance of the kafka proxy class
+def get_kafka_proxy():
+    return KafkaProxy._kafka_instance
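+
+
+# Example usage (a sketch; the endpoint and topic names are assumptions):
+#     proxy = KafkaProxy(kafka_endpoint='localhost:9092')
+#     yield proxy.start()
+#     yield get_kafka_proxy().send_message('voltha.events', payload)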