Test-Voltha:
Adding some util functions and test scenarios for TLS.
Change-Id: I489330839b24a6daa3c065feb61c51258becae45
diff --git a/src/test/voltha/volthaTest.py b/src/test/voltha/volthaTest.py
index b74753e..3b3d51d 100644
--- a/src/test/voltha/volthaTest.py
+++ b/src/test/voltha/volthaTest.py
@@ -2,10 +2,25 @@
import sys
import unittest
import time
+import json
+import requests
from nose.tools import *
+from nose.twistedtools import reactor, deferred
+from twisted.internet import defer
from CordTestConfig import setup_module
from CordTestUtils import log_test
from VolthaCtrl import VolthaCtrl
+from CordTestUtils import log_test, get_controller
+from portmaps import g_subscriber_port_map
+from OltConfig import *
+from EapTLS import TLSAuthTest
+from OnosCtrl import OnosCtrl
+from CordLogger import CordLogger
+from scapy.all import *
+from scapy_ssl_tls.ssl_tls import *
+from scapy_ssl_tls.ssl_tls_crypto import *
+from CordTestServer import cord_test_onos_restart, cord_test_shell, cord_test_radius_restart
+
class voltha_exchange(unittest.TestCase):
@@ -14,10 +29,184 @@
VOLTHA_HOST = 'localhost'
VOLTHA_REST_PORT = 8881
voltha = None
+ apps = ('org.opencord.aaa', 'org.onosproject.dhcp')
+ olt_apps = () #'org.opencord.cordmcast')
+ vtn_app = 'org.opencord.vtn'
+ table_app = 'org.ciena.cordigmp'
+
+ test_path = os.path.dirname(os.path.realpath(__file__))
+ table_app_file = os.path.join(test_path, '..', 'apps/ciena-cordigmp-multitable-2.0-SNAPSHOT.oar')
+ olt_app_file = os.path.join(test_path, '..', 'apps/olt-app-1.2-SNAPSHOT.oar')
+ #onos_config_path = os.path.join(test_path, '..', 'setup/onos-config')
+ olt_conf_file = os.getenv('OLT_CONFIG_FILE', os.path.join(test_path, '..', 'setup/olt_config.json'))
+ onos_restartable = bool(int(os.getenv('ONOS_RESTART', 0)))
+ VOLTHA_ENABLED = True
+ INTF_TX_DEFAULT = 'veth2'
+ INTF_RX_DEFAULT = 'veth0'
+ TESTCASE_TIMEOUT = 300
+ VOLTHA_CONFIG_FAKE = True
+ VOLTHA_UPLINK_VLAN_MAP = { 'of:0000000000000001' : '222' }
+ VOLTHA_ONU_UNI_PORT = 'veth0'
+
+ CLIENT_CERT = """-----BEGIN CERTIFICATE-----
+MIICuDCCAiGgAwIBAgIBAjANBgkqhkiG9w0BAQUFADCBizELMAkGA1UEBhMCVVMx
+CzAJBgNVBAgTAkNBMRIwEAYDVQQHEwlTb21ld2hlcmUxEzARBgNVBAoTCkNpZW5h
+IEluYy4xHjAcBgkqhkiG9w0BCQEWD2FkbWluQGNpZW5hLmNvbTEmMCQGA1UEAxMd
+RXhhbXBsZSBDZXJ0aWZpY2F0ZSBBdXRob3JpdHkwHhcNMTYwNjA2MjExMjI3WhcN
+MTcwNjAxMjExMjI3WjBnMQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExEzARBgNV
+BAoTCkNpZW5hIEluYy4xFzAVBgNVBAMUDnVzZXJAY2llbmEuY29tMR0wGwYJKoZI
+hvcNAQkBFg51c2VyQGNpZW5hLmNvbTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkC
+gYEAwvXiSzb9LZ6c7uNziUfKvoHO7wu/uiFC5YUpXbmVGuGZizbVrny0xnR85Dfe
++9R4diansfDhIhzOUl1XjN3YDeSS9OeF5YWNNE8XDhlz2d3rVzaN6hIhdotBkUjg
+rUewjTg5OFR31QEyG3v8xR3CLgiE9xQELjZbSA07pD79zuUCAwEAAaNPME0wEwYD
+VR0lBAwwCgYIKwYBBQUHAwIwNgYDVR0fBC8wLTAroCmgJ4YlaHR0cDovL3d3dy5l
+eGFtcGxlLmNvbS9leGFtcGxlX2NhLmNybDANBgkqhkiG9w0BAQUFAAOBgQDAjkrY
+6tDChmKbvr8w6Du/t8vHjTCoCIocHTN0qzWOeb1YsAGX89+TrWIuO1dFyYd+Z0KC
+PDKB5j/ygml9Na+AklSYAVJIjvlzXKZrOaPmhZqDufi+rXWti/utVqY4VMW2+HKC
+nXp37qWeuFLGyR1519Y1d6F/5XzqmvbwURuEug==
+-----END CERTIFICATE-----"""
+
+ CLIENT_CERT_INVALID = '''-----BEGIN CERTIFICATE-----
+MIIDvTCCAqWgAwIBAgIBAjANBgkqhkiG9w0BAQUFADCBizELMAkGA1UEBhMCVVMx
+CzAJBgNVBAgTAkNBMRIwEAYDVQQHEwlTb21ld2hlcmUxEzARBgNVBAoTCkNpZW5h
+IEluYy4xHjAcBgkqhkiG9w0BCQEWD2FkbWluQGNpZW5hLmNvbTEmMCQGA1UEAxMd
+RXhhbXBsZSBDZXJ0aWZpY2F0ZSBBdXRob3JpdHkwHhcNMTYwMzExMTg1MzM2WhcN
+MTcwMzA2MTg1MzM2WjBnMQswCQYDVQQGEwJVUzELMAkGA1UECBMCQ0ExEzARBgNV
+BAoTCkNpZW5hIEluYy4xFzAVBgNVBAMUDnVzZXJAY2llbmEuY29tMR0wGwYJKoZI
+hvcNAQkBFg51c2VyQGNpZW5hLmNvbTCCASIwDQYJKoZIhvcNAQEBBQADggEPADCC
+AQoCggEBAOxemcBsPn9tZsCa5o2JA6sQDC7A6JgCNXXl2VFzKLNNvB9PS6D7ZBsQ
+5An0zEDMNzi51q7lnrYg1XyiE4S8FzMGAFr94RlGMQJUbRD9V/oqszMX4k++iAOK
+tIA1gr3x7Zi+0tkjVSVzXTmgNnhChAamdMsjYUG5+CY9WAicXyy+VEV3zTphZZDR
+OjcjEp4m/TSXVPYPgYDXI40YZKX5BdvqykWtT/tIgZb48RS1NPyN/XkCYzl3bv21
+qx7Mc0fcEbsJBIIRYTUkfxnsilcnmLxSYO+p+DZ9uBLBzcQt+4Rd5pLSfi21WM39
+2Z2oOi3vs/OYAPAqgmi2JWOv3mePa/8CAwEAAaNPME0wEwYDVR0lBAwwCgYIKwYB
+BQUHAwIwNgYDVR0fBC8wLTAroCmgJ4YlaHR0cDovL3d3dy5leGFtcGxlLmNvbS9l
+eGFtcGxlX2NhLmNybDANBgkqhkiG9w0BAQUFAAOCAQEALBzMPDTIB6sLyPl0T6JV
+MjOkyldAVhXWiQsTjaGQGJUUe1cmUJyZbUZEc13MygXMPOM4x7z6VpXGuq1c/Vxn
+VzQ2fNnbJcIAHi/7G8W5/SQfPesIVDsHTEc4ZspPi5jlS/MVX3HOC+BDbOjdbwqP
+RX0JEr+uOyhjO+lRxG8ilMRACoBUbw1eDuVDoEBgErSUC44pq5ioDw2xelc+Y6hQ
+dmtYwfY0DbvwxHtA495frLyPcastDiT/zre7NL51MyUDPjjYjghNQEwvu66IKbQ3
+T1tJBrgI7/WI+dqhKBFolKGKTDWIHsZXQvZ1snGu/FRYzg1l+R/jT8cRB9BDwhUt
+yg==
+-----END CERTIFICATE-----'''
@classmethod
def setUpClass(cls):
cls.voltha = VolthaCtrl(cls.VOLTHA_HOST, rest_port = cls.VOLTHA_REST_PORT)
+# cls.update_apps_version()
+ dids = OnosCtrl.get_device_ids()
+ device_map = {}
+ for did in dids:
+ device_map[did] = { 'basic' : { 'driver' : 'pmc-olt' } }
+ network_cfg = {}
+ network_cfg = { 'devices' : device_map }
+ ## Restart ONOS with cpqd driver config for OVS
+ cls.start_onos(network_cfg = network_cfg)
+ cls.install_app_table()
+ cls.olt = OltConfig(olt_conf_file = cls.olt_conf_file)
+ cls.port_map, cls.port_list = cls.olt.olt_port_map()
+ cls.switches = cls.port_map['switches']
+ cls.num_ports = cls.port_map['num_ports']
+ if cls.num_ports > 1:
+ cls.num_ports -= 1 ##account for the tx port
+ cls.activate_apps(cls.apps + cls.olt_apps)
+
+ def remove_olt(self, switch_map):
+ controller = get_controller()
+ auth = ('karaf', 'karaf')
+ #remove subscriber for every port on all the voltha devices
+ for device, device_map in switch_map.iteritems():
+ uni_ports = device_map['ports']
+ uplink_vlan = device_map['uplink_vlan']
+ for port in uni_ports:
+ rest_url = 'http://{}:8181/onos/olt/oltapp/{}/{}'.format(controller,
+ device,
+ port)
+ resp = requests.delete(rest_url, auth = auth)
+ if resp.status_code not in [204, 202, 200]:
+ log_test.error('Error deleting subscriber for device %s on port %s' %(device, port))
+ else:
+ log_test.info('Deleted subscriber for device %s on port %s' %(device, port))
+ OnosCtrl.uninstall_app(self.olt_app_file)
+
+ def config_olt(self, switch_map):
+ controller = get_controller()
+ OnosCtrl.install_app(self.olt_app_file)
+ time.sleep(5)
+ auth = ('karaf', 'karaf')
+ #configure subscriber for every port on all the voltha devices
+ for device, device_map in switch_map.iteritems():
+ uni_ports = device_map['ports']
+ uplink_vlan = device_map['uplink_vlan']
+ for port in uni_ports:
+ vlan = port
+ rest_url = 'http://{}:8181/onos/olt/oltapp/{}/{}/{}'.format(controller,
+ device,
+ port,
+ vlan)
+ resp = requests.post(rest_url, auth = auth)
+ #assert_equal(resp.ok, True)
+
+ @classmethod
+ def start_onos(cls, network_cfg = None):
+ if cls.onos_restartable is False:
+ log_test.info('ONOS restart is disabled. Skipping ONOS restart')
+ return
+ if cls.VOLTHA_ENABLED is True:
+ log_test.info('ONOS restart skipped as VOLTHA is running')
+ return
+ if network_cfg is None:
+ network_cfg = cls.device_dict
+
+ if type(network_cfg) is tuple:
+ res = []
+ for v in network_cfg:
+ res += v.items()
+ config = dict(res)
+ else:
+ config = network_cfg
+ log_test.info('Restarting ONOS with new network configuration')
+ return cord_test_onos_restart(config = config)
+
+ @classmethod
+ def install_app_table(cls):
+ ##Uninstall the existing app if any
+ OnosCtrl.uninstall_app(cls.table_app)
+ time.sleep(2)
+ log_test.info('Installing the multi table app %s for subscriber test' %(cls.table_app_file))
+ OnosCtrl.install_app(cls.table_app_file)
+ time.sleep(3)
+ #onos_ctrl = OnosCtrl(cls.vtn_app)
+ #onos_ctrl.deactivate()
+
+ @classmethod
+ def activate_apps(cls, apps):
+ for app in apps:
+ onos_ctrl = OnosCtrl(app)
+ status, _ = onos_ctrl.activate()
+ assert_equal(status, True)
+ time.sleep(2)
+
+ def tls_flow_check(self, olt_uni_port, cert_info = None):
+ def tls_fail_cb():
+ log_test.info('TLS verification failed')
+ if cert_info is None:
+ tls = TLSAuthTest(intf = olt_uni_port)
+ log_test.info('Running subscriber %s tls auth test with valid TLS certificate' %olt_uni_port)
+ tls.runTest()
+ assert_equal(tls.failTest, False)
+ if cert_info == "no_cert":
+ tls = TLSAuthTest(fail_cb = tls_fail_cb, intf = olt_uni_port, client_cert = '')
+ log_test.info('Running subscriber %s tls auth test with no TLS certificate' %olt_uni_port)
+ tls.runTest()
+ assert_equal(tls.failTest, True)
+ if cert_info == "invalid_cert":
+ tls = TLSAuthTest(fail_cb = tls_fail_cb, intf = olt_uni_port, client_cert = self.CLIENT_CERT_INVALID)
+ log_test.info('Running subscriber %s tls auth test with invalid TLS certificate' %olt_uni_port)
+ tls.runTest()
+ assert_equal(tls.failTest, True)
+ self.test_status = True
+ return self.test_status
def test_olt_enable_disable(self):
log_test.info('Enabling OLT type %s, MAC %s' %(self.OLT_TYPE, self.OLT_MAC))
@@ -50,7 +239,33 @@
4. Validate that eap tls valid auth packets are being exchanged between subscriber, onos and freeradius.
5. Verify that subscriber is authenticated successfully.
"""
+ log_test.info('Enabling ponsim_olt')
+ ponsim_address = '{}:50060'.format(self.VOLTHA_HOST)
+ device_id, status = self.voltha.enable_device('ponsim_olt', address = ponsim_address)
+ assert_not_equal(device_id, None)
+ voltha = VolthaCtrl(self.VOLTHA_HOST,
+ rest_port = self.VOLTHA_REST_PORT,
+ uplink_vlan_map = self.VOLTHA_UPLINK_VLAN_MAP)
+ time.sleep(10)
+ switch_map = None
+ olt_configured = False
+ switch_map = voltha.config(fake = self.VOLTHA_CONFIG_FAKE)
+ log_test.info('Installing OLT app')
+ OnosCtrl.install_app(self.olt_app_file)
+ time.sleep(5)
+ log_test.info('Adding subscribers through OLT app')
+ self.config_olt(switch_map)
+ olt_configured = True
+ time.sleep(5)
+ auth_status = self.tls_flow_check(self.INTF_RX_DEFAULT)
+ try:
+ assert_equal(auth_status, True)
+ assert_equal(status, True)
+ time.sleep(10)
+ finally:
+ self.voltha.disable_device(device_id, delete = True)
+ @deferred(TESTCASE_TIMEOUT)
def test_subscriber_with_voltha_for_eap_tls_authentication_failure(self):
"""
Test Method:
@@ -61,7 +276,38 @@
4. Validate that eap tls without cert auth packet is being exchanged between subscriber, onos and freeradius.
5. Verify that subscriber authentication is unsuccessful..
"""
+ df = defer.Deferred()
+ def tls_flow_check_with_no_cert_scenario(df):
+ log_test.info('Enabling ponsim_olt')
+ ponsim_address = '{}:50060'.format(self.VOLTHA_HOST)
+ device_id, status = self.voltha.enable_device('ponsim_olt', address = ponsim_address)
+ assert_not_equal(device_id, None)
+ voltha = VolthaCtrl(self.VOLTHA_HOST,
+ rest_port = self.VOLTHA_REST_PORT,
+ uplink_vlan_map = self.VOLTHA_UPLINK_VLAN_MAP)
+ time.sleep(10)
+ switch_map = None
+ olt_configured = False
+ switch_map = voltha.config(fake = self.VOLTHA_CONFIG_FAKE)
+ log_test.info('Installing OLT app')
+ OnosCtrl.install_app(self.olt_app_file)
+ time.sleep(5)
+ log_test.info('Adding subscribers through OLT app')
+ self.config_olt(switch_map)
+ olt_configured = True
+ time.sleep(5)
+ auth_status = self.tls_flow_check(self.INTF_RX_DEFAULT, cert_info = "no_cert")
+ try:
+ assert_equal(auth_status, True)
+ assert_equal(status, True)
+ time.sleep(10)
+ finally:
+ self.voltha.disable_device(device_id, delete = True)
+ df.callback(0)
+ reactor.callLater(0, tls_flow_check_with_no_cert_scenario, df)
+ return df
+ @deferred(TESTCASE_TIMEOUT)
def test_subscriber_with_voltha_for_eap_tls_authentication_using_invalid_cert(self):
"""
Test Method:
@@ -72,6 +318,36 @@
4. Validate that eap tls with invalid cert auth packet is being exchanged between subscriber, onos and freeradius.
5. Verify that subscriber authentication is unsuccessful..
"""
+ df = defer.Deferred()
+ def tls_flow_check_with_invalid_cert_scenario(df):
+ log_test.info('Enabling ponsim_olt')
+ ponsim_address = '{}:50060'.format(self.VOLTHA_HOST)
+ device_id, status = self.voltha.enable_device('ponsim_olt', address = ponsim_address)
+ assert_not_equal(device_id, None)
+ voltha = VolthaCtrl(self.VOLTHA_HOST,
+ rest_port = self.VOLTHA_REST_PORT,
+ uplink_vlan_map = self.VOLTHA_UPLINK_VLAN_MAP)
+ time.sleep(10)
+ switch_map = None
+ olt_configured = False
+ switch_map = voltha.config(fake = self.VOLTHA_CONFIG_FAKE)
+ log_test.info('Installing OLT app')
+ OnosCtrl.install_app(self.olt_app_file)
+ time.sleep(5)
+ log_test.info('Adding subscribers through OLT app')
+ self.config_olt(switch_map)
+ olt_configured = True
+ time.sleep(5)
+ auth_status = self.tls_flow_check(self.INTF_RX_DEFAULT, cert_info = "invalid_cert")
+ try:
+ assert_equal(auth_status, True)
+ assert_equal(status, True)
+ time.sleep(10)
+ finally:
+ self.voltha.disable_device(device_id, delete = True)
+ df.callback(0)
+ reactor.callLater(0, tls_flow_check_with_invalid_cert_scenario, df)
+ return df
def test_subscriber_with_voltha_for_eap_tls_authentication_with_aaa_app_deactivation(self):
"""