OLT & ONU xPON support updates and alarm fix
Change-Id: Id2f0457548f3951c62adbd430c075b63e7e00b8d
diff --git a/voltha/adapters/adtran_olt/adapter_alarms.py b/voltha/adapters/adtran_olt/adapter_alarms.py
index 4202c1c..ceb1063 100644
--- a/voltha/adapters/adtran_olt/adapter_alarms.py
+++ b/voltha/adapters/adtran_olt/adapter_alarms.py
@@ -43,6 +43,7 @@
class AdapterAlarms:
def __init__(self, adapter, device):
+ self.log = structlog.get_logger(device_id=device.id)
self.adapter = adapter
self.device_id = device.id
self.lc = None
@@ -50,7 +51,7 @@
def format_id(self, alarm):
return 'voltha.{}.{}.{}'.format(self.adapter.name,
self.device_id,
- alarm),
+ alarm)
def format_description(self, _object, alarm, status):
return '{} Alarm - {} - {}'.format(_object.upper(),
@@ -61,8 +62,9 @@
try:
current_context = {}
- for key, value in context_data.__dict__.items():
- current_context[key] = str(value)
+ if isinstance(context_data, dict):
+ for key, value in context_data.iteritems():
+ current_context[key] = str(value)
alarm_event = self.adapter.adapter_agent.create_alarm(
id=alarm_data.get('id', 'voltha.{}.{}.olt'.format(self.adapter.name,
@@ -81,8 +83,3 @@
except Exception as e:
self.log.exception('failed-to-send-alarm', e=e)
-
-
-
-
-
diff --git a/voltha/adapters/adtran_olt/adtran_device_handler.py b/voltha/adapters/adtran_olt/adtran_device_handler.py
index e8c8af6..297bb1b 100644
--- a/voltha/adapters/adtran_olt/adtran_device_handler.py
+++ b/voltha/adapters/adtran_olt/adtran_device_handler.py
@@ -51,12 +51,12 @@
DEFAULT_MULTICAST_VLAN = 4050
_MANAGEMENT_VLAN = 4093
-_DEFAULT_RESTCONF_USERNAME = ""
-_DEFAULT_RESTCONF_PASSWORD = ""
+_DEFAULT_RESTCONF_USERNAME = "ADMIN"
+_DEFAULT_RESTCONF_PASSWORD = "PASSWORD"
_DEFAULT_RESTCONF_PORT = 8081
-_DEFAULT_NETCONF_USERNAME = ""
-_DEFAULT_NETCONF_PASSWORD = ""
+_DEFAULT_NETCONF_USERNAME = "hsvroot"
+_DEFAULT_NETCONF_PASSWORD = "BOSCO"
_DEFAULT_NETCONF_PORT = 830
@@ -669,19 +669,21 @@
# Start/stop the interfaces as needed. These are deferred calls
- try:
- dl = []
- for port in self.northbound_ports.itervalues():
+ dl = []
+ for port in self.northbound_ports.itervalues():
+ try:
dl.append(port.start())
+ except Exception as e:
+ self.log.exception('northbound-port-startup', e=e)
- for port in self.southbound_ports.itervalues():
+ for port in self.southbound_ports.itervalues():
+ try:
dl.append(port.start() if port.admin_state == AdminState.ENABLED else port.stop())
- results = yield defer.gatherResults(dl)
+ except Exception as e:
+ self.log.exception('southbound-port-startup', e=e)
- except Exception as e:
- self.log.exception('port-startup', e=e)
- results = defer.fail(Failure())
+ results = yield defer.gatherResults(dl)
returnValue(results)
@@ -1320,48 +1322,58 @@
def check_pulse(self):
if self.logical_device_id is not None:
- self.heartbeat = self.rest_client.request('GET', self.HELLO_URI, name='hello')
- self.heartbeat.addCallbacks(self.heartbeat_check_status, self.heartbeat_fail)
+ try:
+ self.heartbeat = self.rest_client.request('GET', self.HELLO_URI,
+ name='hello', timeout=5)
+ self.heartbeat.addCallbacks(self._heartbeat_success, self._heartbeat_fail)
- def heartbeat_check_status(self, results):
+ except Exception as e:
+ self.heartbeat = reactor.callLater(5, self._heartbeat_fail, e)
+
+ def heartbeat_check_status(self, _):
"""
Check the number of heartbeat failures against the limit and emit an alarm if needed
"""
device = self.adapter_agent.get_device(self.device_id)
- if self.heartbeat_miss >= self.heartbeat_failed_limit and device.connect_status == ConnectStatus.REACHABLE:
- self.log.warning('olt-heartbeat-failed', count=self.heartbeat_miss)
- device.connect_status = ConnectStatus.UNREACHABLE
- device.oper_status = OperStatus.FAILED
- device.reason = self.heartbeat_last_reason
- self.adapter_agent.update_device(device)
+ try:
+ if self.heartbeat_miss >= self.heartbeat_failed_limit:
+ if device.connect_status == ConnectStatus.REACHABLE:
+ self.log.warning('heartbeat-failed', count=self.heartbeat_miss)
+ device.connect_status = ConnectStatus.UNREACHABLE
+ device.oper_status = OperStatus.FAILED
+ device.reason = self.heartbeat_last_reason
+ self.adapter_agent.update_device(device)
+ self.heartbeat_alarm(False, self.heartbeat_miss)
+ else:
+ # Update device states
+ if device.connect_status != ConnectStatus.REACHABLE:
+ device.connect_status = ConnectStatus.REACHABLE
+ device.oper_status = OperStatus.ACTIVE
+ device.reason = ''
+ self.adapter_agent.update_device(device)
+ self.heartbeat_alarm(True)
- self.heartbeat_alarm(False, self.heartbeat_miss)
- else:
- # Update device states
-
- self.log.info('heartbeat-success')
-
- if device.connect_status != ConnectStatus.REACHABLE:
- device.connect_status = ConnectStatus.REACHABLE
- device.oper_status = OperStatus.ACTIVE
- device.reason = ''
- self.adapter_agent.update_device(device)
-
- self.heartbeat_alarm(True)
-
- self.heartbeat_miss = 0
- self.heartbeat_last_reason = ''
- self.heartbeat_count += 1
+ except Exception as e:
+ self.log.exception('heartbeat-check', e=e)
# Reschedule next heartbeat
if self.logical_device_id is not None:
+ self.heartbeat_count += 1
self.heartbeat = reactor.callLater(self.heartbeat_interval, self.check_pulse)
- def heartbeat_fail(self, failure):
+ def _heartbeat_success(self, results):
+ self.log.debug('heartbeat-success')
+ self.heartbeat_miss = 0
+ self.heartbeat_last_reason = ''
+ self.heartbeat_check_status(results)
+
+ def _heartbeat_fail(self, failure):
self.heartbeat_miss += 1
self.log.info('heartbeat-miss', failure=failure,
- count=self.heartbeat_count, miss=self.heartbeat_miss)
+ count=self.heartbeat_count,
+ miss=self.heartbeat_miss)
+ self.heartbeat_last_reason = 'RESTCONF connectivity error'
self.heartbeat_check_status(None)
def heartbeat_alarm(self, status, heartbeat_misses=0):
@@ -1384,3 +1396,17 @@
return datetime.datetime.strptime(revision, '%Y-%m-%d')
except Exception:
return None
+
+ @staticmethod
+ def _dict_diff(lhs, rhs):
+ """
+ Compare the values of two dictionaries and return the items in 'rhs'
+ that are different than 'lhs. The RHS dictionary keys can be a subset of the
+ LHS dictionary, or the RHS dictionary keys can contain new values.
+
+ :param lhs: (dict) Original dictionary values
+ :param rhs: (dict) New dictionary values to compare to the original (lhs) dict
+ :return: (dict) Dictionary with differences from the RHS dictionary
+ """
+ assert len(lhs.keys()) == len(set(lhs.iterkeys()) & (rhs.iterkeys())), 'Dictionary Keys do not match'
+ return {k: v for k, v in rhs.items() if k not in lhs or lhs[k] != rhs[k]}
diff --git a/voltha/adapters/adtran_olt/adtran_olt.py b/voltha/adapters/adtran_olt/adtran_olt.py
index be75f2e..cccd53c 100644
--- a/voltha/adapters/adtran_olt/adtran_olt.py
+++ b/voltha/adapters/adtran_olt/adtran_olt.py
@@ -51,7 +51,7 @@
self.descriptor = Adapter(
id=self.name,
vendor='Adtran, Inc.',
- version='0.7',
+ version='0.8',
config=AdapterConfig(log_level=LogLevel.INFO)
)
log.debug('adtran_olt.__init__', adapter_agent=adapter_agent)
diff --git a/voltha/adapters/adtran_olt/adtran_olt_handler.py b/voltha/adapters/adtran_olt/adtran_olt_handler.py
index 4eb2a63..800058f 100644
--- a/voltha/adapters/adtran_olt/adtran_olt_handler.py
+++ b/voltha/adapters/adtran_olt/adtran_olt_handler.py
@@ -271,6 +271,9 @@
self.num_southbound_ports = len(self.southbound_ports)
+ def pon(self, pon_id):
+ return self.southbound_ports.get(pon_id)
+
def complete_device_specific_activation(self, device, reconciling):
"""
Perform an initial network operation to discover the device hardware
@@ -359,9 +362,7 @@
pon_id, onu_id, msg, is_omci = AdtranZmqClient.decode_packet(message)
if is_omci:
- proxy_address = Device.ProxyAddress(device_id=self.device_id,
- channel_id=self.get_channel_id(pon_id, onu_id),
- onu_id=onu_id)
+ proxy_address = self._pon_onu_id_to_proxy_address(pon_id, onu_id)
self.adapter_agent.receive_proxied_message(proxy_address, msg)
else:
@@ -489,17 +490,25 @@
msg = str(msg)
if self.zmq_client is not None:
- pon_id = self._channel_id_to_pon_id(proxy_address.channel_id, proxy_address.onu_id)
- onu_id = proxy_address.onu_id
+ pon_id, onu_id = self._proxy_address_to_pon_onu_id(proxy_address)
- data = AdtranZmqClient.encode_omci_message(msg, pon_id, onu_id)
+ pon = self.southbound_ports.get(pon_id)
- try:
- self.zmq_client.send(data)
+ if pon is not None and pon.enabled:
+ onu = pon.onu(onu_id)
- except Exception as e:
- self.log.exception('zmqClient.send', e=e)
- raise
+ if onu is not None and onu.enabled:
+ data = AdtranZmqClient.encode_omci_message(msg, pon_id, onu_id)
+
+ try:
+ self.zmq_client.send(data)
+
+ except Exception as e:
+ self.log.exception('zmqClient-send', pon_id=pon_id, onu_id=onu_id, e=e)
+ else:
+ self.log.debug('onu-invalid-or-disabled', pon_id=pon_id, onu_id=onu_id)
+ else:
+ self.log.debug('pon-invalid-or-disabled', pon_id=pon_id)
@staticmethod
def is_gpon_olt_hw(content):
@@ -518,7 +527,7 @@
from pon_port import PonPort
if ATT_NETWORK:
if FIXED_ONU:
- return onu_id + 2
+ return (onu_id * 120) + 2
return 1 + onu_id + (pon_id * 120)
if FIXED_ONU:
@@ -531,16 +540,33 @@
assert AdtranOltHandler.BASE_ONU_OFFSET > (self.num_northbound_ports + self.num_southbound_ports + 1)
return AdtranOltHandler.BASE_ONU_OFFSET + onu_id
- def _channel_id_to_pon_id(self, channel_id, onu_id):
- from pon_port import PonPort
- if ATT_NETWORK:
- if FIXED_ONU:
- return channel_id - onu_id - 2
- return (channel_id - onu_id - 1) / 120
+ def _pon_onu_id_to_proxy_address(self, pon_id, onu_id):
+ if pon_id in self.southbound_ports:
+ pon = self.southbound_ports[pon_id]
+ onu = pon.onu(onu_id)
+ proxy_address = onu.proxy_address if onu is not None else None
- if FIXED_ONU:
- return channel_id - self._onu_offset(onu_id)
- return (channel_id - self._onu_offset(onu_id)) / PonPort.MAX_ONUS_SUPPORTED
+ else:
+ proxy_address = None
+
+ return proxy_address
+
+ def _proxy_address_to_pon_onu_id(self, proxy_address):
+ """
+ Convert the proxy address to the PON-ID and ONU-ID
+ :param proxy_address: (ProxyAddress)
+ :return: (tuple) pon-id, onu-id
+ """
+ onu_id = proxy_address.onu_id
+
+ if self.autoactivate:
+ # Legacy method
+ pon_id = proxy_address.channel_group_id
+ else:
+ # xPON method
+ pon_id = self._port_number_to_pon_id(proxy_address.channel_id)
+
+ return pon_id, onu_id
def _pon_id_to_port_number(self, pon_id):
return pon_id + 1 + self.num_northbound_ports
@@ -637,6 +663,30 @@
return self._v_enets
return None
+ @property
+ def channel_terminations(self):
+ return self._channel_terminations
+
+ @property
+ def channel_pairs(self):
+ return self._channel_pairs
+
+ @property
+ def channel_partitions(self):
+ return self._channel_partitions
+
+ @property
+ def v_ont_anis(self):
+ return self._v_ont_anis
+
+ @property
+ def v_enets(self):
+ return self._v_enets
+
+ @property
+ def tconts(self):
+ return self._tconts
+
def _data_to_dict(self, data):
name = data.name
interface = data.interface
@@ -707,7 +757,8 @@
'expected-serial-number': inst_data.expected_serial_number,
'preferred-channel-pair': inst_data.preferred_chanpair,
'channel-partition': inst_data.parent_ref,
- 'upstream-channel-speed': inst_data.upstream_channel_speed
+ 'upstream-channel-speed': inst_data.upstream_channel_speed,
+ 'data': data
}
elif isinstance(data, VEnetConfig):
@@ -728,24 +779,21 @@
self.log.debug('create-interface', interface=data.interface, inst_data=data.data)
name = data.name
- interface = data.interface
- inst_data = data.data
-
items = self._get_xpon_collection(data)
if items is not None and name not in items:
self._cached_xpon_pon_info = {} # Clear cached data
item_type, new_item = self._data_to_dict(data)
- self.log.debug('new-item', item_type=item_type, item=new_item)
+ #self.log.debug('new-item', item_type=item_type, item=new_item)
- if name in items:
- raise KeyError("{} '{}' already exists".format(item_type, name))
+ if name not in items:
+ self.log.debug('new-item', item_type=item_type, item=new_item)
- items[name] = new_item
+ items[name] = new_item
- if isinstance(data, ChannelterminationConfig):
- self._on_channel_termination_create(name)
+ if isinstance(data, ChannelterminationConfig):
+ self._on_channel_termination_create(name)
def update_interface(self, data):
"""
@@ -753,9 +801,6 @@
:param data: (xpon config info)
"""
name = data.name
- interface = data.interface
- inst_data = data.data
-
items = self._get_xpon_collection(data)
if items is None:
@@ -768,8 +813,8 @@
item_type, update_item = self._data_to_dict(data)
self.log.debug('update-item', item_type=item_type, item=update_item)
- # TODO: Calculate the difference
- diffs = {}
+ # Calculate the difference
+ diffs = AdtranDeviceHandler._dict_diff(existing_item, update_item)
if len(diffs) == 0:
self.log.debug('update-item-no-diffs')
@@ -778,25 +823,25 @@
# Act on changed items
if isinstance(data, ChannelgroupConfig):
- pass
+ self._on_channel_group_modify(name, items, diffs)
elif isinstance(data, ChannelpartitionConfig):
- pass
+ self._on_channel_partition_modify(name, items, diffs)
elif isinstance(data, ChannelpairConfig):
- pass
+ self._on_channel_pair_modify(name, items, diffs)
elif isinstance(data, ChannelterminationConfig):
- pass
+ self._on_channel_termination_modify(name, items, diffs)
elif isinstance(data, OntaniConfig):
- pass
+ raise NotImplementedError('TODO: not yet supported')
elif isinstance(data, VOntaniConfig):
- pass
+ raise NotImplementedError('TODO: not yet supported')
elif isinstance(data, VEnetConfig):
- pass
+ raise NotImplementedError('TODO: not yet supported')
else:
raise NotImplementedError('Unknown data type')
@@ -819,13 +864,13 @@
del items[name]
if isinstance(data, ChannelgroupConfig):
- pass
+ pass # Rely upon xPON logic to not allow delete of a referenced group
elif isinstance(data, ChannelpartitionConfig):
- pass
+ pass # Rely upon xPON logic to not allow delete of a referenced partition
elif isinstance(data, ChannelpairConfig):
- pass
+ pass # Rely upon xPON logic to not allow delete of a referenced pair
elif isinstance(data, ChannelterminationConfig):
self._on_channel_termination_delete(name)
@@ -844,6 +889,63 @@
raise NotImplementedError('TODO: not yet supported')
+ def _valid_to_modify(self, item_type, valid, diffs):
+ bad_keys = [mod_key not in valid for mod_key in diffs]
+ if len(bad_keys) != 0:
+ self.log.warn("{} modification of '{}' not supported").format(item_type, bad_keys[0])
+ return False
+ return True
+
+ def _get_related_pons(self, item_type):
+
+ if isinstance(item_type, ChannelgroupConfig):
+ return [] # TODO: Implement
+
+ elif isinstance(item_type, ChannelpartitionConfig):
+ return [] # TODO: Implement
+
+ elif isinstance(item_type, ChannelpairConfig):
+ return [] # TODO: Implement
+
+ elif isinstance(item_type, ChannelterminationConfig):
+ return [] # TODO: Implement
+
+ else:
+ return []
+
+ def _on_channel_group_modify(self, name, items, diffs):
+ if len(diffs) == 0:
+ return
+
+ valid_keys = ['polling-period'] # Modify of these keys supported
+
+ if self._valid_to_modify('channel-group', valid_keys, diffs.keys()):
+ self.log.info('TODO: Not-Implemented-yet')
+ # for k, v in diffs.items:
+ # items[name][k] = v
+
+ def _on_channel_partition_modify(self, name, items, diffs):
+ if len(diffs) == 0:
+ return
+
+ valid_keys = ['fec-downstream', 'mcast-aes', 'differential-fiber-distance']
+
+ if self._valid_to_modify('channel-partition', valid_keys, diffs.keys()):
+ self.log.info('TODO: Not-Implemented-yet')
+ # for k, v in diffs.items:
+ # items[name][k] = v
+
+ def _on_channel_pair_modify(self, name, items, diffs):
+ if len(diffs) == 0:
+ return
+
+ valid_keys = ['line-rate'] # Modify of these keys supported
+
+ if self._valid_to_modify('channel-pair', valid_keys, diffs.keys()):
+ self.log.info('TODO: Not-Implemented-yet')
+ # for k, v in diffs.items:
+ # items[name][k] = v
+
def _on_channel_termination_create(self, name, pon_type='xgs-ponid'):
assert name in self._channel_terminations, \
'{} is not a channel-termination'.format(name)
@@ -877,7 +979,6 @@
# mcast_aes = cpart['mcast-aes']
# TODO: Support BER calculation period
- # TODO support FEC, and MCAST AES settings
# TODO Support setting of line rate
pon_port.xpon_name = name
@@ -885,10 +986,24 @@
pon_port.authentication_method = authentication_method
pon_port.deployment_range = deployment_range * 1000 # pon-agent uses meters
pon_port.downstream_fec_enable = downstream_fec
+ # TODO: For now, upstream FEC = downstream
+ pon_port.upstream_fec_enable = downstream_fec
+
# TODO: pon_port.mcast_aes = mcast_aes
pon_port.admin_state = AdminState.ENABLED if enabled else AdminState.DISABLED
+ def _on_channel_termination_modify(self, name, items, diffs):
+ if len(diffs) == 0:
+ return
+
+ valid_keys = ['enabled'] # Modify of these keys supported
+
+ if self._valid_to_modify('channel-termination', valid_keys, diffs.keys()):
+ self.log.info('TODO: Not-Implemented-yet')
+ # for k, v in diffs.items:
+ # items[name][k] = v
+
def _on_channel_termination_delete(self, name, pon_type='xgs-ponid'):
assert name in self._channel_terminations, \
'{} is not a channel-termination'.format(name)
@@ -900,7 +1015,23 @@
if pon_port is None:
raise ValueError('Unknown PON port. PON-ID: {}'.format(pon_id))
- pon_port.enabled = False
+ pon_port.admin_state = AdminState.DISABLED
+
+ def _on_ont_ani_create(self, name):
+ self.log.info('TODO: Not-Implemented-yet')
+ # elif isinstance(data, OntaniConfig):
+ # return 'ont-ani', {
+ # 'name': name,
+ # 'enabled': interface.enabled,
+ # 'upstream-fec': inst_data.upstream_fec_indicator,
+ # 'mgnt-gemport-aes': inst_data.mgnt_gemport_aes_indicator
+ # }
+
+ def _on_ont_ani_delete(self, name):
+ self.log.info('TODO: Not-Implemented-yet')
+
+ def _on_ont_ani_modify(self, name, items, existing, update, diffs):
+ pass
def create_tcont(self, tcont_data, traffic_descriptor_data):
"""
@@ -908,19 +1039,23 @@
:param tcont_data:
:param traffic_descriptor_data:
"""
+ self.log.debug('create-tcont', tcont=tcont_data, td=traffic_descriptor_data)
+
traffic_descriptor = TrafficDescriptor.create(traffic_descriptor_data)
tcont = TCont.create(tcont_data, traffic_descriptor)
- if tcont.name in self._tconts:
- raise KeyError("TCONT '{}' already exists".format(tcont.name))
+ if tcont.name not in self._tconts:
+ self._cached_xpon_pon_info = {} # Clear cached data
+ self._tconts[tcont.name] = tcont
- if traffic_descriptor.name in self._traffic_descriptors:
- raise KeyError("Traffic Descriptor '{}' already exists".format(traffic_descriptor.name))
+ # Update any ONUs referenced
+ tcont.xpon_create(self)
- self._cached_xpon_pon_info = {} # Clear cached data
+ if traffic_descriptor.name not in self._traffic_descriptors:
+ self._traffic_descriptors[traffic_descriptor.name] = traffic_descriptor
- self._tconts[tcont.name] = tcont
- self._traffic_descriptors[traffic_descriptor.name] = traffic_descriptor
+ # Update any ONUs referenced
+ traffic_descriptor.xpon_create(self, tcont)
def update_tcont(self, tcont_data, traffic_descriptor_data):
"""
@@ -928,6 +1063,8 @@
:param tcont_data:
:param traffic_descriptor_data:
"""
+ self.log.debug('update-tcont', tcont=tcont_data, td=traffic_descriptor_data)
+
if tcont_data.name not in self._tconts:
raise KeyError("TCONT '{}' does not exists".format(tcont_data.name))
@@ -940,6 +1077,9 @@
traffic_descriptor = TrafficDescriptor.create(traffic_descriptor_data)
tcont = TCont.create(tcont_data, traffic_descriptor)
#
+ # Update any ONUs referenced
+ # tcont.xpon_update(self)
+ # traffic_descriptor.xpon_update(self, tcont)
pass
raise NotImplementedError('TODO: Not yet supported')
@@ -949,6 +1089,8 @@
:param tcont_data:
:param traffic_descriptor_data:
"""
+ self.log.debug('remove-tcont', tcont=tcont_data, td=traffic_descriptor_data)
+
tcont = self._tconts.get(tcont_data.name)
traffic_descriptor = self._traffic_descriptors.get(traffic_descriptor_data.name)
@@ -963,6 +1105,10 @@
del self._tconts[tcont_data.name]
self._cached_xpon_pon_info = {} # Clear cached data
+
+ # Update any ONUs referenced
+ # tcont.xpon_delete(self)
+
pass # Perform any needed operations
raise NotImplementedError('TODO: Not yet supported')
@@ -971,7 +1117,9 @@
Create GEM Port
:param data:
"""
- gem_port = GemPort.create(data)
+ self.log.debug('create-gemport', gem_port=data)
+
+ gem_port = GemPort.create(data, self)
if gem_port.name in self._gem_ports:
raise KeyError("GEM Port '{}' already exists".format(gem_port.name))
@@ -979,20 +1127,25 @@
self._cached_xpon_pon_info = {} # Clear cached data
self._gem_ports[gem_port.name] = gem_port
- # TODO: On GEM Port changes, may need to add ONU Flow(s)
+ # Update any ONUs referenced
+ gem_port.xpon_create(self)
def update_gemport(self, data):
"""
Update GEM Port
:param data:
"""
+ self.log.debug('update-gemport', gem_port=data)
+
if data.name not in self._gem_ports:
raise KeyError("GEM Port '{}' does not exists".format(data.name))
self._cached_xpon_pon_info = {} # Clear cached data
- gem_port = GemPort.create(data)
+ #gem_port = GemPort.create(data)
#
# TODO: On GEM Port changes, may need to add/delete/modify ONU Flow(s)
+ # Update any ONUs referenced
+ # gem_port.xpon_update(self)
pass
raise NotImplementedError('TODO: Not yet supported')
@@ -1001,6 +1154,8 @@
Delete GEM Port
:param data:
"""
+ self.log.debug('remove-gemport', gem_port=data.name)
+
gem_port = self._gem_ports.get(data.name)
if gem_port is not None:
@@ -1009,6 +1164,8 @@
self._cached_xpon_pon_info = {} # Clear cached data
#
# TODO: On GEM Port changes, may need to delete ONU Flow(s)
+ # Update any ONUs referenced
+ # gem_port.xpon_delete(self)
pass # Perform any needed operations
raise NotImplementedError('TODO: Not yet supported')
@@ -1018,6 +1175,7 @@
:data: multicast gemport data object
:return: None
"""
+ self.log.debug('create-mcast-gemport', gem_port=data)
#
#
#
@@ -1029,6 +1187,7 @@
:data: multicast gemport data object
:return: None
"""
+ self.log.debug('update-mcast-gemport', gem_port=data)
#
#
#
@@ -1040,6 +1199,7 @@
:data: multicast gemport data object
:return: None
"""
+ self.log.debug('delete-mcast-gemport', gem_port=data.name)
#
#
#
diff --git a/voltha/adapters/adtran_olt/codec/olt_config.py b/voltha/adapters/adtran_olt/codec/olt_config.py
index 5997682..88cfb0a 100644
--- a/voltha/adapters/adtran_olt/codec/olt_config.py
+++ b/voltha/adapters/adtran_olt/codec/olt_config.py
@@ -63,10 +63,12 @@
@staticmethod
def decode(pon_list):
pons = {}
- for pon_data in pon_list:
- pon = OltConfig.Pon(pon_data)
- assert pon.pon_id not in pons
- pons[pon.pon_id] = pon
+
+ if pon_list is not None:
+ for pon_data in pon_list:
+ pon = OltConfig.Pon(pon_data)
+ assert pon.pon_id not in pons
+ pons[pon.pon_id] = pon
return pons
@@ -119,10 +121,12 @@
@staticmethod
def decode(onu_list):
onus = {}
- for onu_data in onu_list:
- onu = OltConfig.Pon.Onu(onu_data)
- assert onu.onu_id not in onus
- onus[onu.onu_id] = onu
+
+ if onu_list is not None:
+ for onu_data in onu_list:
+ onu = OltConfig.Pon.Onu(onu_data)
+ assert onu.onu_id not in onus
+ onus[onu.onu_id] = onu
return onus
@@ -186,10 +190,12 @@
@staticmethod
def decode(tcont_container):
tconts = {}
- for tcont_data in tcont_container.get('t-cont', []):
- tcont = OltConfig.Pon.Onu.TCont(tcont_data)
- assert tcont.alloc_id not in tconts
- tconts[tcont.alloc_id] = tcont
+
+ if tcont_container is not None:
+ for tcont_data in tcont_container.get('t-cont', []):
+ tcont = OltConfig.Pon.Onu.TCont(tcont_data)
+ assert tcont.alloc_id not in tconts
+ tconts[tcont.alloc_id] = tcont
return tconts
@@ -286,10 +292,12 @@
@staticmethod
def decode(gem_port_container):
gem_ports = {}
- for gem_port_data in gem_port_container.get('gem-port', []):
- gem_port = OltConfig.Pon.Onu.GemPort(gem_port_data)
- assert gem_port.port_id not in gem_ports
- gem_ports[gem_port.port_id] = gem_port
+
+ if gem_port_container is not None:
+ for gem_port_data in gem_port_container.get('gem-port', []):
+ gem_port = OltConfig.Pon.Onu.GemPort(gem_port_data)
+ assert gem_port.port_id not in gem_ports
+ gem_ports[gem_port.port_id] = gem_port
return gem_ports
diff --git a/voltha/adapters/adtran_olt/flow/evc_map.py b/voltha/adapters/adtran_olt/flow/evc_map.py
index 5186ce4..752e150 100644
--- a/voltha/adapters/adtran_olt/flow/evc_map.py
+++ b/voltha/adapters/adtran_olt/flow/evc_map.py
@@ -153,8 +153,11 @@
@property
def _needs_acl_support(self):
+ if self._ipv4_dst is not None: # In case MCAST downstream has ACL on it
+ return False
+
return self._eth_type is not None or self._ip_protocol is not None or\
- self._ipv4_dst is not None or self._udp_dst is not None or self._udp_src is not None
+ self._udp_dst is not None or self._udp_src is not None
@property
def pon_id(self):
diff --git a/voltha/adapters/adtran_olt/flow/flow_entry.py b/voltha/adapters/adtran_olt/flow/flow_entry.py
index 189c7d0..0fd42f7 100644
--- a/voltha/adapters/adtran_olt/flow/flow_entry.py
+++ b/voltha/adapters/adtran_olt/flow/flow_entry.py
@@ -297,6 +297,9 @@
"""
TODO: This is only while there is only a single downstream exception flow
"""
+ if self.ipv4_dst is not None: # In case MCAST downstream has ACL on it
+ return False
+
return self.eth_type is not None or self.ip_protocol is not None or\
self.ipv4_dst is not None or self.udp_dst is not None or self.udp_src is not None
diff --git a/voltha/adapters/adtran_olt/gem_port.py b/voltha/adapters/adtran_olt/gem_port.py
index d4b744c..70959d4 100644
--- a/voltha/adapters/adtran_olt/gem_port.py
+++ b/voltha/adapters/adtran_olt/gem_port.py
@@ -32,7 +32,8 @@
traffic_class=None,
intf_ref=None,
exception=False, # FIXED_ONU
- name=None):
+ name=None,
+ olt=None):
self.name = name
self.gem_id = gem_id
self._alloc_id = alloc_id
@@ -44,6 +45,7 @@
self._omci_transport = omci_transport
self.multicast = multicast
self.exception = exception # FIXED_ONU
+ self._olt = olt
def __str__(self):
return "GemPort: {}, alloc-id: {}, gem-id: {}".format(self.name,
@@ -51,7 +53,7 @@
self.gem_id)
@staticmethod
- def create(data):
+ def create(data, olt):
assert isinstance(data, GemportsConfigData)
return GemPort(data.gemport_id, None,
@@ -60,15 +62,17 @@
ident=data.id,
name=data.name,
traffic_class=data.traffic_class,
- intf_ref=data.itf_ref) # v_enet
+ intf_ref=data.itf_ref, # v_enet
+ olt=olt)
@property
def alloc_id(self):
- if self._alloc_id is None:
- #
- # TODO: Resolve this (needs to be OLT handler)
- #
- pass
+ if self._alloc_id is None and self._olt is not None:
+ try:
+ self._alloc_id = self._olt.tconts.get(self.tconf_ref).alloc_id
+ except Exception:
+ pass
+
return self._alloc_id
@property
@@ -104,3 +108,35 @@
uri = AdtranOltHandler.GPON_GEM_CONFIG_URI.format(pon_id, onu_id, self.gem_id)
name = 'gem-port-delete-{}-{}: {}'.format(pon_id, onu_id, self.gem_id)
return session.request('DELETE', uri, name=name)
+
+ def _get_onu(self, olt):
+ onu = None
+ try:
+ v_enet = olt.v_enets.get(self.intf_ref)
+ vont_ani = olt.v_ont_anis.get(v_enet['v-ont-ani'])
+ ch_pair = olt.channel_pairs.get(vont_ani['preferred-channel-pair'])
+ ch_term = next((term for term in olt.channel_terminations.itervalues()
+ if term['channel-pair'] == ch_pair['name']), None)
+
+ pon = olt.pon(ch_term['xgs-ponid'])
+ onu = pon.onu(vont_ani['onu-id'])
+
+ except Exception:
+ pass
+
+ return onu
+
+ def xpon_create(self, olt):
+ # Look up any associated ONU. May be None if pre-provisioning
+ onu = self._get_onu(olt)
+
+ if onu is not None:
+ onu.add_gem_port(self)
+
+ def xpon_update(self, olt):
+ # Look up any associated ONU. May be None if pre-provisioning
+ pass # TODO: Not yet supported
+
+ def xpon_delete(self, olt):
+ # Look up any associated ONU. May be None if pre-provisioning
+ pass # TODO: Not yet supported
diff --git a/voltha/adapters/adtran_olt/nni_port.py b/voltha/adapters/adtran_olt/nni_port.py
index e3c8a67..bea3fd0 100644
--- a/voltha/adapters/adtran_olt/nni_port.py
+++ b/voltha/adapters/adtran_olt/nni_port.py
@@ -211,6 +211,12 @@
self._oper_status = OperStatus.ACTIVE # TODO: is this correct, how do we tell GRPC
self._update_adapter_agent()
+ # TODO: Start status polling of NNI interfaces
+ self._deferred = None # = reactor.callLater(3, self.do_stuff)
+ self._state = NniPort.State.RUNNING
+ # Begin hardware sync
+ self._sync_deferred = reactor.callLater(self._sync_tick, self._sync_hardware)
+
try:
results = yield self.set_config('enabled', True)
@@ -219,12 +225,6 @@
self._admin_state = AdminState.UNKNOWN
raise
- # TODO: Start status polling of NNI interfaces
- self._deferred = None # = reactor.callLater(3, self.do_stuff)
- self._state = NniPort.State.RUNNING
- # Begin hardware sync
- self._sync_deferred = reactor.callLater(self._sync_tick, self._sync_hardware)
-
returnValue(self._deferred)
@inlineCallbacks
@@ -238,22 +238,21 @@
# NOTE: Leave all NNI ports active (may have inband management)
# TODO: Revisit leaving NNI Ports active on disable
- # Flush config cache
self._enabled = None
+ self._state = NniPort.State.STOPPED
self._admin_state = AdminState.DISABLED
self._oper_status = OperStatus.UNKNOWN
self._update_adapter_agent()
try:
- results = yield self.set_config('enabled', False)
+ yield self.set_config('enabled', False)
except Exception as e:
self.log.exception('nni-stop', e=e)
self._admin_state = AdminState.UNKNOWN
raise
- self._state = NniPort.State.STOPPED
returnValue(self._deferred)
def restart(self):
diff --git a/voltha/adapters/adtran_olt/onu.py b/voltha/adapters/adtran_olt/onu.py
index 8d43761..ec021c1 100644
--- a/voltha/adapters/adtran_olt/onu.py
+++ b/voltha/adapters/adtran_olt/onu.py
@@ -73,10 +73,12 @@
assert len(self._uni_ports) == 1, 'Only one UNI port supported at this time'
self._channel_id = onu_info['channel-id']
self._enabled = onu_info['enabled']
+ self._vont_ani = onu_info.get('vont-ani')
self._rssi = -9999
self._equalization_delay = 0
self._fiber_length = 0
self._valid = True # Set false during delete/cleanup
+ self._proxy_address = None
self._include_multicast = True # TODO: May need to add multicast on a per-ONU basis
@@ -128,6 +130,21 @@
return self._name
@property
+ def enabled(self):
+ return self._enabled
+
+ @enabled.setter
+ def enabled(self, value):
+ if self._enabled != value:
+ self._enabled = value
+ self.set_config('enable', self._enabled)
+
+ if self._enabled:
+ self.start()
+ else:
+ self.stop()
+
+ @property
def onu_vid(self):
return self._onu_vid
@@ -137,6 +154,54 @@
return self._uni_ports[0]
@property
+ def proxy_address(self):
+ if self._proxy_address is None:
+ from voltha.protos.device_pb2 import Device
+
+ device_id = self.olt.device_id
+
+ if self.olt.autoactivate:
+ self._proxy_address = Device.ProxyAddress(device_id=device_id,
+ channel_id=self.onu_vid,
+ channel_group_id=self.pon.pon_id,
+ onu_id=self.onu_id)
+ else:
+ try:
+ v_ont_ani = self._vont_ani
+ voltha_core = self.olt.adapter_agent.core
+ xpon_agent = voltha_core.xpon_agent
+ channel_group_id = xpon_agent.get_channel_group_for_vont_ani(v_ont_ani)
+ parent_chnl_pair_id = xpon_agent.get_port_num(device_id,
+ v_ont_ani.data.preferred_chanpair)
+ self._proxy_address = Device.ProxyAddress(
+ device_id=device_id,
+ channel_group_id=channel_group_id,
+ channel_id=parent_chnl_pair_id,
+ channel_termination=v_ont_ani.data.preferred_chanpair,
+ onu_id=self.onu_id,
+ onu_session_id=self.onu_id)
+ except Exception:
+ pass
+
+ return self._proxy_address
+
+ def _get_v_ont_ani(self, olt):
+ onu = None
+ try:
+ vont_ani = olt.v_ont_anis.get(self.vont_ani)
+ ch_pair = olt.channel_pairs.get(vont_ani['preferred-channel-pair'])
+ ch_term = next((term for term in olt.channel_terminations.itervalues()
+ if term['channel-pair'] == ch_pair['name']), None)
+
+ pon = olt.pon(ch_term['xgs-ponid'])
+ onu = pon.onu(vont_ani['onu-id'])
+
+ except Exception:
+ pass
+
+ return onu
+
+ @property
def channel_id(self):
return self._channel_id
@@ -198,7 +263,7 @@
:param reflow: (boolean) Flag, if True, indicating if this is a reflow ONU
information after an unmanaged OLT hardware reboot
"""
- self.log.debug('create')
+ self.log.debug('create', tconts=tconts, gem_ports=gem_ports, reflow=reflow)
self._cancel_deferred()
data = json.dumps({'onu-id': self._onu_id,
@@ -209,31 +274,34 @@
self._serial_number_base64, self._enabled)
try:
- results = yield self.olt.rest_client.request('POST', uri, data=data, name=name)
+ yield self.olt.rest_client.request('POST', uri, data=data, name=name)
except Exception as e: # TODO: Add breakpoint here during unexpected reboot test
self.log.exception('onu-create', e=e)
raise
# Now set up all tconts & gem-ports
+ first_sync = self._sync_tick
for _, tcont in tconts.items():
try:
- results = yield self.add_tcont(tcont, reflow=reflow)
+ yield self.add_tcont(tcont, reflow=reflow)
except Exception as e:
self.log.exception('add-tcont', tcont=tcont, e=e)
+ first_sync = 2 # Expedite first hw-sync
for _, gem_port in gem_ports.items():
try:
- results = yield self.add_gem_port(gem_port, reflow=reflow)
+ yield self.add_gem_port(gem_port, reflow=reflow)
except Exception as e:
self.log.exception('add-gem-port', gem_port=gem_port, reflow=reflow, e=e)
+ first_sync = 2 # Expedite first hw-sync
- self._sync_deferred = reactor.callLater(self._sync_tick, self._sync_hardware)
+ self._sync_deferred = reactor.callLater(first_sync, self._sync_hardware)
- returnValue(results)
+ returnValue('created')
@inlineCallbacks
def delete(self):
@@ -274,6 +342,14 @@
returnValue(succeed('deleted'))
+ def start(self):
+ self._cancel_deferred()
+ self._sync_deferred = reactor.callLater(0, self._sync_hardware)
+
+ def stop(self):
+ self._cancel_deferred()
+ self._sync_deferred = reactor.callLater(0, self._sync_hardware)
+
def restart(self):
if not self._valid:
return succeed('Deleting')
@@ -283,7 +359,7 @@
def _sync_hardware(self):
from codec.olt_config import OltConfig
-
+ self.log.debug('sync-hardware')
def read_config(results):
self.log.debug('read-config', results=results)
@@ -293,10 +369,10 @@
dl = []
if self._enabled != config.enable:
- dl.append(self._set_config('enable', self._enabled))
+ dl.append(self.set_config('enable', self._enabled))
if self.serial_number != config.serial_number:
- dl.append(self._set_config('serial-number', self.serial_number))
+ dl.append(self.set_config('serial-number', self.serial_number))
# Sync TCONTs if everything else in sync
@@ -426,7 +502,7 @@
def reschedule(_):
import random
- delay = self._sync_tick
+ delay = self._sync_tick if self._enabled else 5 * self._sync_tick
# Speed up sequential resync a limited number of times if out of sync
# With 60 second initial an typical worst case resync of 4 times, this
@@ -443,8 +519,10 @@
self._sync_deferred = reactor.callLater(delay, self._sync_hardware)
self._expedite_sync = False
- pon_enabled = self.pon.enabled
- if not pon_enabled:
+ # If PON is not enabled, skip hw-sync. If ONU not enabled, do it but less
+ # frequently
+
+ if not self.pon.enabled:
return reschedule('not-enabled')
self._sync_deferred = self._get_config()
@@ -486,18 +564,39 @@
if not reflow and tcont.alloc_id in self._tconts:
returnValue(succeed('already created'))
+ self._tconts[tcont.alloc_id] = tcont
+
try:
results = yield tcont.add_to_hardware(self.olt.rest_client,
self._pon_id, self._onu_id)
- self._tconts[tcont.alloc_id] = tcont
except Exception as e:
self.log.exception('tcont', tcont=tcont, reflow=reflow, e=e)
- raise
+ # May occur with xPON provisioning, use hw-resync to recover
+ results = 'resync needed'
returnValue(results)
@inlineCallbacks
+ def update_tcont(self, alloc_id, new_values):
+ # TODO: If alloc-id in use by a gemport, should we deny request?
+ tcont = self._tconts.get(alloc_id)
+
+ if tcont is None:
+ returnValue(succeed('not-found'))
+
+ # del self._tconts[alloc_id]
+ #
+ # try:
+ # results = yield tcont.remove_from_hardware()
+ #
+ # except Exception as e:
+ # self.log.exception('delete', e=e)
+ # raise
+
+ returnValue(succeed('TODO: Not implemented yet'))
+
+ @inlineCallbacks
def remove_tcont(self, alloc_id):
# TODO: If alloc-id in use by a gemport, should we deny request?
tcont = self._tconts.get(alloc_id)
@@ -539,24 +638,27 @@
returnValue(succeed('Deleting'))
if not reflow and gem_port.gem_id in self._gem_ports:
- returnValue(succeed('already created'))
+ returnValue(succeed)
+
+ self._gem_ports[gem_port.gem_id] = gem_port
try:
results = yield gem_port.add_to_hardware(self.olt.rest_client,
self._pon_id,
self.onu_id)
- self._gem_ports[gem_port.gem_id] = gem_port
# May need to update flow tables/evc-maps
if gem_port.alloc_id in self._tconts:
# GEM-IDs are a sorted list (ascending). First gemport handles downstream traffic
- from flow.flow_entry import FlowEntry
- evc_maps = FlowEntry.find_evc_map_flows(self._device_id, self._pon_id, self._onu_id)
+ # from flow.flow_entry import FlowEntry
+ # evc_maps = FlowEntry.find_evc_map_flows(self._device_id, self._pon_id, self._onu_id)
pass # TODO: Start here Tuesday
except Exception as e:
self.log.exception('gem-port', gem_port=gem_port, reflow=reflow, e=e)
- raise
+ # This can happen with xPON if the ONU has been provisioned, but the PON Discovery
+ # has not occurred for the ONU. Rely on hw sync to recover
+ results = 'resync needed'
returnValue(results)
diff --git a/voltha/adapters/adtran_olt/pon_port.py b/voltha/adapters/adtran_olt/pon_port.py
index 0505057..87fb93a 100644
--- a/voltha/adapters/adtran_olt/pon_port.py
+++ b/voltha/adapters/adtran_olt/pon_port.py
@@ -151,6 +151,9 @@
def olt(self):
return self._parent
+ def onu(self, onu_id):
+ return self._onu_by_id.get(onu_id)
+
@property
def state(self):
return self._state
@@ -301,7 +304,6 @@
def start(self):
"""
Start/enable this PON and start ONU discover
- :return: (deferred)
"""
if self._state == PonPort.State.RUNNING:
return succeed('Running')
@@ -421,16 +423,17 @@
self._cancel_deferred()
self._enabled = False
- results = yield self._set_pon_config("enabled", False)
- self._sync_deferred = reactor.callLater(self._sync_tick, self._sync_hardware)
-
self._admin_state = AdminState.DISABLED
self._oper_status = OperStatus.UNKNOWN
self._update_adapter_agent()
self._state = PonPort.State.STOPPED
+
+ results = yield self._set_pon_config("enabled", False)
+ self._sync_deferred = reactor.callLater(self._sync_tick, self._sync_hardware)
+
self.log.debug('stopped')
- returnValue(results)
+ returnValue(succeed(results))
@inlineCallbacks
def reset(self):
@@ -729,9 +732,6 @@
self._active_los_alarms.add(onu_id)
los_alarm(True, onu_id)
- # TODO: A method to update the AdapterAgent's child device state (operStatus)
- # would be useful here
-
def _process_status_onu_discovered_list(self, discovered_onus):
"""
Look for new ONUs
@@ -770,6 +770,7 @@
channel_speed = 0
tconts = get_tconts(serial_number, onu_id)
gem_ports = get_gem_ports(serial_number, onu_id)
+ vont_ani = None
elif self.activation_method == "autodiscovery":
if self.authentication_method == 'serial-number':
@@ -779,6 +780,7 @@
# TODO: Change iteration to itervalues below
vont_info = next(info for _, info in gpon_info['v-ont-anis'].items()
if info.get('expected-serial-number') == serial_number)
+ vont_ani = vont_info['data']
onu_id = vont_info['onu-id']
enabled = vont_info['enabled']
@@ -792,7 +794,7 @@
if val.tconf_ref in tcont_names}
except StopIteration:
- return None # Can happen if vont-ani has not yet been configured
+ return None # Can happen if vont-ani/serial-number has not yet been configured
else:
return None
else:
@@ -810,7 +812,8 @@
't-conts': tconts,
'gem-ports': gem_ports,
'onu-vid': self.olt.get_channel_id(self._pon_id, onu_id),
- 'channel-id': self.olt.get_channel_id(self._pon_id, onu_id)
+ 'channel-id': self.olt.get_channel_id(self._pon_id, onu_id),
+ 'vont-ani': vont_ani
}
# Hold off ONU activation until at least one GEM Port is defined.
@@ -867,7 +870,9 @@
self.add_mcast_gem_port(gem_port, -id_or_vid)
yield onu.create(tconts, gem_ports, reflow=reflow)
- if not reflow:
+
+ # If autoactivate (demo) mode and not reflow, activate the ONU
+ if self.olt.autoactivate and not reflow:
self.activate_onu(onu)
except Exception as e:
@@ -880,21 +885,18 @@
def activate_onu(self, onu):
"""
Called when a new ONU is discovered and VOLTHA device adapter needs to be informed
- :param onu:
- :return:
+ :param onu:
"""
- # Only call older 'child_device_detected' if not using xPON to configure the system
+ if self.olt.autoactivate:
+ self.log.info('activate-onu', onu=onu)
- if self.activation_method == "autoactivate":
olt = self.olt
adapter = self.adapter_agent
channel_id = onu.onu_vid
- proxy = Device.ProxyAddress(device_id=olt.device_id,
- channel_id=channel_id,
- onu_id=onu.onu_id,
- onu_session_id=onu.onu_id)
+ proxy = onu.proxy_address
+ # NOTE: The following method will be deprecated. Use xPON
adapter.child_device_detected(parent_device_id=olt.device_id,
parent_port_no=self._port_no,
child_device_type=onu.vendor_id,
@@ -936,25 +938,17 @@
if onu is not None:
# Clean up adapter agent of this ONU
- proxy = Device.ProxyAddress(device_id=self.olt.device_id,
- channel_id=onu.channel_id)
- onu_device = self.olt.adapter_agent.get_child_device_with_proxy_address(proxy)
+ proxy = onu.proxy_address
- if onu_device is not None:
- self.olt.adapter_agent.delete_child_device(self.olt.device_id,
- onu_device.device_id)
+ if proxy is not None:
+ onu_device = self.olt.adapter_agent.get_child_device_with_proxy_address(proxy)
+ if onu_device is not None:
+ self.olt.adapter_agent.delete_child_device(self.olt.device_id,
+ onu_device.device_id)
self.olt.adapter_agent.update_child_devices_state(self.olt.device_id,
admin_state=AdminState.DISABLED)
- def delete_child_device(self, parent_device_id, child_device_id):
- onu_device = self.root_proxy.get('/devices/{}'.format(child_device_id))
- if onu_device is not None:
- if onu_device.parent_id == parent_device_id:
- self.log.debug('deleting-child-device', parent_device_id=parent_device_id,
- child_device_id=child_device_id)
- self._remove_node('/devices', child_device_id)
-
def add_mcast_gem_port(self, mcast_gem, vlan):
"""
Add any new Multicast GEM Ports to the PON
diff --git a/voltha/adapters/adtran_olt/tcont.py b/voltha/adapters/adtran_olt/tcont.py
index 29361a3..946946d 100644
--- a/voltha/adapters/adtran_olt/tcont.py
+++ b/voltha/adapters/adtran_olt/tcont.py
@@ -81,6 +81,43 @@
name = 'tcont-delete-{}-{}: {}'.format(pon_id, onu_id, self.alloc_id)
return succeed(session.request('DELETE', uri, name=name))
+ def _get_onu(self, olt):
+ onu = None
+ try:
+ vont_ani = olt.v_ont_anis.get(self.vont_ani)
+ ch_pair = olt.channel_pairs.get(vont_ani['preferred-channel-pair'])
+ ch_term = next((term for term in olt.channel_terminations.itervalues()
+ if term['channel-pair'] == ch_pair['name']), None)
+
+ pon = olt.pon(ch_term['xgs-ponid'])
+ onu = pon.onu(vont_ani['onu-id'])
+
+ except Exception:
+ pass
+
+ return onu
+
+ def xpon_create(self, olt):
+ # Look up any associated ONU. May be None if pre-provisioning
+ onu = self._get_onu(olt)
+
+ if onu is not None:
+ onu.add_tcont(self)
+
+ def xpon_update(self, olt):
+ # Look up any associated ONU. May be None if pre-provisioning
+ onu = self._get_onu(olt)
+
+ if onu is not None:
+ pass # TODO: Not yet supported
+
+ def xpon_delete(self, olt):
+ # Look up any associated ONU. May be None if pre-provisioning
+ onu = self._get_onu(olt)
+
+ if onu is not None:
+ onu.remove_tcont(self.alloc_id)
+
class TrafficDescriptor(object):
"""
@@ -194,6 +231,14 @@
returnValue(results)
+ def xpon_create(self, olt, tcont):
+ # Look up any associated ONU. May be None if pre-provisioning
+ pass # TODO
+
+ def xpon_update(self, olt, tcont):
+ # Look up any associated ONU. May be None if pre-provisioning
+ pass # TODO: Not yet supported
+
class BestEffort(object):
def __init__(self, bandwidth, priority, weight):
diff --git a/voltha/adapters/adtran_onu/adtran_onu.py b/voltha/adapters/adtran_onu/adtran_onu.py
index 3e6e794..8103d53 100644
--- a/voltha/adapters/adtran_onu/adtran_onu.py
+++ b/voltha/adapters/adtran_onu/adtran_onu.py
@@ -21,189 +21,40 @@
from uuid import uuid4
from twisted.internet import reactor
from twisted.internet.defer import DeferredQueue, inlineCallbacks, returnValue
-from zope.interface import implementer
-from voltha.adapters.interface import IAdapterInterface
+from voltha.adapters.iadapter import OnuAdapter
from voltha.core.logical_device_agent import mac_str_to_tuple
from voltha.protos import third_party
-from voltha.protos.adapter_pb2 import Adapter
-from voltha.protos.adapter_pb2 import AdapterConfig
-from voltha.protos.common_pb2 import LogLevel, OperStatus, ConnectStatus, \
+from voltha.protos.common_pb2 import OperStatus, ConnectStatus, \
AdminState
-from voltha.protos.device_pb2 import DeviceType, DeviceTypes, Port, Image
+from voltha.protos.device_pb2 import DeviceTypes, Port, Image
from voltha.protos.health_pb2 import HealthStatus
from voltha.protos.logical_device_pb2 import LogicalPort
from voltha.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, OFPPF_10GB_FD
-from voltha.protos.openflow_13_pb2 import OFPXMC_OPENFLOW_BASIC, ofp_port
+from voltha.protos.openflow_13_pb2 import ofp_port
from common.frameio.frameio import hexify
from voltha.extensions.omci.omci import *
-from voltha.protos.bbf_fiber_base_pb2 import \
- ChannelgroupConfig, ChannelpartitionConfig, ChannelpairConfig, ChannelterminationConfig, \
- OntaniConfig, VOntaniConfig, VEnetConfig
+from voltha.protos.bbf_fiber_base_pb2 import OntaniConfig, VOntaniConfig, VEnetConfig
+from voltha.adapters.adtran_olt.tcont import TCont, TrafficDescriptor, BestEffort
+from voltha.adapters.adtran_olt.gem_port import GemPort
_ = third_party
-log = structlog.get_logger()
+
+_MAX_INCOMING_OMCI_MESSAGES = 10
+_OMCI_TIMEOUT = 10
+_STARTUP_RETRY_WAIT = 5
-@implementer(IAdapterInterface)
-class AdtranOnuAdapter(object):
- name = 'adtran_onu'
- version = '0.1'
-
- supported_device_types = [
- DeviceType(
- id=name,
- vendor_id='ADTN',
- adapter=name,
- accepts_bulk_flow_update=True
- )
- ]
-
+class AdtranOnuAdapter(OnuAdapter):
def __init__(self, adapter_agent, config):
- self.adapter_agent = adapter_agent
- self.config = config
- self.descriptor = Adapter(
- id=self.name,
- vendor='Adtran, Inc.',
- version=self.version,
- config=AdapterConfig(log_level=LogLevel.INFO)
- )
- self.devices_handlers = dict() # device_id -> AdtranOnuHandler()
-
- def start(self):
- log.debug('starting')
- log.info('started')
-
- def stop(self):
- log.debug('stopping')
- log.info('stopped')
-
- def adapter_descriptor(self):
- return self.descriptor
-
- def device_types(self):
- return DeviceTypes(items=self.supported_device_types)
-
- def health(self):
- return HealthStatus(state=HealthStatus.HealthState.HEALTHY)
-
- def change_master_state(self, master):
- raise NotImplementedError()
-
- def adopt_device(self, device):
- log.info('adopt_device', device_id=device.id)
- self.devices_handlers[device.proxy_address.channel_id] = AdtranOnuHandler(self, device.id)
- reactor.callLater(0, self.devices_handlers[device.proxy_address.channel_id].activate, device)
- return device
-
- def reconcile_device(self, device):
- raise NotImplementedError()
-
- def abandon_device(self, device):
- raise NotImplementedError()
-
- def disable_device(self, device):
- raise NotImplementedError()
-
- def reenable_device(self, device):
- raise NotImplementedError()
-
- def reboot_device(self, device):
- raise NotImplementedError()
-
- def download_image(self, device, request):
- raise NotImplementedError()
-
- def get_image_download_status(self, device, request):
- raise NotImplementedError()
-
- def cancel_image_download(self, device, request):
- raise NotImplementedError()
-
- def activate_image_update(self, device, request):
- raise NotImplementedError()
-
- def revert_image_update(self, device, request):
- raise NotImplementedError()
-
- def self_test_device(self, device):
- raise NotImplementedError()
-
- def delete_device(self, device):
- raise NotImplementedError()
-
- def get_device_details(self, device):
- raise NotImplementedError()
-
- def update_pm_config(self, device, pm_configs):
- raise NotImplementedError()
-
- def update_flows_bulk(self, device, flows, groups):
- log.info('bulk-flow-update', device_id=device.id,
- flows=flows, groups=groups)
- assert len(groups.items) == 0
- handler = self.devices_handlers[device.proxy_address.channel_id]
- return handler.update_flow_table(device, flows.items)
-
- def update_flows_incrementally(self, device, flow_changes, group_changes):
- raise NotImplementedError()
-
- def send_proxied_message(self, proxy_address, msg):
- log.info('send-proxied-message', proxy_address=proxy_address, msg=msg)
-
- def receive_proxied_message(self, proxy_address, msg):
- log.info('receive-proxied-message', proxy_address=proxy_address,
- device_id=proxy_address.device_id, msg=hexify(msg))
- handler = self.devices_handlers[proxy_address.channel_id]
- handler.receive_message(msg)
-
- def receive_packet_out(self, logical_device_id, egress_port_no, msg):
- log.info('packet-out', logical_device_id=logical_device_id,
- egress_port_no=egress_port_no, msg_len=len(msg))
- raise NotImplementedError()
-
- def receive_inter_adapter_message(self, msg):
- log.info('rx_inter_adapter_msg')
- raise NotImplementedError()
-
- def suppress_alarm(self, filter):
- log.info('suppress_alarm', filter=filter)
- raise NotImplementedError()
-
- def unsuppress_alarm(self, filter):
- log.info('unsuppress_alarm', filter=filter)
- raise NotImplementedError()
-
- def receive_onu_detect_state(self, device_id, state):
- """
- Receive onu detect state in ONU adapter
- :param proxy_address: ONU device address
- :param state: ONU detect state (bool)
- :return: None
- """
- raise NotImplementedError()
-
- # PON Mgnt APIs #
- def create_interface(self, device, data):
- """
- API to create various interfaces (only some PON interfaces as of now)
- in the devices
- """
- raise NotImplementedError()
-
- def update_interface(self, device, data):
- """
- API to update various interfaces (only some PON interfaces as of now)
- in the devices
- """
- raise NotImplementedError()
-
- def remove_interface(self, device, data):
- """
- API to delete various interfaces (only some PON interfaces as of now)
- in the devices
- """
- raise NotImplementedError()
+ super(AdtranOnuAdapter, self).__init__(adapter_agent=adapter_agent,
+ config=config,
+ device_handler_class=AdtranOnuHandler,
+ name='adtran_onu',
+ vendor='Adtran, Inc.',
+ version='0.2',
+ device_type='adtran_onu',
+ vendor_id='ADTN')
def create_tcont(self, device, tcont_data, traffic_descriptor_data):
"""
@@ -213,9 +64,12 @@
:traffic_descriptor_data: traffic descriptor data object
:return: None
"""
- log.info('create-tcont', tcont_data=tcont_data,
- traffic_descriptor_data=traffic_descriptor_data)
- raise NotImplementedError()
+ self.log.info('create-tcont', tcont_data=tcont_data,
+ traffic_descriptor_data=traffic_descriptor_data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.create_tcont(tcont_data, traffic_descriptor_data)
def update_tcont(self, device, tcont_data, traffic_descriptor_data):
"""
@@ -225,9 +79,12 @@
:traffic_descriptor_data: traffic descriptor data object
:return: None
"""
- log.info('update-tcont', tcont_data=tcont_data,
- traffic_descriptor_data=traffic_descriptor_data)
- raise NotImplementedError()
+ self.log.info('update-tcont', tcont_data=tcont_data,
+ traffic_descriptor_data=traffic_descriptor_data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.update_tcont(tcont_data, traffic_descriptor_data)
def remove_tcont(self, device, tcont_data, traffic_descriptor_data):
"""
@@ -237,9 +94,12 @@
:traffic_descriptor_data: traffic descriptor data object
:return: None
"""
- log.info('remove-tcont', tcont_data=tcont_data,
- traffic_descriptor_data=traffic_descriptor_data)
- raise NotImplementedError()
+ self.log.info('remove-tcont', tcont_data=tcont_data,
+ traffic_descriptor_data=traffic_descriptor_data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.remove_tcont(tcont_data, traffic_descriptor_data)
def create_gemport(self, device, data):
"""
@@ -248,8 +108,11 @@
:data: gemport data object
:return: None
"""
- log.info('create-gemport', data=data)
- raise NotImplementedError()
+ self.log.info('create-gemport', data=data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.create_gemport(data)
def update_gemport(self, device, data):
"""
@@ -258,8 +121,11 @@
:data: gemport data object
:return: None
"""
- log.info('update-gemport', data=data)
- raise NotImplementedError()
+ self.log.info('update-gemport', data=data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.update_gemport(data)
def remove_gemport(self, device, data):
"""
@@ -268,26 +134,92 @@
:data: gemport data object
:return: None
"""
- log.info('remove-gemport', data=data)
- raise NotImplementedError()
+ self.log.info('remove-gemport', data=data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.remove_gemport(data)
def create_multicast_gemport(self, device, data):
- raise NotImplementedError()
+ """
+ API to create multicast gemport object in the devices
+ :param device: device id
+ :data: multicast gemport data object
+ :return: None
+ """
+ self.log.info('create-mcast-gemport', data=data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.create_multicast_gemport(data)
def update_multicast_gemport(self, device, data):
- raise NotImplementedError()
+ """
+ API to update multicast gemport object in the devices
+ :param device: device id
+ :data: multicast gemport data object
+ :return: None
+ """
+ self.log.info('update-mcast-gemport', data=data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.update_multicast_gemport(data)
def remove_multicast_gemport(self, device, data):
- raise NotImplementedError()
+ """
+ API to delete multicast gemport object in the devices
+ :param device: device id
+ :data: multicast gemport data object
+ :return: None
+ """
+ self.log.info('remove-mcast-gemport', data=data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.remove_multicast_gemport(data)
def create_multicast_distribution_set(self, device, data):
- raise NotImplementedError()
+ """
+ API to create multicast distribution rule to specify
+ the multicast VLANs that ride on the multicast gemport
+ :param device: device id
+ :data: multicast distribution data object
+ :return: None
+ """
+ self.log.info('create-mcast-distribution-set', data=data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.create_multicast_distribution_set(data)
def update_multicast_distribution_set(self, device, data):
- raise NotImplementedError()
+ """
+ API to update multicast distribution rule to specify
+ the multicast VLANs that ride on the multicast gemport
+ :param device: device id
+ :data: multicast distribution data object
+ :return: None
+ """
+ self.log.info('update-mcast-distribution-set', data=data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.create_multicast_distribution_set(data)
def remove_multicast_distribution_set(self, device, data):
- raise NotImplementedError()
+ """
+ API to delete multicast distribution rule to specify
+ the multicast VLANs that ride on the multicast gemport
+ :param device: device id
+ :data: multicast distribution data object
+ :return: None
+ """
+ self.log.info('remove-mcast-distribution-set', data=data)
+ if device.id in self.devices_handlers:
+ handler = self.devices_handlers[device.id]
+ if handler is not None:
+ handler.create_multicast_distribution_set(data)
class AdtranOnuHandler(object):
@@ -295,21 +227,40 @@
self.adapter = adapter
self.adapter_agent = adapter.adapter_agent
self.device_id = device_id
+ self.logical_device_id = None
self.log = structlog.get_logger(device_id=device_id)
- self.incoming_messages = DeferredQueue()
+ self.incoming_messages = DeferredQueue(size=_MAX_INCOMING_OMCI_MESSAGES)
self.proxy_address = None
self.tx_id = 0
self.last_response = None
+ self.ofp_port_no = None
+ self.control_vlan = None
+ # reference of uni_port is required when re-enabling the device if
+ # it was disabled previously
+ self.uni_port = None
+ self.pon_port = None
+ self._v_ont_anis = {} # Name -> dict
+ self._ont_anis = {} # Name -> dict
+ self._v_enets = {} # Name -> dict
+ self._tconts = {} # Name -> dict
+ self._traffic_descriptors = {} # Name -> dict
+ self._gem_ports = {} # Name -> dict
+ self._deferred = None
def receive_message(self, msg):
- self.incoming_messages.put(msg)
+ try:
+ self.incoming_messages.put(msg)
+
+ except Exception as e:
+ self.log.exception('rx-msg', e=e)
def activate(self, device):
self.log.info('activating')
# first we verify that we got parent reference and proxy info
- assert device.parent_id
- assert device.proxy_address.device_id
+ assert device.parent_id, 'Invalid Parent ID'
+ assert device.proxy_address.device_id, 'Invalid Device ID'
+ assert device.proxy_address.channel_id, 'invalid Channel ID'
# register for proxied messages right away
self.proxy_address = device.proxy_address
@@ -319,74 +270,126 @@
device.root = True
device.vendor = 'Adtran Inc.'
device.model = '10G GPON ONU' # TODO: get actual number
+ device.model = '10G GPON ONU' # TODO: get actual number
device.hardware_version = 'NOT AVAILABLE'
device.firmware_version = 'NOT AVAILABLE'
+
# TODO: Support more versions as needed
images = Image(version='NOT AVAILABLE')
device.images.image.extend([images])
- device.serial_number = uuid4().hex
- device.connect_status = ConnectStatus.REACHABLE
+ device.connect_status = ConnectStatus.UNKNOWN
self.adapter_agent.update_device(device)
# register physical ports
- nni_port = Port(port_no=1,
- label='PON port',
- type=Port.PON_ONU,
- admin_state=AdminState.ENABLED,
- oper_status=OperStatus.ACTIVE,
- peers=[Port.PeerPort(device_id=device.parent_id,
- port_no=device.parent_port_no)])
+ self.pon_port = Port(port_no=1,
+ label='PON port',
+ type=Port.PON_ONU,
+ admin_state=AdminState.ENABLED,
+ oper_status=OperStatus.ACTIVE,
+ peers=[Port.PeerPort(device_id=device.parent_id,
+ port_no=device.parent_port_no)])
- self.adapter_agent.add_port(device.id, nni_port)
+ self.uni_port = Port(port_no=2,
+ label='Ethernet port',
+ type=Port.ETHERNET_UNI,
+ admin_state=AdminState.ENABLED,
+ oper_status=OperStatus.ACTIVE)
- uni_port = Port(port_no=2,
- label='Ethernet port',
- type=Port.ETHERNET_UNI,
- admin_state=AdminState.ENABLED,
- oper_status=OperStatus.ACTIVE)
-
- self.adapter_agent.add_port(device.id, uni_port)
+ self.adapter_agent.add_port(device.id, self.uni_port)
+ self.adapter_agent.add_port(device.id, self.pon_port)
# add uni port to logical device
parent_device = self.adapter_agent.get_device(device.parent_id)
- logical_device_id = parent_device.parent_id
- assert logical_device_id
+ self.logical_device_id = parent_device.parent_id
+ assert self.logical_device_id, 'Invalid logical device ID'
- port_no = device.proxy_address.channel_id
+ if device.vlan:
+ # vlan non-zero if created via legacy method (not xPON). Also
+ # Set a random serial number since not xPON based
- log.info('ONU OPENFLOW PORT WILL BE {}'.format(port_no))
-
- cap = OFPPF_10GB_FD | OFPPF_FIBER
- self.adapter_agent.add_logical_port(logical_device_id, LogicalPort(
- id='uni-{}'.format(port_no),
- ofp_port=ofp_port(
- port_no=port_no,
- hw_addr=mac_str_to_tuple('08:00:%02x:%02x:%02x:%02x' %
- ((device.parent_port_no >> 8 & 0xff),
- device.parent_port_no & 0xff,
- (port_no >> 8) & 0xff,
- port_no & 0xff)),
- name='uni-{}'.format(port_no),
- config=0,
- state=OFPPS_LIVE,
- curr=cap,
- advertised=cap,
- peer=cap,
- curr_speed=OFPPF_10GB_FD,
- max_speed=OFPPF_10GB_FD
- ),
- device_id=device.id,
- device_port_no=uni_port.port_no
- ))
+ device.serial_number = uuid4().hex
+ self._add_logical_port(device.vlan, control_vlan=device.vlan)
# Begin ONU Activation sequence
- reactor.callLater(0, self.message_exchange)
+ self._deferred = reactor.callLater(0, self.message_exchange)
+ self.adapter_agent.update_device(device)
+
+ def _add_logical_port(self, openflow_port_no, control_vlan=None,
+ capabilities=OFPPF_10GB_FD | OFPPF_FIBER,
+ speed=OFPPF_10GB_FD):
+
+ if self.ofp_port_no is None:
+ self.ofp_port_no = openflow_port_no
+ self.control_vlan = control_vlan
+
+ device = self.adapter_agent.get_device(self.device_id)
+
+ openflow_port = ofp_port(
+ port_no=openflow_port_no,
+ hw_addr=mac_str_to_tuple('08:00:%02x:%02x:%02x:%02x' %
+ ((device.parent_port_no >> 8 & 0xff),
+ device.parent_port_no & 0xff,
+ (openflow_port_no >> 8) & 0xff,
+ openflow_port_no & 0xff)),
+ name='uni-{}'.format(openflow_port_no),
+ config=0,
+ state=OFPPS_LIVE,
+ curr=capabilities,
+ advertised=capabilities,
+ peer=capabilities,
+ curr_speed=speed,
+ max_speed=speed
+ )
+ self.adapter_agent.add_logical_port(self.logical_device_id,
+ LogicalPort(
+ id='uni-{}'.format(openflow_port),
+ ofp_port=openflow_port,
+ device_id=device.id,
+ device_port_no=self.uni_port.port_no))
+ if control_vlan is not None and device.vlan != control_vlan:
+ device.vlan = control_vlan
+ self.adapter_agent.update_device(device)
+
+ def _get_uni_port(self):
+ ports = self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_UNI)
+ if ports:
+ # For now, we use on one uni port
+ return ports[0]
+
+ def _get_pon_port(self):
+ ports = self.adapter_agent.get_ports(self.device_id, Port.PON_ONU)
+ if ports:
+ # For now, we use on one uni port
+ return ports[0]
+
+ def reconcile(self, device):
+ self.log.info('reconciling-ONU-device-starts')
+
+ # first we verify that we got parent reference and proxy info
+ assert device.parent_id
+ assert device.proxy_address.device_id
+ assert device.proxy_address.channel_id
+
+ # register for proxied messages right away
+ self.proxy_address = device.proxy_address
+ self.adapter_agent.register_for_proxied_messages(device.proxy_address)
+
+ # Set the connection status to REACHABLE
+ device.connect_status = ConnectStatus.REACHABLE
+ self.adapter_agent.update_device(device)
+
+ # TODO: Verify that the uni, pon and logical ports exists
+
+ # Mark the device as REACHABLE and ACTIVE
device = self.adapter_agent.get_device(device.id)
+ device.connect_status = ConnectStatus.REACHABLE
device.oper_status = OperStatus.ACTIVE
self.adapter_agent.update_device(device)
+ self.log.info('reconciling-ONU-device-ends')
+
@inlineCallbacks
def update_flow_table(self, device, flows):
import voltha.core.flow_decomposer as fd
@@ -396,7 +399,7 @@
# We need to proxy through the OLT to get to the ONU
# Configuration from here should be using OMCI
#
- log.info('update_flow_table', device_id=device.id, flows=flows)
+ self.log.info('update_flow_table', device_id=device.id, flows=flows)
for flow in flows:
# TODO: Do we get duplicates here (ie all flows re-pushed on each individual flow add?)
@@ -409,7 +412,7 @@
self.log.debug('Found OFB field', field=field)
for action in fd.get_actions(flow):
- log.debug('Found Action', action=action)
+ self.log.debug('Found Action', action=action)
raise NotImplementedError()
@@ -428,24 +431,41 @@
@inlineCallbacks
def wait_for_response(self):
- log.info('wait-for-response')
+ self.log.info('wait-for-response') # TODO: Add timeout
+
+ def add_watchdog(deferred, timeout=_OMCI_TIMEOUT):
+ from twisted.internet import defer
+
+ def callback(value):
+ if not watchdog.called:
+ watchdog.cancel()
+ return value
+
+ deferred.addBoth(callback)
+
+ from twisted.internet import reactor
+ watchdog = reactor.callLater(timeout, defer.timeout, deferred)
+ return deferred
+
try:
- response = yield self.incoming_messages.get()
- log.info('got-response')
+ response = yield add_watchdog(self.incoming_messages.get())
+
+ self.log.info('got-response')
resp = OmciFrame(response)
resp.show()
#returnValue(resp)
self.last_response = resp
except Exception as e:
+ self.last_response = None
self.log.info('wait-for-response-exception', exc=str(e))
raise e
- #returnValue(None)
- self.last_response = None
@inlineCallbacks
def message_exchange(self):
- log.info('message_exchange')
+ self.log.info('message-exchange')
+ self._deferred = None
+
# reset incoming message queue
while self.incoming_messages.pending:
_ = yield self.incoming_messages.get()
@@ -454,9 +474,14 @@
# Start by getting some useful device information
device = self.adapter_agent.get_device(self.device_id)
+ device.oper_status = OperStatus.ACTIVATING
+ self.adapter_agent.update_device(device)
+ device.connect_status = ConnectStatus.UNREACHABLE
try:
- #pass
+ # TODO: Handle tx/wait-for-response timeouts and retry logic.
+ # May timeout to ONU not fully discovered (can happen in xPON case)
+ # or other errors.
# Decode fields in response and update device info
self.send_get_OntG('vendor_id')
@@ -466,30 +491,31 @@
data = omci_response.getfieldval("data")
device.vendor = data["vendor_id"]
+ # Mark as reachable if at least first message gets through
+ device.connect_status = ConnectStatus.REACHABLE
- self.send_get_cardHolder('actual_plugin_unit_type',257)
+ self.send_get_cardHolder('actual_plugin_unit_type', 257)
yield self.wait_for_response()
response = self.last_response
omci_response = response.getfieldval("omci_message")
data = omci_response.getfieldval("data")
device.type = str(data["actual_plugin_unit_type"])
- self.send_get_circuit_pack('number_of_ports',257)
+ self.send_get_circuit_pack('number_of_ports', 257)
yield self.wait_for_response()
response = self.last_response
omci_response = response.getfieldval("omci_message")
data = omci_response.getfieldval("data")
device.type = str(data["number_of_ports"])
-
- self.send_get_IpHostConfigData('mac_address',515)
+ self.send_get_IpHostConfigData('mac_address', 515)
yield self.wait_for_response()
response = self.last_response
omci_response = response.getfieldval("omci_message")
data = omci_response.getfieldval("data")
device.mac_address = str(data["mac_address"])
- self.send_get_Ont2G('equipment_id',0)
+ self.send_get_Ont2G('equipment_id', 0)
yield self.wait_for_response()
response = self.last_response
omci_response = response.getfieldval("omci_message")
@@ -498,7 +524,7 @@
eqptId = eqptId_bootVersion[0:10]
bootVersion = eqptId_bootVersion[12:20]
- self.send_get_Ont2G('omcc_version',0)
+ self.send_get_Ont2G('omcc_version', 0)
yield self.wait_for_response()
response = self.last_response
omci_response = response.getfieldval("omci_message")
@@ -506,8 +532,7 @@
#decimal version
omciVersion = str(data["omcc_version"])
-
- self.send_get_Ont2G('vendor_product_code',0)
+ self.send_get_Ont2G('vendor_product_code', 0)
yield self.wait_for_response()
response = self.last_response
omci_response = response.getfieldval("omci_message")
@@ -515,7 +540,7 @@
#decimal value
vedorProductCode = str(data["vendor_product_code"])
- self.send_get_OntG('version',0)
+ self.send_get_OntG('version', 0)
yield self.wait_for_response()
response = self.last_response
omci_response = response.getfieldval("omci_message")
@@ -534,7 +559,6 @@
yield self.wait_for_response()
response = self.last_response
-
# device.model = '10G GPON ONU' # TODO: get actual number
# device.hardware_version = 'TODO: to be filled'
# device.firmware_version = 'TODO: to be filled'
@@ -544,21 +568,26 @@
# device.images.image.extend([images])
# self.adapter_agent.update_device(device)
- except Exception as e:
+ device.oper_status = OperStatus.ACTIVE
+ device.connect_status = ConnectStatus.REACHABLE
- log.exception('Failed', e=e)
+ except Exception as e:
+ self.log.exception('Failed', e=e)
+
+ # Try again later. May not have been discovered
+ self._deferred = reactor.callLater(_STARTUP_RETRY_WAIT,
+ self.message_exchange)
####################################################
- log.info('*************** ONU IS ACTIVATED ****************')
+ self.log.info('onu-activated')
# self.send_get_circuit_pack()
# yield self.wait_for_response()
-
- pass
+ self.adapter_agent.update_device(device)
def send_mib_reset(self, entity_id=0):
- log.info('send_mib_reset')
+ self.log.info('send_mib_reset')
frame = OmciFrame(
transaction_id=self.get_tx_id(),
message_type=OmciMibReset.message_id,
@@ -730,7 +759,7 @@
self.send_omci_message(frame)
def send_get_circuit_pack(self, attribute, entity_id=0):
- log.info('send_get_circuit_pack: entry')
+ self.log.info('send_get_circuit_pack: entry')
frame = OmciFrame(
transaction_id=self.get_tx_id(),
message_type=OmciGet.message_id,
@@ -742,7 +771,6 @@
)
self.send_omci_message(frame)
-
def send_get_device_info(self, attribute, entity_id=0):
frame = OmciFrame(
transaction_id=self.get_tx_id(),
@@ -756,7 +784,7 @@
self.send_omci_message(frame)
def send_get_OntG(self, attribute, entity_id=0):
- log.info('send_get_OntG: entry')
+ self.log.info('send_get_OntG: entry')
frame = OmciFrame(
transaction_id=self.get_tx_id(),
message_type=OmciGet.message_id,
@@ -769,7 +797,7 @@
self.send_omci_message(frame)
# def send_get_OntG(self, entity_id=0):
- # log.info('send_get_OntG: entry')
+ # self.log.info('send_get_OntG: entry')
# frame = OmciFrame(
# transaction_id=self.get_tx_id(),
# message_type=OmciGet.message_id,
@@ -779,13 +807,12 @@
# attributes_mask=OntG.mask_for('vendor_id')
# )
# )
- # log.info('send_get_OntG: sending')
+ # self.log.info('send_get_OntG: sending')
# self.send_omci_message(frame)
- # log.info('send_get_OntG: sent')
-
+ # self.log.info('send_get_OntG: sent')
def send_get_Ont2G(self, attribute, entity_id=0):
- log.info('send_get_Ont2G: entry')
+ self.log.info('send_get_Ont2G: entry')
frame = OmciFrame(
transaction_id=self.get_tx_id(),
message_type=OmciGet.message_id,
@@ -799,7 +826,7 @@
self.send_omci_message(frame)
def send_get_cardHolder(self, attribute, entity_id=0):
- log.info('send_get_cardHolder: entry')
+ self.log.info('send_get_cardHolder: entry')
frame = OmciFrame(
transaction_id=self.get_tx_id(),
message_type=OmciGet.message_id,
@@ -812,7 +839,7 @@
self.send_omci_message(frame)
def send_set_adminState(self,entity_id):
- log.info('send_set_AdminState: entry')
+ self.log.info('send_set_AdminState: entry')
data = dict(
administrative_state=0
)
@@ -829,7 +856,7 @@
self.send_omci_message(frame)
def send_get_IpHostConfigData(self, attribute, entity_id=0):
- log.info('send_get_IpHostConfigData: entry')
+ self.log.info('send_get_IpHostConfigData: entry')
frame = OmciFrame(
transaction_id=self.get_tx_id(),
message_type=OmciGet.message_id,
@@ -842,7 +869,7 @@
self.send_omci_message(frame)
def send_get_SoftwareImage(self, attribute, entity_id=0):
- log.info('send_get_SoftwareImage: entry')
+ self.log.info('send_get_SoftwareImage: entry')
frame = OmciFrame(
transaction_id=self.get_tx_id(),
message_type=OmciGet.message_id,
@@ -854,7 +881,160 @@
)
self.send_omci_message(frame)
+ @inlineCallbacks
+ def reboot(self):
+ from common.utils.asleep import asleep
+ self.log.info('rebooting', device_id=self.device_id)
+
+ # Update the operational status to ACTIVATING and connect status to
+ # UNREACHABLE
+ device = self.adapter_agent.get_device(self.device_id)
+ previous_oper_status = device.oper_status
+ previous_conn_status = device.connect_status
+ device.oper_status = OperStatus.ACTIVATING
+ device.connect_status = ConnectStatus.UNREACHABLE
+ self.adapter_agent.update_device(device)
+
+ # Sleep 10 secs, simulating a reboot
+ # TODO: send alert and clear alert after the reboot
+ yield asleep(10) # TODO: Need to reboot for real
+
+ # Change the operational status back to its previous state. With a
+ # real OLT the operational state should be the state the device is
+ # after a reboot.
+ # Get the latest device reference
+ device = self.adapter_agent.get_device(self.device_id)
+ device.oper_status = previous_oper_status
+ device.connect_status = previous_conn_status
+ self.adapter_agent.update_device(device)
+ self.log.info('rebooted', device_id=self.device_id)
+
+ def self_test_device(self, device):
+ """
+ This is called to Self a device based on a NBI call.
+ :param device: A Voltha.Device object.
+ :return: Will return result of self test
+ """
+ self.log.info('self-test-device', device=device.id)
+ raise NotImplementedError()
+
+ def disable(self):
+ self.log.info('disabling', device_id=self.device_id)
+
+ # Get the latest device reference
+ device = self.adapter_agent.get_device(self.device_id)
+
+ # Disable all ports on that device
+ self.adapter_agent.disable_all_ports(self.device_id)
+
+ # Update the device operational status to UNKNOWN
+ device.oper_status = OperStatus.UNKNOWN
+ device.connect_status = ConnectStatus.UNREACHABLE
+ self.adapter_agent.update_device(device)
+
+ # Remove the uni logical port from the OLT, if still present
+ parent_device = self.adapter_agent.get_device(device.parent_id)
+ assert parent_device
+ logical_device_id = parent_device.parent_id
+ assert logical_device_id
+ port_no, self.ofp_port_no = self.ofp_port_no, None
+ port_id = 'uni-{}'.format(port_no)
+
+ try:
+ port = self.adapter_agent.get_logical_port(logical_device_id,
+ port_id)
+ self.adapter_agent.delete_logical_port(logical_device_id, port)
+ except KeyError:
+ self.log.info('logical-port-not-found', device_id=self.device_id,
+ portid=port_id)
+
+ # Remove pon port from parent
+ self.pon_port = self._get_pon_port()
+ self.adapter_agent.delete_port_reference_from_parent(self.device_id,
+ self.pon_port)
+
+ # Just updating the port status may be an option as well
+ # port.ofp_port.config = OFPPC_NO_RECV
+ # yield self.adapter_agent.update_logical_port(logical_device_id,
+ # port)
+ # Unregister for proxied message
+ self.adapter_agent.unregister_for_proxied_messages(
+ device.proxy_address)
+
+ # TODO:
+ # 1) Remove all flows from the device
+ # 2) Remove the device from ponsim
+
+ self.log.info('disabled', device_id=device.id)
+
+ def reenable(self):
+ self.log.info('re-enabling', device_id=self.device_id)
+ try:
+ # Get the latest device reference
+ device = self.adapter_agent.get_device(self.device_id)
+
+ # First we verify that we got parent reference and proxy info
+ assert device.parent_id
+ assert device.proxy_address.device_id
+ assert device.proxy_address.channel_id
+
+ # Re-register for proxied messages right away
+ self.proxy_address = device.proxy_address
+ self.adapter_agent.register_for_proxied_messages(
+ device.proxy_address)
+
+ # Re-enable the ports on that device
+ self.adapter_agent.enable_all_ports(self.device_id)
+
+ # Refresh the port reference
+ self.uni_port = self._get_uni_port()
+ self.pon_port = self._get_pon_port()
+
+ # Add the pon port reference to the parent
+ self.adapter_agent.add_port_reference_to_parent(device.id,
+ self.pon_port)
+
+ # Update the connect status to REACHABLE
+ device.connect_status = ConnectStatus.REACHABLE
+ self.adapter_agent.update_device(device)
+
+ # re-add uni port to logical device
+ parent_device = self.adapter_agent.get_device(device.parent_id)
+ self.logical_device_id = parent_device.parent_id
+ assert self.logical_device_id, 'Invalid logical device ID'
+
+ if device.vlan:
+ # vlan non-zero if created via legacy method (not xPON)
+ self._add_logical_port(device.vlan, device.vlan,
+ control_vlan=device.vlan)
+
+ device = self.adapter_agent.get_device(device.id)
+ device.oper_status = OperStatus.ACTIVE
+ self.adapter_agent.update_device(device)
+
+ self.log.info('re-enabled', device_id=device.id)
+ except Exception, e:
+ self.log.exception('error-reenabling', e=e)
+
+ def delete(self):
+ self.log.info('deleting', device_id=self.device_id)
+ # A delete request may be received when an OLT is disabled
+ # TODO: Need to implement this
+ # 1) Remove all flows from the device
+ self.log.info('deleted', device_id=self.device_id)
+
# PON Mgnt APIs #
+
+
+ def _get_xpon_collection(self, data):
+ if isinstance(data, OntaniConfig):
+ return self._ont_anis
+ elif isinstance(data, VOntaniConfig):
+ return self._v_ont_anis
+ elif isinstance(data, VEnetConfig):
+ return self._v_enets
+ return None
+
def create_interface(self, data):
"""
Create XPON interfaces
@@ -864,50 +1044,98 @@
interface = data.interface
inst_data = data.data
- if isinstance(data, ChannelgroupConfig):
- self.log.debug('create_interface-channel-group', interface=interface, data=inst_data)
- pass
+ items = self._get_xpon_collection(data)
+ if items is None:
+ raise NotImplemented('xPON {} is not implemented'.
+ format(type(data)))
- elif isinstance(data, ChannelpartitionConfig):
- self.log.debug('create_interface-channel-partition', interface=interface, data=inst_data)
- pass
-
- elif isinstance(data, ChannelpairConfig):
- self.log.debug('create_interface-channel-pair', interface=interface, data=inst_data)
- pass
-
- elif isinstance(data, ChannelterminationConfig):
- self.log.debug('create_interface-channel-termination', interface=interface, data=inst_data)
- pass
-
- elif isinstance(data, OntaniConfig):
+ if isinstance(data, OntaniConfig):
self.log.debug('create_interface-ont-ani', interface=interface, data=inst_data)
- pass
+
+ if name not in items:
+ items[name] = {
+ 'name': name,
+ 'enabled': interface.enabled,
+ 'upstream-fec': inst_data.upstream_fec_indicator,
+ 'mgnt-gemport-aes': inst_data.mgnt_gemport_aes_indicator
+ }
elif isinstance(data, VOntaniConfig):
self.log.debug('create_interface-v-ont-ani', interface=interface, data=inst_data)
- pass
+
+ if name not in items:
+ items[name] = {
+ 'name': name,
+ 'enabled': interface.enabled,
+ 'onu-id': inst_data.onu_id,
+ 'expected-serial-number': inst_data.expected_serial_number,
+ 'preferred-channel-pair': inst_data.preferred_chanpair,
+ 'channel-partition': inst_data.parent_ref,
+ 'upstream-channel-speed': inst_data.upstream_channel_speed
+ }
elif isinstance(data, VEnetConfig):
self.log.debug('create_interface-v-enet', interface=interface, data=inst_data)
- pass
+
+ if name not in items:
+ items[name] = {
+ 'name': name,
+ 'enabled': interface.enabled,
+ 'v-ont-ani': inst_data.v_ontani_ref
+ }
+ ofp_port_no, cntl_vlan = self._decode_openflow_port_and_control_vlan(items[name])
+ self._add_logical_port(ofp_port_no, control_vlan=cntl_vlan)
else:
raise NotImplementedError('Unknown data type')
+ def _decode_openflow_port_and_control_vlan(self, venet_info):
+ try:
+ ofp_port_no = int(venet_info['name'].split('-')[1])
+ cntl_vlan = ofp_port_no
+
+ return ofp_port_no, cntl_vlan
+
+ except ValueError:
+ self.log.error('invalid-uni-port-name', name=venet_info['name'])
+ except KeyError:
+ self.log.error('invalid-venet-data', data=venet_info)
+
def update_interface(self, data):
"""
Update XPON interfaces
:param data: (xpon config info)
"""
- pass
+ name = data.name
+ interface = data.interface
+ inst_data = data.data
+
+ items = self._get_xpon_collection(data)
+
+ if items is None:
+ raise ValueError('Unknown data type: {}'.format(type(data)))
+
+ if name not in items:
+ raise KeyError("'{}' not found. Type: {}".format(name, type(data)))
+
+ raise NotImplementedError('TODO: not yet supported')
def delete_interface(self, data):
"""
Deleete XPON interfaces
:param data: (xpon config info)
"""
- pass
+ name = data.name
+ interface = data.interface
+ inst_data = data.data
+
+ items = self._get_xpon_collection(data)
+ item = items.get(name)
+
+ if item in items:
+ del items[name]
+ pass # TODO Do something....
+ raise NotImplementedError('TODO: not yet supported')
def create_tcont(self, tcont_data, traffic_descriptor_data):
"""
@@ -915,7 +1143,17 @@
:param tcont_data:
:param traffic_descriptor_data:
"""
- pass
+ traffic_descriptor = TrafficDescriptor.create(traffic_descriptor_data)
+ tcont = TCont.create(tcont_data, traffic_descriptor)
+
+ if tcont.name in self._tconts:
+ raise KeyError("TCONT '{}' already exists".format(tcont.name))
+
+ if traffic_descriptor.name in self._traffic_descriptors:
+ raise KeyError("Traffic Descriptor '{}' already exists".format(traffic_descriptor.name))
+
+ self._tconts[tcont.name] = tcont
+ self._traffic_descriptors[traffic_descriptor.name] = traffic_descriptor
def update_tcont(self, tcont_data, traffic_descriptor_data):
"""
@@ -923,7 +1161,18 @@
:param tcont_data:
:param traffic_descriptor_data:
"""
+ if tcont_data.name not in self._tconts:
+ raise KeyError("TCONT '{}' does not exists".format(tcont_data.name))
+
+ if traffic_descriptor_data.name not in self._traffic_descriptors:
+ raise KeyError("Traffic Descriptor '{}' does not exists".
+ format(traffic_descriptor_data.name))
+
+ traffic_descriptor = TrafficDescriptor.create(traffic_descriptor_data)
+ tcont = TCont.create(tcont_data, traffic_descriptor)
+ #
pass
+ raise NotImplementedError('TODO: Not yet supported')
def remove_tcont(self, tcont_data, traffic_descriptor_data):
"""
@@ -931,25 +1180,108 @@
:param tcont_data:
:param traffic_descriptor_data:
"""
- pass
+ tcont = self._tconts.get(tcont_data.name)
+ traffic_descriptor = self._traffic_descriptors.get(traffic_descriptor_data.name)
+
+ if traffic_descriptor is not None:
+ del self._traffic_descriptors[traffic_descriptor_data.name]
+ pass # Perform any needed operations
+ # raise NotImplementedError('TODO: Not yet supported')
+
+ if tcont is not None:
+ del self._tconts[tcont_data.name]
+ pass # Perform any needed operations
+ raise NotImplementedError('TODO: Not yet supported')
def create_gemport(self, data):
"""
Create GEM Port
:param data:
"""
- pass
+ gem_port = GemPort.create(data)
+
+ if gem_port.name in self._gem_ports:
+ raise KeyError("GEM Port '{}' already exists".format(gem_port.name))
+
+ self._gem_ports[gem_port.name] = gem_port
+
+ # TODO: On GEM Port changes, may need to add ONU Flow(s)
def update_gemport(self, data):
"""
Update GEM Port
:param data:
"""
- pass
+ if data.name not in self._gem_ports:
+ raise KeyError("GEM Port '{}' does not exists".format(data.name))
- def delete_gemport(self, data):
+ gem_port = GemPort.create(data)
+ #
+ # TODO: On GEM Port changes, may need to add/delete/modify ONU Flow(s)
+ pass
+ raise NotImplementedError('TODO: Not yet supported')
+
+ def remove_gemport(self, data):
"""
Delete GEM Port
:param data:
"""
- pass
+ gem_port = self._gem_ports.get(data.name)
+
+ if gem_port is not None:
+ del self._gem_ports[data.name]
+ #
+ # TODO: On GEM Port changes, may need to delete ONU Flow(s)
+ pass # Perform any needed operations
+ raise NotImplementedError('TODO: Not yet supported')
+
+ def create_multicast_gemport(self, data):
+ """
+ API to create multicast gemport object in the devices
+ :data: multicast gemport data object
+ :return: None
+ """
+ pass # TODO: Implement
+
+ def update_multicast_gemport(self, data):
+ """
+ API to update multicast gemport object in the devices
+ :data: multicast gemport data object
+ :return: None
+ """
+ pass # TODO: Implement
+
+ def remove_multicast_gemport(self, data):
+ """
+ API to delete multicast gemport object in the devices
+ :data: multicast gemport data object
+ :return: None
+ """
+ pass # TODO: Implement
+
+ def create_multicast_distribution_set(self, data):
+ """
+ API to create multicast distribution rule to specify
+ the multicast VLANs that ride on the multicast gemport
+ :data: multicast distribution data object
+ :return: None
+ """
+ pass # TODO: Implement
+
+ def update_multicast_distribution_set(self, data):
+ """
+ API to update multicast distribution rule to specify
+ the multicast VLANs that ride on the multicast gemport
+ :data: multicast distribution data object
+ :return: None
+ """
+ pass # TODO: Implement
+
+ def remove_multicast_distribution_set(self, data):
+ """
+ API to delete multicast distribution rule to specify
+ the multicast VLANs that ride on the multicast gemport
+ :data: multicast distribution data object
+ :return: None
+ """
+ pass # TODO: Implement