Enable operation in a multi-instance ONOS cluster.

Shared state has been moved to ONOS consistent maps to ensure it
is available throughout the cluster.

Event handling work (e.g. port up events) is partitioned between nodes
in the cluster using consistent hashing based on device ID.

Subscriber provisioning requests can be handled by any instance
(the instance that receives the request handles it).

Change-Id: I65cf24a7a7fe4397e1559e5d1c770449979f2566
diff --git a/app/src/main/java/org/opencord/olt/impl/ConsistentHasher.java b/app/src/main/java/org/opencord/olt/impl/ConsistentHasher.java
new file mode 100644
index 0000000..52e9b96
--- /dev/null
+++ b/app/src/main/java/org/opencord/olt/impl/ConsistentHasher.java
@@ -0,0 +1,130 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.impl;
+
+import com.google.common.hash.Hashing;
+import org.onosproject.cluster.NodeId;
+
+import java.nio.charset.Charset;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Returns a server hosting a given key based on consistent hashing.
+ *
+ * <p>Each server owns {@code weight} buckets in a table sorted by hash value; a
+ * key belongs to the server owning the first bucket at or after the key's hash.
+ */
+public class ConsistentHasher {
+
+    /** A bucket in the table; probes built for lookups carry no server. */
+    private static class Entry implements Comparable<Entry> {
+        private final NodeId server;
+        private final int hash;
+
+        public Entry(NodeId server, int hash) {
+            this.server = server;
+            this.hash = hash;
+        }
+
+        public Entry(int hash) {
+            this(null, hash);
+        }
+
+        @Override
+        public int compareTo(Entry o) {
+            return Integer.compare(this.hash, o.hash);
+        }
+    }
+
+    private final int weight;
+
+    // buckets kept sorted by hash; only accessed from the synchronized methods
+    private final List<Entry> table = new ArrayList<>();
+
+    /**
+     * Creates a new hasher with the given list of servers.
+     *
+     * @param servers list of servers
+     * @param weight number of buckets placed in the table for each server
+     */
+    public ConsistentHasher(List<NodeId> servers, int weight) {
+        this.weight = weight;
+        servers.forEach(this::addServer);
+    }
+
+    /**
+     * Adds a new server to the list of servers. Adding a server that is
+     * already present does not create duplicate buckets.
+     *
+     * @param server server ID
+     */
+    public synchronized void addServer(NodeId server) {
+        // drop any existing buckets so re-adding a server cannot create duplicates
+        table.removeIf(e -> e.server.equals(server));
+
+        // Add weight number of buckets for each server
+        for (int i = 0; i < weight; i++) {
+            String label = server.toString() + i;
+            table.add(new Entry(server, getHash(label)));
+        }
+
+        Collections.sort(table);
+    }
+
+    /**
+     * Removes a server from the list of servers.
+     *
+     * @param server server ID
+     */
+    public synchronized void removeServer(NodeId server) {
+        table.removeIf(e -> e.server.equals(server));
+    }
+
+    /**
+     * Hashes a given input string and finds the server that should
+     * handle the given ID.
+     *
+     * @param s input
+     * @return server ID
+     * @throws IndexOutOfBoundsException if no server has been added
+     */
+    public synchronized NodeId hash(String s) {
+        Entry probe = new Entry(getHash(s));
+
+        int pos = Collections.binarySearch(this.table, probe);
+
+        // a negative result encodes the insertion point as -(insertion point) - 1
+        if (pos < 0) {
+            pos = -(pos + 1);
+        }
+
+        // wraparound if the hash was after the last entry in the table
+        if (pos == this.table.size()) {
+            pos = 0;
+        }
+
+        return table.get(pos).server;
+    }
+
+    private int getHash(String s) {
+        // Fixed charset: the platform default may differ between cluster nodes,
+        // which would make them disagree on which node owns a key.
+        return Hashing.murmur3_128().hashString(s, Charset.forName("UTF-8")).asInt();
+    }
+}
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/app/src/main/java/org/opencord/olt/impl/Olt.java
index 50a8fd6..000d5e6 100644
--- a/app/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/app/src/main/java/org/opencord/olt/impl/Olt.java
@@ -16,14 +16,18 @@
 package org.opencord.olt.impl;
 
 import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import org.onlab.packet.VlanId;
+import org.onlab.util.KryoNamespace;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterEvent;
+import org.onosproject.cluster.ClusterEventListener;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.ControllerNode;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.event.AbstractListenerManager;
-import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.Device;
@@ -39,6 +43,10 @@
 import org.onosproject.net.flowobjective.ObjectiveContext;
 import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.meter.MeterId;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMultimap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
 import org.opencord.olt.AccessDeviceEvent;
 import org.opencord.olt.AccessDeviceListener;
 import org.opencord.olt.AccessDeviceService;
@@ -60,23 +68,30 @@
 import org.slf4j.Logger;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Dictionary;
 import java.util.List;
 import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.stream.Collectors;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
 import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.groupedThreads;
-import static org.opencord.olt.impl.OsgiPropertyConstants.*;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_BP_ID;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_BP_ID_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME;
+import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
 import static org.slf4j.LoggerFactory.getLogger;
 
 /**
@@ -95,6 +110,8 @@
     private static final short EAPOL_DEFAULT_VLAN = 4091;
     private static final String NO_UPLINK_PORT = "No uplink port found for OLT device {}";
 
+    public static final int HASH_WEIGHT = 10;
+
     private final Logger log = getLogger(getClass());
 
     private static final String NNI = "nni-";
@@ -103,9 +120,6 @@
     protected FlowObjectiveService flowObjectiveService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected MastershipService mastershipService;
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected DeviceService deviceService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
@@ -123,6 +137,12 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected AccessDeviceMeterService oltMeterService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
     /**
      * Default bandwidth profile id that is used for authentication trap flows.
      **/
@@ -134,6 +154,9 @@
     protected String multicastServiceName = DEFAULT_MCAST_SERVICE_NAME;
 
     private final DeviceListener deviceListener = new InternalDeviceListener();
+    private final ClusterEventListener clusterListener = new InternalClusterListener();
+
+    private ConsistentHasher hasher;
 
     protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
     private BaseInformationService<BandwidthProfileInformation> bpService;
@@ -144,7 +167,7 @@
 
     protected ExecutorService eventExecutor;
 
-    private Map<ConnectPoint, Set<UniTagInformation>> programmedSubs;
+    private ConsistentMultimap<ConnectPoint, UniTagInformation> programmedSubs;
 
     @Activate
     public void activate(ComponentContext context) {
@@ -163,18 +186,37 @@
                 .preSetProperty("org.onosproject.net.meter.impl.MeterManager",
                                 "purgeOnDisconnection", "true");
         componentConfigService.registerProperties(getClass());
-        programmedSubs = Maps.newConcurrentMap();
+
+        KryoNamespace serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(UniTagInformation.class)
+                .build();
+
+        programmedSubs = storageService.<ConnectPoint, UniTagInformation>consistentMultimapBuilder()
+                .withName("volt-programmed-subs")
+                .withSerializer(Serializer.using(serializer))
+                .withApplicationId(appId)
+                .build();
 
         eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
 
         subsService = sadisService.getSubscriberInfoService();
         bpService = sadisService.getBandwidthProfileService();
 
+        List<NodeId> readyNodes = clusterService.getNodes().stream()
+                .filter(c -> clusterService.getState(c.id()) == ControllerNode.State.READY)
+                .map(ControllerNode::id)
+                .collect(toList());
+        hasher = new ConsistentHasher(readyNodes, HASH_WEIGHT);
+        clusterService.addListener(clusterListener);
+
         // look for all provisioned devices in Sadis and create EAPOL flows for the
         // UNI ports
         Iterable<Device> devices = deviceService.getDevices();
         for (Device d : devices) {
-            checkAndCreateDeviceFlows(d);
+            if (isDeviceMine(d.id())) {
+                checkAndCreateDeviceFlows(d);
+            }
         }
 
         deviceService.addListener(deviceListener);
@@ -184,6 +226,7 @@
     @Deactivate
     public void deactivate() {
         componentConfigService.unregisterProperties(getClass(), false);
+        clusterService.removeListener(clusterListener);
         deviceService.removeListener(deviceListener);
         eventDispatcher.removeSink(AccessDeviceEvent.class);
         log.info("Stopped");
@@ -253,7 +296,7 @@
         DeviceId deviceId = connectPoint.deviceId();
         PortNumber subscriberPortNo = connectPoint.port();
 
-        Set<UniTagInformation> uniTagInformationSet = programmedSubs.get(connectPoint);
+        Collection<? extends UniTagInformation> uniTagInformationSet = programmedSubs.get(connectPoint).value();
         if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
             log.warn("Subscriber on connectionPoint {} was not previously programmed, " +
                              "no need to remove it", connectPoint);
@@ -373,7 +416,10 @@
 
     @Override
     public ImmutableMap<ConnectPoint, Set<UniTagInformation>> getProgSubs() {
-        return ImmutableMap.copyOf(programmedSubs);
+        return programmedSubs.stream()
+                .collect(collectingAndThen(
+                        groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
+                        ImmutableMap::copyOf));
     }
 
     @Override
@@ -562,19 +608,11 @@
     }
 
     private void updateProgrammedSubscriber(ConnectPoint connectPoint, UniTagInformation tagInformation, Boolean add) {
-        programmedSubs.compute(connectPoint, (k, v) -> {
-            if (add) {
-                if (v == null) {
-                    v = Sets.newHashSet();
-                }
-                v.add(tagInformation);
-            } else {
-                if (v != null) {
-                    v.remove(tagInformation);
-                }
-            }
-            return v;
-        });
+        if (add) {
+            programmedSubs.put(connectPoint, tagInformation);
+        } else {
+            programmedSubs.remove(connectPoint, tagInformation);
+        }
     }
 
     /**
@@ -768,10 +806,6 @@
      * @param dev Device to look for
      */
     private void checkAndCreateDeviceFlows(Device dev) {
-        // we create only for the ones we are master of
-        if (!mastershipService.isLocalMaster(dev.id())) {
-            return;
-        }
         // check if this device is provisioned in Sadis
         SubscriberAndDeviceInformation deviceInfo = getOltInfo(dev);
         log.info("checkAndCreateDeviceFlows: deviceInfo {}", deviceInfo);
@@ -779,7 +813,7 @@
         if (deviceInfo != null) {
             // This is an OLT device as per Sadis, we create flows for UNI and NNI ports
             for (Port p : deviceService.getPorts(dev.id())) {
-                if (PortNumber.LOCAL.equals(p.number())) {
+                if (PortNumber.LOCAL.equals(p.number()) || !p.isEnabled()) {
                     continue;
                 }
                 if (isUniPort(dev, p)) {
@@ -871,6 +905,21 @@
         return subsService.get(devSerialNo);
     }
 
+    /**
+     * Determines if this instance should handle this device based on
+     * consistent hashing.
+     *
+     * @param id device ID
+     * @return true if this instance should handle the device, otherwise false
+     */
+    private boolean isDeviceMine(DeviceId id) {
+        NodeId nodeId = hasher.hash(id.toString());
+        if (log.isDebugEnabled()) {
+            log.debug("Node that will handle {} is {}", id, nodeId);
+        }
+        return nodeId.equals(clusterService.getLocalNode().id());
+    }
+
     private class InternalDeviceListener implements DeviceListener {
         private Set<DeviceId> programmedDevices = Sets.newConcurrentHashSet();
 
@@ -888,6 +937,11 @@
                     return;
                 }
 
+                // Only handle the event if the device belongs to us
+                if (!isDeviceMine(event.subject().id())) {
+                    return;
+                }
+
                 log.debug("OLT got {} event for {} {}", eventType, event.subject(), event.port());
 
                 if (getOltInfo(dev) == null) {
@@ -936,8 +990,8 @@
                         if (!isUniPort(dev, port)) {
                             break;
                         }
-                        Set<UniTagInformation> uniTagInformationSet = programmedSubs
-                                .get(new ConnectPoint(devId, port.number()));
+                        ConnectPoint cp = new ConnectPoint(devId, port.number());
+                        Collection<? extends UniTagInformation> uniTagInformationSet = programmedSubs.get(cp).value();
                         if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
                             if (port.isEnabled() && !port.number().equals(PortNumber.LOCAL)) {
                                 log.info("eapol will be processed for port updated {}", port);
@@ -988,6 +1042,7 @@
         private void handleDeviceDisconnection(Device device, boolean sendUniEvent) {
             programmedDevices.remove(device.id());
             removeAllSubscribers(device.id());
+            oltMeterService.clearMeters(device.id());
             post(new AccessDeviceEvent(
                     AccessDeviceEvent.Type.DEVICE_DISCONNECTED, device.id(),
                     null, null, null));
@@ -1008,12 +1063,25 @@
         }
 
         private void removeAllSubscribers(DeviceId deviceId) {
-            List<ConnectPoint> connectPoints = programmedSubs.keySet().stream()
-                    .filter(ks -> Objects.equals(ks.deviceId(), deviceId))
-                    .collect(Collectors.toList());
+            List<Map.Entry<ConnectPoint, UniTagInformation>> subs = programmedSubs.stream()
+                    .filter(e -> e.getKey().deviceId().equals(deviceId))
+                    .collect(toList());
 
-            connectPoints.forEach(cp -> programmedSubs.remove(cp));
+            subs.forEach(e -> programmedSubs.remove(e.getKey(), e.getValue()));
         }
 
     }
+
+    /** Updates the hasher's server list when a node becomes ready or is deactivated. */
+    private class InternalClusterListener implements ClusterEventListener {
+        @Override
+        public void event(ClusterEvent event) {
+            final ClusterEvent.Type type = event.type();
+            if (type == ClusterEvent.Type.INSTANCE_READY) {
+                hasher.addServer(event.subject().id());
+            } else if (type == ClusterEvent.Type.INSTANCE_DEACTIVATED) {
+                hasher.removeServer(event.subject().id());
+            }
+        }
+    }
 }
diff --git a/app/src/main/java/org/opencord/olt/impl/OltFlowService.java b/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
index bf49483..af8ba4a 100644
--- a/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -380,14 +380,6 @@
             return;
         }
 
-        if (!mastershipService.isLocalMaster(devId)) {
-            log.warn("The master of the device {} is another instance", devId);
-            if (filterFuture != null) {
-                filterFuture.complete(ObjectiveError.DEVICEMISSING);
-            }
-            return;
-        }
-
         BandwidthProfileInformation bpInfo = getBandwidthProfileInformation(bpId);
         if (bpInfo == null) {
             log.warn("Bandwidth profile {} is not found. Authentication flow"
diff --git a/app/src/main/java/org/opencord/olt/impl/OltMeterService.java b/app/src/main/java/org/opencord/olt/impl/OltMeterService.java
index 825604c..7a3a3a3 100644
--- a/app/src/main/java/org/opencord/olt/impl/OltMeterService.java
+++ b/app/src/main/java/org/opencord/olt/impl/OltMeterService.java
@@ -15,13 +15,10 @@
  */
 package org.opencord.olt.impl;
 
-import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Multimaps;
-import com.google.common.collect.SetMultimap;
-import com.google.common.collect.Sets;
+import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.core.ApplicationId;
@@ -40,6 +37,10 @@
 import org.onosproject.net.meter.MeterListener;
 import org.onosproject.net.meter.MeterRequest;
 import org.onosproject.net.meter.MeterService;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.ConsistentMultimap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
 import org.opencord.olt.internalapi.AccessDeviceMeterService;
 import org.opencord.sadis.BandwidthProfileInformation;
 import org.osgi.service.component.ComponentContext;
@@ -54,18 +55,21 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Dictionary;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
+import static java.util.stream.Collectors.collectingAndThen;
+import static java.util.stream.Collectors.groupingBy;
+import static java.util.stream.Collectors.mapping;
+import static java.util.stream.Collectors.toSet;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.opencord.olt.impl.OsgiPropertyConstants.DELETE_METERS;
 import static org.opencord.olt.impl.OsgiPropertyConstants.DELETE_METERS_DEFAULT;
@@ -88,11 +92,13 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected ComponentConfigService componentConfigService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
     protected boolean deleteMeters = true;
 
-    protected SetMultimap<String, MeterKey> bpInfoToMeter =
-            Multimaps.synchronizedSetMultimap(HashMultimap.create());
-    protected Set<MeterKey> programmedMeters;
+    ConsistentMultimap<String, MeterKey> bpInfoToMeter;
+
     private ApplicationId appId;
     private static final String APP_NAME = "org.opencord.olt";
 
@@ -107,7 +113,19 @@
         eventExecutor = Executors.newFixedThreadPool(5, groupedThreads("onos/olt",
                 "events-%d", log));
         appId = coreService.registerApplication(APP_NAME);
-        programmedMeters = Sets.newConcurrentHashSet();
+        modified(context);
+
+        KryoNamespace serializer = KryoNamespace.newBuilder()
+                .register(KryoNamespaces.API)
+                .register(MeterKey.class)
+                .build();
+
+        bpInfoToMeter = storageService.<String, MeterKey>consistentMultimapBuilder()
+                .withName("volt-bp-info-to-meter")
+                .withSerializer(Serializer.using(serializer))
+                .withApplicationId(appId)
+                .build();
+
         meterService.addListener(meterListener);
         componentConfigService.registerProperties(getClass());
         log.info("Olt Meter service started");
@@ -131,23 +149,25 @@
 
     @Override
     public ImmutableMap<String, Collection<MeterKey>> getBpMeterMappings() {
-        return ImmutableMap.copyOf(bpInfoToMeter.asMap());
+        return bpInfoToMeter.stream()
+                .collect(collectingAndThen(
+                        groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
+                        ImmutableMap::copyOf));
     }
 
-    @Override
-    public void addMeterIdToBpMapping(DeviceId deviceId, MeterId meterId, String bandwidthProfile) {
+    void addMeterIdToBpMapping(DeviceId deviceId, MeterId meterId, String bandwidthProfile) {
         bpInfoToMeter.put(bandwidthProfile, MeterKey.key(deviceId, meterId));
     }
 
     @Override
     public MeterId getMeterIdFromBpMapping(DeviceId deviceId, String bandwidthProfile) {
-        if (bpInfoToMeter.get(bandwidthProfile) == null) {
+        if (bpInfoToMeter.get(bandwidthProfile).value().isEmpty()) {
             log.warn("Bandwidth Profile '{}' is not currently mapped to a meter",
                     bandwidthProfile);
             return null;
         }
 
-        Optional<MeterKey> meterKeyForDevice = bpInfoToMeter.get(bandwidthProfile)
+        Optional<? extends MeterKey> meterKeyForDevice = bpInfoToMeter.get(bandwidthProfile).value()
                 .stream()
                 .filter(meterKey -> meterKey.deviceId().equals(deviceId))
                 .findFirst();
@@ -164,7 +184,9 @@
 
     @Override
     public ImmutableSet<MeterKey> getProgMeters() {
-        return ImmutableSet.copyOf(programmedMeters);
+        return bpInfoToMeter.stream()
+                .map(Map.Entry::getValue)
+                .collect(ImmutableSet.toImmutableSet());
     }
 
     @Override
@@ -210,11 +232,19 @@
         Meter meter = meterService.submit(meterRequest);
         meterIdRef.set(meter.id());
         addMeterIdToBpMapping(deviceId, meterIdRef.get(), bpInfo.id());
-        programmedMeters.add(MeterKey.key(deviceId, meter.id()));
         log.info("Meter is created. Meter Id {}", meter.id());
         return meter.id();
     }
 
+    @Override
+    public void clearMeters(DeviceId deviceId) {
+        // snapshot the device's mappings first, then delete them one by one
+        List<Map.Entry<String, MeterKey>> stale = bpInfoToMeter.stream()
+                .filter(binding -> binding.getValue().deviceId().equals(deviceId))
+                .collect(Collectors.toList());
+        stale.forEach(binding -> bpInfoToMeter.remove(binding.getKey(), binding.getValue()));
+    }
+
     private List<Band> createMeterBands(BandwidthProfileInformation bpInfo) {
         List<Band> meterBands = new ArrayList<>();
 
@@ -257,7 +287,6 @@
                 }
                 if (MeterEvent.Type.METER_REMOVED.equals(meterEvent.type())) {
                     log.info("Meter Removed Event is received for {}", meter.id());
-                    programmedMeters.remove(key);
                     pendingRemoveMeters.remove(key);
                     removeMeterFromBpMapping(key);
                 }
@@ -294,15 +323,11 @@
         }
 
         private void removeMeterFromBpMapping(MeterKey meterKey) {
-            Iterator<Map.Entry<String, MeterKey>> iterator = bpInfoToMeter.entries().iterator();
-            while (iterator.hasNext()) {
-                Map.Entry<String, MeterKey> entry = iterator.next();
-                if (entry.getValue().equals(meterKey)) {
-                    iterator.remove();
-                    log.info("Deleted meter for MeterKey {} - Last prog meters {}", meterKey, programmedMeters);
-                    break;
-                }
-            }
+            List<Map.Entry<String, MeterKey>> meters = bpInfoToMeter.stream()
+                    .filter(e -> e.getValue().equals(meterKey))
+                    .collect(Collectors.toList());
+
+            meters.forEach(e -> bpInfoToMeter.remove(e.getKey(), e.getValue()));
         }
     }
 }
diff --git a/app/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java b/app/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java
index 4f07131..48570cd 100644
--- a/app/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java
+++ b/app/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java
@@ -39,16 +39,6 @@
     ImmutableMap<String, Collection<MeterKey>> getBpMeterMappings();
 
     /**
-     * Adds a bandwidthProfile-meterKey (device / meter) mapping that have been programmed
-     * in the data plane.
-     *
-     * @param deviceId         the access device id
-     * @param meterId          the meter id that is mapped to the bandwidth profile
-     * @param bandwidthProfile the bandwidth profile id
-     */
-    void addMeterIdToBpMapping(DeviceId deviceId, MeterId meterId, String bandwidthProfile);
-
-    /**
      * Returns the meter id for a given bandwidth profile.
      *
      * @param deviceId         the access device id
@@ -77,5 +67,10 @@
     MeterId createMeter(DeviceId deviceId, BandwidthProfileInformation bpInfo,
                         CompletableFuture<Object> meterFuture);
 
-
+    /**
+     * Clears out bandwidth profile to meter mappings for the given device.
+     *
+     * @param deviceId device ID
+     */
+    void clearMeters(DeviceId deviceId);
 }