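Publish cluster-wide AAA stats

Each node periodically writes a snapshot of its local counters into an
eventually-consistent map keyed by node id. The node that leads the
"aaa-statistics" topic sums the snapshots and posts the STATS_UPDATE
event; the publisher moves from AaaManager to AaaStatisticsManager.
Counter resets are broadcast to every node in the cluster.

The show-aaa-counters CLI command is renamed to aaa-statistics and now
prints the cluster-wide totals. AaaShowUsersCommand lists records
sorted by supplicant connect point.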

Change-Id: Icbdacdae08b6235be022f85eb41ce6d0f8f35a35
diff --git a/app/pom.xml b/app/pom.xml
index 91c79e4..79fbd13 100644
--- a/app/pom.xml
+++ b/app/pom.xml
@@ -72,6 +72,13 @@
 
         <dependency>
             <groupId>org.onosproject</groupId>
+            <artifactId>onos-core-serializers</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
             <artifactId>onos-api</artifactId>
             <version>${onos.version}</version>
             <classifier>tests</classifier>
diff --git a/app/src/main/java/org/opencord/aaa/cli/AaaShowCountersCommand.java b/app/src/main/java/org/opencord/aaa/cli/AaaShowCountersCommand.java
index 054f6f1..3c08464 100644
--- a/app/src/main/java/org/opencord/aaa/cli/AaaShowCountersCommand.java
+++ b/app/src/main/java/org/opencord/aaa/cli/AaaShowCountersCommand.java
@@ -19,52 +19,27 @@
 import org.apache.karaf.shell.api.action.lifecycle.Service;
 import org.onosproject.cli.AbstractShellCommand;
 import org.opencord.aaa.AaaStatistics;
+import org.opencord.aaa.AaaStatisticsSnapshot;
 import org.opencord.aaa.AuthenticationStatisticsService;
 
 /**
  * Display current value of all aaa statistics counters.
  */
 @Service
-@Command(scope = "onos", name = "show-aaa-counters",
+@Command(scope = "onos", name = "aaa-statistics",
 description = "Display current value of all aaa statistics counters")
 public class AaaShowCountersCommand extends AbstractShellCommand {
+
     @Override
     protected void doExecute() {
-
-        AaaStatistics aaaStats = new AaaStatistics();
-
         AuthenticationStatisticsService aaaStatisticsManager =
                 AbstractShellCommand.get(AuthenticationStatisticsService.class);
-        aaaStats = aaaStatisticsManager.getAaaStats();
 
-        System.out.format("%30s %10d\n", "AccessRequestsTx", aaaStats.getAccessRequestsTx());
-        System.out.format("%30s %10d\n", "ChallengeResponsesRx", aaaStats.getChallengeResponsesRx());
-        System.out.format("%30s %10d\n", "RequestReTx", aaaStats.getRequestReTx());
-        System.out.format("%30s %10d\n", "AcceptResponsesRx", aaaStats.getAcceptResponsesRx());
-        System.out.format("%30s %10d\n", "RejectResponsesRx", aaaStats.getRejectResponsesRx());
-        System.out.format("%30s %10d\n", "PendingRequests", aaaStats.getPendingRequests());
-        System.out.format("%30s %10d\n", "DroppedResponsesRx", aaaStats.getDroppedResponsesRx());
-        System.out.format("%30s %10d\n", "InvalidValidatorsRx", aaaStats.getInvalidValidatorsRx());
-        System.out.format("%30s %10d\n", "MalformedResponsesRx", aaaStats.getMalformedResponsesRx());
-        System.out.format("%30s %10d\n", "UnknownServerRx", aaaStats.getUnknownServerRx());
-        System.out.format("%30s %10d\n", "UnknownTypeRx", aaaStats.getUnknownTypeRx());
-        System.out.format("%30s %10d\n", "RequestRttMillis", aaaStats.getRequestRttMilis());
-        System.out.format("%30s %10d\n", "EapolLogoffRx", aaaStats.getEapolLogoffRx());
-        System.out.format("%30s %10d\n", "EapolAuthSuccessTrans", aaaStats.getEapolAuthSuccessTrans());
-        System.out.format("%30s %10d\n", "EapolAuthFailureTrans", aaaStats.getEapolAuthFailureTrans());
-        System.out.format("%30s %10d\n", "EapolStartReqTrans", aaaStats.getEapolStartReqTrans());
-        System.out.format("%30s %10d\n", "EapolTransRespNotNak", aaaStats.getEapolTransRespNotNak());
-        System.out.format("%30s %10d\n", "EapPktTxauthChooseEap", aaaStats.getEapPktTxauthChooseEap());
-        System.out.format("%30s %10d\n", "EapolResIdentityMsgTrans", aaaStats.getEapolResIdentityMsgTrans());
-        System.out.format("%30s %10d\n", "AuthStateIdle", aaaStats.getAuthStateIdle());
-        System.out.format("%30s %10d\n", "RequestIdFramesTx", aaaStats.getRequestIdFramesTx());
-        System.out.format("%30s %10d\n", "ReqEapFramesTx", aaaStats.getReqEapFramesTx());
-        System.out.format("%30s %10d\n", "InvalidPktType", aaaStats.getInvalidPktType());
-        System.out.format("%30s %10d\n", "InvalidBodyLength", aaaStats.getInvalidBodyLength());
-        System.out.format("%30s %10d\n", "ValidEapolFramesRx", aaaStats.getValidEapolFramesRx());
-        System.out.format("%30s %10d\n", "PendingResSupp", aaaStats.getPendingResSupp());
-        System.out.format("%30s %10d\n", "EapolFramesTx", aaaStats.getEapolFramesTx());
-        System.out.format("%30s %10d\n", "TimedOutPackets", aaaStats.getTimedOutPackets());
+        AaaStatisticsSnapshot stats = aaaStatisticsManager.getClusterStatistics();
 
-  }
+        for (String name : AaaStatistics.COUNTER_NAMES) {
+            print("%30s %10d", name, stats.get(name));
+        }
+
+    }
 }
diff --git a/app/src/main/java/org/opencord/aaa/cli/AaaShowUsersCommand.java b/app/src/main/java/org/opencord/aaa/cli/AaaShowUsersCommand.java
index 11958a6..c562261 100644
--- a/app/src/main/java/org/opencord/aaa/cli/AaaShowUsersCommand.java
+++ b/app/src/main/java/org/opencord/aaa/cli/AaaShowUsersCommand.java
@@ -21,11 +21,17 @@
 import org.onosproject.cli.AbstractShellCommand;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.device.DeviceService;
+import org.onosproject.utils.Comparators;
 import org.opencord.aaa.AuthenticationRecord;
 import org.opencord.aaa.AuthenticationService;
 import org.opencord.sadis.SadisService;
 import org.opencord.sadis.SubscriberAndDeviceInformation;
 
+import java.util.Comparator;
+import java.util.List;
+
+import static com.google.common.collect.Lists.newArrayList;
+
 /**
  * Shows the users in the aaa.
  */
@@ -36,11 +42,19 @@
     @Override
     protected void doExecute() {
 
+        final Comparator<AuthenticationRecord> authenticationRecordComparator =
+                (a1, a2) -> Comparators.CONNECT_POINT_COMPARATOR.
+                        compare(a1.supplicantConnectPoint(), a2.supplicantConnectPoint());
+
         DeviceService devService = get(DeviceService.class);
         SadisService sadisService = get(SadisService.class);
         AuthenticationService authService = get(AuthenticationService.class);
 
-        for (AuthenticationRecord auth : authService.getAuthenticationRecords()) {
+        List<AuthenticationRecord> authentications = newArrayList(authService.getAuthenticationRecords());
+
+        authentications.sort(authenticationRecordComparator);
+
+        for (AuthenticationRecord auth : authentications) {
             String username = "UNKNOWN";
             if (auth.username() != null) {
                 username = new String(auth.username());
diff --git a/app/src/main/java/org/opencord/aaa/impl/AaaManager.java b/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
index 79fc22c..85b1ce2 100644
--- a/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
+++ b/app/src/main/java/org/opencord/aaa/impl/AaaManager.java
@@ -49,7 +49,6 @@
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.ElementId;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.config.ConfigFactory;
 import org.onosproject.net.config.NetworkConfigEvent;
@@ -66,6 +65,7 @@
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketProcessor;
 import org.onosproject.net.packet.PacketService;
+import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.MapEvent;
 import org.onosproject.store.service.MapEventListener;
@@ -80,7 +80,6 @@
 import org.opencord.aaa.AuthenticationEventListener;
 import org.opencord.aaa.AuthenticationRecord;
 import org.opencord.aaa.AuthenticationService;
-import org.opencord.aaa.AuthenticationStatisticsEvent;
 import org.opencord.aaa.AuthenticationStatisticsService;
 import org.opencord.aaa.RadiusCommunicator;
 import org.opencord.aaa.RadiusOperationalStatusEvent;
@@ -101,7 +100,6 @@
 
 import javax.crypto.Mac;
 import javax.crypto.spec.SecretKeySpec;
-import java.net.URI;
 import java.util.Optional;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.Executors;
@@ -113,8 +111,6 @@
 import static org.opencord.aaa.impl.OsgiPropertyConstants.OPERATIONAL_STATUS_SERVER_EVENT_GENERATION_DEFAULT;
 import static org.opencord.aaa.impl.OsgiPropertyConstants.OPERATIONAL_STATUS_SERVER_TIMEOUT;
 import static org.opencord.aaa.impl.OsgiPropertyConstants.OPERATIONAL_STATUS_SERVER_TIMEOUT_DEFAULT;
-import static org.opencord.aaa.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD;
-import static org.opencord.aaa.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD_DEFAULT;
 import static org.opencord.aaa.impl.OsgiPropertyConstants.STATUS_SERVER_MODE;
 import static org.opencord.aaa.impl.OsgiPropertyConstants.STATUS_SERVER_MODE_DEFAULT;
 
@@ -122,7 +118,6 @@
  * AAA application for ONOS.
  */
 @Component(immediate = true, property = {
-        STATISTICS_GENERATION_PERIOD + ":Integer=" + STATISTICS_GENERATION_PERIOD_DEFAULT,
         OPERATIONAL_STATUS_SERVER_EVENT_GENERATION + ":Integer=" + OPERATIONAL_STATUS_SERVER_EVENT_GENERATION_DEFAULT,
         OPERATIONAL_STATUS_SERVER_TIMEOUT + ":Integer=" + OPERATIONAL_STATUS_SERVER_TIMEOUT_DEFAULT,
         STATUS_SERVER_MODE + ":String=" + STATUS_SERVER_MODE_DEFAULT,
@@ -168,12 +163,10 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected RadiusOperationalStatusService radiusOperationalStatusService;
 
-    protected AuthenticationStatisticsEventPublisher authenticationStatisticsPublisher;
     protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
     private final DeviceListener deviceListener = new InternalDeviceListener();
 
     // Properties
-    private int statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
     private int operationalStatusEventGenerationPeriodInSeconds = OPERATIONAL_STATUS_SERVER_EVENT_GENERATION_DEFAULT;
     private int operationalStatusServerTimeoutInSeconds = OPERATIONAL_STATUS_SERVER_TIMEOUT_DEFAULT;
     protected String operationalStatusEvaluationMode = STATUS_SERVER_MODE_DEFAULT;
@@ -228,12 +221,11 @@
     // latest configuration
     AaaConfig newCfg;
 
-    ScheduledFuture<?> scheduledFuture;
     ScheduledFuture<?> scheduledStatusServerChecker;
     ScheduledExecutorService executor;
     String configuredAaaServerAddress;
-    HashSet<Byte> outPacketSet = new HashSet<Byte>();
-    HashSet<Byte> outPacketSupp = new HashSet<Byte>();
+    HashSet<Byte> outPacketSet = new HashSet<>();
+    HashSet<Byte> outPacketSupp = new HashSet<>();
     static final List<Byte> VALID_EAPOL_TYPE = Arrays.asList(EAPOL.EAPOL_START, EAPOL.EAPOL_LOGOFF, EAPOL.EAPOL_PACKET);
     static final int HEADER_LENGTH = 4;
     // Configuration properties factory
@@ -295,16 +287,7 @@
         appId = coreService.registerApplication(APP_NAME);
 
         KryoNamespace authSerializer = KryoNamespace.newBuilder()
-                .register(byte[].class)
-                .register(String.class)
-                .register(long.class)
-                .register(boolean.class)
-                .register(URI.class)
-                .register(DeviceId.class)
-                .register(ElementId.class)
-                .register(PortNumber.class)
-                .register(ConnectPoint.class)
-                .register(MacAddress.class)
+                .register(KryoNamespaces.API)
                 .register(AuthenticationRecord.class)
                 .build();
 
@@ -335,12 +318,8 @@
         deviceService.addListener(deviceListener);
         getConfiguredAaaServerAddress();
         radiusOperationalStatusService.initialize(nasIpAddress.getAddress(), radiusSecret, impl);
-        authenticationStatisticsPublisher =
-                new AuthenticationStatisticsEventPublisher();
         executor = Executors.newScheduledThreadPool(3);
 
-        scheduledFuture = executor.scheduleAtFixedRate(authenticationStatisticsPublisher,
-            0, statisticsGenerationPeriodInSeconds, TimeUnit.SECONDS);
         scheduledStatusServerChecker = executor.scheduleAtFixedRate(new ServerStatusChecker(), 0,
             operationalStatusEventGenerationPeriodInSeconds, TimeUnit.SECONDS);
 
@@ -357,7 +336,6 @@
         impl.deactivate();
         deviceService.removeListener(deviceListener);
         eventDispatcher.removeSink(AuthenticationEvent.class);
-        scheduledFuture.cancel(true);
         scheduledStatusServerChecker.cancel(true);
         executor.shutdown();
 
@@ -369,11 +347,7 @@
     public void modified(ComponentContext context) {
         Dictionary<String, Object> properties = context.getProperties();
 
-        String s = Tools.get(properties, "statisticsGenerationPeriodInSeconds");
-        statisticsGenerationPeriodInSeconds = Strings.isNullOrEmpty(s) ? STATISTICS_GENERATION_PERIOD_DEFAULT
-                : Integer.parseInt(s.trim());
-
-        s = Tools.get(properties, "operationalStatusEventGenerationPeriodInSeconds");
+        String s = Tools.get(properties, "operationalStatusEventGenerationPeriodInSeconds");
         operationalStatusEventGenerationPeriodInSeconds = Strings.isNullOrEmpty(s)
                 ? OPERATIONAL_STATUS_SERVER_EVENT_GENERATION_DEFAULT
                     : Integer.parseInt(s.trim());
@@ -1121,51 +1095,6 @@
         }
     }
 
-    private class AuthenticationStatisticsEventPublisher implements Runnable {
-        private final Logger log = getLogger(getClass());
-        public void run() {
-            log.info("Notifying AuthenticationStatisticsEvent");
-            aaaStatisticsManager.calculatePacketRoundtripTime();
-            log.debug("AcceptResponsesRx---" + aaaStatisticsManager.getAaaStats().getAcceptResponsesRx());
-            log.debug("AccessRequestsTx---" + aaaStatisticsManager.getAaaStats().getAccessRequestsTx());
-            log.debug("ChallengeResponsesRx---" + aaaStatisticsManager.getAaaStats().getChallengeResponsesRx());
-            log.debug("DroppedResponsesRx---" + aaaStatisticsManager.getAaaStats().getDroppedResponsesRx());
-            log.debug("InvalidValidatorsRx---" + aaaStatisticsManager.getAaaStats().getInvalidValidatorsRx());
-            log.debug("MalformedResponsesRx---" + aaaStatisticsManager.getAaaStats().getMalformedResponsesRx());
-            log.debug("PendingRequests---" + aaaStatisticsManager.getAaaStats().getPendingRequests());
-            log.debug("RejectResponsesRx---" + aaaStatisticsManager.getAaaStats().getRejectResponsesRx());
-            log.debug("RequestReTx---" + aaaStatisticsManager.getAaaStats().getRequestReTx());
-            log.debug("RequestRttMilis---" + aaaStatisticsManager.getAaaStats().getRequestRttMilis());
-            log.debug("UnknownServerRx---" + aaaStatisticsManager.getAaaStats().getUnknownServerRx());
-            log.debug("UnknownTypeRx---" + aaaStatisticsManager.getAaaStats().getUnknownTypeRx());
-            log.debug("TimedOutPackets----" + aaaStatisticsManager.getAaaStats().getTimedOutPackets());
-            log.debug("EapolLogoffRx---" + aaaStatisticsManager.getAaaStats().getEapolLogoffRx());
-            log.debug("EapolAuthSuccessTrans---" + aaaStatisticsManager.getAaaStats().getEapolAuthSuccessTrans());
-            log.debug("EapolAuthFailureTrans---" +
-            aaaStatisticsManager.getAaaStats().getEapolAuthFailureTrans());
-            log.debug("EapolStartReqTrans---" +
-            aaaStatisticsManager.getAaaStats().getEapolStartReqTrans());
-            log.debug("EapolTransRespNotNak---" +
-            aaaStatisticsManager.getAaaStats().getEapolTransRespNotNak());
-            log.debug("EapPktTxauthChooseEap---" +
-            aaaStatisticsManager.getAaaStats().getEapPktTxauthChooseEap());
-            log.debug("EapolResIdentityMsgTrans---" +
-            aaaStatisticsManager.getAaaStats().getEapolResIdentityMsgTrans());
-            log.debug("EapolFramesTx---" + aaaStatisticsManager.getAaaStats().getEapolFramesTx());
-            log.debug("AuthStateIdle---" + aaaStatisticsManager.getAaaStats().getAuthStateIdle());
-            log.debug("RequestIdFramesTx---" + aaaStatisticsManager.getAaaStats().getRequestIdFramesTx());
-            log.debug("ReqEapFramesTx---" + aaaStatisticsManager.getAaaStats().getReqEapFramesTx());
-            log.debug("InvalidPktType---" + aaaStatisticsManager.getAaaStats().getInvalidPktType());
-            log.debug("InvalidBodyLength---" + aaaStatisticsManager.getAaaStats().getInvalidBodyLength());
-            log.debug("ValidEapolFramesRx---" + aaaStatisticsManager.getAaaStats().getValidEapolFramesRx());
-            log.debug("PendingResSupp---" + aaaStatisticsManager.getAaaStats().getPendingResSupp());
-            log.debug("ResIdEapFramesRx---" + aaaStatisticsManager.getAaaStats().getEapolattrIdentity());
-            aaaStatisticsManager.getStatsDelegate().
-                notify(new AuthenticationStatisticsEvent(AuthenticationStatisticsEvent.Type.STATS_UPDATE,
-                    aaaStatisticsManager.getAaaStats()));
-        }
-    }
-
     private class ServerStatusChecker implements Runnable {
         @Override
         public void run() {
diff --git a/app/src/main/java/org/opencord/aaa/impl/AaaStatisticsManager.java b/app/src/main/java/org/opencord/aaa/impl/AaaStatisticsManager.java
index 125898e..8bb2f53 100644
--- a/app/src/main/java/org/opencord/aaa/impl/AaaStatisticsManager.java
+++ b/app/src/main/java/org/opencord/aaa/impl/AaaStatisticsManager.java
@@ -16,60 +16,171 @@
 
 package org.opencord.aaa.impl;
 
+import com.google.common.base.Strings;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.SafeRecurringTask;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
 import org.opencord.aaa.AaaStatistics;
-import org.opencord.aaa.AuthenticationStatisticsDelegate;
+import org.opencord.aaa.AaaStatisticsSnapshot;
 import org.opencord.aaa.AuthenticationStatisticsEvent;
 import org.opencord.aaa.AuthenticationStatisticsEventListener;
 import org.opencord.aaa.AuthenticationStatisticsService;
+import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
 
+import java.util.Dictionary;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.opencord.aaa.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD;
+import static org.opencord.aaa.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD_DEFAULT;
+import static org.opencord.aaa.impl.OsgiPropertyConstants.STATISTICS_SYNC_PERIOD;
+import static org.opencord.aaa.impl.OsgiPropertyConstants.STATISTICS_SYNC_PERIOD_DEFAULT;
 import static org.slf4j.LoggerFactory.getLogger;
 
-
-
-@Component(immediate = true)
+/**
+ * Manages collection and publishing of statistics for the AAA application.
+ */
+@Component(immediate = true, property = {
+        STATISTICS_GENERATION_PERIOD + ":Integer=" + STATISTICS_GENERATION_PERIOD_DEFAULT,
+        STATISTICS_SYNC_PERIOD + ":Integer=" + STATISTICS_SYNC_PERIOD_DEFAULT,
+})
 public class AaaStatisticsManager
-extends AbstractListenerManager<AuthenticationStatisticsEvent, AuthenticationStatisticsEventListener>
-implements AuthenticationStatisticsService {
+        extends AbstractListenerManager<AuthenticationStatisticsEvent, AuthenticationStatisticsEventListener>
+        implements AuthenticationStatisticsService {
 
-    private AuthenticationStatisticsDelegate statsDelegate;
+    private static final String AAA_STATISTICS_LEADERSHIP = "aaa-statistics";
 
-    @Override
-    public AuthenticationStatisticsDelegate getStatsDelegate() {
-        return statsDelegate;
-    }
+    private static final MessageSubject RESET_SUBJECT = new MessageSubject("aaa-statistics-reset");
+
+    private int statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
+    private int statisticsSyncPeriodInSeconds = STATISTICS_SYNC_PERIOD_DEFAULT;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterCommunicationService clusterCommunicationService;
+
+    private ScheduledExecutorService executor;
+
+    private ScheduledFuture<?> publisherTask;
+    private ScheduledFuture<?> syncTask;
+
+    private EventuallyConsistentMap<NodeId, AaaStatisticsSnapshot> statistics;
 
     private final Logger log = getLogger(getClass());
     private AaaStatistics aaaStats;
-    public Map<Byte, Long> outgoingPacketMap = new HashMap<Byte, Long>();
+    private Map<Byte, Long> outgoingPacketMap = new HashMap<>();
     private static final int PACKET_COUNT_FOR_AVERAGE_RTT_CALCULATION = 5;
 
+    KryoNamespace serializer = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .register(AaaStatisticsSnapshot.class)
+            .register(ClusterMessage.class)
+            .register(MessageSubject.class)
+            .build();
+
     @Override
     public AaaStatistics getAaaStats() {
         return aaaStats;
     }
 
+    @Override
+    public AaaStatisticsSnapshot getClusterStatistics() {
+        return aggregate();
+    }
+
     @Activate
-    public void activate() {
+    public void activate(ComponentContext context) {
         log.info("Activate aaaStatisticsManager");
-        aaaStats = new AaaStatistics();
-        statsDelegate = new InternalAuthenticationDelegateForStatistics();
+        modified(context);
+
+        statistics = storageService.<NodeId, AaaStatisticsSnapshot>eventuallyConsistentMapBuilder()
+                        .withName("aaa-statistics")
+                        .withSerializer(serializer)
+                        .withTimestampProvider((k, v) -> new WallClockTimestamp())
+                        .build();
+
+        AaaStatisticsSnapshot snapshot = statistics.get(clusterService.getLocalNode().id());
+        if (snapshot == null) {
+            aaaStats = new AaaStatistics();
+        } else {
+            aaaStats = AaaStatistics.fromSnapshot(snapshot);
+        }
+
+        leadershipService.runForLeadership(AAA_STATISTICS_LEADERSHIP);
+
         eventDispatcher.addSink(AuthenticationStatisticsEvent.class, listenerRegistry);
+
+        executor = Executors.newScheduledThreadPool(1);
+
+        clusterCommunicationService.addSubscriber(RESET_SUBJECT, Serializer.using(serializer)::decode,
+                this::resetLocal, executor);
+
+        syncTask = executor.scheduleAtFixedRate(SafeRecurringTask.wrap(this::syncStats),
+                0, statisticsSyncPeriodInSeconds, TimeUnit.SECONDS);
+
+        publisherTask = executor.scheduleAtFixedRate(SafeRecurringTask.wrap(this::publishStats),
+                0, statisticsGenerationPeriodInSeconds, TimeUnit.SECONDS);
     }
 
     @Deactivate
     public void deactivate() {
+        clusterCommunicationService.removeSubscriber(RESET_SUBJECT);
+
+        publisherTask.cancel(true);
+        syncTask.cancel(true);
+        executor.shutdownNow();
+
+        leadershipService.withdraw(AAA_STATISTICS_LEADERSHIP);
+
         eventDispatcher.removeSink(AuthenticationStatisticsEvent.class);
     }
 
+    @Modified
+    public void modified(ComponentContext context) {
+        Dictionary<String, Object> properties = context.getProperties();
+
+        String s = Tools.get(properties, "statisticsGenerationPeriodInSeconds");
+        statisticsGenerationPeriodInSeconds = Strings.isNullOrEmpty(s) ? STATISTICS_GENERATION_PERIOD_DEFAULT
+                : Integer.parseInt(s.trim());
+
+        s = Tools.get(properties, "statisticsSyncPeriodInSeconds");
+        statisticsSyncPeriodInSeconds = Strings.isNullOrEmpty(s) ? STATISTICS_SYNC_PERIOD_DEFAULT
+                : Integer.parseInt(s.trim());
+    }
+
     @Override
     public void handleRoundtripTime(byte inPacketIdentifier) {
         long inTimeInMilis = System.currentTimeMillis();
@@ -83,7 +194,8 @@
 
     @Override
     public void resetAllCounters() {
-        aaaStats.resetAllCounters();
+        ClusterMessage reset = new ClusterMessage(clusterService.getLocalNode().id(), RESET_SUBJECT, new byte[]{});
+        clusterCommunicationService.broadcastIncludeSelf(reset, RESET_SUBJECT, Serializer.using(serializer)::encode);
     }
 
     @Override
@@ -100,14 +212,46 @@
     }
 
     /**
-     *Delegate allowing the StateMachine to notify us of events.
+     * Pushes in-memory stats into the eventually-consistent map for cluster-wide retention.
      */
-    private class InternalAuthenticationDelegateForStatistics implements AuthenticationStatisticsDelegate {
-        @Override
-        public void notify(AuthenticationStatisticsEvent authenticationStatisticsEvent) {
-            log.debug("Authentication Statistics event {} for {}", authenticationStatisticsEvent.type(),
-                    authenticationStatisticsEvent.subject());
-            post(authenticationStatisticsEvent);
+    private void syncStats() {
+        calculatePacketRoundtripTime();
+
+        statistics.put(clusterService.getLocalNode().id(), aaaStats.snapshot());
+    }
+
+    /**
+     * Aggregates cluster-wide stats from the ec-map.
+     *
+     * @return aggregate stats
+     */
+    private AaaStatisticsSnapshot aggregate() {
+        return statistics.values().stream()
+                .reduce(new AaaStatisticsSnapshot(), AaaStatisticsSnapshot::add);
+    }
+
+    /**
+     * Publishes cluster-wide stats.
+     */
+    private void publishStats() {
+        // only publish if we are the leader
+        if (!Objects.equals(leadershipService.getLeader(AAA_STATISTICS_LEADERSHIP),
+                clusterService.getLocalNode().id())) {
+            return;
         }
+
+        AaaStatisticsSnapshot clusterStats = aggregate();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Notifying stats: {}", clusterStats);
+        }
+
+        post(new AuthenticationStatisticsEvent(AuthenticationStatisticsEvent.Type.STATS_UPDATE,
+                AaaStatistics.fromSnapshot(clusterStats)));
+    }
+
+    private void resetLocal(ClusterMessage m) {
+        aaaStats.resetAllCounters();
+        syncStats();
     }
 }
diff --git a/app/src/main/java/org/opencord/aaa/impl/OsgiPropertyConstants.java b/app/src/main/java/org/opencord/aaa/impl/OsgiPropertyConstants.java
index 1c59799..34a2cc3 100644
--- a/app/src/main/java/org/opencord/aaa/impl/OsgiPropertyConstants.java
+++ b/app/src/main/java/org/opencord/aaa/impl/OsgiPropertyConstants.java
@@ -27,6 +27,9 @@
     public static final String STATISTICS_GENERATION_PERIOD = "statisticsGenerationPeriodInSeconds";
     public static final int STATISTICS_GENERATION_PERIOD_DEFAULT = 20;
 
+    public static final String STATISTICS_SYNC_PERIOD = "statisticsSyncPeriodInSeconds";
+    public static final int STATISTICS_SYNC_PERIOD_DEFAULT = 5;
+
     public static final String OPERATIONAL_STATUS_SERVER_EVENT_GENERATION =
             "operationalStatusEventGenerationPeriodInSeconds";
     public static final int OPERATIONAL_STATUS_SERVER_EVENT_GENERATION_DEFAULT = 30;
diff --git a/app/src/test/java/org/opencord/aaa/impl/AaaManagerTest.java b/app/src/test/java/org/opencord/aaa/impl/AaaManagerTest.java
index 4309e31..839d0b1 100644
--- a/app/src/test/java/org/opencord/aaa/impl/AaaManagerTest.java
+++ b/app/src/test/java/org/opencord/aaa/impl/AaaManagerTest.java
@@ -26,6 +26,9 @@
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.RADIUS;
 import org.onlab.packet.RADIUSAttribute;
+import org.onosproject.cluster.ClusterServiceAdapter;
+import org.onosproject.cluster.LeadershipServiceAdapter;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.CoreServiceAdapter;
 import org.onosproject.event.DefaultEventSinkRegistry;
 import org.onosproject.event.Event;
@@ -34,6 +37,7 @@
 import org.onosproject.net.config.Config;
 import org.onosproject.net.config.NetworkConfigRegistryAdapter;
 import org.onosproject.net.packet.InboundPacket;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
 import org.onosproject.store.service.TestStorageService;
 import org.opencord.aaa.AaaConfig;
 
@@ -77,6 +81,13 @@
         }
     }
 
+    static final class TestLeadershipService extends LeadershipServiceAdapter {
+        @Override
+        public NodeId getLeader(String path) {
+            return new ClusterServiceAdapter().getLocalNode().id();
+        }
+    }
+
     /**
      * Mocks the network config registry.
      */
@@ -124,9 +135,13 @@
         aaaManager.cfgService = new MockCfgService();
         aaaManager.storageService = new TestStorageService();
         aaaStatisticsManager = new AaaStatisticsManager();
+        aaaStatisticsManager.storageService = new TestStorageService();
+        aaaStatisticsManager.clusterService = new ClusterServiceAdapter();
+        aaaStatisticsManager.leadershipService = new TestLeadershipService();
+        aaaStatisticsManager.clusterCommunicationService = new ClusterCommunicationServiceAdapter();
         aaaManager.radiusOperationalStatusService = new RadiusOperationalStatusManager();
         TestUtils.setField(aaaStatisticsManager, "eventDispatcher", new TestEventDispatcher());
-        aaaStatisticsManager.activate();
+        aaaStatisticsManager.activate(new MockComponentContext());
         aaaManager.aaaStatisticsManager = this.aaaStatisticsManager;
         TestUtils.setField(aaaManager, "eventDispatcher", new TestEventDispatcher());
         aaaManager.activate(new AaaTestBase.MockComponentContext());
diff --git a/app/src/test/java/org/opencord/aaa/impl/AaaStatisticsTest.java b/app/src/test/java/org/opencord/aaa/impl/AaaStatisticsTest.java
index 573e4de..c9564b0 100644
--- a/app/src/test/java/org/opencord/aaa/impl/AaaStatisticsTest.java
+++ b/app/src/test/java/org/opencord/aaa/impl/AaaStatisticsTest.java
@@ -26,6 +26,7 @@
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.RADIUS;
 import org.onlab.packet.RADIUSAttribute;
+import org.onosproject.cluster.ClusterServiceAdapter;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreServiceAdapter;
 import org.onosproject.event.DefaultEventSinkRegistry;
@@ -38,6 +39,7 @@
 import org.onosproject.net.packet.InboundPacket;
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketService;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
 import org.onosproject.store.service.TestStorageService;
 import org.opencord.aaa.AaaConfig;
 import org.slf4j.Logger;
@@ -144,9 +146,13 @@
         aaaManager.cfgService = new MockCfgService();
         aaaManager.storageService = new TestStorageService();
         aaaStatisticsManager = new AaaStatisticsManager();
+        aaaStatisticsManager.storageService = new TestStorageService();
+        aaaStatisticsManager.clusterService = new ClusterServiceAdapter();
+        aaaStatisticsManager.leadershipService = new AaaManagerTest.TestLeadershipService();
+        aaaStatisticsManager.clusterCommunicationService = new ClusterCommunicationServiceAdapter();
         aaaSupplicantStatsManager = new AaaSupplicantMachineStatsManager();
         TestUtils.setField(aaaStatisticsManager, "eventDispatcher", new TestEventDispatcher());
-        aaaStatisticsManager.activate();
+        aaaStatisticsManager.activate(new MockComponentContext());
         TestUtils.setField(aaaSupplicantStatsManager, "eventDispatcher", new TestEventDispatcher());
         aaaSupplicantStatsManager.activate();
         aaaManager.aaaStatisticsManager = this.aaaStatisticsManager;