VOL-1366: Prevent numerous reflows to BAL
Change-Id: I3bf5d9504688d69c05e03a366a5ad70d4c8b3c64
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index 9b55091..00e5b4b 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -121,6 +121,7 @@
device.oper_status = OperStatus.ACTIVATING
self.adapter_agent.update_device(device)
+ self.logical_device_id = None
# If logical device does exist use it, else create one after connecting to device
if device.parent_id:
# logical device already exists
diff --git a/voltha/adapters/openolt/openolt_flow_mgr.py b/voltha/adapters/openolt/openolt_flow_mgr.py
index e298db2..acfcbfd 100644
--- a/voltha/adapters/openolt/openolt_flow_mgr.py
+++ b/voltha/adapters/openolt/openolt_flow_mgr.py
@@ -255,7 +255,7 @@
# between DS and US.
return
- self.resource_mgr.free_flow_id_for_uni(pon_intf, onu_id, uni_id, flow_id)
+ self.resource_mgr.free_flow_id(pon_intf, onu_id, uni_id, flow_id)
else:
self.log.error("invalid-info", uni_port_no=uni_port_no,
child_device_id=child_device_id)
@@ -534,35 +534,41 @@
flow_store_cookie = self._get_flow_store_cookie(classifier,
gemport_id)
- # One of the OLT platform (Broadcom BAL) requires that symmetric
- # flows require the same flow_id to be used across UL and DL.
- # Since HSIA flow is the only symmetric flow currently, we need to
- # re-use the flow_id across both direction. The 'flow_category'
- # takes priority over flow_cookie to find any available HSIA_FLOW
- # id for the ONU.
- flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id,
- flow_store_cookie,
- HSIA_FLOW)
- if flow_id is None:
- self.log.error("hsia-flow-unavailable")
- return
- flow = openolt_pb2.Flow(
- access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=flow_id,
- flow_type=direction, alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
- gemport_id=gemport_id,
- classifier=self.mk_classifier(classifier),
- action=self.mk_action(action),
- priority=logical_flow.priority,
- port_no=port_no,
- cookie=logical_flow.cookie)
+ if self.resource_mgr.is_flow_cookie_on_kv_store(intf_id, onu_id, uni_id,
+ flow_store_cookie):
+ self.log.debug('flow-exists--not-re-adding')
+ else:
- if self.add_flow_to_device(flow, logical_flow):
- flow_info = self._get_flow_info_as_json_blob(flow,
- flow_store_cookie,
- HSIA_FLOW)
- self.update_flow_info_to_kv_store(flow.access_intf_id,
- flow.onu_id, flow.uni_id,
- flow.flow_id, flow_info)
+ # One of the OLT platform (Broadcom BAL) requires that symmetric
+ # flows require the same flow_id to be used across UL and DL.
+ # Since HSIA flow is the only symmetric flow currently, we need to
+ # re-use the flow_id across both direction. The 'flow_category'
+ # takes priority over flow_cookie to find any available HSIA_FLOW
+ # id for the ONU.
+ flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id,
+ flow_store_cookie,
+ HSIA_FLOW)
+ if flow_id is None:
+ self.log.error("hsia-flow-unavailable")
+ return
+
+ flow = openolt_pb2.Flow(
+ access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=flow_id,
+ flow_type=direction, alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
+ gemport_id=gemport_id,
+ classifier=self.mk_classifier(classifier),
+ action=self.mk_action(action),
+ priority=logical_flow.priority,
+ port_no=port_no,
+ cookie=logical_flow.cookie)
+
+ if self.add_flow_to_device(flow, logical_flow):
+ flow_info = self._get_flow_info_as_json_blob(flow,
+ flow_store_cookie,
+ HSIA_FLOW)
+ self.update_flow_info_to_kv_store(flow.access_intf_id,
+ flow.onu_id, flow.uni_id,
+ flow.flow_id, flow_info)
def add_dhcp_trap(self, intf_id, onu_id, uni_id, port_no, classifier, action, logical_flow,
alloc_id, gemport_id):
@@ -579,27 +585,31 @@
flow_store_cookie = self._get_flow_store_cookie(classifier,
gemport_id)
+ if self.resource_mgr.is_flow_cookie_on_kv_store(intf_id, onu_id, uni_id,
+ flow_store_cookie):
+ self.log.debug('flow-exists--not-re-adding')
+ else:
+ flow_id = self.resource_mgr.get_flow_id(
+ intf_id, onu_id, uni_id, flow_store_cookie
+ )
- flow_id = self.resource_mgr.get_flow_id(
- intf_id, onu_id, uni_id, flow_store_cookie
- )
- dhcp_flow = openolt_pb2.Flow(
- onu_id=onu_id, uni_id=uni_id, flow_id=flow_id, flow_type=UPSTREAM,
- access_intf_id=intf_id, gemport_id=gemport_id,
- alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
- priority=logical_flow.priority,
- classifier=self.mk_classifier(classifier),
- action=self.mk_action(action),
- port_no=port_no,
- cookie=logical_flow.cookie)
+ dhcp_flow = openolt_pb2.Flow(
+ onu_id=onu_id, uni_id=uni_id, flow_id=flow_id, flow_type=UPSTREAM,
+ access_intf_id=intf_id, gemport_id=gemport_id,
+ alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
+ priority=logical_flow.priority,
+ classifier=self.mk_classifier(classifier),
+ action=self.mk_action(action),
+ port_no=port_no,
+ cookie=logical_flow.cookie)
- if self.add_flow_to_device(dhcp_flow, logical_flow):
- flow_info = self._get_flow_info_as_json_blob(dhcp_flow, flow_store_cookie)
- self.update_flow_info_to_kv_store(dhcp_flow.access_intf_id,
- dhcp_flow.onu_id,
- dhcp_flow.uni_id,
- dhcp_flow.flow_id,
- flow_info)
+ if self.add_flow_to_device(dhcp_flow, logical_flow):
+ flow_info = self._get_flow_info_as_json_blob(dhcp_flow, flow_store_cookie)
+ self.update_flow_info_to_kv_store(dhcp_flow.access_intf_id,
+ dhcp_flow.onu_id,
+ dhcp_flow.uni_id,
+ dhcp_flow.flow_id,
+ flow_info)
def add_eapol_flow(self, intf_id, onu_id, uni_id, port_no, logical_flow, alloc_id,
gemport_id, vlan_id=DEFAULT_MGMT_VLAN):
@@ -614,34 +624,39 @@
flow_store_cookie = self._get_flow_store_cookie(uplink_classifier,
gemport_id)
- # Add Upstream EAPOL Flow.
- uplink_flow_id = self.resource_mgr.get_flow_id(
- intf_id, onu_id, uni_id, flow_store_cookie
- )
- upstream_flow = openolt_pb2.Flow(
- access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=uplink_flow_id,
- flow_type=UPSTREAM, alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
- gemport_id=gemport_id,
- classifier=self.mk_classifier(uplink_classifier),
- action=self.mk_action(uplink_action),
- priority=logical_flow.priority,
- port_no=port_no,
- cookie=logical_flow.cookie)
+ if self.resource_mgr.is_flow_cookie_on_kv_store(intf_id, onu_id, uni_id,
+ flow_store_cookie):
+ self.log.debug('flow-exists--not-re-adding')
+ else:
+ # Add Upstream EAPOL Flow.
+ uplink_flow_id = self.resource_mgr.get_flow_id(
+ intf_id, onu_id, uni_id, flow_store_cookie
+ )
- logical_flow = copy.deepcopy(logical_flow)
- logical_flow.match.oxm_fields.extend(fd.mk_oxm_fields([fd.vlan_vid(
- vlan_id | 0x1000)]))
- logical_flow.match.type = OFPMT_OXM
+ upstream_flow = openolt_pb2.Flow(
+ access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=uplink_flow_id,
+ flow_type=UPSTREAM, alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
+ gemport_id=gemport_id,
+ classifier=self.mk_classifier(uplink_classifier),
+ action=self.mk_action(uplink_action),
+ priority=logical_flow.priority,
+ port_no=port_no,
+ cookie=logical_flow.cookie)
- if self.add_flow_to_device(upstream_flow, logical_flow):
- flow_info = self._get_flow_info_as_json_blob(upstream_flow,
- flow_store_cookie)
- self.update_flow_info_to_kv_store(upstream_flow.access_intf_id,
- upstream_flow.onu_id,
- upstream_flow.uni_id,
- upstream_flow.flow_id,
- flow_info)
+ logical_flow = copy.deepcopy(logical_flow)
+ logical_flow.match.oxm_fields.extend(fd.mk_oxm_fields([fd.vlan_vid(
+ vlan_id | 0x1000)]))
+ logical_flow.match.type = OFPMT_OXM
+
+ if self.add_flow_to_device(upstream_flow, logical_flow):
+ flow_info = self._get_flow_info_as_json_blob(upstream_flow,
+ flow_store_cookie)
+ self.update_flow_info_to_kv_store(upstream_flow.access_intf_id,
+ upstream_flow.onu_id,
+ upstream_flow.uni_id,
+ upstream_flow.flow_id,
+ flow_info)
if vlan_id == DEFAULT_MGMT_VLAN:
# Add Downstream EAPOL Flow, Only for first EAP flow (BAL
@@ -667,43 +682,47 @@
flow_store_cookie = self._get_flow_store_cookie(downlink_classifier,
gemport_id)
+ if self.resource_mgr.is_flow_cookie_on_kv_store(intf_id, onu_id, uni_id,
+ flow_store_cookie):
+ self.log.debug('flow-exists--not-re-adding')
+ else:
- downlink_flow_id = self.resource_mgr.get_flow_id(
- intf_id, onu_id, uni_id, flow_store_cookie
- )
+ downlink_flow_id = self.resource_mgr.get_flow_id(
+ intf_id, onu_id, uni_id, flow_store_cookie
+ )
- downstream_flow = openolt_pb2.Flow(
- access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=downlink_flow_id,
- flow_type=DOWNSTREAM, alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
- gemport_id=gemport_id,
- classifier=self.mk_classifier(downlink_classifier),
- action=self.mk_action(downlink_action),
- priority=logical_flow.priority,
- port_no=port_no,
- cookie=logical_flow.cookie)
+ downstream_flow = openolt_pb2.Flow(
+ access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=downlink_flow_id,
+ flow_type=DOWNSTREAM, alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
+ gemport_id=gemport_id,
+ classifier=self.mk_classifier(downlink_classifier),
+ action=self.mk_action(downlink_action),
+ priority=logical_flow.priority,
+ port_no=port_no,
+ cookie=logical_flow.cookie)
- downstream_logical_flow = ofp_flow_stats(
- id=logical_flow.id, cookie=logical_flow.cookie,
- table_id=logical_flow.table_id, priority=logical_flow.priority,
- flags=logical_flow.flags)
+ downstream_logical_flow = ofp_flow_stats(
+ id=logical_flow.id, cookie=logical_flow.cookie,
+ table_id=logical_flow.table_id, priority=logical_flow.priority,
+ flags=logical_flow.flags)
- downstream_logical_flow.match.oxm_fields.extend(fd.mk_oxm_fields([
- fd.in_port(fd.get_out_port(logical_flow)),
- fd.vlan_vid(special_vlan_downstream_flow | 0x1000)]))
- downstream_logical_flow.match.type = OFPMT_OXM
+ downstream_logical_flow.match.oxm_fields.extend(fd.mk_oxm_fields([
+ fd.in_port(fd.get_out_port(logical_flow)),
+ fd.vlan_vid(special_vlan_downstream_flow | 0x1000)]))
+ downstream_logical_flow.match.type = OFPMT_OXM
- downstream_logical_flow.instructions.extend(
- fd.mk_instructions_from_actions([fd.output(
- self.platform.mk_uni_port_num(intf_id, onu_id, uni_id))]))
+ downstream_logical_flow.instructions.extend(
+ fd.mk_instructions_from_actions([fd.output(
+ self.platform.mk_uni_port_num(intf_id, onu_id, uni_id))]))
- if self.add_flow_to_device(downstream_flow, downstream_logical_flow):
- flow_info = self._get_flow_info_as_json_blob(downstream_flow,
- flow_store_cookie)
- self.update_flow_info_to_kv_store(downstream_flow.access_intf_id,
- downstream_flow.onu_id,
- downstream_flow.uni_id,
- downstream_flow.flow_id,
- flow_info)
+ if self.add_flow_to_device(downstream_flow, downstream_logical_flow):
+ flow_info = self._get_flow_info_as_json_blob(downstream_flow,
+ flow_store_cookie)
+ self.update_flow_info_to_kv_store(downstream_flow.access_intf_id,
+ downstream_flow.onu_id,
+ downstream_flow.uni_id,
+ downstream_flow.flow_id,
+ flow_info)
def repush_all_different_flows(self):
# Check if the device is supposed to have flows, if so add them
@@ -746,28 +765,35 @@
onu_id = -1
uni_id = -1
flow_store_cookie = self._get_flow_store_cookie(classifier)
- flow_id = self.resource_mgr.get_flow_id(network_intf_id, onu_id, uni_id,
- flow_store_cookie)
- downstream_flow = openolt_pb2.Flow(
- access_intf_id=-1, # access_intf_id not required
- onu_id=onu_id, # onu_id not required
- uni_id=uni_id, # uni_id not used
- flow_id=flow_id,
- flow_type=DOWNSTREAM,
- network_intf_id=network_intf_id,
- gemport_id=-1, # gemport_id not required
- classifier=self.mk_classifier(classifier),
- action=self.mk_action(action),
- priority=logical_flow.priority,
- port_no=port_no,
- cookie=logical_flow.cookie)
+ if self.resource_mgr.is_flow_cookie_on_kv_store(network_intf_id, onu_id, uni_id,
+ flow_store_cookie):
+ self.log.debug('flow-exists--not-re-adding')
+ else:
+ flow_id = self.resource_mgr.get_flow_id(network_intf_id, onu_id, uni_id,
+ flow_store_cookie)
- self.log.debug('add lldp downstream trap', classifier=classifier,
- action=action, flow=downstream_flow, port_no=port_no)
- if self.add_flow_to_device(downstream_flow, logical_flow):
- self.update_flow_info_to_kv_store(network_intf_id, onu_id, uni_id,
- flow_id, downstream_flow)
+ downstream_flow = openolt_pb2.Flow(
+ access_intf_id=-1, # access_intf_id not required
+ onu_id=onu_id, # onu_id not required
+ uni_id=uni_id, # uni_id not used
+ flow_id=flow_id,
+ flow_type=DOWNSTREAM,
+ network_intf_id=network_intf_id,
+ gemport_id=-1, # gemport_id not required
+ classifier=self.mk_classifier(classifier),
+ action=self.mk_action(action),
+ priority=logical_flow.priority,
+ port_no=port_no,
+ cookie=logical_flow.cookie)
+
+ self.log.debug('add lldp downstream trap', classifier=classifier,
+ action=action, flow=downstream_flow, port_no=port_no)
+ if self.add_flow_to_device(downstream_flow, logical_flow):
+ flow_info = self._get_flow_info_as_json_blob(downstream_flow,
+ flow_store_cookie)
+ self.update_flow_info_to_kv_store(network_intf_id, onu_id, uni_id,
+ flow_id, flow_info)
def mk_classifier(self, classifier_info):
@@ -884,8 +910,8 @@
return True
def update_flow_info_to_kv_store(self, intf_id, onu_id, uni_id, flow_id, flow):
- self.resource_mgr.update_flow_id_info_for_uni(intf_id, onu_id, uni_id,
- flow_id, flow)
+ self.resource_mgr.update_flow_id_info(intf_id, onu_id, uni_id,
+ flow_id, flow)
def register_flow(self, logical_flow, device_flow):
self.log.debug('registering flow in device',
@@ -962,7 +988,7 @@
get_tech_profile_instance(
DEFAULT_TECH_PROFILE_TABLE_ID,
ofp_port_name)
- flow_ids = self.resource_mgr.get_current_flow_ids_for_uni(pon_port, onu_id, uni_id)
+ flow_ids = self.resource_mgr.get_current_flow_ids(pon_port, onu_id, uni_id)
self.log.debug("outstanding-flows-to-be-cleared", flow_ids=flow_ids)
for flow_id in flow_ids:
flow_infos = self.resource_mgr.get_flow_id_info(pon_port, onu_id, uni_id, flow_id)
@@ -980,7 +1006,7 @@
else:
raise grpc_e
- self.resource_mgr.free_flow_id_for_uni(pon_port, onu_id, uni_id, flow_id)
+ self.resource_mgr.free_flow_id(pon_port, onu_id, uni_id, flow_id)
try:
tconts = self.tech_profile[pon_port].get_tconts(tech_profile_instance)
@@ -1028,8 +1054,19 @@
json_blob['flow_store_cookie'] = flow_store_cookie
if flow_category is not None:
json_blob['flow_category'] = flow_category
- flow_info = self.resource_mgr.get_flow_id_info(flow.access_intf_id,
- flow.onu_id, flow.uni_id, flow.flow_id)
+
+ # For flows which trap out of the NNI, the access_intf_id is invalid (set to -1).
+ # In such cases, we need to refer to the network_intf_id.
+ if flow.access_intf_id != -1:
+ flow_info = self.resource_mgr.get_flow_id_info(flow.access_intf_id,
+ flow.onu_id, flow.uni_id,
+ flow.flow_id)
+ else:
+ # Case of LLDP trap flow from the NNI. We can't use flow.access_intf_id
+ # in that case, as it is invalid. We use flow.network_intf_id.
+ flow_info = self.resource_mgr.get_flow_id_info(flow.network_intf_id,
+ flow.onu_id, flow.uni_id,
+ flow.flow_id)
if flow_info is None:
flow_info = list()
@@ -1060,3 +1097,4 @@
self.nni_intf_id = self.platform.intf_id_from_nni_port_num(logical_port.ofp_port.port_no)
self.log.debug("nni-intf-d ", nni_intf_id=self.nni_intf_id)
return self.nni_intf_id
+
diff --git a/voltha/adapters/openolt/openolt_resource_manager.py b/voltha/adapters/openolt/openolt_resource_manager.py
index 760471d..ae94343 100644
--- a/voltha/adapters/openolt/openolt_resource_manager.py
+++ b/voltha/adapters/openolt/openolt_resource_manager.py
@@ -140,15 +140,15 @@
return onu_id
- def get_flow_id(self, pon_intf_id, onu_id, uni_id, flow_store_cookie,
+ def get_flow_id(self, intf_id, onu_id, uni_id, flow_store_cookie,
flow_category=None):
- pon_intf_onu_id = (pon_intf_id, onu_id, uni_id)
+ intf_onu_id = (intf_id, onu_id, uni_id)
try:
- flow_ids = self.resource_mgrs[pon_intf_id]. \
- get_current_flow_ids_for_onu(pon_intf_onu_id)
+ flow_ids = self.resource_mgrs[intf_id]. \
+ get_current_flow_ids_for_onu(intf_onu_id)
if flow_ids is not None:
for flow_id in flow_ids:
- flows = self.get_flow_id_info(pon_intf_id, onu_id, uni_id, flow_id)
+ flows = self.get_flow_id_info(intf_id, onu_id, uni_id, flow_id)
assert (isinstance(flows, list))
for flow in flows:
@@ -161,27 +161,39 @@
except Exception as e:
self.log.error("error-retrieving-flow-info", e=e)
- flow_id = self.resource_mgrs[pon_intf_id].get_resource_id(
- pon_intf_onu_id[0], PONResourceManager.FLOW_ID)
+ flow_id = self.resource_mgrs[intf_id].get_resource_id(
+ intf_onu_id[0], PONResourceManager.FLOW_ID)
if flow_id is not None:
- self.resource_mgrs[pon_intf_id].update_flow_id_for_onu(
- pon_intf_onu_id, flow_id
+ self.resource_mgrs[intf_id].update_flow_id_for_onu(
+ intf_onu_id, flow_id
)
return flow_id
- def get_flow_id_info(self, pon_intf_id, onu_id, uni_id, flow_id):
- pon_intf_onu_id = (pon_intf_id, onu_id, uni_id)
- return self.resource_mgrs[pon_intf_id].get_flow_id_info(pon_intf_onu_id, flow_id)
+ def get_flow_id_info(self, intf_id, onu_id, uni_id, flow_id):
+ '''
+ Note: For flows which trap from the NNI and not really associated with any particular
+ ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
+ '''
+ intf_onu_id = (intf_id, onu_id, uni_id)
+ return self.resource_mgrs[intf_id].get_flow_id_info(intf_onu_id, flow_id)
- def get_current_flow_ids_for_uni(self, pon_intf_id, onu_id, uni_id):
- pon_intf_onu_id = (pon_intf_id, onu_id, uni_id)
- return self.resource_mgrs[pon_intf_id].get_current_flow_ids_for_onu(pon_intf_onu_id)
+ def get_current_flow_ids(self, intf_id, onu_id, uni_id):
+ '''
+ Note: For flows which trap from the NNI and not really associated with any particular
+ ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
+ '''
+ intf_onu_id = (intf_id, onu_id, uni_id)
+ return self.resource_mgrs[intf_id].get_current_flow_ids_for_onu(intf_onu_id)
- def update_flow_id_info_for_uni(self, pon_intf_id, onu_id, uni_id, flow_id, flow_data):
- pon_intf_onu_id = (pon_intf_id, onu_id, uni_id)
- return self.resource_mgrs[pon_intf_id].update_flow_id_info_for_onu(
- pon_intf_onu_id, flow_id, flow_data)
+ def update_flow_id_info(self, intf_id, onu_id, uni_id, flow_id, flow_data):
+ '''
+ Note: For flows which trap from the NNI and not really associated with any particular
+ ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
+ '''
+ intf_onu_id = (intf_id, onu_id, uni_id)
+ return self.resource_mgrs[intf_id].update_flow_id_info_for_onu(
+ intf_onu_id, flow_id, flow_data)
def get_alloc_id(self, pon_intf_onu_id):
# Derive the pon_intf from the pon_intf_onu_id tuple
@@ -274,14 +286,14 @@
self.resource_mgrs[pon_intf_id].remove_resource_map(
pon_intf_onu_id)
- def free_flow_id_for_uni(self, pon_intf_id, onu_id, uni_id, flow_id):
- self.resource_mgrs[pon_intf_id].free_resource_id(
- pon_intf_id, PONResourceManager.FLOW_ID, flow_id)
- pon_intf_onu_id = (pon_intf_id, onu_id, uni_id)
- self.resource_mgrs[pon_intf_id].update_flow_id_for_onu(pon_intf_onu_id,
- flow_id, False)
- self.resource_mgrs[pon_intf_id].remove_flow_id_info(pon_intf_onu_id,
- flow_id)
+ def free_flow_id(self, intf_id, onu_id, uni_id, flow_id):
+ self.resource_mgrs[intf_id].free_resource_id(
+ intf_id, PONResourceManager.FLOW_ID, flow_id)
+ intf_onu_id = (intf_id, onu_id, uni_id)
+ self.resource_mgrs[intf_id].update_flow_id_for_onu(intf_onu_id,
+ flow_id, False)
+ self.resource_mgrs[intf_id].remove_flow_id_info(intf_onu_id,
+ flow_id)
def free_pon_resources_for_onu(self, pon_intf_id_onu_id):
@@ -433,3 +445,24 @@
# Make sure loaded range fits the platform bit encoding ranges
resource_mgr.update_ranges(uni_id_start_idx=0, uni_id_end_idx=OpenOltPlatform.MAX_UNIS_PER_ONU-1)
+
+ def is_flow_cookie_on_kv_store(self, intf_id, onu_id, uni_id, flow_store_cookie):
+ '''
+ Note: For flows which trap from the NNI and not really associated with any particular
+ ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
+ '''
+ intf_onu_id = (intf_id, onu_id, uni_id)
+ try:
+ flow_ids = self.resource_mgrs[intf_id]. \
+ get_current_flow_ids_for_onu(intf_onu_id)
+ if flow_ids is not None:
+ for flow_id in flow_ids:
+ flows = self.get_flow_id_info(intf_id, onu_id, uni_id, flow_id)
+ assert (isinstance(flows, list))
+ for flow in flows:
+ if flow['flow_store_cookie'] == flow_store_cookie:
+ return True
+ except Exception as e:
+ self.log.error("error-retrieving-flow-info", e=e)
+
+ return False