[SEBA-989] Instance coordination and state distribution mechanism in IgmpStatisticsManager

Change-Id: Ibf3f3a2c5c91c010ef909692eea913f95ee7a92e
diff --git a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
index 2595911..1905ec2 100644
--- a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
+++ b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
@@ -16,6 +16,7 @@
 package org.opencord.igmpproxy.impl;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IGMP;
@@ -26,6 +27,9 @@
 import org.onlab.packet.VlanId;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cfg.ConfigProperty;
+import org.onosproject.cluster.Leader;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.event.DefaultEventSinkRegistry;
 import org.onosproject.event.Event;
@@ -295,10 +299,34 @@
     }
 
     class TestIgmpLeaderShipService implements IgmpLeadershipService {
+        private NodeId nodeId = NodeId.nodeId("test-id");
+        private Leader leader = new Leader(nodeId, 0, 0);
+        private static final String TEST_TOPIC = "igmp-statistics";
+
         @Override
         public boolean isLocalLeader(DeviceId deviceId) {
             return true;
         }
+
+        @Override
+        public NodeId getLocalNodeId() {
+            return nodeId;
+        }
+
+        @Override
+        public NodeId getLeader(String topic) {
+            return nodeId;
+        }
+
+        @Override
+        public Leadership runForLeadership(String topic) {
+            return new Leadership(TEST_TOPIC, leader, Lists.newArrayList(nodeId));
+        }
+
+        @Override
+        public void withdraw(String topic) {
+
+        }
     }
 
     class MockMastershipService extends MastershipServiceAdapter {
diff --git a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerTest.java b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerTest.java
index e0daa14..3ce7760 100644
--- a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerTest.java
+++ b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerTest.java
@@ -24,6 +24,8 @@
 import org.onosproject.core.CoreServiceAdapter;
 import org.onosproject.net.flow.FlowRuleServiceAdapter;
 import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
+import org.onosproject.store.service.TestStorageService;
 
 import static org.junit.Assert.*;
 
@@ -54,6 +56,9 @@
         igmpStatisticsManager = new IgmpStatisticsManager();
         igmpStatisticsManager.cfgService = new MockCfgService();
         TestUtils.setField(igmpStatisticsManager, "eventDispatcher", new TestEventDispatcher());
+        igmpStatisticsManager.storageService = new TestStorageService();
+        igmpStatisticsManager.leadershipManager = new TestIgmpLeaderShipService();
+        igmpStatisticsManager.clusterCommunicationService = new ClusterCommunicationServiceAdapter();
         igmpStatisticsManager.activate(new MockComponentContext());
         igmpManager.igmpStatisticsManager = this.igmpStatisticsManager;
 
diff --git a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpStatisticsTest.java b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpStatisticsTest.java
index 4421bde..3c9f5aa 100644
--- a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpStatisticsTest.java
+++ b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpStatisticsTest.java
@@ -33,6 +33,9 @@
 import org.onosproject.net.flow.FlowRuleServiceAdapter;
 import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
 
+import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
+import org.onosproject.store.service.TestStorageService;
+import org.opencord.igmpproxy.IgmpStatisticType;
 import org.opencord.igmpproxy.IgmpStatisticsEvent;
 
 import com.google.common.collect.Lists;
@@ -73,6 +76,9 @@
         igmpStatisticsManager.cfgService = new MockCfgService();
         igmpStatisticsManager.addListener(mockListener);
         TestUtils.setField(igmpStatisticsManager, "eventDispatcher", new TestEventDispatcher());
+        igmpStatisticsManager.storageService = new TestStorageService();
+        igmpStatisticsManager.leadershipManager = new TestIgmpLeaderShipService();
+        igmpStatisticsManager.clusterCommunicationService = new ClusterCommunicationServiceAdapter();
         igmpStatisticsManager.activate(new MockComponentContext());
         igmpManager.igmpStatisticsManager = this.igmpStatisticsManager;
         // By default - we send query messages
@@ -110,16 +116,25 @@
         }
 
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-                assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getTotalMsgReceived().longValue()));
-        assertEquals((long) 1, igmpStatisticsManager.getIgmpStats().getIgmpJoinReq().longValue());
-        assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getIgmpv3MembershipReport().longValue());
-        assertEquals((long) 1, igmpStatisticsManager.getIgmpStats().getIgmpSuccessJoinRejoinReq().longValue());
-        assertEquals((long) 1, igmpStatisticsManager.getIgmpStats().getUnconfiguredGroupCounter().longValue());
-        assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getValidIgmpPacketCounter().longValue());
-        assertEquals((long) 1, igmpStatisticsManager.getIgmpStats().getIgmpChannelJoinCounter().longValue());
-        assertEquals((long) 1, igmpStatisticsManager.getIgmpStats().getIgmpLeaveReq().longValue());
-        assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getIgmpMsgReceived().longValue());
-        assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getIgmpValidChecksumCounter().longValue());
+                assertEquals((long) 2, igmpStatisticsManager
+                        .getStat(IgmpStatisticType.TOTAL_MSG_RECEIVED).longValue()));
+        assertEquals(1L, igmpStatisticsManager.getStat(IgmpStatisticType.IGMP_JOIN_REQ).longValue());
+        assertEquals(2L, igmpStatisticsManager
+                .getStat(IgmpStatisticType.IGMP_V3_MEMBERSHIP_REPORT).longValue());
+        assertEquals(1L, igmpStatisticsManager
+                .getStat(IgmpStatisticType.IGMP_SUCCESS_JOIN_RE_JOIN_REQ).longValue());
+        assertEquals(1L, igmpStatisticsManager
+                .getStat(IgmpStatisticType.UNCONFIGURED_GROUP_COUNTER).longValue());
+        assertEquals(2L, igmpStatisticsManager
+                .getStat(IgmpStatisticType.VALID_IGMP_PACKET_COUNTER).longValue());
+        assertEquals(1L, igmpStatisticsManager
+                .getStat(IgmpStatisticType.IGMP_CHANNEL_JOIN_COUNTER).longValue());
+        assertEquals(1L, igmpStatisticsManager
+                .getStat(IgmpStatisticType.IGMP_LEAVE_REQ).longValue());
+        assertEquals(2L, igmpStatisticsManager
+                .getStat(IgmpStatisticType.IGMP_MSG_RECEIVED).longValue());
+        assertEquals(2L, igmpStatisticsManager
+                .getStat(IgmpStatisticType.IGMP_VALID_CHECKSUM_COUNTER).longValue());
 
     }
 
@@ -136,8 +151,8 @@
         // Sending first packet
         sendPacket(firstPacket);
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-                assertEquals((long) 1,
-                        igmpStatisticsManager.getIgmpStats().getFailJoinReqUnknownMulticastIpCounter().longValue()));
+                assertEquals((long) 1, igmpStatisticsManager
+                        .getStat(IgmpStatisticType.FAIL_JOIN_REQ_UNKNOWN_MULTICAST_IP_COUNTER).longValue()));
     }
 
     //Test Igmp Query Statistics.
@@ -157,12 +172,11 @@
                 IgmpSender.getInstance().buildIgmpV3Query(Ip4Address.valueOf(0), SOURCE_IP_OF_A, VlanId.MAX_VLAN);
         sendPacket(igmpv3MembershipQueryPkt1);
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-                assertEquals(igmpStatisticsManager.getIgmpStats()
-                        .getIgmpGrpAndSrcSpecificMembershipQuery().longValue(), 1));
-        assertEquals(igmpStatisticsManager.getIgmpStats()
-                .getIgmpGeneralMembershipQuery().longValue(), 1);
-        assertEquals(igmpStatisticsManager.getIgmpStats()
-                .getCurrentGrpNumCounter().longValue(), 1);
+                assertEquals(1L, igmpStatisticsManager
+                        .getStat(IgmpStatisticType.IGMP_GRP_AND_SRC_SPESIFIC_MEMBERSHIP_QUERY).longValue()));
+        assertEquals(1L, igmpStatisticsManager
+                .getStat(IgmpStatisticType.IGMP_GENERAL_MEMBERSHIP_QUERY).longValue());
+        assertEquals(1L, igmpStatisticsManager.getStat(IgmpStatisticType.CURRENT_GRP_NUMBER_COUNTER).longValue());
     }
 
     //Test Events
@@ -193,8 +207,8 @@
         // Sending first packet
         sendPacket(firstPacket);
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-                assertEquals((long) 1,
-                        igmpStatisticsManager.getIgmpStats().getReportsRxWithWrongModeCounter().longValue()));
+                assertEquals((long) 1, igmpStatisticsManager
+                        .getStat(IgmpStatisticType.REPORTS_RX_WITH_WRONG_MODE_COUNTER).longValue()));
     }
 
     //Test packet with Unknown IGMP type.
@@ -209,8 +223,8 @@
         // Sending first packet
         sendPacket(firstPacket);
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-                assertEquals((long) 1,
-                        igmpStatisticsManager.getIgmpStats().getUnknownIgmpTypePacketsRxCounter().longValue()));
+                assertEquals((long) 1, igmpStatisticsManager
+                        .getStat(IgmpStatisticType.UNKNOWN_IGMP_TYPE_PACKETS_RX_COUNTER).longValue()));
     }
 
     //Test packet with Insufficient Permission.
@@ -226,9 +240,8 @@
         // Sending first packet
         sendPacket(firstPacket);
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-                assertEquals((long) 1,
-                        igmpStatisticsManager.getIgmpStats()
-                                .getFailJoinReqInsuffPermissionAccessCounter().longValue()));
+                assertEquals((long) 1, igmpStatisticsManager
+                        .getStat(IgmpStatisticType.FAIL_JOIN_REQ_INSUFF_PERMISSION_ACCESS_COUNTER).longValue()));
     }
 
     public class MockIgmpStatisticsEventListener implements IgmpStatisticsEventListener {