Test: New changes and test scenarios for
different net conditions triggered by corrupted
packets , burst of packets, delayed packets etc.
Change-Id: Id0aa8adfe0eb9f13458c42618d6810b2845c8ef6
diff --git a/src/test/netCondition/error_tls_auth_exhange_packets_Radius_server_packets_only.pcap b/src/test/netCondition/error_tls_auth_exhange_packets_Radius_server_packets_only.pcap
new file mode 100644
index 0000000..1407801
--- /dev/null
+++ b/src/test/netCondition/error_tls_auth_exhange_packets_Radius_server_packets_only.pcap
Binary files differ
diff --git a/src/test/netCondition/netConditionTest.py b/src/test/netCondition/netConditionTest.py
index 022cfbd..5005772 100644
--- a/src/test/netCondition/netConditionTest.py
+++ b/src/test/netCondition/netConditionTest.py
@@ -34,7 +34,6 @@
from EapTLS import TLSAuthTest
from scapy_ssl_tls.ssl_tls import *
from scapy_ssl_tls.ssl_tls_crypto import *
-log.setLevel('INFO')
from EapolAAA import *
from enum import *
import noseTlsAuthHolder as tlsAuthHolder
@@ -49,12 +48,12 @@
import re
from random import randint
from time import sleep
-
import json
from OnosFlowCtrl import OnosFlowCtrl
from OltConfig import OltConfig
from threading import current_thread
import collections
+log.setLevel('INFO')
class IGMPTestState:
@@ -168,13 +167,12 @@
''' Activate the igmp app'''
apps = self.app_igmp
self.onos_ctrl = OnosCtrl(apps)
-# self.onos_ctrl = OnosCtrl(self.app_tls)
self.onos_aaa_config()
self.onos_ctrl.activate()
self.igmp_channel = IgmpChannel()
def setUp_tls(self):
- ''' Activate the igmp app'''
+ ''' Activate the aaa app'''
apps = self.app_tls
self.onos_ctrl = OnosCtrl(apps)
self.onos_aaa_config()
@@ -185,13 +183,6 @@
for app in apps:
onos_ctrl = OnosCtrl(app)
onos_ctrl.deactivate()
-# log.info('Restarting the Radius container in the setup after running every subscriber test cases by default')
-# rest = Container('cord-radius', 'cord-test/radius',)
-# rest.restart('cord-radius','10')
-# radius = Radius()
-# radius_ip = radius.ip()
-# print('Radius server is running with IP %s' %radius_ip)
- #os.system('ifconfig '+INTF_RX_DEFAULT+' up')
def onos_load_igmp_config(self, config):
log.info('onos load config is %s'%config)
@@ -348,7 +339,7 @@
return 0
def send_igmp_join(self, groups, src_list = ['1.2.3.4'], record_type=IGMP_V3_GR_TYPE_INCLUDE,
- ip_pkt = None, iface = 'veth0', ssm_load = False, delay = 1):
+ ip_pkt = None, iface = 'veth0', ssm_load = False, delay = 1, ip_src = None):
if ssm_load is True:
self.onos_ssm_table_load(groups, src_list)
igmp = IGMPv3(type = IGMP_TYPE_V3_MEMBERSHIP_REPORT, max_resp_code=30,
@@ -358,7 +349,11 @@
gr.sources = src_list
igmp.grps.append(gr)
if ip_pkt is None:
- ip_pkt = self.igmp_eth/self.igmp_ip
+ if ip_src is None:
+ ip_pkt = self.igmp_eth/self.igmp_ip
+ else:
+ igmp_ip_src = IP(dst = self.IP_DST, src = ip_src)
+ ip_pkt = self.igmp_eth/igmp_ip_src
pkt = ip_pkt/igmp
IGMPv3.fixup(pkt)
sendp(pkt, iface=iface)
@@ -418,7 +413,6 @@
else:
log.info('Sending IGMP join for group %s and waiting for periodic query packets and printing one packet' %groups)
resp = srp1(pkt, iface=iface)
-# resp = srp1(pkt, iface=iface) if rec_queryCount else srp3(pkt, iface=iface)
resp[0].summary()
log.info('Sent IGMP join for group %s and received a query packet and printing packet' %groups)
if delay != 0:
@@ -910,7 +904,6 @@
time.sleep(300)
for thread in threads:
thread.join()
- #df.callback(0)
reactor.callLater(0, eap_tls_eapTlsHelloReq_pkt_delay, df)
return df
@@ -937,7 +930,6 @@
log.info('Authentication successful for user %d'%i)
# Client authendicating multiple times one after other and making random delay in between authendication.
for i in xrange(clients):
-# thread = threading.Thread(target = multiple_tls_random_delay)
multiple_tls_random_delay()
time.sleep(randomDelay)
df.callback(0)
@@ -954,7 +946,6 @@
clients = 10
def eap_tls_eapTlsHelloReq_pkt_delay(df):
def multiple_tls_random_delay():
- #randomDelay = 3
for x in xrange(clients):
tls.append(TLSAuthTest(src_mac = 'random'))
for x in xrange(clients):
@@ -988,7 +979,6 @@
def test_netCondition_with_delay_between_mac_flow_and_traffic(self):
df = defer.Deferred()
randomDelay = randint(10,300)
- #self.setUpClass_flows()
egress = 1
ingress = 2
egress_mac = '00:00:00:00:00:01'
@@ -1105,7 +1095,6 @@
sendp(pkt, count=50, iface = self.port_map[ingress])
thread.join()
assert_equal(self.success, True)
- # df.callback(0)
def creating_tcp_flow(df):
flow = OnosFlowCtrl(deviceId = self.device_id,
@@ -1215,7 +1204,6 @@
time.sleep(randomDelay_in_thread)
log.info('This is running in a thread, with igmp join sent and delay {}'.format(randomDelay_in_thread))
status = self.verify_igmp_data_traffic_in_thread(group,intf=self.V_INF1,source=source, data_pkt = data_pkt)
- #assert_equal(status, True)
log.info('Data received for group %s from source %s and status is %s '%(group,source,status))
self.igmp_threads_result.append(status)
@@ -1225,7 +1213,6 @@
thread.start()
threads.append(thread)
-# time.sleep(50)
for thread in threads:
thread.join()
@@ -1414,7 +1401,6 @@
subscribers_src_ip.append(subscriber_sourceip)
count += 1
self.onos_ssm_table_load(groups,src_list=sources,flag=True)
-
def multiple_joins_send_in_threads(group, source, subscriber_src_ip,invalid_igmp_join,data_pkt = data_pkt):
if invalid_igmp_join is None:
self.send_igmp_join(groups = [group], src_list = [source],record_type = IGMP_V3_GR_TYPE_INCLUDE,
@@ -1428,10 +1414,8 @@
time.sleep(randomDelay_in_thread)
log.info('This is running in thread with igmp join sent and delay {}'.format(randomDelay_in_thread))
status = self.verify_igmp_data_traffic_in_thread(group,intf=self.V_INF1,source=source, data_pkt = data_pkt,negative=negative_traffic)
- #assert_equal(status, True)
log.info('data received for group %s from source %s and status is %s '%(group,source,status))
self.igmp_threads_result.append(status)
-
for i in range(subscriber):
thread = threading.Thread(target = multiple_joins_send_in_threads, args = (groups[i], sources[i], subscribers_src_ip[i], invalid_joins))
if bunch_traffic == 'yes':
@@ -1444,12 +1428,9 @@
time.sleep(randint(1,2))
thread.start()
threads.append(thread)
-
-# time.sleep(250)
for thread in threads:
thread.join()
-
@deferred(TEST_TIMEOUT_DELAY+50)
def test_netCondition_with_throttle_between_multiple_igmp_joins_and_data_from_multiple_subscribers(self):
self.setUp_tls()
@@ -1473,7 +1454,7 @@
return df
@deferred(TEST_TIMEOUT_DELAY+50)
- def test_netCondition_with_invalid_igmp_type_and_multiple_igmp_joins_and_data_from_multiple_subscribers(self):
+ def test_netCondition_with_invalid_igmp_type_multiple_igmp_joins_and_data_from_multiple_subscribers(self):
self.setUp_tls()
df = defer.Deferred()
log.info('IGMP Thread status before running igmp thread %s '%(self.igmp_threads_result))
@@ -1496,7 +1477,7 @@
return df
@deferred(TEST_TIMEOUT_DELAY+50)
- def test_netCondition_with_invalid_record_type_and_multiple_igmp_joins_and_data_from_multiple_subscribers(self):
+ def test_netCondition_with_invalid_record_type_multiple_igmp_joins_and_data_from_multiple_subscribers(self):
self.setUp_tls()
df = defer.Deferred()
log.info('IGMP Thread status before running igmp thread %s '%(self.igmp_threads_result))
@@ -1548,7 +1529,7 @@
self.setUp_tls()
df = defer.Deferred()
threads = []
- clients = 1
+ clients = 100
def eap_tls_eapTlsHelloReq_pkt_delay(df):
def multiple_tls_random_delay():
randomDelay = randint(10,300)
@@ -1574,10 +1555,8 @@
time.sleep(randint(1,2))
thread.start()
threads.append(thread)
- time.sleep(300)
for thread in threads:
thread.join()
- #df.callback(0)
reactor.callLater(0, eap_tls_eapTlsHelloReq_pkt_delay, df)
return df
@@ -1586,7 +1565,7 @@
self.setUp_tls()
df = defer.Deferred()
threads = []
- clients = 1
+ clients = 100
def eap_tls_eapTlsHelloReq_pkt_delay(df):
def multiple_tls_random_delay():
randomDelay = randint(10,300)
@@ -1615,10 +1594,8 @@
time.sleep(randint(1,2))
thread.start()
threads.append(thread)
- time.sleep(300)
for thread in threads:
thread.join()
- #df.callback(0)
reactor.callLater(0, eap_tls_eapTlsHelloReq_pkt_delay, df)
return df
@@ -1627,7 +1604,7 @@
self.setUp_tls()
df = defer.Deferred()
threads = []
- clients = 1
+ clients = 100
def eap_tls_eapTlsHelloReq_pkt_delay(df):
def multiple_tls_random_delay():
randomDelay = randint(10,300)
@@ -1656,10 +1633,8 @@
time.sleep(randint(1,2))
thread.start()
threads.append(thread)
- time.sleep(300)
for thread in threads:
thread.join()
- #df.callback(0)
reactor.callLater(0, eap_tls_eapTlsHelloReq_pkt_delay, df)
return df
@@ -1668,7 +1643,7 @@
self.setUp_tls()
df = defer.Deferred()
threads = []
- clients = 1
+ clients = 100
def eap_tls_eapTlsHelloReq_pkt_delay(df):
def multiple_tls_random_delay():
randomDelay = randint(10,300)
@@ -1691,10 +1666,8 @@
time.sleep(randint(1,2))
thread.start()
threads.append(thread)
- time.sleep(300)
for thread in threads:
thread.join()
- #df.callback(0)
reactor.callLater(0, eap_tls_eapTlsHelloReq_pkt_delay, df)
return df
@@ -1703,7 +1676,7 @@
self.setUp_tls()
df = defer.Deferred()
threads = []
- clients = 1
+ clients = 100
def eap_tls_eapTlsHelloReq_pkt_delay(df):
def multiple_tls_random_delay():
randomDelay = randint(10,300)
@@ -1726,10 +1699,717 @@
time.sleep(randint(1,2))
thread.start()
threads.append(thread)
- time.sleep(300)
for thread in threads:
thread.join()
- #df.callback(0)
reactor.callLater(0, eap_tls_eapTlsHelloReq_pkt_delay, df)
return df
+ @deferred(TEST_TIMEOUT_DELAY+50)
+ def test_netCondition_in_eapol_tls_with_invalid_eapol_version_field_in_client_auth_packet(self):
+ self.setUp_tls()
+ randomDelay = randint(10,300)
+ df = defer.Deferred()
+ tls = TLSAuthTest()
+ def eap_tls_eapTlsHelloReq_pkt_delay():
+ tls._eapTlsHelloReq()
+ tls._eapTlsCertReq()
+ tls._eapTlsChangeCipherSpec()
+ tls._eapTlsFinished()
+ EapolPacket.eap_tls_packets_field_value_replace(invalid_field_name= 'eapolTlsVersion')
+ df.callback(0)
+ def eap_tls_verify(df):
+ tls._eapSetup()
+ tls.tlsEventTable.EVT_EAP_SETUP
+ tls._eapStart()
+ EapolPacket.eap_invalid_tls_packets_info(invalid_field_name= 'eapolTlsVersion', invalid_field_value= 20)
+ tls.tlsEventTable.EVT_EAP_START
+ tls._eapIdReq()
+ tls.tlsEventTable.EVT_EAP_ID_REQ
+ log.info('Holding tlsHelloReq packet for a period of random delay = {} secs'.format(randomDelay))
+ t = Timer(randomDelay, eap_tls_eapTlsHelloReq_pkt_delay)
+ t.start()
+ reactor.callLater(0, eap_tls_verify, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-150)
+ def test_netCondition_in_eapol_tls_with_invalid_eapol_tls_type_field_in_client_auth_packet(self):
+ self.setUp_tls()
+ randomDelay = randint(10,300)
+ df = defer.Deferred()
+ tls = TLSAuthTest()
+ def eap_tls_eapTlsHelloReq_pkt_delay():
+ tls._eapTlsHelloReq()
+ tls._eapTlsCertReq()
+ tls._eapTlsChangeCipherSpec()
+ tls._eapTlsFinished()
+ EapolPacket.eap_tls_packets_field_value_replace(invalid_field_name= 'eapolTlsType')
+ df.callback(0)
+ def eap_tls_verify(df):
+ tls._eapSetup()
+ tls.tlsEventTable.EVT_EAP_SETUP
+ tls._eapStart()
+ EapolPacket.eap_invalid_tls_packets_info(invalid_field_name= 'eapolTlsType', invalid_field_value= 20)
+ tls.tlsEventTable.EVT_EAP_START
+ tls._eapIdReq()
+ tls.tlsEventTable.EVT_EAP_ID_REQ
+ log.info('Holding tlsHelloReq packet for a period of random delay = {} secs'.format(randomDelay))
+ t = Timer(randomDelay, eap_tls_eapTlsHelloReq_pkt_delay)
+ t.start()
+ reactor.callLater(0, eap_tls_verify, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-150)
+ def test_netCondition_in_eapol_tls_with_invalid_eapol_type_ID_field_in_client_auth_packet(self):
+ self.setUp_tls()
+ randomDelay = randint(10,300)
+ df = defer.Deferred()
+ tls = TLSAuthTest()
+ def eap_tls_eapTlsHelloReq_pkt_delay():
+ tls._eapTlsHelloReq()
+ tls._eapTlsCertReq()
+ tls._eapTlsChangeCipherSpec()
+ tls._eapTlsFinished()
+ EapolPacket.eap_tls_packets_field_value_replace(invalid_field_name= 'eapolTypeID')
+ df.callback(0)
+ def eap_tls_verify(df):
+ tls._eapSetup()
+ tls.tlsEventTable.EVT_EAP_SETUP
+ tls._eapStart()
+ EapolPacket.eap_invalid_tls_packets_info(invalid_field_name= 'eapolTypeID', invalid_field_value= 20)
+ tls.tlsEventTable.EVT_EAP_START
+ tls._eapIdReq()
+ tls.tlsEventTable.EVT_EAP_ID_REQ
+ log.info('Holding tlsHelloReq packet for a period of random delay = {} secs'.format(randomDelay))
+ t = Timer(randomDelay, eap_tls_eapTlsHelloReq_pkt_delay)
+ t.start()
+ reactor.callLater(0, eap_tls_verify, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-150)
+ def test_netCondition_in_eapol_tls_with_invalid_eapol_response_field_in_client_auth_packet(self):
+ self.setUp_tls()
+ randomDelay = randint(10,300)
+ df = defer.Deferred()
+ tls = TLSAuthTest()
+ def eap_tls_eapTlsHelloReq_pkt_delay():
+ tls._eapTlsHelloReq()
+ tls._eapTlsCertReq()
+ tls._eapTlsChangeCipherSpec()
+ tls._eapTlsFinished()
+ EapolPacket.eap_tls_packets_field_value_replace(invalid_field_name= 'eapolResponse')
+ df.callback(0)
+ def eap_tls_verify(df):
+ tls._eapSetup()
+ tls.tlsEventTable.EVT_EAP_SETUP
+ tls._eapStart()
+ EapolPacket.eap_invalid_tls_packets_info(invalid_field_name= 'eapolResponse', invalid_field_value= 20)
+ tls.tlsEventTable.EVT_EAP_START
+ tls._eapIdReq()
+ tls.tlsEventTable.EVT_EAP_ID_REQ
+ log.info('Holding tlsHelloReq packet for a period of random delay = {} secs'.format(randomDelay))
+ t = Timer(randomDelay, eap_tls_eapTlsHelloReq_pkt_delay)
+ t.start()
+ reactor.callLater(0, eap_tls_verify, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-150)
+ def test_netCondition_in_eap_tls_with_invalid_eap_content_type_field_in_client_auth_packet(self):
+ self.setUp_tls()
+ randomDelay = randint(10,300)
+ df = defer.Deferred()
+ def tls_invalid_content_type_cb():
+ log.info('TLS authentication failed with invalid content type in TLSContentType packet')
+ tls = TLSAuthTest(fail_cb = tls_invalid_content_type_cb, invalid_content_type = 44)
+ def eap_tls_eapTlsHelloReq_pkt_delay():
+ tls._eapTlsHelloReq()
+ tls._eapTlsCertReq()
+ tls._eapTlsChangeCipherSpec()
+ tls._eapTlsFinished()
+ df.callback(0)
+ def eap_tls_verify(df):
+ tls._eapSetup()
+ tls.tlsEventTable.EVT_EAP_SETUP
+ tls._eapStart()
+ tls.tlsEventTable.EVT_EAP_START
+ tls._eapIdReq()
+ tls.tlsEventTable.EVT_EAP_ID_REQ
+ log.info('Holding tlsHelloReq packet for a period of random delay = {} secs'.format(randomDelay))
+ t = Timer(randomDelay, eap_tls_eapTlsHelloReq_pkt_delay)
+ t.start()
+ reactor.callLater(0, eap_tls_verify, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-250)
+ def test_netCondition_in_eap_tls_with_invalid_tls_version_field_in_client_auth_packet(self):
+ self.setUp_tls()
+ randomDelay = randint(10,300)
+ df = defer.Deferred()
+ def tls_invalid_eap_tls_version_in_client_auth_packet():
+ log.info('TLS authentication failed with invalid tls version field in the packet')
+ tls = TLSAuthTest(fail_cb = tls_invalid_eap_tls_version_in_client_auth_packet, version = 'TLS_2_1')
+ def eap_tls_eapTlsHelloReq_pkt_delay():
+ tls._eapTlsHelloReq()
+ tls._eapTlsCertReq()
+ tls._eapTlsChangeCipherSpec()
+ tls._eapTlsFinished()
+ df.callback(0)
+ def eap_tls_verify(df):
+ tls._eapSetup()
+ tls.tlsEventTable.EVT_EAP_SETUP
+ tls._eapStart()
+ tls.tlsEventTable.EVT_EAP_START
+ tls._eapIdReq()
+ tls.tlsEventTable.EVT_EAP_ID_REQ
+ log.info('Holding tlsHelloReq packet for a period of random delay = {} secs'.format(randomDelay))
+ t = Timer(randomDelay, eap_tls_eapTlsHelloReq_pkt_delay)
+ t.start()
+ reactor.callLater(0, eap_tls_verify, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-250)
+ def test_netCondition_in_eap_tls_with_invalid_tls_cipher_suite_field_in_client_auth_packet(self):
+ self.setUp_tls()
+ randomDelay = randint(10,300)
+ df = defer.Deferred()
+ def tls_with_invalid_tls_cipher_suite_field_in_client_auth_packet_cb():
+ log.info('TLS authentication failed with invalid tls cipher suite field in the packet')
+ tls = TLSAuthTest(fail_cb = tls_with_invalid_tls_cipher_suite_field_in_client_auth_packet_cb, cipher_suite = 'RSA_WITH_AES_512_CBC_SHA')
+ def eap_tls_eapTlsHelloReq_pkt_delay():
+ tls._eapTlsHelloReq()
+ tls._eapTlsCertReq()
+ tls._eapTlsChangeCipherSpec()
+ tls._eapTlsFinished()
+ df.callback(0)
+ def eap_tls_verify(df):
+ tls._eapSetup()
+ tls.tlsEventTable.EVT_EAP_SETUP
+ tls._eapStart()
+ tls.tlsEventTable.EVT_EAP_START
+ tls._eapIdReq()
+ tls.tlsEventTable.EVT_EAP_ID_REQ
+ log.info('Holding tlsHelloReq packet for a period of random delay = {} secs'.format(randomDelay))
+ t = Timer(randomDelay, eap_tls_eapTlsHelloReq_pkt_delay)
+ t.start()
+ reactor.callLater(0, eap_tls_verify, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-150)
+ def test_netCondition_in_eap_tls_with_id_mismatch_in_identifier_field_in_client_auth_packet(self):
+ self.setUp_tls()
+ randomDelay = randint(10,300)
+ df = defer.Deferred()
+ def tls_with_invalid_id_in_identifier_response_packet_cb():
+ log.info('TLS authentication failed with invalid id in identifier packet')
+ tls = TLSAuthTest(fail_cb = tls_with_invalid_id_in_identifier_response_packet_cb,
+ id_mismatch_in_identifier_response_packet = True)
+ def eap_tls_eapTlsHelloReq_pkt_delay():
+ tls._eapTlsHelloReq()
+ tls._eapTlsCertReq()
+ tls._eapTlsChangeCipherSpec()
+ tls._eapTlsFinished()
+ df.callback(0)
+ def eap_tls_verify(df):
+ tls._eapSetup()
+ tls.tlsEventTable.EVT_EAP_SETUP
+ tls._eapStart()
+ tls.tlsEventTable.EVT_EAP_START
+ tls._eapIdReq()
+ tls.tlsEventTable.EVT_EAP_ID_REQ
+ log.info('Holding tlsHelloReq packet for a period of random delay = {} secs'.format(randomDelay))
+ t = Timer(randomDelay, eap_tls_eapTlsHelloReq_pkt_delay)
+ t.start()
+ reactor.callLater(0, eap_tls_verify, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-150)
+ def test_netCondition_in_eap_tls_with_id_mismatch_in_client_hello_auth_packet(self):
+ self.setUp_tls()
+ randomDelay = randint(10,300)
+ df = defer.Deferred()
+ def tls_with_invalid_id_in_client_hello_packet_cb():
+ log.info('TLS authentication failed with invalid id in client hello packet')
+ tls = TLSAuthTest(fail_cb = tls_with_invalid_id_in_client_hello_packet_cb,
+ id_mismatch_in_client_hello_packet = True)
+
+ def eap_tls_eapTlsHelloReq_pkt_delay():
+ tls._eapTlsHelloReq()
+ tls._eapTlsCertReq()
+ tls._eapTlsChangeCipherSpec()
+ tls._eapTlsFinished()
+ df.callback(0)
+ def eap_tls_verify(df):
+ tls._eapSetup()
+ tls.tlsEventTable.EVT_EAP_SETUP
+ tls._eapStart()
+ tls.tlsEventTable.EVT_EAP_START
+ tls._eapIdReq()
+ tls.tlsEventTable.EVT_EAP_ID_REQ
+ log.info('Holding tlsHelloReq packet for a period of random delay = {} secs'.format(randomDelay))
+ t = Timer(randomDelay, eap_tls_eapTlsHelloReq_pkt_delay)
+ t.start()
+ reactor.callLater(0, eap_tls_verify, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-150)
+ def test_netCondition_in_eap_tls_with_invalid_client_hello_handshake_type_auth_packet(self):
+ self.setUp_tls()
+ randomDelay = randint(10,300)
+ df = defer.Deferred()
+ def tls_incorrect_handshake_type_client_hello_cb():
+ log.info('TLS authentication failed with incorrect handshake type in client hello packet')
+ tls = TLSAuthTest(fail_cb = tls_incorrect_handshake_type_client_hello_cb, invalid_client_hello_handshake_type=True)
+ def eap_tls_eapTlsHelloReq_pkt_delay():
+ tls._eapTlsHelloReq()
+ tls._eapTlsCertReq()
+ tls._eapTlsChangeCipherSpec()
+ tls._eapTlsFinished()
+ df.callback(0)
+ def eap_tls_verify(df):
+ tls._eapSetup()
+ tls.tlsEventTable.EVT_EAP_SETUP
+ tls._eapStart()
+ tls.tlsEventTable.EVT_EAP_START
+ tls._eapIdReq()
+ tls.tlsEventTable.EVT_EAP_ID_REQ
+ log.info('Holding tlsHelloReq packet for a period of random delay = {} secs'.format(randomDelay))
+ t = Timer(randomDelay, eap_tls_eapTlsHelloReq_pkt_delay)
+ t.start()
+ reactor.callLater(0, eap_tls_verify, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-150)
+ def test_netCondition_in_eap_tls_with_invalid_client_cert_req_handshake_auth_packet(self):
+ self.setUp_tls()
+ randomDelay = randint(10,300)
+ df = defer.Deferred()
+ def tls_incorrect_handshake_type_certificate_request_cb():
+ log.info('TLS authentication failed with incorrect handshake type in client certificate request packet')
+ tls = TLSAuthTest(fail_cb = tls_incorrect_handshake_type_certificate_request_cb, invalid_cert_req_handshake = True)
+ def eap_tls_eapTlsHelloReq_pkt_delay():
+ tls._eapTlsHelloReq()
+ tls._eapTlsCertReq()
+ tls._eapTlsChangeCipherSpec()
+ tls._eapTlsFinished()
+ df.callback(0)
+ def eap_tls_verify(df):
+ tls._eapSetup()
+ tls.tlsEventTable.EVT_EAP_SETUP
+ tls._eapStart()
+ tls.tlsEventTable.EVT_EAP_START
+ tls._eapIdReq()
+ tls.tlsEventTable.EVT_EAP_ID_REQ
+ log.info('Holding tlsHelloReq packet for a period of random delay = {} secs'.format(randomDelay))
+ t = Timer(randomDelay, eap_tls_eapTlsHelloReq_pkt_delay)
+ t.start()
+ reactor.callLater(0, eap_tls_verify, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-150)
+ def test_netCondition_in_eap_tls_with_invalid_client_key_ex_replacing_server_key_ex(self):
+ self.setUp_tls()
+ randomDelay = randint(10,300)
+ df = defer.Deferred()
+ def tls_clientkeyex_replace_with_serverkeyex_cb():
+ log.info('TLS authentication failed with client key exchange replaced with server key exchange')
+ tls = TLSAuthTest(fail_cb = tls_clientkeyex_replace_with_serverkeyex_cb,clientkeyex_replace_with_serverkeyex=True)
+ def eap_tls_eapTlsHelloReq_pkt_delay():
+ tls._eapTlsHelloReq()
+ tls._eapTlsCertReq()
+ tls._eapTlsChangeCipherSpec()
+ tls._eapTlsFinished()
+ df.callback(0)
+ def eap_tls_verify(df):
+ tls._eapSetup()
+ tls.tlsEventTable.EVT_EAP_SETUP
+ tls._eapStart()
+ tls.tlsEventTable.EVT_EAP_START
+ tls._eapIdReq()
+ tls.tlsEventTable.EVT_EAP_ID_REQ
+ log.info('Holding tlsHelloReq packet for a period of random delay = {} secs'.format(randomDelay))
+ t = Timer(randomDelay, eap_tls_eapTlsHelloReq_pkt_delay)
+ t.start()
+ reactor.callLater(0, eap_tls_verify, df)
+ return df
+
+ def tcpreplay_radius_server_packets_from_pcap_file(self, pcap_file_path =None, error_pkt = None):
+ #default radius server packets in path in test/netCondition/xxx.pcap file
+ if pcap_file_path and (error_pkt == None):
+ pcap_file_path = pcap_file_path
+ elif error_pkt == None:
+ pcap_file_path = "/root/test/src/test/netCondition/tls_auth_exhange_packets_Radius_server_packets_only.pcap"
+ elif error_pkt:
+ pcap_file_path = "/root/test/src/test/netCondition/error_tls_auth_exhange_packets_Radius_server_packets_only.pcap"
+ log.info('Started replaying pcap file packets on docker0 interface using tcprelay linux command')
+ time.sleep(0.4)
+ sendp(rdpcap(pcap_file_path), iface="eth0", loop=0, inter=1)
+ time.sleep(5)
+ log.info('Replayed pcap file packets on docker0 interface')
+
+ def tcpreplay_radius_server_error_packets_from_pcap_file(self, pcap_file_path =None, error_pkt = None):
+ #default radius server packets in path in test/netCondition/xxx.pcap file
+ if pcap_file_path:
+ pcap_file_path = pcap_file_path
+ else:
+ pcap_file_path = "/root/test/src/test/netCondition/error_tls_auth_exhange_packets_Radius_server_packets_only.pcap"
+ log.info('Started replaying pcap file error packets on docker0 interface using tcprelay linux command')
+ time.sleep(0.4)
+ sendp(rdpcap(pcap_file_path), iface="eth0", loop=0, inter=1)
+ time.sleep(5)
+ return 'success'
+
+ def emulating_invalid_radius_server_packets_from_pcap_file(self, pcap_file_path =None, pkt_no = None,L2 = None, L3 =None, L4=None, no_of_radius_attribute=None):
+ #default radius server packets in path in test/netCondition/xxx.pcap file
+ random_port = 1222
+ if pcap_file_path:
+ pcap_file_path = pcap_file_path
+ else:
+ pcap_file_path = "/root/test/src/test/netCondition/tls_auth_exhange_packets_Radius_server_packets_only.pcap"
+ log.info('Started corrupting tls server packet no = {}'.format(pkt_no))
+ radius_server_pkts = rdpcap(pcap_file_path)
+ error_server_pkt = radius_server_pkts[pkt_no]
+ if pkt_no == 0:
+ if L4:
+ error_server_pkt[UDP].sport = random_port
+ error_server_pkt[UDP].dport = random_port
+
+ if no_of_radius_attribute:
+ error_server_pkt[3][2].value = '\n\xd8\xf0\xbbW\xd6$;\xd2s\xd5\xc5Ck\xd5\x01'
+ error_server_pkt[3][3].value = 'R\x1d`#R\x1cm[\xfd\xeb\xb9\xa84\xfc\xa3\xe9'
+ if pkt_no == 1:
+ if L4:
+ error_server_pkt[UDP].sport = random_port
+ error_server_pkt[UDP].dport = random_port
+
+ if no_of_radius_attribute:
+ error_server_pkt[3][2].type = 79
+ error_server_pkt[3][3].value = 'R\x1d`#R\x1cm[\xfd\xeb\xb9\xa84\xfc\xa3\xe9'
+
+ if pkt_no == 2:
+ if L4:
+ error_server_pkt[UDP].sport = random_port
+ error_server_pkt[UDP].dport = random_port
+
+ if no_of_radius_attribute:
+ error_server_pkt[3][1].type = 79
+ error_server_pkt[3][2].len = 18
+ error_server_pkt[3][2].value = 'R\x1d`#R\x1cm[\xfd\xeb\xb9\xa84\xfc\xa3\xe9'
+ error_server_pkt[3][3].len = 18
+ error_server_pkt[3][3].value = 'R\x1d`#R\x1cm[\xff\xeb\x99\xa77\xfc\xa3\xe9'
+
+ if pkt_no == 3:
+ if L4:
+ error_server_pkt[UDP].sport = random_port
+ error_server_pkt[UDP].dport = random_port
+
+ if no_of_radius_attribute:
+ error_server_pkt[3][1].type = 79
+ error_server_pkt[3][2].len = 18
+ error_server_pkt[3][2].value = 'R\x1d`#R\x1cm[\xfd\xeb\xb9\xa84\xfc\xa3\xe9'
+ error_server_pkt[3][3].len = 18
+ error_server_pkt[3][3].value = 'R\x1d`#R\x1cm[\xff\xeb\x99\xa77\xfc\xa3\xe9'
+
+ if pkt_no == 4:
+ if L4:
+ error_server_pkt[UDP].sport = random_port
+ error_server_pkt[UDP].dport = random_port
+
+ if no_of_radius_attribute:
+ error_server_pkt[3][1].type = 79
+ error_server_pkt[3][2].len = 18
+ error_server_pkt[3][2].value = 'R\x1d`#R\x1cm[\xfd\xeb\xb9\xa84\xfc\xa3\xe9'
+ error_server_pkt[3][3].len = 18
+ error_server_pkt[3][3].value = 'R\x1d`#R\x1cm[\xff\xeb\x99\xa77\xfc\xa3\xe9'
+
+ if pkt_no == 5:
+ if L4:
+ error_server_pkt[UDP].sport = random_port
+ error_server_pkt[UDP].dport = random_port
+
+ if no_of_radius_attribute:
+ error_server_pkt[3][1].type = 79
+ error_server_pkt[3][2].len = 18
+ error_server_pkt[3][2].value = 'R\x1d`#R\x1cm[\xfd\xeb\xb9\xa84\xfc\xa3\xe9'
+ error_server_pkt[3][3].len = 18
+ error_server_pkt[3][3].value = 'R\x1d`#R\x1cm[\xff\xeb\x99\xa77\xfc\xa3\xe9'
+
+ if pkt_no == 6:
+ if L4:
+ error_server_pkt[UDP].sport = random_port
+ error_server_pkt[UDP].dport = random_port
+
+ if no_of_radius_attribute:
+ error_server_pkt[3][1].type = 79
+ error_server_pkt[3][2].len = 18
+ error_server_pkt[3][2].value = 'R\x1d`#R\x1cm[\xfd\xeb\xb9\xa84\xfc\xa3\xe9'
+ error_server_pkt[3][3].len = 18
+ error_server_pkt[3][3].value = 'R\x1d`#R\x1cm[\xfd\xeb\xb9\xa84\xfc\xa3\xe9'
+ error_server_pkt[3][5].len = 18
+ error_server_pkt[3][5].value = 'R\x1d`#R\x1cm[\xff\xeb\x99\xa77\xfc\xa3\xe9'
+
+
+ error_server_pkt.show()
+ radius_server_pkts[pkt_no] = error_server_pkt
+ wrpcap("/root/test/src/test/netCondition/error_tls_auth_exhange_packets_Radius_server_packets_only.pcap", radius_server_pkts)
+ pcap_file_path = "/root/test/src/test/netCondition/error_tls_auth_exhange_packets_Radius_server_packets_only.pcap"
+
+ log.info('Done corrupting tls server packet no = {} send back filepath along with file name'.format(pkt_no))
+ return pcap_file_path
+
+ @deferred(TEST_TIMEOUT_DELAY-50)
+ def test_netCondition_in_eap_tls_with_valid_client_and_emulating_server_packets_without_radius_server_container(self):
+ self.setUp_tls()
+ df = defer.Deferred()
+ threads = []
+ threads_status = []
+ clients = 1
+ radius_image = 'cord-test/radius:candidate'
+ delay = 20
+ que = Queue.Queue()
+ def eap_tls_emulating_server_pkts(df):
+ def tls_client_packets(start):
+ time.sleep(0.2)
+ randomDelay = randint(10,300)
+ tls = TLSAuthTest(src_mac = 'random')
+ tls._eapSetup()
+ tls.tlsEventTable.EVT_EAP_SETUP
+ tls._eapStart()
+ tls.tlsEventTable.EVT_EAP_START
+ tls._eapIdReq()
+ tls.tlsEventTable.EVT_EAP_ID_REQ
+ tls._eapTlsHelloReq()
+ tls._eapTlsCertReq()
+ tls._eapTlsChangeCipherSpec()
+ tls._eapTlsFinished()
+ if tls.failTest == False:
+ log.info('Authentication successful for user')
+ return 'success'
+ else:
+ log.info('Authentication not successful for user')
+ return 'failed'
+ thread_client = threading.Thread(target=lambda q, arg1: q.put(tls_client_packets(arg1)), args=(que, 'start'))
+ thread_radius = threading.Thread(target = Container.pause_container, args = (radius_image,delay))
+ thread_tcpreplay = threading.Thread(target = self.tcpreplay_radius_server_packets_from_pcap_file)
+ threads.append(thread_radius)
+ threads.append(thread_client)
+ threads.append(thread_tcpreplay)
+ for thread in threads:
+ thread.start()
+ for thread in threads:
+ thread.join()
+ while not que.empty():
+ threads_status = que.get()
+ assert_equal(threads_status, 'success')
+ df.callback(0)
+ reactor.callLater(0, eap_tls_emulating_server_pkts, df)
+ return df
+
+
+ def eap_tls_emulating_server_pkts_negative_testing(df,msg):
+ threads = []
+ threads_status = []
+ clients = 1
+ radius_image = 'cord-test/radius:candidate'
+ delay = 20
+ que = Queue.Queue()
+ def tls_client_packets(start):
+ time.sleep(0.2)
+ randomDelay = randint(10,300)
+ def tls_invalid_server_packets_scenario_cb():
+ log.info('TLS authentication failed with {}'.format(msg))
+ tls = TLSAuthTest(fail_cb = tls_invalid_server_packets_scenario_cb, src_mac = 'random')
+ tls._eapSetup()
+ tls.tlsEventTable.EVT_EAP_SETUP
+ tls._eapStart()
+ tls.tlsEventTable.EVT_EAP_START
+ tls._eapIdReq()
+ tls.tlsEventTable.EVT_EAP_ID_REQ
+ tls._eapTlsHelloReq()
+ tls._eapTlsCertReq()
+ tls._eapTlsChangeCipherSpec()
+ tls._eapTlsFinished()
+ if tls.failTest == True:
+ log.info('Authentication not successful for user')
+ return 'failed'
+ else:
+ log.info('Authentication successful for user')
+ return 'success'
+ def tcpreplay_radius_server_error_packets_from_pcap_file(pcap_file_path =None, error_pkt = None):
+ #default radius server packets in path in test/netCondition/xxx.pcap file
+ if pcap_file_path:
+ pcap_file_path = pcap_file_path
+ else:
+ pcap_file_path = "/root/test/src/test/netCondition/error_tls_auth_exhange_packets_Radius_server_packets_only.pcap"
+
+ log.info('Started replaying pcap file error packets on docker0 interface using tcprelay linux command')
+ time.sleep(0.4)
+ sendp(rdpcap(pcap_file_path), iface="eth0", loop=0, inter=1)
+ time.sleep(5)
+ return 'success'
+ thread_client = threading.Thread(target=lambda q, arg1: q.put(tls_client_packets(arg1)), args=(que, 'start'))
+ thread_radius = threading.Thread(target = Container.pause_container, args = (radius_image,delay))
+ thread_tcpreplay = threading.Thread(target = tcpreplay_radius_server_error_packets_from_pcap_file)
+ threads.append(thread_radius)
+ threads.append(thread_client)
+ threads.append(thread_tcpreplay)
+ for thread in threads:
+ thread.start()
+ for thread in threads:
+ thread.join()
+ while not que.empty():
+ threads_status = que.get()
+ assert_equal(threads_status, 'failed')
+
+
+ @deferred(TEST_TIMEOUT_DELAY-250)
+ def test_netCondition_in_eap_tls_with_valid_client_and_dropped_server_eapid_response_packet(self):
+ self.setUp_tls()
+ df = defer.Deferred()
+ self.emulating_invalid_radius_server_packets_from_pcap_file(pkt_no = 0, L4 = True)
+ def eap_tls_emulating_server_pkts(df):
+ self.eap_tls_emulating_server_pkts_negative_testing(msg = 'dropping server eapId response')
+ df.callback(0)
+ reactor.callLater(0, eap_tls_emulating_server_pkts, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-50)
+ def test_netCondition_in_eap_tls_with_valid_client_and_invalid_server_eapid_response_packet(self):
+ self.setUp_tls()
+ df = defer.Deferred()
+ self.emulating_invalid_radius_server_packets_from_pcap_file(pkt_no = 0, no_of_radius_attribute = 1)
+ def eap_tls_emulating_server_pkts(df):
+ self.eap_tls_emulating_server_pkts_negative_testing(msg = 'invalid server eapId response')
+ df.callback(0)
+ reactor.callLater(0, eap_tls_emulating_server_pkts, df)
+ return df
+ @deferred(TEST_TIMEOUT_DELAY-50)
+ def test_netCondition_in_eap_tls_with_valid_client_and_dropped_server_hello_packet(self):
+ self.setUp_tls()
+ df = defer.Deferred()
+ self.emulating_invalid_radius_server_packets_from_pcap_file(pkt_no = 1, L4 = True)
+ def eap_tls_emulating_server_pkts(df):
+ self.eap_tls_emulating_server_pkts_negative_testing(msg = 'dropping server hello packet')
+ df.callback(0)
+ reactor.callLater(0, eap_tls_emulating_server_pkts, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-50)
+ def test_netCondition_in_eap_tls_with_valid_client_and_invalid_server_hello_packet(self):
+ self.setUp_tls()
+ df = defer.Deferred()
+ self.emulating_invalid_radius_server_packets_from_pcap_file(pkt_no = 1, no_of_radius_attribute = 1)
+ def eap_tls_emulating_server_pkts(df):
+ self.eap_tls_emulating_server_pkts_negative_testing(msg = 'invalid server hello packet')
+ df.callback(0)
+ reactor.callLater(0, eap_tls_emulating_server_pkts, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-50)
+ def test_netCondition_in_eap_tls_with_valid_client_and_dropped_client_certficate_access_challenge_server_packet(self):
+ self.setUp_tls()
+ df = defer.Deferred()
+ self.emulating_invalid_radius_server_packets_from_pcap_file(pkt_no = 2, L4 = True)
+ def eap_tls_emulating_server_pkts(df):
+ self.eap_tls_emulating_server_pkts_negative_testing(msg = 'dropping client certificate access challenge packet')
+ df.callback(0)
+ reactor.callLater(0, eap_tls_emulating_server_pkts, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-50)
+ def test_netCondition_in_eap_tls_with_valid_client_and_invalid_client_certficate_access_challenge_server_packet(self):
+ self.setUp_tls()
+ df = defer.Deferred()
+ self.emulating_invalid_radius_server_packets_from_pcap_file(pkt_no = 2, no_of_radius_attribute = 1)
+ def eap_tls_emulating_server_pkts(df):
+ self.eap_tls_emulating_server_pkts_negative_testing(msg = 'invalid client certificate access challenge packet')
+ df.callback(0)
+ reactor.callLater(0, eap_tls_emulating_server_pkts, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-50)
+ def test_netCondition_in_eap_tls_with_valid_client_and_dropped_client_certficate_with_2nd_fragment_access_challenge_server_packet(self):
+ self.setUp_tls()
+ df = defer.Deferred()
+ self.emulating_invalid_radius_server_packets_from_pcap_file(pkt_no = 3, L4 = True)
+ def eap_tls_emulating_server_pkts(df):
+ self.eap_tls_emulating_server_pkts_negative_testing(msg = 'dropping client certificate with 2nd fragment access challenge packet')
+ df.callback(0)
+ reactor.callLater(0, eap_tls_emulating_server_pkts, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-50)
+ def test_netCondition_in_eap_tls_with_valid_client_and_invalid_client_certficate_with_2nd_fragment_access_challenge_server_packet(self):
+ self.setUp_tls()
+ df = defer.Deferred()
+ self.emulating_invalid_radius_server_packets_from_pcap_file(pkt_no = 3, no_of_radius_attribute = 1)
+ def eap_tls_emulating_server_pkts(df):
+ self.eap_tls_emulating_server_pkts_negative_testing(msg = 'invalid client certificate for 2nd fragment access challenge packet')
+ df.callback(0)
+ reactor.callLater(0, eap_tls_emulating_server_pkts, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-50)
+ def test_netCondition_in_eap_tls_with_valid_client_and_dropped_client_certficate_with_3rd_fragment_access_challenge_server_packet(self):
+ self.setUp_tls()
+ df = defer.Deferred()
+ self.emulating_invalid_radius_server_packets_from_pcap_file(pkt_no = 4, L4 = True)
+ def eap_tls_emulating_server_pkts(df):
+ self.eap_tls_emulating_server_pkts_negative_testing(msg = 'dropping client certificate for 3rd fragment access challenge packet')
+ df.callback(0)
+ reactor.callLater(0, eap_tls_emulating_server_pkts, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-50)
+ def test_netCondition_in_eap_tls_with_valid_client_and_invalid_client_certficate_with_3rd_fragment_access_challenge_server_packet(self):
+ self.setUp_tls()
+ df = defer.Deferred()
+ self.emulating_invalid_radius_server_packets_from_pcap_file(pkt_no = 4, no_of_radius_attribute = 1)
+ def eap_tls_emulating_server_pkts(df):
+ self.eap_tls_emulating_server_pkts_negative_testing(msg = 'invalid client certificate for 3rd fragment access challenge packet')
+ df.callback(0)
+ reactor.callLater(0, eap_tls_emulating_server_pkts, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-50)
+ def test_netCondition_in_eap_tls_with_valid_client_and_dropped_cipher_suite_request_server_packet(self):
+ self.setUp_tls()
+ df = defer.Deferred()
+ self.emulating_invalid_radius_server_packets_from_pcap_file(pkt_no = 5, L4 = True)
+ def eap_tls_emulating_server_pkts(df):
+ self.eap_tls_emulating_server_pkts_negative_testing(msg = 'dropping cipher suite request server packet')
+ df.callback(0)
+ reactor.callLater(0, eap_tls_emulating_server_pkts, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-50)
+ def test_netCondition_in_eap_tls_with_valid_client_and_invalid_cipher_suite_request_server_packet(self):
+ self.setUp_tls()
+ df = defer.Deferred()
+ self.emulating_invalid_radius_server_packets_from_pcap_file(pkt_no = 5, no_of_radius_attribute = 1)
+ def eap_tls_emulating_server_pkts(df):
+ self.eap_tls_emulating_server_pkts_negative_testing(msg = 'invalid cipher suite request server packet')
+ df.callback(0)
+ reactor.callLater(0, eap_tls_emulating_server_pkts, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-50)
+ def test_netCondition_in_eap_tls_with_valid_client_and_dropped_access_accept_server_packet(self):
+ self.setUp_tls()
+ df = defer.Deferred()
+ self.emulating_invalid_radius_server_packets_from_pcap_file(pkt_no = 6, L4 = True)
+ def eap_tls_emulating_server_pkts(df):
+ self.eap_tls_emulating_server_pkts_negative_testing(msg = 'dropping access accept server packet ')
+ df.callback(0)
+ reactor.callLater(0, eap_tls_emulating_server_pkts, df)
+ return df
+
+ @deferred(TEST_TIMEOUT_DELAY-50)
+ def test_netCondition_in_eap_tls_with_valid_client_and_invalid_access_accept_server_packet(self):
+ self.setUp_tls()
+ df = defer.Deferred()
+ self.emulating_invalid_radius_server_packets_from_pcap_file(pkt_no = 6, no_of_radius_attribute = 1)
+ def eap_tls_emulating_server_pkts(df):
+ self.eap_tls_emulating_server_pkts_negative_testing(msg = 'invalid access accept server packet ')
+ df.callback(0)
+ reactor.callLater(0, eap_tls_emulating_server_pkts, df)
+ return df
+
diff --git a/src/test/netCondition/tls_auth_exhange_packets_Radius_server_packets_only.pcap b/src/test/netCondition/tls_auth_exhange_packets_Radius_server_packets_only.pcap
new file mode 100644
index 0000000..cd700d2
--- /dev/null
+++ b/src/test/netCondition/tls_auth_exhange_packets_Radius_server_packets_only.pcap
Binary files differ
diff --git a/src/test/scapy/fields.py b/src/test/scapy/fields.py
new file mode 100644
index 0000000..3398145
--- /dev/null
+++ b/src/test/scapy/fields.py
@@ -0,0 +1,1026 @@
+## This file is part of Scapy
+## See http://www.secdev.org/projects/scapy for more informations
+## Copyright (C) Philippe Biondi <phil@secdev.org>
+## This program is published under a GPLv2 license
+
+"""
+Fields: basic data structures that make up parts of packets.
+"""
+
+import struct,copy,socket
+from config import conf
+from volatile import *
+from data import *
+from utils import *
+from base_classes import BasePacket,Gen,Net
+
+
+############
+## Fields ##
+############
+
+class Field:
+ """For more informations on how this work, please refer to
+ http://www.secdev.org/projects/scapy/files/scapydoc.pdf
+ chapter ``Adding a New Field''"""
+ islist=0
+ holds_packets=0
+ def __init__(self, name, default, fmt="H"):
+ self.name = name
+ if fmt[0] in "@=<>!":
+ self.fmt = fmt
+ else:
+ self.fmt = "!"+fmt
+ self.default = self.any2i(None,default)
+ self.sz = struct.calcsize(self.fmt)
+ self.owners = []
+
+ def register_owner(self, cls):
+ self.owners.append(cls)
+
+ def i2len(self, pkt, x):
+ """Convert internal value to a length usable by a FieldLenField"""
+ return self.sz
+ def i2count(self, pkt, x):
+ """Convert internal value to a number of elements usable by a FieldLenField.
+ Always 1 except for list fields"""
+ return 1
+ def h2i(self, pkt, x):
+ """Convert human value to internal value"""
+ return x
+ def i2h(self, pkt, x):
+ """Convert internal value to human value"""
+ return x
+ def m2i(self, pkt, x):
+ """Convert machine value to internal value"""
+ return x
+ def i2m(self, pkt, x):
+ """Convert internal value to machine value"""
+ if x is None:
+ x = 0
+ return x
+ def any2i(self, pkt, x):
+ """Try to understand the most input values possible and make an internal value from them"""
+ return self.h2i(pkt, x)
+ def i2repr(self, pkt, x):
+ """Convert internal value to a nice representation"""
+ return repr(self.i2h(pkt,x))
+ def addfield(self, pkt, s, val):
+ """Add an internal value to a string"""
+ return s+struct.pack(self.fmt, self.i2m(pkt,val))
+ def getfield(self, pkt, s):
+ """Extract an internal value from a string"""
+ return s[self.sz:], self.m2i(pkt, struct.unpack(self.fmt, s[:self.sz])[0])
+ def do_copy(self, x):
+ if hasattr(x, "copy"):
+ return x.copy()
+ if type(x) is list:
+ x = x[:]
+ for i in xrange(len(x)):
+ if isinstance(x[i], BasePacket):
+ x[i] = x[i].copy()
+ return x
+ def __repr__(self):
+ return "<Field (%s).%s>" % (",".join(x.__name__ for x in self.owners),self.name)
+ def copy(self):
+ return copy.deepcopy(self)
+ def randval(self):
+ """Return a volatile object whose value is both random and suitable for this field"""
+ fmtt = self.fmt[-1]
+ if fmtt in "BHIQ":
+ return {"B":RandByte,"H":RandShort,"I":RandInt, "Q":RandLong}[fmtt]()
+ elif fmtt == "s":
+ if self.fmt[0] in "0123456789":
+ l = int(self.fmt[:-1])
+ else:
+ l = int(self.fmt[1:-1])
+ return RandBin(l)
+ else:
+ warning("no random class for [%s] (fmt=%s)." % (self.name, self.fmt))
+
+
+
+
+class Emph:
+ fld = ""
+ def __init__(self, fld):
+ self.fld = fld
+ def __getattr__(self, attr):
+ return getattr(self.fld,attr)
+ def __hash__(self):
+ return hash(self.fld)
+ def __eq__(self, other):
+ return self.fld == other
+
+
+class ActionField:
+ _fld = None
+ def __init__(self, fld, action_method, **kargs):
+ self._fld = fld
+ self._action_method = action_method
+ self._privdata = kargs
+ def any2i(self, pkt, val):
+ getattr(pkt, self._action_method)(val, self._fld, **self._privdata)
+ return getattr(self._fld, "any2i")(pkt, val)
+ def __getattr__(self, attr):
+ return getattr(self._fld,attr)
+
+
+class ConditionalField:
+ fld = None
+ def __init__(self, fld, cond):
+ self.fld = fld
+ self.cond = cond
+ def _evalcond(self,pkt):
+ return self.cond(pkt)
+
+ def getfield(self, pkt, s):
+ if self._evalcond(pkt):
+ return self.fld.getfield(pkt,s)
+ else:
+ return s,None
+
+ def addfield(self, pkt, s, val):
+ if self._evalcond(pkt):
+ return self.fld.addfield(pkt,s,val)
+ else:
+ return s
+ def __getattr__(self, attr):
+ return getattr(self.fld,attr)
+
+
+class PadField:
+ """Add bytes after the proxified field so that it ends at the specified
+ alignment from its begining"""
+ _fld = None
+ def __init__(self, fld, align, padwith=None):
+ self._fld = fld
+ self._align = align
+ self._padwith = padwith or ""
+
+ def padlen(self, flen):
+ return -flen%self._align
+
+ def getfield(self, pkt, s):
+ remain,val = self._fld.getfield(pkt,s)
+ padlen = self.padlen(len(s)-len(remain))
+ return remain[padlen:], val
+
+ def addfield(self, pkt, s, val):
+ sval = self._fld.addfield(pkt, "", val)
+ return s+sval+struct.pack("%is" % (self.padlen(len(sval))), self._padwith)
+
+ def __getattr__(self, attr):
+ return getattr(self._fld,attr)
+
+
+class MACField(Field):
+ def __init__(self, name, default):
+ Field.__init__(self, name, default, "6s")
+ def i2m(self, pkt, x):
+ if x is None:
+ return "\0\0\0\0\0\0"
+ return mac2str(x)
+ def m2i(self, pkt, x):
+ return str2mac(x)
+ def any2i(self, pkt, x):
+ if type(x) is str and len(x) is 6:
+ x = self.m2i(pkt, x)
+ return x
+ def i2repr(self, pkt, x):
+ x = self.i2h(pkt, x)
+ if self in conf.resolve:
+ x = conf.manufdb._resolve_MAC(x)
+ return x
+ def randval(self):
+ return RandMAC()
+
+
+class IPField(Field):
+ def __init__(self, name, default):
+ Field.__init__(self, name, default, "4s")
+ def h2i(self, pkt, x):
+ if type(x) is str:
+ try:
+ inet_aton(x)
+ except socket.error:
+ x = Net(x)
+ elif type(x) is list:
+ x = [self.h2i(pkt, n) for n in x]
+ return x
+ def resolve(self, x):
+ if self in conf.resolve:
+ try:
+ ret = socket.gethostbyaddr(x)[0]
+ except:
+ pass
+ else:
+ if ret:
+ return ret
+ return x
+ def i2m(self, pkt, x):
+ return inet_aton(x)
+ def m2i(self, pkt, x):
+ return inet_ntoa(x)
+ def any2i(self, pkt, x):
+ return self.h2i(pkt,x)
+ def i2repr(self, pkt, x):
+ return self.resolve(self.i2h(pkt, x))
+ def randval(self):
+ return RandIP()
+
+class SourceIPField(IPField):
+ def __init__(self, name, dstname):
+ IPField.__init__(self, name, None)
+ self.dstname = dstname
+ def i2m(self, pkt, x):
+ if x is None:
+ iff,x,gw = pkt.route()
+ if x is None:
+ x = "0.0.0.0"
+ return IPField.i2m(self, pkt, x)
+ def i2h(self, pkt, x):
+ if x is None:
+ dst=getattr(pkt,self.dstname)
+ if isinstance(dst,Gen):
+ r = map(conf.route.route, dst)
+ r.sort()
+ if r[0] != r[-1]:
+ warning("More than one possible route for %s"%repr(dst))
+ iff,x,gw = r[0]
+ else:
+ iff,x,gw = conf.route.route(dst)
+ return IPField.i2h(self, pkt, x)
+
+
+
+
+class ByteField(Field):
+ def __init__(self, name, default):
+ Field.__init__(self, name, default, "B")
+
+class XByteField(ByteField):
+ def i2repr(self, pkt, x):
+ return lhex(self.i2h(pkt, x))
+
+class OByteField(ByteField):
+ def i2repr(self, pkt, x):
+ return "%03o"%self.i2h(pkt, x)
+
+class X3BytesField(XByteField):
+ def __init__(self, name, default):
+ Field.__init__(self, name, default, "!I")
+ def addfield(self, pkt, s, val):
+ return s+struct.pack(self.fmt, self.i2m(pkt,val))[1:4]
+ def getfield(self, pkt, s):
+ return s[3:], self.m2i(pkt, struct.unpack(self.fmt, "\x00"+s[:3])[0])
+
+class ThreeBytesField(X3BytesField, ByteField):
+ def i2repr(self, pkt, x):
+ return ByteField.i2repr(self, pkt, x)
+
+class ShortField(Field):
+ def __init__(self, name, default):
+ Field.__init__(self, name, default, "H")
+
+class SignedShortField(Field):
+ def __init__(self, name, default):
+ Field.__init__(self, name, default, "h")
+
+class LEShortField(Field):
+ def __init__(self, name, default):
+ Field.__init__(self, name, default, "<H")
+
+class XShortField(ShortField):
+ def i2repr(self, pkt, x):
+ return lhex(self.i2h(pkt, x))
+
+
+class IntField(Field):
+ def __init__(self, name, default):
+ Field.__init__(self, name, default, "I")
+
+class SignedIntField(Field):
+ def __init__(self, name, default):
+ Field.__init__(self, name, default, "i")
+ def randval(self):
+ return RandSInt()
+
+class LEIntField(Field):
+ def __init__(self, name, default):
+ Field.__init__(self, name, default, "<I")
+
+class LESignedIntField(Field):
+ def __init__(self, name, default):
+ Field.__init__(self, name, default, "<i")
+ def randval(self):
+ return RandSInt()
+
+class XIntField(IntField):
+ def i2repr(self, pkt, x):
+ return lhex(self.i2h(pkt, x))
+
+
+class LongField(Field):
+ def __init__(self, name, default):
+ Field.__init__(self, name, default, "Q")
+
+class XLongField(LongField):
+ def i2repr(self, pkt, x):
+ return lhex(self.i2h(pkt, x))
+
+class IEEEFloatField(Field):
+ def __init__(self, name, default):
+ Field.__init__(self, name, default, "f")
+
+class IEEEDoubleField(Field):
+ def __init__(self, name, default):
+ Field.__init__(self, name, default, "d")
+
+
+class StrField(Field):
+ def __init__(self, name, default, fmt="H", remain=0):
+ Field.__init__(self,name,default,fmt)
+ self.remain = remain
+ def i2len(self, pkt, i):
+ return len(i)
+ def i2m(self, pkt, x):
+ if x is None:
+ x = ""
+ elif type(x) is not str:
+ x=str(x)
+ return x
+ def addfield(self, pkt, s, val):
+ return s+self.i2m(pkt, val)
+ def getfield(self, pkt, s):
+ if self.remain == 0:
+ return "",self.m2i(pkt, s)
+ else:
+ return s[-self.remain:],self.m2i(pkt, s[:-self.remain])
+ def randval(self):
+ return RandBin(RandNum(0,1200))
+
+class PacketField(StrField):
+ holds_packets=1
+ def __init__(self, name, default, cls, remain=0):
+ StrField.__init__(self, name, default, remain=remain)
+ self.cls = cls
+ def i2m(self, pkt, i):
+ return str(i)
+ def m2i(self, pkt, m):
+ return self.cls(m)
+ def getfield(self, pkt, s):
+ i = self.m2i(pkt, s)
+ remain = ""
+ if conf.padding_layer in i:
+ r = i[conf.padding_layer]
+ del(r.underlayer.payload)
+ remain = r.load
+ return remain,i
+
+class PacketLenField(PacketField):
+ def __init__(self, name, default, cls, length_from=None):
+ PacketField.__init__(self, name, default, cls)
+ self.length_from = length_from
+ def getfield(self, pkt, s):
+ l = self.length_from(pkt)
+ try:
+ i = self.m2i(pkt, s[:l])
+ except Exception:
+ if conf.debug_dissector:
+ raise
+ i = conf.raw_layer(load=s[:l])
+ return s[l:],i
+
+
+class PacketListField(PacketField):
+ islist = 1
+ def __init__(self, name, default, cls, count_from=None, length_from=None):
+ if default is None:
+ default = [] # Create a new list for each instance
+ PacketField.__init__(self, name, default, cls)
+ self.count_from = count_from
+ self.length_from = length_from
+
+
+ def any2i(self, pkt, x):
+ if type(x) is not list:
+ return [x]
+ else:
+ return x
+ def i2count(self, pkt, val):
+ if type(val) is list:
+ return len(val)
+ return 1
+ def i2len(self, pkt, val):
+ return sum( len(p) for p in val )
+ def do_copy(self, x):
+ if x is None:
+ return None
+ else:
+ return [p if isinstance(p, basestring) else p.copy() for p in x]
+ def getfield(self, pkt, s):
+ c = l = None
+ if self.length_from is not None:
+ l = self.length_from(pkt)
+ elif self.count_from is not None:
+ c = self.count_from(pkt)
+
+ lst = []
+ ret = ""
+ remain = s
+ if l is not None:
+ remain,ret = s[:l],s[l:]
+ while remain:
+ if c is not None:
+ if c <= 0:
+ break
+ c -= 1
+ try:
+ p = self.m2i(pkt,remain)
+ except Exception:
+ if conf.debug_dissector:
+ raise
+ p = conf.raw_layer(load=remain)
+ remain = ""
+ else:
+ if conf.padding_layer in p:
+ pad = p[conf.padding_layer]
+ remain = pad.load
+ del(pad.underlayer.payload)
+ else:
+ remain = ""
+ lst.append(p)
+ return remain+ret,lst
+ def addfield(self, pkt, s, val):
+ return s+"".join(map(str, val))
+
+
+class StrFixedLenField(StrField):
+ def __init__(self, name, default, length=None, length_from=None):
+ StrField.__init__(self, name, default)
+ self.length_from = length_from
+ if length is not None:
+ self.length_from = lambda pkt,length=length: length
+ def i2repr(self, pkt, v):
+ if type(v) is str:
+ v = v.rstrip("\0")
+ return repr(v)
+ def getfield(self, pkt, s):
+ l = self.length_from(pkt)
+ return s[l:], self.m2i(pkt,s[:l])
+ def addfield(self, pkt, s, val):
+ l = self.length_from(pkt)
+ return s+struct.pack("%is"%l,self.i2m(pkt, val))
+ def randval(self):
+ try:
+ l = self.length_from(None)
+ except:
+ l = RandNum(0,200)
+ return RandBin(l)
+
+class StrFixedLenEnumField(StrFixedLenField):
+ def __init__(self, name, default, length=None, enum=None, length_from=None):
+ StrFixedLenField.__init__(self, name, default, length=length, length_from=length_from)
+ self.enum = enum
+ def i2repr(self, pkt, v):
+ r = v.rstrip("\0")
+ rr = repr(r)
+ if v in self.enum:
+ rr = "%s (%s)" % (rr, self.enum[v])
+ elif r in self.enum:
+ rr = "%s (%s)" % (rr, self.enum[r])
+ return rr
+
+class NetBIOSNameField(StrFixedLenField):
+ def __init__(self, name, default, length=31):
+ StrFixedLenField.__init__(self, name, default, length)
+ def i2m(self, pkt, x):
+ l = self.length_from(pkt)/2
+ if x is None:
+ x = ""
+ x += " "*(l)
+ x = x[:l]
+ x = "".join(map(lambda x: chr(0x41+(ord(x)>>4))+chr(0x41+(ord(x)&0xf)), x))
+ x = " "+x
+ return x
+ def m2i(self, pkt, x):
+ x = x.strip("\x00").strip(" ")
+ return "".join(map(lambda x,y: chr((((ord(x)-1)&0xf)<<4)+((ord(y)-1)&0xf)), x[::2],x[1::2]))
+
+class StrLenField(StrField):
+ def __init__(self, name, default, fld=None, length_from=None):
+ StrField.__init__(self, name, default)
+ self.length_from = length_from
+ def getfield(self, pkt, s):
+ l = self.length_from(pkt)
+ return s[l:], self.m2i(pkt,s[:l])
+
+class BoundStrLenField(StrLenField):
+ def __init__(self,name, default, minlen= 0, maxlen= 255, fld=None, length_from=None):
+ StrLenField.__init__(self, name, default, fld, length_from)
+ self.minlen= minlen
+ self.maxlen= maxlen
+
+ def randval(self):
+ return RandBin(RandNum(self.minlen, self.maxlen))
+
+class FieldListField(Field):
+ islist=1
+ def __init__(self, name, default, field, length_from=None, count_from=None):
+ if default is None:
+ default = [] # Create a new list for each instance
+ self.field = field
+ Field.__init__(self, name, default)
+ self.count_from = count_from
+ self.length_from = length_from
+
+ def i2count(self, pkt, val):
+ if type(val) is list:
+ return len(val)
+ return 1
+ def i2len(self, pkt, val):
+ return sum( self.field.i2len(pkt,v) for v in val )
+
+ def i2m(self, pkt, val):
+ if val is None:
+ val = []
+ return val
+ def any2i(self, pkt, x):
+ if type(x) is not list:
+ return [self.field.any2i(pkt, x)]
+ else:
+ return map(lambda e, pkt=pkt: self.field.any2i(pkt, e), x)
+ def i2repr(self, pkt, x):
+ return map(lambda e, pkt=pkt: self.field.i2repr(pkt,e), x)
+ def addfield(self, pkt, s, val):
+ val = self.i2m(pkt, val)
+ for v in val:
+ s = self.field.addfield(pkt, s, v)
+ return s
+ def getfield(self, pkt, s):
+ c = l = None
+ if self.length_from is not None:
+ l = self.length_from(pkt)
+ elif self.count_from is not None:
+ c = self.count_from(pkt)
+
+ val = []
+ ret=""
+ if l is not None:
+ s,ret = s[:l],s[l:]
+
+ while s:
+ if c is not None:
+ if c <= 0:
+ break
+ c -= 1
+ s,v = self.field.getfield(pkt, s)
+ val.append(v)
+ return s+ret, val
+
+class FieldLenField(Field):
+ def __init__(self, name, default, length_of=None, fmt = "H", count_of=None, adjust=lambda pkt,x:x, fld=None):
+ Field.__init__(self, name, default, fmt)
+ self.length_of=length_of
+ self.count_of=count_of
+ self.adjust=adjust
+ if fld is not None:
+ FIELD_LENGTH_MANAGEMENT_DEPRECATION(self.__class__.__name__)
+ self.length_of = fld
+ def i2m(self, pkt, x):
+ if x is None:
+ if self.length_of is not None:
+ fld,fval = pkt.getfield_and_val(self.length_of)
+ f = fld.i2len(pkt, fval)
+ else:
+ fld,fval = pkt.getfield_and_val(self.count_of)
+ f = fld.i2count(pkt, fval)
+ x = self.adjust(pkt,f)
+ return x
+
+class StrNullField(StrField):
+ def addfield(self, pkt, s, val):
+ return s+self.i2m(pkt, val)+"\x00"
+ def getfield(self, pkt, s):
+ l = s.find("\x00")
+ if l < 0:
+ #XXX \x00 not found
+ return "",s
+ return s[l+1:],self.m2i(pkt, s[:l])
+ def randval(self):
+ return RandTermString(RandNum(0,1200),"\x00")
+
+class StrStopField(StrField):
+ def __init__(self, name, default, stop, additionnal=0):
+ Field.__init__(self, name, default)
+ self.stop=stop
+ self.additionnal=additionnal
+ def getfield(self, pkt, s):
+ l = s.find(self.stop)
+ if l < 0:
+ return "",s
+# raise Scapy_Exception,"StrStopField: stop value [%s] not found" %stop
+ l += len(self.stop)+self.additionnal
+ return s[l:],s[:l]
+ def randval(self):
+ return RandTermString(RandNum(0,1200),self.stop)
+
+class LenField(Field):
+ def i2m(self, pkt, x):
+ if x is None:
+ x = len(pkt.payload)
+ return x
+
+class BCDFloatField(Field):
+ def i2m(self, pkt, x):
+ return int(256*x)
+ def m2i(self, pkt, x):
+ return x/256.0
+
+class BitField(Field):
+ def __init__(self, name, default, size):
+ Field.__init__(self, name, default)
+ self.rev = size < 0
+ self.size = abs(size)
+ def reverse(self, val):
+ if self.size == 16:
+ val = socket.ntohs(val)
+ elif self.size == 32:
+ val = socket.ntohl(val)
+ return val
+
+ def addfield(self, pkt, s, val):
+ val = self.i2m(pkt, val)
+ if type(s) is tuple:
+ s,bitsdone,v = s
+ else:
+ bitsdone = 0
+ v = 0
+ if self.rev:
+ val = self.reverse(val)
+ v <<= self.size
+ v |= val & ((1L<<self.size) - 1)
+ bitsdone += self.size
+ while bitsdone >= 8:
+ bitsdone -= 8
+ s = s+struct.pack("!B", v >> bitsdone)
+ v &= (1L<<bitsdone)-1
+ if bitsdone:
+ return s,bitsdone,v
+ else:
+ return s
+ def getfield(self, pkt, s):
+ if type(s) is tuple:
+ s,bn = s
+ else:
+ bn = 0
+ # we don't want to process all the string
+ nb_bytes = (self.size+bn-1)/8 + 1
+ w = s[:nb_bytes]
+
+ # split the substring byte by byte
+ bytes = struct.unpack('!%dB' % nb_bytes , w)
+
+ b = 0L
+ for c in range(nb_bytes):
+ b |= long(bytes[c]) << (nb_bytes-c-1)*8
+
+ # get rid of high order bits
+ b &= (1L << (nb_bytes*8-bn)) - 1
+
+ # remove low order bits
+ b = b >> (nb_bytes*8 - self.size - bn)
+
+ if self.rev:
+ b = self.reverse(b)
+
+ bn += self.size
+ s = s[bn/8:]
+ bn = bn%8
+ b = self.m2i(pkt, b)
+ if bn:
+ return (s,bn),b
+ else:
+ return s,b
+ def randval(self):
+ return RandNum(0,2**self.size-1)
+
+
+class BitFieldLenField(BitField):
+ def __init__(self, name, default, size, length_of=None, count_of=None, adjust=lambda pkt,x:x):
+ BitField.__init__(self, name, default, size)
+ self.length_of=length_of
+ self.count_of=count_of
+ self.adjust=adjust
+ def i2m(self, pkt, x):
+ return FieldLenField.i2m.im_func(self, pkt, x)
+
+
+class XBitField(BitField):
+ def i2repr(self, pkt, x):
+ return lhex(self.i2h(pkt,x))
+
+
+class EnumField(Field):
+ def __init__(self, name, default, enum, fmt = "H"):
+ i2s = self.i2s = {}
+ s2i = self.s2i = {}
+ if type(enum) is list:
+ keys = xrange(len(enum))
+ else:
+ keys = enum.keys()
+ if filter(lambda x: type(x) is str, keys):
+ i2s,s2i = s2i,i2s
+ for k in keys:
+ i2s[k] = enum[k]
+ s2i[enum[k]] = k
+ Field.__init__(self, name, default, fmt)
+
+ def any2i_one(self, pkt, x):
+ if type(x) is str:
+ if (x == 'TLS_2_1') | (x == 'RSA_WITH_AES_512_CBC_SHA'):
+ x = 70
+ else:
+ x = self.s2i[x]
+ return x
+
+ def any2i_one_negative_case(self, pkt, x):
+ if type(x) is str:
+ x = 770
+ return x
+ def i2repr_one(self, pkt, x):
+ if self not in conf.noenum and not isinstance(x,VolatileValue) and x in self.i2s:
+ return self.i2s[x]
+ return repr(x)
+
+ def any2i(self, pkt, x):
+ if type(x) is list:
+ return map(lambda z,pkt=pkt:self.any2i_one(pkt,z), x)
+ else:
+ return self.any2i_one(pkt,x)
+ def i2repr(self, pkt, x):
+ if type(x) is list:
+ return map(lambda z,pkt=pkt:self.i2repr_one(pkt,z), x)
+ else:
+ return self.i2repr_one(pkt,x)
+
+#scapy_obj = EnumField()
+#scapy_obj.any2i_one = scapy_obj.any2i_one_negative_case
+
+class CharEnumField(EnumField):
+ def __init__(self, name, default, enum, fmt = "1s"):
+ EnumField.__init__(self, name, default, enum, fmt)
+ k = self.i2s.keys()
+ if k and len(k[0]) != 1:
+ self.i2s,self.s2i = self.s2i,self.i2s
+ def any2i_one(self, pkt, x):
+ if len(x) != 1:
+ x = self.s2i[x]
+ return x
+
+class BitEnumField(BitField,EnumField):
+ def __init__(self, name, default, size, enum):
+ EnumField.__init__(self, name, default, enum)
+ self.rev = size < 0
+ self.size = abs(size)
+ def any2i(self, pkt, x):
+ return EnumField.any2i(self, pkt, x)
+ def i2repr(self, pkt, x):
+ return EnumField.i2repr(self, pkt, x)
+
+class ShortEnumField(EnumField):
+ def __init__(self, name, default, enum):
+ EnumField.__init__(self, name, default, enum, "H")
+
+class LEShortEnumField(EnumField):
+ def __init__(self, name, default, enum):
+ EnumField.__init__(self, name, default, enum, "<H")
+
+class ByteEnumField(EnumField):
+ def __init__(self, name, default, enum):
+ EnumField.__init__(self, name, default, enum, "B")
+
+class IntEnumField(EnumField):
+ def __init__(self, name, default, enum):
+ EnumField.__init__(self, name, default, enum, "I")
+
+class SignedIntEnumField(EnumField):
+ def __init__(self, name, default, enum):
+ EnumField.__init__(self, name, default, enum, "i")
+ def randval(self):
+ return RandSInt()
+
+class LEIntEnumField(EnumField):
+ def __init__(self, name, default, enum):
+ EnumField.__init__(self, name, default, enum, "<I")
+
+class XShortEnumField(ShortEnumField):
+ def i2repr_one(self, pkt, x):
+ if self not in conf.noenum and not isinstance(x,VolatileValue) and x in self.i2s:
+ return self.i2s[x]
+ return lhex(x)
+
+class MultiEnumField(EnumField):
+ def __init__(self, name, default, enum, depends_on, fmt = "H"):
+
+ self.depends_on = depends_on
+ self.i2s_multi = enum
+ self.s2i_multi = {}
+ self.s2i_all = {}
+ for m in enum:
+ self.s2i_multi[m] = s2i = {}
+ for k,v in enum[m].iteritems():
+ s2i[v] = k
+ self.s2i_all[v] = k
+ Field.__init__(self, name, default, fmt)
+ def any2i_one(self, pkt, x):
+ if type (x) is str:
+ v = self.depends_on(pkt)
+ if v in self.s2i_multi:
+ s2i = self.s2i_multi[v]
+ if x in s2i:
+ return s2i[x]
+ return self.s2i_all[x]
+ return x
+ def i2repr_one(self, pkt, x):
+ v = self.depends_on(pkt)
+ if v in self.i2s_multi:
+ return self.i2s_multi[v].get(x,x)
+ return x
+
+class BitMultiEnumField(BitField,MultiEnumField):
+ def __init__(self, name, default, size, enum, depends_on):
+ MultiEnumField.__init__(self, name, default, enum)
+ self.rev = size < 0
+ self.size = abs(size)
+ def any2i(self, pkt, x):
+ return MultiEnumField.any2i(self, pkt, x)
+ def i2repr(self, pkt, x):
+ return MultiEnumField.i2repr(self, pkt, x)
+
+
+class ByteEnumKeysField(ByteEnumField):
+ """ByteEnumField that picks valid values when fuzzed. """
+ def randval(self):
+ return RandEnumKeys(self.i2s)
+
+
+class ShortEnumKeysField(ShortEnumField):
+ """ShortEnumField that picks valid values when fuzzed. """
+ def randval(self):
+ return RandEnumKeys(self.i2s)
+
+
+class IntEnumKeysField(IntEnumField):
+ """IntEnumField that picks valid values when fuzzed. """
+ def randval(self):
+ return RandEnumKeys(self.i2s)
+
+
+# Little endian long field
+class LELongField(Field):
+ def __init__(self, name, default):
+ Field.__init__(self, name, default, "<Q")
+
+# Little endian fixed length field
+class LEFieldLenField(FieldLenField):
+ def __init__(self, name, default, length_of=None, fmt = "<H", count_of=None, adjust=lambda pkt,x:x, fld=None):
+ FieldLenField.__init__(self, name, default, length_of=length_of, fmt=fmt, count_of=count_of, fld=fld, adjust=adjust)
+
+
+class FlagsField(BitField):
+ def __init__(self, name, default, size, names):
+ self.multi = type(names) is list
+ if self.multi:
+ self.names = map(lambda x:[x], names)
+ else:
+ self.names = names
+ BitField.__init__(self, name, default, size)
+ def any2i(self, pkt, x):
+ if type(x) is str:
+ if self.multi:
+ x = map(lambda y:[y], x.split("+"))
+ y = 0
+ for i in x:
+ y |= 1 << self.names.index(i)
+ x = y
+ return x
+ def i2repr(self, pkt, x):
+ if type(x) is list or type(x) is tuple:
+ return repr(x)
+ if self.multi:
+ r = []
+ else:
+ r = ""
+ i=0
+ while x:
+ if x & 1:
+ r += self.names[i]
+ i += 1
+ x >>= 1
+ if self.multi:
+ r = "+".join(r)
+ return r
+
+
+
+
+class FixedPointField(BitField):
+ def __init__(self, name, default, size, frac_bits=16):
+ self.frac_bits = frac_bits
+ BitField.__init__(self, name, default, size)
+
+ def any2i(self, pkt, val):
+ if val is None:
+ return val
+ ival = int(val)
+ fract = int( (val-ival) * 2**self.frac_bits )
+ return (ival << self.frac_bits) | fract
+
+ def i2h(self, pkt, val):
+ int_part = val >> self.frac_bits
+ frac_part = val & (1L << self.frac_bits) - 1
+ frac_part /= 2.0**self.frac_bits
+ return int_part+frac_part
+ def i2repr(self, pkt, val):
+ return self.i2h(pkt, val)
+
+
+# Base class for IPv4 and IPv6 Prefixes inspired by IPField and IP6Field.
+# Machine values are encoded in a multiple of wordbytes bytes.
+class _IPPrefixFieldBase(Field):
+ def __init__(self, name, default, wordbytes, maxbytes, aton, ntoa, length_from):
+ self.wordbytes= wordbytes
+ self.maxbytes= maxbytes
+ self.aton= aton
+ self.ntoa= ntoa
+ Field.__init__(self, name, default, "%is" % self.maxbytes)
+ self.length_from= length_from
+
+ def _numbytes(self, pfxlen):
+ wbits= self.wordbytes * 8
+ return ((pfxlen + (wbits - 1)) / wbits) * self.wordbytes
+
+ def h2i(self, pkt, x):
+ # "fc00:1::1/64" -> ("fc00:1::1", 64)
+ [pfx,pfxlen]= x.split('/')
+ self.aton(pfx) # check for validity
+ return (pfx, int(pfxlen))
+
+
+ def i2h(self, pkt, x):
+ # ("fc00:1::1", 64) -> "fc00:1::1/64"
+ (pfx,pfxlen)= x
+ return "%s/%i" % (pfx,pfxlen)
+
+ def i2m(self, pkt, x):
+ # ("fc00:1::1", 64) -> ("\xfc\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01", 64)
+ (pfx,pfxlen)= x
+ s= self.aton(pfx);
+ return (s[:self._numbytes(pfxlen)], pfxlen)
+
+ def m2i(self, pkt, x):
+ # ("\xfc\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01", 64) -> ("fc00:1::1", 64)
+ (s,pfxlen)= x
+
+ if len(s) < self.maxbytes:
+ s= s + ("\0" * (self.maxbytes - len(s)))
+ return (self.ntoa(s), pfxlen)
+
+ def any2i(self, pkt, x):
+ if x is None:
+ return (self.ntoa("\0"*self.maxbytes), 1)
+
+ return self.h2i(pkt,x)
+
+ def i2len(self, pkt, x):
+ (_,pfxlen)= x
+ return pfxlen
+
+ def addfield(self, pkt, s, val):
+ (rawpfx,pfxlen)= self.i2m(pkt,val)
+ fmt= "!%is" % self._numbytes(pfxlen)
+ return s+struct.pack(fmt, rawpfx)
+
+ def getfield(self, pkt, s):
+ pfxlen= self.length_from(pkt)
+ numbytes= self._numbytes(pfxlen)
+ fmt= "!%is" % numbytes
+ return s[numbytes:], self.m2i(pkt, (struct.unpack(fmt, s[:numbytes])[0], pfxlen))
+
+
+class IPPrefixField(_IPPrefixFieldBase):
+ def __init__(self, name, default, wordbytes=1, length_from= None):
+ _IPPrefixFieldBase.__init__(self, name, default, wordbytes, 4, inet_aton, inet_ntoa, length_from)
+
+
+class IP6PrefixField(_IPPrefixFieldBase):
+ def __init__(self, name, default, wordbytes= 1, length_from= None):
+ _IPPrefixFieldBase.__init__(self, name, default, wordbytes, 16, lambda a: inet_pton(socket.AF_INET6, a), lambda n: inet_ntop(socket.AF_INET6, n), length_from)
+
+#sriptpath_for_dynamic_import = /usr/local/lib/python2.7/dist-packages/scapy/fileds.py
+#sys.path.append(os.path.abspath(scriptpath))
+#import fields *
+
+
diff --git a/src/test/setup/cord-test.py b/src/test/setup/cord-test.py
index ae10f9d..b182ea1 100755
--- a/src/test/setup/cord-test.py
+++ b/src/test/setup/cord-test.py
@@ -280,6 +280,8 @@
def run_tests(self):
'''Run the list of tests'''
res = 0
+ print('Modifying scapy tool files before running a test: %s' %self.tests)
+ self.modify_scapy_files_for_specific_tests()
print('Running tests: %s' %self.tests)
for t in self.tests:
test = t.split(':')[0]
@@ -301,6 +303,14 @@
return res
+ def modify_scapy_files_for_specific_tests(self):
+ name = self.name
+ container_cmd_exec = Container(name = name, image = 'cord-test/nose')
+ tty = False
+ dckr = Client()
+ cmd = 'cp test/src/test/scapy/fields.py /usr/local/lib/python2.7/dist-packages/scapy/fields.py '
+ i = container_cmd_exec.execute(cmd = cmd, tty= tty, stream = True)
+
@classmethod
def list_tests(cls, tests):
print('Listing test cases')
diff --git a/src/test/utils/CordContainer.py b/src/test/utils/CordContainer.py
index 24aa6b5..3b62f68 100644
--- a/src/test/utils/CordContainer.py
+++ b/src/test/utils/CordContainer.py
@@ -155,6 +155,22 @@
self.id = ctn['Id']
return ctn
+ @classmethod
+ def pause_container(cls, image, delay):
+ cnt_list = filter(lambda c: c['Image'] == image, cls.dckr.containers(all=True))
+ for cnt in cnt_list:
+ print('Pause the container %s' %cnt['Id'])
+ if cnt.has_key('State') and cnt['State'] == 'running':
+ cls.dckr.pause(cnt['Id'])
+ if delay != 0:
+ time.sleep(delay)
+ for cnt in cnt_list:
+ print('Unpause the container %s' %cnt['Id'])
+ cls.dckr.unpause(cnt['Id'])
+ else:
+ print('Infinity time pause the container %s' %cnt['Id'])
+ return 'success'
+
def connect_to_br(self):
index = 0
with docker_netns(self.name) as pid:
@@ -191,7 +207,7 @@
ip.link('set', index=guest, state='up')
index += 1
- def execute(self, cmd, tty = True, stream = False, shell = False):
+ def execute(self, cmd, tty = True, stream = False, shell = False, detach = True):
res = 0
if type(cmd) == str:
cmds = (cmd,)
@@ -203,7 +219,7 @@
return res
for c in cmds:
i = self.dckr.exec_create(container=self.name, cmd=c, tty = tty, privileged = True)
- self.dckr.exec_start(i['Id'], stream = stream, detach=True)
+ self.dckr.exec_start(i['Id'], stream = stream, detach=detach)
result = self.dckr.exec_inspect(i['Id'])
res += 0 if result['ExitCode'] == None else result['ExitCode']
return res
diff --git a/src/test/utils/EapolAAA.py b/src/test/utils/EapolAAA.py
index 5cf79fd..d256e0b 100644
--- a/src/test/utils/EapolAAA.py
+++ b/src/test/utils/EapolAAA.py
@@ -223,3 +223,45 @@
code, id, eaplen = unpack("!BBH", p[4:8])
return code
+ @classmethod
+ def eap_invalid_tls_packets_info(self, invalid_field_name = None, invalid_field_value = None):
+ log.info( 'Changing invalid field values in tls auth packets' )
+ if invalid_field_name == 'eapolTlsVersion':
+ global EAPOL_VERSION
+ log.info( 'Changing invalid field values in tls auth packets====== version changing' )
+ EAPOL_VERSION = invalid_field_value
+ if invalid_field_name == 'eapolTlsType':
+ global EAP_TYPE_TLS
+ log.info( 'Changing invalid field values in tls auth packets====== EAP TYPE TLS changing' )
+ EAP_TYPE_TLS = invalid_field_value
+ if invalid_field_name == 'eapolTypeID':
+ global EAP_TYPE_ID
+ log.info( 'Changing invalid field values in tls auth packets====== EAP TYPE TLS changing' )
+ EAP_TYPE_ID = invalid_field_value
+ if invalid_field_name == 'eapolResponse':
+ global EAP_RESPONSE
+ log.info( 'Changing invalid field values in tls auth packets====== EAP TYPE TLS changing' )
+ EAP_RESPONSE = invalid_field_value
+
+
+ @classmethod
+ def eap_tls_packets_field_value_replace(self, invalid_field_name = None):
+ log.info( 'Changing invalid field values in tls auth packets' )
+ if invalid_field_name == 'eapolTlsVersion':
+ global EAPOL_VERSION
+ EAPOL_VERSION = 1
+ log.info( 'Changing invalid field values in tls auth packets====== version changing' )
+ if invalid_field_name == 'eapolTlsType':
+ global EAP_TYPE_TLS
+ EAP_TYPE_TLS = 13
+ log.info( 'Changing invalid field values in tls auth packets====== version changing' )
+ if invalid_field_name == 'eapolTypeID':
+ global EAP_TYPE_ID
+ EAP_TYPE_ID = 1
+ log.info( 'Changing invalid field values in tls auth packets====== version changing' )
+ if invalid_field_name == 'eapolResponse':
+ global EAP_RESPONSE
+ EAP_RESPONSE = 2
+ log.info( 'Changing invalid field values in tls auth packets====== version changing' )
+
+
diff --git a/src/test/utils/IGMP.py b/src/test/utils/IGMP.py
index c91dfdd..7e348bc 100644
--- a/src/test/utils/IGMP.py
+++ b/src/test/utils/IGMP.py
@@ -1,12 +1,9 @@
-#
# Copyright 2016-present Ciena Corporation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
-#
# http://www.apache.org/licenses/LICENSE-2.0
-#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -20,11 +17,13 @@
IGMP_TYPE_MEMBERSHIP_QUERY = 0x11
IGMP_TYPE_V3_MEMBERSHIP_REPORT = 0x22
+IGMP_TYPE_V3_MEMBERSHIP_REPORT_NEGATIVE = 0xdd
IGMP_TYPE_V1_MEMBERSHIP_REPORT = 0x12
IGMP_TYPE_V2_MEMBERSHIP_REPORT = 0x16
IGMP_TYPE_V2_LEAVE_GROUP = 0x17
IGMP_V3_GR_TYPE_INCLUDE = 0x01
+IGMP_V3_GR_TYPE_INCLUDE_NEGATIVE = 0xaa
IGMP_V3_GR_TYPE_EXCLUDE = 0x02
IGMP_V3_GR_TYPE_CHANGE_TO_INCLUDE = 0x03
IGMP_V3_GR_TYPE_CHANGE_TO_EXCLUDE = 0x04
@@ -48,6 +47,7 @@
igmp_v3_gr_types = {
IGMP_V3_GR_TYPE_INCLUDE: "Include Mode",
+ IGMP_V3_GR_TYPE_INCLUDE_NEGATIVE: "Include Mode in negative scenario",
IGMP_V3_GR_TYPE_EXCLUDE: "Exclude Mode",
IGMP_V3_GR_TYPE_CHANGE_TO_INCLUDE: "Change to Include Mode",
IGMP_V3_GR_TYPE_CHANGE_TO_EXCLUDE: "Change to Exclude Mode",
@@ -149,7 +149,7 @@
return (byte1 & 0xf0) == 0xe0
@staticmethod
- def fixup(pkt):
+ def fixup(pkt, invalid_ttl = None):
"""Fixes up the underlying IP() and Ether() headers."""
assert pkt.haslayer(IGMPv3), "This packet is not an IGMPv4 packet; cannot fix it up"
@@ -157,7 +157,10 @@
if pkt.haslayer(IP):
ip = pkt.getlayer(IP)
- ip.ttl = 1
+ if invalid_ttl is None:
+ ip.ttl = 1
+ else:
+ ip.ttl = 20
ip.proto = 2
ip.tos = 0xc0
ip.options = [IPOption_Router_Alert()]
@@ -225,7 +228,8 @@
hexdump(str(pkt))
print "after fixup:"
- IGMPv3.fixup(pkt)
+
+ IGMPv3.fixup(pkt,'no')
hexdump(str(pkt))
print "construct v3 membership report - join a single group"