Enable operation in a multi-instance cluster
Change-Id: Ia384fbd972d8866f5dd893c523b5d43ef17e6458
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
index 2c1081b..2d17579 100755
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
@@ -15,33 +15,10 @@
*/
package org.opencord.dhcpl2relay.impl;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_MessageType;
-import static org.onlab.packet.MacAddress.valueOf;
-import static org.onlab.util.Tools.get;
-import static org.onlab.util.Tools.getIntegerProperty;
-import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
-
-import java.io.ByteArrayOutputStream;
-import java.nio.ByteBuffer;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Dictionary;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
import org.apache.commons.io.HexDump;
import org.onlab.packet.DHCP;
import org.onlab.packet.Ethernet;
@@ -52,9 +29,12 @@
import org.onlab.packet.UDP;
import org.onlab.packet.VlanId;
import org.onlab.packet.dhcp.DhcpOption;
+import org.onlab.util.KryoNamespace;
import org.onlab.util.SafeRecurringTask;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.event.AbstractListenerManager;
@@ -87,6 +67,11 @@
import org.onosproject.net.packet.PacketPriority;
import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
import org.opencord.dhcpl2relay.DhcpAllocationInfo;
import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
import org.opencord.dhcpl2relay.DhcpL2RelayListener;
@@ -106,7 +91,39 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.*;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_MessageType;
+import static org.onlab.packet.MacAddress.valueOf;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.getIntegerProperty;
+import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.DHCP_COUNTERS_TOPIC;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.DHCP_COUNTERS_TOPIC_DEFAULT;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.ENABLE_DHCP_BROADCAST_REPLIES;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.OPTION_82;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.OPTION_82_DEFAULT;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE_DEFAULT;
/**
* DHCP Relay Agent Application Component.
@@ -125,6 +142,7 @@
public static final String DHCP_L2RELAY_APP = "org.opencord.dhcpl2relay";
private static final String HOST_LOC_PROVIDER =
"org.onosproject.provider.host.impl.HostLocationProvider";
+ private static final String LEADER_TOPIC = "dhcpl2relay-leader";
private final Logger log = LoggerFactory.getLogger(getClass());
private final InternalConfigListener cfgListener =
new InternalConfigListener();
@@ -165,11 +183,20 @@
protected MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected FlowObjectiveService flowObjectiveService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DhcpL2RelayCountersStore dhcpL2RelayCounters;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
// OSGi Properties
/** Add option 82 to relayed packets. */
protected boolean option82 = OPTION_82_DEFAULT;
@@ -197,7 +224,7 @@
private MacAddress dhcpConnectMac = MacAddress.BROADCAST;
private ApplicationId appId;
- static Map<String, DhcpAllocationInfo> allocationMap = Maps.newConcurrentMap();
+ private ConsistentMap<String, DhcpAllocationInfo> allocations;
protected boolean modifyClientPktsSrcDstMac = false;
//Whether to use the uplink port of the OLTs to send/receive messages to the DHCP server
protected boolean useOltUplink = false;
@@ -214,6 +241,21 @@
componentConfigService.registerProperties(getClass());
eventDispatcher.addSink(DhcpL2RelayEvent.class, listenerRegistry);
+ KryoNamespace serializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(Instant.class)
+ .register(DHCP.MsgType.class)
+ .register(DhcpAllocationInfo.class)
+ .build();
+
+ allocations = storageService.<String, DhcpAllocationInfo>consistentMapBuilder()
+ .withName("dhcpl2relay-allocations")
+ .withSerializer(Serializer.using(serializer))
+ .withApplicationId(appId)
+ .build();
+
+ leadershipService.runForLeadership(LEADER_TOPIC);
+
cfgService.addListener(cfgListener);
mastershipService.addListener(changeListener);
deviceService.addListener(deviceListener);
@@ -253,6 +295,7 @@
deviceService.removeListener(deviceListener);
mastershipService.removeListener(changeListener);
eventDispatcher.removeSink(DhcpL2RelayEvent.class);
+ leadershipService.withdraw(LEADER_TOPIC);
log.info("DHCP-L2-RELAY Stopped");
}
@@ -307,6 +350,11 @@
}
}
+ @Override
+ public Map<String, DhcpAllocationInfo> getAllocationInfo() {
+ return ImmutableMap.copyOf(allocations.asJavaMap());
+ }
+
/**
* Publish the counters to kafka.
*/
@@ -319,7 +367,7 @@
new AbstractMap.SimpleEntry<String, AtomicLong>(counterKey.counterTypeKey.toString(),
counterValue), dhcpCountersTopic, null));
} else { // Publish the counters per subscriber
- DhcpAllocationInfo info = allocationMap.get(counterKey.counterClassKey);
+ DhcpAllocationInfo info = Versioned.valueOrNull(allocations.get(counterKey.counterClassKey));
post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE, info, null,
new AbstractMap.SimpleEntry<String, AtomicLong>(counterKey.counterTypeKey.toString(),
counterValue), dhcpCountersTopic, counterKey.counterClassKey));
@@ -556,10 +604,6 @@
appId, Optional.of(cp.deviceId()));
}
- public static Map<String, DhcpAllocationInfo> allocationMap() {
- return allocationMap;
- }
-
private SubscriberAndDeviceInformation getDevice(PacketContext context) {
String serialNo = deviceService.getDevice(context.inPacket().
receivedFrom().deviceId()).serialNumber();
@@ -860,7 +904,7 @@
context.inPacket().receivedFrom(), dhcpPacket.getPacketType(),
entry.circuitId(), clientMac, clientIp);
- allocationMap.put(entry.id(), info);
+ allocations.put(entry.id(), info);
post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.UPDATED, info,
context.inPacket().receivedFrom()));
@@ -918,8 +962,7 @@
//storeDHCPAllocationInfo
DhcpAllocationInfo info = new DhcpAllocationInfo(subsCp,
dhcpPayload.getPacketType(), entry.circuitId(), dstMac, ip);
- allocationMap.put(entry.id(), info);
- log.debug("DHCP Allocation Map {} is updated", allocationMap);
+ allocations.put(entry.id(), info);
post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.UPDATED, info, subsCp));
} // end storing of info
@@ -1077,6 +1120,14 @@
}
}
+ private void removeAllocations(Predicate<Map.Entry<String, Versioned<DhcpAllocationInfo>>> pred) {
+ allocations.stream()
+ .filter(pred)
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList())
+ .forEach(allocations::remove);
+ }
+
/**
* Handles Device status change for the devices which connect
* to the DHCP server.
@@ -1084,25 +1135,34 @@
private class InnerDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
+ // ensure only one instance handles the event
+ if (!Objects.equals(leadershipService.getLeader(LEADER_TOPIC), clusterService.getLocalNode().id())) {
+ return;
+ }
+
+ final DeviceId deviceId = event.subject().id();
+
switch (event.type()) {
+ case DEVICE_REMOVED:
+ log.info("Device removed {}", event.subject().id());
+ removeAllocations(e -> e.getValue().value().location().deviceId().equals(deviceId));
+ break;
case DEVICE_AVAILABILITY_CHANGED:
- log.info("Device Avail Changed {}", event.subject().id());
- DeviceId deviceId = event.subject().id();
- if (!deviceService.isAvailable(deviceId)) {
- log.warn("Device {} is not available ", deviceId);
- if (deviceService.getPorts(deviceId).isEmpty()) {
- allocationMap.entrySet().removeIf(entry -> deviceId.equals(entry.getValue().
- location().deviceId()));
- log.info("Device {} is removed from DHCP allocationmap ", deviceId);
- }
+ boolean available = deviceService.isAvailable(deviceId);
+ log.info("Device Avail Changed {} to {}", event.subject().id(), available);
+
+ if (!available && deviceService.getPorts(deviceId).isEmpty()) {
+ removeAllocations(e -> e.getValue().value().location().deviceId().equals(deviceId));
+ log.info("Device {} is removed from DHCP allocationmap ", deviceId);
}
break;
case PORT_REMOVED:
Port port = event.port();
- deviceId = event.subject().id();
log.info("Port {} is deleted on device {}", port, deviceId);
- allocationMap.entrySet().removeIf(entry -> port.number().equals(entry.getValue().
- location().port()) && deviceId.equals(entry.getValue().location().deviceId()));
+
+ ConnectPoint cp = new ConnectPoint(deviceId, port.number());
+ removeAllocations(e -> e.getValue().value().location().equals(cp));
+
log.info("Port {} on device {} is removed from DHCP allocationmap", event.port(), deviceId);
break;
default:
@@ -1147,7 +1207,7 @@
if (useOltUplink && isUplinkPortOfOlt(event.subject().id(), event.port())) {
requestDhcpPacketsFromConnectPoint(
new ConnectPoint(event.subject().id(), event.port().number()),
- Optional.ofNullable(null));
+ Optional.empty());
}
break;
default: