VOL-599: Update to zeroMQ support and Tcont/TD bug fix
Change-Id: I7ed03c8584f44cd40bf35339af0a13092baf5f22
diff --git a/voltha/adapters/adtran_olt/adtran_device_handler.py b/voltha/adapters/adtran_olt/adtran_device_handler.py
index 59ed01b..a373670 100644
--- a/voltha/adapters/adtran_olt/adtran_device_handler.py
+++ b/voltha/adapters/adtran_olt/adtran_device_handler.py
@@ -91,7 +91,7 @@
RESTART_RPC = '<system-restart xmlns="urn:ietf:params:xml:ns:yang:ietf-system"/>'
def __init__(self, **kwargs):
- from net.adtran_zmq import DEFAULT_ZEROMQ_OMCI_TCP_PORT
+ from net.adtran_zmq import DEFAULT_PON_AGENT_TCP_PORT, DEFAULT_PIO_TCP_PORT
super(AdtranDeviceHandler, self).__init__()
@@ -152,7 +152,8 @@
self.max_nni_ports = 1 # TODO: This is a VOLTHA imposed limit in 'flow_decomposer.py
# and logical_device_agent.py
# OMCI ZMQ Channel
- self.zmq_port = DEFAULT_ZEROMQ_OMCI_TCP_PORT
+ self.pon_agent_port = DEFAULT_PON_AGENT_TCP_PORT
+ self.pio_port = DEFAULT_PIO_TCP_PORT
# Heartbeat support
self.heartbeat_count = 0
@@ -218,7 +219,7 @@
del self._evcs[evc.name]
def parse_provisioning_options(self, device):
- from net.adtran_zmq import DEFAULT_ZEROMQ_OMCI_TCP_PORT
+ from net.adtran_zmq import DEFAULT_PON_AGENT_TCP_PORT
if not device.ipv4_address:
self.activate_failed(device, 'No ip_address field provided')
@@ -253,7 +254,7 @@
help='REST Password')
parser.add_argument('--rc_port', '-T', action='store', default=_DEFAULT_RESTCONF_PORT, type=check_tcp_port,
help='RESTCONF TCP Port')
- parser.add_argument('--zmq_port', '-z', action='store', default=DEFAULT_ZEROMQ_OMCI_TCP_PORT,
+ parser.add_argument('--zmq_port', '-z', action='store', default=DEFAULT_PON_AGENT_TCP_PORT,
type=check_tcp_port, help='ZeroMQ Port')
parser.add_argument('--autoactivate', '-a', action='store_true', default=False,
help='Autoactivate / Demo mode')
@@ -280,7 +281,7 @@
self.rest_password = args.rc_password
self.rest_port = args.rc_port
- self.zmq_port = args.zmq_port
+ self.pon_agent_port = args.zmq_port
self._autoactivate = args.autoactivate
diff --git a/voltha/adapters/adtran_olt/adtran_olt.py b/voltha/adapters/adtran_olt/adtran_olt.py
index ba88d9b..1709871 100644
--- a/voltha/adapters/adtran_olt/adtran_olt.py
+++ b/voltha/adapters/adtran_olt/adtran_olt.py
@@ -51,7 +51,7 @@
self.descriptor = Adapter(
id=self.name,
vendor='Adtran, Inc.',
- version='0.12',
+ version='0.13',
config=AdapterConfig(log_level=LogLevel.INFO)
)
log.debug('adtran_olt.__init__', adapter_agent=adapter_agent)
diff --git a/voltha/adapters/adtran_olt/adtran_olt_handler.py b/voltha/adapters/adtran_olt/adtran_olt_handler.py
index 3ffecbb..47fcea8 100644
--- a/voltha/adapters/adtran_olt/adtran_olt_handler.py
+++ b/voltha/adapters/adtran_olt/adtran_olt_handler.py
@@ -71,7 +71,8 @@
self.status_poll = None
self.status_poll_interval = 5.0
self.status_poll_skew = self.status_poll_interval / 10
- self.zmq_client = None
+ self.pon_agent = None
+ self.pio_agent = None
self.ssh_deferred = None
self._system_id = None
self._download_protocols = None
@@ -319,8 +320,9 @@
# Make sure configured for ZMQ remote access
self._ready_zmq()
- # ZeroMQ client
- self.zmq_client = AdtranZmqClient(self.ip_address, rx_callback=self.rx_packet, port=self.zmq_port)
+ # ZeroMQ clients
+ self.pon_agent = AdtranZmqClient(self.ip_address, port=self.pon_agent_port, rx_callback=self.rx_pa_packet)
+ self.pio_agent = AdtranZmqClient(self.ip_address, port=self.pio_port, rx_callback=self.rx_pio_packet)
# Download support
self._download_deferred = reactor.callLater(0, self._get_download_protocols)
@@ -364,7 +366,7 @@
def _ready_zmq(self):
from net.rcmd import RCmd
# Check for port status
- command = 'netstat -pan | grep -i 0.0.0.0:{} | wc -l'.format(self.zmq_port)
+ command = 'netstat -pan | grep -i 0.0.0.0:{} | wc -l'.format(self.pon_agent_port)
rcmd = RCmd(self.ip_address, self.netconf_username, self.netconf_password, command)
try:
@@ -405,7 +407,7 @@
# Drop registration for adapter messages
self.adapter_agent.unregister_for_inter_adapter_messages()
- c, self.zmq_client = self.zmq_client, None
+ c, self.pon_agent = self.pon_agent, None
if c is not None:
try:
c.shutdown()
@@ -418,8 +420,9 @@
super(AdtranOltHandler, self).reenable(done_deferred=done_deferred)
self._ready_zmq()
- self.zmq_client = AdtranZmqClient(self.ip_address, rx_callback=self.rx_packet,
- port=self.zmq_port)
+ self.pon_agent = AdtranZmqClient(self.ip_address, port=self.pon_agent_port, rx_callback=self.rx_pa_packet)
+ self.pio_agent = AdtranZmqClient(self.ip_address, port=self.pio_port, rx_callback=self.rx_pio_packet)
+
# Register for adapter messages
self.adapter_agent.register_for_inter_adapter_messages()
@@ -428,7 +431,7 @@
def reboot(self):
self._cancel_deferred()
- c, self.zmq_client = self.zmq_client, None
+ c, self.pon_agent = self.pon_agent, None
if c is not None:
c.shutdown()
@@ -451,7 +454,9 @@
# Register for adapter messages
self.adapter_agent.register_for_inter_adapter_messages()
- self.zmq_client = AdtranZmqClient(self.ip_address, rx_callback=self.rx_packet, port=self.zmq_port)
+ self.pon_agent = AdtranZmqClient(self.ip_address, port=self.pon_agent_port, rx_callback=self.rx_pa_packet)
+ self.pio_agent = AdtranZmqClient(self.ip_address, port=self.pio_port, rx_callback=self.rx_pio_packet)
+
self.status_poll = reactor.callLater(5, self.poll_for_status)
def delete(self):
@@ -460,28 +465,60 @@
# Drop registration for adapter messages
self.adapter_agent.unregister_for_inter_adapter_messages()
- c, self.zmq_client = self.zmq_client, None
+ c, self.pon_agent = self.pon_agent, None
if c is not None:
c.shutdown()
super(AdtranOltHandler, self).delete()
- def rx_packet(self, message):
- try:
- self.log.debug('rx_packet')
+ def rx_pa_packet(self, packets):
+ self.log.debug('rx-pon-agent-packet')
- pon_id, onu_id, msg_bytes, is_omci = AdtranZmqClient.decode_packet(message,
- self.is_async_control)
- if is_omci:
- proxy_address = self._pon_onu_id_to_proxy_address(pon_id, onu_id)
- self.adapter_agent.receive_proxied_message(proxy_address, msg_bytes)
- else:
- pass # TODO: Packet in support not yet supported
- # self.adapter_agent.send_packet_in(logical_device_id=logical_device_id,
- # logical_port_no=cvid, # C-VID encodes port no
- # packet=str(msg))
- except Exception as e:
- self.log.exception('rx_packet', e=e)
+ for packet in packets:
+ try:
+ pon_id, onu_id, msg_bytes, is_omci = \
+ AdtranZmqClient.decode_pon_agent_packet(packet,
+ self.is_async_control)
+ if is_omci:
+ proxy_address = self._pon_onu_id_to_proxy_address(pon_id, onu_id)
+
+ if proxy_address is not None:
+ self.adapter_agent.receive_proxied_message(proxy_address, msg_bytes)
+
+ except Exception as e:
+ self.log.exception('rx-pon-agent-packet', e=e)
+
+ def _compute_logical_port_no(self, port_no, evc_map, packet):
+ logical_port_no = None
+
+ if self.is_pon_port(port_no):
+ pon = self.get_southbound_port(port_no)
+
+ elif self.is_nni_port(port_no):
+ nni = self.get_northbound_port(port_no)
+ logical_port = nni.get_logical_port() if nni is not None else None
+ logical_port_no = logical_port.ofp_port.port_no if logical_port is not None else None
+
+ # TODO: Need to decode base on port_no & evc_map
+ return logical_port_no
+
+ def rx_pio_packet(self, packets):
+ self.log.debug('rx-packet-in', type=type(packets), data=packets)
+ assert isinstance(packets, list), 'Expected a list of packets'
+
+ if self.logical_device_id is not None:
+ for packet in packets:
+ try:
+ port_no, evc_map, packet = AdtranZmqClient.decode_packet_in_message(packet)
+ # packet.show()
+
+ logical_port_no = self._compute_logical_port_no(port_no, evc_map, packet)
+
+ self.adapter_agent.send_packet_in(logical_device_id=self.logical_device_id,
+ logical_port_no=logical_port_no,
+ packet=str(packet))
+ except Exception as e:
+ self.log.exception('rx_pio_packet', e=e)
def poll_for_status(self):
self.log.debug('Initiating-status-poll')
@@ -582,7 +619,7 @@
if isinstance(msg, Packet):
msg = str(msg)
- if self.zmq_client is not None:
+ if self.pon_agent is not None:
pon_id, onu_id = self._proxy_address_to_pon_onu_id(proxy_address)
pon = self.southbound_ports.get(pon_id)
@@ -594,10 +631,10 @@
data = AdtranZmqClient.encode_omci_message(msg, pon_id, onu_id,
self.is_async_control)
try:
- self.zmq_client.send(data)
+ self.pon_agent.send(data)
except Exception as e:
- self.log.exception('zmqClient-send', pon_id=pon_id, onu_id=onu_id, e=e)
+ self.log.exception('pon-agent-send', pon_id=pon_id, onu_id=onu_id, e=e)
else:
self.log.debug('onu-invalid-or-disabled', pon_id=pon_id, onu_id=onu_id)
else:
@@ -664,6 +701,9 @@
pon_id = self._port_number_to_pon_id(port)
return self.southbound_ports.get(pon_id, None)
+ def get_northbound_port(self, port):
+ return self.northbound_ports.get(port, None)
+
def get_port_name(self, port):
if self.is_nni_port(port):
return self.northbound_ports[port].name
diff --git a/voltha/adapters/adtran_olt/net/adtran_zmq.py b/voltha/adapters/adtran_olt/net/adtran_zmq.py
index 4bbd704..9ef0731 100644
--- a/voltha/adapters/adtran_olt/net/adtran_zmq.py
+++ b/voltha/adapters/adtran_olt/net/adtran_zmq.py
@@ -12,47 +12,56 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import binascii
-import struct
+import sys
import json
-
+import struct
+import binascii
import structlog
+
+from twisted.internet.defer import succeed
+from twisted.internet import threads
+
from txzmq import ZmqEndpoint, ZmqFactory
from txzmq.connection import ZmqConnection
+
+import zmq
from zmq import constants
+from zmq.utils import jsonapi
+from zmq.utils.strtypes import b, u
+from zmq.auth.base import Authenticator
+
+from threading import Thread, Event
log = structlog.get_logger()
zmq_factory = ZmqFactory()
# An OMCI message minimally has a 32-bit PON index and 32-bit ONU ID.
-DEFAULT_ZEROMQ_OMCI_TCP_PORT = 5656
+DEFAULT_PON_AGENT_TCP_PORT = 5656
+DEFAULT_PIO_TCP_PORT = 5657
class AdtranZmqClient(object):
"""
- Adtran ZeroMQ Client for PON Agent packet in/out service
- PON Agent expects and external PAIR socket with
+ Adtran ZeroMQ Client for PON Agent and/or packet in/out service
"""
- def __init__(self, ip_address, rx_callback=None, port=DEFAULT_ZEROMQ_OMCI_TCP_PORT):
- self.external_conn = 'tcp://{}:{}'.format(ip_address, port)
+ def __init__(self, ip_address, rx_callback, port):
+ external_conn = 'tcp://{}:{}'.format(ip_address, port)
+ endpoint = ZmqEndpoint('connect', external_conn)
- self.zmq_endpoint = ZmqEndpoint('connect', self.external_conn)
- self.socket = ZmqPairConnection(zmq_factory,
- self.zmq_endpoint)
-
- self.socket.onReceive = rx_callback or AdtranZmqClient.rx_nop
+ self._socket = ZmqPairConnection(zmq_factory, endpoint)
+ self._socket.onReceive = rx_callback or AdtranZmqClient.rx_nop
def send(self, data):
try:
- self.socket.send(data)
+ self._socket.send(data)
except Exception as e:
log.exception('send', e=e)
def shutdown(self):
- self.socket.onReceive = AdtranZmqClient.rx_nop
- self.socket.shutdown()
+ self._socket.onReceive = AdtranZmqClient.rx_nop
+ self._socket.shutdown()
@staticmethod
def rx_nop(message):
@@ -118,24 +127,17 @@
})
@staticmethod
- def decode_packet(packet, is_async_control):
+ def decode_pon_agent_packet(packet, is_async_control):
"""
- Decode the packet provided by the ZMQ client
+ Decode the PON-Agent packet provided by the ZMQ client
:param packet: (bytes) Packet
:param is_async_control: (bool) Newer async/JSON support
:return: (long, long, bytes, boolean) PON Index, ONU ID, Frame Contents (OMCI or Ethernet),\
and a flag indicating if it is OMCI
"""
- # TODO: For now, only OMCI supported
- if isinstance(packet, list):
- if len(packet) > 1:
- pass # TODO: Can we get multiple packets?
-
- return AdtranZmqClient._decode_omci_message_json(packet[0]) if is_async_control \
- else AdtranZmqClient._decode_omci_message_legacy(packet[0])
-
- return -1, -1, None, False
+ return AdtranZmqClient._decode_omci_message_json(packet) if is_async_control \
+ else AdtranZmqClient._decode_omci_message_legacy(packet)
@staticmethod
def _decode_omci_message_legacy(packet):
@@ -167,12 +169,25 @@
return pon_id, onu_id, msg_data, is_omci
@staticmethod
- def _decode_packet_in_message(packet):
- # TODO: This is not yet supported
- (pon_index, onu_id) = struct.unpack_from('!II', packet)
- msg = binascii.hexlify(packet[8:])
+ def decode_packet_in_message(packet):
+ from scapy.layers.l2 import Ether
+ try:
+ message = json.loads(packet)
+ log.debug('message', message=message)
- return pon_index, onu_id, msg, False
+ for field in ['url', 'evc-map-name', 'total-len', 'port-number', 'message-contents']:
+ assert field in message, "Missing field '{}' in received packet".format(field)
+
+ decoded = message['message-contents'].decode('base64')
+ assert len(decoded.encode('hex')) == message['total-len'], \
+ 'Decoded length ({}) != Message Encoded lenght ({})'.\
+ format(len(decoded.encode('hex')), message['total-len'])
+
+ return message['port-number'], message['evc-map'], Ether(decoded)
+
+ except Exception as e:
+ log.exception('decode', e=e)
+ raise
class ZmqPairConnection(ZmqConnection):
@@ -198,3 +213,255 @@
:param message: message data
"""
raise NotImplementedError(self)
+
+ def send(self, message):
+ """
+ Send message via ZeroMQ socket.
+
+ Sending is performed directly to ZeroMQ without queueing. If HWM is
+ reached on ZeroMQ side, sending operation is aborted with exception
+ from ZeroMQ (EAGAIN).
+
+ After writing read is scheduled as ZeroMQ may not signal incoming
+ messages after we touched socket with write request.
+
+ :param message: message data, could be either list of str (multipart
+ message) or just str
+ :type message: str or list of str
+ """
+ from txzmq.compat import is_nonstr_iter
+ from twisted.internet import reactor
+
+ if not is_nonstr_iter(message):
+ self.socket.send(message, constants.NOBLOCK)
+ else:
+ # for m in message[:-1]:
+ # self.socket.send(m, constants.NOBLOCK | constants.SNDMORE)
+ # self.socket.send(message[-1], constants.NOBLOCK)
+ self.socket.send_multipart(message, flags=constants.NOBLOCK)
+
+ if self.read_scheduled is None:
+ self.read_scheduled = reactor.callLater(0, self.doRead)
+
+###############################################################################################
+###############################################################################################
+###############################################################################################
+###############################################################################################
+
+def _inherit_docstrings(cls):
+ """inherit docstrings from Authenticator, so we don't duplicate them"""
+ for name, method in cls.__dict__.items():
+ if name.startswith('_'):
+ continue
+ upstream_method = getattr(Authenticator, name, None)
+ if not method.__doc__:
+ method.__doc__ = upstream_method.__doc__
+ return cls
+
+@_inherit_docstrings
+class TwistedZmqAuthenticator(object):
+ """Run ZAP authentication in a background thread but communicate via Twisted ZMQ"""
+
+ def __init__(self, encoding='utf-8'):
+ self.context = zmq_factory.context
+ self.encoding = encoding
+ self.pipe = None
+ self.pipe_endpoint = "inproc://{0}.inproc".format(id(self))
+ self.thread = None
+
+ def allow(self, *addresses):
+ try:
+ self.pipe.send([b'ALLOW'] + [b(a, self.encoding) for a in addresses])
+
+ except Exception as e:
+ log.exception('allow', e=e)
+
+ def deny(self, *addresses):
+ try:
+ self.pipe.send([b'DENY'] + [b(a, self.encoding) for a in addresses])
+
+ except Exception as e:
+ log.exception('deny', e=e)
+
+ def configure_plain(self, domain='*', passwords=None):
+ try:
+ self.pipe.send([b'PLAIN', b(domain, self.encoding), jsonapi.dumps(passwords or {})])
+
+ except Exception as e:
+ log.exception('configure-plain', e=e)
+
+ def configure_curve(self, domain='*', location=''):
+ try:
+ domain = b(domain, self.encoding)
+ location = b(location, self.encoding)
+ self.pipe.send([b'CURVE', domain, location])
+
+ except Exception as e:
+ log.exception('configure-curve', e=e)
+
+ def start(self, rx_callback=AdtranZmqClient.rx_nop):
+ """Start the authentication thread"""
+ try:
+ # create a socket to communicate with auth thread.
+
+ endpoint = ZmqEndpoint('bind', self.pipe_endpoint) # We are server, thread will be client
+ self.pipe = ZmqPairConnection(zmq_factory, endpoint)
+ self.pipe.onReceive = rx_callback
+
+ self.thread = LocalAuthenticationThread(self.context,
+ self.pipe_endpoint,
+ encoding=self.encoding)
+
+ return threads.deferToThread(TwistedZmqAuthenticator._do_thread_start,
+ self.thread, timeout=10)
+
+ except Exception as e:
+ log.exception('start', e=e)
+
+ @staticmethod
+ def _do_thread_start(thread, timeout=10):
+ thread.start()
+
+ # Event.wait:Changed in version 2.7: Previously, the method always returned None.
+ if sys.version_info < (2, 7):
+ thread.started.wait(timeout=timeout)
+
+ elif not thread.started.wait(timeout=timeout):
+ raise RuntimeError("Authenticator thread failed to start")
+
+ def stop(self):
+ """Stop the authentication thread"""
+ pipe, self.pipe = self.pipe, None
+ thread, self.thread = self.thread, None
+
+ if pipe:
+ pipe.send(b'TERMINATE')
+ pipe.onReceive = AdtranZmqClient.rx_nop
+ pipe.shutdown()
+
+ if thread.is_alive():
+ return threads.deferToThread(TwistedZmqAuthenticator._do_thread_join,
+ thread)
+ return succeed('done')
+
+ @staticmethod
+ def _do_thread_join(thread, timeout=1):
+ thread.join(timeout)
+ pass
+
+ def is_alive(self):
+ """Is the ZAP thread currently running?"""
+ return self.thread and self.thread.is_alive()
+
+ def __del__(self):
+ self.stop()
+
+
+# NOTE: Following is a duplicated from zmq code since class was not exported
+class LocalAuthenticationThread(Thread):
+ """A Thread for running a zmq Authenticator
+
+ This is run in the background by ThreadedAuthenticator
+ """
+
+ def __init__(self, context, endpoint, encoding='utf-8', authenticator=None):
+ super(LocalAuthenticationThread, self).__init__(name='0mq Authenticator')
+ self.context = context or zmq.Context.instance()
+ self.encoding = encoding
+ self.started = Event()
+ self.authenticator = authenticator or Authenticator(context, encoding=encoding)
+
+ # create a socket to communicate back to main thread.
+ self.pipe = context.socket(zmq.PAIR)
+ self.pipe.linger = 1
+ self.pipe.connect(endpoint)
+
+ def run(self):
+ """Start the Authentication Agent thread task"""
+ try:
+ self.authenticator.start()
+ self.started.set()
+ zap = self.authenticator.zap_socket
+ poller = zmq.Poller()
+ poller.register(self.pipe, zmq.POLLIN)
+ poller.register(zap, zmq.POLLIN)
+ while True:
+ try:
+ socks = dict(poller.poll())
+ except zmq.ZMQError:
+ break # interrupted
+
+ if self.pipe in socks and socks[self.pipe] == zmq.POLLIN:
+ terminate = self._handle_pipe()
+ if terminate:
+ break
+
+ if zap in socks and socks[zap] == zmq.POLLIN:
+ self._handle_zap()
+
+ self.pipe.close()
+ self.authenticator.stop()
+
+ except Exception as e:
+ log.exception("run", e=e)
+
+ def _handle_zap(self):
+ """
+ Handle a message from the ZAP socket.
+ """
+ msg = self.authenticator.zap_socket.recv_multipart()
+ if not msg:
+ return
+ self.authenticator.handle_zap_message(msg)
+
+ def _handle_pipe(self):
+ """
+ Handle a message from front-end API.
+ """
+ terminate = False
+
+ # Get the whole message off the pipe in one go
+ msg = self.pipe.recv_multipart()
+
+ if msg is None:
+ terminate = True
+ return terminate
+
+ command = msg[0]
+ log.debug("auth received API command", command=command)
+
+ if command == b'ALLOW':
+ addresses = [u(m, self.encoding) for m in msg[1:]]
+ try:
+ self.authenticator.allow(*addresses)
+ except Exception as e:
+ log.exception("Failed to allow", addresses=addresses, e=e)
+
+ elif command == b'DENY':
+ addresses = [u(m, self.encoding) for m in msg[1:]]
+ try:
+ self.authenticator.deny(*addresses)
+ except Exception as e:
+ log.exception("Failed to deny", addresses=addresses, e=e)
+
+ elif command == b'PLAIN':
+ domain = u(msg[1], self.encoding)
+ json_passwords = msg[2]
+ self.authenticator.configure_plain(domain, jsonapi.loads(json_passwords))
+
+ elif command == b'CURVE':
+ # For now we don't do anything with domains
+ domain = u(msg[1], self.encoding)
+
+ # If location is CURVE_ALLOW_ANY, allow all clients. Otherwise
+ # treat location as a directory that holds the certificates.
+ location = u(msg[2], self.encoding)
+ self.authenticator.configure_curve(domain, location)
+
+ elif command == b'TERMINATE':
+ terminate = True
+
+ else:
+ log.error("Invalid auth command from API", command=command)
+
+ return terminate
diff --git a/voltha/adapters/adtran_olt/xpon/olt_tcont.py b/voltha/adapters/adtran_olt/xpon/olt_tcont.py
index 5efcdcc..95fd8cc 100644
--- a/voltha/adapters/adtran_olt/xpon/olt_tcont.py
+++ b/voltha/adapters/adtran_olt/xpon/olt_tcont.py
@@ -32,10 +32,10 @@
@staticmethod
def create(tcont, td):
- from traffic_descriptor import TrafficDescriptor
+ from olt_traffic_descriptor import OltTrafficDescriptor
assert isinstance(tcont, dict), 'TCONT should be a dictionary'
- assert isinstance(td, TrafficDescriptor), 'Invalid Traffic Descriptor data type'
+ assert isinstance(td, OltTrafficDescriptor), 'Invalid Traffic Descriptor data type'
return OltTCont(tcont['alloc-id'], td,
name=tcont['name'],
diff --git a/voltha/adapters/adtran_olt/xpon/olt_traffic_descriptor.py b/voltha/adapters/adtran_olt/xpon/olt_traffic_descriptor.py
index fd4f753..1aa3848 100644
--- a/voltha/adapters/adtran_olt/xpon/olt_traffic_descriptor.py
+++ b/voltha/adapters/adtran_olt/xpon/olt_traffic_descriptor.py
@@ -51,12 +51,12 @@
else:
best_effort = None
- return TrafficDescriptor(traffic_disc['fixed-bandwidth'],
- traffic_disc['assured-bandwidth'],
- traffic_disc['maximum-bandwidth'],
- name=traffic_disc['name'],
- best_effort=best_effort,
- additional=additional)
+ return OltTrafficDescriptor(traffic_disc['fixed-bandwidth'],
+ traffic_disc['assured-bandwidth'],
+ traffic_disc['maximum-bandwidth'],
+ name=traffic_disc['name'],
+ best_effort=best_effort,
+ additional=additional)
@inlineCallbacks
def add_to_hardware(self, session, pon_id, onu_id, alloc_id):