PONSIM: PON simulator with real dataplane handling

This was needed because neither CPQD nor OVS can handle
both zero-tagged packets and 802.1ad (QinQ).

- extensive unittest proves ponsim functional correctness
  (for the common use-cases needed in the PON scenario)
- integrated with frameio and coupled with a rather
  simple gRPC NBI, ponsim can be operated from Voltha
  just like a real PON system
- ponsim_olt/_onu adapters added to Voltha to work on
  ponsim
- CLI can be used to preprovision and activate a PONSIM
  instance (e.g., preprovision_olt -t ponsim_olt -H localhost:50060)
- Some of olt-oftest:olt-complex testcases can be run on
  the ponsim device (in vagrant/Ubuntu environment),
  but there are some remaining issues to work out:
  - barrier calls in OF do not guarantee that the flow
    is already installed on the device. This is a generic
    issue, not just for ponsim.
  - the whole test framework is inconsistent about zero-
    tagged vs. untagged frames at the ONUs, while ponsim
    is rather pedantic and does exactly what was defined
    in the flows.

Change-Id: I0dd564c932416ae1566935492134cb5b08113bdc
diff --git a/ponsim/__init__.py b/ponsim/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/ponsim/__init__.py
diff --git a/ponsim/grpc_server.py b/ponsim/grpc_server.py
new file mode 100644
index 0000000..20b5dab
--- /dev/null
+++ b/ponsim/grpc_server.py
@@ -0,0 +1,75 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import grpc
+import structlog
+from concurrent import futures
+
+from common.utils.grpc_utils import twisted_async
+from voltha.protos import third_party
+from voltha.protos.ponsim_pb2 import PonSimServicer, \
+    add_PonSimServicer_to_server, PonSimDeviceInfo
+from google.protobuf.empty_pb2 import Empty
+
+_ = third_party
+
+log = structlog.get_logger()
+
+
+class FlowUpdateHandler(PonSimServicer):
+    """PonSimServicer that relays gRPC calls to the wrapped ponsim object."""
+    def __init__(self, thread_pool, ponsim):
+        self.thread_pool = thread_pool
+        self.ponsim = ponsim  # must offer get_ports / *_install_flows
+
+    @twisted_async
+    def GetDeviceInfo(self, request, context):
+        log.info('get-device-info')
+        ports = self.ponsim.get_ports()
+        return PonSimDeviceInfo(
+            nni_port=ports[0],  # first entry is reported as the NNI port
+            uni_ports=ports[1:]  # every remaining entry is reported as a UNI
+        )
+
+    @twisted_async
+    def UpdateFlowTable(self, request, context):
+        log.info('flow-table-update', request=request, port=request.port)
+        if request.port == 0:
+            # by convention this is the olt port
+            self.ponsim.olt_install_flows(request.flows)
+        else:  # any other port number goes to the ONU flow installer
+            self.ponsim.onu_install_flows(request.port, request.flows)
+        return Empty()
+
+class GrpcServer(object):
+    """Owns a grpc server on the given port, serving a FlowUpdateHandler."""
+    def __init__(self, port, ponsim):
+        self.port = port
+        self.thread_pool = futures.ThreadPoolExecutor(max_workers=10)
+        self.server = grpc.server(self.thread_pool)
+        self.ponsim = ponsim
+
+    def start(self):
+        log.debug('starting')
+        handler = FlowUpdateHandler(self.thread_pool, self.ponsim)
+        add_PonSimServicer_to_server(handler, self.server)
+        self.server.add_insecure_port('[::]:%s' % self.port)  # no credentials
+        self.server.start()
+        log.info('started')
+
+    def stop(self, grace=0):
+        log.debug('stopping')
+        self.server.stop(grace)  # grace is handed straight to grpc's stop()
+        log.info('stopped')
diff --git a/ponsim/main.py b/ponsim/main.py
new file mode 100755
index 0000000..f3d0ffd
--- /dev/null
+++ b/ponsim/main.py
@@ -0,0 +1,214 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+PON Simulator process, able to move packets across NNI and UNIs, as well
+as take MGMT calls via gRPC.
+It can only work on Linux.
+"""
+import argparse
+import os
+
+import yaml
+from twisted.internet.defer import inlineCallbacks
+
+from common.structlog_setup import setup_logging
+from grpc_server import GrpcServer
+from ponsim import PonSim
+from realio import RealIo
+
+defs = dict(  # argparse defaults; each one can be overridden via os.environ
+    config=os.environ.get('CONFIG', './ponsim.yml'),
+    grpc_port=int(os.environ.get('GRPC_PORT', 50060)),
+    name=os.environ.get('NAME', 'pon1'),
+    onus=int(os.environ.get("ONUS", 1))
+)
+
+
+def load_config(args):
+    """Load args.config as YAML; '.'-prefixed paths are relative to this dir."""
+    path = args.config
+    if path.startswith('.'):
+        base_dir = os.path.dirname(os.path.abspath(__file__))
+        path = os.path.join(base_dir, path)
+    path = os.path.abspath(path)
+    with open(path) as fd:
+        return yaml.safe_load(fd)  # safe_load: never build arbitrary objects
+
+
+banner = r'''
+ ____   __   __ _  ____  __  _  _
+(  _ \ /  \ (  ( \/ ___)(  )( \/ )
+ ) __/(  O )/    /\___ \ )( / \/ \
+(__)   \__/ \_)__)(____/(__)\_)(_/
+'''
+
+def print_banner(log):  # log the banner rows, then the stop hint
+    rows = banner.strip('\n').splitlines()
+    for text in rows + ['(to stop: press Ctrl-C)']:
+        log.info(text)
+
+
+def parse_args():
+    """Build the ponsim command-line parser and return the parsed namespace."""
+    parser = argparse.ArgumentParser()
+
+    _help = ('Path to ponsim.yml config file (default: %s). '
+             'If relative, it is relative to main.py of ponsim.'
+             % defs['config'])
+    parser.add_argument('-c', '--config',
+                        dest='config',
+                        action='store',
+                        default=defs['config'],
+                        help=_help)
+
+    _help = ('port number of the GRPC service exposed by ponsim (default: %s)'
+             % defs['grpc_port'])
+    parser.add_argument('-g', '--grpc-port',
+                        dest='grpc_port',
+                        type=int,  # keep CLI values int, like the default
+                        default=defs['grpc_port'],
+                        help=_help)
+
+    _help = ('number of ONUs to simulate (default: %d)' % defs['onus'])
+    parser.add_argument('-o', '--onus',
+                        dest='onus',
+                        action='store',
+                        type=int,
+                        default=defs['onus'],
+                        help=_help)
+
+    _help = ('name of the PON network used as a prefix for all network'
+             ' resource created on behalf of the PON (default: %s)' %
+             defs['name'])
+    parser.add_argument('-N', '--name',
+                        dest='name',
+                        action='store',
+                        default=defs['name'],
+                        help=_help)
+
+    _help = "suppress debug and info logs"
+    parser.add_argument('-q', '--quiet',
+                        dest='quiet',
+                        action='count',  # stays None when the flag is absent
+                        help=_help)
+
+    _help = 'enable verbose logging'
+    parser.add_argument('-v', '--verbose',
+                        dest='verbose',
+                        action='count',  # stays None when the flag is absent
+                        help=_help)
+
+    _help = 'omit startup banner log lines'
+    parser.add_argument('-n', '--no-banner',
+                        dest='no_banner',
+                        action='store_true',
+                        default=False,
+                        help=_help)
+
+    args = parser.parse_args()
+
+    return args
+
+
+class Main(object):
+    """Top-level application object: builds components and runs the reactor."""
+    def __init__(self):
+        """Parse arguments, load config, configure logging, start components."""
+        self.args = args = parse_args()
+        self.config = load_config(args)
+
+        verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
+        self.log = setup_logging(self.config.get('logging', {}),
+                                 args.name,
+                                 verbosity_adjust=verbosity_adjust)
+
+        # components
+        self.io = None
+        self.ponsim = None
+        self.grpc_server = None
+
+        if not args.no_banner:
+            print_banner(self.log)
+
+        self.startup_components()
+
+    def start(self):
+        self.start_reactor()  # will not return except Keyboard interrupt
+
+    @inlineCallbacks
+    def startup_components(self):
+        try:
+            self.log.info('starting-internal-components')
+
+            iface_map = self.setup_networking_assets(self.args.name,
+                                                     self.args.onus)
+            self.io = yield RealIo(iface_map).start()
+            self.ponsim = PonSim(self.args.onus, self.io.egress)
+            self.io.register_ponsim(self.ponsim)
+
+            self.grpc_server = GrpcServer(self.args.grpc_port, self.ponsim)
+            yield self.grpc_server.start()
+
+            self.log.info('started-internal-services')
+
+        except Exception as e:  # startup boundary: log it, do not re-raise
+            self.log.exception('startup-failed', e=e)
+
+    @inlineCallbacks
+    def shutdown_components(self):
+        """Execute before the reactor is shut down"""
+        self.log.info('exiting-on-keyboard-interrupt')
+        if self.io is not None:
+            self.io.stop()
+        self.teardown_networking_assets(self.args.name, self.args.onus)
+        if self.grpc_server is not None:
+            yield self.grpc_server.stop()
+
+    def start_reactor(self):
+        from twisted.internet import reactor
+        reactor.callWhenRunning(
+            lambda: self.log.info('twisted-reactor-started'))
+        reactor.addSystemEventTrigger('before', 'shutdown',
+                                      self.shutdown_components)
+        reactor.run()
+
+    def setup_networking_assets(self, prefix, n_unis):
+        # setup veth pairs for NNI and each UNI, using prefix and port numbers
+        port_map = dict()
+        for portnum in [0] + list(range(128, 128 + n_unis)):
+            external_name = '%s_%d' % (prefix, portnum)
+            internal_name = external_name + 'sim'
+            os.system('sudo ip link add dev {} type veth peer name {}'.format(
+                external_name, internal_name
+            ))
+            os.system('sudo ip link set {} up'.format(external_name))
+            os.system('sudo ip link set {} up'.format(internal_name))
+            if portnum == 0:  # only the NNI's external end joins the bridge
+                os.system('sudo brctl addif ponmgmt {}'.format(external_name))
+            port_map[portnum] = internal_name  # the simulator uses this end
+        return port_map
+
+    def teardown_networking_assets(self, prefix, n_unis):
+        # undo all the networking stuff
+        for portnum in [0] + list(range(128, 128 + n_unis)):
+            external_name = '%s_%d' % (prefix, portnum)
+            os.system('sudo ip link del {}'.format(external_name))
+
+
+if __name__ == '__main__':
+    Main().start()
diff --git a/ponsim/ponsim.py b/ponsim/ponsim.py
new file mode 100644
index 0000000..c855e39
--- /dev/null
+++ b/ponsim/ponsim.py
@@ -0,0 +1,236 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Simple PON Simulator which would not be needed if openvswitch could do
+802.1ad (QinQ), which it cannot (the reason is beyond me), or if CPQD could
+handle 0-tagged packets (no comment).
+"""
+import structlog
+from scapy.layers.inet import IP, UDP
+from scapy.layers.l2 import Ether, Dot1Q
+from scapy.packet import Packet
+
+from common.frameio.frameio import hexify
+from voltha.protos import third_party
+from voltha.core.flow_decomposer import *
+_ = third_party
+
+
+def ipv4int2str(ipv4int):
+    """Render a 32-bit integer as a dotted-quad IPv4 string."""
+    octets = [
+        str((ipv4int >> shift) & 0xff)
+        for shift in (24, 16, 8, 0)
+    ]
+    return '.'.join(octets)
+
+
+class SimDevice(object):
+    """Simulated device forwarding frames per its installed flow table."""
+    def __init__(self, name, logical_port_no):
+        self.name = name
+        self.logical_port_no = logical_port_no
+        self.links = dict()  # egress port -> list of callables(port, frame)
+        self.flows = list()  # install_flows keeps these highest priority first
+        self.log = structlog.get_logger(name=name,
+                                        logical_port_no=logical_port_no)
+
+    def link(self, port, egress_fun):
+        self.links.setdefault(port, []).append(egress_fun)
+
+    def ingress(self, port, frame):
+        self.log.debug('ingress', ingress_port=port)
+        outcome = self.process_frame(port, frame)
+        if outcome is not None:
+            egress_port, egress_frame = outcome
+            forwarded = 0
+            links = self.links.get(egress_port)
+            if links is not None:
+                for fun in links:  # every callable linked to the port gets it
+                    forwarded += 1
+                    self.log.debug('forwarding', egress_port=egress_port)
+                    fun(egress_port, egress_frame)
+            if not forwarded:
+                self.log.debug('no-one-to-forward-to', egress_port=egress_port)
+        else:  # no flow matched the frame
+            self.log.debug('dropped')
+
+    def install_flows(self, flows):
+        # store flows in precedence order so we can roll down on frame arrival
+        self.flows = sorted(flows, key=lambda fm: fm.priority, reverse=True)
+
+    def process_frame(self, ingress_port, ingress_frame):
+        for flow in self.flows:  # first (highest priority) match wins
+            if self.is_match(flow, ingress_port, ingress_frame):
+                egress_port, egress_frame = self.process_actions(
+                    flow, ingress_frame)
+                return egress_port, egress_frame
+        return None
+
+    @staticmethod
+    def is_match(flow, ingress_port, frame):
+        """Return True iff the frame satisfies every OFB match field of flow."""
+        def get_non_shim_ether_type(f):
+            if f.haslayer(Dot1Q):
+                f = f.getlayer(Dot1Q)
+            return f.type
+
+        def get_vlan_pcp(f):
+            if f.haslayer(Dot1Q):
+                return f.getlayer(Dot1Q).prio
+
+        def get_ip_proto(f):
+            if f.haslayer(IP):
+                return f.getlayer(IP).proto
+
+        def get_ipv4_dst(f):
+            if f.haslayer(IP):
+                return f.getlayer(IP).dst
+
+        def get_udp_dst(f):
+            if f.haslayer(UDP):
+                return f.getlayer(UDP).dport
+
+        for field in get_ofb_fields(flow):
+
+            if field.type == IN_PORT:
+                if field.port != ingress_port:
+                    return False
+
+            elif field.type == ETH_TYPE:
+                if field.eth_type != get_non_shim_ether_type(frame):
+                    return False
+
+            elif field.type == IP_PROTO:
+                if field.ip_proto != get_ip_proto(frame):
+                    return False
+
+            elif field.type == VLAN_VID:
+                expected_vlan = field.vlan_vid
+                tagged = frame.haslayer(Dot1Q)
+                if bool(expected_vlan & 4096) != bool(tagged):  # 4096: tagged
+                    return False
+                if tagged:
+                    actual_vid = frame.getlayer(Dot1Q).vlan
+                    if actual_vid != expected_vlan & 4095:
+                        return False
+
+            elif field.type == VLAN_PCP:
+                if field.vlan_pcp != get_vlan_pcp(frame):
+                    return False
+
+            elif field.type == IPV4_DST:
+                if ipv4int2str(field.ipv4_dst) != get_ipv4_dst(frame):
+                    return False
+
+            elif field.type == UDP_DST:
+                if field.udp_dst != get_udp_dst(frame):
+                    return False
+
+            else:
+                raise NotImplementedError('field.type=%d' % field.type)
+
+        return True
+
+    @staticmethod
+    def process_actions(flow, frame):
+        egress_port = None  # stays None if the flow has no OUTPUT action
+        for action in get_actions(flow):
+
+            if action.type == OUTPUT:
+                egress_port = action.output.port
+
+            elif action.type == POP_VLAN:
+                if frame.haslayer(Dot1Q):  # untagged frames pass unchanged
+                    shim = frame.getlayer(Dot1Q)
+                    frame = Ether(
+                        src=frame.src,
+                        dst=frame.dst,
+                        type=shim.type) / shim.payload
+
+            elif action.type == PUSH_VLAN:
+                frame = (
+                    Ether(src=frame.src, dst=frame.dst,
+                          type=action.push.ethertype) /
+                    Dot1Q(type=frame.type) /
+                    frame.payload
+                )
+
+            elif action.type == SET_FIELD:
+                assert (action.set_field.field.oxm_class ==
+                        ofp.OFPXMC_OPENFLOW_BASIC)
+                field = action.set_field.field.ofb_field
+
+                if field.type == VLAN_VID:
+                    shim = frame.getlayer(Dot1Q)  # outermost tag is rewritten
+                    shim.vlan = field.vlan_vid & 4095
+
+                elif field.type == VLAN_PCP:
+                    shim = frame.getlayer(Dot1Q)
+                    shim.prio = field.vlan_pcp
+
+                else:
+                    raise NotImplementedError('set_field.field.type=%d'
+                                              % field.type)
+
+            else:
+                raise NotImplementedError('action.type=%d' % action.type)
+
+        return egress_port, frame
+
+
+class PonSim(object):
+    """One OLT SimDevice (port 0) plus `onus` ONU SimDevices (ports 128+)."""
+    def __init__(self, onus, egress_fun):
+        self.egress_fun = egress_fun
+
+        # Create OLT and hook NNI port up for egress
+        self.olt = SimDevice('olt', 0)
+        self.olt.link(2, lambda _, frame: self.egress_fun(0, frame))
+        self.devices = dict()  # external port number -> SimDevice
+        self.devices[0] = self.olt
+
+        # Create ONUs of the requested number and hook them up with OLT
+        # and with egress fun
+        def mk_egress_fun(port_no):  # factory pins port_no for each ONU
+            return lambda _, frame: self.egress_fun(port_no, frame)
+
+        def mk_onu_ingress(onu):  # factory pins the onu for each link
+            return lambda _, frame: onu.ingress(1, frame)
+
+        for i in range(onus):
+            port_no = 128 + i
+            onu = SimDevice('onu%d' % i, port_no)
+            onu.link(1, lambda _, frame: self.olt.ingress(1, frame))
+            onu.link(2, mk_egress_fun(port_no))
+            self.olt.link(1, mk_onu_ingress(onu))  # OLT port 1 fans out to all
+            self.devices[port_no] = onu
+
+    def get_ports(self):
+        return sorted(self.devices.keys())
+
+    def olt_install_flows(self, flows):
+        self.olt.install_flows(flows)
+
+    def onu_install_flows(self, onu_port, flows):
+        self.devices[onu_port].install_flows(flows)  # KeyError if unknown port
+
+    def ingress(self, port, frame):
+        if not isinstance(frame, Packet):
+            frame = Ether(frame)
+        self.devices[port].ingress(2, frame)  # outside frames enter on port 2
+
diff --git a/ponsim/ponsim.yml b/ponsim/ponsim.yml
new file mode 100644
index 0000000..0f4ad04
--- /dev/null
+++ b/ponsim/ponsim.yml
@@ -0,0 +1,31 @@
+logging:
+    version: 1
+
+    formatters:
+      brief:
+        format: '%(message)s'
+      default:
+        format: '%(asctime)s.%(msecs)03d %(levelname)-8s %(module)s.%(funcName)s %(message)s'
+        datefmt: '%Y%m%dT%H%M%S'
+
+    handlers:
+        console:
+            class : logging.StreamHandler
+            level: DEBUG
+            formatter: default
+            stream: ext://sys.stdout
+        null:
+            class: logging.NullHandler
+
+    loggers:
+        amqp:
+            handlers: [null]
+            propagate: False
+        conf:
+            handlers: [null]
+            propagate: False
+        '': # root logger
+            handlers: [console]
+            level: INFO # this can be bumped up/down by -q and -v command line
+                        # options
+            propagate: False
diff --git a/ponsim/realio.py b/ponsim/realio.py
new file mode 100644
index 0000000..bca3382
--- /dev/null
+++ b/ponsim/realio.py
@@ -0,0 +1,68 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from scapy.layers.l2 import Ether
+from scapy.packet import Packet
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+from common.frameio.frameio import FrameIOManager, hexify
+
+log = structlog.get_logger()
+
+
+class RealIo(object):
+    """Bridge between FrameIOManager interfaces and ponsim port numbers."""
+    def __init__(self, iface_map):
+        self.port_to_iface_name = iface_map  # port number -> interface name
+        self.iface_name_to_port = dict((n, p) for p, n in iface_map.items())
+        self.frame_io = FrameIOManager()
+        self.ponsim = None  # set later through register_ponsim()
+        self.io_ports = dict()  # port number -> object from add_interface()
+
+    @inlineCallbacks
+    def start(self):
+        log.debug('starting')
+        yield self.frame_io.start()
+        for port, iface_name in self.port_to_iface_name.items():
+            io_port = self.frame_io.add_interface(iface_name, self.ingress)
+            self.io_ports[port] = io_port
+        log.info('started')
+        returnValue(self)  # allows `io = yield RealIo(...).start()`
+
+    def stop(self):
+        log.debug('stopping')
+        for port in self.io_ports.values():
+            self.frame_io.del_interface(port.iface_name)
+        self.frame_io.stop()
+        log.info('stopped')
+
+    def register_ponsim(self, ponsim):
+        self.ponsim = ponsim
+
+    def ingress(self, io_port, frame):
+        port = self.iface_name_to_port.get(io_port.iface_name)  # None if unknown
+        log.debug('ingress', port=port, iface_name=io_port.iface_name,
+                  frame=hexify(frame))
+        decoded_frame = Ether(frame)
+        if self.ponsim is not None:  # not forwarded until a ponsim is set
+            self.ponsim.ingress(port, decoded_frame)
+
+    def egress(self, port, frame):
+        if isinstance(frame, Packet):
+            frame = str(frame)  # Packet objects are converted before sending
+        io_port = self.io_ports[port]
+        log.debug('sending', port=port, frame=hexify(frame))
+        io_port.send(frame)
diff --git a/ponsim/test_ponsim.py b/ponsim/test_ponsim.py
new file mode 100644
index 0000000..365468b
--- /dev/null
+++ b/ponsim/test_ponsim.py
@@ -0,0 +1,372 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from unittest import TestCase, main
+
+from scapy.layers.inet import IP
+from scapy.layers.l2 import Ether, Dot1Q, EAPOL
+
+from ponsim import PonSim
+from voltha.extensions.IGMP import IGMP_TYPE_V3_MEMBERSHIP_REPORT, IGMPv3gr, \
+    IGMPv3, IGMP_TYPE_MEMBERSHIP_QUERY
+from voltha.extensions.IGMP import IGMP_V3_GR_TYPE_EXCLUDE
+from voltha.protos import third_party
+from voltha.core.flow_decomposer import *
+_ = third_party
+
+
+class TestPonSim(TestCase):
+
+    def setUp(self):
+        self.output = []
+        self.pon = PonSim(onus=2, egress_fun=lambda port, frame:
+            self.output.append((port, frame)))
+
+    def reset_output(self):
+        while self.output:
+            self.output.pop()
+
+    def ingress_frame(self, frame, ports=None):
+        if ports is None:
+            ports = self.pon.get_ports()
+        if isinstance(ports, int):
+            ports = [ports]
+        for port in ports:
+            self.pon.ingress(port, frame)
+
+    def assert_dont_pass(self, frame, ports=None):
+        self.reset_output()
+        self.ingress_frame(frame, ports)
+        self.assertEqual(self.output, [])
+
+    def assert_untagged_frames_dont_pass(self, ports=None):
+        self.assert_dont_pass(Ether(), ports=ports)
+
+    def test_basics(self):
+        self.assertEqual(self.pon.get_ports(), [0, 128, 129])
+
+    def test_by_default_no_traffic_passes(self):
+        self.assert_untagged_frames_dont_pass()
+
+    def test_downstream_unicast_forwarding(self):
+
+        self.pon.olt_install_flows([
+            mk_flow_stat(
+                match_fields=[in_port(2), vlan_vid(4096 + 1000)],
+                actions=[pop_vlan(), output(1)]
+            )
+        ])
+        self.pon.onu_install_flows(128, [
+            mk_flow_stat(
+                match_fields=[in_port(1), vlan_vid(4096 + 128)],
+                actions=[set_field(vlan_vid(4096 + 0)), output(2)]
+            )
+        ])
+
+        # untagged frames shall not get through
+        self.assert_untagged_frames_dont_pass()
+
+        # incorrect single- or double-tagged frames don't pass
+        self.assert_dont_pass(Ether() / Dot1Q(vlan=1000) / IP())
+        self.assert_dont_pass(Ether() / Dot1Q(vlan=128) / IP())
+        self.assert_dont_pass(
+            Ether() / Dot1Q(vlan=128) / Dot1Q(vlan=1000) / IP())
+        self.assert_dont_pass(
+            Ether() / Dot1Q(vlan=1000) / Dot1Q(vlan=129) / IP())
+
+        # properly tagged downstream frame gets through and pops up at port 128
+        # as untagged
+        kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+        in_frame = Ether(**kw) / Dot1Q(vlan=1000) / Dot1Q(vlan=128) / IP()
+        out_frame = Ether(**kw) / Dot1Q(vlan=0) / IP()
+
+        self.ingress_frame(in_frame)
+        self.assertEqual(self.output, [(128, out_frame)])
+
+    def test_upstream_unicast_forwarding(self):
+
+        self.pon.onu_install_flows(128, [
+            mk_flow_stat(
+                match_fields=[in_port(2), vlan_vid(4096 + 0)],
+                actions=[set_field(vlan_vid(4096 + 128)), output(1)]
+            )
+        ])
+        self.pon.olt_install_flows([
+            mk_flow_stat(
+                match_fields=[in_port(1), vlan_vid(4096 + 128)],
+                actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 1000)),
+                         output(2)]
+            )
+        ])
+
+        # untagged frames shall not get through
+        self.assert_untagged_frames_dont_pass()
+
+        # incorrect single- or double-tagged frames don't pass
+        self.assert_dont_pass(Ether() / Dot1Q(vlan=1000) / IP())
+        self.assert_dont_pass(Ether() / Dot1Q(vlan=128) / IP())
+        self.assert_dont_pass(
+            Ether() / Dot1Q(vlan=1000) / Dot1Q(vlan=128) / IP())
+        self.assert_dont_pass(
+            Ether() / Dot1Q(vlan=129) / Dot1Q(vlan=1000) / IP())
+
+        # properly tagged downstream frame gets through and pops up at port 128
+        # as untagged
+        kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+        in_frame = Ether(**kw) / Dot1Q(vlan=0) / IP()
+        out_frame = Ether(**kw) / Dot1Q(vlan=1000) / Dot1Q(vlan=128) / IP()
+
+        self.ingress_frame(in_frame)
+        self.assertEqual(self.output, [(0, out_frame)])
+
+
+    def setup_all_flows(self):
+
+        self.pon.olt_install_flows([
+            mk_flow_stat(
+                priority=2000,
+                match_fields=[in_port(2), vlan_vid(4096 + 4000), vlan_pcp(0)],
+                actions=[pop_vlan(), output(1)]
+            ),
+            mk_flow_stat(
+                priority=2000,
+                match_fields=[in_port(1), eth_type(0x888e)],
+                actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 4000)),
+                         output(2)]
+            ),
+            mk_flow_stat(
+                priority=1000,
+                match_fields=[in_port(1), eth_type(0x800), ip_proto(2)],
+                actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 4000)),
+                         output(2)]
+            ),
+            mk_flow_stat(
+                priority=1000,
+                match_fields=[in_port(1), eth_type(0x800), ip_proto(17),
+                              udp_dst(67)],
+                actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 4000)),
+                         output(2)]
+            ),
+            mk_flow_stat(
+                priority=1000,
+                match_fields=[in_port(2), vlan_vid(4096 + 140)],
+                actions=[pop_vlan(), output(1)]
+            ),
+            mk_flow_stat(
+                priority=500,
+                match_fields=[in_port(2), vlan_vid(4096 + 1000)],
+                actions=[pop_vlan(), output(1)]
+            ),
+            mk_flow_stat(
+                priority=500,
+                match_fields=[in_port(1), vlan_vid(4096 + 128)],
+                actions=[
+                    push_vlan(0x8100), set_field(vlan_vid(4096 + 1000)),
+                    output(2)]
+            ),
+            mk_flow_stat(
+                priority=500,
+                match_fields=[in_port(1), vlan_vid(4096 + 129)],
+                actions=[
+                    push_vlan(0x8100), set_field(vlan_vid(4096 + 1000)),
+                    output(2)]
+            ),
+        ])
+
+        self.pon.onu_install_flows(128, [
+            mk_flow_stat(
+                priority=500,
+                match_fields=[in_port(2), vlan_vid(4096 + 0)],
+                actions=[
+                    set_field(vlan_vid(4096 + 128)), output(1)]
+            ),
+            mk_flow_stat(
+                priority=1000,
+                match_fields=[
+                    in_port(1), eth_type(0x800), ipv4_dst(0xe4010102)],
+                actions=[output(2)]
+            ),
+            mk_flow_stat(
+                priority=1000,
+                match_fields=[
+                    in_port(1), eth_type(0x800), ipv4_dst(0xe4010104)],
+                actions=[output(2)]
+            ),
+            mk_flow_stat(
+                priority=500,
+                match_fields=[in_port(1), vlan_vid(4096 + 128)],
+                actions=[set_field(vlan_vid(4096 + 0)), output(2)]
+            ),
+            mk_flow_stat(
+                priority=500,
+                match_fields=[in_port(2), vlan_vid(0)],
+                actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 128)),
+                         output(1)]
+            )
+        ])
+
+        self.pon.onu_install_flows(129, [
+            mk_flow_stat(
+                priority=500,
+                match_fields=[in_port(2), vlan_vid(4096 + 0)],
+                actions=[
+                    set_field(vlan_vid(4096 + 129)), output(1)]
+            ),
+            mk_flow_stat(
+                priority=1000,
+                match_fields=[
+                    in_port(1), eth_type(0x800), ipv4_dst(0xe4010103)],
+                actions=[output(2)]
+            ),
+            mk_flow_stat(
+                priority=1000,
+                match_fields=[
+                    in_port(1), eth_type(0x800), ipv4_dst(0xe4010104)],
+                actions=[output(2)]
+            ),
+            mk_flow_stat(
+                priority=500,
+                match_fields=[in_port(1), vlan_vid(4096 + 129)],
+                actions=[set_field(vlan_vid(4096 + 0)), output(2)]
+            ),
+            mk_flow_stat(
+                priority=500,
+                match_fields=[in_port(2), vlan_vid(0)],
+                actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 129)),
+                         output(1)]
+            )
+        ])
+
+    def test_combo_block_untagged_downstream(self):
+        self.setup_all_flows()  # install the flows from setup_all_flows first
+        self.assert_untagged_frames_dont_pass(ports=0)  # helper is given ports=0 only
+
+    def test_eapol_in(self):
+        """Untagged EAPOL must exit port 0 as 4000/128 and 4000/129."""
+        self.setup_all_flows()
+        hw = {'src': '00:00:00:11:11:11', 'dst': '00:00:00:22:22:22'}
+        tags = [Dot1Q(vlan=4000) / Dot1Q(vlan=onu) for onu in (128, 129)]
+        expected = [(0, Ether(**hw) / tag / EAPOL(type=1)) for tag in tags]
+        self.ingress_frame(Ether(**hw) / EAPOL(type=1))
+        self.assertEqual(self.output, expected)
+
+    def test_eapol_out(self):
+        """EAPOL tagged 4000/N exits port N zero-tagged (N = 128, 129)."""
+        self.setup_all_flows()
+        hw = {'src': '00:00:00:11:11:11', 'dst': '00:00:00:22:22:22'}
+        expected = Ether(**hw) / Dot1Q(vlan=0) / EAPOL(type=1)
+        for onu in (128, 129):
+            tags = Dot1Q(vlan=4000) / Dot1Q(vlan=onu)
+            self.ingress_frame(Ether(**hw) / tags / EAPOL(type=1))
+        self.assertEqual(self.output, [(128, expected), (129, expected)])
+
+    def test_igmp_in(self):
+        """Untagged IGMPv3 report exits port 0 as 4000/128 and 4000/129."""
+        self.setup_all_flows()
+        hw = {'src': '00:00:00:11:11:11', 'dst': '00:00:00:22:22:22'}
+        report = IGMPv3(type=IGMP_TYPE_V3_MEMBERSHIP_REPORT,
+                        max_resp_code=30, gaddr="224.0.0.1")
+        report.grps = [IGMPv3gr(rtype=IGMP_V3_GR_TYPE_EXCLUDE,
+                                mcaddr="228.1.1.3")]
+        untagged = Ether(**hw) / IP() / report
+        # Every expected frame gets its own copy of the IP/IGMP payload.
+        expected = [
+            (0, Ether(**hw) / Dot1Q(vlan=4000) / Dot1Q(vlan=onu) /
+             untagged.payload.copy()) for onu in (128, 129)]
+        self.ingress_frame(untagged)
+        self.assertEqual(self.output, expected)
+
+    def test_igmp_out(self):
+        """IGMP query tagged 4000/N exits port N zero-tagged (N = 128, 129)."""
+        self.setup_all_flows()
+        hw = {'src': '00:00:00:11:11:11', 'dst': '00:00:00:22:22:22'}
+        query = IGMPv3(type=IGMP_TYPE_MEMBERSHIP_QUERY, max_resp_code=120)
+        expected = Ether(**hw) / Dot1Q(vlan=0) / IP() / query.copy()
+        # Inject one double-tagged query per ONU id.
+        for onu in (128, 129):
+            tags = Dot1Q(vlan=4000) / Dot1Q(vlan=onu)
+            self.ingress_frame(Ether(**hw) / tags / IP() / query.copy())
+        self.assertEqual(self.output,
+                         [(128, expected), (129, expected)])
+
+    def test_combo_downstream_unicast_onu1(self):
+        """IP tagged 1000/128 sent with ports=0 exits port 128 zero-tagged."""
+        self.setup_all_flows()
+        hw = {'src': '00:00:00:11:11:11', 'dst': '00:00:00:22:22:22'}
+        sent = Ether(**hw) / Dot1Q(vlan=1000) / Dot1Q(vlan=128) / IP()
+        self.reset_output()
+        self.ingress_frame(sent, ports=0)
+        self.assertEqual(self.output, [(128, Ether(**hw) / Dot1Q(vlan=0) / IP())])
+
+    def test_combo_downstream_unicast_onu2(self):
+        """IP tagged 1000/129 sent with ports=0 exits port 129 zero-tagged."""
+        self.setup_all_flows()
+        hw = {'src': '00:00:00:11:11:11', 'dst': '00:00:00:22:22:22'}
+        sent = Ether(**hw) / Dot1Q(vlan=1000) / Dot1Q(vlan=129) / IP()
+        self.reset_output()
+        self.ingress_frame(sent, ports=0)
+        self.assertEqual(self.output, [(129, Ether(**hw) / Dot1Q(vlan=0) / IP())])
+
+    def test_combo_upstream_unicast_onu1(self):
+        """IP tagged 1000/128 must come out on port 128 with a zero tag."""
+        self.setup_all_flows()
+        hw = {'src': '00:00:00:11:11:11', 'dst': '00:00:00:22:22:22'}
+        sent = Ether(**hw) / Dot1Q(vlan=1000) / Dot1Q(vlan=128) / IP()
+        self.ingress_frame(sent)  # NOTE(review): frames match the downstream test
+        self.assertEqual(self.output, [(128, Ether(**hw) / Dot1Q(vlan=0) / IP())])
+
+    def test_combo_upstream_unicast_onu2(self):
+        """IP tagged 1000/129 must come out on port 129 with a zero tag."""
+        self.setup_all_flows()
+        hw = {'src': '00:00:00:11:11:11', 'dst': '00:00:00:22:22:22'}
+        sent = Ether(**hw) / Dot1Q(vlan=1000) / Dot1Q(vlan=129) / IP()
+        self.ingress_frame(sent)  # NOTE(review): frames match the downstream test
+        self.assertEqual(self.output, [(129, Ether(**hw) / Dot1Q(vlan=0) / IP())])
+
+    def test_combo_multicast_stream1(self):
+        """A VLAN-140 frame addressed to 228.1.1.1 must yield no output."""
+        self.setup_all_flows()
+        hw = {'src': '00:00:00:11:11:11', 'dst': '00:00:00:22:22:22'}
+        self.ingress_frame(Ether(**hw) / Dot1Q(vlan=140) / IP(dst='228.1.1.1'))
+        self.assertEqual(self.output, [])
+
+    def test_combo_multicast_stream2(self):
+        """A VLAN-140 frame to 228.1.1.2 must exit port 128 only, untagged."""
+        self.setup_all_flows()
+        hw = {'src': '00:00:00:11:11:11', 'dst': '00:00:00:22:22:22'}
+        expected = [(128, Ether(**hw) / IP(dst='228.1.1.2'))]
+        self.ingress_frame(Ether(**hw) / Dot1Q(vlan=140) / IP(dst='228.1.1.2'))
+        self.assertEqual(self.output, expected)
+
+    def test_combo_multicast_stream3(self):
+        """A VLAN-140 frame to 228.1.1.3 must exit port 129 only, untagged."""
+        self.setup_all_flows()
+        hw = {'src': '00:00:00:11:11:11', 'dst': '00:00:00:22:22:22'}
+        expected = [(129, Ether(**hw) / IP(dst='228.1.1.3'))]
+        self.ingress_frame(Ether(**hw) / Dot1Q(vlan=140) / IP(dst='228.1.1.3'))
+        self.assertEqual(self.output, expected)
+
+    def test_combo_multicast_stream4(self):
+        """A VLAN-140 frame to 228.1.1.4 must exit ports 128 and 129 untagged."""
+        self.setup_all_flows()
+        hw = {'src': '00:00:00:11:11:11', 'dst': '00:00:00:22:22:22'}
+        untagged = Ether(**hw) / IP(dst='228.1.1.4')
+        self.ingress_frame(Ether(**hw) / Dot1Q(vlan=140) / IP(dst='228.1.1.4'))
+        self.assertEqual(self.output, [(128, untagged), (129, untagged)])
+
+
+if __name__ == '__main__':  # only when executed as a script, not on import
+    main()  # NOTE(review): `main` is imported outside this chunk; presumably unittest's