[VOL-1034, VOL-1035, VOL-1037] This commit consists of:
1) Implementation of inter-adapter communication using flows
as the proxy message between an ONU and its parent OLT.
2) Update the protos to reflect the inter-adapter message structure
3) Clean up the ponsim adapters to remove unused references and
perform general cleanup.
Change-Id: Ibe913a80a96d601fed946d9b9db55bb8d4f2c15a
diff --git a/adapters/ponsim_onu/ponsim_onu.py b/adapters/ponsim_onu/ponsim_onu.py
index 19775cb..dfac1d3 100644
--- a/adapters/ponsim_onu/ponsim_onu.py
+++ b/adapters/ponsim_onu/ponsim_onu.py
@@ -15,24 +15,24 @@
#
"""
-Fully simulated OLT/ONU adapter.
+Represents an ONU device
"""
-import sys
import structlog
from twisted.internet.defer import DeferredQueue, inlineCallbacks, returnValue
-from adapters.common.utils.asleep import asleep
+from adapters.common.utils.asleep import asleep
from adapters.iadapter import OnuAdapter
from adapters.protos import third_party
from adapters.protos.common_pb2 import OperStatus, ConnectStatus, AdminState
+from adapters.protos.core_adapter_pb2 import PortCapability, \
+ InterAdapterMessageType, InterAdapterResponseBody
from adapters.protos.device_pb2 import Port
from adapters.protos.logical_device_pb2 import LogicalPort
from adapters.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
OFPPF_1GB_FD
from adapters.protos.openflow_13_pb2 import ofp_port
from adapters.protos.ponsim_pb2 import FlowTable
-from adapters.protos.core_adapter_pb2 import PortCapability
_ = third_party
log = structlog.get_logger()
@@ -41,11 +41,13 @@
def mac_str_to_tuple(mac):
return tuple(int(d, 16) for d in mac.split(':'))
+
class PonSimOnuAdapter(OnuAdapter):
- def __init__(self, adapter_agent, config):
+ def __init__(self, core_proxy, adapter_proxy, config):
# DeviceType of ONU should be same as VENDOR ID of ONU Serial Number as specified by standard
# requires for identifying correct adapter or ranged ONU
- super(PonSimOnuAdapter, self).__init__(adapter_agent=adapter_agent,
+ super(PonSimOnuAdapter, self).__init__(core_proxy=core_proxy,
+ adapter_proxy=adapter_proxy,
config=config,
device_handler_class=PonSimOnuHandler,
name='ponsim_onu',
@@ -60,8 +62,10 @@
class PonSimOnuHandler(object):
def __init__(self, adapter, device_id):
self.adapter = adapter
- self.adapter_agent = adapter.adapter_agent
+ self.core_proxy = adapter.core_proxy
+ self.adapter_proxy = adapter.adapter_proxy
self.device_id = device_id
+ self.device_parent_id = None
self.log = structlog.get_logger(device_id=device_id)
self.incoming_messages = DeferredQueue()
self.proxy_address = None
@@ -73,19 +77,18 @@
def receive_message(self, msg):
self.incoming_messages.put(msg)
-
@inlineCallbacks
def activate(self, device):
self.log.info('activating')
- # TODO: Register for proxy address
+ self.device_parent_id = device.parent_id
+ self.proxy_address = device.proxy_address
# populate device info
device.root = False
device.vendor = 'ponsim'
device.model = 'n/a'
- # device.connect_status = ConnectStatus.REACHABLE
- yield self.adapter_agent.device_update(device)
+ yield self.core_proxy.device_update(device)
# register physical ports
self.uni_port = Port(
@@ -108,11 +111,12 @@
)
]
)
- self.adapter_agent.port_created(device.id, self.uni_port)
- self.adapter_agent.port_created(device.id, self.pon_port)
+ self.core_proxy.port_created(device.id, self.uni_port)
+ self.core_proxy.port_created(device.id, self.pon_port)
- yield self.adapter_agent.device_state_update(device.id, connect_status=ConnectStatus.REACHABLE, oper_status=OperStatus.ACTIVE)
-
+ yield self.core_proxy.device_state_update(device.id,
+ connect_status=ConnectStatus.REACHABLE,
+ oper_status=OperStatus.ACTIVE)
# TODO: Return only port specific info
def get_ofp_port_info(self, device, port_no):
@@ -120,15 +124,11 @@
# return the capability. TODO: Do a lookup on the UNI port number and return the
# appropriate attributes
self.log.info('get_ofp_port_info', port_no=port_no, device_id=device.id)
- # port_no = device.proxy_address.channel_id
cap = OFPPF_1GB_FD | OFPPF_FIBER
return PortCapability(
- port = LogicalPort (
- # id='uni-{}'.format(port_no),
+ port=LogicalPort(
ofp_port=ofp_port(
- # port_no=port_no,
hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % port_no),
- # name='uni-{}'.format(port_no),
config=0,
state=OFPPS_LIVE,
curr=cap,
@@ -144,56 +144,55 @@
@inlineCallbacks
def _get_uni_port(self):
- ports = yield self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_UNI)
+ ports = yield self.core_proxy.get_ports(self.device_id,
+ Port.ETHERNET_UNI)
returnValue(ports)
@inlineCallbacks
def _get_pon_port(self):
- ports = yield self.adapter_agent.get_ports(self.device_id, Port.PON_ONU)
+ ports = yield self.core_proxy.get_ports(self.device_id, Port.PON_ONU)
returnValue(ports)
def reconcile(self, device):
self.log.info('reconciling-ONU-device-starts')
-
- # first we verify that we got parent reference and proxy info
- assert device.parent_id
- assert device.proxy_address.device_id
- assert device.proxy_address.channel_id
-
- # register for proxied messages right away
- self.proxy_address = device.proxy_address
- self.adapter_agent.register_for_proxied_messages(device.proxy_address)
-
- # Set the connection status to REACHABLE
- device.connect_status = ConnectStatus.REACHABLE
- self.adapter_agent.update_device(device)
-
- # TODO: Verify that the uni, pon and logical ports exists
-
- # Mark the device as REACHABLE and ACTIVE
- device = self.adapter_agent.get_device(device.id)
- device.connect_status = ConnectStatus.REACHABLE
- device.oper_status = OperStatus.ACTIVE
- self.adapter_agent.update_device(device)
-
- self.log.info('reconciling-ONU-device-ends')
+ # TODO: complete code
@inlineCallbacks
def update_flow_table(self, flows):
+ try:
+ self.log.info('update_flow_table', flows=flows)
+ # we need to proxy through the OLT to get to the ONU
- # we need to proxy through the OLT to get to the ONU
+ # reset response queue
+ while self.incoming_messages.pending:
+ yield self.incoming_messages.get()
- # reset response queue
- while self.incoming_messages.pending:
- yield self.incoming_messages.get()
+ fb = FlowTable(
+ port=self.proxy_address.channel_id,
+ flows=flows
+ )
+ # Sends the request via proxy and wait for an ACK
+ yield self.adapter_proxy.send_inter_adapter_message(
+ msg=fb,
+ type=InterAdapterMessageType.FLOW_REQUEST,
+ from_adapter=self.adapter.name,
+ to_adapter=self.proxy_address.device_type,
+ to_device_id=self.device_id,
+ proxy_device_id=self.proxy_address.device_id
+ )
+ # Wait for the full response from the proxied adapter
+ res = yield self.incoming_messages.get()
+ self.log.info('response-received', result=res)
+ except Exception as e:
+ self.log.exception("update-flow-error", e=e)
- msg = FlowTable(
- port=self.proxy_address.channel_id,
- flows=flows
- )
- self.adapter_agent.send_proxied_message(self.proxy_address, msg)
-
- yield self.incoming_messages.get()
+ def process_inter_adapter_message(self, msg):
+ self.log.info('process-inter-adapter-message', msg=msg)
+ if msg.header.type == InterAdapterMessageType.FLOW_RESPONSE:
+ body = InterAdapterResponseBody()
+ msg.body.Unpack(body)
+ self.log.info('received-response', status=body.success)
+ self.receive_message(msg)
def remove_from_flow_table(self, flows):
self.log.debug('remove-from-flow-table', flows=flows)
@@ -212,7 +211,8 @@
self.log.info('rebooting', device_id=self.device_id)
# Update the connect status to UNREACHABLE
- yield self.adapter_agent.device_state_update(self.device_id, connect_status=ConnectStatus.UNREACHABLE)
+ yield self.core_proxy.device_state_update(self.device_id,
+ connect_status=ConnectStatus.UNREACHABLE)
# Sleep 10 secs, simulating a reboot
# TODO: send alert and clear alert after the reboot
@@ -220,7 +220,8 @@
# Change the connection status back to REACHABLE. With a
# real ONU the connection state must be the actual state
- yield self.adapter_agent.device_state_update(self.device_id, connect_status=ConnectStatus.REACHABLE)
+ yield self.core_proxy.device_state_update(self.device_id,
+ connect_status=ConnectStatus.REACHABLE)
self.log.info('rebooted', device_id=self.device_id)
@@ -233,18 +234,18 @@
log.info('self-test-device', device=device.id)
raise NotImplementedError()
-
@inlineCallbacks
def disable(self):
self.log.info('disabling', device_id=self.device_id)
# Update the device operational status to UNKNOWN
- yield self.adapter_agent.device_state_update(self.device_id, oper_status=OperStatus.UNKNOWN, connect_status=ConnectStatus.UNREACHABLE)
+ yield self.core_proxy.device_state_update(self.device_id,
+ oper_status=OperStatus.UNKNOWN,
+ connect_status=ConnectStatus.UNREACHABLE)
# TODO:
# 1) Remove all flows from the device
# 2) Remove the device from ponsim
-
self.log.info('disabled', device_id=self.device_id)
@inlineCallbacks
@@ -264,18 +265,20 @@
self.pon_port = ports.items[0]
# Update the state of the UNI port
- yield self.adapter_agent.port_state_update(self.device_id,
- port_type=Port.ETHERNET_UNI,
- port_no=self.uni_port.port_no,
- oper_status=OperStatus.ACTIVE)
+ yield self.core_proxy.port_state_update(self.device_id,
+ port_type=Port.ETHERNET_UNI,
+ port_no=self.uni_port.port_no,
+ oper_status=OperStatus.ACTIVE)
# Update the state of the PON port
- yield self.adapter_agent.port_state_update(self.device_id,
- port_type=Port.PON_ONU,
- port_no=self.pon_port.port_no,
- oper_status=OperStatus.ACTIVE)
+ yield self.core_proxy.port_state_update(self.device_id,
+ port_type=Port.PON_ONU,
+ port_no=self.pon_port.port_no,
+ oper_status=OperStatus.ACTIVE)
- yield self.adapter_agent.device_state_update(self.device_id, oper_status=OperStatus.ACTIVE, connect_status=ConnectStatus.REACHABLE)
+ yield self.core_proxy.device_state_update(self.device_id,
+ oper_status=OperStatus.ACTIVE,
+ connect_status=ConnectStatus.REACHABLE)
self.log.info('re-enabled', device_id=self.device_id)
except Exception, e: