[VOL-4619] Only master cleans up queues on device disable/remove

Change-Id: Ic84e422bd8884815ec6ae78f4cdbc03cdf2c8068
diff --git a/impl/src/main/java/org/opencord/olt/impl/Olt.java b/impl/src/main/java/org/opencord/olt/impl/Olt.java
index e391537..1f103d8 100644
--- a/impl/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/impl/src/main/java/org/opencord/olt/impl/Olt.java
@@ -554,7 +554,7 @@
                             }
 
                             if (log.isTraceEnabled()) {
-                            log.trace("Processing subscriber {} on port {} with status {}, has subscriber {}",
+                                log.trace("Processing subscriber {} on port {} with status {}, has subscriber {}",
                                      sub, portWithName(sub.port), sub.status, sub.hasSubscriber);
                             }
 
@@ -582,6 +582,10 @@
                                                   portWithName(sub.port));
                                     }
                                     removeSubscriberFromQueue(sub);
+                                } else {
+                                    log.debug("Not handling basic port flows " +
+                                                      "for {}, leaving in the queue",
+                                              portWithName(sub.port));
                                 }
                             }
                         }
@@ -793,7 +797,11 @@
                             //NOTE all the instances will call these methods
                             oltFlowService.purgeDeviceFlows(deviceId);
                             oltMeterService.purgeDeviceMeters(deviceId);
-                            clearQueueForDevice(deviceId);
+                            // cpStatus is a distributed map, thus only master will update it.
+                            if (oltDeviceService.isLocalLeader(deviceId)) {
+                                log.debug("Master, clearing cp status for {}", deviceId);
+                                clearQueueForDevice(deviceId);
+                            }
                         } else {
                             log.info("Device {} availability changed to false, but ports are still available, " +
                                             "assuming temporary disconnection.",
@@ -811,7 +819,10 @@
                         log.info("Device {} Removed, purging meters and flows", deviceId);
                         oltFlowService.purgeDeviceFlows(deviceId);
                         oltMeterService.purgeDeviceMeters(deviceId);
-                        clearQueueForDevice(deviceId);
+                        if (oltDeviceService.isLocalLeader(deviceId)) {
+                            log.debug("Master, clearing cp status for {}", deviceId);
+                            clearQueueForDevice(deviceId);
+                        }
                         return;
                     default:
                         if (log.isTraceEnabled()) {
@@ -830,6 +841,7 @@
                     Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> entry = iter.next();
                     if (entry.getKey().deviceId().equals(devId)) {
                         eventsQueues.remove(entry.getKey());
+                        log.debug("Removing key from queue {}", entry.getKey());
                     }
                 }
             } finally {