[VOL-4246] Feature parity with the previous implementation

Change-Id: I3741edb3c1b88b1cf8b5e6d4ff0900132e2e5e6a
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java b/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
index 4b88344..be04449 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
@@ -1,5 +1,5 @@
 /*
- * Copyright 2016-present Open Networking Foundation
+ * Copyright 2021-present Open Networking Foundation
  *
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
@@ -13,47 +13,16 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package org.opencord.olt.impl;
 
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static java.util.stream.Collectors.collectingAndThen;
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.mapping;
-import static java.util.stream.Collectors.toSet;
-import static org.onlab.util.Tools.get;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.opencord.olt.impl.OsgiPropertyConstants.*;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Dictionary;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
+import com.google.common.collect.ImmutableMap;
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
-import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.meter.Band;
 import org.onosproject.net.meter.DefaultBand;
 import org.onosproject.net.meter.DefaultMeterRequest;
@@ -66,12 +35,14 @@
 import org.onosproject.net.meter.MeterListener;
 import org.onosproject.net.meter.MeterRequest;
 import org.onosproject.net.meter.MeterService;
+import org.onosproject.net.meter.MeterState;
 import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMultimap;
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
-import org.opencord.olt.internalapi.AccessDeviceMeterService;
 import org.opencord.sadis.BandwidthProfileInformation;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -79,219 +50,386 @@
 import org.osgi.service.component.annotations.Modified;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
 import org.slf4j.Logger;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-/**
- * Provisions Meters on access devices.
- */
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.opencord.olt.impl.OsgiPropertyConstants.*;
+import static org.slf4j.LoggerFactory.getLogger;
+
 @Component(immediate = true, property = {
         DELETE_METERS + ":Boolean=" + DELETE_METERS_DEFAULT,
         ZERO_REFERENCE_METER_COUNT + ":Integer=" + ZERO_REFERENCE_METER_COUNT_DEFAULT,
-        })
-public class OltMeterService implements AccessDeviceMeterService {
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected MeterService meterService;
+})
+public class OltMeterService implements OltMeterServiceInterface {
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected CoreService coreService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected ComponentConfigService componentConfigService;
+    protected ComponentConfigService cfgService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected StorageService storageService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected DeviceService deviceService;
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL,
+            bind = "bindSadisService",
+            unbind = "unbindSadisService",
+            policy = ReferencePolicy.DYNAMIC)
+    protected volatile SadisService sadisService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected ClusterService clusterService;
+    protected MeterService meterService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected MastershipService mastershipService;
+    protected OltDeviceServiceInterface oltDeviceService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected LeadershipService leadershipService;
+    private final Logger log = getLogger(getClass());
+    protected BaseInformationService<BandwidthProfileInformation> bpService;
+    private ApplicationId appId;
+    private static final String APP_NAME = "org.opencord.olt";
+    private final ReentrantReadWriteLock programmedMeterLock = new ReentrantReadWriteLock();
+    private final Lock programmedMeterWriteLock = programmedMeterLock.writeLock();
+    private final Lock programmedMeterReadLock = programmedMeterLock.readLock();
+
+    /**
+     * Programmed Meters status map.
+     * Keeps track of which meter is programmed on which device for which BandwidthProfile.
+     * The String key is the BandwidthProfile
+     */
+    protected Map<DeviceId, Map<String, MeterData>> programmedMeters;
+
+    private final MeterListener meterListener = new InternalMeterListener();
+    protected ExecutorService pendingRemovalMetersExecutor =
+            Executors.newFixedThreadPool(5, groupedThreads("onos/olt",
+                                                           "pending-removal-meters-%d", log));
+
+    /**
+     * Map that contains a list of meters that needs to be removed.
+     * We wait to get 3 METER_REFERENCE_COUNT_ZERO events before removing the meter
+     * so that we're sure no flow is referencing it.
+     */
+    protected Map<DeviceId, Map<MeterKey, AtomicInteger>> pendingRemoveMeters;
+
+    /**
+     * Number of consecutive meter events with empty reference count
+     * after which a meter gets removed from the device.
+     */
+    protected int zeroReferenceMeterCount = 3;
 
     /**
      * Delete meters when reference count drops to zero.
      */
     protected boolean deleteMeters = DELETE_METERS_DEFAULT;
 
-    /**
-     * Number of Zero References received before deleting the meter.
-     */
-    protected int zeroReferenceMeterCount = ZERO_REFERENCE_METER_COUNT_DEFAULT;
-
-    private ApplicationId appId;
-    private static final String APP_NAME = "org.opencord.olt";
-
-    private final MeterListener meterListener = new InternalMeterListener();
-
-    private final Logger log = getLogger(getClass());
-
-    protected ExecutorService eventExecutor;
-
-    protected Map<DeviceId, Set<BandwidthProfileInformation>> pendingMeters;
-    protected Map<DeviceId, Map<MeterKey, AtomicInteger>> pendingRemoveMeters;
-    protected ConsistentMultimap<String, MeterKey> bpInfoToMeter;
-
     @Activate
     public void activate(ComponentContext context) {
-        eventExecutor = Executors.newFixedThreadPool(5, groupedThreads("onos/olt",
-                "events-%d", log));
         appId = coreService.registerApplication(APP_NAME);
         modified(context);
-
         KryoNamespace serializer = KryoNamespace.newBuilder()
                 .register(KryoNamespaces.API)
+                .register(List.class)
+                .register(MeterData.class)
+                .register(MeterState.class)
                 .register(MeterKey.class)
-                .register(BandwidthProfileInformation.class)
                 .build();
 
-        bpInfoToMeter = storageService.<String, MeterKey>consistentMultimapBuilder()
-                .withName("volt-bp-info-to-meter")
-                .withSerializer(Serializer.using(serializer))
-                .withApplicationId(appId)
-                .build();
-
-        meterService.addListener(meterListener);
-        componentConfigService.registerProperties(getClass());
-        pendingMeters = storageService.<DeviceId, Set<BandwidthProfileInformation>>consistentMapBuilder()
-                .withName("volt-pending-meters")
+        programmedMeters = storageService.<DeviceId, Map<String, MeterData>>consistentMapBuilder()
+                .withName("volt-programmed-meters")
                 .withSerializer(Serializer.using(serializer))
                 .withApplicationId(appId)
                 .build().asJavaMap();
+
         pendingRemoveMeters = storageService.<DeviceId, Map<MeterKey, AtomicInteger>>consistentMapBuilder()
                 .withName("volt-pending-remove-meters")
                 .withSerializer(Serializer.using(serializer))
                 .withApplicationId(appId)
                 .build().asJavaMap();
-        log.info("Olt Meter service started");
-    }
 
-    @Deactivate
-    public void deactivate() {
-        meterService.removeListener(meterListener);
-    }
+        cfgService.registerProperties(getClass());
 
+        meterService.addListener(meterListener);
+
+        log.info("Started");
+    }
 
     @Modified
     public void modified(ComponentContext context) {
         Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
 
-        Boolean d = Tools.isPropertyEnabled(properties, "deleteMeters");
+        Boolean d = Tools.isPropertyEnabled(properties, DELETE_METERS);
         if (d != null) {
             deleteMeters = d;
         }
 
-        String zeroReferenceMeterCountNew = get(properties, ZERO_REFERENCE_METER_COUNT);
-        zeroReferenceMeterCount = isNullOrEmpty(zeroReferenceMeterCountNew) ? ZERO_REFERENCE_METER_COUNT_DEFAULT :
-                Integer.parseInt(zeroReferenceMeterCountNew.trim());
+        String zeroCount = get(properties, ZERO_REFERENCE_METER_COUNT);
+        if (!isNullOrEmpty(zeroCount)) {
+            zeroReferenceMeterCount = Integer.parseInt(zeroCount.trim());
+        }
 
+        log.info("Modified. Values = deleteMeters: {}, zeroReferenceMeterCount: {}",
+                 deleteMeters, zeroReferenceMeterCount);
+    }
+
+    @Deactivate
+    public void deactivate(ComponentContext context) {
+        cfgService.unregisterProperties(getClass(), false);
+        meterService.removeListener(meterListener);
+        log.info("Stopped");
     }
 
     @Override
-    public ImmutableMap<String, Collection<MeterKey>> getBpMeterMappings() {
-        return bpInfoToMeter.stream()
-                .collect(collectingAndThen(
-                        groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
-                        ImmutableMap::copyOf));
+    public Map<DeviceId, Map<String, MeterData>> getProgrammedMeters() {
+        programmedMeterReadLock.lock();
+        try {
+            return ImmutableMap.copyOf(programmedMeters);
+        } finally {
+            programmedMeterReadLock.unlock();
+        }
     }
 
-    boolean addMeterIdToBpMapping(DeviceId deviceId, MeterId meterId, String bandwidthProfile) {
-        log.debug("adding bp {} to meter {} mapping for device {}",
-                 bandwidthProfile, meterId, deviceId);
-        return bpInfoToMeter.put(bandwidthProfile, MeterKey.key(deviceId, meterId));
+    /**
+     * Requests creation of the meter if it is missing and reports whether it is already installed.
+     *
+     * @param deviceId         DeviceId
+     * @param bandwidthProfile Bandwidth Profile Id
+     * @return true if the meter is in ADDED state, false if it is missing or still pending
+     */
+    @Override
+    public synchronized boolean createMeter(DeviceId deviceId, String bandwidthProfile) {
+
+        // NOTE hasMeterByBandwidthProfile can return false because the meter is still in PENDING_ADD;
+        // if a different thread then moves the meter to ADDED, a check for PENDING_ADD alone would
+        // return false as well and we would install the meter a second time, causing an inconsistency
+        // between the existing meter and the stored meterId, so the pending check also accepts ADDED
+
+        if (!hasMeterByBandwidthProfile(deviceId, bandwidthProfile)) {
+            // NOTE this is at trace level as it's constantly called by the queue processor
+            if (log.isTraceEnabled()) {
+                log.trace("Missing meter for BandwidthProfile {} on device {}", bandwidthProfile, deviceId);
+            }
+
+            if (!hasPendingMeterByBandwidthProfile(deviceId, bandwidthProfile)) {
+                createMeterForBp(deviceId, bandwidthProfile);
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("Meter is not yet available for {} on device {}",
+                          bandwidthProfile, deviceId);
+            }
+            return false;
+        }
+        log.debug("Meter found for BandwidthProfile {} on device {}", bandwidthProfile, deviceId);
+        return true;
     }
 
     @Override
-    public MeterId getMeterIdFromBpMapping(DeviceId deviceId, String bandwidthProfile) {
-        if (bandwidthProfile == null) {
-            log.warn("Bandwidth Profile requested is null");
-            return null;
+    public boolean createMeters(DeviceId deviceId, SubscriberAndDeviceInformation si) {
+        // Each UniTagInformation has up to 4 meters,
+        // check and/or create all of them
+        // pendingMeters (service name -> bandwidth profiles still missing) only feeds the trace log below
+        AtomicBoolean waitingOnMeter = new AtomicBoolean(false);
+        Map<String, List<String>> pendingMeters = new HashMap<>();
+        si.uniTagList().forEach(uniTagInfo -> {
+            String serviceName = uniTagInfo.getServiceName();
+            pendingMeters.put(serviceName, new LinkedList<>());
+            String usBp = uniTagInfo.getUpstreamBandwidthProfile();
+            String dsBp = uniTagInfo.getDownstreamBandwidthProfile();
+            String oltDsBp = uniTagInfo.getDownstreamOltBandwidthProfile();
+            String oltUsBp = uniTagInfo.getUpstreamOltBandwidthProfile();
+            if (!createMeter(deviceId, usBp)) {
+                pendingMeters.get(serviceName).add(usBp);
+                waitingOnMeter.set(true);
+            }
+            if (!createMeter(deviceId, dsBp)) {
+                pendingMeters.get(serviceName).add(dsBp);
+                waitingOnMeter.set(true);
+            }
+            if (!createMeter(deviceId, oltDsBp)) {
+                pendingMeters.get(serviceName).add(oltDsBp);
+                waitingOnMeter.set(true);
+            }
+            if (!createMeter(deviceId, oltUsBp)) {
+                pendingMeters.get(serviceName).add(oltUsBp);
+                waitingOnMeter.set(true);
+            }
+        });
+        if (waitingOnMeter.get()) {
+            if (log.isTraceEnabled()) {
+                log.trace("Meters {} on device {} are not " +
+                                  "installed yet (requested by subscriber {})",
+                          pendingMeters, deviceId, si.id());
+            }
+            return false;
         }
-        if (bpInfoToMeter.get(bandwidthProfile) == null) {
-            log.warn("Bandwidth Profile '{}' is not present in the map",
-                     bandwidthProfile);
-            return null;
-        }
-        if (bpInfoToMeter.get(bandwidthProfile).value().isEmpty()) {
-            log.warn("Bandwidth Profile '{}' is not currently mapped to a meter",
-                    bandwidthProfile);
-            return null;
-        }
+        return true;
+    }
 
-        Optional<? extends MeterKey> meterKeyForDevice = bpInfoToMeter.get(bandwidthProfile).value()
-                .stream()
-                .filter(meterKey -> meterKey.deviceId().equals(deviceId))
-                .findFirst();
-        if (meterKeyForDevice.isPresent()) {
-            log.debug("Found meter {} for bandwidth profile {} on {}",
-                    meterKeyForDevice.get().meterId(), bandwidthProfile, deviceId);
-            return meterKeyForDevice.get().meterId();
-        } else {
-            log.warn("Bandwidth Profile '{}' is not currently mapped to a meter on {} , {}",
-                     bandwidthProfile, deviceId, bpInfoToMeter.get(bandwidthProfile).value());
-            return null;
+    /**
+     * Returns true only if a meter for the bandwidth profile is in the programmed meters map with status ADDED.
+     *
+     * @param deviceId         the DeviceId on which to look for the meter
+     * @param bandwidthProfile the Bandwidth profile associated with this meter
+     * @return true if the meter is found in ADDED state, false otherwise
+     */
+    public boolean hasMeterByBandwidthProfile(DeviceId deviceId, String bandwidthProfile) {
+        programmedMeterReadLock.lock();
+        try {
+            Map<String, MeterData> metersOnDevice = programmedMeters.get(deviceId);
+            if (metersOnDevice == null || metersOnDevice.isEmpty()) {
+                return false;
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("added metersOnDevice {}: {}", deviceId, metersOnDevice);
+            }
+            MeterData meterData = metersOnDevice.get(bandwidthProfile);
+            return meterData != null && meterData.getMeterStatus() == MeterState.ADDED;
+        } finally {
+            programmedMeterReadLock.unlock();
+        }
+    }
+
+    /** Returns true if the meter for the bandwidth profile on the device is ADDED or PENDING_ADD. */
+    public boolean hasPendingMeterByBandwidthProfile(DeviceId deviceId, String bandwidthProfile) {
+        programmedMeterReadLock.lock();
+        try {
+            Map<String, MeterData> metersOnDevice = programmedMeters.get(deviceId);
+            if (metersOnDevice == null || metersOnDevice.isEmpty()) {
+                return false;
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("pending metersOnDevice {}: {}", deviceId, metersOnDevice);
+            }
+            // NOTE that we check in order if the meter was ADDED and if it wasn't we check for PENDING_ADD
+            // it is possible that a different thread moves the meter state from PENDING_ADD
+            // to ADDED between these two checks
+            // to avoid creating the meter twice we return true even if the meter is already added
+            MeterData meterData = metersOnDevice.get(bandwidthProfile);
+            return meterData != null && (
+                    meterData.getMeterStatus() == MeterState.ADDED ||
+                            meterData.getMeterStatus() == MeterState.PENDING_ADD);
+        } finally {
+            programmedMeterReadLock.unlock();
+        }
+    }
+
+    public MeterId getMeterIdForBandwidthProfile(DeviceId deviceId, String bandwidthProfile) {
+        programmedMeterReadLock.lock();
+        try {
+            Map<String, MeterData> metersOnDevice = programmedMeters.get(deviceId);
+            if (metersOnDevice == null || metersOnDevice.isEmpty()) {
+                return null;
+            }
+            MeterData meterData = metersOnDevice.get(bandwidthProfile);
+            if (meterData == null || meterData.getMeterStatus() != MeterState.ADDED) {
+                return null;
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("Found meter {} on device {} for bandwidth profile {}",
+                        meterData.getMeterId(), deviceId, bandwidthProfile);
+            }
+            return meterData.getMeterId();
+        } finally {
+            programmedMeterReadLock.unlock();
         }
     }
 
     @Override
-    public ImmutableSet<MeterKey> getProgMeters() {
-        return bpInfoToMeter.stream()
-                .map(Map.Entry::getValue)
-                .collect(ImmutableSet.toImmutableSet());
+    public void purgeDeviceMeters(DeviceId deviceId) {
+        log.debug("Purging meters on device {}", deviceId);
+        meterService.purgeMeters(deviceId);
+
+        // after we purge the meters we also need to clear the map
+        programmedMeterWriteLock.lock();
+        try {
+            programmedMeters.remove(deviceId);
+        } finally {
+            programmedMeterWriteLock.unlock();
+        }
+
+        // and clear the event count
+        // NOTE do we need a lock?
+        pendingRemoveMeters.remove(deviceId);
     }
 
-    @Override
-    public MeterId createMeter(DeviceId deviceId, BandwidthProfileInformation bpInfo,
-                               CompletableFuture<Object> meterFuture) {
-        log.debug("Creating meter on {} for {}", deviceId, bpInfo);
+    /**
+     * Creates a meter for a given Bandwidth Profile on a given device.
+     *
+     * @param deviceId         the DeviceId
+     * @param bandwidthProfile the BandwidthProfile ID
+     */
+    public void createMeterForBp(DeviceId deviceId, String bandwidthProfile) {
+        // adding meter in pending state to the programmedMeter map
+        try {
+            programmedMeterWriteLock.lock();
+            programmedMeters.compute(deviceId, (d, deviceMeters) -> {
+
+                if (deviceMeters == null) {
+                    deviceMeters = new HashMap<>();
+                }
+                // NOTE that this method is only called after verifying a
+                // meter for this BP does not already exist
+                MeterData meterData = new MeterData(
+                        null,
+                        MeterState.PENDING_ADD,
+                        bandwidthProfile
+                );
+                deviceMeters.put(bandwidthProfile, meterData);
+
+                return deviceMeters;
+            });
+        } finally {
+            programmedMeterWriteLock.unlock();
+        }
+
+        BandwidthProfileInformation bpInfo = getBandwidthProfileInformation(bandwidthProfile);
         if (bpInfo == null) {
-            log.warn("Requested bandwidth profile on {} information is NULL", deviceId);
-            meterFuture.complete(ObjectiveError.BADPARAMS);
-            return null;
+            log.error("BandwidthProfile {} information not found in sadis", bandwidthProfile);
+            return;
         }
 
-        MeterId meterId = getMeterIdFromBpMapping(deviceId, bpInfo.id());
-        if (meterId != null) {
-            log.debug("Meter {} was previously created for bp {}", meterId, bpInfo.id());
-            meterFuture.complete(null);
-            return meterId;
+        log.info("Creating meter for BandwidthProfile {} on device {}", bpInfo.id(), deviceId);
+
+        if (log.isTraceEnabled()) {
+            log.trace("BandwidthProfile: {}", bpInfo);
         }
 
         List<Band> meterBands = createMeterBands(bpInfo);
 
-        final AtomicReference<MeterId> meterIdRef = new AtomicReference<>();
+        CompletableFuture<Object> meterFuture = new CompletableFuture<>();
+
         MeterRequest meterRequest = DefaultMeterRequest.builder()
                 .withBands(meterBands)
                 .withUnit(Meter.Unit.KB_PER_SEC)
                 .withContext(new MeterContext() {
                     @Override
                     public void onSuccess(MeterRequest op) {
-                        log.debug("Meter {} for {} is installed on the device {}",
-                                  meterIdRef.get(), bpInfo.id(), deviceId);
-                        boolean added = addMeterIdToBpMapping(deviceId, meterIdRef.get(), bpInfo.id());
-                        if (added) {
-                            meterFuture.complete(null);
-                        } else {
-                            log.error("Failed to add Meter {} for {} on {} to the meter-bandwidth mapping",
-                                      meterIdRef.get(), bpInfo.id(), deviceId);
-                            meterFuture.complete(ObjectiveError.UNKNOWN);
-                        }
+                        log.info("Meter for BandwidthProfile {} is installed on the device {}",
+                                 bandwidthProfile, deviceId);
+                        meterFuture.complete(null);
                     }
 
                     @Override
                     public void onError(MeterRequest op, MeterFailReason reason) {
-                        log.error("Failed installing meter {} on {} for {}",
-                                  meterIdRef.get(), deviceId, bpInfo.id());
-                        bpInfoToMeter.remove(bpInfo.id(),
-                                             MeterKey.key(deviceId, meterIdRef.get()));
+                        log.error("Failed installing meter on {} for {}",
+                                  deviceId, bandwidthProfile);
                         meterFuture.complete(reason);
                     }
                 })
@@ -300,63 +438,34 @@
                 .burst()
                 .add();
 
+        // creating the meter
         Meter meter = meterService.submit(meterRequest);
-        meterIdRef.set(meter.id());
-        log.info("Meter {} created and sent for installation on {} for {}",
-                 meter.id(), deviceId, bpInfo);
-        return meter.id();
-    }
 
-    @Override
-    public void removeFromPendingMeters(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
-        if (deviceId == null) {
-            return;
-        }
-        pendingMeters.computeIfPresent(deviceId, (id, bwps) -> {
-            bwps.remove(bwpInfo);
-            return bwps;
-        });
-    }
-
-    @Override
-    public synchronized boolean checkAndAddPendingMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
-        if (bwpInfo == null) {
-            log.debug("Bandwidth profile is null for device: {}", deviceId);
-            return false;
-        }
-        if (pendingMeters.containsKey(deviceId)
-                && pendingMeters.get(deviceId).contains(bwpInfo)) {
-            log.debug("Meter is already pending on {} with bp {}",
-                      deviceId, bwpInfo);
-            return false;
-        }
-        log.debug("Adding bandwidth profile {} to pending on {}",
-                  bwpInfo, deviceId);
-        pendingMeters.compute(deviceId, (id, bwps) -> {
-            if (bwps == null) {
-                bwps = new HashSet<>();
+        // wait for the meter to be completed
+        meterFuture.thenAccept(error -> {
+            if (error != null) {
+                log.error("Cannot create meter, TODO address me");
             }
-            bwps.add(bwpInfo);
-            return bwps;
+
+            // then update the map with the MeterId
+            try {
+                programmedMeterWriteLock.lock();
+                programmedMeters.compute(deviceId, (d, entry) -> {
+                    if (entry != null) {
+                        entry.compute(bandwidthProfile, (bp, meterData) -> {
+                            if (meterData != null) {
+                                meterData.setMeterCellId(meter.meterCellId());
+                                meterData.setMeterStatus(MeterState.ADDED);
+                            }
+                            return meterData;
+                        });
+                    }
+                    return entry;
+                });
+            } finally {
+                programmedMeterWriteLock.unlock();
+            }
         });
-
-        return true;
-    }
-
-    @Override
-    public void clearMeters(DeviceId deviceId) {
-        log.debug("Removing all meters for device {}", deviceId);
-        clearDeviceState(deviceId);
-        meterService.purgeMeters(deviceId);
-    }
-
-    @Override
-    public void clearDeviceState(DeviceId deviceId) {
-        log.info("Clearing local device state for {}", deviceId);
-        pendingRemoveMeters.remove(deviceId);
-        removeMetersFromBpMapping(deviceId);
-        //Following call handles cornercase of OLT delete during meter provisioning
-        pendingMeters.remove(deviceId);
     }
 
     private List<Band> createMeterBands(BandwidthProfileInformation bpInfo) {
@@ -408,119 +517,137 @@
                 .build();
     }
 
-    private void removeMeterFromBpMapping(MeterKey meterKey) {
-        List<Map.Entry<String, MeterKey>> meters = bpInfoToMeter.stream()
-                .filter(e -> e.getValue().equals(meterKey))
-                .collect(Collectors.toList());
-
-        meters.forEach(e -> bpInfoToMeter.remove(e.getKey(), e.getValue()));
-    }
-
-    private void removeMetersFromBpMapping(DeviceId deviceId) {
-        List<Map.Entry<String, MeterKey>> meters = bpInfoToMeter.stream()
-                .filter(e -> e.getValue().deviceId().equals(deviceId))
-                .collect(Collectors.toList());
-
-        meters.forEach(e -> bpInfoToMeter.remove(e.getKey(), e.getValue()));
-    }
-
-    /**
-     * Checks for mastership or falls back to leadership on deviceId.
-     * If the device is available use mastership,
-     * otherwise fallback on leadership.
-     * Leadership on the device topic is needed because the master can be NONE
-     * in case the device went away, we still need to handle events
-     * consistently
-     */
-    private boolean isLocalLeader(DeviceId deviceId) {
-        if (deviceService.isAvailable(deviceId)) {
-            return mastershipService.isLocalMaster(deviceId);
-        } else {
-            // Fallback with Leadership service - device id is used as topic
-            NodeId leader = leadershipService.runForLeadership(
-                    deviceId.toString()).leaderNodeId();
-            // Verify if this node is the leader
-            return clusterService.getLocalNode().id().equals(leader);
+    private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {
+        // a lookup needs both a bound Sadis profile service and an actual profile name
+        final boolean lookupPossible = checkSadisRunning() && bandwidthProfile != null;
+        if (lookupPossible) {
+            return bpService.get(bandwidthProfile);
         }
+        // otherwise there is nothing to resolve
+        return null;
+    }
+
+    private boolean checkSadisRunning() {
+        final boolean sadisBound = bpService != null; // field is set and cleared by the bind/unbind hooks
+        if (!sadisBound) {
+            log.warn("Sadis is not running");
+        }
+        return sadisBound;
     }
 
     private class InternalMeterListener implements MeterListener {
-
         @Override
         public void event(MeterEvent meterEvent) {
-            eventExecutor.execute(() -> {
+            pendingRemovalMetersExecutor.execute(() -> { // processed on the executor, not on the caller
                 Meter meter = meterEvent.subject();
-                if (meter == null) {
-                    log.error("Meter in event {} is null", meterEvent);
+                // skip events that carry no meter or that concern meters owned by another app
+                if (meter == null || !appId.equals(meter.appId())) {
                     return;
                 }
-                if (isLocalLeader(meter.deviceId())) {
-                    MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
-                    if (deleteMeters &&
-                            MeterEvent.Type.METER_REFERENCE_COUNT_ZERO.equals(meterEvent.type())) {
-                        log.info("Zero Count Meter Event is received. Meter is {} on {}",
-                                 meter.id(), meter.deviceId());
-                        incrementMeterCount(meter.deviceId(), key);
 
-                        if (appId.equals(meter.appId()) && pendingRemoveMeters.get(meter.deviceId())
-                                .get(key).get() == zeroReferenceMeterCount) {
-                            log.info("Deleting unreferenced, no longer programmed Meter {} on {}",
-                                     meter.id(), meter.deviceId());
+                if (log.isTraceEnabled()) {
+                    log.trace("Received meter event {}", meterEvent);
+                }
+                MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+                if (meterEvent.type() == MeterEvent.Type.METER_REFERENCE_COUNT_ZERO) {
+                    if (!oltDeviceService.isLocalLeader(meter.deviceId())) {
+                        if (log.isTraceEnabled()) {
+                            log.trace("ignoring meter event {} " +
+                                    "as not leader for {}", meterEvent, meter.deviceId());
+                        }
+                        return;
+                    }
+                    log.info("Zero Count Reference event is received for meter {} on {}, " +
+                                     "incrementing counter",
+                             meter.id(), meter.deviceId());
+                    incrementMeterCount(meter.deviceId(), key);
+                    if (pendingRemoveMeters.get(meter.deviceId())
+                            .get(key).get() == zeroReferenceMeterCount) {
+                        // only delete the meters if the app is configured to do so
+                        if (deleteMeters) {
+                            log.info("Meter {} on device {} is unused, removing it", meter.id(), meter.deviceId());
                             deleteMeter(meter.deviceId(), meter.id());
                         }
                     }
-                    if (MeterEvent.Type.METER_REMOVED.equals(meterEvent.type())) {
-                        log.info("Meter Removed Event is received for {} on {}",
-                                 meter.id(), meter.deviceId());
-                        pendingRemoveMeters.computeIfPresent(meter.deviceId(),
-                                                             (id, meters) -> {
-                                                                 if (meters.get(key) == null) {
-                                                                     log.info("Meters is not pending " +
-                                                                                      "{} on {}", key, id);
-                                                                     return meters;
-                                                                 }
-                                                                 meters.remove(key);
-                                                                 return meters;
-                                                             });
-                        removeMeterFromBpMapping(key);
-                    }
-                } else {
-                    log.trace("Ignoring meter event, not leader of {}, {}", meter.deviceId(), meterEvent);
+                }
+                // removal is handled without the leadership check applied to zero-reference events
+                if (meterEvent.type() == MeterEvent.Type.METER_REMOVED) {
+                    removeMeterCount(meter, key);
                 }
             });
         }
 
+        private void removeMeterCount(Meter meter, MeterKey key) {
+            // forget the zero-reference counter tracked for this meter, if there is one
+            pendingRemoveMeters.computeIfPresent(meter.deviceId(), (id, counters) -> {
+                if (counters.get(key) != null) {
+                    counters.remove(key);
+                } else {
+                    log.info("Meters is not pending {} on {}", key, id);
+                }
+                // the per-device map is handed back in both cases, so the device entry stays
+                return counters;
+            });
+        }
+
         private void incrementMeterCount(DeviceId deviceId, MeterKey key) {
             if (key == null) {
                 return;
             }
             pendingRemoveMeters.compute(deviceId,
-                    (id, meters) -> {
-                        if (meters == null) {
-                            meters = new HashMap<>();
+                                        (id, counters) -> {
+                                            // lazily create the per-device table of counters
+                                            if (counters == null) {
+                                                counters = new HashMap<>();
 
-                        }
-                        if (meters.get(key) == null) {
-                            meters.put(key, new AtomicInteger(1));
-                        }
-                        meters.get(key).addAndGet(1);
-                        return meters;
-                    });
+                                            }
+                                            // a key not tracked yet is seeded with 1 and then still incremented,
+                                            // so the first event recorded for a meter leaves its counter at 2
+                                            counters.computeIfAbsent(key, k -> new AtomicInteger(1)).addAndGet(1);
+                                            return counters;
+                                        });
+        }
+    }
+
+    private void deleteMeter(DeviceId deviceId, MeterId meterId) {
+        Meter meter = meterService.getMeter(deviceId, meterId);
+        if (meter != null) {
+            MeterRequest meterRequest = DefaultMeterRequest.builder()
+                    .withBands(meter.bands())
+                    .withUnit(meter.unit())
+                    .forDevice(deviceId)
+                    .fromApp(appId)
+                    .burst()
+                    .remove();
+            // the request mirrors the known meter's bands and unit; withdraw() is keyed by meterId
+            meterService.withdraw(meterRequest, meterId);
         }
 
-        private void deleteMeter(DeviceId deviceId, MeterId meterId) {
-            Meter meter = meterService.getMeter(deviceId, meterId);
-            if (meter != null) {
-                MeterRequest meterRequest = DefaultMeterRequest.builder()
-                        .withBands(meter.bands())
-                        .withUnit(meter.unit())
-                        .forDevice(deviceId)
-                        .fromApp(appId)
-                        .burst()
-                        .remove();
-
-                meterService.withdraw(meterRequest, meterId);
-            }
+        // drop every cached entry pointing at this meter, whether or not the meter was still known
+        programmedMeterWriteLock.lock(); // acquired before try so finally never unlocks an unheld lock
+        try {
+            programmedMeters.computeIfPresent(deviceId, (d, deviceMeters) -> {
+                Iterator<Map.Entry<String, MeterData>> iter = deviceMeters.entrySet().iterator();
+                while (iter.hasNext()) {
+                    Map.Entry<String, MeterData> entry = iter.next();
+                    if (meterId.equals(entry.getValue().getMeterId())) { // tolerates an unset cached id
+                        iter.remove(); // Map.remove() here would invalidate the running iterator
+                    }
+                }
+                return deviceMeters;
+            });
+        } finally {
+            programmedMeterWriteLock.unlock();
         }
     }
+
+    protected void bindSadisService(SadisService service) {
+        bpService = service.getBandwidthProfileService(); // the field checkSadisRunning() tests for null
+        log.info("Sadis service is loaded");
+    }
+
+    protected void unbindSadisService(SadisService service) {
+        bpService = null; // checkSadisRunning() returns false again from here on
+        log.info("Sadis service is unloaded");
+    }
 }