Refactor all python test libraries to conform to python coding guidelines
Refactor robot Test Case Suite to conform to Robot code coding guidelines
Now that I have pycharm installed in a cloned VM that I resized to allow for
development tools, I see coding issues with the original code that I had
This is mostly a cosmetic change as no fundamental changes were made
to the original code
Rebased and fixed conflicts
Change-Id: I5dc0534e92fa708b45399944994101afd7efed63
diff --git a/tests/atests/common/authentication.py b/tests/atests/common/authentication.py
index 6bbc669..5cde7b5 100644
--- a/tests/atests/common/authentication.py
+++ b/tests/atests/common/authentication.py
@@ -21,10 +21,9 @@
import time
import os
import subprocess
-import commands
import testCaseUtils
import logging
-import signal
+
class Authentication(object):
@@ -34,61 +33,63 @@
AUTHENTICATE_FILENAME = 'voltha_authenticate.log'
def __init__(self):
- self.dirs = {}
- self.dirs ['log'] = None
- self.dirs ['root'] = None
- self.dirs ['voltha'] = None
+ self.dirs = dict()
+ self.dirs['log'] = None
+ self.dirs['root'] = None
+ self.dirs['voltha'] = None
self.__rgName = None
self.__radiusName = None
self.__radiusIp = None
- def aSetLogDirs(self, rootDir, volthaDir, logDir):
- testCaseUtils.configDirs(self, logDir, rootDir, volthaDir)
+ def a_set_log_dirs(self, root_dir, voltha_dir, log_dir):
+ testCaseUtils.config_dirs(self, log_dir, root_dir, voltha_dir)
def discover_rg_pod_name(self):
- self.__rgName = testCaseUtils.extractPodName('rg-').strip()
+ self.__rgName = testCaseUtils.extract_pod_name('rg-').strip()
def discover_freeradius_pod_name(self):
- self.__radiusName = testCaseUtils.extractPodName('freeradius').strip()
- logging.info ('freeradius Name = %s' % self.__radiusName)
+ self.__radiusName = testCaseUtils.extract_pod_name('freeradius').strip()
+ logging.info('freeradius Name = %s' % self.__radiusName)
def discover_freeradius_ip_addr(self):
- ipAddr = testCaseUtils.extractRadiusIpAddr(self.__radiusName)
+ ipAddr = testCaseUtils.extract_radius_ip_addr(self.__radiusName)
assert ipAddr, 'No IP address listed for freeradius'
self.__radiusIp = ipAddr.strip()
logging.info('freeradius IP = %s' % self.__radiusIp)
def set_current_freeradius_ip_in_aaa_json(self):
- status = testCaseUtils.modifyRadiusIpInJsonUsingSed(self, self.__radiusIp)
- assertFalse = 'Setting Radius Ip in Json File did not return Success'
+ status = testCaseUtils.modify_radius_ip_in_json_using_sed(self, self.__radiusIp)
+ assert (status == 0), 'Setting Radius Ip in Json File did not return Success'
def alter_aaa_application_configuration_in_onos_using_aaa_json(self):
- logging.info ('Altering the Onos NetCfg AAA apps with Freeradius IP address')
- logging.debug ('curl --user karaf:karaf -X POST -H "Content-Type: application/json" '
- 'http://localhost:30120/onos/v1/network/configuration/apps/ -d @%s/tests/atests/build/aaa_json'
- % testCaseUtils.getDir(self, 'voltha'))
+ logging.info('Altering the Onos NetCfg AAA apps with Freeradius IP address')
+ logging.debug('curl --user karaf:karaf -X POST -H "Content-Type: application/json" '
+ 'http://localhost:30120/onos/v1/network/configuration/apps/ -d @%s/tests/atests/build/aaa_json'
+ % testCaseUtils.get_dir(self, 'voltha'))
os.system('curl --user karaf:karaf -X POST -H "Content-Type: application/json" '
- 'http://localhost:30120/onos/v1/network/configuration/apps/ -d @%s/tests/atests/build/aaa_json'
- % testCaseUtils.getDir(self, 'voltha'))
+ 'http://localhost:30120/onos/v1/network/configuration/apps/ -d @%s/tests/atests/build/aaa_json'
+ % testCaseUtils.get_dir(self, 'voltha'))
- def execute_authenticatication_on_rg(self):
- logging.info ('Running Radius Authentication from RG')
- process_output = open('%s/%s' % (testCaseUtils.getDir(self, 'log'), self.AUTHENTICATE_FILENAME), 'w')
- proc1 = subprocess.Popen(['/usr/bin/kubectl', 'exec', '-n', 'voltha', self.__rgName, '--', 'bash', '-c', \
+ def execute_authentication_on_rg(self):
+ logging.info('Running Radius Authentication from RG')
+ process_output = open('%s/%s' % (testCaseUtils.get_dir(self, 'log'), self.AUTHENTICATE_FILENAME), 'w')
+ proc1 = subprocess.Popen(['/usr/bin/kubectl', 'exec', '-n', 'voltha', self.__rgName, '--', 'bash', '-c',
'/sbin/wpa_supplicant -Dwired -ieth0 -c /etc/wpa_supplicant/wpa_supplicant.conf'],
stdout=process_output,
stderr=process_output)
+
time.sleep(15)
+ logging.debug('return value from supplicant subprocess = %s' % proc1.returncode)
procPidSupplicant1 = subprocess.Popen(['/usr/bin/kubectl', 'exec', '-n', 'voltha', self.__rgName, '--', 'ps', '-ef'],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
procPidSupplicant2 = subprocess.Popen(['grep', '-e', '/sbin/wpa_supplicant'], stdin=procPidSupplicant1.stdout,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
procPidSupplicant3 = subprocess.Popen(['awk', "{print $2}"], stdin=procPidSupplicant2.stdout,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
procPidSupplicant1.stdout.close()
procPidSupplicant2.stdout.close()
@@ -97,19 +98,20 @@
supplicantPid = out.strip()
procKillSupplicant1 = subprocess.Popen(['/usr/bin/kubectl', 'exec', '-n', 'voltha', self.__rgName, '--', 'kill', supplicantPid],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
out, err = procKillSupplicant1.communicate()
+ assert not err, 'Killing Supplicant returned %s' % err
procPidBash1 = subprocess.Popen(['/usr/bin/kubectl', 'exec', '-n', 'voltha', self.__rgName, '--', 'ps', '-ef'],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
procPidBash2 = subprocess.Popen(['grep', '-e', '/bin/bash'], stdin=procPidBash1.stdout,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
procPidBash3 = subprocess.Popen(['awk', "{print $2}"], stdin=procPidBash2.stdout,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
procPidBash1.stdout.close()
procPidBash2.stdout.close()
@@ -118,13 +120,14 @@
bashPid = out.strip()
procKillBash1 = subprocess.Popen(['/usr/bin/kubectl', 'exec', '-n', 'voltha', self.__rgName, '--', 'kill', '-9', bashPid],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
out, err = procKillBash1.communicate()
+ assert not err, 'Killing Bash returned %s' % err
process_output.close()
- testCaseUtils.printLogFile(self, self.AUTHENTICATE_FILENAME)
+ testCaseUtils.print_log_file(self, self.AUTHENTICATE_FILENAME)
def verify_authentication_should_have_started(self):
statusLines = testCaseUtils.get_fields_from_grep_command(self, 'CTRL-EVENT-EAP-STARTED', self.AUTHENTICATE_FILENAME)
@@ -142,17 +145,17 @@
statusLines = testCaseUtils.get_fields_from_grep_command(self, 'CTRL-EVENT-TERMINATING', self.AUTHENTICATE_FILENAME)
assert statusLines, 'Authentication was not terminated'
-def runTest(rootDir, volthaDir, logDir):
+
+def run_test(root_dir, voltha_dir, log_dir):
auth = Authentication()
- auth.aSetLogDirs(rootDir, volthaDir, logDir)
+ auth.a_set_log_dirs(root_dir, voltha_dir, log_dir)
auth.discover_rg_pod_name()
auth.discover_freeradius_pod_name()
auth.discover_freeradius_ip_addr()
auth.set_current_freeradius_ip_in_aaa_json()
auth.alter_aaa_application_configuration_in_onos_using_aaa_json()
- auth.execute_authenticatication_on_rg()
+ auth.execute_authentication_on_rg()
auth.verify_authentication_should_have_started()
auth.verify_authentication_should_have_completed()
auth.verify_authentication_should_have_disconnected()
auth.verify_authentication_should_have_terminated()
-
diff --git a/tests/atests/common/auto_test.py b/tests/atests/common/auto_test.py
index 308ec04..f4b90ae 100755
--- a/tests/atests/common/auto_test.py
+++ b/tests/atests/common/auto_test.py
@@ -31,14 +31,15 @@
DEFAULT_LOG_DIR = '/tmp/voltha_test_results'
logging.basicConfig(level=logging.INFO)
-def dir_init(logDir=DEFAULT_LOG_DIR, volthaDir=os.environ['VOLTHA_BASE']):
+
+def dir_init(log_dir=DEFAULT_LOG_DIR, voltha_dir=os.environ['VOLTHA_BASE']):
logging.info(__file__)
"""
Init automated testing environment and return three directories: root dir,
voltha sources dir and log dir
"""
- rootDir = os.path.abspath(os.path.dirname(__file__))
+ root_dir = os.path.abspath(os.path.dirname(__file__))
currentTime = time.strftime("%Y-%m-%d-%H-%M-%S")
@@ -46,13 +47,13 @@
# added to the log directory name
# logDir += '_' + currentTime
- os.system('mkdir -p ' + logDir + ' > /dev/null 2>&1')
- os.system('rm -rf %s/*' % logDir)
+ os.system('mkdir -p ' + log_dir + ' > /dev/null 2>&1')
+ os.system('rm -rf %s/*' % log_dir)
logging.info('Starting Voltha Test Case Suite at: %s\nRoot Directory: %s\n'
- 'VOLTHA Directory: %s\nLog Directory: %s' %
- (currentTime, rootDir, volthaDir, logDir))
+ 'VOLTHA Directory: %s\nLog Directory: %s' %
+ (currentTime, root_dir, voltha_dir, log_dir))
- return rootDir, volthaDir, logDir
+ return root_dir, voltha_dir, log_dir
#
@@ -70,12 +71,12 @@
ROOT_DIR, VOLTHA_DIR, LOG_DIR = dir_init(args.logDir)
- volthaMngr.voltha_Initialize(ROOT_DIR, VOLTHA_DIR, LOG_DIR)
+ volthaMngr.voltha_initialize(ROOT_DIR, VOLTHA_DIR, LOG_DIR)
- preprovisioning.runTest('olt.voltha.svc', 50060, 'ponsim_olt', 'ponsim_onu', LOG_DIR)
+ preprovisioning.run_test('olt.voltha.svc', 50060, 'ponsim_olt', 'ponsim_onu', LOG_DIR)
- discovery.runTest('olt.voltha.svc', 'ponsim_olt', 'ponsim_onu', LOG_DIR)
+ discovery.run_test('olt.voltha.svc', 'ponsim_olt', 'ponsim_onu', LOG_DIR)
- authentication.runTest(ROOT_DIR, VOLTHA_DIR, LOG_DIR)
+ authentication.run_test(ROOT_DIR, VOLTHA_DIR, LOG_DIR)
time.sleep(5)
diff --git a/tests/atests/common/discovery.py b/tests/atests/common/discovery.py
index a0f3ef5..e125dcf 100755
--- a/tests/atests/common/discovery.py
+++ b/tests/atests/common/discovery.py
@@ -18,12 +18,10 @@
vOLT-HA Discovery Test Case module
"""
-import time
-import os
-import commands
import testCaseUtils
import logging
+
class Discovery(object):
"""
@@ -31,10 +29,10 @@
"""
def __init__(self):
- self.dirs = {}
- self.dirs ['log'] = None
- self.dirs ['root'] = None
- self.dirs ['voltha'] = None
+ self.dirs = dict()
+ self.dirs['log'] = None
+ self.dirs['root'] = None
+ self.dirs['voltha'] = None
self.__logicalDeviceType = None
self.__oltType = None
@@ -45,98 +43,100 @@
self.__onuDeviceIds = []
self.__peers = None
- def dSetLogDirs(self, logDir):
- testCaseUtils.configDirs(self, logDir)
+ def d_set_log_dirs(self, log_dir):
+ testCaseUtils.config_dirs(self, log_dir)
- def dConfigure(self, logicalDeviceType, oltType, onuType):
- self.__logicalDeviceType = logicalDeviceType
- self.__oltType = oltType
- self.__onuType = onuType
+ def d_configure(self, logical_device_type, olt_type, onu_type):
+ self.__logicalDeviceType = logical_device_type
+ self.__oltType = olt_type
+ self.__onuType = onu_type
- def logicalDevice(self):
+ def logical_device(self):
logging.info('Logical Device Info')
statusLines = testCaseUtils.get_fields_from_grep_command(self, self.__logicalDeviceType, 'voltha_devices_after_enable.log')
assert statusLines, 'No Logical Devices listed under devices'
- self.__fields = testCaseUtils.parseFields(statusLines)
+ self.__fields = testCaseUtils.parse_fields(statusLines)
self.__logicalDeviceId = self.__fields[4].strip()
- testCaseUtils.send_command_to_voltha_cli(testCaseUtils.getDir(self, 'log'),
- 'voltha_logical_device.log', 'logical_device ' + self.__logicalDeviceId, 'voltha_logical_device_ports.log', 'ports', 'voltha_logical_device_flows.log', 'flows')
- testCaseUtils.printLogFile (self, 'voltha_logical_device_ports.log')
- testCaseUtils.printLogFile (self, 'voltha_logical_device_flows.log')
+ testCaseUtils.send_command_to_voltha_cli(testCaseUtils.get_dir(self, 'log'),
+ 'voltha_logical_device.log', 'logical_device ' + self.__logicalDeviceId,
+ 'voltha_logical_device_ports.log', 'ports', 'voltha_logical_device_flows.log', 'flows')
+ testCaseUtils.print_log_file(self, 'voltha_logical_device_ports.log')
+ testCaseUtils.print_log_file(self, 'voltha_logical_device_flows.log')
- def oltDiscovery(self):
+ def olt_discovery(self):
logging.info('Olt Discovery')
statusLines = testCaseUtils.get_fields_from_grep_command(self, self.__oltType, 'voltha_devices_after_enable.log')
assert statusLines, 'No Olt listed under devices'
- self.__fields = testCaseUtils.parseFields(statusLines)
+ self.__fields = testCaseUtils.parse_fields(statusLines)
self.__oltDeviceId = self.__fields[1].strip()
- testCaseUtils.send_command_to_voltha_cli(testCaseUtils.getDir(self, 'log'),
- 'voltha_olt_device.log', 'device ' + self.__oltDeviceId, 'voltha_olt_ports.log', 'ports', 'voltha_olt_flows.log', 'flows')
- testCaseUtils.printLogFile (self, 'voltha_olt_ports.log')
- testCaseUtils.printLogFile (self, 'voltha_olt_flows.log')
+ testCaseUtils.send_command_to_voltha_cli(testCaseUtils.get_dir(self, 'log'),
+ 'voltha_olt_device.log', 'device ' + self.__oltDeviceId, 'voltha_olt_ports.log',
+ 'ports', 'voltha_olt_flows.log', 'flows')
+ testCaseUtils.print_log_file(self, 'voltha_olt_ports.log')
+ testCaseUtils.print_log_file(self, 'voltha_olt_flows.log')
- def onuDiscovery(self):
+ def onu_discovery(self):
logging.info('Onu Discovery')
statusLines = testCaseUtils.get_fields_from_grep_command(self, self.__onuType, 'voltha_devices_after_enable.log')
assert statusLines, 'No Onu listed under devices'
lines = statusLines.splitlines()
for line in lines:
- self.__fields = testCaseUtils.parseFields(line)
+ self.__fields = testCaseUtils.parse_fields(line)
onuDeviceId = self.__fields[1].strip()
self.__onuDeviceIds.append(onuDeviceId)
- testCaseUtils.send_command_to_voltha_cli(testCaseUtils.getDir(self, 'log'),
- 'voltha_onu_device_' + str(self.__onuDeviceIds.index(onuDeviceId)) + '.log', 'device ' + onuDeviceId,
- 'voltha_onu_ports_' + str(self.__onuDeviceIds.index(onuDeviceId)) + '.log', 'ports',
- 'voltha_onu_flows_' + str(self.__onuDeviceIds.index(onuDeviceId)) + '.log', 'flows')
- testCaseUtils.printLogFile (self, 'voltha_onu_ports_' + str(self.__onuDeviceIds.index(onuDeviceId)) + '.log')
- testCaseUtils.printLogFile (self, 'voltha_onu_flows_' + str(self.__onuDeviceIds.index(onuDeviceId)) + '.log')
+ testCaseUtils.send_command_to_voltha_cli(testCaseUtils.get_dir(self, 'log'),
+ 'voltha_onu_device_' + str(self.__onuDeviceIds.index(onuDeviceId)) + '.log',
+ 'device ' + onuDeviceId, 'voltha_onu_ports_' +
+ str(self.__onuDeviceIds.index(onuDeviceId)) + '.log', 'ports', 'voltha_onu_flows_' +
+ str(self.__onuDeviceIds.index(onuDeviceId)) + '.log', 'flows')
+ testCaseUtils.print_log_file(self, 'voltha_onu_ports_' + str(self.__onuDeviceIds.index(onuDeviceId)) + '.log')
+ testCaseUtils.print_log_file(self, 'voltha_onu_flows_' + str(self.__onuDeviceIds.index(onuDeviceId)) + '.log')
def olt_ports_should_be_enabled_and_active(self):
statusLines = testCaseUtils.get_fields_from_grep_command(self, self.__oltDeviceId, 'voltha_olt_ports.log')
assert statusLines, 'No Olt device listed under ports'
lines = statusLines.splitlines()
for line in lines:
- self.__fields = testCaseUtils.parseFields(line)
- assert self.check_states(self.__oltDeviceId) == True, 'States of %s does match expected ' % self.__oltDeviceId
+ self.__fields = testCaseUtils.parse_fields(line)
+ assert (self.check_states(self.__oltDeviceId) is True), 'States of %s does match expected ' % self.__oltDeviceId
portType = self.__fields[3].strip()
assert (portType == 'ETHERNET_NNI' or portType == 'PON_OLT'),\
- 'Port type for %s does not match expected ETHERNET_NNI or PON_OLT' % self.__oltDeviceId
+ 'Port type for %s does not match expected ETHERNET_NNI or PON_OLT' % self.__oltDeviceId
if portType == 'PON_OLT':
self.__peers = self.__fields[7].strip()
peerFields = self.__peers.split(',')
peerDevices = peerFields[1::2]
for peerDevice in peerDevices:
deviceFields = peerDevice.split(':')
- deviceId = deviceFields[1].replace("'","").replace('u','').rstrip("}]").strip()
+ deviceId = deviceFields[1].replace("'", "").replace('u', '').rstrip("}]").strip()
assert deviceId in self.__onuDeviceIds, 'ONU Device %s not found as Peer' % deviceId
def onu_ports_should_be_enabled_and_active(self):
for onuDeviceId in self.__onuDeviceIds:
- statusLines = testCaseUtils.get_fields_from_grep_command(self, onuDeviceId, 'voltha_onu_ports_' + \
- str(self.__onuDeviceIds.index(onuDeviceId)) + '.log')
+ statusLines = testCaseUtils.get_fields_from_grep_command(self, onuDeviceId, 'voltha_onu_ports_' +
+ str(self.__onuDeviceIds.index(onuDeviceId)) + '.log')
assert statusLines, 'No Onu device listed under ports'
lines = statusLines.splitlines()
for line in lines:
- self.__fields = testCaseUtils.parseFields(line)
- assert self.check_states(onuDeviceId) == True, 'States of %s does match expected ' % onuDeviceId
+ self.__fields = testCaseUtils.parse_fields(line)
+ assert (self.check_states(onuDeviceId) is True), 'States of %s does match expected ' % onuDeviceId
portType = self.__fields[3].strip()
assert (portType == 'ETHERNET_UNI' or portType == 'PON_ONU'),\
- 'Port type for %s does not match expected ETHERNET_UNI or PON_ONU' % onuDeviceId
+ 'Port type for %s does not match expected ETHERNET_UNI or PON_ONU' % onuDeviceId
if portType == 'PON_ONU':
self.__peers = self.__fields[7].strip()
peerFields = self.__peers.split(',')
peerDevice = peerFields[1]
deviceFields = peerDevice.split(':')
- deviceId = deviceFields[1].replace("'","").replace('u','').rstrip("}]").strip()
+ deviceId = deviceFields[1].replace("'", "").replace('u', '').rstrip("}]").strip()
assert deviceId == self.__oltDeviceId, 'OLT Device %s not found as Peer' % deviceId
-
- def check_states(self, deviceId):
+ def check_states(self, device_id):
result = True
adminState = self.__fields[4].strip()
- assert adminState == 'ENABLED', 'Admin State of %s not ENABLED' % deviceId
- operStatus = self.__fields[5].strip()
- assert operStatus == 'ACTIVE', 'Oper Status of %s not ACTIVE' % deviceId
+ assert adminState == 'ENABLED', 'Admin State of %s not ENABLED' % device_id
+ operatorStatus = self.__fields[5].strip()
+ assert operatorStatus == 'ACTIVE', 'Operator Status of %s not ACTIVE' % device_id
return result
def olt_should_have_at_least_one_flow(self):
@@ -149,8 +149,8 @@
def onu_should_have_at_least_one_flow(self):
for onuDeviceId in self.__onuDeviceIds:
- statusLines = testCaseUtils.get_fields_from_grep_command(self, 'Flows', 'voltha_onu_flows_' + \
- str(self.__onuDeviceIds.index(onuDeviceId)) + '.log')
+ statusLines = testCaseUtils.get_fields_from_grep_command(self, 'Flows', 'voltha_onu_flows_' +
+ str(self.__onuDeviceIds.index(onuDeviceId)) + '.log')
assert statusLines, 'No Onu flows under device %s' % onuDeviceId
before, flows, numFlows = statusLines.partition('Flows')
plainNumber = numFlows.strip().strip('():')
@@ -158,14 +158,14 @@
assert int(plainNumber) > 0, 'Zero number of flows for Onu %s' % onuDeviceId
-def runTest(logicalDeviceType, oltType, onuType, logDir):
+def run_test(logical_device_type, olt_type, onu_type, log_dir):
discovery = Discovery()
- discovery.dSetLogDirs(logDir)
- discovery.dConfigure(logicalDeviceType, oltType, onuType)
- discovery.oltDiscovery()
- discovery.onuDiscovery()
- discovery.logicalDevice()
- discovery.olt_ports_should_be_enabled_and_active()
+ discovery.d_set_log_dirs(log_dir)
+ discovery.d_configure(logical_device_type, olt_type, onu_type)
+ discovery.olt_discovery()
+ discovery.onu_discovery()
+ discovery.logical_device()
+ discovery.olt_ports_should_be_enabled_and_active()
discovery.onu_ports_should_be_enabled_and_active()
discovery.olt_should_have_at_least_one_flow()
discovery.onu_should_have_at_least_one_flow()
diff --git a/tests/atests/common/preprovisioning.py b/tests/atests/common/preprovisioning.py
index 81cfd8a..affa991 100755
--- a/tests/atests/common/preprovisioning.py
+++ b/tests/atests/common/preprovisioning.py
@@ -19,11 +19,10 @@
"""
import time
-import os
-import commands
import testCaseUtils
import logging
+
class Preprovisioning(object):
"""
@@ -31,10 +30,10 @@
"""
def __init__(self):
- self.dirs = {}
- self.dirs ['log'] = None
- self.dirs ['root'] = None
- self.dirs ['voltha'] = None
+ self.dirs = dict()
+ self.dirs['log'] = None
+ self.dirs['root'] = None
+ self.dirs['voltha'] = None
self.__oltIpAddress = None
self.__oltPort = None
@@ -43,20 +42,20 @@
self.__fields = []
self.__oltDeviceId = None
- def pSetLogDirs(self, logDir):
- testCaseUtils.configDirs(self, logDir)
+ def p_set_log_dirs(self, log_dir):
+ testCaseUtils.config_dirs(self, log_dir)
- def pConfigure(self, oltIpAddress, oltPort, oltType, onuType):
- self.__oltIpAddress = oltIpAddress
- self.__oltPort = oltPort
- self.__oltType = oltType
- self.__onuType = onuType
+ def p_configure(self, olt_ip_address, olt_port, olt_type, onu_type):
+ self.__oltIpAddress = olt_ip_address
+ self.__oltPort = olt_port
+ self.__oltType = olt_type
+ self.__onuType = onu_type
- def preprovisionOlt(self):
+ def preprovision_olt(self):
logging.info('Do PROVISIONING')
- testCaseUtils.send_command_to_voltha_cli(testCaseUtils.getDir(self, 'log'),
- 'voltha_preprovision_olt.log', 'preprovision_olt -t ponsim_olt -H %s:%s' %
- (self.__oltIpAddress, self.__oltPort))
+ testCaseUtils.send_command_to_voltha_cli(testCaseUtils.get_dir(self, 'log'),
+ 'voltha_preprovision_olt.log', 'preprovision_olt -t ponsim_olt -H %s:%s' %
+ (self.__oltIpAddress, self.__oltPort))
time.sleep(5)
def status_should_be_success_after_preprovision_command(self):
@@ -64,15 +63,15 @@
assert statusLines, 'Preprovision Olt command should have returned success but did not'
def query_devices_before_enabling(self):
- testCaseUtils.send_command_to_voltha_cli(testCaseUtils.getDir(self, 'log'),
- 'voltha_devices_before_enable.log', 'devices')
- testCaseUtils.printLogFile (self, 'voltha_devices_before_enable.log')
+ testCaseUtils.send_command_to_voltha_cli(testCaseUtils.get_dir(self, 'log'),
+ 'voltha_devices_before_enable.log', 'devices')
+ testCaseUtils.print_log_file(self, 'voltha_devices_before_enable.log')
time.sleep(5)
def check_olt_fields_before_enabling(self):
statusLines = testCaseUtils.get_fields_from_grep_command(self, self.__oltType, 'voltha_devices_before_enable.log')
assert statusLines, 'No Olt listed under devices'
- self.__fields = testCaseUtils.parseFields(statusLines)
+ self.__fields = testCaseUtils.parse_fields(statusLines)
self.__oltDeviceId = self.__fields[1].strip()
logging.debug("OLT device id = %s" % self.__oltDeviceId)
adminState = self.__fields[3].strip()
@@ -83,20 +82,20 @@
assert hostPortFields[0].strip() == self.__oltIpAddress or hostPortFields[1] == str(self.__oltPort), \
'Olt IP or Port does not match'
- def check_states(self, devType):
+ def check_states(self, dev_type):
result = True
adminState = self.__fields[7].strip()
- assert adminState == 'ENABLED', 'Admin State of %s not ENABLED' % devType
- operStatus = self.__fields[8].strip()
- assert operStatus == 'ACTIVE', 'Oper Status of %s not ACTIVE' % devType
+ assert adminState == 'ENABLED', 'Admin State of %s not ENABLED' % dev_type
+ operatorStatus = self.__fields[8].strip()
+ assert operatorStatus == 'ACTIVE', 'Operator Status of %s not ACTIVE' % dev_type
connectStatus = self.__fields[9].strip()
- assert connectStatus == 'REACHABLE', 'Connect Status of %s not REACHABLE' % devType
+ assert connectStatus == 'REACHABLE', 'Connect Status of %s not REACHABLE' % dev_type
return result
def check_olt_fields_after_enabling(self):
statusLines = testCaseUtils.get_fields_from_grep_command(self, self.__oltType, 'voltha_devices_after_enable.log')
assert statusLines, 'No Olt listed under devices'
- self.__fields = testCaseUtils.parseFields(statusLines)
+ self.__fields = testCaseUtils.parse_fields(statusLines)
assert self.check_states(self.__oltType), 'States of %s does match expected' % self.__oltType
hostPort = self.__fields[11].strip()
assert hostPort, 'hostPort field is empty'
@@ -111,28 +110,29 @@
lenLines = len(lines)
assert lenLines == 1, 'Fixed single onu does not match, ONU Count was %d' % lenLines
for line in lines:
- self.__fields = testCaseUtils.parseFields(line)
- assert self.check_states(self.__onuType) == True, 'States of %s does match expected' % self.__onuType
+ self.__fields = testCaseUtils.parse_fields(line)
+ assert (self.check_states(self.__onuType) is True), 'States of %s does match expected' % self.__onuType
def enable(self):
logging.info('Enable %s OLT device' % self.__oltDeviceId)
- testCaseUtils.send_command_to_voltha_cli(testCaseUtils.getDir(self, 'log'),
- 'voltha_enable.log', 'enable ' + self.__oltDeviceId)
+ testCaseUtils.send_command_to_voltha_cli(testCaseUtils.get_dir(self, 'log'),
+ 'voltha_enable.log', 'enable ' + self.__oltDeviceId)
def status_should_be_success_after_enable_command(self):
statusLines = testCaseUtils.get_fields_from_grep_command(self, 'success', 'voltha_enable.log')
assert statusLines, 'Enable command should have returned success but did not'
def query_devices_after_enabling(self):
- testCaseUtils.send_command_to_voltha_cli(testCaseUtils.getDir(self, 'log'),
- 'voltha_devices_after_enable.log', 'devices')
- testCaseUtils.printLogFile (self, 'voltha_devices_after_enable.log')
+ testCaseUtils.send_command_to_voltha_cli(testCaseUtils.get_dir(self, 'log'),
+ 'voltha_devices_after_enable.log', 'devices')
+ testCaseUtils.print_log_file(self, 'voltha_devices_after_enable.log')
-def runTest(oltIpAddress, oltPort, oltType, onuType, logDir):
+
+def run_test(olt_ip_address, olt_port, olt_type, onu_type, log_dir):
preprovisioning = Preprovisioning()
- preprovisioning.pSetLogDirs(logDir)
- preprovisioning.pConfigure(oltIpAddress, oltPort, oltType, onuType)
- preprovisioning.preprovisionOlt()
+ preprovisioning.p_set_log_dirs(log_dir)
+ preprovisioning.p_configure(olt_ip_address, olt_port, olt_type, onu_type)
+ preprovisioning.preprovision_olt()
preprovisioning.status_should_be_success_after_preprovision_command()
preprovisioning.query_devices_before_enabling()
preprovisioning.check_olt_fields_before_enabling()
@@ -141,6 +141,3 @@
preprovisioning.query_devices_after_enabling()
preprovisioning.check_olt_fields_after_enabling()
preprovisioning.check_onu_fields_after_enabling()
-
-
-
diff --git a/tests/atests/common/testCaseUtils.py b/tests/atests/common/testCaseUtils.py
index fe76687..e51199d 100755
--- a/tests/atests/common/testCaseUtils.py
+++ b/tests/atests/common/testCaseUtils.py
@@ -18,69 +18,72 @@
vOLT-HA Test Case Utils module
"""
import time
-import os
import commands
import subprocess
import pexpect
import sys
-def configDirs(self, logDir, rootDir = None, volthaDir = None):
- self.dirs ['log'] = logDir
- self.dirs ['root'] = rootDir
- self.dirs ['voltha'] = volthaDir
+
+def config_dirs(self, log_dir, root_dir=None, voltha_dir=None):
+ self.dirs['log'] = log_dir
+ self.dirs['root'] = root_dir
+ self.dirs['voltha'] = voltha_dir
-def getDir(self, Dir):
- return self.dirs.get(Dir)
+
+def get_dir(self, directory):
+ return self.dirs.get(directory)
-def removeLeadingLine(logDir, logFile):
- with open(logDir + '/' + logFile, 'r+') as file:
- lines = file.readlines()
- file.seek(0)
+
+def remove_leading_line(log_dir, log_file):
+ with open(log_dir + '/' + log_file, 'r+') as FILE:
+ lines = FILE.readlines()
+ FILE.seek(0)
lines = lines[1:]
for line in lines:
- file.write(line)
- file.truncate()
- file.close()
+ FILE.write(line)
+ FILE.truncate()
+ FILE.close()
-def send_command_to_voltha_cli(logDir, logFile1, cmd1, logFile2 = None, cmd2 = None, logFile3 = None, cmd3 = None):
- output = open(logDir + '/' + logFile1, 'w')
+
+def send_command_to_voltha_cli(log_dir, log_file1, cmd1, log_file2=None, cmd2=None, log_file3=None, cmd3=None):
+ output = open(log_dir + '/' + log_file1, 'w')
child = pexpect.spawn('ssh -p 30110 -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no voltha@localhost')
child.expect('[pP]assword:')
child.sendline('admin')
child.expect('\((\\x1b\[\d*;?\d+m){1,2}voltha(\\x1b\[\d*;?\d+m){1,2}\)')
time.sleep(10)
- bytes = child.sendline(cmd1)
+ child.sendline(cmd1)
i = child.expect(['\((\\x1b\[\d*;?\d+m){1,2}voltha(\\x1b\[\d*;?\d+m){1,2}\)',
- '\((\\x1b\[\d*;?\d+m){1,2}.*device [0-9a-f]{16}(\\x1b\[\d*;?\d+m){1,2}\)'])
+ '\((\\x1b\[\d*;?\d+m){1,2}.*device [0-9a-f]{16}(\\x1b\[\d*;?\d+m){1,2}\)'])
if i == 0:
output.write(child.before)
output.close()
- removeLeadingLine(logDir, logFile1)
+ remove_leading_line(log_dir, log_file1)
elif i == 1:
- if logFile2 != None and cmd2 != None:
- output = open(logDir + '/' + logFile2, 'w')
- bytes = child.sendline(cmd2)
+ if log_file2 is not None and cmd2 is not None:
+ output = open(log_dir + '/' + log_file2, 'w')
+ child.sendline(cmd2)
child.expect('\((\\x1b\[\d*;?\d+m){1,2}.*device [0-9a-f]{16}(\\x1b\[\d*;?\d+m){1,2}\)')
output.write(child.before)
output.close()
- removeLeadingLine(logDir, logFile2)
- if logFile3 != None and cmd3 != None:
- output = open(logDir + '/' + logFile3, 'w')
- bytes = child.sendline(cmd3)
+ remove_leading_line(log_dir, log_file2)
+ if log_file3 is not None and cmd3 is not None:
+ output = open(log_dir + '/' + log_file3, 'w')
+ child.sendline(cmd3)
child.expect('\((\\x1b\[\d*;?\d+m){1,2}.*device [0-9a-f]{16}(\\x1b\[\d*;?\d+m){1,2}\)')
output.write(child.before)
output.close()
- removeLeadingLine(logDir, logFile3)
+ remove_leading_line(log_dir, log_file3)
child.close()
-def send_command_to_onos_cli(logDir, cmd, logFile):
- output = open(logDir + '/' + logFile, 'w')
+
+def send_command_to_onos_cli(log_dir, cmd, log_file):
+ output = open(log_dir + '/' + log_file, 'w')
child = pexpect.spawn('ssh -p 30115 -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no karaf@localhost')
child.expect('[pP]assword:')
child.sendline('karaf')
child.expect('(\\x1b\[\d*;?\d+m){1,2}onos>(\\x1b\[\d*;?\d+m){1,2}')
- child.sendline('flows')
- child.expect('flows')
+ child.sendline(cmd)
child.expect('(\\x1b\[\d*;?\d+m){1,2}onos>(\\x1b\[\d*;?\d+m){1,2}')
output.write(child.before)
@@ -88,41 +91,46 @@
output.close()
child.close()
-def get_fields_from_grep_command(self, searchWord, logFile):
+
+def get_fields_from_grep_command(self, search_word, log_file):
grepCommand =\
- "grep %s %s/%s" % (searchWord, getDir(self, 'log'), logFile)
+ "grep %s %s/%s" % (search_word, get_dir(self, 'log'), log_file)
statusLines = commands.getstatusoutput(grepCommand)[1]
return statusLines
-def parseFields(statusLine):
- statusList = statusLine.split("|")
+
+def parse_fields(status_line):
+ statusList = status_line.split("|")
return statusList
-def printLogFile(self, logFile):
- with open(getDir(self, 'log') + '/' + logFile, 'r+') as file:
- lines = file.readlines()
+
+def print_log_file(self, log_file):
+ with open(get_dir(self, 'log') + '/' + log_file, 'r+') as FILE:
+ lines = FILE.readlines()
print
for line in lines:
- sys.stdout.write (line)
+ sys.stdout.write(line)
-def extractPodIpAddr(podName):
+
+def extract_pod_ip_addr(pod_name):
proc1 = subprocess.Popen(['/usr/bin/kubectl', 'get', 'svc', '--all-namespaces'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
- proc2 = subprocess.Popen(['grep', '-e', podName], stdin=proc1.stdout,
+ proc2 = subprocess.Popen(['grep', '-e', pod_name], stdin=proc1.stdout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
proc3 = subprocess.Popen(['awk', "{print $4}"], stdin=proc2.stdout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
- proc1.stdout.close
- proc2.stdout.close
+ proc1.stdout.close()
+ proc2.stdout.close()
out, err = proc3.communicate()
return out
-def extractRadiusIpAddr(podName):
- proc1 = subprocess.Popen(['/usr/bin/kubectl', 'describe', 'pod', '-n', 'voltha', podName],
+
+def extract_radius_ip_addr(pod_name):
+ proc1 = subprocess.Popen(['/usr/bin/kubectl', 'describe', 'pod', '-n', 'voltha', pod_name],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
proc2 = subprocess.Popen(['grep', '^IP:'], stdin=proc1.stdout,
@@ -131,31 +139,32 @@
proc3 = subprocess.Popen(['awk', "{print $2}"], stdin=proc2.stdout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
-
- proc1.stdout.close
- proc2.stdout.close
+
+ proc1.stdout.close()
+ proc2.stdout.close()
out, err = proc3.communicate()
return out
-def extractPodName(shortPodName):
+
+def extract_pod_name(short_pod_name):
proc1 = subprocess.Popen(['/usr/bin/kubectl', 'get', 'pods', '--all-namespaces'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
- proc2 = subprocess.Popen(['grep', '-e', shortPodName], stdin=proc1.stdout,
+ proc2 = subprocess.Popen(['grep', '-e', short_pod_name], stdin=proc1.stdout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
proc3 = subprocess.Popen(['awk', "{print $2}"], stdin=proc2.stdout,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
-
-
- proc1.stdout.close
- proc2.stdout.close
+
+ proc1.stdout.close()
+ proc2.stdout.close()
out, err = proc3.communicate()
return out
-def modifyRadiusIpInJsonUsingSed(self, newIpAddr):
- sedCommand ="sed -i '/radiusIp/c\ \"radiusIp\":\"'%s'\",' %s/tests/atests/build/aaa_json" % (newIpAddr, getDir(self, 'voltha'))
+
+def modify_radius_ip_in_json_using_sed(self, new_ip_addr):
+ sedCommand = "sed -i '/radiusIp/c\ \"radiusIp\":\"'%s'\",' %s/tests/atests/build/aaa_json" \
+ % (new_ip_addr, get_dir(self, 'voltha'))
status = commands.getstatusoutput(sedCommand)[0]
return status
-
diff --git a/tests/atests/common/volthaMngr.py b/tests/atests/common/volthaMngr.py
index c164989..9b314c2 100755
--- a/tests/atests/common/volthaMngr.py
+++ b/tests/atests/common/volthaMngr.py
@@ -20,48 +20,48 @@
import os
-import time
import subprocess
import testCaseUtils
import logging
+
class VolthaMngr(object):
"""
This class implements voltha startup/shutdown callable helper functions
"""
def __init__(self):
- self.dirs = {}
- self.dirs ['root'] = None
- self.dirs ['voltha'] = None
- self.dirs ['log'] = None
+ self.dirs = dict()
+ self.dirs['root'] = None
+ self.dirs['voltha'] = None
+ self.dirs['log'] = None
- def vSetLogDirs(self, rootDir, volthaDir, logDir):
- testCaseUtils.configDirs(self, logDir, rootDir, volthaDir)
+ def v_set_log_dirs(self, root_dir, voltha_dir, log_dir):
+ testCaseUtils.config_dirs(self, log_dir, root_dir, voltha_dir)
- def startAllPods(self):
- proc1 = subprocess.Popen([testCaseUtils.getDir(self, 'root') + '/build.sh', 'start'],
+ def start_all_pods(self):
+ proc1 = subprocess.Popen([testCaseUtils.get_dir(self, 'root') + '/build.sh', 'start'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
output = proc1.communicate()[0]
print(output)
- proc1.stdout.close
+ proc1.stdout.close()
- def stopAllPods(self):
- proc1 = subprocess.Popen([testCaseUtils.getDir(self, 'root') + '/build.sh', 'stop'],
+ def stop_all_pods(self):
+ proc1 = subprocess.Popen([testCaseUtils.get_dir(self, 'root') + '/build.sh', 'stop'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
output = proc1.communicate()[0]
print(output)
- proc1.stdout.close
+ proc1.stdout.close()
- def resetKubeAdm(self):
- proc1 = subprocess.Popen([testCaseUtils.getDir(self, 'root') + '/build.sh', 'clear'],
+ def reset_kube_adm(self):
+ proc1 = subprocess.Popen([testCaseUtils.get_dir(self, 'root') + '/build.sh', 'clear'],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
output = proc1.communicate()[0]
print(output)
- proc1.stdout.close
+ proc1.stdout.close()
"""
Because we are not deploying SEBA with XOS and NEM, and that a standalone Voltha
@@ -69,63 +69,62 @@
NetCfg in two fashion.
One is to add to device list and the other is to add the missing Sadis section
"""
- def alterOnosNetCfg(self):
- logging.info ('Altering the Onos NetCfg to suit Voltha\'s needs')
- logging.debug ('curl --user karaf:karaf -X POST -H "Content-Type: application/json" '
- 'http://localhost:30120/onos/v1/network/configuration/devices/ -d @%s/tests/atests/build/devices_json'
- % testCaseUtils.getDir(self, 'voltha'))
+ def alter_onos_net_cfg(self):
+ logging.info('Altering the Onos NetCfg to suit Voltha\'s needs')
+ logging.debug('curl --user karaf:karaf -X POST -H "Content-Type: application/json" '
+ 'http://localhost:30120/onos/v1/network/configuration/devices/ -d @%s/tests/atests/build/devices_json'
+ % testCaseUtils.get_dir(self, 'voltha'))
os.system('curl --user karaf:karaf -X POST -H "Content-Type: application/json" '
- 'http://localhost:30120/onos/v1/network/configuration/devices/ -d @%s/tests/atests/build/devices_json'
- % testCaseUtils.getDir(self, 'voltha'))
- logging.debug ('curl --user karaf:karaf -X POST -H "Content-Type: application/json" '
- 'http://localhost:30120/onos/v1/network/configuration/apps/ -d @%s/tests/atests/build/sadis_json'
- % testCaseUtils.getDir(self, 'voltha'))
+ 'http://localhost:30120/onos/v1/network/configuration/devices/ -d @%s/tests/atests/build/devices_json'
+ % testCaseUtils.get_dir(self, 'voltha'))
+ logging.debug('curl --user karaf:karaf -X POST -H "Content-Type: application/json" '
+ 'http://localhost:30120/onos/v1/network/configuration/apps/ -d @%s/tests/atests/build/sadis_json'
+ % testCaseUtils.get_dir(self, 'voltha'))
os.system('curl --user karaf:karaf -X POST -H "Content-Type: application/json" '
- 'http://localhost:30120/onos/v1/network/configuration/apps/ -d @%s/tests/atests/build/sadis_json'
- % testCaseUtils.getDir(self, 'voltha'))
-
- def getAllRunningPods(self):
- allRunningPods = []
- proc1 = subprocess.Popen(['/usr/bin/kubectl', 'get', 'pods', '--all-namespaces'],
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc2 = subprocess.Popen(['grep', '-v', 'NAMESPACE'], stdin=proc1.stdout,
- stdout=subprocess.PIPE,
- stderr=subprocess.PIPE)
- proc1.stdout.close
- out, err = proc2.communicate()
- print(out)
- if out:
- for line in out.split('\n'):
- items = line.split()
- nsName = {}
- if len(items) > 2:
- nsName = {}
- nsName['NS'] = items[0]
- nsName['Name'] = items[1]
- allRunningPods.append(nsName)
- return allRunningPods
-
- def collectPodLogs(self):
+ 'http://localhost:30120/onos/v1/network/configuration/apps/ -d @%s/tests/atests/build/sadis_json'
+ % testCaseUtils.get_dir(self, 'voltha'))
+
+ def collect_pod_logs(self):
logging.info('Collect logs from all Pods')
- allRunningPods = self.getAllRunningPods()
+ allRunningPods = get_all_running_pods()
for nsName in allRunningPods:
Namespace = nsName.get('NS')
- podName = nsName.get('Name')
+ podName = nsName.get('Name')
if 'onos' not in podName:
os.system('/usr/bin/kubectl logs -n %s -f %s > %s/%s.log 2>&1 &' %
- (Namespace, podName, testCaseUtils.getDir(self, 'log'), podName))
+ (Namespace, podName, testCaseUtils.get_dir(self, 'log'), podName))
else:
os.system('/usr/bin/kubectl logs -n %s -f %s onos > %s/%s.log 2>&1 &' %
- (Namespace, podName, testCaseUtils.getDir(self, 'log'), podName))
+ (Namespace, podName, testCaseUtils.get_dir(self, 'log'), podName))
-def voltha_Initialize(rootDir, volthaDir, logDir):
- voltha = VolthaMngr()
- voltha.vSetLogDirs(rootDir, volthaDir, logDir)
- voltha.stopAllPods()
- voltha.resetKubeAdm()
- voltha.startAllPods()
- voltha.alterOnosNetCfg()
- voltha.collectPodLogs()
+def get_all_running_pods():
+ allRunningPods = []
+ proc1 = subprocess.Popen(['/usr/bin/kubectl', 'get', 'pods', '--all-namespaces'],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ proc2 = subprocess.Popen(['grep', '-v', 'NAMESPACE'], stdin=proc1.stdout,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ proc1.stdout.close()
+ out, err = proc2.communicate()
+ print(out)
+ if out:
+ for line in out.split('\n'):
+ items = line.split()
+ if len(items) > 2:
+ nsName = dict()
+ nsName['NS'] = items[0]
+ nsName['Name'] = items[1]
+ allRunningPods.append(nsName)
+ return allRunningPods
+
+def voltha_initialize(root_dir, voltha_dir, log_dir):
+ voltha = VolthaMngr()
+ voltha.v_set_log_dirs(root_dir, voltha_dir, log_dir)
+ voltha.stop_all_pods()
+ voltha.reset_kube_adm()
+ voltha.start_all_pods()
+ voltha.alter_onos_net_cfg()
+ voltha.collect_pod_logs()
diff --git a/tests/atests/robot/voltha_automated_test_suite.robot b/tests/atests/robot/voltha_automated_test_suite.robot
index e9d4e86..7779d82 100755
--- a/tests/atests/robot/voltha_automated_test_suite.robot
+++ b/tests/atests/robot/voltha_automated_test_suite.robot
@@ -46,8 +46,8 @@
... and then enables both it and a number of ponsim-ONUs with predefined IP/port
... information. It then verifies that all the physical and logical devices are ACTIVE
... and REACHEABLE
- PSet Log Dirs ${LOG_DIR}
- PConfigure ${OLT_IP_ADDR} ${OLT_PORT_ID} ${OLT_TYPE} ${ONU_TYPE}
+ P Set Log Dirs ${LOG_DIR}
+ P Configure ${OLT_IP_ADDR} ${OLT_PORT_ID} ${OLT_TYPE} ${ONU_TYPE}
Preprovision Olt
Wait Until Keyword Succeeds 60s 2s Query Devices Before Enabling
Status Should Be Success After Preprovision Command
@@ -65,8 +65,8 @@
... It also insures that the peers fields contains device Id entries for the corresponding
... Olt or Onu device. Functionality to support multiple ONU accomodated
... The extent of the flow validation is limited to checking whether number of Flows is > 0
- DSet Log Dirs ${LOG_DIR}
- DConfigure ${LOGICAL_TYPE} ${OLT_TYPE} ${ONU_TYPE}
+ D Set Log Dirs ${LOG_DIR}
+ D Configure ${LOGICAL_TYPE} ${OLT_TYPE} ${ONU_TYPE}
Olt Discovery
Onu Discovery
Logical Device
@@ -80,13 +80,13 @@
... This test attempts to perform a Radius Authentication from the RG
... It uses the wpa_supplicant app to authenticate using EAPOL.
... We then verify the generated log file confirming all the authentication steps
- ASet Log Dirs ${ROOT_DIR} ${VOLTHA_DIR} ${LOG_DIR}
+ A Set Log Dirs ${ROOT_DIR} ${VOLTHA_DIR} ${LOG_DIR}
Discover RG Pod Name
Discover Freeradius Pod Name
Discover Freeradius Ip Addr
Set Current Freeradius Ip In AAA Json
Alter AAA Application Configuration In Onos Using AAA Json
- Execute Authenticatication On RG
+ Execute Authentication On RG
Verify Authentication Should Have Started
Verify Authentication Should Have Completed
Verify Authentication Should Have Disconnected
@@ -97,20 +97,20 @@
[Documentation] Start Voltha infrastructure to run test(s). This includes starting all
... Kubernetes Pods and start collection of logs. PonsimV2 has now been
... containerized and does not need to be managed separately
- ${ROOT_DIR} ${VOLTHA_DIR} ${LOG_DIR} Dir Init ${LOG_DIR}
+ ${ROOT_DIR} ${VOLTHA_DIR} ${LOG_DIR} Dir Init ${LOG_DIR}
Set Suite Variable ${ROOT_DIR}
Set Suite Variable ${VOLTHA_DIR}
Set Suite Variable ${LOG_DIR}
- VSet Log Dirs ${ROOT_DIR} ${VOLTHA_DIR} ${LOG_DIR}
+ V Set Log Dirs ${ROOT_DIR} ${VOLTHA_DIR} ${LOG_DIR}
Stop Voltha
Start All Pods
Sleep 60
${pod_status} Run kubectl get pods --all-namespaces
- Log To Console \n ${pod_status}
- Alter Onos NetCfg
+ Log To Console \n ${pod_status}
+ Alter Onos Net Cfg
Stop Voltha
- [Documentation] Stop Voltha infrastucture. This includes clearing all installation milestones
+ [Documentation] Stop Voltha infrastructure. This includes clearing all installation milestones
... files and stopping all Kubernetes pods
Collect Pod Logs
Stop All Pods