[VOL-4246] Feature parity with the previous implementation

Change-Id: I3741edb3c1b88b1cf8b5e6d4ff0900132e2e5e6a
diff --git a/impl/src/test/java/org/opencord/olt/impl/ConsistentHasherTest.java b/impl/src/test/java/org/opencord/olt/impl/ConsistentHasherTest.java
deleted file mode 100644
index 1f682cb..0000000
--- a/impl/src/test/java/org/opencord/olt/impl/ConsistentHasherTest.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Copyright 2020-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.opencord.olt.impl;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.net.DeviceId;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.equalTo;
-
-public class ConsistentHasherTest {
-
-    private static final int WEIGHT = 10;
-
-    private static final NodeId N1 = new NodeId("10.0.0.1");
-    private static final NodeId N2 = new NodeId("10.0.0.2");
-    private static final NodeId N3 = new NodeId("10.0.0.3");
-
-    private ConsistentHasher hasher;
-
-    @Before
-    public void setUp() {
-        List<NodeId> servers = new ArrayList<>();
-        servers.add(N1);
-        servers.add(N2);
-
-        hasher = new ConsistentHasher(servers, WEIGHT);
-    }
-
-    @Test
-    public void testHasher() {
-        DeviceId deviceId = DeviceId.deviceId("foo");
-        NodeId server = hasher.hash(deviceId.toString());
-
-        assertThat(server, equalTo(N1));
-
-        deviceId = DeviceId.deviceId("bsaf");
-        server = hasher.hash(deviceId.toString());
-
-        assertThat(server, equalTo(N2));
-    }
-
-    @Test
-    public void testAddServer() {
-        DeviceId deviceId = DeviceId.deviceId("foo");
-        NodeId server = hasher.hash(deviceId.toString());
-
-        assertThat(server, equalTo(N1));
-
-        hasher.addServer(N3);
-
-        server = hasher.hash(deviceId.toString());
-
-        assertThat(server, equalTo(N3));
-    }
-}
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltDeviceListenerTest.java b/impl/src/test/java/org/opencord/olt/impl/OltDeviceListenerTest.java
new file mode 100644
index 0000000..f8229da
--- /dev/null
+++ b/impl/src/test/java/org/opencord/olt/impl/OltDeviceListenerTest.java
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.onlab.packet.ChassisId;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.AnnotationKeys;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.device.DeviceEvent;
+import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.provider.ProviderId;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.opencord.sadis.UniTagInformation;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class OltDeviceListenerTest extends OltTestHelpers {
+    private Olt olt;
+    private Olt.OltDeviceListener oltDeviceListener;
+
+    private final DeviceId deviceId = DeviceId.deviceId("test-device");
+    private final Device testDevice = new DefaultDevice(ProviderId.NONE, deviceId, Device.Type.OLT,
+            "testManufacturer", "1.0", "1.0", "SN", new ChassisId(1));
+
+    @Before
+    public void setUp() {
+        olt = new Olt();
+        olt.eventsQueues = new HashMap<>();
+        olt.mastershipService = Mockito.mock(MastershipService.class);
+        olt.oltDeviceService = Mockito.mock(OltDeviceService.class);
+        olt.oltFlowService = Mockito.mock(OltFlowService.class);
+        olt.oltMeterService = Mockito.mock(OltMeterService.class);
+        olt.deviceService = Mockito.mock(DeviceService.class);
+        olt.leadershipService = Mockito.mock(LeadershipService.class);
+        olt.clusterService = Mockito.mock(ClusterService.class);
+        olt.subsService = Mockito.mock(BaseInformationService.class);
+
+        Olt.OltDeviceListener baseClass = olt.deviceListener;
+        baseClass.eventExecutor = Mockito.mock(ExecutorService.class);
+        oltDeviceListener = Mockito.spy(baseClass);
+
+        // mock the executor so it immediately invokes the method
+        doAnswer(new Answer<Object>() {
+            public Object answer(InvocationOnMock invocation) throws Exception {
+                ((Runnable) invocation.getArguments()[0]).run();
+                return null;
+            }
+        }).when(baseClass.eventExecutor).execute(any(Runnable.class));
+
+        olt.eventsQueues.forEach((cp, q) -> q.clear());
+    }
+
+    @Test
+    public void testDeviceDisconnection() {
+        doReturn(true).when(olt.oltDeviceService).isOlt(testDevice);
+        doReturn(false).when(olt.deviceService).isAvailable(any());
+        doReturn(new LinkedList<Port>()).when(olt.deviceService).getPorts(any());
+        doReturn(true).when(olt.oltDeviceService).isLocalLeader(any());
+
+        DeviceEvent disconnect = new DeviceEvent(DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED, testDevice, null);
+        oltDeviceListener.event(disconnect);
+
+        verify(olt.oltFlowService, times(1)).purgeDeviceFlows(testDevice.id());
+        verify(olt.oltMeterService, times(1)).purgeDeviceMeters(testDevice.id());
+    }
+
+    @Test
+    public void testPortEventOwnership() {
+        // make sure that we ignore events for devices that are not local to this node
+
+        // make sure the device is recognized as an OLT and the port is not an NNI
+        doReturn(true).when(olt.oltDeviceService).isOlt(testDevice);
+        doReturn(false).when(olt.oltDeviceService).isNniPort(eq(testDevice), any());
+
+        // make sure we're not leaders of the device
+        doReturn(false).when(olt.oltDeviceService).isLocalLeader(any());
+
+        // this is a new port, should not create an entry in the queue
+        // we're not owners of the device
+        Port uniUpdateEnabled = new OltPort(testDevice, true, PortNumber.portNumber(16),
+                DefaultAnnotations.builder().set(AnnotationKeys.PORT_NAME, "uni-1").build());
+        DeviceEvent uniUpdateEnabledEvent =
+                new DeviceEvent(DeviceEvent.Type.PORT_UPDATED, testDevice, uniUpdateEnabled);
+        oltDeviceListener.event(uniUpdateEnabledEvent);
+
+        // the queue won't even be created
+        assert olt.eventsQueues.isEmpty();
+    }
+
+    @Test
+    public void testNniEvent() throws InterruptedException {
+        // make sure the device is recognized as an OLT and the port is recognized as an NNI,
+        // and we're local leaders
+        doReturn(true).when(olt.oltDeviceService).isOlt(testDevice);
+        doReturn(true).when(olt.oltDeviceService).isNniPort(eq(testDevice), any());
+        doReturn(true).when(olt.oltDeviceService).isLocalLeader(any());
+
+        Port enabledNniPort = new OltPort(testDevice, true, PortNumber.portNumber(1048576),
+                DefaultAnnotations.builder().set(AnnotationKeys.PORT_NAME, "nni-1").build());
+        DeviceEvent nniEnabledEvent = new DeviceEvent(DeviceEvent.Type.PORT_ADDED, testDevice, enabledNniPort);
+        oltDeviceListener.event(nniEnabledEvent);
+
+        // NNI events are straight forward, we can provision the flows directly
+        assert olt.eventsQueues.isEmpty();
+        verify(olt.oltFlowService, times(1))
+                .handleNniFlows(testDevice, enabledNniPort, OltFlowService.FlowOperation.ADD);
+
+        Port disabledNniPort = new OltPort(testDevice, false, PortNumber.portNumber(1048576),
+                DefaultAnnotations.builder().set(AnnotationKeys.PORT_NAME, "nni-1").build());
+        DeviceEvent nniDisabledEvent = new DeviceEvent(DeviceEvent.Type.PORT_UPDATED, testDevice, disabledNniPort);
+        oltDeviceListener.event(nniDisabledEvent);
+
+        assert olt.eventsQueues.isEmpty();
+        verify(olt.oltFlowService, times(1))
+                .handleNniFlows(testDevice, disabledNniPort, OltFlowService.FlowOperation.REMOVE);
+
+        // when we disable the device we receive a PORT_REMOVED event with status ENABLED
+        // make sure we're removing the flows correctly
+        DeviceEvent nniRemoveEvent = new DeviceEvent(DeviceEvent.Type.PORT_REMOVED, testDevice, enabledNniPort);
+        oltDeviceListener.event(nniRemoveEvent);
+
+        assert olt.eventsQueues.isEmpty();
+        verify(olt.oltFlowService, times(1))
+                .handleNniFlows(testDevice, enabledNniPort, OltFlowService.FlowOperation.REMOVE);
+    }
+
+    @Test
+    public void testUniEvents() {
+        DiscoveredSubscriber sub;
+        // there are few cases we need to test in the UNI port case:
+        // - [X] UNI port added in disabled state
+        // - [X] UNI port added in disabled state (with default EAPOL installed)
+        // - UNI port added in enabled state
+        // - [X] UNI port updated to enabled state
+        // - UNI port updated to disabled state
+        // - UNI port removed (assumes it's disabled state)
+
+        // make sure the device is recognized as an OLT, the port is not an NNI,
+        // and we're local masters
+        doReturn(true).when(olt.oltDeviceService).isOlt(testDevice);
+        doReturn(false).when(olt.oltDeviceService).isNniPort(eq(testDevice), any());
+        doReturn(true).when(olt.oltDeviceService).isLocalLeader(any());
+
+        PortNumber uniPortNumber = PortNumber.portNumber(16);
+        Port uniAddedDisabled = new OltPort(testDevice, false, uniPortNumber,
+                DefaultAnnotations.builder().set(AnnotationKeys.PORT_NAME, "uni-1").build());
+        DeviceEvent uniAddedDisabledEvent = new DeviceEvent(DeviceEvent.Type.PORT_ADDED, testDevice, uniAddedDisabled);
+        ConnectPoint cp = new ConnectPoint(testDevice.id(), uniPortNumber);
+
+        // if the port does not have default EAPOL we should not generate an event
+        oltDeviceListener.event(uniAddedDisabledEvent);
+
+        // no event == no queue is created
+        assert olt.eventsQueues.isEmpty();
+
+        // if the port has default EAPOL then create an entry in the queue to remove it
+        doReturn(true).when(olt.oltFlowService)
+                .hasDefaultEapol(uniAddedDisabled);
+        // create empty service for testing
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+        UniTagInformation empty = new UniTagInformation.Builder().build();
+        uniTagInformationList.add(empty);
+
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+        doReturn(si).when(olt.subsService).get("uni-1");
+
+        oltDeviceListener.event(uniAddedDisabledEvent);
+        LinkedBlockingQueue<DiscoveredSubscriber> q = olt.eventsQueues.get(cp);
+        assert !q.isEmpty();
+        sub = q.poll();
+        assert !sub.hasSubscriber; // this is not a provision subscriber call
+        assert sub.device.equals(testDevice);
+        assert sub.port.equals(uniAddedDisabled);
+        assert sub.status.equals(DiscoveredSubscriber.Status.REMOVED); // we need to remove flows for this port (if any)
+        assert q.isEmpty(); // the queue is now empty
+
+        Port uniUpdateEnabled = new OltPort(testDevice, true, PortNumber.portNumber(16),
+                DefaultAnnotations.builder().set(AnnotationKeys.PORT_NAME, "uni-1").build());
+        DeviceEvent uniUpdateEnabledEvent =
+                new DeviceEvent(DeviceEvent.Type.PORT_UPDATED, testDevice, uniUpdateEnabled);
+        oltDeviceListener.event(uniUpdateEnabledEvent);
+
+        assert !q.isEmpty();
+        sub = q.poll();
+        assert !sub.hasSubscriber; // this is not a provision subscriber call
+        assert sub.device.equals(testDevice);
+        assert sub.port.equals(uniUpdateEnabled);
+        assert sub.status.equals(DiscoveredSubscriber.Status.ADDED); // we need to remove flows for this port (if any)
+        assert q.isEmpty(); // the queue is now empty
+    }
+}
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltDeviceServiceTest.java b/impl/src/test/java/org/opencord/olt/impl/OltDeviceServiceTest.java
new file mode 100644
index 0000000..5623896
--- /dev/null
+++ b/impl/src/test/java/org/opencord/olt/impl/OltDeviceServiceTest.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.Leader;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
+import org.opencord.sadis.SadisService;
+
+import java.util.LinkedList;
+
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+
+public class OltDeviceServiceTest {
+    OltDeviceService component;
+    private OltDeviceService oltDeviceService;
+
+    @Before
+    public void setUp() {
+        component = new OltDeviceService();
+        component.mastershipService = Mockito.mock(MastershipService.class);
+        component.deviceService = Mockito.mock(DeviceService.class);
+        component.leadershipService = Mockito.mock(LeadershipService.class);
+        component.clusterService = Mockito.mock(ClusterService.class);
+        component.sadisService = Mockito.mock(SadisService.class);
+        component.activate();
+
+        oltDeviceService = Mockito.spy(component);
+
+
+    }
+
+    @Test
+    public void testIsLocalLeader() {
+
+        NodeId nodeId = NodeId.nodeId("node1");
+        ControllerNode localNode = new DefaultControllerNode(nodeId, "host1");
+        DeviceId deviceId1 = DeviceId.deviceId("availableNotLocal");
+        DeviceId deviceId2 = DeviceId.deviceId("notAvailableButLocal");
+        Leadership leadership = new Leadership(deviceId2.toString(), new Leader(nodeId, 0, 0), new LinkedList<>());
+
+        doReturn(true).when(oltDeviceService.deviceService).isAvailable(eq(deviceId1));
+        doReturn(false).when(oltDeviceService.mastershipService).isLocalMaster(eq(deviceId1));
+        Assert.assertFalse(oltDeviceService.isLocalLeader(deviceId1));
+
+        doReturn(false).when(oltDeviceService.deviceService).isAvailable(eq(deviceId1));
+        doReturn(localNode).when(oltDeviceService.clusterService).getLocalNode();
+        doReturn(leadership).when(oltDeviceService.leadershipService).runForLeadership(eq(deviceId2.toString()));
+        Assert.assertTrue(oltDeviceService.isLocalLeader(deviceId2));
+
+    }
+}
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java b/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java
new file mode 100644
index 0000000..c9bad5f
--- /dev/null
+++ b/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java
@@ -0,0 +1,669 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.onlab.packet.ChassisId;
+import org.onlab.packet.EthType;
+import org.onlab.packet.IPv4;
+import org.onlab.packet.IPv6;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.TpPort;
+import org.onlab.packet.VlanId;
+import org.onosproject.cfg.ComponentConfigAdapter;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreServiceAdapter;
+import org.onosproject.core.DefaultApplicationId;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DefaultAnnotations;
+import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.DefaultHost;
+import org.onosproject.net.DefaultPort;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Host;
+import org.onosproject.net.HostId;
+import org.onosproject.net.HostLocation;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.flow.DefaultFlowRule;
+import org.onosproject.net.flow.DefaultTrafficTreatment;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleEvent;
+import org.onosproject.net.flow.FlowRuleService;
+import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flowobjective.DefaultFilteringObjective;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.host.HostService;
+import org.onosproject.net.meter.MeterId;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.store.service.TestStorageService;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.opencord.sadis.UniTagInformation;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.onosproject.net.AnnotationKeys.PORT_NAME;
+import static org.opencord.olt.impl.OltFlowService.OltFlowsStatus.NONE;
+import static org.opencord.olt.impl.OltFlowService.OltFlowsStatus.ADDED;
+import static org.opencord.olt.impl.OltFlowService.OltFlowsStatus.PENDING_ADD;
+import static org.opencord.olt.impl.OltFlowService.OltFlowsStatus.REMOVED;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_BP_ID_DEFAULT;
+
+public class OltFlowServiceTest extends OltTestHelpers {
+
+    private OltFlowService oltFlowService;
+    OltFlowService.InternalFlowListener internalFlowListener;
+    private final ApplicationId testAppId = new DefaultApplicationId(1, "org.opencord.olt.test");
+    private final short eapolDefaultVlan = 4091;
+
+    private final DeviceId deviceId = DeviceId.deviceId("test-device");
+    private final Device testDevice = new DefaultDevice(ProviderId.NONE, deviceId, Device.Type.OLT,
+            "testManufacturer", "1.0", "1.0", "SN", new ChassisId(1));
+    Port nniPort = new OltPort(testDevice, true, PortNumber.portNumber(1048576),
+            DefaultAnnotations.builder().set(PORT_NAME, "nni-1").build());
+    Port nniPortDisabled = new OltPort(testDevice, false, PortNumber.portNumber(1048576),
+            DefaultAnnotations.builder().set(PORT_NAME, "nni-1").build());
+    Port uniUpdateEnabled = new OltPort(testDevice, true, PortNumber.portNumber(16),
+            DefaultAnnotations.builder().set(PORT_NAME, "uni-1").build());
+
+    @Before
+    public void setUp() {
+        oltFlowService = new OltFlowService();
+        oltFlowService.cfgService = new ComponentConfigAdapter();
+        oltFlowService.sadisService = Mockito.mock(SadisService.class);
+        oltFlowService.coreService = Mockito.spy(new CoreServiceAdapter());
+        oltFlowService.oltMeterService = Mockito.mock(OltMeterService.class);
+        oltFlowService.flowObjectiveService = Mockito.mock(FlowObjectiveService.class);
+        oltFlowService.hostService = Mockito.mock(HostService.class);
+        oltFlowService.flowRuleService = Mockito.mock(FlowRuleService.class);
+        oltFlowService.storageService = new TestStorageService();
+        oltFlowService.oltDeviceService = Mockito.mock(OltDeviceService.class);
+        oltFlowService.appId = testAppId;
+
+        doReturn(Mockito.mock(BaseInformationService.class))
+                .when(oltFlowService.sadisService).getSubscriberInfoService();
+        doReturn(testAppId).when(oltFlowService.coreService).registerApplication("org.opencord.olt");
+        oltFlowService.activate(null);
+        oltFlowService.bindSadisService(oltFlowService.sadisService);
+
+        internalFlowListener = spy(oltFlowService.internalFlowListener);
+    }
+
+    @After
+    public void tearDown() {
+        oltFlowService.deactivate(null);
+    }
+
+    @Test
+    public void testUpdateConnectPointStatus() {
+
+        DeviceId deviceId = DeviceId.deviceId("test-device");
+        ProviderId pid = new ProviderId("of", "foo");
+        Device device =
+                new DefaultDevice(pid, deviceId, Device.Type.OLT, "", "", "", "", null);
+        Port port1 = new DefaultPort(device, PortNumber.portNumber(1), true,
+                DefaultAnnotations.builder().set(PORT_NAME, "port-1").build());
+        Port port2 = new DefaultPort(device, PortNumber.portNumber(2), true,
+                DefaultAnnotations.builder().set(PORT_NAME, "port-2").build());
+        Port port3 = new DefaultPort(device, PortNumber.portNumber(3), true,
+                DefaultAnnotations.builder().set(PORT_NAME, "port-3").build());
+
+        ServiceKey sk1 = new ServiceKey(new AccessDevicePort(port1), new UniTagInformation());
+        ServiceKey sk2 = new ServiceKey(new AccessDevicePort(port2), new UniTagInformation());
+        ServiceKey sk3 = new ServiceKey(new AccessDevicePort(port3), new UniTagInformation());
+
+        // cpStatus map for the test
+        oltFlowService.cpStatus = oltFlowService.storageService.
+                <ServiceKey, OltPortStatus>consistentMapBuilder().build().asJavaMap();
+        OltPortStatus cp1Status = new OltPortStatus(PENDING_ADD, NONE, NONE);
+        oltFlowService.cpStatus.put(sk1, cp1Status);
+
+        //check that we only update the provided value
+        oltFlowService.updateConnectPointStatus(sk1, ADDED, null, null);
+        OltPortStatus updated = oltFlowService.cpStatus.get(sk1);
+        Assert.assertEquals(ADDED, updated.defaultEapolStatus);
+        Assert.assertEquals(NONE, updated.subscriberFlowsStatus);
+        Assert.assertEquals(NONE, updated.dhcpStatus);
+
+        // check that it creates an entry if it does not exist
+        oltFlowService.updateConnectPointStatus(sk2, PENDING_ADD, NONE, NONE);
+        Assert.assertNotNull(oltFlowService.cpStatus.get(sk2));
+
+        // check that if we create a new entry with null values they're converted to NONE
+        oltFlowService.updateConnectPointStatus(sk3, null, null, null);
+        updated = oltFlowService.cpStatus.get(sk3);
+        Assert.assertEquals(NONE, updated.defaultEapolStatus);
+        Assert.assertEquals(NONE, updated.subscriberFlowsStatus);
+        Assert.assertEquals(NONE, updated.dhcpStatus);
+    }
+
+    @Test
+    public void testHasDefaultEapol() {
+        DeviceId deviceId = DeviceId.deviceId("test-device");
+        ProviderId pid = new ProviderId("of", "foo");
+
+        Device device =
+                new DefaultDevice(pid, deviceId, Device.Type.OLT, "", "", "", "", null);
+
+        Port port = new DefaultPort(device, PortNumber.portNumber(16), true,
+                                    DefaultAnnotations.builder().set(PORT_NAME, "name-1").build());
+        ServiceKey skWithStatus = new ServiceKey(new AccessDevicePort(port),
+                oltFlowService.defaultEapolUniTag);
+
+        Port port17 = new DefaultPort(device, PortNumber.portNumber(17), true,
+                                      DefaultAnnotations.builder().set(PORT_NAME, "name-1").build());
+
+        OltPortStatus portStatusAdded = new OltPortStatus(
+                OltFlowService.OltFlowsStatus.ADDED,
+                NONE,
+                null
+        );
+
+        OltPortStatus portStatusRemoved = new OltPortStatus(
+                REMOVED,
+                NONE,
+                null
+        );
+
+        oltFlowService.cpStatus.put(skWithStatus, portStatusAdded);
+        Assert.assertTrue(oltFlowService.hasDefaultEapol(port));
+
+        oltFlowService.cpStatus.put(skWithStatus, portStatusRemoved);
+
+        Assert.assertFalse(oltFlowService.hasDefaultEapol(port17));
+    }
+
+    @Test
+    public void testHasSubscriberFlows() {
+        // TODO test with multiple services
+        DeviceId deviceId = DeviceId.deviceId("test-device");
+        ProviderId pid = new ProviderId("of", "foo");
+
+        Device device =
+                new DefaultDevice(pid, deviceId, Device.Type.OLT, "", "", "", "", null);
+
+        Port port = new DefaultPort(device, PortNumber.portNumber(16), true,
+                DefaultAnnotations.builder().set(PORT_NAME, "name-1").build());
+
+        UniTagInformation uti = new UniTagInformation.Builder().setServiceName("test").build();
+        ServiceKey skWithStatus = new ServiceKey(new AccessDevicePort(port),
+                uti);
+
+        OltPortStatus withDefaultEapol = new OltPortStatus(
+                ADDED,
+                NONE,
+                NONE
+        );
+
+        OltPortStatus withDhcp = new OltPortStatus(
+                REMOVED,
+                NONE,
+                ADDED
+        );
+
+        OltPortStatus withSubFlow = new OltPortStatus(
+                REMOVED,
+                ADDED,
+                ADDED
+        );
+
+        oltFlowService.cpStatus.put(skWithStatus, withDefaultEapol);
+        Assert.assertFalse(oltFlowService.hasSubscriberFlows(port, uti));
+
+        oltFlowService.cpStatus.put(skWithStatus, withDhcp);
+        Assert.assertTrue(oltFlowService.hasDhcpFlows(port, uti));
+
+        oltFlowService.cpStatus.put(skWithStatus, withSubFlow);
+        Assert.assertTrue(oltFlowService.hasSubscriberFlows(port, uti));
+    }
+
+    @Test
+    public void testHandleBasicPortFlowsNoEapol() throws Exception {
+        oltFlowService.enableEapol = false;
+        // create empty service for testing
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+        UniTagInformation empty = new UniTagInformation.Builder().build();
+        uniTagInformationList.add(empty);
+
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+
+        final DiscoveredSubscriber addedSub =
+                new DiscoveredSubscriber(testDevice,
+                                         uniUpdateEnabled, DiscoveredSubscriber.Status.ADDED,
+                                         false, si);
+        oltFlowService.handleBasicPortFlows(addedSub, DEFAULT_BP_ID_DEFAULT, DEFAULT_BP_ID_DEFAULT);
+        // if eapol is not enabled there's nothing we need to do,
+        // so make sure we don't even call sadis
+        verify(oltFlowService.subsService, never()).get(any());
+    }
+
+    @Test
+    public void testHandleBasicPortFlowsWithEapolNoMeter() throws Exception {
+        // create empty service for testing
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+        UniTagInformation empty = new UniTagInformation.Builder().build();
+        uniTagInformationList.add(empty);
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+        final DiscoveredSubscriber addedSub =
+                new DiscoveredSubscriber(testDevice,
+                                         uniUpdateEnabled, DiscoveredSubscriber.Status.ADDED,
+                                         false, si);
+        // whether the meter is pending or not is up to the createMeter method to handle
+        // we just don't proceed with the subscriber till it's ready
+        doReturn(false).when(oltFlowService.oltMeterService)
+                .createMeter(addedSub.device.id(), DEFAULT_BP_ID_DEFAULT);
+        boolean res = oltFlowService.handleBasicPortFlows(addedSub, DEFAULT_BP_ID_DEFAULT, DEFAULT_BP_ID_DEFAULT);
+
+        Assert.assertFalse(res);
+
+        // we do not create flows
+        verify(oltFlowService.flowObjectiveService, never())
+                .filter(eq(addedSub.device.id()), any());
+    }
+
+    @Test
+    public void testHandleBasicPortFlowsWithEapolAddedMeter() throws Exception {
+        // create empty service for testing
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+        UniTagInformation empty = new UniTagInformation.Builder().build();
+        uniTagInformationList.add(empty);
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+        final DiscoveredSubscriber addedSub =
+                new DiscoveredSubscriber(testDevice,
+                                         uniUpdateEnabled, DiscoveredSubscriber.Status.ADDED,
+                                         false, si);
+        // this is the happy case, we have the meter so we check that the default EAPOL flow
+        // is installed
+        doReturn(true).when(oltFlowService.oltMeterService)
+                .createMeter(deviceId, DEFAULT_BP_ID_DEFAULT);
+        doReturn(true).when(oltFlowService.oltMeterService)
+                .hasMeterByBandwidthProfile(deviceId, DEFAULT_BP_ID_DEFAULT);
+        doReturn(MeterId.meterId(1)).when(oltFlowService.oltMeterService)
+                .getMeterIdForBandwidthProfile(deviceId, DEFAULT_BP_ID_DEFAULT);
+
+        FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
+                .permit()
+                .withKey(Criteria.matchInPort(uniUpdateEnabled.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.EAPOL.ethType()))
+                .fromApp(testAppId)
+                .withPriority(10000)
+                .withMeta(
+                        DefaultTrafficTreatment.builder()
+                                .meter(MeterId.meterId(1))
+                                .writeMetadata(oltFlowService.createTechProfValueForWriteMetadata(
+                                        VlanId.vlanId(eapolDefaultVlan),
+                                        oltFlowService.defaultTechProfileId, MeterId.meterId(1)), 0)
+                                .setOutput(PortNumber.CONTROLLER)
+                                .pushVlan()
+                                .setVlanId(VlanId.vlanId(eapolDefaultVlan)).build()
+                )
+                .add();
+
+
+        oltFlowService.handleBasicPortFlows(addedSub, DEFAULT_BP_ID_DEFAULT, DEFAULT_BP_ID_DEFAULT);
+
+        // we check for an existing meter (present)
+        // FIXME understand why the above test invokes this call and this one doesn't
+//        verify(oltFlowService.oltMeterService, times(1))
+//                .hasMeterByBandwidthProfile(eq(addedSub.device.id()), eq(DEFAULT_BP_ID_DEFAULT));
+
+        // the meter exist, no need to check for PENDING or to create it
+        verify(oltFlowService.oltMeterService, never())
+                .hasPendingMeterByBandwidthProfile(eq(deviceId), eq(DEFAULT_BP_ID_DEFAULT));
+        verify(oltFlowService.oltMeterService, never())
+                .createMeterForBp(eq(deviceId), eq(DEFAULT_BP_ID_DEFAULT));
+
+        verify(oltFlowService.flowObjectiveService, times(1))
+                .filter(eq(deviceId), argThat(new FilteringObjectiveMatcher(expectedFilter)));
+    }
+
+    @Test
+    public void testHandleBasicPortFlowsRemovedSub() throws Exception {
+        // create empty service for testing
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+        UniTagInformation empty = new UniTagInformation.Builder().build();
+        uniTagInformationList.add(empty);
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+        final DiscoveredSubscriber removedSub =
+                new DiscoveredSubscriber(testDevice,
+                                         uniUpdateEnabled, DiscoveredSubscriber.Status.REMOVED,
+                                         false, si);
+        // we are testing that when a port goes down we remove the default EAPOL flow
+
+        doReturn(MeterId.meterId(1)).when(oltFlowService.oltMeterService)
+                .getMeterIdForBandwidthProfile(deviceId, DEFAULT_BP_ID_DEFAULT);
+
+        FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
+                .deny()
+                .withKey(Criteria.matchInPort(uniUpdateEnabled.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.EAPOL.ethType()))
+                .fromApp(testAppId)
+                .withPriority(10000)
+                .withMeta(
+                        DefaultTrafficTreatment.builder()
+                                .meter(MeterId.meterId(1))
+                                .writeMetadata(oltFlowService.createTechProfValueForWriteMetadata(
+                                        VlanId.vlanId(eapolDefaultVlan),
+                                        oltFlowService.defaultTechProfileId, MeterId.meterId(1)), 0)
+                                .setOutput(PortNumber.CONTROLLER)
+                                .pushVlan()
+                                .setVlanId(VlanId.vlanId(eapolDefaultVlan)).build()
+                )
+                .add();
+
+        oltFlowService.handleBasicPortFlows(removedSub, DEFAULT_BP_ID_DEFAULT, DEFAULT_BP_ID_DEFAULT);
+
+        verify(oltFlowService.flowObjectiveService, times(1))
+                .filter(eq(deviceId), argThat(new FilteringObjectiveMatcher(expectedFilter)));
+    }
+
+    @Test
+    public void testHandleNniFlowsOnlyLldp() {
+        oltFlowService.enableDhcpOnNni = false;
+        oltFlowService.handleNniFlows(testDevice, nniPort, OltFlowService.FlowOperation.ADD);
+
+        FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
+                .permit()
+                .withKey(Criteria.matchInPort(nniPort.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.LLDP.ethType()))
+                .fromApp(testAppId)
+                .withPriority(10000)
+                .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
+                .add();
+
+        verify(oltFlowService.flowObjectiveService, times(1))
+                .filter(eq(deviceId), argThat(new FilteringObjectiveMatcher(expectedFilter)));
+        verify(oltFlowService.flowObjectiveService, times(1))
+                .filter(eq(deviceId), any());
+    }
+
+    @Test
+    public void testHandleNniFlowsDhcpV4() {
+        oltFlowService.enableDhcpOnNni = true;
+        oltFlowService.enableDhcpV4 = true;
+        oltFlowService.handleNniFlows(testDevice, nniPort, OltFlowService.FlowOperation.ADD);
+
+        FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
+                .permit()
+                .withKey(Criteria.matchInPort(nniPort.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
+                .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_UDP))
+                .addCondition(Criteria.matchUdpSrc(TpPort.tpPort(67)))
+                .addCondition(Criteria.matchUdpDst(TpPort.tpPort(68)))
+                .fromApp(testAppId)
+                .withPriority(10000)
+                .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
+                .add();
+
+        // invoked with the correct DHCP filtering objective
+        verify(oltFlowService.flowObjectiveService, times(1))
+                .filter(eq(deviceId), argThat(new FilteringObjectiveMatcher(expectedFilter)));
+        // invoked only twice, LLDP and DHCP
+        verify(oltFlowService.flowObjectiveService, times(2))
+                .filter(eq(deviceId), any());
+    }
+
+    @Test
+    public void testRemoveNniFlowsDhcpV4() {
+        oltFlowService.enableDhcpOnNni = true;
+        oltFlowService.enableDhcpV4 = true;
+        oltFlowService.handleNniFlows(testDevice, nniPortDisabled, OltFlowService.FlowOperation.REMOVE);
+
+        FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
+                .deny()
+                .withKey(Criteria.matchInPort(nniPort.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
+                .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_UDP))
+                .addCondition(Criteria.matchUdpSrc(TpPort.tpPort(67)))
+                .addCondition(Criteria.matchUdpDst(TpPort.tpPort(68)))
+                .fromApp(testAppId)
+                .withPriority(10000)
+                .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
+                .add();
+
+        // invoked with the correct DHCP filtering objective
+        verify(oltFlowService.flowObjectiveService, times(1))
+                .filter(eq(deviceId), argThat(new FilteringObjectiveMatcher(expectedFilter)));
+        // invoked only twice, LLDP and DHCP
+        verify(oltFlowService.flowObjectiveService, times(2))
+                .filter(eq(deviceId), any());
+    }
+
+    @Test
+    public void testHandleNniFlowsDhcpV6() {
+        oltFlowService.enableDhcpOnNni = true;
+        oltFlowService.enableDhcpV4 = false;
+        oltFlowService.enableDhcpV6 = true;
+        oltFlowService.handleNniFlows(testDevice, nniPort, OltFlowService.FlowOperation.ADD);
+
+        FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
+                .permit()
+                .withKey(Criteria.matchInPort(nniPort.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.IPV6.ethType()))
+                .addCondition(Criteria.matchIPProtocol(IPv6.PROTOCOL_UDP))
+                .addCondition(Criteria.matchUdpSrc(TpPort.tpPort(546)))
+                .addCondition(Criteria.matchUdpDst(TpPort.tpPort(547)))
+                .fromApp(testAppId)
+                .withPriority(10000)
+                .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
+                .add();
+
+        // invoked with the correct DHCP filtering objective
+        verify(oltFlowService.flowObjectiveService, times(1))
+                .filter(eq(deviceId), argThat(new FilteringObjectiveMatcher(expectedFilter)));
+        // invoked only twice, LLDP and DHCP
+        verify(oltFlowService.flowObjectiveService, times(2))
+                .filter(eq(deviceId), any());
+    }
+
+    @Test
+    public void testHandleNniFlowsIgmp() {
+        oltFlowService.enableDhcpOnNni = false;
+        oltFlowService.enableIgmpOnNni = true;
+        oltFlowService.handleNniFlows(testDevice, nniPort, OltFlowService.FlowOperation.ADD);
+
+        FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
+                .permit()
+                .withKey(Criteria.matchInPort(nniPort.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
+                .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_IGMP))
+                .fromApp(testAppId)
+                .withPriority(10000)
+                .add();
+
+        // invoked with the correct DHCP filtering objective
+        verify(oltFlowService.flowObjectiveService, times(1))
+                .filter(eq(deviceId), argThat(new FilteringObjectiveMatcher(expectedFilter)));
+        // invoked only twice, LLDP and DHCP
+        verify(oltFlowService.flowObjectiveService, times(2))
+                .filter(eq(deviceId), any());
+    }
+
+    @Test
+    public void testMacAddressNotRequired() {
+        // create a single service that doesn't require mac address
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+        UniTagInformation hsia = new UniTagInformation.Builder()
+                .setEnableMacLearning(false)
+                .build();
+        uniTagInformationList.add(hsia);
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+
+        boolean isMacAvailable = oltFlowService.isMacAddressAvailable(testDevice.id(), uniUpdateEnabled, si);
+        // we return true as we don't care wether it's available or not
+        Assert.assertTrue(isMacAvailable);
+    }
+
+    @Test
+    public void testIsMacAddressAvailableViaMacLearning() {
+
+        // create a single service that requires macLearning to be enabled
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+        VlanId hsiaCtag = VlanId.vlanId((short) 11);
+        UniTagInformation hsia = new UniTagInformation.Builder()
+                .setPonCTag(hsiaCtag)
+                .setEnableMacLearning(true).build();
+        uniTagInformationList.add(hsia);
+
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+
+        // with no hosts discovered, return false
+        boolean isMacAvailable = oltFlowService.isMacAddressAvailable(testDevice.id(), uniUpdateEnabled, si);
+        Assert.assertFalse(isMacAvailable);
+
+        // with a discovered host, return true
+        Host fakeHost = new DefaultHost(ProviderId.NONE, HostId.hostId(MacAddress.NONE), MacAddress.ZERO,
+                hsiaCtag, HostLocation.NONE, new HashSet<>(), DefaultAnnotations.builder().build());
+        Set<Host> hosts = new HashSet<>(Arrays.asList(fakeHost));
+        doReturn(hosts).when(oltFlowService.hostService).getConnectedHosts((ConnectPoint) any());
+
+        isMacAvailable = oltFlowService.isMacAddressAvailable(testDevice.id(), uniUpdateEnabled, si);
+        Assert.assertTrue(isMacAvailable);
+    }
+
+    @Test
+    public void testIsMacAddressAvailableViaConfiguration() {
+        // create a single service that has a macAddress configured
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+        UniTagInformation hsia = new UniTagInformation.Builder()
+                .setConfiguredMacAddress("2e:0a:00:01:00:00")
+                .build();
+        uniTagInformationList.add(hsia);
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+
+        boolean isMacAvailable = oltFlowService.isMacAddressAvailable(testDevice.id(), uniUpdateEnabled, si);
+        Assert.assertTrue(isMacAvailable);
+    }
+
+    @Test
+    public void testHandleSubscriberDhcpFlowsAdd() {
+
+        String usBp = "usBp";
+        String usOltBp = "usOltBp";
+        oltFlowService.enableDhcpV4 = true;
+
+        // create two services, one requires DHCP the other doesn't
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+        VlanId hsiaCtag = VlanId.vlanId((short) 11);
+        UniTagInformation hsia = new UniTagInformation.Builder()
+                .setPonCTag(hsiaCtag)
+                .setTechnologyProfileId(64)
+                .setUniTagMatch(VlanId.vlanId(VlanId.NO_VID))
+                .setUpstreamBandwidthProfile(usBp)
+                .setUpstreamOltBandwidthProfile(usOltBp)
+                .setIsDhcpRequired(true).build();
+        UniTagInformation mc = new UniTagInformation.Builder()
+                .setIsDhcpRequired(false).build();
+        uniTagInformationList.add(hsia);
+        uniTagInformationList.add(mc);
+
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+
+        final DiscoveredSubscriber addedSub =
+                new DiscoveredSubscriber(testDevice,
+                                         uniUpdateEnabled, DiscoveredSubscriber.Status.ADDED,
+                                         false, si);
+
+        // return meter IDs
+        doReturn(MeterId.meterId(2)).when(oltFlowService.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), usBp);
+        doReturn(MeterId.meterId(3)).when(oltFlowService.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), usOltBp);
+
+        // TODO improve the matches on the filter
+        FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
+                .permit()
+                .withKey(Criteria.matchInPort(addedSub.port.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
+                .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_UDP))
+                .addCondition(Criteria.matchUdpSrc(TpPort.tpPort(68)))
+                .addCondition(Criteria.matchUdpDst(TpPort.tpPort(67)))
+                .fromApp(testAppId)
+                .withPriority(10000)
+                .add();
+
+        oltFlowService.handleSubscriberDhcpFlows(addedSub.device.id(), addedSub.port,
+                OltFlowService.FlowOperation.ADD, si);
+        verify(oltFlowService.flowObjectiveService, times(1))
+                .filter(eq(addedSub.device.id()), argThat(new FilteringObjectiveMatcher(expectedFilter)));
+    }
+
+    @Test
+    public void testInternalFlowListenerNotMaster() {
+        doReturn(false).when(oltFlowService.oltDeviceService).isLocalLeader(any());
+
+        FlowRule flowRule = DefaultFlowRule.builder()
+                .forDevice(DeviceId.deviceId("foo"))
+                .fromApp(testAppId)
+                .makePermanent()
+                .withPriority(1000)
+                .build();
+        FlowRuleEvent event = new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED,
+                flowRule);
+
+        internalFlowListener.event(event);
+
+        // if we're not master of the device, we should not update
+        verify(internalFlowListener, never()).updateCpStatus(any(), any(), any());
+    }
+
+    @Test
+    public void testInternalFlowListenerDifferentApp() {
+        ApplicationId someAppId = new DefaultApplicationId(1, "org.opencord.olt.not-test");
+        FlowRule flowRule = DefaultFlowRule.builder()
+                .forDevice(DeviceId.deviceId("foo"))
+                .fromApp(someAppId)
+                .makePermanent()
+                .withPriority(1000)
+                .build();
+        FlowRuleEvent event = new FlowRuleEvent(FlowRuleEvent.Type.RULE_ADDED,
+                flowRule);
+
+        internalFlowListener.event(event);
+
+        // if we're not master of the device, we should not update
+        verify(internalFlowListener, never()).updateCpStatus(any(), any(), any());
+    }
+}
\ No newline at end of file
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltFlowTest.java b/impl/src/test/java/org/opencord/olt/impl/OltFlowTest.java
deleted file mode 100644
index 752e44d..0000000
--- a/impl/src/test/java/org/opencord/olt/impl/OltFlowTest.java
+++ /dev/null
@@ -1,574 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.opencord.olt.impl;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
-import com.google.common.collect.Maps;
-import org.apache.commons.lang3.tuple.Pair;
-import org.junit.Before;
-import org.junit.Test;
-import org.onlab.packet.EthType;
-import org.onlab.packet.MacAddress;
-import org.onlab.packet.VlanId;
-import org.onosproject.cluster.NodeId;
-import org.onosproject.cluster.RoleInfo;
-import org.onosproject.mastership.MastershipInfo;
-import org.onosproject.mastership.MastershipListener;
-import org.onosproject.net.AnnotationKeys;
-import org.onosproject.net.DefaultAnnotations;
-import org.onosproject.net.DefaultPort;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.MastershipRole;
-import org.onosproject.net.PortNumber;
-import org.onosproject.net.flow.TrafficSelector;
-import org.onosproject.net.flow.TrafficTreatment;
-import org.onosproject.net.flow.criteria.Criterion;
-import org.onosproject.net.flow.criteria.EthTypeCriterion;
-import org.onosproject.net.flow.criteria.PortCriterion;
-import org.onosproject.net.flow.criteria.VlanIdCriterion;
-import org.onosproject.net.flow.instructions.Instruction;
-import org.onosproject.net.flow.instructions.Instructions;
-import org.onosproject.net.flow.instructions.L2ModificationInstruction;
-import org.onosproject.net.flowobjective.FilteringObjQueueKey;
-import org.onosproject.net.flowobjective.FilteringObjective;
-import org.onosproject.net.flowobjective.ForwardingObjQueueKey;
-import org.onosproject.net.flowobjective.ForwardingObjective;
-import org.onosproject.net.flowobjective.NextObjQueueKey;
-import org.onosproject.net.flowobjective.NextObjective;
-import org.onosproject.net.flowobjective.Objective;
-import org.onosproject.net.meter.MeterId;
-import org.onosproject.net.meter.MeterKey;
-import org.opencord.olt.AccessDevicePort;
-import org.opencord.sadis.BandwidthProfileInformation;
-import org.opencord.sadis.UniTagInformation;
-
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.ListMultimap;
-
-public class OltFlowTest extends TestBase {
-    private OltFlowService oltFlowService;
-    AccessDevicePort uniPort1 = new AccessDevicePort(new DefaultPort(olt, PortNumber.portNumber(1), true,
-            DefaultAnnotations.builder().set(AnnotationKeys.PORT_NAME, "BBSM00010001-1").build()),
-            AccessDevicePort.Type.UNI);
-    AccessDevicePort uniPort2 = new AccessDevicePort(new DefaultPort(olt, PortNumber.portNumber(2), true,
-            DefaultAnnotations.builder().set(AnnotationKeys.PORT_NAME, "BBSM00010002-1").build()),
-            AccessDevicePort.Type.UNI);
-    AccessDevicePort nniPort = new AccessDevicePort(new DefaultPort(olt, PortNumber.portNumber(65535), true,
-            DefaultAnnotations.builder().set(AnnotationKeys.PORT_NAME, "nni-1048576").build()),
-            AccessDevicePort.Type.NNI);
-
-    MacAddress macAddress = MacAddress.valueOf("00:00:00:00:0a:0b");
-
-    UniTagInformation.Builder tagInfoBuilder = new UniTagInformation.Builder();
-    UniTagInformation uniTagInfo = tagInfoBuilder.setUniTagMatch(VlanId.vlanId((short) 35))
-            .setPonCTag(VlanId.vlanId((short) 33))
-            .setPonSTag(VlanId.vlanId((short) 7))
-            .setDsPonCTagPriority(0)
-            .setUsPonSTagPriority(0)
-            .setTechnologyProfileId(64)
-            .setDownstreamBandwidthProfile(dsBpId)
-            .setUpstreamBandwidthProfile(usBpId)
-            .setIsDhcpRequired(true)
-            .setIsIgmpRequired(true)
-            .build();
-
-    UniTagInformation.Builder tagInfoBuilderNoPcp = new UniTagInformation.Builder();
-    UniTagInformation uniTagInfoNoPcp = tagInfoBuilderNoPcp.setUniTagMatch(VlanId.vlanId((short) 35))
-            .setPonCTag(VlanId.vlanId((short) 34))
-            .setPonSTag(VlanId.vlanId((short) 7))
-            .setDsPonCTagPriority(-1)
-            .setUsPonSTagPriority(-1)
-            .setUsPonCTagPriority(-1)
-            .setDsPonSTagPriority(-1)
-            .setTechnologyProfileId(64)
-            .setDownstreamBandwidthProfile(dsBpId)
-            .setUpstreamBandwidthProfile(usBpId)
-            .setIsDhcpRequired(true)
-            .setIsIgmpRequired(true)
-            .build();
-
-    UniTagInformation.Builder tagInfoBuilder2 = new UniTagInformation.Builder();
-    UniTagInformation uniTagInfoNoDhcpNoIgmp = tagInfoBuilder2
-            .setUniTagMatch(VlanId.vlanId((short) 35))
-            .setPonCTag(VlanId.vlanId((short) 33))
-            .setPonSTag(VlanId.vlanId((short) 7))
-            .setDsPonCTagPriority(0)
-            .setUsPonSTagPriority(0)
-            .setTechnologyProfileId(64)
-            .setDownstreamBandwidthProfile(dsBpId)
-            .setUpstreamBandwidthProfile(usBpId)
-            .build();
-
-    @Before
-    public void setUp() {
-        oltFlowService = new OltFlowService();
-        oltFlowService.oltMeterService = new MockOltMeterService();
-        oltFlowService.flowObjectiveService = new MockOltFlowObjectiveService();
-        oltFlowService.mastershipService = new MockMastershipService();
-        oltFlowService.sadisService = new MockSadisService();
-        oltFlowService.bpService = oltFlowService.sadisService.getBandwidthProfileService();
-        oltFlowService.appId = appId;
-        oltFlowService.pendingEapolForDevice = Maps.newConcurrentMap();
-    }
-
-    @Test
-    public void testDhcpFiltering() {
-        System.out.println(uniPort1);
-        oltFlowService.flowObjectiveService.clearQueue();
-        // ensure upstream dhcp traps can be added and removed
-        oltFlowService.processDhcpFilteringObjectives(uniPort1,
-                usMeterId, null, uniTagInfo,
-                true, true, Optional.empty());
-        assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 1;
-        oltFlowService.processDhcpFilteringObjectives(uniPort1,
-                usMeterId, null, uniTagInfo,
-                false, true, Optional.empty());
-        assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 2;
-
-        // Ensure upstream flow has no pcp unless properly specified.
-        oltFlowService.processDhcpFilteringObjectives(uniPort2,
-                usMeterId, null, uniTagInfoNoPcp,
-                true, true, Optional.empty());
-        assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 3;
-
-        // ensure upstream flows are not added if uniTagInfo is missing dhcp requirement
-        oltFlowService.processDhcpFilteringObjectives(uniPort1,
-                usMeterId, null, uniTagInfoNoDhcpNoIgmp,
-                true, true, Optional.empty());
-        assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 3;
-
-        // ensure downstream traps don't succeed without global config for nni ports
-        oltFlowService.processDhcpFilteringObjectives(nniPort,
-                null, null, null,
-                true, false, Optional.empty());
-        oltFlowService.processDhcpFilteringObjectives(nniPort,
-                null, null, null,
-                false, false, Optional.empty());
-        assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 3;
-        // do global config for nni ports and now it should succeed
-        oltFlowService.enableDhcpOnNni = true;
-        oltFlowService.processDhcpFilteringObjectives(nniPort,
-                null, null, null,
-                true, false, Optional.empty());
-        oltFlowService.processDhcpFilteringObjectives(nniPort,
-                null, null, null,
-                false, false, Optional.empty());
-        assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 5;
-
-        // turn on DHCPv6 and we should get 2 flows
-        oltFlowService.enableDhcpV6 = true;
-        oltFlowService.processDhcpFilteringObjectives(uniPort1,
-                usMeterId, null, uniTagInfo,
-                true, true, Optional.empty());
-        assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 7;
-
-        // turn off DHCPv4 and it's only v6
-        oltFlowService.enableDhcpV4 = false;
-        oltFlowService.processDhcpFilteringObjectives(uniPort1,
-                usMeterId, null, uniTagInfo,
-                true, true, Optional.empty());
-        assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 8;
-
-        // cleanup
-        oltFlowService.flowObjectiveService.clearQueue();
-        oltFlowService.enableDhcpV4 = true;
-        oltFlowService.enableDhcpV6 = false;
-    }
-
-    @Test
-    public void testPppoedFiltering() {
-        oltFlowService.flowObjectiveService.clearQueue();
-
-        // ensure pppoed traps are not added if global config is off.
-        oltFlowService.enablePppoe = false;
-        oltFlowService.processPPPoEDFilteringObjectives(uniPort1,
-                usMeterId, null, uniTagInfo,
-                true, true);
-        assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 0;
-
-        // ensure upstream pppoed traps can be added and removed
-        oltFlowService.enablePppoe = true;
-        oltFlowService.processPPPoEDFilteringObjectives(uniPort1,
-                usMeterId, null, uniTagInfo,
-                true, true);
-        assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 1;
-        oltFlowService.processPPPoEDFilteringObjectives(uniPort1,
-                usMeterId, null, uniTagInfo,
-                false, true);
-        assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 2;
-
-        // ensure downstream pppoed traps can be added and removed
-        oltFlowService.processPPPoEDFilteringObjectives(nniPort,
-                null, null, null,
-                true, false);
-        assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 3;
-        oltFlowService.processPPPoEDFilteringObjectives(nniPort,
-                null, null, null,
-                false, false);
-        assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 4;
-
-        // cleanup
-        oltFlowService.flowObjectiveService.clearQueue();
-    }
-
-    @Test
-    public void testIgmpFiltering() {
-        oltFlowService.flowObjectiveService.clearQueue();
-
-        // ensure igmp flows can be added and removed
-        oltFlowService.processIgmpFilteringObjectives(uniPort1,
-                usMeterId, null, uniTagInfo,
-                true, true);
-        assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 1;
-        oltFlowService.processIgmpFilteringObjectives(uniPort1, usMeterId,
-                null, uniTagInfo,
-                false, true);
-        assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 2;
-
-        // ensure igmp flow is not added if uniTag has no igmp requirement
-        oltFlowService.processIgmpFilteringObjectives(uniPort1,
-                usMeterId, null, uniTagInfoNoDhcpNoIgmp,
-                true, true);
-        assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 2;
-
-        //ensure igmp flow on NNI fails without global setting
-        oltFlowService.processIgmpFilteringObjectives(nniPort,
-                null, null, null,
-                true, false);
-        assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 2;
-
-        // igmp trap on NNI should succeed with global config
-        oltFlowService.enableIgmpOnNni = true;
-        oltFlowService.processIgmpFilteringObjectives(nniPort,
-                null, null, null,
-                true, false);
-        assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 3;
-        // cleanup
-        oltFlowService.flowObjectiveService.clearQueue();
-
-    }
-
-    @Test
-    public void testEapolFiltering() {
-        addBandwidthProfile(uniTagInfo.getUpstreamBandwidthProfile());
-        oltFlowService.enableEapol = true;
-
-        //will install
-        oltFlowService.processEapolFilteringObjectives(uniPort1,
-                uniTagInfo.getUpstreamBandwidthProfile(), Optional.empty(), new CompletableFuture<>(),
-                uniTagInfo.getUniTagMatch(), true);
-
-        //bp profile doesn't exist
-        oltFlowService.processEapolFilteringObjectives(uniPort1,
-                uniTagInfo.getDownstreamBandwidthProfile(), Optional.empty(), new CompletableFuture<>(),
-                uniTagInfo.getUniTagMatch(), true);
-    }
-
-    @Test
-    public void testLldpFiltering() {
-        oltFlowService.processLldpFilteringObjective(nniPort, true);
-        oltFlowService.processLldpFilteringObjective(nniPort, false);
-    }
-
-    @Test
-    public void testNniFiltering() {
-        oltFlowService.flowObjectiveService.clearQueue();
-        oltFlowService.enableDhcpOnNni = true;
-        oltFlowService.enableIgmpOnNni = true;
-        oltFlowService.processNniFilteringObjectives(nniPort, true);
-        assert oltFlowService.flowObjectiveService.getPendingFlowObjectives()
-                .size() == 3;
-        oltFlowService.processNniFilteringObjectives(nniPort, false);
-        assert oltFlowService.flowObjectiveService.getPendingFlowObjectives()
-                .size() == 6;
-        oltFlowService.flowObjectiveService.clearQueue();
-    }
-
-    @Test
-    public void testUpBuilder() {
-        ForwardingObjective objective =
-                oltFlowService.createUpBuilder(nniPort, uniPort1, usMeterId, usMeterId, uniTagInfo).add();
-        checkObjective(objective, true);
-    }
-
-    @Test
-    public void testDownBuilder() {
-        ForwardingObjective objective =
-                oltFlowService.createDownBuilder(nniPort, uniPort1, dsMeterId, dsMeterId, uniTagInfo,
-                        Optional.of(macAddress)).remove();
-        checkObjective(objective, false);
-    }
-
-    private void checkObjective(ForwardingObjective fwd, boolean upstream) {
-        TrafficTreatment treatment = fwd.treatment();
-
-        //check instructions
-        Set<Instructions.MeterInstruction> meters = treatment.meters();
-        assert !meters.isEmpty();
-
-        Instructions.MetadataInstruction writeMetadata = treatment.writeMetadata();
-        assert writeMetadata != null;
-
-        List<Instruction> immediateInstructions = treatment.immediate();
-        Optional<Instruction> vlanInstruction = immediateInstructions.stream()
-                .filter(i -> i.type() == Instruction.Type.L2MODIFICATION)
-                .filter(i -> ((L2ModificationInstruction) i).subtype() ==
-                        L2ModificationInstruction.L2SubType.VLAN_PUSH ||
-                        ((L2ModificationInstruction) i).subtype() ==
-                                L2ModificationInstruction.L2SubType.VLAN_POP)
-                .findAny();
-
-        assert vlanInstruction.isPresent();
-
-        //check match criteria
-        TrafficSelector selector = fwd.selector();
-        assert selector.getCriterion(Criterion.Type.IN_PORT) != null;
-        assert selector.getCriterion(Criterion.Type.VLAN_VID) != null;
-
-        if (!upstream) {
-            assert selector.getCriterion(Criterion.Type.METADATA) != null;
-            assert selector.getCriterion(Criterion.Type.ETH_DST) != null;
-        }
-    }
-
-    private class MockOltMeterService implements org.opencord.olt.internalapi.AccessDeviceMeterService {
-        @Override
-        public ImmutableMap<String, Collection<MeterKey>> getBpMeterMappings() {
-            return null;
-        }
-
-        @Override
-        public MeterId getMeterIdFromBpMapping(DeviceId deviceId, String bandwidthProfile) {
-            return null;
-        }
-
-
-        @Override
-        public ImmutableSet<MeterKey> getProgMeters() {
-            return null;
-        }
-
-        @Override
-        public MeterId createMeter(DeviceId deviceId, BandwidthProfileInformation bpInfo,
-                                   CompletableFuture<Object> meterFuture) {
-            return usMeterId;
-        }
-
-        @Override
-        public void removeFromPendingMeters(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
-
-        }
-
-        @Override
-        public boolean checkAndAddPendingMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
-            return false;
-        }
-
-
-        @Override
-        public void clearMeters(DeviceId deviceId) {
-        }
-
-        @Override
-        public void clearDeviceState(DeviceId deviceId) {
-
-        }
-    }
-
-    private class MockOltFlowObjectiveService implements org.onosproject.net.flowobjective.FlowObjectiveService {
-        List<String> flowObjectives = new ArrayList<>();
-
-        @Override
-        public void filter(DeviceId deviceId, FilteringObjective filteringObjective) {
-            flowObjectives.add(filteringObjective.toString());
-            EthTypeCriterion ethType = (EthTypeCriterion)
-                    filterForCriterion(filteringObjective.conditions(), Criterion.Type.ETH_TYPE);
-
-            Instructions.MeterInstruction meter = filteringObjective.meta().metered();
-            Instruction writeMetadata = filteringObjective.meta().writeMetadata();
-            VlanIdCriterion vlanIdCriterion = (VlanIdCriterion)
-                    filterForCriterion(filteringObjective.conditions(), Criterion.Type.VLAN_VID);
-            PortCriterion portCriterion = (PortCriterion) filteringObjective.key();
-
-            filteringObjective.meta().allInstructions().forEach(instruction -> {
-                if (instruction.type().equals(Instruction.Type.L2MODIFICATION)) {
-                    L2ModificationInstruction l2Instruction = (L2ModificationInstruction) instruction;
-                    if (l2Instruction.subtype().equals(L2ModificationInstruction.L2SubType.VLAN_PCP)) {
-                        //this, given the uniTagInfo we provide, should not be present
-                        assert false;
-                    }
-                }
-            });
-
-
-            if (ethType.ethType().equals(EthType.EtherType.LLDP.ethType()) ||
-                    portCriterion.port().equals(nniPort.number())) {
-                assert meter == null;
-                assert writeMetadata == null;
-                assert vlanIdCriterion == null;
-            } else {
-                assert meter.meterId().equals(usMeterId) || meter.meterId().equals(dsMeterId);
-                assert writeMetadata != null;
-                assert vlanIdCriterion == null || vlanIdCriterion.vlanId() == uniTagInfo.getUniTagMatch()
-                        || vlanIdCriterion.vlanId() == uniTagInfoNoPcp.getUniTagMatch();
-            }
-
-        }
-
-        @Override
-        public void forward(DeviceId deviceId, ForwardingObjective forwardingObjective) {
-
-        }
-
-        @Override
-        public void next(DeviceId deviceId, NextObjective nextObjective) {
-
-        }
-
-        @Override
-        public int allocateNextId() {
-            return 0;
-        }
-
-        @Override
-        public void initPolicy(String s) {
-
-        }
-
-        @Override
-        public void apply(DeviceId deviceId, Objective objective) {
-
-        }
-
-        @Override
-        public Map<Pair<Integer, DeviceId>, List<String>> getNextMappingsChain() {
-            return null;
-        }
-
-        @Override
-        public List<String> getNextMappings() {
-            return null;
-        }
-
-        @Override
-        public List<String> getPendingFlowObjectives() {
-            return ImmutableList.copyOf(flowObjectives);
-        }
-
-        @Override
-        public ListMultimap<FilteringObjQueueKey, Objective> getFilteringObjQueue() {
-            return null;
-        }
-
-        @Override
-        public ListMultimap<ForwardingObjQueueKey, Objective> getForwardingObjQueue() {
-            return null;
-        }
-
-        @Override
-        public ListMultimap<NextObjQueueKey, Objective> getNextObjQueue() {
-            return null;
-        }
-
-        @Override
-        public Map<FilteringObjQueueKey, Objective> getFilteringObjQueueHead() {
-            return null;
-        }
-
-        @Override
-        public Map<ForwardingObjQueueKey, Objective> getForwardingObjQueueHead() {
-            return null;
-        }
-
-        @Override
-        public Map<NextObjQueueKey, Objective> getNextObjQueueHead() {
-            return null;
-        }
-
-        @Override
-        public void clearQueue() {
-            flowObjectives.clear();
-        }
-
-        private Criterion filterForCriterion(Collection<Criterion> criteria, Criterion.Type type) {
-            return criteria.stream()
-                    .filter(c -> c.type().equals(type))
-                    .limit(1)
-                    .findFirst().orElse(null);
-        }
-    }
-
-    private class MockMastershipService implements org.onosproject.mastership.MastershipService {
-        @Override
-        public MastershipRole getLocalRole(DeviceId deviceId) {
-            return null;
-        }
-
-        @Override
-        public boolean isLocalMaster(DeviceId deviceId) {
-            return true;
-        }
-
-        @Override
-        public CompletableFuture<MastershipRole> requestRoleFor(DeviceId deviceId) {
-            return null;
-        }
-
-        @Override
-        public CompletableFuture<Void> relinquishMastership(DeviceId deviceId) {
-            return null;
-        }
-
-        @Override
-        public NodeId getMasterFor(DeviceId deviceId) {
-            return null;
-        }
-
-        @Override
-        public RoleInfo getNodesFor(DeviceId deviceId) {
-            return null;
-        }
-
-        @Override
-        public MastershipInfo getMastershipFor(DeviceId deviceId) {
-            return null;
-        }
-
-        @Override
-        public Set<DeviceId> getDevicesOf(NodeId nodeId) {
-            return null;
-        }
-
-        @Override
-        public void addListener(MastershipListener mastershipListener) {
-
-        }
-
-        @Override
-        public void removeListener(MastershipListener mastershipListener) {
-
-        }
-    }
-}
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltMeterServiceTest.java b/impl/src/test/java/org/opencord/olt/impl/OltMeterServiceTest.java
new file mode 100644
index 0000000..a7d653c
--- /dev/null
+++ b/impl/src/test/java/org/opencord/olt/impl/OltMeterServiceTest.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.onosproject.cfg.ComponentConfigAdapter;
+import org.onosproject.core.CoreServiceAdapter;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.meter.MeterId;
+import org.onosproject.net.meter.MeterServiceAdapter;
+import org.onosproject.net.meter.MeterState;
+import org.onosproject.store.service.StorageServiceAdapter;
+import org.onosproject.store.service.TestStorageService;
+import org.opencord.sadis.SadisService;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_BP_ID_DEFAULT;
+
+public class OltMeterServiceTest extends OltTestHelpers {
+    OltMeterService oltMeterService;
+    OltMeterService component;
+
+    DeviceId deviceId = DeviceId.deviceId("foo");
+
+    @Before
+    public void setUp() {
+        component = new OltMeterService();
+        component.cfgService = new ComponentConfigAdapter();
+        component.coreService = new CoreServiceAdapter();
+        component.storageService = new StorageServiceAdapter();
+        component.sadisService = Mockito.mock(SadisService.class);
+        component.meterService = new MeterServiceAdapter();
+        component.storageService = new TestStorageService();
+        component.activate(null);
+        oltMeterService = Mockito.spy(component);
+    }
+
+    @After
+    public void tearDown() {
+        component.deactivate(null);
+    }
+
+    @Test
+    public void testHasMeter() {
+
+        MeterData meterPending = new MeterData(MeterId.meterId(1),
+                MeterState.PENDING_ADD, "pending");
+        MeterData meterAdded = new MeterData(MeterId.meterId(2),
+                MeterState.ADDED, DEFAULT_BP_ID_DEFAULT);
+
+        Map<String, MeterData> deviceMeters = new HashMap<>();
+        deviceMeters.put("pending", meterPending);
+        deviceMeters.put(DEFAULT_BP_ID_DEFAULT, meterAdded);
+        oltMeterService.programmedMeters.put(deviceId, deviceMeters);
+
+        assert oltMeterService.hasMeterByBandwidthProfile(deviceId, DEFAULT_BP_ID_DEFAULT);
+        assert !oltMeterService.hasMeterByBandwidthProfile(deviceId, "pending");
+        assert !oltMeterService.hasMeterByBandwidthProfile(deviceId, "someBandwidthProfile");
+
+        assert !oltMeterService.hasMeterByBandwidthProfile(DeviceId.deviceId("bar"), DEFAULT_BP_ID_DEFAULT);
+    }
+
+    @Test
+    public void testGetMeterId() {
+
+        MeterData meterAdded = new MeterData(MeterId.meterId(2),
+                MeterState.ADDED, DEFAULT_BP_ID_DEFAULT);
+
+        Map<String, MeterData> deviceMeters = new HashMap<>();
+        deviceMeters.put(DEFAULT_BP_ID_DEFAULT, meterAdded);
+        oltMeterService.programmedMeters.put(deviceId, deviceMeters);
+
+        Assert.assertNull(oltMeterService.getMeterIdForBandwidthProfile(deviceId, "pending"));
+        Assert.assertEquals(MeterId.meterId(2),
+                oltMeterService.getMeterIdForBandwidthProfile(deviceId, DEFAULT_BP_ID_DEFAULT));
+    }
+
+    @Test
+    public void testCreateMeter() {
+
+        DeviceId deviceId = DeviceId.deviceId("foo");
+        String bp = "Default";
+
+        // if we already have a meter do nothing and return true
+        doReturn(true).when(oltMeterService).hasMeterByBandwidthProfile(deviceId, bp);
+        Assert.assertTrue(oltMeterService.createMeter(deviceId, bp));
+        verify(oltMeterService, never()).createMeterForBp(any(), any());
+
+        // if we have a pending meter, do nothing and return false
+        doReturn(false).when(oltMeterService).hasMeterByBandwidthProfile(deviceId, bp);
+        doReturn(true).when(oltMeterService).hasPendingMeterByBandwidthProfile(deviceId, bp);
+        Assert.assertFalse(oltMeterService.createMeter(deviceId, bp));
+        verify(oltMeterService, never()).createMeterForBp(any(), any());
+
+        // if the meter is not present at all, create it and return false
+        doReturn(false).when(oltMeterService).hasMeterByBandwidthProfile(deviceId, bp);
+        doReturn(false).when(oltMeterService).hasPendingMeterByBandwidthProfile(deviceId, bp);
+        Assert.assertFalse(oltMeterService.createMeter(deviceId, bp));
+        verify(oltMeterService, times(1)).createMeterForBp(deviceId, bp);
+    }
+
+    @Test
+    public void testConcurrentMeterCreation() throws InterruptedException {
+
+        ExecutorService executor = Executors.newFixedThreadPool(4);
+
+        DeviceId deviceId = DeviceId.deviceId("foo");
+        String bp = "Default";
+
+        // try to create 4 meters at the same time, only one should be created
+        for (int i = 0; i < 4; i++) {
+
+            executor.execute(() -> {
+                oltMeterService.createMeter(deviceId, bp);
+            });
+        }
+
+        TimeUnit.MILLISECONDS.sleep(600);
+
+        verify(oltMeterService, times(4)).hasMeterByBandwidthProfile(deviceId, bp);
+        verify(oltMeterService, times(1)).createMeterForBp(deviceId, bp);
+    }
+}
\ No newline at end of file
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltMeterTest.java b/impl/src/test/java/org/opencord/olt/impl/OltMeterTest.java
deleted file mode 100644
index 000ea35..0000000
--- a/impl/src/test/java/org/opencord/olt/impl/OltMeterTest.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.opencord.olt.impl;
-
-import com.google.common.collect.ImmutableMap;
-import org.junit.Before;
-import org.junit.Test;
-import org.onosproject.cfg.ComponentConfigAdapter;
-import org.onosproject.core.CoreServiceAdapter;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.meter.DefaultMeter;
-import org.onosproject.net.meter.Meter;
-import org.onosproject.net.meter.MeterId;
-import org.onosproject.net.meter.MeterKey;
-import org.onosproject.net.meter.MeterListener;
-import org.onosproject.net.meter.MeterRequest;
-import org.onosproject.store.service.TestStorageService;
-import org.opencord.sadis.BandwidthProfileInformation;
-
-import java.util.Collection;
-import java.util.concurrent.CompletableFuture;
-
-public class OltMeterTest extends TestBase {
-    private OltMeterService oltMeterService;
-
-    private BandwidthProfileInformation bandwidthProfileInformation = new BandwidthProfileInformation();
-
-    @Before
-    public void setUp() {
-        oltMeterService = new OltMeterService();
-        oltMeterService.storageService = new TestStorageService();
-        oltMeterService.meterService = new MockMeterService();
-        oltMeterService.coreService = new CoreServiceAdapter();
-        oltMeterService.componentConfigService = new ComponentConfigAdapter();
-        oltMeterService.activate(null);
-        oltMeterService.bpInfoToMeter = new MockConsistentMultimap<>();
-    }
-
-    @Test
-    public void testAddAndGetMeterIdToBpMapping() {
-        oltMeterService.addMeterIdToBpMapping(DEVICE_ID_1, usMeterId, usBpId);
-        MeterId usMeterId = oltMeterService.getMeterIdFromBpMapping(DEVICE_ID_1, usBpId);
-        assert usMeterId.equals(this.usMeterId);
-
-        oltMeterService.addMeterIdToBpMapping(DEVICE_ID_1, dsMeterId, dsBpId);
-        MeterId dsMeterId = oltMeterService.getMeterIdFromBpMapping(DEVICE_ID_1, dsBpId);
-        assert  dsMeterId.equals(this.dsMeterId);
-
-        ImmutableMap<String, Collection<MeterKey>> meterMappings = oltMeterService.getBpMeterMappings();
-        assert  meterMappings.size() == 2;
-    }
-
-    @Test
-    public void testCreateMeter() {
-        //with provided bandwidth profile information
-        bandwidthProfileInformation.setId(usBpId);
-        bandwidthProfileInformation.setExceededInformationRate(10000);
-        bandwidthProfileInformation.setExceededBurstSize(10000L);
-        bandwidthProfileInformation.setCommittedBurstSize(10000L);
-        bandwidthProfileInformation.setCommittedInformationRate(10000);
-
-        oltMeterService.addMeterIdToBpMapping(DEVICE_ID_1, usMeterId, usBpId);
-
-
-        MeterId meterId =
-                oltMeterService.createMeter(DEVICE_ID_1, bandwidthProfileInformation, new CompletableFuture<>());
-        assert meterId != null;
-
-        //with null bandwidth profile information
-        meterId = oltMeterService.createMeter(DEVICE_ID_1, null, new CompletableFuture<>());
-        assert meterId == null;
-    }
-
-
-    private class MockMeterService implements org.onosproject.net.meter.MeterService {
-        @Override
-        public Meter submit(MeterRequest meterRequest) {
-            return DefaultMeter.builder()
-                    .forDevice(DEVICE_ID_1)
-                    .fromApp(appId)
-                    .withId(usMeterId)
-                    .build();
-        }
-
-        @Override
-        public void withdraw(MeterRequest meterRequest, MeterId meterId) {
-
-        }
-
-        @Override
-        public Meter getMeter(DeviceId deviceId, MeterId meterId) {
-            return null;
-        }
-
-        @Override
-        public Collection<Meter> getAllMeters() {
-            return null;
-        }
-
-        @Override
-        public Collection<Meter> getMeters(DeviceId deviceId) {
-            return null;
-        }
-
-        @Override
-        public MeterId allocateMeterId(DeviceId deviceId) {
-            return null;
-        }
-
-        @Override
-        public void freeMeterId(DeviceId deviceId, MeterId meterId) {
-
-        }
-
-        @Override
-        public void addListener(MeterListener meterListener) {
-
-        }
-
-        @Override
-        public void removeListener(MeterListener meterListener) {
-
-        }
-    }
-}
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltTest.java b/impl/src/test/java/org/opencord/olt/impl/OltTest.java
index af63004..e8a76d0 100644
--- a/impl/src/test/java/org/opencord/olt/impl/OltTest.java
+++ b/impl/src/test/java/org/opencord/olt/impl/OltTest.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-present Open Networking Foundation
+ * Copyright 2021-present Open Networking Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -13,155 +13,196 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.opencord.olt.impl;
 
-import static org.junit.Assert.assertEquals;
-
-import java.util.Set;
-
-import com.google.common.collect.Maps;
+import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-
+import org.junit.runner.RunWith;
+import org.mockito.Mockito;
+import org.mockito.runners.MockitoJUnitRunner;
 import org.onlab.packet.ChassisId;
+import org.onosproject.cfg.ComponentConfigAdapter;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreServiceAdapter;
+import org.onosproject.core.DefaultApplicationId;
 import org.onosproject.net.AnnotationKeys;
-import org.onosproject.net.Annotations;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.DefaultAnnotations;
 import org.onosproject.net.DefaultDevice;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.Element;
 import org.onosproject.net.Port;
 import org.onosproject.net.PortNumber;
-import org.onosproject.net.device.DeviceServiceAdapter;
+import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.provider.ProviderId;
+import org.onosproject.store.service.TestStorageService;
+import org.opencord.sadis.SadisService;
 import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.opencord.sadis.UniTagInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class OltTest extends TestBase {
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.atLeastOnce;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_BP_ID_DEFAULT;
+
+/**
+ * Set of tests of the ONOS application component.
+ */
+
+@RunWith(MockitoJUnitRunner.class)
+public class OltTest extends OltTestHelpers {
+
+    private Olt component;
+    private final ApplicationId testAppId = new DefaultApplicationId(1, "org.opencord.olt.test");
     private final Logger log = LoggerFactory.getLogger(getClass());
-    private Olt olt;
 
-
-    private static final String SCHEME_NAME = "olt";
-    private static final DefaultAnnotations DEVICE_ANNOTATIONS = DefaultAnnotations.builder()
-            .set(AnnotationKeys.PROTOCOL, SCHEME_NAME.toUpperCase()).build();
+    private DeviceId deviceId = DeviceId.deviceId("test-device");
+    private Device testDevice = new DefaultDevice(ProviderId.NONE, deviceId, Device.Type.OLT,
+            "testManufacturer", "1.0", "1.0", "SN", new ChassisId(1));
+    private Port uniUpdateEnabled = new OltPort(testDevice, true, PortNumber.portNumber(16),
+                                                DefaultAnnotations.builder().set(AnnotationKeys.PORT_NAME, "uni-1")
+                                                        .build());
+    private ConnectPoint cp = new ConnectPoint(deviceId, uniUpdateEnabled.number());
+    private DiscoveredSubscriber sub;
 
     @Before
     public void setUp() {
-        olt = new Olt();
-        olt.deviceService = new MockDeviceService();
-        olt.sadisService = new MockSadisService();
-        olt.subsService = olt.sadisService.getSubscriberInfoService();
-        olt.pendingSubscribersForDevice = Maps.newConcurrentMap();
+        component = new Olt();
+        component.requeueDelay = 0; // avoid delays in the queue add to make things easier in testing
+        component.cfgService = new ComponentConfigAdapter();
+        component.deviceService = Mockito.mock(DeviceService.class);
+        component.storageService = new TestStorageService();
+        component.coreService = Mockito.spy(new CoreServiceAdapter());
+        component.oltDeviceService = Mockito.mock(OltDeviceService.class);
+
+        doReturn(testAppId).when(component.coreService).registerApplication("org.opencord.olt");
+
+        component.discoveredSubscriberExecutor =
+                Executors.newSingleThreadScheduledExecutor(
+                        groupedThreads("onos/olt",
+                "discovered-cp-%d", log));
+        component.flowsExecutor =
+                Executors.newFixedThreadPool(component.flowProcessingThreads,
+                                             groupedThreads("onos/olt-service",
+                                                            "flows-installer-%d"));
+
+        component.subscriberExecutor = Executors.newFixedThreadPool(component.subscriberProcessingThreads,
+                                                          groupedThreads("onos/olt-service",
+                                                                         "subscriber-installer-%d"));
+        SadisService sadisService = Mockito.mock(SadisService.class);
+        component.oltFlowService = Mockito.mock(OltFlowService.class);
+        component.sadisService = sadisService;
+
+        // reset the spy on oltFlowService
+        reset(component.oltFlowService);
+
+        component.bindSadisService(sadisService);
+        component.eventsQueues = component.storageService.
+                <ConnectPoint, LinkedBlockingQueue<DiscoveredSubscriber>>consistentMapBuilder().build().asJavaMap();
+        component.eventsQueues.put(cp, new LinkedBlockingQueue<>());
+
+        component.discoveredSubscriberExecutor.execute(() -> {
+            component.processDiscoveredSubscribers();
+        });
+        // create empty service for testing
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+        UniTagInformation empty = new UniTagInformation.Builder().build();
+        uniTagInformationList.add(empty);
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+        sub = new DiscoveredSubscriber(testDevice,
+                                         uniUpdateEnabled, DiscoveredSubscriber.Status.ADDED,
+                                         false, si);
     }
 
-    /**
-     * Tests that the getSubscriber method does throw a NullPointerException with a meaningful message.
-     */
+    @After
+    public void tearDown() {
+        component.deactivate(null);
+    }
+
     @Test
-    public void testGetSubscriberError() {
-        ConnectPoint cp = ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1);
-        try {
-            olt.getSubscriber(cp);
-        } catch (NullPointerException e) {
-            assertEquals(e.getMessage(), "Invalid connect point");
-        }
+    public void testProcessDiscoveredSubscribersBasicPortSuccess() throws Exception {
+        doReturn(true).when(component.deviceService).isAvailable(any());
+        doReturn(sub.port).when(component.deviceService).getPort(any(), any());
+        doReturn(true).when(component.oltFlowService).handleBasicPortFlows(eq(sub), eq(DEFAULT_BP_ID_DEFAULT),
+                eq(DEFAULT_BP_ID_DEFAULT));
+        doReturn(true).when(component.oltDeviceService).isLocalLeader(cp.deviceId());
+
+        // adding the discovered subscriber to the queue
+        LinkedBlockingQueue<DiscoveredSubscriber> q = component.eventsQueues.get(cp);
+        q.add(sub);
+        component.eventsQueues.put(cp, q);
+
+        // check that we're calling the correct method
+        TimeUnit.MILLISECONDS.sleep(600);
+        verify(component.oltFlowService, atLeastOnce()).handleBasicPortFlows(eq(sub), eq(DEFAULT_BP_ID_DEFAULT),
+                eq(DEFAULT_BP_ID_DEFAULT));
+
+        // check if the method doesn't throw an exception we're removing the subscriber from the queue
+        LinkedBlockingQueue<DiscoveredSubscriber> updatedQueue = component.eventsQueues.get(cp);
+        assert updatedQueue.isEmpty();
     }
 
-    /**
-     * Tests that the getSubscriber method returns Subscriber informations.
-     */
     @Test
-    public void testGetSubscriber() {
-        ConnectPoint cp = ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 2);
+    public void testProcessDiscoveredSubscribersBasicPortException() throws Exception {
+        doReturn(true).when(component.deviceService).isAvailable(any());
+        doReturn(sub.port).when(component.deviceService).getPort(any(), any());
+        doReturn(false).when(component.oltFlowService).handleBasicPortFlows(any(), eq(DEFAULT_BP_ID_DEFAULT),
+                eq(DEFAULT_BP_ID_DEFAULT));
+        doReturn(true).when(component.oltDeviceService).isLocalLeader(cp.deviceId());
+        // replace the queue with a spy
+        LinkedBlockingQueue<DiscoveredSubscriber> q = component.eventsQueues.get(cp);
+        LinkedBlockingQueue<DiscoveredSubscriber> spiedQueue = spy(q);
+        // adding the discovered subscriber to the queue
+        spiedQueue.add(sub);
+        component.eventsQueues.put(cp, spiedQueue);
 
-        SubscriberAndDeviceInformation s = olt.getSubscriber(cp);
+        TimeUnit.MILLISECONDS.sleep(600);
 
-        assertEquals(s.circuitId(), CLIENT_CIRCUIT_ID);
-        assertEquals(s.nasPortId(), CLIENT_NAS_PORT_ID);
+        // check that we're calling the correct method,
+        // since the subscriber is not removed from the queue we're calling the method multiple times
+        verify(component.oltFlowService, atLeastOnce()).handleBasicPortFlows(eq(sub), eq(DEFAULT_BP_ID_DEFAULT),
+                eq(DEFAULT_BP_ID_DEFAULT));
+
+        // check if the method throws an exception we are not removing the subscriber from the queue
+        verify(spiedQueue, never()).remove(sub);
     }
 
-    private class MockDevice extends DefaultDevice {
+    @Test
+    public void testAddSubscriberToQueue() {
 
-        public MockDevice(ProviderId providerId, DeviceId id, Type type,
-                          String manufacturer, String hwVersion, String swVersion,
-                          String serialNumber, ChassisId chassisId, Annotations... annotations) {
-            super(providerId, id, type, manufacturer, hwVersion, swVersion, serialNumber,
-                    chassisId, annotations);
-        }
+        // replace the queue with a spy
+        LinkedBlockingQueue<DiscoveredSubscriber> q = component.eventsQueues.get(cp);
+        LinkedBlockingQueue<DiscoveredSubscriber> spiedQueue = spy(q);
+        component.eventsQueues.put(cp, spiedQueue);
+
+        // trying to add the same event twice should result in a single item in the queue
+        component.addSubscriberToQueue(sub);
+        component.addSubscriberToQueue(sub);
+
+        verify(spiedQueue, times(1)).add(sub);
+        Assert.assertEquals(1, spiedQueue.size());
+
+
+
     }
 
-    private class MockDeviceService extends DeviceServiceAdapter {
-
-        private ProviderId providerId = new ProviderId("of", "foo");
-        private final Device device1 = new MockDevice(providerId, DEVICE_ID_1, Device.Type.SWITCH,
-                "foo.inc", "0", "0", OLT_DEV_ID, new ChassisId(),
-                DEVICE_ANNOTATIONS);
-
-        @Override
-        public Device getDevice(DeviceId devId) {
-            return device1;
-
-        }
-
-        @Override
-        public Port getPort(ConnectPoint cp) {
-            log.info("Looking up port {}", cp.port().toString());
-            if (cp.port().toString().equals("1")) {
-                return null;
-            }
-            return new MockPort();
-        }
-    }
-
-    private class MockPort implements Port {
-
-        @Override
-        public boolean isEnabled() {
-            return true;
-        }
-
-        @Override
-        public long portSpeed() {
-            return 1000;
-        }
-
-        @Override
-        public Element element() {
-            return null;
-        }
-
-        @Override
-        public PortNumber number() {
-            return null;
-        }
-
-        @Override
-        public Annotations annotations() {
-            return new MockAnnotations();
-        }
-
-        @Override
-        public Type type() {
-            return Port.Type.FIBER;
-        }
-
-        private class MockAnnotations implements Annotations {
-
-            @Override
-            public String value(String val) {
-                return "BRCM12345678";
-            }
-
-            @Override
-            public Set<String> keys() {
-                return null;
-            }
-        }
-    }
-
-
-}
\ No newline at end of file
+}
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltTestHelpers.java b/impl/src/test/java/org/opencord/olt/impl/OltTestHelpers.java
new file mode 100644
index 0000000..6965bd2
--- /dev/null
+++ b/impl/src/test/java/org/opencord/olt/impl/OltTestHelpers.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import com.google.common.collect.Maps;
+import org.mockito.ArgumentMatcher;
+import org.onosproject.net.Annotations;
+import org.onosproject.net.Element;
+import org.onosproject.net.Port;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.opencord.sadis.BandwidthProfileInformation;
+
+import java.util.Map;
+
+@SuppressWarnings("checkstyle:HideUtilityClassConstructor")
+public class OltTestHelpers {
+
+    protected static final String CLIENT_NAS_PORT_ID = "PON 1/1";
+    protected static final String CLIENT_CIRCUIT_ID = "CIR-PON 1/1";
+    protected static final String OLT_DEV_ID = "of:00000000000000aa";
+    Map<String, BandwidthProfileInformation> bpInformation = Maps.newConcurrentMap();
+
+    protected class FilteringObjectiveMatcher extends ArgumentMatcher<FilteringObjective> {
+
+        private FilteringObjective left;
+
+        public FilteringObjectiveMatcher(FilteringObjective left) {
+            this.left = left;
+        }
+
+        @Override
+        public boolean matches(Object right) {
+            // NOTE this matcher can be improved
+            FilteringObjective r = (FilteringObjective) right;
+            boolean matches = left.type().equals(r.type()) &&
+                    left.key().equals(r.key()) &&
+                    left.conditions().equals(r.conditions()) &&
+                    left.appId().equals(r.appId()) &&
+                    left.priority() == r.priority();
+
+            if (left.meta() != null) {
+                if (left.meta().equals(r.meta())) {
+                    return matches;
+                } else {
+                    return false;
+                }
+            }
+            return matches;
+        }
+    }
+
+    public class OltPort implements Port {
+
+        public boolean enabled;
+        public PortNumber portNumber;
+        public Annotations annotations;
+        public Element element;
+
+        public OltPort(Element element, boolean enabled, PortNumber portNumber, Annotations annotations) {
+            this.enabled = enabled;
+            this.portNumber = portNumber;
+            this.annotations = annotations;
+            this.element = element;
+        }
+
+        @Override
+        public Element element() {
+            return element;
+        }
+
+        @Override
+        public PortNumber number() {
+            return portNumber;
+        }
+
+        @Override
+        public boolean isEnabled() {
+            return enabled;
+        }
+
+        @Override
+        public Type type() {
+            return null;
+        }
+
+        @Override
+        public long portSpeed() {
+            return 0;
+        }
+
+        @Override
+        public Annotations annotations() {
+            return annotations;
+        }
+    }
+}
diff --git a/impl/src/test/java/org/opencord/olt/impl/TestBase.java b/impl/src/test/java/org/opencord/olt/impl/TestBase.java
deleted file mode 100644
index 87db5d1..0000000
--- a/impl/src/test/java/org/opencord/olt/impl/TestBase.java
+++ /dev/null
@@ -1,352 +0,0 @@
-/*
- * Copyright 2016-present Open Networking Foundation
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.opencord.olt.impl;
-
-import com.google.common.collect.HashMultimap;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Multiset;
-import org.onlab.packet.ChassisId;
-import org.onlab.packet.Ip4Address;
-import org.onlab.packet.MacAddress;
-import org.onosproject.core.DefaultApplicationId;
-import org.onosproject.net.DefaultDevice;
-import org.onosproject.net.Device;
-import org.onosproject.net.DeviceId;
-import org.onosproject.net.meter.MeterId;
-import org.onosproject.store.service.AsyncConsistentMultimap;
-import org.onosproject.store.service.ConsistentMultimap;
-import org.onosproject.store.service.ConsistentMultimapBuilder;
-import org.onosproject.store.service.MultimapEventListener;
-import org.onosproject.store.service.TestConsistentMultimap;
-import org.onosproject.store.service.Versioned;
-import org.opencord.sadis.BandwidthProfileInformation;
-import org.opencord.sadis.BaseInformationService;
-import org.opencord.sadis.SadisService;
-import org.opencord.sadis.SubscriberAndDeviceInformation;
-
-import java.util.AbstractMap;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.stream.Collectors;
-
-public class TestBase {
-
-    protected static final String CLIENT_NAS_PORT_ID = "PON 1/1";
-    protected static final String CLIENT_CIRCUIT_ID = "CIR-PON 1/1";
-    protected static final String OLT_DEV_ID = "of:00000000000000aa";
-    protected static final DeviceId DEVICE_ID_1 = DeviceId.deviceId(OLT_DEV_ID);
-    protected MeterId usMeterId = MeterId.meterId(1);
-    protected MeterId dsMeterId = MeterId.meterId(2);
-    protected MeterId usOltMeterId = MeterId.meterId(3);
-    protected MeterId dsOltMeterId = MeterId.meterId(4);
-    protected String usBpId = "HSIA-US";
-    protected String dsBpId = "HSIA-DS";
-    protected DefaultApplicationId appId = new DefaultApplicationId(1, "OltServices");
-
-    protected static Device olt = new DefaultDevice(null, DeviceId.deviceId(OLT_DEV_ID), Device.Type.SWITCH,
-            "VOLTHA Project", "open_pon", "open_pon", "BBSIM_OLT_1", new ChassisId("a0a0a0a0a01"));
-
-    Map<String, BandwidthProfileInformation> bpInformation = Maps.newConcurrentMap();
-
-    protected void addBandwidthProfile(String id) {
-        BandwidthProfileInformation bpInfo = new BandwidthProfileInformation();
-        bpInfo.setGuaranteedInformationRate(0);
-        bpInfo.setCommittedInformationRate(10000);
-        bpInfo.setCommittedBurstSize(1000L);
-        bpInfo.setExceededBurstSize(2000L);
-        bpInfo.setExceededInformationRate(20000);
-        bpInformation.put(id, bpInfo);
-    }
-
-    protected class MockSadisService implements SadisService {
-
-        @Override
-        public BaseInformationService<SubscriberAndDeviceInformation> getSubscriberInfoService() {
-            return new MockSubService();
-        }
-
-        @Override
-        public BaseInformationService<BandwidthProfileInformation> getBandwidthProfileService() {
-            return new MockBpService();
-        }
-    }
-
-    private class MockBpService implements BaseInformationService<BandwidthProfileInformation> {
-        @Override
-        public void clearLocalData() {
-
-        }
-
-        @Override
-        public void invalidateAll() {
-
-        }
-
-        @Override
-        public void invalidateId(String id) {
-
-        }
-
-        @Override
-        public BandwidthProfileInformation get(String id) {
-            return bpInformation.get(id);
-        }
-
-        @Override
-        public BandwidthProfileInformation getfromCache(String id) {
-            return null;
-        }
-    }
-
-    private class MockSubService implements BaseInformationService<SubscriberAndDeviceInformation> {
-        MockSubscriberAndDeviceInformation sub =
-                new MockSubscriberAndDeviceInformation(CLIENT_NAS_PORT_ID,
-                        CLIENT_NAS_PORT_ID, CLIENT_CIRCUIT_ID, null, null);
-
-        @Override
-        public SubscriberAndDeviceInformation get(String id) {
-            return sub;
-        }
-
-        @Override
-        public void clearLocalData() {
-
-        }
-
-        @Override
-        public void invalidateAll() {
-        }
-
-        @Override
-        public void invalidateId(String id) {
-        }
-
-        @Override
-        public SubscriberAndDeviceInformation getfromCache(String id) {
-            return null;
-        }
-    }
-
-    private class MockSubscriberAndDeviceInformation extends SubscriberAndDeviceInformation {
-
-        MockSubscriberAndDeviceInformation(String id, String nasPortId,
-                                           String circuitId, MacAddress hardId,
-                                           Ip4Address ipAddress) {
-            this.setHardwareIdentifier(hardId);
-            this.setId(id);
-            this.setIPAddress(ipAddress);
-            this.setNasPortId(nasPortId);
-            this.setCircuitId(circuitId);
-        }
-    }
-
-    class MockConsistentMultimap<K, V> implements ConsistentMultimap<K, V> {
-        private HashMultimap<K, Versioned<V>> innermap;
-        private AtomicLong counter = new AtomicLong();
-
-        public MockConsistentMultimap() {
-            this.innermap = HashMultimap.create();
-        }
-
-        private Versioned<V> version(V v) {
-            return new Versioned<>(v, counter.incrementAndGet(), System.currentTimeMillis());
-        }
-
-        private Versioned<Collection<? extends V>> versionCollection(Collection<? extends V> collection) {
-            return new Versioned<>(collection, counter.incrementAndGet(), System.currentTimeMillis());
-        }
-
-        @Override
-        public int size() {
-            return innermap.size();
-        }
-
-        @Override
-        public boolean isEmpty() {
-            return innermap.isEmpty();
-        }
-
-        @Override
-        public boolean containsKey(K key) {
-            return innermap.containsKey(key);
-        }
-
-        @Override
-        public boolean containsValue(V value) {
-            return innermap.containsValue(value);
-        }
-
-        @Override
-        public boolean containsEntry(K key, V value) {
-            return innermap.containsEntry(key, value);
-        }
-
-        @Override
-        public boolean put(K key, V value) {
-            return innermap.put(key, version(value));
-        }
-
-        @Override
-        public Versioned<Collection<? extends V>> putAndGet(K key, V value) {
-            innermap.put(key, version(value));
-            return (Versioned<Collection<? extends V>>) innermap.get(key);
-        }
-
-        @Override
-        public boolean remove(K key, V value) {
-            return innermap.remove(key, value);
-        }
-
-        @Override
-        public Versioned<Collection<? extends V>> removeAndGet(K key, V value) {
-            innermap.remove(key, value);
-            return (Versioned<Collection<? extends V>>) innermap.get(key);
-        }
-
-        @Override
-        public boolean removeAll(K key, Collection<? extends V> values) {
-            return false;
-        }
-
-        @Override
-        public Versioned<Collection<? extends V>> removeAll(K key) {
-            return null;
-        }
-
-        @Override
-        public boolean removeAll(Map<K, Collection<? extends V>> mapping) {
-            return false;
-        }
-
-        @Override
-        public boolean putAll(K key, Collection<? extends V> values) {
-            return false;
-        }
-
-        @Override
-        public boolean putAll(Map<K, Collection<? extends V>> mapping) {
-            return false;
-        }
-
-        @Override
-        public Versioned<Collection<? extends V>> replaceValues(K key, Collection<V> values) {
-            return null;
-        }
-
-        @Override
-        public void clear() {
-            innermap.clear();
-        }
-
-        @Override
-        public Versioned<Collection<? extends V>> get(K key) {
-            Collection<? extends V> values = innermap.get(key).stream()
-                    .map(v -> v.value())
-                    .collect(Collectors.toList());
-            return versionCollection(values);
-        }
-
-        @Override
-        public Set<K> keySet() {
-            return innermap.keySet();
-        }
-
-        @Override
-        public Multiset<K> keys() {
-            return innermap.keys();
-        }
-
-        @Override
-        public Multiset<V> values() {
-            return null;
-        }
-
-        @Override
-        public Collection<Map.Entry<K, V>> entries() {
-            return null;
-        }
-
-        @Override
-        public Iterator<Map.Entry<K, V>> iterator() {
-            return new ConsistentMultimapIterator(innermap.entries().iterator());
-        }
-
-        @Override
-        public Map<K, Collection<V>> asMap() {
-            return null;
-        }
-
-        @Override
-        public void addListener(MultimapEventListener<K, V> listener, Executor executor) {
-        }
-
-        @Override
-        public void removeListener(MultimapEventListener<K, V> listener) {
-        }
-
-        @Override
-        public String name() {
-            return "mock multimap";
-        }
-
-        @Override
-        public Type primitiveType() {
-            return null;
-        }
-
-        private class ConsistentMultimapIterator implements Iterator<Map.Entry<K, V>> {
-
-            private final Iterator<Map.Entry<K, Versioned<V>>> it;
-
-            public ConsistentMultimapIterator(Iterator<Map.Entry<K, Versioned<V>>> it) {
-                this.it = it;
-            }
-
-            @Override
-            public boolean hasNext() {
-                return it.hasNext();
-            }
-
-            @Override
-            public Map.Entry<K, V> next() {
-                Map.Entry<K, Versioned<V>> e = it.next();
-                return new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().value());
-            }
-        }
-
-    }
-
-    public static TestConsistentMultimap.Builder builder() {
-        return new TestConsistentMultimap.Builder();
-    }
-
-    public static class Builder<K, V> extends ConsistentMultimapBuilder<K, V> {
-
-        @Override
-        public AsyncConsistentMultimap<K, V> buildMultimap() {
-            return null;
-        }
-
-        @Override
-        public ConsistentMultimap<K, V> build() {
-            return new TestConsistentMultimap<K, V>();
-        }
-    }
-}