blob: 705c5e1bef471d638d90cea1183f62a5f4fa55a6 [file] [log] [blame]
Matteo Scandolo48d3d2d2017-08-08 13:05:27 -07001
2# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16
Chetan Gaonkercfcce782016-05-10 10:10:42 -070017#
18# Copyright 2016-present Ciena Corporation
19#
20# Licensed under the Apache License, Version 2.0 (the "License");
21# you may not use this file except in compliance with the License.
22# You may obtain a copy of the License at
23#
24# http://www.apache.org/licenses/LICENSE-2.0
25#
26# Unless required by applicable law or agreed to in writing, software
27# distributed under the License is distributed on an "AS IS" BASIS,
28# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
29# See the License for the specific language governing permissions and
30# limitations under the License.
31#
Chetan Gaonker3ff8eae2016-04-12 14:50:26 -070032'''
33Created on 23-Oct-2012
34
35@authors: Anil Kumar (anilkumar.s@paxterrasolutions.com),
36 Raghav Kashyap(raghavkashyap@paxterrasolutions.com)
37
38
39
40 TestON is free software: you can redistribute it and/or modify
41 it under the terms of the GNU General Public License as published by
42 the Free Software Foundation, either version 2 of the License, or
43 (at your option) any later version.
44
45 TestON is distributed in the hope that it will be useful,
46 but WITHOUT ANY WARRANTY; without even the implied warranty of
47 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
48 GNU General Public License for more details.
49
50 You should have received a copy of the GNU General Public License
51 along with TestON. If not, see <http://www.gnu.org/licenses/>.
52
53
54Utilities will take care about the basic functions like :
55 * Extended assertion,
56 * parse_args for key-value pair handling
57 * Parsing the params or topology file.
58
59'''
60import re
61from configobj import ConfigObj
62import ast
63import smtplib
64
65import email
66import os
67import email.mime.application
68import time
69import random
70from clicommon import *
71
72class Utilities:
73 '''
74 Utilities will take care about the basic functions like :
75 * Extended assertion,
76 * parse_args for key-value pair handling
77 * Parsing the params or topology file.
78 '''
79
80 def __init__(self):
81 self.wrapped = sys.modules[__name__]
82
83 def __getattr__(self, name):
84 '''
85 This will invoke, if the attribute wasn't found the usual ways.
86 Here it will look for assert_attribute and will execute when AttributeError occurs.
87 It will return the result of the assert_attribute.
88 '''
89 try:
90 return getattr(self.wrapped, name)
91 except AttributeError:
92 def assertHandling(**kwargs):
93 nameVar = re.match("^assert",name,flags=0)
94 matchVar = re.match("assert(_not_|_)(equals|matches|greater|lesser)",name,flags=0)
95 notVar = 0
96 operators = ""
97
98 try :
99 if matchVar.group(1) == "_not_" and matchVar.group(2) :
100 notVar = 1
101 operators = matchVar.group(2)
102 elif matchVar.group(1) == "_" and matchVar.group(2):
103 operators = matchVar.group(2)
104 except AttributeError:
105 if matchVar==None and nameVar:
106 operators ='equals'
107 result = self._assert(NOT=notVar,operator=operators,**kwargs)
108 if result == main.TRUE:
109 main.log.info("Assertion Passed")
110 main.STEPRESULT = main.TRUE
111 elif result == main.FALSE:
112 main.log.warn("Assertion Failed")
113 main.STEPRESULT = main.FALSE
114 else:
115 main.log.error("There is an Error in Assertion")
116 main.STEPRESULT = main.ERROR
117 return result
118 return assertHandling
119
120 def _assert (self,**assertParam):
121 '''
122 It will take the arguments :
123 expect:'Expected output'
124 actual:'Actual output'
125 onpass:'Action or string to be triggered or displayed respectively when the assert passed'
126 onfail:'Action or string to be triggered or displayed respectively when the assert failed'
127 not:'optional argument to specify the negation of the each assertion type'
128 operator:'assertion type will be defined by using operator. Like equal , greater, lesser, matches.'
129
130 It will return the assertion result.
131
132 '''
133
134 arguments = self.parse_args(["EXPECT","ACTUAL","ONPASS","ONFAIL","NOT","OPERATOR"],**assertParam)
135
136 result = 0
137 valuetype = ''
138 operation = "not "+ str(arguments["OPERATOR"]) if arguments['NOT'] and arguments['NOT'] == 1 else arguments["OPERATOR"]
139 operators = {'equals':{'STR':'==','NUM':'=='}, 'matches' : '=~', 'greater':'>' ,'lesser':'<'}
140
141 expectMatch = re.match('^\s*[+-]?0(e0)?\s*$', str(arguments["EXPECT"]), re.I+re.M)
142 if not ((not expectMatch) and (arguments["EXPECT"]==0)):
143 valuetype = 'NUM'
144 else :
145 if arguments["OPERATOR"] == 'greater' or arguments["OPERATOR"] == 'lesser':
146 main.log.error("Numeric comparison on strings is not possibele")
147 return main.ERROR
148
149 valuetype = 'STR'
150 arguments["ACTUAL"] = str(arguments["ACTUAL"])
151 if arguments["OPERATOR"] != 'matches':
152 arguments["EXPECT"] = str(arguments["EXPECT"])
153
154 try :
155 opcode = operators[str(arguments["OPERATOR"])][valuetype] if arguments["OPERATOR"] == 'equals' else operators[str(arguments["OPERATOR"])]
156
157 except KeyError as e:
158 print "Key Error in assertion"
159 print e
160 return main.FALSE
161
162 if opcode == '=~':
163 try:
164 assert re.search(str(arguments["EXPECT"]),str(arguments["ACTUAL"]))
165 result = main.TRUE
166 except AssertionError:
167 try :
168 assert re.match(str(arguments["EXPECT"]),str(arguments["ACTUAL"]))
169 result = main.TRUE
170 except AssertionError:
171 main.log.error("Assertion Failed")
172 result = main.FALSE
173 else :
174 try:
175 if str(opcode)=="==":
176 main.log.info("Verifying the Expected is equal to the actual or not using assert_equal")
177 if (arguments["EXPECT"] == arguments["ACTUAL"]):
178 result = main.TRUE
179 else :
180 result = main.FALSE
181 elif str(opcode) == ">":
182 main.log.info("Verifying the Expected is Greater than the actual or not using assert_greater")
183 if (ast.literal_eval(arguments["EXPECT"]) > ast.literal_eval(arguments["ACTUAL"])) :
184 result = main.TRUE
185 else :
186 result = main.FALSE
187 elif str(opcode) == "<":
188 main.log.info("Verifying the Expected is Lesser than the actual or not using assert_lesser")
189 if (ast.literal_eval(arguments["EXPECT"]) < ast.literal_eval(arguments["ACTUAL"])):
190 result = main.TRUE
191 else :
192 result = main.FALSE
193 except AssertionError:
194 main.log.error("Assertion Failed")
195 result = main.FALSE
196 result = result if result else 0
197 result = not result if arguments["NOT"] and arguments["NOT"] == 1 else result
198 resultString = ""
199 if result :
200 resultString = str(resultString) + "PASS"
201 main.log.info(arguments["ONPASS"])
202 else :
203 resultString = str(resultString) + "FAIL"
204 if not isinstance(arguments["ONFAIL"],str):
205 eval(str(arguments["ONFAIL"]))
206 else :
207 main.log.error(arguments["ONFAIL"])
208 main.log.report(arguments["ONFAIL"])
209 main.onFailMsg = arguments[ 'ONFAIL' ]
210
211 msg = arguments["ON" + str(resultString)]
212
213 if not isinstance(msg,str):
214 try:
215 eval(str(msg))
216 except SyntaxError as e:
217 print "function definition is not right"
218 print e
219
220 main.last_result = result
221 if main.stepResults[2]:
222 main.stepResults[2][-1] = result
223 try:
224 main.stepResults[3][-1] = arguments[ 'ONFAIL' ]
225 except AttributeError:
226 pass
227 else:
228 main.log.warn( "Assertion called before a test step" )
229 return result
230
231 def parse_args(self,args, **kwargs):
232 '''
233 It will accept the (key,value) pair and will return the (key,value) pairs with keys in uppercase.
234 '''
235 newArgs = {}
236 for key,value in kwargs.iteritems():
237 if isinstance(args,list) and str.upper(key) in args:
238 for each in args:
239 if each==str.upper(key):
240 newArgs [str(each)] = value
241 elif each != str.upper(key) and (newArgs.has_key(str(each)) == False ):
242 newArgs[str(each)] = None
243
244 return newArgs
245
246 def send_mail(self):
247 # Create a text/plain message
248 msg = email.mime.Multipart.MIMEMultipart()
249 try :
250 if main.test_target:
251 sub = "Result summary of \"" + main.TEST + "\" run on component \"" +\
252 main.test_target + "\" Version \"" +\
253 vars( main )[main.test_target].get_version() + "\": " +\
254 str( main.TOTAL_TC_SUCCESS ) + "% Passed"
255 else :
256 sub = "Result summary of \"" + main.TEST + "\": " +\
257 str( main.TOTAL_TC_SUCCESS ) + "% Passed"
258 except ( KeyError, AttributeError ):
259 sub = "Result summary of \"" + main.TEST + "\": " +\
260 str( main.TOTAL_TC_SUCCESS ) + "% Passed"
261
262 msg['Subject'] = sub
263 msg['From'] = main.sender
264 msg['To'] = main.mail
265
266 # The main body is just another attachment
267 body = email.mime.Text.MIMEText( main.logHeader + "\n" +
268 main.testResult)
269 msg.attach( body )
270
271 # Attachments
272 for filename in os.listdir( main.logdir ):
273 filepath = main.logdir + "/" + filename
274 fp = open( filepath, 'rb' )
275 att = email.mime.application.MIMEApplication( fp.read(),
276 _subtype="" )
277 fp.close()
278 att.add_header( 'Content-Disposition',
279 'attachment',
280 filename=filename )
281 msg.attach( att )
282 try:
283 smtp = smtplib.SMTP( main.smtp )
284 smtp.starttls()
285 smtp.login( main.sender, main.senderPwd )
286 smtp.sendmail( msg['From'], [msg['To']], msg.as_string() )
287 smtp.quit()
288 except Exception:
289 main.log.exception( "Error sending email" )
290 return main.TRUE
291
292 def send_warning_email( self, subject=None ):
293 try:
294 if not subject:
295 subject = main.TEST + " PAUSED!"
296 # Create a text/plain message
297 msg = email.mime.Multipart.MIMEMultipart()
298
299 msg['Subject'] = subject
300 msg['From'] = main.sender
301 msg['To'] = main.mail
302
303 smtp = smtplib.SMTP( main.smtp )
304 smtp.starttls()
305 smtp.login( main.sender, main.senderPwd )
306 smtp.sendmail( msg['From'], [msg['To']], msg.as_string() )
307 smtp.quit()
308 except Exception:
309 main.log.exception( "" )
310 return main.FALSE
311 return main.TRUE
312
313 def parse(self,fileName):
314 '''
315 This will parse the params or topo or cfg file and return content in the file as Dictionary
316 '''
317 self.fileName = fileName
318 matchFileName = re.match(r'(.*)\.(cfg|params|topo)',self.fileName,re.M|re.I)
319 if matchFileName:
320 try :
321 parsedInfo = ConfigObj(self.fileName)
322 return parsedInfo
323 except StandardError:
324 print "There is no such file to parse "+fileName
325 else:
326 return 0
327
328 def retry( self, f, retValue, args=(), kwargs={},
329 sleep=1, attempts=2, randomTime=False ):
330 """
331 Given a function and bad return values, retry will retry a function
332 until successful or give up after a certain number of attempts.
333
334 Arguments:
335 f - a callable object
336 retValue - Return value(s) of f to retry on. This can be a list or an
337 object.
338 args - A tuple containing the arguments of f.
339 kwargs - A dictionary containing the keyword arguments of f.
340 sleep - Time in seconds to sleep between retries. If random is True,
341 this is the max time to wait. Defaults to 1 second.
342 attempts - Max number of attempts before returning. If set to 1,
343 f will only be called once. Defaults to 2 trys.
344 random - Boolean indicating if the wait time is random between 0
345 and sleep or exactly sleep seconds. Defaults to False.
346 """
347 # TODO: be able to pass in a conditional statement(s). For example:
348 # retCondition = "< 7"
349 # Then we do something like 'if eval( "ret " + retCondition ):break'
350 try:
351 assert attempts > 0, "attempts must be more than 1"
352 assert sleep >= 0, "sleep must be >= 0"
353 if not isinstance( retValue, list ):
354 retValue = [ retValue ]
355 for i in range( 0, attempts ):
356 ret = f( *args, **kwargs )
357 if ret not in retValue:
358 # NOTE that False in [ 0 ] == True
359 break
360 if randomTime:
361 sleeptime = random.randint( 0, sleep )
362 else:
363 sleeptime = sleep
364 time.sleep( sleeptime )
365 return ret
366 except AssertionError:
367 main.log.exception( "Invalid arguements for retry: " )
368 main.cleanup()
369 main.exit()
370 except Exception:
371 main.log.exception( "Uncaught exception in retry: " )
372 main.cleanup()
373 main.exit()
374
375utilities = Utilities()
376
377if __name__ != "__main__":
378 import sys
379
380 sys.modules[__name__] = Utilities()