SEBA-986-CordMcastStatisticsService should use distributed storage infrastructure of ONOS

Change-Id: I3e22f3bde896f5a14b85fad88e5f84e58d3840a0
diff --git a/app/src/main/java/org/opencord/cordmcast/impl/CordMcastStatisticsManager.java b/app/src/main/java/org/opencord/cordmcast/impl/CordMcastStatisticsManager.java
index 1044adc..e2f4779 100644
--- a/app/src/main/java/org/opencord/cordmcast/impl/CordMcastStatisticsManager.java
+++ b/app/src/main/java/org/opencord/cordmcast/impl/CordMcastStatisticsManager.java
@@ -19,6 +19,8 @@
 import org.onlab.packet.VlanId;
 import org.onlab.util.SafeRecurringTask;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
 import org.onosproject.event.AbstractListenerManager;
 import org.onosproject.mcast.api.McastRoute;
 import org.onosproject.mcast.api.MulticastRouteService;
@@ -40,6 +42,7 @@
 import java.util.List;
 import java.util.ArrayList;
 import java.util.Properties;
+import java.util.Objects;
 
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -61,13 +64,20 @@
         extends AbstractListenerManager<CordMcastStatisticsEvent, CordMcastStatisticsEventListener>
         implements CordMcastStatisticsService {
 
+    private static final String CORD_MCAST_STATISTICS_LEADERSHIP = "cord-mcast-statistics-leadership";
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected MulticastRouteService mcastService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected ComponentConfigService componentConfigService;
 
-    CordMcastStatisticsEventListener listener;
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
 
     /**
      * Multicast Statistics generation time interval.
@@ -82,6 +92,7 @@
 
     @Activate
     public void activate(ComponentContext context) {
+        leadershipService.runForLeadership(CORD_MCAST_STATISTICS_LEADERSHIP);
         eventDispatcher.addSink(CordMcastStatisticsEvent.class, listenerRegistry);
         executor = Executors.newScheduledThreadPool(1);
         componentConfigService.registerProperties(getClass());
@@ -112,6 +123,8 @@
         eventDispatcher.removeSink(CordMcastStatisticsEvent.class);
         scheduledFuture.cancel(true);
         executor.shutdown();
+        leadershipService.withdraw(CORD_MCAST_STATISTICS_LEADERSHIP);
+        log.info("CordMcastStatisticsManager deactivated.");
     }
 
     public List<CordMcastStatistics> getMcastDetails() {
@@ -139,6 +152,12 @@
      * pushing mcast stat data as event.
      */
     protected void publishEvent() {
+        // Only publish events if we are the cluster leader for Igmp-stats
+        if (!Objects.equals(leadershipService.getLeader(CORD_MCAST_STATISTICS_LEADERSHIP),
+                clusterService.getLocalNode().id())) {
+            log.debug("This is not leader of : {}", CORD_MCAST_STATISTICS_LEADERSHIP);
+            return;
+        }
         log.debug("pushing cord mcast event to kafka");
         List<CordMcastStatistics> routeList = getMcastDetails();
         routeList.forEach(mcastStats -> {
diff --git a/app/src/test/java/org/opencord/cordmcast/impl/McastTest.java b/app/src/test/java/org/opencord/cordmcast/impl/McastTest.java
index 84eb986..acd85f3 100644
--- a/app/src/test/java/org/opencord/cordmcast/impl/McastTest.java
+++ b/app/src/test/java/org/opencord/cordmcast/impl/McastTest.java
@@ -27,6 +27,7 @@
 import org.onlab.packet.VlanId;
 import org.onosproject.cfg.ComponentConfigAdapter;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterServiceAdapter;
 import org.onosproject.mcast.api.McastEvent;
 import org.onosproject.mcast.api.McastRoute;
 import org.onosproject.mcast.api.McastRouteUpdate;
@@ -86,6 +87,8 @@
     private void init(boolean vlanEnabled, VlanId egressVlan, VlanId egressInnerVlan) {
         cordMcast = new CordMcast();
         cordMcastStatisticsManager = new CordMcastStatisticsManager();
+        cordMcastStatisticsManager.clusterService = new ClusterServiceAdapter();
+        cordMcastStatisticsManager.leadershipService = new LeadershipServiceMcastAdapter();
         cordMcast.coreService = new MockCoreService();
 
         TestNetworkConfigRegistry testNetworkConfigRegistry = new TestNetworkConfigRegistry();
@@ -154,23 +157,23 @@
 
       // ForwardMap will contain the operation "Add" in the flowObjective. None -> CP_B
       assertNotNull(forwardMap.get(DEVICE_ID_OF_A));
-      assertTrue(forwardMap.get(DEVICE_ID_OF_A).op() == Objective.Operation.ADD);
+      assertEquals(forwardMap.get(DEVICE_ID_OF_A).op(), Objective.Operation.ADD);
 
       // Output port number will be PORT_B i.e. 16
       Collection<TrafficTreatment> traffictreatMentCollection =
            nextMap.get(DEVICE_ID_OF_A).next();
-      assertTrue(1 == traffictreatMentCollection.size());
+      assertEquals(1, traffictreatMentCollection.size());
       OutputInstruction output = null;
       for (TrafficTreatment trafficTreatment : traffictreatMentCollection) {
          output = outputPort(trafficTreatment);
       }
       assertNotNull(output);
-      assertTrue(PORT_B == output.port());
+      assertEquals(PORT_B, output.port());
       // Checking the group ip address
       TrafficSelector trafficSelector = forwardMap.get(DEVICE_ID_OF_A).selector();
       IPCriterion ipCriterion = ipAddress(trafficSelector);
       assertNotNull(ipCriterion);
-      assertTrue(MULTICAST_IP.equals(ipCriterion.ip().address()));
+      assertEquals(MULTICAST_IP, ipCriterion.ip().address());
       //checking the vlan criterion
       TrafficSelector meta = forwardMap.get(DEVICE_ID_OF_A).meta();
       VlanIdCriterion vlanId = vlanId(meta, Criterion.Type.VLAN_VID);
@@ -198,23 +201,23 @@
         }
         // ForwardMap will contain the operation "Add" in the flowObjective. None -> CP_B
         assertNotNull(forwardMap.get(DEVICE_ID_OF_A));
-        assertTrue(forwardMap.get(DEVICE_ID_OF_A).op() == Objective.Operation.ADD);
+        assertEquals(forwardMap.get(DEVICE_ID_OF_A).op(), Objective.Operation.ADD);
 
         // Output port number will be PORT_B i.e. 16
         Collection<TrafficTreatment> traffictreatMentCollection =
                 nextMap.get(DEVICE_ID_OF_A).next();
-        assertTrue(1 == traffictreatMentCollection.size());
+        assertEquals(1, traffictreatMentCollection.size());
         OutputInstruction output = null;
         for (TrafficTreatment trafficTreatment : traffictreatMentCollection) {
             output = outputPort(trafficTreatment);
         }
         assertNotNull(output);
-        assertTrue(PORT_B == output.port());
+        assertEquals(PORT_B, output.port());
         // Checking the group ip address
         TrafficSelector trafficSelector = forwardMap.get(DEVICE_ID_OF_A).selector();
         IPCriterion ipCriterion = ipAddress(trafficSelector);
         assertNotNull(ipCriterion);
-        assertTrue(MULTICAST_IP.equals(ipCriterion.ip().address()));
+        assertEquals(MULTICAST_IP, ipCriterion.ip().address());
         //checking the vlan criteria
         TrafficSelector meta = forwardMap.get(DEVICE_ID_OF_A).meta();
         VlanIdCriterion vlanIdCriterion = vlanId(meta, Criterion.Type.VLAN_VID);
@@ -244,23 +247,23 @@
 
         // ForwardMap will contain the operation "Add" in the flowObjective. None -> CP_B
         assertNotNull(forwardMap.get(DEVICE_ID_OF_A));
-        assertTrue(forwardMap.get(DEVICE_ID_OF_A).op() == Objective.Operation.ADD);
+        assertEquals(forwardMap.get(DEVICE_ID_OF_A).op(), Objective.Operation.ADD);
 
         // Output port number will be PORT_B i.e. 16
         Collection<TrafficTreatment> traffictreatMentCollection =
                 nextMap.get(DEVICE_ID_OF_A).next();
-        assertTrue(1 == traffictreatMentCollection.size());
+        assertEquals(1, traffictreatMentCollection.size());
         OutputInstruction output = null;
         for (TrafficTreatment trafficTreatment : traffictreatMentCollection) {
             output = outputPort(trafficTreatment);
         }
         assertNotNull(output);
-        assertTrue(PORT_B == output.port());
+        assertEquals(PORT_B, output.port());
         // Checking the group ip address
         TrafficSelector trafficSelector = forwardMap.get(DEVICE_ID_OF_A).selector();
         IPCriterion ipCriterion = ipAddress(trafficSelector);
         assertNotNull(ipCriterion);
-        assertTrue(MULTICAST_IP.equals(ipCriterion.ip().address()));
+        assertEquals(MULTICAST_IP, ipCriterion.ip().address());
         //checking the vlan criteria
         TrafficSelector meta = forwardMap.get(DEVICE_ID_OF_A).meta();
         VlanIdCriterion vlanIdCriterion = vlanId(meta, Criterion.Type.VLAN_VID);
@@ -289,16 +292,16 @@
 
        // NextMap will contain the operation "ADD_TO_EXISTING" in the DefaultNextObjective.
        assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-       assertTrue(nextMap.get(DEVICE_ID_OF_A).op() == Objective.Operation.ADD_TO_EXISTING));
+       assertEquals(nextMap.get(DEVICE_ID_OF_A).op(), Objective.Operation.ADD_TO_EXISTING));
        // Output port number will be changed to 24 i.e. PORT_C
        Collection<TrafficTreatment> traffictreatMentCollection = nextMap.get(DEVICE_ID_OF_A).next();
-       assertTrue(1 == traffictreatMentCollection.size());
+       assertEquals(1, traffictreatMentCollection.size());
        OutputInstruction output = null;
        for (TrafficTreatment trafficTreatment : traffictreatMentCollection) {
           output = outputPort(trafficTreatment);
        }
        assertNotNull(output);
-       assertTrue(PORT_C == output.port());
+       assertEquals(PORT_C, output.port());
     }
 
     @Test
@@ -316,19 +319,19 @@
        cordMcast.listener.event(event);
        // Operation will be REMOVE_FROM_EXISTING and nextMap will be updated. ( None --> CP_C)
        assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-       assertTrue(nextMap.get(DEVICE_ID_OF_A).op() == Objective.Operation.REMOVE_FROM_EXISTING));
+       assertEquals(nextMap.get(DEVICE_ID_OF_A).op(), Objective.Operation.REMOVE_FROM_EXISTING));
 
        // Output port number will be PORT_B i.e. 16
        // Port_B is removed from the group.
        Collection<TrafficTreatment> traffictreatMentCollection =
             nextMap.get(DEVICE_ID_OF_A).next();
-       assertTrue(1 == traffictreatMentCollection.size());
+       assertEquals(1, traffictreatMentCollection.size());
        OutputInstruction output = null;
        for (TrafficTreatment trafficTreatment : traffictreatMentCollection) {
           output = outputPort(trafficTreatment);
        }
        assertNotNull(output);
-       assertTrue(PORT_B == output.port());
+       assertEquals(PORT_B, output.port());
 
     }
 
@@ -348,7 +351,7 @@
 
        // Operation will be REMOVE and nextMap will be updated.  None --> { }
        assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-       assertTrue(nextMap.get(DEVICE_ID_OF_A).op() == Objective.Operation.REMOVE));
+       assertEquals(nextMap.get(DEVICE_ID_OF_A).op(), Objective.Operation.REMOVE));
   }
 
   @Test
@@ -374,8 +377,8 @@
        cordMcast.listener.event(event);
        // OltInfo flag is set to true when olt device is unkown
        assertAfter(WAIT, WAIT * 2, () -> assertTrue(knownOltFlag));
-       assertTrue(0 == forwardMap.size());
-       assertTrue(0 == nextMap.size());
+       assertEquals(0, forwardMap.size());
+       assertEquals(0, nextMap.size());
 
   }
 
@@ -389,8 +392,8 @@
       McastEvent event = new McastEvent(McastEvent.Type.ROUTE_ADDED, previousSubject, currentSubject);
       cordMcast.listener.event(event);
       // There will be no forwarding objective
-      assertAfter(WAIT, WAIT * 2, () -> assertTrue(0 == forwardMap.size()));
-      assertTrue(0 == nextMap.size());
+      assertAfter(WAIT, WAIT * 2, () -> assertEquals(0, forwardMap.size()));
+      assertEquals(0, nextMap.size());
 
    }
 
@@ -407,8 +410,8 @@
       McastEvent event = new McastEvent(McastEvent.Type.ROUTE_REMOVED, previousSubject, currentSubject);
       cordMcast.listener.event(event);
       // There will be no forwarding objective
-      assertAfter(WAIT, WAIT * 2, () -> assertTrue(0 == forwardMap.size()));
-      assertTrue(0 == nextMap.size());
+      assertAfter(WAIT, WAIT * 2, () -> assertEquals(0, forwardMap.size()));
+      assertEquals(0, nextMap.size());
 
    }
 
@@ -425,8 +428,8 @@
       McastEvent event = new McastEvent(McastEvent.Type.SOURCES_ADDED, previousSubject, currentSubject);
       cordMcast.listener.event(event);
       // There will be no forwarding objective
-      assertAfter(WAIT, WAIT * 2, () -> assertTrue(0 == forwardMap.size()));
-      assertTrue(0 == nextMap.size());
+      assertAfter(WAIT, WAIT * 2, () -> assertEquals(0, forwardMap.size()));
+      assertEquals(0, nextMap.size());
 
    }
 
@@ -442,8 +445,8 @@
       // Warning message of unknown event will be displayed.
       McastEvent event = new McastEvent(McastEvent.Type.SOURCES_REMOVED, previousSubject, currentSubject);
       cordMcast.listener.event(event);
-      assertAfter(WAIT, WAIT * 2, () -> assertTrue(0 == forwardMap.size()));
-      assertTrue(0 == nextMap.size());
+      assertAfter(WAIT, WAIT * 2, () -> assertEquals(0, forwardMap.size()));
+      assertEquals(0, nextMap.size());
    }
 
     @Test
diff --git a/app/src/test/java/org/opencord/cordmcast/impl/McastTestBase.java b/app/src/test/java/org/opencord/cordmcast/impl/McastTestBase.java
index 9271976..758be58 100644
--- a/app/src/test/java/org/opencord/cordmcast/impl/McastTestBase.java
+++ b/app/src/test/java/org/opencord/cordmcast/impl/McastTestBase.java
@@ -28,6 +28,8 @@
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.VlanId;
 import org.onosproject.TestApplicationId;
+import org.onosproject.cluster.LeadershipServiceAdapter;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreServiceAdapter;
 import org.onosproject.event.DefaultEventSinkRegistry;
@@ -151,6 +153,13 @@
           }
      }
 
+     class LeadershipServiceMcastAdapter extends LeadershipServiceAdapter {
+        @Override
+        public NodeId getLeader(String path) {
+            return NodeId.nodeId("local");
+        }
+     }
+
     protected class MockSadisService implements SadisService {
 
         @Override