[VOL-3515] multithreaded packet processor

Change-Id: Icf89075447cb93a2c2d41756cbe285d8e55a1b5d
diff --git a/app/src/main/java/org/opencord/aaa/impl/AaaManager.java b/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
index 43d8021..82828fa 100644
--- a/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
+++ b/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
@@ -17,7 +17,11 @@
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Maps;
+
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.onlab.util.Tools.groupedThreads;
 import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
+import static org.opencord.aaa.impl.OsgiPropertyConstants.*;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import com.google.common.collect.Sets;
@@ -102,18 +106,12 @@
 import javax.crypto.spec.SecretKeySpec;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 
-import static org.opencord.aaa.impl.OsgiPropertyConstants.OPERATIONAL_STATUS_SERVER_EVENT_GENERATION;
-import static org.opencord.aaa.impl.OsgiPropertyConstants.OPERATIONAL_STATUS_SERVER_EVENT_GENERATION_DEFAULT;
-import static org.opencord.aaa.impl.OsgiPropertyConstants.OPERATIONAL_STATUS_SERVER_TIMEOUT;
-import static org.opencord.aaa.impl.OsgiPropertyConstants.OPERATIONAL_STATUS_SERVER_TIMEOUT_DEFAULT;
-import static org.opencord.aaa.impl.OsgiPropertyConstants.STATUS_SERVER_MODE;
-import static org.opencord.aaa.impl.OsgiPropertyConstants.STATUS_SERVER_MODE_DEFAULT;
-
 /**
  * AAA application for ONOS.
  */
@@ -121,12 +119,14 @@
         OPERATIONAL_STATUS_SERVER_EVENT_GENERATION + ":Integer=" + OPERATIONAL_STATUS_SERVER_EVENT_GENERATION_DEFAULT,
         OPERATIONAL_STATUS_SERVER_TIMEOUT + ":Integer=" + OPERATIONAL_STATUS_SERVER_TIMEOUT_DEFAULT,
         STATUS_SERVER_MODE + ":String=" + STATUS_SERVER_MODE_DEFAULT,
+        PACKET_PROCESSOR_THREADS + ":Integer=" + PACKET_PROCESSOR_THREADS_DEFAULT,
 })
 public class AaaManager
         extends AbstractListenerManager<AuthenticationEvent, AuthenticationEventListener>
         implements AuthenticationService {
 
     private static final String APP_NAME = "org.opencord.aaa";
+    private static final int STATE_MACHINE_THREADS = 3;
 
     private final Logger log = getLogger(getClass());
 
@@ -170,6 +170,10 @@
     private int operationalStatusEventGenerationPeriodInSeconds = OPERATIONAL_STATUS_SERVER_EVENT_GENERATION_DEFAULT;
     private int operationalStatusServerTimeoutInSeconds = OPERATIONAL_STATUS_SERVER_TIMEOUT_DEFAULT;
     protected String operationalStatusEvaluationMode = STATUS_SERVER_MODE_DEFAULT;
+    /**
+     * Number of threads used to process the packet.
+     */
+    protected int packetProcessorThreads = PACKET_PROCESSOR_THREADS_DEFAULT;
 
     private IdentifierManager idManager;
 
@@ -223,7 +227,6 @@
     AaaConfig newCfg;
 
     ScheduledFuture<?> scheduledStatusServerChecker;
-    ScheduledExecutorService executor;
     String configuredAaaServerAddress;
     HashSet<Byte> outPacketSet = new HashSet<>();
     HashSet<Byte> outPacketSupp = new HashSet<>();
@@ -247,6 +250,8 @@
 
     private StateMachineDelegate delegate = new InternalStateMachineDelegate();
 
+    protected ExecutorService packetProcessorExecutor;
+    protected ScheduledExecutorService serverStatusAndStateMachineTimeoutExecutor;
     /**
      * Builds an EAPOL packet based on the given parameters.
      *
@@ -283,6 +288,7 @@
 
     @Activate
     public void activate(ComponentContext context) {
+
         idManager = new IdentifierManager();
         stateMachines = Maps.newConcurrentMap();
         appId = coreService.registerApplication(APP_NAME);
@@ -320,10 +326,12 @@
         deviceService.addListener(deviceListener);
         getConfiguredAaaServerAddress();
         radiusOperationalStatusService.initialize(nasIpAddress.getAddress(), radiusSecret, impl);
-        executor = Executors.newScheduledThreadPool(3);
+        serverStatusAndStateMachineTimeoutExecutor = Executors.newScheduledThreadPool(STATE_MACHINE_THREADS,
+                           groupedThreads("onos/aaa", "aaa-machine-%d", log));
 
-        scheduledStatusServerChecker = executor.scheduleAtFixedRate(new ServerStatusChecker(), 0,
-            operationalStatusEventGenerationPeriodInSeconds, TimeUnit.SECONDS);
+        scheduledStatusServerChecker = serverStatusAndStateMachineTimeoutExecutor.scheduleAtFixedRate(
+                new ServerStatusChecker(), 0,
+                operationalStatusEventGenerationPeriodInSeconds, TimeUnit.SECONDS);
 
         log.info("Started");
     }
@@ -339,8 +347,8 @@
         deviceService.removeListener(deviceListener);
         eventDispatcher.removeSink(AuthenticationEvent.class);
         scheduledStatusServerChecker.cancel(true);
-        executor.shutdown();
-
+        serverStatusAndStateMachineTimeoutExecutor.shutdown();
+        packetProcessorExecutor.shutdown();
         authenticationsConsistentMap.removeListener(mapListener);
 
         log.info("Stopped");
@@ -371,6 +379,19 @@
         } else {
             properties.put("operationalStatusEvaluationMode", operationalStatusEvaluationMode);
         }
+
+        s = Tools.get(properties, PACKET_PROCESSOR_THREADS);
+        int oldpacketProcessorThreads = packetProcessorThreads;
+        packetProcessorThreads = Strings.isNullOrEmpty(s) ? oldpacketProcessorThreads
+                : Integer.parseInt(s.trim());
+        if (packetProcessorExecutor == null || oldpacketProcessorThreads != packetProcessorThreads) {
+            if (packetProcessorExecutor != null) {
+                packetProcessorExecutor.shutdown();
+            }
+            packetProcessorExecutor = newFixedThreadPool(packetProcessorThreads,
+                                                         groupedThreads("onos/aaa", "aaa-packet-%d", log));
+
+        }
     }
 
     protected void configureRadiusCommunication() {
@@ -468,141 +489,171 @@
      * Handles RADIUS packets.
      *
      * @param radiusPacket RADIUS packet coming from the RADIUS server.
-     * @throws DeserializationException if packet deserialization fails
      */
-    public void handleRadiusPacket(RADIUS radiusPacket) throws DeserializationException {
-        if (log.isTraceEnabled()) {
-            log.trace("Received RADIUS packet {}", radiusPacket);
-        }
-        if (radiusOperationalStatusService.isRadiusResponseForOperationalStatus(radiusPacket.getIdentifier())) {
-            radiusOperationalStatusService.handleRadiusPacketForOperationalStatus(radiusPacket);
-            return;
-        }
+    public void handleRadiusPacket(RADIUS radiusPacket) {
+        packetProcessorExecutor.execute(() -> {
+            if (log.isTraceEnabled()) {
+                log.trace("Received RADIUS packet {}", radiusPacket);
+            }
+            if (radiusOperationalStatusService.isRadiusResponseForOperationalStatus(radiusPacket.getIdentifier())) {
+                radiusOperationalStatusService.handleRadiusPacketForOperationalStatus(radiusPacket);
+                return;
+            }
 
-        RequestIdentifier identifier = RequestIdentifier.of(radiusPacket.getIdentifier());
-        String sessionId = idManager.getSessionId(identifier);
+            RequestIdentifier identifier = RequestIdentifier.of(radiusPacket.getIdentifier());
+            String sessionId = idManager.getSessionId(identifier);
 
-        if (sessionId == null) {
-            log.error("Invalid packet identifier {}, could not find corresponding "
-                    + "state machine ... exiting", radiusPacket.getIdentifier());
-            aaaStatisticsManager.getAaaStats().incrementNumberOfSessionsExpired();
-            aaaStatisticsManager.getAaaStats().countDroppedResponsesRx();
-            return;
-        }
+            if (sessionId == null) {
+                log.error("Invalid packet identifier {}, could not find corresponding "
+                                  + "state machine ... exiting", radiusPacket.getIdentifier());
+                aaaStatisticsManager.getAaaStats().incrementNumberOfSessionsExpired();
+                aaaStatisticsManager.getAaaStats().countDroppedResponsesRx();
+                return;
+            }
 
-        idManager.releaseIdentifier(identifier);
-        StateMachine stateMachine = stateMachines.get(sessionId);
-        if (stateMachine == null) {
-            log.error("Invalid packet identifier {}, could not find corresponding "
-                    + "state machine ... exiting", radiusPacket.getIdentifier());
-            aaaStatisticsManager.getAaaStats().incrementNumberOfSessionsExpired();
-            aaaStatisticsManager.getAaaStats().countDroppedResponsesRx();
-            return;
-        }
+            idManager.releaseIdentifier(identifier);
+            StateMachine stateMachine = stateMachines.get(sessionId);
+            if (stateMachine == null) {
+                log.error("Invalid packet identifier {}, could not find corresponding "
+                                  + "state machine ... exiting", radiusPacket.getIdentifier());
+                aaaStatisticsManager.getAaaStats().incrementNumberOfSessionsExpired();
+                aaaStatisticsManager.getAaaStats().countDroppedResponsesRx();
+                return;
+            }
 
-        //instance of StateMachine using the sessionId for updating machine stats
-        StateMachine machineStats = stateMachines.get(stateMachine.sessionId());
+            //instance of StateMachine using the sessionId for updating machine stats
+            StateMachine machineStats = stateMachines.get(stateMachine.sessionId());
 
-        EAP eapPayload;
-        Ethernet eth;
-        checkReceivedPacketForValidValidator(radiusPacket, stateMachine.requestAuthenticator());
+            EAP eapPayload;
+            Ethernet eth;
+            checkReceivedPacketForValidValidator(radiusPacket, stateMachine.requestAuthenticator());
 
-        //increasing packets and octets received from server
-        machineStats.incrementTotalPacketsReceived();
-        machineStats.incrementTotalOctetReceived(radiusPacket.decapsulateMessage().getLength());
-
-        if (outPacketSet.contains(radiusPacket.getIdentifier())) {
-            aaaStatisticsManager.getAaaStats().increaseOrDecreasePendingRequests(false);
-            outPacketSet.remove(new Byte(radiusPacket.getIdentifier()));
-        }
-        switch (radiusPacket.getCode()) {
-            case RADIUS.RADIUS_CODE_ACCESS_CHALLENGE:
-                log.debug("RADIUS packet: RADIUS_CODE_ACCESS_CHALLENGE");
-                RADIUSAttribute radiusAttrState = radiusPacket.getAttribute(RADIUSAttribute.RADIUS_ATTR_STATE);
-                byte[] challengeState = null;
-                if (radiusAttrState != null) {
-                    challengeState = radiusAttrState.getValue();
-                }
-                eapPayload = radiusPacket.decapsulateMessage();
-                stateMachine.setChallengeInfo(eapPayload.getIdentifier(), challengeState);
-                eth = buildEapolResponse(stateMachine.supplicantAddress(),
-                        MacAddress.valueOf(nasMacAddress),
-                        stateMachine.vlanId(),
-                        EAPOL.EAPOL_PACKET,
-                        eapPayload, stateMachine.priorityCode());
-                log.debug("Send EAP challenge response to supplicant {}", stateMachine.supplicantAddress().toString());
-                sendPacketToSupplicant(eth, stateMachine.supplicantConnectpoint(), true);
-                aaaStatisticsManager.getAaaStats().increaseChallengeResponsesRx();
-                outPacketSupp.add(eapPayload.getIdentifier());
-                aaaStatisticsManager.getAaaStats().incrementPendingResSupp();
-                //increasing packets send to server
-                machineStats.incrementTotalPacketsSent();
-                machineStats.incrementTotalOctetSent(eapPayload.getLength());
-                break;
-            case RADIUS.RADIUS_CODE_ACCESS_ACCEPT:
-                log.debug("RADIUS packet: RADIUS_CODE_ACCESS_ACCEPT");
-                //send an EAPOL - Success to the supplicant.
-                byte[] eapMessageSuccess =
-                        radiusPacket.getAttribute(RADIUSAttribute.RADIUS_ATTR_EAP_MESSAGE).getValue();
-                eapPayload = EAP.deserializer().deserialize(
-                        eapMessageSuccess, 0, eapMessageSuccess.length);
-                eth = buildEapolResponse(stateMachine.supplicantAddress(),
-                        MacAddress.valueOf(nasMacAddress),
-                        stateMachine.vlanId(),
-                        EAPOL.EAPOL_PACKET,
-                        eapPayload, stateMachine.priorityCode());
-                log.info("Send EAP success message to supplicant {}", stateMachine.supplicantAddress().toString());
-                sendPacketToSupplicant(eth, stateMachine.supplicantConnectpoint(), false);
-                aaaStatisticsManager.getAaaStats().incrementEapolAuthSuccessTrans();
-
-                stateMachine.authorizeAccess();
-                aaaStatisticsManager.getAaaStats().increaseAcceptResponsesRx();
-                //increasing packets send to server
-                machineStats.incrementTotalPacketsSent();
-                machineStats.incrementTotalOctetSent(eapPayload.getLength());
-                break;
-            case RADIUS.RADIUS_CODE_ACCESS_REJECT:
-                log.debug("RADIUS packet: RADIUS_CODE_ACCESS_REJECT");
-                //send an EAPOL - Failure to the supplicant.
-                byte[] eapMessageFailure;
-                eapPayload = new EAP();
-                RADIUSAttribute radiusAttrEap = radiusPacket.getAttribute(RADIUSAttribute.RADIUS_ATTR_EAP_MESSAGE);
-                if (radiusAttrEap == null) {
-                    eapPayload.setCode(EAP.FAILURE);
-                    eapPayload.setIdentifier(stateMachine.challengeIdentifier());
-                    eapPayload.setLength(EAP.EAP_HDR_LEN_SUC_FAIL);
-                } else {
-                    eapMessageFailure = radiusAttrEap.getValue();
-                    eapPayload = EAP.deserializer().deserialize(
-                            eapMessageFailure, 0, eapMessageFailure.length);
-                }
-                eth = buildEapolResponse(stateMachine.supplicantAddress(),
-                        MacAddress.valueOf(nasMacAddress),
-                        stateMachine.vlanId(),
-                        EAPOL.EAPOL_PACKET,
-                        eapPayload, stateMachine.priorityCode());
-                log.warn("Send EAP failure message to supplicant {}", stateMachine.supplicantAddress().toString());
-                sendPacketToSupplicant(eth, stateMachine.supplicantConnectpoint(), false);
-                aaaStatisticsManager.getAaaStats().incrementEapolauthFailureTrans();
-
-                stateMachine.denyAccess();
-                aaaStatisticsManager.getAaaStats().increaseRejectResponsesRx();
-                //increasing packets send to server
-                machineStats.incrementTotalPacketsSent();
-                machineStats.incrementTotalOctetSent(eapPayload.getLength());
-                //pushing machine stats to kafka
-                AaaSupplicantMachineStats machineObj = aaaSupplicantStatsManager.getSupplicantStats(machineStats);
-                aaaSupplicantStatsManager.getMachineStatsDelegate()
-                        .notify(new AaaMachineStatisticsEvent(AaaMachineStatisticsEvent.Type.STATS_UPDATE, machineObj));
-                break;
-            default:
-                log.warn("Unknown RADIUS message received with code: {}", radiusPacket.getCode());
-                aaaStatisticsManager.getAaaStats().increaseUnknownTypeRx();
-                //increasing packets received to server
-                machineStats.incrementTotalPacketsReceived();
+            //increasing packets and octets received from server
+            machineStats.incrementTotalPacketsReceived();
+            try {
                 machineStats.incrementTotalOctetReceived(radiusPacket.decapsulateMessage().getLength());
-        }
-        aaaStatisticsManager.getAaaStats().countDroppedResponsesRx();
+            } catch (DeserializationException e) {
+                log.error("Cannot decapsulate EAP message from RADIUS packet", e);
+                return;
+            }
+
+            // remove() returns true only if the identifier was pending: test and clear in a single call
+            if (outPacketSet.remove(Byte.valueOf(radiusPacket.getIdentifier()))) {
+                aaaStatisticsManager.getAaaStats().increaseOrDecreasePendingRequests(false);
+            }
+
+            switch (radiusPacket.getCode()) {
+                case RADIUS.RADIUS_CODE_ACCESS_CHALLENGE:
+                    log.debug("RADIUS packet: RADIUS_CODE_ACCESS_CHALLENGE");
+                    RADIUSAttribute radiusAttrState = radiusPacket.getAttribute(RADIUSAttribute.RADIUS_ATTR_STATE);
+                    byte[] challengeState = null;
+                    if (radiusAttrState != null) {
+                        challengeState = radiusAttrState.getValue();
+                    }
+                    try {
+                        eapPayload = radiusPacket.decapsulateMessage();
+                        eth = buildEapolResponse(stateMachine.supplicantAddress(),
+                                                 MacAddress.valueOf(nasMacAddress),
+                                                 stateMachine.vlanId(),
+                                                 EAPOL.EAPOL_PACKET,
+                                                 eapPayload, stateMachine.priorityCode());
+                        stateMachine.setChallengeInfo(eapPayload.getIdentifier(), challengeState);
+                    } catch (DeserializationException e) {
+                        log.error("Cannot decapsulate EAP message from RADIUS Access-Challenge", e);
+                        break;
+                    }
+                    log.debug("Send EAP challenge response to supplicant {}",
+                              stateMachine.supplicantAddress().toString());
+                    sendPacketToSupplicant(eth, stateMachine.supplicantConnectpoint(), true);
+                    aaaStatisticsManager.getAaaStats().increaseChallengeResponsesRx();
+                    outPacketSupp.add(eapPayload.getIdentifier());
+                    aaaStatisticsManager.getAaaStats().incrementPendingResSupp();
+                    //increasing packets send to server
+                    machineStats.incrementTotalPacketsSent();
+                    machineStats.incrementTotalOctetSent(eapPayload.getLength());
+                    break;
+                case RADIUS.RADIUS_CODE_ACCESS_ACCEPT:
+                    log.debug("RADIUS packet: RADIUS_CODE_ACCESS_ACCEPT");
+                    //send an EAPOL - Success to the supplicant.
+                    byte[] eapMessageSuccess =
+                            radiusPacket.getAttribute(RADIUSAttribute.RADIUS_ATTR_EAP_MESSAGE).getValue();
+                    try {
+                        eapPayload = EAP.deserializer().deserialize(
+                                eapMessageSuccess, 0, eapMessageSuccess.length);
+                    } catch (DeserializationException e) {
+                        log.error("Cannot deserialize EAP message from RADIUS Access-Accept", e);
+                        break;
+                    }
+
+                    eth = buildEapolResponse(stateMachine.supplicantAddress(),
+                                             MacAddress.valueOf(nasMacAddress),
+                                             stateMachine.vlanId(),
+                                             EAPOL.EAPOL_PACKET,
+                                             eapPayload, stateMachine.priorityCode());
+                    log.info("Send EAP success message to supplicant {}", stateMachine.supplicantAddress().toString());
+                    sendPacketToSupplicant(eth, stateMachine.supplicantConnectpoint(), false);
+                    aaaStatisticsManager.getAaaStats().incrementEapolAuthSuccessTrans();
+
+                    stateMachine.authorizeAccess();
+                    aaaStatisticsManager.getAaaStats().increaseAcceptResponsesRx();
+                    //increasing packets send to server
+                    machineStats.incrementTotalPacketsSent();
+                    machineStats.incrementTotalOctetSent(eapPayload.getLength());
+                    break;
+                case RADIUS.RADIUS_CODE_ACCESS_REJECT:
+                    log.debug("RADIUS packet: RADIUS_CODE_ACCESS_REJECT");
+                    //send an EAPOL - Failure to the supplicant.
+                    byte[] eapMessageFailure;
+                    eapPayload = new EAP();
+                    RADIUSAttribute radiusAttrEap = radiusPacket.getAttribute(RADIUSAttribute.RADIUS_ATTR_EAP_MESSAGE);
+                    if (radiusAttrEap == null) {
+                        eapPayload.setCode(EAP.FAILURE);
+                        eapPayload.setIdentifier(stateMachine.challengeIdentifier());
+                        eapPayload.setLength(EAP.EAP_HDR_LEN_SUC_FAIL);
+                    } else {
+                        eapMessageFailure = radiusAttrEap.getValue();
+                        try {
+                            eapPayload = EAP.deserializer().deserialize(
+                                    eapMessageFailure, 0, eapMessageFailure.length);
+                        } catch (DeserializationException e) {
+                            log.error("Cannot deserialize EAP message from RADIUS Access-Reject", e);
+                            break;
+                        }
+                    }
+                    eth = buildEapolResponse(stateMachine.supplicantAddress(),
+                                             MacAddress.valueOf(nasMacAddress),
+                                             stateMachine.vlanId(),
+                                             EAPOL.EAPOL_PACKET,
+                                             eapPayload, stateMachine.priorityCode());
+                    log.warn("Send EAP failure message to supplicant {}", stateMachine.supplicantAddress().toString());
+                    sendPacketToSupplicant(eth, stateMachine.supplicantConnectpoint(), false);
+                    aaaStatisticsManager.getAaaStats().incrementEapolauthFailureTrans();
+
+                    stateMachine.denyAccess();
+                    aaaStatisticsManager.getAaaStats().increaseRejectResponsesRx();
+                    //increasing packets send to server
+                    machineStats.incrementTotalPacketsSent();
+                    machineStats.incrementTotalOctetSent(eapPayload.getLength());
+                    //pushing machine stats to kafka
+                    AaaSupplicantMachineStats machineObj = aaaSupplicantStatsManager.getSupplicantStats(machineStats);
+                    aaaSupplicantStatsManager.getMachineStatsDelegate()
+                            .notify(new AaaMachineStatisticsEvent(AaaMachineStatisticsEvent.Type.STATS_UPDATE,
+                                                                  machineObj));
+                    break;
+                default:
+                    log.warn("Unknown RADIUS message received with code: {}", radiusPacket.getCode());
+                    aaaStatisticsManager.getAaaStats().increaseUnknownTypeRx();
+                    //increasing packets received to server
+                    machineStats.incrementTotalPacketsReceived();
+                    try {
+                        machineStats.incrementTotalOctetReceived(radiusPacket.decapsulateMessage().getLength());
+                    } catch (DeserializationException e) {
+                        log.error("Cannot decapsulate EAP message from unknown RADIUS packet", e);
+                        break;
+                    }
+            }
+            aaaStatisticsManager.getAaaStats().countDroppedResponsesRx();
+        });
     }
 
     /**
@@ -752,7 +803,8 @@
                 aaaStatisticsManager.getAaaStats().incrementValidEapolFramesRx();
             }
 
-            StateMachine stateMachine = stateMachines.computeIfAbsent(sessionId, id -> new StateMachine(id, executor));
+            StateMachine stateMachine = stateMachines.computeIfAbsent(sessionId, id ->
+                    new StateMachine(id, serverStatusAndStateMachineTimeoutExecutor));
             stateMachine.setEapolTypeVal(eapol.getEapolType());
 
             switch (eapol.getEapolType()) {