[VOL-4577] : Update ONOS olt-app for adding FTTB DPU Management, ANCP traffic & trap rules

Change-Id: Ibb8aad6e68e8bd3b5f5824f0b04f4c5bc2f84a9e
diff --git a/impl/src/main/java/org/opencord/olt/driver/OltPipeline.java b/impl/src/main/java/org/opencord/olt/driver/OltPipeline.java
index bbf1ba7..493680b 100644
--- a/impl/src/main/java/org/opencord/olt/driver/OltPipeline.java
+++ b/impl/src/main/java/org/opencord/olt/driver/OltPipeline.java
@@ -34,6 +34,7 @@
 import org.onlab.util.KryoNamespace;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
+import org.onosproject.net.Annotations;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.behaviour.NextGroup;
@@ -80,6 +81,7 @@
 import org.onosproject.net.group.GroupService;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.StorageService;
+import org.opencord.olt.impl.fttb.FttbUtils;
 import org.slf4j.Logger;
 
 import java.util.Arrays;
@@ -96,10 +98,17 @@
 import static org.onosproject.core.CoreService.CORE_APP_NAME;
 import static org.opencord.olt.impl.OsgiPropertyConstants.DOWNSTREAM_OLT;
 import static org.opencord.olt.impl.OsgiPropertyConstants.DOWNSTREAM_ONU;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_FLOW_DIRECTION;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_FLOW_DOWNSTREAM;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_FLOW_UPSTREAM;
 import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_OLT;
 import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_ONU;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.util.Tools.groupedThreads;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_SERVICE_DPU_ANCP_TRAFFIC;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_SERVICE_DPU_MGMT_TRAFFIC;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_SERVICE_NAME;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_SERVICE_SUBSCRIBER_TRAFFIC;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -335,6 +344,12 @@
             return;
         }
 
+        if (FttbUtils.isFttbRule(fwd)) {
+            log.debug("Processing FTTB rule : {}", fwd);
+            processFttbRules(fwd);
+            return;
+        }
+
         TrafficTreatment treatment = fwd.treatment();
 
         List<Instruction> instructions = treatment.allInstructions();
@@ -1137,8 +1152,9 @@
         if (matchVlanId != null) {
             log.debug("Building selector with match VLAN, {}", matchVlanId);
             // in case of TT upstream the packet comes tagged and the vlan is swapped.
+            Criterion vlanPcp = filterForCriterion(filter.conditions(), Criterion.Type.VLAN_PCP);
             selector = buildSelector(filter.key(), ethType, ipProto, udpSrcPort,
-                                     udpDstPort, matchVlanId);
+                                     udpDstPort, matchVlanId, vlanPcp);
             treatment = buildTreatment(output, meter, writeMetadata,
                                        vlanIdInstruction, vlanPcpInstruction);
         } else {
@@ -1485,4 +1501,189 @@
 
         applyRules(fwd, inner, outer);
     }
+
+    private void processFttbRules(ForwardingObjective fwd) {
+        Annotations annotations = fwd.annotations();
+        String direction = annotations.value(FTTB_FLOW_DIRECTION);
+        String serviceName = annotations.value(FTTB_SERVICE_NAME);
+
+        if (direction == null) {
+            log.error("Flow direction not found for Fttb rule {} ", fwd);
+            return;
+        }
+
+        switch (direction) {
+            case FTTB_FLOW_UPSTREAM:
+                processUpstreamFttbRules(fwd, serviceName);
+                break;
+            case FTTB_FLOW_DOWNSTREAM:
+                processDownstreamFttbRules(fwd, serviceName);
+                break;
+            default:
+                log.error("Invalid flow direction {}, for {} ", direction, fwd);
+        }
+    }
+
+    private void processUpstreamFttbRules(ForwardingObjective fwd, String serviceName) {
+        TrafficSelector selector = fwd.selector();
+        TrafficTreatment treatment =  fwd.treatment();
+
+        // Selectors
+        Criterion inPortCriterion = selector.getCriterion(Criterion.Type.IN_PORT);
+        Criterion cVlanVidCriterion = selector.getCriterion(Criterion.Type.VLAN_VID);
+        Criterion cTagPriority = selector.getCriterion(Criterion.Type.VLAN_PCP);
+        Criterion ethSrcCriterion = selector.getCriterion(Criterion.Type.ETH_SRC);
+
+        // Instructions
+        L2ModificationInstruction.ModVlanIdInstruction sVlanSetVid = null;
+        L2ModificationInstruction.ModVlanPcpInstruction sTagPrioritySet = null;
+
+        List<Instruction> instructions = treatment.allInstructions();
+        List<Instruction> vlanIdL2Instructions = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_ID,
+                instructions);
+        List<Instruction> vlanPcpL2Instructions = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_PCP,
+                instructions);
+
+        if (!vlanIdL2Instructions.isEmpty()) {
+            sVlanSetVid = (L2ModificationInstruction.ModVlanIdInstruction) vlanIdL2Instructions.get(0);
+        }
+
+        if (!vlanPcpL2Instructions.isEmpty()) {
+            sTagPrioritySet = (L2ModificationInstruction.ModVlanPcpInstruction) vlanPcpL2Instructions.get(0);
+        }
+
+        Instruction output = fetchOutput(fwd, UPSTREAM);
+        Instruction onuUsMeter = fetchMeterById(fwd, fwd.annotations().value(UPSTREAM_ONU));
+        Instruction oltUsMeter = fetchMeterById(fwd, fwd.annotations().value(UPSTREAM_OLT));
+
+        TrafficSelector oltSelector, onuSelector;
+        TrafficTreatment oltTreatment, onuTreatment;
+
+        switch (serviceName) {
+            case FTTB_SERVICE_DPU_MGMT_TRAFFIC:
+            case FTTB_SERVICE_DPU_ANCP_TRAFFIC:
+                onuSelector = buildSelector(inPortCriterion, cVlanVidCriterion, cTagPriority);
+                onuTreatment = buildTreatment(sVlanSetVid, sTagPrioritySet, onuUsMeter,
+                        fetchWriteMetadata(fwd), Instructions.transition(QQ_TABLE));
+
+                oltSelector = buildSelector(inPortCriterion, ethSrcCriterion,
+                        Criteria.matchVlanId(sVlanSetVid.vlanId()));
+                oltTreatment = buildTreatment(oltUsMeter, fetchWriteMetadata(fwd), output);
+                break;
+
+            case FTTB_SERVICE_SUBSCRIBER_TRAFFIC:
+                onuSelector = buildSelector(inPortCriterion, cVlanVidCriterion);
+                onuTreatment = buildTreatment(onuUsMeter, fetchWriteMetadata(fwd), Instructions.transition(QQ_TABLE));
+
+                oltSelector = buildSelector(inPortCriterion, cVlanVidCriterion);
+                oltTreatment = buildTreatment(sVlanSetVid, oltUsMeter, fetchWriteMetadata(fwd), output);
+                break;
+            default:
+                log.error("Unknown service type for Fttb rule : {}", fwd);
+                return;
+        }
+
+        FlowRule.Builder onuBuilder = DefaultFlowRule.builder()
+                .fromApp(fwd.appId())
+                .forDevice(deviceId)
+                .makePermanent()
+                .withPriority(fwd.priority())
+                .withSelector(onuSelector)
+                .withTreatment(onuTreatment);
+
+        FlowRule.Builder oltBuilder = DefaultFlowRule.builder()
+                .fromApp(fwd.appId())
+                .forDevice(deviceId)
+                .forTable(QQ_TABLE)
+                .makePermanent()
+                .withPriority(fwd.priority())
+                .withSelector(oltSelector)
+                .withTreatment(oltTreatment);
+
+        applyRules(fwd, onuBuilder, oltBuilder);
+    }
+
+    private void processDownstreamFttbRules(ForwardingObjective fwd, String serviceName) {
+        TrafficSelector selector = fwd.selector();
+        TrafficTreatment treatment = fwd.treatment();
+
+        // Selectors
+        Criterion inPortCriterion = selector.getCriterion(Criterion.Type.IN_PORT);
+        Criterion sVlanVidCriterion = selector.getCriterion(Criterion.Type.VLAN_VID);
+        Criterion sTagPriority = selector.getCriterion(Criterion.Type.VLAN_PCP);
+        Criterion ethDstCriterion = selector.getCriterion(Criterion.Type.ETH_DST);
+        Criterion metadataSelector = selector.getCriterion(Criterion.Type.METADATA);
+
+        // Instructions
+        L2ModificationInstruction.ModVlanIdInstruction cVlanSetVid = null;
+        L2ModificationInstruction.ModVlanPcpInstruction cTagPrioritySet = null;
+
+        List<Instruction> instructions = treatment.allInstructions();
+        List<Instruction> vlanIdL2Instructions = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_ID,
+                instructions);
+        List<Instruction> vlanPcpL2Instructions = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_PCP,
+                instructions);
+
+        if (!vlanIdL2Instructions.isEmpty()) {
+            cVlanSetVid = (L2ModificationInstruction.ModVlanIdInstruction) vlanIdL2Instructions.get(0);
+        }
+
+        if (!vlanPcpL2Instructions.isEmpty()) {
+            cTagPrioritySet = (L2ModificationInstruction.ModVlanPcpInstruction) vlanPcpL2Instructions.get(0);
+        }
+
+        Instruction output = fetchOutput(fwd, DOWNSTREAM);
+        Instruction oltDsMeter = fetchMeterById(fwd, fwd.annotations().value(DOWNSTREAM_OLT));
+        Instruction onuUsMeter = fetchMeterById(fwd, fwd.annotations().value(DOWNSTREAM_ONU));
+
+        TrafficSelector oltSelector, onuSelector;
+        TrafficTreatment oltTreatment, onuTreatment;
+
+        switch (serviceName) {
+            case FTTB_SERVICE_DPU_MGMT_TRAFFIC:
+            case FTTB_SERVICE_DPU_ANCP_TRAFFIC:
+                oltSelector = buildSelector(inPortCriterion, ethDstCriterion,
+                        sVlanVidCriterion);
+                oltTreatment = buildTreatment(oltDsMeter, fetchWriteMetadata(fwd),
+                        Instructions.transition(QQ_TABLE));
+
+                onuSelector = buildSelector(inPortCriterion, sVlanVidCriterion, sTagPriority, ethDstCriterion);
+                onuTreatment = buildTreatment(cVlanSetVid, cTagPrioritySet, onuUsMeter,
+                        fetchWriteMetadata(fwd), output);
+                break;
+
+            case FTTB_SERVICE_SUBSCRIBER_TRAFFIC:
+                oltSelector = buildSelector(inPortCriterion, sVlanVidCriterion);
+                oltTreatment = buildTreatment(cVlanSetVid, oltDsMeter, fetchWriteMetadata(fwd),
+                        Instructions.transition(QQ_TABLE));
+
+                onuSelector = buildSelector(inPortCriterion, Criteria.matchVlanId(cVlanSetVid.vlanId()),
+                        metadataSelector);
+                onuTreatment = buildTreatment(onuUsMeter, fetchWriteMetadata(fwd), output);
+                break;
+
+            default:
+                log.error("Unknown service type for Fttb rule : {}", fwd);
+                return;
+        }
+
+        FlowRule.Builder oltBuilder = DefaultFlowRule.builder()
+                .fromApp(fwd.appId())
+                .forDevice(deviceId)
+                .makePermanent()
+                .withPriority(fwd.priority())
+                .withSelector(oltSelector)
+                .withTreatment(oltTreatment);
+
+        FlowRule.Builder onuBuilder = DefaultFlowRule.builder()
+                .fromApp(fwd.appId())
+                .forDevice(deviceId)
+                .forTable(QQ_TABLE)
+                .makePermanent()
+                .withPriority(fwd.priority())
+                .withSelector(onuSelector)
+                .withTreatment(onuTreatment);
+
+        applyRules(fwd, onuBuilder, oltBuilder);
+    }
 }
\ No newline at end of file
diff --git a/impl/src/main/java/org/opencord/olt/impl/Olt.java b/impl/src/main/java/org/opencord/olt/impl/Olt.java
index 1f103d8..607b146 100644
--- a/impl/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/impl/src/main/java/org/opencord/olt/impl/Olt.java
@@ -631,13 +631,13 @@
             return null;
         }
 
-        UniTagInformation service = null;
-        for (UniTagInformation tagInfo : subInfo.uniTagList()) {
-            if (innerVlan.equals(tagInfo.getPonCTag()) && outerVlan.equals(tagInfo.getPonSTag())
-                    && tpId == tagInfo.getTechnologyProfileId()) {
-                service = tagInfo;
-                break;
-            }
+        UniTagInformation service = OltUtils.getUniTagInformation(subInfo, innerVlan, outerVlan, tpId);
+
+        if (service == null) {
+            // Try again after invalidating cache for the particular port name.
+            subsService.invalidateId(portName);
+            subInfo = subsService.get(portName);
+            service = OltUtils.getUniTagInformation(subInfo, innerVlan, outerVlan, tpId);
         }
 
         if (service == null) {
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 4d452e6..701450d 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -49,10 +49,6 @@
 import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.flow.criteria.Criteria;
 import org.onosproject.net.flow.criteria.Criterion;
-import org.onosproject.net.flow.criteria.EthTypeCriterion;
-import org.onosproject.net.flow.criteria.IPProtocolCriterion;
-import org.onosproject.net.flow.criteria.PortCriterion;
-import org.onosproject.net.flow.criteria.UdpPortCriterion;
 import org.onosproject.net.flow.criteria.VlanIdCriterion;
 import org.onosproject.net.flow.instructions.L2ModificationInstruction;
 import org.onosproject.net.flowobjective.DefaultFilteringObjective;
@@ -68,6 +64,7 @@
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
+import org.opencord.olt.impl.fttb.FttbUtils;
 import org.opencord.sadis.BandwidthProfileInformation;
 import org.opencord.sadis.BaseInformationService;
 import org.opencord.sadis.SadisService;
@@ -100,7 +97,6 @@
 
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static org.onlab.util.Tools.get;
-import static org.onosproject.net.flow.instructions.Instruction.Type.L2MODIFICATION;
 import static org.opencord.olt.impl.OltUtils.completeFlowOpToString;
 import static org.opencord.olt.impl.OltUtils.flowOpToString;
 import static org.opencord.olt.impl.OltUtils.getPortName;
@@ -123,10 +119,17 @@
 import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_PPPOE_ON_NNI_DEFAULT;
 import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_PPPOE;
 import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_PPPOE_DEFAULT;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_FLOW_DIRECTION;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_FLOW_DOWNSTREAM;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_FLOW_UPSTREAM;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_SERVICE_DPU_ANCP_TRAFFIC;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_SERVICE_DPU_MGMT_TRAFFIC;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_SERVICE_NAME;
 import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_OLT;
 import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_ONU;
 import static org.opencord.olt.impl.OsgiPropertyConstants.WAIT_FOR_REMOVAL;
 import static org.opencord.olt.impl.OsgiPropertyConstants.WAIT_FOR_REMOVAL_DEFAULT;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_SERVICE_SUBSCRIBER_TRAFFIC;
 
 @Component(immediate = true, property = {
         ENABLE_DHCP_ON_NNI + ":Boolean=" + ENABLE_DHCP_ON_NNI_DEFAULT,
@@ -182,7 +185,7 @@
     protected ApplicationId appId;
     private static final Integer MAX_PRIORITY = 10000;
     private static final Integer MIN_PRIORITY = 1000;
-    protected static final short EAPOL_DEFAULT_VLAN = 4091;
+    public static final short EAPOL_DEFAULT_VLAN = 4091;
     private static final int NONE_TP_ID = -1;
     private static final String V4 = "V4";
     private static final String V6 = "V6";
@@ -382,7 +385,6 @@
                         "defaultTechProfileId:{}," + "waitForRemoval:{}",
                 enableDhcpOnNni, enableDhcpV4, enableDhcpV6, enableIgmpOnNni, enableEapol,
                 enablePppoeOnNni, enablePppoe, defaultTechProfileId, waitForRemoval);
-
     }
 
     @Override
@@ -477,7 +479,6 @@
     @Override
     public boolean handleBasicPortFlows(DiscoveredSubscriber sub, String bandwidthProfileId,
                                         String oltBandwidthProfileId) {
-
         // we only need to something if EAPOL is enabled
         if (!enableEapol) {
             log.debug("Eapol is disabled for {}", portWithName(sub.port));
@@ -492,7 +493,6 @@
             log.error("Unknown Status {} on DiscoveredSubscriber {}", sub.status, sub);
             return false;
         }
-
     }
 
     private boolean addDefaultFlows(DiscoveredSubscriber sub, String bandwidthProfileId, String oltBandwidthProfileId) {
@@ -554,7 +554,6 @@
                     log.warn("continuing provisioning on {}", portWithName(sub.port));
                 }
             }
-
         }
 
         // NOTE createMeters will return if the meters are not installed
@@ -859,7 +858,7 @@
 
         TrafficTreatment treatment = treatmentBuilder
                 .meter(meterId)
-                .writeMetadata(createTechProfValueForWriteMetadata(
+                .writeMetadata(OltFlowServiceUtils.createTechProfValueForWriteMetadata(
                         vlanId,
                         techProfileId, oltMeterId), 0)
                 .setOutput(PortNumber.CONTROLLER)
@@ -972,21 +971,6 @@
         return defaultTechProfileId;
     }
 
-    protected Long createTechProfValueForWriteMetadata(VlanId cVlan, int techProfileId, MeterId upstreamOltMeterId) {
-        Long writeMetadata;
-
-        if (cVlan == null || VlanId.NONE.equals(cVlan)) {
-            writeMetadata = (long) techProfileId << 32;
-        } else {
-            writeMetadata = ((long) (cVlan.id()) << 48 | (long) techProfileId << 32);
-        }
-        if (upstreamOltMeterId == null) {
-            return writeMetadata;
-        } else {
-            return writeMetadata | upstreamOltMeterId.id();
-        }
-    }
-
     private void processLldpFilteringObjective(DeviceId deviceId, Port port, FlowOperation action) {
         DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
 
@@ -1128,16 +1112,28 @@
                     .getMeterIdForBandwidthProfile(device.id(), uti.getUpstreamBandwidthProfile());
             MeterId oltUsMeterId = oltMeterService
                     .getMeterIdForBandwidthProfile(device.id(), uti.getUpstreamOltBandwidthProfile());
-            processUpstreamDataFilteringObjects(device.id(), port, nniPort.get(), action, usMeterId,
-                    oltUsMeterId, uti);
+
+            if (FttbUtils.isFttbService(uti)) {
+                processFttbUpstreamDataFilteringObjects(device.id(), port, nniPort.get(), action,
+                        usMeterId, oltUsMeterId, uti, si);
+            } else {
+                processUpstreamDataFilteringObjects(device.id(), port, nniPort.get(), action, usMeterId,
+                        oltUsMeterId, uti);
+            }
 
             // downstream flows
             MeterId dsMeterId = oltMeterService
                     .getMeterIdForBandwidthProfile(device.id(), uti.getDownstreamBandwidthProfile());
             MeterId oltDsMeterId = oltMeterService
                     .getMeterIdForBandwidthProfile(device.id(), uti.getDownstreamOltBandwidthProfile());
-            processDownstreamDataFilteringObjects(device.id(), port, nniPort.get(), action, dsMeterId,
-                    oltDsMeterId, uti, getMacAddress(device.id(), port, uti));
+
+            if (FttbUtils.isFttbService(uti)) {
+                processFttbDownstreamDataFilteringObjects(device.id(), port, nniPort.get(),
+                        action, dsMeterId, oltDsMeterId, uti, si);
+            } else {
+                processDownstreamDataFilteringObjects(device.id(), port, nniPort.get(), action, dsMeterId,
+                        oltDsMeterId, uti, getMacAddress(device.id(), port, uti));
+            }
         });
     }
 
@@ -1148,6 +1144,7 @@
         ServiceKey sk = new ServiceKey(new AccessDevicePort(port), uti);
         log.debug("{} DHCP filtering objectives on {}", flowOpToString(action), sk);
 
+        String serviceName = uti.getServiceName();
 
         OltFlowsStatus status = action.equals(FlowOperation.ADD) ?
                 OltFlowsStatus.PENDING_ADD : OltFlowsStatus.PENDING_REMOVE;
@@ -1160,12 +1157,6 @@
             treatmentBuilder.meter(meterId);
         }
 
-        if (uti.getTechnologyProfileId() != NONE_TP_ID) {
-            treatmentBuilder.writeMetadata(
-                    createTechProfValueForWriteMetadata(uti.getUniTagMatch(),
-                            uti.getTechnologyProfileId(), oltMeterId), 0);
-        }
-
         FilteringObjective.Builder dhcpBuilder = (action == FlowOperation.ADD ? builder.permit() : builder.deny())
                 .withKey(Criteria.matchInPort(port.number()))
                 .addCondition(Criteria.matchEthType(ethType))
@@ -1175,17 +1166,40 @@
                 .fromApp(appId)
                 .withPriority(MAX_PRIORITY);
 
+        VlanId cVlan = uti.getUniTagMatch();
+
         //VLAN changes and PCP matching need to happen only in the upstream directions
         if (direction == FlowDirection.UPSTREAM) {
-            treatmentBuilder.setVlanId(uti.getPonCTag());
-            if (!VlanId.vlanId(VlanId.NO_VID).equals(uti.getUniTagMatch())) {
-                dhcpBuilder.addCondition(Criteria.matchVlanId(uti.getUniTagMatch()));
+            if (serviceName != null && serviceName.equals(FTTB_SERVICE_DPU_MGMT_TRAFFIC)) {
+                cVlan = VlanId.NONE;
+                FttbUtils.addUpstreamDhcpCondition(dhcpBuilder, uti);
+                FttbUtils.addUpstreamDhcpTreatment(treatmentBuilder, uti);
+            } else {
+                treatmentBuilder.setVlanId(uti.getPonCTag());
+                if (!VlanId.vlanId(VlanId.NO_VID).equals(uti.getUniTagMatch())) {
+                    dhcpBuilder.addCondition(Criteria.matchVlanId(uti.getUniTagMatch()));
+                }
+                if (uti.getUsPonCTagPriority() != -1) {
+                    treatmentBuilder.setVlanPcp((byte) uti.getUsPonCTagPriority());
+                }
             }
-            if (uti.getUsPonCTagPriority() != -1) {
-                treatmentBuilder.setVlanPcp((byte) uti.getUsPonCTagPriority());
+        } else if (direction == FlowDirection.DOWNSTREAM) {
+            // Down stream DHCP vid to be matched if OLT Sadis info contains Vlan id in nniDhcpTrapVid.
+            Device device = deviceService.getDevice(deviceId);
+            SubscriberAndDeviceInformation oltSub = subsService.get(device.serialNumber());
+            VlanId nniDhcpTrapVid = oltSub.nniDhcpTrapVid();
+
+            if (nniDhcpTrapVid != null && !VlanId.vlanId(VlanId.NO_VID).equals(nniDhcpTrapVid)) {
+                dhcpBuilder.addCondition(Criteria.matchVlanId(nniDhcpTrapVid));
             }
         }
 
+        if (uti.getTechnologyProfileId() != NONE_TP_ID) {
+            treatmentBuilder.writeMetadata(
+                    OltFlowServiceUtils.createTechProfValueForWriteMetadata(cVlan,
+                            uti.getTechnologyProfileId(), oltMeterId), 0);
+        }
+
         dhcpBuilder.withMeta(treatmentBuilder
                 .setOutput(PortNumber.CONTROLLER).build());
 
@@ -1221,7 +1235,7 @@
         if (direction == FlowDirection.UPSTREAM) {
 
             if (techProfileId != NONE_TP_ID) {
-                treatmentBuilder.writeMetadata(createTechProfValueForWriteMetadata(null,
+                treatmentBuilder.writeMetadata(OltFlowServiceUtils.createTechProfValueForWriteMetadata(null,
                         techProfileId, oltMeterId), 0);
             }
 
@@ -1282,7 +1296,8 @@
         }
 
         if (techProfileId != NONE_TP_ID) {
-            treatmentBuilder.writeMetadata(createTechProfValueForWriteMetadata(cTag, techProfileId, oltMeterId), 0);
+            treatmentBuilder.writeMetadata(OltFlowServiceUtils.createTechProfValueForWriteMetadata(cTag, techProfileId,
+                    oltMeterId), 0);
         }
 
         DefaultFilteringObjective.Builder pppoedBuilder = ((action == FlowOperation.ADD)
@@ -1353,7 +1368,7 @@
         }
 
         treatmentBuilder.setOutput(nniPort.number())
-                .writeMetadata(createMetadata(uti.getPonCTag(),
+                .writeMetadata(OltFlowServiceUtils.createMetadata(uti.getPonCTag(),
                         uti.getTechnologyProfileId(), nniPort.number()), 0L);
 
         DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
@@ -1371,33 +1386,8 @@
                 treatmentBuilder.build(), MIN_PRIORITY,
                 annotationBuilder.build());
 
-        ObjectiveContext context = new ObjectiveContext() {
-            @Override
-            public void onSuccess(Objective objective) {
-                log.info("{} Upstream Data plane filter for {}.",
-                        completeFlowOpToString(action), sk);
-            }
-
-            @Override
-            public void onError(Objective objective, ObjectiveError error) {
-                log.error("Upstream Data plane filter for {} failed {} because {}.",
-                        sk, action, error);
-                updateConnectPointStatus(sk, null, null, OltFlowsStatus.ERROR, null, null);
-            }
-        };
-
-        ForwardingObjective flow = null;
-        if (action == FlowOperation.ADD) {
-            flow = flowBuilder.add(context);
-        } else if (action == FlowOperation.REMOVE) {
-            flow = flowBuilder.remove(context);
-        } else {
-            log.error("Flow action not supported: {}", action);
-        }
-
-        if (flow != null) {
-            flowObjectiveService.forward(deviceId, flow);
-        }
+        ObjectiveContext context = getSubscriberFlowBuilderContext(sk, action, FlowDirection.UPSTREAM);
+        processForwardingRule(action, flowBuilder, context, deviceId);
     }
 
     private void processDownstreamDataFilteringObjects(DeviceId deviceId, Port port, Port nniPort,
@@ -1429,7 +1419,7 @@
                 .popVlan()
                 .setOutput(port.number());
 
-        treatmentBuilder.writeMetadata(createMetadata(uti.getPonCTag(),
+        treatmentBuilder.writeMetadata(OltFlowServiceUtils.createMetadata(uti.getPonCTag(),
                 uti.getTechnologyProfileId(),
                 port.number()), 0);
 
@@ -1462,36 +1452,8 @@
         DefaultForwardingObjective.Builder flowBuilder = createForwardingObjectiveBuilder(selectorBuilder.build(),
                 treatmentBuilder.build(), MIN_PRIORITY, annotationBuilder.build());
 
-        ObjectiveContext context = new ObjectiveContext() {
-            @Override
-            public void onSuccess(Objective objective) {
-                log.info("{} Downstream Data plane filter for {}.",
-                        completeFlowOpToString(action), sk);
-            }
-
-            @Override
-            public void onError(Objective objective, ObjectiveError error) {
-                log.info("Downstream Data plane filter for {} failed {} because {}.",
-                         sk, action, error);
-                updateConnectPointStatus(sk, null, null, OltFlowsStatus.ERROR, null, null);
-            }
-        };
-
-        ForwardingObjective flow = null;
-        if (action == FlowOperation.ADD) {
-            flow = flowBuilder.add(context);
-        } else if (action == FlowOperation.REMOVE) {
-            flow = flowBuilder.remove(context);
-        } else {
-            log.error("Flow action not supported: {}", action);
-        }
-
-        if (flow != null) {
-            if (log.isTraceEnabled()) {
-                log.trace("Forwarding rule {}", flow);
-            }
-            flowObjectiveService.forward(deviceId, flow);
-        }
+        ObjectiveContext context = getSubscriberFlowBuilderContext(sk, action, FlowDirection.DOWNSTREAM);
+        processForwardingRule(action, flowBuilder, context, deviceId);
     }
 
     private DefaultForwardingObjective.Builder createForwardingObjectiveBuilder(TrafficSelector selector,
@@ -1508,14 +1470,6 @@
                 .withTreatment(treatment);
     }
 
-    private Long createMetadata(VlanId innerVlan, int techProfileId, PortNumber egressPort) {
-        if (techProfileId == NONE_TP_ID) {
-            techProfileId = DEFAULT_TP_ID_DEFAULT;
-        }
-
-        return ((long) (innerVlan.id()) << 48 | (long) techProfileId << 32) | egressPort.toLong();
-    }
-
     private boolean isMacLearningEnabled(SubscriberAndDeviceInformation si) {
         AtomicBoolean requiresMacLearning = new AtomicBoolean();
         requiresMacLearning.set(false);
@@ -1543,10 +1497,20 @@
 
         si.uniTagList().forEach(uniTagInfo -> {
             boolean isMacLearningEnabled = uniTagInfo.getEnableMacLearning();
-            boolean configureMac = isMacAddressValid(uniTagInfo);
+            boolean configureMac = OltFlowServiceUtils.isMacAddressValid(uniTagInfo);
             boolean discoveredMac = false;
+
+            final VlanId vlan;
+
+            if (FttbUtils.isFttbDpuOrAncpService(uniTagInfo)) {
+                // Using S tag, as C tag is replaced by Stag by ONU.
+                vlan = uniTagInfo.getPonSTag();
+            } else {
+                vlan = uniTagInfo.getPonCTag();
+            }
+
             Optional<Host> optHost = hostService.getConnectedHosts(new ConnectPoint(deviceId, port.number()))
-                    .stream().filter(host -> host.vlan().equals(uniTagInfo.getPonCTag())).findFirst();
+                    .stream().filter(host -> host.vlan().equals(vlan)).findFirst();
             if (optHost.isPresent() && optHost.get().mac() != null) {
                 discoveredMac = true;
             }
@@ -1561,7 +1525,7 @@
     }
 
     protected MacAddress getMacAddress(DeviceId deviceId, Port port, UniTagInformation uniTagInfo) {
-        boolean configuredMac = isMacAddressValid(uniTagInfo);
+        boolean configuredMac = OltFlowServiceUtils.isMacAddressValid(uniTagInfo);
         if (configuredMac) {
             return MacAddress.valueOf(uniTagInfo.getConfiguredMacAddress());
         } else if (uniTagInfo.getEnableMacLearning()) {
@@ -1574,12 +1538,6 @@
         return null;
     }
 
-    private boolean isMacAddressValid(UniTagInformation tagInformation) {
-        return tagInformation.getConfiguredMacAddress() != null &&
-                !tagInformation.getConfiguredMacAddress().trim().equals("") &&
-                !MacAddress.NONE.equals(MacAddress.valueOf(tagInformation.getConfiguredMacAddress()));
-    }
-
     protected void updateConnectPointStatus(ServiceKey key, OltFlowsStatus eapolStatus,
                                             OltFlowsStatus subscriberEapolStatus,
                                             OltFlowsStatus subscriberFlowsStatus, OltFlowsStatus dhcpStatus,
@@ -1661,7 +1619,7 @@
                     if (port == null) {
                         log.warn("Port is gone in ONOS, " +
                                          "manually creating it {}", event.subject());
-                        PortNumber inPort = getPortNumberFromFlowRule(event.subject());
+                        PortNumber inPort = OltFlowServiceUtils.getPortNumberFromFlowRule(event.subject());
                         cpStatusReadLock.lock();
                         Optional<ServiceKey> keyWithPort = cpStatus.keySet()
                                 .stream().filter(key -> key.getPort().connectPoint()
@@ -1697,15 +1655,15 @@
         }
 
         protected void updateCpStatus(FlowRuleEvent.Type type, Port port, FlowRule flowRule) {
-            OltFlowsStatus status = flowRuleStatusToOltFlowStatus(type);
-            if (isDefaultEapolFlow(flowRule)) {
+            OltFlowsStatus status = OltFlowServiceUtils.flowRuleStatusToOltFlowStatus(type);
+            if (OltFlowServiceUtils.isDefaultEapolFlow(flowRule)) {
                 ServiceKey sk = new ServiceKey(new AccessDevicePort(port),
                         defaultEapolUniTag);
                 if (log.isTraceEnabled()) {
                     log.trace("update defaultEapolStatus {} on {}", status, sk);
                 }
                 updateConnectPointStatus(sk, status, null, null, null, null);
-            } else if (isSubscriberEapolFlow(flowRule)) {
+            } else if (OltFlowServiceUtils.isSubscriberEapolFlow(flowRule)) {
                 ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule, port);
                 if (sk == null) {
                     return;
@@ -1714,7 +1672,7 @@
                     log.trace("update subscriberEapolStatus {} on {}", status, sk);
                 }
                 updateConnectPointStatus(sk, null, status, null, status, null);
-            } else if (isDhcpFlow(flowRule)) {
+            } else if (OltFlowServiceUtils.isDhcpFlow(flowRule)) {
                 ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule, port);
                 if (sk == null) {
                     return;
@@ -1723,7 +1681,7 @@
                     log.trace("update dhcpStatus {} on {}", status, sk);
                 }
                 updateConnectPointStatus(sk, null, null, null, status, null);
-            } else if (isPppoeFlow(flowRule)) {
+            } else if (OltFlowServiceUtils.isPppoeFlow(flowRule)) {
                 ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule, port);
                 if (sk == null) {
                     return;
@@ -1732,8 +1690,8 @@
                     log.trace("update pppoeStatus {} on {}", status, sk);
                 }
                 updateConnectPointStatus(sk, null, null, null, null, status);
-            } else if (isDataFlow(flowRule)) {
-                PortNumber number = getPortNumberFromFlowRule(flowRule);
+            } else if (OltFlowServiceUtils.isDataFlow(flowRule)) {
+                PortNumber number = OltFlowServiceUtils.getPortNumberFromFlowRule(flowRule);
                 if (number == null) {
                     log.error("Can't capture the port number from flow {}", flowRule);
                     return;
@@ -1754,118 +1712,17 @@
             }
         }
 
-        private boolean isDefaultEapolFlow(FlowRule flowRule) {
-            EthTypeCriterion c = (EthTypeCriterion) flowRule.selector().getCriterion(Criterion.Type.ETH_TYPE);
-            if (c == null) {
-                return false;
-            }
-            if (c.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
-                AtomicBoolean isDefault = new AtomicBoolean(false);
-                flowRule.treatment().allInstructions().forEach(instruction -> {
-                    if (instruction.type() == L2MODIFICATION) {
-                        L2ModificationInstruction modificationInstruction = (L2ModificationInstruction) instruction;
-                        if (modificationInstruction.subtype() == L2ModificationInstruction.L2SubType.VLAN_ID) {
-                            L2ModificationInstruction.ModVlanIdInstruction vlanInstruction =
-                                    (L2ModificationInstruction.ModVlanIdInstruction) modificationInstruction;
-                            if (vlanInstruction.vlanId().id().equals(EAPOL_DEFAULT_VLAN)) {
-                                isDefault.set(true);
-                                return;
-                            }
-                        }
-                    }
-                });
-                return isDefault.get();
-            }
-            return false;
-        }
 
-        /**
-         * Returns true if the flow is a DHCP flow.
-         * Matches both upstream and downstream flows.
-         *
-         * @param flowRule The FlowRule to evaluate
-         * @return boolean
-         */
-        private boolean isDhcpFlow(FlowRule flowRule) {
-            IPProtocolCriterion ipCriterion = (IPProtocolCriterion) flowRule.selector()
-                    .getCriterion(Criterion.Type.IP_PROTO);
-            if (ipCriterion == null) {
-                return false;
-            }
-
-            UdpPortCriterion src = (UdpPortCriterion) flowRule.selector().getCriterion(Criterion.Type.UDP_SRC);
-
-            if (src == null) {
-                return false;
-            }
-            return ipCriterion.protocol() == IPv4.PROTOCOL_UDP &&
-                    (src.udpPort().toInt() == 68 || src.udpPort().toInt() == 67);
-        }
-
-        private boolean isPppoeFlow(FlowRule flowRule) {
-            EthTypeCriterion ethTypeCriterion = (EthTypeCriterion) flowRule.selector()
-                    .getCriterion(Criterion.Type.ETH_TYPE);
-
-            if (ethTypeCriterion == null) {
-                return false;
-            }
-            return EthType.EtherType.PPPoED.ethType().equals(ethTypeCriterion.ethType());
-        }
-
-        private boolean isDataFlow(FlowRule flowRule) {
-            // we consider subscriber flows the one that matches on VLAN_VID
-            // method is valid only because it's the last check after EAPOL and DHCP.
-            // this matches mcast flows as well, if we want to avoid that we can
-            // filter out the elements that have groups in the treatment or
-            // mcastIp in the selector
-            // IPV4_DST:224.0.0.22/32
-            // treatment=[immediate=[GROUP:0x1]]
-
-            return flowRule.selector().getCriterion(Criterion.Type.VLAN_VID) != null;
-        }
-
-        private boolean isSubscriberEapolFlow(FlowRule flowRule) {
-            EthTypeCriterion c = (EthTypeCriterion) flowRule.selector().getCriterion(Criterion.Type.ETH_TYPE);
-            if (c == null) {
-                return false;
-            }
-            if (c.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
-                AtomicBoolean isSubscriber = new AtomicBoolean(false);
-                flowRule.treatment().allInstructions().forEach(instruction -> {
-                    if (instruction.type() == L2MODIFICATION) {
-                        L2ModificationInstruction modificationInstruction = (L2ModificationInstruction) instruction;
-                        if (modificationInstruction.subtype() == L2ModificationInstruction.L2SubType.VLAN_ID) {
-                            L2ModificationInstruction.ModVlanIdInstruction vlanInstruction =
-                                    (L2ModificationInstruction.ModVlanIdInstruction) modificationInstruction;
-                            if (!vlanInstruction.vlanId().id().equals(EAPOL_DEFAULT_VLAN)) {
-                                isSubscriber.set(true);
-                                return;
-                            }
-                        }
-                    }
-                });
-                return isSubscriber.get();
-            }
-            return false;
-        }
 
         private Port getCpFromFlowRule(FlowRule flowRule) {
             DeviceId deviceId = flowRule.deviceId();
-            PortNumber inPort = getPortNumberFromFlowRule(flowRule);
+            PortNumber inPort = OltFlowServiceUtils.getPortNumberFromFlowRule(flowRule);
             if (inPort != null) {
                 return deviceService.getPort(deviceId, inPort);
             }
             return null;
         }
 
-        private PortNumber getPortNumberFromFlowRule(FlowRule flowRule) {
-            PortCriterion inPort = (PortCriterion) flowRule.selector().getCriterion(Criterion.Type.IN_PORT);
-            if (inPort != null) {
-                return inPort.port();
-            }
-            return null;
-        }
-
         private ServiceKey getSubscriberKeyFromFlowRule(FlowRule flowRule, Port flowPort) {
             SubscriberAndDeviceInformation si = subsService.get(getPortName(flowPort));
 
@@ -1881,12 +1738,12 @@
 
             Optional<UniTagInformation> found = Optional.empty();
             VlanId flowVlan = null;
-            if (isDhcpFlow(flowRule)) {
+            if (OltFlowServiceUtils.isDhcpFlow(flowRule)) {
                 // we need to make a special case for DHCP as in the ATT workflow DHCP flows don't match on tags
                 L2ModificationInstruction.ModVlanIdInstruction instruction =
                         (L2ModificationInstruction.ModVlanIdInstruction) flowRule.treatment().immediate().get(1);
                 flowVlan = instruction.vlanId();
-            } else if (isSubscriberEapolFlow(flowRule)) {
+            } else if (OltFlowServiceUtils.isSubscriberEapolFlow(flowRule)) {
                 // we need to make a special case for EAPOL as in the ATT workflow EAPOL flows don't match on tags
                 L2ModificationInstruction.ModVlanIdInstruction instruction =
                         (L2ModificationInstruction.ModVlanIdInstruction) flowRule.treatment().immediate().get(2);
@@ -1917,21 +1774,6 @@
             return found.isPresent() ? new ServiceKey(new AccessDevicePort(flowPort), found.get()) : null;
 
         }
-
-        private OltFlowsStatus flowRuleStatusToOltFlowStatus(FlowRuleEvent.Type type) {
-            switch (type) {
-                case RULE_ADD_REQUESTED:
-                    return OltFlowsStatus.PENDING_ADD;
-                case RULE_ADDED:
-                    return OltFlowsStatus.ADDED;
-                case RULE_REMOVE_REQUESTED:
-                    return OltFlowsStatus.PENDING_REMOVE;
-                case RULE_REMOVED:
-                    return OltFlowsStatus.REMOVED;
-                default:
-                    return OltFlowsStatus.NONE;
-            }
-        }
     }
 
     protected void bindSadisService(SadisService service) {
@@ -1945,4 +1787,170 @@
         this.bpService = null;
         log.info("Sadis service is unloaded");
     }
-}
+
+    private void processFttbUpstreamDataFilteringObjects(DeviceId deviceId, Port port, Port nniPort,
+                                                         FlowOperation action,
+                                                         MeterId upstreamMeterId,
+                                                         MeterId upstreamOltMeterId,
+                                                         UniTagInformation uti,
+                                                         SubscriberAndDeviceInformation si) {
+        String serviceName = uti.getServiceName();
+        ServiceKey sk = new ServiceKey(new AccessDevicePort(port), uti);
+        TrafficSelector.Builder selectorBuilder = DefaultTrafficSelector.builder()
+                .matchInPort(port.number())
+                .matchVlanId(uti.getPonCTag());
+
+        if (uti.getUsPonCTagPriority() != -1) {
+            selectorBuilder.matchVlanPcp((byte) uti.getUsPonCTagPriority());
+        }
+
+        TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
+
+        treatmentBuilder.setVlanId(uti.getPonSTag());
+        if (uti.getUsPonSTagPriority() != -1) {
+            treatmentBuilder.setVlanPcp((byte) uti.getUsPonSTagPriority());
+        }
+
+        DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
+        annotationBuilder.set(FTTB_SERVICE_NAME, serviceName);
+        annotationBuilder.set(FTTB_FLOW_DIRECTION, FTTB_FLOW_UPSTREAM);
+
+        if (upstreamMeterId != null) {
+            treatmentBuilder.meter(upstreamMeterId);
+            annotationBuilder.set(UPSTREAM_ONU, upstreamMeterId.toString());
+        }
+        if (upstreamOltMeterId != null) {
+            treatmentBuilder.meter(upstreamOltMeterId);
+            annotationBuilder.set(UPSTREAM_OLT, upstreamOltMeterId.toString());
+        }
+
+        VlanId innerVlan = null;
+
+        if (serviceName.equals(FTTB_SERVICE_DPU_MGMT_TRAFFIC) || serviceName.equals(FTTB_SERVICE_DPU_ANCP_TRAFFIC)) {
+            MacAddress mac = FttbUtils.getMacAddressFromDhcpEnabledUti(
+                    hostService, si, deviceId, port);
+
+            if (mac == null) {
+                log.error("Mac address not found port:{}, vlan:{}, service:{}",
+                        port, uti.getPonSTag(), serviceName);
+                return;
+            }
+
+            selectorBuilder.matchEthSrc(mac);
+            innerVlan = VlanId.NONE;
+
+        } else if (serviceName.equals(FTTB_SERVICE_SUBSCRIBER_TRAFFIC)) {
+            innerVlan = VlanId.ANY;
+        }
+
+        treatmentBuilder.setOutput(nniPort.number()).writeMetadata(OltFlowServiceUtils.createMetadata(innerVlan,
+                uti.getTechnologyProfileId(), nniPort.number()), 0L);
+
+        DefaultForwardingObjective.Builder flowBuilder = createForwardingObjectiveBuilder(selectorBuilder.build(),
+                treatmentBuilder.build(), MIN_PRIORITY,
+                annotationBuilder.build());
+
+        ObjectiveContext context = getSubscriberFlowBuilderContext(sk, action, FlowDirection.UPSTREAM);
+        processForwardingRule(action, flowBuilder, context, deviceId);
+    }
+
+    private void processFttbDownstreamDataFilteringObjects(DeviceId deviceId, Port port, Port nniPort,
+                                                           FlowOperation action,
+                                                           MeterId downstreamMeterId,
+                                                           MeterId downstreamOltMeterId,
+                                                           UniTagInformation uti, SubscriberAndDeviceInformation si) {
+        String serviceName = uti.getServiceName();
+        ServiceKey sk = new ServiceKey(new AccessDevicePort(port), uti);
+        //subscriberVlan can be any valid Vlan here including ANY to make sure the packet is tagged
+        TrafficSelector.Builder selectorBuilder = DefaultTrafficSelector.builder()
+                .matchVlanId(uti.getPonSTag())
+                .matchInPort(nniPort.number());
+
+        TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder()
+                .setVlanId(uti.getPonCTag())
+                .setOutput(port.number());
+
+        DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
+        annotationBuilder.set(FTTB_SERVICE_NAME, uti.getServiceName());
+        annotationBuilder.set(FTTB_FLOW_DIRECTION, FTTB_FLOW_DOWNSTREAM);
+
+        if (downstreamMeterId != null) {
+            treatmentBuilder.meter(downstreamMeterId);
+            annotationBuilder.set(DOWNSTREAM_ONU, downstreamMeterId.toString());
+        }
+
+        if (downstreamOltMeterId != null) {
+            treatmentBuilder.meter(downstreamOltMeterId);
+            annotationBuilder.set(DOWNSTREAM_OLT, downstreamOltMeterId.toString());
+        }
+
+        VlanId innerVlan = null;
+
+        if (serviceName.equals(FTTB_SERVICE_DPU_MGMT_TRAFFIC) || serviceName.equals(FTTB_SERVICE_DPU_ANCP_TRAFFIC)) {
+            MacAddress mac = FttbUtils.getMacAddressFromDhcpEnabledUti(
+                    hostService, si, deviceId, port);
+
+            if (mac == null) {
+                log.error("Mac address not found port:{}, vlan:{}, service:{}",
+                        port, uti.getPonSTag(), serviceName);
+                return;
+            }
+
+            selectorBuilder.matchEthDst(mac);
+            innerVlan = VlanId.NONE;
+
+        } else if (serviceName.equals(FTTB_SERVICE_SUBSCRIBER_TRAFFIC)) {
+            innerVlan = VlanId.ANY;
+            selectorBuilder.matchMetadata(uti.getPonSTag().toShort());
+        }
+
+        treatmentBuilder.writeMetadata(OltFlowServiceUtils.createMetadata(innerVlan,
+                uti.getTechnologyProfileId(),
+                port.number()), 0);
+
+        DefaultForwardingObjective.Builder flowBuilder = createForwardingObjectiveBuilder(selectorBuilder.build(),
+                treatmentBuilder.build(), MIN_PRIORITY, annotationBuilder.build());
+
+        ObjectiveContext context = getSubscriberFlowBuilderContext(sk, action, FlowDirection.DOWNSTREAM);
+        processForwardingRule(action, flowBuilder, context, deviceId);
+    }
+
+    private ObjectiveContext getSubscriberFlowBuilderContext(ServiceKey sk, FlowOperation action,
+                                                             FlowDirection flowDirection) {
+        ObjectiveContext context = new ObjectiveContext() {
+            @Override
+            public void onSuccess(Objective objective) {
+                log.info("{} {} Data plane filter for {}.",
+                        completeFlowOpToString(action), flowDirection, sk);
+            }
+
+            @Override
+            public void onError(Objective objective, ObjectiveError error) {
+                log.info("{} Data plane filter for {} failed {} because {}.",
+                        flowDirection, sk, action, error);
+                updateConnectPointStatus(sk, null, null, OltFlowsStatus.ERROR, null, null);
+            }
+        };
+
+        return context;
+    }
+
+    private void processForwardingRule(FlowOperation action, DefaultForwardingObjective.Builder flowBuilder,
+                                       ObjectiveContext context, DeviceId deviceId) {
+        ForwardingObjective flow = null;
+        if (action == FlowOperation.ADD) {
+            flow = flowBuilder.add(context);
+        } else if (action == FlowOperation.REMOVE) {
+            flow = flowBuilder.remove(context);
+        } else {
+            log.error("Flow action not supported: {}", action);
+        }
+
+        if (flow != null) {
+            if (log.isTraceEnabled()) {
+                log.trace("Forwarding rule {}", flow);
+            }
+            flowObjectiveService.forward(deviceId, flow);
+        }
+    }
+}
\ No newline at end of file
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowServiceUtils.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowServiceUtils.java
new file mode 100644
index 0000000..7099b1b
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowServiceUtils.java
@@ -0,0 +1,259 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import org.onlab.packet.EthType;
+import org.onlab.packet.IPv4;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.VlanId;
+import org.onosproject.net.PortNumber;
+import org.onosproject.net.flow.FlowRule;
+import org.onosproject.net.flow.FlowRuleEvent;
+import org.onosproject.net.flow.criteria.Criterion;
+import org.onosproject.net.flow.criteria.EthTypeCriterion;
+import org.onosproject.net.flow.criteria.IPProtocolCriterion;
+import org.onosproject.net.flow.criteria.PortCriterion;
+import org.onosproject.net.flow.criteria.UdpPortCriterion;
+import org.onosproject.net.flow.instructions.L2ModificationInstruction;
+import org.onosproject.net.meter.MeterId;
+import org.opencord.sadis.UniTagInformation;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.onosproject.net.flow.instructions.Instruction.Type.L2MODIFICATION;
+import static org.opencord.olt.impl.OltFlowService.EAPOL_DEFAULT_VLAN;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_TP_ID_DEFAULT;
+
+/**
+ * Utility class for Flow service utility methods.
+ */
+public final class OltFlowServiceUtils {
+
+    public static final int NONE_TP_ID = -1;
+
+    private OltFlowServiceUtils() {
+    }
+
+    /**
+     * Constructs and returns the metadata from cVlan, techProfileId and upstreamOltMeterId.
+     *
+     * @param cVlan                 the customer vlan
+     * @param techProfileId         the technology profile
+     * @param upstreamOltMeterId    the upstream olt meter id
+     * @return Metadata
+     */
+    public static Long createTechProfValueForWriteMetadata(VlanId cVlan, int techProfileId,
+                                                           MeterId upstreamOltMeterId) {
+        Long writeMetadata;
+
+        if (cVlan == null || VlanId.NONE.equals(cVlan)) {
+            writeMetadata = (long) techProfileId << 32;
+        } else {
+            writeMetadata = ((long) (cVlan.id()) << 48 | (long) techProfileId << 32);
+        }
+        if (upstreamOltMeterId == null) {
+            return writeMetadata;
+        } else {
+            return writeMetadata | upstreamOltMeterId.id();
+        }
+    }
+
+    /**
+     * Converts FlowRuleEvent.Type to OltFlowService.OltFlowsStatus.
+     *
+     * @param type FlowRuleEvent type
+     * @return OltFlowService.OltFlowsStatus
+     */
+    public static OltFlowService.OltFlowsStatus flowRuleStatusToOltFlowStatus(FlowRuleEvent.Type type) {
+        switch (type) {
+            case RULE_ADD_REQUESTED:
+                return OltFlowService.OltFlowsStatus.PENDING_ADD;
+            case RULE_ADDED:
+                return OltFlowService.OltFlowsStatus.ADDED;
+            case RULE_REMOVE_REQUESTED:
+                return OltFlowService.OltFlowsStatus.PENDING_REMOVE;
+            case RULE_REMOVED:
+                return OltFlowService.OltFlowsStatus.REMOVED;
+            default:
+                return OltFlowService.OltFlowsStatus.NONE;
+        }
+    }
+
+    /**
+     * Checks if the configured Mac address is valid for a UniTagInformation.
+     *
+     * @param tagInformation UniTagInformation
+     * @return true if the mac address is valid
+     */
+    public static boolean isMacAddressValid(UniTagInformation tagInformation) {
+        return tagInformation.getConfiguredMacAddress() != null &&
+                !tagInformation.getConfiguredMacAddress().trim().equals("") &&
+                !MacAddress.NONE.equals(MacAddress.valueOf(tagInformation.getConfiguredMacAddress()));
+    }
+
+    /**
+     * Returns true if the flow is a DHCP flow.
+     * Matches both upstream and downstream flows.
+     *
+     * @param flowRule The FlowRule to evaluate
+     * @return boolean
+     */
+    public static boolean isDhcpFlow(FlowRule flowRule) {
+        IPProtocolCriterion ipCriterion = (IPProtocolCriterion) flowRule.selector()
+                .getCriterion(Criterion.Type.IP_PROTO);
+        if (ipCriterion == null) {
+            return false;
+        }
+
+        UdpPortCriterion src = (UdpPortCriterion) flowRule.selector().getCriterion(Criterion.Type.UDP_SRC);
+
+        if (src == null) {
+            return false;
+        }
+        return ipCriterion.protocol() == IPv4.PROTOCOL_UDP &&
+                (src.udpPort().toInt() == 68 || src.udpPort().toInt() == 67);
+    }
+
+    /**
+     * Returns true if the flow is a Pppoe flow.
+     *
+     * @param flowRule The FlowRule to evaluate
+     * @return boolean
+     */
+    public static boolean isPppoeFlow(FlowRule flowRule) {
+        EthTypeCriterion ethTypeCriterion = (EthTypeCriterion) flowRule.selector()
+                .getCriterion(Criterion.Type.ETH_TYPE);
+
+        if (ethTypeCriterion == null) {
+            return false;
+        }
+        return EthType.EtherType.PPPoED.ethType().equals(ethTypeCriterion.ethType());
+    }
+
+    /**
+     * Return true if the flow is a Data flow.
+     * @param flowRule The FlowRule to evaluate
+     * @return boolean
+     */
+    public static boolean isDataFlow(FlowRule flowRule) {
+        // we consider subscriber flows the one that matches on VLAN_VID
+        // method is valid only because it's the last check after EAPOL and DHCP.
+        // this matches mcast flows as well, if we want to avoid that we can
+        // filter out the elements that have groups in the treatment or
+        // mcastIp in the selector
+        // IPV4_DST:224.0.0.22/32
+        // treatment=[immediate=[GROUP:0x1]]
+
+        return flowRule.selector().getCriterion(Criterion.Type.VLAN_VID) != null;
+    }
+
+    /**
+     * Extracts and returns inPort selector from the FlowRule.
+     *
+     * @param flowRule The FlowRule to evaluate
+     * @return PortNumber
+     */
+    public static PortNumber getPortNumberFromFlowRule(FlowRule flowRule) {
+        PortCriterion inPort = (PortCriterion) flowRule.selector().getCriterion(Criterion.Type.IN_PORT);
+        if (inPort != null) {
+            return inPort.port();
+        }
+        return null;
+    }
+
+    /**
+     * Constructs and returns the metadata from innerVlan, techProfileId and egressPort.
+     *
+     * @param innerVlan         inner vlan tag
+     * @param techProfileId     technology profile
+     * @param egressPort        outport
+     * @return Metadata
+     */
+    public static Long createMetadata(VlanId innerVlan, int techProfileId, PortNumber egressPort) {
+        if (techProfileId == NONE_TP_ID) {
+            techProfileId = DEFAULT_TP_ID_DEFAULT;
+        }
+
+        Long writeMetadata = (long) techProfileId << 32 | egressPort.toLong();
+
+        if (innerVlan != null && !VlanId.NONE.equals(innerVlan)) {
+            writeMetadata |= (long) (innerVlan.id()) << 48;
+        }
+
+        return writeMetadata;
+    }
+
+    /***
+     * Checks if the FlowRule is default eapol.
+     * @param flowRule FlowRule to check.
+     * @return true if FlowRule is default eapol.
+     */
+    public static boolean isDefaultEapolFlow(FlowRule flowRule) {
+        EthTypeCriterion c = (EthTypeCriterion) flowRule.selector().getCriterion(Criterion.Type.ETH_TYPE);
+        if (c == null) {
+            return false;
+        }
+        if (c.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
+            AtomicBoolean isDefault = new AtomicBoolean(false);
+            flowRule.treatment().allInstructions().forEach(instruction -> {
+                if (instruction.type() == L2MODIFICATION) {
+                    L2ModificationInstruction modificationInstruction = (L2ModificationInstruction) instruction;
+                    if (modificationInstruction.subtype() == L2ModificationInstruction.L2SubType.VLAN_ID) {
+                        L2ModificationInstruction.ModVlanIdInstruction vlanInstruction =
+                                (L2ModificationInstruction.ModVlanIdInstruction) modificationInstruction;
+                        if (vlanInstruction.vlanId().id().equals(EAPOL_DEFAULT_VLAN)) {
+                            isDefault.set(true);
+                            return;
+                        }
+                    }
+                }
+            });
+            return isDefault.get();
+        }
+        return false;
+    }
+
+    /***
+     * Checks if the FlowRule is Subscriber eapol.
+     * @param flowRule Flow Rule
+     * @return true if FlowRule is Subscriber eapol.
+     */
+    public static boolean isSubscriberEapolFlow(FlowRule flowRule) {
+        EthTypeCriterion c = (EthTypeCriterion) flowRule.selector().getCriterion(Criterion.Type.ETH_TYPE);
+        if (c == null) {
+            return false;
+        }
+        if (c.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
+            AtomicBoolean isSubscriber = new AtomicBoolean(false);
+            flowRule.treatment().allInstructions().forEach(instruction -> {
+                if (instruction.type() == L2MODIFICATION) {
+                    L2ModificationInstruction modificationInstruction = (L2ModificationInstruction) instruction;
+                    if (modificationInstruction.subtype() == L2ModificationInstruction.L2SubType.VLAN_ID) {
+                        L2ModificationInstruction.ModVlanIdInstruction vlanInstruction =
+                                (L2ModificationInstruction.ModVlanIdInstruction) modificationInstruction;
+                        if (!vlanInstruction.vlanId().id().equals(EAPOL_DEFAULT_VLAN)) {
+                            isSubscriber.set(true);
+                            return;
+                        }
+                    }
+                }
+            });
+            return isSubscriber.get();
+        }
+        return false;
+    }
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltUtils.java b/impl/src/main/java/org/opencord/olt/impl/OltUtils.java
index 6300ff9..869a01d 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltUtils.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltUtils.java
@@ -16,8 +16,11 @@
 
 package org.opencord.olt.impl;
 
+import org.onlab.packet.VlanId;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.Port;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.opencord.sadis.UniTagInformation;
 
 import static org.opencord.olt.impl.OltFlowService.FlowOperation.ADD;
 
@@ -57,4 +60,28 @@
     static String completeFlowOpToString(OltFlowService.FlowOperation op) {
         return op == ADD ? "Added" : "Removed";
     }
+
+    /**
+     * Search and return the matching UniTagInfomation from the list of UniTagInfomation in the
+     * SubscriberAndDeviceInformation.
+     * For the match : cvlan, svlan and tpId are used.
+     *
+     * @param subInfo       Subscriber information.
+     * @param innerVlan     cTag
+     * @param outerVlan     sTag
+     * @param tpId          Techprofile Id
+     * @return UniTagInformation
+     */
+    static UniTagInformation getUniTagInformation(SubscriberAndDeviceInformation subInfo, VlanId innerVlan,
+                                                  VlanId outerVlan, int tpId) {
+        UniTagInformation service = null;
+        for (UniTagInformation tagInfo : subInfo.uniTagList()) {
+            if (innerVlan.equals(tagInfo.getPonCTag()) && outerVlan.equals(tagInfo.getPonSTag())
+                    && tpId == tagInfo.getTechnologyProfileId()) {
+                service = tagInfo;
+                break;
+            }
+        }
+        return service;
+    }
 }
diff --git a/impl/src/main/java/org/opencord/olt/impl/fttb/FttbUtils.java b/impl/src/main/java/org/opencord/olt/impl/fttb/FttbUtils.java
new file mode 100644
index 0000000..19d98a5
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/fttb/FttbUtils.java
@@ -0,0 +1,185 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl.fttb;
+
+import org.onlab.packet.MacAddress;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Host;
+import org.onosproject.net.Port;
+import org.onosproject.net.flow.TrafficTreatment;
+import org.onosproject.net.flow.criteria.Criteria;
+import org.onosproject.net.flowobjective.FilteringObjective;
+import org.onosproject.net.flowobjective.ForwardingObjective;
+import org.onosproject.net.host.HostService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
+import org.opencord.sadis.UniTagInformation;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Optional;
+
+/**
+ * Utility class for holding FTTB constants and utility methods.
+ */
+public final class FttbUtils {
+
+    public static final String FTTB_FLOW_DIRECTION = "fttbFlowDirection";
+    public static final String FTTB_FLOW_UPSTREAM = "fttbFlowUpstream";
+    public static final String FTTB_FLOW_DOWNSTREAM = "fttbFlowDownstream";
+
+    public static final String FTTB_SERVICE_NAME = "fttbServiceName";
+    public static final String FTTB_SERVICE_DPU_MGMT_TRAFFIC = "DPU_MGMT_TRAFFIC";
+    public static final String FTTB_SERVICE_DPU_ANCP_TRAFFIC = "DPU_ANCP_TRAFFIC";
+    public static final String FTTB_SERVICE_SUBSCRIBER_TRAFFIC = "FTTB_SUBSCRIBER_TRAFFIC";
+
+    private static final Logger log = LoggerFactory.getLogger(FttbUtils.class);
+
+    private FttbUtils() {
+    }
+
+    /**
+     * Checks if the FlowObjective qualifies as FTTB rule.
+     *
+     * @param fwd ForwardingObjective rule.
+     * @return true if the fwd is FTTB rule.
+     */
+    public static boolean isFttbRule(ForwardingObjective fwd) {
+        String serviceName = fwd.annotations().value(FTTB_SERVICE_NAME);
+
+        if (serviceName == null) {
+            if (log.isTraceEnabled()) {
+                log.trace("Service name not found for : {} ", fwd);
+            }
+            return false;
+        }
+
+        return isFttbService(serviceName);
+    }
+
+    /**
+     * Checks if the UniTagInformation is a FTTB subscriber.
+     *
+     * @param uti The UniTagInformation to check for.
+     * @return true if the uti is FTTB subscriber.
+     */
+    public static boolean isFttbService(UniTagInformation uti) {
+        String serviceName = uti.getServiceName();
+
+        if (serviceName == null) {
+            log.warn("Could not find service name for {}", uti);
+            return false;
+        }
+
+        return isFttbService(serviceName);
+    }
+
+    /**
+     * Checks if the UniTagInformation is FTTB DPU or ANCP service.
+     *
+     * @param uti The UniTagInformation to check for.
+     * @return true if the uti is FTTB DPU or ANCP service.
+     */
+    public static boolean isFttbDpuOrAncpService(UniTagInformation uti) {
+        String serviceName = uti.getServiceName();
+
+        if (serviceName == null) {
+            log.trace("Could not find service name for {}", uti);
+            return false;
+        }
+
+        switch (serviceName) {
+            case FTTB_SERVICE_DPU_MGMT_TRAFFIC:
+            case FTTB_SERVICE_DPU_ANCP_TRAFFIC:
+                return true;
+            default:
+                return false;
+        }
+    }
+
+    /**
+     * Adds match conditions to FilteringObjective.Builder for FTTB.
+     * @param dhcpBuilder FilteringObjective.Builder
+     * @param uti UniTagInformation
+     */
+    public static void addUpstreamDhcpCondition(FilteringObjective.Builder dhcpBuilder,
+                                                UniTagInformation uti) {
+        dhcpBuilder.addCondition(Criteria.matchVlanId(uti.getPonCTag()));
+        if (uti.getUsPonCTagPriority() != -1) {
+            dhcpBuilder.addCondition(Criteria.matchVlanPcp((byte) uti.getUsPonCTagPriority()));
+        }
+    }
+
+    /**
+     * Adds Instructions to TrafficTreatment.Builder for FTTB.
+     * @param treatmentBuilder TrafficTreatment.Builder
+     * @param uti UniTagInformation
+     */
+    public static void addUpstreamDhcpTreatment(TrafficTreatment.Builder treatmentBuilder, UniTagInformation uti) {
+        treatmentBuilder.setVlanId(uti.getPonSTag());
+
+        if (uti.getUsPonSTagPriority() != -1) {
+            treatmentBuilder.setVlanPcp((byte) uti.getUsPonSTagPriority());
+        }
+    }
+
+    private static boolean isFttbService(String serviceName) {
+        if (serviceName == null) {
+            return false;
+        }
+
+        switch (serviceName) {
+            case FTTB_SERVICE_DPU_MGMT_TRAFFIC:
+            case FTTB_SERVICE_DPU_ANCP_TRAFFIC:
+            case FTTB_SERVICE_SUBSCRIBER_TRAFFIC:
+                return true;
+            default:
+                if (log.isTraceEnabled()) {
+                    log.trace("Service name {} is not one for FTTB", serviceName);
+                }
+                return false;
+        }
+    }
+
+    /**
+     * Returns mac address from the Dhcp Enabled UniTagInformation for a FTTB service.
+     *
+     * @param hostService   Service for interacting with the inventory of end-station hosts
+     * @param si            Information about a subscriber
+     * @param deviceId      Device id for mac lookup.
+     * @param port          Uni port on the device for mac lookup.
+     * @return Mac address of the subscriber.
+     */
+    public static MacAddress getMacAddressFromDhcpEnabledUti(HostService hostService,
+                                                             SubscriberAndDeviceInformation si,
+                                                             DeviceId deviceId,
+                                                             Port port) {
+        for (UniTagInformation uniTagInfo : si.uniTagList()) {
+            boolean isMacLearningEnabled = uniTagInfo.getEnableMacLearning();
+            if (isMacLearningEnabled) {
+                Optional<Host> optHost = hostService.getConnectedHosts(new ConnectPoint(deviceId, port.number()))
+                        .stream().filter(host -> host.vlan().equals(uniTagInfo.getPonSTag())).findFirst();
+                if (optHost.isPresent()) {
+                    if (optHost.get().mac() != null) {
+                        return optHost.get().mac();
+                    }
+                }
+            }
+        }
+        return null;
+    }
+}
diff --git a/impl/src/main/java/org/opencord/olt/impl/fttb/package-info.java b/impl/src/main/java/org/opencord/olt/impl/fttb/package-info.java
new file mode 100644
index 0000000..6a7d770
--- /dev/null
+++ b/impl/src/main/java/org/opencord/olt/impl/fttb/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Package for handling FTTB logic.
+ */
+package org.opencord.olt.impl.fttb;
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java b/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java
index baa880d..878078a 100644
--- a/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java
+++ b/impl/src/test/java/org/opencord/olt/impl/OltFlowServiceTest.java
@@ -55,12 +55,15 @@
 import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.flow.criteria.Criteria;
 import org.onosproject.net.flowobjective.DefaultFilteringObjective;
+import org.onosproject.net.flowobjective.DefaultForwardingObjective;
 import org.onosproject.net.flowobjective.FilteringObjective;
 import org.onosproject.net.flowobjective.FlowObjectiveService;
+import org.onosproject.net.flowobjective.ForwardingObjective;
 import org.onosproject.net.host.HostService;
 import org.onosproject.net.meter.MeterId;
 import org.onosproject.net.provider.ProviderId;
 import org.onosproject.store.service.TestStorageService;
+import org.opencord.olt.impl.fttb.FttbUtils;
 import org.opencord.sadis.BaseInformationService;
 import org.opencord.sadis.SadisService;
 import org.opencord.sadis.SubscriberAndDeviceInformation;
@@ -70,6 +73,8 @@
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
@@ -81,6 +86,7 @@
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 import static org.onosproject.net.AnnotationKeys.PORT_NAME;
 import static org.opencord.olt.impl.OltFlowService.OltFlowsStatus.ERROR;
 import static org.opencord.olt.impl.OltFlowService.OltFlowsStatus.NONE;
@@ -90,6 +96,15 @@
 import static org.opencord.olt.impl.OltFlowService.OltFlowsStatus.REMOVED;
 import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_BP_ID_DEFAULT;
 import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DOWNSTREAM_OLT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DOWNSTREAM_ONU;
+import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_OLT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_ONU;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_FLOW_DIRECTION;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_FLOW_DOWNSTREAM;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_FLOW_UPSTREAM;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_SERVICE_NAME;
 
 public class OltFlowServiceTest extends OltTestHelpers {
 
@@ -112,6 +127,42 @@
     Port uniUpdateEnabled = new OltPort(testDevice, true, PortNumber.portNumber(16),
             DefaultAnnotations.builder().set(PORT_NAME, "uni-1").build());
 
+    private final UniTagInformation dpuMgmtUti = new UniTagInformation.Builder()
+            .setPonCTag(VlanId.vlanId("6"))
+            .setPonSTag(VlanId.vlanId("60"))
+            .setUsPonCTagPriority(1)
+            .setUsPonSTagPriority(2)
+            .setTechnologyProfileId(64)
+            .setUpstreamBandwidthProfile("usBp")
+            .setUpstreamOltBandwidthProfile("usOltBp")
+            .setServiceName(FttbUtils.FTTB_SERVICE_DPU_MGMT_TRAFFIC)
+            .setIsDhcpRequired(true)
+            .setEnableMacLearning(true)
+            .build();
+
+    private final UniTagInformation ancpUti = new UniTagInformation.Builder()
+            .setPonCTag(VlanId.vlanId("4"))
+            .setPonSTag(VlanId.vlanId("40"))
+            .setUsPonCTagPriority(3)
+            .setUsPonSTagPriority(4)
+            .setTechnologyProfileId(64)
+            .setUpstreamBandwidthProfile("usBp")
+            .setUpstreamOltBandwidthProfile("usOltBp")
+            .setServiceName(FttbUtils.FTTB_SERVICE_DPU_ANCP_TRAFFIC)
+            .setIsDhcpRequired(false)
+            .build();
+
+    private final UniTagInformation fttbSubscriberUti = new UniTagInformation.Builder()
+            .setPonCTag(VlanId.vlanId("8"))
+            .setPonSTag(VlanId.vlanId("80"))
+            .setTechnologyProfileId(64)
+            .setUpstreamBandwidthProfile("usBp")
+            .setUpstreamOltBandwidthProfile("usOltBp")
+            .setServiceName(FttbUtils.FTTB_SERVICE_SUBSCRIBER_TRAFFIC)
+            .setIsDhcpRequired(false)
+            .build();
+
+
     @Before
     public void setUp() {
         component = new OltFlowService();
@@ -130,6 +181,11 @@
         doReturn(Mockito.mock(BaseInformationService.class))
                 .when(component.sadisService).getSubscriberInfoService();
         doReturn(testAppId).when(component.coreService).registerApplication("org.opencord.olt");
+        doReturn(testDevice).when(component.deviceService).getDevice(testDevice.id());
+        when(component.sadisService.getSubscriberInfoService().get(testDevice.serialNumber())).
+                thenReturn(Mockito.mock(SubscriberAndDeviceInformation.class));
+        when(component.oltDeviceService.getNniPort(testDevice)).thenReturn(Optional.of(nniPort));
+
         component.activate(null);
         component.bindSadisService(component.sadisService);
 
@@ -384,7 +440,7 @@
                 .withMeta(
                         DefaultTrafficTreatment.builder()
                                 .meter(MeterId.meterId(1))
-                                .writeMetadata(component.createTechProfValueForWriteMetadata(
+                                .writeMetadata(OltFlowServiceUtils.createTechProfValueForWriteMetadata(
                                         VlanId.vlanId(eapolDefaultVlan),
                                         component.defaultTechProfileId, MeterId.meterId(1)), 0)
                                 .setOutput(PortNumber.CONTROLLER)
@@ -437,7 +493,7 @@
                 .withMeta(
                         DefaultTrafficTreatment.builder()
                                 .meter(MeterId.meterId(1))
-                                .writeMetadata(component.createTechProfValueForWriteMetadata(
+                                .writeMetadata(OltFlowServiceUtils.createTechProfValueForWriteMetadata(
                                         VlanId.vlanId(eapolDefaultVlan),
                                         component.defaultTechProfileId, MeterId.meterId(1)), 0)
                                 .setOutput(PortNumber.CONTROLLER)
@@ -576,6 +632,7 @@
 
     @Test
     public void testHandleNniFlowsPppoe() {
+        component.enableDhcpOnNni = false;
         component.enablePppoeOnNni = true;
         component.enablePppoe = true;
         component.handleNniFlows(testDevice, nniPort, OltFlowService.FlowOperation.ADD);
@@ -599,6 +656,7 @@
 
     @Test
     public void testRemoveNniFlowsPppoe() {
+        component.enableDhcpOnNni = false;
         component.enablePppoeOnNni = true;
         component.enablePppoe = true;
         component.handleNniFlows(testDevice, nniPortDisabled, OltFlowService.FlowOperation.REMOVE);
@@ -937,4 +995,580 @@
         OltPortStatus status = component.cpStatus.get(sk1);
         Assert.assertEquals(REMOVED, status.subscriberFlowsStatus);
     }
+
+    @Test
+    public void testHandleNniFlowsDhcpV4WithNniDhcpTrapVid() {
+        component.enableDhcpOnNni = true;
+        component.enableDhcpV4 = true;
+
+        SubscriberAndDeviceInformation testOltFttbSadis = new SubscriberAndDeviceInformation();
+        testOltFttbSadis.setNniDhcpTrapVid(VlanId.vlanId("60"));
+
+        when(component.sadisService.getSubscriberInfoService().get(testDevice.serialNumber())).
+                thenReturn(testOltFttbSadis);
+
+        component.handleNniFlows(testDevice, nniPort, OltFlowService.FlowOperation.ADD);
+
+        FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
+                .permit()
+                .withKey(Criteria.matchInPort(nniPort.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
+                .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_UDP))
+                .addCondition(Criteria.matchUdpSrc(TpPort.tpPort(67)))
+                .addCondition(Criteria.matchUdpDst(TpPort.tpPort(68)))
+                .addCondition(Criteria.matchVlanId(testOltFttbSadis.nniDhcpTrapVid()))
+                .fromApp(testAppId)
+                .withPriority(10000)
+                .withMeta(DefaultTrafficTreatment.builder().setOutput(PortNumber.CONTROLLER).build())
+                .add();
+
+        // invoked with the correct DHCP filtering objective
+        verify(component.flowObjectiveService, times(1))
+                .filter(eq(deviceId), argThat(new FilteringObjectiveMatcher(expectedFilter)));
+        // invoked only twice, LLDP and DHCP
+        verify(component.flowObjectiveService, times(2))
+                .filter(eq(deviceId), any());
+    }
+
+    @Test
+    public void testHandleFttbSubscriberDhcpFlowsAdd() {
+        component.enableDhcpV4 = true;
+
+        // add two services, one requires DHCP the other doesn't
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+
+        UniTagInformation mc = new UniTagInformation.Builder()
+                .setIsDhcpRequired(false).build();
+        uniTagInformationList.add(dpuMgmtUti);
+        uniTagInformationList.add(mc);
+
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+
+        final DiscoveredSubscriber addedSub =
+                new DiscoveredSubscriber(testDevice,
+                        uniUpdateEnabled, DiscoveredSubscriber.Status.ADDED,
+                        false, si);
+
+        // return meter IDs
+        MeterId usBpMeterId = MeterId.meterId(2);
+        MeterId usOltBpMeterId = MeterId.meterId(3);
+
+        doReturn(usBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), dpuMgmtUti.getUpstreamBandwidthProfile());
+        doReturn(usOltBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), dpuMgmtUti.getUpstreamOltBandwidthProfile());
+
+        TrafficTreatment expectedTreatment = DefaultTrafficTreatment.builder()
+                .setVlanId(dpuMgmtUti.getPonSTag())
+                .setVlanPcp((byte) dpuMgmtUti.getUsPonSTagPriority())
+                .setOutput(PortNumber.CONTROLLER)
+                .meter(usBpMeterId)
+                .writeMetadata(OltFlowServiceUtils.createTechProfValueForWriteMetadata(VlanId.NONE,
+                        dpuMgmtUti.getTechnologyProfileId(), usOltBpMeterId), 0L).build();
+
+        FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
+                .permit()
+                .withKey(Criteria.matchInPort(addedSub.port.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
+                .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_UDP))
+                .addCondition(Criteria.matchUdpSrc(TpPort.tpPort(68)))
+                .addCondition(Criteria.matchUdpDst(TpPort.tpPort(67)))
+                .addCondition(Criteria.matchVlanId(dpuMgmtUti.getPonCTag()))
+                .addCondition(Criteria.matchVlanPcp((byte) dpuMgmtUti.getUsPonCTagPriority()))
+                .fromApp(testAppId)
+                .withPriority(10000)
+                .withMeta(expectedTreatment)
+                .add();
+
+        component.handleSubscriberDhcpFlows(addedSub.device.id(), addedSub.port,
+                OltFlowService.FlowOperation.ADD, si);
+        verify(component.flowObjectiveService, times(1))
+                .filter(eq(addedSub.device.id()), argThat(new FilteringObjectiveMatcher(expectedFilter)));
+    }
+
+    @Test
+    public void testRemoveFttbSubscriberDhcpFlows() {
+        component.enableDhcpV4 = true;
+
+        // Mocking the get call, to mark the SubscriberKey as already added.
+        component.cpStatus = Mockito.mock(Map.class);
+        doReturn(new OltPortStatus(null, null, null, ADDED, null))
+                .when(component.cpStatus).get(Mockito.any());
+
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+        uniTagInformationList.add(dpuMgmtUti);
+
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+
+        final DiscoveredSubscriber removedSub =
+                new DiscoveredSubscriber(testDevice,
+                        uniUpdateEnabled, DiscoveredSubscriber.Status.REMOVED,
+                        false, si);
+
+        // return meter IDs
+        MeterId usBpMeterId = MeterId.meterId(2);
+        MeterId usOltBpMeterId = MeterId.meterId(3);
+
+        doReturn(usBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(removedSub.device.id(), dpuMgmtUti.getUpstreamBandwidthProfile());
+        doReturn(usOltBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(removedSub.device.id(), dpuMgmtUti.getUpstreamOltBandwidthProfile());
+
+        TrafficTreatment expectedTreatment = DefaultTrafficTreatment.builder()
+                .setVlanId(dpuMgmtUti.getPonSTag())
+                .setVlanPcp((byte) dpuMgmtUti.getUsPonSTagPriority())
+                .setOutput(PortNumber.CONTROLLER)
+                .meter(usBpMeterId)
+                .writeMetadata(OltFlowServiceUtils.createTechProfValueForWriteMetadata(VlanId.NONE,
+                        dpuMgmtUti.getTechnologyProfileId(), usOltBpMeterId), 0L).build();
+
+        FilteringObjective expectedFilter = DefaultFilteringObjective.builder()
+                .deny()
+                .withKey(Criteria.matchInPort(removedSub.port.number()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.IPV4.ethType()))
+                .addCondition(Criteria.matchIPProtocol(IPv4.PROTOCOL_UDP))
+                .addCondition(Criteria.matchUdpSrc(TpPort.tpPort(68)))
+                .addCondition(Criteria.matchUdpDst(TpPort.tpPort(67)))
+                .addCondition(Criteria.matchVlanId(dpuMgmtUti.getPonCTag()))
+                .addCondition(Criteria.matchVlanPcp((byte) dpuMgmtUti.getUsPonCTagPriority()))
+                .fromApp(testAppId)
+                .withPriority(10000)
+                .withMeta(expectedTreatment)
+                .add();
+
+        component.handleSubscriberDhcpFlows(removedSub.device.id(), removedSub.port,
+                OltFlowService.FlowOperation.REMOVE, si);
+        verify(component.flowObjectiveService, times(1))
+                .filter(eq(removedSub.device.id()), argThat(new FilteringObjectiveMatcher(expectedFilter)));
+    }
+
+    @Test
+    public void testHandleFttbMacSwitchingFlowsAdd() {
+        component.enableDhcpV4 = true;
+
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+
+        uniTagInformationList.add(dpuMgmtUti);
+        uniTagInformationList.add(ancpUti);
+
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+
+        final DiscoveredSubscriber addedSub =
+                new DiscoveredSubscriber(testDevice,
+                        uniUpdateEnabled, DiscoveredSubscriber.Status.ADDED,
+                        false, si);
+
+        // return meter IDs
+        MeterId usBpMeterId = MeterId.meterId(1);
+        MeterId usOltBpMeterId = MeterId.meterId(1);
+
+        MeterId dsBpMeterId = MeterId.meterId(2);
+        MeterId dsOltBpMeterId = MeterId.meterId(2);
+
+        MacAddress mac = MacAddress.valueOf("0A:00:27:00:00:09");
+        Host host = Mockito.mock(Host.class);
+        doReturn(mac).when(host).mac();
+        doReturn(dpuMgmtUti.getPonSTag()).when(host).vlan();
+
+        doReturn(new HashSet<>(Arrays.asList(host))).when(component.hostService)
+                .getConnectedHosts(new ConnectPoint(addedSub.device.id(), addedSub.port.number()));
+
+        doReturn(usBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), dpuMgmtUti.getUpstreamBandwidthProfile());
+        doReturn(usOltBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), dpuMgmtUti.getUpstreamOltBandwidthProfile());
+
+        doReturn(dsBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), dpuMgmtUti.getDownstreamBandwidthProfile());
+        doReturn(dsOltBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), dpuMgmtUti.getDownstreamOltBandwidthProfile());
+
+        String[] directions = {FTTB_FLOW_UPSTREAM, FTTB_FLOW_DOWNSTREAM};
+        for (UniTagInformation uti : uniTagInformationList) {
+            for (String direction : directions) {
+                TrafficTreatment.Builder expectedTreatment = DefaultTrafficTreatment.builder();
+                TrafficSelector.Builder expectedSelectorBuilder = DefaultTrafficSelector.builder();
+
+                DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
+                annotationBuilder.set(FTTB_FLOW_DIRECTION, direction);
+                annotationBuilder.set(FTTB_SERVICE_NAME, uti.getServiceName());
+
+                switch (direction) {
+                    case FTTB_FLOW_UPSTREAM:
+                        expectedSelectorBuilder
+                                .matchInPort(addedSub.port.number())
+                                .matchVlanId(uti.getPonCTag())
+                                .matchVlanPcp((byte) uti.getUsPonCTagPriority())
+                                .matchEthSrc(mac);
+
+                        expectedTreatment.setVlanId(uti.getPonSTag())
+                                .setVlanPcp((byte) uti.getUsPonSTagPriority())
+                                .setOutput(nniPort.number())
+                                .meter(usBpMeterId)
+                                .writeMetadata(OltFlowServiceUtils.createMetadata(VlanId.NONE,
+                                        uti.getTechnologyProfileId(), nniPort.number()), 0L).build();
+
+                        annotationBuilder.set(UPSTREAM_ONU, usBpMeterId.toString());
+                        annotationBuilder.set(UPSTREAM_OLT, usOltBpMeterId.toString());
+                        break;
+
+                    case FTTB_FLOW_DOWNSTREAM:
+                        expectedSelectorBuilder
+                                .matchInPort(nniPort.number())
+                                .matchVlanId(uti.getPonSTag())
+                                .matchEthDst(mac);
+
+                        expectedTreatment.setVlanId(uti.getPonCTag())
+                                .setOutput(addedSub.port.number())
+                                .meter(dsBpMeterId)
+                                .writeMetadata(OltFlowServiceUtils.createMetadata(VlanId.NONE,
+                                        uti.getTechnologyProfileId(), addedSub.port.number()), 0L).build();
+
+                        annotationBuilder.set(DOWNSTREAM_ONU, dsBpMeterId.toString());
+                        annotationBuilder.set(DOWNSTREAM_OLT, dsOltBpMeterId.toString());
+                        break;
+
+                    default:
+                        return;
+                }
+
+                ForwardingObjective expected = DefaultForwardingObjective.builder()
+                        .withFlag(ForwardingObjective.Flag.VERSATILE)
+                        .withPriority(1000)
+                        .makePermanent()
+                        .withSelector(expectedSelectorBuilder.build())
+                        .withAnnotations(annotationBuilder.build())
+                        .fromApp(testAppId)
+                        .withTreatment(expectedTreatment.build())
+                        .add();
+
+                component.handleSubscriberDataFlows(addedSub.device, addedSub.port,
+                        OltFlowService.FlowOperation.ADD, si, DEFAULT_MCAST_SERVICE_NAME_DEFAULT);
+                verify(component.flowObjectiveService, times(1))
+                        .forward(eq(addedSub.device.id()), eq(expected));
+            }
+        }
+    }
+
+    @Test
+    public void testRemoveFttbMacSwitchingFlows() {
+        component.enableDhcpV4 = true;
+        component.cpStatus = Mockito.mock(Map.class);
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+
+        uniTagInformationList.add(dpuMgmtUti);
+        uniTagInformationList.add(ancpUti);
+
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+
+        final DiscoveredSubscriber removedSub =
+                new DiscoveredSubscriber(testDevice,
+                        uniUpdateEnabled, DiscoveredSubscriber.Status.REMOVED,
+                        false, si);
+
+
+        ServiceKey sk1 = new ServiceKey(new AccessDevicePort(removedSub.port), dpuMgmtUti);
+        ServiceKey sk2 = new ServiceKey(new AccessDevicePort(removedSub.port), ancpUti);
+
+        component.cpStatus = component.storageService.
+                <ServiceKey, OltPortStatus>consistentMapBuilder().build().asJavaMap();
+        OltPortStatus cp1Status = new OltPortStatus(NONE, NONE, PENDING_ADD, NONE, NONE);
+        OltPortStatus cp2Status = new OltPortStatus(NONE, NONE, PENDING_ADD, NONE, NONE);
+        component.cpStatus.put(sk1, cp1Status);
+        component.cpStatus.put(sk2, cp2Status);
+
+        // return meter IDs
+        MeterId usBpMeterId = MeterId.meterId(1);
+        MeterId usOltBpMeterId = MeterId.meterId(1);
+
+        MeterId dsBpMeterId = MeterId.meterId(2);
+        MeterId dsOltBpMeterId = MeterId.meterId(2);
+
+        MacAddress mac = MacAddress.valueOf("0A:00:27:00:00:09");
+        Host host = Mockito.mock(Host.class);
+        doReturn(mac).when(host).mac();
+        doReturn(dpuMgmtUti.getPonSTag()).when(host).vlan();
+
+        doReturn(new HashSet<>(Arrays.asList(host))).when(component.hostService)
+                .getConnectedHosts(new ConnectPoint(removedSub.device.id(), removedSub.port.number()));
+
+        doReturn(usBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(removedSub.device.id(), dpuMgmtUti.getUpstreamBandwidthProfile());
+        doReturn(usOltBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(removedSub.device.id(), dpuMgmtUti.getUpstreamOltBandwidthProfile());
+
+        doReturn(dsBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(removedSub.device.id(), dpuMgmtUti.getDownstreamBandwidthProfile());
+        doReturn(dsOltBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(removedSub.device.id(), dpuMgmtUti.getDownstreamOltBandwidthProfile());
+
+        String[] directions = {FTTB_FLOW_UPSTREAM, FTTB_FLOW_DOWNSTREAM};
+        for (UniTagInformation uti : uniTagInformationList) {
+            for (String direction : directions) {
+                TrafficTreatment.Builder expectedTreatment = DefaultTrafficTreatment.builder();
+                TrafficSelector.Builder expectedSelectorBuilder = DefaultTrafficSelector.builder();
+
+                DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
+                annotationBuilder.set(FTTB_FLOW_DIRECTION, direction);
+                annotationBuilder.set(FTTB_SERVICE_NAME, uti.getServiceName());
+
+                switch (direction) {
+                    case FTTB_FLOW_UPSTREAM:
+                        expectedSelectorBuilder
+                                .matchInPort(removedSub.port.number())
+                                .matchVlanId(uti.getPonCTag())
+                                .matchVlanPcp((byte) uti.getUsPonCTagPriority())
+                                .matchEthSrc(mac);
+
+                        expectedTreatment.setVlanId(uti.getPonSTag())
+                                .setVlanPcp((byte) uti.getUsPonSTagPriority())
+                                .setOutput(nniPort.number())
+                                .meter(usBpMeterId)
+                                .writeMetadata(OltFlowServiceUtils.createMetadata(VlanId.NONE,
+                                        uti.getTechnologyProfileId(), nniPort.number()), 0L).build();
+
+                        annotationBuilder.set(UPSTREAM_ONU, usBpMeterId.toString());
+                        annotationBuilder.set(UPSTREAM_OLT, usOltBpMeterId.toString());
+                        break;
+
+                    case FTTB_FLOW_DOWNSTREAM:
+                        expectedSelectorBuilder
+                                .matchInPort(nniPort.number())
+                                .matchVlanId(uti.getPonSTag())
+                                .matchEthDst(mac);
+
+                        expectedTreatment.setVlanId(uti.getPonCTag())
+                                .setOutput(removedSub.port.number())
+                                .meter(dsBpMeterId)
+                                .writeMetadata(OltFlowServiceUtils.createMetadata(VlanId.NONE,
+                                        uti.getTechnologyProfileId(), removedSub.port.number()), 0L).build();
+
+                        annotationBuilder.set(DOWNSTREAM_ONU, dsBpMeterId.toString());
+                        annotationBuilder.set(DOWNSTREAM_OLT, dsOltBpMeterId.toString());
+                        break;
+
+                    default:
+                        return;
+                }
+
+                ForwardingObjective expected = DefaultForwardingObjective.builder()
+                        .withFlag(ForwardingObjective.Flag.VERSATILE)
+                        .withPriority(1000)
+                        .makePermanent()
+                        .withSelector(expectedSelectorBuilder.build())
+                        .withAnnotations(annotationBuilder.build())
+                        .fromApp(testAppId)
+                        .withTreatment(expectedTreatment.build())
+                        .remove();
+
+                component.handleSubscriberDataFlows(removedSub.device, removedSub.port,
+                        OltFlowService.FlowOperation.REMOVE, si, DEFAULT_MCAST_SERVICE_NAME_DEFAULT);
+                verify(component.flowObjectiveService, times(1))
+                        .forward(eq(removedSub.device.id()), eq(expected));
+            }
+        }
+    }
+
+    @Test
+    public void testHandleFttbSubscriberFlowsAdd() {
+        component.enableDhcpV4 = true;
+
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+        uniTagInformationList.add(fttbSubscriberUti);
+
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+
+        final DiscoveredSubscriber addedSub =
+                new DiscoveredSubscriber(testDevice,
+                        uniUpdateEnabled, DiscoveredSubscriber.Status.ADDED,
+                        false, si);
+
+        // return meter IDs
+        MeterId usBpMeterId = MeterId.meterId(1);
+        MeterId usOltBpMeterId = MeterId.meterId(1);
+
+        MeterId dsBpMeterId = MeterId.meterId(2);
+        MeterId dsOltBpMeterId = MeterId.meterId(2);
+
+        doReturn(usBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), dpuMgmtUti.getUpstreamBandwidthProfile());
+        doReturn(usOltBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), dpuMgmtUti.getUpstreamOltBandwidthProfile());
+
+        doReturn(dsBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), dpuMgmtUti.getDownstreamBandwidthProfile());
+        doReturn(dsOltBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(addedSub.device.id(), dpuMgmtUti.getDownstreamOltBandwidthProfile());
+
+        String[] directions = {FTTB_FLOW_UPSTREAM, FTTB_FLOW_DOWNSTREAM};
+        for (UniTagInformation uti : uniTagInformationList) {
+            for (String direction : directions) {
+                TrafficTreatment.Builder expectedTreatment = DefaultTrafficTreatment.builder();
+                TrafficSelector.Builder expectedSelectorBuilder = DefaultTrafficSelector.builder();
+
+                DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
+                annotationBuilder.set(FTTB_FLOW_DIRECTION, direction);
+                annotationBuilder.set(FTTB_SERVICE_NAME, uti.getServiceName());
+
+                switch (direction) {
+                    case FTTB_FLOW_UPSTREAM:
+                        expectedSelectorBuilder
+                                .matchInPort(addedSub.port.number())
+                                .matchVlanId(uti.getPonCTag());
+
+                        expectedTreatment.setVlanId(uti.getPonSTag())
+                                .setOutput(nniPort.number())
+                                .meter(usBpMeterId)
+                                .writeMetadata(OltFlowServiceUtils.createMetadata(VlanId.ANY,
+                                        uti.getTechnologyProfileId(), nniPort.number()), 0L).build();
+
+                        annotationBuilder.set(UPSTREAM_ONU, usBpMeterId.toString());
+                        annotationBuilder.set(UPSTREAM_OLT, usOltBpMeterId.toString());
+                        break;
+
+                    case FTTB_FLOW_DOWNSTREAM:
+                        expectedSelectorBuilder
+                                .matchInPort(nniPort.number())
+                                .matchMetadata(uti.getPonSTag().toShort())
+                                .matchVlanId(uti.getPonSTag());
+
+                        expectedTreatment.setVlanId(uti.getPonCTag())
+                                .setOutput(addedSub.port.number())
+                                .meter(dsBpMeterId)
+                                .writeMetadata(OltFlowServiceUtils.createMetadata(VlanId.ANY,
+                                        uti.getTechnologyProfileId(), addedSub.port.number()), 0L).build();
+
+                        annotationBuilder.set(DOWNSTREAM_ONU, dsBpMeterId.toString());
+                        annotationBuilder.set(DOWNSTREAM_OLT, dsOltBpMeterId.toString());
+                        break;
+
+                    default:
+                        return;
+                }
+
+                ForwardingObjective expected = DefaultForwardingObjective.builder()
+                        .withFlag(ForwardingObjective.Flag.VERSATILE)
+                        .withPriority(1000)
+                        .makePermanent()
+                        .withSelector(expectedSelectorBuilder.build())
+                        .withAnnotations(annotationBuilder.build())
+                        .fromApp(testAppId)
+                        .withTreatment(expectedTreatment.build())
+                        .add();
+
+                component.handleSubscriberDataFlows(addedSub.device, addedSub.port,
+                        OltFlowService.FlowOperation.ADD, si, DEFAULT_MCAST_SERVICE_NAME_DEFAULT);
+                verify(component.flowObjectiveService, times(1))
+                        .forward(eq(addedSub.device.id()), eq(expected));
+            }
+        }
+    }
+
+    @Test
+    public void testRemoveFttbSubscriberFlows() {
+        component.enableDhcpV4 = true;
+
+        OltPortStatus oltPortStatus1 = new OltPortStatus(null, null, ADDED,
+                null, null);
+        // Mocking the get call, to mark the SubscriberKey as already added.
+        component.cpStatus = Mockito.mock(Map.class);
+        when(component.cpStatus.get(Mockito.any())).thenReturn(oltPortStatus1);
+
+        List<UniTagInformation> uniTagInformationList = new LinkedList<>();
+
+        uniTagInformationList.add(fttbSubscriberUti);
+
+        SubscriberAndDeviceInformation si = new SubscriberAndDeviceInformation();
+        si.setUniTagList(uniTagInformationList);
+
+        final DiscoveredSubscriber removedSub =
+                new DiscoveredSubscriber(testDevice,
+                        uniUpdateEnabled, DiscoveredSubscriber.Status.REMOVED,
+                        false, si);
+
+        // return meter IDs
+        MeterId usBpMeterId = MeterId.meterId(1);
+        MeterId usOltBpMeterId = MeterId.meterId(1);
+
+        MeterId dsBpMeterId = MeterId.meterId(2);
+        MeterId dsOltBpMeterId = MeterId.meterId(2);
+
+        doReturn(usBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(removedSub.device.id(), dpuMgmtUti.getUpstreamBandwidthProfile());
+        doReturn(usOltBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(removedSub.device.id(), dpuMgmtUti.getUpstreamOltBandwidthProfile());
+
+        doReturn(dsBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(removedSub.device.id(), dpuMgmtUti.getDownstreamBandwidthProfile());
+        doReturn(dsOltBpMeterId).when(component.oltMeterService)
+                .getMeterIdForBandwidthProfile(removedSub.device.id(), dpuMgmtUti.getDownstreamOltBandwidthProfile());
+
+        String[] directions = {FTTB_FLOW_UPSTREAM, FTTB_FLOW_DOWNSTREAM};
+        for (UniTagInformation uti : uniTagInformationList) {
+            for (String direction : directions) {
+                TrafficTreatment.Builder expectedTreatment = DefaultTrafficTreatment.builder();
+                TrafficSelector.Builder expectedSelectorBuilder = DefaultTrafficSelector.builder();
+
+                DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
+                annotationBuilder.set(FTTB_FLOW_DIRECTION, direction);
+                annotationBuilder.set(FTTB_SERVICE_NAME, uti.getServiceName());
+
+                switch (direction) {
+                    case FTTB_FLOW_UPSTREAM:
+                        expectedSelectorBuilder
+                                .matchInPort(removedSub.port.number())
+                                .matchVlanId(uti.getPonCTag());
+
+                        expectedTreatment.setVlanId(uti.getPonSTag())
+                                .setOutput(nniPort.number())
+                                .meter(usBpMeterId)
+                                .writeMetadata(OltFlowServiceUtils.createMetadata(VlanId.ANY,
+                                        uti.getTechnologyProfileId(), nniPort.number()), 0L).build();
+
+                        annotationBuilder.set(UPSTREAM_ONU, usBpMeterId.toString());
+                        annotationBuilder.set(UPSTREAM_OLT, usOltBpMeterId.toString());
+                        break;
+
+                    case FTTB_FLOW_DOWNSTREAM:
+                        expectedSelectorBuilder
+                                .matchInPort(nniPort.number())
+                                .matchMetadata(uti.getPonSTag().toShort())
+                                .matchVlanId(uti.getPonSTag());
+
+                        expectedTreatment.setVlanId(uti.getPonCTag())
+                                .setOutput(removedSub.port.number())
+                                .meter(dsBpMeterId)
+                                .writeMetadata(OltFlowServiceUtils.createMetadata(VlanId.ANY,
+                                        uti.getTechnologyProfileId(), removedSub.port.number()), 0L).build();
+
+                        annotationBuilder.set(DOWNSTREAM_ONU, dsBpMeterId.toString());
+                        annotationBuilder.set(DOWNSTREAM_OLT, dsOltBpMeterId.toString());
+                        break;
+
+                    default:
+                        return;
+                }
+
+                ForwardingObjective expected = DefaultForwardingObjective.builder()
+                        .withFlag(ForwardingObjective.Flag.VERSATILE)
+                        .withPriority(1000)
+                        .makePermanent()
+                        .withSelector(expectedSelectorBuilder.build())
+                        .withAnnotations(annotationBuilder.build())
+                        .fromApp(testAppId)
+                        .withTreatment(expectedTreatment.build())
+                        .remove();
+
+                component.handleSubscriberDataFlows(removedSub.device, removedSub.port,
+                        OltFlowService.FlowOperation.REMOVE, si, DEFAULT_MCAST_SERVICE_NAME_DEFAULT);
+                verify(component.flowObjectiveService, times(1))
+                        .forward(eq(removedSub.device.id()), eq(expected));
+            }
+        }
+    }
 }
\ No newline at end of file
diff --git a/impl/src/test/java/org/opencord/olt/impl/package-info.java b/impl/src/test/java/org/opencord/olt/impl/package-info.java
new file mode 100644
index 0000000..56e096f
--- /dev/null
+++ b/impl/src/test/java/org/opencord/olt/impl/package-info.java
@@ -0,0 +1,16 @@
+/**
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.olt.impl;
\ No newline at end of file