VOL-1191 Remove flows from storage on device down, push all different flows on flow update
Change-Id: Icadef7a4afa26d603fef27c2aee545fa8bfafb09
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index 257ec3a..644a036 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -84,7 +84,8 @@
{'trigger': 'go_state_down',
'source': ['state_up'],
'dest': 'state_down',
- 'before': 'do_state_down'}]
+ 'before': 'do_state_down',
+ 'after': 'post_down'}]
def __init__(self, **kwargs):
super(OpenoltDevice, self).__init__()
@@ -178,8 +179,6 @@
self.log.debug("do_state_connected")
device = self.adapter_agent.get_device(self.device_id)
- device.connect_status = ConnectStatus.REACHABLE
- self.adapter_agent.update_device(device)
self.stub = openolt_pb2_grpc.OpenoltStub(self.channel)
self.flow_mgr = OpenOltFlowMgr(self.log, self.stub, self.device_id,
@@ -190,6 +189,12 @@
self.stats_mgr = OpenOltStatisticsMgr(self, self.log)
self.bw_mgr = OpenOltBW(self.log, self.proxy)
+ # TODO: check for uptime and reboot if too long (VOL-1192)
+
+
+ device.connect_status = ConnectStatus.REACHABLE
+ self.adapter_agent.update_device(device)
+
def do_state_up(self, event):
self.log.debug("do_state_up")
@@ -254,7 +259,16 @@
device.oper_status = oper_state
device.connect_status = connect_state
- self.adapter_agent.update_device(device)
+ reactor.callLater(2, self.adapter_agent.update_device, device)
+
+ # def post_up(self, event):
+ # self.log.debug('post-up')
+ # self.flow_mgr.reseed_flows()
+
+ def post_down(self, event):
+ self.log.debug('post_down')
+ self.flow_mgr.reset_flows()
+
def indications_thread(self):
self.log.debug('starting-indications-thread')
@@ -898,6 +912,10 @@
flows_to_remove=[f.id for f in flows_to_remove])
return
+ try:
+ self.flow_mgr.update_children_flows(device_rules_map)
+ except Exception as e:
+ self.log.error('Error updating children flows', error=e)
self.log.debug('logical flows update', flows_to_add=flows_to_add,
flows_to_remove=flows_to_remove)
@@ -916,7 +934,6 @@
except Exception as e:
self.log.error('failed to add flow', flow=flow, e=e)
- self.flow_mgr.update_children_flows(device_rules_map)
for flow in flows_to_remove:
@@ -925,6 +942,8 @@
except Exception as e:
self.log.error('failed to remove flow', flow=flow, e=e)
+ self.flow_mgr.repush_all_different_flows()
+
# There has to be a better way to do this
def ip_hex(self, ip):
octets = ip.split(".")
diff --git a/voltha/adapters/openolt/openolt_flow_mgr.py b/voltha/adapters/openolt/openolt_flow_mgr.py
index 54d77e9..edf9028 100644
--- a/voltha/adapters/openolt/openolt_flow_mgr.py
+++ b/voltha/adapters/openolt/openolt_flow_mgr.py
@@ -18,7 +18,8 @@
import grpc
from voltha.protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC, \
- ofp_flow_stats, ofp_match, OFPMT_OXM, Flows, FlowGroups, OFPXMT_OFB_IN_PORT
+ ofp_flow_stats, ofp_match, OFPMT_OXM, Flows, FlowGroups, \
+ OFPXMT_OFB_IN_PORT, OFPXMT_OFB_VLAN_VID
import voltha.core.flow_decomposer as fd
import openolt_platform as platform
from voltha.adapters.openolt.protos import openolt_pb2
@@ -228,6 +229,12 @@
if classifier['eth_type'] == EAP_ETH_TYPE:
self.log.debug('eapol flow add')
self.add_eapol_flow(intf_id, onu_id, flow)
+ vlan_id = self.get_subscriber_vlan(fd.get_in_port(flow))
+ if vlan_id is not None:
+ self.add_eapol_flow(intf_id, onu_id, flow,
+ uplink_eapol_id=EAPOL_UPLINK_SECONDARY_FLOW_INDEX,
+ downlink_eapol_id=EAPOL_DOWNLINK_SECONDARY_FLOW_INDEX,
+ vlan_id=vlan_id)
elif 'push_vlan' in action:
self.add_upstream_data_flow(intf_id, onu_id, classifier, action,
@@ -406,7 +413,21 @@
self.add_flow_to_device(downstream_flow, downstream_logical_flow)
+ def repush_all_different_flows(self):
+ # Check if the device is supposed to have flows, if so add them
+ # Recover static flows after a reboot
+ logical_flows = self.logical_flows_proxy.get('/').items
+ devices_flows = self.flows_proxy.get('/').items
+ logical_flows_ids_provisioned = [f.cookie for f in devices_flows]
+ for logical_flow in logical_flows:
+ try:
+ if logical_flow.id not in logical_flows_ids_provisioned:
+ self.add_flow(logical_flow)
+ except Exception as e:
+ self.log.debug('Problem readding this flow', error=e)
+ def reset_flows(self):
+ self.flows_proxy.update('/', Flows())
def mk_classifier(self, classifier_info):
@@ -483,6 +504,25 @@
return (False, None)
+ def get_subscriber_vlan(self, port):
+ self.log.debug('looking from subscriber flow for port', port=port)
+
+ flows = self.logical_flows_proxy.get('/').items
+ for flow in flows:
+ in_port = fd.get_in_port(flow)
+ out_port = fd.get_out_port(flow)
+ #FIXME
+ if in_port == port and out_port == 128:
+ fields = fd.get_ofb_fields(flow)
+ self.log.debug('subscriber flow found', fields=fields)
+ for field in fields:
+ if field.type == OFPXMT_OFB_VLAN_VID:
+ self.log.debug('subscriber vlan found',
+ vlan_id=field.vlan_vid)
+ return field.vlan_vid & 0x0fff
+ self.log.debug('No subscriber flow found', port=port)
+ return None
+
def add_flow_to_device(self, flow, logical_flow):
self.log.debug('pushing flow to device', flow=flow)
self.stub.FlowAdd(flow)