[VOL-4549] Improving event handling in order to reach a higher scale

- isSubscriberServiceProvisioned does not hold the lock for the full iteration over the map; instead it reads the map, releases the lock, and then iterates
- PORT_ADDED events for UNI ports are discarded as inconsequential: when the ports are discovered their status is always DISABLED, and we only start operating on them once the status changes, which happens in a PORT_UPDATED event. In case of ports going up/down we always get a PORT_UPDATED, as the port is already known to the system.
- the executor now uses a custom pool for serving threads
- the queue add/remove operations now hold the write lock for the whole computation and use the map's .compute method to process the operation

Change-Id: Icedea07d32d1cddb339d672f3b274a6c7f941903
diff --git a/impl/src/main/java/org/opencord/olt/impl/Olt.java b/impl/src/main/java/org/opencord/olt/impl/Olt.java
index 5ee9375..e391537 100644
--- a/impl/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/impl/src/main/java/org/opencord/olt/impl/Olt.java
@@ -56,16 +56,19 @@
 
 import java.util.ArrayList;
 import java.util.Dictionary;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -85,6 +88,7 @@
                 DEFAULT_BP_ID + ":String=" + DEFAULT_BP_ID_DEFAULT,
                 DEFAULT_MCAST_SERVICE_NAME + ":String=" + DEFAULT_MCAST_SERVICE_NAME_DEFAULT,
                 FLOW_PROCESSING_THREADS + ":Integer=" + FLOW_PROCESSING_THREADS_DEFAULT,
+                FLOW_EXECUTOR_QUEUE_SIZE + ":Integer=" + FLOW_EXECUTOR_QUEUE_SIZE_DEFAULT,
                 SUBSCRIBER_PROCESSING_THREADS + ":Integer=" + SUBSCRIBER_PROCESSING_THREADS_DEFAULT,
                 REQUEUE_DELAY + ":Integer=" + REQUEUE_DELAY_DEFAULT
         })
@@ -150,6 +154,11 @@
     /**
      * Number of threads used to process flows.
      **/
+    protected int flowExecutorQueueSize = FLOW_EXECUTOR_QUEUE_SIZE_DEFAULT;
+
+    /**
+     * Number of threads used to process subscribers.
+     **/
     protected int subscriberProcessingThreads = SUBSCRIBER_PROCESSING_THREADS_DEFAULT;
 
     /**
@@ -252,13 +261,20 @@
             flowProcessingThreads = isNullOrEmpty(flowThreads) ?
                     oldFlowProcessingThreads : Integer.parseInt(flowThreads.trim());
 
-            if (flowsExecutor == null || oldFlowProcessingThreads != flowProcessingThreads) {
+            String executorQueueSize = get(properties, FLOW_EXECUTOR_QUEUE_SIZE);
+            int oldExecutorQueueSize = flowExecutorQueueSize;
+            flowExecutorQueueSize = isNullOrEmpty(executorQueueSize) ?
+                    oldExecutorQueueSize : Integer.parseInt(executorQueueSize.trim());
+
+            if (flowsExecutor == null || oldFlowProcessingThreads != flowProcessingThreads
+                    || oldExecutorQueueSize != flowExecutorQueueSize) {
                 if (flowsExecutor != null) {
                     flowsExecutor.shutdown();
                 }
-                flowsExecutor = Executors.newFixedThreadPool(flowProcessingThreads,
-                        groupedThreads(ONOS_OLT_SERVICE,
-                                "flows-installer-%d"));
+
+                flowsExecutor = new ThreadPoolExecutor(0, flowProcessingThreads, 30,
+                        TimeUnit.SECONDS, new ThreadPoolQueue(flowExecutorQueueSize),
+                        new ThreadPoolExecutor.DiscardPolicy());
             }
 
             String subscriberThreads = get(properties, SUBSCRIBER_PROCESSING_THREADS);
@@ -279,11 +295,12 @@
             requeueDelay = isNullOrEmpty(queueDelay) ?
                     REQUEUE_DELAY_DEFAULT : Integer.parseInt(queueDelay.trim());
         }
-        log.info("Modified. Values = {}: {}, {}: {}, " +
+        log.info("Modified. Values = {}: {}, {}:{}, {}:{}," +
                         "{}:{}, {}:{}, {}:{}",
                 DEFAULT_BP_ID, defaultBpId,
                 DEFAULT_MCAST_SERVICE_NAME, multicastServiceName,
                 FLOW_PROCESSING_THREADS, flowProcessingThreads,
+                FLOW_EXECUTOR_QUEUE_SIZE, flowExecutorQueueSize,
                 SUBSCRIBER_PROCESSING_THREADS, subscriberProcessingThreads,
                 REQUEUE_DELAY, requeueDelay);
     }
@@ -490,97 +507,99 @@
     }
 
     protected void processDiscoveredSubscribers() {
-        log.info("Started processDiscoveredSubscribers loop");
-        while (true) {
-            Set<ConnectPoint> discoveredCps;
-            try {
-                queueReadLock.lock();
-                discoveredCps = eventsQueues.keySet();
-            } catch (Exception e) {
-                log.error("Cannot read keys from queue map", e);
-                return;
-            } finally {
-                queueReadLock.unlock();
-            }
 
-            discoveredCps.forEach(cp -> {
-                LinkedBlockingQueue<DiscoveredSubscriber> eventsQueue;
-
+            log.info("Started processDiscoveredSubscribers loop");
+            while (true) {
+                Set<ConnectPoint> discoveredCps;
                 try {
                     queueReadLock.lock();
-                    eventsQueue = eventsQueues.get(cp);
+                    discoveredCps = new HashSet<>(eventsQueues.keySet());
                 } catch (Exception e) {
-                    log.error("Cannot get key from queue map", e);
-                    return;
+                    log.error("Cannot read keys from queue map", e);
+                    continue;
                 } finally {
                     queueReadLock.unlock();
                 }
 
-                if (!oltDeviceService.isLocalLeader(cp.deviceId())) {
-                    // if we're not local leader for this device, ignore this queue
-                    if (log.isTraceEnabled()) {
-                        log.trace("Ignoring queue on CP {} as not master of the device", cp);
+                discoveredCps.forEach(cp -> {
+                    LinkedBlockingQueue<DiscoveredSubscriber> eventsQueue;
+
+                    try {
+                        queueReadLock.lock();
+                        eventsQueue = eventsQueues.get(cp);
+                    } catch (Exception e) {
+                        log.error("Cannot get key from queue map", e);
+                        return;
+                    } finally {
+                        queueReadLock.unlock();
                     }
-                    return;
-                }
 
-                flowsExecutor.execute(() -> {
-                    if (!eventsQueue.isEmpty()) {
-                        // we do not remove the event from the queue until it has been processed
-                        // in that way we guarantee that events are processed in order
-                        DiscoveredSubscriber sub = eventsQueue.peek();
-                        if (sub == null) {
-                            // the queue is empty
-                            return;
-                        }
-
+                    if (!oltDeviceService.isLocalLeader(cp.deviceId())) {
+                        // if we're not local leader for this device, ignore this queue
                         if (log.isTraceEnabled()) {
-                            log.trace("Processing subscriber on port {} with status {}",
-                                    portWithName(sub.port), sub.status);
+                            log.trace("Ignoring queue on CP {} as not master of the device", cp);
                         }
+                        return;
+                    }
 
-                        if (sub.hasSubscriber) {
-                            // this is a provision subscriber call
-                            if (oltFlowService.handleSubscriberFlows(sub, defaultBpId, multicastServiceName)) {
-                                if (log.isTraceEnabled()) {
-                                    log.trace("Provisioning of subscriber on {} completed",
-                                            portWithName(sub.port));
-                                }
-                                removeSubscriberFromQueue(sub);
-                            }
-                        } else {
-                            // this is a port event (ENABLED/DISABLED)
-                            // means no subscriber was provisioned on that port
-
-                            if (!deviceService.isAvailable(sub.device.id()) ||
-                                    deviceService.getPort(sub.device.id(), sub.port.number()) == null) {
-                                // If the device is not connected or the port is not available do nothing
-                                // This can happen when we disable and then immediately delete the device,
-                                // the queue is populated but the meters and flows are already gone
-                                // thus there is nothing left to do
+                    try {
+                        flowsExecutor.execute(() -> {
+                        if (!eventsQueue.isEmpty()) {
+                            // we do not remove the event from the queue until it has been processed
+                            // in that way we guarantee that events are processed in order
+                            DiscoveredSubscriber sub = eventsQueue.peek();
+                            if (sub == null) {
+                                // the queue is empty
                                 return;
                             }
 
-                            if (oltFlowService.handleBasicPortFlows(sub, defaultBpId, defaultBpId)) {
-                                if (log.isTraceEnabled()) {
-                                    log.trace("Processing of port {} completed",
-                                            portWithName(sub.port));
+                            if (log.isTraceEnabled()) {
+                            log.trace("Processing subscriber {} on port {} with status {}, has subscriber {}",
+                                     sub, portWithName(sub.port), sub.status, sub.hasSubscriber);
+                            }
+
+                            if (sub.hasSubscriber) {
+                                // this is a provision subscriber call
+                                if (oltFlowService.handleSubscriberFlows(sub, defaultBpId, multicastServiceName)) {
+                                    removeSubscriberFromQueue(sub);
                                 }
-                                removeSubscriberFromQueue(sub);
+                            } else {
+                                // this is a port event (ENABLED/DISABLED)
+                                // means no subscriber was provisioned on that port
+
+                                if (!deviceService.isAvailable(sub.device.id()) ||
+                                        deviceService.getPort(sub.device.id(), sub.port.number()) == null) {
+                                    // If the device is not connected or the port is not available do nothing
+                                    // This can happen when we disable and then immediately delete the device,
+                                    // the queue is populated but the meters and flows are already gone
+                                    // thus there is nothing left to do
+                                    return;
+                                }
+
+                                if (oltFlowService.handleBasicPortFlows(sub, defaultBpId, defaultBpId)) {
+                                    if (log.isTraceEnabled()) {
+                                        log.trace("Processing of port {} completed",
+                                                  portWithName(sub.port));
+                                    }
+                                    removeSubscriberFromQueue(sub);
+                                }
                             }
                         }
+                    });
+                    } catch (Exception e) {
+                        log.error("Exception processing subscriber", e);
                     }
                 });
-            });
 
-            try {
-                TimeUnit.MILLISECONDS.sleep(requeueDelay);
-            } catch (InterruptedException e) {
-                continue;
+                try {
+                    TimeUnit.MILLISECONDS.sleep(requeueDelay);
+                } catch (InterruptedException e) {
+                    log.debug("Interrupted while waiting to requeue", e);
+                }
             }
-        }
     }
 
+
     /**
      * Checks the subscriber uni tag list and find the uni tag information.
      * using the pon c tag, pon s tag and the technology profile id
@@ -641,58 +660,55 @@
     }
 
     protected void addSubscriberToQueue(DiscoveredSubscriber sub) {
-        ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
-        LinkedBlockingQueue<DiscoveredSubscriber> q = null;
         try {
-            queueReadLock.lock();
-            q = eventsQueues.getOrDefault(cp, new LinkedBlockingQueue<>());
-        } finally {
-            queueReadLock.unlock();
-        }
+            ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
 
-        log.info("Adding subscriber to queue: {} with status {} and subscriber {}",
-                portWithName(sub.port), sub.status, sub.hasSubscriber);
-        q.add(sub);
-
-        try {
-            queueWriteLock.lock();
-            eventsQueues.put(cp, q);
-        } catch (UnsupportedOperationException | ClassCastException |
-                NullPointerException | IllegalArgumentException e) {
-            log.error("Cannot add subscriber to queue: {}", e.getMessage());
-        } finally {
-            queueWriteLock.unlock();
+            try {
+                queueWriteLock.lock();
+                eventsQueues.compute(cp, (subcp, queue) -> {
+                    queue = queue == null ? new LinkedBlockingQueue<>() : queue;
+                    log.info("Adding subscriber {} to queue: {} with existing {}",
+                             sub, portWithName(sub.port), queue);
+                    queue.add(sub);
+                    return queue;
+                });
+            } catch (UnsupportedOperationException | ClassCastException |
+                    NullPointerException | IllegalArgumentException e) {
+                log.error("Cannot add subscriber {} to queue: {}", portWithName(sub.port), e.getMessage());
+            } finally {
+                queueWriteLock.unlock();
+            }
+        } catch (Exception e) {
+            log.error("Can't add {} to queue", sub, e);
         }
     }
 
     protected void removeSubscriberFromQueue(DiscoveredSubscriber sub) {
         ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
-        LinkedBlockingQueue<DiscoveredSubscriber> q = null;
-        if (log.isTraceEnabled()) {
-            log.trace("removing subscriber {} from queue", sub);
-        }
-        try {
-            queueReadLock.lock();
-            q = eventsQueues.get(cp);
-        } finally {
-            queueReadLock.unlock();
-        }
-        if (q == null) {
-            log.warn("Cannot find queue for connectPoint {}", cp);
-            return;
-        }
-        boolean removed = q.remove(sub);
-        if (!removed) {
-            log.warn("Subscriber {} has not been removed from queue, is it still there? {}", sub, q);
-            return;
-        } else {
-            log.debug("Subscriber {} has been removed from the queue", sub);
-        }
-
         try {
             queueWriteLock.lock();
-            eventsQueues.remove(cp); // am I needed??
-            eventsQueues.put(cp, q);
+            eventsQueues.compute(cp, (subcp, queue) -> {
+                if (log.isTraceEnabled()) {
+                    log.trace("Removing subscriber {} from queue : {} " +
+                                      "with existing {}", sub,
+                              portWithName(sub.port), queue);
+                }
+                if (queue == null) {
+                    log.warn("Cannot find queue for connectPoint {}", cp);
+                    return queue;
+                }
+                boolean removed = queue.remove(sub);
+                if (!removed) {
+                    log.warn("Subscriber {} has not been removed from queue, " +
+                                     "is it still there? {}", sub, queue);
+                    return queue;
+                } else {
+                    log.debug("Subscriber {} has been removed from the queue {}",
+                              sub, queue);
+                }
+
+                return queue;
+            });
         } catch (UnsupportedOperationException | ClassCastException |
                 NullPointerException | IllegalArgumentException e) {
             log.error("Cannot remove subscriber {} from queue: {}", sub, e.getMessage());
@@ -721,7 +737,16 @@
 
         @Override
         public void event(DeviceEvent event) {
+            if (log.isTraceEnabled()) {
+                log.trace("OltListener receives event {} for: {}/{}", event.type(), event.subject().id(),
+                         event.port() != null ? event.port().number() : null);
+            }
             eventExecutor.execute(() -> {
+                if (log.isTraceEnabled()) {
+                    log.trace("OltListener Executor receives event {} for: {}/{}",
+                             event.type(), event.subject().id(),
+                              event.port() != null ? event.port().number() : null);
+                }
                 boolean isOlt = oltDeviceService.isOlt(event.subject());
                 DeviceId deviceId = event.subject().id();
                 switch (event.type()) {
@@ -815,8 +840,15 @@
         protected void handleOltPort(DeviceEvent.Type type, Device device, Port port) {
             log.info("OltDeviceListener receives event {} for port {} with status {} on device {}", type,
                     portWithName(port), port.isEnabled() ? "ENABLED" : "DISABLED", device.id());
+            boolean isNni = oltDeviceService.isNniPort(device, port.number());
+
+            if (!isNni && type == DeviceEvent.Type.PORT_ADDED) {
+                log.debug("Ignoring PORT_ADD on UNI port {}", portWithName(port));
+                return;
+            }
+
             if (port.isEnabled()) {
-                if (oltDeviceService.isNniPort(device, port.number())) {
+                if (isNni) {
                     OltFlowService.FlowOperation action = OltFlowService.FlowOperation.ADD;
                     // NOTE the NNI is only disabled if the OLT shuts down (reboot or failure).
                     // In that case the flows are purged anyway, so there's no need to deal with them,
@@ -853,7 +885,7 @@
                     addSubscriberToQueue(sub);
                 }
             } else {
-                if (oltDeviceService.isNniPort(device, port.number())) {
+                if (isNni) {
                     // NOTE the NNI is only disabled if the OLT shuts down (reboot or failure).
                     // In that case the flows are purged anyway, so there's no need to deal with them,
                     // it would actually be counter-productive as the openflow connection is severed and they won't
@@ -918,4 +950,26 @@
             }
         }
     }
+
+    protected final class ThreadPoolQueue extends ArrayBlockingQueue<Runnable> {
+
+        public ThreadPoolQueue(int capacity) {
+            super(capacity);
+        }
+
+        @Override
+        public boolean offer(Runnable runnable) {
+            if (runnable == null) {
+                return false;
+            }
+            try {
+                put(runnable);
+            } catch (InterruptedException e1) {
+                Thread.currentThread().interrupt();
+                return false;
+            }
+            return true;
+        }
+
+    }
 }