blob: e2209a95308878519c57f30c0046db2b0fa3c4e0 [file] [log] [blame]
Chetan Gaonkercfcce782016-05-10 10:10:42 -07001#
2# Copyright 2016-present Ciena Corporation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15#
Chetan Gaonker3ff8eae2016-04-12 14:50:26 -070016#!/usr/bin/env python
17'''
18Created on 23-Oct-2012
19
20@authors: Anil Kumar (anilkumar.s@paxterrasolutions.com),
21 Raghav Kashyap(raghavkashyap@paxterrasolutions.com)
22
23
24
25 TestON is free software: you can redistribute it and/or modify
26 it under the terms of the GNU General Public License as published by
27 the Free Software Foundation, either version 2 of the License, or
28 (at your option) any later version.
29
30 TestON is distributed in the hope that it will be useful,
31 but WITHOUT ANY WARRANTY; without even the implied warranty of
32 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
33 GNU General Public License for more details.
34
35 You should have received a copy of the GNU General Public License
36 along with TestON. If not, see <http://www.gnu.org/licenses/>.
37
38
39Utilities will take care about the basic functions like :
40 * Extended assertion,
41 * parse_args for key-value pair handling
42 * Parsing the params or topology file.
43
44'''
45import re
46from configobj import ConfigObj
47import ast
48import smtplib
49
50import email
51import os
52import email.mime.application
53import time
54import random
55from clicommon import *
56
57class Utilities:
58 '''
59 Utilities will take care about the basic functions like :
60 * Extended assertion,
61 * parse_args for key-value pair handling
62 * Parsing the params or topology file.
63 '''
64
65 def __init__(self):
66 self.wrapped = sys.modules[__name__]
67
68 def __getattr__(self, name):
69 '''
70 This will invoke, if the attribute wasn't found the usual ways.
71 Here it will look for assert_attribute and will execute when AttributeError occurs.
72 It will return the result of the assert_attribute.
73 '''
74 try:
75 return getattr(self.wrapped, name)
76 except AttributeError:
77 def assertHandling(**kwargs):
78 nameVar = re.match("^assert",name,flags=0)
79 matchVar = re.match("assert(_not_|_)(equals|matches|greater|lesser)",name,flags=0)
80 notVar = 0
81 operators = ""
82
83 try :
84 if matchVar.group(1) == "_not_" and matchVar.group(2) :
85 notVar = 1
86 operators = matchVar.group(2)
87 elif matchVar.group(1) == "_" and matchVar.group(2):
88 operators = matchVar.group(2)
89 except AttributeError:
90 if matchVar==None and nameVar:
91 operators ='equals'
92 result = self._assert(NOT=notVar,operator=operators,**kwargs)
93 if result == main.TRUE:
94 main.log.info("Assertion Passed")
95 main.STEPRESULT = main.TRUE
96 elif result == main.FALSE:
97 main.log.warn("Assertion Failed")
98 main.STEPRESULT = main.FALSE
99 else:
100 main.log.error("There is an Error in Assertion")
101 main.STEPRESULT = main.ERROR
102 return result
103 return assertHandling
104
105 def _assert (self,**assertParam):
106 '''
107 It will take the arguments :
108 expect:'Expected output'
109 actual:'Actual output'
110 onpass:'Action or string to be triggered or displayed respectively when the assert passed'
111 onfail:'Action or string to be triggered or displayed respectively when the assert failed'
112 not:'optional argument to specify the negation of the each assertion type'
113 operator:'assertion type will be defined by using operator. Like equal , greater, lesser, matches.'
114
115 It will return the assertion result.
116
117 '''
118
119 arguments = self.parse_args(["EXPECT","ACTUAL","ONPASS","ONFAIL","NOT","OPERATOR"],**assertParam)
120
121 result = 0
122 valuetype = ''
123 operation = "not "+ str(arguments["OPERATOR"]) if arguments['NOT'] and arguments['NOT'] == 1 else arguments["OPERATOR"]
124 operators = {'equals':{'STR':'==','NUM':'=='}, 'matches' : '=~', 'greater':'>' ,'lesser':'<'}
125
126 expectMatch = re.match('^\s*[+-]?0(e0)?\s*$', str(arguments["EXPECT"]), re.I+re.M)
127 if not ((not expectMatch) and (arguments["EXPECT"]==0)):
128 valuetype = 'NUM'
129 else :
130 if arguments["OPERATOR"] == 'greater' or arguments["OPERATOR"] == 'lesser':
131 main.log.error("Numeric comparison on strings is not possibele")
132 return main.ERROR
133
134 valuetype = 'STR'
135 arguments["ACTUAL"] = str(arguments["ACTUAL"])
136 if arguments["OPERATOR"] != 'matches':
137 arguments["EXPECT"] = str(arguments["EXPECT"])
138
139 try :
140 opcode = operators[str(arguments["OPERATOR"])][valuetype] if arguments["OPERATOR"] == 'equals' else operators[str(arguments["OPERATOR"])]
141
142 except KeyError as e:
143 print "Key Error in assertion"
144 print e
145 return main.FALSE
146
147 if opcode == '=~':
148 try:
149 assert re.search(str(arguments["EXPECT"]),str(arguments["ACTUAL"]))
150 result = main.TRUE
151 except AssertionError:
152 try :
153 assert re.match(str(arguments["EXPECT"]),str(arguments["ACTUAL"]))
154 result = main.TRUE
155 except AssertionError:
156 main.log.error("Assertion Failed")
157 result = main.FALSE
158 else :
159 try:
160 if str(opcode)=="==":
161 main.log.info("Verifying the Expected is equal to the actual or not using assert_equal")
162 if (arguments["EXPECT"] == arguments["ACTUAL"]):
163 result = main.TRUE
164 else :
165 result = main.FALSE
166 elif str(opcode) == ">":
167 main.log.info("Verifying the Expected is Greater than the actual or not using assert_greater")
168 if (ast.literal_eval(arguments["EXPECT"]) > ast.literal_eval(arguments["ACTUAL"])) :
169 result = main.TRUE
170 else :
171 result = main.FALSE
172 elif str(opcode) == "<":
173 main.log.info("Verifying the Expected is Lesser than the actual or not using assert_lesser")
174 if (ast.literal_eval(arguments["EXPECT"]) < ast.literal_eval(arguments["ACTUAL"])):
175 result = main.TRUE
176 else :
177 result = main.FALSE
178 except AssertionError:
179 main.log.error("Assertion Failed")
180 result = main.FALSE
181 result = result if result else 0
182 result = not result if arguments["NOT"] and arguments["NOT"] == 1 else result
183 resultString = ""
184 if result :
185 resultString = str(resultString) + "PASS"
186 main.log.info(arguments["ONPASS"])
187 else :
188 resultString = str(resultString) + "FAIL"
189 if not isinstance(arguments["ONFAIL"],str):
190 eval(str(arguments["ONFAIL"]))
191 else :
192 main.log.error(arguments["ONFAIL"])
193 main.log.report(arguments["ONFAIL"])
194 main.onFailMsg = arguments[ 'ONFAIL' ]
195
196 msg = arguments["ON" + str(resultString)]
197
198 if not isinstance(msg,str):
199 try:
200 eval(str(msg))
201 except SyntaxError as e:
202 print "function definition is not right"
203 print e
204
205 main.last_result = result
206 if main.stepResults[2]:
207 main.stepResults[2][-1] = result
208 try:
209 main.stepResults[3][-1] = arguments[ 'ONFAIL' ]
210 except AttributeError:
211 pass
212 else:
213 main.log.warn( "Assertion called before a test step" )
214 return result
215
216 def parse_args(self,args, **kwargs):
217 '''
218 It will accept the (key,value) pair and will return the (key,value) pairs with keys in uppercase.
219 '''
220 newArgs = {}
221 for key,value in kwargs.iteritems():
222 if isinstance(args,list) and str.upper(key) in args:
223 for each in args:
224 if each==str.upper(key):
225 newArgs [str(each)] = value
226 elif each != str.upper(key) and (newArgs.has_key(str(each)) == False ):
227 newArgs[str(each)] = None
228
229 return newArgs
230
231 def send_mail(self):
232 # Create a text/plain message
233 msg = email.mime.Multipart.MIMEMultipart()
234 try :
235 if main.test_target:
236 sub = "Result summary of \"" + main.TEST + "\" run on component \"" +\
237 main.test_target + "\" Version \"" +\
238 vars( main )[main.test_target].get_version() + "\": " +\
239 str( main.TOTAL_TC_SUCCESS ) + "% Passed"
240 else :
241 sub = "Result summary of \"" + main.TEST + "\": " +\
242 str( main.TOTAL_TC_SUCCESS ) + "% Passed"
243 except ( KeyError, AttributeError ):
244 sub = "Result summary of \"" + main.TEST + "\": " +\
245 str( main.TOTAL_TC_SUCCESS ) + "% Passed"
246
247 msg['Subject'] = sub
248 msg['From'] = main.sender
249 msg['To'] = main.mail
250
251 # The main body is just another attachment
252 body = email.mime.Text.MIMEText( main.logHeader + "\n" +
253 main.testResult)
254 msg.attach( body )
255
256 # Attachments
257 for filename in os.listdir( main.logdir ):
258 filepath = main.logdir + "/" + filename
259 fp = open( filepath, 'rb' )
260 att = email.mime.application.MIMEApplication( fp.read(),
261 _subtype="" )
262 fp.close()
263 att.add_header( 'Content-Disposition',
264 'attachment',
265 filename=filename )
266 msg.attach( att )
267 try:
268 smtp = smtplib.SMTP( main.smtp )
269 smtp.starttls()
270 smtp.login( main.sender, main.senderPwd )
271 smtp.sendmail( msg['From'], [msg['To']], msg.as_string() )
272 smtp.quit()
273 except Exception:
274 main.log.exception( "Error sending email" )
275 return main.TRUE
276
277 def send_warning_email( self, subject=None ):
278 try:
279 if not subject:
280 subject = main.TEST + " PAUSED!"
281 # Create a text/plain message
282 msg = email.mime.Multipart.MIMEMultipart()
283
284 msg['Subject'] = subject
285 msg['From'] = main.sender
286 msg['To'] = main.mail
287
288 smtp = smtplib.SMTP( main.smtp )
289 smtp.starttls()
290 smtp.login( main.sender, main.senderPwd )
291 smtp.sendmail( msg['From'], [msg['To']], msg.as_string() )
292 smtp.quit()
293 except Exception:
294 main.log.exception( "" )
295 return main.FALSE
296 return main.TRUE
297
298 def parse(self,fileName):
299 '''
300 This will parse the params or topo or cfg file and return content in the file as Dictionary
301 '''
302 self.fileName = fileName
303 matchFileName = re.match(r'(.*)\.(cfg|params|topo)',self.fileName,re.M|re.I)
304 if matchFileName:
305 try :
306 parsedInfo = ConfigObj(self.fileName)
307 return parsedInfo
308 except StandardError:
309 print "There is no such file to parse "+fileName
310 else:
311 return 0
312
313 def retry( self, f, retValue, args=(), kwargs={},
314 sleep=1, attempts=2, randomTime=False ):
315 """
316 Given a function and bad return values, retry will retry a function
317 until successful or give up after a certain number of attempts.
318
319 Arguments:
320 f - a callable object
321 retValue - Return value(s) of f to retry on. This can be a list or an
322 object.
323 args - A tuple containing the arguments of f.
324 kwargs - A dictionary containing the keyword arguments of f.
325 sleep - Time in seconds to sleep between retries. If random is True,
326 this is the max time to wait. Defaults to 1 second.
327 attempts - Max number of attempts before returning. If set to 1,
328 f will only be called once. Defaults to 2 trys.
329 random - Boolean indicating if the wait time is random between 0
330 and sleep or exactly sleep seconds. Defaults to False.
331 """
332 # TODO: be able to pass in a conditional statement(s). For example:
333 # retCondition = "< 7"
334 # Then we do something like 'if eval( "ret " + retCondition ):break'
335 try:
336 assert attempts > 0, "attempts must be more than 1"
337 assert sleep >= 0, "sleep must be >= 0"
338 if not isinstance( retValue, list ):
339 retValue = [ retValue ]
340 for i in range( 0, attempts ):
341 ret = f( *args, **kwargs )
342 if ret not in retValue:
343 # NOTE that False in [ 0 ] == True
344 break
345 if randomTime:
346 sleeptime = random.randint( 0, sleep )
347 else:
348 sleeptime = sleep
349 time.sleep( sleeptime )
350 return ret
351 except AssertionError:
352 main.log.exception( "Invalid arguements for retry: " )
353 main.cleanup()
354 main.exit()
355 except Exception:
356 main.log.exception( "Uncaught exception in retry: " )
357 main.cleanup()
358 main.exit()
359
360utilities = Utilities()
361
362if __name__ != "__main__":
363 import sys
364
365 sys.modules[__name__] = Utilities()