PONSIM: PON simulator with real dataplane handling
This was needed because neither CPQD nor OVS can handle
both zero-tagged packets and 802.1ad (QinQ).
- extensive unittest proves ponsim functional correctness
(for the common use-cases needed in the PON scenario)
- integrated with frameio and coupled with a rather
simple gRPC NBI, ponsim can be operated from Voltha
just like a real PON system
- posim_olt/_onu adapters added to Voltha to work on
ponsim
- CLI can be used to preprovision and activate a PONSIM
instance (e.g., preprovision_olt -t ponsim_olt -H localhost:50060)
- Some of olt-oftest:olt-complex testcases can be run on
the ponsim device (in vagrant/Ubuntu environment),
but there are some remaining issues to work out:
- barrier calls in OF do not guaranty that the flow
is already installed on the device. This is a generic
issue, not just for ponsim.
- the whole test framework is inconsistent about zero-
tagged vs. untagged frames at the ONUs, while ponsim
is rather pedantica and does exactly what was defined
in the flows.
Change-Id: I0dd564c932416ae1566935492134cb5b08113bdc
diff --git a/.gitignore b/.gitignore
index 50df298..bd10220 100644
--- a/.gitignore
+++ b/.gitignore
@@ -60,6 +60,7 @@
# Files copied over during make
ofagent/protos/third_party/google
+ponsim/protos/third_party/google
# Voltha CLI hostory files
.voltha_cli_history
diff --git a/chameleon/main.py b/chameleon/main.py
index 4d3939d..9031727 100755
--- a/chameleon/main.py
+++ b/chameleon/main.py
@@ -186,6 +186,7 @@
\____| |_| |_| \__,_| |_| |_| |_| \___| |_| \___| \___/ |_| |_|
'''
+
def print_banner(log):
for line in banner.strip('\n').splitlines():
log.info(line)
diff --git a/cli/main.py b/cli/main.py
index 3f2dd7a..a54e4c4 100755
--- a/cli/main.py
+++ b/cli/main.py
@@ -287,12 +287,16 @@
make_option('-m', '--mac-address', action='store', dest='mac_address',
default='00:0c:e2:31:40:00'),
make_option('-i', '--ip-address', action='store', dest='ip_address'),
+ make_option('-H', '--host_and_port', action='store',
+ dest='host_and_port'),
])
def do_preprovision_olt(self, line, opts):
"""Preprovision a new OLT with given device type"""
stub = voltha_pb2.VolthaLocalServiceStub(self.get_channel())
kw = dict(type=opts.device_type)
- if opts.ip_address:
+ if opts.host_and_port:
+ kw['host_and_port'] = opts.host_and_port
+ elif opts.ip_address:
kw['ipv4_address'] = opts.ip_address
elif opts.mac_address:
kw['mac_address'] = opts.mac_address
diff --git a/cli/utils.py b/cli/utils.py
index b1aad1a..0d677d1 100644
--- a/cli/utils.py
+++ b/cli/utils.py
@@ -136,10 +136,17 @@
table.add_cell(i, *field_printers[type](ofb))
for instruction in flow['instructions']:
- if instruction['type'] == 4:
+ itype = instruction['type']
+ if itype == 4:
for action in instruction['actions']['actions']:
- type = action['type'][len('OFPAT_'):]
- table.add_cell(i, *action_printers[type](action))
+ atype = action['type'][len('OFPAT_'):]
+ table.add_cell(i, *action_printers[atype](action))
+ elif itype == 1:
+ table.add_cell(i, 10000, 'goto-table',
+ instruction['goto_table']['table_id'])
+ else:
+ raise NotImplementedError(
+ 'not handling instruction type {}'.format(itype))
table.print_table(header, printfn)
diff --git a/ponsim/__init__.py b/ponsim/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/ponsim/__init__.py
diff --git a/ponsim/grpc_server.py b/ponsim/grpc_server.py
new file mode 100644
index 0000000..20b5dab
--- /dev/null
+++ b/ponsim/grpc_server.py
@@ -0,0 +1,75 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import grpc
+import structlog
+from concurrent import futures
+
+from common.utils.grpc_utils import twisted_async
+from voltha.protos import third_party
+from voltha.protos.ponsim_pb2 import PonSimServicer, \
+ add_PonSimServicer_to_server, PonSimDeviceInfo
+from google.protobuf.empty_pb2 import Empty
+
+_ = third_party
+
+log = structlog.get_logger()
+
+
+class FlowUpdateHandler(PonSimServicer):
+
+ def __init__(self, thread_pool, ponsim):
+ self.thread_pool = thread_pool
+ self.ponsim = ponsim
+
+ @twisted_async
+ def GetDeviceInfo(self, request, context):
+ log.info('get-device-info')
+ ports = self.ponsim.get_ports()
+ return PonSimDeviceInfo(
+ nni_port=ports[0],
+ uni_ports=ports[1:]
+ )
+
+ @twisted_async
+ def UpdateFlowTable(self, request, context):
+ log.info('flow-table-update', request=request, port=request.port)
+ if request.port == 0:
+ # by convention this is the olt port
+ self.ponsim.olt_install_flows(request.flows)
+ else:
+ self.ponsim.onu_install_flows(request.port, request.flows)
+ return Empty()
+
+class GrpcServer(object):
+
+ def __init__(self, port, ponsim):
+ self.port = port
+ self.thread_pool = futures.ThreadPoolExecutor(max_workers=10)
+ self.server = grpc.server(self.thread_pool)
+ self.ponsim = ponsim
+
+ def start(self):
+ log.debug('starting')
+ handler = FlowUpdateHandler(self.thread_pool, self.ponsim)
+ add_PonSimServicer_to_server(handler, self.server)
+ self.server.add_insecure_port('[::]:%s' % self.port)
+ self.server.start()
+ log.info('started')
+
+ def stop(self, grace=0):
+ log.debug('stopping')
+ self.server.stop(grace)
+ log.info('stopped')
diff --git a/ponsim/main.py b/ponsim/main.py
new file mode 100755
index 0000000..f3d0ffd
--- /dev/null
+++ b/ponsim/main.py
@@ -0,0 +1,214 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+PON Simulator process, able to move packets across NNI and UNIs, as well
+as take MGMT calls via gRPC.
+It can only work on Linux.
+"""
+import argparse
+import os
+
+import yaml
+from twisted.internet.defer import inlineCallbacks
+
+from common.structlog_setup import setup_logging
+from grpc_server import GrpcServer
+from ponsim import PonSim
+from realio import RealIo
+
+defs = dict(
+ config=os.environ.get('CONFIG', './ponsim.yml'),
+ grpc_port=int(os.environ.get('GRPC_PORT', 50060)),
+ name=os.environ.get('NAME', 'pon1'),
+ onus=int(os.environ.get("ONUS", 1))
+)
+
+
+def load_config(args):
+ path = args.config
+ if path.startswith('.'):
+ dir = os.path.dirname(os.path.abspath(__file__))
+ path = os.path.join(dir, path)
+ path = os.path.abspath(path)
+ with open(path) as fd:
+ config = yaml.load(fd)
+ return config
+
+
+banner = r'''
+ ____ __ __ _ ____ __ _ _
+( _ \ / \ ( ( \/ ___)( )( \/ )
+ ) __/( O )/ /\___ \ )( / \/ \
+(__) \__/ \_)__)(____/(__)\_)(_/
+'''
+
+def print_banner(log):
+ for line in banner.strip('\n').splitlines():
+ log.info(line)
+ log.info('(to stop: press Ctrl-C)')
+
+
+def parse_args():
+
+ parser = argparse.ArgumentParser()
+
+ _help = ('Path to chameleon.yml config file (default: %s). '
+ 'If relative, it is relative to main.py of chameleon.'
+ % defs['config'])
+ parser.add_argument('-c', '--config',
+ dest='config',
+ action='store',
+ default=defs['config'],
+ help=_help)
+
+ _help = ('port number of the GRPC service exposed by voltha (default: %s)'
+ % defs['grpc_port'])
+ parser.add_argument('-g', '--grpc-port',
+ dest='grpc_port',
+ action='store',
+ default=defs['grpc_port'],
+ help=_help)
+
+ _help = ('number of ONUs to simulate (default: %d)' % defs['onus'])
+ parser.add_argument('-o', '--onus',
+ dest='onus',
+ action='store',
+ type=int,
+ default=defs['onus'],
+ help=_help)
+
+ _help = ('name of the PON natework used as a prefix for all network'
+ ' resource created on behalf of the PON (default: %s)' %
+ defs['name'])
+ parser.add_argument('-N', '--name',
+ dest='name',
+ action='store',
+ default=defs['name'],
+ help=_help)
+
+ _help = "suppress debug and info logs"
+ parser.add_argument('-q', '--quiet',
+ dest='quiet',
+ action='count',
+ help=_help)
+
+ _help = 'enable verbose logging'
+ parser.add_argument('-v', '--verbose',
+ dest='verbose',
+ action='count',
+ help=_help)
+
+ _help = 'omit startup banner log lines'
+ parser.add_argument('-n', '--no-banner',
+ dest='no_banner',
+ action='store_true',
+ default=False,
+ help=_help)
+
+ args = parser.parse_args()
+
+ return args
+
+
+class Main(object):
+
+ def __init__(self):
+
+ self.args = args = parse_args()
+ self.config = load_config(args)
+
+ verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
+ self.log = setup_logging(self.config.get('logging', {}),
+ args.name,
+ verbosity_adjust=verbosity_adjust)
+
+ # components
+ self.io = None
+ self.ponsim = None
+ self.grpc_server = None
+
+ if not args.no_banner:
+ print_banner(self.log)
+
+ self.startup_components()
+
+ def start(self):
+ self.start_reactor() # will not return except Keyboard interrupt
+
+ @inlineCallbacks
+ def startup_components(self):
+ try:
+ self.log.info('starting-internal-components')
+
+ iface_map = self.setup_networking_assets(self.args.name,
+ self.args.onus)
+ self.io = yield RealIo(iface_map).start()
+ self.ponsim = PonSim(self.args.onus, self.io.egress)
+ self.io.register_ponsim(self.ponsim)
+
+ self.grpc_server = GrpcServer(self.args.grpc_port, self.ponsim)
+ yield self.grpc_server.start()
+
+ self.log.info('started-internal-services')
+
+ except Exception, e:
+ self.log.exception('startup-failed', e=e)
+
+ @inlineCallbacks
+ def shutdown_components(self):
+ """Execute before the reactor is shut down"""
+ self.log.info('exiting-on-keyboard-interrupt')
+ if self.io is not None:
+ self.io.stop()
+ self.teardown_networking_assets(self.args.name, self.args.onus)
+ if self.grpc_server is not None:
+ yield self.grpc_server.stop()
+
+ def start_reactor(self):
+ from twisted.internet import reactor
+ reactor.callWhenRunning(
+ lambda: self.log.info('twisted-reactor-started'))
+ reactor.addSystemEventTrigger('before', 'shutdown',
+ self.shutdown_components)
+ reactor.run()
+
+ def setup_networking_assets(self, prefix, n_unis):
+ # setup veth pairs for NNI and each UNI, using prefix and port numbers
+ port_map = dict()
+ for portnum in [0] + range(128, 128 + n_unis):
+ external_name = '%s_%d' % (prefix, portnum)
+ internal_name = external_name + 'sim'
+ os.system('sudo ip link add dev {} type veth peer name {}'.format(
+ external_name, internal_name
+ ))
+ os.system('sudo ip link set {} up'.format(external_name))
+ os.system('sudo ip link set {} up'.format(internal_name))
+ if portnum == 0:
+ os.system('sudo brctl addif ponmgmt {}'.format(external_name))
+ port_map[portnum] = internal_name
+ return port_map
+
+ def teardown_networking_assets(self, prefix, n_unis):
+ # undo all the networking stuff
+ for portnum in [0] + range(128, 128 + n_unis):
+ external_name = '%s_%d' % (prefix, portnum)
+ os.system('sudo ip link del {}'.format(external_name))
+
+
+if __name__ == '__main__':
+ Main().start()
diff --git a/ponsim/ponsim.py b/ponsim/ponsim.py
new file mode 100644
index 0000000..c855e39
--- /dev/null
+++ b/ponsim/ponsim.py
@@ -0,0 +1,236 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Simple PON Simulator which would not be needed if openvswitch could do
+802.1ad (QinQ), which it cannot (the reason is beyond me), or if CPQD could
+handle 0-tagged packets (no comment).
+"""
+import structlog
+from scapy.layers.inet import IP, UDP
+from scapy.layers.l2 import Ether, Dot1Q
+from scapy.packet import Packet
+
+from common.frameio.frameio import hexify
+from voltha.protos import third_party
+from voltha.core.flow_decomposer import *
+_ = third_party
+
+
+def ipv4int2str(ipv4int):
+ return '{}.{}.{}.{}'.format(
+ (ipv4int >> 24) & 0xff,
+ (ipv4int >> 16) & 0xff,
+ (ipv4int >> 8) & 0xff,
+ ipv4int & 0xff
+ )
+
+
+class SimDevice(object):
+
+ def __init__(self, name, logical_port_no):
+ self.name = name
+ self.logical_port_no = logical_port_no
+ self.links = dict()
+ self.flows = list()
+ self.log = structlog.get_logger(name=name,
+ logical_port_no=logical_port_no)
+
+ def link(self, port, egress_fun):
+ self.links.setdefault(port, []).append(egress_fun)
+
+ def ingress(self, port, frame):
+ self.log.debug('ingress', ingress_port=port)
+ outcome = self.process_frame(port, frame)
+ if outcome is not None:
+ egress_port, egress_frame = outcome
+ forwarded = 0
+ links = self.links.get(egress_port)
+ if links is not None:
+ for fun in links:
+ forwarded += 1
+ self.log.debug('forwarding', egress_port=egress_port)
+ fun(egress_port, egress_frame)
+ if not forwarded:
+ self.log.debug('no-one-to-forward-to', egress_port=egress_port)
+ else:
+ self.log.debug('dropped')
+
+ def install_flows(self, flows):
+ # store flows in precedence order so we can roll down on frame arrival
+ self.flows = sorted(flows, key=lambda fm: fm.priority, reverse=True)
+
+ def process_frame(self, ingress_port, ingress_frame):
+ for flow in self.flows:
+ if self.is_match(flow, ingress_port, ingress_frame):
+ egress_port, egress_frame = self.process_actions(
+ flow, ingress_frame)
+ return egress_port, egress_frame
+ return None
+
+ @staticmethod
+ def is_match(flow, ingress_port, frame):
+
+ def get_non_shim_ether_type(f):
+ if f.haslayer(Dot1Q):
+ f = f.getlayer(Dot1Q)
+ return f.type
+
+ def get_vlan_pcp(f):
+ if f.haslayer(Dot1Q):
+ return f.getlayer(Dot1Q).prio
+
+ def get_ip_proto(f):
+ if f.haslayer(IP):
+ return f.getlayer(IP).proto
+
+ def get_ipv4_dst(f):
+ if f.haslayer(IP):
+ return f.getlayer(IP).dst
+
+ def get_udp_dst(f):
+ if f.haslayer(UDP):
+ return f.getlayer(UDP).dport
+
+ for field in get_ofb_fields(flow):
+
+ if field.type == IN_PORT:
+ if field.port != ingress_port:
+ return False
+
+ elif field.type == ETH_TYPE:
+ if field.eth_type != get_non_shim_ether_type(frame):
+ return False
+
+ elif field.type == IP_PROTO:
+ if field.ip_proto != get_ip_proto(frame):
+ return False
+
+ elif field.type == VLAN_VID:
+ expected_vlan = field.vlan_vid
+ tagged = frame.haslayer(Dot1Q)
+ if bool(expected_vlan & 4096) != bool(tagged):
+ return False
+ if tagged:
+ actual_vid = frame.getlayer(Dot1Q).vlan
+ if actual_vid != expected_vlan & 4095:
+ return False
+
+ elif field.type == VLAN_PCP:
+ if field.vlan_pcp != get_vlan_pcp(frame):
+ return False
+
+ elif field.type == IPV4_DST:
+ if ipv4int2str(field.ipv4_dst) != get_ipv4_dst(frame):
+ return False
+
+ elif field.type == UDP_DST:
+ if field.udsp_dst != get_udp_dst(frame):
+ return False
+
+ else:
+ raise NotImplementedError('field.type=%d' % field.type)
+
+ return True
+
+ @staticmethod
+ def process_actions(flow, frame):
+ egress_port = None
+ for action in get_actions(flow):
+
+ if action.type == OUTPUT:
+ egress_port = action.output.port
+
+ elif action.type == POP_VLAN:
+ if frame.haslayer(Dot1Q):
+ shim = frame.getlayer(Dot1Q)
+ frame = Ether(
+ src=frame.src,
+ dst=frame.dst,
+ type=shim.type) / shim.payload
+
+ elif action.type == PUSH_VLAN:
+ frame = (
+ Ether(src=frame.src, dst=frame.dst,
+ type=action.push.ethertype) /
+ Dot1Q(type=frame.type) /
+ frame.payload
+ )
+
+ elif action.type == SET_FIELD:
+ assert (action.set_field.field.oxm_class ==
+ ofp.OFPXMC_OPENFLOW_BASIC)
+ field = action.set_field.field.ofb_field
+
+ if field.type == VLAN_VID:
+ shim = frame.getlayer(Dot1Q)
+ shim.vlan = field.vlan_vid & 4095
+
+ elif field.type == VLAN_PCP:
+ shim = frame.getlayer(Dot1Q)
+ shim.prio = field.vlan_pcp
+
+ else:
+ raise NotImplementedError('set_field.field.type=%d'
+ % field.type)
+
+ else:
+ raise NotImplementedError('action.type=%d' % action.type)
+
+ return egress_port, frame
+
+
+class PonSim(object):
+
+ def __init__(self, onus, egress_fun):
+ self.egress_fun = egress_fun
+
+ # Create OLT and hook NNI port up for egress
+ self.olt = SimDevice('olt', 0)
+ self.olt.link(2, lambda _, frame: self.egress_fun(0, frame))
+ self.devices = dict()
+ self.devices[0] = self.olt
+
+ # Create ONUs of the requested number and hook them up with OLT
+ # and with egress fun
+ def mk_egress_fun(port_no):
+ return lambda _, frame: self.egress_fun(port_no, frame)
+
+ def mk_onu_ingress(onu):
+ return lambda _, frame: onu.ingress(1, frame)
+
+ for i in range(onus):
+ port_no = 128 + i
+ onu = SimDevice('onu%d' % i, port_no)
+ onu.link(1, lambda _, frame: self.olt.ingress(1, frame))
+ onu.link(2, mk_egress_fun(port_no))
+ self.olt.link(1, mk_onu_ingress(onu))
+ self.devices[port_no] = onu
+
+ def get_ports(self):
+ return sorted(self.devices.keys())
+
+ def olt_install_flows(self, flows):
+ self.olt.install_flows(flows)
+
+ def onu_install_flows(self, onu_port, flows):
+ self.devices[onu_port].install_flows(flows)
+
+ def ingress(self, port, frame):
+ if not isinstance(frame, Packet):
+ frame = Ether(frame)
+ self.devices[port].ingress(2, frame)
+
diff --git a/ponsim/ponsim.yml b/ponsim/ponsim.yml
new file mode 100644
index 0000000..0f4ad04
--- /dev/null
+++ b/ponsim/ponsim.yml
@@ -0,0 +1,31 @@
+logging:
+ version: 1
+
+ formatters:
+ brief:
+ format: '%(message)s'
+ default:
+ format: '%(asctime)s.%(msecs)03d %(levelname)-8s %(module)s.%(funcName)s %(message)s'
+ datefmt: '%Y%m%dT%H%M%S'
+
+ handlers:
+ console:
+ class : logging.StreamHandler
+ level: DEBUG
+ formatter: default
+ stream: ext://sys.stdout
+ null:
+ class: logging.NullHandler
+
+ loggers:
+ amqp:
+ handlers: [null]
+ propagate: False
+ conf:
+ handlers: [null]
+ propagate: False
+ '': # root logger
+ handlers: [console]
+ level: INFO # this can be bumped up/down by -q and -v command line
+ # options
+ propagate: False
diff --git a/ponsim/realio.py b/ponsim/realio.py
new file mode 100644
index 0000000..bca3382
--- /dev/null
+++ b/ponsim/realio.py
@@ -0,0 +1,68 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from scapy.layers.l2 import Ether
+from scapy.packet import Packet
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+from common.frameio.frameio import FrameIOManager, hexify
+
+log = structlog.get_logger()
+
+
+class RealIo(object):
+
+ def __init__(self, iface_map):
+ self.port_to_iface_name = iface_map
+ self.iface_name_to_port = dict((n, p) for p, n in iface_map.items())
+ self.frame_io = FrameIOManager()
+ self.ponsim = None
+ self.io_ports = dict()
+
+ @inlineCallbacks
+ def start(self):
+ log.debug('starting')
+ yield self.frame_io.start()
+ for port, iface_name in self.port_to_iface_name.items():
+ io_port = self.frame_io.add_interface(iface_name, self.ingress)
+ self.io_ports[port] = io_port
+ log.info('started')
+ returnValue(self)
+
+ def stop(self):
+ log.debug('stopping')
+ for port in self.io_ports.values():
+ self.frame_io.del_interface(port.iface_name)
+ self.frame_io.stop()
+ log.info('stopped')
+
+ def register_ponsim(self, ponsim):
+ self.ponsim = ponsim
+
+ def ingress(self, io_port, frame):
+ port = self.iface_name_to_port.get(io_port.iface_name)
+ log.debug('ingress', port=port, iface_name=io_port.iface_name,
+ frame=hexify(frame))
+ decoded_frame = Ether(frame)
+ if self.ponsim is not None:
+ self.ponsim.ingress(port, decoded_frame)
+
+ def egress(self, port, frame):
+ if isinstance(frame, Packet):
+ frame = str(frame)
+ io_port = self.io_ports[port]
+ log.debug('sending', port=port, frame=hexify(frame))
+ io_port.send(frame)
diff --git a/ponsim/test_ponsim.py b/ponsim/test_ponsim.py
new file mode 100644
index 0000000..365468b
--- /dev/null
+++ b/ponsim/test_ponsim.py
@@ -0,0 +1,372 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from unittest import TestCase, main
+
+from scapy.layers.inet import IP
+from scapy.layers.l2 import Ether, Dot1Q, EAPOL
+
+from ponsim import PonSim
+from voltha.extensions.IGMP import IGMP_TYPE_V3_MEMBERSHIP_REPORT, IGMPv3gr, \
+ IGMPv3, IGMP_TYPE_MEMBERSHIP_QUERY
+from voltha.extensions.IGMP import IGMP_V3_GR_TYPE_EXCLUDE
+from voltha.protos import third_party
+from voltha.core.flow_decomposer import *
+_ = third_party
+
+
+class TestPonSim(TestCase):
+
+ def setUp(self):
+ self.output = []
+ self.pon = PonSim(onus=2, egress_fun=lambda port, frame:
+ self.output.append((port, frame)))
+
+ def reset_output(self):
+ while self.output:
+ self.output.pop()
+
+ def ingress_frame(self, frame, ports=None):
+ if ports is None:
+ ports = self.pon.get_ports()
+ if isinstance(ports, int):
+ ports = [ports]
+ for port in ports:
+ self.pon.ingress(port, frame)
+
+ def assert_dont_pass(self, frame, ports=None):
+ self.reset_output()
+ self.ingress_frame(frame, ports)
+ self.assertEqual(self.output, [])
+
+ def assert_untagged_frames_dont_pass(self, ports=None):
+ self.assert_dont_pass(Ether(), ports=ports)
+
+ def test_basics(self):
+ self.assertEqual(self.pon.get_ports(), [0, 128, 129])
+
+ def test_by_default_no_traffic_passes(self):
+ self.assert_untagged_frames_dont_pass()
+
+ def test_downstream_unicast_forwarding(self):
+
+ self.pon.olt_install_flows([
+ mk_flow_stat(
+ match_fields=[in_port(2), vlan_vid(4096 + 1000)],
+ actions=[pop_vlan(), output(1)]
+ )
+ ])
+ self.pon.onu_install_flows(128, [
+ mk_flow_stat(
+ match_fields=[in_port(1), vlan_vid(4096 + 128)],
+ actions=[set_field(vlan_vid(4096 + 0)), output(2)]
+ )
+ ])
+
+ # untagged frames shall not get through
+ self.assert_untagged_frames_dont_pass()
+
+ # incorrect single- or double-tagged frames don't pass
+ self.assert_dont_pass(Ether() / Dot1Q(vlan=1000) / IP())
+ self.assert_dont_pass(Ether() / Dot1Q(vlan=128) / IP())
+ self.assert_dont_pass(
+ Ether() / Dot1Q(vlan=128) / Dot1Q(vlan=1000) / IP())
+ self.assert_dont_pass(
+ Ether() / Dot1Q(vlan=1000) / Dot1Q(vlan=129) / IP())
+
+ # properly tagged downstream frame gets through and pops up at port 128
+ # as untagged
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame = Ether(**kw) / Dot1Q(vlan=1000) / Dot1Q(vlan=128) / IP()
+ out_frame = Ether(**kw) / Dot1Q(vlan=0) / IP()
+
+ self.ingress_frame(in_frame)
+ self.assertEqual(self.output, [(128, out_frame)])
+
+ def test_upstream_unicast_forwarding(self):
+
+ self.pon.onu_install_flows(128, [
+ mk_flow_stat(
+ match_fields=[in_port(2), vlan_vid(4096 + 0)],
+ actions=[set_field(vlan_vid(4096 + 128)), output(1)]
+ )
+ ])
+ self.pon.olt_install_flows([
+ mk_flow_stat(
+ match_fields=[in_port(1), vlan_vid(4096 + 128)],
+ actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 1000)),
+ output(2)]
+ )
+ ])
+
+ # untagged frames shall not get through
+ self.assert_untagged_frames_dont_pass()
+
+ # incorrect single- or double-tagged frames don't pass
+ self.assert_dont_pass(Ether() / Dot1Q(vlan=1000) / IP())
+ self.assert_dont_pass(Ether() / Dot1Q(vlan=128) / IP())
+ self.assert_dont_pass(
+ Ether() / Dot1Q(vlan=1000) / Dot1Q(vlan=128) / IP())
+ self.assert_dont_pass(
+ Ether() / Dot1Q(vlan=129) / Dot1Q(vlan=1000) / IP())
+
+ # properly tagged downstream frame gets through and pops up at port 128
+ # as untagged
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame = Ether(**kw) / Dot1Q(vlan=0) / IP()
+ out_frame = Ether(**kw) / Dot1Q(vlan=1000) / Dot1Q(vlan=128) / IP()
+
+ self.ingress_frame(in_frame)
+ self.assertEqual(self.output, [(0, out_frame)])
+
+
+ def setup_all_flows(self):
+
+ self.pon.olt_install_flows([
+ mk_flow_stat(
+ priority=2000,
+ match_fields=[in_port(2), vlan_vid(4096 + 4000), vlan_pcp(0)],
+ actions=[pop_vlan(), output(1)]
+ ),
+ mk_flow_stat(
+ priority=2000,
+ match_fields=[in_port(1), eth_type(0x888e)],
+ actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 4000)),
+ output(2)]
+ ),
+ mk_flow_stat(
+ priority=1000,
+ match_fields=[in_port(1), eth_type(0x800), ip_proto(2)],
+ actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 4000)),
+ output(2)]
+ ),
+ mk_flow_stat(
+ priority=1000,
+ match_fields=[in_port(1), eth_type(0x800), ip_proto(17),
+ udp_dst(67)],
+ actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 4000)),
+ output(2)]
+ ),
+ mk_flow_stat(
+ priority=1000,
+ match_fields=[in_port(2), vlan_vid(4096 + 140)],
+ actions=[pop_vlan(), output(1)]
+ ),
+ mk_flow_stat(
+ priority=500,
+ match_fields=[in_port(2), vlan_vid(4096 + 1000)],
+ actions=[pop_vlan(), output(1)]
+ ),
+ mk_flow_stat(
+ priority=500,
+ match_fields=[in_port(1), vlan_vid(4096 + 128)],
+ actions=[
+ push_vlan(0x8100), set_field(vlan_vid(4096 + 1000)),
+ output(2)]
+ ),
+ mk_flow_stat(
+ priority=500,
+ match_fields=[in_port(1), vlan_vid(4096 + 129)],
+ actions=[
+ push_vlan(0x8100), set_field(vlan_vid(4096 + 1000)),
+ output(2)]
+ ),
+ ])
+
+ self.pon.onu_install_flows(128, [
+ mk_flow_stat(
+ priority=500,
+ match_fields=[in_port(2), vlan_vid(4096 + 0)],
+ actions=[
+ set_field(vlan_vid(4096 + 128)), output(1)]
+ ),
+ mk_flow_stat(
+ priority=1000,
+ match_fields=[
+ in_port(1), eth_type(0x800), ipv4_dst(0xe4010102)],
+ actions=[output(2)]
+ ),
+ mk_flow_stat(
+ priority=1000,
+ match_fields=[
+ in_port(1), eth_type(0x800), ipv4_dst(0xe4010104)],
+ actions=[output(2)]
+ ),
+ mk_flow_stat(
+ priority=500,
+ match_fields=[in_port(1), vlan_vid(4096 + 128)],
+ actions=[set_field(vlan_vid(4096 + 0)), output(2)]
+ ),
+ mk_flow_stat(
+ priority=500,
+ match_fields=[in_port(2), vlan_vid(0)],
+ actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 128)),
+ output(1)]
+ )
+ ])
+
+ self.pon.onu_install_flows(129, [
+ mk_flow_stat(
+ priority=500,
+ match_fields=[in_port(2), vlan_vid(4096 + 0)],
+ actions=[
+ set_field(vlan_vid(4096 + 129)), output(1)]
+ ),
+ mk_flow_stat(
+ priority=1000,
+ match_fields=[
+ in_port(1), eth_type(0x800), ipv4_dst(0xe4010103)],
+ actions=[output(2)]
+ ),
+ mk_flow_stat(
+ priority=1000,
+ match_fields=[
+ in_port(1), eth_type(0x800), ipv4_dst(0xe4010104)],
+ actions=[output(2)]
+ ),
+ mk_flow_stat(
+ priority=500,
+ match_fields=[in_port(1), vlan_vid(4096 + 129)],
+ actions=[set_field(vlan_vid(4096 + 0)), output(2)]
+ ),
+ mk_flow_stat(
+ priority=500,
+ match_fields=[in_port(2), vlan_vid(0)],
+ actions=[push_vlan(0x8100), set_field(vlan_vid(4096 + 129)),
+ output(1)]
+ )
+ ])
+
+ def test_combo_block_untagged_downstream(self):
+ self.setup_all_flows()
+ self.assert_untagged_frames_dont_pass(ports=0)
+
+ def test_eapol_in(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame = Ether(**kw) / EAPOL(type=1)
+ out_frame1 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=128) / EAPOL(type=1)
+ out_frame2 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=129) / EAPOL(type=1)
+ self.ingress_frame(in_frame)
+ self.assertEqual(self.output, [(0, out_frame1), (0, out_frame2)])
+
+ def test_eapol_out(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame1 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=128) / EAPOL(type=1)
+ in_frame2 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=129) / EAPOL(type=1)
+ out_frame = Ether(**kw) / Dot1Q(vlan=0) / EAPOL(type=1)
+ self.ingress_frame(in_frame1)
+ self.ingress_frame(in_frame2)
+ self.assertEqual(self.output, [(128, out_frame), (129, out_frame)])
+
+ def test_igmp_in(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ mr = IGMPv3(type=IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
+ gaddr="224.0.0.1")
+ mr.grps = [
+ IGMPv3gr(rtype=IGMP_V3_GR_TYPE_EXCLUDE, mcaddr="228.1.1.3")]
+
+ in_frame = Ether(**kw) / IP() / mr
+ out_frame1 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=128) /\
+ in_frame.payload.copy()
+ out_frame2 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=129) /\
+ in_frame.payload.copy()
+ self.ingress_frame(in_frame)
+ self.assertEqual(self.output, [(0, out_frame1), (0, out_frame2)])
+
+ def test_igmp_out(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ mq = IGMPv3(type=IGMP_TYPE_MEMBERSHIP_QUERY, max_resp_code=120)
+ in_frame1 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=128) /\
+ IP() / mq.copy()
+ in_frame2 = Ether(**kw) / Dot1Q(vlan=4000) / Dot1Q(vlan=129) /\
+ IP() / mq.copy()
+ out_frame = Ether(**kw) / Dot1Q(vlan=0) / IP() / mq.copy()
+ self.ingress_frame(in_frame1)
+ self.ingress_frame(in_frame2)
+ self.assertEqual(self.output, [(128, out_frame), (129, out_frame)])
+
+ def test_combo_downstream_unicast_onu1(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame = Ether(**kw) / Dot1Q(vlan=1000) / Dot1Q(vlan=128) / IP()
+ out_frame = Ether(**kw) / Dot1Q(vlan=0) / IP()
+ self.reset_output()
+ self.ingress_frame(in_frame, ports=0)
+ self.assertEqual(self.output, [(128, out_frame)])
+
+ def test_combo_downstream_unicast_onu2(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame = Ether(**kw) / Dot1Q(vlan=1000) / Dot1Q(vlan=129) / IP()
+ out_frame = Ether(**kw) / Dot1Q(vlan=0) / IP()
+ self.reset_output()
+ self.ingress_frame(in_frame, ports=0)
+ self.assertEqual(self.output, [(129, out_frame)])
+
+ def test_combo_upstream_unicast_onu1(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame = Ether(**kw) / Dot1Q(vlan=1000) / Dot1Q(vlan=128) / IP()
+ out_frame = Ether(**kw) / Dot1Q(vlan=0) / IP()
+ self.ingress_frame(in_frame)
+ self.assertEqual(self.output, [(128, out_frame)])
+
+ def test_combo_upstream_unicast_onu2(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame = Ether(**kw) / Dot1Q(vlan=1000) / Dot1Q(vlan=129) / IP()
+ out_frame = Ether(**kw) / Dot1Q(vlan=0) / IP()
+ self.ingress_frame(in_frame)
+ self.assertEqual(self.output, [(129, out_frame)])
+
+ def test_combo_multicast_stream1(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame = Ether(**kw) / Dot1Q(vlan=140) / IP(dst='228.1.1.1')
+ self.ingress_frame(in_frame)
+ self.assertEqual(self.output, [])
+
+ def test_combo_multicast_stream2(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame = Ether(**kw) / Dot1Q(vlan=140) / IP(dst='228.1.1.2')
+ out_frame = Ether(**kw) / IP(dst='228.1.1.2')
+ self.ingress_frame(in_frame)
+ self.assertEqual(self.output, [(128, out_frame),])
+
+ def test_combo_multicast_stream3(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame = Ether(**kw) / Dot1Q(vlan=140) / IP(dst='228.1.1.3')
+ out_frame = Ether(**kw) / IP(dst='228.1.1.3')
+ self.ingress_frame(in_frame)
+ self.assertEqual(self.output, [(129, out_frame),])
+
+ def test_combo_multicast_stream4(self):
+ self.setup_all_flows()
+ kw = dict(src='00:00:00:11:11:11', dst='00:00:00:22:22:22')
+ in_frame = Ether(**kw) / Dot1Q(vlan=140) / IP(dst='228.1.1.4')
+ out_frame = Ether(**kw) / IP(dst='228.1.1.4')
+ self.ingress_frame(in_frame)
+ self.assertEqual(self.output, [(128, out_frame), (129, out_frame)])
+
+
+if __name__ == '__main__':
+ main()
diff --git a/voltha/adapters/broadcom_onu/broadcom_onu.py b/voltha/adapters/broadcom_onu/broadcom_onu.py
index 4ddeeb5..d2ee546 100644
--- a/voltha/adapters/broadcom_onu/broadcom_onu.py
+++ b/voltha/adapters/broadcom_onu/broadcom_onu.py
@@ -215,3 +215,7 @@
# by returning we allow the device to be shown as active, which
# indirectly verified that message passing works
+
+ def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+ log.info('packet-out', logical_device_id=logical_device_id,
+ egress_port_no=egress_port_no, msg_len=len(msg))
diff --git a/voltha/adapters/interface.py b/voltha/adapters/interface.py
index e59d5dd..5e9fef6 100644
--- a/voltha/adapters/interface.py
+++ b/voltha/adapters/interface.py
@@ -141,6 +141,16 @@
:return: None
"""
+ def receive_packet_out(logical_device_id, egress_port_no, msg):
+ """
+ Pass a packet_out message content to adapter so that it can forward it
+ out to the device. This is only called on root devices.
+ :param logical_device_id:
+ :param egress_port: egress logical port number
+ :param msg: actual message
+ :return: None
+ """
+
# TODO work in progress
# ...
diff --git a/voltha/adapters/maple_olt/maple_olt.py b/voltha/adapters/maple_olt/maple_olt.py
index 559a430..bcc442b 100644
--- a/voltha/adapters/maple_olt/maple_olt.py
+++ b/voltha/adapters/maple_olt/maple_olt.py
@@ -297,3 +297,7 @@
def receive_proxied_message(self, proxy_address, msg):
raise NotImplementedError()
+
+ def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+ log.info('packet-out', logical_device_id=logical_device_id,
+ egress_port_no=egress_port_no, msg_len=len(msg))
diff --git a/voltha/adapters/microsemi/microsemi.py b/voltha/adapters/microsemi/microsemi.py
index d59a994..aea79f9 100644
--- a/voltha/adapters/microsemi/microsemi.py
+++ b/voltha/adapters/microsemi/microsemi.py
@@ -122,6 +122,10 @@
def update_flows_incrementally(self, device, flow_changes, group_changes):
raise NotImplementedError()
+ def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+ log.info('packet-out', logical_device_id=logical_device_id,
+ egress_port_no=egress_port_no, msg_len=len(msg))
+
##
# Private methods
##
diff --git a/voltha/adapters/ponsim_olt/__init__.py b/voltha/adapters/ponsim_olt/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/voltha/adapters/ponsim_olt/__init__.py
diff --git a/voltha/adapters/ponsim_olt/ponsim_olt.py b/voltha/adapters/ponsim_olt/ponsim_olt.py
new file mode 100644
index 0000000..9bd267a
--- /dev/null
+++ b/voltha/adapters/ponsim_olt/ponsim_olt.py
@@ -0,0 +1,322 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Fully simulated OLT/ONU adapter.
+"""
+from uuid import uuid4
+
+import grpc
+import structlog
+from scapy.layers.l2 import Ether, Dot1Q
+from twisted.internet import reactor
+from zope.interface import implementer
+
+from common.frameio.frameio import BpfProgramFilter, hexify
+from voltha.adapters.interface import IAdapterInterface
+from voltha.core.logical_device_agent import mac_str_to_tuple
+from voltha.protos import third_party
+from voltha.protos import ponsim_pb2
+from voltha.protos.adapter_pb2 import Adapter
+from voltha.protos.adapter_pb2 import AdapterConfig
+from voltha.protos.common_pb2 import LogLevel, OperStatus, ConnectStatus, \
+ AdminState
+from voltha.protos.device_pb2 import DeviceType, DeviceTypes, Port, Device
+from voltha.protos.health_pb2 import HealthStatus
+from google.protobuf.empty_pb2 import Empty
+
+from voltha.protos.logical_device_pb2 import LogicalPort, LogicalDevice
+from voltha.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, OFPPF_1GB_FD, \
+ OFPC_GROUP_STATS, OFPC_PORT_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS, \
+ ofp_switch_features, ofp_desc
+from voltha.protos.openflow_13_pb2 import ofp_port
+from voltha.protos.ponsim_pb2 import FlowTable
+from voltha.registry import registry
+
+_ = third_party
+log = structlog.get_logger()
+
+
+PACKET_IN_VLAN = 4000
+is_inband_frame = BpfProgramFilter('(ether[14:2] & 0xfff) = 0x{:03x}'.format(
+ PACKET_IN_VLAN))
+
+
+@implementer(IAdapterInterface)
+class PonSimOltAdapter(object):
+
+ name = 'ponsim_olt'
+
+ supported_device_types = [
+ DeviceType(
+ id=name,
+ adapter=name,
+ accepts_bulk_flow_update=True
+ )
+ ]
+
+ def __init__(self, adapter_agent, config):
+ self.adapter_agent = adapter_agent
+ self.config = config
+ self.descriptor = Adapter(
+ id=self.name,
+ vendor='Voltha project',
+ version='0.4',
+ config=AdapterConfig(log_level=LogLevel.INFO)
+ )
+ self.devices_handlers = dict() # device_id -> PonSimOltHandler()
+ self.logical_device_id_to_root_device_id = dict()
+
+ def start(self):
+ log.debug('starting')
+ log.info('started')
+
+ def stop(self):
+ log.debug('stopping')
+ log.info('stopped')
+
+ def adapter_descriptor(self):
+ return self.descriptor
+
+ def device_types(self):
+ return DeviceTypes(items=self.supported_device_types)
+
+ def health(self):
+ return HealthStatus(state=HealthStatus.HealthState.HEALTHY)
+
+ def change_master_state(self, master):
+ raise NotImplementedError()
+
+ def adopt_device(self, device):
+ self.devices_handlers[device.id] = PonSimOltHandler(self, device.id)
+ reactor.callLater(0, self.devices_handlers[device.id].activate, device)
+ return device
+
+ def abandon_device(self, device):
+ raise NotImplementedError()
+
+ def deactivate_device(self, device):
+ raise NotImplementedError()
+
+ def update_flows_bulk(self, device, flows, groups):
+ log.info('bulk-flow-update', device_id=device.id,
+ flows=flows, groups=groups)
+ assert len(groups.items) == 0
+ handler = self.devices_handlers[device.id]
+ return handler.update_flow_table(flows.items)
+
+ def update_flows_incrementally(self, device, flow_changes, group_changes):
+ raise NotImplementedError()
+
+ def send_proxied_message(self, proxy_address, msg):
+ log.info('send-proxied-message', proxy_address=proxy_address, msg=msg)
+ handler = self.devices_handlers[proxy_address.device_id]
+ handler.send_proxied_message(proxy_address, msg)
+
+ def receive_proxied_message(self, proxy_address, msg):
+ raise NotImplementedError()
+
+ def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+
+ def ldi_to_di(ldi):
+ di = self.logical_device_id_to_root_device_id.get(ldi)
+ if di is None:
+ logical_device = self.adapter_agent.get_logical_device(ldi)
+ di = logical_device.root_device_id
+ self.logical_device_id_to_root_device_id[ldi] = di
+ return di
+
+ device_id = ldi_to_di(logical_device_id)
+ handler = self.devices_handlers[device_id]
+ handler.packet_out(egress_port_no, msg)
+
+
+class PonSimOltHandler(object):
+
+ def __init__(self, adapter, device_id):
+ self.adapter = adapter
+ self.adapter_agent = adapter.adapter_agent
+ self.device_id = device_id
+ self.log = structlog.get_logger(device_id=device_id)
+ self.channel = None
+ self.io_port = None
+ self.logical_device_id = None
+ self.interface = registry('main').get_args().interface
+
+ def __del__(self):
+ if self.io_port is not None:
+ registry('frameio').del_interface(self.interface)
+
+ def get_channel(self):
+ if self.channel is None:
+ device = self.adapter_agent.get_device(self.device_id)
+ self.channel = grpc.insecure_channel(device.host_and_port)
+ return self.channel
+
+ def activate(self, device):
+ self.log.info('activating')
+
+ if not device.host_and_port:
+ device.oper_status = OperStatus.FAILED
+ device.reason = 'No host_and_port field provided'
+ self.adapter_agent.update_device(device)
+ return
+
+ stub = ponsim_pb2.PonSimStub(self.get_channel())
+ info = stub.GetDeviceInfo(Empty())
+ log.info('got-info', info=info)
+
+ device.root = True
+ device.vendor = 'ponsim'
+ device.model ='n/a'
+ device.serial_number = device.host_and_port
+ device.connect_status = ConnectStatus.REACHABLE
+ self.adapter_agent.update_device(device)
+
+ nni_port = Port(
+ port_no=2,
+ label='NNI facing Ethernet port',
+ type=Port.ETHERNET_NNI,
+ admin_state=AdminState.ENABLED,
+ oper_status=OperStatus.ACTIVE
+ )
+ self.adapter_agent.add_port(device.id, nni_port)
+ self.adapter_agent.add_port(device.id, Port(
+ port_no=1,
+ label='PON port',
+ type=Port.PON_OLT,
+ admin_state=AdminState.ENABLED,
+ oper_status=OperStatus.ACTIVE
+ ))
+
+ ld = LogicalDevice(
+ # not setting id and datapth_id will let the adapter agent pick id
+ desc=ofp_desc(
+ mfr_desc='cord porject',
+ hw_desc='simualted pon',
+ sw_desc='simualted pon',
+ serial_num=uuid4().hex,
+ dp_desc='n/a'
+ ),
+ switch_features=ofp_switch_features(
+ n_buffers=256, # TODO fake for now
+ n_tables=2, # TODO ditto
+ capabilities=( # TODO and ditto
+ OFPC_FLOW_STATS
+ | OFPC_TABLE_STATS
+ | OFPC_PORT_STATS
+ | OFPC_GROUP_STATS
+ )
+ ),
+ root_device_id=device.id
+ )
+ ld_initialized = self.adapter_agent.create_logical_device(ld)
+ cap = OFPPF_1GB_FD | OFPPF_FIBER
+ self.adapter_agent.add_logical_port(ld_initialized.id, LogicalPort(
+ id='nni',
+ ofp_port=ofp_port(
+ port_no=info.nni_port,
+ hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % info.nni_port),
+ name='nni',
+ config=0,
+ state=OFPPS_LIVE,
+ curr=cap,
+ advertised=cap,
+ peer=cap,
+ curr_speed=OFPPF_1GB_FD,
+ max_speed=OFPPF_1GB_FD
+ ),
+ device_id=device.id,
+ device_port_no=nni_port.port_no,
+ root_port=True
+ ))
+
+ device = self.adapter_agent.get_device(device.id)
+ device.parent_id = ld_initialized.id
+ device.oper_status = OperStatus.ACTIVE
+ self.adapter_agent.update_device(device)
+ self.logical_device_id = ld_initialized.id
+
+ # register ONUS per uni port
+ for port_no in info.uni_ports:
+ vlan_id = port_no
+ self.adapter_agent.child_device_detected(
+ parent_device_id=device.id,
+ parent_port_no=1,
+ child_device_type='ponsim_onu',
+ proxy_address=Device.ProxyAddress(
+ device_id=device.id,
+ channel_id=vlan_id
+ ),
+ vlan=vlan_id
+ )
+
+ # finally, open the frameio port to receive in-band packet_in messages
+ self.log.info('registering-frameio')
+ self.io_port = registry('frameio').add_interface(
+ self.interface, self.rcv_io, is_inband_frame)
+ self.log.info('registered-frameio')
+
+ def rcv_io(self, port, frame):
+ self.log.info('reveived', iface_name=port.iface_name,
+ frame_len=len(frame))
+ pkt = Ether(frame)
+ if pkt.haslayer(Dot1Q):
+ outer_shim = pkt.getlayer(Dot1Q)
+ if isinstance(outer_shim.payload, Dot1Q):
+ inner_shim = outer_shim.payload
+ cvid = inner_shim.vlan
+ logical_port = cvid
+ popped_frame = (
+ Ether(src=pkt.src, dst=pkt.dst, type=inner_shim.type) /
+ inner_shim.payload
+ )
+ kw = dict(
+ logical_device_id=self.logical_device_id,
+ logical_port_no=logical_port,
+ )
+ self.log.info('sending-packet-in', **kw)
+ self.adapter_agent.send_packet_in(
+ packet=str(popped_frame), **kw)
+
+ def update_flow_table(self, flows):
+ stub = ponsim_pb2.PonSimStub(self.get_channel())
+ self.log.info('pushing-olt-flow-table')
+ stub.UpdateFlowTable(FlowTable(
+ port=0,
+ flows=flows
+ ))
+ self.log.info('success')
+
+ def send_proxied_message(self, proxy_address, msg):
+ self.log.info('sending-proxied-message')
+ if isinstance(msg, FlowTable):
+ stub = ponsim_pb2.PonSimStub(self.get_channel())
+ self.log.info('pushing-onu-flow-table', port=msg.port)
+ res = stub.UpdateFlowTable(msg)
+ self.adapter_agent.receive_proxied_message(proxy_address, res)
+
+ def packet_out(self, egress_port, msg):
+ self.log.info('sending-packet-out', egress_port=egress_port,
+ msg=hexify(msg))
+ pkt = Ether(msg)
+ out_pkt = (
+ Ether(src=pkt.src, dst=pkt.dst) /
+ Dot1Q(vlan=4000) /
+ Dot1Q(vlan=egress_port, type=pkt.type) /
+ pkt.payload
+ )
+ self.io_port.send(str(out_pkt))
diff --git a/voltha/adapters/ponsim_onu/__init__.py b/voltha/adapters/ponsim_onu/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/voltha/adapters/ponsim_onu/__init__.py
diff --git a/voltha/adapters/ponsim_onu/ponsim_onu.py b/voltha/adapters/ponsim_onu/ponsim_onu.py
new file mode 100644
index 0000000..d416db7
--- /dev/null
+++ b/voltha/adapters/ponsim_onu/ponsim_onu.py
@@ -0,0 +1,221 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Fully simulated OLT/ONU adapter.
+"""
+
+import structlog
+from twisted.internet import reactor
+from twisted.internet.defer import DeferredQueue, inlineCallbacks
+from zope.interface import implementer
+
+from voltha.adapters.interface import IAdapterInterface
+from voltha.core.logical_device_agent import mac_str_to_tuple
+from voltha.protos import third_party
+from voltha.protos.adapter_pb2 import Adapter
+from voltha.protos.adapter_pb2 import AdapterConfig
+from voltha.protos.common_pb2 import LogLevel, OperStatus, ConnectStatus, \
+ AdminState
+from voltha.protos.device_pb2 import DeviceType, DeviceTypes, Port
+from voltha.protos.health_pb2 import HealthStatus
+from voltha.protos.logical_device_pb2 import LogicalPort
+from voltha.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, OFPPF_1GB_FD
+from voltha.protos.openflow_13_pb2 import ofp_port
+from voltha.protos.ponsim_pb2 import FlowTable
+
+_ = third_party
+log = structlog.get_logger()
+
+
+@implementer(IAdapterInterface)
+class PonSimOnuAdapter(object):
+
+ name = 'ponsim_onu'
+
+ supported_device_types = [
+ DeviceType(
+ id=name,
+ adapter=name,
+ accepts_bulk_flow_update=True
+ )
+ ]
+
+ def __init__(self, adapter_agent, config):
+ self.adapter_agent = adapter_agent
+ self.config = config
+ self.descriptor = Adapter(
+ id=self.name,
+ vendor='Voltha project',
+ version='0.4',
+ config=AdapterConfig(log_level=LogLevel.INFO)
+ )
+ self.devices_handlers = dict() # device_id -> PonSimOltHandler()
+
+ def start(self):
+ log.debug('starting')
+ log.info('started')
+
+ def stop(self):
+ log.debug('stopping')
+ log.info('stopped')
+
+ def adapter_descriptor(self):
+ return self.descriptor
+
+ def device_types(self):
+ return DeviceTypes(items=self.supported_device_types)
+
+ def health(self):
+ return HealthStatus(state=HealthStatus.HealthState.HEALTHY)
+
+ def change_master_state(self, master):
+ raise NotImplementedError()
+
+ def adopt_device(self, device):
+ self.devices_handlers[device.id] = PonSimOnuHandler(self, device.id)
+ reactor.callLater(0, self.devices_handlers[device.id].activate, device)
+ return device
+
+ def abandon_device(self, device):
+ raise NotImplementedError()
+
+ def deactivate_device(self, device):
+ raise NotImplementedError()
+
+ def update_flows_bulk(self, device, flows, groups):
+ log.info('bulk-flow-update', device_id=device.id,
+ flows=flows, groups=groups)
+ assert len(groups.items) == 0
+ handler = self.devices_handlers[device.id]
+ return handler.update_flow_table(flows.items)
+
+ def update_flows_incrementally(self, device, flow_changes, group_changes):
+ raise NotImplementedError()
+
+ def send_proxied_message(self, proxy_address, msg):
+ log.info('send-proxied-message', proxy_address=proxy_address, msg=msg)
+
+ def receive_proxied_message(self, proxy_address, msg):
+ log.info('receive-proxied-message', proxy_address=proxy_address,
+ device_id=proxy_address.device_id, msg=msg)
+ handler = self.devices_handlers[proxy_address.device_id]
+ handler.receive_message(msg)
+
+ def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+ log.info('packet-out', logical_device_id=logical_device_id,
+ egress_port_no=egress_port_no, msg_len=len(msg))
+
+
+class PonSimOnuHandler(object):
+
+ def __init__(self, adapter, device_id):
+ self.adapter = adapter
+ self.adapter_agent = adapter.adapter_agent
+ self.device_id = device_id
+ self.log = structlog.get_logger(device_id=device_id)
+ self.incoming_messages = DeferredQueue()
+ self.proxy_address = None
+
+ def receive_message(self, msg):
+ self.incoming_messages.put(msg)
+
+ def activate(self, device):
+ self.log.info('activating')
+
+ # first we verify that we got parent reference and proxy info
+ assert device.parent_id
+ assert device.proxy_address.device_id
+ assert device.proxy_address.channel_id
+
+ # register for proxied messages right away
+ self.proxy_address = device.proxy_address
+ self.adapter_agent.register_for_proxied_messages(device.proxy_address)
+
+ # populate device info
+ device.root = True
+ device.vendor = 'ponsim'
+ device.model ='n/a'
+ device.connect_status = ConnectStatus.REACHABLE
+ self.adapter_agent.update_device(device)
+
+ # register physical ports
+ uni_port = Port(
+ port_no=2,
+ label='UNI facing Ethernet port',
+ type=Port.ETHERNET_UNI,
+ admin_state=AdminState.ENABLED,
+ oper_status=OperStatus.ACTIVE
+ )
+ self.adapter_agent.add_port(device.id, uni_port)
+ self.adapter_agent.add_port(device.id, Port(
+ port_no=1,
+ label='PON port',
+ type=Port.PON_ONU,
+ admin_state=AdminState.ENABLED,
+ oper_status=OperStatus.ACTIVE,
+ peers=[
+ Port.PeerPort(
+ device_id=device.parent_id,
+ port_no=device.parent_port_no
+ )
+ ]
+ ))
+
+ # add uni port to logical device
+ parent_device = self.adapter_agent.get_device(device.parent_id)
+ logical_device_id = parent_device.parent_id
+ assert logical_device_id
+ port_no = device.proxy_address.channel_id
+ cap = OFPPF_1GB_FD | OFPPF_FIBER
+ self.adapter_agent.add_logical_port(logical_device_id, LogicalPort(
+ id='uni-{}'.format(port_no),
+ ofp_port=ofp_port(
+ port_no=port_no,
+ hw_addr=mac_str_to_tuple('00:00:00:00:00:%02x' % port_no),
+ name='uni-{}'.format(port_no),
+ config=0,
+ state=OFPPS_LIVE,
+ curr=cap,
+ advertised=cap,
+ peer=cap,
+ curr_speed=OFPPF_1GB_FD,
+ max_speed=OFPPF_1GB_FD
+ ),
+ device_id=device.id,
+ device_port_no=uni_port.port_no
+ ))
+
+ device = self.adapter_agent.get_device(device.id)
+ device.oper_status = OperStatus.ACTIVE
+ self.adapter_agent.update_device(device)
+
+ @inlineCallbacks
+ def update_flow_table(self, flows):
+
+ # we need to proxy through the OLT to get to the ONU
+
+ # reset response queue
+ while self.incoming_messages.pending:
+ yield self.incoming_messages.get()
+
+ msg = FlowTable(
+ port=self.proxy_address.channel_id,
+ flows=flows
+ )
+ self.adapter_agent.send_proxied_message(self.proxy_address, msg)
+
+ yield self.incoming_messages.get()
diff --git a/voltha/adapters/simulated_olt/simulated_olt.py b/voltha/adapters/simulated_olt/simulated_olt.py
index 1fdc965..0c9227e 100644
--- a/voltha/adapters/simulated_olt/simulated_olt.py
+++ b/voltha/adapters/simulated_olt/simulated_olt.py
@@ -498,7 +498,7 @@
def send_proxied_message(self, proxy_address, msg):
log.info('send-proxied-message', proxy_address=proxy_address, msg=msg)
- # we mimick a response by sending the same message back in a short time
+ # we mimic a response by sending the same message back in a short time
reactor.callLater(
0.2,
self.adapter_agent.receive_proxied_message,
@@ -509,6 +509,10 @@
def receive_proxied_message(self, proxy_address, msg):
raise NotImplementedError()
+ def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+ log.info('packet-out', logical_device_id=logical_device_id,
+ egress_port_no=egress_port_no, msg_len=len(msg))
+
# ~~~~~~~~~~~~~~~~~~~~ Embedded test Klein rest server ~~~~~~~~~~~~~~~~~~~~
def get_test_control_site(self):
diff --git a/voltha/adapters/simulated_onu/simulated_onu.py b/voltha/adapters/simulated_onu/simulated_onu.py
index bdddb99..ca072bb 100644
--- a/voltha/adapters/simulated_onu/simulated_onu.py
+++ b/voltha/adapters/simulated_onu/simulated_onu.py
@@ -356,3 +356,7 @@
# by returning we allow the device to be shown as active, which
# indirectly verified that message passing works
+
+ def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+ log.info('packet-out', logical_device_id=logical_device_id,
+ egress_port_no=egress_port_no, msg_len=len(msg))
diff --git a/voltha/adapters/tibit_olt/tibit_olt.py b/voltha/adapters/tibit_olt/tibit_olt.py
index 0d260c3..ca7652f 100644
--- a/voltha/adapters/tibit_olt/tibit_olt.py
+++ b/voltha/adapters/tibit_olt/tibit_olt.py
@@ -504,3 +504,7 @@
def receive_proxied_message(self, proxy_address, msg):
raise NotImplementedError()
+
+ def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+ log.info('packet-out', logical_device_id=logical_device_id,
+ egress_port_no=egress_port_no, msg_len=len(msg))
diff --git a/voltha/adapters/tibit_onu/tibit_onu.py b/voltha/adapters/tibit_onu/tibit_onu.py
index f6c88c9..93714db 100644
--- a/voltha/adapters/tibit_onu/tibit_onu.py
+++ b/voltha/adapters/tibit_onu/tibit_onu.py
@@ -375,3 +375,7 @@
# by returning we allow the device to be shown as active, which
# indirectly verified that message passing works
+
+ def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+ log.info('packet-out', logical_device_id=logical_device_id,
+ egress_port_no=egress_port_no, msg_len=len(msg))
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index 0c5448f..3bf8857 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -31,6 +31,7 @@
from voltha.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
LogicalPort, AdminState
from voltha.registry import registry
+from voltha.core.flow_decomposer import OUTPUT
@implementer(IAdapterAgent)
@@ -64,11 +65,11 @@
try:
adapter = self.adapter_cls(self, config)
yield adapter.start()
+ self.adapter = adapter
+ self.adapter_node_proxy = self._update_adapter_node()
+ self._update_device_types()
except Exception, e:
self.log.exception(e)
- self.adapter = adapter
- self.adapter_node_proxy = self._update_adapter_node()
- self._update_device_types()
self.log.info('started')
returnValue(self)
@@ -199,6 +200,10 @@
return i
i += 1
+ def get_logical_device(self, logical_device_id):
+ return self.root_proxy.get('/logical_devices/{}'.format(
+ logical_device_id))
+
def create_logical_device(self, logical_device):
assert isinstance(logical_device, LogicalDevice)
@@ -210,8 +215,24 @@
self._make_up_to_date('/logical_devices',
logical_device.id, logical_device)
+ self.event_bus.subscribe(
+ topic='packet-out:{}'.format(logical_device.id),
+ callback=lambda _, p: self.receive_packet_out(logical_device.id, p)
+ )
+
return logical_device
+ def receive_packet_out(self, logical_device_id, ofp_packet_out):
+
+ def get_port_out(opo):
+ for action in opo.actions:
+ if action.type == OUTPUT:
+ return action.output.port
+
+ out_port = get_port_out(ofp_packet_out)
+ frame = ofp_packet_out.data
+ self.adapter.receive_packet_out(logical_device_id, out_port, frame)
+
def add_logical_port(self, logical_device_id, port):
assert isinstance(port, LogicalPort)
self._make_up_to_date(
diff --git a/voltha/core/logical_device_agent.py b/voltha/core/logical_device_agent.py
index f11247e..7dbf0b5 100644
--- a/voltha/core/logical_device_agent.py
+++ b/voltha/core/logical_device_agent.py
@@ -17,23 +17,23 @@
"""
Model that captures the current state of a logical device
"""
-import threading
from collections import OrderedDict
import structlog
from common.event_bus import EventBusClient
+from common.frameio.frameio import hexify
from voltha.core.config.config_proxy import CallbackType
from voltha.core.device_graph import DeviceGraph
from voltha.core.flow_decomposer import FlowDecomposer, \
flow_stats_entry_from_flow_mod_message, group_entry_from_group_mod, \
- mk_flow_stat, in_port, vlan_vid, vlan_pcp, pop_vlan, output, set_field
+ mk_flow_stat, in_port, vlan_vid, vlan_pcp, pop_vlan, output, set_field, \
+ push_vlan
from voltha.protos import third_party
from voltha.protos import openflow_13_pb2 as ofp
from voltha.protos.device_pb2 import Port
from voltha.protos.logical_device_pb2 import LogicalPort
from voltha.protos.openflow_13_pb2 import Flows, FlowGroups
-from voltha.registry import registry
_ = third_party
@@ -450,16 +450,9 @@
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PACKET_OUT ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def packet_out(self, ofp_packet_out):
- self.log.debug('packet-out', packet=ofp_packet_out)
- print threading.current_thread().name
- print 'PACKET_OUT:', ofp_packet_out
- # TODO for debug purposes, lets turn this around and send it back
- if 0:
- self.packet_in(ofp.ofp_packet_in(
- buffer_id=ofp_packet_out.buffer_id,
- reason=ofp.OFPR_NO_MATCH,
- data=ofp_packet_out.data
- ))
+ self.log.info('packet-out', packet=ofp_packet_out)
+ topic = 'packet-out:{}'.format(self.logical_device_id)
+ self.event_bus.publish(topic, ofp_packet_out)
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ PACKET_IN ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -485,8 +478,8 @@
self.packet_in(packet_in)
def packet_in(self, ofp_packet_in):
- # TODO
- print 'PACKET_IN:', ofp_packet_in
+ self.log.info('packet-in', logical_device_id=self.logical_device_id,
+ pkt=ofp_packet_in, data=hexify(ofp_packet_in.data))
self.local_handler.send_packet_in(
self.logical_device_id, ofp_packet_in)
@@ -630,7 +623,30 @@
set_field(vlan_vid(ofp.OFPVID_PRESENT | device.vlan)),
output(upstream_ports[0].port_no)
]
- )
+ ),
+ mk_flow_stat(
+ priority=500,
+ match_fields=[
+ in_port(downstream_ports[0].port_no),
+ vlan_vid(0)
+ ],
+ actions=[
+ push_vlan(0x8100),
+ set_field(vlan_vid(ofp.OFPVID_PRESENT | device.vlan)),
+ output(upstream_ports[0].port_no)
+ ]
+ ),
+ mk_flow_stat(
+ priority=500,
+ match_fields=[
+ in_port(upstream_ports[0].port_no),
+ vlan_vid(ofp.OFPVID_PRESENT | device.vlan)
+ ],
+ actions=[
+ set_field(vlan_vid(ofp.OFPVID_PRESENT | 0)),
+ output(downstream_ports[0].port_no)
+ ]
+ ),
])
groups = OrderedDict()
return flows, groups
diff --git a/voltha/extensions/IGMP.py b/voltha/extensions/IGMP.py
new file mode 100644
index 0000000..6473add
--- /dev/null
+++ b/voltha/extensions/IGMP.py
@@ -0,0 +1,282 @@
+#
+# Copyright 2016 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from scapy.fields import ByteEnumField, FieldLenField, IPField, \
+ FieldListField, XShortField, ConditionalField, BitField, ShortField, \
+ PacketListField
+from scapy.fields import ByteField
+from scapy.layers.inet import IP, bind_layers
+from scapy.layers.inet import IPOption_Router_Alert
+from scapy.layers.l2 import Ether
+from scapy.packet import Packet
+from scapy.utils import atol, hexdump
+from scapy.utils import checksum
+
+IGMP_TYPE_MEMBERSHIP_QUERY = 0x11
+IGMP_TYPE_V3_MEMBERSHIP_REPORT = 0x22
+IGMP_TYPE_V1_MEMBERSHIP_REPORT = 0x12
+IGMP_TYPE_V2_MEMBERSHIP_REPORT = 0x16
+IGMP_TYPE_V2_LEAVE_GROUP = 0x17
+
+IGMP_V3_GR_TYPE_INCLUDE = 0x01
+IGMP_V3_GR_TYPE_EXCLUDE = 0x02
+IGMP_V3_GR_TYPE_CHANGE_TO_INCLUDE = 0x03
+IGMP_V3_GR_TYPE_CHANGE_TO_EXCLUDE = 0x04
+IGMP_V3_GR_TYPE_ALLOW_NEW = 0x05
+IGMP_V3_GR_TYPE_BLOCK_OLD = 0x06
+
+
+"""
+IGMPV3_ALL_ROUTERS = '224.0.0.22'
+IGMPv3 = 3
+IP_SRC = '1.2.3.4'
+ETHERTYPE_IP = 0x0800
+IGMP_DST_MAC = "01:00:5e:00:01:01"
+IGMP_SRC_MAC = "5a:e1:ac:ec:4d:a1"
+"""
+
+
+class IGMPv3gr(Packet):
+ """IGMPv3 Group Record, used in membership report"""
+
+ name = "IGMPv3gr"
+
+ igmp_v3_gr_types = {
+ IGMP_V3_GR_TYPE_INCLUDE: "Include Mode",
+ IGMP_V3_GR_TYPE_EXCLUDE: "Exclude Mode",
+ IGMP_V3_GR_TYPE_CHANGE_TO_INCLUDE: "Change to Include Mode",
+ IGMP_V3_GR_TYPE_CHANGE_TO_EXCLUDE: "Change to Exclude Mode",
+ IGMP_V3_GR_TYPE_ALLOW_NEW: "Allow New Sources",
+ IGMP_V3_GR_TYPE_BLOCK_OLD: "Block Old Sources"
+ }
+
+ fields_desc = [
+ ByteEnumField("rtype", IGMP_V3_GR_TYPE_INCLUDE, igmp_v3_gr_types),
+ ByteField("aux_data_len", 0),
+ FieldLenField("numsrc", None, count_of="sources"),
+ IPField("mcaddr", "0.0.0.0"),
+ FieldListField("sources", None, IPField("src", "0.0.0.0"), "numsrc")
+ ]
+
+ def post_build(self, pkt, payload):
+ pkt += payload
+ if self.aux_data_len != 0:
+ print "WARNING: Auxiliary Data Length must be zero (0)"
+ return pkt
+
+
+class IGMPv3(Packet):
+
+ name = "IGMPv3"
+
+ igmp_v3_types = {
+ IGMP_TYPE_MEMBERSHIP_QUERY: "Membership Query",
+ IGMP_TYPE_V3_MEMBERSHIP_REPORT: " Version 3 Mebership Report",
+ IGMP_TYPE_V2_MEMBERSHIP_REPORT: " Version 2 Mebership Report",
+ IGMP_TYPE_V1_MEMBERSHIP_REPORT: " Version 1 Mebership Report",
+ IGMP_TYPE_V2_LEAVE_GROUP: "Version 2 Leave Group"
+ }
+
+ fields_desc = [
+ ByteEnumField("type", IGMP_TYPE_MEMBERSHIP_QUERY, igmp_v3_types),
+ ByteField("max_resp_code", 0),
+ XShortField("checksum", None),
+ #IPField("group_address", "0.0.0.0"),
+
+ # membership query fields
+ ConditionalField(IPField("gaddr", "0.0.0.0"),
+ lambda pkt: pkt.type == IGMP_TYPE_MEMBERSHIP_QUERY),
+ ConditionalField(BitField("resv", 0, 4),
+ lambda pkt: pkt.type == IGMP_TYPE_MEMBERSHIP_QUERY),
+ ConditionalField(BitField("s", 0, 1),
+ lambda pkt: pkt.type == IGMP_TYPE_MEMBERSHIP_QUERY),
+ ConditionalField(BitField("qrv", 0, 3),
+ lambda pkt: pkt.type == IGMP_TYPE_MEMBERSHIP_QUERY),
+ ConditionalField(ByteField("qqic", 0),
+ lambda pkt: pkt.type == IGMP_TYPE_MEMBERSHIP_QUERY),
+ ConditionalField(FieldLenField("numsrc", None, count_of="srcs"),
+ lambda pkt: pkt.type == IGMP_TYPE_MEMBERSHIP_QUERY),
+ ConditionalField(FieldListField("srcs", None,
+ IPField("src", "0.0.0.0"), "numsrc"),
+ lambda pkt: pkt.type == IGMP_TYPE_MEMBERSHIP_QUERY),
+
+ # membership report fields
+ ConditionalField(
+ ShortField("resv2", 0),
+ lambda pkt: pkt.type == IGMP_TYPE_V3_MEMBERSHIP_REPORT),
+ ConditionalField(
+ FieldLenField("numgrp", None, count_of="grps"),
+ lambda pkt: pkt.type == IGMP_TYPE_V3_MEMBERSHIP_REPORT),
+ ConditionalField(
+ PacketListField("grps", [], IGMPv3gr),
+ lambda pkt: pkt.type == IGMP_TYPE_V3_MEMBERSHIP_REPORT)
+
+ # TODO: v2 and v3 membership reports?
+
+ ]
+
+ def post_build(self, pkt, payload):
+
+ pkt += payload
+
+ # max_resp_code field is reserved (0)
+ if self.type in [IGMP_TYPE_V3_MEMBERSHIP_REPORT,]:
+ mrc = 0
+ else:
+ mrc = self.encode_float(self.max_resp_code)
+ pkt = pkt[:1] + chr(mrc) + pkt[2:]
+
+ if self.checksum is None:
+ chksum = checksum(pkt)
+ pkt = pkt[:2] + chr(chksum >> 8) + chr(chksum & 0xff) + pkt[4:]
+
+ return pkt
+
+ def encode_float(self, value):
+ """Encode max response time value per RFC 3376."""
+ if value < 128:
+ return value
+ if value > 31743:
+ return 255
+ exp = 0
+ value >>= 3
+ while value > 31:
+ exp += 1
+ value >>= 1
+ return 0x80 | (exp << 4) | (value & 0xf)
+
+
+ def decode_float(self, code):
+ if code < 128:
+ return code
+ mant = code & 0xf
+ exp = (code >> 4) & 0x7
+ return (mant | 0x10) << (exp + 3)
+
+ @staticmethod
+ def is_valid_mcaddr(ip):
+ byte1 = atol(ip) >> 24 & 0xff
+ return (byte1 & 0xf0) == 0xe0
+
+ @staticmethod
+ def fixup(pkt):
+ """Fixes up the underlying IP() and Ether() headers."""
+ assert pkt.haslayer(IGMPv3), \
+ "This packet is not an IGMPv4 packet; cannot fix it up"
+
+ igmp = pkt.getlayer(IGMPv3)
+
+ if pkt.haslayer(IP):
+ ip = pkt.getlayer(IP)
+ ip.ttl = 1
+ ip.proto = 2
+ ip.tos = 0xc0
+ ip.options = [IPOption_Router_Alert()]
+
+ if igmp.type == IGMP_TYPE_MEMBERSHIP_QUERY:
+ if igmp.gaddr == "0.0.0.0":
+ ip.dst = "224.0.0.1"
+ else:
+ assert IGMPv3.is_valid_mcaddr(igmp.gaddr), \
+ "IGMP membership query with invalid mcast address"
+ ip.dst = igmp.gaddr
+
+ elif igmp.type == IGMP_TYPE_V2_LEAVE_GROUP and \
+ IGMPv3.is_valid_mcaddr(igmp.gaddr):
+ ip.dst = "224.0.0.2"
+
+ elif igmp.type in (IGMP_TYPE_V1_MEMBERSHIP_REPORT,
+ IGMP_TYPE_V2_MEMBERSHIP_REPORT) and \
+ IGMPv3.is_valid_mcaddr(igmp.gaddr):
+ ip.dst = igmp.gaddr
+
+ # We do not need to fixup the ether layer, it is done by scapy
+ #
+ # if pkt.haslayer(Ether):
+ # eth = pkt.getlayer(Ether)
+ # ip_long = atol(ip.dst)
+ # ether.dst = '01:00:5e:%02x:%02x:%02x' % (
+ # (ip_long >> 16) & 0x7f, (ip_long >> 8) & 0xff,
+ # ip_long & 0xff )
+
+ return pkt
+
+
+bind_layers(IP, IGMPv3, frag=0, proto=2, ttl=1, tos=0xc0)
+bind_layers(IGMPv3, IGMPv3gr, frag=0, proto=2)
+bind_layers(IGMPv3gr, IGMPv3gr, frag=0, proto=2)
+
+
+if __name__ == "__main__":
+
+ print "test float encoding"
+ from math import log
+ max_expected_error = 1.0 / (2<<3) # four bit precision
+ p = IGMPv3()
+ for v in range(0, 31745):
+ c = p.encode_float(v)
+ d = p.decode_float(c)
+ rel_err = float(v-d)/v if v!=0 else 0.0
+ assert rel_err <= max_expected_error
+
+ print "construct membership query - general query"
+ mq = IGMPv3(type=IGMP_TYPE_MEMBERSHIP_QUERY, max_resp_code=120)
+ hexdump(str(mq))
+
+ print "construct membership query - group-specific query"
+ mq = IGMPv3(type=IGMP_TYPE_MEMBERSHIP_QUERY, max_resp_code=120,
+ gaddr="224.0.0.1")
+ hexdump(str(mq))
+
+ print "construct membership query - group-and-source-specific query"
+ mq = IGMPv3(type=IGMP_TYPE_MEMBERSHIP_QUERY, max_resp_code=120,
+ gaddr="224.0.0.1")
+ mq.srcs = ['1.2.3.4', '5.6.7.8']
+ hexdump(str(mq))
+
+ print "fixup"
+ mq = IGMPv3(type=IGMP_TYPE_MEMBERSHIP_QUERY)
+ mq.srcs = ['1.2.3.4', '5.6.7.8']
+ pkt = Ether() / IP() / mq
+ print "before fixup:"
+ hexdump(str(pkt))
+
+ print "after fixup:"
+ IGMPv3.fixup(pkt)
+ hexdump(str(pkt))
+
+ print "construct v3 membership report - join a single group"
+ mr = IGMPv3(type=IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
+ gaddr="224.0.0.1")
+ mr.grps = [IGMPv3gr( rtype=IGMP_V3_GR_TYPE_EXCLUDE, mcaddr="229.10.20.30")]
+ hexdump(mr)
+
+ print "construct v3 membership report - join two groups"
+ mr = IGMPv3(type=IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
+ gaddr="224.0.0.1")
+ mr.grps = [
+ IGMPv3gr(rtype=IGMP_V3_GR_TYPE_EXCLUDE, mcaddr="229.10.20.30"),
+ IGMPv3gr(rtype=IGMP_V3_GR_TYPE_EXCLUDE, mcaddr="229.10.20.31")
+ ]
+ hexdump(mr)
+
+ print "construct v3 membership report - leave a group"
+ mr = IGMPv3(type=IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
+ gaddr="224.0.0.1")
+ mr.grps = [IGMPv3gr(rtype=IGMP_V3_GR_TYPE_INCLUDE, mcaddr="229.10.20.30")]
+ hexdump(mr)
+
+ print "all ok"
diff --git a/voltha/protos/device.proto b/voltha/protos/device.proto
index 285c5ae..b4d3c46 100644
--- a/voltha/protos/device.proto
+++ b/voltha/protos/device.proto
@@ -113,6 +113,8 @@
// ("xxxx:xxxx:xxxx:xxxx:xxxx:xxxx:xxxx:xxxx")
string ipv6_address = 15;
+ string host_and_port = 21;
+
};
ProxyAddress proxy_address = 19;
@@ -121,6 +123,8 @@
OperStatus.OperStatus oper_status = 17 [(access) = READ_ONLY];
+ string reason = 22 [(access) = READ_ONLY]; // Used in FAILED state
+
ConnectStatus.ConnectStatus connect_status = 18 [(access) = READ_ONLY];
// TODO additional common attribute here
diff --git a/voltha/protos/ponsim.proto b/voltha/protos/ponsim.proto
new file mode 100644
index 0000000..3bc4d36
--- /dev/null
+++ b/voltha/protos/ponsim.proto
@@ -0,0 +1,27 @@
+syntax = "proto3";
+
+package voltha;
+
+import "google/protobuf/empty.proto";
+import "openflow_13.proto";
+
+
+message PonSimDeviceInfo {
+ int32 nni_port = 1;
+ repeated int32 uni_ports = 2;
+}
+
+message FlowTable {
+ int32 port = 1; // Used to address right device
+ repeated openflow_13.ofp_flow_stats flows = 2;
+}
+
+service PonSim {
+
+ rpc GetDeviceInfo(google.protobuf.Empty)
+ returns(PonSimDeviceInfo) {}
+
+ rpc UpdateFlowTable(FlowTable)
+ returns(google.protobuf.Empty) {}
+
+}