Enable operation in a multi-instance ONOS cluster.

Shared state has been moved to ONOS consistent maps to ensure it
is available throughout the cluster.

Event handling work (e.g. port up, etc) is partitioned between nodes
in the cluster using consistent hashing based on device ID.

Subscriber provisioning requests can be handled by any instance
(the instance that receives the request handles it).

Change-Id: I65cf24a7a7fe4397e1559e5d1c770449979f2566
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/app/src/main/java/org/opencord/olt/impl/Olt.java
index 50a8fd6..000d5e6 100644
--- a/app/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/app/src/main/java/org/opencord/olt/impl/Olt.java
@@ -16,14 +16,18 @@
 package org.opencord.olt.impl;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEventListener;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.Device;
@@ -39,6 +43,10 @@
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.meter.MeterId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMultimap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
 import org.opencord.olt.AccessDeviceEvent;
 import org.opencord.olt.AccessDeviceListener;
 import org.opencord.olt.AccessDeviceService;
@@ -60,23 +68,30 @@
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Dictionary;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
 import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.groupedThreads;
-import static org.opencord.olt.impl.OsgiPropertyConstants.*;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_BP_ID;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_BP_ID_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -95,6 +110,8 @@
     private static final short EAPOL_DEFAULT_VLAN = 4091;
     private static final String NO_UPLINK_PORT = "No uplink port found for OLT device {}";
 
+    public static final int HASH_WEIGHT = 10;
+
     private final Logger log = getLogger(getClass());
 
     private static final String NNI = "nni-";
@@ -103,9 +120,6 @@
     protected FlowObjectiveService flowObjectiveService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected MastershipService mastershipService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected DeviceService deviceService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -123,6 +137,12 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected AccessDeviceMeterService oltMeterService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
     /**
      * Default bandwidth profile id that is used for authentication trap flows.
      **/
@@ -134,6 +154,9 @@
     protected String multicastServiceName = DEFAULT_MCAST_SERVICE_NAME;
 
     private final DeviceListener deviceListener = new InternalDeviceListener();
+    private final ClusterEventListener clusterListener = new InternalClusterListener();
+
+    private ConsistentHasher hasher;
 
     protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
     private BaseInformationService<BandwidthProfileInformation> bpService;
@@ -144,7 +167,7 @@
 
     protected ExecutorService eventExecutor;
 
-    private Map<ConnectPoint, Set<UniTagInformation>> programmedSubs;
+    private ConsistentMultimap<ConnectPoint, UniTagInformation> programmedSubs;
 
     @Activate
     public void activate(ComponentContext context) {
@@ -163,18 +186,37 @@
                 .preSetProperty("org.onosproject.net.meter.impl.MeterManager",
                                 "purgeOnDisconnection", "true");
         componentConfigService.registerProperties(getClass());
-        programmedSubs = Maps.newConcurrentMap();
+
+        KryoNamespace serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(UniTagInformation.class)
+                .build();
+
+        programmedSubs = storageService.<ConnectPoint, UniTagInformation>consistentMultimapBuilder()
+                .withName("volt-programmed-subs")
+                .withSerializer(Serializer.using(serializer))
+                .withApplicationId(appId)
+                .build();
 
         eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
 
         subsService = sadisService.getSubscriberInfoService();
         bpService = sadisService.getBandwidthProfileService();
 
+        List<NodeId> readyNodes = clusterService.getNodes().stream()
+                .filter(c -> clusterService.getState(c.id()) == ControllerNode.State.READY)
+                .map(ControllerNode::id)
+                .collect(toList());
+        hasher = new ConsistentHasher(readyNodes, HASH_WEIGHT);
+        clusterService.addListener(clusterListener);
+
         // look for all provisioned devices in Sadis and create EAPOL flows for the
         // UNI ports
         Iterable<Device> devices = deviceService.getDevices();
         for (Device d : devices) {
-            checkAndCreateDeviceFlows(d);
+            if (isDeviceMine(d.id())) {
+                checkAndCreateDeviceFlows(d);
+            }
         }
 
         deviceService.addListener(deviceListener);
@@ -184,6 +226,7 @@
     @Deactivate
     public void deactivate() {
         componentConfigService.unregisterProperties(getClass(), false);
+        clusterService.removeListener(clusterListener);
         deviceService.removeListener(deviceListener);
         eventDispatcher.removeSink(AccessDeviceEvent.class);
         log.info("Stopped");
@@ -253,7 +296,7 @@
         DeviceId deviceId = connectPoint.deviceId();
         PortNumber subscriberPortNo = connectPoint.port();
 
-        Set<UniTagInformation> uniTagInformationSet = programmedSubs.get(connectPoint);
+        Collection<? extends UniTagInformation> uniTagInformationSet = programmedSubs.get(connectPoint).value();
         if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
             log.warn("Subscriber on connectionPoint {} was not previously programmed, " +
                              "no need to remove it", connectPoint);
@@ -373,7 +416,10 @@
 
     @Override
     public ImmutableMap<ConnectPoint, Set<UniTagInformation>> getProgSubs() {
-        return ImmutableMap.copyOf(programmedSubs);
+        return programmedSubs.stream()
+                .collect(collectingAndThen(
+                        groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
+                        ImmutableMap::copyOf));
     }
 
     @Override
@@ -562,19 +608,11 @@
     }
 
     private void updateProgrammedSubscriber(ConnectPoint connectPoint, UniTagInformation tagInformation, Boolean add) {
-        programmedSubs.compute(connectPoint, (k, v) -> {
-            if (add) {
-                if (v == null) {
-                    v = Sets.newHashSet();
-                }
-                v.add(tagInformation);
-            } else {
-                if (v != null) {
-                    v.remove(tagInformation);
-                }
-            }
-            return v;
-        });
+        if (add) {
+            programmedSubs.put(connectPoint, tagInformation);
+        } else {
+            programmedSubs.remove(connectPoint, tagInformation);
+        }
     }
 
     /**
@@ -768,10 +806,6 @@
      * @param dev Device to look for
      */
     private void checkAndCreateDeviceFlows(Device dev) {
-        // we create only for the ones we are master of
-        if (!mastershipService.isLocalMaster(dev.id())) {
-            return;
-        }
         // check if this device is provisioned in Sadis
         SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
         log.info("checkAndCreateDeviceFlows: deviceInfo {}", deviceInfo);
@@ -779,7 +813,7 @@
         if (deviceInfo != null) {
             // This is an OLT device as per Sadis, we create flows for UNI and NNI ports
             for (Port p : deviceService.getPorts(dev.id())) {
-                if (PortNumber.LOCAL.equals(p.number())) {
+                if (PortNumber.LOCAL.equals(p.number()) || !p.isEnabled()) {
                     continue;
                 }
                 if (isUniPort(dev, p)) {
@@ -871,6 +905,21 @@
         return subsService.get(devSerialNo);
     }
 
+    /**
+     * Determines if this instance should handle this device based on
+     * consistent hashing.
+     *
+     * @param id device ID
+     * @return true if this instance should handle the device, otherwise false
+     */
+    private boolean isDeviceMine(DeviceId id) {
+        NodeId nodeId = hasher.hash(id.toString());
+        if (log.isDebugEnabled()) {
+            log.debug("Node that will handle {} is {}", id, nodeId);
+        }
+        return nodeId.equals(clusterService.getLocalNode().id());
+    }
+
     private class InternalDeviceListener implements DeviceListener {
         private Set<DeviceId> programmedDevices = Sets.newConcurrentHashSet();
 
@@ -888,6 +937,11 @@
                     return;
                 }
 
+                // Only handle the event if the device belongs to us
+                if (!isDeviceMine(event.subject().id())) {
+                    return;
+                }
+
                 log.debug("OLT got {} event for {} {}", eventType, event.subject(), event.port());
 
                 if (getOltInfo(dev) == null) {
@@ -936,8 +990,8 @@
                         if (!isUniPort(dev, port)) {
                             break;
                         }
-                        Set<UniTagInformation> uniTagInformationSet = programmedSubs
-                                .get(new ConnectPoint(devId, port.number()));
+                        ConnectPoint cp = new ConnectPoint(devId, port.number());
+                        Collection<? extends UniTagInformation> uniTagInformationSet = programmedSubs.get(cp).value();
                         if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
                             if (port.isEnabled() && !port.number().equals(PortNumber.LOCAL)) {
                                 log.info("eapol will be processed for port updated {}", port);
@@ -988,6 +1042,7 @@
         private void handleDeviceDisconnection(Device device, boolean sendUniEvent) {
             programmedDevices.remove(device.id());
             removeAllSubscribers(device.id());
+            oltMeterService.clearMeters(device.id());
             post(new AccessDeviceEvent(
                     AccessDeviceEvent.Type.DEVICE_DISCONNECTED, device.id(),
                     null, null, null));
@@ -1008,12 +1063,25 @@
         }
 
         private void removeAllSubscribers(DeviceId deviceId) {
-            List<ConnectPoint> connectPoints = programmedSubs.keySet().stream()
-                    .filter(ks -> Objects.equals(ks.deviceId(), deviceId))
-                    .collect(Collectors.toList());
+            List<Map.Entry<ConnectPoint, UniTagInformation>> subs = programmedSubs.stream()
+                    .filter(e -> e.getKey().deviceId().equals(deviceId))
+                    .collect(toList());
 
-            connectPoints.forEach(cp -> programmedSubs.remove(cp));
+            subs.forEach(e -> programmedSubs.remove(e.getKey(), e.getValue()));
         }
 
     }
+
+    private class InternalClusterListener implements ClusterEventListener {
+
+        @Override
+        public void event(ClusterEvent event) {
+            if (event.type() == ClusterEvent.Type.INSTANCE_READY) {
+                hasher.addServer(event.subject().id());
+            }
+            if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED) {
+                hasher.removeServer(event.subject().id());
+            }
+        }
+    }
 }