# Extracted from blob 88f1913e99034679102ff62a953491bb4c4258fb
# (blame: Zack Williams, 821c502, 2020-01-15 15:11:46 -0700)
#
# Copyright 2018 the original author or authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""
Test Case Utils module
"""

from __future__ import absolute_import

import subprocess
import sys
import time

import pexpect


class TestCaseUtils():
    """Helpers shared by the automated test cases.

    Three groups of helpers live here:

    * capturing VOLTHA / ONOS CLI output into log files over ssh (pexpect),
    * querying Kubernetes through ``kubectl | grep | awk`` pipelines,
    * small parsing helpers for lines taken from the captured logs.

    The class defines no ``__init__``; ``config_dirs`` and ``get_dir`` expect
    the object they operate on to already carry a ``dirs`` mapping.
    """

    # CLI prompts, optionally wrapped in ANSI colour escape sequences.
    _VOLTHA_PROMPT = r"\((\x1b\[\d*;?\d+m){1,2}voltha(\x1b\[\d*;?\d+m){1,2}\)"
    _VOLTHA_DEVICE_PROMPT = (
        r"\((\x1b\[\d*;?\d+m){1,2}.*device [0-9a-f]{16}(\x1b\[\d*;?\d+m){1,2}\)"
    )
    # Expected ONOS prompts:
    #   onos>          (ONOS 1.x)
    #   karaf@root >   (ONOS 2.x)
    _ONOS_1_PROMPT = r'(\x1b\[\d*;?\d+m){1,2}onos> (\x1b\[\d*;?\d+m){1,2}'
    _ONOS_2_PROMPT = r'karaf@root >'

    @staticmethod
    def config_dirs(self, log_dir, root_dir=None, voltha_dir=None):
        """Record the log, root and voltha directories on ``self.dirs``.

        This is a static method that takes the target object as an explicit
        first argument, so it is called as
        ``TestCaseUtils.config_dirs(obj, log_dir, ...)``. ``obj.dirs`` must
        already exist and support item assignment.
        """
        self.dirs["log"] = log_dir
        self.dirs["root"] = root_dir
        self.dirs["voltha"] = voltha_dir

    def get_dir(self, directory):
        """Return the directory stored under key ``directory``, or None."""
        return self.dirs.get(directory)

    @staticmethod
    def remove_leading_line(log_dir, log_file):
        """Rewrite ``log_dir/log_file`` in place without its first line."""
        with open(log_dir + "/" + log_file, "r+") as log:
            remaining = log.readlines()[1:]
            log.seek(0)
            log.writelines(remaining)
            # Cut off whatever is left of the old, longer content.
            log.truncate()

    @staticmethod
    def _capture_device_command(child, cmd, log_dir, log_file):
        """Run ``cmd`` at a VOLTHA device prompt and save its output."""
        with open(log_dir + "/" + log_file, "wb") as output:
            child.sendline(cmd)
            child.expect(TestCaseUtils._VOLTHA_DEVICE_PROMPT)
            # child.before holds everything received before the prompt.
            output.write(child.before)
        TestCaseUtils.remove_leading_line(log_dir, log_file)

    @staticmethod
    def write_log_of_voltha_cli_comand(
        log_dir,
        log_file1,
        cmd1,
        log_file2=None,
        cmd2=None,
        log_file3=None,
        cmd3=None,
        host="localhost",
        port=30110,
    ):
        """Run up to three VOLTHA CLI commands over ssh and log their output.

        Logs in as ``voltha`` (password ``admin``) on ``host``:``port`` and
        sends ``cmd1``.

        * If the CLI returns to the top-level ``(voltha)`` prompt, the output
          of ``cmd1`` is written to ``log_dir/log_file1``.
        * If the CLI instead shows a ``(... device <16 hex digits>)`` prompt,
          ``log_file1`` is left empty and ``cmd2`` / ``cmd3`` are run at that
          prompt, each written to its own log file. A pair is skipped when
          either its command or its log file is None.

        The first line of every written log is removed afterwards (presumably
        the echoed command - TODO confirm).

        Any pexpect exception (e.g. timeout) propagates to the caller; the
        open log file and the ssh child are closed in every case.
        """
        child = None
        try:
            # log_file1 is created/truncated up front, before ssh is spawned.
            with open(log_dir + "/" + log_file1, "wb") as output:
                child = pexpect.spawn(
                    "ssh -p %s -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no voltha@%s"
                    % (port, host)
                )
                child.expect(r"[pP]assword:")
                child.sendline("admin")
                child.expect(TestCaseUtils._VOLTHA_PROMPT)
                # Fixed pause before the first command; reason not visible
                # here (presumably lets the CLI settle - TODO confirm).
                time.sleep(10)
                child.sendline(cmd1)
                matched = child.expect(
                    [
                        TestCaseUtils._VOLTHA_PROMPT,
                        TestCaseUtils._VOLTHA_DEVICE_PROMPT,
                    ]
                )
                if matched == 0:
                    output.write(child.before)

            if matched == 0:
                TestCaseUtils.remove_leading_line(log_dir, log_file1)
            elif matched == 1:
                for log_file, cmd in ((log_file2, cmd2), (log_file3, cmd3)):
                    if log_file is None or cmd is None:
                        continue
                    TestCaseUtils._capture_device_command(
                        child, cmd, log_dir, log_file
                    )
        finally:
            if child is not None:
                child.close()

    @staticmethod
    def write_log_of_onos_cli_command(log_dir, log_file, cmd, host="localhost", port=30115):
        """Run one ONOS CLI command over ssh and write its output to a log.

        Logs in as ``karaf`` (password ``karaf``) on ``host``:``port``, waits
        for either the ONOS 1.x or the ONOS 2.x prompt, sends ``cmd`` and
        writes everything received before the next prompt to
        ``log_dir/log_file``.

        Any pexpect exception propagates to the caller; the log file and the
        ssh child are closed in every case.
        """
        prompts = [TestCaseUtils._ONOS_1_PROMPT, TestCaseUtils._ONOS_2_PROMPT]
        child = None
        try:
            with open(log_dir + "/" + log_file, "wb") as output:
                child = pexpect.spawn(
                    "ssh -p %s -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no karaf@%s"
                    % (port, host)
                )
                child.expect(r"[pP]assword:")
                child.sendline("karaf")
                child.expect(prompts)
                child.sendline(cmd)
                child.expect(prompts)
                output.write(child.before)
        finally:
            if child is not None:
                child.close()

    def get_fields_from_grep_command(self, search_word, log_file):
        """Grep ``log_file`` in the log directory and return grep's output.

        The result is the combined output of the command as one string
        (element 1 of ``subprocess.getstatusoutput``); the exit status is
        discarded.

        NOTE(review): ``search_word`` is interpolated into a shell command
        without quoting, so any quoting or grep options must be part of the
        argument itself. Only pass trusted values.
        """
        grep_command = "grep %s %s/%s" % (search_word, self.get_dir("log"), log_file)
        return subprocess.getstatusoutput(grep_command)[1]

    @staticmethod
    def parse_fields(status_line, delimiter):
        """Split ``status_line`` on ``delimiter`` and return the pieces."""
        return status_line.split(delimiter)

    def print_log_file(self, log_file):
        """Write a blank line, then the content of ``log_file``, to stdout."""
        # Read-only access is enough; "r+" would also demand write permission.
        with open(self.get_dir("log") + "/" + log_file, "r") as log:
            lines = log.readlines()
        # The original bare ``print`` was a Python 2 statement; in Python 3
        # it is a no-op, so call it to emit the intended blank line.
        print()
        for line in lines:
            sys.stdout.write(line)

    @staticmethod
    def _run_kubectl_pipeline(kubectl_args, grep_args, awk_program):
        """Return stdout of ``kubectl <args> | grep <args> | awk <program>``.

        The output is returned as raw bytes because the processes are not
        opened in text mode.
        """
        # stderr of the first two stages is never read, so discard it instead
        # of piping it: an unread pipe can block a child that writes a lot.
        with subprocess.Popen(
            ["/usr/bin/kubectl"] + list(kubectl_args),
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
        ) as producer, subprocess.Popen(
            ["grep"] + list(grep_args),
            stdin=producer.stdout,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
        ) as matcher, subprocess.Popen(
            ["awk", awk_program],
            stdin=matcher.stdout,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
        ) as selector:
            # Drop the parent's copies of the intermediate pipe ends so each
            # upstream process sees a closed pipe if its consumer exits early.
            producer.stdout.close()
            matcher.stdout.close()
            out, _ = selector.communicate()
        # Leaving the ``with`` block waits for all three children.
        return out

    @staticmethod
    def extract_pod_ip_addr(pod_name):
        """Return column 4 of the ``kubectl get svc`` rows matching ``pod_name``.

        Searches all namespaces; the result is raw bytes.
        """
        return TestCaseUtils._run_kubectl_pipeline(
            ["get", "svc", "--all-namespaces"], ["-e", pod_name], "{print $4}"
        )

    @staticmethod
    def extract_radius_ip_addr(pod_name):
        """Return the value of the ``IP:`` line of ``kubectl describe pod``.

        The pod is looked up in the ``voltha`` namespace; the result is raw
        bytes.
        """
        return TestCaseUtils._run_kubectl_pipeline(
            ["describe", "pod", "-n", "voltha", pod_name], ["^IP:"], "{print $2}"
        )

    @staticmethod
    def extract_pod_name(short_pod_name):
        """Return column 2 of the ``kubectl get pods`` rows matching the name.

        Searches all namespaces; the result is raw bytes.
        """
        return TestCaseUtils._run_kubectl_pipeline(
            ["get", "pods", "--all-namespaces"], ["-e", short_pod_name], "{print $2}"
        )

    def modify_radius_ip_in_json_using_sed(self, new_ip_addr):
        """Replace the ``radiusIp`` line of the aaa_json file using sed.

        The file is ``<voltha dir>/tests/atests/build/aaa_json``. Returns the
        exit status of the sed command.
        """
        sed_command = (
            "sed -i '/radiusIp/c\\ \"radiusIp\":\"'%s'\",' %s/tests/atests/build/aaa_json"
            % (new_ip_addr, self.get_dir("voltha"))
        )
        return subprocess.getstatusoutput(sed_command)[0]

    @staticmethod
    def discover_rg_pod_name():
        """Return the whitespace-stripped ``extract_pod_name("rg0")`` result."""
        return TestCaseUtils.extract_pod_name("rg0").strip()

    @staticmethod
    def retrieve_authorized_users_device_id_and_port_number(status_line):
        """Extract the device id and port number from a comma-separated line.

        The device id is whatever follows the first ``=`` in the third field,
        the port number whatever follows the first ``=`` in the fifth field.
        Raises IndexError when the line has fewer than five fields.
        """
        fields = TestCaseUtils.parse_fields(status_line, ",")
        device_id = fields[2].strip().partition("=")[2]
        port_number = fields[4].strip().partition("=")[2]
        return device_id, port_number

    def add_subscriber_access(self, device_id, port_number):
        """Run ``volt-add-subscriber-access`` on the ONOS CLI.

        The command output is written to ``voltha_add_subscriber_access.log``
        in the log directory, using the default ONOS host and port.
        """
        TestCaseUtils.write_log_of_onos_cli_command(
            self.get_dir("log"),
            "voltha_add_subscriber_access.log",
            "volt-add-subscriber-access %s %s" % (device_id, port_number),
        )