[VOL-3518] Multi-threaded packet processing in maclearner app

Change-Id: I9eb3ebec813a1f002fe0be3da46257cca1c81ff6
diff --git a/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java b/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java
index df943e2..437cd3c 100644
--- a/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java
+++ b/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java
@@ -22,6 +22,7 @@
 import org.onlab.packet.EthType;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.VlanId;
+import org.onlab.util.PredictableExecutor;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterEvent;
@@ -182,6 +183,9 @@
     private ConsistentMap<MacLearnerKey, MacLearnerValue> macAddressMap;
 
     protected ExecutorService eventExecutor;
+    // Packet workers - 0 will leverage available processors
+    private static final int DEFAULT_THREADS = 0;
+    private PredictableExecutor packetWorkers;
 
     @Activate
     public void activate() {
@@ -201,6 +205,9 @@
                 .withSerializer(createSerializer())
                 .withApplicationId(appId)
                 .build();
+        packetWorkers = new PredictableExecutor(DEFAULT_THREADS,
+                                                groupedThreads("onos/mac-learner-host-loc-provider",
+                                                                                "packet-worker-%d", log));
         //mac learner must process the packet before director processors
         packetService.addProcessor(macLearnerPacketProcessor,
                 PacketProcessor.advisor(2));
@@ -277,6 +284,7 @@
         clusterService.removeListener(clusterListener);
         deviceService.removeListener(deviceListener);
         eventDispatcher.removeSink(MacLearnerEvent.class);
+        packetWorkers.shutdown();
         if (eventExecutor != null) {
             eventExecutor.shutdown();
         }
@@ -458,6 +466,10 @@
 
         @Override
         public void process(PacketContext context) {
+            packetWorkers.submit(() -> processPacketInternal(context));
+        }
+
+        private void processPacketInternal(PacketContext context) {
             // process the packet and get the payload
             Ethernet packet = context.inPacket().parsed();
 
@@ -539,8 +551,9 @@
                             log.debug("Link not found for device {}", deviceId);
                         }
                         hostLocService.createOrUpdateHost(HostId.hostId(packet.getSourceMAC(), vlan),
-                                packet.getSourceMAC(), packet.getDestinationMAC(), vlan, innerVlan, outerTpid,
-                                hloc, auxLocation, null);
+                                                          packet.getSourceMAC(), packet.getDestinationMAC(),
+                                                          vlan, innerVlan, outerTpid,
+                                                          hloc, auxLocation, null);
                         DHCP dhcpPayload = (DHCP) udpPacket.getPayload();
                         //This packet is dhcp.
                         processDhcpPacket(context, packet, dhcpPayload, sourcePort, deviceId, vlan);