[VOL-1034, VOL-1035, VOL-1037] This commit consists of:
1) Implementation of inter-adapter communication using flows
as proxy message between an ONU and its parent OLT.
2) Update the protos to reflect the inter-adapter message structure
3) Clean up the ponsim adapters to remove unused references, plus
general cleanup.
Change-Id: Ibe913a80a96d601fed946d9b9db55bb8d4f2c15a
diff --git a/adapters/kafka/container_proxy.py b/adapters/kafka/container_proxy.py
new file mode 100644
index 0000000..79918cd
--- /dev/null
+++ b/adapters/kafka/container_proxy.py
@@ -0,0 +1,114 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+The superclass for all kafka proxy subclasses.
+"""
+
+import structlog
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.python import failure
+from zope.interface import implementer
+
+from adapters.common.utils.deferred_utils import DeferredWithTimeout, \
+ TimeOutError
+from adapters.common.utils.registry import IComponent
+
+# Module-level structlog logger used by every function and method below.
+log = structlog.get_logger()
+
+
+class KafkaMessagingError(Exception):
+    """Exception that carries an underlying messaging error.
+
+    Derives from ``Exception`` rather than ``BaseException`` so that
+    ordinary ``except Exception`` handlers can catch it.
+
+    :param error: the underlying error; stored on ``self.error`` and also
+        forwarded to the base class so ``args`` and ``str()`` reflect it
+        even when it is supplied as a keyword argument.
+    """
+
+    def __init__(self, error):
+        # Explicit two-argument super() keeps this valid on Python 2 and 3.
+        super(KafkaMessagingError, self).__init__(error)
+        self.error = error
+
+
+@implementer(IComponent)
+class ContainerProxy(object):
+    """Base proxy that issues requests through a kafka proxy object.
+
+    ``invoke`` hands each request to ``kafka_proxy.send_request``, addressed
+    to ``core_topic`` unless another topic is given, with replies requested
+    on the listening topic.  Every request is waited on through a
+    ``DeferredWithTimeout`` built with ``default_timeout``.
+    """
+
+    def __init__(self, kafka_proxy, core_topic, my_listening_topic):
+        """Store the messaging proxy and the topics used by ``invoke``.
+
+        :param kafka_proxy: object whose ``send_request`` method is called
+            with ``rpc``, ``to_topic``, ``reply_topic``, ``callback`` and
+            any extra keyword arguments.
+        :param core_topic: topic used when ``invoke`` gets no ``to_topic``.
+        :param my_listening_topic: topic passed as ``reply_topic``.
+        """
+        self.kafka_proxy = kafka_proxy
+        self.listening_topic = my_listening_topic
+        self.core_topic = core_topic
+        # Passed as ``timeout=`` to DeferredWithTimeout on every invoke();
+        # presumably seconds -- TODO confirm against DeferredWithTimeout.
+        self.default_timeout = 3
+
+    def start(self):
+        """Log that the component started and return ``self``."""
+        log.info('started')
+
+        return self
+
+    def stop(self):
+        """Log that the component stopped."""
+        log.info('stopped')
+
+    @classmethod
+    def wrap_request(cls, return_cls):
+        """Build a decorator that post-processes a ``(success, payload)`` result.
+
+        The decorated callable must yield a two-item ``(success, payload)``
+        result.  The wrapper then produces:
+
+        * success and ``return_cls`` is not None: a fresh ``return_cls()``
+          instance, filled via ``payload.Unpack(instance)`` when the payload
+          is not None;
+        * success and ``return_cls`` is None: ``None``;
+        * no success: the payload itself, unchanged.
+
+        Exceptions raised while waiting for the result are logged and
+        re-raised.
+
+        :param return_cls: class to instantiate for successful responses,
+            or None when no response object is expected.
+        :return: a decorator taking the callable to wrap.
+        """
+        # Local import so this block does not rely on a module-level import.
+        from functools import wraps
+
+        def real_wrapper(func):
+            # wraps() keeps func's __name__/__doc__ on the returned wrapper.
+            @wraps(func)
+            @inlineCallbacks
+            def wrapper(*args, **kw):
+                try:
+                    (success, d) = yield func(*args, **kw)
+                    if success:
+                        log.debug("successful-response", func=func, val=d)
+                        if return_cls is not None:
+                            rc = return_cls()
+                            if d is not None:
+                                d.Unpack(rc)
+                            returnValue(rc)
+                        else:
+                            log.debug("successful-response-none", func=func,
+                                      val=None)
+                            returnValue(None)
+                    else:
+                        log.warn("unsuccessful-request", func=func, args=args,
+                                 kw=kw)
+                        returnValue(d)
+                except Exception as e:
+                    log.exception("request-wrapper-exception", func=func, e=e)
+                    raise
+
+            return wrapper
+
+        return real_wrapper
+
+    @inlineCallbacks
+    def invoke(self, rpc, to_topic=None, **kwargs):
+        """Send ``rpc`` through the kafka proxy and wait for its result.
+
+        :param rpc: request identifier, forwarded as ``rpc=``.
+        :param to_topic: destination topic; ``self.core_topic`` is used
+            when this is None.
+        :param kwargs: forwarded unchanged to ``kafka_proxy.send_request``.
+        :return: (through the deferred) the value ``send_request`` yielded.
+        :raises TimeOutError: re-raised, after logging, when the waiting
+            deferred fails with it.
+        """
+        @inlineCallbacks
+        def _send_request(rpc, m_callback, to_topic, **kwargs):
+            try:
+                log.debug("sending-request", rpc=rpc)
+                if to_topic is None:
+                    to_topic = self.core_topic
+                result = yield self.kafka_proxy.send_request(
+                    rpc=rpc,
+                    to_topic=to_topic,
+                    reply_topic=self.listening_topic,
+                    callback=None,
+                    **kwargs)
+                # Only fire the waiting deferred if it has not fired already.
+                if not m_callback.called:
+                    m_callback.callback(result)
+                else:
+                    log.debug('timeout-already-occurred', rpc=rpc)
+            except Exception:
+                log.exception("Failure-sending-request", rpc=rpc, kw=kwargs)
+                if not m_callback.called:
+                    # Failure() with no arguments wraps the active exception.
+                    m_callback.errback(failure.Failure())
+
+        cb = DeferredWithTimeout(timeout=self.default_timeout)
+        # Started without yielding on it: the outcome is delivered via ``cb``.
+        _send_request(rpc, cb, to_topic, **kwargs)
+        try:
+            res = yield cb
+            returnValue(res)
+        except TimeOutError as e:
+            log.warn('invoke-timeout', e=e)
+            # Bare raise keeps the original traceback, as in wrap_request.
+            raise