VOL-1044 VOL-1114 VOL-1115 Openolt acts on logical flows directly and can remove flows
Change-Id: I4d71a365d1af3fa4285b2e84280afe4f7039e343
diff --git a/voltha/adapters/openolt/openolt_flow_mgr.py b/voltha/adapters/openolt/openolt_flow_mgr.py
index 48c877d..ae388e0 100644
--- a/voltha/adapters/openolt/openolt_flow_mgr.py
+++ b/voltha/adapters/openolt/openolt_flow_mgr.py
@@ -13,8 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+import copy
+from twisted.internet import reactor
-from voltha.protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC
+from voltha.protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC, \
+ ofp_flow_stats, ofp_match, OFPMT_OXM, Flows, FlowGroups, OFPXMT_OFB_IN_PORT
import voltha.core.flow_decomposer as fd
import openolt_platform as platform
from voltha.adapters.openolt.protos import openolt_pb2
@@ -37,20 +40,23 @@
class OpenOltFlowMgr(object):
- def __init__(self, log, stub, device_id):
+ def __init__(self, log, stub, device_id, logical_device_id):
self.log = log
self.stub = stub
self.device_id = device_id
- self.flow_proxy = registry('core').get_proxy(
+ self.logical_device_id = logical_device_id
+ self.logical_flows_proxy = registry('core').get_proxy(
+ '/logical_devices/{}/flows'.format(self.logical_device_id))
+ self.flows_proxy = registry('core').get_proxy(
'/devices/{}/flows'.format(self.device_id))
+ self.root_proxy = registry('core').get_proxy('/')
+ self.fd_object = fd.FlowDecomposer()
- def add_flow(self, flow, is_down_stream):
- self.log.debug('add flow', flow=flow, is_down_stream=is_down_stream)
+ def add_flow(self, flow):
+ self.log.debug('add flow', flow=flow)
classifier_info = dict()
action_info = dict()
- in_port = fd.get_in_port(flow)
- assert in_port is not None
for field in fd.get_ofb_fields(flow):
if field.type == fd.ETH_TYPE:
@@ -104,13 +110,16 @@
output=action_info['output'],
in_port=classifier_info['in_port'])
elif action.type == fd.POP_VLAN:
+ if fd.get_goto_table_id(flow) is None:
+ self.log.debug('being taken care of by ONU', flow=flow)
+ return
action_info['pop_vlan'] = True
- self.log.debug('action-type-pop-vlan', in_port=in_port)
+ self.log.debug('action-type-pop-vlan', in_port=classifier_info['in_port'])
elif action.type == fd.PUSH_VLAN:
action_info['push_vlan'] = True
action_info['tpid'] = action.push.ethertype
self.log.debug('action-type-push-vlan',
- push_tpid=action_info['tpid'], in_port=in_port)
+ push_tpid=action_info['tpid'], in_port=classifier_info['in_port'])
if action.push.ethertype != 0x8100:
self.log.error('unhandled-tpid',
ethertype=action.push.ethertype)
@@ -120,7 +129,7 @@
assert (action.set_field.field.oxm_class ==
OFPXMC_OPENFLOW_BASIC)
self.log.debug('action-type-set-field',
- field=_field, in_port=in_port)
+ field=_field, in_port=classifier_info['in_port'])
if _field.type == fd.VLAN_VID:
self.log.debug('set-field-type-vlan-vid',
vlan_vid=_field.vlan_vid & 0xfff)
@@ -130,112 +139,148 @@
field_type=_field.type)
else:
self.log.error('unsupported-action-type',
- action_type=action.type, in_port=in_port)
+ action_type=action.type, in_port=classifier_info['in_port'])
- # FIXME - Why ignore downstream flows?
- if is_down_stream is False:
- intf_id = platform.intf_id_from_uni_port_num(
- classifier_info['in_port'])
- onu_id = platform.onu_id_from_port_num(
- classifier_info['in_port'])
- self.divide_and_add_flow(intf_id, onu_id,
- flow.priority, classifier_info,
- action_info)
- # else:
- # self.log.info('ignore downstream flow', flow=flow,
- # classifier_info=classifier_info,
- # action_info=action_info)
+ if fd.get_goto_table_id(flow) is not None and not 'pop_vlan' in \
+ action_info:
+ self.log.debug('being taken care of by ONU', flow=flow)
- # FIXME - No need for divide_and_add_flow if
- # both upstream and downstream flows
- # are acted upon (not just upstream flows).
- def divide_and_add_flow(self, intf_id, onu_id, priority, classifier,
- action):
+ if not 'output' in action_info and 'metadata' in classifier_info:
+            # Find the matching flow in the next (goto) table
+ next_flow = self.find_next_flow(flow)
+ if next_flow is None:
+ return
+ action_info['output'] = fd.get_out_port(next_flow)
+ for field in fd.get_ofb_fields(next_flow):
+ if field.type == fd.VLAN_VID:
+ classifier_info['metadata'] = field.vlan_vid & 0xfff
+
+
+ (intf_id, onu_id) = platform.extract_access_from_flow(
+ classifier_info['in_port'], action_info['output'])
+
+
+ self.divide_and_add_flow(intf_id, onu_id, classifier_info,
+ action_info, flow)
+
+ def remove_flow(self, flow):
+ self.log.debug('trying to remove flows from logical flow :',
+ logical_flow=flow)
+ device_flows_to_remove = []
+ device_flows = self.flows_proxy.get('/').items
+ for f in device_flows:
+ if f.cookie == flow.id:
+ device_flows_to_remove.append(f)
+
+
+ for f in device_flows_to_remove:
+ (id, direction) = self.decode_stored_id(f.id)
+ flow_to_remove = openolt_pb2.Flow(flow_id=id, flow_type=direction)
+ self.stub.FlowRemove(flow_to_remove)
+ self.log.debug('flow removed from device', flow=f,
+ flow_key=flow_to_remove)
+
+ if len(device_flows_to_remove) > 0:
+ new_flows = []
+ flows_ids_to_remove = [f.id for f in device_flows_to_remove]
+ for f in device_flows:
+ if f.id not in flows_ids_to_remove:
+ new_flows.append(f)
+
+ self.flows_proxy.update('/', Flows(items=new_flows))
+ self.log.debug('flows removed from the data store',
+ flow_ids_removed=flows_ids_to_remove,
+ number_of_flows_removed=(len(device_flows) - len(
+ new_flows)), expected_flows_removed=len(
+ device_flows_to_remove))
+ else:
+ self.log.debug('no device flow to remove for this flow (normal '
+ 'for multi table flows)', flow=flow)
+
+
+ def divide_and_add_flow(self, intf_id, onu_id, classifier,
+ action, flow):
+
+ self.log.debug('sorting flow', intf_id=intf_id, onu_id=onu_id,
+ classifier=classifier, action=action)
+
if 'ip_proto' in classifier:
if classifier['ip_proto'] == 17:
self.log.debug('dhcp flow add')
- self.add_dhcp_trap(intf_id, onu_id, priority, classifier,
- action)
+ self.add_dhcp_trap(intf_id, onu_id, classifier,
+ action, flow)
elif classifier['ip_proto'] == 2:
- self.log.debug('igmp flow add ignored')
+ self.log.warn('igmp flow add ignored, not implemented yet')
else:
- self.log.debug("Invalid-Classifier-to-handle",
+ self.log.warn("Invalid-Classifier-to-handle",
classifier=classifier,
action=action)
elif 'eth_type' in classifier:
if classifier['eth_type'] == EAP_ETH_TYPE:
self.log.debug('eapol flow add')
- self.add_eapol_flow(intf_id, onu_id, priority)
+ self.add_eapol_flow(intf_id, onu_id, flow)
elif 'push_vlan' in action:
- self.add_data_flow(intf_id, onu_id, priority, classifier, action)
+ self.add_upstream_data_flow(intf_id, onu_id, classifier, action,
+ flow)
+ elif 'pop_vlan' in action:
+ self.add_downstream_data_flow(intf_id, onu_id, classifier,
+ action, flow)
else:
self.log.debug('Invalid-flow-type-to-handle',
classifier=classifier,
- action=action)
+ action=action, flow=flow)
- def add_data_flow(self, intf_id, onu_id, priority, uplink_classifier,
- uplink_action):
- downlink_classifier = dict(uplink_classifier)
- downlink_action = dict(uplink_action)
+ def add_upstream_data_flow(self, intf_id, onu_id, uplink_classifier,
+ uplink_action, logical_flow):
+
uplink_classifier['pkt_tag_type'] = 'single_tag'
- downlink_classifier['pkt_tag_type'] = 'double_tag'
- downlink_classifier['vlan_vid'] = uplink_action['vlan_vid']
- downlink_classifier['metadata'] = uplink_classifier['vlan_vid']
- del downlink_action['push_vlan']
- downlink_action['pop_vlan'] = True
-
- # To-Do right now only one GEM port is supported, so below method
- # will take care of handling all the p bits.
- # We need to revisit when mulitple gem port per p bits is needed.
- self.add_hsia_flow(intf_id, onu_id, priority, uplink_classifier,
- uplink_action, downlink_classifier, downlink_action,
- HSIA_FLOW_INDEX)
+ self.add_hsia_flow(intf_id, onu_id, uplink_classifier,
+ uplink_action, 'upstream', HSIA_FLOW_INDEX,
+ logical_flow)
# Secondary EAP on the subscriber vlan
- (eap_active, eap_priority) = self.is_eap_enabled(intf_id, onu_id)
+ (eap_active, eap_logical_flow) = self.is_eap_enabled(intf_id, onu_id)
if eap_active:
- self.add_eapol_flow(intf_id, onu_id, eap_priority,
+ self.add_eapol_flow(intf_id, onu_id, eap_logical_flow,
uplink_eapol_id=EAPOL_UPLINK_SECONDARY_FLOW_INDEX,
downlink_eapol_id=EAPOL_DOWNLINK_SECONDARY_FLOW_INDEX,
vlan_id=uplink_classifier['vlan_vid'])
- def add_hsia_flow(self, intf_id, onu_id, priority, uplink_classifier,
- uplink_action, downlink_classifier, downlink_action,
- hsia_id):
+
+ def add_downstream_data_flow(self, intf_id, onu_id, downlink_classifier,
+ downlink_action, flow):
+ downlink_classifier['pkt_tag_type'] = 'double_tag'
+        # TODO: confirm this is needed — pop_vlan appears to be set already by the caller
+ downlink_action['pop_vlan'] = True
+ downlink_action['vlan_vid'] = downlink_classifier['vlan_vid']
+
+ self.add_hsia_flow(intf_id, onu_id, downlink_classifier,
+ downlink_action, 'downstream', HSIA_FLOW_INDEX,
+ flow)
+
+ # To-Do right now only one GEM port is supported, so below method
+ # will take care of handling all the p bits.
+    # We need to revisit when multiple gem ports per p bits are needed.
+ def add_hsia_flow(self, intf_id, onu_id, classifier, action,
+ direction, hsia_id, logical_flow):
gemport_id = platform.mk_gemport_id(onu_id)
flow_id = platform.mk_flow_id(intf_id, onu_id, hsia_id)
- self.log.debug('add upstream flow', onu_id=onu_id,
- classifier=uplink_classifier, action=uplink_action,
- gemport_id=gemport_id, flow_id=flow_id)
-
flow = openolt_pb2.Flow(
- onu_id=onu_id, flow_id=flow_id, flow_type="upstream",
- access_intf_id=intf_id, gemport_id=gemport_id, priority=priority,
- classifier=self.mk_classifier(uplink_classifier),
- action=self.mk_action(uplink_action))
-
- self.stub.FlowAdd(flow)
-
- self.log.debug('add downstream flow', classifier=downlink_classifier,
- action=downlink_action, gemport_id=gemport_id,
- flow_id=flow_id)
-
- flow = openolt_pb2.Flow(
- onu_id=onu_id, flow_id=flow_id, flow_type="downstream",
+ onu_id=onu_id, flow_id=flow_id, flow_type=direction,
access_intf_id=intf_id, gemport_id=gemport_id,
- priority=priority,
- classifier=self.mk_classifier(downlink_classifier),
- action=self.mk_action(downlink_action))
+ priority=logical_flow.priority,
+ classifier=self.mk_classifier(classifier),
+ action=self.mk_action(action))
- self.stub.FlowAdd(flow)
+ self.add_flow_to_device(flow, logical_flow)
- def add_dhcp_trap(self, intf_id, onu_id, priority, classifier, action):
+ def add_dhcp_trap(self, intf_id, onu_id, classifier, action, logical_flow):
self.log.debug('add dhcp upstream trap', classifier=classifier,
action=action)
@@ -250,17 +295,22 @@
upstream_flow = openolt_pb2.Flow(
onu_id=onu_id, flow_id=flow_id, flow_type="upstream",
- access_intf_id=intf_id, network_intf_id=0, gemport_id=gemport_id,
- priority=priority, classifier=self.mk_classifier(classifier),
+ access_intf_id=intf_id, gemport_id=gemport_id,
+ priority=logical_flow.priority,
+ classifier=self.mk_classifier(classifier),
action=self.mk_action(action))
- self.stub.FlowAdd(upstream_flow)
+ self.add_flow_to_device(upstream_flow, logical_flow)
- # FIXME - Fix OpenOLT handling of downstream flows instead
- # of duplicating the downstream flow from the upstream
- # flow.
+
# FIXME - ONOS should send explicit upstream and downstream
# exact dhcp trap flow.
+
+ downstream_logical_flow = copy.deepcopy(logical_flow)
+ for oxm_field in downstream_logical_flow.match.oxm_fields:
+ if oxm_field.ofb_field.type == OFPXMT_OFB_IN_PORT:
+ oxm_field.ofb_field.port = 128
+
classifier['udp_src'] = 67
classifier['udp_dst'] = 68
classifier['pkt_tag_type'] = 'double_tag'
@@ -272,22 +322,18 @@
downstream_flow = openolt_pb2.Flow(
onu_id=onu_id, flow_id=flow_id, flow_type="downstream",
access_intf_id=intf_id, network_intf_id=0, gemport_id=gemport_id,
- priority=priority, classifier=self.mk_classifier(classifier),
+ priority=logical_flow.priority, classifier=self.mk_classifier(
+ classifier),
action=self.mk_action(action))
- self.log.debug('add dhcp downstream trap', access_intf_id=intf_id,
- onu_id=onu_id, flow_id=flow_id)
- self.stub.FlowAdd(downstream_flow)
- def add_eapol_flow(self, intf_id, onu_id, priority,
+ self.add_flow_to_device(downstream_flow, downstream_logical_flow)
+
+ def add_eapol_flow(self, intf_id, onu_id, logical_flow,
uplink_eapol_id=EAPOL_FLOW_INDEX,
downlink_eapol_id=EAPOL_DOWNLINK_FLOW_INDEX,
vlan_id=DEFAULT_MGMT_VLAN):
- # self.log.debug('add eapol flow pre-process',
- # classifier=uplink_classifier)
- # #action=uplink_action)
-
downlink_classifier = {}
downlink_classifier['eth_type'] = EAP_ETH_TYPE
downlink_classifier['pkt_tag_type'] = 'single_tag'
@@ -313,11 +359,17 @@
upstream_flow = openolt_pb2.Flow(
onu_id=onu_id, flow_id=uplink_flow_id, flow_type="upstream",
- access_intf_id=intf_id, gemport_id=gemport_id, priority=priority,
+ access_intf_id=intf_id, gemport_id=gemport_id,
+ priority=logical_flow.priority,
classifier=self.mk_classifier(uplink_classifier),
action=self.mk_action(uplink_action))
- self.stub.FlowAdd(upstream_flow)
+ logical_flow = copy.deepcopy(logical_flow)
+ logical_flow.match.oxm_fields.extend(fd.mk_oxm_fields([fd.vlan_vid(
+ vlan_id | 0x1000)]))
+ logical_flow.match.type = OFPMT_OXM
+
+ self.add_flow_to_device(upstream_flow, logical_flow)
# Add Downstream EAPOL Flow.
downlink_flow_id = platform.mk_flow_id(intf_id, onu_id,
@@ -326,13 +378,26 @@
downstream_flow = openolt_pb2.Flow(
onu_id=onu_id, flow_id=downlink_flow_id, flow_type="downstream",
access_intf_id=intf_id, gemport_id=gemport_id,
+ priority=logical_flow.priority,
classifier=self.mk_classifier(downlink_classifier),
action=self.mk_action(downlink_action))
- self.stub.FlowAdd(downstream_flow)
+ downstream_logical_flow = ofp_flow_stats(id=logical_flow.id,
+ cookie=logical_flow.cookie, table_id=logical_flow.table_id,
+ priority=logical_flow.priority, flags=logical_flow.flags)
- self.log.debug('eap flows', upstream_flow=upstream_flow,
- downstream_flow=downstream_flow)
+ downstream_logical_flow.match.oxm_fields.extend(fd.mk_oxm_fields([
+ fd.in_port(fd.get_out_port(logical_flow)),
+ fd.eth_type(EAP_ETH_TYPE), fd.vlan_vid(vlan_id | 0x1000)]))
+ downstream_logical_flow.match.type = OFPMT_OXM
+
+ downstream_logical_flow.instructions.extend(
+ fd.mk_instructions_from_actions([fd.output(
+ platform.mk_uni_port_num(intf_id, onu_id))]))
+
+ self.add_flow_to_device(downstream_flow, downstream_logical_flow)
+
+
def mk_classifier(self, classifier_info):
@@ -385,7 +450,7 @@
return action
def is_eap_enabled(self, intf_id, onu_id):
- flows = self.flow_proxy.get('/').items
+ flows = self.logical_flows_proxy.get('/').items
for flow in flows:
eap_flow = False
@@ -405,6 +470,81 @@
intf_id=intf_id, eap_intf_id=eap_intf_id,
eap_onu_id=eap_onu_id)
if eap_flow and intf_id == eap_intf_id and onu_id == eap_onu_id:
- return (True, flow.priority)
+ return (True, flow)
- return (False, 0)
+ return (False, None)
+
+ def add_flow_to_device(self, flow, logical_flow):
+ self.log.debug('pushing flow to device', flow=flow)
+ self.stub.FlowAdd(flow)
+ self.register_flow(logical_flow, flow)
+
+ def register_flow(self, logical_flow, device_flow):
+ self.log.debug('registering flow in device',
+ logical_flow=logical_flow, device_flow=device_flow)
+ stored_flow = copy.deepcopy(logical_flow)
+ stored_flow.id = self.generate_stored_id(device_flow.flow_id,
+ device_flow.flow_type)
+ self.log.debug('generated device flow id', id=stored_flow.id,
+ flow_id=device_flow.flow_id,
+ direction=device_flow.flow_type)
+ stored_flow.cookie = logical_flow.id
+ flows = self.flows_proxy.get('/')
+ flows.items.extend([stored_flow])
+ self.flows_proxy.update('/', flows)
+
+
+ def find_next_flow(self, flow):
+ table_id = fd.get_goto_table_id(flow)
+ metadata = 0
+ for field in fd.get_ofb_fields(flow):
+ if field.type == fd.METADATA:
+ metadata = field.table_metadata
+ if table_id is None:
+ return None
+ flows = self.logical_flows_proxy.get('/').items
+ next_flows = []
+ for f in flows:
+ if f.table_id == table_id:
+ #FIXME:
+ if fd.get_in_port(f) == fd.get_in_port(flow) and \
+ fd.get_out_port(f) == metadata:
+
+ next_flows.append(f)
+
+
+ if len(next_flows) == 0:
+ self.log.warning('no next flow found, it may be a timing issue',
+ flow=flow, number_of_flows=len(flows))
+ reactor.callLater(5, self.add_flow, flow)
+ return None
+
+ next_flows.sort(key=lambda f:f.priority, reverse=True)
+
+ return next_flows[0]
+
+ def update_children_flows(self, device_rules_map):
+
+ for device_id, (flows, groups) in device_rules_map.iteritems():
+ if device_id != self.device_id:
+ self.root_proxy.update('/devices/{}/flows'.format(device_id),
+ Flows(items=flows.values()))
+ self.root_proxy.update('/devices/{}/flow_groups'.format(
+ device_id), FlowGroups(items=groups.values()))
+
+ def generate_stored_id(self, flow_id, direction):
+ if direction == 'upstream':
+ self.log.debug('upstream flow, shifting id')
+ return 0x1 << 15 | flow_id
+ elif direction == 'downstream':
+ self.log.debug('downstream flow, not shifting id')
+ return flow_id
+ else:
+ self.log.warn('Unrecognized direction', direction=direction)
+ return flow_id
+
+ def decode_stored_id(self, id):
+ if id >> 15 == 0x1:
+ return (id & 0x7fff, 'upstream')
+ else:
+ return (id, 'downstream')
\ No newline at end of file