Introduced an EAPOL receive function that uses scapy sniffing with lambda packet filters.
Changed the TLS/PAP tests to use the new scapy-based EAPOL receive function.
diff --git a/src/test/utils/EapPAP.py b/src/test/utils/EapPAP.py
index 48b75a8..a128f05 100644
--- a/src/test/utils/EapPAP.py
+++ b/src/test/utils/EapPAP.py
@@ -4,11 +4,12 @@
import nosePAPAuthHolder as PAPAuthHolder
from socket import *
from struct import *
-import scapy
+from scapy.all import *
from nose.tools import *
from CordTestBase import CordTester
PAP_USER = "raduser"
PAP_PASSWD = "radpass"
+log.setLevel('INFO')
class PAPAuthTest(EapolPacket, CordTester):
@@ -49,37 +50,37 @@
self.nextEvent = self.PAPEventTable.EVT_EAP_ID_REQ
def _eapIdReq(self):
- print 'Inside EAP ID Req'
- p = self.eapol_recv()
- code, pkt_id, eaplen = unpack("!BBH", p[0:4])
- print "Code %d, id %d, len %d" %(code, pkt_id, eaplen)
- assert_equal(code, EAP_REQUEST)
- reqtype = unpack("!B", p[4:5])[0]
- reqdata = p[5:4+eaplen]
- assert_equal(reqtype, EAP_TYPE_ID)
- print "<====== Send EAP Response with identity = %s ================>" % PAP_USER
- self.eapol_id_req(pkt_id, PAP_USER)
+ log.info( 'Inside EAP ID Req' )
+ def eapol_cb(pkt):
+ log.info('Got EAPOL packet with type id and code request')
+ log.info('Packet code: %d, type: %d, id: %s', pkt[EAP].code, pkt[EAP].type, pkt[EAP].id)
+ log.info("<====== Send EAP Response with identity = %s ================>" % PAP_USER)
+ self.eapol_id_req(pkt[EAP].id, PAP_USER)
+
+ self.eapol_scapy_recv(cb = eapol_cb,
+ lfilter = lambda pkt: pkt[EAP].type == EAP.TYPE_ID and pkt[EAP].code == EAP.REQUEST)
self.nextEvent = self.PAPEventTable.EVT_EAP_PAP_USER_REQ
def _eapPAPUserReq(self):
- print 'UserReq Inside Challenge'
- p = self.eapol_recv()
- code, pkt_id, eaplen = unpack("!BBH", p[0:4])
- print "Code %d, id %d, len %d" %(code, pkt_id, eaplen)
- assert_equal(code, EAP_REQUEST)
- reqtype = unpack("!B", p[4:5])[0]
- reqdata = p[5:4+eaplen]
- assert_equal(reqtype, EAP_TYPE_TLS)
- print "<====== Send EAP Response with Password = %s ================>" % PAP_PASSWD
- self.eapol_id_req(pkt_id, PAP_PASSWD)
+ log.info('UserReq Inside Challenge')
+ def eapol_cb(pkt):
+ log.info('Got EAPOL packet with type id and code request')
+ log.info('Packet code: %d, id: %s', pkt[EAP].code, pkt[EAP].id)
+ log.info('Send EAP Response for id %s with Password = %s' %(pkt[EAP].id, PAP_PASSWD) )
+ self.eapol_id_req(pkt[EAP].id, PAP_PASSWD)
+
+ self.eapol_scapy_recv(cb = eapol_cb,
+ lfilter = lambda pkt: pkt[EAP].type == EAP_TYPE_TLS and pkt[EAP].code == EAP.REQUEST)
#self.nextEvent = self.PAPEventTable.EVT_EAP_PAP_PASSWD_REQ
- self.nextEvent = self.PAPEventTable.EVT_EAP_PAP_DONE
-
+ self.nextEvent = None
+
def _eapPAPPassReq(self):
- print 'PassReq Inside Challenge'
- p = self.eapol_recv()
- code, pkt_id, eaplen = unpack("!BBH", p[0:4])
- print "Code %d, id %d, len %d" %(code, pkt_id, eaplen)
- assert_equal(code, EAP_SUCCESS)
+ log.info('PassReq Inside Challenge')
+ def eapol_cb(pkt):
+ log.info('Got EAPOL packet with type id and code request')
+ log.info('Packet code: %d, type: %d', pkt[EAP].code, pkt[EAP].type)
+
+ self.eapol_scapy_recv(cb = eapol_cb,
+ lfilter = lambda pkt: pkt[EAP].code == EAP.SUCCESS)
self.nextEvent = self.PAPEventTable.EVT_EAP_PAP_DONE