Distribute authentication state amongst the cluster

Change-Id: I63f36b3e2e5830241a2cb4e1c084d9e214995d2b
diff --git a/app/src/main/java/org/opencord/aaa/impl/AaaManager.java b/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
index 67cfb75..d04d0a7 100644
--- a/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
+++ b/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
@@ -26,6 +26,7 @@
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.RADIUS;
 import org.onlab.packet.RADIUSAttribute;
+import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.ApplicationId;
@@ -34,6 +35,7 @@
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.ElementId;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.config.ConfigFactory;
 import org.onosproject.net.config.NetworkConfigEvent;
@@ -50,6 +52,12 @@
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketProcessor;
 import org.onosproject.net.packet.PacketService;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
 import org.opencord.aaa.AaaConfig;
 import org.opencord.aaa.AaaMachineStatisticsEvent;
 import org.opencord.aaa.AaaMachineStatisticsService;
@@ -80,19 +88,19 @@
 import javax.crypto.Mac;
 import javax.crypto.spec.SecretKeySpec;
 import java.net.InetAddress;
+import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Dictionary;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
 import static org.opencord.aaa.impl.OsgiPropertyConstants.OPERATIONAL_STATUS_SERVER_EVENT_GENERATION;
@@ -126,6 +134,9 @@
     protected CoreService coreService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected PacketService packetService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -166,6 +177,8 @@
 
     private ConcurrentMap<String, StateMachine> stateMachines;
 
+    private ConsistentMap<ConnectPoint, AuthenticationRecord> authentications;
+
     // NAS IP address
     protected InetAddress nasIpAddress;
 
@@ -232,6 +245,8 @@
     // Listener for config changes
     private final InternalConfigListener cfgListener = new InternalConfigListener();
 
+    private final InternalMapEventListener mapListener = new InternalMapEventListener();
+
     private StateMachineDelegate delegate = new InternalStateMachineDelegate();
 
     /**
@@ -273,6 +288,28 @@
         idManager = new IdentifierManager();
         stateMachines = Maps.newConcurrentMap();
         appId = coreService.registerApplication(APP_NAME);
+
+        KryoNamespace authSerializer = KryoNamespace.newBuilder()
+                .register(byte[].class)
+                .register(String.class)
+                .register(long.class)
+                .register(boolean.class)
+                .register(URI.class)
+                .register(DeviceId.class)
+                .register(ElementId.class)
+                .register(PortNumber.class)
+                .register(ConnectPoint.class)
+                .register(MacAddress.class)
+                .register(AuthenticationRecord.class)
+                .build();
+
+        authentications = storageService.<ConnectPoint, AuthenticationRecord>consistentMapBuilder()
+                .withApplicationId(appId)
+                .withName("authentications")
+                .withSerializer(Serializer.using(authSerializer))
+                .build();
+        authentications.addListener(mapListener);
+
         eventDispatcher.addSink(AuthenticationEvent.class, listenerRegistry);
         netCfgService.addListener(cfgListener);
         netCfgService.registerConfigFactory(factory);
@@ -318,6 +355,9 @@
         scheduledFuture.cancel(true);
         scheduledStatusServerChecker.cancel(true);
         executor.shutdown();
+
+        authentications.removeListener(mapListener);
+
         log.info("Stopped");
     }
     @Modified
@@ -613,39 +653,28 @@
     }
 
     @Override
-    public List<AuthenticationRecord> getAuthenticationRecords() {
-        return stateMachines.values().stream()
-                .map(this::toAuthRecord)
-                .collect(Collectors.toList());
-    }
-
-    private AuthenticationRecord toAuthRecord(StateMachine stateMachine) {
-        return new AuthenticationRecord(stateMachine.supplicantConnectpoint(),
-                stateMachine.username(), stateMachine.supplicantAddress(), stateMachine.stateString());
+    public Iterable<AuthenticationRecord> getAuthenticationRecords() {
+        return authentications.asJavaMap().values(); // read from the ConsistentMap, not the local state machines
     }
 
     @Override
     public boolean removeAuthenticationStateByMac(MacAddress mac) {
 
-        StateMachine stateMachine = null;
+        Optional<Versioned<AuthenticationRecord>> r = authentications.values().stream()
+                .filter(v -> v.value().supplicantAddress() != null && v.value().supplicantAddress().equals(mac))
+                .findFirst(); // null guard kept from the old loop, in case a record carries no supplicant MAC
 
-        for (Map.Entry<String, StateMachine> e : stateMachines.entrySet()) {
-            if (e.getValue().supplicantAddress() != null &&
-                    e.getValue().supplicantAddress().equals(mac)) {
-                stateMachine = stateMachines.remove(e.getKey());
-                break;
-            }
+        if (r.isEmpty()) {
+            return false;
         }
 
-        if (stateMachine != null) {
-            stateMachine.stop();
-            return true;
-        }
+        Versioned<AuthenticationRecord> removed =
+                authentications.remove(r.get().value().supplicantConnectPoint());
 
-        return false;
+        return removed != null; // the resulting REMOVE map event stops the local state machine
     }
 
-    public StateMachine getStateMachine(String sessionId) {
+    StateMachine getStateMachine(String sessionId) {
         return stateMachines.get(sessionId);
     }
 
@@ -719,7 +748,7 @@
 
             DeviceId deviceId = inPacket.receivedFrom().deviceId();
             PortNumber portNumber = inPacket.receivedFrom().port();
-            String sessionId = deviceId.toString() + portNumber.toString();
+            String sessionId = sessionId(inPacket.receivedFrom());
             EAPOL eapol = (EAPOL) ethPkt.getPayload();
             if (log.isTraceEnabled()) {
                 log.trace("Received EAPOL packet {} in enclosing packet {} from "
@@ -749,6 +778,7 @@
                 case EAPOL.EAPOL_START:
                     log.debug("EAP packet: EAPOL_START");
                     stateMachine.setSupplicantConnectpoint(inPacket.receivedFrom());
+                    stateMachine.setSupplicantAddress(srcMac);
                     stateMachine.start();
 
                     aaaStatisticsManager.getAaaStats().incrementEapolStartReqTrans();
@@ -761,7 +791,6 @@
                                                       ethPkt.getVlanID(), EAPOL.EAPOL_PACKET,
                                                       eapPayload, stateMachine.priorityCode());
 
-                    stateMachine.setSupplicantAddress(srcMac);
                     stateMachine.setVlanId(ethPkt.getVlanID());
                     log.debug("Getting EAP identity from supplicant {}", stateMachine.supplicantAddress().toString());
                     sendPacketToSupplicant(eth, stateMachine.supplicantConnectpoint(), false);
@@ -903,6 +932,13 @@
                 handleStateMachineTimeout(authenticationEvent.subject());
             }
 
+            AuthenticationRecord record = authenticationEvent.authenticationRecord();
+            if (record == null) {
+                authentications.remove(authenticationEvent.subject());
+            } else {
+                authentications.put(authenticationEvent.subject(), record);
+            }
+
             post(authenticationEvent);
         }
     }
@@ -991,6 +1027,19 @@
         }
     }
 
+    private class InternalMapEventListener implements MapEventListener<ConnectPoint, AuthenticationRecord> {
+        @Override
+        public void event(MapEvent<ConnectPoint, AuthenticationRecord> event) {
+            if (event.type() == MapEvent.Type.REMOVE) {
+                // record removed from the map (e.g. by user request): stop the matching local state machine
+                StateMachine sm = stateMachines.remove(sessionId(event.key()));
+                if (sm != null) {
+                    sm.stop();
+                }
+            }
+        }
+    }
+
     private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {