VOL-653: Adtran OLT Device Adapter native packet In/Out support
Change-Id: Ia22c13c555e6d79fa9af5ad45459422531eb37dc
diff --git a/voltha/adapters/adtran_olt/README.md b/voltha/adapters/adtran_olt/README.md
index 891df93..1ba04f7 100644
--- a/voltha/adapters/adtran_olt/README.md
+++ b/voltha/adapters/adtran_olt/README.md
@@ -13,9 +13,9 @@
| -P | --rc_password | '' | REST Password |
| -T | --rc_port | 8081 | REST TCP Port |
| -z | --zmq_port | 5656 | ZeroMQ OMCI Proxy Port |
-| -a | --autoactivate | False | Autoactivate ONUs, xPON othewise |
-| -M | --multicast_vlan | 4092 | Multicast VLANs (comma-delimeted) |
-| -V | --packet_in_vlan | 4000 | OpenFlow Packet-In/Out VLAN |
+| -M | --multicast_vlan | 4000 | Multicast VLANs (comma-delimeted) |
+| -V | --packet_in_vlan | 4000 | OpenFlow Packet-In/Out VLAN, Zero to disable |
+| -v | --untagged_vlan | 4092 | VLAN wrapper for untagged ONU frames |
For example, if your Adtran OLT is address 10.17.174.193 with the default TCP ports and
NETCONF credentials of admin/admin and REST credentials of ADMIN/ADMIN, the command line
diff --git a/voltha/adapters/adtran_olt/adtran_device_handler.py b/voltha/adapters/adtran_olt/adtran_device_handler.py
index a373670..f015856 100644
--- a/voltha/adapters/adtran_olt/adtran_device_handler.py
+++ b/voltha/adapters/adtran_olt/adtran_device_handler.py
@@ -37,7 +37,7 @@
OFPC_GROUP_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS
from voltha.registry import registry
from alarms.adapter_alarms import AdapterAlarms
-from common.frameio.frameio import BpfProgramFilter, hexify
+from common.frameio.frameio import BpfProgramFilter
from pki.olt_pm_metrics import OltPmMetrics
from common.utils.asleep import asleep
from scapy.layers.l2 import Ether, Dot1Q
@@ -46,8 +46,11 @@
_ = third_party
DEFAULT_PACKET_IN_VLAN = 4000
-DEFAULT_MULTICAST_VLAN = 4050
-_MANAGEMENT_VLAN = 4093
+DEFAULT_POC_3_MULTICAST_VLAN = 4050
+DEFAULT_MULTICAST_VLAN = 4000
+DEFAULT_UTILITY_VLAN = 4094
+DEFAULT_UNTAGGED_VLAN = DEFAULT_UTILITY_VLAN # if RG does not send priority tagged frames
+#DEFAULT_UNTAGGED_VLAN = 4092
_DEFAULT_RESTCONF_USERNAME = ""
_DEFAULT_RESTCONF_PASSWORD = ""
@@ -57,12 +60,13 @@
_DEFAULT_NETCONF_PASSWORD = ""
_DEFAULT_NETCONF_PORT = 830
+FIXED_ONU = True # Enhanced ONU support
+
class AdtranDeviceHandler(object):
"""
A device that supports the ADTRAN RESTCONF protocol for communications
with a VOLTHA/VANILLA managed device.
-
Port numbering guidelines for Adtran OLT devices. Derived classes may augment
the numbering scheme below as needed.
@@ -91,7 +95,8 @@
RESTART_RPC = '<system-restart xmlns="urn:ietf:params:xml:ns:yang:ietf-system"/>'
def __init__(self, **kwargs):
- from net.adtran_zmq import DEFAULT_PON_AGENT_TCP_PORT, DEFAULT_PIO_TCP_PORT
+ from net.pio_zmq import DEFAULT_PIO_TCP_PORT
+ from net.pon_zmq import DEFAULT_PON_AGENT_TCP_PORT
super(AdtranDeviceHandler, self).__init__()
@@ -112,7 +117,12 @@
self.alarms = None
self.packet_in_vlan = DEFAULT_PACKET_IN_VLAN
self.multicast_vlans = [DEFAULT_MULTICAST_VLAN]
+ self.untagged_vlan = DEFAULT_UNTAGGED_VLAN
+ self.utility_vlan = DEFAULT_UTILITY_VLAN
self.default_mac_addr = '00:13:95:00:00:00'
+ self._is_inband_frame = None # TODO: Deprecate after PIO available
+ self.exception_gems = FIXED_ONU
+ self._rest_support = None
# Northbound and Southbound ports
self.northbound_ports = {} # port number -> Port
@@ -139,16 +149,6 @@
self.netconf_password = _DEFAULT_NETCONF_PASSWORD
self._netconf_client = None
- # If Auto-activate is true, all PON ports (up to a limit below) will be auto-enabled
- # and any ONU's discovered will be auto-activated.
- #
- # If it is set to False, then the xPON API/CLI should be used to enable any PON
- # ports. Before enabling a PON, set it's polling interval. If the polling interval
- # is 0, then manual ONU discovery is in effect. If >0, then every 'polling' seconds
- # autodiscover is requested. Any discovered ONUs will need to have their serial-numbers
- # registered (via xPON API/CLI) before they are activated.
-
- self._autoactivate = False
self.max_nni_ports = 1 # TODO: This is a VOLTHA imposed limit in 'flow_decomposer.py
# and logical_device_agent.py
# OMCI ZMQ Channel
@@ -164,9 +164,8 @@
self.heartbeat = None
self.heartbeat_last_reason = ''
- # Virtualized OLT Support & async command support
+ # Virtualized OLT Support
self.is_virtual_olt = False
- self.is_async_control = False
# Installed flows
self._evcs = {} # Flow ID/name -> FlowEntry
@@ -219,7 +218,8 @@
del self._evcs[evc.name]
def parse_provisioning_options(self, device):
- from net.adtran_zmq import DEFAULT_PON_AGENT_TCP_PORT
+ from net.pon_zmq import DEFAULT_PON_AGENT_TCP_PORT
+ from net.pio_zmq import DEFAULT_PIO_TCP_PORT
if not device.ipv4_address:
self.activate_failed(device, 'No ip_address field provided')
@@ -255,23 +255,31 @@
parser.add_argument('--rc_port', '-T', action='store', default=_DEFAULT_RESTCONF_PORT, type=check_tcp_port,
help='RESTCONF TCP Port')
parser.add_argument('--zmq_port', '-z', action='store', default=DEFAULT_PON_AGENT_TCP_PORT,
- type=check_tcp_port, help='ZeroMQ Port')
- parser.add_argument('--autoactivate', '-a', action='store_true', default=False,
- help='Autoactivate / Demo mode')
+ type=check_tcp_port, help='PON Agent ZeroMQ Port')
+ parser.add_argument('--pio_port', '-Z', action='store', default=DEFAULT_PIO_TCP_PORT,
+ type=check_tcp_port, help='PIO Service ZeroMQ Port')
parser.add_argument('--multicast_vlan', '-M', action='store',
default='{}'.format(DEFAULT_MULTICAST_VLAN),
- help='Multicast VLAN')
- parser.add_argument('--packet_in_vlan', '-V', action='store', default=DEFAULT_PACKET_IN_VLAN,
- type=check_vid, help='OpenFlow Packet-In/Out VLAN')
-
+ help='Multicast VLAN'),
+ parser.add_argument('--untagged_vlan', '-v', action='store',
+ default='{}'.format(DEFAULT_UNTAGGED_VLAN),
+ help='VLAN for Untagged Frames from ONUs'),
+ parser.add_argument('--utility_vlan', '-B', action='store',
+ default='{}'.format(DEFAULT_UTILITY_VLAN),
+ help='VLAN for Untagged Frames from ONUs'),
+ parser.add_argument('--no_exception_gems', '-X', action='store_true', default=not FIXED_ONU,
+ help='Native OpenFlow Packet-In/Out support')
try:
args = parser.parse_args(shlex.split(device.extra_args))
- self.packet_in_vlan = args.packet_in_vlan
- self._is_inband_frame = BpfProgramFilter('(ether[14:2] & 0xfff) = 0x{:03x}'.
- format(self.packet_in_vlan))
- # May have multiple multicast VLANs
- self.multicast_vlans = [int(vid.strip()) for vid in args.multicast_vlan.split(',')]
+ self.exception_gems = not args.no_exception_gems
+ if self.exception_gems:
+ self._is_inband_frame = BpfProgramFilter('(ether[14:2] & 0xfff) = 0x{:03x}'.
+ format(self.packet_in_vlan))
+ self.multicast_vlans = [DEFAULT_POC_3_MULTICAST_VLAN]
+ else:
+ # May have multiple multicast VLANs
+ self.multicast_vlans = [int(vid.strip()) for vid in args.multicast_vlan.split(',')]
self.netconf_username = args.nc_username
self.netconf_password = args.nc_password
@@ -282,8 +290,7 @@
self.rest_port = args.rc_port
self.pon_agent_port = args.zmq_port
-
- self._autoactivate = args.autoactivate
+ self.pio_port = args.pio_port
if not self.rest_username:
self.rest_username = 'NDE0NDRkNDk0ZQ==\n'.\
@@ -305,17 +312,6 @@
except Exception as e:
self.log.exception('option_parsing_error: {}'.format(e.message))
- @property
- def autoactivate(self):
- """
- Flag indicating if auto-discover/enable of PON ports is enabled as
- well as ONU auto activation. useful for demos
-
- If autoactivate is enabled, the default startup state (first time) for a PON port is disabled
- If autoactivate is disabled, the default startup state for a PON port is enabled
- """
- return self._autoactivate
-
@inlineCallbacks
def activate(self, device, done_deferred=None, reconciling=False):
"""
@@ -340,15 +336,13 @@
try:
self.startup = self.make_restconf_connection()
results = yield self.startup
+ self._rest_support = results
self.log.debug('HELLO_Contents: {}'.format(pprint.PrettyPrinter().pformat(results)))
# See if this is a virtualized OLT. If so, no NETCONF support available
-
- if 'module-info' in results:
- self.is_virtual_olt = any(mod.get('module-name', None) == 'adtran-ont-mock'
- for mod in results['module-info'])
- self.is_async_control = any(mod.get('module-name', None) == 'adtran-olt-pon-control'
- for mod in results['module-info'])
+ self.is_virtual_olt = 'module-info' in results and\
+ any(mod.get('module-name', None) == 'adtran-ont-mock'
+ for mod in results['module-info'])
except Exception as e:
self.log.exception('Initial_RESTCONF_hello_failed', e=e)
@@ -380,26 +374,8 @@
device.hardware_version = results.get('hardware_version', 'unknown')
device.firmware_version = results.get('firmware_version', 'unknown')
device.serial_number = results.get('serial_number', 'unknown')
+ device.images.image.extend(results.get('software-images', []))
- def get_software_images():
- leafs = ['running-revision', 'candidate-revision', 'startup-revision']
- image_names = list(set([results.get(img, 'unknown') for img in leafs]))
-
- images = []
- image_count = 1
- for name in image_names:
- # TODO: Look into how to find out hash, is_valid, and install date/time
- image = Image(name='Candidate_{}'.format(image_count),
- version=name,
- is_active=(name == results.get('running-revision', 'xxx')),
- is_committed=True,
- is_valid=True,
- install_datetime='Not Available')
- image_count += 1
- images.append(image)
- return images
-
- device.images.image.extend(get_software_images())
device.root = True
device.vendor = results.get('vendor', 'Adtran, Inc.')
device.connect_status = ConnectStatus.REACHABLE
@@ -694,7 +670,7 @@
self.log.exception('port-reset', e=e)
self.activate_failed(device, e.message)
- # Clean up all EVC and EVC maps (exceptions are ok)
+ # Clean up all EVCs, EVC maps and ACLs (exceptions are ok)
try:
from flow.evc import EVC
self.startup = yield EVC.remove_all(self.netconf_client)
@@ -709,6 +685,13 @@
except Exception as e:
self.log.exception('evc-map-cleanup', e=e)
+ try:
+ from flow.acl import ACL
+ self.startup = yield ACL.remove_all(self.netconf_client)
+
+ except Exception as e:
+ self.log.exception('acl-cleanup', e=e)
+
# Start/stop the interfaces as needed. These are deferred calls
dl = []
@@ -824,6 +807,7 @@
@inlineCallbacks
def complete_device_specific_activation(self, _device, _reconciling):
+ # NOTE: Override this in your derived class for any device startup completion
return defer.succeed('NOP')
def disable(self):
@@ -1118,7 +1102,7 @@
# Pause additional 5 seconds to let allow OLT microservices to complete some more initialization
yield asleep(5)
-
+ # TODO: Update device info. The software images may have changed...
# Get the latest device reference
device = self.adapter_agent.get_device(self.device_id)
@@ -1232,10 +1216,12 @@
self.log.info('deleted', device_id=self.device_id)
def _activate_io_port(self):
- if self.io_port is None:
+ if self.packet_in_vlan != 0 and self._is_inband_frame is not None and self.io_port is None:
self.log.info('registering-frameio')
self.io_port = registry('frameio').open_port(
self.interface, self._rcv_io, self._is_inband_frame)
+ else:
+ self.io_port = None
def _deactivate_io_port(self):
io, self.io_port = self.io_port, None
@@ -1269,30 +1255,7 @@
self.alarms.send_alarm(self, raw_data)
def packet_out(self, egress_port, msg):
- if self.io_port is not None:
- self.log.debug('sending-packet-out', egress_port=egress_port,
- msg=hexify(msg))
- pkt = Ether(msg)
-
- #ADTRAN To remove any extra tags
- while ( pkt.type == 0x8100 ):
- msg_hex=hexify(msg)
- msg_hex=msg_hex[:24]+msg_hex[32:]
- bytes = []
- msg_hex = ''.join( msg_hex.split(" ") )
- for i in range(0, len(msg_hex), 2):
- bytes.append( chr( int (msg_hex[i:i+2], 16 ) ) )
- msg = ''.join( bytes )
- pkt = Ether(msg)
- #END
-
- out_pkt = (
- Ether(src=pkt.src, dst=pkt.dst) /
- Dot1Q(vlan=self.packet_in_vlan) /
- Dot1Q(vlan=egress_port, type=pkt.type) /
- pkt.payload
- )
- self.io_port.send(str(out_pkt))
+ raise NotImplementedError('Overload in a derived class')
def update_pm_config(self, device, pm_config):
# TODO: This has not been tested
diff --git a/voltha/adapters/adtran_olt/adtran_olt.py b/voltha/adapters/adtran_olt/adtran_olt.py
index 1709871..30f6186 100644
--- a/voltha/adapters/adtran_olt/adtran_olt.py
+++ b/voltha/adapters/adtran_olt/adtran_olt.py
@@ -51,7 +51,7 @@
self.descriptor = Adapter(
id=self.name,
vendor='Adtran, Inc.',
- version='0.13',
+ version='0.14',
config=AdapterConfig(log_level=LogLevel.INFO)
)
log.debug('adtran_olt.__init__', adapter_agent=adapter_agent)
diff --git a/voltha/adapters/adtran_olt/adtran_olt_handler.py b/voltha/adapters/adtran_olt/adtran_olt_handler.py
index 47fcea8..bd81b47 100644
--- a/voltha/adapters/adtran_olt/adtran_olt_handler.py
+++ b/voltha/adapters/adtran_olt/adtran_olt_handler.py
@@ -25,12 +25,13 @@
from xpon.adtran_olt_xpon import AdtranOltXPON
from codec.olt_state import OltState
from flow.flow_entry import FlowEntry
-from net.adtran_zmq import AdtranZmqClient
+from net.pio_zmq import PioClient
+from net.pon_zmq import PonClient
+from voltha.core.flow_decomposer import *
from voltha.extensions.omci.omci import *
from voltha.protos.common_pb2 import AdminState, OperStatus
-from voltha.protos.device_pb2 import ImageDownload
+from voltha.protos.device_pb2 import ImageDownload, Image
-FIXED_ONU = True # Enhanced ONU support
ATT_NETWORK = True # Use AT&T cVlan scheme
@@ -71,9 +72,10 @@
self.status_poll = None
self.status_poll_interval = 5.0
self.status_poll_skew = self.status_poll_interval / 10
- self.pon_agent = None
- self.pio_agent = None
- self.ssh_deferred = None
+ self._pon_agent = None
+ self._pio_agent = None
+ self._is_async_control = False
+ self._ssh_deferred = None
self._system_id = None
self._download_protocols = None
self._download_deferred = None
@@ -99,7 +101,7 @@
def _cancel_deferred(self):
d1, self.status_poll = self.status_poll, None
- d2, self.ssh_deferred = self.ssh_deferred, None
+ d2, self._ssh_deferred = self._ssh_deferred, None
d3, self._download_deferred = self._download_deferred, None
for d in [d1, d2, d3]:
@@ -142,13 +144,14 @@
# TODO: After a CLI 'reboot' command, the device info may get messed up (prints labels and not values) Enter device and type 'show'
device = {
'model': 'n/a',
- 'hardware_version': 'n/a',
- 'serial_number': 'n/a',
+ 'hardware_version': 'unknown',
+ 'serial_number': 'unknown',
'vendor': 'Adtran, Inc.',
- 'firmware_version': 'n/a',
- 'running-revision': 'n/a',
- 'candidate-revision': 'n/a',
- 'startup-revision': 'n/a',
+ 'firmware_version': 'unknown',
+ 'running-revision': 'unknown',
+ 'candidate-revision': 'unknown',
+ 'startup-revision': 'unknown',
+ 'software-images': []
}
if self.is_virtual_olt:
returnValue(device)
@@ -173,17 +176,35 @@
'n/a')).translate(None, '?')
device['serial_number'] = str(module.get('serial-number',
'n/a')).translate(None, '?')
- device['firmware_version'] = str(device.get('firmware-revision',
- 'unknown')).translate(None, '?')
if 'software' in module:
if 'software' in module['software']:
software = module['software']['software']
- device['running-revision'] = str(software.get('running-revision',
- 'n/a')).translate(None, '?')
- device['candidate-revision'] = str(software.get('candidate-revision',
- 'n/a')).translate(None, '?')
- device['startup-revision'] = str(software.get('startup-revision',
- 'n/a')).translate(None, '?')
+ if isinstance(software, dict):
+ device['running-revision'] = str(software.get('running-revision',
+ 'n/a')).translate(None, '?')
+ device['candidate-revision'] = str(software.get('candidate-revision',
+ 'n/a')).translate(None, '?')
+ device['startup-revision'] = str(software.get('startup-revision',
+ 'n/a')).translate(None, '?')
+ elif isinstance(software, list):
+ for sw_item in software:
+ sw_type = sw_item.get('name', '').lower()
+ if sw_type == 'firmware':
+ device['firmware_version'] = str(sw_item.get('running-revision',
+ 'unknown')).translate(None, '?')
+ elif sw_type == 'software':
+ for rev_type in ['startup-revision',
+ 'running-revision',
+ 'candidate-revision']:
+ if rev_type in sw_item:
+ image = Image(name=rev_type,
+ version=sw_item[rev_type],
+ is_active=(rev_type == 'running-revision'),
+ is_committed=True,
+ is_valid=True,
+ install_datetime='Not Available')
+ device['software-images'].append(image)
+
except Exception as e:
self.log.exception('get-pe-state', e=e)
@@ -223,8 +244,8 @@
self.startup = ietf_interfaces.get_state()
results = yield self.startup
- ports = ietf_interfaces.get_nni_port_entries(results)
- yield returnValue(ports)
+ ports = ietf_interfaces.get_port_entries(results, 'ethernet')
+ returnValue(ports)
except Exception as e:
log.exception('enumerate_northbound_ports', e=e)
@@ -242,36 +263,88 @@
"""
from nni_port import NniPort, MockNniPort
- for port in results:
- port_no = port['port_no']
- self.log.info('processing-nni', port_no=port_no, name=port['port_no'])
+ for port in results.itervalues():
+ port_no = port.get('port_no')
assert port_no, 'Port number not found'
- assert port_no not in self.northbound_ports, 'Port number is not a northbound port'
+ assert port_no not in self.northbound_ports, \
+ 'Port number {} already in northbound ports'.format(port_no)
+
+ self.log.info('processing-nni', port_no=port_no, name=port['port_no'])
self.northbound_ports[port_no] = NniPort(self, **port) if not self.is_virtual_olt \
else MockNniPort(self, **port)
- # TODO: For now, limit number of NNI ports to make debugging easier
- if len(self.northbound_ports) >= self.max_nni_ports:
+ if len(self.northbound_ports) >= self.max_nni_ports: # TODO: For now, limit number of NNI ports to make debugging easier
break
self.num_northbound_ports = len(self.northbound_ports)
+ def _olt_version(self):
+ # Version
+ # 0 Unknown
+ # 1 V1 OMCI format
+ # 2 V2 OMCI format
+ # 3 2018-01-11 or later
+ version = 0
+ info = self._rest_support.get('module-info', [dict()])
+ hw_mod_ver_str = next((mod.get('revision') for mod in info
+ if mod.get('module-name', '').lower() == 'gpon-olt-hw'), None)
+
+ if hw_mod_ver_str is not None:
+ try:
+ from datetime import datetime
+ hw_mod_dt = datetime.strptime(hw_mod_ver_str, '%Y-%m-%d')
+ version = 2 if hw_mod_dt >= datetime(2017, 9, 21) else 2
+
+ except Exception as e:
+ self.log.exception('ver-str-check', e=e)
+
+ return version
+
@inlineCallbacks
def enumerate_southbound_ports(self, device):
"""
Enumerate all southbound ports of this device.
:param device: A voltha.Device object, with possible device-type
- specific extensions.
+ specific extensions.
:return: (Deferred or None).
"""
###############################################################################
# Determine number of southbound ports. We know it is 16, but this keeps this
# device adapter generic for our other OLTs up to this point.
- self.startup = self.rest_client.request('GET', self.GPON_PON_CONFIG_LIST_URI, 'pon-config')
- results = yield self.startup
- returnValue(results)
+ self.startup = self.rest_client.request('GET', self.GPON_PON_CONFIG_LIST_URI,
+ 'pon-config')
+ try:
+ results = yield self.startup
+
+ from codec.ietf_interfaces import IetfInterfacesState
+ from nni_port import MockNniPort
+
+ ietf_interfaces = IetfInterfacesState(self.netconf_client)
+
+ if self.is_virtual_olt:
+ nc_results = MockNniPort.get_pon_port_state_results()
+ else:
+ self.startup = ietf_interfaces.get_state()
+ nc_results = yield self.startup
+
+ ports = ietf_interfaces.get_port_entries(nc_results, 'xpon')
+ if len(ports) == 0:
+ ports = ietf_interfaces.get_port_entries(nc_results,
+ 'channel-termination')
+ for data in results:
+ pon_id = data['pon-id']
+ port = ports[pon_id + 1]
+ port['pon-id'] = pon_id
+ port['admin_state'] = AdminState.ENABLED if data.get('enabled', False)\
+ else AdminState.DISABLED
+
+ except Exception as e:
+ log.exception('enumerate_southbound_ports', e=e)
+ raise
+
+ returnValue(ports)
def process_southbound_ports(self, device, results):
"""
@@ -285,19 +358,17 @@
"""
from pon_port import PonPort
- for pon in results:
- # Number PON Ports after the NNI ports
- pon_id = pon['pon-id']
- log.info('processing-pon-port', pon_id=pon_id)
- assert pon_id not in self.southbound_ports,\
- 'Pon ID not found in southbound ports'
+ for pon in results.itervalues():
+ pon_id = pon.get('pon-id')
+ assert pon_id is not None, 'PON ID not found'
+ assert pon_id not in self.southbound_ports, \
+ 'PON ID {} already in southbound ports'.format(pon_id)
+ if pon['ifIndex'] is None:
+ pon['port_no'] = self._pon_id_to_port_number(pon_id)
+ else:
+ pass # Need to adjust ONU numbering !!!!
- self.southbound_ports[pon_id] = PonPort(pon_id,
- self._pon_id_to_port_number(pon_id),
- self)
- if self.autoactivate:
- self.southbound_ports[pon_id].downstream_fec_enable = True
- self.southbound_ports[pon_id].upstream_fec_enable = True
+ self.southbound_ports[pon_id] = PonPort(self, **pon)
self.num_southbound_ports = len(self.southbound_ports)
@@ -318,11 +389,10 @@
:param reconciling: (boolean) True if taking over for another VOLTHA
"""
# Make sure configured for ZMQ remote access
- self._ready_zmq()
+ self._ready_network_access()
# ZeroMQ clients
- self.pon_agent = AdtranZmqClient(self.ip_address, port=self.pon_agent_port, rx_callback=self.rx_pa_packet)
- self.pio_agent = AdtranZmqClient(self.ip_address, port=self.pio_port, rx_callback=self.rx_pio_packet)
+ self._zmq_startup()
# Download support
self._download_deferred = reactor.callLater(0, self._get_download_protocols)
@@ -336,7 +406,7 @@
def on_heatbeat_alarm(self, active):
if not active:
- self._ready_zmq()
+ self._ready_network_access()
@inlineCallbacks
def _get_download_protocols(self):
@@ -363,8 +433,11 @@
self._download_deferred = reactor.callLater(10, self._get_download_protocols)
@inlineCallbacks
- def _ready_zmq(self):
+ def _ready_network_access(self):
from net.rcmd import RCmd
+ # Software version
+ self._is_async_control = self._olt_version() >= 2
+
# Check for port status
command = 'netstat -pan | grep -i 0.0.0.0:{} | wc -l'.format(self.pon_agent_port)
rcmd = RCmd(self.ip_address, self.netconf_username, self.netconf_password, command)
@@ -380,48 +453,90 @@
create_it = True
if create_it:
- next_run = 15
- command = 'mkdir -p /etc/pon_agent; touch /etc/pon_agent/debug.conf; '
- command += 'ps -ae | grep -i ngpon2_agent; '
- command += 'service_supervisor stop ngpon2_agent; service_supervisor start ngpon2_agent; '
- command += 'ps -ae | grep -i ngpon2_agent'
+ def v1_method():
+ command = 'mkdir -p /etc/pon_agent; touch /etc/pon_agent/debug.conf; '
+ command += 'ps -ae | grep -i ngpon2_agent; '
+ command += 'service_supervisor stop ngpon2_agent; service_supervisor start ngpon2_agent; '
+ command += 'ps -ae | grep -i ngpon2_agent'
- rcmd = RCmd(self.ip_address, self.netconf_username, self.netconf_password, command)
-
- try:
self.log.debug('create-request', command=command)
- results = yield rcmd.execute()
- self.log.info('create-results', results=results, result_type=type(results))
+ return RCmd(self.ip_address, self.netconf_username, self.netconf_password, command)
- except Exception as e:
- self.log.exception('mkdir', e=e)
+ def v2_v3_method():
+ # Old V2 method
+ command = "sed --in-place=voltha-sav 's/^#export ZMQ_LISTEN/export ZMQ_LISTEN/' " \
+ "/etc/ngpon2_agent/ngpon2_agent_feature_flags; "
+
+ # V3 unifies listening port, compatible with v2
+ # command = "sed --in-place '/add feature flags/aexport ZMQ_LISTEN_ON_ANY_ADDRESS=1' " \
+ # "/etc/ngpon2_agent/ngpon2_agent_feature_flags; "
+ # command += "sed --in-place '/^export ZMQ_LISTEN/aAGENT_LISTEN_ON_ANY_ADDRESS=1' " \
+ # "/etc/ngpon2_agent/ngpon2_agent_feature_flags; "
+
+ command += 'ps -ae | grep -i ngpon2_agent; '
+ command += 'service_supervisor stop ngpon2_agent; service_supervisor start ngpon2_agent; '
+ command += 'ps -ae | grep -i ngpon2_agent'
+
+ self.log.debug('create-request', command=command)
+ return RCmd(self.ip_address, self.netconf_username, self.netconf_password, command)
+
+ # Look for version
+ next_run = 15
+ version = v2_v3_method if self._olt_version() > 1 else v1_method
+
+ if version is not None:
+ try:
+ rcmd = version()
+ results = yield rcmd.execute()
+ self.log.info('create-results', results=results, result_type=type(results))
+
+ except Exception as e:
+ self.log.exception('mkdir-and-restart', e=e)
else:
next_run = 0
if next_run > 0:
- self.ssh_deferred = reactor.callLater(next_run, self._ready_zmq)
+ self._ssh_deferred = reactor.callLater(next_run, self._ready_network_access)
+
+ def _zmq_startup(self):
+ # ZeroMQ clients
+ self._pon_agent = PonClient(self.ip_address,
+ port=self.pon_agent_port,
+ rx_callback=self.rx_pa_packet)
+
+ try:
+ self._pio_agent = PioClient(self.ip_address,
+ port=self.pio_port,
+ rx_callback=self.rx_pio_packet)
+ except Exception as e:
+ self._pio_agent = None
+ self.log.exception('pio-agent', e=e)
+
+ def _zmq_shutdown(self):
+ pon, self._pon_agent = self._pon_agent, None
+ pio, self._pio_agent = self._pio_agent, None
+
+ for c in [pon, pio]:
+ if c is not None:
+ try:
+ c.shutdown()
+ except:
+ pass
def disable(self):
self._cancel_deferred()
# Drop registration for adapter messages
self.adapter_agent.unregister_for_inter_adapter_messages()
-
- c, self.pon_agent = self.pon_agent, None
- if c is not None:
- try:
- c.shutdown()
- except:
- pass
+ self._zmq_shutdown()
super(AdtranOltHandler, self).disable()
def reenable(self, done_deferred=None):
super(AdtranOltHandler, self).reenable(done_deferred=done_deferred)
- self._ready_zmq()
- self.pon_agent = AdtranZmqClient(self.ip_address, port=self.pon_agent_port, rx_callback=self.rx_pa_packet)
- self.pio_agent = AdtranZmqClient(self.ip_address, port=self.pio_port, rx_callback=self.rx_pio_packet)
+ self._ready_network_access()
+ self._zmq_startup()
# Register for adapter messages
self.adapter_agent.register_for_inter_adapter_messages()
@@ -431,12 +546,9 @@
def reboot(self):
self._cancel_deferred()
- c, self.pon_agent = self.pon_agent, None
- if c is not None:
- c.shutdown()
-
# Drop registration for adapter messages
self.adapter_agent.unregister_for_inter_adapter_messages()
+ self._zmq_shutdown()
# Download supported protocols may change (if new image gets activated)
self._download_protocols = None
@@ -446,16 +558,14 @@
def _finish_reboot(self, timeout, previous_oper_status, previous_conn_status):
super(AdtranOltHandler, self)._finish_reboot(timeout, previous_oper_status, previous_conn_status)
- self._ready_zmq()
+ self._ready_network_access()
# Download support
self._download_deferred = reactor.callLater(0, self._get_download_protocols)
# Register for adapter messages
self.adapter_agent.register_for_inter_adapter_messages()
-
- self.pon_agent = AdtranZmqClient(self.ip_address, port=self.pon_agent_port, rx_callback=self.rx_pa_packet)
- self.pio_agent = AdtranZmqClient(self.ip_address, port=self.pio_port, rx_callback=self.rx_pio_packet)
+ self._zmq_startup()
self.status_poll = reactor.callLater(5, self.poll_for_status)
@@ -464,35 +574,44 @@
# Drop registration for adapter messages
self.adapter_agent.unregister_for_inter_adapter_messages()
-
- c, self.pon_agent = self.pon_agent, None
- if c is not None:
- c.shutdown()
+ self._zmq_shutdown()
super(AdtranOltHandler, self).delete()
def rx_pa_packet(self, packets):
self.log.debug('rx-pon-agent-packet')
- for packet in packets:
- try:
- pon_id, onu_id, msg_bytes, is_omci = \
- AdtranZmqClient.decode_pon_agent_packet(packet,
- self.is_async_control)
- if is_omci:
- proxy_address = self._pon_onu_id_to_proxy_address(pon_id, onu_id)
+ if self._pon_agent is not None:
+ for packet in packets:
+ try:
+ pon_id, onu_id, msg_bytes, is_omci = \
+ self._pon_agent.decode_packet(packet, self._is_async_control)
+ if is_omci:
+ proxy_address = self._pon_onu_id_to_proxy_address(pon_id, onu_id)
- if proxy_address is not None:
- self.adapter_agent.receive_proxied_message(proxy_address, msg_bytes)
+ if proxy_address is not None:
+ self.adapter_agent.receive_proxied_message(proxy_address, msg_bytes)
- except Exception as e:
- self.log.exception('rx-pon-agent-packet', e=e)
+ except Exception as e:
+ self.log.exception('rx-pon-agent-packet', e=e)
def _compute_logical_port_no(self, port_no, evc_map, packet):
logical_port_no = None
+ # Upstream direction?
if self.is_pon_port(port_no):
- pon = self.get_southbound_port(port_no)
+ #TODO: Validate the evc-map name
+ from flow.evc_map import EVCMap
+ map_info = EVCMap.decode_evc_map_name(evc_map)
+ logical_port_no = int(map_info.get('ingress-port'))
+
+ if logical_port_no is None:
+ # Get PON
+ pon = self.get_southbound_port(port_no)
+
+ # Examine Packet and decode gvid
+ if packet is not None:
+ pass
elif self.is_nni_port(port_no):
nni = self.get_northbound_port(port_no)
@@ -506,19 +625,127 @@
self.log.debug('rx-packet-in', type=type(packets), data=packets)
assert isinstance(packets, list), 'Expected a list of packets'
- if self.logical_device_id is not None:
+ # TODO self._pio_agent.socket.socket.closed might be a good check here as well
+ if self.logical_device_id is not None and self._pio_agent is not None:
for packet in packets:
- try:
- port_no, evc_map, packet = AdtranZmqClient.decode_packet_in_message(packet)
- # packet.show()
+ url_type = self._pio_agent.get_url_type(packet)
+ if url_type == PioClient.UrlType.EVCMAPS_RESPONSE:
+ exception_map = self._pio_agent.decode_query_response_packet(packet)
+ self.log.debug('rx-pio-packet', exception_map=exception_map)
- logical_port_no = self._compute_logical_port_no(port_no, evc_map, packet)
+ elif url_type == PioClient.UrlType.PACKET_IN:
+ try:
+ from scapy.layers.l2 import Ether, Dot1Q
+ ifindex, evc_map, packet = self._pio_agent.decode_packet(packet)
- self.adapter_agent.send_packet_in(logical_device_id=self.logical_device_id,
- logical_port_no=logical_port_no,
- packet=str(packet))
- except Exception as e:
- self.log.exception('rx_pio_packet', e=e)
+ # convert ifindex to physical port number (HACK)
+ port_no = (ifindex - 60000) + 4
+
+ logical_port_no = self._compute_logical_port_no(port_no, evc_map, packet)
+
+ if logical_port_no is not None:
+ if self.is_pon_port(port_no) and packet.haslayer(Dot1Q):
+ # Scrub g-vid
+ inner_pkt = packet.getlayer(Dot1Q)
+ assert inner_pkt.haslayer(Dot1Q), 'Expected a C-Tag'
+ packet = Ether(src=packet.src, dst=packet.dst, type=inner_pkt.type)\
+ / inner_pkt.payload
+
+ self.adapter_agent.send_packet_in(logical_device_id=self.logical_device_id,
+ logical_port_no=logical_port_no,
+ packet=str(packet))
+ else:
+ self.log.warn('logical-port-not-found', port_no=port_no, evc_map=evc_map)
+
+ except Exception as e:
+ self.log.exception('rx-pio-packet', e=e)
+
+ else:
+ self.log.warn('packet-in-unknown-url-type', url_type=url_type)
+
+ def packet_out(self, egress_port, msg):
+ """
+ Pass a packet_out message content to adapter so that it can forward it
+ out to the device. This is only called on root devices.
+
+ :param egress_port: egress logical port number
+ :param msg: actual message
+ :return: None """
+
+ if self.pio_port is not None or self.io_port is not None:
+ from scapy.layers.l2 import Ether, Dot1Q
+ from scapy.layers.inet import IP, UDP
+ from common.frameio.frameio import hexify
+
+ self.log.debug('sending-packet-out', egress_port=egress_port,
+ msg=hexify(msg))
+ pkt = Ether(msg)
+
+ # Remove any extra tags
+ while pkt.type == 0x8100:
+ msg_hex = hexify(msg)
+ msg_hex = msg_hex[:24] + msg_hex[32:]
+ bytes = []
+ msg_hex = ''.join(msg_hex.split(" "))
+ for i in range(0, len(msg_hex), 2):
+ bytes.append(chr(int(msg_hex[i:i+2], 16)))
+
+ msg = ''.join(bytes)
+ pkt = Ether(msg)
+
+ if self.io_port is not None:
+ out_pkt = (
+ Ether(src=pkt.src, dst=pkt.dst) /
+ Dot1Q(vlan=self.packet_in_vlan) /
+ Dot1Q(vlan=egress_port, type=pkt.type) /
+ pkt.payload
+ )
+ self.io_port.send(str(out_pkt))
+
+ elif self._pio_agent is not None:
+ port, ctag, vlan_id, evcmapname = FlowEntry.get_packetout_info(self.device_id, egress_port)
+ exceptiontype = None
+ if pkt.type == FlowEntry.EtherType.EAPOL:
+ exceptiontype = 'eapol'
+ elif pkt.type == 2:
+ exceptiontype = 'igmp'
+ elif pkt.type == FlowEntry.EtherType.IPv4:
+ ippkt = IP(pkt.payload)
+ if ippkt.proto == FlowEntry.IpProtocol.UDP:
+ udppkt = UDP(ippkt.payload)
+ # packet out from DHCP server is reversed ports
+ if udppkt.sport == 67 and udppkt.dport == 68:
+ exceptiontype = 'dhcp'
+
+ if exceptiontype is None:
+ self.log.warn('packet-out-exceptiontype-unknown', eEtherType=pkt.type)
+
+ elif port is not None and ctag is not None and vlan_id is not None and evcmapname is not None:
+ self.log.debug('sending-pio-packet-out', port=port, ctag=ctag, vlan_id=vlan_id, evcmapname=evcmapname, exceptiontype=exceptiontype)
+ out_pkt = (
+ Ether(src=pkt.src, dst=pkt.dst) /
+ Dot1Q(vlan=port) /
+ Dot1Q(vlan=vlan_id) /
+ Dot1Q(vlan=ctag, type=pkt.type) /
+ pkt.payload
+ )
+ data = self._pio_agent.encode_packet(port, str(out_pkt), evcmapname, exceptiontype)
+ try:
+ self._pio_agent.send(data)
+
+ except Exception as e:
+ self.log.exception('pio-send', egress_port=egress_port, e=e)
+ else:
+ self.log.debug('packet-out-flow-not-found', egress_port=egress_port)
+
+ def send_packet_exceptions_request(self):
+ if self._pio_agent is not None:
+ request = self._pio_agent.query_request_packet()
+ try:
+ self._pio_agent.send(request)
+
+ except Exception as e:
+ self.log.exception('pio-send', e=e)
def poll_for_status(self):
self.log.debug('Initiating-status-poll')
@@ -561,6 +788,34 @@
self.status_poll = reactor.callLater(delay, self.poll_for_status)
+ def _create_untagged_flow(self):
+ nni_port = self.northbound_ports.get(1).port_no
+ pon_port = self.southbound_ports.get(0).port_no
+
+ return mk_flow_stat(
+ priority=100,
+ match_fields=[
+ in_port(nni_port),
+ vlan_vid(ofp.OFPVID_PRESENT + self.untagged_vlan),
+ # eth_type(FlowEntry.EtherType.EAPOL) ?? TODO: is this needed
+ ],
+ actions=[output(pon_port)]
+ )
+
+ def _create_utility_flow(self):
+ nni_port = self.northbound_ports.get(1).port_no
+ pon_port = self.southbound_ports.get(0).port_no
+
+ return mk_flow_stat(
+ priority=200,
+ match_fields=[
+ in_port(nni_port),
+ vlan_vid(ofp.OFPVID_PRESENT + self.utility_vlan),
+ # eth_type(FlowEntry.EtherType.EAPOL) ?? TODO: is this needed
+ ],
+ actions=[output(pon_port)]
+ )
+
@inlineCallbacks
def update_flow_table(self, flows, device):
"""
@@ -571,11 +826,33 @@
:param device: A voltha.Device object, with possible device-type
specific extensions.
"""
+
self.log.debug('bulk-flow-update', num_flows=len(flows),
device_id=device.id, flows=flows)
valid_flows = []
+ # Special helper egress Packet In/Out flows
+ for special_flow in (self._create_untagged_flow(),
+ self._create_utility_flow()):
+ valid_flow, evc = FlowEntry.create(special_flow, self)
+
+ if valid_flow is not None:
+ valid_flows.append(valid_flow.flow_id)
+
+ if evc is not None:
+ try:
+ evc.schedule_install()
+ self.add_evc(evc)
+
+ except Exception as e:
+ evc.status = 'EVC Install Exception: {}'.format(e.message)
+ self.log.exception('EVC-install', e=e)
+
+ # verify exception flows were installed by OLT PET process
+ reactor.callLater(5, self.send_packet_exceptions_request)
+
+ # Now process bulk flows
for flow in flows:
try:
# Try to create an EVC.
@@ -619,7 +896,7 @@
if isinstance(msg, Packet):
msg = str(msg)
- if self.pon_agent is not None:
+ if self._pon_agent is not None:
pon_id, onu_id = self._proxy_address_to_pon_onu_id(proxy_address)
pon = self.southbound_ports.get(pon_id)
@@ -628,10 +905,10 @@
onu = pon.onu(onu_id)
if onu is not None and onu.enabled:
- data = AdtranZmqClient.encode_omci_message(msg, pon_id, onu_id,
- self.is_async_control)
+ data = self._pon_agent.encode_omci_packet(msg, pon_id, onu_id,
+ self._is_async_control)
try:
- self.pon_agent.send(data)
+ self._pon_agent.send(data)
except Exception as e:
self.log.exception('pon-agent-send', pon_id=pon_id, onu_id=onu_id, e=e)
@@ -640,21 +917,22 @@
else:
self.log.debug('pon-invalid-or-disabled', pon_id=pon_id)
- def get_channel_id(self, pon_id, onu_id):
- from pon_port import PonPort
+ def get_onu_vid(self, onu_id): # TODO: Deprecate this with packet-in/out support
if ATT_NETWORK:
- if FIXED_ONU:
- return (onu_id * 120) + 2
- return 1 + onu_id + (pon_id * 120)
+ return (onu_id * 120) + 2
- if FIXED_ONU:
- return self._onu_offset(onu_id)
- return self._onu_offset(onu_id) + (pon_id * PonPort.MAX_ONUS_SUPPORTED)
+ return None
+
+ def get_channel_id(self, pon_id, onu_id): # TODO: Make this more unique. Just don't call the ONU VID method
+ from pon_port import PonPort
+ return self.get_onu_vid(onu_id)
+ # return self._onu_offset(onu_id) + (pon_id * PonPort.MAX_ONUS_SUPPORTED)
def _onu_offset(self, onu_id):
# Start ONU's just past the southbound PON port numbers. Since ONU ID's start
# at zero, add one
- assert AdtranOltHandler.BASE_ONU_OFFSET > (self.num_northbound_ports + self.num_southbound_ports + 1)
+ # assert AdtranOltHandler.BASE_ONU_OFFSET > (self.num_northbound_ports + self.num_southbound_ports + 1)
+ assert AdtranOltHandler.BASE_ONU_OFFSET > (4 + self.num_southbound_ports + 1) # Skip over uninitialized ports
return AdtranOltHandler.BASE_ONU_OFFSET + onu_id
def _pon_onu_id_to_proxy_address(self, pon_id, onu_id):
@@ -675,21 +953,17 @@
:return: (tuple) pon-id, onu-id
"""
onu_id = proxy_address.onu_id
-
- if self.autoactivate:
- # Legacy method
- pon_id = proxy_address.channel_group_id
- else:
- # xPON method
- pon_id = self._port_number_to_pon_id(proxy_address.channel_id)
+ pon_id = self._port_number_to_pon_id(proxy_address.channel_id)
return pon_id, onu_id
def _pon_id_to_port_number(self, pon_id):
- return pon_id + 1 + self.num_northbound_ports
+ # return pon_id + 1 + self.num_northbound_ports
+ return pon_id + 1 + 4 # Skip over uninitialized ports
def _port_number_to_pon_id(self, port):
- return port - 1 - self.num_northbound_ports
+ # return port - 1 - self.num_northbound_ports
+ return port - 1 - 4 # Skip over uninitialized ports
def is_pon_port(self, port):
return self._port_number_to_pon_id(port) in self.southbound_ports
@@ -697,6 +971,55 @@
def is_uni_port(self, port):
return port >= self._onu_offset(0) # TODO: Really need to rework this one...
+ def get_onu_port_and_vlans(self, flow_entry):
+ """
+ Get the logical port (openflow port) for a given southbound port of an ONU
+
+ :param flow_entry: (FlowEntry) Flow to parse
+ :return: None or openflow port number and the actual VLAN IDs we should use
+ """
+ if flow_entry.flow_direction == FlowEntry.FlowDirection.UPSTREAM:
+ # Upstream will have VID=Logical_port until VOL-460 is addressed
+ ingress_port = flow_entry.in_port
+ vid = flow_entry.vlan_id
+
+ else:
+ ingress_port = flow_entry.output
+ vid = flow_entry.inner_vid
+
+ pon_port = self.get_southbound_port(ingress_port)
+ if pon_port is None:
+ return None, None, None
+
+ if flow_entry.flow_direction == FlowEntry.FlowDirection.UPSTREAM:
+ if self.exception_gems: # FIXED_ONU
+ ingress_port = vid
+ return ingress_port, vid, vid # TODO: Needs work
+
+ # Upstream ACLs will have VID=Logical_port until VOL-460 is addressed
+ # but User data flows have the correct VID / C-Tag.
+ if flow_entry.is_acl_flow:
+ onu = next((onu for onu in pon_port.onus if
+ onu.logical_port == vid), None)
+ else:
+ onu = next((onu for onu in pon_port.onus if
+ onu.onu_vid == vid), None)
+
+ elif flow_entry.vlan_id in (FlowEntry.LEGACY_CONTROL_VLAN,
+ flow_entry.handler.untagged_vlan):
+ # User data flows have inner_vid=correct C-tag. Legacy control VLANs
+ # have inner_vid == logical_port until VOL-460 is addressed
+ onu = next((onu for onu in pon_port.onus if
+ onu.logical_port == vid), None)
+ else:
+ onu = next((onu for onu in pon_port.onus if
+ onu.onu_vid == vid), None)
+
+ if onu is None:
+ return None, None, None
+
+ return onu.logical_port, onu.onu_vid, onu.untagged_vlan
+
def get_southbound_port(self, port):
pon_id = self._port_number_to_pon_id(port)
return self.southbound_ports.get(pon_id, None)
diff --git a/voltha/adapters/adtran_olt/codec/ietf_interfaces.py b/voltha/adapters/adtran_olt/codec/ietf_interfaces.py
index 65cf58f..600923d 100644
--- a/voltha/adapters/adtran_olt/codec/ietf_interfaces.py
+++ b/voltha/adapters/adtran_olt/codec/ietf_interfaces.py
@@ -268,26 +268,48 @@
return OFPPF_100GB_FD
@staticmethod
- def get_nni_port_entries(rpc_reply, nni_type='ethernet'):
+ def _get_port_number(name, if_index):
+ import re
+
+ formats = [
+ 'xpon \d/{1,2}\d', # OLT version 3 (Feb 2018++)
+ 'Hundred-Gigabit-Ethernet \d/\d/{1,2}\d', # OLT version 2
+ 'XPON \d/\d/{1,2}\d', # OLT version 2
+ 'hundred-gigabit-ethernet \d/{1,2}\d', # OLT version 1
+ 'channel-termination {1,2}\d', # OLT version 1
+ ]
+ p2 = re.compile('\d+')
+
+ for regex in formats:
+ p = re.compile(regex, re.IGNORECASE)
+ match = p.match(name)
+ if match is not None:
+ return int(p2.findall(name)[-1])
+
+ @staticmethod
+ def get_port_entries(rpc_reply, port_type):
"""
- Get the port entries that make up the northbound interfaces
+ Get the port entries that make up the northbound and
+ southbound interfaces
:param rpc_reply:
- :param nni_type:
+ :param port_type:
:return:
"""
- port_no = 1
- ports = []
+ ports = dict()
result_dict = xmltodict.parse(rpc_reply.data_xml)
entries = result_dict['data']['interfaces-state']['interface']
if not isinstance(entries, list):
entries = [entries]
- nni_ports = [entry for entry in entries if 'name' in entry and nni_type in entry['name']]
+ port_entries = [entry for entry in entries if 'name' in entry and
+ port_type.lower() in entry['name'].lower()]
- for entry in nni_ports:
+ for entry in port_entries:
port = {
- 'port_no': port_no,
+ 'port_no': IetfInterfacesState._get_port_number(entry.get('name'),
+ entry.get('ifindex')),
'name': entry.get('name', 'unknown'),
+ 'ifIndex': entry.get('ifIndex'),
# 'label': None,
'mac_address': IetfInterfacesState._get_mac_addr(entry),
'admin_state': IetfInterfacesState._get_admin_state(entry),
@@ -297,7 +319,10 @@
'current_speed': IetfInterfacesState._get_of_speed(entry),
'max_speed': IetfInterfacesState._get_of_speed(entry),
}
- ports.append(port)
- port_no += 1
+ port_no = port['port_no']
+ if port_no not in ports:
+ ports[port_no] = port
+ else:
+ ports[port_no].update(port)
return ports
diff --git a/voltha/adapters/adtran_olt/codec/olt_config.py b/voltha/adapters/adtran_olt/codec/olt_config.py
index 238d627..cd3416b 100644
--- a/voltha/adapters/adtran_olt/codec/olt_config.py
+++ b/voltha/adapters/adtran_olt/codec/olt_config.py
@@ -122,9 +122,14 @@
def decode(onu_dict):
onus = {}
- if onu_dict is not None and 'onu' in onu_dict:
- for onu_data in onu_dict['onu']:
- onu = OltConfig.Pon.Onu(onu_data)
+ if onu_dict is not None:
+ if 'onu' in onu_dict:
+ for onu_data in onu_dict['onu']:
+ onu = OltConfig.Pon.Onu(onu_data)
+ assert onu.onu_id not in onus
+ onus[onu.onu_id] = onu
+ elif len(onu_dict) > 0 and 'onu-id' in onu_dict[0]:
+ onu = OltConfig.Pon.Onu(onu_dict[0])
assert onu.onu_id not in onus
onus[onu.onu_id] = onu
diff --git a/voltha/adapters/adtran_olt/flow/acl.py b/voltha/adapters/adtran_olt/flow/acl.py
index 86f9d73..6d012f2 100644
--- a/voltha/adapters/adtran_olt/flow/acl.py
+++ b/voltha/adapters/adtran_olt/flow/acl.py
@@ -13,20 +13,22 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#
+import xmltodict
+import re
import structlog
-
-import voltha.core.flow_decomposer as fd
+from twisted.internet.defer import inlineCallbacks, returnValue, succeed
log = structlog.get_logger()
_acl_list = {} # Key -> Name: List of encoded EVCs
+ACL_NAME_FORMAT = 'VOLTHA-ACL-{}-{}' # format(flow_entry.handler.device_id, flow_entry.flow.id)
+ACL_NAME_REGEX_ALL = 'VOLTHA-ACL-*'
class ACL(object):
"""
Class to wrap Trap-to-Controller functionality
"""
-
def __init__(self, flow_entry):
self._installed = False
self._status_message = None
@@ -34,16 +36,169 @@
self._flow = flow_entry.flow
self._handler = flow_entry.handler
self._name = ACL.flow_to_name(flow_entry)
-
+ self._rule_name = ACL.flow_to_ace_name(flow_entry)
+ self._eth_type = flow_entry.eth_type
+ self._ip_protocol = flow_entry.ip_protocol
+ self._ipv4_dst = flow_entry.ipv4_dst
+ self._src_port = flow_entry.udp_src
+ self._dst_port = flow_entry.udp_dst
+ self._exception = False
+ self._enabled = True
self._valid = self._decode()
+ def __str__(self):
+ return 'ACL: {}, Installed: {}, L2: {}, L3/4: {}'.format(self.name,
+ self._installed,
+ self.is_l2_exception,
+ self.is_l3_l4_exception)
+
+ @property
+ def name(self):
+ return self._name
+
+ @property
+ def installed(self):
+ return self._installed
+
+ @property
+ def is_l2_exception(self):
+ from flow_entry import FlowEntry
+ return self._eth_type not in (None,
+ FlowEntry.EtherType.IPv4,
+ FlowEntry.EtherType.IPv6)
+
+ @property
+ def is_l3_l4_exception(self):
+ return not self.is_l2_exception and self._ip_protocol is not None
+
+ @staticmethod
+ def _xml_header(operation=None):
+ return '<access-lists xmlns="http://www.adtran.com/ns/yang/adtran-ietf-access-control-list"\
+ xmlns:adtn-ietf-ns-acl="http://www.adtran.com/ns/yang/adtran-ietf-ns-access-control-list"><acl{}>'.\
+ format('' if operation is None else ' xc:operation="{}"'.format(operation))
+
+ @staticmethod
+ def _xml_trailer():
+ return '</acl></access-lists>'
+
+ def _xml_action(self):
+ xml = '<actions>'
+ if self._exception:
+ xml += '<adtn-ietf-ns-acl:exception-to-cpu/>'
+ else:
+ xml += '<permit/>'
+ xml += '</actions>'
+ return xml
+
+ def _ace_l2(self):
+ xml = '<ace>'
+ xml += '<rule-name>{}</rule-name>'.format(self._rule_name)
+ xml += '<matches><l2-acl><ether-type>{:04x}</ether-type></l2-acl></matches>'.format(self._eth_type)
+ xml += self._xml_action()
+ xml += '</ace>'
+ return xml
+
+ def _ace_l2_l3_ipv4(self):
+ xml = '<ace>'
+ xml += '<rule-name>{}</rule-name>'.format(self._rule_name)
+ xml += '<matches><l2-l3-ipv4-acl>'
+ xml += '<ether-type>{:04X}</ether-type>'.format(self._eth_type)
+
+ if self._ip_protocol is not None:
+ xml += '<protocol>{}</protocol>'.format(self._ip_protocol)
+ if self._ipv4_dst is not None:
+ xml += '<destination-ipv4-network>{}/32</destination-ipv4-network>'.format(self._ipv4_dst)
+ if self._src_port is not None:
+ xml += '<source-port-range><lower-port>{}</lower-port><operation>eq</operation></source-port-range>'.\
+ format(self._src_port)
+ if self._dst_port is not None:
+ xml += '<destination-port-range><lower-port>' + \
+ '{}</lower-port><operations>eq</operations></destination-port-range>'.format(self._dst_port)
+
+ xml += '</l2-l3-ipv4-acl></matches>'
+ xml += self._xml_action()
+ xml += '</ace>'
+ return xml
+
+ def _ace_any(self):
+ xml = '<ace>'
+ xml += '<rule-name>{}</rule-name>'.format(self._rule_name)
+ xml += '<matches><any-acl/></matches>'
+ xml += self._xml_action()
+ xml += '</ace>'
+ return xml
+
+ def _acl_eth(self):
+ xml = '<acl-type>eth-acl</acl-type>'
+ xml += '<acl-name>{}</acl-name>'.format(self._name)
+ return xml
+
+ def _acl_l4(self):
+ xml = '<acl-type>mixed-l2-l3-ipv4-acl</acl-type>'
+ xml += '<acl-name>{}</acl-name>'.format(self._name)
+ return xml
+
+ def _acl_any(self):
+ xml = '<acl-type>any-acl</acl-type>'
+ xml += '<acl-name>{}</acl-name>'.format(self._name)
+ return xml
+
+ def _install_xml(self):
+ xml = ACL._xml_header('create')
+ if self.is_l2_exception:
+ xml += self._acl_eth()
+ xml += '<aces>{}</aces>'.format(self._ace_l2())
+ elif self.is_l3_l4_exception:
+ xml += self._acl_l4()
+ xml += '<aces>{}</aces>'.format(self._ace_l2_l3_ipv4())
+ else:
+ xml += self._acl_any()
+ xml += '<aces>{}</aces>'.format(self._ace_any())
+
+ xml += ACL._xml_trailer()
+ return xml
+
+ def _remove_xml(self):
+ xml = ACL._xml_header('delete')
+ if self.is_l2_exception:
+ xml += self._acl_eth()
+ elif self.is_l3_l4_exception:
+ xml += self._acl_l4()
+ else:
+ xml += self._acl_any()
+ xml += ACL._xml_trailer()
+ return xml
+
+ def evc_map_ingress_xml(self):
+ """ Individual ACL specific XML for the EVC MAP """
+
+ xml = '<adtn-evc-map-acl:acl-type '
+ fmt = 'xmlns:adtn-ietf-acl="http://www.adtran.com/ns/yang/adtran-ietf-access-control-list">adtn-ietf-acl:{}'\
+ '</adtn-evc-map-acl:acl-type>'
+
+ if self.is_l2_exception:
+ xml += fmt.format('eth-acl')
+
+ elif self.is_l3_l4_exception:
+ xml += fmt.format('mixed-l2-l3-ipv4-acl')
+
+ else:
+ xml += fmt.format('any-acl')
+
+ xml += '<adtn-evc-map-acl:acl-name>{}</adtn-evc-map-acl:acl-name>'.format(self.name)
+ return xml
+
@staticmethod
def create(flow_entry):
- pass # TODO: Start here Thursday
+ return ACL(flow_entry)
@staticmethod
def flow_to_name(flow_entry):
- return 'ACL-{}-{}'.format(flow_entry.handler.device_id, flow_entry.flow.id)
+ return 'VOLTHA-ACL-{}-{}'.format(flow_entry.handler.device_id, flow_entry.flow.id)
+
+ @staticmethod
+ def flow_to_ace_name(flow_entry):
+ return 'VOLTHA-ACE-{}-{}'.format(flow_entry.handler.device_id, flow_entry.flow.id)
@property
def valid(self):
@@ -57,89 +212,65 @@
def status(self):
return self._status_message
+ @inlineCallbacks
def install(self):
- if not self._installed:
+ log.debug('installing-acl', installed=self._installed)
+
+ if not self._installed and self._enabled:
if self._name in _acl_list:
- self._status_message = "ACL '{}' already is installed".format(self.name)
- raise Exception(self._status_message) # TODO: A unique exception type would work here
+ self._status_message = "ACL '{}' already is installed".format(self._name)
+ raise Exception(self._status_message)
- raise NotImplemented('TODO: Implement this')
+ try:
+ acl_xml = self._install_xml()
+ log.debug('install-xml', xml=acl_xml, name=self._name)
- self._installed = True
- _acl_list[self.name] = self
- pass
+ results = yield self._handler.netconf_client.edit_config(acl_xml)
+ self._installed = results.ok
+ self._status_message = '' if results.ok else results.error
- return self._installed
+ if self._installed:
+ _acl_list[self._name] = self
+ except Exception as e:
+ log.exception('install-failure', name=self._name, e=e)
+ raise
+
+ returnValue(self._installed and self._enabled)
+
+ @inlineCallbacks
def remove(self):
+ log.debug('removing-acl', installed=self._installed)
+
if self._installed:
- raise NotImplemented('TODO: Implement this')
+ acl_xml = self._remove_xml()
+ log.info('remove-xml', xml=acl_xml, name=self._name)
- self._installed = False
- _acl_list.pop(self._name)
- pass
+ results = yield self._handler.netconf_client.edit_config(acl_xml)
+ self._installed = not results.ok
+ self._status_message = '' if results.ok else results.error
- return not self._installed
+ if not self._installed:
+ _acl_list.pop(self._name)
+
+ returnValue(not self._installed)
def enable(self):
if not self._enabled:
- raise NotImplemented("TODO: Implement this")
self._enabled = False
+ raise NotImplemented("TODO: Implement this")
def disable(self):
if self._enabled:
- raise NotImplemented("TODO: Implement this")
self._enabled = True
+ raise NotImplemented("TODO: Implement this")
def _decode(self):
"""
- Examine flow rules and extract appropriate settings for both this EVC
- and creates any EVC-Maps required.
+ Examine the field settings and set ACL up for requested fields
"""
- self._name = ACL.flow_to_name(self._flow, self._handler)
-
- # Determine this flow's type
-
- status = self._decode_traffic_selector() and self._decode_traffic_treatment()
-
- if status:
- pass # TODO
-
- if status:
- pass # TODO
- else:
- pass # TODO
-
- return status
-
- def _decode_traffic_selector(self):
- """
- Extract EVC related traffic selection settings
- """
- in_port = fd.get_in_port(self._flow)
- assert in_port is not None
-
- log.debug('InPort: {}', in_port)
-
- for field in fd.get_ofb_fields(self._flow):
- log.debug('Found-OFB-field', field=field)
-
- for action in fd.get_actions(self._flow):
- log.debug('Found-Action', action=action)
-
- return True
-
- def _decode_traffic_treatment(self):
- out_port = fd.get_out_port(self._flow)
-
- log.debug('OutPort: {}', out_port)
-
- for field in fd.get_ofb_fields(self._flow):
- log.debug('Found-OFB-field', field=field)
-
- for action in fd.get_actions(self._flow):
- log.debug('Found-Action', action=action)
-
+ # If EtherType is not None and not IP, this is an L2 exception
+ self._exception = self.is_l2_exception or self.is_l3_l4_exception
return True
# BULK operations
@@ -153,9 +284,64 @@
raise NotImplemented("TODO: Implement this")
@staticmethod
- def remove_all():
+ def remove_all(client, regex_=ACL_NAME_REGEX_ALL):
"""
- Remove all ACLs from hardware
+ Remove all matching ACLs from hardware
+ :param client: (ncclient) NETCONF Client to use
+ :param regex_: (String) Regular expression for name matching
+ :return: (deferred)
"""
- raise NotImplemented("TODO: Implement this")
+ # Do a 'get' on the evc config an you should get the names
+ get_xml = """
+ <filter>
+ <access-lists xmlns="http://www.adtran.com/ns/yang/adtran-ietf-access-control-list">
+ <acl><acl-type/><acl-name/></acl>
+ </access-lists>
+ </filter>
+ """
+ log.info('query', xml=get_xml, regex=regex_)
+ def request_failed(results, operation):
+ log.error('{}-failed'.format(operation), results=results)
+
+ def delete_complete(results):
+ log.debug('delete-complete', results=results)
+
+ def do_delete(rpc_reply, regexpr):
+ log.debug('query-complete', rpc_reply=rpc_reply)
+
+ if rpc_reply.ok:
+ result_dict = xmltodict.parse(rpc_reply.data_xml)
+ entries = result_dict['data']['access-lists'] if 'access-lists' in result_dict['data'] else {}
+
+ if 'acl' in entries:
+ p = re.compile(regexpr)
+
+ pairs = []
+ if isinstance(entries['acl'], list):
+ pairs = { (entry['acl-type'], entry['acl-name']) for entry in entries['acl']
+ if 'acl-name' in entry and 'acl-type' in entry and p.match(entry['acl-name'])}
+ else:
+ if 'acl' in entries:
+ entry = entries['acl']
+ if 'acl-name' in entry and 'acl-type' in entry and p.match(entry['acl-name']):
+ pairs = [ (entry['acl-type'], entry['acl-name']) ]
+
+ if len(pairs) > 0:
+ del_xml = '<access-lists xmlns="http://www.adtran.com/ns/yang/adtran-ietf-access-control-list">'
+ for pair in pairs:
+ del_xml += '<acl xc:operation = "delete">'
+ del_xml += '<acl-type>{}</acl-type>'.format(pair[0])
+ del_xml += '<acl-name>{}</acl-name>'.format(pair[1])
+ del_xml += '</acl>'
+ del_xml += '</access-lists>'
+ log.debug('removing', xml=del_xml)
+
+ return client.edit_config(del_xml)
+
+ return succeed('no entries')
+
+ d = client.get(get_xml)
+ d.addCallbacks(do_delete, request_failed, callbackArgs=[regex_], errbackArgs=['get'])
+ d.addCallbacks(delete_complete, request_failed, errbackArgs=['edit-config'])
+ return d
diff --git a/voltha/adapters/adtran_olt/flow/evc.py b/voltha/adapters/adtran_olt/flow/evc.py
index 85df0fb..f6c1fc6 100644
--- a/voltha/adapters/adtran_olt/flow/evc.py
+++ b/voltha/adapters/adtran_olt/flow/evc.py
@@ -14,7 +14,7 @@
import xmltodict
import re
-from enum import Enum
+from enum import IntEnum
from twisted.internet import reactor, defer
from twisted.internet.defer import inlineCallbacks, returnValue, succeed
from voltha.core.flow_decomposer import *
@@ -30,7 +30,7 @@
"""
Class to wrap EVC functionality
"""
- class SwitchingMethod(Enum):
+ class SwitchingMethod(IntEnum):
SINGLE_TAGGED = 1
DOUBLE_TAGGED = 2
MAC_SWITCHED = 3
@@ -51,7 +51,7 @@
return '<double-tag-mac-switched/>'
raise ValueError('Invalid SwitchingMethod enumeration')
- class Men2UniManipulation(Enum):
+ class Men2UniManipulation(IntEnum):
SYMMETRIC = 1
POP_OUT_TAG_ONLY = 2
DEFAULT = SYMMETRIC
@@ -67,7 +67,7 @@
return fmt.format('<pop-outer-tag-only/>')
raise ValueError('Invalid Men2UniManipulation enumeration')
- class ElineFlowType(Enum):
+ class ElineFlowType(IntEnum):
NNI_TO_UNI = 1
UNI_TO_NNI = 2
NNI_TO_NNI = 3
@@ -92,6 +92,7 @@
self._s_tag = None
self._stpid = None
self._switching_method = None
+ self.service_evc = False
self._ce_vlan_preservation = None
self._men_to_uni_tag_manipulation = None
@@ -167,7 +168,7 @@
@switching_method.setter
def switching_method(self, value):
assert self._switching_method is None or self._switching_method == value,\
- 'Switching Method can only be set once'
+ 'Switching Method can only be set once. EVC: {}'.format(self.name)
self._switching_method = value
@property
@@ -298,7 +299,6 @@
for evc_map in self.evc_maps:
try:
yield evc_map.install()
- pass # TODO: What to do on error?
except Exception as e:
evc_map.status = 'Exception during EVC-MAP Install: {}'.format(e.message)
@@ -350,7 +350,7 @@
if delete_maps:
for evc_map in self.evc_maps:
- dl.append(evc_map.delete()) # TODO: implement bulk-flow procedures
+ dl.append(evc_map.delete(None)) # TODO: implement bulk-flow procedures
yield defer.gatherResults(dl, consumeErrors=True)
diff --git a/voltha/adapters/adtran_olt/flow/evc_map.py b/voltha/adapters/adtran_olt/flow/evc_map.py
index 1f76e6c..6dfee88 100644
--- a/voltha/adapters/adtran_olt/flow/evc_map.py
+++ b/voltha/adapters/adtran_olt/flow/evc_map.py
@@ -16,6 +16,8 @@
import re
import structlog
from enum import Enum
+from acl import ACL
+from twisted.internet import defer
from twisted.internet.defer import inlineCallbacks, returnValue, succeed
log = structlog.get_logger()
@@ -25,7 +27,7 @@
# cover an entire pon, the name will have the ONU ID and GEM ID appended to it upon
# installation with a period as a separator.
-EVC_MAP_NAME_FORMAT = 'VOLTHA-{}-{}' # format(ingress-port, flow.id)
+EVC_MAP_NAME_FORMAT = 'VOLTHA-{}-{}' # format(logical-ingress-port-number, flow-id)
EVC_MAP_NAME_REGEX_ALL = 'VOLTHA-*'
@@ -66,19 +68,22 @@
raise ValueError('Invalid PriorityOption enumeration')
def __init__(self, flow, evc, is_ingress_map):
- self._flow = flow
- self._evc = evc
- self._gem_ids_and_vid = None # { key -> onu-id, value -> tuple(sorted GEM Port IDs, onu_vid) }
+ self._handler = flow.handler # Same for all Flows attached to this EVC MAP
+ self._flows = {flow.flow_id: flow}
+ self._evc = None
+ self._new_acls = dict() # ACL Name -> ACL Object (To be installed into h/w)
+ self._existing_acls = dict() # ACL Name -> ACL Object (Already in H/w)
+ self._gem_ids_and_vid = None # { key -> onu-id, value -> tuple(sorted GEM Port IDs, onu_vid) }
self._is_ingress_map = is_ingress_map
self._pon_id = None
self._installed = False
+ self._needs_update = False
self._status_message = None
-
+ self._deferred = None
self._name = None
self._enabled = True
self._uni_port = None
self._evc_connection = EVCMap.EvcConnection.DEFAULT
- self._evc_name = None
self._men_priority = EVCMap.PriorityOption.DEFAULT
self._men_pri = 0 # If Explicit Priority
@@ -103,28 +108,35 @@
self._udp_src = None
try:
- self._valid = self._decode()
+ self._valid = self._decode(evc)
except Exception as e:
log.exception('decode', e=e)
self._valid = False
- if self._valid:
- evc.add_evc_map(self)
- else:
- self._evc = None
-
def __str__(self):
- return "EVCMap-{}: UNI: {}, isACL: {}".format(self._name, self._uni_port,
- self._needs_acl_support)
+ return "EVCMap-{}: UNI: {}, hasACL: {}".format(self._name, self._uni_port,
+ self._needs_acl_support)
@staticmethod
- def create_ingress_map(flow, evc):
- return EVCMap(flow, evc, True)
+ def create_ingress_map(flow, evc, dry_run=False):
+ evc_map = EVCMap(flow, evc, True)
+
+ if evc_map._valid and not dry_run:
+ evc.add_evc_map(evc_map)
+ evc_map._evc = evc
+
+ return evc_map
@staticmethod
- def create_egress_map(flow, evc):
- return EVCMap(flow, evc, False)
+ def create_egress_map(flow, evc, dry_run=False):
+ evc_map = EVCMap(flow, evc, False)
+
+ if evc_map._valid and not dry_run:
+ evc.add_evc_map(evc_map)
+ evc_map._evc = evc
+
+ return evc_map
@property
def valid(self):
@@ -184,12 +196,16 @@
def _xml_trailer():
return '</evc-map></evc-maps>'
+ def get_evcmap_name(self, onu_id, gem_id):
+ return'{}.{}.{}'.format(self.name, onu_id, gem_id)
+
def _common_install_xml(self):
xml = '<enabled>{}</enabled>'.format('true' if self._enabled else 'false')
xml += '<uni>{}</uni>'.format(self._uni_port)
- if self._evc_name is not None:
- xml += '<evc>{}</evc>'.format(self._evc_name)
+ evc_name = self._evc.name if self._evc is not None else None
+ if evc_name is not None:
+ xml += '<evc>{}</evc>'.format(evc_name)
else:
xml += EVCMap.EvcConnection.xml(self._evc_connection)
@@ -218,7 +234,11 @@
def _ingress_install_xml(self, onu_s_gem_ids_and_vid):
from ..onu import Onu
- xml = '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps">'
+ if len(self._new_acls):
+ xml = '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps"' +\
+ ' xmlns:adtn-evc-map-acl="http://www.adtran.com/ns/yang/adtran-evc-map-access-control-list">'
+ else:
+ xml = '<evc-maps xmlns="http://www.adtran.com/ns/yang/adtran-evc-maps">'
for onu_or_vlan_id, gem_ids_and_vid in onu_s_gem_ids_and_vid.iteritems():
first_gem_id = True
@@ -238,6 +258,13 @@
xml += '<men-ctag>{}</men-ctag>'.format(vid) # Added in August 2017 model
xml += '</network-ingress-filter>'
+ if len(self._new_acls):
+ xml += '<adtn-evc-map-acl:access-lists>'
+ xml += ' <adtn-evc-map-acl:ingress-acl>'
+ for acl in self._new_acls.itervalues():
+ xml += acl.evc_map_ingress_xml()
+ xml += ' </adtn-evc-map-acl:ingress-acl>'
+ xml += '</adtn-evc-map-acl:access-lists>'
xml += self._common_install_xml()
xml += '</evc-map>'
xml += '</evc-maps>'
@@ -258,20 +285,45 @@
ports.extend(gems_and_vids[0])
return ports
- if self._valid and not self._installed and len(gem_ports()) > 0:
- try:
- # TODO: create generator of XML once we have MANY to install at once
- map_xml = self._ingress_install_xml(self._gem_ids_and_vid) \
- if self._is_ingress_map else self._egress_install_xml()
+ if self._valid and len(gem_ports()) > 0:
+ # Install ACLs first (if not yet installed)
+ acl_list = self._new_acls.values()
+ self._new_acls = dict()
- log.debug('install', xml=map_xml, name=self.name)
- results = yield self._flow.handler.netconf_client.edit_config(map_xml)
- self._installed = results.ok
- self.status = '' if results.ok else results.error
+ for acl in acl_list:
+ try:
+ yield acl.install()
+ # if not results.ok:
+ # pass # TODO : do anything?
- except Exception as e:
- log.exception('install', name=self.name, e=e)
- raise
+ except Exception as e:
+ log.exception('acl-install', name=self.name, e=e)
+ self._new_acls.update(acl_list)
+ raise
+
+ # Now EVC-MAP
+ if not self._installed or self._needs_update:
+ try:
+ self._cancel_deferred()
+ map_xml = self._ingress_install_xml(self._gem_ids_and_vid) \
+ if self._is_ingress_map else self._egress_install_xml()
+
+ log.debug('install', xml=map_xml, name=self.name)
+ results = yield self._handler.netconf_client.edit_config(map_xml)
+ was_installed = self._installed
+ self._installed = results.ok
+ self._needs_update = results.ok
+ self.status = '' if results.ok else results.error
+
+ if results.ok:
+ self._existing_acls.update(acl_list)
+ else:
+ self._new_acls.update(acl_list)
+
+ except Exception as e:
+ log.exception('map-install', name=self.name, e=e)
+ self._new_acls.update(acl_list)
+ raise
returnValue(self._installed and self._valid)
@@ -291,7 +343,7 @@
return EVCMap._xml_header('delete') + \
'<name>{}</name>'.format(self.name) + EVCMap._xml_trailer()
- def remove(self):
+ def _remove(self):
if not self.installed:
returnValue('Not installed')
@@ -305,31 +357,54 @@
log.error('remove-failed', failure=failure)
self._installed = False
- # TODO: create generator of XML once we have MANY to install at once
+ def _remove_acls(_):
+ acls, self._new_acls = self._new_acls, dict()
+ existing, self._existing_acls = self._existing_acls, dict()
+ acls.update(existing)
+
+ dl = []
+ for acl in acls.itervalues():
+ dl.append(acl.remove())
+
+ if len(dl) > 0:
+ defer.gatherResults(dl, consumeErrors=True)
+
map_xml = self._ingress_remove_xml(self._gem_ids_and_vid) if self._is_ingress_map \
else self._egress_remove_xml()
- d = self._flow.handler.netconf_client.edit_config(map_xml)
+ d = self._handler.netconf_client.edit_config(map_xml)
d.addCallbacks(_success, _failure)
+ d.addBoth(_remove_acls)
return d
@inlineCallbacks
- def delete(self):
+ def delete(self, flow):
"""
Remove from hardware and delete/clean-up EVC-MAP Object
+
+ :param flow: (FlowEntry) Specific flow to remove from the MAP or None if all
+ flows should be removed
+ :return:
"""
- if self._evc is not None:
- self._evc.remove_evc_map(self)
- self._evc = None
+ flows = [flow] if flow is not None else list(self._flows.values())
+ removing_all = len(flows) == len(self._flows)
- self._flow = None
- self._valid = False
+ if not removing_all:
+ for f in flows:
+ self._remove_flow(f)
- try:
- yield self.remove()
+ else:
+ if self._evc is not None:
+ self._evc.remove_evc_map(self)
+ self._evc = None
- except Exception as e:
- log.exception('removal', e=e)
+ self._valid = False
+ self._cancel_deferred()
+ try:
+ yield self._remove()
+
+ except Exception as e:
+ log.exception('removal', e=e)
returnValue('Done')
@@ -340,8 +415,142 @@
return reflow
@staticmethod
+ def find_matching_ingress_flow(flow, upstream_flow_table):
+ """
+ Look for an existing EVC-MAP that may match this flow. Called when upstream signature
+ for a flow does not make match. This can happen if an ACL flow is added and only an User
+ Data flow exists, or if only an ACL flow exists.
+
+ :param flow: (FlowEntry) flow to add
+ :param upstream_flow_table: (dict of FlowEntry) Existing upstream flows for this device,
+ including the flow we are looking to add
+ :return: (EVCMap) if appropriate one is found, else None
+ """
+ # A User Data flow will have:
+ # signature: <dev>.1.5.2.242
+ # down-sig: <dev>.1.*.2.*
+ # logical-port: 66
+ # is-acl-flow: False
+ #
+ # An ACL flow will have:
+ # signature: <dev>.1.5.[4092 or 4094].None (untagged VLAN == utility VLAN case)
+ # down-sig: <dev>.1.*.[4092 or 4094].*
+ # logical-port: 66
+ # is-acl-flow: True
+ #
+ # Reduce the upstream flow table to only those that match the ingress,
+ # and logical-ports match (and is not this flow) and have a map
+
+ log.debug('find-matching-ingress-flow', logical_port=flow.logical_port, flow=flow.output)
+ candidate_flows = [f for f in upstream_flow_table.itervalues() if
+ f.logical_port == flow.logical_port and
+ f.output == flow.output and
+ f.evc_map is not None] # This weeds out this flow
+
+ log.debug('find-matching-ingress-flow', candidate_flows=candidate_flows)
+ return candidate_flows[0].evc_map if len(candidate_flows) > 0 else None
+
+ def add_flow(self, flow, evc):
+ """
+ Add a new flow to an existing EVC-MAP. This can be called to add:
+ o an ACL flow to an existing utility/untagged EVC, or
+ o an ACL flow to an existing User Data Flow, or
+ o a User Data Flow to an existing ACL flow (and this needs the EVC updated
+ as well.
+
+ Note that the Downstream EVC provided is the one that matches this flow. If
+ this is adding an ACL to and existing User data flow, we DO NOT want to
+ change the EVC Map's EVC
+
+ :param flow: (FlowEntry) New flow
+ :param evc: (EVC) Matching EVC for downstream flow
+ """
+ from flow_entry import FlowEntry
+ # Create temporary EVC-MAP
+ assert flow.flow_direction == FlowEntry.FlowDirection.UPSTREAM, \
+ 'Only Upstream flows additions are supported at this time'
+
+ tmp_map = EVCMap.create_ingress_map(flow, evc, dry_run=True) \
+ if flow.flow_direction == FlowEntry.FlowDirection.UPSTREAM \
+ else EVCMap.create_egress_map(flow, evc, dry_run=True)
+
+ if tmp_map is None or not tmp_map.valid:
+ return None
+
+ self._needs_update = True
+
+ if len(tmp_map._new_acls) > 0:
+ self._new_acls.update(tmp_map._new_acls) # New ACL flow
+ log.debug('add-acl-flows', map=str(self), new=tmp_map._new_acls)
+
+ # Look up existing EVC for this flow. If it is a service EVC for
+ # Packet In/Out, and this is a regular flow, migrate to the newer EVC
+ if self._evc.service_evc and not evc.service_evc:
+ log.info('new-evc-for-map', old=self._evc.name, new=evc.name)
+ self._evc.remove_evc_map(self)
+ evc.add_evc_map(self)
+ self._evc = evc
+ return self
+
+ @inlineCallbacks
+ def _remove_flow(self, flow):
+ """
+ Remove a specific flow from an EVC_MAP. This includes removing any
+ ACL entries associated with the flow and could result in moving the
+ EVC-MAP over to another EVC.
+
+ :param flow: (FlowEntry) Flow to remove
+ :param removing_all: (bool) If True, all flows are being removed from EVC-MAP
+ """
+ try:
+ self._flows.pop(flow.flow_id)
+
+ if not flow.handler.exception_gems: # ! FIXED_ONU
+ # Remove any ACLs
+
+ acl = ACL.create(flow)
+ if acl is not None:
+ # Remove ACL from EVC-MAP entry
+
+ try:
+ # TODO: Create EVC-MAP with proper 'delete-acl-list' request
+ # TODO: and send it
+ pass
+
+ # TODO: Scan EVC to see if it needs to move back to the Utility
+ # or Untagged EVC from a user data EVC
+ pass
+
+ except Exception as e:
+ log.exception('acl-remove-from-evc', e=e)
+
+ # Remove ACL itself
+ try:
+ yield acl.remove()
+
+ except Exception as e:
+ log.exception('acl-remove', e=e)
+
+ except Exception as e:
+ log.exception('remove-failed', e=e)
+
+ @staticmethod
def create_evc_map_name(flow):
- return EVC_MAP_NAME_FORMAT.format(flow.in_port, flow.flow_id)
+ return EVC_MAP_NAME_FORMAT.format(flow.logical_port, flow.flow_id)
+
+ @staticmethod
+ def decode_evc_map_name(name):
+ """
+ Reverse engineer EVC-MAP name parameters. Helpful in quick packet-in
+ processing
+
+ :param name: (str) EVC Map Name
+ :return: (dict) Logical Ingress Port, OpenFlow Flow-ID
+ """
+ items = name.split('-') if name is not None else dict()
+
+ return {'ingress-port': items[1],
+ 'flow-id': items[2]} if len(items) == 3 else dict()
def add_gem_port(self, gem_port, reflow=False):
# TODO: Refactor
@@ -377,20 +586,18 @@
if len(before) > len(after):
if len(after) == 0:
- return self.remove()
+ return self._remove()
else:
self._installed = False
return self.install()
return succeed('nop')
-
-# self._gem_ids_and_vid = None # { key -> onu-id, value -> tuple(sorted GEM Port IDs, onu_vid) }
-
def _setup_gem_ids(self):
from flow_entry import FlowEntry
- flow = self._flow # TODO: Drop saving of flow once debug complete
+ # all flows should have same GEM port setup
+ flow = self._flows.itervalues().next()
is_pon = flow.handler.is_pon_port(flow.in_port)
if self._is_ingress_map and is_pon:
@@ -398,25 +605,27 @@
if pon_port is not None:
self._pon_id = pon_port.pon_id
- self._gem_ids_and_vid = pon_port.gem_ids(flow.logical_port,
- self._needs_acl_support,
+ exception_gems = self._needs_acl_support and flow.handler.exception_gems # FIXED_ONU
+ untagged_gem = flow.eth_type == FlowEntry.EtherType.EAPOL and\
+ flow.handler.untagged_vlan != flow.handler.utility_vlan
+ self._gem_ids_and_vid = pon_port.gem_ids(flow.logical_port, untagged_gem,
+ exception_gems, # FIXED_ONU
flow.is_multicast_flow)
-
- # TODO: Only EAPOL ACL support for the first demo - FIXED_ONU
- if self._needs_acl_support and self._eth_type != FlowEntry.EtherType.EAPOL.value:
+ # FIXED_ONU
+ if exception_gems and self._eth_type != FlowEntry.EtherType.EAPOL:
self._gem_ids_and_vid = dict()
- def _decode(self):
+ def _decode(self, evc):
from evc import EVC
from flow_entry import FlowEntry
- flow = self._flow # TODO: Drop saving of flow once debug complete
+ # Only called from initializer, so first flow is only flow
+ flow = self._flows.itervalues().next()
self._name = EVCMap.create_evc_map_name(flow)
- if self._evc:
+ if evc:
self._evc_connection = EVCMap.EvcConnection.EVC
- self._evc_name = self._evc.name
else:
self._status_message = 'Can only create EVC-MAP if EVC supplied'
return False
@@ -426,7 +635,7 @@
if is_pon or is_uni:
self._uni_port = flow.handler.get_port_name(flow.in_port)
- self._evc.ce_vlan_preservation = False
+ evc.ce_vlan_preservation = False
else:
self._status_message = 'EVC-MAPS without UNI or PON ports are not supported'
return False # UNI Ports handled in the EVC Maps
@@ -435,11 +644,11 @@
self._eth_type = flow.eth_type
- if self._eth_type == FlowEntry.EtherType.IPv4.value:
+ if self._eth_type == FlowEntry.EtherType.IPv4:
self._ip_protocol = flow.ip_protocol
self._ipv4_dst = flow.ipv4_dst
- if self._ip_protocol == FlowEntry.IpProtocol.UDP.value:
+ if self._ip_protocol == FlowEntry.IpProtocol.UDP:
self._udp_dst = flow.udp_dst
self._udp_src = flow.udp_src
@@ -449,25 +658,24 @@
self._setup_gem_ids()
# self._match_untagged = flow.vlan_id is None and flow.inner_vid is None
- self._c_tag = flow.inner_vid
+ self._c_tag = flow.inner_vid or flow.vlan_id
# If a push of a single VLAN is present with a POP of the VLAN in the EVC's
# flow, then this is a traditional EVC flow
- self._evc.men_to_uni_tag_manipulation = EVC.Men2UniManipulation.POP_OUT_TAG_ONLY
- self._evc.switching_method = EVC.SwitchingMethod.DOUBLE_TAGGED
+ evc.men_to_uni_tag_manipulation = EVC.Men2UniManipulation.POP_OUT_TAG_ONLY
+ evc.switching_method = EVC.SwitchingMethod.DOUBLE_TAGGED # \
+ # if self._c_tag is not None else EVC.SwitchingMethod.SINGLE_TAGGED
- # if len(flow.push_vlan_id) == 1 and self._evc.flow_entry.pop_vlan == 1:
- # self._evc.men_to_uni_tag_manipulation = EVC.Men2UniManipulation.SYMMETRIC
- # self._evc.switching_method = EVC.SwitchingMethod.SINGLE_TAGGED
- # self._evc.stpid = flow.push_vlan_tpid[0]
- #
- # elif len(flow.push_vlan_id) == 2 and self._evc.flow_entry.pop_vlan == 1:
- # self._evc.men_to_uni_tag_manipulation = EVC.Men2UniManipulation.POP_OUT_TAG_ONLY
- # self._evc.switching_method = EVC.SwitchingMethod.DOUBLE_TAGGED
- # # self._match_ce_vlan_id = 'something maybe'
- # raise NotImplementedError('TODO: Not supported/needed yet')
+ if not flow.handler.exception_gems: # ! FIXED_ONU
+ try:
+ acl = ACL.create(flow)
+ if acl.name not in self._new_acls:
+ self._new_acls[acl.name] = acl
+ except Exception as e:
+ log.exception('ACL-decoding', e=e)
+ return False
return True
# Bulk operations
@@ -538,3 +746,11 @@
d.addCallbacks(do_delete, request_failed, callbackArgs=[regex_], errbackArgs=['get'])
d.addCallbacks(delete_complete, request_failed, errbackArgs=['edit-config'])
return d
+
+ def _cancel_deferred(self):
+ d, self._deferred = self._deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
\ No newline at end of file
diff --git a/voltha/adapters/adtran_olt/flow/flow_entry.py b/voltha/adapters/adtran_olt/flow/flow_entry.py
index 9af0bba..7af92c8 100644
--- a/voltha/adapters/adtran_olt/flow/flow_entry.py
+++ b/voltha/adapters/adtran_olt/flow/flow_entry.py
@@ -14,13 +14,15 @@
from evc import EVC
from evc_map import EVCMap
-from enum import Enum
+from enum import IntEnum
+from untagged_evc import UntaggedEVC
+from utility_evc import UtilityEVC
import voltha.core.flow_decomposer as fd
from voltha.core.flow_decomposer import *
from voltha.protos.openflow_13_pb2 import OFPP_MAX
from twisted.internet import defer
-from twisted.internet.defer import returnValue, inlineCallbacks, succeed, gatherResults
+from twisted.internet.defer import returnValue, inlineCallbacks, gatherResults
log = structlog.get_logger()
@@ -33,16 +35,16 @@
]
_existing_downstream_flow_entries = {} # device-id -> signature-table
+ # |
+ # +-> downstream-signature
# |
- # +-> downstream-signature
- # |
- # +-> 'evc' -> EVC
- # |
- # +-> flow-ids -> flow-entry
+ # +-> 'evc' -> EVC
+ # |
+ # +-> flow-ids -> flow-entry
_existing_upstream_flow_entries = {} # device-id -> flow dictionary
- # |
- # +-> flow-id -> flow-entry
+ # |
+ # +-> flow-id -> flow-entry
class FlowEntry(object):
@@ -55,7 +57,7 @@
Note: Since only E-LINE is supported, modification of an existing EVC is not performed.
"""
- class FlowDirection(Enum):
+ class FlowDirection(IntEnum):
UPSTREAM = 0 # UNI port to NNI Port
DOWNSTREAM = 1 # NNI port to UNI Port
NNI = 2 # NNI port to NNI Port
@@ -68,20 +70,22 @@
(FlowDirection.UNI, FlowDirection.UNI): FlowDirection.UNI,
(FlowDirection.NNI, FlowDirection.NNI): FlowDirection.NNI,
}
+ LEGACY_CONTROL_VLAN = 4000
# Well known EtherTypes
- class EtherType(Enum):
+ class EtherType(IntEnum):
EAPOL = 0x888E
IPv4 = 0x0800
+ IPv6 = 0x86DD
ARP = 0x0806
# Well known IP Protocols
- class IpProtocol(Enum):
+ class IpProtocol(IntEnum):
IGMP = 2
UDP = 17
def __init__(self, flow, handler):
- self._flow = flow # TODO: Remove later
+ self._flow = flow
self._handler = handler
self.flow_id = flow.id
self.evc = None # EVC this flow is part of
@@ -89,6 +93,7 @@
self._flow_direction = FlowEntry.FlowDirection.OTHER
self._logical_port = None # Currently ONU VID is logical port if not doing xPON
self._is_multicast = False
+ self._is_acl_flow = False
# A value used to locate possible related flow entries
self.signature = None
@@ -114,8 +119,9 @@
self._name = self.create_flow_name()
def __str__(self):
- return 'flow_entry: {}, in: {}, out: {}'.format(self.name, self.in_port,
- self.output)
+ return 'flow_entry: {}, in: {}, out: {}, vid: {}, inner:{}, eth: {}, IP: {}'.format(
+ self.name, self.in_port, self.output, self.vlan_id, self.inner_vid,
+ self.eth_type, self.ip_protocol)
@property
def name(self):
@@ -145,6 +151,10 @@
return self._is_multicast
@property
+ def is_acl_flow(self):
+ return self._is_acl_flow or self._needs_acl_support
+
+ @property
def logical_port(self):
return self._logical_port # NNI or UNI Logical Port
@@ -241,7 +251,6 @@
upstream_flows = None
# Compute EVC and and maps
-
evc = FlowEntry._create_evc_and_maps(evc, downstream_flow, upstream_flows)
if evc is not None and evc.valid and downstream_flow_table['evc'] is None:
downstream_flow_table['evc'] = evc
@@ -249,7 +258,7 @@
return flow_entry, evc
except Exception as e:
- log.exception('flow_entry-processing', e=e)
+ log.exception('flow-entry-processing', e=e)
return None, None
@staticmethod
@@ -276,6 +285,12 @@
from mcast import MCastEVC
downstream_flow.evc = MCastEVC.create(downstream_flow)
+ elif downstream_flow.is_acl_flow:
+ if any(flow.eth_type == FlowEntry.EtherType.EAPOL for flow in upstream_flows) and\
+ downstream_flow.handler.utility_vlan != downstream_flow.handler.untagged_vlan:
+ downstream_flow.evc = UntaggedEVC.create(downstream_flow)
+ else:
+ downstream_flow.evc = UtilityEVC.create(downstream_flow)
else:
downstream_flow.evc = EVC(downstream_flow)
@@ -283,10 +298,34 @@
return None
# Create EVC-MAPs. Note upstream_flows is empty list for multicast
+ # For Packet In/Out support. The upstream flows for will have matching
+ # signatures. So the first one to get created should create the EVC and
+ # if it needs and ACL, do so then. The second one should just reference
+ # the first map.
+ #
+ # If the second has and ACL, then it should add it to the map.
+ # TODO: What to do if the second (or third, ...) is the data one.
+ # What should it do then?
+ sig_map_map = {f.signature: f.evc_map for f in upstream_flows
+ if f.evc_map is not None}
for flow in upstream_flows:
if flow.evc_map is None:
- flow.evc_map = EVCMap.create_ingress_map(flow, downstream_flow.evc)
+ if flow.signature in sig_map_map:
+ # Found an explicity matching existing EVC-MAP. Add flow to this EVC-MAP
+ flow.evc_map = sig_map_map[flow.signature].add_flow(flow, downstream_flow.evc)
+ elif flow.handler.exception_gems: # FIXED_ONU
+ # Create a new MAP
+ flow.evc_map = EVCMap.create_ingress_map(flow, downstream_flow.evc)
+ else:
+ # May need to create a MAP or search for an existing ACL/user EVC-Map
+ upstream_flow_table = _existing_upstream_flow_entries[flow.device_id]
+ existing_flow = EVCMap.find_matching_ingress_flow(flow, upstream_flow_table)
+
+ if existing_flow is None:
+ flow.evc_map = EVCMap.create_ingress_map(flow, downstream_flow.evc)
+ else:
+ flow.evc_map = existing_flow.add_flow(flow, downstream_flow.evc)
all_maps_valid = all(flow.evc_map.valid for flow in upstream_flows) \
or downstream_flow.is_multicast_flow
@@ -294,10 +333,7 @@
return downstream_flow.evc if all_maps_valid else None
@property
- def _needs_acl_support(self):
- """
- TODO: This is only while there is only a single downstream exception flow
- """
+ def _needs_acl_support(self): # FIXED_ONU- maybe
if self.ipv4_dst is not None: # In case MCAST downstream has ACL on it
return False
@@ -312,7 +348,6 @@
if status:
# Determine direction of the flow
-
def port_type(port_number):
if port_number in self._handler.northbound_ports:
return FlowEntry.FlowDirection.NNI
@@ -323,10 +358,46 @@
self._flow_direction = FlowEntry._flow_dir_map.get((port_type(self.in_port), port_type(self.output)),
FlowEntry.FlowDirection.OTHER)
+ # Modify flow entry for newer utility/untagged VLAN support
+ if not self.handler.exception_gems: # ! FIXED_ONU
+ # New Packet In/Out support
+ if self._flow_direction == FlowEntry.FlowDirection.DOWNSTREAM and\
+ self.vlan_id in (FlowEntry.LEGACY_CONTROL_VLAN,
+ self.handler.untagged_vlan):
+ # May be for to controller flow downstream (no ethType) or multicast (ethType = IP)
+ if self.eth_type is None or self._needs_acl_support:
+ self._is_multicast = False
+ self._is_acl_flow = True
+ if self.inner_vid is not None:
+ logical_port, subscriber_vlan, untagged_vlan = \
+ self._handler.get_onu_port_and_vlans(self)
+ self.inner_vid = subscriber_vlan
+ self.vlan_id = self.handler.utility_vlan
+ else:
+ self.vlan_id = self.handler.untagged_vlan
+ elif self._flow_direction == FlowEntry.FlowDirection.UPSTREAM:
+ try:
+ # TODO: Need to support flow retry if the ONU is not yet activated !!!!
+ # Get the correct logical port and subscriber VLAN for this UNI
+ self._logical_port, self.vlan_id, untagged_vlan = \
+ self._handler.get_onu_port_and_vlans(self)
+
+ if self._needs_acl_support:
+ self._is_acl_flow = True
+ if self.eth_type == FlowEntry.EtherType.EAPOL and \
+ self.handler.untagged_vlan != self.handler.utility_vlan:
+ self.vlan_id = None
+ self.push_vlan_id[0] = self.handler.untagged_vlan
+ else:
+ self.push_vlan_id[0] = self.handler.utility_vlan
+
+ except Exception as e:
+ # TODO: Need to support flow retry if the ONU is not yet activated !!!!
+ log.exception('tag-fixup', e=e)
+
# Create a signature that will help locate related flow entries on a device.
# These are not exact, just ones that may be put together to make an EVC. The
# basic rules are:
- #
# 1 - Same device
dev_id = self._handler.device_id
@@ -336,7 +407,6 @@
# 3 - The outer VID
# 4 - The inner VID. Wildcard if downstream
-
push_len = len(self.push_vlan_id)
if push_len == 0:
outer = self.vlan_id
@@ -373,7 +443,7 @@
self.in_port = fd.get_in_port(self._flow)
if self.in_port > OFPP_MAX:
- log.warn('Logical-input-ports-not-supported')
+ log.warn('logical-input-ports-not-supported')
return False
for field in fd.get_ofb_fields(self._flow):
@@ -381,16 +451,13 @@
assert self.in_port == field.port, 'Multiple Input Ports found in flow rule'
if self._handler.is_nni_port(self.in_port):
- self._logical_port = self.in_port
+ self._logical_port = self.in_port # TODO: This should be a lookup
elif field.type == VLAN_VID:
# log.info('*** field.type == VLAN_VID', value=field.vlan_vid & 0xfff)
self.vlan_id = field.vlan_vid & 0xfff
self._is_multicast = self.vlan_id in self._handler.multicast_vlans
- if self._handler.is_pon_port(self.in_port):
- self._logical_port = self.vlan_id
-
elif field.type == VLAN_PCP:
# log.info('*** field.type == VLAN_PCP', value=field.vlan_pcp)
self.pcp = field.vlan_pcp
@@ -434,7 +501,7 @@
self.output = fd.get_out_port(self._flow)
if self.output > OFPP_MAX:
- log.warn('Logical-output-ports-not-supported')
+ log.warn('logical-output-ports-not-supported')
return False
for act in fd.get_actions(self._flow):
@@ -470,21 +537,24 @@
@staticmethod
def drop_missing_flows(device_id, valid_flow_ids):
dl = []
-
- flow_table = _existing_upstream_flow_entries.get(device_id)
- if flow_table is not None:
- flows_to_drop = [flow for flow_id, flow in flow_table.items()
- if flow_id not in valid_flow_ids]
- dl.extend([flow.remove() for flow in flows_to_drop])
-
- sig_table = _existing_downstream_flow_entries.get(device_id)
- if sig_table is not None:
- for flow_table in sig_table.itervalues():
+ try:
+ flow_table = _existing_upstream_flow_entries.get(device_id)
+ if flow_table is not None:
flows_to_drop = [flow for flow_id, flow in flow_table.items()
- if isinstance(flow, FlowEntry) and flow_id not in valid_flow_ids]
+ if flow_id not in valid_flow_ids]
dl.extend([flow.remove() for flow in flows_to_drop])
- return gatherResults(dl, consumeErrors=True)
+ sig_table = _existing_downstream_flow_entries.get(device_id)
+ if sig_table is not None:
+ for flow_table in sig_table.itervalues():
+ flows_to_drop = [flow for flow_id, flow in flow_table.items()
+ if isinstance(flow, FlowEntry) and flow_id not in valid_flow_ids]
+ dl.extend([flow.remove() for flow in flows_to_drop])
+
+ except Exception as e:
+ pass
+
+ return gatherResults(dl, consumeErrors=True) if len(dl) > 0 else returnValue('no-flows-to-drop')
@inlineCallbacks
def remove(self):
@@ -541,7 +611,7 @@
try:
dl = []
if evc_map is not None:
- dl.append(evc_map.delete())
+ dl.append(evc_map.delete(self))
if evc is not None:
dl.append(evc.delete())
@@ -613,3 +683,32 @@
"""
raise NotImplemented("TODO: Implement this")
+ @staticmethod
+ def get_packetout_info(device_id, logical_port):
+ """
+ Find parameters needed to send packet out succesfully to the OLT.
+
+ :param device_id: A Voltha.Device object.
+ :param logical_port: (int) logical port number for packet to go out.
+
+ :return: physical port number, ctag, stag, evcmap name
+ """
+ from ..onu import Onu
+
+ log.debug('get-packetout-info', device_id=device_id, logical_port=logical_port)
+ all_flow_entries = _existing_upstream_flow_entries.get(device_id) or {}
+ for flow_entry in all_flow_entries.itervalues():
+ log.debug('get-packetout-info', flow_entry=flow_entry)
+ if flow_entry.evc_map is not None and flow_entry.evc_map.valid and flow_entry.logical_port == logical_port:
+ evc_map = flow_entry.evc_map
+ gem_ids_and_vid = evc_map.gem_ids_and_vid
+ if len(gem_ids_and_vid) > 0:
+ for onu_id, gem_ids_with_vid in gem_ids_and_vid.iteritems():
+ log.debug('get-packetout-info', onu_id=onu_id, gem_ids_with_vid=gem_ids_with_vid)
+ if len(gem_ids_with_vid) > 0:
+ gem_ids = gem_ids_with_vid[0]
+ ctag = gem_ids_with_vid[1]
+ gem_id = gem_ids[0] # TODO: always grab fist in list
+ return flow_entry.in_port, ctag, Onu.gem_id_to_gvid(gem_id), evc_map.get_evcmap_name(onu_id, gem_id)
+ return None, None, None, None
+
diff --git a/voltha/adapters/adtran_olt/flow/mcast.py b/voltha/adapters/adtran_olt/flow/mcast.py
index 7051bd8..124a262 100644
--- a/voltha/adapters/adtran_olt/flow/mcast.py
+++ b/voltha/adapters/adtran_olt/flow/mcast.py
@@ -137,7 +137,7 @@
dl = [self.remove()]
if delete_maps:
for evc_map in self.evc_maps:
- dl.append(evc_map.delete()) # TODO: implement bulk-flow procedures
+ dl.append(evc_map.delete(self)) # TODO: implement bulk-flow procedures
yield defer.gatherResults(dl, consumeErrors=True)
diff --git a/voltha/adapters/adtran_olt/flow/untagged_evc.py b/voltha/adapters/adtran_olt/flow/untagged_evc.py
new file mode 100644
index 0000000..84a92c7
--- /dev/null
+++ b/voltha/adapters/adtran_olt/flow/untagged_evc.py
@@ -0,0 +1,154 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from evc import EVC
+from twisted.internet import defer
+from twisted.internet.defer import returnValue, inlineCallbacks
+import voltha.core.flow_decomposer as fd
+from voltha.core.flow_decomposer import *
+
+log = structlog.get_logger()
+
+EVC_NAME_FORMAT = 'VOLTHA-UNTAGGED-{}' # format(flow.vlan_id)
+EVC_NAME_REGEX_ALL = EVC_NAME_FORMAT.format('*')
+
+
+_untagged_evcs = {} # device-id -> flow dictionary
+ # |
+ # +-> untagged-vlan-id -> evcs
+
+
+class UntaggedEVC(EVC):
+ """
+ Class to wrap Untagged (no C-Tag) EVC functionality
+ """
+ def __init__(self, flow_entry):
+ super(UntaggedEVC, self).__init__(flow_entry)
+ # No Inner-VID
+ self._switching_method = EVC.SwitchingMethod.SINGLE_TAGGED
+ self._downstream_flows = {flow_entry.flow_id} # Matching Downstream Flow IDs
+ self.service_evc = True
+
+ def __str__(self):
+ return "VOLTHA-UNTAGGED-{}: MEN: {}, VLAN: {}".format(self._name, self._men_ports, self._s_tag)
+
+ def _create_name(self):
+ #
+ # TODO: Take into account selection criteria and output to make the name
+ #
+ return EVC_NAME_FORMAT.format(self._flow.vlan_id)
+
+ @staticmethod
+ def create(flow_entry):
+ device_id = flow_entry.device_id
+ evc_table = _untagged_evcs.get(device_id)
+
+ if evc_table is None:
+ _untagged_evcs[device_id] = dict()
+ evc_table = _untagged_evcs[device_id]
+
+ try:
+ evc = evc_table.get(flow_entry.vlan_id)
+
+ if evc is None:
+ # Create EVC and initial EVC Map
+ evc = UntaggedEVC(flow_entry)
+ evc_table[flow_entry.vlan_id] = evc
+ else:
+ if flow_entry.flow_id in evc.downstream_flows: # TODO: Debug only to see if flow_ids are unique
+ pass
+ else:
+ evc.add_downstream_flows(flow_entry.flow_id)
+
+ return evc
+
+ except Exception as e:
+ log.exception('untagged-create', e=e)
+ return None
+
+ @property
+ def flow_entry(self):
+ return self._flow
+
+ @property
+ def downstream_flows(self):
+ return frozenset(self._downstream_flows)
+
+ def add_downstream_flows(self, flow_id):
+ self._downstream_flows.add(flow_id)
+
+ def remove_downstream_flows(self, flow_id):
+ self._downstream_flows.discard(flow_id)
+
+ @inlineCallbacks
+ def remove(self, remove_maps=True):
+ """
+ Remove EVC (and optional associated EVC-MAPs) from hardware
+ :param remove_maps: (boolean)
+ :return: (deferred)
+ """
+ log.info('removing', evc=self, remove_maps=remove_maps)
+
+ device_id = self._handler.device_id
+ flow_id = self._flow.id
+ evc_table = _untagged_evcs.get(device_id)
+
+ if evc_table is None or flow_id not in evc_table:
+ returnValue('NOP')
+
+ # Remove flow reference
+ if self._flow.flow_id in self._downstream_flows:
+ del self._downstream_flows[self._flow.flow_id]
+
+ if len(self._downstream_flows) == 0:
+ # Use base class to clean up
+ returnValue(super(UntaggedEVC, self).remove(remove_maps=True))
+
+ returnValue('More references')
+
+ @inlineCallbacks
+ def delete(self, delete_maps=True):
+ """
+ Remove from hardware and delete/clean-up EVC Object
+ """
+ log.info('deleting', evc=self, delete_maps=delete_maps)
+
+ try:
+ dl = [self.remove()]
+ if delete_maps:
+ for evc_map in self.evc_maps:
+ dl.append(evc_map.delete(None)) # TODO: implement bulk-flow procedures
+
+ yield defer.gatherResults(dl, consumeErrors=True)
+
+ except Exception as e:
+ log.exception('removal', e=e)
+
+ self._evc_maps = None
+ f, self._flow = self._flow, None
+ if f is not None and f.handler is not None:
+ f.handler.remove_evc(self)
+
+ def reflow(self, reflow_maps=True):
+ pass # TODO: Implement or use base class?
+
+ @staticmethod
+ def remove_all(client, regex_=EVC_NAME_REGEX_ALL):
+ """
+ Remove all matching EVCs from hardware
+ :param client: (ncclient) NETCONF Client to use
+ :param regex_: (String) Regular expression for name matching
+ :return: (deferred)
+ """
+ pass # TODO: ???
diff --git a/voltha/adapters/adtran_olt/flow/utility_evc.py b/voltha/adapters/adtran_olt/flow/utility_evc.py
new file mode 100644
index 0000000..cb8a4ae
--- /dev/null
+++ b/voltha/adapters/adtran_olt/flow/utility_evc.py
@@ -0,0 +1,151 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from voltha.core.flow_decomposer import *
+from evc import EVC
+from twisted.internet import defer
+from twisted.internet.defer import returnValue, inlineCallbacks
+
+log = structlog.get_logger()
+
+EVC_NAME_FORMAT = 'VOLTHA-UTILITY-{}' # format(flow.vlan_id)
+EVC_NAME_REGEX_ALL = EVC_NAME_FORMAT.format('*')
+
+
+_utility_evcs = {} # device-id -> flow dictionary
+ # |
+ # +-> untagged-vlan-id -> evcs
+
+
+class UtilityEVC(EVC):
+ """
+ Class to wrap orphan ingress ACLs EVC functionality
+ """
+ def __init__(self, flow_entry):
+ super(UtilityEVC, self).__init__(flow_entry)
+ self._downstream_flows = {flow_entry.flow_id} # Matching Downstream Flow IDs
+ self.service_evc = True
+
+ def __str__(self):
+ return "VOLTHA-UTILITY-{}: MEN: {}, VLAN: {}".format(self._name, self._men_ports, self._s_tag)
+
+ def _create_name(self):
+ #
+ # TODO: Take into account selection criteria and output to make the name
+ #
+ return EVC_NAME_FORMAT.format(self._flow.vlan_id)
+
+ @staticmethod
+ def create(flow_entry):
+ device_id = flow_entry.device_id
+ evc_table = _utility_evcs.get(device_id)
+
+ if evc_table is None:
+ _utility_evcs[device_id] = dict()
+ evc_table = _utility_evcs[device_id]
+
+ try:
+ evc = evc_table.get(flow_entry.vlan_id)
+
+ if evc is None:
+ # Create EVC and initial EVC Map
+ evc = UtilityEVC(flow_entry)
+ evc_table[flow_entry.vlan_id] = evc
+ else:
+ if flow_entry.flow_id in evc.downstream_flows: # TODO: Debug only to see if flow_ids are unique
+ pass
+ else:
+ evc.add_downstream_flows(flow_entry.flow_id)
+
+ return evc
+
+ except Exception as e:
+ log.exception('untagged-create', e=e)
+ return None
+
+ @property
+ def flow_entry(self):
+ return self._flow
+
+ @property
+ def downstream_flows(self):
+ return frozenset(self._downstream_flows)
+
+ def add_downstream_flows(self, flow_id):
+ self._downstream_flows.add(flow_id)
+
+ def remove_downstream_flows(self, flow_id):
+ self._downstream_flows.discard(flow_id)
+
+ @inlineCallbacks
+ def remove(self, remove_maps=True):
+ """
+ Remove EVC (and optional associated EVC-MAPs) from hardware
+ :param remove_maps: (boolean)
+ :return: (deferred)
+ """
+ log.info('removing', evc=self, remove_maps=remove_maps)
+
+ device_id = self._handler.device_id
+ flow_id = self._flow.id
+ evc_table = _utility_evcs.get(device_id)
+
+ if evc_table is None or flow_id not in evc_table:
+ returnValue('NOP')
+
+ # Remove flow reference
+ if self._flow.flow_id in self._downstream_flows:
+ del self._downstream_flows[self._flow.flow_id]
+
+ if len(self._downstream_flows) == 0:
+ # Use base class to clean up
+ returnValue(super(UtilityEVC, self).remove(remove_maps=True))
+
+ returnValue('More references')
+
+ @inlineCallbacks
+ def delete(self, delete_maps=True):
+ """
+ Remove from hardware and delete/clean-up EVC Object
+ """
+ log.info('deleting', evc=self, delete_maps=delete_maps)
+
+ try:
+ dl = [self.remove()]
+ if delete_maps:
+ for evc_map in self.evc_maps:
+ dl.append(evc_map.delete(None)) # TODO: implement bulk-flow procedures
+
+ yield defer.gatherResults(dl, consumeErrors=True)
+
+ except Exception as e:
+ log.exception('removal', e=e)
+
+ self._evc_maps = None
+ f, self._flow = self._flow, None
+ if f is not None and f.handler is not None:
+ f.handler.remove_evc(self)
+
+ def reflow(self, reflow_maps=True):
+ pass # TODO: Implement or use base class?
+
+ @staticmethod
+ def remove_all(client, regex_=EVC_NAME_REGEX_ALL):
+ """
+ Remove all matching EVCs from hardware
+ :param client: (ncclient) NETCONF Client to use
+ :param regex_: (String) Regular expression for name matching
+ :return: (deferred)
+ """
+ pass # TODO: ???
diff --git a/voltha/adapters/adtran_olt/net/adtran_zmq.py b/voltha/adapters/adtran_olt/net/adtran_zmq.py
index 9ef0731..1d1341c 100644
--- a/voltha/adapters/adtran_olt/net/adtran_zmq.py
+++ b/voltha/adapters/adtran_olt/net/adtran_zmq.py
@@ -13,9 +13,6 @@
# limitations under the License.
import sys
-import json
-import struct
-import binascii
import structlog
from twisted.internet.defer import succeed
@@ -32,162 +29,73 @@
from threading import Thread, Event
-log = structlog.get_logger()
zmq_factory = ZmqFactory()
-# An OMCI message minimally has a 32-bit PON index and 32-bit ONU ID.
-
-DEFAULT_PON_AGENT_TCP_PORT = 5656
-DEFAULT_PIO_TCP_PORT = 5657
-
class AdtranZmqClient(object):
"""
Adtran ZeroMQ Client for PON Agent and/or packet in/out service
"""
def __init__(self, ip_address, rx_callback, port):
- external_conn = 'tcp://{}:{}'.format(ip_address, port)
- endpoint = ZmqEndpoint('connect', external_conn)
+ self.log = structlog.get_logger()
- self._socket = ZmqPairConnection(zmq_factory, endpoint)
+ external_conn = 'tcp://{}:{}'.format(ip_address, port)
+
+ self.zmq_endpoint = ZmqEndpoint('connect', external_conn)
+ self._socket = ZmqPairConnection(zmq_factory, self.zmq_endpoint)
self._socket.onReceive = rx_callback or AdtranZmqClient.rx_nop
+ self.auth = None
def send(self, data):
try:
self._socket.send(data)
except Exception as e:
- log.exception('send', e=e)
+ self.log.exception('send', e=e)
def shutdown(self):
self._socket.onReceive = AdtranZmqClient.rx_nop
self._socket.shutdown()
- @staticmethod
- def rx_nop(message):
- log.debug('discarding-no-receiver')
+ @property
+ def socket(self):
+ return self._socket
@staticmethod
- def encode_omci_message(msg, pon_index, onu_id, is_async_control):
- """
- Create an OMCI Tx Packet for the specified ONU
+ def rx_nop(_):
+ pass
- :param msg: (str) OMCI message to send
- :param pon_index: (unsigned int) PON Port index
- :param onu_id: (unsigned int) ONU ID
- :param is_async_control: (bool) Newer async/JSON support
+ def setup_plain_security(self, username, password):
+ self.log.debug('setup-plain-security')
- :return: (bytes) octet string to send
- """
- assert msg, 'No message provided'
+ def configure_plain(_):
+ self.log.debug('plain-security', username=username,
+ password=password)
- return AdtranZmqClient._encode_omci_message_json(msg, pon_index, onu_id) \
- if is_async_control else \
- AdtranZmqClient._encode_omci_message_legacy(msg, pon_index, onu_id)
+ self.auth.configure_plain(domain='*', passwords={username: password})
+ self._socket.socket.plain_username = username
+ self._socket.socket.plain_password = password
- @staticmethod
- def _encode_omci_message_legacy(msg, pon_index, onu_id):
- """
- Create an OMCI Tx Packet for the specified ONU
+ def add_endoints(_results):
+ self._socket.addEndpoints([self.zmq_endpoint])
- :param msg: (str) OMCI message to send
- :param pon_index: (unsigned int) PON Port index
- :param onu_id: (unsigned int) ONU ID
+ def config_failure(_results):
+ raise Exception('Failed to configure plain-text security')
- :return: (bytes) octet string to send
- """
- s = struct.Struct('!II')
+ def endpoint_failure(_results):
+ raise Exception('Failed to complete endpoint setup')
- # Check if length is prepended (32-bits = 4 bytes ASCII)
- msglen = len(msg)
- assert msglen == 40*2 or msglen == 44*2, 'Invalid OMCI message length'
+ self.auth = TwistedZmqAuthenticator()
- if len(msg) > 40*2:
- msg = msg[:40*2]
+ d = self.auth.start()
+ d.addCallbacks(configure_plain, config_failure)
+ d.addCallbacks(add_endoints, endpoint_failure)
- return s.pack(pon_index, onu_id) + binascii.unhexlify(msg)
+ return d
- @staticmethod
- def _encode_omci_message_json(msg, pon_index, onu_id):
- """
- Create an OMCI Tx Packet for the specified ONU
-
- :param msg: (str) OMCI message to send
- :param pon_index: (unsigned int) PON Port index
- :param onu_id: (unsigned int) ONU ID
-
- :return: (bytes) octet string to send
- """
-
- return json.dumps({"operation": "NOTIFY",
- "url": "adtran-olt-pon-control/omci-message",
- "pon-id": pon_index,
- "onu-id": onu_id,
- "message-contents": msg.decode("hex").encode("base64")
- })
-
- @staticmethod
- def decode_pon_agent_packet(packet, is_async_control):
- """
- Decode the PON-Agent packet provided by the ZMQ client
-
- :param packet: (bytes) Packet
- :param is_async_control: (bool) Newer async/JSON support
- :return: (long, long, bytes, boolean) PON Index, ONU ID, Frame Contents (OMCI or Ethernet),\
- and a flag indicating if it is OMCI
- """
- return AdtranZmqClient._decode_omci_message_json(packet) if is_async_control \
- else AdtranZmqClient._decode_omci_message_legacy(packet)
-
- @staticmethod
- def _decode_omci_message_legacy(packet):
- """
- Decode the packet provided by the ZMQ client (binary legacy format)
-
- :param packet: (bytes) Packet
- :return: (long, long, bytes) PON Index, ONU ID, OMCI Frame Contents
- """
- (pon_index, onu_id) = struct.unpack_from('!II', packet)
- omci_msg = packet[8:]
-
- return pon_index, onu_id, omci_msg, True
-
- @staticmethod
- def _decode_omci_message_json(packet):
- """
- Decode the packet provided by the ZMQ client (JSON format)
-
- :param packet: (string) Packet
- :return: (long, long, bytes) PON Index, ONU ID, OMCI Frame Contents
- """
- msg = json.loads(packet)
- pon_id = msg['pon-id']
- onu_id = msg['onu-id']
- msg_data = msg['message-contents'].decode("base64").encode("hex")
- is_omci = msg['operation'] == "NOTIFY" and 'omci-message' in msg['url']
-
- return pon_id, onu_id, msg_data, is_omci
-
- @staticmethod
- def decode_packet_in_message(packet):
- from scapy.layers.l2 import Ether
- try:
- message = json.loads(packet)
- log.debug('message', message=message)
-
- for field in ['url', 'evc-map-name', 'total-len', 'port-number', 'message-contents']:
- assert field in message, "Missing field '{}' in received packet".format(field)
-
- decoded = message['message-contents'].decode('base64')
- assert len(decoded.encode('hex')) == message['total-len'], \
- 'Decoded length ({}) != Message Encoded lenght ({})'.\
- format(len(decoded.encode('hex')), message['total-len'])
-
- return message['port-number'], message['evc-map'], Ether(decoded)
-
- except Exception as e:
- log.exception('decode', e=e)
- raise
+ def setup_curve_security(self):
+ self.log.debug('setup-curve-security')
+ raise NotImplementedError('TODO: curve transport security is not yet supported')
class ZmqPairConnection(ZmqConnection):
@@ -248,6 +156,7 @@
###############################################################################################
###############################################################################################
+
def _inherit_docstrings(cls):
"""inherit docstrings from Authenticator, so we don't duplicate them"""
for name, method in cls.__dict__.items():
@@ -258,11 +167,13 @@
method.__doc__ = upstream_method.__doc__
return cls
+
@_inherit_docstrings
class TwistedZmqAuthenticator(object):
"""Run ZAP authentication in a background thread but communicate via Twisted ZMQ"""
def __init__(self, encoding='utf-8'):
+ self.log = structlog.get_logger()
self.context = zmq_factory.context
self.encoding = encoding
self.pipe = None
@@ -274,21 +185,21 @@
self.pipe.send([b'ALLOW'] + [b(a, self.encoding) for a in addresses])
except Exception as e:
- log.exception('allow', e=e)
+ self.log.exception('allow', e=e)
def deny(self, *addresses):
try:
self.pipe.send([b'DENY'] + [b(a, self.encoding) for a in addresses])
except Exception as e:
- log.exception('deny', e=e)
+ self.log.exception('deny', e=e)
def configure_plain(self, domain='*', passwords=None):
try:
self.pipe.send([b'PLAIN', b(domain, self.encoding), jsonapi.dumps(passwords or {})])
except Exception as e:
- log.exception('configure-plain', e=e)
+ self.log.exception('configure-plain', e=e)
def configure_curve(self, domain='*', location=''):
try:
@@ -297,7 +208,7 @@
self.pipe.send([b'CURVE', domain, location])
except Exception as e:
- log.exception('configure-curve', e=e)
+ self.log.exception('configure-curve', e=e)
def start(self, rx_callback=AdtranZmqClient.rx_nop):
"""Start the authentication thread"""
@@ -316,7 +227,7 @@
self.thread, timeout=10)
except Exception as e:
- log.exception('start', e=e)
+ self.log.exception('start', e=e)
@staticmethod
def _do_thread_start(thread, timeout=10):
@@ -366,6 +277,7 @@
def __init__(self, context, endpoint, encoding='utf-8', authenticator=None):
super(LocalAuthenticationThread, self).__init__(name='0mq Authenticator')
+ self.log = structlog.get_logger()
self.context = context or zmq.Context.instance()
self.encoding = encoding
self.started = Event()
@@ -403,7 +315,7 @@
self.authenticator.stop()
except Exception as e:
- log.exception("run", e=e)
+ self.log.exception("run", e=e)
def _handle_zap(self):
"""
@@ -428,21 +340,21 @@
return terminate
command = msg[0]
- log.debug("auth received API command", command=command)
+ self.log.debug("auth received API command", command=command)
if command == b'ALLOW':
addresses = [u(m, self.encoding) for m in msg[1:]]
try:
self.authenticator.allow(*addresses)
except Exception as e:
- log.exception("Failed to allow", addresses=addresses, e=e)
+ self.log.exception("Failed to allow", addresses=addresses, e=e)
elif command == b'DENY':
addresses = [u(m, self.encoding) for m in msg[1:]]
try:
self.authenticator.deny(*addresses)
except Exception as e:
- log.exception("Failed to deny", addresses=addresses, e=e)
+ self.log.exception("Failed to deny", addresses=addresses, e=e)
elif command == b'PLAIN':
domain = u(msg[1], self.encoding)
@@ -462,6 +374,6 @@
terminate = True
else:
- log.error("Invalid auth command from API", command=command)
+ self.log.error("Invalid auth command from API", command=command)
return terminate
diff --git a/voltha/adapters/adtran_olt/net/pio_zmq.py b/voltha/adapters/adtran_olt/net/pio_zmq.py
new file mode 100644
index 0000000..2fd4048
--- /dev/null
+++ b/voltha/adapters/adtran_olt/net/pio_zmq.py
@@ -0,0 +1,125 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import random
+from adtran_zmq import AdtranZmqClient
+from enum import IntEnum
+
+DEFAULT_PIO_TCP_PORT = 5657
+
+
+class PioClient(AdtranZmqClient):
+ """
+ Adtran ZeroMQ Client for packet in/out service
+ """
+ def __init__(self, ip_address, rx_callback, port):
+ super(PioClient, self).__init__(ip_address, rx_callback, port)
+ self._seq_number = random.randint(1, 2**32)
+
+ class UrlType(IntEnum):
+ PACKET_IN = 0 # Packet In
+ PACKET_OUT = 1 # Packet Out
+ EVCMAPS_REQUEST = 2 # EVC-MAPs request
+ EVCMAPS_RESPONSE = 3 # EVC-MAPs response
+ UNKNOWN = 4 # UNKNOWN URL
+
+ def get_url_type(self, packet):
+ url_type = PioClient.UrlType.UNKNOWN
+ message = json.loads(packet)
+ if 'url' in message:
+ if message['url'] == 'adtran-olt-of-control/packet-in':
+ url_type = PioClient.UrlType.PACKET_IN
+ elif message['url'] == 'adtran-olt-of-control/packet-out':
+ url_type = PioClient.UrlType.PACKET_OUT
+ elif message['url'] == 'adtran-olt-of-control/evc-map-response':
+ url_type = PioClient.UrlType.EVCMAPS_RESPONSE
+ elif message['url'] == 'adtran-olt-of-control/evc-map-request':
+ url_type = PioClient.UrlType.EVCMAPS_REQUEST
+ return url_type
+
+ def decode_packet(self, packet):
+ from scapy.layers.l2 import Ether
+ try:
+ message = json.loads(packet)
+ self.log.debug('message', message=message)
+
+ for field in ['url', 'evc-map-name', 'total-len', 'port-number', 'message-contents']:
+ assert field in message, "Missing field '{}' in received packet".format(field)
+
+ decoded = message['message-contents'].decode('base64')
+
+ assert len(decoded.encode('hex'))/2 == message['total-len'], \
+ 'Decoded length ({}) != Message Encoded length ({})'.\
+ format(len(decoded.encode('hex')), message['total-len'])
+
+ return int(message['port-number']), message['evc-map-name'], Ether(decoded)
+
+ except Exception as e:
+ self.log.exception('decode', e=e)
+ raise
+
+ @property
+ def sequence_number(self):
+ if self._seq_number >= 2**32:
+ self._seq_number = 0
+ else:
+ self._seq_number += 1
+
+ return self._seq_number
+
+ def encode_packet(self, egress_port, packet, map_name='TODO', exception_type=''):
+ """
+ Encode a message for transmission as a Packet Out
+ :param egress_port: (int) egress physical port number
+ :param packet: (str) actual message
+ :param map_name: (str) EVC-MAP Name
+ :param exception_type: (str) Type of exception
+ """
+ return json.dumps({
+ 'url': 'adtran-olt-of-control/packet-out',
+ 'buffer-id': self.sequence_number,
+ 'total-len': len(packet),
+ 'evc-map-name': map_name,
+ 'exception-type': exception_type,
+ 'port-number': egress_port,
+ 'message-contents': packet.encode('base64')
+ })
+
+ def query_request_packet(self):
+ """
+ Create query-request to get all installed exceptions
+ :return: Request string
+ """
+ return json.dumps({
+ 'url': 'adtran-olt-of-control/evc-map-request'
+ })
+
+ def decode_query_response_packet(self, packet, map_name=None):
+ """
+ Create query-request to get all installed exceptions
+ :param map_name: (str) EVC-MAP Name (None=all)
+ :param packet: returned query response packet
+ :return: list of evcmaps and associated exceptions
+ """
+ from scapy.layers.l2 import Ether
+ message = json.loads(packet)
+ self.log.debug('message', message=message)
+
+ if 'url' in message and message['url'] == 'adtran-olt-of-control/evc-map-response':
+ maps=message['evc-map-list']
+ if maps is not None:
+ self.log.debug('evc-maps-query-response', maps=maps)
+ return maps
+ return []
diff --git a/voltha/adapters/adtran_olt/net/pon_zmq.py b/voltha/adapters/adtran_olt/net/pon_zmq.py
new file mode 100644
index 0000000..3cea8e2
--- /dev/null
+++ b/voltha/adapters/adtran_olt/net/pon_zmq.py
@@ -0,0 +1,127 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import struct
+import binascii
+from adtran_zmq import AdtranZmqClient
+
+DEFAULT_PON_AGENT_TCP_PORT = 5656
+
+
+class PonClient(AdtranZmqClient):
+ """
+ Adtran ZeroMQ Client for PON Agent service
+ """
+ def __init__(self, ip_address, rx_callback, port):
+ super(PonClient, self).__init__(ip_address, rx_callback, port)
+
+ def encode_omci_packet(self, msg, pon_index, onu_id, is_async_control):
+ """
+ Create an OMCI Tx Packet for the specified ONU
+
+ :param msg: (str) OMCI message to send
+ :param pon_index: (unsigned int) PON Port index
+ :param onu_id: (unsigned int) ONU ID
+ :param is_async_control: (bool) Newer async/JSON support
+
+ :return: (bytes) octet string to send
+ """
+ assert msg, 'No message provided'
+
+ return PonClient._encode_omci_message_json(msg, pon_index, onu_id) \
+ if is_async_control else \
+ PonClient._encode_omci_message_legacy(msg, pon_index, onu_id)
+
+ @staticmethod
+ def _encode_omci_message_legacy(msg, pon_index, onu_id):
+ """
+ Create an OMCI Tx Packet for the specified ONU
+
+ :param msg: (str) OMCI message to send
+ :param pon_index: (unsigned int) PON Port index
+ :param onu_id: (unsigned int) ONU ID
+
+ :return: (bytes) octet string to send
+ """
+ s = struct.Struct('!II')
+
+ # Check if length is prepended (32-bits = 4 bytes ASCII)
+ msglen = len(msg)
+ assert msglen == 40*2 or msglen == 44*2, 'Invalid OMCI message length'
+
+ if len(msg) > 40*2:
+ msg = msg[:40*2]
+
+ return s.pack(pon_index, onu_id) + binascii.unhexlify(msg)
+
+ @staticmethod
+ def _encode_omci_message_json(msg, pon_index, onu_id):
+ """
+ Create an OMCI Tx Packet for the specified ONU
+
+ :param msg: (str) OMCI message to send
+ :param pon_index: (unsigned int) PON Port index
+ :param onu_id: (unsigned int) ONU ID
+
+ :return: (bytes) octet string to send
+ """
+
+ return json.dumps({"operation": "NOTIFY",
+ "url": "adtran-olt-pon-control/omci-message",
+ "pon-id": pon_index,
+ "onu-id": onu_id,
+ "message-contents": msg.decode("hex").encode("base64")
+ })
+
+ def decode_packet(self, packet, is_async_control):
+ """
+ Decode the PON-Agent packet provided by the ZMQ client
+
+ :param packet: (bytes) Packet
+ :param is_async_control: (bool) Newer async/JSON support
+ :return: (long, long, bytes, boolean) PON Index, ONU ID, Frame Contents (OMCI or Ethernet),\
+ and a flag indicating if it is OMCI
+ """
+ return PonClient._decode_omci_message_json(packet) if is_async_control \
+ else PonClient._decode_omci_message_legacy(packet)
+
+ @staticmethod
+ def _decode_omci_message_legacy(packet):
+ """
+ Decode the packet provided by the ZMQ client (binary legacy format)
+
+ :param packet: (bytes) Packet
+ :return: (long, long, bytes) PON Index, ONU ID, OMCI Frame Contents
+ """
+ (pon_index, onu_id) = struct.unpack_from('!II', packet)
+ omci_msg = packet[8:]
+
+ return pon_index, onu_id, omci_msg, True
+
+ @staticmethod
+ def _decode_omci_message_json(packet):
+ """
+ Decode the packet provided by the ZMQ client (JSON format)
+
+ :param packet: (string) Packet
+ :return: (long, long, bytes) PON Index, ONU ID, OMCI Frame Contents
+ """
+ msg = json.loads(packet)
+ pon_id = msg['pon-id']
+ onu_id = msg['onu-id']
+ msg_data = msg['message-contents'].decode("base64")
+ is_omci = msg['operation'] == "NOTIFY" and 'omci-message' in msg['url']
+
+ return pon_id, onu_id, msg_data, is_omci
diff --git a/voltha/adapters/adtran_olt/nni_port.py b/voltha/adapters/adtran_olt/nni_port.py
index c3f4eb3..e836004 100644
--- a/voltha/adapters/adtran_olt/nni_port.py
+++ b/voltha/adapters/adtran_olt/nni_port.py
@@ -18,7 +18,7 @@
import structlog
import xmltodict
-from enum import Enum
+from port import AdtnPort
from twisted.internet import reactor, defer
from twisted.internet.defer import inlineCallbacks, returnValue, succeed, fail
from twisted.python.failure import Failure
@@ -29,51 +29,36 @@
from voltha.protos.openflow_13_pb2 import OFPPF_100GB_FD, OFPPF_FIBER, OFPPS_LIVE, ofp_port
-class NniPort(object):
+class NniPort(AdtnPort):
"""
- A class similar to the 'Port' class in the VOLTHA
-
- TODO: Merge this with the Port class or cleanup where possible
- so we do not duplicate fields/properties/methods
+ Northbound network port, often Ethernet-based
"""
- class State(Enum):
- INITIAL = 0 # Created and initialization in progress
- RUNNING = 1 # PON port contacted, ONU discovery active
- STOPPED = 2 # Disabled
- DELETING = 3 # Cleanup
-
def __init__(self, parent, **kwargs):
+ super(NniPort, self).__init__(parent, **kwargs)
+
# TODO: Weed out those properties supported by common 'Port' object
- assert parent, 'parent is None'
- assert 'port_no' in kwargs, 'Port number not found'
self.log = structlog.get_logger(port_no=kwargs.get('port_no'))
self.log.info('creating')
- self._port_no = kwargs.get('port_no')
self._name = kwargs.get('name', 'nni-{}'.format(self._port_no))
- self._port = None
- self._logical_port = None
- self._parent = parent
- self._sync_tick = 10.0
- self._sync_deferred = None
+ self._logical_port = None
+
+ self.sync_tick = 10.0
self._stats_tick = 5.0
self._stats_deferred = None
- self._deferred = None
- self._state = NniPort.State.INITIAL
-
# Local cache of NNI configuration
- self._enabled = None
self._ianatype = '<type xmlns:ianaift="urn:ietf:params:xml:ns:yang:iana-if-type">ianaift:ethernetCsmacd</type>'
# And optional parameters
# TODO: Currently cannot update admin/oper status, so create this enabled and active
# self._admin_state = kwargs.pop('admin_state', AdminState.UNKNOWN)
# self._oper_status = kwargs.pop('oper_status', OperStatus.UNKNOWN)
+ self._enabled = True
self._admin_state = AdminState.ENABLED
self._oper_status = OperStatus.ACTIVE
@@ -89,10 +74,6 @@
self._device_port_no = kwargs.pop('device_port_no', self._port_no)
# Statistics
- self.rx_packets = 0
- self.rx_bytes = 0
- self.tx_packets = 0
- self.tx_bytes = 0
self.rx_dropped = 0
self.rx_errors = 0
self.rx_bcast = 0
@@ -101,76 +82,12 @@
self.tx_bcast = 0
self.tx_mcast = 0
- def __del__(self):
- self.stop()
-
def __str__(self):
return "NniPort-{}: Admin: {}, Oper: {}, parent: {}".format(self._port_no,
self._admin_state,
self._oper_status,
self._parent)
- @property
- def port_no(self):
- return self._port_no
-
- @property
- def name(self):
- return self._name
-
- @property
- def olt(self):
- return self._parent
-
- @property
- def state(self):
- return self._state
-
- @property
- def admin_state(self):
- return self._admin_state
-
- @property
- def oper_status(self):
- return self._oper_status
-
- @property
- def adapter_agent(self):
- return self.olt.adapter_agent
-
- @property
- def iana_type(self):
- return self._ianatype
-
- @property
- def enabled(self):
- return self._enabled
-
- @enabled.setter
- def enabled(self, value):
- assert isinstance(value, bool), 'enabled is a boolean'
- if self._enabled != value:
- if value:
- self.start()
- self.stop()
-
- def _cancel_deferred(self):
- d1, self._deferred = self._deferred, None
- d2, self._sync_deferred = self._sync_deferred, None
- d3, self._stats_deferred = self._stats_deferred, None
-
- for d in [d1, d2, d3]:
- try:
- if d is not None and d.called:
- d.cancel()
- except:
- pass
-
- def _update_adapter_agent(self):
- # TODO: Currently the adapter_agent does not allow 'update' of port status
- # self.adapter_agent.update_port(self.olt.device_id, self.get_port())
- pass
-
def get_port(self):
"""
Get the VOLTHA PORT object for this port
@@ -184,6 +101,25 @@
oper_status=self._oper_status)
return self._port
+ @property
+ def iana_type(self):
+ return self._ianatype
+
+ def cancel_deferred(self):
+ super(NniPort, self).cancel_deferred()
+
+ d, self._stats_deferred = self._stats_deferred, None
+ try:
+ if d is not None and d.called:
+ d.cancel()
+ except:
+ pass
+
+ def _update_adapter_agent(self):
+ # TODO: Currently the adapter_agent does not allow 'update' of port status
+ # self.adapter_agent.update_port(self.olt.device_id, self.get_port())
+ pass
+
def get_logical_port(self):
"""
Get the VOLTHA logical port for this port
@@ -208,96 +144,37 @@
root_port=True)
return self._logical_port
- def start(self):
- """
- Start/enable this NNI
- :return: (deferred)
- """
- if self._state == NniPort.State.RUNNING:
- return succeed('Running')
-
- self.log.info('starting')
- self._cancel_deferred()
-
- self._oper_status = OperStatus.ACTIVATING
- self._update_adapter_agent()
-
- # Do the rest of the startup in an async method
- self._deferred = reactor.callLater(0, self._finish_startup)
- return succeed('Scheduled')
-
@inlineCallbacks
- def _finish_startup(self):
- if self._state != NniPort.State.INITIAL:
+ def finish_startup(self):
+
+ if self.state != AdtnPort.State.INITIAL:
returnValue('Done')
- self._enabled = True
- self._admin_state = AdminState.ENABLED
- self._oper_status = OperStatus.ACTIVE
- self._update_adapter_agent()
-
# TODO: Start status polling of NNI interfaces
- self._deferred = None # = reactor.callLater(3, self.do_stuff)
- self._state = NniPort.State.RUNNING
+ self.deferred = None # = reactor.callLater(3, self.do_stuff)
+ self.state = AdtnPort.State.RUNNING
- # Begin hardware sync
- self._sync_deferred = reactor.callLater(self._sync_tick, self._sync_hardware)
- self._stats_deferred= reactor.callLater(self._stats_tick * 2, self._update_statistics)
+ # Begin statistics sync
+ self._stats_deferred = reactor.callLater(self._stats_tick * 2, self._update_statistics)
try:
- results = yield self.set_config('enabled', True)
+ yield self.set_config('enabled', True)
+
+ super(NniPort, self).finish_startup()
except Exception as e:
self.log.exception('nni-start', e=e)
self._oper_status = OperStatus.UNKNOWN
self._update_adapter_agent()
+ returnValue('Enabled')
- returnValue(self._deferred)
-
- @inlineCallbacks
- def stop(self):
- if self._state == NniPort.State.STOPPED:
- returnValue('Stopped')
-
- self.log.info('stopping')
- self._cancel_deferred()
+ def finish_stop(self):
# NOTE: Leave all NNI ports active (may have inband management)
# TODO: Revisit leaving NNI Ports active on disable
- self._enabled = None
- self._state = NniPort.State.STOPPED
-
- self._admin_state = AdminState.DISABLED
- self._oper_status = OperStatus.UNKNOWN
- self._update_adapter_agent()
-
- try:
- yield self.set_config('enabled', False)
-
- except Exception as e:
- self.log.exception('nni-stop', e=e)
- self._admin_state = AdminState.UNKNOWN
- raise
-
- returnValue(self._deferred)
-
- def restart(self):
- if self._state == NniPort.State.RUNNING or self._state == NniPort.State.STOPPED:
- start_it = (self._state == NniPort.State.RUNNING)
- self._state = NniPort.State.INITIAL
- return self.start() if start_it else self.stop()
- return succeed('nop')
-
- def delete(self):
- """
- Parent device is being deleted. Do not change any config but
- stop all polling
- """
- self.log.info('deleting', label=self._label)
- self._state = NniPort.State.DELETING
- self._cancel_deferred()
+ return self.set_config('enabled', False)
@inlineCallbacks
def reset(self):
@@ -305,8 +182,8 @@
Set the NNI Port to a known good state on initial port startup. Actual
NNI 'Start' is done elsewhere
"""
- if self._state != NniPort.State.INITIAL:
- self.log.error('reset-ignored', state=self._state)
+ if self.state != AdtnPort.State.INITIAL:
+ self.log.error('reset-ignored', state=self.state)
returnValue('Ignored')
self.log.info('resetting', label=self._label)
@@ -368,8 +245,8 @@
'</filter>'
return self._parent.netconf_client.get(state)
- def _sync_hardware(self):
- if self._state == NniPort.State.RUNNING or self._state == NniPort.State.STOPPED:
+ def sync_hardware(self):
+ if self.state == AdtnPort.State.RUNNING or self.state == AdtnPort.State.STOPPED:
def read_config(results):
self.log.debug('read-config', results=results)
try:
@@ -394,13 +271,13 @@
self.log.error('hardware-sync-failed', reason=reason)
def reschedule(_):
- delay = self._sync_tick
+ delay = self.sync_tick
delay += random.uniform(-delay / 10, delay / 10)
- self._sync_deferred = reactor.callLater(delay, self._sync_hardware)
+ self.sync_deferred = reactor.callLater(delay, self.sync_hardware)
- self._sync_deferred = self.get_nni_config()
- self._sync_deferred.addCallbacks(read_config, failure)
- self._sync_deferred.addBoth(reschedule)
+ self.sync_deferred = self.get_nni_config()
+ self.sync_deferred.addCallbacks(read_config, failure)
+ self.sync_deferred.addBoth(reschedule)
def _decode_nni_statistics(self, entry):
admin_status = entry.get('admin-status')
@@ -425,7 +302,7 @@
self.tx_packets = int(stats.get('out-unicast-pkts', 0)) + self.tx_mcast + self.tx_bcast
def _update_statistics(self):
- if self._state == NniPort.State.RUNNING:
+ if self.state == AdtnPort.State.RUNNING:
def read_state(results):
self.log.debug('read-state', results=results)
try:
@@ -490,13 +367,45 @@
"""
return GetReply(raw)
+ @staticmethod
+ def get_pon_port_state_results():
+ from ncclient.operations.retrieve import GetReply
+ raw = """
+ <?xml version="1.0" encoding="UTF-8"?>
+ <rpc-reply xmlns="urn:ietf:params:xml:ns:netconf:base:1.0"
+ xmlns:nc="urn:ietf:params:xml:ns:netconf:base:1.0"
+ message-id="urn:uuid:59e71979-01bb-462f-b17a-b3a45e1889ac">
+ <data>
+ <interfaces-state xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">
+ <interface><name>XPON 0/1</name></interface>
+ <interface><name>XPON 0/2</name></interface>
+ <interface><name>XPON 0/3</name></interface>
+ <interface><name>XPON 0/4</name></interface>
+ <interface><name>XPON 0/5</name></interface>
+ <interface><name>XPON 0/6</name></interface>
+ <interface><name>XPON 0/7</name></interface>
+ <interface><name>XPON 0/8</name></interface>
+ <interface><name>XPON 0/9</name></interface>
+ <interface><name>XPON 0/10</name></interface>
+ <interface><name>XPON 0/11</name></interface>
+ <interface><name>XPON 0/12</name></interface>
+ <interface><name>XPON 0/13</name></interface>
+ <interface><name>XPON 0/14</name></interface>
+ <interface><name>XPON 0/15</name></interface>
+ <interface><name>XPON 0/16</name></interface>
+ </interfaces-state>
+ </data>
+ </rpc-reply>
+ """
+ return GetReply(raw)
+
def reset(self):
"""
Set the NNI Port to a known good state on initial port startup. Actual
NNI 'Start' is done elsewhere
"""
- if self._state != NniPort.State.INITIAL:
- self.log.error('reset-ignored', state=self._state)
+ if self.state != AdtnPort.State.INITIAL:
+ self.log.error('reset-ignored', state=self.state)
return fail()
self.log.info('resetting', label=self._label)
diff --git a/voltha/adapters/adtran_olt/onu.py b/voltha/adapters/adtran_olt/onu.py
index 3eaf2f2..bb31b61 100644
--- a/voltha/adapters/adtran_olt/onu.py
+++ b/voltha/adapters/adtran_olt/onu.py
@@ -22,16 +22,6 @@
from adtran_olt_handler import AdtranOltHandler
from net.adtran_rest import RestInvalidResponseCode
-# Following is only used in autoactivate/demo mode. Otherwise xPON commands should be used
-_VSSN_TO_VENDOR = {
- 'ADTN': 'adtran_onu',
- 'BRCM': 'broadcom_onu',
- 'DP??': 'dpoe_onu', # TODO: Get actual VSSN for this vendor
- 'PMCS': 'pmcs_onu',
- 'PSMO': 'ponsim_onu',
- 'TBIT': 'tibit_onu',
-}
-
_MAX_EXPEDITE_COUNT = 5
_EXPEDITE_SECS = 2
_HW_SYNC_SECS = 30
@@ -63,7 +53,8 @@
self._gem_ports = {} # gem-id -> GemPort
self._tconts = {} # alloc-id -> TCont
self._onu_vid = onu_info['onu-vid']
- self._uni_ports = [onu_info['onu-vid']]
+ self.untagged_vlan = self._onu_vid
+ self._uni_ports = [onu_info['onu-vid']] # TODO: Get rid of this
assert len(self._uni_ports) == 1, 'Only one UNI port supported at this time'
self._channel_id = onu_info['channel-id']
self._enabled = onu_info['enabled']
@@ -84,9 +75,15 @@
self._resync_flows = False
self._sync_deferred = None # For sync of ONT config to hardware
+ if onu_info['venet'] is not None:
+ port_no, subscriber_vlan, self.untagged_vlan = Onu.decode_venet(onu_info['venet'],
+ self.olt.untagged_vlan)
+ if port_no is not None:
+ self._uni_ports = [port_no]
+ if subscriber_vlan is not None:
+ self._onu_vid = subscriber_vlan
+
self.log = structlog.get_logger(pon_id=self._pon_id, onu_id=self._onu_id)
- self._vendor_id = _VSSN_TO_VENDOR.get(self._serial_number_string.upper()[:4],
- 'Unsupported_{}'.format(self._serial_number_string))
def __del__(self):
# self.stop()
@@ -95,7 +92,40 @@
def __str__(self):
return "ONU-{}:{}, SN: {}/{}".format(self._onu_id, self._pon_id,
self._serial_number_string, self._serial_number_base64)
-
+
+ @staticmethod
+ def decode_venet(venet_info, untagged_vlan):
+ # TODO: Move this one and ONU one into venet decode to dict() area
+ try:
+ # Allow spaces or dashes as separator, select last as the
+ # port number. UNI-1, UNI 1, and UNI 3-2-1 are the same
+ port_no = int(venet_info['name'].replace(' ', '-').split('-')[-1:][0])
+ subscriber_vlan = port_no
+ try:
+ # Subscriber VLAN and Untagged vlan are comma separated
+ parts = venet_info['description'].split(',')
+ sub_part = next((part for part in parts if 'vlan' in part.lower()), None)
+ untagged_part = next((part for part in parts if 'untagged' in part.lower()), None)
+ try:
+ if sub_part is not None:
+ subscriber_vlan = int(sub_part.split(':')[-1:][0])
+ except Exception as e:
+ pass
+ try:
+ if untagged_part is not None:
+ untagged_vlan = int(untagged_part.split(':')[-1:][0])
+ except Exception as e:
+ pass
+ except Exception as e:
+ pass
+
+ return port_no, subscriber_vlan, untagged_vlan
+
+ except ValueError:
+ pass
+ except KeyError:
+ pass
+
@staticmethod
def serial_number_to_string(value):
sval = base64.decodestring(value)
@@ -208,28 +238,22 @@
device_id = self.olt.device_id
- if self.olt.autoactivate:
- self._proxy_address = Device.ProxyAddress(device_id=device_id,
- channel_id=self.onu_vid,
- channel_group_id=self.pon.pon_id,
- onu_id=self.onu_id)
- else:
- try:
- v_ont_ani = self._vont_ani
- voltha_core = self.olt.adapter_agent.core
- xpon_agent = voltha_core.xpon_agent
- channel_group_id = xpon_agent.get_channel_group_for_vont_ani(v_ont_ani)
- parent_chnl_pair_id = xpon_agent.get_port_num(device_id,
- v_ont_ani.data.preferred_chanpair)
- self._proxy_address = Device.ProxyAddress(
- device_id=device_id,
- channel_group_id=channel_group_id,
- channel_id=parent_chnl_pair_id,
- channel_termination=v_ont_ani.data.preferred_chanpair,
- onu_id=self.onu_id,
- onu_session_id=self.onu_id)
- except Exception:
- pass
+ try:
+ v_ont_ani = self._vont_ani
+ voltha_core = self.olt.adapter_agent.core
+ xpon_agent = voltha_core.xpon_agent
+ channel_group_id = xpon_agent.get_channel_group_for_vont_ani(v_ont_ani)
+ parent_chnl_pair_id = xpon_agent.get_port_num(device_id,
+ v_ont_ani.data.preferred_chanpair)
+ self._proxy_address = Device.ProxyAddress(
+ device_id=device_id,
+ channel_group_id=channel_group_id,
+ channel_id=parent_chnl_pair_id,
+ channel_termination=v_ont_ani.data.preferred_chanpair,
+ onu_id=self.onu_id,
+ onu_session_id=self.onu_id)
+ except Exception:
+ pass
return self._proxy_address
@@ -262,10 +286,6 @@
return self._serial_number_string
@property
- def vendor_id(self):
- return self._vendor_id
-
- @property
def rssi(self):
"""The received signal strength indication of the ONU"""
return self._rssi
@@ -333,6 +353,19 @@
except Exception as e: # TODO: Add breakpoint here during unexpected reboot test
self.log.exception('onu-create', e=e)
+ # See if it failed due to already being configured
+ url = AdtranOltHandler.GPON_ONU_CONFIG_URI.format(self._pon_id, self._onu_id)
+ url += '/serial-number'
+
+ try:
+ results = yield self.olt.rest_client.request('GET', uri, name=name)
+ self.log.debug('onu-create-check', results=results)
+ if len(results) != 1 or results[0].get('serial-number', '') != self._serial_number_base64:
+ raise e
+
+ except Exception as e:
+ self.log.exception('onu-exists-check', e=e)
+ raise
# Now set up all tconts & gem-ports
first_sync = self._sync_tick
@@ -359,7 +392,6 @@
# Recalculate PON upstream FEC
self.pon.upstream_fec_enable = self.pon.any_upstream_fec_enabled
-
returnValue('created')
@inlineCallbacks
@@ -727,15 +759,19 @@
def gem_port(self, gem_id):
return self._gem_ports.get(gem_id)
- def gem_ids(self, exception_gems):
+ def gem_ids(self, untagged_gem, exception_gems): # FIXED_ONU
"""Get all GEM Port IDs used by this ONU"""
if exception_gems:
gem_ids = sorted([gem_id for gem_id, gem in self._gem_ports.items()
- if gem.exception and not gem.multicast]) # FIXED_ONU
+ if gem.exception and not gem.multicast])
+ return gem_ids
+ elif untagged_gem:
+ gem_ids = sorted([gem_id for gem_id, gem in self._gem_ports.items()
+ if gem.untagged and not gem.exception and not gem.multicast])
return gem_ids
else:
return sorted([gem_id for gem_id, gem in self._gem_ports.items()
- if not gem.multicast and not gem.exception]) # FIXED_ONU
+ if not gem.multicast and not gem.exception and not gem.untagged])
@inlineCallbacks
def add_gem_port(self, gem_port, reflow=False):
diff --git a/voltha/adapters/adtran_olt/pon_port.py b/voltha/adapters/adtran_olt/pon_port.py
index 3b2334e..bb02379 100644
--- a/voltha/adapters/adtran_olt/pon_port.py
+++ b/voltha/adapters/adtran_olt/pon_port.py
@@ -16,7 +16,7 @@
import random
import structlog
-from enum import Enum
+from port import AdtnPort
from twisted.internet import reactor, defer
from twisted.internet.defer import inlineCallbacks, returnValue, succeed
@@ -29,12 +29,9 @@
from voltha.protos.device_pb2 import Port
-class PonPort(object):
+class PonPort(AdtnPort):
"""
- A class similar to the 'Port' class in the VOLTHA
-
- TODO: Merge this with the Port class or cleanup where possible
- so we do not duplicate fields/properties/methods
+ GPON Port
"""
MAX_ONUS_SUPPORTED = 256
DEFAULT_ENABLED = False
@@ -43,74 +40,52 @@
_MCAST_ONU_ID = 253
_MCAST_ALLOC_BASE = 0x500
- class State(Enum):
- INITIAL = 0 # Created and initialization in progress
- RUNNING = 1 # PON port contacted, ONU discovery active
- STOPPED = 2 # Disabled
- DELETING = 3 # Cleanup
-
- _SUPPORTED_ACTIVATION_METHODS = ['autodiscovery', 'autoactivate']
+ _SUPPORTED_ACTIVATION_METHODS = ['autodiscovery'] # , 'autoactivate']
_SUPPORTED_AUTHENTICATION_METHODS = ['serial-number']
- def __init__(self, pon_index, port_no, parent):
- # TODO: Weed out those properties supported by common 'Port' object (future)
- self.log = structlog.get_logger(device_id=parent.device_id, pon_id=pon_index)
+ def __init__(self, parent, **kwargs):
+
+ super(PonPort, self).__init__(parent, **kwargs)
+
+ assert 'pon-id' in kwargs, 'PON ID not found'
self._parent = parent
- self._pon_id = pon_index
- self._port_no = port_no
- self._name = 'xpon 0/{}'.format(pon_index+1)
- self._label = 'pon-{}'.format(pon_index)
- self._port = None
- self._discovery_tick = 20.0
- self._no_onu_discover_tick = self._discovery_tick / 2
- self._discovered_onus = [] # List of serial numbers
- self._sync_tick = 20.0
+ self._pon_id = kwargs['pon-id']
+ self.log = structlog.get_logger(device_id=parent.device_id, pon_id=self._pon_id)
+ self._port_no = kwargs['port_no']
+ self._name = 'xpon 0/{}'.format(self._pon_id+1)
+ self._label = 'pon-{}'.format(self._pon_id)
+
self._in_sync = False
self._expedite_sync = False
self._expedite_count = 0
+ self._discovery_tick = 20.0
+ self._no_onu_discover_tick = self._discovery_tick / 2
+ self._discovered_onus = [] # List of serial numbers
+
self._onus = {} # serial_number-base64 -> ONU (allowed list)
self._onu_by_id = {} # onu-id -> ONU
self._next_onu_id = Onu.MIN_ONU_ID + 128
self._mcast_gem_ports = {} # VLAN -> GemPort
- self._admin_state = AdminState.DISABLED
- self._oper_status = OperStatus.DISCOVERED
- self._state = PonPort.State.INITIAL
- self._deferred = None # General purpose
self._discovery_deferred = None # Specifically for ONU discovery
- self._sync_deferred = None # For sync of PON config to hardware
self._active_los_alarms = set() # ONU-ID
# xPON configuration
self._xpon_name = None
- self._enabled = False
self._downstream_fec_enable = False
self._upstream_fec_enable = False
self._deployment_range = 25000
self._authentication_method = 'serial-number'
self._mcast_aes = False
self._line_rate = 'down_10_up_10'
-
- if self.olt.autoactivate:
- # Enable PON on startup
- self._activation_method = 'autoactivate'
- self._admin_state = AdminState.ENABLED
- else:
- self._activation_method = 'autodiscovery'
+ self._activation_method = 'autodiscovery'
# Statistics
- self.rx_packets = 0
- self.rx_bytes = 0
- self.tx_packets = 0
- self.tx_bytes = 0
self.tx_bip_errors = 0
- def __del__(self):
- self.stop()
-
def __str__(self):
return "PonPort-{}: Admin: {}, Oper: {}, OLT: {}".format(self._label,
self._admin_state,
@@ -134,14 +109,6 @@
return self._port
@property
- def port_no(self):
- return self._port_no
-
- @property
- def name(self):
- return self._name
-
- @property
def xpon_name(self):
return self._xpon_name
@@ -155,10 +122,6 @@
return self._pon_id
@property
- def olt(self):
- return self._parent
-
- @property
def onus(self):
"""
Get a set of all ONUs. While the set is immutable, do not use this method
@@ -180,27 +143,6 @@
return self._onu_by_id.get(onu_id)
@property
- def state(self):
- return self._state
-
- @property
- def admin_state(self):
- return self._admin_state
-
- @admin_state.setter
- def admin_state(self, value):
- if self._admin_state != value:
- self._admin_state = value
- if self._admin_state == AdminState.ENABLED:
- self.start()
- else:
- self.stop()
-
- @property
- def oper_status(self):
- return self._oper_status
-
- @property
def in_service_onus(self):
return len({onu.onu_id for onu in self.onus
if onu.onu_id not in self._active_los_alarms})
@@ -214,22 +156,6 @@
return distance
@property
- def adapter_agent(self):
- return self.olt.adapter_agent
-
- @property
- def enabled(self):
- return self._enabled
-
- @enabled.setter
- def enabled(self, value):
- assert isinstance(value, bool), 'enabled is a boolean'
- if self._enabled != value:
- if value:
- self.start()
- self.stop()
-
- @property
def downstream_fec_enable(self):
return self._downstream_fec_enable
@@ -239,8 +165,8 @@
if self._downstream_fec_enable != value:
self._downstream_fec_enable = value
- if self._state == PonPort.State.RUNNING:
- self._deferred = self._set_pon_config("downstream-fec-enable", value)
+ if self.state == AdtnPort.State.RUNNING:
+ self.deferred = self._set_pon_config("downstream-fec-enable", value)
@property
def upstream_fec_enable(self):
@@ -251,8 +177,8 @@
assert isinstance(value, bool), 'upstream FEC enabled is a boolean'
if self._upstream_fec_enable != value:
self._upstream_fec_enable = value
- if self._state == PonPort.State.RUNNING:
- self._deferred = self._set_pon_config("upstream-fec-enable", value)
+ if self.state == AdtnPort.State.RUNNING:
+ self.deferred = self._set_pon_config("upstream-fec-enable", value)
@property
def any_upstream_fec_enabled(self):
@@ -270,7 +196,7 @@
assert isinstance(value, bool), 'MCAST AES is a boolean'
if self._mcast_aes != value:
self._mcast_aes = value
- if self._state == PonPort.State.RUNNING:
+ if self.state == AdtnPort.State.RUNNING:
pass # TODO
@property
@@ -283,7 +209,7 @@
# TODO cast to enum
if self._line_rate != value:
self._line_rate = value
- if self._state == PonPort.State.RUNNING:
+ if self.state == AdtnPort.State.RUNNING:
pass # TODO
@property
@@ -299,8 +225,8 @@
format(PonPort.MAX_DEPLOYMENT_RANGE))
if self._deployment_range != value:
self._deployment_range = value
- if self._state == PonPort.State.RUNNING:
- self._deferred = self._set_pon_config("deployment-range", value)
+ if self.state == AdtnPort.State.RUNNING:
+ self.deferred = self._set_pon_config("deployment-range", value)
@property
def discovery_tick(self):
@@ -348,70 +274,41 @@
raise ValueError('Invalid ONU authentication method')
self._authentication_method = value
- def get_logical_port(self):
- """
- Get the VOLTHA logical port for this port. For PON ports, a logical port
- is not currently created, so always return None
+ def cancel_deferred(self):
+ super(PonPort, self).cancel_deferred()
- :return: VOLTHA logical port or None if not supported
- """
- return None
+ d, self._discovery_deferred = self._discovery_deferred, None
- def _cancel_deferred(self):
- d1, self._deferred = self._deferred, None
- d2, self._discovery_deferred = self._discovery_deferred, None
- d3, self._sync_deferred = self._sync_deferred, None
-
- for d in [d1, d2, d3]:
- try:
- if d is not None and not d.called:
- d.cancel()
- except Exception as e:
- pass
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except Exception as e:
+ pass
def _update_adapter_agent(self):
# TODO: Currently the adapter_agent does not allow 'update' of port status
# self.adapter_agent.update_port(self.olt.device_id, self.get_port())
pass
- def start(self):
- """
- Start/enable this PON and start ONU discover
- """
- if self._state == PonPort.State.RUNNING:
- return succeed('Running')
-
- self.log.info('start')
-
- self._cancel_deferred()
- self._state = PonPort.State.INITIAL
- self._oper_status = OperStatus.ACTIVATING
- self._enabled = True
-
- # Do the rest of the startup in an async method
- self._deferred = reactor.callLater(0.5, self._finish_startup)
- self._update_adapter_agent()
-
- return succeed('Scheduled')
-
@inlineCallbacks
- def _finish_startup(self):
+ def finish_startup(self):
"""
Do all startup offline since REST may fail
"""
- if self._state != PonPort.State.INITIAL:
+ if self.state != AdtnPort.State.INITIAL:
returnValue('Done')
self.log.debug('final-startup')
+ results = None
try:
- self._deferred = self._get_pon_config()
- results = yield self._deferred
+ self.deferred = self._get_pon_config()
+ results = yield self.deferred
except Exception as e:
self.log.exception('initial-GET', e=e)
- self._deferred = reactor.callLater(5, self._finish_startup)
- returnValue(self._deferred)
+ self.deferred = reactor.callLater(5, self.finish_startup)
+ returnValue(self.deferred)
# Load config from hardware
@@ -423,19 +320,19 @@
if enabled != self._enabled:
try:
- self._deferred = self._set_pon_config("enabled", True)
- yield self._deferred
+ self.deferred = self._set_pon_config("enabled", True)
+ yield self.deferred
except Exception as e:
self.log.exception('final-startup-enable', e=e)
- self._deferred = reactor.callLater(3, self._finish_startup)
- returnValue(self._deferred)
+ self.deferred = reactor.callLater(3, self.finish_startup)
+ returnValue(self.deferred)
if downstream_fec_enable != self._downstream_fec_enable:
try:
- self._deferred = self._set_pon_config("downstream-fec-enable",
- self._downstream_fec_enable)
- yield self._deferred
+ self.deferred = self._set_pon_config("downstream-fec-enable",
+ self._downstream_fec_enable)
+ yield self.deferred
except Exception as e:
self.log.warning('final-startup-downstream-FEC', e=e)
@@ -444,9 +341,9 @@
if upstream_fec_enable != self._upstream_fec_enable:
try:
- self._deferred = self._set_pon_config("upstream-fec-enable",
- self._upstream_fec_enable)
- yield self._deferred
+ self.deferred = self._set_pon_config("upstream-fec-enable",
+ self._upstream_fec_enable)
+ yield self.deferred
except Exception as e:
self.log.warning('final-startup-upstream-FEC', e=e)
@@ -455,23 +352,15 @@
if deployment_range != self._deployment_range:
try:
- self._deferred = self._set_pon_config("deployment-range",
- self._deployment_range)
- yield self._deferred
+ self.deferred = self._set_pon_config("deployment-range",
+ self._deployment_range)
+ yield self.deferred
except Exception as e:
self.log.warning('final-startup-deployment-range', e=e)
self._in_sync = False
# Non-fatal. May have failed due to no SFQ in slot
- # If here, initial settings were successfully written to hardware
-
- self._admin_state = AdminState.ENABLED
- self._oper_status = OperStatus.ACTIVE # TODO: is this correct, how do we tell GRPC
- self._state = PonPort.State.RUNNING
-
- # Restart any ONU's in case here due to reboot
-
if len(self._onus) > 0:
dl = []
for onu_id in self.onu_ids:
@@ -483,41 +372,26 @@
# Begin to ONU discovery and hardware sync
self._discovery_deferred = reactor.callLater(5, self._discover_onus)
- self._sync_deferred = reactor.callLater(60, self._sync_hardware)
- self._update_adapter_agent()
+ # If here, initial settings were successfully written to hardware
+
+ super(PonPort, self).finish_startup()
returnValue('Enabled')
- @inlineCallbacks
- def stop(self):
- if self._state == PonPort.State.STOPPED:
- self.log.debug('already stopped')
- returnValue('Stopped')
-
- self.log.info('stopping')
-
- self._cancel_deferred()
- self._enabled = False
- self._admin_state = AdminState.DISABLED
- self._oper_status = OperStatus.UNKNOWN
- self._update_adapter_agent()
-
- self._state = PonPort.State.STOPPED
-
+ def finish_stop(self):
# Remove all existing ONUs. They will need to be re-discovered
-
+ dl = []
onu_ids = frozenset(self._onu_by_id.keys())
for onu_id in onu_ids:
try:
- yield self.delete_onu(onu_id)
+ dl.append(self.delete_onu(onu_id))
+
except Exception as e:
self.log.exception('onu-cleanup', onu_id=onu_id, e=e)
- results = yield self._set_pon_config("enabled", False)
- self._sync_deferred = reactor.callLater(self._sync_tick, self._sync_hardware)
+ dl.append(self._set_pon_config("enabled", False))
- self.log.debug('stopped')
- returnValue(results)
+ return defer.gatherResults(dl, consumeErrors=True)
@inlineCallbacks
def reset(self):
@@ -525,16 +399,16 @@
Set the PON Port to a known good state on initial port startup. Actual
PON 'Start' is done elsewhere
"""
- if self._state != PonPort.State.INITIAL:
- self.log.error('reset-ignored', state=self._state)
+ if self.state != AdtnPort.State.INITIAL:
+ self.log.error('reset-ignored', state=self.state)
returnValue('Ignored')
- initial_port_state = AdminState.ENABLED if self.olt.autoactivate else AdminState.DISABLED
+ initial_port_state = AdminState.DISABLED
self.log.info('reset', initial_state=initial_port_state)
try:
- self._deferred = self._get_pon_config()
- results = yield self._deferred
+ self.deferred = self._get_pon_config()
+ results = yield self.deferred
enabled = results.get('enabled', False)
except Exception as e:
@@ -545,7 +419,7 @@
if enable != enabled:
try:
- self._deferred = yield self._set_pon_config("enabled", enable)
+ self.deferred = yield self._set_pon_config("enabled", enable)
except Exception as e:
self.log.exception('reset-enabled', e=e, enabled=enabled)
@@ -567,7 +441,7 @@
yield defer.gatherResults(dl, consumeErrors=True)
except Exception as e:
- self.log.exception('rest-ONU-delete', onu_id=onu_id, e=e)
+ self.log.exception('rest-ONU-delete', e=e)
pass # Non-fatal
except Exception as e:
@@ -575,31 +449,15 @@
returnValue('Reset complete')
- def restart(self):
- if self._state == PonPort.State.RUNNING or self._state == PonPort.State.STOPPED:
- start_it = (self._state == PonPort.State.RUNNING)
- self._state = PonPort.State.INITIAL
-
- return self.start() if start_it else self.stop()
- return succeed('nop')
-
- def delete(self):
- """
- Parent device is being deleted. Do not change any config but
- stop all polling
- """
- self.log.info('Deleting')
- self._state = PonPort.State.DELETING
- self._cancel_deferred()
-
- def gem_ids(self, vid, exception_gems, multicast_gems):
+ def gem_ids(self, logical_port, untagged_gem, exception_gems, multicast_gems): # FIXED_ONU
"""
Get all GEM Port IDs used on a given PON
- :param vid: (int) VLAN ID if customer ONU specific. None if for all ONUs
- on PON, if Multicast, VID for Multicast, or None for all\
+ :param logical_port: (int) Logical port umber of ONU. None if for all ONUs
+ on PON, if Multicast, VID for Multicast, or None for all
Multicast GEMPorts
- :param exception_gems: (boolean) Select from special purpose ACL GEM-Portas
+ :param untagged_gem: (boolean) Select from special purpose untagged GEM Port
+ :param exception_gems: (boolean) Select from special purpose ACL GEM Port
:param multicast_gems: (boolean) Select from available Multicast GEM Ports
:return: (dict) data_gem -> key -> onu-id, value -> tuple(sorted list of GEM Port IDs, onu_vid)
mcast_gem-> key -> mcast-vid, value -> GEM Port IDs
@@ -609,14 +467,15 @@
if multicast_gems:
# Multicast GEMs belong to the PON, but we may need to register them on
# all ONUs. Rework when BBF MCAST Gems are supported
- for vlan, gem_port in self._mcast_gem_ports.iteritems():
- if vid is None or (vid == vlan and vid in self.olt.multicast_vlans):
+ for vlan, gem_port in self._mcast_gem_ports.iteritems(): # TODO: redo logic
+ if logical_port is None or (logical_port == vlan and logical_port in self.olt.multicast_vlans):
gem_ids[vlan] = ([gem_port.gem_id], None)
else:
for onu_id, onu in self._onu_by_id.iteritems():
- if vid is None or vid == onu.onu_vid:
- gem_ids[onu_id] = (onu.gem_ids(exception_gems), onu.onu_vid) # FIXED_ONU
-
+ if logical_port is None or logical_port == onu.logical_port:
+ gem_ids[onu_id] = (onu.gem_ids(untagged_gem, exception_gems),
+ onu.onu_vid if not untagged_gem
+ else self.olt.untagged_vlan)
return gem_ids
def _get_pon_config(self):
@@ -654,7 +513,7 @@
self._onu_discovery_init_complete,
None)
- def _onu_discovery_init_complete(self, _):
+ def _onu_discovery_init_complete(self, _result):
"""
This method is called after the REST POST to request ONU discovery is
completed. The results (body) of the post is always empty / 204 NO CONTENT
@@ -663,8 +522,8 @@
delay += random.uniform(-delay / 10, delay / 10)
self._discovery_deferred = reactor.callLater(delay, self._discover_onus)
- def _sync_hardware(self):
- if self._state == PonPort.State.RUNNING or self._state == PonPort.State.STOPPED:
+ def sync_hardware(self):
+ if self.state == AdtnPort.State.RUNNING or self.state == AdtnPort.State.STOPPED:
def read_config(results):
self.log.debug('read-config', results=results)
config = OltConfig.Pon.decode([results])
@@ -679,7 +538,7 @@
self._expedite_sync = True
dl.append(self._set_pon_config("enabled", self.enabled))
- elif self._state == PonPort.State.RUNNING:
+ elif self.state == AdtnPort.State.RUNNING:
if self.deployment_range != config.deployment_range:
self._in_sync = False
self._expedite_sync = True
@@ -701,7 +560,7 @@
return config.onus
def sync_onus(hw_onus):
- if self._state == PonPort.State.RUNNING:
+ if self.state == AdtnPort.State.RUNNING:
self.log.debug('sync-pon-onu-results', config=hw_onus)
# ONU's have their own sync task, extra (should be deleted) are
@@ -723,7 +582,7 @@
def reschedule(_):
# Speed up sequential resync a limited number of times if out of sync.
- delay = self._sync_tick
+ delay = self.sync_tick
if self._expedite_sync:
self._expedite_count += 1
@@ -733,12 +592,12 @@
self._expedite_count = 0
delay += random.uniform(-delay / 10, delay / 10)
- self._sync_deferred = reactor.callLater(delay, self._sync_hardware)
+ self.sync_deferred = reactor.callLater(delay, self.sync_hardware)
- self._sync_deferred = self._get_pon_config()
- self._sync_deferred.addCallbacks(read_config, failure, errbackArgs=['get-config'])
- self._sync_deferred.addCallbacks(sync_onus, failure, errbackArgs=['pon-sync'])
- self._sync_deferred.addBoth(reschedule)
+ self.sync_deferred = self._get_pon_config()
+ self.sync_deferred.addCallbacks(read_config, failure, errbackArgs=['get-config'])
+ self.sync_deferred.addCallbacks(sync_onus, failure, errbackArgs=['pon-sync'])
+ self.sync_deferred.addBoth(reschedule)
def process_status_poll(self, status):
"""
@@ -873,7 +732,7 @@
self.log.debug('discovered-ONUs', list=discovered_onus)
# Only request discovery if activation is auto-discovery or auto-activate
- continue_discovery = ['autodiscovery', 'autoactivate']
+ continue_discovery = ['autodiscovery'] # , 'autoactivate']
if self._activation_method not in continue_discovery:
return set(), set()
@@ -892,22 +751,7 @@
:return: (dict) onu config data or None on lookup failure
"""
try:
- from flow.demo_data import get_tconts, get_gem_ports, get_onu_id
-
- if self.activation_method == "autoactivate":
- # This is currently just for 'DEMO' mode
- onu_id = get_onu_id(serial_number)
- if onu_id is None:
- onu_id = self.get_next_onu_id()
- enabled = True
- channel_speed = 8500000000
- tconts = get_tconts(serial_number, onu_id)
- gem_ports = get_gem_ports(serial_number, onu_id)
- vont_ani = None
- xpon_name = None
- upstream_fec_enabled = True
-
- elif self.activation_method == "autodiscovery":
+ if self.activation_method == "autodiscovery":
if self.authentication_method == 'serial-number':
gpon_info = self.olt.get_xpon_info(self.pon_id)
@@ -932,6 +776,11 @@
gem_ports = {key: val for key, val in gpon_info['gem-ports'].iteritems()
if val.tcont_ref in tconts.keys()}
+ venet = next((val for val in gpon_info['v-enets'].itervalues()
+ if val['vont-ani'] == vont_info['name']), None)
+ # TODO: need to handle case where ont_ani, gems, venets, tconts are assigned
+ # after activation is started. only vont-ani needs to be set to get here
+
except StopIteration:
# Can happen if vont-ani or ont-ani has not yet been configured
self.log.debug('no-vont-or-ont')
@@ -959,9 +808,10 @@
'password': Onu.DEFAULT_PASSWORD,
't-conts': tconts,
'gem-ports': gem_ports,
- 'onu-vid': self.olt.get_channel_id(self._pon_id, onu_id),
+ 'onu-vid': self.olt.get_onu_vid(onu_id),
'channel-id': self.olt.get_channel_id(self._pon_id, onu_id),
- 'vont-ani': vont_ani
+ 'vont-ani': vont_ani,
+ 'venet': venet
}
# Hold off ONU activation until at least one GEM Port is defined.
self.log.debug('onu-info', gem_ports=gem_ports)
@@ -984,19 +834,25 @@
from alarms.onu_discovery_alarm import OnuDiscoveryAlarm
self.log.info('onu-lookup-failure', serial_number=serial_number_64)
OnuDiscoveryAlarm(self.olt, self.pon_id, serial_number).raise_alarm()
- return
+ returnValue('new-onu')
if serial_number_64 not in status.onus or onu_info['onu-id'] in self._active_los_alarms:
onu = None
onu_id = onu_info['onu-id']
- if serial_number_64 in self._onus or onu_id in self._onu_by_id:
+ if serial_number_64 in self._onus and onu_id in self._onu_by_id:
+ # Handles fast entry into this task before FPGA can set/clear results
+ returnValue('sticky-onu')
+
+ elif (serial_number_64 in self._onus and onu_id not in self._onu_by_id) or \
+ (serial_number_64 not in self._onus and onu_id in self._onu_by_id):
# May be here due to unmanaged power-cycle on OLT or fiber bounced for a
# previously activated ONU. Drop it and add bac on next discovery cycle
self.delete_onu(onu_id)
elif len(self._onus) >= self.MAX_ONUS_SUPPORTED:
self.log.warning('max-onus-provisioned', count=len(self._onus))
+ returnValue('max-onus-reached')
else:
# TODO: Make use of upstream_channel_speed variable
@@ -1025,37 +881,11 @@
yield onu.create(tconts, gem_ports)
- # If autoactivate (demo) mode and not reflow, activate the ONU
- if self.olt.autoactivate:
- self.activate_onu(onu)
-
except Exception as e:
self.log.exception('add-onu', serial_number=serial_number_64, e=e)
del self._onus[serial_number_64]
del self._onu_by_id[onu.onu_id]
- def activate_onu(self, onu):
- """
- Called when a new ONU is discovered and VOLTHA device adapter needs to be informed
- :param onu:
- """
- if self.olt.autoactivate:
- self.log.info('activate-onu', onu=onu)
-
- olt = self.olt
- adapter = self.adapter_agent
- channel_id = onu.onu_vid
-
- proxy = onu.proxy_address
-
- # NOTE: The following method will be deprecated. Use xPON
- adapter.child_device_detected(parent_device_id=olt.device_id,
- parent_port_no=self._port_no,
- child_device_type=onu.vendor_id,
- proxy_address=proxy,
- admin_state=AdminState.ENABLED,
- vlan=channel_id)
-
def get_next_onu_id(self):
used_ids = [onu.onu_id for onu in self.onus]
@@ -1103,13 +933,6 @@
except Exception as e:
self.log.exception('onu-delete', serial_number=onu.serial_number, e=e)
- if self.olt.autoactivate:
- # Clean up adapter agent of this ONU
- if proxy is not None:
- onu_device = self.olt.adapter_agent.get_child_device_with_proxy_address(proxy)
- if onu_device is not None:
- self.olt.adapter_agent.delete_child_device(self.olt.device_id,
- onu_device.device_id)
else:
try:
yield self._remove_from_hardware(onu_id)
diff --git a/voltha/adapters/adtran_olt/port.py b/voltha/adapters/adtran_olt/port.py
new file mode 100644
index 0000000..f0b8b36
--- /dev/null
+++ b/voltha/adapters/adtran_olt/port.py
@@ -0,0 +1,254 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import random
+
+import structlog
+from enum import Enum
+from twisted.internet import reactor, defer
+from twisted.internet.defer import inlineCallbacks, returnValue, succeed
+
+from adtran_olt_handler import AdtranOltHandler
+from net.adtran_rest import RestInvalidResponseCode
+from codec.olt_config import OltConfig
+from onu import Onu
+from alarms.onu_los_alarm import OnuLosAlarm
+from voltha.protos.common_pb2 import OperStatus, AdminState
+from voltha.protos.device_pb2 import Port
+
+
+class AdtnPort(object):
+ """
+ A class similar to the 'Port' class in the VOLTHA
+ """
+ class State(Enum):
+ INITIAL = 0 # Created and initialization in progress
+ RUNNING = 1 # PON port contacted, ONU discovery active
+ STOPPED = 2 # Disabled
+ DELETING = 3 # Cleanup
+
+ _SUPPORTED_ACTIVATION_METHODS = ['autodiscovery'] # , 'autoactivate']
+ _SUPPORTED_AUTHENTICATION_METHODS = ['serial-number']
+
+ def __init__(self, parent, **kwargs):
+ assert parent, 'parent is None'
+ assert 'port_no' in kwargs, 'Port number not found'
+
+ self.log = structlog.get_logger(device_id=parent.device_id)
+
+ self._parent = parent
+ self._port_no = kwargs.get('port_no')
+
+ # Set the following in your derived class
+ self._name = None
+ self._label = None
+ self._port = None
+
+ self.sync_tick = 20.0
+ self.sync_deferred = None # For sync of PON config to hardware
+
+ # TODO: Deprecate 'enabled' and use admin_state instead
+ self._enabled = False
+ self._admin_state = AdminState.DISABLED
+ self._oper_status = OperStatus.DISCOVERED
+ self._state = AdtnPort.State.INITIAL
+
+ self.deferred = None # General purpose
+
+ # Statistics
+ self.rx_packets = 0
+ self.rx_bytes = 0
+ self.tx_packets = 0
+ self.tx_bytes = 0
+
+ def __del__(self):
+ self.stop()
+
+ def get_port(self):
+ """
+ Get the VOLTHA PORT object for this port
+ :return: VOLTHA Port object
+ """
+ raise NotImplementedError('Add to your derived class')
+
+ @property
+ def port_no(self):
+ return self._port_no
+
+ @property
+ def name(self):
+ return self._name
+
+ @property
+ def state(self):
+ return self._state
+
+ @state.setter
+ def state(self, value):
+ self._state = value
+
+ @property
+ def olt(self):
+ return self._parent
+
+ @property
+ def admin_state(self):
+ return self._admin_state
+
+ @admin_state.setter
+ def admin_state(self, value):
+ if self._admin_state != value:
+ self._admin_state = value
+ if self._admin_state == AdminState.ENABLED:
+ self.start()
+ else:
+ self.stop()
+ @property
+ def enabled(self):
+ return self._admin_state == AdminState.ENABLED
+
+ @enabled.setter
+ def enabled(self, value):
+ assert isinstance(value, bool), 'enabled is a boolean'
+ self.admin_state = AdminState.ENABLED if value else AdminState.DISABLED
+
+ # @property
+ # def enabled(self):
+ # return self._enabled
+ #
+ # @enabled.setter
+ # def enabled(self, value):
+ # assert isinstance(value, bool), 'enabled is a boolean'
+ # if self._enabled != value:
+ # if value:
+ # self.start()
+ # self.stop()
+
+ @property
+ def oper_status(self):
+ return self._oper_status
+
+ @property
+ def adapter_agent(self):
+ return self.olt.adapter_agent
+
+ def get_logical_port(self):
+ """
+ Get the VOLTHA logical port for this port. For PON ports, a logical port
+ is not currently created, so always return None
+
+ :return: VOLTHA logical port or None if not supported
+ """
+ return None
+
+ def cancel_deferred(self):
+ d1, self.deferred = self.deferred, None
+ d2, self.sync_deferred = self.sync_deferred, None
+
+ for d in [d1, d2]:
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except Exception:
+ pass
+
+ def _update_adapter_agent(self):
+ raise NotImplementedError('Add to your derived class')
+
+ def start(self):
+ """
+ Start/enable this PON and start ONU discover
+ """
+ if self.state == AdtnPort.State.RUNNING:
+ return succeed('Running')
+
+ self.log.info('start')
+
+ self.cancel_deferred()
+ self.state = AdtnPort.State.INITIAL
+ self._oper_status = OperStatus.ACTIVATING
+ self._enabled = True
+
+ # Do the rest of the startup in an async method
+ self.deferred = reactor.callLater(0.5, self.finish_startup)
+ self._update_adapter_agent()
+
+ return succeed('Scheduled')
+
+ def finish_startup(self):
+ if self.state == AdtnPort.State.INITIAL:
+ self.log.debug('final-startup')
+
+ # If here, initial settings were successfully written to hardware
+
+ self._enabled = True
+ self._admin_state = AdminState.ENABLED
+ self._oper_status = OperStatus.ACTIVE # TODO: is this correct, how do we tell GRPC
+ self.state = AdtnPort.State.RUNNING
+
+ self.sync_deferred = reactor.callLater(self.sync_tick,
+ self.sync_hardware)
+ self._update_adapter_agent()
+
+ @inlineCallbacks
+ def stop(self):
+ if self.state == AdtnPort.State.STOPPED:
+ self.log.debug('already stopped')
+ returnValue('Stopped')
+
+ self.log.info('stopping')
+ try:
+ self.cancel_deferred()
+ self._enabled = False
+ self._admin_state = AdminState.DISABLED
+ self._oper_status = OperStatus.UNKNOWN
+ self._update_adapter_agent()
+
+ self.state = AdtnPort.State.STOPPED
+
+ self.sync_deferred = reactor.callLater(self.sync_tick,
+ self.sync_hardware)
+
+ self.deferred = self.finish_stop()
+ yield self.deferred
+
+ except Exception as e:
+ raise
+
+ returnValue('Stopped')
+
+ def finish_stop(self):
+ pass # Add to your derived class if needed
+
+ def restart(self):
+ if self.state == AdtnPort.State.RUNNING or self.state == AdtnPort.State.STOPPED:
+ start_it = (self.state == AdtnPort.State.RUNNING)
+ self.state = AdtnPort.State.INITIAL
+ return self.start() if start_it else self.stop()
+ return succeed('nop')
+
+ def delete(self):
+ """
+ Parent device is being deleted. Do not change any config but
+ stop all polling
+ """
+ self.log.info('Deleting')
+ self.state = AdtnPort.State.DELETING
+ self.cancel_deferred()
+
+ def sync_hardware(self):
+ raise NotImplementedError('Add to your derived class')
+
+# TODO: Continue to consolidate port functionality
diff --git a/voltha/adapters/adtran_olt/xpon/adtran_xpon.py b/voltha/adapters/adtran_olt/xpon/adtran_xpon.py
index 71943db..ef07b1a 100644
--- a/voltha/adapters/adtran_olt/xpon/adtran_xpon.py
+++ b/voltha/adapters/adtran_olt/xpon/adtran_xpon.py
@@ -298,7 +298,7 @@
if name in items:
# Treat like an update. It will update collection if needed
- return self.xpon_update(data)
+ return self.xpon_update(data, td=td)
log.debug('new-item', item_type=item_type, item=new_item)
items[name] = new_item
@@ -315,7 +315,7 @@
else:
del items[name]
- def xpon_update(self, data):
+ def xpon_update(self, data, td=None):
log.debug('xpon-update', data=data)
name = data.name
@@ -328,7 +328,7 @@
if existing_item is None:
raise KeyError("'{}' not found. Type: {}".format(name, type(data)))
- item_type, update_item = self._data_to_dict(data)
+ item_type, update_item = self._data_to_dict(data, td=td)
log.debug('update-item', item_type=item_type, item=update_item)
def _dict_diff(lhs, rhs):
diff --git a/voltha/adapters/adtran_olt/xpon/gem_port.py b/voltha/adapters/adtran_olt/xpon/gem_port.py
index fc16fd9..ada51f9 100644
--- a/voltha/adapters/adtran_olt/xpon/gem_port.py
+++ b/voltha/adapters/adtran_olt/xpon/gem_port.py
@@ -24,6 +24,7 @@
tcont_ref=None,
traffic_class=None,
intf_ref=None,
+ untagged=False,
exception=False, # FIXED_ONU
name=None,
handler=None):
@@ -36,6 +37,7 @@
self._encryption = encryption
self._omci_transport = omci_transport
self.multicast = multicast
+ self.untagged = untagged
self.exception = exception # FIXED_ONU
self._handler = handler
diff --git a/voltha/adapters/adtran_olt/xpon/olt_gem_port.py b/voltha/adapters/adtran_olt/xpon/olt_gem_port.py
index 79cde83..a23e5fb 100644
--- a/voltha/adapters/adtran_olt/xpon/olt_gem_port.py
+++ b/voltha/adapters/adtran_olt/xpon/olt_gem_port.py
@@ -33,6 +33,7 @@
tcont_ref=None,
traffic_class=None,
intf_ref=None,
+ untagged=False,
exception=False, # FIXED_ONU
name=None,
handler=None,
@@ -44,6 +45,7 @@
tcont_ref=tcont_ref,
traffic_class=traffic_class,
intf_ref=intf_ref,
+ untagged=untagged,
exception=exception,
name=name,
handler=handler)
@@ -51,13 +53,17 @@
@staticmethod
def create(handler, gem_port):
- exception = gem_port['gemport-id'] in [2180, 2186, 2192, # FIXED_ONU
- 2198, 2204, 2210,
- 2216, 2222, 2228,
- 2234, 2240, 2246,
- 2252, 2258]
- mcast = gem_port['gemport-id'] in [4095]
+ if handler.exception_gems:
+ exception = gem_port['gemport-id'] in [2180, 2186, 2192, # FIXED_ONU
+ 2198, 2204, 2210,
+ 2216, 2222, 2228,
+ 2234, 2240, 2246,
+ 2252, 2258]
+ else:
+ exception = False # FIXED_ONU
+ mcast = gem_port['gemport-id'] in [4095] # TODO: Perform proper lookup
+ untagged = 'untagged' in gem_port['name'].lower()
# TODO: Use next once real BBF mcast available.
# port_ref = 'channel-pair-ref 'if mcast else 'venet-ref'
port_ref = 'venet-ref 'if mcast else 'venet-ref'
@@ -71,7 +77,8 @@
intf_ref=gem_port.get(port_ref),
handler=handler,
multicast=mcast,
- exception=exception)
+ untagged=untagged,
+ exception=exception) # FIXED_ONU
@property
def encryption(self):