VOL-1490: Push supplemental EAP needed for RG restarts and ONU restarts
Needed so that re-EAP can happen after the subscriber flows are pushed.
Also stub out a possible new local device flow store needed for
OLT disable/restart.
Change-Id: I03171cc7bb2d63d877d061d08509c287decac2c3
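Note: the TODOs in this change stub out call sites for a local device flow store without defining one. Below is a minimal, purely illustrative sketch of what such a store might look like; the class and method names are hypothetical and are not part of this patch.

    # Hypothetical sketch only; not part of this patch. A simple in-memory
    # store that the stubbed flows_proxy call sites could eventually use.
    class LocalDeviceFlowStore(object):
        def __init__(self):
            self._flows = []  # device flows, mirroring the old Flows().items

        def add(self, stored_flow):
            # would be called from register_flow() instead of flows_proxy.update('/')
            self._flows.append(stored_flow)

        def remove_by_cookie(self, cookie):
            # would be called from remove_flow(); returns the matching device flows
            matched = [f for f in self._flows if f.cookie == cookie]
            self._flows = [f for f in self._flows if f.cookie != cookie]
            return matched

        def all(self):
            # would back repush_all_different_flows() after an OLT reboot
            return list(self._flows)

        def clear(self):
            # would be called from reset_flows() instead of flows_proxy.update('/', Flows())
            self._flows = []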
diff --git a/python/adapters/openolt/openolt_flow_mgr.py b/python/adapters/openolt/openolt_flow_mgr.py
index 7dc00c8..651ed1a 100644
--- a/python/adapters/openolt/openolt_flow_mgr.py
+++ b/python/adapters/openolt/openolt_flow_mgr.py
@@ -287,8 +287,11 @@
def remove_flow(self, flow):
self.log.debug('trying to remove flows from logical flow :',
logical_flow=flow)
+
+ # TODO NEW CORE: Keep track of device flows locally; need a new array
device_flows_to_remove = []
- device_flows = self.flows_proxy.get('/').items
+ #device_flows = self.flows_proxy.get('/').items
+ device_flows = []
for f in device_flows:
if f.cookie == flow.id:
device_flows_to_remove.append(f)
@@ -373,13 +376,12 @@
self.add_eapol_flow(intf_id, onu_id, uni_id, port_no, flow, alloc_id,
gemport_id)
- # TODO NEW CORE: Skip trying to add subsequent eap capture for subscriber vlan
- # (later attempts at re-eap)
- #vlan_id = self.get_subscriber_vlan(fd.get_in_port(flow))
- #if vlan_id is not None:
- # self.add_eapol_flow(
- # intf_id, onu_id, uni_id, port_no, flow, alloc_id, gemport_id,
- # vlan_id=vlan_id)
+ vlan_id = yield self.get_subscriber_vlan(fd.get_child_port_from_tunnelid(flow))
+ if vlan_id is not None:
+ self.log.debug('adding-supplemental-eap-flow-vlan', vlan_id=vlan_id)
+ self.add_eapol_flow(
+ intf_id, onu_id, uni_id, port_no, flow, alloc_id, gemport_id,
+ vlan_id=vlan_id)
parent_port_no = self.platform.intf_id_to_port_no(intf_id, Port.PON_OLT)
self.log.debug('get-child-device', intf_id=intf_id, onu_id=onu_id,
@@ -503,6 +505,7 @@
return alloc_id, gem_port_ids
+ @inlineCallbacks
def add_upstream_data_flow(self, intf_id, onu_id, uni_id, port_no, uplink_classifier,
uplink_action, logical_flow, alloc_id,
gemport_id):
@@ -514,8 +517,9 @@
logical_flow, alloc_id, gemport_id)
# Secondary EAP on the subscriber vlan
- (eap_active, eap_logical_flow) = self.is_eap_enabled(intf_id, onu_id, uni_id)
- if eap_active:
+ eap_logical_flow = yield self.is_eap_enabled(intf_id, onu_id, uni_id)
+ if eap_logical_flow is not None:
+ self.log.debug('adding-supplemental-eap-flow', flow=eap_logical_flow)
self.add_eapol_flow(intf_id, onu_id, uni_id, port_no, eap_logical_flow, alloc_id,
gemport_id, vlan_id=uplink_classifier[VLAN_VID])
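Note: this hunk and the is_eap_enabled changes further down move to Twisted's inlineCallbacks style; the helper now yields the core proxy and hands back either the matching EAP logical flow or None via returnValue, instead of the old (bool, flow) tuple. A stripped-down, illustrative sketch of the pattern follows; the names are placeholders, not the adapter's API.

    from twisted.internet.defer import inlineCallbacks, returnValue

    # Illustrative only: a decorated generator yields Deferreds and returns its
    # result through returnValue() rather than a plain return statement.
    @inlineCallbacks
    def find_eap_flow(core_proxy, device_id, matches):
        device = yield core_proxy.get_device(device_id)  # Deferred resolves to the device
        for flow in device.flows.items:
            if matches(flow):
                returnValue(flow)  # single value instead of the old (True, flow) tuple
        returnValue(None)          # callers test "is not None" instead of a boolean

    @inlineCallbacks
    def caller(core_proxy, device_id, matches):
        # the caller must itself be decorated and must yield the Deferred result
        eap_logical_flow = yield find_eap_flow(core_proxy, device_id, matches)
        if eap_logical_flow is not None:
            pass  # push the supplemental EAP flow here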
@@ -768,8 +772,11 @@
def repush_all_different_flows(self):
# Check if the device is supposed to have flows, if so add them
# Recover static flows after a reboot
- logical_flows = self.logical_flows_proxy.get('/').items
- devices_flows = self.flows_proxy.get('/').items
+ # TODO NEW CORE: Keep track of device flows locally; need a new array
+ #logical_flows = self.logical_flows_proxy.get('/').items
+ #devices_flows = self.flows_proxy.get('/').items
+ logical_flows = []
+ devices_flows = []
logical_flows_ids_provisioned = [f.cookie for f in devices_flows]
for logical_flow in logical_flows:
try:
@@ -779,7 +786,9 @@
self.log.exception('Problem reading this flow', e=e)
def reset_flows(self):
- self.flows_proxy.update('/', Flows())
+ # TODO NEW CORE: Keep track of device flows locally; need a new array and clear it out here
+ #self.flows_proxy.update('/', Flows())
+ pass
""" Add a downstream LLDP trap flow on the NNI interface
"""
@@ -886,8 +895,13 @@
return
return action
+ @inlineCallbacks
def is_eap_enabled(self, intf_id, onu_id, uni_id):
- flows = self.logical_flows_proxy.get('/').items
+ self.log.debug('looking for eap enabled on port', intf_id=intf_id, onu_id=onu_id, uni_id=uni_id)
+
+ # TODO NEW CORE: This is really expensive and likely has performance implications
+ device = yield self.core_proxy.get_device(self.device_id)
+ flows = device.flows.items
for flow in flows:
eap_flow = False
@@ -898,11 +912,11 @@
if field.type == fd.ETH_TYPE:
if field.eth_type == EAP_ETH_TYPE:
eap_flow = True
- if field.type == fd.IN_PORT:
- eap_intf_id = self.platform.intf_id_from_uni_port_num(
- field.port)
- eap_onu_id = self.platform.onu_id_from_port_num(field.port)
- eap_uni_id = self.platform.uni_id_from_port_num(field.port)
+ if field.type == fd.TUNNEL_ID:
+ port = fd.get_child_port_from_tunnelid(flow)
+ eap_intf_id = self.platform.intf_id_from_uni_port_num(port)
+ eap_onu_id = self.platform.onu_id_from_port_num(port)
+ eap_uni_id = self.platform.uni_id_from_port_num(port)
if eap_flow:
self.log.debug('eap flow detected', onu_id=onu_id, uni_id=uni_id,
@@ -910,17 +924,27 @@
eap_onu_id=eap_onu_id,
eap_uni_id=eap_uni_id)
if eap_flow and intf_id == eap_intf_id and onu_id == eap_onu_id and uni_id == eap_uni_id:
- return True, flow
+ returnValue(flow)
- return False, None
+ returnValue(None)
+ @inlineCallbacks
def get_subscriber_vlan(self, port):
self.log.debug('looking from subscriber flow for port', port=port)
- flows = self.logical_flows_proxy.get('/').items
+ # TODO NEW CORE: This is really expensive and likely has performance implications
+ device = yield self.core_proxy.get_device(self.device_id)
+ flows = device.flows.items
+
for flow in flows:
- in_port = fd.get_in_port(flow)
+ in_port = fd.get_child_port_from_tunnelid(flow)
out_port = fd.get_out_port(flow)
+ self.log.debug('looping-flows', in_port=in_port, out_port=out_port)
+
+ if self.platform.is_controller(out_port):
+ self.log.debug('skipping-controller-flow', in_port=in_port, out_port=out_port)
+ continue
+
if in_port == port and out_port is not None and \
self.platform.intf_id_to_port_type_name(out_port) \
== Port.ETHERNET_NNI:
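Note: the lookups above now derive the UNI port from the flow's TUNNEL_ID match rather than from IN_PORT, since with the new core the logical in-port no longer maps directly to the ONU/UNI. A hedged sketch of what a helper like get_child_port_from_tunnelid is assumed to do follows; the field accessor and type constant are passed in to keep the example self-contained, and the real helper lives in the flow-decomposer utilities.

    # Assumed behaviour only: scan the flow's OXM basic match fields for the
    # TUNNEL_ID field, which is taken to carry the child (UNI) port number.
    def child_port_from_tunnelid(flow, get_ofb_fields, tunnel_id_type):
        for field in get_ofb_fields(flow):
            if field.type == tunnel_id_type:
                return int(field.tunnel_id)
        return None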
@@ -930,9 +954,9 @@
if field.type == OFPXMT_OFB_VLAN_VID:
self.log.debug('subscriber vlan found',
vlan_id=field.vlan_vid)
- return field.vlan_vid & 0x0fff
+ returnValue(field.vlan_vid & 0x0fff)
self.log.debug('No subscriber flow found', port=port)
- return None
+ returnValue(None)
def add_flow_to_device(self, flow, logical_flow):
self.log.debug('pushing flow to device', flow=flow)
@@ -949,8 +973,7 @@
except Exception as f:
self.log.exception("unexpected-openolt-agent-error", flow=flow, logical_flow=logical_flow, f=f)
else:
- # TODO NEW CORE: Should not need. Core keeps track of logical flows. no need to keep track. verify, especially olt reboot!
- # self.register_flow(logical_flow, flow)
+ self.register_flow(logical_flow, flow)
return True
def update_flow_info_to_kv_store(self, intf_id, onu_id, uni_id, flow_id, flow):
@@ -968,54 +991,11 @@
flow_id=device_flow.flow_id,
direction=device_flow.flow_type)
stored_flow.cookie = logical_flow.id
- flows = self.flows_proxy.get('/')
- flows.items.extend([stored_flow])
- self.flows_proxy.update('/', flows)
- def find_next_flow(self, flow):
- table_id = fd.get_goto_table_id(flow)
- metadata = 0
- # Prior to ONOS 1.13.5, Metadata contained the UNI output port number. In
- # 1.13.5 and later, the lower 32-bits is the output port number and the
- # upper 32-bits is the inner-vid we are looking for. Use just the lower 32
- # bits. Allows this code to work with pre- and post-1.13.5 ONOS OltPipeline
-
- for field in fd.get_ofb_fields(flow):
- if field.type == fd.METADATA:
- metadata = field.table_metadata & 0xFFFFFFFF
- if table_id is None:
- return None
- flows = self.logical_flows_proxy.get('/').items
- next_flows = []
- for f in flows:
- if f.table_id == table_id:
- # FIXME
- if fd.get_in_port(f) == fd.get_in_port(flow) and \
- fd.get_out_port(f) == metadata:
- next_flows.append(f)
-
- if len(next_flows) == 0:
- self.log.warning('no next flow found, it may be a timing issue',
- flow=flow, number_of_flows=len(flows))
- if flow.id in self.retry_add_flow_list:
- self.log.debug('flow is already in retry list', flow_id=flow.id)
- else:
- self.retry_add_flow_list.append(flow.id)
- reactor.callLater(5, self.retry_add_flow, flow)
- return None
-
- next_flows.sort(key=lambda f: f.priority, reverse=True)
-
- return next_flows[0]
-
- def update_children_flows(self, device_rules_map):
-
- for device_id, (flows, groups) in device_rules_map.iteritems():
- if device_id != self.device_id:
- self.root_proxy.update('/devices/{}/flows'.format(device_id),
- Flows(items=flows.values()))
- self.root_proxy.update('/devices/{}/flow_groups'.format(
- device_id), FlowGroups(items=groups.values()))
+ # TODO NEW CORE: Keep track of device flows locally; need a new array
+ #flows = self.flows_proxy.get('/')
+ #flows.items.extend([stored_flow])
+ #self.flows_proxy.update('/', flows)
def clear_flows_and_scheduler_for_logical_port(self, child_device, logical_port):
ofp_port_name = logical_port.ofp_port.name