Initial Adtran XGS-PON OLT Check-In
Change-Id: I07e4c4a6692d4c3497f4a639a3136f2022d795fe
diff --git a/voltha/adapters/adtran_olt/README.md b/voltha/adapters/adtran_olt/README.md
new file mode 100644
index 0000000..d0344e5
--- /dev/null
+++ b/voltha/adapters/adtran_olt/README.md
@@ -0,0 +1,2 @@
+# Adtran OLT Device Adapter
+
diff --git a/voltha/adapters/adtran_olt/__init__.py b/voltha/adapters/adtran_olt/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/voltha/adapters/adtran_olt/__init__.py
diff --git a/voltha/adapters/adtran_olt/adtran_device_handler.py b/voltha/adapters/adtran_olt/adtran_device_handler.py
new file mode 100644
index 0000000..ad53b0d
--- /dev/null
+++ b/voltha/adapters/adtran_olt/adtran_device_handler.py
@@ -0,0 +1,498 @@
+#
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+Adtran generic VOLTHA device handler
+"""
+import datetime
+import pprint
+
+import arrow
+import re
+import structlog
+from twisted.internet import reactor, defer
+from twisted.internet.defer import inlineCallbacks
+
+from voltha.adapters.adtran_olt.net.adtran_rest import AdtranRestClient
+from voltha.protos import third_party
+from voltha.protos.common_pb2 import OperStatus, AdminState, ConnectStatus
+from voltha.protos.events_pb2 import AlarmEventType, \
+ AlarmEventSeverity, AlarmEventState, AlarmEventCategory
+from voltha.protos.logical_device_pb2 import LogicalDevice
+from voltha.protos.openflow_13_pb2 import ofp_desc, ofp_switch_features, OFPC_PORT_STATS, \
+ OFPC_GROUP_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS
+from voltha.registry import registry
+
+_ = third_party
+
+
+class AdtranDeviceHandler(object):
+ """
+ A device that supports the ADTRAN RESTCONF protocol for communications
+ with a VOLTHA/VANILLA managed device.
+
+ Port numbering guidelines for Adtran OLT devices. Derived classes may augment
+ the numbering scheme below as needed.
+
+ - Reserve port 0 for the CPU capture port. All ports to/from this port should
+ be related to messages destined to/from the OpenFlow controller.
+
+ - Begin numbering northbound ports (network facing) at port 1 contiguously.
+ Consider the northbound ports to typically be the highest speed uplinks.
+ If these ports are removable or provided by one or more slots in a chassis
+ subsystem, still reserve the appropriate amount of port numbers whether they
+ are populated or not.
+
+ - Number southbound ports (customer facing) ports next starting at the next
+ available port number. If chassis based, follow the same rules as northbound
+ ports and reserve enough port numbers.
+
+ - Number any out-of-band management ports (if any) last. It will be up to the
+ Device Adapter developer whether to expose these to openflow or not. If you do
+ not expose them, but do have the ports, still reserve the appropriate number of
+ port numbers just in case.
+ """
+ # HTTP shortcuts
+ HELLO_URI = '/restconf/adtran-hello:hello'
+
+ def __init__(self, adapter, device_id, username='', password='', timeout=20):
+ self.adapter = adapter
+ self.adapter_agent = adapter.adapter_agent
+ self.device_id = device_id
+ self.log = structlog.get_logger(device_id=device_id)
+ self.startup = None
+ self.channel = None # Proxy messaging channel with 'send' method
+ self.io_port = None
+ self.logical_device_id = None
+ self.interface = registry('main').get_args().interface
+
+ # Northbound and Southbound ports
+ self.northbound_ports = {} # port number -> Port
+ self.southbound_ports = {} # port number -> Port (For PON, use pon-id as key)
+ self.management_ports = {} # port number -> Port TODO: Not currently supported
+
+ self.num_northbound_ports = None
+ self.num_southbound_ports = None
+ self.num_management_ports = None
+
+ # REST Client
+ self.ip_address = None
+ self.rest_port = None
+ self.rest_timeout = timeout
+ self.rest_username = username
+ self.rest_password = password
+ self.rest_client = None
+
+ # Heartbeat support
+ self.heartbeat_count = 0
+ self.heartbeat_miss = 0
+ self.heartbeat_interval = 10 # TODO: Decrease before release
+ self.heartbeat_failed_limit = 3
+ self.heartbeat_timeout = 5
+ self.heartbeat = None
+ self.heartbeat_last_reason = ''
+
+ self.max_ports = 1 # TODO: Remove later
+
+ def __del__(self):
+ # Kill any startup or heartbeat defers
+
+ d, self.startup = self.startup, None
+ if d is not None:
+ d.cancel()
+
+ h, self.heartbeat = self.heartbeat, None
+
+ if h is not None:
+ h.cancel()
+
+ self.northbound_ports.clear()
+ self.southbound_ports.clear()
+
+ def __str__(self):
+ return "AdtranDeviceHandler: {}:{}".format(self.ip_address, self.rest_port)
+
+ @inlineCallbacks
+ def activate(self, device):
+ """
+ Activate the OLT device
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions.
+ """
+ self.log.info('AdtranDeviceHandler.activating', device=device)
+
+ if self.logical_device_id is None:
+ if not device.host_and_port:
+ self.activate_failed(device, 'No host_and_port field provided')
+
+ pattern = '(?P<host>[^:/ ]+).?(?P<port>[0-9]*).*'
+ info = re.match(pattern, device.host_and_port)
+
+ if not info or len(info.group('host')) == 0 or len(info.group('port')) == 0 or \
+ (int(info.group('port')) if info.group('port') else None) is None:
+ self.activate_failed(device, 'Invalid Host or Port provided',
+ reachable=False)
+
+ self.ip_address = str(info.group('host'))
+ self.rest_port = int(info.group('port'))
+
+ ############################################################################
+ # Start initial discovery of RESTCONF support (if any)
+ self.rest_client = AdtranRestClient(self.ip_address,
+ self.rest_port,
+ self.rest_username,
+ self.rest_password,
+ self.rest_timeout)
+ try:
+ # content: (dict) Modules from the hello message
+
+ self.startup = self.rest_client.request('GET', self.HELLO_URI, name='hello')
+
+ results = yield self.startup
+ self.log.debug('HELLO Contents: {}'.format(pprint.PrettyPrinter().pformat(results)))
+
+ except Exception as e:
+ results = None
+ self.log.exception('Initial RESTCONF adtran-hello failed', e=e)
+ self.activate_failed(device, e.message, reachable=False)
+
+ ############################################################################
+ # TODO: Get these six via NETCONF and from the derived class
+
+ device.model = 'TODO: Adtran PizzaBox, YUM'
+ device.hardware_version = 'TODO: H/W Version'
+ device.firmware_version = 'TODO: S/W Version'
+ device.software_version = 'TODO: S/W Version'
+ device.serial_number = 'TODO: Serial Number'
+
+ device.root = True
+ device.vendor = 'Adtran, Inc.'
+ device.connect_status = ConnectStatus.REACHABLE
+ self.adapter_agent.update_device(device)
+
+ try:
+ # Enumerate and create Northbound NNI interfaces
+
+ self.startup = self.enumerate_northbound_ports(device)
+ results = yield self.startup
+
+ self.startup = self.process_northbound_ports(device, results)
+ yield self.startup
+
+ for port in self.northbound_ports.itervalues():
+ self.adapter_agent.add_port(device.id, port.get_port())
+
+ except Exception as e:
+ self.log.exception('Northbound port enumeration and creation failed', e=e)
+ self.activate_failed(device, e.message)
+ results = None
+
+ try:
+ # Enumerate and create southbound interfaces
+
+ self.startup = self.enumerate_southbound_ports(device)
+ results = yield self.startup
+
+ self.startup = self.process_southbound_ports(device, results)
+ yield self.startup
+
+ for port in self.southbound_ports.itervalues():
+ self.adapter_agent.add_port(device.id, port.get_port())
+
+ except Exception as e:
+ self.log.exception('Southbound port enumeration and creation failed', e=e)
+ self.activate_failed(device, e.message)
+
+ # Complete activation by setting up logical device for this OLT and saving
+ # off the devices parent_id
+
+ ld = LogicalDevice(
+ # NOTE: not setting id and datapath_id will let the adapter agent pick id
+ desc=ofp_desc(mfr_desc=device.vendor,
+ hw_desc=device.hardware_version,
+ sw_desc=device.software_version,
+ serial_num=device.serial_number,
+ dp_desc='n/a'),
+ switch_features=ofp_switch_features(n_buffers=256, # TODO fake for now
+ n_tables=2, # TODO ditto
+ capabilities=(OFPC_FLOW_STATS |
+ OFPC_TABLE_STATS |
+ OFPC_PORT_STATS |
+ OFPC_GROUP_STATS)), # TODO and ditto
+ root_device_id=device.id)
+
+ ld_initialized = self.adapter_agent.create_logical_device(ld)
+
+ # Create logical ports for all southbound and northbound interfaces
+
+ for port in self.northbound_ports.itervalues():
+ lp = port.get_logical_port()
+ if lp is not None:
+ self.adapter_agent.add_logical_port(ld_initialized.id, lp)
+
+ for port in self.southbound_ports.itervalues():
+ lp = port.get_logical_port()
+ if lp is not None:
+ self.adapter_agent.add_logical_port(ld_initialized.id, lp)
+
+ # Set the downlinks in a known good initial state
+
+ try:
+ for port in self.southbound_ports.itervalues():
+ self.startup = port.reset()
+ yield self.startup
+
+ except Exception as e:
+ self.log.exception('Failed to reset southbound ports to known good initial state', e=e)
+ self.activate_failed(device, e.message)
+
+ # Start/stop the interfaces as needed
+
+ try:
+ for port in self.northbound_ports.itervalues():
+ self.startup = port.start()
+ yield self.startup
+
+ except Exception as e:
+ self.log.exception('Failed to start northbound port(s)', e=e)
+ self.activate_failed(device, e.message)
+
+ try:
+ start_downlinks = self.initial_port_state == AdminState.ENABLED
+
+ for port in self.southbound_ports.itervalues():
+ self.startup = port.start() if start_downlinks else port.stop()
+ yield self.startup
+
+ except Exception as e:
+ self.log.exception('Failed to start southbound port(s)', e=e)
+ self.activate_failed(device, e.message)
+
+ # Complete device specific steps
+ try:
+ self.startup = self.complete_device_specific_activation(device, results)
+ if self.startup is not None:
+ yield self.startup
+
+ except Exception as e:
+ self.log.exception('Device specific activation failed', e=e)
+ self.activate_failed(device, e.message)
+
+ # Schedule the heartbeat for the device
+
+ self.start_heartbeat(delay=10)
+
+ # Save off logical ID and specify that we active
+
+ self.logical_device_id = ld_initialized.id
+
+ device = self.adapter_agent.get_device(device.id)
+ device.parent_id = ld_initialized.id
+ device.oper_status = OperStatus.ACTIVE
+ self.adapter_agent.update_device(device)
+
+ def activate_failed(self, device, reason, reachable=True):
+ """
+ Activation process (adopt_device) has failed.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions. Such extensions shall be described as part of
+ the device type specification returned by device_types().
+ :param reason: (string) failure reason
+ :param reachable: (boolean) Flag indicating if device may be reachable
+ via RESTConf or NETConf even after this failure.
+ """
+ device.oper_status = OperStatus.FAILED
+ if not reachable:
+ device.connect_status = ConnectStatus.UNREACHABLE
+
+ device.reason = reason
+ self.adapter_agent.update_device(device)
+ raise RuntimeError('Failed to activate OLT: {}'.format(device.reason))
+
+ @inlineCallbacks
+ def enumerate_northbound_ports(self, device):
+ """
+ Enumerate all northbound ports of a device. You should override
+ this method in your derived class as necessary. Should you encounter
+ a non-recoverable error, throw an appropriate exception.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions.
+ :return: (Deferred or None).
+ """
+ yield defer.Deferred(lambda c: c.callback("Not Required"))
+
+ @inlineCallbacks
+ def process_northbound_ports(self, device, results):
+ """
+ Process the results from the 'enumerate_northbound_ports' method.
+ You should override this method in your derived class as necessary and
+ create an NNI Port object (of your own choosing) that supports a 'get_port'
+ method. Once created, insert it into this base class's northbound_ports
+ collection.
+
+ Should you encounter a non-recoverable error, throw an appropriate exception.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions.
+ :param results: Results from the 'enumerate_northbound_ports' method that
+ you implemented. The type and contents are up to you to
+ :return:
+ """
+ yield defer.Deferred(lambda c: c.callback("Not Required"))
+
+ @inlineCallbacks
+ def enumerate_southbound_ports(self, device):
+ """
+ Enumerate all southbound ports of a device. You should override
+ this method in your derived class as necessary. Should you encounter
+ a non-recoverable error, throw an appropriate exception.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions.
+ :return: (Deferred or None).
+ """
+ yield defer.Deferred(lambda c: c.callback("Not Required"))
+
+ @inlineCallbacks
+ def process_southbound_ports(self, device, results):
+ """
+ Process the results from the 'enumerate_southbound_ports' method.
+ You should override this method in your derived class as necessary and
+ create an Port object (of your own choosing) that supports a 'get_port'
+ method. Once created, insert it into this base class's southbound_ports
+ collection.
+
+ Should you encounter a non-recoverable error, throw an appropriate exception.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions.
+ :param results: Results from the 'enumerate_southbound_ports' method that
+ you implemented. The type and contents are up to you to
+ :return:
+ """
+ yield defer.Deferred(lambda c: c.callback("Not Required"))
+
+ @inlineCallbacks
+ def complete_device_specific_activation(self, _device, _content):
+ return None
+
+ def deactivate(self, device):
+ # Clear off logical device ID
+ self.logical_device_id = None
+
+ # Kill any heartbeat poll
+ h, self.heartbeat = self.heartbeat, None
+
+ if h is not None:
+ h.cancel()
+
+ @inlineCallbacks
+ def get_device_info(self, device):
+ """
+ Perform an initial network operation to discover the device hardware
+ and software version. Serial Number would be helpful as well.
+
+ Upon successfully retrieving the information, remember to call the
+ 'start_heartbeat' method to keep in contact with the device being managed
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions. Such extensions shall be described as part of
+ the device type specification returned by device_types().
+ """
+ pass
+ return None # raise NotImplementedError('TODO: You should override this in your derived class???')
+
+ def start_heartbeat(self, delay=10):
+ assert delay > 1
+ self.heartbeat = reactor.callLater(delay, self.check_pulse)
+
+ def check_pulse(self):
+ if self.logical_device_id is not None:
+ self.heartbeat = self.rest_client.request('GET', self.HELLO_URI, name='hello')
+ self.heartbeat.addCallbacks(self.heartbeat_check_status, self.heartbeat_fail)
+
+ def heartbeat_check_status(self, results):
+ """
+ Check the number of heartbeat failures against the limit and emit an alarm if needed
+ """
+ device = self.adapter_agent.get_device(self.device_id)
+
+ if self.heartbeat_miss >= self.heartbeat_failed_limit and device.connect_status == ConnectStatus.REACHABLE:
+ self.log.warning('olt-heartbeat-failed', count=self.heartbeat_miss)
+ device.connect_status = ConnectStatus.UNREACHABLE
+ device.oper_status = OperStatus.FAILED
+ device.reason = self.heartbeat_last_reason
+ self.adapter_agent.update_device(device)
+
+ self.heartbeat_alarm(self.device_id, False, self.heartbeat_miss)
+ else:
+ assert results
+ # Update device states
+
+ self.log.info('heartbeat success')
+
+ if device.connect_status != ConnectStatus.REACHABLE:
+ device.connect_status = ConnectStatus.REACHABLE
+ device.oper_status = OperStatus.ACTIVE
+ device.reason = ''
+ self.adapter_agent.update_device(device)
+
+ self.heartbeat_alarm(self.device_id, True)
+
+ self.heartbeat_miss = 0
+ self.heartbeat_last_reason = ''
+ self.heartbeat_count += 1
+
+ # Reschedule next heartbeat
+ if self.logical_device_id is not None:
+ self.heartbeat = reactor.callLater(self.heartbeat_interval, self.check_pulse)
+
+ def heartbeat_fail(self, failure):
+ self.heartbeat_miss += 1
+ self.log.info('heartbeat-miss', failure=failure,
+ count=self.heartbeat_count, miss=self.heartbeat_miss)
+ self.heartbeat_check_status(None)
+
+ def heartbeat_alarm(self, device_id, status, heartbeat_misses=0):
+ try:
+ ts = arrow.utcnow().timestamp
+ alarm_data = {'heartbeats_missed': str(heartbeat_misses)}
+
+ alarm_event = self.adapter_agent.create_alarm(
+ id='voltha.{}.{}.olt'.format(self.adapter.name, device_id),
+ resource_id='olt',
+ type=AlarmEventType.EQUIPMENT,
+ category=AlarmEventCategory.PON,
+ severity=AlarmEventSeverity.CRITICAL,
+ state=AlarmEventState.RAISED if status else AlarmEventState.CLEARED,
+ description='OLT Alarm - Heartbeat - {}'.format('Raised'
+ if status
+ else 'Cleared'),
+ context=alarm_data,
+ raised_ts=ts)
+ self.adapter_agent.submit_alarm(device_id, alarm_event)
+
+ except Exception as e:
+ self.log.exception('failed-to-submit-alarm', e=e)
+
+ @staticmethod
+ def parse_module_revision(revision):
+ try:
+ return datetime.datetime.strptime(revision, '%Y-%m-%d')
+ except Exception:
+ return None
diff --git a/voltha/adapters/adtran_olt/adtran_olt.py b/voltha/adapters/adtran_olt/adtran_olt.py
new file mode 100644
index 0000000..c5ae568
--- /dev/null
+++ b/voltha/adapters/adtran_olt/adtran_olt.py
@@ -0,0 +1,311 @@
+#
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Adtran 1-U OLT adapter.
+"""
+import structlog
+from twisted.internet import reactor
+from zope.interface import implementer
+
+from adtran_olt_handler import AdtranOltHandler
+from voltha.adapters.interface import IAdapterInterface
+from voltha.protos import third_party
+from voltha.protos.adapter_pb2 import Adapter
+from voltha.protos.adapter_pb2 import AdapterConfig
+from voltha.protos.common_pb2 import LogLevel
+from voltha.protos.device_pb2 import DeviceType, DeviceTypes
+from voltha.protos.health_pb2 import HealthStatus
+from voltha.registry import registry
+
+_ = third_party
+log = structlog.get_logger()
+
+
+@implementer(IAdapterInterface)
+class AdtranOltAdapter(object):
+ name = 'adtran_olt'
+
+ supported_device_types = [
+ DeviceType(
+ id=name,
+ adapter=name,
+ accepts_bulk_flow_update=True
+ )
+ ]
+
+ def __init__(self, adapter_agent, config):
+ self.adapter_agent = adapter_agent
+ self.config = config
+ self.descriptor = Adapter(
+ id=self.name,
+ vendor='Adtran Inc.',
+ version='0.1',
+ config=AdapterConfig(log_level=LogLevel.INFO)
+ )
+ log.debug('adtran_olt.__init__', adapter_agent=adapter_agent)
+ self.devices_handlers = dict() # device_id -> AdtranOltHandler()
+ self.interface = registry('main').get_args().interface
+ # self.logical_device_id_to_root_device_id = dict()
+
+ def start(self):
+ """
+ Called once after adapter instance is loaded. Can be used to async
+ initialization.
+
+ :return: (None or Deferred)
+ """
+ log.debug('starting', interface=self.interface)
+ log.info('started', interface=self.interface)
+
+ def stop(self):
+ """
+ Called once before adapter is unloaded. It can be used to perform
+ any cleanup after the adapter.
+
+ :return: (None or Deferred)
+ """
+ log.debug('stopping', interface=self.interface)
+ log.info('stopped', interface=self.interface)
+
+ def adapter_descriptor(self):
+ """
+ Return the adapter descriptor object for this adapter.
+
+ :return: voltha.Adapter grpc object (see voltha/protos/adapter.proto),
+ with adapter-specific information and config extensions.
+ """
+ log.debug('get descriptor', interface=self.interface)
+ return self.descriptor
+
+ def device_types(self):
+ """
+ Return list of device types supported by the adapter.
+
+ :return: voltha.DeviceTypes protobuf object, with optional type
+ specific extensions.
+ """
+ log.debug('get device_types', interface=self.interface, items=self.supported_device_types)
+ return DeviceTypes(items=self.supported_device_types)
+
+ def health(self):
+ """
+ Return a 3-state health status using the voltha.HealthStatus message.
+
+ :return: Deferred or direct return with voltha.HealthStatus message
+ """
+ log.debug('get health', interface=self.interface)
+ return HealthStatus(state=HealthStatus.HealthState.HEALTHY)
+
+ def change_master_state(self, master):
+ """
+ Called to indicate if plugin shall assume or lose master role. The
+ master role can be used to perform functions that must be performed
+ from a single point in the cluster. In single-node deployments of
+ Voltha, the plugins are always in master role.
+
+ :param master: (bool) True to indicate the mastership needs to be
+ assumed; False to indicate that mastership needs to be abandoned.
+ :return: (Deferred) which is fired by the adapter when mastership is
+ assumed/dropped, respectively.
+ """
+ log.debug('change_master_state', interface=self.interface, master=master)
+ raise NotImplementedError()
+
+ def adopt_device(self, device):
+ """
+ Make sure the adapter looks after given device. Called when a device
+ is provisioned top-down and needs to be activated by the adapter.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions. Such extensions shall be described as part of
+ the device type specification returned by device_types().
+ :return: (Deferred) Shall be fired to acknowledge device ownership.
+ """
+ log.info('adopt-device', device=device)
+ self.devices_handlers[device.id] = AdtranOltHandler(self, device.id)
+ reactor.callLater(0, self.devices_handlers[device.id].activate, device)
+ return device
+
+ def abandon_device(self, device):
+ """
+ Make sure the adapter no longer looks after device. This is called
+ if device ownership is taken over by another Voltha instance.
+
+ :param device: A Voltha.Device object.
+ :return: (Deferred) Shall be fired to acknowledge abandonment.
+ """
+ log.info('abandon-device', device=device)
+ handler = self.devices_handlers.pop(device.id)
+
+ if handler is not None:
+ reactor.callLater(0, handler.deactivate, device)
+
+ return device
+
+ def disable_device(self, device):
+ """
+ This is called when a previously enabled device needs to be disabled
+ based on a NBI call.
+
+ :param device: A Voltha.Device object.
+ :return: (Deferred) Shall be fired to acknowledge disabling the device.
+ """
+ log.debug('disable_device', device=device)
+ raise NotImplementedError()
+
+ def reenable_device(self, device):
+ """
+ This is called when a previously disabled device needs to be enabled
+ based on a NBI call.
+
+ :param device: A Voltha.Device object.
+ :return: (Deferred) Shall be fired to acknowledge re-enabling the device.
+ """
+ log.debug('reenable_device', device=device)
+ raise NotImplementedError()
+
+ def reboot_device(self, device):
+ """
+ This is called to reboot a device based on a NBI call. The admin
+ state of the device will not change after the reboot
+
+ :param device: A Voltha.Device object.
+ :return: (Deferred) Shall be fired to acknowledge the reboot.
+ """
+ log.info('reboot_device', device=device)
+ raise NotImplementedError()
+
+ def delete_device(self, device):
+ """
+ This is called to delete a device from the PON based on a NBI call.
+ If the device is an OLT then the whole PON will be deleted.
+
+ :param device: A Voltha.Device object.
+ :return: (Deferred) Shall be fired to acknowledge the deletion.
+ """
+ log.info('delete_device', device=device)
+ raise NotImplementedError()
+
+ def get_device_details(self, device):
+ """
+ This is called to get additional device details based on a NBI call.
+
+ :param device: A Voltha.Device object.
+ :return: (Deferred) Shall be fired to acknowledge the retrieval of
+ additional details.
+ """
+ log.debug('get_device_details', device=device)
+ raise NotImplementedError()
+
+ def update_flows_bulk(self, device, flows, groups):
+ """
+ Called after any flow table change, but only if the device supports
+ bulk mode, which is expressed by the 'accepts_bulk_flow_update'
+ capability attribute of the device type.
+
+ :param device: A Voltha.Device object.
+ :param flows: An openflow_v13.Flows object
+ :param groups: An openflow_v13.Flows object
+ :return: (Deferred or None)
+ """
+ log.info('bulk-flow-update', device_id=device.id, flows=flows,
+ groups=groups)
+ assert len(groups.items) == 0, "Cannot yet deal with groups"
+ raise NotImplementedError()
+ handler = self.devices_handlers[device.id]
+ return handler.update_flow_table(flows.items, device)
+
+ def update_flows_incrementally(self, device, flow_changes, group_changes):
+ """
+ [This mode is not supported yet.]
+
+ :param device: A Voltha.Device object.
+ :param flow_changes:
+ :param group_changes:
+ :return:
+ """
+ log.debug('update_flows_incrementally', device=device, flow_changes=flow_changes,
+ group_changes=group_changes)
+ raise NotImplementedError()
+
+ def update_pm_config(self, device, pm_configs):
+ """
+ Called every time a request is made to change pm collection behavior
+ :param device: A Voltha.Device object
+ :param pm_configs: A Pms
+ """
+ log.debug('update_pm_config', device=device, pm_configs=pm_configs)
+ raise NotImplementedError()
+
+ def send_proxied_message(self, proxy_address, msg):
+ """
+ Forward a msg to a child device of device, addressed by the given
+ proxy_address=Device.ProxyAddress().
+
+ :param proxy_address: Address info for the parent device
+ to route the message to the child device. This was given to the
+ child device by the parent device at the creation of the child
+ device.
+ :param msg: (str) The actual message to send.
+ :return: (Deferred(None) or None) The return of this method should
+ indicate that the message was successfully *sent*.
+ """
+ log.info('send-proxied-message', proxy_address=proxy_address, msg=msg)
+ handler = self.devices_handlers[proxy_address.device_id]
+ handler.send_proxied_message(proxy_address, msg)
+
+ def receive_proxied_message(self, proxy_address, msg):
+ """
+ Pass an async message (arrived via a proxy) to this device.
+
+ :param proxy_address: Address info for the parent device
+ to route the message to the child device. This was given to the
+ child device by the parent device at the creation of the child
+ device. Note this is the proxy_address with which the adapter
+ had to register prior to receiving proxied messages.
+ :param msg: (str) The actual message received.
+ :return: None
+ """
+ log.debug('receive_proxied_message', proxy_address=proxy_address, msg=msg)
+ raise NotImplementedError()
+
+ def receive_packet_out(self, logical_device_id, egress_port_no, msg):
+ """
+ Pass a packet_out message content to adapter so that it can forward it
+ out to the device. This is only called on root devices.
+
+ :param logical_device_id:
+ :param egress_port_no: egress logical port number
+ :param msg: actual message
+ :return: None
+ """
+ log.info('packet-out', logical_device_id=logical_device_id,
+ egress_port_no=egress_port_no, msg_len=len(msg))
+ raise NotImplementedError()
+
+ def receive_inter_adapter_message(self, msg):
+ """
+ Called when the adapter recieves a message that was sent to it directly
+ from another adapter. An adapter may register for these messages by calling
+ the register_for_inter_adapter_messages() method in the adapter agent.
+ Note that it is the responsibility of the sending and receiving
+ adapters to properly encode and decode the message.
+ :param msg: The message contents.
+ :return: None
+ """
+ log.info('rx_inter_adapter_msg')
+ raise NotImplementedError()
diff --git a/voltha/adapters/adtran_olt/adtran_olt_handler.py b/voltha/adapters/adtran_olt/adtran_olt_handler.py
new file mode 100644
index 0000000..f037565
--- /dev/null
+++ b/voltha/adapters/adtran_olt/adtran_olt_handler.py
@@ -0,0 +1,366 @@
+#
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import datetime
+import pprint
+import random
+
+from twisted.internet import reactor
+from twisted.internet.defer import returnValue, inlineCallbacks
+
+from adtran_device_handler import AdtranDeviceHandler
+from codec.olt_state import OltState
+from net.adtran_zmq import AdtranZmqClient
+from voltha.extensions.omci.omci import *
+from voltha.protos.common_pb2 import OperStatus, AdminState
+from voltha.protos.device_pb2 import Device
+from voltha.protos.openflow_13_pb2 import OFPPF_100GB_FD, OFPPF_FIBER, OFPPS_LIVE
+
+
+# from ncclient import manager
+
+
+class AdtranOltHandler(AdtranDeviceHandler):
+ """
+ The OLT Handler is used to wrap a single instance of a 10G OLT 1-U pizza-box
+ """
+ MIN_OLT_HW_VERSION = datetime.datetime(2017, 1, 5)
+
+ # Full table output
+
+ GPON_OLT_HW_URI = '/restconf/data/gpon-olt-hw'
+ GPON_OLT_HW_STATE_URI = '/restconf/data/gpon-olt-hw:olt-state'
+ GPON_PON_CONFIG_LIST_URI = '/restconf/data/gpon-olt-hw:olt/pon'
+
+ # Per-PON info
+
+ GPON_PON_PON_STATE_URI = '/restconf/data/gpon-olt-hw:olt-state/pon={}' # .format(pon)
+ GPON_PON_CONFIG_URI = '/restconf/data/gpon-olt-hw:olt/pon={}' # .format(pon)
+ GPON_PON_ONU_CONFIG_URI = '/restconf/data/gpon-olt-hw:olt/pon={}/onus/onu' # .format(pon)
+
+ GPON_PON_DISCOVER_ONU = '/restconf/operations/gpon-olt-hw:discover-onu'
+
+ def __init__(self, adapter, device_id, username="", password="",
+ timeout=20, initial_port_state=True):
+ super(AdtranOltHandler, self).__init__(adapter, device_id, username=username,
+ password=password, timeout=timeout)
+ self.gpon_olt_hw_revision = None
+ self.status_poll = None
+ self.status_poll_interval = 5.0
+ self.status_poll_skew = self.status_poll_interval / 10
+ self.initial_port_state = AdminState.ENABLED if initial_port_state else AdminState.DISABLED
+ self.initial_onu_state = AdminState.DISABLED
+
+ self.zmq_client = None
+ self.nc_client = None
+
+ def __del__(self):
+ # OLT Specific things here.
+ #
+ # If you receive this during 'enable' of the object, you probably threw an
+ # uncaught exception which trigged an errback in the VOLTHA core.
+
+ d, self.status_poll = self.status_poll, None
+
+ # TODO Any OLT device specific cleanup here
+ # def get_channel(self):
+ # if self.channel is None:
+ # device = self.adapter_agent.get_device(self.device_id)
+ # return self.channel
+ #
+ # Clean up base class as well
+
+ AdtranDeviceHandler.__del__(self)
+
+ def __str__(self):
+ return "AdtranOltHandler: {}:{}".format(self.ip_address, self.rest_port)
+
+ @inlineCallbacks
+ def enumerate_northbound_ports(self, device):
+ """
+ Enumerate all northbound ports of this device.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions.
+ :return: (Deferred or None).
+ """
+ # TODO: For now, hard code some JSON. Eventually will be XML from NETConf
+
+ ports = [
+ {'port_no': 1,
+ 'admin_state': AdminState.ENABLED,
+ 'oper_status': OperStatus.ACTIVE,
+ 'ofp_state': OFPPS_LIVE,
+ 'ofp_capabilities': OFPPF_100GB_FD | OFPPF_FIBER,
+ 'current_speed': OFPPF_100GB_FD,
+ 'max_speed': OFPPF_100GB_FD},
+ {'port_no': 2,
+ 'admin_state': AdminState.ENABLED,
+ 'oper_status': OperStatus.ACTIVE,
+ 'ofp_state': OFPPS_LIVE,
+ 'ofp_capabilities': OFPPF_100GB_FD | OFPPF_FIBER,
+ 'current_speed': OFPPF_100GB_FD,
+ 'max_speed': OFPPF_100GB_FD},
+ {'port_no': 3,
+ 'admin_state': AdminState.ENABLED,
+ 'oper_status': OperStatus.ACTIVE,
+ 'ofp_state': OFPPS_LIVE,
+ 'ofp_capabilities': OFPPF_100GB_FD | OFPPF_FIBER,
+ 'current_speed': OFPPF_100GB_FD,
+ 'max_speed': OFPPF_100GB_FD},
+ {'port_no': 4,
+ 'admin_state': AdminState.ENABLED,
+ 'oper_status': OperStatus.ACTIVE,
+ 'ofp_state': OFPPS_LIVE,
+ 'ofp_capabilities': OFPPF_100GB_FD | OFPPF_FIBER,
+ 'current_speed': OFPPF_100GB_FD,
+ 'max_speed': OFPPF_100GB_FD}
+ ]
+
+ yield returnValue(ports)
+
+ def process_northbound_ports(self, device, results):
+ """
+ Process the results from the 'enumerate_northbound_ports' method.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions.
+ :param results: Results from the 'enumerate_northbound_ports' method that
+ you implemented. The type and contents are up to you to
+ :return: (Deferred or None).
+ """
+ from nni_port import NniPort
+
+ for port in results:
+ port_no = port['port_no']
+ self.log.info('Processing northbound port {}/{}'.format(port_no, port['port_no']))
+ assert port_no
+ assert port_no not in self.northbound_ports
+ self.northbound_ports[port_no] = NniPort(self, **port)
+
+ self.num_northbound_ports = len(self.northbound_ports)
+
+ @inlineCallbacks
+ def enumerate_southbound_ports(self, device):
+ """
+ Enumerate all southbound ports of this device.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions.
+ :return: (Deferred or None).
+ """
+ ###############################################################################
+ # Determine number of southbound ports. We know it is 16, but this keeps this
+ # device adapter generic for our other OLTs up to this point.
+
+ self.startup = self.rest_client.request('GET', self.GPON_PON_CONFIG_LIST_URI, 'pon-config')
+ results = yield self.startup
+ returnValue(results)
+
+ def process_southbound_ports(self, device, results):
+ """
+ Process the results from the 'enumerate_southbound_ports' method.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions.
+ :param results: Results from the 'enumerate_southbound_ports' method that
+ you implemented. The type and contents are up to you to
+ :return: (Deferred or None).
+ """
+ from pon_port import PonPort
+
+ for pon in results:
+
+ # Number PON Ports after the NNI ports
+ pon_id = pon['pon-id']
+ log.info('Processing pon port {}'.format(pon_id))
+
+ assert pon_id not in self.southbound_ports
+
+ admin_state = AdminState.ENABLED if pon.get('enabled',
+ PonPort.DEFAULT_ENABLED) else AdminState.DISABLED
+
+ self.southbound_ports[pon_id] = PonPort(pon_id,
+ self._pon_id_to_port_number(pon_id),
+ self,
+ admin_state=admin_state)
+
+ # TODO: For now, limit number of PON ports to make debugging easier
+
+ if len(self.southbound_ports) >= self.max_ports:
+ break
+
+ self.num_southbound_ports = len(self.southbound_ports)
+
+ def complete_device_specific_activation(self, device, results):
+ """
+ Perform an initial network operation to discover the device hardware
+ and software version. Serial Number would be helpful as well.
+
+ This method is called from within the base class's activate generator.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions. Such extensions shall be described as part of
+ the device type specification returned by device_types().
+
+ :param results: (dict) original adtran-hello RESTCONF results body
+ """
+ #
+ # For the pizzabox OLT, periodically query the OLT state of all PONs. This
+ # is simpler then having each PON port do its own poll. From this, we can:
+ #
+ # o Discover any new or missing ONT/ONUs
+ #
+ # o TODO Discover any LOS for any ONT/ONUs
+ #
+ # o TODO Update some PON level statistics
+
+ self.zmq_client = AdtranZmqClient(self.ip_address, self.rx_packet)
+ # self.nc_client = manager.connect(host='', # self.ip_address,
+ # username=self.rest_username,
+ # password=self.rest_password,
+ # hostkey_verify=False,
+ # allow_agent=False,
+ # look_for_keys=False)
+
+ self.status_poll = reactor.callLater(1, self.poll_for_status)
+ return None
+
+ def rx_packet(self, message):
+ try:
+ self.log.info('rx_Packet: Message from ONU')
+
+ pon_id, onu_id, msg, is_omci = AdtranZmqClient.decode_packet(message)
+
+ if is_omci:
+ proxy_address = Device.ProxyAddress(device_id=self.device_id,
+ channel_id=self._get_channel_id(pon_id, onu_id),
+ onu_id=onu_id)
+
+ self.adapter_agent.receive_proxied_message(proxy_address, msg)
+ else:
+ pass # TODO: Packet in support not yet supported
+ # self.adapter_agent.send_packet_in(logical_device_id=logical_device_id,
+ # logical_port_no=cvid, # C-VID encodes port no
+ # packet=str(msg))
+ except Exception as e:
+ self.log.exception('Exception during RX Packet processing', e=e)
+
+ def poll_for_status(self):
+ self.log.debug('Initiating status poll')
+
+ device = self.adapter_agent.get_device(self.device_id)
+
+ if device.admin_state == AdminState.ENABLED:
+ uri = AdtranOltHandler.GPON_OLT_HW_STATE_URI
+ name = 'pon-status-poll'
+ self.startup = self.rest_client.request('GET', uri, name=name)
+ self.startup.addBoth(self.status_poll_complete)
+
+ def status_poll_complete(self, results):
+ """
+ Results of the status poll
+
+ :param results:
+ """
+ self.log.debug('Status poll results: {}'.
+ format(pprint.PrettyPrinter().pformat(results)))
+
+ if isinstance(results, dict) and 'pon' in results:
+ try:
+ for pon_id, pon in OltState(results).pons.iteritems():
+ if pon_id in self.southbound_ports:
+ self.southbound_ports[pon_id].process_status_poll(pon)
+
+ except Exception as e:
+ self.log.exception('Exception during PON status poll processing', e=e)
+ else:
+ self.log.warning('Had some kind of polling error')
+
+ # Reschedule
+
+ delay = self.status_poll_interval
+ delay += random.uniform(-delay / 10, delay / 10)
+
+ self.status_poll = reactor.callLater(delay, self.poll_for_status)
+
+ @inlineCallbacks
+ def deactivate(self, device):
+ # OLT Specific things here
+
+ d, self.startup = self.startup, None
+ if d is not None:
+ d.cancel()
+
+ self.pons.clear()
+
+ # TODO: Any other? OLT specific deactivate steps
+
+ # Call into base class and have it clean up as well
+ super(AdtranOltHandler, self).deactivate(device)
+
+ @inlineCallbacks
+ def update_flow_table(self, flows, device):
+ self.log.info('bulk-flow-update', device_id=device.id, flows=flows)
+ raise NotImplementedError('TODO: Not yet implemented')
+
+ @inlineCallbacks
+ def send_proxied_message(self, proxy_address, msg):
+ self.log.info('sending-proxied-message: message type: {}'.format(type(msg)))
+
+ if isinstance(msg, Packet):
+ msg = str(msg)
+
+ if self.zmq_client is not None:
+ pon_id = self._channel_id_to_pon_id(proxy_address.channel_id, proxy_address.onu_id)
+ onu_id = proxy_address.onu_id
+
+ data = AdtranZmqClient.encode_omci_message(msg, pon_id, onu_id)
+
+ try:
+ self.zmq_client.send(data)
+
+ except Exception as e:
+ self.log.info('zmqClient.send exception', exc=str(e))
+ raise
+
+ @staticmethod
+ def is_gpon_olt_hw(content):
+ """
+ If the hello content
+
+ :param content: (dict) Results of RESTCONF adtran-hello GET request
+ :return: (string) GPON OLT H/w RESTCONF revision number or None on error/not GPON
+ """
+ for item in content.get('module-info', None):
+ if item.get('module-name') == 'gpon-olt-hw':
+ return AdtranDeviceHandler.parse_module_revision(item.get('revision', None))
+ return None
+
+ def _onu_offset(self, onu_id):
+ return self.num_northbound_ports + self.num_southbound_ports + onu_id
+
+ def _get_channel_id(self, pon_id, onu_id):
+ from pon_port import PonPort
+
+ return self._onu_offset(onu_id) + (pon_id * PonPort.MAX_ONUS_SUPPORTED)
+
+ def _channel_id_to_pon_id(self, channel_id, onu_id):
+ from pon_port import PonPort
+
+ return (channel_id - self._onu_offset(onu_id)) / PonPort.MAX_ONUS_SUPPORTED
+
+ def _pon_id_to_port_number(self, pon_id):
+ return pon_id + 1 + self.num_northbound_ports
diff --git a/voltha/adapters/adtran_olt/codec/__init__.py b/voltha/adapters/adtran_olt/codec/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/voltha/adapters/adtran_olt/codec/__init__.py
diff --git a/voltha/adapters/adtran_olt/codec/olt_config.py b/voltha/adapters/adtran_olt/codec/olt_config.py
new file mode 100644
index 0000000..e639f9a
--- /dev/null
+++ b/voltha/adapters/adtran_olt/codec/olt_config.py
@@ -0,0 +1,153 @@
+#
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import pprint
+
+import os
+import structlog
+
+log = structlog.get_logger()
+
+
+class OltConfig(object):
+ """
+ Class to wrap decode of olt container (config) from the ADTRAN
+ gpon-olt-hw.yang YANG model
+ """
+
+ def __init__(self, packet):
+ self._packet = packet
+ self._pons = None
+
+ def __str__(self):
+ return "OltConfig: {}".format(self.software_version)
+
+ @property
+ def olt_id(self):
+ """Unique OLT identifier"""
+ return self._packet.get('olt-id', '')
+
+ @property
+ def debug_output(self):
+ """least important level that will output everything"""
+ return self._packet.get('debug-output', 'warning')
+
+ @property
+ def pons(self):
+ if self._pons is None:
+ self._pons = OltConfig.Pon.decode(self._packet.get('pon', None))
+ return self._pons
+
+ class Pon(object):
+ """
+ Provides decode of PON list from within
+ """
+
+ def __init__(self, packet):
+ assert 'pon-id' in packet
+ self._packet = packet
+ self._onus = None
+
+ def __str__(self):
+ return "OltConfig.Pon: pon-id: {}".format(self.pon_id)
+
+ @staticmethod
+ def decode(pon_list):
+ log.info('Decoding PON List:{}{}'.format(os.linesep,
+ pprint.PrettyPrinter().pformat(pon_list)))
+ pons = {}
+ for pon_data in pon_list:
+ pon = OltConfig.Pon(pon_data)
+ assert pon.pon_id not in pons
+ pons[pon.pon_id] = pon
+
+ return pons
+
+ @property
+ def pon_id(self):
+ """PON identifier"""
+ return self._packet['pon-id']
+
+ @property
+ def enabled(self):
+ """The desired state of the interface"""
+ return self._packet.get('enabled', True)
+
+ @property
+ def downstream_fec_enable(self):
+ """Enables downstream Forward Error Correction"""
+ return self._packet.get('downstream-fec-enable', False)
+
+ @property
+ def upstream_fec_enable(self):
+ """Enables upstream Forward Error Correction"""
+ return self._packet.get('upstream-fec-enable', False)
+
+ @property
+ def deployment_range(self):
+ """Maximum deployment distance (meters)"""
+ return self._packet.get('deployment-range', 25000)
+
+ @property
+ def onus(self):
+ if self._onus is None:
+ self._onus = OltConfig.Pon.decode(self._packet.get('pon', None))
+ return self._onus
+
+ class Onu(object):
+ """
+ Provides decode of onu list for a PON port
+ """
+
+ def __init__(self, packet):
+ assert 'onu-id' in packet
+ self._packet = packet
+
+ def __str__(self):
+ return "OltConfig.Pon.Onu: onu-id: {}".format(self.onu_id)
+
+ @staticmethod
+ def decode(onu_list):
+ log.debug('onus:{}{}'.format(os.linesep,
+ pprint.PrettyPrinter().pformat(onu_list)))
+ onus = {}
+ for onu_data in onu_list:
+ onu = OltConfig.Pon.Onu(onu_data)
+ assert onu.onu_id not in onus
+ onus[onu.onu_id] = onu
+
+ return onus
+
+ @property
+ def onu_id(self):
+ """The ID used to identify the ONU"""
+ return self._packet['onu-id']
+
+ @property
+ def serial_number(self):
+ """The serial number is unique for each ONU"""
+ return self._packet.get('serial-number', '')
+
+ @property
+ def password(self):
+ """ONU Password"""
+ return self._packet.get('password', bytes(0))
+
+ @property
+ def enable(self):
+ """If true, places the ONU in service"""
+ return self._packet.get('enable', False)
+
+ # TODO: TCONT and GEM lists
diff --git a/voltha/adapters/adtran_olt/codec/olt_state.py b/voltha/adapters/adtran_olt/codec/olt_state.py
new file mode 100644
index 0000000..32b43aa
--- /dev/null
+++ b/voltha/adapters/adtran_olt/codec/olt_state.py
@@ -0,0 +1,228 @@
+#
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import pprint
+
+import os
+import structlog
+
+log = structlog.get_logger()
+
+
+class OltState(object):
+ """
+ Class to wrap decode of olt-state container from the ADTRAN
+ gpon-olt-hw.yang YANG model
+ """
+
+ def __init__(self, packet):
+ self._packet = packet
+ self._pons = None
+
+ def __str__(self):
+ return "OltState: {}".format(self.software_version)
+
+ @property
+ def software_version(self):
+ """The software version of olt driver"""
+ return self._packet.get('software-version', '')
+
+ @property
+ def pons(self):
+ if self._pons is None:
+ self._pons = OltState.Pon.decode(self._packet.get('pon', None))
+ return self._pons
+
+ #############################################################
+ # Act like a container for simple access into PON list
+
+ def __len__(self):
+ return len(self.pons)
+
+ def __getitem__(self, key):
+ if not isinstance(key, int):
+ raise TypeError('Key should be of type int')
+ if key not in self.pons:
+ raise KeyError("key '{}' not found".format(key))
+
+ return self.pons[key]
+
+ def __iter__(self):
+ raise NotImplementedError('TODO: Not yet implemented')
+
+ def __contains__(self, item):
+ if not isinstance(item, int):
+ raise TypeError('Item should be of type int')
+ return item in self.pons
+
+ # TODO: Look at generator support and if it is useful
+
+ class Pon(object):
+ """
+ Provides decode of PON list from within
+ """
+
+ def __init__(self, packet):
+ assert 'pon-id' in packet
+ self._packet = packet
+ self._onus = None
+
+ def __str__(self):
+ return "OltState.Pon: pon-id: {}".format(self.pon_id)
+
+ @staticmethod
+ def decode(pon_list):
+ log.info('Decoding PON List:{}{}'.format(os.linesep,
+ pprint.PrettyPrinter().pformat(pon_list)))
+ pons = {}
+ for pon_data in pon_list:
+ pon = OltState.Pon(pon_data)
+ assert pon.pon_id not in pons
+ pons[pon.pon_id] = pon
+
+ return pons
+
+ @property
+ def pon_id(self):
+ """PON identifier"""
+ return self._packet['pon-id']
+
+ @property
+ def downstream_wavelength(self):
+ """The wavelength, in nanometers, being used in the downstream direction"""
+ return self._packet.get('downstream-wavelength', 0)
+
+ @property
+ def upstream_wavelength(self):
+ """The wavelength, in nanometers, being used in the upstream direction"""
+ return self._packet.get('upstream-wavelength', 0)
+
+ @property
+ def downstream_channel_id(self):
+ """Downstream wavelength channel identifier associated with this PON."""
+ return self._packet.get('downstream-channel-id', 0)
+
+ @property
+ def rx_packets(self):
+ """Sum all of the RX Packets of GEM ports that are not base TCONT's"""
+ return self._packet.get('rx-packets', 0)
+
+ @property
+ def tx_packets(self):
+ """Sum all of the TX Packets of GEM ports that are not base TCONT's"""
+ return self._packet.get('tx-packets', 0)
+
+ @property
+ def rx_bytes(self):
+ """Sum all of the RX Octets of GEM ports that are not base TCONT's"""
+ return self._packet.get('rx-bytes', 0)
+
+ @property
+ def tx_bytes(self):
+ """Sum all of the TX Octets of GEM ports that are not base TCONT's"""
+ return self._packet.get('tx-bytes', 0)
+
+ @property
+ def tx_bip_errors(self):
+ """Sum the TX ONU bip errors to get TX BIP's per PON"""
+ return self._packet.get('tx-bip-errors', 0)
+
+ @property
+ def wm_tuned_out_onus(self):
+ """
+ bit array indicates the list of tuned out ONU's that are in wavelength
+ mobility protecting state.
+ onu-bit-octects:
+ type binary { length "4 .. 1024"; }
+ description each bit position indicates corresponding ONU's status
+ (true or false) whether that ONU's is in
+ wavelength mobility protecting state or not
+ For 128 ONTs per PON, the size of this
+ array will be 16. onu-bit-octects[0] and MSB bit in that byte
+ represents ONU 0 etc.
+ """
+ return self._packet.get('wm-tuned-out-onus', bytes(0))
+
+ @property
+ def ont_los(self):
+ """List of configured ONTs that have been previously discovered and are in a los of signal state"""
+ return self._packet.get('ont-los', [])
+
+ @property
+ def discovered_onu(self):
+ """
+ Immutable Set of each Optical Network Unit(ONU) that has been activated via discovery
+ key/value: serial-number (string)
+ """
+ return frozenset([sn['serial-number'] for sn in self._packet.get('discovered-onu', [])
+ if 'serial-number' in sn])
+
+ @property
+ def gems(self):
+ """This list is not in the proposed BBF model, the stats are part of ietf-interfaces"""
+ raise NotImplementedError('TODO: not yet supported')
+
+ @property
+ def onus(self):
+ """
+ The map of each Optical Network Unit(ONU). Key: ONU ID (int)
+ """
+ if self._onus is None:
+ self._onus = OltState.Pon.Onu.decode(self._packet.get('onu', []))
+ return self._onus
+
+ class Onu(object):
+ """
+ Provides decode of onu list for a PON port
+ """
+
+ def __init__(self, packet):
+ assert 'onu-id' in packet
+ self._packet = packet
+
+ def __str__(self):
+ return "OltState.Pon.Onu: onu-id: {}".format(self.onu_id)
+
+ @staticmethod
+ def decode(onu_list):
+ log.debug('onus:{}{}'.format(os.linesep,
+ pprint.PrettyPrinter().pformat(onu_list)))
+ onus = {}
+ for onu_data in onu_list:
+ onu = OltState.Pon.Onu(onu_data)
+ assert onu.onu_id not in onus
+ onus[onu.onu_id] = onu
+
+ return onus
+
+ @property
+ def onu_id(self):
+ """The ID used to identify the ONU"""
+ return self._packet['onu-id']
+
+ @property
+ def oper_status(self):
+ """The operational state of each ONU"""
+ return self._packet.get('oper-status', 'unknown')
+
+ @property
+ def reported_password(self):
+ """The password reported by the ONU (binary)"""
+ return self._packet.get('reported-password', bytes(0))
+
+ @property
+ def rssi(self):
+ """The received signal strength indication of the ONU"""
+ return self._packet.get('rssi', -9999)
diff --git a/voltha/adapters/adtran_olt/net/__init__.py b/voltha/adapters/adtran_olt/net/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/voltha/adapters/adtran_olt/net/__init__.py
diff --git a/voltha/adapters/adtran_olt/net/adtran_rest.py b/voltha/adapters/adtran_olt/net/adtran_rest.py
new file mode 100644
index 0000000..456b3c5
--- /dev/null
+++ b/voltha/adapters/adtran_olt/net/adtran_rest.py
@@ -0,0 +1,162 @@
+#
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+
+import structlog
+import treq
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.error import ConnectionClosed
+
+log = structlog.get_logger()
+
+
+class AdtranRestClient(object):
+ """
+ Performs Adtran RESTCONF requests
+ """
+ # HTTP shortcuts
+ HELLO_URI = '/restconf/adtran-hello:hello'
+
+ REST_GET_REQUEST_HEADER = {'User-Agent': 'Adtran RESTConf',
+ 'Accept': ['application/json']}
+
+ REST_POST_REQUEST_HEADER = {'User-Agent': 'Adtran RESTConf',
+ 'Content-Type': 'application/json',
+ 'Accept': ['application/json']}
+
+ REST_PATCH_REQUEST_HEADER = REST_POST_REQUEST_HEADER
+ REST_PUT_REQUEST_HEADER = REST_POST_REQUEST_HEADER
+ REST_DELETE_REQUEST_HEADER = REST_GET_REQUEST_HEADER
+
+ HTTP_OK = 200
+ HTTP_CREATED = 201
+ HTTP_ACCEPTED = 202
+ HTTP_NON_AUTHORITATIVE_INFORMATION = 203
+ HTTP_NO_CONTENT = 204
+ HTTP_RESET_CONTENT = 205
+ HTTP_PARTIAL_CONTENT = 206
+
+ _valid_methods = {'GET', 'POST', 'PATCH', 'DELETE'}
+ _valid_results = {'GET': [HTTP_OK, HTTP_NO_CONTENT],
+ 'POST': [HTTP_OK, HTTP_CREATED, HTTP_NO_CONTENT],
+ 'PUT': [HTTP_OK, HTTP_CREATED, HTTP_NO_CONTENT],
+ 'PATCH': [HTTP_OK],
+ 'DELETE': [HTTP_OK, HTTP_ACCEPTED, HTTP_NO_CONTENT]
+ }
+
+ for _method in _valid_methods:
+ assert _method in _valid_results # Make sure we have a results entry for each supported method
+
+ def __init__(self, host_ip, port, username='', password='', timeout=20):
+ """
+ REST Client initialization
+
+ :param host_ip: (string) IP Address of Adtran Device
+ :param port: (int) Port number
+ :param username: (string) Username for credentials
+ :param password: (string) Password for credentials
+ :param timeout: (int) Number of seconds to wait for a response before timing out
+ """
+ self.ip = host_ip
+ self.rest_port = port
+ self.username = username
+ self.password = password
+ self.timeout = timeout
+
+ @inlineCallbacks
+ def request(self, method, uri, data=None, name=''):
+ """
+ Send a REST request to the Adtran device
+
+ :param method: (string) HTTP method
+ :param uri: (string) fully URL to perform method on
+ :param data: (string) optional data for the request body
+ :param name: (string) optional name of the request, useful for logging purposes
+ :return: (deferred)
+ """
+
+ if method.upper() not in self._valid_methods:
+ raise NotImplementedError("REST method '{}' is not supported".format(method))
+
+ url = 'http://{}:{}{}{}'.format(self.ip, self.rest_port,
+ '/' if uri[0] != '/' else '',
+ uri)
+ try:
+ if method.upper() == 'GET':
+ response = yield treq.get(url,
+ auth=(self.username, self.password),
+ timeout=self.timeout,
+ headers=self.REST_GET_REQUEST_HEADER)
+ elif method.upper() == 'POST' or method.upper() == 'PUT':
+ response = yield treq.post(url,
+ data=data,
+ auth=(self.username, self.password),
+ timeout=self.timeout,
+ headers=self.REST_POST_REQUEST_HEADER)
+ elif method.upper() == 'PATCH':
+ response = yield treq.patch(url,
+ data=data,
+ auth=(self.username, self.password),
+ timeout=self.timeout,
+ headers=self.REST_PATCH_REQUEST_HEADER)
+ elif method.upper() == 'DELETE':
+ response = yield treq.delete(url,
+ auth=(self.username, self.password),
+ timeout=self.timeout,
+ headers=self.REST_DELETE_REQUEST_HEADER)
+ else:
+ raise NotImplementedError("REST method '{}' is not supported".format(method))
+
+ except NotImplementedError:
+ raise
+
+ except ConnectionClosed:
+ returnValue(None)
+
+ except Exception, e:
+ log.exception("REST {} '{}' request to '{}' failed: {}".format(method, name, url, str(e)))
+ raise
+
+ if response.code not in self._valid_results[method.upper()]:
+ message = "REST {} '{}' request to '{}' failed with status code {}".format(method, name,
+ url, response.code)
+ log.error(message)
+ raise Exception(message)
+
+ if response.code == self.HTTP_NO_CONTENT:
+ returnValue(None)
+
+ else:
+ # TODO: May want to support multiple body encodings in the future
+
+ headers = response.headers
+ type_key = 'content-type'
+ type_val = 'application/json'
+
+ if not headers.hasHeader(type_key) or type_val not in headers.getRawHeaders(type_key, []):
+ raise Exception("REST {} '{}' request response from '{} was not JSON",
+ method, name, url)
+
+ content = yield response.content()
+ try:
+ result = json.loads(content)
+
+ except Exception, e:
+ log.exception("REST {} '{}' JSON decode of '{}' failure: {}".format(method, name,
+ url, str(e)))
+ raise
+
+ returnValue(result)
diff --git a/voltha/adapters/adtran_olt/net/adtran_zmq.py b/voltha/adapters/adtran_olt/net/adtran_zmq.py
new file mode 100644
index 0000000..bdfaa45
--- /dev/null
+++ b/voltha/adapters/adtran_olt/net/adtran_zmq.py
@@ -0,0 +1,139 @@
+#
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import binascii
+import struct
+
+import structlog
+from txzmq import ZmqEndpoint, ZmqFactory
+from txzmq.connection import ZmqConnection
+from zmq import constants
+
+log = structlog.get_logger()
+zmq_factory = ZmqFactory()
+
+# An OMCI message minimally has a 32-bit PON index and 32-bit ONU ID.
+
+_OLT_TASK_ZEROMQ_OMCI_TCP_PORT = 25656
+
+
+class AdtranZmqClient(object):
+ """
+ Adtran ZeroMQ Client for PON Agent packet in/out service
+
+ PON Agent expects and external PAIR socket with
+ """
+
+ def __init__(self, ip_address, rx_callback=None,
+ port=_OLT_TASK_ZEROMQ_OMCI_TCP_PORT):
+ self.external_conn = 'tcp://{}:{}'.format(ip_address, port)
+
+ self.zmq_endpoint = ZmqEndpoint('connect', self.external_conn)
+ self.socket = ZmqPairConnection(zmq_factory,
+ self.zmq_endpoint)
+
+ self.socket.onReceive = rx_callback or AdtranZmqClient.rx_nop
+
+ def send(self, data):
+ try:
+ self.socket.send(data)
+
+ except Exception as e:
+ log.exception(e.message)
+
+ @staticmethod
+ def rx_nop(message):
+ log.debug('Discarding ZMQ message, no receiver specified')
+
+ @staticmethod
+ def encode_omci_message(msg, pon_index, onu_id):
+ """
+ Create an OMCI Tx Packet for the specified ONU
+
+ :param msg: (str) OMCI message to send
+ :param pon_index: (unsigned int) PON Port index
+ :param onu_id: (unsigned int) ONU ID
+
+ :return: (bytes) octet string to send
+ """
+ assert msg
+ # log.debug("Encoding OMCI: PON: {}, ONU: {}, Message: '{}'".
+ # format(pon_index, onu_id, msg))
+ s = struct.Struct('!II')
+
+ return s.pack(pon_index, onu_id) + binascii.unhexlify(msg)
+
+ @staticmethod
+ def decode_packet(packet):
+ """
+ Decode the packet provided by the ZMQ client
+
+ :param packet: (bytes) Packet
+ :return: (long, long, bytes, boolean) PON Index, ONU ID, Frame Contents (OMCI or Ethernet),\
+ and a flag indicating if it is OMCI
+ """
+ # TODO: For now, only OMCI supported
+ if isinstance(packet, list):
+ if len(packet) > 1:
+ pass # TODO: Can we get multiple packets?
+
+ return AdtranZmqClient._decode_omci_message(packet[0])
+ return -1, -1, None, False
+
+ @staticmethod
+ def _decode_omci_message(packet):
+ """
+ Decode the packet provided by the ZMQ client
+
+ :param packet: (bytes) Packet
+ :return: (long, long, bytes) PON Index, ONU ID, OMCI Frame Contents
+ """
+ (pon_index, onu_id) = struct.unpack_from('!II', packet)
+ omci_msg = packet[8:]
+
+ return pon_index, onu_id, omci_msg, True
+
+ @staticmethod
+ def _decode_packet_in_message(packet):
+ # TODO: This is not yet supported
+ (pon_index, onu_id) = struct.unpack_from('!II', packet)
+ msg = binascii.hexlify(packet[8:])
+
+ return pon_index, onu_id, msg, False
+
+
+class ZmqPairConnection(ZmqConnection):
+ """
+ Bidirectional messages to/from the socket.
+
+ Wrapper around ZeroMQ PUSH socket.
+ """
+ socketType = constants.PAIR
+
+ def messageReceived(self, message):
+ """
+ Called on incoming message from ZeroMQ.
+
+ :param message: message data
+ """
+ self.onReceive(message)
+
+ def onReceive(self, message):
+ """
+ Called on incoming message received from other end of the pair.
+
+ :param message: message data
+ """
+ raise NotImplementedError(self)
diff --git a/voltha/adapters/adtran_olt/nni_port.py b/voltha/adapters/adtran_olt/nni_port.py
new file mode 100644
index 0000000..55160f3
--- /dev/null
+++ b/voltha/adapters/adtran_olt/nni_port.py
@@ -0,0 +1,140 @@
+#
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import random
+
+import structlog
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+from voltha.core.logical_device_agent import mac_str_to_tuple
+from voltha.protos.common_pb2 import OperStatus, AdminState
+from voltha.protos.device_pb2 import Port
+from voltha.protos.logical_device_pb2 import LogicalPort
+from voltha.protos.openflow_13_pb2 import OFPPF_100GB_FD, OFPPF_FIBER, OFPPS_LIVE, ofp_port
+
+log = structlog.get_logger()
+
+
+class NniPort(object):
+ """
+ A class similar to the 'Port' class in the VOLTHA
+
+ TODO: Merge this with the Port class or cleanup where possible
+ so we do not duplicate fields/properties/methods
+ """
+
+ def __init__(self, parent, **kwargs):
+ # TODO: Weed out those properties supported by common 'Port' object
+ assert parent
+ assert 'port_no' in kwargs
+
+ self.port = None
+ self.logical_port = None
+ self.parent = parent
+ self.port_no = kwargs.get('port_no')
+
+ self.startup = None
+ log.info('Creating NNI Port {}'.format(self.port_no))
+
+ # And optional parameters
+
+ self.admin_state = kwargs.pop('admin_state', AdminState.UNKNOWN)
+ self.oper_status = kwargs.pop('oper_status', OperStatus.UNKNOWN)
+ self.label = kwargs.pop('label', 'NNI port {}'.format(self.port_no))
+ self.name = kwargs.pop('name', 'nni-{}'.format(self.port_no))
+ self.mac_address = kwargs.pop('mac_address',
+ '08:00:{}{}:{}{}:{}{}:00'.format(random.randint(0, 9),
+ random.randint(0, 9),
+ random.randint(0, 9),
+ random.randint(0, 9),
+ random.randint(0, 9),
+ random.randint(0, 9)))
+
+ # TODO: May need to refine capabilities into current, advertised, and peer
+
+ self.ofp_capabilities = kwargs.pop('ofp_capabilities', OFPPF_100GB_FD | OFPPF_FIBER)
+ self.ofp_state = kwargs.pop('ofp_state', OFPPS_LIVE)
+ self.current_speed = kwargs.pop('current_speed', OFPPF_100GB_FD)
+ self.max_speed = kwargs.pop('max_speed', OFPPF_100GB_FD)
+ self.device_port_no = kwargs.pop('device_port_no', self.port_no)
+
+ def __str__(self):
+ return "NniPort-{}: Admin: {}, Oper: {}, parent: {}".format(self.port_no,
+ self.admin_state,
+ self.oper_status,
+ self.parent)
+
+ def get_port(self):
+ """
+ Get the VOLTHA PORT object for this port
+ :return: VOLTHA Port object
+ """
+ if self.port is None:
+ self.port = Port(port_no=self.port_no,
+ label=self.label,
+ type=Port.ETHERNET_NNI,
+ admin_state=self.admin_state,
+ oper_status=self.oper_status)
+ return self.port
+
+ def get_logical_port(self):
+ """
+ Get the VOLTHA logical port for this port
+ :return: VOLTHA logical port or None if not supported
+ """
+ if self.logical_port is None:
+ openflow_port = ofp_port(port_no=self.port_no,
+ hw_addr=mac_str_to_tuple(self.mac_address),
+ name=self.name,
+ config=0,
+ state=self.ofp_state,
+ curr=self.ofp_capabilities,
+ advertised=self.ofp_capabilities,
+ peer=self.ofp_capabilities,
+ curr_speed=self.current_speed,
+ max_speed=self.max_speed)
+
+ self.logical_port = LogicalPort(id='nni{}'.format(self.port_no),
+ ofp_port=openflow_port,
+ device_id=self.parent.device_id,
+ device_port_no=self.device_port_no,
+ root_port=True)
+ return self.logical_port
+
+ @inlineCallbacks
+ def start(self):
+ """
+ Start/enable this NNI
+
+ :return: (deferred)
+ """
+ log.info('Starting NNI port {}'.format(self.port_no))
+
+ # TODO: Start up any watchdog/polling tasks here
+
+ yield returnValue('NNI Port start is a NOP at this time')
+
+ def stop(self):
+ log.info('Stopping NNI port {}'.format(self.port_no))
+ d, self.startup = self.startup, None
+ if d is not None:
+ d.cancel()
+
+ self.admin_state = AdminState.DISABLED
+ self.oper_status = OperStatus.UNKNOWN
+
+ yield returnValue('NNI Port stop may need more work')
+ # TODO: How do we reflect this into VOLTHA
diff --git a/voltha/adapters/adtran_olt/onu.py b/voltha/adapters/adtran_olt/onu.py
new file mode 100644
index 0000000..16ebf18
--- /dev/null
+++ b/voltha/adapters/adtran_olt/onu.py
@@ -0,0 +1,87 @@
+#
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import base64
+import json
+
+import structlog
+
+from adtran_olt_handler import AdtranOltHandler
+
+log = structlog.get_logger()
+
+_VSSN_TO_VENDOR = {
+ 'adtn': 'adtran_onu',
+ 'bcm?': 'broadcom_onu', # TODO: Get actual VSSN for this vendor
+ 'dp??': 'dpoe_onu', # TODO: Get actual VSSN for this vendor
+ 'pmc?': 'pmcs_onu', # TODO: Get actual VSSN for this vendor
+ 'psm?': 'ponsim_onu', # TODO: Get actual VSSN for this vendor
+ 'sim?': 'simulated_onu', # TODO: Get actual VSSN for this vendor
+ 'tbt?': 'tibit_onu', # TODO: Get actual VSSN for this vendor
+}
+
+
+class Onu(object):
+ """
+ Wraps an ONU
+ """
+ MIN_ONU_ID = 0
+ MAX_ONU_ID = 1022
+ BROADCAST_ONU_ID = 1023
+ DEFAULT_PASSWORD = ''
+
+ def __init__(self, serial_number, parent, password=DEFAULT_PASSWORD):
+ self.onu_id = parent.get_next_onu_id()
+ self.serial_number = serial_number
+ self.password = password
+ self.parent = parent
+
+ try:
+ sn_ascii = base64.decodestring(serial_number).lower()[:4]
+ except Exception:
+ sn_ascii = 'Invalid_VSSN'
+
+ self.vendor_device = _VSSN_TO_VENDOR.get(sn_ascii,
+ 'Unsupported_{}'.format(sn_ascii))
+
+ def __del__(self):
+ # self.stop()
+ pass
+
+ def __str__(self):
+ return "Onu-{}-{}/{} parent: {}".format(self.onu_id, self.serial_number,
+ base64.decodestring(self.serial_number),
+ self.parent)
+
+ def create(self, enabled):
+ """
+ POST -> /restconf/data/gpon-olt-hw:olt/pon=<pon-id>/onus/onu ->
+ """
+ pon_id = self.parent.pon_id
+ data = json.dumps({'onu-id': self.onu_id,
+ 'serial-number': self.serial_number,
+ 'enable': enabled})
+ uri = AdtranOltHandler.GPON_PON_ONU_CONFIG_URI.format(pon_id)
+ name = 'onu-create-{}-{}-{}: {}'.format(pon_id, self.onu_id, self.serial_number, enabled)
+ return self.parent.parent.rest_client.request('POST', uri, data=data, name=name)
+
+ def set_config(self, leaf, value):
+ pon_id = self.parent.pon_id
+ data = json.dumps({'onu-id': self.onu_id,
+ leaf: value})
+ uri = AdtranOltHandler.GPON_PON_ONU_CONFIG_URI.format(pon_id)
+ name = 'pon-set-config-{}-{}-{}'.format(self.pon_id, leaf, str(value))
+ name = 'onu-set-config-{}-{}-{}: {}'.format(pon_id, self.onu_id, leaf, value)
+ return self.parent.parent.rest_client.request('PATCH', uri, data=data, name=name)
diff --git a/voltha/adapters/adtran_olt/pon_port.py b/voltha/adapters/adtran_olt/pon_port.py
new file mode 100644
index 0000000..1eacbaf
--- /dev/null
+++ b/voltha/adapters/adtran_olt/pon_port.py
@@ -0,0 +1,397 @@
+#
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import json
+import pprint
+import random
+
+import os
+import structlog
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+from adtran_olt_handler import AdtranOltHandler
+from codec.olt_config import OltConfig
+from onu import Onu
+from voltha.protos.common_pb2 import OperStatus, AdminState
+from voltha.protos.device_pb2 import Device
+from voltha.protos.device_pb2 import Port
+
+log = structlog.get_logger()
+
+
+class PonPort(object):
+ """
+ A class similar to the 'Port' class in the VOLTHA
+
+ TODO: Merge this with the Port class or cleanup where possible
+ so we do not duplicate fields/properties/methods
+ """
+ MAX_ONUS_SUPPORTED = 256
+ DEFAULT_ENABLED = False
+
+ def __init__(self, pon_index, port_no, parent, admin_state=AdminState.UNKNOWN, label=None):
+ # TODO: Weed out those properties supported by common 'Port' object
+ assert admin_state != AdminState.UNKNOWN
+ self.parent = parent
+ self._pon_index = pon_index
+ self._port_no = port_no
+ self.label = label or 'PON-{}'.format(pon_index)
+ self.admin_state = admin_state
+ self.oper_status = OperStatus.ACTIVE # TODO: Need to discover
+ self.startup = None
+ self.onu_discovery = None
+ self.port = None
+ self.no_onu_discover_tick = 5.0 # TODO: Decrease to 1 or 2 later
+ self.discovery_tick = 20.0
+ self.discovered_onus = [] # List of serial numbers
+ self.onus = {} # serial_number -> ONU (allowed list)
+ self.next_onu_id = Onu.MIN_ONU_ID
+
+ def __del__(self):
+ # self.stop()
+ pass
+
+ def __str__(self):
+ return "PonPort-{}: Admin: {}, Oper: {}, parent: {}".format(self.label,
+ self.admin_state,
+ self.oper_status,
+ self.parent)
+
+ def get_port(self):
+ """
+ Get the VOLTHA PORT object for this port
+ :return: VOLTHA Port object
+ """
+ if self.port is None:
+ self.port = Port(port_no=self.port_number,
+ label=self.label,
+ type=Port.PON_OLT,
+ admin_state=self.admin_state,
+ oper_status=self.oper_status)
+ return self.port
+
+ @property
+ def port_number(self):
+ return self._port_no
+
+ @property
+ def pon_id(self):
+ return self._pon_index
+
+ def get_logical_port(self):
+ """
+ Get the VOLTHA logical port for this port
+ :return: VOLTHA logical port or None if not supported
+ """
+ return None
+
+ @inlineCallbacks
+ def start(self):
+ """
+ Start/enable this PON and start ONU discover
+ :return: (deferred)
+ """
+ log.info('Starting {}'.format(self.label))
+
+ """
+ Here is where I will start to bring up a PON port and discover an ONT
+
+ Note: For some reason, you cannot chain the FEC enables with the pon enable below?
+ """
+ try:
+ self.startup = self.set_pon_config("enabled", True)
+ yield self.startup
+
+ except Exception, e:
+ log.exception("enabled failed: {}".format(str(e)))
+ raise
+
+ try:
+ self.startup = self.set_pon_config("downstream-fec-enable", True)
+ yield self.startup
+
+ except Exception, e:
+ log.exception("downstream FEC enable failed: {}".format(str(e)))
+ raise
+
+ try:
+ self.startup = self.set_pon_config("upstream-fec-enable", True)
+ results = yield self.startup
+
+ except Exception, e:
+ log.exception("upstream FEC enable failed: {}".format(str(e)))
+ raise
+
+ log.debug('ONU Startup complete: results: {}'.
+ format(pprint.PrettyPrinter().pformat(results)))
+
+ if isinstance(results, dict) and results.get('enabled', False):
+ self.admin_state = AdminState.ENABLED
+ self.oper_status = OperStatus.ACTIVE # TODO: is this correct, how do we tell GRPC
+
+ # Begin to ONU discovery. Once a second if no ONUs found and once every 20
+ # seconds after one or more ONUs found on the PON
+ self.onu_discovery = reactor.callLater(3, self.discover_onus)
+ returnValue(self.onu_discovery)
+
+ else:
+ # Startup failed. Could be due to object creation with an invalid initial admin_status
+ # state. May want to schedule a start to occur again if this happens
+ self.admin_state = AdminState.DISABLED
+ self.oper_status = OperStatus.UNKNOWN
+ raise NotImplementedError('TODO: Support of PON startup failure not yet supported')
+
+ @inlineCallbacks
+ def stop(self):
+ log.info('Stopping {}'.format(self.label))
+ d, self.startup = self.startup, None
+ if d is not None:
+ d.cancel()
+
+ d, self.onu_discovery = self.onu_discovery, None
+ if d is not None:
+ d.cancel()
+
+ self.reset(False)
+ self.admin_state = AdminState.DISABLED
+ self.oper_status = OperStatus.UNKNOWN
+ # TODO: How do we reflect this into VOLTHA?
+
+ @inlineCallbacks
+ def reset(self):
+ log.info('Reset {}'.format(self.label))
+
+ if self.admin_state != self.parent.initial_port_state:
+ try:
+ enable = self.parent.initial_port_state == AdminState.ENABLED
+ yield self.set_pon_config("enabled", enable)
+
+ # TODO: Move to 'set_pon_config' method and also make sure GRPC/Port is ok
+ self.admin_state = AdminState.ENABLED if enable else AdminState.DISABLE
+
+ except Exception as e:
+ log.exception('Reset of PON {} to initial state failed'.
+ format(self.pon_id), e=e)
+ raise
+
+ if self.admin_state == AdminState.ENABLED and self.parent.initial_onu_state == AdminState.DISABLED:
+ try:
+ # Walk the provisioned ONU list and disable any exiting ONUs
+ results = yield self.get_onu_config()
+
+ if isinstance(results, list) and len(results) > 0:
+ onu_configs = OltConfig.Pon.Onu.decode(results)
+ for onu_id in onu_configs.iterkeys():
+ try:
+ yield self.delete_onu(onu_id)
+
+ except Exception as e:
+ log.exception('Delete of ONU {} on PON {} failed'.
+ format(onu_id, self.pon_id), e=e)
+ pass # Non-fatal
+
+ except Exception as e:
+ log.exception('Failed to get current ONU config for PON {}'.
+ format(self.pon_id), e=e)
+ raise
+
+ def get_pon_config(self):
+ uri = AdtranOltHandler.GPON_PON_CONFIG_URI.format(self.pon_id)
+ name = 'pon-get-config-{}'.format(self.pon_id)
+ return self.parent.rest_client.request('GET', uri, name=name)
+
+ def get_onu_config(self, onu_id=None):
+ uri = AdtranOltHandler.GPON_PON_ONU_CONFIG_URI.format(self.pon_id)
+ if onu_id is not None:
+ uri += '={}'.format(onu_id)
+ name = 'pon-get-onu_config-{}-{}'.format(self.pon_id, onu_id)
+ return self.parent.rest_client.request('GET', uri, name=name)
+
+ def set_pon_config(self, leaf, value):
+ data = json.dumps({leaf: value})
+ uri = AdtranOltHandler.GPON_PON_CONFIG_URI.format(self.pon_id)
+ name = 'pon-set-config-{}-{}-{}'.format(self.pon_id, leaf, str(value))
+ return self.parent.rest_client.request('PATCH', uri, data=data, name=name)
+
+ def discover_onus(self):
+ log.debug("Initiating discover of ONU/ONTs on PON {}".format(self.pon_id))
+
+ if self.admin_state == AdminState.ENABLED:
+ data = json.dumps({'pon-id': self.pon_id})
+ uri = AdtranOltHandler.GPON_PON_DISCOVER_ONU
+ name = 'pon-discover-onu-{}'.format(self.pon_id)
+ self.startup = self.parent.rest_client.request('POST', uri, data, name=name)
+
+ self.startup.addBoth(self.onu_discovery_init_complete)
+
+ def onu_discovery_init_complete(self, _):
+ """
+ This method is called after the REST POST to request ONU discovery is
+ completed. The results (body) of the post is always empty / 204 NO CONTENT
+ """
+ log.debug('PON {} ONU Discovery requested'.format(self.pon_id))
+
+ # Reschedule
+
+ delay = self.no_onu_discover_tick if len(self.onus) == 0 else self.discovery_tick
+ delay += random.uniform(-delay / 10, delay / 10)
+
+ self.onu_discovery = reactor.callLater(delay, self.discover_onus)
+
+ def process_status_poll(self, status):
+ """
+ Process PON status poll request
+
+ :param status: (OltState.Pon object) results from RESTCONF GET
+ """
+ log.debug('process_status_poll: PON {}: {}{}'.format(self.pon_id,
+ os.linesep,
+ status))
+ if self.admin_state != AdminState.ENABLED:
+ return
+
+ # Process the ONU list in for this PON, may have previously provisioned ones there
+ # were discovered on an earlier boot
+
+ new = self._process_status_onu_list(status.onus)
+
+ for onu_id in new:
+ # self.add_new_onu(serial_number, status)
+ log.info('Found ONU {} in status list'.format(onu_id))
+ raise NotImplementedError('TODO: Adding ONUs from existing ONU (status list) not supported')
+
+ # Get new/missing from the discovered ONU leaf
+
+ new, missing = self._process_status_onu_discovered_list(status.discovered_onu)
+
+ # TODO: Do something useful
+ if len(missing):
+ log.info('Missing ONUs are: {}'.format(missing))
+
+ for serial_number in new:
+ reactor.callLater(0, self.add_onu, serial_number, status)
+
+ # Process discovered ONU list
+
+ # TODO: Process LOS list
+ # TODO: Process status
+ pass
+
+ def _process_status_onu_list(self, onus):
+ """
+ Look for new or missing ONUs
+
+ :param onus: (dict) Set of known ONUs
+ """
+ log.debug('Processing ONU list: {}'.format(onus))
+
+ my_onu_ids = frozenset([o.onu_id for o in self.onus.itervalues()])
+ discovered_onus = frozenset(onus.keys())
+
+ new_onus_ids = discovered_onus - my_onu_ids
+ missing_onus_ids = my_onu_ids - discovered_onus
+
+ new = {o: v for o, v in onus.iteritems() if o in new_onus_ids}
+ missing_onus = {o: v for o, v in onus.iteritems() if o in missing_onus_ids}
+
+ return new # , missing_onus # TODO: Support ONU removal
+
+ def _process_status_onu_discovered_list(self, discovered_onus):
+ """
+ Look for new or missing ONUs
+
+ :param discovered_onus: (frozenset) Set of ONUs currently discovered
+ """
+ log.debug('Processing discovered ONU list: {}'.format(discovered_onus))
+
+ my_onus = frozenset(self.onus.keys())
+
+ new_onus = discovered_onus - my_onus
+ missing_onus = my_onus - discovered_onus
+
+ return new_onus, missing_onus
+
+ @inlineCallbacks
+ def add_onu(self, serial_number, status):
+ log.info('Add ONU: {}'.format(serial_number))
+
+ if serial_number not in status.onus:
+ # Newly found and not enabled ONU, enable it now if not at max
+
+ if len(self.onus) < self.MAX_ONUS_SUPPORTED:
+ # TODO: For now, always allow any ONU
+
+ if serial_number not in self.onus:
+ onu = Onu(serial_number, self)
+
+ try:
+ yield onu.create(True)
+
+ self.on_new_onu_discovered(onu)
+ self.onus[serial_number] = onu
+
+ except Exception as e:
+ log.exception('Exception during add_onu, pon: {}, onu: {}'.
+ format(self.pon_id, onu.onu_id), e=e)
+ else:
+ log.info('TODO: Code this')
+
+ else:
+ log.warning('Maximum number of ONUs already provisioned on PON {}'.
+ format(self.pon_id))
+ else:
+ # ONU has been enabled
+ pass
+
+ def on_new_onu_discovered(self, onu):
+ """
+ Called when a new ONU is discovered and VOLTHA device adapter needs to be informed
+ :param onu:
+ :return:
+ """
+ olt = self.parent
+ adapter = olt.adapter_agent
+
+ proxy = Device.ProxyAddress(device_id=olt.device_id,
+ channel_id=self.port_number,
+ onu_id=onu.onu_id)
+
+ adapter.child_device_detected(parent_device_id=olt.device_id,
+ parent_port_no=self.port_number,
+ child_device_type=onu.vendor_device,
+ proxy_address=proxy)
+
+ def get_next_onu_id(self):
+ used_ids = [onu.onu_id for onu in self.onus]
+
+ while True:
+ onu_id = self.next_onu_id
+ self.next_onu_id += 1
+
+ if self.next_onu_id > Onu.MAX_ONU_ID:
+ self.next_onu_id = Onu.MIN_ONU_ID
+
+ if onu_id not in used_ids:
+ return onu_id
+
+ def delete_onu(self, onu_id):
+ uri = AdtranOltHandler.GPON_PON_ONU_CONFIG_URI.format(self.pon_id)
+ uri += '={}'.format(onu_id)
+ name = 'pon-delete-onu-{}-{}'.format(self.pon_id, onu_id)
+
+ # TODO: Need removal from VOLTHA child_device method
+
+ return self.parent.rest_client.request('DELETE', uri, name=name)