A R Karthicka2e53d62016-02-19 17:38:30 -08001import sys, os
A R Karthicka2e53d62016-02-19 17:38:30 -08002from EapolAAA import *
3from enum import *
4import noseTlsAuthHolder as tlsAuthHolder
5from scapy_ssl_tls.ssl_tls import *
6from socket import *
7from struct import *
8import scapy
9from nose.tools import *
10from CordTestBase import CordTester
Chetan Gaonker4a25e2b2016-03-04 14:45:15 -080011import re
# Module-level verbosity for this test. `log` arrives through one of the star
# imports above (presumably a logging.Logger from EapolAAA/CordTestBase — confirm).
log.setLevel('DEBUG')
A R Karthicka2e53d62016-02-19 17:38:30 -080013class TLSAuthTest(EapolPacket, CordTester):
14
    # Supplicant-side EAP-TLS state machine, one state per round of the
    # scripted exchange, in protocol order. `Enumeration` comes from a star
    # import above; the names become attributes (e.g. tlsStateTable.ST_EAP_SETUP).
    tlsStateTable = Enumeration("TLSStateTable", ("ST_EAP_SETUP",
                                                  "ST_EAP_START",
                                                  "ST_EAP_ID_REQ",
                                                  "ST_EAP_TLS_HELLO_REQ",
                                                  "ST_EAP_TLS_CERT_REQ",
                                                  "ST_EAP_TLS_CHANGE_CIPHER_SPEC",
                                                  "ST_EAP_TLS_FINISHED",
                                                  "ST_EAP_TLS_DONE"
                                                  )
                                )
    # Events that drive the state machine; each _eap* handler stores the event
    # it expects next in self.nextEvent (None once the exchange is over).
    tlsEventTable = Enumeration("TLSEventTable", ("EVT_EAP_SETUP",
                                                  "EVT_EAP_START",
                                                  "EVT_EAP_ID_REQ",
                                                  "EVT_EAP_TLS_HELLO_REQ",
                                                  "EVT_EAP_TLS_CERT_REQ",
                                                  "EVT_EAP_TLS_CHANGE_CIPHER_SPEC",
                                                  "EVT_EAP_TLS_FINISHED",
                                                  "EVT_EAP_TLS_DONE"
                                                  )
                                )
    def __init__(self, intf = 'veth0'):
        """Set up the EAP-TLS supplicant state machine on interface *intf*.

        The FSM table is built first because CordTester.__init__ takes it as
        an argument; ST_EAP_TLS_DONE is passed along with it (presumably as
        the terminal state — confirm in CordTestBase).
        """
        self.fsmTable = tlsAuthHolder.initTlsAuthHolderFsmTable(self, self.tlsStateTable, self.tlsEventTable)
        EapolPacket.__init__(self, intf)
        CordTester.__init__(self, self.fsmTable, self.tlsStateTable.ST_EAP_TLS_DONE)
        # Begin at the setup state/event; the _eap* handlers advance nextEvent.
        self.currentState = self.tlsStateTable.ST_EAP_SETUP
        self.currentEvent = self.tlsEventTable.EVT_EAP_SETUP
        self.nextState = None
        self.nextEvent = None
44
    def _eapSetup(self):
        """Handle EVT_EAP_SETUP: run the base-class setup() and queue EVT_EAP_START."""
        events = self.tlsEventTable
        self.setup()
        self.nextEvent = events.EVT_EAP_START
48
    def _eapStart(self):
        """Handle EVT_EAP_START: send EAPOL-Start and queue EVT_EAP_ID_REQ."""
        events = self.tlsEventTable
        self.eapol_start()
        self.nextEvent = events.EVT_EAP_ID_REQ
52
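    def _eapIdReq(self):
        """Handle EVT_EAP_ID_REQ: answer the authenticator's EAP Identity request.

        Reads one EAP frame, checks it is an EAP-Request of type Identity and
        replies with the configured USER identity (EAP-Response/Identity is
        sent in the clear; that is the protocol, not a choice made here).
        Queues EVT_EAP_TLS_HELLO_REQ.

        Raises:
            AssertionError: if the frame is not an Identity request.
        """
        p = self.eapol_recv()
        # EAP header: code(1) id(1) length(2), network byte order.
        code, pkt_id, eaplen = unpack("!BBH", p[0:4])
        assert_equal(code, EAP_REQUEST)
        reqtype = unpack("!B", p[4:5])[0]
        assert_equal(reqtype, EAP_TYPE_ID)
        # The request's type-data (optional prompt text) is not needed for the
        # reply, so it is not sliced out.
        log.debug("<====== Send EAP Response with identity = %s ================>" % USER)
        self.eapol_id_req(pkt_id, USER)
        self.nextEvent = self.tlsEventTable.EVT_EAP_TLS_HELLO_REQ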
63
    def _eapTlsHelloReq(self):
        """Handle EVT_EAP_TLS_HELLO_REQ: answer the EAP-TLS start with a ClientHello.

        Reads one EAP frame, checks it is an EAP-TLS Request, then sends a
        TLS 1.0 ClientHello offering a single cipher suite wrapped in an
        EAP-Response/TLS. Queues EVT_EAP_TLS_CERT_REQ.

        Raises:
            AssertionError: if the frame is not an EAP-TLS request.
        """
        frame = self.eapol_recv()
        code, eap_id, eaplen = unpack("!BBH", frame[0:4])
        assert_equal(code, EAP_REQUEST)
        eap_type = unpack("!B", frame[4:5])[0]
        assert_equal(eap_type, EAP_TYPE_TLS)
        # gmt_unix_time and random_bytes are fixed so the hello is byte-for-byte
        # reproducible in this test; a real supplicant must use fresh randomness.
        client_hello = TLSClientHello(
            version="TLS_1_0",
            gmt_unix_time=1234,
            random_bytes="A" * 28,
            session_id='',
            compression_methods=(TLSCompressionMethod.NULL),
            cipher_suites=[TLSCipherSuite.RSA_WITH_AES_128_CBC_SHA],
        )
        reqdata = TLSRecord(version="TLS_1_0") / TLSHandshake() / client_hello
        log.debug("------> Sending Client Hello TLS payload of len %d ----------->" %len(reqdata))
        eap_payload = self.eapTLS(EAP_RESPONSE, eap_id, TLS_LENGTH_INCLUDED, str(reqdata))
        self.eapol_send(EAPOL_EAPPACKET, eap_payload)
        self.nextEvent = self.tlsEventTable.EVT_EAP_TLS_CERT_REQ
83
84 def _eapTlsCertReq(self):
A R Karthicka2e53d62016-02-19 17:38:30 -080085 p = self.eapol_recv()
Chetan Gaonker4a25e2b2016-03-04 14:45:15 -080086 code, pkt_id, eaplen = unpack("!BBH", p[0:4])
Chetan Gaonker4a25e2b2016-03-04 14:45:15 -080087 assert_equal(code, EAP_REQUEST)
88 reqtype = unpack("!B", p[4:5])[0]
89 assert_equal(reqtype, EAP_TYPE_TLS)
90 rex_pem = re.compile(r'\-+BEGIN[^\-]+\-+(.*?)\-+END[^\-]+\-+', re.DOTALL)
Chetan Gaonkerf8f77182016-03-11 15:34:57 -080091 self.pem_cert = """-----BEGIN CERTIFICATE-----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Chetan Gaonker4a25e2b2016-03-04 14:45:15 -0800113-----END CERTIFICATE-----"""
114 self.der_cert = rex_pem.findall(self.pem_cert)[0].decode("base64")
Chetan Gaonker4a25e2b2016-03-04 14:45:15 -0800115 reqdata = TLSRecord(version="TLS_1_0")/TLSHandshake()/TLSCertificateList(
116 certificates=[TLSCertificate(data=x509.X509Cert(self.der_cert))])
117 #reqdata.show()
Chetan Gaonkerf8f77182016-03-11 15:34:57 -0800118 log.info("------> Sending Client Hello TLS Certificate payload of len %d ----------->" %len(reqdata))
Chetan Gaonker4a25e2b2016-03-04 14:45:15 -0800119 eap_payload = self.eapTLS(EAP_RESPONSE, pkt_id, TLS_LENGTH_INCLUDED, str(reqdata))
120 self.eapol_send(EAPOL_EAPPACKET, eap_payload)
Chetan Gaonkerf8f77182016-03-11 15:34:57 -0800121 self.nextEvent = self.tlsEventTable.EVT_EAP_TLS_CHANGE_CIPHER_SPEC
122
    def _eapTlsChangeCipherSpec(self):
        """Handle EVT_EAP_TLS_CHANGE_CIPHER_SPEC: reply to the next EAP-TLS request.

        Reads one EAP frame, checks it is an EAP-TLS Request and answers with a
        TLSFinished carrying empty verify data. No key derivation is visible in
        this handler, so it exercises the message flow only, not a genuine
        Finished computation. Queues EVT_EAP_TLS_FINISHED.

        Raises:
            AssertionError: if the frame is not an EAP-TLS request.
        """
        frame = self.eapol_recv()
        code, eap_id, eaplen = unpack("!BBH", frame[0:4])
        assert_equal(code, EAP_REQUEST)
        eap_type = unpack("!B", frame[4:5])[0]
        assert_equal(eap_type, EAP_TYPE_TLS)
        finished = TLSFinished(data="")
        self.eapol_send(
            EAPOL_EAPPACKET,
            self.eapTLS(EAP_RESPONSE, eap_id, TLS_LENGTH_INCLUDED, str(finished)),
        )
        self.nextEvent = self.tlsEventTable.EVT_EAP_TLS_FINISHED
133
    def _eapTlsFinished(self):
        """Handle EVT_EAP_TLS_FINISHED: consume the authenticator's next request and stop.

        Reads one EAP frame, logs its header and checks it is an EAP-TLS
        Request. Nothing is sent back and nextEvent is cleared, ending the
        scripted exchange: the original note treats getting this far (the
        certificate having been accepted) as authentication success.

        NOTE(review): an EAP-Success frame is not awaited or checked here; the
        final outcome is inferred, not observed.

        Raises:
            AssertionError: if the frame is not an EAP-TLS request.
        """
        frame = self.eapol_recv()
        code, eap_id, eaplen = unpack("!BBH", frame[0:4])
        log.debug("Code %d, id %d, len %d" %(code, eap_id, eaplen))
        assert_equal(code, EAP_REQUEST)
        eap_type = unpack("!B", frame[4:5])[0]
        assert_equal(eap_type, EAP_TYPE_TLS)
        # End of the scripted exchange: no further event is queued.
        self.nextEvent = None