[VOL-4619] Only master cleans up queues on device disable/remove

Change-Id: Ic84e422bd8884815ec6ae78f4cdbc03cdf2c8068
diff --git a/impl/src/main/java/org/opencord/olt/impl/Olt.java b/impl/src/main/java/org/opencord/olt/impl/Olt.java
index e391537..1f103d8 100644
--- a/impl/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/impl/src/main/java/org/opencord/olt/impl/Olt.java
@@ -554,7 +554,7 @@
                             }
 
                             if (log.isTraceEnabled()) {
-                            log.trace("Processing subscriber {} on port {} with status {}, has subscriber {}",
+                                log.trace("Processing subscriber {} on port {} with status {}, has subscriber {}",
                                      sub, portWithName(sub.port), sub.status, sub.hasSubscriber);
                             }
 
@@ -582,6 +582,10 @@
                                                   portWithName(sub.port));
                                     }
                                     removeSubscriberFromQueue(sub);
+                                } else {
+                                    log.debug("Not handling basic port flows " +
+                                                      "for {}, leaving in the queue",
+                                              portWithName(sub.port));
                                 }
                             }
                         }
@@ -793,7 +797,11 @@
                             //NOTE all the instances will call these methods
                             oltFlowService.purgeDeviceFlows(deviceId);
                             oltMeterService.purgeDeviceMeters(deviceId);
-                            clearQueueForDevice(deviceId);
+                            // cpStatus is a distributed map, thus only master will update it.
+                            if (oltDeviceService.isLocalLeader(deviceId)) {
+                                log.debug("Master, clearing cp status for {}", deviceId);
+                                clearQueueForDevice(deviceId);
+                            }
                         } else {
                             log.info("Device {} availability changed to false, but ports are still available, " +
                                             "assuming temporary disconnection.",
@@ -811,7 +819,10 @@
                         log.info("Device {} Removed, purging meters and flows", deviceId);
                         oltFlowService.purgeDeviceFlows(deviceId);
                         oltMeterService.purgeDeviceMeters(deviceId);
-                        clearQueueForDevice(deviceId);
+                        if (oltDeviceService.isLocalLeader(deviceId)) {
+                            log.debug("Master, clearing cp status for {}", deviceId);
+                            clearQueueForDevice(deviceId);
+                        }
                         return;
                     default:
                         if (log.isTraceEnabled()) {
@@ -830,6 +841,7 @@
                     Map.Entry<ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>> entry = iter.next();
                     if (entry.getKey().deviceId().equals(devId)) {
                         eventsQueues.remove(entry.getKey());
+                        log.debug("Removing key from queue {}", entry.getKey());
                     }
                 }
             } finally {
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 4fb56df..6252144 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -480,6 +480,7 @@
 
         // we only need to something if EAPOL is enabled
         if (!enableEapol) {
+            log.debug("Eapol is disabled for {}", portWithName(sub.port));
             return true;
         }
 
@@ -495,7 +496,7 @@
     }
 
     private boolean addDefaultFlows(DiscoveredSubscriber sub, String bandwidthProfileId, String oltBandwidthProfileId) {
-
+        log.debug("Adding default flows for {}, status {}", portWithName(sub.port), sub.status);
         if (!oltMeterService.createMeter(sub.device.id(), bandwidthProfileId)) {
             if (log.isTraceEnabled()) {
                 log.trace("waiting on meter for bp {} and sub {}", bandwidthProfileId, sub);
@@ -503,6 +504,8 @@
             return false;
         }
         if (hasDefaultEapol(sub.port)) {
+            OltPortStatus status = getOltPortStatus(sub.port, defaultEapolUniTag);
+            log.debug("Eapol is already present for {} with status {}", portWithName(sub.port), status);
             return true;
         }
         return handleEapolFlow(sub, bandwidthProfileId,
@@ -1227,7 +1230,6 @@
             if (!VlanId.vlanId(VlanId.NO_VID).equals(unitagMatch)) {
                 filterBuilder.addCondition(Criteria.matchVlanId(unitagMatch));
             }
-
             if (!VlanId.vlanId(VlanId.NO_VID).equals(cTag)) {
                 treatmentBuilder.setVlanId(cTag);
             }