blob: ac536591f9f15eaa0cc9e042835a00fbee429454 [file] [log] [blame]
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from evc import EVC
from evc_map import EVCMap
from enum import Enum
import voltha.core.flow_decomposer as fd
from voltha.core.flow_decomposer import *
from voltha.protos.openflow_13_pb2 import OFPP_CONTROLLER, OFPP_LOCAL, OFPP_ANY, OFPP_MAX
from twisted.internet.defer import returnValue, inlineCallbacks
log = structlog.get_logger()
# IP Protocol numbers
_supported_ip_protocols = [
1, # ICMP
2, # IGMP
6, # TCP
17, # UDP
]
_existing_flow_entries = {} # device-id -> flow dictionary
# |
# +-> flow-id -> flow-entry
class FlowEntry(object):
"""
Provide a class that wraps the flow rule and also provides state/status for a FlowEntry.
When a new flow is sent, it is first decoded to check for any potential errors. If None are
found, the entry is created and it is analyzed to see if it can be combined to with any other flows
to create or modify an existing EVC.
Note: Since only E-LINE is supported, modification of an existing EVC is not performed.
"""
class FlowDirection(Enum):
UPSTREAM = 0 # UNI port to NNI Port
DOWNSTREAM = 1 # NNI port to UNI Port
NNI = 2 # NNI port to NNI Port
UNI = 3 # UNI port to UNI Port
OTHER = 4 # Unable to determine
_flow_dir_map = {
(FlowDirection.UNI, FlowDirection.NNI): FlowDirection.UPSTREAM,
(FlowDirection.NNI, FlowDirection.UNI): FlowDirection.DOWNSTREAM,
(FlowDirection.UNI, FlowDirection.UNI): FlowDirection.UNI,
(FlowDirection.NNI, FlowDirection.NNI): FlowDirection.NNI,
}
# Well known EtherTypes
class EtherType(Enum):
EAPOL = 0x888E
IPv4 = 0x0800
ARP = 0x0806
# Well known IP Protocols
class IpProtocol(Enum):
IGMP = 2
UDP = 17
def __init__(self, flow, handler):
self._flow = flow
self._handler = handler
self.evc = None # EVC this flow is part of
self.evc_map = None # EVC-MAP this flow is part of
self._flow_direction = FlowEntry.FlowDirection.OTHER
self._name = self._create_flow_name()
# A value used to locate possible related flow entries
self.signature = None
# Selection properties
self.in_port = None
self.vlan_id = None
self.pcp = None
self.eth_type = None
self.ip_protocol = None
self.ipv4_dst = None
self.udp_dst = None # UDP Port #
self.udp_src = None # UDP Port #
self.inner_vid = None
# Actions
self.output = None
self.pop_vlan = 0
self.push_vlan_tpid = []
self.push_vlan_id = []
@property
def name(self):
return self._name # TODO: Is a name really needed in production?
# TODO: Is a name really needed in production?
def _create_flow_name(self):
return 'flow-{}-{}'.format(self.device_id, self.flow_id)
@property
def flow(self):
return self._flow
@property
def flow_id(self):
return self.flow.id
@property
def handler(self):
return self._handler
@property
def device_id(self):
return self.handler.device_id
@property
def flow_direction(self):
return self._flow_direction
@staticmethod
def create(flow, handler):
"""
Create the appropriate FlowEntry wrapper for the flow. This method returns a two
results.
The first result is the flow entry that was created. This could be a match to an
existing flow since it is a bulk update. None is returned only if no match to
an existing entry is found and decode failed (unsupported field)
The second result is the EVC this flow should be added to. This could be an
existing flow (so your adding another EVC-MAP) or a brand new EVC (no existing
EVC-MAPs). None is returned if there are not a valid EVC that can be created YET.
:param flow: (Flow) Flow entry passed to VOLTHA adapter
:param handler: (AdtranDeviceHandler) handler for the device
:return: (FlowEntry, EVC)
"""
# Exit early if it already exists
try:
flow_entry = FlowEntry(flow, handler)
if flow_entry.device_id not in _existing_flow_entries:
_existing_flow_entries[flow_entry.device_id] = {}
flow_table = _existing_flow_entries[flow_entry.device_id]
if flow_entry.flow_id in flow_table:
return flow_entry, None
#########################################
# A new flow, decode it into the items of interest
if not flow_entry._decode():
return None, None
# Look for any matching flows in the other direction that might help make an EVC
# and then save it off in the device specific flow table
# TODO: For now, only support for E-LINE services between NNI and UNI
flow_candidates = [_flow for _flow in flow_table.itervalues()
if _flow.signature == flow_entry.signature and
_flow.in_port == flow_entry.output and
(_flow.flow_direction == FlowEntry.FlowDirection.UPSTREAM or
_flow.flow_direction == FlowEntry.FlowDirection.DOWNSTREAM)
]
flow_table[flow_entry.flow_id] = flow_entry
# TODO: For now, only support for E-LINE services between NNI and UNI
if len(flow_candidates) == 0 or (flow_entry.flow_direction != FlowEntry.FlowDirection.UPSTREAM and
flow_entry.flow_direction != FlowEntry.FlowDirection.DOWNSTREAM):
return flow_entry, None
# Possible candidate found. Currently, the logical_device_agent sends us the load downstream
# flow first and then all the matching upstreams. So we should have only one match
if flow_entry.flow_direction == FlowEntry.FlowDirection.DOWNSTREAM:
downstream_flow = flow_entry
else:
assert len(flow_candidates) != 0
downstream_flow = flow_candidates[0]
if flow_entry.flow_direction == FlowEntry.FlowDirection.UPSTREAM:
upstream_flows = [flow_entry]
else:
upstream_flows = flow_candidates
return flow_entry, FlowEntry._create_evc_and_maps(downstream_flow, upstream_flows)
except Exception as e:
log.exception('flow_entry-processing', e=e)
@staticmethod
def _create_evc_and_maps(downstream_flow, upstream_flows):
"""
Give a set of flows, find (or create) the EVC and any needed EVC-MAPs
:param downstream_flow: NNI -> UNI flow (provides much of the EVC values)
:param upstream_flows: UNI -> NNI flows (provides much of the EVC-MAP values)
:return: EVC object
"""
# Get any existing EVC if a flow is already created
if downstream_flow.evc is None:
downstream_flow.evc = EVC(downstream_flow)
evc = downstream_flow.evc
if not evc.valid:
return None
# Create EVC-MAPs
for flow in upstream_flows:
if flow.evc_map is None:
flow.evc_map = EVCMap.create_ingress_map(flow, evc)
all_valid = all(flow.evc_map.valid for flow in upstream_flows)
return evc if all_valid else None
def _decode(self):
"""
Examine flow rules and extract appropriate settings
"""
status = self._decode_traffic_selector() and self._decode_traffic_treatment()
if status:
# Determine direction of the flow
def port_type(port):
if port in self._handler.northbound_ports:
return FlowEntry.FlowDirection.NNI
elif port <= OFPP_MAX:
return FlowEntry.FlowDirection.UNI
return FlowEntry.FlowDirection.OTHER
self._flow_direction = FlowEntry._flow_dir_map.get((port_type(self.in_port), port_type(self.output)),
FlowEntry.FlowDirection.OTHER)
# Create a signature that will help locate related flow entries on a device.
# These are not exact, just ones that may be put together to make an EVC. The
# basic rules are:
#
# 1 - Same device
dev_id = self._handler.device_id
# 2 - Port numbers in increasing order
ports = [self.in_port, self.output]
ports.sort()
# 3 - The outer VID
push_len = len(self.push_vlan_id)
assert push_len <= 2
outer = self.vlan_id or None if push_len == 0 else self.push_vlan_id[0]
# 4 - The inner VID.
if self.inner_vid is not None:
inner = self.inner_vid
else:
inner = self.vlan_id if (push_len > 0 and outer is not None) else None
self.signature = '{}'.format(dev_id)
for port in ports:
self.signature += '.{}'.format(port)
self.signature += '.{}.{}'.format(outer, inner)
return status
def _decode_traffic_selector(self):
"""
Extract EVC related traffic selection settings
"""
self.in_port = fd.get_in_port(self._flow)
if self.in_port > OFPP_MAX:
log.warn('Logical-input-ports-not-supported')
return False
for field in fd.get_ofb_fields(self._flow):
if field.type == IN_PORT:
pass # Handled earlier
elif field.type == VLAN_VID:
# log.info('*** field.type == VLAN_VID', value=field.vlan_vid & 0xfff)
self.vlan_id = field.vlan_vid & 0xfff
elif field.type == VLAN_PCP:
# log.info('*** field.type == VLAN_PCP', value=field.vlan_pcp)
self.pcp = field.vlan_pcp
elif field.type == ETH_TYPE:
# log.info('*** field.type == ETH_TYPE', value=field.eth_type)
self.eth_type = field.eth_type
elif field.type == IP_PROTO:
# log.info('*** field.type == IP_PROTO', value=field.ip_proto)
self.ip_protocol = field.ip_proto
if self.ip_protocol not in _supported_ip_protocols:
# log.error('Unsupported IP Protocol')
return False
elif field.type == IPV4_DST:
# log.info('*** field.type == IPV4_DST', value=field.ipv4_dst)
self.ipv4_dst = field.ipv4_dst
elif field.type == UDP_DST:
# log.info('*** field.type == UDP_DST', value=field.udp_dst)
self.udp_dst = field.udp_dst
elif field.type == UDP_SRC:
# log.info('*** field.type == UDP_SRC', value=field.udp_src)
self.udp_src = field.udp_src
elif field.type == METADATA:
# log.info('*** field.type == METADATA', value=field.table_metadata)
self.inner_vid = field.table_metadata
else:
log.warn('unsupported-selection-field', type=field.type)
self._status_message = 'Unsupported field.type={}'.format(field.type)
return False
return True
def _decode_traffic_treatment(self):
self.output = fd.get_out_port(self._flow)
if self.output > OFPP_MAX:
log.warn('Logical-output-ports-not-supported')
return False
for act in fd.get_actions(self._flow):
if act.type == fd.OUTPUT:
pass # Handled earlier
elif act.type == POP_VLAN:
# log.info('*** action.type == POP_VLAN')
self.pop_vlan += 1
elif act.type == PUSH_VLAN:
# log.info('*** action.type == PUSH_VLAN', value=act.push)
# TODO: Do we want to test the ethertype for support?
tpid = act.push.ethertype
self.push_vlan_tpid.append(tpid)
elif act.type == SET_FIELD:
# log.info('*** action.type == SET_FIELD', value=act.set_field.field)
assert (act.set_field.field.oxm_class == ofp.OFPXMC_OPENFLOW_BASIC)
field = act.set_field.field.ofb_field
if field.type == VLAN_VID:
self.push_vlan_id.append(field.vlan_vid & 0xfff)
else:
# TODO: May need to modify ce-preservation
log.warn('unsupported-action', action=act)
self._status_message = 'Unsupported action.type={}'.format(act.type)
return False
return True
@staticmethod
def drop_missing_flows(device_id, valid_flow_ids):
flow_table = _existing_flow_entries.get(device_id, None)
if flow_table is not None:
flows_to_drop = [flow for flow_id, flow in flow_table.items() if flow_id not in valid_flow_ids]
for flow in flows_to_drop:
try:
yield flow.remove()
except Exception as e:
log.exception('stale-flow', flow=flow, e=e)
@inlineCallbacks
def remove(self):
"""
Remove this flow entry from the list of existing entries and drop EVC
if needed
"""
# Remove from exiting table list
device_id = self._handler.device_id
flow_id = self._flow.id
flow_table = _existing_flow_entries.get(device_id, None)
if flow_table is not None and flow_id in flow_table:
del flow_table[flow_id]
if len(flow_table) == 0:
del _existing_flow_entries[device_id]
# Remove flow from the hardware
evc_map, self.evc_map = self.evc_map, None
evc, self.evc = self.evc, None
if evc_map is not None:
yield evc_map.delete()
if evc is not None:
yield evc.delete()
self._flow = None
self._handler = None
returnValue('done')
######################################################
# Bulk operations
@staticmethod
def enable_all():
# TODO: May want to be device specific or regex based
raise NotImplemented("TODO: Implement this")
@staticmethod
def disable_all():
# TODO: May want to be device specific or regex based
raise NotImplemented("TODO: Implement this")
@staticmethod
def remove_all():
"""
Remove all matching EVCs and associated EVC MAPs from hardware
:param regex_: (String) Regular expression for name matching
"""
raise NotImplemented("TODO: Implement this")