diff --git a/impl/src/main/java/org/opencord/olt/impl/Olt.java b/impl/src/main/java/org/opencord/olt/impl/Olt.java
index 5ee9375..e391537 100644
--- a/impl/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/impl/src/main/java/org/opencord/olt/impl/Olt.java
@@ -56,16 +56,19 @@
 
 import java.util.ArrayList;
 import java.util.Dictionary;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -85,6 +88,7 @@
                 DEFAULT_BP_ID + ":String=" + DEFAULT_BP_ID_DEFAULT,
                 DEFAULT_MCAST_SERVICE_NAME + ":String=" + DEFAULT_MCAST_SERVICE_NAME_DEFAULT,
                 FLOW_PROCESSING_THREADS + ":Integer=" + FLOW_PROCESSING_THREADS_DEFAULT,
+                FLOW_EXECUTOR_QUEUE_SIZE + ":Integer=" + FLOW_EXECUTOR_QUEUE_SIZE_DEFAULT,
                 SUBSCRIBER_PROCESSING_THREADS + ":Integer=" + SUBSCRIBER_PROCESSING_THREADS_DEFAULT,
                 REQUEUE_DELAY + ":Integer=" + REQUEUE_DELAY_DEFAULT
         })
@@ -150,6 +154,11 @@
     /**
      * Number of threads used to process flows.
      **/
+    protected int flowExecutorQueueSize = FLOW_EXECUTOR_QUEUE_SIZE_DEFAULT;
+
+    /**
+     * Number of threads used to process flows.
+     **/
     protected int subscriberProcessingThreads = SUBSCRIBER_PROCESSING_THREADS_DEFAULT;
 
     /**
@@ -252,13 +261,20 @@
             flowProcessingThreads = isNullOrEmpty(flowThreads) ?
                     oldFlowProcessingThreads : Integer.parseInt(flowThreads.trim());
 
-            if (flowsExecutor == null || oldFlowProcessingThreads != flowProcessingThreads) {
+            String executorQueueSize = get(properties, FLOW_EXECUTOR_QUEUE_SIZE);
+            int oldExecutorQueueSize = flowExecutorQueueSize;
+            flowExecutorQueueSize = isNullOrEmpty(executorQueueSize) ?
+                    oldExecutorQueueSize : Integer.parseInt(executorQueueSize.trim());
+
+            if (flowsExecutor == null || oldFlowProcessingThreads != flowProcessingThreads
+                    || oldExecutorQueueSize != flowExecutorQueueSize) {
                 if (flowsExecutor != null) {
                     flowsExecutor.shutdown();
                 }
-                flowsExecutor = Executors.newFixedThreadPool(flowProcessingThreads,
-                        groupedThreads(ONOS_OLT_SERVICE,
-                                "flows-installer-%d"));
+
+                flowsExecutor = new ThreadPoolExecutor(0, flowProcessingThreads, 30,
+                        TimeUnit.SECONDS, new ThreadPoolQueue(flowExecutorQueueSize),
+                        new ThreadPoolExecutor.DiscardPolicy());
             }
 
             String subscriberThreads = get(properties, SUBSCRIBER_PROCESSING_THREADS);
@@ -279,11 +295,12 @@
             requeueDelay = isNullOrEmpty(queueDelay) ?
                     REQUEUE_DELAY_DEFAULT : Integer.parseInt(queueDelay.trim());
         }
-        log.info("Modified. Values = {}: {}, {}: {}, " +
+        log.info("Modified. Values = {}: {}, {}:{}, {}:{}," +
                         "{}:{}, {}:{}, {}:{}",
                 DEFAULT_BP_ID, defaultBpId,
                 DEFAULT_MCAST_SERVICE_NAME, multicastServiceName,
                 FLOW_PROCESSING_THREADS, flowProcessingThreads,
+                FLOW_EXECUTOR_QUEUE_SIZE, flowExecutorQueueSize,
                 SUBSCRIBER_PROCESSING_THREADS, subscriberProcessingThreads,
                 REQUEUE_DELAY, requeueDelay);
     }
@@ -490,97 +507,99 @@
     }
 
     protected void processDiscoveredSubscribers() {
-        log.info("Started processDiscoveredSubscribers loop");
-        while (true) {
-            Set<ConnectPoint> discoveredCps;
-            try {
-                queueReadLock.lock();
-                discoveredCps = eventsQueues.keySet();
-            } catch (Exception e) {
-                log.error("Cannot read keys from queue map", e);
-                return;
-            } finally {
-                queueReadLock.unlock();
-            }
 
-            discoveredCps.forEach(cp -> {
-                LinkedBlockingQueue<DiscoveredSubscriber> eventsQueue;
-
+            log.info("Started processDiscoveredSubscribers loop");
+            while (true) {
+                Set<ConnectPoint> discoveredCps;
                 try {
                     queueReadLock.lock();
-                    eventsQueue = eventsQueues.get(cp);
+                    discoveredCps = new HashSet<>(eventsQueues.keySet());
                 } catch (Exception e) {
-                    log.error("Cannot get key from queue map", e);
-                    return;
+                    log.error("Cannot read keys from queue map", e);
+                    continue;
                 } finally {
                     queueReadLock.unlock();
                 }
 
-                if (!oltDeviceService.isLocalLeader(cp.deviceId())) {
-                    // if we're not local leader for this device, ignore this queue
-                    if (log.isTraceEnabled()) {
-                        log.trace("Ignoring queue on CP {} as not master of the device", cp);
+                discoveredCps.forEach(cp -> {
+                    LinkedBlockingQueue<DiscoveredSubscriber> eventsQueue;
+
+                    try {
+                        queueReadLock.lock();
+                        eventsQueue = eventsQueues.get(cp);
+                    } catch (Exception e) {
+                        log.error("Cannot get key from queue map", e);
+                        return;
+                    } finally {
+                        queueReadLock.unlock();
                     }
-                    return;
-                }
 
-                flowsExecutor.execute(() -> {
-                    if (!eventsQueue.isEmpty()) {
-                        // we do not remove the event from the queue until it has been processed
-                        // in that way we guarantee that events are processed in order
-                        DiscoveredSubscriber sub = eventsQueue.peek();
-                        if (sub == null) {
-                            // the queue is empty
-                            return;
-                        }
-
+                    if (!oltDeviceService.isLocalLeader(cp.deviceId())) {
+                        // if we're not local leader for this device, ignore this queue
                         if (log.isTraceEnabled()) {
-                            log.trace("Processing subscriber on port {} with status {}",
-                                    portWithName(sub.port), sub.status);
+                            log.trace("Ignoring queue on CP {} as not master of the device", cp);
                         }
+                        return;
+                    }
 
-                        if (sub.hasSubscriber) {
-                            // this is a provision subscriber call
-                            if (oltFlowService.handleSubscriberFlows(sub, defaultBpId, multicastServiceName)) {
-                                if (log.isTraceEnabled()) {
-                                    log.trace("Provisioning of subscriber on {} completed",
-                                            portWithName(sub.port));
-                                }
-                                removeSubscriberFromQueue(sub);
-                            }
-                        } else {
-                            // this is a port event (ENABLED/DISABLED)
-                            // means no subscriber was provisioned on that port
-
-                            if (!deviceService.isAvailable(sub.device.id()) ||
-                                    deviceService.getPort(sub.device.id(), sub.port.number()) == null) {
-                                // If the device is not connected or the port is not available do nothing
-                                // This can happen when we disable and then immediately delete the device,
-                                // the queue is populated but the meters and flows are already gone
-                                // thus there is nothing left to do
+                    try {
+                        flowsExecutor.execute(() -> {
+                        if (!eventsQueue.isEmpty()) {
+                            // we do not remove the event from the queue until it has been processed
+                            // in that way we guarantee that events are processed in order
+                            DiscoveredSubscriber sub = eventsQueue.peek();
+                            if (sub == null) {
+                                // the queue is empty
                                 return;
                             }
 
-                            if (oltFlowService.handleBasicPortFlows(sub, defaultBpId, defaultBpId)) {
-                                if (log.isTraceEnabled()) {
-                                    log.trace("Processing of port {} completed",
-                                            portWithName(sub.port));
+                            if (log.isTraceEnabled()) {
+                            log.trace("Processing subscriber {} on port {} with status {}, has subscriber {}",
+                                     sub, portWithName(sub.port), sub.status, sub.hasSubscriber);
+                            }
+
+                            if (sub.hasSubscriber) {
+                                // this is a provision subscriber call
+                                if (oltFlowService.handleSubscriberFlows(sub, defaultBpId, multicastServiceName)) {
+                                    removeSubscriberFromQueue(sub);
                                 }
-                                removeSubscriberFromQueue(sub);
+                            } else {
+                                // this is a port event (ENABLED/DISABLED)
+                                // means no subscriber was provisioned on that port
+
+                                if (!deviceService.isAvailable(sub.device.id()) ||
+                                        deviceService.getPort(sub.device.id(), sub.port.number()) == null) {
+                                    // If the device is not connected or the port is not available do nothing
+                                    // This can happen when we disable and then immediately delete the device,
+                                    // the queue is populated but the meters and flows are already gone
+                                    // thus there is nothing left to do
+                                    return;
+                                }
+
+                                if (oltFlowService.handleBasicPortFlows(sub, defaultBpId, defaultBpId)) {
+                                    if (log.isTraceEnabled()) {
+                                        log.trace("Processing of port {} completed",
+                                                  portWithName(sub.port));
+                                    }
+                                    removeSubscriberFromQueue(sub);
+                                }
                             }
                         }
+                    });
+                    } catch (Exception e) {
+                        log.error("Exception processing subscriber", e);
                     }
                 });
-            });
 
-            try {
-                TimeUnit.MILLISECONDS.sleep(requeueDelay);
-            } catch (InterruptedException e) {
-                continue;
+                try {
+                    TimeUnit.MILLISECONDS.sleep(requeueDelay);
+                } catch (InterruptedException e) {
+                    log.debug("Interrupted while waiting to requeue", e);
+                }
             }
-        }
     }
 
+
     /**
      * Checks the subscriber uni tag list and find the uni tag information.
      * using the pon c tag, pon s tag and the technology profile id
@@ -641,58 +660,55 @@
     }
 
     protected void addSubscriberToQueue(DiscoveredSubscriber sub) {
-        ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
-        LinkedBlockingQueue<DiscoveredSubscriber> q = null;
         try {
-            queueReadLock.lock();
-            q = eventsQueues.getOrDefault(cp, new LinkedBlockingQueue<>());
-        } finally {
-            queueReadLock.unlock();
-        }
+            ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
 
-        log.info("Adding subscriber to queue: {} with status {} and subscriber {}",
-                portWithName(sub.port), sub.status, sub.hasSubscriber);
-        q.add(sub);
-
-        try {
-            queueWriteLock.lock();
-            eventsQueues.put(cp, q);
-        } catch (UnsupportedOperationException | ClassCastException |
-                NullPointerException | IllegalArgumentException e) {
-            log.error("Cannot add subscriber to queue: {}", e.getMessage());
-        } finally {
-            queueWriteLock.unlock();
+            try {
+                queueWriteLock.lock();
+                eventsQueues.compute(cp, (subcp, queue) -> {
+                    queue = queue == null ? new LinkedBlockingQueue<>() : queue;
+                    log.info("Adding subscriber {} to queue: {} with existing {}",
+                             sub, portWithName(sub.port), queue);
+                    queue.add(sub);
+                    return queue;
+                });
+            } catch (UnsupportedOperationException | ClassCastException |
+                    NullPointerException | IllegalArgumentException e) {
+                log.error("Cannot add subscriber {} to queue: {}", portWithName(sub.port), e.getMessage());
+            } finally {
+                queueWriteLock.unlock();
+            }
+        } catch (Exception e) {
+            log.error("Can't add {} to queue", sub, e);
         }
     }
 
     protected void removeSubscriberFromQueue(DiscoveredSubscriber sub) {
         ConnectPoint cp = new ConnectPoint(sub.device.id(), sub.port.number());
-        LinkedBlockingQueue<DiscoveredSubscriber> q = null;
-        if (log.isTraceEnabled()) {
-            log.trace("removing subscriber {} from queue", sub);
-        }
-        try {
-            queueReadLock.lock();
-            q = eventsQueues.get(cp);
-        } finally {
-            queueReadLock.unlock();
-        }
-        if (q == null) {
-            log.warn("Cannot find queue for connectPoint {}", cp);
-            return;
-        }
-        boolean removed = q.remove(sub);
-        if (!removed) {
-            log.warn("Subscriber {} has not been removed from queue, is it still there? {}", sub, q);
-            return;
-        } else {
-            log.debug("Subscriber {} has been removed from the queue", sub);
-        }
-
         try {
             queueWriteLock.lock();
-            eventsQueues.remove(cp); // am I needed??
-            eventsQueues.put(cp, q);
+            eventsQueues.compute(cp, (subcp, queue) -> {
+                if (log.isTraceEnabled()) {
+                    log.trace("Removing subscriber {} from queue : {} " +
+                                      "with existing {}", sub,
+                              portWithName(sub.port), queue);
+                }
+                if (queue == null) {
+                    log.warn("Cannot find queue for connectPoint {}", cp);
+                    return queue;
+                }
+                boolean removed = queue.remove(sub);
+                if (!removed) {
+                    log.warn("Subscriber {} has not been removed from queue, " +
+                                     "is it still there? {}", sub, queue);
+                    return queue;
+                } else {
+                    log.debug("Subscriber {} has been removed from the queue {}",
+                              sub, queue);
+                }
+
+                return queue;
+            });
         } catch (UnsupportedOperationException | ClassCastException |
                 NullPointerException | IllegalArgumentException e) {
             log.error("Cannot remove subscriber {} from queue: {}", sub, e.getMessage());
@@ -721,7 +737,16 @@
 
         @Override
         public void event(DeviceEvent event) {
+            if (log.isTraceEnabled()) {
+                log.trace("OltListener receives event {} for: {}/{}", event.type(), event.subject().id(),
+                         event.port() != null ? event.port().number() : null);
+            }
             eventExecutor.execute(() -> {
+                if (log.isTraceEnabled()) {
+                    log.trace("OltListener Executor receives event {} for: {}/{}",
+                             event.type(), event.subject().id(),
+                              event.port() != null ? event.port().number() : null);
+                }
                 boolean isOlt = oltDeviceService.isOlt(event.subject());
                 DeviceId deviceId = event.subject().id();
                 switch (event.type()) {
@@ -815,8 +840,15 @@
         protected void handleOltPort(DeviceEvent.Type type, Device device, Port port) {
             log.info("OltDeviceListener receives event {} for port {} with status {} on device {}", type,
                     portWithName(port), port.isEnabled() ? "ENABLED" : "DISABLED", device.id());
+            boolean isNni = oltDeviceService.isNniPort(device, port.number());
+
+            if (!isNni && type == DeviceEvent.Type.PORT_ADDED) {
+                log.debug("Ignoring PORT_ADD on UNI port {}", portWithName(port));
+                return;
+            }
+
             if (port.isEnabled()) {
-                if (oltDeviceService.isNniPort(device, port.number())) {
+                if (isNni) {
                     OltFlowService.FlowOperation action = OltFlowService.FlowOperation.ADD;
                     // NOTE the NNI is only disabled if the OLT shuts down (reboot or failure).
                     // In that case the flows are purged anyway, so there's no need to deal with them,
@@ -853,7 +885,7 @@
                     addSubscriberToQueue(sub);
                 }
             } else {
-                if (oltDeviceService.isNniPort(device, port.number())) {
+                if (isNni) {
                     // NOTE the NNI is only disabled if the OLT shuts down (reboot or failure).
                     // In that case the flows are purged anyway, so there's no need to deal with them,
                     // it would actually be counter-productive as the openflow connection is severed and they won't
@@ -918,4 +950,26 @@
             }
         }
     }
+
+    protected final class ThreadPoolQueue extends ArrayBlockingQueue<Runnable> {
+
+        public ThreadPoolQueue(int capacity) {
+            super(capacity);
+        }
+
+        @Override
+        public boolean offer(Runnable runnable) {
+            if (runnable == null) {
+                return false;
+            }
+            try {
+                put(runnable);
+            } catch (InterruptedException e1) {
+                Thread.currentThread().interrupt();
+                return false;
+            }
+            return true;
+        }
+
+    }
 }
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 4dafd16..ecaaa3d 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -85,11 +85,13 @@
 import java.util.ArrayList;
 import java.util.Dictionary;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -722,20 +724,20 @@
 
     @Override
     public boolean isSubscriberServiceProvisioned(AccessDevicePort cp) {
-        // if any service is programmed on this port, returns true
-        AtomicBoolean provisioned = new AtomicBoolean(false);
+        Set<Map.Entry<ServiceKey, Boolean>> subs;
         try {
             provisionedSubscribersReadLock.lock();
-            for (Map.Entry<ServiceKey, Boolean> entry : provisionedSubscribers.entrySet()) {
-                if (entry.getKey().getPort().equals(cp) && entry.getValue()) {
-                    provisioned.set(true);
-                    break;
-                }
-            }
+            subs = new HashSet<>(provisionedSubscribers.entrySet());
         } finally {
             provisionedSubscribersReadLock.unlock();
         }
-        return provisioned.get();
+
+        for (Map.Entry<ServiceKey, Boolean> entry : subs) {
+            if (entry.getKey().getPort().equals(cp) && entry.getValue()) {
+                return true;
+            }
+        }
+        return false;
     }
 
     @Override
diff --git a/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java b/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
index a73a42b..35095b6 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
@@ -65,10 +65,14 @@
     public static final int REQUIRED_DRIVERS_PROPERTY_DELAY_DEFAULT = 5;
 
     public static final String FLOW_PROCESSING_THREADS = "flowProcessingThreads";
-    public static final int FLOW_PROCESSING_THREADS_DEFAULT = 8;
+    public static final int FLOW_PROCESSING_THREADS_DEFAULT = 32;
+
+    //Giving it a value of * 4 the number of flows.
+    public static final String FLOW_EXECUTOR_QUEUE_SIZE = "flowExecutorQueueSize";
+    public static final int FLOW_EXECUTOR_QUEUE_SIZE_DEFAULT = 128;
 
     public static final String SUBSCRIBER_PROCESSING_THREADS = "subscriberProcessingThreads";
-    public static final int SUBSCRIBER_PROCESSING_THREADS_DEFAULT = 8;
+    public static final int SUBSCRIBER_PROCESSING_THREADS_DEFAULT = 24;
 
     public static final String REQUEUE_DELAY = "requeueDelay";
     public static final int REQUEUE_DELAY_DEFAULT = 500;
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltDeviceListenerTest.java b/impl/src/test/java/org/opencord/olt/impl/OltDeviceListenerTest.java
index 6af50f5..b3b20e6 100644
--- a/impl/src/test/java/org/opencord/olt/impl/OltDeviceListenerTest.java
+++ b/impl/src/test/java/org/opencord/olt/impl/OltDeviceListenerTest.java
@@ -186,7 +186,8 @@
         PortNumber uniPortNumber = PortNumber.portNumber(16);
         Port uniAddedDisabled = new OltPort(testDevice, false, uniPortNumber,
                 DefaultAnnotations.builder().set(AnnotationKeys.PORT_NAME, "uni-1").build());
-        DeviceEvent uniAddedDisabledEvent = new DeviceEvent(DeviceEvent.Type.PORT_ADDED, testDevice, uniAddedDisabled);
+        DeviceEvent uniAddedDisabledEvent =
+                new DeviceEvent(DeviceEvent.Type.PORT_UPDATED, testDevice, uniAddedDisabled);
         ConnectPoint cp = new ConnectPoint(testDevice.id(), uniPortNumber);
 
         // if the port does not have default EAPOL we should not generate an event
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltTest.java b/impl/src/test/java/org/opencord/olt/impl/OltTest.java
index 0448f9d..d968c24 100644
--- a/impl/src/test/java/org/opencord/olt/impl/OltTest.java
+++ b/impl/src/test/java/org/opencord/olt/impl/OltTest.java
@@ -45,6 +45,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.Executors;
@@ -115,8 +116,7 @@
         reset(component.oltFlowService);
 
         component.bindSadisService(sadisService);
-        component.eventsQueues = component.storageService.
-                <ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>consistentMapBuilder().build().asJavaMap();
+        component.eventsQueues = new HashMap<>();
         component.eventsQueues.put(cp, new LinkedBlockingQueue<>());
 
         component.discoveredSubscriberExecutor.execute(() -> {
