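Publish cluster-wide AAA stats

Each node periodically syncs a snapshot of its local AAA counters into an
eventually-consistent map keyed by node ID. The node holding the
"aaa-statistics" leadership periodically aggregates all snapshots and
posts the result as a STATS_UPDATE event. Counter resets are broadcast to
every node, including the sender. The sync and publish periods are read
from the component configuration. AuthenticationStatisticsDelegate is no
longer used by the statistics manager.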

Change-Id: Icbdacdae08b6235be022f85eb41ce6d0f8f35a35
diff --git a/app/src/main/java/org/opencord/aaa/impl/AaaStatisticsManager.java b/app/src/main/java/org/opencord/aaa/impl/AaaStatisticsManager.java
index 125898e..8bb2f53 100644
--- a/app/src/main/java/org/opencord/aaa/impl/AaaStatisticsManager.java
+++ b/app/src/main/java/org/opencord/aaa/impl/AaaStatisticsManager.java
@@ -16,60 +16,171 @@
 
 package org.opencord.aaa.impl;
 
+import com.google.common.base.Strings;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.SafeRecurringTask;
+import org.onlab.util.Tools;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.event.AbstractListenerManager;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
 import org.opencord.aaa.AaaStatistics;
-import org.opencord.aaa.AuthenticationStatisticsDelegate;
+import org.opencord.aaa.AaaStatisticsSnapshot;
 import org.opencord.aaa.AuthenticationStatisticsEvent;
 import org.opencord.aaa.AuthenticationStatisticsEventListener;
 import org.opencord.aaa.AuthenticationStatisticsService;
+import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
 
+import java.util.Dictionary;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.opencord.aaa.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD;
+import static org.opencord.aaa.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD_DEFAULT;
+import static org.opencord.aaa.impl.OsgiPropertyConstants.STATISTICS_SYNC_PERIOD;
+import static org.opencord.aaa.impl.OsgiPropertyConstants.STATISTICS_SYNC_PERIOD_DEFAULT;
 import static org.slf4j.LoggerFactory.getLogger;
 
-
-
-@Component(immediate = true)
+/**
+ * Manages collection and publishing of statistics for the AAA application.
+ */
+@Component(immediate = true, property = {
+        STATISTICS_GENERATION_PERIOD + ":Integer=" + STATISTICS_GENERATION_PERIOD_DEFAULT,
+        STATISTICS_SYNC_PERIOD + ":Integer=" + STATISTICS_SYNC_PERIOD_DEFAULT,
+})
 public class AaaStatisticsManager
-extends AbstractListenerManager<AuthenticationStatisticsEvent, AuthenticationStatisticsEventListener>
-implements AuthenticationStatisticsService {
+        extends AbstractListenerManager<AuthenticationStatisticsEvent, AuthenticationStatisticsEventListener>
+        implements AuthenticationStatisticsService {
 
-    private AuthenticationStatisticsDelegate statsDelegate;
+    private static final String AAA_STATISTICS_LEADERSHIP = "aaa-statistics";
 
-    @Override
-    public AuthenticationStatisticsDelegate getStatsDelegate() {
-        return statsDelegate;
-    }
+    private static final MessageSubject RESET_SUBJECT = new MessageSubject("aaa-statistics-reset");
+
+    private int statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
+    private int statisticsSyncPeriodInSeconds = STATISTICS_SYNC_PERIOD_DEFAULT;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterCommunicationService clusterCommunicationService;
+
+    private ScheduledExecutorService executor;
+
+    private ScheduledFuture<?> publisherTask;
+    private ScheduledFuture<?> syncTask;
+
+    private EventuallyConsistentMap<NodeId, AaaStatisticsSnapshot> statistics;
 
     private final Logger log = getLogger(getClass());
     private AaaStatistics aaaStats;
-    public Map<Byte, Long> outgoingPacketMap = new HashMap<Byte, Long>();
+    private Map<Byte, Long> outgoingPacketMap = new HashMap<>();
     private static final int PACKET_COUNT_FOR_AVERAGE_RTT_CALCULATION = 5;
 
+    KryoNamespace serializer = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .register(AaaStatisticsSnapshot.class)
+            .register(ClusterMessage.class)
+            .register(MessageSubject.class)
+            .build();
+
     @Override
     public AaaStatistics getAaaStats() {
         return aaaStats;
     }
 
+    @Override
+    public AaaStatisticsSnapshot getClusterStatistics() {
+        return aggregate();
+    }
+
     @Activate
-    public void activate() {
+    public void activate(ComponentContext context) {
         log.info("Activate aaaStatisticsManager");
-        aaaStats = new AaaStatistics();
-        statsDelegate = new InternalAuthenticationDelegateForStatistics();
+        modified(context);
+
+        statistics = storageService.<NodeId, AaaStatisticsSnapshot>eventuallyConsistentMapBuilder()
+                        .withName("aaa-statistics")
+                        .withSerializer(serializer)
+                        .withTimestampProvider((k, v) -> new WallClockTimestamp())
+                        .build();
+
+        AaaStatisticsSnapshot snapshot = statistics.get(clusterService.getLocalNode().id());
+        if (snapshot == null) {
+            aaaStats = new AaaStatistics();
+        } else {
+            aaaStats = AaaStatistics.fromSnapshot(snapshot);
+        }
+
+        leadershipService.runForLeadership(AAA_STATISTICS_LEADERSHIP);
+
         eventDispatcher.addSink(AuthenticationStatisticsEvent.class, listenerRegistry);
+
+        executor = Executors.newScheduledThreadPool(1);
+
+        clusterCommunicationService.addSubscriber(RESET_SUBJECT, Serializer.using(serializer)::decode,
+                this::resetLocal, executor);
+
+        syncTask = executor.scheduleAtFixedRate(SafeRecurringTask.wrap(this::syncStats),
+                0, statisticsSyncPeriodInSeconds, TimeUnit.SECONDS);
+
+        publisherTask = executor.scheduleAtFixedRate(SafeRecurringTask.wrap(this::publishStats),
+                0, statisticsGenerationPeriodInSeconds, TimeUnit.SECONDS);
     }
 
     @Deactivate
     public void deactivate() {
+        clusterCommunicationService.removeSubscriber(RESET_SUBJECT);
+
+        publisherTask.cancel(true);
+        syncTask.cancel(true);
+        executor.shutdownNow();
+
+        leadershipService.withdraw(AAA_STATISTICS_LEADERSHIP);
+
         eventDispatcher.removeSink(AuthenticationStatisticsEvent.class);
     }
 
+    @Modified
+    public void modified(ComponentContext context) {
+        Dictionary<String, Object> properties = context.getProperties();
+
+        String s = Tools.get(properties, "statisticsGenerationPeriodInSeconds");
+        statisticsGenerationPeriodInSeconds = Strings.isNullOrEmpty(s) ? STATISTICS_GENERATION_PERIOD_DEFAULT
+                : Integer.parseInt(s.trim());
+
+        s = Tools.get(properties, "statisticsSyncPeriodInSeconds");
+        statisticsSyncPeriodInSeconds = Strings.isNullOrEmpty(s) ? STATISTICS_SYNC_PERIOD_DEFAULT
+                : Integer.parseInt(s.trim());
+    }
+
     @Override
     public void handleRoundtripTime(byte inPacketIdentifier) {
         long inTimeInMilis = System.currentTimeMillis();
@@ -83,7 +194,8 @@
 
     @Override
     public void resetAllCounters() {
-        aaaStats.resetAllCounters();
+        ClusterMessage reset = new ClusterMessage(clusterService.getLocalNode().id(), RESET_SUBJECT, new byte[]{});
+        clusterCommunicationService.broadcastIncludeSelf(reset, RESET_SUBJECT, Serializer.using(serializer)::encode);
     }
 
     @Override
@@ -100,14 +212,46 @@
     }
 
     /**
-     *Delegate allowing the StateMachine to notify us of events.
+     * Pushes in-memory stats into the eventually-consistent map for cluster-wide retention.
      */
-    private class InternalAuthenticationDelegateForStatistics implements AuthenticationStatisticsDelegate {
-        @Override
-        public void notify(AuthenticationStatisticsEvent authenticationStatisticsEvent) {
-            log.debug("Authentication Statistics event {} for {}", authenticationStatisticsEvent.type(),
-                    authenticationStatisticsEvent.subject());
-            post(authenticationStatisticsEvent);
+    private void syncStats() {
+        calculatePacketRoundtripTime();
+
+        statistics.put(clusterService.getLocalNode().id(), aaaStats.snapshot());
+    }
+
+    /**
+     * Aggregates cluster-wide stats from the eventually-consistent map.
+     *
+     * @return aggregate stats
+     */
+    private AaaStatisticsSnapshot aggregate() {
+        return statistics.values().stream()
+                .reduce(new AaaStatisticsSnapshot(), AaaStatisticsSnapshot::add);
+    }
+
+    /**
+     * Publishes cluster-wide stats.
+     */
+    private void publishStats() {
+        // only publish if we are the leader
+        if (!Objects.equals(leadershipService.getLeader(AAA_STATISTICS_LEADERSHIP),
+                clusterService.getLocalNode().id())) {
+            return;
         }
+
+        AaaStatisticsSnapshot clusterStats = aggregate();
+
+        if (log.isDebugEnabled()) {
+            log.debug("Notifying stats: {}", clusterStats);
+        }
+
+        post(new AuthenticationStatisticsEvent(AuthenticationStatisticsEvent.Type.STATS_UPDATE,
+                AaaStatistics.fromSnapshot(clusterStats)));
+    }
+
+    private void resetLocal(ClusterMessage m) {
+        aaaStats.resetAllCounters();
+        syncStats();
     }
 }