VOL-1059: Test Case: Unicast flow setup ctag/stag assignment
Removing un-needed s/c tag inequality test
Change-Id: Ia0da32101f590b76170994f05ce6072f3a9bce2b
diff --git a/tests/atests/build/dhcp_json b/tests/atests/build/dhcp_json
index eb7d232..dc772d8 100755
--- a/tests/atests/build/dhcp_json
+++ b/tests/atests/build/dhcp_json
@@ -5,7 +5,7 @@
"mac":"ca:fe:ca:fe:ca:fe",
"subnet":"255.255.252.0",
"broadcast":"10.1.11.255",
- "router":"10.1.8.1",
+ "router":"10.1.11.1",
"domain":"8.8.8.8",
"ttl":"63",
"lease":"300",
diff --git a/tests/atests/common/authentication.py b/tests/atests/common/authentication.py
index 5cde7b5..aedbbf2 100644
--- a/tests/atests/common/authentication.py
+++ b/tests/atests/common/authentication.py
@@ -38,16 +38,13 @@
self.dirs['root'] = None
self.dirs['voltha'] = None
- self.__rgName = None
+ self.__rgName = testCaseUtils.discover_rg_pod_name()
self.__radiusName = None
self.__radiusIp = None
def a_set_log_dirs(self, root_dir, voltha_dir, log_dir):
testCaseUtils.config_dirs(self, log_dir, root_dir, voltha_dir)
- def discover_rg_pod_name(self):
- self.__rgName = testCaseUtils.extract_pod_name('rg-').strip()
-
def discover_freeradius_pod_name(self):
self.__radiusName = testCaseUtils.extract_pod_name('freeradius').strip()
logging.info('freeradius Name = %s' % self.__radiusName)
@@ -96,7 +93,7 @@
out, err = procPidSupplicant3.communicate()
supplicantPid = out.strip()
-
+
procKillSupplicant1 = subprocess.Popen(['/usr/bin/kubectl', 'exec', '-n', 'voltha', self.__rgName, '--', 'kill', supplicantPid],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
@@ -149,7 +146,6 @@
def run_test(root_dir, voltha_dir, log_dir):
auth = Authentication()
auth.a_set_log_dirs(root_dir, voltha_dir, log_dir)
- auth.discover_rg_pod_name()
auth.discover_freeradius_pod_name()
auth.discover_freeradius_ip_addr()
auth.set_current_freeradius_ip_in_aaa_json()
diff --git a/tests/atests/common/auto_test.py b/tests/atests/common/auto_test.py
index b43665a..d93e7b5 100755
--- a/tests/atests/common/auto_test.py
+++ b/tests/atests/common/auto_test.py
@@ -27,6 +27,7 @@
import discovery
import authentication
import dhcp
+import unicast
import logging
DEFAULT_LOG_DIR = '/tmp/voltha_test_results'
@@ -82,4 +83,6 @@
dhcp.run_test(ROOT_DIR, VOLTHA_DIR, LOG_DIR)
+ unicast.run_test(ROOT_DIR, VOLTHA_DIR, LOG_DIR)
+
time.sleep(5)
diff --git a/tests/atests/common/dhcp.py b/tests/atests/common/dhcp.py
index cedd7ee..d52234a 100644
--- a/tests/atests/common/dhcp.py
+++ b/tests/atests/common/dhcp.py
@@ -40,7 +40,7 @@
self.dirs['root'] = None
self.dirs['voltha'] = None
- self.__rgName = None
+ self.__rgName = testCaseUtils.discover_rg_pod_name()
self.__fields = None
self.__deviceId = None
self.__portNumber = None
@@ -48,9 +48,6 @@
def h_set_log_dirs(self, root_dir, voltha_dir, log_dir):
testCaseUtils.config_dirs(self, log_dir, root_dir, voltha_dir)
- def lookup_rg_pod_name(self):
- self.__rgName = testCaseUtils.extract_pod_name('rg-').strip()
-
def discover_authorized_users(self):
testCaseUtils.send_command_to_onos_cli(testCaseUtils.get_dir(self, 'log'),
'voltha_onos_users.log', 'aaa-users')
@@ -164,6 +161,29 @@
testCaseUtils.print_log_file(self, self.ASSIGN_DHCP_IP_FILENAME)
+ procPidDhclient1 = subprocess.Popen(['/usr/bin/kubectl', 'exec', '-n', 'voltha', self.__rgName, '--', 'ps', '-ef'],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ procPidDhclient2 = subprocess.Popen(['grep', '-e', 'dhclient'], stdin=procPidDhclient1.stdout,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ procPidDhclient3 = subprocess.Popen(['awk', "{print $2}"], stdin=procPidDhclient2.stdout,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ procPidDhclient1.stdout.close()
+ procPidDhclient2.stdout.close()
+
+ out, err = procPidDhclient3.communicate()
+ dhclientPid = out.strip()
+ if dhclientPid:
+ procKillDhclient = subprocess.Popen(['/usr/bin/kubectl', 'exec', '-n', 'voltha', self.__rgName, '--', 'kill', dhclientPid],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ out, err = procKillDhclient.communicate()
+ assert not err, 'Killing dhclient returned %s' % err
+
def should_have_dhcp_assigned_ip(self):
process_output = open('%s/%s' % (testCaseUtils.get_dir(self, 'log'), self.CHECK_ASSIGNED_IP_FILENAME), 'w')
ifConfigCheck1 = subprocess.Popen(['/usr/bin/kubectl', 'exec', '-n', 'voltha', self.__rgName, '--', 'bash', '-c',
@@ -196,7 +216,6 @@
set_firewall_rules()
dhcp = DHCP()
dhcp.h_set_log_dirs(root_dir, voltha_dir, log_dir)
- dhcp.lookup_rg_pod_name()
dhcp.discover_authorized_users()
dhcp.retrieve_authorized_users_device_id_and_port_number()
dhcp.add_subscriber_access()
diff --git a/tests/atests/common/testCaseUtils.py b/tests/atests/common/testCaseUtils.py
index 7ed9cf3..1fe4f82 100755
--- a/tests/atests/common/testCaseUtils.py
+++ b/tests/atests/common/testCaseUtils.py
@@ -168,3 +168,7 @@
% (new_ip_addr, get_dir(self, 'voltha'))
status = commands.getstatusoutput(sedCommand)[0]
return status
+
+
+def discover_rg_pod_name():
+ return extract_pod_name('rg-').strip()
diff --git a/tests/atests/common/unicast.py b/tests/atests/common/unicast.py
new file mode 100644
index 0000000..66db06f
--- /dev/null
+++ b/tests/atests/common/unicast.py
@@ -0,0 +1,177 @@
+#
+# Copyright 2018 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+vOLT-HA Unicast Test Case module
+"""
+
+import time
+import testCaseUtils
+import logging
+import subprocess
+import commands
+
+
+class Unicast(object):
+ """
+ This class implements voltha Unicast test case
+ """
+
+ PING_TEST_FILENAME = 'voltha_ping_test.log'
+ TCPDUMP_FILENAME = 'voltha_tcpdump.log'
+
+ def __init__(self):
+ self.dirs = dict()
+ self.dirs['log'] = None
+ self.dirs['root'] = None
+ self.dirs['voltha'] = None
+
+ self.__rgName = testCaseUtils.discover_rg_pod_name()
+ self.__fields = None
+ self.__tcpdumpPid = None
+ self.__sadisCTag = None
+ self.__sadisSTag = None
+
+ def u_set_log_dirs(self, root_dir, voltha_dir, log_dir):
+ testCaseUtils.config_dirs(self, log_dir, root_dir, voltha_dir)
+
+ def execute_ping_test(self):
+ logging.info('Ping 1.2.3.4 IP Test')
+ process_output = open('%s/%s' % (testCaseUtils.get_dir(self, 'log'), self.PING_TEST_FILENAME), 'w')
+ pingTest = subprocess.Popen(['/usr/bin/kubectl', 'exec', '-it', '-n', 'voltha', self.__rgName, '--',
+ '/bin/ping', '-I', 'eth0', '1.2.3.4'],
+ stdout=process_output,
+ stderr=process_output)
+
+ self.execute_tcpdump()
+
+ self.kill_ping_test()
+
+ pingTest.wait()
+ process_output.close()
+
+ testCaseUtils.print_log_file(self, self.PING_TEST_FILENAME)
+ testCaseUtils.print_log_file(self, self.TCPDUMP_FILENAME)
+ print
+
+ def execute_tcpdump(self):
+ logging.info('Execute tcpdump')
+ process_output = open('%s/%s' % (testCaseUtils.get_dir(self, 'log'), self.TCPDUMP_FILENAME), 'w')
+ tcpdump = subprocess.Popen(['sudo', '/usr/sbin/tcpdump', '-nei', 'pon1'],
+ stdout=process_output,
+ stderr=process_output)
+ self.__tcpdumpPid = tcpdump.pid
+
+ time.sleep(20)
+
+ self.kill_tcpdump()
+ tcpdump.wait()
+ process_output.close()
+
+ def kill_ping_test(self):
+ procPidPing1 = subprocess.Popen(['/usr/bin/kubectl', 'exec', '-n', 'voltha', self.__rgName, '--', 'ps', '-ef'],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ procPidPing2 = subprocess.Popen(['grep', '-e', '/bin/ping'], stdin=procPidPing1.stdout,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ procPidPing3 = subprocess.Popen(['awk', "{print $2}"], stdin=procPidPing2.stdout,
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+
+ procPidPing1.stdout.close()
+ procPidPing2.stdout.close()
+
+ out, err = procPidPing3.communicate()
+ pingPid = out.strip()
+
+ procKillPing = subprocess.Popen(['/usr/bin/kubectl', 'exec', '-n', 'voltha', self.__rgName, '--', 'kill', pingPid],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ out, err = procKillPing.communicate()
+ assert not err, 'Killing Ping returned %s' % err
+
+ def kill_tcpdump(self):
+ procKillTcpdump = subprocess.Popen(['sudo', 'pkill', '-P', str(self.__tcpdumpPid)],
+ stdout=subprocess.PIPE,
+ stderr=subprocess.PIPE)
+ out, err = procKillTcpdump.communicate()
+ assert not err, 'Killing Tcpdump returned %s' % err
+
+ def ping_test_should_have_failed(self):
+ statusLines = testCaseUtils.get_fields_from_grep_command(self, 'Destination Host Unreachable', self.PING_TEST_FILENAME)
+ assert statusLines, 'Ping Test Issue, no Destination Host Unreachable'
+ lineCount = 0
+ lines = statusLines.splitlines()
+ for line in lines:
+ logging.debug(line)
+ lineCount += 1
+ if lineCount > 1: # Must have 2 or more instances
+ return True
+
+ def stag_and_ctag_should_match_sadis_entry(self):
+ logging.info('Evaluating sTag and cTag in each packet')
+ statusLines = testCaseUtils.get_fields_from_grep_command(self, '802.1Q', self.TCPDUMP_FILENAME)
+ assert statusLines, 'tcpdump contains no 802.1Q tagged packets'
+ lines = statusLines.splitlines()
+ for line in lines:
+ header, tagId, after = line.partition('802.1Q')
+ self.__fields = testCaseUtils.parse_fields(after, ',')
+ stag = self.__fields[1].strip().split(':')[1].strip().split()[1].strip()
+ before, tagId, after = line.rpartition('802.1Q')
+ self.__fields = testCaseUtils.parse_fields(after, ',')
+ ctag = self.__fields[1].strip().split()[1].strip()
+ self.stag_and_ctag_should_match_sadis_file(ctag, stag)
+
+ def should_have_q_in_q_vlan_tagging(self):
+ statusLines = testCaseUtils.get_fields_from_grep_command(self, '"Request who-has"', self.TCPDUMP_FILENAME)
+ assert statusLines, 'tcpdump contains no ping packets'
+ lines = statusLines.splitlines()
+ for line in lines:
+ tagCount = line.count('802.1Q')
+ assert tagCount == 2, 'Found a non double tagged packet'
+
+ def retrieve_stag_and_ctag_from_sadis_entries(self):
+ logging.info('Retrieving sTag and cTag from Sadis entries')
+ ctagGrepCommand = "grep %s %s/tests/atests/build/sadis_json" % ('cTag', testCaseUtils.get_dir(self, 'voltha'))
+ statusLines = commands.getstatusoutput(ctagGrepCommand)[1]
+ assert statusLines, 'No cTag found in sadis_json'
+ self.__sadisCTag = statusLines.split(':')[1].strip(',')
+ stagGrepCommand = "grep %s %s/tests/atests/build/sadis_json" % ('sTag', testCaseUtils.get_dir(self, 'voltha'))
+ statusLines = commands.getstatusoutput(stagGrepCommand)[1]
+ assert statusLines, 'No sTag found in sadis_json'
+ self.__sadisSTag = statusLines.split(':')[1].strip(',')
+
+ def stag_and_ctag_should_match_sadis_file(self, ctag, stag):
+ assert ctag == self.__sadisCTag and stag == self.__sadisSTag, 'cTag and/or sTag do not match value in sadis file\n \
+ vlan cTag = %s, sadis cTag = %s : vlan sTag = %s, sadis sTag = %s' % (ctag, self.__sadisCTag, stag, self.__sadisSTag)
+
+
+def run_test(root_dir, voltha_dir, log_dir):
+
+ unicast = Unicast()
+ unicast.u_set_log_dirs(root_dir, voltha_dir, log_dir)
+ unicast.execute_ping_test()
+ unicast.should_have_q_in_q_vlan_tagging()
+ unicast.retrieve_stag_and_ctag_from_sadis_entries()
+ unicast.stag_and_ctag_should_match_sadis_entry()
+
+
+
+
+
+
+
diff --git a/tests/atests/robot/voltha_automated_test_suite.robot b/tests/atests/robot/voltha_automated_test_suite.robot
index efa6639..4acb91f 100755
--- a/tests/atests/robot/voltha_automated_test_suite.robot
+++ b/tests/atests/robot/voltha_automated_test_suite.robot
@@ -21,11 +21,13 @@
Library ../common/discovery.py
Library ../common/authentication.py
Library ../common/dhcp.py
+Library ../common/unicast.py
Library volthaMngr.VolthaMngr
Library preprovisioning.Preprovisioning
Library discovery.Discovery
Library authentication.Authentication
Library dhcp.DHCP
+Library unicast.Unicast
Suite Setup Start Voltha
Suite Teardown Stop Voltha
@@ -88,7 +90,6 @@
... It uses the wpa_supplicant app to authenticate using EAPOL.
... We then verify the generated log file confirming all the authentication steps
A Set Log Dirs ${ROOT_DIR} ${VOLTHA_DIR} ${LOG_DIR}
- Discover RG Pod Name
Discover Freeradius Pod Name
Discover Freeradius Ip Addr
Set Current Freeradius Ip In AAA Json
@@ -108,7 +109,6 @@
... IP address granted to RG upon instantiating the RG pod. Finally we invoke
... 'dhclient' on RG to request a DHCP IP address.
H Set Log Dirs ${ROOT_DIR} ${VOLTHA_DIR} ${LOG_DIR}
- Lookup Rg Pod Name
Set Firewall Rules
Discover Authorized Users
Retrieve Authorized Users Device Id And Port Number
@@ -121,6 +121,19 @@
Wait Until Keyword Succeeds ${RETRY_TIMEOUT_60} ${RETRY_INTERVAL_2} Assign Dhcp Ip Addr To Rg
Wait Until Keyword Succeeds ${RETRY_TIMEOUT_60} ${RETRY_INTERVAL_2} Should Have Dhcp Assigned Ip
+Unicast flow setup ctag/stag assignment
+ [Documentation] Unicast ctag/stag assignment
+ ... We call Ping from RG to a non-existant IP address an we ignore the
+ ... Destination Host Unreachable message. We then invoke tcpdump on 'pon1'
+ ... network looking for ARP request from RG IP address. These packets should
+ ... be double tagged with different s and c Tags but matching tag configuration
+ ... in sadis entry.
+ U Set Log Dirs ${ROOT_DIR} ${VOLTHA_DIR} ${LOG_DIR}
+ Execute Ping Test
+ Should Have Q In Q Vlan Tagging
+ Retrieve Stag And Ctag From Sadis Entries
+ Stag And Ctag Should Match Sadis Entry
+
*** Keywords ***
Start Voltha
[Documentation] Start Voltha infrastructure to run test(s). This includes starting all