PONSIM: PON simulator with real dataplane handling
This was needed because neither CPQD nor OVS can handle
both zero-tagged packets and 802.1ad (QinQ).
- extensive unittest proves ponsim functional correctness
(for the common use-cases needed in the PON scenario)
- integrated with frameio and coupled with a rather
simple gRPC NBI, ponsim can be operated from Voltha
just like a real PON system
- posim_olt/_onu adapters added to Voltha to work on
ponsim
- CLI can be used to preprovision and activate a PONSIM
instance (e.g., preprovision_olt -t ponsim_olt -H localhost:50060)
- Some of olt-oftest:olt-complex testcases can be run on
the ponsim device (in vagrant/Ubuntu environment),
but there are some remaining issues to work out:
- barrier calls in OF do not guaranty that the flow
is already installed on the device. This is a generic
issue, not just for ponsim.
- the whole test framework is inconsistent about zero-
tagged vs. untagged frames at the ONUs, while ponsim
is rather pedantica and does exactly what was defined
in the flows.
Change-Id: I0dd564c932416ae1566935492134cb5b08113bdc
diff --git a/ponsim/__init__.py b/ponsim/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/ponsim/__init__.py
diff --git a/ponsim/grpc_server.py b/ponsim/grpc_server.py
new file mode 100644
index 0000000..20b5dab
--- /dev/null
+++ b/ponsim/grpc_server.py
@@ -0,0 +1,75 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import grpc
+import structlog
+from concurrent import futures
+
+from common.utils.grpc_utils import twisted_async
+from voltha.protos import third_party
+from voltha.protos.ponsim_pb2 import PonSimServicer, \
+ add_PonSimServicer_to_server, PonSimDeviceInfo
+from google.protobuf.empty_pb2 import Empty
+
+_ = third_party
+
+log = structlog.get_logger()
+
+
+class FlowUpdateHandler(PonSimServicer):
+
+ def __init__(self, thread_pool, ponsim):
+ self.thread_pool = thread_pool
+ self.ponsim = ponsim
+
+ @twisted_async
+ def GetDeviceInfo(self, request, context):
+ log.info('get-device-info')
+ ports = self.ponsim.get_ports()
+ return PonSimDeviceInfo(
+ nni_port=ports[0],
+ uni_ports=ports[1:]
+ )
+
+ @twisted_async
+ def UpdateFlowTable(self, request, context):
+ log.info('flow-table-update', request=request, port=request.port)
+ if request.port == 0:
+ # by convention this is the olt port
+ self.ponsim.olt_install_flows(request.flows)
+ else:
+ self.ponsim.onu_install_flows(request.port, request.flows)
+ return Empty()
+
+class GrpcServer(object):
+
+ def __init__(self, port, ponsim):
+ self.port = port
+ self.thread_pool = futures.ThreadPoolExecutor(max_workers=10)
+ self.server = grpc.server(self.thread_pool)
+ self.ponsim = ponsim
+
+ def start(self):
+ log.debug('starting')
+ handler = FlowUpdateHandler(self.thread_pool, self.ponsim)
+ add_PonSimServicer_to_server(handler, self.server)
+ self.server.add_insecure_port('[::]:%s' % self.port)
+ self.server.start()
+ log.info('started')
+
+ def stop(self, grace=0):
+ log.debug('stopping')
+ self.server.stop(grace)
+ log.info('stopped')
diff --git a/ponsim/main.py b/ponsim/main.py
new file mode 100755
index 0000000..f3d0ffd
--- /dev/null
+++ b/ponsim/main.py
@@ -0,0 +1,214 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+PON Simulator process, able to move packets across NNI and UNIs, as well
+as take MGMT calls via gRPC.
+It can only work on Linux.
+"""
+import argparse
+import os
+
+import yaml
+from twisted.internet.defer import inlineCallbacks
+
+from common.structlog_setup import setup_logging
+from grpc_server import GrpcServer
+from ponsim import PonSim
+from realio import RealIo
+
+defs = dict(
+ config=os.environ.get('CONFIG', './ponsim.yml'),
+ grpc_port=int(os.environ.get('GRPC_PORT', 50060)),
+ name=os.environ.get('NAME', 'pon1'),
+ onus=int(os.environ.get("ONUS", 1))
+)
+
+
+def load_config(args):
+ path = args.config
+ if path.startswith('.'):
+ dir = os.path.dirname(os.path.abspath(__file__))
+ path = os.path.join(dir, path)
+ path = os.path.abspath(path)
+ with open(path) as fd:
+ config = yaml.load(fd)
+ return config
+
+
+banner = r'''
+ ____ __ __ _ ____ __ _ _
+( _ \ / \ ( ( \/ ___)( )( \/ )
+ ) __/( O )/ /\___ \ )( / \/ \
+(__) \__/ \_)__)(____/(__)\_)(_/
+'''
+
+def print_banner(log):
+ for line in banner.strip('\n').splitlines():
+ log.info(line)
+ log.info('(to stop: press Ctrl-C)')
+
+
+def parse_args():
+
+ parser = argparse.ArgumentParser()
+
+ _help = ('Path to chameleon.yml config file (default: %s). '
+ 'If relative, it is relative to main.py of chameleon.'
+ % defs['config'])
+ parser.add_argument('-c', '--config',
+ dest='config',
+ action='store',
+ default=defs['config'],
+ help=_help)
+
+ _help = ('port number of the GRPC service exposed by voltha (default: %s)'
+ % defs['grpc_port'])
+ parser.add_argument('-g', '--grpc-port',
+ dest='grpc_port',
+ action='store',
+ default=defs['grpc_port'],
+ help=_help)
+
+ _help = ('number of ONUs to simulate (default: %d)' % defs['onus'])
+ parser.add_argument('-o', '--onus',
+ dest='onus',
+ action='store',
+ type=int,
+ default=defs['onus'],
+ help=_help)
+
+ _help = ('name of the PON natework used as a prefix for all network'
+ ' resource created on behalf of the PON (default: %s)' %
+ defs['name'])
+ parser.add_argument('-N', '--name',
+ dest='name',
+ action='store',
+ default=defs['name'],
+ help=_help)
+
+ _help = "suppress debug and info logs"
+ parser.add_argument('-q', '--quiet',
+ dest='quiet',
+ action='count',
+ help=_help)
+
+ _help = 'enable verbose logging'
+ parser.add_argument('-v', '--verbose',
+ dest='verbose',
+ action='count',
+ help=_help)
+
+ _help = 'omit startup banner log lines'
+ parser.add_argument('-n', '--no-banner',
+ dest='no_banner',
+ action='store_true',
+ default=False,
+ help=_help)
+
+ args = parser.parse_args()
+
+ return args
+
+
+class Main(object):
+
+ def __init__(self):
+
+ self.args = args = parse_args()
+ self.config = load_config(args)
+
+ verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
+ self.log = setup_logging(self.config.get('logging', {}),
+ args.name,
+ verbosity_adjust=verbosity_adjust)
+
+ # components
+ self.io = None
+ self.ponsim = None
+ self.grpc_server = None
+
+ if not args.no_banner:
+ print_banner(self.log)
+
+ self.startup_components()
+
+ def start(self):
+ self.start_reactor() # will not return except Keyboard interrupt
+
+ @inlineCallbacks
+ def startup_components(self):
+ try:
+ self.log.info('starting-internal-components')
+
+ iface_map = self.setup_networking_assets(self.args.name,
+ self.args.onus)
+ self.io = yield RealIo(iface_map).start()
+ self.ponsim = PonSim(self.args.onus, self.io.egress)
+ self.io.register_ponsim(self.ponsim)
+
+ self.grpc_server = GrpcServer(self.args.grpc_port, self.ponsim)
+ yield self.grpc_server.start()
+
+ self.log.info('started-internal-services')
+
+ except Exception, e:
+ self.log.exception('startup-failed', e=e)
+
+ @inlineCallbacks
+ def shutdown_components(self):
+ """Execute before the reactor is shut down"""
+ self.log.info('exiting-on-keyboard-interrupt')
+ if self.io is not None:
+ self.io.stop()
+ self.teardown_networking_assets(self.args.name, self.args.onus)
+ if self.grpc_server is not None:
+ yield self.grpc_server.stop()
+
+ def start_reactor(self):
+ from twisted.internet import reactor
+ reactor.callWhenRunning(
+ lambda: self.log.info('twisted-reactor-started'))
+ reactor.addSystemEventTrigger('before', 'shutdown',
+ self.shutdown_components)
+ reactor.run()
+
+ def setup_networking_assets(self, prefix, n_unis):
+ # setup veth pairs for NNI and each UNI, using prefix and port numbers
+ port_map = dict()
+ for portnum in [0] + range(128, 128 + n_unis):
+ external_name = '%s_%d' % (prefix, portnum)
+ internal_name = external_name + 'sim'
+ os.system('sudo ip link add dev {} type veth peer name {}'.format(
+ external_name, internal_name
+ ))
+ os.system('sudo ip link set {} up'.format(external_name))
+ os.system('sudo ip link set {} up'.format(internal_name))
+ if portnum == 0:
+ os.system('sudo brctl addif ponmgmt {}'.format(external_name))
+ port_map[portnum] = internal_name
+ return port_map
+
+ def teardown_networking_assets(self, prefix, n_unis):
+ # undo all the networking stuff
+ for portnum in [0] + range(128, 128 + n_unis):
+ external_name = '%s_%d' % (prefix, portnum)
+ os.system('sudo ip link del {}'.format(external_name))
+
+
+if __name__ == '__main__':
+ Main().start()
diff --git a/ponsim/ponsim.py b/ponsim/ponsim.py
new file mode 100644
index 0000000..c855e39
--- /dev/null
+++ b/ponsim/ponsim.py
@@ -0,0 +1,236 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Simple PON Simulator which would not be needed if openvswitch could do
+802.1ad (QinQ), which it cannot (the reason is beyond me), or if CPQD could
+handle 0-tagged packets (no comment).
+"""
+import structlog
+from scapy.layers.inet import IP, UDP
+from scapy.layers.l2 import Ether, Dot1Q
+from scapy.packet import Packet
+
+from common.frameio.frameio import hexify
+from voltha.protos import third_party
+from voltha.core.flow_decomposer import *
+_ = third_party
+
+
+def ipv4int2str(ipv4int):
+ return '{}.{}.{}.{}'.format(
+ (ipv4int >> 24) & 0xff,
+ (ipv4int >> 16) & 0xff,
+ (ipv4int >> 8) & 0xff,
+ ipv4int & 0xff
+ )
+
+
+class SimDevice(object):
+
+ def __init__(self, name, logical_port_no):
+ self.name = name
+ self.logical_port_no = logical_port_no
+ self.links = dict()
+ self.flows = list()
+ self.log = structlog.get_logger(name=name,
+ logical_port_no=logical_port_no)
+
+ def link(self, port, egress_fun):
+ self.links.setdefault(port, []).append(egress_fun)
+
+ def ingress(self, port, frame):
+ self.log.debug('ingress', ingress_port=port)
+ outcome = self.process_frame(port, frame)
+ if outcome is not None:
+ egress_port, egress_frame = outcome
+ forwarded = 0
+ links = self.links.get(egress_port)
+ if links is not None:
+ for fun in links:
+ forwarded += 1
+ self.log.debug('forwarding', egress_port=egress_port)
+ fun(egress_port, egress_frame)
+ if not forwarded:
+ self.log.debug('no-one-to-forward-to', egress_port=egress_port)
+ else:
+ self.log.debug('dropped')
+
+ def install_flows(self, flows):
+ # store flows in precedence order so we can roll down on frame arrival
+ self.flows = sorted(flows, key=lambda fm: fm.priority, reverse=True)
+
+ def process_frame(self, ingress_port, ingress_frame):
+ for flow in self.flows:
+ if self.is_match(flow, ingress_port, ingress_frame):
+ egress_port, egress_frame = self.process_actions(
+ flow, ingress_frame)
+ return egress_port, egress_frame
+ return None
+
+ @staticmethod
+ def is_match(flow, ingress_port, frame):
+
+ def get_non_shim_ether_type(f):
+ if f.haslayer(Dot1Q):
+ f = f.getlayer(Dot1Q)
+ return f.type
+
+ def get_vlan_pcp(f):
+ if f.haslayer(Dot1Q):
+ return f.getlayer(Dot1Q).prio
+
+ def get_ip_proto(f):
+ if f.haslayer(IP):
+ return f.getlayer(IP).proto
+
+ def get_ipv4_dst(f):
+ if f.haslayer(IP):
+ return f.getlayer(IP).dst
+
+ def get_udp_dst(f):
+ if f.haslayer(UDP):
+ return f.getlayer(UDP).dport
+
+ for field in get_ofb_fields(flow):
+
+ if field.type == IN_PORT:
+ if field.port != ingress_port:
+ return False
+
+ elif field.type == ETH_TYPE:
+ if field.eth_type != get_non_shim_ether_type(frame):
+ return False
+
+ elif field.type == IP_PROTO:
+ if field.ip_proto != get_ip_proto(frame):
+ return False
+
+ elif field.type == VLAN_VID:
+ expected_vlan = field.vlan_vid
+ tagged = frame.haslayer(Dot1Q)
+ if bool(expected_vlan & 4096) != bool(tagged):
+ return False
+ if tagged:
+ actual_vid = frame.getlayer(Dot1Q).vlan
+ if actual_vid != expected_vlan & 4095:
+ return False
+
+ elif field.type == VLAN_PCP:
+ if field.vlan_pcp != get_vlan_pcp(frame):
+ return False
+
+ elif field.type == IPV4_DST:
+ if ipv4int2str(field.ipv4_dst) != get_ipv4_dst(frame):
+ return False
+
+ elif field.type == UDP_DST:
+ if field.udsp_dst != get_udp_dst(frame):
+ return False
+
+ else:
+ raise NotImplementedError('field.type=%d' % field.type)
+
+ return True
+
+ @staticmethod
+ def process_actions(flow, frame):
+ egress_port = None
+ for action in get_actions(flow):
+
+ if action.type == OUTPUT:
+ egress_port = action.output.port
+
+ elif action.type == POP_VLAN:
+ if frame.haslayer(Dot1Q):
+ shim = frame.getlayer(Dot1Q)
+ frame = Ether(
+ src=frame.src,
+ dst=frame.dst,
+ type=shim.type) / shim.payload
+
+ elif action.type == PUSH_VLAN:
+ frame = (
+ Ether(src=frame.src, dst=frame.dst,
+ type=action.push.ethertype) /
+ Dot1Q(type=frame.type) /
+ frame.payload
+ )
+
+ elif action.type == SET_FIELD:
+ assert (action.set_field.field.oxm_class ==
+ ofp.OFPXMC_OPENFLOW_BASIC)
+ field = action.set_field.field.ofb_field
+
+ if field.type == VLAN_VID:
+ shim = frame.getlayer(Dot1Q)
+ shim.vlan = field.vlan_vid & 4095
+
+ elif field.type == VLAN_PCP:
+ shim = frame.getlayer(Dot1Q)
+ shim.prio = field.vlan_pcp
+
+ else:
+ raise NotImplementedError('set_field.field.type=%d'
+ % field.type)
+
+ else:
+ raise NotImplementedError('action.type=%d' % action.type)
+
+ return egress_port, frame
+
+
+class PonSim(object):
+
+ def __init__(self, onus, egress_fun):
+ self.egress_fun = egress_fun
+
+ # Create OLT and hook NNI port up for egress
+ self.olt = SimDevice('olt', 0)
+ self.olt.link(2, lambda _, frame: self.egress_fun(0, frame))
+ self.devices = dict()
+ self.devices[0] = self.olt
+
+ # Create ONUs of the requested number and hook them up with OLT
+ # and with egress fun
+ def mk_egress_fun(port_no):
+ return lambda _, frame: self.egress_fun(port_no, frame)
+
+ def mk_onu_ingress(onu):
+ return lambda _, frame: onu.ingress(1, frame)
+
+ for i in range(onus):
+ port_no = 128 + i
+ onu = SimDevice('onu%d' % i, port_no)
+ onu.link(1, lambda _, frame: self.olt.ingress(1, frame))
+ onu.link(2, mk_egress_fun(port_no))
+ self.olt.link(1, mk_onu_ingress(onu))
+ self.devices[port_no] = onu
+
+ def get_ports(self):
+ return sorted(self.devices.keys())
+
+ def olt_install_flows(self, flows):
+ self.olt.install_flows(flows)
+
+ def onu_install_flows(self, onu_port, flows):
+ self.devices[onu_port].install_flows(flows)
+
+ def ingress(self, port, frame):
+ if not isinstance(frame, Packet):
+ frame = Ether(frame)
+ self.devices[port].ingress(2, frame)
+
diff --git a/ponsim/ponsim.yml b/ponsim/ponsim.yml
new file mode 100644
index 0000000..0f4ad04
--- /dev/null
+++ b/ponsim/ponsim.yml
@@ -0,0 +1,31 @@
+logging:
+ version: 1
+
+ formatters:
+ brief:
+ format: '%(message)s'
+ default:
+ format: '%(asctime)s.%(msecs)03d %(levelname)-8s %(module)s.%(funcName)s %(message)s'
+ datefmt: '%Y%m%dT%H%M%S'
+
+ handlers:
+ console:
+ class : logging.StreamHandler
+ level: DEBUG
+ formatter: default
+ stream: ext://sys.stdout
+ null:
+ class: logging.NullHandler
+
+ loggers:
+ amqp:
+ handlers: [null]
+ propagate: False
+ conf:
+ handlers: [null]
+ propagate: False
+ '': # root logger
+ handlers: [console]
+ level: INFO # this can be bumped up/down by -q and -v command line
+ # options
+ propagate: False
diff --git a/ponsim/realio.py b/ponsim/realio.py
new file mode 100644
index 0000000..bca3382
--- /dev/null
+++ b/ponsim/realio.py
@@ -0,0 +1,68 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from scapy.layers.l2 import Ether
+from scapy.packet import Packet
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+from common.frameio.frameio import FrameIOManager, hexify
+
+log = structlog.get_logger()
+
+
+class RealIo(object):
+
+ def __init__(self, iface_map):
+ self.port_to_iface_name = iface_map
+ self.iface_name_to_port = dict((n, p) for p, n in iface_map.items())
+ self.frame_io = FrameIOManager()
+ self.ponsim = None
+ self.io_ports = dict()
+
+ @inlineCallbacks
+ def start(self):
+ log.debug('starting')
+ yield self.frame_io.start()
+ for port, iface_name in self.port_to_iface_name.items():
+ io_port = self.frame_io.add_interface(iface_name, self.ingress)
+ self.io_ports[port] = io_port
+ log.info('started')
+ returnValue(self)
+
+ def stop(self):
+ log.debug('stopping')
+ for port in self.io_ports.values():
+ self.frame_io.del_interface(port.iface_name)
+ self.frame_io.stop()
+ log.info('stopped')
+
+ def register_ponsim(self, ponsim):
+ self.ponsim = ponsim
+
+ def ingress(self, io_port, frame):
+ port = self.iface_name_to_port.get(io_port.iface_name)
+ log.debug('ingress', port=port, iface_name=io_port.iface_name,
+ frame=hexify(frame))
+ decoded_frame = Ether(frame)
+ if self.ponsim is not None:
+ self.ponsim.ingress(port, decoded_frame)
+
+ def egress(self, port, frame):
+ if isinstance(frame, Packet):
+ frame = str(frame)
+ io_port = self.io_ports[port]
+ log.debug('sending', port=port, frame=hexify(frame))
+ io_port.send(frame)
diff --git a/ponsim/test_ponsim.py b/ponsim/test_ponsim.py
new file mode 100644
index 0000000..365468b
--- /dev/null
+++ b/ponsim/test_ponsim.py
@@ -0,0 +1,372 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from unittest import TestCase, main
+
+from scapy.layers.inet import IP
+from scapy.layers.l2 import Ether, Dot1Q, EAPOL
+
+from ponsim import PonSim
+from voltha.extensions.IGMP import IGMP_TYPE_V3_MEMBERSHIP_REPORT, IGMPv3gr, \
+ IGMPv3, IGMP_TYPE_MEMBERSHIP_QUERY
+from voltha.extensions.IGMP import IGMP_V3_GR_TYPE_EXCLUDE
+from voltha.protos import third_party
+from voltha.core.flow_decomposer import *
+_ = third_party
+
+
+class TestPonSim(TestCase):
+
+ def setUp(self):
+ self.output = []
+ self.pon = PonSim(onus=2, egress_fun=lambda port, frame:
+ self.output.append((port, frame)))
+
+ def reset_output(self):
+ while self.output:
+ self.output.pop()
+
+ def ingress_frame(self, frame, ports=None):
+ if ports is None:
+ ports = self.pon.get_ports()
+ if isinstance(ports, int):
+ ports = [ports]
+ for port in ports:
+ self.pon.ingress(port, frame)
+
+ def assert_dont_pass(self, frame, ports=None):
+ self.reset_output()
+ self.ingress_frame(frame, ports)
+ self.assertEqual(self.output, [])
+
+ def assert_untagged_frames_dont_pass(self, ports=None):
+ self.assert_dont_pass(Ether(), ports=ports)
+
+ def test_basics(self):
+ self.assertEqual(self.pon.get_ports(), [0, 128, 129])
+
+ def test_by_default_no_traffic_passes(self):
+ self.assert_untagged_frames_dont_pass()
+
+ def test_downstream_unicast_forwarding(self):
+
+ self.pon.olt_install_flows([
+ mk_flow_stat(
+ match_fields=[in_port(2), vlan_vid(4096 + 1000)],
+ actions=[pop_vlan(), output(1)]
+ )
+ ])
+ self.pon.onu_install_flows(128, [
+ mk_flow_stat(
+ match_fields=[in_port(1), vlan_vid(4096 + 128)],
+ actions=[set_field(vlan_vid(4096 + 0)), output(2)]
+ )
+ ])
+
+ # untagged frames shall not get through
+ self.assert_untagged_frames_dont_pass()
+
+ # incorrect single- or double-tagged frames don't pass
+ self.assert_dont_pass(Ether() / Dot1Q(vlan=1000) / IP())
+ self.assert_dont_pass(Ether() / Dot1Q(vlan=128) / IP())
+ self.assert_dont_pass(
+ Ether() / Dot1Q(vlan=128) / Dot1Q(vlan=1000) / IP())
+ self.assert_dont_pass(
+ Ether() / Dot1Q(vlan=1000) / Dot1Q(vlan=129) / IP())
+
+ # properly tagged downstream frame gets through and pops up at port 128
+ # as untagged
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame = Ether(**kw) / Dot1Q(vlan=1000) / Dot1Q(vlan=128) / IP()
+ out_frame = Ether(**kw) / Dot1Q(vlan=0) / IP()
+
+ self.ingress_frame(in_frame)
+ self.assertEqual(self.output, [(128, out_frame)])
+
+ def test_upstream_unicast_forwarding(self):
+
+ self.pon.onu_install_flows(128, [
+ mk_flow_stat(
+ match_fields=[in_port(2), vlan_vid(4096 + 0)],
+ actions=[set_field(vlan_vid(4096 + 128)), output(1)]
+ )
+ ])
+ self.pon.olt_install_flows([
+ mk_flow_stat(
+ match_fields=[in_port(1), vlan_vid(4096 + 128)],
+ actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 1000)),
+ output(2)]
+ )
+ ])
+
+ # untagged frames shall not get through
+ self.assert_untagged_frames_dont_pass()
+
+ # incorrect single- or double-tagged frames don't pass
+ self.assert_dont_pass(Ether() / Dot1Q(vlan=1000) / IP())
+ self.assert_dont_pass(Ether() / Dot1Q(vlan=128) / IP())
+ self.assert_dont_pass(
+ Ether() / Dot1Q(vlan=1000) / Dot1Q(vlan=128) / IP())
+ self.assert_dont_pass(
+ Ether() / Dot1Q(vlan=129) / Dot1Q(vlan=1000) / IP())
+
+ # properly tagged downstream frame gets through and pops up at port 128
+ # as untagged
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame = Ether(**kw) / Dot1Q(vlan=0) / IP()
+ out_frame = Ether(**kw) / Dot1Q(vlan=1000) / Dot1Q(vlan=128) / IP()
+
+ self.ingress_frame(in_frame)
+ self.assertEqual(self.output, [(0, out_frame)])
+
+
+ def setup_all_flows(self):
+
+ self.pon.olt_install_flows([
+ mk_flow_stat(
+ priority=2000,
+ match_fields=[in_port(2), vlan_vid(4096 + 4000), vlan_pcp(0)],
+ actions=[pop_vlan(), output(1)]
+ ),
+ mk_flow_stat(
+ priority=2000,
+ match_fields=[in_port(1), eth_type(0x888e)],
+ actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 4000)),
+ output(2)]
+ ),
+ mk_flow_stat(
+ priority=1000,
+ match_fields=[in_port(1), eth_type(0x800), ip_proto(2)],
+ actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 4000)),
+ output(2)]
+ ),
+ mk_flow_stat(
+ priority=1000,
+ match_fields=[in_port(1), eth_type(0x800), ip_proto(17),
+ udp_dst(67)],
+ actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 4000)),
+ output(2)]
+ ),
+ mk_flow_stat(
+ priority=1000,
+ match_fields=[in_port(2), vlan_vid(4096 + 140)],
+ actions=[pop_vlan(), output(1)]
+ ),
+ mk_flow_stat(
+ priority=500,
+ match_fields=[in_port(2), vlan_vid(4096 + 1000)],
+ actions=[pop_vlan(), output(1)]
+ ),
+ mk_flow_stat(
+ priority=500,
+ match_fields=[in_port(1), vlan_vid(4096 + 128)],
+ actions=[
+ push_vlan(0x8100), set_field(vlan_vid(4096 + 1000)),
+ output(2)]
+ ),
+ mk_flow_stat(
+ priority=500,
+ match_fields=[in_port(1), vlan_vid(4096 + 129)],
+ actions=[
+ push_vlan(0x8100), set_field(vlan_vid(4096 + 1000)),
+ output(2)]
+ ),
+ ])
+
+ self.pon.onu_install_flows(128, [
+ mk_flow_stat(
+ priority=500,
+ match_fields=[in_port(2), vlan_vid(4096 + 0)],
+ actions=[
+ set_field(vlan_vid(4096 + 128)), output(1)]
+ ),
+ mk_flow_stat(
+ priority=1000,
+ match_fields=[
+ in_port(1), eth_type(0x800), ipv4_dst(0xe4010102)],
+ actions=[output(2)]
+ ),
+ mk_flow_stat(
+ priority=1000,
+ match_fields=[
+ in_port(1), eth_type(0x800), ipv4_dst(0xe4010104)],
+ actions=[output(2)]
+ ),
+ mk_flow_stat(
+ priority=500,
+ match_fields=[in_port(1), vlan_vid(4096 + 128)],
+ actions=[set_field(vlan_vid(4096 + 0)), output(2)]
+ ),
+ mk_flow_stat(
+ priority=500,
+ match_fields=[in_port(2), vlan_vid(0)],
+ actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 128)),
+ output(1)]
+ )
+ ])
+
+ self.pon.onu_install_flows(129, [
+ mk_flow_stat(
+ priority=500,
+ match_fields=[in_port(2), vlan_vid(4096 + 0)],
+ actions=[
+ set_field(vlan_vid(4096 + 129)), output(1)]
+ ),
+ mk_flow_stat(
+ priority=1000,
+ match_fields=[
+ in_port(1), eth_type(0x800), ipv4_dst(0xe4010103)],
+ actions=[output(2)]
+ ),
+ mk_flow_stat(
+ priority=1000,
+ match_fields=[
+ in_port(1), eth_type(0x800), ipv4_dst(0xe4010104)],
+ actions=[output(2)]
+ ),
+ mk_flow_stat(
+ priority=500,
+ match_fields=[in_port(1), vlan_vid(4096 + 129)],
+ actions=[set_field(vlan_vid(4096 + 0)), output(2)]
+ ),
+ mk_flow_stat(
+ priority=500,
+ match_fields=[in_port(2), vlan_vid(0)],
+ actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 129)),
+ output(1)]
+ )
+ ])
+
+ def test_combo_block_untagged_downstream(self):
+ self.setup_all_flows()
+ self.assert_untagged_frames_dont_pass(ports=0)
+
+ def test_eapol_in(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame = Ether(**kw) / EAPOL(type=1)
+ out_frame1 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=128) / EAPOL(type=1)
+ out_frame2 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=129) / EAPOL(type=1)
+ self.ingress_frame(in_frame)
+ self.assertEqual(self.output, [(0, out_frame1), (0, out_frame2)])
+
+ def test_eapol_out(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame1 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=128) / EAPOL(type=1)
+ in_frame2 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=129) / EAPOL(type=1)
+ out_frame = Ether(**kw) / Dot1Q(vlan=0) / EAPOL(type=1)
+ self.ingress_frame(in_frame1)
+ self.ingress_frame(in_frame2)
+ self.assertEqual(self.output, [(128, out_frame), (129, out_frame)])
+
+ def test_igmp_in(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ mr = IGMPv3(type=IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
+ gaddr="224.0.0.1")
+ mr.grps = [
+ IGMPv3gr(rtype=IGMP_V3_GR_TYPE_EXCLUDE, mcaddr="228.1.1.3")]
+
+ in_frame = Ether(**kw) / IP() / mr
+ out_frame1 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=128) /\
+ in_frame.payload.copy()
+ out_frame2 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=129) /\
+ in_frame.payload.copy()
+ self.ingress_frame(in_frame)
+ self.assertEqual(self.output, [(0, out_frame1), (0, out_frame2)])
+
+ def test_igmp_out(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ mq = IGMPv3(type=IGMP_TYPE_MEMBERSHIP_QUERY, max_resp_code=120)
+ in_frame1 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=128) /\
+ IP() / mq.copy()
+ in_frame2 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=129) /\
+ IP() / mq.copy()
+ out_frame = Ether(**kw) / Dot1Q(vlan=0) / IP() / mq.copy()
+ self.ingress_frame(in_frame1)
+ self.ingress_frame(in_frame2)
+ self.assertEqual(self.output, [(128, out_frame), (129, out_frame)])
+
+ def test_combo_downstream_unicast_onu1(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame = Ether(**kw) / Dot1Q(vlan=1000) / Dot1Q(vlan=128) / IP()
+ out_frame = Ether(**kw) / Dot1Q(vlan=0) / IP()
+ self.reset_output()
+ self.ingress_frame(in_frame, ports=0)
+ self.assertEqual(self.output, [(128, out_frame)])
+
+ def test_combo_downstream_unicast_onu2(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame = Ether(**kw) / Dot1Q(vlan=1000) / Dot1Q(vlan=129) / IP()
+ out_frame = Ether(**kw) / Dot1Q(vlan=0) / IP()
+ self.reset_output()
+ self.ingress_frame(in_frame, ports=0)
+ self.assertEqual(self.output, [(129, out_frame)])
+
+ def test_combo_upstream_unicast_onu1(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame = Ether(**kw) / Dot1Q(vlan=1000) / Dot1Q(vlan=128) / IP()
+ out_frame = Ether(**kw) / Dot1Q(vlan=0) / IP()
+ self.ingress_frame(in_frame)
+ self.assertEqual(self.output, [(128, out_frame)])
+
+ def test_combo_upstream_unicast_onu2(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame = Ether(**kw) / Dot1Q(vlan=1000) / Dot1Q(vlan=129) / IP()
+ out_frame = Ether(**kw) / Dot1Q(vlan=0) / IP()
+ self.ingress_frame(in_frame)
+ self.assertEqual(self.output, [(129, out_frame)])
+
+ def test_combo_multicast_stream1(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame = Ether(**kw) / Dot1Q(vlan=140) / IP(dst='228.1.1.1')
+ self.ingress_frame(in_frame)
+ self.assertEqual(self.output, [])
+
+ def test_combo_multicast_stream2(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame = Ether(**kw) / Dot1Q(vlan=140) / IP(dst='228.1.1.2')
+ out_frame = Ether(**kw) / IP(dst='228.1.1.2')
+ self.ingress_frame(in_frame)
+ self.assertEqual(self.output, [(128, out_frame),])
+
+ def test_combo_multicast_stream3(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame = Ether(**kw) / Dot1Q(vlan=140) / IP(dst='228.1.1.3')
+ out_frame = Ether(**kw) / IP(dst='228.1.1.3')
+ self.ingress_frame(in_frame)
+ self.assertEqual(self.output, [(129, out_frame),])
+
+ def test_combo_multicast_stream4(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame = Ether(**kw) / Dot1Q(vlan=140) / IP(dst='228.1.1.4')
+ out_frame = Ether(**kw) / IP(dst='228.1.1.4')
+ self.ingress_frame(in_frame)
+ self.assertEqual(self.output, [(128, out_frame), (129, out_frame)])
+
+
+if __name__ == '__main__':
+ main()