[VOL-1034, VOL-1035, VOL-1037] This commit consists of:
1) Implementation of inter-adapter communication using flows
as proxy message between an ONU and its parent OLT.
2) Update the protos to reflect the inter-adapter message structure
3) Cleanup the ponsim adapters to removed unsued references and
general cleanup.
Change-Id: Ibe913a80a96d601fed946d9b9db55bb8d4f2c15a
diff --git a/adapters/kafka/core_proxy.py b/adapters/kafka/core_proxy.py
index 512262f..36459ed 100644
--- a/adapters/kafka/core_proxy.py
+++ b/adapters/kafka/core_proxy.py
@@ -1,5 +1,5 @@
#
-# Copyright 2017 the original author or authors.
+# Copyright 2018 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,122 +15,28 @@
#
"""
-Agent to play gateway between CORE and an individual adapter.
+Agent to play gateway between CORE and an adapter.
"""
-from uuid import uuid4
-
-import arrow
import structlog
-from google.protobuf.json_format import MessageToJson
-from scapy.packet import Packet
-from twisted.internet.defer import inlineCallbacks, returnValue
-from twisted.python import failure
-from zope.interface import implementer
-
-from adapters.common.event_bus import EventBusClient
-from adapters.common.frameio.frameio import hexify
-from adapters.common.utils.id_generation import create_cluster_logical_device_ids
-from adapters.interface import IAdapterInterface
-from adapters.protos import third_party
-from adapters.protos.device_pb2 import Device, Port, Ports, PmConfigs
-from adapters.protos.events_pb2 import AlarmEvent, AlarmEventType, \
- AlarmEventSeverity, AlarmEventState, AlarmEventCategory
-from adapters.protos.events_pb2 import KpiEvent
-from adapters.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
- LogicalPort, AlarmFilterRuleKey, CoreInstance
-from adapters.common.utils.registry import registry, IComponent
-from adapters.common.utils.id_generation import create_cluster_device_id
-import re
-from adapters.interface import ICoreSouthBoundInterface
-from adapters.protos.core_adapter_pb2 import StrType, BoolType, IntType
-from adapters.protos.common_pb2 import ID, ConnectStatus, OperStatus
from google.protobuf.message import Message
-from adapters.common.utils.deferred_utils import DeferredWithTimeout, TimeOutError
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+from adapters.kafka.container_proxy import ContainerProxy
+from adapters.protos.common_pb2 import ID, ConnectStatus, OperStatus
+from adapters.protos.core_adapter_pb2 import StrType, BoolType, IntType
+from adapters.protos.device_pb2 import Device, Ports
+from adapters.protos.voltha_pb2 import CoreInstance
log = structlog.get_logger()
-class KafkaMessagingError(BaseException):
- def __init__(self, error):
- self.error = error
-def wrap_request(return_cls):
- def real_wrapper(func):
- @inlineCallbacks
- def wrapper(*args, **kw):
- try:
- (success, d) = yield func(*args, **kw)
- if success:
- log.debug("successful-response", func=func, val=d)
- if return_cls is not None:
- rc = return_cls()
- if d is not None:
- d.Unpack(rc)
- returnValue(rc)
- else:
- log.debug("successful-response-none", func=func,
- val=None)
- returnValue(None)
- else:
- log.warn("unsuccessful-request", func=func, args=args, kw=kw)
- returnValue(d)
- except Exception as e:
- log.exception("request-wrapper-exception", func=func, e=e)
- raise
- return wrapper
- return real_wrapper
-
-
-@implementer(IComponent, ICoreSouthBoundInterface)
-class CoreProxy(object):
+class CoreProxy(ContainerProxy):
def __init__(self, kafka_proxy, core_topic, my_listening_topic):
- self.kafka_proxy = kafka_proxy
- self.listening_topic = my_listening_topic
- self.core_topic = core_topic
- self.default_timeout = 3
+ super(CoreProxy, self).__init__(kafka_proxy, core_topic,
+ my_listening_topic)
- def start(self):
- log.info('started')
-
- return self
-
- def stop(self):
- log.info('stopped')
-
- @inlineCallbacks
- def invoke(self, rpc, to_topic=None, **kwargs):
- @inlineCallbacks
- def _send_request(rpc, m_callback,to_topic, **kwargs):
- try:
- log.debug("sending-request", rpc=rpc)
- if to_topic is None:
- to_topic = self.core_topic
- result = yield self.kafka_proxy.send_request(rpc=rpc,
- to_topic=to_topic,
- reply_topic=self.listening_topic,
- callback=None,
- **kwargs)
- if not m_callback.called:
- m_callback.callback(result)
- else:
- log.debug('timeout-already-occurred', rpc=rpc)
- except Exception as e:
- log.exception("Failure-sending-request", rpc=rpc, kw=kwargs)
- if not m_callback.called:
- m_callback.errback(failure.Failure())
-
- log.debug('invoke-request', rpc=rpc)
- cb = DeferredWithTimeout(timeout=self.default_timeout)
- _send_request(rpc, cb, to_topic, **kwargs)
- try:
- res = yield cb
- returnValue(res)
- except TimeOutError as e:
- log.warn('invoke-timeout', e=e)
- raise e
-
-
- @wrap_request(CoreInstance)
+ @ContainerProxy.wrap_request(CoreInstance)
@inlineCallbacks
def register(self, adapter):
log.debug("register")
@@ -142,7 +48,7 @@
log.exception("registration-exception", e=e)
raise
- @wrap_request(Device)
+ @ContainerProxy.wrap_request(Device)
@inlineCallbacks
def get_device(self, device_id):
log.debug("get-device")
@@ -151,15 +57,12 @@
res = yield self.invoke(rpc="GetDevice", device_id=id)
returnValue(res)
- @wrap_request(Device)
+ @ContainerProxy.wrap_request(Device)
@inlineCallbacks
def get_child_device(self, parent_device_id, **kwargs):
raise NotImplementedError()
- # def add_device(self, device):
- # raise NotImplementedError()
-
- @wrap_request(Ports)
+ @ContainerProxy.wrap_request(Ports)
@inlineCallbacks
def get_ports(self, device_id, port_type):
id = ID()
@@ -179,7 +82,7 @@
def _to_proto(self, **kwargs):
encoded = {}
- for k,v in kwargs.iteritems():
+ for k, v in kwargs.iteritems():
if isinstance(v, Message):
encoded[k] = v
elif type(v) == int:
@@ -196,8 +99,7 @@
encoded[k] = b_proto
return encoded
-
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def child_device_detected(self,
parent_device_id,
@@ -217,14 +119,13 @@
args = self._to_proto(**kw)
res = yield self.invoke(rpc="ChildDeviceDetected",
parent_device_id=id,
- parent_port_no = ppn,
- child_device_type= cdt,
+ parent_port_no=ppn,
+ child_device_type=cdt,
channel_id=channel,
**args)
returnValue(res)
-
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def device_update(self, device):
log.debug("device_update")
@@ -234,21 +135,20 @@
def child_device_removed(parent_device_id, child_device_id):
raise NotImplementedError()
-
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def device_state_update(self, device_id,
- oper_status=None,
- connect_status=None):
+ oper_status=None,
+ connect_status=None):
id = ID()
id.id = device_id
o_status = IntType()
- if oper_status or oper_status==OperStatus.UNKNOWN:
+ if oper_status or oper_status == OperStatus.UNKNOWN:
o_status.val = oper_status
else:
o_status.val = -1
c_status = IntType()
- if connect_status or connect_status==ConnectStatus.UNKNOWN:
+ if connect_status or connect_status == ConnectStatus.UNKNOWN:
c_status.val = connect_status
else:
c_status.val = -1
@@ -259,21 +159,20 @@
connect_status=c_status)
returnValue(res)
-
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def children_state_update(self, device_id,
- oper_status=None,
- connect_status=None):
+ oper_status=None,
+ connect_status=None):
id = ID()
id.id = device_id
o_status = IntType()
- if oper_status or oper_status==OperStatus.UNKNOWN:
+ if oper_status or oper_status == OperStatus.UNKNOWN:
o_status.val = oper_status
else:
o_status.val = -1
c_status = IntType()
- if connect_status or connect_status==ConnectStatus.UNKNOWN:
+ if connect_status or connect_status == ConnectStatus.UNKNOWN:
c_status.val = connect_status
else:
c_status.val = -1
@@ -284,7 +183,7 @@
connect_status=c_status)
returnValue(res)
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def port_state_update(self,
device_id,
@@ -307,9 +206,7 @@
oper_status=o_status)
returnValue(res)
-
-
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def child_devices_state_update(self, parent_device_id,
oper_status=None,
@@ -318,12 +215,12 @@
id = ID()
id.id = parent_device_id
o_status = IntType()
- if oper_status or oper_status==OperStatus.UNKNOWN:
+ if oper_status or oper_status == OperStatus.UNKNOWN:
o_status.val = oper_status
else:
o_status.val = -1
c_status = IntType()
- if connect_status or connect_status==ConnectStatus.UNKNOWN:
+ if connect_status or connect_status == ConnectStatus.UNKNOWN:
c_status.val = connect_status
else:
c_status.val = -1
@@ -334,12 +231,10 @@
connect_status=c_status)
returnValue(res)
-
def child_devices_removed(parent_device_id):
raise NotImplementedError()
-
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def device_pm_config_update(self, device_pm_config, init=False):
log.debug("device_pm_config_update")
@@ -349,16 +244,16 @@
device_pm_config=device_pm_config, init=b)
returnValue(res)
- @wrap_request(None)
+ @ContainerProxy.wrap_request(None)
@inlineCallbacks
def port_created(self, device_id, port):
log.debug("port_created")
proto_id = ID()
proto_id.id = device_id
- res = yield self.invoke(rpc="PortCreated", device_id=proto_id, port=port)
+ res = yield self.invoke(rpc="PortCreated", device_id=proto_id,
+ port=port)
returnValue(res)
-
def port_removed(device_id, port):
raise NotImplementedError()