PONSIM: PON simulator with real dataplane handling

This was needed because neither CPQD nor OVS can handle
both zero-tagged packets and 802.1ad (QinQ).

- extensive unittest proves ponsim functional correctness
  (for the common use-cases needed in the PON scenario)
- integrated with frameio and coupled with a rather
  simple gRPC NBI, ponsim can be operated from Voltha
  just like a real PON system
- posim_olt/_onu adapters added to Voltha to work on
  ponsim
- CLI can be used to preprovision and activate a PONSIM
  instance (e.g., preprovision_olt -t ponsim_olt -H localhost:50060)
- Some of olt-oftest:olt-complex testcases can be run on
  the ponsim device (in vagrant/Ubuntu environment),
  but there are some remaining issues to work out:
  - barrier calls in OF do not guaranty that the flow
    is already installed on the device. This is a generic
    issue, not just for ponsim.
  - the whole test framework is inconsistent about zero-
    tagged vs. untagged frames at the ONUs, while ponsim
    is rather pedantica and does exactly what was defined
    in the flows.

Change-Id: I0dd564c932416ae1566935492134cb5b08113bdc
diff --git a/.gitignore b/.gitignore
index 50df298..bd10220 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,6 +60,7 @@
 
 # Files copied over during make
 ofagent/protos/third_party/google
+ponsim/protos/third_party/google
 
 # Voltha CLI hostory files
 .voltha_cli_history
diff --git a/chameleon/main.py b/chameleon/main.py
index 4d3939d..9031727 100755
--- a/chameleon/main.py
+++ b/chameleon/main.py
@@ -186,6 +186,7 @@
   \____| |_| |_|  \__,_| |_| |_| |_|  \___| |_|  \___|  \___/  |_| |_|
 
 '''
+
 def print_banner(log):
     for line in banner.strip('\n').splitlines():
         log.info(line)
diff --git a/cli/main.py b/cli/main.py
index 3f2dd7a..a54e4c4 100755
--- a/cli/main.py
+++ b/cli/main.py
@@ -287,12 +287,16 @@
         make_option('-m', '--mac-address', action='store', dest='mac_address',
                     default='00:0c:e2:31:40:00'),
         make_option('-i', '--ip-address', action='store', dest='ip_address'),
+        make_option('-H', '--host_and_port', action='store',
+                    dest='host_and_port'),
     ])
     def do_preprovision_olt(self, line, opts):
         """Preprovision a new OLT with given device type"""
         stub = voltha_pb2.VolthaLocalServiceStub(self.get_channel())
         kw = dict(type=opts.device_type)
-        if opts.ip_address:
+        if opts.host_and_port:
+            kw['host_and_port'] = opts.host_and_port
+        elif opts.ip_address:
             kw['ipv4_address'] = opts.ip_address
         elif opts.mac_address:
             kw['mac_address'] = opts.mac_address
diff --git a/cli/utils.py b/cli/utils.py
index b1aad1a..0d677d1 100644
--- a/cli/utils.py
+++ b/cli/utils.py
@@ -136,10 +136,17 @@
             table.add_cell(i, *field_printers[type](ofb))
 
         for instruction in flow['instructions']:
-            if instruction['type'] == 4:
+            itype = instruction['type']
+            if itype == 4:
                 for action in instruction['actions']['actions']:
-                    type = action['type'][len('OFPAT_'):]
-                    table.add_cell(i, *action_printers[type](action))
+                    atype = action['type'][len('OFPAT_'):]
+                    table.add_cell(i, *action_printers[atype](action))
+            elif itype == 1:
+                table.add_cell(i, 10000, 'goto-table',
+                               instruction['goto_table']['table_id'])
+            else:
+                raise NotImplementedError(
+                    'not handling instruction type {}'.format(itype))
 
     table.print_table(header, printfn)
 
diff --git a/ponsim/__init__.py b/ponsim/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/ponsim/__init__.py
diff --git a/ponsim/grpc_server.py b/ponsim/grpc_server.py
new file mode 100644
index 0000000..20b5dab
--- /dev/null
+++ b/ponsim/grpc_server.py
@@ -0,0 +1,75 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import grpc
+import structlog
+from concurrent import futures
+
+from common.utils.grpc_utils import twisted_async
+from voltha.protos import third_party
+from voltha.protos.ponsim_pb2 import PonSimServicer, \
+    add_PonSimServicer_to_server, PonSimDeviceInfo
+from google.protobuf.empty_pb2 import Empty
+
+_ = third_party
+
+log = structlog.get_logger()
+
+
+class FlowUpdateHandler(PonSimServicer):
+
+    def __init__(self, thread_pool, ponsim):
+        self.thread_pool = thread_pool
+        self.ponsim = ponsim
+
+    @twisted_async
+    def GetDeviceInfo(self, request, context):
+        log.info('get-device-info')
+        ports = self.ponsim.get_ports()
+        return PonSimDeviceInfo(
+            nni_port=ports[0],
+            uni_ports=ports[1:]
+        )
+
+    @twisted_async
+    def UpdateFlowTable(self, request, context):
+        log.info('flow-table-update', request=request, port=request.port)
+        if request.port == 0:
+            # by convention this is the olt port
+            self.ponsim.olt_install_flows(request.flows)
+        else:
+            self.ponsim.onu_install_flows(request.port, request.flows)
+        return Empty()
+
+class GrpcServer(object):
+
+    def __init__(self, port, ponsim):
+        self.port = port
+        self.thread_pool = futures.ThreadPoolExecutor(max_workers=10)
+        self.server = grpc.server(self.thread_pool)
+        self.ponsim = ponsim
+
+    def start(self):
+        log.debug('starting')
+        handler = FlowUpdateHandler(self.thread_pool, self.ponsim)
+        add_PonSimServicer_to_server(handler, self.server)
+        self.server.add_insecure_port('[::]:%s' % self.port)
+        self.server.start()
+        log.info('started')
+
+    def stop(self, grace=0):
+        log.debug('stopping')
+        self.server.stop(grace)
+        log.info('stopped')
diff --git a/ponsim/main.py b/ponsim/main.py
new file mode 100755
index 0000000..f3d0ffd
--- /dev/null
+++ b/ponsim/main.py
@@ -0,0 +1,214 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+PON Simulator process, able to move packets across NNI and UNIs, as well
+as take MGMT calls via gRPC.
+It can only work on Linux.
+"""
+import argparse
+import os
+
+import yaml
+from twisted.internet.defer import inlineCallbacks
+
+from common.structlog_setup import setup_logging
+from grpc_server import GrpcServer
+from ponsim import PonSim
+from realio import RealIo
+
+defs = dict(
+    config=os.environ.get('CONFIG', './ponsim.yml'),
+    grpc_port=int(os.environ.get('GRPC_PORT', 50060)),
+    name=os.environ.get('NAME', 'pon1'),
+    onus=int(os.environ.get("ONUS", 1))
+)
+
+
+def load_config(args):
+    path = args.config
+    if path.startswith('.'):
+        dir = os.path.dirname(os.path.abspath(__file__))
+        path = os.path.join(dir, path)
+    path = os.path.abspath(path)
+    with open(path) as fd:
+        config = yaml.load(fd)
+    return config
+
+
+banner = r'''
+ ____   __   __ _  ____  __  _  _
+(  _ \ /  \ (  ( \/ ___)(  )( \/ )
+ ) __/(  O )/    /\___ \ )( / \/ \
+(__)   \__/ \_)__)(____/(__)\_)(_/
+'''
+
+def print_banner(log):
+    for line in banner.strip('\n').splitlines():
+        log.info(line)
+    log.info('(to stop: press Ctrl-C)')
+
+
+def parse_args():
+
+    parser = argparse.ArgumentParser()
+
+    _help = ('Path to chameleon.yml config file (default: %s). '
+             'If relative, it is relative to main.py of chameleon.'
+             % defs['config'])
+    parser.add_argument('-c', '--config',
+                        dest='config',
+                        action='store',
+                        default=defs['config'],
+                        help=_help)
+
+    _help = ('port number of the GRPC service exposed by voltha (default: %s)'
+             % defs['grpc_port'])
+    parser.add_argument('-g', '--grpc-port',
+                        dest='grpc_port',
+                        action='store',
+                        default=defs['grpc_port'],
+                        help=_help)
+
+    _help = ('number of ONUs to simulate (default: %d)' % defs['onus'])
+    parser.add_argument('-o', '--onus',
+                        dest='onus',
+                        action='store',
+                        type=int,
+                        default=defs['onus'],
+                        help=_help)
+
+    _help = ('name of the PON natework used as a prefix for all network'
+             ' resource created on behalf of the PON (default: %s)' %
+             defs['name'])
+    parser.add_argument('-N', '--name',
+                        dest='name',
+                        action='store',
+                        default=defs['name'],
+                        help=_help)
+
+    _help = "suppress debug and info logs"
+    parser.add_argument('-q', '--quiet',
+                        dest='quiet',
+                        action='count',
+                        help=_help)
+
+    _help = 'enable verbose logging'
+    parser.add_argument('-v', '--verbose',
+                        dest='verbose',
+                        action='count',
+                        help=_help)
+
+    _help = 'omit startup banner log lines'
+    parser.add_argument('-n', '--no-banner',
+                        dest='no_banner',
+                        action='store_true',
+                        default=False,
+                        help=_help)
+
+    args = parser.parse_args()
+
+    return args
+
+
+class Main(object):
+
+    def __init__(self):
+
+        self.args = args = parse_args()
+        self.config = load_config(args)
+
+        verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
+        self.log = setup_logging(self.config.get('logging', {}),
+                                 args.name,
+                                 verbosity_adjust=verbosity_adjust)
+
+        # components
+        self.io = None
+        self.ponsim = None
+        self.grpc_server = None
+
+        if not args.no_banner:
+            print_banner(self.log)
+
+        self.startup_components()
+
+    def start(self):
+        self.start_reactor()  # will not return except Keyboard interrupt
+
+    @inlineCallbacks
+    def startup_components(self):
+        try:
+            self.log.info('starting-internal-components')
+
+            iface_map = self.setup_networking_assets(self.args.name,
+                                                     self.args.onus)
+            self.io = yield RealIo(iface_map).start()
+            self.ponsim = PonSim(self.args.onus, self.io.egress)
+            self.io.register_ponsim(self.ponsim)
+
+            self.grpc_server = GrpcServer(self.args.grpc_port, self.ponsim)
+            yield self.grpc_server.start()
+
+            self.log.info('started-internal-services')
+
+        except Exception, e:
+            self.log.exception('startup-failed', e=e)
+
+    @inlineCallbacks
+    def shutdown_components(self):
+        """Execute before the reactor is shut down"""
+        self.log.info('exiting-on-keyboard-interrupt')
+        if self.io is not None:
+            self.io.stop()
+        self.teardown_networking_assets(self.args.name, self.args.onus)
+        if self.grpc_server is not None:
+            yield self.grpc_server.stop()
+
+    def start_reactor(self):
+        from twisted.internet import reactor
+        reactor.callWhenRunning(
+            lambda: self.log.info('twisted-reactor-started'))
+        reactor.addSystemEventTrigger('before', 'shutdown',
+                                      self.shutdown_components)
+        reactor.run()
+
+    def setup_networking_assets(self, prefix, n_unis):
+        # setup veth pairs for NNI and each UNI, using prefix and port numbers
+        port_map = dict()
+        for portnum in [0] + range(128, 128 + n_unis):
+            external_name = '%s_%d' % (prefix, portnum)
+            internal_name = external_name + 'sim'
+            os.system('sudo ip link add dev {} type veth peer name {}'.format(
+                external_name, internal_name
+            ))
+            os.system('sudo ip link set {} up'.format(external_name))
+            os.system('sudo ip link set {} up'.format(internal_name))
+            if portnum == 0:
+                os.system('sudo brctl addif ponmgmt {}'.format(external_name))
+            port_map[portnum] = internal_name
+        return port_map
+
+    def teardown_networking_assets(self, prefix, n_unis):
+        # undo all the networking stuff
+        for portnum in [0] + range(128, 128 + n_unis):
+            external_name = '%s_%d' % (prefix, portnum)
+            os.system('sudo ip link del {}'.format(external_name))
+
+
+if __name__ == '__main__':
+    Main().start()
diff --git a/ponsim/ponsim.py b/ponsim/ponsim.py
new file mode 100644
index 0000000..c855e39
--- /dev/null
+++ b/ponsim/ponsim.py
@@ -0,0 +1,236 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Simple PON Simulator which would not be needed if openvswitch could do
+802.1ad (QinQ), which it cannot (the reason is beyond me), or if CPQD could
+handle 0-tagged packets (no comment).
+"""
+import structlog
+from scapy.layers.inet import IP, UDP
+from scapy.layers.l2 import Ether, Dot1Q
+from scapy.packet import Packet
+
+from common.frameio.frameio import hexify
+from voltha.protos import third_party
+from voltha.core.flow_decomposer import *
+_ = third_party
+
+
+def ipv4int2str(ipv4int):
+    return '{}.{}.{}.{}'.format(
+        (ipv4int >> 24) & 0xff,
+        (ipv4int >> 16) & 0xff,
+        (ipv4int >> 8) & 0xff,
+        ipv4int & 0xff
+    )
+
+
+class SimDevice(object):
+
+    def __init__(self, name, logical_port_no):
+        self.name = name
+        self.logical_port_no = logical_port_no
+        self.links = dict()
+        self.flows = list()
+        self.log = structlog.get_logger(name=name,
+                                        logical_port_no=logical_port_no)
+
+    def link(self, port, egress_fun):
+        self.links.setdefault(port, []).append(egress_fun)
+
+    def ingress(self, port, frame):
+        self.log.debug('ingress', ingress_port=port)
+        outcome = self.process_frame(port, frame)
+        if outcome is not None:
+            egress_port, egress_frame = outcome
+            forwarded = 0
+            links = self.links.get(egress_port)
+            if links is not None:
+                for fun in links:
+                    forwarded += 1
+                    self.log.debug('forwarding', egress_port=egress_port)
+                    fun(egress_port, egress_frame)
+            if not forwarded:
+                self.log.debug('no-one-to-forward-to', egress_port=egress_port)
+        else:
+            self.log.debug('dropped')
+
+    def install_flows(self, flows):
+        # store flows in precedence order so we can roll down on frame arrival
+        self.flows = sorted(flows, key=lambda fm: fm.priority, reverse=True)
+
+    def process_frame(self, ingress_port, ingress_frame):
+        for flow in self.flows:
+            if self.is_match(flow, ingress_port, ingress_frame):
+                egress_port, egress_frame = self.process_actions(
+                    flow, ingress_frame)
+                return egress_port, egress_frame
+        return None
+
+    @staticmethod
+    def is_match(flow, ingress_port, frame):
+
+        def get_non_shim_ether_type(f):
+            if f.haslayer(Dot1Q):
+                f = f.getlayer(Dot1Q)
+            return f.type
+
+        def get_vlan_pcp(f):
+            if f.haslayer(Dot1Q):
+                return f.getlayer(Dot1Q).prio
+
+        def get_ip_proto(f):
+            if f.haslayer(IP):
+                return f.getlayer(IP).proto
+
+        def get_ipv4_dst(f):
+            if f.haslayer(IP):
+                return f.getlayer(IP).dst
+
+        def get_udp_dst(f):
+            if f.haslayer(UDP):
+                return f.getlayer(UDP).dport
+
+        for field in get_ofb_fields(flow):
+
+            if field.type == IN_PORT:
+                if field.port != ingress_port:
+                    return False
+
+            elif field.type == ETH_TYPE:
+                if field.eth_type != get_non_shim_ether_type(frame):
+                    return False
+
+            elif field.type == IP_PROTO:
+                if field.ip_proto != get_ip_proto(frame):
+                    return False
+
+            elif field.type == VLAN_VID:
+                expected_vlan = field.vlan_vid
+                tagged = frame.haslayer(Dot1Q)
+                if bool(expected_vlan & 4096) != bool(tagged):
+                    return False
+                if tagged:
+                    actual_vid = frame.getlayer(Dot1Q).vlan
+                    if actual_vid != expected_vlan & 4095:
+                        return False
+
+            elif field.type == VLAN_PCP:
+                if field.vlan_pcp != get_vlan_pcp(frame):
+                    return False
+
+            elif field.type == IPV4_DST:
+                if ipv4int2str(field.ipv4_dst) != get_ipv4_dst(frame):
+                    return False
+
+            elif field.type == UDP_DST:
+                if field.udsp_dst != get_udp_dst(frame):
+                    return False
+
+            else:
+                raise NotImplementedError('field.type=%d' % field.type)
+
+        return True
+
+    @staticmethod
+    def process_actions(flow, frame):
+        egress_port = None
+        for action in get_actions(flow):
+
+            if action.type == OUTPUT:
+                egress_port = action.output.port
+
+            elif action.type == POP_VLAN:
+                if frame.haslayer(Dot1Q):
+                    shim = frame.getlayer(Dot1Q)
+                    frame = Ether(
+                        src=frame.src,
+                        dst=frame.dst,
+                        type=shim.type) / shim.payload
+
+            elif action.type == PUSH_VLAN:
+                frame = (
+                    Ether(src=frame.src, dst=frame.dst,
+                          type=action.push.ethertype) /
+                    Dot1Q(type=frame.type) /
+                    frame.payload
+                )
+
+            elif action.type == SET_FIELD:
+                assert (action.set_field.field.oxm_class ==
+                        ofp.OFPXMC_OPENFLOW_BASIC)
+                field = action.set_field.field.ofb_field
+
+                if field.type == VLAN_VID:
+                    shim = frame.getlayer(Dot1Q)
+                    shim.vlan = field.vlan_vid & 4095
+
+                elif field.type == VLAN_PCP:
+                    shim = frame.getlayer(Dot1Q)
+                    shim.prio = field.vlan_pcp
+
+                else:
+                    raise NotImplementedError('set_field.field.type=%d'
+                                              % field.type)
+
+            else:
+                raise NotImplementedError('action.type=%d' % action.type)
+
+        return egress_port, frame
+
+
+class PonSim(object):
+
+    def __init__(self, onus, egress_fun):
+        self.egress_fun = egress_fun
+
+        # Create OLT and hook NNI port up for egress
+        self.olt = SimDevice('olt', 0)
+        self.olt.link(2, lambda _, frame: self.egress_fun(0, frame))
+        self.devices = dict()
+        self.devices[0] = self.olt
+
+        # Create ONUs of the requested number and hook them up with OLT
+        # and with egress fun
+        def mk_egress_fun(port_no):
+            return lambda _, frame: self.egress_fun(port_no, frame)
+
+        def mk_onu_ingress(onu):
+            return lambda _, frame: onu.ingress(1, frame)
+
+        for i in range(onus):
+            port_no = 128 + i
+            onu = SimDevice('onu%d' % i, port_no)
+            onu.link(1, lambda _, frame: self.olt.ingress(1, frame))
+            onu.link(2, mk_egress_fun(port_no))
+            self.olt.link(1, mk_onu_ingress(onu))
+            self.devices[port_no] = onu
+
+    def get_ports(self):
+        return sorted(self.devices.keys())
+
+    def olt_install_flows(self, flows):
+        self.olt.install_flows(flows)
+
+    def onu_install_flows(self, onu_port, flows):
+        self.devices[onu_port].install_flows(flows)
+
+    def ingress(self, port, frame):
+        if not isinstance(frame, Packet):
+            frame = Ether(frame)
+        self.devices[port].ingress(2, frame)
+
diff --git a/ponsim/ponsim.yml b/ponsim/ponsim.yml
new file mode 100644
index 0000000..0f4ad04
--- /dev/null
+++ b/ponsim/ponsim.yml
@@ -0,0 +1,31 @@
+logging:
+    version: 1
+
+    formatters:
+      brief:
+        format: '%(message)s'
+      default:
+        format: '%(asctime)s.%(msecs)03d %(levelname)-8s %(module)s.%(funcName)s %(message)s'
+        datefmt: '%Y%m%dT%H%M%S'
+
+    handlers:
+        console:
+            class : logging.StreamHandler
+            level: DEBUG
+            formatter: default
+            stream: ext://sys.stdout
+        null:
+            class: logging.NullHandler
+
+    loggers:
+        amqp:
+            handlers: [null]
+            propagate: False
+        conf:
+            handlers: [null]
+            propagate: False
+        '': # root logger
+            handlers: [console]
+            level: INFO # this can be bumped up/down by -q and -v command line
+                        # options
+            propagate: False
diff --git a/ponsim/realio.py b/ponsim/realio.py
new file mode 100644
index 0000000..bca3382
--- /dev/null
+++ b/ponsim/realio.py
@@ -0,0 +1,68 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from scapy.layers.l2 import Ether
+from scapy.packet import Packet
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+from common.frameio.frameio import FrameIOManager, hexify
+
+log = structlog.get_logger()
+
+
+class RealIo(object):
+
+    def __init__(self, iface_map):
+        self.port_to_iface_name = iface_map
+        self.iface_name_to_port = dict((n, p) for p, n in iface_map.items())
+        self.frame_io = FrameIOManager()
+        self.ponsim = None
+        self.io_ports = dict()
+
+    @inlineCallbacks
+    def start(self):
+        log.debug('starting')
+        yield self.frame_io.start()
+        for port, iface_name in self.port_to_iface_name.items():
+            io_port = self.frame_io.add_interface(iface_name, self.ingress)
+            self.io_ports[port] = io_port
+        log.info('started')
+        returnValue(self)
+
+    def stop(self):
+        log.debug('stopping')
+        for port in self.io_ports.values():
+            self.frame_io.del_interface(port.iface_name)
+        self.frame_io.stop()
+        log.info('stopped')
+
+    def register_ponsim(self, ponsim):
+        self.ponsim = ponsim
+
+    def ingress(self, io_port, frame):
+        port = self.iface_name_to_port.get(io_port.iface_name)
+        log.debug('ingress', port=port, iface_name=io_port.iface_name,
+                  frame=hexify(frame))
+        decoded_frame = Ether(frame)
+        if self.ponsim is not None:
+            self.ponsim.ingress(port, decoded_frame)
+
+    def egress(self, port, frame):
+        if isinstance(frame, Packet):
+            frame = str(frame)
+        io_port = self.io_ports[port]
+        log.debug('sending', port=port, frame=hexify(frame))
+        io_port.send(frame)
diff --git a/ponsim/test_ponsim.py b/ponsim/test_ponsim.py
new file mode 100644
index 0000000..365468b
--- /dev/null
+++ b/ponsim/test_ponsim.py
@@ -0,0 +1,372 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from unittest import TestCase, main
+
+from scapy.layers.inet import IP
+from scapy.layers.l2 import Ether, Dot1Q, EAPOL
+
+from ponsim import PonSim
+from voltha.extensions.IGMP import IGMP_TYPE_V3_MEMBERSHIP_REPORT, IGMPv3gr, \
+    IGMPv3, IGMP_TYPE_MEMBERSHIP_QUERY
+from voltha.extensions.IGMP import IGMP_V3_GR_TYPE_EXCLUDE
+from voltha.protos import third_party
+from voltha.core.flow_decomposer import *
+_ = third_party
+
+
+class TestPonSim(TestCase):
+
+    def setUp(self):
+        self.output = []
+        self.pon = PonSim(onus=2, egress_fun=lambda port, frame:
+            self.output.append((port, frame)))
+
+    def reset_output(self):
+        while self.output:
+            self.output.pop()
+
+    def ingress_frame(self, frame, ports=None):
+        if ports is None:
+            ports = self.pon.get_ports()
+        if isinstance(ports, int):
+            ports = [ports]
+        for port in ports:
+            self.pon.ingress(port, frame)
+
+    def assert_dont_pass(self, frame, ports=None):
+        self.reset_output()
+        self.ingress_frame(frame, ports)
+        self.assertEqual(self.output, [])
+
+    def assert_untagged_frames_dont_pass(self, ports=None):
+        self.assert_dont_pass(Ether(), ports=ports)
+
+    def test_basics(self):
+        self.assertEqual(self.pon.get_ports(), [0, 128, 129])
+
+    def test_by_default_no_traffic_passes(self):
+        self.assert_untagged_frames_dont_pass()
+
+    def test_downstream_unicast_forwarding(self):
+
+        self.pon.olt_install_flows([
+            mk_flow_stat(
+                match_fields=[in_port(2), vlan_vid(4096 + 1000)],
+                actions=[pop_vlan(), output(1)]
+            )
+        ])
+        self.pon.onu_install_flows(128, [
+            mk_flow_stat(
+                match_fields=[in_port(1), vlan_vid(4096 + 128)],
+                actions=[set_field(vlan_vid(4096 + 0)), output(2)]
+            )
+        ])
+
+        # untagged frames shall not get through
+        self.assert_untagged_frames_dont_pass()
+
+        # incorrect single- or double-tagged frames don't pass
+        self.assert_dont_pass(Ether() / Dot1Q(vlan=1000) / IP())
+        self.assert_dont_pass(Ether() / Dot1Q(vlan=128) / IP())
+        self.assert_dont_pass(
+            Ether() / Dot1Q(vlan=128) / Dot1Q(vlan=1000) / IP())
+        self.assert_dont_pass(
+            Ether() / Dot1Q(vlan=1000) / Dot1Q(vlan=129) / IP())
+
+        # properly tagged downstream frame gets through and pops up at port 128
+        # as untagged
+        kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+        in_frame = Ether(**kw) / Dot1Q(vlan=1000) / Dot1Q(vlan=128) / IP()
+        out_frame = Ether(**kw) / Dot1Q(vlan=0) / IP()
+
+        self.ingress_frame(in_frame)
+        self.assertEqual(self.output, [(128, out_frame)])
+
+    def test_upstream_unicast_forwarding(self):
+
+        self.pon.onu_install_flows(128, [
+            mk_flow_stat(
+                match_fields=[in_port(2), vlan_vid(4096 + 0)],
+                actions=[set_field(vlan_vid(4096 + 128)), output(1)]
+            )
+        ])
+        self.pon.olt_install_flows([
+            mk_flow_stat(
+                match_fields=[in_port(1), vlan_vid(4096 + 128)],
+                actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 1000)),
+                         output(2)]
+            )
+        ])
+
+        # untagged frames shall not get through
+        self.assert_untagged_frames_dont_pass()
+
+        # incorrect single- or double-tagged frames don't pass
+        self.assert_dont_pass(Ether() / Dot1Q(vlan=1000) / IP())
+        self.assert_dont_pass(Ether() / Dot1Q(vlan=128) / IP())
+        self.assert_dont_pass(
+            Ether() / Dot1Q(vlan=1000) / Dot1Q(vlan=128) / IP())
+        self.assert_dont_pass(
+            Ether() / Dot1Q(vlan=129) / Dot1Q(vlan=1000) / IP())
+
+        # properly tagged downstream frame gets through and pops up at port 128
+        # as untagged
+        kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+        in_frame = Ether(**kw) / Dot1Q(vlan=0) / IP()
+        out_frame = Ether(**kw) / Dot1Q(vlan=1000) / Dot1Q(vlan=128) / IP()
+
+        self.ingress_frame(in_frame)
+        self.assertEqual(self.output, [(0, out_frame)])
+
+
+    def setup_all_flows(self):
+
+        self.pon.olt_install_flows([
+            mk_flow_stat(
+                priority=2000,
+                match_fields=[in_port(2), vlan_vid(4096 + 4000), vlan_pcp(0)],
+                actions=[pop_vlan(), output(1)]
+            ),
+            mk_flow_stat(
+                priority=2000,
+                match_fields=[in_port(1), eth_type(0x888e)],
+                actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 4000)),
+                         output(2)]
+            ),
+            mk_flow_stat(
+                priority=1000,
+                match_fields=[in_port(1), eth_type(0x800), ip_proto(2)],
+                actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 4000)),
+                         output(2)]
+            ),
+            mk_flow_stat(
+                priority=1000,
+                match_fields=[in_port(1), eth_type(0x800), ip_proto(17),
+                              udp_dst(67)],
+                actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 4000)),
+                         output(2)]
+            ),
+            mk_flow_stat(
+                priority=1000,
+                match_fields=[in_port(2), vlan_vid(4096 + 140)],
+                actions=[pop_vlan(), output(1)]
+            ),
+            mk_flow_stat(
+                priority=500,
+                match_fields=[in_port(2), vlan_vid(4096 + 1000)],
+                actions=[pop_vlan(), output(1)]
+            ),
+            mk_flow_stat(
+                priority=500,
+                match_fields=[in_port(1), vlan_vid(4096 + 128)],
+                actions=[
+                    push_vlan(0x8100), set_field(vlan_vid(4096 + 1000)),
+                    output(2)]
+            ),
+            mk_flow_stat(
+                priority=500,
+                match_fields=[in_port(1), vlan_vid(4096 + 129)],
+                actions=[
+                    push_vlan(0x8100), set_field(vlan_vid(4096 + 1000)),
+                    output(2)]
+            ),
+        ])
+
+        self.pon.onu_install_flows(128, [
+            mk_flow_stat(
+                priority=500,
+                match_fields=[in_port(2), vlan_vid(4096 + 0)],
+                actions=[
+                    set_field(vlan_vid(4096 + 128)), output(1)]
+            ),
+            mk_flow_stat(
+                priority=1000,
+                match_fields=[
+                    in_port(1), eth_type(0x800), ipv4_dst(0xe4010102)],
+                actions=[output(2)]
+            ),
+            mk_flow_stat(
+                priority=1000,
+                match_fields=[
+                    in_port(1), eth_type(0x800), ipv4_dst(0xe4010104)],
+                actions=[output(2)]
+            ),
+            mk_flow_stat(
+                priority=500,
+                match_fields=[in_port(1), vlan_vid(4096 + 128)],
+                actions=[set_field(vlan_vid(4096 + 0)), output(2)]
+            ),
+            mk_flow_stat(
+                priority=500,
+                match_fields=[in_port(2), vlan_vid(0)],
+                actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 128)),
+                         output(1)]
+            )
+        ])
+
+        self.pon.onu_install_flows(129, [
+            mk_flow_stat(
+                priority=500,
+                match_fields=[in_port(2), vlan_vid(4096 + 0)],
+                actions=[
+                    set_field(vlan_vid(4096 + 129)), output(1)]
+            ),
+            mk_flow_stat(
+                priority=1000,
+                match_fields=[
+                    in_port(1), eth_type(0x800), ipv4_dst(0xe4010103)],
+                actions=[output(2)]
+            ),
+            mk_flow_stat(
+                priority=1000,
+                match_fields=[
+                    in_port(1), eth_type(0x800), ipv4_dst(0xe4010104)],
+                actions=[output(2)]
+            ),
+            mk_flow_stat(
+                priority=500,
+                match_fields=[in_port(1), vlan_vid(4096 + 129)],
+                actions=[set_field(vlan_vid(4096 + 0)), output(2)]
+            ),
+            mk_flow_stat(
+                priority=500,
+                match_fields=[in_port(2), vlan_vid(0)],
+                actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 129)),
+                         output(1)]
+            )
+        ])
+
+    def test_combo_block_untagged_downstream(self):
+        self.setup_all_flows()
+        self.assert_untagged_frames_dont_pass(ports=0)
+
+    def test_eapol_in(self):
+        self.setup_all_flows()
+        kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+        in_frame = Ether(**kw) / EAPOL(type=1)
+        out_frame1 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=128) / EAPOL(type=1)
+        out_frame2 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=129) / EAPOL(type=1)
+        self.ingress_frame(in_frame)
+        self.assertEqual(self.output, [(0, out_frame1), (0, out_frame2)])
+
+    def test_eapol_out(self):
+        self.setup_all_flows()
+        kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+        in_frame1 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=128) / EAPOL(type=1)
+        in_frame2 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=129) / EAPOL(type=1)
+        out_frame = Ether(**kw) / Dot1Q(vlan=0) / EAPOL(type=1)
+        self.ingress_frame(in_frame1)
+        self.ingress_frame(in_frame2)
+        self.assertEqual(self.output, [(128, out_frame), (129, out_frame)])
+
+    def test_igmp_in(self):
+        self.setup_all_flows()
+        kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+        mr = IGMPv3(type=IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
+                    gaddr="224.0.0.1")
+        mr.grps = [
+            IGMPv3gr(rtype=IGMP_V3_GR_TYPE_EXCLUDE, mcaddr="228.1.1.3")]
+
+        in_frame = Ether(**kw) / IP() / mr
+        out_frame1 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=128) /\
+                     in_frame.payload.copy()
+        out_frame2 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=129) /\
+                     in_frame.payload.copy()
+        self.ingress_frame(in_frame)
+        self.assertEqual(self.output, [(0, out_frame1), (0, out_frame2)])
+
+    def test_igmp_out(self):
+        self.setup_all_flows()
+        kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+        mq = IGMPv3(type=IGMP_TYPE_MEMBERSHIP_QUERY, max_resp_code=120)
+        in_frame1 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=128) /\
+                    IP() / mq.copy()
+        in_frame2 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=129) /\
+                    IP() / mq.copy()
+        out_frame = Ether(**kw) / Dot1Q(vlan=0) / IP() / mq.copy()
+        self.ingress_frame(in_frame1)
+        self.ingress_frame(in_frame2)
+        self.assertEqual(self.output, [(128, out_frame), (129, out_frame)])
+
+    def test_combo_downstream_unicast_onu1(self):
+        self.setup_all_flows()
+        kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+        in_frame = Ether(**kw) / Dot1Q(vlan=1000) / Dot1Q(vlan=128) / IP()
+        out_frame = Ether(**kw) / Dot1Q(vlan=0) / IP()
+        self.reset_output()
+        self.ingress_frame(in_frame, ports=0)
+        self.assertEqual(self.output, [(128, out_frame)])
+
+    def test_combo_downstream_unicast_onu2(self):
+        self.setup_all_flows()
+        kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+        in_frame = Ether(**kw) / Dot1Q(vlan=1000) / Dot1Q(vlan=129) / IP()
+        out_frame = Ether(**kw) / Dot1Q(vlan=0) / IP()
+        self.reset_output()
+        self.ingress_frame(in_frame, ports=0)
+        self.assertEqual(self.output, [(129, out_frame)])
+
+    def test_combo_upstream_unicast_onu1(self):
+        self.setup_all_flows()
+        kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+        in_frame = Ether(**kw) / Dot1Q(vlan=1000) / Dot1Q(vlan=128) / IP()
+        out_frame = Ether(**kw) / Dot1Q(vlan=0) / IP()
+        self.ingress_frame(in_frame)
+        self.assertEqual(self.output, [(128, out_frame)])
+
+    def test_combo_upstream_unicast_onu2(self):
+        self.setup_all_flows()
+        kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+        in_frame = Ether(**kw) / Dot1Q(vlan=1000) / Dot1Q(vlan=129) / IP()
+        out_frame = Ether(**kw) / Dot1Q(vlan=0) / IP()
+        self.ingress_frame(in_frame)
+        self.assertEqual(self.output, [(129, out_frame)])
+
+    def test_combo_multicast_stream1(self):
+        self.setup_all_flows()
+        kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+        in_frame = Ether(**kw) / Dot1Q(vlan=140) / IP(dst='228.1.1.1')
+        self.ingress_frame(in_frame)
+        self.assertEqual(self.output, [])
+
+    def test_combo_multicast_stream2(self):
+        self.setup_all_flows()
+        kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+        in_frame = Ether(**kw) / Dot1Q(vlan=140) / IP(dst='228.1.1.2')
+        out_frame = Ether(**kw) / IP(dst='228.1.1.2')
+        self.ingress_frame(in_frame)
+        self.assertEqual(self.output, [(128, out_frame),])
+
+    def test_combo_multicast_stream3(self):
+        self.setup_all_flows()
+        kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+        in_frame = Ether(**kw) / Dot1Q(vlan=140) / IP(dst='228.1.1.3')
+        out_frame = Ether(**kw) / IP(dst='228.1.1.3')
+        self.ingress_frame(in_frame)
+        self.assertEqual(self.output, [(129, out_frame),])
+
+    def test_combo_multicast_stream4(self):
+        self.setup_all_flows()
+        kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+        in_frame = Ether(**kw) / Dot1Q(vlan=140) / IP(dst='228.1.1.4')
+        out_frame = Ether(**kw) / IP(dst='228.1.1.4')
+        self.ingress_frame(in_frame)
+        self.assertEqual(self.output, [(128, out_frame), (129, out_frame)])
+
+
+if __name__ == '__main__':
+    main()
diff --git a/voltha/adapters/broadcom_onu/broadcom_onu.py b/voltha/adapters/broadcom_onu/broadcom_onu.py
index 4ddeeb5..d2ee546 100644
--- a/voltha/adapters/broadcom_onu/broadcom_onu.py
+++ b/voltha/adapters/broadcom_onu/broadcom_onu.py
@@ -215,3 +215,7 @@
 
         # by returning we allow the device to be shown as active, which
         # indirectly verified that message passing works
+
+    def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+        log.info('packet-out', logical_device_id=logical_device_id,
+                 egress_port_no=egress_port_no, msg_len=len(msg))
diff --git a/voltha/adapters/interface.py b/voltha/adapters/interface.py
index e59d5dd..5e9fef6 100644
--- a/voltha/adapters/interface.py
+++ b/voltha/adapters/interface.py
@@ -141,6 +141,16 @@
         :return: None
         """
 
+    def receive_packet_out(logical_device_id, egress_port_no, msg):
+        """
+        Pass a packet_out message content to adapter so that it can forward it
+        out to the device. This is only called on root devices.
+        :param logical_device_id:
+        :param egress_port: egress logical port number
+        :param msg: actual message
+        :return: None
+        """
+
     # TODO work in progress
     # ...
 
diff --git a/voltha/adapters/maple_olt/maple_olt.py b/voltha/adapters/maple_olt/maple_olt.py
index 559a430..bcc442b 100644
--- a/voltha/adapters/maple_olt/maple_olt.py
+++ b/voltha/adapters/maple_olt/maple_olt.py
@@ -297,3 +297,7 @@
 
     def receive_proxied_message(self, proxy_address, msg):
         raise NotImplementedError()
+
+    def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+        log.info('packet-out', logical_device_id=logical_device_id,
+                 egress_port_no=egress_port_no, msg_len=len(msg))
diff --git a/voltha/adapters/microsemi/microsemi.py b/voltha/adapters/microsemi/microsemi.py
index d59a994..aea79f9 100644
--- a/voltha/adapters/microsemi/microsemi.py
+++ b/voltha/adapters/microsemi/microsemi.py
@@ -122,6 +122,10 @@
     def update_flows_incrementally(self, device, flow_changes, group_changes):
         raise NotImplementedError()
 
+    def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+        log.info('packet-out', logical_device_id=logical_device_id,
+                 egress_port_no=egress_port_no, msg_len=len(msg))
+
     ##
     # Private methods
     ##
diff --git a/voltha/adapters/ponsim_olt/__init__.py b/voltha/adapters/ponsim_olt/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/voltha/adapters/ponsim_olt/__init__.py
diff --git a/voltha/adapters/ponsim_olt/ponsim_olt.py b/voltha/adapters/ponsim_olt/ponsim_olt.py
new file mode 100644
index 0000000..9bd267a
--- /dev/null
+++ b/voltha/adapters/ponsim_olt/ponsim_olt.py
@@ -0,0 +1,322 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Fully simulated OLT/ONU adapter.
+"""
+from uuid import uuid4
+
+import grpc
+import structlog
+from scapy.layers.l2 import Ether, Dot1Q
+from twisted.internet import reactor
+from zope.interface import implementer
+
+from common.frameio.frameio import BpfProgramFilter, hexify
+from voltha.adapters.interface import IAdapterInterface
+from voltha.core.logical_device_agent import mac_str_to_tuple
+from voltha.protos import third_party
+from voltha.protos import ponsim_pb2
+from voltha.protos.adapter_pb2 import Adapter
+from voltha.protos.adapter_pb2 import AdapterConfig
+from voltha.protos.common_pb2 import LogLevel, OperStatus, ConnectStatus, \
+    AdminState
+from voltha.protos.device_pb2 import DeviceType, DeviceTypes, Port, Device
+from voltha.protos.health_pb2 import HealthStatus
+from google.protobuf.empty_pb2 import Empty
+
+from voltha.protos.logical_device_pb2 import LogicalPort, LogicalDevice
+from voltha.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, OFPPF_1GB_FD, \
+    OFPC_GROUP_STATS, OFPC_PORT_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS, \
+    ofp_switch_features, ofp_desc
+from voltha.protos.openflow_13_pb2 import ofp_port
+from voltha.protos.ponsim_pb2 import FlowTable
+from voltha.registry import registry
+
+_ = third_party
+log = structlog.get_logger()
+
+
+PACKET_IN_VLAN = 4000
+is_inband_frame = BpfProgramFilter('(ether[14:2] & 0xfff) = 0x{:03x}'.format(
+    PACKET_IN_VLAN))
+
+
+@implementer(IAdapterInterface)
+class PonSimOltAdapter(object):
+
+    name = 'ponsim_olt'
+
+    supported_device_types = [
+        DeviceType(
+            id=name,
+            adapter=name,
+            accepts_bulk_flow_update=True
+        )
+    ]
+
+    def __init__(self, adapter_agent, config):
+        self.adapter_agent = adapter_agent
+        self.config = config
+        self.descriptor = Adapter(
+            id=self.name,
+            vendor='Voltha project',
+            version='0.4',
+            config=AdapterConfig(log_level=LogLevel.INFO)
+        )
+        self.devices_handlers = dict()  # device_id -> PonSimOltHandler()
+        self.logical_device_id_to_root_device_id = dict()
+
+    def start(self):
+        log.debug('starting')
+        log.info('started')
+
+    def stop(self):
+        log.debug('stopping')
+        log.info('stopped')
+
+    def adapter_descriptor(self):
+        return self.descriptor
+
+    def device_types(self):
+        return DeviceTypes(items=self.supported_device_types)
+
+    def health(self):
+        return HealthStatus(state=HealthStatus.HealthState.HEALTHY)
+
+    def change_master_state(self, master):
+        raise NotImplementedError()
+
+    def adopt_device(self, device):
+        self.devices_handlers[device.id] = PonSimOltHandler(self, device.id)
+        reactor.callLater(0, self.devices_handlers[device.id].activate, device)
+        return device
+
+    def abandon_device(self, device):
+        raise NotImplementedError()
+
+    def deactivate_device(self, device):
+        raise NotImplementedError()
+
+    def update_flows_bulk(self, device, flows, groups):
+        log.info('bulk-flow-update', device_id=device.id,
+                  flows=flows, groups=groups)
+        assert len(groups.items) == 0
+        handler = self.devices_handlers[device.id]
+        return handler.update_flow_table(flows.items)
+
+    def update_flows_incrementally(self, device, flow_changes, group_changes):
+        raise NotImplementedError()
+
+    def send_proxied_message(self, proxy_address, msg):
+        log.info('send-proxied-message', proxy_address=proxy_address, msg=msg)
+        handler = self.devices_handlers[proxy_address.device_id]
+        handler.send_proxied_message(proxy_address, msg)
+
+    def receive_proxied_message(self, proxy_address, msg):
+        raise NotImplementedError()
+
+    def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+
+        def ldi_to_di(ldi):
+            di = self.logical_device_id_to_root_device_id.get(ldi)
+            if di is None:
+                logical_device = self.adapter_agent.get_logical_device(ldi)
+                di = logical_device.root_device_id
+                self.logical_device_id_to_root_device_id[ldi] = di
+            return di
+
+        device_id = ldi_to_di(logical_device_id)
+        handler = self.devices_handlers[device_id]
+        handler.packet_out(egress_port_no, msg)
+
+
+class PonSimOltHandler(object):
+
+    def __init__(self, adapter, device_id):
+        self.adapter = adapter
+        self.adapter_agent = adapter.adapter_agent
+        self.device_id = device_id
+        self.log = structlog.get_logger(device_id=device_id)
+        self.channel = None
+        self.io_port = None
+        self.logical_device_id = None
+        self.interface = registry('main').get_args().interface
+
+    def __del__(self):
+        if self.io_port is not None:
+            registry('frameio').del_interface(self.interface)
+
+    def get_channel(self):
+        if self.channel is None:
+            device = self.adapter_agent.get_device(self.device_id)
+            self.channel = grpc.insecure_channel(device.host_and_port)
+        return self.channel
+
+    def activate(self, device):
+        self.log.info('activating')
+
+        if not device.host_and_port:
+            device.oper_status = OperStatus.FAILED
+            device.reason = 'No host_and_port field provided'
+            self.adapter_agent.update_device(device)
+            return
+
+        stub = ponsim_pb2.PonSimStub(self.get_channel())
+        info = stub.GetDeviceInfo(Empty())
+        log.info('got-info', info=info)
+
+        device.root = True
+        device.vendor = 'ponsim'
+        device.model ='n/a'
+        device.serial_number = device.host_and_port
+        device.connect_status = ConnectStatus.REACHABLE
+        self.adapter_agent.update_device(device)
+
+        nni_port = Port(
+            port_no=2,
+            label='NNI facing Ethernet port',
+            type=Port.ETHERNET_NNI,
+            admin_state=AdminState.ENABLED,
+            oper_status=OperStatus.ACTIVE
+        )
+        self.adapter_agent.add_port(device.id, nni_port)
+        self.adapter_agent.add_port(device.id, Port(
+            port_no=1,
+            label='PON port',
+            type=Port.PON_OLT,
+            admin_state=AdminState.ENABLED,
+            oper_status=OperStatus.ACTIVE
+        ))
+
+        ld = LogicalDevice(
+            # not setting id and datapth_id will let the adapter agent pick id
+            desc=ofp_desc(
+                mfr_desc='cord porject',
+                hw_desc='simualted pon',
+                sw_desc='simualted pon',
+                serial_num=uuid4().hex,
+                dp_desc='n/a'
+            ),
+            switch_features=ofp_switch_features(
+                n_buffers=256,  # TODO fake for now
+                n_tables=2,  # TODO ditto
+                capabilities=(  # TODO and ditto
+                    OFPC_FLOW_STATS
+                    | OFPC_TABLE_STATS
+                    | OFPC_PORT_STATS
+                    | OFPC_GROUP_STATS
+                )
+            ),
+            root_device_id=device.id
+        )
+        ld_initialized = self.adapter_agent.create_logical_device(ld)
+        cap = OFPPF_1GB_FD | OFPPF_FIBER
+        self.adapter_agent.add_logical_port(ld_initialized.id, LogicalPort(
+            id='nni',
+            ofp_port=ofp_port(
+                port_no=info.nni_port,
+                hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % info.nni_port),
+                name='nni',
+                config=0,
+                state=OFPPS_LIVE,
+                curr=cap,
+                advertised=cap,
+                peer=cap,
+                curr_speed=OFPPF_1GB_FD,
+                max_speed=OFPPF_1GB_FD
+            ),
+            device_id=device.id,
+            device_port_no=nni_port.port_no,
+            root_port=True
+        ))
+
+        device = self.adapter_agent.get_device(device.id)
+        device.parent_id = ld_initialized.id
+        device.oper_status = OperStatus.ACTIVE
+        self.adapter_agent.update_device(device)
+        self.logical_device_id = ld_initialized.id
+
+        # register ONUS per uni port
+        for port_no in info.uni_ports:
+            vlan_id = port_no
+            self.adapter_agent.child_device_detected(
+                parent_device_id=device.id,
+                parent_port_no=1,
+                child_device_type='ponsim_onu',
+                proxy_address=Device.ProxyAddress(
+                    device_id=device.id,
+                    channel_id=vlan_id
+                ),
+                vlan=vlan_id
+            )
+
+        # finally, open the frameio port to receive in-band packet_in messages
+        self.log.info('registering-frameio')
+        self.io_port = registry('frameio').add_interface(
+            self.interface, self.rcv_io, is_inband_frame)
+        self.log.info('registered-frameio')
+
+    def rcv_io(self, port, frame):
+        self.log.info('reveived', iface_name=port.iface_name,
+                       frame_len=len(frame))
+        pkt = Ether(frame)
+        if pkt.haslayer(Dot1Q):
+            outer_shim = pkt.getlayer(Dot1Q)
+            if isinstance(outer_shim.payload, Dot1Q):
+                inner_shim = outer_shim.payload
+                cvid = inner_shim.vlan
+                logical_port = cvid
+                popped_frame = (
+                    Ether(src=pkt.src, dst=pkt.dst, type=inner_shim.type) /
+                    inner_shim.payload
+                )
+                kw = dict(
+                    logical_device_id=self.logical_device_id,
+                    logical_port_no=logical_port,
+                )
+                self.log.info('sending-packet-in', **kw)
+                self.adapter_agent.send_packet_in(
+                    packet=str(popped_frame), **kw)
+
+    def update_flow_table(self, flows):
+        stub = ponsim_pb2.PonSimStub(self.get_channel())
+        self.log.info('pushing-olt-flow-table')
+        stub.UpdateFlowTable(FlowTable(
+            port=0,
+            flows=flows
+        ))
+        self.log.info('success')
+
+    def send_proxied_message(self, proxy_address, msg):
+        self.log.info('sending-proxied-message')
+        if isinstance(msg, FlowTable):
+            stub = ponsim_pb2.PonSimStub(self.get_channel())
+            self.log.info('pushing-onu-flow-table', port=msg.port)
+            res = stub.UpdateFlowTable(msg)
+            self.adapter_agent.receive_proxied_message(proxy_address, res)
+
+    def packet_out(self, egress_port, msg):
+        self.log.info('sending-packet-out', egress_port=egress_port,
+                      msg=hexify(msg))
+        pkt = Ether(msg)
+        out_pkt = (
+            Ether(src=pkt.src, dst=pkt.dst) /
+            Dot1Q(vlan=4000) /
+            Dot1Q(vlan=egress_port, type=pkt.type) /
+            pkt.payload
+        )
+        self.io_port.send(str(out_pkt))
diff --git a/voltha/adapters/ponsim_onu/__init__.py b/voltha/adapters/ponsim_onu/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/voltha/adapters/ponsim_onu/__init__.py
diff --git a/voltha/adapters/ponsim_onu/ponsim_onu.py b/voltha/adapters/ponsim_onu/ponsim_onu.py
new file mode 100644
index 0000000..d416db7
--- /dev/null
+++ b/voltha/adapters/ponsim_onu/ponsim_onu.py
@@ -0,0 +1,221 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Fully simulated OLT/ONU adapter.
+"""
+
+import structlog
+from twisted.internet import reactor
+from twisted.internet.defer import DeferredQueue, inlineCallbacks
+from zope.interface import implementer
+
+from voltha.adapters.interface import IAdapterInterface
+from voltha.core.logical_device_agent import mac_str_to_tuple
+from voltha.protos import third_party
+from voltha.protos.adapter_pb2 import Adapter
+from voltha.protos.adapter_pb2 import AdapterConfig
+from voltha.protos.common_pb2 import LogLevel, OperStatus, ConnectStatus, \
+    AdminState
+from voltha.protos.device_pb2 import DeviceType, DeviceTypes, Port
+from voltha.protos.health_pb2 import HealthStatus
+from voltha.protos.logical_device_pb2 import LogicalPort
+from voltha.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, OFPPF_1GB_FD
+from voltha.protos.openflow_13_pb2 import ofp_port
+from voltha.protos.ponsim_pb2 import FlowTable
+
+_ = third_party
+log = structlog.get_logger()
+
+
+@implementer(IAdapterInterface)
+class PonSimOnuAdapter(object):
+
+    name = 'ponsim_onu'
+
+    supported_device_types = [
+        DeviceType(
+            id=name,
+            adapter=name,
+            accepts_bulk_flow_update=True
+        )
+    ]
+
+    def __init__(self, adapter_agent, config):
+        self.adapter_agent = adapter_agent
+        self.config = config
+        self.descriptor = Adapter(
+            id=self.name,
+            vendor='Voltha project',
+            version='0.4',
+            config=AdapterConfig(log_level=LogLevel.INFO)
+        )
+        self.devices_handlers = dict()  # device_id -> PonSimOltHandler()
+
+    def start(self):
+        log.debug('starting')
+        log.info('started')
+
+    def stop(self):
+        log.debug('stopping')
+        log.info('stopped')
+
+    def adapter_descriptor(self):
+        return self.descriptor
+
+    def device_types(self):
+        return DeviceTypes(items=self.supported_device_types)
+
+    def health(self):
+        return HealthStatus(state=HealthStatus.HealthState.HEALTHY)
+
+    def change_master_state(self, master):
+        raise NotImplementedError()
+
+    def adopt_device(self, device):
+        self.devices_handlers[device.id] = PonSimOnuHandler(self, device.id)
+        reactor.callLater(0, self.devices_handlers[device.id].activate, device)
+        return device
+
+    def abandon_device(self, device):
+        raise NotImplementedError()
+
+    def deactivate_device(self, device):
+        raise NotImplementedError()
+
+    def update_flows_bulk(self, device, flows, groups):
+        log.info('bulk-flow-update', device_id=device.id,
+                  flows=flows, groups=groups)
+        assert len(groups.items) == 0
+        handler = self.devices_handlers[device.id]
+        return handler.update_flow_table(flows.items)
+
+    def update_flows_incrementally(self, device, flow_changes, group_changes):
+        raise NotImplementedError()
+
+    def send_proxied_message(self, proxy_address, msg):
+        log.info('send-proxied-message', proxy_address=proxy_address, msg=msg)
+
+    def receive_proxied_message(self, proxy_address, msg):
+        log.info('receive-proxied-message', proxy_address=proxy_address,
+                 device_id=proxy_address.device_id, msg=msg)
+        handler = self.devices_handlers[proxy_address.device_id]
+        handler.receive_message(msg)
+
+    def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+        log.info('packet-out', logical_device_id=logical_device_id,
+                 egress_port_no=egress_port_no, msg_len=len(msg))
+
+
+class PonSimOnuHandler(object):
+
+    def __init__(self, adapter, device_id):
+        self.adapter = adapter
+        self.adapter_agent = adapter.adapter_agent
+        self.device_id = device_id
+        self.log = structlog.get_logger(device_id=device_id)
+        self.incoming_messages = DeferredQueue()
+        self.proxy_address = None
+
+    def receive_message(self, msg):
+        self.incoming_messages.put(msg)
+
+    def activate(self, device):
+        self.log.info('activating')
+
+        # first we verify that we got parent reference and proxy info
+        assert device.parent_id
+        assert device.proxy_address.device_id
+        assert device.proxy_address.channel_id
+
+        # register for proxied messages right away
+        self.proxy_address = device.proxy_address
+        self.adapter_agent.register_for_proxied_messages(device.proxy_address)
+
+        # populate device info
+        device.root = True
+        device.vendor = 'ponsim'
+        device.model ='n/a'
+        device.connect_status = ConnectStatus.REACHABLE
+        self.adapter_agent.update_device(device)
+
+        # register physical ports
+        uni_port = Port(
+            port_no=2,
+            label='UNI facing Ethernet port',
+            type=Port.ETHERNET_UNI,
+            admin_state=AdminState.ENABLED,
+            oper_status=OperStatus.ACTIVE
+        )
+        self.adapter_agent.add_port(device.id, uni_port)
+        self.adapter_agent.add_port(device.id, Port(
+            port_no=1,
+            label='PON port',
+            type=Port.PON_ONU,
+            admin_state=AdminState.ENABLED,
+            oper_status=OperStatus.ACTIVE,
+            peers=[
+                Port.PeerPort(
+                    device_id=device.parent_id,
+                    port_no=device.parent_port_no
+                )
+            ]
+        ))
+
+        # add uni port to logical device
+        parent_device = self.adapter_agent.get_device(device.parent_id)
+        logical_device_id = parent_device.parent_id
+        assert logical_device_id
+        port_no = device.proxy_address.channel_id
+        cap = OFPPF_1GB_FD | OFPPF_FIBER
+        self.adapter_agent.add_logical_port(logical_device_id, LogicalPort(
+            id='uni-{}'.format(port_no),
+            ofp_port=ofp_port(
+                port_no=port_no,
+                hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % port_no),
+                name='uni-{}'.format(port_no),
+                config=0,
+                state=OFPPS_LIVE,
+                curr=cap,
+                advertised=cap,
+                peer=cap,
+                curr_speed=OFPPF_1GB_FD,
+                max_speed=OFPPF_1GB_FD
+            ),
+            device_id=device.id,
+            device_port_no=uni_port.port_no
+        ))
+
+        device = self.adapter_agent.get_device(device.id)
+        device.oper_status = OperStatus.ACTIVE
+        self.adapter_agent.update_device(device)
+
+    @inlineCallbacks
+    def update_flow_table(self, flows):
+
+        # we need to proxy through the OLT to get to the ONU
+
+        # reset response queue
+        while self.incoming_messages.pending:
+            yield self.incoming_messages.get()
+
+        msg = FlowTable(
+            port=self.proxy_address.channel_id,
+            flows=flows
+        )
+        self.adapter_agent.send_proxied_message(self.proxy_address, msg)
+
+        yield self.incoming_messages.get()
diff --git a/voltha/adapters/simulated_olt/simulated_olt.py b/voltha/adapters/simulated_olt/simulated_olt.py
index 1fdc965..0c9227e 100644
--- a/voltha/adapters/simulated_olt/simulated_olt.py
+++ b/voltha/adapters/simulated_olt/simulated_olt.py
@@ -498,7 +498,7 @@
 
     def send_proxied_message(self, proxy_address, msg):
         log.info('send-proxied-message', proxy_address=proxy_address, msg=msg)
-        # we mimick a response by sending the same message back in a short time
+        # we mimic a response by sending the same message back in a short time
         reactor.callLater(
             0.2,
             self.adapter_agent.receive_proxied_message,
@@ -509,6 +509,10 @@
     def receive_proxied_message(self, proxy_address, msg):
         raise NotImplementedError()
 
+    def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+        log.info('packet-out', logical_device_id=logical_device_id,
+                 egress_port_no=egress_port_no, msg_len=len(msg))
+
     # ~~~~~~~~~~~~~~~~~~~~ Embedded test Klein rest server ~~~~~~~~~~~~~~~~~~~~
 
     def get_test_control_site(self):
diff --git a/voltha/adapters/simulated_onu/simulated_onu.py b/voltha/adapters/simulated_onu/simulated_onu.py
index bdddb99..ca072bb 100644
--- a/voltha/adapters/simulated_onu/simulated_onu.py
+++ b/voltha/adapters/simulated_onu/simulated_onu.py
@@ -356,3 +356,7 @@
 
         # by returning we allow the device to be shown as active, which
         # indirectly verified that message passing works
+
+    def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+        log.info('packet-out', logical_device_id=logical_device_id,
+                 egress_port_no=egress_port_no, msg_len=len(msg))
diff --git a/voltha/adapters/tibit_olt/tibit_olt.py b/voltha/adapters/tibit_olt/tibit_olt.py
index 0d260c3..ca7652f 100644
--- a/voltha/adapters/tibit_olt/tibit_olt.py
+++ b/voltha/adapters/tibit_olt/tibit_olt.py
@@ -504,3 +504,7 @@
 
     def receive_proxied_message(self, proxy_address, msg):
         raise NotImplementedError()
+
+    def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+        log.info('packet-out', logical_device_id=logical_device_id,
+                 egress_port_no=egress_port_no, msg_len=len(msg))
diff --git a/voltha/adapters/tibit_onu/tibit_onu.py b/voltha/adapters/tibit_onu/tibit_onu.py
index f6c88c9..93714db 100644
--- a/voltha/adapters/tibit_onu/tibit_onu.py
+++ b/voltha/adapters/tibit_onu/tibit_onu.py
@@ -375,3 +375,7 @@
 
         # by returning we allow the device to be shown as active, which
         # indirectly verified that message passing works
+
+    def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+        log.info('packet-out', logical_device_id=logical_device_id,
+                 egress_port_no=egress_port_no, msg_len=len(msg))
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index 0c5448f..3bf8857 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -31,6 +31,7 @@
 from voltha.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
     LogicalPort, AdminState
 from voltha.registry import registry
+from voltha.core.flow_decomposer import OUTPUT
 
 
 @implementer(IAdapterAgent)
@@ -64,11 +65,11 @@
         try:
             adapter = self.adapter_cls(self, config)
             yield adapter.start()
+            self.adapter = adapter
+            self.adapter_node_proxy = self._update_adapter_node()
+            self._update_device_types()
         except Exception, e:
             self.log.exception(e)
-        self.adapter = adapter
-        self.adapter_node_proxy = self._update_adapter_node()
-        self._update_device_types()
         self.log.info('started')
         returnValue(self)
 
@@ -199,6 +200,10 @@
                 return i
             i += 1
 
+    def get_logical_device(self, logical_device_id):
+        return self.root_proxy.get('/logical_devices/{}'.format(
+            logical_device_id))
+
     def create_logical_device(self, logical_device):
         assert isinstance(logical_device, LogicalDevice)
 
@@ -210,8 +215,24 @@
         self._make_up_to_date('/logical_devices',
                               logical_device.id, logical_device)
 
+        self.event_bus.subscribe(
+            topic='packet-out:{}'.format(logical_device.id),
+            callback=lambda _, p: self.receive_packet_out(logical_device.id, p)
+        )
+
         return logical_device
 
+    def receive_packet_out(self, logical_device_id, ofp_packet_out):
+
+        def get_port_out(opo):
+            for action in opo.actions:
+                if action.type == OUTPUT:
+                    return action.output.port
+
+        out_port = get_port_out(ofp_packet_out)
+        frame = ofp_packet_out.data
+        self.adapter.receive_packet_out(logical_device_id, out_port, frame)
+
     def add_logical_port(self, logical_device_id, port):
         assert isinstance(port, LogicalPort)
         self._make_up_to_date(
diff --git a/voltha/core/logical_device_agent.py b/voltha/core/logical_device_agent.py
index f11247e..7dbf0b5 100644
--- a/voltha/core/logical_device_agent.py
+++ b/voltha/core/logical_device_agent.py
@@ -17,23 +17,23 @@
 """
 Model that captures the current state of a logical device
 """
-import threading
 from collections import OrderedDict
 
 import structlog
 
 from common.event_bus import EventBusClient
+from common.frameio.frameio import hexify
 from voltha.core.config.config_proxy import CallbackType
 from voltha.core.device_graph import DeviceGraph
 from voltha.core.flow_decomposer import FlowDecomposer, \
     flow_stats_entry_from_flow_mod_message, group_entry_from_group_mod, \
-    mk_flow_stat, in_port, vlan_vid, vlan_pcp, pop_vlan, output, set_field
+    mk_flow_stat, in_port, vlan_vid, vlan_pcp, pop_vlan, output, set_field, \
+    push_vlan
 from voltha.protos import third_party
 from voltha.protos import openflow_13_pb2 as ofp
 from voltha.protos.device_pb2 import Port
 from voltha.protos.logical_device_pb2 import LogicalPort
 from voltha.protos.openflow_13_pb2 import Flows, FlowGroups
-from voltha.registry import registry
 
 _ = third_party
 
@@ -450,16 +450,9 @@
     # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PACKET_OUT ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
     def packet_out(self, ofp_packet_out):
-        self.log.debug('packet-out', packet=ofp_packet_out)
-        print threading.current_thread().name
-        print 'PACKET_OUT:', ofp_packet_out
-        # TODO for debug purposes, lets turn this around and send it back
-        if 0:
-            self.packet_in(ofp.ofp_packet_in(
-                buffer_id=ofp_packet_out.buffer_id,
-                reason=ofp.OFPR_NO_MATCH,
-                data=ofp_packet_out.data
-            ))
+        self.log.info('packet-out', packet=ofp_packet_out)
+        topic = 'packet-out:{}'.format(self.logical_device_id)
+        self.event_bus.publish(topic, ofp_packet_out)
 
     # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PACKET_IN ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
@@ -485,8 +478,8 @@
         self.packet_in(packet_in)
 
     def packet_in(self, ofp_packet_in):
-        # TODO
-        print 'PACKET_IN:', ofp_packet_in
+        self.log.info('packet-in', logical_device_id=self.logical_device_id,
+                      pkt=ofp_packet_in, data=hexify(ofp_packet_in.data))
         self.local_handler.send_packet_in(
             self.logical_device_id, ofp_packet_in)
 
@@ -630,7 +623,30 @@
                         set_field(vlan_vid(ofp.OFPVID_PRESENT | device.vlan)),
                         output(upstream_ports[0].port_no)
                     ]
-                )
+                ),
+                mk_flow_stat(
+                    priority=500,
+                    match_fields=[
+                        in_port(downstream_ports[0].port_no),
+                        vlan_vid(0)
+                    ],
+                    actions=[
+                        push_vlan(0x8100),
+                        set_field(vlan_vid(ofp.OFPVID_PRESENT | device.vlan)),
+                        output(upstream_ports[0].port_no)
+                    ]
+                ),
+                mk_flow_stat(
+                    priority=500,
+                    match_fields=[
+                        in_port(upstream_ports[0].port_no),
+                        vlan_vid(ofp.OFPVID_PRESENT | device.vlan)
+                    ],
+                    actions=[
+                        set_field(vlan_vid(ofp.OFPVID_PRESENT | 0)),
+                        output(downstream_ports[0].port_no)
+                    ]
+                ),
             ])
             groups = OrderedDict()
             return flows, groups
diff --git a/voltha/extensions/IGMP.py b/voltha/extensions/IGMP.py
new file mode 100644
index 0000000..6473add
--- /dev/null
+++ b/voltha/extensions/IGMP.py
@@ -0,0 +1,282 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from scapy.fields import ByteEnumField, FieldLenField, IPField, \
+    FieldListField, XShortField, ConditionalField, BitField, ShortField, \
+    PacketListField
+from scapy.fields import ByteField
+from scapy.layers.inet import IP, bind_layers
+from scapy.layers.inet import IPOption_Router_Alert
+from scapy.layers.l2 import Ether
+from scapy.packet import Packet
+from scapy.utils import atol, hexdump
+from scapy.utils import checksum
+
+IGMP_TYPE_MEMBERSHIP_QUERY     = 0x11
+IGMP_TYPE_V3_MEMBERSHIP_REPORT = 0x22
+IGMP_TYPE_V1_MEMBERSHIP_REPORT = 0x12
+IGMP_TYPE_V2_MEMBERSHIP_REPORT = 0x16
+IGMP_TYPE_V2_LEAVE_GROUP       = 0x17
+
+IGMP_V3_GR_TYPE_INCLUDE           = 0x01
+IGMP_V3_GR_TYPE_EXCLUDE           = 0x02
+IGMP_V3_GR_TYPE_CHANGE_TO_INCLUDE = 0x03
+IGMP_V3_GR_TYPE_CHANGE_TO_EXCLUDE = 0x04
+IGMP_V3_GR_TYPE_ALLOW_NEW         = 0x05
+IGMP_V3_GR_TYPE_BLOCK_OLD         = 0x06
+
+
+"""
+IGMPV3_ALL_ROUTERS = '224.0.0.22'
+IGMPv3 = 3
+IP_SRC = '1.2.3.4'
+ETHERTYPE_IP = 0x0800
+IGMP_DST_MAC = "01:00:5e:00:01:01"
+IGMP_SRC_MAC = "5a:e1:ac:ec:4d:a1"
+"""
+
+
+class IGMPv3gr(Packet):
+    """IGMPv3 Group Record, used in membership report"""
+
+    name = "IGMPv3gr"
+
+    igmp_v3_gr_types = {
+        IGMP_V3_GR_TYPE_INCLUDE: "Include Mode",
+        IGMP_V3_GR_TYPE_EXCLUDE: "Exclude Mode",
+        IGMP_V3_GR_TYPE_CHANGE_TO_INCLUDE: "Change to Include Mode",
+        IGMP_V3_GR_TYPE_CHANGE_TO_EXCLUDE: "Change to Exclude Mode",
+        IGMP_V3_GR_TYPE_ALLOW_NEW: "Allow New Sources",
+        IGMP_V3_GR_TYPE_BLOCK_OLD: "Block Old Sources"
+    }
+
+    fields_desc = [
+        ByteEnumField("rtype", IGMP_V3_GR_TYPE_INCLUDE, igmp_v3_gr_types),
+        ByteField("aux_data_len", 0),
+        FieldLenField("numsrc", None, count_of="sources"),
+        IPField("mcaddr", "0.0.0.0"),
+        FieldListField("sources", None, IPField("src", "0.0.0.0"), "numsrc")
+    ]
+
+    def post_build(self, pkt, payload):
+        pkt += payload
+        if self.aux_data_len != 0:
+            print "WARNING: Auxiliary Data Length must be zero (0)"
+        return pkt
+
+
+class IGMPv3(Packet):
+
+    name = "IGMPv3"
+
+    igmp_v3_types = {
+        IGMP_TYPE_MEMBERSHIP_QUERY: "Membership Query",
+        IGMP_TYPE_V3_MEMBERSHIP_REPORT: " Version 3 Mebership Report",
+        IGMP_TYPE_V2_MEMBERSHIP_REPORT: " Version 2 Mebership Report",
+        IGMP_TYPE_V1_MEMBERSHIP_REPORT: " Version 1 Mebership Report",
+        IGMP_TYPE_V2_LEAVE_GROUP: "Version 2 Leave Group"
+    }
+
+    fields_desc = [
+        ByteEnumField("type", IGMP_TYPE_MEMBERSHIP_QUERY, igmp_v3_types),
+        ByteField("max_resp_code", 0),
+        XShortField("checksum", None),
+        #IPField("group_address", "0.0.0.0"),
+
+        # membership query fields
+        ConditionalField(IPField("gaddr", "0.0.0.0"),
+                         lambda pkt: pkt.type == IGMP_TYPE_MEMBERSHIP_QUERY),
+        ConditionalField(BitField("resv", 0, 4),
+                         lambda pkt: pkt.type == IGMP_TYPE_MEMBERSHIP_QUERY),
+        ConditionalField(BitField("s", 0, 1),
+                         lambda pkt: pkt.type == IGMP_TYPE_MEMBERSHIP_QUERY),
+        ConditionalField(BitField("qrv", 0, 3),
+                         lambda pkt: pkt.type == IGMP_TYPE_MEMBERSHIP_QUERY),
+        ConditionalField(ByteField("qqic", 0),
+                         lambda pkt: pkt.type == IGMP_TYPE_MEMBERSHIP_QUERY),
+        ConditionalField(FieldLenField("numsrc", None, count_of="srcs"),
+                         lambda pkt: pkt.type == IGMP_TYPE_MEMBERSHIP_QUERY),
+        ConditionalField(FieldListField("srcs", None,
+                                        IPField("src", "0.0.0.0"), "numsrc"),
+                         lambda pkt: pkt.type == IGMP_TYPE_MEMBERSHIP_QUERY),
+
+        # membership report fields
+        ConditionalField(
+            ShortField("resv2", 0),
+            lambda pkt: pkt.type == IGMP_TYPE_V3_MEMBERSHIP_REPORT),
+        ConditionalField(
+            FieldLenField("numgrp", None, count_of="grps"),
+            lambda pkt: pkt.type == IGMP_TYPE_V3_MEMBERSHIP_REPORT),
+        ConditionalField(
+            PacketListField("grps", [], IGMPv3gr),
+            lambda pkt: pkt.type == IGMP_TYPE_V3_MEMBERSHIP_REPORT)
+
+        # TODO: v2 and v3 membership reports?
+
+    ]
+
+    def post_build(self, pkt, payload):
+
+        pkt += payload
+
+        # max_resp_code field is reserved (0)
+        if self.type in [IGMP_TYPE_V3_MEMBERSHIP_REPORT,]:
+            mrc = 0
+        else:
+            mrc = self.encode_float(self.max_resp_code)
+        pkt = pkt[:1] + chr(mrc) + pkt[2:]
+
+        if self.checksum is None:
+            chksum = checksum(pkt)
+            pkt = pkt[:2] + chr(chksum >> 8) + chr(chksum & 0xff) + pkt[4:]
+
+        return pkt
+
+    def encode_float(self, value):
+        """Encode max response time value per RFC 3376."""
+        if value < 128:
+            return value
+        if value > 31743:
+            return 255
+        exp = 0
+        value >>= 3
+        while value > 31:
+            exp += 1
+            value >>= 1
+        return 0x80 | (exp << 4) | (value & 0xf)
+
+
+    def decode_float(self, code):
+        if code < 128:
+            return code
+        mant = code & 0xf
+        exp = (code >> 4) & 0x7
+        return (mant | 0x10) << (exp + 3)
+
+    @staticmethod
+    def is_valid_mcaddr(ip):
+        byte1 = atol(ip) >> 24 & 0xff
+        return (byte1 & 0xf0) == 0xe0
+
+    @staticmethod
+    def fixup(pkt):
+        """Fixes up the underlying IP() and Ether() headers."""
+        assert pkt.haslayer(IGMPv3), \
+            "This packet is not an IGMPv4 packet; cannot fix it up"
+
+        igmp = pkt.getlayer(IGMPv3)
+
+        if pkt.haslayer(IP):
+            ip = pkt.getlayer(IP)
+            ip.ttl = 1
+            ip.proto = 2
+            ip.tos = 0xc0
+            ip.options = [IPOption_Router_Alert()]
+
+            if igmp.type == IGMP_TYPE_MEMBERSHIP_QUERY:
+                if igmp.gaddr == "0.0.0.0":
+                    ip.dst = "224.0.0.1"
+                else:
+                    assert IGMPv3.is_valid_mcaddr(igmp.gaddr), \
+                        "IGMP membership query with invalid mcast address"
+                    ip.dst = igmp.gaddr
+
+            elif igmp.type == IGMP_TYPE_V2_LEAVE_GROUP and \
+                    IGMPv3.is_valid_mcaddr(igmp.gaddr):
+                ip.dst = "224.0.0.2"
+
+            elif igmp.type in (IGMP_TYPE_V1_MEMBERSHIP_REPORT,
+                              IGMP_TYPE_V2_MEMBERSHIP_REPORT) and \
+                  IGMPv3.is_valid_mcaddr(igmp.gaddr):
+                ip.dst = igmp.gaddr
+
+           # We do not need to fixup the ether layer, it is done by scapy
+           #
+           # if pkt.haslayer(Ether):
+           #     eth = pkt.getlayer(Ether)
+           #     ip_long = atol(ip.dst)
+           #     ether.dst = '01:00:5e:%02x:%02x:%02x' % (
+           #        (ip_long >> 16) & 0x7f, (ip_long >> 8) & 0xff,
+           #        ip_long & 0xff )
+
+        return pkt
+
+
+bind_layers(IP,       IGMPv3,   frag=0, proto=2, ttl=1, tos=0xc0)
+bind_layers(IGMPv3,   IGMPv3gr, frag=0, proto=2)
+bind_layers(IGMPv3gr, IGMPv3gr, frag=0, proto=2)
+
+
+if __name__ == "__main__":
+
+    print "test float encoding"
+    from math import log
+    max_expected_error = 1.0 / (2<<3) # four bit precision
+    p = IGMPv3()
+    for v in range(0, 31745):
+        c = p.encode_float(v)
+        d = p.decode_float(c)
+        rel_err = float(v-d)/v if v!=0 else 0.0
+        assert rel_err <= max_expected_error
+
+    print "construct membership query - general query"
+    mq = IGMPv3(type=IGMP_TYPE_MEMBERSHIP_QUERY, max_resp_code=120)
+    hexdump(str(mq))
+
+    print "construct membership query - group-specific query"
+    mq = IGMPv3(type=IGMP_TYPE_MEMBERSHIP_QUERY, max_resp_code=120,
+                gaddr="224.0.0.1")
+    hexdump(str(mq))
+
+    print "construct membership query - group-and-source-specific query"
+    mq = IGMPv3(type=IGMP_TYPE_MEMBERSHIP_QUERY, max_resp_code=120,
+                gaddr="224.0.0.1")
+    mq.srcs = ['1.2.3.4', '5.6.7.8']
+    hexdump(str(mq))
+
+    print "fixup"
+    mq = IGMPv3(type=IGMP_TYPE_MEMBERSHIP_QUERY)
+    mq.srcs = ['1.2.3.4', '5.6.7.8']
+    pkt = Ether() / IP() / mq
+    print "before fixup:"
+    hexdump(str(pkt))
+
+    print "after fixup:"
+    IGMPv3.fixup(pkt)
+    hexdump(str(pkt))
+
+    print "construct v3 membership report - join a single group"
+    mr = IGMPv3(type=IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
+                gaddr="224.0.0.1")
+    mr.grps = [IGMPv3gr( rtype=IGMP_V3_GR_TYPE_EXCLUDE, mcaddr="229.10.20.30")]
+    hexdump(mr)
+
+    print "construct v3 membership report - join two groups"
+    mr = IGMPv3(type=IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
+                gaddr="224.0.0.1")
+    mr.grps = [
+        IGMPv3gr(rtype=IGMP_V3_GR_TYPE_EXCLUDE, mcaddr="229.10.20.30"),
+        IGMPv3gr(rtype=IGMP_V3_GR_TYPE_EXCLUDE, mcaddr="229.10.20.31")
+    ]
+    hexdump(mr)
+
+    print "construct v3 membership report - leave a group"
+    mr = IGMPv3(type=IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
+                gaddr="224.0.0.1")
+    mr.grps = [IGMPv3gr(rtype=IGMP_V3_GR_TYPE_INCLUDE, mcaddr="229.10.20.30")]
+    hexdump(mr)
+
+    print "all ok"
diff --git a/voltha/protos/device.proto b/voltha/protos/device.proto
index 285c5ae..b4d3c46 100644
--- a/voltha/protos/device.proto
+++ b/voltha/protos/device.proto
@@ -113,6 +113,8 @@
         // ("xxxx:xxxx:xxxx:xxxx:xxxx:xxxx:xxxx:xxxx")
         string ipv6_address = 15;
 
+        string host_and_port = 21;
+
     };
 
     ProxyAddress proxy_address = 19;
@@ -121,6 +123,8 @@
 
     OperStatus.OperStatus oper_status = 17 [(access) = READ_ONLY];
 
+    string reason = 22 [(access) = READ_ONLY];  //  Used in FAILED state
+
     ConnectStatus.ConnectStatus connect_status = 18 [(access) = READ_ONLY];
 
     // TODO additional common attribute here
diff --git a/voltha/protos/ponsim.proto b/voltha/protos/ponsim.proto
new file mode 100644
index 0000000..3bc4d36
--- /dev/null
+++ b/voltha/protos/ponsim.proto
@@ -0,0 +1,27 @@
+syntax = "proto3";
+
+package voltha;
+
+import "google/protobuf/empty.proto";
+import "openflow_13.proto";
+
+
+message PonSimDeviceInfo {
+    int32 nni_port = 1;
+    repeated int32 uni_ports = 2;
+}
+
+message FlowTable {
+    int32 port = 1;  // Used to address right device
+    repeated openflow_13.ofp_flow_stats flows = 2;
+}
+
+service PonSim {
+
+    rpc GetDeviceInfo(google.protobuf.Empty)
+        returns(PonSimDeviceInfo) {}
+
+    rpc UpdateFlowTable(FlowTable)
+        returns(google.protobuf.Empty) {}
+
+}