Adding tls exchange debug logs to show interface for TLS exchange
Change-Id: Id1a46bd27e695ba950d6405bc79f3d1f2c10f6ab
diff --git a/src/test/utils/EapTLS.py b/src/test/utils/EapTLS.py
index 34cc6b8..dec36e0 100644
--- a/src/test/utils/EapTLS.py
+++ b/src/test/utils/EapTLS.py
@@ -131,7 +131,7 @@
def handle_server_hello_done(self, server_hello_done):
if server_hello_done[-4:] == self.server_hello_done_signature:
- log_test.info('server hello received')
+ log_test.info('server hello received over interface %s' %self.intf)
self.server_hello_done_received = True
def __init__(self, intf = 'veth0', client_cert = None, client_priv_key = None,
@@ -216,8 +216,8 @@
self.pkt_map[pkt_type][self.DATA_IDX] += data
if reassembled is True:
self.pkt_map[pkt_type][self.CB_IDX](self.pkt_map[pkt_type][self.DATA_IDX])
- log_test.info('Appending packet type %02x to packet history of len %d'
- %(ord(pkt_type), len(self.pkt_map[pkt_type][self.DATA_IDX])))
+ log_test.info('Interface %s, Appending packet type %02x to packet history of len %d'
+ %(self.intf, ord(pkt_type), len(self.pkt_map[pkt_type][self.DATA_IDX])))
self.pkt_history.append(self.pkt_map[pkt_type][self.DATA_IDX])
data = ''.join(self.pkt_map[pkt_type][:self.DATA_IDX+1])
self.load_tls_record(data, pkt_type = pkt_type)
@@ -226,7 +226,7 @@
def tlsFail(self):
##Force a failure
- log_test.info('entering into testFail function')
+ log_test.info('entering into testFail function for interface %s' %self.intf)
self.nextEvent = self.tlsEventTable.EVT_EAP_TLS_FINISHED
self.nextState = self.tlsStateTable.ST_EAP_TLS_FINISHED
self.failTest = True
@@ -246,7 +246,7 @@
else:
self.pkt_update(self.pkt_last, tls_data)
self.pending_bytes -= len(tls_data)
- print('Offset: %d, pkt : %d, pending %d\n' %(offset, len(pkt), self.pending_bytes))
+ print('Interface: %s, Offset: %d, pkt : %d, pending %d\n' %(self.intf, offset, len(pkt), self.pending_bytes))
while self.pending_bytes == 0 and offset < len(pkt):
tls_data = r[offset:]
hexdump(tls_data)
@@ -273,7 +273,7 @@
reassembled = False)
self.pending_bytes -= len(tls_data) - 5 - type_hdrlen
self.pkt_last = pkt_type
- log_test.info('Pending bytes left %d' %(self.pending_bytes))
+ log_test.info('Interface: %s, Pending bytes left %d' %(self.intf, self.pending_bytes))
assert self.pending_bytes > 0
elif tls_data[0] == self.SERVER_HELLO_DONE:
self.server_hello_done_eap_id = pkt[EAP].id
@@ -296,23 +296,23 @@
#if self.src_mac == 'mcast': self.setup(src_mac='mcast')
#if self.src_mac == 'zeros': self.setup(src_mac='zeros')
#if self.src_mac == 'default': self.setup(src_mac='default')
- log_test.info('source mac is %s'%self.src_mac)
+ #log_test.info('Interface: %s, Source mac is %s' %(self.intf, self.src_mac))
self.setup(src_mac=self.src_mac)
self.nextEvent = self.tlsEventTable.EVT_EAP_START
def _eapStart(self):
- log_test.info('_eapStart method started')
+ log_test.info('_eapStart method started over interface %s' %(self.intf))
self.eapol_start()
self.nextEvent = self.tlsEventTable.EVT_EAP_ID_REQ
def _eapIdReq(self):
- log_test.info( 'Inside EAP ID Req' )
+ log_test.info('Inside EAP ID Req for interface %s' %(self.intf))
def eapol_cb(pkt):
- log_test.info('Got EAPOL packet with type id and code request')
- log_test.info('Packet code: %d, type: %d, id: %d', pkt[EAP].code, pkt[EAP].type, pkt[EAP].id)
- log_test.info("<====== Send EAP Response with identity = %s ================>" % USER)
+ log_test.info('Got EAPOL packet with type id and code request for interface %s' %(self.intf))
+ log_test.info('Interface: %s, Packet code: %d, type: %d, id: %d' %(self.intf, pkt[EAP].code, pkt[EAP].type, pkt[EAP].id))
+ log_test.info("Send EAP Response with identity %s over interface %s" % (USER, self.intf))
if self.id_mismatch_in_identifier_response_packet:
- log_test.info('\nSending invalid id field in EAP Identity Response packet')
+ log_test.info('\nSending invalid id field in EAP Identity Response packet over interface %s' %(self.intf))
self.eapol_id_req(pkt[EAP].id+10, USER)
else:
self.eapol_id_req(pkt[EAP].id, USER)
@@ -329,7 +329,7 @@
def _eapTlsHelloReq(self):
def eapol_cb(pkt):
- log_test.info('Got hello request for id %d', pkt[EAP].id)
+ log_test.info('Got hello request for id %d over interface %s', pkt[EAP].id, self.intf)
self.client_hello = TLSClientHello(version= self.version,
gmt_unix_time=self.gmt_unix_time,
random_bytes= '\xAB' * 28,
@@ -355,7 +355,7 @@
else:
reqdata = TLSRecord()/client_hello_data
self.load_tls_record(str(reqdata))
- log_test.info("Sending Client Hello TLS payload of len %d, id %d" %(len(reqdata),pkt[EAP].id))
+ log_test.info("Sending Client Hello TLS payload of len %d, id %d over interface %s" %(len(reqdata),pkt[EAP].id, self.intf))
if self.id_mismatch_in_client_hello_packet:
log_test.info('\nsending invalid id field in client hello packet')
eap_payload = self.eapTLS(EAP_RESPONSE, pkt[EAP].id+10, TLS_LENGTH_INCLUDED, str(reqdata))
@@ -406,7 +406,7 @@
return to_raw(TLSPlaintext(data = 'GET / HTTP/1.1\r\nHOST: localhost\r\n\r\n'), self.tls_ctx)
def _eapTlsCertReq(self):
- log_test.info('Receiving server certificates')
+ log_test.info('Receiving server certificates over interface %s', self.intf)
while self.server_hello_done_received == False:
r = self.eapol_scapy_recv(cb = self.eapol_server_hello_cb,
lfilter =
@@ -415,7 +415,7 @@
if len(r) == 0:
self.tlsFail()
return r
- log_test.info('Sending client certificate request')
+ log_test.info('Sending client certificate request over interface %s', self.intf)
rex_pem = re.compile(r'\-+BEGIN[^\-]+\-+(.*?)\-+END[^\-]+\-+', re.DOTALL)
if self.client_cert:
der_cert = rex_pem.findall(self.client_cert)[0].decode("base64")
@@ -444,7 +444,7 @@
else:
client_cert_record = TLSRecord(content_type=TLSContentType.HANDSHAKE)/client_cert_verify
self.pkt_history.append(str(client_cert_verify))
- #log_test.info('TLS ctxt: %s' %self.tls_ctx)
+ #log_test.info('Interface: %s, TLS ctxt: %s' %(self.intf, self.tls_ctx))
client_ccs = TLSRecord(version="TLS_1_0")/TLSChangeCipherSpec()
enc_handshake_msg = self.get_encrypted_handshake_msg()
if self.invalid_content_type:
@@ -453,7 +453,7 @@
handshake_msg = str(TLSRecord(content_type=TLSContentType.HANDSHAKE)/enc_handshake_msg)
reqdata = str(TLS.from_records([client_certificate, client_key_ex, client_cert_record, client_ccs]))
reqdata += handshake_msg
- log_test.info("------> Sending Client Hello TLS Certificate payload of len %d ----------->" %len(reqdata))
+ log_test.info("Sending Client Hello TLS Certificate payload of len %d over interface %s" %(len(reqdata), self.intf))
if self.dont_send_client_certificate:
log_test.info('\nskipping sending client certificate part')
pass
@@ -465,7 +465,7 @@
def _eapTlsCertReq_delay(self):
self.server_hello_done_received = True
- log_test.info('Sending client certificate request')
+ log_test.info('Sending client certificate request over interface %s', self.intf)
rex_pem = re.compile(r'\-+BEGIN[^\-]+\-+(.*?)\-+END[^\-]+\-+', re.DOTALL)
if self.client_cert:
@@ -511,7 +511,7 @@
handshake_msg = str(TLSRecord(content_type=TLSContentType.HANDSHAKE)/enc_handshake_msg)
reqdata = str(TLS.from_records([client_certificate, client_key_ex, client_cert_record, client_ccs]))
reqdata += handshake_msg
- log_test.info("------> Sending Client Hello TLS Certificate payload of len %d ----------->" %len(reqdata))
+ log_test.info("Sending Client Hello TLS Certificate payload of len %d over interface %s" %(len(reqdata), self.intf))
if self.dont_send_client_certificate:
log_test.info('\nSkipping sending client certificate part')
@@ -525,11 +525,11 @@
def _eapTlsChangeCipherSpec(self):
def eapol_cb(pkt):
r = str(pkt)
- log_test.info('data received in change cipher spec function is %s'%pkt.show())
+ log_test.info('Interface %s. Received data in change cipher spec function is %s'%(self.intf, pkt.show()))
tls_data = r[self.TLS_OFFSET:]
- log_test.info('Verifying TLS Change Cipher spec record type %x' %ord(tls_data[0]))
+ log_test.info('Verifying TLS Change Cipher spec record type %x over interface %s' %(ord(tls_data[0]), self.intf))
assert tls_data[0] == self.CHANGE_CIPHER
- log_test.info('Handshake finished. Sending empty data')
+ log_test.info('Handshake finished. Sending empty data over interface %s' %self.intf)
eap_payload = self.eapTLS(EAP_RESPONSE, pkt[EAP].id, 0, '')
self.eapol_send(EAPOL_EAPPACKET, eap_payload)
@@ -545,7 +545,7 @@
def _eapTlsFinished(self):
self.nextEvent = None
def eapol_cb(pkt):
- log_test.info('Server authentication successfull')
+ log_test.info('Server authentication successfull over interface %s' %self.intf)
timeout = 5
if self.failTest is True: