VOL-1221: OpenOLT Adapter/Driver will use a Technology Profile Instance to create the OLT Upstream and Downstream Queuing and Scheduling Constructs for a Bidirectional Flow.
Change-Id: Iaf1a782529e2c459c586b158bd4f6447f548e004
diff --git a/voltha/adapters/openolt/openolt_flow_mgr.py b/voltha/adapters/openolt/openolt_flow_mgr.py
index 59c6c27..7e6c258 100644
--- a/voltha/adapters/openolt/openolt_flow_mgr.py
+++ b/voltha/adapters/openolt/openolt_flow_mgr.py
@@ -16,6 +16,7 @@
import copy
from twisted.internet import reactor
import grpc
+from google.protobuf.json_format import MessageToDict
from voltha.protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC, \
ofp_flow_stats, OFPMT_OXM, Flows, FlowGroups, OFPXMT_OFB_IN_PORT, \
@@ -25,27 +26,31 @@
from voltha.adapters.openolt.protos import openolt_pb2
from voltha.registry import registry
-HSIA_FLOW_INDEX = 0 # FIXME
-DHCP_FLOW_INDEX = 1 # FIXME
-DHCP_DOWNLINK_FLOW_INDEX = 6 # FIXME
-EAPOL_FLOW_INDEX = 2 # FIXME
-EAPOL_SECONDARY_FLOW_INDEX = 5 # FIXME
-DOWNSTREAM_FLOW_FOR_PACKET_OUT = 7
-LLDP_FLOW_ID = 0x3FF8 # FIXME (16376)
+from common.tech_profile.tech_profile import DEFAULT_TECH_PROFILE_TABLE_ID
+
+# Flow categories
+HSIA_FLOW = "HSIA_FLOW"
+DHCP_FLOW = "DHCP_FLOW"
+EAPOL_PRIMARY_FLOW = "EAPOL_PRIMARY_FLOW"
+EAPOL_SECONDARY_FLOW = "EAPOL_SECONDARY_FLOW"
+IGMP_FLOW = "IGMP_FLOW"
+LLDP_FLOW = "LLDP_FLOW"
EAP_ETH_TYPE = 0x888e
LLDP_ETH_TYPE = 0x88cc
+IGMP_PROTO = 2
+
# FIXME - see also BRDCM_DEFAULT_VLAN in broadcom_onu.py
DEFAULT_MGMT_VLAN = 4091
# Openolt Flow
-UPSTREAM = 'upstream'
-DOWNSTREAM = 'downstream'
-PACKET_TAG_TYPE = 'pkt_tag_type'
-UNTAGGED = 'untagged'
-SINGLE_TAG = 'single_tag'
-DOUBLE_TAG = 'double_tag'
+UPSTREAM = "upstream"
+DOWNSTREAM = "downstream"
+PACKET_TAG_TYPE = "pkt_tag_type"
+UNTAGGED = "untagged"
+SINGLE_TAG = "single_tag"
+DOUBLE_TAG = "double_tag"
# Classifier
ETH_TYPE = 'eth_type'
@@ -65,13 +70,14 @@
PUSH_VLAN = 'push_vlan'
TRAP_TO_HOST = 'trap_to_host'
-
+KV_STORE_TECH_PROFILE_PATH_PREFIX = 'voltha/technology_profiles'
class OpenOltFlowMgr(object):
- def __init__(self, log, stub, device_id, logical_device_id,
+ def __init__(self, adapter_agent, log, stub, device_id, logical_device_id,
platform, resource_mgr):
+ self.adapter_agent = adapter_agent
self.log = log
self.stub = stub
self.device_id = device_id
@@ -83,6 +89,8 @@
'/devices/{}/flows'.format(self.device_id))
self.root_proxy = registry('core').get_proxy('/')
self.resource_mgr = resource_mgr
+ self.tech_profile = dict()
+ self._populate_tech_profile_per_pon_port()
def add_flow(self, flow):
self.log.debug('add flow', flow=flow)
@@ -173,13 +181,12 @@
self.log.error('unsupported-action-type',
action_type=action.type, in_port=classifier_info[IN_PORT])
- if fd.get_goto_table_id(flow) is not None and not POP_VLAN in \
- action_info:
+ if fd.get_goto_table_id(flow) is not None and POP_VLAN not in action_info:
self.log.debug('being taken care of by ONU', flow=flow)
return
- if not OUTPUT in action_info and METADATA in classifier_info:
- #find flow in the next table
+ if OUTPUT not in action_info and METADATA in classifier_info:
+ # find flow in the next table
next_flow = self.find_next_flow(flow)
if next_flow is None:
return
@@ -188,14 +195,84 @@
if field.type == fd.VLAN_VID:
classifier_info[METADATA] = field.vlan_vid & 0xfff
-
(intf_id, onu_id) = self.platform.extract_access_from_flow(
classifier_info[IN_PORT], action_info[OUTPUT])
-
self.divide_and_add_flow(intf_id, onu_id, classifier_info,
action_info, flow)
+ def _is_uni_port(self, port_no):
+ try:
+ port = self.adapter_agent.get_logical_port(self.logical_device_id,
+ 'uni-{}'.format(port_no))
+ if port is not None:
+ return (not port.root_port), port.device_id
+ else:
+ return False, None
+ except Exception as e:
+ self.log.error("error-retrieving-port", e=e)
+ return False, None
+
+ def _clear_flow_id_from_rm(self, flow, flow_id, flow_direction):
+ uni_port_no = None
+ flow_category = HSIA_FLOW # default
+ child_device_id = None
+ if flow_direction == UPSTREAM:
+ for field in fd.get_ofb_fields(flow):
+ if field.type == fd.IN_PORT:
+ is_uni, child_device_id = self._is_uni_port(field.port)
+ if is_uni:
+ uni_port_no = field.port
+ elif field.type == fd.IP_PROTO:
+ if field.ip_proto == IGMP_PROTO:
+ flow_category = IGMP_FLOW
+ elif field.type == fd.ETH_TYPE:
+ if field.eth_type == EAP_ETH_TYPE:
+ flow_category = EAPOL_PRIMARY_FLOW
+ elif field.eth_type == LLDP_ETH_TYPE:
+ flow_category = LLDP_FLOW
+
+ elif flow_direction == DOWNSTREAM:
+ for field in fd.get_ofb_fields(flow):
+ if field.type == fd.METADATA:
+ uni_port = field.table_metadata & 0xFFFFFFFF
+ is_uni, child_device_id = self._is_uni_port(uni_port)
+ if is_uni:
+                    uni_port_no = uni_port
+
+ if uni_port_no is None:
+ for action in fd.get_actions(flow):
+ if action.type == fd.OUTPUT:
+ is_uni, child_device_id = \
+ self._is_uni_port(action.output.port)
+ if is_uni:
+ uni_port_no = action.output.port
+
+        # flow_category always holds a value (defaults to HSIA_FLOW),
+        # so only child_device_id actually gates this branch.
+        if child_device_id:
+ child_device = self.adapter_agent.get_device(child_device_id)
+ pon_intf = child_device.proxy_address.channel_id
+ onu_id = child_device.proxy_address.onu_id
+ flows = self.resource_mgr.get_flow_id_info(pon_intf, onu_id, flow_id)
+ assert (isinstance(flows, list))
+ self.log.debug("retrieved-flows", flows=flows)
+            for idx in reversed(range(len(flows))):  # reverse: pop(idx) safe
+ if flow_direction == flows[idx]['flow_type']:
+ flows.pop(idx)
+ self.update_flow_info_to_kv_store(pon_intf, onu_id,
+ flow_id, flows)
+ if len(flows) > 0:
+ # There are still flows referencing the same flow_id.
+ # So the flow should not be freed yet.
+ # For ex: Case of HSIA where same flow is shared
+ # between DS and US.
+ return
+
+ self.resource_mgr.free_flow_id(pon_intf, onu_id, flow_id)
+ else:
+ self.log.error("invalid-info", uni_port_no=uni_port_no,
+ flow_category=flow_category,
+ child_device_id=child_device_id)
+
def remove_flow(self, flow):
self.log.debug('trying to remove flows from logical flow :',
logical_flow=flow)
@@ -218,6 +295,10 @@
else:
raise grpc_e
+ # once we have successfully deleted the flow on the device
+ # release the flow_id on resource pool and also clear any
+ # data associated with the flow_id on KV store.
+ self._clear_flow_id_from_rm(f, id, direction)
self.log.debug('flow removed from device', flow=f,
flow_key=flow_to_remove)
@@ -233,203 +314,330 @@
flow_ids_removed=flows_ids_to_remove,
number_of_flows_removed=(len(device_flows) - len(
new_flows)), expected_flows_removed=len(
- device_flows_to_remove))
+ device_flows_to_remove))
else:
self.log.debug('no device flow to remove for this flow (normal '
'for multi table flows)', flow=flow)
+ def _get_ofp_port_name(self, intf_id, onu_id):
+ parent_port_no = self.platform.intf_id_to_port_no(intf_id, Port.PON_OLT)
+ child_device = self.adapter_agent.get_child_device(self.device_id,
+ parent_port_no=parent_port_no, onu_id=onu_id)
+ if child_device is None:
+ self.log.error("could-not-find-child-device",
+ parent_port_no=intf_id, onu_id=onu_id)
+ return None
+ # FIXME: Assumes single UNI for a ONU device which is visible at ONOS
+ ports = self.adapter_agent.get_ports(child_device.id, Port.ETHERNET_UNI)
+ logical_port = self.adapter_agent.get_logical_port(
+ self.logical_device_id, ports[0].label)
+ ofp_port_name = logical_port.ofp_port.name
+ return ofp_port_name
+
def divide_and_add_flow(self, intf_id, onu_id, classifier,
action, flow):
self.log.debug('sorting flow', intf_id=intf_id, onu_id=onu_id,
classifier=classifier, action=action)
- if IP_PROTO in classifier:
- if classifier[IP_PROTO] == 17:
- self.log.debug('dhcp flow add')
- self.add_dhcp_trap(intf_id, onu_id, classifier,
- action, flow)
- elif classifier[IP_PROTO] == 2:
- self.log.warn('igmp flow add ignored, not implemented yet')
- else:
- self.log.warn("Invalid-Classifier-to-handle",
- classifier=classifier,
- action=action)
- elif ETH_TYPE in classifier:
- if classifier[ETH_TYPE] == EAP_ETH_TYPE:
- self.log.debug('eapol flow add')
- self.add_eapol_flow(intf_id, onu_id, flow)
- vlan_id = self.get_subscriber_vlan(fd.get_in_port(flow))
- if vlan_id is not None:
- self.add_eapol_flow(
- intf_id, onu_id, flow,
- eapol_id=EAPOL_SECONDARY_FLOW_INDEX,
- vlan_id=vlan_id)
- if classifier[ETH_TYPE] == LLDP_ETH_TYPE:
- self.log.debug('lldp flow add')
- self.add_lldp_flow(flow)
+ alloc_id, gem_ports = self.create_tcont_gemport(intf_id, onu_id,
+ flow.table_id)
+ if alloc_id is None or gem_ports is None:
+ self.log.error("alloc-id-gem-ports-unavailable", alloc_id=alloc_id,
+ gem_ports=gem_ports)
+ return
- elif PUSH_VLAN in action:
- self.add_upstream_data_flow(intf_id, onu_id, classifier,
- action,
- flow)
- elif POP_VLAN in action:
- self.add_downstream_data_flow(intf_id, onu_id, classifier,
- action, flow)
- else:
- self.log.debug('Invalid-flow-type-to-handle',
- classifier=classifier,
- action=action, flow=flow)
+ self.log.debug('Generated required alloc and gemport ids',
+ alloc_id=alloc_id, gemports=gem_ports)
+
+ # Flows can't be added specific to gemport unless p-bits are received.
+ # Hence adding flows for all gemports
+ for gemport_id in gem_ports:
+ if IP_PROTO in classifier:
+ if classifier[IP_PROTO] == 17:
+ self.log.debug('dhcp flow add')
+ self.add_dhcp_trap(intf_id, onu_id, classifier,
+ action, flow, alloc_id, gemport_id)
+ elif classifier[IP_PROTO] == 2:
+ self.log.warn('igmp flow add ignored, not implemented yet')
+ else:
+ self.log.warn("Invalid-Classifier-to-handle",
+ classifier=classifier,
+ action=action)
+ elif ETH_TYPE in classifier:
+ if classifier[ETH_TYPE] == EAP_ETH_TYPE:
+ self.log.debug('eapol flow add')
+ self.add_eapol_flow(intf_id, onu_id, flow, alloc_id,
+ gemport_id)
+ vlan_id = self.get_subscriber_vlan(fd.get_in_port(flow))
+ if vlan_id is not None:
+ self.add_eapol_flow(
+ intf_id, onu_id, flow, alloc_id, gemport_id,
+ eapol_flow_category=EAPOL_SECONDARY_FLOW,
+ vlan_id=vlan_id)
+ parent_port_no = self.platform.intf_id_to_port_no(intf_id, Port.PON_OLT)
+ onu_device = self.adapter_agent.get_child_device(self.device_id,
+ onu_id=onu_id,
+ parent_port_no=parent_port_no)
+ ofp_port_name = self._get_ofp_port_name(intf_id, onu_id)
+ if ofp_port_name is None:
+ self.log.error("port-name-not-found")
+ return
+
+ # FIXME Should get Table id form the flow, as of now hardcoded to
+ # DEFAULT_TECH_PROFILE_TABLE_ID (64)
+ tp_path = KV_STORE_TECH_PROFILE_PATH_PREFIX + '/' + \
+ self.tech_profile[intf_id]. \
+ get_tp_path(DEFAULT_TECH_PROFILE_TABLE_ID,
+ ofp_port_name)
+
+ self.log.debug('Load-tech-profile-request-to-brcm-handler',
+ tp_path=tp_path)
+ msg = {'proxy_address': onu_device.proxy_address,
+ 'event': 'download_tech_profile', 'event_data': tp_path}
+
+ # Send the event message to the ONU adapter
+ self.adapter_agent.publish_inter_adapter_message(onu_device.id,
+ msg)
+
+ if classifier[ETH_TYPE] == LLDP_ETH_TYPE:
+ self.log.debug('lldp flow add')
+ self.add_lldp_flow(flow)
+
+ elif PUSH_VLAN in action:
+ self.add_upstream_data_flow(intf_id, onu_id, classifier,
+ action, flow, alloc_id, gemport_id)
+ elif POP_VLAN in action:
+ self.add_downstream_data_flow(intf_id, onu_id, classifier,
+ action, flow, alloc_id, gemport_id)
+ else:
+ self.log.debug('Invalid-flow-type-to-handle',
+ classifier=classifier,
+ action=action, flow=flow)
+
+ def create_tcont_gemport(self, intf_id, onu_id, table_id):
+ alloc_id, gem_port_ids = None, None
+ try:
+ ofp_port_name = self._get_ofp_port_name(intf_id, onu_id)
+ if ofp_port_name is None:
+ self.log.error("port-name-not-found")
+ return alloc_id, gem_port_ids
+ # FIXME: If table id is <= 63 using 64 as table id
+ if table_id < DEFAULT_TECH_PROFILE_TABLE_ID:
+ table_id = DEFAULT_TECH_PROFILE_TABLE_ID
+
+ # Check tech profile instance already exists for derived port name
+ tech_profile_instance = self.tech_profile[intf_id]. \
+ get_tech_profile_instance(table_id, ofp_port_name)
+ self.log.debug('Get-tech-profile-instance-status', tech_profile_instance=tech_profile_instance)
+
+ if tech_profile_instance is None:
+ # create tech profile instance
+ tech_profile_instance = self.tech_profile[intf_id]. \
+ create_tech_profile_instance(table_id, ofp_port_name,
+ intf_id)
+ if tech_profile_instance is not None:
+
+ # upstream scheduler
+ us_scheduler = self.tech_profile[intf_id].get_us_scheduler(
+ tech_profile_instance)
+ # downstream scheduler
+ ds_scheduler = self.tech_profile[intf_id].get_ds_scheduler(
+ tech_profile_instance)
+ # create Tcont
+ tconts = self.tech_profile[intf_id].get_tconts(tech_profile_instance,
+ us_scheduler,
+ ds_scheduler)
+
+ self.stub.CreateTconts(openolt_pb2.Tconts(intf_id=intf_id,
+ onu_id=onu_id,
+ tconts=tconts))
+ else:
+ raise Exception('Tech-profile-instance-creation-failed')
+ else:
+ self.log.debug(
+ 'Tech-profile-instance-already-exist-for-given port-name',
+ ofp_port_name=ofp_port_name)
+
+ # Fetch alloc id and gemports from tech profile instance
+ alloc_id = tech_profile_instance.us_scheduler.alloc_id
+ gem_port_ids = []
+ for i in range(len(
+ tech_profile_instance.upstream_gem_port_attribute_list)):
+ gem_port_ids.append(
+ tech_profile_instance.upstream_gem_port_attribute_list[i].
+ gemport_id)
+ except BaseException as e:
+ self.log.exception(exception=e)
+
+ # Update the allocated alloc_id and gem_port_id for the ONU to KV store
+ pon_intf_onu_id = (intf_id, onu_id)
+ self.resource_mgr.resource_mgrs[intf_id].update_alloc_ids_for_onu(
+ pon_intf_onu_id,
+ list([alloc_id])
+ )
+ self.resource_mgr.resource_mgrs[intf_id].update_gemport_ids_for_onu(
+ pon_intf_onu_id,
+ gem_port_ids
+ )
+
+ self.resource_mgr.update_gemports_ponport_to_onu_map_on_kv_store(
+ gem_port_ids, intf_id, onu_id
+ )
+
+ return alloc_id, gem_port_ids
def add_upstream_data_flow(self, intf_id, onu_id, uplink_classifier,
- uplink_action, logical_flow):
-
+ uplink_action, logical_flow, alloc_id,
+ gemport_id):
uplink_classifier[PACKET_TAG_TYPE] = SINGLE_TAG
-
self.add_hsia_flow(intf_id, onu_id, uplink_classifier,
- uplink_action, UPSTREAM, HSIA_FLOW_INDEX,
- logical_flow)
+ uplink_action, UPSTREAM,
+ logical_flow, alloc_id, gemport_id)
# Secondary EAP on the subscriber vlan
(eap_active, eap_logical_flow) = self.is_eap_enabled(intf_id, onu_id)
if eap_active:
- self.add_eapol_flow(intf_id, onu_id, eap_logical_flow,
- eapol_id=EAPOL_SECONDARY_FLOW_INDEX,
- vlan_id=uplink_classifier[VLAN_VID])
-
+ self.add_eapol_flow(intf_id, onu_id, eap_logical_flow, alloc_id,
+ gemport_id, eapol_flow_category=EAPOL_SECONDARY_FLOW,
+ vlan_id=uplink_classifier[VLAN_VID])
def add_downstream_data_flow(self, intf_id, onu_id, downlink_classifier,
- downlink_action, flow):
+ downlink_action, flow, alloc_id, gemport_id):
downlink_classifier[PACKET_TAG_TYPE] = DOUBLE_TAG
# Needed ???? It should be already there
downlink_action[POP_VLAN] = True
downlink_action[VLAN_VID] = downlink_classifier[VLAN_VID]
self.add_hsia_flow(intf_id, onu_id, downlink_classifier,
- downlink_action, DOWNSTREAM, HSIA_FLOW_INDEX,
- flow)
+ downlink_action, DOWNSTREAM,
+ flow, alloc_id, gemport_id)
- # To-Do right now only one GEM port is supported, so below method
- # will take care of handling all the p bits.
- # We need to revisit when mulitple gem port per p bits is needed.
- # Waiting for Technology profile
def add_hsia_flow(self, intf_id, onu_id, classifier, action,
- direction, hsia_id, logical_flow):
+ direction, logical_flow, alloc_id, gemport_id):
- pon_intf_onu_id = (intf_id, onu_id)
- gemport_id = self.resource_mgr.get_gemport_id(
- pon_intf_onu_id=pon_intf_onu_id
- )
- alloc_id = self.resource_mgr.get_alloc_id(
- pon_intf_onu_id=pon_intf_onu_id
- )
-
- flow_id = self.platform.mk_flow_id(intf_id, onu_id, hsia_id)
+ flow_id = self.resource_mgr.get_hsia_flow_for_onu(intf_id, onu_id, gemport_id)
+ if flow_id is None:
+ self.log.error("hsia-flow-unavailable")
+ return
flow = openolt_pb2.Flow(
- onu_id=onu_id, flow_id=flow_id, flow_type=direction,
- access_intf_id=intf_id, gemport_id=gemport_id,
- alloc_id=alloc_id,
- priority=logical_flow.priority,
- classifier=self.mk_classifier(classifier),
- action=self.mk_action(action))
+ access_intf_id=intf_id, onu_id=onu_id, flow_id=flow_id,
+ flow_type=direction, alloc_id=alloc_id, network_intf_id=0,
+ gemport_id=gemport_id,
+ classifier=self.mk_classifier(classifier),
+ action=self.mk_action(action),
+ priority=logical_flow.priority)
- self.add_flow_to_device(flow, logical_flow)
+ if self.add_flow_to_device(flow, logical_flow):
+ flow_info = self._get_flow_info_as_json_blob(flow, HSIA_FLOW)
+ self.update_flow_info_to_kv_store(flow.access_intf_id, flow.onu_id,
+ flow.flow_id, flow_info)
- def add_dhcp_trap(self, intf_id, onu_id, classifier, action, logical_flow):
+ def add_dhcp_trap(self, intf_id, onu_id, classifier, action, logical_flow,
+ alloc_id, gemport_id):
self.log.debug('add dhcp upstream trap', classifier=classifier,
intf_id=intf_id, onu_id=onu_id, action=action)
action.clear()
action[TRAP_TO_HOST] = True
+ classifier[UDP_SRC] = 68
+ classifier[UDP_DST] = 67
classifier[PACKET_TAG_TYPE] = SINGLE_TAG
classifier.pop(VLAN_VID, None)
pon_intf_onu_id = (intf_id, onu_id)
- gemport_id = self.resource_mgr.get_gemport_id(
- pon_intf_onu_id=pon_intf_onu_id
- )
- alloc_id = self.resource_mgr.get_alloc_id(
- pon_intf_onu_id=pon_intf_onu_id
- )
-
- flow_id = self.platform.mk_flow_id(intf_id, onu_id, DHCP_FLOW_INDEX)
+ flow_id = self.resource_mgr.get_flow_id(pon_intf_onu_id)
dhcp_flow = openolt_pb2.Flow(
onu_id=onu_id, flow_id=flow_id, flow_type=UPSTREAM,
access_intf_id=intf_id, gemport_id=gemport_id,
- alloc_id=alloc_id,
+ alloc_id=alloc_id, network_intf_id=0,
priority=logical_flow.priority,
classifier=self.mk_classifier(classifier),
action=self.mk_action(action))
- self.add_flow_to_device(dhcp_flow, logical_flow)
+ if self.add_flow_to_device(dhcp_flow, logical_flow):
+ flow_info = self._get_flow_info_as_json_blob(dhcp_flow, DHCP_FLOW)
+ self.update_flow_info_to_kv_store(dhcp_flow.access_intf_id,
+ dhcp_flow.onu_id,
+ dhcp_flow.flow_id,
+ flow_info)
- def add_eapol_flow(self, intf_id, onu_id, logical_flow,
- eapol_id=EAPOL_FLOW_INDEX,
+ def add_eapol_flow(self, intf_id, onu_id, logical_flow, alloc_id,
+ gemport_id, eapol_flow_category=EAPOL_PRIMARY_FLOW,
vlan_id=DEFAULT_MGMT_VLAN):
- uplink_classifier = {}
+ uplink_classifier = dict()
uplink_classifier[ETH_TYPE] = EAP_ETH_TYPE
uplink_classifier[PACKET_TAG_TYPE] = SINGLE_TAG
uplink_classifier[VLAN_VID] = vlan_id
- uplink_action = {}
+ uplink_action = dict()
uplink_action[TRAP_TO_HOST] = True
- # Add Upstream EAPOL Flow.
-
pon_intf_onu_id = (intf_id, onu_id)
- gemport_id = self.resource_mgr.get_gemport_id(
- pon_intf_onu_id=pon_intf_onu_id
- )
- alloc_id = self.resource_mgr.get_alloc_id(
- pon_intf_onu_id=pon_intf_onu_id
- )
-
- uplink_flow_id = self.platform.mk_flow_id(intf_id, onu_id, eapol_id)
+        # Add Upstream EAPOL Flow. Primary and secondary EAPOL flows
+        # draw their flow_id from the same per-(pon, onu) pool.
+        uplink_flow_id = self.resource_mgr.get_flow_id(pon_intf_onu_id)
upstream_flow = openolt_pb2.Flow(
- onu_id=onu_id, flow_id=uplink_flow_id, flow_type=UPSTREAM,
- access_intf_id=intf_id, gemport_id=gemport_id,
- alloc_id=alloc_id,
- priority=logical_flow.priority,
+ access_intf_id=intf_id, onu_id=onu_id, flow_id=uplink_flow_id,
+ flow_type=UPSTREAM, alloc_id=alloc_id, network_intf_id=0,
+ gemport_id=gemport_id,
classifier=self.mk_classifier(uplink_classifier),
- action=self.mk_action(uplink_action))
+ action=self.mk_action(uplink_action),
+ priority=logical_flow.priority)
logical_flow = copy.deepcopy(logical_flow)
logical_flow.match.oxm_fields.extend(fd.mk_oxm_fields([fd.vlan_vid(
vlan_id | 0x1000)]))
logical_flow.match.type = OFPMT_OXM
- self.add_flow_to_device(upstream_flow, logical_flow)
+        if self.add_flow_to_device(upstream_flow, logical_flow):
+            # eapol_flow_category is already EAPOL_PRIMARY_FLOW or
+            # EAPOL_SECONDARY_FLOW; no need to branch on it here.
+            flow_info = self._get_flow_info_as_json_blob(upstream_flow,
+                                                         eapol_flow_category)
+            self.update_flow_info_to_kv_store(upstream_flow.access_intf_id,
+                                              upstream_flow.onu_id,
+                                              upstream_flow.flow_id,
+                                              flow_info)
if vlan_id == DEFAULT_MGMT_VLAN:
-
# Add Downstream EAPOL Flow, Only for first EAP flow (BAL
# requirement)
special_vlan_downstream_flow = 4000 - onu_id
- downlink_classifier = {}
+ downlink_classifier = dict()
downlink_classifier[PACKET_TAG_TYPE] = SINGLE_TAG
downlink_classifier[VLAN_VID] = special_vlan_downstream_flow
- downlink_action = {}
+ downlink_action = dict()
downlink_action[PUSH_VLAN] = True
downlink_action[VLAN_VID] = vlan_id
- downlink_flow_id = self.platform.mk_flow_id(
- intf_id, onu_id, DOWNSTREAM_FLOW_FOR_PACKET_OUT)
+ pon_intf_onu_id = (intf_id, onu_id)
+ downlink_flow_id = self.resource_mgr.get_flow_id(pon_intf_onu_id)
downstream_flow = openolt_pb2.Flow(
- onu_id=onu_id, flow_id=downlink_flow_id, flow_type=DOWNSTREAM,
- access_intf_id=intf_id, gemport_id=gemport_id,
- priority=logical_flow.priority,
+ access_intf_id=intf_id, onu_id=onu_id, flow_id=downlink_flow_id,
+ flow_type=DOWNSTREAM, alloc_id=alloc_id, network_intf_id=0,
+ gemport_id=gemport_id,
classifier=self.mk_classifier(downlink_classifier),
- action=self.mk_action(downlink_action))
+ action=self.mk_action(downlink_action),
+ priority=logical_flow.priority)
downstream_logical_flow = ofp_flow_stats(
id=logical_flow.id, cookie=logical_flow.cookie,
@@ -438,14 +646,20 @@
downstream_logical_flow.match.oxm_fields.extend(fd.mk_oxm_fields([
fd.in_port(fd.get_out_port(logical_flow)),
- fd.vlan_vid((special_vlan_downstream_flow) | 0x1000)]))
+ fd.vlan_vid(special_vlan_downstream_flow | 0x1000)]))
downstream_logical_flow.match.type = OFPMT_OXM
downstream_logical_flow.instructions.extend(
fd.mk_instructions_from_actions([fd.output(
self.platform.mk_uni_port_num(intf_id, onu_id))]))
- self.add_flow_to_device(downstream_flow, downstream_logical_flow)
+ if self.add_flow_to_device(downstream_flow, downstream_logical_flow):
+ flow_info = self._get_flow_info_as_json_blob(downstream_flow,
+ EAPOL_PRIMARY_FLOW)
+ self.update_flow_info_to_kv_store(downstream_flow.access_intf_id,
+ downstream_flow.onu_id,
+ downstream_flow.flow_id,
+ flow_info)
def repush_all_different_flows(self):
# Check if the device is supposed to have flows, if so add them
@@ -463,33 +677,48 @@
def reset_flows(self):
self.flows_proxy.update('/', Flows())
-
""" Add a downstream LLDP trap flow on the NNI interface
"""
+
def add_lldp_flow(self, logical_flow, network_intf_id=0):
- classifier = {}
+ classifier = dict()
classifier[ETH_TYPE] = LLDP_ETH_TYPE
classifier[PACKET_TAG_TYPE] = UNTAGGED
- action = {}
+ action = dict()
action[TRAP_TO_HOST] = True
- flow_id = LLDP_FLOW_ID # FIXME
+ # LLDP flow is installed to trap LLDP packets on the NNI port.
+ # We manage flow_id resource pool on per PON port basis.
+ # Since this situation is tricky, as a hack, we pass the NNI port
+ # index (network_intf_id) as PON port Index for the flow_id resource
+ # pool. Also, there is no ONU Id available for trapping LLDP packets
+ # on NNI port, use onu_id as -1 (invalid)
+ # ****************** CAVEAT *******************
+ # This logic works if the NNI Port Id falls within the same valid
+ # range of PON Port Ids. If this doesn't work for some OLT Vendor
+ # we need to have a re-look at this.
+ # *********************************************
+ onu_id = -1
+ intf_id_onu_id = (network_intf_id, onu_id)
+ flow_id = self.resource_mgr.get_flow_id(intf_id_onu_id)
downstream_flow = openolt_pb2.Flow(
- onu_id=-1, # onu_id not required
- gemport_id=-1, # gemport_id not required
access_intf_id=-1, # access_intf_id not required
+ onu_id=onu_id, # onu_id not required
flow_id=flow_id,
flow_type=DOWNSTREAM,
- priority=logical_flow.priority,
network_intf_id=network_intf_id,
+ gemport_id=-1, # gemport_id not required
classifier=self.mk_classifier(classifier),
- action=self.mk_action(action))
+ action=self.mk_action(action),
+ priority=logical_flow.priority)
self.log.debug('add lldp downstream trap', classifier=classifier,
action=action, flow=downstream_flow)
- self.add_flow_to_device(downstream_flow, logical_flow)
+        if self.add_flow_to_device(downstream_flow, logical_flow):
+            # Store the same json-blob-list format used for other flow
+            # types (see _get_flow_info_as_json_blob) instead of the raw
+            # protobuf, so flow-removal lookups of 'flow_type' work.
+            # _get_flow_info_as_json_blob is not reused here because it
+            # keys on access_intf_id, which is -1 for the LLDP trap flow.
+            json_blob = MessageToDict(message=downstream_flow,
+                                      preserving_proto_field_name=True)
+            json_blob['flow_category'] = LLDP_FLOW
+            self.update_flow_info_to_kv_store(network_intf_id, onu_id,
+                                              flow_id, [json_blob])
def mk_classifier(self, classifier_info):
@@ -562,9 +791,9 @@
intf_id=intf_id, eap_intf_id=eap_intf_id,
eap_onu_id=eap_onu_id)
if eap_flow and intf_id == eap_intf_id and onu_id == eap_onu_id:
- return (True, flow)
+ return True, flow
- return (False, None)
+ return False, None
def get_subscriber_vlan(self, port):
self.log.debug('looking from subscriber flow for port', port=port)
@@ -574,7 +803,7 @@
in_port = fd.get_in_port(flow)
out_port = fd.get_out_port(flow)
if in_port == port and \
- self.platform.intf_id_to_port_type_name(out_port) \
+ self.platform.intf_id_to_port_type_name(out_port) \
== Port.ETHERNET_NNI:
fields = fd.get_ofb_fields(flow)
self.log.debug('subscriber flow found', fields=fields)
@@ -597,8 +826,15 @@
self.log.error('failed to add flow',
logical_flow=logical_flow, flow=flow,
grpc_error=grpc_e)
+ return False
else:
self.register_flow(logical_flow, flow)
+ return True
+
+ def update_flow_info_to_kv_store(self, intf_id, onu_id, flow_id, flow):
+ pon_intf_onu_id = (intf_id, onu_id)
+ self.resource_mgr.update_flow_id_info_for_onu(pon_intf_onu_id,
+ flow_id, flow)
def register_flow(self, logical_flow, device_flow):
self.log.debug('registering flow in device',
@@ -634,7 +870,6 @@
# FIXME
if fd.get_in_port(f) == fd.get_in_port(flow) and \
fd.get_out_port(f) == metadata:
-
next_flows.append(f)
if len(next_flows) == 0:
@@ -656,6 +891,51 @@
self.root_proxy.update('/devices/{}/flow_groups'.format(
device_id), FlowGroups(items=groups.values()))
+ def clear_flows_and_scheduler_for_logical_port(self, child_device, logical_port):
+ ofp_port_name = logical_port.ofp_port.name
+ pon_port = child_device.proxy_address.channel_id
+ onu_id = child_device.proxy_address.onu_id
+ # TODO: The DEFAULT_TECH_PROFILE_ID is assumed. Right way to do,
+ # is probably to maintain a list of Tech-profile table IDs associated
+ # with the UNI logical_port. This way, when the logical port is deleted,
+ # all the associated tech-profile configuration with the UNI logical_port
+ # can be cleared.
+ tech_profile_instance = self.tech_profile[pon_port]. \
+ get_tech_profile_instance(
+ DEFAULT_TECH_PROFILE_TABLE_ID,
+ ofp_port_name)
+ flow_ids = self.resource_mgr.get_current_flow_ids_for_onu(pon_port,
+ onu_id)
+ self.log.debug("outstanding-flows-to-be-cleared", flow_ids=flow_ids)
+ for flow_id in flow_ids:
+ flow_infos = self.resource_mgr.get_flow_id_info(pon_port,
+ onu_id,
+ flow_id)
+ for flow_info in flow_infos:
+ direction = flow_info['flow_type']
+ flow_to_remove = openolt_pb2.Flow(flow_id=flow_id,
+ flow_type=direction)
+ try:
+ self.stub.FlowRemove(flow_to_remove)
+ except grpc.RpcError as grpc_e:
+ if grpc_e.code() == grpc.StatusCode.NOT_FOUND:
+ self.log.debug('This flow does not exist on the switch, '
+ 'normal after an OLT reboot',
+ flow=flow_to_remove)
+ else:
+ raise grpc_e
+
+ self.resource_mgr.free_flow_id(pon_port, onu_id, flow_id)
+
+ try:
+ tconts = self.tech_profile[pon_port].get_tconts(tech_profile_instance)
+ self.stub.RemoveTconts(openolt_pb2.Tconts(intf_id=pon_port,
+ onu_id=onu_id,
+ tconts=tconts))
+ except grpc.RpcError as grpc_e:
+ self.log.error('error-removing-tcont-scheduler-queues',
+ err=grpc_e)
+
def generate_stored_id(self, flow_id, direction):
if direction == UPSTREAM:
self.log.debug('upstream flow, shifting id')
@@ -669,6 +949,33 @@
def decode_stored_id(self, id):
if id >> 15 == 0x1:
- return (id & 0x7fff, UPSTREAM)
+ return id & 0x7fff, UPSTREAM
else:
- return (id, DOWNSTREAM)
+ return id, DOWNSTREAM
+
+ def _populate_tech_profile_per_pon_port(self):
+ for arange in self.resource_mgr.device_info.ranges:
+ for intf_id in arange.intf_ids:
+ self.tech_profile[intf_id] = \
+ self.resource_mgr.resource_mgrs[intf_id].tech_profile
+
+ # Make sure we have as many tech_profiles as there are pon ports on
+ # the device
+ assert len(self.tech_profile) == self.resource_mgr.device_info.pon_ports
+
+ def _get_flow_info_as_json_blob(self, flow, flow_category):
+ json_blob = MessageToDict(message=flow,
+ preserving_proto_field_name=True)
+ self.log.debug("flow-info", json_blob=json_blob)
+ json_blob['flow_category'] = flow_category
+ flow_info = self.resource_mgr.get_flow_id_info(flow.access_intf_id,
+ flow.onu_id, flow.flow_id)
+
+        if flow_info is None:
+            flow_info = list()
+        else:
+            assert (isinstance(flow_info, list))
+        flow_info.append(json_blob)
+
+ return flow_info