SEBA-989-Instance coordination and state distribution mechanism in IgmpStatisticsManager

Change-Id: Ibf3f3a2c5c91c010ef909692eea913f95ee7a92e
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpStatisticsManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpStatisticsManager.java
index 21d0011..69ba6b6 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpStatisticsManager.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpStatisticsManager.java
@@ -16,6 +16,22 @@
 
 package org.opencord.igmpproxy.impl;
 
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.opencord.igmpproxy.IgmpStatisticType;
+import org.opencord.igmpproxy.IgmpStatisticsEvent;
+import org.opencord.igmpproxy.IgmpStatisticsEventListener;
+import org.opencord.igmpproxy.IgmpStatisticsService;
+import org.opencord.igmpproxy.IgmpLeadershipService;
+import org.opencord.igmpproxy.IgmpStatistics;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
@@ -28,61 +44,95 @@
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
 
-import org.opencord.igmpproxy.IgmpStatistics;
-import org.opencord.igmpproxy.IgmpStatisticsService;
-import org.opencord.igmpproxy.IgmpStatisticsEvent;
-import org.opencord.igmpproxy.IgmpStatisticsEventListener;
-
 import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD;
+import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_SYNC_PERIOD;
 import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD_DEFAULT;
+import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_SYNC_PERIOD_DEFAULT;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.Dictionary;
+import java.util.Objects;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 
 import com.google.common.base.Strings;
 
 
-
 /**
- *
  * Process the stats collected in Igmp proxy application. Publish to kafka onos.
- *
  */
 @Component(immediate = true, property = {
         STATISTICS_GENERATION_PERIOD + ":Integer=" + STATISTICS_GENERATION_PERIOD_DEFAULT,
+        STATISTICS_SYNC_PERIOD + ":Integer=" + STATISTICS_SYNC_PERIOD_DEFAULT,
 })
 public class IgmpStatisticsManager extends
-                 AbstractListenerManager<IgmpStatisticsEvent, IgmpStatisticsEventListener>
-                         implements IgmpStatisticsService {
+        AbstractListenerManager<IgmpStatisticsEvent, IgmpStatisticsEventListener>
+        implements IgmpStatisticsService {
+    private static final String IGMP_STATISTICS = "igmp-statistics";
+    private static final String IGMP_STATISTICS_LEADERSHIP = "igmp-statistics";
+
     private final Logger log = getLogger(getClass());
     private IgmpStatistics igmpStats;
 
-    ScheduledExecutorService executorForIgmp;
+    private ScheduledExecutorService executorForIgmp;
     private ScheduledFuture<?> publisherTask;
+    private ScheduledFuture<?> syncTask;
 
     protected int statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
+    protected int statisticsSyncPeriodInSeconds = STATISTICS_SYNC_PERIOD_DEFAULT;
+
+    private EventuallyConsistentMap<NodeId, IgmpStatistics> statistics;
+
+    private static final MessageSubject RESET_SUBJECT = new MessageSubject("igmp-statistics-reset");
+
+    private KryoNamespace statSerializer = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .register(IgmpStatistics.class)
+            .build();
+
+    // True once the current local counters have been written to the distributed map; cleared on every local change.
+    private AtomicBoolean validityCheck = new AtomicBoolean(false);
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected ComponentConfigService cfgService;
 
-    @Override
-    public IgmpStatistics getIgmpStats() {
-        return igmpStats;
-    }
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected IgmpLeadershipService leadershipManager;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterCommunicationService clusterCommunicationService;
 
     @Activate
     public void activate(ComponentContext context) {
-        igmpStats = new IgmpStatistics();
+        igmpStats = getIgmpStatsInstance();
+
+        // Distributed map keyed by node id; syncStats() stores this node's statistics under its own id.
+        statistics = storageService.<NodeId, IgmpStatistics>eventuallyConsistentMapBuilder()
+                .withName(IGMP_STATISTICS)
+                .withSerializer(statSerializer)
+                .withTimestampProvider((k, v) -> new WallClockTimestamp())
+                .build();
+
+        initStats(statistics.get(leadershipManager.getLocalNodeId()));
+        syncStats();
+        // Compete for leadership; publishStats() only posts events on the node that is leader.
+        leadershipManager.runForLeadership(IGMP_STATISTICS_LEADERSHIP);
 
         eventDispatcher.addSink(IgmpStatisticsEvent.class, listenerRegistry);
         executorForIgmp = Executors.newScheduledThreadPool(1);
         cfgService.registerProperties(getClass());
+        // Reset requests broadcast by resetAllStats() are handled by resetLocal() on executorForIgmp.
+        clusterCommunicationService.addSubscriber(RESET_SUBJECT, Serializer.using(statSerializer)::decode,
+                this::resetLocal, executorForIgmp);
+
         modified(context);
         log.info("IgmpStatisticsManager Activated");
     }
@@ -90,69 +140,175 @@
     @Modified
     public void modified(ComponentContext context) {
+        // Re-read both periods (each from its own property, default when unset), then restart the tasks.
         Dictionary<String, Object> properties = context.getProperties();
-
         try {
             String s = Tools.get(properties, STATISTICS_GENERATION_PERIOD);
             statisticsGenerationPeriodInSeconds = Strings.isNullOrEmpty(s) ?
-                Integer.parseInt(STATISTICS_GENERATION_PERIOD)
+                    STATISTICS_GENERATION_PERIOD_DEFAULT
                     : Integer.parseInt(s.trim());
+            log.debug("statisticsGenerationPeriodInSeconds: {}", statisticsGenerationPeriodInSeconds);
+            String syncPeriod = Tools.get(properties, STATISTICS_SYNC_PERIOD);
+            statisticsSyncPeriodInSeconds = Strings.isNullOrEmpty(syncPeriod) ?
+                    STATISTICS_SYNC_PERIOD_DEFAULT : Integer.parseInt(syncPeriod.trim());
+            log.debug("statisticsSyncPeriodInSeconds: {}", statisticsSyncPeriodInSeconds);
         } catch (NumberFormatException ne) {
-            log.error("Unable to parse configuration parameter for eventGenerationPeriodInSeconds", ne);
+            log.error("Unable to parse configuration parameter", ne);
             statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
+            statisticsSyncPeriodInSeconds = STATISTICS_SYNC_PERIOD_DEFAULT;
         }
-        if (publisherTask != null) {
-            publisherTask.cancel(true);
-        }
-        publisherTask = executorForIgmp.scheduleAtFixedRate(SafeRecurringTask.wrap(this::publishStats),
-                0, statisticsGenerationPeriodInSeconds, TimeUnit.SECONDS);
+        stopPublishTask();
+        stopSyncTask();
+        startPublishTask();
+        startSyncTask();
     }
 
     @Deactivate
     public void deactivate() {
         eventDispatcher.removeSink(IgmpStatisticsEvent.class);
-        publisherTask.cancel(true);
+        clusterCommunicationService.removeSubscriber(RESET_SUBJECT); // resetLocal() needs the executor and igmpStats
+        stopPublishTask();
+        stopSyncTask();
         executorForIgmp.shutdown();
         cfgService.unregisterProperties(getClass(), false);
         igmpStats = null;
+        leadershipManager.withdraw(IGMP_STATISTICS_LEADERSHIP);
         log.info("IgmpStatisticsManager Deactivated");
     }
 
+    private IgmpStatistics getIgmpStatsInstance() {
+        if (igmpStats == null) { // not created yet, or cleared by deactivate()
+            igmpStats = new IgmpStatistics();
+            log.info("Instance of igmp-statistics created.");
+        }
+        return igmpStats;
+    }
+
+    private void syncStats() {
+        // Flip the flag before put(): an increaseStat() racing with the put then re-invalidates it,
+        // so that change is written on the next run instead of being marked as already synced.
+        if (validityCheck.compareAndSet(false, true)) {
+            statistics.put(leadershipManager.getLocalNodeId(), snapshot());
+            log.debug("Valid statistic values are put.");
+        }
+    }
+
+    private void initStats(IgmpStatistics init) {
+        if (init == null) { // activate() found no entry for the local node in the distributed map
+            log.warn("Igmp statistics was not created.");
+            return;
+        }
+        igmpStats.setStats(init); // NOTE(review): presumably copies the stored counters into the local holder
+    }
+
+    private IgmpStatistics snapshot() {
+        return getIgmpStatsInstance(); // NOTE(review): this is the live instance, not a copy
+    }
+
+    private void startSyncTask() {
+        syncTask = startTask(this::syncStats, statisticsSyncPeriodInSeconds);
+        log.debug("Sync task started. period in seconds: {}", statisticsSyncPeriodInSeconds);
+    }
+
+    private void stopSyncTask() {
+        stopTask(syncTask);
+        log.debug("Sync task stopped.");
+    }
+
+    private void startPublishTask() {
+        publisherTask = startTask(this::publishStats, statisticsGenerationPeriodInSeconds);
+        log.debug("Publisher task started. period in seconds: {}", statisticsGenerationPeriodInSeconds);
+    }
+
+    private void stopPublishTask() {
+        stopTask(publisherTask);
+        log.debug("Publisher task stopped.");
+    }
+
+    private ScheduledFuture<?> startTask(Runnable r, int rate) {
+        return executorForIgmp.scheduleAtFixedRate(SafeRecurringTask.wrap(r),
+                0, rate, TimeUnit.SECONDS); // no initial delay, then every 'rate' seconds
+    }
+
+    private void stopTask(ScheduledFuture<?> task) {
+        if (task != null) { // null until the task has been started for the first time
+            task.cancel(true);
+        }
+    }
+
+    private void resetLocal(ClusterMessage message) {
+        // Subscriber for RESET_SUBJECT: reset the local statistics and flag them for the next sync.
+        igmpStats.resetAll();
+        validityCheck.set(false);
+    }
+
     /**
      * Publishes stats.
      */
     private void publishStats() {
+        // Every node schedules this task; only the current leader for IGMP_STATISTICS_LEADERSHIP posts the event.
+        if (!Objects.equals(leadershipManager.getLeader(IGMP_STATISTICS_LEADERSHIP),
+                leadershipManager.getLocalNodeId())) {
+            log.debug("This is not leader of : {}", IGMP_STATISTICS_LEADERSHIP);
+            return;
+        }
 
         if (log.isDebugEnabled()) {
             log.debug("Notifying stats: {}", igmpStats);
-            log.debug("--IgmpDisconnect--" + igmpStats.getIgmpDisconnect());
-            log.debug("--IgmpFailJoinReq--" + igmpStats.getIgmpFailJoinReq());
-            log.debug("--IgmpJoinReq--" + igmpStats.getIgmpJoinReq());
-            log.debug("--IgmpLeaveReq--" + igmpStats.getIgmpLeaveReq());
-            log.debug("--IgmpMsgReceived--" + igmpStats.getIgmpMsgReceived());
-            log.debug("--IgmpSuccessJoinRejoinReq--" + igmpStats.getIgmpSuccessJoinRejoinReq());
-            log.debug("--Igmpv1MemershipReport--" + igmpStats.getIgmpv1MemershipReport());
-            log.debug("--Igmpv2LeaveGroup--" + igmpStats.getIgmpv2LeaveGroup());
-            log.debug("--Igmpv2MembershipReport--" + igmpStats.getIgmpv2MembershipReport());
-            log.debug("--Igmpv3MembershipQuery--" + igmpStats.getIgmpv3MembershipQuery());
-            log.debug("--Igmpv3MembershipReport--" + igmpStats.getIgmpv3MembershipReport());
-            log.debug("--InvalidIgmpMsgReceived--" + igmpStats.getInvalidIgmpMsgReceived());
-            log.debug("--TotalMsgReceived--  " + igmpStats.getTotalMsgReceived());
-            log.debug("--UnknownIgmpTypePacketsRx--" + igmpStats.getUnknownIgmpTypePacketsRxCounter());
-            log.debug("--ReportsRxWithWrongMode--" + igmpStats.getReportsRxWithWrongModeCounter());
-            log.debug("--FailJoinReqInsuffPermission--" + igmpStats.getFailJoinReqInsuffPermissionAccessCounter());
-            log.debug("--FailJoinReqUnknownMulticastIp--" + igmpStats.getFailJoinReqUnknownMulticastIpCounter());
-            log.debug("--UnconfiguredGroupCounter--" + igmpStats.getUnconfiguredGroupCounter());
-            log.debug("--ValidIgmpPacketCounter--" + igmpStats.getValidIgmpPacketCounter());
-            log.debug("--IgmpChannelJoinCounter--" + igmpStats.getIgmpChannelJoinCounter());
-            log.debug("--CurrentGrpNumCounter--" + igmpStats.getCurrentGrpNumCounter());
-            log.debug("--IgmpValidChecksumCounter--" + igmpStats.getIgmpValidChecksumCounter());
-            log.debug("--InvalidIgmpLength--" + igmpStats.getInvalidIgmpLength());
-            log.debug("--IgmpGeneralMembershipQuery--" + igmpStats.getIgmpGeneralMembershipQuery());
-            log.debug("--IgmpGrpSpecificMembershipQuery--" + igmpStats.getIgmpGrpSpecificMembershipQuery());
-            log.debug("--IgmpGrpAndSrcSpecificMembershipQuery--" + igmpStats.getIgmpGrpAndSrcSpecificMembershipQuery());
+            log.debug("--IgmpDisconnect--" + igmpStats.getStat(IgmpStatisticType.IGMP_DISCONNECT));
+            log.debug("--IgmpFailJoinReq--" + igmpStats.getStat(IgmpStatisticType.IGMP_FAIL_JOIN_REQ));
+            log.debug("--IgmpJoinReq--" + igmpStats.getStat(IgmpStatisticType.IGMP_JOIN_REQ));
+            log.debug("--IgmpLeaveReq--" + igmpStats.getStat(IgmpStatisticType.IGMP_LEAVE_REQ));
+            log.debug("--IgmpMsgReceived--" + igmpStats.getStat(IgmpStatisticType.IGMP_MSG_RECEIVED));
+            log.debug("--IgmpSuccessJoinRejoinReq--" +
+                    igmpStats.getStat(IgmpStatisticType.IGMP_SUCCESS_JOIN_RE_JOIN_REQ));
+            log.debug("--Igmpv1MemershipReport--" + igmpStats.getStat(IgmpStatisticType.IGMP_V1_MEMBERSHIP_REPORT));
+            log.debug("--Igmpv2LeaveGroup--" + igmpStats.getStat(IgmpStatisticType.IGMP_V2_LEAVE_GROUP));
+            log.debug("--Igmpv2MembershipReport--" + igmpStats.getStat(IgmpStatisticType.IGMP_V2_MEMBERSHIP_REPORT));
+            log.debug("--Igmpv3MembershipQuery--" + igmpStats.getStat(IgmpStatisticType.IGMP_V3_MEMBERSHIP_QUERY));
+            log.debug("--Igmpv3MembershipReport--" + igmpStats.getStat(IgmpStatisticType.IGMP_V3_MEMBERSHIP_REPORT));
+            log.debug("--InvalidIgmpMsgReceived--" + igmpStats.getStat(IgmpStatisticType.INVALID_IGMP_MSG_RECEIVED));
+            log.debug("--TotalMsgReceived--  " + igmpStats.getStat(IgmpStatisticType.TOTAL_MSG_RECEIVED));
+            log.debug("--UnknownIgmpTypePacketsRx--" +
+                    igmpStats.getStat(IgmpStatisticType.UNKNOWN_IGMP_TYPE_PACKETS_RX_COUNTER));
+            log.debug("--ReportsRxWithWrongMode--" +
+                    igmpStats.getStat(IgmpStatisticType.REPORTS_RX_WITH_WRONG_MODE_COUNTER));
+            log.debug("--FailJoinReqInsuffPermission--" +
+                    igmpStats.getStat(IgmpStatisticType.FAIL_JOIN_REQ_INSUFF_PERMISSION_ACCESS_COUNTER));
+            log.debug("--FailJoinReqUnknownMulticastIp--" +
+                    igmpStats.getStat(IgmpStatisticType.FAIL_JOIN_REQ_UNKNOWN_MULTICAST_IP_COUNTER));
+            log.debug("--UnconfiguredGroupCounter--" + igmpStats.getStat(IgmpStatisticType.UNCONFIGURED_GROUP_COUNTER));
+            log.debug("--ValidIgm­pPacketCounter--" + igmpStats.getStat(IgmpStatisticType.VALID_IGMP_PACKET_COUNTER));
+            log.debug("--IgmpChannelJoinCounter--" + igmpStats.getStat(IgmpStatisticType.IGMP_CHANNEL_JOIN_COUNTER));
+            log.debug("--CurrentGrpNumCounter--" + igmpStats.getStat(IgmpStatisticType.CURRENT_GRP_NUMBER_COUNTER));
+            log.debug("--IgmpValidChecksumCounter--" +
+                    igmpStats.getStat(IgmpStatisticType.IGMP_VALID_CHECKSUM_COUNTER));
+            log.debug("--InvalidIgmpLength--" + igmpStats.getStat(IgmpStatisticType.INVALID_IGMP_LENGTH));
+            log.debug("--IgmpGeneralMembershipQuery--" +
+                    igmpStats.getStat(IgmpStatisticType.IGMP_GENERAL_MEMBERSHIP_QUERY));
+            log.debug("--IgmpGrpSpecificMembershipQuery--" +
+                    igmpStats.getStat(IgmpStatisticType.IGMP_GRP_SPECIFIC_MEMBERSHIP_QUERY));
+            log.debug("--IgmpGrpAndSrcSpecificMembershipQuery--" +
+                    igmpStats.getStat(IgmpStatisticType.IGMP_GRP_AND_SRC_SPESIFIC_MEMBERSHIP_QUERY));
         }
 
         post(new IgmpStatisticsEvent(IgmpStatisticsEvent.Type.STATS_UPDATE, igmpStats));
     }
 
+    @Override
+    public void increaseStat(IgmpStatisticType type) {
+        igmpStats.increaseStat(type);
+        validityCheck.set(false); // local values changed: the next syncStats() run writes them out
+    }
+
+    @Override
+    public void resetAllStats() {
+        ClusterMessage reset = new ClusterMessage(leadershipManager.getLocalNodeId(), RESET_SUBJECT, new byte[]{});
+        clusterCommunicationService.broadcastIncludeSelf(reset, RESET_SUBJECT, // this node resets via resetLocal()
+                Serializer.using(statSerializer)::encode);
+    }
+
+    @Override
+    public Long getStat(IgmpStatisticType type) {
+        return igmpStats.getStat(type); // value held by this node's igmpStats
+    }
 }