VOL-1344:
1) Place all of resource manager and tech profile KV store data under /service/voltha
2) Ensure flow_ids are released on the KV store when device is deleted
3) Ensure pon resources are re-used on voltha restart
4) A few other code reorganizations and bug fixes
Change-Id: Ia7bc8062d88b7a8eec5d4b87209536d81b115575
diff --git a/voltha/adapters/openolt/openolt_flow_mgr.py b/voltha/adapters/openolt/openolt_flow_mgr.py
index 1b996e7..3b08edc 100644
--- a/voltha/adapters/openolt/openolt_flow_mgr.py
+++ b/voltha/adapters/openolt/openolt_flow_mgr.py
@@ -17,6 +17,8 @@
from twisted.internet import reactor
import grpc
from google.protobuf.json_format import MessageToDict
+import hashlib
+from simplejson import dumps
from voltha.protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC, \
ofp_flow_stats, OFPMT_OXM, Flows, FlowGroups, OFPXMT_OFB_IN_PORT, \
@@ -30,11 +32,6 @@
# Flow categories
HSIA_FLOW = "HSIA_FLOW"
-DHCP_FLOW = "DHCP_FLOW"
-EAPOL_PRIMARY_FLOW = "EAPOL_PRIMARY_FLOW"
-EAPOL_SECONDARY_FLOW = "EAPOL_SECONDARY_FLOW"
-IGMP_FLOW = "IGMP_FLOW"
-LLDP_FLOW = "LLDP_FLOW"
EAP_ETH_TYPE = 0x888e
LLDP_ETH_TYPE = 0x88cc
@@ -71,7 +68,6 @@
TRAP_TO_HOST = 'trap_to_host'
-
class OpenOltFlowMgr(object):
def __init__(self, adapter_agent, log, stub, device_id, logical_device_id,
@@ -215,7 +211,6 @@
def _clear_flow_id_from_rm(self, flow, flow_id, flow_direction):
uni_port_no = None
- flow_category = HSIA_FLOW # default
child_device_id = None
if flow_direction == UPSTREAM:
for field in fd.get_ofb_fields(flow):
@@ -223,15 +218,6 @@
is_uni, child_device_id = self._is_uni_port(field.port)
if is_uni:
uni_port_no = field.port
- elif field.type == fd.IP_PROTO:
- if field.ip_proto == IGMP_PROTO:
- flow_category = IGMP_FLOW
- elif field.type == fd.ETH_TYPE:
- if field.eth_type == EAP_ETH_TYPE:
- flow_category = EAPOL_PRIMARY_FLOW
- elif field.eth_type == LLDP_ETH_TYPE:
- flow_category = LLDP_FLOW
-
elif flow_direction == DOWNSTREAM:
for field in fd.get_ofb_fields(flow):
if field.type == fd.METADATA:
@@ -248,7 +234,7 @@
if is_uni:
uni_port_no = action.output.port
- if flow_category and child_device_id:
+ if child_device_id:
child_device = self.adapter_agent.get_device(child_device_id)
pon_intf = child_device.proxy_address.channel_id
onu_id = child_device.proxy_address.onu_id
@@ -270,7 +256,6 @@
self.resource_mgr.free_flow_id_for_uni(pon_intf, onu_id, uni_id, flow_id)
else:
self.log.error("invalid-info", uni_port_no=uni_port_no,
- flow_category=flow_category,
child_device_id=child_device_id)
def remove_flow(self, flow):
@@ -333,6 +318,22 @@
ofp_port_name = (logical_port.ofp_port.name, logical_port.ofp_port.port_no)
return ofp_port_name
+ def get_tp_path(self, intf_id, ofp_port_name):
+ # FIXME Should get Table id from the flow, as of now hardcoded to
+ # DEFAULT_TECH_PROFILE_TABLE_ID (64)
+ # 'tp_path' contains the suffix part of the tech_profile_instance path.
+ # The prefix to the 'tp_path' should be set to
+ # TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX by the ONU adapter.
+ return self.tech_profile[intf_id]. \
+ get_tp_path(DEFAULT_TECH_PROFILE_TABLE_ID,
+ ofp_port_name)
+
+ def delete_tech_profile_instance(self, intf_id, onu_id, uni_id):
+ # Remove the ONU's TP instance; _get_ofp_port_name returns (name, port_no)
+ ofp_port_name, _ = self._get_ofp_port_name(intf_id, onu_id, uni_id)
+ tp_path = self.get_tp_path(intf_id, ofp_port_name)
+ return self.tech_profile[intf_id].delete_tech_profile_instance(tp_path)
+
def divide_and_add_flow(self, intf_id, onu_id, uni_id, port_no, classifier,
action, flow):
@@ -372,7 +373,6 @@
if vlan_id is not None:
self.add_eapol_flow(
intf_id, onu_id, uni_id, port_no, flow, alloc_id, gemport_id,
- eapol_flow_category=EAPOL_SECONDARY_FLOW,
vlan_id=vlan_id)
parent_port_no = self.platform.intf_id_to_port_no(intf_id, Port.PON_OLT)
onu_device = self.adapter_agent.get_child_device(self.device_id,
@@ -383,14 +383,7 @@
self.log.error("port-name-not-found")
return
- # FIXME Should get Table id form the flow, as of now hardcoded to
- # DEFAULT_TECH_PROFILE_TABLE_ID (64)
- # 'tp_path' contains the suffix part of the tech_profile_instance path.
- # The prefix to the 'tp_path' should be set to \
- # TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX by the ONU adapter.
- tp_path = self.tech_profile[intf_id]. \
- get_tp_path(DEFAULT_TECH_PROFILE_TABLE_ID,
- ofp_port_name)
+ tp_path = self.get_tp_path(intf_id, ofp_port_name)
self.log.debug('Load-tech-profile-request-to-brcm-handler',
tp_path=tp_path)
@@ -418,6 +411,16 @@
def create_tcont_gemport(self, intf_id, onu_id, uni_id, table_id):
alloc_id, gem_port_ids = None, None
+ pon_intf_onu_id = (intf_id, onu_id)
+
+ # If alloc_id and gem_ports were already allocated earlier, return them
+ alloc_id = \
+ self.resource_mgr.get_current_alloc_ids_for_onu(pon_intf_onu_id)
+ gem_port_ids = \
+ self.resource_mgr.get_current_gemport_ids_for_onu(pon_intf_onu_id)
+ if alloc_id is not None and gem_port_ids is not None:
+ return alloc_id, gem_port_ids
+
try:
(ofp_port_name, ofp_port_no) = self._get_ofp_port_name(intf_id, onu_id, uni_id)
if ofp_port_name is None:
@@ -437,31 +440,30 @@
tech_profile_instance = self.tech_profile[intf_id]. \
create_tech_profile_instance(table_id, ofp_port_name,
intf_id)
- if tech_profile_instance is not None:
-
- # upstream scheduler
- us_scheduler = self.tech_profile[intf_id].get_us_scheduler(
- tech_profile_instance)
- # downstream scheduler
- ds_scheduler = self.tech_profile[intf_id].get_ds_scheduler(
- tech_profile_instance)
- # create Tcont
- tconts = self.tech_profile[intf_id].get_tconts(tech_profile_instance,
- us_scheduler,
- ds_scheduler)
-
- self.stub.CreateTconts(openolt_pb2.Tconts(intf_id=intf_id,
- onu_id=onu_id,
- uni_id=uni_id,
- port_no=ofp_port_no,
- tconts=tconts))
- else:
+ if tech_profile_instance is None:
raise Exception('Tech-profile-instance-creation-failed')
else:
self.log.debug(
'Tech-profile-instance-already-exist-for-given port-name',
ofp_port_name=ofp_port_name)
+ # upstream scheduler
+ us_scheduler = self.tech_profile[intf_id].get_us_scheduler(
+ tech_profile_instance)
+ # downstream scheduler
+ ds_scheduler = self.tech_profile[intf_id].get_ds_scheduler(
+ tech_profile_instance)
+ # create Tcont
+ tconts = self.tech_profile[intf_id].get_tconts(tech_profile_instance,
+ us_scheduler,
+ ds_scheduler)
+
+ self.stub.CreateTconts(openolt_pb2.Tconts(intf_id=intf_id,
+ onu_id=onu_id,
+ uni_id=uni_id,
+ port_no=ofp_port_no,
+ tconts=tconts))
+
# Fetch alloc id and gemports from tech profile instance
alloc_id = tech_profile_instance.us_scheduler.alloc_id
gem_port_ids = []
@@ -469,7 +471,7 @@
tech_profile_instance.upstream_gem_port_attribute_list)):
gem_port_ids.append(
tech_profile_instance.upstream_gem_port_attribute_list[i].
- gemport_id)
+ gemport_id)
except BaseException as e:
self.log.exception(exception=e)
@@ -504,8 +506,7 @@
(eap_active, eap_logical_flow) = self.is_eap_enabled(intf_id, onu_id, uni_id)
if eap_active:
self.add_eapol_flow(intf_id, onu_id, uni_id, port_no, eap_logical_flow, alloc_id,
- gemport_id, eapol_flow_category=EAPOL_SECONDARY_FLOW,
- vlan_id=uplink_classifier[VLAN_VID])
+ gemport_id, vlan_id=uplink_classifier[VLAN_VID])
def add_downstream_data_flow(self, intf_id, onu_id, uni_id, port_no, downlink_classifier,
downlink_action, flow, alloc_id, gemport_id):
@@ -521,7 +522,18 @@
def add_hsia_flow(self, intf_id, onu_id, uni_id, port_no, classifier, action,
direction, logical_flow, alloc_id, gemport_id):
- flow_id = self.resource_mgr.get_hsia_flow_for_uni(intf_id, onu_id, uni_id, gemport_id)
+ flow_store_cookie = self._get_flow_store_cookie(classifier,
+ gemport_id)
+
+ # One of the OLT platforms (Broadcom BAL) requires symmetric flows
+ # to use the same flow_id across UL and DL.
+ # Since the HSIA flow is the only symmetric flow currently, we need to
+ # re-use the flow_id across both directions. The 'flow_category'
+ # takes priority over flow_cookie to find any available HSIA_FLOW
+ # id for the ONU.
+ flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id,
+ flow_store_cookie,
+ HSIA_FLOW)
if flow_id is None:
self.log.error("hsia-flow-unavailable")
return
@@ -537,8 +549,11 @@
cookie=logical_flow.cookie)
if self.add_flow_to_device(flow, logical_flow):
- flow_info = self._get_flow_info_as_json_blob(flow, HSIA_FLOW)
- self.update_flow_info_to_kv_store(flow.access_intf_id, flow.onu_id, flow.uni_id,
+ flow_info = self._get_flow_info_as_json_blob(flow,
+ flow_store_cookie,
+ HSIA_FLOW)
+ self.update_flow_info_to_kv_store(flow.access_intf_id,
+ flow.onu_id, flow.uni_id,
flow.flow_id, flow_info)
def add_dhcp_trap(self, intf_id, onu_id, uni_id, port_no, classifier, action, logical_flow,
@@ -554,7 +569,12 @@
classifier[PACKET_TAG_TYPE] = SINGLE_TAG
classifier.pop(VLAN_VID, None)
- flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id)
+ flow_store_cookie = self._get_flow_store_cookie(classifier,
+ gemport_id)
+
+ flow_id = self.resource_mgr.get_flow_id(
+ intf_id, onu_id, uni_id, flow_store_cookie
+ )
dhcp_flow = openolt_pb2.Flow(
onu_id=onu_id, uni_id=uni_id, flow_id=flow_id, flow_type=UPSTREAM,
@@ -567,7 +587,7 @@
cookie=logical_flow.cookie)
if self.add_flow_to_device(dhcp_flow, logical_flow):
- flow_info = self._get_flow_info_as_json_blob(dhcp_flow, DHCP_FLOW)
+ flow_info = self._get_flow_info_as_json_blob(dhcp_flow, flow_store_cookie)
self.update_flow_info_to_kv_store(dhcp_flow.access_intf_id,
dhcp_flow.onu_id,
dhcp_flow.uni_id,
@@ -575,8 +595,7 @@
flow_info)
def add_eapol_flow(self, intf_id, onu_id, uni_id, port_no, logical_flow, alloc_id,
- gemport_id, eapol_flow_category=EAPOL_PRIMARY_FLOW,
- vlan_id=DEFAULT_MGMT_VLAN):
+ gemport_id, vlan_id=DEFAULT_MGMT_VLAN):
uplink_classifier = dict()
uplink_classifier[ETH_TYPE] = EAP_ETH_TYPE
@@ -586,11 +605,12 @@
uplink_action = dict()
uplink_action[TRAP_TO_HOST] = True
+ flow_store_cookie = self._get_flow_store_cookie(uplink_classifier,
+ gemport_id)
# Add Upstream EAPOL Flow.
- if eapol_flow_category == EAPOL_PRIMARY_FLOW:
- uplink_flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id)
- else:
- uplink_flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id)
+ uplink_flow_id = self.resource_mgr.get_flow_id(
+ intf_id, onu_id, uni_id, flow_store_cookie
+ )
upstream_flow = openolt_pb2.Flow(
access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=uplink_flow_id,
@@ -608,29 +628,26 @@
logical_flow.match.type = OFPMT_OXM
if self.add_flow_to_device(upstream_flow, logical_flow):
- if eapol_flow_category == EAPOL_PRIMARY_FLOW:
- flow_info = self._get_flow_info_as_json_blob(upstream_flow,
- EAPOL_PRIMARY_FLOW)
- self.update_flow_info_to_kv_store(upstream_flow.access_intf_id,
- upstream_flow.onu_id,
- upstream_flow.uni_id,
- upstream_flow.flow_id,
- flow_info)
- else:
- flow_info = self._get_flow_info_as_json_blob(upstream_flow,
- EAPOL_SECONDARY_FLOW)
- self.update_flow_info_to_kv_store(upstream_flow.access_intf_id,
- upstream_flow.onu_id,
- upstream_flow.uni_id,
- upstream_flow.flow_id,
- flow_info)
+ flow_info = self._get_flow_info_as_json_blob(upstream_flow,
+ flow_store_cookie)
+ self.update_flow_info_to_kv_store(upstream_flow.access_intf_id,
+ upstream_flow.onu_id,
+ upstream_flow.uni_id,
+ upstream_flow.flow_id,
+ flow_info)
if vlan_id == DEFAULT_MGMT_VLAN:
# Add Downstream EAPOL Flow, Only for first EAP flow (BAL
# requirement)
- # FIXME this is actually not used. But, here to be consistent with the upstream tagging from ONU.
- # It needs refactored to be completely removed
- special_vlan_downstream_flow = 4091 # 4000 - onu_id
+ # On one of the platforms (Broadcom BAL), when the same DL classifier
+ # vlan was used across multiple ONUs, eapol flow re-adds after flow
+ # delete (onu reboot/disable) failed, so the vlan is derived from
+ # intf_id, onu_id and uni_id (uni_id defaults to 0, so 1 is added).
+ # NOTE(review): the product is 0 when intf_id or onu_id is 0, so such
+ # ONUs all get vlan 4090 - confirm this is unique enough.
+ special_vlan_downstream_flow = 4090 - intf_id * onu_id * (uni_id+1)
+ # Unparenthesized on purpose: asserting a (cond, msg) tuple never fails
+ assert special_vlan_downstream_flow >= 2, 'invalid-vlan-generated'
downlink_classifier = dict()
downlink_classifier[PACKET_TAG_TYPE] = SINGLE_TAG
@@ -640,7 +657,13 @@
downlink_action[PUSH_VLAN] = True
downlink_action[VLAN_VID] = vlan_id
- downlink_flow_id = self.resource_mgr.get_flow_id(intf_id, onu_id, uni_id)
+
+ flow_store_cookie = self._get_flow_store_cookie(downlink_classifier,
+ gemport_id)
+
+ downlink_flow_id = self.resource_mgr.get_flow_id(
+ intf_id, onu_id, uni_id, flow_store_cookie
+ )
downstream_flow = openolt_pb2.Flow(
access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=downlink_flow_id,
@@ -668,7 +691,7 @@
if self.add_flow_to_device(downstream_flow, downstream_logical_flow):
flow_info = self._get_flow_info_as_json_blob(downstream_flow,
- EAPOL_PRIMARY_FLOW)
+ flow_store_cookie)
self.update_flow_info_to_kv_store(downstream_flow.access_intf_id,
downstream_flow.onu_id,
downstream_flow.uni_id,
@@ -715,7 +738,9 @@
# *********************************************
onu_id = -1
uni_id = -1
- flow_id = self.resource_mgr.get_flow_id(network_intf_id, onu_id, uni_id)
+ flow_store_cookie = self._get_flow_store_cookie(classifier)
+ flow_id = self.resource_mgr.get_flow_id(network_intf_id, onu_id, uni_id,
+ flow_store_cookie)
downstream_flow = openolt_pb2.Flow(
access_intf_id=-1, # access_intf_id not required
@@ -822,7 +847,7 @@
for flow in flows:
in_port = fd.get_in_port(flow)
out_port = fd.get_out_port(flow)
- if in_port == port and \
+ if in_port == port and out_port is not None and \
self.platform.intf_id_to_port_type_name(out_port) \
== Port.ETHERNET_NNI:
fields = fd.get_ofb_fields(flow)
@@ -984,11 +1009,14 @@
# the device
assert len(self.tech_profile) == self.resource_mgr.device_info.pon_ports
- def _get_flow_info_as_json_blob(self, flow, flow_category):
+ def _get_flow_info_as_json_blob(self, flow, flow_store_cookie,
+ flow_category=None):
json_blob = MessageToDict(message=flow,
preserving_proto_field_name=True)
self.log.debug("flow-info", json_blob=json_blob)
- json_blob['flow_category'] = flow_category
+ json_blob['flow_store_cookie'] = flow_store_cookie
+ if flow_category is not None:
+ json_blob['flow_category'] = flow_category
flow_info = self.resource_mgr.get_flow_id_info(flow.access_intf_id,
flow.onu_id, flow.uni_id, flow.flow_id)
@@ -1000,3 +1028,13 @@
flow_info.append(json_blob)
return flow_info
+
+ @staticmethod
+ def _get_flow_store_cookie(classifier, gem_port=None):
+ # Returns the first 12 hex chars of the md5 of the classifier's sorted-key
+ # JSON, with gem_port appended when given (unique flows per gem_port).
+ assert isinstance(classifier, dict)
+ to_hash = dumps(classifier, sort_keys=True)
+ if gem_port is not None:
+ to_hash += str(gem_port)
+ return hashlib.md5(to_hash.encode('utf-8')).hexdigest()[:12]