SEBA-644 igmpproxy should forward IGMP to BNG on a per pod basis or
a per OLT basis.

- "igmpOnPodBasis" config property added. If true ongoing IGMP
  messages are evaluated on PON basis; they are evaluated
  OLT basis otherwise.

Change-Id: Ic931598deb99f3d09a5bc5d8117de0c56e92a039
diff --git a/pom.xml b/pom.xml
index 1d115a0..a167628 100644
--- a/pom.xml
+++ b/pom.xml
@@ -110,35 +110,16 @@
             <plugin>
                 <groupId>org.apache.felix</groupId>
                 <artifactId>maven-bundle-plugin</artifactId>
-                <version>3.0.1</version>
+                <version>4.1.0</version>
                 <extensions>true</extensions>
+                <inherited>true</inherited>
             </plugin>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-compiler-plugin</artifactId>
-                <version>2.5.1</version>
+                <version>3.8.0</version>
                 <configuration>
-                    <source>1.8</source>
-                    <target>1.8</target>
-                </configuration>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.felix</groupId>
-                <artifactId>maven-scr-plugin</artifactId>
-                <version>1.21.0</version>
-                <executions>
-                    <execution>
-                        <id>generate-scr-srcdescriptor</id>
-                        <goals>
-                            <goal>scr</goal>
-                        </goals>
-                    </execution>
-                </executions>
-                <configuration>
-                    <supportedProjectTypes>
-                        <supportedProjectType>bundle</supportedProjectType>
-                        <supportedProjectType>war</supportedProjectType>
-                    </supportedProjectTypes>
+                    <release>11</release>
                 </configuration>
             </plugin>
             <plugin>
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpManager.java b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
index 40d390e..826f260 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpManager.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
@@ -79,10 +79,14 @@
 import java.util.Set;
 import java.util.TimerTask;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.util.Tools.groupedThreads;
+
 /**
  * Igmp process application, use proxy mode, support first join/ last leave , fast leave
  * period query and keep alive, packet out igmp message to uplink port features.
@@ -116,6 +120,7 @@
     public static ConnectPoint connectPoint = null;
     private static ConnectPoint sourceDeviceAndPort = null;
     private static boolean enableIgmpProvisioning = false;
+    private static boolean igmpOnPodBasis = false;
 
     private static final Integer MAX_PRIORITY = 10000;
     private static final String INSTALLED = "installed";
@@ -172,6 +177,8 @@
     private int maxResp = 10; //unit is 1 sec
     private int keepAliveInterval = 120; //unit is 1 sec
 
+    private ExecutorService eventExecutor;
+
     public static int getUnsolicitedTimeout() {
         return unSolicitedTimeout;
     }
@@ -226,6 +233,8 @@
         }
         deviceService.addListener(deviceListener);
         scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 0, 1000, TimeUnit.MILLISECONDS);
+        eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/igmpproxy",
+                                                                        "events-igmp-%d", log));
 
         log.info("Started");
     }
@@ -233,6 +242,7 @@
     @Deactivate
     protected void deactivate() {
         scheduledExecutorService.shutdown();
+        eventExecutor.shutdown();
 
         // de-register and null our handler
         networkConfig.removeListener(configListener);
@@ -439,93 +449,96 @@
     private class IgmpPacketProcessor implements PacketProcessor {
         @Override
         public void process(PacketContext context) {
+            eventExecutor.execute(() -> {
+                try {
+                    InboundPacket pkt = context.inPacket();
+                    Ethernet ethPkt = pkt.parsed();
+                    if (ethPkt == null) {
+                        return;
+                    }
 
-            try {
-                InboundPacket pkt = context.inPacket();
-                Ethernet ethPkt = pkt.parsed();
-                if (ethPkt == null) {
-                    return;
-                }
+                    if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
+                        return;
+                    }
 
-                if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
-                    return;
-                }
+                    IPv4 ipv4Pkt = (IPv4) ethPkt.getPayload();
 
-                IPv4 ipv4Pkt = (IPv4) ethPkt.getPayload();
+                    if (ipv4Pkt.getProtocol() != IPv4.PROTOCOL_IGMP) {
+                        return;
+                    }
 
-                if (ipv4Pkt.getProtocol() != IPv4.PROTOCOL_IGMP) {
-                    return;
-                }
+                    short vlan = ethPkt.getVlanID();
+                    DeviceId deviceId = pkt.receivedFrom().deviceId();
 
-                short vlan = ethPkt.getVlanID();
-                DeviceId deviceId = pkt.receivedFrom().deviceId();
+                    if (oltData.get(deviceId) == null &&
+                            !isConnectPoint(deviceId, pkt.receivedFrom().port())) {
+                        log.error("Device not registered in netcfg :" + deviceId.toString());
+                        return;
+                    }
 
-                if (oltData.get(deviceId) == null &&
-                        !isConnectPoint(deviceId, pkt.receivedFrom().port())) {
-                    log.error("Device not registered in netcfg :" + deviceId.toString());
-                    return;
-                }
+                    IGMP igmp = (IGMP) ipv4Pkt.getPayload();
+                    switch (igmp.getIgmpType()) {
+                        case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
+                            //Discard Query from OLT’s non-uplink port’s
+                            if (!pkt.receivedFrom().port().equals(getDeviceUplink(deviceId))) {
+                                if (isConnectPoint(deviceId, pkt.receivedFrom().port())) {
+                                    log.info("IGMP Picked up query from connectPoint");
+                                    //OK to process packet
+                                    processIgmpConnectPointQuery((IGMPQuery) igmp.getGroups().get(0),
+                                                                 pkt.receivedFrom(),
+                                                                 0xff & igmp.getMaxRespField());
+                                    break;
+                                } else {
+                                    //Not OK to process packet
+                                    log.warn("IGMP Picked up query from non-uplink port");
+                                    return;
+                                }
+                            }
 
-                IGMP igmp = (IGMP) ipv4Pkt.getPayload();
-                switch (igmp.getIgmpType()) {
-                    case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
-                        //Discard Query from OLT’s non-uplink port’s
-                        if (!pkt.receivedFrom().port().equals(getDeviceUplink(deviceId))) {
-                            if (isConnectPoint(deviceId, pkt.receivedFrom().port())) {
-                                log.info("IGMP Picked up query from connectPoint");
-                                //OK to process packet
-                                processIgmpConnectPointQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
-                                        0xff & igmp.getMaxRespField());
-                                break;
-                            } else {
-                                //Not OK to process packet
-                                log.warn("IGMP Picked up query from non-uplink port");
+                            processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
+                                             0xff & igmp.getMaxRespField());
+                            break;
+                        case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
+                            log.debug("IGMP version 1  message types are not currently supported.");
+                            break;
+                        case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
+                        case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
+                        case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
+                            //Discard join/leave from OLT’s uplink port’s
+                            if (pkt.receivedFrom().port().equals(getDeviceUplink(deviceId)) ||
+                                    isConnectPoint(deviceId, pkt.receivedFrom().port())) {
+                                log.info("IGMP Picked up join/leave from uplink/connectPoint port");
                                 return;
                             }
-                        }
 
-                        processIgmpQuery((IGMPQuery) igmp.getGroups().get(0), pkt.receivedFrom(),
-                                0xff & igmp.getMaxRespField());
-                        break;
-                    case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
-                        log.debug("IGMP version 1  message types are not currently supported.");
-                        break;
-                    case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
-                    case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
-                    case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
-                        //Discard join/leave from OLT’s uplink port’s
-                        if (pkt.receivedFrom().port().equals(getDeviceUplink(deviceId)) ||
-                                isConnectPoint(deviceId, pkt.receivedFrom().port())) {
-                            log.info("IGMP Picked up join/leave from uplink/connectPoint port");
-                            return;
-                        }
-
-                        Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
-                        while (itr.hasNext()) {
-                            IGMPGroup group = itr.next();
-                            if (group instanceof IGMPMembership) {
-                                processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
-                                        pkt.receivedFrom(), igmp.getIgmpType());
-                            } else if (group instanceof IGMPQuery) {
-                                IGMPMembership mgroup;
-                                mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
-                                mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
-                                        IGMPMembership.MODE_IS_EXCLUDE : IGMPMembership.MODE_IS_INCLUDE);
-                                processIgmpReport(mgroup, VlanId.vlanId(vlan),
-                                        pkt.receivedFrom(), igmp.getIgmpType());
+                            Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
+                            while (itr.hasNext()) {
+                                IGMPGroup group = itr.next();
+                                if (group instanceof IGMPMembership) {
+                                    processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
+                                                      pkt.receivedFrom(), igmp.getIgmpType());
+                                } else if (group instanceof IGMPQuery) {
+                                    IGMPMembership mgroup;
+                                    mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
+                                    mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
+                                                                 IGMPMembership.MODE_IS_EXCLUDE :
+                                                                 IGMPMembership.MODE_IS_INCLUDE);
+                                    processIgmpReport(mgroup, VlanId.vlanId(vlan),
+                                                      pkt.receivedFrom(), igmp.getIgmpType());
+                                }
                             }
-                        }
-                        break;
+                            break;
 
-                    default:
-                        log.info("wrong IGMP v3 type:" + igmp.getIgmpType());
-                        break;
+                        default:
+                            log.info("wrong IGMP v3 type:" + igmp.getIgmpType());
+                            break;
+                    }
+
+                } catch (Exception ex) {
+                    log.error("igmp process error : {} ", ex);
+                    ex.printStackTrace();
                 }
-
-            } catch (Exception ex) {
-                log.error("igmp process error : {} ", ex);
-                ex.printStackTrace();
-            }
+            });
         }
     }
 
@@ -591,6 +604,10 @@
         }
     }
 
+    public static boolean isIgmpOnPodBasis() {
+        return igmpOnPodBasis;
+    }
+
     private void processFilterObjective(DeviceId devId, PortNumber port, boolean remove) {
         if (!enableIgmpProvisioning) {
             log.debug("IGMP trap rules won't be installed since enableIgmpProvisioning flag is not set");
@@ -728,6 +745,7 @@
             fastLeave = newCfg.fastLeave();
             pimSSmInterworking = newCfg.pimSsmInterworking();
             enableIgmpProvisioning = newCfg.enableIgmpProvisioning();
+            igmpOnPodBasis = newCfg.igmpOnPodBasis();
 
             if (connectPointMode != newCfg.connectPointMode() ||
                     connectPoint != newCfg.connectPoint()) {
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpproxyConfig.java b/src/main/java/org/opencord/igmpproxy/IgmpproxyConfig.java
index 2e36a50..b59c2c1 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpproxyConfig.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpproxyConfig.java
@@ -38,6 +38,7 @@
     private static final Boolean DEFAULT_CONNECT_POINT_MODE = true;
     private static final Boolean DEFAULT_PIMSSM_INTERWORKING = false;
     private static final Boolean DEFAULT_IGMP_PROVISIONING_SUPPORT = Boolean.FALSE;
+    private static final Boolean DEFAULT_IGMP_ON_POD_BASIS = Boolean.FALSE;
 
     protected static final String CONNECT_POINT_MODE = "globalConnectPointMode";
     protected static final String CONNECT_POINT = "globalConnectPoint";
@@ -55,7 +56,7 @@
     private static final String PIMSSM_INTERWORKING = "pimSSmInterworking";
     private static final String SOURCE_DEV_PORT = "sourceDeviceAndPort";
     private static final String ENABLE_IGMP_PROVISIONING = "enableIgmpProvisioning";
-
+    private static final String IGMP_ON_POD_BASIS = "igmpOnPodBasis";
 
     /**
      * Gets the value of a string property, protecting for an empty
@@ -205,4 +206,12 @@
         return Boolean.parseBoolean(getStringProperty(ENABLE_IGMP_PROVISIONING,
                                                       DEFAULT_IGMP_PROVISIONING_SUPPORT.toString()));
     }
+
+    public boolean igmpOnPodBasis() {
+        if (object == null || object.path(IGMP_ON_POD_BASIS) == null) {
+            return DEFAULT_IGMP_ON_POD_BASIS;
+        }
+        return Boolean.parseBoolean(getStringProperty(IGMP_ON_POD_BASIS,
+                                                      DEFAULT_IGMP_ON_POD_BASIS.toString()));
+    }
 }
diff --git a/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java b/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
index 1675069..657b683 100644
--- a/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
+++ b/src/main/java/org/opencord/igmpproxy/SingleStateMachine.java
@@ -20,6 +20,7 @@
 import org.onosproject.net.DeviceId;
 
 import java.util.Random;
+import java.util.concurrent.atomic.AtomicInteger;
 
 /**
  * State machine for single IGMP group member. The state machine is implemented on
@@ -39,7 +40,7 @@
     private Ip4Address groupIp;
     private Ip4Address srcIp;
 
-    private int count = DEFAULT_COUNT;
+    private AtomicInteger count = new AtomicInteger(DEFAULT_COUNT);
     private int timerId = IgmpTimer.INVALID_TIMER_ID;
     private int timeOut = DEFAULT_MAX_RESP;
     private State[] states =
@@ -63,18 +64,21 @@
         this.srcIp = src;
     }
 
+    public Ip4Address getGroupIp() {
+        return groupIp;
+    }
 
     public DeviceId getDeviceId() {
         return devId;
     }
     public boolean increaseCounter() {
-        count++;
+        count.incrementAndGet();
         return true;
     }
 
     public boolean decreaseCounter() {
-        if (count > 0) {
-            count--;
+        if (count.get() > 0) {
+            count.decrementAndGet();
             return true;
         } else {
             return false;
@@ -82,7 +86,7 @@
     }
 
     public int getCounter() {
-        return count;
+        return count.get();
     }
     public int currentState() {
         return currentState;
@@ -92,13 +96,13 @@
         currentState = transition[currentState][msg];
     }
 
-    public void join() {
-        states[currentState].join();
+    public void join(boolean messageOutAllowed) {
+        states[currentState].join(messageOutAllowed);
         next(TRANSITION_JOIN);
     }
 
-    public void leave() {
-        states[currentState].leave();
+    public void leave(boolean messageOutAllowed) {
+        states[currentState].leave(messageOutAllowed);
         next(TRANSITION_LEAVE);
     }
 
@@ -124,12 +128,14 @@
     }
 
     class State {
-        public void join() {
+        public void join(boolean messageOutAllowed) {
         }
 
-        public void leave() {
-            Ethernet eth = IgmpSender.getInstance().buildIgmpV3Leave(groupIp, srcIp);
-            IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+        public void leave(boolean messageOutAllowed) {
+            if (messageOutAllowed) {
+                Ethernet eth = IgmpSender.getInstance().buildIgmpV3Leave(groupIp, srcIp);
+                IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+            }
         }
 
         public void query(int maxResp) {
@@ -141,11 +147,13 @@
     }
 
     class NonMember extends State {
-        public void join() {
-            Ethernet eth = IgmpSender.getInstance().buildIgmpV3Join(groupIp, srcIp);
-            IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
-            timeOut = getTimeOut(IgmpManager.getUnsolicitedTimeout());
-            timerId = IgmpTimer.start(SingleStateMachine.this, timeOut);
+        public void join(boolean messageOutAllowed) {
+            if (messageOutAllowed) {
+                Ethernet eth = IgmpSender.getInstance().buildIgmpV3Join(groupIp, srcIp);
+                IgmpSender.getInstance().sendIgmpPacketUplink(eth, devId);
+                timeOut = getTimeOut(IgmpManager.getUnsolicitedTimeout());
+                timerId = IgmpTimer.start(SingleStateMachine.this, timeOut);
+            }
         }
     }
 
diff --git a/src/main/java/org/opencord/igmpproxy/StateMachine.java b/src/main/java/org/opencord/igmpproxy/StateMachine.java
index 81ad1d4..391bdee 100644
--- a/src/main/java/org/opencord/igmpproxy/StateMachine.java
+++ b/src/main/java/org/opencord/igmpproxy/StateMachine.java
@@ -26,13 +26,16 @@
  * RFC 2236 "6. Host State Diagram".
  */
 public final class StateMachine {
+
+    private static final String GROUP = "Group";
+
     private StateMachine() {
 
     }
     private static Map<String, SingleStateMachine> map = Maps.newConcurrentMap();
 
     private static String getId(DeviceId devId, Ip4Address groupIp) {
-        return devId.toString() + "Group" + groupIp.toString();
+        return devId.toString() + GROUP + groupIp.toString();
     }
 
     private static SingleStateMachine get(DeviceId devId, Ip4Address groupIp) {
@@ -40,7 +43,7 @@
         return map.get(id);
     }
 
-    public static void destorySingle(DeviceId devId, Ip4Address groupIp) {
+    public static void destroySingle(DeviceId devId, Ip4Address groupIp) {
         SingleStateMachine machine = get(devId, groupIp);
         if (null == machine) {
             return;
@@ -51,10 +54,20 @@
 
     public static boolean join(DeviceId devId, Ip4Address groupIp, Ip4Address srcIP) {
         SingleStateMachine machine = get(devId, groupIp);
+
         if (null == machine) {
             machine = new SingleStateMachine(devId, groupIp, srcIP);
             map.put(getId(devId, groupIp), machine);
-            machine.join();
+
+            boolean shouldSendJoin = true;
+            if (IgmpManager.isIgmpOnPodBasis() &&
+                    groupListenedByOtherDevices(devId, groupIp)) {
+                // unset the flag if igmp messages are evaluated on POD basis
+                // and there are already active members of this group
+                // across the entire POD
+                shouldSendJoin = false;
+            }
+            machine.join(shouldSendJoin);
             return true;
         }
         machine.increaseCounter();
@@ -66,11 +79,21 @@
         if (null == machine) {
             return false;
         }
-        machine.decreaseCounter();
 
+        machine.decreaseCounter();
+        // make sure machine instance still exists.
+        // it may be removed by the preceding thread
         if (machine.getCounter() == 0) {
-            machine.leave();
-            destorySingle(devId, groupIp);
+            boolean shouldSendLeave = true;
+            if (IgmpManager.isIgmpOnPodBasis() &&
+                    groupListenedByOtherDevices(devId, groupIp)) {
+                // unset the flag if igmp messages are evaluated on POD basis
+                // and there are still active members of this group
+                // across the entire POD
+                shouldSendLeave = false;
+            }
+            machine.leave(shouldSendLeave);
+            destroySingle(devId, groupIp);
             return true;
         }
         return false;
@@ -116,4 +139,23 @@
         map.clear();
     }
 
+    /**
+     * @param devId   id of the device being excluded
+     * @param groupIp group IP address
+     * @return true if this group has at least one listener connected to
+     * any device in the map except for the device specified; false otherwise.
+     */
+    private static boolean groupListenedByOtherDevices(DeviceId devId, Ip4Address groupIp) {
+        for (SingleStateMachine machine : map.values()) {
+            if (machine.getDeviceId().equals(devId)) {
+                continue;
+            }
+            if (machine.getGroupIp().equals(groupIp)) {
+                //means group is being listened by other peers in the domain
+                return true;
+            }
+        }
+        return false;
+    }
+
 }