[VOL-1034, VOL-1035, VOL-1037] This commit consists of:
1) Implementation of inter-adapter communication using flows
as proxy message between an ONU and its parent OLT.
2) Update the protos to reflect the inter-adapter message structure
3) Clean up the ponsim adapters to remove unused references, plus
general cleanup.

Change-Id: Ibe913a80a96d601fed946d9b9db55bb8d4f2c15a
diff --git a/adapters/kafka/adapter_proxy.py b/adapters/kafka/adapter_proxy.py
new file mode 100644
index 0000000..2d4831a
--- /dev/null
+++ b/adapters/kafka/adapter_proxy.py
@@ -0,0 +1,110 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Agent to play gateway between adapters.
+"""
+
+import structlog
+from uuid import uuid4
+from twisted.internet.defer import inlineCallbacks, returnValue
+from adapters.kafka.container_proxy import ContainerProxy
+from adapters.protos import third_party
+from adapters.protos.core_adapter_pb2 import InterAdapterHeader, \
+    InterAdapterMessage
+import time
+
+_ = third_party
+log = structlog.get_logger()
+
+
+class AdapterProxy(ContainerProxy):
+    """Proxy an adapter uses to send messages directly to another adapter."""
+
+    def __init__(self, kafka_proxy, core_topic, my_listening_topic):
+        super(AdapterProxy, self).__init__(kafka_proxy, core_topic,
+                                           my_listening_topic)
+
+    def _to_string(self, unicode_str):
+        """Encode unicode as ASCII; None becomes ''; others pass through."""
+        if unicode_str is None:
+            return ""
+        if isinstance(unicode_str, unicode):
+            return unicode_str.encode('ascii', 'ignore')
+        return unicode_str
+
+    @ContainerProxy.wrap_request(None)
+    @inlineCallbacks
+    def send_inter_adapter_message(self,
+                                   msg,
+                                   type,
+                                   from_adapter,
+                                   to_adapter,
+                                   to_device_id=None,
+                                   proxy_device_id=None,
+                                   message_no=None):
+        """
+        Sends a message directly to an adapter. This is typically used to send
+        proxied messages from one adapter to another.  An initial ACK response
+        is sent back to the invoking adapter.  If there is a subsequent response
+        to be sent back (async) then the adapter receiving this request will
+        use this same API to send back the async response.
+        :param msg : GRPC message to send
+        :param type : InterAdapterMessageType of the message to send
+        :param from_adapter: Name of the adapter making the request.
+        :param to_adapter: Name of the remote adapter.
+        :param to_device_id: The ID of the device for which the message is
+        intended. If it's None then the message is not intended for a specific
+        device.  Its interpretation is adapter specific.
+        :param proxy_device_id: The ID of the device which will proxy that
+        message. If it's None then there is no specific device to proxy the
+        message.  Its interpretation is adapter specific.
+        :param message_no: A unique number for this transaction that the
+        adapter may use to correlate a request and an async response.
+        :raises Exception: any failure is logged and then re-raised.
+        """
+
+        try:
+            # validate params
+            assert msg
+            assert from_adapter
+            assert to_adapter
+
+            # Build the inter adapter message
+            h = InterAdapterHeader()
+            h.type = type
+            h.from_topic = self._to_string(from_adapter)
+            h.to_topic = self._to_string(to_adapter)
+            h.to_device_id = self._to_string(to_device_id)
+            h.proxy_device_id = self._to_string(proxy_device_id)
+            # Generate a transaction id when the caller did not supply one
+            h.id = self._to_string(message_no) if message_no else uuid4().hex
+            h.timestamp = int(round(time.time() * 1000))  # ms since epoch
+
+            iaMsg = InterAdapterMessage()
+            iaMsg.header.CopyFrom(h)
+            iaMsg.body.Pack(msg)
+
+            log.debug("sending-inter-adapter-message", header=iaMsg.header)
+            res = yield self.invoke(rpc="process_inter_adapter_message",
+                                    to_topic=iaMsg.header.to_topic,
+                                    msg=iaMsg)
+            returnValue(res)
+        except Exception as e:
+            log.exception("error-sending-request", e=e)
+            # Re-raise: wrap_request unpacks a (success, value) tuple, so
+            # returning None here would surface as an unrelated TypeError.
+            raise
diff --git a/adapters/kafka/adapter_request_facade.py b/adapters/kafka/adapter_request_facade.py
index 3009206..67f7869 100644
--- a/adapters/kafka/adapter_request_facade.py
+++ b/adapters/kafka/adapter_request_facade.py
@@ -15,35 +15,18 @@
 #
 
 """
-Agent to play gateway between CORE and an individual adapter.
+This facade handles kafka-formatted messages from the Core, extracts the kafka
+formatting and forwards the request to the concrete handler.
 """
-from uuid import uuid4
 
-import arrow
-import structlog
-from google.protobuf.json_format import MessageToJson
-from scapy.packet import Packet
-from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.defer import inlineCallbacks
 from zope.interface import implementer
 
-from adapters.common.event_bus import EventBusClient
-from adapters.common.frameio.frameio import hexify
-from adapters.common.utils.id_generation import create_cluster_logical_device_ids
 from adapters.interface import IAdapterInterface
+from adapters.protos.core_adapter_pb2 import IntType, InterAdapterMessage
 from adapters.protos.device_pb2 import Device
-
-from adapters.protos import third_party
-from adapters.protos.device_pb2 import Device, Port, PmConfigs
-from adapters.protos.events_pb2 import AlarmEvent, AlarmEventType, \
-    AlarmEventSeverity, AlarmEventState, AlarmEventCategory
-from adapters.protos.events_pb2 import KpiEvent
-from adapters.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
-    LogicalPort, AdminState, OperStatus, AlarmFilterRuleKey
-from adapters.common.utils.registry import registry
-from adapters.common.utils.id_generation import create_cluster_device_id
-from adapters.protos.core_adapter_pb2 import IntType
-from adapters.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, FlowGroupChanges
-import re
+from adapters.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
+    FlowGroupChanges
 
 
 class MacAddressError(BaseException):
@@ -107,7 +90,6 @@
 
         return True, self.adapter.get_ofp_port_info(d, p.val)
 
-
     def reconcile_device(self, device):
         return self.adapter.reconcile_device(device)
 
@@ -207,3 +189,11 @@
     def unsuppress_alarm(self, filter):
         return self.adapter.unsuppress_alarm(filter)
 
+    def process_inter_adapter_message(self, msg):
+        # Unpack msg into an InterAdapterMessage and hand it to the adapter;
+        # a falsy msg is reported as (False, <empty InterAdapterMessage>).
+        ia_msg = InterAdapterMessage()
+        if not msg:
+            return (False, ia_msg)
+        msg.Unpack(ia_msg)
+        return (True, self.adapter.process_inter_adapter_message(ia_msg))
diff --git a/adapters/kafka/container_proxy.py b/adapters/kafka/container_proxy.py
new file mode 100644
index 0000000..79918cd
--- /dev/null
+++ b/adapters/kafka/container_proxy.py
@@ -0,0 +1,114 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+The superclass for all kafka proxy subclasses.
+"""
+
+import structlog
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.python import failure
+from zope.interface import implementer
+
+from adapters.common.utils.deferred_utils import DeferredWithTimeout, \
+    TimeOutError
+from adapters.common.utils.registry import IComponent
+
+log = structlog.get_logger()
+
+
+class KafkaMessagingError(BaseException):  # not caught by `except Exception`
+    def __init__(self, error):
+        self.error = error  # caller-supplied error detail, stored unchanged
+
+
+@implementer(IComponent)
+class ContainerProxy(object):
+    """Superclass of the kafka proxies: sends requests and awaits replies."""
+
+    def __init__(self, kafka_proxy, core_topic, my_listening_topic):
+        self.kafka_proxy = kafka_proxy
+        self.listening_topic = my_listening_topic  # used as the reply topic
+        self.core_topic = core_topic  # default destination for invoke()
+        self.default_timeout = 3  # handed to DeferredWithTimeout in invoke()
+
+    def start(self):
+        log.info('started')
+        return self
+
+    def stop(self):
+        log.info('stopped')
+
+    @classmethod
+    def wrap_request(cls, return_cls):
+        """
+        Decorate a method yielding (success, value): on success unpack value
+        into a new return_cls (or return None), else return value unchanged.
+        """
+        def real_wrapper(func):
+            @inlineCallbacks
+            def wrapper(*args, **kw):
+                try:
+                    (success, d) = yield func(*args, **kw)
+                    if not success:
+                        log.warn("unsuccessful-request", func=func, args=args,
+                                 kw=kw)
+                        returnValue(d)
+                    log.debug("successful-response", func=func, val=d)
+                    if return_cls is None:
+                        log.debug("successful-response-none", func=func,
+                                  val=None)
+                        returnValue(None)
+                    rc = return_cls()
+                    if d is not None:
+                        d.Unpack(rc)
+                    returnValue(rc)
+                except Exception as e:
+                    log.exception("request-wrapper-exception", func=func, e=e)
+                    raise
+
+            return wrapper
+        return real_wrapper
+
+    @inlineCallbacks
+    def invoke(self, rpc, to_topic=None, **kwargs):
+        """Send rpc to to_topic (core_topic if None); return the result."""
+        @inlineCallbacks
+        def _send_request(rpc, m_callback, to_topic, **kwargs):
+            try:
+                log.debug("sending-request", rpc=rpc)
+                if to_topic is None:
+                    to_topic = self.core_topic
+                result = yield self.kafka_proxy.send_request(
+                    rpc=rpc, to_topic=to_topic,
+                    reply_topic=self.listening_topic, callback=None, **kwargs)
+                if not m_callback.called:
+                    m_callback.callback(result)
+                else:
+                    log.debug('timeout-already-occurred', rpc=rpc)
+            except Exception:
+                log.exception("Failure-sending-request", rpc=rpc, kw=kwargs)
+                if not m_callback.called:
+                    m_callback.errback(failure.Failure())
+
+        cb = DeferredWithTimeout(timeout=self.default_timeout)
+        _send_request(rpc, cb, to_topic, **kwargs)
+        try:
+            res = yield cb
+            returnValue(res)
+        except TimeOutError as e:
+            log.warn('invoke-timeout', e=e)
+            raise  # bare raise keeps the original traceback on Python 2
diff --git a/adapters/kafka/core_proxy.py b/adapters/kafka/core_proxy.py
index 512262f..36459ed 100644
--- a/adapters/kafka/core_proxy.py
+++ b/adapters/kafka/core_proxy.py
@@ -1,5 +1,5 @@
 #
-# Copyright 2017 the original author or authors.
+# Copyright 2018 the original author or authors.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -15,122 +15,28 @@
 #
 
 """
-Agent to play gateway between CORE and an individual adapter.
+Agent to play gateway between CORE and an adapter.
 """
-from uuid import uuid4
-
-import arrow
 import structlog
-from google.protobuf.json_format import MessageToJson
-from scapy.packet import Packet
-from twisted.internet.defer import inlineCallbacks, returnValue
-from twisted.python import failure
-from zope.interface import implementer
-
-from adapters.common.event_bus import EventBusClient
-from adapters.common.frameio.frameio import hexify
-from adapters.common.utils.id_generation import create_cluster_logical_device_ids
-from adapters.interface import IAdapterInterface
-from adapters.protos import third_party
-from adapters.protos.device_pb2 import Device, Port, Ports, PmConfigs
-from adapters.protos.events_pb2 import AlarmEvent, AlarmEventType, \
-    AlarmEventSeverity, AlarmEventState, AlarmEventCategory
-from adapters.protos.events_pb2 import KpiEvent
-from adapters.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
-    LogicalPort, AlarmFilterRuleKey, CoreInstance
-from adapters.common.utils.registry import registry, IComponent
-from adapters.common.utils.id_generation import create_cluster_device_id
-import re
-from adapters.interface import ICoreSouthBoundInterface
-from adapters.protos.core_adapter_pb2 import StrType, BoolType, IntType
-from adapters.protos.common_pb2 import ID, ConnectStatus, OperStatus
 from google.protobuf.message import Message
-from adapters.common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+from adapters.kafka.container_proxy import ContainerProxy
+from adapters.protos.common_pb2 import ID, ConnectStatus, OperStatus
+from adapters.protos.core_adapter_pb2 import StrType, BoolType, IntType
+from adapters.protos.device_pb2 import Device, Ports
+from adapters.protos.voltha_pb2 import CoreInstance
 
 log = structlog.get_logger()
 
-class KafkaMessagingError(BaseException):
-    def __init__(self, error):
-        self.error = error
 
-def wrap_request(return_cls):
-    def real_wrapper(func):
-        @inlineCallbacks
-        def wrapper(*args, **kw):
-            try:
-                (success, d) = yield func(*args, **kw)
-                if success:
-                    log.debug("successful-response", func=func, val=d)
-                    if return_cls is not None:
-                        rc = return_cls()
-                        if d is not None:
-                            d.Unpack(rc)
-                        returnValue(rc)
-                    else:
-                        log.debug("successful-response-none", func=func,
-                                  val=None)
-                        returnValue(None)
-                else:
-                    log.warn("unsuccessful-request", func=func, args=args, kw=kw)
-                    returnValue(d)
-            except Exception as e:
-                log.exception("request-wrapper-exception", func=func, e=e)
-                raise
-        return wrapper
-    return real_wrapper
-
-
-@implementer(IComponent, ICoreSouthBoundInterface)
-class CoreProxy(object):
+class CoreProxy(ContainerProxy):
 
     def __init__(self, kafka_proxy, core_topic, my_listening_topic):
-        self.kafka_proxy = kafka_proxy
-        self.listening_topic = my_listening_topic
-        self.core_topic = core_topic
-        self.default_timeout = 3
+        super(CoreProxy, self).__init__(kafka_proxy, core_topic,
+                                        my_listening_topic)
 
-    def start(self):
-        log.info('started')
-
-        return self
-
-    def stop(self):
-        log.info('stopped')
-
-    @inlineCallbacks
-    def invoke(self, rpc, to_topic=None, **kwargs):
-        @inlineCallbacks
-        def _send_request(rpc, m_callback,to_topic, **kwargs):
-            try:
-                log.debug("sending-request", rpc=rpc)
-                if to_topic is None:
-                    to_topic = self.core_topic
-                result = yield self.kafka_proxy.send_request(rpc=rpc,
-                                                             to_topic=to_topic,
-                                                             reply_topic=self.listening_topic,
-                                                             callback=None,
-                                                             **kwargs)
-                if not m_callback.called:
-                    m_callback.callback(result)
-                else:
-                    log.debug('timeout-already-occurred', rpc=rpc)
-            except Exception as e:
-                log.exception("Failure-sending-request", rpc=rpc, kw=kwargs)
-                if not m_callback.called:
-                    m_callback.errback(failure.Failure())
-
-        log.debug('invoke-request', rpc=rpc)
-        cb = DeferredWithTimeout(timeout=self.default_timeout)
-        _send_request(rpc, cb, to_topic, **kwargs)
-        try:
-            res = yield cb
-            returnValue(res)
-        except TimeOutError as e:
-            log.warn('invoke-timeout', e=e)
-            raise e
-
-
-    @wrap_request(CoreInstance)
+    @ContainerProxy.wrap_request(CoreInstance)
     @inlineCallbacks
     def register(self, adapter):
         log.debug("register")
@@ -142,7 +48,7 @@
             log.exception("registration-exception", e=e)
             raise
 
-    @wrap_request(Device)
+    @ContainerProxy.wrap_request(Device)
     @inlineCallbacks
     def get_device(self, device_id):
         log.debug("get-device")
@@ -151,15 +57,12 @@
         res = yield self.invoke(rpc="GetDevice", device_id=id)
         returnValue(res)
 
-    @wrap_request(Device)
+    @ContainerProxy.wrap_request(Device)
     @inlineCallbacks
     def get_child_device(self, parent_device_id, **kwargs):
         raise NotImplementedError()
 
-    # def add_device(self, device):
-    #     raise NotImplementedError()
-
-    @wrap_request(Ports)
+    @ContainerProxy.wrap_request(Ports)
     @inlineCallbacks
     def get_ports(self, device_id, port_type):
         id = ID()
@@ -179,7 +82,7 @@
 
     def _to_proto(self, **kwargs):
         encoded = {}
-        for k,v in kwargs.iteritems():
+        for k, v in kwargs.iteritems():
             if isinstance(v, Message):
                 encoded[k] = v
             elif type(v) == int:
@@ -196,8 +99,7 @@
                 encoded[k] = b_proto
         return encoded
 
-
-    @wrap_request(None)
+    @ContainerProxy.wrap_request(None)
     @inlineCallbacks
     def child_device_detected(self,
                               parent_device_id,
@@ -217,14 +119,13 @@
         args = self._to_proto(**kw)
         res = yield self.invoke(rpc="ChildDeviceDetected",
                                 parent_device_id=id,
-                                parent_port_no = ppn,
-                                child_device_type= cdt,
+                                parent_port_no=ppn,
+                                child_device_type=cdt,
                                 channel_id=channel,
                                 **args)
         returnValue(res)
 
-
-    @wrap_request(None)
+    @ContainerProxy.wrap_request(None)
     @inlineCallbacks
     def device_update(self, device):
         log.debug("device_update")
@@ -234,21 +135,20 @@
     def child_device_removed(parent_device_id, child_device_id):
         raise NotImplementedError()
 
-
-    @wrap_request(None)
+    @ContainerProxy.wrap_request(None)
     @inlineCallbacks
     def device_state_update(self, device_id,
-                                   oper_status=None,
-                                   connect_status=None):
+                            oper_status=None,
+                            connect_status=None):
         id = ID()
         id.id = device_id
         o_status = IntType()
-        if oper_status or oper_status==OperStatus.UNKNOWN:
+        if oper_status or oper_status == OperStatus.UNKNOWN:
             o_status.val = oper_status
         else:
             o_status.val = -1
         c_status = IntType()
-        if connect_status or connect_status==ConnectStatus.UNKNOWN:
+        if connect_status or connect_status == ConnectStatus.UNKNOWN:
             c_status.val = connect_status
         else:
             c_status.val = -1
@@ -259,21 +159,20 @@
                                 connect_status=c_status)
         returnValue(res)
 
-
-    @wrap_request(None)
+    @ContainerProxy.wrap_request(None)
     @inlineCallbacks
     def children_state_update(self, device_id,
-                            oper_status=None,
-                            connect_status=None):
+                              oper_status=None,
+                              connect_status=None):
         id = ID()
         id.id = device_id
         o_status = IntType()
-        if oper_status or oper_status==OperStatus.UNKNOWN:
+        if oper_status or oper_status == OperStatus.UNKNOWN:
             o_status.val = oper_status
         else:
             o_status.val = -1
         c_status = IntType()
-        if connect_status or connect_status==ConnectStatus.UNKNOWN:
+        if connect_status or connect_status == ConnectStatus.UNKNOWN:
             c_status.val = connect_status
         else:
             c_status.val = -1
@@ -284,7 +183,7 @@
                                 connect_status=c_status)
         returnValue(res)
 
-    @wrap_request(None)
+    @ContainerProxy.wrap_request(None)
     @inlineCallbacks
     def port_state_update(self,
                           device_id,
@@ -307,9 +206,7 @@
                                 oper_status=o_status)
         returnValue(res)
 
-
-
-    @wrap_request(None)
+    @ContainerProxy.wrap_request(None)
     @inlineCallbacks
     def child_devices_state_update(self, parent_device_id,
                                    oper_status=None,
@@ -318,12 +215,12 @@
         id = ID()
         id.id = parent_device_id
         o_status = IntType()
-        if oper_status or oper_status==OperStatus.UNKNOWN:
+        if oper_status or oper_status == OperStatus.UNKNOWN:
             o_status.val = oper_status
         else:
             o_status.val = -1
         c_status = IntType()
-        if connect_status or connect_status==ConnectStatus.UNKNOWN:
+        if connect_status or connect_status == ConnectStatus.UNKNOWN:
             c_status.val = connect_status
         else:
             c_status.val = -1
@@ -334,12 +231,10 @@
                                 connect_status=c_status)
         returnValue(res)
 
-
     def child_devices_removed(parent_device_id):
         raise NotImplementedError()
 
-
-    @wrap_request(None)
+    @ContainerProxy.wrap_request(None)
     @inlineCallbacks
     def device_pm_config_update(self, device_pm_config, init=False):
         log.debug("device_pm_config_update")
@@ -349,16 +244,16 @@
                                 device_pm_config=device_pm_config, init=b)
         returnValue(res)
 
-    @wrap_request(None)
+    @ContainerProxy.wrap_request(None)
     @inlineCallbacks
     def port_created(self, device_id, port):
         log.debug("port_created")
         proto_id = ID()
         proto_id.id = device_id
-        res = yield self.invoke(rpc="PortCreated", device_id=proto_id, port=port)
+        res = yield self.invoke(rpc="PortCreated", device_id=proto_id,
+                                port=port)
         returnValue(res)
 
-
     def port_removed(device_id, port):
         raise NotImplementedError()
 
diff --git a/adapters/kafka/kafka_inter_container_library.py b/adapters/kafka/kafka_inter_container_library.py
index f5bb720..3f6f5eb 100644
--- a/adapters/kafka/kafka_inter_container_library.py
+++ b/adapters/kafka/kafka_inter_container_library.py
@@ -14,28 +14,32 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from zope.interface import Interface, implementer
-from adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
+import time
+from uuid import uuid4
+
+import structlog
+from afkak.client import KafkaClient
+from afkak.consumer import OFFSET_LATEST, Consumer
 from twisted.internet import reactor
 from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, \
     DeferredQueue, gatherResults
-from afkak.client import KafkaClient
-from afkak.consumer import OFFSET_LATEST, Consumer
-import structlog
-from adapters.common.utils import asleep
-from adapters.protos.core_adapter_pb2 import MessageType, Argument, \
-    InterContainerRequestBody, InterContainerMessage, Header, InterContainerResponseBody
-import time
-from uuid import uuid4
-from adapters.common.utils.registry import IComponent
+from zope.interface import implementer
 
+from adapters.common.utils import asleep
+from adapters.common.utils.registry import IComponent
+from adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
+from adapters.protos.core_adapter_pb2 import MessageType, Argument, \
+    InterContainerRequestBody, InterContainerMessage, Header, \
+    InterContainerResponseBody
 
 log = structlog.get_logger()
 
+
 class KafkaMessagingError(BaseException):
     def __init__(self, error):
         self.error = error
 
+
 @implementer(IComponent)
 class IKafkaMessagingProxy(object):
     _kafka_messaging_instance = None
@@ -115,7 +119,6 @@
         except Exception as e:
             log.exception("Failed-to-start-proxy", e=e)
 
-
     def stop(self):
         """
         Invoked to stop the kafka proxy
@@ -135,7 +138,6 @@
         except Exception as e:
             log.exception("Exception-when-stopping-messaging-proxy:", e=e)
 
-
     @inlineCallbacks
     def _wait_until_topic_is_ready(self, client, topic):
         e = True
@@ -165,7 +167,8 @@
                              for partition in partitions]
                 self.topic_consumer_map[topic] = consumers
 
-            log.debug("_subscribe", topic=topic, consumermap=self.topic_consumer_map)
+            log.debug("_subscribe", topic=topic,
+                      consumermap=self.topic_consumer_map)
 
             if target_cls is not None and callback is None:
                 # Scenario #1
@@ -409,6 +412,7 @@
         Default internal method invoked for every batch of messages received
         from Kafka.
         """
+
         def _toDict(args):
             """
             Convert a repeatable Argument type into a python dictionary
@@ -443,24 +447,6 @@
             message.ParseFromString(val)
 
             if message.header.type == MessageType.Value("REQUEST"):
-                # if self.num_messages == 0:
-                #     self.init_time = int(round(time.time() * 1000))
-                #     self.init_received_time = message.header.timestamp
-                #     log.debug("INIT_TIME", time=self.init_time,
-                #               time_sent=message.header.timestamp)
-                # self.num_messages = self.num_messages + 1
-                #
-                # self.total_time = self.total_time + current_time - message.header.timestamp
-                #
-                # if self.num_messages % 10 == 0:
-                #     log.debug("TOTAL_TIME ...",
-                #               num=self.num_messages,
-                #               total=self.total_time,
-                #               duration=current_time - self.init_time,
-                #               time_since_first_msg=current_time - self.init_received_time,
-                #               average=self.total_time / 10)
-                #     self.total_time = 0
-
                 # Get the target class for that specific topic
                 targetted_topic = self._to_string(message.header.to_topic)
                 msg_body = InterContainerRequestBody()
@@ -497,16 +483,6 @@
             elif message.header.type == MessageType.Value("RESPONSE"):
                 trns_id = self._to_string(message.header.id)
                 if trns_id in self.transaction_id_deferred_map:
-                    # self.num_responses = self.num_responses + 1
-                    # self.total_time_responses = self.total_time_responses + current_time - \
-                    #                             message.header.timestamp
-                    # if self.num_responses % 10 == 0:
-                    #     log.debug("TOTAL RESPONSES ...",
-                    #               num=self.num_responses,
-                    #               total=self.total_time_responses,
-                    #               average=self.total_time_responses / 10)
-                    #     self.total_time_responses = 0
-
                     resp = self._parse_response(val)
 
                     self.transaction_id_deferred_map[trns_id].callback(resp)
@@ -568,9 +544,9 @@
                 self.transaction_id_deferred_map[
                     self._to_string(request.header.id)] = wait_for_result
 
-            log.debug("BEFORE-SENDING", to_topic=to_topic, from_topic=reply_topic)
             yield self._send_kafka_message(to_topic, request)
-            log.debug("AFTER-SENDING", to_topic=to_topic, from_topic=reply_topic)
+            log.debug("message-sent", to_topic=to_topic,
+                      from_topic=reply_topic)
 
             if response_required:
                 res = yield wait_for_result
diff --git a/adapters/kafka/kafka_proxy.py b/adapters/kafka/kafka_proxy.py
index 10fdbf8..c11caa7 100644
--- a/adapters/kafka/kafka_proxy.py
+++ b/adapters/kafka/kafka_proxy.py
@@ -16,7 +16,7 @@
 
 from afkak.client import KafkaClient as _KafkaClient
 from afkak.common import (
-    PRODUCER_ACK_LOCAL_WRITE, PRODUCER_ACK_NOT_REQUIRED
+    PRODUCER_ACK_NOT_REQUIRED
 )
 from afkak.producer import Producer as _kafkaProducer
 from structlog import get_logger
@@ -24,9 +24,8 @@
 from zope.interface import implementer
 
 from adapters.common.utils.consulhelpers import get_endpoint_from_consul
-from adapters.kafka.event_bus_publisher import EventBusPublisher
 from adapters.common.utils.registry import IComponent
-import time
+from adapters.kafka.event_bus_publisher import EventBusPublisher
 
 log = get_logger()
 
@@ -96,21 +95,12 @@
                 log.exception('failed-stopped-kproducer-kafka-proxy', e=e)
                 pass
 
-            #try:
-            #    if self.event_bus_publisher:
-            #        yield self.event_bus_publisher.stop()
-            #        self.event_bus_publisher = None
-            #        log.debug('stopped-event-bus-publisher-kafka-proxy')
-            #except Exception, e:
-            #    log.debug('failed-stopped-event-bus-publisher-kafka-proxy')
-            #    pass
-
             log.debug('stopped-kafka-proxy')
 
         except Exception, e:
             self.kclient = None
             self.kproducer = None
-            #self.event_bus_publisher = None
+            # self.event_bus_publisher = None
             log.exception('failed-stopped-kafka-proxy', e=e)
             pass
 
@@ -122,7 +112,8 @@
             if self.kafka_endpoint.startswith('@'):
                 try:
                     _k_endpoint = get_endpoint_from_consul(self.consul_endpoint,
-                                                           self.kafka_endpoint[1:])
+                                                           self.kafka_endpoint[
+                                                           1:])
                     log.debug('found-kafka-service', endpoint=_k_endpoint)
 
                 except Exception as e:
@@ -160,7 +151,8 @@
                     self._get_kafka_producer()
                     # Lets the next message request do the retry if still a failure
                     if self.kproducer is None:
-                        log.error('no-kafka-producer', endpoint=self.kafka_endpoint)
+                        log.error('no-kafka-producer',
+                                  endpoint=self.kafka_endpoint)
                         return
 
                 # log.debug('sending-kafka-msg', topic=topic, msg=msg)
@@ -206,4 +198,3 @@
 # Common method to get the singleton instance of the kafka proxy class
 def get_kafka_proxy():
     return KafkaProxy._kafka_instance
-