Enable operation in a multi-instance cluster
Change-Id: Ia384fbd972d8866f5dd893c523b5d43ef17e6458
diff --git a/api/src/main/java/org/opencord/dhcpl2relay/DhcpL2RelayService.java b/api/src/main/java/org/opencord/dhcpl2relay/DhcpL2RelayService.java
index 76957ab..6d3d16a 100644
--- a/api/src/main/java/org/opencord/dhcpl2relay/DhcpL2RelayService.java
+++ b/api/src/main/java/org/opencord/dhcpl2relay/DhcpL2RelayService.java
@@ -18,9 +18,18 @@
import org.onosproject.event.ListenerService;
+import java.util.Map;
+
/**
* DHCP L2 relay service.
*/
public interface DhcpL2RelayService extends
ListenerService<DhcpL2RelayEvent, DhcpL2RelayListener> {
+
+ /**
+ * Returns information about DHCP leases that have been allocated.
+ *
+ * @return map of subscriber ID to DHCP allocation information
+ */
+ Map<String, DhcpAllocationInfo> getAllocationInfo();
}
diff --git a/app/pom.xml b/app/pom.xml
index 3f77eb6..9341b25 100644
--- a/app/pom.xml
+++ b/app/pom.xml
@@ -49,6 +49,13 @@
</dependency>
<dependency>
+ <groupId>org.onosproject</groupId>
+ <artifactId>onos-core-serializers</artifactId>
+ <version>${onos.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
<groupId>org.opencord</groupId>
<artifactId>sadis-api</artifactId>
<version>${sadis.api.version}</version>
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/cli/DhcpL2RelayAllocationsCommand.java b/app/src/main/java/org/opencord/dhcpl2relay/cli/DhcpL2RelayAllocationsCommand.java
index 1e324aa..7d08737 100644
--- a/app/src/main/java/org/opencord/dhcpl2relay/cli/DhcpL2RelayAllocationsCommand.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/cli/DhcpL2RelayAllocationsCommand.java
@@ -18,7 +18,7 @@
import org.apache.karaf.shell.api.action.Command;
import org.apache.karaf.shell.api.action.lifecycle.Service;
import org.onosproject.cli.AbstractShellCommand;
-import org.opencord.dhcpl2relay.impl.DhcpL2Relay;
+import org.opencord.dhcpl2relay.DhcpL2RelayService;
/**
* Shows the Successful DHCP allocations relayed by the dhcpl2relay.
@@ -29,7 +29,9 @@
public class DhcpL2RelayAllocationsCommand extends AbstractShellCommand {
@Override
protected void doExecute() {
- DhcpL2Relay.allocationMap().forEach((key, value) -> {
+ DhcpL2RelayService service = get(DhcpL2RelayService.class);
+
+ service.getAllocationInfo().forEach((key, value) -> {
print("SubscriberId=%s,ConnectPoint=%s,State=%s,MAC=%s,CircuitId=%s" +
",IP Allocated=%s,Allocation Timestamp=%s",
key, value.location(), value.type(), value.macAddress().toString(), value.circuitId(),
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
index 2c1081b..2d17579 100755
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
@@ -15,33 +15,10 @@
*/
package org.opencord.dhcpl2relay.impl;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
-import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_MessageType;
-import static org.onlab.packet.MacAddress.valueOf;
-import static org.onlab.util.Tools.get;
-import static org.onlab.util.Tools.getIntegerProperty;
-import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
-
-import java.io.ByteArrayOutputStream;
-import java.nio.ByteBuffer;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Dictionary;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
import org.apache.commons.io.HexDump;
import org.onlab.packet.DHCP;
import org.onlab.packet.Ethernet;
@@ -52,9 +29,12 @@
import org.onlab.packet.UDP;
import org.onlab.packet.VlanId;
import org.onlab.packet.dhcp.DhcpOption;
+import org.onlab.util.KryoNamespace;
import org.onlab.util.SafeRecurringTask;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.event.AbstractListenerManager;
@@ -87,6 +67,11 @@
import org.onosproject.net.packet.PacketPriority;
import org.onosproject.net.packet.PacketProcessor;
import org.onosproject.net.packet.PacketService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
import org.opencord.dhcpl2relay.DhcpAllocationInfo;
import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
import org.opencord.dhcpl2relay.DhcpL2RelayListener;
@@ -106,7 +91,39 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.*;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_MessageType;
+import static org.onlab.packet.MacAddress.valueOf;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.getIntegerProperty;
+import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.DHCP_COUNTERS_TOPIC;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.DHCP_COUNTERS_TOPIC_DEFAULT;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.ENABLE_DHCP_BROADCAST_REPLIES;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.OPTION_82;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.OPTION_82_DEFAULT;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE_DEFAULT;
/**
* DHCP Relay Agent Application Component.
@@ -125,6 +142,7 @@
public static final String DHCP_L2RELAY_APP = "org.opencord.dhcpl2relay";
private static final String HOST_LOC_PROVIDER =
"org.onosproject.provider.host.impl.HostLocationProvider";
+ private static final String LEADER_TOPIC = "dhcpl2relay-leader";
private final Logger log = LoggerFactory.getLogger(getClass());
private final InternalConfigListener cfgListener =
new InternalConfigListener();
@@ -165,11 +183,20 @@
protected MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected FlowObjectiveService flowObjectiveService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DhcpL2RelayCountersStore dhcpL2RelayCounters;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
// OSGi Properties
/** Add option 82 to relayed packets. */
protected boolean option82 = OPTION_82_DEFAULT;
@@ -197,7 +224,7 @@
private MacAddress dhcpConnectMac = MacAddress.BROADCAST;
private ApplicationId appId;
- static Map<String, DhcpAllocationInfo> allocationMap = Maps.newConcurrentMap();
+ private ConsistentMap<String, DhcpAllocationInfo> allocations;
protected boolean modifyClientPktsSrcDstMac = false;
//Whether to use the uplink port of the OLTs to send/receive messages to the DHCP server
protected boolean useOltUplink = false;
@@ -214,6 +241,21 @@
componentConfigService.registerProperties(getClass());
eventDispatcher.addSink(DhcpL2RelayEvent.class, listenerRegistry);
+ KryoNamespace serializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(Instant.class)
+ .register(DHCP.MsgType.class)
+ .register(DhcpAllocationInfo.class)
+ .build();
+
+ allocations = storageService.<String, DhcpAllocationInfo>consistentMapBuilder()
+ .withName("dhcpl2relay-allocations")
+ .withSerializer(Serializer.using(serializer))
+ .withApplicationId(appId)
+ .build();
+
+ leadershipService.runForLeadership(LEADER_TOPIC);
+
cfgService.addListener(cfgListener);
mastershipService.addListener(changeListener);
deviceService.addListener(deviceListener);
@@ -253,6 +295,7 @@
deviceService.removeListener(deviceListener);
mastershipService.removeListener(changeListener);
eventDispatcher.removeSink(DhcpL2RelayEvent.class);
+ leadershipService.withdraw(LEADER_TOPIC);
log.info("DHCP-L2-RELAY Stopped");
}
@@ -307,6 +350,11 @@
}
}
+ @Override
+ public Map<String, DhcpAllocationInfo> getAllocationInfo() {
+ return ImmutableMap.copyOf(allocations.asJavaMap());
+ }
+
/**
* Publish the counters to kafka.
*/
@@ -319,7 +367,7 @@
new AbstractMap.SimpleEntry<String, AtomicLong>(counterKey.counterTypeKey.toString(),
counterValue), dhcpCountersTopic, null));
} else { // Publish the counters per subscriber
- DhcpAllocationInfo info = allocationMap.get(counterKey.counterClassKey);
+ DhcpAllocationInfo info = Versioned.valueOrNull(allocations.get(counterKey.counterClassKey));
post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE, info, null,
new AbstractMap.SimpleEntry<String, AtomicLong>(counterKey.counterTypeKey.toString(),
counterValue), dhcpCountersTopic, counterKey.counterClassKey));
@@ -556,10 +604,6 @@
appId, Optional.of(cp.deviceId()));
}
- public static Map<String, DhcpAllocationInfo> allocationMap() {
- return allocationMap;
- }
-
private SubscriberAndDeviceInformation getDevice(PacketContext context) {
String serialNo = deviceService.getDevice(context.inPacket().
receivedFrom().deviceId()).serialNumber();
@@ -860,7 +904,7 @@
context.inPacket().receivedFrom(), dhcpPacket.getPacketType(),
entry.circuitId(), clientMac, clientIp);
- allocationMap.put(entry.id(), info);
+ allocations.put(entry.id(), info);
post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.UPDATED, info,
context.inPacket().receivedFrom()));
@@ -918,8 +962,7 @@
//storeDHCPAllocationInfo
DhcpAllocationInfo info = new DhcpAllocationInfo(subsCp,
dhcpPayload.getPacketType(), entry.circuitId(), dstMac, ip);
- allocationMap.put(entry.id(), info);
- log.debug("DHCP Allocation Map {} is updated", allocationMap);
+ allocations.put(entry.id(), info);
post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.UPDATED, info, subsCp));
} // end storing of info
@@ -1077,6 +1120,14 @@
}
}
+ private void removeAllocations(Predicate<Map.Entry<String, Versioned<DhcpAllocationInfo>>> pred) {
+ allocations.stream()
+ .filter(pred)
+ .map(Map.Entry::getKey)
+ .collect(Collectors.toList())
+ .forEach(allocations::remove);
+ }
+
/**
* Handles Device status change for the devices which connect
* to the DHCP server.
@@ -1084,25 +1135,34 @@
private class InnerDeviceListener implements DeviceListener {
@Override
public void event(DeviceEvent event) {
+ // ensure only one instance handles the event
+ if (!Objects.equals(leadershipService.getLeader(LEADER_TOPIC), clusterService.getLocalNode().id())) {
+ return;
+ }
+
+ final DeviceId deviceId = event.subject().id();
+
switch (event.type()) {
+ case DEVICE_REMOVED:
+ log.info("Device removed {}", event.subject().id());
+ removeAllocations(e -> e.getValue().value().location().deviceId().equals(deviceId));
+ break;
case DEVICE_AVAILABILITY_CHANGED:
- log.info("Device Avail Changed {}", event.subject().id());
- DeviceId deviceId = event.subject().id();
- if (!deviceService.isAvailable(deviceId)) {
- log.warn("Device {} is not available ", deviceId);
- if (deviceService.getPorts(deviceId).isEmpty()) {
- allocationMap.entrySet().removeIf(entry -> deviceId.equals(entry.getValue().
- location().deviceId()));
- log.info("Device {} is removed from DHCP allocationmap ", deviceId);
- }
+ boolean available = deviceService.isAvailable(deviceId);
+ log.info("Device Avail Changed {} to {}", event.subject().id(), available);
+
+ if (!available && deviceService.getPorts(deviceId).isEmpty()) {
+ removeAllocations(e -> e.getValue().value().location().deviceId().equals(deviceId));
+ log.info("Device {} is removed from DHCP allocationmap ", deviceId);
}
break;
case PORT_REMOVED:
Port port = event.port();
- deviceId = event.subject().id();
log.info("Port {} is deleted on device {}", port, deviceId);
- allocationMap.entrySet().removeIf(entry -> port.number().equals(entry.getValue().
- location().port()) && deviceId.equals(entry.getValue().location().deviceId()));
+
+ ConnectPoint cp = new ConnectPoint(deviceId, port.number());
+ removeAllocations(e -> e.getValue().value().location().equals(cp));
+
log.info("Port {} on device {} is removed from DHCP allocationmap", event.port(), deviceId);
break;
default:
@@ -1147,7 +1207,7 @@
if (useOltUplink && isUplinkPortOfOlt(event.subject().id(), event.port())) {
requestDhcpPacketsFromConnectPoint(
new ConnectPoint(event.subject().id(), event.port().number()),
- Optional.ofNullable(null));
+ Optional.empty());
}
break;
default:
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java
index 94a90c8..d4cb8bd 100644
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java
@@ -19,21 +19,23 @@
import org.easymock.EasyMock;
import org.junit.Before;
import org.junit.Test;
-
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.*;
-
import org.onlab.junit.TestUtils;
import org.onlab.osgi.ComponentContextAdapter;
import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.common.event.impl.TestEventDispatcher;
+import org.onosproject.cluster.LeadershipServiceAdapter;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.config.Config;
import org.onosproject.net.config.NetworkConfigRegistryAdapter;
import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.onosproject.store.service.TestStorageService;
import java.util.Set;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
/**
* Tests for DHCP relay app configuration.
*/
@@ -62,6 +64,8 @@
dhcpL2Relay.sadisService = new MockSadisService();
dhcpL2Relay.hostService = new MockHostService();
dhcpL2Relay.mastershipService = new MockMastershipService();
+ dhcpL2Relay.storageService = new TestStorageService();
+ dhcpL2Relay.leadershipService = new LeadershipServiceAdapter();
TestUtils.setField(dhcpL2Relay, "eventDispatcher", new TestEventDispatcher());
dhcpL2Relay.activate(new ComponentContextAdapter());
}
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
index 32bb0d3..30f651f 100644
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
@@ -22,8 +22,9 @@
import org.onlab.junit.TestUtils;
import org.onlab.osgi.ComponentContextAdapter;
import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.common.event.impl.TestEventDispatcher;
+import org.onosproject.cluster.LeadershipServiceAdapter;
import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.onosproject.store.service.TestStorageService;
import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
import java.util.Iterator;
@@ -55,6 +56,8 @@
dhcpL2Relay.deviceService = new MockDeviceService();
dhcpL2Relay.sadisService = new MockSadisService();
dhcpL2Relay.mastershipService = new MockMastershipService();
+ dhcpL2Relay.storageService = new TestStorageService();
+ dhcpL2Relay.leadershipService = new LeadershipServiceAdapter();
TestUtils.setField(dhcpL2Relay, "eventDispatcher", new TestEventDispatcher());
dhcpL2Relay.activate(new ComponentContextAdapter());
store = new SimpleDhcpL2RelayCountersStore();
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
index 6d0524e..42664cc 100755
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
@@ -15,15 +15,7 @@
*/
package org.opencord.dhcpl2relay.impl;
-import static org.junit.Assert.assertEquals;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.collect.Lists;
import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
@@ -34,14 +26,22 @@
import org.onlab.packet.IPv4;
import org.onlab.packet.UDP;
import org.onlab.packet.dhcp.DhcpOption;
-
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.LeadershipServiceAdapter;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.onosproject.store.service.TestStorageService;
import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
import org.opencord.dhcpl2relay.impl.packet.DhcpOption82;
-import com.google.common.collect.Lists;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
public class DhcpL2RelayTest extends DhcpL2RelayTestBase {
@@ -67,6 +67,8 @@
dhcpL2Relay.hostService = new MockHostService();
dhcpL2Relay.mastershipService = new MockMastershipService();
dhcpL2Relay.dhcpL2RelayCounters = new MockDhcpL2RelayCountersStore();
+ dhcpL2Relay.storageService = new TestStorageService();
+ dhcpL2Relay.leadershipService = new LeadershipServiceAdapter();
TestUtils.setField(dhcpL2Relay, "eventDispatcher", new TestEventDispatcher());
dhcpL2Relay.refreshService = new MockExecutor(dhcpL2Relay.refreshService);
dhcpL2Relay.activate(new DhcpL2RelayTestBase.MockComponentContext());