[VOL-3472] Mac Learning App HostProvider Implementation

Change-Id: I196dadd89e5f894048233fb1c49a99f10658b644
diff --git a/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java b/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java
index 0a73e3e..df943e2 100644
--- a/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java
+++ b/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java
@@ -19,18 +19,33 @@
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.onlab.packet.EthType;
+import org.onlab.packet.IpAddress;
 import org.onlab.packet.VlanId;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEventListener;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
-import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Device;
+import org.onosproject.net.HostId;
+import org.onosproject.net.HostLocation;
+import org.onosproject.net.Link;
 import org.onosproject.net.device.DeviceEvent;
 import org.onosproject.net.device.DeviceListener;
 import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.link.LinkService;
+import org.onosproject.net.topology.Topology;
+import org.onosproject.net.topology.TopologyService;
 import org.onosproject.store.service.ConsistentMap;
 import org.onosproject.store.service.StorageService;
 import org.onosproject.store.service.Versioned;
 import org.opencord.maclearner.api.DefaultMacLearner;
+import org.opencord.maclearner.api.MacLearnerHostLocationService;
 import org.opencord.maclearner.api.MacDeleteResult;
 import org.opencord.maclearner.api.MacLearnerEvent;
 import org.opencord.maclearner.api.MacLearnerKey;
@@ -70,6 +85,7 @@
 import java.net.URI;
 import java.util.Date;
 import java.util.Dictionary;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
@@ -82,12 +98,13 @@
 import java.util.stream.Collectors;
 
 import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.stream.Collectors.toList;
 import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_MessageType;
 import static org.onlab.util.Tools.groupedThreads;
+import static org.opencord.maclearner.app.impl.OsgiPropertyConstants.AUTO_CLEAR_MAC_MAPPING;
+import static org.opencord.maclearner.app.impl.OsgiPropertyConstants.AUTO_CLEAR_MAC_MAPPING_DEFAULT;
 import static org.opencord.maclearner.app.impl.OsgiPropertyConstants.CACHE_DURATION_DEFAULT;
 import static org.opencord.maclearner.app.impl.OsgiPropertyConstants.CACHE_DURATION;
-import static org.opencord.maclearner.app.impl.OsgiPropertyConstants.ENABLE_DEVICE_LISTENER;
-import static org.opencord.maclearner.app.impl.OsgiPropertyConstants.ENABLE_DEVICE_LISTENER_DEFAULT;
 import static org.osgi.service.component.annotations.ReferenceCardinality.MANDATORY;
 
 /**
@@ -96,7 +113,7 @@
 @Component(immediate = true,
         property = {
                 CACHE_DURATION + ":Integer=" + CACHE_DURATION_DEFAULT,
-                ENABLE_DEVICE_LISTENER + ":Boolean=" + ENABLE_DEVICE_LISTENER_DEFAULT
+                AUTO_CLEAR_MAC_MAPPING + ":Boolean=" + AUTO_CLEAR_MAC_MAPPING_DEFAULT
         },
         service = MacLearnerService.class
 )
@@ -107,6 +124,7 @@
 
     private static final String MAC_LEARNER_APP = "org.opencord.maclearner";
     private static final String MAC_LEARNER = "maclearner";
+    private static final String OLT_MANUFACTURER_KEY = "VOLTHA";
     private ApplicationId appId;
 
     private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
@@ -118,7 +136,7 @@
     protected CoreService coreService;
 
     @Reference(cardinality = MANDATORY)
-    protected MastershipService mastershipService;
+    protected ClusterService clusterService;
 
     @Reference(cardinality = MANDATORY)
     protected DeviceService deviceService;
@@ -129,14 +147,26 @@
     @Reference(cardinality = MANDATORY)
     protected StorageService storageService;
 
+    @Reference(cardinality = MANDATORY)
+    protected TopologyService topologyService;
 
     @Reference(cardinality = MANDATORY)
     protected ComponentConfigService componentConfigService;
 
-    private MacLearnerPacketProcessor macLearnerPacketProcessor =
+    @Reference(cardinality = MANDATORY)
+    protected MacLearnerHostLocationService hostLocService;
+
+    @Reference(cardinality = MANDATORY)
+    protected LinkService linkService;
+
+    private final MacLearnerPacketProcessor macLearnerPacketProcessor =
             new MacLearnerPacketProcessor();
 
-    private DeviceListener deviceListener = new InternalDeviceListener();
+    private final DeviceListener deviceListener = new InternalDeviceListener();
+    private final ClusterEventListener clusterListener = new InternalClusterListener();
+
+    private ConsistentHasher hasher;
+    public static final int HASH_WEIGHT = 10;
 
     /**
      * Minimum duration of mapping, mapping can be exist until 2*cacheDuration because of cleanerTimer fixed rate.
@@ -144,9 +174,9 @@
     protected int cacheDurationSec = CACHE_DURATION_DEFAULT;
 
     /**
-     * Register a device event listener for removing mappings from MAC Address Map for removed events.
+     * Removes mappings from MAC Address Map for removed events.
      */
-    protected boolean enableDeviceListener = ENABLE_DEVICE_LISTENER_DEFAULT;
+    protected boolean autoClearMacMapping = AUTO_CLEAR_MAC_MAPPING_DEFAULT;
 
     private ConsistentMap<DeviceId, Set<PortNumber>> ignoredPortsMap;
     private ConsistentMap<MacLearnerKey, MacLearnerValue> macAddressMap;
@@ -174,9 +204,13 @@
         //mac learner must process the packet before director processors
         packetService.addProcessor(macLearnerPacketProcessor,
                 PacketProcessor.advisor(2));
-        if (enableDeviceListener) {
-            deviceService.addListener(deviceListener);
-        }
+        List<NodeId> readyNodes = clusterService.getNodes().stream()
+                .filter(c -> clusterService.getState(c.id()) == ControllerNode.State.READY)
+                .map(ControllerNode::id)
+                .collect(toList());
+        hasher = new ConsistentHasher(readyNodes, HASH_WEIGHT);
+        clusterService.addListener(clusterListener);
+        deviceService.addListener(deviceListener);
         createSchedulerForClearMacMappings();
         log.info("{} is started.", getClass().getSimpleName());
     }
@@ -210,25 +244,42 @@
     private void clearExpiredMacMappings() {
         Date curDate = new Date();
         for (Map.Entry<MacLearnerKey, Versioned<MacLearnerValue>> entry : macAddressMap.entrySet()) {
-            if (!mastershipService.isLocalMaster(entry.getKey().getDeviceId())) {
-                return;
+            if (!isDeviceMine(entry.getKey().getDeviceId())) { // device is hashed to another node
+                continue; // skip only this entry and keep scanning the rest of the map
             }
             if (curDate.getTime() - entry.getValue().value().getTimestamp() > cacheDurationSec * 1000) {
-                removeFromMacAddressMap(entry.getKey());
+                removeFromMacAddressMap(entry.getKey(), false); // false: expiry does not vanish the host
             }
         }
     }
 
+    /**
+     * Checks whether the local cluster node is the one selected by the
+     * consistent hasher for the given device.
+     *
+     * @param id identifier of the device to look up
+     * @return true when the hashed node equals the local node, otherwise false
+     */
+    private boolean isDeviceMine(DeviceId id) {
+        final NodeId owner = hasher.hash(id.toString());
+        // Parameterized logging defers formatting, so no explicit level guard is needed.
+        log.debug("Node that will handle {} is {}", id, owner);
+        final NodeId self = clusterService.getLocalNode().id();
+        return owner.equals(self);
+    }
+
     @Deactivate
     public void deactivate() {
         if (scheduledFuture != null) {
             scheduledFuture.cancel(true);
         }
         packetService.removeProcessor(macLearnerPacketProcessor);
-        if (enableDeviceListener) {
-            deviceService.removeListener(deviceListener);
-        }
+        clusterService.removeListener(clusterListener); // stop feeding cluster events to the hasher
+        deviceService.removeListener(deviceListener); // listener is now always registered on activation
         eventDispatcher.removeSink(MacLearnerEvent.class);
+        if (eventExecutor != null) { // guard against deactivation before the executor exists
+            eventExecutor.shutdown(); // executor that runs the device-event handling tasks
+        }
         componentConfigService.unregisterProperties(getClass(), false);
         log.info("{} is stopped.", getClass().getSimpleName());
     }
@@ -244,17 +295,6 @@
                 setMacMappingCacheDuration(cacheDur);
             }
         }
-
-        Boolean enableDevListener = Tools.isPropertyEnabled(properties, ENABLE_DEVICE_LISTENER);
-        if (enableDevListener != null && enableDeviceListener != enableDevListener) {
-            enableDeviceListener = enableDevListener;
-            log.info("enableDeviceListener parameter changed to: {}", enableDeviceListener);
-            if (this.enableDeviceListener) {
-                deviceService.addListener(deviceListener);
-            } else {
-                deviceService.removeListener(deviceListener);
-            }
-        }
     }
 
     private Integer setMacMappingCacheDuration(Integer second) {
@@ -329,7 +369,7 @@
     public MacDeleteResult deleteMacMapping(DeviceId deviceId, PortNumber portNumber, VlanId vlanId) {
         log.info("Deleting MAC mapping for: {} {} {}", deviceId, portNumber, vlanId);
         MacLearnerKey key = new MacLearnerKey(deviceId, portNumber, vlanId);
-        return removeFromMacAddressMap(key);
+        return removeFromMacAddressMap(key, true); // true: also vanish the host learned for this mapping
     }
 
     @Override
@@ -343,7 +383,7 @@
             log.warn("MAC mapping not found for deviceId: {} and portNumber: {}", deviceId, portNumber);
             return false;
         }
-        entriesToDelete.forEach(e -> removeFromMacAddressMap(e.getKey()));
+        entriesToDelete.forEach(e -> removeFromMacAddressMap(e.getKey(), true));
         return true;
     }
 
@@ -357,7 +397,7 @@
             log.warn("MAC mapping not found for deviceId: {}", deviceId);
             return false;
         }
-        entriesToDelete.forEach(e -> removeFromMacAddressMap(e.getKey()));
+        entriesToDelete.forEach(e -> removeFromMacAddressMap(e.getKey(), true));
         return true;
     }
 
@@ -410,6 +450,10 @@
         post(macLearnerEvent);
     }
 
+    private boolean isOltDevice(Device device) { // unknown device or missing manufacturer is treated as non-OLT
+        return device != null && device.manufacturer() != null && device.manufacturer().contains(OLT_MANUFACTURER_KEY);
+    }
+
     private class MacLearnerPacketProcessor implements PacketProcessor {
 
         @Override
@@ -422,8 +466,52 @@
                 return;
             }
 
-            PortNumber sourcePort = context.inPacket().receivedFrom().port();
-            DeviceId deviceId = context.inPacket().receivedFrom().deviceId();
+            ConnectPoint cp = context.inPacket().receivedFrom();
+            DeviceId deviceId = cp.deviceId();
+            PortNumber sourcePort = cp.port();
+            MacAddress srcMac = packet.getSourceMAC();
+            MacAddress dstMac = packet.getDestinationMAC();
+
+            Device device = deviceService.getDevice(deviceId);
+            if (!isOltDevice(device)) { // not handle non OLT device packets
+                log.debug("Packet received from non-OLT device: {}. Returning.", deviceId);
+                return;
+            }
+
+            if (srcMac.isBroadcast() || srcMac.isMulticast()) {
+                log.debug("Broadcast or multicast packet received from: {}. Returning.", cp);
+                return;
+            }
+
+            // Ignore location probes
+            if (dstMac.isOnos() && !MacAddress.NONE.equals(dstMac)) {
+                log.debug("Location probe. cp: {}", cp);
+                return;
+            }
+
+            // If this arrived on control port, bail out.
+            if (cp.port().isLogical()) {
+                log.debug("Packet received from logical port: {}", cp);
+                return;
+            }
+
+            // If this is not an edge port, bail out.
+            Topology topology = topologyService.currentTopology();
+            if (topologyService.isInfrastructure(topology, cp)) {
+                log.debug("Packet received from non-edge port: {}", cp);
+                return;
+            }
+
+            VlanId vlan = VlanId.vlanId(packet.getVlanID());
+            VlanId outerVlan = VlanId.vlanId(packet.getQinQVID());
+            VlanId innerVlan = VlanId.NONE;
+            EthType outerTpid = EthType.EtherType.UNKNOWN.ethType();
+            // Set up values for double-tagged hosts
+            if (outerVlan.toShort() != Ethernet.VLAN_UNTAGGED) {
+                innerVlan = vlan;
+                vlan = outerVlan;
+                outerTpid = EthType.EtherType.lookup(packet.getQinQTPID()).ethType();
+            }
 
             Versioned<Set<PortNumber>> ignoredPortsOfDevice = ignoredPortsMap.get(deviceId);
             if (ignoredPortsOfDevice != null && ignoredPortsOfDevice.value().contains(sourcePort)) {
@@ -438,11 +526,24 @@
                     UDP udpPacket = (UDP) ipv4Packet.getPayload();
                     int udpSourcePort = udpPacket.getSourcePort();
                     if ((udpSourcePort == UDP.DHCP_CLIENT_PORT) || (udpSourcePort == UDP.DHCP_SERVER_PORT)) {
+                        // Update host location
+                        HostLocation hloc = new HostLocation(cp, System.currentTimeMillis());
+                        HostLocation auxLocation = null;
+                        Optional<Link> optLink = linkService.getDeviceLinks(deviceId).stream().findFirst();
+                        if (optLink.isPresent()) {
+                            Link link = optLink.get();
+                            auxLocation = !link.src().deviceId().equals(deviceId) ?
+                                    new HostLocation(link.src(), System.currentTimeMillis()) :
+                                    new HostLocation(link.dst(), System.currentTimeMillis());
+                        } else {
+                            log.debug("Link not found for device {}", deviceId);
+                        }
+                        hostLocService.createOrUpdateHost(HostId.hostId(packet.getSourceMAC(), vlan),
+                                packet.getSourceMAC(), packet.getDestinationMAC(), vlan, innerVlan, outerTpid,
+                                hloc, auxLocation, null);
                         DHCP dhcpPayload = (DHCP) udpPacket.getPayload();
                         //This packet is dhcp.
-                        VlanId vlanId = packet.getQinQVID() != -1 ?
-                                VlanId.vlanId(packet.getQinQVID()) : VlanId.vlanId(packet.getVlanID());
-                        processDhcpPacket(context, packet, dhcpPayload, sourcePort, deviceId, vlanId);
+                        processDhcpPacket(context, packet, dhcpPayload, sourcePort, deviceId, vlan);
                     }
                 }
             }
@@ -469,6 +570,11 @@
             if (incomingPacketType.equals(DHCP.MsgType.DHCPDISCOVER) ||
                     incomingPacketType.equals(DHCP.MsgType.DHCPREQUEST)) {
                 addToMacAddressMap(deviceId, sourcePort, vlanId, packet.getSourceMAC());
+            } else if (incomingPacketType.equals(DHCP.MsgType.DHCPACK)) {
+                MacAddress hostMac = MacAddress.valueOf(dhcpPayload.getClientHardwareAddress());
+                VlanId hostVlan = VlanId.vlanId(packet.getVlanID());
+                HostId hostId = HostId.hostId(hostMac, hostVlan);
+                hostLocService.updateHostIp(hostId, IpAddress.valueOf(dhcpPayload.getYourIPAddress()));
             }
         }
 
@@ -495,7 +601,8 @@
                         portNumber,
                         vlanId,
                         prevMacAddress.value().getMacAddress());
-            } else if (prevMacAddress == null || !prevMacAddress.value().getMacAddress().equals(macAddress)) {
+            }
+            if (prevMacAddress == null || !prevMacAddress.value().getMacAddress().equals(macAddress)) {
                 // Not sending event for already mapped
                 log.info("Mapped MAC: {} for port: {} of deviceId: {} and vlanId: {}",
                         macAddress, portNumber, deviceId, vlanId);
@@ -505,7 +612,7 @@
 
     }
 
-    private MacDeleteResult removeFromMacAddressMap(MacLearnerKey macLearnerKey) {
+    private MacDeleteResult removeFromMacAddressMap(MacLearnerKey macLearnerKey, boolean vanishHost) {
         Versioned<MacLearnerValue> verMacAddress = macAddressMap.remove(macLearnerKey);
         if (verMacAddress != null) {
             log.info("Mapping removed. deviceId: {} portNumber: {} vlanId: {} macAddress: {}",
@@ -516,6 +623,9 @@
                     macLearnerKey.getPortNumber(),
                     macLearnerKey.getVlanId(),
                     verMacAddress.value().getMacAddress());
+            if (vanishHost) {
+                hostLocService.vanishHost(verMacAddress.value().getMacAddress(), macLearnerKey.getVlanId());
+            }
             return MacDeleteResult.SUCCESSFUL;
         } else {
             log.warn("MAC not removed, because mapping not found for deviceId: {} and portNumber: {} and vlanId: {}",
@@ -531,12 +641,18 @@
         @Override
         public void event(DeviceEvent event) {
             eventExecutor.execute(() -> {
+                Device device = event.subject();
+                log.debug("Device event received: {}", event.type());
                 switch (event.type()) {
                     case DEVICE_REMOVED:
-                        deleteMacMappings(event.subject().id());
+                        if (autoClearMacMapping) {
+                            deleteMacMappings(device.id());
+                        }
                         break;
                     case PORT_REMOVED:
-                        deleteMacMappings(event.subject().id(), event.port().number());
+                        if (autoClearMacMapping) {
+                            deleteMacMappings(device.id(), event.port().number());
+                        }
                         break;
                     default:
                         log.debug("Unhandled device event for Mac Learner: {}", event.type());
@@ -546,9 +662,21 @@
 
         @Override
         public boolean isRelevant(DeviceEvent event) {
-            return mastershipService.isLocalMaster(event.subject().id());
+            return isDeviceMine(event.subject().id()); // relevance is decided by consistent hashing of the device id
         }
 
     }
 
+    private class InternalClusterListener implements ClusterEventListener {
+        @Override
+        public void event(ClusterEvent event) {
+            final ClusterEvent.Type kind = event.type();
+            if (kind == ClusterEvent.Type.INSTANCE_READY) { // ready node: register it with the hasher
+                hasher.addServer(event.subject().id());
+            } else if (kind == ClusterEvent.Type.INSTANCE_DEACTIVATED) { // deactivated node: drop it
+                hasher.removeServer(event.subject().id());
+            }
+        }
+    }
+
 }