SEBA-986-CordMcastStatisticsService should use distributed storage infrastructure of ONOS

Change-Id: I3e22f3bde896f5a14b85fad88e5f84e58d3840a0
diff --git a/app/src/main/java/org/opencord/cordmcast/impl/CordMcastStatisticsManager.java b/app/src/main/java/org/opencord/cordmcast/impl/CordMcastStatisticsManager.java
index 1044adc..e2f4779 100644
--- a/app/src/main/java/org/opencord/cordmcast/impl/CordMcastStatisticsManager.java
+++ b/app/src/main/java/org/opencord/cordmcast/impl/CordMcastStatisticsManager.java
@@ -19,6 +19,8 @@
 import org.onlab.packet.VlanId;
 import org.onlab.util.SafeRecurringTask;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
 import org.onosproject.event.AbstractListenerManager;
 import org.onosproject.mcast.api.McastRoute;
 import org.onosproject.mcast.api.MulticastRouteService;
@@ -40,6 +42,7 @@
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Properties;
+import java.util.Objects;
 
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -61,13 +64,20 @@
         extends AbstractListenerManager<CordMcastStatisticsEvent, CordMcastStatisticsEventListener>
         implements CordMcastStatisticsService {
 
+    private static final String CORD_MCAST_STATISTICS_LEADERSHIP = "cord-mcast-statistics-leadership";
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected MulticastRouteService mcastService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected ComponentConfigService componentConfigService;
 
-    CordMcastStatisticsEventListener listener;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
 
     /**
      * Multicast Statistics generation time interval.
@@ -82,6 +92,7 @@
 
     @Activate
     public void activate(ComponentContext context) {
+        leadershipService.runForLeadership(CORD_MCAST_STATISTICS_LEADERSHIP);
         eventDispatcher.addSink(CordMcastStatisticsEvent.class, listenerRegistry);
         executor = Executors.newScheduledThreadPool(1);
         componentConfigService.registerProperties(getClass());
@@ -112,6 +123,8 @@
         eventDispatcher.removeSink(CordMcastStatisticsEvent.class);
         scheduledFuture.cancel(true);
         executor.shutdown();
+        leadershipService.withdraw(CORD_MCAST_STATISTICS_LEADERSHIP);
+        log.info("CordMcastStatisticsManager deactivated.");
     }
 
     public List<CordMcastStatistics> getMcastDetails() {
@@ -139,6 +152,12 @@
      * pushing mcast stat data as event.
      */
     protected void publishEvent() {
+        // Only publish events if we are the cluster leader for Igmp-stats
+        if (!Objects.equals(leadershipService.getLeader(CORD_MCAST_STATISTICS_LEADERSHIP),
+                clusterService.getLocalNode().id())) {
+            log.debug("This is not leader of : {}", CORD_MCAST_STATISTICS_LEADERSHIP);
+            return;
+        }
         log.debug("pushing cord mcast event to kafka");
         List<CordMcastStatistics> routeList = getMcastDetails();
         routeList.forEach(mcastStats -> {