VOL-1397: Adtran-OLT - Initial containerization commit
- Need to move VERSION to base directory
Change-Id: I9d62d0607a011ce642e379fd92b35ec48b300070
diff --git a/adapters/adtran_olt/README.md b/adapters/adtran_olt/README.md
new file mode 100644
index 0000000..737b73b
--- /dev/null
+++ b/adapters/adtran_olt/README.md
@@ -0,0 +1,176 @@
+# Adtran OLT Device Adapter
+To preprovision an Adtran OLT, you will need to provide the IP Address and
+the NETCONF/REST credentials for the device. The NETCONF/REST credentials are an
+extension of the existing **preprovision_olt** command and these are placed after
+entering two dashes '_--_'. The full set of options is:
+
+| Short | Long | Default | Notes |
+| :---: | :----------------: | :--------: | ----- |
+| -u | --nc_username | '' | NETCONF Username |
+| -p | --nc_password | '' | NETCONF Password |
+| -t | --nc_port | 830 | NETCONF TCP Port |
+| -U | --rc_username | '' | REST Username |
+| -P | --rc_password | '' | REST Password |
+| -T | --rc_port | 8081 | REST TCP Port |
+| -z | --zmq_port | 5656 | ZeroMQ OMCI Proxy Port |
+| -M | --multicast_vlan | 4000 | Multicast VLANs (comma-delimited) |
+| -Z | --pio_port | 5657 | PIO Service ZeroMQ Port |
+| -o | --resource_mgr_key | adtran_olt | OLT Type to look up associated resource manager configuration |
+
+For example, if your Adtran OLT is at address 10.17.174.193 with the default TCP ports and
+NETCONF credentials of admin/admin and REST credentials of ADMIN/ADMIN, the command line
+would be:
+
+```bash
+ preprovision_olt -t adtran_olt -i 10.17.174.193 -- -u admin -p admin -U ADMIN -P ADMIN
+```
+or
+```bash
+ preprovision_olt -t adtran_olt -i 10.17.174.193 -- --nc_username admin --nc_password admin --rc_username ADMIN --rc_password ADMIN
+```
+
+In addition to specifying the Adtran OLT by a single IP address, the host & port provisioning option
+is also supported. This allows you to configure the address of the Adtran OLT with the same command line
+option as the OpenOLT device adapter. For the port number, just specify the NETCONF port (default 830)
+as in:
+
+```bash
+ preprovision_olt -t adtran_olt -H 10.17.174.193:830
+```
+or
+```bash
+ preprovision_olt -t adtran_olt --host_and_port 10.17.174.193:830
+```
+
+## Resource Manager Provisioning Support
+Starting in Fall of 2018, Resource Manager Support was added as the default provisioning mechanism
+for the Adtran OLT as the xPON provisioning support will be deprecated by the v2.0 release in
+late-2018/early-2019.
+
+The Resource Manager is used to manage the device's PON resource pools and allocate PON resources from
+those pools. The Resource Manager module currently manages assignment of ONU-ID, ALLOC-ID and
+GEM-PORT ID. The Resource Manager uses the KV store to back up all the resource pool allocation data.
+
+The Adtran OLT adapter interacts with the Resource Manager module for PON resource assignments. The
+adtran_olt_resource_manager module is responsible for interfacing with the Resource Manager.
+
+The Resource Manager optionally uses olt_vendor_type specific resource ranges to initialize the
+PON resource pools. In order to utilize this option, create an entry for olt_vendor_type specific
+PON resource ranges on the KV store. Please make sure to use the same KV store used by the VOLTHA core.
+
+### For example
+To specify **ADTRAN OLT** device specific resource ranges, first create a JSON file
+_adtran_olt_resource_range.json_ with the following entry:
+
+{
+ "onu_start_idx": 0,
+ "onu_end_idx": 127,
+ "alloc_id_start_idx": 1024,
+ "alloc_id_end_idx": 4222,
+ "gem_port_id_start_idx": 2176,
+ "gem_port_id_end_idx": 16383,
+ "num_of_pon_port": 16
+}
+This data should be put on the KV store location _resource_manager/xgspon/resource_ranges/adtran_olt_
+
+The format of the KV store location is resource_manager/<technology>/resource_ranges/<resource_mgr_key>
+
+In the example below, the KV store is assumed to be Consul. However, the same applies to
+etcd or any other KV store. Please make sure to use the same KV store used by the VOLTHA core.
+
+```bash
+curl -X PUT -H "Content-Type: application/json" \
+ http://127.0.0.1:8500/v1/kv/resource_manager/xgspon/resource_ranges/adtran_olt \
+ -d @./adtran_olt_resource_range.json
+```
+The olt_vendor_type should be referred to during the preprovisioning step as shown below. The
+olt_vendor_type is an extra option and should be specified after --. The -o specifies the resource_mgr_key.
+
+    (voltha) preprovision_olt -t adtran_olt -H 192.168.1.100:830 -- -o adtran_olt
+Once the OLT device is enabled, any further PON Resource assignments will happen within the PON Resource ranges defined in adtran_olt_resource_range.json and placed on the KV store.
+
+### Additional Notes
+If a default resource range profile should be used with all olt_vendor_types, then place such Resource Range profile at the below path on the KV store.
+
+resource_manager/xgspon/resource_ranges/default
+
+## xPON Provisioning Support
+
+Currently the Adtran Device Adapter supports xPON provisioning. To enable PON ports or activate ONUs, you
+must use the appropriate commands. In the VOLTHA v2.0 release (Q4 2018?), the xPON provisioning will be removed
+from VOLTHA and replaced with Technology Profiles. _By default, this provisioning is now disabled and you should
+use the '-X' extra-arguments provisioning command switch if you wish to use it_.
+
+### REST Based xPON Pre-Provisioning
+In addition to CLI provisioning, the Adtran OLT Device Adapter can also be provisioned through the
+VOLTHA Northbound REST API. The following examples show curl commands when running with the **_Consul_**
+key-value store. Similar curl commands can be used when **_etcd_** is used as the key-value store.
+
+```bash
+VOLTHA_IP=localhost
+OLT_IP=10.17.174.228
+REST_PORT=`curl -s http://localhost:8500/v1/catalog/service/voltha-envoy-8443 | jq -r '.[0].ServicePort'`
+
+curl -k -s -X POST https://${VOLTHA_IP}:${REST_PORT}/api/v1/devices \
+ --header 'Content-Type: application/json' --header 'Accept: application/json' \
+ -d "{\"type\": \"adtran_olt\",\"ipv4_address\": \"${OLT_IP}\",\"extra_args\": \"-u admin -p admin -U ADMIN -P ADMIN\"}" \
+| jq '.' | tee /tmp/adtn-olt.json
+```
+This will not only pre-provision the OLT, but it will also return the created VOLTHA Device ID for use in other commands.
+The output is also shown on the console:
+
+```bash
+curl -k -s -X POST https://${VOLTHA_IP}:${REST_PORT}/api/v1/devices \
+ --header 'Content-Type: application/json' --header 'Accept: application/json' \
+ -d "{\"type\": \"adtran_olt\",\"ipv4_address\": \"${OLT_IP}\",\"extra_args\": \"-u admin -p admin -U ADMIN -P ADMIN\"}" \
+| jq '.' | tee /tmp/adtn-olt.json
+{
+ "extra_args": "-u admin -p admin -U ADMIN -P ADMIN",
+ "vendor": "",
+ "channel_terminations": [],
+ "parent_port_no": 0,
+ "connect_status": "UNKNOWN",
+ "root": false,
+ "adapter": "adtran_olt",
+ "vlan": 0,
+ "hardware_version": "",
+ "ports": [],
+ "ipv4_address": "10.17.174.228",
+ "parent_id": "",
+ "oper_status": "UNKNOWN",
+ "admin_state": "PREPROVISIONED",
+ "reason": "",
+ "serial_number": "",
+ "model": "",
+ "type": "adtran_olt",
+ "id": "00017cbb382b9260",
+ "firmware_version": ""
+}
+```
+Besides specifying the "ipv4_address" leaf, you can alternatively use the "host_and_port" leaf to
+provide the IP host address and the NETCONF port, as in "10.17.174.228:830".
+
+### Enabling the Pre-Provisioned OLT
+To enable the OLT, you need to retrieve the OLT Device ID and issue a POST request to the proper URL as in:
+```bash
+DEVICE_ID=$(jq .id /tmp/adtn-olt.json | sed 's/"//g')
+
+curl -k -s -X POST https://${VOLTHA_IP}:${REST_PORT}/api/v1/local/devices/${DEVICE_ID}/enable
+```
+#### Other REST APIs
+To list out any devices, you can use the following command:
+
+```bash
+curl -k -s https://${VOLTHA_IP}:${REST_PORT}/api/v1/devices | json_pp
+```
+
+Other API endpoints (beyond the /v1/ field above) can be listed with the following command:
+
+```bash
+curl -k -s https://${VOLTHA_IP}:${REST_PORT}/api/v1 | json_pp
+```
+
+# Tested OLT Device Driver versions
+
+The minimum version number for the OLT software is: *_11971320F1-ML-3309_* or later
+
diff --git a/adapters/adtran_olt/__init__.py b/adapters/adtran_olt/__init__.py
new file mode 100644
index 0000000..d67fcf2
--- /dev/null
+++ b/adapters/adtran_olt/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present ADTRAN, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/adapters/adtran_olt/adtran_olt.py b/adapters/adtran_olt/adtran_olt.py
new file mode 100644
index 0000000..c052b78
--- /dev/null
+++ b/adapters/adtran_olt/adtran_olt.py
@@ -0,0 +1,222 @@
+#
+# Copyright 2019-present ADTRAN, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+"""
+ADTRAN OLT Adapter.
+"""
+import structlog
+from twisted.internet import reactor, defer
+
+from pyvoltha.adapters.iadapter import OltAdapter
+from pyvoltha.protos import third_party
+from pyvoltha.protos.common_pb2 import AdminState
+
+from adtran_olt_handler import AdtranOltHandler
+
+
+_ = third_party
+log = structlog.get_logger()
+
+
+class AdtranOltAdapter(OltAdapter):
+ name = 'adtran_olt'
+
+ def __init__(self, core_proxy, adapter_proxy, config):
+ super(AdtranOltAdapter, self).__init__(core_proxy=core_proxy,
+ adapter_proxy=adapter_proxy,
+ config=config,
+ device_handler_class=AdtranOltHandler,
+ name=AdtranOltAdapter.name,
+ vendor='ADTRAN, Inc.',
+ version='2.0.0',
+ device_type=AdtranOltAdapter.name,
+ accepts_bulk_flow_update=True,
+ accepts_add_remove_flow_updates=False) # TODO: Implement me
+
+ log.debug('adtran_olt.__init__')
+
+ def health(self):
+ """
+ Return a 3-state health status using the voltha.HealthStatus message.
+
+ :return: Deferred or direct return with voltha.HealthStatus message
+ """
+ # TODO: Currently this is always healthy for every adapter.
+ # If we decide not to modify this, delete this method and use base class method
+ from pyvoltha.protos.health_pb2 import HealthStatus
+ return HealthStatus(state=HealthStatus.HEALTHY)
+
+ def abandon_device(self, device):
+ """
+ Make sure the adapter no longer looks after device. This is called
+ if device ownership is taken over by another Voltha instance.
+
+ :param device: A Voltha.Device object
+ :return: (Deferred) Shall be fired to acknowledge abandonment.
+ """
+ log.info('abandon-device', device=device)
+ raise NotImplementedError()
+
+ def adopt_device(self, device):
+ """
+ Make sure the adapter looks after given device. Called when a device
+ is provisioned top-down and needs to be activated by the adapter.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions. Such extensions shall be described as part of
+ the device type specification returned by device_types().
+ :return: (Deferred) Shall be fired to acknowledge device ownership.
+ """
+ log.info('adopt-device', device=device)
+ kwargs = {
+ 'adapter': self,
+ 'device-id': device.id
+ }
+ self.devices_handlers[device.id] = self.device_handler_class(**kwargs)
+ d = defer.Deferred()
+ reactor.callLater(0, self.devices_handlers[device.id].activate, d, False)
+ return d
+
+ def reconcile_device(self, device):
+ try:
+            kwargs = {'adapter': self, 'device-id': device.id}
+            self.devices_handlers[device.id] = self.device_handler_class(**kwargs)
+ # Work only required for devices that are in ENABLED state
+ if device.admin_state == AdminState.ENABLED:
+
+ kwargs = {
+ 'adapter': self,
+ 'device-id': device.id
+ }
+                self.devices_handlers[device.id] = self.device_handler_class(**kwargs)
+ d = defer.Deferred()
+ reactor.callLater(0, self.devices_handlers[device.id].activate, d, True)
+
+ else:
+ # Invoke the children reconciliation which would setup the
+ # basic children data structures
+ self.core_proxy.reconcile_child_devices(device.id)
+ return device
+
+        except Exception as e:
+ log.exception('Exception', e=e)
+
+ def self_test_device(self, device):
+ """
+        This is called to self-test a device based on a NBI call.
+ :param device: A Voltha.Device object.
+ :return: Will return result of self test
+ """
+ log.info('self-test-device', device=device.id)
+ # TODO: Support self test?
+ from pyvoltha.protos.voltha_pb2 import SelfTestResponse
+ return SelfTestResponse(result=SelfTestResponse.NOT_SUPPORTED)
+
+ def delete_device(self, device):
+ """
+ This is called to delete a device from the PON based on a NBI call.
+ If the device is an OLT then the whole PON will be deleted.
+
+ :param device: A Voltha.Device object.
+ :return: (Deferred) Shall be fired to acknowledge the deletion.
+ """
+ log.info('delete-device', device=device)
+ handler = self.devices_handlers.get(device.id)
+ if handler is not None:
+ reactor.callLater(0, handler.delete)
+            del self.devices_handlers[device.id]
+ del self.logical_device_id_to_root_device_id[device.parent_id]
+
+ return device
+
+ def download_image(self, device, request):
+ """
+ This is called to request downloading a specified image into the standby partition
+ of a device based on a NBI call.
+
+ :param device: A Voltha.Device object.
+ :param request: A Voltha.ImageDownload object.
+ :return: (Deferred) Shall be fired to acknowledge the download.
+ """
+ log.info('image_download', device=device, request=request)
+ handler = self.devices_handlers.get(device.id)
+ if handler is not None:
+ return handler.start_download(device, request, defer.Deferred())
+
+ def get_image_download_status(self, device, request):
+ """
+ This is called to inquire about a requested image download status based
+ on a NBI call. The adapter is expected to update the DownloadImage DB object
+ with the query result
+
+ :param device: A Voltha.Device object.
+ :param request: A Voltha.ImageDownload object.
+ :return: (Deferred) Shall be fired to acknowledge
+ """
+ log.info('get_image_download', device=device, request=request)
+ handler = self.devices_handlers.get(device.id)
+ if handler is not None:
+ return handler.download_status(device, request, defer.Deferred())
+
+ def cancel_image_download(self, device, request):
+ """
+ This is called to cancel a requested image download
+ based on a NBI call. The admin state of the device will not
+ change after the download.
+ :param device: A Voltha.Device object.
+ :param request: A Voltha.ImageDownload object.
+ :return: (Deferred) Shall be fired to acknowledge
+ """
+ log.info('cancel_image_download', device=device)
+ handler = self.devices_handlers.get(device.id)
+ if handler is not None:
+ return handler.cancel_download(device, request, defer.Deferred())
+
+ def activate_image_update(self, device, request):
+ """
+ This is called to activate a downloaded image from
+ a standby partition into active partition.
+ Depending on the device implementation, this call
+ may or may not cause device reboot.
+ If no reboot, then a reboot is required to make the
+ activated image running on device
+ This call is expected to be non-blocking.
+ :param device: A Voltha.Device object.
+ :param request: A Voltha.ImageDownload object.
+ :return: (Deferred) OperationResponse object.
+ """
+ log.info('activate_image_update', device=device, request=request)
+ handler = self.devices_handlers.get(device.id)
+ if handler is not None:
+ return handler.activate_image(device, request, defer.Deferred())
+
+ def revert_image_update(self, device, request):
+ """
+ This is called to deactivate the specified image at
+ active partition, and revert to previous image at
+ standby partition.
+ Depending on the device implementation, this call
+ may or may not cause device reboot.
+ If no reboot, then a reboot is required to make the
+ previous image running on device
+ This call is expected to be non-blocking.
+ :param device: A Voltha.Device object.
+ :param request: A Voltha.ImageDownload object.
+ :return: (Deferred) OperationResponse object.
+ """
+ log.info('revert_image_update', device=device, request=request)
+ handler = self.devices_handlers.get(device.id)
+ if handler is not None:
+ return handler.revert_image(device, request, defer.Deferred())
diff --git a/adapters/adtran_olt/adtran_olt.yml b/adapters/adtran_olt/adtran_olt.yml
new file mode 100644
index 0000000..8dc42a3
--- /dev/null
+++ b/adapters/adtran_olt/adtran_olt.yml
@@ -0,0 +1,67 @@
+---
+# Copyright 2019-present ADTRAN, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+logging:
+ version: 1
+
+ formatters:
+ brief:
+ format: '%(message)s'
+ default:
+ format: '%(asctime)s.%(msecs)03d %(levelname)-8s %(threadName)s %(module)s.%(funcName)s %(message)s'
+ datefmt: '%Y%m%dT%H%M%S'
+
+ handlers:
+ console:
+      class: logging.StreamHandler
+ level: DEBUG
+ formatter: default
+ stream: ext://sys.stdout
+ localRotatingFile:
+ class: logging.handlers.RotatingFileHandler
+ filename: adtran_olt.log
+ formatter: default
+ maxBytes: 2097152
+ backupCount: 10
+ level: DEBUG
+ null:
+ class: logging.NullHandler
+
+ loggers:
+ amqp:
+ handlers: [null]
+ propagate: False
+ conf:
+ propagate: False
+ '': # root logger
+ handlers: [console, localRotatingFile]
+ level: DEBUG # this can be bumped up/down by -q and -v command line
+ # options
+ propagate: False
+
+
+kafka-cluster-proxy:
+ event_bus_publisher:
+ topic_mappings:
+ 'model-change-events':
+ kafka_topic: 'voltha.events'
+ filters: [null]
+ 'alarms':
+ kafka_topic: 'voltha.alarms'
+ filters: [null]
+ 'kpis':
+ kafka_topic: 'voltha.kpis'
+ filters: [null]
+
diff --git a/adapters/adtran_olt/adtran_olt_handler.py b/adapters/adtran_olt/adtran_olt_handler.py
new file mode 100644
index 0000000..ad32b84
--- /dev/null
+++ b/adapters/adtran_olt/adtran_olt_handler.py
@@ -0,0 +1,1400 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import random
+import xmltodict
+
+from twisted.internet import reactor
+from twisted.internet.defer import returnValue, inlineCallbacks, succeed
+
+from codec.olt_state import OltState
+from adapters.adtran_common.download import Download
+from adapters.adtran_common.flow.flow_entry import FlowEntry
+from net.pio_zmq import PioClient
+from net.pon_zmq import PonClient
+from resources.adtran_olt_resource_manager import AdtranOltResourceMgr
+from adapters.adtran_common.adtran_device_handler import AdtranDeviceHandler
+from resources import adtranolt_platform as platform
+from adapters.adtran_common.net.rcmd import RCmd
+
+from pyvoltha.common.tech_profile.tech_profile import *
+from pyvoltha.common.openflow.utils import ofp, mk_flow_stat, in_port, output, vlan_vid
+from pyvoltha.adapters.common.frameio.frameio import hexify
+from pyvoltha.adapters.extensions.omci.omci import *
+from pyvoltha.protos.voltha_pb2 import Device
+from pyvoltha.protos.common_pb2 import AdminState, OperStatus
+from pyvoltha.protos.device_pb2 import ImageDownload, Image, Port
+from pyvoltha.protos.openflow_13_pb2 import OFPP_MAX, OFPC_GROUP_STATS, OFPC_PORT_STATS, \
+ OFPC_TABLE_STATS, OFPC_FLOW_STATS, ofp_switch_features, ofp_desc
+
+
+class AdtranOltHandler(AdtranDeviceHandler):
+ """
+ The OLT Handler is used to wrap a single instance of a 10G OLT 1-U pizza-box
+ """
+ MIN_OLT_HW_VERSION = datetime.datetime(2017, 1, 5)
+
+ # Full table output
+
+ GPON_OLT_HW_URI = '/restconf/data/gpon-olt-hw'
+ GPON_OLT_HW_STATE_URI = GPON_OLT_HW_URI + ':olt-state'
+ GPON_OLT_HW_CONFIG_URI = GPON_OLT_HW_URI + ':olt'
+ GPON_PON_CONFIG_LIST_URI = GPON_OLT_HW_CONFIG_URI + '/pon'
+
+ # Per-PON info
+
+ GPON_PON_STATE_URI = GPON_OLT_HW_STATE_URI + '/pon={}' # .format(pon-id)
+ GPON_PON_CONFIG_URI = GPON_PON_CONFIG_LIST_URI + '={}' # .format(pon-id)
+
+ GPON_ONU_CONFIG_LIST_URI = GPON_PON_CONFIG_URI + '/onus/onu' # .format(pon-id)
+ GPON_ONU_CONFIG_URI = GPON_ONU_CONFIG_LIST_URI + '={}' # .format(pon-id,onu-id)
+
+ GPON_TCONT_CONFIG_LIST_URI = GPON_ONU_CONFIG_URI + '/t-conts/t-cont' # .format(pon-id,onu-id)
+ GPON_TCONT_CONFIG_URI = GPON_TCONT_CONFIG_LIST_URI + '={}' # .format(pon-id,onu-id,alloc-id)
+
+ GPON_GEM_CONFIG_LIST_URI = GPON_ONU_CONFIG_URI + '/gem-ports/gem-port' # .format(pon-id,onu-id)
+ GPON_GEM_CONFIG_URI = GPON_GEM_CONFIG_LIST_URI + '={}' # .format(pon-id,onu-id,gem-id)
+
+ GPON_PON_DISCOVER_ONU = '/restconf/operations/gpon-olt-hw:discover-onu'
+
+ BASE_ONU_OFFSET = 64
+
+ def __init__(self, **kwargs):
+ super(AdtranOltHandler, self).__init__(**kwargs)
+
+ self.status_poll = None
+ self.status_poll_interval = 5.0
+ self.status_poll_skew = self.status_poll_interval / 10
+ self._pon_agent = None
+ self._pio_agent = None
+ self._ssh_deferred = None
+ self._system_id = None
+ self._download_protocols = None
+ self._download_deferred = None
+ self._downloads = {} # name -> Download obj
+ self._pio_exception_map = []
+
+ self.downstream_shapping_supported = True # 1971320F1-ML-4154 and later
+
+ # FIXME: Remove once we containerize. Only exists to keep BroadCom OpenOMCI ONU Happy
+ # when it reaches up our rear and tries to yank out a UNI port number
+ self.platform_class = None
+
+ # To keep broadcom ONU happy
+ self.platform = platform() # TODO: Remove once tech-profiles & containerization are done !!!
+
+ def __del__(self):
+ # OLT Specific things here.
+ #
+ # If you receive this during 'enable' of the object, you probably threw an
+ # uncaught exception which triggered an errback in the VOLTHA core.
+ d, self.status_poll = self.status_poll, None
+
+ # Clean up base class as well
+ AdtranDeviceHandler.__del__(self)
+
+ def _cancel_deferred(self):
+ d1, self.status_poll = self.status_poll, None
+ d2, self._ssh_deferred = self._ssh_deferred, None
+ d3, self._download_deferred = self._download_deferred, None
+
+ for d in [d1, d2, d3]:
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+            except Exception:
+ pass
+
+ def __str__(self):
+ return "AdtranOltHandler: {}".format(self.ip_address)
+
+ @property
+ def system_id(self):
+ return self._system_id
+
+ @system_id.setter
+ def system_id(self, value):
+ if self._system_id != value:
+ self._system_id = value
+
+ data = json.dumps({'olt-id': str(value)})
+ uri = AdtranOltHandler.GPON_OLT_HW_CONFIG_URI
+ self.rest_client.request('PATCH', uri, data=data, name='olt-system-id')
+
+ @inlineCallbacks
+ def get_device_info(self, device):
+ """
+ Perform an initial network operation to discover the device hardware
+ and software version. Serial Number would be helpful as well.
+
+ Upon successfully retrieving the information, remember to call the
+ 'start_heartbeat' method to keep in contact with the device being managed
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions. Such extensions shall be described as part of
+ the device type specification returned by device_types().
+ """
+ from codec.physical_entities_state import PhysicalEntitiesState
+ # TODO: After a CLI 'reboot' command, the device info may get messed up (prints labels and not values)
+ # # Enter device and type 'show'
+ device = {
+ 'model': 'n/a',
+ 'hardware_version': 'unknown',
+ 'serial_number': 'unknown',
+ 'vendor': 'ADTRAN, Inc.',
+ 'firmware_version': 'unknown',
+ 'running-revision': 'unknown',
+ 'candidate-revision': 'unknown',
+ 'startup-revision': 'unknown',
+ 'software-images': []
+ }
+ if self.is_virtual_olt:
+ returnValue(device)
+
+ try:
+ pe_state = PhysicalEntitiesState(self.netconf_client)
+ self.startup = pe_state.get_state()
+ results = yield self.startup
+
+ if results.ok:
+ modules = pe_state.get_physical_entities('adtn-phys-mod:module')
+
+ if isinstance(modules, list):
+ module = modules[0]
+
+ name = str(module.get('model-name', 'n/a')).translate(None, '?')
+ model = str(module.get('model-number', 'n/a')).translate(None, '?')
+
+ device['model'] = '{} - {}'.format(name, model) if len(name) > 0 else \
+ module.get('parent-entity', 'n/a')
+ device['hardware_version'] = str(module.get('hardware-revision',
+ 'n/a')).translate(None, '?')
+ device['serial_number'] = str(module.get('serial-number',
+ 'n/a')).translate(None, '?')
+ if 'software' in module:
+ if 'software' in module['software']:
+ software = module['software']['software']
+ if isinstance(software, dict):
+ device['running-revision'] = str(software.get('running-revision',
+ 'n/a')).translate(None, '?')
+ device['candidate-revision'] = str(software.get('candidate-revision',
+ 'n/a')).translate(None, '?')
+ device['startup-revision'] = str(software.get('startup-revision',
+ 'n/a')).translate(None, '?')
+ elif isinstance(software, list):
+ for sw_item in software:
+ sw_type = sw_item.get('name', '').lower()
+ if sw_type == 'firmware':
+ device['firmware_version'] = str(sw_item.get('running-revision',
+ 'unknown')).translate(None, '?')
+ elif sw_type == 'software':
+ for rev_type in ['startup-revision',
+ 'running-revision',
+ 'candidate-revision']:
+ if rev_type in sw_item:
+ image = Image(name=rev_type,
+ version=sw_item[rev_type],
+ is_active=(rev_type == 'running-revision'),
+ is_committed=True,
+ is_valid=True,
+ install_datetime='Not Available',
+ hash='Not Available')
+ device['software-images'].append(image)
+
+ # Update features based on version
+ # Format expected to be similar to: 1971320F1-ML-4154
+
+ running_version = next((image.version for image in device.get('software-images', list())
+ if image.is_active), '').split('-')
+ if len(running_version) > 2:
+ try:
+ self.downstream_shapping_supported = int(running_version[-1]) >= 4154
+ except ValueError:
+ pass
+
+ except Exception as e:
+ self.log.exception('dev-info-failure', e=e)
+ raise
+
+ returnValue(device)
+
+ def initialize_resource_manager(self):
+ # Initialize the resource and tech profile managers
+ extra_args = '--olt_model {}'.format(self.resource_manager_key)
+ self.resource_mgr = AdtranOltResourceMgr(self.device_id,
+ self.host_and_port,
+ extra_args,
+ self.default_resource_mgr_device_info)
+ self._populate_tech_profile_per_pon_port()
+
+ @property
+ def default_resource_mgr_device_info(self):
+ class AdtranOltDevInfo(object):
+ def __init__(self, pon_ports):
+ self.technology = "xgspon"
+ self.onu_id_start = 0
+ self.onu_id_end = platform.MAX_ONUS_PER_PON
+ self.alloc_id_start = platform.MIN_TCONT_ALLOC_ID
+ self.alloc_id_end = platform.MAX_TCONT_ALLOC_ID
+ self.gemport_id_start = platform.MIN_GEM_PORT_ID
+ self.gemport_id_end = platform.MAX_GEM_PORT_ID
+ self.pon_ports = len(pon_ports)
+ self.max_tconts = platform.MAX_TCONTS_PER_ONU
+ self.max_gem_ports = platform.MAX_GEM_PORTS_PER_ONU
+ self.intf_ids = pon_ports.keys() # PON IDs
+
+ return AdtranOltDevInfo(self.southbound_ports)
+
+ def _populate_tech_profile_per_pon_port(self):
+ self.tech_profiles = {intf_id: self.resource_mgr.resource_managers[intf_id].tech_profile
+ for intf_id in self.resource_mgr.device_info.intf_ids}
+
+ # Make sure we have as many tech_profiles as there are pon ports on
+ # the device
+ assert len(self.tech_profiles) == self.resource_mgr.device_info.pon_ports
+
+ def get_tp_path(self, intf_id, ofp_port_name):
+        # TODO: Should get Table id from the flow, as of now hardcoded to DEFAULT_TECH_PROFILE_TABLE_ID (64)
+ # 'tp_path' contains the suffix part of the tech_profile_instance path.
+ # The prefix to the 'tp_path' should be set to \
+ # TechProfile.KV_STORE_TECH_PROFILE_PATH_PREFIX by the ONU adapter.
+ return self.tech_profiles[intf_id].get_tp_path(DEFAULT_TECH_PROFILE_TABLE_ID,
+ ofp_port_name)
+
+ def delete_tech_profile_instance(self, intf_id, onu_id, logical_port):
+ # Remove the TP instance associated with the ONU
+ ofp_port_name = self.get_ofp_port_name(intf_id, onu_id, logical_port)
+ tp_path = self.get_tp_path(intf_id, ofp_port_name)
+ return self.tech_profiles[intf_id].delete_tech_profile_instance(tp_path)
+
+ def get_ofp_port_name(self, pon_id, onu_id, logical_port_number):
+ parent_port_no = self.pon_id_to_port_number(pon_id)
+ child_device = self.adapter_agent.get_child_device(self.device_id,
+ parent_port_no=parent_port_no, onu_id=onu_id)
+ if child_device is None:
+ self.log.error("could-not-find-child-device", parent_port_no=pon_id, onu_id=onu_id)
+ return None, None
+
+ ports = self.adapter_agent.get_ports(child_device.id, Port.ETHERNET_UNI)
+ port = next((port for port in ports if port.port_no == logical_port_number), None)
+ logical_port = self.adapter_agent.get_logical_port(self.logical_device_id,
+ port.label)
+ ofp_port_name = (logical_port.ofp_port.name, logical_port.ofp_port.port_no)
+
+ return ofp_port_name
+
+ @inlineCallbacks
+ def enumerate_northbound_ports(self, device):
+ """
+ Enumerate all northbound ports of this device.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions.
+ :return: (Deferred or None).
+ """
+ try:
+ # Also get the MAC Address for the OLT
+ command = "ip link | grep -A1 eth0 | sed -n -e 's/^.*ether //p' | awk '{ print $1 }'"
+ rcmd = RCmd(self.ip_address, self.netconf_username, self.netconf_password,
+ command)
+ address = yield rcmd.execute()
+ self.mac_address = address.replace('\n', '')
+ self.log.info("mac-addr", mac_addr=self.mac_address)
+
+ except Exception as e:
+            self.log.exception('mac-address', e=e)
+ raise
+
+ try:
+ from codec.ietf_interfaces import IetfInterfacesState
+ from nni_port import MockNniPort
+
+ ietf_interfaces = IetfInterfacesState(self.netconf_client)
+
+ if self.is_virtual_olt:
+ results = MockNniPort.get_nni_port_state_results()
+ else:
+ self.startup = ietf_interfaces.get_state()
+ results = yield self.startup
+
+ ports = ietf_interfaces.get_port_entries(results, 'ethernet')
+ returnValue(ports)
+
+ except Exception as e:
+            self.log.exception('enumerate_northbound_ports', e=e)
+ raise
+
+ def process_northbound_ports(self, device, results):
+ """
+ Process the results from the 'enumerate_northbound_ports' method.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions.
+ :param results: Results from the 'enumerate_northbound_ports' method that
+                        you implemented. The type and contents are up to you.
+ :return: (Deferred or None).
+ """
+ from nni_port import NniPort, MockNniPort
+
+ for port in results.itervalues():
+ port_no = port.get('port_no')
+ assert port_no, 'Port number not found'
+
+ # May already exist if device was not fully reachable when first enabled
+ if port_no not in self.northbound_ports:
+ self.log.info('processing-nni', port_no=port_no, name=port['port_no'])
+ self.northbound_ports[port_no] = NniPort(self, **port) if not self.is_virtual_olt \
+ else MockNniPort(self, **port)
+
+ if len(self.northbound_ports) >= self.max_nni_ports: # TODO: For now, limit number of NNI ports to make debugging easier
+ break
+
+ self.num_northbound_ports = len(self.northbound_ports)
+
+ def _olt_version(self):
+ # Version
+ # 0 Unknown
+ # 1 V1 OMCI format
+ # 2 V2 OMCI format
+ # 3 2018-01-11 or later
+ version = 0
+ info = self._rest_support.get('module-info', [dict()])
+ hw_mod_ver_str = next((mod.get('revision') for mod in info
+ if mod.get('module-name', '').lower() == 'gpon-olt-hw'), None)
+
+ if hw_mod_ver_str is not None:
+ try:
+ from datetime import datetime
+ hw_mod_dt = datetime.strptime(hw_mod_ver_str, '%Y-%m-%d')
+ version = 2 if hw_mod_dt >= datetime(2017, 9, 21) else 1
+
+ except Exception as e:
+ self.log.exception('ver-str-check', e=e)
+
+ return version
+
+ @inlineCallbacks
+ def enumerate_southbound_ports(self, device):
+ """
+ Enumerate all southbound ports of this device.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions.
+ :return: (Deferred or None).
+ """
+ ###############################################################################
+ # Determine number of southbound ports. We know it is 16, but this keeps this
+ # device adapter generic for our other OLTs up to this point.
+
+ self.startup = self.rest_client.request('GET', self.GPON_PON_CONFIG_LIST_URI,
+ 'pon-config')
+ try:
+ from codec.ietf_interfaces import IetfInterfacesState
+ from nni_port import MockNniPort
+
+ results = yield self.startup
+
+ ietf_interfaces = IetfInterfacesState(self.netconf_client)
+
+ if self.is_virtual_olt:
+ nc_results = MockNniPort.get_pon_port_state_results()
+ else:
+ self.startup = ietf_interfaces.get_state()
+ nc_results = yield self.startup
+
+ ports = ietf_interfaces.get_port_entries(nc_results, 'xpon')
+ if len(ports) == 0:
+ ports = ietf_interfaces.get_port_entries(nc_results,
+ 'channel-termination')
+ for data in results:
+ pon_id = data['pon-id']
+ port = ports[pon_id + 1]
+ port['pon-id'] = pon_id
+ port['admin_state'] = AdminState.ENABLED \
+ if data.get('enabled', True)\
+ else AdminState.DISABLED
+
+ except Exception as e:
+ log.exception('enumerate_southbound_ports', e=e)
+ raise
+
+ returnValue(ports)
+
+ def process_southbound_ports(self, device, results):
+ """
+ Process the results from the 'enumerate_southbound_ports' method.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions.
+ :param results: Results from the 'enumerate_southbound_ports' method that
+ you implemented. The type and contents are up to you.
+ :return: (Deferred or None).
+ """
+ from pon_port import PonPort
+
+ for pon in results.itervalues():
+ pon_id = pon.get('pon-id')
+ assert pon_id is not None, 'PON ID not found'
+ if pon['ifIndex'] is None:
+ pon['port_no'] = self.pon_id_to_port_number(pon_id)
+ else:
+ pass # Need to adjust ONU numbering !!!!
+
+ # May already exist if device was not fully reachable when first enabled
+ if pon_id not in self.southbound_ports:
+ self.southbound_ports[pon_id] = PonPort(self, **pon)
+
+ self.num_southbound_ports = len(self.southbound_ports)
+
+ def pon(self, pon_id):
+ return self.southbound_ports.get(pon_id)
+
+ def complete_device_specific_activation(self, device, reconciling):
+ """
+ Perform an initial network operation to discover the device hardware
+ and software version. Serial Number would be helpful as well.
+
+ This method is called from within the base class's activate generator.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions. Such extensions shall be described as part of
+ the device type specification returned by device_types().
+
+ :param reconciling: (boolean) True if taking over for another VOLTHA instance
+ """
+ # ZeroMQ clients
+ self._zmq_startup()
+
+ # Download support
+ self._download_deferred = reactor.callLater(0, self._get_download_protocols)
+
+ # Register for adapter messages
+ self.adapter_agent.register_for_inter_adapter_messages()
+
+ # PON Status
+ self.status_poll = reactor.callLater(5, self.poll_for_status)
+ return succeed('Done')
+
+ def on_heatbeat_alarm(self, active):
+ if not active:
+ self.ready_network_access()
+
+ @inlineCallbacks
+ def _get_download_protocols(self):
+ if self._download_protocols is None:
+ try:
+ config = '<filter>' + \
+ '<file-servers-state xmlns="http://www.adtran.com/ns/yang/adtran-file-servers">' + \
+ '<profiles>' + \
+ '<supported-protocol/>' + \
+ '</profiles>' + \
+ '</file-servers-state>' + \
+ '</filter>'
+
+ results = yield self.netconf_client.get(config)
+
+ result_dict = xmltodict.parse(results.data_xml)
+ entries = result_dict['data']['file-servers-state']['profiles']['supported-protocol']
+ self._download_protocols = [entry['#text'].split(':')[-1] for entry in entries
+ if '#text' in entry]
+
+ except Exception as e:
+ self.log.exception('protocols', e=e)
+ self._download_protocols = None
+ self._download_deferred = reactor.callLater(10, self._get_download_protocols)
+
+ @inlineCallbacks
+ def ready_network_access(self):
+ # Check for port status
+ command = 'netstat -pan | grep -i 0.0.0.0:{} | wc -l'.format(self.pon_agent_port)
+ rcmd = RCmd(self.ip_address, self.netconf_username, self.netconf_password, command)
+
+ try:
+ self.log.debug('check-request', command=command)
+ results = yield rcmd.execute()
+ self.log.info('check-results', results=results, result_type=type(results))
+ create_it = int(results) != 1
+
+ except Exception as e:
+ self.log.exception('find', e=e)
+ create_it = True
+
+ if create_it:
+ def v1_method():
+ command = 'mkdir -p /etc/pon_agent; touch /etc/pon_agent/debug.conf; '
+ command += 'ps -ae | grep -i ngpon2_agent; '
+ command += 'service_supervisor stop ngpon2_agent; service_supervisor start ngpon2_agent; '
+ command += 'ps -ae | grep -i ngpon2_agent'
+
+ self.log.debug('create-request', command=command)
+ return RCmd(self.ip_address, self.netconf_username, self.netconf_password, command)
+
+ def v2_v3_method():
+ # Old V2 method
+ # For V2 images, want -> export ZMQ_LISTEN_ON_ANY_ADDRESS=1
+ # For V3+ images, want -> export AGENT_LISTEN_ON_ANY_ADDRESS=1
+
+ # V3 unifies listening port, compatible with v2
+ cmd = "sed --in-place '/add feature/aexport ZMQ_LISTEN_ON_ANY_ADDRESS=1' " \
+ "/etc/ngpon2_agent/ngpon2_agent_feature_flags; "
+ cmd += "sed --in-place '/add feature/aexport AGENT_LISTEN_ON_ANY_ADDRESS=1' " \
+ "/etc/ngpon2_agent/ngpon2_agent_feature_flags; "
+
+ # Note: 'ps' commands are to help decorate the logfile with useful info
+ cmd += 'ps -ae | grep -i ngpon2_agent; '
+ cmd += 'service_supervisor stop ngpon2_agent; service_supervisor start ngpon2_agent; '
+ cmd += 'ps -ae | grep -i ngpon2_agent'
+
+ self.log.debug('create-request', command=cmd)
+ return RCmd(self.ip_address, self.netconf_username, self.netconf_password, cmd)
+
+ # Look for version
+ next_run = 15
+ version = v2_v3_method # NOTE: Only v2 or later supported.
+
+ if version is not None:
+ try:
+ rcmd = version()
+ results = yield rcmd.execute()
+ self.log.info('create-results', results=results, result_type=type(results))
+
+ except Exception as e:
+ self.log.exception('mkdir-and-restart', e=e)
+ else:
+ next_run = 0
+
+ if next_run > 0:
+ self._ssh_deferred = reactor.callLater(next_run, self.ready_network_access)
+
+ returnValue('retrying' if next_run > 0 else 'ready')
+
+ def _zmq_startup(self):
+ # ZeroMQ clients
+ self._pon_agent = PonClient(self.ip_address,
+ port=self.pon_agent_port,
+ rx_callback=self.rx_pa_packet)
+
+ try:
+ self._pio_agent = PioClient(self.ip_address,
+ port=self.pio_port,
+ rx_callback=self.rx_pio_packet)
+ except Exception as e:
+ self._pio_agent = None
+ self.log.exception('pio-agent', e=e)
+
+ def _zmq_shutdown(self):
+ pon, self._pon_agent = self._pon_agent, None
+ pio, self._pio_agent = self._pio_agent, None
+
+ for c in [pon, pio]:
+ if c is not None:
+ try:
+ c.shutdown()
+ except Exception:
+ pass
+
+ def _unregister_for_inter_adapter_messages(self):
+ try:
+ self.adapter_agent.unregister_for_inter_adapter_messages()
+ except Exception:
+ pass
+
+ def disable(self):
+ self._cancel_deferred()
+
+ # Drop registration for adapter messages
+ self._unregister_for_inter_adapter_messages()
+ self._zmq_shutdown()
+ self._pio_exception_map = []
+
+ super(AdtranOltHandler, self).disable()
+
+ def reenable(self, done_deferred=None):
+ super(AdtranOltHandler, self).reenable(done_deferred=done_deferred)
+
+ # Only do the re-enable if we fully came up on the very first enable attempt.
+ # If we had not, the base class will have initiated the 'activate' for us
+
+ if self._initial_enable_complete:
+ self._zmq_startup()
+ self.adapter_agent.register_for_inter_adapter_messages()
+ self.status_poll = reactor.callLater(1, self.poll_for_status)
+
+ def reboot(self):
+ if not self._initial_enable_complete:
+ # Never contacted the device on the initial startup, do 'activate' steps instead
+ return
+
+ self._cancel_deferred()
+
+ # Drop registration for adapter messages
+ self._unregister_for_inter_adapter_messages()
+ self._zmq_shutdown()
+
+ # Download supported protocols may change (if new image gets activated)
+ self._download_protocols = None
+
+ super(AdtranOltHandler, self).reboot()
+
+ def _finish_reboot(self, timeout, previous_oper_status, previous_conn_status):
+ super(AdtranOltHandler, self)._finish_reboot(timeout, previous_oper_status, previous_conn_status)
+
+ self.ready_network_access()
+
+ # Download support
+ self._download_deferred = reactor.callLater(0, self._get_download_protocols)
+
+ # Register for adapter messages
+ self.adapter_agent.register_for_inter_adapter_messages()
+ self._zmq_startup()
+
+ self.status_poll = reactor.callLater(5, self.poll_for_status)
+
+ def delete(self):
+ self._cancel_deferred()
+
+ # Drop registration for adapter messages
+ self._unregister_for_inter_adapter_messages()
+ self._zmq_shutdown()
+
+ super(AdtranOltHandler, self).delete()
+
+ def rx_pa_packet(self, packets):
+ if self._pon_agent is not None:
+ for packet in packets:
+ try:
+ pon_id, onu_id, msg_bytes, is_omci = self._pon_agent.decode_packet(packet)
+
+ if is_omci:
+ proxy_address = self._pon_onu_id_to_proxy_address(pon_id, onu_id)
+
+ if proxy_address is not None:
+ self.adapter_agent.receive_proxied_message(proxy_address, msg_bytes)
+
+ except Exception as e:
+ self.log.exception('rx-pon-agent-packet', e=e)
+
+ def _compute_logical_port_no(self, port_no, evc_map, packet):
+ logical_port_no = None
+
+ # Upstream direction?
+ if self.is_pon_port(port_no):
+ #TODO: Validate the evc-map name
+ from python.adapters.adtran.adtran_common.flow.evc_map import EVCMap
+ map_info = EVCMap.decode_evc_map_name(evc_map)
+ logical_port_no = int(map_info.get('ingress-port'))
+
+ if logical_port_no is None:
+ # Get PON
+ pon = self.get_southbound_port(port_no)
+
+ # Examine Packet and decode gvid
+ if packet is not None:
+ pass
+
+ elif self.is_nni_port(port_no):
+ nni = self.get_northbound_port(port_no)
+ logical_port = nni.get_logical_port() if nni is not None else None
+ logical_port_no = logical_port.ofp_port.port_no if logical_port is not None else None
+
+ # TODO: Need to decode based on port_no & evc_map
+ return logical_port_no
+
+ def rx_pio_packet(self, packets):
+ self.log.debug('rx-packet-in', type=type(packets), data=packets)
+ assert isinstance(packets, list), 'Expected a list of packets'
+
+ # TODO self._pio_agent.socket.socket.closed might be a good check here as well
+ if self.logical_device_id is not None and self._pio_agent is not None:
+ for packet in packets:
+ url_type = self._pio_agent.get_url_type(packet)
+ if url_type == PioClient.UrlType.EVCMAPS_RESPONSE:
+ exception_map = self._pio_agent.decode_query_response_packet(packet)
+ self.log.debug('rx-pio-packet', exception_map=exception_map)
+ # update latest pio exception map
+ self._pio_exception_map = exception_map
+
+ elif url_type == PioClient.UrlType.PACKET_IN:
+ try:
+ from scapy.layers.l2 import Ether, Dot1Q
+ ifindex, evc_map, packet = self._pio_agent.decode_packet(packet)
+
+ # convert ifindex to physical port number
+ # pon port numbers start at 60001 and end at 60016 (16 pons)
+ if ifindex > 60000 and ifindex < 60017:
+ port_no = (ifindex - 60000) + 4
+ # nni port numbers start at 1401 and end at 1404 (4 nnis)
+ elif ifindex > 1400 and ifindex < 1405:
+ port_no = ifindex - 1400
+ else:
+ raise ValueError('Unknown physical port. ifindex: {}'.format(ifindex))
+
+ logical_port_no = self._compute_logical_port_no(port_no, evc_map, packet)
+
+ if logical_port_no is not None:
+ if self.is_pon_port(port_no) and packet.haslayer(Dot1Q):
+ # Scrub g-vid
+ inner_pkt = packet.getlayer(Dot1Q)
+ assert inner_pkt.haslayer(Dot1Q), 'Expected a C-Tag'
+ packet = Ether(src=packet.src, dst=packet.dst, type=inner_pkt.type)\
+ / inner_pkt.payload
+
+ self.adapter_agent.send_packet_in(logical_device_id=self.logical_device_id,
+ logical_port_no=logical_port_no,
+ packet=str(packet))
+ else:
+ self.log.warn('logical-port-not-found', port_no=port_no, evc_map=evc_map)
+
+ except Exception as e:
+ self.log.exception('rx-pio-packet', e=e)
+
+ else:
+ self.log.warn('packet-in-unknown-url-type', url_type=url_type)
+
+ def packet_out(self, egress_port, msg):
+ """
+ Pass a packet_out message content to adapter so that it can forward it
+ out to the device. This is only called on root devices.
+
+ :param egress_port: egress logical port number
+ :param msg: actual message
+ :return: None
+ """
+
+ if self.pio_port is not None:
+ from scapy.layers.l2 import Ether, Dot1Q
+ from scapy.layers.inet import UDP
+
+ self.log.debug('sending-packet-out', egress_port=egress_port,
+ msg=hexify(msg))
+ pkt = Ether(msg)
+
+ # Remove any extra tags
+ while pkt.type == 0x8100:
+ msg_hex = hexify(msg)
+ msg_hex = msg_hex[:24] + msg_hex[32:]
+ bytes = []
+ msg_hex = ''.join(msg_hex.split(" "))
+ for i in range(0, len(msg_hex), 2):
+ bytes.append(chr(int(msg_hex[i:i+2], 16)))
+
+ msg = ''.join(bytes)
+ pkt = Ether(msg)
+
+ if self._pio_agent is not None:
+ port, ctag, vlan_id, evcmapname = FlowEntry.get_packetout_info(self, egress_port)
+ exceptiontype = None
+ if pkt.type == FlowEntry.EtherType.EAPOL:
+ exceptiontype = 'eapol'
+ ctag = self.utility_vlan
+ elif pkt.type == 2:
+ exceptiontype = 'igmp'
+ elif pkt.type == FlowEntry.EtherType.IPv4:
+ if UDP in pkt and pkt[UDP].sport == 67 and pkt[UDP].dport == 68:
+ exceptiontype = 'dhcp'
+
+ if exceptiontype is None:
+ self.log.warn('packet-out-exceptiontype-unknown', eEtherType=pkt.type)
+
+ elif port is not None and ctag is not None and vlan_id is not None and \
+ evcmapname is not None and self.pio_exception_exists(evcmapname, exceptiontype):
+
+ self.log.debug('sending-pio-packet-out', port=port, ctag=ctag, vlan_id=vlan_id,
+ evcmapname=evcmapname, exceptiontype=exceptiontype)
+ out_pkt = (
+ Ether(src=pkt.src, dst=pkt.dst) /
+ Dot1Q(vlan=vlan_id) /
+ Dot1Q(vlan=ctag, type=pkt.type) /
+ pkt.payload
+ )
+ data = self._pio_agent.encode_packet(port, str(out_pkt), evcmapname, exceptiontype)
+ self.log.debug('pio-packet-out', message=data)
+ try:
+ self._pio_agent.send(data)
+
+ except Exception as e:
+ self.log.exception('pio-send', egress_port=egress_port, e=e)
+ else:
+ self.log.warn('packet-out-flow-not-found', egress_port=egress_port)
+
+ def pio_exception_exists(self, name, exp):
+ # verify exception is in the OLT's reported exception map for this evcmap name
+ if exp is None:
+ return False
+ entry = next((entry for entry in self._pio_exception_map if entry['evc-map-name'] == name), None)
+ if entry is None:
+ return False
+ if exp not in entry['exception-types']:
+ return False
+ return True
+
+ def send_packet_exceptions_request(self):
+ if self._pio_agent is not None:
+ request = self._pio_agent.query_request_packet()
+ try:
+ self._pio_agent.send(request)
+
+ except Exception as e:
+ self.log.exception('pio-send', e=e)
+
+ def poll_for_status(self):
+ self.log.debug('Initiating-status-poll')
+
+ device = self.adapter_agent.get_device(self.device_id)
+
+ if device.admin_state == AdminState.ENABLED and\
+ device.oper_status != OperStatus.ACTIVATING and\
+ self.rest_client is not None:
+ uri = AdtranOltHandler.GPON_OLT_HW_STATE_URI
+ name = 'pon-status-poll'
+ self.status_poll = self.rest_client.request('GET', uri, name=name)
+ self.status_poll.addBoth(self.status_poll_complete)
+ else:
+ self.status_poll = reactor.callLater(0, self.status_poll_complete, 'inactive')
+
+ def status_poll_complete(self, results):
+ """
+ Results of the status poll
+ :param results:
+ """
+ from pon_port import PonPort
+
+ if isinstance(results, dict) and 'pon' in results:
+ try:
+ self.log.debug('status-success')
+ for pon_id, pon in OltState(results).pons.iteritems():
+ pon_port = self.southbound_ports.get(pon_id, None)
+
+ if pon_port is not None and pon_port.state == PonPort.State.RUNNING:
+ pon_port.process_status_poll(pon)
+
+ except Exception as e:
+ self.log.exception('PON-status-poll', e=e)
+
+ # Reschedule
+
+ delay = self.status_poll_interval
+ delay += random.uniform(-delay / 10, delay / 10)
+
+ self.status_poll = reactor.callLater(delay, self.poll_for_status)
+
+ def _create_utility_flow(self):
+ nni_port = self.northbound_ports.get(1).port_no
+ pon_port = self.southbound_ports.get(0).port_no
+
+ return mk_flow_stat(
+ priority=200,
+ match_fields=[
+ in_port(nni_port),
+ vlan_vid(ofp.OFPVID_PRESENT + self.utility_vlan)
+ ],
+ actions=[output(pon_port)]
+ )
+
+ @inlineCallbacks
+ def update_flow_table(self, flows, device):
+ """
+ Update the flow table on the OLT. If an existing flow is not in the list, it needs
+ to be removed from the device.
+
+ :param flows: List of flows that should be installed upon completion of this function
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions.
+ """
+ self.log.debug('bulk-flow-update', num_flows=len(flows),
+ device_id=device.id, flows=flows)
+
+ valid_flows = []
+
+ if flows:
+ # Special helper egress Packet In/Out flows
+ special_flow = self._create_utility_flow()
+ valid_flow, evc = FlowEntry.create(special_flow, self)
+
+ if valid_flow is not None:
+ valid_flows.append(valid_flow.flow_id)
+
+ if evc is not None:
+ try:
+ evc.schedule_install()
+ self.add_evc(evc)
+
+ except Exception as e:
+ evc.status = 'EVC Install Exception: {}'.format(e.message)
+ self.log.exception('EVC-install', e=e)
+
+ # verify exception flows were installed by OLT PET process
+ reactor.callLater(5, self.send_packet_exceptions_request)
+
+ # Now process bulk flows
+ for flow in flows:
+ try:
+ # Try to create an EVC.
+ #
+ # The first result is the flow entry that was created. This could be a match to an
+ # existing flow since it is a bulk update. None is returned only if no match to
+ # an existing entry is found and decode failed (unsupported field)
+ #
+ # The second result is the EVC this flow should be added to. This could be an
+ # existing EVC (so you're adding another EVC-MAP) or a brand new EVC (no existing
+ # EVC-MAPs). None is returned if there is not yet a valid EVC that can be created.
+
+ valid_flow, evc = FlowEntry.create(flow, self)
+
+ if valid_flow is not None:
+ valid_flows.append(valid_flow.flow_id)
+
+ if evc is not None:
+ try:
+ evc.schedule_install()
+ self.add_evc(evc)
+
+ except Exception as e:
+ evc.status = 'EVC Install Exception: {}'.format(e.message)
+ self.log.exception('EVC-install', e=e)
+
+ except Exception as e:
+ self.log.exception('bulk-flow-update-add', e=e)
+
+ # Now drop all flows from this device that were not in this bulk update
+ try:
+ yield FlowEntry.drop_missing_flows(self, valid_flows)
+
+ except Exception as e:
+ self.log.exception('bulk-flow-update-remove', e=e)
+
+ def remove_from_flow_table(self, _flows):
+ """
+ Remove flows from the device
+
+ :param _flows: (list) Flows
+ """
+ raise NotImplementedError
+
+ def add_to_flow_table(self, _flows):
+ """
+ Add flows to the device
+
+ :param _flows: (list) Flows
+ """
+ raise NotImplementedError
+
+ def get_ofp_device_info(self, device):
+ """
+ Retrieve the OLT device info. This includes the ofp_desc and
+ ofp_switch_features. The existing ofp structures can be used,
+ or all the attributes get added to the Device definition or a new proto
+ definition gets created. This API will allow the Core to create a
+ LogicalDevice associated with this device (OLT only).
+ :param device: device
+ :return: Proto Message (TBD)
+ """
+ from pyvoltha.protos.inter_container_pb2 import SwitchCapability
+ version = device.images.image[0].version
+
+ return SwitchCapability(
+ desc=ofp_desc(mfr_desc='VOLTHA Project',
+ hw_desc=device.hardware_version,
+ sw_desc=version,
+ serial_num=device.serial_number,
+ dp_desc='n/a'),
+ switch_features=ofp_switch_features(n_buffers=256, # TODO fake for now
+ n_tables=2, # TODO ditto
+ capabilities=( # TODO and ditto
+ OFPC_FLOW_STATS |
+ OFPC_TABLE_STATS |
+ OFPC_PORT_STATS |
+ OFPC_GROUP_STATS))
+ )
+
+ def get_ofp_port_info(self, device, port_no):
+ """
+ Retrieve the port info. This includes the ofp_port. The existing ofp
+ structure can be used, or all the attributes get added to the Port
+ definitions or a new proto definition gets created. This API will allow
+ the Core to create a LogicalPort associated with this device.
+ :param device: device
+ :param port_no: port number
+ :return: Proto Message (TBD)
+ """
+ from pyvoltha.protos.inter_container_pb2 import PortCapability
+ # The adapter created the device port, so it already holds the port reference needed
+ # to return the capability. TODO: Do a lookup on the NNI port number and return the
+ # appropriate attributes
+ self.log.info('get_ofp_port_info', port_no=port_no,
+ info=self.ofp_port_no, device_id=device.id)
+
+ nni = self.get_northbound_port(port_no)
+ if nni is not None:
+ lp = nni.get_logical_port()
+ if lp is not None:
+ return PortCapability(port=lp)
+
+ # @inlineCallbacks
+ def send_proxied_message(self, proxy_address, msg):
+ self.log.debug('sending-proxied-message', msg=msg)
+
+ if isinstance(msg, Packet):
+ msg = str(msg)
+
+ if self._pon_agent is not None:
+ pon_id, onu_id = self._proxy_address_to_pon_onu_id(proxy_address)
+
+ pon = self.southbound_ports.get(pon_id)
+
+ if pon is not None and pon.enabled:
+ onu = pon.onu(onu_id)
+
+ if onu is not None and onu.enabled:
+ data = self._pon_agent.encode_omci_packet(msg, pon_id, onu_id)
+ try:
+ self._pon_agent.send(data)
+
+ except Exception as e:
+ self.log.exception('pon-agent-send', pon_id=pon_id, onu_id=onu_id, e=e)
+ else:
+ self.log.debug('onu-invalid-or-disabled', pon_id=pon_id, onu_id=onu_id)
+ else:
+ self.log.debug('pon-invalid-or-disabled', pon_id=pon_id)
+
+ def _onu_offset(self, onu_id):
+ # Start ONUs just past the southbound PON port numbers. Since ONU IDs start
+ # at zero, add one
+ # assert AdtranOltHandler.BASE_ONU_OFFSET > (self.num_northbound_ports + self.num_southbound_ports + 1)
+ assert AdtranOltHandler.BASE_ONU_OFFSET > (4 + self.num_southbound_ports + 1) # Skip over uninitialized ports
+ return AdtranOltHandler.BASE_ONU_OFFSET + onu_id
+
+ def _pon_onu_id_to_proxy_address(self, pon_id, onu_id):
+ if pon_id in self.southbound_ports:
+ pon = self.southbound_ports[pon_id]
+ onu = pon.onu(onu_id)
+ proxy_address = onu.proxy_address if onu is not None else None
+
+ else:
+ proxy_address = None
+
+ return proxy_address
+
+ def _proxy_address_to_pon_onu_id(self, proxy_address):
+ """
+ Convert the proxy address to the PON-ID and ONU-ID
+ :param proxy_address: (ProxyAddress)
+ :return: (tuple) pon-id, onu-id
+ """
+ onu_id = proxy_address.onu_id
+ pon_id = self._port_number_to_pon_id(proxy_address.channel_id)
+
+ return pon_id, onu_id
+
+ def pon_id_to_port_number(self, pon_id):
+ return pon_id + 1 + 4 # Skip over uninitialized ports
+
+ def _port_number_to_pon_id(self, port):
+ if self.is_uni_port(port):
+ # Convert to OLT device port
+ port = platform.intf_id_from_uni_port_num(port)
+
+ return port - 1 - 4 # Skip over uninitialized ports
+
+ def is_pon_port(self, port):
+ return self._port_number_to_pon_id(port) in self.southbound_ports
+
+ def is_uni_port(self, port):
+ return OFPP_MAX >= port >= (5 << 11)
+
+ def get_southbound_port(self, port):
+ pon_id = self._port_number_to_pon_id(port)
+ return self.southbound_ports.get(pon_id, None)
+
+ def get_northbound_port(self, port):
+ return self.northbound_ports.get(port, None)
+
+ def get_port_name(self, port, logical_name=False):
+ """
+ Get the name for a port
+
+ Port names are used in various ways within and outside of VOLTHA.
+ Typically, the physical port name will be used during device handler conversations
+ with the hardware (REST, NETCONF, ...) while the logical port name is what the
+ outside world (ONOS, SEBA, ...) uses.
+
+ All ports have a physical port name, but only ports exposed through VOLTHA
+ as a logical port will have a logical port name
+ """
+ if self.is_nni_port(port):
+ port = self.get_northbound_port(port)
+ return port.logical_port_name if logical_name else port.physical_port_name
+
+ if self.is_pon_port(port):
+ port = self.get_southbound_port(port)
+ return port.logical_port_name if logical_name else port.physical_port_name
+
+ if self.is_uni_port(port):
+ return 'uni-{}'.format(port)
+
+ if self.is_logical_port(port):
+ raise NotImplementedError('Logical OpenFlow ports are not supported')
+
+ def _update_download_status(self, request, download):
+ if download is not None:
+ request.state = download.download_state
+ request.reason = download.failure_reason
+ request.image_state = download.image_state
+ request.additional_info = download.additional_info
+ request.downloaded_bytes = download.downloaded_bytes
+ else:
+ request.state = ImageDownload.DOWNLOAD_UNKNOWN
+ request.reason = ImageDownload.UNKNOWN_ERROR
+ request.image_state = ImageDownload.IMAGE_UNKNOWN
+ request.additional_info = "Download request '{}' not found".format(request.name)
+ request.downloaded_bytes = 0
+
+ self.adapter_agent.update_image_download(request)
+
+ def start_download(self, device, request, done):
+ """
+ This is called to request downloading a specified image into
+ the standby partition of a device based on a NBI call.
+
+ :param device: A Voltha.Device object.
+ :param request: A Voltha.ImageDownload object.
+ :param done: (Deferred) Deferred to fire when done
+ :return: (Deferred) Shall be fired to acknowledge the download.
+ """
+ log.info('image_download', request=request)
+
+ try:
+ if not self._initial_enable_complete:
+ # Never contacted the device on the initial startup, do 'activate' steps instead
+ raise Exception('Device has not finished initial activation')
+
+ if request.name in self._downloads:
+ raise Exception("Download request with name '{}' already exists".
+ format(request.name))
+ try:
+ download = Download.create(self, request, self._download_protocols)
+
+ except Exception:
+ request.additional_info = 'Download request creation failed due to exception'
+ raise
+
+ try:
+ self._downloads[download.name] = download
+ self._update_download_status(request, download)
+ done.callback('started')
+ return done
+
+ except Exception:
+ request.additional_info = 'Download request startup failed due to exception'
+ del self._downloads[download.name]
+ download.cancel_download(request)
+ raise
+
+ except Exception as e:
+ self.log.exception('create', e=e)
+
+ request.reason = ImageDownload.UNKNOWN_ERROR if self._initial_enable_complete\
+ else ImageDownload.DEVICE_BUSY
+ request.state = ImageDownload.DOWNLOAD_FAILED
+ if not request.additional_info:
+ request.additional_info = e.message
+
+ self.adapter_agent.update_image_download(request)
+
+ # restore admin state to enabled
+ device.admin_state = AdminState.ENABLED
+ self.adapter_agent.update_device(device)
+ raise
+
+ def download_status(self, device, request, done):
+ """
+ This is called to inquire about a requested image download status based
+ on a NBI call.
+
+ The adapter is expected to update the ImageDownload DB object with the
+ query result
+
+ :param device: A Voltha.Device object.
+ :param request: A Voltha.ImageDownload object.
+ :param done: (Deferred) Deferred to fire when done
+
+ :return: (Deferred) Shall be fired to acknowledge
+ """
+ log.info('download_status', request=request)
+ download = self._downloads.get(request.name)
+
+ self._update_download_status(request, download)
+
+ if request.state not in [ImageDownload.DOWNLOAD_STARTED,
+ ImageDownload.DOWNLOAD_SUCCEEDED,
+ ImageDownload.DOWNLOAD_FAILED]:
+ # restore admin state to enabled
+ device.admin_state = AdminState.ENABLED
+ self.adapter_agent.update_device(device)
+
+ done.callback(request.state)
+ return done
+
+ def cancel_download(self, device, request, done):
+ """
+ This is called to cancel a requested image download based on a NBI
+ call. The admin state of the device will not change after the
+ download.
+
+ :param device: A Voltha.Device object.
+ :param request: A Voltha.ImageDownload object.
+ :param done: (Deferred) Deferred to fire when done
+
+ :return: (Deferred) Shall be fired to acknowledge
+ """
+ log.info('cancel_download', request=request)
+
+ download = self._downloads.get(request.name)
+
+ if download is not None:
+ del self._downloads[request.name]
+ result = download.cancel_download(request)
+ self._update_download_status(request, download)
+ done.callback(result)
+ else:
+ self._update_download_status(request, download)
+ done.errback(KeyError('Download request not found'))
+
+ if device.admin_state == AdminState.DOWNLOADING_IMAGE:
+ device.admin_state = AdminState.ENABLED
+ self.adapter_agent.update_device(device)
+
+ return done
+
+ def activate_image(self, device, request, done):
+ """
+ This is called to activate a downloaded image from a standby partition
+ into the active partition.
+
+ Depending on the device implementation, this call may or may not
+ cause a device reboot. If it does not, a reboot is required before
+ the activated image runs on the device.
+
+ :param device: A Voltha.Device object.
+ :param request: A Voltha.ImageDownload object.
+ :param done: (Deferred) Deferred to fire when done
+
+ :return: (Deferred) OperationResponse object.
+ """
+ log.info('activate_image', request=request)
+
+ download = self._downloads.get(request.name)
+ if download is not None:
+ del self._downloads[request.name]
+ result = download.activate_image()
+ self._update_download_status(request, download)
+ done.callback(result)
+ else:
+ self._update_download_status(request, download)
+ done.errback(KeyError('Download request not found'))
+
+ # restore admin state to enabled
+ device.admin_state = AdminState.ENABLED
+ self.adapter_agent.update_device(device)
+ return done
+
+ def revert_image(self, device, request, done):
+ """
+ This is called to deactivate the specified image in the active partition
+ and revert to the previous image in the standby partition.
+
+ Depending on the device implementation, this call may or may not
+ cause a device reboot. If it does not, a reboot is required before
+ the previous image runs on the device.
+
+ :param device: A Voltha.Device object.
+ :param request: A Voltha.ImageDownload object.
+ :param done: (Deferred) Deferred to fire when done
+
+ :return: (Deferred) OperationResponse object.
+ """
+ log.info('revert_image', request=request)
+
+ download = self._downloads.get(request.name)
+ if download is not None:
+ del self._downloads[request.name]
+ result = download.revert_image()
+ self._update_download_status(request, download)
+ done.callback(result)
+ else:
+ self._update_download_status(request, download)
+ done.errback(KeyError('Download request not found'))
+
+ # restore admin state to enabled
+ device.admin_state = AdminState.ENABLED
+ self.adapter_agent.update_device(device)
+ return done
+
+ def add_onu_device(self, pon_id, onu_id, serial_number):
+ onu_device = self.adapter_agent.get_child_device(self.device_id,
+ serial_number=serial_number)
+ if onu_device is not None:
+ return onu_device
+
+ try:
+ # NOTE - channel_id of the ONU is set to the PON port number derived from pon_id
+ pon_port = self.pon_id_to_port_number(pon_id)
+ proxy_address = Device.ProxyAddress(device_id=self.device_id,
+ channel_id=pon_port,
+ onu_id=onu_id,
+ onu_session_id=onu_id)
+
+ self.log.debug("added-onu", port_no=pon_id,
+ onu_id=onu_id, serial_number=serial_number,
+ proxy_address=proxy_address)
+
+ self.adapter_agent.add_onu_device(
+ parent_device_id=self.device_id,
+ parent_port_no=pon_port,
+ vendor_id=serial_number[:4],
+ proxy_address=proxy_address,
+ root=True,
+ serial_number=serial_number,
+ admin_state=AdminState.ENABLED,
+ )
+
+ except Exception as e:
+ self.log.exception('onu-activation-failed', e=e)
+ return None
+
+ def setup_onu_tech_profile(self, pon_id, onu_id, logical_port_number):
+ # Send ONU Adapter related tech profile information.
+ self.log.debug('add-tech-profile-info')
+
+ uni_id = self.platform.uni_id_from_uni_port(logical_port_number)
+ parent_port_no = self.pon_id_to_port_number(pon_id)
+ onu_device = self.adapter_agent.get_child_device(self.device_id,
+ onu_id=onu_id,
+ parent_port_no=parent_port_no)
+
+ ofp_port_name, ofp_port_no = self.get_ofp_port_name(pon_id, onu_id,
+ logical_port_number)
+ if ofp_port_name is None:
+ self.log.error("port-name-not-found")
+ return
+
+ tp_path = self.get_tp_path(pon_id, ofp_port_name)
+
+ self.log.debug('Load-tech-profile-request-to-onu-handler', tp_path=tp_path)
+
+ msg = {'proxy_address': onu_device.proxy_address, 'uni_id': uni_id,
+ 'event': 'download_tech_profile', 'event_data': tp_path}
+
+ # Send the event message to the ONU adapter
+ self.adapter_agent.publish_inter_adapter_message(onu_device.id, msg)
diff --git a/adapters/adtran_olt/codec/__init__.py b/adapters/adtran_olt/codec/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/adapters/adtran_olt/codec/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/adapters/adtran_olt/codec/ietf_interfaces.py b/adapters/adtran_olt/codec/ietf_interfaces.py
new file mode 100644
index 0000000..0bdf691
--- /dev/null
+++ b/adapters/adtran_olt/codec/ietf_interfaces.py
@@ -0,0 +1,328 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet.defer import inlineCallbacks, returnValue
+import xmltodict
+import structlog
+from pyvoltha.protos.openflow_13_pb2 import OFPPF_1GB_FD, OFPPF_10GB_FD, OFPPF_40GB_FD, OFPPF_100GB_FD
+from pyvoltha.protos.openflow_13_pb2 import OFPPF_FIBER, OFPPF_COPPER
+from pyvoltha.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPC_PORT_DOWN, OFPPS_LINK_DOWN, OFPPF_OTHER
+from pyvoltha.protos.common_pb2 import OperStatus, AdminState
+
+log = structlog.get_logger()
+
+_ietf_interfaces_config_rpc = """
+ <filter xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ <interfaces xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">
+ <interface/>
+ </interfaces>
+ </filter>
+"""
+
+_ietf_interfaces_state_rpc = """
+ <filter xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ <interfaces-state xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">
+ <interface>
+ <name/>
+ <type/>
+ <admin-status/>
+ <oper-status/>
+ <last-change/>
+ <phys-address/>
+ <speed/>
+ </interface>
+ </interfaces-state>
+ </filter>
+"""
+
+_allowed_with_default_types = ['report-all', 'report-all-tagged', 'trim', 'explicit']
+
+# TODO: Centralize the item below as a function in a core util module
+
+
+def _with_defaults(default_type=None):
+ if default_type is None:
+ return ""
+
+ assert(default_type in _allowed_with_default_types)
+ return """
+ <with-defaults xmlns="urn:ietf:params:xml:ns:yang:ietf-netconf-with-defaults">
+ {}</with-defaults>""".format(default_type)
+
+
+class IetfInterfacesConfig(object):
+ def __init__(self, session):
+ self._session = session
+
+ @inlineCallbacks
+ def get_config(self, source='running', with_defaults=None):
+
+ filter = _ietf_interfaces_config_rpc + _with_defaults(with_defaults)
+
+ request = self._session.get(source, filter=filter)
+ rpc_reply = yield request
+ returnValue(rpc_reply)
+
+ def get_interfaces(self, rpc_reply, interface_type=None):
+ """
+ Get the interface entries of a particular type
+ :param rpc_reply: Reply from previous get or request
+ :param interface_type: (String or List) The type of interface (case-insensitive)
+ :return: (list) of OrderedDict interface entries
+ """
+ result_dict = xmltodict.parse(rpc_reply.data_xml)
+
+ entries = result_dict['data']['interfaces']
+
+ if interface_type is None:
+ return entries
+
+ # for entry in entries:
+ # import pprint
+ # log.info(pprint.PrettyPrinter(indent=2).pformat(entry))
+
+ def _matches(entry, value):
+ if 'type' in entry and '#text' in entry['type']:
+ text_val = entry['type']['#text'].lower()
+ if isinstance(value, list):
+ return any(v.lower() in text_val for v in value)
+ return value.lower() in text_val
+ return False
+
+ return [entry for entry in entries if _matches(entry, interface_type)]
+
+
+class IetfInterfacesState(object):
+ def __init__(self, session):
+ self._session = session
+
+ @inlineCallbacks
+ def get_state(self):
+ try:
+ request = self._session.get(_ietf_interfaces_state_rpc)
+ rpc_reply = yield request
+ returnValue(rpc_reply)
+
+ except Exception as e:
+ log.exception('get_state', e=e)
+ raise
+
+ @staticmethod
+ def get_interfaces(rpc_reply, key='type', key_value=None):
+ """
+ Get the interface state entries whose 'key' field matches a value
+ :param key_value: (String or List) The value(s) to match, e.g. the type of interface (case-insensitive)
+ :return: (list) of OrderedDict interface entries
+ """
+ result_dict = xmltodict.parse(rpc_reply.data_xml)
+ entries = result_dict['data']['interfaces-state']['interface']
+
+ if key_value is None:
+ return entries
+
+ # for entry in entries:
+ # import pprint
+ # log.info(pprint.PrettyPrinter(indent=2).pformat(entry))
+
+ def _matches(entry, key, value):
+ if key in entry and '#text' in entry[key]:
+ text_val = entry[key]['#text'].lower()
+ if isinstance(value, list):
+ return any(v.lower() in text_val for v in value)
+ return value.lower() in text_val
+ return False
+
+ return [entry for entry in entries if _matches(entry, key, key_value)]
+
+ @staticmethod
+ def _get_admin_state(entry):
+ state_map = {
+ 'up': AdminState.ENABLED,
+ 'down': AdminState.DISABLED,
+ 'testing': AdminState.DISABLED
+ }
+ return state_map.get(entry.get('admin-status', 'down'),
+ AdminState.UNKNOWN)
+
+ @staticmethod
+ def _get_oper_status(entry):
+ state_map = {
+ 'up': OperStatus.ACTIVE,
+ 'down': OperStatus.FAILED,
+ 'testing': OperStatus.TESTING,
+ 'unknown': OperStatus.UNKNOWN,
+ 'dormant': OperStatus.DISCOVERED,
+ 'not-present': OperStatus.UNKNOWN,
+ 'lower-layer-down': OperStatus.FAILED
+ }
+ return state_map.get(entry.get('oper-status', 'down'),
+ OperStatus.UNKNOWN)
+
+ @staticmethod
+ def _get_mac_addr(entry):
+ mac_addr = entry.get('phys-address', None)
+ if mac_addr is None:
+ import random
+ # TODO: Get with qumram team about phys addr
+ mac_addr = '08:00:{}{}:{}{}:{}{}:00'.format(random.randint(0, 9),
+ random.randint(0, 9),
+ random.randint(0, 9),
+ random.randint(0, 9),
+ random.randint(0, 9),
+ random.randint(0, 9))
+ return mac_addr
+
+ @staticmethod
+ def _get_speed_value(entry):
+ speed = entry.get('speed') or IetfInterfacesState._get_speed_via_name(entry.get('name'))
+ if isinstance(speed, str):
+ return long(speed)
+ return speed
+
+ @staticmethod
+ def _get_speed_via_name(name):
+ speed_map = {
+ 'terabit': 1000000000000,
+ 'hundred-gigabit': 100000000000,
+ 'fourty-gigabit': 40000000000,
+ 'ten-gigabit': 10000000000,
+ 'gigabit': 1000000000,
+ }
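+ for n, v in sorted(speed_map.iteritems(), key=lambda kv: len(kv[0]), reverse=True): # longest name first so 'gigabit' cannot shadow 'ten-gigabit'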
+ if n in name.lower():
+ return v
+ return 0
+
+ @staticmethod
+ def _get_of_state(entry):
+ # If port up and ready: OFPPS_LIVE
+ # If port config bit is down: OFPPC_PORT_DOWN
+ # If port state bit is down: OFPPS_LINK_DOWN
+ # if IetfInterfacesState._get_admin_state(entry) == AdminState.ENABLED:
+ # return OFPPS_LIVE \
+ # if IetfInterfacesState._get_oper_status(entry) == OperStatus.ACTIVE \
+ # else OFPPS_LINK_DOWN
+ #
+ # return OFPPC_PORT_DOWN
+ # TODO: Update of openflow port state is not supported, so always say we are alive
+ return OFPPS_LIVE
+
+ @staticmethod
+ def _get_of_capabilities(entry):
+ # The capabilities field is a bitmap that uses a combination of the following flags :
+ # Capabilities supported by the datapath
+ # enum ofp_capabilities {
+ # OFPC_FLOW_STATS = 1 << 0, /* Flow statistics. */
+ # OFPC_TABLE_STATS = 1 << 1, /* Table statistics. */
+ # OFPC_PORT_STATS = 1 << 2, /* Port statistics. */
+ # OFPC_GROUP_STATS = 1 << 3, /* Group statistics. */
+ # OFPC_IP_REASM = 1 << 5, /* Can reassemble IP fragments. */
+ # OFPC_QUEUE_STATS = 1 << 6, /* Queue statistics. */
+ # OFPC_PORT_BLOCKED = 1 << 8, /* Switch will block looping ports. */
+ # OFPC_BUNDLES = 1 << 9, /* Switch supports bundles. */
+ # OFPC_FLOW_MONITORING = 1 << 10, /* Switch supports flow monitoring. */
+ # }
+ # enum ofp_port_features {
+ # OFPPF_10MB_HD = 1 << 0, /* 10 Mb half-duplex rate support. */
+ # OFPPF_10MB_FD = 1 << 1, /* 10 Mb full-duplex rate support. */
+ # OFPPF_100MB_HD = 1 << 2, /* 100 Mb half-duplex rate support. */
+ # OFPPF_100MB_FD = 1 << 3, /* 100 Mb full-duplex rate support. */
+ # OFPPF_1GB_HD = 1 << 4, /* 1 Gb half-duplex rate support. */
+ # OFPPF_1GB_FD = 1 << 5, /* 1 Gb full-duplex rate support. */
+ # OFPPF_10GB_FD = 1 << 6, /* 10 Gb full-duplex rate support. */
+ # OFPPF_40GB_FD = 1 << 7, /* 40 Gb full-duplex rate support. */
+ # OFPPF_100GB_FD = 1 << 8, /* 100 Gb full-duplex rate support. */
+ # OFPPF_1TB_FD = 1 << 9, /* 1 Tb full-duplex rate support. */
+ # OFPPF_OTHER = 1 << 10, /* Other rate, not in the list. */
+ # OFPPF_COPPER = 1 << 11, /* Copper medium. */
+ # OFPPF_FIBER = 1 << 12, /* Fiber medium. */
+ # OFPPF_AUTONEG = 1 << 13, /* Auto-negotiation. */
+ # OFPPF_PAUSE = 1 << 14, /* Pause. */
+ # OFPPF_PAUSE_ASYM = 1 << 15 /* Asymmetric pause. */
+ # }
+ # TODO: Look into adtran-physical-entities and decode xSFP type any other settings
+ return IetfInterfacesState._get_of_speed(entry) | OFPPF_FIBER
+
+ @staticmethod
+ def _get_of_speed(entry):
+ speed = IetfInterfacesState._get_speed_value(entry)
+ speed_map = {
+ 1000000000: OFPPF_1GB_FD,
+ 10000000000: OFPPF_10GB_FD,
+ 40000000000: OFPPF_40GB_FD,
+ 100000000000: OFPPF_100GB_FD,
+ }
+ # return speed_map.get(speed, OFPPF_OTHER)
+ # TODO: For now, force 100 GB
+ return OFPPF_100GB_FD
+
+ @staticmethod
+ def _get_port_number(name, if_index):
+ import re
+
+ formats = [
+ r'xpon \d/\d{1,2}', # OLT version 3 (Feb 2018++)
+ r'Hundred-Gigabit-Ethernet \d/\d/\d{1,2}', # OLT version 2
+ r'XPON \d/\d/\d{1,2}', # OLT version 2
+ r'hundred-gigabit-ethernet \d/\d{1,2}', # OLT version 1
+ r'channel-termination \d{1,2}', # OLT version 1
+ ]
+ p2 = re.compile(r'\d+')
+
+ for regex in formats:
+ p = re.compile(regex, re.IGNORECASE)
+ match = p.match(name)
+ if match is not None:
+ return int(p2.findall(name)[-1])
+
+ @staticmethod
+ def get_port_entries(rpc_reply, port_type):
+ """
+ Get the port entries that make up the northbound and
+ southbound interfaces
+
+ :param rpc_reply:
+ :param port_type:
+ :return:
+ """
+ ports = dict()
+ result_dict = xmltodict.parse(rpc_reply.data_xml)
+ entries = result_dict['data']['interfaces-state']['interface']
+ if not isinstance(entries, list):
+ entries = [entries]
+ port_entries = [entry for entry in entries if 'name' in entry and
+ port_type.lower() in entry['name'].lower()]
+
+ for entry in port_entries:
+ port = {
+ 'port_no': IetfInterfacesState._get_port_number(entry.get('name'),
+ entry.get('ifindex')),
+ 'name': entry.get('name', 'unknown'),
+ 'ifIndex': entry.get('ifIndex'),
+ # 'label': None,
+ 'mac_address': IetfInterfacesState._get_mac_addr(entry),
+ 'admin_state': IetfInterfacesState._get_admin_state(entry),
+ 'oper_status': IetfInterfacesState._get_oper_status(entry),
+ 'ofp_state': IetfInterfacesState._get_of_state(entry),
+ 'ofp_capabilities': IetfInterfacesState._get_of_capabilities(entry),
+ 'current_speed': IetfInterfacesState._get_of_speed(entry),
+ 'max_speed': IetfInterfacesState._get_of_speed(entry),
+ }
+ port_no = port['port_no']
+ if port_no not in ports:
+ ports[port_no] = port
+ else:
+ ports[port_no].update(port)
+
+ return ports
diff --git a/adapters/adtran_olt/codec/olt_config.py b/adapters/adtran_olt/codec/olt_config.py
new file mode 100644
index 0000000..473e2f6
--- /dev/null
+++ b/adapters/adtran_olt/codec/olt_config.py
@@ -0,0 +1,329 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import structlog
+
+log = structlog.get_logger()
+
+
+class OltConfig(object):
+ """
+ Class to wrap decode of olt container (config) from the ADTRAN
+ gpon-olt-hw.yang YANG model
+ """
+ def __init__(self, packet):
+ self._packet = packet
+ self._pons = None
+
+ def __str__(self):
+ return "OltConfig: {}".format(self.olt_id)
+
+ @property
+ def olt_id(self):
+ """Unique OLT identifier"""
+ return self._packet.get('olt-id', '')
+
+ @property
+ def debug_output(self):
+ """least important level that will output everything"""
+ return self._packet.get('debug-output', 'warning')
+
+ @property
+ def pons(self):
+ if self._pons is None:
+ self._pons = OltConfig.Pon.decode(self._packet.get('pon', None))
+ return self._pons
+
+ class Pon(object):
+ """
+ Provides decode of the PON list from within the olt container
+ """
+ def __init__(self, packet):
+ assert 'pon-id' in packet, 'pon-id not found'
+ self._packet = packet
+ self._onus = None
+
+ def __str__(self):
+ return "OltConfig.Pon: pon-id: {}".format(self.pon_id)
+
+ @staticmethod
+ def decode(pon_list):
+ pons = {}
+
+ if pon_list is not None:
+ for pon_data in pon_list:
+ pon = OltConfig.Pon(pon_data)
+ assert pon.pon_id not in pons
+ pons[pon.pon_id] = pon
+
+ return pons
+
+ @property
+ def pon_id(self):
+ """PON identifier"""
+ return self._packet['pon-id']
+
+ @property
+ def enabled(self):
+ """The desired state of the interface"""
+ return self._packet.get('enabled', False)
+
+ @property
+ def downstream_fec_enable(self):
+ """Enables downstream Forward Error Correction"""
+ return self._packet.get('downstream-fec-enable', False)
+
+ @property
+ def upstream_fec_enable(self):
+ """Enables upstream Forward Error Correction"""
+ return self._packet.get('upstream-fec-enable', False)
+
+ @property
+ def deployment_range(self):
+ """Maximum deployment distance (meters)"""
+ return self._packet.get('deployment-range', 25000)
+
+ @property
+ def onus(self):
+ if self._onus is None:
+ self._onus = OltConfig.Pon.Onu.decode(self._packet.get('onus', None))
+ return self._onus
+
+ class Onu(object):
+ """
+ Provides decode of onu list for a PON port
+ """
+ def __init__(self, packet):
+ assert 'onu-id' in packet, 'onu-id not found'
+ self._packet = packet
+ self._tconts = None
+ self._tconts_dict = None
+ self._gem_ports = None
+ self._gem_ports_dict = None
+
+ def __str__(self):
+ return "OltConfig.Pon.Onu: onu-id: {}".format(self.onu_id)
+
+ @staticmethod
+ def decode(onu_dict):
+ onus = {}
+
+ if onu_dict is not None:
+ if 'onu' in onu_dict:
+ for onu_data in onu_dict['onu']:
+ onu = OltConfig.Pon.Onu(onu_data)
+ assert onu.onu_id not in onus
+ onus[onu.onu_id] = onu
+ elif len(onu_dict) > 0 and 'onu-id' in onu_dict[0]:
+ onu = OltConfig.Pon.Onu(onu_dict[0])
+ assert onu.onu_id not in onus
+ onus[onu.onu_id] = onu
+
+ return onus
+
+ @property
+ def onu_id(self):
+ """The ID used to identify the ONU"""
+ return self._packet['onu-id']
+
+ @property
+ def serial_number_64(self):
+ """The serial number (base-64) is unique for each ONU"""
+ return self._packet.get('serial-number', '')
+
+ @property
+ def password(self):
+ """ONU Password"""
+ return self._packet.get('password', bytes(0))
+
+ @property
+ def enable(self):
+ """If true, places the ONU in service"""
+ return self._packet.get('enable', False)
+
+ @property
+ def tconts(self):
+ if self._tconts is None:
+ self._tconts = OltConfig.Pon.Onu.TCont.decode(self._packet.get('t-conts', None))
+ return self._tconts
+
+ @property
+ def tconts_dict(self): # TODO: Remove if not used
+ if self._tconts_dict is None:
+ self._tconts_dict = {tcont.alloc_id: tcont for tcont in self.tconts.values()}
+ return self._tconts_dict
+
+ @property
+ def gem_ports(self):
+ if self._gem_ports is None:
+ self._gem_ports = OltConfig.Pon.Onu.GemPort.decode(self._packet.get('gem-ports', None))
+ return self._gem_ports
+
+ @property
+ def gem_ports_dict(self): # TODO: Remove if not used
+ if self._gem_ports_dict is None:
+ self._gem_ports_dict = {gem.gem_id: gem for gem in self.gem_ports.values()}
+ return self._gem_ports_dict
+
+ class TCont(object):
+ """
+ Provides decode of the t-cont list from the t-conts container
+ """
+ def __init__(self, packet):
+ assert 'alloc-id' in packet, 'alloc-id not found'
+ self._packet = packet
+ self._traffic_descriptor = None
+ self._best_effort = None
+
+ def __str__(self):
+ return "OltConfig.Pon.Onu.TCont: alloc-id: {}".format(self.alloc_id)
+
+ @staticmethod
+ def decode(tcont_container):
+ tconts = {}
+
+ if tcont_container is not None:
+ for tcont_data in tcont_container.get('t-cont', []):
+ tcont = OltConfig.Pon.Onu.TCont(tcont_data)
+ assert tcont.alloc_id not in tconts
+ tconts[tcont.alloc_id] = tcont
+
+ return tconts
+
+ @property
+ def alloc_id(self):
+ """The ID used to identify the T-CONT"""
+ return self._packet['alloc-id']
+
+ @property
+ def traffic_descriptor(self):
+ """
+ Each Alloc-ID is provisioned with a traffic descriptor that specifies
+ the three bandwidth component parameters: fixed bandwidth, assured
+ bandwidth, and maximum bandwidth, as well as the ternary eligibility
+ indicator for additional bandwidth assignment
+ """
+ if self._traffic_descriptor is None and 'traffic-descriptor' in self._packet:
+ self._traffic_descriptor = OltConfig.Pon.Onu.TCont.\
+ TrafficDescriptor(self._packet['traffic-descriptor'])
+ return self._traffic_descriptor
+
+ class TrafficDescriptor(object):
+ def __init__(self, packet):
+ self._packet = packet
+
+ def __str__(self):
+ return "OltConfig.Pon.Onu.TCont.TrafficDescriptor: {}/{}/{}".\
+ format(self.fixed_bandwidth, self.assured_bandwidth,
+ self.maximum_bandwidth)
+
+ @property
+ def fixed_bandwidth(self):
+ try:
+ return int(self._packet.get('fixed-bandwidth', 0))
+ except (TypeError, ValueError):
+ return 0
+
+ @property
+ def assured_bandwidth(self):
+ try:
+ return int(self._packet.get('assured-bandwidth', 0))
+ except (TypeError, ValueError):
+ return 0
+
+ @property
+ def maximum_bandwidth(self):
+ try:
+ return int(self._packet.get('maximum-bandwidth', 0))
+ except (TypeError, ValueError):
+ return 0
+
+ @property
+ def additional_bandwidth_eligibility(self):
+ return self._packet.get('additional-bandwidth-eligibility', 'none')
+
+ @property
+ def best_effort(self):
+ if self._best_effort is None and 'best-effort' in self._packet:
+ self._best_effort = OltConfig.Pon.Onu.TCont.BestEffort(
+ self._packet['best-effort'])
+ return self._best_effort
+
+ class BestEffort(object):
+ def __init__(self, packet):
+ self._packet = packet
+
+ def __str__(self):
+ return "OltConfig.Pon.Onu.TCont.BestEffort: {}".format(self.bandwidth)
+
+ @property
+ def bandwidth(self):
+ return self._packet['bandwidth']
+
+ @property
+ def priority(self):
+ return self._packet['priority']
+
+ @property
+ def weight(self):
+ return self._packet['weight']
+
+ class GemPort(object):
+ """
+ Provides decode of the gem-port list from the gem-ports container
+ """
+ def __init__(self, packet):
+ assert 'port-id' in packet, 'port-id not found'
+ self._packet = packet
+
+ def __str__(self):
+ return "OltConfig.Pon.Onu.GemPort: port-id: {}/{}".\
+ format(self.port_id, self.alloc_id)
+
+ @staticmethod
+ def decode(gem_port_container):
+ gem_ports = {}
+
+ if gem_port_container is not None:
+ for gem_port_data in gem_port_container.get('gem-port', []):
+ gem_port = OltConfig.Pon.Onu.GemPort(gem_port_data)
+ assert gem_port.port_id not in gem_ports
+ gem_ports[gem_port.port_id] = gem_port
+
+ return gem_ports
+
+ @property
+ def port_id(self):
+ """The ID used to identify the GEM Port"""
+ return self._packet['port-id']
+
+ @property
+ def gem_id(self):
+ """The ID used to identify the GEM Port"""
+ return self.port_id
+
+ @property
+ def alloc_id(self):
+ """The Alloc-ID of the T-CONT to which this GEM port is mapped"""
+ return self._packet['alloc-id']
+
+ @property
+ def omci_transport(self):
+ """If true, this GEM port is used to transport the OMCI virtual connection"""
+ return self._packet.get('omci-transport', False)
+
+ @property
+ def encryption(self):
+ """If true, enable encryption using the Advanced Encryption Standard (AES)"""
+ return self._packet.get('encryption', False)
diff --git a/adapters/adtran_olt/codec/olt_state.py b/adapters/adtran_olt/codec/olt_state.py
new file mode 100644
index 0000000..74413a4
--- /dev/null
+++ b/adapters/adtran_olt/codec/olt_state.py
@@ -0,0 +1,290 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import structlog
+
+log = structlog.get_logger()
+
+
+class OltState(object):
+ """
+ Class to wrap decode of olt-state container from the ADTRAN
+ gpon-olt-hw.yang YANG model
+ """
+
+ def __init__(self, packet):
+ self._packet = packet
+ self._pons = None
+
+ def __str__(self):
+ return "OltState: {}".format(self.software_version)
+
+ @property
+ def software_version(self):
+ """The software version of olt driver"""
+ return self._packet.get('software-version', '')
+
+ @property
+ def pons(self):
+ if self._pons is None:
+ self._pons = OltState.Pon.decode(self._packet.get('pon', []))
+ return self._pons
+
+ #############################################################
+ # Act like a container for simple access into PON list
+
+ def __len__(self):
+ return len(self.pons)
+
+ def __getitem__(self, key):
+ if not isinstance(key, int):
+ raise TypeError('Key should be of type int')
+ if key not in self.pons:
+ raise KeyError("key '{}' not found".format(key))
+
+ return self.pons[key]
+
+ def __iter__(self):
+ raise NotImplementedError('TODO: Not yet implemented')
+
+ def __contains__(self, item):
+ if not isinstance(item, int):
+ raise TypeError('Item should be of type int')
+ return item in self.pons
+
+ # TODO: Look at generator support and if it is useful
+
+ class Pon(object):
+ """
+ Provides decode of the PON list from within the olt-state container
+ """
+ def __init__(self, packet):
+ assert 'pon-id' in packet
+ self._packet = packet
+ self._onus = None
+ self._gems = None
+
+ def __str__(self):
+ return "OltState.Pon: pon-id: {}".format(self.pon_id)
+
+ @staticmethod
+ def decode(pon_list):
+ # log.debug('Decoding PON List:{}{}'.format(os.linesep,
+ # pprint.PrettyPrinter().pformat(pon_list)))
+ pons = {}
+ for pon_data in pon_list:
+ pon = OltState.Pon(pon_data)
+ assert pon.pon_id not in pons
+ pons[pon.pon_id] = pon
+
+ return pons
+
+ @property
+ def pon_id(self):
+ """PON identifier"""
+ return self._packet['pon-id']
+
+ @property
+ def downstream_wavelength(self):
+ """The wavelength, in nanometers, being used in the downstream direction"""
+ return self._packet.get('downstream-wavelength', 0)
+
+ @property
+ def upstream_wavelength(self):
+ """The wavelength, in nanometers, being used in the upstream direction"""
+ return self._packet.get('upstream-wavelength', 0)
+
+ @property
+ def downstream_channel_id(self):
+ """Downstream wavelength channel identifier associated with this PON."""
+ return self._packet.get('downstream-channel-id', 0)
+
+ @property
+ def rx_packets(self):
+ """Sum all of the RX Packets of GEM ports that are not base TCONT's"""
+ return int(self._packet.get('rx-packets', 0))
+
+ @property
+ def tx_packets(self):
+ """Sum all of the TX Packets of GEM ports that are not base TCONT's"""
+ return int(self._packet.get('tx-packets', 0))
+
+ @property
+ def rx_bytes(self):
+ """Sum all of the RX Octets of GEM ports that are not base TCONT's"""
+ return int(self._packet.get('rx-bytes', 0))
+
+ @property
+ def tx_bytes(self):
+ """Sum all of the TX Octets of GEM ports that are not base TCONT's"""
+ return int(self._packet.get('tx-bytes', 0))
+
+ @property
+ def tx_bip_errors(self):
+ """Sum the TX ONU bip errors to get TX BIP's per PON"""
+ return int(self._packet.get('tx-bip-errors', 0))
+
+ @property
+ def wm_tuned_out_onus(self):
+ """
+ bit array indicates the list of tuned out ONU's that are in wavelength
+ mobility protecting state.
+ onu-bit-octects:
+ type binary { length "4 .. 1024"; }
+ description each bit position indicates corresponding ONU's status
+ (true or false) whether that ONU's is in
+ wavelength mobility protecting state or not
+ For 128 ONTs per PON, the size of this
+ array will be 16. onu-bit-octects[0] and MSB bit in that byte
+ represents ONU 0 etc.
+ """
+ return self._packet.get('wm-tuned-out-onus', bytes(0))
+
+ @property
+ def ont_los(self):
+ """List of configured ONTs that have been previously discovered and are in a loss of signal (LOS) state"""
+ return self._packet.get('ont-los', [])
+
+ @property
+ def discovered_onu(self):
+ """
+ Immutable Set of each Optical Network Unit(ONU) that has been activated via discovery
+ key/value: serial-number (string)
+ """
+ return frozenset([sn['serial-number'] for sn in self._packet.get('discovered-onu', [])
+ if 'serial-number' in sn and sn['serial-number'] != 'AAAAAAAAAAA='])
+
+ @property
+ def gems(self):
+ """This list is not in the proposed BBF model, the stats are part of ietf-interfaces"""
+ if self._gems is None:
+ self._gems = OltState.Pon.Gem.decode(self._packet.get('gem', []))
+ return self._gems
+
+ @property
+ def onus(self):
+ """
+ The map of each Optical Network Unit(ONU). Key: ONU ID (int)
+ """
+ if self._onus is None:
+ self._onus = OltState.Pon.Onu.decode(self._packet.get('onu', []))
+ return self._onus
+
+ class Onu(object):
+ """
+ Provides decode of onu list for a PON port
+ """
+ def __init__(self, packet):
+ assert 'onu-id' in packet, 'onu-id not found in packet'
+ self._packet = packet
+
+ def __str__(self):
+ return "OltState.Pon.Onu: onu-id: {}".format(self.onu_id)
+
+ @staticmethod
+ def decode(onu_list):
+ # log.debug('onus:{}{}'.format(os.linesep,
+ # pprint.PrettyPrinter().pformat(onu_list)))
+ onus = {}
+ for onu_data in onu_list:
+ onu = OltState.Pon.Onu(onu_data)
+ assert onu.onu_id not in onus
+ onus[onu.onu_id] = onu
+
+ return onus
+
+ @property
+ def onu_id(self):
+ """The ID used to identify the ONU"""
+ return self._packet['onu-id']
+
+ @property
+ def oper_status(self):
+ """The operational state of each ONU"""
+ return self._packet.get('oper-status', 'unknown')
+
+ @property
+ def reported_password(self):
+ """The password reported by the ONU (binary)"""
+ return self._packet.get('reported-password', bytes(0))
+
+ @property
+ def rssi(self):
+ """The received signal strength indication of the ONU"""
+ return self._packet.get('rssi', -9999)
+
+ @property
+ def equalization_delay(self):
+ """Equalization delay (bits)"""
+ return self._packet.get('equalization-delay', 0)
+
+ @property
+ def fiber_length(self):
+ """Distance to ONU"""
+ return self._packet.get('fiber-length', 0)
+
+ class Gem(object):
+ """
+ Provides decode of the gem list for a PON port
+ """
+ def __init__(self, packet):
+ assert 'onu-id' in packet, 'onu-id not found in packet'
+ assert 'port-id' in packet, 'port-id not found in packet'
+ assert 'alloc-id' in packet, 'alloc-id not found in packet'
+ self._packet = packet
+
+ def __str__(self):
+ return "OltState.Pon.Gem: onu-id: {}, gem-id: {}".\
+ format(self.onu_id, self.gem_id)
+
+ @staticmethod
+ def decode(gem_list):
+ # log.debug('gems:{}{}'.format(os.linesep,
+ # pprint.PrettyPrinter().pformat(gem_list)))
+ gems = {}
+ for gem_data in gem_list:
+ gem = OltState.Pon.Gem(gem_data)
+ assert gem.gem_id not in gems
+ gems[gem.gem_id] = gem
+
+ return gems
+
+ @property
+ def onu_id(self):
+ """The ID used to identify the ONU"""
+ return self._packet['onu-id']
+
+ @property
+ def alloc_id(self):
+ return self._packet['alloc-id']
+
+ @property
+ def gem_id(self):
+ return self._packet['port-id']
+
+ @property
+ def tx_packets(self):
+ return int(self._packet.get('tx-packets', 0))
+
+ @property
+ def tx_bytes(self):
+ return int(self._packet.get('tx-bytes', 0))
+
+ @property
+ def rx_packets(self):
+ return int(self._packet.get('rx-packets', 0))
+
+ @property
+ def rx_bytes(self):
+ return int(self._packet.get('rx-bytes', 0))
diff --git a/adapters/adtran_olt/codec/physical_entities_state.py b/adapters/adtran_olt/codec/physical_entities_state.py
new file mode 100644
index 0000000..47187c9
--- /dev/null
+++ b/adapters/adtran_olt/codec/physical_entities_state.py
@@ -0,0 +1,80 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from adapters.adtran_common.net.adtran_netconf import adtran_module_url
+from twisted.internet.defer import inlineCallbacks, returnValue
+import xmltodict
+import structlog
+
+log = structlog.get_logger()
+
+_phys_entities_rpc = """
+ <filter xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ <physical-entities-state xmlns="{}">
+ <physical-entity/>
+ </physical-entities-state>
+ </filter>
+ """.format(adtran_module_url('adtran-physical-entities'))
+
+
+class PhysicalEntitiesState(object):
+ def __init__(self, session):
+ self._session = session
+ self._rpc_reply = None
+
+ @inlineCallbacks
+ def get_state(self):
+ self._rpc_reply = None
+ request = self._session.get(_phys_entities_rpc)
+ self._rpc_reply = yield request
+ returnValue(self._rpc_reply)
+
+ @property
+ def physical_entities(self):
+ """
+ :return: (list) of OrderedDict physical entities
+ """
+ if self._rpc_reply is None:
+ # TODO: Support auto-get?
+ return None
+
+ result_dict = xmltodict.parse(self._rpc_reply.data_xml)
+ return result_dict['data']['physical-entities-state']['physical-entity']
+
+ def get_physical_entities(self, classification=None):
+ """
+ Get the physical entities of a particular type
+ :param classification: (String or List) The classification or general hardware type of the
+ component identified by this physical entity
+ (case-insensitive)
+ :return: (list) of OrderedDict physical entities
+ """
+ entries = self.physical_entities
+
+ if classification is None:
+ return entries
+
+ # for entry in entries:
+ # import pprint
+ # log.info(pprint.PrettyPrinter(indent=2).pformat(entry))
+
+ def _matches(entry, value):
+ if 'classification' in entry and '#text' in entry['classification']:
+ text_val = entry['classification']['#text'].lower()
+ if isinstance(value, list):
+ return any(v.lower() in text_val for v in value)
+ return value.lower() in text_val
+ return False
+
+ return [entry for entry in entries if _matches(entry, classification)]
diff --git a/adapters/adtran_olt/main.py b/adapters/adtran_olt/main.py
new file mode 100755
index 0000000..07bcc07
--- /dev/null
+++ b/adapters/adtran_olt/main.py
@@ -0,0 +1,558 @@
+#!/usr/bin/env python
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Adtran OLT Adapter main entry point"""
+
+import argparse
+import os
+import time
+
+import arrow
+import yaml
+from packaging.version import Version
+from simplejson import dumps
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.task import LoopingCall
+from zope.interface import implementer
+
+from pyvoltha.common.structlog_setup import setup_logging, update_logging
+from pyvoltha.common.utils.asleep import asleep
+from pyvoltha.common.utils.deferred_utils import TimeOutError
+from pyvoltha.common.utils.dockerhelpers import get_my_containers_name
+from pyvoltha.common.utils.nethelpers import get_my_primary_local_ipv4, \
+ get_my_primary_interface
+from pyvoltha.common.utils.registry import registry, IComponent
+from pyvoltha.adapters.kafka.adapter_proxy import AdapterProxy
+from pyvoltha.adapters.kafka.adapter_request_facade import AdapterRequestFacade
+from pyvoltha.adapters.kafka.core_proxy import CoreProxy
+from pyvoltha.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
+ get_messaging_proxy
+from pyvoltha.adapters.kafka.kafka_proxy import KafkaProxy, get_kafka_proxy
+from adtran_olt import AdtranOltAdapter
+from pyvoltha.protos import third_party
+from pyvoltha.protos.adapter_pb2 import AdapterConfig
+
+_ = third_party
+
+
+defs = dict(
+ version_file='./VERSION',
+ config=os.environ.get('CONFIG', './adapters-adtran_olt.yml'),
+ container_name_regex=os.environ.get('CONTAINER_NUMBER_EXTRACTOR', '^.*\.(['
+ '0-9]+)\..*$'),
+ consul=os.environ.get('CONSUL', 'localhost:8500'),
+ name=os.environ.get('NAME', 'adtran_olt'),
+ vendor=os.environ.get('VENDOR', 'Voltha Project'),
+ device_type=os.environ.get('DEVICE_TYPE', 'adtran_olt'),
+ accept_bulk_flow=os.environ.get('ACCEPT_BULK_FLOW', True),
+ accept_atomic_flow=os.environ.get('ACCEPT_ATOMIC_FLOW', True),
+ etcd=os.environ.get('ETCD', 'localhost:2379'),
+ core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
+ interface=os.environ.get('INTERFACE', get_my_primary_interface()),
+ instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
+ kafka_adapter=os.environ.get('KAFKA_ADAPTER', '172.20.10.3:9092'),
+ kafka_cluster=os.environ.get('KAFKA_CLUSTER', '172.20.10.3:9092'),
+ backend=os.environ.get('BACKEND', 'none'),
+ retry_interval=int(os.environ.get('RETRY_INTERVAL', 2)),
+ heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
+
+ # Following are for debugging
+ debug_enabled=True,
+ debug_host='work.bcsw.net',
+ # debug_host='10.0.2.15',
+ debug_port=8765,
+)
+
+
+def parse_args():
+ parser = argparse.ArgumentParser()
+
+ _help = ('Path to adapters-adtran_olt.yml config file (default: %s). '
+ 'If relative, it is relative to main.py of Adtran OLT adapter.'
+ % defs['config'])
+ parser.add_argument('-c', '--config',
+ dest='config',
+ action='store',
+ default=defs['config'],
+ help=_help)
+
+ _help = 'Regular expression for extracting container number from ' \
+ 'container name (default: %s)' % defs['container_name_regex']
+ parser.add_argument('-X', '--container-number-extractor',
+ dest='container_name_regex',
+ action='store',
+ default=defs['container_name_regex'],
+ help=_help)
+
+ _help = '<hostname>:<port> to consul agent (default: %s)' % defs['consul']
+ parser.add_argument('-C', '--consul',
+ dest='consul',
+ action='store',
+ default=defs['consul'],
+ help=_help)
+
+ _help = 'name of this adapter (default: %s)' % defs['name']
+ parser.add_argument('-na', '--name',
+ dest='name',
+ action='store',
+ default=defs['name'],
+ help=_help)
+
+ _help = 'vendor of this adapter (default: %s)' % defs['vendor']
+ parser.add_argument('-ven', '--vendor',
+ dest='vendor',
+ action='store',
+ default=defs['vendor'],
+ help=_help)
+
+ _help = 'supported device type of this adapter (default: %s)' % defs[
+ 'device_type']
+ parser.add_argument('-dt', '--device_type',
+ dest='device_type',
+ action='store',
+ default=defs['device_type'],
+ help=_help)
+
+ _help = 'specifies whether the device type accepts bulk flow updates ' \
+ '(default: %s)' % defs['accept_bulk_flow']
+ parser.add_argument('-abf', '--accept_bulk_flow',
+ dest='accept_bulk_flow',
+ action='store',
+ default=defs['accept_bulk_flow'],
+ help=_help)
+
+ _help = 'specifies whether the device type accepts add/remove flow ' \
+ '(default: %s)' % defs['accept_atomic_flow']
+ parser.add_argument('-aaf', '--accept_atomic_flow',
+ dest='accept_atomic_flow',
+ action='store',
+ default=defs['accept_atomic_flow'],
+ help=_help)
+
+ _help = '<hostname>:<port> to etcd server (default: %s)' % defs['etcd']
+ parser.add_argument('-e', '--etcd',
+ dest='etcd',
+ action='store',
+ default=defs['etcd'],
+ help=_help)
+
+ _help = ('unique string id of this container instance (default: %s)'
+ % defs['instance_id'])
+ parser.add_argument('-i', '--instance-id',
+ dest='instance_id',
+ action='store',
+ default=defs['instance_id'],
+ help=_help)
+
+ _help = 'ETH interface to receive (default: %s)' % defs['interface']
+ parser.add_argument('-I', '--interface',
+ dest='interface',
+ action='store',
+ default=defs['interface'],
+ help=_help)
+
+ _help = 'omit startup banner log lines'
+ parser.add_argument('-n', '--no-banner',
+ dest='no_banner',
+ action='store_true',
+ default=False,
+ help=_help)
+
+ _help = 'do not emit periodic heartbeat log messages'
+ parser.add_argument('-N', '--no-heartbeat',
+ dest='no_heartbeat',
+ action='store_true',
+ default=False,
+ help=_help)
+
+ _help = "suppress debug and info logs"
+ parser.add_argument('-q', '--quiet',
+ dest='quiet',
+ action='count',
+ help=_help)
+
+ _help = 'enable verbose logging'
+ parser.add_argument('-v', '--verbose',
+ dest='verbose',
+ action='count',
+ help=_help)
+
+ _help = ('use docker container name as container instance id'
+ ' (overrides -i/--instance-id option)')
+ parser.add_argument('--instance-id-is-container-name',
+ dest='instance_id_is_container_name',
+ action='store_true',
+ default=False,
+ help=_help)
+
+ _help = ('<hostname>:<port> of the kafka adapter broker (default: %s). '
+ 'If not '
+ 'specified (None), the address from the config file is used.'
+ % defs['kafka_adapter'])
+ parser.add_argument('-KA', '--kafka_adapter',
+ dest='kafka_adapter',
+ action='store',
+ default=defs['kafka_adapter'],
+ help=_help)
+
+ _help = ('<hostname>:<port> of the kafka cluster broker (default: %s). '
+ 'If not '
+ 'specified (None), the address from the config file is used.'
+ % defs['kafka_cluster'])
+ parser.add_argument('-KC', '--kafka_cluster',
+ dest='kafka_cluster',
+ action='store',
+ default=defs['kafka_cluster'],
+ help=_help)
+
+ _help = 'backend to use for config persistence'
+ parser.add_argument('-b', '--backend',
+ default=defs['backend'],
+ choices=['none', 'consul', 'etcd'],
+ help=_help)
+
+ _help = 'topic of core on the kafka bus'
+ parser.add_argument('-ct', '--core_topic',
+ dest='core_topic',
+ action='store',
+ default=defs['core_topic'],
+ help=_help)
+
+ _help = 'Enable remote python debug'
+ parser.add_argument('-de', '--debug_enabled',
+ dest='debug_enabled',
+ action='store_true',
+ default=defs['debug_enabled'],
+ help=_help)
+
+ _help = 'remote debug hostname or IP address'
+ parser.add_argument('-dh', '--debug_host',
+ dest='debug_host',
+ action='store',
+ default=defs['debug_host'],
+ help=_help)
+
+ _help = 'remote debug port number'
+ parser.add_argument('-dp', '--debug_port',
+ dest='debug_port',
+ action='store',
+ default=defs['debug_port'],
+ help=_help)
+
+ args = parser.parse_args()
+
+ # post-processing
+
+ if args.instance_id_is_container_name:
+ args.instance_id = get_my_containers_name()
+
+ return args
+
+
+def setup_remote_debug(host, port, logger):
+ try:
+ import sys
+ sys.path.append('/voltha/pydevd/pycharm-debug.egg')
+ import pydevd
+ # Initial breakpoint
+
+ pydevd.settrace(host, port=port, stdoutToServer=True, stderrToServer=True, suspend=False)
+
+ except ImportError:
+ logger.error('Error importing pydevd package')
+ logger.error('REMOTE DEBUGGING will not be supported in this run...')
+ # Continue on, you do not want to completely kill VOLTHA, you just need to fix it.
+
+ except AttributeError:
+ logger.error('Attribute error. Perhaps try to explicitly set PYTHONPATH to '
+ 'pydevd directory and run again?')
+ logger.error('REMOTE DEBUGGING will not be supported in this run...')
+ # Continue on, you do not want to completely kill VOLTHA, you just need to fix it.
+
+ except:
+ import sys
+ logger.error("pydevd startup exception: %s" % sys.exc_info()[0])
+ logger.error('REMOTE DEBUGGING will not be supported in this run...')
+
+
+def load_config(args):
+ path = args.config
+ if path.startswith('.'):
+ dir = os.path.dirname(os.path.abspath(__file__))
+ path = os.path.join(dir, path)
+ path = os.path.abspath(path)
+ with open(path) as fd:
+ config = yaml.safe_load(fd)
+ return config
+
+
+def print_banner(log):
+ log.info(" _____ _______ _____ _ _ ____ _ _______ ")
+ log.info(" /\ | __ \__ __| __ \ /\ | \ | | / __ \| | |__ __|")
+ log.info(" / \ | | | | | | | |__) | / \ | \| | | | | | | | | ")
+ log.info(" / /\ \ | | | | | | | _ / / /\ \ | . ` | | | | | | | | ")
+ log.info(" / ____ \| |__| | | | | | \ \ / ____ \| |\ | | |__| | |____| | ")
+ log.info(" /_/ \_\_____/ |_| |_| _\_\/_/ \_\_| \_| \____/|______|_| ")
+ log.info(" /\ | | | | ")
+ log.info(" / \ __| | __ _ _ __ | |_ ___ _ __ ")
+ log.info(" / /\ \ / _` |/ _` | '_ \| __/ _ \ '__| ")
+ log.info(" / ____ \ (_| | (_| | |_) | || __/ | ")
+ log.info(" /_/ \_\__,_|\__,_| .__/ \__\___|_| ")
+ log.info(" | | ")
+ log.info(" |_| ")
+ log.info("")
+ log.info('(to stop: press Ctrl-C)')
+
+
+@implementer(IComponent)
+class Main(object):
+
+ def __init__(self):
+
+ self.args = args = parse_args()
+ self.config = load_config(args)
+
+ verbosity_adjust = (args.verbose or 0) - (args.quiet or 0)
+ self.log = setup_logging(self.config.get('logging', {}),
+ args.instance_id,
+ verbosity_adjust=verbosity_adjust)
+ self.log.info('container-number-extractor',
+ regex=args.container_name_regex)
+
+ if args.debug_enabled:
+ setup_remote_debug(args.debug_host, args.debug_port, self.log)
+
+ self.adtran_olt_adapter_version = self.get_version()
+ self.log.info('ADTRAN-OLT-Adapter-Version', version=self.adtran_olt_adapter_version)
+
+ if not args.no_banner:
+ print_banner(self.log)
+
+ self.adapter = None
+ self.core_proxy = None
+ self.adapter_proxy = None
+
+ # Create a unique instance id using the passed-in instance id and
+ # UTC timestamp
+ current_time = arrow.utcnow().timestamp
+ self.instance_id = self.args.instance_id + '_' + str(current_time)
+
+ self.core_topic = args.core_topic
+ self.listening_topic = args.name
+ self.startup_components()
+
+ if not args.no_heartbeat:
+ self.start_heartbeat()
+ self.start_kafka_cluster_heartbeat(self.instance_id)
+
+ def get_version(self):
+ path = defs['version_file']
+ if not path.startswith('/'):
+ dir = os.path.dirname(os.path.abspath(__file__))
+ path = os.path.join(dir, path)
+
+ path = os.path.abspath(path)
+ version_file = open(path, 'r')
+ v = version_file.read()
+
+ # Use Version to validate the version string - exception will be raised
+ # if the version is invalid
+ Version(v)
+
+ version_file.close()
+ return v
+
+ def start(self):
+ self.start_reactor() # will not return except Keyboard interrupt
+
+ def stop(self):
+ pass
+
+ def get_args(self):
+ """Allow access to command line args"""
+ return self.args
+
+ def get_config(self):
+ """Allow access to content of config file"""
+ return self.config
+
+ def _get_adapter_config(self):
+ cfg = AdapterConfig()
+ return cfg
+
+ @inlineCallbacks
+ def startup_components(self):
+ try:
+ self.log.info('starting-internal-components',
+ consul=self.args.consul,
+ etcd=self.args.etcd)
+
+ registry.register('main', self)
+
+ # Update the logger to output the vcore id.
+ self.log = update_logging(instance_id=self.instance_id,
+ vcore_id=None)
+
+ yield registry.register(
+ 'kafka_cluster_proxy',
+ KafkaProxy(
+ self.args.consul,
+ self.args.kafka_cluster,
+ config=self.config.get('kafka-cluster-proxy', {})
+ )
+ ).start()
+
+ config = self._get_adapter_config()
+
+ self.core_proxy = CoreProxy(
+ kafka_proxy=None,
+ core_topic=self.core_topic,
+ my_listening_topic=self.listening_topic)
+
+ self.adapter_proxy = AdapterProxy(
+ kafka_proxy=None,
+ core_topic=self.core_topic,
+ my_listening_topic=self.listening_topic)
+
+ self.adapter = AdtranOltAdapter(core_proxy=self.core_proxy,
+ adapter_proxy=self.adapter_proxy,
+ config=config)
+
+ adtran_request_handler = AdapterRequestFacade(adapter=self.adapter)
+
+ yield registry.register(
+ 'kafka_adapter_proxy',
+ IKafkaMessagingProxy(
+ kafka_host_port=self.args.kafka_adapter,
+ # TODO: Add KV Store object reference
+ kv_store=self.args.backend,
+ default_topic=self.args.name,
+ group_id_prefix=self.args.instance_id,
+ target_cls=adtran_request_handler
+ )
+ ).start()
+
+ self.core_proxy.kafka_proxy = get_messaging_proxy()
+ self.adapter_proxy.kafka_proxy = get_messaging_proxy()
+
+ # retry forever
+ res = yield self._register_with_core(-1)
+
+ self.log.info('started-internal-services')
+
+ except Exception as e:
+ self.log.exception('Failure-to-start-all-components', e=e)
+
+ @inlineCallbacks
+ def shutdown_components(self):
+ """Execute before the reactor is shut down"""
+ self.log.info('exiting-on-keyboard-interrupt')
+ for component in reversed(registry.iterate()):
+ yield component.stop()
+
+ import threading
+ self.log.info('THREADS:')
+ main_thread = threading.current_thread()
+ for t in threading.enumerate():
+ if t is main_thread:
+ continue
+ if not t.isDaemon():
+ continue
+ self.log.info('joining thread {} {}'.format(
+ t.getName(), "daemon" if t.isDaemon() else "not-daemon"))
+ t.join()
+
+ def start_reactor(self):
+ from twisted.internet import reactor
+ reactor.callWhenRunning(
+ lambda: self.log.info('twisted-reactor-started'))
+ reactor.addSystemEventTrigger('before', 'shutdown',
+ self.shutdown_components)
+ reactor.run()
+
+ @inlineCallbacks
+ def _register_with_core(self, retries):
+ while 1:
+ try:
+ resp = yield self.core_proxy.register(
+ self.adapter.adapter_descriptor(),
+ self.adapter.device_types())
+ if resp:
+ self.log.info('registered-with-core',
+ coreId=resp.instance_id)
+ returnValue(resp)
+ except TimeOutError as e:
+ self.log.warn("timeout-when-registering-with-core", e=e)
+ if retries == 0:
+ self.log.exception("no-more-retries", e=e)
+ raise
+ else:
+ retries = retries if retries < 0 else retries - 1
+ yield asleep(defs['retry_interval'])
+ except Exception as e:
+ self.log.exception("failed-registration", e=e)
+ raise
+
+ def start_heartbeat(self):
+
+ t0 = time.time()
+ t0s = time.ctime(t0)
+
+ def heartbeat():
+ self.log.debug(status='up', since=t0s, uptime=time.time() - t0)
+
+ lc = LoopingCall(heartbeat)
+ lc.start(10)
+
+ # Temporary function to send a heartbeat message to the external kafka
+ # broker
+ def start_kafka_cluster_heartbeat(self, instance_id):
+ # For heartbeat we will send a message to the topic configured in
+ # defs['heartbeat_topic']. The message is a JSON-encoded
+ # dictionary
+ message = dict(
+ type='heartbeat',
+ adapter=self.args.name,
+ instance=instance_id,
+ ip=get_my_primary_local_ipv4()
+ )
+ topic = defs['heartbeat_topic']
+
+ def send_msg(start_time):
+ try:
+ kafka_cluster_proxy = get_kafka_proxy()
+ if kafka_cluster_proxy and not kafka_cluster_proxy.is_faulty():
+ # self.log.debug('kafka-proxy-available')
+ message['ts'] = arrow.utcnow().timestamp
+ message['uptime'] = time.time() - start_time
+ # self.log.debug('start-kafka-heartbeat')
+ kafka_cluster_proxy.send_message(topic, dumps(message))
+ else:
+ self.log.error('kafka-proxy-unavailable')
+ except Exception as err:
+ self.log.exception('failed-sending-message-heartbeat', e=err)
+
+ try:
+ t0 = time.time()
+ lc = LoopingCall(send_msg, t0)
+ lc.start(10)
+ except Exception as e:
+ self.log.exception('failed-kafka-heartbeat', e=e)
+
+
+if __name__ == '__main__':
+ Main().start()
diff --git a/adapters/adtran_olt/net/__init__.py b/adapters/adtran_olt/net/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/adapters/adtran_olt/net/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/adapters/adtran_olt/net/pio_zmq.py b/adapters/adtran_olt/net/pio_zmq.py
new file mode 100644
index 0000000..d50b686
--- /dev/null
+++ b/adapters/adtran_olt/net/pio_zmq.py
@@ -0,0 +1,126 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import random
+from adapters.adtran_common.net.adtran_zmq import AdtranZmqClient
+from enum import IntEnum
+
+DEFAULT_PIO_TCP_PORT = 5555
+#DEFAULT_PIO_TCP_PORT = 5657
+
+
+class PioClient(AdtranZmqClient):
+ """
+ Adtran ZeroMQ Client for packet in/out service
+ """
+ def __init__(self, ip_address, rx_callback, port):
+ super(PioClient, self).__init__(ip_address, rx_callback, port)
+ self._seq_number = random.randint(1, 2**32)
+
+ class UrlType(IntEnum):
+ PACKET_IN = 0 # Packet In
+ PACKET_OUT = 1 # Packet Out
+ EVCMAPS_REQUEST = 2 # EVC-MAPs request
+ EVCMAPS_RESPONSE = 3 # EVC-MAPs response
+ UNKNOWN = 4 # UNKNOWN URL
+
+ def get_url_type(self, packet):
+ url_type = PioClient.UrlType.UNKNOWN
+ message = json.loads(packet)
+ if 'url' in message:
+ if message['url'] == 'adtran-olt-of-control/packet-in':
+ url_type = PioClient.UrlType.PACKET_IN
+ elif message['url'] == 'adtran-olt-of-control/packet-out':
+ url_type = PioClient.UrlType.PACKET_OUT
+ elif message['url'] == 'adtran-olt-of-control/evc-map-response':
+ url_type = PioClient.UrlType.EVCMAPS_RESPONSE
+ elif message['url'] == 'adtran-olt-of-control/evc-map-request':
+ url_type = PioClient.UrlType.EVCMAPS_REQUEST
+ return url_type
+
+ def decode_packet(self, packet):
+ from scapy.layers.l2 import Ether
+ try:
+ message = json.loads(packet)
+ self.log.debug('message', message=message)
+
+ for field in ['url', 'evc-map-name', 'total-len', 'port-number', 'message-contents']:
+ assert field in message, "Missing field '{}' in received packet".format(field)
+
+ decoded = message['message-contents'].decode('base64')
+
+ assert len(decoded) == message['total-len'], \
+ 'Decoded length ({}) != Message Encoded length ({})'.\
+ format(len(decoded), message['total-len'])
+
+ return int(message['port-number']), message['evc-map-name'], Ether(decoded)
+
+ except Exception as e:
+ self.log.exception('decode', e=e)
+ raise
+
+ @property
+ def sequence_number(self):
+ if self._seq_number >= 2**32:
+ self._seq_number = 0
+ else:
+ self._seq_number += 1
+
+ return self._seq_number
+
+ def encode_packet(self, egress_port, packet, map_name='TODO', exception_type=''):
+ """
+ Encode a message for transmission as a Packet Out
+ :param egress_port: (int) egress physical port number
+ :param packet: (str) actual message
+ :param map_name: (str) EVC-MAP Name
+ :param exception_type: (str) Type of exception
+ """
+ return json.dumps({
+ 'url': 'adtran-olt-of-control/packet-out',
+ 'buffer-id': self.sequence_number,
+ 'total-len': len(packet),
+ 'evc-map-name': map_name,
+ 'exception-type': exception_type,
+ 'port-number': egress_port,
+ 'message-contents': packet.encode('base64')
+ })
+
+ def query_request_packet(self):
+ """
+ Create query-request to get all installed exceptions
+ :return: Request string
+ """
+ return json.dumps({
+ 'url': 'adtran-olt-of-control/evc-map-request'
+ })
+
+ def decode_query_response_packet(self, packet, map_name=None):
+ """
+ Decode a query-response containing all installed exceptions
+ :param map_name: (str) EVC-MAP Name (None=all)
+ :param packet: returned query response packet
+ :return: list of evcmaps and associated exceptions
+ """
+ from scapy.layers.l2 import Ether
+ message = json.loads(packet)
+ self.log.debug('message', message=message)
+
+ if 'url' in message and message['url'] == 'adtran-olt-of-control/evc-map-response':
+ maps = message.get('evc-map-list')
+ if maps is not None:
+ self.log.debug('evc-maps-query-response', maps=maps)
+ return maps
+ return []
diff --git a/adapters/adtran_olt/net/pon_zmq.py b/adapters/adtran_olt/net/pon_zmq.py
new file mode 100644
index 0000000..aa42917
--- /dev/null
+++ b/adapters/adtran_olt/net/pon_zmq.py
@@ -0,0 +1,61 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import struct
+import binascii
+from adapters.adtran_common.net.adtran_zmq import AdtranZmqClient
+
+DEFAULT_PON_AGENT_TCP_PORT = 5656
+
+
+class PonClient(AdtranZmqClient):
+ """
+ Adtran ZeroMQ Client for PON Agent service
+ """
+ def __init__(self, ip_address, rx_callback, port):
+ super(PonClient, self).__init__(ip_address, rx_callback, port)
+
+ def encode_omci_packet(self, msg, pon_index, onu_id):
+ """
+ Create an OMCI Tx Packet for the specified ONU
+
+ :param msg: (str) OMCI message to send
+ :param pon_index: (unsigned int) PON Port index
+ :param onu_id: (unsigned int) ONU ID
+
+ :return: (bytes) octet string to send
+ """
+ return json.dumps({"operation": "NOTIFY",
+ "url": "adtran-olt-pon-control/omci-message",
+ "pon-id": pon_index,
+ "onu-id": onu_id,
+ "message-contents": msg.decode("hex").encode("base64")
+ })
+
+ def decode_packet(self, packet):
+ """
+ Decode the PON-Agent packet provided by the ZMQ client
+
+ :param packet: (bytes) Packet
+ :return: (long, long, bytes, boolean) PON Index, ONU ID, Frame Contents (OMCI or Ethernet),\
+ and a flag indicating if it is OMCI
+ """
+ msg = json.loads(packet)
+ pon_id = msg['pon-id']
+ onu_id = msg['onu-id']
+ msg_data = msg['message-contents'].decode("base64")
+ is_omci = msg['operation'] == "NOTIFY" and 'omci-message' in msg['url']
+
+ return pon_id, onu_id, msg_data, is_omci
diff --git a/adapters/adtran_olt/nni_port.py b/adapters/adtran_olt/nni_port.py
new file mode 100644
index 0000000..af11e9b
--- /dev/null
+++ b/adapters/adtran_olt/nni_port.py
@@ -0,0 +1,457 @@
+#
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import random
+import arrow
+
+import structlog
+import xmltodict
+from adapters.adtran_common.port import AdtnPort
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue, succeed, fail
+from twisted.python.failure import Failure
+from pyvoltha.protos.common_pb2 import OperStatus, AdminState
+from pyvoltha.protos.device_pb2 import Port
+from pyvoltha.protos.logical_device_pb2 import LogicalPort
+from pyvoltha.protos.openflow_13_pb2 import OFPPF_100GB_FD, OFPPF_FIBER, OFPPS_LIVE, ofp_port
+
+
+class NniPort(AdtnPort):
+ """
+ Northbound network port, often Ethernet-based
+ """
+ def __init__(self, parent, **kwargs):
+ super(NniPort, self).__init__(parent, **kwargs)
+
+ # TODO: Weed out those properties supported by common 'Port' object
+
+ self.log = structlog.get_logger(port_no=kwargs.get('port_no'))
+ self.log.info('creating')
+
+ # ONOS/SEBA wants 'nni-<port>' for port names, while OLT NETCONF wants its own
+ # name (something like hundred-gigabit-ethernet 0/1), which is reported
+ # when the ports are enumerated
+ self._physical_port_name = kwargs.get('name', 'nni-{}'.format(self._port_no))
+ self._logical_port_name = 'nni-{}'.format(self._port_no)
+
+ self._logical_port = None
+
+ self.sync_tick = 10.0
+
+ self._stats_tick = 5.0
+ self._stats_deferred = None
+
+ # Local cache of NNI configuration
+ self._ianatype = '<type xmlns:ianaift="urn:ietf:params:xml:ns:yang:iana-if-type">ianaift:ethernetCsmacd</type>'
+
+ # And optional parameters
+ # TODO: Currently cannot update admin/oper status, so create this enabled and active
+ # self._admin_state = kwargs.pop('admin_state', AdminState.UNKNOWN)
+ # self._oper_status = kwargs.pop('oper_status', OperStatus.UNKNOWN)
+ self._enabled = True
+ self._admin_state = AdminState.ENABLED
+ self._oper_status = OperStatus.ACTIVE
+
+ self._label = self._physical_port_name
+ self._mac_address = kwargs.pop('mac_address', '00:00:00:00:00:00')
+ # TODO: Get with JOT and find out how to pull out MAC Address via NETCONF
+ # TODO: May need to refine capabilities into current, advertised, and peer
+
+ self._ofp_capabilities = kwargs.pop('ofp_capabilities', OFPPF_100GB_FD | OFPPF_FIBER)
+ self._ofp_state = kwargs.pop('ofp_state', OFPPS_LIVE)
+ self._current_speed = kwargs.pop('current_speed', OFPPF_100GB_FD)
+ self._max_speed = kwargs.pop('max_speed', OFPPF_100GB_FD)
+ self._device_port_no = kwargs.pop('device_port_no', self._port_no)
+
+ # Statistics
+ self.rx_dropped = 0
+ self.rx_error_packets = 0
+ self.rx_ucast_packets = 0
+ self.rx_bcast_packets = 0
+ self.rx_mcast_packets = 0
+ self.tx_dropped = 0
+ self.tx_ucast_packets = 0
+ self.tx_bcast_packets = 0
+ self.tx_mcast_packets = 0
+
+ def __str__(self):
+ return "NniPort-{}: Admin: {}, Oper: {}, parent: {}".format(self._port_no,
+ self._admin_state,
+ self._oper_status,
+ self._parent)
+
+ def get_port(self):
+ """
+ Get the VOLTHA PORT object for this port
+ :return: VOLTHA Port object
+ """
+ self.log.debug('get-port-status-update', port=self._port_no,
+ label=self._label)
+ if self._port is None:
+ self._port = Port(port_no=self._port_no,
+ label=self._label,
+ type=Port.ETHERNET_NNI,
+ admin_state=self._admin_state,
+ oper_status=self._oper_status)
+
+ if self._port.admin_state != self._admin_state or\
+ self._port.oper_status != self._oper_status:
+
+ self.log.debug('get-port-status-update', admin_state=self._admin_state,
+ oper_status=self._oper_status)
+ self._port.admin_state = self._admin_state
+ self._port.oper_status = self._oper_status
+
+ return self._port
+
+ @property
+ def iana_type(self):
+ return self._ianatype
+
+ def cancel_deferred(self):
+ super(NniPort, self).cancel_deferred()
+
+ d, self._stats_deferred = self._stats_deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ def _update_adapter_agent(self):
+ # adapter_agent add_port also does an update of port status
+ self.log.debug('update-adapter-agent', admin_state=self._admin_state,
+ oper_status=self._oper_status)
+ self.adapter_agent.add_port(self.olt.device_id, self.get_port())
+
+ def get_logical_port(self):
+ """
+ Get the VOLTHA logical port for this port
+ :return: VOLTHA logical port or None if not supported
+ """
+ def mac_str_to_tuple(mac):
+ """
+ Convert 'xx:xx:xx:xx:xx:xx' MAC address string to a tuple of integers.
+ Example: mac_str_to_tuple('00:01:02:03:04:05') == (0, 1, 2, 3, 4, 5)
+ """
+ return tuple(int(d, 16) for d in mac.split(':'))
+
+ if self._logical_port is None:
+ openflow_port = ofp_port(port_no=self._port_no,
+ hw_addr=mac_str_to_tuple(self._mac_address),
+ name=self._logical_port_name,
+ config=0,
+ state=self._ofp_state,
+ curr=self._ofp_capabilities,
+ advertised=self._ofp_capabilities,
+ peer=self._ofp_capabilities,
+ curr_speed=self._current_speed,
+ max_speed=self._max_speed)
+
+ self._logical_port = LogicalPort(id=self._logical_port_name,
+ ofp_port=openflow_port,
+ device_id=self._parent.device_id,
+ device_port_no=self._device_port_no,
+ root_port=True)
+ return self._logical_port
+
+ @inlineCallbacks
+ def finish_startup(self):
+
+ if self.state != AdtnPort.State.INITIAL:
+ returnValue('Done')
+
+ self.log.debug('final-startup')
+ # TODO: Start status polling of NNI interfaces
+ self.deferred = None # = reactor.callLater(3, self.do_stuff)
+
+ # Begin statistics sync
+ self._stats_deferred = reactor.callLater(self._stats_tick * 2, self._update_statistics)
+
+ try:
+ yield self.set_config('enabled', True)
+
+ super(NniPort, self).finish_startup()
+
+ except Exception as e:
+ self.log.exception('nni-start', e=e)
+ self._oper_status = OperStatus.UNKNOWN
+ self._update_adapter_agent()
+
+ returnValue('Enabled')
+
+ def finish_stop(self):
+
+ # NOTE: Leave all NNI ports active (may have inband management)
+ # TODO: Revisit leaving NNI Ports active on disable
+
+ return self.set_config('enabled', False)
+
+ @inlineCallbacks
+ def reset(self):
+ """
+ Set the NNI Port to a known good state on initial port startup. Actual
+ NNI 'Start' is done elsewhere
+ """
+ # if self.state != AdtnPort.State.INITIAL:
+ # self.log.error('reset-ignored', state=self.state)
+ # returnValue('Ignored')
+
+ self.log.info('resetting', label=self._label)
+
+ # Always enable our NNI ports
+
+ try:
+ results = yield self.set_config('enabled', True)
+ self._admin_state = AdminState.ENABLED
+ self._enabled = True
+ returnValue(results)
+
+ except Exception as e:
+ self.log.exception('reset', e=e)
+ self._admin_state = AdminState.UNKNOWN
+ raise
+
+ @inlineCallbacks
+ def set_config(self, leaf, value):
+ if isinstance(value, bool):
+ value = 'true' if value else 'false'
+
+ config = '<interfaces xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">' + \
+ ' <interface>' + \
+ ' <name>{}</name>'.format(self._physical_port_name) + \
+ ' {}'.format(self._ianatype) + \
+ ' <{}>{}</{}>'.format(leaf, value, leaf) + \
+ ' </interface>' + \
+ '</interfaces>'
+ try:
+ results = yield self._parent.netconf_client.edit_config(config)
+ returnValue(results)
+
+ except Exception as e:
+ self.log.exception('set', leaf=leaf, value=value, e=e)
+ raise
+
+ def get_nni_config(self):
+ config = '<filter xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">' + \
+ ' <interfaces xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">' + \
+ ' <interface>' + \
+ ' <name>{}</name>'.format(self._physical_port_name) + \
+ ' <enabled/>' + \
+ ' </interface>' + \
+ ' </interfaces>' + \
+ '</filter>'
+ return self._parent.netconf_client.get(config)
+
+ def get_nni_statistics(self):
+ state = '<filter xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">' + \
+ ' <interfaces-state xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">' + \
+ ' <interface>' + \
+ ' <name>{}</name>'.format(self._physical_port_name) + \
+ ' <admin-status/>' + \
+ ' <oper-status/>' + \
+ ' <statistics/>' + \
+ ' </interface>' + \
+ ' </interfaces>' + \
+ '</filter>'
+ return self._parent.netconf_client.get(state)
+
+ def sync_hardware(self):
+ if self.state in (AdtnPort.State.RUNNING, AdtnPort.State.STOPPED):
+ def read_config(results):
+ # self.log.debug('read-config', results=results)
+ try:
+ result_dict = xmltodict.parse(results.data_xml)
+ interfaces = result_dict['data']['interfaces']
+ if 'if:interface' in interfaces:
+ entries = interfaces['if:interface']
+ else:
+ entries = interfaces['interface']
+
+ enabled = entries.get('enabled',
+ str(not self.enabled).lower()) == 'true'
+
+ if self.enabled == enabled:
+ return succeed('in-sync')
+
+ self.set_config('enabled', self.enabled)
+ self._oper_status = OperStatus.ACTIVE
+ self._update_adapter_agent()
+
+ except Exception as e:
+ self.log.exception('read-config', e=e)
+ return fail(Failure())
+
+ def failure(reason):
+ self.log.error('hardware-sync-failed', reason=reason)
+
+ def reschedule(_):
+ delay = self.sync_tick
+ delay += random.uniform(-delay / 10, delay / 10)
+ self.sync_deferred = reactor.callLater(delay, self.sync_hardware)
+
+ self.sync_deferred = self.get_nni_config()
+ self.sync_deferred.addCallbacks(read_config, failure)
+ self.sync_deferred.addBoth(reschedule)
+
+ def _decode_nni_statistics(self, entry):
+ # admin_status = entry.get('admin-status')
+ # oper_status = entry.get('oper-status')
+ # admin_status = entry.get('admin-status')
+ # phys_address = entry.get('phys-address')
+
+ stats = entry.get('statistics')
+ if stats is not None:
+ self.timestamp = arrow.utcnow().float_timestamp
+ self.rx_bytes = int(stats.get('in-octets', 0))
+ self.rx_ucast_packets = int(stats.get('in-unicast-pkts', 0))
+ self.rx_bcast_packets = int(stats.get('in-broadcast-pkts', 0))
+ self.rx_mcast_packets = int(stats.get('in-multicast-pkts', 0))
+ self.rx_error_packets = int(stats.get('in-errors', 0)) + int(stats.get('in-discards', 0))
+
+ self.tx_bytes = int(stats.get('out-octets', 0))
+ self.tx_ucast_packets = int(stats.get('out-unicast-pkts', 0))
+ self.tx_bcast_packets = int(stats.get('out-broadcast-pkts', 0))
+ self.tx_mcast_packets = int(stats.get('out-multicast-pkts', 0))
+ self.tx_error_packets = int(stats.get('out-errors', 0)) + int(stats.get('out-discards', 0))
+
+ self.rx_packets = self.rx_ucast_packets + self.rx_mcast_packets + self.rx_bcast_packets
+ self.tx_packets = self.tx_ucast_packets + self.tx_mcast_packets + self.tx_bcast_packets
+ # No support for rx_crc_errors or bip_errors
+
+ def _update_statistics(self):
+ if self.state == AdtnPort.State.RUNNING:
+ def read_state(results):
+ # self.log.debug('read-state', results=results)
+ try:
+ result_dict = xmltodict.parse(results.data_xml)
+ entry = result_dict['data']['interfaces-state']['interface']
+ self._decode_nni_statistics(entry)
+ return succeed('done')
+
+ except Exception as e:
+ self.log.exception('read-state', e=e)
+ return fail(Failure())
+
+ def failure(reason):
+ self.log.error('update-stats-failed', reason=reason)
+
+ def reschedule(_):
+ delay = self._stats_tick
+ delay += random.uniform(-delay / 10, delay / 10)
+ self._stats_deferred = reactor.callLater(delay, self._update_statistics)
+
+ try:
+ self._stats_deferred = self.get_nni_statistics()
+ self._stats_deferred.addCallbacks(read_state, failure)
+ self._stats_deferred.addBoth(reschedule)
+
+ except Exception as e:
+ self.log.exception('nni-sync', port=self.name, e=e)
+ self._stats_deferred = reactor.callLater(self._stats_tick, self._update_statistics)
+
+
+class MockNniPort(NniPort):
+ """
+ A class similar to the 'Port' class in VOLTHA, but for a non-existent (virtual) OLT
+
+ TODO: Merge this with the Port class or cleanup where possible
+ so we do not duplicate fields/properties/methods
+ """
+
+ def __init__(self, parent, **kwargs):
+ super(MockNniPort, self).__init__(parent, **kwargs)
+
+ def __str__(self):
+ return "NniPort-mock-{}: Admin: {}, Oper: {}, parent: {}".format(self._port_no,
+ self._admin_state,
+ self._oper_status,
+ self._parent)
+
+ @staticmethod
+ def get_nni_port_state_results():
+ from ncclient.operations.retrieve import GetReply
+ raw = """
+ <?xml version="1.0" encoding="UTF-8"?>
+ <rpc-reply xmlns="urn:ietf:params:xml:ns:netconf:base:1.0"
+ xmlns:nc="urn:ietf:params:xml:ns:netconf:base:1.0"
+ message-id="urn:uuid:59e71979-01bb-462f-b17a-b3a45e1889ac">
+ <data>
+ <interfaces-state xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">
+ <interface><name>hundred-gigabit-ethernet 0/1</name></interface>
+ </interfaces-state>
+ </data>
+ </rpc-reply>
+ """
+ return GetReply(raw)
+
+ @staticmethod
+ def get_pon_port_state_results():
+ from ncclient.operations.retrieve import GetReply
+ raw = """
+ <?xml version="1.0" encoding="UTF-8"?>
+ <rpc-reply xmlns="urn:ietf:params:xml:ns:netconf:base:1.0"
+ xmlns:nc="urn:ietf:params:xml:ns:netconf:base:1.0"
+ message-id="urn:uuid:59e71979-01bb-462f-b17a-b3a45e1889ac">
+ <data>
+ <interfaces-state xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">
+ <interface><name>XPON 0/1</name></interface>
+ <interface><name>XPON 0/2</name></interface>
+ <interface><name>XPON 0/3</name></interface>
+ <interface><name>XPON 0/4</name></interface>
+ <interface><name>XPON 0/5</name></interface>
+ <interface><name>XPON 0/6</name></interface>
+ <interface><name>XPON 0/7</name></interface>
+ <interface><name>XPON 0/8</name></interface>
+ <interface><name>XPON 0/9</name></interface>
+ <interface><name>XPON 0/10</name></interface>
+ <interface><name>XPON 0/11</name></interface>
+ <interface><name>XPON 0/12</name></interface>
+ <interface><name>XPON 0/13</name></interface>
+ <interface><name>XPON 0/14</name></interface>
+ <interface><name>XPON 0/15</name></interface>
+ <interface><name>XPON 0/16</name></interface>
+ </interfaces-state>
+ </data>
+ </rpc-reply>
+ """
+ return GetReply(raw)
+
+ def reset(self):
+ """
+ Set the NNI Port to a known good state on initial port startup. Actual
+ NNI 'Start' is done elsewhere
+ """
+ if self.state != AdtnPort.State.INITIAL:
+ self.log.error('reset-ignored', state=self.state)
+ return fail(RuntimeError('reset-ignored'))
+
+ self.log.info('resetting', label=self._label)
+
+ # Always enable our NNI ports
+
+ self._enabled = True
+ self._admin_state = AdminState.ENABLED
+ return succeed('Enabled')
+
+ def set_config(self, leaf, value):
+
+ if leaf == 'enabled':
+ self._enabled = value
+ else:
+ raise NotImplementedError("Leaf '{}' is not supported".format(leaf))
+
+ return succeed('Success')
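
Note on the rescheduling pattern used by sync_hardware() and _update_statistics() above: both add roughly +/-10% random jitter to the base tick before calling reactor.callLater(), which helps keep many ports from polling the device at the same instant. The following is a minimal standalone sketch of that calculation, not code from this commit. The helper name jittered_delay and its parameters are hypothetical, and it assumes float arithmetic so the jitter window stays symmetric even when the base tick is an integer.

```python
import random


def jittered_delay(base, fraction=0.1, rng=random):
    """Return base plus or minus up to (base * fraction) seconds.

    Hypothetical helper: 'rng' only needs a uniform(a, b) method, so a
    seeded random.Random instance can be passed in for repeatable tests.
    """
    spread = float(base) * fraction      # force float math for int ticks
    return base + rng.uniform(-spread, spread)


# Example: a 60 second tick lands somewhere in the range [54.0, 66.0]
delay = jittered_delay(60)
```

The result would then be passed as the first argument to reactor.callLater() in place of the inline calculation.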
diff --git a/adapters/adtran_olt/onu.py b/adapters/adtran_olt/onu.py
new file mode 100644
index 0000000..648d33e
--- /dev/null
+++ b/adapters/adtran_olt/onu.py
@@ -0,0 +1,701 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import base64
+import binascii
+import json
+import structlog
+from twisted.internet import reactor, defer
+from twisted.internet.defer import inlineCallbacks, returnValue, succeed
+from pyvoltha.common.tech_profile.tech_profile import DEFAULT_TECH_PROFILE_TABLE_ID
+from pyvoltha.protos.device_pb2 import Device
+
+from adtran_olt_handler import AdtranOltHandler
+from adapters.adtran_common.net.adtran_rest import RestInvalidResponseCode
+
+_MAX_EXPEDITE_COUNT = 5
+_EXPEDITE_SECS = 2
+_HW_SYNC_SECS = 60
+
+
+class Onu(object):
+ """
+ Wraps an ONU
+ """
+ DEFAULT_PASSWORD = ''
+
+ def __init__(self, onu_info):
+ self._onu_id = onu_info['onu-id']
+ if self._onu_id is None:
+ raise ValueError('No ONU ID available')
+
+ pon = onu_info['pon']
+ self._olt = pon.olt
+ self._pon_id = pon.pon_id
+ self._name = '{}@{}'.format(pon.physical_port_name, self._onu_id)
+ self.log = structlog.get_logger(pon_id=self._pon_id, onu_id=self._onu_id)
+
+ self._valid = True # Set false during delete/cleanup
+ self._serial_number_base64 = Onu.string_to_serial_number(onu_info['serial-number'])
+ self._serial_number_string = onu_info['serial-number']
+ self._device_id = onu_info['device-id']
+ self._password = onu_info['password']
+ self._created = False
+ self._proxy_address = Device.ProxyAddress(device_id=self.olt.device_id,
+ channel_id=self.olt.pon_id_to_port_number(self._pon_id),
+ onu_id=self._onu_id,
+ onu_session_id=self._onu_id)
+ self._sync_tick = _HW_SYNC_SECS
+ self._expedite_sync = False
+ self._expedite_count = 0
+ self._resync_flows = False
+ self._sync_deferred = None # For sync of ONT config to hardware
+
+ self._gem_ports = {} # gem-id -> GemPort
+ self._tconts = {} # alloc-id -> TCont
+ self._uni_ports = onu_info['uni-ports']
+
+ # Provisionable items
+ self._enabled = onu_info['enabled']
+ self._upstream_fec_enable = onu_info.get('upstream-fec')
+
+ # KPI related items
+ self._rssi = -9999
+ self._equalization_delay = 0
+ self._fiber_length = 0
+ self._timestamp = None # Last time the KPI items were updated
+
+ def __str__(self):
+ return "ONU-{}:{}, SN: {}/{}".format(self._pon_id, self._onu_id,
+ self._serial_number_string, self._serial_number_base64)
+
+ @staticmethod
+ def serial_number_to_string(value):
+ sval = base64.decodestring(value)
+ unique = [elem.encode("hex") for elem in sval[4:8]]
+ return '{}{}{}{}{}'.format(sval[:4], unique[0], unique[1], unique[2], unique[3]).upper()
+
+ @staticmethod
+ def string_to_serial_number(value):
+ bvendor = [octet for octet in value[:4]]
+ bunique = [binascii.a2b_hex(value[offset:offset + 2]) for offset in xrange(4, 12, 2)]
+ bvalue = ''.join(bvendor + bunique)
+ return base64.b64encode(bvalue)
+
+ @property
+ def olt(self):
+ return self._olt
+
+ @property
+ def pon(self):
+ return self.olt.southbound_ports[self._pon_id]
+
+ @property
+ def intf_id(self):
+ return self.pon.intf_id
+
+ @property
+ def pon_id(self):
+ return self._pon_id
+
+ @property
+ def onu_id(self):
+ return self._onu_id
+
+ @property
+ def device_id(self):
+ return self._device_id
+
+ @property
+ def name(self):
+ return self._name
+
+ @property
+ def upstream_fec_enable(self):
+ return self._upstream_fec_enable
+
+ @upstream_fec_enable.setter
+ def upstream_fec_enable(self, value):
+ assert isinstance(value, bool), 'upstream FEC enabled is a boolean'
+ if self._upstream_fec_enable != value:
+ self._upstream_fec_enable = value
+
+ # Recalculate PON upstream FEC
+ self.pon.upstream_fec_enable = self.pon.any_upstream_fec_enabled
+
+ @property
+ def password(self):
+ """
+ Get password. Base 64 format
+ """
+ return self._password
+
+ @password.setter
+ def password(self, value):
+ """
+ Set the password
+ :param value: (str) base 64 encoded value
+ """
+ if self._password is None and value is not None:
+ self._password = value
+ reg_id = value.decode('base64').strip('\00')
+ # Must remove any non-printable characters
+ reg_id = ''.join([i if 127 > ord(i) > 31 else '_' for i in reg_id])
+ # Generate alarm here for regID
+ from voltha.extensions.alarms.onu.onu_active_alarm import OnuActiveAlarm
+ self.log.info('onu-Active-Alarm', serial_number=self._serial_number_string)
+ device = self._olt.adapter_agent.get_device(self._olt.device_id)
+
+ OnuActiveAlarm(self._olt.alarms, self._olt.device_id, self._pon_id,
+ self._serial_number_string, reg_id, device.serial_number,
+ ipv4_address=device.ipv4_address).raise_alarm()
+
+ @property
+ def enabled(self):
+ return self._enabled
+
+ @enabled.setter
+ def enabled(self, value):
+ if self._enabled != value:
+ self._enabled = value
+ self._resync_flows = True
+
+ self.set_config('enable', self._enabled)
+
+ if self._enabled:
+ self.start()
+ else:
+ self.stop()
+
+ # Recalculate PON upstream FEC
+ self.pon.upstream_fec_enable = self.pon.any_upstream_fec_enabled
+
+ @property
+ def uni_ports(self):
+ return self._uni_ports
+
+ @property
+ def logical_port(self):
+ """Return the logical PORT number of this ONU's UNI"""
+ # TODO: once we support multiple UNIs, this needs to be revisited
+ return self._uni_ports[0]
+
+ @property
+ def gem_ports(self):
+ return self._gem_ports.values()
+
+ @property
+ def proxy_address(self):
+ return self._proxy_address
+
+ @property
+ def serial_number_64(self):
+ return self._serial_number_base64
+
+ @property
+ def serial_number(self):
+ return self._serial_number_string
+
+ @property
+ def timestamp(self):
+ # Last time the KPI items were updated
+ return self._timestamp
+
+ @timestamp.setter
+ def timestamp(self, value):
+ self._timestamp = value
+
+ @property
+ def rssi(self):
+ """The received signal strength indication of the ONU"""
+ return self._rssi
+
+ @rssi.setter
+ def rssi(self, value):
+ if self._rssi != value:
+ self._rssi = value
+ # TODO: Notify anyone?
+
+ @property
+ def equalization_delay(self):
+ """Equalization delay (bits)"""
+ return self._equalization_delay
+
+ @equalization_delay.setter
+ def equalization_delay(self, value):
+ if self._equalization_delay != value:
+ self._equalization_delay = value
+ # TODO: Notify anyone?
+
+ @property
+ def fiber_length(self):
+ """Distance to ONU"""
+ return self._fiber_length
+
+ @fiber_length.setter
+ def fiber_length(self, value):
+ if self._fiber_length != value:
+ self._fiber_length = value
+ # TODO: Notify anyone?
+
+ def _cancel_deferred(self):
+ d, self._sync_deferred = self._sync_deferred, None
+
+ if d is not None and not d.called:
+ try:
+ d.cancel()
+ except Exception:
+ pass
+
+ @inlineCallbacks
+ def create(self, reflow=False):
+ """
+ Create (or reflow) this ONU to hardware
+ :param reflow: (boolean) If True, this is a reflow of the ONU information
+ after an unmanaged OLT hardware reboot
+ """
+ self.log.debug('create', reflow=reflow)
+ self._cancel_deferred()
+
+ data = json.dumps({'onu-id': self._onu_id,
+ 'serial-number': self._serial_number_base64,
+ 'enable': self._enabled})
+ uri = AdtranOltHandler.GPON_ONU_CONFIG_LIST_URI.format(self._pon_id)
+ name = 'onu-create-{}-{}-{}: {}'.format(self._pon_id, self._onu_id,
+ self._serial_number_base64, self._enabled)
+
+ first_sync = self._sync_tick if self._created else 5
+
+ if not self._created or reflow:
+ try:
+ yield self.olt.rest_client.request('POST', uri, data=data, name=name)
+ self._created = True
+
+ except Exception as e:
+ self.log.exception('onu-create', e=e)
+ # See if it failed due to already being configured
+ url = AdtranOltHandler.GPON_ONU_CONFIG_URI.format(self._pon_id, self._onu_id)
+ url += '/serial-number'
+
+ try:
+ results = yield self.olt.rest_client.request('GET', uri, name=name)
+ self.log.debug('onu-create-check', results=results)
+ if len(results) == 1 and results[0].get('serial-number', '') != self._serial_number_base64:
+ self._created = True
+
+ except Exception as _e:
+ self.log.warn('onu-exists-check', pon_id=self.pon_id, onu_id=self.onu_id,
+ serial_number=self.serial_number)
+
+ self._sync_deferred = reactor.callLater(first_sync, self._sync_hardware)
+
+ # Recalculate PON upstream FEC
+ self.pon.upstream_fec_enable = self.pon.any_upstream_fec_enabled
+ returnValue('created')
+
+ @inlineCallbacks
+ def delete(self):
+ """
+ Clean up ONU (gems/tconts). ONU removal from OLT h/w done by PonPort
+ :return: (deferred)
+ """
+ self._valid = False
+ self._cancel_deferred()
+
+ # Remove from H/W
+ gem_ids = self._gem_ports.keys()
+ alloc_ids = self._tconts.keys()
+
+ dl = []
+ for gem_id in gem_ids:
+ dl.append(self.remove_gem_id(gem_id))
+
+ try:
+ yield defer.gatherResults(dl, consumeErrors=True)
+ except Exception as _e:
+ pass
+
+ dl = []
+ for alloc_id in alloc_ids:
+ dl.append(self.remove_tcont(alloc_id))
+
+ try:
+ yield defer.gatherResults(dl, consumeErrors=True)
+ except Exception as _e:
+ pass
+
+ self._gem_ports.clear()
+ self._tconts.clear()
+ olt, self._olt = self._olt, None
+
+ uri = AdtranOltHandler.GPON_ONU_CONFIG_URI.format(self._pon_id, self._onu_id)
+ name = 'onu-delete-{}-{}-{}: {}'.format(self._pon_id, self._onu_id,
+ self._serial_number_base64, self._enabled)
+ try:
+ yield olt.rest_client.request('DELETE', uri, name=name)
+
+ except RestInvalidResponseCode as e:
+ if e.code != 404:
+ self.log.exception('onu-delete', e=e)
+
+ except Exception as e:
+ self.log.exception('onu-delete', e=e)
+
+ # Release resource manager resources for this ONU
+ pon_intf_id_onu_id = (self.pon_id, self.onu_id)
+ olt.resource_mgr.free_pon_resources_for_onu(pon_intf_id_onu_id)
+
+ returnValue('deleted')
+
+ def start(self):
+ self._cancel_deferred()
+ self._sync_deferred = reactor.callLater(0, self._sync_hardware)
+
+ def stop(self):
+ self._cancel_deferred()
+
+ def restart(self):
+ if not self._valid:
+ return succeed('Deleting')
+
+ self._cancel_deferred()
+ self._sync_deferred = reactor.callLater(0, self._sync_hardware)
+
+ return self.create()
+
+ def _sync_hardware(self):
+ from codec.olt_config import OltConfig
+ self.log.debug('sync-hardware')
+
+ def read_config(results):
+ self.log.debug('read-config', results=results)
+
+ dl = []
+
+ try:
+ config = OltConfig.Pon.Onu.decode([results])
+ assert self.onu_id in config, 'sync-onu-not-found-{}'.format(self.onu_id)
+ config = config[self.onu_id]
+
+ if self._enabled != config.enable:
+ dl.append(self.set_config('enable', self._enabled))
+
+ if self.serial_number_64 != config.serial_number_64:
+ dl.append(self.set_config('serial-number', self.serial_number_64))
+
+ if self._enabled:
+ # Sync TCONTs if everything else in sync
+ if len(dl) == 0:
+ dl.extend(sync_tconts(config.tconts))
+
+ # Sync GEM Ports if everything else in sync
+
+ if len(dl) == 0:
+ dl.extend(sync_gem_ports(config.gem_ports))
+
+ if len(dl) == 0:
+ sync_flows()
+
+ except Exception as e:
+ self.log.exception('hw-sync-read-config', e=e)
+
+ # Run h/w sync again a bit faster if we had to sync anything
+ self._expedite_sync = len(dl) > 0
+
+ # TODO: do checks
+ return defer.gatherResults(dl, consumeErrors=True)
+
+ def sync_tconts(hw_tconts):
+ hw_alloc_ids = frozenset(hw_tconts.iterkeys())
+ my_alloc_ids = frozenset(self._tconts.iterkeys())
+ dl = []
+
+ try:
+ extra_alloc_ids = hw_alloc_ids - my_alloc_ids
+ dl.extend(sync_delete_extra_tconts(extra_alloc_ids))
+
+ missing_alloc_ids = my_alloc_ids - hw_alloc_ids
+ dl.extend(sync_add_missing_tconts(missing_alloc_ids))
+
+ matching_alloc_ids = my_alloc_ids & hw_alloc_ids
+ matching_hw_tconts = {alloc_id: tcont
+ for alloc_id, tcont in hw_tconts.iteritems()
+ if alloc_id in matching_alloc_ids}
+ dl.extend(sync_matching_tconts(matching_hw_tconts))
+
+ except Exception as e2:
+ self.log.exception('hw-sync-tconts', e=e2)
+
+ return dl
+
+ def sync_delete_extra_tconts(alloc_ids):
+ return [self.remove_tcont(alloc_id) for alloc_id in alloc_ids]
+
+ def sync_add_missing_tconts(alloc_ids):
+ return [self.add_tcont(self._tconts[alloc_id], reflow=True) for alloc_id in alloc_ids]
+
+ def sync_matching_tconts(hw_tconts):
+ from xpon.traffic_descriptor import TrafficDescriptor
+
+ dl = []
+ # TODO: sync TD & Best Effort. Only other TCONT leaf is the key
+ for alloc_id, hw_tcont in hw_tconts.iteritems():
+ my_tcont = self._tconts[alloc_id]
+ my_td = my_tcont.traffic_descriptor
+ hw_td = hw_tcont.traffic_descriptor
+ if my_td is None:
+ continue
+
+ my_additional = TrafficDescriptor.AdditionalBwEligibility.\
+ to_string(my_td.additional_bandwidth_eligibility)
+
+ reflow = hw_td is None or \
+ my_td.fixed_bandwidth != hw_td.fixed_bandwidth or \
+ my_td.assured_bandwidth != hw_td.assured_bandwidth or \
+ my_td.maximum_bandwidth != hw_td.maximum_bandwidth or \
+ my_additional != hw_td.additional_bandwidth_eligibility
+
+ if not reflow and \
+ my_td.additional_bandwidth_eligibility == \
+ TrafficDescriptor.AdditionalBwEligibility.BEST_EFFORT_SHARING and \
+ my_td.best_effort is not None:
+
+ hw_be = hw_td.best_effort
+ my_be = my_td.best_effort
+
+ reflow = hw_be is None or \
+ my_be.bandwidth != hw_be.bandwidth or \
+ my_be.priority != hw_be.priority or \
+ my_be.weight != hw_be.weight
+
+ if reflow:
+ dl.append(my_tcont.add_to_hardware(self.olt.rest_client))
+ return dl
+
+ def sync_gem_ports(hw_gem_ports):
+ hw_gems_ids = frozenset(hw_gem_ports.iterkeys())
+ my_gems_ids = frozenset(self._gem_ports.iterkeys())
+ dl = []
+
+ try:
+ extra_gems_ids = hw_gems_ids - my_gems_ids
+ dl.extend(sync_delete_extra_gem_ports(extra_gems_ids))
+
+ missing_gem_ids = my_gems_ids - hw_gems_ids
+ dl.extend(sync_add_missing_gem_ports(missing_gem_ids))
+
+ matching_gem_ids = my_gems_ids & hw_gems_ids
+ matching_hw_gem_ports = {gem_id: gem_port
+ for gem_id, gem_port in hw_gem_ports.iteritems()
+ if gem_id in matching_gem_ids}
+
+ dl.extend(sync_matching_gem_ports(matching_hw_gem_ports))
+ self._resync_flows |= len(dl) > 0
+
+ except Exception as ex:
+ self.log.exception('hw-sync-gem-ports', e=ex)
+
+ return dl
+
+ def sync_delete_extra_gem_ports(gem_ids):
+ return [self.remove_gem_id(gem_id) for gem_id in gem_ids]
+
+ def sync_add_missing_gem_ports(gem_ids):
+ return [self.add_gem_port(self._gem_ports[gem_id], reflow=True)
+ for gem_id in gem_ids]
+
+ def sync_matching_gem_ports(hw_gem_ports):
+ dl = []
+ for gem_id, hw_gem_port in hw_gem_ports.iteritems():
+ gem_port = self._gem_ports[gem_id]
+
+ if gem_port.alloc_id != hw_gem_port.alloc_id or\
+ gem_port.encryption != hw_gem_port.encryption or\
+ gem_port.omci_transport != hw_gem_port.omci_transport:
+ dl.append(gem_port.add_to_hardware(self.olt.rest_client,
+ operation='PATCH'))
+ return dl
+
+ def sync_flows():
+ from flow.flow_entry import FlowEntry
+
+ reflow, self._resync_flows = self._resync_flows, False
+ return FlowEntry.sync_flows_by_onu(self, reflow=reflow)
+
+ def failure(_reason):
+ # self.log.error('hardware-sync-get-config-failed', reason=_reason)
+ pass
+
+ def reschedule(_):
+ import random
+ delay = self._sync_tick if self._enabled else 5 * self._sync_tick
+
+ # Speed up sequential resync a limited number of times if out of sync.
+ # With a 60 second initial sync and a typical worst case of 4 resyncs, this
+ # should resync an ONU and all its gem-ports and tconts within 90 seconds
+ if self._expedite_sync and self._enabled:
+ self._expedite_count += 1
+ if self._expedite_count < _MAX_EXPEDITE_COUNT:
+ delay = _EXPEDITE_SECS
+ else:
+ self._expedite_count = 0
+
+ delay += random.uniform(-delay / 10.0, delay / 10.0)
+ self._sync_deferred = reactor.callLater(delay, self._sync_hardware)
+ self._expedite_sync = False
+
+ # If PON is not enabled, skip hw-sync. If ONU not enabled, do it but less
+ # frequently
+ if not self.pon.enabled:
+ return reschedule('not-enabled')
+
+ try:
+ self._sync_deferred = self._get_config()
+ self._sync_deferred.addCallbacks(read_config, failure)
+ self._sync_deferred.addBoth(reschedule)
+
+ except Exception as e:
+ self.log.exception('hw-sync-main', e=e)
+ return reschedule('sync-exception')
+
+ def _get_config(self):
+ uri = AdtranOltHandler.GPON_ONU_CONFIG_URI.format(self._pon_id, self.onu_id)
+ name = 'pon-get-onu_config-{}-{}'.format(self._pon_id, self.onu_id)
+ return self.olt.rest_client.request('GET', uri, name=name)
+
+ def set_config(self, leaf, value):
+ self.log.debug('set-config', leaf=leaf, value=value)
+ data = json.dumps({leaf: value})
+ uri = AdtranOltHandler.GPON_ONU_CONFIG_URI.format(self._pon_id, self._onu_id)
+ name = 'onu-set-config-{}-{}-{}: {}'.format(self._pon_id, self._onu_id, leaf, value)
+ return self.olt.rest_client.request('PATCH', uri, data=data, name=name)
+
+ @property
+ def alloc_ids(self):
+ """
+ Get the alloc-ids of all T-CONTs
+ """
+ return frozenset(self._tconts.keys())
+
+ @inlineCallbacks
+ def add_tcont(self, tcont, reflow=False):
+ """
+ Creates a T-CONT with the given alloc-id
+
+ :param tcont: (TCont) Object that maintains the TCONT properties
+ :param reflow: (boolean) If true, force add (used during h/w resync)
+ :return: (deferred)
+ """
+ if not self._valid:
+ returnValue('Deleting')
+
+ if not reflow and tcont.alloc_id in self._tconts:
+ returnValue('already created')
+
+ self.log.info('add', tcont=tcont, reflow=reflow)
+ self._tconts[tcont.alloc_id] = tcont
+
+ try:
+ results = yield tcont.add_to_hardware(self.olt.rest_client)
+
+ except Exception as e:
+ self.log.exception('tcont', tcont=tcont, reflow=reflow, e=e)
+ results = 'resync needed'
+
+ returnValue(results)
+
+ @inlineCallbacks
+ def remove_tcont(self, alloc_id):
+ tcont = self._tconts.get(alloc_id)
+
+ if tcont is None:
+ returnValue('nop')
+
+ del self._tconts[alloc_id]
+ try:
+ results = yield tcont.remove_from_hardware(self.olt.rest_client)
+
+ except RestInvalidResponseCode as e:
+ results = None
+ if e.code != 404:
+ self.log.exception('tcont-delete', e=e)
+
+ except Exception as e:
+ self.log.exception('delete', e=e)
+ raise
+
+ returnValue(results)
+
+ def gem_port(self, gem_id):
+ return self._gem_ports.get(gem_id)
+
+ def gem_ids(self, tech_profile_id):
+ """Get all GEM Port IDs used by this ONU"""
+ assert tech_profile_id >= DEFAULT_TECH_PROFILE_TABLE_ID
+ return sorted([gem_id for gem_id, gem in self._gem_ports.items()
+ if not gem.multicast and
+ tech_profile_id == gem.tech_profile_id])
+
+ @inlineCallbacks
+ def add_gem_port(self, gem_port, reflow=False):
+ """
+ Add a GEM Port to this ONU
+
+ :param gem_port: (GemPort) GEM Port to add
+ :param reflow: (boolean) If true, force add (used during h/w resync)
+ :return: (deferred)
+ """
+ if not self._valid:
+ returnValue('Deleting')
+
+ if not reflow and gem_port.gem_id in self._gem_ports:
+ returnValue('nop')
+
+ self.log.info('add', gem_port=gem_port, reflow=reflow)
+ self._gem_ports[gem_port.gem_id] = gem_port
+
+ try:
+ results = yield gem_port.add_to_hardware(self.olt.rest_client)
+
+ except Exception as e:
+ self.log.exception('gem-port', gem_port=gem_port, reflow=reflow, e=e)
+ results = 'resync needed'
+
+ returnValue(results)
+
+ @inlineCallbacks
+ def remove_gem_id(self, gem_id):
+ gem_port = self._gem_ports.get(gem_id)
+
+ if gem_port is None:
+ returnValue('nop')
+
+ del self._gem_ports[gem_id]
+ try:
+ yield gem_port.remove_from_hardware(self.olt.rest_client)
+
+ except RestInvalidResponseCode as e:
+ if e.code != 404:
+ self.log.exception('gem-port-delete', e=e)
+
+ except Exception as ex:
+ self.log.exception('gem-port-delete', e=ex)
+ raise
+
+ returnValue('done')
+
+ @staticmethod
+ def gem_id_to_gvid(gem_id):
+ """Calculate GEM VID (gvid) for a given GEM port id"""
+ return gem_id - 2048
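
Note on the serial number helpers in onu.py above: string_to_serial_number() and serial_number_to_string() convert between the 12-character form (4 vendor characters followed by 8 hex digits) and the base64 form sent to the OLT, using Python 2 idioms such as str.encode("hex") and xrange. The following is a rough sketch of the same round trip written for Python 3, assuming ASCII vendor codes. It is an illustration only and is not part of this commit; the sample serial number is made up.

```python
import base64
import binascii


def string_to_serial_number(value):
    """'VVVVhhhhhhhh' -> base64 text of 4 vendor bytes + 4 binary bytes."""
    vendor = value[:4].encode('ascii')           # 4 ASCII vendor characters
    unique = binascii.a2b_hex(value[4:12])       # 8 hex digits -> 4 bytes
    return base64.b64encode(vendor + unique).decode('ascii')


def serial_number_to_string(value):
    """Inverse of string_to_serial_number()."""
    raw = base64.b64decode(value)
    vendor = raw[:4].decode('ascii')
    unique = binascii.b2a_hex(raw[4:8]).decode('ascii')
    return (vendor + unique).upper()


# Round trip with a made-up serial number
encoded = string_to_serial_number('ADTN12345678')
decoded = serial_number_to_string(encoded)
```

Keeping both directions next to each other makes it easy to check that one is the inverse of the other whenever the encoding is touched.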
diff --git a/adapters/adtran_olt/pon_port.py b/adapters/adtran_olt/pon_port.py
new file mode 100644
index 0000000..70ec564
--- /dev/null
+++ b/adapters/adtran_olt/pon_port.py
@@ -0,0 +1,886 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import random
+import arrow
+
+import structlog
+from adapters.adtran_common.port import AdtnPort
+from twisted.internet import reactor, defer
+from twisted.internet.defer import inlineCallbacks, returnValue
+from adtran_olt_handler import AdtranOltHandler
+from adapters.adtran_common.net.adtran_rest import RestInvalidResponseCode
+from codec.olt_config import OltConfig
+from onu import Onu
+from pyvoltha.adapters.extensions.alarms.onu.onu_los_alarm import OnuLosAlarm
+from pyvoltha.adapters.extensions.alarms.onu.onu_discovery_alarm import OnuDiscoveryAlarm
+from pyvoltha.protos.common_pb2 import AdminState
+from pyvoltha.protos.device_pb2 import Port
+import resources.adtranolt_platform as platform
+
+
+class PonPort(AdtnPort):
+ """
+ GPON Port
+ """
+ MAX_ONUS_SUPPORTED = 128
+ MAX_DEPLOYMENT_RANGE = 25000 # Meters (OLT-PB maximum)
+
+ _MCAST_ONU_ID = 253
+ _MCAST_ALLOC_BASE = 0x500
+
+ # AutoActivate should be used if xPON configuration is not supported
+ _SUPPORTED_ACTIVATION_METHODS = ['autodiscovery', 'autoactivate']
+ _SUPPORTED_AUTHENTICATION_METHODS = ['serial-number']
+
+ def __init__(self, parent, **kwargs):
+ super(PonPort, self).__init__(parent, **kwargs)
+ assert 'pon-id' in kwargs, 'PON ID not found'
+
+ self._parent = parent
+ self._pon_id = kwargs['pon-id']
+ self.log = structlog.get_logger(device_id=parent.device_id, pon_id=self._pon_id)
+ self._port_no = kwargs['port_no']
+ self._physical_port_name = 'xpon 0/{}'.format(self._pon_id+1)
+ self._label = 'pon-{}'.format(self._pon_id)
+
+ self._in_sync = False
+ self._expedite_sync = False
+ self._expedite_count = 0
+
+ self._discovery_tick = 20.0
+ self._no_onu_discover_tick = self._discovery_tick / 2
+ self._discovered_onus = [] # List of serial numbers
+ self._discovery_deferred = None # Specifically for ONU discovery
+
+ self._onus = {} # serial_number-base64 -> ONU
+ self._onu_by_id = {} # onu-id -> ONU
+ self._mcast_gem_ports = {} # VLAN -> GemPort
+
+ self._active_los_alarms = set() # ONU-ID
+
+ # xPON configuration
+ self._activation_method = 'autoactivate'
+
+ self._downstream_fec_enable = True
+ self._upstream_fec_enable = True
+ self._deployment_range = 25000
+ self._authentication_method = 'serial-number'
+ self._mcast_aes = False
+
+ # Statistics
+ self.tx_bip_errors = 0
+
+ def __str__(self):
+ return "PonPort-{}: Admin: {}, Oper: {}, OLT: {}".format(self._label,
+ self._admin_state,
+ self._oper_status,
+ self.olt)
+
+ def get_port(self):
+ """
+ Get the VOLTHA PORT object for this port
+ :return: VOLTHA Port object
+ """
+ if self._port is None:
+ self._port = Port(port_no=self._port_no,
+ label=self._label,
+ type=Port.PON_OLT,
+ admin_state=self._admin_state,
+ oper_status=self._oper_status)
+
+ return self._port
+
+ @property
+ def pon_id(self):
+ return self._pon_id
+
+ @property
+ def onus(self):
+ """
+ Get a set of all ONUs. While the set is immutable, do not use this method
+ to get a collection that you will iterate through if the loop may yield the
+ CPU, such as in an inlineCallbacks method. ONUs may be deleted at any time and
+ they set some references to other objects to None during the 'delete' call.
+ Instead, get a list of ONU-IDs, iterate over these, and call the 'onu'
+ method below (which will return None if the ONU has been deleted).
+
+ :return: (frozenset) collection of ONU objects on this PON
+ """
+ return frozenset(self._onus.values())
+
+ @property
+ def onu_ids(self):
+ return frozenset(self._onu_by_id.keys())
+
+ def onu(self, onu_id):
+ return self._onu_by_id.get(onu_id)
+
+ @property
+ def in_service_onus(self):
+ return len({onu.onu_id for onu in self.onus
+ if onu.onu_id not in self._active_los_alarms})
+
+ @property
+ def closest_onu_distance(self):
+ distance = -1
+ for onu in self.onus:
+ if onu.fiber_length < distance or distance == -1:
+ distance = onu.fiber_length
+ return distance
+
+ @property
+ def downstream_fec_enable(self):
+ return self._downstream_fec_enable
+
+ @downstream_fec_enable.setter
+ def downstream_fec_enable(self, value):
+ assert isinstance(value, bool), 'downstream FEC enabled is a boolean'
+
+ if self._downstream_fec_enable != value:
+ self._downstream_fec_enable = value
+ if self.state == AdtnPort.State.RUNNING:
+ self.deferred = self._set_pon_config("downstream-fec-enable", value)
+
+ @property
+ def upstream_fec_enable(self):
+ return self._upstream_fec_enable
+
+ @upstream_fec_enable.setter
+ def upstream_fec_enable(self, value):
+ assert isinstance(value, bool), 'upstream FEC enabled is a boolean'
+ if self._upstream_fec_enable != value:
+ self._upstream_fec_enable = value
+ if self.state == AdtnPort.State.RUNNING:
+ self.deferred = self._set_pon_config("upstream-fec-enable", value)
+
+ @property
+ def any_upstream_fec_enabled(self):
+ for onu in self.onus:
+ if onu.upstream_fec_enable and onu.enabled:
+ return True
+ return False
+
+ @property
+ def mcast_aes(self):
+ return self._mcast_aes
+
+ @mcast_aes.setter
+ def mcast_aes(self, value):
+ assert isinstance(value, bool), 'MCAST AES is a boolean'
+ if self._mcast_aes != value:
+ self._mcast_aes = value
+ if self.state == AdtnPort.State.RUNNING:
+ pass # TODO
+
+ @property
+ def deployment_range(self):
+ """Maximum deployment range (in meters)"""
+ return self._deployment_range
+
+ @deployment_range.setter
+ def deployment_range(self, value):
+ """Maximum deployment range (in meters)"""
+ if not 0 <= value <= PonPort.MAX_DEPLOYMENT_RANGE:
+ raise ValueError('Deployment range should be 0..{} meters'.
+ format(PonPort.MAX_DEPLOYMENT_RANGE))
+ if self._deployment_range != value:
+ self._deployment_range = value
+ if self.state == AdtnPort.State.RUNNING:
+ self.deferred = self._set_pon_config("deployment-range", value)
+
+ @property
+ def discovery_tick(self):
+ return self._discovery_tick * 10
+
+ @discovery_tick.setter
+ def discovery_tick(self, value):
+ if value < 0:
+ raise ValueError("Polling interval must be >= 0")
+
+ if self.discovery_tick != value:
+ self._discovery_tick = value / 10.0 # float division, avoids Python 2 truncation
+
+ try:
+ if self._discovery_deferred is not None and \
+ not self._discovery_deferred.called:
+ self._discovery_deferred.cancel()
+ except Exception:
+ pass
+ self._discovery_deferred = None
+
+ if self._discovery_tick > 0:
+ self._discovery_deferred = reactor.callLater(self._discovery_tick,
+ self._discover_onus)
+
+ @property
+ def activation_method(self):
+ return self._activation_method
+
+ @activation_method.setter
+ def activation_method(self, value):
+ value = value.lower()
+ if value not in PonPort._SUPPORTED_ACTIVATION_METHODS:
+ raise ValueError('Invalid ONU activation method')
+
+ self._activation_method = value
+
+ @property
+ def authentication_method(self):
+ return self._authentication_method
+
+ @authentication_method.setter
+ def authentication_method(self, value):
+ value = value.lower()
+ if value not in PonPort._SUPPORTED_AUTHENTICATION_METHODS:
+ raise ValueError('Invalid ONU authentication method')
+ self._authentication_method = value
+
+ def cancel_deferred(self):
+ super(PonPort, self).cancel_deferred()
+
+ d, self._discovery_deferred = self._discovery_deferred, None
+
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except Exception as e:
+ pass
+
+ def _update_adapter_agent(self):
+ """
+ Update the port status and state in the core
+ """
+ self.log.debug('update-adapter-agent', admin_state=self._admin_state,
+ oper_status=self._oper_status)
+
+ # because the core does not provide methods for updating admin
+ # and oper status per port, we need to copy any existing port
+ # info so that we don't wipe out the peers
+ if self._port is not None:
+ agent_ports = self.adapter_agent.get_ports(self.olt.device_id, Port.PON_OLT)
+
+ agent_port = next((ap for ap in agent_ports if ap.port_no == self._port_no), None)
+
+ # copy current Port info
+ if agent_port is not None:
+ self._port = agent_port
+
+ # set new states
+ self._port.admin_state = self._admin_state
+ self._port.oper_status = self._oper_status
+
+ # adapter_agent add_port also does an update of existing port
+ self.adapter_agent.add_port(self.olt.device_id, self.get_port())
+
+ @inlineCallbacks
+ def finish_startup(self):
+ """
+ Do all startup offline since REST may fail
+ """
+ if self.state != AdtnPort.State.INITIAL:
+ returnValue('Done')
+
+ self.log.debug('final-startup')
+ results = None
+
+ try:
+ self.deferred = self._get_pon_config()
+ results = yield self.deferred
+
+ except Exception as e:
+ self.log.exception('initial-GET', e=e)
+ self.deferred = reactor.callLater(5, self.finish_startup)
+ returnValue(self.deferred)
+
+ # Load config from hardware
+
+ enabled = results.get('enabled', False)
+ downstream_fec_enable = results.get('downstream-fec-enable', False)
+ upstream_fec_enable = results.get('upstream-fec-enable', False)
+ deployment_range = results.get('deployment-range', 25000)
+ self._in_sync = True
+
+ if enabled != self._enabled:
+ try:
+ self.deferred = self._set_pon_config("enabled", True)
+ yield self.deferred
+
+ except Exception as e:
+ self.log.exception('final-startup-enable', e=e)
+ self.deferred = reactor.callLater(3, self.finish_startup)
+ returnValue(self.deferred)
+
+ if downstream_fec_enable != self._downstream_fec_enable:
+ try:
+ self.deferred = self._set_pon_config("downstream-fec-enable",
+ self._downstream_fec_enable)
+ yield self.deferred
+
+ except Exception as e:
+ self.log.warning('final-startup-downstream-FEC', e=e)
+ self._in_sync = False
+ # Non-fatal. May have failed due to no SFP in slot
+
+ if upstream_fec_enable != self._upstream_fec_enable:
+ try:
+ self.deferred = self._set_pon_config("upstream-fec-enable",
+ self._upstream_fec_enable)
+ yield self.deferred
+
+ except Exception as e:
+ self.log.warning('final-startup-upstream-FEC', e=e)
+ self._in_sync = False
+ # Non-fatal. May have failed due to no SFP in slot
+
+ if deployment_range != self._deployment_range:
+ try:
+ self.deferred = self._set_pon_config("deployment-range",
+ self._deployment_range)
+ yield self.deferred
+
+ except Exception as e:
+ self.log.warning('final-startup-deployment-range', e=e)
+ self._in_sync = False
+ # Non-fatal. May have failed due to no SFP in slot
+
+ if len(self._onus) > 0:
+ dl = []
+ for onu_id in self.onu_ids:
+ onu = self.onu(onu_id)
+ if onu is not None:
+ dl.append(onu.restart())
+ yield defer.gatherResults(dl, consumeErrors=True)
+
+ # Begin ONU discovery and hardware sync
+
+ self._discovery_deferred = reactor.callLater(5, self._discover_onus)
+
+ # If here, initial settings were successfully written to hardware
+
+ super(PonPort, self).finish_startup()
+ returnValue('Enabled')
+
+ @inlineCallbacks
+ def finish_stop(self):
+ # Remove all existing ONUs. They will need to be re-discovered
+ dl = []
+ onu_ids = frozenset(self._onu_by_id.keys())
+ for onu_id in onu_ids:
+ try:
+ dl.append(self.delete_onu(onu_id))
+
+ except Exception as e:
+ self.log.exception('onu-cleanup', onu_id=onu_id, e=e)
+
+ dl.append(self._set_pon_config("enabled", False))
+ results = yield defer.gatherResults(dl, consumeErrors=True)
+ returnValue(results)
+
+ @inlineCallbacks
+ def reset(self):
+ """
+ Set the PON Port to a known good state on initial port startup. Actual
+ PON 'Start' is done elsewhere
+ """
+ initial_port_state = AdminState.ENABLED
+ self.log.info('reset', initial_state=initial_port_state)
+
+ try:
+ self.deferred = self._get_pon_config()
+ results = yield self.deferred
+ enabled = results.get('enabled', False)
+
+ except Exception as e:
+ self.log.exception('get-config', e=e)
+ enabled = False
+
+ enable = initial_port_state == AdminState.ENABLED
+
+ if enable != enabled:
+ try:
+ self.deferred = self._set_pon_config("enabled", enable)
+ yield self.deferred
+ except Exception as e:
+ self.log.exception('reset-enabled', e=e, enabled=enabled)
+ # TODO: Move to 'set_pon_config' method and also make sure GRPC/Port is ok
+ self._admin_state = AdminState.ENABLED if enable else AdminState.DISABLED
+
+ try:
+ # Walk the provisioned ONU list and disable any existing ONUs
+ results = yield self._get_onu_config()
+
+ if isinstance(results, list) and len(results) > 0:
+ onu_configs = OltConfig.Pon.Onu.decode(results)
+ dl = []
+ for onu_id in onu_configs.iterkeys():
+ dl.append(self.delete_onu(onu_id))
+
+ try:
+ if len(dl) > 0:
+ yield defer.gatherResults(dl, consumeErrors=True)
+
+ except Exception as e:
+ self.log.exception('rest-ONU-delete', e=e)
+ pass # Non-fatal
+
+ except Exception as e:
+ self.log.exception('onu-delete', e=e)
+
+ returnValue('Reset complete')
+
+ def gem_ids(self, logical_port, flow_vlan, multicast_gems=False):
+ """
+ Get all GEM Port IDs used on a given PON
+
+ :param logical_port: (int) Logical port number of ONU. None if for all ONUs
+ on PON, if Multicast, VID for Multicast, or None for all
+ Multicast GEMPorts
+ :param flow_vlan: (int) If not None, this is the ingress tag (c-tag)
+ :param multicast_gems: (boolean) Select from available Multicast GEM Ports
+ :return: (dict) data_gem -> key -> onu-id, value -> tuple(sorted list of GEM Port IDs, onu_vid)
+ mcast_gem-> key -> mcast-vid, value -> GEM Port IDs
+ """
+ gem_ids = {}
+
+ if multicast_gems:
+ # Multicast GEMs belong to the PON, but we may need to register them on
+ # all ONUs. TODO: Rework when BBF MCAST is addressed in VOLTHA v2.0+
+ for vlan, gem_port in self._mcast_gem_ports.iteritems():
+ if logical_port is None or (logical_port == vlan and logical_port in self.olt.multicast_vlans):
+ gem_ids[vlan] = ([gem_port.gem_id], None)
+ else:
+ raise NotImplementedError('TODO: This is deprecated')
+ # for onu_id, onu in self._onu_by_id.iteritems():
+ # if logical_port is None or logical_port == onu.logical_port:
+ # gem_ids[onu_id] = (onu.gem_ids(), flow_vlan)
+ return gem_ids
+
+ def _get_pon_config(self):
+ uri = AdtranOltHandler.GPON_PON_CONFIG_URI.format(self._pon_id)
+ name = 'pon-get-config-{}'.format(self._pon_id)
+ return self._parent.rest_client.request('GET', uri, name=name)
+
+ def _get_onu_config(self, onu_id=None):
+ if onu_id is None:
+ uri = AdtranOltHandler.GPON_ONU_CONFIG_LIST_URI.format(self._pon_id)
+ else:
+ uri = AdtranOltHandler.GPON_ONU_CONFIG_URI.format(self._pon_id, onu_id)
+
+ name = 'pon-get-onu_config-{}-{}'.format(self._pon_id, onu_id)
+ return self._parent.rest_client.request('GET', uri, name=name)
+
+ def _set_pon_config(self, leaf, value):
+ data = json.dumps({leaf: value})
+ uri = AdtranOltHandler.GPON_PON_CONFIG_URI.format(self._pon_id)
+ name = 'pon-set-config-{}-{}-{}'.format(self._pon_id, leaf, str(value))
+ # If no optics on PON, then PON config fails with status 400, suppress this
+ suppress_error = len(self.onu_ids) == 0
+ return self._parent.rest_client.request('PATCH', uri, data=data, name=name,
+ suppress_error=suppress_error)
+
+ def _discover_onus(self):
+ self.log.debug('discovery', state=self._admin_state, in_sync=self._in_sync)
+ if self._admin_state == AdminState.ENABLED:
+ if self._in_sync:
+ data = json.dumps({'pon-id': self._pon_id})
+ uri = AdtranOltHandler.GPON_PON_DISCOVER_ONU
+ name = 'pon-discover-onu-{}'.format(self._pon_id)
+
+ self._discovery_deferred = self._parent.rest_client.request('POST', uri, data, name=name)
+ self._discovery_deferred.addBoth(self._onu_discovery_init_complete)
+ else:
+ self._discovery_deferred = reactor.callLater(0,
+ self._onu_discovery_init_complete,
+ None)
+
+ def _onu_discovery_init_complete(self, _result):
+ """
+ This method is called after the REST POST to request ONU discovery is
+ completed. The results (body) of the post is always empty / 204 NO CONTENT
+ """
+ delay = self._no_onu_discover_tick if len(self._onus) == 0 else self._discovery_tick
+ delay += random.uniform(-delay / 10, delay / 10)
+ self._discovery_deferred = reactor.callLater(delay, self._discover_onus)
+
+ def sync_hardware(self):
+ if self.state == AdtnPort.State.RUNNING or self.state == AdtnPort.State.STOPPED:
+ def read_config(results):
+ self.log.debug('read-config', results=results)
+ config = OltConfig.Pon.decode([results])
+ assert self.pon_id in config, 'sync-pon-not-found-{}'.format(self.pon_id)
+ config = config[self.pon_id]
+ self._in_sync = True
+
+ dl = []
+
+ if self.enabled != config.enabled:
+ self._in_sync = False
+ self._expedite_sync = True
+ dl.append(self._set_pon_config("enabled", self.enabled))
+
+ elif self.state == AdtnPort.State.RUNNING:
+ if self.deployment_range != config.deployment_range:
+ self._in_sync = False
+ self._expedite_sync = True
+ dl.append(self._set_pon_config("deployment-range",
+ self.deployment_range))
+
+ # A little side note: FEC enable/disable cannot be changed and
+ # will remain in the previous status until an optical module
+ # is plugged in.
+ if self.downstream_fec_enable != config.downstream_fec_enable:
+ self._in_sync = False
+ dl.append(self._set_pon_config("downstream-fec-enable",
+ self.downstream_fec_enable))
+
+ if self.upstream_fec_enable != config.upstream_fec_enable:
+ self._in_sync = False
+ self._expedite_sync = True
+ dl.append(self._set_pon_config("upstream-fec-enable",
+ self.upstream_fec_enable))
+ defer.gatherResults(dl, consumeErrors=True)
+ return config.onus
+
+ def sync_onus(hw_onus):
+ if self.state == AdtnPort.State.RUNNING:
+ self.log.debug('sync-pon-onu-results', config=hw_onus)
+
+ # ONUs have their own sync task; extra ONUs (which should be deleted)
+ # are handled here.
+ hw_onu_ids = frozenset(hw_onus.keys())
+ my_onu_ids = frozenset(self._onu_by_id.keys())
+
+ extra_onus = hw_onu_ids - my_onu_ids
+ dl = [self.delete_onu(onu_id, hw_only=True) for onu_id in extra_onus]
+
+ if self.activation_method == "autoactivate":
+ # Autoactivation of ONUs requires missing ONU detection. If
+ # not found, create them here but let the TCont/GEM-Port restore
+ # be handled by ONU H/w sync logic.
+ for onu in [self._onu_by_id[onu_id] for onu_id in my_onu_ids - hw_onu_ids
+ if self._onu_by_id.get(onu_id) is not None]:
+ dl.append(onu.create(reflow=True))
+
+ return defer.gatherResults(dl, consumeErrors=True)
+
+ def failure(reason, what):
+ self.log.error('hardware-sync-{}-failed'.format(what), reason=reason)
+ self._in_sync = False
+ self._expedite_sync = False
+
+ def reschedule(_):
+ # Speed up sequential resync a limited number of times if out of sync.
+
+ delay = self.sync_tick
+
+ if self._expedite_sync:
+ self._expedite_count += 1
+ if self._expedite_count < 5:
+ delay = 1
+ else:
+ self._expedite_count = 0
+
+ delay += random.uniform(-delay / 10, delay / 10)
+ self.sync_deferred = reactor.callLater(delay, self.sync_hardware)
+
+ self.sync_deferred = self._get_pon_config()
+ self.sync_deferred.addCallbacks(read_config, failure, errbackArgs=['get-config'])
+ self.sync_deferred.addCallbacks(sync_onus, failure, errbackArgs=['pon-sync'])
+ self.sync_deferred.addBoth(reschedule)
+
+ def process_status_poll(self, status):
+ """
+ Process PON status poll request
+
+ :param status: (OltState.Pon object) results from RESTCONF GET
+ """
+ self.log.debug('process-status-poll', status=status)
+
+ if self._admin_state != AdminState.ENABLED:
+ return
+
+ # Process LOS list
+ self._process_los_alarms(frozenset(status.ont_los))
+
+ # Get new/missing from the discovered ONU leaf. Stale ONUs from previous
+ # configs are now cleaned up during h/w re-sync/reflow.
+ new, rediscovered_onus = self._process_status_onu_discovered_list(status.discovered_onu)
+
+ # Process newly discovered ONU list and rediscovered ONUs
+ for serial_number in new | rediscovered_onus:
+ reactor.callLater(0, self.add_onu, serial_number, status)
+
+ # PON Statistics
+ timestamp = arrow.utcnow().float_timestamp
+ self._process_statistics(status, timestamp)
+
+ # Process ONU info. Note that newly added ONUs will not be processed
+ # until the next pass
+ self._update_onu_status(status.onus, timestamp)
+
+ # Process GEM Port information
+ self._update_gem_status(status.gems, timestamp)
+
+ def _process_statistics(self, status, timestamp):
+ self.timestamp = timestamp
+ self.rx_packets = status.rx_packets
+ self.rx_bytes = status.rx_bytes
+ self.tx_packets = status.tx_packets
+ self.tx_bytes = status.tx_bytes
+ self.tx_bip_errors = status.tx_bip_errors
+
+ def _update_onu_status(self, onus, timestamp):
+ """
+ Process ONU status for this PON
+ :param onus: (dict) onu_id: ONU State
+ """
+ for onu_id, onu_status in onus.iteritems():
+ if onu_id in self._onu_by_id:
+ onu = self._onu_by_id[onu_id]
+ onu.timestamp = timestamp
+ onu.rssi = onu_status.rssi
+ onu.equalization_delay = onu_status.equalization_delay
+ onu.equalization_delay = onu_status.equalization_delay
+ onu.fiber_length = onu_status.fiber_length
+ onu.password = onu_status.reported_password
+
+ def _update_gem_status(self, gems, timestamp):
+ for gem_id, gem_status in gems.iteritems():
+ onu = self._onu_by_id.get(gem_status.onu_id)
+ if onu is not None:
+ gem_port = onu.gem_port(gem_status.gem_id)
+ if gem_port is not None:
+ gem_port.timestamp = timestamp
+ gem_port.rx_packets = gem_status.rx_packets
+ gem_port.rx_bytes = gem_status.rx_bytes
+ gem_port.tx_packets = gem_status.tx_packets
+ gem_port.tx_bytes = gem_status.tx_bytes
+
+ def _process_los_alarms(self, ont_los):
+ """
+ Walk current LOS and set/clear LOS as appropriate
+ :param ont_los: (frozenset) ONU IDs of ONUs in LOS alarm state
+ """
+ cleared_alarms = self._active_los_alarms - ont_los
+ new_alarms = ont_los - self._active_los_alarms
+
+ if len(cleared_alarms) > 0 or len(new_alarms) > 0:
+ self.log.info('onu-los', cleared=cleared_alarms, new=new_alarms)
+
+ for onu_id in cleared_alarms:
+ self._active_los_alarms.remove(onu_id)
+ OnuLosAlarm(self.olt.alarms, onu_id, self.port_no).clear_alarm()
+
+ for onu_id in new_alarms:
+ self._active_los_alarms.add(onu_id)
+ OnuLosAlarm(self.olt.alarms, onu_id, self.port_no).raise_alarm()
+ reactor.callLater(0, self.delete_onu, onu_id)
+
+ def _process_status_onu_discovered_list(self, discovered_onus):
+ """
+ Look for new ONUs
+
+ :param discovered_onus: (frozenset) Set of ONUs currently discovered
+ """
+ self.log.debug('discovered-ONUs', list=discovered_onus)
+
+ # Only request discovery if activation is auto-discovery or auto-activate
+ continue_discovery = ['autodiscovery', 'autoactivate']
+
+ if self._activation_method not in continue_discovery:
+ return set(), set()
+
+ my_onus = frozenset(self._onus.keys())
+
+ new_onus = discovered_onus - my_onus
+ rediscovered_onus = my_onus & discovered_onus
+
+ return new_onus, rediscovered_onus
+
+ def _get_onu_info(self, serial_number):
+ """
+ Parse through available xPON information for ONU configuration settings
+
+ :param serial_number: (string) Decoded (not base64) serial number string
+ :return: (dict) onu config data or None on lookup failure
+ """
+ try:
+ if self.activation_method == "autodiscovery":
+ # if self.authentication_method == 'serial-number':
+ raise NotImplementedError('autodiscovery: Not supported at this time')
+
+ elif self.activation_method == "autoactivate":
+ onu_id = self.get_next_onu_id
+ enabled = True
+ upstream_fec_enabled = True
+
+ else:
+ self.log.error('unsupported-activation-method', method=self.activation_method)
+ return None
+
+ onu_info = {
+ 'device-id': self.olt.device_id,
+ 'serial-number': serial_number,
+ 'pon': self,
+ 'onu-id': onu_id,
+ 'enabled': enabled,
+ 'upstream-fec': upstream_fec_enabled,
+ 'password': Onu.DEFAULT_PASSWORD,
+ }
+ pon_id = self.olt.pon_id_to_port_number(self._pon_id)
+
+ # TODO: Currently only one UNI port and it is hardcoded to port 0
+ onu_info['uni-ports'] = [platform.mk_uni_port_num(pon_id, onu_id)]
+
+ # return onu_info
+ return onu_info
+
+ except Exception as e:
+ self.log.exception('get-onu-info-tech-profiles', e=e)
+ return None
+
+ @inlineCallbacks
+ def add_onu(self, serial_number_64, status):
+ """
+ Add an ONU to the PON
+
+ :param serial_number_64: (str) base-64 encoded serial number
+ :param status: (dict) OLT PON status. Used to detect if ONU is already provisioned
+ """
+ serial_number = Onu.serial_number_to_string(serial_number_64)
+ self.log.info('add-onu', serial_number=serial_number,
+ serial_number_64=serial_number_64, status=status)
+
+ # It takes a little while for a new ONU to be removed from the discovery
+ # list. Return early here so extra ONU IDs are not allocated
+ if serial_number_64 in self._onus:
+ returnValue('wait-for-fpga')
+
+ if serial_number_64 in status.onus:
+ # Handles fast entry into this task before FPGA can clear results of ONU delete
+ returnValue('sticky-onu')
+
+ # At our limit? TODO: Retrieve from device resource manager if available
+ if len(self._onus) >= self.MAX_ONUS_SUPPORTED:
+ self.log.warning('max-onus-provisioned', count=len(self._onus))
+ returnValue('max-onus-reached')
+
+ onu_info = self._get_onu_info(serial_number)
+ onu_id = onu_info['onu-id'] if onu_info is not None else None
+
+ if onu_id is None:
+ self.log.warning('no-onu-ids-available', serial_number=serial_number,
+ serial_number_64=serial_number_64)
+ returnValue('no-ids-available')
+
+ # TODO: Is this best done before or after creation in parent device?
+ alarm = OnuDiscoveryAlarm(self.olt.alarms, self.pon_id, serial_number)
+ reactor.callLater(0, alarm.raise_alarm)
+
+ # Have the core create the ONU device
+ self._parent.add_onu_device(self.pon_id, onu_id, serial_number)
+
+ try:
+ onu = Onu(onu_info)
+ self._onus[serial_number_64] = onu
+ self._onu_by_id[onu.onu_id] = onu
+
+ # Add Multicast to PON on a per-ONU basis
+ #
+ # for id_or_vid, gem_port in gem_ports.iteritems():
+ # try:
+ # if gem_port.multicast:
+ # self.log.debug('id-or-vid', id_or_vid=id_or_vid)
+ # vid = self.olt.multicast_vlans[0] if len(self.olt.multicast_vlans) else None
+ # if vid is not None:
+ # self.add_mcast_gem_port(gem_port, vid)
+ #
+ # except Exception as e:
+ # self.log.exception('id-or-vid', e=e)
+
+ _results = yield onu.create()
+
+ except Exception as e:
+ self.log.warning('add-onu', serial_number=serial_number_64, e=e)
+ # allowable exception. H/w re-sync will recover/fix any issues
+
+ @property
+ def get_next_onu_id(self):
+ return self._parent.resource_mgr.get_onu_id(self._pon_id)
+
+ def release_onu_id(self, onu_id):
+ self._parent.resource_mgr.free_onu_id(self._pon_id, onu_id)
+
+ @inlineCallbacks
+ def _remove_from_hardware(self, onu_id):
+ uri = AdtranOltHandler.GPON_ONU_CONFIG_URI.format(self._pon_id, onu_id)
+ name = 'pon-delete-onu-{}-{}'.format(self._pon_id, onu_id)
+
+ try:
+ yield self._parent.rest_client.request('DELETE', uri, name=name)
+
+ except RestInvalidResponseCode as e:
+ if e.code != 404:
+ self.log.exception('onu-delete', e=e)
+
+ except Exception as e:
+ self.log.exception('onu-hw-delete', onu_id=onu_id, e=e)
+
+ @inlineCallbacks
+ def delete_onu(self, onu_id, hw_only=False):
+ onu = self._onu_by_id.get(onu_id)
+
+ # Remove from any local dictionary
+ if onu_id in self._onu_by_id:
+ del self._onu_by_id[onu_id]
+
+ if onu is not None:
+ if onu.serial_number_64 in self._onus:
+ del self._onus[onu.serial_number_64]
+ try:
+ proxy_address = onu.proxy_address
+ onu.delete() # Remove from hardware
+
+ # And removal from VOLTHA adapter agent
+ if not hw_only:
+ self._parent.delete_child_device(proxy_address)
+
+ except Exception as e:
+ self.log.exception('onu-delete', serial_number=onu.serial_number, e=e)
+ else:
+ try:
+ yield self._remove_from_hardware(onu_id)
+
+ except Exception as e:
+ self.log.debug('onu-remove', onu_id=onu_id, e=e)
+
+ # Remove from LOS list if needed TODO: Should a 'clear' alarm be sent as well ?
+ if onu is not None and onu.onu_id in self._active_los_alarms:
+ self._active_los_alarms.remove(onu.onu_id)
+
+ def add_mcast_gem_port(self, mcast_gem, vlan):
+ """
+ Add any new Multicast GEM Ports to the PON
+ :param mcast_gem: (GemPort)
+ """
+ if vlan in self._mcast_gem_ports:
+ return
+
+ assert len(self._mcast_gem_ports) == 0, 'Only 1 MCAST GEMPort until BBF Support'
+ assert 1 <= vlan <= 4095, 'Invalid Multicast VLAN ID'
+ assert len(self.olt.multicast_vlans) == 1, 'Only support 1 MCAST VLAN until BBF Support'
+
+ self._mcast_gem_ports[vlan] = mcast_gem
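
Reviewer note: the LOS handling in `_process_los_alarms` above comes down to two set differences between the ONU IDs currently in alarm and the ONU IDs reported by the latest status poll. The sketch below is only an illustration of that bookkeeping. The helper name `reconcile_los_alarms` is hypothetical and is not part of the adapter, and the alarm raise/clear calls and ONU deletion are intentionally left out.

```python
def reconcile_los_alarms(active, reported):
    """Return (cleared, new) ONU-ID sets for one status poll.

    Hypothetical helper for illustration; mirrors the set arithmetic
    used by the PON port, without raising or clearing any alarms.
    """
    active = frozenset(active)
    reported = frozenset(reported)
    cleared = active - reported   # in alarm before, no longer reported
    new = reported - active       # reported now, not yet tracked
    return cleared, new


# ONUs 1 and 4 are in LOS; the next poll reports ONUs 4 and 7.
active_alarms = {1, 4}
cleared, new = reconcile_los_alarms(active_alarms, frozenset([4, 7]))

# Apply the result to the tracked set, as the port does one ID at a time.
active_alarms -= cleared
active_alarms |= new
```

Because both differences are computed before the tracked set is modified, an ONU ID can never appear in both `cleared` and `new` during the same poll.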
diff --git a/adapters/adtran_olt/resources/__init__.py b/adapters/adtran_olt/resources/__init__.py
new file mode 100644
index 0000000..9c454e3
--- /dev/null
+++ b/adapters/adtran_olt/resources/__init__.py
@@ -0,0 +1,14 @@
+# Copyright 2018-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
diff --git a/adapters/adtran_olt/resources/adtran_olt_resource_manager.py b/adapters/adtran_olt/resources/adtran_olt_resource_manager.py
new file mode 100644
index 0000000..caf5a46
--- /dev/null
+++ b/adapters/adtran_olt/resources/adtran_olt_resource_manager.py
@@ -0,0 +1,295 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import structlog
+
+from pyvoltha.adapters.common.pon_resource_manager.resource_manager import PONResourceManager
+from pyvoltha.common.utils.registry import registry
+# from voltha.core.config.config_backend import ConsulStore
+# from voltha.core.config.config_backend import EtcdStore
+from pyvoltha.adapters.common.kvstore.kvstore import create_kv_client
+from adtran_resource_manager import AdtranPONResourceManager
+
+
+class AdtranOltResourceMgr(object):
+
+ GEMPORT_IDS = "gemport_ids"
+ ALLOC_IDS = "alloc_ids"
+ BASE_PATH_KV_STORE = "adtran_olt/{}" # adtran_olt/<device_id>
+
+ def __init__(self, device_id, host_and_port, extra_args, device_info):
+ self.log = structlog.get_logger(id=device_id,
+ ip=host_and_port)
+ self.device_id = device_id
+ self.host_and_port = host_and_port
+ self.extra_args = extra_args
+ self.device_info = device_info
+ self.args = registry('main').get_args()
+ self._path_prefix = AdtranOltResourceMgr.BASE_PATH_KV_STORE.format(device_id)
+
+ # KV store's IP Address and PORT
+ # host, port = '127.0.0.1', 8500
+ if self.args.backend == 'etcd':
+ host, port = self.args.etcd.split(':', 1)
+ self.kv_store = create_kv_client('etcd', host, port)
+ # self.kv_store = EtcdStore(host, port,
+ # AdtranOltResourceMgr.BASE_PATH_KV_STORE.format(device_id))
+ elif self.args.backend == 'consul':
+ host, port = self.args.consul.split(':', 1)
+ self.kv_store = create_kv_client('consul', host, port)
+ # self.kv_store = ConsulStore(host, port,
+ # AdtranOltResourceMgr.BASE_PATH_KV_STORE.format(device_id))
+ else:
+ self.log.error('Invalid-backend')
+ raise Exception("Invalid-backend-for-kv-store")
+
+ self.resource_mgr = AdtranPONResourceManager(
+ self.device_info.technology,
+ self.extra_args,
+ self.device_id, self.args.backend,
+ host, port
+ )
+ # Tech profiles use this resource manager to retrieve information on a
+ # per-interface basis
+ self.resource_managers = {intf_id: self.resource_mgr for intf_id in device_info.intf_ids}
+
+ # Flag to indicate whether information fetched from device should
+ # be used to initialize PON Resource Ranges
+ self.use_device_info = False
+
+ self.initialize_device_resource_range_and_pool()
+
+ def __del__(self):
+ self.log.info("clearing-device-resource-pool")
+ for key, resource_mgr in self.resource_managers.iteritems():
+ resource_mgr.clear_device_resource_pool()
+
+ def get_onu_id(self, pon_intf_id):
+ onu_id = self.resource_mgr.get_resource_id(pon_intf_id,
+ PONResourceManager.ONU_ID,
+ onu_id=None,
+ num_of_id=1)
+ if onu_id is not None:
+ pon_intf_onu_id = (pon_intf_id, onu_id)
+ self.resource_mgr.init_resource_map(pon_intf_onu_id)
+
+ return onu_id
+
+ def free_onu_id(self, pon_intf_id, onu_id):
+ self.resource_mgr.free_resource_id(pon_intf_id,
+ PONResourceManager.ONU_ID,
+ onu_id)
+ pon_intf_onu_id = (pon_intf_id, onu_id)
+ self.resource_mgr.remove_resource_map(pon_intf_onu_id)
+
+ def get_alloc_id(self, pon_intf_onu_id):
+ # Derive the pon_intf from the pon_intf_onu_id tuple
+ pon_intf = pon_intf_onu_id[0]
+ onu_id = pon_intf_onu_id[1]
+ alloc_id_list = self.resource_mgr.get_current_alloc_ids_for_onu(pon_intf_onu_id)
+
+ if alloc_id_list and len(alloc_id_list) > 0:
+ # Since we support only one alloc_id for the ONU at the moment,
+ # return the first alloc_id in the list, if available, for that
+ # ONU.
+ return alloc_id_list[0]
+
+ alloc_id_list = self.resource_mgr.get_resource_id(pon_intf,
+ PONResourceManager.ALLOC_ID,
+ onu_id=onu_id,
+ num_of_id=1)
+ if alloc_id_list is None or len(alloc_id_list) == 0:
+ self.log.error("no-alloc-id-available")
+ return None
+
+ # update the resource map on KV store with the list of alloc_id
+ # allocated for the pon_intf_onu_id tuple
+ self.resource_mgr.update_alloc_ids_for_onu(pon_intf_onu_id,
+ alloc_id_list)
+
+ # Since we request only one alloc id, we refer to the 0th
+ # index
+ alloc_id = alloc_id_list[0]
+
+ return alloc_id
+
+ def get_gemport_id(self, pon_intf_onu_id, num_of_id=1):
+ # TODO: Remove this if never used
+ # Derive the pon_intf and onu_id from the pon_intf_onu_id tuple
+ pon_intf = pon_intf_onu_id[0]
+ onu_id = pon_intf_onu_id[1]
+ uni_id = pon_intf_onu_id[2]
+ assert False, 'unused function'
+
+ # gemport_id_list = self.resource_managers[pon_intf].get_current_gemport_ids_for_onu(
+ # pon_intf_onu_id)
+ # if gemport_id_list and len(gemport_id_list) > 0:
+ # return gemport_id_list
+ #
+ # gemport_id_list = self.resource_mgrs[pon_intf].get_resource_id(
+ # pon_intf_id=pon_intf,
+ # resource_type=PONResourceManager.GEMPORT_ID,
+ # num_of_id=num_of_id
+ # )
+ #
+ # if gemport_id_list and len(gemport_id_list) == 0:
+ # self.log.error("no-gemport-id-available")
+ # return None
+ #
+ # # update the resource map on KV store with the list of gemport_id
+ # # allocated for the pon_intf_onu_id tuple
+ # self.resource_managers[pon_intf].update_gemport_ids_for_onu(pon_intf_onu_id,
+ # gemport_id_list)
+ #
+ # self.update_gemports_ponport_to_onu_map_on_kv_store(gemport_id_list,
+ # pon_intf, onu_id, uni_id)
+ # return gemport_id_list
+
+    def free_pon_resources_for_onu(self, pon_intf_id_onu_id):
+        """ Typically called on ONU delete """
+
+        pon_intf_id = pon_intf_id_onu_id[0]
+        onu_id = pon_intf_id_onu_id[1]
+
+        # Each release below is best-effort: a failure in one step must not
+        # prevent the remaining resources from being freed.
+        try:
+            alloc_ids = self.resource_mgr.get_current_alloc_ids_for_onu(pon_intf_id_onu_id)
+            if alloc_ids is not None:
+                self.resource_mgr.free_resource_id(pon_intf_id,
+                                                   PONResourceManager.ALLOC_ID,
+                                                   alloc_ids, onu_id=onu_id)
+        except Exception:
+            pass
+
+        # Pre-set so the cleanup loop at the end is safe if the lookup fails
+        gemport_ids = None
+        try:
+            gemport_ids = self.resource_mgr.get_current_gemport_ids_for_onu(pon_intf_id_onu_id)
+            if gemport_ids is not None:
+                self.resource_mgr.free_resource_id(pon_intf_id,
+                                                   PONResourceManager.GEMPORT_ID,
+                                                   gemport_ids)
+        except Exception:
+            pass
+
+        try:
+            self.resource_mgr.free_resource_id(pon_intf_id,
+                                               PONResourceManager.ONU_ID,
+                                               onu_id)
+        except Exception:
+            pass
+
+        # Clear resource map associated with the pon_intf_id_onu_id tuple.
+        self.resource_mgr.remove_resource_map(pon_intf_id_onu_id)
+
+        # Clear the ONU Id associated with each (pon_intf_id, gemport_id) tuple.
+        if gemport_ids is not None:
+            for gemport_id in gemport_ids:
+                try:
+                    self.kv_store.delete(self._make_path(str((pon_intf_id, gemport_id))))
+                    # del self.kv_store[str((pon_intf_id, gemport_id))]
+                except Exception:
+                    pass
+
+ def initialize_device_resource_range_and_pool(self):
+ if not self.use_device_info:
+ status = self.resource_mgr.init_resource_ranges_from_kv_store()
+ if not status:
+ self.log.error("failed-to-load-resource-range-from-kv-store")
+ # When we have failed to read the PON Resource ranges from KV
+ # store, use the information selected as the default.
+ self.use_device_info = True
+
+ if self.use_device_info:
+ self.log.info("using-device-info-to-init-pon-resource-ranges")
+ self.resource_mgr.init_default_pon_resource_ranges(
+ onu_id_start_idx=self.device_info.onu_id_start,
+ onu_id_end_idx=self.device_info.onu_id_end,
+ alloc_id_start_idx=self.device_info.alloc_id_start,
+ alloc_id_end_idx=self.device_info.alloc_id_end,
+ gemport_id_start_idx=self.device_info.gemport_id_start,
+ gemport_id_end_idx=self.device_info.gemport_id_end,
+ num_of_pon_ports=self.device_info.pon_ports,
+ intf_ids=self.device_info.intf_ids
+ )
+
+ # After we have initialized resource ranges, initialize the
+ # resource pools accordingly.
+ self.resource_mgr.init_device_resource_pool()
+
+ def get_current_gemport_ids_for_onu(self, pon_intf_onu_id):
+ pon_intf_id = pon_intf_onu_id[0]
+ return self.resource_managers[pon_intf_id].get_current_gemport_ids_for_onu(pon_intf_onu_id)
+
+    def get_current_alloc_ids_for_onu(self, pon_intf_onu_id):
+        pon_intf_id = pon_intf_onu_id[0]
+        alloc_ids = self.resource_managers[pon_intf_id].get_current_alloc_ids_for_onu(pon_intf_onu_id)
+        # Treat an empty list the same as a missing entry
+        if not alloc_ids:
+            return None
+        # We support only one tcont at the moment
+        return alloc_ids[0]
+
+ def update_gemports_ponport_to_onu_map_on_kv_store(self, gemport_list, pon_port, onu_id, uni_id):
+ for gemport in gemport_list:
+ pon_intf_gemport = (pon_port, gemport)
+ # This information is used when packet_indication is received and
+ # we need to derive the ONU Id for which the packet arrived based
+ # on the pon_intf and gemport available in the packet_indication
+ # self.kv_store[str(pon_intf_gemport)] = ' '.join(map(str, (onu_id, uni_id)))
+ self.kv_store.put(self._make_path(str(pon_intf_gemport)), ' '.join(map(str, (onu_id, uni_id)))
+
+ def get_onu_uni_from_ponport_gemport(self, pon_port, gemport):
+ pon_intf_gemport = (pon_port, gemport)
+ #return tuple(map(int, self.kv_store[str(pon_intf_gemport)].split(' ')))
+ return tuple(map(int, self.kv_store.get(self._make_path(str(pon_intf_gemport))).split(' ')))
+
+ def get_flow_id(self, pon_intf_id, onu_id, uni_id, flow_store_cookie, flow_category=None):
+ pon_intf_onu_id = (pon_intf_id, onu_id, uni_id)
+ try:
+ flow_ids = self.resource_managers[pon_intf_id]. \
+ get_current_flow_ids_for_onu(pon_intf_onu_id)
+ if flow_ids is not None:
+ for flow_id in flow_ids:
+ flows = self.get_flow_id_info(pon_intf_id, onu_id, uni_id, flow_id)
+ assert (isinstance(flows, list))
+ for flow in flows:
+
+ if flow_category is not None and \
+ 'flow_category' in flow and \
+ flow['flow_category'] == flow_category:
+ return flow_id
+ if flow['flow_store_cookie'] == flow_store_cookie:
+ return flow_id
+ except Exception as e:
+ self.log.error("error-retrieving-flow-info", e=e)
+
+ flow_id = self.resource_managers[pon_intf_id].get_resource_id(
+ pon_intf_onu_id[0], PONResourceManager.FLOW_ID)
+ if flow_id is not None:
+ self.resource_managers[pon_intf_id].update_flow_id_for_onu(
+ pon_intf_onu_id, flow_id
+ )
+
+ return flow_id
+
+ def get_flow_id_info(self, pon_intf_id, onu_id, uni_id, flow_id):
+ pon_intf_onu_id = (pon_intf_id, onu_id, uni_id)
+ return self.resource_managers[pon_intf_id].get_flow_id_info(pon_intf_onu_id, flow_id)
+
+ def get_current_flow_ids_for_uni(self, pon_intf_id, onu_id, uni_id):
+ pon_intf_onu_id = (pon_intf_id, onu_id, uni_id)
+ return self.resource_managers[pon_intf_id].get_current_flow_ids_for_onu(pon_intf_onu_id)
+
+ def update_flow_id_info_for_uni(self, pon_intf_id, onu_id, uni_id, flow_id, flow_data):
+ pon_intf_onu_id = (pon_intf_id, onu_id, uni_id)
+ return self.resource_managers[pon_intf_id].update_flow_id_info_for_onu(
+ pon_intf_onu_id, flow_id, flow_data)
\ No newline at end of file
diff --git a/adapters/adtran_olt/resources/adtran_resource_manager.py b/adapters/adtran_olt/resources/adtran_resource_manager.py
new file mode 100644
index 0000000..9f2a0a4
--- /dev/null
+++ b/adapters/adtran_olt/resources/adtran_resource_manager.py
@@ -0,0 +1,358 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Resource Manager will be unique for each OLT device.
+
+It exposes APIs to create/free alloc_ids/onu_ids/gemport_ids. Resource Manager
+uses a KV store in backend to ensure resiliency of the data.
+"""
+from bitstring import BitArray
+import json
+from pyvoltha.adapters.common.pon_resource_manager.resource_manager import PONResourceManager
+import adtranolt_platform as platform
+
+
+class AdtranPONResourceManager(PONResourceManager):
+ """Implements APIs to initialize/allocate/release alloc/gemport/onu IDs."""
+
+ # Constants for internal usage.
+ ONU_MAP = 'onu_map'
+
+ def init_device_resource_pool(self):
+ """
+ Initialize resource pool for all PON ports.
+ """
+ for pon_id in self.intf_ids:
+ self.init_resource_id_pool(
+ pon_intf_id=pon_id,
+ resource_type=PONResourceManager.ONU_ID,
+ start_idx=self.pon_resource_ranges[PONResourceManager.ONU_ID_START_IDX],
+ end_idx=self.pon_resource_ranges[PONResourceManager.ONU_ID_END_IDX])
+
+ alloc_id_map = dict()
+ for onu_id in range(platform.MAX_ONUS_PER_PON):
+ alloc_id_map[onu_id] = [platform.mk_alloc_id(pon_id, onu_id, idx)
+ for idx in xrange(platform.MAX_TCONTS_PER_ONU)]
+
+ self.init_resource_id_pool(pon_intf_id=pon_id,
+ resource_type=PONResourceManager.ALLOC_ID,
+ resource_map=alloc_id_map)
+
+ self.init_resource_id_pool(
+ pon_intf_id=pon_id,
+ resource_type=PONResourceManager.GEMPORT_ID,
+ start_idx=self.pon_resource_ranges[PONResourceManager.GEMPORT_ID_START_IDX],
+ end_idx=self.pon_resource_ranges[PONResourceManager.GEMPORT_ID_END_IDX])
+
+ def clear_device_resource_pool(self):
+ """
+ Clear resource pool of all PON ports.
+ """
+ for pon_id in self.intf_ids:
+ self.clear_resource_id_pool(pon_intf_id=pon_id,
+ resource_type=PONResourceManager.ONU_ID)
+
+ self.clear_resource_id_pool(
+ pon_intf_id=pon_id,
+ resource_type=PONResourceManager.ALLOC_ID,
+ )
+
+ self.clear_resource_id_pool(
+ pon_intf_id=pon_id,
+ resource_type=PONResourceManager.GEMPORT_ID,
+ )
+ self.clear_resource_id_pool(
+ pon_intf_id=pon_id,
+ resource_type=PONResourceManager.FLOW_ID,
+ )
+
+ def init_resource_id_pool(self, pon_intf_id, resource_type, start_idx=None,
+ end_idx=None, resource_map=None):
+ """
+ Initialize Resource ID pool for a given Resource Type on a given PON Port
+
+ :param pon_intf_id: OLT PON interface id
+ :param resource_type: String to identify type of resource
+ :param start_idx: start index for onu id pool
+ :param end_idx: end index for onu id pool
+ :param resource_map: (dict) Resource map if per-ONU specific
+ :return boolean: True if resource id pool initialized else false
+ """
+ status = False
+ path = self._get_path(pon_intf_id, resource_type)
+ if path is None:
+ return status
+
+ try:
+            # After an adapter reboot/reconciliation the resource may already
+            # be present in the KV store; only write a new one if it is absent.
+ resource = self._get_resource(path)
+
+ if resource is not None:
+ self._log.info("Resource-already-present-in-store", path=path)
+ status = True
+
+ else:
+ if resource_map is None:
+ resource = self._format_resource(pon_intf_id, start_idx, end_idx)
+ self._log.info("Resource-initialized", path=path)
+
+ else:
+ resource = self._format_map_resource(pon_intf_id, resource_map)
+
+ # Add resource as json in kv store.
+ status = self._kv_store.update_to_kv_store(path, resource)
+
+ except Exception as e:
+ self._log.exception("error-initializing-resource-pool", e=e)
+
+ return status
+
+    def _generate_next_id(self, resource, onu_id=None):
+        """
+        Generate unique id having OFFSET as start index.
+
+        :param resource: resource used to generate ID
+        :param onu_id: ONU ID if unique per ONU
+        :return int: generated id
+        """
+        if onu_id is not None:
+            resource = resource[AdtranPONResourceManager.ONU_MAP][str(onu_id)]
+
+        # find() returns a one-element tuple, or an empty tuple if no bit is free
+        pos = resource[PONResourceManager.POOL].find('0b0')
+        if not pos:
+            raise Exception("resource-pool-exhausted")
+
+        resource[PONResourceManager.POOL].set(1, pos)
+        return pos[0] + resource[PONResourceManager.START_IDX]
+
+ def _release_id(self, resource, unique_id, onu_id=None):
+ """
+ Release unique id having OFFSET as start index.
+
+ :param resource: resource used to release ID
+ :param unique_id: id need to be released
+ :param onu_id: ONU ID if unique per ONU
+ """
+ if onu_id is not None:
+ resource = resource[AdtranPONResourceManager.ONU_MAP][str(onu_id)]
+
+ pos = ((int(unique_id)) - resource[PONResourceManager.START_IDX])
+ resource[PONResourceManager.POOL].set(0, pos)
+
+ def get_resource_id(self, pon_intf_id, resource_type, onu_id=None, num_of_id=1):
+ """
+ Create alloc/gemport/onu id for given OLT PON interface.
+
+ :param pon_intf_id: OLT PON interface id
+ :param resource_type: String to identify type of resource
+ :param num_of_id: required number of ids
+ :param onu_id: ONU ID if unique per ONU (Used for Alloc IDs)
+        :return list/int/None: int for onu_id/flow_id, or for alloc_id and
+                               gemport_id when num_of_id is 1; list of ints
+                               when num_of_id is greater than 1; None on
+                               failure or invalid resource type
+ """
+ result = None
+
+ if num_of_id < 1:
+ self._log.error("invalid-num-of-resources-requested")
+ return result
+
+ path = self._get_path(pon_intf_id, resource_type)
+ if path is None:
+ return result
+
+ try:
+ resource = self._get_resource(path, onu_id)
+ if resource is not None and \
+ (resource_type == PONResourceManager.ONU_ID or
+ resource_type == PONResourceManager.FLOW_ID):
+ result = self._generate_next_id(resource)
+
+ elif resource is not None and \
+ resource_type == PONResourceManager.GEMPORT_ID:
+ if num_of_id == 1:
+ result = self._generate_next_id(resource)
+ else:
+ result = [self._generate_next_id(resource) for _ in range(num_of_id)]
+
+ elif resource is not None and \
+ resource_type == PONResourceManager.ALLOC_ID:
+ if num_of_id == 1:
+ result = self._generate_next_id(resource, onu_id)
+ else:
+ result = [self._generate_next_id(resource, onu_id) for _ in range(num_of_id)]
+ else:
+ raise Exception("get-resource-failed")
+
+ self._log.debug("Get-" + resource_type + "-success", result=result,
+ path=path)
+ # Update resource in kv store
+ self._update_resource(path, resource, onu_id=onu_id)
+
+ except Exception as e:
+ self._log.exception("Get-" + resource_type + "-id-failed",
+ path=path, e=e)
+ return result
+
+ def free_resource_id(self, pon_intf_id, resource_type, release_content, onu_id=None):
+ """
+ Release alloc/gemport/onu id for given OLT PON interface.
+
+ :param pon_intf_id: OLT PON interface id
+ :param resource_type: String to identify type of resource
+        :param release_content: ID to release (onu_id) or list of IDs to release
+ :param onu_id: ONU ID if unique per ONU
+ :return boolean: True if all IDs in given release_content released
+ else False
+ """
+ status = False
+
+ path = self._get_path(pon_intf_id, resource_type)
+ if path is None:
+ return status
+
+ try:
+ resource = self._get_resource(path, onu_id=onu_id)
+ if resource is None:
+ raise Exception("get-resource-for-free-failed")
+
+            if resource_type == PONResourceManager.ONU_ID:
+                self._release_id(resource, release_content)
+
+            elif resource_type == PONResourceManager.ALLOC_ID:
+                # Alloc-ID pools are kept per ONU, so the ONU ID is required
+                for content in release_content:
+                    self._release_id(resource, content, onu_id)
+
+            elif resource_type == PONResourceManager.GEMPORT_ID:
+                for content in release_content:
+                    self._release_id(resource, content)
+            else:
+                raise Exception("get-resource-for-free-failed")
+
+ self._log.debug("Free-" + resource_type + "-success", path=path)
+
+ # Update resource in kv store
+ status = self._update_resource(path, resource, onu_id=onu_id)
+
+ except Exception as e:
+ self._log.exception("Free-" + resource_type + "-failed",
+ path=path, e=e)
+ return status
+
+ def _update_resource(self, path, resource, onu_id=None):
+ """
+ Update resource in resource kv store.
+
+ :param path: path to update resource
+ :param resource: resource need to be updated
+ :return boolean: True if resource updated in kv store else False
+ """
+ if 'alloc_id' in path.lower():
+ assert onu_id is not None
+ poolResource = resource[AdtranPONResourceManager.ONU_MAP][str(onu_id)]
+ poolResource[PONResourceManager.POOL] = \
+ poolResource[PONResourceManager.POOL].bin
+ else:
+ resource[PONResourceManager.POOL] = \
+ resource[PONResourceManager.POOL].bin
+
+ return self._kv_store.update_to_kv_store(path, json.dumps(resource))
+
+ def _get_resource(self, path, onu_id=None):
+ """
+ Get resource from kv store.
+
+ :param path: path to get resource
+ :return: resource if resource present in kv store else None
+ """
+ # get resource from kv store
+ result = self._kv_store.get_from_kv_store(path)
+ if result is None:
+ return result
+
+ self._log.info("dumping-resource", result=result)
+ resource = result
+
+ if resource is not None:
+ # decode resource fetched from backend store to dictionary
+ resource = json.loads(resource)
+
+ if 'alloc_id' in path.lower():
+ assert onu_id is not None
+ poolResource = resource[AdtranPONResourceManager.ONU_MAP][str(onu_id)]
+ poolResource[PONResourceManager.POOL] = \
+ BitArray('0b' + poolResource[PONResourceManager.POOL])
+ else:
+ # resource pool in backend store stored as binary string whereas to
+ # access the pool to generate/release IDs it need to be converted
+ # as BitArray
+ resource[PONResourceManager.POOL] = \
+ BitArray('0b' + resource[PONResourceManager.POOL])
+
+ return resource
+
+ def _format_resource(self, pon_intf_id, start_idx, end_idx):
+ """
+ Format resource as json.
+
+ :param pon_intf_id: OLT PON interface id
+ :param start_idx: start index for id pool
+ :param end_idx: end index for id pool
+ :return dictionary: resource formatted as dictionary
+ """
+ # Format resource as json to be stored in backend store
+ resource = dict()
+ resource[PONResourceManager.PON_INTF_ID] = pon_intf_id
+ resource[PONResourceManager.START_IDX] = start_idx
+ resource[PONResourceManager.END_IDX] = end_idx
+
+ # resource pool stored in backend store as binary string
+ resource[PONResourceManager.POOL] = BitArray(end_idx-start_idx).bin
+
+ return json.dumps(resource)
+
+    def _format_map_resource(self, pon_intf_id, resource_map):
+        """
+        Format resource as json.
+
+        TODO: Refactor the resource BitArray to be just a list of the resources.
+              This is used to store available alloc-ids on a per-onu/pon basis
+              which, in BitArray string form, is a 768 byte string for just 4
+              possible alloc-IDs. This equates to 1.57 MB of storage when you
+              take into account 128 ONUs and 16 PONs pre-provisioned.
+
+        :param pon_intf_id: OLT PON interface id
+        :param resource_map: (dict) ONU ID -> Scattered list of IDs
+        :return string: resource formatted as a JSON string
+        """
+        # Format resource as json to be stored in backend store
+        resource = dict()
+        resource[PONResourceManager.PON_INTF_ID] = pon_intf_id
+
+        onu_dict = dict()
+        for onu_id, resources in resource_map.items():
+            start_idx = min(resources)
+            end_idx = max(resources) + 1
+
+            onu_dict[onu_id] = {
+                PONResourceManager.START_IDX: start_idx,
+                PONResourceManager.END_IDX: end_idx,
+            }
+            # Set non-allowed values as taken. A separate name is used so the
+            # resource_map argument being iterated is not shadowed.
+            pool = BitArray(end_idx - start_idx)
+            not_available = {pos for pos in xrange(end_idx - start_idx)
+                             if pos + start_idx not in resources}
+            pool.set(True, not_available)
+            onu_dict[onu_id][PONResourceManager.POOL] = pool.bin
+
+        resource[AdtranPONResourceManager.ONU_MAP] = onu_dict
+        return json.dumps(resource)
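
Note on the pool representation in adtran_resource_manager.py: each pool is a bit string in which a set bit marks an ID as taken (or, for the per-ONU alloc-id maps, as never allowed), and an ID is the bit position plus the pool's start index. The following is a minimal sketch of that scheme only, assuming a plain Python list in place of `bitstring.BitArray` and no KV store. The names `make_pool`, `generate_next_id` and `release_id`, and the dict keys, are hypothetical and not part of this change.

```python
def make_pool(start_idx, end_idx, allowed=None):
    """Build a pool covering [start_idx, end_idx); a 1 bit means unavailable."""
    size = end_idx - start_idx
    bits = [0] * size
    if allowed is not None:
        # Mark every position that is not an allowed ID as already taken
        for pos in range(size):
            if pos + start_idx not in allowed:
                bits[pos] = 1
    return {'start_idx': start_idx, 'end_idx': end_idx, 'pool': bits}


def generate_next_id(resource):
    """Claim the first free bit and return its ID, or None if exhausted."""
    try:
        pos = resource['pool'].index(0)
    except ValueError:
        return None
    resource['pool'][pos] = 1
    return pos + resource['start_idx']


def release_id(resource, unique_id):
    """Clear the bit for unique_id so it can be handed out again."""
    resource['pool'][unique_id - resource['start_idx']] = 0
```

For example, the four alloc-ids of ONU 5 (1029, 1285, 1541, 1797) span a 769-bit pool in which only four bits start out free, which is the storage overhead the TODO in `_format_map_resource` refers to.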
diff --git a/adapters/adtran_olt/resources/adtranolt_platform.py b/adapters/adtran_olt/resources/adtranolt_platform.py
new file mode 100644
index 0000000..3ec7b81
--- /dev/null
+++ b/adapters/adtran_olt/resources/adtranolt_platform.py
@@ -0,0 +1,182 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from pyvoltha.protos.device_pb2 import Port
+import pyvoltha.protos.device_pb2 as dev_pb2
+
+#######################################################################
+#
+# This is a copy of the OpenOLT file of a similar name and is used
+# when running in non-xPON (OpenOLT/SEBA) mode. We need to closely
+# watch for changes in the OpenOLT and eventually work together to
+# have a better way to do things (and more ONUs than 112)
+#
+# TODO: These duplicate some methods in the OLT Handler. Clean up
+# and use a separate file and include it into OLT Handler object
+# as something it derives from.
+#
+#######################################################################
+"""
+Encoding of identifiers
+=======================
+
+Alloc ID
+
+ Uniquely identifies a T-CONT
+ Ranges from 1024..16383 per ITU Standard
+ For Adtran, 1024..1919
+ Unique per PON interface
+
+ 9 8 7 0
+ +-----+----------+
+ | idx | onu_id | + (Min Alloc ID)
+ +-----+----------+
+
+ onu id = 8 bit
+ Alloc index = 2 bits (max 4 TCONTs/ONU)
+
+Flow id
+
+ Identifies a flow within a single OLT
+ Flow Id is unique per OLT
+ Multiple GEM ports can map to same flow id
+
+ 13 11 4 0
+ +--------+--------------+------+
+ | pon id | onu id | Flow |
+ | | | idx |
+ +--------+--------------+------+
+
+ 14 bits = 16384 flows (per OLT).
+
+ pon id = 4 bits = 16 PON ports
+ onu id = 7 bits = 128 ONUs per PON port
+ Flow index = 3 bits = 4 bi-directional flows per ONU
+ = 8 uni-directional flows per ONU
+
+
+Logical (OF) UNI port number
+
+ OpenFlow port number corresponding to PON UNI
+
+ 15 11 4 0
+ +--+--------+--------------+------+
+ |0 | pon id | onu id | 0 |
+ +--+--------+--------------+------+
+
+ pon id = 4 bits = 16 PON ports
+ onu id = 7 bits = 128 ONUs per PON port
+
+
+PON OLT (OF) port number
+
+ OpenFlow port number corresponding to PON OLT ports
+
+ 31 28 0
+ +--------+------------------------~~~------+
+ | 0x2 | pon intf id |
+ +--------+------------------------~~~------+
+
+"""
+
+MIN_TCONT_ALLOC_ID = 1024 # 1024..16383
+MAX_TCONT_ALLOC_ID = 16383
+
+MIN_GEM_PORT_ID = 2176 # 2176..4222
+MAX_GEM_PORT_ID = MIN_GEM_PORT_ID + 2046
+
+MAX_ONUS_PER_PON = 128
+MAX_TCONTS_PER_ONU = 4
+MAX_GEM_PORTS_PER_ONU = 16 # Hardware can handle more
+
+
+class adtran_platform(object):
+ def __init__(self):
+ pass
+
+ def mk_uni_port_num(self, intf_id, onu_id, uni_id=0):
+ return intf_id << 11 | onu_id << 4 | uni_id
+
+ def uni_id_from_uni_port(self, uni_port):
+ return uni_port & 0xF
+
+
+def mk_uni_port_num(intf_id, onu_id, uni_id=0):
+    """
+    Create a unique virtual UNI port number based upon PON, ONU and UNI ID
+    :param intf_id: (int) PON interface ID
+    :param onu_id: (int) ONU ID (0..max)
+    :param uni_id: (int) UNI ID on the ONU (occupies the low 4 bits)
+    :return: (int) UNI Port number
+    """
+    return intf_id << 11 | onu_id << 4 | uni_id
+
+
+def uni_id_from_uni_port(uni_port):
+ return uni_port & 0xF
+
+
+def intf_id_from_uni_port_num(port_num):
+ """
+ Extract the PON device port number from a virtual UNI Port number
+
+ :param port_num: (int) virtual UNI / vENET port number on OLT PON
+ :return: (int) PON Port number (note, this is not the PON ID)
+ """
+ return (port_num >> 11) & 0xF
+
+
+def mk_alloc_id(_, onu_id, idx=0):
+    """
+    Allocate a TCONT Alloc-ID. This is only called by the OLT
+
+    :param _: (int) PON ID (not used)
+    :param onu_id: (int) ONU ID (0..MAX_ONUS_PER_PON-1)
+    :param idx: (int) TCONT Index (0..MAX_TCONTS_PER_ONU-1)
+    """
+    assert 0 <= onu_id < MAX_ONUS_PER_PON, 'Invalid ONU ID. Expect 0..{}'.format(MAX_ONUS_PER_PON-1)
+    assert 0 <= idx < MAX_TCONTS_PER_ONU, 'Invalid TCONT instance. Expect 0..{}'.format(MAX_TCONTS_PER_ONU-1)
+    alloc_id = MIN_TCONT_ALLOC_ID + (idx << 8) + onu_id
+    return alloc_id
+
+
+def intf_id_from_nni_port_num(port_num):
+ # OpenOLT starts at 128. We start at 1 (one-to-one mapping)
+ # return port_num - 128
+ return port_num
+
+
+def intf_id_to_intf_type(intf_id):
+ # if (2 << 28 ^ intf_id) < 16:
+ # return Port.PON_OLT
+ # elif 128 <= intf_id <= 132:
+ # return Port.ETHERNET_NNI
+ if 5 <= intf_id <= 20:
+ return Port.PON_OLT
+ elif 1 <= intf_id <= 4:
+ return Port.ETHERNET_NNI
+ else:
+ raise Exception('Invalid intf_id value')
+
+
+def is_upstream(in_port, out_port):
+ # FIXME
+ # if out_port in [128, 129, 130, 131, 0xfffd, 0xfffffffd]:
+ # Not sure what fffd and the other is
+ return out_port in [1, 2, 3, 4, 0xfffd, 0xfffffffd]
+
+
+def is_downstream(in_port, out_port):
+ return not is_upstream(in_port, out_port)
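
Note on the identifier encodings in adtranolt_platform.py: the Alloc-ID packs the TCONT index above an 8-bit ONU ID and adds the minimum Alloc-ID, while the UNI port number packs the PON interface, ONU ID and UNI ID into shifted bit fields. The sketch below restates the two encoders and adds inverses to show the round trip. It assumes the constants shown in the module and a 7-bit ONU ID field for the UNI port number; `split_alloc_id` and `split_uni_port_num` are hypothetical helpers that do not exist in this change.

```python
# Constants mirror adtranolt_platform.py
MIN_TCONT_ALLOC_ID = 1024
MAX_ONUS_PER_PON = 128
MAX_TCONTS_PER_ONU = 4


def mk_alloc_id(onu_id, idx=0):
    """Alloc-ID = minimum Alloc-ID + (TCONT index << 8) + ONU ID."""
    assert 0 <= onu_id < MAX_ONUS_PER_PON
    assert 0 <= idx < MAX_TCONTS_PER_ONU
    return MIN_TCONT_ALLOC_ID + (idx << 8) + onu_id


def split_alloc_id(alloc_id):
    """Hypothetical inverse: recover (onu_id, idx) from an Alloc-ID."""
    offset = alloc_id - MIN_TCONT_ALLOC_ID
    return offset & 0xFF, offset >> 8


def mk_uni_port_num(intf_id, onu_id, uni_id=0):
    """UNI port number = intf_id (bits 11+) | onu_id (bits 4..10) | uni_id (bits 0..3)."""
    return intf_id << 11 | onu_id << 4 | uni_id


def split_uni_port_num(port_num):
    """Hypothetical inverse: recover (intf_id, onu_id, uni_id)."""
    return (port_num >> 11) & 0xF, (port_num >> 4) & 0x7F, port_num & 0xF
```

With these definitions the largest Alloc-ID, for ONU 127 and TCONT index 3, is 1919, which matches the 1024..1919 range quoted in the module docstring.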
diff --git a/adapters/adtran_olt/xpon/__init__.py b/adapters/adtran_olt/xpon/__init__.py
new file mode 100644
index 0000000..d67fcf2
--- /dev/null
+++ b/adapters/adtran_olt/xpon/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present ADTRAN, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/adapters/adtran_olt/xpon/olt_gem_port.py b/adapters/adtran_olt/xpon/olt_gem_port.py
new file mode 100644
index 0000000..9159262
--- /dev/null
+++ b/adapters/adtran_olt/xpon/olt_gem_port.py
@@ -0,0 +1,126 @@
+#
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import structlog
+import json
+
+from adapters.adtran_common.xpon.gem_port import GemPort
+from twisted.internet.defer import inlineCallbacks, returnValue
+from ..adtran_olt_handler import AdtranOltHandler
+
+log = structlog.get_logger()
+
+
+class OltGemPort(GemPort):
+ """
+ Adtran OLT specific implementation
+ """
+ def __init__(self, gem_id, alloc_id, tech_profile_id, pon_id, onu_id, uni_id,
+ encryption=False,
+ multicast=False,
+ traffic_class=None,
+ handler=None,
+ is_mock=False):
+ super(OltGemPort, self).__init__(gem_id, alloc_id, uni_id, tech_profile_id,
+ encryption=encryption,
+ multicast=multicast,
+ traffic_class=traffic_class,
+ handler=handler,
+ is_mock=is_mock)
+ self._timestamp = None
+ self._pon_id = pon_id
+ self._onu_id = onu_id # None if this is a multicast GEM Port
+
+ def __str__(self):
+ return "GemPort: {}/{}/{}, alloc-id: {}, gem-id: {}".format(self.pon_id, self.onu_id,
+ self.uni_id, self.alloc_id,
+ self.gem_id)
+
+ @staticmethod
+ def create(handler, gem, alloc_id, tech_profile_id, pon_id, onu_id, uni_id, _ofp_port_no):
+ return OltGemPort(gem.gemport_id,
+ alloc_id,
+ tech_profile_id,
+ pon_id, onu_id, uni_id,
+ encryption=gem.aes_encryption.lower() == 'true',
+ handler=handler,
+ multicast=False)
+
+ @property
+ def pon_id(self):
+ return self._pon_id
+
+ @property
+ def onu_id(self):
+ return self._onu_id
+
+ @property
+ def timestamp(self):
+ return self._timestamp
+
+ @timestamp.setter
+ def timestamp(self, value):
+ self._timestamp = value
+
+ @property
+ def encryption(self):
+ return self._encryption
+
+ @encryption.setter
+ def encryption(self, value):
+ assert isinstance(value, bool), 'encryption is a boolean'
+
+ if self._encryption != value:
+ self._encryption = value
+ self.set_config(self._handler.rest_client, 'encryption', value)
+
+    @inlineCallbacks
+    def add_to_hardware(self, session, operation='POST'):
+        if self._is_mock:
+            returnValue('mock')
+
+        uri = AdtranOltHandler.GPON_GEM_CONFIG_LIST_URI.format(self.pon_id, self.onu_id)
+        data = json.dumps(self.to_dict())
+        name = 'gem-port-create-{}-{}: {}/{}'.format(self.pon_id, self.onu_id,
+                                                     self.gem_id,
+                                                     self.alloc_id)
+        try:
+            results = yield session.request(operation, uri, data=data, name=name)
+            returnValue(results)
+
+        except Exception as e:
+            if operation == 'POST':
+                # The entry may already exist. Retry as a PATCH and wait for
+                # its result; returnValue() must not be handed a Deferred.
+                results = yield self.add_to_hardware(session, operation='PATCH')
+                returnValue(results)
+            else:
+                log.exception('add-2-hw', gem=self, e=e)
+                raise
+
+    @inlineCallbacks
+    def remove_from_hardware(self, session):
+        # returnValue() is only valid inside an @inlineCallbacks generator
+        if self._is_mock:
+            returnValue('mock')
+
+        uri = AdtranOltHandler.GPON_GEM_CONFIG_URI.format(self.pon_id, self.onu_id, self.gem_id)
+        name = 'gem-port-delete-{}-{}: {}'.format(self.pon_id, self.onu_id, self.gem_id)
+        results = yield session.request('DELETE', uri, name=name)
+        returnValue(results)
+
+ def set_config(self, session, leaf, value):
+ from ..adtran_olt_handler import AdtranOltHandler
+
+ data = json.dumps({leaf: value})
+ uri = AdtranOltHandler.GPON_GEM_CONFIG_URI.format(self.pon_id,
+ self.onu_id,
+ self.gem_id)
+ name = 'onu-set-config-{}-{}-{}'.format(self._pon_id, leaf, str(value))
+ return session.request('PATCH', uri, data=data, name=name)
diff --git a/adapters/adtran_olt/xpon/olt_tcont.py b/adapters/adtran_olt/xpon/olt_tcont.py
new file mode 100644
index 0000000..db31543
--- /dev/null
+++ b/adapters/adtran_olt/xpon/olt_tcont.py
@@ -0,0 +1,90 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import structlog
+import json
+from twisted.internet.defer import inlineCallbacks, returnValue
+from adapters.adtran_common.xpon.tcont import TCont
+# from python.adapters.openolt.protos import openolt_pb2
+from olt_traffic_descriptor import OltTrafficDescriptor
+from ..adtran_olt_handler import AdtranOltHandler
+
+log = structlog.get_logger()
+
+
+class OltTCont(TCont):
+ """
+ Adtran OLT specific implementation
+ """
+ def __init__(self, alloc_id, tech_profile_id, traffic_descriptor, pon_id, onu_id, uni_id, is_mock=False):
+ super(OltTCont, self).__init__(alloc_id, tech_profile_id, traffic_descriptor, uni_id, is_mock=is_mock)
+ self.pon_id = pon_id
+ self.onu_id = onu_id
+
+ def __str__(self):
+ return "TCont: {}/{}/{}, alloc-id: {}".format(self.pon_id, self.onu_id,
+ self.uni_id, self.alloc_id)
+
+ @staticmethod
+ def create(tcont, pon_id, onu_id, tech_profile_id, uni_id, ofp_port_no):
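+        # Only the upstream tcont of a tech profile carries valid information.
+        # FIXME: openolt_pb2 is not imported in this module (its import above
+        #        is commented out), so this comparison raises NameError.
+        if tcont.direction != openolt_pb2.UPSTREAM:
+            return None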
+
+ td = OltTrafficDescriptor.create(tcont, pon_id, onu_id, uni_id, ofp_port_no)
+ return OltTCont(tcont.alloc_id, tech_profile_id, td, pon_id, onu_id, uni_id)
+
+ @inlineCallbacks
+ def add_to_hardware(self, session):
+ if self._is_mock:
+ returnValue('mock')
+
+ uri = AdtranOltHandler.GPON_TCONT_CONFIG_LIST_URI.format(self.pon_id, self.onu_id)
+ data = json.dumps({'alloc-id': self.alloc_id})
+ name = 'tcont-create-{}-{}: {}'.format(self.pon_id, self.onu_id, self.alloc_id)
+
+        # The only leaf of a TCONT is its key (alloc-id), so a POST is all that is needed
+ try:
+ results = yield session.request('POST', uri, data=data, name=name,
+ suppress_error=False)
+ except Exception as _e:
+ results = None
+
+ if self.traffic_descriptor is not None:
+ try:
+ results = yield self.traffic_descriptor.add_to_hardware(session)
+
+ except Exception as e:
+ log.exception('traffic-descriptor', tcont=self,
+ td=self.traffic_descriptor, e=e)
+ raise
+
+ returnValue(results)
+
+    @inlineCallbacks
+    def remove_from_hardware(self, session):
+        # returnValue() is only valid inside an @inlineCallbacks generator
+        if self._is_mock:
+            returnValue('mock')
+
+        uri = AdtranOltHandler.GPON_TCONT_CONFIG_URI.format(self.pon_id, self.onu_id, self.alloc_id)
+        name = 'tcont-delete-{}-{}: {}'.format(self.pon_id, self.onu_id, self.alloc_id)
+        results = yield session.request('DELETE', uri, name=name)
+        returnValue(results)
+
+
+
+
+
+
+
+
+
diff --git a/adapters/adtran_olt/xpon/olt_traffic_descriptor.py b/adapters/adtran_olt/xpon/olt_traffic_descriptor.py
new file mode 100644
index 0000000..c6d90cf
--- /dev/null
+++ b/adapters/adtran_olt/xpon/olt_traffic_descriptor.py
@@ -0,0 +1,98 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import structlog
+import json
+from adapters.adtran_common.xpon.traffic_descriptor import TrafficDescriptor
+from twisted.internet.defer import inlineCallbacks, returnValue
+from ..adtran_olt_handler import AdtranOltHandler
+
+log = structlog.get_logger()
+
+
+class OltTrafficDescriptor(TrafficDescriptor):
+ """
+    Adtran OLT specific implementation
+ """
+ def __init__(self, pon_id, onu_id, alloc_id, fixed, assured, maximum,
+ additional=TrafficDescriptor.AdditionalBwEligibility.DEFAULT,
+ best_effort=None,
+ is_mock=False):
+ super(OltTrafficDescriptor, self).__init__(fixed, assured, maximum,
+ additional=additional,
+ best_effort=best_effort)
+ self.pon_id = pon_id
+ self.onu_id = onu_id
+ self.alloc_id = alloc_id
+ self._is_mock = is_mock
+
+ @staticmethod
+ def create(tcont, pon_id, onu_id, _uni_id, _ofp_port_no):
+ alloc_id = tcont.alloc_id
+ shaping_info = tcont.traffic_shaping_info
+ fixed = shaping_info.cir
+ assured = 0
+ maximum = shaping_info.pir
+
+ best_effort = None
+ # if shaping_info.add_bw_ind == openolt_pb2.InferredAdditionBWIndication_Assured:
+ # pass
+ # TODO: Support additional BW decode
+ # elif shaping_info.add_bw_ind == openolt_pb2.InferredAdditionBWIndication_BestEffort:
+ # pass
+ # additional = TrafficDescriptor.AdditionalBwEligibility.from_value(
+ # traffic_disc['additional-bw-eligibility-indicator'])
+ #
+ # if additional == TrafficDescriptor.AdditionalBwEligibility.BEST_EFFORT_SHARING:
+ # best_effort = BestEffort(traffic_disc['maximum-bandwidth'],
+ # traffic_disc['priority'],
+ # traffic_disc['weight'])
+ # else:
+ # best_effort = None
+
+ return OltTrafficDescriptor(pon_id, onu_id, alloc_id,
+ fixed, assured, maximum, best_effort=best_effort)
+
+ @inlineCallbacks
+ def add_to_hardware(self, session):
+ # TODO: Traffic descriptors are no longer shared, save pon and onu ID to base class
+ if self._is_mock:
+ returnValue('mock')
+
+ uri = AdtranOltHandler.GPON_TCONT_CONFIG_URI.format(self.pon_id,
+ self.onu_id,
+ self.alloc_id)
+ data = json.dumps({'traffic-descriptor': self.to_dict()})
+ name = 'tcont-td-{}-{}: {}'.format(self.pon_id, self.onu_id, self.alloc_id)
+ try:
+ results = yield session.request('PATCH', uri, data=data, name=name)
+
+ except Exception as e:
+ log.exception('traffic-descriptor', td=self, e=e)
+ raise
+
+ # TODO: Add support for best-effort sharing
+ # if self.additional_bandwidth_eligibility == \
+ # TrafficDescriptor.AdditionalBwEligibility.BEST_EFFORT_SHARING:
+ # if self.best_effort is None:
+ # raise ValueError('TCONT is best-effort but does not define best effort sharing')
+ #
+ # try:
+ # results = yield self.best_effort.add_to_hardware(session)
+ #
+ # except Exception as e:
+ # log.exception('best-effort', best_effort=self.best_effort, e=e)
+ # raise
+
+ returnValue(results)