[VOL-4180] Multi UNI feature implemented to OLT application.

Change-Id: I3d45719ebdce304ba94652ed9de553e40d76a77c

EAPOL flow bug-fixed

review fixes finshed

Multi UNI feature implemented to OLT application.

- It's possible to fetch a meter by annotations. (OltPipeline)
- New meters can be created for bandwidth profiles of OLT device.
- Olt meterId is transported via writeMetadata so that voltha/rw-core can parse it and assign the correct meters to ONU and OLT flows.

Change-Id: Ia6c9909b5f03b0f3fe329bd11580f891bfab3a32
diff --git a/impl/src/main/java/org/opencord/olt/driver/OltPipeline.java b/impl/src/main/java/org/opencord/olt/driver/OltPipeline.java
index 5232eb8..459526c 100644
--- a/impl/src/main/java/org/opencord/olt/driver/OltPipeline.java
+++ b/impl/src/main/java/org/opencord/olt/driver/OltPipeline.java
@@ -94,6 +94,10 @@
 import java.util.stream.Collectors;
 
 import static org.onosproject.core.CoreService.CORE_APP_NAME;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DOWNSTREAM_OLT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DOWNSTREAM_ONU;
+import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_OLT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.UPSTREAM_ONU;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -542,7 +546,7 @@
     private void installNoModificationRules(ForwardingObjective fwd) {
         Instructions.OutputInstruction output = (Instructions.OutputInstruction) fetchOutput(fwd, DOWNSTREAM);
         Instructions.MetadataInstruction writeMetadata = fetchWriteMetadata(fwd);
-        Instructions.MeterInstruction meter = (Instructions.MeterInstruction) fetchMeter(fwd);
+        Instructions.MeterInstruction meter = fwd.treatment().metered();
 
         TrafficSelector selector = fwd.selector();
 
@@ -627,6 +631,9 @@
     private void installDownstreamRulesForVlans(ForwardingObjective fwd, Instruction output,
                                                 TrafficSelector outerSelector, TrafficSelector innerSelector) {
 
+        Instruction onuDsMeter = fetchMeterById(fwd, fwd.annotations().value(DOWNSTREAM_ONU));
+        Instruction oltDsMeter = fetchMeterById(fwd, fwd.annotations().value(DOWNSTREAM_OLT));
+
         List<Pair<Instruction, Instruction>> vlanOps =
                 vlanOps(fwd,
                         L2ModificationInstruction.L2SubType.VLAN_POP);
@@ -640,11 +647,11 @@
         TrafficTreatment innerTreatment;
         VlanId setVlanId = ((L2ModificationInstruction.ModVlanIdInstruction) popAndRewrite.getRight()).vlanId();
         if (VlanId.NONE.equals(setVlanId)) {
-            innerTreatment = (buildTreatment(popAndRewrite.getLeft(), fetchMeter(fwd),
-                                             writeMetadataIncludingOnlyTp(fwd), output));
+            innerTreatment = (buildTreatment(popAndRewrite.getLeft(), onuDsMeter,
+                    writeMetadataIncludingOnlyTp(fwd), output));
         } else {
             innerTreatment = (buildTreatment(popAndRewrite.getRight(),
-                                             fetchMeter(fwd), writeMetadataIncludingOnlyTp(fwd), output));
+                    onuDsMeter, writeMetadataIncludingOnlyTp(fwd), output));
         }
 
         List<Instruction> setVlanPcps = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_PCP,
@@ -676,7 +683,7 @@
                 .withPriority(fwd.priority())
                 .withSelector(outerSelector)
                 .withTreatment(buildTreatment(popAndRewrite.getLeft(), modVlanId,
-                                              innerPbitSet, fetchMeter(fwd),
+                                              innerPbitSet, oltDsMeter,
                                               fetchWriteMetadata(fwd),
                                               Instructions.transition(QQ_TABLE)));
 
@@ -696,6 +703,9 @@
     private void installDownstreamRulesForAnyVlan(ForwardingObjective fwd, Instruction output,
                                                   TrafficSelector outerSelector, TrafficSelector innerSelector) {
 
+        Instruction onuDsMeter = fetchMeterById(fwd, fwd.annotations().value(DOWNSTREAM_ONU));
+        Instruction oltDsMeter = fetchMeterById(fwd, fwd.annotations().value(DOWNSTREAM_OLT));
+
         //match: in port (nni), s-tag
         //action: immediate: write metadata, pop vlan, meter and go to table 1
         FlowRule.Builder outer = DefaultFlowRule.builder()
@@ -704,7 +714,7 @@
                 .makePermanent()
                 .withPriority(fwd.priority())
                 .withSelector(outerSelector)
-                .withTreatment(buildTreatment(Instructions.popVlan(), fetchMeter(fwd),
+                .withTreatment(buildTreatment(Instructions.popVlan(), oltDsMeter,
                                               fetchWriteMetadata(fwd), Instructions.transition(QQ_TABLE)));
 
         //match: in port (nni) and s-tag
@@ -716,8 +726,7 @@
                 .makePermanent()
                 .withPriority(fwd.priority())
                 .withSelector(innerSelector)
-                .withTreatment(buildTreatment(fetchMeter(fwd),
-                                              writeMetadataIncludingOnlyTp(fwd), output));
+                .withTreatment(buildTreatment(onuDsMeter, writeMetadataIncludingOnlyTp(fwd), output));
 
         applyRules(fwd, inner, outer);
     }
@@ -755,6 +764,9 @@
                                               Pair<Instruction, Instruction> innerPair,
                                               Pair<Instruction, Instruction> outerPair, Boolean noneValueVlanStatus) {
 
+        Instruction onuUsMeter = fetchMeterById(fwd, fwd.annotations().value(UPSTREAM_ONU));
+        Instruction oltUsMeter = fetchMeterById(fwd, fwd.annotations().value(UPSTREAM_OLT));
+
         List<Instruction> setVlanPcps = findL2Instructions(L2ModificationInstruction.L2SubType.VLAN_PCP,
                                                            fwd.treatment().allInstructions());
 
@@ -768,11 +780,11 @@
 
         TrafficTreatment innerTreatment;
         if (noneValueVlanStatus) {
-            innerTreatment = buildTreatment(innerPair.getLeft(), innerPair.getRight(), fetchMeter(fwd),
+            innerTreatment = buildTreatment(innerPair.getLeft(), innerPair.getRight(), onuUsMeter,
                                             fetchWriteMetadata(fwd), innerPbitSet,
                                             Instructions.transition(QQ_TABLE));
         } else {
-            innerTreatment = buildTreatment(innerPair.getRight(), fetchMeter(fwd), fetchWriteMetadata(fwd),
+            innerTreatment = buildTreatment(innerPair.getRight(), onuUsMeter, fetchWriteMetadata(fwd),
                                             innerPbitSet, Instructions.transition(QQ_TABLE));
         }
 
@@ -803,8 +815,8 @@
                 .makePermanent()
                 .withPriority(fwd.priority())
                 .withTreatment(buildTreatment(outerPair.getLeft(), outerPair.getRight(),
-                                              fetchMeter(fwd), writeMetadataIncludingOnlyTp(fwd),
-                                              outerPbitSet, output));
+                        oltUsMeter, writeMetadataIncludingOnlyTp(fwd),
+                        outerPbitSet, output));
 
         if (innerPbitSet != null) {
             byte innerPbit = ((L2ModificationInstruction.ModVlanPcpInstruction)
@@ -821,6 +833,8 @@
                                                 Pair<Instruction, Instruction> outerPair) {
 
         log.debug("Installing upstream rules for any value vlan");
+        Instruction onuUsMeter = fetchMeterById(fwd, fwd.annotations().value(UPSTREAM_ONU));
+        Instruction oltUsMeter = fetchMeterById(fwd, fwd.annotations().value(UPSTREAM_OLT));
 
         //match: in port and any-vlan (coming from OLT app.)
         //action: write metadata, go to table 1 and meter
@@ -830,8 +844,7 @@
                 .makePermanent()
                 .withPriority(fwd.priority())
                 .withSelector(fwd.selector())
-                .withTreatment(buildTreatment(Instructions.transition(QQ_TABLE), fetchMeter(fwd),
-                                              fetchWriteMetadata(fwd)));
+                .withTreatment(buildTreatment(Instructions.transition(QQ_TABLE), onuUsMeter, fetchWriteMetadata(fwd)));
 
         //match: in port and any-vlan (coming from OLT app.)
         //action: immediate: push:QinQ, vlanId (s-tag), write metadata, meter and output
@@ -843,7 +856,7 @@
                 .withPriority(fwd.priority())
                 .withSelector(fwd.selector())
                 .withTreatment(buildTreatment(outerPair.getLeft(), outerPair.getRight(),
-                                              fetchMeter(fwd), writeMetadataIncludingOnlyTp(fwd), output));
+                        oltUsMeter, writeMetadataIncludingOnlyTp(fwd), output));
 
         applyRules(fwd, inner, outer);
     }
@@ -886,16 +899,15 @@
         return output;
     }
 
-    private Instruction fetchMeter(ForwardingObjective fwd) {
-        Instruction meter = fwd.treatment().metered();
-
-        if (meter == null) {
-            log.debug("Meter instruction is not found for the forwarding objective {}", fwd);
+    private Instruction fetchMeterById(ForwardingObjective fwd, String meterId) {
+        Optional<Instructions.MeterInstruction> meter = fwd.treatment().meters().stream()
+                .filter(meterInstruction -> meterInstruction.meterId().toString().equals(meterId)).findAny();
+        if (meter.isEmpty()) {
+            log.debug("Meter instruction is not found for the meterId: {} ", meterId);
             return null;
         }
-
-        log.debug("Meter instruction is found.");
-        return meter;
+        log.debug("Meter instruction is found for the meterId: {} ", meterId);
+        return meter.get();
     }
 
     private Instructions.MetadataInstruction fetchWriteMetadata(ForwardingObjective fwd) {
diff --git a/impl/src/main/java/org/opencord/olt/impl/Olt.java b/impl/src/main/java/org/opencord/olt/impl/Olt.java
index 70a3753..f11ac21 100644
--- a/impl/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/impl/src/main/java/org/opencord/olt/impl/Olt.java
@@ -405,6 +405,7 @@
         pendingSubscribersForDevice.computeIfPresent(connectPoint.deviceId(), (id, infos) -> {
             for (SubscriberFlowInfo fi : infos) {
                 if (fi.getUniPort().equals(connectPoint.port())) {
+                    log.debug("Subscriber is already pending, {}", fi);
                     isPending.set(true);
                     break;
                 }
@@ -434,7 +435,7 @@
         public void run() {
             CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture<>();
             oltFlowService.processEapolFilteringObjectives(subscriberPort,
-                                                     defaultBpId, filterFuture,
+                                                     defaultBpId, Optional.empty(), filterFuture,
                                                      VlanId.vlanId(EAPOL_DEFAULT_VLAN),
                                                      false);
             filterFuture.thenAcceptAsync(filterStatus -> {
@@ -511,14 +512,17 @@
             unprovisionVlans(uplinkPort, subscriberPort, uniTag);
 
             // remove eapol with subscriber bandwidth profile
+            Optional<String> upstreamOltBw = uniTag.getUpstreamOltBandwidthProfile() == null ?
+                    Optional.empty() : Optional.of(uniTag.getUpstreamOltBandwidthProfile());
             oltFlowService.processEapolFilteringObjectives(subscriberPort,
                                                            uniTag.getUpstreamBandwidthProfile(),
+                                                           upstreamOltBw,
                                                            null, uniTag.getPonCTag(), false);
 
             if (subscriberPort.port() != null && subscriberPort.isEnabled()) {
                 // reinstall eapol with default bandwidth profile
-                oltFlowService.processEapolFilteringObjectives(subscriberPort, defaultBpId,
-                                                               null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
+                oltFlowService.processEapolFilteringObjectives(subscriberPort, defaultBpId, Optional.empty(),
+                        null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
             } else {
                 log.debug("Port {} is no longer enabled or it's unavailable. Not "
                                   + "reprogramming default eapol flow", connectPoint);
@@ -556,8 +560,8 @@
             //wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
             //install subscriber flows
             CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture<>();
-            oltFlowService.processEapolFilteringObjectives(subscriberPort, defaultBpId,
-                                                           filterFuture, VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
+            oltFlowService.processEapolFilteringObjectives(subscriberPort, defaultBpId, Optional.empty(),
+                    filterFuture, VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
             filterFuture.thenAcceptAsync(filterStatus -> {
                 if (filterStatus == null) {
                     provisionUniTagInformation(uplinkPort, subscriberPort, cTag.get(), sTag.get(), tpId.get());
@@ -697,6 +701,10 @@
                 .getMeterIdFromBpMapping(deviceId, uniTag.getUpstreamBandwidthProfile());
         MeterId downstreamMeterId = oltMeterService
                 .getMeterIdFromBpMapping(deviceId, uniTag.getDownstreamBandwidthProfile());
+        MeterId upstreamOltMeterId = oltMeterService
+                .getMeterIdFromBpMapping(deviceId, uniTag.getUpstreamOltBandwidthProfile());
+        MeterId downstreamOltMeterId = oltMeterService
+                .getMeterIdFromBpMapping(deviceId, uniTag.getDownstreamOltBandwidthProfile());
 
         Optional<SubscriberFlowInfo> waitingMacSubFlowInfo =
                 getAndRemoveWaitingMacSubFlowInfoForCTag(new ConnectPoint(deviceId, subscriberPort.number()),
@@ -707,7 +715,7 @@
                     "waitingMacSubFlowInfo:{}", waitingMacSubFlowInfo.get());
             CompletableFuture<ObjectiveError> dhcpFuture = new CompletableFuture<>();
             oltFlowService.processDhcpFilteringObjectives(subscriberPort,
-                    upstreamMeterId, uniTag, false, true, Optional.of(dhcpFuture));
+                    upstreamMeterId, upstreamOltMeterId, uniTag, false, true, Optional.of(dhcpFuture));
             dhcpFuture.thenAcceptAsync(dhcpStatus -> {
                 AccessDeviceEvent.Type type;
                 if (dhcpStatus == null) {
@@ -731,16 +739,19 @@
         }
 
         ForwardingObjective.Builder upFwd =
-                oltFlowService.createUpBuilder(uplink, subscriberPort, upstreamMeterId, uniTag);
+                oltFlowService.createUpBuilder(uplink, subscriberPort, upstreamMeterId, upstreamOltMeterId, uniTag);
 
         Optional<MacAddress> macAddress = getMacAddress(deviceId, subscriberPort, uniTag);
         ForwardingObjective.Builder downFwd =
-                oltFlowService.createDownBuilder(uplink, subscriberPort, downstreamMeterId, uniTag, macAddress);
+                oltFlowService.createDownBuilder(uplink, subscriberPort, downstreamMeterId, downstreamOltMeterId,
+                        uniTag, macAddress);
 
-        oltFlowService.processIgmpFilteringObjectives(subscriberPort, upstreamMeterId, uniTag, false, true);
+        oltFlowService.processIgmpFilteringObjectives(subscriberPort, upstreamMeterId, upstreamOltMeterId, uniTag,
+                false, true);
         oltFlowService.processDhcpFilteringObjectives(subscriberPort,
-                upstreamMeterId, uniTag, false, true, Optional.empty());
-        oltFlowService.processPPPoEDFilteringObjectives(subscriberPort, upstreamMeterId, uniTag, false, true);
+                upstreamMeterId, upstreamOltMeterId, uniTag, false, true, Optional.empty());
+        oltFlowService.processPPPoEDFilteringObjectives(subscriberPort, upstreamMeterId, upstreamOltMeterId, uniTag,
+                false, true);
 
         flowObjectiveService.forward(deviceId, upFwd.remove(new ObjectiveContext() {
             @Override
@@ -885,6 +896,10 @@
                 getBandwidthProfileInformation(tagInfo.getUpstreamBandwidthProfile());
         BandwidthProfileInformation downstreamBpInfo =
                 getBandwidthProfileInformation(tagInfo.getDownstreamBandwidthProfile());
+        BandwidthProfileInformation upstreamOltBpInfo =
+                getBandwidthProfileInformation(tagInfo.getUpstreamOltBandwidthProfile());
+        BandwidthProfileInformation downstreamOltBpInfo =
+                getBandwidthProfileInformation(tagInfo.getDownstreamOltBandwidthProfile());
         if (upstreamBpInfo == null) {
             log.warn("No meter installed since no Upstream BW Profile definition found for "
                              + "ctag {} stag {} tpId {} on {}",
@@ -898,17 +913,45 @@
                      tagInfo.getTechnologyProfileId(), subscriberPort);
             return;
         }
+        if ((upstreamOltBpInfo != null && downstreamOltBpInfo == null) ||
+                (upstreamOltBpInfo == null && downstreamOltBpInfo != null)) {
+            log.warn("No meter installed since only one olt BW Profile definition found for "
+                            + "ctag {} stag {} tpId {} and Device/port: {}:{}",
+                    tagInfo.getPonCTag(), tagInfo.getPonSTag(),
+                    tagInfo.getTechnologyProfileId(), deviceId,
+                    subscriberPort);
+            return;
+        }
+
+        MeterId upOltMeterId = null;
+        MeterId downOltMeterId = null;
 
         // check for meterIds for the upstream and downstream bandwidth profiles
         MeterId upMeterId = oltMeterService
                 .getMeterIdFromBpMapping(deviceId, upstreamBpInfo.id());
         MeterId downMeterId = oltMeterService
                 .getMeterIdFromBpMapping(deviceId, downstreamBpInfo.id());
-        SubscriberFlowInfo fi = new SubscriberFlowInfo(uplinkPort, subscriberPort,
-                                                       tagInfo, downMeterId, upMeterId,
-                                                       downstreamBpInfo.id(), upstreamBpInfo.id());
 
-        if (upMeterId != null && downMeterId != null) {
+        if (upstreamOltBpInfo != null) {
+            // Multi UNI service
+            upOltMeterId = oltMeterService
+                    .getMeterIdFromBpMapping(deviceId, upstreamOltBpInfo.id());
+            downOltMeterId = oltMeterService
+                    .getMeterIdFromBpMapping(deviceId, downstreamOltBpInfo.id());
+        } else {
+            // NOT Multi UNI service
+            log.debug("OLT bandwidth profiles fields are set to ONU bandwidth profiles");
+            upstreamOltBpInfo = upstreamBpInfo;
+            downstreamOltBpInfo = downstreamBpInfo;
+            upOltMeterId = upMeterId;
+            downOltMeterId = downMeterId;
+        }
+        SubscriberFlowInfo fi = new SubscriberFlowInfo(uplinkPort, subscriberPort,
+                                                       tagInfo, downMeterId, upMeterId, downOltMeterId, upOltMeterId,
+                                                       downstreamBpInfo.id(), upstreamBpInfo.id(),
+                                                       downstreamOltBpInfo.id(), upstreamOltBpInfo.id());
+
+        if (upMeterId != null && downMeterId != null && upOltMeterId != null && downOltMeterId != null) {
             log.debug("Meters are existing for upstream {} and downstream {} on {}",
                     upstreamBpInfo.id(), downstreamBpInfo.id(), subscriberPort);
             handleSubFlowsWithMeters(fi);
@@ -925,21 +968,37 @@
                 return queue;
             });
 
+            List<BandwidthProfileInformation> bws = new ArrayList<>();
             // queue up the meters to be created
             if (upMeterId == null) {
                 log.debug("Missing meter for upstream {} on {}", upstreamBpInfo.id(), subscriberPort);
-                checkAndCreateDevMeter(deviceId, upstreamBpInfo);
+                bws.add(upstreamBpInfo);
             }
             if (downMeterId == null) {
                 log.debug("Missing meter for downstream {} on {}", downstreamBpInfo.id(), subscriberPort);
-                checkAndCreateDevMeter(deviceId, downstreamBpInfo);
+                bws.add(downstreamBpInfo);
             }
+            if (upOltMeterId == null) {
+                log.debug("Missing meter for upstreamOlt {} on {}", upstreamOltBpInfo.id(), subscriberPort);
+                bws.add(upstreamOltBpInfo);
+            }
+            if (downOltMeterId == null) {
+                log.debug("Missing meter for downstreamOlt {} on {}", downstreamOltBpInfo.id(), subscriberPort);
+                bws.add(downstreamOltBpInfo);
+            }
+            bws.stream().distinct().forEach(bw -> checkAndCreateDevMeter(deviceId, bw));
         }
     }
 
     private void checkAndCreateDevMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
+        log.debug("Checking and Creating Meter with {} on {}", bwpInfo, deviceId);
+        if (bwpInfo == null) {
+            log.error("Can't create meter. Bandwidth profile is null for device : {}", deviceId);
+            return;
+        }
         //If false the meter is already being installed, skipping installation
         if (!oltMeterService.checkAndAddPendingMeter(deviceId, bwpInfo)) {
+            log.debug("Meter is already being installed on {} for {}", deviceId, bwpInfo);
             return;
         }
         createMeter(deviceId, bwpInfo);
@@ -953,52 +1012,69 @@
                                                       meterFuture);
 
         meterFuture.thenAcceptAsync(result -> {
-            BlockingQueue<SubscriberFlowInfo> queue = pendingSubscribersForDevice.get(deviceId);
-            // iterate through the subscribers on hold
-            if (queue != null) {
-                while (true) {
-                    //TODO this might return the reference and not the actual object so
-                    // it can be actually swapped underneath us.
-                    SubscriberFlowInfo fi = queue.peek();
-                    if (fi == null) {
-                        log.debug("No more subscribers pending on {}", deviceId);
-                        pendingSubscribersForDevice.replace(deviceId, queue);
-                        break;
-                    }
-                    if (result == null) {
-                        // meter install sent to device
-                        log.debug("Meter {} installed for bw {} on {}", meterId, bwpInfo, deviceId);
+            log.debug("Meter Future for {} has completed", meterId);
+            pendingSubscribersForDevice.compute(deviceId, (id, queue) -> {
+                // iterate through the subscribers on hold
+                if (queue != null && !queue.isEmpty()) {
+                    while (true) {
+                        //TODO this might return the reference and not the actual object so
+                        // it can be actually swapped underneath us.
+                        SubscriberFlowInfo fi = queue.peek();
+                        if (fi == null) {
+                            log.debug("No more subscribers pending on {}", deviceId);
+                            queue = new LinkedBlockingQueue<>();
+                            break;
+                        }
+                        if (result == null) {
+                            // meter install sent to device
+                            log.debug("Meter {} installed for bw {} on {}", meterId, bwpInfo, deviceId);
 
-                        MeterId upMeterId = oltMeterService
-                                .getMeterIdFromBpMapping(deviceId, fi.getUpBpInfo());
-                        MeterId downMeterId = oltMeterService
-                                .getMeterIdFromBpMapping(deviceId, fi.getDownBpInfo());
-                        if (upMeterId != null && downMeterId != null) {
-                            log.debug("Provisioning subscriber after meter {} " +
-                                              "installation and both meters are present " +
-                                              "upstream {} and downstream {} on {}",
-                                      meterId, upMeterId, downMeterId, fi.getUniPort());
-                            // put in the meterIds  because when fi was first
-                            // created there may or may not have been a meterId
-                            // depending on whether the meter was created or
-                            // not at that time.
-                            fi.setUpMeterId(upMeterId);
-                            fi.setDownMeterId(downMeterId);
-                            handleSubFlowsWithMeters(fi);
+                            MeterId upMeterId = oltMeterService
+                                    .getMeterIdFromBpMapping(deviceId, fi.getUpBpInfo());
+                            MeterId downMeterId = oltMeterService
+                                    .getMeterIdFromBpMapping(deviceId, fi.getDownBpInfo());
+                            MeterId upOltMeterId = oltMeterService
+                                    .getMeterIdFromBpMapping(deviceId, fi.getUpOltBpInfo());
+                            MeterId downOltMeterId = oltMeterService
+                                    .getMeterIdFromBpMapping(deviceId, fi.getDownOltBpInfo());
+                            if (upMeterId != null && downMeterId != null &&
+                                    upOltMeterId != null && downOltMeterId != null) {
+                                log.debug("Provisioning subscriber after meter {} " +
+                                                  "installation and all meters are present " +
+                                                  "upstream {} , downstream {} , oltUpstream {} " +
+                                                  "and oltDownstream {} on {}",
+                                          meterId, upMeterId, downMeterId, upOltMeterId,
+                                          downOltMeterId, fi.getUniPort());
+                                // put in the meterIds  because when fi was first
+                                // created there may or may not have been a meterId
+                                // depending on whether the meter was created or
+                                // not at that time.
+                                fi.setUpMeterId(upMeterId);
+                                fi.setDownMeterId(downMeterId);
+                                fi.setUpOltMeterId(upOltMeterId);
+                                fi.setDownOltMeterId(downOltMeterId);
+                                handleSubFlowsWithMeters(fi);
+                                queue.remove(fi);
+                            } else {
+                                log.debug("Not all meters for {} are yet installed up {}, " +
+                                                 "down {}, oltUp {}, oltDown {}", fi, upMeterId,
+                                         downMeterId, upOltMeterId, downOltMeterId);
+                            }
+                            oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
+                        } else {
+                            // meter install failed
+                            log.error("Addition of subscriber {} on {} failed due to meter " +
+                                              "{} with result {}", fi, fi.getUniPort(), meterId, result);
+                            oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
                             queue.remove(fi);
                         }
-                        oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
-                    } else {
-                        // meter install failed
-                        log.error("Addition of subscriber {} on {} failed due to meter " +
-                                          "{} with result {}", fi, fi.getUniPort(), meterId, result);
-                        queue.remove(fi);
-                        oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
                     }
+                } else {
+                    log.info("No pending subscribers on {}", deviceId);
+                    queue = new LinkedBlockingQueue<>();
                 }
-            } else {
-                log.info("No pending subscribers on {}", deviceId);
-            }
+                return queue;
+            });
         });
 
     }
@@ -1027,7 +1103,8 @@
 
                 CompletableFuture<ObjectiveError> dhcpFuture = new CompletableFuture<>();
                 oltFlowService.processDhcpFilteringObjectives(subscriberFlowInfo.getUniPort(),
-                        subscriberFlowInfo.getUpId(), tagInfo, true, true, Optional.of(dhcpFuture));
+                        subscriberFlowInfo.getUpId(), subscriberFlowInfo.getUpOltId(),
+                        tagInfo, true, true, Optional.of(dhcpFuture));
                 dhcpFuture.thenAcceptAsync(dhcpStatus -> {
                     if (dhcpStatus != null) {
                         log.error("Dhcp Objective failed for {}: {}", subscriberFlowInfo, dhcpStatus);
@@ -1063,8 +1140,8 @@
         CompletableFuture<ObjectiveError> downFuture = new CompletableFuture<>();
 
         ForwardingObjective.Builder upFwd =
-                oltFlowService.createUpBuilder(subscriberFlowInfo.getNniPort(), uniPort,
-                                               subscriberFlowInfo.getUpId(), subscriberFlowInfo.getTagInfo());
+                oltFlowService.createUpBuilder(subscriberFlowInfo.getNniPort(), uniPort, subscriberFlowInfo.getUpId(),
+                        subscriberFlowInfo.getUpOltId(), subscriberFlowInfo.getTagInfo());
         flowObjectiveService.forward(subscriberFlowInfo.getDevId(), upFwd.add(new ObjectiveContext() {
             @Override
             public void onSuccess(Objective objective) {
@@ -1080,7 +1157,8 @@
 
         ForwardingObjective.Builder downFwd =
                 oltFlowService.createDownBuilder(subscriberFlowInfo.getNniPort(), uniPort,
-                        subscriberFlowInfo.getDownId(), subscriberFlowInfo.getTagInfo(), macAddress);
+                        subscriberFlowInfo.getDownId(), subscriberFlowInfo.getDownOltId(),
+                        subscriberFlowInfo.getTagInfo(), macAddress);
         flowObjectiveService.forward(subscriberFlowInfo.getDevId(), downFwd.add(new ObjectiveContext() {
             @Override
             public void onSuccess(Objective objective) {
@@ -1106,20 +1184,23 @@
                 type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED;
             } else {
                 log.debug("Upstream and downstream data plane flows are installed successfully on {}", uniPort);
+                Optional<String> upstreamOltBw = tagInfo.getUpstreamOltBandwidthProfile() == null ?
+                        Optional.empty() : Optional.of(tagInfo.getUpstreamOltBandwidthProfile());
                 oltFlowService.processEapolFilteringObjectives(uniPort, tagInfo.getUpstreamBandwidthProfile(),
-                                                               null, tagInfo.getPonCTag(), true);
-
+                                                               upstreamOltBw, null,
+                        tagInfo.getPonCTag(), true);
 
                 if (!tagInfo.getEnableMacLearning()) {
                     oltFlowService.processDhcpFilteringObjectives(uniPort, subscriberFlowInfo.getUpId(),
-                                                                  tagInfo, true, true, Optional.empty());
+                            subscriberFlowInfo.getUpOltId(), tagInfo, true, true, Optional.empty());
                 }
 
                 oltFlowService.processIgmpFilteringObjectives(uniPort, subscriberFlowInfo.getUpId(),
-                                                              tagInfo, true, true);
+                        subscriberFlowInfo.getUpOltId(), tagInfo, true, true);
 
                 oltFlowService.processPPPoEDFilteringObjectives(uniPort, subscriberFlowInfo.getUpId(),
-                                                                tagInfo, true, true);
+                        subscriberFlowInfo.getUpOltId(), tagInfo, true, true);
+
                 updateProgrammedSubscriber(uniPort, tagInfo, true);
             }
             post(new AccessDeviceEvent(type, subscriberFlowInfo.getDevId(), uniPort.port(),
@@ -1228,8 +1309,8 @@
                     AccessDevicePort port = new AccessDevicePort(p, AccessDevicePort.Type.UNI);
                     if (!programmedSubs.containsKey(new ConnectPoint(dev.id(), p.number()))) {
                         log.info("Creating Eapol on {}", port);
-                        oltFlowService.processEapolFilteringObjectives(port, defaultBpId, null,
-                                                                       VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
+                        oltFlowService.processEapolFilteringObjectives(port, defaultBpId, Optional.empty(),
+                                null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
                     } else {
                         log.debug("Subscriber Eapol on {} is already provisioned, not installing default", port);
                     }
@@ -1465,7 +1546,7 @@
 
                             if (port.isEnabled() && !port.number().equals(PortNumber.LOCAL)) {
                                 log.info("eapol will be sent for port added {}", port);
-                                oltFlowService.processEapolFilteringObjectives(port, defaultBpId,
+                                oltFlowService.processEapolFilteringObjectives(port, defaultBpId, Optional.empty(),
                                                                                null,
                                                                                VlanId.vlanId(EAPOL_DEFAULT_VLAN),
                                                                                true);
@@ -1487,9 +1568,8 @@
                             if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
                                 log.info("No subscriber provisioned on port {} in PORT_REMOVED event, " +
                                                  "removing default EAPOL flow", port);
-                                oltFlowService.processEapolFilteringObjectives(port, defaultBpId, null,
-                                                                               VlanId.vlanId(EAPOL_DEFAULT_VLAN),
-                                                                               false);
+                                oltFlowService.processEapolFilteringObjectives(port, defaultBpId, Optional.empty(),
+                                        null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
                             } else {
                                 removeSubscriber(port);
                             }
@@ -1516,17 +1596,17 @@
                             if (!port.number().equals(PortNumber.LOCAL)) {
                                 log.info("eapol will be {} updated for {} with default vlan {}",
                                          (port.isEnabled()) ? "added" : "removed", port, EAPOL_DEFAULT_VLAN);
-                                oltFlowService.processEapolFilteringObjectives(port, defaultBpId, null,
-                                                                               VlanId.vlanId(EAPOL_DEFAULT_VLAN),
-                                                                               port.isEnabled());
+                                oltFlowService.processEapolFilteringObjectives(port, defaultBpId, Optional.empty(),
+                                        null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), port.isEnabled());
                             }
                         } else {
                             log.info("eapol will be {} updated for {}", (port.isEnabled()) ? "added" : "removed",
                                      port);
                             for (UniTagInformation uniTag : uniTagInformationSet) {
                                 oltFlowService.processEapolFilteringObjectives(port,
-                                        uniTag.getUpstreamBandwidthProfile(), null,
-                                        uniTag.getPonCTag(), port.isEnabled());
+                                        uniTag.getUpstreamBandwidthProfile(),
+                                        Optional.of(uniTag.getUpstreamOltBandwidthProfile()),
+                                        null, uniTag.getPonCTag(), port.isEnabled());
                             }
                         }
                         if (port.isEnabled()) {
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 466d363..1868421 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -27,7 +27,9 @@
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.Annotations;
 import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DefaultAnnotations;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.DeviceService;
@@ -66,8 +68,11 @@
 import org.osgi.service.component.annotations.ReferencePolicy;
 import org.slf4j.Logger;
 
+import java.util.Arrays;
 import java.util.Dictionary;
+import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
@@ -271,6 +276,7 @@
     @Override
     public void processDhcpFilteringObjectives(AccessDevicePort port,
                                                MeterId upstreamMeterId,
+                                               MeterId upstreamOltMeterId,
                                                UniTagInformation tagInformation,
                                                boolean install,
                                                boolean upstream,
@@ -306,8 +312,8 @@
             byte protocol = IPv4.PROTOCOL_UDP;
 
             addDhcpFilteringObjectives(port, udpSrc, udpDst, ethType,
-                    upstreamMeterId, techProfileId, protocol, cTag, unitagMatch,
-                                       vlanPcp, upstream, install, dhcpFuture);
+                    upstreamMeterId, upstreamOltMeterId, techProfileId, protocol, cTag, unitagMatch,
+                    vlanPcp, upstream, install, dhcpFuture);
         }
 
         if (enableDhcpV6) {
@@ -318,14 +324,14 @@
             byte protocol = IPv6.PROTOCOL_UDP;
 
             addDhcpFilteringObjectives(port, udpSrc, udpDst, ethType,
-                    upstreamMeterId, techProfileId, protocol, cTag, unitagMatch,
-                                       vlanPcp, upstream, install, dhcpFuture);
+                    upstreamMeterId, upstreamOltMeterId, techProfileId, protocol, cTag, unitagMatch,
+                    vlanPcp, upstream, install, dhcpFuture);
         }
     }
 
     private void addDhcpFilteringObjectives(AccessDevicePort port, int udpSrc, int udpDst,
-                                            EthType ethType, MeterId upstreamMeterId, int techProfileId, byte protocol,
-                                            VlanId cTag, VlanId unitagMatch,
+                                            EthType ethType, MeterId upstreamMeterId, MeterId upstreamOltMeterId,
+                                            int techProfileId, byte protocol, VlanId cTag, VlanId unitagMatch,
                                             Byte vlanPcp, boolean upstream,
                                             boolean install, Optional<CompletableFuture<ObjectiveError>> dhcpFuture) {
 
@@ -337,7 +343,7 @@
         }
 
         if (techProfileId != NONE_TP_ID) {
-            treatmentBuilder.writeMetadata(createTechProfValueForWm(unitagMatch, techProfileId), 0);
+            treatmentBuilder.writeMetadata(createTechProfValueForWm(unitagMatch, techProfileId, upstreamOltMeterId), 0);
         }
 
         FilteringObjective.Builder dhcpUpstreamBuilder = (install ? builder.permit() : builder.deny())
@@ -389,6 +395,7 @@
     @Override
     public void processPPPoEDFilteringObjectives(AccessDevicePort port,
                                                  MeterId upstreamMeterId,
+                                                 MeterId upstreamOltMeterId,
                                                  UniTagInformation tagInformation,
                                                  boolean install,
                                                  boolean upstream) {
@@ -420,7 +427,7 @@
         }
 
         if (techProfileId != NONE_TP_ID) {
-            treatmentBuilder.writeMetadata(createTechProfValueForWm(cTag, techProfileId), 0);
+            treatmentBuilder.writeMetadata(createTechProfValueForWm(cTag, techProfileId, upstreamOltMeterId), 0);
         }
 
         DefaultFilteringObjective.Builder pppoedBuilder = (install ? builder.permit() : builder.deny())
@@ -459,6 +466,7 @@
     @Override
     public void processIgmpFilteringObjectives(AccessDevicePort port,
                                                MeterId upstreamMeterId,
+                                               MeterId upstreamOltMeterId,
                                                UniTagInformation tagInformation,
                                                boolean install,
                                                boolean upstream) {
@@ -484,7 +492,7 @@
 
             if (tagInformation.getTechnologyProfileId() != NONE_TP_ID) {
                 treatmentBuilder.writeMetadata(createTechProfValueForWm(null,
-                        tagInformation.getTechnologyProfileId()), 0);
+                        tagInformation.getTechnologyProfileId(), upstreamOltMeterId), 0);
             }
 
 
@@ -532,7 +540,7 @@
     }
 
     @Override
-    public void processEapolFilteringObjectives(AccessDevicePort port, String bpId,
+    public void processEapolFilteringObjectives(AccessDevicePort port, String bpId, Optional<String> oltBpId,
                                                 CompletableFuture<ObjectiveError> filterFuture,
                                                 VlanId vlanId, boolean install) {
 
@@ -546,6 +554,12 @@
         }
         log.info("Processing EAPOL with Bandwidth profile {} on {}", bpId, port);
         BandwidthProfileInformation bpInfo = getBandwidthProfileInformation(bpId);
+        BandwidthProfileInformation oltBpInfo;
+        if (oltBpId.isPresent()) {
+            oltBpInfo = getBandwidthProfileInformation(oltBpId.get());
+        } else {
+            oltBpInfo = bpInfo;
+        }
         if (bpInfo == null) {
             log.warn("Bandwidth profile {} is not found. Authentication flow"
                     + " will not be installed on {}", bpId, port);
@@ -558,19 +572,22 @@
         ConnectPoint cp = new ConnectPoint(port.deviceId(), port.number());
         DefaultFilteringObjective.Builder filterBuilder = DefaultFilteringObjective.builder();
         TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
-        CompletableFuture<Object> meterFuture = new CompletableFuture<>();
         // check if meter exists and create it only for an install
         final MeterId meterId = oltMeterService.getMeterIdFromBpMapping(port.deviceId(), bpInfo.id());
-        log.info("Meter id {} for Bandwidth profile {} associated to EAPOL on {}",
-                meterId, bpInfo.id(), port.deviceId());
-        if (meterId == null) {
+        MeterId oltMeterId = null;
+        if (oltBpId.isPresent()) {
+            oltMeterId = oltBpId.map(id -> oltMeterService.getMeterIdFromBpMapping(port.deviceId(), id)).orElse(null);
+        }
+        log.info("Meter id {} for Bandwidth profile {} and OLT meter id {} for OLT Bandwidth profile {} " +
+                        "associated to EAPOL on {}", meterId, bpInfo.id(), oltMeterId, oltBpId, port.deviceId());
+        if (meterId == null || (oltBpId.isPresent() && oltMeterId == null)) {
             if (install) {
                 log.debug("Need to install meter for EAPOL with bwp {} on {}", bpInfo.id(), port);
                 SubscriberFlowInfo fi = new SubscriberFlowInfo(null, port,
-                                                               new UniTagInformation.Builder()
-                                                                       .setPonCTag(vlanId).build(),
-                                                               null, null,
-                                                               null, bpInfo.id());
+                                                            new UniTagInformation.Builder()
+                                                                    .setPonCTag(vlanId).build(),
+                                                        null, meterId, null, oltMeterId,
+                                                    null, bpInfo.id(), null, oltBpInfo.id());
                 pendingEapolForDevice.compute(port.deviceId(), (id, queue) -> {
                     if (queue == null) {
                         queue = new LinkedBlockingQueue<>();
@@ -580,11 +597,14 @@
                 });
 
                 //If false the meter is already being installed, skipping installation
-                if (!oltMeterService.checkAndAddPendingMeter(port.deviceId(), bpInfo)) {
+                if (!oltMeterService.checkAndAddPendingMeter(port.deviceId(), bpInfo) &&
+                        !oltMeterService.checkAndAddPendingMeter(port.deviceId(), oltBpInfo)) {
                     return;
                 }
-                MeterId innerMeterId = oltMeterService.createMeter(port.deviceId(), bpInfo, meterFuture);
-                fi.setUpMeterId(innerMeterId);
+                List<BandwidthProfileInformation> bwpList = Arrays.asList(bpInfo, oltBpInfo);
+                bwpList.stream().distinct().filter(Objects::nonNull)
+                        .forEach(bwp -> createMeterAndProceedEapol(port, bwp, filterFuture, install,
+                        cp, filterBuilder, treatmentBuilder));
             } else {
                 // this case should not happen as the request to remove an eapol
                 // flow should mean that the flow points to a meter that exists.
@@ -593,54 +613,75 @@
                 log.warn("Unknown meter id for bp {}, still proceeding with "
                         + "delete of eapol flow on {}", bpInfo.id(), port);
                 SubscriberFlowInfo fi = new SubscriberFlowInfo(null, port,
-                                                               new UniTagInformation.Builder()
-                                                                       .setPonCTag(vlanId).build(),
-                                                               null, meterId,
-                                                               null, bpInfo.id());
-                handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, meterId);
+                        new UniTagInformation.Builder().setPonCTag(vlanId).build(),
+                        null, meterId, null, oltMeterId, null,
+                        bpInfo.id(), null, oltBpInfo.id());
+                handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, meterId, oltMeterId);
             }
         } else {
             log.debug("Meter {} was previously created for bp {} on {}", meterId, bpInfo.id(), port);
             SubscriberFlowInfo fi = new SubscriberFlowInfo(null, port,
-                                                           new UniTagInformation.Builder()
-                                                                   .setPonCTag(vlanId).build(),
-                                                           null, meterId,
-                                                           null, bpInfo.id());
-            handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, meterId);
+                    new UniTagInformation.Builder().setPonCTag(vlanId).build(),
+                    null, meterId, null, oltMeterId, null,
+                    bpInfo.id(), null, oltBpInfo.id());
+            handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, meterId, oltMeterId);
             //No need for the future, meter is present.
             return;
         }
-        meterFuture.thenAcceptAsync(result -> {
+    }
+
+    private void createMeterAndProceedEapol(AccessDevicePort port, BandwidthProfileInformation bwpInfo,
+                                            CompletableFuture<ObjectiveError> filterFuture,
+                                            boolean install, ConnectPoint cp,
+                                            DefaultFilteringObjective.Builder filterBuilder,
+                                            TrafficTreatment.Builder treatmentBuilder) {
+        CompletableFuture<Object> meterFuture = new CompletableFuture<>();
+        MeterId meterId = oltMeterService.createMeter(port.deviceId(), bwpInfo, meterFuture);
+        DeviceId deviceId = port.deviceId();
+        meterFuture.thenAccept(result -> {
             //for each pending eapol flow we check if the meter is there.
-            BlockingQueue<SubscriberFlowInfo> queue = pendingEapolForDevice.get(port.deviceId());
-            if (queue != null) {
-                while (true) {
-                    SubscriberFlowInfo fi = queue.remove();
-                    if (fi == null) {
-                        pendingEapolForDevice.replace(port.deviceId(), queue);
-                        break;
-                    }
-                    //TODO this might return the reference and not the actual object
-                    // so it can be actually swapped underneath us.
-                    log.debug("handing pending eapol on {} for {}", fi.getUniPort(), fi);
-                    if (result == null) {
-                        MeterId mId = oltMeterService
-                                .getMeterIdFromBpMapping(port.deviceId(), fi.getUpBpInfo());
-                        if (mId != null) {
-                            log.debug("Meter installation completed for subscriber on {}, " +
-                                            "handling EAPOL trap flow", port);
-                            handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, mId);
+            pendingEapolForDevice.compute(deviceId, (id, queue) -> {
+                if (queue != null && !queue.isEmpty()) {
+                    while (true) {
+                        //TODO this might return the reference and not the actual object
+                        // so it can be actually swapped underneath us.
+                        SubscriberFlowInfo fi = queue.peek();
+                        if (fi == null) {
+                            log.debug("No more subscribers eapol flows on {}", deviceId);
+                            queue = new LinkedBlockingQueue<>();
+                            break;
                         }
-                    } else {
-                        log.warn("Meter installation error while sending EAPOL trap flow to {}. " +
-                                         "Result {} and MeterId {}", port, result,
-                                meterId);
+                        log.debug("handing pending eapol on {} for {}", fi.getUniPort(), fi);
+                        if (result == null) {
+                            MeterId mId = oltMeterService
+                                    .getMeterIdFromBpMapping(port.deviceId(), fi.getUpBpInfo());
+                            MeterId oltMeterId = oltMeterService
+                                    .getMeterIdFromBpMapping(port.deviceId(), fi.getUpOltBpInfo());
+                            if (mId != null && oltMeterId != null) {
+                                log.debug("Meter installation completed for subscriber on {}, " +
+                                                  "handling EAPOL trap flow", port);
+                                fi.setUpMeterId(mId);
+                                fi.setUpOltMeterId(oltMeterId);
+                                handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi,
+                                            mId, oltMeterId);
+                                queue.remove(fi);
+                            } else {
+                                log.debug("Not all meters for {} are yet installed for EAPOL meterID {}, " +
+                                                  "oltMeterId {}", fi, meterId, oltMeterId);
+                            }
+                        } else {
+                            log.warn("Meter installation error while sending EAPOL trap flow to {}. " +
+                                             "Result {} and MeterId {}", port, result, meterId);
+                            queue.remove(fi);
+                        }
+                        oltMeterService.removeFromPendingMeters(port.deviceId(), bwpInfo);
                     }
-                    oltMeterService.removeFromPendingMeters(port.deviceId(), bpInfo);
+                } else {
+                    log.info("No pending EAPOLs on {}", port.deviceId());
+                    queue = new LinkedBlockingQueue<>();
                 }
-            } else {
-                log.info("No pending EAPOLs on {}", port.deviceId());
-            }
+                return queue;
+            });
         });
     }
 
@@ -648,7 +689,7 @@
                              boolean install, ConnectPoint cp,
                              DefaultFilteringObjective.Builder filterBuilder,
                              TrafficTreatment.Builder treatmentBuilder,
-                             SubscriberFlowInfo fi, MeterId mId) {
+                             SubscriberFlowInfo fi, MeterId mId, MeterId oltMeterId) {
         log.info("Meter {} for {} on {} exists. {} EAPOL trap flow",
                  mId, fi.getUpBpInfo(), fi.getUniPort(),
                  (install) ? "Installing" : "Removing");
@@ -664,7 +705,7 @@
                 .withMeta(treatmentBuilder
                                   .writeMetadata(createTechProfValueForWm(
                                           fi.getTagInfo().getPonCTag(),
-                                          techProfileId), 0)
+                                          techProfileId, oltMeterId), 0)
                                   .setOutput(PortNumber.CONTROLLER)
                                   .pushVlan()
                                   .setVlanId(fi.getTagInfo().getPonCTag())
@@ -707,9 +748,9 @@
         log.info("{} flows for NNI port {}",
                  install ? "Adding" : "Removing", nniPort);
         processLldpFilteringObjective(nniPort, install);
-        processDhcpFilteringObjectives(nniPort, null, null, install, false, Optional.empty());
-        processIgmpFilteringObjectives(nniPort, null, null, install, false);
-        processPPPoEDFilteringObjectives(nniPort, null, null, install, false);
+        processDhcpFilteringObjectives(nniPort, null, null, null, install, false, Optional.empty());
+        processIgmpFilteringObjectives(nniPort, null, null, null, install, false);
+        processPPPoEDFilteringObjectives(nniPort, null, null, null, install, false);
     }
 
 
@@ -765,13 +806,15 @@
                         upstream ? uplinkPort.number() : subscriberPort.number()), 0)
                 .build();
 
-        return createForwardingObjectiveBuilder(selector, treatment, MIN_PRIORITY);
+        return createForwardingObjectiveBuilder(selector, treatment, MIN_PRIORITY,
+                DefaultAnnotations.builder().build());
     }
 
     @Override
     public ForwardingObjective.Builder createUpBuilder(AccessDevicePort uplinkPort,
                                                        AccessDevicePort subscriberPort,
                                                        MeterId upstreamMeterId,
+                                                       MeterId upstreamOltMeterId,
                                                        UniTagInformation uniTagInformation) {
 
         TrafficSelector selector = DefaultTrafficSelector.builder()
@@ -801,17 +844,26 @@
                 .writeMetadata(createMetadata(uniTagInformation.getPonCTag(),
                         uniTagInformation.getTechnologyProfileId(), uplinkPort.number()), 0L);
 
+        DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
+
         if (upstreamMeterId != null) {
             treatmentBuilder.meter(upstreamMeterId);
+            annotationBuilder.set(UPSTREAM_ONU, upstreamMeterId.toString());
+        }
+        if (upstreamOltMeterId != null) {
+            treatmentBuilder.meter(upstreamOltMeterId);
+            annotationBuilder.set(UPSTREAM_OLT, upstreamOltMeterId.toString());
         }
 
-        return createForwardingObjectiveBuilder(selector, treatmentBuilder.build(), MIN_PRIORITY);
+        return createForwardingObjectiveBuilder(selector, treatmentBuilder.build(), MIN_PRIORITY,
+                annotationBuilder.build());
     }
 
     @Override
     public ForwardingObjective.Builder createDownBuilder(AccessDevicePort uplinkPort,
                                                          AccessDevicePort subscriberPort,
                                                          MeterId downstreamMeterId,
+                                                         MeterId downstreamOltMeterId,
                                                          UniTagInformation tagInformation,
                                                          Optional<MacAddress> macAddress) {
 
@@ -854,11 +906,20 @@
             treatmentBuilder.setVlanId(tagInformation.getUniTagMatch());
         }
 
+        DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
+
         if (downstreamMeterId != null) {
             treatmentBuilder.meter(downstreamMeterId);
+            annotationBuilder.set(DOWNSTREAM_ONU, downstreamMeterId.toString());
         }
 
-        return createForwardingObjectiveBuilder(selectorBuilder.build(), treatmentBuilder.build(), MIN_PRIORITY);
+        if (downstreamOltMeterId != null) {
+            treatmentBuilder.meter(downstreamOltMeterId);
+            annotationBuilder.set(DOWNSTREAM_OLT, downstreamOltMeterId.toString());
+        }
+
+        return createForwardingObjectiveBuilder(selectorBuilder.build(), treatmentBuilder.build(), MIN_PRIORITY,
+                annotationBuilder.build());
     }
 
     @Override
@@ -868,12 +929,14 @@
 
     private DefaultForwardingObjective.Builder createForwardingObjectiveBuilder(TrafficSelector selector,
                                                                                 TrafficTreatment treatment,
-                                                                                Integer priority) {
+                                                                                Integer priority,
+                                                                                Annotations annotations) {
         return DefaultForwardingObjective.builder()
                 .withFlag(ForwardingObjective.Flag.VERSATILE)
                 .withPriority(priority)
                 .makePermanent()
                 .withSelector(selector)
+                .withAnnotations(annotations)
                 .fromApp(appId)
                 .withTreatment(treatment);
     }
@@ -882,15 +945,24 @@
      * Returns the write metadata value including tech profile reference and innerVlan.
      * For param cVlan, null can be sent
      *
-     * @param cVlan         c (customer) tag of one subscriber
-     * @param techProfileId tech profile id of one subscriber
+     * @param cVlan                 c (customer) tag of one subscriber
+     * @param techProfileId         tech profile id of one subscriber
+     * @param upstreamOltMeterId    upstream meter id for OLT device.
      * @return the write metadata value including tech profile reference and innerVlan
      */
-    private Long createTechProfValueForWm(VlanId cVlan, int techProfileId) {
+    private Long createTechProfValueForWm(VlanId cVlan, int techProfileId, MeterId upstreamOltMeterId) {
+        Long writeMetadata;
+
         if (cVlan == null || VlanId.NONE.equals(cVlan)) {
-            return (long) techProfileId << 32;
+            writeMetadata = (long) techProfileId << 32;
+        } else {
+            writeMetadata = ((long) (cVlan.id()) << 48 | (long) techProfileId << 32);
         }
-        return ((long) (cVlan.id()) << 48 | (long) techProfileId << 32);
+        if (upstreamOltMeterId == null) {
+            return writeMetadata;
+        } else {
+            return writeMetadata | upstreamOltMeterId.id();
+        }
     }
 
     private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java b/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
index 59aed0e..ab603b1 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
@@ -182,6 +182,15 @@
 
     @Override
     public MeterId getMeterIdFromBpMapping(DeviceId deviceId, String bandwidthProfile) {
+        if (bandwidthProfile == null) {
+            log.warn("Bandwidth Profile requested is null");
+            return null;
+        }
+        if (bpInfoToMeter.get(bandwidthProfile) == null) {
+            log.warn("Bandwidth Profile '{}' is not present in the map",
+                     bandwidthProfile);
+            return null;
+        }
         if (bpInfoToMeter.get(bandwidthProfile).value().isEmpty()) {
             log.warn("Bandwidth Profile '{}' is not currently mapped to a meter",
                     bandwidthProfile);
@@ -282,6 +291,10 @@
 
     @Override
     public synchronized boolean checkAndAddPendingMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
+        if (bwpInfo == null) {
+            log.debug("Bandwidth profile is null for device: {}", deviceId);
+            return false;
+        }
         if (pendingMeters.containsKey(deviceId)
                 && pendingMeters.get(deviceId).contains(bwpInfo)) {
             log.debug("Meter is already pending on {} with bp {}",
diff --git a/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java b/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
index 1db37e5..0e82f37 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
@@ -62,4 +62,9 @@
 
     public static final String REQUIRED_DRIVERS_PROPERTY_DELAY = "requiredDriversPropertyDelay";
     public static final int REQUIRED_DRIVERS_PROPERTY_DELAY_DEFAULT = 5;
+
+    public static final String UPSTREAM_ONU = "upstreamOnu";
+    public static final String UPSTREAM_OLT = "upstreamOlt";
+    public static final String DOWNSTREAM_ONU = "downstreamOnu";
+    public static final String DOWNSTREAM_OLT = "downstreamOlt";
 }
diff --git a/impl/src/main/java/org/opencord/olt/impl/SubscriberFlowInfo.java b/impl/src/main/java/org/opencord/olt/impl/SubscriberFlowInfo.java
index 444f45f..225e48f 100644
--- a/impl/src/main/java/org/opencord/olt/impl/SubscriberFlowInfo.java
+++ b/impl/src/main/java/org/opencord/olt/impl/SubscriberFlowInfo.java
@@ -32,30 +32,44 @@
     private final UniTagInformation tagInfo;
     private MeterId downId;
     private MeterId upId;
+    private MeterId downOltId;
+    private MeterId upOltId;
     private final String downBpInfo;
     private final String upBpInfo;
+    private final String downOltBpInfo;
+    private final String upOltBpInfo;
 
     /**
      * Builds the mapper of information.
-     * @param nniPort the nni port
-     * @param uniPort the uni port
-     * @param tagInfo the tag info
-     * @param downId the downstream meter id
-     * @param upId the upstream meter id
-     * @param downBpInfo the downstream bandwidth profile
-     * @param upBpInfo the upstream bandwidth profile
+     * @param nniPort       the nni port
+     * @param uniPort       the uni port
+     * @param tagInfo       the tag info
+     * @param downId        the downstream meter id
+     * @param upId          the upstream meter id
+     * @param downOltId     the downstream meter id of OLT device
+     * @param upOltId       the upstream meter id of OLT device
+     * @param downBpInfo    the downstream bandwidth profile
+     * @param upBpInfo      the upstream bandwidth profile
+     * @param downOltBpInfo the downstream bandwidth profile of OLT device
+     * @param upOltBpInfo   the upstream bandwidth profile of OLT device
      */
     SubscriberFlowInfo(AccessDevicePort nniPort, AccessDevicePort uniPort,
                        UniTagInformation tagInfo, MeterId downId, MeterId upId,
-                       String downBpInfo, String upBpInfo) {
+                       MeterId downOltId, MeterId upOltId,
+                       String downBpInfo, String upBpInfo,
+                       String downOltBpInfo, String upOltBpInfo) {
         this.devId = uniPort.deviceId();
         this.nniPort = nniPort;
         this.uniPort = uniPort;
         this.tagInfo = tagInfo;
         this.downId = downId;
         this.upId = upId;
+        this.downOltId = downOltId;
+        this.upOltId = upOltId;
         this.downBpInfo = downBpInfo;
         this.upBpInfo = upBpInfo;
+        this.downOltBpInfo = downOltBpInfo;
+        this.upOltBpInfo = upOltBpInfo;
     }
 
     /**
@@ -113,6 +127,24 @@
     }
 
     /**
+     * Gets the downstream meter id of this subscriber and flow information of OLT device.
+     *
+     * @return downstream olt meter id
+     */
+    MeterId getDownOltId() {
+        return downOltId;
+    }
+
+    /**
+     * Gets the upstream meter id of this subscriber and flow information of OLT device.
+     *
+     * @return upstream olt meter id
+     */
+    MeterId getUpOltId() {
+        return upOltId;
+    }
+
+    /**
      * Gets the downstream bandwidth profile of this subscriber and flow information.
      *
      * @return downstream bandwidth profile
@@ -131,7 +163,26 @@
     }
 
     /**
+     * Gets the downstream bandwidth profile of this subscriber and flow information of OLT device.
+     *
+     * @return downstream OLT bandwidth profile
+     */
+    String getDownOltBpInfo() {
+        return downOltBpInfo;
+    }
+
+    /**
+     * Gets the upstream bandwidth profile of this subscriber and flow information of OLT device.
+     *
+     * @return upstream OLT bandwidth profile.
+     */
+    String getUpOltBpInfo() {
+        return upOltBpInfo;
+    }
+
+    /**
      * Sets the upstream meter id.
+     *
      * @param upMeterId the upstream meter id
      */
     void setUpMeterId(MeterId upMeterId) {
@@ -140,12 +191,31 @@
 
     /**
      * Sets the downstream meter id.
+     *
      * @param downMeterId the downstream meter id
      */
     void setDownMeterId(MeterId downMeterId) {
         this.downId = downMeterId;
     }
 
+    /**
+     * Sets the upstream meter id of OLT.
+     *
+     * @param upOltMeterId the upstream meter id of OLT
+     */
+    void setUpOltMeterId(MeterId upOltMeterId) {
+        this.upOltId = upOltMeterId;
+    }
+
+    /**
+     * Sets the downstream meter id of OLT.
+     *
+     * @param downOltMeterId the downstream meter id of OLT
+     */
+    void setDownOltMeterId(MeterId downOltMeterId) {
+        this.downOltId = downOltMeterId;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -160,12 +230,14 @@
                 uniPort.equals(flowInfo.uniPort) &&
                 tagInfo.equals(flowInfo.tagInfo) &&
                 downBpInfo.equals(flowInfo.downBpInfo) &&
-                upBpInfo.equals(flowInfo.upBpInfo);
+                upBpInfo.equals(flowInfo.upBpInfo) &&
+                Objects.equals(downOltBpInfo, flowInfo.downOltBpInfo) &&
+                Objects.equals(upOltBpInfo, flowInfo.upOltBpInfo);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(devId, nniPort, uniPort, tagInfo, downBpInfo, upBpInfo);
+        return Objects.hash(devId, nniPort, uniPort, tagInfo, downBpInfo, upBpInfo, downOltBpInfo, upOltBpInfo);
     }
 
     @Override
@@ -177,8 +249,12 @@
                 .add("tagInfo", tagInfo)
                 .add("downId", downId)
                 .add("upId", upId)
+                .add("downOltId", downOltId)
+                .add("upOltId", upOltId)
                 .add("downBpInfo", downBpInfo)
                 .add("upBpInfo", upBpInfo)
+                .add("downOltBpInfo", downOltBpInfo)
+                .add("upOltBpInfo", upOltBpInfo)
                 .toString();
     }
 }
diff --git a/impl/src/main/java/org/opencord/olt/internalapi/AccessDeviceFlowService.java b/impl/src/main/java/org/opencord/olt/internalapi/AccessDeviceFlowService.java
index 0012e28..9e46782 100644
--- a/impl/src/main/java/org/opencord/olt/internalapi/AccessDeviceFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/internalapi/AccessDeviceFlowService.java
@@ -35,18 +35,22 @@
     /**
      * Provisions or removes trap-to-controller DHCP packets.
      *
-     * @param port            the uni port for which this trap flow is designated
-     * @param upstreamMeterId the upstream meter id that includes the upstream
-     *                        bandwidth profile values such as PIR,CIR. If no meter id needs to be referenced,
-     *                        null can be sent
-     * @param tagInformation  the uni tag (ctag, stag) information
-     * @param install         true to install the flow, false to remove the flow
-     * @param upstream        true if trapped packets are flowing upstream towards
-     *                        server, false if packets are flowing downstream towards client
-     * @param dhcpFuture      gets result of dhcp objective when complete
+     * @param port               the uni port for which this trap flow is designated
+     * @param upstreamMeterId    the upstream meter id that includes the upstream
+     *                           bandwidth profile values such as PIR,CIR. If no meter id needs to be referenced,
+     *                           null can be sent
+     * @param upstreamOltMeterId the upstream meter id of OLT device that includes the upstream
+     *                           bandwidth profile values such as PIR,CIR. If no meter id needs to be referenced,
+     *                           null can be sent
+     * @param tagInformation     the uni tag (ctag, stag) information
+     * @param install            true to install the flow, false to remove the flow
+     * @param upstream           true if trapped packets are flowing upstream towards
+     *                           server, false if packets are flowing downstream towards client
+     * @param dhcpFuture         gets result of dhcp objective when complete
      */
     void processDhcpFilteringObjectives(AccessDevicePort port,
                                         MeterId upstreamMeterId,
+                                        MeterId upstreamOltMeterId,
                                         UniTagInformation tagInformation,
                                         boolean install,
                                         boolean upstream,
@@ -55,14 +59,16 @@
     /**
      * Trap igmp packets to the controller.
      *
-     * @param port            Uni Port number
-     * @param upstreamMeterId upstream meter id that represents the upstream bandwidth profile
-     * @param tagInformation  the uni tag information of the subscriber
-     * @param install         the indicator to install or to remove the flow
-     * @param upstream        determines the direction of the flow
+     * @param port                  Uni Port number
+     * @param upstreamMeterId       upstream meter id that represents the upstream bandwidth profile
+     * @param upstreamOltMeterId    upstream meter id of OLT device that represents the upstream bandwidth profile
+     * @param tagInformation        the uni tag information of the subscriber
+     * @param install               the indicator to install or to remove the flow
+     * @param upstream              determines the direction of the flow
      */
     void processIgmpFilteringObjectives(AccessDevicePort port,
                                         MeterId upstreamMeterId,
+                                        MeterId upstreamOltMeterId,
                                         UniTagInformation tagInformation,
                                         boolean install,
                                         boolean upstream);
@@ -72,29 +78,39 @@
      *
      * @param port         the port for which this trap flow is designated
      * @param bpId         bandwidth profile id to add the related meter to the flow
+     * @param oltBpId      bandwidth profile id of OLT device to add the related meter to the flow
      * @param filterFuture completable future for this filtering objective operation
      * @param vlanId       the default or customer tag for a subscriber
      * @param install      true to install the flow, false to remove the flow
      */
-    void processEapolFilteringObjectives(AccessDevicePort port, String bpId,
+    void processEapolFilteringObjectives(AccessDevicePort port,
+                                         String bpId,
+                                         Optional<String> oltBpId,
                                          CompletableFuture<ObjectiveError> filterFuture,
-                                         VlanId vlanId, boolean install);
+                                         VlanId vlanId,
+                                         boolean install);
 
     /**
      * Trap PPPoE discovery packets to the controller.
      *
-     * @param port            the uni port for which this trap flow is designated
-     * @param upstreamMeterId the upstream meter id that includes the upstream
-     *                        bandwidth profile values such as PIR,CIR. If no meter id needs to be referenced,
-     *                        null can be sent
-     * @param tagInformation  the uni tag (ctag, stag) information
-     * @param install         true to install the flow, false to remove the flow
-     * @param upstream        true if trapped packets are flowing upstream towards
-     *                        server, false if packets are flowing downstream towards client
+     * @param port               the uni port for which this trap flow is designated
+     * @param upstreamMeterId    the upstream meter id that includes the upstream
+     *                           bandwidth profile values such as PIR,CIR. If no meter id needs to be referenced,
+     *                           null can be sent
+     * @param upstreamOltMeterId the upstream meter id of OLT device that includes the upstream
+     *                           bandwidth profile values such as PIR,CIR. If no meter id needs to be referenced,
+     *                           null can be sent
+     * @param tagInformation     the uni tag (ctag, stag) information
+     * @param install            true to install the flow, false to remove the flow
+     * @param upstream           true if trapped packets are flowing upstream towards
+     *                           server, false if packets are flowing downstream towards client
      **/
     void processPPPoEDFilteringObjectives(AccessDevicePort port,
-                                          MeterId upstreamMeterId, UniTagInformation tagInformation,
-                                          boolean install, boolean upstream);
+                                          MeterId upstreamMeterId,
+                                          MeterId upstreamOltMeterId,
+                                          UniTagInformation tagInformation,
+                                          boolean install,
+                                          boolean upstream);
 
     /**
      * Trap lldp packets to the controller.
@@ -135,31 +151,35 @@
      * Creates a ForwardingObjective builder for the upstream flows.
      * The treatment will contain push action
      *
-     * @param uplinkPort the nni port
-     * @param subscriberPort the uni port
-     * @param upstreamMeterId the meter id that is assigned to upstream flows
-     * @param uniTagInformation the uni tag information
+     * @param uplinkPort         the nni port
+     * @param subscriberPort     the uni port
+     * @param upstreamMeterId    the meter id that is assigned to upstream flows
+     * @param upstreamOltMeterId the meter id that is assigned to upstream flows for OLT device
+     * @param uniTagInformation  the uni tag information
      * @return ForwardingObjective.Builder
      */
     ForwardingObjective.Builder createUpBuilder(AccessDevicePort uplinkPort,
                                                 AccessDevicePort subscriberPort,
                                                 MeterId upstreamMeterId,
+                                                MeterId upstreamOltMeterId,
                                                 UniTagInformation uniTagInformation);
 
     /**
      * Creates a ForwardingObjective builder for the downstream flows.
      * The treatment will contain pop action
      *
-     * @param uplinkPort the nni port
-     * @param subscriberPort the uni port
-     * @param downstreamMeterId the meter id that is assigned to downstream flows
-     * @param tagInformation the uni tag information
-     * @param macAddress the mac address
+     * @param uplinkPort           the nni port
+     * @param subscriberPort       the uni port
+     * @param downstreamMeterId    the meter id that is assigned to downstream flows
+     * @param downstreamOltMeterId the meter id that is assigned to downstream flows
+     * @param tagInformation       the uni tag information
+     * @param macAddress           the mac address
      * @return ForwardingObjective.Builder
      */
     ForwardingObjective.Builder createDownBuilder(AccessDevicePort uplinkPort,
                                                   AccessDevicePort subscriberPort,
                                                   MeterId downstreamMeterId,
+                                                  MeterId downstreamOltMeterId,
                                                   UniTagInformation tagInformation,
                                                   Optional<MacAddress> macAddress);
 
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltFlowTest.java b/impl/src/test/java/org/opencord/olt/impl/OltFlowTest.java
index 7ae3c8c..752e44d 100644
--- a/impl/src/test/java/org/opencord/olt/impl/OltFlowTest.java
+++ b/impl/src/test/java/org/opencord/olt/impl/OltFlowTest.java
@@ -139,55 +139,55 @@
         oltFlowService.flowObjectiveService.clearQueue();
         // ensure upstream dhcp traps can be added and removed
         oltFlowService.processDhcpFilteringObjectives(uniPort1,
-                usMeterId, uniTagInfo,
+                usMeterId, null, uniTagInfo,
                 true, true, Optional.empty());
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 1;
         oltFlowService.processDhcpFilteringObjectives(uniPort1,
-                usMeterId, uniTagInfo,
+                usMeterId, null, uniTagInfo,
                 false, true, Optional.empty());
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 2;
 
         // Ensure upstream flow has no pcp unless properly specified.
         oltFlowService.processDhcpFilteringObjectives(uniPort2,
-                usMeterId, uniTagInfoNoPcp,
+                usMeterId, null, uniTagInfoNoPcp,
                 true, true, Optional.empty());
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 3;
 
         // ensure upstream flows are not added if uniTagInfo is missing dhcp requirement
         oltFlowService.processDhcpFilteringObjectives(uniPort1,
-                usMeterId, uniTagInfoNoDhcpNoIgmp,
+                usMeterId, null, uniTagInfoNoDhcpNoIgmp,
                 true, true, Optional.empty());
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 3;
 
         // ensure downstream traps don't succeed without global config for nni ports
         oltFlowService.processDhcpFilteringObjectives(nniPort,
-                null, null,
+                null, null, null,
                 true, false, Optional.empty());
         oltFlowService.processDhcpFilteringObjectives(nniPort,
-                null, null,
+                null, null, null,
                 false, false, Optional.empty());
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 3;
         // do global config for nni ports and now it should succeed
         oltFlowService.enableDhcpOnNni = true;
         oltFlowService.processDhcpFilteringObjectives(nniPort,
-                null, null,
+                null, null, null,
                 true, false, Optional.empty());
         oltFlowService.processDhcpFilteringObjectives(nniPort,
-                null, null,
+                null, null, null,
                 false, false, Optional.empty());
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 5;
 
         // turn on DHCPv6 and we should get 2 flows
         oltFlowService.enableDhcpV6 = true;
         oltFlowService.processDhcpFilteringObjectives(uniPort1,
-                usMeterId, uniTagInfo,
+                usMeterId, null, uniTagInfo,
                 true, true, Optional.empty());
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 7;
 
         // turn off DHCPv4 and it's only v6
         oltFlowService.enableDhcpV4 = false;
         oltFlowService.processDhcpFilteringObjectives(uniPort1,
-                usMeterId, uniTagInfo,
+                usMeterId, null, uniTagInfo,
                 true, true, Optional.empty());
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 8;
 
@@ -204,28 +204,28 @@
         // ensure pppoed traps are not added if global config is off.
         oltFlowService.enablePppoe = false;
         oltFlowService.processPPPoEDFilteringObjectives(uniPort1,
-                usMeterId, uniTagInfo,
+                usMeterId, null, uniTagInfo,
                 true, true);
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 0;
 
         // ensure upstream pppoed traps can be added and removed
         oltFlowService.enablePppoe = true;
         oltFlowService.processPPPoEDFilteringObjectives(uniPort1,
-                usMeterId, uniTagInfo,
+                usMeterId, null, uniTagInfo,
                 true, true);
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 1;
         oltFlowService.processPPPoEDFilteringObjectives(uniPort1,
-                usMeterId, uniTagInfo,
+                usMeterId, null, uniTagInfo,
                 false, true);
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 2;
 
         // ensure downstream pppoed traps can be added and removed
         oltFlowService.processPPPoEDFilteringObjectives(nniPort,
-                null, null,
+                null, null, null,
                 true, false);
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 3;
         oltFlowService.processPPPoEDFilteringObjectives(nniPort,
-                null, null,
+                null, null, null,
                 false, false);
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 4;
 
@@ -239,30 +239,30 @@
 
         // ensure igmp flows can be added and removed
         oltFlowService.processIgmpFilteringObjectives(uniPort1,
-                usMeterId, uniTagInfo,
+                usMeterId, null, uniTagInfo,
                 true, true);
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 1;
         oltFlowService.processIgmpFilteringObjectives(uniPort1, usMeterId,
-                uniTagInfo,
+                null, uniTagInfo,
                 false, true);
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 2;
 
         // ensure igmp flow is not added if uniTag has no igmp requirement
         oltFlowService.processIgmpFilteringObjectives(uniPort1,
-                usMeterId, uniTagInfoNoDhcpNoIgmp,
+                usMeterId, null, uniTagInfoNoDhcpNoIgmp,
                 true, true);
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 2;
 
         //ensure igmp flow on NNI fails without global setting
         oltFlowService.processIgmpFilteringObjectives(nniPort,
-                null, null,
+                null, null, null,
                 true, false);
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 2;
 
         // igmp trap on NNI should succeed with global config
         oltFlowService.enableIgmpOnNni = true;
         oltFlowService.processIgmpFilteringObjectives(nniPort,
-                null, null,
+                null, null, null,
                 true, false);
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 3;
         // cleanup
@@ -277,12 +277,12 @@
 
         //will install
         oltFlowService.processEapolFilteringObjectives(uniPort1,
-                uniTagInfo.getUpstreamBandwidthProfile(), new CompletableFuture<>(),
+                uniTagInfo.getUpstreamBandwidthProfile(), Optional.empty(), new CompletableFuture<>(),
                 uniTagInfo.getUniTagMatch(), true);
 
         //bp profile doesn't exist
         oltFlowService.processEapolFilteringObjectives(uniPort1,
-                uniTagInfo.getDownstreamBandwidthProfile(), new CompletableFuture<>(),
+                uniTagInfo.getDownstreamBandwidthProfile(), Optional.empty(), new CompletableFuture<>(),
                 uniTagInfo.getUniTagMatch(), true);
     }
 
@@ -309,14 +309,14 @@
     @Test
     public void testUpBuilder() {
         ForwardingObjective objective =
-                oltFlowService.createUpBuilder(nniPort, uniPort1, usMeterId, uniTagInfo).add();
+                oltFlowService.createUpBuilder(nniPort, uniPort1, usMeterId, usMeterId, uniTagInfo).add();
         checkObjective(objective, true);
     }
 
     @Test
     public void testDownBuilder() {
         ForwardingObjective objective =
-                oltFlowService.createDownBuilder(nniPort, uniPort1, dsMeterId, uniTagInfo,
+                oltFlowService.createDownBuilder(nniPort, uniPort1, dsMeterId, dsMeterId, uniTagInfo,
                         Optional.of(macAddress)).remove();
         checkObjective(objective, false);
     }
diff --git a/impl/src/test/java/org/opencord/olt/impl/TestBase.java b/impl/src/test/java/org/opencord/olt/impl/TestBase.java
index 47507fb..87db5d1 100644
--- a/impl/src/test/java/org/opencord/olt/impl/TestBase.java
+++ b/impl/src/test/java/org/opencord/olt/impl/TestBase.java
@@ -54,6 +54,8 @@
     protected static final DeviceId DEVICE_ID_1 = DeviceId.deviceId(OLT_DEV_ID);
     protected MeterId usMeterId = MeterId.meterId(1);
     protected MeterId dsMeterId = MeterId.meterId(2);
+    protected MeterId usOltMeterId = MeterId.meterId(3);
+    protected MeterId dsOltMeterId = MeterId.meterId(4);
     protected String usBpId = "HSIA-US";
     protected String dsBpId = "HSIA-DS";
     protected DefaultApplicationId appId = new DefaultApplicationId(1, "OltServices");