[VOL-3500] Move DHCP packet processing to a separate, configurable thread pool
Change-Id: Ibb9ca631548d9cabb8e3c76daa8c55094a18304a
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
index a765548..8e11002 100755
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
@@ -15,31 +15,11 @@
*/
package org.opencord.dhcpl2relay.impl;
-import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_MessageType;
-import static org.onlab.packet.MacAddress.valueOf;
-import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.ENABLE_DHCP_BROADCAST_REPLIES;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.OPTION_82;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.OPTION_82_DEFAULT;
-
-import java.io.ByteArrayOutputStream;
-import java.nio.ByteBuffer;
-import java.time.Instant;
-import java.util.ArrayList;
-import java.util.Dictionary;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.function.Predicate;
-import java.util.stream.Collectors;
-
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
import org.apache.commons.io.HexDump;
import org.onlab.packet.DHCP;
import org.onlab.packet.Ethernet;
@@ -113,10 +93,30 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static java.util.concurrent.Executors.newFixedThreadPool;
+import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_MessageType;
+import static org.onlab.packet.MacAddress.valueOf;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.*;
/**
* DHCP Relay Agent Application Component.
@@ -125,6 +125,7 @@
property = {
OPTION_82 + ":Boolean=" + OPTION_82_DEFAULT,
ENABLE_DHCP_BROADCAST_REPLIES + ":Boolean=" + ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT,
+ PACKET_PROCESSOR_THREADS + ":Integer=" + PACKET_PROCESSOR_THREADS_DEFAULT,
})
public class DhcpL2Relay
extends AbstractListenerManager<DhcpL2RelayEvent, DhcpL2RelayListener>
@@ -197,6 +198,11 @@
*/
protected boolean enableDhcpBroadcastReplies = ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT;
+ /**
+ * Number of threads in the pool that processes incoming DHCP packets.
+ */
+ protected int packetProcessorThreads = PACKET_PROCESSOR_THREADS_DEFAULT;
+
ScheduledFuture<?> refreshTask;
ScheduledExecutorService refreshService = Executors.newSingleThreadScheduledExecutor();
@@ -221,8 +227,11 @@
private DhcpL2RelayStoreDelegate delegate = new InnerDhcpL2RelayStoreDelegate();
+ protected ExecutorService packetProcessorExecutor;
+
@Activate
protected void activate(ComponentContext context) {
+
//start the dhcp relay agent
appId = coreService.registerApplication(DHCP_L2RELAY_APP);
// ensure that host-learning via dhcp includes IP addresses
@@ -255,13 +264,15 @@
factories.forEach(cfgService::registerConfigFactory);
//update the dhcp server configuration.
updateConfig();
- //add the packet services.
- packetService.addProcessor(dhcpRelayPacketProcessor,
- PacketProcessor.director(0));
+
if (context != null) {
modified(context);
}
+ //add the packet services.
+ packetService.addProcessor(dhcpRelayPacketProcessor,
+ PacketProcessor.director(0));
+
log.info("DHCP-L2-RELAY Started");
}
@@ -279,6 +290,11 @@
packetService.removeProcessor(dhcpRelayPacketProcessor);
cancelDhcpPktsFromServer();
+ // The executor is created in modified(), which activate() skips when it is
+ // given no context, so it may still be null here.
+ if (packetProcessorExecutor != null) {
+ packetProcessorExecutor.shutdown();
+ }
componentConfigService.unregisterProperties(getClass(), false);
deviceService.removeListener(deviceListener);
mastershipService.removeListener(changeListener);
@@ -300,6 +312,37 @@
if (o != null) {
enableDhcpBroadcastReplies = o;
}
+
+ // Resolve the requested pool size. A missing, malformed or non-positive value
+ // keeps the current size, so a bad setting cannot leave the relay without a
+ // usable executor.
+ String s = Tools.get(properties, PACKET_PROCESSOR_THREADS);
+ int oldpacketProcessorThreads = packetProcessorThreads;
+ int requestedThreads = oldpacketProcessorThreads;
+ if (!Strings.isNullOrEmpty(s)) {
+ try {
+ requestedThreads = Integer.parseInt(s.trim());
+ } catch (NumberFormatException e) {
+ log.warn("Ignoring non-numeric {} value '{}'", PACKET_PROCESSOR_THREADS, s);
+ }
+ }
+ if (requestedThreads <= 0) {
+ // newFixedThreadPool rejects sizes below one.
+ log.warn("Ignoring non-positive {} value {}", PACKET_PROCESSOR_THREADS, requestedThreads);
+ requestedThreads = oldpacketProcessorThreads;
+ }
+ packetProcessorThreads = requestedThreads;
+ if (packetProcessorExecutor == null || oldpacketProcessorThreads != packetProcessorThreads) {
+ // Create the replacement before retiring the old pool so the field never
+ // refers to an executor that has already been shut down.
+ ExecutorService previousExecutor = packetProcessorExecutor;
+ packetProcessorExecutor = newFixedThreadPool(packetProcessorThreads,
+ groupedThreads("onos/dhcp",
+ "dhcp-packet-%d", log));
+ if (previousExecutor != null) {
+ previousExecutor.shutdown();
+ }
+ }
}
@Override
@@ -591,6 +616,12 @@
@Override
public void process(PacketContext context) {
+ packetProcessorExecutor.execute(() -> {
+ processInternal(context);
+ });
+ }
+
+ private void processInternal(PacketContext context) {
if (!configured()) {
log.warn("Missing DHCP relay config. Abort packet processing");
return;
@@ -612,6 +643,11 @@
if (udpPacket.getSourcePort() == UDP.DHCP_CLIENT_PORT ||
udpPacket.getSourcePort() == UDP.DHCP_SERVER_PORT) {
DHCP dhcpPayload = (DHCP) udpPacket.getPayload();
+ if (log.isTraceEnabled()) {
+ log.trace("Processing packet with type {} from MAC {}",
+ getDhcpPacketType(dhcpPayload),
+ MacAddress.valueOf(dhcpPayload.getClientHardwareAddress()));
+ }
//This packet is dhcp.
processDhcpPacket(context, packet, dhcpPayload);
}
@@ -621,8 +657,15 @@
//forward the packet to ConnectPoint where the DHCP server is attached.
private void forwardPacket(Ethernet packet, PacketContext context) {
+ if (log.isTraceEnabled()) {
+ IPv4 ipv4Packet = (IPv4) packet.getPayload();
+ UDP udpPacket = (UDP) ipv4Packet.getPayload();
+ DHCP dhcpPayload = (DHCP) udpPacket.getPayload();
+ log.trace("Emitting packet to server: packet {}, with MAC {}",
+ getDhcpPacketType(dhcpPayload),
+ MacAddress.valueOf(dhcpPayload.getClientHardwareAddress()));
+ }
ConnectPoint toSendTo = null;
-
if (!useOltUplink) {
toSendTo = dhcpServerConnectPoint.get();
} else {
@@ -895,7 +938,7 @@
// If we can't find the subscriber, can't process further
if (subsCp == null) {
log.warn("Couldn't find subscriber, service or host info for mac"
- + " address {} .. DHCP packet won't be delivered", dstMac);
+ + " address {} and vlan {} .. DHCP packet won't be delivered", dstMac, innerVlan);
return null;
}
@@ -984,17 +1027,15 @@
private void sendReply(Ethernet ethPacket, DHCP dhcpPayload, PacketContext context) {
MacAddress descMac = valueOf(dhcpPayload.getClientHardwareAddress());
ConnectPoint subCp = getConnectPointOfClient(descMac, context);
-
// Send packet out to requester if the host information is available
if (subCp != null) {
- log.info("Sending DHCP Packet to client at {}", subCp);
TrafficTreatment t = DefaultTrafficTreatment.builder()
.setOutput(subCp.port()).build();
OutboundPacket o = new DefaultOutboundPacket(
subCp.deviceId(), t, ByteBuffer.wrap(ethPacket.serialize()));
if (log.isTraceEnabled()) {
- log.trace("Relaying packet to DHCP client at {} {}", subCp,
- ethPacket);
+ log.trace("Relaying packet to DHCP client at {} with MacAddress {}, {}", subCp,
+ descMac, ethPacket);
}
packetService.emit(o);
} else {
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/OsgiPropertyConstants.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/OsgiPropertyConstants.java
index 3c47467..bcd4672 100644
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/OsgiPropertyConstants.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/OsgiPropertyConstants.java
@@ -33,6 +33,9 @@
public static final String PUBLISH_COUNTERS_RATE = "publishCountersRate";
public static final int PUBLISH_COUNTERS_RATE_DEFAULT = 10;
+ public static final String PACKET_PROCESSOR_THREADS = "packetProcessorThreads";
+ public static final int PACKET_PROCESSOR_THREADS_DEFAULT = 10;
+
public static final String SYNC_COUNTERS_RATE = "syncCountersRate";
public static final int SYNC_COUNTERS_RATE_DEFAULT = 5;
}