VOL-1364 - OpenOMCI ONU Adpater concurrency issue with multiple UNIs
All tasks converted to exclusive until OMCI supports stop-and-wait protocol.
Multiple Tech Profile Instances operating in concurrency corrupt the shared data structures
in PON object used by multiple Tech Profile Specific / VLAN Tasks. Need to take a snapshot
of GEM/TCONTs from PON when launching each task to avoid the internal structure changing
"during" the task as other techprofile instances are processed.
Change-Id: Ide13c1017c737bc6ce30606741f46153926fb88e
diff --git a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
index 45bc45e..9fed3f9 100644
--- a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
+++ b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
@@ -392,7 +392,7 @@
self._create_gemports(uni_id, downstream_gem_port_attribute_list, alloc_id, "DOWNSTREAM")
def load_and_configure_tech_profile(self, uni_id, tp_path):
- self.log.debug("loading-tech-profile-configuration")
+ self.log.debug("loading-tech-profile-configuration", uni_id=uni_id, tp_path=tp_path)
if uni_id not in self._tp_service_specific_task:
self._tp_service_specific_task[uni_id] = dict()
diff --git a/voltha/adapters/brcm_openomci_onu/omci/brcm_mib_download_task.py b/voltha/adapters/brcm_openomci_onu/omci/brcm_mib_download_task.py
index 6dea69e..48f712d 100644
--- a/voltha/adapters/brcm_openomci_onu/omci/brcm_mib_download_task.py
+++ b/voltha/adapters/brcm_openomci_onu/omci/brcm_mib_download_task.py
@@ -202,7 +202,7 @@
# And re-enable the UNIs if needed
yield self.enable_uni(uni_port, False)
- self.deferred.callback('initial-download-success')
+ self.deferred.callback('initial-download-success')
except TimeoutError as e:
self.log.error('initial-download-failure', e=e)
diff --git a/voltha/adapters/brcm_openomci_onu/omci/brcm_tp_service_specific_task.py b/voltha/adapters/brcm_openomci_onu/omci/brcm_tp_service_specific_task.py
index 89d39e3..3e0178c 100644
--- a/voltha/adapters/brcm_openomci_onu/omci/brcm_tp_service_specific_task.py
+++ b/voltha/adapters/brcm_openomci_onu/omci/brcm_tp_service_specific_task.py
@@ -55,27 +55,25 @@
:param omci_agent: (OmciAdapterAgent) OMCI Adapter agent
:param device_id: (str) ONU Device ID
"""
-
- self.log = structlog.get_logger(device_id=handler.device_id)
- self.log.debug('function-entry')
+ log = structlog.get_logger(device_id=handler.device_id, uni_id=uni_id)
+ log.debug('function-entry')
super(BrcmTpServiceSpecificTask, self).__init__(BrcmTpServiceSpecificTask.name,
omci_agent,
handler.device_id,
- priority=TASK_PRIORITY)
- self._handler = handler
+ priority=TASK_PRIORITY,
+ exclusive=True)
+
+ self.log = log
+
self._onu_device = omci_agent.get_device(handler.device_id)
self._local_deferred = None
# Frame size
self._max_gem_payload = DEFAULT_GEM_PAYLOAD
- # TODO: only using a single UNI/ethernet port
- self._uni_port = self._handler.uni_ports[uni_id]
- self._uni_port_num = self._uni_port.mac_bridge_port_num
- self._ethernet_uni_entity_id = self._uni_port.entity_id
-
- self._pon = handler.pon_port
+ self._uni_port = handler.uni_ports[uni_id]
+ assert self._uni_port.uni_id == uni_id
# Port numbers
self._input_tpid = DEFAULT_TPID
@@ -89,13 +87,27 @@
# IDs set to None are discovered/set
self._mac_bridge_service_profile_entity_id = \
- self._handler.mac_bridge_service_profile_entity_id
+ handler.mac_bridge_service_profile_entity_id
self._ieee_mapper_service_profile_entity_id = \
- self._pon.ieee_mapper_service_profile_entity_id
+ handler.pon_port.ieee_mapper_service_profile_entity_id
self._mac_bridge_port_ani_entity_id = \
- self._pon.mac_bridge_port_ani_entity_id
+ handler.pon_port.mac_bridge_port_ani_entity_id
self._gal_enet_profile_entity_id = \
- self._handler.gal_enet_profile_entity_id
+ handler.gal_enet_profile_entity_id
+
+ # Extract the current set of TCONT and GEM Ports from the Handler's pon_port that are
+ # relevant to this task's UNI. It won't change. But, the underlying pon_port may change
+ # due to additional tasks on different UNIs. So, it we cannot use the pon_port affter
+ # this initializer
+ self._tconts = []
+ for tcont in handler.pon_port.tconts.itervalues():
+ if tcont.uni_id is not None and tcont.uni_id != self._uni_port.uni_id: continue
+ self._tconts.append(tcont);
+
+ self._gem_ports = []
+ for gem_port in handler.pon_port.gem_ports.itervalues():
+ if gem_port.uni_id is not None and gem_port.uni_id != self._uni_port.uni_id: continue
+ self._gem_ports.append(gem_port);
self.tcont_me_to_queue_map = dict()
self.uni_port_to_queue_map = dict()
@@ -178,8 +190,7 @@
tcont_idents = self._onu_device.query_mib(Tcont.class_id)
self.log.debug('tcont-idents', tcont_idents=tcont_idents)
- for tcont in self._handler.pon_port.tconts.itervalues():
- if tcont.uni_id is not None and tcont.uni_id != self._uni_port.uni_id: continue
+ for tcont in self._tconts:
free_entity_id = None
for k, v in tcont_idents.items():
alloc_check = v.get('attributes', {}).get('alloc_id', 0)
@@ -272,9 +283,7 @@
self.log.debug("ul-prior-q", ul_prior_q=self.tcont_me_to_queue_map)
self.log.debug("dl-prior-q", dl_prior_q=self.uni_port_to_queue_map)
- for gem_port in self._handler.pon_port.gem_ports.itervalues():
- if gem_port.uni_id is not None and gem_port.uni_id != self._uni_port.uni_id: continue
-
+ for gem_port in self._gem_ports:
# TODO: Traffic descriptor will be available after meter bands are available
tcont = gem_port.tcont
if tcont is None:
@@ -333,8 +342,8 @@
#
gem_entity_ids = [OmciNullPointer] * 8
- for gem_port in self._handler.pon_port.gem_ports.itervalues():
- if gem_port.uni_id is not None and gem_port.uni_id != self._uni_port.uni_id: continue
+ for gem_port in self._gem_ports:
+ self.log.debug("tp-gem-port", entity_id=gem_port.entity_id, uni_id=gem_port.uni_id)
if gem_port.direction == "upstream" or \
gem_port.direction == "bi-directional":
diff --git a/voltha/adapters/brcm_openomci_onu/omci/brcm_uni_lock_task.py b/voltha/adapters/brcm_openomci_onu/omci/brcm_uni_lock_task.py
index 4eae51b..9f605d2 100644
--- a/voltha/adapters/brcm_openomci_onu/omci/brcm_uni_lock_task.py
+++ b/voltha/adapters/brcm_openomci_onu/omci/brcm_uni_lock_task.py
@@ -48,7 +48,7 @@
omci_agent,
device_id,
priority=priority,
- exclusive=False)
+ exclusive=True)
self._device = omci_agent.get_device(device_id)
self._lock = lock
self._results = None
diff --git a/voltha/adapters/brcm_openomci_onu/omci/brcm_vlan_filter_task.py b/voltha/adapters/brcm_openomci_onu/omci/brcm_vlan_filter_task.py
index 5413a8f..a46db8c 100644
--- a/voltha/adapters/brcm_openomci_onu/omci/brcm_vlan_filter_task.py
+++ b/voltha/adapters/brcm_openomci_onu/omci/brcm_vlan_filter_task.py
@@ -50,7 +50,7 @@
omci_agent,
device_id,
priority=priority,
- exclusive=False)
+ exclusive=True)
self._device = omci_agent.get_device(device_id)
self._uni_port = uni_port
self._set_vlan_id = set_vlan_id
diff --git a/voltha/adapters/brcm_openomci_onu/onu_gem_port.py b/voltha/adapters/brcm_openomci_onu/onu_gem_port.py
index b5e8f60..42457ee 100644
--- a/voltha/adapters/brcm_openomci_onu/onu_gem_port.py
+++ b/voltha/adapters/brcm_openomci_onu/onu_gem_port.py
@@ -280,7 +280,7 @@
dl_prior_q_entity_id):
self.log.debug('function-entry')
- self.log.debug('add-to-hardware', gem_id=self.gem_id,
+ self.log.debug('add-to-hardware', entity_id=self.entity_id, gem_id=self.gem_id,
tcont_entity_id=tcont_entity_id,
ieee_mapper_service_profile_entity_id=ieee_mapper_service_profile_entity_id,
gal_enet_profile_entity_id=gal_enet_profile_entity_id,
diff --git a/voltha/adapters/brcm_openomci_onu/pon_port.py b/voltha/adapters/brcm_openomci_onu/pon_port.py
index a5ee55d..fbde551 100644
--- a/voltha/adapters/brcm_openomci_onu/pon_port.py
+++ b/voltha/adapters/brcm_openomci_onu/pon_port.py
@@ -29,7 +29,7 @@
"""Wraps northbound-port/ANI support for ONU"""
# TODO: possibly get from olt
MIN_GEM_ENTITY_ID = 0x408
- MAX_GEM_ENTITY_ID = 0x40F
+ MAX_GEM_ENTITY_ID = 0x4FF # TODO: This limits is internal to specific ONU. It should be more "discoverable"?
def __init__(self, handler, port_no):
self.log = structlog.get_logger(device_id=handler.device_id, port_no=port_no)
@@ -184,7 +184,7 @@
:param reflow: (boolean) If true, force add (used during h/w resync)
:return: (deferred)
"""
- self.log.debug('function-entry')
+ self.log.debug('function-entry', tcont=tcont.alloc_id)
if not self._valid:
return # Deleting
@@ -192,7 +192,7 @@
if not reflow and tcont.alloc_id in self._tconts:
return # already created
- self.log.info('add', tcont=tcont, reflow=reflow)
+ self.log.info('add-tcont', tcont=tcont.alloc_id, reflow=reflow)
self._tconts[tcont.alloc_id] = tcont
def update_tcont_td(self, alloc_id, new_td):
@@ -251,7 +251,7 @@
:param reflow: (boolean) If true, force add (used during h/w resync)
:return: (deferred)
"""
- self.log.debug('function-entry', gem_port=gem_port)
+ self.log.debug('function-entry', gem_port=gem_port.gem_id)
if not self._valid:
return # Deleting
@@ -259,7 +259,7 @@
if not reflow and gem_port.gem_id in self._gem_ports:
return # nop
- self.log.info('add', gem_port=gem_port, reflow=reflow)
+ self.log.info('add-gem-port', gem_port=gem_port.gem_id, reflow=reflow)
self._gem_ports[(gem_port.gem_id, gem_port.direction)] = gem_port
@inlineCallbacks