+# Copyright 2017-present Adtran, Inc.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+# http://www.apache.org/licenses/LICENSE-2.0
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import random
+import arrow
+import structlog
+import xmltodict
+from adapters.adtran_common.port import AdtnPort
+from twisted.internet import reactor
+from twisted.internet.defer import inlineCallbacks, returnValue, succeed, fail
+from twisted.python.failure import Failure
+from pyvoltha.protos.common_pb2 import OperStatus, AdminState
+from pyvoltha.protos.device_pb2 import Port
+from pyvoltha.protos.logical_device_pb2 import LogicalPort
+from pyvoltha.protos.openflow_13_pb2 import OFPPF_100GB_FD, OFPPF_FIBER, OFPPS_LIVE, ofp_port
class NniPort(AdtnPort):
    """
    Northbound network port, often Ethernet-based

    Builds the VOLTHA Port / LogicalPort objects for one NNI interface, writes
    its 'enabled' leaf to the OLT over NETCONF and periodically reads back its
    configuration and statistics.

    NOTE(review): attributes such as state, enabled, olt, adapter_agent, name,
    _port, _port_no and _parent are used but never assigned in this class;
    they are presumably supplied by AdtnPort -- confirm against the base class.
    """

    def __init__(self, parent, **kwargs):
        """
        :param parent: owner of this port, handed to AdtnPort. Its
                       'netconf_client' and 'device_id' are read through
                       self._parent by the methods below.
        :param kwargs: 'port_no' plus the optional keys 'name', 'mac_address',
                       'ofp_capabilities', 'ofp_state', 'current_speed',
                       'max_speed' and 'device_port_no'
        """
        super(NniPort, self).__init__(parent, **kwargs)

        # TODO: Weed out those properties supported by common 'Port' object
        self.log = structlog.get_logger(port_no=kwargs.get('port_no'))
        self.log.info('creating')

        # ONOS/SEBA wants 'nni-<port>' for port names, OLT NETCONF wants their
        # name (something like  hundred-gigabit-ethernet 0/1) which is reported
        # when we enumerated the ports
        self._physical_port_name = kwargs.get('name', 'nni-{}'.format(self._port_no))
        self._logical_port_name = 'nni-{}'.format(self._port_no)
        self._logical_port = None

        self.sync_tick = 10.0           # Seconds between configuration syncs

        self._stats_tick = 5.0          # Seconds between statistics polls
        self._stats_deferred = None     # Pending stats poll (request or timer)

        # Local cache of NNI configuration
        self._ianatype = '<type xmlns:ianaift="urn:ietf:params:xml:ns:yang:iana-if-type">ianaift:ethernetCsmacd</type>'

        # And optional parameters
        # TODO: Currently cannot update admin/oper status, so create this enabled and active
        # self._admin_state = kwargs.pop('admin_state', AdminState.UNKNOWN)
        # self._oper_status = kwargs.pop('oper_status', OperStatus.UNKNOWN)
        self._enabled = True
        self._admin_state = AdminState.ENABLED
        self._oper_status = OperStatus.ACTIVE

        self._label = self._physical_port_name
        self._mac_address = kwargs.pop('mac_address', '00:00:00:00:00:00')
        # TODO: Get with JOT and find out how to pull out MAC Address via NETCONF
        # TODO: May need to refine capabilities into current, advertised, and peer

        self._ofp_capabilities = kwargs.pop('ofp_capabilities', OFPPF_100GB_FD | OFPPF_FIBER)
        self._ofp_state = kwargs.pop('ofp_state', OFPPS_LIVE)
        self._current_speed = kwargs.pop('current_speed', OFPPF_100GB_FD)
        self._max_speed = kwargs.pop('max_speed', OFPPF_100GB_FD)
        self._device_port_no = kwargs.pop('device_port_no', self._port_no)

        # Statistics
        self.rx_dropped = 0
        self.rx_error_packets = 0
        self.rx_ucast_packets = 0
        self.rx_bcast_packets = 0
        self.rx_mcast_packets = 0
        self.tx_dropped = 0
        self.tx_error_packets = 0
        self.tx_ucast_packets = 0
        self.tx_bcast_packets = 0
        self.tx_mcast_packets = 0

    def __str__(self):
        return "NniPort-{}: Admin: {}, Oper: {}, parent: {}".format(self._port_no,
                                                                    self._admin_state,
                                                                    self._oper_status,
                                                                    self._parent)

    def get_port(self):
        """
        Get the VOLTHA PORT object for this port

        The Port is created on first use and its admin/oper status is
        refreshed from this object on every call.

        :return: VOLTHA Port object
        """
        self.log.debug('get-port-status-update', port=self._port_no,
                       label=self._label)
        if self._port is None:
            self._port = Port(port_no=self._port_no,
                              label=self._label,
                              type=Port.ETHERNET_NNI,
                              admin_state=self._admin_state,
                              oper_status=self._oper_status)

        if self._port.admin_state != self._admin_state or \
                self._port.oper_status != self._oper_status:
            self.log.debug('get-port-status-update', admin_state=self._admin_state,
                           oper_status=self._oper_status)
            self._port.admin_state = self._admin_state
            self._port.oper_status = self._oper_status

        return self._port

    @property
    def iana_type(self):
        """XML <type> element (IANA interface type) sent with every set_config"""
        return self._ianatype

    def cancel_deferred(self):
        """
        Cancel whatever the base class tracks plus the pending statistics poll
        """
        super(NniPort, self).cancel_deferred()

        pending, self._stats_deferred = self._stats_deferred, None
        try:
            # Only work that has not fired yet can (and needs to) be cancelled
            if pending is not None and not pending.called:
                pending.cancel()

        except Exception as e:
            # Best effort, a failed cancel must not abort port shutdown
            self.log.debug('stats-cancel-failed', e=e)

    def _update_adapter_agent(self):
        """Push the current Port object to the adapter agent"""
        # adapter_agent add_port also does an update of port status
        self.log.debug('update-adapter-agent', admin_state=self._admin_state,
                       oper_status=self._oper_status)
        self.adapter_agent.add_port(self.olt.device_id, self.get_port())

    def get_logical_port(self):
        """
        Get the VOLTHA logical port for this port

        The LogicalPort is built once and cached.

        :return: VOLTHA logical port or None if not supported
        """
        def mac_str_to_tuple(mac):
            """
            Convert 'xx:xx:xx:xx:xx:xx' MAC address string to a tuple of integers.
            Example: mac_str_to_tuple('00:01:02:03:04:05') == (0, 1, 2, 3, 4, 5)
            """
            return tuple(int(octet, 16) for octet in mac.split(':'))

        if self._logical_port is None:
            openflow_port = ofp_port(port_no=self._port_no,
                                     hw_addr=mac_str_to_tuple(self._mac_address),
                                     name=self._logical_port_name,
                                     config=0,
                                     state=self._ofp_state,
                                     curr=self._ofp_capabilities,
                                     advertised=self._ofp_capabilities,
                                     peer=self._ofp_capabilities,
                                     curr_speed=self._current_speed,
                                     max_speed=self._max_speed)

            self._logical_port = LogicalPort(id=self._logical_port_name,
                                             ofp_port=openflow_port,
                                             device_id=self._parent.device_id,
                                             device_port_no=self._device_port_no,
                                             root_port=True)
        return self._logical_port

    @inlineCallbacks
    def finish_startup(self):
        """
        Complete port startup: schedule statistics polling, enable the
        interface on the OLT and run the base class startup.

        A failure while enabling is logged and reflected as an UNKNOWN
        operational status; it is not propagated to the caller.

        :return: (deferred) 'Done' if the port was not in the INITIAL state,
                 otherwise 'Enabled'
        """
        if self.state != AdtnPort.State.INITIAL:
            returnValue('Done')

        self.log.debug('final-startup')
        # TODO: Start status polling of NNI interfaces
        self.deferred = None  # = reactor.callLater(3, self.do_stuff)

        # Begin statistics sync
        self._stats_deferred = reactor.callLater(self._stats_tick * 2,
                                                 self._update_statistics)
        try:
            yield self.set_config('enabled', True)
            super(NniPort, self).finish_startup()

        except Exception as e:
            self.log.exception('nni-start', e=e)
            self._oper_status = OperStatus.UNKNOWN
            self._update_adapter_agent()

        returnValue('Enabled')

    def finish_stop(self):
        """
        Complete port shutdown.

        :return: (deferred) result of the NETCONF edit
        """
        # NOTE: Leave all NNI ports active (may have inband management), so
        #       the interface is (re)written as enabled rather than disabled
        # TODO: Revisit leaving NNI Ports active on disable
        return self.set_config('enabled', True)

    @inlineCallbacks
    def reset(self):
        """
        Set the NNI Port to a known good state on initial port startup.  Actual
        NNI 'Start' is done elsewhere

        :return: (deferred) result of the NETCONF edit
        :raises: whatever set_config raised, after the admin state has been
                 marked UNKNOWN
        """
        # if self.state != AdtnPort.State.INITIAL:
        #     self.log.error('reset-ignored', state=self.state)
        #     returnValue('Ignored')

        self.log.info('resetting', label=self._label)

        # Always enable our NNI ports
        try:
            results = yield self.set_config('enabled', True)
            self._admin_state = AdminState.ENABLED
            self._enabled = True
            returnValue(results)

        except Exception as e:
            self.log.exception('reset', e=e)
            self._admin_state = AdminState.UNKNOWN
            raise

    @inlineCallbacks
    def set_config(self, leaf, value):
        """
        Write a single leaf of this interface to the OLT with a NETCONF
        edit-config

        :param leaf: (str) element name below <interface>, for example 'enabled'
        :param value: leaf value, booleans are sent as 'true' / 'false'
        :return: (deferred) result of the edit-config request
        :raises: whatever the NETCONF client raised (logged first)
        """
        if isinstance(value, bool):
            value = 'true' if value else 'false'

        config = ''.join([
            '<interfaces xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">',
            ' <interface>',
            ' <name>{}</name>'.format(self._physical_port_name),
            ' {}'.format(self._ianatype),
            ' <{}>{}</{}>'.format(leaf, value, leaf),
            ' </interface>',
            '</interfaces>',
        ])
        try:
            results = yield self._parent.netconf_client.edit_config(config)
            returnValue(results)

        except Exception as e:
            self.log.exception('set', leaf=leaf, value=value, e=e)
            raise

    def get_nni_config(self):
        """
        Request the 'enabled' leaf of this interface from the OLT

        :return: whatever the NETCONF client's get() returns (used as a
                 deferred by sync_hardware)
        """
        config = ''.join([
            '<filter xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">',
            ' <interfaces xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">',
            ' <interface>',
            ' <name>{}</name>'.format(self._physical_port_name),
            ' <enabled/>',
            ' </interface>',
            ' </interfaces>',
            '</filter>',
        ])
        return self._parent.netconf_client.get(config)

    def get_nni_statistics(self):
        """
        Request admin/oper status and the statistics of this interface

        :return: whatever the NETCONF client's get() returns (used as a
                 deferred by _update_statistics)
        """
        state = ''.join([
            '<filter xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">',
            ' <interfaces-state xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">',
            ' <interface>',
            ' <name>{}</name>'.format(self._physical_port_name),
            ' <admin-status/>',
            ' <oper-status/>',
            ' <statistics/>',
            ' </interface>',
            # The closing tag has to match <interfaces-state> or the filter
            # is not well-formed XML
            ' </interfaces-state>',
            '</filter>',
        ])
        return self._parent.netconf_client.get(state)

    def sync_hardware(self):
        """
        Compare the 'enabled' leaf on the OLT with our own state, rewrite it
        when they differ and schedule the next check.

        Does nothing (and schedules nothing) unless the port is RUNNING or
        STOPPED.
        """
        if self.state not in (AdtnPort.State.RUNNING, AdtnPort.State.STOPPED):
            return

        def read_config(results):
            # self.log.debug('read-config', results=results)
            try:
                result_dict = xmltodict.parse(results.data_xml)
                interfaces = result_dict['data']['interfaces']
                if 'if:interface' in interfaces:
                    entries = interfaces['if:interface']
                else:
                    entries = interfaces['interface']

                # When the leaf is absent, default to the opposite of what we
                # want so that the comparison below forces a write
                enabled = entries.get('enabled',
                                      str(not self.enabled).lower()) == 'true'

                if self.enabled == enabled:
                    return succeed('in-sync')

                # Hand the edit back to the callback chain so that a failure
                # is consumed there instead of being left unhandled, and the
                # next sync is only scheduled once the edit has completed
                edit = self.set_config('enabled', self.enabled)
                self._oper_status = OperStatus.ACTIVE
                self._update_adapter_agent()
                return edit

            except Exception as e:
                self.log.exception('read-config', e=e)
                return fail(Failure())

        def failure(reason):
            self.log.error('hardware-sync-failed', reason=reason)

        def reschedule(_):
            # +/- 10% jitter on the sync interval
            delay = self.sync_tick
            delay += random.uniform(-delay / 10, delay / 10)
            self.sync_deferred = reactor.callLater(delay, self.sync_hardware)

        self.sync_deferred = self.get_nni_config()
        self.sync_deferred.addCallbacks(read_config, failure)
        self.sync_deferred.addBoth(reschedule)

    def _decode_nni_statistics(self, entry):
        """
        Copy the counters of one <interface> state entry into this object

        :param entry: (dict) parsed <interface> element, nothing is changed
                      if it has no 'statistics' child
        """
        # admin_status = entry.get('admin-status')
        # oper_status = entry.get('oper-status')
        # phys_address = entry.get('phys-address')

        stats = entry.get('statistics')
        if stats is None:
            return

        def counter(*leaves):
            # Sum of the named counters, a missing counter counts as 0
            return sum(int(stats.get(leaf, 0)) for leaf in leaves)

        self.timestamp = arrow.utcnow().float_timestamp

        self.rx_bytes = counter('in-octets')
        self.rx_ucast_packets = counter('in-unicast-pkts')
        self.rx_bcast_packets = counter('in-broadcast-pkts')
        self.rx_mcast_packets = counter('in-multicast-pkts')
        self.rx_error_packets = counter('in-errors', 'in-discards')

        self.tx_bytes = counter('out-octets')
        self.tx_ucast_packets = counter('out-unicast-pkts')
        self.tx_bcast_packets = counter('out-broadcast-pkts')
        self.tx_mcast_packets = counter('out-multicast-pkts')
        self.tx_error_packets = counter('out-errors', 'out-discards')

        self.rx_packets = self.rx_ucast_packets + self.rx_mcast_packets + self.rx_bcast_packets
        self.tx_packets = self.tx_ucast_packets + self.tx_mcast_packets + self.tx_bcast_packets
        # No support for rx_crc_errors or bip_errors

    def _update_statistics(self):
        """
        Poll the interface statistics and schedule the next poll.

        Does nothing (and schedules nothing) unless the port is RUNNING.
        """
        if self.state != AdtnPort.State.RUNNING:
            return

        def read_state(results):
            # self.log.debug('read-state', results=results)
            try:
                result_dict = xmltodict.parse(results.data_xml)
                entry = result_dict['data']['interfaces-state']['interface']
                self._decode_nni_statistics(entry)
                return succeed('done')

            except Exception as e:
                self.log.exception('read-state', e=e)
                return fail(Failure())

        def failure(reason):
            self.log.error('update-stats-failed', reason=reason)

        def reschedule(_):
            # +/- 10% jitter on the polling interval
            delay = self._stats_tick
            delay += random.uniform(-delay / 10, delay / 10)
            self._stats_deferred = reactor.callLater(delay, self._update_statistics)

        try:
            self._stats_deferred = self.get_nni_statistics()
            self._stats_deferred.addCallbacks(read_state, failure)
            self._stats_deferred.addBoth(reschedule)

        except Exception as e:
            # The request could not even be issued, try again next tick
            self.log.exception('nni-sync', port=self.name, e=e)
            self._stats_deferred = reactor.callLater(self._stats_tick, self._update_statistics)
class MockNniPort(NniPort):
    """
    A class similar to the 'Port' class in the VOLTHA but for a non-existent (virtual OLT)

    reset() and set_config() are overridden so that they only update local
    state and return already-fired deferreds.

    TODO: Merge this with the Port class or cleanup where possible
          so we do not duplicate fields/properties/methods
    """

    def __init__(self, parent, **kwargs):
        super(MockNniPort, self).__init__(parent, **kwargs)

    def __str__(self):
        return "NniPort-mock-{}: Admin: {}, Oper: {}, parent: {}".format(self._port_no,
                                                                         self._admin_state,
                                                                         self._oper_status,
                                                                         self._parent)

    @staticmethod
    def get_nni_port_state_results():
        """
        Canned NETCONF reply listing a single NNI interface

        :return: ncclient GetReply wrapping the XML below
        """
        from ncclient.operations.retrieve import GetReply
        # Stripped because an XML declaration is only legal at the very start
        # of a document
        raw = """
        <?xml version="1.0" encoding="UTF-8"?>
        <rpc-reply xmlns="urn:ietf:params:xml:ns:netconf:base:1.0"
        xmlns:nc="urn:ietf:params:xml:ns:netconf:base:1.0"
        message-id="urn:uuid:59e71979-01bb-462f-b17a-b3a45e1889ac">
        <data>
        <interfaces-state xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">
        <interface><name>hundred-gigabit-ethernet 0/1</name></interface>
        </interfaces-state>
        </data>
        </rpc-reply>
        """.strip()
        return GetReply(raw)

    @staticmethod
    def get_pon_port_state_results():
        """
        Canned NETCONF reply listing sixteen PON interfaces (XPON 0/1 - 0/16)

        :return: ncclient GetReply wrapping the XML below
        """
        from ncclient.operations.retrieve import GetReply
        # Stripped because an XML declaration is only legal at the very start
        # of a document
        raw = """
        <?xml version="1.0" encoding="UTF-8"?>
        <rpc-reply xmlns="urn:ietf:params:xml:ns:netconf:base:1.0"
        xmlns:nc="urn:ietf:params:xml:ns:netconf:base:1.0"
        message-id="urn:uuid:59e71979-01bb-462f-b17a-b3a45e1889ac">
        <data>
        <interfaces-state xmlns="urn:ietf:params:xml:ns:yang:ietf-interfaces">
        <interface><name>XPON 0/1</name></interface>
        <interface><name>XPON 0/2</name></interface>
        <interface><name>XPON 0/3</name></interface>
        <interface><name>XPON 0/4</name></interface>
        <interface><name>XPON 0/5</name></interface>
        <interface><name>XPON 0/6</name></interface>
        <interface><name>XPON 0/7</name></interface>
        <interface><name>XPON 0/8</name></interface>
        <interface><name>XPON 0/9</name></interface>
        <interface><name>XPON 0/10</name></interface>
        <interface><name>XPON 0/11</name></interface>
        <interface><name>XPON 0/12</name></interface>
        <interface><name>XPON 0/13</name></interface>
        <interface><name>XPON 0/14</name></interface>
        <interface><name>XPON 0/15</name></interface>
        <interface><name>XPON 0/16</name></interface>
        </interfaces-state>
        </data>
        </rpc-reply>
        """.strip()
        return GetReply(raw)

    def reset(self):
        """
        Set the NNI Port to a known good state on initial port startup.  Actual
        NNI 'Start' is done elsewhere

        :return: (deferred) 'Enabled', or a failure wrapping RuntimeError when
                 the port is not in the INITIAL state
        """
        if self.state != AdtnPort.State.INITIAL:
            self.log.error('reset-ignored', state=self.state)
            # fail() without an argument needs an exception to be in flight,
            # so hand it an explicit one
            return fail(RuntimeError("Reset ignored in state '{}'".format(self.state)))

        self.log.info('resetting', label=self._label)

        # Always enable our NNI ports
        self._enabled = True
        self._admin_state = AdminState.ENABLED
        return succeed('Enabled')

    def set_config(self, leaf, value):
        """
        Record a configuration change locally, nothing is sent to hardware

        :param leaf: (str) only 'enabled' is supported
        :param value: new value for the leaf
        :return: (deferred) 'Success'
        :raises NotImplementedError: for any leaf other than 'enabled'
        """
        if leaf != 'enabled':
            raise NotImplementedError("Leaf '{}' is not supported".format(leaf))

        self._enabled = value
        return succeed('Success')