[VOL-3113] Integrate Mac Learning App for Dynamic MAC Learning

Change-Id: If589ec038700657eca13f8d3d299feedcdc05c44
diff --git a/impl/src/main/java/org/opencord/olt/impl/Olt.java b/impl/src/main/java/org/opencord/olt/impl/Olt.java
index b95bc08..f822ae3 100644
--- a/impl/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/impl/src/main/java/org/opencord/olt/impl/Olt.java
@@ -17,6 +17,7 @@
 
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Sets;
+import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.cfg.ComponentConfigService;
@@ -34,6 +35,7 @@
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.Device;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.Host;
 import org.onosproject.net.Port;
 import org.onosproject.net.PortNumber;
 import org.onosproject.net.device.DeviceEvent;
@@ -45,6 +47,9 @@
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.host.HostEvent;
+import org.onosproject.net.host.HostListener;
+import org.onosproject.net.host.HostService;
 import org.onosproject.net.meter.MeterId;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.ConsistentMultimap;
@@ -162,6 +167,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected ComponentConfigService componentConfigService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected HostService hostService;
+
     /**
      * Default bandwidth profile id that is used for authentication trap flows.
      **/
@@ -184,6 +192,7 @@
 
     private final DeviceListener deviceListener = new InternalDeviceListener();
     private final ClusterEventListener clusterListener = new InternalClusterListener();
+    private final HostListener hostListener = new InternalHostListener();
 
     private ConsistentHasher hasher;
 
@@ -195,6 +204,7 @@
                                                                                         "olt-installer-%d"));
 
     protected ExecutorService eventExecutor;
+    protected ExecutorService hostEventExecutor;
     protected ExecutorService retryExecutor;
     protected ScheduledExecutorService provisionExecutor;
 
@@ -202,11 +212,13 @@
     private ConsistentMultimap<ConnectPoint, UniTagInformation> failedSubs;
 
     protected Map<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingSubscribersForDevice;
+    private ConsistentMultimap<ConnectPoint, SubscriberFlowInfo> waitingMacSubscribers;
 
     @Activate
     public void activate(ComponentContext context) {
         eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
                                                                         "events-%d", log));
+        hostEventExecutor = Executors.newFixedThreadPool(8, groupedThreads("onos/olt", "mac-events-%d", log));
         retryExecutor = Executors.newCachedThreadPool();
         provisionExecutor = Executors.newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
                 "provision-%d", log));
@@ -234,6 +246,18 @@
                 .withApplicationId(appId)
                 .build();
 
+        KryoNamespace macSerializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(SubscriberFlowInfo.class)
+                .register(UniTagInformation.class)
+                .build();
+
+        waitingMacSubscribers = storageService.<ConnectPoint, SubscriberFlowInfo>consistentMultimapBuilder()
+                .withName("volt-waiting-mac-subs")
+                .withSerializer(Serializer.using(macSerializer))
+                .withApplicationId(appId)
+                .build();
+
         pendingSubscribersForDevice = storageService.<DeviceId, BlockingQueue<SubscriberFlowInfo>>consistentMapBuilder()
                 .withName("volt-pending-subs")
                 .withSerializer(Serializer.using(serializer))
@@ -265,6 +289,7 @@
         }
 
         deviceService.addListener(deviceListener);
+        hostService.addListener(hostListener);
         log.info("Started with Application ID {}", appId.id());
     }
 
@@ -273,8 +298,10 @@
         componentConfigService.unregisterProperties(getClass(), false);
         clusterService.removeListener(clusterListener);
         deviceService.removeListener(deviceListener);
+        hostService.removeListener(hostListener);
         eventDispatcher.removeSink(AccessDeviceEvent.class);
         eventExecutor.shutdown();
+        hostEventExecutor.shutdown();
         retryExecutor.shutdown();
         provisionExecutor.shutdown();
         log.info("Stopped");
@@ -395,7 +422,7 @@
 
         @Override
         public void run() {
-            CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture();
+            CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture<>();
             oltFlowService.processEapolFilteringObjectives(cp.deviceId(), cp.port(),
                                                      defaultBpId, filterFuture,
                                                      VlanId.vlanId(EAPOL_DEFAULT_VLAN),
@@ -507,7 +534,7 @@
             //delete Eapol authentication flow with default bandwidth
             //wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
             //install subscriber flows
-            CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture();
+            CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture<>();
             oltFlowService.processEapolFilteringObjectives(subsPort.deviceId(), subsPort.port(), defaultBpId,
                                                            filterFuture, VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
             filterFuture.thenAcceptAsync(filterStatus -> {
@@ -640,8 +667,8 @@
 
         log.info("Unprovisioning vlans for {} at {}/{}", uniTag, deviceId, subscriberPort);
 
-        CompletableFuture<ObjectiveError> downFuture = new CompletableFuture();
-        CompletableFuture<ObjectiveError> upFuture = new CompletableFuture();
+        CompletableFuture<ObjectiveError> downFuture = new CompletableFuture<>();
+        CompletableFuture<ObjectiveError> upFuture = new CompletableFuture<>();
 
         VlanId deviceVlan = uniTag.getPonSTag();
         VlanId subscriberVlan = uniTag.getPonCTag();
@@ -651,15 +678,49 @@
         MeterId downstreamMeterId = oltMeterService
                 .getMeterIdFromBpMapping(deviceId, uniTag.getDownstreamBandwidthProfile());
 
+        Optional<SubscriberFlowInfo> waitingMacSubFlowInfo =
+                getAndRemoveWaitingMacSubFlowInfoForCTag(new ConnectPoint(deviceId, subscriberPort), subscriberVlan);
+        if (waitingMacSubFlowInfo.isPresent()) {
+            // only dhcp objectives applied previously, so only dhcp uninstallation objective will be processed
+            log.debug("Waiting MAC service removed and dhcp uninstallation objective will be processed. " +
+                    "waitingMacSubFlowInfo:{}", waitingMacSubFlowInfo.get());
+            CompletableFuture<ObjectiveError> dhcpFuture = new CompletableFuture<>();
+            oltFlowService.processDhcpFilteringObjectives(deviceId, subscriberPort,
+                    upstreamMeterId, uniTag, false, true, Optional.of(dhcpFuture));
+            dhcpFuture.thenAcceptAsync(dhcpStatus -> {
+                AccessDeviceEvent.Type type;
+                if (dhcpStatus == null) {
+                    type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTERED;
+                    log.debug("Dhcp uninstallation objective was processed successfully for cTag {}, sTag {}, " +
+                                    "tpId {} and Device/Port:{}", uniTag.getPonCTag(), uniTag.getPonSTag(),
+                            uniTag.getTechnologyProfileId(), subscriberPort);
+                    updateProgrammedSubscriber(new ConnectPoint(deviceId, subscriberPort), uniTag, false);
+                } else {
+                    type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_UNREGISTRATION_FAILED;
+                    log.error("Dhcp uninstallation objective was failed for cTag {}, sTag {}, " +
+                                    "tpId {} and Device/Port:{} :{}", uniTag.getPonCTag(), uniTag.getPonSTag(),
+                            uniTag.getTechnologyProfileId(), subscriberPort, dhcpStatus);
+                }
+                post(new AccessDeviceEvent(type, deviceId, deviceService.getPort(deviceId, subscriberPort),
+                        deviceVlan, subscriberVlan, uniTag.getTechnologyProfileId()));
+            });
+            return;
+        } else {
+            log.debug("There is no waiting MAC service for dev/port: {}/{} and subscriberVlan: {}",
+                    deviceId, subscriberPort, subscriberVlan);
+        }
+
         ForwardingObjective.Builder upFwd =
                 oltFlowService.createUpBuilder(uplink, subscriberPort, upstreamMeterId, uniTag);
+
+        Optional<MacAddress> macAddress = getMacAddress(deviceId, subscriberPort, uniTag);
         ForwardingObjective.Builder downFwd =
-                oltFlowService.createDownBuilder(uplink, subscriberPort, downstreamMeterId, uniTag);
+                oltFlowService.createDownBuilder(uplink, subscriberPort, downstreamMeterId, uniTag, macAddress);
 
         oltFlowService.processIgmpFilteringObjectives(deviceId, subscriberPort,
                                                       upstreamMeterId, uniTag, false, true);
         oltFlowService.processDhcpFilteringObjectives(deviceId, subscriberPort,
-                                                      upstreamMeterId, uniTag, false, true);
+                                                      upstreamMeterId, uniTag, false, true, Optional.empty());
         oltFlowService.processPPPoEDFilteringObjectives(deviceId, subscriberPort,
                                                         upstreamMeterId, uniTag, false, true);
 
@@ -711,6 +772,22 @@
         }, oltInstallers);
     }
 
+    private Optional<SubscriberFlowInfo> getAndRemoveWaitingMacSubFlowInfoForCTag(ConnectPoint cp, VlanId cTag) {
+        SubscriberFlowInfo returnSubFlowInfo = null;
+        Collection<? extends SubscriberFlowInfo> subFlowInfoSet = waitingMacSubscribers.get(cp).value();
+        for (SubscriberFlowInfo subFlowInfo : subFlowInfoSet) {
+            if (subFlowInfo.getTagInfo().getPonCTag().equals(cTag)) {
+                returnSubFlowInfo = subFlowInfo;
+                break;
+            }
+        }
+        if (returnSubFlowInfo != null) {
+            waitingMacSubscribers.remove(cp, returnSubFlowInfo);
+            return Optional.of(returnSubFlowInfo);
+        }
+        return Optional.empty();
+    }
+
     /**
      * Adds subscriber vlan flows, dhcp, eapol and igmp trap flows for the related uni port.
      *
@@ -932,6 +1009,53 @@
      * @param subscriberFlowInfo relevant information for subscriber
      */
     private void handleSubFlowsWithMeters(SubscriberFlowInfo subscriberFlowInfo) {
+        log.debug("Provisioning subscriber flows based on {}", subscriberFlowInfo);
+        UniTagInformation tagInfo = subscriberFlowInfo.getTagInfo();
+        if (tagInfo.getIsDhcpRequired()) {
+            Optional<MacAddress> macAddress =
+                    getMacAddress(subscriberFlowInfo.getDevId(), subscriberFlowInfo.getUniPort(), tagInfo);
+            if (subscriberFlowInfo.getTagInfo().getEnableMacLearning()) {
+                ConnectPoint cp = new ConnectPoint(subscriberFlowInfo.getDevId(), subscriberFlowInfo.getUniPort());
+                if (macAddress.isPresent()) {
+                    log.debug("MAC Address {} obtained for {}", macAddress.get(), subscriberFlowInfo);
+                } else {
+                    waitingMacSubscribers.put(cp, subscriberFlowInfo);
+                    log.debug("Adding sub to waiting mac map: {}", subscriberFlowInfo);
+                }
+
+                CompletableFuture<ObjectiveError> dhcpFuture = new CompletableFuture<>();
+                oltFlowService.processDhcpFilteringObjectives(subscriberFlowInfo.getDevId(),
+                        subscriberFlowInfo.getUniPort(), subscriberFlowInfo.getUpId(),
+                        tagInfo, true, true, Optional.of(dhcpFuture));
+                dhcpFuture.thenAcceptAsync(dhcpStatus -> {
+                    if (dhcpStatus != null) {
+                        log.error("Dhcp Objective failed for {}: {}", subscriberFlowInfo, dhcpStatus);
+                        if (macAddress.isEmpty()) {
+                            waitingMacSubscribers.remove(cp, subscriberFlowInfo);
+                        }
+                        post(new AccessDeviceEvent(AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED,
+                                subscriberFlowInfo.getDevId(),
+                                deviceService.getPort(subscriberFlowInfo.getDevId(), subscriberFlowInfo.getUniPort()),
+                                tagInfo.getPonSTag(), tagInfo.getPonCTag(), tagInfo.getTechnologyProfileId()));
+                    } else {
+                        log.debug("Dhcp Objective success for: {}", subscriberFlowInfo);
+                        if (macAddress.isPresent()) {
+                            continueProvisioningSubs(subscriberFlowInfo, macAddress);
+                        }
+                    }
+                });
+            } else {
+                log.debug("Dynamic MAC Learning disabled, so will not learn for: {}", subscriberFlowInfo);
+                // dhcp flows will handle after data plane flows
+                continueProvisioningSubs(subscriberFlowInfo, macAddress);
+            }
+        } else {
+            // dhcp not required for this service
+            continueProvisioningSubs(subscriberFlowInfo, Optional.empty());
+        }
+    }
+
+    private void continueProvisioningSubs(SubscriberFlowInfo subscriberFlowInfo, Optional<MacAddress> macAddress) {
         log.debug("Provisioning subscriber flows on {}/{} based on {}",
                   subscriberFlowInfo.getDevId(), subscriberFlowInfo.getUniPort(), subscriberFlowInfo);
         UniTagInformation tagInfo = subscriberFlowInfo.getTagInfo();
@@ -957,7 +1081,7 @@
 
         ForwardingObjective.Builder downFwd =
                 oltFlowService.createDownBuilder(subscriberFlowInfo.getNniPort(), subscriberFlowInfo.getUniPort(),
-                                                 subscriberFlowInfo.getDownId(), subscriberFlowInfo.getTagInfo());
+                        subscriberFlowInfo.getDownId(), subscriberFlowInfo.getTagInfo(), macAddress);
         flowObjectiveService.forward(subscriberFlowInfo.getDevId(), downFwd.add(new ObjectiveContext() {
             @Override
             public void onSuccess(Objective objective) {
@@ -980,7 +1104,7 @@
                           subscriberFlowInfo.getUniPort(), downStatus);
                 type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED;
             } else if (upStatus != null) {
-                log.error("Flow with innervlan {} and outerVlan {} on {}/{} failed downstream installation: {}",
+                log.error("Flow with innervlan {} and outerVlan {} on {}/{} failed upstream installation: {}",
                           tagInfo.getPonCTag(), tagInfo.getPonSTag(), subscriberFlowInfo.getDevId(),
                           subscriberFlowInfo.getUniPort(), upStatus);
                 type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED;
@@ -991,10 +1115,14 @@
                                                                subscriberFlowInfo.getUniPort(),
                                                                tagInfo.getUpstreamBandwidthProfile(),
                                                                null, tagInfo.getPonCTag(), true);
-                oltFlowService.processDhcpFilteringObjectives(subscriberFlowInfo.getDevId(),
-                                                              subscriberFlowInfo.getUniPort(),
-                                                              subscriberFlowInfo.getUpId(),
-                                                              tagInfo, true, true);
+
+
+                if (!tagInfo.getEnableMacLearning()) {
+                    oltFlowService.processDhcpFilteringObjectives(subscriberFlowInfo.getDevId(),
+                                                                  subscriberFlowInfo.getUniPort(),
+                                                                  subscriberFlowInfo.getUpId(),
+                                                                  tagInfo, true, true, Optional.empty());
+                }
 
                 oltFlowService.processIgmpFilteringObjectives(subscriberFlowInfo.getDevId(),
                                                               subscriberFlowInfo.getUniPort(),
@@ -1018,6 +1146,40 @@
     }
 
     /**
+     * Gets mac address from tag info if present, else checks the host service.
+     *
+     * @param deviceId device ID
+     * @param portNumber uni port
+     * @param tagInformation tag info
+     * @return MAC Address of subscriber
+     */
+    private Optional<MacAddress> getMacAddress(DeviceId deviceId, PortNumber portNumber,
+                                               UniTagInformation tagInformation) {
+        if (isMacAddressValid(tagInformation)) {
+            log.debug("Got MAC Address {} from the uniTagInformation for dev/port {}/{} and cTag {}",
+                    tagInformation.getConfiguredMacAddress(), deviceId, portNumber, tagInformation.getPonCTag());
+            return Optional.of(MacAddress.valueOf(tagInformation.getConfiguredMacAddress()));
+        } else if (tagInformation.getEnableMacLearning()) {
+            Optional<Host> optHost = hostService.getConnectedHosts(new ConnectPoint(deviceId, portNumber))
+                    .stream().filter(host -> host.vlan().equals(tagInformation.getPonCTag())).findFirst();
+            if (optHost.isPresent()) {
+                log.debug("Got MAC Address {} from the hostService for dev/port {}/{} and cTag {}",
+                        optHost.get().mac(), deviceId, portNumber, tagInformation.getPonCTag());
+                return Optional.of(optHost.get().mac());
+            }
+        }
+        log.debug("Could not obtain MAC Address for dev/port {}/{} and cTag {}", deviceId, portNumber,
+                tagInformation.getPonCTag());
+        return Optional.empty();
+    }
+
+    private boolean isMacAddressValid(UniTagInformation tagInformation) {
+        return tagInformation.getConfiguredMacAddress() != null &&
+                !tagInformation.getConfiguredMacAddress().trim().equals("") &&
+                !MacAddress.NONE.equals(MacAddress.valueOf(tagInformation.getConfiguredMacAddress()));
+    }
+
+    /**
      * Checks the subscriber uni tag list and find the uni tag information.
      * using the pon c tag, pon s tag and the technology profile id
      * May return Optional<null>
@@ -1206,6 +1368,44 @@
         return false;
     }
 
+    private class InternalHostListener implements HostListener {
+        @Override
+        public void event(HostEvent event) {
+            hostEventExecutor.execute(() -> {
+                Host host = event.subject();
+                switch (event.type()) {
+                    case HOST_ADDED:
+                        ConnectPoint cp = new ConnectPoint(host.location().deviceId(), host.location().port());
+                        Optional<SubscriberFlowInfo> optSubFlowInfo =
+                                getAndRemoveWaitingMacSubFlowInfoForCTag(cp, host.vlan());
+                        if (optSubFlowInfo.isPresent()) {
+                            log.debug("Continuing provisioning for waiting mac service. event: {}", event);
+                            continueProvisioningSubs(optSubFlowInfo.get(), Optional.of(host.mac()));
+                        } else {
+                            log.debug("There is no waiting mac sub. event: {}", event);
+                        }
+                        break;
+                    case HOST_UPDATED:
+                        if (event.prevSubject() != null && !event.prevSubject().mac().equals(event.subject().mac())) {
+                            log.debug("Subscriber's MAC address changed. devId/port: {}/{} vlan: {}",
+                                    host.location().deviceId(), host.location().port(), host.vlan());
+                            // TODO handle subscriber MAC Address changed
+                        } else {
+                            log.debug("Unhandled HOST_UPDATED event: {}", event);
+                        }
+                        break;
+                    default:
+                        log.debug("Unhandled host event received. event: {}", event);
+                }
+            });
+        }
+
+        @Override
+        public boolean isRelevant(HostEvent event) {
+            return isLocalLeader(event.subject().location().deviceId());
+        }
+    }
+
     private class InternalDeviceListener implements DeviceListener {
         private Set<DeviceId> programmedDevices = Sets.newConcurrentHashSet();
 
@@ -1381,6 +1581,7 @@
         private void handleDeviceDisconnection(Device device, boolean sendDisconnectedEvent, boolean sendUniEvent) {
             programmedDevices.remove(device.id());
             removeAllSubscribers(device.id());
+            removeWaitingMacSubs(device.id());
             //Handle case where OLT disconnects during subscriber provisioning
             pendingSubscribersForDevice.remove(device.id());
             oltFlowService.clearDeviceState(device.id());
@@ -1417,6 +1618,14 @@
             subs.forEach(e -> programmedSubs.remove(e.getKey(), e.getValue()));
         }
 
+        private void removeWaitingMacSubs(DeviceId deviceId) {
+            List<ConnectPoint> waitingMacKeys = waitingMacSubscribers.stream()
+                    .filter(cp -> cp.getKey().deviceId().equals(deviceId))
+                    .map(Map.Entry::getKey)
+                    .collect(toList());
+            waitingMacKeys.forEach(cp -> waitingMacSubscribers.removeAll(cp));
+        }
+
     }
 
     private class InternalClusterListener implements ClusterEventListener {
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index cf5fe14..caeab28 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -68,6 +68,7 @@
 import org.slf4j.Logger;
 
 import java.util.Dictionary;
+import java.util.Optional;
 import java.util.Map;
 import java.util.Properties;
 import java.util.concurrent.BlockingQueue;
@@ -271,13 +272,15 @@
                                                MeterId upstreamMeterId,
                                                UniTagInformation tagInformation,
                                                boolean install,
-                                               boolean upstream) {
+                                               boolean upstream,
+                                               Optional<CompletableFuture<ObjectiveError>> dhcpFuture) {
         if (upstream) {
             // for UNI ports
             if (tagInformation != null && !tagInformation.getIsDhcpRequired()) {
                 log.debug("Dhcp provisioning is disabled for UNI port {} on "
                         + "device {} for service {}", port, devId,
                         tagInformation.getServiceName());
+                dhcpFuture.ifPresent(f -> f.complete(null));
                 return;
             }
         } else {
@@ -285,6 +288,7 @@
             if (!enableDhcpOnNni) {
                 log.debug("Dhcp provisioning is disabled for NNI port {} on "
                         + "device {}", port, devId);
+                dhcpFuture.ifPresent(f -> f.complete(null));
                 return;
             }
         }
@@ -304,7 +308,7 @@
 
             addDhcpFilteringObjectives(devId, port, udpSrc, udpDst, ethType,
                     upstreamMeterId, techProfileId, protocol, cTag, unitagMatch,
-                                       vlanPcp, upstream, install);
+                                       vlanPcp, upstream, install, dhcpFuture);
         }
 
         if (enableDhcpV6) {
@@ -316,7 +320,7 @@
 
             addDhcpFilteringObjectives(devId, port, udpSrc, udpDst, ethType,
                     upstreamMeterId, techProfileId, protocol, cTag, unitagMatch,
-                                       vlanPcp, upstream, install);
+                                       vlanPcp, upstream, install, dhcpFuture);
         }
     }
 
@@ -324,7 +328,7 @@
                                             EthType ethType, MeterId upstreamMeterId, int techProfileId, byte protocol,
                                             VlanId cTag, VlanId unitagMatch,
                                             Byte vlanPcp, boolean upstream,
-                                            boolean install) {
+                                            boolean install, Optional<CompletableFuture<ObjectiveError>> dhcpFuture) {
 
         DefaultFilteringObjective.Builder builder = DefaultFilteringObjective.builder();
         TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
@@ -367,6 +371,7 @@
                 log.info("DHCP {} filter for dev/port {}/{} {}.",
                         (ethType.equals(EthType.EtherType.IPV4.ethType())) ? V4 : V6,
                         devId, port, (install) ? INSTALLED : REMOVED);
+                dhcpFuture.ifPresent(f -> f.complete(null));
             }
 
             @Override
@@ -375,6 +380,7 @@
                         (ethType.equals(EthType.EtherType.IPV4.ethType())) ? V4 : V6,
                         devId, port, (install) ? INSTALLATION : REMOVAL,
                         error);
+                dhcpFuture.ifPresent(f -> f.complete(error));
             }
         });
         flowObjectiveService.filter(devId, dhcpUpstream);
@@ -710,7 +716,7 @@
         log.info("{} flows for NNI port {} on device {}",
                  install ? "Adding" : "Removing", port, devId);
         processLldpFilteringObjective(devId, port, install);
-        processDhcpFilteringObjectives(devId, port, null, null, install, false);
+        processDhcpFilteringObjectives(devId, port, null, null, install, false, Optional.empty());
         processIgmpFilteringObjectives(devId, port, null, null, install, false);
         processPPPoEDFilteringObjectives(devId, port, null, null, install, false);
     }
@@ -816,7 +822,8 @@
     public ForwardingObjective.Builder createDownBuilder(PortNumber uplinkPort,
                                                          PortNumber subscriberPort,
                                                          MeterId downstreamMeterId,
-                                                         UniTagInformation tagInformation) {
+                                                         UniTagInformation tagInformation,
+                                                         Optional<MacAddress> macAddress) {
 
         //subscriberVlan can be any valid Vlan here including ANY to make sure the packet is tagged
         TrafficSelector.Builder selectorBuilder = DefaultTrafficSelector.builder()
@@ -833,11 +840,7 @@
             selectorBuilder.matchVlanPcp((byte) tagInformation.getDsPonSTagPriority());
         }
 
-        if (tagInformation.getConfiguredMacAddress() != null &&
-                !tagInformation.getConfiguredMacAddress().equals("") &&
-                !MacAddress.NONE.equals(MacAddress.valueOf(tagInformation.getConfiguredMacAddress()))) {
-            selectorBuilder.matchEthDst(MacAddress.valueOf(tagInformation.getConfiguredMacAddress()));
-        }
+        macAddress.ifPresent(selectorBuilder::matchEthDst);
 
         TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder()
                 .popVlan()
diff --git a/impl/src/main/java/org/opencord/olt/internalapi/AccessDeviceFlowService.java b/impl/src/main/java/org/opencord/olt/internalapi/AccessDeviceFlowService.java
index 7de5b1a..e3a612c 100644
--- a/impl/src/main/java/org/opencord/olt/internalapi/AccessDeviceFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/internalapi/AccessDeviceFlowService.java
@@ -15,6 +15,7 @@
  */
 package org.opencord.olt.internalapi;
 
+import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.PortNumber;
@@ -23,6 +24,7 @@
 import org.onosproject.net.meter.MeterId;
 import org.opencord.sadis.UniTagInformation;
 
+import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -42,12 +44,14 @@
      * @param install         true to install the flow, false to remove the flow
      * @param upstream        true if trapped packets are flowing upstream towards
      *                        server, false if packets are flowing downstream towards client
+     * @param dhcpFuture      gets result of dhcp objective when complete
      */
     void processDhcpFilteringObjectives(DeviceId devId, PortNumber port,
                                         MeterId upstreamMeterId,
                                         UniTagInformation tagInformation,
                                         boolean install,
-                                        boolean upstream);
+                                        boolean upstream,
+                                        Optional<CompletableFuture<ObjectiveError>> dhcpFuture);
 
     /**
      * Trap igmp packets to the controller.
@@ -156,12 +160,14 @@
      * @param subscriberPort the uni port
      * @param downstreamMeterId the meter id that is assigned to downstream flows
      * @param tagInformation the uni tag information
+     * @param macAddress the mac address
      * @return ForwardingObjective.Builder
      */
     ForwardingObjective.Builder createDownBuilder(PortNumber uplinkPort,
                                                   PortNumber subscriberPort,
                                                   MeterId downstreamMeterId,
-                                                  UniTagInformation tagInformation);
+                                                  UniTagInformation tagInformation,
+                                                  Optional<MacAddress> macAddress);
 
     /**
      * Clears pending mappings and state for device.
diff --git a/impl/src/test/java/org/opencord/olt/impl/OltFlowTest.java b/impl/src/test/java/org/opencord/olt/impl/OltFlowTest.java
index 7defeb9..2947bf3 100644
--- a/impl/src/test/java/org/opencord/olt/impl/OltFlowTest.java
+++ b/impl/src/test/java/org/opencord/olt/impl/OltFlowTest.java
@@ -28,6 +28,7 @@
 import org.junit.Before;
 import org.junit.Test;
 import org.onlab.packet.EthType;
+import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.cluster.RoleInfo;
@@ -68,6 +69,8 @@
     PortNumber uniPortNumber2 = PortNumber.portNumber(2);
     PortNumber nniPortNumber = PortNumber.portNumber(65535);
 
+    MacAddress macAddress = MacAddress.valueOf("00:00:00:00:0a:0b");
+
     UniTagInformation.Builder tagInfoBuilder = new UniTagInformation.Builder();
     UniTagInformation uniTagInfo = tagInfoBuilder.setUniTagMatch(VlanId.vlanId((short) 35))
             .setPonCTag(VlanId.vlanId((short) 33))
@@ -126,55 +129,55 @@
         // ensure upstream dhcp traps can be added and removed
         oltFlowService.processDhcpFilteringObjectives(DEVICE_ID_1, uniPortNumber,
                 usMeterId, uniTagInfo,
-                true, true);
+                true, true, Optional.empty());
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 1;
         oltFlowService.processDhcpFilteringObjectives(DEVICE_ID_1, uniPortNumber,
                 usMeterId, uniTagInfo,
-                false, true);
+                false, true, Optional.empty());
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 2;
 
         // Ensure upstream flow has no pcp unless properly specified.
         oltFlowService.processDhcpFilteringObjectives(DEVICE_ID_1, uniPortNumber2,
                 usMeterId, uniTagInfoNoPcp,
-                true, true);
+                true, true, Optional.empty());
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 3;
 
         // ensure upstream flows are not added if uniTagInfo is missing dhcp requirement
         oltFlowService.processDhcpFilteringObjectives(DEVICE_ID_1, uniPortNumber,
                 usMeterId, uniTagInfoNoDhcpNoIgmp,
-                true, true);
+                true, true, Optional.empty());
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 3;
 
         // ensure downstream traps don't succeed without global config for nni ports
         oltFlowService.processDhcpFilteringObjectives(DEVICE_ID_1, nniPortNumber,
                 null, null,
-                true, false);
+                true, false, Optional.empty());
         oltFlowService.processDhcpFilteringObjectives(DEVICE_ID_1, nniPortNumber,
                 null, null,
-                false, false);
+                false, false, Optional.empty());
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 3;
         // do global config for nni ports and now it should succeed
         oltFlowService.enableDhcpOnNni = true;
         oltFlowService.processDhcpFilteringObjectives(DEVICE_ID_1, nniPortNumber,
                 null, null,
-                true, false);
+                true, false, Optional.empty());
         oltFlowService.processDhcpFilteringObjectives(DEVICE_ID_1, nniPortNumber,
                 null, null,
-                false, false);
+                false, false, Optional.empty());
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 5;
 
         // turn on DHCPv6 and we should get 2 flows
         oltFlowService.enableDhcpV6 = true;
         oltFlowService.processDhcpFilteringObjectives(DEVICE_ID_1, uniPortNumber,
                 usMeterId, uniTagInfo,
-                true, true);
+                true, true, Optional.empty());
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 7;
 
         // turn off DHCPv4 and it's only v6
         oltFlowService.enableDhcpV4 = false;
         oltFlowService.processDhcpFilteringObjectives(DEVICE_ID_1, uniPortNumber,
                 usMeterId, uniTagInfo,
-                true, true);
+                true, true, Optional.empty());
         assert oltFlowService.flowObjectiveService.getPendingFlowObjectives().size() == 8;
 
         // cleanup
@@ -302,7 +305,8 @@
     @Test
     public void testDownBuilder() {
         ForwardingObjective objective =
-                oltFlowService.createDownBuilder(nniPortNumber, uniPortNumber, dsMeterId, uniTagInfo).remove();
+                oltFlowService.createDownBuilder(nniPortNumber, uniPortNumber, dsMeterId, uniTagInfo,
+                        Optional.of(macAddress)).remove();
         checkObjective(objective, false);
     }
 
@@ -334,6 +338,7 @@
 
         if (!upstream) {
             assert selector.getCriterion(Criterion.Type.METADATA) != null;
+            assert selector.getCriterion(Criterion.Type.ETH_DST) != null;
         }
     }