Clean up logging: use the cord-tester logger (log_test) instead of the scapy logger; relying on the scapy logger was an ugly hack.
Change-Id: I8af565f8eb4f69ddc6605e717a0c83772cc9417f
diff --git a/src/test/utils/EapTLS.py b/src/test/utils/EapTLS.py
index c92721a..6a68209 100644
--- a/src/test/utils/EapTLS.py
+++ b/src/test/utils/EapTLS.py
@@ -27,10 +27,11 @@
from nose.tools import *
from CordTestBase import CordTester
from CordContainer import *
+from CordTestUtils import log_test
import re
import time
-log.setLevel('INFO')
+log_test.setLevel('INFO')
def bytes_to_num(data):
try:
@@ -130,7 +131,7 @@
def handle_server_hello_done(self, server_hello_done):
if server_hello_done[-4:] == self.server_hello_done_signature:
- log.info('server hello received')
+ log_test.info('server hello received')
self.server_hello_done_received = True
def __init__(self, intf = 'veth0', client_cert = None, client_priv_key = None,
@@ -215,8 +216,8 @@
self.pkt_map[pkt_type][self.DATA_IDX] += data
if reassembled is True:
self.pkt_map[pkt_type][self.CB_IDX](self.pkt_map[pkt_type][self.DATA_IDX])
- log.info('Appending packet type %02x to packet history of len %d'
- %(ord(pkt_type), len(self.pkt_map[pkt_type][self.DATA_IDX])))
+ log_test.info('Appending packet type %02x to packet history of len %d'
+ %(ord(pkt_type), len(self.pkt_map[pkt_type][self.DATA_IDX])))
self.pkt_history.append(self.pkt_map[pkt_type][self.DATA_IDX])
data = ''.join(self.pkt_map[pkt_type][:self.DATA_IDX+1])
self.load_tls_record(data, pkt_type = pkt_type)
@@ -225,7 +226,7 @@
def tlsFail(self):
##Force a failure
- log.info('entering into testFail function')
+ log_test.info('entering into testFail function')
self.nextEvent = self.tlsEventTable.EVT_EAP_TLS_FINISHED
self.nextState = self.tlsStateTable.ST_EAP_TLS_FINISHED
self.failTest = True
@@ -272,7 +273,7 @@
reassembled = False)
self.pending_bytes -= len(tls_data) - 5 - type_hdrlen
self.pkt_last = pkt_type
- log.info('Pending bytes left %d' %(self.pending_bytes))
+ log_test.info('Pending bytes left %d' %(self.pending_bytes))
assert self.pending_bytes > 0
elif tls_data[0] == self.SERVER_HELLO_DONE:
self.server_hello_done_eap_id = pkt[EAP].id
@@ -295,23 +296,23 @@
#if self.src_mac == 'mcast': self.setup(src_mac='mcast')
#if self.src_mac == 'zeros': self.setup(src_mac='zeros')
#if self.src_mac == 'default': self.setup(src_mac='default')
- log.info('source mac is %s'%self.src_mac)
+ log_test.info('source mac is %s'%self.src_mac)
self.setup(src_mac=self.src_mac)
self.nextEvent = self.tlsEventTable.EVT_EAP_START
def _eapStart(self):
- log.info('_eapStart method started')
+ log_test.info('_eapStart method started')
self.eapol_start()
self.nextEvent = self.tlsEventTable.EVT_EAP_ID_REQ
def _eapIdReq(self):
- log.info( 'Inside EAP ID Req' )
+ log_test.info( 'Inside EAP ID Req' )
def eapol_cb(pkt):
- log.info('Got EAPOL packet with type id and code request')
- log.info('Packet code: %d, type: %d, id: %d', pkt[EAP].code, pkt[EAP].type, pkt[EAP].id)
- log.info("<====== Send EAP Response with identity = %s ================>" % USER)
+ log_test.info('Got EAPOL packet with type id and code request')
+ log_test.info('Packet code: %d, type: %d, id: %d', pkt[EAP].code, pkt[EAP].type, pkt[EAP].id)
+ log_test.info("<====== Send EAP Response with identity = %s ================>" % USER)
if self.id_mismatch_in_identifier_response_packet:
- log.info('\nSending invalid id field in EAP Identity Response packet')
+ log_test.info('\nSending invalid id field in EAP Identity Response packet')
self.eapol_id_req(pkt[EAP].id+10, USER)
else:
self.eapol_id_req(pkt[EAP].id, USER)
@@ -328,7 +329,7 @@
def _eapTlsHelloReq(self):
def eapol_cb(pkt):
- log.info('Got hello request for id %d', pkt[EAP].id)
+ log_test.info('Got hello request for id %d', pkt[EAP].id)
self.client_hello = TLSClientHello(version= self.version,
gmt_unix_time=self.gmt_unix_time,
random_bytes= '\xAB' * 28,
@@ -340,10 +341,10 @@
cipher_suites=[self.cipher_suite]
)
if self.invalid_client_hello_handshake_type:
- log.info('sending server_hello instead of client_hello handshape type in client hello packet')
+ log_test.info('sending server_hello instead of client_hello handshape type in client hello packet')
client_hello_data = TLSHandshake(type='server_hello')/self.client_hello
elif self.invalid_client_hello_handshake_length:
- log.info('sending TLS Handshake message with zero length field in client hello packet')
+ log_test.info('sending TLS Handshake message with zero length field in client hello packet')
client_hello_data = TLSHandshake(length=0)/self.client_hello
else:
client_hello_data = TLSHandshake()/self.client_hello
@@ -354,14 +355,14 @@
else:
reqdata = TLSRecord()/client_hello_data
self.load_tls_record(str(reqdata))
- log.info("Sending Client Hello TLS payload of len %d, id %d" %(len(reqdata),pkt[EAP].id))
+ log_test.info("Sending Client Hello TLS payload of len %d, id %d" %(len(reqdata),pkt[EAP].id))
if self.id_mismatch_in_client_hello_packet:
- log.info('\nsending invalid id field in client hello packet')
+ log_test.info('\nsending invalid id field in client hello packet')
eap_payload = self.eapTLS(EAP_RESPONSE, pkt[EAP].id+10, TLS_LENGTH_INCLUDED, str(reqdata))
else:
eap_payload = self.eapTLS(EAP_RESPONSE, pkt[EAP].id, TLS_LENGTH_INCLUDED, str(reqdata))
if self.dont_send_client_hello:
- log.info('\nskipping client hello packet sending part')
+ log_test.info('\nskipping client hello packet sending part')
pass
else:
self.eapol_send(EAPOL_EAPPACKET, eap_payload)
@@ -405,7 +406,7 @@
return to_raw(TLSPlaintext(data = 'GET / HTTP/1.1\r\nHOST: localhost\r\n\r\n'), self.tls_ctx)
def _eapTlsCertReq(self):
- log.info('Receiving server certificates')
+ log_test.info('Receiving server certificates')
while self.server_hello_done_received == False:
r = self.eapol_scapy_recv(cb = self.eapol_server_hello_cb,
lfilter =
@@ -414,7 +415,7 @@
if len(r) == 0:
self.tlsFail()
return r
- log.info('Sending client certificate request')
+ log_test.info('Sending client certificate request')
rex_pem = re.compile(r'\-+BEGIN[^\-]+\-+(.*?)\-+END[^\-]+\-+', re.DOTALL)
if self.client_cert:
der_cert = rex_pem.findall(self.client_cert)[0].decode("base64")
@@ -433,17 +434,17 @@
self.pkt_history.append(str(client_key_ex_data))
verify_signature = self.get_verify_signature(self.client_priv_key)
if self.invalid_cert_req_handshake:
- log.info("sending 'certificate-request' type of handshake message instead of 'certificate-verify' type")
+ log_test.info("sending 'certificate-request' type of handshake message instead of 'certificate-verify' type")
client_cert_verify = TLSHandshake(type=TLSHandshakeType.CERTIFICATE_REQUEST)/verify_signature
else:
client_cert_verify = TLSHandshake(type=TLSHandshakeType.CERTIFICATE_VERIFY)/verify_signature
if self.incorrect_tlsrecord_type_cert_req:
- log.info("sending TLS Record type as ALERT instead of HANDSHAKE in certificate request packet")
+ log_test.info("sending TLS Record type as ALERT instead of HANDSHAKE in certificate request packet")
client_cert_record = TLSRecord(content_type=TLSContentType.ALERT)/client_cert_verify
else:
client_cert_record = TLSRecord(content_type=TLSContentType.HANDSHAKE)/client_cert_verify
self.pkt_history.append(str(client_cert_verify))
- #log.info('TLS ctxt: %s' %self.tls_ctx)
+ #log_test.info('TLS ctxt: %s' %self.tls_ctx)
client_ccs = TLSRecord(version="TLS_1_0")/TLSChangeCipherSpec()
enc_handshake_msg = self.get_encrypted_handshake_msg()
if self.invalid_content_type:
@@ -452,9 +453,9 @@
handshake_msg = str(TLSRecord(content_type=TLSContentType.HANDSHAKE)/enc_handshake_msg)
reqdata = str(TLS.from_records([client_certificate, client_key_ex, client_cert_record, client_ccs]))
reqdata += handshake_msg
- log.info("------> Sending Client Hello TLS Certificate payload of len %d ----------->" %len(reqdata))
+ log_test.info("------> Sending Client Hello TLS Certificate payload of len %d ----------->" %len(reqdata))
if self.dont_send_client_certificate:
- log.info('\nskipping sending client certificate part')
+ log_test.info('\nskipping sending client certificate part')
pass
else:
status = self.eapFragmentSend(EAP_RESPONSE, self.server_hello_done_eap_id, TLS_LENGTH_INCLUDED,
@@ -464,7 +465,7 @@
def _eapTlsCertReq_delay(self):
self.server_hello_done_received = True
- log.info('Sending client certificate request')
+ log_test.info('Sending client certificate request')
rex_pem = re.compile(r'\-+BEGIN[^\-]+\-+(.*?)\-+END[^\-]+\-+', re.DOTALL)
if self.client_cert:
@@ -488,19 +489,19 @@
verify_signature = self.get_verify_signature(self.client_priv_key)
if self.invalid_cert_req_handshake:
- log.info("Sending 'certificate-request' type of handshake message instead of 'certificate-verify' type")
+ log_test.info("Sending 'certificate-request' type of handshake message instead of 'certificate-verify' type")
client_cert_verify = TLSHandshake(type=TLSHandshakeType.CERTIFICATE_REQUEST)/verify_signature
else:
client_cert_verify = TLSHandshake(type=TLSHandshakeType.CERTIFICATE_VERIFY)/verify_signature
if self.incorrect_tlsrecord_type_cert_req:
- log.info("Sending TLS Record type as ALERT instead of HANDSHAKE in certificate request packet")
+ log_test.info("Sending TLS Record type as ALERT instead of HANDSHAKE in certificate request packet")
client_cert_record = TLSRecord(content_type=TLSContentType.ALERT)/client_cert_verify
else:
client_cert_record = TLSRecord(content_type=TLSContentType.HANDSHAKE)/client_cert_verify
self.pkt_history.append(str(client_cert_verify))
- #log.info('TLS ctxt: %s' %self.tls_ctx)
+ #log_test.info('TLS ctxt: %s' %self.tls_ctx)
client_ccs = TLSRecord(version="TLS_1_0")/TLSChangeCipherSpec()
enc_handshake_msg = self.get_encrypted_handshake_msg()
@@ -510,10 +511,10 @@
handshake_msg = str(TLSRecord(content_type=TLSContentType.HANDSHAKE)/enc_handshake_msg)
reqdata = str(TLS.from_records([client_certificate, client_key_ex, client_cert_record, client_ccs]))
reqdata += handshake_msg
- log.info("------> Sending Client Hello TLS Certificate payload of len %d ----------->" %len(reqdata))
+ log_test.info("------> Sending Client Hello TLS Certificate payload of len %d ----------->" %len(reqdata))
if self.dont_send_client_certificate:
- log.info('\nSkipping sending client certificate part')
+ log_test.info('\nSkipping sending client certificate part')
pass
else:
status = self.eapFragmentSend(EAP_RESPONSE, self.server_hello_done_eap_id, TLS_LENGTH_INCLUDED,
@@ -524,11 +525,11 @@
def _eapTlsChangeCipherSpec(self):
def eapol_cb(pkt):
r = str(pkt)
- log.info('data received in change cipher spec function is %s'%pkt.show())
+ log_test.info('data received in change cipher spec function is %s'%pkt.show())
tls_data = r[self.TLS_OFFSET:]
- log.info('Verifying TLS Change Cipher spec record type %x' %ord(tls_data[0]))
+ log_test.info('Verifying TLS Change Cipher spec record type %x' %ord(tls_data[0]))
assert tls_data[0] == self.CHANGE_CIPHER
- log.info('Handshake finished. Sending empty data')
+ log_test.info('Handshake finished. Sending empty data')
eap_payload = self.eapTLS(EAP_RESPONSE, pkt[EAP].id, 0, '')
self.eapol_send(EAPOL_EAPPACKET, eap_payload)
@@ -544,7 +545,7 @@
def _eapTlsFinished(self):
self.nextEvent = None
def eapol_cb(pkt):
- log.info('Server authentication successfull')
+ log_test.info('Server authentication successfull')
timeout = 5
if self.failTest is True: