VOL-1344:
1) Place all of resource manager and tech profile KV store data under /service/voltha
2) Ensure flow_ids are released on the KV store when device is deleted
3) Ensure pon resources are re-used on voltha restart
4) Few other code re-organization and bug fixes
Change-Id: Ia7bc8062d88b7a8eec5d4b87209536d81b115575
diff --git a/common/pon_resource_manager/resource_kv_store.py b/common/pon_resource_manager/resource_kv_store.py
index a1a5c14..307b0cd 100644
--- a/common/pon_resource_manager/resource_kv_store.py
+++ b/common/pon_resource_manager/resource_kv_store.py
@@ -21,7 +21,7 @@
from voltha.core.config.config_backend import EtcdStore
# KV store uses this prefix to store resource info
-PATH_PREFIX = 'resource_manager/{}'
+PATH_PREFIX = 'service/voltha/resource_manager/{}'
class ResourceKvStore(object):
diff --git a/common/pon_resource_manager/resource_manager.py b/common/pon_resource_manager/resource_manager.py
index a88b407..bdb45ee 100644
--- a/common/pon_resource_manager/resource_manager.py
+++ b/common/pon_resource_manager/resource_manager.py
@@ -578,7 +578,17 @@
else False
"""
status = False
-
+ known_resource_types = [PONResourceManager.ONU_ID,
+ PONResourceManager.ALLOC_ID,
+ PONResourceManager.GEMPORT_ID,
+ PONResourceManager.FLOW_ID]
+ if resource_type not in known_resource_types:
+ self._log.error("unknown-resource-type",
+ resource_type=resource_type)
+ return status
+ if release_content is None:
+ self._log.debug("nothing-to-release")
+ return status
# delegate to the master instance if sharing enabled across instances
shared_resource_mgr = self.shared_resource_mgrs[self.shared_idx_by_type[resource_type]]
if shared_resource_mgr is not None and shared_resource_mgr is not self:
@@ -590,17 +600,13 @@
try:
resource = self._get_resource(path)
- if resource is not None and (
- resource_type == PONResourceManager.ONU_ID or
- resource_type == PONResourceManager.FLOW_ID):
- self._release_id(resource, release_content)
- elif resource is not None and (
- resource_type == PONResourceManager.ALLOC_ID or
- resource_type == PONResourceManager.GEMPORT_ID):
+ if resource is None:
+ raise Exception("get-resource-failed")
+ if isinstance(release_content, list):
for content in release_content:
self._release_id(resource, content)
else:
- raise Exception("get-resource-failed")
+ self._release_id(resource, release_content)
self._log.debug("Free-" + resource_type + "-success", path=path)
@@ -673,16 +679,40 @@
:param pon_intf_onu_id: reference of PON interface id and onu id
"""
# remove pon_intf_onu_id tuple to alloc_ids map
- alloc_id_path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
- self.device_id, str(pon_intf_onu_id)
- )
- self._kv_store.remove_from_kv_store(alloc_id_path)
+ try:
+ alloc_id_path = PONResourceManager.ALLOC_ID_RESOURCE_MAP_PATH.format(
+ self.device_id, str(pon_intf_onu_id)
+ )
+ self._kv_store.remove_from_kv_store(alloc_id_path)
+ except Exception as e:
+ self._log.error("error-removing-alloc-id", e=e)
- # remove pon_intf_onu_id tuple to gemport_ids map
- gemport_id_path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
- self.device_id, str(pon_intf_onu_id)
- )
- self._kv_store.remove_from_kv_store(gemport_id_path)
+ try:
+ # remove pon_intf_onu_id tuple to gemport_ids map
+ gemport_id_path = PONResourceManager.GEMPORT_ID_RESOURCE_MAP_PATH.format(
+ self.device_id, str(pon_intf_onu_id)
+ )
+ self._kv_store.remove_from_kv_store(gemport_id_path)
+ except Exception as e:
+ self._log.error("error-removing-gem-ports", e=e)
+
+ flow_id_path = PONResourceManager.FLOW_ID_RESOURCE_MAP_PATH.format(
+ self.device_id, str(pon_intf_onu_id))
+ flow_ids = self._kv_store.get_from_kv_store(flow_id_path)
+
+ if flow_ids and isinstance(flow_ids, list):
+ for flow_id in flow_ids:
+ try:
+ flow_id_info_path = PONResourceManager.FLOW_ID_INFO_PATH.format(
+ self.device_id, str(pon_intf_onu_id), flow_id)
+ self._kv_store.remove_from_kv_store(flow_id_info_path)
+ except Exception as e:
+ self._log.error("error-removing-flow-info", flow_id=flow_id, e=e)
+ continue
+ try:
+ self._kv_store.remove_from_kv_store(flow_id_path)
+ except Exception as e:
+ self._log.error("error-removing-flow-ids", e=e)
def get_current_alloc_ids_for_onu(self, pon_intf_onu_id):
"""
diff --git a/common/tech_profile/tech_profile.py b/common/tech_profile/tech_profile.py
index c3a9993..150667e 100644
--- a/common/tech_profile/tech_profile.py
+++ b/common/tech_profile/tech_profile.py
@@ -24,7 +24,6 @@
from voltha.registry import registry
from voltha.adapters.openolt.protos import openolt_pb2
-
# logger
log = structlog.get_logger()
@@ -126,7 +125,7 @@
pbits = ['0b11111111']
# Tech profile path prefix in kv store
- KV_STORE_TECH_PROFILE_PATH_PREFIX = 'voltha/technology_profiles'
+ KV_STORE_TECH_PROFILE_PATH_PREFIX = 'service/voltha/technology_profiles'
# Tech profile path in kv store
TECH_PROFILE_PATH = '{}/{}' # <technology>/<table_id>
@@ -174,13 +173,13 @@
host, port = self.args.etcd.split(':', 1)
self._kv_store = EtcdStore(
host, port, TechProfile.
- KV_STORE_TECH_PROFILE_PATH_PREFIX)
+ KV_STORE_TECH_PROFILE_PATH_PREFIX)
elif self.args.backend == 'consul':
# KV store's IP Address and PORT
host, port = self.args.consul.split(':', 1)
self._kv_store = ConsulStore(
host, port, TechProfile.
- KV_STORE_TECH_PROFILE_PATH_PREFIX)
+ KV_STORE_TECH_PROFILE_PATH_PREFIX)
# self.tech_profile_instance_store = dict()
except Exception as e:
@@ -257,17 +256,14 @@
path=path, tech_profile_instance=None, exception=e)
return None
- def delete_tech_profile_instance(self, table_id, uni_port_name):
- # path to delete tech profile instance json from kv store
- path = TechProfile.TECH_PROFILE_INSTANCE_PATH.format(
- self.resource_mgr.technology, table_id, uni_port_name)
+ def delete_tech_profile_instance(self, tp_path):
try:
- del self._kv_store[path]
- log.debug("Delete-tech-profile-instance-success", path=path)
+ del self._kv_store[tp_path]
+ log.debug("Delete-tech-profile-instance-success", path=tp_path)
return True
except Exception as e:
- log.debug("Delete-tech-profile-instance-failed", path=path,
+ log.debug("Delete-tech-profile-instance-failed", path=tp_path,
exception=e)
return False
@@ -537,7 +533,7 @@
gemport_list = list()
if isinstance(gem_ports, int):
- gemport_list.append(gem_ports)
+ gemport_list.append(gem_ports)
elif isinstance(gem_ports, list):
for gem in gem_ports:
gemport_list.append(gem)
diff --git a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
index fde2591..45bc45e 100644
--- a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
+++ b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
@@ -664,8 +664,10 @@
reactor.callLater(0, self._onu_omci_device.stop)
# Let TP download happen again
- for i in self._tp_service_specific_task: i.clear()
- for i in self._tech_profile_download_done: i.clear()
+ for uni_id in self._tp_service_specific_task:
+ self._tp_service_specific_task[uni_id].clear()
+ for uni_id in self._tech_profile_download_done:
+ self._tech_profile_download_done[uni_id].clear()
self.disable_ports(onu_device)
onu_device.reason = "stopping-openomci"
@@ -685,8 +687,10 @@
reactor.callLater(0, self._onu_omci_device.stop)
# Let TP download happen again
- for i in self._tp_service_specific_task: i.clear()
- for i in self._tech_profile_download_done: i.clear()
+ for uni_id in self._tp_service_specific_task:
+ self._tp_service_specific_task[uni_id].clear()
+ for uni_id in self._tech_profile_download_done:
+ self._tech_profile_download_done[uni_id].clear()
self.disable_ports(onu_device)
onu_device.reason = "stopping-openomci"
@@ -731,8 +735,10 @@
reactor.callLater(0, self._onu_omci_device.stop)
# Let TP download happen again
- for i in self._tp_service_specific_task: i.clear()
- for i in self._tech_profile_download_done: i.clear()
+ for uni_id in self._tp_service_specific_task:
+ self._tp_service_specific_task[uni_id].clear()
+ for uni_id in self._tech_profile_download_done:
+ self._tech_profile_download_done[uni_id].clear()
self.disable_ports(device)
device.oper_status = OperStatus.UNKNOWN
@@ -1035,4 +1041,4 @@
self._pon._port = pon_port
self.adapter_agent.add_port_reference_to_parent(self.device_id,
- pon_port)
\ No newline at end of file
+ pon_port)
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index 0858005..7332c4a 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -1007,17 +1007,20 @@
# TODO FIXME - For each uni.
# TODO FIXME - Flows are not deleted
uni_id = 0 # FIXME
+ self.flow_mgr.delete_tech_profile_instance(
+ child_device.proxy_address.channel_id,
+ child_device.proxy_address.onu_id,
+ uni_id
+ )
pon_intf_id_onu_id = (child_device.proxy_address.channel_id,
child_device.proxy_address.onu_id,
uni_id)
- alloc_id = self.resource_mgr.get_alloc_id(pon_intf_id_onu_id)
# Free any PON resources that were reserved for the ONU
self.resource_mgr.free_pon_resources_for_onu(pon_intf_id_onu_id)
onu = openolt_pb2.Onu(intf_id=child_device.proxy_address.channel_id,
onu_id=child_device.proxy_address.onu_id,
- serial_number=serial_number,
- alloc_id=alloc_id)
+ serial_number=serial_number)
self.stub.DeleteOnu(onu)
def reboot(self):
diff --git a/voltha/adapters/openolt/openolt_flow_mgr.py b/voltha/adapters/openolt/openolt_flow_mgr.py
index 1b996e7..3b08edc 100644
--- a/voltha/adapters/openolt/openolt_flow_mgr.py
+++ b/voltha/adapters/openolt/openolt_flow_mgr.py
@@ -17,6 +17,8 @@
from twisted.internet import reactor
import grpc
from google.protobuf.json_format import MessageToDict
+import hashlib
+from simplejson import dumps
from voltha.protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC, \
ofp_flow_stats, OFPMT_OXM, Flows, FlowGroups, OFPXMT_OFB_IN_PORT, \
@@ -30,11 +32,6 @@
# Flow categories
HSIA_FLOW = "HSIA_FLOW"
-DHCP_FLOW = "DHCP_FLOW"
-EAPOL_PRIMARY_FLOW = "EAPOL_PRIMARY_FLOW"
-EAPOL_SECONDARY_FLOW = "EAPOL_SECONDARY_FLOW"
-IGMP_FLOW = "IGMP_FLOW"
-LLDP_FLOW = "LLDP_FLOW"
EAP_ETH_TYPE = 0x888e
LLDP_ETH_TYPE = 0x88cc
@@ -71,7 +68,6 @@
TRAP_TO_HOST = 'trap_to_host'
-
class OpenOltFlowMgr(object):
def __init__(self, adapter_agent, log, stub, device_id, logical_device_id,
@@ -215,7 +211,6 @@
def _clear_flow_id_from_rm(self, flow, flow_id, flow_direction):
uni_port_no = None
- flow_category = HSIA_FLOW # default
child_device_id = None
if flow_direction == UPSTREAM:
for field in fd.get_ofb_fields(flow):
@@ -223,15 +218,6 @@
is_uni, child_device_id = self._is_uni_port(field.port)
if is_uni:
uni_port_no = field.port
- elif field.type == fd.IP_PROTO:
- if field.ip_proto == IGMP_PROTO:
- flow_category = IGMP_FLOW
- elif field.type == fd.ETH_TYPE:
- if field.eth_type == EAP_ETH_TYPE:
- flow_category = EAPOL_PRIMARY_FLOW
- elif field.eth_type == LLDP_ETH_TYPE:
- flow_category = LLDP_FLOW
-
elif flow_direction == DOWNSTREAM:
for field in fd.get_ofb_fields(flow):
if field.type == fd.METADATA:
@@ -248,7 +234,7 @@
if is_uni:
uni_port_no = action.output.port
- if flow_category and child_device_id:
+ if child_device_id:
child_device = self.adapter_agent.get_device(child_device_id)
pon_intf = child_device.proxy_address.channel_id
onu_id = child_device.proxy_address.onu_id
@@ -270,7 +256,6 @@
self.resource_mgr.free_flow_id_for_uni(pon_intf, onu_id, uni_id, flow_id)
else:
self.log.error("invalid-info", uni_port_no=uni_port_no,
- flow_category=flow_category,
child_device_id=child_device_id)
def remove_flow(self, flow):
@@ -333,6 +318,22 @@
ofp_port_name = (logical_port.ofp_port.name, logical_port.ofp_port.port_no)
return ofp_port_name
+ def get_tp_path(self, intf_id, ofp_port_name):
+ # FIXME Should get Table id form the flow, as of now hardcoded to
+ # DEFAULT_TECH_PROFILE_TABLE_ID (64)
+ # 'tp_path' contains the suffix part of the tech_profile_instance path.
+ # The prefix to the 'tp_path' should be set to \
+ # TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX by the ONU adapter.
+ return self.tech_profile[intf_id]. \
+ get_tp_path(DEFAULT_TECH_PROFILE_TABLE_ID,
+ ofp_port_name)
+
+ def delete_tech_profile_instance(self, intf_id, onu_id, uni_id):
+ # Remove the TP instance associated with the ONU
+ ofp_port_name = self._get_ofp_port_name(intf_id, onu_id, uni_id)
+ tp_path = self.get_tp_path(intf_id, ofp_port_name)
+ return self.tech_profile[intf_id].delete_tech_profile_instance(tp_path)
+
def divide_and_add_flow(self, intf_id, onu_id, uni_id, port_no, classifier,
action, flow):
@@ -372,7 +373,6 @@
if vlan_id is not None:
self.add_eapol_flow(
intf_id, onu_id, uni_id, port_no, flow, alloc_id, gemport_id,
- eapol_flow_category=EAPOL_SECONDARY_FLOW,
vlan_id=vlan_id)
parent_port_no = self.platform.intf_id_to_port_no(intf_id, Port.PON_OLT)
onu_device = self.adapter_agent.get_child_device(self.device_id,
@@ -383,14 +383,7 @@
self.log.error("port-name-not-found")
return
- # FIXME Should get Table id form the flow, as of now hardcoded to
- # DEFAULT_TECH_PROFILE_TABLE_ID (64)
- # 'tp_path' contains the suffix part of the tech_profile_instance path.
- # The prefix to the 'tp_path' should be set to \
- # TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX by the ONU adapter.
- tp_path = self.tech_profile[intf_id]. \
- get_tp_path(DEFAULT_TECH_PROFILE_TABLE_ID,
- ofp_port_name)
+ tp_path = self.get_tp_path(intf_id, ofp_port_name)
self.log.debug('Load-tech-profile-request-to-brcm-handler',
tp_path=tp_path)
@@ -418,6 +411,16 @@
def create_tcont_gemport(self, intf_id, onu_id, uni_id, table_id):
alloc_id, gem_port_ids = None, None
+ pon_intf_onu_id = (intf_id, onu_id)
+
+ # If we already have allocated alloc_id and gem_ports earlier, render them
+ alloc_id = \
+ self.resource_mgr.get_current_alloc_ids_for_onu(pon_intf_onu_id)
+ gem_port_ids = \
+ self.resource_mgr.get_current_gemport_ids_for_onu(pon_intf_onu_id)
+ if alloc_id is not None and gem_port_ids is not None:
+ return alloc_id, gem_port_ids
+
try:
(ofp_port_name, ofp_port_no) = self._get_ofp_port_name(intf_id, onu_id, uni_id)
if ofp_port_name is None:
@@ -437,31 +440,30 @@
tech_profile_instance = self.tech_profile[intf_id]. \
create_tech_profile_instance(table_id, ofp_port_name,
intf_id)
- if tech_profile_instance is not None:
-
- # upstream scheduler
- us_scheduler = self.tech_profile[intf_id].get_us_scheduler(
- tech_profile_instance)
- # downstream scheduler
- ds_scheduler = self.tech_profile[intf_id].get_ds_scheduler(
- tech_profile_instance)
- # create Tcont
- tconts = self.tech_profile[intf_id].get_tconts(tech_profile_instance,
- us_scheduler,
- ds_scheduler)
-
- self.stub.CreateTconts(openolt_pb2.Tconts(intf_id=intf_id,
- onu_id=onu_id,
- uni_id=uni_id,
- port_no=ofp_port_no,
- tconts=tconts))
- else:
+ if tech_profile_instance is None:
raise Exception('Tech-profile-instance-creation-failed')
else:
self.log.debug(
'Tech-profile-instance-already-exist-for-given port-name',
ofp_port_name=ofp_port_name)
+ # upstream scheduler
+ us_scheduler = self.tech_profile[intf_id].get_us_scheduler(
+ tech_profile_instance)
+ # downstream scheduler
+ ds_scheduler = self.tech_profile[intf_id].get_ds_scheduler(
+ tech_profile_instance)
+ # create Tcont
+ tconts = self.tech_profile[intf_id].get_tconts(tech_profile_instance,
+ us_scheduler,
+ ds_scheduler)
+
+ self.stub.CreateTconts(openolt_pb2.Tconts(intf_id=intf_id,
+ onu_id=onu_id,
+ uni_id=uni_id,
+ port_no=ofp_port_no,
+ tconts=tconts))
+
# Fetch alloc id and gemports from tech profile instance
alloc_id = tech_profile_instance.us_scheduler.alloc_id
gem_port_ids = []
@@ -469,7 +471,7 @@
tech_profile_instance.upstream_gem_port_attribute_list)):
gem_port_ids.append(
tech_profile_instance.upstream_gem_port_attribute_list[i].
- gemport_id)
+ gemport_id)
except BaseException as e:
self.log.exception(exception=e)
@@ -504,8 +506,7 @@
(eap_active, eap_logical_flow) = self.is_eap_enabled(intf_id, onu_id, uni_id)
if eap_active:
self.add_eapol_flow(intf_id, onu_id, uni_id, port_no, eap_logical_flow, alloc_id,
- gemport_id, eapol_flow_category=EAPOL_SECONDARY_FLOW,
- vlan_id=uplink_classifier[VLAN_VID])
+ gemport_id, vlan_id=uplink_classifier[VLAN_VID])
def add_downstream_data_flow(self, intf_id, onu_id, uni_id, port_no, downlink_classifier,
downlink_action, flow, alloc_id, gemport_id):
@@ -521,7 +522,18 @@
def add_hsia_flow(self, intf_id, onu_id, uni_id, port_no, classifier, action,
direction, logical_flow, alloc_id, gemport_id):
- flow_id = self.resource_mgr.get_hsia_flow_for_uni(intf_id, onu_id, uni_id, gemport_id)
+ flow_store_cookie = self._get_flow_store_cookie(classifier,
+ gemport_id)
+
+ # One of the OLT platform (Broadcom BAL) requires that symmetric
+ # flows require the same flow_id to be used across UL and DL.
+ # Since HSIA flow is the only symmetric flow currently, we need to
+ # re-use the flow_id across both direction. The 'flow_category'
+ # takes priority over flow_cookie to find any available HSIA_FLOW
+ # id for the ONU.
+ flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id,
+ flow_store_cookie,
+ HSIA_FLOW)
if flow_id is None:
self.log.error("hsia-flow-unavailable")
return
@@ -537,8 +549,11 @@
cookie=logical_flow.cookie)
if self.add_flow_to_device(flow, logical_flow):
- flow_info = self._get_flow_info_as_json_blob(flow, HSIA_FLOW)
- self.update_flow_info_to_kv_store(flow.access_intf_id, flow.onu_id, flow.uni_id,
+ flow_info = self._get_flow_info_as_json_blob(flow,
+ flow_store_cookie,
+ HSIA_FLOW)
+ self.update_flow_info_to_kv_store(flow.access_intf_id,
+ flow.onu_id, flow.uni_id,
flow.flow_id, flow_info)
def add_dhcp_trap(self, intf_id, onu_id, uni_id, port_no, classifier, action, logical_flow,
@@ -554,7 +569,12 @@
classifier[PACKET_TAG_TYPE] = SINGLE_TAG
classifier.pop(VLAN_VID, None)
- flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id)
+ flow_store_cookie = self._get_flow_store_cookie(classifier,
+ gemport_id)
+
+ flow_id = self.resource_mgr.get_flow_id(
+ intf_id, onu_id, uni_id, flow_store_cookie
+ )
dhcp_flow = openolt_pb2.Flow(
onu_id=onu_id, uni_id=uni_id, flow_id=flow_id, flow_type=UPSTREAM,
@@ -567,7 +587,7 @@
cookie=logical_flow.cookie)
if self.add_flow_to_device(dhcp_flow, logical_flow):
- flow_info = self._get_flow_info_as_json_blob(dhcp_flow, DHCP_FLOW)
+ flow_info = self._get_flow_info_as_json_blob(dhcp_flow, flow_store_cookie)
self.update_flow_info_to_kv_store(dhcp_flow.access_intf_id,
dhcp_flow.onu_id,
dhcp_flow.uni_id,
@@ -575,8 +595,7 @@
flow_info)
def add_eapol_flow(self, intf_id, onu_id, uni_id, port_no, logical_flow, alloc_id,
- gemport_id, eapol_flow_category=EAPOL_PRIMARY_FLOW,
- vlan_id=DEFAULT_MGMT_VLAN):
+ gemport_id, vlan_id=DEFAULT_MGMT_VLAN):
uplink_classifier = dict()
uplink_classifier[ETH_TYPE] = EAP_ETH_TYPE
@@ -586,11 +605,12 @@
uplink_action = dict()
uplink_action[TRAP_TO_HOST] = True
+ flow_store_cookie = self._get_flow_store_cookie(uplink_classifier,
+ gemport_id)
# Add Upstream EAPOL Flow.
- if eapol_flow_category == EAPOL_PRIMARY_FLOW:
- uplink_flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id)
- else:
- uplink_flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id)
+ uplink_flow_id = self.resource_mgr.get_flow_id(
+ intf_id, onu_id, uni_id, flow_store_cookie
+ )
upstream_flow = openolt_pb2.Flow(
access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=uplink_flow_id,
@@ -608,29 +628,26 @@
logical_flow.match.type = OFPMT_OXM
if self.add_flow_to_device(upstream_flow, logical_flow):
- if eapol_flow_category == EAPOL_PRIMARY_FLOW:
- flow_info = self._get_flow_info_as_json_blob(upstream_flow,
- EAPOL_PRIMARY_FLOW)
- self.update_flow_info_to_kv_store(upstream_flow.access_intf_id,
- upstream_flow.onu_id,
- upstream_flow.uni_id,
- upstream_flow.flow_id,
- flow_info)
- else:
- flow_info = self._get_flow_info_as_json_blob(upstream_flow,
- EAPOL_SECONDARY_FLOW)
- self.update_flow_info_to_kv_store(upstream_flow.access_intf_id,
- upstream_flow.onu_id,
- upstream_flow.uni_id,
- upstream_flow.flow_id,
- flow_info)
+ flow_info = self._get_flow_info_as_json_blob(upstream_flow,
+ flow_store_cookie)
+ self.update_flow_info_to_kv_store(upstream_flow.access_intf_id,
+ upstream_flow.onu_id,
+ upstream_flow.uni_id,
+ upstream_flow.flow_id,
+ flow_info)
if vlan_id == DEFAULT_MGMT_VLAN:
# Add Downstream EAPOL Flow, Only for first EAP flow (BAL
# requirement)
- # FIXME this is actually not used. But, here to be consistent with the upstream tagging from ONU.
- # It needs refactored to be completely removed
- special_vlan_downstream_flow = 4091 # 4000 - onu_id
+ # On one of the platforms (Broadcom BAL), when same DL classifier
+ # vlan was used across multiple ONUs, eapol flow re-adds after
+ # flow delete (cases of onu reboot/disable) fails.
+ # In order to generate unique vlan, a combination of intf_id
+ # onu_id and uni_id is used.
+ # uni_id defaults to 0, so add 1 to it.
+ special_vlan_downstream_flow = 4090 - intf_id * onu_id * (uni_id+1)
+ # Assert that we do not generate invalid vlans under no condition
+ assert (special_vlan_downstream_flow >= 2, 'invalid-vlan-generated')
downlink_classifier = dict()
downlink_classifier[PACKET_TAG_TYPE] = SINGLE_TAG
@@ -640,7 +657,13 @@
downlink_action[PUSH_VLAN] = True
downlink_action[VLAN_VID] = vlan_id
- downlink_flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id)
+
+ flow_store_cookie = self._get_flow_store_cookie(downlink_classifier,
+ gemport_id)
+
+ downlink_flow_id = self.resource_mgr.get_flow_id(
+ intf_id, onu_id, uni_id, flow_store_cookie
+ )
downstream_flow = openolt_pb2.Flow(
access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=downlink_flow_id,
@@ -668,7 +691,7 @@
if self.add_flow_to_device(downstream_flow, downstream_logical_flow):
flow_info = self._get_flow_info_as_json_blob(downstream_flow,
- EAPOL_PRIMARY_FLOW)
+ flow_store_cookie)
self.update_flow_info_to_kv_store(downstream_flow.access_intf_id,
downstream_flow.onu_id,
downstream_flow.uni_id,
@@ -715,7 +738,9 @@
# *********************************************
onu_id = -1
uni_id = -1
- flow_id = self.resource_mgr.get_flow_id(network_intf_id, onu_id, uni_id)
+ flow_store_cookie = self._get_flow_store_cookie(classifier)
+ flow_id = self.resource_mgr.get_flow_id(network_intf_id, onu_id, uni_id,
+ flow_store_cookie)
downstream_flow = openolt_pb2.Flow(
access_intf_id=-1, # access_intf_id not required
@@ -822,7 +847,7 @@
for flow in flows:
in_port = fd.get_in_port(flow)
out_port = fd.get_out_port(flow)
- if in_port == port and \
+ if in_port == port and out_port is not None and \
self.platform.intf_id_to_port_type_name(out_port) \
== Port.ETHERNET_NNI:
fields = fd.get_ofb_fields(flow)
@@ -984,11 +1009,14 @@
# the device
assert len(self.tech_profile) == self.resource_mgr.device_info.pon_ports
- def _get_flow_info_as_json_blob(self, flow, flow_category):
+ def _get_flow_info_as_json_blob(self, flow, flow_store_cookie,
+ flow_category=None):
json_blob = MessageToDict(message=flow,
preserving_proto_field_name=True)
self.log.debug("flow-info", json_blob=json_blob)
- json_blob['flow_category'] = flow_category
+ json_blob['flow_store_cookie'] = flow_store_cookie
+ if flow_category is not None:
+ json_blob['flow_category'] = flow_category
flow_info = self.resource_mgr.get_flow_id_info(flow.access_intf_id,
flow.onu_id, flow.uni_id, flow.flow_id)
@@ -1000,3 +1028,13 @@
flow_info.append(json_blob)
return flow_info
+
+ @staticmethod
+ def _get_flow_store_cookie(classifier, gem_port=None):
+ assert isinstance(classifier, dict)
+ # We need unique flows per gem_port
+ if gem_port is not None:
+ to_hash = dumps(classifier, sort_keys=True) + str(gem_port)
+ else:
+ to_hash = dumps(classifier, sort_keys=True)
+ return hashlib.md5(to_hash).hexdigest()[:12]
diff --git a/voltha/adapters/openolt/openolt_resource_manager.py b/voltha/adapters/openolt/openolt_resource_manager.py
index 49b353d..2b6e432 100644
--- a/voltha/adapters/openolt/openolt_resource_manager.py
+++ b/voltha/adapters/openolt/openolt_resource_manager.py
@@ -27,7 +27,7 @@
class OpenOltResourceMgr(object):
- BASE_PATH_KV_STORE = "openolt/{}" # openolt/<device_id>
+ BASE_PATH_KV_STORE = "service/voltha/openolt/{}" # service/voltha/openolt/<device_id>
def __init__(self, device_id, host_and_port, extra_args, device_info):
self.log = structlog.get_logger(id=device_id,
@@ -137,12 +137,33 @@
return onu_id
- def get_flow_id(self, pon_intf_id, onu_id, uni_id):
+ def get_flow_id(self, pon_intf_id, onu_id, uni_id, flow_store_cookie,
+ flow_category=None):
pon_intf_onu_id = (pon_intf_id, onu_id, uni_id)
+ try:
+ flow_ids = self.resource_mgrs[pon_intf_id]. \
+ get_current_flow_ids_for_onu(pon_intf_onu_id)
+ if flow_ids is not None:
+ for flow_id in flow_ids:
+ flows = self.get_flow_id_info(pon_intf_id, onu_id, uni_id, flow_id)
+ assert (isinstance(flows, list))
+ for flow in flows:
+
+ if flow_category is not None and \
+ 'flow_category' in flow and \
+ flow['flow_category'] == flow_category:
+ return flow_id
+ if flow['flow_store_cookie'] == flow_store_cookie:
+ return flow_id
+ except Exception as e:
+ self.log.error("error-retrieving-flow-info", e=e)
+
flow_id = self.resource_mgrs[pon_intf_id].get_resource_id(
- pon_intf_id, PONResourceManager.FLOW_ID)
+ pon_intf_onu_id[0], PONResourceManager.FLOW_ID)
if flow_id is not None:
- self.resource_mgrs[pon_intf_id].update_flow_id_for_onu(pon_intf_onu_id, flow_id)
+ self.resource_mgrs[pon_intf_id].update_flow_id_for_onu(
+ pon_intf_onu_id, flow_id
+ )
return flow_id
@@ -159,24 +180,6 @@
return self.resource_mgrs[pon_intf_id].update_flow_id_info_for_onu(
pon_intf_onu_id, flow_id, flow_data)
- def get_hsia_flow_for_uni(self, pon_intf_id, onu_id, uni_id, gemport_id):
- pon_intf_onu_id = (pon_intf_id, onu_id, uni_id)
- try:
- flow_ids = self.resource_mgrs[pon_intf_id]. \
- get_current_flow_ids_for_onu(pon_intf_onu_id)
- if flow_ids is not None:
- for flow_id in flow_ids:
- flows = self.get_flow_id_info(pon_intf_id, onu_id, uni_id, flow_id)
- assert (isinstance(flows, list))
- for flow in flows:
- if flow['flow_category'] == HSIA_FLOW and \
- flow['gemport_id'] == gemport_id:
- return flow_id
- except Exception as e:
- self.log.error("error-retrieving-flow-info", e=e)
-
- return self.get_flow_id(pon_intf_id, onu_id, uni_id)
-
def get_alloc_id(self, pon_intf_onu_id):
# Derive the pon_intf from the pon_intf_onu_id tuple
pon_intf = pon_intf_onu_id[0]
@@ -207,9 +210,16 @@
def get_current_gemport_ids_for_onu(self, pon_intf_onu_id):
pon_intf_id = pon_intf_onu_id[0]
- assert False, 'unused function'
return self.resource_mgrs[pon_intf_id].get_current_gemport_ids_for_onu(pon_intf_onu_id)
+ def get_current_alloc_ids_for_onu(self, pon_intf_onu_id):
+ pon_intf_id = pon_intf_onu_id[0]
+ alloc_ids = self.resource_mgrs[pon_intf_id].get_current_alloc_ids_for_onu(pon_intf_onu_id)
+ if alloc_ids is None:
+ return None
+ # We support only one tcont at the moment
+ return alloc_ids[0]
+
def update_gemports_ponport_to_onu_map_on_kv_store(self, gemport_list, pon_port, onu_id, uni_id):
for gemport in gemport_list:
pon_intf_gemport = (pon_port, gemport)
@@ -286,6 +296,12 @@
PONResourceManager.GEMPORT_ID,
gemport_ids)
+ flow_ids = \
+ self.resource_mgrs[pon_intf_id].get_current_flow_ids_for_onu(pon_intf_id_onu_id)
+ self.resource_mgrs[pon_intf_id].free_resource_id(pon_intf_id,
+ PONResourceManager.FLOW_ID,
+ flow_ids)
+
self.resource_mgrs[pon_intf_id].free_resource_id(pon_intf_id,
PONResourceManager.ONU_ID,
onu_id)