During subscriber removal, wait for flows to be removed before installing the default EAPOL flow

Change-Id: Idd758526b509621dfb42f3e883bac8c3a8931ec5
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index a4f2b5f..eed2b3e 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -175,7 +175,7 @@
     protected ApplicationId appId;
     private static final Integer MAX_PRIORITY = 10000;
     private static final Integer MIN_PRIORITY = 1000;
-    private static final short EAPOL_DEFAULT_VLAN = 4091;
+    protected static final short EAPOL_DEFAULT_VLAN = 4091;
     private static final int NONE_TP_ID = -1;
     private static final String V4 = "V4";
     private static final String V6 = "V6";
@@ -567,29 +567,40 @@
         return true;
     }
 
-    private boolean removeSubscriberFlows(DiscoveredSubscriber sub, String defaultBandwithProfile,
+    protected boolean removeSubscriberFlows(DiscoveredSubscriber sub, String defaultBandwithProfile,
                                           String multicastServiceName) {
 
         if (log.isTraceEnabled()) {
             log.trace("Removal of subscriber on {} started",
                     portWithName(sub.port));
         }
-        SubscriberAndDeviceInformation si = subsService.get(sub.portName());
-        if (si == null) {
-            log.error("Subscriber information not found in sadis for port {} during subscriber removal",
-                    portWithName(sub.port));
-            // NOTE that we are returning true so that the subscriber is removed from the queue
-            // and we can move on provisioning others
-            return true;
-        }
+        SubscriberAndDeviceInformation si = sub.subscriberAndDeviceInformation;
 
         handleSubscriberDhcpFlows(sub.device.id(), sub.port, FlowOperation.REMOVE, si);
 
         if (enableEapol) {
             // remove the tagged eapol
             handleSubscriberEapolFlows(sub, FlowOperation.REMOVE, si);
+        }
+        handleSubscriberDataFlows(sub.device, sub.port, FlowOperation.REMOVE, si, multicastServiceName);
 
-            // and add the default one back (only if the port is ENABLED and still present on the device)
+        handleSubscriberIgmpFlows(sub, FlowOperation.REMOVE);
+
+        if (enableEapol) {
+
+            // if any of the services still has flows, return false
+            Iterator<UniTagInformation> iter = sub.subscriberAndDeviceInformation.uniTagList().iterator();
+            while (iter.hasNext()) {
+                UniTagInformation entry = iter.next();
+                if (areSubscriberFlowsPendingRemoval(sub.port, entry)) {
+                    log.info("Subscriber {} still have flows on service {}, postpone default EAPOL installation.",
+                            portWithName(sub.port), entry.getServiceName());
+                    return false;
+                }
+            }
+
+            // once the flows are removed add the default one back
+            // (only if the port is ENABLED and still present on the device)
             if (sub.port.isEnabled() && deviceService.getPort(sub.device.id(), sub.port.number()) != null) {
 
                 // NOTE we remove the subscriber when the port goes down
@@ -598,10 +609,6 @@
                         FlowOperation.ADD, VlanId.vlanId(EAPOL_DEFAULT_VLAN));
             }
         }
-        handleSubscriberDataFlows(sub.device, sub.port, FlowOperation.REMOVE, si, multicastServiceName);
-
-        handleSubscriberIgmpFlows(sub, FlowOperation.REMOVE);
-
         // FIXME check the return status of the flow and return accordingly
         log.info("Removal of subscriber on {} completed", portWithName(sub.port));
         return true;
@@ -661,6 +668,15 @@
                 status.subscriberFlowsStatus == OltFlowsStatus.PENDING_ADD);
     }
 
+    public boolean areSubscriberFlowsPendingRemoval(Port port, UniTagInformation uti) { // PENDING_REMOVE check
+        OltPortStatus status = getOltPortStatus(port, uti); // null is tolerated; see the return below
+        if (log.isTraceEnabled()) { // trace-only diagnostics, does not affect the result
+            log.trace("Status during pending_remove flow check {} for port {} and UniTagInformation {}",
+                    status, portWithName(port), uti);
+        }
+        return status != null && status.subscriberFlowsStatus == OltFlowsStatus.PENDING_REMOVE; // null => false
+    }
+
     @Override
     public void purgeDeviceFlows(DeviceId deviceId) {
         log.debug("Purging flows on device {}", deviceId);
@@ -744,7 +760,7 @@
         }
     }
 
-    private boolean handleEapolFlow(DiscoveredSubscriber sub, String bandwidthProfile,
+    protected boolean handleEapolFlow(DiscoveredSubscriber sub, String bandwidthProfile,
                                     String oltBandwidthProfile, FlowOperation action, VlanId vlanId) {
 
         // create a subscriberKey for the EAPOL flow
@@ -846,7 +862,7 @@
     }
 
     // FIXME it's confusing that si is not necessarily inside the DiscoveredSubscriber
-    private boolean handleSubscriberEapolFlows(DiscoveredSubscriber sub, FlowOperation action,
+    protected boolean handleSubscriberEapolFlows(DiscoveredSubscriber sub, FlowOperation action,
                                                SubscriberAndDeviceInformation si) {
         if (!enableEapol) {
             return true;
@@ -879,7 +895,7 @@
         return success.get();
     }
 
-    private void handleSubscriberIgmpFlows(DiscoveredSubscriber sub, FlowOperation action) {
+    protected void handleSubscriberIgmpFlows(DiscoveredSubscriber sub, FlowOperation action) {
         sub.subscriberAndDeviceInformation.uniTagList().forEach(uti -> {
             if (uti.getIsIgmpRequired()) {
                 DeviceId deviceId = sub.device.id();
@@ -1003,7 +1019,7 @@
                                              SubscriberAndDeviceInformation si, String multicastServiceName) {
 
         Optional<Port> nniPort = oltDeviceService.getNniPort(device);
-        if (nniPort.isEmpty()) {
+        if (nniPort == null || nniPort.isEmpty()) {
             log.error("Cannot configure DP flows as upstream port is not configured for subscriber {} on {}",
                     si.id(), portWithName(port));
             return;