[VOL-3518] Multi-threaded packet processing in maclearner app
Change-Id: I9eb3ebec813a1f002fe0be3da46257cca1c81ff6
diff --git a/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java b/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java
index df943e2..437cd3c 100644
--- a/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java
+++ b/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java
@@ -22,6 +22,7 @@
import org.onlab.packet.EthType;
import org.onlab.packet.IpAddress;
import org.onlab.packet.VlanId;
+import org.onlab.util.PredictableExecutor;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterEvent;
@@ -182,6 +183,9 @@
private ConsistentMap<MacLearnerKey, MacLearnerValue> macAddressMap;
protected ExecutorService eventExecutor;
+ // Packet workers - 0 will leverage available processors
+ private static final int DEFAULT_THREADS = 0;
+ private PredictableExecutor packetWorkers;
@Activate
public void activate() {
@@ -201,6 +205,9 @@
.withSerializer(createSerializer())
.withApplicationId(appId)
.build();
+ packetWorkers = new PredictableExecutor(DEFAULT_THREADS,
+ groupedThreads("onos/mac-learner-host-loc-provider",
+ "packet-worker-%d", log));
//mac learner must process the packet before director processors
packetService.addProcessor(macLearnerPacketProcessor,
PacketProcessor.advisor(2));
@@ -277,6 +284,7 @@
clusterService.removeListener(clusterListener);
deviceService.removeListener(deviceListener);
eventDispatcher.removeSink(MacLearnerEvent.class);
+ packetWorkers.shutdown();
if (eventExecutor != null) {
eventExecutor.shutdown();
}
@@ -458,6 +466,10 @@
@Override
public void process(PacketContext context) {
+ packetWorkers.submit(() -> processPacketInternal(context));
+ }
+
+ private void processPacketInternal(PacketContext context) {
// process the packet and get the payload
Ethernet packet = context.inPacket().parsed();
@@ -539,8 +551,9 @@
log.debug("Link not found for device {}", deviceId);
}
hostLocService.createOrUpdateHost(HostId.hostId(packet.getSourceMAC(), vlan),
- packet.getSourceMAC(), packet.getDestinationMAC(), vlan, innerVlan, outerTpid,
- hloc, auxLocation, null);
+ packet.getSourceMAC(), packet.getDestinationMAC(),
+ vlan, innerVlan, outerTpid,
+ hloc, auxLocation, null);
DHCP dhcpPayload = (DHCP) udpPacket.getPayload();
//This packet is dhcp.
processDhcpPacket(context, packet, dhcpPayload, sourcePort, deviceId, vlan);