This commit consists of the following:
1) The Kafka messaging proxy for adapters, in Twisted Python
2) Initial implementation and containerization of the ponsim OLT adapter
and the ponsim ONU adapter
3) Initial submission of the request and response facades, in both Twisted
Python and Go
4) Initial implementation of device management and logical device management
in the Core
5) Update to the log module to allow dynamically setting the log level per
package via the gRPC API
6) Bug fixes and minor changes

Change-Id: Ia8f033da84cfd08275335bae9542802415e7bb0f
diff --git a/adapters/kafka/__init__.py b/adapters/kafka/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/adapters/kafka/__init__.py
diff --git a/adapters/kafka/adapter_request_facade.py b/adapters/kafka/adapter_request_facade.py
new file mode 100644
index 0000000..74ed934
--- /dev/null
+++ b/adapters/kafka/adapter_request_facade.py
@@ -0,0 +1,168 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Agent to play gateway between CORE and an individual adapter.
+"""
+from uuid import uuid4
+
+import arrow
+import structlog
+from google.protobuf.json_format import MessageToJson
+from scapy.packet import Packet
+from twisted.internet.defer import inlineCallbacks, returnValue
+from zope.interface import implementer
+
+from adapters.common.event_bus import EventBusClient
+from adapters.common.frameio.frameio import hexify
+from adapters.common.utils.id_generation import create_cluster_logical_device_ids
+from adapters.interface import IAdapterInterface
+
+from adapters.protos import third_party
+from adapters.protos.device_pb2 import Device, Port, PmConfigs
+from adapters.protos.events_pb2 import AlarmEvent, AlarmEventType, \
+    AlarmEventSeverity, AlarmEventState, AlarmEventCategory
+from adapters.protos.events_pb2 import KpiEvent
+from adapters.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
+    LogicalPort, AdminState, OperStatus, AlarmFilterRuleKey
+from adapters.common.utils.registry import registry
+from adapters.common.utils.id_generation import create_cluster_device_id
+from adapters.protos.core_adapter_pb2 import IntType
+import re
+
+log = structlog.get_logger()
+
+
+class MacAddressError(Exception):
+    def __init__(self, error):
+        self.error = error
+
+
+class IDError(Exception):
+    def __init__(self, error):
+        self.error = error
+
+
+@implementer(IAdapterInterface)
+class AdapterRequestFacade(object):
+    """
+    Gate-keeper between CORE and device adapters.
+
+    On one side it interacts with Core's internal model and update/dispatch
+    mechanisms.
+
+    On the other side, it interacts with the adapters standard interface as
+    defined in
+    """
+
+    def __init__(self, adapter):
+        self.adapter = adapter
+
+    def start(self):
+        log.debug('starting')
+
+    def stop(self):
+        log.debug('stopping')
+
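+    # Note: arguments arrive from the inter-container messaging library
+    # packed as google.protobuf.Any messages, so each handler below first
+    # unpacks the Any into its concrete proto type (e.g. Device, IntType)
+    # before delegating to the adapter.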
+    def adopt_device(self, device):
+        d = Device()
+        if device:
+            device.Unpack(d)
+            return (True, self.adapter.adopt_device(d))
+        else:
+            return (False, d)
+
+    def get_ofp_device_info(self, device):
+        d = Device()
+        if device:
+            device.Unpack(d)
+            return (True, self.adapter.get_ofp_device_info(d))
+        else:
+            return (False, d)
+
+    def get_ofp_port_info(self, device, port_no):
+        d = Device()
+        if device:
+            device.Unpack(d)
+        else:
+            return (False, d)
+
+        p = IntType()
+        port_no.Unpack(p)
+
+        return (True, self.adapter.get_ofp_port_info(d, p.val))
+
+
+    def reconcile_device(self, device):
+        return self.adapter.reconcile_device(device)
+
+    def abandon_device(self, device):
+        return self.adapter.abandon_device(device)
+
+    def disable_device(self, device):
+        return self.adapter.disable_device(device)
+
+    def reenable_device(self, device):
+        return self.adapter.reenable_device(device)
+
+    def reboot_device(self, device):
+        d = Device()
+        if device:
+            device.Unpack(d)
+            return (True, self.adapter.reboot_device(d))
+        else:
+            return (False, d)
+
+    def download_image(self, device, request):
+        return self.adapter.download_image(device, request)
+
+    def get_image_download_status(self, device, request):
+        return self.adapter.get_image_download_status(device, request)
+
+    def cancel_image_download(self, device, request):
+        return self.adapter.cancel_image_download(device, request)
+
+    def activate_image_update(self, device, request):
+        return self.adapter.activate_image_update(device, request)
+
+    def revert_image_update(self, device, request):
+        return self.adapter.revert_image_update(device, request)
+
+    def self_test(self, device):
+        return self.adapter.self_test_device(device)
+
+    def delete_device(self, device):
+        # Removal of child devices is assumed to be handled as part of the
+        # adapter's own delete_device processing.
+        return self.adapter.delete_device(device)
+
+    def get_device_details(self, device):
+        return self.adapter.get_device_details(device)
+
+    def update_flows_bulk(self, device, flows, groups):
+        return self.adapter.update_flows_bulk(device, flows, groups)
+
+    def update_flows_incrementally(self, device, flow_changes, group_changes):
+        return self.adapter.update_flows_incrementally(device, flow_changes, group_changes)
+
+    def suppress_alarm(self, filter):
+        return self.adapter.suppress_alarm(filter)
+
+    def unsuppress_alarm(self, filter):
+        return self.adapter.unsuppress_alarm(filter)
+
diff --git a/adapters/kafka/core_proxy.py b/adapters/kafka/core_proxy.py
new file mode 100644
index 0000000..bcc4239
--- /dev/null
+++ b/adapters/kafka/core_proxy.py
@@ -0,0 +1,331 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Agent to play gateway between CORE and an individual adapter.
+"""
+from uuid import uuid4
+
+import arrow
+import structlog
+from google.protobuf.json_format import MessageToJson
+from scapy.packet import Packet
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.python import failure
+from zope.interface import implementer
+
+from adapters.common.event_bus import EventBusClient
+from adapters.common.frameio.frameio import hexify
+from adapters.common.utils.id_generation import create_cluster_logical_device_ids
+from adapters.interface import IAdapterInterface
+from adapters.protos import third_party
+from adapters.protos.device_pb2 import Device, Port, PmConfigs
+from adapters.protos.events_pb2 import AlarmEvent, AlarmEventType, \
+    AlarmEventSeverity, AlarmEventState, AlarmEventCategory
+from adapters.protos.events_pb2 import KpiEvent
+from adapters.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
+    LogicalPort, AdminState, OperStatus, AlarmFilterRuleKey, CoreInstance
+from adapters.common.utils.registry import registry, IComponent
+from adapters.common.utils.id_generation import create_cluster_device_id
+import re
+from adapters.interface import ICoreSouthBoundInterface
+from adapters.protos.core_adapter_pb2 import StrType, BoolType, IntType
+from adapters.protos.common_pb2 import ID
+from google.protobuf.message import Message
+from adapters.common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
+
+log = structlog.get_logger()
+
+class KafkaMessagingError(Exception):
+    def __init__(self, error):
+        self.error = error
+
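+
+# Decorator for CoreProxy request methods: it awaits the wrapped method's
+# (success, payload) result and, on success, unpacks the Any payload into an
+# instance of return_cls (or returns None when no return class is given).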
+def wrap_request(return_cls):
+    def real_wrapper(func):
+        @inlineCallbacks
+        def wrapper(*args, **kw):
+            try:
+                (success, d) = yield func(*args, **kw)
+                if success:
+                    log.debug("successful-response", func=func, val=d)
+                    if return_cls is not None:
+                        rc = return_cls()
+                        if d is not None:
+                            d.Unpack(rc)
+                        returnValue(rc)
+                    else:
+                        log.debug("successful-response-none", func=func,
+                                  val=None)
+                        returnValue(None)
+                else:
+                    log.warn("unsuccessful-request", func=func, args=args, kw=kw)
+                    returnValue(d)
+            except Exception as e:
+                log.exception("request-wrapper-exception", func=func, e=e)
+                raise
+        return wrapper
+    return real_wrapper
+
+
+@implementer(IComponent, ICoreSouthBoundInterface)
+class CoreProxy(object):
+
+    def __init__(self, kafka_proxy, core_topic, my_listening_topic):
+        self.kafka_proxy = kafka_proxy
+        self.listening_topic = my_listening_topic
+        self.core_topic = core_topic
+        self.default_timeout = 3
+
+    def start(self):
+        log.info('started')
+
+        return self
+
+    def stop(self):
+        log.info('stopped')
+
+    @inlineCallbacks
+    def invoke(self, rpc, to_topic=None, **kwargs):
+        @inlineCallbacks
+        def _send_request(rpc, m_callback, to_topic, **kwargs):
+            try:
+                log.debug("sending-request", rpc=rpc)
+                if to_topic is None:
+                    to_topic = self.core_topic
+                result = yield self.kafka_proxy.send_request(rpc=rpc,
+                                                             to_topic=to_topic,
+                                                             reply_topic=self.listening_topic,
+                                                             callback=None,
+                                                             **kwargs)
+                if not m_callback.called:
+                    m_callback.callback(result)
+                else:
+                    log.debug('timeout-already-occurred', rpc=rpc)
+            except Exception as e:
+                log.exception("failure-sending-request", rpc=rpc, kw=kwargs)
+                if not m_callback.called:
+                    m_callback.errback(failure.Failure())
+
+        log.debug('invoke-request', rpc=rpc)
+        cb = DeferredWithTimeout(timeout=self.default_timeout)
+        _send_request(rpc, cb, to_topic, **kwargs)
+        try:
+            res = yield cb
+            returnValue(res)
+        except TimeOutError as e:
+            log.warn('invoke-timeout', e=e)
+            raise e
+
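+    # The request methods below are thin wrappers around invoke(): arguments
+    # are wrapped in proto wrapper types, sent over Kafka to the Core topic,
+    # and the Any response payload is unpacked by @wrap_request.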
+
+    @wrap_request(CoreInstance)
+    @inlineCallbacks
+    def register(self, adapter):
+        log.debug("register")
+        try:
+            res = yield self.invoke(rpc="Register", adapter=adapter)
+            log.info("registration-returned", res=res)
+            returnValue(res)
+        except Exception as e:
+            log.exception("registration-exception", e=e)
+            raise
+
+    @wrap_request(Device)
+    @inlineCallbacks
+    def get_device(self, device_id):
+        log.debug("get-device")
+        id = ID()
+        id.id = device_id
+        res = yield self.invoke(rpc="GetDevice", device_id=id)
+        returnValue(res)
+
+    def get_child_device(self, parent_device_id, **kwargs):
+        raise NotImplementedError()
+
+    # def add_device(self, device):
+    #     raise NotImplementedError()
+
+    def get_ports(self, device_id, port_type):
+        raise NotImplementedError()
+
+    def get_child_devices(self, parent_device_id):
+        raise NotImplementedError()
+
+    def get_child_device_with_proxy_address(self, proxy_address):
+        raise NotImplementedError()
+
+    def _to_proto(self, **kwargs):
+        encoded = {}
+        for k, v in kwargs.iteritems():
+            if isinstance(v, Message):
+                encoded[k] = v
+            elif type(v) == int:
+                i_proto = IntType()
+                i_proto.val = v
+                encoded[k] = i_proto
+            elif type(v) == str:
+                s_proto = StrType()
+                s_proto.val = v
+                encoded[k] = s_proto
+            elif type(v) == bool:
+                b_proto = BoolType()
+                b_proto.val = v
+                encoded[k] = b_proto
+        return encoded
+
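+    # A hedged example of the conversion above: _to_proto(serial="abc",
+    # port_no=1, active=True) would return {"serial": StrType(val="abc"),
+    # "port_no": IntType(val=1), "active": BoolType(val=True)}, ready to be
+    # packed as Any values in the inter-container request arguments.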
+
+    @wrap_request(None)
+    @inlineCallbacks
+    def child_device_detected(self,
+                              parent_device_id,
+                              parent_port_no,
+                              child_device_type,
+                              channel_id,
+                              **kw):
+        id = ID()
+        id.id = parent_device_id
+        ppn = IntType()
+        ppn.val = parent_port_no
+        cdt = StrType()
+        cdt.val = child_device_type
+        channel = IntType()
+        channel.val = channel_id
+
+        args = self._to_proto(**kw)
+        res = yield self.invoke(rpc="ChildDeviceDetected",
+                                parent_device_id=id,
+                                parent_port_no = ppn,
+                                child_device_type= cdt,
+                                channel_id=channel,
+                                **args)
+        returnValue(res)
+
+
+    @wrap_request(None)
+    @inlineCallbacks
+    def device_update(self, device):
+        log.debug("device_update")
+        res = yield self.invoke(rpc="DeviceUpdate", device=device)
+        returnValue(res)
+
+    def child_device_removed(self, parent_device_id, child_device_id):
+        raise NotImplementedError()
+
+
+    @wrap_request(None)
+    @inlineCallbacks
+    def device_state_update(self, device_id,
+                                   oper_status=None,
+                                   connect_status=None):
+
+        id = ID()
+        id.id = device_id
+        o_status = IntType()
+        if oper_status:
+            o_status.val = oper_status
+        else:
+            o_status.val = -1
+        c_status = IntType()
+        if connect_status:
+            c_status.val = connect_status
+        else:
+            c_status.val = -1
+
+        res = yield self.invoke(rpc="DeviceStateUpdate",
+                                device_id=id,
+                                oper_status=o_status,
+                                connect_status=c_status)
+        returnValue(res)
+
+    @wrap_request(None)
+    @inlineCallbacks
+    def child_devices_state_update(self, parent_device_id,
+                                   oper_status=None,
+                                   connect_status=None,
+                                   admin_state=None):
+
+        id = ID()
+        id.id = parent_device_id
+        o_status = IntType()
+        if oper_status:
+            o_status.val = oper_status
+        else:
+            o_status.val = -1
+        c_status = IntType()
+        if connect_status:
+            c_status.val = connect_status
+        else:
+            c_status.val = -1
+        a_status = IntType()
+        if admin_state:
+            a_status.val = admin_state
+        else:
+            a_status.val = -1
+
+        res = yield self.invoke(rpc="child_devices_state_update",
+                                parent_device_id=id,
+                                oper_status=o_status,
+                                connect_status=c_status,
+                                admin_state=a_status)
+        returnValue(res)
+
+
+    def child_devices_removed(self, parent_device_id):
+        raise NotImplementedError()
+
+
+    @wrap_request(None)
+    @inlineCallbacks
+    def device_pm_config_update(self, device_pm_config, init=False):
+        log.debug("device_pm_config_update")
+        b = BoolType()
+        b.val = init
+        res = yield self.invoke(rpc="DevicePMConfigUpdate",
+                                device_pm_config=device_pm_config, init=b)
+        returnValue(res)
+
+    @wrap_request(None)
+    @inlineCallbacks
+    def port_created(self, device_id, port):
+        log.debug("port_created")
+        proto_id = ID()
+        proto_id.id = device_id
+        res = yield self.invoke(rpc="PortCreated", device_id=proto_id, port=port)
+        returnValue(res)
+
+
+    def port_removed(self, device_id, port):
+        raise NotImplementedError()
+
+    def ports_enabled(self, device_id):
+        raise NotImplementedError()
+
+    def ports_disabled(self, device_id):
+        raise NotImplementedError()
+
+    def ports_oper_status_update(self, device_id, oper_status):
+        raise NotImplementedError()
+
+    def image_download_update(self, img_dnld):
+        raise NotImplementedError()
+
+    def image_download_deleted(self, img_dnld):
+        raise NotImplementedError()
+
+    def packet_in(self, device_id, egress_port_no, packet):
+        raise NotImplementedError()
diff --git a/adapters/kafka/event_bus_publisher.py b/adapters/kafka/event_bus_publisher.py
new file mode 100644
index 0000000..011fdea
--- /dev/null
+++ b/adapters/kafka/event_bus_publisher.py
@@ -0,0 +1,90 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+A gateway between the internal event bus and the Kafka publisher proxy
+to publish select topics and messages posted to the Voltha-internal event
+bus toward the external world.
+"""
+import structlog
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.message import Message
+from simplejson import dumps
+
+from adapters.common.event_bus import EventBusClient
+
+log = structlog.get_logger()
+
+
+class EventBusPublisher(object):
+
+    def __init__(self, kafka_proxy, config):
+        self.kafka_proxy = kafka_proxy
+        self.config = config
+        self.topic_mappings = config.get('topic_mappings', {})
+        self.event_bus = EventBusClient()
+        self.subscriptions = None
+
+    def start(self):
+        log.debug('starting')
+        self.subscriptions = list()
+        self._setup_subscriptions(self.topic_mappings)
+        log.info('started')
+        return self
+
+    def stop(self):
+        try:
+            log.debug('stopping-event-bus')
+            if self.subscriptions:
+                for subscription in self.subscriptions:
+                    self.event_bus.unsubscribe(subscription)
+            log.info('stopped-event-bus')
+        except Exception as e:
+            log.exception('failed-stopping-event-bus', e=e)
+            return
+
+    def _setup_subscriptions(self, mappings):
+
+        for event_bus_topic, mapping in mappings.iteritems():
+
+            kafka_topic = mapping.get('kafka_topic', None)
+
+            if kafka_topic is None:
+                log.error('no-kafka-topic-in-config',
+                          event_bus_topic=event_bus_topic,
+                          mapping=mapping)
+                continue
+
+            self.subscriptions.append(self.event_bus.subscribe(
+                event_bus_topic,
+                # to avoid Python late-binding to the last registered
+                # kafka_topic, we force instant binding with the default arg
+                lambda _, m, k=kafka_topic: self.forward(k, m)))
+
+            log.info('event-to-kafka', kafka_topic=kafka_topic,
+                     event_bus_topic=event_bus_topic)
+
+    def forward(self, kafka_topic, msg):
+        try:
+            # convert to a JSON string if msg is a protobuf message
+            if isinstance(msg, Message):
+                msg = dumps(MessageToDict(msg,
+                                          including_default_value_fields=True,
+                                          preserving_proto_field_name=True))
+            log.debug('forward-event-bus-publisher')
+            self.kafka_proxy.send_message(kafka_topic, msg)
+        except Exception as e:
+            log.exception('failed-forward-event-bus-publisher', e=e)
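+
+
+# A minimal usage sketch (the topic names and config shape here are
+# illustrative assumptions, not part of this module):
+#
+#   config = {
+#       'topic_mappings': {
+#           'kpis': {'kafka_topic': 'voltha.kpis'},
+#       }
+#   }
+#   publisher = EventBusPublisher(kafka_proxy, config).start()
+#
+# With this mapping, any message published on the internal event bus topic
+# 'kpis' is forwarded to the Kafka topic 'voltha.kpis'; protobuf messages
+# are converted to JSON before being sent.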
+
diff --git a/adapters/kafka/kafka_inter_container_library.py b/adapters/kafka/kafka_inter_container_library.py
new file mode 100644
index 0000000..ad53812
--- /dev/null
+++ b/adapters/kafka/kafka_inter_container_library.py
@@ -0,0 +1,584 @@
+#!/usr/bin/env python
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from zope.interface import Interface, implementer
+from adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, \
+    DeferredQueue, gatherResults
+from afkak.client import KafkaClient
+from afkak.consumer import OFFSET_LATEST, Consumer
+import structlog
+from adapters.common.utils import asleep
+from adapters.protos.core_adapter_pb2 import MessageType, Argument, \
+    InterContainerRequestBody, InterContainerMessage, Header, InterContainerResponseBody
+import time
+from uuid import uuid4
+from adapters.common.utils.registry import IComponent
+
+
+log = structlog.get_logger()
+
+class KafkaMessagingError(Exception):
+    def __init__(self, error):
+        self.error = error
+
+@implementer(IComponent)
+class IKafkaMessagingProxy(object):
+    _kafka_messaging_instance = None
+
+    def __init__(self,
+                 kafka_host_port,
+                 kv_store,
+                 default_topic,
+                 target_cls):
+        """
+        Initialize the kafka proxy.  This is a singleton (may change to
+        non-singleton if performance is better)
+        :param kafka_host_port: Kafka host and port
+        :param kv_store: Key-Value store
+        :param default_topic: Default topic to subscribe to
+        :param target_cls: target class - method of that class is invoked
+        when a message is received on the default_topic
+        """
+        # raise an exception if an instance already exists
+        if IKafkaMessagingProxy._kafka_messaging_instance:
+            raise Exception(
+                'A singleton instance of {} already exists'.format(
+                    IKafkaMessagingProxy))
+
+        log.debug("Initializing-KafkaProxy")
+        self.kafka_host_port = kafka_host_port
+        self.kv_store = kv_store
+        self.default_topic = default_topic
+        self.target_cls = target_cls
+        self.topic_target_cls_map = {}
+        self.topic_consumer_map = {}
+        self.topic_callback_map = {}
+        self.subscribers = {}
+        self.kafka_client = None
+        self.kafka_proxy = None
+        self.transaction_id_deferred_map = {}
+        self.received_msg_queue = DeferredQueue()
+
+        self.init_time = 0
+        self.init_received_time = 0
+
+        self.init_resp_time = 0
+        self.init_received_resp_time = 0
+
+        self.num_messages = 0
+        self.total_time = 0
+        self.num_responses = 0
+        self.total_time_responses = 0
+        log.debug("KafkaProxy-initialized")
+
+    def start(self):
+        try:
+            # Create the kafka client
+            # assert self.kafka_host is not None
+            # assert self.kafka_port is not None
+            # kafka_host_port = ":".join((self.kafka_host, self.kafka_port))
+            self.kafka_client = KafkaClient(self.kafka_host_port)
+
+            # Get the kafka proxy instance.  If it does not exist then
+            # create it
+            self.kafka_proxy = get_kafka_proxy()
+            if self.kafka_proxy is None:
+                KafkaProxy(kafka_endpoint=self.kafka_host_port).start()
+                self.kafka_proxy = get_kafka_proxy()
+
+            # Subscribe the default topic and target_cls
+            self.topic_target_cls_map[self.default_topic] = self.target_cls
+
+            # Start the queue to handle incoming messages
+            reactor.callLater(0, self._received_message_processing_loop)
+
+            # Start listening for incoming messages
+            reactor.callLater(0, self.subscribe, self.default_topic,
+                              target_cls=self.target_cls)
+
+            # Setup the singleton instance
+            IKafkaMessagingProxy._kafka_messaging_instance = self
+        except Exception as e:
+            log.exception("Failed-to-start-proxy", e=e)
+
+
+    def stop(self):
+        """
+        Invoked to stop the kafka proxy
+        :return: None on success, Exception on failure
+        """
+        log.debug("Stopping-messaging-proxy ...")
+        try:
+            # Stop all the consumers
+            deferred_list = []
+            for key, values in self.topic_consumer_map.iteritems():
+                deferred_list.extend([c.stop() for c in values])
+
+            if deferred_list:
+                d = gatherResults(deferred_list)
+                d.addCallback(lambda result: self.kafka_client.close())
+            log.debug("Messaging-proxy-stopped.")
+        except Exception as e:
+            log.exception("Exception-when-stopping-messaging-proxy:", e=e)
+
+
+    @inlineCallbacks
+    def _wait_until_topic_is_ready(self, client, topic):
+        e = True
+        while e:
+            yield client.load_metadata_for_topics(topic)
+            e = client.metadata_error_for_topic(topic)
+            if e:
+                log.debug("Topic-not-ready-retrying...", topic=topic)
+
+
+    @inlineCallbacks
+    def _subscribe(self, topic, callback=None, target_cls=None):
+        try:
+            yield self._wait_until_topic_is_ready(self.kafka_client, topic)
+            partitions = self.kafka_client.topic_partitions[topic]
+            consumers = []
+
+            # First setup the generic callback - all received messages will
+            # go through that queue
+            if topic not in self.topic_consumer_map:
+                consumers = [Consumer(self.kafka_client, topic, partition,
+                                      self._enqueue_received_message)
+                             for partition in partitions]
+                self.topic_consumer_map[topic] = consumers
+
+            log.debug("_subscribe", topic=topic, consumermap=self.topic_consumer_map)
+
+            if target_cls is not None and callback is None:
+                # Scenario #1
+                if topic not in self.topic_target_cls_map:
+                    self.topic_target_cls_map[topic] = target_cls
+            elif target_cls is None and callback is not None:
+                # Scenario #2
+                log.debug("custom-callback", topic=topic,
+                          callback_map=self.topic_callback_map)
+                if topic not in self.topic_callback_map:
+                    self.topic_callback_map[topic] = [callback]
+                else:
+                    self.topic_callback_map[topic].extend([callback])
+            else:
+                log.warn("invalid-parameters")
+
+            def cb_closed(result):
+                """
+                Called when a consumer cleanly stops.
+                """
+                log.debug("Consumers-cleanly-stopped")
+
+            def eb_failed(failure):
+                """
+                Called when a consumer fails due to an uncaught exception in the
+                processing callback or a network error on shutdown. In this case we
+                simply log the error.
+                """
+                log.warn("Consumers-failed", failure=failure)
+
+            for c in consumers:
+                c.start(OFFSET_LATEST).addCallbacks(cb_closed, eb_failed)
+
+            returnValue(True)
+        except Exception as e:
+            log.exception("Exception-during-subscription", e=e)
+            returnValue(False)
+
+    @inlineCallbacks
+    def subscribe(self, topic, callback=None, target_cls=None,
+                  max_retry=3):
+        """
+        Scenario 1:  invoked to subscribe to a specific topic with a
+        target_cls to invoke when a message is received on that topic.  This
+        handles the request/response case where this library performs the
+        heavy lifting. In this case the callback must be None.
+
+        Scenario 2:  invoked to subscribe to a specific topic with a
+        specific callback to invoke when a message is received on that topic.
+        This handles the case where the caller wants to process the received
+        message itself. In this case the target_cls must be None.
+
+        :param topic: topic to subscribe to
+        :param callback: Callback to invoke when a message is received on
+        the topic. Exactly one of callback and target_cls must be None.
+        :param target_cls:  Target class to use when a message is
+        received on the topic. There can only be one target_cls per topic.
+        Exactly one of callback and target_cls must be None.
+        :param max_retry:  the number of retries before reporting failure
+        to subscribe.  This caters for the scenario where the kafka topic
+        is not yet ready.
+        :return: True on success, False on failure
+        """
+        RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
+
+        def _backoff(msg, retries):
+            wait_time = RETRY_BACKOFF[min(retries,
+                                          len(RETRY_BACKOFF) - 1)]
+            log.info(msg, retry_in=wait_time)
+            return asleep(wait_time)
+
+        retry = 0
+        # _subscribe is an inlineCallbacks method and returns a Deferred,
+        # so its result must be yielded rather than tested directly.
+        subscribed = yield self._subscribe(topic, callback=callback,
+                                           target_cls=target_cls)
+        while not subscribed:
+            if retry > max_retry:
+                returnValue(False)
+            yield _backoff("subscription-not-complete", retry)
+            retry += 1
+            subscribed = yield self._subscribe(topic, callback=callback,
+                                               target_cls=target_cls)
+        returnValue(True)
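+
+    # Usage sketch (hedged; the topic names and handlers are illustrative
+    # only):
+    #
+    #   # Scenario 1: request/response handled by this library
+    #   yield proxy.subscribe("adapter-topic", target_cls=request_facade)
+    #
+    #   # Scenario 2: the caller processes raw messages itself
+    #   yield proxy.subscribe("events-topic", callback=on_message)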
+
+    def unsubscribe(self, topic):
+        """
+        Invoked to unsubscribe from a topic
+        :param topic: topic to unsubscribe from
+        :return: None on success or Exception on failure
+        """
+        log.debug("unsubscribing-from-topic", topic=topic)
+
+        def remove_topic(topic):
+            if topic in self.topic_consumer_map:
+                del self.topic_consumer_map[topic]
+
+        try:
+            if topic in self.topic_consumer_map:
+                consumers = self.topic_consumer_map[topic]
+                d = gatherResults([c.stop() for c in consumers])
+                # addCallback passes the result as the first argument, so
+                # drop it and pass the topic explicitly.
+                d.addCallback(lambda _: remove_topic(topic))
+                log.debug("unsubscribed-from-topic", topic=topic)
+            else:
+                log.debug("topic-does-not-exist", topic=topic)
+        except Exception as e:
+            log.exception("exception-when-unsubscribing", e=e)
+
+    @inlineCallbacks
+    def _enqueue_received_message(self, reactor, message_list):
+        """
+        Internal method to continuously enqueue all received messages
+        irrespective of topic
+        :param reactor: A requirement by the Twisted Python kafka library
+        :param message_list: Received list of messages
+        :return: None on success, Exception on failure
+        """
+        try:
+            for m in message_list:
+                log.debug("received-msg", msg=m)
+                yield self.received_msg_queue.put(m)
+        except Exception as e:
+            log.exception("Failed-enqueueing-received-message", e=e)
+
+    @inlineCallbacks
+    def _received_message_processing_loop(self):
+        """
+        Internal method to continuously process all received messages one
+        at a time
+        :return: None on success, Exception on failure
+        """
+        while True:
+            try:
+                message = yield self.received_msg_queue.get()
+                yield self._process_message(message)
+            except Exception as e:
+                log.exception("Failed-dequeueing-received-message", e=e)
+
+    def _to_string(self, unicode_str):
+        if unicode_str is not None:
+            if type(unicode_str) == unicode:
+                return unicode_str.encode('ascii', 'ignore')
+            else:
+                return unicode_str
+        else:
+            return None
+
+    def _format_request(self,
+                        rpc,
+                        to_topic,
+                        reply_topic,
+                        **kwargs):
+        """
+        Format a request to send over kafka
+        :param rpc: Requested remote API
+        :param to_topic: Topic to send the request
+        :param reply_topic: Topic to receive the resulting response, if any
+        :param kwargs: Dictionary of key-value pairs to pass as arguments to
+        the remote rpc API.
+        :return: An InterContainerMessage message type on success or None on
+        failure
+        """
+        try:
+            transaction_id = uuid4().hex
+            request = InterContainerMessage()
+            request_body = InterContainerRequestBody()
+            request.header.id = transaction_id
+            request.header.type = MessageType.Value("REQUEST")
+            request.header.from_topic = self.default_topic
+            request.header.to_topic = to_topic
+
+            response_required = False
+            if reply_topic:
+                request_body.reply_to_topic = reply_topic
+                response_required = True
+
+            request.header.timestamp = int(round(time.time() * 1000))
+            request_body.rpc = rpc
+            for a, b in kwargs.iteritems():
+                arg = Argument()
+                arg.key = a
+                try:
+                    arg.value.Pack(b)
+                    request_body.args.extend([arg])
+                except Exception as e:
+                    log.exception("Failed-parsing-value", e=e)
+            request_body.response_required = response_required
+            request.body.Pack(request_body)
+            return request, transaction_id, response_required
+        except Exception as e:
+            log.exception("formatting-request-failed",
+                          rpc=rpc,
+                          to_topic=to_topic,
+                          reply_topic=reply_topic,
+                          args=kwargs)
+            return None, None, None
+
+    def _format_response(self, msg_header, msg_body, status):
+        """
+        Format a response
+        :param msg_header: The header portion of a received request
+        :param msg_body: The response body
+        :param status: True if this represents a successful response
+        :return: an InterContainerMessage message type
+        """
+        try:
+            assert isinstance(msg_header, Header)
+            response = InterContainerMessage()
+            response_body = InterContainerResponseBody()
+            response.header.id = msg_header.id
+            response.header.timestamp = int(
+                round(time.time() * 1000))
+            response.header.type = MessageType.Value("RESPONSE")
+            response.header.from_topic = msg_header.to_topic
+            response.header.to_topic = msg_header.from_topic
+            if msg_body is not None:
+                response_body.result.Pack(msg_body)
+            response_body.success = status
+            response.body.Pack(response_body)
+            return response
+        except Exception as e:
+            log.exception("formatting-response-failed", header=msg_header,
+                          body=msg_body, status=status, e=e)
+            return None
+
+    def _parse_response(self, msg):
+        try:
+            message = InterContainerMessage()
+            message.ParseFromString(msg)
+            resp = InterContainerResponseBody()
+            if message.body.Is(InterContainerResponseBody.DESCRIPTOR):
+                message.body.Unpack(resp)
+            else:
+                log.debug("unsupported-msg", msg_type=type(message.body))
+                return None
+            log.debug("parsed-response", input=message, output=resp)
+            return resp
+        except Exception as e:
+            log.exception("parsing-response-failed", msg=msg, e=e)
+            return None
+
+    @inlineCallbacks
+    def _process_message(self, m):
+        """
+        Default internal method invoked for every batch of messages received
+        from Kafka.
+        """
+        def _toDict(args):
+            """
+            Convert a repeatable Argument type into a python dictionary
+            :param args: Repeatable core_adapter.Argument type
+            :return: a python dictionary
+            """
+            if args is None:
+                return None
+            result = {}
+            for arg in args:
+                assert isinstance(arg, Argument)
+                result[arg.key] = arg.value
+            return result
+
+        try:
+            val = m.message.value
+
+            # Go over customized callbacks first
+            if m.topic in self.topic_callback_map:
+                for c in self.topic_callback_map[m.topic]:
+                    yield c(val)
+
+            #  Check whether we need to process request/response scenario
+            if m.topic not in self.topic_target_cls_map:
+                return
+
+            # Process request/response scenario
+            message = InterContainerMessage()
+            message.ParseFromString(val)
+
+            if message.header.type == MessageType.Value("REQUEST"):
+                # Get the target class for that specific topic
+                targetted_topic = self._to_string(message.header.to_topic)
+                msg_body = InterContainerRequestBody()
+                if message.body.Is(InterContainerRequestBody.DESCRIPTOR):
+                    message.body.Unpack(msg_body)
+                else:
+                    log.debug("unsupported-msg", msg_type=type(message.body))
+                    return
+                if targetted_topic in self.topic_target_cls_map:
+                    if msg_body.args:
+                        log.debug("message-body-args-present", body=msg_body)
+                        (status, res) = yield getattr(
+                            self.topic_target_cls_map[targetted_topic],
+                            self._to_string(msg_body.rpc))(
+                            **_toDict(msg_body.args))
+                    else:
+                        log.debug("message-body-args-absent", body=msg_body,
+                                  rpc=msg_body.rpc)
+                        (status, res) = yield getattr(
+                            self.topic_target_cls_map[targetted_topic],
+                            self._to_string(msg_body.rpc))()
+                    if msg_body.response_required:
+                        response = self._format_response(
+                            msg_header=message.header,
+                            msg_body=res,
+                            status=status,
+                        )
+                        if response is not None:
+                            res_topic = self._to_string(
+                                response.header.to_topic)
+                            self._send_kafka_message(res_topic, response)
+                            log.debug("response-sent",
+                                      response=response.body)
+            elif message.header.type == MessageType.Value("RESPONSE"):
+                trns_id = self._to_string(message.header.id)
+                if trns_id in self.transaction_id_deferred_map:
+                    resp = self._parse_response(val)
+
+                    self.transaction_id_deferred_map[trns_id].callback(resp)
+            else:
+                log.error("!!INVALID-TRANSACTION-TYPE!!")
+
+        except Exception as e:
+            log.exception("Failed-to-process-message", message=m, e=e)
+
+    @inlineCallbacks
+    def _send_kafka_message(self, topic, msg):
+        try:
+            yield self.kafka_proxy.send_message(topic, msg.SerializeToString())
+        except Exception as e:
+            log.exception("Failed-sending-message", message=msg, e=e)
+
+    @inlineCallbacks
+    def send_request(self,
+                     rpc,
+                     to_topic,
+                     reply_topic=None,
+                     callback=None,
+                     **kwargs):
+        """
+        Invoked to send a message to a remote container and receive a
+        response if required.
+        :param rpc: The remote API to invoke
+        :param to_topic: Send the message to this kafka topic
+        :param reply_topic: If not None then a response is expected on this
+        topic.  If set to None then no response is required.
+        :param callback: Callback to invoke when a response is received.
+        :param kwargs: Key-value pairs representing arguments to pass to the
+        rpc remote API.
+        :return: None if no response is required; otherwise the response is
+        delivered via the callback if one is provided, or returned as a
+        tuple of (status, return_cls)
+        """
+        try:
+            # Ensure all strings are not unicode encoded
+            rpc = self._to_string(rpc)
+            to_topic = self._to_string(to_topic)
+            reply_topic = self._to_string(reply_topic)
+
+            request, transaction_id, response_required = \
+                self._format_request(
+                    rpc=rpc,
+                    to_topic=to_topic,
+                    reply_topic=reply_topic,
+                    **kwargs)
+
+            if request is None:
+                return
+
+            # Add the transaction to the transaction map before sending the
+            # request.  This will guarantee the eventual response will be
+            # processed.
+            wait_for_result = None
+            if response_required:
+                wait_for_result = Deferred()
+                self.transaction_id_deferred_map[
+                    self._to_string(request.header.id)] = wait_for_result
+
+            log.debug("BEFORE-SENDING", to_topic=to_topic, from_topic=reply_topic)
+            yield self._send_kafka_message(to_topic, request)
+            log.debug("AFTER-SENDING", to_topic=to_topic, from_topic=reply_topic)
+
+            if response_required:
+                res = yield wait_for_result
+
+                # Remove the transaction from the transaction map before
+                # checking the result, so a failed response does not leak
+                # an entry in the map.
+                del self.transaction_id_deferred_map[transaction_id]
+
+                if res is None or not res.success:
+                    raise KafkaMessagingError(
+                        error="failed-response: {}".format(res))
+
+                log.debug("send-message-response", rpc=rpc, result=res)
+
+                if callback:
+                    callback((res.success, res.result))
+                else:
+                    returnValue((res.success, res.result))
+        except Exception as e:
+            log.exception("Exception-sending-request", e=e)
+            raise KafkaMessagingError(error=e)
+
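+# Usage sketch for send_request (hedged; the rpc name, topics and the
+# argument proto are illustrative assumptions):
+#
+#   id_proto = ID()
+#   id_proto.id = "device-1234"
+#   (success, result) = yield get_messaging_proxy().send_request(
+#       rpc="GetDevice",
+#       to_topic="core-topic",
+#       reply_topic="my-topic",
+#       device_id=id_proto)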
+
+# Common method to get the singleton instance of the kafka proxy class
+def get_messaging_proxy():
+    return IKafkaMessagingProxy._kafka_messaging_instance
diff --git a/adapters/kafka/kafka_proxy.py b/adapters/kafka/kafka_proxy.py
new file mode 100644
index 0000000..10fdbf8
--- /dev/null
+++ b/adapters/kafka/kafka_proxy.py
@@ -0,0 +1,209 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from afkak.client import KafkaClient as _KafkaClient
+from afkak.common import (
+    PRODUCER_ACK_LOCAL_WRITE, PRODUCER_ACK_NOT_REQUIRED
+)
+from afkak.producer import Producer as _kafkaProducer
+from structlog import get_logger
+from twisted.internet.defer import inlineCallbacks, returnValue
+from zope.interface import implementer
+
+from adapters.common.utils.consulhelpers import get_endpoint_from_consul
+from adapters.kafka.event_bus_publisher import EventBusPublisher
+from adapters.common.utils.registry import IComponent
+import time
+
+log = get_logger()
+
+
+@implementer(IComponent)
+class KafkaProxy(object):
+    """
+    This is a singleton proxy kafka class to hide the kafka client details.
+    """
+    _kafka_instance = None
+
+    def __init__(self,
+                 consul_endpoint='localhost:8500',
+                 kafka_endpoint='localhost:9092',
+                 ack_timeout=1000,
+                 max_req_attempts=10,
+                 config=None):
+
+        # raise an exception if an instance already exists
+        if KafkaProxy._kafka_instance:
+            raise Exception('Singleton exist for :{}'.format(KafkaProxy))
+
+        log.debug('initializing', endpoint=kafka_endpoint)
+        self.ack_timeout = ack_timeout
+        self.max_req_attempts = max_req_attempts
+        self.consul_endpoint = consul_endpoint
+        self.kafka_endpoint = kafka_endpoint
+        self.config = config if config is not None else {}
+        self.kclient = None
+        self.kproducer = None
+        self.event_bus_publisher = None
+        self.stopping = False
+        self.faulty = False
+        log.debug('initialized', endpoint=kafka_endpoint)
+
+    @inlineCallbacks
+    def start(self):
+        log.debug('starting')
+        self._get_kafka_producer()
+        KafkaProxy._kafka_instance = self
+        self.event_bus_publisher = yield EventBusPublisher(
+            self, self.config.get('event_bus_publisher', {})).start()
+        log.info('started')
+        self.faulty = False
+        self.stopping = False
+        returnValue(self)
+
+    @inlineCallbacks
+    def stop(self):
+        try:
+            log.debug('stopping-kafka-proxy')
+            try:
+                if self.kclient:
+                    yield self.kclient.close()
+                    self.kclient = None
+                    log.debug('stopped-kclient-kafka-proxy')
+            except Exception as e:
+                log.exception('failed-stopped-kclient-kafka-proxy', e=e)
+
+            try:
+                if self.kproducer:
+                    yield self.kproducer.stop()
+                    self.kproducer = None
+                    log.debug('stopped-kproducer-kafka-proxy')
+            except Exception as e:
+                log.exception('failed-stopped-kproducer-kafka-proxy', e=e)
+
+            #try:
+            #    if self.event_bus_publisher:
+            #        yield self.event_bus_publisher.stop()
+            #        self.event_bus_publisher = None
+            #        log.debug('stopped-event-bus-publisher-kafka-proxy')
+            #except Exception, e:
+            #    log.debug('failed-stopped-event-bus-publisher-kafka-proxy')
+            #    pass
+
+            log.debug('stopped-kafka-proxy')
+
+        except Exception as e:
+            self.kclient = None
+            self.kproducer = None
+            #self.event_bus_publisher = None
+            log.exception('failed-stopped-kafka-proxy', e=e)
+
+    def _get_kafka_producer(self):
+        # PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written
+        #  to a local log before sending response
+        try:
+
+            if self.kafka_endpoint.startswith('@'):
+                try:
+                    _k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
+                                                           self.kafka_endpoint[1:])
+                    log.debug('found-kafka-service', endpoint=_k_endpoint)
+
+                except Exception as e:
+                    log.exception('no-kafka-service-in-consul', e=e)
+
+                    self.kproducer = None
+                    self.kclient = None
+                    return
+            else:
+                _k_endpoint = self.kafka_endpoint
+
+            self.kclient = _KafkaClient(_k_endpoint)
+            self.kproducer = _kafkaProducer(self.kclient,
+                                            req_acks=PRODUCER_ACK_NOT_REQUIRED,
+                                            # req_acks=PRODUCER_ACK_LOCAL_WRITE,
+                                            # ack_timeout=self.ack_timeout,
+                                            # max_req_attempts=self.max_req_attempts)
+                                            )
+        except Exception as e:
+            log.exception('failed-get-kafka-producer', e=e)
+            return
+
+    @inlineCallbacks
+    def send_message(self, topic, msg):
+        assert topic is not None
+        assert msg is not None
+
+        # first check whether we have a kafka producer.  If there is none
+        # then try to get one - this happens only when we try to lookup the
+        # kafka service from consul
+        try:
+            if self.faulty is False:
+
+                if self.kproducer is None:
+                    self._get_kafka_producer()
+                    # Let the next message request do the retry if still a failure
+                    if self.kproducer is None:
+                        log.error('no-kafka-producer', endpoint=self.kafka_endpoint)
+                        return
+
+                # log.debug('sending-kafka-msg', topic=topic, msg=msg)
+                msgs = [msg]
+
+                if self.kproducer and self.kclient and \
+                        self.event_bus_publisher and self.faulty is False:
+                    # log.debug('sending-kafka-msg-I-am-here0', time=int(round(time.time() * 1000)))
+
+                    yield self.kproducer.send_messages(topic, msgs=msgs)
+                    # self.kproducer.send_messages(topic, msgs=msgs)
+                    # log.debug('sent-kafka-msg', topic=topic, msg=msg)
+                else:
+                    return
+
+        except Exception as e:
+            self.faulty = True
+            log.error('failed-to-send-kafka-msg', topic=topic, msg=msg, e=e)
+
+            # set the kafka producer to None.  This is needed if the
+            # kafka docker went down and comes back up with a different
+            # port number.
+            if self.stopping is False:
+                log.debug('stopping-kafka-proxy')
+                try:
+                    self.stopping = True
+                    yield self.stop()
+                    self.stopping = False
+                    self.faulty = False
+                    log.debug('stopped-kafka-proxy')
+                except Exception as e:
+                    log.exception('failed-stopping-kafka-proxy', e=e)
+            else:
+                log.info('already-stopping-kafka-proxy')
+
+            return
+
+    def is_faulty(self):
+        return self.faulty
+
+
+# Common method to get the singleton instance of the kafka proxy class
+def get_kafka_proxy():
+    return KafkaProxy._kafka_instance
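+
+
+# A minimal usage sketch (the endpoint and topic names are illustrative
+# assumptions):
+#
+#   proxy = KafkaProxy(kafka_endpoint='localhost:9092')
+#   yield proxy.start()
+#   yield proxy.send_message('voltha.heartbeat', 'alive')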
+