SEBA-640 IgmpProxy should use distributed storage infrastructure of ONOS

Change-Id: I4b1c4d326a5501e9c0e046e3ee8d973ca5f73d70
diff --git a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
index b301f94..6283c08 100644
--- a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
+++ b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
@@ -16,6 +16,7 @@
 package org.opencord.igmpproxy.impl;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IGMP;
 import org.onlab.packet.IGMPMembership;
@@ -59,6 +60,11 @@
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketProcessor;
 import org.onosproject.net.packet.PacketServiceAdapter;
+import org.opencord.igmpproxy.IgmpLeadershipService;
+import org.opencord.igmpproxy.impl.store.groupmember.AbstractGroupMemberStore;
+import org.opencord.igmpproxy.impl.store.machine.AbstractStateMachineStore;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+import org.opencord.igmpproxy.statemachine.StateMachineId;
 import org.opencord.sadis.BandwidthProfileInformation;
 import org.opencord.sadis.BaseInformationService;
 import org.opencord.sadis.SadisService;
@@ -78,6 +84,8 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class IgmpManagerBase {
 
@@ -96,7 +104,7 @@
 
     // Common connect point of aggregation switch used by all devices.
     protected static final ConnectPoint COMMON_CONNECT_POINT =
-           ConnectPoint.deviceConnectPoint("of:00000000000000003/3");
+            ConnectPoint.deviceConnectPoint("of:00000000000000003/3");
     // Uplink ports for two olts A and B
     protected static final PortNumber PORT_A = PortNumber.portNumber(1);
     protected static final PortNumber PORT_B = PortNumber.portNumber(2);
@@ -135,23 +143,24 @@
 
         @Override
         public Device getDevice(DeviceId deviceId) {
-           if (flagForDevice) {
-               DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
-                           .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_A.toString());
-               SparseAnnotations annotations = annotationsBuilder.build();
-               Annotations[] da = {annotations };
-               Device deviceA = new DefaultDevice(null, DEVICE_ID_OF_A, Device.Type.OTHER, "", "", "", "", null, da);
-               flagForDevice = false;
-               return deviceA;
+            if (flagForDevice) {
+                DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
+                        .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_A.toString());
+                SparseAnnotations annotations = annotationsBuilder.build();
+                Annotations[] da = {annotations};
+                Device deviceA = new DefaultDevice(null, DEVICE_ID_OF_A, Device.Type.OTHER, "", "", "", "", null, da);
+                flagForDevice = false;
+                return deviceA;
             } else {
-               DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
-                          .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_B.toString());
-               SparseAnnotations annotations = annotationsBuilder.build();
-               Annotations[] da = {annotations };
-               Device deviceB = new DefaultDevice(null, DEVICE_ID_OF_B, Device.Type.OTHER, "", "", "", "", null, da);
-               return deviceB;
-           }
+                DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
+                        .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_B.toString());
+                SparseAnnotations annotations = annotationsBuilder.build();
+                Annotations[] da = {annotations};
+                Device deviceB = new DefaultDevice(null, DEVICE_ID_OF_B, Device.Type.OTHER, "", "", "", "", null, da);
+                return deviceB;
+            }
         }
+
         @Override
         public List<Port> getPorts(DeviceId deviceId) {
             return lsPorts;
@@ -162,7 +171,7 @@
             DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
                     .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_A.toString());
             SparseAnnotations annotations = annotationsBuilder.build();
-            Annotations[] da = {annotations };
+            Annotations[] da = {annotations};
             Device deviceA = new DefaultDevice(null, DEVICE_ID_OF_C, Device.Type.OTHER, "", "", "", "", null, da);
             lsDevices.add(deviceA);
             return lsDevices;
@@ -184,31 +193,32 @@
     static final Class<IgmpproxySsmTranslateConfig> IGMPPROXY_SSM_CONFIG_CLASS = IgmpproxySsmTranslateConfig.class;
     static final Class<McastConfig> MCAST_CONFIG_CLASS = McastConfig.class;
     ConfigFactory<ApplicationId, IgmpproxyConfig> igmpproxyConfigFactory =
-         new ConfigFactory<ApplicationId, IgmpproxyConfig>(
-        SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_CONFIG_CLASS, "igmpproxy") {
-    @Override
-    public IgmpproxyConfig createConfig() {
-          return new IgmpproxyConfig();
-        }
-    };
+            new ConfigFactory<ApplicationId, IgmpproxyConfig>(
+                    SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_CONFIG_CLASS, "igmpproxy") {
+                @Override
+                public IgmpproxyConfig createConfig() {
+                    return new IgmpproxyConfig();
+                }
+            };
 
     ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig> igmpproxySsmConfigFactory =
-        new ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig>(
-        SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_SSM_CONFIG_CLASS, "ssmTranslate", true) {
+            new ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig>(
+                    SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_SSM_CONFIG_CLASS, "ssmTranslate", true) {
 
-        @Override
-        public IgmpproxySsmTranslateConfig createConfig() {
-            return new IgmpproxySsmTranslateConfig();
-        }
-    };
+                @Override
+                public IgmpproxySsmTranslateConfig createConfig() {
+                    return new IgmpproxySsmTranslateConfig();
+                }
+            };
 
 
     class MockIgmpProxyConfig extends IgmpproxyConfig {
         boolean igmpOnPodBasis = true;
 
         MockIgmpProxyConfig(boolean igmpFlagValue) {
-           igmpOnPodBasis = igmpFlagValue;
-       }
+            igmpOnPodBasis = igmpFlagValue;
+        }
+
         @Override
         public boolean igmpOnPodBasis() {
             return igmpOnPodBasis;
@@ -224,17 +234,18 @@
 
         @Override
         public ConnectPoint connectPoint() {
-               return COMMON_CONNECT_POINT;
+            return COMMON_CONNECT_POINT;
         }
     }
 
 
-     class TestNetworkConfigRegistry extends NetworkConfigRegistryAdapter {
-         Boolean igmpOnPodFlag = false;
+    class TestNetworkConfigRegistry extends NetworkConfigRegistryAdapter {
+        Boolean igmpOnPodFlag = false;
 
-         TestNetworkConfigRegistry(Boolean igmpFlag) {
-             igmpOnPodFlag = igmpFlag;
-         }
+        TestNetworkConfigRegistry(Boolean igmpFlag) {
+            igmpOnPodFlag = igmpFlag;
+        }
+
         @SuppressWarnings("unchecked")
         @Override
         public <S> Set<S> getSubjects(Class<S> subjectClass) {
@@ -242,19 +253,19 @@
                 return (Set<S>) ImmutableSet.of(DEVICE_ID_OF_A, DEVICE_ID_OF_B);
             }
             return null;
-       }
+        }
 
-         @SuppressWarnings("unchecked")
-         @Override
-         public <S, C extends Config<S>> C getConfig(S subject, Class<C> configClass) {
-             if (configClass.getName().equalsIgnoreCase("org.opencord.igmpproxy.impl.IgmpproxyConfig")) {
-                 IgmpproxyConfig igmpproxyConfig = new MockIgmpProxyConfig(igmpOnPodFlag);
-                 return (C) igmpproxyConfig;
-             } else {
-                 super.getConfig(subject, configClass);
-             }
-             return null;
-         }
+        @SuppressWarnings("unchecked")
+        @Override
+        public <S, C extends Config<S>> C getConfig(S subject, Class<C> configClass) {
+            if (configClass.getName().equalsIgnoreCase("org.opencord.igmpproxy.impl.IgmpproxyConfig")) {
+                IgmpproxyConfig igmpproxyConfig = new MockIgmpProxyConfig(igmpOnPodFlag);
+                return (C) igmpproxyConfig;
+            } else {
+                super.getConfig(subject, configClass);
+            }
+            return null;
+        }
     }
 
 
@@ -276,12 +287,18 @@
         @Override
         public void emit(OutboundPacket packet) {
             synchronized (savedPackets) {
-               savedPackets.add(packet);
-               savedPackets.notify();
+                savedPackets.add(packet);
+                savedPackets.notify();
             }
         }
-     }
+    }
 
+    class TestIgmpLeaderShipService implements IgmpLeadershipService {
+        @Override
+        public boolean isLocalLeader(DeviceId deviceId) {
+            return true;
+        }
+    }
 
     class MockMastershipService extends MastershipServiceAdapter {
         @Override
@@ -407,9 +424,9 @@
      * Mocks the DefaultPacketContext.
      */
     final class TestPacketContext extends DefaultPacketContext {
-       TestPacketContext(long time, InboundPacket inPkt, OutboundPacket outPkt, boolean block) {
-         super(time, inPkt, outPkt, block);
-         }
+        TestPacketContext(long time, InboundPacket inPkt, OutboundPacket outPkt, boolean block) {
+            super(time, inPkt, outPkt, block);
+        }
 
         @Override
         public void send() {
@@ -423,31 +440,31 @@
      * @param reply Ethernet packet
      * @throws InterruptedException
      */
-     void sendPacket(Ethernet reply) {
+    void sendPacket(Ethernet reply) {
 
-         if (reply != null) {
-             final ByteBuffer byteBuffer = ByteBuffer.wrap(reply.serialize());
+        if (reply != null) {
+            final ByteBuffer byteBuffer = ByteBuffer.wrap(reply.serialize());
 
-             if (flagForQueryPacket) {
-                 InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_C, reply, byteBuffer);
-                 context = new TestPacketContext(127L, inBoundPacket, null, false);
-                 packetProcessor.process(context);
-             } else {
-                 if (flagForPacket) {
-                     InboundPacket inPacket = new DefaultInboundPacket(CONNECT_POINT_A, reply, byteBuffer);
-                     context = new TestPacketContext(127L, inPacket, null, false);
-                     flagForPacket = false;
+            if (flagForQueryPacket) {
+                InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_C, reply, byteBuffer);
+                context = new TestPacketContext(127L, inBoundPacket, null, false);
+                packetProcessor.process(context);
+            } else {
+                if (flagForPacket) {
+                    InboundPacket inPacket = new DefaultInboundPacket(CONNECT_POINT_A, reply, byteBuffer);
+                    context = new TestPacketContext(127L, inPacket, null, false);
+                    flagForPacket = false;
 
-                     packetProcessor.process(context);
-                 } else {
-                     InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_B, reply, byteBuffer);
-                     context = new TestPacketContext(127L, inBoundPacket, null, false);
-                     flagForPacket = true;
+                    packetProcessor.process(context);
+                } else {
+                    InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_B, reply, byteBuffer);
+                    context = new TestPacketContext(127L, inBoundPacket, null, false);
+                    flagForPacket = true;
 
-                     packetProcessor.process(context);
-                 }
-             }
-         }
+                    packetProcessor.process(context);
+                }
+            }
+        }
     }
 
     protected class MockSadisService implements SadisService {
@@ -498,7 +515,7 @@
     private class MockSubService implements BaseInformationService<SubscriberAndDeviceInformation> {
         MockSubscriberAndDeviceInformation sub =
                 new MockSubscriberAndDeviceInformation(CLIENT_NAS_PORT_ID,
-                                                       CLIENT_NAS_PORT_ID, CLIENT_CIRCUIT_ID, null, null);
+                        CLIENT_NAS_PORT_ID, CLIENT_CIRCUIT_ID, null, null);
 
         @Override
         public SubscriberAndDeviceInformation get(String id) {
@@ -653,15 +670,15 @@
         }
 
         @Override
-         public void disableComponent(String name) {
-             // TODO Auto-generated method stub
-         }
+        public void disableComponent(String name) {
+            // TODO Auto-generated method stub
+        }
 
-         @Override
-         public ServiceReference getServiceReference() {
-             // TODO Auto-generated method stub
-             return null;
-         }
+        @Override
+        public ServiceReference getServiceReference() {
+            // TODO Auto-generated method stub
+            return null;
+        }
     }
 
     Ethernet buildWrongIgmpPacket(Ip4Address groupIp, Ip4Address sourceIp) {
@@ -669,7 +686,7 @@
         igmpMembership.setRecordType((byte) 0x33);
 
         return IgmpSender.getInstance().buildIgmpPacket(IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT, groupIp,
-            igmpMembership, sourceIp, false);
+                igmpMembership, sourceIp, false);
     }
 
     Ethernet buildUnknownIgmpPacket(Ip4Address groupIp, Ip4Address sourceIp) {
@@ -679,4 +696,53 @@
         return IgmpSender.getInstance().buildIgmpPacket((byte) 0x44, groupIp, igmpMembership, sourceIp, false);
     }
 
+    class TestStateMachineStoreService extends AbstractStateMachineStore {
+        private static final int DEFAULT_COUNT = 0;
+        private Map<StateMachineId, AtomicLong> countsMap;
+
+        public TestStateMachineStoreService(Map<StateMachineId, StateMachine> map) {
+            super();
+            stateMachineMap = Maps.newConcurrentMap();
+            countsMap = Maps.newConcurrentMap();
+        }
+
+        @Override
+        public long increaseAndGetCounter(StateMachineId stateMachineId) {
+            AtomicLong count = countsMap.get(stateMachineId);
+            if (count == null) {
+                count = new AtomicLong(DEFAULT_COUNT);
+                countsMap.put(stateMachineId, count);
+            }
+            return count.incrementAndGet();
+        }
+
+        @Override
+        public long decreaseAndGetCounter(StateMachineId stateMachineId) {
+            AtomicLong count = countsMap.get(stateMachineId);
+            if (count.get() > 0) {
+                return count.decrementAndGet();
+            } else {
+                return count.get();
+            }
+        }
+
+        @Override
+        public boolean removeCounter(StateMachineId stateMachineId) {
+            countsMap.remove(stateMachineId);
+            return true;
+        }
+
+        @Override
+        public long getCounter(StateMachineId stateMachineId) {
+            return countsMap.get(stateMachineId).get();
+        }
+
+    }
+
+
+    class TestGroupMemberStoreService extends AbstractGroupMemberStore {
+        public TestGroupMemberStoreService() {
+            super(Maps.newConcurrentMap());
+        }
+    }
 }
diff --git a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerTest.java b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerTest.java
index dc220f8..e0daa14 100644
--- a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerTest.java
+++ b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerTest.java
@@ -15,6 +15,7 @@
  */
 package org.opencord.igmpproxy.impl;
 
+import com.google.common.collect.Maps;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -23,7 +24,6 @@
 import org.onosproject.core.CoreServiceAdapter;
 import org.onosproject.net.flow.FlowRuleServiceAdapter;
 import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
-import org.opencord.igmpproxy.impl.IgmpManagerBase.MockComponentContext;
 
 import static org.junit.Assert.*;
 
@@ -42,6 +42,7 @@
     @Before
     public void setUp() {
         igmpManager = new IgmpManager();
+        igmpManager.igmpLeadershipService = new TestIgmpLeaderShipService();
         igmpManager.coreService = new CoreServiceAdapter();
         igmpManager.mastershipService = new MockMastershipService();
         igmpManager.flowObjectiveService = new FlowObjectiveServiceAdapter();
@@ -55,16 +56,22 @@
         TestUtils.setField(igmpStatisticsManager, "eventDispatcher", new TestEventDispatcher());
         igmpStatisticsManager.activate(new MockComponentContext());
         igmpManager.igmpStatisticsManager = this.igmpStatisticsManager;
+
+        igmpManager.groupMemberStore = new TestGroupMemberStoreService();
+        StateMachineManager stateMachineManager = new StateMachineManager();
+        stateMachineManager.stateMachineStore = new TestStateMachineStoreService(Maps.newConcurrentMap());
+        stateMachineManager.activate(new MockComponentContext());
+        igmpManager.stateMachineService = stateMachineManager;
+
         // By default - we send query messages
-        SingleStateMachine.sendQuery = true;
+        StateMachineManager.sendQuery = true;
     }
 
     // Tear Down the IGMP application.
     @After
     public void tearDown() {
         igmpManager.deactivate();
-        IgmpManager.groupMemberMap.clear();
-        StateMachine.clearMap();
+        igmpManager.stateMachineService.clearAllMaps();
     }
 
     // Checking the Default value of IGMP_ON_POD_BASIS.
@@ -88,7 +95,7 @@
     @Test
     public void testIgmpOnPodBasisDefaultValue() throws InterruptedException {
         // We need to count join messages sent on the upstream
-        SingleStateMachine.sendQuery = false;
+        StateMachineManager.sendQuery = false;
 
         igmpManager.networkConfig = new TestNetworkConfigRegistry(false);
         igmpManager.activate();
@@ -118,7 +125,7 @@
     @Test
     public void testIgmpOnPodBasisTrueValue() throws InterruptedException {
         // We need to count join messages
-        SingleStateMachine.sendQuery = false;
+        StateMachineManager.sendQuery = false;
 
         igmpManager.networkConfig = new TestNetworkConfigRegistry(true);
         igmpManager.activate();
@@ -129,7 +136,7 @@
         sendPacket(firstPacket);
         // Emitted packet is stored in list savedPackets
         synchronized (savedPackets) {
-          savedPackets.wait(WAIT_TIMEOUT);
+            savedPackets.wait(WAIT_TIMEOUT);
         }
         assertNotNull(savedPackets);
         assertEquals(1, savedPackets.size());
diff --git a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpStatisticsTest.java b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpStatisticsTest.java
index 2677511..b2e8271 100644
--- a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpStatisticsTest.java
+++ b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpStatisticsTest.java
@@ -21,6 +21,7 @@
 
 import java.util.List;
 
+import com.google.common.collect.Maps;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -53,6 +54,7 @@
     @Before
     public void setUp() {
         igmpManager = new IgmpManager();
+        igmpManager.igmpLeadershipService = new TestIgmpLeaderShipService();
         igmpManager.coreService = new CoreServiceAdapter();
         igmpManager.mastershipService = new MockMastershipService();
         igmpManager.flowObjectiveService = new FlowObjectiveServiceAdapter();
@@ -61,6 +63,11 @@
         igmpManager.flowRuleService = new FlowRuleServiceAdapter();
         igmpManager.multicastService = new TestMulticastRouteService();
         igmpManager.sadisService = new MockSadisService();
+        igmpManager.groupMemberStore = new TestGroupMemberStoreService();
+        StateMachineManager stateMachineService = new StateMachineManager();
+        stateMachineService.stateMachineStore = new TestStateMachineStoreService(Maps.newConcurrentMap());
+        stateMachineService.activate(new MockComponentContext());
+        igmpManager.stateMachineService = stateMachineService;
         igmpStatisticsManager = new IgmpStatisticsManager();
         igmpStatisticsManager.cfgService = new MockCfgService();
         igmpStatisticsManager.addListener(mockListener);
@@ -68,7 +75,7 @@
         igmpStatisticsManager.activate(new MockComponentContext());
         igmpManager.igmpStatisticsManager = this.igmpStatisticsManager;
         // By default - we send query messages
-        SingleStateMachine.sendQuery = true;
+        StateMachineManager.sendQuery = true;
     }
 
     // Tear Down the IGMP application.
@@ -76,14 +83,13 @@
     public void tearDown() {
         igmpStatisticsManager.removeListener(mockListener);
         igmpStatisticsManager.deactivate();
-        IgmpManager.groupMemberMap.clear();
-        StateMachine.clearMap();
+        igmpManager.stateMachineService.clearAllMaps();
     }
 
     //Test Igmp Statistics.
     @Test
     public void testIgmpStatistics() throws InterruptedException {
-        SingleStateMachine.sendQuery = false;
+        StateMachineManager.sendQuery = false;
         igmpManager.networkConfig = new TestNetworkConfigRegistry(false);
         igmpManager.activate();
 
@@ -103,7 +109,7 @@
         }
 
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-            assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getTotalMsgReceived().longValue()));
+                assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getTotalMsgReceived().longValue()));
         assertEquals((long) 1, igmpStatisticsManager.getIgmpStats().getIgmpJoinReq().longValue());
         assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getIgmpv3MembershipReport().longValue());
         assertEquals((long) 1, igmpStatisticsManager.getIgmpStats().getIgmpSuccessJoinRejoinReq().longValue());
@@ -119,18 +125,18 @@
     //Test packet with Unknown Multicast IpAddress
     @Test
     public void testIgmpUnknownMulticastIpAddress() throws InterruptedException {
-        SingleStateMachine.sendQuery = false;
+        StateMachineManager.sendQuery = false;
 
         igmpManager.networkConfig = new TestNetworkConfigRegistry(false);
         igmpManager.activate();
 
         Ethernet firstPacket =
-             IgmpSender.getInstance().buildIgmpV3Join(UNKNOWN_GRP_IP, SOURCE_IP_OF_A);
+                IgmpSender.getInstance().buildIgmpV3Join(UNKNOWN_GRP_IP, SOURCE_IP_OF_A);
         // Sending first packet
         sendPacket(firstPacket);
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-        assertEquals((long) 1,
-             igmpStatisticsManager.getIgmpStats().getFailJoinReqUnknownMulticastIpCounter().longValue()));
+                assertEquals((long) 1,
+                        igmpStatisticsManager.getIgmpStats().getFailJoinReqUnknownMulticastIpCounter().longValue()));
     }
 
     //Test Igmp Query Statistics.
@@ -146,15 +152,15 @@
 
         //IGMPV3 General Membership Query packet
         Ethernet igmpv3MembershipQueryPkt1 =
-              IgmpSender.getInstance().buildIgmpV3Query(Ip4Address.valueOf(0), SOURCE_IP_OF_A);
+                IgmpSender.getInstance().buildIgmpV3Query(Ip4Address.valueOf(0), SOURCE_IP_OF_A);
         sendPacket(igmpv3MembershipQueryPkt1);
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
+                assertEquals(igmpStatisticsManager.getIgmpStats()
+                        .getIgmpGrpAndSrcSpecificMembershipQuery().longValue(), 1));
         assertEquals(igmpStatisticsManager.getIgmpStats()
-            .getIgmpGrpAndSrcSpecificMembershipQuery().longValue(), 1));
+                .getIgmpGeneralMembershipQuery().longValue(), 1);
         assertEquals(igmpStatisticsManager.getIgmpStats()
-            .getIgmpGeneralMembershipQuery().longValue(), 1);
-        assertEquals(igmpStatisticsManager.getIgmpStats()
-             .getCurrentGrpNumCounter().longValue(), 1);
+                .getCurrentGrpNumCounter().longValue(), 1);
     }
 
     //Test Events
@@ -163,10 +169,10 @@
         final int waitEventGeneration = igmpStatisticsManager.statisticsGenerationPeriodInSeconds * 1000;
         //assert that event listened as the app activates
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-            assertEquals(mockListener.events.size(), 1));
+                assertEquals(mockListener.events.size(), 1));
 
         assertAfter(waitEventGeneration / 2, waitEventGeneration, () ->
-            assertEquals(mockListener.events.size(), 2));
+                assertEquals(mockListener.events.size(), 2));
 
         for (IgmpStatisticsEvent event : mockListener.events) {
             assertEquals(event.type(), IgmpStatisticsEvent.Type.STATS_UPDATE);
@@ -176,7 +182,7 @@
     //Test packet with Unknown Wrong Membership mode
     @Test
     public void testWrongIgmpPacket() throws InterruptedException {
-        SingleStateMachine.sendQuery = false;
+        StateMachineManager.sendQuery = false;
 
         igmpManager.networkConfig = new TestNetworkConfigRegistry(false);
         igmpManager.activate();
@@ -185,14 +191,14 @@
         // Sending first packet
         sendPacket(firstPacket);
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-        assertEquals((long) 1,
-            igmpStatisticsManager.getIgmpStats().getReportsRxWithWrongModeCounter().longValue()));
+                assertEquals((long) 1,
+                        igmpStatisticsManager.getIgmpStats().getReportsRxWithWrongModeCounter().longValue()));
     }
 
     //Test packet with Unknown IGMP type.
     @Test
     public void testUnknownIgmpPacket() throws InterruptedException {
-        SingleStateMachine.sendQuery = false;
+        StateMachineManager.sendQuery = false;
 
         igmpManager.networkConfig = new TestNetworkConfigRegistry(false);
         igmpManager.activate();
@@ -201,14 +207,14 @@
         // Sending first packet
         sendPacket(firstPacket);
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-        assertEquals((long) 1,
-            igmpStatisticsManager.getIgmpStats().getUnknownIgmpTypePacketsRxCounter().longValue()));
+                assertEquals((long) 1,
+                        igmpStatisticsManager.getIgmpStats().getUnknownIgmpTypePacketsRxCounter().longValue()));
     }
 
     //Test packet with Insufficient Permission.
     @Test
     public void testSufficientPermission() throws InterruptedException {
-        SingleStateMachine.sendQuery = false;
+        StateMachineManager.sendQuery = false;
 
         flagForPermission = true;
         igmpManager.networkConfig = new TestNetworkConfigRegistry(false);
@@ -218,8 +224,9 @@
         // Sending first packet
         sendPacket(firstPacket);
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-        assertEquals((long) 1,
-            igmpStatisticsManager.getIgmpStats().getFailJoinReqInsuffPermissionAccessCounter().longValue()));
+                assertEquals((long) 1,
+                        igmpStatisticsManager.getIgmpStats()
+                                .getFailJoinReqInsuffPermissionAccessCounter().longValue()));
     }
 
     public class MockIgmpStatisticsEventListener implements IgmpStatisticsEventListener {