Initial commit moving openolt adapter from voltha-go to the new repo.
This version works with ponsim rather than openolt, this is temporary.
It is currently being fixed to work with openolt.

Change-Id: I34a800c98f050140b367e2d474b7aa8b79f34b9a
Signed-off-by: William Kurkian <>
diff --git a/python/adapters/kafka/ b/python/adapters/kafka/
new file mode 100644
index 0000000..efcf558
--- /dev/null
+++ b/python/adapters/kafka/
@@ -0,0 +1,133 @@
+# Copyright 2018 the original author or authors.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# See the License for the specific language governing permissions and
+# limitations under the License.
+The superclass for all kafka proxy subclasses.
+import structlog
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.python import failure
+from zope.interface import implementer
+from python.common.utils.deferred_utils import DeferredWithTimeout, \
+    TimeOutError
+from python.common.utils.registry import IComponent
+log = structlog.get_logger()
+class KafkaMessagingError(BaseException):
+    def __init__(self, error):
+        self.error = error
+class ContainerProxy(object):
+    def __init__(self, kafka_proxy, core_topic, my_listening_topic):
+        self.kafka_proxy = kafka_proxy
+        self.listening_topic = my_listening_topic
+        self.core_topic = core_topic
+        self.default_timeout = 3
+    def start(self):
+        return self
+    def stop(self):
+    @classmethod
+    def wrap_request(cls, return_cls):
+        def real_wrapper(func):
+            @inlineCallbacks
+            def wrapper(*args, **kw):
+                try:
+                    (success, d) = yield func(*args, **kw)
+                    if success:
+                        log.debug("successful-response", func=func, val=d)
+                        if return_cls is not None:
+                            rc = return_cls()
+                            if d is not None:
+                                d.Unpack(rc)
+                            returnValue(rc)
+                        else:
+                            log.debug("successful-response-none", func=func,
+                                      val=None)
+                            returnValue(None)
+                    else:
+                        log.warn("unsuccessful-request", func=func, args=args,
+                                 kw=kw)
+                        returnValue(d)
+                except Exception as e:
+                    log.exception("request-wrapper-exception", func=func, e=e)
+                    raise
+            return wrapper
+        return real_wrapper
+    @inlineCallbacks
+    def invoke(self, rpc, to_topic=None, reply_topic=None, **kwargs):
+        @inlineCallbacks
+        def _send_request(rpc, m_callback, to_topic, reply_topic, **kwargs):
+            try:
+                log.debug("sending-request",
+                          rpc=rpc,
+                          to_topic=to_topic,
+                          reply_topic=reply_topic)
+                if to_topic is None:
+                    to_topic = self.core_topic
+                if reply_topic is None:
+                    reply_topic = self.listening_topic
+                result = yield self.kafka_proxy.send_request(rpc=rpc,
+                                                             to_topic=to_topic,
+                                                             reply_topic=reply_topic,
+                                                             callback=None,
+                                                             **kwargs)
+                if not m_callback.called:
+                    m_callback.callback(result)
+                else:
+                    log.debug('timeout-already-occurred', rpc=rpc)
+            except Exception as e:
+                log.exception("Failure-sending-request", rpc=rpc, kw=kwargs)
+                if not m_callback.called:
+                    m_callback.errback(failure.Failure())
+        # We are going to resend the request on the to_topic if there is a
+        # timeout error. This time the timeout will be longer.  If the second
+        # request times out then we will send the request to the default
+        # core_topic.
+        timeouts = [self.default_timeout,
+                    self.default_timeout*2,
+                    self.default_timeout]
+        retry = 0
+        max_retry = 2
+        for timeout in timeouts:
+            cb = DeferredWithTimeout(timeout=timeout)
+            _send_request(rpc, cb, to_topic, reply_topic, **kwargs)
+            try:
+                res = yield cb
+                returnValue(res)
+            except TimeOutError as e:
+                log.warn('invoke-timeout', e=e)
+                if retry == max_retry:
+                    raise e
+                retry += 1
+                if retry == max_retry:
+                    to_topic = self.core_topic