[VOL-3472] Mac Learning App HostProvider Implementation
Change-Id: I196dadd89e5f894048233fb1c49a99f10658b644
diff --git a/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java b/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java
index 0a73e3e..df943e2 100644
--- a/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java
+++ b/app/src/main/java/org/opencord/maclearner/app/impl/MacLearnerManager.java
@@ -19,18 +19,33 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
+import org.onlab.packet.EthType;
+import org.onlab.packet.IpAddress;
import org.onlab.packet.VlanId;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEventListener;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
-import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.Device;
+import org.onosproject.net.HostId;
+import org.onosproject.net.HostLocation;
+import org.onosproject.net.Link;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
import org.onosproject.net.device.DeviceService;
+import org.onosproject.net.link.LinkService;
+import org.onosproject.net.topology.Topology;
+import org.onosproject.net.topology.TopologyService;
import org.onosproject.store.service.ConsistentMap;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.Versioned;
import org.opencord.maclearner.api.DefaultMacLearner;
+import org.opencord.maclearner.api.MacLearnerHostLocationService;
import org.opencord.maclearner.api.MacDeleteResult;
import org.opencord.maclearner.api.MacLearnerEvent;
import org.opencord.maclearner.api.MacLearnerKey;
@@ -70,6 +85,7 @@
import java.net.URI;
import java.util.Date;
import java.util.Dictionary;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
@@ -82,12 +98,13 @@
import java.util.stream.Collectors;
import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.stream.Collectors.toList;
import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_MessageType;
import static org.onlab.util.Tools.groupedThreads;
+import static org.opencord.maclearner.app.impl.OsgiPropertyConstants.AUTO_CLEAR_MAC_MAPPING;
+import static org.opencord.maclearner.app.impl.OsgiPropertyConstants.AUTO_CLEAR_MAC_MAPPING_DEFAULT;
import static org.opencord.maclearner.app.impl.OsgiPropertyConstants.CACHE_DURATION_DEFAULT;
import static org.opencord.maclearner.app.impl.OsgiPropertyConstants.CACHE_DURATION;
-import static org.opencord.maclearner.app.impl.OsgiPropertyConstants.ENABLE_DEVICE_LISTENER;
-import static org.opencord.maclearner.app.impl.OsgiPropertyConstants.ENABLE_DEVICE_LISTENER_DEFAULT;
import static org.osgi.service.component.annotations.ReferenceCardinality.MANDATORY;
/**
@@ -96,7 +113,7 @@
@Component(immediate = true,
property = {
CACHE_DURATION + ":Integer=" + CACHE_DURATION_DEFAULT,
- ENABLE_DEVICE_LISTENER + ":Boolean=" + ENABLE_DEVICE_LISTENER_DEFAULT
+ AUTO_CLEAR_MAC_MAPPING + ":Boolean=" + AUTO_CLEAR_MAC_MAPPING_DEFAULT
},
service = MacLearnerService.class
)
@@ -107,6 +124,7 @@
private static final String MAC_LEARNER_APP = "org.opencord.maclearner";
private static final String MAC_LEARNER = "maclearner";
+ private static final String OLT_MANUFACTURER_KEY = "VOLTHA";
private ApplicationId appId;
private final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
@@ -118,7 +136,7 @@
protected CoreService coreService;
@Reference(cardinality = MANDATORY)
- protected MastershipService mastershipService;
+ protected ClusterService clusterService;
@Reference(cardinality = MANDATORY)
protected DeviceService deviceService;
@@ -129,14 +147,26 @@
@Reference(cardinality = MANDATORY)
protected StorageService storageService;
+ @Reference(cardinality = MANDATORY)
+ protected TopologyService topologyService;
@Reference(cardinality = MANDATORY)
protected ComponentConfigService componentConfigService;
- private MacLearnerPacketProcessor macLearnerPacketProcessor =
+ @Reference(cardinality = MANDATORY)
+ protected MacLearnerHostLocationService hostLocService;
+
+ @Reference(cardinality = MANDATORY)
+ protected LinkService linkService;
+
+ private final MacLearnerPacketProcessor macLearnerPacketProcessor =
new MacLearnerPacketProcessor();
- private DeviceListener deviceListener = new InternalDeviceListener();
+ private final DeviceListener deviceListener = new InternalDeviceListener();
+ private final ClusterEventListener clusterListener = new InternalClusterListener();
+
+ private ConsistentHasher hasher;
+ public static final int HASH_WEIGHT = 10;
/**
* Minimum duration of mapping, mapping can be exist until 2*cacheDuration because of cleanerTimer fixed rate.
@@ -144,9 +174,9 @@
protected int cacheDurationSec = CACHE_DURATION_DEFAULT;
/**
- * Register a device event listener for removing mappings from MAC Address Map for removed events.
+ * Removes mappings from MAC Address Map for removed events.
*/
- protected boolean enableDeviceListener = ENABLE_DEVICE_LISTENER_DEFAULT;
+ protected boolean autoClearMacMapping = AUTO_CLEAR_MAC_MAPPING_DEFAULT;
private ConsistentMap<DeviceId, Set<PortNumber>> ignoredPortsMap;
private ConsistentMap<MacLearnerKey, MacLearnerValue> macAddressMap;
@@ -174,9 +204,13 @@
//mac learner must process the packet before director processors
packetService.addProcessor(macLearnerPacketProcessor,
PacketProcessor.advisor(2));
- if (enableDeviceListener) {
- deviceService.addListener(deviceListener);
- }
+ List<NodeId> readyNodes = clusterService.getNodes().stream()
+ .filter(c -> clusterService.getState(c.id()) == ControllerNode.State.READY)
+ .map(ControllerNode::id)
+ .collect(toList());
+ hasher = new ConsistentHasher(readyNodes, HASH_WEIGHT);
+ clusterService.addListener(clusterListener);
+ deviceService.addListener(deviceListener);
createSchedulerForClearMacMappings();
log.info("{} is started.", getClass().getSimpleName());
}
@@ -210,25 +244,42 @@
private void clearExpiredMacMappings() {
+ // Sweeps macAddressMap and removes every mapping of a locally handled device whose age exceeds cacheDurationSec.
Date curDate = new Date();
for (Map.Entry<MacLearnerKey, Versioned<MacLearnerValue>> entry : macAddressMap.entrySet()) {
- if (!mastershipService.isLocalMaster(entry.getKey().getDeviceId())) {
- return;
+ // An entry of a device handled by another instance is skipped; the sweep goes on with the next entry.
+ if (!isDeviceMine(entry.getKey().getDeviceId())) {
+ continue;
}
- if (curDate.getTime() - entry.getValue().value().getTimestamp() > cacheDurationSec * 1000) {
- removeFromMacAddressMap(entry.getKey());
+ // 1000L forces long arithmetic: as an int product, durations above ~24.8 days overflow to a negative
+ // threshold and every entry would be treated as expired.
+ if (curDate.getTime() - entry.getValue().value().getTimestamp() > cacheDurationSec * 1000L) {
+ // vanishHost is false: expiry drops the mapping only and does not ask hostLocService to vanish the host.
+ removeFromMacAddressMap(entry.getKey(), false);
}
}
}
+    /**
+     * Tells whether the local cluster node is the one the consistent hasher
+     * selects for the given device.
+     *
+     * @param id device ID
+     * @return true if the hashed node is the local node, otherwise false
+     */
+    private boolean isDeviceMine(DeviceId id) {
+        final NodeId owner = hasher.hash(id.toString());
+        // Parameterized logging already checks the level, so no explicit guard is needed.
+        log.debug("Node that will handle {} is {}", id, owner);
+        final NodeId localId = clusterService.getLocalNode().id();
+        return owner.equals(localId);
+    }
+
+ /**
+ * Stops the component: cancels the scheduled clean-up task and shuts down its executor, removes the
+ * packet processor, the cluster and device listeners and the event sink, shuts down the event
+ * executor and unregisters the component configuration properties.
+ */
@Deactivate
public void deactivate() {
if (scheduledFuture != null) {
scheduledFuture.cancel(true);
}
+ // Cancelling the future leaves the worker thread of the scheduler alive; shut the executor down as well
+ // so the thread does not outlive this component instance.
+ scheduledExecutorService.shutdown();
packetService.removeProcessor(macLearnerPacketProcessor);
- if (enableDeviceListener) {
- deviceService.removeListener(deviceListener);
- }
+ clusterService.removeListener(clusterListener);
+ deviceService.removeListener(deviceListener);
eventDispatcher.removeSink(MacLearnerEvent.class);
+ if (eventExecutor != null) {
+ eventExecutor.shutdown();
+ }
componentConfigService.unregisterProperties(getClass(), false);
log.info("{} is stopped.", getClass().getSimpleName());
}
@@ -244,17 +295,6 @@
setMacMappingCacheDuration(cacheDur);
}
}
-
- Boolean enableDevListener = Tools.isPropertyEnabled(properties, ENABLE_DEVICE_LISTENER);
- if (enableDevListener != null && enableDeviceListener != enableDevListener) {
- enableDeviceListener = enableDevListener;
- log.info("enableDeviceListener parameter changed to: {}", enableDeviceListener);
- if (this.enableDeviceListener) {
- deviceService.addListener(deviceListener);
- } else {
- deviceService.removeListener(deviceListener);
- }
- }
}
private Integer setMacMappingCacheDuration(Integer second) {
@@ -329,7 +369,7 @@
public MacDeleteResult deleteMacMapping(DeviceId deviceId, PortNumber portNumber, VlanId vlanId) {
log.info("Deleting MAC mapping for: {} {} {}", deviceId, portNumber, vlanId);
MacLearnerKey key = new MacLearnerKey(deviceId, portNumber, vlanId);
- return removeFromMacAddressMap(key);
+ return removeFromMacAddressMap(key, true);
}
@Override
@@ -343,7 +383,7 @@
log.warn("MAC mapping not found for deviceId: {} and portNumber: {}", deviceId, portNumber);
return false;
}
- entriesToDelete.forEach(e -> removeFromMacAddressMap(e.getKey()));
+ entriesToDelete.forEach(e -> removeFromMacAddressMap(e.getKey(), true));
return true;
}
@@ -357,7 +397,7 @@
log.warn("MAC mapping not found for deviceId: {}", deviceId);
return false;
}
- entriesToDelete.forEach(e -> removeFromMacAddressMap(e.getKey()));
+ entriesToDelete.forEach(e -> removeFromMacAddressMap(e.getKey(), true));
return true;
}
@@ -410,6 +450,10 @@
post(macLearnerEvent);
}
+    /**
+     * Tells whether the given device is an OLT, judged by its manufacturer string.
+     *
+     * @param device device to inspect; may be null when the device service does not know the device
+     * @return true if the manufacturer contains {@code OLT_MANUFACTURER_KEY}; false otherwise,
+     *         including for a null device or a null manufacturer
+     */
+    private boolean isOltDevice(Device device) {
+        // The packet processor passes the result of deviceService.getDevice(...) straight in,
+        // so a missing device must be answered with false rather than a NullPointerException.
+        if (device == null) {
+            return false;
+        }
+        String manufacturer = device.manufacturer();
+        return manufacturer != null && manufacturer.contains(OLT_MANUFACTURER_KEY);
+    }
+
private class MacLearnerPacketProcessor implements PacketProcessor {
@Override
@@ -422,8 +466,52 @@
return;
}
- PortNumber sourcePort = context.inPacket().receivedFrom().port();
- DeviceId deviceId = context.inPacket().receivedFrom().deviceId();
+ ConnectPoint cp = context.inPacket().receivedFrom();
+ DeviceId deviceId = cp.deviceId();
+ PortNumber sourcePort = cp.port();
+ MacAddress srcMac = packet.getSourceMAC();
+ MacAddress dstMac = packet.getDestinationMAC();
+
+ Device device = deviceService.getDevice(deviceId);
+ if (!isOltDevice(device)) { // not handle non OLT device packets
+ log.debug("Packet received from non-OLT device: {}. Returning.", deviceId);
+ return;
+ }
+
+ if (srcMac.isBroadcast() || srcMac.isMulticast()) {
+ log.debug("Broadcast or multicast packet received from: {}. Returning.", cp);
+ return;
+ }
+
+ // Ignore location probes
+ if (dstMac.isOnos() && !MacAddress.NONE.equals(dstMac)) {
+ log.debug("Location probe. cp: {}", cp);
+ return;
+ }
+
+ // If this arrived on control port, bail out.
+ if (cp.port().isLogical()) {
+ log.debug("Packet received from logical port: {}", cp);
+ return;
+ }
+
+ // If this is not an edge port, bail out.
+ Topology topology = topologyService.currentTopology();
+ if (topologyService.isInfrastructure(topology, cp)) {
+ log.debug("Packet received from non-edge port: {}", cp);
+ return;
+ }
+
+ VlanId vlan = VlanId.vlanId(packet.getVlanID());
+ VlanId outerVlan = VlanId.vlanId(packet.getQinQVID());
+ VlanId innerVlan = VlanId.NONE;
+ EthType outerTpid = EthType.EtherType.UNKNOWN.ethType();
+ // Set up values for double-tagged hosts
+ if (outerVlan.toShort() != Ethernet.VLAN_UNTAGGED) {
+ innerVlan = vlan;
+ vlan = outerVlan;
+ outerTpid = EthType.EtherType.lookup(packet.getQinQTPID()).ethType();
+ }
Versioned<Set<PortNumber>> ignoredPortsOfDevice = ignoredPortsMap.get(deviceId);
if (ignoredPortsOfDevice != null && ignoredPortsOfDevice.value().contains(sourcePort)) {
@@ -438,11 +526,24 @@
UDP udpPacket = (UDP) ipv4Packet.getPayload();
int udpSourcePort = udpPacket.getSourcePort();
if ((udpSourcePort == UDP.DHCP_CLIENT_PORT) || (udpSourcePort == UDP.DHCP_SERVER_PORT)) {
+ // Update host location
+ HostLocation hloc = new HostLocation(cp, System.currentTimeMillis());
+ HostLocation auxLocation = null;
+ Optional<Link> optLink = linkService.getDeviceLinks(deviceId).stream().findFirst();
+ if (optLink.isPresent()) {
+ Link link = optLink.get();
+ auxLocation = !link.src().deviceId().equals(deviceId) ?
+ new HostLocation(link.src(), System.currentTimeMillis()) :
+ new HostLocation(link.dst(), System.currentTimeMillis());
+ } else {
+ log.debug("Link not found for device {}", deviceId);
+ }
+ hostLocService.createOrUpdateHost(HostId.hostId(packet.getSourceMAC(), vlan),
+ packet.getSourceMAC(), packet.getDestinationMAC(), vlan, innerVlan, outerTpid,
+ hloc, auxLocation, null);
DHCP dhcpPayload = (DHCP) udpPacket.getPayload();
//This packet is dhcp.
- VlanId vlanId = packet.getQinQVID() != -1 ?
- VlanId.vlanId(packet.getQinQVID()) : VlanId.vlanId(packet.getVlanID());
- processDhcpPacket(context, packet, dhcpPayload, sourcePort, deviceId, vlanId);
+ processDhcpPacket(context, packet, dhcpPayload, sourcePort, deviceId, vlan);
}
}
}
@@ -469,6 +570,11 @@
if (incomingPacketType.equals(DHCP.MsgType.DHCPDISCOVER) ||
incomingPacketType.equals(DHCP.MsgType.DHCPREQUEST)) {
addToMacAddressMap(deviceId, sourcePort, vlanId, packet.getSourceMAC());
+ } else if (incomingPacketType.equals(DHCP.MsgType.DHCPACK)) {
+ MacAddress hostMac = MacAddress.valueOf(dhcpPayload.getClientHardwareAddress());
+ VlanId hostVlan = VlanId.vlanId(packet.getVlanID());
+ HostId hostId = HostId.hostId(hostMac, hostVlan);
+ hostLocService.updateHostIp(hostId, IpAddress.valueOf(dhcpPayload.getYourIPAddress()));
}
}
@@ -495,7 +601,8 @@
portNumber,
vlanId,
prevMacAddress.value().getMacAddress());
- } else if (prevMacAddress == null || !prevMacAddress.value().getMacAddress().equals(macAddress)) {
+ }
+ if (prevMacAddress == null || !prevMacAddress.value().getMacAddress().equals(macAddress)) {
// Not sending event for already mapped
log.info("Mapped MAC: {} for port: {} of deviceId: {} and vlanId: {}",
macAddress, portNumber, deviceId, vlanId);
@@ -505,7 +612,7 @@
}
- private MacDeleteResult removeFromMacAddressMap(MacLearnerKey macLearnerKey) {
+ private MacDeleteResult removeFromMacAddressMap(MacLearnerKey macLearnerKey, boolean vanishHost) {
Versioned<MacLearnerValue> verMacAddress = macAddressMap.remove(macLearnerKey);
if (verMacAddress != null) {
log.info("Mapping removed. deviceId: {} portNumber: {} vlanId: {} macAddress: {}",
@@ -516,6 +623,9 @@
macLearnerKey.getPortNumber(),
macLearnerKey.getVlanId(),
verMacAddress.value().getMacAddress());
+ if (vanishHost) {
+ hostLocService.vanishHost(verMacAddress.value().getMacAddress(), macLearnerKey.getVlanId());
+ }
return MacDeleteResult.SUCCESSFUL;
} else {
log.warn("MAC not removed, because mapping not found for deviceId: {} and portNumber: {} and vlanId: {}",
@@ -531,12 +641,18 @@
@Override
public void event(DeviceEvent event) {
eventExecutor.execute(() -> {
+ Device device = event.subject();
+ log.debug("Device event received: {}", event.type());
switch (event.type()) {
case DEVICE_REMOVED:
- deleteMacMappings(event.subject().id());
+ if (autoClearMacMapping) {
+ deleteMacMappings(device.id());
+ }
break;
case PORT_REMOVED:
- deleteMacMappings(event.subject().id(), event.port().number());
+ if (autoClearMacMapping) {
+ deleteMacMappings(device.id(), event.port().number());
+ }
break;
default:
log.debug("Unhandled device event for Mac Learner: {}", event.type());
@@ -546,9 +662,21 @@
@Override
public boolean isRelevant(DeviceEvent event) {
- return mastershipService.isLocalMaster(event.subject().id());
+ return isDeviceMine(event.subject().id());
}
}
+    /**
+     * Cluster listener that adds a node to the hasher when its instance becomes ready
+     * and removes it from the hasher when its instance is deactivated.
+     */
+    private class InternalClusterListener implements ClusterEventListener {
+        @Override
+        public void event(ClusterEvent event) {
+            final ClusterEvent.Type type = event.type();
+            if (type == ClusterEvent.Type.INSTANCE_READY) {
+                hasher.addServer(event.subject().id());
+            } else if (type == ClusterEvent.Type.INSTANCE_DEACTIVATED) {
+                hasher.removeServer(event.subject().id());
+            }
+            // Every other event type leaves the hasher untouched.
+        }
+    }
+
}