IgmpProxy performance improvements

Change-Id: I0364437ca934253cadb62981e84dd240eee9d0c9
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
index 133ff18..2025e0b 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
@@ -16,22 +16,7 @@
 package org.opencord.igmpproxy.impl;
 
 import com.google.common.collect.Sets;
-import org.onosproject.net.Device;
-import org.opencord.igmpproxy.IgmpLeadershipService;
-import org.opencord.igmpproxy.IgmpStatisticType;
-import org.opencord.igmpproxy.IgmpStatisticsService;
-import org.opencord.igmpproxy.impl.store.groupmember.GroupMember;
-import org.opencord.igmpproxy.GroupMemberId;
-import org.opencord.igmpproxy.impl.store.groupmember.GroupMemberStore;
-import org.opencord.igmpproxy.statemachine.StateMachineService;
-import org.opencord.sadis.BaseInformationService;
-import org.opencord.sadis.SadisService;
-import org.opencord.sadis.SubscriberAndDeviceInformation;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import org.onlab.packet.EthType;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IGMP;
@@ -45,8 +30,11 @@
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.mastership.MastershipService;
+import org.onosproject.mcast.api.McastRoute;
+import org.onosproject.mcast.api.MulticastRouteService;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.Port;
 import org.onosproject.net.PortNumber;
@@ -68,22 +56,36 @@
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
-import org.onosproject.mcast.api.McastRoute;
-import org.onosproject.mcast.api.MulticastRouteService;
 import org.onosproject.net.packet.InboundPacket;
 import org.onosproject.net.packet.PacketContext;
 import org.onosproject.net.packet.PacketProcessor;
 import org.onosproject.net.packet.PacketService;
+import org.opencord.igmpproxy.GroupMemberId;
+import org.opencord.igmpproxy.IgmpLeadershipService;
+import org.opencord.igmpproxy.IgmpStatisticType;
+import org.opencord.igmpproxy.IgmpStatisticsService;
+import org.opencord.igmpproxy.impl.store.groupmember.GroupMember;
+import org.opencord.igmpproxy.impl.store.groupmember.GroupMemberStore;
+import org.opencord.igmpproxy.statemachine.StateMachineService;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.List;
 import java.util.Collection;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.TimerTask;
@@ -91,16 +93,15 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 
-import static org.onlab.packet.IGMPMembership.MODE_IS_INCLUDE;
-import static org.onlab.packet.IGMPMembership.MODE_IS_EXCLUDE;
-import static org.onlab.packet.IGMPMembership.CHANGE_TO_INCLUDE_MODE;
-import static org.onlab.packet.IGMPMembership.CHANGE_TO_EXCLUDE_MODE;
 import static org.onlab.packet.IGMPMembership.ALLOW_NEW_SOURCES;
 import static org.onlab.packet.IGMPMembership.BLOCK_OLD_SOURCES;
-
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.packet.IGMPMembership.CHANGE_TO_EXCLUDE_MODE;
+import static org.onlab.packet.IGMPMembership.CHANGE_TO_INCLUDE_MODE;
+import static org.onlab.packet.IGMPMembership.MODE_IS_EXCLUDE;
+import static org.onlab.packet.IGMPMembership.MODE_IS_INCLUDE;
 import static org.onlab.util.Tools.groupedThreads;
 
 /**
@@ -220,6 +221,9 @@
     private int maxResp = 10; //unit is 1 sec
     private int keepAliveInterval = 120; //unit is 1 sec
 
+    private int numberOfIgmpReportProcessorThreads = 20;
+    ExecutorService[] igmpReportProcessServiceExecutorList;
+
     private ExecutorService eventExecutor;
 
     public static int getUnsolicitedTimeout() {
@@ -265,8 +269,9 @@
         }
         deviceService.addListener(deviceListener);
         scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 1000, 1000, TimeUnit.MILLISECONDS);
-        eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/igmpproxy",
+        eventExecutor = Executors.newSingleThreadScheduledExecutor(groupedThreads("cord/igmpproxy",
                 "events-igmp-%d", log));
+        initializeIgmpReportProcessServiceExecutors();
         log.info("Started");
     }
 
@@ -274,6 +279,7 @@
     protected void deactivate() {
         scheduledExecutorService.shutdown();
         eventExecutor.shutdown();
+        shutdownIgmpReportProcessServiceExecutors();
 
         // de-register and null our handler
         networkConfig.removeListener(configListener);
@@ -285,13 +291,32 @@
         log.info("Stopped");
     }
 
+    private void initializeIgmpReportProcessServiceExecutors() {
+        igmpReportProcessServiceExecutorList = new ExecutorService[numberOfIgmpReportProcessorThreads];
+        for (int i = 0; i < numberOfIgmpReportProcessorThreads; i++) {
+            ThreadFactory igmpReportProcessorThreadFactory =
+                    new ThreadFactoryBuilder().setNameFormat("report-processor-igmp-" + i)
+                            .setUncaughtExceptionHandler((t, e) ->
+                                    log.error("Uncaught exception on {}: ", t.getName(), e))
+                            .build();
+            ExecutorService igmpReportProcessServiceExecutor = Executors.newSingleThreadExecutor(
+                    igmpReportProcessorThreadFactory);
+            igmpReportProcessServiceExecutorList[i] = igmpReportProcessServiceExecutor;
+        }
+    }
+    private void shutdownIgmpReportProcessServiceExecutors() {
+        for (ExecutorService executor : igmpReportProcessServiceExecutorList) {
+            executor.shutdown();
+        }
+    }
+
     protected Ip4Address getDeviceIp(DeviceId ofDeviceId) {
         try {
             String[] mgmtAddress = deviceService.getDevice(ofDeviceId)
                     .annotations().value(AnnotationKeys.MANAGEMENT_ADDRESS).split(":");
             return Ip4Address.valueOf(mgmtAddress[0]);
         } catch (Exception ex) {
-            log.info("No valid Ipaddress for " + ofDeviceId.toString());
+            log.info("No valid Ipaddress for {}", ofDeviceId);
             return null;
         }
     }
@@ -344,6 +369,16 @@
         return (maxResp + 5) / 10;
     }
 
+    private void queueIgmpReport(IGMPMembership igmpGroup, VlanId vlan, ConnectPoint cp, byte igmpType) {
+        int packetHashCode = Objects.hash(igmpGroup.getGaddr(), connectPoint);
+        int threadId = Math.abs(packetHashCode % numberOfIgmpReportProcessorThreads);
+        log.debug("IGMP report for ConnectPoint {} and group IP {} shall be processed in thread #{}",
+                cp, igmpGroup.getGaddr(), threadId);
+
+        igmpReportProcessServiceExecutorList[threadId].execute(
+                () -> processIgmpReport(igmpGroup, vlan, cp, igmpType));
+    }
+
     private Ip4Address ssmTranslateRoute(IpAddress group) {
         return ssmTranslateTable.get(group);
     }
@@ -353,11 +388,11 @@
         PortNumber portNumber = cp.port();
 
         log.debug("Processing IGMP report on membership {} for vlan {} on port {} with type {}",
-                  igmpGroup, vlan, cp, igmpType);
+                igmpGroup, vlan, cp, igmpType);
 
         Ip4Address groupIp = igmpGroup.getGaddr().getIp4Address();
         if (!groupIp.isMulticast()) {
-            log.info(groupIp.toString() + " is not a valid group address");
+            log.info("{} is not a valid group address", groupIp);
             igmpStatisticsManager.increaseStat(IgmpStatisticType.FAIL_JOIN_REQ_UNKNOWN_MULTICAST_IP_COUNTER);
             return;
         }
@@ -405,6 +440,7 @@
             }
         }
         GroupMemberId groupMemberKey = GroupMemberId.of(groupIp, deviceId, portNumber);
+        log.debug("{} for {}", join ? "Join" : "Leave", groupMemberKey);
         GroupMember groupMember = groupMemberStore.getGroupMember(groupMemberKey);
 
         if (join) {
@@ -522,12 +558,13 @@
      * Packet processor responsible for forwarding packets along their paths.
      */
     private class IgmpPacketProcessor implements PacketProcessor {
+
         @Override
         public void process(PacketContext context) {
-
             eventExecutor.execute(() -> {
                 try {
                     InboundPacket pkt = context.inPacket();
+                    log.debug("IgmpPacketProcessor shall process InboundPacket: {}", pkt);
                     Ethernet ethPkt = pkt.parsed();
                     if (ethPkt == null) {
                         return;
@@ -624,15 +661,16 @@
         Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
         while (itr.hasNext()) {
             IGMPGroup group = itr.next();
+            log.debug("IGMPGroup {}", group.getGaddr());
             if (group instanceof IGMPMembership) {
-                processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
+                queueIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
                         pkt.receivedFrom(), igmp.getIgmpType());
             } else {
                 IGMPMembership mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
                 mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
                         IGMPMembership.MODE_IS_EXCLUDE :
                         IGMPMembership.MODE_IS_INCLUDE);
-                processIgmpReport(mgroup, VlanId.vlanId(vlan),
+                queueIgmpReport(mgroup, VlanId.vlanId(vlan),
                         pkt.receivedFrom(), igmp.getIgmpType());
             }
         }
@@ -961,6 +999,11 @@
             pimSSmInterworking = newCfg.pimSsmInterworking();
             enableIgmpProvisioning = newCfg.enableIgmpProvisioning();
             igmpOnPodBasis = newCfg.igmpOnPodBasis();
+            if (numberOfIgmpReportProcessorThreads != newCfg.numberOfIgmpReportProcessorThreads()) {
+                numberOfIgmpReportProcessorThreads = newCfg.numberOfIgmpReportProcessorThreads();
+                shutdownIgmpReportProcessServiceExecutors();
+                initializeIgmpReportProcessServiceExecutors();
+            }
             if (newCfg.outgoingIgmpWithV3() != null &&
                     outgoingIgmpWithV3 != newCfg.outgoingIgmpWithV3()) {
                 outgoingIgmpWithV3 = newCfg.outgoingIgmpWithV3();
@@ -1130,4 +1173,4 @@
         processFilterObjective(connectPoint.deviceId(), connectPoint.port(), true);
     }
 
-}
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpproxyConfig.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpproxyConfig.java
index 6d8591a..e2db2ff 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpproxyConfig.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpproxyConfig.java
@@ -32,6 +32,7 @@
     protected static final String DEFAULT_LAST_QUERY_COUNT = "2";
     protected static final String DEFAULT_IGMP_COS = "7";
     protected static final String DEFAULT_UNI_IGMP_COS = "7";
+    protected static final String DEFAULT_NUMBER_OF_IGMP_REPORT_PROCESSOR_THREADS = "20";
     protected static final Boolean DEFAULT_FAST_LEAVE = false;
     protected static final Boolean DEFAULT_PERIODIC_QUERY = true;
     protected static final String DEFAULT_WITH_RA_UPLINK = "true";
@@ -61,12 +62,13 @@
     private static final String ENABLE_IGMP_PROVISIONING = "enableIgmpProvisioning";
     private static final String IGMP_ON_POD_BASIS = "igmpOnPodBasis";
     private static final String OUTGOING_IGMP_WITH_V3 = "outgoingIgmpWithV3";
+    private static final String NUMBER_OF_IGMP_REPORT_PROCESSOR_THREADS = "numberOfIgmpReportProcessorThreads";
 
     /**
      * Gets the value of a string property, protecting for an empty
      * JSON object.
      *
-     * @param name name of the property
+     * @param name         name of the property
      * @param defaultValue default value if none has been specified
      * @return String value if one os found, default value otherwise
      */
@@ -212,7 +214,7 @@
             return DEFAULT_IGMP_PROVISIONING_SUPPORT;
         }
         return Boolean.parseBoolean(getStringProperty(ENABLE_IGMP_PROVISIONING,
-                                                      DEFAULT_IGMP_PROVISIONING_SUPPORT.toString()));
+                DEFAULT_IGMP_PROVISIONING_SUPPORT.toString()));
     }
 
     public boolean igmpOnPodBasis() {
@@ -220,7 +222,7 @@
             return DEFAULT_IGMP_ON_POD_BASIS;
         }
         return Boolean.parseBoolean(getStringProperty(IGMP_ON_POD_BASIS,
-                                                      DEFAULT_IGMP_ON_POD_BASIS.toString()));
+                DEFAULT_IGMP_ON_POD_BASIS.toString()));
     }
 
     public Boolean outgoingIgmpWithV3() {
@@ -229,4 +231,9 @@
         }
         return Boolean.parseBoolean(getStringProperty(OUTGOING_IGMP_WITH_V3, DEFAULT_OUTGOING_IGMP_WITH_V3.toString()));
     }
-}
+
+    public int numberOfIgmpReportProcessorThreads() {
+        return Integer.parseInt(getStringProperty(NUMBER_OF_IGMP_REPORT_PROCESSOR_THREADS,
+                DEFAULT_NUMBER_OF_IGMP_REPORT_PROCESSOR_THREADS));
+    }
+}
\ No newline at end of file