Test: Receive TLS server certificates until we receive server hello done.
This avoids a hack that tries to receive a pre-determined number of fragments.
Change-Id: I4d10d8e6dd8a87c231a936ce7bad61ad9def4ee4
diff --git a/src/test/utils/EapTLS.py b/src/test/utils/EapTLS.py
index dd33b05..6d93683 100644
--- a/src/test/utils/EapTLS.py
+++ b/src/test/utils/EapTLS.py
@@ -134,6 +134,7 @@
self.nextEvent = None
self.pending_bytes = 0 #for TLS fragment reassembly
self.server_hello_done_received = False
+ self.server_hello_done_eap_id = 0
self.send_tls_response = True
self.server_certs = []
self.pkt_last = ''
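Review bot: The new `server_hello_done_eap_id` attribute is initialised without a comment saying what it holds, while the neighbouring state (`pending_bytes`) carries one; `self.server_hello_done_eap_id = 0 #EAP id of the request carrying ServerHelloDone, echoed in the client certificate response` would make its role clear at the point of declaration. The initial value 0 is also a legal EAP identifier, so a reader cannot tell from this line whether it is a "not yet seen" sentinel or a real id; the comment should say which. The enclosing constructor starts and ends outside this hunk.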
@@ -223,6 +224,7 @@
log.info('Pending bytes left %d' %(self.pending_bytes))
assert self.pending_bytes > 0
elif tls_data[0] == self.SERVER_HELLO_DONE:
+ self.server_hello_done_eap_id = pkt[EAP].id
self.pkt_update(tls_data[0], tls_data, reassembled = True)
break
else:
@@ -232,8 +234,8 @@
self.pending_bytes = 0
self.pkt_last = ''
- #send TLS response
- if self.send_tls_response:
+ #send TLS response ack till we receive server hello done
+ if self.server_hello_done_received == False:
eap_payload = self.eapTLS(EAP_RESPONSE, pkt[EAP].id, TLS_LENGTH_INCLUDED, '')
self.eapol_send(EAPOL_EAPPACKET, eap_payload)
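Review bot: `if self.server_hello_done_received == False:` compares against a boolean literal; the idiomatic form is `if not self.server_hello_done_received:`, and the same `== False` test is repeated in the new receive loop in _eapTlsCertReq. With this replacement the `send_tls_response` flag initialised in the constructor no longer has a reader here, and the only write visible in the diff (inside the removed `eapol_cb`) is gone too, so it looks like dead state unless another method still uses it — worth either deleting it or noting why it stays. The enclosing function starts and ends outside this hunk, so the point at which `server_hello_done_received` is set relative to this check cannot be confirmed from the diff alone.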
@@ -289,15 +291,7 @@
self.tlsFail()
return r
- for i in range(2):
- r = self.eapol_scapy_recv(cb = self.eapol_server_hello_cb,
- lfilter =
- lambda pkt: EAP in pkt and pkt[EAP].type == EAP_TYPE_TLS and pkt[EAP].code == EAP.REQUEST)
- if len(r) == 0:
- self.tlsFail()
- return r
-
- ##send cert request when we receive the last server hello fragment
+ #move to client/server certificate request
self.nextEvent = self.tlsEventTable.EVT_EAP_TLS_CERT_REQ
def get_verify_data(self):
@@ -326,51 +320,48 @@
return to_raw(TLSPlaintext(data = 'GET / HTTP/1.1\r\nHOST: localhost\r\n\r\n'), self.tls_ctx)
def _eapTlsCertReq(self):
+ log.info('Receiving server certificates')
+ while self.server_hello_done_received == False:
+ r = self.eapol_scapy_recv(cb = self.eapol_server_hello_cb,
+ lfilter =
+ lambda pkt: EAP in pkt and pkt[EAP].type == EAP_TYPE_TLS and \
+ pkt[EAP].code == EAP.REQUEST)
+ if len(r) == 0:
+ self.tlsFail()
+ return r
- def eapol_cb(pkt):
- log.info('Got cert request')
- self.send_tls_response = False
- self.eapol_server_hello_cb(pkt)
- assert self.server_hello_done_received == True
- rex_pem = re.compile(r'\-+BEGIN[^\-]+\-+(.*?)\-+END[^\-]+\-+', re.DOTALL)
- if self.client_cert:
- der_cert = rex_pem.findall(self.client_cert)[0].decode("base64")
- client_certificate_list = TLSHandshake()/TLSCertificateList(
- certificates=[TLSCertificate(data=x509.X509Cert(der_cert))])
- else:
- client_certificate_list = TLSHandshake()/TLSCertificateList(certificates=[])
- client_certificate = TLSRecord(version="TLS_1_0")/client_certificate_list
- kex_data = self.tls_ctx.get_client_kex_data()
- client_key_ex_data = TLSHandshake()/kex_data
- client_key_ex = TLSRecord()/client_key_ex_data
- if self.client_cert:
- self.load_tls_record(str(client_certificate))
- self.pkt_history.append(str(client_certificate_list))
- self.load_tls_record(str(client_key_ex))
- self.pkt_history.append(str(client_key_ex_data))
- verify_signature = self.get_verify_signature(self.client_priv_key)
- client_cert_verify = TLSHandshake(type=TLSHandshakeType.CERTIFICATE_VERIFY)/verify_signature
- client_cert_record = TLSRecord(content_type=TLSContentType.HANDSHAKE)/client_cert_verify
- self.pkt_history.append(str(client_cert_verify))
- #log.info('TLS ctxt: %s' %self.tls_ctx)
- client_ccs = TLSRecord(version="TLS_1_0")/TLSChangeCipherSpec()
- enc_handshake_msg = self.get_encrypted_handshake_msg()
- handshake_msg = str(TLSRecord(content_type=TLSContentType.HANDSHAKE)/enc_handshake_msg)
- reqdata = str(TLS.from_records([client_certificate, client_key_ex, client_cert_record, client_ccs]))
- reqdata += handshake_msg
- log.info("------> Sending Client Hello TLS Certificate payload of len %d ----------->" %len(reqdata))
- status = self.eapFragmentSend(EAP_RESPONSE, pkt[EAP].id, TLS_LENGTH_INCLUDED,
- payload = reqdata, fragsize = 1024)
- assert_equal(status, True)
-
- r = self.eapol_scapy_recv(cb = eapol_cb,
- lfilter =
- lambda pkt: EAP in pkt and pkt[EAP].type == EAP_TYPE_TLS and pkt[EAP].code == EAP.REQUEST)
- if len(r) > 0:
- self.nextEvent = self.tlsEventTable.EVT_EAP_TLS_CHANGE_CIPHER_SPEC
+ log.info('Sending client certificate request')
+ rex_pem = re.compile(r'\-+BEGIN[^\-]+\-+(.*?)\-+END[^\-]+\-+', re.DOTALL)
+ if self.client_cert:
+ der_cert = rex_pem.findall(self.client_cert)[0].decode("base64")
+ client_certificate_list = TLSHandshake()/TLSCertificateList(
+ certificates=[TLSCertificate(data=x509.X509Cert(der_cert))])
else:
- self.tlsFail()
- return r
+ client_certificate_list = TLSHandshake()/TLSCertificateList(certificates=[])
+ client_certificate = TLSRecord(version="TLS_1_0")/client_certificate_list
+ kex_data = self.tls_ctx.get_client_kex_data()
+ client_key_ex_data = TLSHandshake()/kex_data
+ client_key_ex = TLSRecord()/client_key_ex_data
+ if self.client_cert:
+ self.load_tls_record(str(client_certificate))
+ self.pkt_history.append(str(client_certificate_list))
+ self.load_tls_record(str(client_key_ex))
+ self.pkt_history.append(str(client_key_ex_data))
+ verify_signature = self.get_verify_signature(self.client_priv_key)
+ client_cert_verify = TLSHandshake(type=TLSHandshakeType.CERTIFICATE_VERIFY)/verify_signature
+ client_cert_record = TLSRecord(content_type=TLSContentType.HANDSHAKE)/client_cert_verify
+ self.pkt_history.append(str(client_cert_verify))
+ #log.info('TLS ctxt: %s' %self.tls_ctx)
+ client_ccs = TLSRecord(version="TLS_1_0")/TLSChangeCipherSpec()
+ enc_handshake_msg = self.get_encrypted_handshake_msg()
+ handshake_msg = str(TLSRecord(content_type=TLSContentType.HANDSHAKE)/enc_handshake_msg)
+ reqdata = str(TLS.from_records([client_certificate, client_key_ex, client_cert_record, client_ccs]))
+ reqdata += handshake_msg
+ log.info("------> Sending Client Hello TLS Certificate payload of len %d ----------->" %len(reqdata))
+ status = self.eapFragmentSend(EAP_RESPONSE, self.server_hello_done_eap_id, TLS_LENGTH_INCLUDED,
+ payload = reqdata, fragsize = 1024)
+ assert_equal(status, True)
+ self.nextEvent = self.tlsEventTable.EVT_EAP_TLS_CHANGE_CIPHER_SPEC
def _eapTlsChangeCipherSpec(self):
def eapol_cb(pkt):
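Review bot: The receive loop at the top of _eapTlsCertReq has no upper bound: as long as eapol_scapy_recv keeps returning EAP-TLS requests without `server_hello_done_received` ever becoming true, the loop spins forever, and the only exit is an empty receive followed by tlsFail. Bounding it with a fragment cap that fails the handshake when exhausted keeps the test from hanging; a sketch is below, with the cap value a constructed placeholder to tune to the largest server chain expected. Separately, the old success path returned `r` after setting nextEvent, whereas the new body falls off the end and returns None, so any caller that inspected the return value now sees a difference — worth confirming against the event dispatcher. Finally, the log text 'Sending client certificate request' describes what is actually the client Certificate/ClientKeyExchange/CertificateVerify/CCS flight; `log.info('Sending client certificate response')` reads more accurately.

```python
    log.info('Receiving server certificates')
    max_fragments = 64  # constructed bound: tune to the largest server certificate chain expected
    for _ in range(max_fragments):
        if self.server_hello_done_received:
            break
        r = self.eapol_scapy_recv(cb = self.eapol_server_hello_cb,
                                  lfilter =
                                  lambda pkt: EAP in pkt and pkt[EAP].type == EAP_TYPE_TLS and \
                                  pkt[EAP].code == EAP.REQUEST)
        if len(r) == 0:
            self.tlsFail()
            return r
    else:
        log.info('Server hello done not received after %d fragments' %max_fragments)
        self.tlsFail()
        return r
    # … unchanged: build and send client certificate / key exchange / CCS flight …
```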