Support for converting openflow into EVC flow rules for the Adtran 1U OLT Adapter. Updated log message to call proper method and use undocumented logging format standard
Change-Id: I581f98aba6431f0bfba705edb644d09bf39d7de4
diff --git a/voltha/adapters/adtran_olt/adapter_alarms.py b/voltha/adapters/adtran_olt/adapter_alarms.py
new file mode 100644
index 0000000..4202c1c
--- /dev/null
+++ b/voltha/adapters/adtran_olt/adapter_alarms.py
@@ -0,0 +1,88 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import structlog
+
+# TODO: In the device adapter, the following alarms are still TBD
+# (Taken from microsemi, so mileage may vare
+# ON_ALARM_SOFTWARE_ERROR = 0
+# PON_ALARM_LOS = 1
+# PON_ALARM_LOSI = 2
+# PON_ALARM_DOWI = 3
+# PON_ALARM_LOFI = 4
+# PON_ALARM_RDII = 5
+# PON_ALARM_LOAMI = 6
+# PON_ALARM_LCDGI = 7
+# PON_ALARM_LOAI = 8
+# PON_ALARM_SDI = 9
+# PON_ALARM_SFI = 10
+# PON_ALARM_PEE = 11
+# PON_ALARM_DGI = 12
+# PON_ALARM_LOKI = 13
+# PON_ALARM_TIWI = 14
+# PON_ALARM_TIA = 15
+# PON_ALARM_VIRTUAL_SCOPE_ONU_LASER_ALWAYS_ON = 16
+# PON_ALARM_VIRTUAL_SCOPE_ONU_SIGNAL_DEGRADATION = 17
+# PON_ALARM_VIRTUAL_SCOPE_ONU_EOL = 18
+# PON_ALARM_VIRTUAL_SCOPE_ONU_EOL_DATABASE_IS_FULL = 19
+# PON_ALARM_AUTH_FAILED_IN_REGISTRATION_ID_MODE = 20
+# PON_ALARM_SUFI = 21
+
+
+class AdapterAlarms:
+ def __init__(self, adapter, device):
+ self.adapter = adapter
+ self.device_id = device.id
+ self.lc = None
+
+ def format_id(self, alarm):
+ return 'voltha.{}.{}.{}'.format(self.adapter.name,
+ self.device_id,
+ alarm),
+
+ def format_description(self, _object, alarm, status):
+ return '{} Alarm - {} - {}'.format(_object.upper(),
+ alarm.upper(),
+ 'Raised' if status else 'Cleared')
+
+ def send_alarm(self, context_data, alarm_data):
+ try:
+ current_context = {}
+
+ for key, value in context_data.__dict__.items():
+ current_context[key] = str(value)
+
+ alarm_event = self.adapter.adapter_agent.create_alarm(
+ id=alarm_data.get('id', 'voltha.{}.{}.olt'.format(self.adapter.name,
+ self.device_id)),
+ resource_id=self.device_id,
+ description="{}.{} - {}".format(self.adapter.name, self.device_id,
+ alarm_data.get('description')),
+ type=alarm_data.get('type'),
+ category=alarm_data.get('category'),
+ severity=alarm_data.get('severity'),
+ state=alarm_data.get('state'),
+ raised_ts=alarm_data.get('ts', 0),
+ context=current_context
+ )
+ self.adapter.adapter_agent.submit_alarm(self.device_id, alarm_event)
+
+ except Exception as e:
+ self.log.exception('failed-to-send-alarm', e=e)
+
+
+
+
+
diff --git a/voltha/adapters/adtran_olt/adapter_pm_metrics.py b/voltha/adapters/adtran_olt/adapter_pm_metrics.py
new file mode 100644
index 0000000..aef8be1
--- /dev/null
+++ b/voltha/adapters/adtran_olt/adapter_pm_metrics.py
@@ -0,0 +1,103 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import structlog
+from twisted.internet.task import LoopingCall
+# from voltha.protos import ponsim_pb2
+from voltha.protos.device_pb2 import PmConfig, PmConfigs
+from google.protobuf.empty_pb2 import Empty
+
+
+class AdapterPmMetrics:
+ def __init__(self, adapter, device):
+ self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
+ 'tx_256_511_pkts', 'tx_512_1023_pkts',
+ 'tx_1024_1518_pkts', 'tx_1519_9k_pkts',
+ 'rx_64_pkts', 'rx_65_127_pkts',
+ 'rx_128_255_pkts', 'rx_256_511_pkts',
+ 'rx_512_1023_pkts', 'rx_1024_1518_pkts',
+ 'rx_1519_9k_pkts'}
+ self.log = structlog.get_logger(device_id=device.id)
+ self.device = device
+ self.id = device.id
+ self.name = adapter.name
+ self.default_freq = 150
+ self.grouped = False
+ self.freq_override = False
+ self.pon_metrics_config = dict()
+ self.nni_metrics_config = dict()
+ self.lc = None
+
+ for m in self.pm_names:
+ self.pon_metrics_config[m] = PmConfig(name=m, type=PmConfig.COUNTER,
+ enabled=True)
+ self.nni_metrics_config[m] = PmConfig(name=m, type=PmConfig.COUNTER,
+ enabled=True)
+
+ def update(self, pm_config):
+ if self.default_freq != pm_config.default_freq:
+ # Update the callback to the new frequency.
+ self.default_freq = pm_config.default_freq
+ self.lc.stop()
+ self.lc.start(interval=self.default_freq / 10)
+
+ for m in pm_config.metrics:
+ self.pon_metrics_config[m.name].enabled = m.enabled
+ self.nni_metrics_config[m.name].enabled = m.enabled
+
+ def make_proto(self):
+ pm_config = PmConfigs(id=self.id, default_freq=self.default_freq,
+ grouped=False, freq_override=False)
+
+ for m in sorted(self.pon_metrics_config):
+ pm = self.pon_metrics_config[m] # Either will do they're the same
+ pm_config.metrics.extend([PmConfig(name=pm.name, type=pm.type,
+ enabled=pm.enabled)])
+ return pm_config
+
+ def collect_port_metrics(self, channel):
+ rtrn_port_metrics = dict()
+ # TODO: Implement
+ # stub = ponsim_pb2.PonSimStub(channel)
+ # stats = stub.GetStats(Empty())
+ # rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
+ # rtrn_port_metrics['nni'] = self.extract_nni_metrics(stats)
+ return rtrn_port_metrics
+
+ def extract_pon_metrics(self, stats):
+ rtrn_pon_metrics = dict()
+
+ for m in stats.metrics:
+ if m.port_name == "pon":
+ for p in m.packets:
+ if self.pon_metrics_config[p.name].enabled:
+ rtrn_pon_metrics[p.name] = p.value
+ return rtrn_pon_metrics
+
+ def extract_nni_metrics(self, stats):
+ rtrn_pon_metrics = dict()
+ for m in stats.metrics:
+ if m.port_name == "nni":
+ for p in m.packets:
+ if self.pon_metrics_config[p.name].enabled:
+ rtrn_pon_metrics[p.name] = p.value
+ return rtrn_pon_metrics
+
+ def start_collector(self, callback):
+ self.log.info("starting-pm-collection", device_name=self.name,
+ device_id=self.device.id)
+ prefix = 'voltha.{}.{}'.format(self.name, self.device.id)
+ self.lc = LoopingCall(callback, self.device.id, prefix)
+ self.lc.start(interval=self.default_freq / 10)
diff --git a/voltha/adapters/adtran_olt/adtran_device_handler.py b/voltha/adapters/adtran_olt/adtran_device_handler.py
index e61eb8d..37ee403 100644
--- a/voltha/adapters/adtran_olt/adtran_device_handler.py
+++ b/voltha/adapters/adtran_olt/adtran_device_handler.py
@@ -24,6 +24,7 @@
import arrow
import structlog
+import json
from twisted.internet import reactor, defer
from twisted.internet.defer import inlineCallbacks, returnValue
@@ -38,11 +39,18 @@
from voltha.protos.openflow_13_pb2 import ofp_desc, ofp_switch_features, OFPC_PORT_STATS, \
OFPC_GROUP_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS
from voltha.registry import registry
-
+from adapter_alarms import AdapterAlarms
+from common.frameio.frameio import BpfProgramFilter, hexify
+from adapter_pm_metrics import AdapterPmMetrics
from common.utils.asleep import asleep
+from scapy.layers.l2 import Ether, Dot1Q
+from scapy.layers.inet import Raw
_ = third_party
+_PACKET_IN_VLAN = 4000
+_is_inband_frame = BpfProgramFilter('(ether[14:2] & 0xfff) = 0x{:03x}'.format(_PACKET_IN_VLAN))
+
class AdtranDeviceHandler(object):
"""
@@ -86,6 +94,8 @@
self.io_port = None
self.logical_device_id = None
self.interface = registry('main').get_args().interface
+ self.pm_metrics = None
+ self.alarms = None
# Northbound and Southbound ports
self.northbound_ports = {} # port number -> Port
@@ -104,13 +114,13 @@
self.rest_port = None
self.rest_username = username
self.rest_password = password
- self.rest_client = None
+ self._rest_client = None
# NETCONF Client
self.netconf_port = None
self.netconf_username = username
self.netconf_password = password
- self.netconf_client = None
+ self._netconf_client = None
# Heartbeat support
self.heartbeat_count = 0
@@ -125,25 +135,27 @@
self.is_virtual_olt = False
# Installed flows
- self.flow_entries = {} # Flow ID/name -> FlowEntry
+ self.evcs = {} # Flow ID/name -> FlowEntry
# TODO Remove items below after one PON fully supported and working as expected
- self.max_ports = 1
+ self.max_nni_ports = 1
+ self.max_pon_ports = 1
def __del__(self):
# Kill any startup or heartbeat defers
d, self.startup = self.startup, None
+ h, self.heartbeat = self.heartbeat, None
+ ldi, self.logical_device_id = self.logical_device_id, None
+
if d is not None:
d.cancel()
- ldi, self.logical_device_id = self.logical_device_id, None
-
- h, self.heartbeat = self.heartbeat, None
-
if h is not None:
h.cancel()
+ self._deactivate_io_port()
+
# Remove the logical device
if ldi is not None:
@@ -156,6 +168,14 @@
def __str__(self):
return "AdtranDeviceHandler: {}".format(self.ip_address)
+ @property
+ def netconf_client(self):
+ return self._netconf_client
+
+ @property
+ def rest_client(self):
+ return self._rest_client
+
def parse_provisioning_options(self, device):
if not device.ipv4_address:
self.activate_failed(device, 'No ip_address field provided')
@@ -209,7 +229,7 @@
:param reconciling: If True, this adapter is taking over for a previous adapter
for an existing OLT
"""
- self.log.info('AdtranDeviceHandler.activating', device=device, reconciling=reconciling)
+ self.log.info('AdtranDeviceHandler.activating', reconciling=reconciling)
if self.logical_device_id is None:
# Parse our command line options for this device
@@ -238,24 +258,13 @@
############################################################################
# Start initial discovery of NETCONF support (if any)
-<<<<<<< HEAD
- device.model = 'TODO: Adtran PizzaBox, YUM'
- device.hardware_version = 'TODO: H/W Version'
- device.firmware_version = 'TODO: S/W Version'
- device.images.image.extend([
- Image(version="TODO: S/W Version")
- ])
- device.serial_number = 'TODO: Serial Number'
-=======
- if not self.is_virtual_olt:
- try:
- self.startup = self.make_netconf_connection()
- yield self.startup
->>>>>>> c577acb... netconf client support and disable-enable support
+ try:
+ self.startup = self.make_netconf_connection()
+ yield self.startup
- except Exception as e:
- self.log.exception('Initial NETCONF connection failed', e=e)
- self.activate_failed(device, e.message, reachable=False)
+ except Exception as e:
+ self.log.exception('Initial NETCONF connection failed', e=e)
+ self.activate_failed(device, e.message, reachable=False)
############################################################################
# Get the device Information
@@ -271,9 +280,22 @@
device.model = results.get('model', 'unknown')
device.hardware_version = results.get('hardware_version', 'unknown')
device.firmware_version = results.get('firmware_version', 'unknown')
- device.software_version = results.get('software_version', 'unknown')
device.serial_number = results.get('serial_number', 'unknown')
+ def get_software_images():
+ leafs = ['running-revision', 'candidate-revision', 'startup-revision']
+ image_names = list(set([results.get(img, 'unknown') for img in leafs]))
+
+ images = []
+ for name in image_names:
+ # TODO: Look into how to find out hash, is_valid, and install date/time
+ image = Image(name=name, version=name,
+ is_active=(name == results.get('running-revision', 'xxx')),
+ is_committed=(name == results.get('startup-revision', 'xxx')))
+ images.append(image)
+ return images
+
+ device.images.image.extend(get_software_images())
device.root = True
device.vendor = results.get('vendor', 'Adtran, Inc.')
device.connect_status = ConnectStatus.REACHABLE
@@ -299,7 +321,6 @@
except Exception as e:
self.log.exception('Northbound port enumeration and creation failed', e=e)
self.activate_failed(device, e.message)
- results = None
try:
# Enumerate and create southbound interfaces
@@ -318,62 +339,6 @@
self.log.exception('Southbound port enumeration and creation failed', e=e)
self.activate_failed(device, e.message)
-<<<<<<< HEAD
- # Complete activation by setting up logical device for this OLT and saving
- # off the devices parent_id
-
- # There could be multiple software version on the device,
- # active, standby etc. Choose the active or running software
- # below. See simulated_olt for example implementation
- version = device.images.image[0].version
-
- ld = LogicalDevice(
- # NOTE: not setting id and datapath_id will let the adapter agent pick id
- desc=ofp_desc(mfr_desc=device.vendor,
- hw_desc=device.hardware_version,
- sw_desc=version,
- serial_num=device.serial_number,
- dp_desc='n/a'),
- switch_features=ofp_switch_features(n_buffers=256, # TODO fake for now
- n_tables=2, # TODO ditto
- capabilities=(OFPC_FLOW_STATS |
- OFPC_TABLE_STATS |
- OFPC_PORT_STATS |
- OFPC_GROUP_STATS)), # TODO and ditto
- root_device_id=device.id)
-
- ld_initialized = self.adapter_agent.create_logical_device(ld)
-
- # Create logical ports for all southbound and northbound interfaces
-
- for port in self.northbound_ports.itervalues():
- lp = port.get_logical_port()
- if lp is not None:
- self.adapter_agent.add_logical_port(ld_initialized.id, lp)
-
- for port in self.southbound_ports.itervalues():
- lp = port.get_logical_port()
- if lp is not None:
- self.adapter_agent.add_logical_port(ld_initialized.id, lp)
-
- # Set the downlinks in a known good initial state
-
- try:
- for port in self.southbound_ports.itervalues():
- self.startup = port.reset()
- yield self.startup
-
- except Exception as e:
- self.log.exception('Failed to reset southbound ports to known good initial state', e=e)
- self.activate_failed(device, e.message)
-
- # Start/stop the interfaces as needed
-
- try:
- for port in self.northbound_ports.itervalues():
- self.startup = port.start()
- yield self.startup
-=======
if reconciling:
if device.admin_state == AdminState.ENABLED:
if device.parent_id:
@@ -381,25 +346,46 @@
self.adapter_agent.reconcile_logical_device(device.parent_id)
else:
self.log.info('no-logical-device-set')
->>>>>>> c577acb... netconf client support and disable-enable support
# Reconcile child devices
self.adapter_agent.reconcile_child_devices(device.id)
+ ld_initialized = self.adapter_agent.get_logical_device()
+ assert device.parent_id == ld_initialized.id
+
else:
# Complete activation by setting up logical device for this OLT and saving
# off the devices parent_id
- self.logical_device_id = self.create_logical_device(device)
+ ld_initialized = self.create_logical_device(device)
+ ############################################################################
+ # Setup PM configuration for this device
+
+ # self.pm_metrics = AdapterPmMetrics(device)
+ # pm_config = self.pm_metrics.make_proto()
+ # self.log.info("initial-pm-config", pm_config=pm_config)
+ # self.adapter_agent.update_device_pm_config(pm_config, init=True)
+
+ ############################################################################
+ # Setup Alarm handler
+
+ self.alarms = AdapterAlarms(self.adapter, device)
+
+ ############################################################################
# Create logical ports for all southbound and northbound interfaces
+ try:
+ self.startup = self.create_logical_ports(device, ld_initialized, reconciling)
+ yield self.startup
- self.create_logical_ports(device, self.logical_device_id, reconciling)
+ except Exception as e:
+ self.log.exception('Logical port creation failed', e=e)
+ self.activate_failed(device, e.message)
# Complete device specific steps
try:
+ self.log.debug('Performing final device specific activation procedures')
self.startup = self.complete_device_specific_activation(device, reconciling)
- if self.startup is not None:
- yield self.startup
+ yield self.startup
except Exception as e:
self.log.exception('Device specific activation failed', e=e)
@@ -407,13 +393,23 @@
# Schedule the heartbeat for the device
- self.start_heartbeat(delay=10)
+ self.log.debug('Starting heartbeat')
+ self.start_heartbeat(delay=5)
device = self.adapter_agent.get_device(device.id)
- device.parent_id = self.logical_device_id
+ device.parent_id = ld_initialized.id
device.oper_status = OperStatus.ACTIVE
+ device.reason = ''
self.adapter_agent.update_device(device)
+ # finally, open the frameio port to receive in-band packet_in messages
+ self._activate_io_port()
+
+ # Start collecting stats from the device after a brief pause
+ reactor.callLater(5, self.start_kpi_collection, device.id)
+
+ self.log.info('Activated')
+
def activate_failed(self, device, reason, reachable=True):
"""
Activation process (adopt_device) has failed.
@@ -433,47 +429,87 @@
self.adapter_agent.update_device(device)
raise RuntimeError('Failed to activate OLT: {}'.format(device.reason))
+ @inlineCallbacks
def make_netconf_connection(self, connect_timeout=None):
############################################################################
# Start initial discovery of NETCONF support
- if self.netconf_client is None:
- self.netconf_client = AdtranNetconfClient(self.ip_address,
- self.netconf_port,
- self.netconf_username,
- self.netconf_password,
- self.timeout)
- if self.netconf_client.connected:
- return defer.returnValue(True)
+ client = self._netconf_client
+
+ if client is None:
+ if not self.is_virtual_olt:
+ client = AdtranNetconfClient(self.ip_address,
+ self.netconf_port,
+ self.netconf_username,
+ self.netconf_password,
+ self.timeout)
+ else:
+ from voltha.adapters.adtran_olt.net.mock_netconf_client import MockNetconfClient
+ client = MockNetconfClient(self.ip_address,
+ self.netconf_port,
+ self.netconf_username,
+ self.netconf_password,
+ self.timeout)
+ if client.connected:
+ self._netconf_client = client
+ returnValue(True)
timeout = connect_timeout or self.timeout
- return self.netconf_client.connect(timeout)
+ try:
+ request = client.connect(timeout)
+ results = yield request
+ self._netconf_client = client
+ returnValue(results)
+
+ except Exception as e:
+ self.log.exception('Failed to create NETCONF Client', e=e)
+ self._netconf_client = None
+ raise
+
+ @inlineCallbacks
def make_restconf_connection(self, get_timeout=None):
- if self.rest_client is None:
- self.rest_client = AdtranRestClient(self.ip_address,
- self.rest_port,
- self.rest_username,
- self.rest_password,
- self.timeout)
+ client = self._rest_client
+
+ if client is None:
+ client = AdtranRestClient(self.ip_address,
+ self.rest_port,
+ self.rest_username,
+ self.rest_password,
+ self.timeout)
timeout = get_timeout or self.timeout
- return self.rest_client.request('GET', self.HELLO_URI, name='hello', timeout=timeout)
+
+ try:
+ request = client.request('GET', self.HELLO_URI, name='hello', timeout=timeout)
+ results = yield request
+ if isinstance(results, dict) and 'module-info' in results:
+ self._rest_client = client
+ returnValue(results)
+ else:
+ from twisted.internet.error import ConnectError
+ self._rest_client = None
+ raise ConnectError(string='Results received but unexpected data type or contents')
+ except Exception:
+ self._rest_client = None
+ raise
def create_logical_device(self, device):
+ version = device.images.image[0].version
+
ld = LogicalDevice(
# NOTE: not setting id and datapath_id will let the adapter agent pick id
desc=ofp_desc(mfr_desc=device.vendor,
hw_desc=device.hardware_version,
- sw_desc=device.software_version,
+ sw_desc=version,
serial_num=device.serial_number,
dp_desc='n/a'),
switch_features=ofp_switch_features(n_buffers=256, # TODO fake for now
n_tables=2, # TODO ditto
capabilities=(
- # OFPC_FLOW_STATS | # TODO: Enable if we support it
- # OFPC_TABLE_STATS | # TODO: Enable if we support it
- # OFPC_GROUP_STATS | # TODO: Enable if we support it
+ OFPC_FLOW_STATS |
+ OFPC_TABLE_STATS |
+ OFPC_GROUP_STATS |
OFPC_PORT_STATS)),
root_device_id=device.id)
@@ -483,6 +519,7 @@
@inlineCallbacks
def create_logical_ports(self, device, ld_initialized, reconciling):
+ results = defer.fail()
if not reconciling:
for port in self.northbound_ports.itervalues():
@@ -498,49 +535,33 @@
# Set the ports in a known good initial state
try:
for port in self.northbound_ports.itervalues():
- self.startup = port.reset()
+ self.startup = yield port.reset()
results = yield self.startup
- self.log.debug('Northbound Port reset results', results=results)
- except Exception as e:
- self.log.exception('Failed to reset northbound ports to known good initial state', e=e)
- self.activate_failed(device, e.message)
-
- try:
for port in self.southbound_ports.itervalues():
- self.startup = port.reset()
+ self.startup = yield port.reset()
results = yield self.startup
- self.log.debug('Southbound Port reset results', results=results)
except Exception as e:
- self.log.exception('Failed to reset southbound ports to known good initial state', e=e)
- self.activate_failed(device, e.message)
+ self.log.exception('Failed to reset ports to known good initial state', e=e)
+ self.activate_failed(device, e.message)
# Start/stop the interfaces as needed
- try:
- for port in self.northbound_ports.itervalues():
- self.startup = port.start()
- results = yield self.startup
- self.log.debug('Northbound Port start results', results=results)
- except Exception as e:
- self.log.exception('Failed to start northbound port(s)', e=e)
- self.activate_failed(device, e.message)
+ for port in self.northbound_ports.itervalues():
+ self.startup = port.start()
+ results = yield self.startup
- try:
- if reconciling:
- start_downlinks = device.admin_state == AdminState.ENABLED
- else:
- start_downlinks = self.initial_port_state == AdminState.ENABLED
+ if reconciling:
+ start_downlinks = device.admin_state == AdminState.ENABLED
+ else:
+ start_downlinks = self.initial_port_state == AdminState.ENABLED
- for port in self.southbound_ports.itervalues():
- self.startup = port.start() if start_downlinks else port.stop()
- results = yield self.startup
- self.log.debug('Southbound Port start results', results=results)
+ for port in self.southbound_ports.itervalues():
+ self.startup = port.start() if start_downlinks else port.stop()
+ results = yield self.startup
- except Exception as e:
- self.log.exception('Failed to start southbound port(s)', e=e)
- self.activate_failed(device, e.message)
+ returnValue(results)
@inlineCallbacks
def device_information(self, device):
@@ -618,9 +639,26 @@
"""
yield defer.Deferred(lambda c: c.callback("Not Required"))
+ # TODO: Move some of the items below from here and the EVC to a utility class
+
+ def is_nni_port(self, port):
+ return port in self.northbound_ports
+
+ def is_uni_port(self, port):
+ raise NotImplementedError('implement in derived class')
+
+ def is_pon_port(self, port):
+ raise NotImplementedError('implement in derived class')
+
+ def is_logical_port(self, port):
+ return not self.is_nni_port(port) and not self.is_uni_port(port) and not self.is_pon_port(port)
+
+ def get_port_name(self, port):
+ raise NotImplementedError('implement in derived class')
+
@inlineCallbacks
def complete_device_specific_activation(self, _device, _reconciling):
- return None
+ return defer.succeed('NOP')
def deactivate(self, device):
# Clear off logical device ID
@@ -644,6 +682,9 @@
# Get the latest device reference
device = self.adapter_agent.get_device(self.device_id)
+ # Deactivate in-band packets
+ self._deactivate_io_port()
+
# Suspend any active healthchecks / pings
h, self.heartbeat = self.heartbeat, None
@@ -671,20 +712,24 @@
# Remove the peer references from this device
self.adapter_agent.delete_all_peer_references(self.device_id)
+ # Disable all flows TODO: Do we want to delete them?
+ # TODO: Create a bulk disable-all by device-id
+
+ for evc in self.evcs.itervalues():
+ evc.disable()
+
# Set all ports to disabled
self.adapter_agent.disable_all_ports(self.device_id)
+ dl = []
for port in self.northbound_ports.itervalues():
- port.stop()
+ dl.append(port.stop())
for port in self.southbound_ports.itervalues():
- port.stop()
+ dl.append(port.stop())
- # Disable all flows TODO: Do we want to delete them?
- # TODO: Use bulk methods if possible
-
- for flow in self.flow_entries.itervalues():
- flow.disable()
+ self.startup = defer.gatherResults(dl)
+ results = yield self.startup
# Shutdown communications with OLT
@@ -695,12 +740,13 @@
self.log.exception('NETCONF client shutdown failed', e=e)
def _null_clients():
- self.netconf_client = None
- self.rest_client = None
+ self._netconf_client = None
+ self._rest_client = None
reactor.callLater(0, _null_clients)
self.log.info('disabled', device_id=device.id)
+ returnValue(results)
@inlineCallbacks
def reenable(self):
@@ -726,13 +772,12 @@
self.log.exception('RESTCONF adtran-hello reconnect failed', e=e)
# TODO: What is best way to handle reenable failure?
- if not self.is_virtual_olt:
- try:
- yield self.make_netconf_connection()
+ try:
+ yield self.make_netconf_connection()
- except Exception as e:
- self.log.exception('NETCONF re-connection failed', e=e)
- # TODO: What is best way to handle reenable failure?
+ except Exception as e:
+ self.log.exception('NETCONF re-connection failed', e=e)
+ # TODO: What is best way to handle reenable failure?
# Recreate the logical device
@@ -745,29 +790,37 @@
device = self.adapter_agent.get_device(device.id)
device.parent_id = ld_initialized.id
device.oper_status = OperStatus.ACTIVE
+ device.reason = ''
self.adapter_agent.update_device(device)
self.logical_device_id = ld_initialized.id
# Reenable all child devices
self.adapter_agent.update_child_devices_state(device.id,
admin_state=AdminState.ENABLED)
+ dl = []
for port in self.northbound_ports.itervalues():
- port.start()
+ dl.append(port.start())
for port in self.southbound_ports.itervalues():
- port.start()
+ dl.append(port.start())
+
+ self.startup = defer.gatherResults(dl)
+ results = yield self.startup
# TODO:
# 1) Restart health check / pings
-
# Enable all flows
- # TODO: Use bulk methods if possible
+ # TODO: Create a bulk enable-all by device-id
- for flow in self.flow_entries:
- flow.enable()
+ for evc in self.evcs:
+ evc.enable()
+
+ # Activate in-band packets
+ self._activate_io_port()
self.log.info('re-enabled', device_id=device.id)
+ returnValue(results)
@inlineCallbacks
def reboot(self):
@@ -810,64 +863,66 @@
except Exception as e:
self.log.exception('NETCONF client shutdown', e=e)
- def _null_clients():
- self.netconf_client = None
- self.rest_client = None
+ # Clear off clients
- yield reactor.callLater(0, _null_clients)
+ self._netconf_client = None
+ self._rest_client = None
# Run remainder of reboot process as a new task. The OLT then may be up in a
# few moments or may take 3 minutes or more depending on any self tests enabled
- current_time = time.time();
+ current_time = time.time()
timeout = current_time + self.restart_failure_timeout
- self.log('*** Current time is {}, timeout is {}'.format(current_time, timeout))
+ try:
+ yield reactor.callLater(10, self._finish_reboot, timeout,
+ previous_oper_status, previous_conn_status)
+ except Exception as e:
+ self.log.exception('finish reboot scheduling', e=e)
- yield reactor.callLater(10, self._finish_reboot, timeout,
- previous_oper_status, previous_conn_status)
+ returnValue('Waiting for reboot')
@inlineCallbacks
def _finish_reboot(self, timeout, previous_oper_status, previous_conn_status):
# Now wait until REST & NETCONF are re-established or we timeout
- if self.netconf_client is None and not self.is_virtual_olt:
- self.log.debug('Attempting to restore NETCONF connection')
+ self.log.info('Resuming OLT activity after reboot requested',
+ remaining=timeout - time.time(), timeout=timeout, current=time.time())
+
+ if self.rest_client is None:
try:
- response = yield self.make_netconf_connection(connect_timeout=3)
- self.log.debug('Restart NETCONF connection XML was: {}'.format(response.xml))
+ response = yield self.make_restconf_connection(get_timeout=10)
+ self.log.debug('Restart RESTCONF connection JSON was: {}'.format(response))
+
+ except Exception:
+ self.log.debug('No RESTCONF connection yet')
+ self._rest_client = None
+
+ if self.netconf_client is None:
+ try:
+ yield self.make_netconf_connection(connect_timeout=10)
+ self.log.debug('Restart NETCONF connection succeeded')
except Exception as e:
self.log.debug('No NETCONF connection yet: {}'.format(e.message))
try:
- yield self.netconf_client.close()
+ if self.netconf_client is not None:
+ yield self.netconf_client.close()
except Exception as e:
self.log.exception(e.message)
finally:
- def _null_netconf():
- self.log.debug('Nulling out the NETCONF client')
- self.netconf_client = None
- reactor.callLater(0, _null_netconf)
-
- elif self.rest_client is None:
- self.log.debug('Attempting to restore RESTCONF connection')
- try:
- response = yield self.make_restconf_connection(get_timeout=3)
- self.log.debug('Restart RESTCONF connection XML was: {}'.format(response.xml))
-
- except Exception:
- self.log.debug('No RESTCONF connection yet')
- self.rest_client = None
+ self._netconf_client = None
if (self.netconf_client is None and not self.is_virtual_olt) or self.rest_client is None:
- current_time = time.time();
-
- self.log('Current time is {}, timeout is {}'.format(current_time, timeout))
-
+ current_time = time.time()
if current_time < timeout:
- self.log.info('Device not responding yet, will try again...')
- yield reactor.callLater(10, self._finish_reboot, timeout,
- previous_oper_status, previous_conn_status)
+ try:
+ yield reactor.callLater(5, self._finish_reboot, timeout,
+ previous_oper_status, previous_conn_status)
+ except Exception:
+ self.log.debug('Rebooted check rescheduling')
+
+ returnValue('Waiting some more...')
if self.netconf_client is None and not self.is_virtual_olt:
self.log.error('Could not restore NETCONF communications after device RESET')
@@ -877,8 +932,7 @@
self.log.error('Could not restore RESTCONF communications after device RESET')
pass # TODO: What is best course of action if cannot get clients back?
- # Pause additional 5 seconds to let things OLT microservices complete some more initialization
-
+ # Pause additional 5 seconds to let allow OLT microservices to complete some more initialization
yield asleep(5)
# Get the latest device reference
@@ -892,23 +946,8 @@
self.adapter_agent.update_child_devices_state(self.device_id,
connect_status=ConnectStatus.REACHABLE)
- # Connect back up to OLT so heartbeats/polls start working again
- try:
- yield self.make_restconf_connection()
-
- except Exception as e:
- self.log.exception('RESTCONF adtran-hello connect after reboot failed', e=e)
- # TODO: What is best way to handle reenable failure?
-
- if not self.is_virtual_olt:
- try:
- yield self.make_netconf_connection()
-
- except Exception as e:
- self.log.exception('NETCONF re-connection after reboot failed', e=e)
- # TODO: What is best way to handle reenable failure?
-
self.log.info('rebooted', device_id=self.device_id)
+ returnValue('Rebooted')
@inlineCallbacks
def delete(self):
@@ -928,10 +967,13 @@
if h is not None:
h.cancel()
- # TODO:
- # 1) Remove all flows from the device
+ # Remove all flows from the device
+ # TODO: Create a bulk remove-all by device-id
- self.flow_entries.clear()
+ for evc in self.evcs.itervalues():
+ evc.remove()
+
+ self.evcs.clear()
# Remove all child devices
self.adapter_agent.delete_all_child_devices(self.device_id)
@@ -939,6 +981,7 @@
# Remove the logical device
logical_device = self.adapter_agent.get_logical_device(self.logical_device_id)
self.adapter_agent.delete_logical_device(logical_device)
+ # TODO: For some reason, the logical device does not seem to get deleted
# Remove the peer references from this device
self.adapter_agent.delete_all_peer_references(self.device_id)
@@ -962,12 +1005,97 @@
except Exception as e:
self.log.exception('NETCONF client shutdown', e=e)
- self.netconf_client = None
+ self._netconf_client = None
- self.rest_client = None
+ self._rest_client = None
self.log.info('deleted', device_id=self.device_id)
+ def _activate_io_port(self):
+ if self.io_port is None:
+ self.log.info('registering-frameio')
+ self.io_port = registry('frameio').open_port(
+ self.interface, self._rcv_io, _is_inband_frame)
+
+ def _deactivate_io_port(self):
+ io, self.io_port = self.io_port, None
+
+ if io is not None:
+ registry('frameio').close_port(io)
+
+ def _rcv_io(self, port, frame):
+ self.log.info('received', iface_name=port.iface_name, frame_len=len(frame))
+
+ pkt = Ether(frame)
+ if pkt.haslayer(Dot1Q):
+ outer_shim = pkt.getlayer(Dot1Q)
+
+ if isinstance(outer_shim.payload, Dot1Q):
+ inner_shim = outer_shim.payload
+ cvid = inner_shim.vlan
+ logical_port = cvid
+ popped_frame = (Ether(src=pkt.src, dst=pkt.dst, type=inner_shim.type) /
+ inner_shim.payload)
+ kw = dict(
+ logical_device_id=self.logical_device_id,
+ logical_port_no=logical_port,
+ )
+ self.log.info('sending-packet-in', **kw)
+ self.adapter_agent.send_packet_in(
+ packet=str(popped_frame), **kw)
+
+ elif pkt.haslayer(Raw):
+ raw_data = json.loads(pkt.getlayer(Raw).load)
+ self.alarms.send_alarm(self, raw_data)
+
+ def packet_out(self, egress_port, msg):
+ if self.io_port is not None:
+ self.log.info('sending-packet-out', egress_port=egress_port,
+ msg=hexify(msg))
+ pkt = Ether(msg)
+ out_pkt = (
+ Ether(src=pkt.src, dst=pkt.dst) /
+ Dot1Q(vlan=4000) /
+ Dot1Q(vlan=egress_port, type=pkt.type) /
+ pkt.payload
+ )
+ self.io_port.send(str(out_pkt))
+
+ def update_pm_config(self, device, pm_config):
+ # TODO: This has not been tested
+ self.log.info('update_pm_config', pm_config=pm_config)
+ self.pm_metrics.update(pm_config)
+
+ def start_kpi_collection(self, device_id):
+ # TODO: This has not been tested
+ def _collect(device_id, prefix):
+ from voltha.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
+
+ try:
+ # Step 1: gather metrics from device
+ port_metrics = self.pm_metrics.collect_port_metrics(self.get_channel())
+
+ # Step 2: prepare the KpiEvent for submission
+ # we can time-stamp them here (or could use time derived from OLT
+ ts = arrow.utcnow().timestamp
+ kpi_event = KpiEvent(
+ type=KpiEventType.slice,
+ ts=ts,
+ prefixes={
+ # OLT NNI port
+ prefix + '.nni': MetricValuePairs(metrics=port_metrics['nni']),
+ # OLT PON port
+ prefix + '.pon': MetricValuePairs(metrics=port_metrics['pon'])
+ }
+ )
+ # Step 3: submit
+ self.adapter_agent.submit_kpis(kpi_event)
+
+ except Exception as e:
+ self.log.exception('failed-to-submit-kpis', e=e)
+
+ # self.pm_metrics.start_collector(_collect)
+
@inlineCallbacks
def get_device_info(self, device):
"""
@@ -982,18 +1110,13 @@
the device type specification returned by device_types().
"""
device = {}
- # device['model'] = 'TODO: Adtran PizzaBox, YUM'
- # device['hardware_version'] = 'TODO: H/W Version'
- # device['firmware_version'] = 'TODO: S/W Version'
- # device['software_version'] = 'TODO: S/W Version'
- # device['serial_number'] = 'TODO: Serial Number'
- # device['vendor'] = 'Adtran, Inc.'
-
returnValue(device)
def start_heartbeat(self, delay=10):
assert delay > 1
+ self.log.info('*** Starting Device Heartbeat ***')
self.heartbeat = reactor.callLater(delay, self.check_pulse)
+ return self.heartbeat
def check_pulse(self):
if self.logical_device_id is not None:
@@ -1013,7 +1136,7 @@
device.reason = self.heartbeat_last_reason
self.adapter_agent.update_device(device)
- self.heartbeat_alarm(self.device_id, False, self.heartbeat_miss)
+ self.heartbeat_alarm(False, self.heartbeat_miss)
else:
assert results
# Update device states
@@ -1026,7 +1149,7 @@
device.reason = ''
self.adapter_agent.update_device(device)
- self.heartbeat_alarm(self.device_id, True)
+ self.heartbeat_alarm(True)
self.heartbeat_miss = 0
self.heartbeat_last_reason = ''
@@ -1042,27 +1165,19 @@
count=self.heartbeat_count, miss=self.heartbeat_miss)
self.heartbeat_check_status(None)
- def heartbeat_alarm(self, device_id, status, heartbeat_misses=0):
- try:
- ts = arrow.utcnow().timestamp
- alarm_data = {'heartbeats_missed': str(heartbeat_misses)}
-
- alarm_event = self.adapter_agent.create_alarm(
- id='voltha.{}.{}.olt'.format(self.adapter.name, device_id),
- resource_id='olt',
- type=AlarmEventType.EQUIPMENT,
- category=AlarmEventCategory.PON,
- severity=AlarmEventSeverity.CRITICAL,
- state=AlarmEventState.RAISED if status else AlarmEventState.CLEARED,
- description='OLT Alarm - Heartbeat - {}'.format('Raised'
- if status
- else 'Cleared'),
- context=alarm_data,
- raised_ts=ts)
- self.adapter_agent.submit_alarm(device_id, alarm_event)
-
- except Exception as e:
- self.log.exception('failed-to-submit-alarm', e=e)
+ def heartbeat_alarm(self, status, heartbeat_misses=0):
+ alarm = 'Heartbeat'
+ alarm_data = {
+ 'ts': arrow.utcnow().timestamp,
+ 'description': self.alarms.format_description('olt', alarm, status),
+ 'id': self.alarms.format_id(alarm),
+ 'type': AlarmEventType.EQUIPMENT,
+ 'category': AlarmEventCategory.PON,
+ 'severity': AlarmEventSeverity.CRITICAL,
+ 'state': AlarmEventState.RAISED if status else AlarmEventState.CLEARED
+ }
+ context_data = {'heartbeats_missed': heartbeat_misses}
+ self.alarms.send_alarm(context_data, alarm_data)
@staticmethod
def parse_module_revision(revision):
diff --git a/voltha/adapters/adtran_olt/adtran_olt.py b/voltha/adapters/adtran_olt/adtran_olt.py
index ae66953..96989ed 100644
--- a/voltha/adapters/adtran_olt/adtran_olt.py
+++ b/voltha/adapters/adtran_olt/adtran_olt.py
@@ -213,8 +213,11 @@
:param device: A Voltha.Device object.
:return: Will return result of self test
"""
+ from voltha.protos.voltha_pb2 import SelfTestResponse
log.info('self-test-device', device=device.id)
- raise NotImplementedError()
+
+ # TODO: Support self test?
+ return SelfTestResponse(result=SelfTestResponse.NOT_SUPPORTED)
def delete_device(self, device):
"""
@@ -277,7 +280,8 @@
:param pm_configs: A Pms
"""
log.debug('update_pm_config', device=device, pm_configs=pm_configs)
- raise NotImplementedError()
+ handler = self.devices_handlers[device.id]
+ handler.update_pm_config(device, pm_configs)
def send_proxied_message(self, proxy_address, msg):
"""
@@ -323,7 +327,18 @@
"""
log.info('packet-out', logical_device_id=logical_device_id,
egress_port_no=egress_port_no, msg_len=len(msg))
- raise NotImplementedError()
+
+ def ldi_to_di(ldi):
+ di = self.logical_device_id_to_root_device_id.get(ldi)
+ if di is None:
+ logical_device = self.adapter_agent.get_logical_device(ldi)
+ di = logical_device.root_device_id
+ self.logical_device_id_to_root_device_id[ldi] = di
+ return di
+
+ device_id = ldi_to_di(logical_device_id)
+ handler = self.devices_handlers[device_id]
+ handler.packet_out(egress_port_no, msg)
def receive_inter_adapter_message(self, msg):
"""
@@ -339,21 +354,50 @@
raise NotImplementedError()
def suppress_alarm(self, filter):
+ """
+ Inform an adapter that all incoming alarms should be suppressed
+ :param filter: A Voltha.AlarmFilter object.
+ :return: (Deferred) Shall be fired to acknowledge the suppression.
+ """
log.info('suppress_alarm', filter=filter)
raise NotImplementedError()
def unsuppress_alarm(self, filter):
+ """
+ Inform an adapter that all incoming alarms should resume
+ :param filter: A Voltha.AlarmFilter object.
+ :return: (Deferred) Shall be fired to acknowledge the unsuppression.
+ """
log.info('unsuppress_alarm', filter=filter)
raise NotImplementedError()
+ # PON Mgnt APIs #
def create_interface(self, device, data):
+ """
+ API to create various interfaces (only some PON interfaces as of now)
+ in the devices
+ """
raise NotImplementedError()
def update_interface(self, device, data):
+ """
+ API to update various interfaces (only some PON interfaces as of now)
+ in the devices
+ """
raise NotImplementedError()
def remove_interface(self, device, data):
+ """
+ API to delete various interfaces (only some PON interfaces as of now)
+ in the devices
+ """
raise NotImplementedError()
def receive_onu_detect_state(self, device_id, state):
+ """
+ Receive onu detect state in ONU adapter
+ :param proxy_address: ONU device address
+ :param state: ONU detect state (bool)
+ :return: None
+ """
raise NotImplementedError()
diff --git a/voltha/adapters/adtran_olt/adtran_olt_handler.py b/voltha/adapters/adtran_olt/adtran_olt_handler.py
index 03046b4..b0e6874 100644
--- a/voltha/adapters/adtran_olt/adtran_olt_handler.py
+++ b/voltha/adapters/adtran_olt/adtran_olt_handler.py
@@ -18,14 +18,14 @@
import random
from twisted.internet import reactor
-from twisted.internet.defer import returnValue, inlineCallbacks
+from twisted.internet.defer import returnValue, inlineCallbacks, succeed
from adtran_device_handler import AdtranDeviceHandler
from codec.olt_state import OltState
from flow.flow_entry import FlowEntry
from net.adtran_zmq import AdtranZmqClient
from voltha.extensions.omci.omci import *
-from voltha.protos.common_pb2 import AdminState
+from voltha.protos.common_pb2 import AdminState, OperStatus
from voltha.protos.device_pb2 import Device
@@ -61,7 +61,6 @@
self.initial_onu_state = AdminState.DISABLED
self.zmq_client = None
- self.nc_client = None
def __del__(self):
# OLT Specific things here.
@@ -108,20 +107,23 @@
self.startup = pe_state.get_state()
results = yield self.startup
- modules = pe_state.get_physical_entities('adtn-phys-mod:module')
- if isinstance(modules, list):
- module = modules[0]
- name = str(module['model-name']).translate(None, '?')
- model = str(module['model-number']).translate(None, '?')
+ if results.ok:
+ modules = pe_state.get_physical_entities('adtn-phys-mod:module')
+ if isinstance(modules, list):
+ module = modules[0]
+ name = str(module['model-name']).translate(None, '?')
+ model = str(module['model-number']).translate(None, '?')
- device['model'] = '{} - {}'.format(name, model) if len(name) > 0 else \
- module['parent-entity']
- device['hardware_version'] = str(module['hardware-revision']).translate(None, '?')
- device['serial_number'] = str(module['serial-number']).translate(None, '?')
- device['vendor'] = 'Adtran, Inc.'
- software = module['software']['software']
- device['firmware_version'] = str(software['startup-revision']).translate(None, '?')
- device['software_version'] = str(software['running-revision']).translate(None, '?')
+ device['model'] = '{} - {}'.format(name, model) if len(name) > 0 else \
+ module['parent-entity']
+ device['hardware_version'] = str(module['hardware-revision']).translate(None, '?')
+ device['serial_number'] = str(module['serial-number']).translate(None, '?')
+ device['vendor'] = 'Adtran, Inc.'
+ device['firmware_version'] = str(device.get('firmware-revision', 'unknown')).translate(None, '?')
+ software = module['software']['software']
+ device['running-revision'] = str(software['running-revision']).translate(None, '?')
+ device['candidate-revision'] = str(software['candidate-revision']).translate(None, '?')
+ device['startup-revision'] = str(software['startup-revision']).translate(None, '?')
returnValue(device)
@@ -138,15 +140,15 @@
from codec.ietf_interfaces import IetfInterfacesState
from nni_port import MockNniPort
+ ietf_interfaces = IetfInterfacesState(self.netconf_client)
+
if self.is_virtual_olt:
results = MockNniPort.get_nni_port_state_results()
else:
- ietf_interfaces = IetfInterfacesState(self.netconf_client)
self.startup = ietf_interfaces.get_state()
results = yield self.startup
ports = ietf_interfaces.get_nni_port_entries(results)
-
yield returnValue(ports)
except Exception as e:
@@ -173,6 +175,10 @@
self.northbound_ports[port_no] = NniPort(self, **port) if not self.is_virtual_olt \
else MockNniPort(self, **port)
+ # TODO: For now, limit number of NNI ports to make debugging easier
+ if len(self.northbound_ports) >= self.max_nni_ports:
+ break
+
self.num_northbound_ports = len(self.northbound_ports)
@inlineCallbacks
@@ -219,7 +225,7 @@
admin_state=admin_state)
# TODO: For now, limit number of PON ports to make debugging easier
- if len(self.southbound_ports) >= self.max_ports:
+ if len(self.southbound_ports) >= self.max_pon_ports:
break
self.num_southbound_ports = len(self.southbound_ports)
@@ -247,8 +253,8 @@
# o TODO Update some PON level statistics
self.zmq_client = AdtranZmqClient(self.ip_address, self.rx_packet)
- self.status_poll = reactor.callLater(1, self.poll_for_status)
- return None
+ self.status_poll = reactor.callLater(5, self.poll_for_status)
+ return succeed('Done')
def disable(self):
c, self.zmq_client = self.zmq_client, None
@@ -303,7 +309,7 @@
if is_omci:
proxy_address = Device.ProxyAddress(device_id=self.device_id,
- channel_id=self._get_channel_id(pon_id, onu_id),
+ channel_id=self.get_channel_id(pon_id, onu_id),
onu_id=onu_id)
self.adapter_agent.receive_proxied_message(proxy_address, msg)
@@ -320,23 +326,24 @@
device = self.adapter_agent.get_device(self.device_id)
- if device.admin_state == AdminState.ENABLED:
+ if device.admin_state == AdminState.ENABLED and\
+ device.oper_status != OperStatus.ACTIVATING and\
+ self.rest_client is not None:
uri = AdtranOltHandler.GPON_OLT_HW_STATE_URI
name = 'pon-status-poll'
self.startup = self.rest_client.request('GET', uri, name=name)
self.startup.addBoth(self.status_poll_complete)
+ else:
+ self.startup = reactor.callLater(0, self.status_poll_complete, 'inactive')
def status_poll_complete(self, results):
"""
Results of the status poll
-
- :param results:
+ :param results:
"""
- self.log.debug('Status poll results: {}'.
- format(pprint.PrettyPrinter().pformat(results)))
-
if isinstance(results, dict) and 'pon' in results:
try:
+ self.log.debug('Status poll success')
for pon_id, pon in OltState(results).pons.iteritems():
if pon_id in self.southbound_ports:
self.southbound_ports[pon_id].process_status_poll(pon)
@@ -361,7 +368,7 @@
if d is not None:
d.cancel()
- self.pons.clear()
+ # self.pons.clear()
# TODO: Any other? OLT specific deactivate steps
@@ -370,19 +377,72 @@
@inlineCallbacks
def update_flow_table(self, flows, device):
- self.log.info('bulk-flow-update', device_id=device.id, flows=flows)
+ """
+ Update the flow table on the OLT. If an existing flow is not in the list, it needs
+ to be removed from the device.
+
+ :param flows: List of flows that should be installed upon completion of this function
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions.
+ """
+ self.log.info('bulk-flow-update: {} flows'.format(len(flows)),
+ device_id=device.id, flows=flows)
+
+ valid_flows = []
for flow in flows:
# TODO: Do we get duplicates here (ie all flows re-pushed on each individual flow add?)
- flow_entry = FlowEntry.create(flow, self)
+ try:
+ # Try to create an EVC.
+ #
+ # The first result is the flow entry that was created. This could be a match to an
+ # existing flow since it is a bulk update. None is returned only if no match to
+ # an existing entry is found and decode failed (unsupported field)
+ #
+ # The second result is the EVC this flow should be added to. This could be an
+ # existing flow (so your adding another EVC-MAP) or a brand new EVC (no existing
+ # EVC-MAPs). None is returned if there are not a valid EVC that can be created YET.
- if flow_entry is not None:
- flow_entry.install()
+ valid_flow, evc = FlowEntry.create(flow, self)
- if flow_entry.name not in self.flow_entries:
- # TODO: Do we get duplicates here (ie all flows re-pushed on each individual flow add?)
- self.flow_entries[flow_entry.name] = flow_entry
+ if valid_flow is not None:
+ valid_flows.append(valid_flow.flow_id)
+
+ if evc is not None:
+ try:
+ results = yield evc.install()
+
+ if evc.name not in self.evcs:
+ self.evcs[evc.name] = evc
+ else:
+ # TODO: Do we get duplicates here (ie all flows re-pushed on each individual flow add?)
+ pass
+
+ # Also make sure all EVC MAPs are installed
+
+ for evc_map in evc.evc_maps:
+ try:
+ results = yield evc_map.install()
+ pass # TODO: What to do on error?
+
+ except Exception as e:
+ evc_map.status = 'Exception during EVC-MAP Install: {}'.format(e.message)
+ self.log.exception(evc_map.status, e=e)
+
+ except Exception as e:
+ evc.status = 'Exception during EVC Install: {}'.format(e.message)
+ self.log.exception(evc.status, e=e)
+
+ except Exception as e:
+ self.log.exception('Failure during bulk flow update - add', e=e)
+
+ # Now drop all flows from this device that were not in this bulk update
+ try:
+ FlowEntry.drop_missing_flows(device.id, valid_flows)
+
+ except Exception as e:
+ self.log.exception('Failure during bulk flow update - remove', e=e)
@inlineCallbacks
def send_proxied_message(self, proxy_address, msg):
@@ -417,14 +477,15 @@
return AdtranDeviceHandler.parse_module_revision(item.get('revision', None))
return None
- def _onu_offset(self, onu_id):
- return self.num_northbound_ports + self.num_southbound_ports + onu_id
-
- def _get_channel_id(self, pon_id, onu_id):
+ def get_channel_id(self, pon_id, onu_id):
from pon_port import PonPort
-
return self._onu_offset(onu_id) + (pon_id * PonPort.MAX_ONUS_SUPPORTED)
+ def _onu_offset(self, onu_id):
+ # Start ONU's just past the southbound PON port numbers. Since ONU ID's start
+ # at zero, add one
+ return self.num_northbound_ports + self.num_southbound_ports + onu_id + 1
+
def _channel_id_to_pon_id(self, channel_id, onu_id):
from pon_port import PonPort
@@ -432,3 +493,25 @@
def _pon_id_to_port_number(self, pon_id):
return pon_id + 1 + self.num_northbound_ports
+
+ def _port_number_to_pon_id(self, port):
+ return port - 1 - self.num_northbound_ports
+
+ def is_pon_port(self, port):
+ return self._port_number_to_pon_id(port) in self.southbound_ports
+
+ def is_uni_port(self, port):
+ return port >= self._onu_offset(0) # TODO: Really need to rework this one...
+
+ def get_port_name(self, port):
+ if self.is_nni_port(port):
+ return self.northbound_ports[port].name
+
+ if self.is_pon_port(port):
+ return self.southbound_ports[self._port_number_to_pon_id(port)].name
+
+ if self.is_uni_port(port):
+ return self.northbound_ports[port].name
+
+ if self.is_logical_port(port):
+ raise NotImplemented('TODO: Logical ports not yet supported')
\ No newline at end of file
diff --git a/voltha/adapters/adtran_olt/codec/ietf_interfaces.py b/voltha/adapters/adtran_olt/codec/ietf_interfaces.py
index e041040..332a49b 100644
--- a/voltha/adapters/adtran_olt/codec/ietf_interfaces.py
+++ b/voltha/adapters/adtran_olt/codec/ietf_interfaces.py
@@ -1,3 +1,19 @@
+#
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
from twisted.internet.defer import inlineCallbacks, returnValue
import xmltodict
import structlog
@@ -195,12 +211,14 @@
# If port up and ready: OFPPS_LIVE
# If port config bit is down: OFPPC_PORT_DOWN
# If port state bit is down: OFPPS_LINK_DOWN
- if IetfInterfacesState._get_admin_state(entry) == AdminState.ENABLED:
- return OFPPS_LIVE \
- if IetfInterfacesState._get_oper_status(entry) == OperStatus.ACTIVE \
- else OFPPS_LINK_DOWN
-
- return OFPPC_PORT_DOWN
+ # if IetfInterfacesState._get_admin_state(entry) == AdminState.ENABLED:
+ # return OFPPS_LIVE \
+ # if IetfInterfacesState._get_oper_status(entry) == OperStatus.ACTIVE \
+ # else OFPPS_LINK_DOWN
+ #
+ # return OFPPC_PORT_DOWN
+ # TODO: Update of openflow port state is not supported, so always say we are alive
+ return OFPPS_LIVE
@staticmethod
def _get_of_capabilities(entry):
@@ -247,7 +265,9 @@
40000000000: OFPPF_40GB_FD,
100000000000: OFPPF_100GB_FD,
}
- return speed_map.get(speed, OFPPF_OTHER)
+ # return speed_map.get(speed, OFPPF_OTHER)
+ # TODO: For now, force 100 GB
+ return OFPPF_100GB_FD
@staticmethod
def get_nni_port_entries(rpc_reply, nni_type='ethernet'):
@@ -262,6 +282,8 @@
ports = []
result_dict = xmltodict.parse(rpc_reply.data_xml)
entries = result_dict['data']['interfaces-state']['interface']
+ if not isinstance(entries, list):
+ entries = [entries]
nni_ports = [entry for entry in entries if 'name' in entry and nni_type in entry['name']]
for entry in nni_ports:
diff --git a/voltha/adapters/adtran_olt/codec/olt_config.py b/voltha/adapters/adtran_olt/codec/olt_config.py
index e639f9a..f5bfb7b 100644
--- a/voltha/adapters/adtran_olt/codec/olt_config.py
+++ b/voltha/adapters/adtran_olt/codec/olt_config.py
@@ -32,7 +32,7 @@
self._pons = None
def __str__(self):
- return "OltConfig: {}".format(self.software_version)
+ return "OltConfig: {}".format(self.olt_id)
@property
def olt_id(self):
diff --git a/voltha/adapters/adtran_olt/flow/acl.py b/voltha/adapters/adtran_olt/flow/acl.py
index 6228afc..86f9d73 100644
--- a/voltha/adapters/adtran_olt/flow/acl.py
+++ b/voltha/adapters/adtran_olt/flow/acl.py
@@ -13,18 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-
-import random
-
-from enum import Enum
import structlog
-from twisted.internet.defer import inlineCallbacks, returnValue
-
-from voltha.core.logical_device_agent import mac_str_to_tuple
-from voltha.protos.common_pb2 import OperStatus, AdminState
-from voltha.protos.device_pb2 import Port
-from voltha.protos.logical_device_pb2 import LogicalPort
-from voltha.protos.openflow_13_pb2 import OFPPF_100GB_FD, OFPPF_FIBER, OFPPS_LIVE, ofp_port
import voltha.core.flow_decomposer as fd
@@ -44,7 +33,7 @@
self._parent = flow_entry # FlowEntry parent
self._flow = flow_entry.flow
self._handler = flow_entry.handler
- self._name = None
+ self._name = ACL.flow_to_name(flow_entry)
self._valid = self._decode()
@@ -53,8 +42,8 @@
pass # TODO: Start here Thursday
@staticmethod
- def flow_to_name(flow, handler):
- return 'ACL-{}-{}'.format(flow.id, handler.id)
+ def flow_to_name(flow_entry):
+ return 'ACL-{}-{}'.format(flow_entry.handler.device_id, flow_entry.flow.id)
@property
def valid(self):
@@ -123,24 +112,6 @@
return status
- def _is_men_port(self, port):
- return port in self._handler.northbound_ports(port)
-
- def _is_uni_port(self, port):
- return port in self._handler.southbound_ports(port)
-
- def _is_logical_port(self, port):
- return not self._is_men_port(port) and not self._is_uni_port(port)
-
- def _get_port_name(self, port):
- if self._is_logical_port(port):
- raise NotImplemented('TODO: Logical ports not yet supported')
-
- if self._is_men_port(port):
- return self._handler.northbound_ports[port].name
-
- return None
-
def _decode_traffic_selector(self):
"""
Extract EVC related traffic selection settings
@@ -148,11 +119,27 @@
in_port = fd.get_in_port(self._flow)
assert in_port is not None
+ log.debug('InPort: {}', in_port)
+
+ for field in fd.get_ofb_fields(self._flow):
+ log.debug('Found-OFB-field', field=field)
+
+ for action in fd.get_actions(self._flow):
+ log.debug('Found-Action', action=action)
+
return True
def _decode_traffic_treatment(self):
out_port = fd.get_out_port(self._flow)
+ log.debug('OutPort: {}', out_port)
+
+ for field in fd.get_ofb_fields(self._flow):
+ log.debug('Found-OFB-field', field=field)
+
+ for action in fd.get_actions(self._flow):
+ log.debug('Found-Action', action=action)
+
return True
# BULK operations
diff --git a/voltha/adapters/adtran_olt/flow/evc.py b/voltha/adapters/adtran_olt/flow/evc.py
index 27ce42f..e350db1 100644
--- a/voltha/adapters/adtran_olt/flow/evc.py
+++ b/voltha/adapters/adtran_olt/flow/evc.py
@@ -14,92 +14,104 @@
# limitations under the License.
#
-import random
-
from enum import Enum
-import structlog
from twisted.internet.defer import inlineCallbacks, returnValue
-
-from voltha.core.logical_device_agent import mac_str_to_tuple
-from voltha.protos.common_pb2 import OperStatus, AdminState
-from voltha.protos.device_pb2 import Port
-from voltha.protos.logical_device_pb2 import LogicalPort
-from voltha.protos.openflow_13_pb2 import OFPPF_100GB_FD, OFPPF_FIBER, OFPPS_LIVE, ofp_port
-
-import voltha.core.flow_decomposer as fd
+from voltha.core.flow_decomposer import *
log = structlog.get_logger()
-_evc_list = {} # Key -> Name: List of encoded EVCs
-
EVC_NAME_FORMAT = 'EVC-VOLTHA-{}-{}'
EVC_NAME_REGEX = 'EVC-VOLTHA-{}'.format('regex-here')
DEFAULT_STPID = 0x8100
+_xml_header = '<evcs xmlns="http://www.adtran.com/ns/yang/adtran-evcs"><evc>'
+_xml_trailer = '</evc></evcs>'
+
class EVC(object):
"""
Class to wrap EVC functionality
"""
class SwitchingMethod(Enum):
- SINGLE_TAGGED = 0
- DOUBLE_TAGGED = 1
- MAC_SWITCHED = 2
+ SINGLE_TAGGED = 1
+ DOUBLE_TAGGED = 2
+ MAC_SWITCHED = 3
+ DOUBLE_TAGGED_MAC_SWITCHED = 4
+ DEFAULT = SINGLE_TAGGED
+
+ @staticmethod
+ def xml(value):
+ if value is None:
+ value = EVC.SwitchingMethod.DEFAULT
+ if value == EVC.SwitchingMethod.SINGLE_TAGGED:
+ return '<single-tag-switched/>'
+ elif value == EVC.SwitchingMethod.DOUBLE_TAGGED:
+ return '<double-tag-switched/>'
+ elif value == EVC.SwitchingMethod.MAC_SWITCHED:
+ return '<mac-switched/>'
+ elif value == EVC.SwitchingMethod.DOUBLE_TAGGED_MAC_SWITCHED:
+ return '<double-tag-mac-switched/>'
+ raise ValueError('Invalid SwitchingMethod enumeration')
class Men2UniManipulation(Enum):
- SYMETRIC = 0
- POP_OUT_TAG_ONLY = 1
+ SYMETRIC = 1
+ POP_OUT_TAG_ONLY = 2
+ DEFAULT = SYMETRIC
+
+ @staticmethod
+ def xml(value):
+ if value is None:
+ value = EVC.Men2UniManipulation.DEFAULT
+ fmt = '<men-to-uni-tag-manipulation>{}</men-to-uni-tag-manipulation>'
+ if value == EVC.Men2UniManipulation.SYMETRIC:
+ return fmt.format('<symetric/>')
+ elif value == EVC.Men2UniManipulation.POP_OUT_TAG_ONLY:
+ return fmt.format('<pop-outer-tag-only/>')
+ raise ValueError('Invalid Men2UniManipulation enumeration')
class ElineFlowType(Enum):
- NNI_TO_UNI = 0,
- UNI_TO_NNI = 1,
- NNI_TO_NNI = 2,
- ACL_FILTER = 3,
- UNKNOWN = 4,
- UNSUPPORTED = 5 # Or Invalid
+ NNI_TO_UNI = 1
+ UNI_TO_NNI = 2
+ NNI_TO_NNI = 3
+ ACL_FILTER = 4
+ UNKNOWN = 5
+ UNSUPPORTED = 5 # Or Invalid
def __init__(self, flow_entry):
self._installed = False
self._status_message = None
- self._parent = flow_entry # FlowEntry parent
- self._flow = flow_entry.flow
- self._handler = flow_entry.handler
- self._evc_maps = [] # One if E-Line
+ self._flow = flow_entry
+ self._name = self._create_name()
+ self._evc_maps = {} # Map Name -> evc-map
self._flow_type = EVC.ElineFlowType.UNKNOWN
# EVC related properties
- self._name = EVC.flow_to_name(flow_entry.flow, flow_entry.handler)
self._enabled = True
- self._ce_vlan_preservation = True
self._men_ports = []
- self._s_tag = -1
- self._stpid = DEFAULT_STPID
+ self._s_tag = None
+ self._stpid = None
+ self._switching_method = None
- self._switching_method = EVC.SwitchingMethod.SINGLE_TAGGED
- self._men_to_uni_tag_manipulation = EVC.Men2UniManipulation.SYMETRIC
+ self._ce_vlan_preservation = None
+ self._men_to_uni_tag_manipulation = None
- self._valid = self._decode()
+ try:
+ self._valid = self._decode()
- @staticmethod
- def flow_to_name(flow, handler):
- return EVC_NAME_FORMAT.format(flow.id, handler.id)
+ except Exception as e:
+ log.exception('Failure during EVC decode', e=e)
+ self._valid = False
- @staticmethod
- def create(flow_entry):
- # Does it already exist?
+ def _create_name(self):
+ #
+ # TODO: Take into account selection criteria and output to make the name
+ #
+ return EVC_NAME_FORMAT.format(self._flow.device_id, self._flow.flow_id)
- evc = _evc_list.get(EVC.flow_to_name(flow_entry.flow, flow_entry.handler))
-
- if evc is None:
- evc = EVC(flow_entry.flow, flow_entry.handler)
-
- if evc is not None:
- pass # Look up any EVC that
- return
- pass # Start decode here
-
- return evc
+ @property
+ def name(self):
+ return self._name
@property
def valid(self):
@@ -113,162 +125,225 @@
def status(self):
return self._status_message
+ @status.setter
+ def status(self, value):
+ self._status_message = value
+
+ @property
+ def s_tag(self):
+ return self._s_tag
+
+ @property
+ def stpid(self):
+ return self._stpid
+
+ @stpid.setter
+ def stpid(self, value):
+ assert self._stpid is None or self._stpid == value
+ self._stpid = value
+
+ @property
+ def switching_method(self):
+ return self._switching_method
+
+ @switching_method.setter
+ def switching_method(self, value):
+ assert self._switching_method is None or self._switching_method == value
+ self._switching_method = value
+
+ @property
+ def ce_vlan_preservation(self):
+ return self._ce_vlan_preservation
+
+ @ce_vlan_preservation.setter
+ def ce_vlan_preservation(self, value):
+ assert self._ce_vlan_preservation is None or self._ce_vlan_preservation == value
+ self.ce_vlan_preservation = value
+
+ @property
+ def men_to_uni_tag_manipulation(self):
+ return self._men_to_uni_tag_manipulation
+
+ @men_to_uni_tag_manipulation.setter
+ def men_to_uni_tag_manipulation(self, value):
+ assert self._men_to_uni_tag_manipulation is None or self._men_to_uni_tag_manipulation == value
+ self._men_to_uni_tag_manipulation = value
+
+ @property
+ def flow_entry(self):
+ return self._flow
+
+ @property
+ def evc_maps(self):
+ """
+ Get all EVC Maps that reference this EVC
+ :return: list of EVCMap
+ """
+ return self._evc_maps.values()
+
+ def add_evc_map(self, evc_map):
+ if self._evc_maps is not None:
+ self._evc_maps[evc_map.name] = evc_map
+
+ def remove_evc_map(self, evc_map):
+ if self._evc_maps is not None and evc_map.name in self._evc_maps:
+ del self._evc_maps[evc_map.name]
+
+ @inlineCallbacks
def install(self):
- if not self._installed:
- if self._name in _evc_list:
- self._status_message = "EVC '{}' already is installed".format(self._name)
- raise Exception(self._status_message) # TODO: A unique exception type would work here
+ if self._valid and not self._installed:
+ xml = _xml_header
+ xml += '<name>{}</name>'.format(self.name)
+ xml += '<enabled>{}</enabled>'.format(self._enabled)
+ xml += '<ce-vlan-preservation>{}</ce-vlan-preservation>'.\
+ format(self._ce_vlan_preservation or True)
- raise NotImplemented('TODO: Implement this')
- # xml = '<config xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">' \
- # '<evcs xmlns="http://www.adtran.com/ns/yang/adtran-evcs">' \
- # '<adtn-evc:evc xmlns:adtn-evc="http://www.adtran.com/ns/yang/adtran-evcs">'
- #
- # xml += '<adtn-evc:name>' + name + '</adtn-evc:name>'
- #
- # if stag:
- # xml += '<adtn-evc:stag>' + stag + '</adtn-evc:stag>'
- #
- # if preserve:
- # xml += '<adtn-evc:ce-vlan-preservation>' + preserve + '</adtn-evc:ce-vlan-preservation>'
- #
- # if enabled:
- # xml += '<adtn-evc:enabled>' + enabled + '</adtn-evc:enabled>'
- # else:
- # xml += '<adtn-evc:enabled>' + "true" + '</adtn-evc:enabled>'
- #
- # xml += '</adtn-evc:evc></evc></config>'
- #
- # print "Creating EVC %s" % name
- #
- # print mgr.mgr.edit_config(target="running",
- # config=xml,
- # default_operation="merge",
- # format="xml")
+ if self._s_tag is not None:
+ xml += '<stag>{}</stag>'.format(self._s_tag)
+ xml += '<stag-tpid>{:#x}</stag-tpid>'.format(self._stpid or DEFAULT_STPID)
+ else:
+ xml += 'no-stag/'
- self._installed = True
- _evc_list[self.name] = self
- pass
+ for port in self._men_ports:
+ xml += '<men-ports>{}</men-ports>'.format(port)
- return self._installed
+ xml += EVC.Men2UniManipulation.xml(self._men_to_uni_tag_manipulation)
+ xml += EVC.SwitchingMethod.xml(self._switching_method)
+ xml += _xml_trailer
+ log.debug("Creating EVC {}: '{}'".format(self.name, xml))
+
+ try:
+ results = yield self._flow.handler.netconf_client.edit_config(xml,
+ default_operation='create',
+ lock_timeout=30)
+ self._installed = results.ok
+ if results.ok:
+ self.status = ''
+ else:
+ self.status = results.error # TODO: Save off error status
+
+ except Exception as e:
+ log.exception('Failed to install EVC', name=self.name, e=e)
+ raise
+
+ returnValue(self._installed and self._valid)
+
+ @inlineCallbacks
def remove(self):
if self._installed:
- raise NotImplemented('TODO: Implement this')
- # xml = '<config xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">' \
- # '<evcs xmlns="http://www.adtran.com/ns/yang/adtran-evcs">' \
- # '<adtn-evc:evc xmlns:adtn-evc="http://www.adtran.com/ns/yang/adtran-evcs" nc:operation="delete">'
- #
- # xml += '<adtn-evc:name>' + name + '</adtn-evc:name>'
- #
- # xml += '</adtn-evc:evc></evc></config>'
- #
- # print "Deleting EVC %s" % name
- #
- # print mgr.mgr.edit_config(target="running",
- # config=xml,
- # default_operation="merge",
- # format="xml")
+ xml = _xml_header + '<name>{}</name>'.format(self.name) + _xml_trailer
- self._installed = False
- _evc_list.pop(self.name)
+ log.debug("Deleting EVC {}: '{}'".format(self.name, xml))
+
+ try:
+ results = yield self._flow.handler.netconf_client.edit_config(xml,
+ default_operation='delete',
+ lock_timeout=30)
+ self._installed = not results.ok
+ if results.ok:
+ self.status = ''
+ else:
+ self.status = results.error # TODO: Save off error status
+
+ except Exception as e:
+ log.exception('Failed to remove EVC', name=self.name, e=e)
+ raise
+
+ # TODO: Do we remove evc-maps as well reference here or maybe have a 'delete' function?
pass
- return not self._installed
+ returnValue(not self._installed)
+ @inlineCallbacks
def enable(self):
- if not self._enabled:
- raise NotImplemented("TODO: Implement this")
- self._enabled = False
+ if self.installed and not self._enabled:
+ xml = _xml_header + '<name>{}</name>'.format(self.name)
+ xml += '<enabled>true</enabled>' + _xml_trailer
+ log.debug("Enabling EVC {}: '{}'".format(self.name, xml))
+
+ try:
+ results = yield self._flow.handler.netconf_client.edit_config(xml,
+ default_operation='merge',
+ lock_timeout=30)
+ self._enabled = results.ok
+ if results.ok:
+ self.status = ''
+ else:
+ self.status = results.error # TODO: Save off error status
+
+ except Exception as e:
+ log.exception('Failed to enable EVC', name=self.name, e=e)
+ raise
+
+ returnValue(self.installed and self._enabled)
+
+ @inlineCallbacks
def disable(self):
- if self._enabled:
- raise NotImplemented("TODO: Implement this")
- self._enabled = True
+ if self.installed and self._enabled:
+ xml = _xml_header + '<name>{}</name>'.format(self.name)
+ xml += '<enabled>false</enabled>' + _xml_trailer
+
+ log.debug("Disabling EVC {}: '{}'".format(self.name, xml))
+
+ try:
+ results = yield self._flow.handler.netconf_client.edit_config(xml,
+ default_operation='merge',
+ lock_timeout=30)
+ self._enabled = not results.ok
+ if results.ok:
+ self.status = ''
+ else:
+ self.status = results.error # TODO: Save off error status
+
+ except Exception as e:
+ log.exception('Failed to disable EVC', name=self.name, e=e)
+ raise
+
+ returnValue(self.installed and not self._enabled)
+
+ @inlineCallbacks
+ def delete(self):
+ """
+ Remove from hardware and delete/clean-up
+ """
+ try:
+ self._valid = False
+ succeeded = yield self.remove()
+ # TODO: On timeout or other NETCONF error, should we schedule cleanup later?
+
+ except Exception:
+ succeeded = False
+
+ finally:
+ self._flow = None
+ self._evc_maps = None
+
+ returnValue(succeeded)
def _decode(self):
"""
- Examine flow rules and extract appropriate settings for both this EVC
- and creates any EVC-Maps required.
+ Examine flow rules and extract appropriate settings for this EVC
"""
- from evc_map import EVCMap
-
- # Determine this flow's type
-
- status = self._decode_traffic_selector() and self._decode_traffic_treatment()
-
- if status:
- ingress_map = EVCMap.createIngressMap(self._flow, self._device)
- egress_map = EVCMap.createEgressMap(self._flow, self._device)
-
- status = ingress_map.valid and egress_map.valid
-
- if status:
- self._evc_maps.append(ingress_map)
- self._evc_maps.append(egress_map)
- else:
- self._status_message = 'Ingress MAP invalid: {}'.format(ingress_map.status)\
- if not ingress_map.valid else 'Egress MAP invalid: {}'.format(egress_map.status)
-
- return status
-
- def _is_men_port(self, port):
- return port in self._handler.northbound_ports(port)
-
- def _is_uni_port(self, port):
- return port in self._handler.southbound_ports(port)
-
- def _is_logical_port(self, port):
- return not self._is_men_port(port) and not self._is_uni_port(port)
-
- def _get_port_name(self, port):
- if self._is_logical_port(port):
- raise NotImplemented('TODO: Logical ports not yet supported')
-
- if self._is_men_port(port):
- return self._handler.northbound_ports[port].name
-
- return None
-
- def _decode_traffic_selector(self):
- """
- Extract EVC related traffic selection settings
- """
- in_port = fd.get_in_port(self._flow)
- assert in_port is not None
-
- if self._is_men_port(in_port):
- log.debug('in_port is a MEN Port', port=in_port)
- self._men_ports.append(self._get_port_name(in_port))
+ if self._flow.handler.is_nni_port(self._flow.in_port):
+ self._men_ports.append(self._flow.handler.get_port_name(self._flow.in_port))
else:
- pass # UNI Ports handled in the EVC Maps
+ self._status_message = 'EVCs with UNI ports are not supported'
+ return False # UNI Ports handled in the EVC Maps
- for field in fd.get_ofb_fields(self._flow):
- log.debug('Found OFB field', field=field)
- self._status_message = 'Unsupported field.type={}'.format(field.type)
- return False
+ self._s_tag = self._flow.vlan_id
- return True
+ if self._flow.inner_vid is not None:
+ self._switching_method = EVC.SwitchingMethod.DOUBLE_TAGGED
- def _decode_traffic_treatment(self):
- out_port = fd.get_out_port(self._flow)
- num_outputs = 0
-
- if self._is_men_port(out_port):
- log.debug('out_port is a MEN Port', port=out_port)
- self._men_ports.append(self._get_port_name(out_port))
- else:
- pass # UNI Ports handled in the EVC Maps
-
- for action in fd.get_actions(self._flow):
- if action.type == fd.OUTPUT:
- num_outputs += 1 # Handled earlier
- assert num_outputs <= 1 # Only E-LINE supported and no UNI<->UNI
-
- else:
- # TODO: May need to modify ce-preservation
- log.debug('Found action', action=action)
-
+ # Note: The following fields will get set when the first EVC-MAP
+ # is associated with this object. Once set, they cannot be changed to
+ # another value.
+ # self._stpid
+ # self._switching_method
+ # self._ce_vlan_preservation
+ # self._men_to_uni_tag_manipulation
return True
# BULK operations
diff --git a/voltha/adapters/adtran_olt/flow/evc_map.py b/voltha/adapters/adtran_olt/flow/evc_map.py
index 1d16daf..d5e29ad 100644
--- a/voltha/adapters/adtran_olt/flow/evc_map.py
+++ b/voltha/adapters/adtran_olt/flow/evc_map.py
@@ -14,20 +14,10 @@
# limitations under the License.
#
-import random
-
import structlog
from enum import Enum
from twisted.internet.defer import inlineCallbacks, returnValue
-from voltha.core.logical_device_agent import mac_str_to_tuple
-from voltha.protos.common_pb2 import OperStatus, AdminState
-from voltha.protos.device_pb2 import Port
-from voltha.protos.logical_device_pb2 import LogicalPort
-from voltha.protos.openflow_13_pb2 import OFPPF_100GB_FD, OFPPF_FIBER, OFPPS_LIVE, ofp_port
-
-import voltha.core.flow_decomposer as fd
-
log = structlog.get_logger()
EVC_MAP_NAME_INGRESS_FORMAT = 'EVCMap-VOLTHA-ingress-{}'
@@ -36,6 +26,10 @@
EVC_MAP_NAME_INGRESS_REGEX_FORMAT = EVC_MAP_NAME_INGRESS_FORMAT.format('regex here')
EVC_MAP_NAME_EGRESS_REGEX_FORMAT = EVC_MAP_NAME_EGRESS_FORMAT.format('regex here')
+_xml_header = '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps"><evc-map>'
+_xml_trailer = '</evc-map></evc-maps>'
+
+
class EVCMap(object):
"""
Class to wrap EVC functionality
@@ -44,16 +38,30 @@
NO_EVC_CONNECTION = 0
EVC = 1
DISCARD = 2
+ DEFAULT = NO_EVC_CONNECTION
- class Priority_Option(Enum):
+ @staticmethod
+ def xml(value):
+ # Note we do not have XML for 'EVC' enumeration.
+ if value is None:
+ value = EVCMap.EvcConnection.DEFAULT
+ if value == EVCMap.EvcConnection.DISCARD:
+ return '<no-evc-connection/>'
+ elif value == EVCMap.EvcConnection.DISCARD:
+ return 'discard/'
+ raise ValueError('Invalid EvcConnection enumeration')
+
+ class PriorityOption(Enum):
INHERIT_PRIORITY = 0
EXPLICIT_PRIORITY = 1
+ DEFAULT = INHERIT_PRIORITY
- def __init__(self, flow, handler, evc, is_ingress_map):
+ def __init__(self, flow, evc, is_ingress_map):
+ self._flow = flow
+ self._evc = evc
+ self._is_ingress_map = is_ingress_map
self._installed = False
self._status_message = None
- self._flow = flow
- self._handler = handler
self._name = None
self._enabled = True
@@ -61,14 +69,14 @@
self._evc_connection = EVCMap.EvcConnection.NO_EVC_CONNECTION
self._evc_name = None
- self._men_priority = EVCMap.Priority_Option.INHERIT_PRIORITY
+ self._men_priority = EVCMap.PriorityOption.INHERIT_PRIORITY
self._men_pri = 0 # If Explicit Priority
- self._c_tag = -1
- self._men_ctag_priority = EVCMap.Priority_Option.INHERIT_PRIORITY
+ self._c_tag = None
+ self._men_ctag_priority = EVCMap.PriorityOption.INHERIT_PRIORITY
self._men_ctag_pri = 0 # If Explicit Priority
- self._match_ce_vlan_id = -1
+ self._match_ce_vlan_id = None
self._match_untagged = True
self._match_destination_mac_address = None
self._match_l2cp = False
@@ -77,18 +85,32 @@
self._match_unicast = False
self._match_igmp = False
- self._evc = evc
- self._is_ingress_map = is_ingress_map
+ # ACL logic
+ self._eth_type = None
+ self._ip_protocol = None
+ self._ipv4_dst = None
+ self._udp_dst = None
+ self._udp_src = None
- self._valid = self.decode()
+ try:
+ self._valid = self._decode()
+
+ except Exception as e:
+ log.exception('Failure during EVCMap decode', e=e)
+ self._valid = False
+
+ if self._valid:
+ evc.add_evc_map(self)
+ else:
+ self._evc = None
@staticmethod
- def createIngressMap(flow, device, evc):
- return EVCMap(flow, device, evc, True)
+ def create_ingress_map(flow, evc):
+ return EVCMap(flow, evc, True)
@staticmethod
- def createEgressMap(flow, device, evc):
- return EVCMap(flow, device, evc, False)
+ def create_egress_map(flow, evc):
+ return EVCMap(flow, evc, False)
@property
def valid(self):
@@ -99,33 +121,242 @@
return self._installed
@property
+ def name(self):
+ return self._name
+
+ @property
def status(self):
return self._status_message
+ @status.setter
+ def status(self, value):
+ self._status_message = value
+
+ @property
+ def _needs_acl_support(self):
+ return self._eth_type is None and self._ip_protocol is None and\
+ self._ipv4_dst is None and self._udp_dst is None and self._udp_src is None
+
+ @inlineCallbacks
def install(self):
- if not self._installed:
- pass
+ if self._valid and not self._installed:
+ xml = '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps">' \
+ '<evc-map>'
+ xml += '<name>{}</name>'.format(self.name)
+ xml += '<enabled>{}</enabled>'.format(self._enabled)
+ xml += '<uni>{}</uni>'.format(self._uni_port)
- return self._installed
+ if self._evc_name is not None:
+ xml += '<evc>{}</evc>'.format(self._evc_name)
+ else:
+ xml += EVCMap.EvcConnection.xml(self._evc_connection)
+ if self._match_untagged:
+ xml += '<match-untagged>True</match-untagged>'
+ elif self._c_tag is not None:
+ xml += '<ctag>{}</ctag>'.format(self._c_tag)
+
+ xml += _xml_trailer
+
+ log.debug("Creating EVC-MAP {}: '{}'".format(self.name, xml))
+
+ if self._needs_acl_support:
+ self._installed = True # TODO: Support ACLs
+ else:
+ try:
+ results = yield self._flow.handler.netconf_client.edit_config(xml,
+ default_operation='create',
+ lock_timeout=30)
+ self._installed = results.ok
+ if results.ok:
+ self.status = ''
+ else:
+ self.status = results.error # TODO: Save off error status
+
+ except Exception as e:
+ log.exception('Failed to install EVC-MAP', name=self.name, e=e)
+ raise
+
+ # TODO: The following is not yet supported
+ # self._men_priority = EVCMap.PriorityOption.INHERIT_PRIORITY
+ # self._men_pri = 0 # If Explicit Priority
+ #
+ # self._c_tag = None
+ # self._men_ctag_priority = EVCMap.PriorityOption.INHERIT_PRIORITY
+ # self._men_ctag_pri = 0 # If Explicit Priority
+ #
+ # self._match_ce_vlan_id = None
+ # self._match_untagged = True
+ # self._match_destination_mac_address = None
+ # self._match_l2cp = False
+ # self._match_broadcast = False
+ # self._match_multicast = False
+ # self._match_unicast = False
+ # self._match_igmp = False
+ # self._eth_type = None
+ # self._ip_protocol = None
+ # self._ipv4_dst = None
+ # self._udp_dst = None
+ # self._udp_src = None
+
+ returnValue(self._installed and self._valid)
+
+ @inlineCallbacks
def remove(self):
if self._installed:
- pass
+ xml = _xml_header + '<name>{}</name>'.format(self.name) + _xml_trailer
- return not self._installed
+ log.debug("Deleting EVC-MAP {}: '{}'".format(self.name, xml))
+
+ if self._needs_acl_support:
+ self._installed = False # TODO: Support ACLs
+ else:
+ try:
+ results = yield self._flow.handler.netconf_client.edit_config(xml,
+ default_operation='delete',
+ lock_timeout=30)
+ self._installed = not results.ok
+ if results.ok:
+ self.status = ''
+ else:
+ self.status = results.error # TODO: Save off error status
+
+ except Exception as e:
+ log.exception('Failed to remove EVC-MAP', name=self.name, e=e)
+ raise
+
+ # TODO: Do we remove evc reference here or maybe have a 'delete' function?
+
+ returnValue(self._installed)
+
+ @inlineCallbacks
+ def enable(self):
+ if self.installed and not self._enabled:
+ xml = _xml_header + '<name>{}</name>'.format(self.name)
+ xml += '<enabled>true</enabled>' + _xml_trailer
+
+ log.debug("Enabling EVC-MAP {}: '{}'".format(self.name, xml))
+
+ if self._needs_acl_support:
+ self._enabled = True # TODO: Support ACLs
+ else:
+ try:
+ results = yield self._flow.handler.netconf_client.edit_config(xml,
+ default_operation='merge',
+ lock_timeout=30)
+ self._enabled = results.ok
+ if results.ok:
+ self.status = ''
+ else:
+ self.status = results.error # TODO: Save off error status
+
+ except Exception as e:
+ log.exception('Failed to enable EVC-MAP', name=self.name, e=e)
+ raise
+
+ returnValue(self.installed and self._enabled)
+
+ @inlineCallbacks
+ def disable(self):
+ if self.installed and self._enabled:
+ xml = _xml_header + '<name>{}</name>'.format(self.name)
+ xml += '<enabled>false</enabled>' + _xml_trailer
+
+ log.debug("Disabling EVC-MAP {}: '{}'".format(self.name, xml))
+
+ if self._needs_acl_support:
+ self._enabled = False # TODO: Support ACLs
+ else:
+ try:
+ results = yield self._flow.handler.netconf_client.edit_config(xml,
+ default_operation='merge',
+ lock_timeout=30)
+ self._enabled = not results.ok
+ if results.ok:
+ self.status = ''
+ else:
+ self.status = results.error # TODO: Save off error status
+
+ except Exception as e:
+ log.exception('Failed to disable EVC-MAP', name=self.name, e=e)
+ raise
+
+ returnValue(self.installed and not self._enabled)
+
+ @inlineCallbacks
+ def delete(self):
+ """
+ Remove from hardware and delete/clean-up
+ """
+ try:
+ self._valid = False
+ succeeded = yield self.remove()
+ # TODO: On timeout or other NETCONF error, should we schedule cleanup later?
+
+ except Exception:
+ succeeded = False
+
+ finally:
+ self._flow = None
+ evc, self._evc = self._evc, None
+ if evc is not None:
+ evc.remove_evc_map(self)
+
+ returnValue(succeeded)
def _decode(self):
- self._name = 'EVC-MAP-{}-{}'.format('i' if self._is_ingress_map else 'e', self._flow.id)
+ from evc import EVC
+ from flow_entry import FlowEntry
- return self._decode_traffic_selector() and self._decode_traffic_treatment()
+ flow = self._flow
- def _decode_traffic_selector(self):
- self._status_message('TODO: Not yet implemented')
- return False
+ self._name = 'EVC-MAP-{}-{}'.format('i' if self._is_ingress_map else 'e', flow.flow_id)
- def _decode_traffic_treatment(self):
- self._status_message('TODO: Not yet implemented')
- return False
+ if self._evc:
+ self._evc_connection = EVCMap.EvcConnection.EVC
+ self._evc_name = self._evc.name
+ else:
+ self._status_message = 'Can only create EVC-MAP if EVC supplied'
+ return False
+ if flow.handler.is_pon_port(flow.in_port) or flow.handler.is_uni_port(flow.in_port):
+ self._uni_port = self._flow.handler.get_port_name(flow.in_port)
+ else:
+ self._status_message = 'EVC-MAPS without UNI or PON ports are not supported'
+ return False # UNI Ports handled in the EVC Maps
+ # If no match of VLAN this may be for untagged traffic
+ if flow.vlan_id is None and flow.inner_vid is None:
+ self._match_untagged = True
+ else:
+ self._match_untagged = False
+ self._c_tag = flow.inner_vid
+
+ # If a push of a single VLAN is present with a POP of the VLAN in the EVC's
+ # flow, then this is a traditional EVC flow
+
+ if len(flow.push_vlan_id) == 1 and self._evc.flow_entry.pop_vlan == 1:
+ self._evc.men_to_uni_tag_manipulation = EVC.Men2UniManipulation.SYMETRIC
+ self._evc.switching_method = EVC.SwitchingMethod.SINGLE_TAGGED
+ self._evc.stpid = flow.push_vlan_tpid[0]
+
+ elif len(flow.push_vlan_id) == 2 and self._evc.flow_entry.pop_vlan == 1:
+ self._evc.men_to_uni_tag_manipulation = EVC.Men2UniManipulation.POP_OUT_TAG_ONLY
+ self._evc.switching_method = EVC.SwitchingMethod.DOUBLE_TAGGED
+ # self._match_ce_vlan_id = 'TODO: something maybe'
+ raise NotImplementedError('TODO: Not supported/needed yet')
+
+ # ACL logic
+
+ self._eth_type = flow.eth_type
+
+ if self._eth_type == FlowEntry.EtherType.IPv4:
+ self._ip_protocol = flow.ip_protocol
+ self._ipv4_dst = flow.ipv4_dst
+
+ if self._ip_protocol == FlowEntry.IpProtocol.UDP:
+ self._udp_dst = flow.udp_dst
+ self._udp_src = flow.udp_src
+
+ return True
diff --git a/voltha/adapters/adtran_olt/flow/flow_entry.py b/voltha/adapters/adtran_olt/flow/flow_entry.py
index b683cab..fed24e0 100644
--- a/voltha/adapters/adtran_olt/flow/flow_entry.py
+++ b/voltha/adapters/adtran_olt/flow/flow_entry.py
@@ -13,92 +13,420 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
-import structlog
from evc import EVC
-from acl import ACL
+from evc_map import EVCMap
+from enum import Enum
import voltha.core.flow_decomposer as fd
-from voltha.protos.openflow_13_pb2 import OFPP_IN_PORT, OFPP_TABLE, OFPP_NORMAL, OFPP_FLOOD, OFPP_ALL
+from voltha.core.flow_decomposer import *
from voltha.protos.openflow_13_pb2 import OFPP_CONTROLLER, OFPP_LOCAL, OFPP_ANY, OFPP_MAX
+from twisted.internet.defer import returnValue, inlineCallbacks
log = structlog.get_logger()
+# IP Protocol numbers
+_supported_ip_protocols = [
+ 1, # ICMP
+ 2, # IGMP
+ 6, # TCP
+ 17, # UDP
+]
+
+_existing_flow_entries = {} # device-id -> flow dictionary
+ # |
+ # +-> flow-id -> flow-entry
+
class FlowEntry(object):
"""
- Provide a class that wraps the flow rule and also provides state/status for
- a FlowEntry.
+ Provide a class that wraps the flow rule and also provides state/status for a FlowEntry.
+
+ When a new flow is sent, it is first decoded to check for any potential errors. If None are
+ found, the entry is created and it is analyzed to see if it can be combined to with any other flows
+ to create or modify an existing EVC.
+
+ Note: Since only E-LINE is supported, modification of an existing EVC is not performed.
"""
+ class FlowDirection(Enum):
+ UPSTREAM = 0 # UNI port to NNI Port
+ DOWNSTREAM = 1 # NNI port to UNI Port
+ NNI = 2 # NNI port to NNI Port
+ UNI = 3 # UNI port to UNI Port
+ OTHER = 4 # Unable to determine
+
+ _flow_dir_map = {
+ (FlowDirection.UNI, FlowDirection.NNI): FlowDirection.UPSTREAM,
+ (FlowDirection.NNI, FlowDirection.UNI): FlowDirection.DOWNSTREAM,
+ (FlowDirection.UNI, FlowDirection.UNI): FlowDirection.UNI,
+ (FlowDirection.NNI, FlowDirection.NNI): FlowDirection.NNI,
+ }
+
+ # Well known EtherType
+ class EtherType(Enum):
+ EAPOL = 0x88E8
+ IPv4 = 0x0800
+ ARP = 0x0806
+
+ # Well known IP Protocols
+ class IpProtocol(Enum):
+ IGMP = 2
+ UDP = 17
+
def __init__(self, flow, handler):
self._flow = flow
self._handler = handler
- log.debug('Initializing a new FlowEntry', flow=flow)
+ self.evc = None # EVC this flow is part of
+ self.evc_map = None # EVC-MAP this flow is part of
+ self._flow_direction = FlowEntry.FlowDirection.OTHER
+
+ self._name = self._create_flow_name()
+ # A value used to locate possible related flow entries
+ self.signature = None
+
+ # Selection properties
+ self.in_port = None
+ self.vlan_id = None
+ self.pcp = None
+ self.eth_type = None
+ self.ip_protocol = None
+ self.ipv4_dst = None
+ self.udp_dst = None # UDP Port #
+ self.udp_src = None # UDP Port #
+ self.inner_vid = None
+
+ # Actions
+ self.output = None
+ self.pop_vlan = 0
+ self.push_vlan_tpid = []
+ self.push_vlan_id = []
@property
def name(self):
- return 'Flow-{}'.format(self.flow.id)
+ return self._name # TODO: Is a name really needed in production?
+
+ # TODO: Is a name really needed in production?
+ def _create_flow_name(self):
+ return 'flow-{}-{}'.format(self.device_id, self.flow_id)
@property
def flow(self):
return self._flow
@property
+ def flow_id(self):
+ return self.flow.id
+
+ @property
def handler(self):
return self._handler
+ @property
+ def device_id(self):
+ return self.handler.device_id
+
+ @property
+ def flow_direction(self):
+ return self._flow_direction
+
@staticmethod
def create(flow, handler):
"""
- Create the appropriate FlowEntry wrapper for the flow
+ Create the appropriate FlowEntry wrapper for the flow. This method returns a two
+ results.
+
+ The first result is the flow entry that was created. This could be a match to an
+ existing flow since it is a bulk update. None is returned only if no match to
+ an existing entry is found and decode failed (unsupported field)
+
+ The second result is the EVC this flow should be added to. This could be an
+ existing flow (so your adding another EVC-MAP) or a brand new EVC (no existing
+ EVC-MAPs). None is returned if there are not a valid EVC that can be created YET.
:param flow: (Flow) Flow entry passed to VOLTHA adapter
:param handler: (AdtranDeviceHandler) handler for the device
- :return: (FlowEntry) A flow entry of the appropriate type
+ :return: (FlowEntry, EVC)
"""
- # Determine the type of flow entry. An ACL type entry is use to send
- # packets to a reserved port (controller) or to drop them.
+ # Exit early if it already exists
+ try:
+ flow_entry = FlowEntry(flow, handler)
- in_port = fd.get_in_port(flow)
- out_port = fd.get_out_port(flow)
+ if flow_entry.device_id not in _existing_flow_entries:
+ _existing_flow_entries[flow_entry.device_id] = {}
- if in_port or out_port is None:
+ flow_table = _existing_flow_entries[flow_entry.device_id]
+
+ if flow_entry.flow_id in flow_table:
+ return flow_entry, None
+
+ #########################################
+ # A new flow, decode it into the items of interest
+
+ if not flow_entry._decode():
+ return None, None
+
+ # Look for any matching flows in the other direction that might help make an EVC
+ # and then save it off in the device specific flow table
+ # TODO: For now, only support for E-LINE services between NNI and UNI
+
+ flow_candidates = [_flow for _flow in flow_table.itervalues()
+ if _flow.signature == flow_entry.signature and
+ _flow.in_port == flow_entry.output and
+ (_flow.flow_direction == FlowEntry.FlowDirection.UPSTREAM or
+ _flow.flow_direction == FlowEntry.FlowDirection.DOWNSTREAM)
+ ]
+
+ flow_table[flow_entry.flow_id] = flow_entry
+
+ # TODO: For now, only support for E-LINE services between NNI and UNI
+ if len(flow_candidates) == 0 or (flow_entry.flow_direction != FlowEntry.FlowDirection.UPSTREAM and
+ flow_entry.flow_direction != FlowEntry.FlowDirection.DOWNSTREAM):
+ return flow_entry, None
+
+ # Possible candidate found. Currently, the logical_device_agent sends us the load downstream
+ # flow first and then all the matching upstreams. So we should have only one match
+
+ if flow_entry.flow_direction == FlowEntry.FlowDirection.DOWNSTREAM:
+ downstream_flow = flow_entry
+ else:
+ assert len(flow_candidates) != 0
+ downstream_flow = flow_candidates[0]
+
+ if flow_entry.flow_direction == FlowEntry.FlowDirection.UPSTREAM:
+ upstream_flows = [flow_entry]
+ else:
+ upstream_flows = flow_candidates
+
+ return flow_entry, FlowEntry._create_evc_and_maps(downstream_flow, upstream_flows)
+
+ except Exception as e:
+ log.exception('Error during flow_entry processing', e=e)
+
+ @staticmethod
+ def _create_evc_and_maps(downstream_flow, upstream_flows):
+ """
+ Give a set of flows, find (or create) the EVC and any needed EVC-MAPs
+
+ :param downstream_flow: NNI -> UNI flow (provides much of the EVC values)
+ :param upstream_flows: UNI -> NNI flows (provides much of the EVC-MAP values)
+
+ :return: EVC object
+ """
+ # Get any existing EVC if a flow is already created
+
+ if downstream_flow.evc is None:
+ downstream_flow.evc = EVC(downstream_flow)
+
+ evc = downstream_flow.evc
+ if not evc.valid:
return None
- # Convert all possible physical ports into a single number for matching purposes
+ # Create EVC-MAPs
+ for flow in upstream_flows:
+ if flow.evc_map is None:
+ flow.evc_map = EVCMap.create_ingress_map(flow, evc)
- if in_port <= OFPP_MAX:
- in_port = OFPP_MAX
+ all_valid = all(flow.evc_map.valid for flow in upstream_flows)
- if out_port <= OFPP_MAX:
- in_port = OFPP_MAX
+ return evc if all(flow.evc_map.valid for flow in upstream_flows) else None
- # Commented out entries below represent future desireable combinations, but not supported
- # in initial release of this device adapter.
+ def _decode(self):
+ """
+ Examine flow rules and extract appropriate settings
+ """
+ status = self._decode_traffic_selector() and self._decode_traffic_treatment()
- flow_type = {
- (OFPP_MAX, OFPP_MAX): EVCFlowEntry, # Physical port to physical port
- (OFPP_ANY, OFPP_CONTROLLER): ACLFlowEntry, # A common SDN/Openflow operation
- (OFPP_MAX, OFPP_TABLE): EVCFlowEntry, # Perhaps double-tagging?
- # (OFPP_MAX, OFPP_LOCAL): ACLFlowEntry,
- # (OFPP_ANY, OFPP_LOCAL): ACLFlowEntry,
- # (OFPP_LOCAL, OFPP_MAX): ACLFlowEntry,
- # (OFPP_MAX, OFPP_IN_PORT): EVCFlowEntry,
- # (OFPP_ANY, OFPP_IN_PORT): EVCFlowEntry,
+ if status:
+ # Determine direction of the flow
- }.get((in_port, out_port), None)
+ def port_type(port):
+ if port in self._handler.northbound_ports:
+ return FlowEntry.FlowDirection.NNI
+ elif port <= OFPP_MAX:
+ return FlowEntry.FlowDirection.UNI
+ return FlowEntry.FlowDirection.OTHER
- return None if flow_type is None else flow_type(FlowEntry(flow, handler))
+ self._flow_direction = FlowEntry._flow_dir_map.get((port_type(self.in_port), port_type(self.output)),
+ FlowEntry.FlowDirection.OTHER)
+
+ # Create a signature that will help locate related flow entries on a device.
+ # These are not exact, just ones that may be put together to make an EVC. The
+ # basic rules are:
+ #
+ # 1 - Same device
+ dev_id = self._handler.device_id
+
+ # 2 - Port numbers in increasing order
+ ports = [self.in_port, self.output]
+ ports.sort()
+
+ # 3 - The outer VID
+
+ push_len = len(self.push_vlan_id)
+ assert push_len <= 2
+
+ outer = self.vlan_id or None if push_len == 0 else self.push_vlan_id[0]
+
+ # 4 - The inner VID.
+ inner = self.inner_vid or None if push_len <= 1 else self.push_vlan_id[1]
+
+ self.signature = '{}'.format(dev_id)
+ for port in ports:
+ self.signature += '.{}'.format(port)
+ self.signature += '.{}.{}'.format(outer, inner)
+
+ return status
+
+ def _decode_traffic_selector(self):
+ """
+ Extract EVC related traffic selection settings
+ """
+ self.in_port = fd.get_in_port(self._flow)
+
+ if self.in_port > OFPP_MAX:
+ log.warn('Logical input ports are not supported at this time')
+ return False
+
+ for field in fd.get_ofb_fields(self._flow):
+ if field.type == IN_PORT:
+ pass # Handled earlier
+
+ elif field.type == VLAN_VID:
+ log.info('*** field.type == VLAN_VID', value=field.vlan_vid & 0xfff)
+ self.vlan_id = field.vlan_vid & 0xfff
+
+ elif field.type == VLAN_PCP:
+ log.info('*** field.type == VLAN_PCP', value=field.vlan_pcp)
+ self.pcp = field.vlan_pcp
+
+ elif field.type == ETH_TYPE:
+ log.info('*** field.type == ETH_TYPE', value=field.eth_type)
+ self.eth_type = field.eth_type
+
+ elif field.type == IP_PROTO:
+ log.info('*** field.type == IP_PROTO', value=field.ip_proto)
+ self.ip_protocol = field.ip_proto
+
+ if self.ip_protocol not in _supported_ip_protocols:
+ log.error('Unsupported IP Protocol')
+ return False
+
+ elif field.type == IPV4_DST:
+ log.info('*** field.type == IPV4_DST', value=field.ipv4_dst)
+ self.ipv4_dst = field.ipv4_dst
+
+ elif field.type == UDP_DST:
+ log.info('*** field.type == UDP_DST', value=field.udp_dst)
+ self.udp_dst = field.udp_dst
+
+ elif field.type == UDP_SRC:
+ log.info('*** field.type == UDP_SRC', value=field.udp_src)
+ self.udp_src = field.udp_src
+
+ elif field.type == METADATA:
+ log.info('*** field.type == METADATA', value=field.table_metadata)
+ self.inner_vid = field.table_metadata
+
+ else:
+ log.warn('Found unsupported selection field', type=field.type)
+ self._status_message = 'Unsupported field.type={}'.format(field.type)
+ return False
+
+ return True
+
+ def _decode_traffic_treatment(self):
+ self.output = fd.get_out_port(self._flow)
+
+ if self.output > OFPP_MAX:
+ log.warn('Logical output ports are not supported at this time')
+ return False
+
+ for act in fd.get_actions(self._flow):
+ if act.type == fd.OUTPUT:
+ pass # Handled earlier
+
+ elif act.type == POP_VLAN:
+ log.info('*** action.type == POP_VLAN')
+ self.pop_vlan += 1
+
+ elif act.type == PUSH_VLAN:
+ log.info('*** action.type == PUSH_VLAN', value=act.push)
+ # TODO: Do we want to test the ethertype for support?
+ tpid = act.push.ethertype
+ self.push_vlan_tpid.append(tpid)
+
+ elif act.type == SET_FIELD:
+ log.info('*** action.type == SET_FIELD', value=act.set_field.field)
+ assert (act.set_field.field.oxm_class == ofp.OFPXMC_OPENFLOW_BASIC)
+ field = act.set_field.field.ofb_field
+ if field.type == VLAN_VID:
+ self.push_vlan_id.append(field.vlan_vid & 0xfff)
+
+ else:
+ # TODO: May need to modify ce-preservation
+ log.warn('Found unsupported action', action=act)
+ self._status_message = 'Unsupported action.type={}'.format(act.type)
+ return False
+
+ return True
+
+ @staticmethod
+ def drop_missing_flows(device_id, valid_flow_ids):
+ flow_table = _existing_flow_entries.get(device_id, None)
+
+ if flow_table is not None:
+ flows_to_drop = [flow for flow_id, flow in flow_table.items() if flow_id not in valid_flow_ids]
+
+ for flow in flows_to_drop:
+ try:
+ yield flow.remove()
+
+ except Exception as e:
+ log.exception('Exception while removing stale flow', flow=flow, e=e)
+
+ @inlineCallbacks
+ def remove(self):
+ """
+ Remove this flow entry from the list of existing entries and drop EVC
+ if needed
+ """
+ # Remove from exiting table list
+ device_id = self._handler.device_id
+ flow_id = self._flow.id
+ flow_table = _existing_flow_entries.get(device_id, None)
+
+ if flow_table is not None and flow_id in flow_table:
+ del flow_table[flow_id]
+ if len(flow_table) == 0:
+ del _existing_flow_entries[device_id]
+
+ # Remove flow from the hardware
+
+ evc_map, self.evc_map = self.evc_map, None
+ evc, self.evc = self.evc, None
+
+ if evc_map is not None:
+ yield evc_map.delete()
+
+ if evc is not None:
+ yield evc.delete()
+
+ self._flow = None
+ self._handler = None
+
+ returnValue('done')
######################################################
# Bulk operations
@staticmethod
def enable_all():
+ # TODO: May want to be device specific or regex based
raise NotImplemented("TODO: Implement this")
@staticmethod
def disable_all():
+ # TODO: May want to be device specific or regex based
raise NotImplemented("TODO: Implement this")
@staticmethod
@@ -110,55 +438,3 @@
"""
raise NotImplemented("TODO: Implement this")
-
-class EVCFlowEntry(FlowEntry):
- def __init__(self, flow, handler):
- super(FlowEntry, self).__init__(flow, handler)
- self.evc = EVC.create(flow, handler)
-
- @property
- def valid(self):
- return self.evc.valid
-
- @property
- def installed(self):
- return self.evc.installed
-
- def install(self):
- return self.evc.install()
-
- def remove(self):
- return self.evc.remove()
-
- def enable(self):
- return self.evc.enable()
-
- def disable(self):
- return self.evc.disable()
-
-
-class ACLFlowEntry(FlowEntry):
- def __init__(self, flow, handler):
- super(FlowEntry, self).__init__(flow, handler)
- self.acl = ACL.create(flow, handler)
-
- @property
- def valid(self):
- return self.acl.valid
-
- @property
- def installed(self):
- return self.acl.installed
-
- def install(self):
- return self.acl.install()
-
- def remove(self):
- return self.acl.remove()
-
- def enable(self):
- return self.acl.enable()
-
- def disable(self):
- return self.evc.disable()
-
diff --git a/voltha/adapters/adtran_olt/net/adtran_netconf.py b/voltha/adapters/adtran_olt/net/adtran_netconf.py
index f75e115..d019da6 100644
--- a/voltha/adapters/adtran_olt/net/adtran_netconf.py
+++ b/voltha/adapters/adtran_olt/net/adtran_netconf.py
@@ -299,7 +299,7 @@
finally:
if lock_timeout > 0:
try:
- yield self._session.lock(target, lock_timeout)
+ yield self._session.unlock(target)
except Exception as e:
log.exception('edit_config unlock Exception: {}'.format(e.message))
diff --git a/voltha/adapters/adtran_olt/net/adtran_rest.py b/voltha/adapters/adtran_olt/net/adtran_rest.py
index aa42c8e..049b94a 100644
--- a/voltha/adapters/adtran_olt/net/adtran_rest.py
+++ b/voltha/adapters/adtran_olt/net/adtran_rest.py
@@ -137,7 +137,7 @@
raise
except ConnectionClosed:
- returnValue(None)
+ returnValue(ConnectionClosed)
except Exception as e:
log.exception("REST {} '{}' request to '{}' failed: {}".format(method, name, url, str(e)))
diff --git a/voltha/adapters/adtran_olt/net/mock_netconf_client.py b/voltha/adapters/adtran_olt/net/mock_netconf_client.py
new file mode 100644
index 0000000..e28b800
--- /dev/null
+++ b/voltha/adapters/adtran_olt/net/mock_netconf_client.py
@@ -0,0 +1,216 @@
+#
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import structlog
+import random
+import time
+from adtran_netconf import AdtranNetconfClient
+from common.utils.asleep import asleep
+from ncclient.operations.rpc import RPCReply, RPCError
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+log = structlog.get_logger()
+
+_dummy_xml = '<rpc-reply message-id="br-549" ' + \
+ 'xmlns="urn:ietf:params:xml:ns:netconf:base:1.0" ' + \
+ 'xmlns:nc="urn:ietf:params:xml:ns:netconf:base:1.0">' + \
+ '<data/>' + \
+ '</rpc-reply>'
+
+class MockNetconfClient(AdtranNetconfClient):
+ """
+ Performs NETCONF requests
+ """
+ def __init__(self, host_ip, port=830, username='', password='', timeout=20):
+ super(MockNetconfClient, self).__init__(host_ip, port=port, username=username,
+ password=password, timeout=timeout)
+ self._connected = False
+ self._locked = {}
+
+ def __str__(self):
+ return "MockNetconfClient {}@{}:{}".format(self._username, self._ip, self._port)
+
+ @property
+ def capabilities(self):
+ """
+ Get the server's NETCONF capabilities
+
+ :return: (ncclient.capabilities.Capabilities) object representing the server's capabilities.
+ """
+ return None
+
+ @property
+ def connected(self):
+ """
+ Is this client connected to a NETCONF server
+ :return: (boolean) True if connected
+ """
+ return self._connected
+
+ @inlineCallbacks
+ def connect(self, connect_timeout=None):
+ """
+ Connect to the NETCONF server
+
+ o To disable attempting publickey authentication altogether, call with
+ allow_agent and look_for_keys as False.`
+
+ o hostkey_verify enables hostkey verification from ~/.ssh/known_hosts
+
+ :return: (deferred) Deferred request
+ """
+ yield asleep(random.uniform(0.1, 5.0)) # Simulate NETCONF request delay
+ self._connected = True
+ self._locked = {}
+ returnValue(True)
+
+ @inlineCallbacks
+ def close(self):
+ """
+ Close the connection to the NETCONF server
+ :return: (deferred) Deferred request
+ """
+ yield asleep(random.uniform(0.1, 0.5)) # Simulate NETCONF request delay
+ self._connected = False
+ self._locked = {}
+ returnValue(True)
+
+ @inlineCallbacks
+ def get_config(self, source='running'):
+ """
+ Get the configuration from the specified source
+
+ :param source: (string) Configuration source, 'running', 'candidate', ...
+ :return: (deferred) Deferred request that wraps the GetReply class
+ """
+ yield asleep(random.uniform(0.1, 4.0)) # Simulate NETCONF request delay
+
+ # TODO: Customize if needed...
+ xml = _dummy_xml
+ returnValue(RPCReply(xml))
+
+ @inlineCallbacks
+ def get(self, payload):
+ """
+ Get the requested data from the server
+
+ :param payload: Payload/filter
+ :return: (defeered) for GetReply
+ """
+ yield asleep(random.uniform(0.1, 3.0)) # Simulate NETCONF request delay
+
+ # TODO: Customize if needed...
+ xml = _dummy_xml
+ returnValue(RPCReply(xml))
+
+ @inlineCallbacks
+ def lock(self, source, lock_timeout):
+ """
+ Lock the configuration system
+ :param source: is the name of the configuration datastore accessed
+ :param lock_timeout: timeout in seconds for holding the lock
+ :return: (defeered) for RpcReply
+ """
+ expire_time = time.time() + lock_timeout
+
+ if source not in self._locked:
+ self._locked[source] = None
+
+ while self._locked[source] is not None:
+ # Watch for lock timeout
+ if time.time() >= self._locked[source]:
+ self._locked[source] = None
+ break
+ yield asleep(0.1)
+
+ if time.time() < expire_time:
+ yield asleep(random.uniform(0.1, 0.5)) # Simulate NETCONF request delay
+ self._locked[source] = expire_time
+
+ returnValue(RPCReply(_dummy_xml) if expire_time > time.time() else RPCError('TODO'))
+
+ @inlineCallbacks
+ def unlock(self, source):
+ """
+ Get the requested data from the server
+ :param rpc_string: RPC request
+ :param source: is the name of the configuration datastore accessed
+ :return: (defeered) for RpcReply
+ """
+ if source not in self._locked:
+ self._locked[source] = None
+
+ if self._locked[source] is not None:
+ yield asleep(random.uniform(0.1, 0.5)) # Simulate NETCONF request delay
+
+ self._locked[source] = None
+ returnValue(RPCReply(_dummy_xml))
+
+ @inlineCallbacks
+ def edit_config(self, config, target='running', default_operation=None,
+ test_option=None, error_option=None, lock_timeout=-1):
+ """
+ Loads all or part of the specified config to the target configuration datastore with the ability to lock
+ the datastore during the edit. To change multiple items, use your own calls to lock/unlock instead of
+ using the lock_timeout value
+
+ :param config is the configuration, which must be rooted in the config element. It can be specified
+ either as a string or an Element.format="xml"
+ :param target is the name of the configuration datastore being edited
+ :param default_operation if specified must be one of { 'merge', 'replace', or 'none' }
+ :param test_option if specified must be one of { 'test_then_set', 'set' }
+ :param error_option if specified must be one of { 'stop-on-error', 'continue-on-error', 'rollback-on-error' }
+ The 'rollback-on-error' error_option depends on the :rollback-on-error capability.
+ :param lock_timeout if >0, the maximum number of seconds to hold a lock on the datastore while the edit
+ operation is underway
+
+ :return: (defeered) for RpcReply
+ """
+ if lock_timeout > 0:
+ try:
+ request = self.lock(target, lock_timeout)
+ yield request
+
+ except Exception as e:
+ log.exception('edit_config Lock Exception: {}'.format(e.message))
+ raise
+ try:
+ yield asleep(random.uniform(0.1, 2.0)) # Simulate NETCONF request delay
+
+ except Exception as e:
+ log.exception('edit_config Edit Exception: {}'.format(e.message))
+ raise
+
+ finally:
+ if lock_timeout > 0:
+ yield self.unlock(target)
+
+ # TODO: Customize if needed...
+ xml = _dummy_xml
+ returnValue(RPCReply(xml))
+
+ @inlineCallbacks
+ def rpc(self, rpc_string):
+ """
+ Custom RPC request
+ :param rpc_string: (string) RPC request
+ :return: (defeered) for GetReply
+ """
+ yield asleep(random.uniform(0.1, 2.0)) # Simulate NETCONF request delay
+
+ # TODO: Customize if needed...
+ xml = _dummy_xml
+ returnValue(RPCReply(xml))
diff --git a/voltha/adapters/adtran_olt/nni_port.py b/voltha/adapters/adtran_olt/nni_port.py
index 11a676c..0e20009 100644
--- a/voltha/adapters/adtran_olt/nni_port.py
+++ b/voltha/adapters/adtran_olt/nni_port.py
@@ -19,7 +19,7 @@
import structlog
from enum import Enum
from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, returnValue, succeed
+from twisted.internet.defer import inlineCallbacks, returnValue, succeed, fail
from voltha.core.logical_device_agent import mac_str_to_tuple
from voltha.protos.common_pb2 import OperStatus, AdminState
@@ -48,6 +48,7 @@
assert 'port_no' in kwargs
self.log = structlog.get_logger(port_no=kwargs.get('port_no'))
+ self.log.info('Creating NNI Port')
self._port_no = kwargs.get('port_no')
self._name = kwargs.get('name', 'nni-{}'.format(self._port_no))
@@ -57,25 +58,21 @@
self._deferred = None
self._state = NniPort.State.INITIAL
- self.log.info('Creating NNI Port')
# Local cache of NNI configuration
self._enabled = None
# And optional parameters
+ # TODO: Currently cannot update admin/oper status, so create this enabled and active
+ # self._admin_state = kwargs.pop('admin_state', AdminState.UNKNOWN)
+ # self._oper_status = kwargs.pop('oper_status', OperStatus.UNKNOWN)
+ self._admin_state = AdminState.ENABLED
+ self._oper_status = OperStatus.ACTIVE
- self._admin_state = kwargs.pop('admin_state', AdminState.UNKNOWN)
- self._oper_status = kwargs.pop('oper_status', OperStatus.UNKNOWN)
self._label = kwargs.pop('label', 'NNI port {}'.format(self._port_no))
- self._mac_address = kwargs.pop('mac_address',
- '08:00:{}{}:{}{}:{}{}:00'.format(random.randint(0, 9),
- random.randint(0, 9),
- random.randint(0, 9),
- random.randint(0, 9),
- random.randint(0, 9),
- random.randint(0, 9)))
-
+ self._mac_address = kwargs.pop('mac_address', '00:00:00:00:00:00')
+ # TODO: Get with JOT and find out how to pull out MAC Address via NETCONF
# TODO: May need to refine capabilities into current, advertised, and peer
self._ofp_capabilities = kwargs.pop('ofp_capabilities', OFPPF_100GB_FD | OFPPF_FIBER)
@@ -98,6 +95,10 @@
return self._port_no
@property
+ def name(self):
+ return self._name
+
+ @property
def olt(self):
return self._parent
@@ -177,7 +178,6 @@
self._deferred = reactor.callLater(0, self._finish_startup)
return self._deferred
- @inlineCallbacks
def _finish_startup(self):
if self._state != NniPort.State.INITIAL:
returnValue('Done')
@@ -187,23 +187,20 @@
if self._enabled:
self._admin_state = AdminState.ENABLED
self._oper_status = OperStatus.ACTIVE # TODO: is this correct, how do we tell GRPC
- self._state = NniPort.State.RUNNING
+ self._update_adapter_agent()
# TODO: Start status polling of NNI interfaces
self._deferred = None # = reactor.callLater(3, self.do_stuff)
- self._update_adapter_agent()
- returnValue('Enabled')
-
+ self._state = NniPort.State.RUNNING
else:
# Startup failed. Could be due to object creation with an invalid initial admin_status
# state. May want to schedule a start to occur again if this happens
self._admin_state = AdminState.DISABLED
self._oper_status = OperStatus.UNKNOWN
- self._state = NniPort.State.STOPPED
-
self._update_adapter_agent()
- returnValue('Disabled')
+
+ self._state = NniPort.State.STOPPED
def stop(self):
if self._state == NniPort.State.STOPPED:
@@ -305,16 +302,12 @@
<data>
<interfaces-state xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">
<interface><name>hundred-gigabit-ethernet 0/1</name></interface>
- <interface><name>hundred-gigabit-ethernet 0/2</name></interface>
- <interface><name>hundred-gigabit-ethernet 0/3</name></interface>
- <interface><name>hundred-gigabit-ethernet 0/4</name></interface>
</interfaces-state>
</data>
</rpc-reply>
"""
return GetReply(raw)
- @inlineCallbacks
def reset(self):
"""
Set the NNI Port to a known good state on initial port startup. Actual
@@ -322,7 +315,7 @@
"""
if self._state != NniPort.State.INITIAL:
self.log.error('Reset ignored, only valid during initial startup', state=self._state)
- returnValue('Ignored')
+ return fail()
self.log.info('Reset {}'.format(self._label))
@@ -330,9 +323,8 @@
self._enabled = True
self._admin_state = AdminState.ENABLED
- returnValue('Enabled')
+ return succeed('Enabled')
- @inlineCallbacks
def set_config(self, leaf, value):
if leaf == 'enabled':
@@ -340,4 +332,4 @@
else:
raise NotImplemented("Leaf '{}' is not supported".format(leaf))
- returnValue('Success')
+ return succeed('Success')
diff --git a/voltha/adapters/adtran_olt/onu.py b/voltha/adapters/adtran_olt/onu.py
index 6cba0c8..144d053 100644
--- a/voltha/adapters/adtran_olt/onu.py
+++ b/voltha/adapters/adtran_olt/onu.py
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-#
+
import base64
import json
@@ -39,51 +39,77 @@
Wraps an ONU
"""
MIN_ONU_ID = 0
- MAX_ONU_ID = 1022
- BROADCAST_ONU_ID = 1023
+ MAX_ONU_ID = 254
+ BROADCAST_ONU_ID = 255
+ # MAX_ONU_ID = 1022
+ # BROADCAST_ONU_ID = 1023
DEFAULT_PASSWORD = ''
- def __init__(self, serial_number, parent, password=DEFAULT_PASSWORD):
- self.onu_id = parent.get_next_onu_id()
- self.serial_number = serial_number
- self.password = password
- self.parent = parent
+ def __init__(self, serial_number, pon, password=DEFAULT_PASSWORD):
+ self._onu_id = pon.get_next_onu_id()
+
+ if self._onu_id is None:
+ raise ValueError('No ONU ID available')
+
+ self._serial_number = serial_number
+ self._password = password
+ self._pon = pon
+ self._name = 'xpon {}/{}'.format(pon.pon_id, self._onu_id)
try:
sn_ascii = base64.decodestring(serial_number).lower()[:4]
except Exception:
sn_ascii = 'Invalid_VSSN'
- self.vendor_device = _VSSN_TO_VENDOR.get(sn_ascii,
- 'Unsupported_{}'.format(sn_ascii))
+ self._vendor_device = _VSSN_TO_VENDOR.get(sn_ascii,
+ 'Unsupported_{}'.format(sn_ascii))
def __del__(self):
# self.stop()
pass
def __str__(self):
- return "Onu-{}-{}/{} parent: {}".format(self.onu_id, self.serial_number,
- base64.decodestring(self.serial_number),
- self.parent)
+ return "Onu-{}-{}/{} parent: {}".format(self._onu_id, self._serial_number,
+ base64.decodestring(self._serial_number),
+ self._pon)
+
+ @property
+ def pon(self):
+ return self._pon
+
+ @property
+ def olt(self):
+ return self.pon.olt
+
+ @property
+ def onu_id(self):
+ return self._onu_id
+
+ @property
+ def name(self):
+ return self._name
+
+ @property
+ def vendor_device(self):
+ return self._vendor_device
def create(self, enabled):
"""
POST -> /restconf/data/gpon-olt-hw:olt/pon=<pon-id>/onus/onu ->
"""
- pon_id = self.parent.pon_id
- data = json.dumps({'onu-id': self.onu_id,
- 'serial-number': self.serial_number,
+ pon_id = self.pon.pon_id
+ data = json.dumps({'onu-id': self._onu_id,
+ 'serial-number': self._serial_number,
'enable': enabled})
uri = AdtranOltHandler.GPON_PON_ONU_CONFIG_URI.format(pon_id)
- name = 'onu-create-{}-{}-{}: {}'.format(pon_id, self.onu_id, self.serial_number, enabled)
+ name = 'onu-create-{}-{}-{}: {}'.format(pon_id, self._onu_id, self._serial_number, enabled)
- return self.parent.parent.rest_client.request('POST', uri, data=data, name=name)
+ return self.olt.rest_client.request('POST', uri, data=data, name=name)
def set_config(self, leaf, value):
- pon_id = self.parent.pon_id
- data = json.dumps({'onu-id': self.onu_id,
+ pon_id = self.pon.pon_id
+ data = json.dumps({'onu-id': self._onu_id,
leaf: value})
uri = AdtranOltHandler.GPON_PON_ONU_CONFIG_URI.format(pon_id)
- name = 'pon-set-config-{}-{}-{}'.format(self.pon_id, leaf, str(value))
- name = 'onu-set-config-{}-{}-{}: {}'.format(pon_id, self.onu_id, leaf, value)
- return self.parent.parent.rest_client.request('PATCH', uri, data=data, name=name)
+ name = 'onu-set-config-{}-{}-{}: {}'.format(pon_id, self._onu_id, leaf, value)
+ return self.olt.rest_client.request('PATCH', uri, data=data, name=name)
diff --git a/voltha/adapters/adtran_olt/pon_port.py b/voltha/adapters/adtran_olt/pon_port.py
index da1bcc4..99f3652 100644
--- a/voltha/adapters/adtran_olt/pon_port.py
+++ b/voltha/adapters/adtran_olt/pon_port.py
@@ -56,6 +56,7 @@
self._parent = parent
self._pon_id = pon_index
self._port_no = port_no
+ self._name = 'xpon {}'.format(pon_index)
self._label = label or 'PON-{}'.format(pon_index)
self._port = None
self._no_onu_discover_tick = 5.0 # TODO: Decrease to 1 or 2 later
@@ -64,8 +65,11 @@
self._onus = {} # serial_number -> ONU (allowed list)
self._next_onu_id = Onu.MIN_ONU_ID
- self._admin_state = admin_state
- self._oper_status = OperStatus.UNKNOWN
+ # TODO: Currently cannot update admin/oper status, so create this enabled and active
+ # self._admin_state = admin_state
+ # self._oper_status = OperStatus.UNKNOWN
+ self._admin_state = AdminState.ENABLED
+ self._oper_status = OperStatus.ACTIVE
self._deferred = None
self._state = PonPort.State.INITIAL
@@ -102,6 +106,10 @@
return self._port_no
@property
+ def name(self):
+ return self._name
+
+ @property
def pon_id(self):
return self._pon_id
@@ -150,7 +158,7 @@
self._state = PonPort.State.INITIAL
# Do the rest of the startup in an async method
- self._deferred = reactor.callLater(0, self._finish_startup)
+ self._deferred = reactor.callLater(0.5, self._finish_startup)
return self._deferred
@inlineCallbacks
@@ -161,6 +169,8 @@
if self._state != PonPort.State.INITIAL:
returnValue('Done')
+ self.log.debug('Performing final port startup')
+
if self._enabled is None or self._downstream_fec_enable is None or self._upstream_fec_enable is None:
try:
self._deferred = self.get_pon_config()
@@ -168,7 +178,7 @@
except Exception as e:
self.log.exception('Initial GET of config failed: {}'.format(e.message))
- self._deferred = reactor.callLater(3, self._finish_startup)
+ self._deferred = reactor.callLater(5, self._finish_startup)
returnValue(self._deferred)
# Load cache
@@ -196,7 +206,7 @@
except Exception as e:
self.log.exception('downstream FEC enable failed: {}'.format(str(e)))
- self._deferred = reactor.callLater(3, self._finish_startup)
+ self._deferred = reactor.callLater(5, self._finish_startup)
returnValue(self._deferred)
if not self._upstream_fec_enable:
@@ -207,7 +217,7 @@
except Exception as e:
self.log.exception('upstream FEC enable failed: {}'.format(str(e)))
- self._deferred = reactor.callLater(3, self._finish_startup)
+ self._deferred = reactor.callLater(5, self._finish_startup)
returnValue(self._deferred)
self.log.debug('ONU Startup complete: results: {}'.format(pprint.PrettyPrinter().pformat(results)))
@@ -219,7 +229,7 @@
# Begin to ONU discovery. Once a second if no ONUs found and once every 20
# seconds after one or more ONUs found on the PON
- self._deferred = reactor.callLater(3, self.discover_onus)
+ self._deferred = reactor.callLater(1, self.discover_onus)
self._update_adapter_agent()
returnValue('Enabled')
@@ -298,6 +308,8 @@
self.log.exception('Failed to get current ONU config', e=e)
raise
+ returnValue('Reset complete')
+
def delete(self):
"""
Parent device is being deleted. Do not change any config but
@@ -367,18 +379,17 @@
new = self._process_status_onu_list(status.onus)
for onu_id in new:
- import base64
# self.add_new_onu(serial_number, status)
- self.log.info('Found ONU {}/{} in status list'.format(onu_id, base64.decodestring(onu_id)))
+ self.log.info('Found ONU {} in status list'.format(onu_id))
raise NotImplementedError('TODO: Adding ONUs from existing ONU (status list) not supported')
# Get new/missing from the discovered ONU leaf
new, missing = self._process_status_onu_discovered_list(status.discovered_onu)
- # TODO: Do something useful
- if len(missing):
- self.log.info('Missing ONUs are: {}'.format(missing))
+ # TODO: Do something useful (Does the discovery list clear out activated ONU's?)
+ # if len(missing):
+ # self.log.info('Missing ONUs are: {}'.format(missing))
for serial_number in new:
reactor.callLater(0, self.add_onu, serial_number, status)
@@ -431,12 +442,11 @@
# Newly found and not enabled ONU, enable it now if not at max
if len(self._onus) < self.MAX_ONUS_SUPPORTED:
- # TODO: For now, always allow any ONU
+ # TODO: For now, always allow any ONU to be activated
if serial_number not in self._onus:
- onu = Onu(serial_number, self)
-
try:
+ onu = Onu(serial_number, self)
yield onu.create(True)
self.on_new_onu_discovered(onu)
@@ -461,26 +471,26 @@
"""
olt = self.olt
adapter = self.adapter_agent
+ channel_id = self.olt.get_channel_id(self._pon_id, onu.onu_id)
- proxy = Device.ProxyAddress(device_id=olt.device_id,
- channel_id=self._port_no,
- onu_id=onu.onu_id)
+ proxy = Device.ProxyAddress(device_id=olt.device_id, channel_id=channel_id)
adapter.child_device_detected(parent_device_id=olt.device_id,
parent_port_no=self._port_no,
child_device_type=onu.vendor_device,
proxy_address=proxy,
- admin_state=AdminState.ENABLED)
+ admin_state=AdminState.ENABLED,
+ vlan=channel_id)
def get_next_onu_id(self):
- used_ids = [onu.onu_id for onu in self.onus]
+ used_ids = [onu.onu_id for onu in self._onus.itervalues()]
while True:
- onu_id = self.next_onu_id
- self.next_onu_id += 1
+ onu_id = self._next_onu_id
+ self._next_onu_id += 1
- if self.next_onu_id > Onu.MAX_ONU_ID:
- self.next_onu_id = Onu.MIN_ONU_ID
+ if self._next_onu_id > Onu.MAX_ONU_ID:
+ self._next_onu_id = Onu.MIN_ONU_ID
if onu_id not in used_ids:
return onu_id