[VOL-4577] : Update ONOS olt-app for adding FTTB DPU Management, ANCP traffic & trap rules

Change-Id: Ibb8aad6e68e8bd3b5f5824f0b04f4c5bc2f84a9e
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java b/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java
index baa880d..878078a 100644
--- a/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java
+++ b/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java
@@ -55,12 +55,15 @@
 import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.flow.criteria.Criteria;
 import org.onosproject.net.flowobjective.DefaultFilteringObjective;
+import org.onosproject.net.flowobjective.DefaultForwardingObjective;
 import org.onosproject.net.flowobjective.FilteringObjective;
 import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.host.HostService;
 import org.onosproject.net.meter.MeterId;
 import org.onosproject.net.provider.ProviderId;
 import org.onosproject.store.service.TestStorageService;
+import org.opencord.olt.impl.fttb.FttbUtils;
 import org.opencord.sadis.BaseInformationService;
 import org.opencord.sadis.SadisService;
 import org.opencord.sadis.SubscriberAndDeviceInformation;
@@ -70,6 +73,8 @@
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -81,6 +86,7 @@
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.onosproject.net.AnnotationKeys.PORT_NAME;
 import static org.opencord.olt.impl.OltFlowService.OltFlowsStatus.ERROR;
 import static org.opencord.olt.impl.OltFlowService.OltFlowsStatus.NONE;
@@ -90,6 +96,15 @@
 import static org.opencord.olt.impl.OltFlowService.OltFlowsStatus.REMOVED;
 import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_BP_ID_DEFAULT;
 import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DOWNSTREAM_OLT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DOWNSTREAM_ONU;
+import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_OLT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_ONU;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_FLOW_DIRECTION;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_FLOW_DOWNSTREAM;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_FLOW_UPSTREAM;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_SERVICE_NAME;
 
 public class OltFlowServiceTest extends OltTestHelpers {
 
@@ -112,6 +127,42 @@
     Port uniUpdateEnabled = new OltPort(testDevice, true, PortNumber.portNumber(16),
             DefaultAnnotations.builder().set(PORT_NAME, "uni-1").build());
 
+    private final UniTagInformation dpuMgmtUti = new UniTagInformation.Builder() // DPU management service fixture
+            .setPonCTag(VlanId.vlanId("6"))
+            .setPonSTag(VlanId.vlanId("60"))
+            .setUsPonCTagPriority(1)
+            .setUsPonSTagPriority(2)
+            .setTechnologyProfileId(64)
+            .setUpstreamBandwidthProfile("usBp")
+            .setUpstreamOltBandwidthProfile("usOltBp") // no downstream bandwidth profiles are set on this fixture
+            .setServiceName(FttbUtils.FTTB_SERVICE_DPU_MGMT_TRAFFIC)
+            .setIsDhcpRequired(true) // the only FTTB fixture here that requires DHCP
+            .setEnableMacLearning(true)
+            .build();
+
+    private final UniTagInformation ancpUti = new UniTagInformation.Builder() // ANCP service fixture
+            .setPonCTag(VlanId.vlanId("4"))
+            .setPonSTag(VlanId.vlanId("40"))
+            .setUsPonCTagPriority(3)
+            .setUsPonSTagPriority(4)
+            .setTechnologyProfileId(64)
+            .setUpstreamBandwidthProfile("usBp") // same upstream profile names as dpuMgmtUti
+            .setUpstreamOltBandwidthProfile("usOltBp")
+            .setServiceName(FttbUtils.FTTB_SERVICE_DPU_ANCP_TRAFFIC)
+            .setIsDhcpRequired(false)
+            .build();
+
+    private final UniTagInformation fttbSubscriberUti = new UniTagInformation.Builder() // FTTB subscriber fixture
+            .setPonCTag(VlanId.vlanId("8")) // unlike the DPU fixtures, no C-tag/S-tag priorities are configured
+            .setPonSTag(VlanId.vlanId("80"))
+            .setTechnologyProfileId(64)
+            .setUpstreamBandwidthProfile("usBp") // same upstream profile names as dpuMgmtUti
+            .setUpstreamOltBandwidthProfile("usOltBp")
+            .setServiceName(FttbUtils.FTTB_SERVICE_SUBSCRIBER_TRAFFIC)
+            .setIsDhcpRequired(false)
+            .build();
+
+
     @Before
     public void setUp() {
         component = new OltFlowService();
@@ -130,6 +181,11 @@
         doReturn(Mockito.mock(BaseInformationService.class))
                 .when(component.sadisService).getSubscriberInfoService();
         doReturn(testAppId).when(component.coreService).registerApplication("org.opencord.olt");
+        doReturn(testDevice).when(component.deviceService).getDevice(testDevice.id());
+        when(component.sadisService.getSubscriberInfoService().get(testDevice.serialNumber())).
+                thenReturn(Mockito.mock(SubscriberAndDeviceInformation.class));
+        when(component.oltDeviceService.getNniPort(testDevice)).thenReturn(Optional.of(nniPort));
+
         component.activate(null);
         component.bindSadisService(component.sadisService);
 
@@ -384,7 +440,7 @@
                 .withMeta(
                         DefaultTrafficTreatment.builder()
                                 .meter(MeterId.meterId(1))
-                                .writeMetadata(component.createTechProfValueForWriteMetadata(
+                                .writeMetadata(OltFlowServiceUtils.createTechProfValueForWriteMetadata(
                                         VlanId.vlanId(eapolDefaultVlan),
                                         component.defaultTechProfileId, MeterId.meterId(1)), 0)
                                 .setOutput(PortNumber.CONTROLLER)
@@ -437,7 +493,7 @@
                 .withMeta(
                         DefaultTrafficTreatment.builder()
                                 .meter(MeterId.meterId(1))
-                                .writeMetadata(component.createTechProfValueForWriteMetadata(
+                                .writeMetadata(OltFlowServiceUtils.createTechProfValueForWriteMetadata(
                                         VlanId.vlanId(eapolDefaultVlan),
                                         component.defaultTechProfileId, MeterId.meterId(1)), 0)
                                 .setOutput(PortNumber.CONTROLLER)
@@ -576,6 +632,7 @@
 
     @Test
     public void testHandleNniFlowsPppoe() {
+        component.enableDhcpOnNni = false;
         component.enablePppoeOnNni = true;
         component.enablePppoe = true;
         component.handleNniFlows(testDevice, nniPort, OltFlowService.FlowOperation.ADD);
@@ -599,6 +656,7 @@
 
     @Test
     public void testRemoveNniFlowsPppoe() {
+        component.enableDhcpOnNni = false;
         component.enablePppoeOnNni = true;
         component.enablePppoe = true;
         component.handleNniFlows(testDevice, nniPortDisabled, OltFlowService.FlowOperation.REMOVE);
@@ -937,4 +995,580 @@
         OltPortStatus status = component.cpStatus.get(sk1);
         Assert.assertEquals(REMOVED, status.subscriberFlowsStatus);
     }
+
+    @Test
+    public void testHandleNniFlowsDhcpV4WithNniDhcpTrapVid() { // NNI DHCPv4 trap must match the sadis nniDhcpTrapVid
+        component.enableDhcpOnNni = true;
+        component.enableDhcpV4 = true;
+        // sadis entry returned for the OLT serial number; it carries trap VLAN 60
+        SubscriberAndDeviceInformation testOltFttbSadis = new SubscriberAndDeviceInformation();
+        testOltFttbSadis.setNniDhcpTrapVid(VlanId.vlanId("60"));
+
+        when(component.sadisService.getSubscriberInfoService().get(testDevice.serialNumber())).
+                thenReturn(testOltFttbSadis);
+
+        component.handleNniFlows(testDevice, nniPort, OltFlowService.FlowOperation.ADD);
+        // expected trap: DHCPv4 (UDP 67 -> 68) on the NNI tagged with nniDhcpTrapVid, output to CONTROLLER
+        FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
+                .permit()
+                .withKey(Criteria.matchInPort(nniPort.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
+                .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_UDP))
+                .addCondition(Criteria.matchUdpSrc(TpPort.tpPort(67)))
+                .addCondition(Criteria.matchUdpDst(TpPort.tpPort(68)))
+                .addCondition(Criteria.matchVlanId(testOltFttbSadis.nniDhcpTrapVid()))
+                .fromApp(testAppId)
+                .withPriority(10000)
+                .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
+                .add();
+
+        // exactly one filter() call carries the expected VLAN-tagged DHCP trap objective
+        verify(component.flowObjectiveService, times(1))
+                .filter(eq(deviceId), argThat(new FilteringObjectiveMatcher(expectedFilter)));
+        // invoked only twice, LLDP and DHCP
+        verify(component.flowObjectiveService, times(2))
+                .filter(eq(deviceId), any());
+    }
+
+    @Test
+    public void testHandleFttbSubscriberDhcpFlowsAdd() { // expects a DHCPv4 trap for the DPU management service
+        component.enableDhcpV4 = true;
+
+        // two services on the UNI: dpuMgmtUti requires DHCP, mc does not
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+
+        UniTagInformation mc = new UniTagInformation.Builder()
+                .setIsDhcpRequired(false).build();
+        uniTagInformationList.add(dpuMgmtUti);
+        uniTagInformationList.add(mc);
+
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+
+        final DiscoveredSubscriber addedSub =
+                new DiscoveredSubscriber(testDevice,
+                        uniUpdateEnabled, DiscoveredSubscriber.Status.ADDED,
+                        false, si);
+
+        // meter IDs the mocked meter service returns for dpuMgmtUti's two upstream bandwidth profiles
+        MeterId usBpMeterId = MeterId.meterId(2);
+        MeterId usOltBpMeterId = MeterId.meterId(3);
+
+        doReturn(usBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), dpuMgmtUti.getUpstreamBandwidthProfile());
+        doReturn(usOltBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), dpuMgmtUti.getUpstreamOltBandwidthProfile());
+        // treatment: set S-tag and its priority, output to CONTROLLER; the OLT meter feeds the metadata value
+        TrafficTreatment expectedTreatment = DefaultTrafficTreatment.builder()
+                .setVlanId(dpuMgmtUti.getPonSTag())
+                .setVlanPcp((byte) dpuMgmtUti.getUsPonSTagPriority())
+                .setOutput(PortNumber.CONTROLLER)
+                .meter(usBpMeterId)
+                .writeMetadata(OltFlowServiceUtils.createTechProfValueForWriteMetadata(VlanId.NONE,
+                        dpuMgmtUti.getTechnologyProfileId(), usOltBpMeterId), 0L).build();
+        // filter: DHCPv4 (UDP 68 -> 67) arriving on the UNI, matching the C-tag and its priority
+        FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
+                .permit()
+                .withKey(Criteria.matchInPort(addedSub.port.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
+                .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_UDP))
+                .addCondition(Criteria.matchUdpSrc(TpPort.tpPort(68)))
+                .addCondition(Criteria.matchUdpDst(TpPort.tpPort(67)))
+                .addCondition(Criteria.matchVlanId(dpuMgmtUti.getPonCTag()))
+                .addCondition(Criteria.matchVlanPcp((byte) dpuMgmtUti.getUsPonCTagPriority()))
+                .fromApp(testAppId)
+                .withPriority(10000)
+                .withMeta(expectedTreatment)
+                .add();
+        // run the handler, then check the expected trap was submitted exactly once
+        component.handleSubscriberDhcpFlows(addedSub.device.id(), addedSub.port,
+                OltFlowService.FlowOperation.ADD, si);
+        verify(component.flowObjectiveService, times(1))
+                .filter(eq(addedSub.device.id()), argThat(new FilteringObjectiveMatcher(expectedFilter)));
+    }
+
+    @Test
+    public void testRemoveFttbSubscriberDhcpFlows() { // expects the DPU management DHCPv4 trap to be removed
+        component.enableDhcpV4 = true;
+
+        // Mock cpStatus so any key lookup returns a status with ADDED in the fourth (presumably DHCP) slot.
+        component.cpStatus = Mockito.mock(Map.class);
+        doReturn(new OltPortStatus(null, null, null, ADDED, null))
+                .when(component.cpStatus).get(Mockito.any());
+
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+        uniTagInformationList.add(dpuMgmtUti);
+
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+
+        final DiscoveredSubscriber removedSub =
+                new DiscoveredSubscriber(testDevice,
+                        uniUpdateEnabled, DiscoveredSubscriber.Status.REMOVED,
+                        false, si);
+
+        // meter IDs the mocked meter service returns for dpuMgmtUti's two upstream bandwidth profiles
+        MeterId usBpMeterId = MeterId.meterId(2);
+        MeterId usOltBpMeterId = MeterId.meterId(3);
+
+        doReturn(usBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(removedSub.device.id(), dpuMgmtUti.getUpstreamBandwidthProfile());
+        doReturn(usOltBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(removedSub.device.id(), dpuMgmtUti.getUpstreamOltBandwidthProfile());
+        // treatment: set S-tag and its priority, output to CONTROLLER; the OLT meter feeds the metadata value
+        TrafficTreatment expectedTreatment = DefaultTrafficTreatment.builder()
+                .setVlanId(dpuMgmtUti.getPonSTag())
+                .setVlanPcp((byte) dpuMgmtUti.getUsPonSTagPriority())
+                .setOutput(PortNumber.CONTROLLER)
+                .meter(usBpMeterId)
+                .writeMetadata(OltFlowServiceUtils.createTechProfValueForWriteMetadata(VlanId.NONE,
+                        dpuMgmtUti.getTechnologyProfileId(), usOltBpMeterId), 0L).build();
+        // same match fields as a DHCPv4 (UDP 68 -> 67) trap on the UNI, but built with deny() for the REMOVE case
+        FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
+                .deny()
+                .withKey(Criteria.matchInPort(removedSub.port.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
+                .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_UDP))
+                .addCondition(Criteria.matchUdpSrc(TpPort.tpPort(68)))
+                .addCondition(Criteria.matchUdpDst(TpPort.tpPort(67)))
+                .addCondition(Criteria.matchVlanId(dpuMgmtUti.getPonCTag()))
+                .addCondition(Criteria.matchVlanPcp((byte) dpuMgmtUti.getUsPonCTagPriority()))
+                .fromApp(testAppId)
+                .withPriority(10000)
+                .withMeta(expectedTreatment)
+                .add();
+        // run the handler, then check the expected objective was submitted exactly once
+        component.handleSubscriberDhcpFlows(removedSub.device.id(), removedSub.port,
+                OltFlowService.FlowOperation.REMOVE, si);
+        verify(component.flowObjectiveService, times(1))
+                .filter(eq(removedSub.device.id()), argThat(new FilteringObjectiveMatcher(expectedFilter)));
+    }
+
+    @Test
+    public void testHandleFttbMacSwitchingFlowsAdd() { // expects MAC-matching flows for DPU mgmt and ANCP
+        component.enableDhcpV4 = true;
+
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+
+        uniTagInformationList.add(dpuMgmtUti);
+        uniTagInformationList.add(ancpUti);
+
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+
+        final DiscoveredSubscriber addedSub =
+                new DiscoveredSubscriber(testDevice,
+                        uniUpdateEnabled, DiscoveredSubscriber.Status.ADDED,
+                        false, si);
+
+        // meter IDs returned by the mocked meter service: both upstream meters are 1, both downstream meters are 2
+        MeterId usBpMeterId = MeterId.meterId(1);
+        MeterId usOltBpMeterId = MeterId.meterId(1);
+
+        MeterId dsBpMeterId = MeterId.meterId(2);
+        MeterId dsOltBpMeterId = MeterId.meterId(2);
+        // mocked host on the UNI connect point; its MAC is what the expected selectors match on
+        MacAddress mac = MacAddress.valueOf("0A:00:27:00:00:09");
+        Host host = Mockito.mock(Host.class);
+        doReturn(mac).when(host).mac();
+        doReturn(dpuMgmtUti.getPonSTag()).when(host).vlan();
+
+        doReturn(new HashSet<>(Arrays.asList(host))).when(component.hostService)
+                .getConnectedHosts(new ConnectPoint(addedSub.device.id(), addedSub.port.number()));
+
+        doReturn(usBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), dpuMgmtUti.getUpstreamBandwidthProfile());
+        doReturn(usOltBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), dpuMgmtUti.getUpstreamOltBandwidthProfile());
+
+        doReturn(dsBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), dpuMgmtUti.getDownstreamBandwidthProfile());
+        doReturn(dsOltBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), dpuMgmtUti.getDownstreamOltBandwidthProfile());
+        // build and verify the expected forwarding objective for every service in both directions
+        String[] directions = {FTTB_FLOW_UPSTREAM, FTTB_FLOW_DOWNSTREAM};
+        for (UniTagInformation uti : uniTagInformationList) {
+            for (String direction : directions) {
+                TrafficTreatment.Builder expectedTreatment = DefaultTrafficTreatment.builder();
+                TrafficSelector.Builder expectedSelectorBuilder = DefaultTrafficSelector.builder();
+
+                DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
+                annotationBuilder.set(FTTB_FLOW_DIRECTION, direction);
+                annotationBuilder.set(FTTB_SERVICE_NAME, uti.getServiceName());
+
+                switch (direction) {
+                    case FTTB_FLOW_UPSTREAM:
+                        expectedSelectorBuilder
+                                .matchInPort(addedSub.port.number())
+                                .matchVlanId(uti.getPonCTag())
+                                .matchVlanPcp((byte) uti.getUsPonCTagPriority())
+                                .matchEthSrc(mac);
+
+                        expectedTreatment.setVlanId(uti.getPonSTag())
+                                .setVlanPcp((byte) uti.getUsPonSTagPriority())
+                                .setOutput(nniPort.number())
+                                .meter(usBpMeterId)
+                                .writeMetadata(OltFlowServiceUtils.createMetadata(VlanId.NONE,
+                                        uti.getTechnologyProfileId(), nniPort.number()), 0L);
+
+                        annotationBuilder.set(UPSTREAM_ONU, usBpMeterId.toString());
+                        annotationBuilder.set(UPSTREAM_OLT, usOltBpMeterId.toString());
+                        break;
+
+                    case FTTB_FLOW_DOWNSTREAM:
+                        expectedSelectorBuilder
+                                .matchInPort(nniPort.number())
+                                .matchVlanId(uti.getPonSTag())
+                                .matchEthDst(mac);
+
+                        expectedTreatment.setVlanId(uti.getPonCTag())
+                                .setOutput(addedSub.port.number())
+                                .meter(dsBpMeterId)
+                                .writeMetadata(OltFlowServiceUtils.createMetadata(VlanId.NONE,
+                                        uti.getTechnologyProfileId(), addedSub.port.number()), 0L);
+
+                        annotationBuilder.set(DOWNSTREAM_ONU, dsBpMeterId.toString());
+                        annotationBuilder.set(DOWNSTREAM_OLT, dsOltBpMeterId.toString());
+                        break;
+
+                    default:
+                        throw new AssertionError("Unexpected FTTB flow direction: " + direction);
+                }
+
+                ForwardingObjective expected = DefaultForwardingObjective.builder()
+                        .withFlag(ForwardingObjective.Flag.VERSATILE)
+                        .withPriority(1000)
+                        .makePermanent()
+                        .withSelector(expectedSelectorBuilder.build())
+                        .withAnnotations(annotationBuilder.build())
+                        .fromApp(testAppId)
+                        .withTreatment(expectedTreatment.build())
+                        .add();
+                // NOTE(review): re-invoked every iteration, yet verify expects times(1) - confirm repeats are no-ops
+                component.handleSubscriberDataFlows(addedSub.device, addedSub.port,
+                        OltFlowService.FlowOperation.ADD, si, DEFAULT_MCAST_SERVICE_NAME_DEFAULT);
+                verify(component.flowObjectiveService, times(1))
+                        .forward(eq(addedSub.device.id()), eq(expected));
+            }
+        }
+    }
+
+    @Test
+    public void testRemoveFttbMacSwitchingFlows() { // expects removal of the DPU mgmt and ANCP MAC-matching flows
+        component.enableDhcpV4 = true;
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+
+        uniTagInformationList.add(dpuMgmtUti);
+        uniTagInformationList.add(ancpUti);
+
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+
+        final DiscoveredSubscriber removedSub =
+                new DiscoveredSubscriber(testDevice,
+                        uniUpdateEnabled, DiscoveredSubscriber.Status.REMOVED,
+                        false, si);
+
+
+        ServiceKey sk1 = new ServiceKey(new AccessDevicePort(removedSub.port), dpuMgmtUti);
+        ServiceKey sk2 = new ServiceKey(new AccessDevicePort(removedSub.port), ancpUti);
+
+        // seed both services with PENDING_ADD in the third status slot of a storage-backed status map
+        component.cpStatus = component.storageService.
+                <ServiceKey, OltPortStatus>consistentMapBuilder().build().asJavaMap();
+        OltPortStatus cp1Status = new OltPortStatus(NONE, NONE, PENDING_ADD, NONE, NONE);
+        OltPortStatus cp2Status = new OltPortStatus(NONE, NONE, PENDING_ADD, NONE, NONE);
+        component.cpStatus.put(sk1, cp1Status);
+        component.cpStatus.put(sk2, cp2Status);
+
+        // meter IDs returned by the mocked meter service: both upstream meters are 1, both downstream meters are 2
+        MeterId usBpMeterId = MeterId.meterId(1);
+        MeterId usOltBpMeterId = MeterId.meterId(1);
+
+        MeterId dsBpMeterId = MeterId.meterId(2);
+        MeterId dsOltBpMeterId = MeterId.meterId(2);
+        // mocked host on the UNI connect point; its MAC is what the expected selectors match on
+        MacAddress mac = MacAddress.valueOf("0A:00:27:00:00:09");
+        Host host = Mockito.mock(Host.class);
+        doReturn(mac).when(host).mac();
+        doReturn(dpuMgmtUti.getPonSTag()).when(host).vlan();
+
+        doReturn(new HashSet<>(Arrays.asList(host))).when(component.hostService)
+                .getConnectedHosts(new ConnectPoint(removedSub.device.id(), removedSub.port.number()));
+
+        doReturn(usBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(removedSub.device.id(), dpuMgmtUti.getUpstreamBandwidthProfile());
+        doReturn(usOltBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(removedSub.device.id(), dpuMgmtUti.getUpstreamOltBandwidthProfile());
+
+        doReturn(dsBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(removedSub.device.id(), dpuMgmtUti.getDownstreamBandwidthProfile());
+        doReturn(dsOltBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(removedSub.device.id(), dpuMgmtUti.getDownstreamOltBandwidthProfile());
+        // build and verify the expected removal objective for every service in both directions
+        String[] directions = {FTTB_FLOW_UPSTREAM, FTTB_FLOW_DOWNSTREAM};
+        for (UniTagInformation uti : uniTagInformationList) {
+            for (String direction : directions) {
+                TrafficTreatment.Builder expectedTreatment = DefaultTrafficTreatment.builder();
+                TrafficSelector.Builder expectedSelectorBuilder = DefaultTrafficSelector.builder();
+
+                DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
+                annotationBuilder.set(FTTB_FLOW_DIRECTION, direction);
+                annotationBuilder.set(FTTB_SERVICE_NAME, uti.getServiceName());
+
+                switch (direction) {
+                    case FTTB_FLOW_UPSTREAM:
+                        expectedSelectorBuilder
+                                .matchInPort(removedSub.port.number())
+                                .matchVlanId(uti.getPonCTag())
+                                .matchVlanPcp((byte) uti.getUsPonCTagPriority())
+                                .matchEthSrc(mac);
+
+                        expectedTreatment.setVlanId(uti.getPonSTag())
+                                .setVlanPcp((byte) uti.getUsPonSTagPriority())
+                                .setOutput(nniPort.number())
+                                .meter(usBpMeterId)
+                                .writeMetadata(OltFlowServiceUtils.createMetadata(VlanId.NONE,
+                                        uti.getTechnologyProfileId(), nniPort.number()), 0L);
+
+                        annotationBuilder.set(UPSTREAM_ONU, usBpMeterId.toString());
+                        annotationBuilder.set(UPSTREAM_OLT, usOltBpMeterId.toString());
+                        break;
+
+                    case FTTB_FLOW_DOWNSTREAM:
+                        expectedSelectorBuilder
+                                .matchInPort(nniPort.number())
+                                .matchVlanId(uti.getPonSTag())
+                                .matchEthDst(mac);
+
+                        expectedTreatment.setVlanId(uti.getPonCTag())
+                                .setOutput(removedSub.port.number())
+                                .meter(dsBpMeterId)
+                                .writeMetadata(OltFlowServiceUtils.createMetadata(VlanId.NONE,
+                                        uti.getTechnologyProfileId(), removedSub.port.number()), 0L);
+
+                        annotationBuilder.set(DOWNSTREAM_ONU, dsBpMeterId.toString());
+                        annotationBuilder.set(DOWNSTREAM_OLT, dsOltBpMeterId.toString());
+                        break;
+
+                    default:
+                        throw new AssertionError("Unexpected FTTB flow direction: " + direction);
+                }
+
+                ForwardingObjective expected = DefaultForwardingObjective.builder()
+                        .withFlag(ForwardingObjective.Flag.VERSATILE)
+                        .withPriority(1000)
+                        .makePermanent()
+                        .withSelector(expectedSelectorBuilder.build())
+                        .withAnnotations(annotationBuilder.build())
+                        .fromApp(testAppId)
+                        .withTreatment(expectedTreatment.build())
+                        .remove();
+                // NOTE(review): re-invoked every iteration, yet verify expects times(1) - confirm repeats are no-ops
+                component.handleSubscriberDataFlows(removedSub.device, removedSub.port,
+                        OltFlowService.FlowOperation.REMOVE, si, DEFAULT_MCAST_SERVICE_NAME_DEFAULT);
+                verify(component.flowObjectiveService, times(1))
+                        .forward(eq(removedSub.device.id()), eq(expected));
+            }
+        }
+    }
+
+    @Test
+    public void testHandleFttbSubscriberFlowsAdd() { // expects VLAN-only flows for the FTTB subscriber service
+        component.enableDhcpV4 = true;
+
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+        uniTagInformationList.add(fttbSubscriberUti);
+
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+
+        final DiscoveredSubscriber addedSub =
+                new DiscoveredSubscriber(testDevice,
+                        uniUpdateEnabled, DiscoveredSubscriber.Status.ADDED,
+                        false, si);
+
+        // meter IDs returned by the mocked meter service: both upstream meters are 1, both downstream meters are 2
+        MeterId usBpMeterId = MeterId.meterId(1);
+        MeterId usOltBpMeterId = MeterId.meterId(1);
+
+        MeterId dsBpMeterId = MeterId.meterId(2);
+        MeterId dsOltBpMeterId = MeterId.meterId(2);
+        // stubs are keyed on dpuMgmtUti's profile names, which fttbSubscriberUti shares ("usBp" / "usOltBp")
+        doReturn(usBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), dpuMgmtUti.getUpstreamBandwidthProfile());
+        doReturn(usOltBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), dpuMgmtUti.getUpstreamOltBandwidthProfile());
+
+        doReturn(dsBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), dpuMgmtUti.getDownstreamBandwidthProfile());
+        doReturn(dsOltBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), dpuMgmtUti.getDownstreamOltBandwidthProfile());
+        // build and verify the expected forwarding objective for the service in both directions
+        String[] directions = {FTTB_FLOW_UPSTREAM, FTTB_FLOW_DOWNSTREAM};
+        for (UniTagInformation uti : uniTagInformationList) {
+            for (String direction : directions) {
+                TrafficTreatment.Builder expectedTreatment = DefaultTrafficTreatment.builder();
+                TrafficSelector.Builder expectedSelectorBuilder = DefaultTrafficSelector.builder();
+
+                DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
+                annotationBuilder.set(FTTB_FLOW_DIRECTION, direction);
+                annotationBuilder.set(FTTB_SERVICE_NAME, uti.getServiceName());
+
+                switch (direction) {
+                    case FTTB_FLOW_UPSTREAM:
+                        expectedSelectorBuilder
+                                .matchInPort(addedSub.port.number())
+                                .matchVlanId(uti.getPonCTag());
+
+                        expectedTreatment.setVlanId(uti.getPonSTag())
+                                .setOutput(nniPort.number())
+                                .meter(usBpMeterId)
+                                .writeMetadata(OltFlowServiceUtils.createMetadata(VlanId.ANY,
+                                        uti.getTechnologyProfileId(), nniPort.number()), 0L);
+
+                        annotationBuilder.set(UPSTREAM_ONU, usBpMeterId.toString());
+                        annotationBuilder.set(UPSTREAM_OLT, usOltBpMeterId.toString());
+                        break;
+
+                    case FTTB_FLOW_DOWNSTREAM:
+                        expectedSelectorBuilder
+                                .matchInPort(nniPort.number())
+                                .matchMetadata(uti.getPonSTag().toShort())
+                                .matchVlanId(uti.getPonSTag());
+
+                        expectedTreatment.setVlanId(uti.getPonCTag())
+                                .setOutput(addedSub.port.number())
+                                .meter(dsBpMeterId)
+                                .writeMetadata(OltFlowServiceUtils.createMetadata(VlanId.ANY,
+                                        uti.getTechnologyProfileId(), addedSub.port.number()), 0L);
+
+                        annotationBuilder.set(DOWNSTREAM_ONU, dsBpMeterId.toString());
+                        annotationBuilder.set(DOWNSTREAM_OLT, dsOltBpMeterId.toString());
+                        break;
+
+                    default:
+                        throw new AssertionError("Unexpected FTTB flow direction: " + direction);
+                }
+
+                ForwardingObjective expected = DefaultForwardingObjective.builder()
+                        .withFlag(ForwardingObjective.Flag.VERSATILE)
+                        .withPriority(1000)
+                        .makePermanent()
+                        .withSelector(expectedSelectorBuilder.build())
+                        .withAnnotations(annotationBuilder.build())
+                        .fromApp(testAppId)
+                        .withTreatment(expectedTreatment.build())
+                        .add();
+                // NOTE(review): re-invoked every iteration, yet verify expects times(1) - confirm repeats are no-ops
+                component.handleSubscriberDataFlows(addedSub.device, addedSub.port,
+                        OltFlowService.FlowOperation.ADD, si, DEFAULT_MCAST_SERVICE_NAME_DEFAULT);
+                verify(component.flowObjectiveService, times(1))
+                        .forward(eq(addedSub.device.id()), eq(expected));
+            }
+        }
+    }
+
+    /**
+     * Checks the REMOVE path of {@code handleSubscriberDataFlows} for an FTTB subscriber service.
+     * <p>
+     * For each flow direction (upstream, then downstream) the test builds the forwarding
+     * objective it expects to be removed, invokes the component, and verifies that the
+     * flow objective service received exactly one {@code forward} call with that objective.
+     */
+    @Test
+    public void testRemoveFttbSubscriberFlows() {
+        component.enableDhcpV4 = true;
+
+        // Any lookup in cpStatus answers with an ADDED status, so the subscriber
+        // key is treated as already added when the removal is requested.
+        OltPortStatus addedStatus = new OltPortStatus(null, null, ADDED, null, null);
+        component.cpStatus = Mockito.mock(Map.class);
+        when(component.cpStatus.get(Mockito.any())).thenReturn(addedStatus);
+
+        List<UniTagInformation> utiList = new LinkedList<>();
+        utiList.add(fttbSubscriberUti);
+
+        SubscriberAndDeviceInformation subscriberInfo = new SubscriberAndDeviceInformation();
+        subscriberInfo.setUniTagList(utiList);
+
+        final DiscoveredSubscriber subscriber = new DiscoveredSubscriber(testDevice, uniUpdateEnabled,
+                DiscoveredSubscriber.Status.REMOVED, false, subscriberInfo);
+
+        // Meter IDs handed back by the mocked meter service.
+        MeterId upstreamMeter = MeterId.meterId(1);
+        MeterId upstreamOltMeter = MeterId.meterId(1);
+        MeterId downstreamMeter = MeterId.meterId(2);
+        MeterId downstreamOltMeter = MeterId.meterId(2);
+
+        // NOTE(review): the stubs below key on dpuMgmtUti's bandwidth profiles although the
+        // service under test is fttbSubscriberUti - presumably both fixtures share the same
+        // profiles; confirm against the fixture definitions.
+        doReturn(upstreamMeter).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(subscriber.device.id(),
+                        dpuMgmtUti.getUpstreamBandwidthProfile());
+        doReturn(upstreamOltMeter).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(subscriber.device.id(),
+                        dpuMgmtUti.getUpstreamOltBandwidthProfile());
+        doReturn(downstreamMeter).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(subscriber.device.id(),
+                        dpuMgmtUti.getDownstreamBandwidthProfile());
+        doReturn(downstreamOltMeter).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(subscriber.device.id(),
+                        dpuMgmtUti.getDownstreamOltBandwidthProfile());
+
+        for (UniTagInformation uti : utiList) {
+            for (String direction : new String[]{FTTB_FLOW_UPSTREAM, FTTB_FLOW_DOWNSTREAM}) {
+                TrafficSelector.Builder selector = DefaultTrafficSelector.builder();
+                TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder();
+                DefaultAnnotations.Builder annotations = DefaultAnnotations.builder()
+                        .set(FTTB_FLOW_DIRECTION, direction)
+                        .set(FTTB_SERVICE_NAME, uti.getServiceName());
+
+                if (FTTB_FLOW_UPSTREAM.equals(direction)) {
+                    // Upstream: UNI port with the PON C-tag in, re-tagged to the PON S-tag and sent to the NNI.
+                    selector.matchInPort(subscriber.port.number())
+                            .matchVlanId(uti.getPonCTag());
+
+                    // The build() result is unused here; the treatment is built again for the objective below.
+                    treatment.setVlanId(uti.getPonSTag())
+                            .setOutput(nniPort.number())
+                            .meter(upstreamMeter)
+                            .writeMetadata(OltFlowServiceUtils.createMetadata(VlanId.ANY,
+                                    uti.getTechnologyProfileId(), nniPort.number()), 0L)
+                            .build();
+
+                    annotations.set(UPSTREAM_ONU, upstreamMeter.toString())
+                            .set(UPSTREAM_OLT, upstreamOltMeter.toString());
+                } else if (FTTB_FLOW_DOWNSTREAM.equals(direction)) {
+                    // Downstream: NNI port with the PON S-tag in, re-tagged to the PON C-tag and sent to the UNI.
+                    selector.matchInPort(nniPort.number())
+                            .matchMetadata(uti.getPonSTag().toShort())
+                            .matchVlanId(uti.getPonSTag());
+
+                    // The build() result is unused here; the treatment is built again for the objective below.
+                    treatment.setVlanId(uti.getPonCTag())
+                            .setOutput(subscriber.port.number())
+                            .meter(downstreamMeter)
+                            .writeMetadata(OltFlowServiceUtils.createMetadata(VlanId.ANY,
+                                    uti.getTechnologyProfileId(), subscriber.port.number()), 0L)
+                            .build();
+
+                    annotations.set(DOWNSTREAM_ONU, downstreamMeter.toString())
+                            .set(DOWNSTREAM_OLT, downstreamOltMeter.toString());
+                } else {
+                    return;
+                }
+
+                ForwardingObjective expectedRemoval = DefaultForwardingObjective.builder()
+                        .fromApp(testAppId)
+                        .withFlag(ForwardingObjective.Flag.VERSATILE)
+                        .withPriority(1000)
+                        .makePermanent()
+                        .withSelector(selector.build())
+                        .withTreatment(treatment.build())
+                        .withAnnotations(annotations.build())
+                        .remove();
+
+                component.handleSubscriberDataFlows(subscriber.device, subscriber.port,
+                        OltFlowService.FlowOperation.REMOVE, subscriberInfo,
+                        DEFAULT_MCAST_SERVICE_NAME_DEFAULT);
+
+                verify(component.flowObjectiveService, times(1))
+                        .forward(eq(subscriber.device.id()), eq(expectedRemoval));
+            }
+        }
+    }
 }
\ No newline at end of file