VOL-1397: Adtran-OLT - Initial containerization commit
 - Need to move VERSION to base directory

Change-Id: I9d62d0607a011ce642e379fd92b35ec48b300070
diff --git a/adapters/adtran_common/net/__init__.py b/adapters/adtran_common/net/__init__.py
new file mode 100644
index 0000000..d67fcf2
--- /dev/null
+++ b/adapters/adtran_common/net/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present ADTRAN, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/adapters/adtran_common/net/adtran_netconf.py b/adapters/adtran_common/net/adtran_netconf.py
new file mode 100644
index 0000000..4e39a6a
--- /dev/null
+++ b/adapters/adtran_common/net/adtran_netconf.py
@@ -0,0 +1,373 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import structlog
+from lxml import etree
+from ncclient import manager
+from ncclient.operations import RPCError
+from ncclient.transport.errors import SSHError
+from twisted.internet import defer, threads
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+log = structlog.get_logger('ncclient')
+
+ADTRAN_NS = 'http://www.adtran.com/ns/yang'
+
+
+def adtran_module_url(module):  # e.g. 'x' -> 'http://www.adtran.com/ns/yang/x'
+    return '{}/{}'.format(ADTRAN_NS, module)
+
+
+def phys_entities_rpc():  # NETCONF <filter> XML asking for <physical-entity/> entries
+    return """
+    <filter xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">
+      <physical-entities-state xmlns="{}">
+        <physical-entity/>
+      </physical-entities-state>
+    </filter>
+    """.format(adtran_module_url('adtran-physical-entities'))
+
+
+class AdtranNetconfClient(object):
+    """
+    Performs NETCONF requests
+    """
+    def __init__(self, host_ip, port=830, username='', password='', timeout=10):
+        """Store the connection settings; connect() opens the actual session"""
+        self._ip, self._port = host_ip, port
+        self._username, self._password = username, password
+        self._timeout = timeout
+        # No NETCONF session until connect() succeeds
+        self._session = None
+
+    def __str__(self):  # Rendered as "AdtranNetconfClient <username>@<ip>:<port>"
+        return "AdtranNetconfClient {}@{}:{}".format(self._username, self._ip, self._port)
+
+    @property
+    def capabilities(self):
+        """
+        :return: the session's server_capabilities object, or None when there is no session
+        """
+        if not self._session:
+            return None
+        return self._session.server_capabilities
+
+    @property
+    def connected(self):
+        """True when a session exists and that session reports being connected"""
+        if self._session is None:
+            return False
+        # Otherwise defer to the session's own connection state
+        return self._session.connected
+
+    def connect(self, connect_timeout=None):
+        """
+        Open the NETCONF session on a worker thread
+
+        The blocking ncclient connect is performed by _do_connect(), which
+        disables publickey authentication (allow_agent and look_for_keys are
+        False) and host key verification (hostkey_verify is False).
+
+        :param connect_timeout: seconds to wait; falls back to the client timeout
+        :return: (deferred) Deferred request
+        """
+        if not connect_timeout:
+            connect_timeout = self._timeout
+        return threads.deferToThread(self._do_connect, connect_timeout)
+
+    def _do_connect(self, timeout):
+        """Blocking connect, run on a worker thread by connect(); returns the new session"""
+        try:
+            self._session = manager.connect(host=self._ip,
+                                            port=self._port,
+                                            username=self._username,
+                                            password=self._password,
+                                            allow_agent=False, look_for_keys=False,
+                                            hostkey_verify=False,
+                                            timeout=timeout)
+
+        except SSHError as e:
+            # Log and re-raise (traceback intact) so any errBack is called
+            log.warn('SSHError-during-connect', e=e)
+            raise
+
+        except Exception as e:
+            # Log and re-raise (traceback intact) so any errBack is called
+            log.exception('Connect-failed', e=e)
+            raise
+
+        # If debug logging is enabled, decrease the level, DEBUG is a significant
+        # performance hit during response XML decode
+        # NOTE(review): levels are given as strings here -- confirm the logger accepts them
+        if log.isEnabledFor('DEBUG'):
+            log.setLevel('INFO')
+
+        # TODO: ncclient also supports RaiseMode.NONE to limit exceptions.  To set use:
+        #
+        #  self._session.raise_mode = RaiseMode.NONE
+        #
+        # and then when you get a response back, you can check 'response.ok' to
+        # see if it is 'True'. If it is not, you can enumerate the 'response.errors'
+        # list for more information
+
+        return self._session
+
+    def close(self):
+        """
+        Close the connection to the NETCONF server
+        :return:  (deferred) Deferred request; already fired with True if no session was open
+        """
+        # Detach the session first so this client immediately reports 'not connected'
+        s, self._session = self._session, None
+        if s is None or not s.connected:
+            # Nothing to close. returnValue() is only valid inside @inlineCallbacks
+            return defer.succeed(True)
+        return threads.deferToThread(self._do_close, s)
+
+    def _do_close(self, old_session):  # Blocking close; close() runs it on a worker thread
+        return old_session.close_session()
+
+    @inlineCallbacks
+    def _reconnect(self):  # Best-effort: close the old session, then connect again
+        try:
+            yield self.close()
+        except:  # every failure while closing the old session is ignored
+            pass
+        # The connect is attempted whatever happened to the close above
+        try:
+            yield self.connect()
+        except:  # connect failures are swallowed too, so callers must re-check the session
+            pass
+
+    def get_config(self, source='running'):
+        """
+        Get the configuration from the specified source
+
+        :param source: (string) Configuration source, 'running', 'candidate', ...
+        :return: (deferred) Deferred request that wraps the GetReply class
+        :raises NotImplementedError: if connect() has never established a session
+        """
+        if not self._session:
+            raise NotImplementedError('No SSH Session')
+        if not self._session.connected:
+            # Wait for the reconnect to finish before issuing the request
+            return self._reconnect().addCallback(
+                lambda _: threads.deferToThread(self._do_get_config, source))
+        return threads.deferToThread(self._do_get_config, source)
+
+    def _do_get_config(self, source):
+        """
+        Blocking get-config on the current session; get_config() runs it on a worker thread
+
+        :param source: (string) Configuration source, 'running', 'candidate', ...
+
+        :return: (GetReply) The configuration.
+        """
+        return self._session.get_config(source)
+
+    def get(self, payload):
+        """
+        Get the requested data from the server
+
+        :param payload: Payload/filter
+        :return: (deferred) for GetReply
+        :raises NotImplementedError: if connect() has never established a session
+        """
+        log.debug('get', filter=payload)
+        if not self._session:
+            raise NotImplementedError('No SSH Session')
+        if not self._session.connected:
+            # Wait for the reconnect to finish before issuing the request
+            return self._reconnect().addCallback(
+                lambda _: threads.deferToThread(self._do_get, payload))
+        return threads.deferToThread(self._do_get, payload)
+
+    def _do_get(self, payload):
+        """
+        Blocking NETCONF get; executed on a worker thread by get()
+
+        :param payload: Payload/filter
+        :return: (GetReply) response
+        :raises RPCError: re-raised after being logged
+        """
+        log.debug('get', payload=payload)
+        try:
+            reply = self._session.get(payload)
+            # The XML text is available as reply.xml
+            log.debug('response', response=reply)
+            return reply
+
+        except RPCError as err:
+            log.exception('get', e=err)
+            raise
+
+    def lock(self, source, lock_timeout):
+        """
+        Lock the configuration system
+        :param source: datastore name handed to the session's lock request
+        :param lock_timeout: timeout handed to the session's lock request
+        :return: (deferred) for RpcReply; raises NotImplementedError when not connected
+        """
+        log.info('lock', source=source, timeout=lock_timeout)
+        if not self._session or not self._session.connected:
+            raise NotImplementedError('TODO: Support auto-connect if needed')
+        return threads.deferToThread(self._do_lock, source, lock_timeout)
+
+    def _do_lock(self, source, lock_timeout):
+        """
+        Blocking lock request; lock() runs it on a worker thread
+        :return: the reply produced by the session's lock call
+        """
+        try:
+            # The XML text of the reply is available as its .xml attribute
+            return self._session.lock(source, timeout=lock_timeout)
+
+        except RPCError as err:
+            # Record the failure, then let it propagate to the caller
+            log.exception('lock', e=err)
+            raise
+
+    def unlock(self, source):
+        """
+        Unlock the configuration system
+        :param source: datastore name handed to the session's unlock request
+        :raises NotImplementedError: if there is no connected session
+        :return: (deferred) for RpcReply
+        """
+        log.info('unlock', source=source)
+
+        if not self._session or not self._session.connected:
+            raise NotImplementedError('TODO: Support auto-connect if needed')
+
+        return threads.deferToThread(self._do_unlock, source)
+
+    def _do_unlock(self, source):
+        """
+        Blocking unlock request; unlock() runs it on a worker thread
+        :return: the reply produced by the session's unlock call
+        """
+        try:
+            # The XML text of the reply is available as its .xml attribute
+            return self._session.unlock(source)
+
+        except RPCError as err:
+            # Record the failure, then let it propagate to the caller
+            log.exception('unlock', e=err)
+            raise
+
+    @inlineCallbacks
+    def edit_config(self, config, target='running', default_operation='none',
+                    test_option=None, error_option=None, ignore_delete_error=False):
+        """
+        Loads all or part of the specified config to the target configuration datastore.
+        A <config> wrapper element is added when config does not already start with one.
+
+        :param config is the configuration, which must be rooted in the config element.
+                      Pass it as a string: it is sliced and lower-cased by this method.
+        :param target is the name of the configuration datastore being edited
+        :param default_operation if specified must be one of { 'merge', 'replace', or 'none' }
+        :param test_option if specified must be one of { 'test_then_set', 'set' }
+        :param error_option if specified must be one of { 'stop-on-error',
+                            'continue-on-error', 'rollback-on-error' } The
+                            'rollback-on-error' error_option depends on the
+                            :rollback-on-error capability.
+        :param ignore_delete_error: (bool) For some startup deletes/clean-ups, we do a
+                                    delete high up in the config to get whole lists. If
+                                    these lists are empty, this helps suppress any error
+                                    message from NETConf on failure to delete an empty list
+        :raises NotImplementedError: if connect() has never established a session
+        :return: (deferred) for RpcReply, or 'ignoring-delete-error' for a suppressed failure
+        """
+        if not self._session:
+            raise NotImplementedError('No SSH Session')
+
+        if not self._session.connected:
+            try:
+                yield self._reconnect()
+
+            except Exception as e:
+                log.exception('edit-config-connect', e=e)
+
+        try:
+            if config[:7] != '<config':
+                config = '<config xmlns:xc="urn:ietf:params:xml:ns:netconf:base:1.0"' + \
+                         ' xmlns="urn:ietf:params:xml:ns:netconf:base:1.0">' + \
+                         config + '</config>'
+
+            log.debug('netconf-request', config=config, target=target,
+                      default_operation=default_operation)
+
+            rpc_reply = yield threads.deferToThread(self._do_edit_config, target, config,
+                                                    default_operation, test_option,
+                                                    error_option, ignore_delete_error)
+        except Exception as e:
+            if ignore_delete_error and 'operation="delete"' in config.lower():
+                returnValue('ignoring-delete-error')
+            log.exception('edit_config', e=e, config=config, target=target)
+            raise
+
+        returnValue(rpc_reply)
+
+    def _do_edit_config(self, target, config, default_operation, test_option, error_option,
+                        ignore_delete_error=False):
+        """
+        Perform actual edit-config operation (blocking call on the current session)
+
+        :param ignore_delete_error: (bool) when set, a failed request whose config contains
+                                    operation="delete" is re-raised without being logged
+        :return: the reply from the session's edit_config call
+        """
+        log.debug('edit-config', target=target, config=config)
+        try:
+            # TODO: Support additional options later:
+            #       default_operation=default_operation,
+            #       test_option=test_option,
+            #       error_option=error_option
+            reply = self._session.edit_config(target=target, config=config)
+
+            # reply.xml holds the XML text, reply.ok (boolean) the status
+            log.debug('netconf-response', response=reply)
+            return reply
+
+        except RPCError as err:
+            if not (ignore_delete_error and 'operation="delete"' in config.lower()):
+                log.exception('do_edit_config', e=err, config=config, target=target)
+            raise
+
+    def rpc(self, rpc_string):
+        """
+        Custom RPC request
+        :param rpc_string: (string) RPC request
+        :return: (deferred) for GetReply
+        :raises NotImplementedError: if connect() has never established a session
+        """
+        log.debug('rpc', rpc=rpc_string)
+        if not self._session:
+            raise NotImplementedError('No SSH Session')
+        if not self._session.connected:
+            # Wait for the reconnect to finish before issuing the request
+            return self._reconnect().addCallback(
+                lambda _: threads.deferToThread(self._do_rpc, rpc_string))
+        return threads.deferToThread(self._do_rpc, rpc_string)
+
+    def _do_rpc(self, rpc_string):
+        """Blocking custom RPC: parse the XML text and dispatch it on the session"""
+        try:
+            request = etree.fromstring(rpc_string)
+            # The XML text of the reply is available as its .xml attribute
+            return self._session.dispatch(request)
+
+        except RPCError as err:
+            log.exception('rpc', e=err)
+            raise
diff --git a/adapters/adtran_common/net/adtran_rest.py b/adapters/adtran_common/net/adtran_rest.py
new file mode 100644
index 0000000..9020e82
--- /dev/null
+++ b/adapters/adtran_common/net/adtran_rest.py
@@ -0,0 +1,189 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+
+import structlog
+import treq
+from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.error import ConnectionClosed, ConnectionDone, ConnectionLost
+
+log = structlog.get_logger()
+
+
+class RestInvalidResponseCode(Exception):
+    """A REST request finished with a status code that is not valid for its method"""
+    def __init__(self, message, url, code):
+        super(RestInvalidResponseCode, self).__init__(message)
+        self.url, self.code = url, code
+
+
+class AdtranRestClient(object):
+    """
+    Performs Adtran RESTCONF requests
+    """
+    # HTTP shortcuts
+    HELLO_URI = '/restconf/adtran-hello:hello'
+
+    REST_GET_REQUEST_HEADER = {'User-Agent': 'Adtran RESTConf',
+                               'Accept': ['application/json']}
+
+    REST_POST_REQUEST_HEADER = {'User-Agent': 'Adtran RESTConf',
+                                'Content-Type': 'application/json',
+                                'Accept': ['application/json']}
+    # The remaining verbs reuse the header dictionaries above (same objects, not copies)
+    REST_PATCH_REQUEST_HEADER = REST_POST_REQUEST_HEADER
+    REST_PUT_REQUEST_HEADER = REST_POST_REQUEST_HEADER
+    REST_DELETE_REQUEST_HEADER = REST_GET_REQUEST_HEADER
+    # HTTP status codes referenced by _valid_results below
+    HTTP_OK = 200
+    HTTP_CREATED = 201
+    HTTP_ACCEPTED = 202
+    HTTP_NON_AUTHORITATIVE_INFORMATION = 203
+    HTTP_NO_CONTENT = 204
+    HTTP_RESET_CONTENT = 205
+    HTTP_PARTIAL_CONTENT = 206
+    HTTP_NOT_FOUND = 404
+    # NOTE(review): 'PUT' has a _valid_results entry but is absent from _valid_methods
+    _valid_methods = {'GET', 'POST', 'PATCH', 'DELETE'}
+    _valid_results = {'GET': [HTTP_OK, HTTP_NO_CONTENT],
+                      'POST': [HTTP_OK, HTTP_CREATED, HTTP_NO_CONTENT],
+                      'PUT': [HTTP_OK, HTTP_CREATED, HTTP_NO_CONTENT],
+                      'PATCH': [HTTP_OK],
+                      'DELETE': [HTTP_OK, HTTP_ACCEPTED, HTTP_NO_CONTENT, HTTP_NOT_FOUND]
+                      }
+    # Class-body check: runs once, while the class itself is being defined
+    for _method in _valid_methods:
+        assert _method in _valid_results  # Make sure we have a results entry for each supported method
+
+    def __init__(self, host_ip, port, username='', password='', timeout=10):
+        """
+        REST Client initialization
+
+        :param host_ip: (string) IP Address of Adtran Device
+        :param port: (int) Port number
+        :param username: (string) Username for credentials
+        :param password: (string) Password for credentials
+        :param timeout: (int) Number of seconds to wait for a response before timing out
+        """
+        # Where requests are sent
+        self._ip, self._port = host_ip, port
+        # Credentials supplied as 'auth' with each request
+        self._username, self._password = username, password
+        self._timeout = timeout
+
+    def __str__(self):  # Rendered as "AdtranRestClient <username>@<ip>:<port>"
+        return "AdtranRestClient {}@{}:{}".format(self._username, self._ip, self._port)
+
+    @inlineCallbacks
+    def request(self, method, uri, data=None, name='', timeout=None, is_retry=False,
+                suppress_error=False):
+        """
+        Send a REST request to the Adtran device
+
+        :param method: (string) HTTP method
+        :param uri: (string) URI on the device (appended to http://<ip>:<port>)
+        :param data: (string) optional data for the request body
+        :param name: (string) optional name of the request, useful for logging purposes
+        :param timeout: (int) Number of seconds to wait for a response before timing out
+        :param is_retry: (boolean) True if this method called recursively in order to recover
+                                   from a connection loss. Can happen sometimes in debug sessions
+                                   and in the real world.
+        :param suppress_error: (boolean) If true, do not output ERROR message on REST request failure
+        :return: (dict) decoded JSON body on success, None for a 204/404 response, or the
+                 ConnectionClosed class if the connection was closed
+        :raises RestInvalidResponseCode: if the status code is not valid for the method
+        """
+        log.debug('request', method=method, uri=uri, data=data, retry=is_retry)
+        # Normalise once; the caller's own spelling is kept for the messages below
+        verb = method.upper()
+        if verb not in self._valid_methods:
+            raise NotImplementedError("REST method '{}' is not supported".format(method))
+
+        url = 'http://{}:{}{}{}'.format(self._ip, self._port,
+                                        '' if uri.startswith('/') else '/',
+                                        uri)
+        response = None
+        timeout = timeout or self._timeout
+        # The same credentials are used for every verb
+        auth = (self._username, self._password)
+
+        try:
+            if verb == 'GET':
+                response = yield treq.get(url, auth=auth, timeout=timeout,
+                                          headers=self.REST_GET_REQUEST_HEADER)
+            elif verb in ('POST', 'PUT'):
+                # NOTE(review): 'PUT' is not in _valid_methods, so it is rejected above and
+                # never reaches this branch today; if enabled it would be sent as a POST
+                response = yield treq.post(url, data=data, auth=auth, timeout=timeout,
+                                           headers=self.REST_POST_REQUEST_HEADER)
+            elif verb == 'PATCH':
+                response = yield treq.patch(url, data=data, auth=auth, timeout=timeout,
+                                            headers=self.REST_PATCH_REQUEST_HEADER)
+            elif verb == 'DELETE':
+                response = yield treq.delete(url, auth=auth, timeout=timeout,
+                                             headers=self.REST_DELETE_REQUEST_HEADER)
+            else:
+                raise NotImplementedError("REST method '{}' is not supported".format(method))
+
+        except NotImplementedError:
+            raise
+
+        except (ConnectionDone, ConnectionLost):
+            if is_retry:
+                raise
+            # Retry once and wait for its outcome; a Deferred must not be given to returnValue
+            result = yield self.request(method, uri, data=data, name=name, timeout=timeout,
+                                        is_retry=True, suppress_error=suppress_error)
+            returnValue(result)
+
+        except ConnectionClosed:
+            # NOTE(review): hands back the exception class itself -- confirm callers expect it
+            returnValue(ConnectionClosed)
+
+        except Exception as e:
+            log.exception("rest-request", method=method, url=url, name=name, e=e)
+            raise
+
+        if response.code not in self._valid_results[verb]:
+            message = "REST {} '{}' request to '{}' failed with status code {}".format(method, name,
+                                                                                       url, response.code)
+            if not suppress_error:
+                log.error(message)
+            raise RestInvalidResponseCode(message, url, response.code)
+
+        if response.code in {self.HTTP_NO_CONTENT, self.HTTP_NOT_FOUND}:
+            returnValue(None)
+
+        else:
+            # TODO: May want to support multiple body encodings in the future
+
+            type_key = 'content-type'
+            type_val = 'application/json'
+            # Substring match so that parameters such as '; charset=utf-8' are tolerated
+            content_types = response.headers.getRawHeaders(type_key, [])
+            if not any(type_val in value for value in content_types):
+                raise Exception("REST {} '{}' request response from '{}' was not JSON"
+                                .format(method, name, url))
+
+            content = yield response.content()
+            try:
+                result = json.loads(content)
+
+            except Exception as e:
+                log.exception("json-decode", method=method, url=url, name=name,
+                              content=content, e=e)
+                raise
+
+            returnValue(result)
diff --git a/adapters/adtran_common/net/adtran_zmq.py b/adapters/adtran_common/net/adtran_zmq.py
new file mode 100644
index 0000000..1d1341c
--- /dev/null
+++ b/adapters/adtran_common/net/adtran_zmq.py
@@ -0,0 +1,379 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import structlog
+
+from twisted.internet.defer import succeed
+from twisted.internet import threads
+
+from txzmq import ZmqEndpoint, ZmqFactory
+from txzmq.connection import ZmqConnection
+
+import zmq
+from zmq import constants
+from zmq.utils import jsonapi
+from zmq.utils.strtypes import b, u
+from zmq.auth.base import Authenticator
+
+from threading import Thread, Event
+
+zmq_factory = ZmqFactory()
+
+
+class AdtranZmqClient(object):
+    """
+    Adtran ZeroMQ Client for PON Agent and/or packet in/out service
+    """
+    def __init__(self, ip_address, rx_callback, port):
+        # PAIR connection towards tcp://<ip_address>:<port>
+        self.log = structlog.get_logger()
+        self.auth = None
+
+        self.zmq_endpoint = ZmqEndpoint('connect', 'tcp://{}:{}'.format(ip_address, port))
+        self._socket = ZmqPairConnection(zmq_factory, self.zmq_endpoint)
+        # Without a caller-supplied callback, received messages go to the no-op handler
+        self._socket.onReceive = rx_callback if rx_callback else AdtranZmqClient.rx_nop
+
+    def send(self, data):
+        """Send data on the pair connection; failures are logged and not propagated"""
+        try:
+            self._socket.send(data)
+        except Exception as err:
+            self.log.exception('send', e=err)
+
+    def shutdown(self):  # Route further received messages to rx_nop, then shut down
+        self._socket.onReceive = AdtranZmqClient.rx_nop
+        self._socket.shutdown()
+
+    @property
+    def socket(self):  # The ZmqPairConnection created in __init__
+        return self._socket
+
+    @staticmethod
+    def rx_nop(_):
+        """Receive callback that discards the message"""
+
+    def setup_plain_security(self, username, password):
+        """Start the ZAP authenticator and configure PLAIN credentials; returns a deferred"""
+        self.log.debug('setup-plain-security')
+
+        def configure_plain(_):
+            # Never write the password itself to the log
+            self.log.debug('plain-security', username=username)
+
+            self.auth.configure_plain(domain='*', passwords={username: password})
+            self._socket.socket.plain_username = username
+            self._socket.socket.plain_password = password
+
+        def add_endpoints(_results):
+            self._socket.addEndpoints([self.zmq_endpoint])
+
+        def config_failure(_results):
+            raise Exception('Failed to configure plain-text security')
+
+        def endpoint_failure(_results):
+            raise Exception('Failed to complete endpoint setup')
+
+        self.auth = TwistedZmqAuthenticator()
+        d = self.auth.start()
+        d.addCallbacks(configure_plain, config_failure)
+        d.addCallbacks(add_endpoints, endpoint_failure)
+
+        return d
+
+    def setup_curve_security(self):  # Placeholder: always raises NotImplementedError
+        self.log.debug('setup-curve-security')
+        raise NotImplementedError('TODO: curve transport security is not yet supported')
+
+
+class ZmqPairConnection(ZmqConnection):
+    """
+    Bidirectional messages to/from the socket.
+
+    Wrapper around ZeroMQ PAIR socket.
+    """
+    socketType = constants.PAIR
+
+    def messageReceived(self, message):
+        """
+        Called on incoming message from ZeroMQ.
+        Forwards the message unchanged to onReceive().
+        :param message: message data
+        """
+        self.onReceive(message)
+
+    def onReceive(self, message):
+        """
+        Called on incoming message received from other end of the pair.
+        This default raises NotImplementedError; users assign their own callable.
+        :param message: message data
+        """
+        raise NotImplementedError(self)
+
+    def send(self, message):
+        """
+        Send message via ZeroMQ socket.
+
+        Sending is performed directly to ZeroMQ without queueing. If HWM is
+        reached on ZeroMQ side, sending operation is aborted with exception
+        from ZeroMQ (EAGAIN).
+
+        After writing read is scheduled as ZeroMQ may not signal incoming
+        messages after we touched socket with write request.
+
+        :param message: message data, could be either list of str (multipart
+            message) or just str
+        :type message: str or list of str
+        """
+        from txzmq.compat import is_nonstr_iter
+        from twisted.internet import reactor
+
+        if not is_nonstr_iter(message):
+            self.socket.send(message, constants.NOBLOCK)
+        else:
+            # for m in message[:-1]:
+            #     self.socket.send(m, constants.NOBLOCK | constants.SNDMORE)
+            # self.socket.send(message[-1], constants.NOBLOCK)
+            self.socket.send_multipart(message, flags=constants.NOBLOCK)
+        # Schedule a read unless one is already pending (see the docstring)
+        if self.read_scheduled is None:
+            self.read_scheduled = reactor.callLater(0, self.doRead)
+
+###############################################################################################
+###############################################################################################
+###############################################################################################
+###############################################################################################
+
+
+def _inherit_docstrings(cls):
+    """Class decorator: fill missing public docstrings of cls from Authenticator"""
+    for attr_name, member in cls.__dict__.items():
+        is_public = not attr_name.startswith('_')
+        if is_public and not member.__doc__:
+            # Same-named Authenticator attribute, or None when it has no such name
+            source = getattr(Authenticator, attr_name, None)
+            member.__doc__ = source.__doc__
+    return cls
+
+
+@_inherit_docstrings
+class TwistedZmqAuthenticator(object):
+    """Run ZAP authentication in a background thread but communicate via Twisted ZMQ"""
+
+    def __init__(self, encoding='utf-8'):
+        self.log = structlog.get_logger()
+        self.encoding = encoding
+        self.context = zmq_factory.context
+        # The inproc endpoint name is derived from id(self)
+        self.pipe_endpoint = "inproc://{0}.inproc".format(id(self))
+        self.pipe = self.thread = None
+
+    def allow(self, *addresses):
+        try:
+            encoded = [b(addr, self.encoding) for addr in addresses]
+            self.pipe.send([b'ALLOW'] + encoded)
+        except Exception as err:
+            self.log.exception('allow', e=err)
+
+    def deny(self, *addresses):
+        try:
+            encoded = [b(addr, self.encoding) for addr in addresses]
+            self.pipe.send([b'DENY'] + encoded)
+        except Exception as err:
+            self.log.exception('deny', e=err)
+
+    def configure_plain(self, domain='*', passwords=None):
+        try:
+            credentials = jsonapi.dumps(passwords if passwords else {})
+            self.pipe.send([b'PLAIN', b(domain, self.encoding), credentials])
+        except Exception as err:
+            self.log.exception('configure-plain', e=err)
+
+    def configure_curve(self, domain='*', location=''):
+        # Encode both text arguments, then send them as one CURVE command
+        try:
+            frames = [b'CURVE']
+            frames.extend(b(text, self.encoding) for text in (domain, location))
+            self.pipe.send(frames)
+        except Exception as err:
+            self.log.exception('configure-curve', e=err)
+
+    def start(self, rx_callback=AdtranZmqClient.rx_nop):
+        """Start the authentication thread; the returned deferred fails if start-up fails"""
+        from twisted.internet.defer import fail
+        try:
+            # create a socket to communicate with auth thread.
+            # We are server, thread will be client
+            endpoint = ZmqEndpoint('bind', self.pipe_endpoint)
+            self.pipe = ZmqPairConnection(zmq_factory, endpoint)
+            self.pipe.onReceive = rx_callback
+
+            self.thread = LocalAuthenticationThread(self.context,
+                                                    self.pipe_endpoint,
+                                                    encoding=self.encoding)
+            return threads.deferToThread(TwistedZmqAuthenticator._do_thread_start,
+                                         self.thread, timeout=10)
+        except Exception as e:
+            self.log.exception('start', e=e)
+            return fail()   # callers chain callbacks on the result, so never return None
+
+    @staticmethod
+    def _do_thread_start(thread, timeout=10):
+        """Start thread and block until its 'started' event is set or timeout elapses"""
+        thread.start()
+        signalled = thread.started.wait(timeout=timeout)
+
+        # Event.wait only reports the outcome from Python 2.7 on (it returned None before),
+        # so its result is treated as a failure indication on 2.7+ only
+        if sys.version_info >= (2, 7) and not signalled:
+            raise RuntimeError("Authenticator thread failed to start")
+
+    def stop(self):
+        """Stop the authentication thread"""
+        pipe, self.pipe = self.pipe, None
+        thread, self.thread = self.thread, None
+
+        if pipe:
+            pipe.send(b'TERMINATE')
+            pipe.onReceive = AdtranZmqClient.rx_nop
+            pipe.shutdown()
+            # start() can leave a pipe without a thread when thread creation failed
+            if thread is not None and thread.is_alive():
+                return threads.deferToThread(TwistedZmqAuthenticator._do_thread_join,
+                                             thread)
+        return succeed('done')
+
+    @staticmethod
+    def _do_thread_join(thread, timeout=1):
+        # Blocking join bounded by timeout; stop() runs this on a worker thread
+        thread.join(timeout)
+
+    def is_alive(self):
+        """Is the ZAP thread currently running? (a falsy self.thread is returned as-is)"""
+        return self.thread and self.thread.is_alive()
+
+    def __del__(self):  # Stop the authentication thread when the object is finalized
+        self.stop()
+
+
+# NOTE: The following is duplicated from the zmq code since the class was not exported
+class LocalAuthenticationThread(Thread):
+    """A Thread for running a zmq Authenticator
+
+    This is run in the background by TwistedZmqAuthenticator
+    """
+
+    def __init__(self, context, endpoint, encoding='utf-8', authenticator=None):
+        """Create the authenticator and connect the control pipe to 'endpoint'"""
+        super(LocalAuthenticationThread, self).__init__(name='0mq Authenticator')
+        self.log = structlog.get_logger()
+        self.context = context or zmq.Context.instance()
+        self.encoding = encoding
+        self.started = Event()
+        self.authenticator = authenticator or Authenticator(self.context, encoding=encoding)
+        # Socket back to the main thread; use self.context as 'context' may be None
+        self.pipe = self.context.socket(zmq.PAIR)
+        self.pipe.linger = 1
+        self.pipe.connect(endpoint)
+
+    def run(self):
+        """Thread body: start the authenticator, then service the pipe and ZAP sockets"""
+        try:
+            self.authenticator.start()
+            self.started.set()
+            zap = self.authenticator.zap_socket
+            poller = zmq.Poller()
+            poller.register(self.pipe, zmq.POLLIN)
+            poller.register(zap, zmq.POLLIN)
+            while True:
+                try:
+                    socks = dict(poller.poll())
+                except zmq.ZMQError:
+                    break  # interrupted
+
+                if self.pipe in socks and socks[self.pipe] == zmq.POLLIN:
+                    terminate = self._handle_pipe()
+                    if terminate:
+                        break
+
+                if zap in socks and socks[zap] == zmq.POLLIN:
+                    self._handle_zap()
+
+            self.pipe.close()
+            self.authenticator.stop()
+
+        except Exception as e:
+            self.log.exception("run", e=e)
+
+    def _handle_zap(self):
+        """
+        Handle a message from the ZAP socket.
+        """
+        msg = self.authenticator.zap_socket.recv_multipart()
+        if not msg:
+            return
+        self.authenticator.handle_zap_message(msg)
+
+    def _handle_pipe(self):
+        """
+        Handle a command from the front-end API; returns True when the thread should exit.
+        """
+        terminate = False
+
+        # Get the whole message off the pipe in one go
+        msg = self.pipe.recv_multipart()
+
+        if msg is None:
+            terminate = True
+            return terminate
+
+        command = msg[0]
+        self.log.debug("auth received API command", command=command)
+
+        if command == b'ALLOW':
+            addresses = [u(m, self.encoding) for m in msg[1:]]
+            try:
+                self.authenticator.allow(*addresses)
+            except Exception as e:
+                self.log.exception("Failed to allow", addresses=addresses, e=e)
+
+        elif command == b'DENY':
+            addresses = [u(m, self.encoding) for m in msg[1:]]
+            try:
+                self.authenticator.deny(*addresses)
+            except Exception as e:
+                self.log.exception("Failed to deny", addresses=addresses, e=e)
+
+        elif command == b'PLAIN':
+            domain = u(msg[1], self.encoding)
+            json_passwords = msg[2]
+            self.authenticator.configure_plain(domain, jsonapi.loads(json_passwords))
+
+        elif command == b'CURVE':
+            # For now we don't do anything with domains
+            domain = u(msg[1], self.encoding)
+
+            # If location is CURVE_ALLOW_ANY, allow all clients. Otherwise
+            # treat location as a directory that holds the certificates.
+            location = u(msg[2], self.encoding)
+            self.authenticator.configure_curve(domain, location)
+
+        elif command == b'TERMINATE':
+            terminate = True
+
+        else:
+            self.log.error("Invalid auth command from API", command=command)
+
+        return terminate
diff --git a/adapters/adtran_common/net/mock_netconf_client.py b/adapters/adtran_common/net/mock_netconf_client.py
new file mode 100644
index 0000000..314f2a0
--- /dev/null
+++ b/adapters/adtran_common/net/mock_netconf_client.py
@@ -0,0 +1,199 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import structlog
+import random
+import time
+from adtran_netconf import AdtranNetconfClient
+from pyvoltha.common.utils.asleep import asleep
+from ncclient.operations.rpc import RPCReply, RPCError
+from twisted.internet.defer import inlineCallbacks, returnValue
+
+log = structlog.get_logger()
+
+# Canned reply (empty <data/>) handed back by the mock client's requests
+_dummy_xml = ('<rpc-reply message-id="br-549" '
+              'xmlns="urn:ietf:params:xml:ns:netconf:base:1.0" '
+              'xmlns:nc="urn:ietf:params:xml:ns:netconf:base:1.0">'
+              '<data/></rpc-reply>')
+
+
+class MockNetconfClient(AdtranNetconfClient):
+    """
+    NETCONF client stand-in: sleeps a random interval, then returns canned replies
+    """
+    def __init__(self, host_ip, port=830, username='', password='', timeout=20):
+        parent = super(MockNetconfClient, self)
+        parent.__init__(host_ip, port=port, username=username, password=password, timeout=timeout)
+        self._locked = {}           # datastore name -> lock expiry (epoch secs) or None
+        self._connected = False
+
+    def __str__(self):
+        return "MockNetconfClient {}@{}:{}".format(self._username, self._ip, self._port)
+
+    @property
+    def capabilities(self):
+        """
+        NETCONF capabilities of the server
+
+        :return: Always None, the mock does not track any capabilities
+        """
+        return None
+
+    @property
+    def connected(self):
+        """
+        Connection state of the simulated session
+        :return: (boolean) True once connect() has completed, until close() completes
+        """
+        return self._connected
+
+    @inlineCallbacks
+    def connect(self, connect_timeout=None):
+        """
+        Simulate connecting to the NETCONF server
+
+        Marks the client connected and clears all datastore locks after a random delay.
+
+        :param connect_timeout: not used by the mock
+        :return: (deferred) Deferred request that fires with True
+        """
+        delay = random.uniform(0.1, 5.0)
+        yield asleep(delay)                      # Simulate NETCONF request delay
+        self._locked = {}
+        self._connected = True
+        returnValue(True)
+
+    @inlineCallbacks
+    def close(self):
+        """
+        Simulate closing the NETCONF session (also drops all datastore locks)
+        :return:  (deferred) Deferred request that fires with True
+        """
+        yield asleep(random.uniform(0.1, 0.5))   # Simulate NETCONF request delay
+        self._locked = {}
+        self._connected = False
+        returnValue(True)
+
+    @inlineCallbacks
+    def get_config(self, source='running'):
+        """
+        Simulate retrieving the configuration from the specified source
+
+        :param source: (string) Configuration source, 'running', 'candidate', ... (not used)
+        :return: (deferred) Deferred request that wraps an RPCReply of the canned XML
+        """
+        delay = random.uniform(0.1, 4.0)
+        yield asleep(delay)                      # Simulate NETCONF request delay
+
+        # TODO: Customize if needed...
+        returnValue(RPCReply(_dummy_xml))
+
+    @inlineCallbacks
+    def get(self, payload):
+        """
+        Simulate getting the requested data from the server
+
+        :param payload: Payload/filter (not used by the mock)
+        :return: (deferred) for an RPCReply of the canned XML
+        """
+        delay = random.uniform(0.1, 3.0)
+        yield asleep(delay)                      # Simulate NETCONF request delay
+
+        # TODO: Customize if needed...
+        returnValue(RPCReply(_dummy_xml))
+
+    @inlineCallbacks
+    def lock(self, source, lock_timeout):
+        """
+        Simulate locking a configuration datastore
+        :param source: is the name of the configuration datastore accessed
+        :param lock_timeout: timeout in seconds for holding the lock
+        :return: (deferred) for RpcReply
+        """
+        deadline = time.time() + lock_timeout
+        self._locked.setdefault(source, None)
+
+        # Wait for a current holder to release the lock or for its lock to expire
+        while self._locked[source] is not None and time.time() < self._locked[source]:
+            yield asleep(0.1)
+
+        self._locked[source] = None
+
+        # Take the lock only if our own timeout did not elapse while waiting
+        if time.time() < deadline:
+            yield asleep(random.uniform(0.1, 0.5))   # Simulate NETCONF request delay
+            self._locked[source] = deadline
+
+        # NOTE: on failure an RPCError object is the returned result, it is not raised
+        granted = deadline > time.time()
+        returnValue(RPCReply(_dummy_xml) if granted else RPCError('TODO'))
+
+    @inlineCallbacks
+    def unlock(self, source):
+        """
+        Simulate releasing the lock on a configuration datastore
+
+        :param source: is the name of the configuration datastore accessed
+        :return: (deferred) for RpcReply
+        """
+        # Only a datastore that is currently locked costs a simulated round trip
+        held = self._locked.get(source) is not None
+        if held:
+            yield asleep(random.uniform(0.1, 0.5))   # Simulate NETCONF request delay
+
+        # The lock is cleared unconditionally, the mock tracks no lock ownership
+        self._locked[source] = None
+        returnValue(RPCReply(_dummy_xml))
+
+    @inlineCallbacks
+    def edit_config(self, config, target='running', default_operation='merge',
+                    test_option=None, error_option=None):
+        """
+        Simulate loading all or part of the specified config to the target configuration
+        datastore. The arguments are accepted but not used by the mock.
+
+        :param config is the configuration, which must be rooted in the config element. It can be specified
+                      either as a string or an Element.format="xml"
+        :param target is the name of the configuration datastore being edited
+        :param default_operation if specified must be one of { 'merge', 'replace', or 'none' }
+        :param test_option if specified must be one of { 'test_then_set', 'set' }
+        :param error_option if specified must be one of { 'stop-on-error', 'continue-on-error', 'rollback-on-error' }
+                            The 'rollback-on-error' error_option depends on the :rollback-on-error capability.
+
+        :return: (deferred) for RpcReply
+        """
+        try:
+            delay = random.uniform(0.1, 2.0)
+            yield asleep(delay)                  # Simulate NETCONF request delay
+        except Exception as e:
+            log.exception('edit_config', e=e)
+            raise
+
+        # TODO: Customize if needed...
+        reply = RPCReply(_dummy_xml)
+        returnValue(reply)
+
+    @inlineCallbacks
+    def rpc(self, rpc_string):
+        """
+        Simulate a custom RPC request
+        :param rpc_string: (string) RPC request (not used by the mock)
+        :return: (deferred) for an RPCReply of the canned XML
+        """
+        delay = random.uniform(0.1, 2.0)
+        yield asleep(delay)                      # Simulate NETCONF request delay
+
+        # TODO: Customize if needed...
+        returnValue(RPCReply(_dummy_xml))
diff --git a/adapters/adtran_common/net/rcmd.py b/adapters/adtran_common/net/rcmd.py
new file mode 100644
index 0000000..3062b4c
--- /dev/null
+++ b/adapters/adtran_common/net/rcmd.py
@@ -0,0 +1,112 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import structlog
+from twisted.internet.defer import Deferred, succeed
+from twisted.internet.protocol import Factory, Protocol
+from twisted.conch.client.knownhosts import ConsoleUI, KnownHostsFile
+from twisted.conch.endpoints import SSHCommandClientEndpoint
+from twisted.internet import reactor
+
+log = structlog.get_logger()
+_open = open
+
+
+class RCmd(object):
+    """
+    Execute a one-time remote command via SSH
+    """
+    def __init__(self, host, username, password, command,
+                 port=None, keys=None, known_hosts=None, agent=None):
+        """Save the connection settings; nothing is sent until execute() is called"""
+        self.reactor = reactor
+        self.host = host
+        self.port = port
+        self.username = username
+        self.password = password
+        self.keys = keys
+        self.knownHosts = known_hosts
+        self.agent = agent
+        self.command = command
+        # SECURITY: this UI answers every prompt with True without asking a user
+        self.ui = RCmd.FixedResponseUI(True)
+
+    class NoiseProtocol(Protocol):
+        """Writes a few fixed strings; 'finished' fires with the first data (or loss reason)"""
+        def __init__(self):
+            self.finished = Deferred()
+            self.strings = ["bif", "pow", "zot"]
+
+        def connectionMade(self):
+            log.debug('connection-made')
+            self._send_noise()
+
+        def _send_noise(self):
+            if self.strings:
+                self.transport.write(self.strings.pop(0) + "\n")
+            else:
+                self.transport.loseConnection()
+
+        def dataReceived(self, data):
+            log.debug('rx', data=data)
+            if self.finished is not None and not self.finished.called:
+                self.finished.callback(data)
+            self._send_noise()
+
+        def connectionLost(self, reason):
+            log.debug('connection-lost')
+            if not self.finished.called:
+                self.finished.callback(reason)
+
+    class PermissiveKnownHosts(KnownHostsFile):
+        """SECURITY: known-hosts store that accepts every host key without checking"""
+        def verifyHostKey(self, ui, hostname, ip, key):
+            log.debug('verifyHostKey')
+            # KnownHostsFile.verifyHostKey is documented to return a Deferred, not a bool
+            return succeed(True)
+
+    class FixedResponseUI(ConsoleUI):
+        """Non-interactive ConsoleUI that gives the same answer to every prompt"""
+        def __init__(self, result):
+            super(RCmd.FixedResponseUI, self).__init__(
+                lambda: _open("/dev/null", "r+b", buffering=0))
+            self.result = result
+
+        def prompt(self, _):
+            """Answer every prompt with the result given at construction"""
+            log.debug('prompt')
+            return succeed(self.result)
+
+        def warn(self, text):
+            """Drop the warning text; only note that a warning was issued"""
+            log.debug('warn')
+
+    def _endpoint_for_command(self, command):
+        """Build an SSH client endpoint that runs 'command' using the saved settings"""
+        return SSHCommandClientEndpoint.newConnection(
+            self.reactor, command, self.username, self.host,
+            port=self.port, password=self.password, keys=self.keys,
+            agentEndpoint=self.agent, knownHosts=self.knownHosts, ui=self.ui)
+
+    def execute(self):
+        """
+        Connect and run the command. Returns a deferred that fires with the first
+        data received, or with the connection-lost reason if no data arrived.
+        """
+        endpoint = self._endpoint_for_command(self.command)
+        factory = Factory()
+        factory.protocol = RCmd.NoiseProtocol
+
+        d = endpoint.connect(factory)
+        return d.addCallback(lambda proto: proto.finished)