VOL-599: Update to zeroMQ support and Tcont/TD bug fix

Change-Id: I7ed03c8584f44cd40bf35339af0a13092baf5f22
diff --git a/voltha/adapters/adtran_olt/adtran_device_handler.py b/voltha/adapters/adtran_olt/adtran_device_handler.py
index 59ed01b..a373670 100644
--- a/voltha/adapters/adtran_olt/adtran_device_handler.py
+++ b/voltha/adapters/adtran_olt/adtran_device_handler.py
@@ -91,7 +91,7 @@
     RESTART_RPC = '<system-restart xmlns="urn:ietf:params:xml:ns:yang:ietf-system"/>'
 
     def __init__(self, **kwargs):
-        from net.adtran_zmq import DEFAULT_ZEROMQ_OMCI_TCP_PORT
+        from net.adtran_zmq import DEFAULT_PON_AGENT_TCP_PORT, DEFAULT_PIO_TCP_PORT
 
         super(AdtranDeviceHandler, self).__init__()
 
@@ -152,7 +152,8 @@
         self.max_nni_ports = 1  # TODO: This is a VOLTHA imposed limit in 'flow_decomposer.py
                                 # and logical_device_agent.py
         # OMCI ZMQ Channel
-        self.zmq_port = DEFAULT_ZEROMQ_OMCI_TCP_PORT
+        self.pon_agent_port = DEFAULT_PON_AGENT_TCP_PORT
+        self.pio_port = DEFAULT_PIO_TCP_PORT
 
         # Heartbeat support
         self.heartbeat_count = 0
@@ -218,7 +219,7 @@
             del self._evcs[evc.name]
 
     def parse_provisioning_options(self, device):
-        from net.adtran_zmq import DEFAULT_ZEROMQ_OMCI_TCP_PORT
+        from net.adtran_zmq import DEFAULT_PON_AGENT_TCP_PORT
 
         if not device.ipv4_address:
             self.activate_failed(device, 'No ip_address field provided')
@@ -253,7 +254,7 @@
                             help='REST Password')
         parser.add_argument('--rc_port', '-T', action='store', default=_DEFAULT_RESTCONF_PORT, type=check_tcp_port,
                             help='RESTCONF TCP Port')
-        parser.add_argument('--zmq_port', '-z', action='store', default=DEFAULT_ZEROMQ_OMCI_TCP_PORT,
+        parser.add_argument('--zmq_port', '-z', action='store', default=DEFAULT_PON_AGENT_TCP_PORT,
                             type=check_tcp_port, help='ZeroMQ Port')
         parser.add_argument('--autoactivate', '-a', action='store_true', default=False,
                             help='Autoactivate / Demo mode')
@@ -280,7 +281,7 @@
             self.rest_password = args.rc_password
             self.rest_port = args.rc_port
 
-            self.zmq_port = args.zmq_port
+            self.pon_agent_port = args.zmq_port
 
             self._autoactivate = args.autoactivate
 
diff --git a/voltha/adapters/adtran_olt/adtran_olt.py b/voltha/adapters/adtran_olt/adtran_olt.py
index ba88d9b..1709871 100644
--- a/voltha/adapters/adtran_olt/adtran_olt.py
+++ b/voltha/adapters/adtran_olt/adtran_olt.py
@@ -51,7 +51,7 @@
         self.descriptor = Adapter(
             id=self.name,
             vendor='Adtran, Inc.',
-            version='0.12',
+            version='0.13',
             config=AdapterConfig(log_level=LogLevel.INFO)
         )
         log.debug('adtran_olt.__init__', adapter_agent=adapter_agent)
diff --git a/voltha/adapters/adtran_olt/adtran_olt_handler.py b/voltha/adapters/adtran_olt/adtran_olt_handler.py
index 3ffecbb..47fcea8 100644
--- a/voltha/adapters/adtran_olt/adtran_olt_handler.py
+++ b/voltha/adapters/adtran_olt/adtran_olt_handler.py
@@ -71,7 +71,8 @@
         self.status_poll = None
         self.status_poll_interval = 5.0
         self.status_poll_skew = self.status_poll_interval / 10
-        self.zmq_client = None
+        self.pon_agent = None
+        self.pio_agent = None
         self.ssh_deferred = None
         self._system_id = None
         self._download_protocols = None
@@ -319,8 +320,9 @@
         # Make sure configured for ZMQ remote access
         self._ready_zmq()
 
-        # ZeroMQ client
-        self.zmq_client = AdtranZmqClient(self.ip_address, rx_callback=self.rx_packet, port=self.zmq_port)
+        # ZeroMQ clients
+        self.pon_agent = AdtranZmqClient(self.ip_address, port=self.pon_agent_port, rx_callback=self.rx_pa_packet)
+        self.pio_agent = AdtranZmqClient(self.ip_address, port=self.pio_port, rx_callback=self.rx_pio_packet)
 
         # Download support
         self._download_deferred = reactor.callLater(0, self._get_download_protocols)
@@ -364,7 +366,7 @@
     def _ready_zmq(self):
         from net.rcmd import RCmd
         # Check for port status
-        command = 'netstat -pan | grep -i 0.0.0.0:{} |  wc -l'.format(self.zmq_port)
+        command = 'netstat -pan | grep -i 0.0.0.0:{} |  wc -l'.format(self.pon_agent_port)
         rcmd = RCmd(self.ip_address, self.netconf_username, self.netconf_password, command)
 
         try:
@@ -405,7 +407,7 @@
         # Drop registration for adapter messages
         self.adapter_agent.unregister_for_inter_adapter_messages()
 
-        c, self.zmq_client = self.zmq_client, None
+        c, self.pon_agent = self.pon_agent, None
         if c is not None:
             try:
                 c.shutdown()
@@ -418,8 +420,9 @@
         super(AdtranOltHandler, self).reenable(done_deferred=done_deferred)
 
         self._ready_zmq()
-        self.zmq_client = AdtranZmqClient(self.ip_address, rx_callback=self.rx_packet,
-                                          port=self.zmq_port)
+        self.pon_agent = AdtranZmqClient(self.ip_address, port=self.pon_agent_port, rx_callback=self.rx_pa_packet)
+        self.pio_agent = AdtranZmqClient(self.ip_address, port=self.pio_port, rx_callback=self.rx_pio_packet)
+
         # Register for adapter messages
         self.adapter_agent.register_for_inter_adapter_messages()
 
@@ -428,7 +431,7 @@
     def reboot(self):
         self._cancel_deferred()
 
-        c, self.zmq_client = self.zmq_client, None
+        c, self.pon_agent = self.pon_agent, None
         if c is not None:
             c.shutdown()
 
@@ -451,7 +454,9 @@
         # Register for adapter messages
         self.adapter_agent.register_for_inter_adapter_messages()
 
-        self.zmq_client = AdtranZmqClient(self.ip_address, rx_callback=self.rx_packet, port=self.zmq_port)
+        self.pon_agent = AdtranZmqClient(self.ip_address, port=self.pon_agent_port, rx_callback=self.rx_pa_packet)
+        self.pio_agent = AdtranZmqClient(self.ip_address, port=self.pio_port, rx_callback=self.rx_pio_packet)
+
         self.status_poll = reactor.callLater(5, self.poll_for_status)
 
     def delete(self):
@@ -460,28 +465,60 @@
         # Drop registration for adapter messages
         self.adapter_agent.unregister_for_inter_adapter_messages()
 
-        c, self.zmq_client = self.zmq_client, None
+        c, self.pon_agent = self.pon_agent, None
         if c is not None:
             c.shutdown()
 
         super(AdtranOltHandler, self).delete()
 
-    def rx_packet(self, message):
-        try:
-            self.log.debug('rx_packet')
+    def rx_pa_packet(self, packets):
+        self.log.debug('rx-pon-agent-packet')
 
-            pon_id, onu_id, msg_bytes, is_omci = AdtranZmqClient.decode_packet(message,
-                                                                               self.is_async_control)
-            if is_omci:
-                proxy_address = self._pon_onu_id_to_proxy_address(pon_id, onu_id)
-                self.adapter_agent.receive_proxied_message(proxy_address, msg_bytes)
-            else:
-                pass  # TODO: Packet in support not yet supported
-                # self.adapter_agent.send_packet_in(logical_device_id=logical_device_id,
-                #                                   logical_port_no=cvid,  # C-VID encodes port no
-                #                                   packet=str(msg))
-        except Exception as e:
-            self.log.exception('rx_packet', e=e)
+        for packet in packets:
+            try:
+                pon_id, onu_id, msg_bytes, is_omci = \
+                    AdtranZmqClient.decode_pon_agent_packet(packet,
+                                                            self.is_async_control)
+                if is_omci:
+                    proxy_address = self._pon_onu_id_to_proxy_address(pon_id, onu_id)
+
+                    if proxy_address is not None:
+                        self.adapter_agent.receive_proxied_message(proxy_address, msg_bytes)
+
+            except Exception as e:
+                self.log.exception('rx-pon-agent-packet', e=e)
+
+    def _compute_logical_port_no(self, port_no, evc_map, packet):
+        logical_port_no = None
+
+        if self.is_pon_port(port_no):
+            pon = self.get_southbound_port(port_no)
+
+        elif self.is_nni_port(port_no):
+            nni = self.get_northbound_port(port_no)
+            logical_port = nni.get_logical_port() if nni is not None else None
+            logical_port_no = logical_port.ofp_port.port_no if logical_port is not None else None
+
+        # TODO: Need to decode base on port_no & evc_map
+        return logical_port_no
+
+    def rx_pio_packet(self, packets):
+        self.log.debug('rx-packet-in', type=type(packets), data=packets)
+        assert isinstance(packets, list), 'Expected a list of packets'
+
+        if self.logical_device_id is not None:
+            for packet in packets:
+                try:
+                    port_no, evc_map, packet = AdtranZmqClient.decode_packet_in_message(packet)
+                    # packet.show()
+
+                    logical_port_no = self._compute_logical_port_no(port_no, evc_map, packet)
+
+                    self.adapter_agent.send_packet_in(logical_device_id=self.logical_device_id,
+                                                      logical_port_no=logical_port_no,
+                                                      packet=str(packet))
+                except Exception as e:
+                    self.log.exception('rx_pio_packet', e=e)
 
     def poll_for_status(self):
         self.log.debug('Initiating-status-poll')
@@ -582,7 +619,7 @@
         if isinstance(msg, Packet):
             msg = str(msg)
 
-        if self.zmq_client is not None:
+        if self.pon_agent is not None:
             pon_id, onu_id = self._proxy_address_to_pon_onu_id(proxy_address)
 
             pon = self.southbound_ports.get(pon_id)
@@ -594,10 +631,10 @@
                     data = AdtranZmqClient.encode_omci_message(msg, pon_id, onu_id,
                                                                self.is_async_control)
                     try:
-                        self.zmq_client.send(data)
+                        self.pon_agent.send(data)
 
                     except Exception as e:
-                        self.log.exception('zmqClient-send', pon_id=pon_id, onu_id=onu_id, e=e)
+                        self.log.exception('pon-agent-send', pon_id=pon_id, onu_id=onu_id, e=e)
                 else:
                     self.log.debug('onu-invalid-or-disabled', pon_id=pon_id, onu_id=onu_id)
             else:
@@ -664,6 +701,9 @@
         pon_id = self._port_number_to_pon_id(port)
         return self.southbound_ports.get(pon_id, None)
 
+    def get_northbound_port(self, port):
+        return self.northbound_ports.get(port, None)
+
     def get_port_name(self, port):
         if self.is_nni_port(port):
             return self.northbound_ports[port].name
diff --git a/voltha/adapters/adtran_olt/net/adtran_zmq.py b/voltha/adapters/adtran_olt/net/adtran_zmq.py
index 4bbd704..9ef0731 100644
--- a/voltha/adapters/adtran_olt/net/adtran_zmq.py
+++ b/voltha/adapters/adtran_olt/net/adtran_zmq.py
@@ -12,47 +12,56 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import binascii
-import struct
+import sys
 import json
-
+import struct
+import binascii
 import structlog
+
+from twisted.internet.defer import succeed
+from twisted.internet import threads
+
 from txzmq import ZmqEndpoint, ZmqFactory
 from txzmq.connection import ZmqConnection
+
+import zmq
 from zmq import constants
+from zmq.utils import jsonapi
+from zmq.utils.strtypes import b, u
+from zmq.auth.base import Authenticator
+
+from threading import Thread, Event
 
 log = structlog.get_logger()
 zmq_factory = ZmqFactory()
 
 # An OMCI message minimally has a 32-bit PON index and 32-bit ONU ID.
 
-DEFAULT_ZEROMQ_OMCI_TCP_PORT = 5656
+DEFAULT_PON_AGENT_TCP_PORT = 5656
+DEFAULT_PIO_TCP_PORT = 5657
 
 
 class AdtranZmqClient(object):
     """
-    Adtran ZeroMQ Client for PON Agent packet in/out service
-    PON Agent expects and external PAIR socket with
+    Adtran ZeroMQ Client for PON Agent and/or packet in/out service
     """
-    def __init__(self, ip_address, rx_callback=None, port=DEFAULT_ZEROMQ_OMCI_TCP_PORT):
-        self.external_conn = 'tcp://{}:{}'.format(ip_address, port)
+    def __init__(self, ip_address, rx_callback, port):
+        external_conn = 'tcp://{}:{}'.format(ip_address, port)
+        endpoint = ZmqEndpoint('connect', external_conn)
 
-        self.zmq_endpoint = ZmqEndpoint('connect', self.external_conn)
-        self.socket = ZmqPairConnection(zmq_factory,
-                                        self.zmq_endpoint)
-
-        self.socket.onReceive = rx_callback or AdtranZmqClient.rx_nop
+        self._socket = ZmqPairConnection(zmq_factory, endpoint)
+        self._socket.onReceive = rx_callback or AdtranZmqClient.rx_nop
 
     def send(self, data):
         try:
-            self.socket.send(data)
+            self._socket.send(data)
 
         except Exception as e:
             log.exception('send', e=e)
 
     def shutdown(self):
-        self.socket.onReceive = AdtranZmqClient.rx_nop
-        self.socket.shutdown()
+        self._socket.onReceive = AdtranZmqClient.rx_nop
+        self._socket.shutdown()
 
     @staticmethod
     def rx_nop(message):
@@ -118,24 +127,17 @@
                            })
 
     @staticmethod
-    def decode_packet(packet, is_async_control):
+    def decode_pon_agent_packet(packet, is_async_control):
         """
-        Decode the packet provided by the ZMQ client
+        Decode the PON-Agent packet provided by the ZMQ client
 
         :param packet: (bytes) Packet
         :param is_async_control: (bool) Newer async/JSON support
         :return: (long, long, bytes, boolean) PON Index, ONU ID, Frame Contents (OMCI or Ethernet),\
                                               and a flag indicating if it is OMCI
         """
-        # TODO: For now, only OMCI supported
-        if isinstance(packet, list):
-            if len(packet) > 1:
-                pass  # TODO: Can we get multiple packets?
-
-            return AdtranZmqClient._decode_omci_message_json(packet[0]) if is_async_control \
-                else AdtranZmqClient._decode_omci_message_legacy(packet[0])
-
-        return -1, -1, None, False
+        return AdtranZmqClient._decode_omci_message_json(packet) if is_async_control \
+            else AdtranZmqClient._decode_omci_message_legacy(packet)
 
     @staticmethod
     def _decode_omci_message_legacy(packet):
@@ -167,12 +169,25 @@
         return pon_id, onu_id, msg_data, is_omci
 
     @staticmethod
-    def _decode_packet_in_message(packet):
-        # TODO: This is not yet supported
-        (pon_index, onu_id) = struct.unpack_from('!II', packet)
-        msg = binascii.hexlify(packet[8:])
+    def decode_packet_in_message(packet):
+        from scapy.layers.l2 import Ether
+        try:
+            message = json.loads(packet)
+            log.debug('message', message=message)
 
-        return pon_index, onu_id, msg, False
+            for field in ['url', 'evc-map-name', 'total-len', 'port-number', 'message-contents']:
+                assert field in message, "Missing field '{}' in received packet".format(field)
+
+            decoded = message['message-contents'].decode('base64')
+            assert len(decoded.encode('hex')) == message['total-len'], \
+                'Decoded length ({}) != Message Encoded lenght ({})'.\
+                    format(len(decoded.encode('hex')), message['total-len'])
+
+            return message['port-number'], message['evc-map'], Ether(decoded)
+
+        except Exception as e:
+            log.exception('decode', e=e)
+            raise
 
 
 class ZmqPairConnection(ZmqConnection):
@@ -198,3 +213,255 @@
         :param message: message data
         """
         raise NotImplementedError(self)
+
+    def send(self, message):
+        """
+        Send message via ZeroMQ socket.
+
+        Sending is performed directly to ZeroMQ without queueing. If HWM is
+        reached on ZeroMQ side, sending operation is aborted with exception
+        from ZeroMQ (EAGAIN).
+
+        After writing read is scheduled as ZeroMQ may not signal incoming
+        messages after we touched socket with write request.
+
+        :param message: message data, could be either list of str (multipart
+            message) or just str
+        :type message: str or list of str
+        """
+        from txzmq.compat import is_nonstr_iter
+        from twisted.internet import reactor
+
+        if not is_nonstr_iter(message):
+            self.socket.send(message, constants.NOBLOCK)
+        else:
+            # for m in message[:-1]:
+            #     self.socket.send(m, constants.NOBLOCK | constants.SNDMORE)
+            # self.socket.send(message[-1], constants.NOBLOCK)
+            self.socket.send_multipart(message, flags=constants.NOBLOCK)
+
+        if self.read_scheduled is None:
+            self.read_scheduled = reactor.callLater(0, self.doRead)
+
+###############################################################################################
+###############################################################################################
+###############################################################################################
+###############################################################################################
+
+def _inherit_docstrings(cls):
+    """inherit docstrings from Authenticator, so we don't duplicate them"""
+    for name, method in cls.__dict__.items():
+        if name.startswith('_'):
+            continue
+        upstream_method = getattr(Authenticator, name, None)
+        if not method.__doc__:
+            method.__doc__ = upstream_method.__doc__
+    return cls
+
+@_inherit_docstrings
+class TwistedZmqAuthenticator(object):
+    """Run ZAP authentication in a background thread but communicate via Twisted ZMQ"""
+
+    def __init__(self, encoding='utf-8'):
+        self.context = zmq_factory.context
+        self.encoding = encoding
+        self.pipe = None
+        self.pipe_endpoint = "inproc://{0}.inproc".format(id(self))
+        self.thread = None
+
+    def allow(self, *addresses):
+        try:
+            self.pipe.send([b'ALLOW'] + [b(a, self.encoding) for a in addresses])
+
+        except Exception as e:
+            log.exception('allow', e=e)
+
+    def deny(self, *addresses):
+        try:
+            self.pipe.send([b'DENY'] + [b(a, self.encoding) for a in addresses])
+
+        except Exception as e:
+            log.exception('deny', e=e)
+
+    def configure_plain(self, domain='*', passwords=None):
+        try:
+            self.pipe.send([b'PLAIN', b(domain, self.encoding), jsonapi.dumps(passwords or {})])
+
+        except Exception as e:
+            log.exception('configure-plain', e=e)
+
+    def configure_curve(self, domain='*', location=''):
+        try:
+            domain = b(domain, self.encoding)
+            location = b(location, self.encoding)
+            self.pipe.send([b'CURVE', domain, location])
+
+        except Exception as e:
+            log.exception('configure-curve', e=e)
+
+    def start(self, rx_callback=AdtranZmqClient.rx_nop):
+        """Start the authentication thread"""
+        try:
+            # create a socket to communicate with auth thread.
+
+            endpoint = ZmqEndpoint('bind', self.pipe_endpoint)      # We are server, thread will be client
+            self.pipe = ZmqPairConnection(zmq_factory, endpoint)
+            self.pipe.onReceive = rx_callback
+
+            self.thread = LocalAuthenticationThread(self.context,
+                                                    self.pipe_endpoint,
+                                                    encoding=self.encoding)
+
+            return threads.deferToThread(TwistedZmqAuthenticator._do_thread_start,
+                                         self.thread, timeout=10)
+
+        except Exception as e:
+            log.exception('start', e=e)
+
+    @staticmethod
+    def _do_thread_start(thread, timeout=10):
+        thread.start()
+
+        # Event.wait:Changed in version 2.7: Previously, the method always returned None.
+        if sys.version_info < (2, 7):
+            thread.started.wait(timeout=timeout)
+
+        elif not thread.started.wait(timeout=timeout):
+            raise RuntimeError("Authenticator thread failed to start")
+
+    def stop(self):
+        """Stop the authentication thread"""
+        pipe, self.pipe = self.pipe, None
+        thread, self.thread = self.thread, None
+
+        if pipe:
+            pipe.send(b'TERMINATE')
+            pipe.onReceive = AdtranZmqClient.rx_nop
+            pipe.shutdown()
+
+            if thread.is_alive():
+                return threads.deferToThread(TwistedZmqAuthenticator._do_thread_join,
+                                             thread)
+        return succeed('done')
+
+    @staticmethod
+    def _do_thread_join(thread, timeout=1):
+        thread.join(timeout)
+        pass
+
+    def is_alive(self):
+        """Is the ZAP thread currently running?"""
+        return self.thread and self.thread.is_alive()
+
+    def __del__(self):
+        self.stop()
+
+
+# NOTE: Following is a duplicated from zmq code since class was not exported
+class LocalAuthenticationThread(Thread):
+    """A Thread for running a zmq Authenticator
+
+    This is run in the background by ThreadedAuthenticator
+    """
+
+    def __init__(self, context, endpoint, encoding='utf-8', authenticator=None):
+        super(LocalAuthenticationThread, self).__init__(name='0mq Authenticator')
+        self.context = context or zmq.Context.instance()
+        self.encoding = encoding
+        self.started = Event()
+        self.authenticator = authenticator or Authenticator(context, encoding=encoding)
+
+        # create a socket to communicate back to main thread.
+        self.pipe = context.socket(zmq.PAIR)
+        self.pipe.linger = 1
+        self.pipe.connect(endpoint)
+
+    def run(self):
+        """Start the Authentication Agent thread task"""
+        try:
+            self.authenticator.start()
+            self.started.set()
+            zap = self.authenticator.zap_socket
+            poller = zmq.Poller()
+            poller.register(self.pipe, zmq.POLLIN)
+            poller.register(zap, zmq.POLLIN)
+            while True:
+                try:
+                    socks = dict(poller.poll())
+                except zmq.ZMQError:
+                    break  # interrupted
+
+                if self.pipe in socks and socks[self.pipe] == zmq.POLLIN:
+                    terminate = self._handle_pipe()
+                    if terminate:
+                        break
+
+                if zap in socks and socks[zap] == zmq.POLLIN:
+                    self._handle_zap()
+
+            self.pipe.close()
+            self.authenticator.stop()
+
+        except Exception as e:
+            log.exception("run", e=e)
+
+    def _handle_zap(self):
+        """
+        Handle a message from the ZAP socket.
+        """
+        msg = self.authenticator.zap_socket.recv_multipart()
+        if not msg:
+            return
+        self.authenticator.handle_zap_message(msg)
+
+    def _handle_pipe(self):
+        """
+        Handle a message from front-end API.
+        """
+        terminate = False
+
+        # Get the whole message off the pipe in one go
+        msg = self.pipe.recv_multipart()
+
+        if msg is None:
+            terminate = True
+            return terminate
+
+        command = msg[0]
+        log.debug("auth received API command", command=command)
+
+        if command == b'ALLOW':
+            addresses = [u(m, self.encoding) for m in msg[1:]]
+            try:
+                self.authenticator.allow(*addresses)
+            except Exception as e:
+                log.exception("Failed to allow", addresses=addresses, e=e)
+
+        elif command == b'DENY':
+            addresses = [u(m, self.encoding) for m in msg[1:]]
+            try:
+                self.authenticator.deny(*addresses)
+            except Exception as e:
+                log.exception("Failed to deny", addresses=addresses, e=e)
+
+        elif command == b'PLAIN':
+            domain = u(msg[1], self.encoding)
+            json_passwords = msg[2]
+            self.authenticator.configure_plain(domain, jsonapi.loads(json_passwords))
+
+        elif command == b'CURVE':
+            # For now we don't do anything with domains
+            domain = u(msg[1], self.encoding)
+
+            # If location is CURVE_ALLOW_ANY, allow all clients. Otherwise
+            # treat location as a directory that holds the certificates.
+            location = u(msg[2], self.encoding)
+            self.authenticator.configure_curve(domain, location)
+
+        elif command == b'TERMINATE':
+            terminate = True
+
+        else:
+            log.error("Invalid auth command from API", command=command)
+
+        return terminate
diff --git a/voltha/adapters/adtran_olt/xpon/olt_tcont.py b/voltha/adapters/adtran_olt/xpon/olt_tcont.py
index 5efcdcc..95fd8cc 100644
--- a/voltha/adapters/adtran_olt/xpon/olt_tcont.py
+++ b/voltha/adapters/adtran_olt/xpon/olt_tcont.py
@@ -32,10 +32,10 @@
 
     @staticmethod
     def create(tcont, td):
-        from traffic_descriptor import TrafficDescriptor
+        from olt_traffic_descriptor import OltTrafficDescriptor
 
         assert isinstance(tcont, dict), 'TCONT should be a dictionary'
-        assert isinstance(td, TrafficDescriptor), 'Invalid Traffic Descriptor data type'
+        assert isinstance(td, OltTrafficDescriptor), 'Invalid Traffic Descriptor data type'
 
         return OltTCont(tcont['alloc-id'], td,
                         name=tcont['name'],
diff --git a/voltha/adapters/adtran_olt/xpon/olt_traffic_descriptor.py b/voltha/adapters/adtran_olt/xpon/olt_traffic_descriptor.py
index fd4f753..1aa3848 100644
--- a/voltha/adapters/adtran_olt/xpon/olt_traffic_descriptor.py
+++ b/voltha/adapters/adtran_olt/xpon/olt_traffic_descriptor.py
@@ -51,12 +51,12 @@
         else:
             best_effort = None
 
-        return TrafficDescriptor(traffic_disc['fixed-bandwidth'],
-                                 traffic_disc['assured-bandwidth'],
-                                 traffic_disc['maximum-bandwidth'],
-                                 name=traffic_disc['name'],
-                                 best_effort=best_effort,
-                                 additional=additional)
+        return OltTrafficDescriptor(traffic_disc['fixed-bandwidth'],
+                                    traffic_disc['assured-bandwidth'],
+                                    traffic_disc['maximum-bandwidth'],
+                                    name=traffic_disc['name'],
+                                    best_effort=best_effort,
+                                    additional=additional)
 
     @inlineCallbacks
     def add_to_hardware(self, session, pon_id, onu_id, alloc_id):