[VOL-4577] : Update ONOS olt-app for adding FTTB DPU Management, ANCP traffic & trap rules

Change-Id: Ibb8aad6e68e8bd3b5f5824f0b04f4c5bc2f84a9e
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 4d452e6..701450d 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -49,10 +49,6 @@
 import org.onosproject.net.flow.TrafficTreatment;
 import org.onosproject.net.flow.criteria.Criteria;
 import org.onosproject.net.flow.criteria.Criterion;
-import org.onosproject.net.flow.criteria.EthTypeCriterion;
-import org.onosproject.net.flow.criteria.IPProtocolCriterion;
-import org.onosproject.net.flow.criteria.PortCriterion;
-import org.onosproject.net.flow.criteria.UdpPortCriterion;
 import org.onosproject.net.flow.criteria.VlanIdCriterion;
 import org.onosproject.net.flow.instructions.L2ModificationInstruction;
 import org.onosproject.net.flowobjective.DefaultFilteringObjective;
@@ -68,6 +64,7 @@
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
+import org.opencord.olt.impl.fttb.FttbUtils;
 import org.opencord.sadis.BandwidthProfileInformation;
 import org.opencord.sadis.BaseInformationService;
 import org.opencord.sadis.SadisService;
@@ -100,7 +97,6 @@
 
 import static com.google.common.base.Strings.isNullOrEmpty;
 import static org.onlab.util.Tools.get;
-import static org.onosproject.net.flow.instructions.Instruction.Type.L2MODIFICATION;
 import static org.opencord.olt.impl.OltUtils.completeFlowOpToString;
 import static org.opencord.olt.impl.OltUtils.flowOpToString;
 import static org.opencord.olt.impl.OltUtils.getPortName;
@@ -123,10 +119,17 @@
 import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_PPPOE_ON_NNI_DEFAULT;
 import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_PPPOE;
 import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_PPPOE_DEFAULT;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_FLOW_DIRECTION;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_FLOW_DOWNSTREAM;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_FLOW_UPSTREAM;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_SERVICE_DPU_ANCP_TRAFFIC;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_SERVICE_DPU_MGMT_TRAFFIC;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_SERVICE_NAME;
 import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_OLT;
 import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_ONU;
 import static org.opencord.olt.impl.OsgiPropertyConstants.WAIT_FOR_REMOVAL;
 import static org.opencord.olt.impl.OsgiPropertyConstants.WAIT_FOR_REMOVAL_DEFAULT;
+import static org.opencord.olt.impl.fttb.FttbUtils.FTTB_SERVICE_SUBSCRIBER_TRAFFIC;
 
 @Component(immediate = true, property = {
         ENABLE_DHCP_ON_NNI + ":Boolean=" + ENABLE_DHCP_ON_NNI_DEFAULT,
@@ -182,7 +185,7 @@
     protected ApplicationId appId;
     private static final Integer MAX_PRIORITY = 10000;
     private static final Integer MIN_PRIORITY = 1000;
-    protected static final short EAPOL_DEFAULT_VLAN = 4091;
+    public static final short EAPOL_DEFAULT_VLAN = 4091;
     private static final int NONE_TP_ID = -1;
     private static final String V4 = "V4";
     private static final String V6 = "V6";
@@ -382,7 +385,6 @@
                         "defaultTechProfileId:{}," + "waitForRemoval:{}",
                 enableDhcpOnNni, enableDhcpV4, enableDhcpV6, enableIgmpOnNni, enableEapol,
                 enablePppoeOnNni, enablePppoe, defaultTechProfileId, waitForRemoval);
-
     }
 
     @Override
@@ -477,7 +479,6 @@
     @Override
     public boolean handleBasicPortFlows(DiscoveredSubscriber sub, String bandwidthProfileId,
                                         String oltBandwidthProfileId) {
-
         // we only need to something if EAPOL is enabled
         if (!enableEapol) {
             log.debug("Eapol is disabled for {}", portWithName(sub.port));
@@ -492,7 +493,6 @@
             log.error("Unknown Status {} on DiscoveredSubscriber {}", sub.status, sub);
             return false;
         }
-
     }
 
     private boolean addDefaultFlows(DiscoveredSubscriber sub, String bandwidthProfileId, String oltBandwidthProfileId) {
@@ -554,7 +554,6 @@
                     log.warn("continuing provisioning on {}", portWithName(sub.port));
                 }
             }
-
         }
 
         // NOTE createMeters will return if the meters are not installed
@@ -859,7 +858,7 @@
 
         TrafficTreatment treatment = treatmentBuilder
                 .meter(meterId)
-                .writeMetadata(createTechProfValueForWriteMetadata(
+                .writeMetadata(OltFlowServiceUtils.createTechProfValueForWriteMetadata(
                         vlanId,
                         techProfileId, oltMeterId), 0)
                 .setOutput(PortNumber.CONTROLLER)
@@ -972,21 +971,6 @@
         return defaultTechProfileId;
     }
 
-    protected Long createTechProfValueForWriteMetadata(VlanId cVlan, int techProfileId, MeterId upstreamOltMeterId) {
-        Long writeMetadata;
-
-        if (cVlan == null || VlanId.NONE.equals(cVlan)) {
-            writeMetadata = (long) techProfileId << 32;
-        } else {
-            writeMetadata = ((long) (cVlan.id()) << 48 | (long) techProfileId << 32);
-        }
-        if (upstreamOltMeterId == null) {
-            return writeMetadata;
-        } else {
-            return writeMetadata | upstreamOltMeterId.id();
-        }
-    }
-
     private void processLldpFilteringObjective(DeviceId deviceId, Port port, FlowOperation action) {
         DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
 
@@ -1128,16 +1112,28 @@
                     .getMeterIdForBandwidthProfile(device.id(), uti.getUpstreamBandwidthProfile());
             MeterId oltUsMeterId = oltMeterService
                     .getMeterIdForBandwidthProfile(device.id(), uti.getUpstreamOltBandwidthProfile());
-            processUpstreamDataFilteringObjects(device.id(), port, nniPort.get(), action, usMeterId,
-                    oltUsMeterId, uti);
+
+            if (FttbUtils.isFttbService(uti)) {
+                processFttbUpstreamDataFilteringObjects(device.id(), port, nniPort.get(), action,
+                        usMeterId, oltUsMeterId, uti, si);
+            } else {
+                processUpstreamDataFilteringObjects(device.id(), port, nniPort.get(), action, usMeterId,
+                        oltUsMeterId, uti);
+            }
 
             // downstream flows
             MeterId dsMeterId = oltMeterService
                     .getMeterIdForBandwidthProfile(device.id(), uti.getDownstreamBandwidthProfile());
             MeterId oltDsMeterId = oltMeterService
                     .getMeterIdForBandwidthProfile(device.id(), uti.getDownstreamOltBandwidthProfile());
-            processDownstreamDataFilteringObjects(device.id(), port, nniPort.get(), action, dsMeterId,
-                    oltDsMeterId, uti, getMacAddress(device.id(), port, uti));
+
+            if (FttbUtils.isFttbService(uti)) {
+                processFttbDownstreamDataFilteringObjects(device.id(), port, nniPort.get(),
+                        action, dsMeterId, oltDsMeterId, uti, si);
+            } else {
+                processDownstreamDataFilteringObjects(device.id(), port, nniPort.get(), action, dsMeterId,
+                        oltDsMeterId, uti, getMacAddress(device.id(), port, uti));
+            }
         });
     }
 
@@ -1148,6 +1144,7 @@
         ServiceKey sk = new ServiceKey(new AccessDevicePort(port), uti);
         log.debug("{} DHCP filtering objectives on {}", flowOpToString(action), sk);
 
+        String serviceName = uti.getServiceName();
 
         OltFlowsStatus status = action.equals(FlowOperation.ADD) ?
                 OltFlowsStatus.PENDING_ADD : OltFlowsStatus.PENDING_REMOVE;
@@ -1160,12 +1157,6 @@
             treatmentBuilder.meter(meterId);
         }
 
-        if (uti.getTechnologyProfileId() != NONE_TP_ID) {
-            treatmentBuilder.writeMetadata(
-                    createTechProfValueForWriteMetadata(uti.getUniTagMatch(),
-                            uti.getTechnologyProfileId(), oltMeterId), 0);
-        }
-
         FilteringObjective.Builder dhcpBuilder = (action == FlowOperation.ADD ? builder.permit() : builder.deny())
                 .withKey(Criteria.matchInPort(port.number()))
                 .addCondition(Criteria.matchEthType(ethType))
@@ -1175,17 +1166,40 @@
                 .fromApp(appId)
                 .withPriority(MAX_PRIORITY);
 
+        VlanId cVlan = uti.getUniTagMatch();
+
         //VLAN changes and PCP matching need to happen only in the upstream directions
         if (direction == FlowDirection.UPSTREAM) {
-            treatmentBuilder.setVlanId(uti.getPonCTag());
-            if (!VlanId.vlanId(VlanId.NO_VID).equals(uti.getUniTagMatch())) {
-                dhcpBuilder.addCondition(Criteria.matchVlanId(uti.getUniTagMatch()));
+            if (serviceName != null && serviceName.equals(FTTB_SERVICE_DPU_MGMT_TRAFFIC)) {
+                cVlan = VlanId.NONE;
+                FttbUtils.addUpstreamDhcpCondition(dhcpBuilder, uti);
+                FttbUtils.addUpstreamDhcpTreatment(treatmentBuilder, uti);
+            } else {
+                treatmentBuilder.setVlanId(uti.getPonCTag());
+                if (!VlanId.vlanId(VlanId.NO_VID).equals(uti.getUniTagMatch())) {
+                    dhcpBuilder.addCondition(Criteria.matchVlanId(uti.getUniTagMatch()));
+                }
+                if (uti.getUsPonCTagPriority() != -1) {
+                    treatmentBuilder.setVlanPcp((byte) uti.getUsPonCTagPriority());
+                }
             }
-            if (uti.getUsPonCTagPriority() != -1) {
-                treatmentBuilder.setVlanPcp((byte) uti.getUsPonCTagPriority());
+        } else if (direction == FlowDirection.DOWNSTREAM) {
+            // Downstream: match the DHCP VLAN ID only when the OLT Sadis info provides one in nniDhcpTrapVid.
+            Device device = deviceService.getDevice(deviceId);
+            SubscriberAndDeviceInformation oltSub = subsService.get(device.serialNumber());
+            VlanId nniDhcpTrapVid = oltSub.nniDhcpTrapVid();
+
+            if (nniDhcpTrapVid != null && !VlanId.vlanId(VlanId.NO_VID).equals(nniDhcpTrapVid)) {
+                dhcpBuilder.addCondition(Criteria.matchVlanId(nniDhcpTrapVid));
             }
         }
 
+        if (uti.getTechnologyProfileId() != NONE_TP_ID) {
+            treatmentBuilder.writeMetadata(
+                    OltFlowServiceUtils.createTechProfValueForWriteMetadata(cVlan,
+                            uti.getTechnologyProfileId(), oltMeterId), 0);
+        }
+
         dhcpBuilder.withMeta(treatmentBuilder
                 .setOutput(PortNumber.CONTROLLER).build());
 
@@ -1221,7 +1235,7 @@
         if (direction == FlowDirection.UPSTREAM) {
 
             if (techProfileId != NONE_TP_ID) {
-                treatmentBuilder.writeMetadata(createTechProfValueForWriteMetadata(null,
+                treatmentBuilder.writeMetadata(OltFlowServiceUtils.createTechProfValueForWriteMetadata(null,
                         techProfileId, oltMeterId), 0);
             }
 
@@ -1282,7 +1296,8 @@
         }
 
         if (techProfileId != NONE_TP_ID) {
-            treatmentBuilder.writeMetadata(createTechProfValueForWriteMetadata(cTag, techProfileId, oltMeterId), 0);
+            treatmentBuilder.writeMetadata(OltFlowServiceUtils.createTechProfValueForWriteMetadata(cTag, techProfileId,
+                    oltMeterId), 0);
         }
 
         DefaultFilteringObjective.Builder pppoedBuilder = ((action == FlowOperation.ADD)
@@ -1353,7 +1368,7 @@
         }
 
         treatmentBuilder.setOutput(nniPort.number())
-                .writeMetadata(createMetadata(uti.getPonCTag(),
+                .writeMetadata(OltFlowServiceUtils.createMetadata(uti.getPonCTag(),
                         uti.getTechnologyProfileId(), nniPort.number()), 0L);
 
         DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
@@ -1371,33 +1386,8 @@
                 treatmentBuilder.build(), MIN_PRIORITY,
                 annotationBuilder.build());
 
-        ObjectiveContext context = new ObjectiveContext() {
-            @Override
-            public void onSuccess(Objective objective) {
-                log.info("{} Upstream Data plane filter for {}.",
-                        completeFlowOpToString(action), sk);
-            }
-
-            @Override
-            public void onError(Objective objective, ObjectiveError error) {
-                log.error("Upstream Data plane filter for {} failed {} because {}.",
-                        sk, action, error);
-                updateConnectPointStatus(sk, null, null, OltFlowsStatus.ERROR, null, null);
-            }
-        };
-
-        ForwardingObjective flow = null;
-        if (action == FlowOperation.ADD) {
-            flow = flowBuilder.add(context);
-        } else if (action == FlowOperation.REMOVE) {
-            flow = flowBuilder.remove(context);
-        } else {
-            log.error("Flow action not supported: {}", action);
-        }
-
-        if (flow != null) {
-            flowObjectiveService.forward(deviceId, flow);
-        }
+        ObjectiveContext context = getSubscriberFlowBuilderContext(sk, action, FlowDirection.UPSTREAM);
+        processForwardingRule(action, flowBuilder, context, deviceId);
     }
 
     private void processDownstreamDataFilteringObjects(DeviceId deviceId, Port port, Port nniPort,
@@ -1429,7 +1419,7 @@
                 .popVlan()
                 .setOutput(port.number());
 
-        treatmentBuilder.writeMetadata(createMetadata(uti.getPonCTag(),
+        treatmentBuilder.writeMetadata(OltFlowServiceUtils.createMetadata(uti.getPonCTag(),
                 uti.getTechnologyProfileId(),
                 port.number()), 0);
 
@@ -1462,36 +1452,8 @@
         DefaultForwardingObjective.Builder flowBuilder = createForwardingObjectiveBuilder(selectorBuilder.build(),
                 treatmentBuilder.build(), MIN_PRIORITY, annotationBuilder.build());
 
-        ObjectiveContext context = new ObjectiveContext() {
-            @Override
-            public void onSuccess(Objective objective) {
-                log.info("{} Downstream Data plane filter for {}.",
-                        completeFlowOpToString(action), sk);
-            }
-
-            @Override
-            public void onError(Objective objective, ObjectiveError error) {
-                log.info("Downstream Data plane filter for {} failed {} because {}.",
-                         sk, action, error);
-                updateConnectPointStatus(sk, null, null, OltFlowsStatus.ERROR, null, null);
-            }
-        };
-
-        ForwardingObjective flow = null;
-        if (action == FlowOperation.ADD) {
-            flow = flowBuilder.add(context);
-        } else if (action == FlowOperation.REMOVE) {
-            flow = flowBuilder.remove(context);
-        } else {
-            log.error("Flow action not supported: {}", action);
-        }
-
-        if (flow != null) {
-            if (log.isTraceEnabled()) {
-                log.trace("Forwarding rule {}", flow);
-            }
-            flowObjectiveService.forward(deviceId, flow);
-        }
+        ObjectiveContext context = getSubscriberFlowBuilderContext(sk, action, FlowDirection.DOWNSTREAM);
+        processForwardingRule(action, flowBuilder, context, deviceId);
     }
 
     private DefaultForwardingObjective.Builder createForwardingObjectiveBuilder(TrafficSelector selector,
@@ -1508,14 +1470,6 @@
                 .withTreatment(treatment);
     }
 
-    private Long createMetadata(VlanId innerVlan, int techProfileId, PortNumber egressPort) {
-        if (techProfileId == NONE_TP_ID) {
-            techProfileId = DEFAULT_TP_ID_DEFAULT;
-        }
-
-        return ((long) (innerVlan.id()) << 48 | (long) techProfileId << 32) | egressPort.toLong();
-    }
-
     private boolean isMacLearningEnabled(SubscriberAndDeviceInformation si) {
         AtomicBoolean requiresMacLearning = new AtomicBoolean();
         requiresMacLearning.set(false);
@@ -1543,10 +1497,20 @@
 
         si.uniTagList().forEach(uniTagInfo -> {
             boolean isMacLearningEnabled = uniTagInfo.getEnableMacLearning();
-            boolean configureMac = isMacAddressValid(uniTagInfo);
+            boolean configureMac = OltFlowServiceUtils.isMacAddressValid(uniTagInfo);
             boolean discoveredMac = false;
+
+            final VlanId vlan;
+
+            if (FttbUtils.isFttbDpuOrAncpService(uniTagInfo)) {
+                // Use the S-tag, because the ONU replaces the C-tag with the S-tag.
+                vlan = uniTagInfo.getPonSTag();
+            } else {
+                vlan = uniTagInfo.getPonCTag();
+            }
+
             Optional<Host> optHost = hostService.getConnectedHosts(new ConnectPoint(deviceId, port.number()))
-                    .stream().filter(host -> host.vlan().equals(uniTagInfo.getPonCTag())).findFirst();
+                    .stream().filter(host -> host.vlan().equals(vlan)).findFirst();
             if (optHost.isPresent() && optHost.get().mac() != null) {
                 discoveredMac = true;
             }
@@ -1561,7 +1525,7 @@
     }
 
     protected MacAddress getMacAddress(DeviceId deviceId, Port port, UniTagInformation uniTagInfo) {
-        boolean configuredMac = isMacAddressValid(uniTagInfo);
+        boolean configuredMac = OltFlowServiceUtils.isMacAddressValid(uniTagInfo);
         if (configuredMac) {
             return MacAddress.valueOf(uniTagInfo.getConfiguredMacAddress());
         } else if (uniTagInfo.getEnableMacLearning()) {
@@ -1574,12 +1538,6 @@
         return null;
     }
 
-    private boolean isMacAddressValid(UniTagInformation tagInformation) {
-        return tagInformation.getConfiguredMacAddress() != null &&
-                !tagInformation.getConfiguredMacAddress().trim().equals("") &&
-                !MacAddress.NONE.equals(MacAddress.valueOf(tagInformation.getConfiguredMacAddress()));
-    }
-
     protected void updateConnectPointStatus(ServiceKey key, OltFlowsStatus eapolStatus,
                                             OltFlowsStatus subscriberEapolStatus,
                                             OltFlowsStatus subscriberFlowsStatus, OltFlowsStatus dhcpStatus,
@@ -1661,7 +1619,7 @@
                     if (port == null) {
                         log.warn("Port is gone in ONOS, " +
                                          "manually creating it {}", event.subject());
-                        PortNumber inPort = getPortNumberFromFlowRule(event.subject());
+                        PortNumber inPort = OltFlowServiceUtils.getPortNumberFromFlowRule(event.subject());
                         cpStatusReadLock.lock();
                         Optional<ServiceKey> keyWithPort = cpStatus.keySet()
                                 .stream().filter(key -> key.getPort().connectPoint()
@@ -1697,15 +1655,15 @@
         }
 
         protected void updateCpStatus(FlowRuleEvent.Type type, Port port, FlowRule flowRule) {
-            OltFlowsStatus status = flowRuleStatusToOltFlowStatus(type);
-            if (isDefaultEapolFlow(flowRule)) {
+            OltFlowsStatus status = OltFlowServiceUtils.flowRuleStatusToOltFlowStatus(type);
+            if (OltFlowServiceUtils.isDefaultEapolFlow(flowRule)) {
                 ServiceKey sk = new ServiceKey(new AccessDevicePort(port),
                         defaultEapolUniTag);
                 if (log.isTraceEnabled()) {
                     log.trace("update defaultEapolStatus {} on {}", status, sk);
                 }
                 updateConnectPointStatus(sk, status, null, null, null, null);
-            } else if (isSubscriberEapolFlow(flowRule)) {
+            } else if (OltFlowServiceUtils.isSubscriberEapolFlow(flowRule)) {
                 ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule, port);
                 if (sk == null) {
                     return;
@@ -1714,7 +1672,7 @@
                     log.trace("update subscriberEapolStatus {} on {}", status, sk);
                 }
                 updateConnectPointStatus(sk, null, status, null, status, null);
-            } else if (isDhcpFlow(flowRule)) {
+            } else if (OltFlowServiceUtils.isDhcpFlow(flowRule)) {
                 ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule, port);
                 if (sk == null) {
                     return;
@@ -1723,7 +1681,7 @@
                     log.trace("update dhcpStatus {} on {}", status, sk);
                 }
                 updateConnectPointStatus(sk, null, null, null, status, null);
-            } else if (isPppoeFlow(flowRule)) {
+            } else if (OltFlowServiceUtils.isPppoeFlow(flowRule)) {
                 ServiceKey sk = getSubscriberKeyFromFlowRule(flowRule, port);
                 if (sk == null) {
                     return;
@@ -1732,8 +1690,8 @@
                     log.trace("update pppoeStatus {} on {}", status, sk);
                 }
                 updateConnectPointStatus(sk, null, null, null, null, status);
-            } else if (isDataFlow(flowRule)) {
-                PortNumber number = getPortNumberFromFlowRule(flowRule);
+            } else if (OltFlowServiceUtils.isDataFlow(flowRule)) {
+                PortNumber number = OltFlowServiceUtils.getPortNumberFromFlowRule(flowRule);
                 if (number == null) {
                     log.error("Can't capture the port number from flow {}", flowRule);
                     return;
@@ -1754,118 +1712,17 @@
             }
         }
 
-        private boolean isDefaultEapolFlow(FlowRule flowRule) {
-            EthTypeCriterion c = (EthTypeCriterion) flowRule.selector().getCriterion(Criterion.Type.ETH_TYPE);
-            if (c == null) {
-                return false;
-            }
-            if (c.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
-                AtomicBoolean isDefault = new AtomicBoolean(false);
-                flowRule.treatment().allInstructions().forEach(instruction -> {
-                    if (instruction.type() == L2MODIFICATION) {
-                        L2ModificationInstruction modificationInstruction = (L2ModificationInstruction) instruction;
-                        if (modificationInstruction.subtype() == L2ModificationInstruction.L2SubType.VLAN_ID) {
-                            L2ModificationInstruction.ModVlanIdInstruction vlanInstruction =
-                                    (L2ModificationInstruction.ModVlanIdInstruction) modificationInstruction;
-                            if (vlanInstruction.vlanId().id().equals(EAPOL_DEFAULT_VLAN)) {
-                                isDefault.set(true);
-                                return;
-                            }
-                        }
-                    }
-                });
-                return isDefault.get();
-            }
-            return false;
-        }
 
-        /**
-         * Returns true if the flow is a DHCP flow.
-         * Matches both upstream and downstream flows.
-         *
-         * @param flowRule The FlowRule to evaluate
-         * @return boolean
-         */
-        private boolean isDhcpFlow(FlowRule flowRule) {
-            IPProtocolCriterion ipCriterion = (IPProtocolCriterion) flowRule.selector()
-                    .getCriterion(Criterion.Type.IP_PROTO);
-            if (ipCriterion == null) {
-                return false;
-            }
-
-            UdpPortCriterion src = (UdpPortCriterion) flowRule.selector().getCriterion(Criterion.Type.UDP_SRC);
-
-            if (src == null) {
-                return false;
-            }
-            return ipCriterion.protocol() == IPv4.PROTOCOL_UDP &&
-                    (src.udpPort().toInt() == 68 || src.udpPort().toInt() == 67);
-        }
-
-        private boolean isPppoeFlow(FlowRule flowRule) {
-            EthTypeCriterion ethTypeCriterion = (EthTypeCriterion) flowRule.selector()
-                    .getCriterion(Criterion.Type.ETH_TYPE);
-
-            if (ethTypeCriterion == null) {
-                return false;
-            }
-            return EthType.EtherType.PPPoED.ethType().equals(ethTypeCriterion.ethType());
-        }
-
-        private boolean isDataFlow(FlowRule flowRule) {
-            // we consider subscriber flows the one that matches on VLAN_VID
-            // method is valid only because it's the last check after EAPOL and DHCP.
-            // this matches mcast flows as well, if we want to avoid that we can
-            // filter out the elements that have groups in the treatment or
-            // mcastIp in the selector
-            // IPV4_DST:224.0.0.22/32
-            // treatment=[immediate=[GROUP:0x1]]
-
-            return flowRule.selector().getCriterion(Criterion.Type.VLAN_VID) != null;
-        }
-
-        private boolean isSubscriberEapolFlow(FlowRule flowRule) {
-            EthTypeCriterion c = (EthTypeCriterion) flowRule.selector().getCriterion(Criterion.Type.ETH_TYPE);
-            if (c == null) {
-                return false;
-            }
-            if (c.ethType().equals(EthType.EtherType.EAPOL.ethType())) {
-                AtomicBoolean isSubscriber = new AtomicBoolean(false);
-                flowRule.treatment().allInstructions().forEach(instruction -> {
-                    if (instruction.type() == L2MODIFICATION) {
-                        L2ModificationInstruction modificationInstruction = (L2ModificationInstruction) instruction;
-                        if (modificationInstruction.subtype() == L2ModificationInstruction.L2SubType.VLAN_ID) {
-                            L2ModificationInstruction.ModVlanIdInstruction vlanInstruction =
-                                    (L2ModificationInstruction.ModVlanIdInstruction) modificationInstruction;
-                            if (!vlanInstruction.vlanId().id().equals(EAPOL_DEFAULT_VLAN)) {
-                                isSubscriber.set(true);
-                                return;
-                            }
-                        }
-                    }
-                });
-                return isSubscriber.get();
-            }
-            return false;
-        }
 
         private Port getCpFromFlowRule(FlowRule flowRule) {
             DeviceId deviceId = flowRule.deviceId();
-            PortNumber inPort = getPortNumberFromFlowRule(flowRule);
+            PortNumber inPort = OltFlowServiceUtils.getPortNumberFromFlowRule(flowRule);
             if (inPort != null) {
                 return deviceService.getPort(deviceId, inPort);
             }
             return null;
         }
 
-        private PortNumber getPortNumberFromFlowRule(FlowRule flowRule) {
-            PortCriterion inPort = (PortCriterion) flowRule.selector().getCriterion(Criterion.Type.IN_PORT);
-            if (inPort != null) {
-                return inPort.port();
-            }
-            return null;
-        }
-
         private ServiceKey getSubscriberKeyFromFlowRule(FlowRule flowRule, Port flowPort) {
             SubscriberAndDeviceInformation si = subsService.get(getPortName(flowPort));
 
@@ -1881,12 +1738,12 @@
 
             Optional<UniTagInformation> found = Optional.empty();
             VlanId flowVlan = null;
-            if (isDhcpFlow(flowRule)) {
+            if (OltFlowServiceUtils.isDhcpFlow(flowRule)) {
                 // we need to make a special case for DHCP as in the ATT workflow DHCP flows don't match on tags
                 L2ModificationInstruction.ModVlanIdInstruction instruction =
                         (L2ModificationInstruction.ModVlanIdInstruction) flowRule.treatment().immediate().get(1);
                 flowVlan = instruction.vlanId();
-            } else if (isSubscriberEapolFlow(flowRule)) {
+            } else if (OltFlowServiceUtils.isSubscriberEapolFlow(flowRule)) {
                 // we need to make a special case for EAPOL as in the ATT workflow EAPOL flows don't match on tags
                 L2ModificationInstruction.ModVlanIdInstruction instruction =
                         (L2ModificationInstruction.ModVlanIdInstruction) flowRule.treatment().immediate().get(2);
@@ -1917,21 +1774,6 @@
             return found.isPresent() ? new ServiceKey(new AccessDevicePort(flowPort), found.get()) : null;
 
         }
-
-        private OltFlowsStatus flowRuleStatusToOltFlowStatus(FlowRuleEvent.Type type) {
-            switch (type) {
-                case RULE_ADD_REQUESTED:
-                    return OltFlowsStatus.PENDING_ADD;
-                case RULE_ADDED:
-                    return OltFlowsStatus.ADDED;
-                case RULE_REMOVE_REQUESTED:
-                    return OltFlowsStatus.PENDING_REMOVE;
-                case RULE_REMOVED:
-                    return OltFlowsStatus.REMOVED;
-                default:
-                    return OltFlowsStatus.NONE;
-            }
-        }
     }
 
     protected void bindSadisService(SadisService service) {
@@ -1945,4 +1787,170 @@
         this.bpService = null;
         log.info("Sadis service is unloaded");
     }
-}
+
+    /**
+     * Installs or removes the upstream data-plane forwarding objective of an FTTB service.
+     * Traffic matching {@code port} and the PON C-tag gets the PON S-tag as VLAN id and is output to
+     * {@code nniPort}. DPU management and ANCP services additionally match the source MAC address
+     * returned by {@link FttbUtils}; when it is {@code null} an error is logged and nothing is submitted.
+     *
+     * @param deviceId           device the objective is submitted to
+     * @param port               port the traffic is received on
+     * @param nniPort            port the traffic is output to
+     * @param action             flow operation, handed to {@code processForwardingRule}
+     * @param upstreamMeterId    meter annotated as {@code UPSTREAM_ONU}, skipped when {@code null}
+     * @param upstreamOltMeterId meter annotated as {@code UPSTREAM_OLT}, skipped when {@code null}
+     * @param uti                service definition supplying the tags, priorities and technology profile
+     * @param si                 subscriber information passed to the MAC address lookup
+     */
+    private void processFttbUpstreamDataFilteringObjects(DeviceId deviceId, Port port, Port nniPort,
+                                                         FlowOperation action, MeterId upstreamMeterId,
+                                                         MeterId upstreamOltMeterId, UniTagInformation uti,
+                                                         SubscriberAndDeviceInformation si) {
+        final String serviceName = uti.getServiceName();
+        final ServiceKey serviceKey = new ServiceKey(new AccessDevicePort(port), uti);
+        // A VLAN priority is matched (C-tag) or rewritten (S-tag) only when its value in uti is not -1.
+        TrafficSelector.Builder selector = DefaultTrafficSelector.builder()
+                .matchInPort(port.number()).matchVlanId(uti.getPonCTag());
+        if (uti.getUsPonCTagPriority() != -1) {
+            selector.matchVlanPcp((byte) uti.getUsPonCTagPriority());
+        }
+        TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder().setVlanId(uti.getPonSTag());
+        if (uti.getUsPonSTagPriority() != -1) {
+            treatment.setVlanPcp((byte) uti.getUsPonSTagPriority());
+        }
+        DefaultAnnotations.Builder annotations = DefaultAnnotations.builder()
+                .set(FTTB_SERVICE_NAME, serviceName).set(FTTB_FLOW_DIRECTION, FTTB_FLOW_UPSTREAM);
+        if (upstreamMeterId != null) {
+            treatment.meter(upstreamMeterId);
+            annotations.set(UPSTREAM_ONU, upstreamMeterId.toString());
+        }
+        if (upstreamOltMeterId != null) {
+            treatment.meter(upstreamOltMeterId);
+            annotations.set(UPSTREAM_OLT, upstreamOltMeterId.toString());
+        }
+
+        // The inner VLAN handed to createMetadata depends on the service name; it stays null for other names.
+        final boolean dpuService = serviceName.equals(FTTB_SERVICE_DPU_MGMT_TRAFFIC)
+                || serviceName.equals(FTTB_SERVICE_DPU_ANCP_TRAFFIC);
+        VlanId innerVlan = null;
+        if (dpuService) {
+            MacAddress dpuMac = FttbUtils.getMacAddressFromDhcpEnabledUti(hostService, si, deviceId, port);
+            if (dpuMac == null) {
+                log.error("Mac address not found port:{}, vlan:{}, service:{}", port, uti.getPonSTag(), serviceName);
+                return;
+            }
+            selector.matchEthSrc(dpuMac);
+            innerVlan = VlanId.NONE;
+        } else if (serviceName.equals(FTTB_SERVICE_SUBSCRIBER_TRAFFIC)) {
+            innerVlan = VlanId.ANY;
+        }
+
+        treatment.setOutput(nniPort.number()).writeMetadata(
+                OltFlowServiceUtils.createMetadata(innerVlan, uti.getTechnologyProfileId(), nniPort.number()), 0L);
+        DefaultForwardingObjective.Builder objective = createForwardingObjectiveBuilder(
+                selector.build(), treatment.build(), MIN_PRIORITY, annotations.build());
+        processForwardingRule(action, objective,
+                getSubscriberFlowBuilderContext(serviceKey, action, FlowDirection.UPSTREAM), deviceId);
+    }
+
+    /**
+     * Installs or removes the downstream data-plane forwarding objective of an FTTB service.
+     * Traffic matching {@code nniPort} and the PON S-tag gets the PON C-tag as VLAN id and is output to
+     * {@code port}. DPU management and ANCP services additionally match the destination MAC address
+     * returned by {@link FttbUtils}; when it is {@code null} an error is logged and nothing is submitted.
+     *
+     * @param deviceId             device the objective is submitted to
+     * @param port                 port the traffic is output to
+     * @param nniPort              port the traffic is received on
+     * @param action               flow operation, handed to {@code processForwardingRule}
+     * @param downstreamMeterId    meter annotated as {@code DOWNSTREAM_ONU}, skipped when {@code null}
+     * @param downstreamOltMeterId meter annotated as {@code DOWNSTREAM_OLT}, skipped when {@code null}
+     * @param uti                  service definition supplying the tags and technology profile
+     * @param si                   subscriber information passed to the MAC address lookup
+     */
+    private void processFttbDownstreamDataFilteringObjects(DeviceId deviceId, Port port, Port nniPort,
+                                                           FlowOperation action, MeterId downstreamMeterId,
+                                                           MeterId downstreamOltMeterId, UniTagInformation uti,
+                                                           SubscriberAndDeviceInformation si) {
+        final String serviceName = uti.getServiceName();
+        final ServiceKey serviceKey = new ServiceKey(new AccessDevicePort(port), uti);
+        TrafficSelector.Builder selector = DefaultTrafficSelector.builder()
+                .matchVlanId(uti.getPonSTag()).matchInPort(nniPort.number());
+        TrafficTreatment.Builder treatment = DefaultTrafficTreatment.builder()
+                .setVlanId(uti.getPonCTag()).setOutput(port.number());
+        DefaultAnnotations.Builder annotations = DefaultAnnotations.builder()
+                .set(FTTB_SERVICE_NAME, serviceName).set(FTTB_FLOW_DIRECTION, FTTB_FLOW_DOWNSTREAM);
+        if (downstreamMeterId != null) {
+            treatment.meter(downstreamMeterId);
+            annotations.set(DOWNSTREAM_ONU, downstreamMeterId.toString());
+        }
+        if (downstreamOltMeterId != null) {
+            treatment.meter(downstreamOltMeterId);
+            annotations.set(DOWNSTREAM_OLT, downstreamOltMeterId.toString());
+        }
+
+        // The inner VLAN handed to createMetadata depends on the service name; it stays null for other names.
+        final boolean dpuService = serviceName.equals(FTTB_SERVICE_DPU_MGMT_TRAFFIC)
+                || serviceName.equals(FTTB_SERVICE_DPU_ANCP_TRAFFIC);
+        VlanId innerVlan = null;
+        if (dpuService) {
+            MacAddress dpuMac = FttbUtils.getMacAddressFromDhcpEnabledUti(hostService, si, deviceId, port);
+            if (dpuMac == null) {
+                log.error("Mac address not found port:{}, vlan:{}, service:{}", port, uti.getPonSTag(), serviceName);
+                return;
+            }
+            selector.matchEthDst(dpuMac);
+            innerVlan = VlanId.NONE;
+        } else if (serviceName.equals(FTTB_SERVICE_SUBSCRIBER_TRAFFIC)) {
+            innerVlan = VlanId.ANY;
+            selector.matchMetadata(uti.getPonSTag().toShort());
+        }
+
+        treatment.writeMetadata(
+                OltFlowServiceUtils.createMetadata(innerVlan, uti.getTechnologyProfileId(), port.number()), 0);
+        DefaultForwardingObjective.Builder objective = createForwardingObjectiveBuilder(
+                selector.build(), treatment.build(), MIN_PRIORITY, annotations.build());
+        processForwardingRule(action, objective,
+                getSubscriberFlowBuilderContext(serviceKey, action, FlowDirection.DOWNSTREAM), deviceId);
+    }
+
+    /**
+     * Creates the callback notified when a data-plane forwarding objective for {@code sk} completes.
+     * Success is logged with {@code action} and {@code flowDirection}; a failure is logged at error level,
+     * as it is not an informational event, and hands {@code OltFlowsStatus.ERROR} to updateConnectPointStatus.
+     */
+    private ObjectiveContext getSubscriberFlowBuilderContext(ServiceKey sk, FlowOperation action,
+                                                             FlowDirection flowDirection) {
+        return new ObjectiveContext() {
+            @Override
+            public void onSuccess(Objective objective) {
+                log.info("{} {} Data plane filter for {}.", completeFlowOpToString(action), flowDirection, sk);
+            }
+            @Override
+            public void onError(Objective objective, ObjectiveError error) {
+                log.error("{} Data plane filter for {} failed {} because {}.", flowDirection, sk, action, error);
+                updateConnectPointStatus(sk, null, null, OltFlowsStatus.ERROR, null, null);
+            }
+        };
+    }
+
+    /**
+     * Submits {@code flowBuilder} to {@code deviceId} as an ADD or REMOVE objective; other actions are only logged.
+     */
+    private void processForwardingRule(FlowOperation action, DefaultForwardingObjective.Builder flowBuilder,
+                                       ObjectiveContext context, DeviceId deviceId) {
+        if (action != FlowOperation.ADD && action != FlowOperation.REMOVE) {
+            log.error("Flow action not supported: {}", action);
+            return;
+        }
+        ForwardingObjective rule = action == FlowOperation.ADD
+                ? flowBuilder.add(context) : flowBuilder.remove(context);
+        if (rule != null) {
+            if (log.isTraceEnabled()) {
+                log.trace("Forwarding rule {}", rule);
+            }
+            flowObjectiveService.forward(deviceId, rule);
+        }
+    }
+}
\ No newline at end of file