blob: 66db06f3f12318c986b3d07d01caafbf7546d01a [file] [log] [blame]
Gilles Depatieed99efe2019-03-12 16:12:26 -04001#
# Copyright 2018 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
16
"""
vOLT-HA Unicast Test Case module
"""
20
21import time
22import testCaseUtils
23import logging
24import subprocess
25import commands
26
27
class Unicast(object):
    """
    This class implements voltha Unicast test case

    The test starts a ping to 1.2.3.4 from the RG pod (through
    ``kubectl exec`` in the ``voltha`` namespace), captures traffic on the
    ``pon1`` interface with tcpdump while the ping runs, and then inspects
    the two resulting log files: the captured packets are expected to carry
    two 802.1Q tags whose values match the cTag/sTag found in ``sadis_json``.
    """

    PING_TEST_FILENAME = 'voltha_ping_test.log'
    TCPDUMP_FILENAME = 'voltha_tcpdump.log'

    # How long tcpdump is left running before it is killed, in seconds.
    TCPDUMP_CAPTURE_SECONDS = 20

    def __init__(self):
        # Directory registry handed to the testCaseUtils helpers
        # (config_dirs / get_dir); populated by u_set_log_dirs().
        self.dirs = dict()
        self.dirs['log'] = None
        self.dirs['root'] = None
        self.dirs['voltha'] = None

        self.__rgName = testCaseUtils.discover_rg_pod_name()
        self.__fields = None
        self.__tcpdumpPid = None
        self.__sadisCTag = None
        self.__sadisSTag = None

    def u_set_log_dirs(self, root_dir, voltha_dir, log_dir):
        """
        Record the root, voltha and log directories for this test case by
        delegating to testCaseUtils.config_dirs.
        """
        testCaseUtils.config_dirs(self, log_dir, root_dir, voltha_dir)

    def execute_ping_test(self):
        """
        Run the ping from the RG pod while tcpdump captures on pon1.

        Ping output goes to PING_TEST_FILENAME in the log directory. Once
        the capture is finished the ping is killed inside the pod and both
        log files are printed.
        """
        logging.info('Ping 1.2.3.4 IP Test')
        ping_log = '%s/%s' % (testCaseUtils.get_dir(self, 'log'), self.PING_TEST_FILENAME)
        # 'with' guarantees the log file is closed even if a step below fails.
        with open(ping_log, 'w') as process_output:
            pingTest = subprocess.Popen(['/usr/bin/kubectl', 'exec', '-it', '-n', 'voltha', self.__rgName, '--',
                                         '/bin/ping', '-I', 'eth0', '1.2.3.4'],
                                        stdout=process_output,
                                        stderr=process_output)
            try:
                self.execute_tcpdump()
            finally:
                # The ping never terminates on its own, so it must be killed
                # even when the capture step raised; wait() is only reached
                # when the kill itself did not fail.
                self.kill_ping_test()
                pingTest.wait()

        testCaseUtils.print_log_file(self, self.PING_TEST_FILENAME)
        testCaseUtils.print_log_file(self, self.TCPDUMP_FILENAME)
        # Emits one blank line on both Python 2 and Python 3
        # (a bare ``print`` is a no-op expression on Python 3).
        print('')

    def execute_tcpdump(self):
        """
        Capture traffic on pon1 with tcpdump for TCPDUMP_CAPTURE_SECONDS.

        Output goes to TCPDUMP_FILENAME in the log directory. The pid of the
        spawned ``sudo`` process is remembered so kill_tcpdump() can stop the
        capture.
        """
        logging.info('Execute tcpdump')
        tcpdump_log = '%s/%s' % (testCaseUtils.get_dir(self, 'log'), self.TCPDUMP_FILENAME)
        with open(tcpdump_log, 'w') as process_output:
            tcpdump = subprocess.Popen(['sudo', '/usr/sbin/tcpdump', '-nei', 'pon1'],
                                       stdout=process_output,
                                       stderr=process_output)
            self.__tcpdumpPid = tcpdump.pid
            try:
                time.sleep(self.TCPDUMP_CAPTURE_SECONDS)
            finally:
                # Stop the capture even if the sleep was interrupted.
                self.kill_tcpdump()
                tcpdump.wait()

    @staticmethod
    def _find_pids(ps_output, pattern):
        """
        Return the second whitespace-separated column of every line of
        *ps_output* that contains *pattern* (the equivalent of
        ``grep -e pattern | awk '{print $2}'``), as a list of strings.
        """
        pids = []
        for line in ps_output.splitlines():
            columns = line.split()
            if pattern in line and len(columns) > 1:
                pids.append(columns[1])
        return pids

    def kill_ping_test(self):
        """
        Kill every /bin/ping process listed by ``ps -ef`` inside the RG pod.

        Raises AssertionError when no ping process is found or when the
        kill command writes anything to stderr.
        """
        listProcesses = subprocess.Popen(['/usr/bin/kubectl', 'exec', '-n', 'voltha', self.__rgName, '--', 'ps', '-ef'],
                                         stdout=subprocess.PIPE,
                                         stderr=subprocess.PIPE,
                                         universal_newlines=True)
        ps_output, _ = listProcesses.communicate()

        pingPids = self._find_pids(ps_output, '/bin/ping')
        # Fail with a clear message instead of running ``kill ''``.
        assert pingPids, 'Killing Ping returned %s' % 'no /bin/ping process found in RG pod'

        # Each pid is passed as its own argument so several matches are
        # handled instead of being glued into one invalid argument.
        procKillPing = subprocess.Popen(['/usr/bin/kubectl', 'exec', '-n', 'voltha', self.__rgName, '--', 'kill'] + pingPids,
                                        stdout=subprocess.PIPE,
                                        stderr=subprocess.PIPE,
                                        universal_newlines=True)
        out, err = procKillPing.communicate()
        assert not err, 'Killing Ping returned %s' % err

    def kill_tcpdump(self):
        """
        Kill the children of the process started by execute_tcpdump()
        (``sudo pkill -P <pid>``).

        Raises AssertionError when pkill writes anything to stderr.
        """
        procKillTcpdump = subprocess.Popen(['sudo', 'pkill', '-P', str(self.__tcpdumpPid)],
                                           stdout=subprocess.PIPE,
                                           stderr=subprocess.PIPE,
                                           universal_newlines=True)
        out, err = procKillTcpdump.communicate()
        assert not err, 'Killing Tcpdump returned %s' % err

    def ping_test_should_have_failed(self):
        """
        Check the ping log for 'Destination Host Unreachable' lines.

        Raises AssertionError when there is none. Returns True when two or
        more such lines are present, False otherwise.
        """
        statusLines = testCaseUtils.get_fields_from_grep_command(self, 'Destination Host Unreachable', self.PING_TEST_FILENAME)
        assert statusLines, 'Ping Test Issue, no Destination Host Unreachable'
        lines = statusLines.splitlines()
        for line in lines:
            logging.debug(line)
        # Must have 2 or more instances
        return len(lines) > 1

    def stag_and_ctag_should_match_sadis_entry(self):
        """
        Compare the VLAN ids of every 802.1Q line in the tcpdump log with
        the values loaded by retrieve_stag_and_ctag_from_sadis_entries().

        Raises AssertionError when the log has no 802.1Q line or when a
        packet's tags differ from the sadis values.
        """
        logging.info('Evaluating sTag and cTag in each packet')
        statusLines = testCaseUtils.get_fields_from_grep_command(self, '802.1Q', self.TCPDUMP_FILENAME)
        assert statusLines, 'tcpdump contains no 802.1Q tagged packets'
        for line in statusLines.splitlines():
            # sTag: taken from the text following the FIRST '802.1Q' marker;
            # the second comma-separated field is split on ':' and the id is
            # the second word of what follows the colon.
            _, _, afterFirstTag = line.partition('802.1Q')
            self.__fields = testCaseUtils.parse_fields(afterFirstTag, ',')
            stag = self.__fields[1].strip().split(':')[1].strip().split()[1].strip()
            # cTag: taken from the text following the LAST '802.1Q' marker;
            # the id is the second word of the second comma-separated field.
            _, _, afterLastTag = line.rpartition('802.1Q')
            self.__fields = testCaseUtils.parse_fields(afterLastTag, ',')
            ctag = self.__fields[1].strip().split()[1].strip()
            self.stag_and_ctag_should_match_sadis_file(ctag, stag)

    def should_have_q_in_q_vlan_tagging(self):
        """
        Assert that every "Request who-has" line in the tcpdump log contains
        exactly two '802.1Q' markers.
        """
        statusLines = testCaseUtils.get_fields_from_grep_command(self, '"Request who-has"', self.TCPDUMP_FILENAME)
        assert statusLines, 'tcpdump contains no ping packets'
        for line in statusLines.splitlines():
            assert line.count('802.1Q') == 2, 'Found a non double tagged packet'

    @staticmethod
    def _parse_sadis_tag(grep_output):
        """
        Extract the tag value from grep output of the form ``"cTag": 55,``.

        Only the first matching line is used; the text after the first ':'
        is returned with surrounding whitespace and commas removed. Returns
        an empty string when there is nothing to parse.
        """
        lines = grep_output.strip().splitlines()
        if not lines:
            return ''
        value = lines[0].partition(':')[2]
        return value.strip().strip(',').strip()

    def _read_sadis_tag(self, tag_name):
        """
        Grep *tag_name* in <voltha dir>/tests/atests/build/sadis_json and
        return its value; raises AssertionError when it cannot be found.
        """
        sadisFile = '%s/tests/atests/build/sadis_json' % testCaseUtils.get_dir(self, 'voltha')
        # Argument list (no shell) and a separate stderr pipe: a grep error
        # message can no longer be mistaken for a matching line.
        grep = subprocess.Popen(['grep', tag_name, sadisFile],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                universal_newlines=True)
        statusLines, _ = grep.communicate()
        tagValue = self._parse_sadis_tag(statusLines)
        assert tagValue, 'No %s found in sadis_json' % tag_name
        return tagValue

    def retrieve_stag_and_ctag_from_sadis_entries(self):
        """
        Load the expected cTag and sTag from the sadis_json file so they can
        be compared with the captured packets.
        """
        logging.info('Retrieving sTag and cTag from Sadis entries')
        self.__sadisCTag = self._read_sadis_tag('cTag')
        self.__sadisSTag = self._read_sadis_tag('sTag')

    def stag_and_ctag_should_match_sadis_file(self, ctag, stag):
        """
        Assert that *ctag* and *stag* equal the values previously read from
        the sadis file; the failure message lists all four values.
        """
        assert ctag == self.__sadisCTag and stag == self.__sadisSTag, (
            'cTag and/or sTag do not match value in sadis file\n'
            'vlan cTag = %s, sadis cTag = %s : vlan sTag = %s, sadis sTag = %s'
            % (ctag, self.__sadisCTag, stag, self.__sadisSTag))
161
162
def run_test(root_dir, voltha_dir, log_dir):
    """
    Run the Unicast test case from start to finish.

    The three directories are forwarded unchanged to
    Unicast.u_set_log_dirs(); the remaining steps run the ping/tcpdump
    capture and then validate the captured packets.
    """
    test_case = Unicast()
    test_case.u_set_log_dirs(root_dir, voltha_dir, log_dir)

    # Produce the ping and tcpdump log files.
    test_case.execute_ping_test()

    # Validate the capture: double tagging first, then the tag values
    # against the sadis file.
    test_case.should_have_q_in_q_vlan_tagging()
    test_case.retrieve_stag_and_ctag_from_sadis_entries()
    test_case.stag_and_ctag_should_match_sadis_entry()
171
172
173
174
175
176
177