Openolt data model - Hash map onu info cache on intf_id, gemport_id
- use onu info cache to lookup port_no for packet_in (needed for packet_in in BBsim)
- code refactor in openolt_device.py and openolt_flow_mgr.py
Change-Id: Ia089a4ec01740d047c68888faa17a48b2c862e2b
diff --git a/voltha/adapters/openolt/openolt.py b/voltha/adapters/openolt/openolt.py
index 51462ad..db9fe35 100644
--- a/voltha/adapters/openolt/openolt.py
+++ b/voltha/adapters/openolt/openolt.py
@@ -110,8 +110,9 @@
kwargs = {
'support_classes': OpenOltDefaults['support_classes'],
'adapter_agent': self.adapter_agent,
- 'device': device,
- 'device_num': self.num_devices + 1
+ 'device_id': device.id,
+ 'host_and_port': device.host_and_port,
+ 'extra_args': device.extra_args
}
try:
self.devices[device.id] = OpenoltDevice(**kwargs)
diff --git a/voltha/adapters/openolt/openolt_data_model.py b/voltha/adapters/openolt/openolt_data_model.py
index b32ca28..5bad88d 100644
--- a/voltha/adapters/openolt/openolt_data_model.py
+++ b/voltha/adapters/openolt/openolt_data_model.py
@@ -31,8 +31,9 @@
from voltha.registry import registry
-# Onu info cache is hashed on both (intf_id, onu_id) and onu serial number
+# Onu info cache is hashed on onu_id, serial number, gemport_id
OnuId = collections.namedtuple('OnuId', ['intf_id', 'onu_id'])
+GemPortId = collections.namedtuple('GemPortId', ['intf_id', 'gemport_id'])
OnuInfo = collections.namedtuple('OnuInfo', ['intf_id',
'onu_id',
'serial_number'])
@@ -40,11 +41,12 @@
class OpenOltDataModel(object):
- def __init__(self, device, adapter_agent, platform):
+ def __init__(self, device_id, adapter_agent, platform):
self.log = structlog.get_logger()
- self.device = device
self.adapter_agent = adapter_agent
+ self.device = self.adapter_agent.get_device(device_id)
+
self.platform = platform
self.logical_device_id = None
@@ -52,15 +54,21 @@
self.device.connect_status = ConnectStatus.UNREACHABLE
self.device.oper_status = OperStatus.ACTIVATING
- self.adapter_agent.update_device(device)
+ self.adapter_agent.update_device(self.device)
self.nni_intf_id = None
self.proxy = registry('core').get_proxy('/')
+ # Hash map OnuId -> OnuInfo
self._onu_ids = {}
+
+ # Hash map onu serial_number (string) -> OnuInfo
self._onu_serial_numbers = {}
+ # Hash map GemPortId -> OnuInfo
+ self._onu_gemport_ids = {}
+
def reconcile(self):
assert self.logical_device_id is not None
self.adapter_agent.reconcile_logical_device(
@@ -235,31 +243,6 @@
onu_id=onu_info.onu_id)]
del self._onu_serial_numbers[serial_number]
- def onu_id(self, serial_number):
- """ Get onu_id from serial_number
- Returns: onu_id
- Raises:
- ValueError: no onu_id found for serial_number
- """
- try:
- return self._onu_serial_numbers[serial_number].onu_id
- except KeyError:
- raise ValueError('onu_id not found, serial_number=%s'
- % serial_number)
-
- def serial_number(self, intf_id, onu_id):
- """ Get serial_number from intf_id, onu_id
- Returns: onu_id
- Raises:
- ValueError: no serial_number found for intf_id, onu_id
- """
- try:
- return self._onu_ids[OnuId(intf_id=intf_id,
- onu_id=onu_id)].serial_number
- except KeyError:
- raise ValueError('serial_number not found, intf_id=%s, onu_id=%s'
- % (intf_id, onu_id))
-
def onu_oper_down(self, intf_id, onu_id):
onu_device = self.adapter_agent.get_child_device(
@@ -356,11 +339,17 @@
self.adapter_agent.receive_proxied_message(onu_device.proxy_address,
pkt)
- def onu_send_packet_in(self, intf_type, intf_id, port_no, pkt):
+ def onu_send_packet_in(self, intf_type, intf_id, port_no, gemport_id, pkt):
if intf_type == "pon":
- if not port_no:
- raise ValueError("invalid port_no")
- logical_port_num = port_no
+ if port_no:
+ logical_port_num = port_no
+ else:
+ # Get logical_port_num from cache
+ onu_id = self.onu_id(intf_id=intf_id, gemport_id=gemport_id)
+ uni_id = 0 # FIXME - multi-uni support
+ logical_port_num = self.platform.mk_uni_port_num(intf_id,
+ onu_id,
+ uni_id)
elif intf_type == "nni":
logical_port_num = self.platform.intf_id_to_port_no(
intf_id,
@@ -374,6 +363,50 @@
packet=str(ether_pkt))
# #######################################################################
+ #
+ # Caching
+ #
+ # #######################################################################
+
+ def onu_id(self, serial_number=None, intf_id=None, gemport_id=None):
+ """ Lookup onu_id by serial_number or (intf_id, gemport_id)
+
+ Returns:
+ onu_id
+ Raises:
+ ValueError -- if no onu_id is found for serial_number
+ or (intf_id, gemport_id)
+ """
+ try:
+ if serial_number is not None:
+ return self._onu_serial_numbers[serial_number].onu_id
+ elif intf_id is not None and gemport_id is not None:
+ gem = GemPortId(intf_id=intf_id, gemport_id=gemport_id)
+ return self._onu_gemport_ids[gem].onu_id
+ except KeyError:
+ raise ValueError('onu_id not found, serial_number=%s, \
+ intf_id=%s, gemport_id=%s'
+ % (serial_number, intf_id, gemport_id))
+
+ def serial_number(self, intf_id, onu_id):
+ """ Get serial_number from intf_id, onu_id
+ Returns: onu_id
+ Raises:
+ ValueError: no serial_number found for intf_id, onu_id
+ """
+ try:
+ return self._onu_ids[OnuId(intf_id=intf_id,
+ onu_id=onu_id)].serial_number
+ except KeyError:
+ raise ValueError('serial_number not found, intf_id=%s, onu_id=%s'
+ % (intf_id, onu_id))
+
+ def gemport_id_add(self, intf_id, onu_id, gemport_id):
+ onu_info = self._onu_ids[OnuId(intf_id=intf_id, onu_id=onu_id)]
+ gem = GemPortId(intf_id=intf_id, gemport_id=gemport_id)
+ self._onu_gemport_ids[gem] = onu_info
+
+ # #######################################################################
# Flow decomposer utility functions
#
# Flow related functions that are used by the OpenOLT flow decomposer.
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index f6f07d5..81231d2 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -74,8 +74,7 @@
self.admin_state = "up"
adapter_agent = kwargs['adapter_agent']
- self.device_num = kwargs['device_num']
- device = kwargs['device']
+ self.device_id = kwargs['device_id']
self.data_model_class = kwargs['support_classes']['data_model']
self.platform_class = kwargs['support_classes']['platform']
@@ -86,18 +85,16 @@
self.stats_mgr_class = kwargs['support_classes']['stats_mgr']
is_reconciliation = kwargs.get('reconciliation', False)
- self.device_id = device.id
- self.host_and_port = device.host_and_port
- self.extra_args = device.extra_args
+ self.host_and_port = kwargs['host_and_port']
+ self.extra_args = kwargs['extra_args']
self.log = structlog.get_logger(ip=self.host_and_port)
self.log.info('openolt-device-init')
- self.data_model = self.data_model_class(device, adapter_agent,
+ self.data_model = self.data_model_class(self.device_id, adapter_agent,
self.platform)
if is_reconciliation:
self.log.info('reconcile data model')
- assert device.parent_id is not None
self.data_model.reconcile()
# Initialize the OLT state machine
@@ -301,7 +298,7 @@
# continue for now.
try:
- onu_id = self.data_model.onu_id(serial_number_str)
+ onu_id = self.data_model.onu_id(serial_number=serial_number_str)
except ValueError:
# FIXME - resource_mgr.get_onu_id() should raise exception
onu_id = self.resource_mgr.get_onu_id(intf_id)
@@ -360,6 +357,7 @@
self.data_model.onu_send_packet_in(pkt_indication.intf_type,
pkt_indication.intf_id,
pkt_indication.port_no,
+ pkt_indication.gemport_id,
pkt_indication.pkt)
def packet_out(self, egress_port, msg):
@@ -437,29 +435,8 @@
flows_to_remove=[f.id for f in flows_to_remove])
return
- try:
- self.flow_mgr.update_children_flows(device_rules_map)
- except Exception as e:
- self.log.error('Error updating children flows', error=e)
-
- self.log.debug('logical flows update', flows_to_add=flows_to_add,
- flows_to_remove=flows_to_remove)
-
- for flow in flows_to_add:
-
- try:
- self.flow_mgr.add_flow(flow)
- except Exception as e:
- self.log.error('failed to add flow', flow=flow, e=e)
-
- for flow in flows_to_remove:
-
- try:
- self.flow_mgr.remove_flow(flow)
- except Exception as e:
- self.log.error('failed to remove flow', flow=flow, e=e)
-
- self.flow_mgr.repush_all_different_flows()
+ self.flow_mgr.update_logical_flows(flows_to_add, flows_to_remove,
+ device_rules_map)
def disable(self):
self.log.debug('sending-deactivate-olt-message')
diff --git a/voltha/adapters/openolt/openolt_flow_mgr.py b/voltha/adapters/openolt/openolt_flow_mgr.py
index edac82a..fdef71a 100644
--- a/voltha/adapters/openolt/openolt_flow_mgr.py
+++ b/voltha/adapters/openolt/openolt_flow_mgr.py
@@ -89,6 +89,32 @@
self._populate_tech_profile_per_pon_port()
self.retry_add_flow_list = []
+ def update_logical_flows(self, flows_to_add, flows_to_remove,
+ device_rules_map):
+ try:
+ self.update_children_flows(device_rules_map)
+ except Exception as e:
+ self.log.error('Error updating children flows', error=e)
+
+ self.log.debug('logical flows update', flows_to_add=flows_to_add,
+ flows_to_remove=flows_to_remove)
+
+ for flow in flows_to_add:
+
+ try:
+ self.add_flow(flow)
+ except Exception as e:
+ self.log.error('failed to add flow', flow=flow, e=e)
+
+ for flow in flows_to_remove:
+
+ try:
+ self.remove_flow(flow)
+ except Exception as e:
+ self.log.error('failed to remove flow', flow=flow, e=e)
+
+ self.repush_all_different_flows()
+
def add_flow(self, flow):
self.log.debug('add flow', flow=flow)
classifier_info = dict()
@@ -459,6 +485,9 @@
gem_port_ids, intf_id, onu_id, uni_id
)
+ for gemport_id in gem_port_ids:
+ self.data_model.gemport_id_add(intf_id, onu_id, gemport_id)
+
return alloc_id, gem_port_ids
def add_upstream_data_flow(self, intf_id, onu_id, uni_id, port_no,