SEBA-644 igmpproxy should forward IGMP to BNG on a per pod basis or
a per OLT basis.

- "igmpOnPodBasis" config property added. If true ongoing IGMP
  messages are evaluated on a per-POD basis; they are evaluated
  on a per-OLT basis otherwise.

Change-Id: Ic931598deb99f3d09a5bc5d8117de0c56e92a039
diff --git a/src/main/java/org/opencord/igmpproxy/StateMachine.java b/src/main/java/org/opencord/igmpproxy/StateMachine.java
index 81ad1d4..391bdee 100644
--- a/src/main/java/org/opencord/igmpproxy/StateMachine.java
+++ b/src/main/java/org/opencord/igmpproxy/StateMachine.java
@@ -26,13 +26,16 @@
  * RFC 2236 "6. Host State Diagram".
  */
 public final class StateMachine {
+
+    private static final String GROUP = "Group";
+
     private StateMachine() {
 
     }
     private static Map<String, SingleStateMachine> map = Maps.newConcurrentMap();
 
     private static String getId(DeviceId devId, Ip4Address groupIp) {
-        return devId.toString() + "Group" + groupIp.toString();
+        return devId.toString() + GROUP + groupIp.toString();
     }
 
     private static SingleStateMachine get(DeviceId devId, Ip4Address groupIp) {
@@ -40,7 +43,7 @@
         return map.get(id);
     }
 
-    public static void destorySingle(DeviceId devId, Ip4Address groupIp) {
+    public static void destroySingle(DeviceId devId, Ip4Address groupIp) {
         SingleStateMachine machine = get(devId, groupIp);
         if (null == machine) {
             return;
@@ -51,10 +54,20 @@
 
     public static boolean join(DeviceId devId, Ip4Address groupIp, Ip4Address srcIP) {
         SingleStateMachine machine = get(devId, groupIp);
+
         if (null == machine) {
             machine = new SingleStateMachine(devId, groupIp, srcIP);
             map.put(getId(devId, groupIp), machine);
-            machine.join();
+
+            boolean shouldSendJoin = true;
+            if (IgmpManager.isIgmpOnPodBasis() &&
+                    groupListenedByOtherDevices(devId, groupIp)) {
+                // unset the flag if igmp messages are evaluated on POD basis
+                // and there are already active members of this group
+                // across the entire POD
+                shouldSendJoin = false;
+            }
+            machine.join(shouldSendJoin);
             return true;
         }
         machine.increaseCounter();
@@ -66,11 +79,21 @@
         if (null == machine) {
             return false;
         }
-        machine.decreaseCounter();
 
+        machine.decreaseCounter();
+        // make sure machine instance still exists.
+        // it may be removed by the preceding thread
         if (machine.getCounter() == 0) {
-            machine.leave();
-            destorySingle(devId, groupIp);
+            boolean shouldSendLeave = true;
+            if (IgmpManager.isIgmpOnPodBasis() &&
+                    groupListenedByOtherDevices(devId, groupIp)) {
+                // unset the flag if igmp messages are evaluated on POD basis
+                // and there are still active members of this group
+                // across the entire POD
+                shouldSendLeave = false;
+            }
+            machine.leave(shouldSendLeave);
+            destroySingle(devId, groupIp);
             return true;
         }
         return false;
@@ -116,4 +139,23 @@
         map.clear();
     }
 
+    /**
+     * Scans the tracked state machines for the group on any other device.
+     *
+     * @param devId   id of the device being excluded
+     * @param groupIp group IP address
+     * @return true if a state machine for this group is held for a device
+     * other than the one specified; false otherwise.
+     */
+    private static boolean groupListenedByOtherDevices(DeviceId devId, Ip4Address groupIp) {
+        for (SingleStateMachine candidate : map.values()) {
+            boolean ownedByOther = !candidate.getDeviceId().equals(devId);
+            if (ownedByOther && candidate.getGroupIp().equals(groupIp)) {
+                // another device in the map already tracks this group
+                return true;
+            }
+        }
+        return false;
+    }
+
 }