blob: 77fd2601c50952102e149a8e5044d87461915641 [file] [log] [blame]
Chetan Gaonkercfcce782016-05-10 10:10:42 -07001#
2# Copyright 2016-present Ciena Corporation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
Chetan Gaonker3ff8eae2016-04-12 14:50:26 -070016'''
17Created on 23-Oct-2012
18
19@authors: Anil Kumar (anilkumar.s@paxterrasolutions.com),
20 Raghav Kashyap(raghavkashyap@paxterrasolutions.com)
21
22
23
24 TestON is free software: you can redistribute it and/or modify
25 it under the terms of the GNU General Public License as published by
26 the Free Software Foundation, either version 2 of the License, or
27 (at your option) any later version.
28
29 TestON is distributed in the hope that it will be useful,
30 but WITHOUT ANY WARRANTY; without even the implied warranty of
31 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
32 GNU General Public License for more details.
33
34 You should have received a copy of the GNU General Public License
35 along with TestON. If not, see <http://www.gnu.org/licenses/>.
36
37
38Utilities will take care about the basic functions like :
39 * Extended assertion,
40 * parse_args for key-value pair handling
41 * Parsing the params or topology file.
42
43'''
44import re
45from configobj import ConfigObj
46import ast
47import smtplib
48
49import email
50import os
51import email.mime.application
52import time
53import random
54from clicommon import *
55
class Utilities:
    '''
    Helper collection for test scripts:
     * extended assertions (``assert_equals``, ``assert_not_matches``, ...)
       synthesised on the fly by ``__getattr__``,
     * ``parse_args`` for case-insensitive keyword handling,
     * parsing of params / topology / cfg files,
     * result e-mails and a generic ``retry`` helper.

    NOTE(review): the methods use a global named ``main`` (logger, TRUE /
    FALSE / ERROR constants, step results, mail settings).  It is not
    defined in this class; presumably the surrounding framework or the
    ``clicommon`` star import provides it -- confirm.
    '''

    def __init__(self):
        # Imported here so construction does not depend on ``sys`` having
        # been bound at module level by some other import.
        import sys
        # The module object this instance may later replace in
        # ``sys.modules``; unknown attributes are forwarded to it.
        self.wrapped = sys.modules[__name__]

    def __getattr__(self, name):
        '''
        Called only when normal attribute lookup fails.

        Lookup order:
         1. the attribute of the wrapped module, if it has one;
         2. AttributeError for ``wrapped`` itself and for missing dunder
            names, so protocol probes (copy, pickle, truth testing on
            old-style classes, ...) are not answered with an assertion
            handler;
         3. otherwise a callable that runs ``_assert``.  The operator is
            taken from the name: ``assert_<op>`` / ``assert_not_<op>``
            with <op> in equals, matches, greater, lesser; any other
            name starting with ``assert`` means ``equals``; names not
            starting with ``assert`` pass an empty operator.
        '''
        if name == "wrapped":
            # Not set yet (``__init__`` bypassed): reading ``self.wrapped``
            # below would otherwise re-enter this method forever.
            raise AttributeError(name)
        try:
            return getattr(self.wrapped, name)
        except AttributeError:
            if name.startswith("__") and name.endswith("__"):
                raise

        def assertHandling(**kwargs):
            negate = 0
            operator = ""
            variant = re.match(
                "assert(_not_|_)(equals|matches|greater|lesser)", name)
            if variant:
                operator = variant.group(2)
                if variant.group(1) == "_not_":
                    negate = 1
            elif name.startswith("assert"):
                operator = 'equals'

            result = self._assert(NOT=negate, operator=operator, **kwargs)
            if result == main.TRUE:
                main.log.info("Assertion Passed")
                main.STEPRESULT = main.TRUE
            elif result == main.FALSE:
                main.log.warn("Assertion Failed")
                main.STEPRESULT = main.FALSE
            else:
                main.log.error("There is an Error in Assertion")
                main.STEPRESULT = main.ERROR
            return result
        return assertHandling

    def _literal_value(self, value):
        '''
        Return ``value`` ready for a numeric comparison: strings are
        parsed with ``ast.literal_eval`` (which may raise ValueError or
        SyntaxError), anything else is returned unchanged.
        '''
        if isinstance(value, (str, type(u""))):
            return ast.literal_eval(value)
        return value

    def _assert(self, **assertParam):
        '''
        Evaluate one assertion.  Keyword arguments (case-insensitive):
            expect:   expected value (a regex for the 'matches' operator)
            actual:   actual value
            onpass:   message logged when the assertion passes
            onfail:   message logged/reported when the assertion fails
            not:      1 to negate the outcome
            operator: 'equals', 'matches', 'greater' or 'lesser'

        Returns a truthy value on pass and a falsy one on failure (a
        plain bool when negated).  Returns main.FALSE for an unknown
        operator and main.ERROR when a greater/lesser comparison cannot
        be carried out; these early returns do not update
        main.last_result or main.stepResults.
        '''
        arguments = self.parse_args(
            ["EXPECT", "ACTUAL", "ONPASS", "ONFAIL", "NOT", "OPERATOR"],
            **assertParam)

        result = 0
        negate = bool(arguments["NOT"] and arguments["NOT"] == 1)
        operators = {'equals': {'STR': '==', 'NUM': '=='},
                     'matches': '=~', 'greater': '>', 'lesser': '<'}

        # NOTE(review): as written this selects 'NUM' for every EXPECT
        # except values that compare equal to 0 while their str() is not
        # a zero literal (e.g. 0.0 or False).  Kept unchanged because
        # callers may depend on it -- confirm the intended rule.
        expectMatch = re.match(r'^\s*[+-]?0(e0)?\s*$',
                               str(arguments["EXPECT"]), re.I + re.M)
        if not ((not expectMatch) and (arguments["EXPECT"] == 0)):
            valuetype = 'NUM'
        else:
            if arguments["OPERATOR"] in ('greater', 'lesser'):
                main.log.error("Numeric comparison on strings is not possibele")
                return main.ERROR

            valuetype = 'STR'
            arguments["ACTUAL"] = str(arguments["ACTUAL"])
            if arguments["OPERATOR"] != 'matches':
                arguments["EXPECT"] = str(arguments["EXPECT"])

        try:
            opcode = operators[str(arguments["OPERATOR"])]
            if arguments["OPERATOR"] == 'equals':
                opcode = opcode[valuetype]
        except KeyError as e:
            print("Key Error in assertion")
            print(e)
            return main.FALSE

        if opcode == '=~':
            # Plain conditional instead of ``assert``: assert statements
            # are removed under ``python -O``, which made every 'matches'
            # assertion pass.  A single search is enough -- re.match can
            # never succeed where re.search failed.
            if re.search(str(arguments["EXPECT"]), str(arguments["ACTUAL"])):
                result = main.TRUE
            else:
                main.log.error("Assertion Failed")
                result = main.FALSE
        elif str(opcode) == "==":
            main.log.info("Verifying the Expected is equal to the actual or not using assert_equal")
            if arguments["EXPECT"] == arguments["ACTUAL"]:
                result = main.TRUE
            else:
                result = main.FALSE
        elif str(opcode) in (">", "<"):
            if str(opcode) == ">":
                main.log.info("Verifying the Expected is Greater than the actual or not using assert_greater")
            else:
                main.log.info("Verifying the Expected is Lesser than the actual or not using assert_lesser")
            try:
                # literal_eval only accepts strings; operands that are
                # already numbers are compared as they are.
                expected = self._literal_value(arguments["EXPECT"])
                actual = self._literal_value(arguments["ACTUAL"])
                if str(opcode) == ">":
                    passed = expected > actual
                else:
                    passed = expected < actual
            except (ValueError, SyntaxError, TypeError):
                main.log.error("Numeric comparison needs comparable literal operands")
                return main.ERROR
            result = main.TRUE if passed else main.FALSE

        result = result if result else 0
        if negate:
            result = not result

        if result:
            outcome = "PASS"
            main.log.info(arguments["ONPASS"])
        else:
            outcome = "FAIL"
            if isinstance(arguments["ONFAIL"], str):
                main.log.error(arguments["ONFAIL"])
                main.log.report(arguments["ONFAIL"])
                main.onFailMsg = arguments['ONFAIL']

        msg = arguments["ON" + outcome]
        if not isinstance(msg, str):
            # SECURITY NOTE: eval() of caller-supplied data.  Kept for
            # backward compatibility; never pass untrusted values as
            # onpass/onfail.  Evaluated once, with the SyntaxError guard
            # (previously a non-string onfail was evaluated twice, the
            # first time unguarded).
            try:
                eval(str(msg))
            except SyntaxError as e:
                print("function definition is not right")
                print(e)

        main.last_result = result
        if main.stepResults[2]:
            # Overwrite the outcome recorded for the most recent step.
            main.stepResults[2][-1] = result
            try:
                main.stepResults[3][-1] = arguments['ONFAIL']
            except AttributeError:
                pass
        else:
            main.log.warn("Assertion called before a test step")
        return result

    def parse_args(self, args, **kwargs):
        '''
        Normalise keyword arguments against a list of expected names.

        args:   list of expected (upper-case) key names.
        kwargs: arbitrary keywords; keys are matched case-insensitively.

        Returns a dict holding every name from ``args``, set to the value
        of the matching keyword or to None when none was supplied.
        Keywords not listed in ``args`` are ignored.  Returns {} when
        ``args`` is not a list.
        '''
        if not isinstance(args, list):
            return {}
        # Pre-fill every expected key so callers can index the result
        # even when no keyword matched.
        newArgs = dict((str(each), None) for each in args)
        for key, value in kwargs.items():
            upperKey = key.upper()
            if upperKey in args:
                newArgs[str(upperKey)] = value
        return newArgs

    def _deliver(self, msg):
        '''
        Send ``msg`` through the server named by main.smtp using STARTTLS
        and the main.sender / main.senderPwd credentials.  The connection
        is closed even when a step fails; exceptions propagate.
        '''
        smtp = smtplib.SMTP(main.smtp)
        try:
            smtp.starttls()
            smtp.login(main.sender, main.senderPwd)
            smtp.sendmail(msg['From'], [msg['To']], msg.as_string())
            smtp.quit()
        finally:
            # Harmless after a successful quit(); releases the socket
            # when any of the steps above raised.
            smtp.close()

    def send_mail(self):
        '''
        Mail the result summary with every regular file in main.logdir
        attached.

        Sending errors are logged, not raised.  Always returns main.TRUE,
        also when sending failed.
        '''
        # Lower-case module names exist on Python 2.5+ and Python 3; the
        # capitalised ``email.mime.Multipart`` aliases are Python 2 only.
        from email.mime.multipart import MIMEMultipart
        from email.mime.text import MIMEText

        msg = MIMEMultipart()
        sub = "Result summary of \"" + main.TEST + "\": " +\
              str(main.TOTAL_TC_SUCCESS) + "% Passed"
        try:
            if main.test_target:
                sub = "Result summary of \"" + main.TEST + "\" run on component \"" +\
                      main.test_target + "\" Version \"" +\
                      vars(main)[main.test_target].get_version() + "\": " +\
                      str(main.TOTAL_TC_SUCCESS) + "% Passed"
        except (KeyError, AttributeError):
            # No usable target component: keep the generic subject.
            pass

        msg['Subject'] = sub
        msg['From'] = main.sender
        msg['To'] = main.mail

        # The main body is just another attachment
        body = MIMEText(main.logHeader + "\n" + main.testResult)
        msg.attach(body)

        # Attachments
        for filename in os.listdir(main.logdir):
            filepath = main.logdir + "/" + filename
            if not os.path.isfile(filepath):
                # open() on a sub-directory would raise and abort the mail
                continue
            with open(filepath, 'rb') as fp:
                payload = fp.read()
            # NOTE(review): the empty subtype yields the content type
            # "application/"; MIMEApplication's default is
            # "octet-stream".  Left unchanged -- confirm.
            att = email.mime.application.MIMEApplication(payload,
                                                         _subtype="")
            att.add_header('Content-Disposition',
                           'attachment',
                           filename=filename)
            msg.attach(att)
        try:
            self._deliver(msg)
        except Exception:
            main.log.exception("Error sending email")
        return main.TRUE

    def send_warning_email(self, subject=None):
        '''
        Send a body-less notification mail.

        subject: subject line; defaults to main.TEST + " PAUSED!".

        Returns main.TRUE on success, main.FALSE (after logging the
        exception) on any failure.
        '''
        try:
            from email.mime.multipart import MIMEMultipart

            if not subject:
                subject = main.TEST + " PAUSED!"
            msg = MIMEMultipart()

            msg['Subject'] = subject
            msg['From'] = main.sender
            msg['To'] = main.mail

            self._deliver(msg)
        except Exception:
            main.log.exception("")
            return main.FALSE
        return main.TRUE

    def parse(self, fileName):
        '''
        Parse a params / topo / cfg file with ConfigObj.

        Returns the ConfigObj for names matching ``*.cfg``, ``*.params``
        or ``*.topo`` (case-insensitive; not anchored at the end of the
        name), 0 for other names, and None (after printing a message)
        when ConfigObj raises.
        '''
        self.fileName = fileName
        if not re.match(r'(.*)\.(cfg|params|topo)', self.fileName,
                        re.M | re.I):
            return 0
        try:
            return ConfigObj(self.fileName)
        except Exception:
            # ``StandardError`` does not exist on Python 3.
            print("There is no such file to parse " + fileName)
            return None

    def retry(self, f, retValue, args=(), kwargs=None,
              sleep=1, attempts=2, randomTime=False):
        """
        Call ``f`` until it returns an acceptable value or the attempts
        are used up.

        Arguments:
        f - a callable object
        retValue - Return value(s) of f to retry on. This can be a list
                   or a single object.
        args - A tuple containing the arguments of f.
        kwargs - A dictionary containing the keyword arguments of f.
                 Defaults to no keyword arguments.
        sleep - Time in seconds to sleep between attempts. If randomTime
                is True, this is the max time to wait. Defaults to 1.
        attempts - Max number of calls. If set to 1, f will only be
                   called once. Defaults to 2.
        randomTime - If True, each wait is a random time between 0 and
                     sleep; otherwise exactly sleep seconds.

        Returns the value of the last call of f.  Invalid arguments and
        any exception raised while retrying are logged and followed by
        main.cleanup() and main.exit().
        """
        # TODO: be able to pass in a conditional statement(s). For example:
        #       retCondition = "< 7"
        #       Then we do something like 'if eval( "ret " + retCondition ):break'
        if kwargs is None:
            kwargs = {}
        try:
            # Raised explicitly: ``assert`` statements disappear under
            # ``python -O`` and the validation with them.
            if not attempts > 0:
                raise AssertionError("attempts must be at least 1")
            if not sleep >= 0:
                raise AssertionError("sleep must be >= 0")
            if not isinstance(retValue, list):
                retValue = [retValue]
            ret = None
            for attempt in range(attempts):
                ret = f(*args, **kwargs)
                if ret not in retValue:
                    # NOTE that False in [ 0 ] == True
                    break
                if attempt < attempts - 1:
                    # No wait after the final attempt.  uniform() also
                    # accepts a fractional ``sleep``; randint() does not.
                    if randomTime:
                        sleeptime = random.uniform(0, sleep)
                    else:
                        sleeptime = sleep
                    time.sleep(sleeptime)
            return ret
        except AssertionError:
            main.log.exception("Invalid arguements for retry: ")
            main.cleanup()
            main.exit()
        except Exception:
            main.log.exception("Uncaught exception in retry: ")
            main.cleanup()
            main.exit()
358
# ``Utilities.__init__`` reads ``sys.modules``, so ``sys`` must be bound in
# this module before the first instance is created -- including when the
# file is executed as a script.
import sys

# Ready-made instance for code that uses the ``utilities`` name directly.
utilities = Utilities()

if __name__ != "__main__":
    # Replace this module's entry in ``sys.modules`` with an instance, so
    # attribute access on the imported module goes through
    # ``Utilities.__getattr__`` (e.g. the dynamically built assert_* calls).
    sys.modules[__name__] = Utilities()