netconf client support and disable-enable support
Change-Id: Idd9bbdd15f59783abf3c70745d3a00e00177687e
diff --git a/requirements.txt b/requirements.txt
index c63fc59..7009e4e 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -47,6 +47,8 @@
zmq>=0.0.0
pyzmq>=16.0.2
txZMQ==0.8.0
+ncclient==0.5.3
+xmltodict==0.11.0
dicttoxml
# python-consul>=0.6.1 we need the pre-released version for now, because 0.6.1 does not
diff --git a/voltha/adapters/adtran_olt/README.md b/voltha/adapters/adtran_olt/README.md
index d0344e5..889157f 100644
--- a/voltha/adapters/adtran_olt/README.md
+++ b/voltha/adapters/adtran_olt/README.md
@@ -1,2 +1,30 @@
# Adtran OLT Device Adapter
+To preprovision an Adtran OLT, you will need to provide the IP Address and
+the NETCONF/REST credentials for the device. The NETCONF/REST credentials are an
+extension of the existing **preprovision_olt** command and these are placed after
+entering two dashes '_--_'. The full syntax to use is.
+
+| Short | Long | Default | Notes
+| :---: + :-----------: + :-----: + -----
+| -u | --nc_username | '' | NETCONF username
+| -p | --nc_password | '' | NETCONF Password
+| -t | --nc_port | 830 | NETCONF TCP Port
+| -U | --rc_username | '' | REST USERNAME
+| -P | --rc_password | '' | REST PASSWORD
+| -T | --rc_port | 8081 | REST PORT
+
+For example, if your Adtran OLT is address 10.17.174.193 with the default TCP ports and
+NETCONF credentials of admin/admin and REST credentials of ADMIN/ADMIN, the command line
+would be
+
+```bash
+ preprovision_olt -t adtran_olt -i 10.17.174.193 -- -u admin -p admin -U ADMIN -P ADMIN
+```
+or
+```bash
+ preprovision_olt -t adtran_olt -i 10.17.174.193 -- --nc_username admin --nc_password admin --rc_username ADMIN --rc_password ADMIN
+```
+
+Currently the Adtran Device Adapter will enable all PON ports on startup and attempt to activate any discovered ONUs.
+This behaviour will change once PON Management is fully supported.
\ No newline at end of file
diff --git a/voltha/adapters/adtran_olt/adtran_device_handler.py b/voltha/adapters/adtran_olt/adtran_device_handler.py
index fe8370a..e61eb8d 100644
--- a/voltha/adapters/adtran_olt/adtran_device_handler.py
+++ b/voltha/adapters/adtran_olt/adtran_device_handler.py
@@ -16,15 +16,18 @@
"""
Adtran generic VOLTHA device handler
"""
+import argparse
import datetime
import pprint
+import shlex
+import time
import arrow
-import re
import structlog
from twisted.internet import reactor, defer
-from twisted.internet.defer import inlineCallbacks
+from twisted.internet.defer import inlineCallbacks, returnValue
+from voltha.adapters.adtran_olt.net.adtran_netconf import AdtranNetconfClient
from voltha.adapters.adtran_olt.net.adtran_rest import AdtranRestClient
from voltha.protos import third_party
from voltha.protos.common_pb2 import OperStatus, AdminState, ConnectStatus
@@ -36,6 +39,8 @@
OFPC_GROUP_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS
from voltha.registry import registry
+from common.utils.asleep import asleep
+
_ = third_party
@@ -68,6 +73,9 @@
# HTTP shortcuts
HELLO_URI = '/restconf/adtran-hello:hello'
+ # RPC XML shortcuts
+ RESTART_RPC = '<system-restart xmlns="urn:ietf:params:xml:ns:yang:ietf-system"/>'
+
def __init__(self, adapter, device_id, username='', password='', timeout=20):
self.adapter = adapter
self.adapter_agent = adapter.adapter_agent
@@ -82,30 +90,45 @@
# Northbound and Southbound ports
self.northbound_ports = {} # port number -> Port
self.southbound_ports = {} # port number -> Port (For PON, use pon-id as key)
- self.management_ports = {} # port number -> Port TODO: Not currently supported
+ # self.management_ports = {} # port number -> Port TODO: Not currently supported
self.num_northbound_ports = None
self.num_southbound_ports = None
- self.num_management_ports = None
+ # self.num_management_ports = None
+
+ self.ip_address = None
+ self.timeout = timeout
+ self.restart_failure_timeout = 5 * 60 # 5 Minute timeout
# REST Client
- self.ip_address = None
self.rest_port = None
- self.rest_timeout = timeout
self.rest_username = username
self.rest_password = password
self.rest_client = None
+ # NETCONF Client
+ self.netconf_port = None
+ self.netconf_username = username
+ self.netconf_password = password
+ self.netconf_client = None
+
# Heartbeat support
self.heartbeat_count = 0
self.heartbeat_miss = 0
- self.heartbeat_interval = 10 # TODO: Decrease before release
+ self.heartbeat_interval = 10 # TODO: Decrease before release or any scale testing
self.heartbeat_failed_limit = 3
self.heartbeat_timeout = 5
self.heartbeat = None
self.heartbeat_last_reason = ''
- self.max_ports = 1 # TODO: Remove later
+ # Virtualized OLT Support
+ self.is_virtual_olt = False
+
+ # Installed flows
+ self.flow_entries = {} # Flow ID/name -> FlowEntry
+
+ # TODO Remove items below after one PON fully supported and working as expected
+ self.max_ports = 1
def __del__(self):
# Kill any startup or heartbeat defers
@@ -114,65 +137,108 @@
if d is not None:
d.cancel()
+ ldi, self.logical_device_id = self.logical_device_id, None
+
h, self.heartbeat = self.heartbeat, None
if h is not None:
h.cancel()
+ # Remove the logical device
+
+ if ldi is not None:
+ logical_device = self.adapter_agent.get_logical_device(ldi)
+ self.adapter_agent.delete_logical_device(logical_device)
+
self.northbound_ports.clear()
self.southbound_ports.clear()
def __str__(self):
- return "AdtranDeviceHandler: {}:{}".format(self.ip_address, self.rest_port)
+ return "AdtranDeviceHandler: {}".format(self.ip_address)
+
+ def parse_provisioning_options(self, device):
+ if not device.ipv4_address:
+ self.activate_failed(device, 'No ip_address field provided')
+
+ self.ip_address = device.ipv4_address
+
+ #############################################################
+ # Now optional parameters
+
+ def check_tcp_port(value):
+ ivalue = int(value)
+ if ivalue <= 0 or ivalue > 65535:
+ raise argparse.ArgumentTypeError("%s is a not a valid port number" % value)
+ return ivalue
+
+ parser = argparse.ArgumentParser(description='Adtran Device Adapter')
+ parser.add_argument('--nc_username', '-u', action='store', default='hsvroot', help='NETCONF username')
+ parser.add_argument('--nc_password', '-p', action='store', default='BOSCO', help='NETCONF Password')
+ parser.add_argument('--nc_port', '-t', action='store', default=830, type=check_tcp_port,
+ help='NETCONF TCP Port')
+ parser.add_argument('--rc_username', '-U', action='store', default='ADMIN', help='REST username')
+ parser.add_argument('--rc_password', '-P', action='store', default='PASSWORD', help='REST Password')
+ parser.add_argument('--rc_port', '-T', action='store', default=8081, type=check_tcp_port,
+ help='REST TCP Port')
+
+ try:
+ args = parser.parse_args(shlex.split(device.extra_args))
+
+ self.netconf_username = args.nc_username
+ self.netconf_password = args.nc_password
+ self.netconf_port = args.nc_port
+
+ self.rest_username = args.rc_username
+ self.rest_password = args.rc_password
+ self.rest_port = args.rc_port
+
+ except argparse.ArgumentError as e:
+ self.activate_failed(device,
+ 'Invalid arguments: {}'.format(e.message),
+ reachable=False)
+ except Exception as e:
+ self.log.exception('parsing error: {}'.format(e.message))
@inlineCallbacks
- def activate(self, device):
+ def activate(self, device, reconciling=False):
"""
Activate the OLT device
:param device: A voltha.Device object, with possible device-type
- specific extensions.
+ specific extensions.
+ :param reconciling: If True, this adapter is taking over for a previous adapter
+ for an existing OLT
"""
- self.log.info('AdtranDeviceHandler.activating', device=device)
+ self.log.info('AdtranDeviceHandler.activating', device=device, reconciling=reconciling)
if self.logical_device_id is None:
- if not device.host_and_port:
- self.activate_failed(device, 'No host_and_port field provided')
-
- pattern = '(?P<host>[^:/ ]+).?(?P<port>[0-9]*).*'
- info = re.match(pattern, device.host_and_port)
-
- if not info or len(info.group('host')) == 0 or len(info.group('port')) == 0 or \
- (int(info.group('port')) if info.group('port') else None) is None:
- self.activate_failed(device, 'Invalid Host or Port provided',
- reachable=False)
-
- self.ip_address = str(info.group('host'))
- self.rest_port = int(info.group('port'))
+ # Parse our command line options for this device
+ self.parse_provisioning_options(device)
############################################################################
# Start initial discovery of RESTCONF support (if any)
- self.rest_client = AdtranRestClient(self.ip_address,
- self.rest_port,
- self.rest_username,
- self.rest_password,
- self.rest_timeout)
+
try:
- # content: (dict) Modules from the hello message
-
- self.startup = self.rest_client.request('GET', self.HELLO_URI, name='hello')
-
+ self.startup = self.make_restconf_connection()
results = yield self.startup
self.log.debug('HELLO Contents: {}'.format(pprint.PrettyPrinter().pformat(results)))
+ # See if this is a virtualized OLT. If so, no NETCONF support available
+
+ self.is_virtual_olt = 'module-info' in results and\
+ any(mod.get('module-name', None) == 'adtran-ont-mock'
+ for mod in results['module-info'])
+ if self.is_virtual_olt:
+ self.log.info('*** VIRTUAL OLT detected ***')
+
except Exception as e:
- results = None
self.log.exception('Initial RESTCONF adtran-hello failed', e=e)
self.activate_failed(device, e.message, reachable=False)
############################################################################
- # TODO: Get these six via NETCONF and from the derived class
+ # Start initial discovery of NETCONF support (if any)
+<<<<<<< HEAD
device.model = 'TODO: Adtran PizzaBox, YUM'
device.hardware_version = 'TODO: H/W Version'
device.firmware_version = 'TODO: S/W Version'
@@ -180,11 +246,42 @@
Image(version="TODO: S/W Version")
])
device.serial_number = 'TODO: Serial Number'
+=======
+ if not self.is_virtual_olt:
+ try:
+ self.startup = self.make_netconf_connection()
+ yield self.startup
+>>>>>>> c577acb... netconf client support and disable-enable support
- device.root = True
- device.vendor = 'Adtran, Inc.'
- device.connect_status = ConnectStatus.REACHABLE
- self.adapter_agent.update_device(device)
+ except Exception as e:
+ self.log.exception('Initial NETCONF connection failed', e=e)
+ self.activate_failed(device, e.message, reachable=False)
+
+ ############################################################################
+ # Get the device Information
+
+ if reconciling:
+ device.connect_status = ConnectStatus.REACHABLE
+ self.adapter_agent.update_device(device)
+ else:
+ try:
+ self.startup = self.get_device_info(device)
+ results = yield self.startup
+
+ device.model = results.get('model', 'unknown')
+ device.hardware_version = results.get('hardware_version', 'unknown')
+ device.firmware_version = results.get('firmware_version', 'unknown')
+ device.software_version = results.get('software_version', 'unknown')
+ device.serial_number = results.get('serial_number', 'unknown')
+
+ device.root = True
+ device.vendor = results.get('vendor', 'Adtran, Inc.')
+ device.connect_status = ConnectStatus.REACHABLE
+ self.adapter_agent.update_device(device)
+
+ except Exception as e:
+ self.log.exception('Device Information request(s) failed', e=e)
+ self.activate_failed(device, e.message, reachable=False)
try:
# Enumerate and create Northbound NNI interfaces
@@ -195,8 +292,9 @@
self.startup = self.process_northbound_ports(device, results)
yield self.startup
- for port in self.northbound_ports.itervalues():
- self.adapter_agent.add_port(device.id, port.get_port())
+ if not reconciling:
+ for port in self.northbound_ports.itervalues():
+ self.adapter_agent.add_port(device.id, port.get_port())
except Exception as e:
self.log.exception('Northbound port enumeration and creation failed', e=e)
@@ -212,13 +310,15 @@
self.startup = self.process_southbound_ports(device, results)
yield self.startup
- for port in self.southbound_ports.itervalues():
- self.adapter_agent.add_port(device.id, port.get_port())
+ if not reconciling:
+ for port in self.southbound_ports.itervalues():
+ self.adapter_agent.add_port(device.id, port.get_port())
except Exception as e:
self.log.exception('Southbound port enumeration and creation failed', e=e)
self.activate_failed(device, e.message)
+<<<<<<< HEAD
# Complete activation by setting up logical device for this OLT and saving
# off the devices parent_id
@@ -273,25 +373,31 @@
for port in self.northbound_ports.itervalues():
self.startup = port.start()
yield self.startup
+=======
+ if reconciling:
+ if device.admin_state == AdminState.ENABLED:
+ if device.parent_id:
+ self.logical_device_id = device.parent_id
+ self.adapter_agent.reconcile_logical_device(device.parent_id)
+ else:
+ self.log.info('no-logical-device-set')
+>>>>>>> c577acb... netconf client support and disable-enable support
- except Exception as e:
- self.log.exception('Failed to start northbound port(s)', e=e)
- self.activate_failed(device, e.message)
+ # Reconcile child devices
+ self.adapter_agent.reconcile_child_devices(device.id)
+ else:
+ # Complete activation by setting up logical device for this OLT and saving
+ # off the devices parent_id
- try:
- start_downlinks = self.initial_port_state == AdminState.ENABLED
+ self.logical_device_id = self.create_logical_device(device)
- for port in self.southbound_ports.itervalues():
- self.startup = port.start() if start_downlinks else port.stop()
- yield self.startup
+ # Create logical ports for all southbound and northbound interfaces
- except Exception as e:
- self.log.exception('Failed to start southbound port(s)', e=e)
- self.activate_failed(device, e.message)
+ self.create_logical_ports(device, self.logical_device_id, reconciling)
# Complete device specific steps
try:
- self.startup = self.complete_device_specific_activation(device, results)
+ self.startup = self.complete_device_specific_activation(device, reconciling)
if self.startup is not None:
yield self.startup
@@ -303,12 +409,8 @@
self.start_heartbeat(delay=10)
- # Save off logical ID and specify that we active
-
- self.logical_device_id = ld_initialized.id
-
device = self.adapter_agent.get_device(device.id)
- device.parent_id = ld_initialized.id
+ device.parent_id = self.logical_device_id
device.oper_status = OperStatus.ACTIVE
self.adapter_agent.update_device(device)
@@ -331,6 +433,127 @@
self.adapter_agent.update_device(device)
raise RuntimeError('Failed to activate OLT: {}'.format(device.reason))
+ def make_netconf_connection(self, connect_timeout=None):
+ ############################################################################
+ # Start initial discovery of NETCONF support
+
+ if self.netconf_client is None:
+ self.netconf_client = AdtranNetconfClient(self.ip_address,
+ self.netconf_port,
+ self.netconf_username,
+ self.netconf_password,
+ self.timeout)
+ if self.netconf_client.connected:
+ return defer.returnValue(True)
+
+ timeout = connect_timeout or self.timeout
+ return self.netconf_client.connect(timeout)
+
+ def make_restconf_connection(self, get_timeout=None):
+ if self.rest_client is None:
+ self.rest_client = AdtranRestClient(self.ip_address,
+ self.rest_port,
+ self.rest_username,
+ self.rest_password,
+ self.timeout)
+
+ timeout = get_timeout or self.timeout
+ return self.rest_client.request('GET', self.HELLO_URI, name='hello', timeout=timeout)
+
+ def create_logical_device(self, device):
+ ld = LogicalDevice(
+ # NOTE: not setting id and datapath_id will let the adapter agent pick id
+ desc=ofp_desc(mfr_desc=device.vendor,
+ hw_desc=device.hardware_version,
+ sw_desc=device.software_version,
+ serial_num=device.serial_number,
+ dp_desc='n/a'),
+ switch_features=ofp_switch_features(n_buffers=256, # TODO fake for now
+ n_tables=2, # TODO ditto
+ capabilities=(
+ # OFPC_FLOW_STATS | # TODO: Enable if we support it
+ # OFPC_TABLE_STATS | # TODO: Enable if we support it
+ # OFPC_GROUP_STATS | # TODO: Enable if we support it
+ OFPC_PORT_STATS)),
+ root_device_id=device.id)
+
+ ld_initialized = self.adapter_agent.create_logical_device(ld)
+
+ return ld_initialized
+
+ @inlineCallbacks
+ def create_logical_ports(self, device, ld_initialized, reconciling):
+
+ if not reconciling:
+ for port in self.northbound_ports.itervalues():
+ lp = port.get_logical_port()
+ if lp is not None:
+ self.adapter_agent.add_logical_port(ld_initialized.id, lp)
+
+ for port in self.southbound_ports.itervalues():
+ lp = port.get_logical_port()
+ if lp is not None:
+ self.adapter_agent.add_logical_port(ld_initialized.id, lp)
+
+ # Set the ports in a known good initial state
+ try:
+ for port in self.northbound_ports.itervalues():
+ self.startup = port.reset()
+ results = yield self.startup
+ self.log.debug('Northbound Port reset results', results=results)
+
+ except Exception as e:
+ self.log.exception('Failed to reset northbound ports to known good initial state', e=e)
+ self.activate_failed(device, e.message)
+
+ try:
+ for port in self.southbound_ports.itervalues():
+ self.startup = port.reset()
+ results = yield self.startup
+ self.log.debug('Southbound Port reset results', results=results)
+
+ except Exception as e:
+ self.log.exception('Failed to reset southbound ports to known good initial state', e=e)
+ self.activate_failed(device, e.message)
+
+ # Start/stop the interfaces as needed
+ try:
+ for port in self.northbound_ports.itervalues():
+ self.startup = port.start()
+ results = yield self.startup
+ self.log.debug('Northbound Port start results', results=results)
+
+ except Exception as e:
+ self.log.exception('Failed to start northbound port(s)', e=e)
+ self.activate_failed(device, e.message)
+
+ try:
+ if reconciling:
+ start_downlinks = device.admin_state == AdminState.ENABLED
+ else:
+ start_downlinks = self.initial_port_state == AdminState.ENABLED
+
+ for port in self.southbound_ports.itervalues():
+ self.startup = port.start() if start_downlinks else port.stop()
+ results = yield self.startup
+ self.log.debug('Southbound Port start results', results=results)
+
+ except Exception as e:
+ self.log.exception('Failed to start southbound port(s)', e=e)
+ self.activate_failed(device, e.message)
+
+ @inlineCallbacks
+ def device_information(self, device):
+ """
+ Examine the various managment models and extract device information for
+ VOLTHA use
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions.
+ :return: (Deferred or None).
+ """
+ yield defer.Deferred(lambda c: c.callback("Not Required"))
+
@inlineCallbacks
def enumerate_northbound_ports(self, device):
"""
@@ -396,7 +619,7 @@
yield defer.Deferred(lambda c: c.callback("Not Required"))
@inlineCallbacks
- def complete_device_specific_activation(self, _device, _content):
+ def complete_device_specific_activation(self, _device, _reconciling):
return None
def deactivate(self, device):
@@ -409,6 +632,342 @@
if h is not None:
h.cancel()
+ # TODO: What else (delete logical device, ???)
+
+ @inlineCallbacks
+ def disable(self):
+ """
+ This is called when a previously enabled device needs to be disabled based on a NBI call.
+ """
+ self.log.info('disabling', device_id=self.device_id)
+
+ # Get the latest device reference
+ device = self.adapter_agent.get_device(self.device_id)
+
+ # Suspend any active healthchecks / pings
+
+ h, self.heartbeat = self.heartbeat, None
+
+ if h is not None:
+ h.cancel()
+
+ # Update the operational status to UNKNOWN
+
+ device.oper_status = OperStatus.UNKNOWN
+ device.connect_status = ConnectStatus.UNREACHABLE
+ self.adapter_agent.update_device(device)
+
+ # Remove the logical device
+ ldi, self.logical_device_id = self.logical_device_id, None
+
+ if ldi is not None:
+ logical_device = self.adapter_agent.get_logical_device(ldi)
+ self.adapter_agent.delete_logical_device(logical_device)
+
+ # Disable all child devices first
+ self.adapter_agent.update_child_devices_state(self.device_id,
+ admin_state=AdminState.DISABLED)
+
+ # Remove the peer references from this device
+ self.adapter_agent.delete_all_peer_references(self.device_id)
+
+ # Set all ports to disabled
+ self.adapter_agent.disable_all_ports(self.device_id)
+
+ for port in self.northbound_ports.itervalues():
+ port.stop()
+
+ for port in self.southbound_ports.itervalues():
+ port.stop()
+
+ # Disable all flows TODO: Do we want to delete them?
+ # TODO: Use bulk methods if possible
+
+ for flow in self.flow_entries.itervalues():
+ flow.disable()
+
+ # Shutdown communications with OLT
+
+ if self.netconf_client is not None:
+ try:
+ yield self.netconf_client.close()
+ except Exception as e:
+ self.log.exception('NETCONF client shutdown failed', e=e)
+
+ def _null_clients():
+ self.netconf_client = None
+ self.rest_client = None
+
+ reactor.callLater(0, _null_clients)
+
+ self.log.info('disabled', device_id=device.id)
+
+ @inlineCallbacks
+ def reenable(self):
+ """
+ This is called when a previously disabled device needs to be enabled based on a NBI call.
+ """
+ self.log.info('re-enabling', device_id=self.device_id)
+
+ # Get the latest device reference
+ device = self.adapter_agent.get_device(self.device_id)
+
+ # Update the connect status to REACHABLE
+ device.connect_status = ConnectStatus.REACHABLE
+ self.adapter_agent.update_device(device)
+
+ # Set all ports to enabled
+ self.adapter_agent.enable_all_ports(self.device_id)
+
+ try:
+ yield self.make_restconf_connection()
+
+ except Exception as e:
+ self.log.exception('RESTCONF adtran-hello reconnect failed', e=e)
+ # TODO: What is best way to handle reenable failure?
+
+ if not self.is_virtual_olt:
+ try:
+ yield self.make_netconf_connection()
+
+ except Exception as e:
+ self.log.exception('NETCONF re-connection failed', e=e)
+ # TODO: What is best way to handle reenable failure?
+
+ # Recreate the logical device
+
+ ld_initialized = self.create_logical_device(device)
+
+ # Create logical ports for all southbound and northbound interfaces
+
+ self.create_logical_ports(device, ld_initialized, False)
+
+ device = self.adapter_agent.get_device(device.id)
+ device.parent_id = ld_initialized.id
+ device.oper_status = OperStatus.ACTIVE
+ self.adapter_agent.update_device(device)
+ self.logical_device_id = ld_initialized.id
+
+ # Reenable all child devices
+ self.adapter_agent.update_child_devices_state(device.id,
+ admin_state=AdminState.ENABLED)
+
+ for port in self.northbound_ports.itervalues():
+ port.start()
+
+ for port in self.southbound_ports.itervalues():
+ port.start()
+
+ # TODO:
+ # 1) Restart health check / pings
+
+ # Enable all flows
+ # TODO: Use bulk methods if possible
+
+ for flow in self.flow_entries:
+ flow.enable()
+
+ self.log.info('re-enabled', device_id=device.id)
+
+ @inlineCallbacks
+ def reboot(self):
+ """
+ This is called to reboot a device based on a NBI call. The admin state of the device
+ will not change after the reboot.
+ """
+ self.log.debug('reboot')
+
+ # Update the operational status to ACTIVATING and connect status to
+ # UNREACHABLE
+
+ device = self.adapter_agent.get_device(self.device_id)
+ previous_oper_status = device.oper_status
+ previous_conn_status = device.connect_status
+ device.oper_status = OperStatus.ACTIVATING
+ device.connect_status = ConnectStatus.UNREACHABLE
+ self.adapter_agent.update_device(device)
+
+ # Update the child devices connect state to UNREACHABLE
+ self.adapter_agent.update_child_devices_state(self.device_id,
+ connect_status=ConnectStatus.UNREACHABLE)
+ # Issue reboot command
+
+ if not self.is_virtual_olt:
+ try:
+ yield self.netconf_client.rpc(AdtranDeviceHandler.RESTART_RPC)
+
+ except Exception as e:
+ self.log.exception('NETCONF client shutdown', e=e)
+ # TODO: On failure, what is the best thing to do?
+
+ # Shutdown communications with OLT. Typically it takes about 2 seconds
+ # or so after the reply before the restart actually occurs
+
+ try:
+ response = yield self.netconf_client.close()
+ self.log.debug('Restart response XML was: {}'.format('ok' if response.ok else 'bad'))
+
+ except Exception as e:
+ self.log.exception('NETCONF client shutdown', e=e)
+
+ def _null_clients():
+ self.netconf_client = None
+ self.rest_client = None
+
+ yield reactor.callLater(0, _null_clients)
+
+ # Run remainder of reboot process as a new task. The OLT then may be up in a
+ # few moments or may take 3 minutes or more depending on any self tests enabled
+
+ current_time = time.time();
+ timeout = current_time + self.restart_failure_timeout
+
+ self.log('*** Current time is {}, timeout is {}'.format(current_time, timeout))
+
+ yield reactor.callLater(10, self._finish_reboot, timeout,
+ previous_oper_status, previous_conn_status)
+
+ @inlineCallbacks
+ def _finish_reboot(self, timeout, previous_oper_status, previous_conn_status):
+ # Now wait until REST & NETCONF are re-established or we timeout
+
+ if self.netconf_client is None and not self.is_virtual_olt:
+ self.log.debug('Attempting to restore NETCONF connection')
+ try:
+ response = yield self.make_netconf_connection(connect_timeout=3)
+ self.log.debug('Restart NETCONF connection XML was: {}'.format(response.xml))
+
+ except Exception as e:
+ self.log.debug('No NETCONF connection yet: {}'.format(e.message))
+ try:
+ yield self.netconf_client.close()
+ except Exception as e:
+ self.log.exception(e.message)
+ finally:
+ def _null_netconf():
+ self.log.debug('Nulling out the NETCONF client')
+ self.netconf_client = None
+ reactor.callLater(0, _null_netconf)
+
+ elif self.rest_client is None:
+ self.log.debug('Attempting to restore RESTCONF connection')
+ try:
+ response = yield self.make_restconf_connection(get_timeout=3)
+ self.log.debug('Restart RESTCONF connection XML was: {}'.format(response.xml))
+
+ except Exception:
+ self.log.debug('No RESTCONF connection yet')
+ self.rest_client = None
+
+ if (self.netconf_client is None and not self.is_virtual_olt) or self.rest_client is None:
+ current_time = time.time();
+
+ self.log('Current time is {}, timeout is {}'.format(current_time, timeout))
+
+ if current_time < timeout:
+ self.log.info('Device not responding yet, will try again...')
+ yield reactor.callLater(10, self._finish_reboot, timeout,
+ previous_oper_status, previous_conn_status)
+
+ if self.netconf_client is None and not self.is_virtual_olt:
+ self.log.error('Could not restore NETCONF communications after device RESET')
+ pass # TODO: What is best course of action if cannot get clients back?
+
+ if self.rest_client is None:
+ self.log.error('Could not restore RESTCONF communications after device RESET')
+ pass # TODO: What is best course of action if cannot get clients back?
+
+ # Pause additional 5 seconds to let things OLT microservices complete some more initialization
+
+ yield asleep(5)
+
+ # Get the latest device reference
+
+ device = self.adapter_agent.get_device(self.device_id)
+ device.oper_status = previous_oper_status
+ device.connect_status = previous_conn_status
+ self.adapter_agent.update_device(device)
+
+ # Update the child devices connect state to REACHABLE
+ self.adapter_agent.update_child_devices_state(self.device_id,
+ connect_status=ConnectStatus.REACHABLE)
+
+ # Connect back up to OLT so heartbeats/polls start working again
+ try:
+ yield self.make_restconf_connection()
+
+ except Exception as e:
+ self.log.exception('RESTCONF adtran-hello connect after reboot failed', e=e)
+ # TODO: What is best way to handle reenable failure?
+
+ if not self.is_virtual_olt:
+ try:
+ yield self.make_netconf_connection()
+
+ except Exception as e:
+ self.log.exception('NETCONF re-connection after reboot failed', e=e)
+ # TODO: What is best way to handle reenable failure?
+
+ self.log.info('rebooted', device_id=self.device_id)
+
+ @inlineCallbacks
+ def delete(self):
+ """
+ This is called to delete a device from the PON based on a NBI call.
+ If the device is an OLT then the whole PON will be deleted.
+ """
+ self.log.info('deleting', device_id=self.device_id)
+
+ # Cancel any outstanding tasks
+
+ d, self.startup = self.startup, None
+ if d is not None:
+ d.cancel()
+
+ h, self.heartbeat = self.heartbeat, None
+ if h is not None:
+ h.cancel()
+
+ # TODO:
+ # 1) Remove all flows from the device
+
+ self.flow_entries.clear()
+
+ # Remove all child devices
+ self.adapter_agent.delete_all_child_devices(self.device_id)
+
+ # Remove the logical device
+ logical_device = self.adapter_agent.get_logical_device(self.logical_device_id)
+ self.adapter_agent.delete_logical_device(logical_device)
+
+ # Remove the peer references from this device
+ self.adapter_agent.delete_all_peer_references(self.device_id)
+
+ # Tell all ports to stop any background processing
+
+ for port in self.northbound_ports.itervalues():
+ port.delete()
+
+ for port in self.southbound_ports.itervalues():
+ port.delete()
+
+ self.northbound_ports.clear()
+ self.southbound_ports.clear()
+
+ # Shutdown communications with OLT
+
+ if self.netconf_client is not None:
+ try:
+ yield self.netconf_client.close()
+ except Exception as e:
+ self.log.exception('NETCONF client shutdown', e=e)
+
+ self.netconf_client = None
+
+ self.rest_client = None
+
+ self.log.info('deleted', device_id=self.device_id)
+
@inlineCallbacks
def get_device_info(self, device):
"""
@@ -422,8 +981,15 @@
specific extensions. Such extensions shall be described as part of
the device type specification returned by device_types().
"""
- pass
- return None # raise NotImplementedError('TODO: You should override this in your derived class???')
+ device = {}
+ # device['model'] = 'TODO: Adtran PizzaBox, YUM'
+ # device['hardware_version'] = 'TODO: H/W Version'
+ # device['firmware_version'] = 'TODO: S/W Version'
+ # device['software_version'] = 'TODO: S/W Version'
+ # device['serial_number'] = 'TODO: Serial Number'
+ # device['vendor'] = 'Adtran, Inc.'
+
+ returnValue(device)
def start_heartbeat(self, delay=10):
assert delay > 1
diff --git a/voltha/adapters/adtran_olt/adtran_olt.py b/voltha/adapters/adtran_olt/adtran_olt.py
index 3f23288..32d34fd 100644
--- a/voltha/adapters/adtran_olt/adtran_olt.py
+++ b/voltha/adapters/adtran_olt/adtran_olt.py
@@ -68,7 +68,6 @@
:return: (None or Deferred)
"""
- log.debug('starting', interface=self.interface)
log.info('started', interface=self.interface)
def stop(self):
@@ -78,7 +77,6 @@
:return: (None or Deferred)
"""
- log.debug('stopping', interface=self.interface)
log.info('stopped', interface=self.interface)
def adapter_descriptor(self):
@@ -140,16 +138,21 @@
reactor.callLater(0, self.devices_handlers[device.id].activate, device)
return device
-
def reconcile_device(self, device):
"""
- TODO: Is invoked whenever a Voltha instance is started using data
- from a failed instance.
- :param device:
- :return:
- """
- raise NotImplementedError()
+ Make sure the adapter looks after given device. Called when this device has
+ changed ownership from another Voltha instance to this one (typically, this
+ occurs when the previous voltha instance went down).
+ :param device: A voltha.Device object, with possible device-type specific
+ extensions. Such extensions shall be described as part of
+ the device type specification returned by device_types().
+ :return: (Deferred) Shall be fired to acknowledge device ownership.
+ """
+ log.info('reconcile-device', device=device)
+ self.devices_handlers[device.id] = AdtranOltHandler(self, device.id)
+ reactor.callLater(0, self.devices_handlers[device.id].activate, device, reconciling=True)
+ return device
def abandon_device(self, device):
"""
@@ -160,12 +163,13 @@
:return: (Deferred) Shall be fired to acknowledge abandonment.
"""
log.info('abandon-device', device=device)
- handler = self.devices_handlers.pop(device.id)
-
- if handler is not None:
- reactor.callLater(0, handler.deactivate, device)
-
- return device
+ raise NotImplementedError()
+ # handler = self.devices_handlers.pop(device.id)
+ #
+ # if handler is not None:
+ # reactor.callLater(0, handler.deactivate, device)
+ #
+ # return device
def disable_device(self, device):
"""
@@ -176,7 +180,8 @@
:return: (Deferred) Shall be fired to acknowledge disabling the device.
"""
log.debug('disable_device', device=device)
- raise NotImplementedError()
+ reactor.callLater(0, self.devices_handlers[device.id].disable)
+ return device
def reenable_device(self, device):
"""
@@ -187,7 +192,8 @@
:return: (Deferred) Shall be fired to acknowledge re-enabling the device.
"""
log.debug('reenable_device', device=device)
- raise NotImplementedError()
+ reactor.callLater(0, self.devices_handlers[device.id].reenable)
+ return device
def reboot_device(self, device):
"""
@@ -198,7 +204,8 @@
:return: (Deferred) Shall be fired to acknowledge the reboot.
"""
log.info('reboot_device', device=device)
- raise NotImplementedError()
+ reactor.callLater(0, self.devices_handlers[device.id].reboot)
+ return device
def self_test_device(self, device):
"""
@@ -218,7 +225,8 @@
:return: (Deferred) Shall be fired to acknowledge the deletion.
"""
log.info('delete_device', device=device)
- raise NotImplementedError()
+ reactor.callLater(0, self.devices_handlers[device.id].delete)
+ return device
def get_device_details(self, device):
"""
@@ -245,7 +253,7 @@
log.info('bulk-flow-update', device_id=device.id, flows=flows,
groups=groups)
assert len(groups.items) == 0, "Cannot yet deal with groups"
- raise NotImplementedError()
+
handler = self.devices_handlers[device.id]
return handler.update_flow_table(flows.items, device)
diff --git a/voltha/adapters/adtran_olt/adtran_olt_handler.py b/voltha/adapters/adtran_olt/adtran_olt_handler.py
index f037565..03046b4 100644
--- a/voltha/adapters/adtran_olt/adtran_olt_handler.py
+++ b/voltha/adapters/adtran_olt/adtran_olt_handler.py
@@ -22,14 +22,11 @@
from adtran_device_handler import AdtranDeviceHandler
from codec.olt_state import OltState
+from flow.flow_entry import FlowEntry
from net.adtran_zmq import AdtranZmqClient
from voltha.extensions.omci.omci import *
-from voltha.protos.common_pb2 import OperStatus, AdminState
+from voltha.protos.common_pb2 import AdminState
from voltha.protos.device_pb2 import Device
-from voltha.protos.openflow_13_pb2 import OFPPF_100GB_FD, OFPPF_FIBER, OFPPS_LIVE
-
-
-# from ncclient import manager
class AdtranOltHandler(AdtranDeviceHandler):
@@ -85,7 +82,48 @@
AdtranDeviceHandler.__del__(self)
def __str__(self):
- return "AdtranOltHandler: {}:{}".format(self.ip_address, self.rest_port)
+ return "AdtranOltHandler: {}".format(self.ip_address)
+
+ @inlineCallbacks
+ def get_device_info(self, device):
+ """
+ Perform an initial network operation to discover the device hardware
+ and software version. Serial Number would be helpful as well.
+
+ Upon successfully retrieving the information, remember to call the
+ 'start_heartbeat' method to keep in contact with the device being managed
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions. Such extensions shall be described as part of
+ the device type specification returned by device_types().
+ """
+ from codec.physical_entities_state import PhysicalEntitiesState
+
+ device = {}
+
+ if self.is_virtual_olt:
+ returnValue(device)
+
+ pe_state = PhysicalEntitiesState(self.netconf_client)
+ self.startup = pe_state.get_state()
+ results = yield self.startup
+
+ modules = pe_state.get_physical_entities('adtn-phys-mod:module')
+ if isinstance(modules, list):
+ module = modules[0]
+ name = str(module['model-name']).translate(None, '?')
+ model = str(module['model-number']).translate(None, '?')
+
+ device['model'] = '{} - {}'.format(name, model) if len(name) > 0 else \
+ module['parent-entity']
+ device['hardware_version'] = str(module['hardware-revision']).translate(None, '?')
+ device['serial_number'] = str(module['serial-number']).translate(None, '?')
+ device['vendor'] = 'Adtran, Inc.'
+ software = module['software']['software']
+ device['firmware_version'] = str(software['startup-revision']).translate(None, '?')
+ device['software_version'] = str(software['running-revision']).translate(None, '?')
+
+ returnValue(device)
@inlineCallbacks
def enumerate_northbound_ports(self, device):
@@ -96,40 +134,24 @@
specific extensions.
:return: (Deferred or None).
"""
- # TODO: For now, hard code some JSON. Eventually will be XML from NETConf
+ try:
+ from codec.ietf_interfaces import IetfInterfacesState
+ from nni_port import MockNniPort
- ports = [
- {'port_no': 1,
- 'admin_state': AdminState.ENABLED,
- 'oper_status': OperStatus.ACTIVE,
- 'ofp_state': OFPPS_LIVE,
- 'ofp_capabilities': OFPPF_100GB_FD | OFPPF_FIBER,
- 'current_speed': OFPPF_100GB_FD,
- 'max_speed': OFPPF_100GB_FD},
- {'port_no': 2,
- 'admin_state': AdminState.ENABLED,
- 'oper_status': OperStatus.ACTIVE,
- 'ofp_state': OFPPS_LIVE,
- 'ofp_capabilities': OFPPF_100GB_FD | OFPPF_FIBER,
- 'current_speed': OFPPF_100GB_FD,
- 'max_speed': OFPPF_100GB_FD},
- {'port_no': 3,
- 'admin_state': AdminState.ENABLED,
- 'oper_status': OperStatus.ACTIVE,
- 'ofp_state': OFPPS_LIVE,
- 'ofp_capabilities': OFPPF_100GB_FD | OFPPF_FIBER,
- 'current_speed': OFPPF_100GB_FD,
- 'max_speed': OFPPF_100GB_FD},
- {'port_no': 4,
- 'admin_state': AdminState.ENABLED,
- 'oper_status': OperStatus.ACTIVE,
- 'ofp_state': OFPPS_LIVE,
- 'ofp_capabilities': OFPPF_100GB_FD | OFPPF_FIBER,
- 'current_speed': OFPPF_100GB_FD,
- 'max_speed': OFPPF_100GB_FD}
- ]
+ if self.is_virtual_olt:
+ results = MockNniPort.get_nni_port_state_results()
+ else:
+ ietf_interfaces = IetfInterfacesState(self.netconf_client)
+ self.startup = ietf_interfaces.get_state()
+ results = yield self.startup
- yield returnValue(ports)
+ ports = ietf_interfaces.get_nni_port_entries(results)
+
+ yield returnValue(ports)
+
+ except Exception as e:
+ log.exception('enumerate_northbound_ports', e=e)
+ raise
def process_northbound_ports(self, device, results):
"""
@@ -141,14 +163,15 @@
you implemented. The type and contents are up to you to
:return: (Deferred or None).
"""
- from nni_port import NniPort
+ from nni_port import NniPort, MockNniPort
for port in results:
port_no = port['port_no']
self.log.info('Processing northbound port {}/{}'.format(port_no, port['port_no']))
assert port_no
assert port_no not in self.northbound_ports
- self.northbound_ports[port_no] = NniPort(self, **port)
+ self.northbound_ports[port_no] = NniPort(self, **port) if not self.is_virtual_olt \
+ else MockNniPort(self, **port)
self.num_northbound_ports = len(self.northbound_ports)
@@ -182,11 +205,9 @@
from pon_port import PonPort
for pon in results:
-
# Number PON Ports after the NNI ports
pon_id = pon['pon-id']
log.info('Processing pon port {}'.format(pon_id))
-
assert pon_id not in self.southbound_ports
admin_state = AdminState.ENABLED if pon.get('enabled',
@@ -198,13 +219,12 @@
admin_state=admin_state)
# TODO: For now, limit number of PON ports to make debugging easier
-
if len(self.southbound_ports) >= self.max_ports:
break
self.num_southbound_ports = len(self.southbound_ports)
- def complete_device_specific_activation(self, device, results):
+ def complete_device_specific_activation(self, device, reconciling):
"""
Perform an initial network operation to discover the device hardware
and software version. Serial Number would be helpful as well.
@@ -215,9 +235,8 @@
specific extensions. Such extensions shall be described as part of
the device type specification returned by device_types().
- :param results: (dict) original adtran-hello RESTCONF results body
+ :param reconciling: (boolean) True if taking over for another VOLTHA
"""
- #
# For the pizzabox OLT, periodically query the OLT state of all PONs. This
# is simpler then having each PON port do its own poll. From this, we can:
#
@@ -228,16 +247,54 @@
# o TODO Update some PON level statistics
self.zmq_client = AdtranZmqClient(self.ip_address, self.rx_packet)
- # self.nc_client = manager.connect(host='', # self.ip_address,
- # username=self.rest_username,
- # password=self.rest_password,
- # hostkey_verify=False,
- # allow_agent=False,
- # look_for_keys=False)
-
self.status_poll = reactor.callLater(1, self.poll_for_status)
return None
+ def disable(self):
+ c, self.zmq_client = self.zmq_client, None
+ if c is not None:
+ c.shutdown()
+
+ d, self.status_poll = self.status_poll, None
+ if d is not None:
+ d.cancel()
+
+ super(AdtranOltHandler, self).disable()
+
+ def reenable(self):
+ super(AdtranOltHandler, self).reenable()
+
+ self.zmq_client = AdtranZmqClient(self.ip_address, self.rx_packet)
+ self.status_poll = reactor.callLater(1, self.poll_for_status)
+
+ def reboot(self):
+ c, self.zmq_client = self.zmq_client, None
+ if c is not None:
+ c.shutdown()
+
+ d, self.status_poll = self.status_poll, None
+ if d is not None:
+ d.cancel()
+
+ super(AdtranOltHandler, self).reboot()
+
+ def _finish_reboot(self, timeout, previous_oper_status, previous_conn_status):
+ super(AdtranOltHandler, self)._finish_reboot(timeout, previous_oper_status, previous_conn_status)
+
+ self.zmq_client = AdtranZmqClient(self.ip_address, self.rx_packet)
+ self.status_poll = reactor.callLater(1, self.poll_for_status)
+
+ def delete(self):
+ c, self.zmq_client = self.zmq_client, None
+ if c is not None:
+ c.shutdown()
+
+ d, self.status_poll = self.status_poll, None
+ if d is not None:
+ d.cancel()
+
+ super(AdtranOltHandler, self).delete()
+
def rx_packet(self, message):
try:
self.log.info('rx_Packet: Message from ONU')
@@ -314,7 +371,18 @@
@inlineCallbacks
def update_flow_table(self, flows, device):
self.log.info('bulk-flow-update', device_id=device.id, flows=flows)
- raise NotImplementedError('TODO: Not yet implemented')
+
+ for flow in flows:
+ # TODO: Do we get duplicates here (ie all flows re-pushed on each individual flow add?)
+
+ flow_entry = FlowEntry.create(flow, self)
+
+ if flow_entry is not None:
+ flow_entry.install()
+
+ if flow_entry.name not in self.flow_entries:
+ # TODO: Do we get duplicates here (ie all flows re-pushed on each individual flow add?)
+ self.flow_entries[flow_entry.name] = flow_entry
@inlineCallbacks
def send_proxied_message(self, proxy_address, msg):
diff --git a/voltha/adapters/adtran_olt/codec/ietf_interfaces.py b/voltha/adapters/adtran_olt/codec/ietf_interfaces.py
new file mode 100644
index 0000000..e041040
--- /dev/null
+++ b/voltha/adapters/adtran_olt/codec/ietf_interfaces.py
@@ -0,0 +1,283 @@
+from twisted.internet.defer import inlineCallbacks, returnValue
+import xmltodict
+import structlog
+from voltha.protos.openflow_13_pb2 import OFPPF_1GB_FD, OFPPF_10GB_FD, OFPPF_40GB_FD, OFPPF_100GB_FD
+from voltha.protos.openflow_13_pb2 import OFPPF_FIBER, OFPPF_COPPER
+from voltha.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPC_PORT_DOWN, OFPPS_LINK_DOWN, OFPPF_OTHER
+from voltha.protos.common_pb2 import OperStatus, AdminState
+
+log = structlog.get_logger()
+
+_ietf_interfaces_config_rpc = """
+ <filter xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ <interfaces xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">
+ <interface/>
+ </interfaces>
+ </filter>
+"""
+
+_ietf_interfaces_state_rpc = """
+ <filter xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ <interfaces-state xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">
+ <interface>
+ <name/>
+ <type/>
+ <admin-status/>
+ <oper-status/>
+ <last-change/>
+ <phys-address/>
+ <speed/>
+ </interface>
+ </interfaces-state>
+ </filter>
+"""
+
+_allowed_with_default_types = ['report-all', 'report-all-tagged', 'trim', 'explicit']
+
+# TODO: Centralize the item below as a function in a core util module
+
+
+def _with_defaults(default_type=None):
+ if default_type is None:
+ return ""
+
+ assert(default_type in _allowed_with_default_types)
+ return """
+ <with-defaults xmlns="urn:ietf:params:xml:ns:yang:ietf-netconf-with-defaults">
+ {}</with-defaults>""".format(default_type)
+
+
+class IetfInterfacesConfig(object):
+ def __init__(self, session):
+ self._session = session
+
+ @inlineCallbacks
+ def get_config(self, source='running', with_defaults=None):
+
+ filter = _ietf_interfaces_config_rpc + _with_defaults(with_defaults)
+
+ request = self._session.get(source, filter=filter)
+ rpc_reply = yield request
+ returnValue(rpc_reply)
+
+ def get_interfaces(self, rpc_reply, interface_type=None):
+ """
+ Get the physical entities of a particular type
+ :param rpc_reply: Reply from previous get or request
+ :param interface_type: (String or List) The type of interface (case-insensitive)
+ :return: list) of OrderDict interface entries
+ """
+ result_dict = xmltodict.parse(rpc_reply.data_xml)
+
+ entries = result_dict['data']['interfaces']
+
+ if interface_type is None:
+ return entries
+
+ for entry in entries:
+ import pprint
+ log.info(pprint.PrettyPrinter(indent=2).pformat(entry))
+
+ def _matches(entry, value):
+ if 'type' in entry and '#text' in entry['type']:
+ text_val = entry['type']['#text'].lower()
+ if isinstance(value, list):
+ return any(v.lower() in text_val for v in value)
+ return value.lower() in text_val
+ return False
+
+ return [entry for entry in entries if _matches(entry, interface_type)]
+
+
+class IetfInterfacesState(object):
+ def __init__(self, session):
+ self._session = session
+
+ @inlineCallbacks
+ def get_state(self):
+ try:
+ request = self._session.get(_ietf_interfaces_state_rpc)
+ rpc_reply = yield request
+ returnValue(rpc_reply)
+
+ except Exception as e:
+ log.exception('get_state', e=e)
+ raise
+
+ @staticmethod
+ def get_interfaces(self, rpc_reply, key='type', key_value=None):
+ """
+ Get the physical entities of a particular type
+ :param key_value: (String or List) The type of interface (case-insensitive)
+ :return: list) of OrderDict interface entries
+ """
+ result_dict = xmltodict.parse(rpc_reply.data_xml)
+ entries = result_dict['data']['interfaces-state']['interface']
+
+ if key_value is None:
+ return entries
+
+ for entry in entries:
+ import pprint
+ log.info(pprint.PrettyPrinter(indent=2).pformat(entry))
+
+ def _matches(entry, key, value):
+ if key in entry and '#text' in entry[key]:
+ text_val = entry[key]['#text'].lower()
+ if isinstance(value, list):
+ return any(v.lower() in text_val for v in value)
+ return value.lower() in text_val
+ return False
+
+ return [entry for entry in entries if _matches(entry, key, key_value)]
+
+ @staticmethod
+ def _get_admin_state(entry):
+ state_map = {
+ 'up': AdminState.ENABLED,
+ 'down': AdminState.DISABLED,
+ 'testing': AdminState.DISABLED
+ }
+ return state_map.get(entry.get('admin-status', 'down'),
+ AdminState.UNKNOWN)
+
+ @staticmethod
+ def _get_oper_status(entry):
+ state_map = {
+ 'up': OperStatus.ACTIVE,
+ 'down': OperStatus.FAILED,
+ 'testing': OperStatus.TESTING,
+ 'unknown': OperStatus.UNKNOWN,
+ 'dormant': OperStatus.DISCOVERED,
+ 'not-present': OperStatus.UNKNOWN,
+ 'lower-layer-down': OperStatus.FAILED
+ }
+ return state_map.get(entry.get('oper-status', 'down'),
+ OperStatus.UNKNOWN)
+
+ @staticmethod
+ def _get_mac_addr(entry):
+ mac_addr = entry.get('phys-address', None)
+ if mac_addr is None:
+ import random
+ # TODO: Get with qumram team about phys addr
+ mac_addr = '08:00:{}{}:{}{}:{}{}:00'.format(random.randint(0, 9),
+ random.randint(0, 9),
+ random.randint(0, 9),
+ random.randint(0, 9),
+ random.randint(0, 9),
+ random.randint(0, 9))
+ return mac_addr
+
+ @staticmethod
+ def _get_speed_value(entry):
+ speed = entry.get('speed') or IetfInterfacesState._get_speed_via_name(entry.get('name'))
+ if isinstance(speed, str):
+ return long(speed)
+ return speed
+
+ @staticmethod
+ def _get_speed_via_name(name):
+ speed_map = {
+ 'terabit': 1000000000000,
+ 'hundred-gigabit': 100000000000,
+ 'fourty-gigabit': 40000000000,
+ 'ten-gigabit': 10000000000,
+ 'gigabit': 1000000000,
+ }
+ for n,v in speed_map.iteritems():
+ if n in name.lower():
+ return v
+ return 0
+
+ @staticmethod
+ def _get_of_state(entry):
+ # If port up and ready: OFPPS_LIVE
+ # If port config bit is down: OFPPC_PORT_DOWN
+ # If port state bit is down: OFPPS_LINK_DOWN
+ if IetfInterfacesState._get_admin_state(entry) == AdminState.ENABLED:
+ return OFPPS_LIVE \
+ if IetfInterfacesState._get_oper_status(entry) == OperStatus.ACTIVE \
+ else OFPPS_LINK_DOWN
+
+ return OFPPC_PORT_DOWN
+
+ @staticmethod
+ def _get_of_capabilities(entry):
+ # The capabilities field is a bitmap that uses a combination of the following flags :
+ # Capabilities supported by the datapath
+ # enum ofp_capabilities {
+ # OFPC_FLOW_STATS = 1 << 0, /* Flow statistics. */
+ # OFPC_TABLE_STATS = 1 << 1, /* Table statistics. */
+ # OFPC_PORT_STATS = 1 << 2, /* Port statistics. */
+ # OFPC_GROUP_STATS = 1 << 3, /* Group statistics. */
+ # OFPC_IP_REASM = 1 << 5, /* Can reassemble IP fragments. */
+ # OFPC_QUEUE_STATS = 1 << 6, /* Queue statistics. */
+ # OFPC_PORT_BLOCKED = 1 << 8, /* Switch will block looping ports. */
+ # OFPC_BUNDLES = 1 << 9, /* Switch supports bundles. */
+ # OFPC_FLOW_MONITORING = 1 << 10, /* Switch supports flow monitoring. */
+ # }
+ # enum ofp_port_features {
+ # OFPPF_10MB_HD = 1 << 0, /* 10 Mb half-duplex rate support. */
+ # OFPPF_10MB_FD = 1 << 1, /* 10 Mb full-duplex rate support. */
+ # OFPPF_100MB_HD = 1 << 2, /* 100 Mb half-duplex rate support. */
+ # OFPPF_100MB_FD = 1 << 3, /* 100 Mb full-duplex rate support. */
+ # OFPPF_1GB_HD = 1 << 4, /* 1 Gb half-duplex rate support. */
+ # OFPPF_1GB_FD = 1 << 5, /* 1 Gb full-duplex rate support. */
+ # OFPPF_10GB_FD = 1 << 6, /* 10 Gb full-duplex rate support. */
+ # OFPPF_40GB_FD = 1 << 7, /* 40 Gb full-duplex rate support. */
+ # OFPPF_100GB_FD = 1 << 8, /* 100 Gb full-duplex rate support. */
+ # OFPPF_1TB_FD = 1 << 9, /* 1 Tb full-duplex rate support. */
+ # OFPPF_OTHER = 1 << 10, /* Other rate, not in the list. */
+ # OFPPF_COPPER = 1 << 11, /* Copper medium. */
+ # OFPPF_FIBER = 1 << 12, /* Fiber medium. */
+ # OFPPF_AUTONEG = 1 << 13, /* Auto-negotiation. */
+ # OFPPF_PAUSE = 1 << 14, /* Pause. */
+ # OFPPF_PAUSE_ASYM = 1 << 15 /* Asymmetric pause. */
+ # }
+ # TODO: Look into adtran-physical-entities and decode xSFP type any other settings
+ return IetfInterfacesState._get_of_speed(entry) | OFPPF_FIBER
+
+ @staticmethod
+ def _get_of_speed(entry):
+ speed = IetfInterfacesState._get_speed_value(entry)
+ speed_map = {
+ 1000000000: OFPPF_1GB_FD,
+ 10000000000: OFPPF_10GB_FD,
+ 40000000000: OFPPF_40GB_FD,
+ 100000000000: OFPPF_100GB_FD,
+ }
+ return speed_map.get(speed, OFPPF_OTHER)
+
+ @staticmethod
+ def get_nni_port_entries(rpc_reply, nni_type='ethernet'):
+ """
+ Get the port entries that make up the northbound interfaces
+
+ :param rpc_reply:
+ :param nni_type:
+ :return:
+ """
+ port_no = 1
+ ports = []
+ result_dict = xmltodict.parse(rpc_reply.data_xml)
+ entries = result_dict['data']['interfaces-state']['interface']
+ nni_ports = [entry for entry in entries if 'name' in entry and nni_type in entry['name']]
+
+ for entry in nni_ports:
+ port = {
+ 'port_no': port_no,
+ 'name': entry.get('name', 'unknown'),
+ # 'label': None,
+ 'mac_address': IetfInterfacesState._get_mac_addr(entry),
+ 'admin_state': IetfInterfacesState._get_admin_state(entry),
+ 'oper_status': IetfInterfacesState._get_oper_status(entry),
+ 'ofp_state': IetfInterfacesState._get_of_state(entry),
+ 'ofp_capabilities': IetfInterfacesState._get_of_capabilities(entry),
+ 'current_speed': IetfInterfacesState._get_of_speed(entry),
+ 'max_speed': IetfInterfacesState._get_of_speed(entry),
+ }
+ ports.append(port)
+ port_no += 1
+
+ return ports
diff --git a/voltha/adapters/adtran_olt/codec/olt_state.py b/voltha/adapters/adtran_olt/codec/olt_state.py
index 32b43aa..8ab07db 100644
--- a/voltha/adapters/adtran_olt/codec/olt_state.py
+++ b/voltha/adapters/adtran_olt/codec/olt_state.py
@@ -146,8 +146,8 @@
mobility protecting state.
onu-bit-octects:
type binary { length "4 .. 1024"; }
- description each bit position indicates corresponding ONU's status
- (true or false) whether that ONU's is in
+ description each bit position indicates corresponding ONU's status
+ (true or false) whether that ONU's is in
wavelength mobility protecting state or not
For 128 ONTs per PON, the size of this
array will be 16. onu-bit-octects[0] and MSB bit in that byte
@@ -164,7 +164,7 @@
def discovered_onu(self):
"""
Immutable Set of each Optical Network Unit(ONU) that has been activated via discovery
- key/value: serial-number (string)
+ key/value: serial-number (string)
"""
return frozenset([sn['serial-number'] for sn in self._packet.get('discovered-onu', [])
if 'serial-number' in sn])
diff --git a/voltha/adapters/adtran_olt/codec/physical_entities_state.py b/voltha/adapters/adtran_olt/codec/physical_entities_state.py
new file mode 100644
index 0000000..ed0156f
--- /dev/null
+++ b/voltha/adapters/adtran_olt/codec/physical_entities_state.py
@@ -0,0 +1,66 @@
+from ..net.adtran_netconf import adtran_module_url
+from twisted.internet.defer import inlineCallbacks, returnValue
+import xmltodict
+import structlog
+
+log = structlog.get_logger()
+
+_phys_entities_rpc = """
+ <filter xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ <physical-entities-state xmlns="{}">
+ <physical-entity/>
+ </physical-entities-state>
+ </filter>
+ """.format(adtran_module_url('adtran-physical-entities'))
+
+
+class PhysicalEntitiesState(object):
+ def __init__(self, session):
+ self._session = session
+ self._rpc_reply = None
+
+ @inlineCallbacks
+ def get_state(self):
+ self._rpc_reply = None
+ request = self._session.get(_phys_entities_rpc)
+ self._rpc_reply = yield request
+ returnValue(self._rpc_reply)
+
+ @property
+ def physical_entities(self):
+ """
+ :return: (list) of OrderDict physical entities
+ """
+ if self._rpc_reply is None:
+ # TODO: Support auto-get?
+ return None
+
+ result_dict = xmltodict.parse(self._rpc_reply.data_xml)
+ return result_dict['data']['physical-entities-state']['physical-entity']
+
+ def get_physical_entities(self, classification=None):
+ """
+ Get the physical entities of a particular type
+ :param classification: (String or List) The classification or general hardware type of the
+ component identified by this physical entity
+ (case-insensitive)
+ :return: (list) of OrderDict physical entities
+ """
+ entries = self.physical_entities
+
+ if classification is None:
+ return entries
+
+ # for entry in entries:
+ # import pprint
+ # log.info(pprint.PrettyPrinter(indent=2).pformat(entry))
+
+ def _matches(entry, value):
+ if 'classification' in entry and '#text' in entry['classification']:
+ text_val = entry['classification']['#text'].lower()
+ if isinstance(value, list):
+ return any(v.lower() in text_val for v in value)
+ return value.lower() in text_val
+ return False
+
+ return [entry for entry in entries if _matches(entry, classification)]
diff --git a/voltha/adapters/adtran_olt/flow/__init__.py b/voltha/adapters/adtran_olt/flow/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/voltha/adapters/adtran_olt/flow/__init__.py
diff --git a/voltha/adapters/adtran_olt/flow/acl.py b/voltha/adapters/adtran_olt/flow/acl.py
new file mode 100644
index 0000000..6228afc
--- /dev/null
+++ b/voltha/adapters/adtran_olt/flow/acl.py
@@ -0,0 +1,174 @@
+#
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import random
+
+from enum import Enum
+import structlog
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+from voltha.core.logical_device_agent import mac_str_to_tuple
+from voltha.protos.common_pb2 import OperStatus, AdminState
+from voltha.protos.device_pb2 import Port
+from voltha.protos.logical_device_pb2 import LogicalPort
+from voltha.protos.openflow_13_pb2 import OFPPF_100GB_FD, OFPPF_FIBER, OFPPS_LIVE, ofp_port
+
+import voltha.core.flow_decomposer as fd
+
+log = structlog.get_logger()
+
+_acl_list = {} # Key -> Name: List of encoded EVCs
+
+
+class ACL(object):
+ """
+ Class to wrap Trap-to-Controller functionality
+ """
+
+ def __init__(self, flow_entry):
+ self._installed = False
+ self._status_message = None
+ self._parent = flow_entry # FlowEntry parent
+ self._flow = flow_entry.flow
+ self._handler = flow_entry.handler
+ self._name = None
+
+ self._valid = self._decode()
+
+ @staticmethod
+ def create(flow_entry):
+ pass # TODO: Start here Thursday
+
+ @staticmethod
+ def flow_to_name(flow, handler):
+ return 'ACL-{}-{}'.format(flow.id, handler.id)
+
+ @property
+ def valid(self):
+ return self._valid
+
+ @property
+ def installed(self):
+ return self._installed
+
+ @property
+ def status(self):
+ return self._status_message
+
+ def install(self):
+ if not self._installed:
+ if self._name in _acl_list:
+ self._status_message = "ACL '{}' already is installed".format(self.name)
+ raise Exception(self._status_message) # TODO: A unique exception type would work here
+
+ raise NotImplemented('TODO: Implement this')
+
+ self._installed = True
+ _acl_list[self.name] = self
+ pass
+
+ return self._installed
+
+ def remove(self):
+ if self._installed:
+ raise NotImplemented('TODO: Implement this')
+
+ self._installed = False
+ _acl_list.pop(self._name)
+ pass
+
+ return not self._installed
+
+ def enable(self):
+ if not self._enabled:
+ raise NotImplemented("TODO: Implement this")
+ self._enabled = False
+
+ def disable(self):
+ if self._enabled:
+ raise NotImplemented("TODO: Implement this")
+ self._enabled = True
+
+ def _decode(self):
+ """
+ Examine flow rules and extract appropriate settings for both this EVC
+ and creates any EVC-Maps required.
+ """
+ self._name = ACL.flow_to_name(self._flow, self._handler)
+
+ # Determine this flow's type
+
+ status = self._decode_traffic_selector() and self._decode_traffic_treatment()
+
+ if status:
+ pass # TODO
+
+ if status:
+ pass # TODO
+ else:
+ pass # TODO
+
+ return status
+
+ def _is_men_port(self, port):
+ return port in self._handler.northbound_ports(port)
+
+ def _is_uni_port(self, port):
+ return port in self._handler.southbound_ports(port)
+
+ def _is_logical_port(self, port):
+ return not self._is_men_port(port) and not self._is_uni_port(port)
+
+ def _get_port_name(self, port):
+ if self._is_logical_port(port):
+ raise NotImplemented('TODO: Logical ports not yet supported')
+
+ if self._is_men_port(port):
+ return self._handler.northbound_ports[port].name
+
+ return None
+
+ def _decode_traffic_selector(self):
+ """
+ Extract EVC related traffic selection settings
+ """
+ in_port = fd.get_in_port(self._flow)
+ assert in_port is not None
+
+ return True
+
+ def _decode_traffic_treatment(self):
+ out_port = fd.get_out_port(self._flow)
+
+ return True
+
+ # BULK operations
+
+ @staticmethod
+ def enable_all():
+ raise NotImplemented("TODO: Implement this")
+
+ @staticmethod
+ def disable_all():
+ raise NotImplemented("TODO: Implement this")
+
+ @staticmethod
+ def remove_all():
+ """
+ Remove all ACLs from hardware
+ """
+ raise NotImplemented("TODO: Implement this")
+
diff --git a/voltha/adapters/adtran_olt/flow/evc.py b/voltha/adapters/adtran_olt/flow/evc.py
new file mode 100644
index 0000000..27ce42f
--- /dev/null
+++ b/voltha/adapters/adtran_olt/flow/evc.py
@@ -0,0 +1,292 @@
+#
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import random
+
+from enum import Enum
+import structlog
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+from voltha.core.logical_device_agent import mac_str_to_tuple
+from voltha.protos.common_pb2 import OperStatus, AdminState
+from voltha.protos.device_pb2 import Port
+from voltha.protos.logical_device_pb2 import LogicalPort
+from voltha.protos.openflow_13_pb2 import OFPPF_100GB_FD, OFPPF_FIBER, OFPPS_LIVE, ofp_port
+
+import voltha.core.flow_decomposer as fd
+
+log = structlog.get_logger()
+
+_evc_list = {} # Key -> Name: List of encoded EVCs
+
+EVC_NAME_FORMAT = 'EVC-VOLTHA-{}-{}'
+EVC_NAME_REGEX = 'EVC-VOLTHA-{}'.format('regex-here')
+DEFAULT_STPID = 0x8100
+
+
+class EVC(object):
+ """
+ Class to wrap EVC functionality
+ """
+ class SwitchingMethod(Enum):
+ SINGLE_TAGGED = 0
+ DOUBLE_TAGGED = 1
+ MAC_SWITCHED = 2
+
+ class Men2UniManipulation(Enum):
+ SYMETRIC = 0
+ POP_OUT_TAG_ONLY = 1
+
+ class ElineFlowType(Enum):
+ NNI_TO_UNI = 0,
+ UNI_TO_NNI = 1,
+ NNI_TO_NNI = 2,
+ ACL_FILTER = 3,
+ UNKNOWN = 4,
+ UNSUPPORTED = 5 # Or Invalid
+
+ def __init__(self, flow_entry):
+ self._installed = False
+ self._status_message = None
+ self._parent = flow_entry # FlowEntry parent
+ self._flow = flow_entry.flow
+ self._handler = flow_entry.handler
+ self._evc_maps = [] # One if E-Line
+
+ self._flow_type = EVC.ElineFlowType.UNKNOWN
+
+ # EVC related properties
+ self._name = EVC.flow_to_name(flow_entry.flow, flow_entry.handler)
+ self._enabled = True
+ self._ce_vlan_preservation = True
+ self._men_ports = []
+ self._s_tag = -1
+ self._stpid = DEFAULT_STPID
+
+ self._switching_method = EVC.SwitchingMethod.SINGLE_TAGGED
+ self._men_to_uni_tag_manipulation = EVC.Men2UniManipulation.SYMETRIC
+
+ self._valid = self._decode()
+
+ @staticmethod
+ def flow_to_name(flow, handler):
+ return EVC_NAME_FORMAT.format(flow.id, handler.id)
+
+ @staticmethod
+ def create(flow_entry):
+ # Does it already exist?
+
+ evc = _evc_list.get(EVC.flow_to_name(flow_entry.flow, flow_entry.handler))
+
+ if evc is None:
+ evc = EVC(flow_entry.flow, flow_entry.handler)
+
+ if evc is not None:
+ pass # Look up any EVC that
+ return
+ pass # Start decode here
+
+ return evc
+
+ @property
+ def valid(self):
+ return self._valid
+
+ @property
+ def installed(self):
+ return self._installed
+
+ @property
+ def status(self):
+ return self._status_message
+
+ def install(self):
+ if not self._installed:
+ if self._name in _evc_list:
+ self._status_message = "EVC '{}' already is installed".format(self._name)
+ raise Exception(self._status_message) # TODO: A unique exception type would work here
+
+ raise NotImplemented('TODO: Implement this')
+ # xml = '<config xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">' \
+ # '<evcs xmlns="http://www.adtran.com/ns/yang/adtran-evcs">' \
+ # '<adtn-evc:evc xmlns:adtn-evc="http://www.adtran.com/ns/yang/adtran-evcs">'
+ #
+ # xml += '<adtn-evc:name>' + name + '</adtn-evc:name>'
+ #
+ # if stag:
+ # xml += '<adtn-evc:stag>' + stag + '</adtn-evc:stag>'
+ #
+ # if preserve:
+ # xml += '<adtn-evc:ce-vlan-preservation>' + preserve + '</adtn-evc:ce-vlan-preservation>'
+ #
+ # if enabled:
+ # xml += '<adtn-evc:enabled>' + enabled + '</adtn-evc:enabled>'
+ # else:
+ # xml += '<adtn-evc:enabled>' + "true" + '</adtn-evc:enabled>'
+ #
+ # xml += '</adtn-evc:evc></evc></config>'
+ #
+ # print "Creating EVC %s" % name
+ #
+ # print mgr.mgr.edit_config(target="running",
+ # config=xml,
+ # default_operation="merge",
+ # format="xml")
+
+ self._installed = True
+ _evc_list[self.name] = self
+ pass
+
+ return self._installed
+
+ def remove(self):
+ if self._installed:
+ raise NotImplemented('TODO: Implement this')
+ # xml = '<config xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">' \
+ # '<evcs xmlns="http://www.adtran.com/ns/yang/adtran-evcs">' \
+ # '<adtn-evc:evc xmlns:adtn-evc="http://www.adtran.com/ns/yang/adtran-evcs" nc:operation="delete">'
+ #
+ # xml += '<adtn-evc:name>' + name + '</adtn-evc:name>'
+ #
+ # xml += '</adtn-evc:evc></evc></config>'
+ #
+ # print "Deleting EVC %s" % name
+ #
+ # print mgr.mgr.edit_config(target="running",
+ # config=xml,
+ # default_operation="merge",
+ # format="xml")
+
+ self._installed = False
+ _evc_list.pop(self.name)
+ pass
+
+ return not self._installed
+
+ def enable(self):
+ if not self._enabled:
+ raise NotImplemented("TODO: Implement this")
+ self._enabled = False
+
+ def disable(self):
+ if self._enabled:
+ raise NotImplemented("TODO: Implement this")
+ self._enabled = True
+
+ def _decode(self):
+ """
+ Examine flow rules and extract appropriate settings for both this EVC
+ and creates any EVC-Maps required.
+ """
+ from evc_map import EVCMap
+
+ # Determine this flow's type
+
+ status = self._decode_traffic_selector() and self._decode_traffic_treatment()
+
+ if status:
+ ingress_map = EVCMap.createIngressMap(self._flow, self._device)
+ egress_map = EVCMap.createEgressMap(self._flow, self._device)
+
+ status = ingress_map.valid and egress_map.valid
+
+ if status:
+ self._evc_maps.append(ingress_map)
+ self._evc_maps.append(egress_map)
+ else:
+ self._status_message = 'Ingress MAP invalid: {}'.format(ingress_map.status)\
+ if not ingress_map.valid else 'Egress MAP invalid: {}'.format(egress_map.status)
+
+ return status
+
+ def _is_men_port(self, port):
+ return port in self._handler.northbound_ports(port)
+
+ def _is_uni_port(self, port):
+ return port in self._handler.southbound_ports(port)
+
+ def _is_logical_port(self, port):
+ return not self._is_men_port(port) and not self._is_uni_port(port)
+
+ def _get_port_name(self, port):
+ if self._is_logical_port(port):
+ raise NotImplemented('TODO: Logical ports not yet supported')
+
+ if self._is_men_port(port):
+ return self._handler.northbound_ports[port].name
+
+ return None
+
+ def _decode_traffic_selector(self):
+ """
+ Extract EVC related traffic selection settings
+ """
+ in_port = fd.get_in_port(self._flow)
+ assert in_port is not None
+
+ if self._is_men_port(in_port):
+ log.debug('in_port is a MEN Port', port=in_port)
+ self._men_ports.append(self._get_port_name(in_port))
+ else:
+ pass # UNI Ports handled in the EVC Maps
+
+ for field in fd.get_ofb_fields(self._flow):
+ log.debug('Found OFB field', field=field)
+ self._status_message = 'Unsupported field.type={}'.format(field.type)
+ return False
+
+ return True
+
+ def _decode_traffic_treatment(self):
+ out_port = fd.get_out_port(self._flow)
+ num_outputs = 0
+
+ if self._is_men_port(out_port):
+ log.debug('out_port is a MEN Port', port=out_port)
+ self._men_ports.append(self._get_port_name(out_port))
+ else:
+ pass # UNI Ports handled in the EVC Maps
+
+ for action in fd.get_actions(self._flow):
+ if action.type == fd.OUTPUT:
+ num_outputs += 1 # Handled earlier
+ assert num_outputs <= 1 # Only E-LINE supported and no UNI<->UNI
+
+ else:
+ # TODO: May need to modify ce-preservation
+ log.debug('Found action', action=action)
+
+ return True
+
+ # BULK operations
+
+ @staticmethod
+ def enable_all(regex_=EVC_NAME_REGEX):
+ raise NotImplemented("TODO: Implement this")
+
+ @staticmethod
+ def disable_all(regex_=EVC_NAME_REGEX):
+ raise NotImplemented("TODO: Implement this")
+
+ @staticmethod
+ def remove_all(regex_=EVC_NAME_REGEX):
+ """
+ Remove all matching EVCs and associated EVC MAPs from hardware
+
+ :param regex_: (String) Regular expression for name matching
+ """
+ raise NotImplemented("TODO: Implement this")
+
diff --git a/voltha/adapters/adtran_olt/flow/evc_map.py b/voltha/adapters/adtran_olt/flow/evc_map.py
new file mode 100644
index 0000000..1d16daf
--- /dev/null
+++ b/voltha/adapters/adtran_olt/flow/evc_map.py
@@ -0,0 +1,131 @@
+#
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import random
+
+import structlog
+from enum import Enum
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+from voltha.core.logical_device_agent import mac_str_to_tuple
+from voltha.protos.common_pb2 import OperStatus, AdminState
+from voltha.protos.device_pb2 import Port
+from voltha.protos.logical_device_pb2 import LogicalPort
+from voltha.protos.openflow_13_pb2 import OFPPF_100GB_FD, OFPPF_FIBER, OFPPS_LIVE, ofp_port
+
+import voltha.core.flow_decomposer as fd
+
+log = structlog.get_logger()
+
+EVC_MAP_NAME_INGRESS_FORMAT = 'EVCMap-VOLTHA-ingress-{}'
+EVC_MAP_NAME_EGRESS_FORMAT = 'EVCMap-VOLTHA-egress-{}'
+
+EVC_MAP_NAME_INGRESS_REGEX_FORMAT = EVC_MAP_NAME_INGRESS_FORMAT.format('regex here')
+EVC_MAP_NAME_EGRESS_REGEX_FORMAT = EVC_MAP_NAME_EGRESS_FORMAT.format('regex here')
+
+class EVCMap(object):
+ """
+ Class to wrap EVC functionality
+ """
+ class EvcConnection(Enum):
+ NO_EVC_CONNECTION = 0
+ EVC = 1
+ DISCARD = 2
+
+ class Priority_Option(Enum):
+ INHERIT_PRIORITY = 0
+ EXPLICIT_PRIORITY = 1
+
+ def __init__(self, flow, handler, evc, is_ingress_map):
+ self._installed = False
+ self._status_message = None
+ self._flow = flow
+ self._handler = handler
+
+ self._name = None
+ self._enabled = True
+ self._uni_port = None
+ self._evc_connection = EVCMap.EvcConnection.NO_EVC_CONNECTION
+ self._evc_name = None
+
+ self._men_priority = EVCMap.Priority_Option.INHERIT_PRIORITY
+ self._men_pri = 0 # If Explicit Priority
+
+ self._c_tag = -1
+ self._men_ctag_priority = EVCMap.Priority_Option.INHERIT_PRIORITY
+ self._men_ctag_pri = 0 # If Explicit Priority
+
+ self._match_ce_vlan_id = -1
+ self._match_untagged = True
+ self._match_destination_mac_address = None
+ self._match_l2cp = False
+ self._match_broadcast = False
+ self._match_multicast = False
+ self._match_unicast = False
+ self._match_igmp = False
+
+ self._evc = evc
+ self._is_ingress_map = is_ingress_map
+
+ self._valid = self.decode()
+
+ @staticmethod
+ def createIngressMap(flow, device, evc):
+ return EVCMap(flow, device, evc, True)
+
+ @staticmethod
+ def createEgressMap(flow, device, evc):
+ return EVCMap(flow, device, evc, False)
+
+ @property
+ def valid(self):
+ return self._valid
+
+ @property
+ def installed(self):
+ return self._installed
+
+ @property
+ def status(self):
+ return self._status_message
+
+ def install(self):
+ if not self._installed:
+ pass
+
+ return self._installed
+
+ def remove(self):
+ if self._installed:
+ pass
+
+ return not self._installed
+
+ def _decode(self):
+ self._name = 'EVC-MAP-{}-{}'.format('i' if self._is_ingress_map else 'e', self._flow.id)
+
+ return self._decode_traffic_selector() and self._decode_traffic_treatment()
+
+ def _decode_traffic_selector(self):
+ self._status_message('TODO: Not yet implemented')
+ return False
+
+ def _decode_traffic_treatment(self):
+ self._status_message('TODO: Not yet implemented')
+ return False
+
+
+
diff --git a/voltha/adapters/adtran_olt/flow/flow_entry.py b/voltha/adapters/adtran_olt/flow/flow_entry.py
new file mode 100644
index 0000000..b683cab
--- /dev/null
+++ b/voltha/adapters/adtran_olt/flow/flow_entry.py
@@ -0,0 +1,164 @@
+#
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+from evc import EVC
+from acl import ACL
+
+import voltha.core.flow_decomposer as fd
+from voltha.protos.openflow_13_pb2 import OFPP_IN_PORT, OFPP_TABLE, OFPP_NORMAL, OFPP_FLOOD, OFPP_ALL
+from voltha.protos.openflow_13_pb2 import OFPP_CONTROLLER, OFPP_LOCAL, OFPP_ANY, OFPP_MAX
+
+log = structlog.get_logger()
+
+
+class FlowEntry(object):
+ """
+ Provide a class that wraps the flow rule and also provides state/status for
+ a FlowEntry.
+ """
+ def __init__(self, flow, handler):
+ self._flow = flow
+ self._handler = handler
+ log.debug('Initializing a new FlowEntry', flow=flow)
+
+ @property
+ def name(self):
+ return 'Flow-{}'.format(self.flow.id)
+
+ @property
+ def flow(self):
+ return self._flow
+
+ @property
+ def handler(self):
+ return self._handler
+
+ @staticmethod
+ def create(flow, handler):
+ """
+ Create the appropriate FlowEntry wrapper for the flow
+
+ :param flow: (Flow) Flow entry passed to VOLTHA adapter
+ :param handler: (AdtranDeviceHandler) handler for the device
+
+ :return: (FlowEntry) A flow entry of the appropriate type
+ """
+ # Determine the type of flow entry. An ACL type entry is use to send
+ # packets to a reserved port (controller) or to drop them.
+
+ in_port = fd.get_in_port(flow)
+ out_port = fd.get_out_port(flow)
+
+ if in_port or out_port is None:
+ return None
+
+ # Convert all possible physical ports into a single number for matching purposes
+
+ if in_port <= OFPP_MAX:
+ in_port = OFPP_MAX
+
+ if out_port <= OFPP_MAX:
+ in_port = OFPP_MAX
+
+ # Commented out entries below represent future desireable combinations, but not supported
+ # in initial release of this device adapter.
+
+ flow_type = {
+ (OFPP_MAX, OFPP_MAX): EVCFlowEntry, # Physical port to physical port
+ (OFPP_ANY, OFPP_CONTROLLER): ACLFlowEntry, # A common SDN/Openflow operation
+ (OFPP_MAX, OFPP_TABLE): EVCFlowEntry, # Perhaps double-tagging?
+ # (OFPP_MAX, OFPP_LOCAL): ACLFlowEntry,
+ # (OFPP_ANY, OFPP_LOCAL): ACLFlowEntry,
+ # (OFPP_LOCAL, OFPP_MAX): ACLFlowEntry,
+ # (OFPP_MAX, OFPP_IN_PORT): EVCFlowEntry,
+ # (OFPP_ANY, OFPP_IN_PORT): EVCFlowEntry,
+
+ }.get((in_port, out_port), None)
+
+ return None if flow_type is None else flow_type(FlowEntry(flow, handler))
+
+ ######################################################
+ # Bulk operations
+
+ @staticmethod
+ def enable_all():
+ raise NotImplemented("TODO: Implement this")
+
+ @staticmethod
+ def disable_all():
+ raise NotImplemented("TODO: Implement this")
+
+ @staticmethod
+ def remove_all():
+ """
+ Remove all matching EVCs and associated EVC MAPs from hardware
+
+ :param regex_: (String) Regular expression for name matching
+ """
+ raise NotImplemented("TODO: Implement this")
+
+
+class EVCFlowEntry(FlowEntry):
+ def __init__(self, flow, handler):
+ super(FlowEntry, self).__init__(flow, handler)
+ self.evc = EVC.create(flow, handler)
+
+ @property
+ def valid(self):
+ return self.evc.valid
+
+ @property
+ def installed(self):
+ return self.evc.installed
+
+ def install(self):
+ return self.evc.install()
+
+ def remove(self):
+ return self.evc.remove()
+
+ def enable(self):
+ return self.evc.enable()
+
+ def disable(self):
+ return self.evc.disable()
+
+
+class ACLFlowEntry(FlowEntry):
+ def __init__(self, flow, handler):
+ super(FlowEntry, self).__init__(flow, handler)
+ self.acl = ACL.create(flow, handler)
+
+ @property
+ def valid(self):
+ return self.acl.valid
+
+ @property
+ def installed(self):
+ return self.acl.installed
+
+ def install(self):
+ return self.acl.install()
+
+ def remove(self):
+ return self.acl.remove()
+
+ def enable(self):
+ return self.acl.enable()
+
+ def disable(self):
+ return self.evc.disable()
+
diff --git a/voltha/adapters/adtran_olt/net/adtran_netconf.py b/voltha/adapters/adtran_olt/net/adtran_netconf.py
new file mode 100644
index 0000000..f75e115
--- /dev/null
+++ b/voltha/adapters/adtran_olt/net/adtran_netconf.py
@@ -0,0 +1,352 @@
+#
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import structlog
+from lxml import etree
+from ncclient import manager
+from ncclient.operations import RPCError
+from ncclient.transport.errors import SSHError
+from twisted.internet import defer, threads
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+log = structlog.get_logger('ncclient')
+
+ADTRAN_NS = 'http://www.adtran.com/ns/yang'
+
+
+def adtran_module_url(module):
+ return '{}/{}'.format(ADTRAN_NS, module)
+
+
+def phys_entities_rpc():
+ return """
+ <filter xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ <physical-entities-state xmlns="{}">
+ <physical-entity/>
+ </physical-entities-state>
+ </filter>
+ """.format(adtran_module_url('adtran-physical-entities'))
+
+
+class AdtranNetconfClient(object):
+ """
+ Performs NETCONF requests
+ """
+
+ def __init__(self, host_ip, port=830, username='', password='', timeout=20):
+ self._ip = host_ip
+ self._port = port
+ self._username = username
+ self._password = password
+ self._timeout = timeout
+ self._session = None
+
+ def __str__(self):
+ return "AdtranNetconfClient {}@{}:{}".format(self._username, self._ip, self._port)
+
+ @property
+ def capabilities(self):
+ """
+ Get the server's NETCONF capabilities
+
+ :return: (ncclient.capabilities.Capabilities) object representing the server's capabilities.
+ """
+ return self._session.server_capabilities if self._session else None
+
+ @property
+ def connected(self):
+ """
+ Is this client connected to a NETCONF server
+ :return: (boolean) True if connected
+ """
+ return self._session is not None and self._session.connected
+
+ def connect(self, connect_timeout=None):
+ """
+ Connect to the NETCONF server
+
+ o To disable attempting publickey authentication altogether, call with
+ allow_agent and look_for_keys as False.
+
+ o hostkey_verify enables hostkey verification from ~/.ssh/known_hosts
+
+ :return: (deferred) Deferred request
+ """
+ timeout = connect_timeout or self._timeout
+
+ return threads.deferToThread(self._do_connect, timeout)
+
+ def _do_connect(self, timeout):
+ try:
+ self._session = manager.connect(host=self._ip,
+ port=self._port,
+ username=self._username,
+ password=self._password,
+ allow_agent=False,
+ look_for_keys=False,
+ hostkey_verify=False,
+ timeout=timeout)
+
+ log.debug('Dumping Server Capabilities')
+ for cap in self.capabilities:
+ log.debug(' {}'.format(cap))
+ except SSHError as e:
+ # Log and rethrow exception so any errBack is called
+ log.exception('SSH Error during connect: {}'.format(e.message))
+ raise e
+
+ except Exception as e:
+ # Log and rethrow exception so any errBack is called
+ log.exception('Connect request failed: {}'.format(e.message))
+ raise e
+
+ # If debug logging is enabled, decrease the level, DEBUG is a significant
+ # performance hit during response XML decode
+
+ if log.isEnabledFor('DEBUG'):
+ log.setLevel('INFO')
+
+ # TODO: ncclient also supports RaiseMode:NONE to limit exceptions. To set use:
+ #
+ # self._session.raise_mode = RaiseMode:NONE
+ #
+ # and the when you get a response back, you can check 'response.ok' to see if it is 'True'
+ # if it is not, you can enumerate the 'response.errors' list for more information
+
+ return self._session
+
+ def close(self):
+ """
+ Close the connection to the NETCONF server
+ :return: (deferred) Deferred request
+ """
+ s, self._session = self._session, None
+
+ if s is None or not s.connected:
+ return defer.returnValue(True)
+
+ return threads.deferToThread(self._do_close, s)
+
+ def _do_close(self, old_session):
+ return old_session.close_session()
+
+ def get_config(self, source='running'):
+ """
+ Get the configuration from the specified source
+
+ :param source: (string) Configuration source, 'running', 'candidate', ...
+
+ :return: (deferred) Deferred request that wraps the GetReply class
+ """
+
+ if not self._session or not self._session.connected:
+ raise NotImplemented('TODO: Support auto-connect if needed')
+
+ return threads.deferToThread(self._do_get_config, source)
+
+ def _do_get_config(self, source):
+ """
+ Get the configuration from the specified source
+
+ :param source: (string) Configuration source, 'running', 'candidate', ...
+
+ :return: (GetReply) The configuration.
+ """
+ return self._session.get_config(source)
+
+ def get(self, payload):
+ """
+ Get the requested data from the server
+
+ :param payload: Payload/filter
+ :return: (defeered) for GetReply
+ """
+ log.debug('get', filter=payload)
+
+ if not self._session or not self._session.connected:
+ raise NotImplemented('TODO: Support auto-connect if needed')
+
+ return threads.deferToThread(self._do_get, payload)
+
+ def _do_get(self, payload):
+ """
+ Get the requested data from the server
+
+ :param payload: Payload/filter
+ :return: (GetReply) response
+ """
+ try:
+ response = self._session.get(payload)
+ # To get XML, use response.xml
+
+ except RPCError as e:
+ log.exception('get Exception: {}'.format(e.message))
+ raise
+
+ return response
+
+ def lock(self, source, lock_timeout):
+ """
+ Lock the configuration system
+ :return: (defeered) for RpcReply
+ """
+ log.debug('lock', source=source, timeout=lock_timeout)
+
+ if not self._session or not self._session.connected:
+ raise NotImplemented('TODO: Support auto-connect if needed')
+
+ return threads.deferToThread(self._do_lock, source, lock_timeout)
+
+ def _do_lock(self, source, lock_timeout):
+ """
+ Lock the configuration system
+ """
+ try:
+ response = self._session.lock(source, timeout=lock_timeout)
+ # To get XML, use response.xml
+
+ except RPCError as e:
+ log.exception('lock Exception: {}'.format(e.message))
+ raise
+
+ return response
+
+ def unlock(self, source):
+ """
+ Get the requested data from the server
+ :param rpc_string: RPC request
+
+ :return: (defeered) for RpcReply
+ """
+ log.debug('unlock', source=source)
+
+ if not self._session or not self._session.connected:
+ raise NotImplemented('TODO: Support auto-connect if needed')
+
+ return threads.deferToThread(self._do_unlock, source)
+
+ def _do_unlock(self, source):
+ """
+ Lock the configuration system
+ """
+ try:
+ response = self._session.unlock(source)
+ # To get XML, use response.xml
+
+ except RPCError as e:
+ log.exception('unlock Exception: {}'.format(e.message))
+ raise
+
+ return response
+
+ @inlineCallbacks
+ def edit_config(self, config, target='running', default_operation=None,
+ test_option=None, error_option=None, lock_timeout=-1):
+ """
+ Loads all or part of the specified config to the target configuration datastore with the ability to lock
+ the datastore during the edit. To change multiple items, use your own calls to lock/unlock instead of
+ using the lock_timeout value
+
+ :param config is the configuration, which must be rooted in the config element. It can be specified
+ either as a string or an Element.format="xml"
+ :param target is the name of the configuration datastore being edited
+ :param default_operation if specified must be one of { 'merge', 'replace', or 'none' }
+ :param test_option if specified must be one of { 'test_then_set', 'set' }
+ :param error_option if specified must be one of { 'stop-on-error', 'continue-on-error', 'rollback-on-error' }
+ The 'rollback-on-error' error_option depends on the :rollback-on-error capability.
+ :param lock_timeout if >0, the maximum number of seconds to hold a lock on the datastore while the edit
+ operation is underway
+
+ :return: (defeered) for RpcReply
+ """
+ if not self._session or not self._session.connected:
+ raise NotImplemented('TODO: Support auto-connect if needed')
+
+ rpc_reply = None
+ if lock_timeout > 0:
+ try:
+ request = self._session.lock(target, lock_timeout)
+ rpc_reply = yield request
+
+ except Exception as e:
+ log.exception('edit_config Lock Exception: {}'.format(e.message))
+ raise
+ try:
+ if config[:7] != '<config':
+ config = '<config xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">' + \
+ config + '</config>'
+
+ rpc_reply = yield threads.deferToThread(self._do_edit_config, target,
+ config, default_operation,
+ test_option, error_option)
+ except Exception as e:
+ log.exception('edit_config Edit Exception: {}'.format(e.message))
+ raise
+
+ finally:
+ if lock_timeout > 0:
+ try:
+ yield self._session.lock(target, lock_timeout)
+
+ except Exception as e:
+ log.exception('edit_config unlock Exception: {}'.format(e.message))
+ # Note that we just fall through and do not re-raise this exception
+
+ returnValue(rpc_reply)
+
+ def _do_edit_config(self, target, config, default_operation, test_option, error_option):
+ """
+ Lock the configuration system
+ """
+ try:
+ response = self._session.edit_config(target=target, config=config
+ # TODO: Support additional options later
+ # ,default_operation=default_operation,
+ # test_option=test_option,
+ # error_option=error_option
+ )
+ # To get XML, use response.xml
+ # To check status, use response.ok (boolean)
+
+ except RPCError as e:
+ log.exception('edit_config Exception: {}'.format(e.message))
+ raise
+
+ return response
+
+ def rpc(self, rpc_string):
+ """
+ Custom RPC request
+ :param rpc_string: (string) RPC request
+ :return: (defeered) for GetReply
+ """
+ log.debug('rpc', rpc=rpc_string)
+
+ if not self._session or not self._session.connected:
+ raise NotImplemented('TODO: Support auto-connect if needed')
+
+ return threads.deferToThread(self._do_rpc, rpc_string)
+
+ def _do_rpc(self, rpc_string):
+ try:
+ response = self._session.dispatch(etree.fromstring(rpc_string))
+ # To get XML, use response.xml
+
+ except RPCError as e:
+ log.exception('rpc Exception: {}'.format(e.message))
+ raise
+
+ return response
diff --git a/voltha/adapters/adtran_olt/net/adtran_rest.py b/voltha/adapters/adtran_olt/net/adtran_rest.py
index 456b3c5..aa42c8e 100644
--- a/voltha/adapters/adtran_olt/net/adtran_rest.py
+++ b/voltha/adapters/adtran_olt/net/adtran_rest.py
@@ -23,6 +23,13 @@
log = structlog.get_logger()
+class RestInvalidResponseCode(Exception):
+ def __init__(self, message, url, code):
+ super(RestInvalidResponseCode, self).__init__(message)
+ self.url = url
+ self.code = code
+
+
class AdtranRestClient(object):
"""
Performs Adtran RESTCONF requests
@@ -70,14 +77,17 @@
:param password: (string) Password for credentials
:param timeout: (int) Number of seconds to wait for a response before timing out
"""
- self.ip = host_ip
- self.rest_port = port
- self.username = username
- self.password = password
- self.timeout = timeout
+ self._ip = host_ip
+ self._port = port
+ self._username = username
+ self._password = password
+ self._timeout = timeout
+
+ def __str__(self):
+ return "AdtranRestClient {}@{}:{}".format(self._username, self._ip, self._port)
@inlineCallbacks
- def request(self, method, uri, data=None, name=''):
+ def request(self, method, uri, data=None, name='', timeout=None):
"""
Send a REST request to the Adtran device
@@ -91,31 +101,34 @@
if method.upper() not in self._valid_methods:
raise NotImplementedError("REST method '{}' is not supported".format(method))
- url = 'http://{}:{}{}{}'.format(self.ip, self.rest_port,
+ url = 'http://{}:{}{}{}'.format(self._ip, self._port,
'/' if uri[0] != '/' else '',
uri)
+ response = None
+ timeout = timeout or self._timeout
+
try:
if method.upper() == 'GET':
response = yield treq.get(url,
- auth=(self.username, self.password),
- timeout=self.timeout,
+ auth=(self._username, self._password),
+ timeout=timeout,
headers=self.REST_GET_REQUEST_HEADER)
elif method.upper() == 'POST' or method.upper() == 'PUT':
response = yield treq.post(url,
data=data,
- auth=(self.username, self.password),
- timeout=self.timeout,
+ auth=(self._username, self._password),
+ timeout=timeout,
headers=self.REST_POST_REQUEST_HEADER)
elif method.upper() == 'PATCH':
response = yield treq.patch(url,
data=data,
- auth=(self.username, self.password),
- timeout=self.timeout,
+ auth=(self._username, self._password),
+ timeout=timeout,
headers=self.REST_PATCH_REQUEST_HEADER)
elif method.upper() == 'DELETE':
response = yield treq.delete(url,
- auth=(self.username, self.password),
- timeout=self.timeout,
+ auth=(self._username, self._password),
+ timeout=timeout,
headers=self.REST_DELETE_REQUEST_HEADER)
else:
raise NotImplementedError("REST method '{}' is not supported".format(method))
@@ -126,7 +139,7 @@
except ConnectionClosed:
returnValue(None)
- except Exception, e:
+ except Exception as e:
log.exception("REST {} '{}' request to '{}' failed: {}".format(method, name, url, str(e)))
raise
@@ -134,7 +147,7 @@
message = "REST {} '{}' request to '{}' failed with status code {}".format(method, name,
url, response.code)
log.error(message)
- raise Exception(message)
+ raise RestInvalidResponseCode(message, url, response.code)
if response.code == self.HTTP_NO_CONTENT:
returnValue(None)
@@ -154,7 +167,7 @@
try:
result = json.loads(content)
- except Exception, e:
+ except Exception as e:
log.exception("REST {} '{}' JSON decode of '{}' failure: {}".format(method, name,
url, str(e)))
raise
diff --git a/voltha/adapters/adtran_olt/net/adtran_zmq.py b/voltha/adapters/adtran_olt/net/adtran_zmq.py
index bdfaa45..9cbeae6 100644
--- a/voltha/adapters/adtran_olt/net/adtran_zmq.py
+++ b/voltha/adapters/adtran_olt/net/adtran_zmq.py
@@ -32,8 +32,8 @@
class AdtranZmqClient(object):
"""
Adtran ZeroMQ Client for PON Agent packet in/out service
-
- PON Agent expects and external PAIR socket with
+
+ PON Agent expects and external PAIR socket with
"""
def __init__(self, ip_address, rx_callback=None,
@@ -53,6 +53,10 @@
except Exception as e:
log.exception(e.message)
+ def shutdown(self):
+ self.socket.onReceive = AdtranZmqClient.rx_nop
+ self.socket.shutdown()
+
@staticmethod
def rx_nop(message):
log.debug('Discarding ZMQ message, no receiver specified')
@@ -61,11 +65,11 @@
def encode_omci_message(msg, pon_index, onu_id):
"""
Create an OMCI Tx Packet for the specified ONU
-
- :param msg: (str) OMCI message to send
+
+ :param msg: (str) OMCI message to send
:param pon_index: (unsigned int) PON Port index
:param onu_id: (unsigned int) ONU ID
-
+
:return: (bytes) octet string to send
"""
assert msg
@@ -79,8 +83,8 @@
def decode_packet(packet):
"""
Decode the packet provided by the ZMQ client
-
- :param packet: (bytes) Packet
+
+ :param packet: (bytes) Packet
:return: (long, long, bytes, boolean) PON Index, ONU ID, Frame Contents (OMCI or Ethernet),\
and a flag indicating if it is OMCI
"""
@@ -96,8 +100,8 @@
def _decode_omci_message(packet):
"""
Decode the packet provided by the ZMQ client
-
- :param packet: (bytes) Packet
+
+ :param packet: (bytes) Packet
:return: (long, long, bytes) PON Index, ONU ID, OMCI Frame Contents
"""
(pon_index, onu_id) = struct.unpack_from('!II', packet)
diff --git a/voltha/adapters/adtran_olt/nni_port.py b/voltha/adapters/adtran_olt/nni_port.py
index 55160f3..11a676c 100644
--- a/voltha/adapters/adtran_olt/nni_port.py
+++ b/voltha/adapters/adtran_olt/nni_port.py
@@ -17,7 +17,9 @@
import random
import structlog
-from twisted.internet.defer import inlineCallbacks, returnValue
+from enum import Enum
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue, succeed
from voltha.core.logical_device_agent import mac_str_to_tuple
from voltha.protos.common_pb2 import OperStatus, AdminState
@@ -25,8 +27,6 @@
from voltha.protos.logical_device_pb2 import LogicalPort
from voltha.protos.openflow_13_pb2 import OFPPF_100GB_FD, OFPPF_FIBER, OFPPS_LIVE, ofp_port
-log = structlog.get_logger()
-
class NniPort(object):
"""
@@ -36,105 +36,308 @@
so we do not duplicate fields/properties/methods
"""
+ class State(Enum):
+ INITIAL = 0 # Created and initialization in progress
+ RUNNING = 1 # PON port contacted, ONU discovery active
+ STOPPED = 2 # Disabled
+ DELETING = 3 # Cleanup
+
def __init__(self, parent, **kwargs):
# TODO: Weed out those properties supported by common 'Port' object
assert parent
assert 'port_no' in kwargs
- self.port = None
- self.logical_port = None
- self.parent = parent
- self.port_no = kwargs.get('port_no')
+ self.log = structlog.get_logger(port_no=kwargs.get('port_no'))
- self.startup = None
- log.info('Creating NNI Port {}'.format(self.port_no))
+ self._port_no = kwargs.get('port_no')
+ self._name = kwargs.get('name', 'nni-{}'.format(self._port_no))
+ self._port = None
+ self._logical_port = None
+ self._parent = parent
+
+ self._deferred = None
+ self._state = NniPort.State.INITIAL
+ self.log.info('Creating NNI Port')
+
+ # Local cache of NNI configuration
+
+ self._enabled = None
# And optional parameters
- self.admin_state = kwargs.pop('admin_state', AdminState.UNKNOWN)
- self.oper_status = kwargs.pop('oper_status', OperStatus.UNKNOWN)
- self.label = kwargs.pop('label', 'NNI port {}'.format(self.port_no))
- self.name = kwargs.pop('name', 'nni-{}'.format(self.port_no))
- self.mac_address = kwargs.pop('mac_address',
- '08:00:{}{}:{}{}:{}{}:00'.format(random.randint(0, 9),
- random.randint(0, 9),
- random.randint(0, 9),
- random.randint(0, 9),
- random.randint(0, 9),
- random.randint(0, 9)))
+ self._admin_state = kwargs.pop('admin_state', AdminState.UNKNOWN)
+ self._oper_status = kwargs.pop('oper_status', OperStatus.UNKNOWN)
+ self._label = kwargs.pop('label', 'NNI port {}'.format(self._port_no))
+ self._mac_address = kwargs.pop('mac_address',
+ '08:00:{}{}:{}{}:{}{}:00'.format(random.randint(0, 9),
+ random.randint(0, 9),
+ random.randint(0, 9),
+ random.randint(0, 9),
+ random.randint(0, 9),
+ random.randint(0, 9)))
# TODO: May need to refine capabilities into current, advertised, and peer
- self.ofp_capabilities = kwargs.pop('ofp_capabilities', OFPPF_100GB_FD | OFPPF_FIBER)
- self.ofp_state = kwargs.pop('ofp_state', OFPPS_LIVE)
- self.current_speed = kwargs.pop('current_speed', OFPPF_100GB_FD)
- self.max_speed = kwargs.pop('max_speed', OFPPF_100GB_FD)
- self.device_port_no = kwargs.pop('device_port_no', self.port_no)
+ self._ofp_capabilities = kwargs.pop('ofp_capabilities', OFPPF_100GB_FD | OFPPF_FIBER)
+ self._ofp_state = kwargs.pop('ofp_state', OFPPS_LIVE)
+ self._current_speed = kwargs.pop('current_speed', OFPPF_100GB_FD)
+ self._max_speed = kwargs.pop('max_speed', OFPPF_100GB_FD)
+ self._device_port_no = kwargs.pop('device_port_no', self._port_no)
+
+ def __del__(self):
+ self.stop()
def __str__(self):
- return "NniPort-{}: Admin: {}, Oper: {}, parent: {}".format(self.port_no,
- self.admin_state,
- self.oper_status,
- self.parent)
+ return "NniPort-{}: Admin: {}, Oper: {}, parent: {}".format(self._port_no,
+ self._admin_state,
+ self._oper_status,
+ self._parent)
+
+ @property
+ def port_number(self):
+ return self._port_no
+
+ @property
+ def olt(self):
+ return self._parent
+
+ @property
+ def state(self):
+ return self._state
+
+ @property
+ def adapter_agent(self):
+ return self.olt.adapter_agent
+
+ def _cancel_deferred(self):
+ d, self._deferred = self._deferred, None
+ if d is not None:
+ d.cancel()
+
+ def _update_adapter_agent(self):
+ # TODO: Currently the adapter_agent does not allow 'update' of port status
+ # self.adapter_agent.update_port(self.olt.device_id, self.get_port())
+ pass
def get_port(self):
"""
Get the VOLTHA PORT object for this port
:return: VOLTHA Port object
"""
- if self.port is None:
- self.port = Port(port_no=self.port_no,
- label=self.label,
- type=Port.ETHERNET_NNI,
- admin_state=self.admin_state,
- oper_status=self.oper_status)
- return self.port
+ if self._port is None:
+ self._port = Port(port_no=self._port_no,
+ label=self._label,
+ type=Port.ETHERNET_NNI,
+ admin_state=self._admin_state,
+ oper_status=self._oper_status)
+ return self._port
def get_logical_port(self):
"""
Get the VOLTHA logical port for this port
:return: VOLTHA logical port or None if not supported
"""
- if self.logical_port is None:
- openflow_port = ofp_port(port_no=self.port_no,
- hw_addr=mac_str_to_tuple(self.mac_address),
- name=self.name,
+ if self._logical_port is None:
+ openflow_port = ofp_port(port_no=self._port_no,
+ hw_addr=mac_str_to_tuple(self._mac_address),
+ name=self._name,
config=0,
- state=self.ofp_state,
- curr=self.ofp_capabilities,
- advertised=self.ofp_capabilities,
- peer=self.ofp_capabilities,
- curr_speed=self.current_speed,
- max_speed=self.max_speed)
+ state=self._ofp_state,
+ curr=self._ofp_capabilities,
+ advertised=self._ofp_capabilities,
+ peer=self._ofp_capabilities,
+ curr_speed=self._current_speed,
+ max_speed=self._max_speed)
- self.logical_port = LogicalPort(id='nni{}'.format(self.port_no),
- ofp_port=openflow_port,
- device_id=self.parent.device_id,
- device_port_no=self.device_port_no,
- root_port=True)
- return self.logical_port
+ self._logical_port = LogicalPort(id='nni{}'.format(self._port_no),
+ ofp_port=openflow_port,
+ device_id=self._parent.device_id,
+ device_port_no=self._device_port_no,
+ root_port=True)
+ return self._logical_port
- @inlineCallbacks
def start(self):
"""
Start/enable this NNI
:return: (deferred)
"""
- log.info('Starting NNI port {}'.format(self.port_no))
+ if self._state == NniPort.State.RUNNING:
+ return succeed('Running')
+
+ self.log.info('Starting NNI port')
# TODO: Start up any watchdog/polling tasks here
- yield returnValue('NNI Port start is a NOP at this time')
+ self._admin_state = AdminState.ENABLED
+ self._oper_status = OperStatus.ACTIVE
+ self._update_adapter_agent()
+
+ # Do the rest of the startup in an async method
+ self._deferred = reactor.callLater(0, self._finish_startup)
+ return self._deferred
+
+ @inlineCallbacks
+ def _finish_startup(self):
+ if self._state != NniPort.State.INITIAL:
+ returnValue('Done')
+
+ returnValue('TODO: Implement startup of each NNI port')
+
+ if self._enabled:
+ self._admin_state = AdminState.ENABLED
+ self._oper_status = OperStatus.ACTIVE # TODO: is this correct, how do we tell GRPC
+ self._state = NniPort.State.RUNNING
+
+ # TODO: Start status polling of NNI interfaces
+ self._deferred = None # = reactor.callLater(3, self.do_stuff)
+
+ self._update_adapter_agent()
+ returnValue('Enabled')
+
+ else:
+ # Startup failed. Could be due to object creation with an invalid initial admin_status
+ # state. May want to schedule a start to occur again if this happens
+ self._admin_state = AdminState.DISABLED
+ self._oper_status = OperStatus.UNKNOWN
+ self._state = NniPort.State.STOPPED
+
+ self._update_adapter_agent()
+ returnValue('Disabled')
def stop(self):
- log.info('Stopping NNI port {}'.format(self.port_no))
- d, self.startup = self.startup, None
- if d is not None:
- d.cancel()
+ if self._state == NniPort.State.STOPPED:
+ return succeed('Stopped')
- self.admin_state = AdminState.DISABLED
- self.oper_status = OperStatus.UNKNOWN
+ self.log.info('Stopping NNI port')
- yield returnValue('NNI Port stop may need more work')
- # TODO: How do we reflect this into VOLTHA
+ self._cancel_deferred()
+ # NOTE: Leave all NNI ports active (may have inband management)
+ # TODO: Revisit leaving NNI Ports active on disable
+
+ # Flush config cache
+ self._enabled = None
+
+ self._admin_state = AdminState.DISABLED
+ self._oper_status = OperStatus.UNKNOWN
+ self._update_adapter_agent()
+
+ self._state = NniPort.State.STOPPED
+ return self._deferred
+
+ def delete(self):
+ """
+ Parent device is being deleted. Do not change any config but
+ stop all polling
+ """
+ self.log.info('Deleteing {}'.format(self._label))
+ self._state = NniPort.State.DELETING
+ self._cancel_deferred()
+
+ @inlineCallbacks
+ def reset(self):
+ """
+ Set the NNI Port to a known good state on initial port startup. Actual
+ NNI 'Start' is done elsewhere
+ """
+ if self._state != NniPort.State.INITIAL:
+ self.log.error('Reset ignored, only valid during initial startup', state=self._state)
+ returnValue('Ignored')
+
+ self.log.info('Reset {}'.format(self._label))
+
+ # Always enable our NNI ports
+
+ try:
+ results = yield self.set_config('enabled', True)
+ self._admin_state = AdminState.ENABLED
+ self._enabled = True
+ returnValue(results)
+
+ except Exception as e:
+ self.log.exception('Reset of NNI to initial state failed', e=e)
+ self._admin_state = AdminState.UNKNOWN
+ raise
+
+ @inlineCallbacks
+ def set_config(self, leaf, value):
+ data = {'leaf': leaf, 'value': value}
+ config = '<interfaces xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">' + \
+ ' <interface>' + \
+ ' <name>{}</name>'.format(self._name) + \
+ ' <{d[leaf]}>{d[value]}</{d[leaf]}>'.format(d=data) + \
+ ' </interface>' + \
+ '</interfaces>'
+ try:
+ results = yield self._parent.netconf_client.edit_config(config)
+ returnValue(results)
+
+ except Exception as e:
+ self.log.exception('Set Config', leaf=leaf, value=value, e=e)
+ raise
+
+
+class MockNniPort(NniPort):
+ """
+ A class similar to the 'Port' class in the VOLTHA but for a non-existent (virtual OLT)
+
+ TODO: Merge this with the Port class or cleanup where possible
+ so we do not duplicate fields/properties/methods
+ """
+
+ def __init__(self, parent, **kwargs):
+ super(MockNniPort, self).__init__(parent, **kwargs)
+
+ def __str__(self):
+ return "NniPort-mock-{}: Admin: {}, Oper: {}, parent: {}".format(self._port_no,
+ self._admin_state,
+ self._oper_status,
+ self._parent)
+
+ @staticmethod
+ def get_nni_port_state_results():
+ from ncclient.operations.retrieve import GetReply
+ raw = """
+ <?xml version="1.0" encoding="UTF-8"?>
+ <rpc-reply xmlns="urn:ietf:params:xml:ns:netconf:base:1.0"
+ xmlns:nc="urn:ietf:params:xml:ns:netconf:base:1.0"
+ message-id="urn:uuid:59e71979-01bb-462f-b17a-b3a45e1889ac">
+ <data>
+ <interfaces-state xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">
+ <interface><name>hundred-gigabit-ethernet 0/1</name></interface>
+ <interface><name>hundred-gigabit-ethernet 0/2</name></interface>
+ <interface><name>hundred-gigabit-ethernet 0/3</name></interface>
+ <interface><name>hundred-gigabit-ethernet 0/4</name></interface>
+ </interfaces-state>
+ </data>
+ </rpc-reply>
+ """
+ return GetReply(raw)
+
+ @inlineCallbacks
+ def reset(self):
+ """
+ Set the NNI Port to a known good state on initial port startup. Actual
+ NNI 'Start' is done elsewhere
+ """
+ if self._state != NniPort.State.INITIAL:
+ self.log.error('Reset ignored, only valid during initial startup', state=self._state)
+ returnValue('Ignored')
+
+ self.log.info('Reset {}'.format(self._label))
+
+ # Always enable our NNI ports
+
+ self._enabled = True
+ self._admin_state = AdminState.ENABLED
+ returnValue('Enabled')
+
+ @inlineCallbacks
+ def set_config(self, leaf, value):
+
+ if leaf == 'enabled':
+ self._enabled = value
+ else:
+ raise NotImplemented("Leaf '{}' is not supported".format(leaf))
+
+ returnValue('Success')
diff --git a/voltha/adapters/adtran_olt/onu.py b/voltha/adapters/adtran_olt/onu.py
index 16ebf18..6cba0c8 100644
--- a/voltha/adapters/adtran_olt/onu.py
+++ b/voltha/adapters/adtran_olt/onu.py
@@ -24,6 +24,7 @@
_VSSN_TO_VENDOR = {
'adtn': 'adtran_onu',
+ 'adtr': 'adtran_onu',
'bcm?': 'broadcom_onu', # TODO: Get actual VSSN for this vendor
'dp??': 'dpoe_onu', # TODO: Get actual VSSN for this vendor
'pmc?': 'pmcs_onu', # TODO: Get actual VSSN for this vendor
@@ -75,6 +76,7 @@
'enable': enabled})
uri = AdtranOltHandler.GPON_PON_ONU_CONFIG_URI.format(pon_id)
name = 'onu-create-{}-{}-{}: {}'.format(pon_id, self.onu_id, self.serial_number, enabled)
+
return self.parent.parent.rest_client.request('POST', uri, data=data, name=name)
def set_config(self, leaf, value):
diff --git a/voltha/adapters/adtran_olt/pon_port.py b/voltha/adapters/adtran_olt/pon_port.py
index 1eacbaf..896b8aa 100644
--- a/voltha/adapters/adtran_olt/pon_port.py
+++ b/voltha/adapters/adtran_olt/pon_port.py
@@ -19,8 +19,9 @@
import os
import structlog
+from enum import Enum
from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.defer import inlineCallbacks, returnValue, succeed
from adtran_olt_handler import AdtranOltHandler
from codec.olt_config import OltConfig
@@ -29,8 +30,6 @@
from voltha.protos.device_pb2 import Device
from voltha.protos.device_pb2 import Port
-log = structlog.get_logger()
-
class PonPort(object):
"""
@@ -42,46 +41,61 @@
MAX_ONUS_SUPPORTED = 256
DEFAULT_ENABLED = False
+ class State(Enum):
+ INITIAL = 0 # Created and initialization in progress
+ RUNNING = 1 # PON port contacted, ONU discovery active
+ STOPPED = 2 # Disabled
+ DELETING = 3 # Cleanup
+
def __init__(self, pon_index, port_no, parent, admin_state=AdminState.UNKNOWN, label=None):
- # TODO: Weed out those properties supported by common 'Port' object
+ # TODO: Weed out those properties supported by common 'Port' object (future)
assert admin_state != AdminState.UNKNOWN
- self.parent = parent
- self._pon_index = pon_index
+
+ self.log = structlog.get_logger(pon_id=pon_index)
+
+ self._parent = parent
+ self._pon_id = pon_index
self._port_no = port_no
- self.label = label or 'PON-{}'.format(pon_index)
- self.admin_state = admin_state
- self.oper_status = OperStatus.ACTIVE # TODO: Need to discover
- self.startup = None
- self.onu_discovery = None
- self.port = None
- self.no_onu_discover_tick = 5.0 # TODO: Decrease to 1 or 2 later
- self.discovery_tick = 20.0
- self.discovered_onus = [] # List of serial numbers
- self.onus = {} # serial_number -> ONU (allowed list)
- self.next_onu_id = Onu.MIN_ONU_ID
+ self._label = label or 'PON-{}'.format(pon_index)
+ self._port = None
+ self._no_onu_discover_tick = 5.0 # TODO: Decrease to 1 or 2 later
+ self._discovery_tick = 20.0
+ self._discovered_onus = [] # List of serial numbers
+ self._onus = {} # serial_number -> ONU (allowed list)
+ self._next_onu_id = Onu.MIN_ONU_ID
+
+ self._admin_state = admin_state
+ self._oper_status = OperStatus.UNKNOWN
+ self._deferred = None
+ self._state = PonPort.State.INITIAL
+
+ # Local cache of PON configuration
+
+ self._enabled = None
+ self._downstream_fec_enable = None
+ self._upstream_fec_enable = None
def __del__(self):
- # self.stop()
- pass
+ self.stop()
def __str__(self):
- return "PonPort-{}: Admin: {}, Oper: {}, parent: {}".format(self.label,
- self.admin_state,
- self.oper_status,
- self.parent)
+ return "PonPort-{}: Admin: {}, Oper: {}, parent: {}".format(self._label,
+ self._admin_state,
+ self._oper_status,
+ self._parent)
def get_port(self):
"""
Get the VOLTHA PORT object for this port
:return: VOLTHA Port object
"""
- if self.port is None:
- self.port = Port(port_no=self.port_number,
- label=self.label,
- type=Port.PON_OLT,
- admin_state=self.admin_state,
- oper_status=self.oper_status)
- return self.port
+ if self._port is None:
+ self._port = Port(port_no=self._port_no,
+ label=self._label,
+ type=Port.PON_OLT,
+ admin_state=self._admin_state,
+ oper_status=self._oper_status)
+ return self._port
@property
def port_number(self):
@@ -89,105 +103,183 @@
@property
def pon_id(self):
- return self._pon_index
+ return self._pon_id
+
+ @property
+ def olt(self):
+ return self._parent
+
+ @property
+ def state(self):
+ return self._state
+
+ @property
+ def adapter_agent(self):
+ return self.olt.adapter_agent
def get_logical_port(self):
"""
- Get the VOLTHA logical port for this port
+ Get the VOLTHA logical port for this port. For PON ports, a logical port
+ is not currently created, so always return None
+
:return: VOLTHA logical port or None if not supported
"""
return None
- @inlineCallbacks
+ def _cancel_deferred(self):
+ d, self._deferred = self._deferred, None
+ if d is not None:
+ d.cancel()
+
+ def _update_adapter_agent(self):
+ # TODO: Currently the adapter_agent does not allow 'update' of port status
+ # self.adapter_agent.update_port(self.olt.device_id, self.get_port())
+ pass
+
def start(self):
"""
Start/enable this PON and start ONU discover
:return: (deferred)
"""
- log.info('Starting {}'.format(self.label))
+ if self._state == PonPort.State.RUNNING:
+ return succeed('Running')
+ self.log.info('Starting {}'.format(self._label))
+
+ self._cancel_deferred()
+ self._state = PonPort.State.INITIAL
+
+ # Do the rest of the startup in an async method
+ self._deferred = reactor.callLater(0, self._finish_startup)
+ return self._deferred
+
+ @inlineCallbacks
+ def _finish_startup(self):
"""
- Here is where I will start to bring up a PON port and discover an ONT
-
- Note: For some reason, you cannot chain the FEC enables with the pon enable below?
+ Do all startup offline since REST may fail
"""
- try:
- self.startup = self.set_pon_config("enabled", True)
- yield self.startup
+ if self._state != PonPort.State.INITIAL:
+ returnValue('Done')
- except Exception, e:
- log.exception("enabled failed: {}".format(str(e)))
- raise
+ if self._enabled is None or self._downstream_fec_enable is None or self._upstream_fec_enable is None:
+ try:
+ self._deferred = self.get_pon_config()
+ results = yield self._deferred
- try:
- self.startup = self.set_pon_config("downstream-fec-enable", True)
- yield self.startup
+ except Exception as e:
+ self.log.exception('Initial GET of config failed: {}'.format(e.message))
+ self._deferred = reactor.callLater(3, self._finish_startup)
+ returnValue(self._deferred)
- except Exception, e:
- log.exception("downstream FEC enable failed: {}".format(str(e)))
- raise
+ # Load cache
- try:
- self.startup = self.set_pon_config("upstream-fec-enable", True)
- results = yield self.startup
+ self._enabled = results.get('enabled', False)
+ self._downstream_fec_enable = results.get('downstream-fec-enable', False)
+ self._upstream_fec_enable = results.get('upstream-fec-enable', False)
- except Exception, e:
- log.exception("upstream FEC enable failed: {}".format(str(e)))
- raise
+ if not self._enabled:
+ try:
+ self._deferred = self.set_pon_config("enabled", True)
+ results = yield self._deferred
+ self._enabled = True
- log.debug('ONU Startup complete: results: {}'.
- format(pprint.PrettyPrinter().pformat(results)))
+ except Exception as e:
+ self.log.exception('enabled failed: {}'.format(str(e)))
+ self._deferred = reactor.callLater(3, self._finish_startup)
+ returnValue(self._deferred)
- if isinstance(results, dict) and results.get('enabled', False):
- self.admin_state = AdminState.ENABLED
- self.oper_status = OperStatus.ACTIVE # TODO: is this correct, how do we tell GRPC
+ if not self._downstream_fec_enable:
+ try:
+ self._deferred = self.set_pon_config("downstream-fec-enable", True)
+ results = yield self._deferred
+ self._downstream_fec_enable = True
- # Begin to ONU discovery. Once a second if no ONUs found and once every 20
- # seconds after one or more ONUs found on the PON
- self.onu_discovery = reactor.callLater(3, self.discover_onus)
- returnValue(self.onu_discovery)
+ except Exception as e:
+ self.log.exception('downstream FEC enable failed: {}'.format(str(e)))
+ self._deferred = reactor.callLater(3, self._finish_startup)
+ returnValue(self._deferred)
+
+ if not self._upstream_fec_enable:
+ try:
+ self._deferred = self.set_pon_config("upstream-fec-enable", True)
+ results = yield self._deferred
+ self._upstream_fec_enable = True
+
+ except Exception as e:
+ self.log.exception('upstream FEC enable failed: {}'.format(str(e)))
+ self._deferred = reactor.callLater(3, self._finish_startup)
+ returnValue(self._deferred)
+
+ self.log.debug('ONU Startup complete: results: {}'.format(pprint.PrettyPrinter().pformat(results)))
+
+ if self._enabled:
+ self._admin_state = AdminState.ENABLED
+ self._oper_status = OperStatus.ACTIVE # TODO: is this correct, how do we tell GRPC
+ self._state = PonPort.State.RUNNING
+
+ # Begin to ONU discovery. Once a second if no ONUs found and once every 20
+ # seconds after one or more ONUs found on the PON
+ self._deferred = reactor.callLater(3, self.discover_onus)
+
+ self._update_adapter_agent()
+ returnValue('Enabled')
else:
# Startup failed. Could be due to object creation with an invalid initial admin_status
# state. May want to schedule a start to occur again if this happens
- self.admin_state = AdminState.DISABLED
- self.oper_status = OperStatus.UNKNOWN
- raise NotImplementedError('TODO: Support of PON startup failure not yet supported')
+ self._admin_state = AdminState.DISABLED
+ self._oper_status = OperStatus.UNKNOWN
+ self._state = PonPort.State.STOPPED
- @inlineCallbacks
+ self._update_adapter_agent()
+ returnValue('Disabled')
+
def stop(self):
- log.info('Stopping {}'.format(self.label))
- d, self.startup = self.startup, None
- if d is not None:
- d.cancel()
+ if self._state == PonPort.State.STOPPED:
+ return succeed('Stopped')
- d, self.onu_discovery = self.onu_discovery, None
- if d is not None:
- d.cancel()
+ self.log.info('Stopping {}'.format(self._label))
- self.reset(False)
- self.admin_state = AdminState.DISABLED
- self.oper_status = OperStatus.UNKNOWN
- # TODO: How do we reflect this into VOLTHA?
+ self._cancel_deferred()
+ self._deferred = self.set_pon_config("enabled", False)
+
+ # Flush config cache
+ self._enabled = None
+ self._downstream_fec_enable = None
+ self._upstream_fec_enable = None
+
+ self._admin_state = AdminState.DISABLED
+ self._oper_status = OperStatus.UNKNOWN
+ self._update_adapter_agent()
+
+ self._state = PonPort.State.STOPPED
+ return self._deferred
@inlineCallbacks
def reset(self):
- log.info('Reset {}'.format(self.label))
+ """
+ Set the PON Port to a known good state on initial port startup. Actual
+ PON 'Start' is done elsewhere
+ """
+ if self._state != PonPort.State.INITIAL:
+ self.log.error('Reset ignored, only valid during initial startup', state=self._state)
+ returnValue('Ignored')
- if self.admin_state != self.parent.initial_port_state:
+ self.log.info('Reset {}'.format(self._label))
+
+ if self._admin_state != self._parent.initial_port_state:
try:
- enable = self.parent.initial_port_state == AdminState.ENABLED
+ enable = self._parent.initial_port_state == AdminState.ENABLED
yield self.set_pon_config("enabled", enable)
# TODO: Move to 'set_pon_config' method and also make sure GRPC/Port is ok
- self.admin_state = AdminState.ENABLED if enable else AdminState.DISABLE
+ self._admin_state = AdminState.ENABLED if enable else AdminState.DISABLE
except Exception as e:
- log.exception('Reset of PON {} to initial state failed'.
- format(self.pon_id), e=e)
+ self.log.exception('Reset of PON to initial state failed', e=e)
raise
- if self.admin_state == AdminState.ENABLED and self.parent.initial_onu_state == AdminState.DISABLED:
+ if self._admin_state == AdminState.ENABLED and self._parent.initial_onu_state == AdminState.DISABLED:
try:
# Walk the provisioned ONU list and disable any exiting ONUs
results = yield self.get_onu_config()
@@ -199,57 +291,64 @@
yield self.delete_onu(onu_id)
except Exception as e:
- log.exception('Delete of ONU {} on PON {} failed'.
- format(onu_id, self.pon_id), e=e)
+ self.log.exception('Delete of ONU {} on PON failed'.format(onu_id), e=e)
pass # Non-fatal
except Exception as e:
- log.exception('Failed to get current ONU config for PON {}'.
- format(self.pon_id), e=e)
+ self.log.exception('Failed to get current ONU config', e=e)
raise
+ def delete(self):
+ """
+ Parent device is being deleted. Do not change any config but
+ stop all polling
+ """
+ self.log.info('Deleteing {}'.format(self._label))
+ self._state = PonPort.State.DELETING
+ self._cancel_deferred()
+
def get_pon_config(self):
- uri = AdtranOltHandler.GPON_PON_CONFIG_URI.format(self.pon_id)
- name = 'pon-get-config-{}'.format(self.pon_id)
- return self.parent.rest_client.request('GET', uri, name=name)
+ uri = AdtranOltHandler.GPON_PON_CONFIG_URI.format(self._pon_id)
+ name = 'pon-get-config-{}'.format(self._pon_id)
+ return self._parent.rest_client.request('GET', uri, name=name)
def get_onu_config(self, onu_id=None):
- uri = AdtranOltHandler.GPON_PON_ONU_CONFIG_URI.format(self.pon_id)
+ uri = AdtranOltHandler.GPON_PON_ONU_CONFIG_URI.format(self._pon_id)
if onu_id is not None:
uri += '={}'.format(onu_id)
- name = 'pon-get-onu_config-{}-{}'.format(self.pon_id, onu_id)
- return self.parent.rest_client.request('GET', uri, name=name)
+ name = 'pon-get-onu_config-{}-{}'.format(self._pon_id, onu_id)
+ return self._parent.rest_client.request('GET', uri, name=name)
def set_pon_config(self, leaf, value):
data = json.dumps({leaf: value})
- uri = AdtranOltHandler.GPON_PON_CONFIG_URI.format(self.pon_id)
- name = 'pon-set-config-{}-{}-{}'.format(self.pon_id, leaf, str(value))
- return self.parent.rest_client.request('PATCH', uri, data=data, name=name)
+ uri = AdtranOltHandler.GPON_PON_CONFIG_URI.format(self._pon_id)
+ name = 'pon-set-config-{}-{}-{}'.format(self._pon_id, leaf, str(value))
+ return self._parent.rest_client.request('PATCH', uri, data=data, name=name)
def discover_onus(self):
- log.debug("Initiating discover of ONU/ONTs on PON {}".format(self.pon_id))
+ self.log.debug('Initiating discover of ONU/ONTs')
- if self.admin_state == AdminState.ENABLED:
- data = json.dumps({'pon-id': self.pon_id})
+ if self._admin_state == AdminState.ENABLED:
+ data = json.dumps({'pon-id': self._pon_id})
uri = AdtranOltHandler.GPON_PON_DISCOVER_ONU
- name = 'pon-discover-onu-{}'.format(self.pon_id)
- self.startup = self.parent.rest_client.request('POST', uri, data, name=name)
+ name = 'pon-discover-onu-{}'.format(self._pon_id)
- self.startup.addBoth(self.onu_discovery_init_complete)
+ self._deferred = self._parent.rest_client.request('POST', uri, data, name=name)
+ self._deferred.addBoth(self.onu_discovery_init_complete)
def onu_discovery_init_complete(self, _):
"""
This method is called after the REST POST to request ONU discovery is
completed. The results (body) of the post is always empty / 204 NO CONTENT
"""
- log.debug('PON {} ONU Discovery requested'.format(self.pon_id))
+ self.log.debug('ONU Discovery requested')
# Reschedule
- delay = self.no_onu_discover_tick if len(self.onus) == 0 else self.discovery_tick
+ delay = self._no_onu_discover_tick if len(self._onus) == 0 else self._discovery_tick
delay += random.uniform(-delay / 10, delay / 10)
- self.onu_discovery = reactor.callLater(delay, self.discover_onus)
+ self._deferred = reactor.callLater(delay, self.discover_onus)
def process_status_poll(self, status):
"""
@@ -257,10 +356,9 @@
:param status: (OltState.Pon object) results from RESTCONF GET
"""
- log.debug('process_status_poll: PON {}: {}{}'.format(self.pon_id,
- os.linesep,
- status))
- if self.admin_state != AdminState.ENABLED:
+ self.log.debug('process_status_poll: {}{}'.format(os.linesep, status))
+
+ if self._admin_state != AdminState.ENABLED:
return
# Process the ONU list in for this PON, may have previously provisioned ones there
@@ -269,8 +367,9 @@
new = self._process_status_onu_list(status.onus)
for onu_id in new:
+ import base64
# self.add_new_onu(serial_number, status)
- log.info('Found ONU {} in status list'.format(onu_id))
+ self.log.info('Found ONU {}/{} in status list'.format(onu_id, base64.decodestring(onu_id)))
raise NotImplementedError('TODO: Adding ONUs from existing ONU (status list) not supported')
# Get new/missing from the discovered ONU leaf
@@ -279,7 +378,7 @@
# TODO: Do something useful
if len(missing):
- log.info('Missing ONUs are: {}'.format(missing))
+ self.log.info('Missing ONUs are: {}'.format(missing))
for serial_number in new:
reactor.callLater(0, self.add_onu, serial_number, status)
@@ -296,9 +395,9 @@
:param onus: (dict) Set of known ONUs
"""
- log.debug('Processing ONU list: {}'.format(onus))
+ self.log.debug('Processing ONU list: {}'.format(onus))
- my_onu_ids = frozenset([o.onu_id for o in self.onus.itervalues()])
+ my_onu_ids = frozenset([o.onu_id for o in self._onus.itervalues()])
discovered_onus = frozenset(onus.keys())
new_onus_ids = discovered_onus - my_onu_ids
@@ -315,9 +414,9 @@
:param discovered_onus: (frozenset) Set of ONUs currently discovered
"""
- log.debug('Processing discovered ONU list: {}'.format(discovered_onus))
+ self.log.debug('Processing discovered ONU list: {}'.format(discovered_onus))
- my_onus = frozenset(self.onus.keys())
+ my_onus = frozenset(self._onus.keys())
new_onus = discovered_onus - my_onus
missing_onus = my_onus - discovered_onus
@@ -326,32 +425,30 @@
@inlineCallbacks
def add_onu(self, serial_number, status):
- log.info('Add ONU: {}'.format(serial_number))
+ self.log.info('Add ONU: {}'.format(serial_number))
if serial_number not in status.onus:
# Newly found and not enabled ONU, enable it now if not at max
- if len(self.onus) < self.MAX_ONUS_SUPPORTED:
+ if len(self._onus) < self.MAX_ONUS_SUPPORTED:
# TODO: For now, always allow any ONU
- if serial_number not in self.onus:
+ if serial_number not in self._onus:
onu = Onu(serial_number, self)
try:
yield onu.create(True)
self.on_new_onu_discovered(onu)
- self.onus[serial_number] = onu
+ self._onus[serial_number] = onu
except Exception as e:
- log.exception('Exception during add_onu, pon: {}, onu: {}'.
- format(self.pon_id, onu.onu_id), e=e)
+ self.log.exception('Exception during add_onu, onu: {}'.format(onu.onu_id), e=e)
else:
- log.info('TODO: Code this')
+ self.log.info('TODO: Code this')
else:
- log.warning('Maximum number of ONUs already provisioned on PON {}'.
- format(self.pon_id))
+ self.log.warning('Maximum number of ONUs already provisioned')
else:
# ONU has been enabled
pass
@@ -362,15 +459,15 @@
:param onu:
:return:
"""
- olt = self.parent
- adapter = olt.adapter_agent
+ olt = self.olt
+ adapter = self.adapter_agent
proxy = Device.ProxyAddress(device_id=olt.device_id,
- channel_id=self.port_number,
+ channel_id=self._port_no,
onu_id=onu.onu_id)
adapter.child_device_detected(parent_device_id=olt.device_id,
- parent_port_no=self.port_number,
+ parent_port_no=self._port_no,
child_device_type=onu.vendor_device,
proxy_address=proxy)
@@ -388,10 +485,10 @@
return onu_id
def delete_onu(self, onu_id):
- uri = AdtranOltHandler.GPON_PON_ONU_CONFIG_URI.format(self.pon_id)
+ uri = AdtranOltHandler.GPON_PON_ONU_CONFIG_URI.format(self._pon_id)
uri += '={}'.format(onu_id)
- name = 'pon-delete-onu-{}-{}'.format(self.pon_id, onu_id)
+ name = 'pon-delete-onu-{}-{}'.format(self._pon_id, onu_id)
# TODO: Need removal from VOLTHA child_device method
- return self.parent.rest_client.request('DELETE', uri, name=name)
+ return self._parent.rest_client.request('DELETE', uri, name=name)