VOL-1397: Adtran-OLT - Initial containerization commit
- Need to move VERSION to base directory
Change-Id: I9d62d0607a011ce642e379fd92b35ec48b300070
diff --git a/adapters/adtran_common/__init__.py b/adapters/adtran_common/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/adapters/adtran_common/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/adapters/adtran_common/adtran_device_handler.py b/adapters/adtran_common/adtran_device_handler.py
new file mode 100644
index 0000000..79877b7
--- /dev/null
+++ b/adapters/adtran_common/adtran_device_handler.py
@@ -0,0 +1,1438 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Adtran generic VOLTHA device handler
+"""
+import argparse
+import datetime
+import shlex
+import time
+
+import structlog
+from twisted.internet import reactor, defer
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.python.failure import Failure
+
+from adapters.adtran_common.net.adtran_netconf import AdtranNetconfClient
+from adapters.adtran_common.net.adtran_rest import AdtranRestClient
+from pyvoltha.protos import third_party
+from pyvoltha.protos.common_pb2 import OperStatus, AdminState, ConnectStatus
+from pyvoltha.protos.logical_device_pb2 import LogicalDevice
+from pyvoltha.protos.openflow_13_pb2 import ofp_desc, ofp_switch_features, OFPC_PORT_STATS, \
+ OFPC_GROUP_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS
+from pyvoltha.adapters.extensions.alarms.adapter_alarms import AdapterAlarms
+from pyvoltha.adapters.extensions.kpi.olt.olt_pm_metrics import OltPmMetrics
+from pyvoltha.common.utils.asleep import asleep
+from flow.flow_tables import DeviceFlows, DownstreamFlows
+
+_ = third_party
+
+DEFAULT_MULTICAST_VLAN = 4000
+BROADCOM_UNTAGGED_VLAN = 4091
+DEFAULT_UTILITY_VLAN = BROADCOM_UNTAGGED_VLAN
+
+_DEFAULT_RESTCONF_USERNAME = ""
+_DEFAULT_RESTCONF_PASSWORD = ""
+_DEFAULT_RESTCONF_PORT = 8081
+
+_DEFAULT_NETCONF_USERNAME = ""
+_DEFAULT_NETCONF_PASSWORD = ""
+_DEFAULT_NETCONF_PORT = 830
+
+_STARTUP_RETRY_TIMEOUT = 5  # Delay (seconds) after a failed activate before retrying
+_DEFAULT_RESOURCE_MGR_KEY = "adtran"
+
+
+class AdtranDeviceHandler(object):
+ """
+    A device handler that supports the Adtran RESTCONF protocol for communications
+    with a VOLTHA/VANILLA managed device.
+
+    Port numbering guidelines for Adtran OLT devices. Derived classes may augment
+    the numbering scheme below as needed.
+
+    - Reserve port 0 for the CPU capture port. All packets to/from this port should
+      be related to messages destined to/from the OpenFlow controller.
+
+ - Begin numbering northbound ports (network facing) at port 1 contiguously.
+ Consider the northbound ports to typically be the highest speed uplinks.
+ If these ports are removable or provided by one or more slots in a chassis
+ subsystem, still reserve the appropriate amount of port numbers whether they
+ are populated or not.
+
+    - Number southbound (customer facing) ports next, starting at the next
+      available port number. If chassis based, follow the same rules as northbound
+      ports and reserve enough port numbers.
+
+    - Number any out-of-band management ports last. It is up to the Device
+      Adapter developer whether to expose these to OpenFlow. If you do not
+      expose them, but do have the ports, still reserve the appropriate number of
+      port numbers just in case.
+ """
+ # HTTP shortcuts
+ HELLO_URI = '/restconf/adtran-hello:hello'
+
+ # RPC XML shortcuts
+ RESTART_RPC = '<system-restart xmlns="urn:ietf:params:xml:ns:yang:ietf-system"/>'
+
+ def __init__(self, **kwargs):
+ from net.pio_zmq import DEFAULT_PIO_TCP_PORT
+ from net.pon_zmq import DEFAULT_PON_AGENT_TCP_PORT
+
+ super(AdtranDeviceHandler, self).__init__()
+
+ adapter = kwargs['adapter']
+ device_id = kwargs['device-id']
+ timeout = kwargs.get('timeout', 20)
+
+ self.adapter = adapter
+ self.adapter_agent = adapter.adapter_agent
+ self.device_id = device_id
+ self.log = structlog.get_logger(device_id=device_id)
+ self.startup = None # Startup/reboot deferred
+ self.channel = None # Proxy messaging channel with 'send' method
+ self.logical_device_id = None
+ self.pm_metrics = None
+ self.alarms = None
+ self.multicast_vlans = [DEFAULT_MULTICAST_VLAN]
+ self.utility_vlan = DEFAULT_UTILITY_VLAN
+ self.mac_address = '00:13:95:00:00:00'
+ self._rest_support = None
+ self._initial_enable_complete = False
+ self.resource_mgr = None
+ self.tech_profiles = None # dict(): intf_id -> ResourceMgr.TechProfile
+
+ # Northbound and Southbound ports
+ self.northbound_ports = {} # port number -> Port
+ self.southbound_ports = {} # port number -> Port (For PON, use pon-id as key)
+ # self.management_ports = {} # port number -> Port TODO: Not currently supported
+
+ self.num_northbound_ports = None
+ self.num_southbound_ports = None
+ # self.num_management_ports = None
+
+ self.ip_address = None
+ self.host_and_port = None
+ self.timeout = timeout
+ self.restart_failure_timeout = 5 * 60 # 5 Minute timeout
+
+ # REST Client
+ self.rest_port = _DEFAULT_RESTCONF_PORT
+ self.rest_username = _DEFAULT_RESTCONF_USERNAME
+ self.rest_password = _DEFAULT_RESTCONF_PASSWORD
+ self._rest_client = None
+
+ # NETCONF Client
+ self.netconf_port = _DEFAULT_NETCONF_PORT
+ self.netconf_username = _DEFAULT_NETCONF_USERNAME
+ self.netconf_password = _DEFAULT_NETCONF_PASSWORD
+ self._netconf_client = None
+
+ # Flow entries
+ self.upstream_flows = DeviceFlows()
+ self.downstream_flows = DownstreamFlows()
+
+        self.max_nni_ports = 1  # TODO: This is a VOLTHA imposed limit in 'flow_decomposer.py'
+                                # and 'logical_device_agent.py'
+
+ self.resource_manager_key = _DEFAULT_RESOURCE_MGR_KEY
+ # OMCI ZMQ Channel
+ self.pon_agent_port = DEFAULT_PON_AGENT_TCP_PORT
+ self.pio_port = DEFAULT_PIO_TCP_PORT
+
+ # Heartbeat support
+ self.heartbeat_count = 0
+ self.heartbeat_miss = 0
+ self.heartbeat_interval = 2 # TODO: Decrease before release or any scale testing
+ self.heartbeat_failed_limit = 3
+ self.heartbeat_timeout = 5
+ self.heartbeat = None
+ self.heartbeat_last_reason = ''
+
+ # Virtualized OLT Support
+ self.is_virtual_olt = False
+
+ # Installed flows
+ self._evcs = {} # Flow ID/name -> FlowEntry
+
+ def _delete_logical_device(self):
+ ldi, self.logical_device_id = self.logical_device_id, None
+
+ if ldi is None:
+ return
+
+ self.log.debug('delete-logical-device', ldi=ldi)
+
+ logical_device = self.adapter_agent.get_logical_device(ldi)
+ self.adapter_agent.delete_logical_device(logical_device)
+
+ device = self.adapter_agent.get_device(self.device_id)
+ device.parent_id = ''
+
+ # Update the logical device mapping
+ if ldi in self.adapter.logical_device_id_to_root_device_id:
+ del self.adapter.logical_device_id_to_root_device_id[ldi]
+
+ def __del__(self):
+ # Kill any startup or heartbeat defers
+
+ d, self.startup = self.startup, None
+ h, self.heartbeat = self.heartbeat, None
+
+ if d is not None and not d.called:
+ d.cancel()
+
+ if h is not None and not h.called:
+ h.cancel()
+
+ # Remove the logical device
+ self._delete_logical_device()
+
+ self.northbound_ports.clear()
+ self.southbound_ports.clear()
+
+ def __str__(self):
+ return "AdtranDeviceHandler: {}".format(self.ip_address)
+
+ @property
+ def netconf_client(self):
+ return self._netconf_client
+
+ @property
+ def rest_client(self):
+ return self._rest_client
+
+ @property
+ def evcs(self):
+ return list(self._evcs.values())
+
+ def add_evc(self, evc):
+ if self._evcs is not None and evc.name not in self._evcs:
+ self._evcs[evc.name] = evc
+
+ def remove_evc(self, evc):
+ if self._evcs is not None and evc.name in self._evcs:
+ del self._evcs[evc.name]
+
+ def parse_provisioning_options(self, device):
+ if device.ipv4_address:
+ self.ip_address = device.ipv4_address
+ self.host_and_port = '{}:{}'.format(self.ip_address,
+ self.netconf_port)
+ elif device.host_and_port:
+ self.host_and_port = device.host_and_port.split(":")
+ self.ip_address = self.host_and_port[0]
+ self.netconf_port = int(self.host_and_port[1])
+ self.adapter_agent.update_device(device)
+
+ else:
+            self.activate_failed(device, 'No ipv4_address or host_and_port field provided')
+
+ #############################################################
+ # Now optional parameters
+ def check_tcp_port(value):
+ ivalue = int(value)
+ if ivalue <= 0 or ivalue > 65535:
+                raise argparse.ArgumentTypeError("%s is not a valid port number" % value)
+ return ivalue
+
+ def check_vid(value):
+ ivalue = int(value)
+ if ivalue < 1 or ivalue > 4094:
+ raise argparse.ArgumentTypeError("Valid VLANs are 1..4094")
+ return ivalue
+
+ parser = argparse.ArgumentParser(description='Adtran Device Adapter')
+ parser.add_argument('--nc_username', '-u', action='store', default=_DEFAULT_NETCONF_USERNAME,
+ help='NETCONF username')
+ parser.add_argument('--nc_password', '-p', action='store', default=_DEFAULT_NETCONF_PASSWORD,
+ help='NETCONF Password')
+ parser.add_argument('--nc_port', '-t', action='store', default=_DEFAULT_NETCONF_PORT,
+ type=check_tcp_port, help='NETCONF TCP Port')
+ parser.add_argument('--rc_username', '-U', action='store', default=_DEFAULT_RESTCONF_USERNAME,
+ help='REST username')
+ parser.add_argument('--rc_password', '-P', action='store', default=_DEFAULT_RESTCONF_PASSWORD,
+ help='REST Password')
+ parser.add_argument('--rc_port', '-T', action='store', default=_DEFAULT_RESTCONF_PORT,
+ type=check_tcp_port, help='RESTCONF TCP Port')
+ parser.add_argument('--zmq_port', '-z', action='store', default=DEFAULT_PON_AGENT_TCP_PORT,
+ type=check_tcp_port, help='PON Agent ZeroMQ Port')
+ parser.add_argument('--pio_port', '-Z', action='store', default=DEFAULT_PIO_TCP_PORT,
+ type=check_tcp_port, help='PIO Service ZeroMQ Port')
+ parser.add_argument('--multicast_vlan', '-M', action='store',
+ default='{}'.format(DEFAULT_MULTICAST_VLAN),
+                            help='Multicast VLAN')
+ parser.add_argument('--utility_vlan', '-B', action='store',
+ default='{}'.format(DEFAULT_UTILITY_VLAN),
+ type=check_vid, help='VLAN for Controller based upstream flows from ONUs')
+ parser.add_argument('--resource_mgr_key', '-o', action='store',
+ default=_DEFAULT_RESOURCE_MGR_KEY,
+ help='OLT Type to look up associated resource manager configuration')
+ try:
+ args = parser.parse_args(shlex.split(device.extra_args))
+
+ # May have multiple multicast VLANs
+ self.multicast_vlans = [int(vid.strip()) for vid in args.multicast_vlan.split(',')]
+
+ self.netconf_username = args.nc_username
+ self.netconf_password = args.nc_password
+ self.netconf_port = args.nc_port
+
+ self.rest_username = args.rc_username
+ self.rest_password = args.rc_password
+ self.rest_port = args.rc_port
+
+ self.pon_agent_port = args.zmq_port
+ self.pio_port = args.pio_port
+ self.resource_manager_key = args.resource_mgr_key
+
+ if not self.rest_username:
+ self.rest_username = 'NDE0NDRkNDk0ZQ==\n'. \
+ decode('base64').decode('hex')
+ if not self.rest_password:
+ self.rest_password = 'NTA0MTUzNTM1NzRmNTI0NA==\n'. \
+ decode('base64').decode('hex')
+ if not self.netconf_username:
+ self.netconf_username = 'Njg3Mzc2NzI2ZjZmNzQ=\n'. \
+ decode('base64').decode('hex')
+ if not self.netconf_password:
+ self.netconf_password = 'NDI0ZjUzNDM0Zg==\n'. \
+ decode('base64').decode('hex')
+
+ except argparse.ArgumentError as e:
+ self.activate_failed(device,
+ 'Invalid arguments: {}'.format(e.message),
+ reachable=False)
+ except Exception as e:
+ self.log.exception('option_parsing_error: {}'.format(e.message))
+
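As a usage sketch of the option parsing above (the values are made up and the stand-in parser skips the port/VLAN validators), device.extra_args is just a shell-style option string handed to argparse:

    import argparse
    import shlex

    # Minimal stand-in for the parser built in parse_provisioning_options()
    parser = argparse.ArgumentParser(description='Adtran Device Adapter')
    parser.add_argument('--nc_username', '-u', default='')
    parser.add_argument('--nc_port', '-t', type=int, default=830)
    parser.add_argument('--multicast_vlan', '-M', default='4000')

    # Hypothetical extra_args value as provisioned on the Device object
    args = parser.parse_args(shlex.split('-u admin -t 830 -M 4000,4001'))
    multicast_vlans = [int(vid.strip()) for vid in args.multicast_vlan.split(',')]   # [4000, 4001]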
+ @inlineCallbacks
+ def activate(self, done_deferred, reconciling):
+ """
+ Activate the OLT device
+
+ :param done_deferred: (Deferred) Deferred to fire when done
+ :param reconciling: If True, this adapter is taking over for a previous adapter
+ for an existing OLT
+ """
+ self.log.info('AdtranDeviceHandler.activating', reconciling=reconciling)
+
+ if self.logical_device_id is None:
+ device = self.adapter_agent.get_device(self.device_id)
+
+ try:
+ # Parse our command line options for this device
+ self.parse_provisioning_options(device)
+
+ ############################################################################
+ # Currently, only virtual OLT (pizzabox) is supported
+ # self.is_virtual_olt = Add test for MOCK Device if we want to support it
+
+ ############################################################################
+ # Start initial discovery of NETCONF support (if any)
+ try:
+ device.reason = 'establishing NETCONF connection'
+ self.adapter_agent.update_device(device)
+
+ self.startup = self.make_netconf_connection()
+ yield self.startup
+
+ except Exception as e:
+ self.log.exception('netconf-connection', e=e)
+ returnValue(self.restart_activate(done_deferred, reconciling))
+
+ ############################################################################
+ # Update access information on network device for full protocol support
+ try:
+ device.reason = 'device networking validation'
+ self.adapter_agent.update_device(device)
+ self.startup = self.ready_network_access()
+ yield self.startup
+
+ except Exception as e:
+ self.log.exception('network-setup', e=e)
+ returnValue(self.restart_activate(done_deferred, reconciling))
+
+ ############################################################################
+ # Restconf setup
+ try:
+ device.reason = 'establishing RESTConf connections'
+ self.adapter_agent.update_device(device)
+ self.startup = self.make_restconf_connection()
+ yield self.startup
+
+ except Exception as e:
+ self.log.exception('restconf-setup', e=e)
+ returnValue(self.restart_activate(done_deferred, reconciling))
+
+ ############################################################################
+ # Get the device Information
+ if reconciling:
+ device.connect_status = ConnectStatus.REACHABLE
+ self.adapter_agent.update_device(device)
+ else:
+ try:
+ device.reason = 'retrieving device information'
+ self.adapter_agent.update_device(device)
+ self.startup = self.get_device_info(device)
+ results = yield self.startup
+
+ device.model = results.get('model', 'unknown')
+ device.hardware_version = results.get('hardware_version', 'unknown')
+ device.firmware_version = results.get('firmware_version', 'unknown')
+ device.serial_number = results.get('serial_number', 'unknown')
+ device.images.image.extend(results.get('software-images', []))
+
+ device.root = True
+ device.vendor = results.get('vendor', 'Adtran Inc.')
+ device.connect_status = ConnectStatus.REACHABLE
+ self.adapter_agent.update_device(device)
+
+ except Exception as e:
+ self.log.exception('device-info', e=e)
+ returnValue(self.restart_activate(done_deferred, reconciling))
+
+ try:
+ # Enumerate and create Northbound NNI interfaces
+ device.reason = 'enumerating northbound interfaces'
+ self.adapter_agent.update_device(device)
+ self.startup = self.enumerate_northbound_ports(device)
+ results = yield self.startup
+
+ self.startup = self.process_northbound_ports(device, results)
+ yield self.startup
+
+ device.reason = 'adding northbound interfaces to adapter'
+ self.adapter_agent.update_device(device)
+
+ if not reconciling:
+ for port in self.northbound_ports.itervalues():
+ self.adapter_agent.add_port(device.id, port.get_port())
+
+ except Exception as e:
+ self.log.exception('NNI-enumeration', e=e)
+ returnValue(self.restart_activate(done_deferred, reconciling))
+
+ try:
+ # Enumerate and create southbound interfaces
+ device.reason = 'enumerating southbound interfaces'
+ self.adapter_agent.update_device(device)
+ self.startup = self.enumerate_southbound_ports(device)
+ results = yield self.startup
+
+ self.startup = self.process_southbound_ports(device, results)
+ yield self.startup
+
+ device.reason = 'adding southbound interfaces to adapter'
+ self.adapter_agent.update_device(device)
+
+ if not reconciling:
+ for port in self.southbound_ports.itervalues():
+ self.adapter_agent.add_port(device.id, port.get_port())
+
+ except Exception as e:
+ self.log.exception('PON_enumeration', e=e)
+ returnValue(self.restart_activate(done_deferred, reconciling))
+
+ # Initialize resource manager
+ self.initialize_resource_manager()
+
+ if reconciling:
+ if device.admin_state == AdminState.ENABLED:
+ if device.parent_id:
+ self.logical_device_id = device.parent_id
+ self.adapter_agent.reconcile_logical_device(device.parent_id)
+ else:
+ self.log.info('no-logical-device-set')
+
+ # Reconcile child devices
+ self.adapter_agent.reconcile_child_devices(device.id)
+ ld_initialized = self.adapter_agent.get_logical_device()
+ assert device.parent_id == ld_initialized.id, \
+ 'parent ID not Logical device ID'
+
+ else:
+ # Complete activation by setting up logical device for this OLT and saving
+ # off the devices parent_id
+ ld_initialized = self.create_logical_device(device)
+
+ ############################################################################
+ # Setup PM configuration for this device
+ if self.pm_metrics is None:
+ try:
+ device.reason = 'setting up Performance Monitoring configuration'
+ self.adapter_agent.update_device(device)
+
+ kwargs = {
+ 'nni-ports': self.northbound_ports.values(),
+ 'pon-ports': self.southbound_ports.values()
+ }
+ self.pm_metrics = OltPmMetrics(self.adapter_agent, self.device_id,
+ ld_initialized.id, grouped=True,
+ freq_override=False, **kwargs)
+
+ pm_config = self.pm_metrics.make_proto()
+ self.log.debug("initial-pm-config", pm_config=pm_config)
+ self.adapter_agent.update_device_pm_config(pm_config, init=True)
+
+ except Exception as e:
+ self.log.exception('pm-setup', e=e)
+ self.activate_failed(device, e.message, reachable=False)
+
+ ############################################################################
+ # Set the ports in a known good initial state
+ if not reconciling:
+ device.reason = 'setting device to a known initial state'
+ self.adapter_agent.update_device(device)
+ try:
+ for port in self.northbound_ports.itervalues():
+ self.startup = yield port.reset()
+
+ for port in self.southbound_ports.itervalues():
+ self.startup = yield port.reset()
+
+ except Exception as e:
+ self.log.exception('port-reset', e=e)
+ returnValue(self.restart_activate(done_deferred, reconciling))
+
+ ############################################################################
+ # Create logical ports for all southbound and northbound interfaces
+ try:
+ device.reason = 'creating logical ports'
+ self.adapter_agent.update_device(device)
+ self.startup = self.create_logical_ports(device, ld_initialized, reconciling)
+ yield self.startup
+
+ except Exception as e:
+ self.log.exception('logical-port', e=e)
+ returnValue(self.restart_activate(done_deferred, reconciling))
+
+ ############################################################################
+ # Setup Alarm handler
+ device.reason = 'setting up adapter alarms'
+ self.adapter_agent.update_device(device)
+
+ self.alarms = AdapterAlarms(self.adapter_agent, device.id, ld_initialized.id)
+
+ ############################################################################
+ # Register for ONU detection
+ # self.adapter_agent.register_for_onu_detect_state(device.id)
+ # Complete device specific steps
+ try:
+ self.log.debug('device-activation-procedures')
+ device.reason = 'performing model specific activation procedures'
+ self.adapter_agent.update_device(device)
+ self.startup = self.complete_device_specific_activation(device, reconciling)
+ yield self.startup
+
+ except Exception as e:
+ self.log.exception('device-activation-procedures', e=e)
+ returnValue(self.restart_activate(done_deferred, reconciling))
+
+ # Schedule the heartbeat for the device
+ self.log.debug('starting-heartbeat')
+ self.start_heartbeat(delay=10)
+
+ device = self.adapter_agent.get_device(device.id)
+ device.parent_id = ld_initialized.id
+ device.oper_status = OperStatus.ACTIVE
+ device.reason = ''
+ self.adapter_agent.update_device(device)
+ self.logical_device_id = ld_initialized.id
+
+ # Start collecting stats from the device after a brief pause
+ reactor.callLater(10, self.pm_metrics.start_collector)
+
+ # Signal completion
+ self._initial_enable_complete = True
+ self.log.info('activated')
+
+ except Exception as e:
+ self.log.exception('activate', e=e)
+ if done_deferred is not None:
+ done_deferred.errback(e)
+
+ if done_deferred is not None:
+ done_deferred.callback('activated')
+
+ returnValue('activated')
+
+ def restart_activate(self, done_deferred, reconciling):
+ """
+ Startup activation failed, pause a short period of time and retry
+
+ :param done_deferred: (deferred) Deferred to fire upon completion of activation
+ :param reconciling: (bool) If true, we are reconciling after moving to a new vCore
+ """
+ d, self.startup = self.startup, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+ device = self.adapter_agent.get_device(self.device_id)
+ device.reason = 'Failed during {}, retrying'.format(device.reason)
+ self.adapter_agent.update_device(device)
+ self.startup = reactor.callLater(_STARTUP_RETRY_TIMEOUT, self.activate,
+ done_deferred, reconciling)
+ return 'retrying'
+
+ @inlineCallbacks
+ def ready_network_access(self):
+ # Override in device specific class if needed
+ returnValue('nop')
+
+ def activate_failed(self, device, reason, reachable=True):
+ """
+ Activation process (adopt_device) has failed.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions. Such extensions shall be described as part of
+ the device type specification returned by device_types().
+ :param reason: (string) failure reason
+ :param reachable: (boolean) Flag indicating if device may be reachable
+ via RESTConf or NETConf even after this failure.
+ """
+ device.oper_status = OperStatus.FAILED
+ if not reachable:
+ device.connect_status = ConnectStatus.UNREACHABLE
+
+ device.reason = reason
+ self.adapter_agent.update_device(device)
+ raise Exception('Failed to activate OLT: {}'.format(device.reason))
+
+ @inlineCallbacks
+ def make_netconf_connection(self, connect_timeout=None,
+ close_existing_client=False):
+
+ if close_existing_client and self._netconf_client is not None:
+ try:
+ yield self._netconf_client.close()
+ except:
+ pass
+ self._netconf_client = None
+
+ client = self._netconf_client
+
+ if client is None:
+ if not self.is_virtual_olt:
+ client = AdtranNetconfClient(self.ip_address,
+ self.netconf_port,
+ self.netconf_username,
+ self.netconf_password,
+ self.timeout)
+ else:
+ from python.adapters.adtran.adtran_common.net.mock_netconf_client import MockNetconfClient
+ client = MockNetconfClient(self.ip_address,
+ self.netconf_port,
+ self.netconf_username,
+ self.netconf_password,
+ self.timeout)
+ if client.connected:
+ self._netconf_client = client
+ returnValue(True)
+
+ timeout = connect_timeout or self.timeout
+
+ try:
+ request = client.connect(timeout)
+ results = yield request
+ self._netconf_client = client
+ returnValue(results)
+
+ except Exception as e:
+ self.log.exception('Failed to create NETCONF Client', e=e)
+ self._netconf_client = None
+ raise
+
+ @inlineCallbacks
+ def make_restconf_connection(self, get_timeout=None):
+ client = self._rest_client
+
+ if client is None:
+ client = AdtranRestClient(self.ip_address,
+ self.rest_port,
+ self.rest_username,
+ self.rest_password,
+ self.timeout)
+
+ timeout = get_timeout or self.timeout
+
+ try:
+ request = client.request('GET', self.HELLO_URI, name='hello', timeout=timeout)
+ results = yield request
+ if isinstance(results, dict) and 'module-info' in results:
+ self._rest_client = client
+ returnValue(results)
+ else:
+ from twisted.internet.error import ConnectError
+ self._rest_client = None
+ raise ConnectError(string='Results received but unexpected data type or contents')
+ except Exception:
+ self._rest_client = None
+ raise
+
+ def create_logical_device(self, device):
+ version = device.images.image[0].version
+
+ ld = LogicalDevice(
+ # NOTE: not setting id and datapath_id will let the adapter agent pick id
+ desc=ofp_desc(mfr_desc='VOLTHA Project',
+ hw_desc=device.hardware_version,
+ sw_desc=version,
+ serial_num=device.serial_number,
+ dp_desc='n/a'),
+ switch_features=ofp_switch_features(n_buffers=256,
+ n_tables=2,
+ capabilities=(
+ OFPC_FLOW_STATS |
+ OFPC_TABLE_STATS |
+ OFPC_GROUP_STATS |
+ OFPC_PORT_STATS)),
+ root_device_id=device.id)
+
+ ld_initialized = self.adapter_agent.create_logical_device(ld,
+ dpid=self.mac_address)
+ return ld_initialized
+
+ @inlineCallbacks
+ def create_logical_ports(self, device, ld_initialized, reconciling):
+ if not reconciling:
+ # Add the ports to the logical device
+
+ for port in self.northbound_ports.itervalues():
+ lp = port.get_logical_port()
+ if lp is not None:
+ self.adapter_agent.add_logical_port(ld_initialized.id, lp)
+
+ for port in self.southbound_ports.itervalues():
+ lp = port.get_logical_port()
+ if lp is not None:
+ self.adapter_agent.add_logical_port(ld_initialized.id, lp)
+
+ # Clean up all EVCs, EVC maps and ACLs (exceptions are ok)
+ try:
+ from flow.evc import EVC
+ self.startup = yield EVC.remove_all(self.netconf_client)
+ from flow.utility_evc import UtilityEVC
+ self.startup = yield UtilityEVC.remove_all(self.netconf_client)
+
+ except Exception as e:
+ self.log.exception('evc-cleanup', e=e)
+
+ try:
+ from flow.evc_map import EVCMap
+ self.startup = yield EVCMap.remove_all(self.netconf_client)
+
+ except Exception as e:
+ self.log.exception('evc-map-cleanup', e=e)
+
+ from flow.acl import ACL
+ ACL.clear_all(device.id)
+ try:
+ self.startup = yield ACL.remove_all(self.netconf_client)
+
+ except Exception as e:
+ self.log.exception('acl-cleanup', e=e)
+
+ from flow.flow_entry import FlowEntry
+ FlowEntry.clear_all(self)
+
+ from download import Download
+ Download.clear_all(self.netconf_client)
+
+ # Start/stop the interfaces as needed. These are deferred calls
+
+ dl = []
+ for port in self.northbound_ports.itervalues():
+ try:
+ dl.append(port.start())
+ except Exception as e:
+ self.log.exception('northbound-port-startup', e=e)
+
+ for port in self.southbound_ports.itervalues():
+ try:
+ dl.append(port.start() if port.admin_state == AdminState.ENABLED else port.stop())
+
+ except Exception as e:
+ self.log.exception('southbound-port-startup', e=e)
+
+ results = yield defer.gatherResults(dl, consumeErrors=True)
+
+ returnValue(results)
+
+ @inlineCallbacks
+ def device_information(self, device):
+ """
+        Examine the various management models and extract device information for
+ VOLTHA use
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions.
+ :return: (Deferred or None).
+ """
+ yield defer.Deferred(lambda c: c.callback("Not Required"))
+
+ @inlineCallbacks
+ def enumerate_northbound_ports(self, device):
+ """
+        Enumerate all northbound ports of a device. You should override
+        this method in your derived class as necessary. Should you encounter
+        a non-recoverable error, throw an appropriate exception.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions.
+ :return: (Deferred or None).
+ """
+ yield defer.Deferred(lambda c: c.callback("Not Required"))
+
+ @inlineCallbacks
+ def process_northbound_ports(self, device, results):
+ """
+ Process the results from the 'enumerate_northbound_ports' method.
+ You should override this method in your derived class as necessary and
+ create an NNI Port object (of your own choosing) that supports a 'get_port'
+ method. Once created, insert it into this base class's northbound_ports
+ collection.
+
+ Should you encounter a non-recoverable error, throw an appropriate exception.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions.
+ :param results: Results from the 'enumerate_northbound_ports' method that
+ you implemented. The type and contents are up to you to
+ :return:
+ """
+ yield defer.Deferred(lambda c: c.callback("Not Required"))
+
+ @inlineCallbacks
+ def enumerate_southbound_ports(self, device):
+ """
+ Enumerate all southbound ports of a device. You should override
+ this method in your derived class as necessary. Should you encounter
+ a non-recoverable error, throw an appropriate exception.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions.
+ :return: (Deferred or None).
+ """
+ yield defer.Deferred(lambda c: c.callback("Not Required"))
+
+ @inlineCallbacks
+ def process_southbound_ports(self, device, results):
+ """
+ Process the results from the 'enumerate_southbound_ports' method.
+ You should override this method in your derived class as necessary and
+        create a Port object (of your own choosing) that supports a 'get_port'
+ method. Once created, insert it into this base class's southbound_ports
+ collection.
+
+ Should you encounter a non-recoverable error, throw an appropriate exception.
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions.
+ :param results: Results from the 'enumerate_southbound_ports' method that
+                        you implemented. The type and contents are up to you.
+ :return:
+ """
+ yield defer.Deferred(lambda c: c.callback("Not Required"))
+
+ # TODO: Move some of the items below from here and the EVC to a utility class
+
+ def is_nni_port(self, port):
+ return port in self.northbound_ports
+
+ def is_uni_port(self, port):
+ raise NotImplementedError('implement in derived class')
+
+ def is_pon_port(self, port):
+ raise NotImplementedError('implement in derived class')
+
+ def is_logical_port(self, port):
+ return not self.is_nni_port(port) and not self.is_uni_port(port) and not self.is_pon_port(port)
+
+ def get_port_name(self, port):
+ raise NotImplementedError('implement in derived class')
+
+ def initialize_resource_manager(self):
+ raise NotImplementedError('implement in derived class')
+
+ @inlineCallbacks
+ def complete_device_specific_activation(self, _device, _reconciling):
+ # NOTE: Override this in your derived class for any device startup completion
+ return defer.succeed('NOP')
+
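The NotImplementedError hooks above, together with complete_device_specific_activation(), form the override surface for a concrete OLT handler. A minimal, hypothetical sketch of a derived class (not an actual Adtran model handler) could look like:

    class ExampleOltHandler(AdtranDeviceHandler):
        """Hypothetical derived handler showing the expected override points."""

        def is_uni_port(self, port):
            return False                                # UNI ports belong to the ONUs, not this OLT

        def is_pon_port(self, port):
            return port in self.southbound_ports

        def get_port_name(self, port):
            if self.is_nni_port(port):
                return 'nni-{}'.format(port)
            if self.is_pon_port(port):
                return 'pon-{}'.format(port)
            return 'unknown-{}'.format(port)

        def initialize_resource_manager(self):
            self.resource_mgr = None                    # real handlers build their resource manager here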
+ @inlineCallbacks
+ def disable(self):
+ """
+ This is called when a previously enabled device needs to be disabled based on a NBI call.
+ """
+ self.log.info('disabling', device_id=self.device_id)
+
+ # Cancel any running enable/disable/... in progress
+ d, self.startup = self.startup, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ # Get the latest device reference
+ device = self.adapter_agent.get_device(self.device_id)
+ device.reason = 'Disabling'
+ self.adapter_agent.update_device(device)
+
+ # Drop registration for ONU detection
+ # self.adapter_agent.unregister_for_onu_detect_state(self.device.id)
+ # Suspend any active healthchecks / pings
+
+ h, self.heartbeat = self.heartbeat, None
+ try:
+ if h is not None and not h.called:
+ h.cancel()
+ except:
+ pass
+ # Update the operational status to UNKNOWN
+
+ device.oper_status = OperStatus.UNKNOWN
+ device.connect_status = ConnectStatus.UNREACHABLE
+ self.adapter_agent.update_device(device)
+
+ # Disable all child devices first
+ self.adapter_agent.update_child_devices_state(self.device_id,
+ admin_state=AdminState.DISABLED)
+
+ # Remove the peer references from this device
+ self.adapter_agent.delete_all_peer_references(self.device_id)
+
+ # Remove the logical device to clear out logical device ports for any
+ # previously activated ONUs
+ self._delete_logical_device()
+
+ # Set all ports to disabled
+ self.adapter_agent.disable_all_ports(self.device_id)
+
+ dl = []
+ for port in self.northbound_ports.itervalues():
+ dl.append(port.stop())
+
+ for port in self.southbound_ports.itervalues():
+ dl.append(port.stop())
+
+ # NOTE: Flows removed before this method is called
+ # Wait for completion
+
+ self.startup = defer.gatherResults(dl, consumeErrors=True)
+ yield self.startup
+
+ if self.netconf_client:
+ self.netconf_client.close()
+
+ self._netconf_client = None
+ self._rest_client = None
+
+ device.reason = ''
+ self.adapter_agent.update_device(device)
+ self.log.info('disabled', device_id=device.id)
+ returnValue(None)
+
+ @inlineCallbacks
+ def reenable(self, done_deferred=None):
+ """
+ This is called when a previously disabled device needs to be enabled based on a NBI call.
+ :param done_deferred: (Deferred) Deferred to fire when done
+ """
+ self.log.info('re-enabling', device_id=self.device_id)
+
+ # Cancel any running enable/disable/... in progress
+ d, self.startup = self.startup, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
+ if not self._initial_enable_complete:
+ # Never contacted the device on the initial startup, do 'activate' steps instead
+ self.startup = reactor.callLater(0, self.activate, done_deferred, False)
+ returnValue('activating')
+
+ # Get the latest device reference
+ device = self.adapter_agent.get_device(self.device_id)
+
+ # Update the connect status to REACHABLE
+ device.connect_status = ConnectStatus.REACHABLE
+ device.oper_status = OperStatus.ACTIVATING
+ self.adapter_agent.update_device(device)
+
+ # Reenable any previously configured southbound ports
+ for port in self.southbound_ports.itervalues():
+ self.log.debug('reenable-pon-port', pon_id=port.pon_id)
+ port.enabled = True
+
+ # Flows should not exist on re-enable. They are re-pushed
+ if len(self._evcs):
+ self.log.warn('evcs-found', evcs=self._evcs)
+ self._evcs.clear()
+
+ try:
+ yield self.make_restconf_connection()
+
+ except Exception as e:
+ self.log.exception('adtran-hello-reconnect', e=e)
+
+ try:
+ yield self.make_netconf_connection()
+
+ except Exception as e:
+ self.log.exception('NETCONF-re-connection', e=e)
+
+ # Recreate the logical device
+ # NOTE: This causes a flow update event
+ ld_initialized = self.create_logical_device(device)
+
+ # Create logical ports for all southbound and northbound interfaces
+ try:
+ self.startup = self.create_logical_ports(device, ld_initialized, False)
+ yield self.startup
+
+ except Exception as e:
+ self.log.exception('logical-port-creation', e=e)
+
+ device = self.adapter_agent.get_device(device.id)
+ device.parent_id = ld_initialized.id
+ device.oper_status = OperStatus.ACTIVE
+ device.reason = ''
+ self.logical_device_id = ld_initialized.id
+
+ # update device active status now
+ self.adapter_agent.update_device(device)
+
+ # Reenable all child devices
+ self.adapter_agent.update_child_devices_state(device.id,
+ admin_state=AdminState.ENABLED)
+ # Schedule the heartbeat for the device
+ self.log.debug('starting-heartbeat')
+ self.start_heartbeat(delay=5)
+
+ self.log.info('re-enabled', device_id=device.id)
+
+ if done_deferred is not None:
+ done_deferred.callback('Done')
+
+ returnValue('reenabled')
+
+ @inlineCallbacks
+ def reboot(self):
+ """
+ This is called to reboot a device based on a NBI call. The admin state of the device
+ will not change after the reboot.
+ """
+ self.log.debug('reboot')
+
+ if not self._initial_enable_complete:
+ # Never contacted the device on the initial startup, do 'activate' steps instead
+ returnValue('failed')
+
+ # Cancel any running enable/disable/... in progress
+ d, self.startup = self.startup, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+ # Issue reboot command
+
+ if not self.is_virtual_olt:
+ try:
+ yield self.netconf_client.rpc(AdtranDeviceHandler.RESTART_RPC)
+
+ except Exception as e:
+ self.log.exception('NETCONF-shutdown', e=e)
+ returnValue(defer.fail(Failure()))
+
+ # self.adapter_agent.unregister_for_onu_detect_state(self.device.id)
+
+ # Update the operational status to ACTIVATING and connect status to
+ # UNREACHABLE
+
+ device = self.adapter_agent.get_device(self.device_id)
+ previous_oper_status = device.oper_status
+ previous_conn_status = device.connect_status
+ device.oper_status = OperStatus.ACTIVATING
+ device.connect_status = ConnectStatus.UNREACHABLE
+ self.adapter_agent.update_device(device)
+
+ # Update the child devices connect state to UNREACHABLE
+ self.adapter_agent.update_child_devices_state(self.device_id,
+ connect_status=ConnectStatus.UNREACHABLE)
+
+ # Shutdown communications with OLT. Typically it takes about 2 seconds
+ # or so after the reply before the restart actually occurs
+
+ try:
+ response = yield self.netconf_client.close()
+ self.log.debug('Restart response XML was: {}'.format('ok' if response.ok else 'bad'))
+
+ except Exception as e:
+ self.log.exception('NETCONF-client-shutdown', e=e)
+
+ # Clear off clients
+
+ self._netconf_client = None
+ self._rest_client = None
+
+ # Run remainder of reboot process as a new task. The OLT then may be up in a
+ # few moments or may take 3 minutes or more depending on any self tests enabled
+
+ current_time = time.time()
+ timeout = current_time + self.restart_failure_timeout
+
+ self.startup = reactor.callLater(10, self._finish_reboot, timeout,
+ previous_oper_status,
+ previous_conn_status)
+ returnValue(self.startup)
+
+ @inlineCallbacks
+ def _finish_reboot(self, timeout, previous_oper_status, previous_conn_status):
+ # Now wait until REST & NETCONF are re-established or we timeout
+
+ self.log.info('Resuming-activity',
+ remaining=timeout - time.time(), timeout=timeout, current=time.time())
+
+ if self.rest_client is None:
+ try:
+ yield self.make_restconf_connection(get_timeout=10)
+
+ except Exception:
+ self.log.debug('No RESTCONF connection yet')
+ self._rest_client = None
+
+ if self.netconf_client is None:
+ try:
+ yield self.make_netconf_connection(connect_timeout=10)
+
+ except Exception as e:
+ try:
+ if self.netconf_client is not None:
+ yield self.netconf_client.close()
+ except Exception as e:
+ self.log.exception(e.message)
+ finally:
+ self._netconf_client = None
+
+ if (self.netconf_client is None and not self.is_virtual_olt) or self.rest_client is None:
+ current_time = time.time()
+ if current_time < timeout:
+ self.startup = reactor.callLater(5, self._finish_reboot, timeout,
+ previous_oper_status,
+ previous_conn_status)
+ returnValue(self.startup)
+
+ if self.netconf_client is None and not self.is_virtual_olt:
+ self.log.error('NETCONF-restore-failure')
+ pass # TODO: What is best course of action if cannot get clients back?
+
+ if self.rest_client is None:
+ self.log.error('RESTCONF-restore-failure')
+ pass # TODO: What is best course of action if cannot get clients back?
+
+        # Pause an additional 5 seconds to allow OLT microservices to complete more initialization
+ yield asleep(5)
+ # TODO: Update device info. The software images may have changed...
+ # Get the latest device reference
+
+ device = self.adapter_agent.get_device(self.device_id)
+ device.oper_status = previous_oper_status
+ device.connect_status = previous_conn_status
+ self.adapter_agent.update_device(device)
+
+ # Update the child devices connect state to REACHABLE
+ self.adapter_agent.update_child_devices_state(self.device_id,
+ connect_status=ConnectStatus.REACHABLE)
+ # Restart ports to previous state
+ dl = []
+
+ for port in self.northbound_ports.itervalues():
+ dl.append(port.restart())
+
+ for port in self.southbound_ports.itervalues():
+ dl.append(port.restart())
+
+ try:
+ yield defer.gatherResults(dl, consumeErrors=True)
+
+ except Exception as e:
+ self.log.exception('port-restart', e=e)
+
+ # Re-subscribe for ONU detection
+ # self.adapter_agent.register_for_onu_detect_state(self.device.id)
+ # Request reflow of any EVC/EVC-MAPs
+ if len(self._evcs) > 0:
+ dl = []
+ for evc in self.evcs:
+ dl.append(evc.reflow())
+
+ try:
+ yield defer.gatherResults(dl)
+ except Exception as e:
+ self.log.exception('flow-restart', e=e)
+
+ self.log.info('rebooted', device_id=self.device_id)
+ returnValue('Rebooted')
+
+ @inlineCallbacks
+ def delete(self):
+ """
+ This is called to delete a device from the PON based on a NBI call.
+ If the device is an OLT then the whole PON will be deleted.
+ """
+ self.log.info('deleting', device_id=self.device_id)
+
+ # Cancel any outstanding tasks
+
+ d, self.startup = self.startup, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+ h, self.heartbeat = self.heartbeat, None
+ try:
+ if h is not None and not h.called:
+ h.cancel()
+ except:
+ pass
+
+ # Get the latest device reference
+ device = self.adapter_agent.get_device(self.device_id)
+ device.reason = 'Deleting'
+ self.adapter_agent.update_device(device)
+
+ # self.adapter_agent.unregister_for_onu_detect_state(self.device.id)
+
+ # Remove all flows from the device
+ # TODO: Create a bulk remove-all by device-id
+
+        evcs = list(self._evcs.values())
+ self._evcs.clear()
+
+ for evc in evcs:
+ evc.delete() # TODO: implement bulk-flow procedures
+
+ # Remove all child devices
+ self.adapter_agent.delete_all_child_devices(self.device_id)
+
+ # Remove the logical device (should already be gone if disable came first)
+ self._delete_logical_device()
+
+ # Remove the peer references from this device
+ self.adapter_agent.delete_all_peer_references(self.device_id)
+
+ # Tell all ports to stop any background processing
+
+ for port in self.northbound_ports.itervalues():
+ port.delete()
+
+ for port in self.southbound_ports.itervalues():
+ port.delete()
+
+ self.northbound_ports.clear()
+ self.southbound_ports.clear()
+
+ # Shutdown communications with OLT
+
+ if self.netconf_client is not None:
+ try:
+ yield self.netconf_client.close()
+ except Exception as e:
+ self.log.exception('NETCONF-shutdown', e=e)
+
+ self._netconf_client = None
+
+ self._rest_client = None
+ mgr, self.resource_mgr = self.resource_mgr, None
+ if mgr is not None:
+ del mgr
+
+ self.log.info('deleted', device_id=self.device_id)
+
+ def delete_child_device(self, proxy_address):
+ self.log.debug('sending-deactivate-onu',
+ olt_device_id=self.device_id,
+ proxy_address=proxy_address)
+ try:
+ children = self.adapter_agent.get_child_devices(self.device_id)
+ for child in children:
+ if child.proxy_address.onu_id == proxy_address.onu_id and \
+ child.proxy_address.channel_id == proxy_address.channel_id:
+ self.adapter_agent.delete_child_device(self.device_id,
+ child.id,
+ onu_device=child)
+ break
+
+ except Exception as e:
+ self.log.error('adapter_agent error', error=e)
+
+ def packet_out(self, egress_port, msg):
+ raise NotImplementedError('Overload in a derived class')
+
+ def update_pm_config(self, device, pm_config):
+ # TODO: This has not been tested
+ self.log.info('update_pm_config', pm_config=pm_config)
+ self.pm_metrics.update(pm_config)
+
+ @inlineCallbacks
+ def get_device_info(self, device):
+ """
+ Perform an initial network operation to discover the device hardware
+ and software version. Serial Number would be helpful as well.
+
+ Upon successfully retrieving the information, remember to call the
+ 'start_heartbeat' method to keep in contact with the device being managed
+
+ :param device: A voltha.Device object, with possible device-type
+ specific extensions. Such extensions shall be described as part of
+ the device type specification returned by device_types().
+ """
+ device = {}
+ returnValue(device)
+
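Derived handlers override get_device_info() to return a dict whose keys feed the activate() sequence earlier in this file; a hypothetical return value (the field contents are invented) would be:

    # Keys consumed by activate() when populating the voltha Device object
    results = {
        'model': 'example-olt',                 # hypothetical values throughout
        'hardware_version': '1.0',
        'firmware_version': '2.1.3',
        'serial_number': 'EXAMPLE123456',
        'vendor': 'Adtran Inc.',
        'software-images': [],                  # list of image protos, if available
    }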
+ def start_heartbeat(self, delay=10):
+ assert delay > 1, 'Minimum heartbeat is 1 second'
+ self.log.info('Starting-Device-Heartbeat ***')
+ self.heartbeat = reactor.callLater(delay, self.check_pulse)
+ return self.heartbeat
+
+ def check_pulse(self):
+ if self.logical_device_id is not None:
+ try:
+ self.heartbeat = self.rest_client.request('GET', self.HELLO_URI,
+ name='hello', timeout=5)
+ self.heartbeat.addCallbacks(self._heartbeat_success, self._heartbeat_fail)
+
+ except Exception as e:
+ self.heartbeat = reactor.callLater(5, self._heartbeat_fail, e)
+
+ def on_heatbeat_alarm(self, active):
+        if active and (self.netconf_client is None or not self.netconf_client.connected):
+ self.make_netconf_connection(close_existing_client=True)
+
+ def heartbeat_check_status(self, _):
+ """
+ Check the number of heartbeat failures against the limit and emit an alarm if needed
+ """
+ device = self.adapter_agent.get_device(self.device_id)
+
+ try:
+ from pyvoltha.adapters.extensions.alarms.heartbeat_alarm import HeartbeatAlarm
+
+ if self.heartbeat_miss >= self.heartbeat_failed_limit:
+ if device.connect_status == ConnectStatus.REACHABLE:
+ self.log.warning('heartbeat-failed', count=self.heartbeat_miss)
+ device.connect_status = ConnectStatus.UNREACHABLE
+ device.oper_status = OperStatus.FAILED
+ device.reason = self.heartbeat_last_reason
+ self.adapter_agent.update_device(device)
+ HeartbeatAlarm(self.alarms, 'olt', self.heartbeat_miss).raise_alarm()
+ self.on_heatbeat_alarm(True)
+ else:
+ # Update device states
+ if device.connect_status != ConnectStatus.REACHABLE:
+ device.connect_status = ConnectStatus.REACHABLE
+ device.oper_status = OperStatus.ACTIVE
+ device.reason = ''
+ self.adapter_agent.update_device(device)
+ HeartbeatAlarm(self.alarms, 'olt').clear_alarm()
+ self.on_heatbeat_alarm(False)
+
+ if self.netconf_client is None or not self.netconf_client.connected:
+ self.make_netconf_connection(close_existing_client=True)
+
+ except Exception as e:
+ self.log.exception('heartbeat-check', e=e)
+
+ # Reschedule next heartbeat
+ if self.logical_device_id is not None:
+ self.heartbeat_count += 1
+ self.heartbeat = reactor.callLater(self.heartbeat_interval, self.check_pulse)
+
+ def _heartbeat_success(self, results):
+ self.log.debug('heartbeat-success')
+ self.heartbeat_miss = 0
+ self.heartbeat_last_reason = ''
+ self.heartbeat_check_status(results)
+
+ def _heartbeat_fail(self, failure):
+ self.heartbeat_miss += 1
+ self.log.info('heartbeat-miss', failure=failure,
+ count=self.heartbeat_count,
+ miss=self.heartbeat_miss)
+ self.heartbeat_last_reason = 'RESTCONF connectivity error'
+ self.heartbeat_check_status(None)
+
+ @staticmethod
+ def parse_module_revision(revision):
+ try:
+ return datetime.datetime.strptime(revision, '%Y-%m-%d')
+ except Exception:
+ return None
+
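As a quick usage note, this helper simply converts a YANG module revision string into a datetime, returning None when the string does not parse (dates shown are arbitrary):

    AdtranDeviceHandler.parse_module_revision('2018-11-01')   # datetime.datetime(2018, 11, 1, 0, 0)
    AdtranDeviceHandler.parse_module_revision('not-a-date')   # None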
+ def remove_from_flow_table(self, _flows):
+ """
+ Remove flows from the device
+ :param _flows: (list) Flows
+ """
+ raise NotImplementedError()
+
+ def add_to_flow_table(self, _flows):
+ """
+        Add flows to the device
+ :param _flows: (list) Flows
+ """
+ raise NotImplementedError()
+
+ def process_inter_adapter_message(self, msg):
+ """
+ Called when the adapter receives a message that was sent to it directly
+ from another adapter. An adapter is automatically registered for these
+ messages when creating the inter-container kafka proxy. Note that it is
+ the responsibility of the sending and receiving adapters to properly encode
+ and decode the message.
+ :param msg: Proto Message (any)
+ :return: Proto Message Response
+ """
+ raise NotImplementedError()
+
+ def get_ofp_device_info(self, device):
+ """
+ Retrieve the OLT device info. This includes the ofp_desc and
+ ofp_switch_features. The existing ofp structures can be used,
+ or all the attributes get added to the Device definition or a new proto
+ definition gets created. This API will allow the Core to create a
+ LogicalDevice associated with this device (OLT only).
+ :param device: device
+ :return: Proto Message (TBD)
+ """
+ raise NotImplementedError()
+
+ def get_ofp_port_info(self, device, port_no):
+ """
+ Retrieve the port info. This includes the ofp_port. The existing ofp
+ structure can be used, or all the attributes get added to the Port
+ definitions or a new proto definition gets created. This API will allow
+ the Core to create a LogicalPort associated with this device.
+ :param device: device
+ :param port_no: port number
+ :return: Proto Message (TBD)
+ """
+ raise NotImplementedError()
diff --git a/adapters/adtran_common/download.py b/adapters/adtran_common/download.py
new file mode 100644
index 0000000..8207a99
--- /dev/null
+++ b/adapters/adtran_common/download.py
@@ -0,0 +1,523 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import structlog
+import xmltodict
+from twisted.internet import reactor
+from twisted.internet.defer import returnValue, inlineCallbacks
+from pyvoltha.protos.device_pb2 import ImageDownload
+from pyvoltha.protos.common_pb2 import AdminState
+
+log = structlog.get_logger()
+
+# TODO: Following two would be good provisionable parameters
+DEFAULT_AUTO_AGE_MINUTES = 10
+DEFAULT_MAX_JOB_RUN_SECONDS = 3600 * 4 # Some OLT files are 250MB+
+
+
+class Download(object):
+ """Class to wrap an image download"""
+
+ def __init__(self, handler, request, protocols):
+ self._handler = handler
+ self._deferred = None
+ self.device_id = request.id
+ self._name = request.name
+ self._url = request.url
+ self._crc = request.crc
+ self._version = request.image_version
+ self._local = request.local_dir
+ self._save_config = request.save_config
+ self._supported_protocols = protocols
+
+ self._download_state = ImageDownload.DOWNLOAD_UNKNOWN
+ self._failure_reason = ImageDownload.UNKNOWN_ERROR
+ self._image_state = ImageDownload.IMAGE_UNKNOWN
+ self._additional_info = ''
+ self._downloaded_octets = 0
+
+ # Server profile info
+ self._server_profile_name = None
+ self._scheme = None
+ self._host = ''
+ self._port = None
+ self._path = ''
+ self._auth = None
+
+ # Download job info
+ self._download_job_name = None
+
+ self._age_out_period = DEFAULT_AUTO_AGE_MINUTES
+ self._max_execution = DEFAULT_MAX_JOB_RUN_SECONDS
+
+ def __str__(self):
+ return "ImageDownload: {}".format(self.name)
+
+ @staticmethod
+ def create(handler, request, supported_protocols):
+ """
+ Create and start a new image download
+
+ :param handler: (AdtranDeviceHandler) Device download is for
+ :param request: (ImageDownload) Request
+ :param supported_protocols: (list) download methods allowed (http, tftp, ...)
+ """
+ download = Download(handler, request, supported_protocols)
+ download._deferred = reactor.callLater(0, download.start_download)
+
+ return download
+
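As a usage sketch (the wrapper function and the protocol list are hypothetical), an OLT handler servicing an NBI ImageDownload request would hand it to this factory and keep the returned object around for later status queries:

    def start_image_download(handler, request):
        # Keep a per-handler registry so later status/cancel requests can find the job by name
        if not hasattr(handler, 'image_downloads'):
            handler.image_downloads = {}
        download = Download.create(handler, request, ['http', 'https', 'sftp', 'tftp'])
        handler.image_downloads[request.name] = download
        return download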
+ @property
+ def name(self):
+ return self._name
+
+ @property
+ def download_state(self):
+ return self._download_state
+
+ @property
+ def failure_reason(self):
+ return self._failure_reason
+
+ @property
+ def image_state(self):
+ return self._image_state
+
+ @property
+ def additional_info(self):
+ return self._additional_info
+
+ @property
+ def downloaded_bytes(self):
+ return self._downloaded_octets
+
+ @property
+ def profile_name(self):
+ return self._server_profile_name
+
+ def _cancel_deferred(self):
+ d, self._deferred = self._deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except Exception as e:
+ pass
+
+ @inlineCallbacks
+ def start_download(self):
+ import uuid
+ log.info('download-start', name=self.name)
+ if not self.parse_url():
+ self._download_failed()
+ returnValue('failed url parsing')
+
+ self._download_state = ImageDownload.DOWNLOAD_STARTED
+ self._failure_reason = ImageDownload.NO_ERROR
+
+ ##############################################################
+ # Configure the file server profile
+ try:
+ self._additional_info = 'Configuring Download Server profile'
+ self._server_profile_name = 'VOLTHA.download.{}'.format(uuid.uuid4())
+ profile = self.server_profile_xml
+ yield self._handler.netconf_client.edit_config(profile)
+
+ except Exception as e:
+ log.exception('server-profile', e=e)
+ self._server_profile_name = None
+ self._failure_reason = ImageDownload.UNKNOWN_ERROR
+ self._additional_info += ': Failure: {}'.format(e.message)
+ self._download_failed()
+ raise
+
+ ##############################################################
+ # Configure the software download maintenance job
+ try:
+ self._additional_info = 'Configuring Image Download Job'
+ self._download_job_name = 'VOLTHA.download.{}'.format(uuid.uuid4())
+ job = self.download_job_xml
+ yield self._handler.netconf_client.edit_config(job)
+
+ except Exception as e:
+            log.exception('download-job', e=e)
+ self._download_job_name = None
+ self._failure_reason = ImageDownload.UNKNOWN_ERROR
+ self._additional_info += ': Failure: {}'.format(e.message)
+ self._download_failed()
+ raise
+
+ ##############################################################
+ # Schedule a task to monitor the download
+ try:
+ self._additional_info = 'Monitoring download status'
+ self._deferred = reactor.callLater(0.5, self.monitor_download_status)
+
+ except Exception as e:
+            log.exception('download-monitoring', e=e)
+ self._failure_reason = ImageDownload.UNKNOWN_ERROR
+ self._additional_info += ': Failure: {}'.format(e.message)
+ self._download_failed()
+ raise
+
+ returnValue('started')
+
+ def parse_url(self):
+ from urllib3 import util, exceptions
+ try:
+ results = util.parse_url(self._url)
+
+ # Server info
+ self._scheme = results.scheme.lower()
+ if self._scheme not in self._supported_protocols:
+ self._failure_reason = ImageDownload.INVALID_URL
+ self._additional_info = "Unsupported file transfer protocol: {}".format(results.scheme)
+ return False
+
+ self._host = results.host
+ self._port = results.port
+ self._path = results.path
+ self._auth = results.auth
+ return True
+
+ except exceptions.LocationValueError as e:
+ self._failure_reason = ImageDownload.INVALID_URL
+ self._additional_info = e.message
+ return False
+
+ except Exception as e:
+ self._failure_reason = ImageDownload.UNKNOWN_ERROR
+ self._additional_info = e.message
+ return False
+
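For reference, urllib3's parse_url splits a download URL into the pieces cached above; with a hypothetical URL the mapping is:

    from urllib3 import util

    results = util.parse_url('sftp://admin:secret@10.0.0.5:2222/images/olt.img')
    # results.scheme == 'sftp'               -> self._scheme
    # results.auth   == 'admin:secret'       -> self._auth
    # results.host   == '10.0.0.5'           -> self._host
    # results.port   == 2222                 -> self._port
    # results.path   == '/images/olt.img'    -> self._path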
+ @property
+ def server_profile_xml(self):
+ assert self._scheme in ['http', 'https', 'ftp', 'sftp', 'tftp'], 'Invalid protocol'
+
+ xml = """
+ <file-servers xmlns="http://www.adtran.com/ns/yang/adtran-file-servers">
+ <profiles>
+ <profile>"""
+
+ xml += '<name>{}</name>'.format(self._server_profile_name)
+ xml += '<connection-profile>'
+ xml += ' <host>{}</host>'.format(self._host)
+ xml += ' <port>{}</port>'.format(self._port) if self._port is not None else '<use-standard-port/>'
+
+ if self._scheme in ['http', 'https']:
+ xml += ' <protocol '
+ xml += 'xmlns:adtn-file-srv-https="http://www.adtran.com/ns/yang/adtran-file-servers-https">' +\
+ 'adtn-file-srv-https:{}'.format(self._scheme)
+ xml += ' </protocol>'
+
+ elif self._scheme == 'sftp':
+ xml += ' <protocol '
+ xml += 'xmlns:adtn-file-srv-sftp="http://www.adtran.com/ns/yang/adtran-file-servers-sftp">' +\
+ 'adtn-file-srv-sftp:sftp'
+ xml += ' </protocol>'
+
+ elif self._scheme in ['ftp', 'tftp']:
+ xml += '<protocol>adtn-file-srv:{}</protocol>'.format(self._scheme)
+
+ if self._auth is not None:
+ user_pass = self._auth.split(':')
+ xml += '<username>{}</username>'.format(user_pass[0])
+            xml += '<password>$0${}</password>'.format(":".join(user_pass[1:]))
+ # And the trailer
+ xml += """
+ </connection-profile>
+ </profile>
+ </profiles>
+ </file-servers>
+ """
+ return xml
+
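For the hypothetical sftp URL used in the parse_url sketch above, the profile rendered by this property comes out roughly as follows (whitespace normalized, UUID abbreviated):

    <file-servers xmlns="http://www.adtran.com/ns/yang/adtran-file-servers">
      <profiles>
        <profile>
          <name>VOLTHA.download.{uuid}</name>
          <connection-profile>
            <host>10.0.0.5</host>
            <port>2222</port>
            <protocol xmlns:adtn-file-srv-sftp="http://www.adtran.com/ns/yang/adtran-file-servers-sftp">adtn-file-srv-sftp:sftp</protocol>
            <username>admin</username>
            <password>$0$secret</password>
          </connection-profile>
        </profile>
      </profiles>
    </file-servers>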
+ @property
+ def download_job_xml(self):
+ # TODO: May want to support notifications
+ # TODO: Not sure about this name for the entity
+ entity = 'main 0'
+ xml = """
+ <maintenance-jobs xmlns="http://www.adtran.com/ns/yang/adtran-maintenance-jobs" xmlns:adtn-phys-sw-mnt="http://www.adtran.com/ns/yang/adtran-physical-software-maintenance">
+ <maintenance-job>
+ <name>{}</name>
+ <enabled>true</enabled>
+ <notify-enabled>false</notify-enabled>
+ <maximum-execution-time>{}</maximum-execution-time>
+ <run-once>true</run-once>
+ <adtn-phys-sw-mnt:download-software>
+ <adtn-phys-sw-mnt:physical-entity>{}</adtn-phys-sw-mnt:physical-entity>
+ <adtn-phys-sw-mnt:software-name>software</adtn-phys-sw-mnt:software-name>
+ <adtn-phys-sw-mnt:remote-file>
+ <adtn-phys-sw-mnt:file-server-profile>{}</adtn-phys-sw-mnt:file-server-profile>
+ <adtn-phys-sw-mnt:filename>{}</adtn-phys-sw-mnt:filename>
+ """.format(self._download_job_name, self._max_execution, entity,
+ self._server_profile_name, self._name)
+
+ if self._path is not None:
+ xml += """
+ <adtn-phys-sw-mnt:filepath>{}</adtn-phys-sw-mnt:filepath>
+ """.format(self._path)
+
+ xml += """
+ </adtn-phys-sw-mnt:remote-file>
+ </adtn-phys-sw-mnt:download-software>
+ </maintenance-job>
+ </maintenance-jobs>
+ """
+ return xml
+
+ @property
+ def download_status_xml(self):
+ xml = """
+ <filter>
+ <maintenance-jobs-state xmlns="http://www.adtran.com/ns/yang/adtran-maintenance-jobs">
+ <maintenance-job>
+ <name>{}</name>
+ </maintenance-job>
+ </maintenance-jobs-state>
+ </filter>
+ """.format(self._download_job_name)
+ return xml
+
+ @property
+ def delete_server_profile_xml(self):
+ xml = """
+ <file-servers xmlns="http://www.adtran.com/ns/yang/adtran-file-servers">
+ <profiles operation="delete">
+ <profile>
+ <name>{}</name>
+ </profile>
+ </profiles>
+ </file-servers>
+ """.format(self._server_profile_name)
+ return xml
+
+ @property
+ def delete_download_job_xml(self):
+ xml = """
+ <maintenance-jobs xmlns="http://www.adtran.com/ns/yang/adtran-maintenance-jobs">
+            <maintenance-job operation="delete">
+ <name>{}</name>
+ </maintenance-job>
+ </maintenance-jobs>
+ """.format(self._download_job_name)
+ return xml
+
+ @inlineCallbacks
+ def monitor_download_status(self):
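+        # Poll the maintenance-job operational state with a NETCONF <get> filter.
+        # This method reschedules itself roughly once a second until the job
+        # reaches a terminal state (succeeded, failed, or insufficient space).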
+ log.debug('monitor-download', name=self.name)
+ try:
+ results = yield self._handler.netconf_client.get(self.download_status_xml)
+
+ result_dict = xmltodict.parse(results.data_xml)
+ entries = result_dict['data']['maintenance-jobs-state']['maintenance-job']
+
+ name = entries.get('name')
+            assert name == self._download_job_name, 'The job status name does not match. {} != {}'.format(name, self._download_job_name)
+ self._download_state = self.monitor_state_to_download_state(entries['state']['#text'])
+
+ completed = entries['timestamps'].get('completed-timestamp')
+ started = entries['timestamps'].get('start-timestamp')
+
+ if self._download_state == ImageDownload.DOWNLOAD_FAILED:
+ self._failure_reason = ImageDownload.UNKNOWN_ERROR
+ self._additional_info = entries['error'].get('error-message')
+
+ elif self._download_state == ImageDownload.INSUFFICIENT_SPACE:
+ self._failure_reason = ImageDownload.INSUFFICIENT_SPACE
+ self._additional_info = entries['error'].get('error-message')
+
+ elif self._download_state == ImageDownload.DOWNLOAD_STARTED:
+ self._failure_reason = ImageDownload.NO_ERROR
+ self._additional_info = 'Download started at {}'.format(started)
+
+ elif self._download_state == ImageDownload.DOWNLOAD_SUCCEEDED:
+ self._failure_reason = ImageDownload.NO_ERROR
+ self._additional_info = 'Download completed at {}'.format(completed)
+ else:
+                raise NotImplementedError('Unsupported state')
+
+ done = self._download_state in [ImageDownload.DOWNLOAD_FAILED,
+ ImageDownload.DOWNLOAD_SUCCEEDED,
+ ImageDownload.INSUFFICIENT_SPACE]
+
+ except Exception as e:
+ log.exception('protocols', e=e)
+ done = False
+
+ if not done:
+ self._deferred = reactor.callLater(1, self.monitor_download_status)
+
+ returnValue('done' if done else 'not-done-yet')
+
+ def _download_failed(self):
+ log.info('download-failed', name=self.name)
+
+ self._cancel_deferred()
+ self._download_state = ImageDownload.DOWNLOAD_FAILED
+
+ # Cleanup NETCONF
+ reactor.callLater(0, self._cleanup_download_job, 20)
+ reactor.callLater(0, self._cleanup_server_profile, 20)
+ # TODO: Do we signal any completion due to failure?
+
+ def _download_complete(self):
+ log.info('download-completed', name=self.name)
+
+ self._cancel_deferred()
+ self._download_state = ImageDownload.DOWNLOAD_SUCCEEDED
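+        # TODO: 123456 below is a stand-in value; the actual downloaded octet
+        #       count is not retrieved from the device here.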
+ self._downloaded_octets = 123456
+ self._failure_reason = ImageDownload.NO_ERROR
+
+ reactor.callLater(0, self._cleanup_download_job, 20)
+ reactor.callLater(0, self._cleanup_server_profile, 20)
+ # TODO: How do we signal completion?
+
+ device = self._handler.adapter_agent.get_device(self.device_id)
+ if device is not None:
+ # restore admin state to enabled
+ device.admin_state = AdminState.ENABLED
+ self._handler.adapter_agent.update_device(device)
+
+ def cancel_download(self, request):
+ log.info('cancel-sw-download', name=self.name)
+
+ self._cancel_deferred()
+
+ try:
+ # initiate cancelling software download to device at success
+ # delete image download record
+
+ self._handler.adapter_agent.delete_image_download(request)
+
+ device = self._handler.adapter_agent.get_device(self.device_id)
+ if device is not None:
+ # restore admin state to enabled
+ device.admin_state = AdminState.ENABLED
+ self._handler.adapter_agent.update_device(device)
+
+ except Exception as e:
+ log.exception(e.message)
+
+ reactor.callLater(0, self._cleanup_download_job, 20)
+ reactor.callLater(0, self._cleanup_server_profile, 20)
+
+ @inlineCallbacks
+ def _cleanup_server_profile(self, retries, attempt=1):
+ log.info('cleanup-server', name=self.name,
+ profile=self._server_profile_name,
+ attempt=attempt, remaining=retries)
+
+ if self._server_profile_name is not None:
+ try:
+ profile = self.delete_server_profile_xml
+ yield self._handler.netconf_client.edit_config(profile)
+ self._server_profile_name = None
+
+ except Exception as e:
+ log.exception(e.message)
+ if retries > 0:
+                    reactor.callLater(attempt * 60, self._cleanup_server_profile,
+ retries - 1, attempt + 1)
+
+ @inlineCallbacks
+ def _cleanup_download_job(self, retries, attempt=1):
+ log.info('cleanup-download', name=self.name,
+ profile=self._download_job_name,
+ attempt=attempt, remaining=retries)
+
+ if self._download_job_name is not None:
+ try:
+ job = self.delete_download_job_xml
+ yield self._handler.netconf_client.edit_config(job)
+ self._download_job_name = None
+
+ except Exception as e:
+ log.exception(e.message)
+ if retries > 0:
+ reactor.callLater(attempt * 60, self._cleanup_download_job,
+ retries - 1, attempt + 1)
+
+ @inlineCallbacks
+ def activate_image(self):
+ log.info('download-activate', name=self.name)
+
+ if self._download_state == ImageDownload.DOWNLOAD_SUCCEEDED:
+ pass # TODO: Implement
+ self._image_state = ImageDownload.IMAGE_ACTIVE
+
+ returnValue('TODO: Implement this')
+
+ @inlineCallbacks
+ def revert_image(self):
+ log.info('download-revert', name=self.name)
+
+ if self._download_state == ImageDownload.DOWNLOAD_SUCCEEDED:
+ pass # TODO: Implement
+ self._image_state = ImageDownload.IMAGE_INACTIVE
+
+ returnValue('TODO: Implement this')
+
+ def monitor_state_to_download_state(self, state):
+ if ':' in state:
+ state = state.split(':')[-1]
+ result = {
+ 'downloading-software': ImageDownload.DOWNLOAD_STARTED, # currently downloading software
+ 'storing-software': ImageDownload.DOWNLOAD_STARTED, # successfully downloaded the required software and is storing it to memory
+ 'software-stored': ImageDownload.DOWNLOAD_SUCCEEDED, # successfully downloaded the required software and has stored it successfully to memory
+            'software-download-failed': ImageDownload.DOWNLOAD_FAILED,       # unsuccessfully attempted to download the required software
+ 'invalid-software': ImageDownload.DOWNLOAD_FAILED, # successfully downloaded the required software but the software was determined to be invalid
+            'software-storage-failed': ImageDownload.INSUFFICIENT_SPACE,     # successfully downloaded the required software but was unable to successfully store it to memory
+ }.get(state.lower(), None)
+ log.info('download-software-state', result=result, state=state, name=self.name)
+ assert result is not None, 'Invalid state'
+ return result
+
+ def monitor_state_to_activate_state(self, state):
+ if ':' in state:
+ state = state.split(':')[-1]
+ result = {
+ 'enabling-software': ImageDownload.IMAGE_ACTIVATE, # currently enabling the software
+ 'software-enabled': ImageDownload.IMAGE_ACTIVE, # successfully enabled the required software
+ 'enable-software-failed': ImageDownload.IMAGE_INACTIVE, # unsuccessfully attempted to enable the required software revision
+ 'activating-software': ImageDownload.IMAGE_ACTIVATE, # currently activating the software
+ 'software-activated': ImageDownload.IMAGE_ACTIVE, # successfully activated the required software. The job terminated successfully
+ 'activate-software-failed': ImageDownload.IMAGE_INACTIVE, # unsuccessfully attempted to activate the required software revision
+ 'committing-software': ImageDownload.IMAGE_ACTIVATE, # currently committing the software
+ 'software-committed': ImageDownload.IMAGE_ACTIVATE, # successfully committed the required software. The job terminated successfully
+ 'commit-software-failed': ImageDownload.IMAGE_INACTIVE, # unsuccessfully attempted to commit the required software revision
+ }.get(state.lower(), None)
+ log.info('download-activate-state', result=result, state=state, name=self.name)
+ assert result is not None, 'Invalid state'
+ return result
+
+ @staticmethod
+ def clear_all(client):
+ """
+ Remove all file server profiles and download jobs
+ :param client: (ncclient) NETCONF Client to use
+ """
+ from twisted.internet import defer
+ del_fs_xml = """
+ <file-servers xmlns="http://www.adtran.com/ns/yang/adtran-file-servers">
+ <profiles operation="delete"/>
+ </file-servers>
+ """
+ del_job_xml = """
+ <maintenance-jobs operation="delete" xmlns="http://www.adtran.com/ns/yang/adtran-maintenance-jobs"/>
+ """
+ dl = [client.edit_config(del_fs_xml, ignore_delete_error=True),
+ client.edit_config(del_job_xml, ignore_delete_error=True)]
+
+ return defer.gatherResults(dl, consumeErrors=True)
diff --git a/adapters/adtran_common/flow/__init__.py b/adapters/adtran_common/flow/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/adapters/adtran_common/flow/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/adapters/adtran_common/flow/acl.py b/adapters/adtran_common/flow/acl.py
new file mode 100644
index 0000000..67f8c08
--- /dev/null
+++ b/adapters/adtran_common/flow/acl.py
@@ -0,0 +1,385 @@
+#
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import xmltodict
+import re
+import structlog
+from twisted.internet.defer import inlineCallbacks, returnValue, succeed
+
+log = structlog.get_logger()
+
+_acl_list = {}        # Key -> device-id -> dict of ACL name -> ACL object
+
+ACL_NAME_FORMAT = 'VOLTHA-ACL-{}-{}' # format(flow_entry.flow_id, flow-entry-hash)
+ACL_NAME_REGEX_ALL = 'VOLTHA-ACL-*'
+ACE_NAME_FORMAT = 'VOLTHA-ACE-{}' # format(flow_entry.flow_id)
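+# Illustrative example: flow-id 1024 yields the ACE name 'VOLTHA-ACE-1024' and an ACL name of
+# the form 'VOLTHA-ACL-1024-<md5-of-match-fields>' (see acl_hash() below).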
+
+
+class ACL(object):
+ """
+ Class to wrap Trap-to-Controller functionality
+ """
+ def __init__(self, flow_entry):
+ self._installed = False
+ self._status_message = None
+ self._parent = flow_entry # FlowEntry parent
+ self._flow = flow_entry.flow
+ self._handler = flow_entry.handler
+ self._name = ACL.flow_to_name(flow_entry)
+ self._rule_name = ACL.flow_to_ace_name(flow_entry)
+ self._eth_type = flow_entry.eth_type
+ self._ip_protocol = flow_entry.ip_protocol
+ self._ipv4_dst = flow_entry.ipv4_dst
+ self._src_port = flow_entry.udp_src
+ self._dst_port = flow_entry.udp_dst
+ self._exception = False
+ self._enabled = True
+ self._valid = self._decode()
+
+ def __str__(self):
+ return 'ACL: {}, Installed: {}, L2: {}, L3/4: {}'.\
+ format(self.name, self._installed, self.is_l2_exception,
+ self.is_l3_l4_exception)
+
+ @property
+ def name(self):
+ return self._name
+
+ @property
+ def installed(self):
+ return self._installed
+
+ @property
+ def is_l2_exception(self):
+ from flow_entry import FlowEntry
+ return self._eth_type not in (None,
+ FlowEntry.EtherType.IPv4,
+ FlowEntry.EtherType.IPv6)
+
+ @property
+ def is_l3_l4_exception(self):
+ return not self.is_l2_exception and self._ip_protocol is not None
+
+ @staticmethod
+ def _xml_header(operation=None):
+ return '<access-lists xmlns="http://www.adtran.com/ns/yang/adtran-ietf-access-control-list"\
+ xmlns:adtn-ietf-ns-acl="http://www.adtran.com/ns/yang/adtran-ietf-ns-access-control-list"><acl{}>'.\
+ format('' if operation is None else ' xc:operation="{}"'.format(operation))
+
+ @staticmethod
+ def _xml_trailer():
+ return '</acl></access-lists>'
+
+ def _xml_action(self):
+ xml = '<actions>'
+ if self._exception:
+ xml += '<adtn-ietf-ns-acl:exception-to-cpu/>'
+ else:
+ xml += '<permit/>'
+ xml += '</actions>'
+ return xml
+
+ def _ace_l2(self):
+ xml = '<ace>'
+ xml += '<rule-name>{}</rule-name>'.format(self._rule_name)
+ xml += '<matches><l2-acl><ether-type>{:04x}</ether-type></l2-acl></matches>'.format(self._eth_type)
+ xml += self._xml_action()
+ xml += '</ace>'
+ return xml
+
+ def _ace_l2_l3_ipv4(self):
+ xml = '<ace>'
+ xml += '<rule-name>{}</rule-name>'.format(self._rule_name)
+ xml += '<matches><l2-l3-ipv4-acl>'
+ xml += '<ether-type>{:04X}</ether-type>'.format(self._eth_type)
+
+ if self._ip_protocol is not None:
+ xml += '<protocol>{}</protocol>'.format(self._ip_protocol)
+ if self._ipv4_dst is not None:
+ xml += '<destination-ipv4-network>{}/32</destination-ipv4-network>'.format(self._ipv4_dst)
+ if self._src_port is not None:
+ xml += '<source-port-range><lower-port>{}</lower-port><operation>eq</operation></source-port-range>'.\
+ format(self._src_port)
+ if self._dst_port is not None:
+ xml += '<destination-port-range><lower-port>' + \
+ '{}</lower-port><operations>eq</operations></destination-port-range>'.format(self._dst_port)
+
+ xml += '</l2-l3-ipv4-acl></matches>'
+ xml += self._xml_action()
+ xml += '</ace>'
+ return xml
+
+ def _ace_any(self):
+ xml = '<ace>'
+ xml += '<rule-name>{}</rule-name>'.format(self._rule_name)
+ xml += '<matches><any-acl/></matches>'
+ xml += self._xml_action()
+ xml += '</ace>'
+ return xml
+
+ def _acl_eth(self):
+ xml = '<acl-type>eth-acl</acl-type>'
+ xml += '<acl-name>{}</acl-name>'.format(self._name)
+ return xml
+
+ def _acl_l4(self):
+ xml = '<acl-type>mixed-l2-l3-ipv4-acl</acl-type>'
+ xml += '<acl-name>{}</acl-name>'.format(self._name)
+ return xml
+
+ def _acl_any(self):
+ xml = '<acl-type>any-acl</acl-type>'
+ xml += '<acl-name>{}</acl-name>'.format(self._name)
+ return xml
+
+ def _install_xml(self):
+ xml = ACL._xml_header('create')
+ if self.is_l2_exception:
+ xml += self._acl_eth()
+ xml += '<aces>{}</aces>'.format(self._ace_l2())
+ elif self.is_l3_l4_exception:
+ xml += self._acl_l4()
+ xml += '<aces>{}</aces>'.format(self._ace_l2_l3_ipv4())
+ else:
+ xml += self._acl_any()
+ xml += '<aces>{}</aces>'.format(self._ace_any())
+
+ xml += ACL._xml_trailer()
+ return xml
+
+ def _remove_xml(self):
+ xml = ACL._xml_header('delete')
+ if self.is_l2_exception:
+ xml += self._acl_eth()
+ elif self.is_l3_l4_exception:
+ xml += self._acl_l4()
+ else:
+ xml += self._acl_any()
+ xml += ACL._xml_trailer()
+ return xml
+
+ def evc_map_ingress_xml(self):
+ """ Individual ACL specific XML for the EVC MAP """
+
+ xml = '<adtn-evc-map-acl:acl-type '
+ fmt = 'xmlns:adtn-ietf-acl="http://www.adtran.com/ns/yang/adtran-ietf-access-control-list">adtn-ietf-acl:{}'\
+ '</adtn-evc-map-acl:acl-type>'
+
+ if self.is_l2_exception:
+ xml += fmt.format('eth-acl')
+
+ elif self.is_l3_l4_exception:
+ xml += fmt.format('mixed-l2-l3-ipv4-acl')
+
+ else:
+ xml += fmt.format('any-acl')
+
+ xml += '<adtn-evc-map-acl:acl-name>{}</adtn-evc-map-acl:acl-name>'.format(self.name)
+ return xml
+
+ @staticmethod
+ def create(flow_entry):
+ acl = ACL(flow_entry)
+
+ # Already created and installed, return that one
+ acls_installed = _acl_list.get(flow_entry.handler.device_id)
+ if acls_installed is not None:
+ entry = acls_installed.get(acl._name)
+ if entry is not None:
+ return entry
+
+ return acl
+
+ @staticmethod
+ def flow_to_name(flow_entry):
+ return ACL_NAME_FORMAT.format(flow_entry.flow_id, ACL.acl_hash(flow_entry))
+
+ @staticmethod
+ def flow_to_ace_name(flow_entry):
+ return ACE_NAME_FORMAT.format(flow_entry.flow_id)
+
+ @staticmethod
+ def acl_hash(flow_entry):
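+        # Hash the ACL match criteria (ingress port, EtherType, IP protocol, IPv4
+        # destination, UDP ports) so that flows with identical match fields produce
+        # the same ACL name and can share a single installed ACL (see create() above).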
+ from hashlib import md5
+ in_port = flow_entry.in_port or 0
+ eth_type = flow_entry.eth_type or 0
+ ip_protocol = flow_entry.ip_protocol or 0
+ ipv4_dst = flow_entry.ipv4_dst or 0
+ src_port = flow_entry.udp_src or 0
+ dst_port = flow_entry.udp_dst or 0
+ hex_string = md5('{},{},{},{},{},{}'.format(in_port, eth_type, ip_protocol,
+ ipv4_dst, src_port, dst_port)).hexdigest()
+ return hex_string
+
+ @property
+ def valid(self):
+ return self._valid
+
+ @property
+ def status(self):
+ return self._status_message
+
+ @inlineCallbacks
+ def install(self):
+ log.debug('installing-acl', installed=self._installed)
+
+ if not self._installed and self._enabled:
+ if self._handler.device_id not in _acl_list:
+ _acl_list[self._handler.device_id] = {}
+
+ acls_installed = _acl_list[self._handler.device_id]
+ if self._name in acls_installed:
+ # Return OK
+ returnValue(self._enabled)
+
+ try:
+ acl_xml = self._install_xml()
+ log.debug('install-xml', xml=acl_xml, name=self._name)
+
+ results = yield self._handler.netconf_client.edit_config(acl_xml)
+ self._installed = results.ok
+ self._status_message = '' if results.ok else results.error
+
+ if self._installed:
+ acls_installed[self._name] = self
+
+ except Exception as e:
+ log.exception('install-failure', name=self._name, e=e)
+ raise
+
+ returnValue(self._installed and self._enabled)
+
+ @inlineCallbacks
+ def remove(self):
+ log.debug('removing-acl', installed=self._installed)
+
+ if self._installed:
+ acl_xml = self._remove_xml()
+ log.info('remove-xml', xml=acl_xml, name=self._name)
+
+ results = yield self._handler.netconf_client.edit_config(acl_xml)
+ self._installed = not results.ok
+ self._status_message = '' if results.ok else results.error
+
+ if not self._installed:
+ acls_installed = _acl_list.get(self._handler.device_id)
+ if acls_installed is not None and self._name in acls_installed:
+ del acls_installed[self._name]
+
+ returnValue(not self._installed)
+
+    def enable(self):
+        if not self._enabled:
+            self._enabled = True
+            raise NotImplementedError("TODO: Implement this")
+
+    def disable(self):
+        if self._enabled:
+            self._enabled = False
+            raise NotImplementedError("TODO: Implement this")
+
+ def _decode(self):
+ """
+ Examine the field settings and set ACL up for requested fields
+ """
+ # If EtherType is not None and not IP, this is an L2 exception
+ self._exception = self.is_l2_exception or self.is_l3_l4_exception
+ return True
+
+ # BULK operations
+
+ @staticmethod
+ def enable_all():
+        raise NotImplementedError("TODO: Implement this")
+
+ @staticmethod
+ def disable_all():
+        raise NotImplementedError("TODO: Implement this")
+
+ @staticmethod
+ def clear_all(device_id):
+ """
+ Clear all acls for this device id from the list
+ :param device_id: id of the device
+ """
+ if device_id in _acl_list:
+ del _acl_list[device_id]
+
+ @staticmethod
+ def remove_all(client, regex_=ACL_NAME_REGEX_ALL):
+ """
+ Remove all matching ACLs from hardware
+ :param client: (ncclient) NETCONF Client to use
+ :param regex_: (String) Regular expression for name matching
+ :return: (deferred)
+ """
+        # Do a 'get' on the ACL config and you should get the names
+ get_xml = """
+ <filter>
+ <access-lists xmlns="http://www.adtran.com/ns/yang/adtran-ietf-access-control-list">
+ <acl><acl-type/><acl-name/></acl>
+ </access-lists>
+ </filter>
+ """
+ log.debug('query', xml=get_xml, regex=regex_)
+
+ def request_failed(results, operation):
+ log.error('{}-failed'.format(operation), results=results)
+
+ def delete_complete(results):
+ log.debug('delete-complete', results=results)
+
+ def do_delete(rpc_reply, regexpr):
+ log.debug('query-complete', rpc_reply=rpc_reply)
+
+ if rpc_reply.ok:
+ result_dict = xmltodict.parse(rpc_reply.data_xml)
+ entries = result_dict['data']['access-lists'] if 'access-lists' in result_dict['data'] else {}
+
+ if 'acl' in entries:
+ p = re.compile(regexpr)
+
+ pairs = []
+ if isinstance(entries['acl'], list):
+ pairs = {(entry['acl-type'], entry['acl-name']) for entry in entries['acl']
+ if 'acl-name' in entry and 'acl-type' in entry and p.match(entry['acl-name'])}
+ else:
+ if 'acl' in entries:
+ entry = entries['acl']
+ if 'acl-name' in entry and 'acl-type' in entry and p.match(entry['acl-name']):
+ pairs = [(entry['acl-type'], entry['acl-name'])]
+
+ if len(pairs) > 0:
+ del_xml = '<access-lists xmlns="http://www.adtran.com/ns/yang/adtran-ietf-access-control-list">'
+ for pair in pairs:
+ del_xml += '<acl xc:operation = "delete">'
+ del_xml += '<acl-type>{}</acl-type>'.format(pair[0])
+ del_xml += '<acl-name>{}</acl-name>'.format(pair[1])
+ del_xml += '</acl>'
+ del_xml += '</access-lists>'
+ log.debug('removing', xml=del_xml)
+
+ return client.edit_config(del_xml)
+
+ return succeed('no entries')
+
+ d = client.get(get_xml)
+ d.addCallbacks(do_delete, request_failed, callbackArgs=[regex_], errbackArgs=['get'])
+ d.addCallbacks(delete_complete, request_failed, errbackArgs=['edit-config'])
+ return d
diff --git a/adapters/adtran_common/flow/evc.py b/adapters/adtran_common/flow/evc.py
new file mode 100644
index 0000000..5e00bca
--- /dev/null
+++ b/adapters/adtran_common/flow/evc.py
@@ -0,0 +1,479 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import xmltodict
+import re
+import structlog
+from enum import IntEnum
+from twisted.internet import reactor, defer
+from twisted.internet.defer import inlineCallbacks, returnValue, succeed
+
+log = structlog.get_logger()
+
+EVC_NAME_FORMAT = 'VOLTHA-{}' # format(flow.id)
+EVC_NAME_REGEX_ALL = EVC_NAME_FORMAT.format('*')
+DEFAULT_STPID = 0x8100
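+# Illustrative example: flow-id 1024 yields the EVC name 'VOLTHA-1024'. DEFAULT_STPID is the
+# standard IEEE 802.1Q TPID and is used when no S-Tag TPID has been explicitly set.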
+
+
+class EVC(object):
+ """
+ Class to wrap EVC functionality
+ """
+ class SwitchingMethod(IntEnum):
+ SINGLE_TAGGED = 1
+ DOUBLE_TAGGED = 2
+ MAC_SWITCHED = 3
+ DOUBLE_TAGGED_MAC_SWITCHED = 4
+ DEFAULT = SINGLE_TAGGED
+
+ @staticmethod
+ def xml(value):
+ if value is None:
+ value = EVC.SwitchingMethod.DEFAULT
+ if value == EVC.SwitchingMethod.SINGLE_TAGGED:
+ return '<single-tag-switched/>'
+ elif value == EVC.SwitchingMethod.DOUBLE_TAGGED:
+ return '<double-tag-switched/>'
+ elif value == EVC.SwitchingMethod.MAC_SWITCHED:
+ return '<mac-switched/>'
+ elif value == EVC.SwitchingMethod.DOUBLE_TAGGED_MAC_SWITCHED:
+ return '<double-tag-mac-switched/>'
+ raise ValueError('Invalid SwitchingMethod enumeration')
+
+ class Men2UniManipulation(IntEnum):
+ SYMMETRIC = 1
+ POP_OUT_TAG_ONLY = 2
+ DEFAULT = SYMMETRIC
+
+ @staticmethod
+ def xml(value):
+ if value is None:
+ value = EVC.Men2UniManipulation.DEFAULT
+ fmt = '<men-to-uni-tag-manipulation>{}</men-to-uni-tag-manipulation>'
+ if value == EVC.Men2UniManipulation.SYMMETRIC:
+ return fmt.format('<symmetric/>')
+ elif value == EVC.Men2UniManipulation.POP_OUT_TAG_ONLY:
+ return fmt.format('<pop-outer-tag-only/>')
+ raise ValueError('Invalid Men2UniManipulation enumeration')
+
+ class ElineFlowType(IntEnum):
+ NNI_TO_UNI = 1
+ UNI_TO_NNI = 2
+ NNI_TO_NNI = 3
+ UNI_TO_UNI = 4
+ ACL_FILTER = 5
+ UNKNOWN = 6
+ UNSUPPORTED = 7 # Or Invalid
+
+ def __init__(self, flow_entry):
+ self._installed = False
+ self._status_message = None
+ self._flow = flow_entry
+ self._name = self._create_name()
+ self._deferred = None
+ self._evc_maps = {} # Map Name -> evc-map
+
+ self._flow_type = EVC.ElineFlowType.UNKNOWN
+
+ # EVC related properties
+ self._enabled = True
+ self._men_ports = []
+ self._s_tag = None
+ self._stpid = None
+ self._switching_method = None
+ self.service_evc = False
+
+ self._ce_vlan_preservation = None
+ self._men_to_uni_tag_manipulation = None
+
+ try:
+ self._valid = self._decode()
+
+ except Exception as e:
+ log.exception('Failure during EVC decode', e=e)
+ self._valid = False
+
+ def __str__(self):
+ return "EVC-{}: MEN: {}, S-Tag: {}".format(self._name, self._men_ports, self._s_tag)
+
+ def _create_name(self):
+ #
+ # TODO: Take into account selection criteria and output to make the name
+ #
+ return EVC_NAME_FORMAT.format(self._flow.flow_id)
+
+ def _cancel_deferred(self):
+ d, self._deferred = self._deferred, None
+
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+
+ except Exception as e:
+ pass
+
+ @property
+ def name(self):
+ return self._name
+
+ @property
+ def valid(self):
+ return self._valid
+
+ @property
+ def installed(self):
+ return self._installed
+
+ @installed.setter
+ def installed(self, value):
+ assert not value, 'EVC Install can only be reset'
+ self._installed = False
+
+ @property
+ def status(self):
+ return self._status_message
+
+ @status.setter
+ def status(self, value):
+ self._status_message = value
+
+ @property
+ def s_tag(self):
+ return self._s_tag
+
+ @property
+ def stpid(self):
+ return self._stpid
+
+ @stpid.setter
+ def stpid(self, value):
+ assert self._stpid is None or self._stpid == value, 'STPID can only be set once'
+ self._stpid = value
+
+ @property
+ def switching_method(self):
+ return self._switching_method
+
+ @switching_method.setter
+ def switching_method(self, value):
+ assert self._switching_method is None or self._switching_method == value,\
+ 'Switching Method can only be set once. EVC: {}'.format(self.name)
+ self._switching_method = value
+
+ @property
+ def ce_vlan_preservation(self):
+ return self._ce_vlan_preservation
+
+ @ce_vlan_preservation.setter
+ def ce_vlan_preservation(self, value):
+ assert self._ce_vlan_preservation is None or self._ce_vlan_preservation == value,\
+ 'CE VLAN Preservation can only be set once'
+ self._ce_vlan_preservation = value
+
+ @property
+ def men_to_uni_tag_manipulation(self):
+ return self._men_to_uni_tag_manipulation
+
+ @men_to_uni_tag_manipulation.setter
+ def men_to_uni_tag_manipulation(self, value):
+ assert self._men_to_uni_tag_manipulation is None or self._men_to_uni_tag_manipulation == value, \
+ 'MEN-to-UNI tag manipulation can only be set once'
+ self._men_to_uni_tag_manipulation = value
+
+ @property
+ def flow_entry(self):
+ # Note that the first flow used to create the EVC is saved and it may
+ # eventually get deleted while others still use the EVC. This should
+ # be okay as the downstream flow/signature table is used to maintain
+ # the lifetime on this EVC object.
+ return self._flow
+
+ @flow_entry.setter
+ def flow_entry(self, value):
+ self._flow = value
+
+ @property
+ def evc_maps(self):
+ """
+ Get all EVC Maps that reference this EVC
+ :return: list of EVCMap
+ """
+ return list(self._evc_maps.values()) if self._evc_maps is not None else []
+
+ @property
+ def evc_map_names(self):
+ """
+ Get all EVC Map names that reference this EVC
+ :return: list of EVCMap names
+ """
+ return list(self._evc_maps.keys()) if self._evc_maps is not None else []
+
+ def add_evc_map(self, evc_map):
+ if self._evc_maps is None:
+ self._evc_maps = dict()
+
+ if evc_map.name not in self._evc_maps:
+ self._evc_maps[evc_map.name] = evc_map
+
+ def remove_evc_map(self, evc_map):
+ if self._evc_maps is not None and evc_map.name in self._evc_maps:
+ del self._evc_maps[evc_map.name]
+
+ def schedule_install(self, delay=0):
+ """
+ Try to install EVC and all MAPs in a single operational sequence.
+ The delay parameter is used during recovery to allow multiple associated
+ EVC maps to be updated/modified independently before the parent EVC
+ is installed.
+
+ :param delay: (int) Seconds to delay before install
+ """
+ self._cancel_deferred()
+
+ self._deferred = reactor.callLater(delay, self._do_install) \
+ if self._valid else succeed('Not VALID')
+
+ return self._deferred
+
+ @staticmethod
+ def _xml_header(operation=None):
+ return '<evcs xmlns="http://www.adtran.com/ns/yang/adtran-evcs"{}><evc>'.\
+ format('' if operation is None else ' xc:operation="{}"'.format(operation))
+
+ @staticmethod
+ def _xml_trailer():
+ return '</evc></evcs>'
+
+ @inlineCallbacks
+ def _do_install(self):
+ # Install the EVC if needed
+ log.debug('do-install', valid=self._valid, installed=self._installed)
+
+ if self._valid and not self._installed:
+ # TODO: Currently install EVC and then MAPs. Can do it all in a single edit-config operation
+
+ xml = EVC._xml_header()
+ xml += '<name>{}</name>'.format(self.name)
+ xml += '<enabled>{}</enabled>'.format('true' if self._enabled else 'false')
+
+ if self._ce_vlan_preservation is not None:
+                xml += '<ce-vlan-preservation>{}</ce-vlan-preservation>'.format('true' if self._ce_vlan_preservation else 'false')
+
+ if self._s_tag is not None:
+ xml += '<stag>{}</stag>'.format(self._s_tag)
+ xml += '<stag-tpid>{}</stag-tpid>'.format(self._stpid or DEFAULT_STPID)
+ else:
+                xml += '<no-stag/>'
+
+ for port in self._men_ports:
+ xml += '<men-ports>{}</men-ports>'.format(port)
+
+ # xml += EVC.Men2UniManipulation.xml(self._men_to_uni_tag_manipulation)
+ # xml += EVC.SwitchingMethod.xml(self._switching_method)
+ xml += EVC._xml_trailer()
+
+ log.debug('create-evc', name=self.name, xml=xml)
+ try:
+ # Set installed to true while request is in progress
+ self._installed = True
+ results = yield self._flow.handler.netconf_client.edit_config(xml)
+ self._installed = results.ok
+ self.status = '' if results.ok else results.error
+
+ except Exception as e:
+ log.exception('install-failed', name=self.name, e=e)
+ raise
+
+ # Install any associated EVC Maps
+
+ if self._installed:
+ for evc_map in self.evc_maps:
+ try:
+ yield evc_map.install()
+
+ except Exception as e:
+ evc_map.status = 'Exception during EVC-MAP Install: {}'.format(e.message)
+ log.exception('evc-map-install-failed', e=e)
+
+ returnValue(self._installed and self._valid)
+
+ def remove(self, remove_maps=True):
+ """
+ Remove EVC (and optional associated EVC-MAPs) from hardware
+ :param remove_maps: (boolean)
+ :return: (deferred)
+ """
+ if not self.installed:
+ return succeed('Not installed')
+
+ log.info('removing', evc=self, remove_maps=remove_maps)
+ dl = []
+
+ def _success(rpc_reply):
+ log.debug('remove-success', rpc_reply=rpc_reply)
+ self._installed = False
+
+ def _failure(results):
+ log.error('remove-failed', results=results)
+ self._installed = False
+
+ xml = EVC._xml_header('delete') + '<name>{}</name>'.format(self.name) + EVC._xml_trailer()
+ d = self._flow.handler.netconf_client.edit_config(xml)
+ d.addCallbacks(_success, _failure)
+ dl.append(d)
+
+ if remove_maps:
+ for evc_map in self.evc_maps:
+ dl.append(evc_map.remove())
+
+ return defer.gatherResults(dl, consumeErrors=True)
+
+ @inlineCallbacks
+ def delete(self, delete_maps=True):
+ """
+ Remove from hardware and delete/clean-up EVC Object
+ """
+ log.info('deleting', evc=self, delete_maps=delete_maps)
+
+ assert self._flow, 'Delete EVC must have flow reference'
+ try:
+ dl = [self.remove()]
+ self._valid = False
+
+ if delete_maps:
+ for evc_map in self.evc_maps:
+ dl.append(evc_map.delete(None)) # TODO: implement bulk-flow procedures
+
+ yield defer.gatherResults(dl, consumeErrors=True)
+
+ except Exception as e:
+ log.exception('removal', e=e)
+
+ self._evc_maps = None
+ f, self._flow = self._flow, None
+ if f is not None and f.handler is not None:
+ f.handler.remove_evc(self)
+
+ returnValue('Done')
+
+ def reflow(self, reflow_maps=True):
+ """
+ Attempt to install/re-install a flow
+ :param reflow_maps: (boolean) Flag indication if EVC-MAPs should be reflowed as well
+ :return: (deferred)
+ """
+ self._installed = False
+
+ if reflow_maps:
+ for evc_map in self.evc_maps:
+ evc_map.installed = False
+
+ return self.schedule_install()
+
+ def _decode(self):
+ """
+ Examine flow rules and extract appropriate settings for this EVC
+ """
+ if self._flow.handler.is_nni_port(self._flow.in_port):
+ self._men_ports.append(self._flow.handler.get_port_name(self._flow.in_port))
+ else:
+ self._status_message = 'EVCs with UNI ports are not supported'
+ return False # UNI Ports handled in the EVC Maps
+
+ self._s_tag = self._flow.vlan_id
+
+ if self._flow.inner_vid is not None:
+ self._switching_method = EVC.SwitchingMethod.DOUBLE_TAGGED
+
+ # For the Utility VLAN, multiple ingress ACLs (different GEMs) will need to
+ # be trapped on this EVC. Since these are usually untagged, we have to force
+ # the EVC to preserve CE VLAN tags.
+
+ if self._s_tag == self._flow.handler.utility_vlan:
+ self._ce_vlan_preservation = True
+
+ # Note: The following fields may get set when the first EVC-MAP
+ # is associated with this object. Once set, they cannot be changed to
+ # another value.
+ # self._stpid
+ # self._switching_method
+ # self._ce_vlan_preservation
+ # self._men_to_uni_tag_manipulation
+ return True
+
+ # BULK operations
+
+ @staticmethod
+ def remove_all(client, regex_=EVC_NAME_REGEX_ALL):
+ """
+ Remove all matching EVCs from hardware
+ :param client: (ncclient) NETCONF Client to use
+ :param regex_: (String) Regular expression for name matching
+ :return: (deferred)
+ """
+        # Do a 'get' on the EVC config and you should get the names
+ get_xml = """
+ <filter xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ <evcs xmlns="http://www.adtran.com/ns/yang/adtran-evcs">
+ <evc><name/></evc>
+ </evcs>
+ </filter>
+ """
+ log.debug('query', xml=get_xml, regex=regex_)
+
+ def request_failed(results, operation):
+ log.error('{}-failed'.format(operation), results=results)
+ # No further actions. Periodic poll later on will scrub any old EVCs if needed
+
+ def delete_complete(results):
+ log.debug('delete-complete', results=results)
+
+ def do_delete(rpc_reply, regexpr):
+ log.debug('query-complete', rpc_reply=rpc_reply)
+
+ if rpc_reply.ok:
+ result_dict = xmltodict.parse(rpc_reply.data_xml)
+ entries = result_dict['data']['evcs'] if 'evcs' in result_dict['data'] else {}
+
+ if 'evc' in entries:
+ p = re.compile(regexpr)
+
+ if isinstance(entries['evc'], list):
+ names = {entry['name'] for entry in entries['evc'] if 'name' in entry
+ and p.match(entry['name'])}
+ else:
+ names = set()
+ for item in entries['evc'].items():
+ if isinstance(item, tuple) and item[0] == 'name':
+ names.add(item[1])
+ break
+
+ if len(names) > 0:
+ del_xml = '<evcs xmlns="http://www.adtran.com/ns/yang/adtran-evcs"' + \
+ ' xc:operation = "delete">'
+ for name in names:
+ del_xml += '<evc>'
+ del_xml += '<name>{}</name>'.format(name)
+ del_xml += '</evc>'
+ del_xml += '</evcs>'
+ log.debug('removing', xml=del_xml)
+
+ return client.edit_config(del_xml)
+
+ return succeed('no entries')
+
+ d = client.get(get_xml)
+ d.addCallbacks(do_delete, request_failed, callbackArgs=[regex_], errbackArgs=['get'])
+ d.addCallbacks(delete_complete, request_failed, errbackArgs=['edit-config'])
+ return d
diff --git a/adapters/adtran_common/flow/evc_map.py b/adapters/adtran_common/flow/evc_map.py
new file mode 100644
index 0000000..688124a
--- /dev/null
+++ b/adapters/adtran_common/flow/evc_map.py
@@ -0,0 +1,1015 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import xmltodict
+import re
+import structlog
+from enum import Enum
+from acl import ACL
+from twisted.internet import defer, reactor
+from twisted.internet.defer import inlineCallbacks, returnValue, succeed
+from ncclient.operations.rpc import RPCError
+
+
+log = structlog.get_logger()
+
+# NOTE: For the EVC Map name, the ingress-port number is the VOLTHA port number (not the pon-id,
+#       since it covers NNI ports as well in order to handle the NNI-NNI case). For flows that
+#       cover an entire PON, the name will have the ONU ID and GEM ID appended to it upon
+#       installation, with a period as a separator.
+
+EVC_MAP_NAME_FORMAT = 'VOLTHA-{}-{}' # format(logical-ingress-port-number, flow-id)
+EVC_MAP_NAME_REGEX_ALL = 'VOLTHA-*'
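+# Illustrative example: logical port 66 and flow-id 1024 give the base name 'VOLTHA-66-1024';
+# once installed on the OLT, per-GEM instances take the form 'VOLTHA-66-1024.<pon>.<onu>.<gem>'.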
+
+
+class EVCMap(object):
+ """
+    Class to wrap EVC-MAP functionality
+ """
+ class EvcConnection(Enum):
+ NO_EVC_CONNECTION = 0
+ EVC = 1
+ DISCARD = 2
+ DEFAULT = NO_EVC_CONNECTION
+
+ @staticmethod
+ def xml(value):
+ # Note we do not have XML for 'EVC' enumeration.
+ if value is None:
+ value = EVCMap.EvcConnection.DEFAULT
+            if value == EVCMap.EvcConnection.NO_EVC_CONNECTION:
+                return '<no-evc-connection/>'
+            elif value == EVCMap.EvcConnection.DISCARD:
+                return '<discard/>'
+ raise ValueError('Invalid EvcConnection enumeration')
+
+ class PriorityOption(Enum):
+ INHERIT_PRIORITY = 0
+ EXPLICIT_PRIORITY = 1
+ DEFAULT = INHERIT_PRIORITY
+
+ @staticmethod
+ def xml(value):
+ if value is None:
+ value = EVCMap.PriorityOption.DEFAULT
+ if value == EVCMap.PriorityOption.INHERIT_PRIORITY:
+ return '<inherit-pri/>'
+ elif value == EVCMap.PriorityOption.EXPLICIT_PRIORITY:
+ return '<explicit-pri/>'
+ raise ValueError('Invalid PriorityOption enumeration')
+
+ def __init__(self, flow, evc, is_ingress_map):
+ self._handler = flow.handler # Same for all Flows attached to this EVC MAP
+ self._flows = {flow.flow_id: flow}
+ self._evc = None
+ self._new_acls = dict() # ACL Name -> ACL Object (To be installed into h/w)
+ self._existing_acls = dict() # ACL Name -> ACL Object (Already in H/w)
+ self._is_ingress_map = is_ingress_map
+ self._pon_id = None
+ self._onu_id = None # Remains None if associated with a multicast flow
+ self._installed = False
+ self._needs_update = False
+ self._status_message = None
+ self._deferred = None
+ self._name = None
+ self._enabled = True
+ self._uni_port = None
+ self._evc_connection = EVCMap.EvcConnection.DEFAULT
+ self._men_priority = EVCMap.PriorityOption.DEFAULT
+ self._men_pri = 0 # If Explicit Priority
+
+ self._c_tag = None
+ self._men_ctag_priority = EVCMap.PriorityOption.DEFAULT
+ self._men_ctag_pri = 0 # If Explicit Priority
+ self._match_ce_vlan_id = None
+ self._match_untagged = False
+ self._match_destination_mac_address = None
+ self._match_l2cp = False
+ self._match_broadcast = False
+ self._match_multicast = False
+ self._match_unicast = False
+ self._match_igmp = False
+
+        from pyvoltha.common.tech_profile.tech_profile import DEFAULT_TECH_PROFILE_TABLE_ID
+ self._tech_profile_id = DEFAULT_TECH_PROFILE_TABLE_ID
+ self._gem_ids_and_vid = None # { key -> onu-id, value -> tuple(sorted GEM Port IDs, onu_vid) }
+ self._upstream_bandwidth = None
+ self._shaper_name = None
+
+ # ACL logic
+ self._eth_type = None
+ self._ip_protocol = None
+ self._ipv4_dst = None
+ self._udp_dst = None
+ self._udp_src = None
+
+ try:
+ self._valid = self._decode(evc)
+
+ except Exception as e:
+ log.exception('decode', e=e)
+ self._valid = False
+
+ def __str__(self):
+ return "EVCMap-{}: UNI: {}, hasACL: {}".format(self._name, self._uni_port,
+ self._needs_acl_support)
+
+ @staticmethod
+ def create_ingress_map(flow, evc, dry_run=False):
+ evc_map = EVCMap(flow, evc, True)
+
+ if evc_map._valid and not dry_run:
+ evc.add_evc_map(evc_map)
+ evc_map._evc = evc
+
+ return evc_map
+
+ @staticmethod
+ def create_egress_map(flow, evc, dry_run=False):
+ evc_map = EVCMap(flow, evc, False)
+
+ if evc_map._valid and not dry_run:
+ evc.add_evc_map(evc_map)
+ evc_map._evc = evc
+
+ return evc_map
+
+ @property
+ def valid(self):
+ return self._valid
+
+ @property
+ def installed(self):
+ return self._installed
+
+ @property
+ def needs_update(self):
+ """ True if an parameter/ACL/... needs update or map needs to be reflowed after a failure"""
+ return self._needs_update
+
+ @needs_update.setter
+ def needs_update(self, value):
+ assert not value, 'needs update can only be reset' # Can only reset
+ self._needs_update = False
+
+ @property
+ def name(self):
+ return self._name
+
+ @property
+ def status(self):
+ return self._status_message
+
+ @status.setter
+ def status(self, value):
+ self._status_message = value
+
+ @property
+ def evc(self):
+ return self._evc
+
+ @property
+ def _needs_acl_support(self):
+ if self._ipv4_dst is not None: # In case MCAST downstream has ACL on it
+ return False
+
+ return self._eth_type is not None or self._ip_protocol is not None or\
+ self._udp_dst is not None or self._udp_src is not None
+
+ @property
+ def pon_id(self):
+ return self._pon_id # May be None
+
+ @property
+ def onu_id(self):
+ return self._onu_id # May be None if associated with a multicast flow
+
+ # @property
+ # def onu_ids(self):
+ # return self._gem_ids_and_vid.keys()
+
+ @property
+ def gem_ids_and_vid(self):
+ return self._gem_ids_and_vid.copy()
+
+ @staticmethod
+ def _xml_header(operation=None):
+ return '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps"{}><evc-map>'.\
+ format('' if operation is None else ' xc:operation="{}"'.format(operation))
+
+ @staticmethod
+ def _xml_trailer():
+ return '</evc-map></evc-maps>'
+
+ def get_evcmap_name(self, onu_id, gem_id):
+        return '{}.{}.{}.{}'.format(self.name, self.pon_id, onu_id, gem_id)
+
+ def _common_install_xml(self):
+ xml = '<enabled>{}</enabled>'.format('true' if self._enabled else 'false')
+ xml += '<uni>{}</uni>'.format(self._uni_port)
+
+ evc_name = self._evc.name if self._evc is not None else None
+ if evc_name is not None:
+ xml += '<evc>{}</evc>'.format(evc_name)
+ else:
+ xml += EVCMap.EvcConnection.xml(self._evc_connection)
+
+ xml += '<match-untagged>{}</match-untagged>'.format('true'
+ if self._match_untagged
+ else 'false')
+
+ # TODO: The following is not yet supported (and in some cases, not decoded)
+ # self._men_priority = EVCMap.PriorityOption.INHERIT_PRIORITY
+ # self._men_pri = 0 # If Explicit Priority
+ #
+ # self._men_ctag_priority = EVCMap.PriorityOption.INHERIT_PRIORITY
+ # self._men_ctag_pri = 0 # If Explicit Priority
+ #
+ # self._match_ce_vlan_id = None
+ # self._match_untagged = True
+ # self._match_destination_mac_address = None
+ return xml
+
+ def _ingress_install_xml(self, onu_s_gem_ids_and_vid, acl_list, create):
+ from ..onu import Onu
+
+ if len(acl_list):
+ xml = '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps"' +\
+ ' xmlns:adtn-evc-map-acl="http://www.adtran.com/ns/yang/adtran-evc-map-access-control-list">'
+ else:
+ xml = '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps">'
+
+ for onu_or_vlan_id, gem_ids_and_vid in onu_s_gem_ids_and_vid.iteritems():
+ first_gem_id = True
+ gem_ids = gem_ids_and_vid[0]
+ vid = gem_ids_and_vid[1]
+ ident = '{}.{}'.format(self._pon_id, onu_or_vlan_id) if vid is None \
+ else onu_or_vlan_id
+
+ for gem_id in gem_ids:
+ xml += '<evc-map{}>'.format('' if not create else ' xc:operation="create"')
+ xml += '<name>{}.{}.{}</name>'.format(self.name, ident, gem_id)
+ xml += '<ce-vlan-id>{}</ce-vlan-id>'.format(Onu.gem_id_to_gvid(gem_id))
+
+ # GEM-IDs are a sorted list (ascending). First gemport handles downstream traffic
+ if first_gem_id and (self._c_tag is not None or vid is not None):
+ first_gem_id = False
+ vlan = vid or self._c_tag
+ xml += '<network-ingress-filter>'
+ xml += '<men-ctag>{}</men-ctag>'.format(vlan) # Added in August 2017 model
+ xml += '</network-ingress-filter>'
+
+ if len(acl_list):
+ xml += '<adtn-evc-map-acl:access-lists>'
+ for acl in acl_list:
+ xml += ' <adtn-evc-map-acl:ingress-acl>'
+ xml += acl.evc_map_ingress_xml()
+ xml += ' </adtn-evc-map-acl:ingress-acl>'
+ xml += '</adtn-evc-map-acl:access-lists>'
+ xml += self._common_install_xml()
+ xml += '</evc-map>'
+ xml += '</evc-maps>'
+ return xml
+
+ def _egress_install_xml(self):
+ xml = EVCMap._xml_header()
+ xml += '<name>{}</name>'.format(self.name)
+ xml += self._common_install_xml()
+ xml += EVCMap._xml_trailer()
+ return xml
+
+ def _ingress_remove_acl_xml(self, onu_s_gem_ids_and_vid, acl):
+ xml = '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps"' +\
+ ' xmlns:adtn-evc-map-acl="http://www.adtran.com/ns/yang/adtran-evc-map-access-control-list">'
+ for onu_or_vlan_id, gem_ids_and_vid in onu_s_gem_ids_and_vid.iteritems():
+ first_gem_id = True
+ vid = gem_ids_and_vid[1]
+ ident = '{}.{}'.format(self._pon_id, onu_or_vlan_id) if vid is None \
+ else onu_or_vlan_id
+
+ for gem_id in gem_ids_and_vid[0]:
+ xml += '<evc-map>'
+ xml += '<name>{}.{}.{}</name>'.format(self.name, ident, gem_id)
+ xml += '<adtn-evc-map-acl:access-lists>'
+ xml += ' <adtn-evc-map-acl:ingress-acl xc:operation="delete">'
+ xml += acl.evc_map_ingress_xml()
+ xml += ' </adtn-evc-map-acl:ingress-acl>'
+ xml += '</adtn-evc-map-acl:access-lists>'
+ xml += '</evc-map>'
+ xml += '</evc-maps>'
+ return xml
+
+ @inlineCallbacks
+ def install(self):
+ def gem_ports():
+ ports = []
+ for gems_and_vids in self._gem_ids_and_vid.itervalues():
+ ports.extend(gems_and_vids[0])
+ return ports
+
+ log.debug('install-evc-map', valid=self._valid, gem_ports=gem_ports())
+
+ if self._valid and len(gem_ports()) > 0:
+ # Install ACLs first (if not yet installed)
+ work_acls = self._new_acls.copy()
+ self._new_acls = dict()
+
+ log.debug('install-evc-map-acls', install_acls=len(work_acls))
+ for acl in work_acls.itervalues():
+ try:
+ yield acl.install()
+
+ except Exception as e:
+ log.exception('acl-install-failed', name=self.name, e=e)
+ self._new_acls.update(work_acls)
+ raise
+
+ # Any user-data flows attached to this map ?
+ c_tag = None
+ for flow_id, flow in self._flows.items():
+ c_tag = flow.inner_vid or flow.vlan_id or c_tag
+
+ self._c_tag = c_tag
+
+ # Now EVC-MAP
+ if not self._installed or self._needs_update:
+ log.debug('needs-install-or-update', installed=self._installed, update=self._needs_update)
+ is_installed = self._installed
+ self._installed = True
+ try:
+ self._cancel_deferred()
+
+ log.info('upstream-bandwidth')
+ try:
+ yield self.update_upstream_flow_bandwidth()
+
+ except Exception as e:
+ log.exception('upstream-bandwidth-failed', name=self.name, e=e)
+ raise
+
+ map_xml = self._ingress_install_xml(self._gem_ids_and_vid, work_acls.values(),
+ not is_installed) \
+ if self._is_ingress_map else self._egress_install_xml()
+
+ log.debug('install', xml=map_xml, name=self.name)
+ results = yield self._handler.netconf_client.edit_config(map_xml)
+ self._installed = results.ok
+ self._needs_update = results.ok
+ self._status_message = '' if results.ok else results.error
+
+ if results.ok:
+ self._existing_acls.update(work_acls)
+ else:
+ self._new_acls.update(work_acls)
+
+ except RPCError as rpc_err:
+ if rpc_err.tag == 'data-exists': # Known race due to bulk-flow operation
+ pass
+
+ except Exception as e:
+ log.exception('evc-map-install-failed', name=self.name, e=e)
+ self._installed = is_installed
+ self._new_acls.update(work_acls)
+ raise
+
+ # Install any needed shapers
+ if self._installed:
+ try:
+ yield self.update_downstream_flow_bandwidth()
+
+ except Exception as e:
+ log.exception('shaper-install-failed', name=self.name, e=e)
+ raise
+
+ returnValue(self._installed and self._valid)
+
+ def _ingress_remove_xml(self, onus_gem_ids_and_vid):
+ xml = '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps"' + \
+ ' xc:operation="delete">'
+
+ for onu_id, gem_ids_and_vid in onus_gem_ids_and_vid.iteritems():
+ for gem_id in gem_ids_and_vid[0]:
+ xml += '<evc-map>'
+ xml += '<name>{}.{}.{}</name>'.format(self.name, onu_id, gem_id)
+ xml += '</evc-map>'
+ xml += '</evc-maps>'
+ return xml
+
+ def _egress_remove_xml(self):
+ return EVCMap._xml_header('delete') + \
+ '<name>{}</name>'.format(self.name) + EVCMap._xml_trailer()
+
+ def _remove(self):
+ if not self.installed:
+            return succeed('Not installed')
+
+ log.info('removing', evc_map=self)
+
+ def _success(rpc_reply):
+ log.debug('remove-success', rpc_reply=rpc_reply)
+ self._installed = False
+
+ def _failure(failure):
+ log.error('remove-failed', failure=failure)
+ self._installed = False
+
+ def _remove_acls(_):
+ acls, self._new_acls = self._new_acls, dict()
+ existing, self._existing_acls = self._existing_acls, dict()
+ acls.update(existing)
+
+ dl = []
+ for acl in acls.itervalues():
+ dl.append(acl.remove())
+
+ if len(dl) > 0:
+ defer.gatherResults(dl, consumeErrors=True)
+
+ def _remove_shaper(_):
+ if self._shaper_name is not None:
+ self.update_downstream_flow_bandwidth(remove=True)
+
+ map_xml = self._ingress_remove_xml(self._gem_ids_and_vid) if self._is_ingress_map \
+ else self._egress_remove_xml()
+
+ d = self._handler.netconf_client.edit_config(map_xml)
+ d.addCallbacks(_success, _failure)
+ d.addBoth(_remove_acls)
+ d.addBoth(_remove_shaper)
+ return d
+
+ @inlineCallbacks
+ def delete(self, flow):
+ """
+ Remove from hardware and delete/clean-up EVC-MAP Object
+
+ :param flow: (FlowEntry) Specific flow to remove from the MAP or None if all
+ flows should be removed
+ :return:
+ """
+ flows = [flow] if flow is not None else list(self._flows.values())
+ removing_all = len(flows) == len(self._flows)
+
+ log.debug('delete', removing_all=removing_all)
+ if not removing_all:
+ for f in flows:
+ self._remove_flow(f)
+
+ else:
+ if self._evc is not None:
+ self._evc.remove_evc_map(self)
+ self._evc = None
+
+ self._valid = False
+ self._cancel_deferred()
+ try:
+ yield self._remove()
+
+ except Exception as e:
+ log.exception('removal', e=e)
+
+ returnValue('Done')
+
+ def reflow_needed(self):
+ log.debug('reflow-needed', installed=self.installed, needs_update=self.needs_update)
+ reflow = not self.installed or self.needs_update
+
+ if not reflow:
+ pass # TODO: implement retrieve & compare of EVC Map parameters
+
+ return reflow
+
+ @staticmethod
+ def find_matching_ingress_flow(flow, upstream_flow_table):
+ """
+        Look for an existing EVC-MAP that may match this flow. Called when the upstream signature
+        for a flow does not produce a match. This can happen if an ACL flow is added and only a User
+        Data flow exists, or if only an ACL flow exists.
+
+ :param flow: (FlowEntry) flow to add
+ :param upstream_flow_table: (dict of FlowEntry) Existing upstream flows for this device,
+ including the flow we are looking to add
+ :return: (EVCMap) if appropriate one is found, else None
+ """
+ # A User Data flow will have:
+ # signature: <dev>.1.5.2.242
+ # down-sig: <dev>.1.*.2.*
+ # logical-port: 66
+ # is-acl-flow: False
+ #
+ # An ACL flow will have:
+ # signature: <dev>.1.5.[4092 or 4094].None (untagged VLAN == utility VLAN case)
+ # down-sig: <dev>.1.*.[4092 or 4094].*
+ # logical-port: 66
+ # is-acl-flow: True
+ #
+ # Reduce the upstream flow table to only those that match the ingress,
+ # and logical-ports match (and is not this flow) and have a map
+
+ log.debug('find-matching-ingress-flow', logical_port=flow.logical_port, flow=flow.output)
+ candidate_flows = [f for f in upstream_flow_table.itervalues() if
+ f.in_port == flow.in_port and
+ f.logical_port == flow.logical_port and
+ f.output == flow.output and
+ f.evc_map is not None] # This weeds out this flow
+
+ log.debug('find-matching-ingress-flow', candidate_flows=candidate_flows)
+ return candidate_flows[0].evc_map if len(candidate_flows) > 0 else None
+
+ def add_flow(self, flow, evc):
+ """
+ Add a new flow to an existing EVC-MAP. This can be called to add:
+ o an ACL flow to an existing utility EVC, or
+ o an ACL flow to an existing User Data Flow, or
+          o a User Data Flow to an existing ACL flow (and this needs the EVC updated
+            as well).
+
+        Note that the Downstream EVC provided is the one that matches this flow. If
+        this is adding an ACL to an existing User Data flow, we DO NOT want to
+        change the EVC Map's EVC.
+
+ :param flow: (FlowEntry) New flow
+ :param evc: (EVC) Matching EVC for downstream flow
+ """
+ from flow_entry import FlowEntry
+ # Create temporary EVC-MAP
+ assert flow.flow_direction in FlowEntry.upstream_flow_types, \
+ 'Only Upstream flows additions are supported at this time'
+
+ log.debug('add-flow-to-evc', flow=flow, evc=evc)
+
+ tmp_map = EVCMap.create_ingress_map(flow, evc, dry_run=True) \
+ if flow.flow_direction in FlowEntry.upstream_flow_types \
+ else EVCMap.create_egress_map(flow, evc, dry_run=True)
+
+ if tmp_map is None or not tmp_map.valid:
+ return None
+
+ self._flows[flow.flow_id] = flow
+ self._needs_update = True
+
+ # Are there ACLs to add to any existing (or empty) ACLs
+ if len(tmp_map._new_acls) > 0:
+ self._new_acls.update(tmp_map._new_acls) # New ACL flow
+ log.debug('add-acl-flows', map=str(self), new=tmp_map._new_acls)
+
+ # Look up existing EVC for this flow. If it is a service EVC for
+ # Packet In/Out, and this is a regular flow, migrate to the newer EVC
+ if self._evc.service_evc and not evc.service_evc:
+ log.info('new-evc-for-map', old=self._evc.name, new=evc.name)
+ self._evc.remove_evc_map(self)
+ evc.add_evc_map(self)
+ self._evc = evc
+
+ return self
+
+ @inlineCallbacks
+ def _remove_flow(self, flow):
+ """
+ Remove a specific flow from an EVC_MAP. This includes removing any
+ ACL entries associated with the flow and could result in moving the
+ EVC-MAP over to another EVC.
+
+ :param flow: (FlowEntry) Flow to remove
+ """
+ try:
+ del self._flows[flow.flow_id]
+
+            log.debug('remove-flow-to-evc', flow=flow)
+ # Remove any ACLs
+ acl_name = ACL.flow_to_name(flow)
+ acl = None
+
+ # if not yet installed just remove it from list
+ if acl_name in self._new_acls:
+ del self._new_acls[acl_name]
+ else:
+ acl = self._existing_acls[acl_name]
+ if acl is not None:
+ # Remove ACL from EVC-MAP entry
+
+ try:
+ map_xml = self._ingress_remove_acl_xml(self._gem_ids_and_vid, acl)
+ log.debug('remove', xml=map_xml, name=acl.name)
+ results = yield self._handler.netconf_client.edit_config(map_xml)
+ if results.ok:
+ del self._existing_acls[acl.name]
+
+ # Scan EVC to see if it needs to move back to the Utility
+ # or Untagged EVC from a user data EVC
+ if self._evc and not self._evc.service_evc and\
+ len(self._flows) > 0 and\
+ all(f.is_acl_flow for f in self._flows.itervalues()):
+
+ self._evc.remove_evc_map(self)
+ first_flow = self._flows.itervalues().next()
+ self._evc = first_flow.get_utility_evc(True)
+ self._evc.add_evc_map(self)
+ log.debug('moved-acl-flows-to-utility-evc', newevcname=self._evc.name)
+
+ self._needs_update = True
+ self._evc.schedule_install()
+
+ except Exception as e:
+ log.exception('acl-remove-from-evc', e=e)
+
+ # Remove ACL itself
+ try:
+ yield acl.remove()
+
+ except Exception as e:
+ log.exception('acl-remove', e=e)
+
+ except Exception as e:
+ log.exception('remove-failed', e=e)
+
+ @staticmethod
+ def create_evc_map_name(flow):
+ # Note: When actually installed into the OLT, the .onu_id.gem_port is
+ # appended to the name
+ return EVC_MAP_NAME_FORMAT.format(flow.logical_port, flow.flow_id)
+
+ @staticmethod
+ def decode_evc_map_name(name):
+ """
+ Reverse engineer EVC-MAP name parameters. Helpful in quick packet-in
+ processing
+
+ :param name: (str) EVC Map Name
+ :return: (dict) Logical Ingress Port, OpenFlow Flow-ID
+ """
+        items = name.split('-') if name is not None else []
+
+ # Note: When actually installed into the OLT, the .onu_id.gem_port is
+ # appended to the name
+ return {'ingress-port': items[1],
+ 'flow-id': items[2].split('.')[0]} if len(items) > 2 else dict()
+
+ @inlineCallbacks
+ def update_upstream_flow_bandwidth(self):
+ """
+ Upstream flow bandwidth comes from the flow_entry related to this EVC-MAP
+ and if no bandwidth property is found, allow full bandwidth
+ """
+        # all flows should be on the same PON
+ flow = self._flows.itervalues().next()
+ is_pon = flow.handler.is_pon_port(flow.in_port)
+
+ if self._is_ingress_map and is_pon:
+ pon_port = flow.handler.get_southbound_port(flow.in_port)
+ if pon_port is None:
+ returnValue('no PON')
+
+ session = self._handler.rest_client
+ # TODO: Refactor with tech profiles
+ tconts = None # pon_port.tconts
+ traffic_descriptors = None # pon_port.traffic_descriptors
+
+ if traffic_descriptors is None or tconts is None:
+ returnValue('no TDs on PON')
+
+ bandwidth = self._upstream_bandwidth or 10000000000
+
+ if self.pon_id is not None and self.onu_id is not None:
+ name = 'tcont-{}-{}-data'.format(self.pon_id, self.onu_id)
+ td = traffic_descriptors.get(name)
+ tcont = tconts.get(name)
+
+ if td is not None and tcont is not None:
+ alloc_id = tcont.alloc_id
+ td.maximum_bandwidth = bandwidth
+ try:
+ results = yield td.add_to_hardware(session)
+ log.debug('td-modify-results', results=results)
+
+ except Exception as _e:
+ pass
+
+ @inlineCallbacks
+ def update_downstream_flow_bandwidth(self, remove=False):
+ """
+ Downstream flow bandwidth is extracted from the related EVC flow_entry
+ bandwidth property. It is written to this EVC-MAP only if it is found
+ """
+ xml = None
+ results = None
+
+ if remove:
+ name, self._shaper_name = self._shaper_name, None
+ if name is not None:
+ xml = self._shaper_remove_xml(name)
+ else:
+ if self._evc is not None and self._evc.flow_entry is not None \
+ and self._evc.flow_entry.bandwidth is not None:
+ self._shaper_name = self._name
+ xml = self._shaper_install_xml(self._shaper_name,
+ self._evc.flow_entry.bandwidth * 1000) # kbps
+ if xml is not None:
+ try:
+ log.info('downstream-bandwidth', xml=xml, name=self.name, remove=remove)
+ results = yield self._handler.netconf_client.edit_config(xml)
+
+ except RPCError as rpc_err:
+ if rpc_err.tag == 'data-exists':
+ pass
+
+ except Exception as e:
+ log.exception('downstream-bandwidth', name=self.name, remove=remove, e=e)
+ raise
+
+ returnValue(results)
+
+ def _shaper_install_xml(self, name, bandwidth):
+ xml = '<adtn-shaper:shapers xmlns:adtn-shaper="http://www.adtran.com/ns/yang/adtran-traffic-shapers" xmlns:nc="urn:ietf:params:xml:ns:netconf:base:1.0" nc:operation="merge">'
+ for onu_id, gem_ids_and_vid in self._gem_ids_and_vid.iteritems():
+ for gem_id in gem_ids_and_vid[0]:
+ xml += ' <adtn-shaper:shaper>'
+ xml += ' <adtn-shaper:name>{}.{}.{}</adtn-shaper:name>'.format(name, onu_id, gem_id)
+ xml += ' <adtn-shaper:enabled>true</adtn-shaper:enabled>'
+ xml += ' <adtn-shaper:rate>{}</adtn-shaper:rate>'.format(bandwidth)
+ xml += ' <adtn-shaper-evc-map:evc-map xmlns:adtn-shaper-evc-map="http://www.adtran.com/ns/yang/adtran-traffic-shaper-evc-maps">{}.{}.{}</adtn-shaper-evc-map:evc-map>'.format(self.name, onu_id, gem_id)
+ xml += ' </adtn-shaper:shaper>'
+ xml += '</adtn-shaper:shapers>'
+ return xml
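+
+    # For reference, each shaper entry emitted above has roughly this shape
+    # (hypothetical values: EVC-MAP name 'NAME', ONU 0, GEM 2176, rate in kbps):
+    #
+    #   <adtn-shaper:shaper>
+    #     <adtn-shaper:name>NAME.0.2176</adtn-shaper:name>
+    #     <adtn-shaper:enabled>true</adtn-shaper:enabled>
+    #     <adtn-shaper:rate>10000</adtn-shaper:rate>
+    #     <adtn-shaper-evc-map:evc-map ...>NAME.0.2176</adtn-shaper-evc-map:evc-map>
+    #   </adtn-shaper:shaper>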
+
+ def _shaper_remove_xml(self, name):
+ xml = '<adtn-shaper:shapers xmlns:adtn-shaper="http://www.adtran.com/ns/yang/adtran-traffic-shapers" xmlns:nc="urn:ietf:params:xml:ns:netconf:base:1.0" nc:operation="delete">'
+ for onu_id, gem_ids_and_vid in self._gem_ids_and_vid.iteritems():
+ for gem_id in gem_ids_and_vid[0]:
+ xml += ' <adtn-shaper:shaper >'
+ xml += ' <adtn-shaper:name>{}.{}.{}</adtn-shaper:name>'.format(name, onu_id, gem_id)
+ xml += ' </adtn-shaper:shaper>'
+ xml += '</adtn-shaper:shapers>'
+ return xml
+
+ def _setup_tech_profiles(self):
+ # Set up the TCONT / GEM Ports for this connection (Downstream only of course)
+ # all flows should have same GEM port setup
+ flow = self._flows.itervalues().next()
+ is_pon = flow.handler.is_pon_port(flow.in_port)
+
+ if self._is_ingress_map and is_pon:
+ pon_port = flow.handler.get_southbound_port(flow.in_port)
+
+ if pon_port is None:
+ return
+
+ onu = next((onu for onu in pon_port.onus if onu.logical_port == flow.logical_port), None)
+
+ if onu is None: # TODO: Add multicast support later (self.onu_id == None)
+ return
+
+ self._pon_id = pon_port.pon_id
+ self._onu_id = onu.onu_id
+
+            # Identify or allocate TCONT and GEM Ports. If the ONU has already been informed
+            # of the GEM ports that belong to it, the tech profiles were already set up by a
+            # previous flow
+ onu_gems = onu.gem_ids(self._tech_profile_id)
+
+ if len(onu_gems) > 0:
+ self._gem_ids_and_vid[onu.onu_id] = (onu_gems, flow.vlan_id)
+ return
+
+ uni_id = self._handler.platform.uni_id_from_uni_port(flow.logical_port)
+ pon_profile = self._handler.tech_profiles[self.pon_id]
+ alloc_id = None
+
+ try:
+ (ofp_port_name, ofp_port_no) = self._handler.get_ofp_port_name(self.pon_id,
+ self.onu_id,
+ flow.logical_port)
+ if ofp_port_name is None:
+ log.error("port-name-not-found")
+ return
+
+ # Check tech profile instance already exists for derived port name
+ tech_profile = pon_profile.get_tech_profile_instance(self._tech_profile_id,
+ ofp_port_name)
+ log.debug('Get-tech-profile-instance-status',
+ tech_profile_instance=tech_profile)
+
+ if tech_profile is None:
+ # create tech profile instance
+ tech_profile = pon_profile.create_tech_profile_instance(self._tech_profile_id,
+ ofp_port_name,
+ self.pon_id)
+ if tech_profile is None:
+ raise Exception('Tech-profile-instance-creation-failed')
+ else:
+                    log.debug('Tech-profile-instance-already-exists-for-given-port-name',
+ ofp_port_name=ofp_port_name)
+
+ # upstream scheduler
+ us_scheduler = pon_profile.get_us_scheduler(tech_profile)
+
+ # downstream scheduler
+ ds_scheduler = pon_profile.get_ds_scheduler(tech_profile)
+
+ # create Tcont protobuf
+ pb_tconts = pon_profile.get_tconts(tech_profile, us_scheduler, ds_scheduler)
+
+ # create TCONTs & GEM Ports locally
+ for pb_tcont in pb_tconts:
+ from ..xpon.olt_tcont import OltTCont
+ tcont = OltTCont.create(pb_tcont,
+ self.pon_id,
+ self.onu_id,
+ self._tech_profile_id,
+ uni_id,
+ ofp_port_no)
+ if tcont is not None:
+ onu.add_tcont(tcont)
+
+ # Fetch alloc id and gemports from tech profile instance
+ alloc_id = tech_profile.us_scheduler.alloc_id
+
+ onu_gems = [gem.gemport_id for gem in tech_profile.upstream_gem_port_attribute_list]
+
+ for gem in tech_profile.upstream_gem_port_attribute_list:
+ from ..xpon.olt_gem_port import OltGemPort
+ gem_port = OltGemPort.create(self._handler,
+ gem,
+ tech_profile.us_scheduler.alloc_id,
+ self._tech_profile_id,
+ self.pon_id,
+ self.onu_id,
+ uni_id,
+ ofp_port_no)
+ if gem_port is not None:
+ onu.add_gem_port(gem_port)
+
+ self._gem_ids_and_vid = {onu.onu_id: (onu_gems, flow.vlan_id)}
+
+ # Send technology profile information to ONU
+ reactor.callLater(0, self._handler.setup_onu_tech_profile, self._pon_id,
+ self.onu_id, flow.logical_port)
+
+ except BaseException as e:
+                log.exception('tech-profile-setup', e=e)
+
+ # Update the allocated alloc_id and gem_port_id for the ONU/UNI to KV store
+ pon_intf_onu_id = (self.pon_id, self.onu_id, uni_id)
+ resource_manager = self._handler.resource_mgr.resource_managers[self.pon_id]
+
+ resource_manager.update_alloc_ids_for_onu(pon_intf_onu_id, list([alloc_id]))
+ resource_manager.update_gemport_ids_for_onu(pon_intf_onu_id, onu_gems)
+
+ self._handler.resource_mgr.update_gemports_ponport_to_onu_map_on_kv_store(onu_gems,
+ self.pon_id,
+ self.onu_id,
+ uni_id)
+
+ def _decode(self, evc):
+ from evc import EVC
+ from flow_entry import FlowEntry
+
+ # Only called from initializer, so first flow is only flow
+ flow = self._flows.itervalues().next()
+
+ self._name = EVCMap.create_evc_map_name(flow)
+
+ if evc:
+ self._evc_connection = EVCMap.EvcConnection.EVC
+ else:
+ self._status_message = 'Can only create EVC-MAP if EVC supplied'
+ return False
+
+ is_pon = flow.handler.is_pon_port(flow.in_port)
+ is_uni = flow.handler.is_uni_port(flow.in_port)
+
+ if flow.bandwidth is not None:
+ self._upstream_bandwidth = flow.bandwidth * 1000000
+
+ if is_pon or is_uni:
+ # Preserve CE VLAN tag only if utility VLAN/EVC
+ self._uni_port = flow.handler.get_port_name(flow.in_port)
+ evc.ce_vlan_preservation = evc.ce_vlan_preservation or False
+ else:
+ self._status_message = 'EVC-MAPS without UNI or PON ports are not supported'
+ return False # UNI Ports handled in the EVC Maps
+
+ # ACL logic
+ self._eth_type = flow.eth_type
+
+ if self._eth_type == FlowEntry.EtherType.IPv4:
+ self._ip_protocol = flow.ip_protocol
+ self._ipv4_dst = flow.ipv4_dst
+
+ if self._ip_protocol == FlowEntry.IpProtocol.UDP:
+ self._udp_dst = flow.udp_dst
+ self._udp_src = flow.udp_src
+
+ # If no match of VLAN this may be for untagged traffic or upstream and needs to
+ # match the gem-port vid
+
+ self._setup_tech_profiles()
+
+ # self._match_untagged = flow.vlan_id is None and flow.inner_vid is None
+ self._c_tag = flow.inner_vid or flow.vlan_id
+
+ # If a push of a single VLAN is present with a POP of the VLAN in the EVC's
+ # flow, then this is a traditional EVC flow
+
+ evc.men_to_uni_tag_manipulation = EVC.Men2UniManipulation.POP_OUT_TAG_ONLY
+ evc.switching_method = EVC.SwitchingMethod.DOUBLE_TAGGED \
+ if self._c_tag is not None else EVC.SwitchingMethod.SINGLE_TAGGED
+
+ try:
+ acl = ACL.create(flow)
+ if acl.name not in self._new_acls:
+ self._new_acls[acl.name] = acl
+
+ except Exception as e:
+ log.exception('ACL-decoding', e=e)
+ return False
+
+ return True
+
+ # Bulk operations
+
+ @staticmethod
+ def remove_all(client, regex_=EVC_MAP_NAME_REGEX_ALL):
+ """
+ Remove all matching EVC Maps from hardware
+
+ :param client: (ncclient) NETCONF Client to use
+ :param regex_: (String) Regular expression for name matching
+ :return: (deferred)
+ """
+        # Do a 'get' on the evc-map config and you should get the names
+ get_xml = """
+ <filter>
+ <evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps">
+ <evc-map>
+ <name/>
+ </evc-map>
+ </evc-maps>
+ </filter>
+ """
+ log.debug('query', xml=get_xml, regex=regex_)
+
+ def request_failed(results, operation):
+ log.error('{}-failed'.format(operation), results=results)
+ # No further actions. Periodic poll later on will scrub any old EVC-Maps if needed
+
+ def delete_complete(results):
+ log.debug('delete-complete', results=results)
+
+ def do_delete(rpc_reply, regexpr):
+ log.debug('query-complete', rpc_reply=rpc_reply)
+
+ if rpc_reply.ok:
+ result_dict = xmltodict.parse(rpc_reply.data_xml)
+ entries = result_dict['data']['evc-maps'] if 'evc-maps' in result_dict['data'] else {}
+
+ if 'evc-map' in entries:
+ p = re.compile(regexpr)
+
+ if isinstance(entries['evc-map'], list):
+ names = {entry['name'] for entry in entries['evc-map']
+ if 'name' in entry and p.match(entry['name'])}
+ else:
+ names = set()
+ for item in entries['evc-map'].items():
+ if isinstance(item, tuple) and item[0] == 'name':
+ names.add(item[1])
+ break
+
+ if len(names) > 0:
+ del_xml = '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps"' + \
+ ' xc:operation = "delete">'
+ for name in names:
+ del_xml += '<evc-map>'
+ del_xml += '<name>{}</name>'.format(name)
+ del_xml += '</evc-map>'
+ del_xml += '</evc-maps>'
+ log.debug('removing', xml=del_xml)
+
+ return client.edit_config(del_xml)
+
+ return succeed('no entries')
+
+ d = client.get(get_xml)
+ d.addCallbacks(do_delete, request_failed, callbackArgs=[regex_], errbackArgs=['get'])
+ d.addCallbacks(delete_complete, request_failed, errbackArgs=['edit-config'])
+ return d
+
+ def _cancel_deferred(self):
+ d, self._deferred = self._deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
diff --git a/adapters/adtran_common/flow/flow_entry.py b/adapters/adtran_common/flow/flow_entry.py
new file mode 100644
index 0000000..cb8dd4a
--- /dev/null
+++ b/adapters/adtran_common/flow/flow_entry.py
@@ -0,0 +1,821 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import structlog
+from evc import EVC
+from evc_map import EVCMap
+from enum import IntEnum
+from utility_evc import UtilityEVC
+import pyvoltha.common.openflow.utils as fd
+from pyvoltha.protos.openflow_13_pb2 import OFPP_MAX, OFPP_CONTROLLER, OFPVID_PRESENT, OFPXMC_OPENFLOW_BASIC
+from twisted.internet.defer import returnValue, inlineCallbacks, gatherResults, succeed
+
+log = structlog.get_logger()
+
+# IP Protocol numbers
+_supported_ip_protocols = [
+ 1, # ICMP
+ 2, # IGMP
+ 6, # TCP
+ 17, # UDP
+]
+
+
+class FlowEntry(object):
+ """
+ Provide a class that wraps the flow rule and also provides state/status for a FlowEntry.
+
+    When a new flow is sent, it is first decoded to check for any potential errors. If none are
+    found, the entry is created and analyzed to see if it can be combined with any other flows
+    to create or modify an existing EVC.
+
+ Note: Since only E-LINE is supported, modification of an existing EVC is not performed.
+ """
+ class PortType(IntEnum):
+ NNI = 0 # NNI Port
+ UNI = 1 # UNI Port
+ PON = 2 # PON Port (all UNIs on PON)
+ CONTROLLER = 3 # Controller port (packet in/out)
+
+ class FlowDirection(IntEnum):
+ UPSTREAM = 0 # UNI port to NNI Port
+ DOWNSTREAM = 1 # NNI port to UNI Port
+ CONTROLLER_UNI = 2 # Trap packet on UNI and send to controller
+ NNI_PON = 3 # NNI port to PON Port (all UNIs) - Utility VLAN & multicast
+
+ # The following are not yet supported
+ CONTROLLER_NNI = 4 # Trap packet on NNI and send to controller
+ CONTROLLER_PON = 5 # Trap packet on all UNIs of a PON and send to controller
+ NNI_NNI = 6 # NNI port to NNI Port
+ UNI_UNI = 7 # UNI port to UNI Port
+ OTHER = 9 # Unable to determine
+
+ upstream_flow_types = {FlowDirection.UPSTREAM, FlowDirection.CONTROLLER_UNI}
+ downstream_flow_types = {FlowDirection.DOWNSTREAM, FlowDirection.NNI_PON}
+
+ LEGACY_CONTROL_VLAN = 4000
+
+ # Well known EtherTypes
+ class EtherType(IntEnum):
+ EAPOL = 0x888E
+ IPv4 = 0x0800
+ IPv6 = 0x86DD
+ ARP = 0x0806
+ LLDP = 0x88CC
+
+ # Well known IP Protocols
+ class IpProtocol(IntEnum):
+ IGMP = 2
+ UDP = 17
+
+ def __init__(self, flow, handler):
+ self._flow = flow
+ self._handler = handler
+ self.flow_id = flow.id
+ self.evc = None # EVC this flow is part of
+ self.evc_map = None # EVC-MAP this flow is part of
+ self._flow_direction = FlowEntry.FlowDirection.OTHER
+ self._logical_port = None # Currently ONU VID is logical port if not doing xPON
+ self._is_multicast = False
+ self._is_acl_flow = False
+ self._bandwidth = None
+
+ # A value used to locate possible related flow entries
+ self.signature = None
+ self.downstream_signature = None # Valid for upstream EVC-MAP Flows
+
+ # Selection properties
+ self.in_port = None
+ self.vlan_id = None
+ self.pcp = None
+ self.eth_type = None
+ self.ip_protocol = None
+ self.ipv4_dst = None
+ self.udp_dst = None # UDP Port #
+ self.udp_src = None # UDP Port #
+ self.inner_vid = None
+
+ # Actions
+ self.output = None
+ self.pop_vlan = False
+ self.push_vlan_tpid = None
+ self.push_vlan_id = None
+
+ self._name = self.create_flow_name()
+
+ def __str__(self):
+ return 'flow_entry: {}, in: {}, out: {}, vid: {}, inner:{}, eth: {}, IP: {}'.format(
+ self.name, self.in_port, self.output, self.vlan_id, self.inner_vid,
+ self.eth_type, self.ip_protocol)
+
+ def __repr__(self):
+ return str(self)
+
+ @property
+ def name(self):
+ return self._name # TODO: Is a name really needed in production?
+
+ def create_flow_name(self):
+ return 'flow-{}-{}'.format(self.device_id, self.flow_id)
+
+ @property
+ def flow(self):
+ return self._flow
+
+ @property
+ def handler(self):
+ return self._handler
+
+ @property
+ def device_id(self):
+ return self.handler.device_id
+
+ @property
+ def bandwidth(self):
+ """ Bandwidth in Mbps (if any) """
+ return self._bandwidth
+
+ @property
+ def flow_direction(self):
+ return self._flow_direction
+
+ @property
+ def is_multicast_flow(self):
+ return self._is_multicast
+
+ @property
+ def is_acl_flow(self):
+ return self._is_acl_flow or self._needs_acl_support
+
+ @property
+ def logical_port(self):
+ return self._logical_port # NNI or UNI Logical Port
+
+ @staticmethod
+ def create(flow, handler):
+ """
+        Create the appropriate FlowEntry wrapper for the flow. This method returns two
+        results.
+
+        The first result is the flow entry that was created. This could be a match to an
+        existing flow since it is a bulk update. None is returned only if no match to
+        an existing entry is found and the decode failed (unsupported field).
+
+        The second result is the EVC this flow should be added to. This could be an
+        existing EVC (so you are adding another EVC-MAP) or a brand-new EVC (no existing
+        EVC-MAPs). None is returned if there is no valid EVC that can be created yet.
+
+ :param flow: (Flow) Flow entry passed to VOLTHA adapter
+ :param handler: (AdtranDeviceHandler) handler for the device
+ :return: (FlowEntry, EVC)
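+
+        A minimal usage sketch (caller wiring assumed, not shown in this module):
+
+            flow_entry, evc = FlowEntry.create(flow, handler)
+            if evc is not None and evc.valid:
+                evc.schedule_install()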
+ """
+ # Exit early if it already exists
+ try:
+ flow_entry = FlowEntry(flow, handler)
+
+ ######################################################################
+ # Decode the flow entry
+ if not flow_entry._decode(flow):
+ # TODO: When we support individual flow mods, we will need to return
+ # this flow back always
+ return None, None
+
+ ######################################################################
+ # Initialize flow_entry database (dicts) if needed and determine if
+ # the flows have already been handled.
+ downstream_sig_table = handler.downstream_flows
+ upstream_flow_table = handler.upstream_flows
+
+ log.debug('flow-entry-decoded', flow=flow_entry, signature=flow_entry.signature,
+ downstream_signature=flow_entry.downstream_signature)
+
+ if flow_entry.flow_direction in FlowEntry.upstream_flow_types and\
+ flow_entry.flow_id in upstream_flow_table:
+ log.debug('flow-entry-upstream-exists', flow=flow_entry)
+ return flow_entry, None
+
+ if flow_entry.flow_direction in FlowEntry.downstream_flow_types:
+ sig_table = downstream_sig_table.get(flow_entry.signature)
+ if sig_table is not None and flow_entry in sig_table.flows:
+ log.debug('flow-entry-downstream-exists', flow=flow_entry)
+ return flow_entry, None
+
+ ######################################################################
+ # Look for any matching flows in the other direction that might help
+ # make an EVC and then save it off in the device specific flow table
+ #
+ # TODO: For now, only support for E-LINE services between NNI and UNI
+ downstream_flow = None
+ upstream_flows = None
+ downstream_sig = None
+
+ if flow_entry._is_multicast: # Uni-directional flow
+ assert flow_entry._flow_direction in FlowEntry.downstream_flow_types, \
+ 'Only downstream Multicast supported'
+ downstream_flow = flow_entry
+ downstream_sig = flow_entry.signature
+ upstream_flows = []
+
+ elif flow_entry.flow_direction in FlowEntry.downstream_flow_types:
+ downstream_flow = flow_entry
+ downstream_sig = flow_entry.signature
+
+ elif flow_entry.flow_direction in FlowEntry.upstream_flow_types:
+ downstream_sig = flow_entry.downstream_signature
+
+ if downstream_sig is None:
+ # TODO: When we support individual flow mods, we will need to return
+ # this flow back always
+ log.debug('flow-entry-empty-downstream', flow=flow_entry)
+ return None, None
+
+ # Make sure a slot exists for the downstream signature and get its flow table
+ downstream_sig_table = downstream_sig_table.add(downstream_sig)
+ evc = downstream_sig_table.evc
+
+ # Save the new flow_entry to proper flow table
+ if flow_entry.flow_direction in FlowEntry.upstream_flow_types:
+ upstream_flow_table.add(flow_entry)
+ downstream_flow = evc.flow_entry if evc is not None else \
+ next((_flow for _flow in downstream_sig_table.flows.itervalues()
+ if isinstance(_flow, FlowEntry)), None)
+
+ elif flow_entry.flow_direction in FlowEntry.downstream_flow_types:
+ downstream_sig_table.flows.add(flow_entry)
+
+ # Now find all the upstream flows
+ if downstream_flow is not None:
+ upstream_flows = [_flow for _flow in upstream_flow_table.itervalues()
+ if _flow.downstream_signature == downstream_flow.signature]
+ if len(upstream_flows) == 0 and not downstream_flow.is_multicast_flow:
+ upstream_flows = None
+
+ log.debug('flow-entry-search-results', flow=flow_entry,
+ downstream_flow=downstream_flow, upstream_flows=upstream_flows)
+
+ ######################################################################
+            # Compute EVC and its maps
+ evc = FlowEntry._create_evc_and_maps(evc, downstream_flow, upstream_flows)
+
+ # Save off EVC (if we have one) for this flow if it is new
+ if evc is not None and evc.valid and downstream_sig_table.evc is None:
+ downstream_sig_table.evc = evc
+
+ return flow_entry, evc
+
+ except Exception as e:
+ log.exception('flow-entry-processing', e=e)
+ return None, None
+
+ @staticmethod
+ def _create_evc_and_maps(evc, downstream_flow, upstream_flows):
+ """
+ Give a set of flows, find (or create) the EVC and any needed EVC-MAPs
+
+ :param evc: (EVC) Existing EVC for downstream flow. May be null if not created
+ :param downstream_flow: (FlowEntry) NNI -> UNI flow (provides much of the EVC values)
+ :param upstream_flows: (list of FlowEntry) UNI -> NNI flows (provides much of the EVC-MAP values)
+
+ :return: EVC object
+ """
+ log.debug('flow-evc-and-maps', downstream_flow=downstream_flow,
+ upstream_flows=upstream_flows)
+
+ if (evc is None and downstream_flow is None) or upstream_flows is None:
+ return None
+
+ # Get any existing EVC if a flow is already created
+ if downstream_flow.evc is None:
+ if evc is not None:
+ downstream_flow.evc = evc
+
+ elif downstream_flow.is_multicast_flow:
+ from mcast import MCastEVC
+ downstream_flow.evc = MCastEVC.create(downstream_flow)
+
+ elif downstream_flow.is_acl_flow:
+ downstream_flow.evc = downstream_flow.get_utility_evc()
+ else:
+ downstream_flow.evc = EVC(downstream_flow)
+
+ if not downstream_flow.evc.valid:
+ log.debug('flow-evc-and-maps-downstream-invalid',
+ downstream_flow=downstream_flow,
+ upstream_flows=upstream_flows)
+ return None
+
+        # Create EVC-MAPs. Note that upstream_flows is an empty list for multicast.
+        # For Packet In/Out support, the upstream flows will have matching
+        # signatures, so the first one to get created should create the EVC and,
+        # if it needs an ACL, do so then. The second one should just reference
+        # the first map.
+        #
+        # If the second one has an ACL, then it should add it to the map.
+        # TODO: What to do if the second (or third, ...) is the data one.
+        #       What should it do then?
+ sig_map_map = {f.signature: f.evc_map for f in upstream_flows
+ if f.evc_map is not None}
+
+ for flow in upstream_flows:
+ if flow.evc_map is None:
+ if flow.signature in sig_map_map:
+ # Found an explicitly matching existing EVC-MAP. Add flow to this EVC-MAP
+ flow.evc_map = sig_map_map[flow.signature].add_flow(flow, downstream_flow.evc)
+ else:
+ # May need to create a MAP or search for an existing ACL/user EVC-Map
+ # upstream_flow_table = _existing_upstream_flow_entries[flow.device_id]
+ upstream_flow_table = flow.handler.upstream_flows
+ existing_flow = EVCMap.find_matching_ingress_flow(flow, upstream_flow_table)
+
+ if existing_flow is None:
+ flow.evc_map = EVCMap.create_ingress_map(flow, downstream_flow.evc)
+ else:
+ flow.evc_map = existing_flow.add_flow(flow, downstream_flow.evc)
+
+ all_maps_valid = all(flow.evc_map.valid for flow in upstream_flows) \
+ or downstream_flow.is_multicast_flow
+
+ log.debug('flow-evc-and-maps-downstream',
+ downstream_flow=downstream_flow,
+ upstream_flows=upstream_flows, all_valid=all_maps_valid)
+
+ return downstream_flow.evc if all_maps_valid else None
+
+ def get_utility_evc(self, use_default_vlan_id=False):
+ assert self.is_acl_flow, 'Utility evcs are for acl flows only'
+ return UtilityEVC.create(self, use_default_vlan_id)
+
+ @property
+ def _needs_acl_support(self):
+ if self.ipv4_dst is not None: # In case MCAST downstream has ACL on it
+ return False
+
+ return self.eth_type is not None or self.ip_protocol is not None or\
+ self.ipv4_dst is not None or self.udp_dst is not None or self.udp_src is not None
+
+ def _decode(self, flow):
+ """
+ Examine flow rules and extract appropriate settings
+ """
+ log.debug('start-decode')
+ status = self._decode_traffic_selector(flow) and self._decode_traffic_treatment(flow)
+
+ # Determine direction of the flow and apply appropriate modifications
+ # to the decoded flows
+ if status:
+ if not self._decode_flow_direction():
+ return False
+
+ if self._flow_direction in FlowEntry.downstream_flow_types:
+ status = self._apply_downstream_mods()
+
+ elif self._flow_direction in FlowEntry.upstream_flow_types:
+ status = self._apply_upstream_mods()
+
+ else:
+ # TODO: Need to code this - Perhaps this is an NNI_PON for Multicast support?
+ log.error('unsupported-flow-direction')
+ status = False
+
+ log.debug('flow-evc-decode', direction=self._flow_direction, is_acl=self._is_acl_flow,
+ inner_vid=self.inner_vid, vlan_id=self.vlan_id, pop_vlan=self.pop_vlan,
+ push_vid=self.push_vlan_id, status=status)
+
+ # Create a signature that will help locate related flow entries on a device.
+ if status:
+ # These are not exact, just ones that may be put together to make an EVC. The
+ # basic rules are:
+ #
+ # 1 - Port numbers in increasing order
+ ports = [self.in_port, self.output]
+ ports.sort()
+ assert len(ports) == 2, 'Invalid port count: {}'.format(len(ports))
+
+            # 2 - The outer VID
+            # 3 - The inner VID. Wildcard if downstream
+ if self.push_vlan_id is None:
+ outer = self.vlan_id
+ inner = self.inner_vid
+ else:
+ outer = self.push_vlan_id
+ inner = self.vlan_id
+
+ upstream_sig = '{}'.format(ports[0])
+ downstream_sig = '{}'.format(ports[0])
+ upstream_sig += '.{}'.format(ports[1])
+ downstream_sig += '.{}'.format(ports[1] if self.handler.is_nni_port(ports[1]) else '*')
+
+ upstream_sig += '.{}.{}'.format(outer, inner)
+ downstream_sig += '.{}.*'.format(outer)
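+
+            # Example with hypothetical values: an upstream flow entering UNI port 100,
+            # exiting NNI port 1000, matching C-tag 20 and pushing S-tag 1000 yields
+            # upstream_sig '100.1000.1000.20' and downstream_sig '100.1000.1000.*'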
+
+ if self._flow_direction in FlowEntry.downstream_flow_types:
+ self.signature = downstream_sig
+
+ elif self._flow_direction in FlowEntry.upstream_flow_types:
+ self.signature = upstream_sig
+ self.downstream_signature = downstream_sig
+
+ else:
+ log.error('unsupported-flow')
+ status = False
+
+ log.debug('flow-evc-decode', upstream_sig=self.signature, downstream_sig=self.downstream_signature)
+ return status
+
+ def _decode_traffic_selector(self, flow):
+ """
+ Extract EVC related traffic selection settings
+ """
+ self.in_port = fd.get_in_port(flow)
+
+ if self.in_port > OFPP_MAX:
+ log.warn('logical-input-ports-not-supported', in_port=self.in_port)
+ return False
+
+ for field in fd.get_ofb_fields(flow):
+ if field.type == fd.IN_PORT:
+ if self._handler.is_nni_port(self.in_port) or self._handler.is_uni_port(self.in_port):
+ self._logical_port = self.in_port
+
+ elif field.type == fd.VLAN_VID:
+ if field.vlan_vid >= OFPVID_PRESENT + 4095:
+ self.vlan_id = None # pre-ONOS v1.13.5 or old EAPOL Rule
+ else:
+ self.vlan_id = field.vlan_vid & 0xfff
+
+ log.debug('*** field.type == VLAN_VID', value=field.vlan_vid, vlan_id=self.vlan_id)
+
+ elif field.type == fd.VLAN_PCP:
+ log.debug('*** field.type == VLAN_PCP', value=field.vlan_pcp)
+ self.pcp = field.vlan_pcp
+
+ elif field.type == fd.ETH_TYPE:
+ log.debug('*** field.type == ETH_TYPE', value=field.eth_type)
+ self.eth_type = field.eth_type
+
+ elif field.type == fd.IP_PROTO:
+ log.debug('*** field.type == IP_PROTO', value=field.ip_proto)
+ self.ip_protocol = field.ip_proto
+
+ if self.ip_protocol not in _supported_ip_protocols:
+ log.error('Unsupported IP Protocol', protocol=self.ip_protocol)
+ return False
+
+ elif field.type == fd.IPV4_DST:
+ log.debug('*** field.type == IPV4_DST', value=field.ipv4_dst)
+ self.ipv4_dst = field.ipv4_dst
+
+ elif field.type == fd.UDP_DST:
+ log.debug('*** field.type == UDP_DST', value=field.udp_dst)
+ self.udp_dst = field.udp_dst
+
+ elif field.type == fd.UDP_SRC:
+ log.debug('*** field.type == UDP_SRC', value=field.udp_src)
+ self.udp_src = field.udp_src
+
+ elif field.type == fd.METADATA:
+ if self._handler.is_nni_port(self.in_port):
+ # Downstream flow
+ log.debug('*** field.type == METADATA', value=field.table_metadata)
+
+ if field.table_metadata > 4095:
+ # ONOS v1.13.5 or later. c-vid in upper 32-bits
+ vid = field.table_metadata & 0x0FFF
+ if vid > 0:
+ self.inner_vid = vid # CTag is never '0'
+
+ elif field.table_metadata > 0:
+ # Pre-ONOS v1.13.5 (vid without the 4096 offset)
+ self.inner_vid = field.table_metadata
+
+ else:
+ # Upstream flow
+ pass # Not used upstream at this time
+
+ log.debug('*** field.type == METADATA', value=field.table_metadata,
+ inner_vid=self.inner_vid)
+ else:
+ log.warn('unsupported-selection-field', type=field.type)
+ self._status_message = 'Unsupported field.type={}'.format(field.type)
+ return False
+
+ return True
+
+ def _decode_traffic_treatment(self, flow):
+ # Loop through traffic treatment
+ for act in fd.get_actions(flow):
+ if act.type == fd.OUTPUT:
+ self.output = act.output.port
+
+ elif act.type == fd.POP_VLAN:
+ log.debug('*** action.type == POP_VLAN')
+ self.pop_vlan = True
+
+ elif act.type == fd.PUSH_VLAN:
+ log.debug('*** action.type == PUSH_VLAN', value=act.push)
+ tpid = act.push.ethertype
+ self.push_vlan_tpid = tpid
+
+ elif act.type == fd.SET_FIELD:
+ log.debug('*** action.type == SET_FIELD', value=act.set_field.field)
+ assert (act.set_field.field.oxm_class == OFPXMC_OPENFLOW_BASIC)
+ field = act.set_field.field.ofb_field
+
+ if field.type == fd.VLAN_VID:
+ self.push_vlan_id = field.vlan_vid & 0xfff
+ else:
+ log.debug('unsupported-set-field')
+ else:
+ log.warn('unsupported-action', action=act)
+ self._status_message = 'Unsupported action.type={}'.format(act.type)
+ return False
+
+ return True
+
+ def _decode_flow_direction(self):
+ # Determine direction of the flow
+ def port_type(port_number):
+ if port_number in self._handler.northbound_ports:
+ return FlowEntry.PortType.NNI
+
+ elif port_number in self._handler.southbound_ports:
+ return FlowEntry.PortType.PON
+
+ elif port_number <= OFPP_MAX:
+ return FlowEntry.PortType.UNI
+
+ elif port_number in {OFPP_CONTROLLER, 0xFFFFFFFD}: # OFPP_CONTROLLER is wrong in proto-file
+ return FlowEntry.PortType.CONTROLLER
+
+ return FlowEntry.PortType.OTHER
+
+ flow_dir_map = {
+ (FlowEntry.PortType.UNI, FlowEntry.PortType.NNI): FlowEntry.FlowDirection.UPSTREAM,
+ (FlowEntry.PortType.NNI, FlowEntry.PortType.UNI): FlowEntry.FlowDirection.DOWNSTREAM,
+ (FlowEntry.PortType.UNI, FlowEntry.PortType.CONTROLLER): FlowEntry.FlowDirection.CONTROLLER_UNI,
+ (FlowEntry.PortType.NNI, FlowEntry.PortType.PON): FlowEntry.FlowDirection.NNI_PON,
+ # The following are not yet supported
+ # (FlowEntry.PortType.NNI, FlowEntry.PortType.CONTROLLER): FlowEntry.FlowDirection.CONTROLLER_NNI,
+ # (FlowEntry.PortType.PON, FlowEntry.PortType.CONTROLLER): FlowEntry.FlowDirection.CONTROLLER_PON,
+ # (FlowEntry.PortType.NNI, FlowEntry.PortType.NNI): FlowEntry.FlowDirection.NNI_NNI,
+ # (FlowEntry.PortType.UNI, FlowEntry.PortType.UNI): FlowEntry.FlowDirection.UNI_UNI,
+ }
+ self._flow_direction = flow_dir_map.get((port_type(self.in_port), port_type(self.output)),
+ FlowEntry.FlowDirection.OTHER)
+ return self._flow_direction != FlowEntry.FlowDirection.OTHER
+
+ def _apply_downstream_mods(self):
+ # This is a downstream flow. It could be any one of the following:
+ #
+ # Legacy control VLAN:
+ # This is the old VLAN 4000 that was used to attach EAPOL and other
+ # controller flows to. Eventually these will change to CONTROLLER_UNI
+        #     flows. For these, use the 'utility' VLAN instead so 4000 is available
+ # for other uses (AT&T uses it for downstream multicast video).
+ #
+ # Multicast VLAN:
+ # This is downstream multicast data.
+ # TODO: Test this to see if this needs to be in a separate NNI_PON mod-method
+ #
+ # User Data flow:
+ # This is for user data. Eventually we may need to support ACLs?
+ #
+        # May be a downstream to-controller flow (no ethType)
+ if self.vlan_id == FlowEntry.LEGACY_CONTROL_VLAN and self.eth_type is None and self.pcp == 0:
+ return False # Do not install this flow. Utility VLAN is in charge
+
+ elif self.flow_direction == FlowEntry.FlowDirection.NNI_PON and \
+ self.vlan_id == self.handler.utility_vlan:
+ # Utility VLAN downstream flow/EVC
+ self._is_acl_flow = True
+
+ elif self.vlan_id in self._handler.multicast_vlans:
+ # multicast (ethType = IP) # TODO: May need to be an NNI_PON flow
+ self._is_multicast = True
+ self._is_acl_flow = True
+
+ else:
+ # Currently do not support ACLs on user data flows downstream
+ assert not self._needs_acl_support # User data, no special modifications needed at this time
+
+ return True
+
+ def _apply_upstream_mods(self):
+ #
+ # This is an upstream flow. It could be any of the following
+ #
+ # ACL/Packet capture:
+ # This is either a legacy (FlowDirection.UPSTREAM) or a new one
+ # that specifies an output port of controller (FlowDirection.CONTROLLER_UNI).
+ # Either way, these need to be placed on the Utility VLAN if the ONU attached
+ # does not have a user-data flow (C-Tag). If there is a C-Tag available,
+ # then place it on that VLAN.
+ #
+ # Once a user-data flow is established, move any of the ONUs ACL flows
+ # over to that VLAN (this is handled elsewhere).
+ #
+ # User Data flows:
+ # No special modifications are needed
+ #
+ try:
+ # Do not handle PON level ACLs in this method
+ assert(self._flow_direction != FlowEntry.FlowDirection.CONTROLLER_PON)
+
+ # Is this a legacy (VLAN 4000) upstream to-controller flow
+ if self._needs_acl_support and FlowEntry.LEGACY_CONTROL_VLAN == self.push_vlan_id:
+ self._flow_direction = FlowEntry.FlowDirection.CONTROLLER_UNI
+ self._is_acl_flow = True
+ self.push_vlan_id = self.handler.utility_vlan
+
+ return True
+
+ except Exception as e:
+ # TODO: Need to support flow retry if the ONU is not yet activated !!!!
+ log.exception('tag-fixup', e=e)
+ return False
+
+ @staticmethod
+ def drop_missing_flows(handler, valid_flow_ids):
+ dl = []
+ try:
+ flow_table = handler.upstream_flows
+ flows_to_drop = [flow for flow_id, flow in flow_table.items()
+ if flow_id not in valid_flow_ids]
+ dl.extend([flow.remove() for flow in flows_to_drop])
+
+ for sig_table in handler.downstream_flows.itervalues():
+ flows_to_drop = [flow for flow_id, flow in sig_table.flows.items()
+ if isinstance(flow, FlowEntry) and flow_id not in valid_flow_ids]
+ dl.extend([flow.remove() for flow in flows_to_drop])
+
+ except Exception as _e:
+ pass
+
+        return gatherResults(dl, consumeErrors=True) if len(dl) > 0 else succeed('no-flows-to-drop')
+
+ @inlineCallbacks
+ def remove(self):
+ """
+ Remove this flow entry from the list of existing entries and drop EVC
+ if needed
+ """
+        # Remove from the existing table list
+ flow_id = self.flow_id
+ flow_table = None
+
+ if self.flow_direction in FlowEntry.upstream_flow_types:
+ flow_table = self._handler.upstream_flows
+
+ elif self.flow_direction in FlowEntry.downstream_flow_types:
+ sig_table = self._handler.downstream_flows.get(self.signature)
+ flow_table = sig_table.flows if sig_table is not None else None
+
+ if flow_table is None or flow_id not in flow_table.keys():
+ returnValue('NOP')
+
+ # Remove from flow table and clean up flow table if empty
+ flow_table.remove(flow_id)
+ evc_map, self.evc_map = self.evc_map, None
+ evc = None
+
+ if self.flow_direction in FlowEntry.downstream_flow_types:
+ sig_table = self._handler.downstream_flows.get(self.signature)
+ if len(flow_table) == 0: # Only 'evc' entry present
+ evc = sig_table.evc
+ else:
+ assert sig_table.evc is not None, 'EVC flow re-assignment error'
+
+ # Remove flow from the hardware
+ try:
+ dl = []
+ if evc_map is not None:
+ dl.append(evc_map.delete(self))
+
+ if evc is not None:
+ dl.append(evc.delete())
+
+ yield gatherResults(dl, consumeErrors=True)
+
+ except Exception as e:
+ log.exception('removal', e=e)
+
+ if self.flow_direction in FlowEntry.downstream_flow_types:
+ # If this flow owns the EVC, assign it to a remaining flow
+ sig_table = self._handler.downstream_flows.get(self.signature)
+ flow_evc = sig_table.evc
+
+ if flow_evc is not None and flow_evc.flow_entry is not None and flow_id == flow_evc.flow_entry.flow_id:
+ flow_evc.flow_entry = next((_flow for _flow in flow_table.itervalues()
+ if isinstance(_flow, FlowEntry)
+ and _flow.flow_id != flow_id), None)
+
+        # If the EVC was deleted, remove the signature table since no flows exist with
+        # that signature any more
+ if evc is not None:
+ self._handler.downstream_flows.remove(self.signature)
+
+ self.evc = None
+ returnValue('Done')
+
+ @staticmethod
+ def find_evc_map_flows(onu):
+ """
+ For a given OLT, find all the EVC Maps for a specific ONU
+ :param onu: (Onu) onu
+ :return: (list) of matching flows
+ """
+ # EVCs are only in the downstream table, EVC Maps are in upstream
+ onu_ports = onu.uni_ports
+
+ all_flow_entries = onu.olt.upstream_flows
+ evc_maps = [flow_entry.evc_map for flow_entry in all_flow_entries.itervalues()
+ if flow_entry.in_port in onu_ports
+ and flow_entry.evc_map is not None
+ and flow_entry.evc_map.valid]
+
+ return evc_maps
+
+ @staticmethod
+ def sync_flows_by_onu(onu, reflow=False):
+ """
+ Check status of all flows on a per-ONU basis. Called when values
+ within the ONU are modified that may affect traffic.
+
+ :param onu: (Onu) ONU to examine
+ :param reflow: (boolean) Flag, if True, requests that the flow be sent to
+ hardware even if the values in hardware are
+ consistent with the current flow settings
+ """
+ evc_maps = FlowEntry.find_evc_map_flows(onu)
+ evcs = {}
+
+ for evc_map in evc_maps:
+ if reflow or evc_map.reflow_needed():
+ evc_map.needs_update = False
+
+ if not evc_map.installed:
+ evc = evc_map.evc
+ if evc is not None:
+ evcs[evc.name] = evc
+
+ for evc in evcs.itervalues():
+ evc.installed = False
+ evc.schedule_install(delay=2)
+
+ ######################################################
+ # Bulk operations
+
+ @staticmethod
+ def clear_all(handler):
+ """
+ Remove all flows for the device.
+
+ :param handler: voltha adapter device handler
+ """
+ handler.downstream_flows.clear_all()
+ handler.upstream_flows.clear_all()
+
+ @staticmethod
+ def get_packetout_info(handler, logical_port):
+ """
+ Find parameters needed to send packet out successfully to the OLT.
+
+ :param handler: voltha adapter device handler
+ :param logical_port: (int) logical port number for packet to go out.
+
+ :return: physical port number, ctag, stag, evcmap name
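+
+        A minimal usage sketch (packet-out handling itself lives elsewhere):
+
+            port, ctag, stag, name = FlowEntry.get_packetout_info(handler, logical_port)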
+ """
+ from adapters.adtran_olt.onu import Onu
+
+ for flow_entry in handler.upstream_flows.itervalues():
+ log.debug('get-packetout-info', flow_entry=flow_entry)
+
+ # match logical port
+ if flow_entry.evc_map is not None and flow_entry.evc_map.valid and \
+ flow_entry.logical_port == logical_port:
+ evc_map = flow_entry.evc_map
+ gem_ids_and_vid = evc_map.gem_ids_and_vid
+
+ # must have valid gem id
+ if len(gem_ids_and_vid) > 0:
+ for onu_id, gem_ids_with_vid in gem_ids_and_vid.iteritems():
+ log.debug('get-packetout-info', onu_id=onu_id,
+ gem_ids_with_vid=gem_ids_with_vid)
+ if len(gem_ids_with_vid) > 0:
+ gem_ids = gem_ids_with_vid[0]
+ ctag = gem_ids_with_vid[1]
+ gem_id = gem_ids[0] # TODO: always grab first in list
+ return flow_entry.in_port, ctag, Onu.gem_id_to_gvid(gem_id), \
+ evc_map.get_evcmap_name(onu_id, gem_id)
+ return None, None, None, None
diff --git a/adapters/adtran_common/flow/flow_tables.py b/adapters/adtran_common/flow/flow_tables.py
new file mode 100644
index 0000000..48e2e7e
--- /dev/null
+++ b/adapters/adtran_common/flow/flow_tables.py
@@ -0,0 +1,163 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from flow_entry import FlowEntry
+from evc import EVC
+
+
+class DeviceFlows(object):
+ """ Tracks existing flows on the device """
+
+ def __init__(self):
+ self._flow_table = dict() # Key = (str)Flow ID, Value = FlowEntry
+
+ def __getitem__(self, item):
+ flow_id = item.flow_id if isinstance(item, FlowEntry) else item
+ return self._flow_table[flow_id]
+
+ def __iter__(self):
+ for _flow_id, _flow in self._flow_table.items():
+ yield _flow_id, _flow
+
+ def itervalues(self):
+ for _flow in self._flow_table.values():
+ yield _flow
+
+ def iterkeys(self):
+ for _id in self._flow_table.keys():
+ yield _id
+
+ def items(self):
+ return self._flow_table.items()
+
+ def values(self):
+ return self._flow_table.values()
+
+ def keys(self):
+ return self._flow_table.keys()
+
+ def __len__(self):
+ return len(self._flow_table)
+
+ def add(self, flow):
+ assert isinstance(flow, FlowEntry)
+ if flow.flow_id not in self._flow_table:
+ self._flow_table[flow.flow_id] = flow
+ return flow
+
+ def get(self, item):
+ flow_id = item.flow_id if isinstance(item, FlowEntry) else item
+ return self._flow_table.get(flow_id)
+
+ def remove(self, item):
+ flow_id = item.flow_id if isinstance(item, FlowEntry) else item
+ return self._flow_table.pop(flow_id, None)
+
+ def clear_all(self):
+ self._flow_table = dict()
+
+
+class DownstreamFlows(object):
+ """
+ Tracks existing flows that are downstream (NNI as source port)
+
+ The downstream table is slightly different than the base DeviceFlows
+ table as it is used to track flows that will become EVCs. The base
+ table tracks flows that will be EVC-maps (or related to them).
+
+ The downstream table is also indexed by a downstream signature that
+ is composed as follows:
+
+ <dev-id>.<ingress-port-number>.<s-tag>.*
+
+ In comparison, the upstream flows is similar, but instead of '*' it has the
+ c-tag (if any).
+
+ TODO: Drop device ID from signatures once flow tables are unique to a device handler
+ """
+ def __init__(self):
+ self._signature_table = dict() # Key = (str)Downstream signature
+ # |
+ # +-> downstream-signature
+ # |
+ # +-> 'evc' -> EVC
+ # |
+ # +-> flow-ids -> flow-entries...
+
+ def __getitem__(self, signature):
+ assert isinstance(signature, str)
+ return self._signature_table[signature]
+
+ def __iter__(self):
+ for _flow_id, _flow in self._signature_table.items():
+ yield _flow_id, _flow
+
+ def itervalues(self):
+ for _flow in self._signature_table.values():
+ yield _flow
+
+ def iterkeys(self):
+ for _id in self._signature_table.keys():
+ yield _id
+
+ def items(self):
+ return self._signature_table.items()
+
+ def values(self):
+ return self._signature_table.values()
+
+ def keys(self):
+ return self._signature_table.keys()
+
+ def __len__(self):
+ return len(self._signature_table)
+
+ def get(self, signature):
+ assert isinstance(signature, str)
+ return self._signature_table.get(signature)
+
+    def add(self, signature):
+        """
+        Can be called by an upstream flow to reserve a slot
+        """
+        assert isinstance(signature, str)
+ if signature not in self._signature_table:
+ self._signature_table[signature] = DownstreamFlows.SignatureTableEntry(signature)
+ return self._signature_table[signature]
+
+ def remove(self, signature):
+ assert isinstance(signature, str)
+ return self._signature_table.pop(signature)
+
+ def clear_all(self):
+ self._signature_table = dict()
+
+ class SignatureTableEntry(object):
+ def __init__(self, signature):
+ self._signature = signature
+ self._evc = None
+ self._flow_table = DeviceFlows()
+
+ @property
+ def evc(self):
+ return self._evc
+
+ @evc.setter
+ def evc(self, evc):
+ assert isinstance(evc, (EVC, type(None)))
+ self._evc = evc
+
+ @property
+ def flows(self):
+ return self._flow_table
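+
+
+# A minimal usage sketch (handler wiring assumed): a device handler keeps one
+# DeviceFlows table for upstream flows and one DownstreamFlows table keyed by
+# downstream signature, for example:
+#
+#   handler.upstream_flows = DeviceFlows()
+#   handler.downstream_flows = DownstreamFlows()
+#
+#   sig_entry = handler.downstream_flows.add('100.1000.1000.*')
+#   sig_entry.flows.add(downstream_flow_entry)    # FlowEntry for the NNI->UNI flow
+#   sig_entry.evc = evc                           # EVC that owns this signature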
diff --git a/adapters/adtran_common/flow/mcast.py b/adapters/adtran_common/flow/mcast.py
new file mode 100644
index 0000000..54bf24f
--- /dev/null
+++ b/adapters/adtran_common/flow/mcast.py
@@ -0,0 +1,183 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pyvoltha.common.openflow.utils import *
+from evc import EVC
+from flow_entry import FlowEntry
+from twisted.internet import defer
+from twisted.internet.defer import returnValue, inlineCallbacks
+
+log = structlog.get_logger()
+
+EVC_NAME_FORMAT = 'VOLTHA-MCAST-{}' # format(flow.vlan_id)
+EVC_NAME_REGEX_ALL = EVC_NAME_FORMAT.format('*')
+
+
+_mcast_evcs = {} # device-id -> flow dictionary
+ # |
+ # +-> vlan-id -> evcs
+
+
+class MCastEVC(EVC):
+ """
+ Class to wrap Multicast EVC and EVC-MAP functionality
+ """
+ def __init__(self, flow_entry):
+ super(MCastEVC, self).__init__(flow_entry)
+ self._downstream_flows = {flow_entry.flow_id} # Matching Downstream Flow IDs
+
+ def __str__(self):
+ return "MCAST-{}: MEN: {}, VLAN: {}".format(self._name, self._men_ports, self._s_tag)
+
+ def _create_name(self):
+ #
+ # TODO: Take into account selection criteria and output to make the name
+ #
+ return EVC_NAME_FORMAT.format(self._flow.vlan_id)
+
+ def _create_evc_map(self, flow_entry):
+ from evc_map import EVCMap
+ flow = FakeUpstreamFlow(flow_entry.flow, flow_entry.handler)
+ return EVCMap.create_ingress_map(flow, self)
+
+ @staticmethod
+ def create(flow_entry):
+ from evc_map import EVCMap
+
+ device_id = flow_entry.device_id
+ if device_id not in _mcast_evcs:
+ _mcast_evcs[device_id] = {}
+
+ evc_table = _mcast_evcs[device_id]
+
+ try:
+ evc = evc_table.get(flow_entry.vlan_id)
+
+ if evc is None:
+ # Create EVC and initial EVC Map
+ evc = MCastEVC(flow_entry)
+ evc_table[flow_entry.vlan_id] = evc
+ else:
+ if flow_entry.flow_id in evc.downstream_flows: # TODO: Debug only to see if flow_ids are unique
+ pass
+ else:
+ evc.add_downstream_flows(flow_entry.flow_id)
+
+ fake_flow = FakeUpstreamFlow(flow_entry.flow, flow_entry.handler)
+ evc_map_name = EVCMap.create_evc_map_name(fake_flow)
+
+ if evc_map_name not in evc.evc_map_names:
+ EVCMap.create_ingress_map(fake_flow, evc)
+
+ return evc
+
+ except Exception as e:
+ log.exception('mcast-create', e=e)
+ return None
+
+ @property
+ def flow_entry(self):
+ return self._flow
+
+ @property
+ def downstream_flows(self):
+ return frozenset(self._downstream_flows)
+
+ def add_downstream_flows(self, flow_id):
+ self._downstream_flows.add(flow_id)
+
+ def remove_downstream_flows(self, flow_id):
+ self._downstream_flows.discard(flow_id)
+
+ @inlineCallbacks
+ def remove(self, remove_maps=True):
+ """
+ Remove EVC (and optional associated EVC-MAPs) from hardware
+ :param remove_maps: (boolean)
+ :return: (deferred)
+ """
+ log.info('removing', evc=self, remove_maps=remove_maps)
+
+ device_id = self._handler.device_id
+        flow_id = self._flow.flow_id
+ evc_table = _mcast_evcs.get(device_id)
+
+ if evc_table is None or flow_id not in evc_table:
+ returnValue('NOP')
+
+ # Remove flow reference
+ if self._flow.flow_id in self._downstream_flows:
+            self._downstream_flows.discard(self._flow.flow_id)
+
+ if len(self._downstream_flows) == 0:
+ # Use base class to clean up
+ returnValue(super(MCastEVC, self).remove(remove_maps=True))
+
+ returnValue('More references')
+
+ @inlineCallbacks
+ def delete(self, delete_maps=True):
+ """
+ Remove from hardware and delete/clean-up EVC Object
+ """
+ log.info('deleting', evc=self, delete_maps=delete_maps)
+
+ try:
+ dl = [self.remove()]
+ if delete_maps:
+ for evc_map in self.evc_maps:
+ dl.append(evc_map.delete(self)) # TODO: implement bulk-flow procedures
+
+ yield defer.gatherResults(dl, consumeErrors=True)
+
+ except Exception as e:
+ log.exception('removal', e=e)
+
+ self._evc_maps = None
+ f, self._flow = self._flow, None
+ if f is not None and f.handler is not None:
+ f.handler.remove_evc(self)
+
+ def reflow(self, reflow_maps=True):
+ pass # TODO: Implement or use base class?
+
+ @staticmethod
+ def remove_all(client, regex_=EVC_NAME_REGEX_ALL):
+ """
+ Remove all matching EVCs from hardware
+ :param client: (ncclient) NETCONF Client to use
+ :param regex_: (String) Regular expression for name matching
+ :return: (deferred)
+ """
+ pass # TODO: ???
+
+
+class FakeUpstreamFlow(FlowEntry):
+ def __init__(self, flow, handler):
+ super(FakeUpstreamFlow, self).__init__(flow, handler)
+        self._decode(flow)
+ # Change name that the base class set
+ self._name = self.create_flow_name()
+ self._flow_direction = FlowEntry.FlowDirection.UPSTREAM
+ self.in_port, self.output = self.output, self.in_port
+ self.flow_id = '{}-MCAST'.format(self.vlan_id)
+ self._logical_port = self.vlan_id
+ self.push_vlan_id = self.vlan_id
+ self.vlan_id = None
+ self.signature = None
+ self.inner_vid = None
+ self.pop_vlan = False
+
+ def create_flow_name(self):
+ return 'flow-{}-{}-MCAST'.format(self.device_id, self.vlan_id)
diff --git a/adapters/adtran_common/flow/utility_evc.py b/adapters/adtran_common/flow/utility_evc.py
new file mode 100644
index 0000000..fc7fd0b
--- /dev/null
+++ b/adapters/adtran_common/flow/utility_evc.py
@@ -0,0 +1,158 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from pyvoltha.common.openflow.utils import *
+from evc import EVC
+from twisted.internet import defer
+from twisted.internet.defer import returnValue, inlineCallbacks
+
+log = structlog.get_logger()
+
+EVC_NAME_FORMAT = 'VOLTHA-UTILITY-{}' # format(flow.vlan_id)
+EVC_NAME_REGEX_ALL = EVC_NAME_FORMAT.format('*')
+
+
+_utility_evcs = {} # device-id -> flow dictionary
+ # |
+ # +-> utility-vlan-id -> evcs
+
+
+class UtilityEVC(EVC):
+ """
+ Class to wrap orphan ingress ACLs EVC functionality
+ """
+ def __init__(self, flow_entry):
+ super(UtilityEVC, self).__init__(flow_entry)
+ self._downstream_flows = {flow_entry.flow_id} # Matching Downstream Flow IDs
+ self.service_evc = True
+
+ def __str__(self):
+ return "VOLTHA-UTILITY-{}: MEN: {}, VLAN: {}".format(self._name, self._men_ports, self._s_tag)
+
+ def _create_name(self, vlan_id=None):
+ #
+ # TODO: Take into account selection criteria and output to make the name
+ #
+ return EVC_NAME_FORMAT.format(self._flow.vlan_id if vlan_id is None else vlan_id)
+
+ @staticmethod
+ def create(flow_entry, use_default_vlan_id=False):
+ device_id = flow_entry.device_id
+ vlan_id = flow_entry.vlan_id if not use_default_vlan_id else flow_entry.handler.utility_vlan
+ evc_table = _utility_evcs.get(device_id)
+
+ if evc_table is None:
+ _utility_evcs[device_id] = dict()
+ evc_table = _utility_evcs[device_id]
+
+ try:
+ evc = evc_table.get(vlan_id)
+
+ if evc is None:
+ # Create EVC and initial EVC Map
+ evc = UtilityEVC(flow_entry)
+
+ # reapply the stag and name if forced vlan id
+ if use_default_vlan_id:
+ evc._s_tag = vlan_id
+ evc._name = evc._create_name(vlan_id)
+
+ evc_table[vlan_id] = evc
+ else:
+ if flow_entry.flow_id in evc.downstream_flows: # TODO: Debug only to see if flow_ids are unique
+ pass
+ else:
+ evc.add_downstream_flows(flow_entry.flow_id)
+
+ return evc
+
+ except Exception as e:
+ log.exception('utility-create', e=e)
+ return None
+
+ @property
+ def downstream_flows(self):
+ return frozenset(self._downstream_flows)
+
+ def add_downstream_flows(self, flow_id):
+ self._downstream_flows.add(flow_id)
+
+ def remove_downstream_flows(self, flow_id):
+ self._downstream_flows.discard(flow_id)
+
+ def remove(self, remove_maps=True):
+ """
+ Remove EVC (and optional associated EVC-MAPs) from hardware
+ :param remove_maps: (boolean)
+ :return: (deferred)
+ """
+ log.info('removing', evc=self, remove_maps=remove_maps)
+
+ device_id = self._flow.handler.device_id
+ flow_id = self._flow.flow_id
+ evc_table = _utility_evcs.get(device_id)
+
+ if evc_table is None:
+ return defer.succeed('NOP')
+
+ # Remove flow reference
+ if self._flow.flow_id in self._downstream_flows:
+ self._downstream_flows.discard(self._flow.flow_id)
+
+ if len(self._downstream_flows) == 0:
+ # Use base class to clean up
+ return super(UtilityEVC, self).remove(remove_maps=True)
+
+ return defer.succeed('More references')
+
+ @inlineCallbacks
+ def delete(self, delete_maps=True):
+ """
+ Remove from hardware and delete/clean-up EVC Object
+ :return: (deferred)
+ """
+ log.info('deleting', evc=self, delete_maps=delete_maps)
+
+ assert self._flow, 'Delete EVC must have flow reference'
+ try:
+ dl = [self.remove()]
+ if delete_maps:
+ for evc_map in self.evc_maps:
+ dl.append(evc_map.delete(None)) # TODO: implement bulk-flow procedures
+
+ yield defer.gatherResults(dl, consumeErrors=True)
+
+ self._evc_maps = None
+ f, self._flow = self._flow, None
+ if f is not None and f.handler is not None:
+ f.handler.remove_evc(self)
+
+ except Exception as e:
+ log.exception('removal', e=e)
+
+ returnValue('Done')
+
+ def reflow(self, reflow_maps=True):
+ pass # TODO: Implement or use base class?
+
+ @staticmethod
+ def remove_all(client, regex_=EVC_NAME_REGEX_ALL):
+ """
+ Remove all matching EVCs from hardware
+ :param client: (ncclient) NETCONF Client to use
+ :param regex_: (String) Regular expression for name matching
+ :return: (deferred)
+ """
+ _utility_evcs.clear()
+ EVC.remove_all(client, regex_)
\ No newline at end of file
diff --git a/adapters/adtran_common/net/__init__.py b/adapters/adtran_common/net/__init__.py
new file mode 100644
index 0000000..d67fcf2
--- /dev/null
+++ b/adapters/adtran_common/net/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present ADTRAN, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/adapters/adtran_common/net/adtran_netconf.py b/adapters/adtran_common/net/adtran_netconf.py
new file mode 100644
index 0000000..4e39a6a
--- /dev/null
+++ b/adapters/adtran_common/net/adtran_netconf.py
@@ -0,0 +1,373 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import structlog
+from lxml import etree
+from ncclient import manager
+from ncclient.operations import RPCError
+from ncclient.transport.errors import SSHError
+from twisted.internet import defer, threads
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+log = structlog.get_logger('ncclient')
+
+ADTRAN_NS = 'http://www.adtran.com/ns/yang'
+
+
+def adtran_module_url(module):
+ return '{}/{}'.format(ADTRAN_NS, module)
+
+
+def phys_entities_rpc():
+ return """
+ <filter xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+ <physical-entities-state xmlns="{}">
+ <physical-entity/>
+ </physical-entities-state>
+ </filter>
+ """.format(adtran_module_url('adtran-physical-entities'))
+
+
+class AdtranNetconfClient(object):
+ """
+ Performs NETCONF requests
+ """
+ def __init__(self, host_ip, port=830, username='', password='', timeout=10):
+ self._ip = host_ip
+ self._port = port
+ self._username = username
+ self._password = password
+ self._timeout = timeout
+ self._session = None
+
+ def __str__(self):
+ return "AdtranNetconfClient {}@{}:{}".format(self._username, self._ip, self._port)
+
+ @property
+ def capabilities(self):
+ """
+ Get the server's NETCONF capabilities
+
+ :return: (ncclient.capabilities.Capabilities) object representing the server's capabilities.
+ """
+ return self._session.server_capabilities if self._session else None
+
+ @property
+ def connected(self):
+ """
+ Is this client connected to a NETCONF server
+ :return: (boolean) True if connected
+ """
+ return self._session is not None and self._session.connected
+
+ def connect(self, connect_timeout=None):
+ """
+ Connect to the NETCONF server
+
+ o To disable attempting publickey authentication altogether, call with
+ allow_agent and look_for_keys as False.
+
+ o hostkey_verify enables hostkey verification from ~/.ssh/known_hosts
+
+ :return: (deferred) Deferred request
+ """
+ timeout = connect_timeout or self._timeout
+
+ return threads.deferToThread(self._do_connect, timeout)
+
+ def _do_connect(self, timeout):
+ try:
+ self._session = manager.connect(host=self._ip,
+ port=self._port,
+ username=self._username,
+ password=self._password,
+ allow_agent=False,
+ look_for_keys=False,
+ hostkey_verify=False,
+ timeout=timeout)
+
+ except SSHError as e:
+ # Log and rethrow exception so any errBack is called
+ log.warn('SSHError-during-connect', e=e)
+ raise e
+
+ except Exception as e:
+ # Log and rethrow exception so any errBack is called
+            log.exception('connect-failed', e=e)
+ raise e
+
+ # If debug logging is enabled, decrease the level, DEBUG is a significant
+ # performance hit during response XML decode
+
+ if log.isEnabledFor('DEBUG'):
+ log.setLevel('INFO')
+
+        # TODO: ncclient also supports RaiseMode.NONE to limit exceptions. To set it, use:
+        #
+        #     self._session.raise_mode = RaiseMode.NONE
+        #
+        # Then, when you get a response back, you can check 'response.ok' to
+        # see if it is True; if it is not, you can enumerate the 'response.errors'
+        # list for more information
+
+ return self._session
+
+ def close(self):
+ """
+ Close the connection to the NETCONF server
+ :return: (deferred) Deferred request
+ """
+ s, self._session = self._session, None
+
+ if s is None or not s.connected:
+            return defer.succeed(True)
+
+ return threads.deferToThread(self._do_close, s)
+
+ def _do_close(self, old_session):
+ return old_session.close_session()
+
+ @inlineCallbacks
+ def _reconnect(self):
+ try:
+ yield self.close()
+ except:
+ pass
+
+ try:
+ yield self.connect()
+ except:
+ pass
+
+ def get_config(self, source='running'):
+ """
+ Get the configuration from the specified source
+
+ :param source: (string) Configuration source, 'running', 'candidate', ...
+
+ :return: (deferred) Deferred request that wraps the GetReply class
+ """
+ if not self._session:
+            raise NotImplementedError('No SSH Session')
+
+ if not self._session.connected:
+ self._reconnect()
+
+ return threads.deferToThread(self._do_get_config, source)
+
+ def _do_get_config(self, source):
+ """
+ Get the configuration from the specified source
+
+ :param source: (string) Configuration source, 'running', 'candidate', ...
+
+ :return: (GetReply) The configuration.
+ """
+ return self._session.get_config(source)
+
+ def get(self, payload):
+ """
+ Get the requested data from the server
+
+ :param payload: Payload/filter
+ :return: (deferred) for GetReply
+ """
+ log.debug('get', filter=payload)
+
+ if not self._session:
+            raise NotImplementedError('No SSH Session')
+
+ if not self._session.connected:
+ self._reconnect()
+
+ return threads.deferToThread(self._do_get, payload)
+
+ def _do_get(self, payload):
+ """
+ Get the requested data from the server
+
+ :param payload: Payload/filter
+ :return: (GetReply) response
+ """
+ try:
+ log.debug('get', payload=payload)
+ response = self._session.get(payload)
+ # To get XML, use response.xml
+ log.debug('response', response=response)
+
+ except RPCError as e:
+ log.exception('get', e=e)
+ raise
+
+ return response
+
+ def lock(self, source, lock_timeout):
+ """
+ Lock the configuration system
+ :return: (deferred) for RpcReply
+ """
+ log.info('lock', source=source, timeout=lock_timeout)
+
+ if not self._session or not self._session.connected:
+            raise NotImplementedError('TODO: Support auto-connect if needed')
+
+ return threads.deferToThread(self._do_lock, source, lock_timeout)
+
+ def _do_lock(self, source, lock_timeout):
+ """
+ Lock the configuration system
+ """
+ try:
+ response = self._session.lock(source, timeout=lock_timeout)
+ # To get XML, use response.xml
+
+ except RPCError as e:
+ log.exception('lock', e=e)
+ raise
+
+ return response
+
+ def unlock(self, source):
+ """
+        Unlock the configuration system
+        :param source: the name of the configuration datastore to unlock
+
+ :return: (deferred) for RpcReply
+ """
+ log.info('unlock', source=source)
+
+ if not self._session or not self._session.connected:
+            raise NotImplementedError('TODO: Support auto-connect if needed')
+
+ return threads.deferToThread(self._do_unlock, source)
+
+ def _do_unlock(self, source):
+ """
+        Unlock the configuration system
+ """
+ try:
+ response = self._session.unlock(source)
+ # To get XML, use response.xml
+
+ except RPCError as e:
+ log.exception('unlock', e=e)
+ raise
+
+ return response
+
+ @inlineCallbacks
+ def edit_config(self, config, target='running', default_operation='none',
+ test_option=None, error_option=None, ignore_delete_error=False):
+ """
+ Loads all or part of the specified config to the target configuration datastore
+ with the ability to lock the datastore during the edit.
+
+        :param config: the configuration, which must be rooted in the config element.
+                       It can be specified either as a string or an Element (format="xml")
+        :param target: the name of the configuration datastore being edited
+        :param default_operation: if specified, must be one of 'merge', 'replace', or 'none'
+        :param test_option: if specified, must be one of 'test_then_set' or 'set'
+        :param error_option: if specified, must be one of 'stop-on-error',
+                             'continue-on-error', or 'rollback-on-error'. The
+                             'rollback-on-error' error_option depends on the
+                             :rollback-on-error capability.
+ :param ignore_delete_error: (bool) For some startup deletes/clean-ups, we do a
+ delete high up in the config to get whole lists. If
+ these lists are empty, this helps suppress any error
+ message from NETConf on failure to delete an empty list
+
+ :return: (deferred) for RpcReply
+ """
+ if not self._session:
+            raise NotImplementedError('No SSH Session')
+
+ if not self._session.connected:
+ try:
+ yield self._reconnect()
+
+ except Exception as e:
+ log.exception('edit-config-connect', e=e)
+
+ try:
+ if config[:7] != '<config':
+ config = '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0"' + \
+ ' xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">' + \
+ config + '</config>'
+
+ log.debug('netconf-request', config=config, target=target,
+ default_operation=default_operation)
+
+ rpc_reply = yield threads.deferToThread(self._do_edit_config, target,
+ config, default_operation,
+ test_option, error_option)
+ except Exception as e:
+ if ignore_delete_error and 'operation="delete"' in config.lower():
+ returnValue('ignoring-delete-error')
+ log.exception('edit_config', e=e, config=config, target=target)
+ raise
+
+ returnValue(rpc_reply)
+
+ def _do_edit_config(self, target, config, default_operation, test_option, error_option,
+ ignore_delete_error=False):
+ """
+ Perform actual edit-config operation
+ """
+ try:
+ log.debug('edit-config', target=target, config=config)
+
+ response = self._session.edit_config(target=target, config=config
+ # TODO: Support additional options later
+ # ,default_operation=default_operation,
+ # test_option=test_option,
+ # error_option=error_option
+ )
+
+ log.debug('netconf-response', response=response)
+ # To get XML, use response.xml
+ # To check status, use response.ok (boolean)
+
+ except RPCError as e:
+ if not ignore_delete_error or 'operation="delete"' not in config.lower():
+ log.exception('do_edit_config', e=e, config=config, target=target)
+ raise
+
+ return response
+
+ def rpc(self, rpc_string):
+ """
+ Custom RPC request
+ :param rpc_string: (string) RPC request
+ :return: (deferred) for GetReply
+ """
+ log.debug('rpc', rpc=rpc_string)
+
+ if not self._session:
+            raise NotImplementedError('No SSH Session')
+
+ if not self._session.connected:
+ self._reconnect()
+
+ return threads.deferToThread(self._do_rpc, rpc_string)
+
+ def _do_rpc(self, rpc_string):
+ try:
+ response = self._session.dispatch(etree.fromstring(rpc_string))
+ # To get XML, use response.xml
+
+ except RPCError as e:
+ log.exception('rpc', e=e)
+ raise
+
+ return response
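
Illustrative usage (not part of this change): the sketch below shows one way the deferred-based client above could be driven from an inlineCallbacks coroutine. The host address and credentials are placeholders.

    from twisted.internet import reactor
    from twisted.internet.defer import inlineCallbacks

    from adapters.adtran_common.net.adtran_netconf import AdtranNetconfClient, phys_entities_rpc

    @inlineCallbacks
    def dump_physical_entities(host):
        client = AdtranNetconfClient(host, port=830, username='admin', password='admin')
        yield client.connect()                          # runs the ncclient connect in a worker thread
        reply = yield client.get(phys_entities_rpc())   # GetReply; reply.xml holds the raw <rpc-reply>
        print(reply.xml)
        yield client.close()

    if __name__ == '__main__':
        d = dump_physical_entities('10.0.0.1')
        d.addBoth(lambda _: reactor.stop())
        reactor.run()
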
diff --git a/adapters/adtran_common/net/adtran_rest.py b/adapters/adtran_common/net/adtran_rest.py
new file mode 100644
index 0000000..9020e82
--- /dev/null
+++ b/adapters/adtran_common/net/adtran_rest.py
@@ -0,0 +1,189 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+import structlog
+import treq
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.error import ConnectionClosed, ConnectionDone, ConnectionLost
+
+log = structlog.get_logger()
+
+
+class RestInvalidResponseCode(Exception):
+ def __init__(self, message, url, code):
+ super(RestInvalidResponseCode, self).__init__(message)
+ self.url = url
+ self.code = code
+
+
+class AdtranRestClient(object):
+ """
+ Performs Adtran RESTCONF requests
+ """
+ # HTTP shortcuts
+ HELLO_URI = '/restconf/adtran-hello:hello'
+
+ REST_GET_REQUEST_HEADER = {'User-Agent': 'Adtran RESTConf',
+ 'Accept': ['application/json']}
+
+ REST_POST_REQUEST_HEADER = {'User-Agent': 'Adtran RESTConf',
+ 'Content-Type': 'application/json',
+ 'Accept': ['application/json']}
+
+ REST_PATCH_REQUEST_HEADER = REST_POST_REQUEST_HEADER
+ REST_PUT_REQUEST_HEADER = REST_POST_REQUEST_HEADER
+ REST_DELETE_REQUEST_HEADER = REST_GET_REQUEST_HEADER
+
+ HTTP_OK = 200
+ HTTP_CREATED = 201
+ HTTP_ACCEPTED = 202
+ HTTP_NON_AUTHORITATIVE_INFORMATION = 203
+ HTTP_NO_CONTENT = 204
+ HTTP_RESET_CONTENT = 205
+ HTTP_PARTIAL_CONTENT = 206
+ HTTP_NOT_FOUND = 404
+
+    _valid_methods = {'GET', 'POST', 'PUT', 'PATCH', 'DELETE'}
+ _valid_results = {'GET': [HTTP_OK, HTTP_NO_CONTENT],
+ 'POST': [HTTP_OK, HTTP_CREATED, HTTP_NO_CONTENT],
+ 'PUT': [HTTP_OK, HTTP_CREATED, HTTP_NO_CONTENT],
+ 'PATCH': [HTTP_OK],
+ 'DELETE': [HTTP_OK, HTTP_ACCEPTED, HTTP_NO_CONTENT, HTTP_NOT_FOUND]
+ }
+
+ for _method in _valid_methods:
+ assert _method in _valid_results # Make sure we have a results entry for each supported method
+
+ def __init__(self, host_ip, port, username='', password='', timeout=10):
+ """
+ REST Client initialization
+
+ :param host_ip: (string) IP Address of Adtran Device
+ :param port: (int) Port number
+ :param username: (string) Username for credentials
+ :param password: (string) Password for credentials
+ :param timeout: (int) Number of seconds to wait for a response before timing out
+ """
+ self._ip = host_ip
+ self._port = port
+ self._username = username
+ self._password = password
+ self._timeout = timeout
+
+ def __str__(self):
+ return "AdtranRestClient {}@{}:{}".format(self._username, self._ip, self._port)
+
+ @inlineCallbacks
+ def request(self, method, uri, data=None, name='', timeout=None, is_retry=False,
+ suppress_error=False):
+ """
+ Send a REST request to the Adtran device
+
+ :param method: (string) HTTP method
+ :param uri: (string) fully URL to perform method on
+ :param data: (string) optional data for the request body
+ :param name: (string) optional name of the request, useful for logging purposes
+ :param timeout: (int) Number of seconds to wait for a response before timing out
+ :param is_retry: (boolean) True if this method called recursively in order to recover
+ from a connection loss. Can happen sometimes in debug sessions
+ and in the real world.
+ :param suppress_error: (boolean) If true, do not output ERROR message on REST request failure
+ :return: (dict) On success with the proper results
+ """
+ log.debug('request', method=method, uri=uri, data=data, retry=is_retry)
+
+ if method.upper() not in self._valid_methods:
+ raise NotImplementedError("REST method '{}' is not supported".format(method))
+
+ url = 'http://{}:{}{}{}'.format(self._ip, self._port,
+ '/' if uri[0] != '/' else '',
+ uri)
+ response = None
+ timeout = timeout or self._timeout
+
+ try:
+ if method.upper() == 'GET':
+ response = yield treq.get(url,
+ auth=(self._username, self._password),
+ timeout=timeout,
+ headers=self.REST_GET_REQUEST_HEADER)
+ elif method.upper() == 'POST' or method.upper() == 'PUT':
+ response = yield treq.post(url,
+ data=data,
+ auth=(self._username, self._password),
+ timeout=timeout,
+ headers=self.REST_POST_REQUEST_HEADER)
+ elif method.upper() == 'PATCH':
+ response = yield treq.patch(url,
+ data=data,
+ auth=(self._username, self._password),
+ timeout=timeout,
+ headers=self.REST_PATCH_REQUEST_HEADER)
+ elif method.upper() == 'DELETE':
+ response = yield treq.delete(url,
+ auth=(self._username, self._password),
+ timeout=timeout,
+ headers=self.REST_DELETE_REQUEST_HEADER)
+ else:
+ raise NotImplementedError("REST method '{}' is not supported".format(method))
+
+ except NotImplementedError:
+ raise
+
+        except (ConnectionDone, ConnectionLost):
+            if is_retry:
+                raise
+            result = yield self.request(method, uri, data=data, name=name,
+                                        timeout=timeout, is_retry=True)
+            returnValue(result)
+
+ except ConnectionClosed:
+ returnValue(ConnectionClosed)
+
+ except Exception as e:
+ log.exception("rest-request", method=method, url=url, name=name, e=e)
+ raise
+
+ if response.code not in self._valid_results[method.upper()]:
+ message = "REST {} '{}' request to '{}' failed with status code {}".format(method, name,
+ url, response.code)
+ if not suppress_error:
+ log.error(message)
+ raise RestInvalidResponseCode(message, url, response.code)
+
+ if response.code in {self.HTTP_NO_CONTENT, self.HTTP_NOT_FOUND}:
+ returnValue(None)
+
+ else:
+ # TODO: May want to support multiple body encodings in the future
+
+ headers = response.headers
+ type_key = 'content-type'
+ type_val = 'application/json'
+
+ if not headers.hasHeader(type_key) or type_val not in headers.getRawHeaders(type_key, []):
+ raise Exception("REST {} '{}' request response from '{}' was not JSON",
+ method, name, url)
+
+ content = yield response.content()
+ try:
+ result = json.loads(content)
+
+ except Exception as e:
+ log.exception("json-decode", method=method, url=url, name=name,
+ content=content, e=e)
+ raise
+
+ returnValue(result)
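
Illustrative usage (not part of this change): a minimal sketch of a RESTCONF GET against the hello URI defined above; host, port, and credentials are placeholders.

    from twisted.internet import reactor
    from twisted.internet.defer import inlineCallbacks

    from adapters.adtran_common.net.adtran_rest import AdtranRestClient

    @inlineCallbacks
    def check_hello(host):
        client = AdtranRestClient(host, 8081, username='admin', password='admin')
        # On success the decoded JSON body is returned; 204/404 responses return None
        result = yield client.request('GET', AdtranRestClient.HELLO_URI, name='hello')
        print(result)

    if __name__ == '__main__':
        check_hello('10.0.0.1').addBoth(lambda _: reactor.stop())
        reactor.run()
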
diff --git a/adapters/adtran_common/net/adtran_zmq.py b/adapters/adtran_common/net/adtran_zmq.py
new file mode 100644
index 0000000..1d1341c
--- /dev/null
+++ b/adapters/adtran_common/net/adtran_zmq.py
@@ -0,0 +1,379 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import structlog
+
+from twisted.internet.defer import succeed
+from twisted.internet import threads
+
+from txzmq import ZmqEndpoint, ZmqFactory
+from txzmq.connection import ZmqConnection
+
+import zmq
+from zmq import constants
+from zmq.utils import jsonapi
+from zmq.utils.strtypes import b, u
+from zmq.auth.base import Authenticator
+
+from threading import Thread, Event
+
+zmq_factory = ZmqFactory()
+
+
+class AdtranZmqClient(object):
+ """
+ Adtran ZeroMQ Client for PON Agent and/or packet in/out service
+ """
+ def __init__(self, ip_address, rx_callback, port):
+ self.log = structlog.get_logger()
+
+ external_conn = 'tcp://{}:{}'.format(ip_address, port)
+
+ self.zmq_endpoint = ZmqEndpoint('connect', external_conn)
+ self._socket = ZmqPairConnection(zmq_factory, self.zmq_endpoint)
+ self._socket.onReceive = rx_callback or AdtranZmqClient.rx_nop
+ self.auth = None
+
+ def send(self, data):
+ try:
+ self._socket.send(data)
+
+ except Exception as e:
+ self.log.exception('send', e=e)
+
+ def shutdown(self):
+ self._socket.onReceive = AdtranZmqClient.rx_nop
+ self._socket.shutdown()
+
+ @property
+ def socket(self):
+ return self._socket
+
+ @staticmethod
+ def rx_nop(_):
+ pass
+
+ def setup_plain_security(self, username, password):
+ self.log.debug('setup-plain-security')
+
+ def configure_plain(_):
+ self.log.debug('plain-security', username=username,
+ password=password)
+
+ self.auth.configure_plain(domain='*', passwords={username: password})
+ self._socket.socket.plain_username = username
+ self._socket.socket.plain_password = password
+
+        def add_endpoints(_results):
+ self._socket.addEndpoints([self.zmq_endpoint])
+
+ def config_failure(_results):
+ raise Exception('Failed to configure plain-text security')
+
+ def endpoint_failure(_results):
+ raise Exception('Failed to complete endpoint setup')
+
+ self.auth = TwistedZmqAuthenticator()
+
+ d = self.auth.start()
+ d.addCallbacks(configure_plain, config_failure)
+        d.addCallbacks(add_endpoints, endpoint_failure)
+
+ return d
+
+ def setup_curve_security(self):
+ self.log.debug('setup-curve-security')
+ raise NotImplementedError('TODO: curve transport security is not yet supported')
+
+
+class ZmqPairConnection(ZmqConnection):
+ """
+ Bidirectional messages to/from the socket.
+
+ Wrapper around ZeroMQ PUSH socket.
+ """
+ socketType = constants.PAIR
+
+ def messageReceived(self, message):
+ """
+ Called on incoming message from ZeroMQ.
+
+ :param message: message data
+ """
+ self.onReceive(message)
+
+ def onReceive(self, message):
+ """
+ Called on incoming message received from other end of the pair.
+
+ :param message: message data
+ """
+ raise NotImplementedError(self)
+
+ def send(self, message):
+ """
+ Send message via ZeroMQ socket.
+
+ Sending is performed directly to ZeroMQ without queueing. If HWM is
+ reached on ZeroMQ side, sending operation is aborted with exception
+ from ZeroMQ (EAGAIN).
+
+ After writing read is scheduled as ZeroMQ may not signal incoming
+ messages after we touched socket with write request.
+
+ :param message: message data, could be either list of str (multipart
+ message) or just str
+ :type message: str or list of str
+ """
+ from txzmq.compat import is_nonstr_iter
+ from twisted.internet import reactor
+
+ if not is_nonstr_iter(message):
+ self.socket.send(message, constants.NOBLOCK)
+ else:
+ # for m in message[:-1]:
+ # self.socket.send(m, constants.NOBLOCK | constants.SNDMORE)
+ # self.socket.send(message[-1], constants.NOBLOCK)
+ self.socket.send_multipart(message, flags=constants.NOBLOCK)
+
+ if self.read_scheduled is None:
+ self.read_scheduled = reactor.callLater(0, self.doRead)
+
+###############################################################################################
+###############################################################################################
+###############################################################################################
+###############################################################################################
+
+
+def _inherit_docstrings(cls):
+ """inherit docstrings from Authenticator, so we don't duplicate them"""
+ for name, method in cls.__dict__.items():
+ if name.startswith('_'):
+ continue
+ upstream_method = getattr(Authenticator, name, None)
+        if upstream_method is not None and not method.__doc__:
+            method.__doc__ = upstream_method.__doc__
+ return cls
+
+
+@_inherit_docstrings
+class TwistedZmqAuthenticator(object):
+ """Run ZAP authentication in a background thread but communicate via Twisted ZMQ"""
+
+ def __init__(self, encoding='utf-8'):
+ self.log = structlog.get_logger()
+ self.context = zmq_factory.context
+ self.encoding = encoding
+ self.pipe = None
+ self.pipe_endpoint = "inproc://{0}.inproc".format(id(self))
+ self.thread = None
+
+ def allow(self, *addresses):
+ try:
+ self.pipe.send([b'ALLOW'] + [b(a, self.encoding) for a in addresses])
+
+ except Exception as e:
+ self.log.exception('allow', e=e)
+
+ def deny(self, *addresses):
+ try:
+ self.pipe.send([b'DENY'] + [b(a, self.encoding) for a in addresses])
+
+ except Exception as e:
+ self.log.exception('deny', e=e)
+
+ def configure_plain(self, domain='*', passwords=None):
+ try:
+ self.pipe.send([b'PLAIN', b(domain, self.encoding), jsonapi.dumps(passwords or {})])
+
+ except Exception as e:
+ self.log.exception('configure-plain', e=e)
+
+ def configure_curve(self, domain='*', location=''):
+ try:
+ domain = b(domain, self.encoding)
+ location = b(location, self.encoding)
+ self.pipe.send([b'CURVE', domain, location])
+
+ except Exception as e:
+ self.log.exception('configure-curve', e=e)
+
+ def start(self, rx_callback=AdtranZmqClient.rx_nop):
+ """Start the authentication thread"""
+ try:
+ # create a socket to communicate with auth thread.
+
+ endpoint = ZmqEndpoint('bind', self.pipe_endpoint) # We are server, thread will be client
+ self.pipe = ZmqPairConnection(zmq_factory, endpoint)
+ self.pipe.onReceive = rx_callback
+
+ self.thread = LocalAuthenticationThread(self.context,
+ self.pipe_endpoint,
+ encoding=self.encoding)
+
+ return threads.deferToThread(TwistedZmqAuthenticator._do_thread_start,
+ self.thread, timeout=10)
+
+ except Exception as e:
+ self.log.exception('start', e=e)
+
+ @staticmethod
+ def _do_thread_start(thread, timeout=10):
+ thread.start()
+
+        # Event.wait() only returns a boolean as of Python 2.7; earlier versions always return None
+ if sys.version_info < (2, 7):
+ thread.started.wait(timeout=timeout)
+
+ elif not thread.started.wait(timeout=timeout):
+ raise RuntimeError("Authenticator thread failed to start")
+
+ def stop(self):
+ """Stop the authentication thread"""
+ pipe, self.pipe = self.pipe, None
+ thread, self.thread = self.thread, None
+
+ if pipe:
+ pipe.send(b'TERMINATE')
+ pipe.onReceive = AdtranZmqClient.rx_nop
+ pipe.shutdown()
+
+            if thread and thread.is_alive():
+ return threads.deferToThread(TwistedZmqAuthenticator._do_thread_join,
+ thread)
+ return succeed('done')
+
+ @staticmethod
+ def _do_thread_join(thread, timeout=1):
+ thread.join(timeout)
+
+ def is_alive(self):
+ """Is the ZAP thread currently running?"""
+ return self.thread and self.thread.is_alive()
+
+ def __del__(self):
+ self.stop()
+
+
+# NOTE: The following is duplicated from the pyzmq code since the class is not exported
+class LocalAuthenticationThread(Thread):
+ """A Thread for running a zmq Authenticator
+
+ This is run in the background by ThreadedAuthenticator
+ """
+
+ def __init__(self, context, endpoint, encoding='utf-8', authenticator=None):
+ super(LocalAuthenticationThread, self).__init__(name='0mq Authenticator')
+ self.log = structlog.get_logger()
+ self.context = context or zmq.Context.instance()
+ self.encoding = encoding
+ self.started = Event()
+ self.authenticator = authenticator or Authenticator(context, encoding=encoding)
+
+ # create a socket to communicate back to main thread.
+ self.pipe = context.socket(zmq.PAIR)
+ self.pipe.linger = 1
+ self.pipe.connect(endpoint)
+
+ def run(self):
+ """Start the Authentication Agent thread task"""
+ try:
+ self.authenticator.start()
+ self.started.set()
+ zap = self.authenticator.zap_socket
+ poller = zmq.Poller()
+ poller.register(self.pipe, zmq.POLLIN)
+ poller.register(zap, zmq.POLLIN)
+ while True:
+ try:
+ socks = dict(poller.poll())
+ except zmq.ZMQError:
+ break # interrupted
+
+ if self.pipe in socks and socks[self.pipe] == zmq.POLLIN:
+ terminate = self._handle_pipe()
+ if terminate:
+ break
+
+ if zap in socks and socks[zap] == zmq.POLLIN:
+ self._handle_zap()
+
+ self.pipe.close()
+ self.authenticator.stop()
+
+ except Exception as e:
+ self.log.exception("run", e=e)
+
+ def _handle_zap(self):
+ """
+ Handle a message from the ZAP socket.
+ """
+ msg = self.authenticator.zap_socket.recv_multipart()
+ if not msg:
+ return
+ self.authenticator.handle_zap_message(msg)
+
+ def _handle_pipe(self):
+ """
+ Handle a message from front-end API.
+ """
+ terminate = False
+
+ # Get the whole message off the pipe in one go
+ msg = self.pipe.recv_multipart()
+
+ if msg is None:
+ terminate = True
+ return terminate
+
+ command = msg[0]
+ self.log.debug("auth received API command", command=command)
+
+ if command == b'ALLOW':
+ addresses = [u(m, self.encoding) for m in msg[1:]]
+ try:
+ self.authenticator.allow(*addresses)
+ except Exception as e:
+ self.log.exception("Failed to allow", addresses=addresses, e=e)
+
+ elif command == b'DENY':
+ addresses = [u(m, self.encoding) for m in msg[1:]]
+ try:
+ self.authenticator.deny(*addresses)
+ except Exception as e:
+ self.log.exception("Failed to deny", addresses=addresses, e=e)
+
+ elif command == b'PLAIN':
+ domain = u(msg[1], self.encoding)
+ json_passwords = msg[2]
+ self.authenticator.configure_plain(domain, jsonapi.loads(json_passwords))
+
+ elif command == b'CURVE':
+ # For now we don't do anything with domains
+ domain = u(msg[1], self.encoding)
+
+ # If location is CURVE_ALLOW_ANY, allow all clients. Otherwise
+ # treat location as a directory that holds the certificates.
+ location = u(msg[2], self.encoding)
+ self.authenticator.configure_curve(domain, location)
+
+ elif command == b'TERMINATE':
+ terminate = True
+
+ else:
+ self.log.error("Invalid auth command from API", command=command)
+
+ return terminate
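
Illustrative usage (not part of this change): a sketch of wiring the pair socket above for packet in/out. The device address and port number are placeholders; a real adapter supplies its own rx callback.

    from twisted.internet import reactor

    from adapters.adtran_common.net.adtran_zmq import AdtranZmqClient

    def packet_in(message):
        # 'message' is the received frame (or list of frames for a multipart message)
        print('packet-in', message)

    def main():
        client = AdtranZmqClient('10.0.0.1', rx_callback=packet_in, port=5656)
        client.send(b'\x00\x01\x02\x03')   # raw packet-out toward the PON agent

    reactor.callWhenRunning(main)
    reactor.run()
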
diff --git a/adapters/adtran_common/net/mock_netconf_client.py b/adapters/adtran_common/net/mock_netconf_client.py
new file mode 100644
index 0000000..314f2a0
--- /dev/null
+++ b/adapters/adtran_common/net/mock_netconf_client.py
@@ -0,0 +1,199 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import structlog
+import random
+import time
+from adapters.adtran_common.net.adtran_netconf import AdtranNetconfClient
+from pyvoltha.common.utils.asleep import asleep
+from ncclient.operations.rpc import RPCReply, RPCError
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+log = structlog.get_logger()
+
+_dummy_xml = '<rpc-reply message-id="br-549" ' + \
+ 'xmlns="urn:ietf:params:xml:ns:netconf:base:1.0" ' + \
+ 'xmlns:nc="urn:ietf:params:xml:ns:netconf:base:1.0">' + \
+ '<data/>' + \
+ '</rpc-reply>'
+
+
+class MockNetconfClient(AdtranNetconfClient):
+ """
+ Performs NETCONF requests
+ """
+ def __init__(self, host_ip, port=830, username='', password='', timeout=20):
+ super(MockNetconfClient, self).__init__(host_ip, port=port, username=username,
+ password=password, timeout=timeout)
+ self._connected = False
+ self._locked = {}
+
+ def __str__(self):
+ return "MockNetconfClient {}@{}:{}".format(self._username, self._ip, self._port)
+
+ @property
+ def capabilities(self):
+ """
+ Get the server's NETCONF capabilities
+
+ :return: (ncclient.capabilities.Capabilities) object representing the server's capabilities.
+ """
+ return None
+
+ @property
+ def connected(self):
+ """
+ Is this client connected to a NETCONF server
+ :return: (boolean) True if connected
+ """
+ return self._connected
+
+ @inlineCallbacks
+ def connect(self, connect_timeout=None):
+ """
+ Connect to the NETCONF server
+ o To disable attempting publickey authentication altogether, call with
+          allow_agent and look_for_keys as False.
+
+ o hostkey_verify enables hostkey verification from ~/.ssh/known_hosts
+
+ :return: (deferred) Deferred request
+ """
+ yield asleep(random.uniform(0.1, 5.0)) # Simulate NETCONF request delay
+ self._connected = True
+ self._locked = {}
+ returnValue(True)
+
+ @inlineCallbacks
+ def close(self):
+ """
+ Close the connection to the NETCONF server
+ :return: (deferred) Deferred request
+ """
+ yield asleep(random.uniform(0.1, 0.5)) # Simulate NETCONF request delay
+ self._connected = False
+ self._locked = {}
+ returnValue(True)
+
+ @inlineCallbacks
+ def get_config(self, source='running'):
+ """
+ Get the configuration from the specified source
+
+ :param source: (string) Configuration source, 'running', 'candidate', ...
+ :return: (deferred) Deferred request that wraps the GetReply class
+ """
+ yield asleep(random.uniform(0.1, 4.0)) # Simulate NETCONF request delay
+
+ # TODO: Customize if needed...
+ xml = _dummy_xml
+ returnValue(RPCReply(xml))
+
+ @inlineCallbacks
+ def get(self, payload):
+ """
+ Get the requested data from the server
+
+ :param payload: Payload/filter
+        :return: (deferred) for GetReply
+ """
+ yield asleep(random.uniform(0.1, 3.0)) # Simulate NETCONF request delay
+
+ # TODO: Customize if needed...
+ xml = _dummy_xml
+ returnValue(RPCReply(xml))
+
+ @inlineCallbacks
+ def lock(self, source, lock_timeout):
+ """
+ Lock the configuration system
+ :param source: is the name of the configuration datastore accessed
+ :param lock_timeout: timeout in seconds for holding the lock
+        :return: (deferred) for RpcReply
+ """
+ expire_time = time.time() + lock_timeout
+
+ if source not in self._locked:
+ self._locked[source] = None
+
+ while self._locked[source] is not None:
+ # Watch for lock timeout
+ if time.time() >= self._locked[source]:
+ self._locked[source] = None
+ break
+ yield asleep(0.1)
+
+ if time.time() < expire_time:
+ yield asleep(random.uniform(0.1, 0.5)) # Simulate NETCONF request delay
+ self._locked[source] = expire_time
+
+ returnValue(RPCReply(_dummy_xml) if expire_time > time.time() else RPCError('TODO'))
+
+ @inlineCallbacks
+ def unlock(self, source):
+ """
+        Unlock the configuration system
+        :param source: the name of the configuration datastore to unlock
+        :return: (deferred) for RpcReply
+ """
+ if source not in self._locked:
+ self._locked[source] = None
+
+ if self._locked[source] is not None:
+ yield asleep(random.uniform(0.1, 0.5)) # Simulate NETCONF request delay
+
+ self._locked[source] = None
+ returnValue(RPCReply(_dummy_xml))
+
+ @inlineCallbacks
+    def edit_config(self, config, target='running', default_operation='merge',
+                    test_option=None, error_option=None, ignore_delete_error=False):
+ """
+ Loads all or part of the specified config to the target configuration datastore with the ability to lock
+ the datastore during the edit.
+
+        :param config: the configuration, which must be rooted in the config element. It can be specified
+               either as a string or an Element (format="xml")
+        :param target: the name of the configuration datastore being edited
+        :param default_operation: if specified, must be one of 'merge', 'replace', or 'none'
+        :param test_option: if specified, must be one of 'test_then_set' or 'set'
+        :param error_option: if specified, must be one of 'stop-on-error', 'continue-on-error', 'rollback-on-error'.
+               The 'rollback-on-error' error_option depends on the :rollback-on-error capability.
+
+        :return: (deferred) for RpcReply
+ """
+ try:
+ yield asleep(random.uniform(0.1, 2.0)) # Simulate NETCONF request delay
+
+ except Exception as e:
+ log.exception('edit_config', e=e)
+ raise
+
+ # TODO: Customize if needed...
+ xml = _dummy_xml
+ returnValue(RPCReply(xml))
+
+ @inlineCallbacks
+ def rpc(self, rpc_string):
+ """
+ Custom RPC request
+ :param rpc_string: (string) RPC request
+        :return: (deferred) for GetReply
+ """
+ yield asleep(random.uniform(0.1, 2.0)) # Simulate NETCONF request delay
+
+ # TODO: Customize if needed...
+ xml = _dummy_xml
+ returnValue(RPCReply(xml))
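
Illustrative usage (not part of this change): because the mock mirrors the AdtranNetconfClient interface, it can stand in for the real client in twisted.trial tests. The config fragment below is arbitrary.

    from twisted.internet.defer import inlineCallbacks
    from twisted.trial import unittest

    from adapters.adtran_common.net.mock_netconf_client import MockNetconfClient

    class TestMockNetconfClient(unittest.TestCase):
        @inlineCallbacks
        def test_edit_config_roundtrip(self):
            client = MockNetconfClient('0.0.0.0')
            yield client.connect()
            self.assertTrue(client.connected)
            reply = yield client.edit_config('<interfaces/>')
            self.assertTrue(reply.ok)          # the dummy <rpc-reply> carries no rpc-error
            yield client.close()
            self.assertFalse(client.connected)
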
diff --git a/adapters/adtran_common/net/rcmd.py b/adapters/adtran_common/net/rcmd.py
new file mode 100644
index 0000000..3062b4c
--- /dev/null
+++ b/adapters/adtran_common/net/rcmd.py
@@ -0,0 +1,112 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import structlog
+from twisted.internet.defer import Deferred, succeed
+from twisted.internet.protocol import Factory, Protocol
+from twisted.conch.client.knownhosts import ConsoleUI, KnownHostsFile
+from twisted.conch.endpoints import SSHCommandClientEndpoint
+from twisted.internet import reactor
+
+log = structlog.get_logger()
+_open = open
+
+
+class RCmd(object):
+ """
+ Execute a one-time remote command via SSH
+ """
+ def __init__(self, host, username, password,
+ command,
+ port=None,
+ keys=None,
+ known_hosts=None,
+ agent=None):
+ self.reactor = reactor
+ self.host = host
+ self.port = port
+ self.username = username
+ self.password = password
+ self.keys = keys
+ # self.knownHosts = known_hosts
+ self.knownHosts = known_hosts
+ self.agent = agent
+ self.command = command
+ self.ui = RCmd.FixedResponseUI(True)
+
+ class NoiseProtocol(Protocol):
+ def __init__(self):
+ self.finished = Deferred()
+ self.strings = ["bif", "pow", "zot"]
+
+ def connectionMade(self):
+ log.debug('connection-made')
+ self._send_noise()
+
+ def _send_noise(self):
+ if self.strings:
+ self.transport.write(self.strings.pop(0) + "\n")
+ else:
+ self.transport.loseConnection()
+
+ def dataReceived(self, data):
+ log.debug('rx', data=data)
+ if self.finished is not None and not self.finished.called:
+ self.finished.callback(data)
+ self._send_noise()
+
+ def connectionLost(self, reason):
+ log.debug('connection-lost')
+ if not self.finished.called:
+ self.finished.callback(reason)
+
+ class PermissiveKnownHosts(KnownHostsFile):
+ def verifyHostKey(self, ui, hostname, ip, key):
+ log.debug('verifyHostKey')
+ return True
+
+ class FixedResponseUI(ConsoleUI):
+ def __init__(self, result):
+ super(RCmd.FixedResponseUI, self).__init__(lambda: _open("/dev/null",
+ "r+b",
+ buffering=0))
+ self.result = result
+
+ def prompt(self, _):
+ log.debug('prompt')
+ return succeed(True)
+
+ def warn(self, text):
+ log.debug('warn')
+
+ def _endpoint_for_command(self, command):
+ return SSHCommandClientEndpoint.newConnection(
+ self.reactor, command, self.username, self.host,
+ port=self.port,
+ password=self.password,
+ keys=self.keys,
+ agentEndpoint=self.agent,
+ knownHosts=self.knownHosts,
+ ui=self.ui
+ )
+
+ def execute(self):
+ endpoint = self._endpoint_for_command(self.command)
+ factory = Factory()
+ factory.protocol = RCmd.NoiseProtocol
+
+ d = endpoint.connect(factory)
+ d.addCallback(lambda proto: proto.finished)
+ return d
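
Illustrative usage (not part of this change): running a one-shot remote command over SSH with the helper above; host, credentials, and the command string are placeholders.

    from twisted.internet import reactor

    from adapters.adtran_common.net.rcmd import RCmd

    def done(result):
        print(result)          # first chunk of command output (or a Failure on error)
        reactor.stop()

    cmd = RCmd('10.0.0.1', 'admin', 'admin', 'cat /etc/os-release')
    cmd.execute().addBoth(done)
    reactor.run()
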
diff --git a/adapters/adtran_common/port.py b/adapters/adtran_common/port.py
new file mode 100644
index 0000000..0fc49dc
--- /dev/null
+++ b/adapters/adtran_common/port.py
@@ -0,0 +1,251 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import structlog
+from enum import Enum
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue, succeed
+
+from pyvoltha.protos.common_pb2 import OperStatus, AdminState
+
+
+class AdtnPort(object):
+ """
+    A class similar to the 'Port' class in VOLTHA
+ """
+ class State(Enum):
+ INITIAL = 0 # Created and initialization in progress
+ RUNNING = 1 # PON port contacted, ONU discovery active
+ STOPPED = 2 # Disabled
+ DELETING = 3 # Cleanup
+
+ def __init__(self, parent, **kwargs):
+ assert parent, 'parent is None'
+ assert 'port_no' in kwargs, 'Port number not found'
+
+ self.log = structlog.get_logger(device_id=parent.device_id)
+
+ self._parent = parent
+ self._port_no = kwargs.get('port_no')
+
+ # Set the following in your derived class. These names are used in
+ # various ways. Typically, the physical port name will be used during
+ # device handler conversations with the hardware (REST, NETCONF, ...)
+ # while the logical port name is what the outside world (ONOS, SEBA, ...)
+ # uses. All ports have a physical port name, but only ports exposed through
+ # VOLTHA as a logical port will have a logical port name
+
+ self._physical_port_name = None
+ self._logical_port_name = None
+ self._label = None
+ self._port = None
+
+ self.sync_tick = 20.0
+ self.sync_deferred = None # For sync of PON config to hardware
+
+        # TODO: Deprecate 'enabled' and use admin_state instead. May want the initial state to
+        #       always be disabled and then, in derived classes, set it in the 'reset' method called on startup.
+ self._enabled = True
+ self._admin_state = AdminState.ENABLED
+
+ self._oper_status = OperStatus.DISCOVERED
+ self._state = AdtnPort.State.INITIAL
+
+ self.deferred = None # General purpose
+
+ # Statistics
+ self.rx_packets = 0
+ self.rx_bytes = 0
+ self.tx_packets = 0
+ self.tx_bytes = 0
+ self.timestamp = 0 # UTC when KPI items last updated
+
+ def __del__(self):
+ self.stop()
+
+ def get_port(self):
+ """
+ Get the VOLTHA PORT object for this port
+ :return: VOLTHA Port object
+ """
+ raise NotImplementedError('Add to your derived class')
+
+ @property
+ def port_no(self):
+ return self._port_no
+
+ @property
+ def intf_id(self):
+ return self.port_no
+
+ @property
+ def physical_port_name(self):
+ return self._physical_port_name
+
+ @property
+ def logical_port_name(self):
+ return self._logical_port_name
+
+ @property # For backwards compatibility
+ def name(self):
+ return self._logical_port_name
+
+ @property
+ def state(self):
+ return self._state
+
+ @state.setter
+ def state(self, value):
+ self._state = value
+
+ @property
+ def olt(self):
+ return self._parent
+
+ @property
+ def admin_state(self):
+ return self._admin_state
+
+ @admin_state.setter
+ def admin_state(self, value):
+ if self._admin_state != value:
+ self._admin_state = value
+ if self._admin_state == AdminState.ENABLED:
+ self.start()
+ else:
+                self.stop()
+
+    @property
+ def enabled(self):
+ return self._admin_state == AdminState.ENABLED
+
+ @enabled.setter
+ def enabled(self, value):
+ assert isinstance(value, bool), 'enabled is a boolean'
+ self.admin_state = AdminState.ENABLED if value else AdminState.DISABLED
+
+ @property
+ def oper_status(self):
+ return self._oper_status
+
+ @property
+ def adapter_agent(self):
+ return self.olt.adapter_agent
+
+ def get_logical_port(self):
+ """
+ Get the VOLTHA logical port for this port. For PON ports, a logical port
+ is not currently created, so always return None
+
+ :return: VOLTHA logical port or None if not supported
+ """
+ return None
+
+ def cancel_deferred(self):
+ d1, self.deferred = self.deferred, None
+ d2, self.sync_deferred = self.sync_deferred, None
+
+ for d in [d1, d2]:
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except Exception:
+ pass
+
+ def _update_adapter_agent(self):
+ raise NotImplementedError('Add to your derived class')
+
+ def start(self):
+ """
+        Start/enable this PON and start ONU discovery
+ """
+ if self.state == AdtnPort.State.RUNNING:
+ return succeed('Running')
+
+ self.log.info('start-port')
+
+ self.cancel_deferred()
+ self.state = AdtnPort.State.INITIAL
+ self._oper_status = OperStatus.ACTIVATING
+ self._enabled = True
+
+ # Do the rest of the startup in an async method
+ self.deferred = reactor.callLater(0.5, self.finish_startup)
+ self._update_adapter_agent()
+
+ return succeed('Scheduled')
+
+ def finish_startup(self):
+ if self.state == AdtnPort.State.INITIAL:
+ self.log.debug('final-startup')
+
+ # If here, initial settings were successfully written to hardware
+
+ self._enabled = True
+ self._admin_state = AdminState.ENABLED
+ self._oper_status = OperStatus.ACTIVE # TODO: is this correct, how do we tell GRPC
+ self.state = AdtnPort.State.RUNNING
+
+ self.sync_deferred = reactor.callLater(self.sync_tick,
+ self.sync_hardware)
+ self._update_adapter_agent()
+
+ @inlineCallbacks
+ def stop(self):
+ if self.state == AdtnPort.State.STOPPED:
+ self.log.debug('already stopped')
+ returnValue('Stopped')
+
+ self.log.info('stopping')
+ try:
+ self.cancel_deferred()
+ self._enabled = False
+ self._admin_state = AdminState.DISABLED
+ self._oper_status = OperStatus.UNKNOWN
+ self._update_adapter_agent()
+
+ self.state = AdtnPort.State.STOPPED
+
+ self.deferred = self.finish_stop()
+ yield self.deferred
+
+ except Exception as e:
+ self.log.exception('stop-failed', e=e)
+
+ returnValue('Stopped')
+
+    def finish_stop(self):
+        # Override in your derived class if additional cleanup is needed.
+        # Must return a value or Deferred; the caller in stop() yields on the result.
+        return succeed(None)
+
+ def restart(self):
+ if self.state == AdtnPort.State.RUNNING or self.state == AdtnPort.State.STOPPED:
+ start_it = (self.state == AdtnPort.State.RUNNING)
+ self.state = AdtnPort.State.INITIAL
+ return self.start() if start_it else self.stop()
+ return succeed('nop')
+
+ def delete(self):
+ """
+ Parent device is being deleted. Do not change any config but
+ stop all polling
+ """
+ self.log.info('Deleting')
+ self.state = AdtnPort.State.DELETING
+ self.cancel_deferred()
+
+ def sync_hardware(self):
+ raise NotImplementedError('Add to your derived class')
+
+# TODO: Continue to consolidate port functionality
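
Illustrative usage (not part of this change): a toy subclass showing the hooks a concrete port is expected to fill in. The port names and no-op bodies are placeholders, not the real NNI/PON implementations.

    from adapters.adtran_common.port import AdtnPort

    class ExampleNniPort(AdtnPort):
        def __init__(self, parent, **kwargs):
            super(ExampleNniPort, self).__init__(parent, **kwargs)
            self._physical_port_name = 'hundred-gigabit-ethernet 0/1'   # hardware-facing name
            self._logical_port_name = 'nni-{}'.format(self.port_no)     # ONOS/SEBA-facing name

        def get_port(self):
            return self._port            # a real port would build a VOLTHA Port protobuf here

        def _update_adapter_agent(self):
            pass                         # a real port would push state changes up to the core here

        def sync_hardware(self):
            pass                         # periodic reconciliation with the device
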
diff --git a/adapters/adtran_common/xpon/__init__.py b/adapters/adtran_common/xpon/__init__.py
new file mode 100644
index 0000000..b0fb0b2
--- /dev/null
+++ b/adapters/adtran_common/xpon/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/adapters/adtran_common/xpon/best_effort.py b/adapters/adtran_common/xpon/best_effort.py
new file mode 100644
index 0000000..99622af
--- /dev/null
+++ b/adapters/adtran_common/xpon/best_effort.py
@@ -0,0 +1,47 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import structlog
+import json
+
+log = structlog.get_logger()
+
+
+class BestEffort(object):
+ def __init__(self, bandwidth, priority, weight):
+ self.bandwidth = bandwidth # bps
+        self.priority = priority       # 0..255
+ self.weight = weight # 0..100
+
+ def __str__(self):
+ return "BestEffort: {}/p-{}/w-{}".format(self.bandwidth,
+ self.priority,
+ self.weight)
+
+ def to_dict(self):
+ val = {
+ 'bandwidth': self.bandwidth,
+ 'priority': self.priority,
+ 'weight': self.weight
+ }
+ return val
+
+ def add_to_hardware(self, session, pon_id, onu_id, alloc_id, best_effort):
+ from ..adtran_olt_handler import AdtranOltHandler
+
+ uri = AdtranOltHandler.GPON_TCONT_CONFIG_URI.format(pon_id, onu_id, alloc_id)
+ data = json.dumps({'best-effort': best_effort.to_dict()})
+ name = 'tcont-best-effort-{}-{}: {}'.format(pon_id, onu_id, alloc_id)
+
+ return session.request('PATCH', uri, data=data, name=name)
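
Illustrative usage (not part of this change): the dict below is the shape PATCHed to the OLT's TCONT config URI; the numeric values are placeholders.

    from adapters.adtran_common.xpon.best_effort import BestEffort

    be = BestEffort(bandwidth=100000000, priority=2, weight=50)
    print(be)            # BestEffort: 100000000/p-2/w-50
    print(be.to_dict())  # {'bandwidth': 100000000, 'priority': 2, 'weight': 50}
    # be.add_to_hardware(session, pon_id, onu_id, alloc_id, be) issues the PATCH
    # through the device handler's REST session.
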
diff --git a/adapters/adtran_common/xpon/gem_port.py b/adapters/adtran_common/xpon/gem_port.py
new file mode 100644
index 0000000..14dccb1
--- /dev/null
+++ b/adapters/adtran_common/xpon/gem_port.py
@@ -0,0 +1,63 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class GemPort(object):
+ """
+ Class to wrap TCont capabilities
+ """
+ def __init__(self, gem_id, alloc_id, uni_id, tech_profile_id,
+ encryption=False,
+ multicast=False,
+ traffic_class=None,
+ handler=None,
+ is_mock=False):
+
+ self.gem_id = gem_id
+ self._alloc_id = alloc_id
+ self.uni_id = uni_id
+ self.tech_profile_id = tech_profile_id
+ self.traffic_class = traffic_class
+ self._encryption = encryption
+ self.multicast = multicast
+ self._handler = handler
+ self._is_mock = is_mock
+ self.tech_profile_id = None # TODO: Make property and clean up object once tech profiles fully supported
+
+ # Statistics
+ self.rx_packets = 0
+ self.rx_bytes = 0
+ self.tx_packets = 0
+ self.tx_bytes = 0
+
+ def __str__(self):
+ return "GemPort: alloc-id: {}, gem-id: {}, uni-id: {}".format(self.alloc_id,
+ self.gem_id,
+ self.uni_id)
+
+ @property
+ def alloc_id(self):
+ return self._alloc_id
+
+ @property
+ def encryption(self):
+ return self._encryption
+
+ def to_dict(self):
+ return {
+ 'port-id': self.gem_id,
+ 'alloc-id': self.alloc_id,
+ 'encryption': self._encryption,
+ 'omci-transport': False
+ }
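
Illustrative usage (not part of this change): the identifiers below are placeholders; to_dict() yields the payload fragment used when provisioning the GEM port on the OLT.

    from adapters.adtran_common.xpon.gem_port import GemPort

    gem = GemPort(gem_id=1024, alloc_id=1024, uni_id=0, tech_profile_id=64)
    print(gem)            # GemPort: alloc-id: 1024, gem-id: 1024, uni-id: 0
    print(gem.to_dict())  # {'port-id': 1024, 'alloc-id': 1024, 'encryption': False, 'omci-transport': False}
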
diff --git a/adapters/adtran_common/xpon/tcont.py b/adapters/adtran_common/xpon/tcont.py
new file mode 100644
index 0000000..79d94fa
--- /dev/null
+++ b/adapters/adtran_common/xpon/tcont.py
@@ -0,0 +1,29 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+class TCont(object):
+ """
+ Class to wrap TCont capabilities
+ """
+ def __init__(self, alloc_id, tech_profile_id, traffic_descriptor, uni_id, is_mock=False):
+ self.alloc_id = alloc_id
+ self.traffic_descriptor = traffic_descriptor
+ self._is_mock = is_mock
+ self.tech_profile_id = tech_profile_id
+ self.uni_id = uni_id
+
+ def __str__(self):
+ return "TCont: alloc-id: {}, uni-id: {}".format(self.alloc_id,
+ self.uni_id)
diff --git a/adapters/adtran_common/xpon/traffic_descriptor.py b/adapters/adtran_common/xpon/traffic_descriptor.py
new file mode 100644
index 0000000..230605b
--- /dev/null
+++ b/adapters/adtran_common/xpon/traffic_descriptor.py
@@ -0,0 +1,75 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from enum import Enum
+
+
+class TrafficDescriptor(object):
+ """
+ Class to wrap the uplink traffic descriptor.
+ """
+ class AdditionalBwEligibility(Enum):
+ NONE = 0
+ BEST_EFFORT_SHARING = 1
+ NON_ASSURED_SHARING = 2 # Should match xpon.py values
+ DEFAULT = NONE
+
+ @staticmethod
+ def to_string(value):
+ return {
+ TrafficDescriptor.AdditionalBwEligibility.NON_ASSURED_SHARING: "non-assured-sharing",
+ TrafficDescriptor.AdditionalBwEligibility.BEST_EFFORT_SHARING: "best-effort-sharing",
+ TrafficDescriptor.AdditionalBwEligibility.NONE: "none"
+ }.get(value, "unknown")
+
+ @staticmethod
+ def from_value(value):
+ """
+ Matches both Adtran and xPON values
+ :param value:
+ :return:
+ """
+ return {
+ 0: TrafficDescriptor.AdditionalBwEligibility.NONE,
+ 1: TrafficDescriptor.AdditionalBwEligibility.BEST_EFFORT_SHARING,
+ 2: TrafficDescriptor.AdditionalBwEligibility.NON_ASSURED_SHARING,
+ }.get(value, TrafficDescriptor.AdditionalBwEligibility.DEFAULT)
+
+ def __init__(self, fixed, assured, maximum,
+ additional=AdditionalBwEligibility.DEFAULT,
+ best_effort=None):
+ self.fixed_bandwidth = fixed # bps
+ self.assured_bandwidth = assured # bps
+ self.maximum_bandwidth = maximum # bps
+ self.additional_bandwidth_eligibility = additional
+ self.best_effort = best_effort\
+ if additional == TrafficDescriptor.AdditionalBwEligibility.BEST_EFFORT_SHARING\
+ else None
+
+ def __str__(self):
+ return "TrafficDescriptor: {}/{}/{}".format(self.fixed_bandwidth,
+ self.assured_bandwidth,
+ self.maximum_bandwidth)
+
+ def to_dict(self):
+ val = {
+ 'fixed-bandwidth': self.fixed_bandwidth,
+ 'assured-bandwidth': self.assured_bandwidth,
+ 'maximum-bandwidth': self.maximum_bandwidth,
+ 'additional-bandwidth-eligibility':
+ TrafficDescriptor.AdditionalBwEligibility.to_string(
+ self.additional_bandwidth_eligibility)
+ }
+ return val
+
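
Illustrative usage (not part of this change): combining the three xPON wrappers above; the alloc-id, tech profile id, and bandwidth figures are placeholders.

    from adapters.adtran_common.xpon.best_effort import BestEffort
    from adapters.adtran_common.xpon.tcont import TCont
    from adapters.adtran_common.xpon.traffic_descriptor import TrafficDescriptor

    td = TrafficDescriptor(
        fixed=0, assured=8000000, maximum=100000000,
        additional=TrafficDescriptor.AdditionalBwEligibility.BEST_EFFORT_SHARING,
        best_effort=BestEffort(bandwidth=100000000, priority=0, weight=10))

    tcont = TCont(alloc_id=1024, tech_profile_id=64, traffic_descriptor=td, uni_id=0)
    print(tcont)          # TCont: alloc-id: 1024, uni-id: 0
    print(td.to_dict())   # RESTCONF-ready dict, including 'additional-bandwidth-eligibility'
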