[VOL-3515] multithreaded packet processor
Change-Id: Icf89075447cb93a2c2d41756cbe285d8e55a1b5d
diff --git a/app/src/main/java/org/opencord/aaa/impl/AaaManager.java b/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
index 43d8021..82828fa 100644
--- a/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
+++ b/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
@@ -17,7 +17,11 @@
import com.google.common.base.Strings;
import com.google.common.collect.Maps;
+
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.onlab.util.Tools.groupedThreads;
import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
+import static org.opencord.aaa.impl.OsgiPropertyConstants.*;
import static org.slf4j.LoggerFactory.getLogger;
import com.google.common.collect.Sets;
@@ -102,18 +106,12 @@
import javax.crypto.spec.SecretKeySpec;
import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import static org.opencord.aaa.impl.OsgiPropertyConstants.OPERATIONAL_STATUS_SERVER_EVENT_GENERATION;
-import static org.opencord.aaa.impl.OsgiPropertyConstants.OPERATIONAL_STATUS_SERVER_EVENT_GENERATION_DEFAULT;
-import static org.opencord.aaa.impl.OsgiPropertyConstants.OPERATIONAL_STATUS_SERVER_TIMEOUT;
-import static org.opencord.aaa.impl.OsgiPropertyConstants.OPERATIONAL_STATUS_SERVER_TIMEOUT_DEFAULT;
-import static org.opencord.aaa.impl.OsgiPropertyConstants.STATUS_SERVER_MODE;
-import static org.opencord.aaa.impl.OsgiPropertyConstants.STATUS_SERVER_MODE_DEFAULT;
-
/**
* AAA application for ONOS.
*/
@@ -121,12 +119,14 @@
OPERATIONAL_STATUS_SERVER_EVENT_GENERATION + ":Integer=" + OPERATIONAL_STATUS_SERVER_EVENT_GENERATION_DEFAULT,
OPERATIONAL_STATUS_SERVER_TIMEOUT + ":Integer=" + OPERATIONAL_STATUS_SERVER_TIMEOUT_DEFAULT,
STATUS_SERVER_MODE + ":String=" + STATUS_SERVER_MODE_DEFAULT,
+ PACKET_PROCESSOR_THREADS + ":Integer=" + PACKET_PROCESSOR_THREADS_DEFAULT,
})
public class AaaManager
extends AbstractListenerManager<AuthenticationEvent, AuthenticationEventListener>
implements AuthenticationService {
private static final String APP_NAME = "org.opencord.aaa";
+ private static final int STATE_MACHINE_THREADS = 3;
private final Logger log = getLogger(getClass());
@@ -170,6 +170,10 @@
private int operationalStatusEventGenerationPeriodInSeconds = OPERATIONAL_STATUS_SERVER_EVENT_GENERATION_DEFAULT;
private int operationalStatusServerTimeoutInSeconds = OPERATIONAL_STATUS_SERVER_TIMEOUT_DEFAULT;
protected String operationalStatusEvaluationMode = STATUS_SERVER_MODE_DEFAULT;
+ /**
+ * Number of threads used to process the packet.
+ */
+ protected int packetProcessorThreads = PACKET_PROCESSOR_THREADS_DEFAULT;
private IdentifierManager idManager;
@@ -223,7 +227,6 @@
AaaConfig newCfg;
ScheduledFuture<?> scheduledStatusServerChecker;
- ScheduledExecutorService executor;
String configuredAaaServerAddress;
HashSet<Byte> outPacketSet = new HashSet<>();
HashSet<Byte> outPacketSupp = new HashSet<>();
@@ -247,6 +250,8 @@
private StateMachineDelegate delegate = new InternalStateMachineDelegate();
+ protected ExecutorService packetProcessorExecutor;
+ protected ScheduledExecutorService serverStatusAndStateMachineTimeoutExecutor;
/**
* Builds an EAPOL packet based on the given parameters.
*
@@ -283,6 +288,7 @@
@Activate
public void activate(ComponentContext context) {
+
idManager = new IdentifierManager();
stateMachines = Maps.newConcurrentMap();
appId = coreService.registerApplication(APP_NAME);
@@ -320,10 +326,12 @@
deviceService.addListener(deviceListener);
getConfiguredAaaServerAddress();
radiusOperationalStatusService.initialize(nasIpAddress.getAddress(), radiusSecret, impl);
- executor = Executors.newScheduledThreadPool(3);
+ serverStatusAndStateMachineTimeoutExecutor = Executors.newScheduledThreadPool(STATE_MACHINE_THREADS,
+ groupedThreads("onos/aaa", "aaa-machine-%d", log));
- scheduledStatusServerChecker = executor.scheduleAtFixedRate(new ServerStatusChecker(), 0,
- operationalStatusEventGenerationPeriodInSeconds, TimeUnit.SECONDS);
+ scheduledStatusServerChecker = serverStatusAndStateMachineTimeoutExecutor.scheduleAtFixedRate(
+ new ServerStatusChecker(), 0,
+ operationalStatusEventGenerationPeriodInSeconds, TimeUnit.SECONDS);
log.info("Started");
}
@@ -339,8 +347,8 @@
deviceService.removeListener(deviceListener);
eventDispatcher.removeSink(AuthenticationEvent.class);
scheduledStatusServerChecker.cancel(true);
- executor.shutdown();
-
+ serverStatusAndStateMachineTimeoutExecutor.shutdown();
+ packetProcessorExecutor.shutdown();
authenticationsConsistentMap.removeListener(mapListener);
log.info("Stopped");
@@ -371,6 +379,19 @@
} else {
properties.put("operationalStatusEvaluationMode", operationalStatusEvaluationMode);
}
+
+ s = Tools.get(properties, PACKET_PROCESSOR_THREADS);
+ int oldpacketProcessorThreads = packetProcessorThreads;
+ packetProcessorThreads = Strings.isNullOrEmpty(s) ? oldpacketProcessorThreads
+ : Integer.parseInt(s.trim());
+ if (packetProcessorExecutor == null || oldpacketProcessorThreads != packetProcessorThreads) {
+ if (packetProcessorExecutor != null) {
+ packetProcessorExecutor.shutdown();
+ }
+ packetProcessorExecutor = newFixedThreadPool(packetProcessorThreads,
+ groupedThreads("onos/aaa", "aaa-packet-%d", log));
+
+ }
}
protected void configureRadiusCommunication() {
@@ -468,141 +489,171 @@
* Handles RADIUS packets.
*
* @param radiusPacket RADIUS packet coming from the RADIUS server.
- * @throws DeserializationException if packet deserialization fails
*/
- public void handleRadiusPacket(RADIUS radiusPacket) throws DeserializationException {
- if (log.isTraceEnabled()) {
- log.trace("Received RADIUS packet {}", radiusPacket);
- }
- if (radiusOperationalStatusService.isRadiusResponseForOperationalStatus(radiusPacket.getIdentifier())) {
- radiusOperationalStatusService.handleRadiusPacketForOperationalStatus(radiusPacket);
- return;
- }
+ public void handleRadiusPacket(RADIUS radiusPacket) {
+ packetProcessorExecutor.execute(() -> {
+ if (log.isTraceEnabled()) {
+ log.trace("Received RADIUS packet {}", radiusPacket);
+ }
+ if (radiusOperationalStatusService.isRadiusResponseForOperationalStatus(radiusPacket.getIdentifier())) {
+ radiusOperationalStatusService.handleRadiusPacketForOperationalStatus(radiusPacket);
+ return;
+ }
- RequestIdentifier identifier = RequestIdentifier.of(radiusPacket.getIdentifier());
- String sessionId = idManager.getSessionId(identifier);
+ RequestIdentifier identifier = RequestIdentifier.of(radiusPacket.getIdentifier());
+ String sessionId = idManager.getSessionId(identifier);
- if (sessionId == null) {
- log.error("Invalid packet identifier {}, could not find corresponding "
- + "state machine ... exiting", radiusPacket.getIdentifier());
- aaaStatisticsManager.getAaaStats().incrementNumberOfSessionsExpired();
- aaaStatisticsManager.getAaaStats().countDroppedResponsesRx();
- return;
- }
+ if (sessionId == null) {
+ log.error("Invalid packet identifier {}, could not find corresponding "
+ + "state machine ... exiting", radiusPacket.getIdentifier());
+ aaaStatisticsManager.getAaaStats().incrementNumberOfSessionsExpired();
+ aaaStatisticsManager.getAaaStats().countDroppedResponsesRx();
+ return;
+ }
- idManager.releaseIdentifier(identifier);
- StateMachine stateMachine = stateMachines.get(sessionId);
- if (stateMachine == null) {
- log.error("Invalid packet identifier {}, could not find corresponding "
- + "state machine ... exiting", radiusPacket.getIdentifier());
- aaaStatisticsManager.getAaaStats().incrementNumberOfSessionsExpired();
- aaaStatisticsManager.getAaaStats().countDroppedResponsesRx();
- return;
- }
+ idManager.releaseIdentifier(identifier);
+ StateMachine stateMachine = stateMachines.get(sessionId);
+ if (stateMachine == null) {
+ log.error("Invalid packet identifier {}, could not find corresponding "
+ + "state machine ... exiting", radiusPacket.getIdentifier());
+ aaaStatisticsManager.getAaaStats().incrementNumberOfSessionsExpired();
+ aaaStatisticsManager.getAaaStats().countDroppedResponsesRx();
+ return;
+ }
- //instance of StateMachine using the sessionId for updating machine stats
- StateMachine machineStats = stateMachines.get(stateMachine.sessionId());
+ //instance of StateMachine using the sessionId for updating machine stats
+ StateMachine machineStats = stateMachines.get(stateMachine.sessionId());
- EAP eapPayload;
- Ethernet eth;
- checkReceivedPacketForValidValidator(radiusPacket, stateMachine.requestAuthenticator());
+ EAP eapPayload;
+ Ethernet eth;
+ checkReceivedPacketForValidValidator(radiusPacket, stateMachine.requestAuthenticator());
- //increasing packets and octets received from server
- machineStats.incrementTotalPacketsReceived();
- machineStats.incrementTotalOctetReceived(radiusPacket.decapsulateMessage().getLength());
-
- if (outPacketSet.contains(radiusPacket.getIdentifier())) {
- aaaStatisticsManager.getAaaStats().increaseOrDecreasePendingRequests(false);
- outPacketSet.remove(new Byte(radiusPacket.getIdentifier()));
- }
- switch (radiusPacket.getCode()) {
- case RADIUS.RADIUS_CODE_ACCESS_CHALLENGE:
- log.debug("RADIUS packet: RADIUS_CODE_ACCESS_CHALLENGE");
- RADIUSAttribute radiusAttrState = radiusPacket.getAttribute(RADIUSAttribute.RADIUS_ATTR_STATE);
- byte[] challengeState = null;
- if (radiusAttrState != null) {
- challengeState = radiusAttrState.getValue();
- }
- eapPayload = radiusPacket.decapsulateMessage();
- stateMachine.setChallengeInfo(eapPayload.getIdentifier(), challengeState);
- eth = buildEapolResponse(stateMachine.supplicantAddress(),
- MacAddress.valueOf(nasMacAddress),
- stateMachine.vlanId(),
- EAPOL.EAPOL_PACKET,
- eapPayload, stateMachine.priorityCode());
- log.debug("Send EAP challenge response to supplicant {}", stateMachine.supplicantAddress().toString());
- sendPacketToSupplicant(eth, stateMachine.supplicantConnectpoint(), true);
- aaaStatisticsManager.getAaaStats().increaseChallengeResponsesRx();
- outPacketSupp.add(eapPayload.getIdentifier());
- aaaStatisticsManager.getAaaStats().incrementPendingResSupp();
- //increasing packets send to server
- machineStats.incrementTotalPacketsSent();
- machineStats.incrementTotalOctetSent(eapPayload.getLength());
- break;
- case RADIUS.RADIUS_CODE_ACCESS_ACCEPT:
- log.debug("RADIUS packet: RADIUS_CODE_ACCESS_ACCEPT");
- //send an EAPOL - Success to the supplicant.
- byte[] eapMessageSuccess =
- radiusPacket.getAttribute(RADIUSAttribute.RADIUS_ATTR_EAP_MESSAGE).getValue();
- eapPayload = EAP.deserializer().deserialize(
- eapMessageSuccess, 0, eapMessageSuccess.length);
- eth = buildEapolResponse(stateMachine.supplicantAddress(),
- MacAddress.valueOf(nasMacAddress),
- stateMachine.vlanId(),
- EAPOL.EAPOL_PACKET,
- eapPayload, stateMachine.priorityCode());
- log.info("Send EAP success message to supplicant {}", stateMachine.supplicantAddress().toString());
- sendPacketToSupplicant(eth, stateMachine.supplicantConnectpoint(), false);
- aaaStatisticsManager.getAaaStats().incrementEapolAuthSuccessTrans();
-
- stateMachine.authorizeAccess();
- aaaStatisticsManager.getAaaStats().increaseAcceptResponsesRx();
- //increasing packets send to server
- machineStats.incrementTotalPacketsSent();
- machineStats.incrementTotalOctetSent(eapPayload.getLength());
- break;
- case RADIUS.RADIUS_CODE_ACCESS_REJECT:
- log.debug("RADIUS packet: RADIUS_CODE_ACCESS_REJECT");
- //send an EAPOL - Failure to the supplicant.
- byte[] eapMessageFailure;
- eapPayload = new EAP();
- RADIUSAttribute radiusAttrEap = radiusPacket.getAttribute(RADIUSAttribute.RADIUS_ATTR_EAP_MESSAGE);
- if (radiusAttrEap == null) {
- eapPayload.setCode(EAP.FAILURE);
- eapPayload.setIdentifier(stateMachine.challengeIdentifier());
- eapPayload.setLength(EAP.EAP_HDR_LEN_SUC_FAIL);
- } else {
- eapMessageFailure = radiusAttrEap.getValue();
- eapPayload = EAP.deserializer().deserialize(
- eapMessageFailure, 0, eapMessageFailure.length);
- }
- eth = buildEapolResponse(stateMachine.supplicantAddress(),
- MacAddress.valueOf(nasMacAddress),
- stateMachine.vlanId(),
- EAPOL.EAPOL_PACKET,
- eapPayload, stateMachine.priorityCode());
- log.warn("Send EAP failure message to supplicant {}", stateMachine.supplicantAddress().toString());
- sendPacketToSupplicant(eth, stateMachine.supplicantConnectpoint(), false);
- aaaStatisticsManager.getAaaStats().incrementEapolauthFailureTrans();
-
- stateMachine.denyAccess();
- aaaStatisticsManager.getAaaStats().increaseRejectResponsesRx();
- //increasing packets send to server
- machineStats.incrementTotalPacketsSent();
- machineStats.incrementTotalOctetSent(eapPayload.getLength());
- //pushing machine stats to kafka
- AaaSupplicantMachineStats machineObj = aaaSupplicantStatsManager.getSupplicantStats(machineStats);
- aaaSupplicantStatsManager.getMachineStatsDelegate()
- .notify(new AaaMachineStatisticsEvent(AaaMachineStatisticsEvent.Type.STATS_UPDATE, machineObj));
- break;
- default:
- log.warn("Unknown RADIUS message received with code: {}", radiusPacket.getCode());
- aaaStatisticsManager.getAaaStats().increaseUnknownTypeRx();
- //increasing packets received to server
- machineStats.incrementTotalPacketsReceived();
+ //increasing packets and octets received from server
+ machineStats.incrementTotalPacketsReceived();
+ try {
machineStats.incrementTotalOctetReceived(radiusPacket.decapsulateMessage().getLength());
- }
- aaaStatisticsManager.getAaaStats().countDroppedResponsesRx();
+ } catch (DeserializationException e) {
+ log.error(e.getMessage());
+ return;
+ }
+
+ if (outPacketSet.contains(radiusPacket.getIdentifier())) {
+ aaaStatisticsManager.getAaaStats().increaseOrDecreasePendingRequests(false);
+ outPacketSet.remove(new Byte(radiusPacket.getIdentifier()));
+ }
+
+ switch (radiusPacket.getCode()) {
+ case RADIUS.RADIUS_CODE_ACCESS_CHALLENGE:
+ log.debug("RADIUS packet: RADIUS_CODE_ACCESS_CHALLENGE");
+ RADIUSAttribute radiusAttrState = radiusPacket.getAttribute(RADIUSAttribute.RADIUS_ATTR_STATE);
+ byte[] challengeState = null;
+ if (radiusAttrState != null) {
+ challengeState = radiusAttrState.getValue();
+ }
+ try {
+ eapPayload = radiusPacket.decapsulateMessage();
+ eth = buildEapolResponse(stateMachine.supplicantAddress(),
+ MacAddress.valueOf(nasMacAddress),
+ stateMachine.vlanId(),
+ EAPOL.EAPOL_PACKET,
+ eapPayload, stateMachine.priorityCode());
+ stateMachine.setChallengeInfo(eapPayload.getIdentifier(), challengeState);
+ } catch (DeserializationException e) {
+ log.error(e.getMessage());
+ break;
+ }
+ log.debug("Send EAP challenge response to supplicant {}",
+ stateMachine.supplicantAddress().toString());
+ sendPacketToSupplicant(eth, stateMachine.supplicantConnectpoint(), true);
+ aaaStatisticsManager.getAaaStats().increaseChallengeResponsesRx();
+ outPacketSupp.add(eapPayload.getIdentifier());
+ aaaStatisticsManager.getAaaStats().incrementPendingResSupp();
+ //increasing packets send to server
+ machineStats.incrementTotalPacketsSent();
+ machineStats.incrementTotalOctetSent(eapPayload.getLength());
+ break;
+ case RADIUS.RADIUS_CODE_ACCESS_ACCEPT:
+ log.debug("RADIUS packet: RADIUS_CODE_ACCESS_ACCEPT");
+ //send an EAPOL - Success to the supplicant.
+ byte[] eapMessageSuccess =
+ radiusPacket.getAttribute(RADIUSAttribute.RADIUS_ATTR_EAP_MESSAGE).getValue();
+ try {
+ eapPayload = EAP.deserializer().deserialize(
+ eapMessageSuccess, 0, eapMessageSuccess.length);
+ } catch (DeserializationException e) {
+ log.error(e.getMessage());
+ break;
+ }
+
+ eth = buildEapolResponse(stateMachine.supplicantAddress(),
+ MacAddress.valueOf(nasMacAddress),
+ stateMachine.vlanId(),
+ EAPOL.EAPOL_PACKET,
+ eapPayload, stateMachine.priorityCode());
+ log.info("Send EAP success message to supplicant {}", stateMachine.supplicantAddress().toString());
+ sendPacketToSupplicant(eth, stateMachine.supplicantConnectpoint(), false);
+ aaaStatisticsManager.getAaaStats().incrementEapolAuthSuccessTrans();
+
+ stateMachine.authorizeAccess();
+ aaaStatisticsManager.getAaaStats().increaseAcceptResponsesRx();
+ //increasing packets send to server
+ machineStats.incrementTotalPacketsSent();
+ machineStats.incrementTotalOctetSent(eapPayload.getLength());
+ break;
+ case RADIUS.RADIUS_CODE_ACCESS_REJECT:
+ log.debug("RADIUS packet: RADIUS_CODE_ACCESS_REJECT");
+ //send an EAPOL - Failure to the supplicant.
+ byte[] eapMessageFailure;
+ eapPayload = new EAP();
+ RADIUSAttribute radiusAttrEap = radiusPacket.getAttribute(RADIUSAttribute.RADIUS_ATTR_EAP_MESSAGE);
+ if (radiusAttrEap == null) {
+ eapPayload.setCode(EAP.FAILURE);
+ eapPayload.setIdentifier(stateMachine.challengeIdentifier());
+ eapPayload.setLength(EAP.EAP_HDR_LEN_SUC_FAIL);
+ } else {
+ eapMessageFailure = radiusAttrEap.getValue();
+ try {
+ eapPayload = EAP.deserializer().deserialize(
+ eapMessageFailure, 0, eapMessageFailure.length);
+ } catch (DeserializationException e) {
+ log.error(e.getMessage());
+ break;
+ }
+ }
+ eth = buildEapolResponse(stateMachine.supplicantAddress(),
+ MacAddress.valueOf(nasMacAddress),
+ stateMachine.vlanId(),
+ EAPOL.EAPOL_PACKET,
+ eapPayload, stateMachine.priorityCode());
+ log.warn("Send EAP failure message to supplicant {}", stateMachine.supplicantAddress().toString());
+ sendPacketToSupplicant(eth, stateMachine.supplicantConnectpoint(), false);
+ aaaStatisticsManager.getAaaStats().incrementEapolauthFailureTrans();
+
+ stateMachine.denyAccess();
+ aaaStatisticsManager.getAaaStats().increaseRejectResponsesRx();
+ //increasing packets send to server
+ machineStats.incrementTotalPacketsSent();
+ machineStats.incrementTotalOctetSent(eapPayload.getLength());
+ //pushing machine stats to kafka
+ AaaSupplicantMachineStats machineObj = aaaSupplicantStatsManager.getSupplicantStats(machineStats);
+ aaaSupplicantStatsManager.getMachineStatsDelegate()
+ .notify(new AaaMachineStatisticsEvent(AaaMachineStatisticsEvent.Type.STATS_UPDATE,
+ machineObj));
+ break;
+ default:
+ log.warn("Unknown RADIUS message received with code: {}", radiusPacket.getCode());
+ aaaStatisticsManager.getAaaStats().increaseUnknownTypeRx();
+ //increasing packets received to server
+ machineStats.incrementTotalPacketsReceived();
+ try {
+ machineStats.incrementTotalOctetReceived(radiusPacket.decapsulateMessage().getLength());
+ } catch (DeserializationException e) {
+ log.error(e.getMessage());
+ break;
+ }
+ }
+ aaaStatisticsManager.getAaaStats().countDroppedResponsesRx();
+ });
}
/**
@@ -752,7 +803,8 @@
aaaStatisticsManager.getAaaStats().incrementValidEapolFramesRx();
}
- StateMachine stateMachine = stateMachines.computeIfAbsent(sessionId, id -> new StateMachine(id, executor));
+ StateMachine stateMachine = stateMachines.computeIfAbsent(sessionId, id ->
+ new StateMachine(id, serverStatusAndStateMachineTimeoutExecutor));
stateMachine.setEapolTypeVal(eapol.getEapolType());
switch (eapol.getEapolType()) {
diff --git a/app/src/main/java/org/opencord/aaa/impl/OsgiPropertyConstants.java b/app/src/main/java/org/opencord/aaa/impl/OsgiPropertyConstants.java
index 34a2cc3..71826c0 100644
--- a/app/src/main/java/org/opencord/aaa/impl/OsgiPropertyConstants.java
+++ b/app/src/main/java/org/opencord/aaa/impl/OsgiPropertyConstants.java
@@ -39,4 +39,7 @@
public static final String STATUS_SERVER_MODE = "operationalStatusEvaluationMode";
public static final String STATUS_SERVER_MODE_DEFAULT = "AUTO";
+
+ public static final String PACKET_PROCESSOR_THREADS = "packetProcessorThreads";
+ public static final int PACKET_PROCESSOR_THREADS_DEFAULT = 10;
}
diff --git a/app/src/main/java/org/opencord/aaa/impl/StateMachine.java b/app/src/main/java/org/opencord/aaa/impl/StateMachine.java
index 7299818..1b5613c 100644
--- a/app/src/main/java/org/opencord/aaa/impl/StateMachine.java
+++ b/app/src/main/java/org/opencord/aaa/impl/StateMachine.java
@@ -121,7 +121,7 @@
private State[] states = {new Idle(), new Started(), new Pending(), new Authorized(), new Unauthorized() };
// Cleanup Timer instance created for this session
- private ScheduledExecutorService executor;
+ private ScheduledExecutorService timeoutExecutor;
private java.util.concurrent.ScheduledFuture<?> cleanupTimer = null;
// TimeStamp of last EAPOL or RADIUS message received.
@@ -171,7 +171,7 @@
}
private void scheduleTimeout() {
- cleanupTimer = executor.schedule(this::timeout, cleanupTimerTimeOutInMins, TimeUnit.MINUTES);
+ cleanupTimer = timeoutExecutor.schedule(this::timeout, cleanupTimerTimeOutInMins, TimeUnit.MINUTES);
}
public static void unsetDelegate(StateMachineDelegate delegate) {
@@ -210,7 +210,7 @@
public StateMachine(String sessionId, ScheduledExecutorService executor) {
log.info("Creating a new state machine for {}", sessionId);
this.sessionId = sessionId;
- this.executor = executor;
+ this.timeoutExecutor = executor;
}
/**