Distribute authentication state amongst the cluster
Change-Id: I63f36b3e2e5830241a2cb4e1c084d9e214995d2b
diff --git a/app/src/main/java/org/opencord/aaa/impl/AaaManager.java b/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
index 67cfb75..d04d0a7 100644
--- a/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
+++ b/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
@@ -26,6 +26,7 @@
import org.onlab.packet.MacAddress;
import org.onlab.packet.RADIUS;
import org.onlab.packet.RADIUSAttribute;
+import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.core.ApplicationId;
@@ -34,6 +35,7 @@
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.ElementId;
import org.onosproject.net.PortNumber;
import org.onosproject.net.config.ConfigFactory;
import org.onosproject.net.config.NetworkConfigEvent;
@@ -50,6 +52,12 @@
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketService;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
import org.opencord.aaa.AaaConfig;
import org.opencord.aaa.AaaMachineStatisticsEvent;
import org.opencord.aaa.AaaMachineStatisticsService;
@@ -80,19 +88,19 @@
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.net.InetAddress;
+import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Dictionary;
import java.util.HashSet;
import java.util.List;
-import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
import static org.opencord.aaa.impl.OsgiPropertyConstants.OPERATIONAL_STATUS_SERVER_EVENT_GENERATION;
@@ -126,6 +134,9 @@
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected PacketService packetService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -166,6 +177,8 @@
private ConcurrentMap<String, StateMachine> stateMachines;
+ private ConsistentMap<ConnectPoint, AuthenticationRecord> authentications;
+
// NAS IP address
protected InetAddress nasIpAddress;
@@ -232,6 +245,8 @@
// Listener for config changes
private final InternalConfigListener cfgListener = new InternalConfigListener();
+ private final InternalMapEventListener mapListener = new InternalMapEventListener();
+
private StateMachineDelegate delegate = new InternalStateMachineDelegate();
/**
@@ -273,6 +288,28 @@
idManager = new IdentifierManager();
stateMachines = Maps.newConcurrentMap();
appId = coreService.registerApplication(APP_NAME);
+
+ KryoNamespace authSerializer = KryoNamespace.newBuilder()
+ .register(byte[].class)
+ .register(String.class)
+ .register(long.class)
+ .register(boolean.class)
+ .register(URI.class)
+ .register(DeviceId.class)
+ .register(ElementId.class)
+ .register(PortNumber.class)
+ .register(ConnectPoint.class)
+ .register(MacAddress.class)
+ .register(AuthenticationRecord.class)
+ .build();
+
+ authentications = storageService.<ConnectPoint, AuthenticationRecord>consistentMapBuilder()
+ .withApplicationId(appId)
+ .withName("authentications")
+ .withSerializer(Serializer.using(authSerializer))
+ .build();
+ authentications.addListener(mapListener);
+
eventDispatcher.addSink(AuthenticationEvent.class, listenerRegistry);
netCfgService.addListener(cfgListener);
netCfgService.registerConfigFactory(factory);
@@ -318,6 +355,9 @@
scheduledFuture.cancel(true);
scheduledStatusServerChecker.cancel(true);
executor.shutdown();
+
+ authentications.removeListener(mapListener);
+
log.info("Stopped");
}
@Modified
@@ -613,39 +653,28 @@
}
@Override
- public List<AuthenticationRecord> getAuthenticationRecords() {
- return stateMachines.values().stream()
- .map(this::toAuthRecord)
- .collect(Collectors.toList());
- }
-
- private AuthenticationRecord toAuthRecord(StateMachine stateMachine) {
- return new AuthenticationRecord(stateMachine.supplicantConnectpoint(),
- stateMachine.username(), stateMachine.supplicantAddress(), stateMachine.stateString());
+ public Iterable<AuthenticationRecord> getAuthenticationRecords() {
+ return authentications.asJavaMap().values();
}
@Override
public boolean removeAuthenticationStateByMac(MacAddress mac) {
- StateMachine stateMachine = null;
+ Optional<Versioned<AuthenticationRecord>> r = authentications.values().stream()
+ .filter(v -> v.value().supplicantAddress().equals(mac))
+ .findFirst();
- for (Map.Entry<String, StateMachine> e : stateMachines.entrySet()) {
- if (e.getValue().supplicantAddress() != null &&
- e.getValue().supplicantAddress().equals(mac)) {
- stateMachine = stateMachines.remove(e.getKey());
- break;
- }
+ if (r.isEmpty()) {
+ return false;
}
- if (stateMachine != null) {
- stateMachine.stop();
- return true;
- }
+ Versioned<AuthenticationRecord> removed =
+ authentications.remove(r.get().value().supplicantConnectPoint());
- return false;
+ return removed != null;
}
- public StateMachine getStateMachine(String sessionId) {
+ StateMachine getStateMachine(String sessionId) {
return stateMachines.get(sessionId);
}
@@ -719,7 +748,7 @@
DeviceId deviceId = inPacket.receivedFrom().deviceId();
PortNumber portNumber = inPacket.receivedFrom().port();
- String sessionId = deviceId.toString() + portNumber.toString();
+ String sessionId = sessionId(inPacket.receivedFrom());
EAPOL eapol = (EAPOL) ethPkt.getPayload();
if (log.isTraceEnabled()) {
log.trace("Received EAPOL packet {} in enclosing packet {} from "
@@ -749,6 +778,7 @@
case EAPOL.EAPOL_START:
log.debug("EAP packet: EAPOL_START");
stateMachine.setSupplicantConnectpoint(inPacket.receivedFrom());
+ stateMachine.setSupplicantAddress(srcMac);
stateMachine.start();
aaaStatisticsManager.getAaaStats().incrementEapolStartReqTrans();
@@ -761,7 +791,6 @@
ethPkt.getVlanID(), EAPOL.EAPOL_PACKET,
eapPayload, stateMachine.priorityCode());
- stateMachine.setSupplicantAddress(srcMac);
stateMachine.setVlanId(ethPkt.getVlanID());
log.debug("Getting EAP identity from supplicant {}", stateMachine.supplicantAddress().toString());
sendPacketToSupplicant(eth, stateMachine.supplicantConnectpoint(), false);
@@ -903,6 +932,13 @@
handleStateMachineTimeout(authenticationEvent.subject());
}
+ AuthenticationRecord record = authenticationEvent.authenticationRecord();
+ if (record == null) {
+ authentications.remove(authenticationEvent.subject());
+ } else {
+ authentications.put(authenticationEvent.subject(), record);
+ }
+
post(authenticationEvent);
}
}
@@ -991,6 +1027,19 @@
}
}
+ private class InternalMapEventListener implements MapEventListener<ConnectPoint, AuthenticationRecord> {
+ @Override
+ public void event(MapEvent<ConnectPoint, AuthenticationRecord> event) {
+ if (event.type() == MapEvent.Type.REMOVE) {
+ // remove local state machine if user has requested remove
+ StateMachine sm = stateMachines.remove(sessionId(event.key()));
+ if (sm != null) {
+ sm.stop();
+ }
+ }
+ }
+ }
+
private class InternalDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {