TEST: [WIP] Module to test for perspective network conditions
It covers the following conditions for different CORD apps (presently IGMP and TLS):
1. Network lag.
2. Out-of-order or rearranged packets.
3. Dropped packets or duplicate packet generation.
4. Bursts or blocks of traffic.
Change-Id: I1f2c6a9c192e7caacdf86eb680f17dcf4b144573
diff --git a/src/test/utils/EapTLS.py b/src/test/utils/EapTLS.py
index f4e7346..c9cf599 100644
--- a/src/test/utils/EapTLS.py
+++ b/src/test/utils/EapTLS.py
@@ -223,7 +223,7 @@
def tlsFail(self):
##Force a failure
- log.info('entering into testFail funciton')
+ log.info('entering into testFail function')
self.nextEvent = self.tlsEventTable.EVT_EAP_TLS_FINISHED
self.nextState = self.tlsStateTable.ST_EAP_TLS_FINISHED
self.failTest = True
@@ -407,7 +407,6 @@
if len(r) == 0:
self.tlsFail()
return r
-
log.info('Sending client certificate request')
rex_pem = re.compile(r'\-+BEGIN[^\-]+\-+(.*?)\-+END[^\-]+\-+', re.DOTALL)
if self.client_cert:
@@ -456,6 +455,65 @@
assert_equal(status, True)
self.nextEvent = self.tlsEventTable.EVT_EAP_TLS_CHANGE_CIPHER_SPEC
+ def _eapTlsCertReq_delay(self):  # Build and send the client certificate payload as an EAP response. NOTE(review): no wait is visible here despite "_delay" - confirm.
+ self.server_hello_done_received = True
+ log.info('Sending client certificate request')
+ rex_pem = re.compile(r'\-+BEGIN[^\-]+\-+(.*?)\-+END[^\-]+\-+', re.DOTALL)
+ # Wrap the first PEM block of client_cert (base64-decoded to DER) in a certificate list; with no client cert the list is empty.
+ if self.client_cert:
+ der_cert = rex_pem.findall(self.client_cert)[0].decode("base64")
+ client_certificate_list = TLSHandshake()/TLSCertificateList(
+ certificates=[TLSCertificate(data=x509.X509Cert(der_cert))])
+ else:
+ client_certificate_list = TLSHandshake()/TLSCertificateList(certificates=[])
+ # Certificate record (TLS_1_0) plus a client key exchange record built from tls_ctx.get_client_kex_data().
+ client_certificate = TLSRecord(version="TLS_1_0")/client_certificate_list
+ kex_data = self.tls_ctx.get_client_kex_data()
+ client_key_ex_data = TLSHandshake()/kex_data
+ client_key_ex = TLSRecord()/client_key_ex_data
+ # The certificate is loaded and appended to pkt_history only when a client cert is configured.
+ if self.client_cert:
+ self.load_tls_record(str(client_certificate))
+ self.pkt_history.append(str(client_certificate_list))
+ # The key exchange is always loaded and recorded; the verify signature is requested only after that.
+ self.load_tls_record(str(client_key_ex))
+ self.pkt_history.append(str(client_key_ex_data))
+ verify_signature = self.get_verify_signature(self.client_priv_key)
+ # Negative-test switch: tag the signature as CERTIFICATE_REQUEST instead of CERTIFICATE_VERIFY.
+ if self.invalid_cert_req_handshake:
+ log.info("Sending 'certificate-request' type of handshake message instead of 'certificate-verify' type")
+ client_cert_verify = TLSHandshake(type=TLSHandshakeType.CERTIFICATE_REQUEST)/verify_signature
+ else:
+ client_cert_verify = TLSHandshake(type=TLSHandshakeType.CERTIFICATE_VERIFY)/verify_signature
+ # Negative-test switch: carry the verify message in an ALERT record instead of a HANDSHAKE record.
+ if self.incorrect_tlsrecord_type_cert_req:
+ log.info("Sending TLS Record type as ALERT instead of HANDSHAKE in certificate request packet")
+ client_cert_record = TLSRecord(content_type=TLSContentType.ALERT)/client_cert_verify
+ else:
+ client_cert_record = TLSRecord(content_type=TLSContentType.HANDSHAKE)/client_cert_verify
+ # The verify handshake is appended to pkt_history before the change-cipher-spec and encrypted handshake message are built.
+ self.pkt_history.append(str(client_cert_verify))
+ #log.info('TLS ctxt: %s' %self.tls_ctx)
+ client_ccs = TLSRecord(version="TLS_1_0")/TLSChangeCipherSpec()
+ enc_handshake_msg = self.get_encrypted_handshake_msg()
+ # A truthy invalid_content_type replaces HANDSHAKE as the content type of the encrypted handshake record.
+ if self.invalid_content_type:
+ handshake_msg = str(TLSRecord(content_type=self.invalid_content_type)/enc_handshake_msg)
+ else:
+ handshake_msg = str(TLSRecord(content_type=TLSContentType.HANDSHAKE)/enc_handshake_msg)
+ reqdata = str(TLS.from_records([client_certificate, client_key_ex, client_cert_record, client_ccs]))
+ reqdata += handshake_msg
+ log.info("------> Sending Client Hello TLS Certificate payload of len %d ----------->" %len(reqdata))
+ # dont_send_client_certificate suppresses the send; otherwise the payload is sent via eapFragmentSend (fragsize 1024) and must return True.
+ if self.dont_send_client_certificate:
+ log.info('\nSkipping sending client certificate part')
+ pass
+ else:
+ status = self.eapFragmentSend(EAP_RESPONSE, self.server_hello_done_eap_id, TLS_LENGTH_INCLUDED,
+ payload = reqdata, fragsize = 1024)
+ assert_equal(status, True)
+ self.nextEvent = self.tlsEventTable.EVT_EAP_TLS_CHANGE_CIPHER_SPEC
+
def _eapTlsChangeCipherSpec(self):
def eapol_cb(pkt):
r = str(pkt)