[VOL-3518] Multi-threaded packet processing in maclearner app
Change-Id: I9eb3ebec813a1f002fe0be3da46257cca1c81ff6
diff --git a/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java b/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java
index df943e2..437cd3c 100644
--- a/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java
+++ b/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java
@@ -22,6 +22,7 @@
import org.onlab.packet.EthType;
import org.onlab.packet.IpAddress;
import org.onlab.packet.VlanId;
+import org.onlab.util.PredictableExecutor;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterEvent;
@@ -182,6 +183,9 @@
private ConsistentMap<MacLearnerKey, MacLearnerValue> macAddressMap;
protected ExecutorService eventExecutor;
+ // Packet workers - 0 will leverage available processors
+ private static final int DEFAULT_THREADS = 0;
+ private PredictableExecutor packetWorkers;
@Activate
public void activate() {
@@ -201,6 +205,9 @@
.withSerializer(createSerializer())
.withApplicationId(appId)
.build();
+ packetWorkers = new PredictableExecutor(DEFAULT_THREADS,
+ groupedThreads("onos/mac-learner-host-loc-provider",
+ "packet-worker-%d", log));
//mac learner must process the packet before director processors
packetService.addProcessor(macLearnerPacketProcessor,
PacketProcessor.advisor(2));
@@ -277,6 +284,7 @@
clusterService.removeListener(clusterListener);
deviceService.removeListener(deviceListener);
eventDispatcher.removeSink(MacLearnerEvent.class);
+ packetWorkers.shutdown();
if (eventExecutor != null) {
eventExecutor.shutdown();
}
@@ -458,6 +466,10 @@
@Override
public void process(PacketContext context) {
+ packetWorkers.submit(() -> processPacketInternal(context));
+ }
+
+ private void processPacketInternal(PacketContext context) {
// process the packet and get the payload
Ethernet packet = context.inPacket().parsed();
@@ -539,8 +551,9 @@
log.debug("Link not found for device {}", deviceId);
}
hostLocService.createOrUpdateHost(HostId.hostId(packet.getSourceMAC(), vlan),
- packet.getSourceMAC(), packet.getDestinationMAC(), vlan, innerVlan, outerTpid,
- hloc, auxLocation, null);
+ packet.getSourceMAC(), packet.getDestinationMAC(),
+ vlan, innerVlan, outerTpid,
+ hloc, auxLocation, null);
DHCP dhcpPayload = (DHCP) udpPacket.getPayload();
//This packet is dhcp.
processDhcpPacket(context, packet, dhcpPayload, sourcePort, deviceId, vlan);
diff --git a/app/src/test/java/org/opencord/maclearner/app/impl/MacLearnerManagerTest.java b/app/src/test/java/org/opencord/maclearner/app/impl/MacLearnerManagerTest.java
index adb59db..adfbc5a 100644
--- a/app/src/test/java/org/opencord/maclearner/app/impl/MacLearnerManagerTest.java
+++ b/app/src/test/java/org/opencord/maclearner/app/impl/MacLearnerManagerTest.java
@@ -33,12 +33,16 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+import static org.onlab.junit.TestTools.assertAfter;
/**
* Set of tests of the Mac Learner ONOS application component.
*/
public class MacLearnerManagerTest extends TestBaseMacLearner {
+ private static final int DELAY = 250;
+ private static final int PROCESSING_LENGTH = 500;
+
@Before
public void setUp() throws IOException {
setUpApp();
@@ -65,10 +69,13 @@
CLIENT_VLAN,
VlanId.NONE,
CLIENT_CP));
- Optional<MacAddress> macAddress = macLearnerManager.getMacMapping(CLIENT_CP.deviceId(),
- CLIENT_CP.port(), CLIENT_VLAN);
- assertTrue(macAddress.isPresent());
- assertEquals(CLIENT_MAC, macAddress.get());
+ assertAfter(DELAY, PROCESSING_LENGTH, () -> {
+ Optional<MacAddress> macAddress =
+ macLearnerManager.getMacMapping(CLIENT_CP.deviceId(),
+ CLIENT_CP.port(), CLIENT_VLAN);
+ assertTrue(macAddress.isPresent());
+ assertEquals(CLIENT_MAC, macAddress.get());
+ });
}
@Test
@@ -77,10 +84,13 @@
CLIENT_VLAN,
CLIENT_QINQ_VLAN,
CLIENT_CP));
- Optional<MacAddress> macAddress = macLearnerManager.getMacMapping(CLIENT_CP.deviceId(),
- CLIENT_CP.port(), CLIENT_QINQ_VLAN);
- assertTrue(macAddress.isPresent());
- assertEquals(CLIENT_MAC, macAddress.get());
+ assertAfter(DELAY, PROCESSING_LENGTH, () -> {
+ Optional<MacAddress> macAddress = macLearnerManager.getMacMapping(CLIENT_CP.deviceId(),
+ CLIENT_CP.port(), CLIENT_QINQ_VLAN);
+ assertTrue(macAddress.isPresent());
+ assertEquals(CLIENT_MAC, macAddress.get());
+ });
+
}
@Test
@@ -89,15 +99,17 @@
CLIENT_VLAN,
CLIENT_QINQ_VLAN,
CLIENT_CP));
- HostId hostId = HostId.hostId(CLIENT_MAC, CLIENT_QINQ_VLAN);
- Host host = hostService.getHost(hostId);
- assertNotNull(host);
- assertEquals(OLT_DEVICE_ID, host.location().deviceId());
- assertEquals(UNI_PORT, host.location().port());
- Optional<HostLocation> optAuxLoc = host.auxLocations().stream().findFirst();
- assertTrue(optAuxLoc.isPresent());
- assertEquals(AGG_DEVICE_ID, optAuxLoc.get().deviceId());
- assertEquals(AGG_OLT_PORT, optAuxLoc.get().port());
+ assertAfter(DELAY, PROCESSING_LENGTH, () -> {
+ HostId hostId = HostId.hostId(CLIENT_MAC, CLIENT_QINQ_VLAN);
+ Host host = hostService.getHost(hostId);
+ assertNotNull(host);
+ assertEquals(OLT_DEVICE_ID, host.location().deviceId());
+ assertEquals(UNI_PORT, host.location().port());
+ Optional<HostLocation> optAuxLoc = host.auxLocations().stream().findFirst();
+ assertTrue(optAuxLoc.isPresent());
+ assertEquals(AGG_DEVICE_ID, optAuxLoc.get().deviceId());
+ assertEquals(AGG_OLT_PORT, optAuxLoc.get().port());
+ });
}
}