blob: e9094c8d6351da69829fad936a932ca416202b5f [file] [log] [blame]
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Copyright 2016-present Ciena Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
#### Authentication parameters
from scapy.all import *
from scapy_ssl_tls.ssl_tls import *
from socket import *
from struct import *
import os
import sys
import binascii
import shutil
from nose.tools import assert_equal, assert_not_equal, assert_raises, assert_true
from CordTestUtils import log_test
USER = "raduser"
PASS = "radpass"
WRONG_USER = "XXXX"
WRONG_PASS = "XXXX"
NO_USER = ""
NO_PASS = ""
DEV = "tap0"
ETHERTYPE_PAE = 0x888e
PAE_GROUP_ADDR = "\xff\xff\xff\xff\xff\xff"
EAPOL_VERSION = 1
EAPOL_EAPPACKET = 0
EAPOL_START = 1
EAPOL_LOGOFF = 2
EAPOL_KEY = 3
EAPOL_ASF = 4
EAP_REQUEST = 1
EAP_RESPONSE = 2
EAP_SUCCESS = 3
EAP_FAILURE = 4
EAP_TYPE_ID = 1
EAP_TYPE_MD5 = 4
EAP_TYPE_MSCHAP = 26
EAP_TYPE_TLS = 13
cCertMsg = '\x0b\x00\x00\x03\x00\x00\x00'
TLS_LENGTH_INCLUDED = 0x80
TLS_MORE_FRAGMENTS = 0x40
RADIUS_USER_MAC_START = (0x02 << 40) | (0x03 << 32) | (0x04 << 24) | 1
RADIUS_USER_MAC_END = (0x02 << 40) | (0x03 << 32) | (0x04 << 24) | (0xff << 16) | ( 0xff << 8 ) | 0xff
class EapolPacket(object):
src_mac_map = { 'bcast': 'ff:ff:ff:ff:ff:ff',
'mcast': '01:80:C2:00:00:03',
'zeros': '00:00:00:00:00:00',
'default': None
}
def __init__(self, intf = 'veth0'):
self.intf = intf
self.s = None
self.max_recv_size = 1600
def setup(self, src_mac = 'default'):
self.s = socket(AF_PACKET, SOCK_RAW, htons(ETHERTYPE_PAE))
self.s.bind((self.intf, ETHERTYPE_PAE))
self.mymac = self.s.getsockname()[4]
mac = None
mac_str = None
if src_mac == 'random':
mac = RandMAC()._fix()
elif src_mac in self.src_mac_map:
mac = self.src_mac_map[src_mac]
if mac is None:
mac = self.mymac
mac_str = binascii.hexlify(mac)
if mac_str is None:
mac_str = mac
self.llheader = Ether(dst = PAE_GROUP_ADDR, src = mac, type = ETHERTYPE_PAE)
log_test.info('llheader packet is %s'%self.llheader.show())
log_test.info('source mac of packet is %s'%mac_str)
self.recv_sock = L2Socket(iface = self.intf, type = ETHERTYPE_PAE)
def cleanup(self):
if self.s is not None:
self.s.close()
self.s = None
def eapol(self, req_type, payload=""):
return EAPOL(version = EAPOL_VERSION, type = req_type)/payload
def eap(self, code, pkt_id, req_type=0, data=""):
return EAP(code = code, id = pkt_id, type = req_type)/data
def eapFragmentSend(self, code, pkt_id, flags = TLS_LENGTH_INCLUDED, payload = "", fragsize = 1024):
req_type = EAP_TYPE_TLS
if code in [ EAP_SUCCESS, EAP_FAILURE ]:
data = pack("!BBH", code, pkt_id, 4)
self.eapol_send(EAPOL_EAPPACKET, data)
return True
if len(payload) <= fragsize:
if flags & TLS_LENGTH_INCLUDED:
flags_dlen = pack("!BL", flags, len(payload))
data = pack("!BBHB", code, pkt_id, 5 + len(flags_dlen) + len(payload), req_type) \
+ flags_dlen + payload
self.eapol_send(EAPOL_EAPPACKET, data)
return True
flags_str = pack("!B", flags)
data = pack("!BBHB", code, pkt_id, 5+len(flags_str)+len(payload), req_type) + flags_str + payload
self.eapol_send(EAPOL_EAPPACKET, data)
return True
fragments = []
data = payload[:]
frag = 0
def eapol_frag_cb(pkt):
r = str(pkt)
tls_data = r[self.TLS_OFFSET:]
frag_data = fragments[frag]
##change packet id in response to match request
eap_payload = frag_data[:1] + pack("!B", pkt[EAP].id) + frag_data[2:]
self.eapol_send(EAPOL_EAPPACKET, eap_payload)
while len(data) > 0:
data_frag = data[:fragsize]
data = data[fragsize:]
if frag == 0:
##first frag, include the total length
flags_dlen = pack("!BL", TLS_LENGTH_INCLUDED | TLS_MORE_FRAGMENTS, len(payload))
fragments.append(pack("!BBHB", code, pkt_id, 5 + len(flags_dlen) + len(data_frag), req_type) \
+ flags_dlen + data_frag)
else:
if len(data) > 0:
flags = TLS_MORE_FRAGMENTS
else:
flags = 0
flags_str = pack("!B", flags)
fragments.append(pack("!BBHB", code, pkt_id, 5+len(flags_str)+len(data_frag), req_type) + \
flags_str + data_frag)
frag += 1
frag = 0
self.eapol_send(EAPOL_EAPPACKET, fragments[frag])
for frag in range(len(fragments)-1):
frag += 1
r = self.eapol_scapy_recv(cb = eapol_frag_cb,
lfilter = lambda pkt: EAP in pkt and pkt[EAP].type == EAP_TYPE_TLS and \
pkt[EAP].code == EAP.REQUEST)
return True
def eapTLS(self, code, pkt_id, flags = TLS_LENGTH_INCLUDED, data=""):
req_type = EAP_TYPE_TLS
if code in [EAP_SUCCESS, EAP_FAILURE]:
return pack("!BBH", code, pkt_id, 4)
else:
if flags & TLS_LENGTH_INCLUDED:
flags_dlen = pack("!BL", flags, len(data))
return pack("!BBHB", code, pkt_id, 5+len(flags_dlen)+len(data), req_type) + flags_dlen + data
flags_str = pack("!B", flags)
return pack("!BBHB", code, pkt_id, 5+len(flags_str)+len(data), req_type) + flags_str + data
def eapTLSFragment(self, code, pkt_id, frag, data="", data_len = 0):
req_type = EAP_TYPE_TLS
if frag == 0:
flags = TLS_LENGTH_INCLUDED | TLS_MORE_FRAGMENTS
elif frag > 0:
flags = TLS_MORE_FRAGMENTS
else:
#last fragment
flags = 0
if data_len == 0:
data_len = len(data)
if flags & TLS_LENGTH_INCLUDED:
flags_dlen = pack("!BL", flags, data_len)
return pack("!BBHB", code, pkt_id, 5+len(flags_dlen)+len(data), req_type) + flags_dlen + data
flags_str = pack("!B", flags)
return pack("!BBHB", code, pkt_id, 5+len(flags_str)+len(data), req_type) + flags_str + data
def eapol_send(self, eapol_type, eap_payload):
return sendp(self.llheader/self.eapol(eapol_type, eap_payload), iface=self.intf)
def eapol_recv(self):
p = self.s.recv(self.max_recv_size)[14:]
vers,pkt_type,eapollen = unpack("!BBH",p[:4])
print "Version %d, type %d, len %d" %(vers, pkt_type, eapollen)
assert_equal(pkt_type, EAPOL_EAPPACKET)
return p[4:]
def eapol_scapy_recv(self, cb = None, lfilter = None, count = 1, timeout = 10):
def eapol_default_cb(pkt): pass
if cb is None:
cb = eapol_default_cb
return sniff(prn = cb, lfilter = lfilter, count = count, timeout = timeout, opened_socket = self.recv_sock)
def eapol_start(self):
eap_payload = self.eap(EAPOL_START, 2)
return self.eapol_send(EAPOL_START, eap_payload)
def eapol_logoff(self):
eap_payload = self.eap(EAPOL_LOGOFF, 2)
return self.eapol_send(EAPOL_LOGOFF, eap_payload)
def eapol_id_req(self, pkt_id = 0, user = USER):
eap_payload = self.eap(EAP_RESPONSE, pkt_id, EAP_TYPE_ID, user)
return self.eapol_send(EAPOL_EAPPACKET, eap_payload)
def eap_md5_challenge_recv(self,rad_pwd):
PASS = rad_pwd
print 'Inside EAP MD5 Challenge Exchange'
p = self.s.recv(self.max_recv_size)[14:]
vers,pkt_type,eapollen = unpack("!BBH",p[:4])
print "EAPOL Version %d, type %d, len %d" %(vers, pkt_type, eapollen)
code, pkt_id, eaplen = unpack("!BBH", p[4:8])
print "EAP Code %d, id %d, len %d" %(code, pkt_id, eaplen)
assert_equal(code, EAP_REQUEST)
reqtype = unpack("!B", p[8:9])[0]
reqdata = p[9:4+eaplen]
print 'Request type is %d' %(reqtype)
assert_equal(reqtype, EAP_TYPE_MD5)
challenge=pack("!B",pkt_id)+PASS+reqdata[1:]
print "Generating md5 challenge for %s" % challenge
return (challenge,pkt_id)
def eap_Status(self):
print 'Inside EAP Status'
p = self.s.recv(self.max_recv_size)[14:]
code, id, eaplen = unpack("!BBH", p[4:8])
return code
@classmethod
def eap_invalid_tls_packets_info(self, invalid_field_name = None, invalid_field_value = None):
log_test.info( 'Changing invalid field values in tls auth packets' )
if invalid_field_name == 'eapolTlsVersion':
global EAPOL_VERSION
log_test.info( 'Changing invalid field values in tls auth packets====== version changing' )
EAPOL_VERSION = invalid_field_value
if invalid_field_name == 'eapolTlsType':
global EAP_TYPE_TLS
log_test.info( 'Changing invalid field values in tls auth packets====== EAP TYPE TLS changing' )
EAP_TYPE_TLS = invalid_field_value
if invalid_field_name == 'eapolTypeID':
global EAP_TYPE_ID
log_test.info( 'Changing invalid field values in tls auth packets====== EAP TYPE TLS changing' )
EAP_TYPE_ID = invalid_field_value
if invalid_field_name == 'eapolResponse':
global EAP_RESPONSE
log_test.info( 'Changing invalid field values in tls auth packets====== EAP TYPE TLS changing' )
EAP_RESPONSE = invalid_field_value
@classmethod
def eap_tls_packets_field_value_replace(self, invalid_field_name = None):
log_test.info( 'Changing invalid field values in tls auth packets' )
if invalid_field_name == 'eapolTlsVersion':
global EAPOL_VERSION
EAPOL_VERSION = 1
log_test.info( 'Changing invalid field values in tls auth packets====== version changing' )
if invalid_field_name == 'eapolTlsType':
global EAP_TYPE_TLS
EAP_TYPE_TLS = 13
log_test.info( 'Changing invalid field values in tls auth packets====== version changing' )
if invalid_field_name == 'eapolTypeID':
global EAP_TYPE_ID
EAP_TYPE_ID = 1
log_test.info( 'Changing invalid field values in tls auth packets====== version changing' )
if invalid_field_name == 'eapolResponse':
global EAP_RESPONSE
EAP_RESPONSE = 2
log_test.info( 'Changing invalid field values in tls auth packets====== version changing' )
def get_radius_macs(num, start = 0, end = 0):
"""Generate radius server mac addresses"""
"""Scope to generate 256*256*256 mac addresses"""
if start == 0 or end == 0:
s = (0x00 << 40) | (0x02 << 32) | ( 0x03 << 24) | (1)
e = (0x00 << 40) | (0x02 << 32) | ( 0x03 << 24) | (0xff << 16) | (0xff << 8) | (0xff)
else:
s = start
e = end
n_macs = []
for v in xrange(s, e):
mask = (v & 0xff0000) == 0xff0000 or \
(v & 0x00ff00) == 0x00ff00 or \
(v & 0x0000ff) == 0x0000ff
if mask:
continue
n_macs.append(v)
if len(n_macs) == num:
break
def n_to_mac(n):
n_tuple = ( (n >> 40) & 0xff,
(n >> 32) & 0xff,
(n >> 24) & 0xff,
(n >> 16) & 0xff,
(n >> 8) & 0xff,
n & 0xff,
)
return '%02x:%02x:%02x:%02x:%02x:%02x' %(n_tuple)
#convert the number to macs
return map(n_to_mac, n_macs)
def get_radius_networks(num):
PORT_SUBNET_START = '12.0.0.0'
PORT_SUBNET_MASK = '/24'
PORT_SUBNET_END = '220.0.0.0'
port_start_list = map(lambda ip: int(ip), PORT_SUBNET_START.split('.'))
port_end_list = map(lambda ip: int(ip), PORT_SUBNET_END.split('.'))
port_subnet_start = (port_start_list[0] << 24) | ( port_start_list[1] << 16 ) | ( port_start_list[2] << 8 ) | 0
port_subnet_end = (port_end_list[0] << 24) | ( port_end_list[1] << 16 ) | ( port_end_list[2] << 8 ) | 0
mask = int(PORT_SUBNET_MASK[1:])
net_list = []
for n in xrange(port_subnet_start, port_subnet_end, 256):
subnet = ((n >> 24) & 0xff, (n >> 16) & 0xff, (n >> 8) & 0xff, 0, mask)
prefix = subnet[:3]
gw = prefix + (1,)
subnet_s = '{}.{}.{}.{}/{}'.format(*subnet)
prefix_s = '{}.{}.{}'.format(*prefix)
gw_s = '{}.{}.{}.{}'.format(*gw)
net_list.append((prefix_s, subnet_s, gw_s))
if len(net_list) >= num:
break
return net_list
def get_radius_user_file():
cur_dir = os.path.dirname(os.path.realpath(__file__))
radius_authorize = 'setup/radius-config/freeradius/mods-config/files/authorize'
radius_user_file = os.path.join(cur_dir, '..', *radius_authorize.split('/'))
return radius_user_file
def radius_add_users(num):
global RADIUS_USER_MAC_START, RADIUS_USER_MAC_END
template = '''
%s Cleartext-Password := "radpass"
\tReply-Message := "Hello, %%{User-Name}"
'''
radius_user_file = get_radius_user_file()
if not os.access(radius_user_file, os.F_OK):
return False
mac_start = RADIUS_USER_MAC_START
mac_end = RADIUS_USER_MAC_END
macs = get_radius_macs(num, start = mac_start, end = mac_end)
save_file = '{}.save'.format(radius_user_file)
new_file = '{}.new'.format(radius_user_file)
shutil.copy(radius_user_file, save_file)
with open(radius_user_file, 'r') as f:
lines = f.readlines()
for m in macs:
lines.append(template %(m))
with open(new_file, 'w') as f:
f.writelines(lines)
os.rename(new_file, radius_user_file)
return True
def radius_restore_users():
radius_user_file = get_radius_user_file()
save_file = '{}.save'.format(radius_user_file)
if not os.access(save_file, os.F_OK):
return False
os.rename(save_file, radius_user_file)
return True