During subscriber removal wait for flows to be removed before installing the default EAPOL flow

Change-Id: Idd758526b509621dfb42f3e883bac8c3a8931ec5
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index a4f2b5f..eed2b3e 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -175,7 +175,7 @@
     protected ApplicationId appId;
     private static final Integer MAX_PRIORITY = 10000;
     private static final Integer MIN_PRIORITY = 1000;
-    private static final short EAPOL_DEFAULT_VLAN = 4091;
+    protected static final short EAPOL_DEFAULT_VLAN = 4091;
     private static final int NONE_TP_ID = -1;
     private static final String V4 = "V4";
     private static final String V6 = "V6";
@@ -567,29 +567,40 @@
         return true;
     }
 
-    private boolean removeSubscriberFlows(DiscoveredSubscriber sub, String defaultBandwithProfile,
+    protected boolean removeSubscriberFlows(DiscoveredSubscriber sub, String defaultBandwithProfile,
                                           String multicastServiceName) {
 
         if (log.isTraceEnabled()) {
             log.trace("Removal of subscriber on {} started",
                     portWithName(sub.port));
         }
-        SubscriberAndDeviceInformation si = subsService.get(sub.portName());
-        if (si == null) {
-            log.error("Subscriber information not found in sadis for port {} during subscriber removal",
-                    portWithName(sub.port));
-            // NOTE that we are returning true so that the subscriber is removed from the queue
-            // and we can move on provisioning others
-            return true;
-        }
+        SubscriberAndDeviceInformation si = sub.subscriberAndDeviceInformation;
 
         handleSubscriberDhcpFlows(sub.device.id(), sub.port, FlowOperation.REMOVE, si);
 
         if (enableEapol) {
             // remove the tagged eapol
             handleSubscriberEapolFlows(sub, FlowOperation.REMOVE, si);
+        }
+        handleSubscriberDataFlows(sub.device, sub.port, FlowOperation.REMOVE, si, multicastServiceName);
 
-            // and add the default one back (only if the port is ENABLED and still present on the device)
+        handleSubscriberIgmpFlows(sub, FlowOperation.REMOVE);
+
+        if (enableEapol) {
+
+            // if any of the services still has flows, return false
+            Iterator<UniTagInformation> iter = sub.subscriberAndDeviceInformation.uniTagList().iterator();
+            while (iter.hasNext()) {
+                UniTagInformation entry = iter.next();
+                if (areSubscriberFlowsPendingRemoval(sub.port, entry)) {
+                    log.info("Subscriber {} still have flows on service {}, postpone default EAPOL installation.",
+                            portWithName(sub.port), entry.getServiceName());
+                    return false;
+                }
+            }
+
+            // once the flows are removed add the default one back
+            // (only if the port is ENABLED and still present on the device)
             if (sub.port.isEnabled() && deviceService.getPort(sub.device.id(), sub.port.number()) != null) {
 
                 // NOTE we remove the subscriber when the port goes down
@@ -598,10 +609,6 @@
                         FlowOperation.ADD, VlanId.vlanId(EAPOL_DEFAULT_VLAN));
             }
         }
-        handleSubscriberDataFlows(sub.device, sub.port, FlowOperation.REMOVE, si, multicastServiceName);
-
-        handleSubscriberIgmpFlows(sub, FlowOperation.REMOVE);
-
         // FIXME check the return status of the flow and return accordingly
         log.info("Removal of subscriber on {} completed", portWithName(sub.port));
         return true;
@@ -661,6 +668,15 @@
                 status.subscriberFlowsStatus == OltFlowsStatus.PENDING_ADD);
     }
 
+    public boolean areSubscriberFlowsPendingRemoval(Port port, UniTagInformation uti) {
+        OltPortStatus status = getOltPortStatus(port, uti);
+        if (log.isTraceEnabled()) {
+            log.trace("Status during pending_remove flow check {} for port {} and UniTagInformation {}",
+                    status, portWithName(port), uti);
+        }
+        return status != null && status.subscriberFlowsStatus == OltFlowsStatus.PENDING_REMOVE;
+    }
+
     @Override
     public void purgeDeviceFlows(DeviceId deviceId) {
         log.debug("Purging flows on device {}", deviceId);
@@ -744,7 +760,7 @@
         }
     }
 
-    private boolean handleEapolFlow(DiscoveredSubscriber sub, String bandwidthProfile,
+    protected boolean handleEapolFlow(DiscoveredSubscriber sub, String bandwidthProfile,
                                     String oltBandwidthProfile, FlowOperation action, VlanId vlanId) {
 
         // create a subscriberKey for the EAPOL flow
@@ -846,7 +862,7 @@
     }
 
     // FIXME it's confusing that si is not necessarily inside the DiscoveredSubscriber
-    private boolean handleSubscriberEapolFlows(DiscoveredSubscriber sub, FlowOperation action,
+    protected boolean handleSubscriberEapolFlows(DiscoveredSubscriber sub, FlowOperation action,
                                                SubscriberAndDeviceInformation si) {
         if (!enableEapol) {
             return true;
@@ -879,7 +895,7 @@
         return success.get();
     }
 
-    private void handleSubscriberIgmpFlows(DiscoveredSubscriber sub, FlowOperation action) {
+    protected void handleSubscriberIgmpFlows(DiscoveredSubscriber sub, FlowOperation action) {
         sub.subscriberAndDeviceInformation.uniTagList().forEach(uti -> {
             if (uti.getIsIgmpRequired()) {
                 DeviceId deviceId = sub.device.id();
@@ -1003,7 +1019,7 @@
                                              SubscriberAndDeviceInformation si, String multicastServiceName) {
 
         Optional<Port> nniPort = oltDeviceService.getNniPort(device);
-        if (nniPort.isEmpty()) {
+        if (nniPort == null || nniPort.isEmpty()) {
             log.error("Cannot configure DP flows as upstream port is not configured for subscriber {} on {}",
                     si.id(), portWithName(port));
             return;
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java b/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java
index 39f764e..326311d 100644
--- a/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java
+++ b/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java
@@ -44,6 +44,7 @@
 import org.onosproject.net.HostLocation;
 import org.onosproject.net.Port;
 import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flow.DefaultFlowRule;
 import org.onosproject.net.flow.DefaultTrafficTreatment;
 import org.onosproject.net.flow.FlowRule;
@@ -84,9 +85,11 @@
 import static org.opencord.olt.impl.OltFlowService.OltFlowsStatus.PENDING_REMOVE;
 import static org.opencord.olt.impl.OltFlowService.OltFlowsStatus.REMOVED;
 import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_BP_ID_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME;
 
 public class OltFlowServiceTest extends OltTestHelpers {
 
+    private OltFlowService component;
     private OltFlowService oltFlowService;
     OltFlowService.InternalFlowListener internalFlowListener;
     private final ApplicationId testAppId = new DefaultApplicationId(1, "org.opencord.olt.test");
@@ -104,30 +107,33 @@
 
     @Before
     public void setUp() {
-        oltFlowService = new OltFlowService();
-        oltFlowService.cfgService = new ComponentConfigAdapter();
-        oltFlowService.sadisService = Mockito.mock(SadisService.class);
-        oltFlowService.coreService = Mockito.spy(new CoreServiceAdapter());
-        oltFlowService.oltMeterService = Mockito.mock(OltMeterService.class);
-        oltFlowService.flowObjectiveService = Mockito.mock(FlowObjectiveService.class);
-        oltFlowService.hostService = Mockito.mock(HostService.class);
-        oltFlowService.flowRuleService = Mockito.mock(FlowRuleService.class);
-        oltFlowService.storageService = new TestStorageService();
-        oltFlowService.oltDeviceService = Mockito.mock(OltDeviceService.class);
-        oltFlowService.appId = testAppId;
+        component = new OltFlowService();
+        component.cfgService = new ComponentConfigAdapter();
+        component.sadisService = Mockito.mock(SadisService.class);
+        component.coreService = Mockito.spy(new CoreServiceAdapter());
+        component.oltMeterService = Mockito.mock(OltMeterService.class);
+        component.flowObjectiveService = Mockito.mock(FlowObjectiveService.class);
+        component.hostService = Mockito.mock(HostService.class);
+        component.flowRuleService = Mockito.mock(FlowRuleService.class);
+        component.storageService = new TestStorageService();
+        component.oltDeviceService = Mockito.mock(OltDeviceService.class);
+        component.appId = testAppId;
+        component.deviceService = Mockito.mock(DeviceService.class);
 
         doReturn(Mockito.mock(BaseInformationService.class))
-                .when(oltFlowService.sadisService).getSubscriberInfoService();
-        doReturn(testAppId).when(oltFlowService.coreService).registerApplication("org.opencord.olt");
-        oltFlowService.activate(null);
-        oltFlowService.bindSadisService(oltFlowService.sadisService);
+                .when(component.sadisService).getSubscriberInfoService();
+        doReturn(testAppId).when(component.coreService).registerApplication("org.opencord.olt");
+        component.activate(null);
+        component.bindSadisService(component.sadisService);
 
-        internalFlowListener = spy(oltFlowService.internalFlowListener);
+        internalFlowListener = spy(component.internalFlowListener);
+
+        oltFlowService = spy(component);
     }
 
     @After
     public void tearDown() {
-        oltFlowService.deactivate(null);
+        component.deactivate(null);
     }
 
     @Test
@@ -149,25 +155,25 @@
         ServiceKey sk3 = new ServiceKey(new AccessDevicePort(port3), new UniTagInformation());
 
         // cpStatus map for the test
-        oltFlowService.cpStatus = oltFlowService.storageService.
+        component.cpStatus = component.storageService.
                 <ServiceKey, OltPortStatus>consistentMapBuilder().build().asJavaMap();
         OltPortStatus cp1Status = new OltPortStatus(PENDING_ADD, NONE, NONE);
-        oltFlowService.cpStatus.put(sk1, cp1Status);
+        component.cpStatus.put(sk1, cp1Status);
 
         //check that we only update the provided value
-        oltFlowService.updateConnectPointStatus(sk1, ADDED, null, null);
-        OltPortStatus updated = oltFlowService.cpStatus.get(sk1);
+        component.updateConnectPointStatus(sk1, ADDED, null, null);
+        OltPortStatus updated = component.cpStatus.get(sk1);
         Assert.assertEquals(ADDED, updated.defaultEapolStatus);
         Assert.assertEquals(NONE, updated.subscriberFlowsStatus);
         Assert.assertEquals(NONE, updated.dhcpStatus);
 
         // check that it creates an entry if it does not exist
-        oltFlowService.updateConnectPointStatus(sk2, PENDING_ADD, NONE, NONE);
-        Assert.assertNotNull(oltFlowService.cpStatus.get(sk2));
+        component.updateConnectPointStatus(sk2, PENDING_ADD, NONE, NONE);
+        Assert.assertNotNull(component.cpStatus.get(sk2));
 
         // check that if we create a new entry with null values they're converted to NONE
-        oltFlowService.updateConnectPointStatus(sk3, null, null, null);
-        updated = oltFlowService.cpStatus.get(sk3);
+        component.updateConnectPointStatus(sk3, null, null, null);
+        updated = component.cpStatus.get(sk3);
         Assert.assertEquals(NONE, updated.defaultEapolStatus);
         Assert.assertEquals(NONE, updated.subscriberFlowsStatus);
         Assert.assertEquals(NONE, updated.dhcpStatus);
@@ -191,17 +197,17 @@
         ServiceKey sk1 = new ServiceKey(new AccessDevicePort(port1), new UniTagInformation());
 
         // cpStatus map for the test
-        oltFlowService.cpStatus = oltFlowService.storageService.
+        component.cpStatus = component.storageService.
                 <ServiceKey, OltPortStatus>consistentMapBuilder().build().asJavaMap();
 
         // check that an entry is not created if the only status is pending remove
-        oltFlowService.updateConnectPointStatus(sk1, null, null, PENDING_REMOVE);
-        OltPortStatus entry = oltFlowService.cpStatus.get(sk1);
+        component.updateConnectPointStatus(sk1, null, null, PENDING_REMOVE);
+        OltPortStatus entry = component.cpStatus.get(sk1);
         Assert.assertNull(entry);
 
         // check that an entry is not created if the only status is ERROR
-        oltFlowService.updateConnectPointStatus(sk1, null, null, ERROR);
-        entry = oltFlowService.cpStatus.get(sk1);
+        component.updateConnectPointStatus(sk1, null, null, ERROR);
+        entry = component.cpStatus.get(sk1);
         Assert.assertNull(entry);
     }
 
@@ -216,7 +222,7 @@
         Port port = new DefaultPort(device, PortNumber.portNumber(16), true,
                                     DefaultAnnotations.builder().set(PORT_NAME, "name-1").build());
         ServiceKey skWithStatus = new ServiceKey(new AccessDevicePort(port),
-                oltFlowService.defaultEapolUniTag);
+                component.defaultEapolUniTag);
 
         Port port17 = new DefaultPort(device, PortNumber.portNumber(17), true,
                                       DefaultAnnotations.builder().set(PORT_NAME, "name-1").build());
@@ -233,12 +239,12 @@
                 null
         );
 
-        oltFlowService.cpStatus.put(skWithStatus, portStatusAdded);
-        Assert.assertTrue(oltFlowService.hasDefaultEapol(port));
+        component.cpStatus.put(skWithStatus, portStatusAdded);
+        Assert.assertTrue(component.hasDefaultEapol(port));
 
-        oltFlowService.cpStatus.put(skWithStatus, portStatusRemoved);
+        component.cpStatus.put(skWithStatus, portStatusRemoved);
 
-        Assert.assertFalse(oltFlowService.hasDefaultEapol(port17));
+        Assert.assertFalse(component.hasDefaultEapol(port17));
     }
 
     @Test
@@ -275,19 +281,19 @@
                 ADDED
         );
 
-        oltFlowService.cpStatus.put(skWithStatus, withDefaultEapol);
-        Assert.assertFalse(oltFlowService.hasSubscriberFlows(port, uti));
+        component.cpStatus.put(skWithStatus, withDefaultEapol);
+        Assert.assertFalse(component.hasSubscriberFlows(port, uti));
 
-        oltFlowService.cpStatus.put(skWithStatus, withDhcp);
-        Assert.assertTrue(oltFlowService.hasDhcpFlows(port, uti));
+        component.cpStatus.put(skWithStatus, withDhcp);
+        Assert.assertTrue(component.hasDhcpFlows(port, uti));
 
-        oltFlowService.cpStatus.put(skWithStatus, withSubFlow);
-        Assert.assertTrue(oltFlowService.hasSubscriberFlows(port, uti));
+        component.cpStatus.put(skWithStatus, withSubFlow);
+        Assert.assertTrue(component.hasSubscriberFlows(port, uti));
     }
 
     @Test
     public void testHandleBasicPortFlowsNoEapol() throws Exception {
-        oltFlowService.enableEapol = false;
+        component.enableEapol = false;
         // create empty service for testing
         List<UniTagInformation> uniTagInformationList = new LinkedList<>();
         UniTagInformation empty = new UniTagInformation.Builder().build();
@@ -300,10 +306,10 @@
                 new DiscoveredSubscriber(testDevice,
                                          uniUpdateEnabled, DiscoveredSubscriber.Status.ADDED,
                                          false, si);
-        oltFlowService.handleBasicPortFlows(addedSub, DEFAULT_BP_ID_DEFAULT, DEFAULT_BP_ID_DEFAULT);
+        component.handleBasicPortFlows(addedSub, DEFAULT_BP_ID_DEFAULT, DEFAULT_BP_ID_DEFAULT);
         // if eapol is not enabled there's nothing we need to do,
         // so make sure we don't even call sadis
-        verify(oltFlowService.subsService, never()).get(any());
+        verify(component.subsService, never()).get(any());
     }
 
     @Test
@@ -320,14 +326,14 @@
                                          false, si);
         // whether the meter is pending or not is up to the createMeter method to handle
         // we just don't proceed with the subscriber till it's ready
-        doReturn(false).when(oltFlowService.oltMeterService)
+        doReturn(false).when(component.oltMeterService)
                 .createMeter(addedSub.device.id(), DEFAULT_BP_ID_DEFAULT);
-        boolean res = oltFlowService.handleBasicPortFlows(addedSub, DEFAULT_BP_ID_DEFAULT, DEFAULT_BP_ID_DEFAULT);
+        boolean res = component.handleBasicPortFlows(addedSub, DEFAULT_BP_ID_DEFAULT, DEFAULT_BP_ID_DEFAULT);
 
         Assert.assertFalse(res);
 
         // we do not create flows
-        verify(oltFlowService.flowObjectiveService, never())
+        verify(component.flowObjectiveService, never())
                 .filter(eq(addedSub.device.id()), any());
     }
 
@@ -345,11 +351,11 @@
                                          false, si);
         // this is the happy case, we have the meter so we check that the default EAPOL flow
         // is installed
-        doReturn(true).when(oltFlowService.oltMeterService)
+        doReturn(true).when(component.oltMeterService)
                 .createMeter(deviceId, DEFAULT_BP_ID_DEFAULT);
-        doReturn(true).when(oltFlowService.oltMeterService)
+        doReturn(true).when(component.oltMeterService)
                 .hasMeterByBandwidthProfile(deviceId, DEFAULT_BP_ID_DEFAULT);
-        doReturn(MeterId.meterId(1)).when(oltFlowService.oltMeterService)
+        doReturn(MeterId.meterId(1)).when(component.oltMeterService)
                 .getMeterIdForBandwidthProfile(deviceId, DEFAULT_BP_ID_DEFAULT);
 
         FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
@@ -361,9 +367,9 @@
                 .withMeta(
                         DefaultTrafficTreatment.builder()
                                 .meter(MeterId.meterId(1))
-                                .writeMetadata(oltFlowService.createTechProfValueForWriteMetadata(
+                                .writeMetadata(component.createTechProfValueForWriteMetadata(
                                         VlanId.vlanId(eapolDefaultVlan),
-                                        oltFlowService.defaultTechProfileId, MeterId.meterId(1)), 0)
+                                        component.defaultTechProfileId, MeterId.meterId(1)), 0)
                                 .setOutput(PortNumber.CONTROLLER)
                                 .pushVlan()
                                 .setVlanId(VlanId.vlanId(eapolDefaultVlan)).build()
@@ -371,7 +377,7 @@
                 .add();
 
 
-        oltFlowService.handleBasicPortFlows(addedSub, DEFAULT_BP_ID_DEFAULT, DEFAULT_BP_ID_DEFAULT);
+        component.handleBasicPortFlows(addedSub, DEFAULT_BP_ID_DEFAULT, DEFAULT_BP_ID_DEFAULT);
 
         // we check for an existing meter (present)
         // FIXME understand why the above test invokes this call and this one doesn't
@@ -379,12 +385,12 @@
 //                .hasMeterByBandwidthProfile(eq(addedSub.device.id()), eq(DEFAULT_BP_ID_DEFAULT));
 
         // the meter exist, no need to check for PENDING or to create it
-        verify(oltFlowService.oltMeterService, never())
+        verify(component.oltMeterService, never())
                 .hasPendingMeterByBandwidthProfile(eq(deviceId), eq(DEFAULT_BP_ID_DEFAULT));
-        verify(oltFlowService.oltMeterService, never())
+        verify(component.oltMeterService, never())
                 .createMeterForBp(eq(deviceId), eq(DEFAULT_BP_ID_DEFAULT));
 
-        verify(oltFlowService.flowObjectiveService, times(1))
+        verify(component.flowObjectiveService, times(1))
                 .filter(eq(deviceId), argThat(new FilteringObjectiveMatcher(expectedFilter)));
     }
 
@@ -402,7 +408,7 @@
                                          false, si);
         // we are testing that when a port goes down we remove the default EAPOL flow
 
-        doReturn(MeterId.meterId(1)).when(oltFlowService.oltMeterService)
+        doReturn(MeterId.meterId(1)).when(component.oltMeterService)
                 .getMeterIdForBandwidthProfile(deviceId, DEFAULT_BP_ID_DEFAULT);
 
         FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
@@ -414,25 +420,25 @@
                 .withMeta(
                         DefaultTrafficTreatment.builder()
                                 .meter(MeterId.meterId(1))
-                                .writeMetadata(oltFlowService.createTechProfValueForWriteMetadata(
+                                .writeMetadata(component.createTechProfValueForWriteMetadata(
                                         VlanId.vlanId(eapolDefaultVlan),
-                                        oltFlowService.defaultTechProfileId, MeterId.meterId(1)), 0)
+                                        component.defaultTechProfileId, MeterId.meterId(1)), 0)
                                 .setOutput(PortNumber.CONTROLLER)
                                 .pushVlan()
                                 .setVlanId(VlanId.vlanId(eapolDefaultVlan)).build()
                 )
                 .add();
 
-        oltFlowService.handleBasicPortFlows(removedSub, DEFAULT_BP_ID_DEFAULT, DEFAULT_BP_ID_DEFAULT);
+        component.handleBasicPortFlows(removedSub, DEFAULT_BP_ID_DEFAULT, DEFAULT_BP_ID_DEFAULT);
 
-        verify(oltFlowService.flowObjectiveService, times(1))
+        verify(component.flowObjectiveService, times(1))
                 .filter(eq(deviceId), argThat(new FilteringObjectiveMatcher(expectedFilter)));
     }
 
     @Test
     public void testHandleNniFlowsOnlyLldp() {
-        oltFlowService.enableDhcpOnNni = false;
-        oltFlowService.handleNniFlows(testDevice, nniPort, OltFlowService.FlowOperation.ADD);
+        component.enableDhcpOnNni = false;
+        component.handleNniFlows(testDevice, nniPort, OltFlowService.FlowOperation.ADD);
 
         FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
                 .permit()
@@ -443,17 +449,17 @@
                 .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
                 .add();
 
-        verify(oltFlowService.flowObjectiveService, times(1))
+        verify(component.flowObjectiveService, times(1))
                 .filter(eq(deviceId), argThat(new FilteringObjectiveMatcher(expectedFilter)));
-        verify(oltFlowService.flowObjectiveService, times(1))
+        verify(component.flowObjectiveService, times(1))
                 .filter(eq(deviceId), any());
     }
 
     @Test
     public void testHandleNniFlowsDhcpV4() {
-        oltFlowService.enableDhcpOnNni = true;
-        oltFlowService.enableDhcpV4 = true;
-        oltFlowService.handleNniFlows(testDevice, nniPort, OltFlowService.FlowOperation.ADD);
+        component.enableDhcpOnNni = true;
+        component.enableDhcpV4 = true;
+        component.handleNniFlows(testDevice, nniPort, OltFlowService.FlowOperation.ADD);
 
         FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
                 .permit()
@@ -468,18 +474,18 @@
                 .add();
 
         // invoked with the correct DHCP filtering objective
-        verify(oltFlowService.flowObjectiveService, times(1))
+        verify(component.flowObjectiveService, times(1))
                 .filter(eq(deviceId), argThat(new FilteringObjectiveMatcher(expectedFilter)));
         // invoked only twice, LLDP and DHCP
-        verify(oltFlowService.flowObjectiveService, times(2))
+        verify(component.flowObjectiveService, times(2))
                 .filter(eq(deviceId), any());
     }
 
     @Test
     public void testRemoveNniFlowsDhcpV4() {
-        oltFlowService.enableDhcpOnNni = true;
-        oltFlowService.enableDhcpV4 = true;
-        oltFlowService.handleNniFlows(testDevice, nniPortDisabled, OltFlowService.FlowOperation.REMOVE);
+        component.enableDhcpOnNni = true;
+        component.enableDhcpV4 = true;
+        component.handleNniFlows(testDevice, nniPortDisabled, OltFlowService.FlowOperation.REMOVE);
 
         FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
                 .deny()
@@ -494,19 +500,19 @@
                 .add();
 
         // invoked with the correct DHCP filtering objective
-        verify(oltFlowService.flowObjectiveService, times(1))
+        verify(component.flowObjectiveService, times(1))
                 .filter(eq(deviceId), argThat(new FilteringObjectiveMatcher(expectedFilter)));
         // invoked only twice, LLDP and DHCP
-        verify(oltFlowService.flowObjectiveService, times(2))
+        verify(component.flowObjectiveService, times(2))
                 .filter(eq(deviceId), any());
     }
 
     @Test
     public void testHandleNniFlowsDhcpV6() {
-        oltFlowService.enableDhcpOnNni = true;
-        oltFlowService.enableDhcpV4 = false;
-        oltFlowService.enableDhcpV6 = true;
-        oltFlowService.handleNniFlows(testDevice, nniPort, OltFlowService.FlowOperation.ADD);
+        component.enableDhcpOnNni = true;
+        component.enableDhcpV4 = false;
+        component.enableDhcpV6 = true;
+        component.handleNniFlows(testDevice, nniPort, OltFlowService.FlowOperation.ADD);
 
         FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
                 .permit()
@@ -521,18 +527,18 @@
                 .add();
 
         // invoked with the correct DHCP filtering objective
-        verify(oltFlowService.flowObjectiveService, times(1))
+        verify(component.flowObjectiveService, times(1))
                 .filter(eq(deviceId), argThat(new FilteringObjectiveMatcher(expectedFilter)));
         // invoked only twice, LLDP and DHCP
-        verify(oltFlowService.flowObjectiveService, times(2))
+        verify(component.flowObjectiveService, times(2))
                 .filter(eq(deviceId), any());
     }
 
     @Test
     public void testHandleNniFlowsIgmp() {
-        oltFlowService.enableDhcpOnNni = false;
-        oltFlowService.enableIgmpOnNni = true;
-        oltFlowService.handleNniFlows(testDevice, nniPort, OltFlowService.FlowOperation.ADD);
+        component.enableDhcpOnNni = false;
+        component.enableIgmpOnNni = true;
+        component.handleNniFlows(testDevice, nniPort, OltFlowService.FlowOperation.ADD);
 
         FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
                 .permit()
@@ -544,10 +550,10 @@
                 .add();
 
         // invoked with the correct DHCP filtering objective
-        verify(oltFlowService.flowObjectiveService, times(1))
+        verify(component.flowObjectiveService, times(1))
                 .filter(eq(deviceId), argThat(new FilteringObjectiveMatcher(expectedFilter)));
         // invoked only twice, LLDP and DHCP
-        verify(oltFlowService.flowObjectiveService, times(2))
+        verify(component.flowObjectiveService, times(2))
                 .filter(eq(deviceId), any());
     }
 
@@ -562,7 +568,7 @@
         SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
         si.setUniTagList(uniTagInformationList);
 
-        boolean isMacAvailable = oltFlowService.isMacAddressAvailable(testDevice.id(), uniUpdateEnabled, si);
+        boolean isMacAvailable = component.isMacAddressAvailable(testDevice.id(), uniUpdateEnabled, si);
         // we return true as we don't care wether it's available or not
         Assert.assertTrue(isMacAvailable);
     }
@@ -582,16 +588,16 @@
         si.setUniTagList(uniTagInformationList);
 
         // with no hosts discovered, return false
-        boolean isMacAvailable = oltFlowService.isMacAddressAvailable(testDevice.id(), uniUpdateEnabled, si);
+        boolean isMacAvailable = component.isMacAddressAvailable(testDevice.id(), uniUpdateEnabled, si);
         Assert.assertFalse(isMacAvailable);
 
         // with a discovered host, return true
         Host fakeHost = new DefaultHost(ProviderId.NONE, HostId.hostId(MacAddress.NONE), MacAddress.ZERO,
                 hsiaCtag, HostLocation.NONE, new HashSet<>(), DefaultAnnotations.builder().build());
         Set<Host> hosts = new HashSet<>(Arrays.asList(fakeHost));
-        doReturn(hosts).when(oltFlowService.hostService).getConnectedHosts((ConnectPoint) any());
+        doReturn(hosts).when(component.hostService).getConnectedHosts((ConnectPoint) any());
 
-        isMacAvailable = oltFlowService.isMacAddressAvailable(testDevice.id(), uniUpdateEnabled, si);
+        isMacAvailable = component.isMacAddressAvailable(testDevice.id(), uniUpdateEnabled, si);
         Assert.assertTrue(isMacAvailable);
     }
 
@@ -606,7 +612,7 @@
         SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
         si.setUniTagList(uniTagInformationList);
 
-        boolean isMacAvailable = oltFlowService.isMacAddressAvailable(testDevice.id(), uniUpdateEnabled, si);
+        boolean isMacAvailable = component.isMacAddressAvailable(testDevice.id(), uniUpdateEnabled, si);
         Assert.assertTrue(isMacAvailable);
     }
 
@@ -615,7 +621,7 @@
 
         String usBp = "usBp";
         String usOltBp = "usOltBp";
-        oltFlowService.enableDhcpV4 = true;
+        component.enableDhcpV4 = true;
 
         // create two services, one requires DHCP the other doesn't
         List<UniTagInformation> uniTagInformationList = new LinkedList<>();
@@ -641,9 +647,9 @@
                                          false, si);
 
         // return meter IDs
-        doReturn(MeterId.meterId(2)).when(oltFlowService.oltMeterService)
+        doReturn(MeterId.meterId(2)).when(component.oltMeterService)
                 .getMeterIdForBandwidthProfile(addedSub.device.id(), usBp);
-        doReturn(MeterId.meterId(3)).when(oltFlowService.oltMeterService)
+        doReturn(MeterId.meterId(3)).when(component.oltMeterService)
                 .getMeterIdForBandwidthProfile(addedSub.device.id(), usOltBp);
 
         // TODO improve the matches on the filter
@@ -658,15 +664,15 @@
                 .withPriority(10000)
                 .add();
 
-        oltFlowService.handleSubscriberDhcpFlows(addedSub.device.id(), addedSub.port,
+        component.handleSubscriberDhcpFlows(addedSub.device.id(), addedSub.port,
                 OltFlowService.FlowOperation.ADD, si);
-        verify(oltFlowService.flowObjectiveService, times(1))
+        verify(component.flowObjectiveService, times(1))
                 .filter(eq(addedSub.device.id()), argThat(new FilteringObjectiveMatcher(expectedFilter)));
     }
 
     @Test
     public void testInternalFlowListenerNotMaster() {
-        doReturn(false).when(oltFlowService.oltDeviceService).isLocalLeader(any());
+        doReturn(false).when(component.oltDeviceService).isLocalLeader(any());
 
         FlowRule flowRule = DefaultFlowRule.builder()
                 .forDevice(DeviceId.deviceId("foo"))
@@ -700,4 +706,61 @@
         // if we're not master of the device, we should not update
         verify(internalFlowListener, never()).updateCpStatus(any(), any(), any());
     }
+
+    @Test
+    public void testRemoveSubscriberFlows() {
+        // test that if we have EAPOL we wait till the tagged flow is removed
+        // before installing the default one
+
+        // setup
+        component.enableEapol = true;
+
+        // mock data
+        DeviceId deviceId = DeviceId.deviceId("test-device");
+        ProviderId pid = new ProviderId("of", "foo");
+        Device device =
+                new DefaultDevice(pid, deviceId, Device.Type.OLT, "", "", "", "", null);
+        Port port = new DefaultPort(device, PortNumber.portNumber(1), true,
+                DefaultAnnotations.builder().set(PORT_NAME, "port-1").build());
+
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+        UniTagInformation hsia = new UniTagInformation.Builder()
+                .setUpstreamBandwidthProfile("usbp")
+                .setDownstreamBandwidthProfile("dsbp")
+                .setPonCTag(VlanId.vlanId((short) 900)).build();
+        uniTagInformationList.add(hsia);
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+
+        DiscoveredSubscriber sub = new DiscoveredSubscriber(
+                device, port, DiscoveredSubscriber.Status.REMOVED, true, si);
+
+        // first test that when we remove the EAPOL flow we return false so that the
+        // subscriber is not removed from the queue
+        doReturn(true).when(oltFlowService).areSubscriberFlowsPendingRemoval(any(), any());
+        boolean res = oltFlowService.removeSubscriberFlows(sub, DEFAULT_BP_ID_DEFAULT, DEFAULT_MCAST_SERVICE_NAME);
+        verify(oltFlowService, times(1))
+                .handleSubscriberDhcpFlows(deviceId, port, OltFlowService.FlowOperation.REMOVE, si);
+        verify(oltFlowService, times(1))
+                .handleSubscriberEapolFlows(sub, OltFlowService.FlowOperation.REMOVE, si);
+        verify(oltFlowService, times(1))
+                .handleSubscriberDataFlows(device, port, OltFlowService.FlowOperation.REMOVE,
+                        si, DEFAULT_MCAST_SERVICE_NAME);
+        verify(oltFlowService, times(1))
+                .handleSubscriberIgmpFlows(sub, OltFlowService.FlowOperation.REMOVE);
+        verify(oltFlowService, never())
+                .handleEapolFlow(any(), any(), any(),
+                        eq(OltFlowService.FlowOperation.ADD), eq(VlanId.vlanId(OltFlowService.EAPOL_DEFAULT_VLAN)));
+        Assert.assertFalse(res);
+
+        // then test that if the tagged EAPOL is not there we install the default EAPOL
+        // and return true so we remove the subscriber from the queue
+        doReturn(false).when(oltFlowService).areSubscriberFlowsPendingRemoval(any(), any());
+        doReturn(port).when(oltFlowService.deviceService).getPort(deviceId, port.number());
+        res = oltFlowService.removeSubscriberFlows(sub, DEFAULT_BP_ID_DEFAULT, DEFAULT_MCAST_SERVICE_NAME);
+        verify(oltFlowService, times(1))
+                .handleEapolFlow(any(), any(), any(),
+                        eq(OltFlowService.FlowOperation.ADD), eq(VlanId.vlanId(OltFlowService.EAPOL_DEFAULT_VLAN)));
+        Assert.assertTrue(res);
+    }
 }
\ No newline at end of file