Enable operation in a multi-instance cluster

Change-Id: Ia384fbd972d8866f5dd893c523b5d43ef17e6458
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
index 2c1081b..2d17579 100755
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
@@ -15,33 +15,10 @@
  */
 package org.opencord.dhcpl2relay.impl;
 
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
-import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_MessageType;
-import static org.onlab.packet.MacAddress.valueOf;
-import static org.onlab.util.Tools.get;
-import static org.onlab.util.Tools.getIntegerProperty;
-import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
-
-import java.io.ByteArrayOutputStream;
-import java.nio.ByteBuffer;
-import java.util.AbstractMap;
-import java.util.ArrayList;
-import java.util.Dictionary;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
 import org.apache.commons.io.HexDump;
 import org.onlab.packet.DHCP;
 import org.onlab.packet.Ethernet;
@@ -52,9 +29,12 @@
 import org.onlab.packet.UDP;
 import org.onlab.packet.VlanId;
 import org.onlab.packet.dhcp.DhcpOption;
+import org.onlab.util.KryoNamespace;
 import org.onlab.util.SafeRecurringTask;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.event.AbstractListenerManager;
@@ -87,6 +67,11 @@
 import org.onosproject.net.packet.PacketPriority;
 import org.onosproject.net.packet.PacketProcessor;
 import org.onosproject.net.packet.PacketService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.Versioned;
 import org.opencord.dhcpl2relay.DhcpAllocationInfo;
 import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
 import org.opencord.dhcpl2relay.DhcpL2RelayListener;
@@ -106,7 +91,39 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.*;
+import java.io.ByteArrayOutputStream;
+import java.nio.ByteBuffer;
+import java.time.Instant;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+
+import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_MessageType;
+import static org.onlab.packet.MacAddress.valueOf;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.getIntegerProperty;
+import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.DHCP_COUNTERS_TOPIC;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.DHCP_COUNTERS_TOPIC_DEFAULT;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.ENABLE_DHCP_BROADCAST_REPLIES;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.OPTION_82;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.OPTION_82_DEFAULT;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE_DEFAULT;
 
 /**
  * DHCP Relay Agent Application Component.
@@ -125,6 +142,7 @@
     public static final String DHCP_L2RELAY_APP = "org.opencord.dhcpl2relay";
     private static final String HOST_LOC_PROVIDER =
             "org.onosproject.provider.host.impl.HostLocationProvider";
+    private static final String LEADER_TOPIC = "dhcpl2relay-leader";
     private final Logger log = LoggerFactory.getLogger(getClass());
     private final InternalConfigListener cfgListener =
             new InternalConfigListener();
@@ -165,11 +183,20 @@
     protected MastershipService mastershipService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected FlowObjectiveService flowObjectiveService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected DhcpL2RelayCountersStore dhcpL2RelayCounters;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
     // OSGi Properties
     /** Add option 82 to relayed packets. */
     protected boolean option82 = OPTION_82_DEFAULT;
@@ -197,7 +224,7 @@
     private MacAddress dhcpConnectMac = MacAddress.BROADCAST;
     private ApplicationId appId;
 
-    static Map<String, DhcpAllocationInfo> allocationMap = Maps.newConcurrentMap();
+    private ConsistentMap<String, DhcpAllocationInfo> allocations;
     protected boolean modifyClientPktsSrcDstMac = false;
     //Whether to use the uplink port of the OLTs to send/receive messages to the DHCP server
     protected boolean useOltUplink = false;
@@ -214,6 +241,21 @@
         componentConfigService.registerProperties(getClass());
         eventDispatcher.addSink(DhcpL2RelayEvent.class, listenerRegistry);
 
+        KryoNamespace serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(Instant.class)
+                .register(DHCP.MsgType.class)
+                .register(DhcpAllocationInfo.class)
+                .build();
+
+        allocations = storageService.<String, DhcpAllocationInfo>consistentMapBuilder()
+                .withName("dhcpl2relay-allocations")
+                .withSerializer(Serializer.using(serializer))
+                .withApplicationId(appId)
+                .build();
+
+        leadershipService.runForLeadership(LEADER_TOPIC);
+
         cfgService.addListener(cfgListener);
         mastershipService.addListener(changeListener);
         deviceService.addListener(deviceListener);
@@ -253,6 +295,7 @@
         deviceService.removeListener(deviceListener);
         mastershipService.removeListener(changeListener);
         eventDispatcher.removeSink(DhcpL2RelayEvent.class);
+        leadershipService.withdraw(LEADER_TOPIC);
         log.info("DHCP-L2-RELAY Stopped");
     }
 
@@ -307,6 +350,11 @@
         }
     }
 
+    @Override
+    public Map<String, DhcpAllocationInfo> getAllocationInfo() {
+        return ImmutableMap.copyOf(allocations.asJavaMap()); // immutable copy: callers cannot modify the store
+    }
+
     /**
      * Publish the counters to kafka.
      */
@@ -319,7 +367,7 @@
                             new AbstractMap.SimpleEntry<String, AtomicLong>(counterKey.counterTypeKey.toString(),
                                     counterValue), dhcpCountersTopic, null));
                 } else { // Publish the counters per subscriber
-                    DhcpAllocationInfo info = allocationMap.get(counterKey.counterClassKey);
+                    DhcpAllocationInfo info = Versioned.valueOrNull(allocations.get(counterKey.counterClassKey));
                     post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE, info, null,
                             new AbstractMap.SimpleEntry<String, AtomicLong>(counterKey.counterTypeKey.toString(),
                                     counterValue), dhcpCountersTopic, counterKey.counterClassKey));
@@ -556,10 +604,6 @@
                 appId, Optional.of(cp.deviceId()));
     }
 
-    public static Map<String, DhcpAllocationInfo> allocationMap() {
-        return allocationMap;
-    }
-
     private SubscriberAndDeviceInformation getDevice(PacketContext context) {
         String serialNo = deviceService.getDevice(context.inPacket().
                 receivedFrom().deviceId()).serialNumber();
@@ -860,7 +904,7 @@
                     context.inPacket().receivedFrom(), dhcpPacket.getPacketType(),
                     entry.circuitId(), clientMac, clientIp);
 
-            allocationMap.put(entry.id(), info);
+            allocations.put(entry.id(), info);
 
             post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.UPDATED, info,
                                       context.inPacket().receivedFrom()));
@@ -918,8 +962,7 @@
                 //storeDHCPAllocationInfo
                 DhcpAllocationInfo info = new DhcpAllocationInfo(subsCp,
                         dhcpPayload.getPacketType(), entry.circuitId(), dstMac, ip);
-                allocationMap.put(entry.id(), info);
-                log.debug("DHCP Allocation Map {} is updated", allocationMap);
+                    allocations.put(entry.id(), info);
 
                 post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.UPDATED, info, subsCp));
             } // end storing of info
@@ -1077,6 +1120,14 @@
         }
     }
 
+    private void removeAllocations(Predicate<Map.Entry<String, Versioned<DhcpAllocationInfo>>> pred) {
+        // Gather the matching keys into a list first, then remove them one by one from the map.
+        List<String> matchingKeys = allocations.stream()
+                .filter(pred).map(Map.Entry::getKey)
+                .collect(Collectors.toList());
+        matchingKeys.forEach(allocations::remove);
+    }
+
     /**
      * Handles Device status change for the devices which connect
      * to the DHCP server.
@@ -1084,25 +1135,34 @@
     private class InnerDeviceListener implements DeviceListener {
         @Override
         public void event(DeviceEvent event) {
+            // ensure only one instance handles the event
+            if (!Objects.equals(leadershipService.getLeader(LEADER_TOPIC), clusterService.getLocalNode().id())) {
+                return;
+            }
+
+            final DeviceId deviceId = event.subject().id();
+
             switch (event.type()) {
+                case DEVICE_REMOVED:
+                    log.info("Device removed {}", event.subject().id());
+                    removeAllocations(e -> e.getValue().value().location().deviceId().equals(deviceId));
+                    break;
                 case DEVICE_AVAILABILITY_CHANGED:
-                    log.info("Device Avail Changed {}", event.subject().id());
-                    DeviceId deviceId = event.subject().id();
-                    if (!deviceService.isAvailable(deviceId)) {
-                        log.warn("Device {} is not available ", deviceId);
-                        if (deviceService.getPorts(deviceId).isEmpty()) {
-                            allocationMap.entrySet().removeIf(entry -> deviceId.equals(entry.getValue().
-                                    location().deviceId()));
-                            log.info("Device {} is removed from DHCP allocationmap ", deviceId);
-                        }
+                    boolean available = deviceService.isAvailable(deviceId);
+                    log.info("Device Avail Changed {} to {}", event.subject().id(), available);
+
+                    if (!available && deviceService.getPorts(deviceId).isEmpty()) {
+                        removeAllocations(e -> e.getValue().value().location().deviceId().equals(deviceId));
+                        log.info("Device {} is removed from DHCP allocationmap ", deviceId);
                     }
                     break;
                 case PORT_REMOVED:
                     Port port = event.port();
-                    deviceId = event.subject().id();
                     log.info("Port {} is deleted on device {}", port, deviceId);
-                    allocationMap.entrySet().removeIf(entry -> port.number().equals(entry.getValue().
-                            location().port()) && deviceId.equals(entry.getValue().location().deviceId()));
+
+                    ConnectPoint cp = new ConnectPoint(deviceId, port.number());
+                    removeAllocations(e -> e.getValue().value().location().equals(cp));
+
                     log.info("Port {} on device {} is removed from DHCP allocationmap", event.port(), deviceId);
                     break;
                 default:
@@ -1147,7 +1207,7 @@
                         if (useOltUplink && isUplinkPortOfOlt(event.subject().id(), event.port())) {
                             requestDhcpPacketsFromConnectPoint(
                                 new ConnectPoint(event.subject().id(), event.port().number()),
-                                        Optional.ofNullable(null));
+                                        Optional.empty());
                         }
                         break;
                     default: