Distribute authentication state amongst the cluster

Change-Id: I63f36b3e2e5830241a2cb4e1c084d9e214995d2b
diff --git a/api/src/main/java/org/opencord/aaa/AuthenticationEvent.java b/api/src/main/java/org/opencord/aaa/AuthenticationEvent.java
index 87054c0..580e10c 100644
--- a/api/src/main/java/org/opencord/aaa/AuthenticationEvent.java
+++ b/api/src/main/java/org/opencord/aaa/AuthenticationEvent.java
@@ -55,6 +55,8 @@
         TIMEOUT
     }
 
+    private AuthenticationRecord authRecord;
+
     /**
      * Creates a new authentication event.
      *
@@ -65,4 +67,25 @@
         super(type, connectPoint);
     }
 
+    /**
+     * Creates a new authentication event.
+     *
+     * @param type event type
+     * @param connectPoint port
+     * @param record information about the authentication state
+     */
+    public AuthenticationEvent(Type type, ConnectPoint connectPoint, AuthenticationRecord record) {
+        super(type, connectPoint);
+        this.authRecord = record;
+    }
+
+    /**
+     * Gets information about the authentication state.
+     *
+     * @return authentication record
+     */
+    public AuthenticationRecord authenticationRecord() {
+        return this.authRecord;
+    }
+
 }
diff --git a/api/src/main/java/org/opencord/aaa/AuthenticationRecord.java b/api/src/main/java/org/opencord/aaa/AuthenticationRecord.java
index 5433fd9..a0d099f 100644
--- a/api/src/main/java/org/opencord/aaa/AuthenticationRecord.java
+++ b/api/src/main/java/org/opencord/aaa/AuthenticationRecord.java
@@ -32,6 +32,8 @@
 
     private final String state;
 
+    private final long lastChanged;
+
     /**
      * Creates a new authentication record.
      *
@@ -39,13 +41,15 @@
      * @param username user name
      * @param supplicantAddress MAC address of supplicant
      * @param state authentication state
+     * @param lastChanged timestamp of latest activity
      */
     public AuthenticationRecord(ConnectPoint supplicantConnectPoint, byte[] username,
-                                MacAddress supplicantAddress, String state) {
+                                MacAddress supplicantAddress, String state, long lastChanged) {
         this.supplicantConnectPoint = supplicantConnectPoint;
         this.username = username;
         this.supplicantAddress = supplicantAddress;
         this.state = state;
+        this.lastChanged = lastChanged;
     }
 
     /**
@@ -83,4 +87,13 @@
     public String state() {
         return state;
     }
+
+    /**
+     * Gets the timestamp of the last activity on this authentication.
+     *
+     * @return timestamp
+     */
+    public long lastChanged() {
+        return lastChanged;
+    }
 }
diff --git a/api/src/main/java/org/opencord/aaa/AuthenticationService.java b/api/src/main/java/org/opencord/aaa/AuthenticationService.java
index c474203..3399966 100644
--- a/api/src/main/java/org/opencord/aaa/AuthenticationService.java
+++ b/api/src/main/java/org/opencord/aaa/AuthenticationService.java
@@ -19,8 +19,6 @@
 import org.onlab.packet.MacAddress;
 import org.onosproject.event.ListenerService;
 
-import java.util.List;
-
 /**
  * Service for interacting with authentication state.
  */
@@ -32,13 +30,13 @@
      *
      * @return list of authentication records
      */
-    List<AuthenticationRecord> getAuthenticationRecords();
+    Iterable<AuthenticationRecord> getAuthenticationRecords();
 
     /**
      * Removes an authentication record.
      *
      * @param mac MAC address of record to remove
-     * @return true if a record was remove, otherwise false
+     * @return true if a record was removed, otherwise false
      */
     boolean removeAuthenticationStateByMac(MacAddress mac);
 
diff --git a/app/src/main/java/org/opencord/aaa/cli/AaaShowUsersCommand.java b/app/src/main/java/org/opencord/aaa/cli/AaaShowUsersCommand.java
index cd1cea5..11958a6 100644
--- a/app/src/main/java/org/opencord/aaa/cli/AaaShowUsersCommand.java
+++ b/app/src/main/java/org/opencord/aaa/cli/AaaShowUsersCommand.java
@@ -17,6 +17,7 @@
 
 import org.apache.karaf.shell.api.action.Command;
 import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onlab.util.Tools;
 import org.onosproject.cli.AbstractShellCommand;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.device.DeviceService;
@@ -40,9 +41,6 @@
         AuthenticationService authService = get(AuthenticationService.class);
 
         for (AuthenticationRecord auth : authService.getAuthenticationRecords()) {
-            String deviceId = auth.supplicantConnectPoint().deviceId().toString();
-            String portNum = auth.supplicantConnectPoint().port().toString();
-
             String username = "UNKNOWN";
             if (auth.username() != null) {
                 username = new String(auth.username());
@@ -61,8 +59,9 @@
                 subsId = subscriber.nasPortId();
             }
 
-            print("UserName=%s,CurrentState=%s,DeviceId=%s,MAC=%s,PortNumber=%s,SubscriberId=%s",
-                  username, auth.state(), deviceId, mac, portNum, subsId);
+            print("%s: %s, last-changed=%s, mac=%s, subid=%s, username=%s",
+                    auth.supplicantConnectPoint(), auth.state(), Tools.timeAgo(auth.lastChanged()),
+                    mac, subsId, username);
         }
     }
 }
diff --git a/app/src/main/java/org/opencord/aaa/impl/AaaManager.java b/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
index 67cfb75..d04d0a7 100644
--- a/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
+++ b/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
@@ -26,6 +26,7 @@
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.RADIUS;
 import org.onlab.packet.RADIUSAttribute;
+import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.ApplicationId;
@@ -34,6 +35,7 @@
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.ElementId;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.config.ConfigFactory;
 import org.onosproject.net.config.NetworkConfigEvent;
@@ -50,6 +52,12 @@
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketProcessor;
 import org.onosproject.net.packet.PacketService;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.MapEvent;
+import org.onosproject.store.service.MapEventListener;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
 import org.opencord.aaa.AaaConfig;
 import org.opencord.aaa.AaaMachineStatisticsEvent;
 import org.opencord.aaa.AaaMachineStatisticsService;
@@ -80,19 +88,19 @@
 import javax.crypto.Mac;
 import javax.crypto.spec.SecretKeySpec;
 import java.net.InetAddress;
+import java.net.URI;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Dictionary;
 import java.util.HashSet;
 import java.util.List;
-import java.util.Map;
+import java.util.Optional;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
 import static org.opencord.aaa.impl.OsgiPropertyConstants.OPERATIONAL_STATUS_SERVER_EVENT_GENERATION;
@@ -126,6 +134,9 @@
     protected CoreService coreService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected PacketService packetService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -166,6 +177,8 @@
 
     private ConcurrentMap<String, StateMachine> stateMachines;
 
+    private ConsistentMap<ConnectPoint, AuthenticationRecord> authentications;
+
     // NAS IP address
     protected InetAddress nasIpAddress;
 
@@ -232,6 +245,8 @@
     // Listener for config changes
     private final InternalConfigListener cfgListener = new InternalConfigListener();
 
+    private final InternalMapEventListener mapListener = new InternalMapEventListener();
+
     private StateMachineDelegate delegate = new InternalStateMachineDelegate();
 
     /**
@@ -273,6 +288,28 @@
         idManager = new IdentifierManager();
         stateMachines = Maps.newConcurrentMap();
         appId = coreService.registerApplication(APP_NAME);
+
+        KryoNamespace authSerializer = KryoNamespace.newBuilder()
+                .register(byte[].class)
+                .register(String.class)
+                .register(long.class)
+                .register(boolean.class)
+                .register(URI.class)
+                .register(DeviceId.class)
+                .register(ElementId.class)
+                .register(PortNumber.class)
+                .register(ConnectPoint.class)
+                .register(MacAddress.class)
+                .register(AuthenticationRecord.class)
+                .build();
+
+        authentications = storageService.<ConnectPoint, AuthenticationRecord>consistentMapBuilder()
+                .withApplicationId(appId)
+                .withName("authentications")
+                .withSerializer(Serializer.using(authSerializer))
+                .build();
+        authentications.addListener(mapListener);
+
         eventDispatcher.addSink(AuthenticationEvent.class, listenerRegistry);
         netCfgService.addListener(cfgListener);
         netCfgService.registerConfigFactory(factory);
@@ -318,6 +355,9 @@
         scheduledFuture.cancel(true);
         scheduledStatusServerChecker.cancel(true);
         executor.shutdown();
+
+        authentications.removeListener(mapListener);
+
         log.info("Stopped");
     }
     @Modified
@@ -613,39 +653,28 @@
     }
 
     @Override
-    public List<AuthenticationRecord> getAuthenticationRecords() {
-        return stateMachines.values().stream()
-                .map(this::toAuthRecord)
-                .collect(Collectors.toList());
-    }
-
-    private AuthenticationRecord toAuthRecord(StateMachine stateMachine) {
-        return new AuthenticationRecord(stateMachine.supplicantConnectpoint(),
-                stateMachine.username(), stateMachine.supplicantAddress(), stateMachine.stateString());
+    public Iterable<AuthenticationRecord> getAuthenticationRecords() {
+        return authentications.asJavaMap().values();
     }
 
     @Override
     public boolean removeAuthenticationStateByMac(MacAddress mac) {
 
-        StateMachine stateMachine = null;
+        Optional<Versioned<AuthenticationRecord>> r = authentications.values().stream()
+                .filter(v -> v.value().supplicantAddress().equals(mac))
+                .findFirst();
 
-        for (Map.Entry<String, StateMachine> e : stateMachines.entrySet()) {
-            if (e.getValue().supplicantAddress() != null &&
-                    e.getValue().supplicantAddress().equals(mac)) {
-                stateMachine = stateMachines.remove(e.getKey());
-                break;
-            }
+        if (r.isEmpty()) {
+            return false;
         }
 
-        if (stateMachine != null) {
-            stateMachine.stop();
-            return true;
-        }
+        Versioned<AuthenticationRecord> removed =
+                authentications.remove(r.get().value().supplicantConnectPoint());
 
-        return false;
+        return removed != null;
     }
 
-    public StateMachine getStateMachine(String sessionId) {
+    StateMachine getStateMachine(String sessionId) {
         return stateMachines.get(sessionId);
     }
 
@@ -719,7 +748,7 @@
 
             DeviceId deviceId = inPacket.receivedFrom().deviceId();
             PortNumber portNumber = inPacket.receivedFrom().port();
-            String sessionId = deviceId.toString() + portNumber.toString();
+            String sessionId = sessionId(inPacket.receivedFrom());
             EAPOL eapol = (EAPOL) ethPkt.getPayload();
             if (log.isTraceEnabled()) {
                 log.trace("Received EAPOL packet {} in enclosing packet {} from "
@@ -749,6 +778,7 @@
                 case EAPOL.EAPOL_START:
                     log.debug("EAP packet: EAPOL_START");
                     stateMachine.setSupplicantConnectpoint(inPacket.receivedFrom());
+                    stateMachine.setSupplicantAddress(srcMac);
                     stateMachine.start();
 
                     aaaStatisticsManager.getAaaStats().incrementEapolStartReqTrans();
@@ -761,7 +791,6 @@
                                                       ethPkt.getVlanID(), EAPOL.EAPOL_PACKET,
                                                       eapPayload, stateMachine.priorityCode());
 
-                    stateMachine.setSupplicantAddress(srcMac);
                     stateMachine.setVlanId(ethPkt.getVlanID());
                     log.debug("Getting EAP identity from supplicant {}", stateMachine.supplicantAddress().toString());
                     sendPacketToSupplicant(eth, stateMachine.supplicantConnectpoint(), false);
@@ -903,6 +932,13 @@
                 handleStateMachineTimeout(authenticationEvent.subject());
             }
 
+            AuthenticationRecord record = authenticationEvent.authenticationRecord();
+            if (record == null) {
+                authentications.remove(authenticationEvent.subject());
+            } else {
+                authentications.put(authenticationEvent.subject(), record);
+            }
+
             post(authenticationEvent);
         }
     }
@@ -991,6 +1027,19 @@
         }
     }
 
+    private class InternalMapEventListener implements MapEventListener<ConnectPoint, AuthenticationRecord> {
+        @Override
+        public void event(MapEvent<ConnectPoint, AuthenticationRecord> event) {
+            if (event.type() == MapEvent.Type.REMOVE) {
+                // remove local state machine if user has requested remove
+                StateMachine sm = stateMachines.remove(sessionId(event.key()));
+                if (sm != null) {
+                    sm.stop();
+                }
+            }
+        }
+    }
+
     private class InternalDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
diff --git a/app/src/main/java/org/opencord/aaa/impl/StateMachine.java b/app/src/main/java/org/opencord/aaa/impl/StateMachine.java
index 744b0ae..898433b 100644
--- a/app/src/main/java/org/opencord/aaa/impl/StateMachine.java
+++ b/app/src/main/java/org/opencord/aaa/impl/StateMachine.java
@@ -20,6 +20,7 @@
 import org.onlab.packet.MacAddress;
 import org.onosproject.net.ConnectPoint;
 import org.opencord.aaa.AuthenticationEvent;
+import org.opencord.aaa.AuthenticationRecord;
 import org.opencord.aaa.StateMachineDelegate;
 import org.slf4j.Logger;
 
@@ -506,7 +507,8 @@
 
         states[currentState].start();
 
-        delegate.notify(new AuthenticationEvent(AuthenticationEvent.Type.STARTED, supplicantConnectpoint));
+        delegate.notify(new AuthenticationEvent(AuthenticationEvent.Type.STARTED,
+                supplicantConnectpoint, toAuthRecord()));
 
         // move to the next state
         next(TRANSITION_START);
@@ -519,7 +521,8 @@
     public void requestAccess() {
         states[currentState].requestAccess();
 
-        delegate.notify(new AuthenticationEvent(AuthenticationEvent.Type.REQUESTED, supplicantConnectpoint));
+        delegate.notify(new AuthenticationEvent(AuthenticationEvent.Type.REQUESTED,
+                supplicantConnectpoint, toAuthRecord()));
 
         // move to the next state
         next(TRANSITION_REQUEST_ACCESS);
@@ -533,7 +536,8 @@
         // move to the next state
         next(TRANSITION_AUTHORIZE_ACCESS);
 
-        delegate.notify(new AuthenticationEvent(AuthenticationEvent.Type.APPROVED, supplicantConnectpoint));
+        delegate.notify(new AuthenticationEvent(AuthenticationEvent.Type.APPROVED,
+                supplicantConnectpoint, toAuthRecord()));
 
         // Clear mapping
         deleteStateMachineMapping(this);
@@ -547,7 +551,8 @@
         // move to the next state
         next(TRANSITION_DENY_ACCESS);
 
-        delegate.notify(new AuthenticationEvent(AuthenticationEvent.Type.DENIED, supplicantConnectpoint));
+        delegate.notify(new AuthenticationEvent(AuthenticationEvent.Type.DENIED,
+                supplicantConnectpoint, toAuthRecord()));
 
         // Clear mappings
         deleteStateMachineMapping(this);
@@ -558,10 +563,19 @@
      */
     public void logoff() {
         states[currentState].logoff();
+
+        // TODO event here?
+
         // move to the next state
         next(TRANSITION_LOGOFF);
     }
 
+    private AuthenticationRecord toAuthRecord() {
+        return new AuthenticationRecord(this.supplicantConnectpoint(),
+                this.username(), this.supplicantAddress(), this.stateString(),
+                this.getLastPacketReceivedTime());
+    }
+
     /**
      * Gets the current state.
      *
@@ -720,7 +734,10 @@
                 (System.currentTimeMillis() - lastPacketReceivedTime) > ((cleanupTimerTimeOutInMins * 60 * 1000) / 2);
 
         if (TIMEOUT_ELIGIBLE_STATES.contains(currentState) && noTrafficWithinThreshold) {
-            delegate.notify(new AuthenticationEvent(AuthenticationEvent.Type.TIMEOUT, this.supplicantConnectpoint));
+            this.setSessionTerminateReason(SessionTerminationReasons.TIME_OUT.reason);
+
+            delegate.notify(new AuthenticationEvent(AuthenticationEvent.Type.TIMEOUT,
+                    this.supplicantConnectpoint));
             // If StateMachine is not eligible for cleanup yet, reschedule cleanupTimer further.
         } else {
             this.scheduleTimeout();
diff --git a/app/src/test/java/org/opencord/aaa/impl/AaaManagerTest.java b/app/src/test/java/org/opencord/aaa/impl/AaaManagerTest.java
index d50be7d..4309e31 100644
--- a/app/src/test/java/org/opencord/aaa/impl/AaaManagerTest.java
+++ b/app/src/test/java/org/opencord/aaa/impl/AaaManagerTest.java
@@ -34,6 +34,7 @@
 import org.onosproject.net.config.Config;
 import org.onosproject.net.config.NetworkConfigRegistryAdapter;
 import org.onosproject.net.packet.InboundPacket;
+import org.onosproject.store.service.TestStorageService;
 import org.opencord.aaa.AaaConfig;
 
 import java.net.InetAddress;
@@ -121,6 +122,7 @@
         aaaManager.deviceService = new TestDeviceService();
         aaaManager.sadisService = new MockSadisService();
         aaaManager.cfgService = new MockCfgService();
+        aaaManager.storageService = new TestStorageService();
         aaaStatisticsManager = new AaaStatisticsManager();
         aaaManager.radiusOperationalStatusService = new RadiusOperationalStatusManager();
         TestUtils.setField(aaaStatisticsManager, "eventDispatcher", new TestEventDispatcher());
diff --git a/app/src/test/java/org/opencord/aaa/impl/AaaStatisticsTest.java b/app/src/test/java/org/opencord/aaa/impl/AaaStatisticsTest.java
index 2f67ba0..93245ae 100644
--- a/app/src/test/java/org/opencord/aaa/impl/AaaStatisticsTest.java
+++ b/app/src/test/java/org/opencord/aaa/impl/AaaStatisticsTest.java
@@ -38,6 +38,7 @@
 import org.onosproject.net.packet.InboundPacket;
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketService;
+import org.onosproject.store.service.TestStorageService;
 import org.opencord.aaa.AaaConfig;
 import org.slf4j.Logger;
 
@@ -141,6 +142,7 @@
         aaaManager.deviceService = new TestDeviceService();
         aaaManager.sadisService = new MockSadisService();
         aaaManager.cfgService = new MockCfgService();
+        aaaManager.storageService = new TestStorageService();
         aaaStatisticsManager = new AaaStatisticsManager();
         aaaSupplicantStatsManager = new AaaSupplicantMachineStatsManager();
         TestUtils.setField(aaaStatisticsManager, "eventDispatcher", new TestEventDispatcher());