IgmpProxy performance improvements
Change-Id: I0364437ca934253cadb62981e84dd240eee9d0c9
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
index 133ff18..2025e0b 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
@@ -16,22 +16,7 @@
package org.opencord.igmpproxy.impl;
import com.google.common.collect.Sets;
-import org.onosproject.net.Device;
-import org.opencord.igmpproxy.IgmpLeadershipService;
-import org.opencord.igmpproxy.IgmpStatisticType;
-import org.opencord.igmpproxy.IgmpStatisticsService;
-import org.opencord.igmpproxy.impl.store.groupmember.GroupMember;
-import org.opencord.igmpproxy.GroupMemberId;
-import org.opencord.igmpproxy.impl.store.groupmember.GroupMemberStore;
-import org.opencord.igmpproxy.statemachine.StateMachineService;
-import org.opencord.sadis.BaseInformationService;
-import org.opencord.sadis.SadisService;
-import org.opencord.sadis.SubscriberAndDeviceInformation;
-import org.osgi.service.component.annotations.Activate;
-import org.osgi.service.component.annotations.Component;
-import org.osgi.service.component.annotations.Deactivate;
-import org.osgi.service.component.annotations.Reference;
-import org.osgi.service.component.annotations.ReferenceCardinality;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.onlab.packet.EthType;
import org.onlab.packet.Ethernet;
import org.onlab.packet.IGMP;
@@ -45,8 +30,11 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
+import org.onosproject.mcast.api.McastRoute;
+import org.onosproject.mcast.api.MulticastRouteService;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Device;
import org.onosproject.net.DeviceId;
import org.onosproject.net.Port;
import org.onosproject.net.PortNumber;
@@ -68,22 +56,36 @@
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.flowobjective.ObjectiveContext;
import org.onosproject.net.flowobjective.ObjectiveError;
-import org.onosproject.mcast.api.McastRoute;
-import org.onosproject.mcast.api.MulticastRouteService;
import org.onosproject.net.packet.InboundPacket;
import org.onosproject.net.packet.PacketContext;
import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketService;
+import org.opencord.igmpproxy.GroupMemberId;
+import org.opencord.igmpproxy.IgmpLeadershipService;
+import org.opencord.igmpproxy.IgmpStatisticType;
+import org.opencord.igmpproxy.IgmpStatisticsService;
+import org.opencord.igmpproxy.impl.store.groupmember.GroupMember;
+import org.opencord.igmpproxy.impl.store.groupmember.GroupMemberStore;
+import org.opencord.igmpproxy.statemachine.StateMachineService;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.Arrays;
-import java.util.List;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.TimerTask;
@@ -91,16 +93,15 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
-import static org.onlab.packet.IGMPMembership.MODE_IS_INCLUDE;
-import static org.onlab.packet.IGMPMembership.MODE_IS_EXCLUDE;
-import static org.onlab.packet.IGMPMembership.CHANGE_TO_INCLUDE_MODE;
-import static org.onlab.packet.IGMPMembership.CHANGE_TO_EXCLUDE_MODE;
import static org.onlab.packet.IGMPMembership.ALLOW_NEW_SOURCES;
import static org.onlab.packet.IGMPMembership.BLOCK_OLD_SOURCES;
-
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static org.onlab.packet.IGMPMembership.CHANGE_TO_EXCLUDE_MODE;
+import static org.onlab.packet.IGMPMembership.CHANGE_TO_INCLUDE_MODE;
+import static org.onlab.packet.IGMPMembership.MODE_IS_EXCLUDE;
+import static org.onlab.packet.IGMPMembership.MODE_IS_INCLUDE;
import static org.onlab.util.Tools.groupedThreads;
/**
@@ -220,6 +221,9 @@
private int maxResp = 10; //unit is 1 sec
private int keepAliveInterval = 120; //unit is 1 sec
+ private int numberOfIgmpReportProcessorThreads = 20;
+ ExecutorService[] igmpReportProcessServiceExecutorList;
+
private ExecutorService eventExecutor;
public static int getUnsolicitedTimeout() {
@@ -265,8 +269,9 @@
}
deviceService.addListener(deviceListener);
scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 1000, 1000, TimeUnit.MILLISECONDS);
- eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/igmpproxy",
+ eventExecutor = Executors.newSingleThreadScheduledExecutor(groupedThreads("cord/igmpproxy",
"events-igmp-%d", log));
+ initializeIgmpReportProcessServiceExecutors();
log.info("Started");
}
@@ -274,6 +279,7 @@
protected void deactivate() {
scheduledExecutorService.shutdown();
eventExecutor.shutdown();
+ shutdownIgmpReportProcessServiceExecutors();
// de-register and null our handler
networkConfig.removeListener(configListener);
@@ -285,13 +291,32 @@
log.info("Stopped");
}
+ private void initializeIgmpReportProcessServiceExecutors() {
+ igmpReportProcessServiceExecutorList = new ExecutorService[numberOfIgmpReportProcessorThreads];
+ for (int i = 0; i < numberOfIgmpReportProcessorThreads; i++) {
+ ThreadFactory igmpReportProcessorThreadFactory =
+ new ThreadFactoryBuilder().setNameFormat("report-processor-igmp-" + i)
+ .setUncaughtExceptionHandler((t, e) ->
+ log.error("Uncaught exception on {}: ", t.getName(), e))
+ .build();
+ ExecutorService igmpReportProcessServiceExecutor = Executors.newSingleThreadExecutor(
+ igmpReportProcessorThreadFactory);
+ igmpReportProcessServiceExecutorList[i] = igmpReportProcessServiceExecutor;
+ }
+ }
+ private void shutdownIgmpReportProcessServiceExecutors() {
+ for (ExecutorService executor : igmpReportProcessServiceExecutorList) {
+ executor.shutdown();
+ }
+ }
+
protected Ip4Address getDeviceIp(DeviceId ofDeviceId) {
try {
String[] mgmtAddress = deviceService.getDevice(ofDeviceId)
.annotations().value(AnnotationKeys.MANAGEMENT_ADDRESS).split(":");
return Ip4Address.valueOf(mgmtAddress[0]);
} catch (Exception ex) {
- log.info("No valid Ipaddress for " + ofDeviceId.toString());
+ log.info("No valid Ipaddress for {}", ofDeviceId);
return null;
}
}
@@ -344,6 +369,16 @@
return (maxResp + 5) / 10;
}
+ private void queueIgmpReport(IGMPMembership igmpGroup, VlanId vlan, ConnectPoint cp, byte igmpType) {
+ int packetHashCode = Objects.hash(igmpGroup.getGaddr(), connectPoint);
+ int threadId = Math.abs(packetHashCode % numberOfIgmpReportProcessorThreads);
+ log.debug("IGMP report for ConnectPoint {} and group IP {} shall be processed in thread #{}",
+ cp, igmpGroup.getGaddr(), threadId);
+
+ igmpReportProcessServiceExecutorList[threadId].execute(
+ () -> processIgmpReport(igmpGroup, vlan, cp, igmpType));
+ }
+
private Ip4Address ssmTranslateRoute(IpAddress group) {
return ssmTranslateTable.get(group);
}
@@ -353,11 +388,11 @@
PortNumber portNumber = cp.port();
log.debug("Processing IGMP report on membership {} for vlan {} on port {} with type {}",
- igmpGroup, vlan, cp, igmpType);
+ igmpGroup, vlan, cp, igmpType);
Ip4Address groupIp = igmpGroup.getGaddr().getIp4Address();
if (!groupIp.isMulticast()) {
- log.info(groupIp.toString() + " is not a valid group address");
+ log.info("{} is not a valid group address", groupIp);
igmpStatisticsManager.increaseStat(IgmpStatisticType.FAIL_JOIN_REQ_UNKNOWN_MULTICAST_IP_COUNTER);
return;
}
@@ -405,6 +440,7 @@
}
}
GroupMemberId groupMemberKey = GroupMemberId.of(groupIp, deviceId, portNumber);
+ log.debug("{} for {}", join ? "Join" : "Leave", groupMemberKey);
GroupMember groupMember = groupMemberStore.getGroupMember(groupMemberKey);
if (join) {
@@ -522,12 +558,13 @@
* Packet processor responsible for forwarding packets along their paths.
*/
private class IgmpPacketProcessor implements PacketProcessor {
+
@Override
public void process(PacketContext context) {
-
eventExecutor.execute(() -> {
try {
InboundPacket pkt = context.inPacket();
+ log.debug("IgmpPacketProcessor shall process InboundPacket: {}", pkt);
Ethernet ethPkt = pkt.parsed();
if (ethPkt == null) {
return;
@@ -624,15 +661,16 @@
Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
while (itr.hasNext()) {
IGMPGroup group = itr.next();
+ log.debug("IGMPGroup {}", group.getGaddr());
if (group instanceof IGMPMembership) {
- processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
+ queueIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
pkt.receivedFrom(), igmp.getIgmpType());
} else {
IGMPMembership mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
IGMPMembership.MODE_IS_EXCLUDE :
IGMPMembership.MODE_IS_INCLUDE);
- processIgmpReport(mgroup, VlanId.vlanId(vlan),
+ queueIgmpReport(mgroup, VlanId.vlanId(vlan),
pkt.receivedFrom(), igmp.getIgmpType());
}
}
@@ -961,6 +999,11 @@
pimSSmInterworking = newCfg.pimSsmInterworking();
enableIgmpProvisioning = newCfg.enableIgmpProvisioning();
igmpOnPodBasis = newCfg.igmpOnPodBasis();
+ if (numberOfIgmpReportProcessorThreads != newCfg.numberOfIgmpReportProcessorThreads()) {
+ numberOfIgmpReportProcessorThreads = newCfg.numberOfIgmpReportProcessorThreads();
+ shutdownIgmpReportProcessServiceExecutors();
+ initializeIgmpReportProcessServiceExecutors();
+ }
if (newCfg.outgoingIgmpWithV3() != null &&
outgoingIgmpWithV3 != newCfg.outgoingIgmpWithV3()) {
outgoingIgmpWithV3 = newCfg.outgoingIgmpWithV3();
@@ -1130,4 +1173,4 @@
processFilterObjective(connectPoint.deviceId(), connectPoint.port(), true);
}
-}
+}
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpproxyConfig.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpproxyConfig.java
index 6d8591a..e2db2ff 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpproxyConfig.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpproxyConfig.java
@@ -32,6 +32,7 @@
protected static final String DEFAULT_LAST_QUERY_COUNT = "2";
protected static final String DEFAULT_IGMP_COS = "7";
protected static final String DEFAULT_UNI_IGMP_COS = "7";
+ protected static final String DEFAULT_NUMBER_OF_IGMP_REPORT_PROCESSOR_THREADS = "20";
protected static final Boolean DEFAULT_FAST_LEAVE = false;
protected static final Boolean DEFAULT_PERIODIC_QUERY = true;
protected static final String DEFAULT_WITH_RA_UPLINK = "true";
@@ -61,12 +62,13 @@
private static final String ENABLE_IGMP_PROVISIONING = "enableIgmpProvisioning";
private static final String IGMP_ON_POD_BASIS = "igmpOnPodBasis";
private static final String OUTGOING_IGMP_WITH_V3 = "outgoingIgmpWithV3";
+ private static final String NUMBER_OF_IGMP_REPORT_PROCESSOR_THREADS = "numberOfIgmpReportProcessorThreads";
/**
* Gets the value of a string property, protecting for an empty
* JSON object.
*
- * @param name name of the property
+ * @param name name of the property
* @param defaultValue default value if none has been specified
* @return String value if one os found, default value otherwise
*/
@@ -212,7 +214,7 @@
return DEFAULT_IGMP_PROVISIONING_SUPPORT;
}
return Boolean.parseBoolean(getStringProperty(ENABLE_IGMP_PROVISIONING,
- DEFAULT_IGMP_PROVISIONING_SUPPORT.toString()));
+ DEFAULT_IGMP_PROVISIONING_SUPPORT.toString()));
}
public boolean igmpOnPodBasis() {
@@ -220,7 +222,7 @@
return DEFAULT_IGMP_ON_POD_BASIS;
}
return Boolean.parseBoolean(getStringProperty(IGMP_ON_POD_BASIS,
- DEFAULT_IGMP_ON_POD_BASIS.toString()));
+ DEFAULT_IGMP_ON_POD_BASIS.toString()));
}
public Boolean outgoingIgmpWithV3() {
@@ -229,4 +231,9 @@
}
return Boolean.parseBoolean(getStringProperty(OUTGOING_IGMP_WITH_V3, DEFAULT_OUTGOING_IGMP_WITH_V3.toString()));
}
-}
+
+ public int numberOfIgmpReportProcessorThreads() {
+ return Integer.parseInt(getStringProperty(NUMBER_OF_IGMP_REPORT_PROCESSOR_THREADS,
+ DEFAULT_NUMBER_OF_IGMP_REPORT_PROCESSOR_THREADS));
+ }
+}
\ No newline at end of file