Adtran OLT DA update. Flows, h/w sync, and mcast
Change-Id: Icd796ee91edf3b46226e6fe606e8a7d572b6c956
diff --git a/voltha/adapters/adtran_olt/README.md b/voltha/adapters/adtran_olt/README.md
index f9257e9..2205d81 100644
--- a/voltha/adapters/adtran_olt/README.md
+++ b/voltha/adapters/adtran_olt/README.md
@@ -4,16 +4,18 @@
extension of the existing **preprovision_olt** command and these are placed after
entering two dashes '_--_'. The full syntax to use is.
-| Short | Long | Default | Notes |
-| :---: | :------------: | :-----: | ----- |
-| -u | --nc_username | '' | NETCONF Username |
-| -p | --nc_password | '' | NETCONF Password |
-| -t | --nc_port | 830 | NETCONF TCP Port |
-| -U | --rc_username | '' | REST Username |
-| -P | --rc_password | '' | REST Password |
-| -T | --rc_port | 8081 | REST TCP Port |
-| -z | --zmq_port | 5656 | ZeroMQ OMCI Proxy Port |
-| -a | --autoactivate | False | Autoactivate ONUs, xPON othewise |
+| Short | Long | Default | Notes |
+| :---: | :--------------: | :-----: | ----- |
+| -u | --nc_username | '' | NETCONF Username |
+| -p | --nc_password | '' | NETCONF Password |
+| -t | --nc_port | 830 | NETCONF TCP Port |
+| -U | --rc_username | '' | REST Username |
+| -P | --rc_password | '' | REST Password |
+| -T | --rc_port | 8081 | REST TCP Port |
+| -z | --zmq_port | 5656 | ZeroMQ OMCI Proxy Port |
+| -a | --autoactivate | False | Autoactivate ONUs, xPON othewise |
+| -M | --multicast_vlan | 4092 | Multicast VLANs (comma-delimeted) |
+| -V | --packet_in_vlan | 4000 | OpenFlow Packet-In/Out VLAN |
For example, if your Adtran OLT is address 10.17.174.193 with the default TCP ports and
NETCONF credentials of admin/admin and REST credentials of ADMIN/ADMIN, the command line
diff --git a/voltha/adapters/adtran_olt/adapter_pm_metrics.py b/voltha/adapters/adtran_olt/adapter_pm_metrics.py
index aef8be1..dc74877 100644
--- a/voltha/adapters/adtran_olt/adapter_pm_metrics.py
+++ b/voltha/adapters/adtran_olt/adapter_pm_metrics.py
@@ -1,5 +1,4 @@
-#
-# Copyright 2017 the original author or authors.
+# Copyright 2017-present Adtran, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -22,13 +21,14 @@
class AdapterPmMetrics:
def __init__(self, adapter, device):
- self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
- 'tx_256_511_pkts', 'tx_512_1023_pkts',
- 'tx_1024_1518_pkts', 'tx_1519_9k_pkts',
- 'rx_64_pkts', 'rx_65_127_pkts',
- 'rx_128_255_pkts', 'rx_256_511_pkts',
- 'rx_512_1023_pkts', 'rx_1024_1518_pkts',
- 'rx_1519_9k_pkts'}
+ # self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
+ # 'tx_256_511_pkts', 'tx_512_1023_pkts',
+ # 'tx_1024_1518_pkts', 'tx_1519_9k_pkts',
+ # 'rx_64_pkts', 'rx_65_127_pkts',
+ # 'rx_128_255_pkts', 'rx_256_511_pkts',
+ # 'rx_512_1023_pkts', 'rx_1024_1518_pkts',
+ # 'rx_1519_9k_pkts'}
+ self.pm_names = {'rx_frames', 'tx_frames'}
self.log = structlog.get_logger(device_id=device.id)
self.device = device
self.id = device.id
@@ -67,33 +67,40 @@
enabled=pm.enabled)])
return pm_config
- def collect_port_metrics(self, channel):
- rtrn_port_metrics = dict()
+ def collect_port_metrics(self):
+ port_metrics = dict()
# TODO: Implement
- # stub = ponsim_pb2.PonSimStub(channel)
- # stats = stub.GetStats(Empty())
- # rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
- # rtrn_port_metrics['nni'] = self.extract_nni_metrics(stats)
- return rtrn_port_metrics
+ stats = {}
+ port_metrics['pon'] = self.extract_pon_metrics(stats, 100)
+ port_metrics['nni'] = self.extract_nni_metrics(stats, 200)
+ return port_metrics
- def extract_pon_metrics(self, stats):
- rtrn_pon_metrics = dict()
+ def extract_pon_metrics(self, stats, fake_value):
+ return {
+ 'rx_frames': fake_value,
+ 'tx_frames': fake_value
+ }
+ # rtrn_pon_metrics = dict()
+ #
+ # for m in stats.metrics:
+ # if m.port_name == "pon":
+ # for p in m.packets:
+ # if self.pon_metrics_config[p.name].enabled:
+ # rtrn_pon_metrics[p.name] = p.value
+ # return rtrn_pon_metrics
- for m in stats.metrics:
- if m.port_name == "pon":
- for p in m.packets:
- if self.pon_metrics_config[p.name].enabled:
- rtrn_pon_metrics[p.name] = p.value
- return rtrn_pon_metrics
-
- def extract_nni_metrics(self, stats):
- rtrn_pon_metrics = dict()
- for m in stats.metrics:
- if m.port_name == "nni":
- for p in m.packets:
- if self.pon_metrics_config[p.name].enabled:
- rtrn_pon_metrics[p.name] = p.value
- return rtrn_pon_metrics
+ def extract_nni_metrics(self, stats, fake_value):
+ return {
+ 'rx_frames': fake_value,
+ 'tx_frames': fake_value
+ }
+ # rtrn_pon_metrics = dict()
+ # for m in stats.metrics:
+ # if m.port_name == "nni":
+ # for p in m.packets:
+ # if self.pon_metrics_config[p.name].enabled:
+ # rtrn_pon_metrics[p.name] = p.value
+ # return rtrn_pon_metrics
def start_collector(self, callback):
self.log.info("starting-pm-collection", device_name=self.name,
diff --git a/voltha/adapters/adtran_olt/adtran_device_handler.py b/voltha/adapters/adtran_olt/adtran_device_handler.py
index 8f6ed6c..5422a07 100644
--- a/voltha/adapters/adtran_olt/adtran_device_handler.py
+++ b/voltha/adapters/adtran_olt/adtran_device_handler.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2017-present Adtran, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -25,6 +25,7 @@
import json
from twisted.internet import reactor, defer
from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.python.failure import Failure
from voltha.adapters.adtran_olt.net.adtran_netconf import AdtranNetconfClient
from voltha.adapters.adtran_olt.net.adtran_rest import AdtranRestClient
@@ -46,10 +47,9 @@
_ = third_party
-_PACKET_IN_VLAN = 4000
-_MULTICAST_VLAN = 4092
+DEFAULT_PACKET_IN_VLAN = 4000
+DEFAULT_MULTICAST_VLAN = 4050
_MANAGEMENT_VLAN = 4093
-_is_inband_frame = BpfProgramFilter('(ether[14:2] & 0xfff) = 0x{:03x}'.format(_PACKET_IN_VLAN))
_DEFAULT_RESTCONF_USERNAME = ""
_DEFAULT_RESTCONF_PASSWORD = ""
@@ -99,13 +99,15 @@
self.adapter_agent = adapter.adapter_agent
self.device_id = device_id
self.log = structlog.get_logger(device_id=device_id)
- self.startup = None
+ self.startup = None # Startup/reboot defeered
self.channel = None # Proxy messaging channel with 'send' method
self.io_port = None
self.logical_device_id = None
self.interface = registry('main').get_args().interface
self.pm_metrics = None
self.alarms = None
+ self.packet_in_vlan = DEFAULT_PACKET_IN_VLAN
+ self.multicast_vlans = [DEFAULT_MULTICAST_VLAN]
# Northbound and Southbound ports
self.northbound_ports = {} # port number -> Port
@@ -142,18 +144,15 @@
# registered (via xPON API/CLI) before they are activated.
self._autoactivate = False
-
- # TODO Remove items below after one PON fully supported and working as expected
- self.max_nni_ports = 1
- self.max_pon_ports = 1
-
+ self.max_nni_ports = 1 # TODO: This is a VOLTHA imposed limit in 'low_decomposer.py
+ # and logical_device_agent.py
# OMCI ZMQ Channel
self.zmq_port = DEFAULT_ZEROMQ_OMCI_TCP_PORT
# Heartbeat support
self.heartbeat_count = 0
self.heartbeat_miss = 0
- self.heartbeat_interval = 10 # TODO: Decrease before release or any scale testing
+ self.heartbeat_interval = 5 # TODO: Decrease before release or any scale testing
self.heartbeat_failed_limit = 3
self.heartbeat_timeout = 5
self.heartbeat = None
@@ -205,7 +204,7 @@
return list(self._evcs.values())
def add_evc(self, evc):
- if self._evcs is not None:
+ if self._evcs is not None and evc.name not in self._evcs:
self._evcs[evc.name] = evc
def remove_evc(self, evc):
@@ -229,6 +228,12 @@
raise argparse.ArgumentTypeError("%s is a not a valid port number" % value)
return ivalue
+ def check_vid(value):
+ ivalue = int(value)
+ if ivalue <= 1 or ivalue > 4094:
+ raise argparse.ArgumentTypeError("Valid VLANs are 2..4094")
+ return ivalue
+
parser = argparse.ArgumentParser(description='Adtran Device Adapter')
parser.add_argument('--nc_username', '-u', action='store', default=_DEFAULT_NETCONF_USERNAME,
help='NETCONF username')
@@ -246,10 +251,21 @@
type=check_tcp_port, help='ZeroMQ Port')
parser.add_argument('--autoactivate', '-a', action='store_true', default=False,
help='Autoactivate / Demo mode')
+ parser.add_argument('--multicast_vlan', '-M', action='store',
+ default='{}'.format(DEFAULT_MULTICAST_VLAN),
+ help='Multicast VLAN')
+ parser.add_argument('--packet_in_vlan', '-V', action='store', default=DEFAULT_PACKET_IN_VLAN,
+ type=check_vid, help='OpenFlow Packet-In/Out VLAN')
try:
args = parser.parse_args(shlex.split(device.extra_args))
+ self.packet_in_vlan = args.packet_in_vlan
+ self._is_inband_frame = BpfProgramFilter('(ether[14:2] & 0xfff) = 0x{:03x}'.
+ format(self.packet_in_vlan))
+ # May have multiple multicast VLANs
+ self.multicast_vlans = [int(vid.strip()) for vid in args.multicast_vlan.split(',')]
+
self.netconf_username = args.nc_username
self.netconf_password = args.nc_password
self.netconf_port = args.nc_port
@@ -276,7 +292,7 @@
well as ONU auto activation. useful for demos
If autoactivate is enabled, the default startup state (first time) for a PON port is disabled
- If autoactivate is disabled, the efault startup state for a PON port is enabled
+ If autoactivate is disabled, the default startup state for a PON port is enabled
"""
return self._autoactivate
@@ -346,11 +362,16 @@
image_names = list(set([results.get(img, 'unknown') for img in leafs]))
images = []
+ image_count = 1
for name in image_names:
# TODO: Look into how to find out hash, is_valid, and install date/time
- image = Image(name=name, version=name,
+ image = Image(name='Candidate_{}'.format(image_count),
+ version=name,
is_active=(name == results.get('running-revision', 'xxx')),
- is_committed=(name == results.get('startup-revision', 'xxx')))
+ is_committed=True,
+ is_valid=True,
+ install_datetime='Not Available')
+ image_count += 1
images.append(image)
return images
@@ -367,12 +388,17 @@
try:
# Enumerate and create Northbound NNI interfaces
+ device.reason = 'Enumerating NNI Interfaces'
+ self.adapter_agent.update_device(device)
self.startup = self.enumerate_northbound_ports(device)
results = yield self.startup
self.startup = self.process_northbound_ports(device, results)
yield self.startup
+ device.reason = 'Adding NNI Interfaces to Adapter'
+ self.adapter_agent.update_device(device)
+
if not reconciling:
for port in self.northbound_ports.itervalues():
self.adapter_agent.add_port(device.id, port.get_port())
@@ -384,12 +410,17 @@
try:
# Enumerate and create southbound interfaces
+ device.reason = 'Enumerating PON Interfaces'
+ self.adapter_agent.update_device(device)
self.startup = self.enumerate_southbound_ports(device)
results = yield self.startup
self.startup = self.process_southbound_ports(device, results)
yield self.startup
+ device.reason = 'Adding PON Interfaces to Adapter'
+ self.adapter_agent.update_device(device)
+
if not reconciling:
for port in self.southbound_ports.itervalues():
self.adapter_agent.add_port(device.id, port.get_port())
@@ -409,7 +440,8 @@
# Reconcile child devices
self.adapter_agent.reconcile_child_devices(device.id)
ld_initialized = self.adapter_agent.get_logical_device()
- assert device.parent_id == ld_initialized.id
+ assert device.parent_id == ld_initialized.id, \
+ 'parent ID not Logical device ID'
else:
# Complete activation by setting up logical device for this OLT and saving
@@ -419,20 +451,32 @@
############################################################################
# Setup PM configuration for this device
+ try:
+ device.reason = 'Setting up PM configuration'
+ self.adapter_agent.update_device(device)
- # self.pm_metrics = AdapterPmMetrics(device)
- # pm_config = self.pm_metrics.make_proto()
- # self.log.info("initial-pm-config", pm_config=pm_config)
- # self.adapter_agent.update_device_pm_config(pm_config, init=True)
+ self.pm_metrics = AdapterPmMetrics(self.adapter, device)
+ pm_config = self.pm_metrics.make_proto()
+ self.log.info("initial-pm-config", pm_config=pm_config)
+ self.adapter_agent.update_device_pm_config(pm_config, init=True)
+
+ except Exception as e:
+ self.log.exception('pm-setup', e=e)
+ self.activate_failed(device, e.message)
############################################################################
# Setup Alarm handler
+ device.reason = 'Setting up Adapter Alarms'
+ self.adapter_agent.update_device(device)
+
self.alarms = AdapterAlarms(self.adapter, device)
############################################################################
# Create logical ports for all southbound and northbound interfaces
try:
+ device.reason = 'Creating logical ports'
+ self.adapter_agent.update_device(device)
self.startup = self.create_logical_ports(device, ld_initialized, reconciling)
yield self.startup
@@ -440,6 +484,10 @@
self.log.exception('logical-port', e=e)
self.activate_failed(device, e.message)
+ ############################################################################
+ # Register for ONU detection
+ # self.adapter_agent.register_for_onu_detect_state(device.id)
+
# Complete device specific steps
try:
self.log.debug('device-activation-procedures')
@@ -452,8 +500,8 @@
# Schedule the heartbeat for the device
- self.log.debug('Starting-heartbeat')
- self.start_heartbeat(delay=5)
+ self.log.debug('starting-heartbeat')
+ self.start_heartbeat(delay=10)
device = self.adapter_agent.get_device(device.id)
device.parent_id = ld_initialized.id
@@ -466,9 +514,9 @@
self._activate_io_port()
# Start collecting stats from the device after a brief pause
- reactor.callLater(5, self.start_kpi_collection, device.id)
+ reactor.callLater(10, self.start_kpi_collection, device.id)
- self.log.info('Activated')
+ self.log.info('activated')
def activate_failed(self, device, reason, reachable=True):
"""
@@ -564,8 +612,8 @@
sw_desc=version,
serial_num=device.serial_number,
dp_desc='n/a'),
- switch_features=ofp_switch_features(n_buffers=256, # TODO fake for now
- n_tables=2, # TODO ditto
+ switch_features=ofp_switch_features(n_buffers=256,
+ n_tables=2,
capabilities=(
OFPC_FLOW_STATS |
OFPC_TABLE_STATS |
@@ -579,9 +627,9 @@
@inlineCallbacks
def create_logical_ports(self, device, ld_initialized, reconciling):
- results = defer.fail()
-
if not reconciling:
+ # Add the ports to the logical device
+
for port in self.northbound_ports.itervalues():
lp = port.get_logical_port()
if lp is not None:
@@ -596,40 +644,44 @@
try:
for port in self.northbound_ports.itervalues():
self.startup = yield port.reset()
- results = yield self.startup
for port in self.southbound_ports.itervalues():
self.startup = yield port.reset()
- results = yield self.startup
except Exception as e:
- self.log.exception('Failed to reset ports to known good initial state', e=e)
- self.activate_failed(device, e.message)
+ self.log.exception('port-reset', e=e)
+ self.activate_failed(device, e.message)
- # Clean up all EVC and EVC maps (exceptions ok/not-fatal)
+ # Clean up all EVC and EVC maps (exceptions are ok)
try:
from flow.evc import EVC
self.startup = yield EVC.remove_all(self.netconf_client)
except Exception as e:
- self.log.exception('Failed attempting to clean up existing EVCs', e=e)
+ self.log.exception('evc-cleanup', e=e)
try:
from flow.evc_map import EVCMap
self.startup = yield EVCMap.remove_all(self.netconf_client)
except Exception as e:
- self.log.exception('Failed attempting to clean up existing EVC-Maps', e=e)
+ self.log.exception('evc-map-cleanup', e=e)
- # Start/stop the interfaces as needed
+ # Start/stop the interfaces as needed. These are deferred calls
- for port in self.northbound_ports.itervalues():
- self.startup = port.start()
- results = yield self.startup
+ try:
+ dl = []
+ for port in self.northbound_ports.itervalues():
+ dl.append(port.start())
- for port in self.southbound_ports.itervalues():
- self.startup = port.start() if port.admin_state == AdminState.ENABLED else port.stop()
- results = yield self.startup
+ for port in self.southbound_ports.itervalues():
+ dl.append(port.start() if port.admin_state == AdminState.ENABLED else port.stop())
+
+ results = yield defer.gatherResults(dl)
+
+ except Exception as e:
+ self.log.exception('port-startup', e=e)
+ results = defer.fail(Failure())
returnValue(results)
@@ -737,8 +789,11 @@
# Kill any heartbeat poll
h, self.heartbeat = self.heartbeat, None
- if h is not None and not h.called:
- h.cancel()
+ try:
+ if h is not None and not h.called:
+ h.cancel()
+ except:
+ pass
# TODO: What else (delete logical device, ???)
@@ -750,22 +805,28 @@
# Cancel any running enable/disable/... in progress
d, self.startup = self.startup, None
- if d is not None and not d.called:
- d.cancel()
-
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
# Get the latest device reference
device = self.adapter_agent.get_device(self.device_id)
# Deactivate in-band packets
self._deactivate_io_port()
+ # Drop registration for ONU detection
+ # self.adapter_agent.unregister_for_onu_detect_state(self.device.id)
+
# Suspend any active healthchecks / pings
h, self.heartbeat = self.heartbeat, None
-
- if h is not None and not h.called:
- h.cancel()
-
+ try:
+ if h is not None and not h.called:
+ h.cancel()
+ except:
+ pass
# Update the operational status to UNKNOWN
device.oper_status = OperStatus.UNKNOWN
@@ -830,9 +891,11 @@
# Cancel any running enable/disable/... in progress
d, self.startup = self.startup, None
- if d is not None and not d.called:
- d.cancel()
-
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
# Get the latest device reference
device = self.adapter_agent.get_device(self.device_id)
@@ -848,14 +911,12 @@
except Exception as e:
self.log.exception('adtran-hello-reconnect', e=e)
- # TODO: What is best way to handle reenable failure?
try:
yield self.make_netconf_connection()
except Exception as e:
self.log.exception('NETCONF-re-connection', e=e)
- # TODO: What is best way to handle reenable failure?
# Recreate the logical device
@@ -863,7 +924,12 @@
# Create logical ports for all southbound and northbound interfaces
- self.create_logical_ports(device, ld_initialized, False)
+ try:
+ self.startup = self.create_logical_ports(device, ld_initialized, False)
+ yield self.startup
+
+ except Exception as e:
+ self.log.exception('logical-port-creation', e=e)
device = self.adapter_agent.get_device(device.id)
device.parent_id = ld_initialized.id
@@ -893,6 +959,9 @@
self.startup = defer.gatherResults(dl)
results = yield self.startup
+ # Re-subscribe for ONU detection
+ # self.adapter_agent.register_for_onu_detect_state(self.device.id)
+
# TODO:
# 1) Restart health check / pings
@@ -912,8 +981,22 @@
# Cancel any running enable/disable/... in progress
d, self.startup = self.startup, None
- if d is not None and not d.called:
- d.cancel()
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+ # Issue reboot command
+
+ if not self.is_virtual_olt:
+ try:
+ yield self.netconf_client.rpc(AdtranDeviceHandler.RESTART_RPC)
+
+ except Exception as e:
+ self.log.exception('NETCONF-shutdown', e=e)
+ returnValue(defer.fail(Failure()))
+
+ # self.adapter_agent.unregister_for_onu_detect_state(self.device.id)
# Update the operational status to ACTIVATING and connect status to
# UNREACHABLE
@@ -928,27 +1011,18 @@
# Update the child devices connect state to UNREACHABLE
self.adapter_agent.update_child_devices_state(self.device_id,
connect_status=ConnectStatus.UNREACHABLE)
- # Issue reboot command
- if not self.is_virtual_olt:
- try:
- yield self.netconf_client.rpc(AdtranDeviceHandler.RESTART_RPC)
+ # Shutdown communications with OLT. Typically it takes about 2 seconds
+ # or so after the reply before the restart actually occurs
- except Exception as e:
- self.log.exception('NETCONF-shutdown', e=e)
- # TODO: On failure, what is the best thing to do?
+ try:
+ response = yield self.netconf_client.close()
+ self.log.debug('Restart response XML was: {}'.format('ok' if response.ok else 'bad'))
- # Shutdown communications with OLT. Typically it takes about 2 seconds
- # or so after the reply before the restart actually occurs
+ except Exception as e:
+ self.log.exception('NETCONF-client-shutdown', e=e)
- try:
- response = yield self.netconf_client.close()
- self.log.debug('Restart response XML was: {}'.format('ok' if response.ok else 'bad'))
-
- except Exception as e:
- self.log.exception('NETCONF-client-shutdown', e=e)
-
- # Clear off clients
+ # Clear off clients
self._netconf_client = None
self._rest_client = None
@@ -959,13 +1033,10 @@
current_time = time.time()
timeout = current_time + self.restart_failure_timeout
- try:
- yield reactor.callLater(10, self._finish_reboot, timeout,
- previous_oper_status, previous_conn_status)
- except Exception as e:
- self.log.exception('finish-reboot', e=e)
-
- returnValue('Waiting for reboot')
+ self.startup = reactor.callLater(10, self._finish_reboot, timeout,
+ previous_oper_status,
+ previous_conn_status)
+ returnValue(self.startup)
@inlineCallbacks
def _finish_reboot(self, timeout, previous_oper_status, previous_conn_status):
@@ -976,8 +1047,7 @@
if self.rest_client is None:
try:
- response = yield self.make_restconf_connection(get_timeout=10)
- # self.log.debug('Restart RESTCONF connection JSON was: {}'.format(response))
+ yield self.make_restconf_connection(get_timeout=10)
except Exception:
self.log.debug('No RESTCONF connection yet')
@@ -986,7 +1056,6 @@
if self.netconf_client is None:
try:
yield self.make_netconf_connection(connect_timeout=10)
- # self.log.debug('Restart NETCONF connection succeeded')
except Exception as e:
try:
@@ -1000,13 +1069,10 @@
if (self.netconf_client is None and not self.is_virtual_olt) or self.rest_client is None:
current_time = time.time()
if current_time < timeout:
- try:
- yield reactor.callLater(5, self._finish_reboot, timeout,
- previous_oper_status, previous_conn_status)
- except Exception:
- self.log.debug('Rebooted-check', e=e)
-
- returnValue('Waiting some more...')
+ self.startup = reactor.callLater(5, self._finish_reboot, timeout,
+ previous_oper_status,
+ previous_conn_status)
+ returnValue(self.startup)
if self.netconf_client is None and not self.is_virtual_olt:
self.log.error('NETCONF-restore-failure')
@@ -1044,6 +1110,9 @@
except Exception as e:
self.log.exception('port-restart', e=e)
+ # Re-subscribe for ONU detection
+ # self.adapter_agent.register_for_onu_detect_state(self.device.id)
+
# Request reflow of any EVC/EVC-MAPs
if len(self._evcs) > 0:
@@ -1070,12 +1139,18 @@
# Cancel any outstanding tasks
d, self.startup = self.startup, None
- if d is not None and not d.called:
- d.cancel()
-
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
h, self.heartbeat = self.heartbeat, None
- if h is not None and not h.called:
- h.cancel()
+ try:
+ if h is not None and not h.called:
+ h.cancel()
+ except:
+ pass
+ # self.adapter_agent.unregister_for_onu_detect_state(self.device.id)
# Remove all flows from the device
# TODO: Create a bulk remove-all by device-id
@@ -1126,7 +1201,7 @@
if self.io_port is None:
self.log.info('registering-frameio')
self.io_port = registry('frameio').open_port(
- self.interface, self._rcv_io, _is_inband_frame)
+ self.interface, self._rcv_io, self._is_inband_frame)
def _deactivate_io_port(self):
io, self.io_port = self.io_port, None
@@ -1164,9 +1239,22 @@
self.log.info('sending-packet-out', egress_port=egress_port,
msg=hexify(msg))
pkt = Ether(msg)
+
+ #ADTRAN To remove any extra tags
+ while ( pkt.type == 0x8100 ):
+ msg_hex=hexify(msg)
+ msg_hex=msg_hex[:24]+msg_hex[32:]
+ bytes = []
+ msg_hex = ''.join( msg_hex.split(" ") )
+ for i in range(0, len(msg_hex), 2):
+ bytes.append( chr( int (msg_hex[i:i+2], 16 ) ) )
+ msg = ''.join( bytes )
+ pkt = Ether(msg)
+ #END
+
out_pkt = (
Ether(src=pkt.src, dst=pkt.dst) /
- Dot1Q(vlan=_PACKET_IN_VLAN) /
+ Dot1Q(vlan=self.packet_in_vlan) /
Dot1Q(vlan=egress_port, type=pkt.type) /
pkt.payload
)
@@ -1181,10 +1269,11 @@
# TODO: This has not been tested
def _collect(device_id, prefix):
from voltha.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
+ import random
try:
# Step 1: gather metrics from device
- port_metrics = self.pm_metrics.collect_port_metrics(self.get_channel())
+ port_metrics = self.pm_metrics.collect_port_metrics()
# Step 2: prepare the KpiEvent for submission
# we can time-stamp them here (or could use time derived from OLT
@@ -1205,7 +1294,7 @@
except Exception as e:
self.log.exception('failed-to-submit-kpis', e=e)
- # self.pm_metrics.start_collector(_collect)
+ self.pm_metrics.start_collector(_collect)
@inlineCallbacks
def get_device_info(self, device):
@@ -1224,7 +1313,7 @@
returnValue(device)
def start_heartbeat(self, delay=10):
- assert delay > 1
+ assert delay > 1, 'Minimum heartbeat is 1 second'
self.log.info('Starting-Device-Heartbeat ***')
self.heartbeat = reactor.callLater(delay, self.check_pulse)
return self.heartbeat
@@ -1249,7 +1338,6 @@
self.heartbeat_alarm(False, self.heartbeat_miss)
else:
- assert results
# Update device states
self.log.info('heartbeat-success')
diff --git a/voltha/adapters/adtran_olt/adtran_olt.py b/voltha/adapters/adtran_olt/adtran_olt.py
index 1b66359..23ba5eb 100644
--- a/voltha/adapters/adtran_olt/adtran_olt.py
+++ b/voltha/adapters/adtran_olt/adtran_olt.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2017-present Adtran, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -51,7 +51,7 @@
self.descriptor = Adapter(
id=self.name,
vendor='Adtran, Inc.',
- version='0.4',
+ version='0.5',
config=AdapterConfig(log_level=LogLevel.INFO)
)
log.debug('adtran_olt.__init__', adapter_agent=adapter_agent)
@@ -440,8 +440,10 @@
in the devices
"""
log.info('create-interface', data=data)
- handler = self.devices_handlers[device.id]
- handler.create_interface(data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.create_interface(data)
def update_interface(self, device, data):
"""
@@ -449,8 +451,10 @@
in the devices
"""
log.info('update-interface', data=data)
- handler = self.devices_handlers[device.id]
- handler.update_interface(data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.update_interface(data)
def remove_interface(self, device, data):
"""
@@ -458,8 +462,10 @@
in the devices
"""
log.info('remove-interface', data=data)
- handler = self.devices_handlers[device.id]
- handler.remove_interface(data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.remove_interface(data)
def receive_onu_detect_state(self, proxy_address, state):
"""
@@ -480,8 +486,10 @@
"""
log.info('create-tcont', tcont_data=tcont_data,
traffic_descriptor_data=traffic_descriptor_data)
- handler = self.devices_handlers[device.id]
- handler.create_tcont(tcont_data, traffic_descriptor_data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.create_tcont(tcont_data, traffic_descriptor_data)
def update_tcont(self, device, tcont_data, traffic_descriptor_data):
"""
@@ -493,8 +501,10 @@
"""
log.info('update-tcont', tcont_data=tcont_data,
traffic_descriptor_data=traffic_descriptor_data)
- handler = self.devices_handlers[device.id]
- handler.update_tcont(tcont_data, traffic_descriptor_data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.update_tcont(tcont_data, traffic_descriptor_data)
def remove_tcont(self, device, tcont_data, traffic_descriptor_data):
"""
@@ -506,8 +516,10 @@
"""
log.info('remove-tcont', tcont_data=tcont_data,
traffic_descriptor_data=traffic_descriptor_data)
- handler = self.devices_handlers[device.id]
- handler.remove_tcont(tcont_data, traffic_descriptor_data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.remove_tcont(tcont_data, traffic_descriptor_data)
def create_gemport(self, device, data):
"""
@@ -517,8 +529,10 @@
:return: None
"""
log.info('create-gemport', data=data)
- handler = self.devices_handlers[device.id]
- handler.create_gemport(data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.create_gemport(data)
def update_gemport(self, device, data):
"""
@@ -528,8 +542,10 @@
:return: None
"""
log.info('update-gemport', data=data)
- handler = self.devices_handlers[device.id]
- handler.update_gemport(data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.update_gemport(data)
def remove_gemport(self, device, data):
"""
@@ -539,8 +555,10 @@
:return: None
"""
log.info('remove-gemport', data=data)
- handler = self.devices_handlers[device.id]
- handler.remove_gemport(data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.remove_gemport(data)
def create_multicast_gemport(self, device, data):
"""
@@ -550,8 +568,10 @@
:return: None
"""
log.info('create-mcast-gemport', data=data)
- handler = self.devices_handlers[device.id]
- handler.create_multicast_gemport(data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.create_multicast_gemport(data)
def update_multicast_gemport(self, device, data):
"""
@@ -561,8 +581,10 @@
:return: None
"""
log.info('update-mcast-gemport', data=data)
- handler = self.devices_handlers[device.id]
- handler.update_multicast_gemport(data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.update_multicast_gemport(data)
def remove_multicast_gemport(self, device, data):
"""
@@ -572,8 +594,10 @@
:return: None
"""
log.info('remove-mcast-gemport', data=data)
- handler = self.devices_handlers[device.id]
- handler.remove_multicast_gemport(data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.remove_multicast_gemport(data)
def create_multicast_distribution_set(self, device, data):
"""
@@ -584,8 +608,10 @@
:return: None
"""
log.info('create-mcast-distribution-set', data=data)
- handler = self.devices_handlers[device.id]
- handler.create_multicast_distribution_set(data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.create_multicast_distribution_set(data)
def update_multicast_distribution_set(self, device, data):
"""
@@ -596,8 +622,10 @@
:return: None
"""
log.info('update-mcast-distribution-set', data=data)
- handler = self.devices_handlers[device.id]
- handler.create_multicast_distribution_set(data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.create_multicast_distribution_set(data)
def remove_multicast_distribution_set(self, device, data):
"""
@@ -608,5 +636,7 @@
:return: None
"""
log.info('remove-mcast-distribution-set', data=data)
- handler = self.devices_handlers[device.id]
- handler.create_multicast_distribution_set(data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.create_multicast_distribution_set(data)
diff --git a/voltha/adapters/adtran_olt/adtran_olt_handler.py b/voltha/adapters/adtran_olt/adtran_olt_handler.py
index a4cba1c..524648b 100644
--- a/voltha/adapters/adtran_olt/adtran_olt_handler.py
+++ b/voltha/adapters/adtran_olt/adtran_olt_handler.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2017-present Adtran, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -15,7 +15,7 @@
import datetime
import random
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
from twisted.internet.defer import returnValue, inlineCallbacks, succeed
from adtran_device_handler import AdtranDeviceHandler
@@ -31,6 +31,8 @@
ChannelgroupConfig, ChannelpartitionConfig, ChannelpairConfig, ChannelterminationConfig, \
OntaniConfig, VOntaniConfig, VEnetConfig
+FIXED_ONU = True # Enhanced ONU support
+
class AdtranOltHandler(AdtranDeviceHandler):
"""
@@ -121,32 +123,52 @@
"""
from codec.physical_entities_state import PhysicalEntitiesState
- device = {}
-
+ device = {
+ 'model': 'n/a',
+ 'hardware_version': 'n/a',
+ 'serial_number': 'n/a',
+ 'vendor': 'Adtran, Inc.',
+ 'firmware_version': 'n/a',
+ 'running-revision': 'n/a',
+ 'candidate-revision': 'n/a',
+ 'startup-revision': 'n/a',
+ }
if self.is_virtual_olt:
returnValue(device)
- pe_state = PhysicalEntitiesState(self.netconf_client)
- self.startup = pe_state.get_state()
- results = yield self.startup
+ try:
+ pe_state = PhysicalEntitiesState(self.netconf_client)
+ self.startup = pe_state.get_state()
+ results = yield self.startup
- if results.ok:
- modules = pe_state.get_physical_entities('adtn-phys-mod:module')
- if isinstance(modules, list):
- module = modules[0]
- name = str(module['model-name']).translate(None, '?')
- model = str(module['model-number']).translate(None, '?')
+ if results.ok:
+ modules = pe_state.get_physical_entities('adtn-phys-mod:module')
- device['model'] = '{} - {}'.format(name, model) if len(name) > 0 else \
- module['parent-entity']
- device['hardware_version'] = str(module['hardware-revision']).translate(None, '?')
- device['serial_number'] = str(module['serial-number']).translate(None, '?')
- device['vendor'] = 'Adtran, Inc.'
- device['firmware_version'] = str(device.get('firmware-revision', 'unknown')).translate(None, '?')
- software = module['software']['software']
- device['running-revision'] = str(software['running-revision']).translate(None, '?')
- device['candidate-revision'] = str(software['candidate-revision']).translate(None, '?')
- device['startup-revision'] = str(software['startup-revision']).translate(None, '?')
+ if isinstance(modules, list):
+ module = modules[0]
+
+ name = str(module.get('model-name', 'n/a')).translate(None, '?')
+ model = str(module.get('model-number', 'n/a')).translate(None, '?')
+
+ device['model'] = '{} - {}'.format(name, model) if len(name) > 0 else \
+ module.get('parent-entity', 'n/a')
+ device['hardware_version'] = str(module.get('hardware-revision',
+ 'n/a')).translate(None, '?')
+ device['serial_number'] = str(module.get('serial-number',
+ 'n/a')).translate(None, '?')
+ device['firmware_version'] = str(device.get('firmware-revision',
+ 'unknown')).translate(None, '?')
+ if 'software' in module:
+ if 'software' in module['software']:
+ software = module['software']['software']
+ device['running-revision'] = str(software.get('running-revision',
+ 'n/a')).translate(None, '?')
+ device['candidate-revision'] = str(software.get('candidate-revision',
+ 'n/a')).translate(None, '?')
+ device['startup-revision'] = str(software.get('startup-revision',
+ 'n/a')).translate(None, '?')
+ except Exception as e:
+ self.log.exception('get-pe-state', e=e)
returnValue(device)
@@ -193,8 +215,8 @@
for port in results:
port_no = port['port_no']
self.log.info('processing-nni', port_no=port_no, name=port['port_no'])
- assert port_no
- assert port_no not in self.northbound_ports
+ assert port_no, 'Port number not found'
+ assert port_no not in self.northbound_ports, 'Port number is not a northbound port'
self.northbound_ports[port_no] = NniPort(self, **port) if not self.is_virtual_olt \
else MockNniPort(self, **port)
@@ -236,20 +258,16 @@
for pon in results:
# Number PON Ports after the NNI ports
pon_id = pon['pon-id']
- log.info('Processing-pon-port', pon_id=pon_id)
- assert pon_id not in self.southbound_ports
-
- admin_state = AdminState.ENABLED if pon.get('enabled',
- PonPort.DEFAULT_ENABLED) else AdminState.DISABLED
+ log.info('processing-pon-port', pon_id=pon_id)
+ assert pon_id not in self.southbound_ports,\
+ 'Pon ID not found in southbound ports'
self.southbound_ports[pon_id] = PonPort(pon_id,
self._pon_id_to_port_number(pon_id),
- self,
- admin_state=admin_state)
-
- # TODO: For now, limit number of PON ports to make debugging easier
- if len(self.southbound_ports) >= self.max_pon_ports:
- break
+ self)
+ if self.autoactivate:
+ self.southbound_ports[pon_id].downstream_fec_enable = True
+ self.southbound_ports[pon_id].upstream_fec_enable = True
self.num_southbound_ports = len(self.southbound_ports)
@@ -271,7 +289,7 @@
#
# o Discover any new or missing ONT/ONUs
#
- # o TODO Discover any LOS for any ONT/ONUs
+ # o Discover any LOS for any ONT/ONUs
#
# o TODO Update some PON level statistics
@@ -308,9 +326,11 @@
c.shutdown()
d, self.status_poll = self.status_poll, None
- if d is not None and not d.called:
- d.cancel()
-
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
super(AdtranOltHandler, self).reboot()
def _finish_reboot(self, timeout, previous_oper_status, previous_conn_status):
@@ -325,9 +345,11 @@
c.shutdown()
d, self.status_poll = self.status_poll, None
- if d is not None and not d.called:
- d.cancel()
-
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
super(AdtranOltHandler, self).delete()
def rx_packet(self, message):
@@ -396,9 +418,11 @@
# OLT Specific things here
d, self.startup = self.startup, None
- if d is not None and not d.called:
- d.cancel()
-
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
# self.pons.clear()
# TODO: Any other? OLT specific deactivate steps
@@ -492,6 +516,8 @@
def get_channel_id(self, pon_id, onu_id):
from pon_port import PonPort
+ if FIXED_ONU:
+ return self._onu_offset(onu_id)
return self._onu_offset(onu_id) + (pon_id * PonPort.MAX_ONUS_SUPPORTED)
def _onu_offset(self, onu_id):
@@ -502,6 +528,8 @@
def _channel_id_to_pon_id(self, channel_id, onu_id):
from pon_port import PonPort
+ if FIXED_ONU:
+ return channel_id - self._onu_offset(onu_id)
return (channel_id - self._onu_offset(onu_id)) / PonPort.MAX_ONUS_SUPPORTED
def _pon_id_to_port_number(self, pon_id):
@@ -762,16 +790,16 @@
if item in items:
del items[name]
- self._cached_xpon_pon_info = {} # Clear cached data
-
pass # TODO Do something....
raise NotImplementedError('TODO: not yet supported')
def on_channel_termination_config(self, name, operation, pon_type='xgs-ponid'):
supported_operations = ['create']
- assert operation in supported_operations
- assert name in self._channel_terminations
+ assert operation in supported_operations, \
+ 'Unsupported channel-term operation: {}'.format(operation)
+ assert name in self._channel_terminations, \
+ '{} is not a channel-termination'.format(name)
ct = self._channel_terminations[name]
pon_id = ct[pon_type]
@@ -781,11 +809,14 @@
if pon_port is None:
raise ValueError('Unknown PON port. PON-ID: {}'.format(pon_id))
- assert ct['channel-pair'] in self._channel_pairs
+ assert ct['channel-pair'] in self._channel_pairs, \
+ '{} is not a channel-pair'.format(ct['channel-pair'])
cpair = self._channel_pairs[ct['channel-pair']]
- assert cpair['channel-group'] in self._channel_groups
- assert cpair['channel-partition'] in self._channel_partitions
+ assert cpair['channel-group'] in self._channel_groups, \
+ '{} is not a -group'.format(cpair['channel-group'])
+ assert cpair['channel-partition'] in self._channel_partitions, \
+ '{} is not a channel-partition'.format(cpair('channel-partition'))
cg = self._channel_groups[cpair['channel-group']]
cpart = self._channel_partitions[cpair['channel-partition']]
@@ -794,8 +825,8 @@
polling_period = cg['polling-period']
authentication_method = cpart['authentication-method']
# line_rate = cpair['line-rate']
- # downstream_fec = cpart['fec-downstream']
- # deployment_range = cpart['differential-fiber-distance']
+ downstream_fec = cpart['fec-downstream']
+ deployment_range = cpart['differential-fiber-distance']
# mcast_aes = cpart['mcast-aes']
# TODO: Support BER calculation period
@@ -806,8 +837,8 @@
pon_port.xpon_name = name
pon_port.discovery_tick = polling_period
pon_port.authentication_method = authentication_method
- # TODO: pon_port.deployment_range = deployment_range
- # TODO: pon_port.fec_enable = downstream_fec
+ pon_port.deployment_range = deployment_range * 1000 # pon-agent uses meters
+ pon_port.downstream_fec_enable = downstream_fec
# TODO: pon_port.mcast_aes = mcast_aes
pon_port.admin_state = AdminState.ENABLED if enabled else AdminState.DISABLED
@@ -906,7 +937,7 @@
pass
raise NotImplementedError('TODO: Not yet supported')
- def delete_gemport(self, data):
+ def remove_gemport(self, data):
"""
Delete GEM Port
:param data:
diff --git a/voltha/adapters/adtran_olt/codec/ietf_interfaces.py b/voltha/adapters/adtran_olt/codec/ietf_interfaces.py
index 5cf9c79..65cf58f 100644
--- a/voltha/adapters/adtran_olt/codec/ietf_interfaces.py
+++ b/voltha/adapters/adtran_olt/codec/ietf_interfaces.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# CCopyright 2017-present Adtran, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
diff --git a/voltha/adapters/adtran_olt/codec/olt_config.py b/voltha/adapters/adtran_olt/codec/olt_config.py
index 89a4afe..3db361c 100644
--- a/voltha/adapters/adtran_olt/codec/olt_config.py
+++ b/voltha/adapters/adtran_olt/codec/olt_config.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2017-present Adtran, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -98,7 +98,7 @@
@property
def onus(self):
if self._onus is None:
- self._onus = OltConfig.Pon.decode(self._packet.get('pon', None))
+ self._onus = OltConfig.Pon.Onu.decode(self._packet.get('onus', None))
return self._onus
class Onu(object):
@@ -109,7 +109,9 @@
assert 'onu-id' in packet
self._packet = packet
self._tconts = None
+ self._tconts_dict = None
self._gem_ports = None
+ self._gem_ports_dict = None
def __str__(self):
return "OltConfig.Pon.Onu: onu-id: {}".format(self.onu_id)
@@ -151,10 +153,22 @@
return self._tconts
@property
+ def tconts_dict(self): # TODO: Remove if not used
+ if self._tconts_dict is None:
+ self._tconts_dict = {tcont.alloc_id: tcont for tcont in self.tconts}
+ return self._tconts_dict
+
+ @property
def gem_ports(self):
if self._gem_ports is None:
self._gem_ports = OltConfig.Pon.Onu.GemPort.decode(self._packet.get('gem-ports', None))
- return self._tconts
+ return self._gem_ports
+
+ @property
+ def gem_ports_dict(self): # TODO: Remove if not used
+ if self._gem_ports_dict is None:
+ self._gem_ports_dict = {gem.gem_id: gem for gem in self.gem_ports}
+ return self._gem_ports_dict
class TCont(object):
"""
@@ -208,15 +222,24 @@
@property
def fixed_bandwidth(self):
- return self._packet['fixed-bandwidth']
+ try:
+ return int(self._packet.get('fixed-bandwidth', 0))
+ except:
+ return 0
@property
def assured_bandwidth(self):
- return self._packet['assured-bandwidth']
+ try:
+ return int(self._packet.get('assured-bandwidth', 0))
+ except:
+ return 0
@property
def maximum_bandwidth(self):
- return self._packet['maximum-bandwidth']
+ try:
+ return int(self._packet.get('maximum-bandwidth', 0))
+ except:
+ return 0
@property
def additional_bandwidth_eligibility(self):
@@ -265,7 +288,7 @@
gem_ports = {}
for gem_port_data in gem_port_container.get('gem-port', []):
gem_port = OltConfig.Pon.Onu.GemPort(gem_port_data)
- assert gem_port.port_id not in gem_port
+ assert gem_port.port_id not in gem_ports
gem_ports[gem_port.port_id] = gem_port
return gem_ports
@@ -276,6 +299,11 @@
return self._packet['port-id']
@property
+ def gem_id(self):
+ """The ID used to identify the GEM Port"""
+ return self.port_id
+
+ @property
def alloc_id(self):
"""The Alloc-ID of the T-CONT to which this GEM port is mapped"""
return self._packet['alloc-id']
diff --git a/voltha/adapters/adtran_olt/codec/olt_state.py b/voltha/adapters/adtran_olt/codec/olt_state.py
index 8ab07db..cf55d43 100644
--- a/voltha/adapters/adtran_olt/codec/olt_state.py
+++ b/voltha/adapters/adtran_olt/codec/olt_state.py
@@ -1,4 +1,3 @@
-#
# Copyright 2017-present Adtran, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
@@ -167,7 +166,7 @@
key/value: serial-number (string)
"""
return frozenset([sn['serial-number'] for sn in self._packet.get('discovered-onu', [])
- if 'serial-number' in sn])
+ if 'serial-number' in sn and sn['serial-number'] != 'AAAAAAAAAAA='])
@property
def gems(self):
@@ -189,7 +188,7 @@
"""
def __init__(self, packet):
- assert 'onu-id' in packet
+ assert 'onu-id' in packet, 'onu-id not found in packet'
self._packet = packet
def __str__(self):
@@ -226,3 +225,14 @@
def rssi(self):
"""The received signal strength indication of the ONU"""
return self._packet.get('rssi', -9999)
+
+ @property
+ def equalization_delay(self):
+ """Equalization delay (bits)"""
+ return self._packet.get('equalization-delay', 0)
+
+ @property
+ def fiber_length(self):
+ """Distance to ONU"""
+ return self._packet.get('fiber-length', 0)
+
diff --git a/voltha/adapters/adtran_olt/codec/physical_entities_state.py b/voltha/adapters/adtran_olt/codec/physical_entities_state.py
index 947eab6..cf2f085 100644
--- a/voltha/adapters/adtran_olt/codec/physical_entities_state.py
+++ b/voltha/adapters/adtran_olt/codec/physical_entities_state.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2017-present Adtran, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
diff --git a/voltha/adapters/adtran_olt/flow/evc.py b/voltha/adapters/adtran_olt/flow/evc.py
index ee31788..add54e8 100644
--- a/voltha/adapters/adtran_olt/flow/evc.py
+++ b/voltha/adapters/adtran_olt/flow/evc.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2017-present Adtran, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -82,7 +82,6 @@
self._flow = flow_entry
self._name = self._create_name()
self._evc_maps = {} # Map Name -> evc-map
- self._install_deferred = None
self._flow_type = EVC.ElineFlowType.UNKNOWN
@@ -142,7 +141,7 @@
@stpid.setter
def stpid(self, value):
- assert self._stpid is None or self._stpid == value
+ assert self._stpid is None or self._stpid == value, 'STPID can only be set once'
self._stpid = value
@property
@@ -151,7 +150,8 @@
@switching_method.setter
def switching_method(self, value):
- assert self._switching_method is None or self._switching_method == value
+ assert self._switching_method is None or self._switching_method == value,\
+ 'Switching Method can only be set once'
self._switching_method = value
@property
@@ -160,7 +160,8 @@
@ce_vlan_preservation.setter
def ce_vlan_preservation(self, value):
- assert self._ce_vlan_preservation is None or self._ce_vlan_preservation == value
+ assert self._ce_vlan_preservation is None or self._ce_vlan_preservation == value,\
+ 'CE VLAN Preservation can only be set once'
self._ce_vlan_preservation = value
@property
@@ -169,13 +170,22 @@
@men_to_uni_tag_manipulation.setter
def men_to_uni_tag_manipulation(self, value):
- assert self._men_to_uni_tag_manipulation is None or self._men_to_uni_tag_manipulation == value
+ assert self._men_to_uni_tag_manipulation is None or self._men_to_uni_tag_manipulation == value, \
+ 'MEN-to-UNI tag manipulation can only be set once'
self._men_to_uni_tag_manipulation = value
@property
def flow_entry(self):
+ # Note that the first flow used to create the EVC is saved and it may
+ # eventually get deleted while others still use the EVC. This should
+ # be okay as the downstream flow/signature table is used to maintain
+ # the lifetime on this EVC object.
return self._flow
+ @flow_entry.setter
+ def flow_entry(self, value):
+ self._flow = value
+
@property
def evc_maps(self):
"""
@@ -184,6 +194,14 @@
"""
return list(self._evc_maps.values())
+ @property
+ def evc_map_names(self):
+ """
+ Get all EVC Map names that reference this EVC
+ :return: list of EVCMap names
+ """
+ return list(self._evc_maps.keys())
+
def add_evc_map(self, evc_map):
if self._evc_maps is not None:
self._evc_maps[evc_map.name] = evc_map
@@ -192,24 +210,11 @@
if self._evc_maps is not None and evc_map.name in self._evc_maps:
del self._evc_maps[evc_map.name]
- def cancel_defers(self):
- d, self._install_deferred = self._install_deferred, None
- if d is not None and not d.called:
- try:
- d.cancel()
- except:
- pass
-
def schedule_install(self):
"""
Try to install EVC and all MAPs in a single operational sequence
"""
- self.cancel_defers()
-
- if self._valid and self._install_deferred is None:
- self._install_deferred = reactor.callLater(0, self._do_install)
-
- return self._install_deferred
+ return reactor.callLater(0, self._do_install) if self._valid else succeed('Not VALID')
@staticmethod
def _xml_header(operation=None):
@@ -222,8 +227,6 @@
@inlineCallbacks
def _do_install(self):
- self._install_deferred = None
-
# Install the EVC if needed
if self._valid and not self._installed:
@@ -239,15 +242,15 @@
if self._s_tag is not None:
xml += '<stag>{}</stag>'.format(self._s_tag)
- xml += '<stag-tpid>{:#x}</stag-tpid>'.format(self._stpid or DEFAULT_STPID)
+ xml += '<stag-tpid>{}</stag-tpid>'.format(self._stpid or DEFAULT_STPID)
else:
xml += 'no-stag/'
for port in self._men_ports:
xml += '<men-ports>{}</men-ports>'.format(port)
- xml += EVC.Men2UniManipulation.xml(self._men_to_uni_tag_manipulation)
- xml += EVC.SwitchingMethod.xml(self._switching_method)
+ # xml += EVC.Men2UniManipulation.xml(self._men_to_uni_tag_manipulation)
+ # xml += EVC.SwitchingMethod.xml(self._switching_method)
xml += EVC._xml_trailer()
log.debug("Creating EVC {}: '{}'".format(self.name, xml))
@@ -268,7 +271,7 @@
if self._installed:
for evc_map in self.evc_maps:
try:
- results = yield evc_map.install()
+ yield evc_map.install()
pass # TODO: What to do on error?
except Exception as e:
@@ -283,8 +286,6 @@
:param remove_maps: (boolean)
:return: (deferred)
"""
- self.cancel_defers()
-
if not self.installed:
return succeed('Not installed')
@@ -297,6 +298,7 @@
def _failure(results):
log.error('remove-failed', results=results)
+ self._installed = False
xml = EVC._xml_header('delete') + '<name>{}</name>'.format(self.name) + EVC._xml_trailer()
d = self._flow.handler.netconf_client.edit_config(xml, lock_timeout=30)
@@ -318,6 +320,8 @@
try:
dl = [self.remove()]
+ self._valid = False
+
if delete_maps:
for evc_map in self.evc_maps:
dl.append(evc_map.delete()) # TODO: implement bulk-flow procedures
@@ -340,8 +344,8 @@
:param reflow_maps: (boolean) Flag indication if EVC-MAPs should be reflowed as well
:return: (deferred)
"""
- self.cancel_defers()
self._installed = False
+
if reflow_maps:
for evc_map in self.evc_maps:
evc_map.installed = False
@@ -360,8 +364,8 @@
self._s_tag = self._flow.vlan_id
- # if self._flow.inner_vid is not None:
- # self._switching_method = EVC.SwitchingMethod.DOUBLE_TAGGED TODO: Future support
+ if self._flow.inner_vid is not None:
+ self._switching_method = EVC.SwitchingMethod.DOUBLE_TAGGED
# Note: The following fields will get set when the first EVC-MAP
# is associated with this object. Once set, they cannot be changed to
diff --git a/voltha/adapters/adtran_olt/flow/evc_map.py b/voltha/adapters/adtran_olt/flow/evc_map.py
index 6f54e57..5186ce4 100644
--- a/voltha/adapters/adtran_olt/flow/evc_map.py
+++ b/voltha/adapters/adtran_olt/flow/evc_map.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2017-present Adtran, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -68,8 +68,9 @@
def __init__(self, flow, evc, is_ingress_map):
self._flow = flow
self._evc = evc
- self._gem_ids_and_vid = None
+ self._gem_ids_and_vid = None # { key -> onu-id, value -> tuple(sorted GEM Port IDs, onu_vid) }
self._is_ingress_map = is_ingress_map
+ self._pon_id = None
self._installed = False
self._status_message = None
@@ -78,8 +79,6 @@
self._uni_port = None
self._evc_connection = EVCMap.EvcConnection.DEFAULT
self._evc_name = None
- self._is_pon_port = None
-
self._men_priority = EVCMap.PriorityOption.DEFAULT
self._men_pri = 0 # If Explicit Priority
@@ -137,7 +136,7 @@
@installed.setter
def installed(self, value):
- assert not value # Can only reset
+ assert not value, 'installed can only be reset' # Can only reset
self._installed = False
@property
@@ -157,6 +156,18 @@
return self._eth_type is not None or self._ip_protocol is not None or\
self._ipv4_dst is not None or self._udp_dst is not None or self._udp_src is not None
+ @property
+ def pon_id(self):
+ return self._pon_id # May be None
+
+ @property
+ def onu_ids(self):
+ return self._gem_ids_and_vid.keys()
+
+ @property
+ def gem_ids_and_vid(self):
+ return self._gem_ids_and_vid.copy()
+
@staticmethod
def _xml_header(operation=None):
return '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps"{}><evc-map>'.\
@@ -166,73 +177,79 @@
def _xml_trailer():
return '</evc-map></evc-maps>'
+ def _common_install_xml(self):
+ xml = '<enabled>{}</enabled>'.format('true' if self._enabled else 'false')
+ xml += '<uni>{}</uni>'.format(self._uni_port)
+
+ if self._evc_name is not None:
+ xml += '<evc>{}</evc>'.format(self._evc_name)
+ else:
+ xml += EVCMap.EvcConnection.xml(self._evc_connection)
+
+ xml += '<match-untagged>{}</match-untagged>'.format('true'
+ if self._match_untagged
+ else 'false')
+ # if self._c_tag is not None:
+ # xml += '<ctag>{}</ctag>'.format(self._c_tag)
+ # TODO: The following is not yet supported (and in some cases, not decoded)
+ # self._men_priority = EVCMap.PriorityOption.INHERIT_PRIORITY
+ # self._men_pri = 0 # If Explicit Priority
+ #
+ # self._men_ctag_priority = EVCMap.PriorityOption.INHERIT_PRIORITY
+ # self._men_ctag_pri = 0 # If Explicit Priority
+ #
+ # self._match_ce_vlan_id = None
+ # self._match_untagged = True
+ # self._match_destination_mac_address = None
+ # self._eth_type = None
+ # self._ip_protocol = None
+ # self._ipv4_dst = None
+ # self._udp_dst = None
+ # self._udp_src = None
+ return xml
+
+ def _ingress_install_xml(self, onu_s_gem_ids_and_vid):
+ from ..onu import Onu
+
+ xml = '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps">'
+
+ for onu_or_vlan_id, gem_ids_and_vid in onu_s_gem_ids_and_vid.iteritems():
+ first_gem_id = True
+ vid = gem_ids_and_vid[1]
+ ident = '{}.{}'.format(self._pon_id, onu_or_vlan_id) if vid is None \
+ else onu_or_vlan_id
+
+ for gem_id in gem_ids_and_vid[0]:
+ xml += '<evc-map>'
+ xml += '<name>{}.{}.{}</name>'.format(self.name, ident, gem_id)
+ xml += '<ce-vlan-id>{}</ce-vlan-id>'.format(Onu.gem_id_to_gvid(gem_id))
+
+ # GEM-IDs are a sorted list (ascending). First gemport handles downstream traffic
+ if first_gem_id and vid is not None:
+ first_gem_id = False
+ xml += '<network-ingress-filter>'
+ xml += '<men-ctag>{}</men-ctag>'.format(vid) # Added in August 2017 model
+ xml += '</network-ingress-filter>'
+
+ xml += self._common_install_xml()
+ xml += '</evc-map>'
+ xml += '</evc-maps>'
+ return xml
+
+ def _egress_install_xml(self):
+ xml = EVCMap._xml_header()
+ xml += '<name>{}</name>'.format(self.name)
+ xml += self._common_install_xml()
+ xml += EVCMap._xml_trailer()
+ return xml
+
@inlineCallbacks
def install(self):
- if self._valid and not self._installed:
- def _common_xml():
- xml = '<enabled>{}</enabled>'.format('true' if self._enabled else 'false')
- xml += '<uni>{}</uni>'.format(self._uni_port)
-
- if self._evc_name is not None:
- xml += '<evc>{}</evc>'.format(self._evc_name)
- else:
- xml += EVCMap.EvcConnection.xml(self._evc_connection)
-
- xml += '<match-untagged>{}</match-untagged>'.format('true'
- if self._match_untagged
- else 'false')
- # if self._c_tag is not None:
- # xml += '<ctag>{}</ctag>'.format(self._c_tag)
- # TODO: The following is not yet supported (and in some cases, not decoded)
- # self._men_priority = EVCMap.PriorityOption.INHERIT_PRIORITY
- # self._men_pri = 0 # If Explicit Priority
- #
- # self._men_ctag_priority = EVCMap.PriorityOption.INHERIT_PRIORITY
- # self._men_ctag_pri = 0 # If Explicit Priority
- #
- # self._match_ce_vlan_id = None
- # self._match_untagged = True
- # self._match_destination_mac_address = None
- # self._eth_type = None
- # self._ip_protocol = None
- # self._ipv4_dst = None
- # self._udp_dst = None
- # self._udp_src = None
- return xml
-
- def _ingress_xml():
- from ..onu import Onu
- xml = '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps">'
- for onu_id, gem_ids_and_vid in self._gem_ids_and_vid.iteritems():
- first_gem_id = True
- vid = gem_ids_and_vid[1]
-
- for gem_id in gem_ids_and_vid[0]:
- xml += '<evc-map>'
- xml += '<name>{}.{}.{}</name>'.format(self.name, onu_id, gem_id)
- xml += '<ce-vlan-id>{}</ce-vlan-id>'.format(Onu.gem_id_to_gvid(gem_id))
-
- if first_gem_id and vid is not None:
- first_gem_id = False
- xml += '<network-ingress-filter>'
- xml += '<men-ctag>{}</men-ctag>'.format(vid) # Added in august 2017 model
- xml += '</network-ingress-filter>'
-
- xml += _common_xml()
- xml += '</evc-map>'
- xml += '</evc-maps>'
- return xml
-
- def _egress_xml():
- xml = EVCMap._xml_header()
- xml += '<name>{}</name>'.format(self.name)
- xml += _common_xml()
- xml += EVCMap._xml_trailer()
- return xml
-
+ if self._valid and not self._installed and len(self._gem_ids_and_vid) > 0:
try:
# TODO: create generator of XML once we have MANY to install at once
- map_xml = _ingress_xml() if self._is_ingress_map else _egress_xml()
+ map_xml = self._ingress_install_xml(self._gem_ids_and_vid) \
+ if self._is_ingress_map else self._egress_install_xml()
log.debug('install', xml=map_xml, name=self.name)
results = yield self._flow.handler.netconf_client.edit_config(map_xml,
@@ -240,50 +257,47 @@
self._installed = results.ok
self.status = '' if results.ok else results.error
- if self._pon_port is not None:
- self._pon_port.add_pon_evc_map(self)
-
except Exception as e:
log.exception('install', name=self.name, e=e)
raise
returnValue(self._installed and self._valid)
+ def _ingress_remove_xml(self, onus_gem_ids_and_vid):
+ xml = '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps"' + \
+ ' xc:operation="delete">'
+
+ for onu_id, gem_ids_and_vid in onus_gem_ids_and_vid.iteritems():
+ for gem_id in gem_ids_and_vid[0]:
+ xml += '<evc-map>'
+ xml += '<name>{}.{}.{}</name>'.format(self.name, onu_id, gem_id)
+ xml += '</evc-map>'
+ xml += '</evc-maps>'
+ return xml
+
+ def _egress_remove_xml(self):
+ return EVCMap._xml_header('delete') + \
+ '<name>{}</name>'.format(self.name) + EVCMap._xml_trailer()
+
+ @inlineCallbacks
def remove(self):
if not self.installed:
- return succeed('Not installed')
+ returnValue(succeed('Not installed'))
log.info('removing', evc_map=self)
- def _ingress_xml():
- xml = '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps"' + \
- ' xc:operation = "delete">'
-
- for onu_id, gem_ids_and_vid in self._gem_ids_and_vid.iteritems():
- for gem_id in gem_ids_and_vid[0]:
- xml += '<evc-map>'
- xml += '<name>{}.{}.{}</name>'.format(self.name, onu_id, gem_id)
- xml += '</evc-map>'
- xml += '</evc-maps>'
-
- return xml
-
- def _egress_xml():
- return EVCMap._xml_header('delete') + \
- '<name>{}</name>'.format(self.name) + EVCMap._xml_trailer()
-
def _success(rpc_reply):
log.debug('remove-success', rpc_reply=rpc_reply)
self._installed = False
- def _failure(results):
- log.error('remove-failed', results=results)
-
- if self._pon_port is not None:
- self._pon_port.remove_pon_evc_map(self)
+ def _failure(failure):
+ log.error('remove-failed', failure=failure)
+ self._installed = False
# TODO: create generator of XML once we have MANY to install at once
- map_xml = _ingress_xml() if self._is_ingress_map else _egress_xml()
+ map_xml = self._ingress_remove_xml(self._gem_ids_and_vid) if self._is_ingress_map \
+ else self._egress_remove_xml()
+
d = self._flow.handler.netconf_client.edit_config(map_xml, lock_timeout=30)
d.addCallbacks(_success, _failure)
return d
@@ -295,6 +309,10 @@
"""
if self._evc is not None:
self._evc.remove_evc_map(self)
+ self._evc = None
+
+ self._flow = None
+ self._valid = False
try:
yield self.remove()
@@ -302,61 +320,19 @@
except Exception as e:
log.exception('removal', e=e)
- self._flow = None
- self._evc = None
returnValue('Done')
- def add_onu(self, onu):
- """
- Add an ONU to a pon-wide EVC Map
-
- :param onu: (Onu) ONU to add
- :return: (defeered)
- """
- if self._pon_port is not None:
- gem_ids = onu.gem_ids(True)
- vid = onu.onu_vid
- pass # TODO: Implement this
-
- def remove_onu(self, onu):
- """
- Remove an ONU to a pon-wide EVC Map
-
- :param onu: (Onu) ONU to add
- :return: (defeered)
- """
- if self._pon_port is not None:
- gem_ids = onu.gem_ids(True)
- vid = onu.onu_vid
- pass # TODO: Implement this
-
- def add_gem_id(self, onu, gem_id):
- """
- Add a GEM ID to and existing EVC_MAP
-
- :param onu: (Onu) ONU
- :param gem_id: (Int) GEM ID
- :return: (defeered)
- """
- pass # TODO: Implement this
-
- def remove_gem_id(self, onu, gem_id):
- """
- Remove a GEM ID from and existing EVC_MAP
-
- :param onu: (Onu) ONU
- :param gem_id: (Int) GEM ID
- :return: (defeered)
- """
- pass # TODO: Implement this
+ @staticmethod
+ def create_evc_map_name(flow):
+ return EVC_MAP_NAME_FORMAT.format(flow.in_port, flow.flow_id)
def _decode(self):
from evc import EVC
from flow_entry import FlowEntry
- flow = self._flow
+ flow = self._flow # TODO: Drop saving of flow once debug complete
- self._name = EVC_MAP_NAME_FORMAT.format(flow.in_port, flow.flow_id)
+ self._name = EVCMap.create_evc_map_name(flow)
if self._evc:
self._evc_connection = EVCMap.EvcConnection.EVC
@@ -394,12 +370,12 @@
pon_port = flow.handler.get_southbound_port(flow.in_port)
if pon_port is not None:
- if flow.onu_vid is None:
- self._pon_port = pon_port # EVC Map is for all ONUs on port
+ self._pon_id = pon_port.pon_id
+ self._gem_ids_and_vid = pon_port.gem_ids(flow.logical_port,
+ self._needs_acl_support,
+ flow.is_multicast_flow)
- self._gem_ids_and_vid = pon_port.gem_ids(flow.onu_vid, self._needs_acl_support)
-
- # TODO: Only EAPOL ACL support for the first demo
+ # TODO: Only EAPOL ACL support for the first demo - FIXED_ONU
if self._needs_acl_support and self._eth_type != FlowEntry.EtherType.EAPOL.value:
self._gem_ids_and_vid = dict()
@@ -409,16 +385,19 @@
# If a push of a single VLAN is present with a POP of the VLAN in the EVC's
# flow, then this is a traditional EVC flow
- if len(flow.push_vlan_id) == 1 and self._evc.flow_entry.pop_vlan == 1:
- self._evc.men_to_uni_tag_manipulation = EVC.Men2UniManipulation.SYMMETRIC
- self._evc.switching_method = EVC.SwitchingMethod.SINGLE_TAGGED
- self._evc.stpid = flow.push_vlan_tpid[0]
+ self._evc.men_to_uni_tag_manipulation = EVC.Men2UniManipulation.POP_OUT_TAG_ONLY
+ self._evc.switching_method = EVC.SwitchingMethod.DOUBLE_TAGGED
- elif len(flow.push_vlan_id) == 2 and self._evc.flow_entry.pop_vlan == 1:
- self._evc.men_to_uni_tag_manipulation = EVC.Men2UniManipulation.POP_OUT_TAG_ONLY
- self._evc.switching_method = EVC.SwitchingMethod.DOUBLE_TAGGED
- # self._match_ce_vlan_id = 'TODO: something maybe'
- raise NotImplementedError('TODO: Not supported/needed yet')
+ # if len(flow.push_vlan_id) == 1 and self._evc.flow_entry.pop_vlan == 1:
+ # self._evc.men_to_uni_tag_manipulation = EVC.Men2UniManipulation.SYMMETRIC
+ # self._evc.switching_method = EVC.SwitchingMethod.SINGLE_TAGGED
+ # self._evc.stpid = flow.push_vlan_tpid[0]
+ #
+ # elif len(flow.push_vlan_id) == 2 and self._evc.flow_entry.pop_vlan == 1:
+ # self._evc.men_to_uni_tag_manipulation = EVC.Men2UniManipulation.POP_OUT_TAG_ONLY
+ # self._evc.switching_method = EVC.SwitchingMethod.DOUBLE_TAGGED
+ # # self._match_ce_vlan_id = 'something maybe'
+ # raise NotImplementedError('TODO: Not supported/needed yet')
return True
diff --git a/voltha/adapters/adtran_olt/flow/flow_entry.py b/voltha/adapters/adtran_olt/flow/flow_entry.py
index b5bb36b..189c7d0 100644
--- a/voltha/adapters/adtran_olt/flow/flow_entry.py
+++ b/voltha/adapters/adtran_olt/flow/flow_entry.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2017-present Adtran, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -31,9 +31,17 @@
17, # UDP
]
-_existing_flow_entries = {} # device-id -> flow dictionary
- # |
- # +-> flow-id -> flow-entry
+_existing_downstream_flow_entries = {} # device-id -> signature-table
+ # |
+ # +-> downstream-signature
+ # |
+ # +-> 'evc' -> EVC
+ # |
+ # +-> flow-ids -> flow-entry
+
+_existing_upstream_flow_entries = {} # device-id -> flow dictionary
+ # |
+ # +-> flow-id -> flow-entry
class FlowEntry(object):
@@ -72,16 +80,18 @@
UDP = 17
def __init__(self, flow, handler):
- self._flow = flow
+ self._flow = flow # TODO: Remove later
self._handler = handler
+ self.flow_id = flow.id
self.evc = None # EVC this flow is part of
self.evc_map = None # EVC-MAP this flow is part of
self._flow_direction = FlowEntry.FlowDirection.OTHER
- self.onu_vid = None
+ self._logical_port = None # Currently ONU VID is logical port if not doing xPON
+ self._is_multicast = False
- self._name = self._create_flow_name()
# A value used to locate possible related flow entries
self.signature = None
+ self.downstream_signature = None # Valid for upstream EVC-MAP Flows
# Selection properties
self.in_port = None
@@ -100,12 +110,17 @@
self.push_vlan_tpid = []
self.push_vlan_id = []
+ self._name = self.create_flow_name()
+
+ def __str__(self):
+ return 'flow_entry: {}, in: {}, out: {}'.format(self.name, self.in_port,
+ self.output)
+
@property
def name(self):
return self._name # TODO: Is a name really needed in production?
- # TODO: Is a name really needed in production?
- def _create_flow_name(self):
+ def create_flow_name(self):
return 'flow-{}-{}'.format(self.device_id, self.flow_id)
@property
@@ -113,10 +128,6 @@
return self._flow
@property
- def flow_id(self):
- return self.flow.id
-
- @property
def handler(self):
return self._handler
@@ -128,6 +139,14 @@
def flow_direction(self):
return self._flow_direction
+ @property
+ def is_multicast_flow(self):
+ return self._is_multicast
+
+ @property
+ def logical_port(self):
+ return self._logical_port # NNI or UNI Logical Port
+
@staticmethod
def create(flow, handler):
"""
@@ -151,84 +170,135 @@
try:
flow_entry = FlowEntry(flow, handler)
- if flow_entry.device_id not in _existing_flow_entries:
- _existing_flow_entries[flow_entry.device_id] = {}
-
- flow_table = _existing_flow_entries[flow_entry.device_id]
-
- if flow_entry.flow_id in flow_table:
- return flow_entry, None
-
- #########################################
- # A new flow, decode it into the items of interest
-
if not flow_entry._decode():
return None, None
+ if flow_entry.device_id not in _existing_downstream_flow_entries:
+ _existing_downstream_flow_entries[flow_entry.device_id] = {}
+
+ if flow_entry.device_id not in _existing_upstream_flow_entries:
+ _existing_upstream_flow_entries[flow_entry.device_id] = {}
+
+ downstream_sig_table = _existing_downstream_flow_entries[flow_entry.device_id]
+ upstream_flow_table = _existing_upstream_flow_entries[flow_entry.device_id]
+
+ if flow_entry.flow_direction == FlowEntry.FlowDirection.UPSTREAM and\
+ flow_entry.flow_id in upstream_flow_table:
+ return flow_entry, None
+
+ if flow_entry.flow_direction == FlowEntry.FlowDirection.DOWNSTREAM and\
+ flow_entry.signature in downstream_sig_table and\
+ flow_entry.flow_id in downstream_sig_table[flow_entry.signature]:
+ return flow_entry, None
+
# Look for any matching flows in the other direction that might help make an EVC
# and then save it off in the device specific flow table
# TODO: For now, only support for E-LINE services between NNI and UNI
- flow_candidates = [_flow for _flow in flow_table.itervalues()
- if _flow.signature == flow_entry.signature and
- _flow.in_port == flow_entry.output and
- (_flow.flow_direction == FlowEntry.FlowDirection.UPSTREAM or
- _flow.flow_direction == FlowEntry.FlowDirection.DOWNSTREAM)
- ]
+ downstream_flow = None
+ upstream_flows = None
+ downstream_sig = None
- flow_table[flow_entry.flow_id] = flow_entry
-
- # TODO: For now, only support for E-LINE services between NNI and UNI
- if len(flow_candidates) == 0 or (flow_entry.flow_direction != FlowEntry.FlowDirection.UPSTREAM and
- flow_entry.flow_direction != FlowEntry.FlowDirection.DOWNSTREAM):
- return flow_entry, None
-
- # Possible candidate found. Currently, the logical_device_agent sends us the load downstream
- # flow first and then all the matching upstreams. So we should have only one match
-
- if flow_entry.flow_direction == FlowEntry.FlowDirection.DOWNSTREAM:
+ if flow_entry._is_multicast: # Uni-directional flow
+ assert flow_entry._flow_direction == FlowEntry.FlowDirection.DOWNSTREAM, \
+ 'Only downstream Multicast supported'
downstream_flow = flow_entry
- else:
- assert len(flow_candidates) != 0
- downstream_flow = flow_candidates[0]
+ downstream_sig = flow_entry.signature
+ upstream_flows = []
+ elif flow_entry.flow_direction == FlowEntry.FlowDirection.DOWNSTREAM:
+ downstream_flow = flow_entry
+ downstream_sig = flow_entry.signature
+
+ elif flow_entry.flow_direction == FlowEntry.FlowDirection.UPSTREAM:
+ downstream_sig = flow_entry.downstream_signature
+
+ if downstream_sig is None:
+ return None, None
+
+ if downstream_sig not in downstream_sig_table:
+ downstream_sig_table[downstream_sig] = {}
+ downstream_sig_table[downstream_sig]['evc'] = None
+
+ downstream_flow_table = downstream_sig_table[downstream_sig]
+ evc = downstream_flow_table['evc']
+
+ # Save to proper flow table
if flow_entry.flow_direction == FlowEntry.FlowDirection.UPSTREAM:
- upstream_flows = [flow_entry]
- else:
- upstream_flows = flow_candidates
+ upstream_flow_table[flow_entry.flow_id] = flow_entry
+ downstream_flow = evc.flow_entry if evc is not None else \
+ next((_flow for _flow in downstream_flow_table.itervalues() if isinstance(_flow, FlowEntry)), None)
- return flow_entry, FlowEntry._create_evc_and_maps(downstream_flow, upstream_flows)
+ elif flow_entry.flow_direction == FlowEntry.FlowDirection.DOWNSTREAM:
+ downstream_flow_table[flow_entry.flow_id] = flow_entry
+
+ # Now find all the upstream flows
+ if downstream_flow is not None:
+ upstream_flows = [_flow for _flow in upstream_flow_table.itervalues()
+ if _flow.downstream_signature == downstream_flow.signature]
+ if len(upstream_flows) == 0 and not downstream_flow.is_multicast_flow:
+ upstream_flows = None
+
+ # Compute EVC and and maps
+
+ evc = FlowEntry._create_evc_and_maps(evc, downstream_flow, upstream_flows)
+ if evc is not None and evc.valid and downstream_flow_table['evc'] is None:
+ downstream_flow_table['evc'] = evc
+
+ return flow_entry, evc
except Exception as e:
log.exception('flow_entry-processing', e=e)
+ return None, None
@staticmethod
- def _create_evc_and_maps(downstream_flow, upstream_flows):
+ def _create_evc_and_maps(evc, downstream_flow, upstream_flows):
"""
Give a set of flows, find (or create) the EVC and any needed EVC-MAPs
- :param downstream_flow: NNI -> UNI flow (provides much of the EVC values)
- :param upstream_flows: UNI -> NNI flows (provides much of the EVC-MAP values)
+ :param evc: (EVC) Existing EVC for downstream flow. May be null if not created
+ :param downstream_flow: (FlowEntry) NNI -> UNI flow (provides much of the EVC values)
+ :param upstream_flows: (list of FlowEntry) UNI -> NNI flows (provides much of the EVC-MAP values)
:return: EVC object
"""
+ if (evc is None and downstream_flow is None) or upstream_flows is None:
+ return None
+
# Get any existing EVC if a flow is already created
if downstream_flow.evc is None:
- downstream_flow.evc = EVC(downstream_flow)
+ if evc is not None:
+ downstream_flow.evc = evc
- evc = downstream_flow.evc
- if not evc.valid:
+ elif downstream_flow.is_multicast_flow:
+ from mcast import MCastEVC
+ downstream_flow.evc = MCastEVC.create(downstream_flow)
+
+ else:
+ downstream_flow.evc = EVC(downstream_flow)
+
+ if not downstream_flow.evc.valid:
return None
- # Create EVC-MAPs
+ # Create EVC-MAPs. Note upstream_flows is empty list for multicast
+
for flow in upstream_flows:
if flow.evc_map is None:
- flow.evc_map = EVCMap.create_ingress_map(flow, evc)
+ flow.evc_map = EVCMap.create_ingress_map(flow, downstream_flow.evc)
- all_valid = all(flow.evc_map.valid for flow in upstream_flows)
+ all_maps_valid = all(flow.evc_map.valid for flow in upstream_flows) \
+ or downstream_flow.is_multicast_flow
- return evc if all_valid else None
+ return downstream_flow.evc if all_maps_valid else None
+
+ @property
+ def _needs_acl_support(self):
+ """
+ TODO: This is only while there is only a single downstream exception flow
+ """
+ return self.eth_type is not None or self.ip_protocol is not None or\
+ self.ipv4_dst is not None or self.udp_dst is not None or self.udp_src is not None
def _decode(self):
"""
@@ -261,23 +331,34 @@
ports.sort()
# 3 - The outer VID
+ # 4 - The inner VID. Wildcard if downstream
push_len = len(self.push_vlan_id)
- assert push_len <= 2
-
- outer = self.vlan_id or None if push_len == 0 else self.push_vlan_id[0]
-
- # 4 - The inner VID.
- if self.inner_vid is not None:
+ if push_len == 0:
+ outer = self.vlan_id
inner = self.inner_vid
else:
- inner = self.vlan_id if (push_len > 0 and outer is not None) else None
- self.onu_vid = inner if self._flow_direction == FlowEntry.FlowDirection.UPSTREAM else None
+ outer = self.push_vlan_id[-1]
+ if push_len == 1:
+ inner = self.vlan_id
+ else:
+ inner = self.push_vlan_id[-2]
- self.signature = '{}'.format(dev_id)
+ upstream_sig = '{}'.format(dev_id)
+ downstream_sig = '{}'.format(dev_id)
+
for port in ports:
- self.signature += '.{}'.format(port)
- self.signature += '.{}.{}'.format(outer, inner)
+ upstream_sig += '.{}'.format(port)
+ downstream_sig += '.{}'.format(port if self.handler.is_nni_port(port) else '*')
+
+ upstream_sig += '.{}.{}'.format(outer, inner)
+ downstream_sig += '.{}.*'.format(outer)
+
+ if self._flow_direction == FlowEntry.FlowDirection.DOWNSTREAM:
+ self.signature = downstream_sig
+ else:
+ self.signature = upstream_sig
+ self.downstream_signature = downstream_sig
return status
@@ -293,11 +374,18 @@
for field in fd.get_ofb_fields(self._flow):
if field.type == IN_PORT:
- pass # Handled earlier
+ assert self.in_port == field.port, 'Multiple Input Ports found in flow rule'
+
+ if self._handler.is_nni_port(self.in_port):
+ self._logical_port = self.in_port
elif field.type == VLAN_VID:
# log.info('*** field.type == VLAN_VID', value=field.vlan_vid & 0xfff)
self.vlan_id = field.vlan_vid & 0xfff
+ self._is_multicast = self.vlan_id in self._handler.multicast_vlans
+
+ if self._handler.is_pon_port(self.in_port):
+ self._logical_port = self.vlan_id
elif field.type == VLAN_PCP:
# log.info('*** field.type == VLAN_PCP', value=field.vlan_pcp)
@@ -347,6 +435,7 @@
for act in fd.get_actions(self._flow):
if act.type == fd.OUTPUT:
+ assert self.output == act.output.port, 'Multiple Output Ports found in flow rule'
pass # Handled earlier
elif act.type == POP_VLAN:
@@ -376,15 +465,47 @@
@staticmethod
def drop_missing_flows(device_id, valid_flow_ids):
- flow_table = _existing_flow_entries.get(device_id, None)
+ dl = []
+
+ flow_table = _existing_upstream_flow_entries.get(device_id)
+ if flow_table is not None:
+ flows_to_drop = [flow for flow_id, flow in flow_table.items()
+ if flow_id not in valid_flow_ids]
+ dl.extend([flow.remove() for flow in flows_to_drop])
+
+ sig_table = _existing_downstream_flow_entries.get(device_id)
+ if sig_table is not None:
+ for flow_table in sig_table.itervalues():
+ flows_to_drop = [flow for flow_id, flow in flow_table.items()
+ if isinstance(flow, FlowEntry) and flow_id not in valid_flow_ids]
+ dl.extend([flow.remove() for flow in flows_to_drop])
+
+ return gatherResults(dl, consumeErrors=True)
+
+ @staticmethod
+ def find_evc_map_flows(device_id, pon_id, onu_id=None):
+ """
+ For a given OLT, find all the EVC Maps for a specific PON ID and optionally a
+ specific ONU
+ :param device_id: Device ID
+ :param pon_id: (int) PON ID
+ :param onu_id: (int) Optional ONU ID
+ :return: (list) of matching flows
+ """
+ # EVCs are only in the downstream table, EVC Map are in upstream
+ flow_table = _existing_upstream_flow_entries.get(device_id, None)
+
if flow_table is None:
- return succeed('No table')
+ return []
- flows_to_drop = [flow for flow_id, flow in flow_table.items() if flow_id not in valid_flow_ids]
- if len(flows_to_drop) == 0:
- return succeed('No flows')
-
- return gatherResults([flow.remove() for flow in flows_to_drop])
+ flows = []
+ for flow in flow_table.itervalues():
+ evc_map = flow.evc_map
+ if evc_map is not None and evc_map.pon_id is not None and evc_map.pon_id == pon_id:
+ # PON ID Matches
+ if onu_id is None or onu_id in evc_map.gem_ids_and_vid:
+ flows.append(evc_map)
+ return flows
@inlineCallbacks
def remove(self):
@@ -393,34 +514,66 @@
if needed
"""
# Remove from exiting table list
+
device_id = self._handler.device_id
flow_id = self._flow.id
- flow_table = _existing_flow_entries.get(device_id, None)
+ flow_table = None
+ sig_table = None
+
+ if self.flow_direction == FlowEntry.FlowDirection.UPSTREAM:
+ flow_table = _existing_upstream_flow_entries.get(device_id)
+
+ elif self.flow_direction == FlowEntry.FlowDirection.DOWNSTREAM:
+ sig_table = _existing_downstream_flow_entries.get(device_id)
+ flow_table = sig_table.get(self.signature)
if flow_table is None or flow_id not in flow_table:
returnValue(succeed('NOP'))
+ # Remove from flow table and clean up flow table if empty
+
del flow_table[flow_id]
- if len(flow_table) == 0:
- del _existing_flow_entries[device_id]
+ evc_map, self.evc_map = self.evc_map, None
+ evc = None
+
+ if self.flow_direction == FlowEntry.FlowDirection.UPSTREAM:
+ if len(flow_table) == 0:
+ del _existing_upstream_flow_entries[device_id]
+
+ elif self.flow_direction == FlowEntry.FlowDirection.DOWNSTREAM:
+ flow_evc = flow_table['evc']
+
+ # If this flow owns the EVC, assign it to a remaining flow
+ if flow_id == flow_evc.flow_entry.flow_id:
+ flow_table['evc'].flow_entry = next((_flow for _flow in flow_table.itervalues()
+ if isinstance(_flow, FlowEntry)
+ and _flow.flow_id != flow_id), None)
+
+ if len(flow_table) == 1: # Only 'evc' entry present
+ evc = flow_evc
+ del flow_table['evc']
+ del sig_table[self.signature]
+ if len(sig_table) == 0:
+ del _existing_downstream_flow_entries[device_id]
+ else:
+ assert flow_table['evc'] is not None, 'EVC flow re-assignment error'
# Remove flow from the hardware
try:
dl = []
- if self.evc_map is not None:
- dl.append(self.evc_map.delete())
+ if evc_map is not None:
+ dl.append(evc_map.delete())
- if self.evc is not None:
- dl.append(self.evc.delete())
+ if evc is not None:
+ dl.append(evc.delete())
yield gatherResults(dl)
except Exception as e:
log.exception('removal', e=e)
- self.evc_map = None
self.evc = None
- returnValue('Done')
+ returnValue(succeed('Done'))
######################################################
# Bulk operations
diff --git a/voltha/adapters/adtran_olt/flow/mcast.py b/voltha/adapters/adtran_olt/flow/mcast.py
new file mode 100644
index 0000000..80d10ef
--- /dev/null
+++ b/voltha/adapters/adtran_olt/flow/mcast.py
@@ -0,0 +1,184 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from voltha.core.flow_decomposer import *
+from evc import EVC
+from flow_entry import FlowEntry
+from twisted.internet import defer
+from twisted.internet.defer import returnValue, inlineCallbacks, succeed, gatherResults
+
+log = structlog.get_logger()
+
+EVC_NAME_FORMAT = 'VOLTHA-MCAST-{}' # format(flow.vlan_id)
+EVC_NAME_REGEX_ALL = EVC_NAME_FORMAT.format('*')
+
+
+_mcast_evcs = {} # device-id -> flow dictionary
+ # |
+ # +-> vlan-id -> evcs
+
+
+class MCastEVC(EVC):
+ """
+ Class to wrap Multicast EVC and EVC-MAP functionality
+ """
+ def __init__(self, flow_entry):
+ super(MCastEVC, self).__init__(flow_entry)
+ self._downstream_flows = {flow_entry.flow_id} # Matching Downstream Flow IDs
+
+ def __str__(self):
+ return "MCAST-{}: MEN: {}, VLAN: {}".format(self._name, self._men_ports, self._s_tag)
+
+ def _create_name(self):
+ #
+ # TODO: Take into account selection criteria and output to make the name
+ #
+ return EVC_NAME_FORMAT.format(self._flow.vlan_id)
+
+ def _create_evc_map(self, flow_entry):
+ from evc_map import EVCMap
+ flow = FakeUpstreamFlow(flow_entry.flow, flow_entry.handler)
+ return EVCMap.create_ingress_map(flow, self)
+
+ @staticmethod
+ def create(flow_entry):
+ from evc_map import EVCMap
+
+ device_id = flow_entry.device_id
+ if device_id not in _mcast_evcs:
+ _mcast_evcs[device_id] = {}
+
+ evc_table = _mcast_evcs[device_id]
+
+ try:
+ evc = evc_table.get(flow_entry.vlan_id)
+
+ if evc is None:
+ # Create EVC and initial EVC Map
+ evc = MCastEVC(flow_entry)
+ evc_table[flow_entry.vlan_id] = evc
+ else:
+ if flow_entry.flow_id in evc.downstream_flows: # TODO: Debug only to see if flow_ids are unique
+ pass
+ else:
+ evc.add_downstream_flows(flow_entry.flow_id)
+
+ fake_flow = FakeUpstreamFlow(flow_entry.flow, flow_entry.handler)
+ evc_map_name = EVCMap.create_evc_map_name(fake_flow)
+
+ if evc_map_name not in evc.evc_map_names:
+ EVCMap.create_ingress_map(fake_flow, evc)
+
+ return evc
+
+ except Exception as e:
+ log.exception('mcast-create', e=e)
+ return None
+
+ @property
+ def flow_entry(self):
+ return self._flow
+
+ @property
+ def downstream_flows(self):
+ return frozenset(self._downstream_flows)
+
+ def add_downstream_flows(self, flow_id):
+ self._downstream_flows.add(flow_id)
+
+ def remove_downstream_flows(self, flow_id):
+ self._downstream_flows.discard(flow_id)
+
+ @inlineCallbacks
+ def remove(self, remove_maps=True):
+ """
+ Remove EVC (and optional associated EVC-MAPs) from hardware
+ :param remove_maps: (boolean)
+ :return: (deferred)
+ """
+ log.info('removing', evc=self, remove_maps=remove_maps)
+
+ device_id = self._handler.device_id
+ flow_id = self._flow.id
+ evc_table = _mcast_evcs.get(device_id)
+
+ if evc_table is None or flow_id not in evc_table:
+ returnValue(succeed('NOP'))
+
+ # Remove flow reference
+
+ if self._flow.flow_id in self._downstream_flows:
+ del self._downstream_flows[self._flow.flow_id]
+
+ if len(self._downstream_flows) == 0:
+ # Use base class to clean up
+ returnValue(super(MCastEVC, self).remove(remove_maps=True))
+
+ returnValue(succeed('More references'))
+
+ @inlineCallbacks
+ def delete(self, delete_maps=True):
+ """
+ Remove from hardware and delete/clean-up EVC Object
+ """
+ log.info('deleting', evc=self, delete_maps=delete_maps)
+
+ try:
+ dl = [self.remove()]
+ if delete_maps:
+ for evc_map in self.evc_maps:
+ dl.append(evc_map.delete()) # TODO: implement bulk-flow procedures
+
+ yield defer.gatherResults(dl)
+
+ except Exception as e:
+ log.exception('removal', e=e)
+
+ self._evc_maps = None
+ f, self._flow = self._flow, None
+ if f is not None and f.handler is not None:
+ f.handler.remove_evc(self)
+
+ def reflow(self, reflow_maps=True):
+ pass # TODO: Implement or use base class?
+
+ @staticmethod
+ def remove_all(client, regex_=EVC_NAME_REGEX_ALL):
+ """
+ Remove all matching EVCs from hardware
+ :param client: (ncclient) NETCONF Client to use
+ :param regex_: (String) Regular expression for name matching
+ :return: (deferred)
+ """
+ pass # TODO: ???
+
+
+class FakeUpstreamFlow(FlowEntry):
+ def __init__(self, flow, handler):
+ super(FakeUpstreamFlow, self).__init__(flow, handler)
+ self._decode()
+ # Change name that the base class set
+ self._name = self.create_flow_name()
+ self._flow_direction = FlowEntry.FlowDirection.UPSTREAM
+ self.in_port, self.output = self.output, self.in_port
+ self.flow_id = '{}-MCAST'.format(self.vlan_id)
+ self._logical_port = self.vlan_id
+ self.push_vlan_id = [self.vlan_id]
+ self.vlan_id = None
+ self.signature = None
+ self.inner_vid = None
+ self.pop_vlan = 0
+
+ def create_flow_name(self):
+ return 'flow-{}-{}-MCAST'.format(self.device_id, self.vlan_id)
diff --git a/voltha/adapters/adtran_olt/gem_port.py b/voltha/adapters/adtran_olt/gem_port.py
index 0be0d35..d4b744c 100644
--- a/voltha/adapters/adtran_olt/gem_port.py
+++ b/voltha/adapters/adtran_olt/gem_port.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2017-present Adtran, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,6 +13,7 @@
# limitations under the License.
import structlog
+import json
from voltha.protos.bbf_fiber_gemport_body_pb2 import GemportsConfigData
log = structlog.get_logger()
@@ -30,7 +31,7 @@
ident=None,
traffic_class=None,
intf_ref=None,
- exception=False, # TODO: Debug only, remove in production
+ exception=False, # FIXED_ONU
name=None):
self.name = name
self.gem_id = gem_id
@@ -39,10 +40,10 @@
self.intf_ref = intf_ref
self.traffic_class = traffic_class
self.id = ident
- self.encryption = encryption
- self.omci_transport = omci_transport
+ self._encryption = encryption
+ self._omci_transport = omci_transport
self.multicast = multicast
- self.exception = exception
+ self.exception = exception # FIXED_ONU
def __str__(self):
return "GemPort: {}, alloc-id: {}, gem-id: {}".format(self.name,
@@ -70,6 +71,14 @@
pass
return self._alloc_id
+ @property
+ def encryption(self):
+ return self._encryption
+
+ @property
+ def omci_transport(self):
+ return self._omci_transport
+
def to_dict(self):
return {
'port-id': self.gem_id,
@@ -77,3 +86,21 @@
'encryption': self.encryption,
'omci-transport': self.omci_transport
}
+
+ def add_to_hardware(self, session, pon_id, onu_id, operation='POST'):
+ from adtran_olt_handler import AdtranOltHandler
+
+ uri = AdtranOltHandler.GPON_GEM_CONFIG_LIST_URI.format(pon_id, onu_id)
+ data = json.dumps(self.to_dict())
+ name = 'gem-port-create-{}-{}: {}/{}'.format(pon_id, onu_id,
+ self.gem_id,
+ self.alloc_id)
+
+ return session.request(operation, uri, data=data, name=name)
+
+ def remove_from_hardware(self, session, pon_id, onu_id):
+ from adtran_olt_handler import AdtranOltHandler
+
+ uri = AdtranOltHandler.GPON_GEM_CONFIG_URI.format(pon_id, onu_id, self.gem_id)
+ name = 'gem-port-delete-{}-{}: {}'.format(pon_id, onu_id, self.gem_id)
+ return session.request('DELETE', uri, name=name)
diff --git a/voltha/adapters/adtran_olt/net/adtran_netconf.py b/voltha/adapters/adtran_olt/net/adtran_netconf.py
index 1665a70..7cbea60 100644
--- a/voltha/adapters/adtran_olt/net/adtran_netconf.py
+++ b/voltha/adapters/adtran_olt/net/adtran_netconf.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2017-present Adtran, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -43,7 +43,7 @@
"""
Performs NETCONF requests
"""
- def __init__(self, host_ip, port=830, username='', password='', timeout=20):
+ def __init__(self, host_ip, port=830, username='', password='', timeout=10):
self._ip = host_ip
self._port = port
self._username = username
diff --git a/voltha/adapters/adtran_olt/net/adtran_rest.py b/voltha/adapters/adtran_olt/net/adtran_rest.py
index edb64ad..b420485 100644
--- a/voltha/adapters/adtran_olt/net/adtran_rest.py
+++ b/voltha/adapters/adtran_olt/net/adtran_rest.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2017-present Adtran, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -66,7 +66,7 @@
for _method in _valid_methods:
assert _method in _valid_results # Make sure we have a results entry for each supported method
- def __init__(self, host_ip, port, username='', password='', timeout=20):
+ def __init__(self, host_ip, port, username='', password='', timeout=10):
"""
REST Client initialization
@@ -98,7 +98,7 @@
:param is_retry: (boolean) True if this method called recursively in order to recover
from a connection loss. Can happen sometimes in debug sessions
and in the real world.
- :return: (deferred)
+ :return: (dict) On success with the proper results
"""
if method.upper() not in self._valid_methods:
raise NotImplementedError("REST method '{}' is not supported".format(method))
@@ -140,7 +140,7 @@
except (ConnectionDone, ConnectionLost) as e:
if is_retry:
- returnValue(e)
+ raise
returnValue(self.request(method, uri, data=data, name=name,
timeout=timeout, is_retry=True))
diff --git a/voltha/adapters/adtran_olt/net/adtran_zmq.py b/voltha/adapters/adtran_olt/net/adtran_zmq.py
index 1c83ae1..9242130 100644
--- a/voltha/adapters/adtran_olt/net/adtran_zmq.py
+++ b/voltha/adapters/adtran_olt/net/adtran_zmq.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2017-present Adtran, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
diff --git a/voltha/adapters/adtran_olt/net/mock_netconf_client.py b/voltha/adapters/adtran_olt/net/mock_netconf_client.py
index c1d40dd..087a929 100644
--- a/voltha/adapters/adtran_olt/net/mock_netconf_client.py
+++ b/voltha/adapters/adtran_olt/net/mock_netconf_client.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2017-present Adtran, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
diff --git a/voltha/adapters/adtran_olt/nni_port.py b/voltha/adapters/adtran_olt/nni_port.py
index 751f8bb..e3c8a67 100644
--- a/voltha/adapters/adtran_olt/nni_port.py
+++ b/voltha/adapters/adtran_olt/nni_port.py
@@ -17,10 +17,11 @@
import random
import structlog
+import xmltodict
from enum import Enum
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
from twisted.internet.defer import inlineCallbacks, returnValue, succeed, fail
-
+from twisted.python.failure import Failure
from voltha.core.logical_device_agent import mac_str_to_tuple
from voltha.protos.common_pb2 import OperStatus, AdminState
from voltha.protos.device_pb2 import Port
@@ -43,11 +44,11 @@
def __init__(self, parent, **kwargs):
# TODO: Weed out those properties supported by common 'Port' object
- assert parent
- assert 'port_no' in kwargs
+ assert parent, 'parent is None'
+ assert 'port_no' in kwargs, 'Port number not found'
self.log = structlog.get_logger(port_no=kwargs.get('port_no'))
- self.log.info('Creating NNI Port')
+ self.log.info('creating')
self._port_no = kwargs.get('port_no')
self._name = kwargs.get('name', 'nni-{}'.format(self._port_no))
@@ -55,12 +56,16 @@
self._logical_port = None
self._parent = parent
+ self._sync_tick = 20.0 # TODO: Implement
+ self._sync_deferred = None
+
self._deferred = None
self._state = NniPort.State.INITIAL
# Local cache of NNI configuration
self._enabled = None
+ self._ianatype = '<type xmlns:ianaift="urn:ietf:params:xml:ns:yang:iana-if-type">ianaift:ethernetCsmacd</type>'
# And optional parameters
# TODO: Currently cannot update admin/oper status, so create this enabled and active
@@ -109,10 +114,31 @@
def adapter_agent(self):
return self.olt.adapter_agent
+ @property
+ def iana_type(self):
+ return self._ianatype
+
+ @property
+ def enabled(self):
+ return self._enabled
+
+ @enabled.setter
+ def enabled(self, value):
+ assert isinstance(value, bool), 'enabled is a boolean'
+ if self._enabled != value:
+ if value:
+ self.start()
+ self.stop()
+
def _cancel_deferred(self):
- d, self._deferred = self._deferred, None
- if d is not None and not d.called:
- d.cancel()
+ d1, self._deferred = self._deferred, None
+ d2, self._sync_deferred = self._sync_deferred, None
+ for d in [d1, d2]:
+ try:
+ if d is not None and d.called:
+ d.cancel()
+ except:
+ pass
def _update_adapter_agent(self):
# TODO: Currently the adapter_agent does not allow 'update' of port status
@@ -165,7 +191,7 @@
if self._state == NniPort.State.RUNNING:
return succeed('Running')
- self.log.info('Starting NNI port')
+ self.log.info('starting')
self._cancel_deferred()
self._oper_status = OperStatus.ACTIVATING
@@ -173,7 +199,7 @@
# Do the rest of the startup in an async method
self._deferred = reactor.callLater(0, self._finish_startup)
- return self._deferred
+ return succeed('Scheduled')
@inlineCallbacks
def _finish_startup(self):
@@ -196,6 +222,9 @@
# TODO: Start status polling of NNI interfaces
self._deferred = None # = reactor.callLater(3, self.do_stuff)
self._state = NniPort.State.RUNNING
+ # Begin hardware sync
+ self._sync_deferred = reactor.callLater(self._sync_tick, self._sync_hardware)
+
returnValue(self._deferred)
@inlineCallbacks
@@ -203,7 +232,7 @@
if self._state == NniPort.State.STOPPED:
returnValue(succeed('Stopped'))
- self.log.info('stopping-nni')
+ self.log.info('stopping')
self._cancel_deferred()
# NOTE: Leave all NNI ports active (may have inband management)
@@ -220,7 +249,7 @@
results = yield self.set_config('enabled', False)
except Exception as e:
- self.log.exception('nni-start', e=e)
+ self.log.exception('nni-stop', e=e)
self._admin_state = AdminState.UNKNOWN
raise
@@ -239,7 +268,7 @@
Parent device is being deleted. Do not change any config but
stop all polling
"""
- self.log.info('Deleteing {}'.format(self._label))
+ self.log.info('deleting', label=self._label)
self._state = NniPort.State.DELETING
self._cancel_deferred()
@@ -250,10 +279,10 @@
NNI 'Start' is done elsewhere
"""
if self._state != NniPort.State.INITIAL:
- self.log.error('Reset ignored, only valid during initial startup', state=self._state)
+ self.log.error('reset-ignored', state=self._state)
returnValue('Ignored')
- self.log.info('Reset {}'.format(self._label))
+ self.log.info('resetting', label=self._label)
# Always enable our NNI ports
@@ -264,17 +293,20 @@
returnValue(results)
except Exception as e:
- self.log.exception('Reset of NNI to initial state failed', e=e)
+ self.log.exception('reset', e=e)
self._admin_state = AdminState.UNKNOWN
raise
@inlineCallbacks
def set_config(self, leaf, value):
- data = {'leaf': leaf, 'value': value}
+ if isinstance(value, bool):
+ value = 'true' if value else 'false'
+
config = '<interfaces xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">' + \
' <interface>' + \
' <name>{}</name>'.format(self._name) + \
- ' <{d[leaf]}>{d[value]}</{d[leaf]}>'.format(d=data) + \
+ ' {}'.format(self._ianatype) + \
+ ' <{}>{}</{}>'.format(leaf, value, leaf) + \
' </interface>' + \
'</interfaces>'
try:
@@ -282,9 +314,50 @@
returnValue(results)
except Exception as e:
- self.log.exception('Set Config', leaf=leaf, value=value, e=e)
+ self.log.exception('set', leaf=leaf, value=value, e=e)
raise
+ def get_nni_config(self):
+ config = '<filter xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">' + \
+ ' <interfaces xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">' + \
+ ' <interface>' + \
+ ' <name>{}</name>'.format(self._name) + \
+ ' <enabled/>' + \
+ ' </interface>' + \
+ ' </interfaces>' + \
+ '</filter>'
+ return self._parent.netconf_client.get(config)
+
+ def _sync_hardware(self):
+ if self._state == NniPort.State.RUNNING or self._state == NniPort.State.STOPPED:
+ def read_config(results):
+ self.log.debug('read-config', results=results)
+ try:
+ result_dict = xmltodict.parse(results.data_xml)
+ entries = result_dict['data']['interfaces']['interface']
+
+ enabled = entries.get('enabled',
+ str(not self.enabled).lower()) == 'true'
+
+ return succeed('in-sync') if self.enabled == enabled else \
+ self.set_config('enabled', self.enabled)
+
+ except Exception as e:
+ self.log.exception('read-config', e=e)
+ return fail(Failure())
+
+ def failure(reason):
+ self.log.error('hardware-sync-failed', reason=reason)
+
+ def reschedule(_):
+ delay = self._sync_tick
+ delay += random.uniform(-delay / 10, delay / 10)
+ self._sync_deferred = reactor.callLater(delay, self._sync_hardware)
+
+ self._sync_deferred = self.get_nni_config()
+ self._sync_deferred.addCallbacks(read_config, failure)
+ self._sync_deferred.addBoth(reschedule)
+
class MockNniPort(NniPort):
"""
@@ -326,10 +399,10 @@
NNI 'Start' is done elsewhere
"""
if self._state != NniPort.State.INITIAL:
- self.log.error('Reset ignored, only valid during initial startup', state=self._state)
+ self.log.error('reset-ignored', state=self._state)
return fail()
- self.log.info('Reset {}'.format(self._label))
+ self.log.info('resetting', label=self._label)
# Always enable our NNI ports
diff --git a/voltha/adapters/adtran_olt/onu.py b/voltha/adapters/adtran_olt/onu.py
index bff3682..d7fd0cf 100644
--- a/voltha/adapters/adtran_olt/onu.py
+++ b/voltha/adapters/adtran_olt/onu.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2017-present Adtran, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,11 +16,12 @@
import binascii
import json
import structlog
+from twisted.internet import reactor, defer
from twisted.internet.defer import inlineCallbacks, returnValue, succeed
from adtran_olt_handler import AdtranOltHandler
-# Following is only used in autoactivate/demo mode. Otherwise xPON
+# Following is only used in autoactivate/demo mode. Otherwise xPON commands should be used
_VSSN_TO_VENDOR = {
'ADTN': 'adtran_onu',
'BCM?': 'broadcom_onu', # TODO: Get actual VSSN for this vendor
@@ -39,8 +40,6 @@
MIN_ONU_ID = 0
MAX_ONU_ID = 253 # G.984. 0..253, 254=reserved, 255=broadcast
BROADCAST_ONU_ID = 255
- # MAX_ONU_ID = 1022 # G.987. 0..1022, 1023=broadcast
- # BROADCAST_ONU_ID = 1023
DEFAULT_PASSWORD = ''
def __init__(self, onu_info):
@@ -58,21 +57,37 @@
if self._onu_id is None:
raise ValueError('No ONU ID available')
+ pon = onu_info['pon']
self._serial_number_base64 = Onu.string_to_serial_number(onu_info['serial-number'])
self._serial_number_string = onu_info['serial-number']
+ self._device_id = onu_info['device-id']
self._password = onu_info['password']
- self._pon = onu_info['pon']
- self._name = '{}@{}'.format(self._pon.name, self._onu_id)
+ self._olt = pon.olt
+ self._pon_id = pon.pon_id
+ self._name = '{}@{}'.format(pon.name, self._onu_id)
self._xpon_name = onu_info['xpon-name']
- # TODO: Change to OrderedDict sorted by ascending gem-id
self._gem_ports = {} # gem-id -> GemPort
self._tconts = {} # alloc-id -> TCont
self._onu_vid = onu_info['onu-vid']
+ self._uni_ports = [onu_info['onu-vid']]
+ assert len(self._uni_ports) == 1, 'Only one UNI port supported at this time'
+ self._channel_id = onu_info['channel-id']
self._enabled = onu_info['enabled']
+ self._rssi = -9999
+ self._equalization_delay = 0
+ self._fiber_length = 0
+ self._valid = True # Set false during delete/cleanup
+
+ self._include_multicast = True # TODO: May need to add multicast on a per-ONU basis
+
+ self._sync_tick = 60.0
+ self._expedite_sync = False
+ self._expedite_count = 0
+ self._sync_deferred = None # For sync of ONT config to hardware
# TODO: enable and upstream-channel-speed not yet supported
- self.log = structlog.get_logger(pon_id=self._pon.pon_id, onu_id=self._onu_id)
+ self.log = structlog.get_logger(pon_id=self._pon_id, onu_id=self._onu_id)
self._vendor_id = _VSSN_TO_VENDOR.get(self._serial_number_string.upper()[:4],
'Unsupported_{}'.format(self._serial_number_string))
@@ -81,7 +96,7 @@
pass
def __str__(self):
- return "Onu-{}-{}, PON: {}".format(self._onu_id, self._serial_number_string, self._pon)
+ return "Onu-{}-{}, PON ID: {}".format(self._onu_id, self._serial_number_string, self._pon_id)
@staticmethod
def serial_number_to_string(value):
@@ -97,12 +112,12 @@
return base64.b64encode(bvalue)
@property
- def pon(self):
- return self._pon
+ def olt(self):
+ return self._olt
@property
- def olt(self):
- return self.pon.olt
+ def pon(self):
+ return self.olt.southbound_ports[self._pon_id]
@property
def onu_id(self):
@@ -117,6 +132,15 @@
return self._onu_vid
@property
+ def logical_port(self):
+ """Return the logical PORT number of this ONU's UNI"""
+ return self._uni_ports[0]
+
+ @property
+ def channel_id(self):
+ return self._channel_id
+
+ @property
def serial_number(self):
return self._serial_number_base64
@@ -124,19 +148,60 @@
def vendor_id(self):
return self._vendor_id
+ @property
+ def rssi(self):
+ """The received signal strength indication of the ONU"""
+ return self._rssi
+
+ @rssi.setter
+ def rssi(self, value):
+ if self._rssi != value:
+ self._rssi = value
+ # TODO: Notify anyone?
+
+ @property
+ def equalization_delay(self):
+ """Equalization delay (bits)"""
+ return self._equalization_delay
+
+ @equalization_delay.setter
+ def equalization_delay(self, value):
+ if self._equalization_delay != value:
+ self._equalization_delay = value
+ # TODO: Notify anyone?
+
+ @property
+ def fiber_length(self):
+ """Distance to ONU"""
+ return self._fiber_length
+
+ @fiber_length.setter
+ def fiber_length(self, value):
+ if self._fiber_length != value:
+ self._fiber_length = value
+ # TODO: Notify anyone?
+
+ def _cancel_deferred(self):
+ d, self._sync_deferred = self._sync_deferred, None
+ if d is not None and not d.called:
+ try:
+ d.cancel()
+ except Exception:
+ pass
+
@inlineCallbacks
def create(self, tconts, gem_ports):
"""
POST -> /restconf/data/gpon-olt-hw:olt/pon=<pon-id>/onus/onu ->
"""
self.log.debug('create')
+ self._cancel_deferred()
- pon_id = self.pon.pon_id
data = json.dumps({'onu-id': self._onu_id,
'serial-number': self._serial_number_base64,
'enable': self._enabled})
- uri = AdtranOltHandler.GPON_ONU_CONFIG_LIST_URI.format(pon_id)
- name = 'onu-create-{}-{}-{}: {}'.format(pon_id, self._onu_id,
+ uri = AdtranOltHandler.GPON_ONU_CONFIG_LIST_URI.format(self._pon_id)
+ name = 'onu-create-{}-{}-{}: {}'.format(self._pon_id, self._onu_id,
self._serial_number_base64, self._enabled)
try:
@@ -157,29 +222,241 @@
for _, gem_port in gem_ports.items():
try:
- if gem_port.multicast:
- self.log.warning('multicast-not-yet-supported', gem_port=gem_port) # TODO Support it
- continue
results = yield self.add_gem_port(gem_port)
except Exception as e:
self.log.exception('add-gem_port', gem_port=gem_port, e=e)
+ self._sync_deferred = reactor.callLater(self._sync_tick, self._sync_hardware)
+
returnValue(results)
+ @inlineCallbacks
+ def delete(self):
+ """
+ Clean up ONU (gems/tconts). ONU removal from OLT h/w done by PonPort
+ :return: (deferred)
+ """
+ self._valid = False
+ self._cancel_deferred()
+
+ # Remove from H/W
+
+ gem_ids = self._gem_ports.keys()
+ alloc_ids = self._tconts.keys()
+
+ dl = []
+ for gem_id in gem_ids:
+ dl.append(self.remove_gem_id(gem_id))
+
+ try:
+ yield defer.gatherResults(dl, consumeErrors=True)
+ except Exception:
+ pass
+
+ dl = []
+ for alloc_id in alloc_ids:
+ dl.append(self.remove_tcont(alloc_id))
+
+ try:
+ yield defer.gatherResults(dl, consumeErrors=True)
+ except Exception:
+ pass
+
+ self._gem_ports.clear()
+ self._tconts.clear()
+ self._olt = None
+ self._channel_id = None
+
+ returnValue(succeed('deleted'))
+
def restart(self):
+ if not self._valid:
+ return succeed('Deleting')
tconts, self._tconts = self._tconts, {}
gem_ports, self._gem_ports = self._gem_ports, {}
return self.create(tconts, gem_ports)
+ def _sync_hardware(self):
+ from codec.olt_config import OltConfig
+
+ def read_config(results):
+ self.log.debug('read-config', results=results)
+
+ config = OltConfig.Pon.Onu.decode([results])
+ assert self.onu_id in config, 'sync-onu-not-found-{}'.format(self.onu_id)
+ config = config[self.onu_id]
+ dl = []
+
+ if self._enabled != config.enable:
+ dl.append(self._set_config('enable', self._enabled))
+
+ if self.serial_number != config.serial_number:
+ dl.append(self._set_config('serial-number', self.serial_number))
+
+ # Sync TCONTs if everything else in sync
+
+ if len(dl) == 0:
+ dl.extend(sync_tconts(config.tconts))
+
+ # Sync GEM Ports if everything else in sync
+
+ if len(dl) == 0:
+ dl.extend(sync_gem_ports(config.gem_ports))
+
+ # Run h/w sync again a bit faster if we had to sync anything
+ self._expedite_sync = len(dl) > 0
+
+ # TODO: do checks
+ return defer.gatherResults(dl, consumeErrors=True)
+
+ def sync_tconts(hw_tconts):
+ hw_alloc_ids = frozenset(hw_tconts.iterkeys())
+ my_alloc_ids = frozenset(self._tconts.iterkeys())
+ dl = []
+
+ extra_alloc_ids = hw_alloc_ids - my_alloc_ids
+ dl.extend(sync_delete_extra_tconts(extra_alloc_ids))
+
+ missing_alloc_ids = my_alloc_ids - hw_alloc_ids
+ dl.extend(sync_add_missing_tconts(missing_alloc_ids))
+
+ matching_alloc_ids = my_alloc_ids & hw_alloc_ids
+ matching_hw_tconts = {alloc_id: tcont
+ for alloc_id, tcont in hw_tconts.iteritems()
+ if alloc_id in matching_alloc_ids}
+ dl.extend(sync_matching_tconts(matching_hw_tconts))
+
+ return dl
+
+ def sync_delete_extra_tconts(alloc_ids):
+ return [self.remove_tcont(alloc_id) for alloc_id in alloc_ids]
+
+ def sync_add_missing_tconts(alloc_ids):
+ return [self.add_tcont(self._tconts[alloc_id], add_always=True) for alloc_id in alloc_ids]
+
+ def sync_matching_tconts(hw_tconts):
+ from tcont import TrafficDescriptor
+
+ dl = []
+ # TODO: sync TD & Best Effort. Only other TCONT leaf is the key
+ for alloc_id, hw_tcont in hw_tconts.iteritems():
+ my_tcont = self._tconts[alloc_id]
+ my_td = my_tcont.traffic_descriptor
+ hw_td = hw_tcont.traffic_descriptor
+ if my_td is None:
+ continue
+
+ my_additional = TrafficDescriptor.AdditionalBwEligibility.\
+ to_string(my_td.additional_bandwidth_eligibility)
+
+ reflow = hw_td is None or \
+ my_td.fixed_bandwidth != hw_td.fixed_bandwidth or \
+ my_td.assured_bandwidth != hw_td.assured_bandwidth or \
+ my_td.maximum_bandwidth != hw_td.maximum_bandwidth or \
+ my_additional != hw_td.additional_bandwidth_eligibility
+
+ if not reflow and \
+ my_td.additional_bandwidth_eligibility == \
+ TrafficDescriptor.AdditionalBwEligibility.BEST_EFFORT_SHARING and \
+ my_td.best_effort is not None:
+
+ hw_be = hw_td.best_effort
+ my_be = my_td.best_effort
+
+ reflow = hw_be is None or \
+ my_be.bandwidth != hw_be.bandwidth or \
+ my_be.priority != hw_be.priority or \
+ my_be.weight != hw_be.weight
+
+ if reflow:
+ dl.append(my_tcont.add_to_hardware(self.olt.rest_client,
+ self._pon_id,
+ self._onu_id,
+ operation="PATCH"))
+ return dl
+
+ def sync_gem_ports(hw_gem_ports):
+ hw_gems_ids = frozenset(hw_gem_ports.iterkeys())
+ my_gems_ids = frozenset(self._gem_ports.iterkeys())
+ dl = []
+
+ extra_gems_ids = hw_gems_ids - my_gems_ids
+ dl.extend(sync_delete_extra_gem_ports(extra_gems_ids))
+
+ missing_gem_ids = my_gems_ids - hw_gems_ids
+ dl.extend(sync_add_missing_gem_ports(missing_gem_ids))
+
+ matching_gem_ids = my_gems_ids & hw_gems_ids
+ matching_hw_gem_ports = {gem_id: gem_port
+ for gem_id, gem_port in hw_gem_ports.iteritems()
+ if gem_id in matching_gem_ids}
+ dl.extend(sync_matching_gem_ports(matching_hw_gem_ports))
+
+ return dl
+
+ def sync_delete_extra_gem_ports(gem_ids):
+ return [self.remove_gem_id(gem_id) for gem_id in gem_ids]
+
+ def sync_add_missing_gem_ports(gem_ids):
+ return [self.add_gem_port(self._gem_ports[gem_id], add_always=True) for gem_id in gem_ids]
+
+ def sync_matching_gem_ports(hw_gem_ports):
+ dl = []
+ for gem_id, hw_gem_port in hw_gem_ports.iteritems():
+ gem_port = self._gem_ports[gem_id]
+
+ if gem_port.alloc_id != hw_gem_port.alloc_id or\
+ gem_port.encryption != hw_gem_port.encryption or\
+ gem_port.omci_transport != hw_gem_port.omci_transport:
+ dl.append(gem_port.add_to_hardware(self.olt.rest_client,
+ self.pon.pon_id,
+ self.onu_id,
+ operation='PATCH'))
+ return dl
+
+ def failure(reason):
+ # self.log.error('hardware-sync-get-config-failed', reason=reason)
+ pass
+
+ def reschedule(_):
+ import random
+ delay = self._sync_tick
+
+ # Speed up sequential resync a limited number of times if out of sync
+ # With 60 second initial an typical worst case resync of 4 times, this
+ # should resync an ONU and all it's gem-ports and tconts within <90 seconds
+
+ if self._expedite_sync:
+ self._expedite_count += 1
+ if self._expedite_count < 5:
+ delay = 5
+ else:
+ self._expedite_count = 0
+
+ delay += random.uniform(-delay / 10, delay / 10)
+ self._sync_deferred = reactor.callLater(delay, self._sync_hardware)
+ self._expedite_sync = False
+
+ pon_enabled = self.pon.enabled
+ if not pon_enabled:
+ return reschedule('not-enabled')
+
+ self._sync_deferred = self._get_config()
+ self._sync_deferred.addCallbacks(read_config, failure)
+ self._sync_deferred.addBoth(reschedule)
+
+ def _get_config(self):
+ uri = AdtranOltHandler.GPON_ONU_CONFIG_URI.format(self._pon_id, self.onu_id)
+ name = 'pon-get-onu_config-{}-{}'.format(self._pon_id, self.onu_id)
+ return self.olt.rest_client.request('GET', uri, name=name)
+
def set_config(self, leaf, value):
self.log.debug('set-config', leaf=leaf, value=value)
- pon_id = self.pon.pon_id
- data = json.dumps({'onu-id': self._onu_id,
- leaf: value})
- uri = AdtranOltHandler.GPON_ONU_CONFIG_LIST_URI.format(pon_id)
- name = 'onu-set-config-{}-{}-{}: {}'.format(pon_id, self._onu_id, leaf, value)
+ data = json.dumps({'onu-id': self._onu_id, leaf: value})
+ uri = AdtranOltHandler.GPON_ONU_CONFIG_LIST_URI.format(self._pon_id)
+ name = 'onu-set-config-{}-{}-{}: {}'.format(self._pon_id, self._onu_id, leaf, value)
return self.olt.rest_client.request('PATCH', uri, data=data, name=name)
@property
@@ -190,91 +467,87 @@
return frozenset(self._tconts.keys())
@inlineCallbacks
- def add_tcont(self, tcont):
+ def add_tcont(self, tcont, add_always=False):
"""
Creates/ a T-CONT with the given alloc-id
:param tcont: (TCont) Object that maintains the TCONT properties
+ :param add_always: (boolean) If true, force add (used during h/w resync)
+ :return: (deferred)
"""
- from tcont import TrafficDescriptor
+ if not self._valid:
+ returnValue(succeed('Deleting'))
- if tcont.alloc_id in self._tconts:
+ if not add_always and tcont.alloc_id in self._tconts:
returnValue(succeed('already created'))
- pon_id = self.pon.pon_id
- uri = AdtranOltHandler.GPON_TCONT_CONFIG_LIST_URI.format(pon_id, self.onu_id)
- data = json.dumps({'alloc-id': tcont.alloc_id})
- name = 'tcont-create-{}-{}: {}'.format(pon_id, self._onu_id, tcont.alloc_id)
-
try:
- results = yield self.olt.rest_client.request('POST', uri, data=data, name=name)
+ results = yield tcont.add_to_hardware(self.olt.rest_client,
+ self._pon_id, self._onu_id)
self._tconts[tcont.alloc_id] = tcont
except Exception as e:
self.log.exception('tcont', tcont=tcont, e=e)
raise
- # TODO May want to pull this out and have it accessible elsewhere once xpon work supports TDs
-
- if tcont.traffic_descriptor is not None:
- uri = AdtranOltHandler.GPON_TCONT_CONFIG_URI.format(pon_id, self.onu_id, tcont.alloc_id)
- data = json.dumps({'traffic-descriptor': tcont.traffic_descriptor.to_dict()})
- name = 'tcont-td-{}-{}: {}'.format(pon_id, self._onu_id, tcont.alloc_id)
- try:
- results = yield self.olt.rest_client.request('PATCH', uri, data=data, name=name)
-
- except Exception as e:
- self.log.exception('traffic-descriptor', td=tcont.traffic_descriptor, e=e)
-
- if tcont.traffic_descriptor.additional_bandwidth_eligibility == \
- TrafficDescriptor.AdditionalBwEligibility.BEST_EFFORT_SHARING:
- if tcont.best_effort is None:
- raise ValueError('TCONT {} is best-effort but does not define best effort sharing'.
- format(tcont.name))
-
- data = json.dumps({'best-effort': tcont.best_effort.to_dict()})
- name = 'tcont-best-effort-{}-{}: {}'.format(pon_id, self._onu_id, tcont.alloc_id)
- try:
- results = yield self.olt.rest_client.request('PATCH', uri, data=data, name=name)
-
- except Exception as e:
- self.log.exception('best-effort', best_effort=tcont.best_effort, e=e)
- raise
-
returnValue(results)
+ @inlineCallbacks
def remove_tcont(self, alloc_id):
- if alloc_id in self._tconts:
- del self._tconts[alloc_id]
+ # TODO: If alloc-id in use by a gemport, should we deny request?
+ tcont = self._tconts.get(alloc_id)
- # Always remove from OLT hardware
- pon_id = self.pon.pon_id
- uri = AdtranOltHandler.GPON_TCONT_CONFIG_URI.format(pon_id, self.onu_id, alloc_id)
- name = 'tcont-delete-{}-{}: {}'.format(pon_id, self._onu_id, alloc_id)
- return self.olt.rest_client.request('DELETE', uri, name=name)
+ if tcont is None:
+ returnValue(succeed('nop'))
- #@property
+ del self._tconts[alloc_id]
+
+ try:
+ results = yield tcont.remove_from_hardware()
+
+ except Exception as e:
+ self.log.exception('delete', e=e)
+ raise
+
+ returnValue(succeed(results))
+
def gem_ids(self, exception_gems):
"""Get all GEM Port IDs used by this ONU"""
- return frozenset([gem_id for gem_id, gem in self._gem_ports.items()
- if gem.exception == exception_gems])
- # return frozenset(self._gem_ports.keys())
+ if exception_gems:
+ gem_ids = sorted([gem_id for gem_id, gem in self._gem_ports.items()
+ if gem.exception and not gem.multicast]) # FIXED_ONU
+ return gem_ids
+ else:
+ return sorted([gem_id for gem_id, gem in self._gem_ports.items()
+ if not gem.multicast and not gem.exception]) # FIXED_ONU
@inlineCallbacks
- def add_gem_port(self, gem_port):
- if gem_port.gem_id in self._gem_ports:
+ def add_gem_port(self, gem_port, add_always=False):
+ """
+ Add a GEM Port to this ONU
+
+ :param gem_port: (GemPort) GEM Port to add
+ :param add_always: (boolean) If true, force add (used during h/w resync)
+ :return: (deferred)
+ """
+ if not self._valid:
+ returnValue(succeed('Deleting'))
+
+ if not add_always and gem_port.gem_id in self._gem_ports:
returnValue(succeed('already created'))
- pon_id = self.pon.pon_id
- uri = AdtranOltHandler.GPON_GEM_CONFIG_LIST_URI.format(pon_id, self.onu_id)
- data = json.dumps(gem_port.to_dict())
- name = 'gem-port-create-{}-{}: {}/{}'.format(pon_id, self._onu_id,
- gem_port.gem_id,
- gem_port.alloc_id)
try:
- results = yield self.olt.rest_client.request('POST', uri, data=data, name=name)
+ results = yield gem_port.add_to_hardware(self.olt.rest_client,
+ self._pon_id,
+ self.onu_id)
self._gem_ports[gem_port.gem_id] = gem_port
- # TODO: May need to update flow tables/evc-maps
+
+ # May need to update flow tables/evc-maps
+ if gem_port.alloc_id in self._tconts:
+ # GEM-IDs are a sorted list (ascending). First gemport handles downstream traffic
+ from flow.flow_entry import FlowEntry
+ evc_maps = FlowEntry.find_evc_map_flows(self._device_id, self._pon_id, self._onu_id)
+ pass
except Exception as e:
self.log.exception('gem-port', e=e)
@@ -282,16 +555,29 @@
returnValue(results)
+ @inlineCallbacks
def remove_gem_id(self, gem_id):
- if gem_id in self._gem_ports:
- del self._gem_ports[gem_id]
- # TODO: May need to update flow tables/evc-maps
+ gem_port = self._gem_ports.get(gem_id)
- # Always remove from OLT hardware
- pon_id = self.pon.pon_id
- uri = AdtranOltHandler.GPON_GEM_CONFIG_URI.format(pon_id, self.onu_id, gem_id)
- name = 'gem-port-delete-{}-{}: {}'.format(pon_id, self._onu_id, gem_id)
- return self.olt.rest_client.request('DELETE', uri, name=name)
+ if gem_port is None:
+ returnValue(succeed('nop'))
+
+ del self._gem_ports[gem_id]
+
+ try:
+ if gem_port.alloc_id in self._tconts:
+ # May need to update flow tables/evc-maps
+ # GEM-IDs are a sorted list (ascending). First gemport handles downstream traffic
+ pass
+
+ results = yield gem_port.remove_from_hardware(self.olt.rest_client,
+ self._pon_id,
+ self.onu_id)
+ except Exception as e:
+ self.log.exception('delete', e=e)
+ raise
+
+ returnValue(succeed(results))
@staticmethod
def gem_id_to_gvid(gem_id):
diff --git a/voltha/adapters/adtran_olt/pon_port.py b/voltha/adapters/adtran_olt/pon_port.py
index 1ab4a7b..aae1e20 100644
--- a/voltha/adapters/adtran_olt/pon_port.py
+++ b/voltha/adapters/adtran_olt/pon_port.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2017-present Adtran, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,8 +13,8 @@
# limitations under the License.
import json
-import pprint
import random
+import arrow
import structlog
from enum import Enum
@@ -27,6 +27,7 @@
from voltha.protos.common_pb2 import OperStatus, AdminState
from voltha.protos.device_pb2 import Device
from voltha.protos.device_pb2 import Port
+from voltha.protos.events_pb2 import AlarmEventType, AlarmEventSeverity, AlarmEventState, AlarmEventCategory
class PonPort(object):
@@ -38,6 +39,10 @@
"""
MAX_ONUS_SUPPORTED = 256
DEFAULT_ENABLED = False
+ MAX_DEPLOYMENT_RANGE = 40000 # Meters
+
+ _MCAST_ONU_ID = 253
+ _MCAST_ALLOC_BASE = 0x500
class State(Enum):
INITIAL = 0 # Created and initialization in progress
@@ -48,10 +53,8 @@
_SUPPORTED_ACTIVATION_METHODS = ['autodiscovery', 'autoactivate']
_SUPPORTED_AUTHENTICATION_METHODS = ['serial-number']
- def __init__(self, pon_index, port_no, parent, admin_state=AdminState.UNKNOWN, label=None):
+ def __init__(self, pon_index, port_no, parent, label=None):
# TODO: Weed out those properties supported by common 'Port' object (future)
- assert admin_state != AdminState.UNKNOWN
-
self.log = structlog.get_logger(device_id=parent.device_id, pon_id=pon_index)
self._parent = parent
@@ -63,25 +66,37 @@
self._no_onu_discover_tick = 5.0 # TODO: Decrease to 1 or 2 later
self._discovery_tick = 20.0
self._discovered_onus = [] # List of serial numbers
+ self._sync_tick = 20.0
+ self._in_sync = False
self._onus = {} # serial_number-base64 -> ONU (allowed list)
self._onu_by_id = {} # onu-id -> ONU
- self._next_onu_id = Onu.MIN_ONU_ID
- self._pon_evc_map = {} # evc-map name -> EVC Map
+ self._next_onu_id = Onu.MIN_ONU_ID + 128
+ self._mcast_gem_ports = {} # VLAN -> GemPort
self._admin_state = AdminState.DISABLED
self._oper_status = OperStatus.DISCOVERED
+ self._state = PonPort.State.INITIAL
self._deferred = None # General purpose
self._discovery_deferred = None # Specifically for ONU discovery
- self._state = PonPort.State.INITIAL
+ self._sync_deferred = None # For sync of PON config to hardware
- # Local cache of PON configuration
+ self._active_los_alarms = set() # ONU-ID
+
+ # xPON configuration
self._xpon_name = None
- self._enabled = None
- self._downstream_fec_enable = None
- self._upstream_fec_enable = None
+ self._enabled = False
+ self._downstream_fec_enable = False
+ self._upstream_fec_enable = False
+ self._deployment_range = 25000
self._authentication_method = 'serial-number'
- self._activation_method = 'autoactivate' if self.olt.autoactivate else 'autodiscovery'
+
+ if self.olt.autoactivate:
+ # Enable PON on startup
+ self._activation_method = 'autoactivate'
+ self._admin_state = AdminState.ENABLED
+ else:
+ self._activation_method = 'autodiscovery'
def __del__(self):
self.stop()
@@ -154,6 +169,60 @@
return self.olt.adapter_agent
@property
+ def enabled(self):
+ return self._enabled
+
+ @enabled.setter
+ def enabled(self, value):
+ assert isinstance(value, bool), 'enabled is a boolean'
+ if self._enabled != value:
+ if value:
+ self.start()
+ self.stop()
+
+ @property
+ def downstream_fec_enable(self):
+ return self._downstream_fec_enable
+
+ @downstream_fec_enable.setter
+ def downstream_fec_enable(self, value):
+ assert isinstance(value, bool), 'downstream FEC enabled is a boolean'
+
+ if self._downstream_fec_enable != value:
+ self._downstream_fec_enable = value
+ if self._state == PonPort.State.RUNNING:
+ self._deferred = self._set_pon_config("downstream-fec-enable", value)
+
+ @property
+ def upstream_fec_enable(self):
+ return self._upstream_fec_enable
+
+ @upstream_fec_enable.setter
+ def upstream_fec_enable(self, value):
+ assert isinstance(value, bool), 'upstream FEC enabled is a boolean'
+
+ if self._upstream_fec_enable != value:
+ self._upstream_fec_enable = value
+ if self._state == PonPort.State.RUNNING:
+ self._deferred = self._set_pon_config("upstream-fec-enable", value)
+
+ @property
+ def deployment_range(self):
+ """Maximum deployment range (in meters)"""
+ return self._deployment_range
+
+ @deployment_range.setter
+ def deployment_range(self, value):
+ """Maximum deployment range (in meters)"""
+ if not 0 <= value <= PonPort.MAX_DEPLOYMENT_RANGE:
+ raise ValueError('Deployment range should be 0..{} meters'.
+ format(PonPort.MAX_DEPLOYMENT_RANGE))
+ if self._deployment_range != value:
+ self._deployment_range = value
+ if self._state == PonPort.State.RUNNING:
+ self._deferred = self._set_pon_config("deployment-range", value)
+
+ @property
def discovery_tick(self):
return self._discovery_tick * 10
@@ -165,9 +234,13 @@
if self.discovery_tick != value:
self._discovery_tick = value / 10
- if self._discovery_deferred is not None and not self._discovery_deferred.called:
- self._discovery_deferred.cancel()
- self._discovery_deferred = None
+ try:
+ if self._discovery_deferred is not None and \
+ not self._discovery_deferred.called:
+ self._discovery_deferred.cancel()
+ except:
+ pass
+ self._discovery_deferred = None
if self._discovery_tick > 0:
self._discovery_deferred = reactor.callLater(self._discovery_tick,
@@ -207,16 +280,12 @@
def _cancel_deferred(self):
d1, self._deferred = self._deferred, None
d2, self._discovery_deferred = self._discovery_deferred, None
-
- if d1 is not None and not d1.called:
- try:
- d1.cancel()
- except Exception as e:
- pass
+ d3, self._sync_deferred = self._sync_deferred, None
- if d2 is not None and not d2.called:
+ for d in [d1, d2, d3]:
try:
- d2.cancel()
+ if d is not None and not d.called:
+ d.cancel()
except Exception as e:
pass
@@ -238,12 +307,13 @@
self._cancel_deferred()
self._state = PonPort.State.INITIAL
self._oper_status = OperStatus.ACTIVATING
+ self._enabled = True
# Do the rest of the startup in an async method
self._deferred = reactor.callLater(0.5, self._finish_startup)
self._update_adapter_agent()
- return self._deferred
+ return succeed('Scheduled')
@inlineCallbacks
def _finish_startup(self):
@@ -255,107 +325,108 @@
self.log.debug('final-startup')
- if self._enabled is None or self._downstream_fec_enable is None or self._upstream_fec_enable is None:
+ try:
+ self._deferred = self._get_pon_config()
+ results = yield self._deferred
+
+ except Exception as e:
+ self.log.exception('initial-GET', e=e)
+ self._deferred = reactor.callLater(5, self._finish_startup)
+ returnValue(self._deferred)
+
+ # Load config from hardware
+
+ enabled = results.get('enabled', False)
+ downstream_fec_enable = results.get('downstream-fec-enable', False)
+ upstream_fec_enable = results.get('upstream-fec-enable', False)
+ deployment_range = results.get('deployment-range', 25000)
+ self._in_sync = True
+
+ if enabled != self._enabled:
try:
- self._deferred = self.get_pon_config()
- results = yield self._deferred
-
- except Exception as e:
- self.log.exception('initial-GET', e=e)
- self._deferred = reactor.callLater(5, self._finish_startup)
- returnValue(self._deferred)
-
- # Load cache
-
- self._enabled = results.get('enabled', False)
- self._downstream_fec_enable = results.get('downstream-fec-enable', False)
- self._upstream_fec_enable = results.get('upstream-fec-enable', False)
-
- if not self._enabled:
- try:
- self._deferred = self.set_pon_config("enabled", True)
- results = yield self._deferred
- self._enabled = True
+ self._deferred = self._set_pon_config("enabled", True)
+ yield self._deferred
except Exception as e:
self.log.exception('final-startup-enable', e=e)
self._deferred = reactor.callLater(3, self._finish_startup)
returnValue(self._deferred)
- if not self._downstream_fec_enable:
+ if downstream_fec_enable != self._downstream_fec_enable:
try:
- self._deferred = self.set_pon_config("downstream-fec-enable", True)
- results = yield self._deferred
- self._downstream_fec_enable = True
+ self._deferred = self._set_pon_config("downstream-fec-enable",
+ self._downstream_fec_enable)
+ yield self._deferred
except Exception as e:
- self.log.exception('final-startup-downstream-FEC', e=e)
- self._deferred = reactor.callLater(5, self._finish_startup)
- returnValue(self._deferred)
+ self.log.warning('final-startup-downstream-FEC', e=e)
+ self._in_sync = False
+ # Non-fatal. May have failed due to no SFQ in slot
- if not self._upstream_fec_enable:
+ if upstream_fec_enable != self._upstream_fec_enable:
try:
- self._deferred = self.set_pon_config("upstream-fec-enable", True)
- results = yield self._deferred
- self._upstream_fec_enable = True
+ self._deferred = self._set_pon_config("upstream-fec-enable",
+ self._upstream_fec_enable)
+ yield self._deferred
except Exception as e:
- self.log.exception('final-startup-upstream-FEC', e=e)
- self._deferred = reactor.callLater(5, self._finish_startup)
- returnValue(self._deferred)
+ self.log.warning('final-startup-upstream-FEC', e=e)
+ self._in_sync = False
+ # Non-fatal. May have failed due to no SFQ in slot
- self.log.debug('startup-complete', results=pprint.PrettyPrinter().pformat(results))
+ if deployment_range != self._deployment_range:
+ try:
+ self._deferred = self._set_pon_config("deployment-range",
+ self._deployment_range)
+ yield self._deferred
- if self._enabled:
- self._admin_state = AdminState.ENABLED
- self._oper_status = OperStatus.ACTIVE # TODO: is this correct, how do we tell GRPC
- self._state = PonPort.State.RUNNING
+ except Exception as e:
+ self.log.warning('final-startup-deployment-range', e=e)
+ self._in_sync = False
+ # Non-fatal. May have failed due to no SFQ in slot
- # Restart any ONU's in case here due to reboot
+ # If here, initial settings were successfully written to hardware
- if len(self._onus) > 0:
- dl = []
- for onu in self._onus.itervalues():
- dl.append(onu.restart())
- yield defer.gatherResults(dl)
+ self._admin_state = AdminState.ENABLED
+ self._oper_status = OperStatus.ACTIVE # TODO: is this correct, how do we tell GRPC
+ self._state = PonPort.State.RUNNING
- # Begin to ONU discovery
+ # Restart any ONU's in case here due to reboot
- self._discovery_deferred = reactor.callLater(5, self._discover_onus)
+ if len(self._onus) > 0:
+ dl = []
+ for onu in self._onus.itervalues():
+ dl.append(onu.restart())
+ yield defer.gatherResults(dl, consumeErrors=True)
- self._update_adapter_agent()
- returnValue('Enabled')
+ # Begin to ONU discovery and hardware sync
- else:
- # Startup failed. Could be due to object creation with an invalid initial admin_status
- # state. May want to schedule a start to occur again if this happens
- self._admin_state = AdminState.DISABLED
- self._oper_status = OperStatus.FAILED
- self._state = PonPort.State.STOPPED
+ self._discovery_deferred = reactor.callLater(5, self._discover_onus)
+ self._sync_deferred = reactor.callLater(60, self._sync_hardware)
- self._update_adapter_agent()
- returnValue('Disabled')
+ self._update_adapter_agent()
+ returnValue('Enabled')
+ @inlineCallbacks
def stop(self):
if self._state == PonPort.State.STOPPED:
- return succeed('Stopped')
+ self.log.debug('already stopped')
+ returnValue(succeed('Stopped'))
self.log.info('stopping')
self._cancel_deferred()
- self._deferred = self.set_pon_config("enabled", False)
-
- # Flush config cache
- self._enabled = None
- self._downstream_fec_enable = None
- self._upstream_fec_enable = None
+ self._enabled = False
+ results = yield self._set_pon_config("enabled", False)
+ self._sync_deferred = reactor.callLater(self._sync_tick, self._sync_hardware)
self._admin_state = AdminState.DISABLED
self._oper_status = OperStatus.UNKNOWN
self._update_adapter_agent()
self._state = PonPort.State.STOPPED
- return self._deferred
+ self.log.debug('stopped')
+ returnValue(results)
@inlineCallbacks
def reset(self):
@@ -367,52 +438,49 @@
self.log.error('reset-ignored', state=self._state)
returnValue('Ignored')
- self.log.info('reset')
+ initial_port_state = AdminState.ENABLED if self.olt.autoactivate else AdminState.DISABLED
+ self.log.info('reset', initial_state=initial_port_state)
try:
- self._deferred = self.get_pon_config()
+ self._deferred = self._get_pon_config()
results = yield self._deferred
-
- # Load cache
- self._enabled = results.get('enabled', False)
+ enabled = results.get('enabled', False)
except Exception as e:
- self._enabled = None
- self.log.exception('GET-failed', e=e)
+ self.log.exception('get-config', e=e)
+ enabled = False
- initial_port_state = AdminState.ENABLED if self.olt.autoactivate else AdminState.DISABLED
+ enable = initial_port_state == AdminState.ENABLED
- if self._admin_state != initial_port_state:
+ if enable != enabled:
try:
- enable = initial_port_state == AdminState.ENABLED
- if self._enabled is None or self._enabled != enable:
- yield self.set_pon_config("enabled", enable)
-
- # TODO: Move to 'set_pon_config' method and also make sure GRPC/Port is ok
- self._admin_state = AdminState.ENABLED if enable else AdminState.DISABLED
-
+ self._deferred = yield self._set_pon_config("enabled", enable)
except Exception as e:
- self.log.exception('reset', e=e)
- raise
+ self.log.exception('reset-enabled', e=e, enabled=enabled)
- # Walk the provisioned ONU list and disable any exiting ONUs
+ # TODO: Move to 'set_pon_config' method and also make sure GRPC/Port is ok
+ self._admin_state = AdminState.ENABLED if enable else AdminState.DISABLED
try:
- results = yield self.get_onu_config()
+ # Walk the provisioned ONU list and disable any exiting ONUs
+ results = yield self._get_onu_config()
if isinstance(results, list) and len(results) > 0:
onu_configs = OltConfig.Pon.Onu.decode(results)
+ dl = []
for onu_id in onu_configs.iterkeys():
- try:
- yield self.delete_onu(onu_id)
+ dl.append(self.delete_onu(onu_id))
- except Exception as e:
- self.log.exception('rest-ONU-delete', onu_id=onu_id, e=e)
- pass # Non-fatal
+ try:
+ if len(dl) > 0:
+ yield defer.gatherResults(dl, consumeErrors=True)
+
+ except Exception as e:
+ self.log.exception('rest-ONU-delete', onu_id=onu_id, e=e)
+ pass # Non-fatal
except Exception as e:
self.log.exception('onu-delete', e=e)
- raise
returnValue('Reset complete')
@@ -420,9 +488,6 @@
if self._state == PonPort.State.RUNNING or self._state == PonPort.State.STOPPED:
start_it = (self._state == PonPort.State.RUNNING)
self._state = PonPort.State.INITIAL
- self._enabled = None
- self._downstream_fec_enable = None
- self._upstream_fec_enable = None
return self.start() if start_it else self.stop()
return succeed('nop')
@@ -436,28 +501,39 @@
self._state = PonPort.State.DELETING
self._cancel_deferred()
- # @property
- def gem_ids(self, onu_vid, exception_gems):
+ def gem_ids(self, vid, exception_gems, multicast_gems):
"""
Get all GEM Port IDs used on a given PON
- :param onu_vid: (int) ONU VLAN ID if customer ONU specific. None if for all ONUs
- on PON
+ :param vid: (int) VLAN ID if customer ONU specific. None if for all ONUs
+ on PON, if Multicast, VID for Multicast, or None for all\
+ Multicast GEMPorts
:param exception_gems: (boolean) Select from special purpose ACL GEM-Portas
- :return: (dict) key -> onu-id, value -> tuple(frozenset of GEM Port IDs, onu_vid)
+ :param multicast_gems: (boolean) Select from available Multicast GEM Ports
+ :return: (dict) data_gem -> key -> onu-id, value -> tuple(sorted list of GEM Port IDs, onu_vid)
+ mcast_gem-> key -> mcast-vid, value -> GEM Port IDs
"""
gem_ids = {}
- for onu_id, onu in self._onu_by_id.iteritems():
- if onu_vid is None or onu_vid == onu.onu_vid:
- gem_ids[onu_id] = (onu.gem_ids(exception_gems), onu.onu_vid)
+
+ if multicast_gems:
+ # Multicast GEMs belong to the PON, but we may need to register them on
+ # all ONUs. Rework when BBF MCAST Gems are supported
+ for vlan, gem_port in self._mcast_gem_ports.iteritems():
+ if vid is None or (vid == vlan and vid in self.olt.multicast_vlans):
+ gem_ids[vlan] = ([gem_port.gem_id], None)
+ else:
+ for onu_id, onu in self._onu_by_id.iteritems():
+ if vid is None or vid == onu.onu_vid:
+ gem_ids[onu_id] = (onu.gem_ids(exception_gems), onu.onu_vid) # FIXED_ONU
+
return gem_ids
- def get_pon_config(self):
+ def _get_pon_config(self):
uri = AdtranOltHandler.GPON_PON_CONFIG_URI.format(self._pon_id)
name = 'pon-get-config-{}'.format(self._pon_id)
return self._parent.rest_client.request('GET', uri, name=name)
- def get_onu_config(self, onu_id=None):
+ def _get_onu_config(self, onu_id=None):
if onu_id is None:
uri = AdtranOltHandler.GPON_ONU_CONFIG_LIST_URI.format(self._pon_id)
else:
@@ -466,35 +542,104 @@
name = 'pon-get-onu_config-{}-{}'.format(self._pon_id, onu_id)
return self._parent.rest_client.request('GET', uri, name=name)
- def set_pon_config(self, leaf, value):
+ def _set_pon_config(self, leaf, value):
data = json.dumps({leaf: value})
uri = AdtranOltHandler.GPON_PON_CONFIG_URI.format(self._pon_id)
name = 'pon-set-config-{}-{}-{}'.format(self._pon_id, leaf, str(value))
return self._parent.rest_client.request('PATCH', uri, data=data, name=name)
def _discover_onus(self):
- self.log.debug('discovery')
-
+ self.log.debug('discovery', state=self._admin_state, in_sync=self._in_sync)
if self._admin_state == AdminState.ENABLED:
- data = json.dumps({'pon-id': self._pon_id})
- uri = AdtranOltHandler.GPON_PON_DISCOVER_ONU
- name = 'pon-discover-onu-{}'.format(self._pon_id)
+ if self._in_sync:
+ data = json.dumps({'pon-id': self._pon_id})
+ uri = AdtranOltHandler.GPON_PON_DISCOVER_ONU
+ name = 'pon-discover-onu-{}'.format(self._pon_id)
- self._discovery_deferred = self._parent.rest_client.request('POST', uri, data, name=name)
- self._discovery_deferred.addBoth(self._onu_discovery_init_complete)
+ self._discovery_deferred = self._parent.rest_client.request('POST', uri, data, name=name)
+ self._discovery_deferred.addBoth(self._onu_discovery_init_complete)
+ else:
+ self.discovery_deferred = reactor.callLater(0,
+ self._onu_discovery_init_complete,
+ None)
def _onu_discovery_init_complete(self, _):
"""
This method is called after the REST POST to request ONU discovery is
completed. The results (body) of the post is always empty / 204 NO CONTENT
"""
- # Reschedule
-
delay = self._no_onu_discover_tick if len(self._onus) == 0 else self._discovery_tick
delay += random.uniform(-delay / 10, delay / 10)
-
self._discovery_deferred = reactor.callLater(delay, self._discover_onus)
+ def _sync_hardware(self):
+ if self._state == PonPort.State.RUNNING or self._state == PonPort.State.STOPPED:
+ def read_config(results):
+ self.log.debug('read-config', results=results)
+ config = OltConfig.Pon.decode([results])
+ assert self.pon_id in config, 'sync-pon-not-found-{}'.format(self.pon_id)
+ config = config[self.pon_id]
+ self._in_sync = True
+
+ dl = [defer.succeed(config)] # Forward get_config on to ont SYNC
+
+ if self.enabled != config.enabled:
+ self._in_sync = False
+ dl.append(self._set_pon_config("enabled", self.enabled))
+
+ if self._state == PonPort.State.RUNNING:
+ if self.downstream_fec_enable != config.downstream_fec_enable:
+ self._in_sync = False
+ dl.append(self._set_pon_config("downstream-fec-enable",
+ self.downstream_fec_enable))
+
+ if self.upstream_fec_enable != config.upstream_fec_enable:
+ self._in_sync = False
+ dl.append(self._set_pon_config("upstream-fec-enable",
+ self.upstream_fec_enable))
+
+ if self.deployment_range != config.deployment_range:
+ self._in_sync = False
+ dl.append(self._set_pon_config("deployment-range",
+ self.deployment_range))
+ return defer.gatherResults(dl)
+
+ def sync_onus(results):
+ if self._state == PonPort.State.RUNNING:
+ self.log.debug('sync-pon-results', results=results)
+ assert isinstance(results, list), 'expected-list'
+ assert isinstance(results[0], OltConfig.Pon), 'expected-pon-at-front'
+ hw_onus = results[0].onus
+
+ # ONU's have their own sync task, extra (should be deleted) are
+ # handled here. Missing are handled by normal discovery mechanisms.
+
+ hw_onu_ids = frozenset([onu.onu_id for onu in hw_onus])
+ my_onu_ids = frozenset(self._onu_by_id.keys())
+
+ extra_onus = hw_onu_ids - my_onu_ids
+ dl = [self.delete_onu(onu_id) for onu_id in extra_onus]
+
+ missing_onus = my_onu_ids - hw_onu_ids
+ # TODO: Need to remove from this PONs dicts so discovery and 'Add' work
+ # properly. May be able to just call add_onu?
+
+ return defer.gatherResults(dl, consumeErrors=True)
+
+ def failure(reason, what):
+ self.log.error('hardware-sync-{}-failed'.format(what), reason=reason)
+ self._in_sync = False
+
+ def reschedule(_):
+ delay = self._sync_tick
+ delay += random.uniform(-delay / 10, delay / 10)
+ self._sync_deferred = reactor.callLater(delay, self._sync_hardware)
+
+ self._sync_deferred = self._get_pon_config()
+ self._sync_deferred.addCallbacks(read_config, failure, errbackArgs=['get-config'])
+ self._sync_deferred.addCallbacks(sync_onus, failure, errbackArgs=['pon-sync'])
+ self._sync_deferred.addBoth(reschedule)
+
def process_status_poll(self, status):
"""
Process PON status poll request
@@ -524,14 +669,63 @@
# if len(missing):
# self.log.info('missing-ONUs', missing=missing)
+ # Process discovered ONU list
+
for serial_number in new:
reactor.callLater(0, self.add_onu, serial_number, status)
- # Process discovered ONU list
+ # Process LOS list
+ self._process_los_alarms(frozenset(status.ont_los))
- # TODO: Process LOS list
- # TODO: Process status
- pass
+ # Process ONU info. Note that newly added ONUs will not be processed
+ # until the next pass
+
+ self._update_onu_status(status.onus)
+
+ def _update_onu_status(self, onus):
+ """
+ Process ONU status for this PON
+ :param onus: (dict) onu_id: ONU State
+ """
+ for onu_id, onu_status in onus.iteritems():
+ if onu_id in self._onu_by_id:
+ self._onu_by_id[onu_id].rssi = onu_status.rssi
+ self._onu_by_id[onu_id].equalization_delay = onu_status.equalization_delay
+ self._onu_by_id[onu_id].fiber_length = onu_status.fiber_length
+
+ def _process_los_alarms(self, ont_los):
+ """
+ Walk current LOS and set/clear LOS as appropriate
+ :param ont_los: (frozenset) ONU IDs of ONUs in LOS alarm state
+ """
+ cleared_alarms = self._active_los_alarms - ont_los
+ new_alarms = ont_los - self._active_los_alarms
+
+ def los_alarm(status, _id):
+ alarm = 'LOS'
+ alarm_data = {
+ 'ts': arrow.utcnow().timestamp,
+ 'description': self.olt.alarms.format_description('onu LOS', alarm, status),
+ 'id': self.olt.alarms.format_id(alarm),
+ 'type': AlarmEventType.COMMUNICATION,
+ 'category': AlarmEventCategory.ONT,
+ 'severity': AlarmEventSeverity.MAJOR,
+ 'state': AlarmEventState.RAISED if status else AlarmEventState.CLEARED
+ }
+ context_data = {'onu_id': _id}
+ self.olt.alarms.send_alarm(context_data, alarm_data)
+
+ if len(cleared_alarms) > 0 or len(new_alarms) > 0:
+ self.log.info('onu-los', cleared=cleared_alarms, new=new_alarms)
+
+ for onu_id in cleared_alarms:
+ # TODO: test 'clear' of LOS alarm when you delete an ONU in LOS
+ self._active_los_alarms.remove(onu_id)
+ los_alarm(False, onu_id)
+
+ for onu_id in new_alarms:
+ self._active_los_alarms.add(onu_id)
+ los_alarm(True, onu_id)
def _process_status_onu_list(self, onus):
"""
@@ -571,7 +765,7 @@
new_onus = discovered_onus - my_onus
# TODO: Remove later if not needed -> missing_onus = my_onus - discovered_onus
- return new_onus, None # , missing_onus
+ return new_onus, None # , missing_onus
def _get_onu_info(self, serial_number):
"""
@@ -580,10 +774,12 @@
:return: (dict) onu config data or None on lookup failure
"""
try:
- from flow.demo_data import get_tconts, get_gem_ports
+ from flow.demo_data import get_tconts, get_gem_ports, get_onu_id
if self.activation_method == "autoactivate":
- onu_id = self.get_next_onu_id()
+ onu_id = get_onu_id(serial_number)
+ if onu_id is None:
+ onu_id = self.get_next_onu_id()
enabled = True
channel_speed = 0
tconts = get_tconts(serial_number, onu_id)
@@ -594,6 +790,7 @@
gpon_info = self.olt.get_xpon_info(self.pon_id)
try:
+ # TODO: Change iteration to itervalues below
vont_info = next(info for _, info in gpon_info['v-ont-anis'].items()
if info.get('expected-serial-number') == serial_number)
@@ -616,6 +813,7 @@
return None
onu_info = {
+ 'device-id': self.olt.device_id,
'serial-number': serial_number,
'xpon-name': None,
'pon': self,
@@ -625,7 +823,8 @@
'password': Onu.DEFAULT_PASSWORD,
't-conts': tconts,
'gem-ports': gem_ports,
- 'onu-vid': self.olt.get_channel_id(self._pon_id, onu_id)
+ 'onu-vid': self.olt.get_channel_id(self._pon_id, onu_id),
+ 'channel-id': self.olt.get_channel_id(self._pon_id, onu_id)
}
# Hold off ONU activation until at least one GEM Port is defined.
@@ -637,7 +836,7 @@
@inlineCallbacks
def add_onu(self, serial_number, status):
- self.log.info('add-ONU', serial_number=serial_number)
+ self.log.info('add-onu', serial_number=serial_number, status=status)
if serial_number not in status.onus:
# Newly found and not enabled ONU, enable it now if not at max
@@ -662,21 +861,22 @@
try:
tconts = onu_info['t-conts']
gem_ports = onu_info['gem-ports']
+
+ # Add Multicast to PON on a per-ONU basis until xPON multicast support is ready
+ # In xPON/BBF, mcast gems tie back to the channel-pair
+ # MCAST VLAN IDs stored as a negative value
+
+ for id_or_vid, gem_port in gem_ports.iteritems(): # TODO: Deprecate this when BBF ready
+ if gem_port.multicast:
+ self.add_mcast_gem_port(gem_port, -id_or_vid)
+
yield onu.create(tconts, gem_ports)
self.activate_onu(onu)
- if len(self._pon_evc_map) > 0:
- # Add gem-id's to maps
- dl = []
- for evc_map in self._pon_evc_map.itervalues():
- dl = evc_map.add_onu(onu)
-
- yield defer.gatherResults(dl)
-
except Exception as e:
del self._onus[serial_number]
del self._onu_by_id[onu.onu_id]
- self.log.exception('add_onu', serial_number=serial_number, e=e)
+ self.log.exception('add-onu', serial_number=serial_number, e=e)
def activate_onu(self, onu):
"""
@@ -705,7 +905,7 @@
self._next_onu_id += 1
if self._next_onu_id > Onu.MAX_ONU_ID:
- self._next_onu_id = Onu.MIN_ONU_ID
+ self._next_onu_id = Onu.MIN_ONU_ID + 128
if onu_id not in used_ids:
return onu_id
@@ -728,34 +928,41 @@
except Exception as e:
self.log.exception('onu', serial_number=onu.serial_number, e=e)
- if onu is not None and len(self._pon_evc_map) > 0:
- # Drop gem-id's from any existing maps
- dl = []
- for evc_map in self._pon_evc_map.itervalues():
- dl = evc_map.remove_onu(onu)
- try:
- yield defer.gatherResults(dl)
+ if onu is not None:
+ # Clean up adapter agent of this ONU
- except Exception as e:
- self.log.exception('maps', serial_number=onu.serial_number, e=e)
+ proxy = Device.ProxyAddress(device_id=self.olt.device_id,
+ channel_id=onu.channel_id)
+ onu_device = self.olt.adapter_agent.get_child_device_with_proxy_address(proxy)
- # TODO: Need removal from VOLTHA child_device method
+ if onu_device is not None:
+ self.olt.adapter_agent.delete_child_device(self.olt.device_id,
+ onu_device.device_id)
- def add_pon_evc_map(self, evc_map):
- """
- Add an EVC MAP that covers all ONUs on a PON (typically control exception flows)
- :param evc_map: (EVCMap) EVC Map
- """
- assert evc_map.name not in self._pon_evc_map
- self._pon_evc_map[evc_map.name] = evc_map
+ self.olt.adapter_agent.update_child_devices_state(self.olt.device_id,
+ admin_state=AdminState.DISABLED)
- def remove_pon_evc_map(self, evc_map):
+ def delete_child_device(self, parent_device_id, child_device_id):
+ onu_device = self.root_proxy.get('/devices/{}'.format(child_device_id))
+ if onu_device is not None:
+ if onu_device.parent_id == parent_device_id:
+ self.log.debug('deleting-child-device', parent_device_id=parent_device_id,
+ child_device_id=child_device_id)
+ self._remove_node('/devices', child_device_id)
+
+ def add_mcast_gem_port(self, mcast_gem, vlan):
"""
- Remove an EVC MAP that covers all ONUs on a PON (typically control exception flows)
- :param evc_map: (EVCMap) EVC Map
+ Add any new Multicast GEM Ports to the PON
+ :param mcast_gem: (GemPort)
"""
- if evc_map.name in self._pon_evc_map:
- del self._pon_evc_map[evc_map.name]
+ if vlan in self._mcast_gem_ports:
+ return
+
+ assert len(self._mcast_gem_ports) == 0, 'Only 1 MCAST GEMPort until BBF Support'
+ assert 1 <= vlan <= 4095, 'Invalid Multicast VLAN ID'
+ assert len(self.olt.multicast_vlans) == 1, 'Only support 1 MCAST VLAN until BBF Support'
+
+ self._mcast_gem_ports[vlan] = mcast_gem
@inlineCallbacks
def channel_partition(self, name, partition=0, xpon_system=0, operation=None):
diff --git a/voltha/adapters/adtran_olt/tcont.py b/voltha/adapters/adtran_olt/tcont.py
index 3dc4da0..29361a3 100644
--- a/voltha/adapters/adtran_olt/tcont.py
+++ b/voltha/adapters/adtran_olt/tcont.py
@@ -1,4 +1,4 @@
-# Copyright 2017-present Open Networking Foundation
+# Copyright 2017-present Adtran, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -13,9 +13,11 @@
# limitations under the License.
import structlog
+import json
from enum import Enum
from voltha.protos.bbf_fiber_tcont_body_pb2 import TcontsConfigData
from voltha.protos.bbf_fiber_traffic_descriptor_profile_body_pb2 import TrafficDescriptorProfileData
+from twisted.internet.defer import succeed, inlineCallbacks, returnValue
log = structlog.get_logger()
@@ -34,7 +36,7 @@
self.vont_ani = vont_ani # (string) reference
def __str__(self):
- return "TCont: {}, alloc-id: {}".format(self.name,self.alloc_id)
+ return "TCont: {}, alloc-id: {}".format(self.name, self.alloc_id)
@staticmethod
def create(data, td):
@@ -44,6 +46,41 @@
return TCont(data.alloc_id, td, best_effort=td.best_effort,
name=data.name, ident=data.id, vont_ani=data.interface_reference)
+ @inlineCallbacks
+ def add_to_hardware(self, session, pon_id, onu_id, operation='POST'):
+ from adtran_olt_handler import AdtranOltHandler
+
+ uri = AdtranOltHandler.GPON_TCONT_CONFIG_LIST_URI.format(pon_id, onu_id)
+ data = json.dumps({'alloc-id': self.alloc_id})
+ name = 'tcont-create-{}-{}: {}'.format(pon_id, onu_id, self.alloc_id)
+ what = 'tcont'
+
+ try:
+ # For TCONT, only leaf is the key. So only post needed
+ if operation == 'POST':
+ results = yield session.request('POST', uri, data=data, name=name)
+ else:
+ results = succeed('nop')
+
+ if self.traffic_descriptor is not None:
+ what = 'traffic-descriptor'
+ results = yield self.traffic_descriptor.add_to_hardware(session,
+ pon_id, onu_id,
+ self.alloc_id,
+ self.best_effort)
+ except Exception as e:
+ log.exception(what, tcont=self, td=self.traffic_descriptor, e=e)
+ raise
+
+ returnValue(results)
+
+ def remove_from_hardware(self, session, pon_id, onu_id):
+ from adtran_olt_handler import AdtranOltHandler
+
+ uri = AdtranOltHandler.GPON_TCONT_CONFIG_URI.format(pon_id, onu_id, self.alloc_id)
+ name = 'tcont-delete-{}-{}: {}'.format(pon_id, onu_id, self.alloc_id)
+ return succeed(session.request('DELETE', uri, name=name))
+
class TrafficDescriptor(object):
"""
@@ -129,6 +166,34 @@
}
return val
+ @inlineCallbacks
+ def add_to_hardware(self, session, pon_id, onu_id, alloc_id, best_effort):
+ from adtran_olt_handler import AdtranOltHandler
+
+ uri = AdtranOltHandler.GPON_TCONT_CONFIG_URI.format(pon_id, onu_id, alloc_id)
+ data = json.dumps({'traffic-descriptor': self.to_dict()})
+ name = 'tcont-td-{}-{}: {}'.format(pon_id, onu_id, alloc_id)
+ try:
+ results = yield session.request('PATCH', uri, data=data, name=name)
+
+ except Exception as e:
+ log.exception('traffic-descriptor', td=self, e=e)
+ raise
+
+ if self.additional_bandwidth_eligibility == \
+ TrafficDescriptor.AdditionalBwEligibility.BEST_EFFORT_SHARING:
+ if best_effort is None:
+ raise ValueError('TCONT is best-effort but does not define best effort sharing')
+
+ try:
+ results = yield best_effort.add_to_hardware(session, pon_id, onu_id, alloc_id)
+
+ except Exception as e:
+ log.exception('best-effort', best_effort=best_effort, e=e)
+ raise
+
+ returnValue(results)
+
class BestEffort(object):
def __init__(self, bandwidth, priority, weight):
@@ -148,3 +213,12 @@
'weight': self.weight
}
return val
+
+ def add_to_hardware(self, session, pon_id, onu_id, alloc_id, best_effort):
+ from adtran_olt_handler import AdtranOltHandler
+
+ uri = AdtranOltHandler.GPON_TCONT_CONFIG_URI.format(pon_id, onu_id, alloc_id)
+ data = json.dumps({'best-effort': best_effort.to_dict()})
+ name = 'tcont-best-effort-{}-{}: {}'.format(pon_id, onu_id, alloc_id)
+
+ return session.request('PATCH', uri, data=data, name=name)