OLT & ONU updates for Adtran Device Adapters
Change-Id: I229b37016b89d4ea73c2bcf1cc7662dbe5a63c9d
diff --git a/voltha/adapters/adtran_olt/adtran_device_handler.py b/voltha/adapters/adtran_olt/adtran_device_handler.py
index 297bb1b..295fb32 100644
--- a/voltha/adapters/adtran_olt/adtran_device_handler.py
+++ b/voltha/adapters/adtran_olt/adtran_device_handler.py
@@ -51,12 +51,12 @@
DEFAULT_MULTICAST_VLAN = 4050
_MANAGEMENT_VLAN = 4093
-_DEFAULT_RESTCONF_USERNAME = "ADMIN"
-_DEFAULT_RESTCONF_PASSWORD = "PASSWORD"
+_DEFAULT_RESTCONF_USERNAME = ""
+_DEFAULT_RESTCONF_PASSWORD = ""
_DEFAULT_RESTCONF_PORT = 8081
-_DEFAULT_NETCONF_USERNAME = "hsvroot"
-_DEFAULT_NETCONF_PASSWORD = "BOSCO"
+_DEFAULT_NETCONF_USERNAME = ""
+_DEFAULT_NETCONF_PASSWORD = ""
_DEFAULT_NETCONF_PORT = 830
@@ -538,9 +538,15 @@
raise RuntimeError('Failed to activate OLT: {}'.format(device.reason))
@inlineCallbacks
- def make_netconf_connection(self, connect_timeout=None):
- ############################################################################
- # Start initial discovery of NETCONF support
+ def make_netconf_connection(self, connect_timeout=None,
+ close_existing_client=False):
+
+ if close_existing_client and self._netconf_client is not None:
+ try:
+ yield self._netconf_client.close()
+ except:
+ pass
+ self._netconf_client = None
client = self._netconf_client
@@ -683,7 +689,7 @@
except Exception as e:
self.log.exception('southbound-port-startup', e=e)
- results = yield defer.gatherResults(dl)
+ results = yield defer.gatherResults(dl, consumeErrors=True)
returnValue(results)
@@ -1212,7 +1218,7 @@
registry('frameio').close_port(io)
def _rcv_io(self, port, frame):
- self.log.info('received', iface_name=port.iface_name, frame_len=len(frame))
+ self.log.debug('received', iface_name=port.iface_name, frame_len=len(frame))
pkt = Ether(frame)
if pkt.haslayer(Dot1Q):
@@ -1238,8 +1244,8 @@
def packet_out(self, egress_port, msg):
if self.io_port is not None:
- self.log.info('sending-packet-out', egress_port=egress_port,
- msg=hexify(msg))
+ self.log.debug('sending-packet-out', egress_port=egress_port,
+ msg=hexify(msg))
pkt = Ether(msg)
#ADTRAN To remove any extra tags
@@ -1344,7 +1350,7 @@
device.oper_status = OperStatus.FAILED
device.reason = self.heartbeat_last_reason
self.adapter_agent.update_device(device)
- self.heartbeat_alarm(False, self.heartbeat_miss)
+ self.heartbeat_alarm(True, self.heartbeat_miss)
else:
# Update device states
if device.connect_status != ConnectStatus.REACHABLE:
@@ -1352,7 +1358,10 @@
device.oper_status = OperStatus.ACTIVE
device.reason = ''
self.adapter_agent.update_device(device)
- self.heartbeat_alarm(True)
+ self.heartbeat_alarm(False)
+
+ if self.netconf_client is None or not self.netconf_client.connected:
+ self.make_netconf_connection(close_existing_client=True)
except Exception as e:
self.log.exception('heartbeat-check', e=e)
@@ -1376,16 +1385,17 @@
self.heartbeat_last_reason = 'RESTCONF connectivity error'
self.heartbeat_check_status(None)
- def heartbeat_alarm(self, status, heartbeat_misses=0):
+ def heartbeat_alarm(self, raise_alarm, heartbeat_misses=0):
alarm = 'Heartbeat'
alarm_data = {
'ts': arrow.utcnow().timestamp,
- 'description': self.alarms.format_description('olt', alarm, status),
+ 'description': self.alarms.format_description('olt', alarm,
+ raise_alarm),
'id': self.alarms.format_id(alarm),
'type': AlarmEventType.EQUIPMENT,
'category': AlarmEventCategory.PON,
'severity': AlarmEventSeverity.CRITICAL,
- 'state': AlarmEventState.RAISED if status else AlarmEventState.CLEARED
+ 'state': AlarmEventState.RAISED if raise_alarm else AlarmEventState.CLEARED
}
context_data = {'heartbeats_missed': heartbeat_misses}
self.alarms.send_alarm(context_data, alarm_data)
diff --git a/voltha/adapters/adtran_olt/adtran_olt.py b/voltha/adapters/adtran_olt/adtran_olt.py
index cccd53c..d34f9e5 100644
--- a/voltha/adapters/adtran_olt/adtran_olt.py
+++ b/voltha/adapters/adtran_olt/adtran_olt.py
@@ -51,7 +51,7 @@
self.descriptor = Adapter(
id=self.name,
vendor='Adtran, Inc.',
- version='0.8',
+ version='0.9',
config=AdapterConfig(log_level=LogLevel.INFO)
)
log.debug('adtran_olt.__init__', adapter_agent=adapter_agent)
diff --git a/voltha/adapters/adtran_olt/adtran_olt_handler.py b/voltha/adapters/adtran_olt/adtran_olt_handler.py
index 800058f..1c90348 100644
--- a/voltha/adapters/adtran_olt/adtran_olt_handler.py
+++ b/voltha/adapters/adtran_olt/adtran_olt_handler.py
@@ -848,7 +848,7 @@
raise NotImplementedError('TODO: not yet supported')
- def delete_interface(self, data):
+ def remove_interface(self, data):
"""
Deleete XPON interfaces
:param data: (xpon config info)
diff --git a/voltha/adapters/adtran_olt/flow/evc.py b/voltha/adapters/adtran_olt/flow/evc.py
index add54e8..71b7617 100644
--- a/voltha/adapters/adtran_olt/flow/evc.py
+++ b/voltha/adapters/adtran_olt/flow/evc.py
@@ -81,6 +81,7 @@
self._status_message = None
self._flow = flow_entry
self._name = self._create_name()
+ self._deferred = None
self._evc_maps = {} # Map Name -> evc-map
self._flow_type = EVC.ElineFlowType.UNKNOWN
@@ -111,6 +112,16 @@
#
return EVC_NAME_FORMAT.format(self._flow.flow_id)
+ def _cancel_deferred(self):
+ d, self._deferred = self._deferred, None
+
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+
+ except Exception as e:
+ pass
+
@property
def name(self):
return self._name
@@ -123,6 +134,11 @@
def installed(self):
return self._installed
+ @installed.setter
+ def installed(self, value):
+ assert not value, 'EVC Install can only be reset'
+ self._installed = False
+
@property
def status(self):
return self._status_message
@@ -210,11 +226,21 @@
if self._evc_maps is not None and evc_map.name in self._evc_maps:
del self._evc_maps[evc_map.name]
- def schedule_install(self):
+ def schedule_install(self, delay=0):
"""
- Try to install EVC and all MAPs in a single operational sequence
+ Try to install EVC and all MAPs in a single operational sequence.
+ The delay parameter is used during recovery to allow multiple associated
+ EVC maps to be updated/modified independently before the parent EVC
+ is installed.
+
+ :param delay: (int) Seconds to delay before install
"""
- return reactor.callLater(0, self._do_install) if self._valid else succeed('Not VALID')
+ self._cancel_deferred()
+
+ self._deferred = reactor.callLater(delay, self._do_install) \
+ if self._valid else succeed('Not VALID')
+
+ return self._deferred
@staticmethod
def _xml_header(operation=None):
diff --git a/voltha/adapters/adtran_olt/flow/evc_map.py b/voltha/adapters/adtran_olt/flow/evc_map.py
index 752e150..027a19e 100644
--- a/voltha/adapters/adtran_olt/flow/evc_map.py
+++ b/voltha/adapters/adtran_olt/flow/evc_map.py
@@ -152,6 +152,10 @@
self._status_message = value
@property
+ def evc(self):
+ return self._evc
+
+ @property
def _needs_acl_support(self):
if self._ipv4_dst is not None: # In case MCAST downstream has ACL on it
return False
@@ -248,7 +252,13 @@
@inlineCallbacks
def install(self):
- if self._valid and not self._installed and len(self._gem_ids_and_vid) > 0:
+ def gem_ports():
+ ports = []
+ for gems_and_vids in self._gem_ids_and_vid.itervalues():
+ ports.extend(gems_and_vids[0])
+ return ports
+
+ if self._valid and not self._installed and len(gem_ports()) > 0:
try:
# TODO: create generator of XML once we have MANY to install at once
map_xml = self._ingress_install_xml(self._gem_ids_and_vid) \
@@ -256,7 +266,7 @@
log.debug('install', xml=map_xml, name=self.name)
results = yield self._flow.handler.netconf_client.edit_config(map_xml,
- lock_timeout=30)
+ lock_timeout=10)
self._installed = results.ok
self.status = '' if results.ok else results.error
@@ -325,10 +335,76 @@
returnValue('Done')
+ def reflow_needed(self):
+ log.debug('reflow-needed')
+ reflow = not self.installed
+ # TODO: implement
+ return reflow
+
@staticmethod
def create_evc_map_name(flow):
return EVC_MAP_NAME_FORMAT.format(flow.in_port, flow.flow_id)
+ def add_gem_port(self, gem_port, reflow=False):
+ # TODO: Refactor
+ if self._is_ingress_map:
+ def gem_ports():
+ ports = []
+ for gems_and_vids in self._gem_ids_and_vid.itervalues():
+ ports.extend(gems_and_vids[0])
+ return ports
+
+ before = gem_ports()
+ self._setup_gem_ids()
+ after = gem_ports()
+
+ if reflow or len(before) < len(after):
+ self._installed = False
+ return self.install()
+
+ return succeed('nop')
+
+ def remove_gem_port(self, gem_port):
+ # TODO: Refactor
+ if self._is_ingress_map:
+ def gem_ports():
+ ports = []
+ for gems_and_vids in self._gem_ids_and_vid.itervalues():
+ ports.extend(gems_and_vids[0])
+ return ports
+
+ before = gem_ports()
+ self._setup_gem_ids()
+ after = gem_ports()
+
+ if len(before) > len(after):
+ self._installed = False
+ return self.install()
+
+ return succeed('nop')
+
+
+# self._gem_ids_and_vid = None # { key -> onu-id, value -> tuple(sorted GEM Port IDs, onu_vid) }
+
+ def _setup_gem_ids(self):
+ from flow_entry import FlowEntry
+
+ flow = self._flow # TODO: Drop saving of flow once debug complete
+ is_pon = flow.handler.is_pon_port(flow.in_port)
+
+ if self._is_ingress_map and is_pon:
+ pon_port = flow.handler.get_southbound_port(flow.in_port)
+
+ if pon_port is not None:
+ self._pon_id = pon_port.pon_id
+ self._gem_ids_and_vid = pon_port.gem_ids(flow.logical_port,
+ self._needs_acl_support,
+ flow.is_multicast_flow)
+
+ # TODO: Only EAPOL ACL support for the first demo - FIXED_ONU
+ if self._needs_acl_support and self._eth_type != FlowEntry.EtherType.EAPOL.value:
+ self._gem_ids_and_vid = dict()
+
def _decode(self):
from evc import EVC
from flow_entry import FlowEntry
@@ -369,18 +445,7 @@
# If no match of VLAN this may be for untagged traffic or upstream and needs to
# match the gem-port vid
- if self._is_ingress_map and is_pon:
- pon_port = flow.handler.get_southbound_port(flow.in_port)
-
- if pon_port is not None:
- self._pon_id = pon_port.pon_id
- self._gem_ids_and_vid = pon_port.gem_ids(flow.logical_port,
- self._needs_acl_support,
- flow.is_multicast_flow)
-
- # TODO: Only EAPOL ACL support for the first demo - FIXED_ONU
- if self._needs_acl_support and self._eth_type != FlowEntry.EtherType.EAPOL.value:
- self._gem_ids_and_vid = dict()
+ self._setup_gem_ids()
# self._match_untagged = flow.vlan_id is None and flow.inner_vid is None
self._c_tag = flow.inner_vid
diff --git a/voltha/adapters/adtran_olt/flow/flow_entry.py b/voltha/adapters/adtran_olt/flow/flow_entry.py
index 0fd42f7..638fd6b 100644
--- a/voltha/adapters/adtran_olt/flow/flow_entry.py
+++ b/voltha/adapters/adtran_olt/flow/flow_entry.py
@@ -18,7 +18,8 @@
import voltha.core.flow_decomposer as fd
from voltha.core.flow_decomposer import *
-from voltha.protos.openflow_13_pb2 import OFPP_CONTROLLER, OFPP_LOCAL, OFPP_ANY, OFPP_MAX
+from voltha.protos.openflow_13_pb2 import OFPP_MAX
+from twisted.internet import defer
from twisted.internet.defer import returnValue, inlineCallbacks, succeed, gatherResults
log = structlog.get_logger()
@@ -485,31 +486,6 @@
return gatherResults(dl, consumeErrors=True)
- @staticmethod
- def find_evc_map_flows(device_id, pon_id, onu_id=None):
- """
- For a given OLT, find all the EVC Maps for a specific PON ID and optionally a
- specific ONU
- :param device_id: Device ID
- :param pon_id: (int) PON ID
- :param onu_id: (int) Optional ONU ID
- :return: (list) of matching flows
- """
- # EVCs are only in the downstream table, EVC Map are in upstream
- flow_table = _existing_upstream_flow_entries.get(device_id, None)
-
- if flow_table is None:
- return []
-
- flows = []
- for flow in flow_table.itervalues():
- evc_map = flow.evc_map
- if evc_map is not None and evc_map.pon_id is not None and evc_map.pon_id == pon_id:
- # PON ID Matches
- if onu_id is None or onu_id in evc_map.gem_ids_and_vid:
- flows.append(evc_map)
- return flows
-
@inlineCallbacks
def remove(self):
"""
@@ -547,7 +523,7 @@
flow_evc = flow_table['evc']
# If this flow owns the EVC, assign it to a remaining flow
- if flow_id == flow_evc.flow_entry.flow_id:
+ if flow_evc is not None and flow_id == flow_evc.flow_entry.flow_id:
flow_table['evc'].flow_entry = next((_flow for _flow in flow_table.itervalues()
if isinstance(_flow, FlowEntry)
and _flow.flow_id != flow_id), None)
@@ -578,6 +554,53 @@
self.evc = None
returnValue(succeed('Done'))
+ @staticmethod
+ def find_evc_map_flows(onu):
+ """
+ For a given OLT, find all the EVC Maps for a specific ONU
+ :param onu: (Onu) onu
+ :return: (list) of matching flows
+ """
+ # EVCs are only in the downstream table, EVC Map are in upstream
+
+ device_id = onu.device_id
+ onu_ports = onu.uni_ports
+
+ all_flow_entries = _existing_upstream_flow_entries.get(device_id) or {}
+ evc_maps = [flow_entry.evc_map for flow_entry in all_flow_entries.itervalues()
+ if flow_entry.in_port in onu_ports
+ and flow_entry.evc_map is not None
+ and flow_entry.evc_map.valid]
+
+ return evc_maps
+
+ @staticmethod
+ def sync_flows_by_onu(onu, reflow=False):
+ """
+ Check status of all flows on a per-ONU basis. Called when values
+ within the ONU are modified that may affect traffic.
+
+ :param onu: (Onu) ONU to examine
+ :param reflow: (boolean) Flag, if True, requests that the flow be sent to
+ hardware even if the values in hardware are
+ consistent with the current flow settings
+ """
+ evc_maps = FlowEntry.find_evc_map_flows(onu)
+ evcs = {}
+
+ for evc_map in evc_maps:
+ if reflow or evc_map.reflow_needed():
+ evc_map.installed = False
+
+ if not evc_map.installed:
+ evc = evc_map.evc
+ if evc is not None:
+ evcs[evc.name] = evc
+
+ for evc in evcs.itervalues():
+ evc.installed = False
+ evc.schedule_install(delay=2)
+
######################################################
# Bulk operations
diff --git a/voltha/adapters/adtran_olt/gem_port.py b/voltha/adapters/adtran_olt/gem_port.py
index 70959d4..1ce3915 100644
--- a/voltha/adapters/adtran_olt/gem_port.py
+++ b/voltha/adapters/adtran_olt/gem_port.py
@@ -55,6 +55,12 @@
@staticmethod
def create(data, olt):
assert isinstance(data, GemportsConfigData)
+ exception = data.gemport_id in [2180, 2186, 2192,
+ 2198, 2204, 2210,
+ 2216, 2222, 2228,
+ 2234, 2240, 2246,
+ 2252, 2258]
+ mcast = data.gemport_id in [4095]
return GemPort(data.gemport_id, None,
encryption=data.aes_indicator,
@@ -63,7 +69,9 @@
name=data.name,
traffic_class=data.traffic_class,
intf_ref=data.itf_ref, # v_enet
- olt=olt)
+ olt=olt,
+ multicast=mcast,
+ exception=exception)
@property
def alloc_id(self):
diff --git a/voltha/adapters/adtran_olt/net/adtran_netconf.py b/voltha/adapters/adtran_olt/net/adtran_netconf.py
index 7cbea60..14947c9 100644
--- a/voltha/adapters/adtran_olt/net/adtran_netconf.py
+++ b/voltha/adapters/adtran_olt/net/adtran_netconf.py
@@ -137,6 +137,18 @@
def _do_close(self, old_session):
return old_session.close_session()
+ @inlineCallbacks
+ def _reconnect(self):
+ try:
+ yield self.close()
+ except:
+ pass
+
+ try:
+ yield self.connect()
+ except:
+ pass
+
def get_config(self, source='running'):
"""
Get the configuration from the specified source
@@ -145,9 +157,11 @@
:return: (deferred) Deferred request that wraps the GetReply class
"""
+ if not self._session:
+ raise NotImplemented('No SSH Session')
- if not self._session or not self._session.connected:
- raise NotImplemented('TODO: Support auto-connect if needed')
+ if not self._session.connected:
+ self._reconnect()
return threads.deferToThread(self._do_get_config, source)
@@ -170,8 +184,11 @@
"""
log.debug('get', filter=payload)
- if not self._session or not self._session.connected:
- raise NotImplemented('TODO: Support auto-connect if needed')
+ if not self._session:
+ raise NotImplemented('No SSH Session')
+
+ if not self._session.connected:
+ self._reconnect()
return threads.deferToThread(self._do_get, payload)
@@ -268,8 +285,15 @@
:return: (deferred) for RpcReply
"""
- if not self._session or not self._session.connected:
- raise NotImplemented('TODO: Support auto-connect if needed')
+ if not self._session:
+ raise NotImplemented('No SSH Session')
+
+ if not self._session.connected:
+ try:
+ yield self._reconnect()
+
+ except Exception as e:
+ log.exception('edit-config-connect', e=e)
rpc_reply = None
# if lock_timeout > 0:
@@ -337,8 +361,11 @@
"""
log.debug('rpc', rpc=rpc_string)
- if not self._session or not self._session.connected:
- raise NotImplemented('TODO: Support auto-connect if needed')
+ if not self._session:
+ raise NotImplemented('No SSH Session')
+
+ if not self._session.connected:
+ self._reconnect()
return threads.deferToThread(self._do_rpc, rpc_string)
diff --git a/voltha/adapters/adtran_olt/nni_port.py b/voltha/adapters/adtran_olt/nni_port.py
index bea3fd0..463b02a 100644
--- a/voltha/adapters/adtran_olt/nni_port.py
+++ b/voltha/adapters/adtran_olt/nni_port.py
@@ -56,7 +56,7 @@
self._logical_port = None
self._parent = parent
- self._sync_tick = 20.0 # TODO: Implement
+ self._sync_tick = 10.0
self._sync_deferred = None
self._deferred = None
diff --git a/voltha/adapters/adtran_olt/onu.py b/voltha/adapters/adtran_olt/onu.py
index ec021c1..8497178 100644
--- a/voltha/adapters/adtran_olt/onu.py
+++ b/voltha/adapters/adtran_olt/onu.py
@@ -32,6 +32,10 @@
'TBIT': 'tibit_onu',
}
+_MAX_EXPEDITE_COUNT = 5
+_EXPEDITE_SECS = 2
+_HW_SYNC_SECS = 30
+
class Onu(object):
"""
@@ -43,16 +47,6 @@
DEFAULT_PASSWORD = ''
def __init__(self, onu_info):
- # onu_info = {
- # 'serial-number': serial_number,
- # 'xpon-name': None,
- # 'pon-id': self.pon_id,
- # 'onu-id': None, # Set later (mandatory)
- # 'enabled': True,
- # 'upstream-channel-speed': 0,
- # 't-conts': get_tconts(self.pon_id, serial_number),
- # 'gem-ports': get_gem_ports(self.pon_id, serial_number),
- # }
self._onu_id = onu_info['onu-id']
if self._onu_id is None:
raise ValueError('No ONU ID available')
@@ -82,9 +76,10 @@
self._include_multicast = True # TODO: May need to add multicast on a per-ONU basis
- self._sync_tick = 60.0
+ self._sync_tick = _HW_SYNC_SECS
self._expedite_sync = False
self._expedite_count = 0
+ self._resync_flows = False
self._sync_deferred = None # For sync of ONT config to hardware
# TODO: enable and upstream-channel-speed not yet supported
@@ -126,6 +121,10 @@
return self._onu_id
@property
+ def device_id(self):
+ return self._device_id
+
+ @property
def name(self):
return self._name
@@ -137,6 +136,8 @@
def enabled(self, value):
if self._enabled != value:
self._enabled = value
+ self._resync_flows = True
+
self.set_config('enable', self._enabled)
if self._enabled:
@@ -149,6 +150,10 @@
return self._onu_vid
@property
+ def uni_ports(self):
+ return self._uni_ports
+
+ @property
def logical_port(self):
"""Return the logical PORT number of this ONU's UNI"""
return self._uni_ports[0]
@@ -248,6 +253,7 @@
def _cancel_deferred(self):
d, self._sync_deferred = self._sync_deferred, None
+
if d is not None and not d.called:
try:
d.cancel()
@@ -278,7 +284,6 @@
except Exception as e: # TODO: Add breakpoint here during unexpected reboot test
self.log.exception('onu-create', e=e)
- raise
# Now set up all tconts & gem-ports
first_sync = self._sync_tick
@@ -353,36 +358,50 @@
def restart(self):
if not self._valid:
return succeed('Deleting')
+
+ self._cancel_deferred()
+ self._sync_deferred = reactor.callLater(0, self._sync_hardware)
+
tconts, self._tconts = self._tconts, {}
gem_ports, self._gem_ports = self._gem_ports, {}
+
return self.create(tconts, gem_ports)
def _sync_hardware(self):
from codec.olt_config import OltConfig
self.log.debug('sync-hardware')
+
def read_config(results):
self.log.debug('read-config', results=results)
- config = OltConfig.Pon.Onu.decode([results])
- assert self.onu_id in config, 'sync-onu-not-found-{}'.format(self.onu_id)
- config = config[self.onu_id]
dl = []
- if self._enabled != config.enable:
- dl.append(self.set_config('enable', self._enabled))
+ try:
+ config = OltConfig.Pon.Onu.decode([results])
+ assert self.onu_id in config, 'sync-onu-not-found-{}'.format(self.onu_id)
+ config = config[self.onu_id]
- if self.serial_number != config.serial_number:
- dl.append(self.set_config('serial-number', self.serial_number))
+ if self._enabled != config.enable:
+ dl.append(self.set_config('enable', self._enabled))
- # Sync TCONTs if everything else in sync
+ if self.serial_number != config.serial_number:
+ dl.append(self.set_config('serial-number', self.serial_number))
- if len(dl) == 0:
- dl.extend(sync_tconts(config.tconts))
+ # Sync TCONTs if everything else in sync
- # Sync GEM Ports if everything else in sync
+ if len(dl) == 0:
+ dl.extend(sync_tconts(config.tconts))
- if len(dl) == 0:
- dl.extend(sync_gem_ports(config.gem_ports))
+ # Sync GEM Ports if everything else in sync
+
+ if len(dl) == 0:
+ dl.extend(sync_gem_ports(config.gem_ports))
+
+ if len(dl) == 0:
+ sync_flows()
+
+ except Exception as e:
+ self.log.exception('hw-sync-read-config', e=e)
# Run h/w sync again a bit faster if we had to sync anything
self._expedite_sync = len(dl) > 0
@@ -395,17 +414,21 @@
my_alloc_ids = frozenset(self._tconts.iterkeys())
dl = []
- extra_alloc_ids = hw_alloc_ids - my_alloc_ids
- dl.extend(sync_delete_extra_tconts(extra_alloc_ids))
+ try:
+ extra_alloc_ids = hw_alloc_ids - my_alloc_ids
+ dl.extend(sync_delete_extra_tconts(extra_alloc_ids))
- missing_alloc_ids = my_alloc_ids - hw_alloc_ids
- dl.extend(sync_add_missing_tconts(missing_alloc_ids))
+ missing_alloc_ids = my_alloc_ids - hw_alloc_ids
+ dl.extend(sync_add_missing_tconts(missing_alloc_ids))
- matching_alloc_ids = my_alloc_ids & hw_alloc_ids
- matching_hw_tconts = {alloc_id: tcont
- for alloc_id, tcont in hw_tconts.iteritems()
- if alloc_id in matching_alloc_ids}
- dl.extend(sync_matching_tconts(matching_hw_tconts))
+ matching_alloc_ids = my_alloc_ids & hw_alloc_ids
+ matching_hw_tconts = {alloc_id: tcont
+ for alloc_id, tcont in hw_tconts.iteritems()
+ if alloc_id in matching_alloc_ids}
+ dl.extend(sync_matching_tconts(matching_hw_tconts))
+
+ except Exception as e:
+ self.log.exception('hw-sync-tconts', e=e)
return dl
@@ -461,17 +484,23 @@
my_gems_ids = frozenset(self._gem_ports.iterkeys())
dl = []
- extra_gems_ids = hw_gems_ids - my_gems_ids
- dl.extend(sync_delete_extra_gem_ports(extra_gems_ids))
+ try:
+ extra_gems_ids = hw_gems_ids - my_gems_ids
+ dl.extend(sync_delete_extra_gem_ports(extra_gems_ids))
- missing_gem_ids = my_gems_ids - hw_gems_ids
- dl.extend(sync_add_missing_gem_ports(missing_gem_ids))
+ missing_gem_ids = my_gems_ids - hw_gems_ids
+ dl.extend(sync_add_missing_gem_ports(missing_gem_ids))
- matching_gem_ids = my_gems_ids & hw_gems_ids
- matching_hw_gem_ports = {gem_id: gem_port
- for gem_id, gem_port in hw_gem_ports.iteritems()
- if gem_id in matching_gem_ids}
- dl.extend(sync_matching_gem_ports(matching_hw_gem_ports))
+ matching_gem_ids = my_gems_ids & hw_gems_ids
+ matching_hw_gem_ports = {gem_id: gem_port
+ for gem_id, gem_port in hw_gem_ports.iteritems()
+ if gem_id in matching_gem_ids}
+
+ dl.extend(sync_matching_gem_ports(matching_hw_gem_ports))
+ self._resync_flows |= len(dl) > 0
+
+ except Exception as e:
+ self.log.exception('hw-sync-gem-ports', e=e)
return dl
@@ -496,6 +525,12 @@
operation='PATCH'))
return dl
+ def sync_flows():
+ from flow.flow_entry import FlowEntry
+
+ reflow, self._resync_flows = self._resync_flows, False
+ return FlowEntry.sync_flows_by_onu(self, reflow=reflow)
+
def failure(reason):
# self.log.error('hardware-sync-get-config-failed', reason=reason)
pass
@@ -510,8 +545,8 @@
if self._expedite_sync:
self._expedite_count += 1
- if self._expedite_count < 5:
- delay = 5
+ if self._expedite_count < _MAX_EXPEDITE_COUNT:
+ delay = _EXPEDITE_SECS
else:
self._expedite_count = 0
@@ -525,9 +560,14 @@
if not self.pon.enabled:
return reschedule('not-enabled')
- self._sync_deferred = self._get_config()
- self._sync_deferred.addCallbacks(read_config, failure)
- self._sync_deferred.addBoth(reschedule)
+ try:
+ self._sync_deferred = self._get_config()
+ self._sync_deferred.addCallbacks(read_config, failure)
+ self._sync_deferred.addBoth(reschedule)
+
+ except Exception as e:
+ self.log.exception('hw-sync-main', e=e)
+ return reschedule('sync-exception')
def _get_config(self):
uri = AdtranOltHandler.GPON_ONU_CONFIG_URI.format(self._pon_id, self.onu_id)
@@ -646,13 +686,17 @@
results = yield gem_port.add_to_hardware(self.olt.rest_client,
self._pon_id,
self.onu_id)
+ # self._resync_flows = True
# May need to update flow tables/evc-maps
if gem_port.alloc_id in self._tconts:
+ from flow.flow_entry import FlowEntry
# GEM-IDs are a sorted list (ascending). First gemport handles downstream traffic
# from flow.flow_entry import FlowEntry
- # evc_maps = FlowEntry.find_evc_map_flows(self._device_id, self._pon_id, self._onu_id)
- pass # TODO: Start here Tuesday
+ evc_maps = FlowEntry.find_evc_map_flows(self)
+
+ for evc_map in evc_maps:
+ evc_map.add_gem_port(gem_port, reflow=reflow)
except Exception as e:
self.log.exception('gem-port', gem_port=gem_port, reflow=reflow, e=e)
@@ -670,8 +714,11 @@
returnValue(succeed('nop'))
del self._gem_ports[gem_id]
+ # self._resync_flows = True
try:
+ from flow.flow_entry import FlowEntry
+
if gem_port.alloc_id in self._tconts:
# May need to update flow tables/evc-maps
# GEM-IDs are a sorted list (ascending). First gemport handles downstream traffic
@@ -680,6 +727,11 @@
results = yield gem_port.remove_from_hardware(self.olt.rest_client,
self._pon_id,
self.onu_id)
+ evc_maps = FlowEntry.find_evc_map_flows(self)
+
+ for evc_map in evc_maps:
+ evc_map.remove_gem_port(gem_port)
+
except Exception as e:
self.log.exception('delete', e=e)
raise
diff --git a/voltha/adapters/adtran_olt/pon_port.py b/voltha/adapters/adtran_olt/pon_port.py
index 87fb93a..d1a242d 100644
--- a/voltha/adapters/adtran_olt/pon_port.py
+++ b/voltha/adapters/adtran_olt/pon_port.py
@@ -63,8 +63,8 @@
self._name = 'xpon 0/{}'.format(pon_index+1)
self._label = 'pon-{}'.format(pon_index)
self._port = None
- self._no_onu_discover_tick = 5.0
self._discovery_tick = 20.0
+ self._no_onu_discover_tick = self._discovery_tick / 2
self._discovered_onus = [] # List of serial numbers
self._sync_tick = 20.0
self._in_sync = False
@@ -433,7 +433,7 @@
self._sync_deferred = reactor.callLater(self._sync_tick, self._sync_hardware)
self.log.debug('stopped')
- returnValue(succeed(results))
+ returnValue(results)
@inlineCallbacks
def reset(self):
@@ -613,7 +613,7 @@
self._expedite_sync = True
dl.append(self._set_pon_config("upstream-fec-enable",
self.upstream_fec_enable))
- return defer.gatherResults(dl)
+ return defer.gatherResults(dl, consumeErrors=True)
def sync_onus(results):
if self._state == PonPort.State.RUNNING:
@@ -794,10 +794,13 @@
if val.tconf_ref in tcont_names}
except StopIteration:
+ self.log.debug('no-vont-ony')
return None # Can happen if vont-ani/serial-number has not yet been configured
else:
+ self.log.debug('not-serial-number-authentication')
return None
else:
+ self.log.debug('not-auto-discovery')
return None
onu_info = {
@@ -816,8 +819,10 @@
'vont-ani': vont_ani
}
# Hold off ONU activation until at least one GEM Port is defined.
+ self.log.debug('onu-info', gem_ports=gem_ports)
- return onu_info if len(gem_ports) > 0 else None
+ return onu_info
+ # return onu_info if len(gem_ports) > 0 else None
except Exception as e:
self.log.exception('get-onu-info', e=e)
@@ -827,17 +832,26 @@
def add_onu(self, serial_number, status):
self.log.info('add-onu', serial_number=serial_number, status=status)
- if serial_number not in status.onus:
- # Newly found and not enabled ONU, enable it now if not at max
+ onu_info = self._get_onu_info(Onu.serial_number_to_string(serial_number))
- onu_info = self._get_onu_info(Onu.serial_number_to_string(serial_number))
+ if onu_info is None:
+ self.log.info('lookup-failure', serial_number=serial_number)
- if onu_info is None:
- self.log.info('lookup-failure', serial_number=serial_number)
+ if serial_number not in status.onus or onu_info['onu-id'] in self._active_los_alarms:
+ onu = None
- elif serial_number in self._onus or onu_info['onu-id'] in self._onu_by_id:
+ if onu_info['onu-id'] in self._active_los_alarms:
+ try:
+ yield self._remove_from_hardware(onu_info['onu-id'])
+
+ except Exception as e:
+ self.log.exception('los-cleanup', e=e)
+
+ if serial_number in self._onus or onu_info['onu-id'] in self._onu_by_id:
# May be here due to unmanaged power-cycle on OLT
+
self.log.info('onu-already-added', serial_number=serial_number)
+
assert serial_number in self._onus and\
onu_info['onu-id'] in self._onu_by_id, \
'ONU not in both lists'
@@ -849,7 +863,8 @@
reflow = True
elif len(self._onus) >= self.MAX_ONUS_SUPPORTED:
- self.log.warning('max-onus-provisioned', count=len(self._onus))
+ self.log.warning('max-onus-provisioned', count=len(self._onus))
+
else:
# TODO: Make use of upstream_channel_speed variable
onu = Onu(onu_info)
@@ -857,30 +872,35 @@
self._onus[serial_number] = onu
self._onu_by_id[onu.onu_id] = onu
- try:
- tconts = onu_info['t-conts']
- gem_ports = onu_info['gem-ports']
+ if onu is not None:
+ try:
+ tconts = onu_info['t-conts']
+ gem_ports = onu_info['gem-ports']
- # Add Multicast to PON on a per-ONU basis until xPON multicast support is ready
- # In xPON/BBF, mcast gems tie back to the channel-pair
- # MCAST VLAN IDs stored as a negative value
+ # Add Multicast to PON on a per-ONU basis until xPON multicast support is ready
+ # In xPON/BBF, mcast gems tie back to the channel-pair
+ # MCAST VLAN IDs stored as a negative value
- for id_or_vid, gem_port in gem_ports.iteritems(): # TODO: Deprecate this when BBF ready
- if gem_port.multicast:
- self.add_mcast_gem_port(gem_port, -id_or_vid)
+ for id_or_vid, gem_port in gem_ports.iteritems(): # TODO: Deprecate this when BBF ready
+ try:
+ if gem_port.multicast:
+ self.log.debug('id-or-vid', id_or_vid=id_or_vid)
+ self.add_mcast_gem_port(gem_port, -id_or_vid)
+ except Exception as e:
+ self.log.exception('id-or-vid', e=e)
- yield onu.create(tconts, gem_ports, reflow=reflow)
+ yield onu.create(tconts, gem_ports, reflow=reflow)
- # If autoactivate (demo) mode and not reflow, activate the ONU
- if self.olt.autoactivate and not reflow:
- self.activate_onu(onu)
+ # If autoactivate (demo) mode and not reflow, activate the ONU
+ if self.olt.autoactivate and not reflow:
+ self.activate_onu(onu)
- except Exception as e:
- self.log.exception('add-onu', serial_number=serial_number, reflow=reflow, e=e)
+ except Exception as e:
+ self.log.exception('add-onu', serial_number=serial_number, reflow=reflow, e=e)
- if not reflow:
- del self._onus[serial_number]
- del self._onu_by_id[onu.onu_id]
+ if not reflow:
+ del self._onus[serial_number]
+ del self._onu_by_id[onu.onu_id]
def activate_onu(self, onu):
"""
@@ -918,19 +938,28 @@
return onu_id
@inlineCallbacks
- def delete_onu(self, onu_id):
+ def _remove_from_hardware(self, onu_id):
uri = AdtranOltHandler.GPON_ONU_CONFIG_URI.format(self._pon_id, onu_id)
name = 'pon-delete-onu-{}-{}'.format(self._pon_id, onu_id)
+ try:
+ yield self._parent.rest_client.request('DELETE', uri, name=name)
+
+ except Exception as e:
+ self.log.exception('onu-hw-delete', onu_id=onu_id, e=e)
+
+ @inlineCallbacks
+ def delete_onu(self, onu_id):
onu = self._onu_by_id.get(onu_id)
# Remove from any local dictionary
if onu_id in self._onu_by_id:
del self._onu_by_id[onu_id]
+
for sn in [onu.serial_numbers for onu in self._onus.itervalues() if onu.onu_id == onu_id]:
del self._onus[sn]
try:
- yield self._parent.rest_client.request('DELETE', uri, name=name)
+ yield self._remove_from_hardware(onu_id)
except Exception as e:
self.log.exception('onu', serial_number=onu.serial_number, e=e)
diff --git a/voltha/adapters/adtran_onu/adtran_onu.py b/voltha/adapters/adtran_onu/adtran_onu.py
old mode 100644
new mode 100755
index 8103d53..426ddfd
--- a/voltha/adapters/adtran_onu/adtran_onu.py
+++ b/voltha/adapters/adtran_onu/adtran_onu.py
@@ -20,7 +20,7 @@
from uuid import uuid4
from twisted.internet import reactor
-from twisted.internet.defer import DeferredQueue, inlineCallbacks, returnValue
+from twisted.internet.defer import DeferredQueue, inlineCallbacks, returnValue, succeed
from voltha.adapters.iadapter import OnuAdapter
from voltha.core.logical_device_agent import mac_str_to_tuple
@@ -47,6 +47,7 @@
class AdtranOnuAdapter(OnuAdapter):
def __init__(self, adapter_agent, config):
+ self.log = structlog.get_logger()
super(AdtranOnuAdapter, self).__init__(adapter_agent=adapter_agent,
config=config,
device_handler_class=AdtranOnuHandler,
@@ -228,6 +229,7 @@
self.adapter_agent = adapter.adapter_agent
self.device_id = device_id
self.logical_device_id = None
+ self.enabled = True
self.log = structlog.get_logger(device_id=device_id)
self.incoming_messages = DeferredQueue(size=_MAX_INCOMING_OMCI_MESSAGES)
self.proxy_address = None
@@ -247,6 +249,14 @@
self._gem_ports = {} # Name -> dict
self._deferred = None
+ def _cancel_deferred(self):
+ d, self._deferred = self._deferred, None
+ try:
+ if d is not None and not d.called:
+ d.cancel()
+ except:
+ pass
+
def receive_message(self, msg):
try:
self.incoming_messages.put(msg)
@@ -260,7 +270,9 @@
# first we verify that we got parent reference and proxy info
assert device.parent_id, 'Invalid Parent ID'
assert device.proxy_address.device_id, 'Invalid Device ID'
- assert device.proxy_address.channel_id, 'invalid Channel ID'
+ # assert device.proxy_address.channel_id, 'invalid Channel ID'
+
+ self._cancel_deferred()
# register for proxied messages right away
self.proxy_address = device.proxy_address
@@ -326,6 +338,10 @@
device = self.adapter_agent.get_device(self.device_id)
+ if control_vlan is not None and device.vlan != control_vlan:
+ device.vlan = control_vlan
+ self.adapter_agent.update_device(device)
+
openflow_port = ofp_port(
port_no=openflow_port_no,
hw_addr=mac_str_to_tuple('08:00:%02x:%02x:%02x:%02x' %
@@ -348,9 +364,6 @@
ofp_port=openflow_port,
device_id=device.id,
device_port_no=self.uni_port.port_no))
- if control_vlan is not None and device.vlan != control_vlan:
- device.vlan = control_vlan
- self.adapter_agent.update_device(device)
def _get_uni_port(self):
ports = self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_UNI)
@@ -370,7 +383,8 @@
# first we verify that we got parent reference and proxy info
assert device.parent_id
assert device.proxy_address.device_id
- assert device.proxy_address.channel_id
+ # assert device.proxy_address.channel_id
+ self._cancel_deferred()
# register for proxied messages right away
self.proxy_address = device.proxy_address
@@ -379,6 +393,7 @@
# Set the connection status to REACHABLE
device.connect_status = ConnectStatus.REACHABLE
self.adapter_agent.update_device(device)
+ self.enabled = True
# TODO: Verify that the uni, pon and logical ports exists
@@ -458,7 +473,6 @@
except Exception as e:
self.last_response = None
- self.log.info('wait-for-response-exception', exc=str(e))
raise e
@inlineCallbacks
@@ -466,6 +480,9 @@
self.log.info('message-exchange')
self._deferred = None
+ if self.device_id is None or self.incoming_messages is None:
+ returnValue(succeed('deleted'))
+
# reset incoming message queue
while self.incoming_messages.pending:
_ = yield self.incoming_messages.get()
@@ -474,10 +491,18 @@
# Start by getting some useful device information
device = self.adapter_agent.get_device(self.device_id)
- device.oper_status = OperStatus.ACTIVATING
+ # TODO device.oper_status = OperStatus.ACTIVATING
+
+ device.oper_status = OperStatus.ACTIVE
+ device.connect_status = ConnectStatus.REACHABLE
self.adapter_agent.update_device(device)
- device.connect_status = ConnectStatus.UNREACHABLE
+ if not self.enabled:
+ # Try again later
+ self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT,
+ self.message_exchange)
+ # TODO device.connect_status = ConnectStatus.UNREACHABLE
+
try:
# TODO: Handle tx/wait-for-response timeouts and retry logic.
# May timeout to ONU not fully discovered (can happen in xPON case)
@@ -572,8 +597,7 @@
device.connect_status = ConnectStatus.REACHABLE
except Exception as e:
- self.log.exception('Failed', e=e)
-
+ self.log.debug('Failed', e=e)
# Try again later. May not have been discovered
self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT,
self.message_exchange)
@@ -885,6 +909,7 @@
def reboot(self):
from common.utils.asleep import asleep
self.log.info('rebooting', device_id=self.device_id)
+ self._cancel_deferred()
# Update the operational status to ACTIVATING and connect status to
# UNREACHABLE
@@ -893,6 +918,7 @@
previous_conn_status = device.connect_status
device.oper_status = OperStatus.ACTIVATING
device.connect_status = ConnectStatus.UNREACHABLE
+
self.adapter_agent.update_device(device)
# Sleep 10 secs, simulating a reboot
@@ -915,11 +941,15 @@
:param device: A Voltha.Device object.
:return: Will return result of self test
"""
+ from voltha.protos.voltha_pb2 import SelfTestResponse
self.log.info('self-test-device', device=device.id)
- raise NotImplementedError()
+ # TODO: Support self test?
+ return SelfTestResponse(result=SelfTestResponse.NOT_SUPPORTED)
def disable(self):
self.log.info('disabling', device_id=self.device_id)
+ self.enabled = False
+ self._cancel_deferred()
# Get the latest device reference
device = self.adapter_agent.get_device(self.device_id)
@@ -972,11 +1002,12 @@
try:
# Get the latest device reference
device = self.adapter_agent.get_device(self.device_id)
+ self._cancel_deferred()
# First we verify that we got parent reference and proxy info
assert device.parent_id
assert device.proxy_address.device_id
- assert device.proxy_address.channel_id
+ # assert device.proxy_address.channel_id
# Re-register for proxied messages right away
self.proxy_address = device.proxy_address
@@ -1010,6 +1041,8 @@
device = self.adapter_agent.get_device(device.id)
device.oper_status = OperStatus.ACTIVE
+ self.enabled = True
+
self.adapter_agent.update_device(device)
self.log.info('re-enabled', device_id=device.id)
@@ -1019,12 +1052,22 @@
def delete(self):
self.log.info('deleting', device_id=self.device_id)
# A delete request may be received when an OLT is disabled
+
+ self.enabled = False
+ self._cancel_deferred()
+
# TODO: Need to implement this
# 1) Remove all flows from the device
+
+ # Drop references
+ self.incoming_messages = None
+
self.log.info('deleted', device_id=self.device_id)
- # PON Mgnt APIs #
+ # Drop device ID
+ self.device_id = None
+ # PON Mgnt APIs #
def _get_xpon_collection(self, data):
if isinstance(data, OntaniConfig):
@@ -1091,7 +1134,10 @@
def _decode_openflow_port_and_control_vlan(self, venet_info):
try:
- ofp_port_no = int(venet_info['name'].split('-')[1])
+ # Allow spaces or dashes as separator, select last as
+ # the port number
+
+ ofp_port_no = int(venet_info['name'].replace(' ', '-').split('-')[-1:][0])
cntl_vlan = ofp_port_no
return ofp_port_no, cntl_vlan
@@ -1120,11 +1166,13 @@
raise NotImplementedError('TODO: not yet supported')
- def delete_interface(self, data):
+ def remove_interface(self, data):
"""
Deleete XPON interfaces
:param data: (xpon config info)
"""
+ self.log.info('remove-interface', data=data)
+
name = data.name
interface = data.interface
inst_data = data.data
@@ -1135,7 +1183,7 @@
if item in items:
del items[name]
pass # TODO Do something....
- raise NotImplementedError('TODO: not yet supported')
+ # raise NotImplementedError('TODO: not yet supported')
def create_tcont(self, tcont_data, traffic_descriptor_data):
"""
@@ -1191,7 +1239,7 @@
if tcont is not None:
del self._tconts[tcont_data.name]
pass # Perform any needed operations
- raise NotImplementedError('TODO: Not yet supported')
+ # raise NotImplementedError('TODO: Not yet supported')
def create_gemport(self, data):
"""
@@ -1233,7 +1281,7 @@
#
# TODO: On GEM Port changes, may need to delete ONU Flow(s)
pass # Perform any needed operations
- raise NotImplementedError('TODO: Not yet supported')
+ # raise NotImplementedError('TODO: Not yet supported')
def create_multicast_gemport(self, data):
"""