Distribute authentication state amongst the cluster
Change-Id: I63f36b3e2e5830241a2cb4e1c084d9e214995d2b
diff --git a/api/src/main/java/org/opencord/aaa/AuthenticationEvent.java b/api/src/main/java/org/opencord/aaa/AuthenticationEvent.java
index 87054c0..580e10c 100644
--- a/api/src/main/java/org/opencord/aaa/AuthenticationEvent.java
+++ b/api/src/main/java/org/opencord/aaa/AuthenticationEvent.java
@@ -55,6 +55,8 @@
TIMEOUT
}
+ private AuthenticationRecord authRecord;
+
/**
* Creates a new authentication event.
*
@@ -65,4 +67,25 @@
super(type, connectPoint);
}
+ /**
+ * Creates a new authentication event.
+ *
+ * @param type event type
+ * @param connectPoint port
+ * @param record information about the authentication state
+ */
+ public AuthenticationEvent(Type type, ConnectPoint connectPoint, AuthenticationRecord record) {
+ super(type, connectPoint);
+ this.authRecord = record;
+ }
+
+ /**
+ * Gets information about the authentication state.
+ *
+ * @return authentication record
+ */
+ public AuthenticationRecord authenticationRecord() {
+ return this.authRecord;
+ }
+
}
diff --git a/api/src/main/java/org/opencord/aaa/AuthenticationRecord.java b/api/src/main/java/org/opencord/aaa/AuthenticationRecord.java
index 5433fd9..a0d099f 100644
--- a/api/src/main/java/org/opencord/aaa/AuthenticationRecord.java
+++ b/api/src/main/java/org/opencord/aaa/AuthenticationRecord.java
@@ -32,6 +32,8 @@
private final String state;
+ private final long lastChanged;
+
/**
* Creates a new authentication record.
*
@@ -39,13 +41,15 @@
* @param username user name
* @param supplicantAddress MAC address of supplicant
* @param state authentication state
+ * @param lastChanged timestamp of latest activity
*/
public AuthenticationRecord(ConnectPoint supplicantConnectPoint, byte[] username,
- MacAddress supplicantAddress, String state) {
+ MacAddress supplicantAddress, String state, long lastChanged) {
this.supplicantConnectPoint = supplicantConnectPoint;
this.username = username;
this.supplicantAddress = supplicantAddress;
this.state = state;
+ this.lastChanged = lastChanged;
}
/**
@@ -83,4 +87,13 @@
public String state() {
return state;
}
+
+ /**
+ * Gets the timestamp of the last activity on this authentication.
+ *
+ * @return timestamp
+ */
+ public long lastChanged() {
+ return lastChanged;
+ }
}
diff --git a/api/src/main/java/org/opencord/aaa/AuthenticationService.java b/api/src/main/java/org/opencord/aaa/AuthenticationService.java
index c474203..3399966 100644
--- a/api/src/main/java/org/opencord/aaa/AuthenticationService.java
+++ b/api/src/main/java/org/opencord/aaa/AuthenticationService.java
@@ -19,8 +19,6 @@
import org.onlab.packet.MacAddress;
import org.onosproject.event.ListenerService;
-import java.util.List;
-
/**
* Service for interacting with authentication state.
*/
@@ -32,13 +30,13 @@
*
* @return list of authentication records
*/
- List<AuthenticationRecord> getAuthenticationRecords();
+ Iterable<AuthenticationRecord> getAuthenticationRecords();
/**
* Removes an authentication record.
*
* @param mac MAC address of record to remove
- * @return true if a record was remove, otherwise false
+ * @return true if a record was removed, otherwise false
*/
boolean removeAuthenticationStateByMac(MacAddress mac);
diff --git a/app/src/main/java/org/opencord/aaa/cli/AaaShowUsersCommand.java b/app/src/main/java/org/opencord/aaa/cli/AaaShowUsersCommand.java
index cd1cea5..11958a6 100644
--- a/app/src/main/java/org/opencord/aaa/cli/AaaShowUsersCommand.java
+++ b/app/src/main/java/org/opencord/aaa/cli/AaaShowUsersCommand.java
@@ -17,6 +17,7 @@
import org.apache.karaf.shell.api.action.Command;
import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onlab.util.Tools;
import org.onosproject.cli.AbstractShellCommand;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.device.DeviceService;
@@ -40,9 +41,6 @@
AuthenticationService authService = get(AuthenticationService.class);
for (AuthenticationRecord auth : authService.getAuthenticationRecords()) {
- String deviceId = auth.supplicantConnectPoint().deviceId().toString();
- String portNum = auth.supplicantConnectPoint().port().toString();
-
String username = "UNKNOWN";
if (auth.username() != null) {
username = new String(auth.username());
@@ -61,8 +59,9 @@
subsId = subscriber.nasPortId();
}
- print("UserName=%s,CurrentState=%s,DeviceId=%s,MAC=%s,PortNumber=%s,SubscriberId=%s",
- username, auth.state(), deviceId, mac, portNum, subsId);
+ print("%s: %s, last-changed=%s, mac=%s, subid=%s, username=%s",
+ auth.supplicantConnectPoint(), auth.state(), Tools.timeAgo(auth.lastChanged()),
+ mac, subsId, username);
}
}
}
diff --git a/app/src/main/java/org/opencord/aaa/impl/AaaManager.java b/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
index 67cfb75..d04d0a7 100644
--- a/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
+++ b/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
@@ -26,6 +26,7 @@
import org.onlab.packet.MacAddress;
import org.onlab.packet.RADIUS;
import org.onlab.packet.RADIUSAttribute;
+import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.core.ApplicationId;
@@ -34,6 +35,7 @@
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.ElementId;
import org.onosproject.net.PortNumber;
import org.onosproject.net.config.ConfigFactory;
import org.onosproject.net.config.NetworkConfigEvent;
@@ -50,6 +52,12 @@
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketService;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
import org.opencord.aaa.AaaConfig;
import org.opencord.aaa.AaaMachineStatisticsEvent;
import org.opencord.aaa.AaaMachineStatisticsService;
@@ -80,19 +88,19 @@
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.net.InetAddress;
+import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Dictionary;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
import static org.opencord.aaa.impl.OsgiPropertyConstants.OPERATIONAL_STATUS_SERVER_EVENT_GENERATION;
@@ -126,6 +134,9 @@
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected PacketService packetService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -166,6 +177,8 @@
private ConcurrentMap<String, StateMachine> stateMachines;
+ private ConsistentMap<ConnectPoint, AuthenticationRecord> authentications;
+
// NAS IP address
protected InetAddress nasIpAddress;
@@ -232,6 +245,8 @@
// Listener for config changes
private final InternalConfigListener cfgListener = new InternalConfigListener();
+ private final InternalMapEventListener mapListener = new InternalMapEventListener();
+
private StateMachineDelegate delegate = new InternalStateMachineDelegate();
/**
@@ -273,6 +288,28 @@
idManager = new IdentifierManager();
stateMachines = Maps.newConcurrentMap();
appId = coreService.registerApplication(APP_NAME);
+
+ KryoNamespace authSerializer = KryoNamespace.newBuilder()
+ .register(byte[].class)
+ .register(String.class)
+ .register(long.class)
+ .register(boolean.class)
+ .register(URI.class)
+ .register(DeviceId.class)
+ .register(ElementId.class)
+ .register(PortNumber.class)
+ .register(ConnectPoint.class)
+ .register(MacAddress.class)
+ .register(AuthenticationRecord.class)
+ .build();
+
+ authentications = storageService.<ConnectPoint, AuthenticationRecord>consistentMapBuilder()
+ .withApplicationId(appId)
+ .withName("authentications")
+ .withSerializer(Serializer.using(authSerializer))
+ .build();
+ authentications.addListener(mapListener);
+
eventDispatcher.addSink(AuthenticationEvent.class, listenerRegistry);
netCfgService.addListener(cfgListener);
netCfgService.registerConfigFactory(factory);
@@ -318,6 +355,9 @@
scheduledFuture.cancel(true);
scheduledStatusServerChecker.cancel(true);
executor.shutdown();
+
+ authentications.removeListener(mapListener);
+
log.info("Stopped");
}
@Modified
@@ -613,39 +653,28 @@
}
@Override
- public List<AuthenticationRecord> getAuthenticationRecords() {
- return stateMachines.values().stream()
- .map(this::toAuthRecord)
- .collect(Collectors.toList());
- }
-
- private AuthenticationRecord toAuthRecord(StateMachine stateMachine) {
- return new AuthenticationRecord(stateMachine.supplicantConnectpoint(),
- stateMachine.username(), stateMachine.supplicantAddress(), stateMachine.stateString());
+ public Iterable<AuthenticationRecord> getAuthenticationRecords() {
+ return authentications.asJavaMap().values();
}
@Override
public boolean removeAuthenticationStateByMac(MacAddress mac) {
- StateMachine stateMachine = null;
+ Optional<Versioned<AuthenticationRecord>> r = authentications.values().stream()
+ .filter(v -> v.value().supplicantAddress().equals(mac))
+ .findFirst();
- for (Map.Entry<String, StateMachine> e : stateMachines.entrySet()) {
- if (e.getValue().supplicantAddress() != null &&
- e.getValue().supplicantAddress().equals(mac)) {
- stateMachine = stateMachines.remove(e.getKey());
- break;
- }
+ if (r.isEmpty()) {
+ return false;
}
- if (stateMachine != null) {
- stateMachine.stop();
- return true;
- }
+ Versioned<AuthenticationRecord> removed =
+ authentications.remove(r.get().value().supplicantConnectPoint());
- return false;
+ return removed != null;
}
- public StateMachine getStateMachine(String sessionId) {
+ StateMachine getStateMachine(String sessionId) {
return stateMachines.get(sessionId);
}
@@ -719,7 +748,7 @@
DeviceId deviceId = inPacket.receivedFrom().deviceId();
PortNumber portNumber = inPacket.receivedFrom().port();
- String sessionId = deviceId.toString() + portNumber.toString();
+ String sessionId = sessionId(inPacket.receivedFrom());
EAPOL eapol = (EAPOL) ethPkt.getPayload();
if (log.isTraceEnabled()) {
log.trace("Received EAPOL packet {} in enclosing packet {} from "
@@ -749,6 +778,7 @@
case EAPOL.EAPOL_START:
log.debug("EAP packet: EAPOL_START");
stateMachine.setSupplicantConnectpoint(inPacket.receivedFrom());
+ stateMachine.setSupplicantAddress(srcMac);
stateMachine.start();
aaaStatisticsManager.getAaaStats().incrementEapolStartReqTrans();
@@ -761,7 +791,6 @@
ethPkt.getVlanID(), EAPOL.EAPOL_PACKET,
eapPayload, stateMachine.priorityCode());
- stateMachine.setSupplicantAddress(srcMac);
stateMachine.setVlanId(ethPkt.getVlanID());
log.debug("Getting EAP identity from supplicant {}", stateMachine.supplicantAddress().toString());
sendPacketToSupplicant(eth, stateMachine.supplicantConnectpoint(), false);
@@ -903,6 +932,13 @@
handleStateMachineTimeout(authenticationEvent.subject());
}
+ AuthenticationRecord record = authenticationEvent.authenticationRecord();
+ if (record == null) {
+ authentications.remove(authenticationEvent.subject());
+ } else {
+ authentications.put(authenticationEvent.subject(), record);
+ }
+
post(authenticationEvent);
}
}
@@ -991,6 +1027,19 @@
}
}
+ private class InternalMapEventListener implements MapEventListener<ConnectPoint, AuthenticationRecord> {
+ @Override
+ public void event(MapEvent<ConnectPoint, AuthenticationRecord> event) {
+ if (event.type() == MapEvent.Type.REMOVE) {
+ // remove local state machine if user has requested remove
+ StateMachine sm = stateMachines.remove(sessionId(event.key()));
+ if (sm != null) {
+ sm.stop();
+ }
+ }
+ }
+ }
+
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
diff --git a/app/src/main/java/org/opencord/aaa/impl/StateMachine.java b/app/src/main/java/org/opencord/aaa/impl/StateMachine.java
index 744b0ae..898433b 100644
--- a/app/src/main/java/org/opencord/aaa/impl/StateMachine.java
+++ b/app/src/main/java/org/opencord/aaa/impl/StateMachine.java
@@ -20,6 +20,7 @@
import org.onlab.packet.MacAddress;
import org.onosproject.net.ConnectPoint;
import org.opencord.aaa.AuthenticationEvent;
+import org.opencord.aaa.AuthenticationRecord;
import org.opencord.aaa.StateMachineDelegate;
import org.slf4j.Logger;
@@ -506,7 +507,8 @@
states[currentState].start();
- delegate.notify(new AuthenticationEvent(AuthenticationEvent.Type.STARTED, supplicantConnectpoint));
+ delegate.notify(new AuthenticationEvent(AuthenticationEvent.Type.STARTED,
+ supplicantConnectpoint, toAuthRecord()));
// move to the next state
next(TRANSITION_START);
@@ -519,7 +521,8 @@
public void requestAccess() {
states[currentState].requestAccess();
- delegate.notify(new AuthenticationEvent(AuthenticationEvent.Type.REQUESTED, supplicantConnectpoint));
+ delegate.notify(new AuthenticationEvent(AuthenticationEvent.Type.REQUESTED,
+ supplicantConnectpoint, toAuthRecord()));
// move to the next state
next(TRANSITION_REQUEST_ACCESS);
@@ -533,7 +536,8 @@
// move to the next state
next(TRANSITION_AUTHORIZE_ACCESS);
- delegate.notify(new AuthenticationEvent(AuthenticationEvent.Type.APPROVED, supplicantConnectpoint));
+ delegate.notify(new AuthenticationEvent(AuthenticationEvent.Type.APPROVED,
+ supplicantConnectpoint, toAuthRecord()));
// Clear mapping
deleteStateMachineMapping(this);
@@ -547,7 +551,8 @@
// move to the next state
next(TRANSITION_DENY_ACCESS);
- delegate.notify(new AuthenticationEvent(AuthenticationEvent.Type.DENIED, supplicantConnectpoint));
+ delegate.notify(new AuthenticationEvent(AuthenticationEvent.Type.DENIED,
+ supplicantConnectpoint, toAuthRecord()));
// Clear mappings
deleteStateMachineMapping(this);
@@ -558,10 +563,19 @@
*/
public void logoff() {
states[currentState].logoff();
+
+ // TODO event here?
+
// move to the next state
next(TRANSITION_LOGOFF);
}
+ private AuthenticationRecord toAuthRecord() {
+ return new AuthenticationRecord(this.supplicantConnectpoint(),
+ this.username(), this.supplicantAddress(), this.stateString(),
+ this.getLastPacketReceivedTime());
+ }
+
/**
* Gets the current state.
*
@@ -720,7 +734,10 @@
(System.currentTimeMillis() - lastPacketReceivedTime) > ((cleanupTimerTimeOutInMins * 60 * 1000) / 2);
if (TIMEOUT_ELIGIBLE_STATES.contains(currentState) && noTrafficWithinThreshold) {
- delegate.notify(new AuthenticationEvent(AuthenticationEvent.Type.TIMEOUT, this.supplicantConnectpoint));
+ this.setSessionTerminateReason(SessionTerminationReasons.TIME_OUT.reason);
+
+ delegate.notify(new AuthenticationEvent(AuthenticationEvent.Type.TIMEOUT,
+ this.supplicantConnectpoint));
// If StateMachine is not eligible for cleanup yet, reschedule cleanupTimer further.
} else {
this.scheduleTimeout();
diff --git a/app/src/test/java/org/opencord/aaa/impl/AaaManagerTest.java b/app/src/test/java/org/opencord/aaa/impl/AaaManagerTest.java
index d50be7d..4309e31 100644
--- a/app/src/test/java/org/opencord/aaa/impl/AaaManagerTest.java
+++ b/app/src/test/java/org/opencord/aaa/impl/AaaManagerTest.java
@@ -34,6 +34,7 @@
import org.onosproject.net.config.Config;
import org.onosproject.net.config.NetworkConfigRegistryAdapter;
import org.onosproject.net.packet.InboundPacket;
+import org.onosproject.store.service.TestStorageService;
import org.opencord.aaa.AaaConfig;
import java.net.InetAddress;
@@ -121,6 +122,7 @@
aaaManager.deviceService = new TestDeviceService();
aaaManager.sadisService = new MockSadisService();
aaaManager.cfgService = new MockCfgService();
+ aaaManager.storageService = new TestStorageService();
aaaStatisticsManager = new AaaStatisticsManager();
aaaManager.radiusOperationalStatusService = new RadiusOperationalStatusManager();
TestUtils.setField(aaaStatisticsManager, "eventDispatcher", new TestEventDispatcher());
diff --git a/app/src/test/java/org/opencord/aaa/impl/AaaStatisticsTest.java b/app/src/test/java/org/opencord/aaa/impl/AaaStatisticsTest.java
index 2f67ba0..93245ae 100644
--- a/app/src/test/java/org/opencord/aaa/impl/AaaStatisticsTest.java
+++ b/app/src/test/java/org/opencord/aaa/impl/AaaStatisticsTest.java
@@ -38,6 +38,7 @@
import org.onosproject.net.packet.InboundPacket;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketService;
+import org.onosproject.store.service.TestStorageService;
import org.opencord.aaa.AaaConfig;
import org.slf4j.Logger;
@@ -141,6 +142,7 @@
aaaManager.deviceService = new TestDeviceService();
aaaManager.sadisService = new MockSadisService();
aaaManager.cfgService = new MockCfgService();
+ aaaManager.storageService = new TestStorageService();
aaaStatisticsManager = new AaaStatisticsManager();
aaaSupplicantStatsManager = new AaaSupplicantMachineStatsManager();
TestUtils.setField(aaaStatisticsManager, "eventDispatcher", new TestEventDispatcher());