[VOL-1034, VOL-1035, VOL-1037] This commit consists of:
1) Implementation of inter-adapter communication using flows
as the proxy message between an ONU and its parent OLT.
2) Update the protos to reflect the inter-adapter message structure
3) Clean up the ponsim adapters to remove unused references and
perform general cleanup.
Change-Id: Ibe913a80a96d601fed946d9b9db55bb8d4f2c15a
diff --git a/adapters/iadapter.py b/adapters/iadapter.py
index 0d32096..ee4d116 100644
--- a/adapters/iadapter.py
+++ b/adapters/iadapter.py
@@ -19,30 +19,37 @@
"""
import structlog
-from zope.interface import implementer
from twisted.internet import reactor
+from zope.interface import implementer
-from adapters.protos.common_pb2 import AdminState
-from adapters.protos.device_pb2 import DeviceType, DeviceTypes
from adapters.interface import IAdapterInterface
from adapters.protos.adapter_pb2 import Adapter
from adapters.protos.adapter_pb2 import AdapterConfig
+from adapters.protos.common_pb2 import AdminState
from adapters.protos.common_pb2 import LogLevel
+from adapters.protos.device_pb2 import DeviceType, DeviceTypes
from adapters.protos.health_pb2 import HealthStatus
-from adapters.protos.device_pb2 import Device
log = structlog.get_logger()
@implementer(IAdapterInterface)
class IAdapter(object):
- def __init__(self, adapter_agent, config, device_handler_class, name,
- vendor, version, device_type, vendor_id,
+ def __init__(self,
+ core_proxy,
+ adapter_proxy,
+ config,
+ device_handler_class,
+ name,
+ vendor,
+ version,
+ device_type, vendor_id,
accepts_bulk_flow_update=True,
- accepts_add_remove_flow_updates=False, core_proxy=None):
- log.debug('Initializing adapter: {} {} {}'.format(vendor, name, version))
- self.adapter_agent = adapter_agent
- self.core_proxy=core_proxy
+ accepts_add_remove_flow_updates=False):
+ log.debug(
+ 'Initializing adapter: {} {} {}'.format(vendor, name, version))
+ self.core_proxy = core_proxy
+ self.adapter_proxy = adapter_proxy
self.config = config
self.name = name
self.supported_device_types = [
@@ -84,19 +91,24 @@
def get_ofp_device_info(self, device):
log.debug('get_ofp_device_info_start', device_id=device.id)
- ofp_device_info = self.devices_handlers[device.id].get_ofp_device_info(device)
+ ofp_device_info = self.devices_handlers[device.id].get_ofp_device_info(
+ device)
log.debug('get_ofp_device_info_ends', device_id=device.id)
return ofp_device_info
def get_ofp_port_info(self, device, port_no):
- log.debug('get_ofp_port_info_start', device_id=device.id, port_no=port_no)
- ofp_port_info = self.devices_handlers[device.id].get_ofp_port_info(device, port_no)
- log.debug('get_ofp_port_info_ends', device_id=device.id, port_no=port_no)
+ log.debug('get_ofp_port_info_start', device_id=device.id,
+ port_no=port_no)
+ ofp_port_info = self.devices_handlers[device.id].get_ofp_port_info(
+ device, port_no)
+ log.debug('get_ofp_port_info_ends', device_id=device.id,
+ port_no=port_no)
return ofp_port_info
def adopt_device(self, device):
log.debug('adopt_device', device_id=device.id)
- self.devices_handlers[device.id] = self.device_handler_class(self, device.id)
+ self.devices_handlers[device.id] = self.device_handler_class(self,
+ device.id)
reactor.callLater(0, self.devices_handlers[device.id].activate, device)
log.debug('adopt_device_done', device_id=device.id)
return device
@@ -142,7 +154,8 @@
def self_test_device(self, device):
log.info('self-test', device_id=device.id)
- result = reactor.callLater(0, self.devices_handlers[device.id].self_test_device)
+ result = reactor.callLater(0, self.devices_handlers[
+ device.id].self_test_device)
log.info('self-test-done', device_id=device.id)
return result
@@ -159,10 +172,10 @@
log.info('bulk-flow-update', device_id=device.id,
flows=flows, groups=groups)
assert len(groups.items) == 0
- reactor.callLater(0, self.devices_handlers[device.id].update_flow_table, flows.items)
+ reactor.callLater(0, self.devices_handlers[device.id].update_flow_table,
+ flows.items)
return device
-
def update_flows_incrementally(self, device, flow_changes, group_changes):
log.info('incremental-flow-update', device_id=device.id,
flows=flow_changes, groups=group_changes)
@@ -173,11 +186,13 @@
handler = self.devices_handlers[device.id]
# Remove flows
if len(flow_changes.to_remove.items) != 0:
- reactor.callLater(0, handler.remove_from_flow_table, flow_changes.to_remove.items)
+ reactor.callLater(0, handler.remove_from_flow_table,
+ flow_changes.to_remove.items)
# Add flows
if len(flow_changes.to_add.items) != 0:
- reactor.callLater(0, handler.add_to_flow_table, flow_changes.to_add.items)
+ reactor.callLater(0, handler.add_to_flow_table,
+ flow_changes.to_add.items)
return device
def update_pm_config(self, device, pm_config):
@@ -186,18 +201,12 @@
handler = self.devices_handlers[device.id]
handler.update_pm_config(device, pm_config)
- def send_proxied_message(self, proxy_address, msg):
- raise NotImplementedError()
-
- def receive_proxied_message(self, proxy_address, msg):
+ def process_inter_adapter_message(self, msg):
raise NotImplementedError()
def receive_packet_out(self, logical_device_id, egress_port_no, msg):
raise NotImplementedError()
- def receive_inter_adapter_message(self, msg):
- raise NotImplementedError()
-
def suppress_alarm(self, filter):
raise NotImplementedError()
@@ -211,16 +220,25 @@
return handler
return None
+
"""
OLT Adapter base class
"""
+
+
class OltAdapter(IAdapter):
- def __init__(self, adapter_agent, config, device_handler_class, name,
- vendor, version, device_type,
+ def __init__(self,
+ core_proxy,
+ adapter_proxy,
+ config,
+ device_handler_class,
+ name,
+ vendor,
+ version, device_type,
accepts_bulk_flow_update=True,
- accepts_add_remove_flow_updates=False,
- core_proxy=None):
- super(OltAdapter, self).__init__(adapter_agent=adapter_agent,
+ accepts_add_remove_flow_updates=False):
+ super(OltAdapter, self).__init__(core_proxy=core_proxy,
+ adapter_proxy=adapter_proxy,
config=config,
device_handler_class=device_handler_class,
name=name,
@@ -229,13 +247,13 @@
device_type=device_type,
vendor_id=None,
accepts_bulk_flow_update=accepts_bulk_flow_update,
- accepts_add_remove_flow_updates=accepts_add_remove_flow_updates,
- core_proxy=None)
+ accepts_add_remove_flow_updates=accepts_add_remove_flow_updates)
self.logical_device_id_to_root_device_id = dict()
def reconcile_device(self, device):
try:
- self.devices_handlers[device.id] = self.device_handler_class(self, device.id)
+ self.devices_handlers[device.id] = self.device_handler_class(self,
+ device.id)
# Work only required for devices that are in ENABLED state
if device.admin_state == AdminState.ENABLED:
reactor.callLater(0,
@@ -244,7 +262,7 @@
else:
# Invoke the children reconciliation which would setup the
# basic children data structures
- self.adapter_agent.reconcile_child_devices(device.id)
+ self.core_proxy.reconcile_child_devices(device.id)
return device
except Exception, e:
log.exception('Exception', e=e)
@@ -254,11 +272,25 @@
handler = self.devices_handlers[proxy_address.device_id]
handler.send_proxied_message(proxy_address, msg)
+ def process_inter_adapter_message(self, msg):
+ log.info('process-inter-adapter-message', msg=msg)
+ # Unpack the header to know which device needs to handle this message
+ handler = None
+ if msg.header.proxy_device_id:
+ # typical request
+ handler = self.devices_handlers[msg.header.proxy_device_id]
+ elif msg.header.to_device_id and \
+ msg.header.to_device_id in self.devices_handlers:
+ # typical response
+ handler = self.devices_handlers[msg.header.to_device_id]
+ if handler:
+ reactor.callLater(0, handler.process_inter_adapter_message, msg)
+
def receive_packet_out(self, logical_device_id, egress_port_no, msg):
def ldi_to_di(ldi):
di = self.logical_device_id_to_root_device_id.get(ldi)
if di is None:
- logical_device = self.adapter_agent.get_logical_device(ldi)
+ logical_device = self.core_proxy.get_logical_device(ldi)
di = logical_device.root_device_id
self.logical_device_id_to_root_device_id[ldi] = di
return di
@@ -274,10 +306,20 @@
class OnuAdapter(IAdapter):
- def __init__(self, adapter_agent, config, device_handler_class, name,
- vendor, version, device_type, vendor_id, accepts_bulk_flow_update=True,
+ def __init__(self,
+ core_proxy,
+ adapter_proxy,
+ config,
+ device_handler_class,
+ name,
+ vendor,
+ version,
+ device_type,
+ vendor_id,
+ accepts_bulk_flow_update=True,
accepts_add_remove_flow_updates=False):
- super(OnuAdapter, self).__init__(adapter_agent=adapter_agent,
+ super(OnuAdapter, self).__init__(core_proxy=core_proxy,
+ adapter_proxy=adapter_proxy,
config=config,
device_handler_class=device_handler_class,
name=name,
@@ -286,12 +328,11 @@
device_type=device_type,
vendor_id=vendor_id,
accepts_bulk_flow_update=accepts_bulk_flow_update,
- accepts_add_remove_flow_updates=accepts_add_remove_flow_updates,
- core_proxy=None
- )
+ accepts_add_remove_flow_updates=accepts_add_remove_flow_updates)
def reconcile_device(self, device):
- self.devices_handlers[device.id] = self.device_handler_class(self, device.id)
+ self.devices_handlers[device.id] = self.device_handler_class(self,
+ device.id)
# Reconcile only if state was ENABLED
if device.admin_state == AdminState.ENABLED:
reactor.callLater(0,
@@ -304,8 +345,15 @@
device_id=proxy_address.device_id, msg=msg)
# Device_id from the proxy_address is the olt device id. We need to
# get the onu device id using the port number in the proxy_address
- device = self.adapter_agent. \
+ device = self.core_proxy. \
get_child_device_with_proxy_address(proxy_address)
if device:
handler = self.devices_handlers[device.id]
handler.receive_message(msg)
+
+ def process_inter_adapter_message(self, msg):
+ log.info('process-inter-adapter-message', msg=msg)
+ # Unpack the header to know which device needs to handle this message
+ if msg.header:
+ handler = self.devices_handlers[msg.header.to_device_id]
+ handler.process_inter_adapter_message(msg)