[VOL-3515] multithreaded packet processor

Change-Id: Icf89075447cb93a2c2d41756cbe285d8e55a1b5d
diff --git a/app/src/main/java/org/opencord/aaa/impl/AaaManager.java b/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
index 43d8021..82828fa 100644
--- a/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
+++ b/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
@@ -17,7 +17,11 @@
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
+
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
+import static org.opencord.aaa.impl.OsgiPropertyConstants.*;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import com.google.common.collect.Sets;
@@ -102,18 +106,12 @@
 import javax.crypto.spec.SecretKeySpec;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
-import static org.opencord.aaa.impl.OsgiPropertyConstants.OPERATIONAL_STATUS_SERVER_EVENT_GENERATION;
-import static org.opencord.aaa.impl.OsgiPropertyConstants.OPERATIONAL_STATUS_SERVER_EVENT_GENERATION_DEFAULT;
-import static org.opencord.aaa.impl.OsgiPropertyConstants.OPERATIONAL_STATUS_SERVER_TIMEOUT;
-import static org.opencord.aaa.impl.OsgiPropertyConstants.OPERATIONAL_STATUS_SERVER_TIMEOUT_DEFAULT;
-import static org.opencord.aaa.impl.OsgiPropertyConstants.STATUS_SERVER_MODE;
-import static org.opencord.aaa.impl.OsgiPropertyConstants.STATUS_SERVER_MODE_DEFAULT;
-
 /**
  * AAA application for ONOS.
  */
@@ -121,12 +119,14 @@
         OPERATIONAL_STATUS_SERVER_EVENT_GENERATION + ":Integer=" + OPERATIONAL_STATUS_SERVER_EVENT_GENERATION_DEFAULT,
         OPERATIONAL_STATUS_SERVER_TIMEOUT + ":Integer=" + OPERATIONAL_STATUS_SERVER_TIMEOUT_DEFAULT,
         STATUS_SERVER_MODE + ":String=" + STATUS_SERVER_MODE_DEFAULT,
+        PACKET_PROCESSOR_THREADS + ":Integer=" + PACKET_PROCESSOR_THREADS_DEFAULT,
 })
 public class AaaManager
         extends AbstractListenerManager<AuthenticationEvent, AuthenticationEventListener>
         implements AuthenticationService {
 
     private static final String APP_NAME = "org.opencord.aaa";
+    private static final int STATE_MACHINE_THREADS = 3;
 
     private final Logger log = getLogger(getClass());
 
@@ -170,6 +170,10 @@
     private int operationalStatusEventGenerationPeriodInSeconds = OPERATIONAL_STATUS_SERVER_EVENT_GENERATION_DEFAULT;
     private int operationalStatusServerTimeoutInSeconds = OPERATIONAL_STATUS_SERVER_TIMEOUT_DEFAULT;
     protected String operationalStatusEvaluationMode = STATUS_SERVER_MODE_DEFAULT;
+    /**
+     * Number of threads used to process the packet.
+     */
+    protected int packetProcessorThreads = PACKET_PROCESSOR_THREADS_DEFAULT;
 
     private IdentifierManager idManager;
 
@@ -223,7 +227,6 @@
     AaaConfig newCfg;
 
     ScheduledFuture<?> scheduledStatusServerChecker;
-    ScheduledExecutorService executor;
     String configuredAaaServerAddress;
     HashSet<Byte> outPacketSet = new HashSet<>();
     HashSet<Byte> outPacketSupp = new HashSet<>();
@@ -247,6 +250,8 @@
 
     private StateMachineDelegate delegate = new InternalStateMachineDelegate();
 
+    protected ExecutorService packetProcessorExecutor;
+    protected ScheduledExecutorService serverStatusAndStateMachineTimeoutExecutor;
     /**
      * Builds an EAPOL packet based on the given parameters.
      *
@@ -283,6 +288,7 @@
 
     @Activate
     public void activate(ComponentContext context) {
+
         idManager = new IdentifierManager();
         stateMachines = Maps.newConcurrentMap();
         appId = coreService.registerApplication(APP_NAME);
@@ -320,10 +326,12 @@
         deviceService.addListener(deviceListener);
         getConfiguredAaaServerAddress();
         radiusOperationalStatusService.initialize(nasIpAddress.getAddress(), radiusSecret, impl);
-        executor = Executors.newScheduledThreadPool(3);
+        serverStatusAndStateMachineTimeoutExecutor = Executors.newScheduledThreadPool(STATE_MACHINE_THREADS,
+                           groupedThreads("onos/aaa", "aaa-machine-%d", log));
 
-        scheduledStatusServerChecker = executor.scheduleAtFixedRate(new ServerStatusChecker(), 0,
-            operationalStatusEventGenerationPeriodInSeconds, TimeUnit.SECONDS);
+        scheduledStatusServerChecker = serverStatusAndStateMachineTimeoutExecutor.scheduleAtFixedRate(
+                new ServerStatusChecker(), 0,
+                operationalStatusEventGenerationPeriodInSeconds, TimeUnit.SECONDS);
 
         log.info("Started");
     }
@@ -339,8 +347,8 @@
         deviceService.removeListener(deviceListener);
         eventDispatcher.removeSink(AuthenticationEvent.class);
         scheduledStatusServerChecker.cancel(true);
-        executor.shutdown();
-
+        serverStatusAndStateMachineTimeoutExecutor.shutdown();
+        packetProcessorExecutor.shutdown();
         authenticationsConsistentMap.removeListener(mapListener);
 
         log.info("Stopped");
@@ -371,6 +379,19 @@
         } else {
             properties.put("operationalStatusEvaluationMode", operationalStatusEvaluationMode);
         }
+
+        s = Tools.get(properties, PACKET_PROCESSOR_THREADS);
+        int oldpacketProcessorThreads = packetProcessorThreads;
+        packetProcessorThreads = Strings.isNullOrEmpty(s) ? oldpacketProcessorThreads
+                : Integer.parseInt(s.trim());
+        if (packetProcessorExecutor == null || oldpacketProcessorThreads != packetProcessorThreads) {
+            if (packetProcessorExecutor != null) {
+                packetProcessorExecutor.shutdown();
+            }
+            packetProcessorExecutor = newFixedThreadPool(packetProcessorThreads,
+                                                         groupedThreads("onos/aaa", "aaa-packet-%d", log));
+
+        }
     }
 
     protected void configureRadiusCommunication() {
@@ -468,141 +489,171 @@
      * Handles RADIUS packets.
      *
      * @param radiusPacket RADIUS packet coming from the RADIUS server.
-     * @throws DeserializationException if packet deserialization fails
      */
-    public void handleRadiusPacket(RADIUS radiusPacket) throws DeserializationException {
-        if (log.isTraceEnabled()) {
-            log.trace("Received RADIUS packet {}", radiusPacket);
-        }
-        if (radiusOperationalStatusService.isRadiusResponseForOperationalStatus(radiusPacket.getIdentifier())) {
-            radiusOperationalStatusService.handleRadiusPacketForOperationalStatus(radiusPacket);
-            return;
-        }
+    public void handleRadiusPacket(RADIUS radiusPacket) {
+        packetProcessorExecutor.execute(() -> {
+            if (log.isTraceEnabled()) {
+                log.trace("Received RADIUS packet {}", radiusPacket);
+            }
+            if (radiusOperationalStatusService.isRadiusResponseForOperationalStatus(radiusPacket.getIdentifier())) {
+                radiusOperationalStatusService.handleRadiusPacketForOperationalStatus(radiusPacket);
+                return;
+            }
 
-        RequestIdentifier identifier = RequestIdentifier.of(radiusPacket.getIdentifier());
-        String sessionId = idManager.getSessionId(identifier);
+            RequestIdentifier identifier = RequestIdentifier.of(radiusPacket.getIdentifier());
+            String sessionId = idManager.getSessionId(identifier);
 
-        if (sessionId == null) {
-            log.error("Invalid packet identifier {}, could not find corresponding "
-                    + "state machine ... exiting", radiusPacket.getIdentifier());
-            aaaStatisticsManager.getAaaStats().incrementNumberOfSessionsExpired();
-            aaaStatisticsManager.getAaaStats().countDroppedResponsesRx();
-            return;
-        }
+            if (sessionId == null) {
+                log.error("Invalid packet identifier {}, could not find corresponding "
+                                  + "state machine ... exiting", radiusPacket.getIdentifier());
+                aaaStatisticsManager.getAaaStats().incrementNumberOfSessionsExpired();
+                aaaStatisticsManager.getAaaStats().countDroppedResponsesRx();
+                return;
+            }
 
-        idManager.releaseIdentifier(identifier);
-        StateMachine stateMachine = stateMachines.get(sessionId);
-        if (stateMachine == null) {
-            log.error("Invalid packet identifier {}, could not find corresponding "
-                    + "state machine ... exiting", radiusPacket.getIdentifier());
-            aaaStatisticsManager.getAaaStats().incrementNumberOfSessionsExpired();
-            aaaStatisticsManager.getAaaStats().countDroppedResponsesRx();
-            return;
-        }
+            idManager.releaseIdentifier(identifier);
+            StateMachine stateMachine = stateMachines.get(sessionId);
+            if (stateMachine == null) {
+                log.error("Invalid packet identifier {}, could not find corresponding "
+                                  + "state machine ... exiting", radiusPacket.getIdentifier());
+                aaaStatisticsManager.getAaaStats().incrementNumberOfSessionsExpired();
+                aaaStatisticsManager.getAaaStats().countDroppedResponsesRx();
+                return;
+            }
 
-        //instance of StateMachine using the sessionId for updating machine stats
-        StateMachine machineStats = stateMachines.get(stateMachine.sessionId());
+            //instance of StateMachine using the sessionId for updating machine stats
+            StateMachine machineStats = stateMachines.get(stateMachine.sessionId());
 
-        EAP eapPayload;
-        Ethernet eth;
-        checkReceivedPacketForValidValidator(radiusPacket, stateMachine.requestAuthenticator());
+            EAP eapPayload;
+            Ethernet eth;
+            checkReceivedPacketForValidValidator(radiusPacket, stateMachine.requestAuthenticator());
 
-        //increasing packets and octets received from server
-        machineStats.incrementTotalPacketsReceived();
-        machineStats.incrementTotalOctetReceived(radiusPacket.decapsulateMessage().getLength());
-
-        if (outPacketSet.contains(radiusPacket.getIdentifier())) {
-            aaaStatisticsManager.getAaaStats().increaseOrDecreasePendingRequests(false);
-            outPacketSet.remove(new Byte(radiusPacket.getIdentifier()));
-        }
-        switch (radiusPacket.getCode()) {
-            case RADIUS.RADIUS_CODE_ACCESS_CHALLENGE:
-                log.debug("RADIUS packet: RADIUS_CODE_ACCESS_CHALLENGE");
-                RADIUSAttribute radiusAttrState = radiusPacket.getAttribute(RADIUSAttribute.RADIUS_ATTR_STATE);
-                byte[] challengeState = null;
-                if (radiusAttrState != null) {
-                    challengeState = radiusAttrState.getValue();
-                }
-                eapPayload = radiusPacket.decapsulateMessage();
-                stateMachine.setChallengeInfo(eapPayload.getIdentifier(), challengeState);
-                eth = buildEapolResponse(stateMachine.supplicantAddress(),
-                        MacAddress.valueOf(nasMacAddress),
-                        stateMachine.vlanId(),
-                        EAPOL.EAPOL_PACKET,
-                        eapPayload, stateMachine.priorityCode());
-                log.debug("Send EAP challenge response to supplicant {}", stateMachine.supplicantAddress().toString());
-                sendPacketToSupplicant(eth, stateMachine.supplicantConnectpoint(), true);
-                aaaStatisticsManager.getAaaStats().increaseChallengeResponsesRx();
-                outPacketSupp.add(eapPayload.getIdentifier());
-                aaaStatisticsManager.getAaaStats().incrementPendingResSupp();
-                //increasing packets send to server
-                machineStats.incrementTotalPacketsSent();
-                machineStats.incrementTotalOctetSent(eapPayload.getLength());
-                break;
-            case RADIUS.RADIUS_CODE_ACCESS_ACCEPT:
-                log.debug("RADIUS packet: RADIUS_CODE_ACCESS_ACCEPT");
-                //send an EAPOL - Success to the supplicant.
-                byte[] eapMessageSuccess =
-                        radiusPacket.getAttribute(RADIUSAttribute.RADIUS_ATTR_EAP_MESSAGE).getValue();
-                eapPayload = EAP.deserializer().deserialize(
-                        eapMessageSuccess, 0, eapMessageSuccess.length);
-                eth = buildEapolResponse(stateMachine.supplicantAddress(),
-                        MacAddress.valueOf(nasMacAddress),
-                        stateMachine.vlanId(),
-                        EAPOL.EAPOL_PACKET,
-                        eapPayload, stateMachine.priorityCode());
-                log.info("Send EAP success message to supplicant {}", stateMachine.supplicantAddress().toString());
-                sendPacketToSupplicant(eth, stateMachine.supplicantConnectpoint(), false);
-                aaaStatisticsManager.getAaaStats().incrementEapolAuthSuccessTrans();
-
-                stateMachine.authorizeAccess();
-                aaaStatisticsManager.getAaaStats().increaseAcceptResponsesRx();
-                //increasing packets send to server
-                machineStats.incrementTotalPacketsSent();
-                machineStats.incrementTotalOctetSent(eapPayload.getLength());
-                break;
-            case RADIUS.RADIUS_CODE_ACCESS_REJECT:
-                log.debug("RADIUS packet: RADIUS_CODE_ACCESS_REJECT");
-                //send an EAPOL - Failure to the supplicant.
-                byte[] eapMessageFailure;
-                eapPayload = new EAP();
-                RADIUSAttribute radiusAttrEap = radiusPacket.getAttribute(RADIUSAttribute.RADIUS_ATTR_EAP_MESSAGE);
-                if (radiusAttrEap == null) {
-                    eapPayload.setCode(EAP.FAILURE);
-                    eapPayload.setIdentifier(stateMachine.challengeIdentifier());
-                    eapPayload.setLength(EAP.EAP_HDR_LEN_SUC_FAIL);
-                } else {
-                    eapMessageFailure = radiusAttrEap.getValue();
-                    eapPayload = EAP.deserializer().deserialize(
-                            eapMessageFailure, 0, eapMessageFailure.length);
-                }
-                eth = buildEapolResponse(stateMachine.supplicantAddress(),
-                        MacAddress.valueOf(nasMacAddress),
-                        stateMachine.vlanId(),
-                        EAPOL.EAPOL_PACKET,
-                        eapPayload, stateMachine.priorityCode());
-                log.warn("Send EAP failure message to supplicant {}", stateMachine.supplicantAddress().toString());
-                sendPacketToSupplicant(eth, stateMachine.supplicantConnectpoint(), false);
-                aaaStatisticsManager.getAaaStats().incrementEapolauthFailureTrans();
-
-                stateMachine.denyAccess();
-                aaaStatisticsManager.getAaaStats().increaseRejectResponsesRx();
-                //increasing packets send to server
-                machineStats.incrementTotalPacketsSent();
-                machineStats.incrementTotalOctetSent(eapPayload.getLength());
-                //pushing machine stats to kafka
-                AaaSupplicantMachineStats machineObj = aaaSupplicantStatsManager.getSupplicantStats(machineStats);
-                aaaSupplicantStatsManager.getMachineStatsDelegate()
-                        .notify(new AaaMachineStatisticsEvent(AaaMachineStatisticsEvent.Type.STATS_UPDATE, machineObj));
-                break;
-            default:
-                log.warn("Unknown RADIUS message received with code: {}", radiusPacket.getCode());
-                aaaStatisticsManager.getAaaStats().increaseUnknownTypeRx();
-                //increasing packets received to server
-                machineStats.incrementTotalPacketsReceived();
+            //increasing packets and octets received from server
+            machineStats.incrementTotalPacketsReceived();
+            try {
                 machineStats.incrementTotalOctetReceived(radiusPacket.decapsulateMessage().getLength());
-        }
-        aaaStatisticsManager.getAaaStats().countDroppedResponsesRx();
+            } catch (DeserializationException e) {
+                log.error(e.getMessage());
+                return;
+            }
+
+            if (outPacketSet.contains(radiusPacket.getIdentifier())) {
+                aaaStatisticsManager.getAaaStats().increaseOrDecreasePendingRequests(false);
+                outPacketSet.remove(new Byte(radiusPacket.getIdentifier()));
+            }
+
+            switch (radiusPacket.getCode()) {
+                case RADIUS.RADIUS_CODE_ACCESS_CHALLENGE:
+                    log.debug("RADIUS packet: RADIUS_CODE_ACCESS_CHALLENGE");
+                    RADIUSAttribute radiusAttrState = radiusPacket.getAttribute(RADIUSAttribute.RADIUS_ATTR_STATE);
+                    byte[] challengeState = null;
+                    if (radiusAttrState != null) {
+                        challengeState = radiusAttrState.getValue();
+                    }
+                    try {
+                        eapPayload = radiusPacket.decapsulateMessage();
+                        eth = buildEapolResponse(stateMachine.supplicantAddress(),
+                                                 MacAddress.valueOf(nasMacAddress),
+                                                 stateMachine.vlanId(),
+                                                 EAPOL.EAPOL_PACKET,
+                                                 eapPayload, stateMachine.priorityCode());
+                        stateMachine.setChallengeInfo(eapPayload.getIdentifier(), challengeState);
+                    } catch (DeserializationException e) {
+                        log.error(e.getMessage());
+                        break;
+                    }
+                    log.debug("Send EAP challenge response to supplicant {}",
+                              stateMachine.supplicantAddress().toString());
+                    sendPacketToSupplicant(eth, stateMachine.supplicantConnectpoint(), true);
+                    aaaStatisticsManager.getAaaStats().increaseChallengeResponsesRx();
+                    outPacketSupp.add(eapPayload.getIdentifier());
+                    aaaStatisticsManager.getAaaStats().incrementPendingResSupp();
+                    //increasing packets send to server
+                    machineStats.incrementTotalPacketsSent();
+                    machineStats.incrementTotalOctetSent(eapPayload.getLength());
+                    break;
+                case RADIUS.RADIUS_CODE_ACCESS_ACCEPT:
+                    log.debug("RADIUS packet: RADIUS_CODE_ACCESS_ACCEPT");
+                    //send an EAPOL - Success to the supplicant.
+                    byte[] eapMessageSuccess =
+                            radiusPacket.getAttribute(RADIUSAttribute.RADIUS_ATTR_EAP_MESSAGE).getValue();
+                    try {
+                    eapPayload = EAP.deserializer().deserialize(
+                            eapMessageSuccess, 0, eapMessageSuccess.length);
+                    } catch (DeserializationException e) {
+                        log.error(e.getMessage());
+                        break;
+                    }
+
+                    eth = buildEapolResponse(stateMachine.supplicantAddress(),
+                                             MacAddress.valueOf(nasMacAddress),
+                                             stateMachine.vlanId(),
+                                             EAPOL.EAPOL_PACKET,
+                                             eapPayload, stateMachine.priorityCode());
+                    log.info("Send EAP success message to supplicant {}", stateMachine.supplicantAddress().toString());
+                    sendPacketToSupplicant(eth, stateMachine.supplicantConnectpoint(), false);
+                    aaaStatisticsManager.getAaaStats().incrementEapolAuthSuccessTrans();
+
+                    stateMachine.authorizeAccess();
+                    aaaStatisticsManager.getAaaStats().increaseAcceptResponsesRx();
+                    //increasing packets send to server
+                    machineStats.incrementTotalPacketsSent();
+                    machineStats.incrementTotalOctetSent(eapPayload.getLength());
+                    break;
+                case RADIUS.RADIUS_CODE_ACCESS_REJECT:
+                    log.debug("RADIUS packet: RADIUS_CODE_ACCESS_REJECT");
+                    //send an EAPOL - Failure to the supplicant.
+                    byte[] eapMessageFailure;
+                    eapPayload = new EAP();
+                    RADIUSAttribute radiusAttrEap = radiusPacket.getAttribute(RADIUSAttribute.RADIUS_ATTR_EAP_MESSAGE);
+                    if (radiusAttrEap == null) {
+                        eapPayload.setCode(EAP.FAILURE);
+                        eapPayload.setIdentifier(stateMachine.challengeIdentifier());
+                        eapPayload.setLength(EAP.EAP_HDR_LEN_SUC_FAIL);
+                    } else {
+                        eapMessageFailure = radiusAttrEap.getValue();
+                        try {
+                            eapPayload = EAP.deserializer().deserialize(
+                                    eapMessageFailure, 0, eapMessageFailure.length);
+                        } catch (DeserializationException e) {
+                            log.error(e.getMessage());
+                            break;
+                        }
+                    }
+                    eth = buildEapolResponse(stateMachine.supplicantAddress(),
+                                             MacAddress.valueOf(nasMacAddress),
+                                             stateMachine.vlanId(),
+                                             EAPOL.EAPOL_PACKET,
+                                             eapPayload, stateMachine.priorityCode());
+                    log.warn("Send EAP failure message to supplicant {}", stateMachine.supplicantAddress().toString());
+                    sendPacketToSupplicant(eth, stateMachine.supplicantConnectpoint(), false);
+                    aaaStatisticsManager.getAaaStats().incrementEapolauthFailureTrans();
+
+                    stateMachine.denyAccess();
+                    aaaStatisticsManager.getAaaStats().increaseRejectResponsesRx();
+                    //increasing packets send to server
+                    machineStats.incrementTotalPacketsSent();
+                    machineStats.incrementTotalOctetSent(eapPayload.getLength());
+                    //pushing machine stats to kafka
+                    AaaSupplicantMachineStats machineObj = aaaSupplicantStatsManager.getSupplicantStats(machineStats);
+                    aaaSupplicantStatsManager.getMachineStatsDelegate()
+                            .notify(new AaaMachineStatisticsEvent(AaaMachineStatisticsEvent.Type.STATS_UPDATE,
+                                                                  machineObj));
+                    break;
+                default:
+                    log.warn("Unknown RADIUS message received with code: {}", radiusPacket.getCode());
+                    aaaStatisticsManager.getAaaStats().increaseUnknownTypeRx();
+                    //increasing packets received to server
+                    machineStats.incrementTotalPacketsReceived();
+                    try {
+                        machineStats.incrementTotalOctetReceived(radiusPacket.decapsulateMessage().getLength());
+                    } catch (DeserializationException e) {
+                        log.error(e.getMessage());
+                        break;
+                    }
+            }
+            aaaStatisticsManager.getAaaStats().countDroppedResponsesRx();
+        });
     }
 
     /**
@@ -752,7 +803,8 @@
                 aaaStatisticsManager.getAaaStats().incrementValidEapolFramesRx();
             }
 
-            StateMachine stateMachine = stateMachines.computeIfAbsent(sessionId, id -> new StateMachine(id, executor));
+            StateMachine stateMachine = stateMachines.computeIfAbsent(sessionId, id ->
+                    new StateMachine(id, serverStatusAndStateMachineTimeoutExecutor));
             stateMachine.setEapolTypeVal(eapol.getEapolType());
 
             switch (eapol.getEapolType()) {
diff --git a/app/src/main/java/org/opencord/aaa/impl/OsgiPropertyConstants.java b/app/src/main/java/org/opencord/aaa/impl/OsgiPropertyConstants.java
index 34a2cc3..71826c0 100644
--- a/app/src/main/java/org/opencord/aaa/impl/OsgiPropertyConstants.java
+++ b/app/src/main/java/org/opencord/aaa/impl/OsgiPropertyConstants.java
@@ -39,4 +39,7 @@
 
     public static final String STATUS_SERVER_MODE = "operationalStatusEvaluationMode";
     public static final String STATUS_SERVER_MODE_DEFAULT = "AUTO";
+
+    public static final String PACKET_PROCESSOR_THREADS = "packetProcessorThreads";
+    public static final int PACKET_PROCESSOR_THREADS_DEFAULT = 10;
 }
diff --git a/app/src/main/java/org/opencord/aaa/impl/StateMachine.java b/app/src/main/java/org/opencord/aaa/impl/StateMachine.java
index 7299818..1b5613c 100644
--- a/app/src/main/java/org/opencord/aaa/impl/StateMachine.java
+++ b/app/src/main/java/org/opencord/aaa/impl/StateMachine.java
@@ -121,7 +121,7 @@
     private State[] states = {new Idle(), new Started(), new Pending(), new Authorized(), new Unauthorized() };
 
     // Cleanup Timer instance created for this session
-    private ScheduledExecutorService executor;
+    private ScheduledExecutorService timeoutExecutor;
     private java.util.concurrent.ScheduledFuture<?> cleanupTimer = null;
 
     // TimeStamp of last EAPOL or RADIUS message received.
@@ -171,7 +171,7 @@
     }
 
     private void scheduleTimeout() {
-        cleanupTimer = executor.schedule(this::timeout, cleanupTimerTimeOutInMins, TimeUnit.MINUTES);
+        cleanupTimer = timeoutExecutor.schedule(this::timeout, cleanupTimerTimeOutInMins, TimeUnit.MINUTES);
     }
 
     public static void unsetDelegate(StateMachineDelegate delegate) {
@@ -210,7 +210,7 @@
     public StateMachine(String sessionId, ScheduledExecutorService executor) {
         log.info("Creating a new state machine for {}", sessionId);
         this.sessionId = sessionId;
-        this.executor = executor;
+        this.timeoutExecutor = executor;
     }
 
     /**
diff --git a/app/src/test/java/org/opencord/aaa/impl/AaaManagerTest.java b/app/src/test/java/org/opencord/aaa/impl/AaaManagerTest.java
index 839d0b1..110ebec 100644
--- a/app/src/test/java/org/opencord/aaa/impl/AaaManagerTest.java
+++ b/app/src/test/java/org/opencord/aaa/impl/AaaManagerTest.java
@@ -40,15 +40,20 @@
 import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
 import org.onosproject.store.service.TestStorageService;
 import org.opencord.aaa.AaaConfig;
+import org.slf4j.Logger;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static com.google.common.base.Preconditions.checkState;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.onosproject.net.intent.TestTools.assertAfter;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Set of tests of the ONOS application component.
@@ -57,6 +62,8 @@
 
     static final String BAD_IP_ADDRESS = "198.51.100.0";
 
+    private final Logger log = getLogger(getClass());
+
     private AaaManager aaaManager;
     private AaaStatisticsManager aaaStatisticsManager;
 
@@ -144,6 +151,8 @@
         aaaStatisticsManager.activate(new MockComponentContext());
         aaaManager.aaaStatisticsManager = this.aaaStatisticsManager;
         TestUtils.setField(aaaManager, "eventDispatcher", new TestEventDispatcher());
+
+
         aaaManager.activate(new AaaTestBase.MockComponentContext());
     }
 
@@ -206,7 +215,7 @@
         sendPacket(identifyPacket);
 
         RADIUS radiusIdentifyPacket = (RADIUS) fetchPacket(1);
-        byte reqId = radiusIdentifyPacket.getIdentifier();
+        AtomicReference<Byte> reqId = new AtomicReference<>(radiusIdentifyPacket.getIdentifier());
 
         checkRadiusPacketFromSupplicant(radiusIdentifyPacket);
 
@@ -230,47 +239,61 @@
 
         RADIUS radiusCodeAccessChallengePacket =
                 constructRadiusCodeAccessChallengePacket(RADIUS.RADIUS_CODE_ACCESS_CHALLENGE, EAP.ATTR_MD5,
-                reqId, aaaManager.radiusSecret.getBytes());
+                                                         reqId.get(), aaaManager.radiusSecret.getBytes());
         aaaManager.handleRadiusPacket(radiusCodeAccessChallengePacket);
 
-        Ethernet radiusChallengeMD5Packet = (Ethernet) fetchPacket(2);
-        checkRadiusPacket(aaaManager, radiusChallengeMD5Packet, EAP.ATTR_MD5);
+        assertAfter(ASSERTION_DELAY, ASSERTION_LENGTH, () -> {
+            Ethernet radiusChallengeMD5Packet = (Ethernet) fetchPacket(2);
+            checkRadiusPacket(aaaManager, radiusChallengeMD5Packet, EAP.ATTR_MD5);
 
-        // (4) Supplicant MD5 response
+            // (4) Supplicant MD5 response
+            try {
+                Ethernet md5RadiusPacket =
+                        constructSupplicantIdentifyPacket(stateMachine,
+                                                          EAP.ATTR_MD5,
+                                                          stateMachine.challengeIdentifier(),
+                                                          radiusChallengeMD5Packet);
+                sendPacket(md5RadiusPacket);
+            } catch (Exception e) {
+                log.error(e.getMessage());
+                fail();
+            }
+        });
+        assertAfter(ASSERTION_DELAY, ASSERTION_LENGTH, () -> {
+            RADIUS responseMd5RadiusPacket = (RADIUS) fetchPacket(3);
+            try {
+                checkRadiusPacketFromSupplicant(responseMd5RadiusPacket);
+            } catch (Exception e) {
+                log.error(e.getMessage());
+                fail();
+            }
+            //assertThat(responseMd5RadiusPacket.getIdentifier(), is((byte) 9));
+            reqId.set(responseMd5RadiusPacket.getIdentifier());
+            assertThat(responseMd5RadiusPacket.getCode(), is(RADIUS.RADIUS_CODE_ACCESS_REQUEST));
 
-        Ethernet md5RadiusPacket =
-                constructSupplicantIdentifyPacket(stateMachine,
-                                                  EAP.ATTR_MD5,
-                                                  stateMachine.challengeIdentifier(),
-                                                  radiusChallengeMD5Packet);
-        sendPacket(md5RadiusPacket);
+            //  State machine should be in pending state
 
-        RADIUS responseMd5RadiusPacket = (RADIUS) fetchPacket(3);
+            assertThat(stateMachine, notNullValue());
+            assertThat(stateMachine.state(), is(StateMachine.STATE_PENDING));
 
-        checkRadiusPacketFromSupplicant(responseMd5RadiusPacket);
-        //assertThat(responseMd5RadiusPacket.getIdentifier(), is((byte) 9));
-        reqId = responseMd5RadiusPacket.getIdentifier();
-        assertThat(responseMd5RadiusPacket.getCode(), is(RADIUS.RADIUS_CODE_ACCESS_REQUEST));
+            // (5) RADIUS Success
 
-        //  State machine should be in pending state
+            RADIUS successPacket =
+                    constructRadiusCodeAccessChallengePacket(RADIUS.RADIUS_CODE_ACCESS_ACCEPT,
+                                                             EAP.SUCCESS, reqId.get(),
+                                                             aaaManager.radiusSecret.getBytes());
+            aaaManager.handleRadiusPacket((successPacket));
+        });
+        assertAfter(ASSERTION_DELAY, ASSERTION_LENGTH, () -> {
+            Ethernet supplicantSuccessPacket = (Ethernet) fetchPacket(4);
 
-        assertThat(stateMachine, notNullValue());
-        assertThat(stateMachine.state(), is(StateMachine.STATE_PENDING));
+            checkRadiusPacket(aaaManager, supplicantSuccessPacket, EAP.SUCCESS);
 
-        // (5) RADIUS Success
+            //  State machine should be in authorized state
 
-        RADIUS successPacket =
-                constructRadiusCodeAccessChallengePacket(RADIUS.RADIUS_CODE_ACCESS_ACCEPT,
-                EAP.SUCCESS, reqId, aaaManager.radiusSecret.getBytes());
-        aaaManager.handleRadiusPacket((successPacket));
-        Ethernet supplicantSuccessPacket = (Ethernet) fetchPacket(4);
-
-        checkRadiusPacket(aaaManager, supplicantSuccessPacket, EAP.SUCCESS);
-
-        //  State machine should be in authorized state
-
-        assertThat(stateMachine, notNullValue());
-        assertThat(stateMachine.state(), is(StateMachine.STATE_AUTHORIZED));
+            assertThat(stateMachine, notNullValue());
+            assertThat(stateMachine.state(), is(StateMachine.STATE_AUTHORIZED));
+        });
     }
 
     @Test
diff --git a/app/src/test/java/org/opencord/aaa/impl/AaaStatisticsTest.java b/app/src/test/java/org/opencord/aaa/impl/AaaStatisticsTest.java
index adfa05f..c4f390c 100644
--- a/app/src/test/java/org/opencord/aaa/impl/AaaStatisticsTest.java
+++ b/app/src/test/java/org/opencord/aaa/impl/AaaStatisticsTest.java
@@ -49,12 +49,14 @@
 import java.nio.ByteBuffer;
 
 import static com.google.common.base.Preconditions.checkState;
+import static junit.framework.TestCase.fail;
 import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.notNullValue;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
 import static org.onosproject.net.NetTestTools.connectPoint;
+import static org.onosproject.net.intent.TestTools.assertAfter;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -65,6 +67,7 @@
     static final String BAD_IP_ADDRESS = "198.51.100.0";
     static final Long ZERO = (long) 0;
 
+
     private final Logger log = getLogger(getClass());
     private AaaManager aaaManager;
     private AaaStatisticsManager aaaStatisticsManager;
@@ -159,6 +162,7 @@
         aaaManager.aaaStatisticsManager = this.aaaStatisticsManager;
         aaaManager.aaaSupplicantStatsManager = this.aaaSupplicantStatsManager;
         TestUtils.setField(aaaManager, "eventDispatcher", new TestEventDispatcher());
+
         aaaManager.activate(new AaaTestBase.MockComponentContext());
     }
 
@@ -239,56 +243,74 @@
                 aaaManager.radiusSecret.getBytes());
         aaaManager.handleRadiusPacket(radiusCodeAccessChallengePacket);
 
-        Ethernet radiusChallengeMD5Packet = (Ethernet) fetchPacket(2);
-        checkRadiusPacket(aaaManager, radiusChallengeMD5Packet, EAP.ATTR_MD5);
+        assertAfter(ASSERTION_DELAY, ASSERTION_LENGTH, () -> {
+            Ethernet radiusChallengeMD5Packet = (Ethernet) fetchPacket(2);
+            checkRadiusPacket(aaaManager, radiusChallengeMD5Packet, EAP.ATTR_MD5);
+            // (4) Supplicant MD5 response
 
-        // (4) Supplicant MD5 response
+            Ethernet md5RadiusPacket = null;
+            try {
+                md5RadiusPacket = constructSupplicantIdentifyPacket(stateMachine, EAP.ATTR_MD5,
+                                      stateMachine.challengeIdentifier(), radiusChallengeMD5Packet);
+            } catch (Exception e) {
+                log.error(e.getMessage());
+                fail();
+            }
+            sendPacket(md5RadiusPacket);
+        });
 
-        Ethernet md5RadiusPacket = constructSupplicantIdentifyPacket(stateMachine, EAP.ATTR_MD5,
-                stateMachine.challengeIdentifier(), radiusChallengeMD5Packet);
-        sendPacket(md5RadiusPacket);
 
-        RADIUS responseMd5RadiusPacket = (RADIUS) fetchPacket(3);
+        assertAfter(ASSERTION_DELAY, ASSERTION_LENGTH, () -> {
+            RADIUS responseMd5RadiusPacket = (RADIUS) fetchPacket(3);
 
-        checkRadiusPacketFromSupplicant(responseMd5RadiusPacket);
-        //assertThat(responseMd5RadiusPacket.getIdentifier(), is((byte) 9));
-        assertThat(responseMd5RadiusPacket.getCode(), is(RADIUS.RADIUS_CODE_ACCESS_REQUEST));
+            try {
+                checkRadiusPacketFromSupplicant(responseMd5RadiusPacket);
+            } catch (DeserializationException e) {
+                log.error(e.getMessage());
+                fail();
+            }
+            //assertThat(responseMd5RadiusPacket.getIdentifier(), is((byte) 9));
+            assertThat(responseMd5RadiusPacket.getCode(), is(RADIUS.RADIUS_CODE_ACCESS_REQUEST));
 
-        // State machine should be in pending state
+            // State machine should be in pending state
 
-        assertThat(stateMachine, notNullValue());
-        assertThat(stateMachine.state(), is(StateMachine.STATE_PENDING));
+            assertThat(stateMachine, notNullValue());
+            assertThat(stateMachine.state(), is(StateMachine.STATE_PENDING));
 
-        // (5) RADIUS Success
+            // (5) RADIUS Success
 
-        RADIUS successPacket =
-                constructRadiusCodeAccessChallengePacket(RADIUS.RADIUS_CODE_ACCESS_ACCEPT, EAP.SUCCESS,
-                        responseMd5RadiusPacket.getIdentifier(), aaaManager.radiusSecret.getBytes());
-        aaaManager.handleRadiusPacket((successPacket));
-        Ethernet supplicantSuccessPacket = (Ethernet) fetchPacket(4);
+            RADIUS successPacket =
+                    constructRadiusCodeAccessChallengePacket(RADIUS.RADIUS_CODE_ACCESS_ACCEPT, EAP.SUCCESS,
+                                                             responseMd5RadiusPacket.getIdentifier(),
+                                                             aaaManager.radiusSecret.getBytes());
+            aaaManager.handleRadiusPacket((successPacket));
+        });
+        assertAfter(ASSERTION_DELAY, ASSERTION_LENGTH, () -> {
+            Ethernet supplicantSuccessPacket = (Ethernet) fetchPacket(4);
 
-        checkRadiusPacket(aaaManager, supplicantSuccessPacket, EAP.SUCCESS);
+            checkRadiusPacket(aaaManager, supplicantSuccessPacket, EAP.SUCCESS);
 
-        // State machine should be in authorized state
+            // State machine should be in authorized state
 
-        assertThat(stateMachine, notNullValue());
-        assertThat(stateMachine.state(), is(StateMachine.STATE_AUTHORIZED));
+            assertThat(stateMachine, notNullValue());
+            assertThat(stateMachine.state(), is(StateMachine.STATE_AUTHORIZED));
 
-        //Check for increase of Stats
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolResIdentityMsgTrans(), ZERO);
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolAuthSuccessTrans(), ZERO);
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolStartReqTrans(), ZERO);
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolTransRespNotNak(), ZERO);
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getEapPktTxauthChooseEap(), ZERO);
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getValidEapolFramesRx(), ZERO);
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolFramesTx(), ZERO);
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getReqEapFramesTx(), ZERO);
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getRequestIdFramesTx(), ZERO);
-        assertEquals(aaaStatisticsManager.getAaaStats().getInvalidBodyLength(), ZERO);
-        assertEquals(aaaStatisticsManager.getAaaStats().getInvalidPktType(), ZERO);
-        assertEquals(aaaStatisticsManager.getAaaStats().getPendingResSupp(), ZERO);
-       // Counts the aaa Statistics count and displays in the log
-       countAaaStatistics();
+            //Check for increase of Stats
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolResIdentityMsgTrans(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolAuthSuccessTrans(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolStartReqTrans(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolTransRespNotNak(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getEapPktTxauthChooseEap(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getValidEapolFramesRx(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolFramesTx(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getReqEapFramesTx(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getRequestIdFramesTx(), ZERO);
+            assertEquals(aaaStatisticsManager.getAaaStats().getInvalidBodyLength(), ZERO);
+            assertEquals(aaaStatisticsManager.getAaaStats().getInvalidPktType(), ZERO);
+            assertEquals(aaaStatisticsManager.getAaaStats().getPendingResSupp(), ZERO);
+            // Counts the aaa Statistics count and displays in the log
+            countAaaStatistics();
+        });
 
     }
 
@@ -304,62 +326,85 @@
         // (1) Supplicant start up
         Ethernet startPacket = constructSupplicantStartPacket();
         sendPacket(startPacket);
-
-        Ethernet responsePacket = (Ethernet) fetchPacket(0);
-        checkRadiusPacket(aaaManager, responsePacket, EAP.ATTR_IDENTITY);
+        assertAfter(ASSERTION_DELAY, ASSERTION_LENGTH, () -> {
+            Ethernet responsePacket = (Ethernet) fetchPacket(0);
+            checkRadiusPacket(aaaManager, responsePacket, EAP.ATTR_IDENTITY);
+        });
 
         // (2) Supplicant identify
 
         Ethernet identifyPacket = constructSupplicantIdentifyPacket(null, EAP.ATTR_IDENTITY, (byte) 1, null);
         sendPacket(identifyPacket);
 
-        RADIUS radiusIdentifyPacket = (RADIUS) fetchPacket(1);
-        checkRadiusPacketFromSupplicant(radiusIdentifyPacket);
+        assertAfter(ASSERTION_DELAY, ASSERTION_LENGTH, () -> {
+            RADIUS radiusIdentifyPacket = (RADIUS) fetchPacket(1);
+            try {
+                checkRadiusPacketFromSupplicant(radiusIdentifyPacket);
+            } catch (DeserializationException e) {
+                log.error(e.getMessage());
+                fail();
+            }
 
-        assertThat(radiusIdentifyPacket.getCode(), is(RADIUS.RADIUS_CODE_ACCESS_REQUEST));
-        assertThat(new String(radiusIdentifyPacket.getAttribute(RADIUSAttribute.RADIUS_ATTR_USERNAME).getValue()),
-                is("testuser"));
-        IpAddress nasIp = IpAddress.valueOf(IpAddress.Version.INET,
-                  radiusIdentifyPacket.getAttribute(RADIUSAttribute.RADIUS_ATTR_NAS_IP).getValue());
-        assertThat(nasIp.toString(), is(aaaManager.nasIpAddress.getHostAddress()));
+            assertThat(radiusIdentifyPacket.getCode(), is(RADIUS.RADIUS_CODE_ACCESS_REQUEST));
+            assertThat(new String(radiusIdentifyPacket.getAttribute(RADIUSAttribute.RADIUS_ATTR_USERNAME).getValue()),
+                       is("testuser"));
+            IpAddress nasIp = IpAddress.valueOf(IpAddress.Version.INET,
+                                                radiusIdentifyPacket.getAttribute(RADIUSAttribute.RADIUS_ATTR_NAS_IP)
+                                                        .getValue());
+            assertThat(nasIp.toString(), is(aaaManager.nasIpAddress.getHostAddress()));
 
-        // State machine should have been created by now
+            // State machine should have been created by now
 
-        StateMachine stateMachine = aaaManager.getStateMachine(SESSION_ID);
-        assertThat(stateMachine, notNullValue());
-        assertThat(stateMachine.state(), is(StateMachine.STATE_PENDING));
+            StateMachine stateMachine = aaaManager.getStateMachine(SESSION_ID);
+            assertThat(stateMachine, notNullValue());
+            assertThat(stateMachine.state(), is(StateMachine.STATE_PENDING));
 
-        // (3) RADIUS NAK challenge
+            // (3) RADIUS NAK challenge
 
-       RADIUS radiusCodeAccessChallengePacket = constructRadiusCodeAccessChallengePacket(
-                  RADIUS.RADIUS_CODE_ACCESS_CHALLENGE, EAP.ATTR_NAK, radiusIdentifyPacket.getIdentifier(),
-                  aaaManager.radiusSecret.getBytes());
-        aaaManager.handleRadiusPacket(radiusCodeAccessChallengePacket);
+            RADIUS radiusCodeAccessChallengePacket = constructRadiusCodeAccessChallengePacket(
+                    RADIUS.RADIUS_CODE_ACCESS_CHALLENGE, EAP.ATTR_NAK, radiusIdentifyPacket.getIdentifier(),
+                    aaaManager.radiusSecret.getBytes());
+            aaaManager.handleRadiusPacket(radiusCodeAccessChallengePacket);
+        });
 
-        Ethernet radiusChallengeNakPacket = (Ethernet) fetchPacket(2);
-        checkRadiusPacket(aaaManager, radiusChallengeNakPacket, EAP.ATTR_NAK);
+        assertAfter(ASSERTION_DELAY, ASSERTION_LENGTH, () -> {
+            Ethernet radiusChallengeNakPacket = (Ethernet) fetchPacket(2);
+            checkRadiusPacket(aaaManager, radiusChallengeNakPacket, EAP.ATTR_NAK);
 
-        // (4) Supplicant NAK response
+            // (4) Supplicant NAK response
+            StateMachine stateMachine = aaaManager.getStateMachine(SESSION_ID);
+            assertThat(stateMachine, notNullValue());
+            Ethernet nakRadiusPacket = null;
+            try {
+                nakRadiusPacket = constructSupplicantIdentifyPacket(stateMachine, EAP.ATTR_NAK,
+                                                                    stateMachine.challengeIdentifier(),
+                                                                    radiusChallengeNakPacket);
+            } catch (Exception e) {
+                log.error(e.getMessage());
+                fail();
+            }
+            sendPacket(nakRadiusPacket);
+        });
 
-       Ethernet nakRadiusPacket = constructSupplicantIdentifyPacket(stateMachine, EAP.ATTR_NAK,
-           stateMachine.challengeIdentifier(), radiusChallengeNakPacket);
-       sendPacket(nakRadiusPacket);
+        assertAfter(ASSERTION_DELAY, ASSERTION_LENGTH, () -> {
+            //Statistic Should be increased.
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getPendingResSupp(), ZERO);
 
-       //Statistic Should be increased.
-       assertNotEquals(aaaStatisticsManager.getAaaStats().getPendingResSupp(), ZERO);
-
-       //Test if packet with invalid eapol type recieved.
-       // Supplicant ASF Packet
-       Ethernet invalidPacket = constructSupplicantAsfPacket();
-       sendPacket(invalidPacket);
-       //Statistic Should be increased.
-       assertNotEquals(aaaStatisticsManager.getAaaStats().getInvalidPktType(), ZERO);
-       assertNotEquals(aaaStatisticsManager.getAaaStats().getAccessRequestsTx(), ZERO);
-       assertNotEquals(aaaStatisticsManager.getAaaStats().getChallengeResponsesRx(), ZERO);
-       assertNotEquals(aaaStatisticsManager.getAaaStats().getDroppedResponsesRx(), ZERO);
-       assertNotEquals(aaaStatisticsManager.getAaaStats().getInvalidValidatorsRx(), ZERO);
-       // Counts the aaa Statistics count and displays in the log
-       countAaaStatistics();
+            //Test if packet with invalid eapol type recieved.
+            // Supplicant ASF Packet
+            Ethernet invalidPacket = constructSupplicantAsfPacket();
+            sendPacket(invalidPacket);
+        });
+        //Statistic Should be increased.
+        assertAfter(ASSERTION_DELAY, ASSERTION_LENGTH, () -> {
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getInvalidPktType(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getAccessRequestsTx(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getChallengeResponsesRx(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getDroppedResponsesRx(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getInvalidValidatorsRx(), ZERO);
+            // Counts the aaa Statistics count and displays in the log
+            countAaaStatistics();
+        });
     }
 
 
@@ -393,48 +438,64 @@
                 aaaManager.radiusSecret.getBytes());
         aaaManager.handleRadiusPacket(radiusCodeAccessChallengePacket);
 
-        Ethernet radiusChallengeMD5Packet = (Ethernet) fetchPacket(2);
+        assertAfter(ASSERTION_DELAY, ASSERTION_LENGTH, () -> {
+            Ethernet radiusChallengeMD5Packet = (Ethernet) fetchPacket(2);
 
-        // (4) Supplicant MD5 response
+            // (4) Supplicant MD5 response
 
-        Ethernet md5RadiusPacket = constructSupplicantIdentifyPacket(stateMachine, EAP.ATTR_MD5,
-                stateMachine.challengeIdentifier(), radiusChallengeMD5Packet);
-        sendPacket(md5RadiusPacket);
-        aaaManager.aaaStatisticsManager.calculatePacketRoundtripTime();
+            Ethernet md5RadiusPacket = null;
+            try {
+                md5RadiusPacket = constructSupplicantIdentifyPacket(stateMachine, EAP.ATTR_MD5,
+                                                                    stateMachine.challengeIdentifier(),
+                                                                    radiusChallengeMD5Packet);
+            } catch (Exception e) {
+                log.error(e.getMessage());
+                fail();
+            }
+            sendPacket(md5RadiusPacket);
+        });
 
-        RADIUS responseMd5RadiusPacket = (RADIUS) fetchPacket(3);
+        assertAfter(ASSERTION_DELAY, ASSERTION_LENGTH, () -> {
+            aaaManager.aaaStatisticsManager.calculatePacketRoundtripTime();
 
-        // (5) RADIUS Rejected
+            RADIUS responseMd5RadiusPacket = (RADIUS) fetchPacket(3);
 
-        RADIUS rejectedPacket =
-                constructRadiusCodeAccessChallengePacket(RADIUS.RADIUS_CODE_ACCESS_REJECT, EAP.FAILURE,
-                        responseMd5RadiusPacket.getIdentifier(), aaaManager.radiusSecret.getBytes());
-        aaaManager.handleRadiusPacket((rejectedPacket));
-        Ethernet supplicantRejectedPacket = (Ethernet) fetchPacket(4);
+            // (5) RADIUS Rejected
 
-        checkRadiusPacket(aaaManager, supplicantRejectedPacket, EAP.FAILURE);
+            RADIUS rejectedPacket =
+                    constructRadiusCodeAccessChallengePacket(RADIUS.RADIUS_CODE_ACCESS_REJECT, EAP.FAILURE,
+                                                             responseMd5RadiusPacket.getIdentifier(),
+                                                             aaaManager.radiusSecret.getBytes());
+            aaaManager.handleRadiusPacket((rejectedPacket));
+        });
 
-        // State machine should be in unauthorized state
-        assertThat(stateMachine, notNullValue());
-        assertThat(stateMachine.state(), is(StateMachine.STATE_UNAUTHORIZED));
-        // Calculated the total round trip time
-        aaaManager.aaaStatisticsManager.calculatePacketRoundtripTime();
+        assertAfter(ASSERTION_DELAY, ASSERTION_LENGTH, () -> {
+            Ethernet supplicantRejectedPacket = (Ethernet) fetchPacket(4);
 
-        //Check for increase of Stats
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolResIdentityMsgTrans(), ZERO);
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolAuthFailureTrans(), ZERO);
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolStartReqTrans(), ZERO);
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getEapPktTxauthChooseEap(), ZERO);
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolTransRespNotNak(), ZERO);
+            checkRadiusPacket(aaaManager, supplicantRejectedPacket, EAP.FAILURE);
 
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getAccessRequestsTx(), ZERO);
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getChallengeResponsesRx(), ZERO);
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getDroppedResponsesRx(), ZERO);
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getInvalidValidatorsRx(), ZERO);
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getRejectResponsesRx(), ZERO);
+            // State machine should be in unauthorized state
+            assertThat(stateMachine, notNullValue());
+            assertThat(stateMachine.state(), is(StateMachine.STATE_UNAUTHORIZED));
+            // Calculated the total round trip time
+            aaaManager.aaaStatisticsManager.calculatePacketRoundtripTime();
 
-        // Counts the aaa Statistics count
-        countAaaStatistics();
+            //Check for increase of Stats
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolResIdentityMsgTrans(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolAuthFailureTrans(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolStartReqTrans(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getEapPktTxauthChooseEap(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolTransRespNotNak(), ZERO);
+
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getAccessRequestsTx(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getChallengeResponsesRx(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getDroppedResponsesRx(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getInvalidValidatorsRx(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getRejectResponsesRx(), ZERO);
+
+            // Counts the aaa Statistics count
+            countAaaStatistics();
+        });
 
     }
 
@@ -532,59 +593,79 @@
                 RADIUS.RADIUS_CODE_ACCESS_CHALLENGE, EAP.ATTR_MD5,
                 radiusIdentifyPacket.getIdentifier(), aaaManager.radiusSecret.getBytes());
         aaaManager.handleRadiusPacket(radiusCodeAccessChallengePacket);
+        assertAfter(ASSERTION_DELAY, ASSERTION_LENGTH, () -> {
+            Ethernet radiusChallengeMD5Packet = (Ethernet) fetchPacket(2);
+            checkRadiusPacket(aaaManager, radiusChallengeMD5Packet, EAP.ATTR_MD5);
 
-        Ethernet radiusChallengeMD5Packet = (Ethernet) fetchPacket(2);
-        checkRadiusPacket(aaaManager, radiusChallengeMD5Packet, EAP.ATTR_MD5);
+            // (4) Supplicant MD5 response
 
-        // (4) Supplicant MD5 response
+            Ethernet md5RadiusPacket = null;
+            try {
+                md5RadiusPacket = constructSupplicantIdentifyPacket(stateMachine, EAP.ATTR_MD5,
+                                                                    stateMachine.challengeIdentifier(),
+                                                                    radiusChallengeMD5Packet);
+            } catch (Exception e) {
+                log.error(e.getMessage());
+                fail();
+            }
+            sendPacket(md5RadiusPacket);
+        });
 
-        Ethernet md5RadiusPacket = constructSupplicantIdentifyPacket(stateMachine, EAP.ATTR_MD5,
-                stateMachine.challengeIdentifier(), radiusChallengeMD5Packet);
-        sendPacket(md5RadiusPacket);
+        assertAfter(ASSERTION_DELAY, ASSERTION_LENGTH, () -> {
+            RADIUS responseMd5RadiusPacket = (RADIUS) fetchPacket(3);
 
-        RADIUS responseMd5RadiusPacket = (RADIUS) fetchPacket(3);
+            try {
+                checkRadiusPacketFromSupplicant(responseMd5RadiusPacket);
+            } catch (DeserializationException e) {
+                log.error(e.getMessage());
+                fail();
+            }
+            assertThat(responseMd5RadiusPacket.getCode(), is(RADIUS.RADIUS_CODE_ACCESS_REQUEST));
 
-        checkRadiusPacketFromSupplicant(responseMd5RadiusPacket);
-        assertThat(responseMd5RadiusPacket.getCode(), is(RADIUS.RADIUS_CODE_ACCESS_REQUEST));
+            // State machine should be in pending state
 
-        // State machine should be in pending state
+            assertThat(stateMachine, notNullValue());
+            assertThat(stateMachine.state(), is(StateMachine.STATE_PENDING));
 
-        assertThat(stateMachine, notNullValue());
-        assertThat(stateMachine.state(), is(StateMachine.STATE_PENDING));
+            // (5) RADIUS Success
 
-        // (5) RADIUS Success
+            RADIUS successPacket =
+                    constructRadiusCodeAccessChallengePacket(RADIUS.RADIUS_CODE_ACCESS_ACCEPT, EAP.SUCCESS,
+                                                             responseMd5RadiusPacket.getIdentifier(),
+                                                             aaaManager.radiusSecret.getBytes());
+            aaaManager.handleRadiusPacket((successPacket));
+        });
 
-        RADIUS successPacket =
-                constructRadiusCodeAccessChallengePacket(RADIUS.RADIUS_CODE_ACCESS_ACCEPT, EAP.SUCCESS,
-                        responseMd5RadiusPacket.getIdentifier(), aaaManager.radiusSecret.getBytes());
-        aaaManager.handleRadiusPacket((successPacket));
-        Ethernet supplicantSuccessPacket = (Ethernet) fetchPacket(4);
+        assertAfter(ASSERTION_DELAY, ASSERTION_LENGTH, () -> {
+            Ethernet supplicantSuccessPacket = (Ethernet) fetchPacket(4);
 
-        checkRadiusPacket(aaaManager, supplicantSuccessPacket, EAP.SUCCESS);
+            checkRadiusPacket(aaaManager, supplicantSuccessPacket, EAP.SUCCESS);
 
-        // State machine should be in authorized state
+            // State machine should be in authorized state
 
-        assertThat(stateMachine, notNullValue());
-        assertThat(stateMachine.state(), is(StateMachine.STATE_AUTHORIZED));
+            assertThat(stateMachine, notNullValue());
+            assertThat(stateMachine.state(), is(StateMachine.STATE_AUTHORIZED));
 
-        // Supplicant trigger EAP Logoff
-        Ethernet logoffPacket = constructSupplicantLogoffPacket();
-        sendPacket(logoffPacket);
+            // Supplicant trigger EAP Logoff
+            Ethernet logoffPacket = constructSupplicantLogoffPacket();
+            sendPacket(logoffPacket);
+        });
+        assertAfter(ASSERTION_DELAY, ASSERTION_LENGTH, () -> {
+            // State machine should be in logoff state
+            assertThat(stateMachine, notNullValue());
+            assertThat(stateMachine.state(), is(StateMachine.STATE_IDLE));
 
-        // State machine should be in logoff state
-        assertThat(stateMachine, notNullValue());
-        assertThat(stateMachine.state(), is(StateMachine.STATE_IDLE));
-
-        //Check for increase in stats
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolLogoffRx(), ZERO);
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolResIdentityMsgTrans(), ZERO);
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolAuthSuccessTrans(), ZERO);
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolStartReqTrans(), ZERO);
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolTransRespNotNak(), ZERO);
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getEapPktTxauthChooseEap(), ZERO);
-        assertNotEquals(aaaStatisticsManager.getAaaStats().getAuthStateIdle(), ZERO);
-        // Counts the aaa Statistics count
-        countAaaStatistics();
+            //Check for increase in stats
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolLogoffRx(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolResIdentityMsgTrans(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolAuthSuccessTrans(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolStartReqTrans(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getEapolTransRespNotNak(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getEapPktTxauthChooseEap(), ZERO);
+            assertNotEquals(aaaStatisticsManager.getAaaStats().getAuthStateIdle(), ZERO);
+            // Counts the aaa Statistics count
+            countAaaStatistics();
+        });
 
     }
 
diff --git a/app/src/test/java/org/opencord/aaa/impl/AaaTestBase.java b/app/src/test/java/org/opencord/aaa/impl/AaaTestBase.java
index 30d2356..fdd23e9 100644
--- a/app/src/test/java/org/opencord/aaa/impl/AaaTestBase.java
+++ b/app/src/test/java/org/opencord/aaa/impl/AaaTestBase.java
@@ -73,6 +73,11 @@
  */
 public class AaaTestBase {
 
+    //Time in ms to wait before checking packets
+    static final int ASSERTION_DELAY = 250;
+    //Duration in ms of the assertion for packets
+    static final int ASSERTION_LENGTH = 500;
+
     MacAddress clientMac = MacAddress.valueOf("1a:1a:1a:1a:1a:1a");
     MacAddress serverMac = MacAddress.valueOf("2a:2a:2a:2a:2a:2a");