Enable operation in a multi-instance cluster

Change-Id: Ia384fbd972d8866f5dd893c523b5d43ef17e6458
diff --git a/app/pom.xml b/app/pom.xml
index 3f77eb6..9341b25 100644
--- a/app/pom.xml
+++ b/app/pom.xml
@@ -49,6 +49,13 @@
         </dependency>
 
         <dependency>
+            <groupId>org.onosproject</groupId>
+            <artifactId>onos-core-serializers</artifactId>
+            <version>${onos.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
             <groupId>org.opencord</groupId>
             <artifactId>sadis-api</artifactId>
             <version>${sadis.api.version}</version>
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/cli/DhcpL2RelayAllocationsCommand.java b/app/src/main/java/org/opencord/dhcpl2relay/cli/DhcpL2RelayAllocationsCommand.java
index 1e324aa..7d08737 100644
--- a/app/src/main/java/org/opencord/dhcpl2relay/cli/DhcpL2RelayAllocationsCommand.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/cli/DhcpL2RelayAllocationsCommand.java
@@ -18,7 +18,7 @@
 import org.apache.karaf.shell.api.action.Command;
 import org.apache.karaf.shell.api.action.lifecycle.Service;
 import org.onosproject.cli.AbstractShellCommand;
-import org.opencord.dhcpl2relay.impl.DhcpL2Relay;
+import org.opencord.dhcpl2relay.DhcpL2RelayService;
 
 /**
  *  Shows the Successful DHCP allocations relayed by the dhcpl2relay.
@@ -29,7 +29,9 @@
 public class DhcpL2RelayAllocationsCommand extends AbstractShellCommand {
     @Override
     protected void doExecute() {
-        DhcpL2Relay.allocationMap().forEach((key, value) -> {
+        DhcpL2RelayService service = get(DhcpL2RelayService.class);
+
+        service.getAllocationInfo().forEach((key, value) -> {
             print("SubscriberId=%s,ConnectPoint=%s,State=%s,MAC=%s,CircuitId=%s" +
                             ",IP Allocated=%s,Allocation Timestamp=%s",
                     key, value.location(), value.type(), value.macAddress().toString(), value.circuitId(),
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
index 2c1081b..2d17579 100755
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
@@ -15,33 +15,10 @@
  */
 package org.opencord.dhcpl2relay.impl;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_MessageType;
-import static org.onlab.packet.MacAddress.valueOf;
-import static org.onlab.util.Tools.get;
-import static org.onlab.util.Tools.getIntegerProperty;
-import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
-
-import java.io.ByteArrayOutputStream;
-import java.nio.ByteBuffer;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Dictionary;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
 import org.apache.commons.io.HexDump;
 import org.onlab.packet.DHCP;
 import org.onlab.packet.Ethernet;
@@ -52,9 +29,12 @@
 import org.onlab.packet.UDP;
 import org.onlab.packet.VlanId;
 import org.onlab.packet.dhcp.DhcpOption;
+import org.onlab.util.KryoNamespace;
 import org.onlab.util.SafeRecurringTask;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.event.AbstractListenerManager;
@@ -87,6 +67,11 @@
 import org.onosproject.net.packet.PacketPriority;
 import org.onosproject.net.packet.PacketProcessor;
 import org.onosproject.net.packet.PacketService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
 import org.opencord.dhcpl2relay.DhcpAllocationInfo;
 import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
 import org.opencord.dhcpl2relay.DhcpL2RelayListener;
@@ -106,7 +91,39 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.*;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_MessageType;
+import static org.onlab.packet.MacAddress.valueOf;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.getIntegerProperty;
+import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.DHCP_COUNTERS_TOPIC;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.DHCP_COUNTERS_TOPIC_DEFAULT;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.ENABLE_DHCP_BROADCAST_REPLIES;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.OPTION_82;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.OPTION_82_DEFAULT;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE_DEFAULT;
 
 /**
  * DHCP Relay Agent Application Component.
@@ -125,6 +142,7 @@
     public static final String DHCP_L2RELAY_APP = "org.opencord.dhcpl2relay";
     private static final String HOST_LOC_PROVIDER =
             "org.onosproject.provider.host.impl.HostLocationProvider";
+    private static final String LEADER_TOPIC = "dhcpl2relay-leader";
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final InternalConfigListener cfgListener =
             new InternalConfigListener();
@@ -165,11 +183,20 @@
     protected MastershipService mastershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected FlowObjectiveService flowObjectiveService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected DhcpL2RelayCountersStore dhcpL2RelayCounters;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
     // OSGi Properties
     /** Add option 82 to relayed packets. */
     protected boolean option82 = OPTION_82_DEFAULT;
@@ -197,7 +224,7 @@
     private MacAddress dhcpConnectMac = MacAddress.BROADCAST;
     private ApplicationId appId;
 
-    static Map<String, DhcpAllocationInfo> allocationMap = Maps.newConcurrentMap();
+    private ConsistentMap<String, DhcpAllocationInfo> allocations;
     protected boolean modifyClientPktsSrcDstMac = false;
     //Whether to use the uplink port of the OLTs to send/receive messages to the DHCP server
     protected boolean useOltUplink = false;
@@ -214,6 +241,21 @@
         componentConfigService.registerProperties(getClass());
         eventDispatcher.addSink(DhcpL2RelayEvent.class, listenerRegistry);
 
+        KryoNamespace serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(Instant.class)
+                .register(DHCP.MsgType.class)
+                .register(DhcpAllocationInfo.class)
+                .build();
+
+        allocations = storageService.<String, DhcpAllocationInfo>consistentMapBuilder()
+                .withName("dhcpl2relay-allocations")
+                .withSerializer(Serializer.using(serializer))
+                .withApplicationId(appId)
+                .build();
+
+        leadershipService.runForLeadership(LEADER_TOPIC);
+
         cfgService.addListener(cfgListener);
         mastershipService.addListener(changeListener);
         deviceService.addListener(deviceListener);
@@ -253,6 +295,7 @@
         deviceService.removeListener(deviceListener);
         mastershipService.removeListener(changeListener);
         eventDispatcher.removeSink(DhcpL2RelayEvent.class);
+        leadershipService.withdraw(LEADER_TOPIC);
         log.info("DHCP-L2-RELAY Stopped");
     }
 
@@ -307,6 +350,11 @@
         }
     }
 
+    @Override
+    public Map<String, DhcpAllocationInfo> getAllocationInfo() {
+        return ImmutableMap.copyOf(allocations.asJavaMap());
+    }
+
     /**
      * Publish the counters to kafka.
      */
@@ -319,7 +367,7 @@
                             new AbstractMap.SimpleEntry<String, AtomicLong>(counterKey.counterTypeKey.toString(),
                                     counterValue), dhcpCountersTopic, null));
                 } else { // Publish the counters per subscriber
-                    DhcpAllocationInfo info = allocationMap.get(counterKey.counterClassKey);
+                    DhcpAllocationInfo info = Versioned.valueOrNull(allocations.get(counterKey.counterClassKey));
                     post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE, info, null,
                             new AbstractMap.SimpleEntry<String, AtomicLong>(counterKey.counterTypeKey.toString(),
                                     counterValue), dhcpCountersTopic, counterKey.counterClassKey));
@@ -556,10 +604,6 @@
                 appId, Optional.of(cp.deviceId()));
     }
 
-    public static Map<String, DhcpAllocationInfo> allocationMap() {
-        return allocationMap;
-    }
-
     private SubscriberAndDeviceInformation getDevice(PacketContext context) {
         String serialNo = deviceService.getDevice(context.inPacket().
                 receivedFrom().deviceId()).serialNumber();
@@ -860,7 +904,7 @@
                     context.inPacket().receivedFrom(), dhcpPacket.getPacketType(),
                     entry.circuitId(), clientMac, clientIp);
 
-            allocationMap.put(entry.id(), info);
+            allocations.put(entry.id(), info);
 
             post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.UPDATED, info,
                                       context.inPacket().receivedFrom()));
@@ -918,8 +962,7 @@
                 //storeDHCPAllocationInfo
                 DhcpAllocationInfo info = new DhcpAllocationInfo(subsCp,
                         dhcpPayload.getPacketType(), entry.circuitId(), dstMac, ip);
-                allocationMap.put(entry.id(), info);
-                log.debug("DHCP Allocation Map {} is updated", allocationMap);
+                    allocations.put(entry.id(), info);
 
                 post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.UPDATED, info, subsCp));
             } // end storing of info
@@ -1077,6 +1120,14 @@
         }
     }
 
+    private void removeAllocations(Predicate<Map.Entry<String, Versioned<DhcpAllocationInfo>>> pred) {
+        allocations.stream()
+                .filter(pred)
+                .map(Map.Entry::getKey)
+                .collect(Collectors.toList())
+                .forEach(allocations::remove);
+    }
+
     /**
      * Handles Device status change for the devices which connect
      * to the DHCP server.
@@ -1084,25 +1135,34 @@
     private class InnerDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
+            // ensure only one instance handles the event
+            if (!Objects.equals(leadershipService.getLeader(LEADER_TOPIC), clusterService.getLocalNode().id())) {
+                return;
+            }
+
+            final DeviceId deviceId = event.subject().id();
+
             switch (event.type()) {
+                case DEVICE_REMOVED:
+                    log.info("Device removed {}", event.subject().id());
+                    removeAllocations(e -> e.getValue().value().location().deviceId().equals(deviceId));
+                    break;
                 case DEVICE_AVAILABILITY_CHANGED:
-                    log.info("Device Avail Changed {}", event.subject().id());
-                    DeviceId deviceId = event.subject().id();
-                    if (!deviceService.isAvailable(deviceId)) {
-                        log.warn("Device {} is not available ", deviceId);
-                        if (deviceService.getPorts(deviceId).isEmpty()) {
-                            allocationMap.entrySet().removeIf(entry -> deviceId.equals(entry.getValue().
-                                    location().deviceId()));
-                            log.info("Device {} is removed from DHCP allocationmap ", deviceId);
-                        }
+                    boolean available = deviceService.isAvailable(deviceId);
+                    log.info("Device Avail Changed {} to {}", event.subject().id(), available);
+
+                    if (!available && deviceService.getPorts(deviceId).isEmpty()) {
+                        removeAllocations(e -> e.getValue().value().location().deviceId().equals(deviceId));
+                        log.info("Device {} is removed from DHCP allocationmap ", deviceId);
                     }
                     break;
                 case PORT_REMOVED:
                     Port port = event.port();
-                    deviceId = event.subject().id();
                     log.info("Port {} is deleted on device {}", port, deviceId);
-                    allocationMap.entrySet().removeIf(entry -> port.number().equals(entry.getValue().
-                            location().port()) && deviceId.equals(entry.getValue().location().deviceId()));
+
+                    ConnectPoint cp = new ConnectPoint(deviceId, port.number());
+                    removeAllocations(e -> e.getValue().value().location().equals(cp));
+
                     log.info("Port {} on device {} is removed from DHCP allocationmap", event.port(), deviceId);
                     break;
                 default:
@@ -1147,7 +1207,7 @@
                         if (useOltUplink && isUplinkPortOfOlt(event.subject().id(), event.port())) {
                             requestDhcpPacketsFromConnectPoint(
                                 new ConnectPoint(event.subject().id(), event.port().number()),
-                                        Optional.ofNullable(null));
+                                        Optional.empty());
                         }
                         break;
                     default:
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java
index 94a90c8..d4cb8bd 100644
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java
@@ -19,21 +19,23 @@
 import org.easymock.EasyMock;
 import org.junit.Before;
 import org.junit.Test;
-
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.*;
-
 import org.onlab.junit.TestUtils;
 import org.onlab.osgi.ComponentContextAdapter;
 import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.common.event.impl.TestEventDispatcher;
+import org.onosproject.cluster.LeadershipServiceAdapter;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.config.Config;
 import org.onosproject.net.config.NetworkConfigRegistryAdapter;
 import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.onosproject.store.service.TestStorageService;
 
 import java.util.Set;
 
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
 /**
  * Tests for DHCP relay app configuration.
  */
@@ -62,6 +64,8 @@
         dhcpL2Relay.sadisService = new MockSadisService();
         dhcpL2Relay.hostService = new MockHostService();
         dhcpL2Relay.mastershipService = new MockMastershipService();
+        dhcpL2Relay.storageService = new TestStorageService();
+        dhcpL2Relay.leadershipService = new LeadershipServiceAdapter();
         TestUtils.setField(dhcpL2Relay, "eventDispatcher", new TestEventDispatcher());
         dhcpL2Relay.activate(new ComponentContextAdapter());
     }
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
index 32bb0d3..30f651f 100644
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
@@ -22,8 +22,9 @@
 import org.onlab.junit.TestUtils;
 import org.onlab.osgi.ComponentContextAdapter;
 import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.common.event.impl.TestEventDispatcher;
+import org.onosproject.cluster.LeadershipServiceAdapter;
 import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.onosproject.store.service.TestStorageService;
 import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
 
 import java.util.Iterator;
@@ -55,6 +56,8 @@
         dhcpL2Relay.deviceService = new MockDeviceService();
         dhcpL2Relay.sadisService = new MockSadisService();
         dhcpL2Relay.mastershipService = new MockMastershipService();
+        dhcpL2Relay.storageService = new TestStorageService();
+        dhcpL2Relay.leadershipService = new LeadershipServiceAdapter();
         TestUtils.setField(dhcpL2Relay, "eventDispatcher", new TestEventDispatcher());
         dhcpL2Relay.activate(new ComponentContextAdapter());
         store = new SimpleDhcpL2RelayCountersStore();
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
index 6d0524e..42664cc 100755
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
@@ -15,15 +15,7 @@
  */
 package org.opencord.dhcpl2relay.impl;
 
-import static org.junit.Assert.assertEquals;
-
-import java.nio.ByteBuffer;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.collect.Lists;
 import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
@@ -34,14 +26,22 @@
 import org.onlab.packet.IPv4;
 import org.onlab.packet.UDP;
 import org.onlab.packet.dhcp.DhcpOption;
-
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.LeadershipServiceAdapter;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.onosproject.store.service.TestStorageService;
 import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
 import org.opencord.dhcpl2relay.impl.packet.DhcpOption82;
 
-import com.google.common.collect.Lists;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+
+import static org.junit.Assert.assertEquals;
 
 public class DhcpL2RelayTest extends DhcpL2RelayTestBase {
 
@@ -67,6 +67,8 @@
         dhcpL2Relay.hostService = new MockHostService();
         dhcpL2Relay.mastershipService = new MockMastershipService();
         dhcpL2Relay.dhcpL2RelayCounters = new MockDhcpL2RelayCountersStore();
+        dhcpL2Relay.storageService = new TestStorageService();
+        dhcpL2Relay.leadershipService = new LeadershipServiceAdapter();
         TestUtils.setField(dhcpL2Relay, "eventDispatcher", new TestEventDispatcher());
         dhcpL2Relay.refreshService = new MockExecutor(dhcpL2Relay.refreshService);
         dhcpL2Relay.activate(new DhcpL2RelayTestBase.MockComponentContext());