SEBA-644 igmpproxy should forward IGMP to BNG on a per pod basis or
a per OLT basis.

- "igmpOnPodBasis" config property added. If true ongoing IGMP
  messages are evaluated on PON basis; they are evaluated
  OLT basis otherwise.

Change-Id: Ic931598deb99f3d09a5bc5d8117de0c56e92a039
diff --git a/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java b/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
index 1675069..657b683 100644
--- a/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
+++ b/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
@@ -20,6 +20,7 @@
 import org.onosproject.net.DeviceId;
 
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * State machine for single IGMP group member. The state machine is implemented on
@@ -39,7 +40,7 @@
     private Ip4Address groupIp;
     private Ip4Address srcIp;
 
-    private int count = DEFAULT_COUNT;
+    private AtomicInteger count = new AtomicInteger(DEFAULT_COUNT);
     private int timerId = IgmpTimer.INVALID_TIMER_ID;
     private int timeOut = DEFAULT_MAX_RESP;
     private State[] states =
@@ -63,18 +64,21 @@
         this.srcIp = src;
     }
 
+    public Ip4Address getGroupIp() {
+        return groupIp;
+    }
 
     public DeviceId getDeviceId() {
         return devId;
     }
     public boolean increaseCounter() {
-        count++;
+        count.incrementAndGet();
         return true;
     }
 
     public boolean decreaseCounter() {
-        if (count > 0) {
-            count--;
+        if (count.get() > 0) {
+            count.decrementAndGet();
             return true;
         } else {
             return false;
@@ -82,7 +86,7 @@
     }
 
     public int getCounter() {
-        return count;
+        return count.get();
     }
     public int currentState() {
         return currentState;
@@ -92,13 +96,13 @@
         currentState = transition[currentState][msg];
     }
 
-    public void join() {
-        states[currentState].join();
+    public void join(boolean messageOutAllowed) {
+        states[currentState].join(messageOutAllowed);
         next(TRANSITION_JOIN);
     }
 
-    public void leave() {
-        states[currentState].leave();
+    public void leave(boolean messageOutAllowed) {
+        states[currentState].leave(messageOutAllowed);
         next(TRANSITION_LEAVE);
     }
 
@@ -124,12 +128,14 @@
     }
 
     class State {
-        public void join() {
+        public void join(boolean messageOutAllowed) {
         }
 
-        public void leave() {
-            Ethernet eth = IgmpSender.getInstance().buildIgmpV3Leave(groupIp, srcIp);
-            IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+        public void leave(boolean messageOutAllowed) {
+            if (messageOutAllowed) {
+                Ethernet eth = IgmpSender.getInstance().buildIgmpV3Leave(groupIp, srcIp);
+                IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+            }
         }
 
         public void query(int maxResp) {
@@ -141,11 +147,13 @@
     }
 
     class NonMember extends State {
-        public void join() {
-            Ethernet eth = IgmpSender.getInstance().buildIgmpV3Join(groupIp, srcIp);
-            IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
-            timeOut = getTimeOut(IgmpManager.getUnsolicitedTimeout());
-            timerId = IgmpTimer.start(SingleStateMachine.this, timeOut);
+        public void join(boolean messageOutAllowed) {
+            if (messageOutAllowed) {
+                Ethernet eth = IgmpSender.getInstance().buildIgmpV3Join(groupIp, srcIp);
+                IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+                timeOut = getTimeOut(IgmpManager.getUnsolicitedTimeout());
+                timerId = IgmpTimer.start(SingleStateMachine.this, timeOut);
+            }
         }
     }