[VOL-4549] Improving event handling in order to reach a higher scale

- isSubscriberServiceProvisioned is not holding the lock for the full iteration on the map, but reading, releasing the lock and then iterating
- PORT_ADDED event for UNI ports are discarded as inconsequential, when they are discovered the status is always DISABLED, we start operating on the once the status changes and that happens in a PORT_UPDATED event. In case of ports up/down we always get a PORT_UPDATED as it's already known to the system.
- the executor now uses a custom pool for serving threads
- the queue add/remove now holds a lock for all the computation and uses the .compute method to process the operation

Change-Id: Icedea07d32d1cddb339d672f3b274a6c7f941903
diff --git a/impl/src/main/java/org/opencord/olt/impl/Olt.java b/impl/src/main/java/org/opencord/olt/impl/Olt.java
index 5ee9375..e391537 100644
--- a/impl/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/impl/src/main/java/org/opencord/olt/impl/Olt.java
@@ -56,16 +56,19 @@
 
 import java.util.ArrayList;
 import java.util.Dictionary;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -85,6 +88,7 @@
                 DEFAULT_BP_ID + ":String=" + DEFAULT_BP_ID_DEFAULT,
                 DEFAULT_MCAST_SERVICE_NAME + ":String=" + DEFAULT_MCAST_SERVICE_NAME_DEFAULT,
                 FLOW_PROCESSING_THREADS + ":Integer=" + FLOW_PROCESSING_THREADS_DEFAULT,
+                FLOW_EXECUTOR_QUEUE_SIZE + ":Integer=" + FLOW_EXECUTOR_QUEUE_SIZE_DEFAULT,
                 SUBSCRIBER_PROCESSING_THREADS + ":Integer=" + SUBSCRIBER_PROCESSING_THREADS_DEFAULT,
                 REQUEUE_DELAY + ":Integer=" + REQUEUE_DELAY_DEFAULT
         })
@@ -150,6 +154,11 @@
     /**
      * Number of threads used to process flows.
      **/
+    protected int flowExecutorQueueSize = FLOW_EXECUTOR_QUEUE_SIZE_DEFAULT;
+
+    /**
+     * Number of threads used to process flows.
+     **/
     protected int subscriberProcessingThreads = SUBSCRIBER_PROCESSING_THREADS_DEFAULT;
 
     /**
@@ -252,13 +261,20 @@
             flowProcessingThreads = isNullOrEmpty(flowThreads) ?
                     oldFlowProcessingThreads : Integer.parseInt(flowThreads.trim());
 
-            if (flowsExecutor == null || oldFlowProcessingThreads != flowProcessingThreads) {
+            String executorQueueSize = get(properties, FLOW_EXECUTOR_QUEUE_SIZE);
+            int oldExecutorQueueSize = flowExecutorQueueSize;
+            flowExecutorQueueSize = isNullOrEmpty(executorQueueSize) ?
+                    oldExecutorQueueSize : Integer.parseInt(executorQueueSize.trim());
+
+            if (flowsExecutor == null || oldFlowProcessingThreads != flowProcessingThreads
+                    || oldExecutorQueueSize != flowExecutorQueueSize) {
                 if (flowsExecutor != null) {
                     flowsExecutor.shutdown();
                 }
-                flowsExecutor = Executors.newFixedThreadPool(flowProcessingThreads,
-                        groupedThreads(ONOS_OLT_SERVICE,
-                                "flows-installer-%d"));
+
+                flowsExecutor = new ThreadPoolExecutor(0, flowProcessingThreads, 30,
+                        TimeUnit.SECONDS, new ThreadPoolQueue(flowExecutorQueueSize),
+                        new ThreadPoolExecutor.DiscardPolicy());
             }
 
             String subscriberThreads = get(properties, SUBSCRIBER_PROCESSING_THREADS);
@@ -279,11 +295,12 @@
             requeueDelay = isNullOrEmpty(queueDelay) ?
                     REQUEUE_DELAY_DEFAULT : Integer.parseInt(queueDelay.trim());
         }
-        log.info("Modified. Values = {}: {}, {}: {}, " +
+        log.info("Modified. Values = {}: {}, {}:{}, {}:{}," +
                         "{}:{}, {}:{}, {}:{}",
                 DEFAULT_BP_ID, defaultBpId,
                 DEFAULT_MCAST_SERVICE_NAME, multicastServiceName,
                 FLOW_PROCESSING_THREADS, flowProcessingThreads,
+                FLOW_EXECUTOR_QUEUE_SIZE, flowExecutorQueueSize,
                 SUBSCRIBER_PROCESSING_THREADS, subscriberProcessingThreads,
                 REQUEUE_DELAY, requeueDelay);
     }
@@ -490,97 +507,99 @@
     }
 
     protected void processDiscoveredSubscribers() {
-        log.info("Started processDiscoveredSubscribers loop");
-        while (true) {
-            Set<ConnectPoint> discoveredCps;
-            try {
-                queueReadLock.lock();
-                discoveredCps = eventsQueues.keySet();
-            } catch (Exception e) {
-                log.error("Cannot read keys from queue map", e);
-                return;
-            } finally {
-                queueReadLock.unlock();
-            }
 
-            discoveredCps.forEach(cp -> {
-                LinkedBlockingQueue<DiscoveredSubscriber> eventsQueue;
-
+            log.info("Started processDiscoveredSubscribers loop");
+            while (true) {
+                Set<ConnectPoint> discoveredCps;
                 try {
                     queueReadLock.lock();
-                    eventsQueue = eventsQueues.get(cp);
+                    discoveredCps = new HashSet<>(eventsQueues.keySet());
                 } catch (Exception e) {
-                    log.error("Cannot get key from queue map", e);
-                    return;
+                    log.error("Cannot read keys from queue map", e);
+                    continue;
                 } finally {
                     queueReadLock.unlock();
                 }
 
-                if (!oltDeviceService.isLocalLeader(cp.deviceId())) {
-                    // if we're not local leader for this device, ignore this queue
-                    if (log.isTraceEnabled()) {
-                        log.trace("Ignoring queue on CP {} as not master of the device", cp);
+                discoveredCps.forEach(cp -> {
+                    LinkedBlockingQueue<DiscoveredSubscriber> eventsQueue;
+
+                    try {
+                        queueReadLock.lock();
+                        eventsQueue = eventsQueues.get(cp);
+                    } catch (Exception e) {
+                        log.error("Cannot get key from queue map", e);
+                        return;
+                    } finally {
+                        queueReadLock.unlock();
                     }
-                    return;
-                }
 
-                flowsExecutor.execute(() -> {
-                    if (!eventsQueue.isEmpty()) {
-                        // we do not remove the event from the queue until it has been processed
-                        // in that way we guarantee that events are processed in order
-                        DiscoveredSubscriber sub = eventsQueue.peek();
-                        if (sub == null) {
-                            // the queue is empty
-                            return;
-                        }
-
+                    if (!oltDeviceService.isLocalLeader(cp.deviceId())) {
+                        // if we're not local leader for this device, ignore this queue
                         if (log.isTraceEnabled()) {
-                            log.trace("Processing subscriber on port {} with status {}",
-                                    portWithName(sub.port), sub.status);
+                            log.trace("Ignoring queue on CP {} as not master of the device", cp);
                         }
+                        return;
+                    }
 
-                        if (sub.hasSubscriber) {
-                            // this is a provision subscriber call
-                            if (oltFlowService.handleSubscriberFlows(sub, defaultBpId, multicastServiceName)) {
-                                if (log.isTraceEnabled()) {
-                                    log.trace("Provisioning of subscriber on {} completed",
-                                            portWithName(sub.port));
-                                }
-                                removeSubscriberFromQueue(sub);
-                            }
-                        } else {
-                            // this is a port event (ENABLED/DISABLED)
-                            // means no subscriber was provisioned on that port
-
-                            if (!deviceService.isAvailable(sub.device.id()) ||
-                                    deviceService.getPort(sub.device.id(), sub.port.number()) == null) {
-                                // If the device is not connected or the port is not available do nothing
-                                // This can happen when we disable and then immediately delete the device,
-                                // the queue is populated but the meters and flows are already gone
-                                // thus there is nothing left to do
+                    try {
+                        flowsExecutor.execute(() -> {
+                        if (!eventsQueue.isEmpty()) {
+                            // we do not remove the event from the queue until it has been processed
+                            // in that way we guarantee that events are processed in order
+                            DiscoveredSubscriber sub = eventsQueue.peek();
+                            if (sub == null) {
+                                // the queue is empty
                                 return;
                             }
 
-                            if (oltFlowService.handleBasicPortFlows(sub, defaultBpId, defaultBpId)) {
-                                if (log.isTraceEnabled()) {
-                                    log.trace("Processing of port {} completed",
-                                            portWithName(sub.port));
+                            if (log.isTraceEnabled()) {
+                            log.trace("Processing subscriber {} on port {} with status {}, has subscriber {}",
+                                     sub, portWithName(sub.port), sub.status, sub.hasSubscriber);
+                            }
+
+                            if (sub.hasSubscriber) {
+                                // this is a provision subscriber call
+                                if (oltFlowService.handleSubscriberFlows(sub, defaultBpId, multicastServiceName)) {
+                                    removeSubscriberFromQueue(sub);
                                 }
-                                removeSubscriberFromQueue(sub);
+                            } else {
+                                // this is a port event (ENABLED/DISABLED)
+                                // means no subscriber was provisioned on that port
+
+                                if (!deviceService.isAvailable(sub.device.id()) ||
+                                        deviceService.getPort(sub.device.id(), sub.port.number()) == null) {
+                                    // If the device is not connected or the port is not available do nothing
+                                    // This can happen when we disable and then immediately delete the device,
+                                    // the queue is populated but the meters and flows are already gone
+                                    // thus there is nothing left to do
+                                    return;
+                                }
+
+                                if (oltFlowService.handleBasicPortFlows(sub, defaultBpId, defaultBpId)) {
+                                    if (log.isTraceEnabled()) {
+                                        log.trace("Processing of port {} completed",
+                                                  portWithName(sub.port));
+                                    }
+                                    removeSubscriberFromQueue(sub);
+                                }
                             }
                         }
+                    });
+                    } catch (Exception e) {
+                        log.error("Exception processing subscriber", e);
                     }
                 });
-            });
 
-            try {
-                TimeUnit.MILLISECONDS.sleep(requeueDelay);
-            } catch (InterruptedException e) {
-                continue;
+                try {
+                    TimeUnit.MILLISECONDS.sleep(requeueDelay);
+                } catch (InterruptedException e) {
+                    log.debug("Interrupted while waiting to requeue", e);
+                }
             }
-        }
     }
 
+
     /**
      * Checks the subscriber uni tag list and find the uni tag information.
      * using the pon c tag, pon s tag and the technology profile id
@@ -641,58 +660,55 @@
     }
 
     protected void addSubscriberToQueue(DiscoveredSubscriber sub) {
-        ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
-        LinkedBlockingQueue<DiscoveredSubscriber> q = null;
         try {
-            queueReadLock.lock();
-            q = eventsQueues.getOrDefault(cp, new LinkedBlockingQueue<>());
-        } finally {
-            queueReadLock.unlock();
-        }
+            ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
 
-        log.info("Adding subscriber to queue: {} with status {} and subscriber {}",
-                portWithName(sub.port), sub.status, sub.hasSubscriber);
-        q.add(sub);
-
-        try {
-            queueWriteLock.lock();
-            eventsQueues.put(cp, q);
-        } catch (UnsupportedOperationException | ClassCastException |
-                NullPointerException | IllegalArgumentException e) {
-            log.error("Cannot add subscriber to queue: {}", e.getMessage());
-        } finally {
-            queueWriteLock.unlock();
+            try {
+                queueWriteLock.lock();
+                eventsQueues.compute(cp, (subcp, queue) -> {
+                    queue = queue == null ? new LinkedBlockingQueue<>() : queue;
+                    log.info("Adding subscriber {} to queue: {} with existing {}",
+                             sub, portWithName(sub.port), queue);
+                    queue.add(sub);
+                    return queue;
+                });
+            } catch (UnsupportedOperationException | ClassCastException |
+                    NullPointerException | IllegalArgumentException e) {
+                log.error("Cannot add subscriber {} to queue: {}", portWithName(sub.port), e.getMessage());
+            } finally {
+                queueWriteLock.unlock();
+            }
+        } catch (Exception e) {
+            log.error("Can't add {} to queue", sub, e);
         }
     }
 
     protected void removeSubscriberFromQueue(DiscoveredSubscriber sub) {
         ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
-        LinkedBlockingQueue<DiscoveredSubscriber> q = null;
-        if (log.isTraceEnabled()) {
-            log.trace("removing subscriber {} from queue", sub);
-        }
-        try {
-            queueReadLock.lock();
-            q = eventsQueues.get(cp);
-        } finally {
-            queueReadLock.unlock();
-        }
-        if (q == null) {
-            log.warn("Cannot find queue for connectPoint {}", cp);
-            return;
-        }
-        boolean removed = q.remove(sub);
-        if (!removed) {
-            log.warn("Subscriber {} has not been removed from queue, is it still there? {}", sub, q);
-            return;
-        } else {
-            log.debug("Subscriber {} has been removed from the queue", sub);
-        }
-
         try {
             queueWriteLock.lock();
-            eventsQueues.remove(cp); // am I needed??
-            eventsQueues.put(cp, q);
+            eventsQueues.compute(cp, (subcp, queue) -> {
+                if (log.isTraceEnabled()) {
+                    log.trace("Removing subscriber {} from queue : {} " +
+                                      "with existing {}", sub,
+                              portWithName(sub.port), queue);
+                }
+                if (queue == null) {
+                    log.warn("Cannot find queue for connectPoint {}", cp);
+                    return queue;
+                }
+                boolean removed = queue.remove(sub);
+                if (!removed) {
+                    log.warn("Subscriber {} has not been removed from queue, " +
+                                     "is it still there? {}", sub, queue);
+                    return queue;
+                } else {
+                    log.debug("Subscriber {} has been removed from the queue {}",
+                              sub, queue);
+                }
+
+                return queue;
+            });
         } catch (UnsupportedOperationException | ClassCastException |
                 NullPointerException | IllegalArgumentException e) {
             log.error("Cannot remove subscriber {} from queue: {}", sub, e.getMessage());
@@ -721,7 +737,16 @@
 
         @Override
         public void event(DeviceEvent event) {
+            if (log.isTraceEnabled()) {
+                log.trace("OltListener receives event {} for: {}/{}", event.type(), event.subject().id(),
+                         event.port() != null ? event.port().number() : null);
+            }
             eventExecutor.execute(() -> {
+                if (log.isTraceEnabled()) {
+                    log.trace("OltListener Executor receives event {} for: {}/{}",
+                             event.type(), event.subject().id(),
+                              event.port() != null ? event.port().number() : null);
+                }
                 boolean isOlt = oltDeviceService.isOlt(event.subject());
                 DeviceId deviceId = event.subject().id();
                 switch (event.type()) {
@@ -815,8 +840,15 @@
         protected void handleOltPort(DeviceEvent.Type type, Device device, Port port) {
             log.info("OltDeviceListener receives event {} for port {} with status {} on device {}", type,
                     portWithName(port), port.isEnabled() ? "ENABLED" : "DISABLED", device.id());
+            boolean isNni = oltDeviceService.isNniPort(device, port.number());
+
+            if (!isNni && type == DeviceEvent.Type.PORT_ADDED) {
+                log.debug("Ignoring PORT_ADD on UNI port {}", portWithName(port));
+                return;
+            }
+
             if (port.isEnabled()) {
-                if (oltDeviceService.isNniPort(device, port.number())) {
+                if (isNni) {
                     OltFlowService.FlowOperation action = OltFlowService.FlowOperation.ADD;
                     // NOTE the NNI is only disabled if the OLT shuts down (reboot or failure).
                     // In that case the flows are purged anyway, so there's no need to deal with them,
@@ -853,7 +885,7 @@
                     addSubscriberToQueue(sub);
                 }
             } else {
-                if (oltDeviceService.isNniPort(device, port.number())) {
+                if (isNni) {
                     // NOTE the NNI is only disabled if the OLT shuts down (reboot or failure).
                     // In that case the flows are purged anyway, so there's no need to deal with them,
                     // it would actually be counter-productive as the openflow connection is severed and they won't
@@ -918,4 +950,26 @@
             }
         }
     }
+
+    protected final class ThreadPoolQueue extends ArrayBlockingQueue<Runnable> {
+
+        public ThreadPoolQueue(int capacity) {
+            super(capacity);
+        }
+
+        @Override
+        public boolean offer(Runnable runnable) {
+            if (runnable == null) {
+                return false;
+            }
+            try {
+                put(runnable);
+            } catch (InterruptedException e1) {
+                Thread.currentThread().interrupt();
+                return false;
+            }
+            return true;
+        }
+
+    }
 }
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 4dafd16..ecaaa3d 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -85,11 +85,13 @@
 import java.util.ArrayList;
 import java.util.Dictionary;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -722,20 +724,20 @@
 
     @Override
     public boolean isSubscriberServiceProvisioned(AccessDevicePort cp) {
-        // if any service is programmed on this port, returns true
-        AtomicBoolean provisioned = new AtomicBoolean(false);
+        Set<Map.Entry<ServiceKey, Boolean>> subs;
         try {
             provisionedSubscribersReadLock.lock();
-            for (Map.Entry<ServiceKey, Boolean> entry : provisionedSubscribers.entrySet()) {
-                if (entry.getKey().getPort().equals(cp) && entry.getValue()) {
-                    provisioned.set(true);
-                    break;
-                }
-            }
+            subs = new HashSet<>(provisionedSubscribers.entrySet());
         } finally {
             provisionedSubscribersReadLock.unlock();
         }
-        return provisioned.get();
+
+        for (Map.Entry<ServiceKey, Boolean> entry : subs) {
+            if (entry.getKey().getPort().equals(cp) && entry.getValue()) {
+                return true;
+            }
+        }
+        return false;
     }
 
     @Override
diff --git a/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java b/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
index a73a42b..35095b6 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
@@ -65,10 +65,14 @@
     public static final int REQUIRED_DRIVERS_PROPERTY_DELAY_DEFAULT = 5;
 
     public static final String FLOW_PROCESSING_THREADS = "flowProcessingThreads";
-    public static final int FLOW_PROCESSING_THREADS_DEFAULT = 8;
+    public static final int FLOW_PROCESSING_THREADS_DEFAULT = 32;
+
+    //Giving it a value of * 4 the number of flows.
+    public static final String FLOW_EXECUTOR_QUEUE_SIZE = "flowExecutorQueueSize";
+    public static final int FLOW_EXECUTOR_QUEUE_SIZE_DEFAULT = 128;
 
     public static final String SUBSCRIBER_PROCESSING_THREADS = "subscriberProcessingThreads";
-    public static final int SUBSCRIBER_PROCESSING_THREADS_DEFAULT = 8;
+    public static final int SUBSCRIBER_PROCESSING_THREADS_DEFAULT = 24;
 
     public static final String REQUEUE_DELAY = "requeueDelay";
     public static final int REQUEUE_DELAY_DEFAULT = 500;
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltDeviceListenerTest.java b/impl/src/test/java/org/opencord/olt/impl/OltDeviceListenerTest.java
index 6af50f5..b3b20e6 100644
--- a/impl/src/test/java/org/opencord/olt/impl/OltDeviceListenerTest.java
+++ b/impl/src/test/java/org/opencord/olt/impl/OltDeviceListenerTest.java
@@ -186,7 +186,8 @@
         PortNumber uniPortNumber = PortNumber.portNumber(16);
         Port uniAddedDisabled = new OltPort(testDevice, false, uniPortNumber,
                 DefaultAnnotations.builder().set(AnnotationKeys.PORT_NAME, "uni-1").build());
-        DeviceEvent uniAddedDisabledEvent = new DeviceEvent(DeviceEvent.Type.PORT_ADDED, testDevice, uniAddedDisabled);
+        DeviceEvent uniAddedDisabledEvent =
+                new DeviceEvent(DeviceEvent.Type.PORT_UPDATED, testDevice, uniAddedDisabled);
         ConnectPoint cp = new ConnectPoint(testDevice.id(), uniPortNumber);
 
         // if the port does not have default EAPOL we should not generate an event
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltTest.java b/impl/src/test/java/org/opencord/olt/impl/OltTest.java
index 0448f9d..d968c24 100644
--- a/impl/src/test/java/org/opencord/olt/impl/OltTest.java
+++ b/impl/src/test/java/org/opencord/olt/impl/OltTest.java
@@ -45,6 +45,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Executors;
@@ -115,8 +116,7 @@
         reset(component.oltFlowService);
 
         component.bindSadisService(sadisService);
-        component.eventsQueues = component.storageService.
-                <ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>consistentMapBuilder().build().asJavaMap();
+        component.eventsQueues = new HashMap<>();
         component.eventsQueues.put(cp, new LinkedBlockingQueue<>());
 
         component.discoveredSubscriberExecutor.execute(() -> {