Enable operation in a multi-instance ONOS cluster.

Shared state has been moved to ONOS consistent maps to ensure it
is available throughout the cluster.

Event handling work (e.g. port up, etc) is partitioned between nodes
in the cluster using consistent hashing based on device ID.

Subscriber provisioning requests can be handled by any instance
(the instance that receives the request handles it).

Change-Id: I65cf24a7a7fe4397e1559e5d1c770449979f2566
diff --git a/api/pom.xml b/api/pom.xml
index cb10179..b4d81dd 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -38,6 +38,7 @@
             <groupId>org.opencord</groupId>
             <artifactId>sadis-api</artifactId>
             <version>${sadis.api.version}</version>
+            <scope>provided</scope>
         </dependency>
     </dependencies>
 
diff --git a/app/pom.xml b/app/pom.xml
index 02f912d..287ab4d 100644
--- a/app/pom.xml
+++ b/app/pom.xml
@@ -72,6 +72,14 @@
 
         <dependency>
             <groupId>org.onosproject</groupId>
+            <artifactId>onos-api</artifactId>
+            <version>${onos.version}</version>
+            <classifier>tests</classifier>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.onosproject</groupId>
             <artifactId>onos-core-serializers</artifactId>
             <version>${onos.version}</version>
             <scope>provided</scope>
diff --git a/app/src/main/java/org/opencord/olt/impl/ConsistentHasher.java b/app/src/main/java/org/opencord/olt/impl/ConsistentHasher.java
new file mode 100644
index 0000000..52e9b96
--- /dev/null
+++ b/app/src/main/java/org/opencord/olt/impl/ConsistentHasher.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import com.google.common.hash.Hashing;
+import org.onosproject.cluster.NodeId;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Returns a server hosting a given key based on consistent hashing.
+ */
+public class ConsistentHasher {
+
+    private static class Entry implements Comparable<Entry> {
+        private final NodeId server;
+        private final int hash;
+
+        public Entry(NodeId server, int hash) {
+            this.server = server;
+            this.hash = hash;
+        }
+
+        public Entry(int hash) {
+            server = null;
+            this.hash = hash;
+        }
+
+        @Override
+        public int compareTo(Entry o) {
+            if (this.hash > o.hash) {
+                return 1;
+            } else if (this.hash < o.hash) {
+                return -1;
+            } // else
+            return 0;
+        }
+    }
+
+    private final int weight;
+
+    private List<Entry> table;
+
+    /**
+     * Creates a new hasher with the given list of servers.
+     *
+     * @param servers list of servers
+     * @param weight weight
+     */
+    public ConsistentHasher(List<NodeId> servers, int weight) {
+        this.weight = weight;
+
+        this.table = new ArrayList<>();
+
+        servers.forEach(this::addServer);
+    }
+
+    /**
+     * Adds a new server to the list of servers.
+     *
+     * @param server server ID
+     */
+    public synchronized void addServer(NodeId server) {
+        // Add weight number of buckets for each server
+        for (int i = 0; i < weight; i++) {
+            String label = server.toString() + i;
+            int hash = getHash(label);
+            Entry e = new Entry(server, hash);
+            table.add(e);
+        }
+
+        Collections.sort(table);
+    }
+
+    /**
+     * Removes a server from the list of servers.
+     *
+     * @param server server ID
+     */
+    public synchronized void removeServer(NodeId server) {
+        table.removeIf(e -> e.server.equals(server));
+    }
+
+    /**
+     * Hashes a given input string and finds that server that should
+     * handle the given ID.
+     *
+     * @param s input
+     * @return server ID
+     */
+    public synchronized NodeId hash(String s) {
+        Entry temp = new Entry(getHash(s));
+
+        int pos = Collections.binarySearch(this.table, temp);
+
+        // translate a negative not-found result into the closest following match
+        if (pos < 0) {
+            pos = Math.abs(pos + 1);
+        }
+
+        // wraparound if the hash was after the last entry in the table
+        if (pos == this.table.size()) {
+            pos = 0;
+        }
+
+        return table.get(pos).server;
+    }
+
+    private int getHash(String s) {
+        // stable, uniformly-distributed hash
+        return Hashing.murmur3_128().hashString(s, Charset.defaultCharset()).asInt();
+    }
+}
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/app/src/main/java/org/opencord/olt/impl/Olt.java
index 50a8fd6..000d5e6 100644
--- a/app/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/app/src/main/java/org/opencord/olt/impl/Olt.java
@@ -16,14 +16,18 @@
 package org.opencord.olt.impl;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEventListener;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.Device;
@@ -39,6 +43,10 @@
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.meter.MeterId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMultimap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
 import org.opencord.olt.AccessDeviceEvent;
 import org.opencord.olt.AccessDeviceListener;
 import org.opencord.olt.AccessDeviceService;
@@ -60,23 +68,30 @@
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Dictionary;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
 import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.groupedThreads;
-import static org.opencord.olt.impl.OsgiPropertyConstants.*;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_BP_ID;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_BP_ID_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -95,6 +110,8 @@
     private static final short EAPOL_DEFAULT_VLAN = 4091;
     private static final String NO_UPLINK_PORT = "No uplink port found for OLT device {}";
 
+    public static final int HASH_WEIGHT = 10;
+
     private final Logger log = getLogger(getClass());
 
     private static final String NNI = "nni-";
@@ -103,9 +120,6 @@
     protected FlowObjectiveService flowObjectiveService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected MastershipService mastershipService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected DeviceService deviceService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -123,6 +137,12 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected AccessDeviceMeterService oltMeterService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
     /**
      * Default bandwidth profile id that is used for authentication trap flows.
      **/
@@ -134,6 +154,9 @@
     protected String multicastServiceName = DEFAULT_MCAST_SERVICE_NAME;
 
     private final DeviceListener deviceListener = new InternalDeviceListener();
+    private final ClusterEventListener clusterListener = new InternalClusterListener();
+
+    private ConsistentHasher hasher;
 
     protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
     private BaseInformationService<BandwidthProfileInformation> bpService;
@@ -144,7 +167,7 @@
 
     protected ExecutorService eventExecutor;
 
-    private Map<ConnectPoint, Set<UniTagInformation>> programmedSubs;
+    private ConsistentMultimap<ConnectPoint, UniTagInformation> programmedSubs;
 
     @Activate
     public void activate(ComponentContext context) {
@@ -163,18 +186,37 @@
                 .preSetProperty("org.onosproject.net.meter.impl.MeterManager",
                                 "purgeOnDisconnection", "true");
         componentConfigService.registerProperties(getClass());
-        programmedSubs = Maps.newConcurrentMap();
+
+        KryoNamespace serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(UniTagInformation.class)
+                .build();
+
+        programmedSubs = storageService.<ConnectPoint, UniTagInformation>consistentMultimapBuilder()
+                .withName("volt-programmed-subs")
+                .withSerializer(Serializer.using(serializer))
+                .withApplicationId(appId)
+                .build();
 
         eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
 
         subsService = sadisService.getSubscriberInfoService();
         bpService = sadisService.getBandwidthProfileService();
 
+        List<NodeId> readyNodes = clusterService.getNodes().stream()
+                .filter(c -> clusterService.getState(c.id()) == ControllerNode.State.READY)
+                .map(ControllerNode::id)
+                .collect(toList());
+        hasher = new ConsistentHasher(readyNodes, HASH_WEIGHT);
+        clusterService.addListener(clusterListener);
+
         // look for all provisioned devices in Sadis and create EAPOL flows for the
         // UNI ports
         Iterable<Device> devices = deviceService.getDevices();
         for (Device d : devices) {
-            checkAndCreateDeviceFlows(d);
+            if (isDeviceMine(d.id())) {
+                checkAndCreateDeviceFlows(d);
+            }
         }
 
         deviceService.addListener(deviceListener);
@@ -184,6 +226,7 @@
     @Deactivate
     public void deactivate() {
         componentConfigService.unregisterProperties(getClass(), false);
+        clusterService.removeListener(clusterListener);
         deviceService.removeListener(deviceListener);
         eventDispatcher.removeSink(AccessDeviceEvent.class);
         log.info("Stopped");
@@ -253,7 +296,7 @@
         DeviceId deviceId = connectPoint.deviceId();
         PortNumber subscriberPortNo = connectPoint.port();
 
-        Set<UniTagInformation> uniTagInformationSet = programmedSubs.get(connectPoint);
+        Collection<? extends UniTagInformation> uniTagInformationSet = programmedSubs.get(connectPoint).value();
         if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
             log.warn("Subscriber on connectionPoint {} was not previously programmed, " +
                              "no need to remove it", connectPoint);
@@ -373,7 +416,10 @@
 
     @Override
     public ImmutableMap<ConnectPoint, Set<UniTagInformation>> getProgSubs() {
-        return ImmutableMap.copyOf(programmedSubs);
+        return programmedSubs.stream()
+                .collect(collectingAndThen(
+                        groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
+                        ImmutableMap::copyOf));
     }
 
     @Override
@@ -562,19 +608,11 @@
     }
 
     private void updateProgrammedSubscriber(ConnectPoint connectPoint, UniTagInformation tagInformation, Boolean add) {
-        programmedSubs.compute(connectPoint, (k, v) -> {
-            if (add) {
-                if (v == null) {
-                    v = Sets.newHashSet();
-                }
-                v.add(tagInformation);
-            } else {
-                if (v != null) {
-                    v.remove(tagInformation);
-                }
-            }
-            return v;
-        });
+        if (add) {
+            programmedSubs.put(connectPoint, tagInformation);
+        } else {
+            programmedSubs.remove(connectPoint, tagInformation);
+        }
     }
 
     /**
@@ -768,10 +806,6 @@
      * @param dev Device to look for
      */
     private void checkAndCreateDeviceFlows(Device dev) {
-        // we create only for the ones we are master of
-        if (!mastershipService.isLocalMaster(dev.id())) {
-            return;
-        }
         // check if this device is provisioned in Sadis
         SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
         log.info("checkAndCreateDeviceFlows: deviceInfo {}", deviceInfo);
@@ -779,7 +813,7 @@
         if (deviceInfo != null) {
             // This is an OLT device as per Sadis, we create flows for UNI and NNI ports
             for (Port p : deviceService.getPorts(dev.id())) {
-                if (PortNumber.LOCAL.equals(p.number())) {
+                if (PortNumber.LOCAL.equals(p.number()) || !p.isEnabled()) {
                     continue;
                 }
                 if (isUniPort(dev, p)) {
@@ -871,6 +905,21 @@
         return subsService.get(devSerialNo);
     }
 
+    /**
+     * Determines if this instance should handle this device based on
+     * consistent hashing.
+     *
+     * @param id device ID
+     * @return true if this instance should handle the device, otherwise false
+     */
+    private boolean isDeviceMine(DeviceId id) {
+        NodeId nodeId = hasher.hash(id.toString());
+        if (log.isDebugEnabled()) {
+            log.debug("Node that will handle {} is {}", id, nodeId);
+        }
+        return nodeId.equals(clusterService.getLocalNode().id());
+    }
+
     private class InternalDeviceListener implements DeviceListener {
         private Set<DeviceId> programmedDevices = Sets.newConcurrentHashSet();
 
@@ -888,6 +937,11 @@
                     return;
                 }
 
+                // Only handle the event if the device belongs to us
+                if (!isDeviceMine(event.subject().id())) {
+                    return;
+                }
+
                 log.debug("OLT got {} event for {} {}", eventType, event.subject(), event.port());
 
                 if (getOltInfo(dev) == null) {
@@ -936,8 +990,8 @@
                         if (!isUniPort(dev, port)) {
                             break;
                         }
-                        Set<UniTagInformation> uniTagInformationSet = programmedSubs
-                                .get(new ConnectPoint(devId, port.number()));
+                        ConnectPoint cp = new ConnectPoint(devId, port.number());
+                        Collection<? extends UniTagInformation> uniTagInformationSet = programmedSubs.get(cp).value();
                         if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
                             if (port.isEnabled() && !port.number().equals(PortNumber.LOCAL)) {
                                 log.info("eapol will be processed for port updated {}", port);
@@ -988,6 +1042,7 @@
         private void handleDeviceDisconnection(Device device, boolean sendUniEvent) {
             programmedDevices.remove(device.id());
             removeAllSubscribers(device.id());
+            oltMeterService.clearMeters(device.id());
             post(new AccessDeviceEvent(
                     AccessDeviceEvent.Type.DEVICE_DISCONNECTED, device.id(),
                     null, null, null));
@@ -1008,12 +1063,25 @@
         }
 
         private void removeAllSubscribers(DeviceId deviceId) {
-            List<ConnectPoint> connectPoints = programmedSubs.keySet().stream()
-                    .filter(ks -> Objects.equals(ks.deviceId(), deviceId))
-                    .collect(Collectors.toList());
+            List<Map.Entry<ConnectPoint, UniTagInformation>> subs = programmedSubs.stream()
+                    .filter(e -> e.getKey().deviceId().equals(deviceId))
+                    .collect(toList());
 
-            connectPoints.forEach(cp -> programmedSubs.remove(cp));
+            subs.forEach(e -> programmedSubs.remove(e.getKey(), e.getValue()));
         }
 
     }
+
+    private class InternalClusterListener implements ClusterEventListener {
+
+        @Override
+        public void event(ClusterEvent event) {
+            if (event.type() == ClusterEvent.Type.INSTANCE_READY) {
+                hasher.addServer(event.subject().id());
+            }
+            if (event.type() == ClusterEvent.Type.INSTANCE_DEACTIVATED) {
+                hasher.removeServer(event.subject().id());
+            }
+        }
+    }
 }
diff --git a/app/src/main/java/org/opencord/olt/impl/OltFlowService.java b/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
index bf49483..af8ba4a 100644
--- a/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -380,14 +380,6 @@
             return;
         }
 
-        if (!mastershipService.isLocalMaster(devId)) {
-            log.warn("The master of the device {} is another instance", devId);
-            if (filterFuture != null) {
-                filterFuture.complete(ObjectiveError.DEVICEMISSING);
-            }
-            return;
-        }
-
         BandwidthProfileInformation bpInfo = getBandwidthProfileInformation(bpId);
         if (bpInfo == null) {
             log.warn("Bandwidth profile {} is not found. Authentication flow"
diff --git a/app/src/main/java/org/opencord/olt/impl/OltMeterService.java b/app/src/main/java/org/opencord/olt/impl/OltMeterService.java
index 825604c..7a3a3a3 100644
--- a/app/src/main/java/org/opencord/olt/impl/OltMeterService.java
+++ b/app/src/main/java/org/opencord/olt/impl/OltMeterService.java
@@ -15,13 +15,10 @@
  */
 package org.opencord.olt.impl;
 
-import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.SetMultimap;
-import com.google.common.collect.Sets;
+import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.ApplicationId;
@@ -40,6 +37,10 @@
 import org.onosproject.net.meter.MeterListener;
 import org.onosproject.net.meter.MeterRequest;
 import org.onosproject.net.meter.MeterService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMultimap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
 import org.opencord.olt.internalapi.AccessDeviceMeterService;
 import org.opencord.sadis.BandwidthProfileInformation;
 import org.osgi.service.component.ComponentContext;
@@ -54,18 +55,21 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Dictionary;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toSet;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.opencord.olt.impl.OsgiPropertyConstants.DELETE_METERS;
 import static org.opencord.olt.impl.OsgiPropertyConstants.DELETE_METERS_DEFAULT;
@@ -88,11 +92,13 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected ComponentConfigService componentConfigService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
     protected boolean deleteMeters = true;
 
-    protected SetMultimap<String, MeterKey> bpInfoToMeter =
-            Multimaps.synchronizedSetMultimap(HashMultimap.create());
-    protected Set<MeterKey> programmedMeters;
+    ConsistentMultimap<String, MeterKey> bpInfoToMeter;
+
     private ApplicationId appId;
     private static final String APP_NAME = "org.opencord.olt";
 
@@ -107,7 +113,19 @@
         eventExecutor = Executors.newFixedThreadPool(5, groupedThreads("onos/olt",
                 "events-%d", log));
         appId = coreService.registerApplication(APP_NAME);
-        programmedMeters = Sets.newConcurrentHashSet();
+        modified(context);
+
+        KryoNamespace serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(MeterKey.class)
+                .build();
+
+        bpInfoToMeter = storageService.<String, MeterKey>consistentMultimapBuilder()
+                .withName("volt-bp-info-to-meter")
+                .withSerializer(Serializer.using(serializer))
+                .withApplicationId(appId)
+                .build();
+
         meterService.addListener(meterListener);
         componentConfigService.registerProperties(getClass());
         log.info("Olt Meter service started");
@@ -131,23 +149,25 @@
 
     @Override
     public ImmutableMap<String, Collection<MeterKey>> getBpMeterMappings() {
-        return ImmutableMap.copyOf(bpInfoToMeter.asMap());
+        return bpInfoToMeter.stream()
+                .collect(collectingAndThen(
+                        groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
+                        ImmutableMap::copyOf));
     }
 
-    @Override
-    public void addMeterIdToBpMapping(DeviceId deviceId, MeterId meterId, String bandwidthProfile) {
+    void addMeterIdToBpMapping(DeviceId deviceId, MeterId meterId, String bandwidthProfile) {
         bpInfoToMeter.put(bandwidthProfile, MeterKey.key(deviceId, meterId));
     }
 
     @Override
     public MeterId getMeterIdFromBpMapping(DeviceId deviceId, String bandwidthProfile) {
-        if (bpInfoToMeter.get(bandwidthProfile) == null) {
+        if (bpInfoToMeter.get(bandwidthProfile).value().isEmpty()) {
             log.warn("Bandwidth Profile '{}' is not currently mapped to a meter",
                     bandwidthProfile);
             return null;
         }
 
-        Optional<MeterKey> meterKeyForDevice = bpInfoToMeter.get(bandwidthProfile)
+        Optional<? extends MeterKey> meterKeyForDevice = bpInfoToMeter.get(bandwidthProfile).value()
                 .stream()
                 .filter(meterKey -> meterKey.deviceId().equals(deviceId))
                 .findFirst();
@@ -164,7 +184,9 @@
 
     @Override
     public ImmutableSet<MeterKey> getProgMeters() {
-        return ImmutableSet.copyOf(programmedMeters);
+        return bpInfoToMeter.stream()
+                .map(Map.Entry::getValue)
+                .collect(ImmutableSet.toImmutableSet());
     }
 
     @Override
@@ -210,11 +232,19 @@
         Meter meter = meterService.submit(meterRequest);
         meterIdRef.set(meter.id());
         addMeterIdToBpMapping(deviceId, meterIdRef.get(), bpInfo.id());
-        programmedMeters.add(MeterKey.key(deviceId, meter.id()));
         log.info("Meter is created. Meter Id {}", meter.id());
         return meter.id();
     }
 
+    @Override
+    public void clearMeters(DeviceId deviceId) {
+        List<Map.Entry<String, MeterKey>> meters = bpInfoToMeter.stream()
+                .filter(e -> e.getValue().deviceId().equals(deviceId))
+                .collect(Collectors.toList());
+
+        meters.forEach(e -> bpInfoToMeter.remove(e.getKey(), e.getValue()));
+    }
+
     private List<Band> createMeterBands(BandwidthProfileInformation bpInfo) {
         List<Band> meterBands = new ArrayList<>();
 
@@ -257,7 +287,6 @@
                 }
                 if (MeterEvent.Type.METER_REMOVED.equals(meterEvent.type())) {
                     log.info("Meter Removed Event is received for {}", meter.id());
-                    programmedMeters.remove(key);
                     pendingRemoveMeters.remove(key);
                     removeMeterFromBpMapping(key);
                 }
@@ -294,15 +323,11 @@
         }
 
         private void removeMeterFromBpMapping(MeterKey meterKey) {
-            Iterator<Map.Entry<String, MeterKey>> iterator = bpInfoToMeter.entries().iterator();
-            while (iterator.hasNext()) {
-                Map.Entry<String, MeterKey> entry = iterator.next();
-                if (entry.getValue().equals(meterKey)) {
-                    iterator.remove();
-                    log.info("Deleted meter for MeterKey {} - Last prog meters {}", meterKey, programmedMeters);
-                    break;
-                }
-            }
+            List<Map.Entry<String, MeterKey>> meters = bpInfoToMeter.stream()
+                    .filter(e -> e.getValue().equals(meterKey))
+                    .collect(Collectors.toList());
+
+            meters.forEach(e -> bpInfoToMeter.remove(e.getKey(), e.getValue()));
         }
     }
 }
diff --git a/app/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java b/app/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java
index 4f07131..48570cd 100644
--- a/app/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java
+++ b/app/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java
@@ -39,16 +39,6 @@
     ImmutableMap<String, Collection<MeterKey>> getBpMeterMappings();
 
     /**
-     * Adds a bandwidthProfile-meterKey (device / meter) mapping that have been programmed
-     * in the data plane.
-     *
-     * @param deviceId         the access device id
-     * @param meterId          the meter id that is mapped to the bandwidth profile
-     * @param bandwidthProfile the bandwidth profile id
-     */
-    void addMeterIdToBpMapping(DeviceId deviceId, MeterId meterId, String bandwidthProfile);
-
-    /**
      * Returns the meter id for a given bandwidth profile.
      *
      * @param deviceId         the access device id
@@ -77,5 +67,10 @@
     MeterId createMeter(DeviceId deviceId, BandwidthProfileInformation bpInfo,
                         CompletableFuture<Object> meterFuture);
 
-
+    /**
+     * Clears out bandwidth profile to meter mappings for the given device.
+     *
+     * @param deviceId device ID
+     */
+    void clearMeters(DeviceId deviceId);
 }
diff --git a/app/src/test/java/org/opencord/olt/impl/ConsistentHasherTest.java b/app/src/test/java/org/opencord/olt/impl/ConsistentHasherTest.java
new file mode 100644
index 0000000..1f682cb
--- /dev/null
+++ b/app/src/test/java/org/opencord/olt/impl/ConsistentHasherTest.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.olt.impl;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.net.DeviceId;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.equalTo;
+
+public class ConsistentHasherTest {
+
+    private static final int WEIGHT = 10;
+
+    private static final NodeId N1 = new NodeId("10.0.0.1");
+    private static final NodeId N2 = new NodeId("10.0.0.2");
+    private static final NodeId N3 = new NodeId("10.0.0.3");
+
+    private ConsistentHasher hasher;
+
+    @Before
+    public void setUp() {
+        List<NodeId> servers = new ArrayList<>();
+        servers.add(N1);
+        servers.add(N2);
+
+        hasher = new ConsistentHasher(servers, WEIGHT);
+    }
+
+    @Test
+    public void testHasher() {
+        DeviceId deviceId = DeviceId.deviceId("foo");
+        NodeId server = hasher.hash(deviceId.toString());
+
+        assertThat(server, equalTo(N1));
+
+        deviceId = DeviceId.deviceId("bsaf");
+        server = hasher.hash(deviceId.toString());
+
+        assertThat(server, equalTo(N2));
+    }
+
+    @Test
+    public void testAddServer() {
+        DeviceId deviceId = DeviceId.deviceId("foo");
+        NodeId server = hasher.hash(deviceId.toString());
+
+        assertThat(server, equalTo(N1));
+
+        hasher.addServer(N3);
+
+        server = hasher.hash(deviceId.toString());
+
+        assertThat(server, equalTo(N3));
+    }
+}
diff --git a/app/src/test/java/org/opencord/olt/impl/OltFlowTest.java b/app/src/test/java/org/opencord/olt/impl/OltFlowTest.java
index 16b407b..e0bd109 100644
--- a/app/src/test/java/org/opencord/olt/impl/OltFlowTest.java
+++ b/app/src/test/java/org/opencord/olt/impl/OltFlowTest.java
@@ -188,11 +188,6 @@
         }
 
         @Override
-        public void addMeterIdToBpMapping(DeviceId deviceId, MeterId meterId, String bandwidthProfile) {
-
-        }
-
-        @Override
         public MeterId getMeterIdFromBpMapping(DeviceId deviceId, String bandwidthProfile) {
             return null;
         }
@@ -208,6 +203,10 @@
                                    CompletableFuture<Object> meterFuture) {
             return usMeterId;
         }
+
+        @Override
+        public void clearMeters(DeviceId deviceId) {
+        }
     }
 
     private class MockOltFlowObjectiveService implements org.onosproject.net.flowobjective.FlowObjectiveService {
diff --git a/app/src/test/java/org/opencord/olt/impl/OltMeterTest.java b/app/src/test/java/org/opencord/olt/impl/OltMeterTest.java
index da7e3e1..000ea35 100644
--- a/app/src/test/java/org/opencord/olt/impl/OltMeterTest.java
+++ b/app/src/test/java/org/opencord/olt/impl/OltMeterTest.java
@@ -15,12 +15,11 @@
  */
 package org.opencord.olt.impl;
 
-import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.Sets;
 import org.junit.Before;
 import org.junit.Test;
+import org.onosproject.cfg.ComponentConfigAdapter;
+import org.onosproject.core.CoreServiceAdapter;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.meter.DefaultMeter;
 import org.onosproject.net.meter.Meter;
@@ -28,6 +27,7 @@
 import org.onosproject.net.meter.MeterKey;
 import org.onosproject.net.meter.MeterListener;
 import org.onosproject.net.meter.MeterRequest;
+import org.onosproject.store.service.TestStorageService;
 import org.opencord.sadis.BandwidthProfileInformation;
 
 import java.util.Collection;
@@ -41,9 +41,12 @@
     @Before
     public void setUp() {
         oltMeterService = new OltMeterService();
-        oltMeterService.bpInfoToMeter = Multimaps.synchronizedSetMultimap(HashMultimap.create());
-        oltMeterService.programmedMeters = Sets.newConcurrentHashSet();
+        oltMeterService.storageService = new TestStorageService();
         oltMeterService.meterService = new MockMeterService();
+        oltMeterService.coreService = new CoreServiceAdapter();
+        oltMeterService.componentConfigService = new ComponentConfigAdapter();
+        oltMeterService.activate(null);
+        oltMeterService.bpInfoToMeter = new MockConsistentMultimap<>();
     }
 
     @Test
diff --git a/app/src/test/java/org/opencord/olt/impl/TestBase.java b/app/src/test/java/org/opencord/olt/impl/TestBase.java
index b8447a9..bc5a4d6 100644
--- a/app/src/test/java/org/opencord/olt/impl/TestBase.java
+++ b/app/src/test/java/org/opencord/olt/impl/TestBase.java
@@ -15,18 +15,33 @@
  */
 package org.opencord.olt.impl;
 
+import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Maps;
+import com.google.common.collect.Multiset;
 import org.onlab.packet.Ip4Address;
 import org.onlab.packet.MacAddress;
 import org.onosproject.core.DefaultApplicationId;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.meter.MeterId;
+import org.onosproject.store.service.AsyncConsistentMultimap;
+import org.onosproject.store.service.ConsistentMultimap;
+import org.onosproject.store.service.ConsistentMultimapBuilder;
+import org.onosproject.store.service.MultimapEventListener;
+import org.onosproject.store.service.TestConsistentMultimap;
+import org.onosproject.store.service.Versioned;
 import org.opencord.sadis.BandwidthProfileInformation;
 import org.opencord.sadis.BaseInformationService;
 import org.opencord.sadis.SadisService;
 import org.opencord.sadis.SubscriberAndDeviceInformation;
 
+import java.util.AbstractMap;
+import java.util.Collection;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.stream.Collectors;
 
 public class TestBase {
 
@@ -124,4 +139,187 @@
             this.setCircuitId(circuitId);
         }
     }
+
+    class MockConsistentMultimap<K, V> implements ConsistentMultimap<K, V> {
+        private HashMultimap<K, Versioned<V>> innermap;
+        private AtomicLong counter = new AtomicLong();
+
+        public MockConsistentMultimap() {
+            this.innermap = HashMultimap.create();
+        }
+
+        private Versioned<V> version(V v) {
+            return new Versioned<>(v, counter.incrementAndGet(), System.currentTimeMillis());
+        }
+
+        private Versioned<Collection<? extends V>> versionCollection(Collection<? extends V> collection) {
+            return new Versioned<>(collection, counter.incrementAndGet(), System.currentTimeMillis());
+        }
+
+        @Override
+        public int size() {
+            return innermap.size();
+        }
+
+        @Override
+        public boolean isEmpty() {
+            return innermap.isEmpty();
+        }
+
+        @Override
+        public boolean containsKey(K key) {
+            return innermap.containsKey(key);
+        }
+
+        @Override
+        public boolean containsValue(V value) {
+            return innermap.containsValue(value);
+        }
+
+        @Override
+        public boolean containsEntry(K key, V value) {
+            return innermap.containsEntry(key, value);
+        }
+
+        @Override
+        public boolean put(K key, V value) {
+            return innermap.put(key, version(value));
+        }
+
+        @Override
+        public Versioned<Collection<? extends V>> putAndGet(K key, V value) {
+            innermap.put(key, version(value));
+            return (Versioned<Collection<? extends V>>) innermap.get(key);
+        }
+
+        @Override
+        public boolean remove(K key, V value) {
+            return innermap.remove(key, value);
+        }
+
+        @Override
+        public Versioned<Collection<? extends V>> removeAndGet(K key, V value) {
+            innermap.remove(key, value);
+            return (Versioned<Collection<? extends V>>) innermap.get(key);
+        }
+
+        @Override
+        public boolean removeAll(K key, Collection<? extends V> values) {
+            return false;
+        }
+
+        @Override
+        public Versioned<Collection<? extends V>> removeAll(K key) {
+            return null;
+        }
+
+        @Override
+        public boolean putAll(K key, Collection<? extends V> values) {
+            return false;
+        }
+
+        @Override
+        public Versioned<Collection<? extends V>> replaceValues(K key, Collection<V> values) {
+            return null;
+        }
+
+        @Override
+        public void clear() {
+            innermap.clear();
+        }
+
+        @Override
+        public Versioned<Collection<? extends V>> get(K key) {
+            Collection<? extends V> values = innermap.get(key).stream()
+                    .map(v -> v.value())
+                    .collect(Collectors.toList());
+            return versionCollection(values);
+        }
+
+        @Override
+        public Set<K> keySet() {
+            return innermap.keySet();
+        }
+
+        @Override
+        public Multiset<K> keys() {
+            return innermap.keys();
+        }
+
+        @Override
+        public Multiset<V> values() {
+            return null;
+        }
+
+        @Override
+        public Collection<Map.Entry<K, V>> entries() {
+            return null;
+        }
+
+        @Override
+        public Iterator<Map.Entry<K, V>> iterator() {
+            return new ConsistentMultimapIterator(innermap.entries().iterator());
+        }
+
+        @Override
+        public Map<K, Collection<V>> asMap() {
+            return null;
+        }
+
+        @Override
+        public void addListener(MultimapEventListener<K, V> listener, Executor executor) {
+        }
+
+        @Override
+        public void removeListener(MultimapEventListener<K, V> listener) {
+        }
+
+        @Override
+        public String name() {
+            return "mock multimap";
+        }
+
+        @Override
+        public Type primitiveType() {
+            return null;
+        }
+
+        private class ConsistentMultimapIterator implements Iterator<Map.Entry<K, V>> {
+
+            private final Iterator<Map.Entry<K, Versioned<V>>> it;
+
+            public ConsistentMultimapIterator(Iterator<Map.Entry<K, Versioned<V>>> it) {
+                this.it = it;
+            }
+
+            @Override
+            public boolean hasNext() {
+                return it.hasNext();
+            }
+
+            @Override
+            public Map.Entry<K, V> next() {
+                Map.Entry<K, Versioned<V>> e = it.next();
+                return new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue().value());
+            }
+        }
+
+    }
+
+    public static TestConsistentMultimap.Builder builder() {
+        return new TestConsistentMultimap.Builder();
+    }
+
+    public static class Builder<K, V> extends ConsistentMultimapBuilder<K, V> {
+
+        @Override
+        public AsyncConsistentMultimap<K, V> buildMultimap() {
+            return null;
+        }
+
+        @Override
+        public ConsistentMultimap<K, V> build() {
+            return new TestConsistentMultimap<K, V>();
+        }
+    }
 }