pep8 cleanup of openolt_flow_mgr
Change-Id: Ibe6652d453cab7b740b2daae2e55425f6f5750eb
diff --git a/voltha/adapters/openolt/openolt_flow_mgr.py b/voltha/adapters/openolt/openolt_flow_mgr.py
index 410c86a..b15d70c 100644
--- a/voltha/adapters/openolt/openolt_flow_mgr.py
+++ b/voltha/adapters/openolt/openolt_flow_mgr.py
@@ -21,7 +21,7 @@
from simplejson import dumps
from voltha.protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC, \
- ofp_flow_stats, OFPMT_OXM, Flows, FlowGroups, OFPXMT_OFB_IN_PORT, \
+ ofp_flow_stats, OFPMT_OXM, Flows, FlowGroups, \
OFPXMT_OFB_VLAN_VID
from voltha.protos.device_pb2 import Port
import voltha.core.flow_decomposer as fd
@@ -157,7 +157,8 @@
action_info[PUSH_VLAN] = True
action_info[TPID] = action.push.ethertype
self.log.debug('action-type-push-vlan',
- push_tpid=action_info[TPID], in_port=classifier_info[IN_PORT])
+ push_tpid=action_info[TPID],
+ in_port=classifier_info[IN_PORT])
if action.push.ethertype != 0x8100:
self.log.error('unhandled-tpid',
ethertype=action.push.ethertype)
@@ -177,9 +178,11 @@
field_type=_field.type)
else:
self.log.error('unsupported-action-type',
- action_type=action.type, in_port=classifier_info[IN_PORT])
+ action_type=action.type,
+ in_port=classifier_info[IN_PORT])
- if fd.get_goto_table_id(flow) is not None and POP_VLAN not in action_info:
+ if fd.get_goto_table_id(flow) is not None \
+ and POP_VLAN not in action_info:
self.log.debug('being taken care of by ONU', flow=flow)
return
@@ -193,17 +196,20 @@
if field.type == fd.VLAN_VID:
classifier_info[METADATA] = field.vlan_vid & 0xfff
- self.log.debug('flow-ports', classifier_inport=classifier_info[IN_PORT], action_output=action_info[OUTPUT])
- (port_no, intf_id, onu_id, uni_id) = self.platform.extract_access_from_flow(
+ self.log.debug('flow-ports',
+ classifier_inport=classifier_info[IN_PORT],
+ action_output=action_info[OUTPUT])
+ (port_no, intf_id, onu_id, uni_id) \
+ = self.platform.extract_access_from_flow(
classifier_info[IN_PORT], action_info[OUTPUT])
- self.divide_and_add_flow(intf_id, onu_id, uni_id, port_no, classifier_info,
- action_info, flow)
+ self.divide_and_add_flow(intf_id, onu_id, uni_id, port_no,
+ classifier_info, action_info, flow)
def _is_uni_port(self, port_no):
try:
- port = self.adapter_agent.get_logical_port(self.logical_device_id,
- 'uni-{}'.format(port_no))
+ port = self.adapter_agent.get_logical_port(
+ self.logical_device_id, 'uni-{}'.format(port_no))
if port is not None:
return (not port.root_port), port.device_id
else:
@@ -241,14 +247,19 @@
child_device = self.adapter_agent.get_device(child_device_id)
pon_intf = child_device.proxy_address.channel_id
onu_id = child_device.proxy_address.onu_id
- uni_id = self.platform.uni_id_from_port_num(uni_port_no) if uni_port_no is not None else None
- flows = self.resource_mgr.get_flow_id_info(pon_intf, onu_id, uni_id, flow_id)
+ uni_id = self.platform.uni_id_from_port_num(uni_port_no) \
+ if uni_port_no is not None else None
+ uni_id = self.platform.uni_id_from_port_num(uni_port_no) \
+ if uni_port_no is not None else None
+ flows = self.resource_mgr.get_flow_id_info(pon_intf, onu_id,
+ uni_id, flow_id)
assert (isinstance(flows, list))
self.log.debug("retrieved-flows", flows=flows)
for idx in range(len(flows)):
if flow_direction == flows[idx]['flow_type']:
flows.pop(idx)
- self.update_flow_info_to_kv_store(pon_intf, onu_id, uni_id, flow_id, flows)
+ self.update_flow_info_to_kv_store(pon_intf, onu_id,
+ uni_id, flow_id, flows)
if len(flows) > 0:
# There are still flows referencing the same flow_id.
# So the flow should not be freed yet.
@@ -308,23 +319,26 @@
flow_ids_removed=flows_ids_to_remove,
number_of_flows_removed=(len(device_flows) - len(
new_flows)), expected_flows_removed=len(
- device_flows_to_remove))
+ device_flows_to_remove))
else:
self.log.debug('no device flow to remove for this flow (normal '
'for multi table flows)', flow=flow)
def _get_ofp_port_name(self, intf_id, onu_id, uni_id):
- parent_port_no = self.platform.intf_id_to_port_no(intf_id, Port.PON_OLT)
- child_device = self.adapter_agent.get_child_device(self.device_id,
- parent_port_no=parent_port_no, onu_id=onu_id)
+ parent_port_no = self.platform.intf_id_to_port_no(intf_id,
+ Port.PON_OLT)
+ child_device = self.adapter_agent.get_child_device(
+ self.device_id, parent_port_no=parent_port_no, onu_id=onu_id)
if child_device is None:
self.log.error("could-not-find-child-device",
parent_port_no=intf_id, onu_id=onu_id)
return (None, None)
- ports = self.adapter_agent.get_ports(child_device.id, Port.ETHERNET_UNI)
+ ports = self.adapter_agent.get_ports(child_device.id,
+ Port.ETHERNET_UNI)
logical_port = self.adapter_agent.get_logical_port(
self.logical_device_id, ports[uni_id].label)
- ofp_port_name = (logical_port.ofp_port.name, logical_port.ofp_port.port_no)
+ ofp_port_name = (logical_port.ofp_port.name,
+ logical_port.ofp_port.port_no)
return ofp_port_name
def get_tp_path(self, intf_id, ofp_port_name, techprofile_id):
@@ -332,23 +346,28 @@
get_tp_path(techprofile_id,
ofp_port_name)
- def delete_tech_profile_instance(self, intf_id, onu_id, uni_id, ofp_port_name):
+ def delete_tech_profile_instance(self, intf_id, onu_id, uni_id,
+ ofp_port_name):
# Remove the TP instance associated with the ONU
if ofp_port_name is None:
- ofp_port_name, ofp_port_no = self._get_ofp_port_name(intf_id, onu_id, uni_id)
- tp_id = self.resource_mgr.get_tech_profile_id_for_onu(intf_id, onu_id, uni_id)
+ ofp_port_name, ofp_port_no = self._get_ofp_port_name(intf_id,
+ onu_id,
+ uni_id)
+ tp_id = self.resource_mgr.get_tech_profile_id_for_onu(intf_id, onu_id,
+ uni_id)
tp_path = self.get_tp_path(intf_id, ofp_port_name, tp_id)
- self.log.debug(" tp-path-in-delete",tp_path=tp_path)
+ self.log.debug(" tp-path-in-delete", tp_path=tp_path)
return self.tech_profile[intf_id].delete_tech_profile_instance(tp_path)
def divide_and_add_flow(self, intf_id, onu_id, uni_id, port_no, classifier,
action, flow):
- self.log.debug('sorting flow', intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, port_no=port_no,
- classifier=classifier, action=action)
+ self.log.debug('sorting flow', intf_id=intf_id, onu_id=onu_id,
+ uni_id=uni_id, port_no=port_no, classifier=classifier,
+ action=action)
- alloc_id, gem_ports = self.create_tcont_gemport(intf_id, onu_id, uni_id,
- flow.table_id)
+ alloc_id, gem_ports = self.create_tcont_gemport(intf_id, onu_id,
+ uni_id, flow.table_id)
if alloc_id is None or gem_ports is None:
self.log.error("alloc-id-gem-ports-unavailable", alloc_id=alloc_id,
gem_ports=gem_ports)
@@ -363,8 +382,9 @@
if IP_PROTO in classifier:
if classifier[IP_PROTO] == 17:
self.log.debug('dhcp flow add')
- self.add_dhcp_trap(intf_id, onu_id, uni_id, port_no, classifier,
- action, flow, alloc_id, gemport_id)
+ self.add_dhcp_trap(intf_id, onu_id, uni_id, port_no,
+ classifier, action, flow, alloc_id,
+ gemport_id)
elif classifier[IP_PROTO] == 2:
self.log.warn('igmp flow add ignored, not implemented yet')
else:
@@ -374,32 +394,36 @@
elif ETH_TYPE in classifier:
if classifier[ETH_TYPE] == EAP_ETH_TYPE:
self.log.debug('eapol flow add')
- self.add_eapol_flow(intf_id, onu_id, uni_id, port_no, flow, alloc_id,
- gemport_id)
+ self.add_eapol_flow(intf_id, onu_id, uni_id, port_no,
+ flow, alloc_id, gemport_id)
vlan_id = self.get_subscriber_vlan(fd.get_in_port(flow))
if vlan_id is not None:
- self.add_eapol_flow(
- intf_id, onu_id, uni_id, port_no, flow, alloc_id, gemport_id,
- vlan_id=vlan_id)
- parent_port_no = self.platform.intf_id_to_port_no(intf_id, Port.PON_OLT)
- onu_device = self.adapter_agent.get_child_device(self.device_id,
- onu_id=onu_id,
- parent_port_no=parent_port_no)
- (ofp_port_name, ofp_port_no) = self._get_ofp_port_name(intf_id, onu_id, uni_id)
+ self.add_eapol_flow(intf_id, onu_id, uni_id, port_no,
+ flow, alloc_id, gemport_id,
+ vlan_id=vlan_id)
+ parent_port_no = self.platform.intf_id_to_port_no(
+ intf_id, Port.PON_OLT)
+ onu_device = self.adapter_agent.get_child_device(
+ self.device_id, onu_id=onu_id,
+ parent_port_no=parent_port_no)
+ (ofp_port_name, ofp_port_no) = self._get_ofp_port_name(
+ intf_id, onu_id, uni_id)
if ofp_port_name is None:
self.log.error("port-name-not-found")
return
- tp_id = self.resource_mgr.get_tech_profile_id_for_onu(intf_id, onu_id, uni_id)
+ tp_id = self.resource_mgr.get_tech_profile_id_for_onu(
+ intf_id, onu_id, uni_id)
tp_path = self.get_tp_path(intf_id, ofp_port_name, tp_id)
self.log.debug('Load-tech-profile-request-to-brcm-handler',
tp_path=tp_path)
- msg = {'proxy_address': onu_device.proxy_address, 'uni_id': uni_id,
- 'event': 'download_tech_profile', 'event_data': tp_path}
+ msg = {'proxy_address': onu_device.proxy_address,
+ 'uni_id': uni_id, 'event': 'download_tech_profile',
+ 'event_data': tp_path}
# Send the event message to the ONU adapter
- self.adapter_agent.publish_inter_adapter_message(onu_device.id,
- msg)
+ self.adapter_agent.publish_inter_adapter_message(
+ onu_device.id, msg)
if classifier[ETH_TYPE] == LLDP_ETH_TYPE:
self.log.debug('lldp flow add')
@@ -407,11 +431,13 @@
self.add_lldp_flow(flow, port_no, nni_intf_id)
elif PUSH_VLAN in action:
- self.add_upstream_data_flow(intf_id, onu_id, uni_id, port_no, classifier,
- action, flow, alloc_id, gemport_id)
+ self.add_upstream_data_flow(intf_id, onu_id, uni_id, port_no,
+ classifier, action, flow, alloc_id,
+ gemport_id)
elif POP_VLAN in action:
- self.add_downstream_data_flow(intf_id, onu_id, uni_id, port_no, classifier,
- action, flow, alloc_id, gemport_id)
+ self.add_downstream_data_flow(intf_id, onu_id, uni_id, port_no,
+ classifier, action, flow,
+ alloc_id, gemport_id)
else:
self.log.debug('Invalid-flow-type-to-handle',
classifier=classifier,
@@ -421,7 +447,8 @@
alloc_id, gem_port_ids = None, None
pon_intf_onu_id = (intf_id, onu_id)
- # If we already have allocated alloc_id and gem_ports earlier, render them
+ # If we already have allocated alloc_id and gem_ports earlier,
+ # render them
alloc_id = \
self.resource_mgr.get_current_alloc_ids_for_onu(pon_intf_onu_id)
gem_port_ids = \
@@ -430,7 +457,9 @@
return alloc_id, gem_port_ids
try:
- (ofp_port_name, ofp_port_no) = self._get_ofp_port_name(intf_id, onu_id, uni_id)
+ (ofp_port_name, ofp_port_no) = self._get_ofp_port_name(intf_id,
+ onu_id,
+ uni_id)
if ofp_port_name is None:
self.log.error("port-name-not-found")
return alloc_id, gem_port_ids
@@ -441,7 +470,8 @@
# Check tech profile instance already exists for derived port name
tech_profile_instance = self.tech_profile[intf_id]. \
get_tech_profile_instance(table_id, ofp_port_name)
- self.log.debug('Get-tech-profile-instance-status', tech_profile_instance=tech_profile_instance)
+ self.log.debug('Get-tech-profile-instance-status',
+ tech_profile_instance=tech_profile_instance)
if tech_profile_instance is None:
# create tech profile instance
@@ -462,9 +492,8 @@
ds_scheduler = self.tech_profile[intf_id].get_ds_scheduler(
tech_profile_instance)
# create Tcont
- tconts = self.tech_profile[intf_id].get_tconts(tech_profile_instance,
- us_scheduler,
- ds_scheduler)
+ tconts = self.tech_profile[intf_id].get_tconts(
+ tech_profile_instance, us_scheduler, ds_scheduler)
self.stub.CreateTconts(openolt_pb2.Tconts(intf_id=intf_id,
onu_id=onu_id,
@@ -483,7 +512,8 @@
except BaseException as e:
self.log.exception(exception=e)
- # Update the allocated alloc_id and gem_port_id for the ONU/UNI to KV store
+ # Update the allocated alloc_id and gem_port_id for the ONU/UNI to KV
+ # store
pon_intf_onu_id = (intf_id, onu_id, uni_id)
self.resource_mgr.resource_mgrs[intf_id].update_alloc_ids_for_onu(
pon_intf_onu_id,
@@ -500,9 +530,9 @@
return alloc_id, gem_port_ids
- def add_upstream_data_flow(self, intf_id, onu_id, uni_id, port_no, uplink_classifier,
- uplink_action, logical_flow, alloc_id,
- gemport_id):
+ def add_upstream_data_flow(self, intf_id, onu_id, uni_id, port_no,
+ uplink_classifier, uplink_action, logical_flow,
+ alloc_id, gemport_id):
uplink_classifier[PACKET_TAG_TYPE] = SINGLE_TAG
@@ -511,29 +541,33 @@
logical_flow, alloc_id, gemport_id)
# Secondary EAP on the subscriber vlan
- (eap_active, eap_logical_flow) = self.is_eap_enabled(intf_id, onu_id, uni_id)
+ (eap_active, eap_logical_flow) = self.is_eap_enabled(intf_id, onu_id,
+ uni_id)
if eap_active:
- self.add_eapol_flow(intf_id, onu_id, uni_id, port_no, eap_logical_flow, alloc_id,
- gemport_id, vlan_id=uplink_classifier[VLAN_VID])
+ self.add_eapol_flow(intf_id, onu_id, uni_id, port_no,
+ eap_logical_flow, alloc_id, gemport_id,
+ vlan_id=uplink_classifier[VLAN_VID])
- def add_downstream_data_flow(self, intf_id, onu_id, uni_id, port_no, downlink_classifier,
- downlink_action, flow, alloc_id, gemport_id):
+ def add_downstream_data_flow(self, intf_id, onu_id, uni_id, port_no,
+ downlink_classifier, downlink_action, flow,
+ alloc_id, gemport_id):
downlink_classifier[PACKET_TAG_TYPE] = DOUBLE_TAG
# Needed ???? It should be already there
downlink_action[POP_VLAN] = True
downlink_action[VLAN_VID] = downlink_classifier[VLAN_VID]
- self.add_hsia_flow(intf_id, onu_id, uni_id, port_no, downlink_classifier,
- downlink_action, DOWNSTREAM,
+ self.add_hsia_flow(intf_id, onu_id, uni_id, port_no,
+ downlink_classifier, downlink_action, DOWNSTREAM,
flow, alloc_id, gemport_id)
- def add_hsia_flow(self, intf_id, onu_id, uni_id, port_no, classifier, action,
- direction, logical_flow, alloc_id, gemport_id):
+ def add_hsia_flow(self, intf_id, onu_id, uni_id, port_no, classifier,
+ action, direction, logical_flow, alloc_id, gemport_id):
flow_store_cookie = self._get_flow_store_cookie(classifier,
gemport_id)
- if self.resource_mgr.is_flow_cookie_on_kv_store(intf_id, onu_id, uni_id,
+ if self.resource_mgr.is_flow_cookie_on_kv_store(intf_id, onu_id,
+ uni_id,
flow_store_cookie):
self.log.debug('flow-exists--not-re-adding')
else:
@@ -552,14 +586,12 @@
return
flow = openolt_pb2.Flow(
- access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=flow_id,
- flow_type=direction, alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
- gemport_id=gemport_id,
+ access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id,
+ flow_id=flow_id, flow_type=direction, alloc_id=alloc_id,
+ network_intf_id=self.get_nni_intf_id(), gemport_id=gemport_id,
classifier=self.mk_classifier(classifier),
- action=self.mk_action(action),
- priority=logical_flow.priority,
- port_no=port_no,
- cookie=logical_flow.cookie)
+ action=self.mk_action(action), priority=logical_flow.priority,
+ port_no=port_no, cookie=logical_flow.cookie)
if self.add_flow_to_device(flow, logical_flow):
flow_info = self._get_flow_info_as_json_blob(flow,
@@ -569,11 +601,12 @@
flow.onu_id, flow.uni_id,
flow.flow_id, flow_info)
- def add_dhcp_trap(self, intf_id, onu_id, uni_id, port_no, classifier, action, logical_flow,
- alloc_id, gemport_id):
+ def add_dhcp_trap(self, intf_id, onu_id, uni_id, port_no, classifier,
+ action, logical_flow, alloc_id, gemport_id):
self.log.debug('add dhcp upstream trap', classifier=classifier,
- intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, action=action)
+ intf_id=intf_id, onu_id=onu_id, uni_id=uni_id,
+ action=action)
action.clear()
action[TRAP_TO_HOST] = True
@@ -584,7 +617,8 @@
flow_store_cookie = self._get_flow_store_cookie(classifier,
gemport_id)
- if self.resource_mgr.is_flow_cookie_on_kv_store(intf_id, onu_id, uni_id,
+ if self.resource_mgr.is_flow_cookie_on_kv_store(intf_id, onu_id,
+ uni_id,
flow_store_cookie):
self.log.debug('flow-exists--not-re-adding')
else:
@@ -593,9 +627,10 @@
)
dhcp_flow = openolt_pb2.Flow(
- onu_id=onu_id, uni_id=uni_id, flow_id=flow_id, flow_type=UPSTREAM,
- access_intf_id=intf_id, gemport_id=gemport_id,
- alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
+ onu_id=onu_id, uni_id=uni_id, flow_id=flow_id,
+ flow_type=UPSTREAM, access_intf_id=intf_id,
+ gemport_id=gemport_id, alloc_id=alloc_id,
+ network_intf_id=self.get_nni_intf_id(),
priority=logical_flow.priority,
classifier=self.mk_classifier(classifier),
action=self.mk_action(action),
@@ -603,15 +638,16 @@
cookie=logical_flow.cookie)
if self.add_flow_to_device(dhcp_flow, logical_flow):
- flow_info = self._get_flow_info_as_json_blob(dhcp_flow, flow_store_cookie)
+ flow_info = self._get_flow_info_as_json_blob(dhcp_flow,
+ flow_store_cookie)
self.update_flow_info_to_kv_store(dhcp_flow.access_intf_id,
dhcp_flow.onu_id,
dhcp_flow.uni_id,
dhcp_flow.flow_id,
flow_info)
- def add_eapol_flow(self, intf_id, onu_id, uni_id, port_no, logical_flow, alloc_id,
- gemport_id, vlan_id=DEFAULT_MGMT_VLAN):
+ def add_eapol_flow(self, intf_id, onu_id, uni_id, port_no, logical_flow,
+ alloc_id, gemport_id, vlan_id=DEFAULT_MGMT_VLAN):
uplink_classifier = dict()
uplink_classifier[ETH_TYPE] = EAP_ETH_TYPE
@@ -624,7 +660,8 @@
flow_store_cookie = self._get_flow_store_cookie(uplink_classifier,
gemport_id)
- if self.resource_mgr.is_flow_cookie_on_kv_store(intf_id, onu_id, uni_id,
+ if self.resource_mgr.is_flow_cookie_on_kv_store(intf_id, onu_id,
+ uni_id,
flow_store_cookie):
self.log.debug('flow-exists--not-re-adding')
else:
@@ -634,9 +671,9 @@
)
upstream_flow = openolt_pb2.Flow(
- access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=uplink_flow_id,
- flow_type=UPSTREAM, alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
- gemport_id=gemport_id,
+ access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id,
+ flow_id=uplink_flow_id, flow_type=UPSTREAM, alloc_id=alloc_id,
+ network_intf_id=self.get_nni_intf_id(), gemport_id=gemport_id,
classifier=self.mk_classifier(uplink_classifier),
action=self.mk_action(uplink_action),
priority=logical_flow.priority,
@@ -668,7 +705,7 @@
# uni_id defaults to 0, so add 1 to it.
special_vlan_downstream_flow = 4090 - intf_id * onu_id * (uni_id+1)
# Assert that we do not generate invalid vlans under no condition
- assert (special_vlan_downstream_flow >= 2, 'invalid-vlan-generated')
+ assert special_vlan_downstream_flow >= 2
downlink_classifier = dict()
downlink_classifier[PACKET_TAG_TYPE] = SINGLE_TAG
@@ -678,11 +715,10 @@
downlink_action[PUSH_VLAN] = True
downlink_action[VLAN_VID] = vlan_id
-
- flow_store_cookie = self._get_flow_store_cookie(downlink_classifier,
- gemport_id)
- if self.resource_mgr.is_flow_cookie_on_kv_store(intf_id, onu_id, uni_id,
- flow_store_cookie):
+ flow_store_cookie = self._get_flow_store_cookie(
+ downlink_classifier, gemport_id)
+ if self.resource_mgr.is_flow_cookie_on_kv_store(
+ intf_id, onu_id, uni_id, flow_store_cookie):
self.log.debug('flow-exists--not-re-adding')
else:
@@ -691,8 +727,9 @@
)
downstream_flow = openolt_pb2.Flow(
- access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id, flow_id=downlink_flow_id,
- flow_type=DOWNSTREAM, alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
+ access_intf_id=intf_id, onu_id=onu_id, uni_id=uni_id,
+ flow_id=downlink_flow_id, flow_type=DOWNSTREAM,
+ alloc_id=alloc_id, network_intf_id=self.get_nni_intf_id(),
gemport_id=gemport_id,
classifier=self.mk_classifier(downlink_classifier),
action=self.mk_action(downlink_action),
@@ -702,26 +739,28 @@
downstream_logical_flow = ofp_flow_stats(
id=logical_flow.id, cookie=logical_flow.cookie,
- table_id=logical_flow.table_id, priority=logical_flow.priority,
- flags=logical_flow.flags)
+ table_id=logical_flow.table_id,
+ priority=logical_flow.priority, flags=logical_flow.flags)
- downstream_logical_flow.match.oxm_fields.extend(fd.mk_oxm_fields([
- fd.in_port(fd.get_out_port(logical_flow)),
- fd.vlan_vid(special_vlan_downstream_flow | 0x1000)]))
+ downstream_logical_flow.match.oxm_fields.extend(
+ fd.mk_oxm_fields(
+ [fd.in_port(fd.get_out_port(logical_flow)),
+ fd.vlan_vid(special_vlan_downstream_flow | 0x1000)]))
downstream_logical_flow.match.type = OFPMT_OXM
downstream_logical_flow.instructions.extend(
fd.mk_instructions_from_actions([fd.output(
- self.platform.mk_uni_port_num(intf_id, onu_id, uni_id))]))
+ self.platform.mk_uni_port_num(intf_id, onu_id,
+ uni_id))]))
- if self.add_flow_to_device(downstream_flow, downstream_logical_flow):
- flow_info = self._get_flow_info_as_json_blob(downstream_flow,
- flow_store_cookie)
- self.update_flow_info_to_kv_store(downstream_flow.access_intf_id,
- downstream_flow.onu_id,
- downstream_flow.uni_id,
- downstream_flow.flow_id,
- flow_info)
+ if self.add_flow_to_device(downstream_flow,
+ downstream_logical_flow):
+ flow_info = self._get_flow_info_as_json_blob(
+ downstream_flow, flow_store_cookie)
+ self.update_flow_info_to_kv_store(
+ downstream_flow.access_intf_id, downstream_flow.onu_id,
+ downstream_flow.uni_id, downstream_flow.flow_id,
+ flow_info)
def repush_all_different_flows(self):
# Check if the device is supposed to have flows, if so add them
@@ -765,17 +804,17 @@
uni_id = -1
flow_store_cookie = self._get_flow_store_cookie(classifier)
- if self.resource_mgr.is_flow_cookie_on_kv_store(network_intf_id, onu_id, uni_id,
- flow_store_cookie):
+ if self.resource_mgr.is_flow_cookie_on_kv_store(
+ network_intf_id, onu_id, uni_id, flow_store_cookie):
self.log.debug('flow-exists--not-re-adding')
else:
- flow_id = self.resource_mgr.get_flow_id(network_intf_id, onu_id, uni_id,
- flow_store_cookie)
+ flow_id = self.resource_mgr.get_flow_id(
+ network_intf_id, onu_id, uni_id, flow_store_cookie)
downstream_flow = openolt_pb2.Flow(
access_intf_id=-1, # access_intf_id not required
- onu_id=onu_id, # onu_id not required
- uni_id=uni_id, # uni_id not used
+ onu_id=onu_id, # onu_id not required
+ uni_id=uni_id, # uni_id not used
flow_id=flow_id,
flow_type=DOWNSTREAM,
network_intf_id=network_intf_id,
@@ -787,12 +826,13 @@
cookie=logical_flow.cookie)
self.log.debug('add lldp downstream trap', classifier=classifier,
- action=action, flow=downstream_flow, port_no=port_no)
+ action=action, flow=downstream_flow,
+ port_no=port_no)
if self.add_flow_to_device(downstream_flow, logical_flow):
flow_info = self._get_flow_info_as_json_blob(downstream_flow,
flow_store_cookie)
- self.update_flow_info_to_kv_store(network_intf_id, onu_id, uni_id,
- flow_id, flow_info)
+ self.update_flow_info_to_kv_store(
+ network_intf_id, onu_id, uni_id, flow_id, flow_info)
def mk_classifier(self, classifier_info):
@@ -804,10 +844,10 @@
classifier.ip_proto = classifier_info[IP_PROTO]
if VLAN_VID in classifier_info and \
classifier_info[VLAN_VID] != RESERVED_VLAN:
- classifier.o_vid = classifier_info[VLAN_VID]
+ classifier.o_vid = classifier_info[VLAN_VID]
if METADATA in classifier_info and \
classifier_info[METADATA] != RESERVED_VLAN:
- classifier.i_vid = classifier_info[METADATA]
+ classifier.i_vid = classifier_info[METADATA]
if VLAN_PCP in classifier_info:
classifier.o_pbits = classifier_info[VLAN_PCP]
if UDP_SRC in classifier_info:
@@ -865,11 +905,12 @@
eap_uni_id = self.platform.uni_id_from_port_num(field.port)
if eap_flow:
- self.log.debug('eap flow detected', onu_id=onu_id, uni_id=uni_id,
- intf_id=intf_id, eap_intf_id=eap_intf_id,
- eap_onu_id=eap_onu_id,
+ self.log.debug('eap flow detected', onu_id=onu_id,
+ uni_id=uni_id, intf_id=intf_id,
+ eap_intf_id=eap_intf_id, eap_onu_id=eap_onu_id,
eap_uni_id=eap_uni_id)
- if eap_flow and intf_id == eap_intf_id and onu_id == eap_onu_id and uni_id == eap_uni_id:
+ if eap_flow and intf_id == eap_intf_id \
+ and onu_id == eap_onu_id and uni_id == eap_uni_id:
return True, flow
return False, None
@@ -910,7 +951,8 @@
self.register_flow(logical_flow, flow)
return True
- def update_flow_info_to_kv_store(self, intf_id, onu_id, uni_id, flow_id, flow):
+ def update_flow_info_to_kv_store(self, intf_id, onu_id, uni_id, flow_id,
+ flow):
self.resource_mgr.update_flow_id_info(intf_id, onu_id, uni_id,
flow_id, flow)
@@ -931,10 +973,11 @@
def find_next_flow(self, flow):
table_id = fd.get_goto_table_id(flow)
metadata = 0
- # Prior to ONOS 1.13.5, Metadata contained the UNI output port number. In
- # 1.13.5 and later, the lower 32-bits is the output port number and the
- # upper 32-bits is the inner-vid we are looking for. Use just the lower 32
- # bits. Allows this code to work with pre- and post-1.13.5 ONOS OltPipeline
+ # Prior to ONOS 1.13.5, Metadata contained the UNI output port number.
+ # In 1.13.5 and later, the lower 32-bits is the output port number and
+ # the # upper 32-bits is the inner-vid we are looking for. Use just the
+ # lower 32 # bits. Allows this code to work with pre- and post-1.13.5
+ # ONOS OltPipeline
for field in fd.get_ofb_fields(flow):
if field.type == fd.METADATA:
@@ -954,7 +997,8 @@
self.log.warning('no next flow found, it may be a timing issue',
flow=flow, number_of_flows=len(flows))
if flow.id in self.retry_add_flow_list:
- self.log.debug('flow is already in retry list', flow_id=flow.id)
+ self.log.debug('flow is already in retry list',
+ flow_id=flow.id)
else:
self.retry_add_flow_list.append(flow.id)
reactor.callLater(5, self.retry_add_flow, flow)
@@ -973,7 +1017,8 @@
self.root_proxy.update('/devices/{}/flow_groups'.format(
device_id), FlowGroups(items=groups.values()))
- def clear_flows_and_scheduler_for_logical_port(self, child_device, logical_port):
+ def clear_flows_and_scheduler_for_logical_port(self, child_device,
+ logical_port):
ofp_port_name = logical_port.ofp_port.name
port_no = logical_port.ofp_port.port_no
pon_port = child_device.proxy_address.channel_id
@@ -982,18 +1027,21 @@
# TODO: The DEFAULT_TECH_PROFILE_ID is assumed. Right way to do,
# is probably to maintain a list of Tech-profile table IDs associated
- # with the UNI logical_port. This way, when the logical port is deleted,
- # all the associated tech-profile configuration with the UNI logical_port
- # can be cleared.
- tp_id = self.resource_mgr.get_tech_profile_id_for_onu(pon_port, onu_id, uni_id)
+ # with the UNI logical_port. This way, when the logical port is
+ # deleted, all the associated tech-profile configuration with the UNI
+ # logical_port can be cleared.
+ tp_id = self.resource_mgr.get_tech_profile_id_for_onu(pon_port, onu_id,
+ uni_id)
tech_profile_instance = self.tech_profile[pon_port]. \
get_tech_profile_instance(
tp_id,
ofp_port_name)
- flow_ids = self.resource_mgr.get_current_flow_ids(pon_port, onu_id, uni_id)
+ flow_ids = self.resource_mgr.get_current_flow_ids(pon_port, onu_id,
+ uni_id)
self.log.debug("outstanding-flows-to-be-cleared", flow_ids=flow_ids)
for flow_id in flow_ids:
- flow_infos = self.resource_mgr.get_flow_id_info(pon_port, onu_id, uni_id, flow_id)
+ flow_infos = self.resource_mgr.get_flow_id_info(pon_port, onu_id,
+ uni_id, flow_id)
for flow_info in flow_infos:
direction = flow_info['flow_type']
flow_to_remove = openolt_pb2.Flow(flow_id=flow_id,
@@ -1002,16 +1050,18 @@
self.stub.FlowRemove(flow_to_remove)
except grpc.RpcError as grpc_e:
if grpc_e.code() == grpc.StatusCode.NOT_FOUND:
- self.log.debug('This flow does not exist on the switch, '
+ self.log.debug('This flow does not exist on switch, '
'normal after an OLT reboot',
flow=flow_to_remove)
else:
raise grpc_e
- self.resource_mgr.free_flow_id(pon_port, onu_id, uni_id, flow_id)
+ self.resource_mgr.free_flow_id(pon_port, onu_id, uni_id,
+ flow_id)
try:
- tconts = self.tech_profile[pon_port].get_tconts(tech_profile_instance)
+ tconts = self.tech_profile[pon_port].get_tconts(
+ tech_profile_instance)
self.stub.RemoveTconts(openolt_pb2.Tconts(intf_id=pon_port,
onu_id=onu_id,
uni_id=uni_id,
@@ -1046,7 +1096,8 @@
# Make sure we have as many tech_profiles as there are pon ports on
# the device
- assert len(self.tech_profile) == self.resource_mgr.device_info.pon_ports
+ assert len(self.tech_profile) \
+ == self.resource_mgr.device_info.pon_ports
def _get_flow_info_as_json_blob(self, flow, flow_store_cookie,
flow_category=None):
@@ -1057,18 +1108,17 @@
if flow_category is not None:
json_blob['flow_category'] = flow_category
- # For flows which trap out of the NNI, the access_intf_id is invalid (set to -1).
- # In such cases, we need to refer to the network_intf_id.
+ # For flows which trap out of the NNI, the access_intf_id is invalid
+ # (set to -1). In such cases, we need to refer to the network_intf_id.
if flow.access_intf_id != -1:
- flow_info = self.resource_mgr.get_flow_id_info(flow.access_intf_id,
- flow.onu_id, flow.uni_id,
- flow.flow_id)
+ flow_info = self.resource_mgr.get_flow_id_info(
+ flow.access_intf_id, flow.onu_id, flow.uni_id, flow.flow_id)
else:
- # Case of LLDP trap flow from the NNI. We can't use flow.access_intf_id
- # in that case, as it is invalid. We use flow.network_intf_id.
- flow_info = self.resource_mgr.get_flow_id_info(flow.network_intf_id,
- flow.onu_id, flow.uni_id,
- flow.flow_id)
+ # Case of LLDP trap flow from the NNI. We can't use
+ # flow.access_intf_id in that case, as it is invalid.
+ # We use flow.network_intf_id.
+ flow_info = self.resource_mgr.get_flow_id_info(
+ flow.network_intf_id, flow.onu_id, flow.uni_id, flow.flow_id)
if flow_info is None:
flow_info = list()
@@ -1093,10 +1143,11 @@
if self.nni_intf_id is not None:
return self.nni_intf_id
- port_list = self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_NNI)
- logical_port = self.adapter_agent.get_logical_port(self.logical_device_id,
- port_list[0].label)
- self.nni_intf_id = self.platform.intf_id_from_nni_port_num(logical_port.ofp_port.port_no)
+ port_list = self.adapter_agent.get_ports(self.device_id,
+ Port.ETHERNET_NNI)
+ logical_port = self.adapter_agent.get_logical_port(
+ self.logical_device_id, port_list[0].label)
+ self.nni_intf_id = self.platform.intf_id_from_nni_port_num(
+ logical_port.ofp_port.port_no)
self.log.debug("nni-intf-d ", nni_intf_id=self.nni_intf_id)
return self.nni_intf_id
-