#!/usr/bin/env python
# blob: 15320daf95468e7ab163d32abdb70323bd644d82
# Last change: Chetan Gaonker, commit 3ff8eae, 2016-04-12 14:50:26 -0700
'''
Created on 23-Oct-2012

@authors: Anil Kumar (anilkumar.s@paxterrasolutions.com),
          Raghav Kashyap (raghavkashyap@paxterrasolutions.com)



    TestON is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 2 of the License, or
    (at your option) any later version.

    TestON is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with TestON.  If not, see <http://www.gnu.org/licenses/>.


Utilities takes care of basic functions such as:
    * Extended assertions,
    * parse_args for key-value pair handling,
    * Parsing the params or topology file.

'''
30import re
31from configobj import ConfigObj
32import ast
33import smtplib
34
35import email
36import os
37import email.mime.application
38import time
39import random
40from clicommon import *
41
class Utilities:
    '''
    Helper object that stands in for this module and provides:
        * Extended assertions (assert_equals, assert_not_matches, ...),
        * parse_args for key-value pair handling,
        * Parsing of params / topology / cfg files,
        * Mail notification and retry helpers.

    Most methods use a global named `main` (logger, result constants,
    mail settings).  It is not defined in this file and is expected to
    be provided by the surrounding test framework at runtime.
    '''

    def __init__(self):
        # Imported locally so construction does not depend on `sys`
        # leaking into this module's namespace from a star-import.
        import sys
        # Keep a reference to the real module object; attribute lookups
        # that are not methods of this class are forwarded to it.
        self.wrapped = sys.modules[__name__]

    def __getattr__(self, name):
        '''
        Called only when normal attribute lookup fails.

        The lookup is first forwarded to the wrapped module.  If the
        module does not have the attribute either and the name starts
        with "assert", a function performing the matching assertion is
        returned:

            assert_<op>      -> <op> comparison
            assert_not_<op>  -> negated <op> comparison
            any other assert* name -> "equals" comparison

        where <op> is one of equals, matches, greater, lesser.

        Raises AttributeError for every other unknown name, so that
        hasattr() and implicit special-method lookups behave normally.
        '''
        if name == "wrapped":
            # `wrapped` itself is missing (e.g. instance created without
            # __init__); looking it up below would recurse forever.
            raise AttributeError(name)
        try:
            return getattr(self.wrapped, name)
        except AttributeError:
            if not name.startswith("assert"):
                raise AttributeError(name)

        def assertHandling(**kwargs):
            '''
            Run the assertion selected by the attribute name, record the
            outcome in main.STEPRESULT and return the result of _assert.
            '''
            matchVar = re.match(
                r"assert(_not_|_)(equals|matches|greater|lesser)", name)
            notVar = 0
            # Names such as plain "assert" fall back to equality.
            operators = "equals"
            if matchVar:
                operators = matchVar.group(2)
                if matchVar.group(1) == "_not_":
                    notVar = 1

            result = self._assert(NOT=notVar, operator=operators, **kwargs)
            if result == main.TRUE:
                main.log.info("Assertion Passed")
                main.STEPRESULT = main.TRUE
            elif result == main.FALSE:
                main.log.warn("Assertion Failed")
                main.STEPRESULT = main.FALSE
            else:
                main.log.error("There is an Error in Assertion")
                main.STEPRESULT = main.ERROR
            return result

        return assertHandling

    def _numeric_operand(self, value):
        '''
        Return `value` unchanged when it is already an int/float,
        otherwise evaluate it as a Python literal (e.g. the string "5").
        '''
        if isinstance(value, (int, float)):
            return value
        return ast.literal_eval(value)

    def _assert(self, **assertParam):
        '''
        Perform one assertion.  Keyword arguments (case-insensitive):
            expect   : expected value (a regular expression for "matches")
            actual   : actual value
            onpass   : message logged when the assertion passes
            onfail   : message logged/reported when the assertion fails
            not      : 1 to negate the outcome of the comparison
            operator : 'equals', 'matches', 'greater' or 'lesser'

        For 'greater' / 'lesser' the check is expect > actual and
        expect < actual respectively; operands may be numbers or strings
        holding numeric literals.

        Returns the assertion result (truthy on pass, falsy on fail),
        main.FALSE for an unknown operator, or main.ERROR when the
        operands cannot be compared.  On a normal return the result is
        also stored in main.last_result and main.stepResults.
        '''
        arguments = self.parse_args(
            ["EXPECT", "ACTUAL", "ONPASS", "ONFAIL", "NOT", "OPERATOR"],
            **assertParam)
        operator = arguments["OPERATOR"]
        negate = bool(arguments["NOT"] and arguments["NOT"] == 1)
        opcodes = {'equals': '==', 'matches': '=~',
                   'greater': '>', 'lesser': '<'}

        # NOTE(review): this test only selects the string branch when
        # EXPECT compares equal to 0 but is not written as a zero literal
        # (e.g. 0.0 or False); ordinary strings are NOT routed here.  It
        # looks like a literal port of a Perl "is numeric" idiom.  Kept
        # as-is because existing tests may depend on it - confirm intent.
        expectMatch = re.match(r'^\s*[+-]?0(e0)?\s*$',
                               str(arguments["EXPECT"]), re.I + re.M)
        if (not expectMatch) and (arguments["EXPECT"] == 0):
            if operator == 'greater' or operator == 'lesser':
                main.log.error("Numeric comparison on strings is not possibele")
                return main.ERROR
            arguments["ACTUAL"] = str(arguments["ACTUAL"])
            if operator != 'matches':
                arguments["EXPECT"] = str(arguments["EXPECT"])

        try:
            opcode = opcodes[str(operator)]
        except KeyError as e:
            print("Key Error in assertion")
            print(e)
            return main.FALSE

        if opcode == '=~':
            # Explicit test instead of `assert`, which is removed when
            # Python runs with -O and would make every match "pass".
            if re.search(str(arguments["EXPECT"]), str(arguments["ACTUAL"])):
                result = main.TRUE
            else:
                main.log.error("Assertion Failed")
                result = main.FALSE
        elif opcode == '==':
            main.log.info("Verifying the Expected is equal to the actual or not using assert_equal")
            if arguments["EXPECT"] == arguments["ACTUAL"]:
                result = main.TRUE
            else:
                result = main.FALSE
        else:
            if opcode == '>':
                main.log.info("Verifying the Expected is Greater than the actual or not using assert_greater")
            else:
                main.log.info("Verifying the Expected is Lesser than the actual or not using assert_lesser")
            try:
                expected = self._numeric_operand(arguments["EXPECT"])
                actual = self._numeric_operand(arguments["ACTUAL"])
                if opcode == '>':
                    passed = expected > actual
                else:
                    passed = expected < actual
            except (ValueError, SyntaxError, TypeError):
                main.log.exception("Assertion operands could not be compared numerically")
                return main.ERROR
            result = main.TRUE if passed else main.FALSE

        result = result if result else 0
        if negate:
            result = not result

        if result:
            msg = arguments["ONPASS"]
            main.log.info(msg)
        else:
            msg = arguments["ONFAIL"]
            if isinstance(msg, str):
                main.log.error(msg)
                main.log.report(msg)
                main.onFailMsg = msg

        if not isinstance(msg, str):
            # SECURITY NOTE(review): a non-string onpass/onfail value is
            # converted to text and eval()'d.  Never pass untrusted data
            # here.  It is now evaluated once (previously twice on the
            # failure path).
            try:
                eval(str(msg))
            except SyntaxError as e:
                print("function definition is not right")
                print(e)

        main.last_result = result
        if main.stepResults[2]:
            main.stepResults[2][-1] = result
            try:
                main.stepResults[3][-1] = arguments['ONFAIL']
            except AttributeError:
                pass
        else:
            main.log.warn("Assertion called before a test step")
        return result

    def parse_args(self, args, **kwargs):
        '''
        Normalise keyword arguments against a list of upper-case names.

        args   : list of accepted (upper-case) argument names
        kwargs : key/value pairs; keys are matched case-insensitively

        Returns a dict keyed by the names in `args`: supplied values for
        the names that were passed, None for the rest.  Keywords not
        listed in `args` are dropped.  If `args` is not a list, or no
        keyword matches any name, an empty dict is returned.
        '''
        newArgs = {}
        if not isinstance(args, list):
            return newArgs
        for key, value in kwargs.items():
            upperKey = key.upper()
            if upperKey not in args:
                continue
            for each in args:
                if each == upperKey:
                    newArgs[str(each)] = value
                elif str(each) not in newArgs:
                    newArgs[str(each)] = None
        return newArgs

    def _smtp_send(self, msg):
        '''
        Deliver `msg` through main.smtp using STARTTLS and the
        main.sender / main.senderPwd credentials.  The connection is
        always closed, even when a step fails.
        '''
        smtp = smtplib.SMTP(main.smtp)
        try:
            smtp.starttls()
            smtp.login(main.sender, main.senderPwd)
            smtp.sendmail(msg['From'], [msg['To']], msg.as_string())
            smtp.quit()
        finally:
            # No-op after a successful quit(); releases the socket otherwise.
            smtp.close()

    def send_mail(self):
        '''
        Mail the test summary (main.logHeader + main.testResult) with
        every regular file in main.logdir attached, from main.sender to
        main.mail.

        Delivery errors are logged, not raised.  Always returns main.TRUE.
        '''
        # Lower-case module names work on Python 2.5+ and Python 3; the
        # capitalised aliases (email.mime.Multipart) exist only on Python 2.
        from email.mime.multipart import MIMEMultipart
        from email.mime.text import MIMEText

        msg = MIMEMultipart()
        sub = "Result summary of \"" + main.TEST + "\""
        try:
            if main.test_target:
                sub += " run on component \"" + main.test_target + \
                       "\" Version \"" + \
                       vars(main)[main.test_target].get_version() + "\""
        except (KeyError, AttributeError):
            # Target or its version is unavailable: use the short subject.
            pass
        sub += ": " + str(main.TOTAL_TC_SUCCESS) + "% Passed"

        msg['Subject'] = sub
        msg['From'] = main.sender
        msg['To'] = main.mail

        # The main body is just another attachment
        body = MIMEText(main.logHeader + "\n" + main.testResult)
        msg.attach(body)

        # Attachments
        for filename in os.listdir(main.logdir):
            filepath = os.path.join(main.logdir, filename)
            if not os.path.isfile(filepath):
                # Sub-directories cannot be read as attachments.
                continue
            with open(filepath, 'rb') as fp:
                att = email.mime.application.MIMEApplication(fp.read(),
                                                             _subtype="")
            att.add_header('Content-Disposition',
                           'attachment',
                           filename=filename)
            msg.attach(att)
        try:
            self._smtp_send(msg)
        except Exception:
            main.log.exception("Error sending email")
        return main.TRUE

    def send_warning_email(self, subject=None):
        '''
        Send a body-less notification mail from main.sender to main.mail.

        subject : mail subject; defaults to "<main.TEST> PAUSED!"

        Returns main.TRUE on success, main.FALSE (after logging the
        exception) on any failure.
        '''
        try:
            from email.mime.multipart import MIMEMultipart

            if not subject:
                subject = main.TEST + " PAUSED!"
            msg = MIMEMultipart()

            msg['Subject'] = subject
            msg['From'] = main.sender
            msg['To'] = main.mail

            self._smtp_send(msg)
        except Exception:
            main.log.exception("")
            return main.FALSE
        return main.TRUE

    def parse(self, fileName):
        '''
        Parse a params, topo or cfg file with ConfigObj.

        fileName : path whose name contains ".cfg", ".params" or ".topo"
                   (case-insensitive)

        Returns the ConfigObj result, 0 when the file name does not have
        one of those extensions, or None when parsing fails (a message is
        printed in that case).
        '''
        self.fileName = fileName
        matchFileName = re.match(r'(.*)\.(cfg|params|topo)',
                                 self.fileName, re.M | re.I)
        if not matchFileName:
            return 0
        try:
            return ConfigObj(self.fileName)
        except Exception:
            # `StandardError` does not exist on Python 3.
            print("There is no such file to parse " + fileName)
            return None

    def retry(self, f, retValue, args=(), kwargs=None,
              sleep=1, attempts=2, randomTime=False):
        """
        Given a function and bad return values, retry will retry a function
        until successful or give up after a certain number of attempts.

        Arguments:
        f        - a callable object
        retValue - Return value(s) of f to retry on. This can be a list or an
                   object.
        args     - A tuple containing the arguments of f.
        kwargs   - A dictionary containing the keyword arguments of f.
                   Defaults to no keyword arguments.
        sleep    - Time in seconds to sleep between retries. If randomTime
                   is True, this is the max time to wait. May be a float.
                   Defaults to 1 second.
        attempts - Max number of attempts before returning. If set to 1,
                   f will only be called once. Defaults to 2 tries.
        randomTime - Boolean indicating if the wait time is random between 0
                   and sleep or exactly sleep seconds. Defaults to False.

        Returns the value of the last call to f.  Invalid arguments or an
        exception raised by f are logged, then main.cleanup() and
        main.exit() are called.
        """
        # TODO: be able to pass in a conditional statement(s). For example:
        #      retCondition = "< 7"
        #      Then we do something like 'if eval( "ret " + retCondition ):break'
        try:
            # Explicit raises: `assert` statements vanish under python -O.
            if not attempts > 0:
                raise AssertionError("attempts must be at least 1")
            if not sleep >= 0:
                raise AssertionError("sleep must be >= 0")
            if kwargs is None:
                kwargs = {}
            if not isinstance(retValue, list):
                retValue = [retValue]
            for i in range(0, attempts):
                ret = f(*args, **kwargs)
                if ret not in retValue:
                    # NOTE that False in [ 0 ] == True
                    break
                if randomTime:
                    # uniform() also accepts fractional `sleep` values,
                    # for which randint() raises ValueError.
                    sleeptime = random.uniform(0, sleep)
                else:
                    sleeptime = sleep
                time.sleep(sleeptime)
            return ret
        except AssertionError:
            main.log.exception("Invalid arguements for retry: ")
            main.cleanup()
            main.exit()
        except Exception:
            main.log.exception("Uncaught exception in retry: ")
            main.cleanup()
            main.exit()
344
# `sys` must be bound before the first Utilities() is built, because
# Utilities.__init__ reads sys.modules; previously it was imported only
# afterwards, and only on the non-__main__ path.
import sys

# Convenience instance reachable as `<module>.utilities`.
utilities = Utilities()

if __name__ != "__main__":
    # When imported, replace this module in sys.modules with a Utilities
    # instance so that `import <module>` yields an object whose missing
    # attributes are resolved by Utilities.__getattr__ (assert_* helpers).
    # Each instance keeps the real module alive through `self.wrapped`.
    sys.modules[__name__] = Utilities()