[VOL-3518] Multi-threaded packet processing in maclearner app

Change-Id: I9eb3ebec813a1f002fe0be3da46257cca1c81ff6
diff --git a/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java b/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java
index df943e2..437cd3c 100644
--- a/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java
+++ b/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java
@@ -22,6 +22,7 @@
 import org.onlab.packet.EthType;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.VlanId;
+import org.onlab.util.PredictableExecutor;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterEvent;
@@ -182,6 +183,9 @@
     private ConsistentMap<MacLearnerKey, MacLearnerValue> macAddressMap;
 
     protected ExecutorService eventExecutor;
+    // Packet workers - 0 will leverage available processors
+    private static final int DEFAULT_THREADS = 0;
+    private PredictableExecutor packetWorkers;
 
     @Activate
     public void activate() {
@@ -201,6 +205,9 @@
                 .withSerializer(createSerializer())
                 .withApplicationId(appId)
                 .build();
+        packetWorkers = new PredictableExecutor(DEFAULT_THREADS,
+                                                groupedThreads("onos/mac-learner-host-loc-provider",
+                                                                                "packet-worker-%d", log));
         //mac learner must process the packet before director processors
         packetService.addProcessor(macLearnerPacketProcessor,
                 PacketProcessor.advisor(2));
@@ -277,6 +284,7 @@
         clusterService.removeListener(clusterListener);
         deviceService.removeListener(deviceListener);
         eventDispatcher.removeSink(MacLearnerEvent.class);
+        packetWorkers.shutdown();
         if (eventExecutor != null) {
             eventExecutor.shutdown();
         }
@@ -458,6 +466,10 @@
 
         @Override
         public void process(PacketContext context) {
+            packetWorkers.submit(() -> processPacketInternal(context));
+        }
+
+        private void processPacketInternal(PacketContext context) {
             // process the packet and get the payload
             Ethernet packet = context.inPacket().parsed();
 
@@ -539,8 +551,9 @@
                             log.debug("Link not found for device {}", deviceId);
                         }
                         hostLocService.createOrUpdateHost(HostId.hostId(packet.getSourceMAC(), vlan),
-                                packet.getSourceMAC(), packet.getDestinationMAC(), vlan, innerVlan, outerTpid,
-                                hloc, auxLocation, null);
+                                                          packet.getSourceMAC(), packet.getDestinationMAC(),
+                                                          vlan, innerVlan, outerTpid,
+                                                          hloc, auxLocation, null);
                         DHCP dhcpPayload = (DHCP) udpPacket.getPayload();
                         //This packet is dhcp.
                         processDhcpPacket(context, packet, dhcpPayload, sourcePort, deviceId, vlan);
diff --git a/app/src/test/java/org/opencord/maclearner/app/impl/MacLearnerManagerTest.java b/app/src/test/java/org/opencord/maclearner/app/impl/MacLearnerManagerTest.java
index adb59db..adfbc5a 100644
--- a/app/src/test/java/org/opencord/maclearner/app/impl/MacLearnerManagerTest.java
+++ b/app/src/test/java/org/opencord/maclearner/app/impl/MacLearnerManagerTest.java
@@ -33,12 +33,16 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.onlab.junit.TestTools.assertAfter;
 
 /**
  * Set of tests of the Mac Learner ONOS application component.
  */
 public class MacLearnerManagerTest extends TestBaseMacLearner {
 
+    private static final int DELAY = 250;
+    private static final int PROCESSING_LENGTH = 500;
+
     @Before
     public void setUp() throws IOException {
         setUpApp();
@@ -65,10 +69,13 @@
                 CLIENT_VLAN,
                 VlanId.NONE,
                 CLIENT_CP));
-        Optional<MacAddress> macAddress = macLearnerManager.getMacMapping(CLIENT_CP.deviceId(),
-                CLIENT_CP.port(), CLIENT_VLAN);
-        assertTrue(macAddress.isPresent());
-        assertEquals(CLIENT_MAC, macAddress.get());
+        assertAfter(DELAY, PROCESSING_LENGTH, () -> {
+            Optional<MacAddress> macAddress =
+                    macLearnerManager.getMacMapping(CLIENT_CP.deviceId(),
+                                                    CLIENT_CP.port(), CLIENT_VLAN);
+            assertTrue(macAddress.isPresent());
+            assertEquals(CLIENT_MAC, macAddress.get());
+        });
     }
 
     @Test
@@ -77,10 +84,13 @@
                 CLIENT_VLAN,
                 CLIENT_QINQ_VLAN,
                 CLIENT_CP));
-        Optional<MacAddress> macAddress = macLearnerManager.getMacMapping(CLIENT_CP.deviceId(),
-                CLIENT_CP.port(), CLIENT_QINQ_VLAN);
-        assertTrue(macAddress.isPresent());
-        assertEquals(CLIENT_MAC, macAddress.get());
+        assertAfter(DELAY, PROCESSING_LENGTH, () -> {
+            Optional<MacAddress> macAddress = macLearnerManager.getMacMapping(CLIENT_CP.deviceId(),
+                                                                              CLIENT_CP.port(), CLIENT_QINQ_VLAN);
+            assertTrue(macAddress.isPresent());
+            assertEquals(CLIENT_MAC, macAddress.get());
+        });
+
     }
 
     @Test
@@ -89,15 +99,17 @@
                 CLIENT_VLAN,
                 CLIENT_QINQ_VLAN,
                 CLIENT_CP));
-        HostId hostId = HostId.hostId(CLIENT_MAC, CLIENT_QINQ_VLAN);
-        Host host = hostService.getHost(hostId);
-        assertNotNull(host);
-        assertEquals(OLT_DEVICE_ID, host.location().deviceId());
-        assertEquals(UNI_PORT, host.location().port());
-        Optional<HostLocation> optAuxLoc = host.auxLocations().stream().findFirst();
-        assertTrue(optAuxLoc.isPresent());
-        assertEquals(AGG_DEVICE_ID, optAuxLoc.get().deviceId());
-        assertEquals(AGG_OLT_PORT, optAuxLoc.get().port());
+        assertAfter(DELAY, PROCESSING_LENGTH, () -> {
+            HostId hostId = HostId.hostId(CLIENT_MAC, CLIENT_QINQ_VLAN);
+            Host host = hostService.getHost(hostId);
+            assertNotNull(host);
+            assertEquals(OLT_DEVICE_ID, host.location().deviceId());
+            assertEquals(UNI_PORT, host.location().port());
+            Optional<HostLocation> optAuxLoc = host.auxLocations().stream().findFirst();
+            assertTrue(optAuxLoc.isPresent());
+            assertEquals(AGG_DEVICE_ID, optAuxLoc.get().deviceId());
+            assertEquals(AGG_OLT_PORT, optAuxLoc.get().port());
+        });
     }
 
 }