SEBA-640 IgmpProxy should use distributed storage infrastructure of ONOS

Change-Id: I4b1c4d326a5501e9c0e046e3ee8d973ca5f73d70
diff --git a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
index b301f94..6283c08 100644
--- a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
+++ b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
@@ -16,6 +16,7 @@
 package org.opencord.igmpproxy.impl;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IGMP;
 import org.onlab.packet.IGMPMembership;
@@ -59,6 +60,11 @@
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketProcessor;
 import org.onosproject.net.packet.PacketServiceAdapter;
+import org.opencord.igmpproxy.IgmpLeadershipService;
+import org.opencord.igmpproxy.impl.store.groupmember.AbstractGroupMemberStore;
+import org.opencord.igmpproxy.impl.store.machine.AbstractStateMachineStore;
+import org.opencord.igmpproxy.statemachine.StateMachine;
+import org.opencord.igmpproxy.statemachine.StateMachineId;
 import org.opencord.sadis.BandwidthProfileInformation;
 import org.opencord.sadis.BaseInformationService;
 import org.opencord.sadis.SadisService;
@@ -78,6 +84,8 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicLong;
 
 public class IgmpManagerBase {
 
@@ -96,7 +104,7 @@
 
     // Common connect point of aggregation switch used by all devices.
     protected static final ConnectPoint COMMON_CONNECT_POINT =
-           ConnectPoint.deviceConnectPoint("of:00000000000000003/3");
+            ConnectPoint.deviceConnectPoint("of:00000000000000003/3");
     // Uplink ports for two olts A and B
     protected static final PortNumber PORT_A = PortNumber.portNumber(1);
     protected static final PortNumber PORT_B = PortNumber.portNumber(2);
@@ -135,23 +143,24 @@
 
         @Override
         public Device getDevice(DeviceId deviceId) {
-           if (flagForDevice) {
-               DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
-                           .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_A.toString());
-               SparseAnnotations annotations = annotationsBuilder.build();
-               Annotations[] da = {annotations };
-               Device deviceA = new DefaultDevice(null, DEVICE_ID_OF_A, Device.Type.OTHER, "", "", "", "", null, da);
-               flagForDevice = false;
-               return deviceA;
+            if (flagForDevice) {
+                DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
+                        .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_A.toString());
+                SparseAnnotations annotations = annotationsBuilder.build();
+                Annotations[] da = {annotations};
+                Device deviceA = new DefaultDevice(null, DEVICE_ID_OF_A, Device.Type.OTHER, "", "", "", "", null, da);
+                flagForDevice = false;
+                return deviceA;
             } else {
-               DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
-                          .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_B.toString());
-               SparseAnnotations annotations = annotationsBuilder.build();
-               Annotations[] da = {annotations };
-               Device deviceB = new DefaultDevice(null, DEVICE_ID_OF_B, Device.Type.OTHER, "", "", "", "", null, da);
-               return deviceB;
-           }
+                DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
+                        .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_B.toString());
+                SparseAnnotations annotations = annotationsBuilder.build();
+                Annotations[] da = {annotations};
+                Device deviceB = new DefaultDevice(null, DEVICE_ID_OF_B, Device.Type.OTHER, "", "", "", "", null, da);
+                return deviceB;
+            }
         }
+
         @Override
         public List<Port> getPorts(DeviceId deviceId) {
             return lsPorts;
@@ -162,7 +171,7 @@
             DefaultAnnotations.Builder annotationsBuilder = DefaultAnnotations.builder()
                     .set(AnnotationKeys.MANAGEMENT_ADDRESS, SOURCE_IP_OF_A.toString());
             SparseAnnotations annotations = annotationsBuilder.build();
-            Annotations[] da = {annotations };
+            Annotations[] da = {annotations};
             Device deviceA = new DefaultDevice(null, DEVICE_ID_OF_C, Device.Type.OTHER, "", "", "", "", null, da);
             lsDevices.add(deviceA);
             return lsDevices;
@@ -184,31 +193,32 @@
     static final Class<IgmpproxySsmTranslateConfig> IGMPPROXY_SSM_CONFIG_CLASS = IgmpproxySsmTranslateConfig.class;
     static final Class<McastConfig> MCAST_CONFIG_CLASS = McastConfig.class;
     ConfigFactory<ApplicationId, IgmpproxyConfig> igmpproxyConfigFactory =
-         new ConfigFactory<ApplicationId, IgmpproxyConfig>(
-        SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_CONFIG_CLASS, "igmpproxy") {
-    @Override
-    public IgmpproxyConfig createConfig() {
-          return new IgmpproxyConfig();
-        }
-    };
+            new ConfigFactory<ApplicationId, IgmpproxyConfig>(
+                    SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_CONFIG_CLASS, "igmpproxy") {
+                @Override
+                public IgmpproxyConfig createConfig() {
+                    return new IgmpproxyConfig();
+                }
+            };
 
     ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig> igmpproxySsmConfigFactory =
-        new ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig>(
-        SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_SSM_CONFIG_CLASS, "ssmTranslate", true) {
+            new ConfigFactory<ApplicationId, IgmpproxySsmTranslateConfig>(
+                    SubjectFactories.APP_SUBJECT_FACTORY, IGMPPROXY_SSM_CONFIG_CLASS, "ssmTranslate", true) {
 
-        @Override
-        public IgmpproxySsmTranslateConfig createConfig() {
-            return new IgmpproxySsmTranslateConfig();
-        }
-    };
+                @Override
+                public IgmpproxySsmTranslateConfig createConfig() {
+                    return new IgmpproxySsmTranslateConfig();
+                }
+            };
 
 
     class MockIgmpProxyConfig extends IgmpproxyConfig {
         boolean igmpOnPodBasis = true;
 
         MockIgmpProxyConfig(boolean igmpFlagValue) {
-           igmpOnPodBasis = igmpFlagValue;
-       }
+            igmpOnPodBasis = igmpFlagValue;
+        }
+
         @Override
         public boolean igmpOnPodBasis() {
             return igmpOnPodBasis;
@@ -224,17 +234,18 @@
 
         @Override
         public ConnectPoint connectPoint() {
-               return COMMON_CONNECT_POINT;
+            return COMMON_CONNECT_POINT;
         }
     }
 
 
-     class TestNetworkConfigRegistry extends NetworkConfigRegistryAdapter {
-         Boolean igmpOnPodFlag = false;
+    class TestNetworkConfigRegistry extends NetworkConfigRegistryAdapter {
+        Boolean igmpOnPodFlag = false;
 
-         TestNetworkConfigRegistry(Boolean igmpFlag) {
-             igmpOnPodFlag = igmpFlag;
-         }
+        TestNetworkConfigRegistry(Boolean igmpFlag) {
+            igmpOnPodFlag = igmpFlag;
+        }
+
         @SuppressWarnings("unchecked")
         @Override
         public <S> Set<S> getSubjects(Class<S> subjectClass) {
@@ -242,19 +253,19 @@
                 return (Set<S>) ImmutableSet.of(DEVICE_ID_OF_A, DEVICE_ID_OF_B);
             }
             return null;
-       }
+        }
 
-         @SuppressWarnings("unchecked")
-         @Override
-         public <S, C extends Config<S>> C getConfig(S subject, Class<C> configClass) {
-             if (configClass.getName().equalsIgnoreCase("org.opencord.igmpproxy.impl.IgmpproxyConfig")) {
-                 IgmpproxyConfig igmpproxyConfig = new MockIgmpProxyConfig(igmpOnPodFlag);
-                 return (C) igmpproxyConfig;
-             } else {
-                 super.getConfig(subject, configClass);
-             }
-             return null;
-         }
+        @SuppressWarnings("unchecked")
+        @Override
+        public <S, C extends Config<S>> C getConfig(S subject, Class<C> configClass) {
+            if (configClass.getName().equalsIgnoreCase("org.opencord.igmpproxy.impl.IgmpproxyConfig")) {
+                IgmpproxyConfig igmpproxyConfig = new MockIgmpProxyConfig(igmpOnPodFlag);
+                return (C) igmpproxyConfig;
+            } else {
+                super.getConfig(subject, configClass);
+            }
+            return null;
+        }
     }
 
 
@@ -276,12 +287,18 @@
         @Override
         public void emit(OutboundPacket packet) {
             synchronized (savedPackets) {
-               savedPackets.add(packet);
-               savedPackets.notify();
+                savedPackets.add(packet);
+                savedPackets.notify();
             }
         }
-     }
+    }
 
+    class TestIgmpLeaderShipService implements IgmpLeadershipService {
+        @Override
+        public boolean isLocalLeader(DeviceId deviceId) {
+            return true;
+        }
+    }
 
     class MockMastershipService extends MastershipServiceAdapter {
         @Override
@@ -407,9 +424,9 @@
      * Mocks the DefaultPacketContext.
      */
     final class TestPacketContext extends DefaultPacketContext {
-       TestPacketContext(long time, InboundPacket inPkt, OutboundPacket outPkt, boolean block) {
-         super(time, inPkt, outPkt, block);
-         }
+        TestPacketContext(long time, InboundPacket inPkt, OutboundPacket outPkt, boolean block) {
+            super(time, inPkt, outPkt, block);
+        }
 
         @Override
         public void send() {
@@ -423,31 +440,31 @@
      * @param reply Ethernet packet
      * @throws InterruptedException
      */
-     void sendPacket(Ethernet reply) {
+    void sendPacket(Ethernet reply) {
 
-         if (reply != null) {
-             final ByteBuffer byteBuffer = ByteBuffer.wrap(reply.serialize());
+        if (reply != null) {
+            final ByteBuffer byteBuffer = ByteBuffer.wrap(reply.serialize());
 
-             if (flagForQueryPacket) {
-                 InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_C, reply, byteBuffer);
-                 context = new TestPacketContext(127L, inBoundPacket, null, false);
-                 packetProcessor.process(context);
-             } else {
-                 if (flagForPacket) {
-                     InboundPacket inPacket = new DefaultInboundPacket(CONNECT_POINT_A, reply, byteBuffer);
-                     context = new TestPacketContext(127L, inPacket, null, false);
-                     flagForPacket = false;
+            if (flagForQueryPacket) {
+                InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_C, reply, byteBuffer);
+                context = new TestPacketContext(127L, inBoundPacket, null, false);
+                packetProcessor.process(context);
+            } else {
+                if (flagForPacket) {
+                    InboundPacket inPacket = new DefaultInboundPacket(CONNECT_POINT_A, reply, byteBuffer);
+                    context = new TestPacketContext(127L, inPacket, null, false);
+                    flagForPacket = false;
 
-                     packetProcessor.process(context);
-                 } else {
-                     InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_B, reply, byteBuffer);
-                     context = new TestPacketContext(127L, inBoundPacket, null, false);
-                     flagForPacket = true;
+                    packetProcessor.process(context);
+                } else {
+                    InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_B, reply, byteBuffer);
+                    context = new TestPacketContext(127L, inBoundPacket, null, false);
+                    flagForPacket = true;
 
-                     packetProcessor.process(context);
-                 }
-             }
-         }
+                    packetProcessor.process(context);
+                }
+            }
+        }
     }
 
     protected class MockSadisService implements SadisService {
@@ -498,7 +515,7 @@
     private class MockSubService implements BaseInformationService<SubscriberAndDeviceInformation> {
         MockSubscriberAndDeviceInformation sub =
                 new MockSubscriberAndDeviceInformation(CLIENT_NAS_PORT_ID,
-                                                       CLIENT_NAS_PORT_ID, CLIENT_CIRCUIT_ID, null, null);
+                        CLIENT_NAS_PORT_ID, CLIENT_CIRCUIT_ID, null, null);
 
         @Override
         public SubscriberAndDeviceInformation get(String id) {
@@ -653,15 +670,15 @@
         }
 
         @Override
-         public void disableComponent(String name) {
-             // TODO Auto-generated method stub
-         }
+        public void disableComponent(String name) {
+            // TODO Auto-generated method stub
+        }
 
-         @Override
-         public ServiceReference getServiceReference() {
-             // TODO Auto-generated method stub
-             return null;
-         }
+        @Override
+        public ServiceReference getServiceReference() {
+            // TODO Auto-generated method stub
+            return null;
+        }
     }
 
     Ethernet buildWrongIgmpPacket(Ip4Address groupIp, Ip4Address sourceIp) {
@@ -669,7 +686,7 @@
         igmpMembership.setRecordType((byte) 0x33);
 
         return IgmpSender.getInstance().buildIgmpPacket(IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT, groupIp,
-            igmpMembership, sourceIp, false);
+                igmpMembership, sourceIp, false);
     }
 
     Ethernet buildUnknownIgmpPacket(Ip4Address groupIp, Ip4Address sourceIp) {
@@ -679,4 +696,53 @@
         return IgmpSender.getInstance().buildIgmpPacket((byte) 0x44, groupIp, igmpMembership, sourceIp, false);
     }
 
+    class TestStateMachineStoreService extends AbstractStateMachineStore {
+        private static final int DEFAULT_COUNT = 0;
+        private Map<StateMachineId, AtomicLong> countsMap;
+
+        public TestStateMachineStoreService(Map<StateMachineId, StateMachine> map) {
+            super();
+            stateMachineMap = Maps.newConcurrentMap();
+            countsMap = Maps.newConcurrentMap();
+        }
+
+        @Override
+        public long increaseAndGetCounter(StateMachineId stateMachineId) {
+            AtomicLong count = countsMap.get(stateMachineId);
+            if (count == null) {
+                count = new AtomicLong(DEFAULT_COUNT);
+                countsMap.put(stateMachineId, count);
+            }
+            return count.incrementAndGet();
+        }
+
+        @Override
+        public long decreaseAndGetCounter(StateMachineId stateMachineId) {
+            AtomicLong count = countsMap.get(stateMachineId);
+            if (count.get() > 0) {
+                return count.decrementAndGet();
+            } else {
+                return count.get();
+            }
+        }
+
+        @Override
+        public boolean removeCounter(StateMachineId stateMachineId) {
+            countsMap.remove(stateMachineId);
+            return true;
+        }
+
+        @Override
+        public long getCounter(StateMachineId stateMachineId) {
+            return countsMap.get(stateMachineId).get();
+        }
+
+    }
+
+
+    class TestGroupMemberStoreService extends AbstractGroupMemberStore {
+        public TestGroupMemberStoreService() {
+            super(Maps.newConcurrentMap());
+        }
+    }
 }