[VOL-3472] Mac Learning App HostProvider Implementation

Change-Id: I196dadd89e5f894048233fb1c49a99f10658b644
diff --git a/app/src/main/java/org/opencord/maclearner/app/impl/ConsistentHasher.java b/app/src/main/java/org/opencord/maclearner/app/impl/ConsistentHasher.java
new file mode 100644
index 0000000..66f7080
--- /dev/null
+++ b/app/src/main/java/org/opencord/maclearner/app/impl/ConsistentHasher.java
@@ -0,0 +1,129 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.maclearner.app.impl;
+
+import com.google.common.hash.Hashing;
+import org.onosproject.cluster.NodeId;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Returns a server hosting a given key based on consistent hashing.
+ */
+public class ConsistentHasher {
+
+    private static class Entry implements Comparable<Entry> {
+        private final NodeId server;
+        private final int hash;
+
+        public Entry(NodeId server, int hash) {
+            this.server = server;
+            this.hash = hash;
+        }
+
+        public Entry(int hash) {
+            server = null;
+            this.hash = hash;
+        }
+
+        @Override
+        public int compareTo(Entry o) {
+            if (this.hash > o.hash) {
+                return 1;
+            } else if (this.hash < o.hash) {
+                return -1;
+            } // else
+            return 0;
+        }
+    }
+
+    private final int weight;
+
+    private List<Entry> table;
+
+    /**
+     * Creates a new hasher with the given list of servers.
+     *
+     * @param servers list of servers
+     * @param weight weight
+     */
+    public ConsistentHasher(List<NodeId> servers, int weight) {
+        this.weight = weight;
+
+        this.table = new ArrayList<>();
+
+        servers.forEach(this::addServer);
+    }
+
+    /**
+     * Adds a new server to the list of servers.
+     *
+     * @param server server ID
+     */
+    public synchronized void addServer(NodeId server) {
+        // Add weight number of buckets for each server
+        for (int i = 0; i < weight; i++) {
+            String label = server.toString() + i;
+            int hash = getHash(label);
+            Entry e = new Entry(server, hash);
+            table.add(e);
+        }
+
+        Collections.sort(table);
+    }
+
+    /**
+     * Removes a server from the list of servers.
+     *
+     * @param server server ID
+     */
+    public synchronized void removeServer(NodeId server) {
+        table.removeIf(e -> e.server.equals(server));
+    }
+
+    /**
+     * Hashes a given input string and finds that server that should
+     * handle the given ID.
+     *
+     * @param s input
+     * @return server ID
+     */
+    public synchronized NodeId hash(String s) {
+        Entry temp = new Entry(getHash(s));
+
+        int pos = Collections.binarySearch(this.table, temp);
+
+        // translate a negative not-found result into the closest following match
+        if (pos < 0) {
+            pos = Math.abs(pos + 1);
+        }
+
+        // wraparound if the hash was after the last entry in the table
+        if (pos == this.table.size()) {
+            pos = 0;
+        }
+
+        return table.get(pos).server;
+    }
+
+    private int getHash(String s) {
+        // stable, uniformly-distributed hash
+        return Hashing.murmur3_128().hashString(s, Charset.defaultCharset()).asInt();
+    }
+}
diff --git a/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerHostProvider.java b/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerHostProvider.java
new file mode 100644
index 0000000..510ba43
--- /dev/null
+++ b/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerHostProvider.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.maclearner.app.impl;
+
+import com.google.common.collect.Sets;
+import org.onlab.packet.EthType;
+import org.onlab.packet.IpAddress;
+import org.onlab.packet.MacAddress;
+import org.onlab.packet.VlanId;
+import org.onosproject.net.Host;
+import org.onosproject.net.HostId;
+import org.onosproject.net.HostLocation;
+import org.onosproject.net.host.DefaultHostDescription;
+import org.onosproject.net.host.HostDescription;
+import org.onosproject.net.host.HostProvider;
+import org.onosproject.net.host.HostProviderRegistry;
+import org.onosproject.net.host.HostProviderService;
+import org.onosproject.net.host.HostService;
+import org.onosproject.net.provider.AbstractProvider;
+import org.onosproject.net.provider.ProviderId;
+import org.opencord.maclearner.api.MacLearnerHostLocationService;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.Set;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+/**
+ * Provider which uses an OpenFlow controller to detect network end-station hosts.
+ */
+@Component(immediate = true, service = {MacLearnerHostLocationService.class, HostProvider.class})
+public class MacLearnerHostProvider extends AbstractProvider
+        implements MacLearnerHostLocationService, HostProvider {
+
+    private final Logger log = getLogger(getClass());
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected HostProviderRegistry providerRegistry;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected HostService hostService;
+
+    protected HostProviderService providerService;
+
+    /**
+     * Creates an OpenFlow host provider.
+     */
+    public MacLearnerHostProvider() {
+        super(new ProviderId("maclearner", "org.opencord.maclearner.host"));
+    }
+
+    @Activate
+    public void activate(ComponentContext context) {
+        providerService = providerRegistry.register(this);
+        log.info("{} is started.", getClass().getSimpleName());
+    }
+
+    @Deactivate
+    public void deactivate() {
+        providerRegistry.unregister(this);
+        providerService = null;
+        log.info("{} is stopped.", getClass().getSimpleName());
+    }
+
+    @Override
+    public void triggerProbe(Host host) {
+        // Do nothing here
+    }
+
+    @Override
+    public void createOrUpdateHost(HostId hid, MacAddress srcMac, MacAddress dstMac, VlanId vlan, VlanId innerVlan,
+                                   EthType outerTpid, HostLocation hloc, HostLocation auxLoc, IpAddress ip) {
+        Set<HostLocation> primaryLocations = Collections.singleton(hloc);
+        Set<HostLocation> auxLocations = auxLoc != null ? Collections.singleton(auxLoc) : null;
+
+        HostDescription desc = ip == null || ip.isZero() || ip.isSelfAssigned() ?
+                new DefaultHostDescription(srcMac, vlan, primaryLocations, auxLocations, Sets.newHashSet(),
+                        innerVlan, outerTpid, false) :
+                new DefaultHostDescription(srcMac, vlan, primaryLocations, auxLocations, Sets.newHashSet(ip),
+                        innerVlan, outerTpid, false);
+        try {
+            providerService.hostDetected(hid, desc, false);
+        } catch (IllegalStateException e) {
+            printHostActionErrorLogs(hid, e);
+        }
+    }
+
+    @Override
+    public void updateHostIp(HostId hid, IpAddress ip) {
+        Host host = hostService.getHost(hid);
+        if (host == null) {
+            log.warn("Fail to update IP for {}. Host does not exist", hid);
+            return;
+        }
+
+        HostDescription desc = new DefaultHostDescription(hid.mac(), hid.vlanId(),
+                host.locations(), Sets.newHashSet(ip), false);
+        try {
+            providerService.hostDetected(hid, desc, false);
+        } catch (IllegalStateException e) {
+            printHostActionErrorLogs(hid, e);
+        }
+    }
+
+    @Override
+    public void vanishHost(MacAddress macAddress, VlanId vlanId) {
+        HostId hid = HostId.hostId(macAddress, vlanId);
+        try {
+            providerService.hostVanished(hid);
+        } catch (IllegalStateException e) {
+            printHostActionErrorLogs(hid, e);
+        }
+    }
+
+    private void printHostActionErrorLogs(HostId hid, Exception e) {
+        log.error("Host {} suppressed due to IllegalStateException", hid);
+        log.debug("Exception: ", e);
+    }
+
+}
diff --git a/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java b/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java
index 0a73e3e..df943e2 100644
--- a/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java
+++ b/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java
@@ -19,18 +19,33 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.onlab.packet.EthType;
+import org.onlab.packet.IpAddress;
 import org.onlab.packet.VlanId;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEventListener;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
-import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Device;
+import org.onosproject.net.HostId;
+import org.onosproject.net.HostLocation;
+import org.onosproject.net.Link;
 import org.onosproject.net.device.DeviceEvent;
 import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.link.LinkService;
+import org.onosproject.net.topology.Topology;
+import org.onosproject.net.topology.TopologyService;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.Versioned;
 import org.opencord.maclearner.api.DefaultMacLearner;
+import org.opencord.maclearner.api.MacLearnerHostLocationService;
 import org.opencord.maclearner.api.MacDeleteResult;
 import org.opencord.maclearner.api.MacLearnerEvent;
 import org.opencord.maclearner.api.MacLearnerKey;
@@ -70,6 +85,7 @@
 import java.net.URI;
 import java.util.Date;
 import java.util.Dictionary;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
@@ -82,12 +98,13 @@
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.stream.Collectors.toList;
 import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_MessageType;
 import static org.onlab.util.Tools.groupedThreads;
+import static org.opencord.maclearner.app.impl.OsgiPropertyConstants.AUTO_CLEAR_MAC_MAPPING;
+import static org.opencord.maclearner.app.impl.OsgiPropertyConstants.AUTO_CLEAR_MAC_MAPPING_DEFAULT;
 import static org.opencord.maclearner.app.impl.OsgiPropertyConstants.CACHE_DURATION_DEFAULT;
 import static org.opencord.maclearner.app.impl.OsgiPropertyConstants.CACHE_DURATION;
-import static org.opencord.maclearner.app.impl.OsgiPropertyConstants.ENABLE_DEVICE_LISTENER;
-import static org.opencord.maclearner.app.impl.OsgiPropertyConstants.ENABLE_DEVICE_LISTENER_DEFAULT;
 import static org.osgi.service.component.annotations.ReferenceCardinality.MANDATORY;
 
 /**
@@ -96,7 +113,7 @@
 @Component(immediate = true,
         property = {
                 CACHE_DURATION + ":Integer=" + CACHE_DURATION_DEFAULT,
-                ENABLE_DEVICE_LISTENER + ":Boolean=" + ENABLE_DEVICE_LISTENER_DEFAULT
+                AUTO_CLEAR_MAC_MAPPING + ":Boolean=" + AUTO_CLEAR_MAC_MAPPING_DEFAULT
         },
         service = MacLearnerService.class
 )
@@ -107,6 +124,7 @@
 
     private static final String MAC_LEARNER_APP = "org.opencord.maclearner";
     private static final String MAC_LEARNER = "maclearner";
+    private static final String OLT_MANUFACTURER_KEY = "VOLTHA";
     private ApplicationId appId;
 
     private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
@@ -118,7 +136,7 @@
     protected CoreService coreService;
 
     @Reference(cardinality = MANDATORY)
-    protected MastershipService mastershipService;
+    protected ClusterService clusterService;
 
     @Reference(cardinality = MANDATORY)
     protected DeviceService deviceService;
@@ -129,14 +147,26 @@
     @Reference(cardinality = MANDATORY)
     protected StorageService storageService;
 
+    @Reference(cardinality = MANDATORY)
+    protected TopologyService topologyService;
 
     @Reference(cardinality = MANDATORY)
     protected ComponentConfigService componentConfigService;
 
-    private MacLearnerPacketProcessor macLearnerPacketProcessor =
+    @Reference(cardinality = MANDATORY)
+    protected MacLearnerHostLocationService hostLocService;
+
+    @Reference(cardinality = MANDATORY)
+    protected LinkService linkService;
+
+    private final MacLearnerPacketProcessor macLearnerPacketProcessor =
             new MacLearnerPacketProcessor();
 
-    private DeviceListener deviceListener = new InternalDeviceListener();
+    private final DeviceListener deviceListener = new InternalDeviceListener();
+    private final ClusterEventListener clusterListener = new InternalClusterListener();
+
+    private ConsistentHasher hasher;
+    public static final int HASH_WEIGHT = 10;
 
     /**
      * Minimum duration of mapping, mapping can be exist until 2*cacheDuration because of cleanerTimer fixed rate.
@@ -144,9 +174,9 @@
     protected int cacheDurationSec = CACHE_DURATION_DEFAULT;
 
     /**
-     * Register a device event listener for removing mappings from MAC Address Map for removed events.
+     * Removes mappings from MAC Address Map for removed events.
      */
-    protected boolean enableDeviceListener = ENABLE_DEVICE_LISTENER_DEFAULT;
+    protected boolean autoClearMacMapping = AUTO_CLEAR_MAC_MAPPING_DEFAULT;
 
     private ConsistentMap<DeviceId, Set<PortNumber>> ignoredPortsMap;
     private ConsistentMap<MacLearnerKey, MacLearnerValue> macAddressMap;
@@ -174,9 +204,13 @@
         //mac learner must process the packet before director processors
         packetService.addProcessor(macLearnerPacketProcessor,
                 PacketProcessor.advisor(2));
-        if (enableDeviceListener) {
-            deviceService.addListener(deviceListener);
-        }
+        List<NodeId> readyNodes = clusterService.getNodes().stream()
+                .filter(c -> clusterService.getState(c.id()) == ControllerNode.State.READY)
+                .map(ControllerNode::id)
+                .collect(toList());
+        hasher = new ConsistentHasher(readyNodes, HASH_WEIGHT);
+        clusterService.addListener(clusterListener);
+        deviceService.addListener(deviceListener);
         createSchedulerForClearMacMappings();
         log.info("{} is started.", getClass().getSimpleName());
     }
@@ -210,25 +244,42 @@
     private void clearExpiredMacMappings() {
         Date curDate = new Date();
         for (Map.Entry<MacLearnerKey, Versioned<MacLearnerValue>> entry : macAddressMap.entrySet()) {
-            if (!mastershipService.isLocalMaster(entry.getKey().getDeviceId())) {
-                return;
+            if (!isDeviceMine(entry.getKey().getDeviceId())) {
+                continue;
             }
             if (curDate.getTime() - entry.getValue().value().getTimestamp() > cacheDurationSec * 1000) {
-                removeFromMacAddressMap(entry.getKey());
+                removeFromMacAddressMap(entry.getKey(), false);
             }
         }
     }
 
+    /**
+     * Determines if this instance should handle this device based on
+     * consistent hashing.
+     *
+     * @param id device ID
+     * @return true if this instance should handle the device, otherwise false
+     */
+    private boolean isDeviceMine(DeviceId id) {
+        NodeId nodeId = hasher.hash(id.toString());
+        if (log.isDebugEnabled()) {
+            log.debug("Node that will handle {} is {}", id, nodeId);
+        }
+        return nodeId.equals(clusterService.getLocalNode().id());
+    }
+
     @Deactivate
     public void deactivate() {
         if (scheduledFuture != null) {
             scheduledFuture.cancel(true);
         }
         packetService.removeProcessor(macLearnerPacketProcessor);
-        if (enableDeviceListener) {
-            deviceService.removeListener(deviceListener);
-        }
+        clusterService.removeListener(clusterListener);
+        deviceService.removeListener(deviceListener);
         eventDispatcher.removeSink(MacLearnerEvent.class);
+        if (eventExecutor != null) {
+            eventExecutor.shutdown();
+        }
         componentConfigService.unregisterProperties(getClass(), false);
         log.info("{} is stopped.", getClass().getSimpleName());
     }
@@ -244,17 +295,6 @@
                 setMacMappingCacheDuration(cacheDur);
             }
         }
-
-        Boolean enableDevListener = Tools.isPropertyEnabled(properties, ENABLE_DEVICE_LISTENER);
-        if (enableDevListener != null && enableDeviceListener != enableDevListener) {
-            enableDeviceListener = enableDevListener;
-            log.info("enableDeviceListener parameter changed to: {}", enableDeviceListener);
-            if (this.enableDeviceListener) {
-                deviceService.addListener(deviceListener);
-            } else {
-                deviceService.removeListener(deviceListener);
-            }
-        }
     }
 
     private Integer setMacMappingCacheDuration(Integer second) {
@@ -329,7 +369,7 @@
     public MacDeleteResult deleteMacMapping(DeviceId deviceId, PortNumber portNumber, VlanId vlanId) {
         log.info("Deleting MAC mapping for: {} {} {}", deviceId, portNumber, vlanId);
         MacLearnerKey key = new MacLearnerKey(deviceId, portNumber, vlanId);
-        return removeFromMacAddressMap(key);
+        return removeFromMacAddressMap(key, true);
     }
 
     @Override
@@ -343,7 +383,7 @@
             log.warn("MAC mapping not found for deviceId: {} and portNumber: {}", deviceId, portNumber);
             return false;
         }
-        entriesToDelete.forEach(e -> removeFromMacAddressMap(e.getKey()));
+        entriesToDelete.forEach(e -> removeFromMacAddressMap(e.getKey(), true));
         return true;
     }
 
@@ -357,7 +397,7 @@
             log.warn("MAC mapping not found for deviceId: {}", deviceId);
             return false;
         }
-        entriesToDelete.forEach(e -> removeFromMacAddressMap(e.getKey()));
+        entriesToDelete.forEach(e -> removeFromMacAddressMap(e.getKey(), true));
         return true;
     }
 
@@ -410,6 +450,10 @@
         post(macLearnerEvent);
     }
 
+    private boolean isOltDevice(Device device) {
+        return device.manufacturer().contains(OLT_MANUFACTURER_KEY);
+    }
+
     private class MacLearnerPacketProcessor implements PacketProcessor {
 
         @Override
@@ -422,8 +466,52 @@
                 return;
             }
 
-            PortNumber sourcePort = context.inPacket().receivedFrom().port();
-            DeviceId deviceId = context.inPacket().receivedFrom().deviceId();
+            ConnectPoint cp = context.inPacket().receivedFrom();
+            DeviceId deviceId = cp.deviceId();
+            PortNumber sourcePort = cp.port();
+            MacAddress srcMac = packet.getSourceMAC();
+            MacAddress dstMac = packet.getDestinationMAC();
+
+            Device device = deviceService.getDevice(deviceId);
+            if (!isOltDevice(device)) { // not handle non OLT device packets
+                log.debug("Packet received from non-OLT device: {}. Returning.", deviceId);
+                return;
+            }
+
+            if (srcMac.isBroadcast() || srcMac.isMulticast()) {
+                log.debug("Broadcast or multicast packet received from: {}. Returning.", cp);
+                return;
+            }
+
+            // Ignore location probes
+            if (dstMac.isOnos() && !MacAddress.NONE.equals(dstMac)) {
+                log.debug("Location probe. cp: {}", cp);
+                return;
+            }
+
+            // If this arrived on control port, bail out.
+            if (cp.port().isLogical()) {
+                log.debug("Packet received from logical port: {}", cp);
+                return;
+            }
+
+            // If this is not an edge port, bail out.
+            Topology topology = topologyService.currentTopology();
+            if (topologyService.isInfrastructure(topology, cp)) {
+                log.debug("Packet received from non-edge port: {}", cp);
+                return;
+            }
+
+            VlanId vlan = VlanId.vlanId(packet.getVlanID());
+            VlanId outerVlan = VlanId.vlanId(packet.getQinQVID());
+            VlanId innerVlan = VlanId.NONE;
+            EthType outerTpid = EthType.EtherType.UNKNOWN.ethType();
+            // Set up values for double-tagged hosts
+            if (outerVlan.toShort() != Ethernet.VLAN_UNTAGGED) {
+                innerVlan = vlan;
+                vlan = outerVlan;
+                outerTpid = EthType.EtherType.lookup(packet.getQinQTPID()).ethType();
+            }
 
             Versioned<Set<PortNumber>> ignoredPortsOfDevice = ignoredPortsMap.get(deviceId);
             if (ignoredPortsOfDevice != null && ignoredPortsOfDevice.value().contains(sourcePort)) {
@@ -438,11 +526,24 @@
                     UDP udpPacket = (UDP) ipv4Packet.getPayload();
                     int udpSourcePort = udpPacket.getSourcePort();
                     if ((udpSourcePort == UDP.DHCP_CLIENT_PORT) || (udpSourcePort == UDP.DHCP_SERVER_PORT)) {
+                        // Update host location
+                        HostLocation hloc = new HostLocation(cp, System.currentTimeMillis());
+                        HostLocation auxLocation = null;
+                        Optional<Link> optLink = linkService.getDeviceLinks(deviceId).stream().findFirst();
+                        if (optLink.isPresent()) {
+                            Link link = optLink.get();
+                            auxLocation = !link.src().deviceId().equals(deviceId) ?
+                                    new HostLocation(link.src(), System.currentTimeMillis()) :
+                                    new HostLocation(link.dst(), System.currentTimeMillis());
+                        } else {
+                            log.debug("Link not found for device {}", deviceId);
+                        }
+                        hostLocService.createOrUpdateHost(HostId.hostId(packet.getSourceMAC(), vlan),
+                                packet.getSourceMAC(), packet.getDestinationMAC(), vlan, innerVlan, outerTpid,
+                                hloc, auxLocation, null);
                         DHCP dhcpPayload = (DHCP) udpPacket.getPayload();
                         //This packet is dhcp.
-                        VlanId vlanId = packet.getQinQVID() != -1 ?
-                                VlanId.vlanId(packet.getQinQVID()) : VlanId.vlanId(packet.getVlanID());
-                        processDhcpPacket(context, packet, dhcpPayload, sourcePort, deviceId, vlanId);
+                        processDhcpPacket(context, packet, dhcpPayload, sourcePort, deviceId, vlan);
                     }
                 }
             }
@@ -469,6 +570,11 @@
             if (incomingPacketType.equals(DHCP.MsgType.DHCPDISCOVER) ||
                     incomingPacketType.equals(DHCP.MsgType.DHCPREQUEST)) {
                 addToMacAddressMap(deviceId, sourcePort, vlanId, packet.getSourceMAC());
+            } else if (incomingPacketType.equals(DHCP.MsgType.DHCPACK)) {
+                MacAddress hostMac = MacAddress.valueOf(dhcpPayload.getClientHardwareAddress());
+                VlanId hostVlan = VlanId.vlanId(packet.getVlanID());
+                HostId hostId = HostId.hostId(hostMac, hostVlan);
+                hostLocService.updateHostIp(hostId, IpAddress.valueOf(dhcpPayload.getYourIPAddress()));
             }
         }
 
@@ -495,7 +601,8 @@
                         portNumber,
                         vlanId,
                         prevMacAddress.value().getMacAddress());
-            } else if (prevMacAddress == null || !prevMacAddress.value().getMacAddress().equals(macAddress)) {
+            }
+            if (prevMacAddress == null || !prevMacAddress.value().getMacAddress().equals(macAddress)) {
                 // Not sending event for already mapped
                 log.info("Mapped MAC: {} for port: {} of deviceId: {} and vlanId: {}",
                         macAddress, portNumber, deviceId, vlanId);
@@ -505,7 +612,7 @@
 
     }
 
-    private MacDeleteResult removeFromMacAddressMap(MacLearnerKey macLearnerKey) {
+    private MacDeleteResult removeFromMacAddressMap(MacLearnerKey macLearnerKey, boolean vanishHost) {
         Versioned<MacLearnerValue> verMacAddress = macAddressMap.remove(macLearnerKey);
         if (verMacAddress != null) {
             log.info("Mapping removed. deviceId: {} portNumber: {} vlanId: {} macAddress: {}",
@@ -516,6 +623,9 @@
                     macLearnerKey.getPortNumber(),
                     macLearnerKey.getVlanId(),
                     verMacAddress.value().getMacAddress());
+            if (vanishHost) {
+                hostLocService.vanishHost(verMacAddress.value().getMacAddress(), macLearnerKey.getVlanId());
+            }
             return MacDeleteResult.SUCCESSFUL;
         } else {
             log.warn("MAC not removed, because mapping not found for deviceId: {} and portNumber: {} and vlanId: {}",
@@ -531,12 +641,18 @@
         @Override
         public void event(DeviceEvent event) {
             eventExecutor.execute(() -> {
+                Device device = event.subject();
+                log.debug("Device event received: {}", event.type());
                 switch (event.type()) {
                     case DEVICE_REMOVED:
-                        deleteMacMappings(event.subject().id());
+                        if (autoClearMacMapping) {
+                            deleteMacMappings(device.id());
+                        }
                         break;
                     case PORT_REMOVED:
-                        deleteMacMappings(event.subject().id(), event.port().number());
+                        if (autoClearMacMapping) {
+                            deleteMacMappings(device.id(), event.port().number());
+                        }
                         break;
                     default:
                         log.debug("Unhandled device event for Mac Learner: {}", event.type());
@@ -546,9 +662,21 @@
 
         @Override
         public boolean isRelevant(DeviceEvent event) {
-            return mastershipService.isLocalMaster(event.subject().id());
+            return isDeviceMine(event.subject().id());
         }
 
     }
 
+    private class InternalClusterListener implements ClusterEventListener {
+        @Override
+        public void event(ClusterEvent event) {
+            if (event.type() == ClusterEvent.Type.INSTANCE_READY) {
+                hasher.addServer(event.subject().id());
+            }
+            if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED) {
+                hasher.removeServer(event.subject().id());
+            }
+        }
+    }
+
 }
diff --git a/app/src/main/java/org/opencord/maclearner/app/impl/OsgiPropertyConstants.java b/app/src/main/java/org/opencord/maclearner/app/impl/OsgiPropertyConstants.java
index d57d322..104b58e 100644
--- a/app/src/main/java/org/opencord/maclearner/app/impl/OsgiPropertyConstants.java
+++ b/app/src/main/java/org/opencord/maclearner/app/impl/OsgiPropertyConstants.java
@@ -13,7 +13,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.opencord.maclearner.app.impl;
 
 /**
@@ -27,7 +26,7 @@
     public static final String CACHE_DURATION = "cacheDurationSec";
     public static final int CACHE_DURATION_DEFAULT = 86400; // 1 day
 
-    public static final String ENABLE_DEVICE_LISTENER = "enableDeviceListener";
-    public static final boolean ENABLE_DEVICE_LISTENER_DEFAULT = false;
+    public static final String AUTO_CLEAR_MAC_MAPPING = "autoClearMacMapping";
+    public static final boolean AUTO_CLEAR_MAC_MAPPING_DEFAULT = false;
 
 }
diff --git a/app/src/main/java/org/opencord/maclearner/app/rest/MacLearnerWebResource.java b/app/src/main/java/org/opencord/maclearner/app/rest/MacLearnerWebResource.java
index 334540e..692e787 100644
--- a/app/src/main/java/org/opencord/maclearner/app/rest/MacLearnerWebResource.java
+++ b/app/src/main/java/org/opencord/maclearner/app/rest/MacLearnerWebResource.java
@@ -43,8 +43,9 @@
 import java.util.Optional;
 import java.util.Set;
 
+import static javax.ws.rs.core.Response.Status.NO_CONTENT;
+import static javax.ws.rs.core.Response.Status.OK;
 import static org.slf4j.LoggerFactory.getLogger;
-import static javax.ws.rs.core.Response.Status.*;
 
 /**
  * Mac Learner web resource.
diff --git a/app/src/test/java/org/opencord/maclearner/app/impl/MacLearnerManagerTest.java b/app/src/test/java/org/opencord/maclearner/app/impl/MacLearnerManagerTest.java
index 1e05637..adb59db 100644
--- a/app/src/test/java/org/opencord/maclearner/app/impl/MacLearnerManagerTest.java
+++ b/app/src/test/java/org/opencord/maclearner/app/impl/MacLearnerManagerTest.java
@@ -21,10 +21,17 @@
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.VlanId;
 import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Host;
+import org.onosproject.net.HostId;
+import org.onosproject.net.HostLocation;
+import org.onosproject.net.PortNumber;
 
+import java.io.IOException;
 import java.util.Optional;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -33,7 +40,7 @@
 public class MacLearnerManagerTest extends TestBaseMacLearner {
 
     @Before
-    public void setUp() {
+    public void setUp() throws IOException {
         setUpApp();
     }
 
@@ -45,7 +52,12 @@
     private static final MacAddress CLIENT_MAC = MacAddress.valueOf("00:00:00:00:00:01");
     private static final VlanId CLIENT_VLAN = VlanId.vlanId("100");
     private static final VlanId CLIENT_QINQ_VLAN = VlanId.vlanId("200");
-    private static final ConnectPoint CLIENT_CP = ConnectPoint.deviceConnectPoint("of:0000000000000001/1");
+    public static final DeviceId OLT_DEVICE_ID = DeviceId.deviceId("of:0000b86a974385f7");
+    public static final PortNumber UNI_PORT = PortNumber.portNumber(16);
+    public static final ConnectPoint CLIENT_CP = new ConnectPoint(OLT_DEVICE_ID, UNI_PORT);
+    public static final DeviceId AGG_DEVICE_ID = DeviceId.deviceId("of:0000000000000001");
+    public static final PortNumber AGG_OLT_PORT = PortNumber.portNumber(10);
+    public static final PortNumber OLT_NNI_PORT = PortNumber.portNumber(1048576);
 
     @Test
     public void testSingleTagDhcpPacket() {
@@ -71,4 +83,21 @@
         assertEquals(CLIENT_MAC, macAddress.get());
     }
 
+    @Test
+    public void testHostProviding() {
+        packetService.processPacket(new TestDhcpRequestPacketContext(CLIENT_MAC,
+                CLIENT_VLAN,
+                CLIENT_QINQ_VLAN,
+                CLIENT_CP));
+        HostId hostId = HostId.hostId(CLIENT_MAC, CLIENT_QINQ_VLAN);
+        Host host = hostService.getHost(hostId);
+        assertNotNull(host);
+        assertEquals(OLT_DEVICE_ID, host.location().deviceId());
+        assertEquals(UNI_PORT, host.location().port());
+        Optional<HostLocation> optAuxLoc = host.auxLocations().stream().findFirst();
+        assertTrue(optAuxLoc.isPresent());
+        assertEquals(AGG_DEVICE_ID, optAuxLoc.get().deviceId());
+        assertEquals(AGG_OLT_PORT, optAuxLoc.get().port());
+    }
+
 }
diff --git a/app/src/test/java/org/opencord/maclearner/app/impl/TestBaseMacLearner.java b/app/src/test/java/org/opencord/maclearner/app/impl/TestBaseMacLearner.java
index c9d4af5..ffbf0c3 100644
--- a/app/src/test/java/org/opencord/maclearner/app/impl/TestBaseMacLearner.java
+++ b/app/src/test/java/org/opencord/maclearner/app/impl/TestBaseMacLearner.java
@@ -15,29 +15,59 @@
  */
 package org.opencord.maclearner.app.impl;
 
+import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.onlab.junit.TestUtils;
+import org.onlab.packet.ChassisId;
 import org.onlab.packet.DHCP;
+import org.onlab.packet.EthType;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IPv4;
 import org.onlab.packet.Ip4Address;
+import org.onlab.packet.IpAddress;
 import org.onlab.packet.MacAddress;
 import org.onlab.packet.UDP;
 import org.onlab.packet.VlanId;
 import org.onlab.packet.dhcp.DhcpOption;
 import org.onosproject.cfg.ComponentConfigAdapter;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterServiceAdapter;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.DefaultControllerNode;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
 import org.onosproject.core.CoreServiceAdapter;
 import org.onosproject.event.DefaultEventSinkRegistry;
 import org.onosproject.event.Event;
 import org.onosproject.event.EventDeliveryService;
 import org.onosproject.event.EventSink;
 import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DefaultDevice;
+import org.onosproject.net.DefaultHost;
+import org.onosproject.net.DefaultLink;
+import org.onosproject.net.Device;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.Host;
+import org.onosproject.net.HostId;
+import org.onosproject.net.HostLocation;
+import org.onosproject.net.Link;
+import org.onosproject.net.config.Config;
+import org.onosproject.net.config.ConfigApplyDelegate;
+import org.onosproject.net.config.basics.HostLearningConfig;
+import org.onosproject.net.config.basics.HostLearningConfigTest;
+import org.onosproject.net.device.DeviceListener;
+import org.onosproject.net.device.DeviceServiceAdapter;
+import org.onosproject.net.host.HostDescription;
+import org.onosproject.net.host.HostProvider;
+import org.onosproject.net.host.HostProviderService;
+import org.onosproject.net.host.HostServiceAdapter;
+import org.onosproject.net.link.LinkServiceAdapter;
 import org.onosproject.net.packet.DefaultInboundPacket;
 import org.onosproject.net.packet.InboundPacket;
 import org.onosproject.net.packet.OutboundPacket;
@@ -45,6 +75,10 @@
 import org.onosproject.net.packet.PacketContextAdapter;
 import org.onosproject.net.packet.PacketProcessor;
 import org.onosproject.net.packet.PacketServiceAdapter;
+import org.onosproject.net.provider.AbstractProviderService;
+import org.onosproject.net.provider.ProviderId;
+import org.onosproject.net.topology.Topology;
+import org.onosproject.net.topology.TopologyServiceAdapter;
 import org.onosproject.store.service.AsyncConsistentMap;
 import org.onosproject.store.service.AsyncDistributedSet;
 import org.onosproject.store.service.AtomicCounter;
@@ -60,6 +94,8 @@
 import org.onosproject.store.service.StorageServiceAdapter;
 import org.onosproject.store.service.Versioned;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.lang.reflect.Field;
 import java.nio.ByteBuffer;
 import java.util.Collection;
@@ -72,37 +108,85 @@
 import java.util.function.BiFunction;
 
 import static com.google.common.base.Preconditions.checkState;
+import static org.onosproject.cluster.NodeId.nodeId;
+import static org.onosproject.net.NetTestTools.PID;
+import static org.opencord.maclearner.app.impl.MacLearnerManagerTest.AGG_DEVICE_ID;
+import static org.opencord.maclearner.app.impl.MacLearnerManagerTest.AGG_OLT_PORT;
+import static org.opencord.maclearner.app.impl.MacLearnerManagerTest.CLIENT_CP;
+import static org.opencord.maclearner.app.impl.MacLearnerManagerTest.OLT_DEVICE_ID;
+import static org.opencord.maclearner.app.impl.MacLearnerManagerTest.OLT_NNI_PORT;
 
 /**
  * Mac Learner mock services class.
  */
 public abstract class TestBaseMacLearner {
 
+    protected static final String C1 = "C1";
+    protected static final String C2 = "C2";
+    protected static final String C3 = "C3";
+
+    protected static final NodeId CNID_1 = nodeId(C1);
+    protected static final NodeId CNID_2 = nodeId(C2);
+    protected static final NodeId CNID_3 = nodeId(C3);
+
+    protected static final ControllerNode CNODE_1 = new DefaultControllerNode(CNID_1, "10.0.0.1");
+    protected static final ControllerNode CNODE_2 = new DefaultControllerNode(CNID_2, "10.0.0.2");
+    protected static final ControllerNode CNODE_3 = new DefaultControllerNode(CNID_3, "10.0.0.3");
+
     private static final Ip4Address SERVER_IP = Ip4Address.valueOf("10.0.3.253");
     private static final Ip4Address INTERFACE_IP = Ip4Address.valueOf("10.0.3.254");
 
     protected MacLearnerManager macLearnerManager;
-    protected ObjectMapper mapper;
-    protected ApplicationId subject;
+    protected ApplicationId appId;
+    protected HostLearningConfig hostLearningConfig;
 
     protected ComponentConfigService componentConfigService = new MockComponentConfigService();
     protected MockCoreService coreService = new MockCoreService();
     protected MockStorageService storageService = new MockStorageService();
     protected MockPacketService packetService = new MockPacketService();
+    protected MockClusterService clusterService = new MockClusterService();
+    protected MockDeviceService deviceService = new MockDeviceService();
+    protected MockTopologyService topologyService = new MockTopologyService();
+    protected MockLinkService linkService = new MockLinkService();
+    protected MacLearnerHostProvider macLearnerHostProvider = new MacLearnerHostProvider();
+    protected MockHostService hostService = new MockHostService(Sets.newHashSet());
 
-    public void setUpApp() {
+    public void setUpApp() throws IOException {
         macLearnerManager = new MacLearnerManager();
         macLearnerManager.componentConfigService = this.componentConfigService;
         macLearnerManager.coreService = this.coreService;
         macLearnerManager.storageService = this.storageService;
         macLearnerManager.packetService = this.packetService;
+        macLearnerManager.clusterService = this.clusterService;
+        macLearnerManager.deviceService = this.deviceService;
+        macLearnerManager.topologyService = this.topologyService;
+        macLearnerManager.linkService = this.linkService;
+        hostLearningConfig = new HostLearningConfig();
+        InputStream jsonStream = HostLearningConfigTest.class
+                .getResourceAsStream("/host-learning-config.json");
+
+        ConnectPoint subject = CLIENT_CP;
+        String key = CoreService.CORE_APP_NAME;
+        ObjectMapper mapper = new ObjectMapper();
+        JsonNode jsonNode = mapper.readTree(jsonStream);
+        ConfigApplyDelegate delegate = new MockDelegate();
+
+        hostLearningConfig = new HostLearningConfig();
+        hostLearningConfig.init(subject, key, jsonNode, mapper, delegate);
+        macLearnerHostProvider.providerService = new MockHostProviderService(macLearnerHostProvider);
+        macLearnerManager.hostLocService = this.macLearnerHostProvider;
         injectEventDispatcher(macLearnerManager, new MockEventDispatcher());
-        mapper = new ObjectMapper();
-        subject = macLearnerManager.coreService.registerApplication("org.opencord.maclearner");
+        appId = macLearnerManager.coreService.registerApplication("org.opencord.maclearner");
 
         macLearnerManager.activate();
     }
 
+    private class MockDelegate implements ConfigApplyDelegate {
+        @Override
+        public void onApply(Config config) {
+        }
+    }
+
     /**
      * Mocks an instance of {@link ApplicationId} so that the application
      * component under test can query and use its application ID.
@@ -167,6 +251,144 @@
 
     }
 
+    private static class MockClusterService extends ClusterServiceAdapter {
+        private final Map<NodeId, ControllerNode> nodes = new HashMap<>();
+        private final Map<NodeId, ControllerNode.State> states = new HashMap<>();
+
+        MockClusterService() {
+            nodes.put(CNODE_1.id(), CNODE_1);
+            nodes.put(CNODE_2.id(), CNODE_2);
+            nodes.put(CNODE_3.id(), CNODE_3);
+
+            states.put(CNODE_1.id(), ControllerNode.State.READY);
+            states.put(CNODE_2.id(), ControllerNode.State.ACTIVE);
+            states.put(CNODE_3.id(), ControllerNode.State.ACTIVE);
+        }
+
+        @Override
+        public Set<ControllerNode> getNodes() {
+            return ImmutableSet.copyOf(nodes.values());
+        }
+
+        @Override
+        public ControllerNode getNode(NodeId nodeId) {
+            return nodes.get(nodeId);
+        }
+
+        @Override
+        public ControllerNode.State getState(NodeId nodeId) {
+            return states.get(nodeId);
+        }
+    }
+
+    /**
+     * Mocks the device service of ONOS so that the application under test can
+     * register listeners.
+     */
+    protected static class MockHostService extends HostServiceAdapter {
+        private Set<Host> hosts;
+
+        MockHostService(Set<Host> hosts) {
+            this.hosts = ImmutableSet.copyOf(hosts);
+        }
+
+        @Override
+        public Set<Host> getHosts() {
+            return hosts;
+        }
+
+        @Override
+        public Host getHost(HostId hostId) {
+            return hosts.stream().filter(host -> hostId.equals(host.id())).findFirst().orElse(null);
+        }
+    }
+
+    /**
+     * Mocks the device service of ONOS so that the application under test can
+     * register listeners.
+     */
+    private static class MockDeviceService extends DeviceServiceAdapter {
+
+        private List<DeviceListener> listeners = Lists.newArrayList();
+
+        @Override
+        public Device getDevice(DeviceId deviceId) {
+            if (deviceId.equals(OLT_DEVICE_ID)) {
+                return new DefaultDevice(null, OLT_DEVICE_ID, Device.Type.SWITCH,
+                        "VOLTHA Project", "open_pon", "open_pon",
+                        "BBSIM_OLT_1", new ChassisId("a0a0a0a0a01"));
+            } else {
+                return null;
+            }
+        }
+
+        @Override
+        public void addListener(DeviceListener listener) {
+            listeners.add(listener);
+        }
+
+        @Override
+        public void removeListener(DeviceListener listener) {
+            listeners.remove(listener);
+        }
+    }
+
+    /**
+     * Mocks the topology service of ONOS so that the application under test can
+     * check fake topology.
+     */
+    private static class MockTopologyService extends TopologyServiceAdapter {
+
+        private final Topology topology = new MockTopology();
+
+        @Override
+        public Topology currentTopology() {
+            return topology;
+        }
+
+        @Override
+        public boolean isInfrastructure(Topology topology, ConnectPoint connectPoint) {
+            return false;
+        }
+    }
+
+    private static class MockTopology implements Topology {
+        @Override
+        public long time() {
+            return 11111L;
+        }
+
+        @Override
+        public long creationTime() {
+            return 22222L;
+        }
+
+        @Override
+        public long computeCost() {
+            return 0;
+        }
+
+        @Override
+        public int clusterCount() {
+            return 2;
+        }
+
+        @Override
+        public int deviceCount() {
+            return 6;
+        }
+
+        @Override
+        public int linkCount() {
+            return 4;
+        }
+
+        @Override
+        public ProviderId providerId() {
+            return ProviderId.NONE;
+        }
+    }
+
     /**
      * Mocks the storage service of ONOS so that the application under test can
      * use consistent maps.
@@ -388,6 +610,24 @@
     }
 
     /**
+     * Mocks the link service of ONOS so that the application under test can
+     * observe links.
+     */
+    public static class MockLinkService extends LinkServiceAdapter {
+
+        Link link = DefaultLink.builder()
+                .type(Link.Type.DIRECT)
+                .providerId(PID)
+                .src(new ConnectPoint(OLT_DEVICE_ID, OLT_NNI_PORT))
+                .dst(new ConnectPoint(AGG_DEVICE_ID, AGG_OLT_PORT)).build();
+
+        @Override
+        public Set<Link> getDeviceLinks(DeviceId deviceId) {
+            return Sets.newHashSet(link);
+        }
+    }
+
+    /**
      * Implements event delivery system that delivers events synchronously, or
      * in-line with the post method invocation.
      */
@@ -429,6 +669,60 @@
     }
 
     /**
+     * Mock HostProviderService.
+     */
+    private class MockHostProviderService
+            extends AbstractProviderService<HostProvider>
+            implements HostProviderService {
+        private HostId hostId = null;
+        private HostDescription hostDescription = null;
+        private String event = null;
+
+        public MockHostProviderService(HostProvider provider) {
+            super(provider);
+        }
+
+        @Override
+        public void hostDetected(HostId hostId, HostDescription hostDescription, boolean replaceIps) {
+            this.hostId = hostId;
+            this.hostDescription = hostDescription;
+            this.event = "hostDetected";
+            Set<Host> previousHosts = Sets.newHashSet(hostService.getHosts());
+            previousHosts.add(new DefaultHost(provider().id(), hostId, hostDescription.hwAddress(),
+                    hostDescription.vlan(), hostDescription.locations(), hostDescription.auxLocations(),
+                    hostDescription.ipAddress(), VlanId.NONE,
+                    EthType.EtherType.UNKNOWN.ethType(), false, false));
+            hostService = new MockHostService(previousHosts);
+        }
+
+        @Override
+        public void hostVanished(HostId hostId) {
+            this.hostId = hostId;
+            this.event = "hostVanished";
+            Set<Host> previousHosts = Sets.newHashSet(hostService.getHosts());
+            Host removedHost = hostService.getHost(hostId);
+            previousHosts.remove(removedHost);
+            hostService = new MockHostService(previousHosts);
+        }
+
+        @Override
+        public void removeIpFromHost(HostId hostId, IpAddress ipAddress) {
+            // not implemented
+        }
+
+        @Override
+        public void removeLocationFromHost(HostId hostId, HostLocation location) {
+            // not implemented
+        }
+
+        public void clear() {
+            this.hostId = null;
+            this.hostDescription = null;
+            this.event = null;
+        }
+    }
+
+    /**
      * Generates DHCP REQUEST packet.
      */
     protected static class TestDhcpRequestPacketContext extends PacketContextAdapter {