[VOL-4246] Feature parity with the previous implementation
Change-Id: I3741edb3c1b88b1cf8b5e6d4ff0900132e2e5e6a
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java b/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
index 4b88344..be04449 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2016-present Open Networking Foundation
+ * Copyright 2021-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,47 +13,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.opencord.olt.impl;
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static java.util.stream.Collectors.collectingAndThen;
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.mapping;
-import static java.util.stream.Collectors.toSet;
-import static org.onlab.util.Tools.get;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.opencord.olt.impl.OsgiPropertyConstants.*;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Dictionary;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.stream.Collectors;
-
+import com.google.common.collect.ImmutableMap;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
-import org.onosproject.cluster.ClusterService;
-import org.onosproject.cluster.LeadershipService;
-import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
-import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
-import org.onosproject.net.device.DeviceService;
-import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.meter.Band;
import org.onosproject.net.meter.DefaultBand;
import org.onosproject.net.meter.DefaultMeterRequest;
@@ -66,12 +35,14 @@
import org.onosproject.net.meter.MeterListener;
import org.onosproject.net.meter.MeterRequest;
import org.onosproject.net.meter.MeterService;
+import org.onosproject.net.meter.MeterState;
import org.onosproject.store.serializers.KryoNamespaces;
-import org.onosproject.store.service.ConsistentMultimap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
-import org.opencord.olt.internalapi.AccessDeviceMeterService;
import org.opencord.sadis.BandwidthProfileInformation;
+import org.opencord.sadis.BaseInformationService;
+import org.opencord.sadis.SadisService;
+import org.opencord.sadis.SubscriberAndDeviceInformation;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@@ -79,219 +50,386 @@
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
import org.slf4j.Logger;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
+import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
-/**
- * Provisions Meters on access devices.
- */
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.opencord.olt.impl.OsgiPropertyConstants.*;
+import static org.slf4j.LoggerFactory.getLogger;
+
@Component(immediate = true, property = {
DELETE_METERS + ":Boolean=" + DELETE_METERS_DEFAULT,
ZERO_REFERENCE_METER_COUNT + ":Integer=" + ZERO_REFERENCE_METER_COUNT_DEFAULT,
- })
-public class OltMeterService implements AccessDeviceMeterService {
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected MeterService meterService;
+})
+public class OltMeterService implements OltMeterServiceInterface {
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ComponentConfigService componentConfigService;
+ protected ComponentConfigService cfgService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected StorageService storageService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected DeviceService deviceService;
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL,
+ bind = "bindSadisService",
+ unbind = "unbindSadisService",
+ policy = ReferencePolicy.DYNAMIC)
+ protected volatile SadisService sadisService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected ClusterService clusterService;
+ protected MeterService meterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected MastershipService mastershipService;
+ protected OltDeviceServiceInterface oltDeviceService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected LeadershipService leadershipService;
+ private final Logger log = getLogger(getClass());
+ protected BaseInformationService<BandwidthProfileInformation> bpService;
+ private ApplicationId appId;
+ private static final String APP_NAME = "org.opencord.olt";
+ private final ReentrantReadWriteLock programmedMeterLock = new ReentrantReadWriteLock();
+ private final Lock programmedMeterWriteLock = programmedMeterLock.writeLock();
+ private final Lock programmedMeterReadLock = programmedMeterLock.readLock();
+
+ /**
+ * Programmed Meters status map.
+ * Keeps track of which meter is programmed on which device for which BandwidthProfile.
+ * The String key is the BandwidthProfile
+ */
+ protected Map<DeviceId, Map<String, MeterData>> programmedMeters;
+
+ private final MeterListener meterListener = new InternalMeterListener();
+ protected ExecutorService pendingRemovalMetersExecutor =
+ Executors.newFixedThreadPool(5, groupedThreads("onos/olt",
+ "pending-removal-meters-%d", log));
+
+ /**
+ * Map that contains a list of meters that needs to be removed.
+ * We wait to get 3 METER_REFERENCE_COUNT_ZERO events before removing the meter
+ * so that we're sure no flow is referencing it.
+ */
+ protected Map<DeviceId, Map<MeterKey, AtomicInteger>> pendingRemoveMeters;
+
+    /**
+     * Number of consecutive meter events with empty reference count
+     * after which a meter gets removed from the device.
+     * Initialised from the same default that is advertised in the component properties.
+     */
+    protected int zeroReferenceMeterCount = ZERO_REFERENCE_METER_COUNT_DEFAULT;
/**
* Delete meters when reference count drops to zero.
*/
protected boolean deleteMeters = DELETE_METERS_DEFAULT;
- /**
- * Number of Zero References received before deleting the meter.
- */
- protected int zeroReferenceMeterCount = ZERO_REFERENCE_METER_COUNT_DEFAULT;
-
- private ApplicationId appId;
- private static final String APP_NAME = "org.opencord.olt";
-
- private final MeterListener meterListener = new InternalMeterListener();
-
- private final Logger log = getLogger(getClass());
-
- protected ExecutorService eventExecutor;
-
- protected Map<DeviceId, Set<BandwidthProfileInformation>> pendingMeters;
- protected Map<DeviceId, Map<MeterKey, AtomicInteger>> pendingRemoveMeters;
- protected ConsistentMultimap<String, MeterKey> bpInfoToMeter;
-
@Activate
public void activate(ComponentContext context) {
- eventExecutor = Executors.newFixedThreadPool(5, groupedThreads("onos/olt",
- "events-%d", log));
appId = coreService.registerApplication(APP_NAME);
modified(context);
-
KryoNamespace serializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
+ .register(List.class)
+ .register(MeterData.class)
+ .register(MeterState.class)
.register(MeterKey.class)
- .register(BandwidthProfileInformation.class)
.build();
- bpInfoToMeter = storageService.<String, MeterKey>consistentMultimapBuilder()
- .withName("volt-bp-info-to-meter")
- .withSerializer(Serializer.using(serializer))
- .withApplicationId(appId)
- .build();
-
- meterService.addListener(meterListener);
- componentConfigService.registerProperties(getClass());
- pendingMeters = storageService.<DeviceId, Set<BandwidthProfileInformation>>consistentMapBuilder()
- .withName("volt-pending-meters")
+ programmedMeters = storageService.<DeviceId, Map<String, MeterData>>consistentMapBuilder()
+ .withName("volt-programmed-meters")
.withSerializer(Serializer.using(serializer))
.withApplicationId(appId)
.build().asJavaMap();
+
pendingRemoveMeters = storageService.<DeviceId, Map<MeterKey, AtomicInteger>>consistentMapBuilder()
.withName("volt-pending-remove-meters")
.withSerializer(Serializer.using(serializer))
.withApplicationId(appId)
.build().asJavaMap();
- log.info("Olt Meter service started");
- }
- @Deactivate
- public void deactivate() {
- meterService.removeListener(meterListener);
- }
+ cfgService.registerProperties(getClass());
+ meterService.addListener(meterListener);
+
+ log.info("Started");
+ }
+    /**
+     * Applies the component configuration.
+     * Properties that are missing, empty or not parsable leave the current value untouched.
+     *
+     * @param context the component context holding the properties, may be null
+     */
@Modified
public void modified(ComponentContext context) {
Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
- Boolean d = Tools.isPropertyEnabled(properties, "deleteMeters");
+        Boolean d = Tools.isPropertyEnabled(properties, DELETE_METERS);
if (d != null) {
deleteMeters = d;
}
- String zeroReferenceMeterCountNew = get(properties, ZERO_REFERENCE_METER_COUNT);
- zeroReferenceMeterCount = isNullOrEmpty(zeroReferenceMeterCountNew) ? ZERO_REFERENCE_METER_COUNT_DEFAULT :
- Integer.parseInt(zeroReferenceMeterCountNew.trim());
+        String zeroCount = get(properties, ZERO_REFERENCE_METER_COUNT);
+        if (!isNullOrEmpty(zeroCount)) {
+            try {
+                zeroReferenceMeterCount = Integer.parseInt(zeroCount.trim());
+            } catch (NumberFormatException e) {
+                // a malformed value must not abort activation/reconfiguration
+                log.warn("Ignoring invalid value '{}' for {}, keeping {}",
+                         zeroCount, ZERO_REFERENCE_METER_COUNT, zeroReferenceMeterCount);
+            }
+        }
+        log.info("Modified. Values = deleteMeters: {}, zeroReferenceMeterCount: {}",
+                 deleteMeters, zeroReferenceMeterCount);
+    }
+
+ /**
+ * Unregisters the component configuration properties and removes the meter listener.
+ * NOTE(review): pendingRemovalMetersExecutor is not shut down here - confirm whether that is intended.
+ *
+ * @param context the component context (not used)
+ */
+ @Deactivate
+ public void deactivate(ComponentContext context) {
+ cfgService.unregisterProperties(getClass(), false);
+ meterService.removeListener(meterListener);
+ log.info("Stopped");
}
@Override
- public ImmutableMap<String, Collection<MeterKey>> getBpMeterMappings() {
- return bpInfoToMeter.stream()
- .collect(collectingAndThen(
- groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
- ImmutableMap::copyOf));
+    public Map<DeviceId, Map<String, MeterData>> getProgrammedMeters() {
+        // acquire the lock before entering the try block, so that unlock()
+        // only runs when the lock is actually held
+        programmedMeterReadLock.lock();
+        try {
+            // immutable snapshot of the outer map; the per-device maps are not copied
+            return ImmutableMap.copyOf(programmedMeters);
+        } finally {
+            programmedMeterReadLock.unlock();
+        }
}
- boolean addMeterIdToBpMapping(DeviceId deviceId, MeterId meterId, String bandwidthProfile) {
- log.debug("adding bp {} to meter {} mapping for device {}",
- bandwidthProfile, meterId, deviceId);
- return bpInfoToMeter.put(bandwidthProfile, MeterKey.key(deviceId, meterId));
+    /**
+     * Checks whether a meter for the given bandwidth profile is installed on the device
+     * and requests its creation when it is neither installed nor pending.
+     *
+     * @param deviceId DeviceId
+     * @param bandwidthProfile Bandwidth Profile Id
+     * @return true if the meter is already installed, false if it is missing or still pending
+     */
+    @Override
+    public synchronized boolean createMeter(DeviceId deviceId, String bandwidthProfile) {
+
+        // NOTE it is possible that hasMeterByBandwidthProfile returns false as the meter is in PENDING_ADD,
+        // then a different thread changes the meter to ADDED.
+        // hasPendingMeterByBandwidthProfile therefore also reports ADDED meters, otherwise the meter
+        // would be installed a second time, causing an inconsistency between the existing meter
+        // and the meterId stored in the map
+
+        if (hasMeterByBandwidthProfile(deviceId, bandwidthProfile)) {
+            log.debug("Meter found for BandwidthProfile {} on device {}", bandwidthProfile, deviceId);
+            return true;
+        }
+
+        // NOTE this is at trace level as it's constantly called by the queue processor
+        if (log.isTraceEnabled()) {
+            log.trace("Missing meter for BandwidthProfile {} on device {}", bandwidthProfile, deviceId);
+        }
+
+        boolean alreadyRequested = hasPendingMeterByBandwidthProfile(deviceId, bandwidthProfile);
+        if (!alreadyRequested) {
+            createMeterForBp(deviceId, bandwidthProfile);
+        }
+        if (log.isTraceEnabled()) {
+            log.trace("Meter is not yet available for {} on device {}",
+                      bandwidthProfile, deviceId);
+        }
+        return false;
}
@Override
- public MeterId getMeterIdFromBpMapping(DeviceId deviceId, String bandwidthProfile) {
- if (bandwidthProfile == null) {
- log.warn("Bandwidth Profile requested is null");
- return null;
+    public boolean createMeters(DeviceId deviceId, SubscriberAndDeviceInformation si) {
+        // Each UniTagInformation has up to 4 meters,
+        // check and/or create all of them.
+        // pendingMeters (service name -> bandwidth profiles still missing) is only used for logging.
+        AtomicBoolean waitingOnMeter = new AtomicBoolean(false);
+        Map<String, List<String>> pendingMeters = new HashMap<>();
+        si.uniTagList().forEach(uniTagInfo -> {
+            List<String> pendingForService = new LinkedList<>();
+            pendingMeters.put(uniTagInfo.getServiceName(), pendingForService);
+
+            String usBp = uniTagInfo.getUpstreamBandwidthProfile();
+            String dsBp = uniTagInfo.getDownstreamBandwidthProfile();
+            String oltDsBp = uniTagInfo.getDownstreamOltBandwidthProfile();
+            String oltUsBp = uniTagInfo.getUpstreamOltBandwidthProfile();
+
+            // NOTE(review): a null profile is still handed to createMeter, as before;
+            // confirm whether the OLT profiles are optional and should be skipped
+            for (String bandwidthProfile : new String[]{usBp, dsBp, oltDsBp, oltUsBp}) {
+                if (!createMeter(deviceId, bandwidthProfile)) {
+                    // record the profile that is actually missing, not always the upstream one
+                    pendingForService.add(bandwidthProfile);
+                    waitingOnMeter.set(true);
+                }
+            }
+        });
+        if (waitingOnMeter.get()) {
+            if (log.isTraceEnabled()) {
+                log.trace("Meters {} on device {} are not " +
+                                  "installed yet (requested by subscriber {})",
+                          pendingMeters, deviceId, si.id());
+            }
+            return false;
}
- if (bpInfoToMeter.get(bandwidthProfile) == null) {
- log.warn("Bandwidth Profile '{}' is not present in the map",
- bandwidthProfile);
- return null;
- }
- if (bpInfoToMeter.get(bandwidthProfile).value().isEmpty()) {
- log.warn("Bandwidth Profile '{}' is not currently mapped to a meter",
- bandwidthProfile);
- return null;
- }
+        return true;
+    }
- Optional<? extends MeterKey> meterKeyForDevice = bpInfoToMeter.get(bandwidthProfile).value()
- .stream()
- .filter(meterKey -> meterKey.deviceId().equals(deviceId))
- .findFirst();
- if (meterKeyForDevice.isPresent()) {
- log.debug("Found meter {} for bandwidth profile {} on {}",
- meterKeyForDevice.get().meterId(), bandwidthProfile, deviceId);
- return meterKeyForDevice.get().meterId();
- } else {
- log.warn("Bandwidth Profile '{}' is not currently mapped to a meter on {} , {}",
- bandwidthProfile, deviceId, bpInfoToMeter.get(bandwidthProfile).value());
- return null;
+    /**
+     * Returns true if a meter is present in the programmed meters map, only if status is ADDED.
+     *
+     * @param deviceId the DeviceId on which to look for the meter
+     * @param bandwidthProfile the Bandwidth profile associated with this meter
+     * @return true if the meter is found
+     */
+    public boolean hasMeterByBandwidthProfile(DeviceId deviceId, String bandwidthProfile) {
+        // lock before try: unlock() must only run when the lock is held
+        programmedMeterReadLock.lock();
+        try {
+            Map<String, MeterData> metersOnDevice = programmedMeters.get(deviceId);
+            if (metersOnDevice == null || metersOnDevice.isEmpty()) {
+                return false;
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("added metersOnDevice {}: {}", deviceId, metersOnDevice);
+            }
+            MeterData meterData = metersOnDevice.get(bandwidthProfile);
+            // constant-first equals tolerates an entry without a status
+            return meterData != null && MeterState.ADDED.equals(meterData.getMeterStatus());
+        } finally {
+            programmedMeterReadLock.unlock();
+        }
+    }
+
+    /**
+     * Returns true if a meter for the bandwidth profile is tracked on the device
+     * in either PENDING_ADD or ADDED state.
+     *
+     * @param deviceId the DeviceId on which to look for the meter
+     * @param bandwidthProfile the Bandwidth profile associated with this meter
+     * @return true if the meter is pending or already added
+     */
+    public boolean hasPendingMeterByBandwidthProfile(DeviceId deviceId, String bandwidthProfile) {
+        // lock before try: unlock() must only run when the lock is held
+        programmedMeterReadLock.lock();
+        try {
+            Map<String, MeterData> metersOnDevice = programmedMeters.get(deviceId);
+            if (metersOnDevice == null || metersOnDevice.isEmpty()) {
+                return false;
+            }
+            if (log.isTraceEnabled()) {
+                log.trace("pending metersOnDevice {}: {}", deviceId, metersOnDevice);
+            }
+            // NOTE that callers check in order if the meter was ADDED and, if it wasn't, check for PENDING_ADD.
+            // It is possible that a different thread moves the meter state from PENDING_ADD
+            // to ADDED between these two checks;
+            // to avoid creating the meter twice we return true even if the meter is already added
+            MeterData meterData = metersOnDevice.get(bandwidthProfile);
+            if (meterData == null) {
+                return false;
+            }
+            MeterState status = meterData.getMeterStatus();
+            return MeterState.ADDED.equals(status) || MeterState.PENDING_ADD.equals(status);
+        } finally {
+            programmedMeterReadLock.unlock();
+        }
+    }
+
+    /**
+     * Looks up the id of the meter installed for a bandwidth profile on a device.
+     *
+     * @param deviceId the DeviceId on which to look for the meter
+     * @param bandwidthProfile the Bandwidth profile associated with this meter
+     * @return the meter id, or null if no meter in ADDED state is tracked for the profile
+     */
+    public MeterId getMeterIdForBandwidthProfile(DeviceId deviceId, String bandwidthProfile) {
+        // lock before try: unlock() must only run when the lock is held
+        programmedMeterReadLock.lock();
+        try {
+            Map<String, MeterData> metersOnDevice = programmedMeters.get(deviceId);
+            if (metersOnDevice == null || metersOnDevice.isEmpty()) {
+                return null;
+            }
+            MeterData meterData = metersOnDevice.get(bandwidthProfile);
+            if (meterData == null || meterData.getMeterStatus() != MeterState.ADDED) {
+                return null;
+            }
+            // the level of the guard and of the statement now agree
+            if (log.isTraceEnabled()) {
+                log.trace("Found meter {} on device {} for bandwidth profile {}",
+                          meterData.getMeterId(), deviceId, bandwidthProfile);
+            }
+            return meterData.getMeterId();
+        } finally {
+            programmedMeterReadLock.unlock();
+        }
}
@Override
- public ImmutableSet<MeterKey> getProgMeters() {
- return bpInfoToMeter.stream()
- .map(Map.Entry::getValue)
- .collect(ImmutableSet.toImmutableSet());
+    public void purgeDeviceMeters(DeviceId deviceId) {
+        // Purges the meters through the meter service and drops every local record for the device.
+        log.debug("Purging meters on device {}", deviceId);
+        meterService.purgeMeters(deviceId);
+
+        // after we purge the meters we also need to clear the map
+        // (lock before try: unlock() must only run when the lock is held)
+        programmedMeterWriteLock.lock();
+        try {
+            programmedMeters.remove(deviceId);
+        } finally {
+            programmedMeterWriteLock.unlock();
+        }
+
+        // and clear the event count
+        // NOTE do we need a lock?
+        pendingRemoveMeters.remove(deviceId);
}
- @Override
- public MeterId createMeter(DeviceId deviceId, BandwidthProfileInformation bpInfo,
- CompletableFuture<Object> meterFuture) {
- log.debug("Creating meter on {} for {}", deviceId, bpInfo);
+ /**
+ * Creates of a meter for a given Bandwidth Profile on a given device.
+ *
+ * @param deviceId the DeviceId
+ * @param bandwidthProfile the BandwidthProfile ID
+ */
+ public void createMeterForBp(DeviceId deviceId, String bandwidthProfile) {
+ // adding meter in pending state to the programmedMeter map
+ try {
+ programmedMeterWriteLock.lock();
+ programmedMeters.compute(deviceId, (d, deviceMeters) -> {
+
+ if (deviceMeters == null) {
+ deviceMeters = new HashMap<>();
+ }
+ // NOTE that this method is only called after verifying a
+ // meter for this BP does not already exist
+ MeterData meterData = new MeterData(
+ null,
+ MeterState.PENDING_ADD,
+ bandwidthProfile
+ );
+ deviceMeters.put(bandwidthProfile, meterData);
+
+ return deviceMeters;
+ });
+ } finally {
+ programmedMeterWriteLock.unlock();
+ }
+
+ BandwidthProfileInformation bpInfo = getBandwidthProfileInformation(bandwidthProfile);
if (bpInfo == null) {
- log.warn("Requested bandwidth profile on {} information is NULL", deviceId);
- meterFuture.complete(ObjectiveError.BADPARAMS);
- return null;
+ log.error("BandwidthProfile {} information not found in sadis", bandwidthProfile);
+ return;
}
- MeterId meterId = getMeterIdFromBpMapping(deviceId, bpInfo.id());
- if (meterId != null) {
- log.debug("Meter {} was previously created for bp {}", meterId, bpInfo.id());
- meterFuture.complete(null);
- return meterId;
+ log.info("Creating meter for BandwidthProfile {} on device {}", bpInfo.id(), deviceId);
+
+ if (log.isTraceEnabled()) {
+ log.trace("BandwidthProfile: {}", bpInfo);
}
List<Band> meterBands = createMeterBands(bpInfo);
- final AtomicReference<MeterId> meterIdRef = new AtomicReference<>();
+ CompletableFuture<Object> meterFuture = new CompletableFuture<>();
+
MeterRequest meterRequest = DefaultMeterRequest.builder()
.withBands(meterBands)
.withUnit(Meter.Unit.KB_PER_SEC)
.withContext(new MeterContext() {
@Override
public void onSuccess(MeterRequest op) {
- log.debug("Meter {} for {} is installed on the device {}",
- meterIdRef.get(), bpInfo.id(), deviceId);
- boolean added = addMeterIdToBpMapping(deviceId, meterIdRef.get(), bpInfo.id());
- if (added) {
- meterFuture.complete(null);
- } else {
- log.error("Failed to add Meter {} for {} on {} to the meter-bandwidth mapping",
- meterIdRef.get(), bpInfo.id(), deviceId);
- meterFuture.complete(ObjectiveError.UNKNOWN);
- }
+ log.info("Meter for BandwidthProfile {} is installed on the device {}",
+ bandwidthProfile, deviceId);
+ meterFuture.complete(null);
}
@Override
public void onError(MeterRequest op, MeterFailReason reason) {
- log.error("Failed installing meter {} on {} for {}",
- meterIdRef.get(), deviceId, bpInfo.id());
- bpInfoToMeter.remove(bpInfo.id(),
- MeterKey.key(deviceId, meterIdRef.get()));
+ log.error("Failed installing meter on {} for {}",
+ deviceId, bandwidthProfile);
meterFuture.complete(reason);
}
})
@@ -300,63 +438,34 @@
.burst()
.add();
+ // creating the meter
Meter meter = meterService.submit(meterRequest);
- meterIdRef.set(meter.id());
- log.info("Meter {} created and sent for installation on {} for {}",
- meter.id(), deviceId, bpInfo);
- return meter.id();
- }
- @Override
- public void removeFromPendingMeters(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
- if (deviceId == null) {
- return;
- }
- pendingMeters.computeIfPresent(deviceId, (id, bwps) -> {
- bwps.remove(bwpInfo);
- return bwps;
- });
- }
-
- @Override
- public synchronized boolean checkAndAddPendingMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
- if (bwpInfo == null) {
- log.debug("Bandwidth profile is null for device: {}", deviceId);
- return false;
- }
- if (pendingMeters.containsKey(deviceId)
- && pendingMeters.get(deviceId).contains(bwpInfo)) {
- log.debug("Meter is already pending on {} with bp {}",
- deviceId, bwpInfo);
- return false;
- }
- log.debug("Adding bandwidth profile {} to pending on {}",
- bwpInfo, deviceId);
- pendingMeters.compute(deviceId, (id, bwps) -> {
- if (bwps == null) {
- bwps = new HashSet<>();
+        // wait for the meter to be completed; the future carries null on success
+        // and the failure reason on error
+        meterFuture.thenAccept(error -> {
+            if (error != null) {
+                log.error("Cannot create meter for BandwidthProfile {} on device {}: {}",
+                          bandwidthProfile, deviceId, error);
}
- bwps.add(bwpInfo);
- return bwps;
+
+            // then update the map: record the meter as ADDED on success,
+            // drop the PENDING_ADD placeholder on failure so that a later
+            // createMeter call can request the meter again
+            programmedMeterWriteLock.lock();
+            try {
+                programmedMeters.compute(deviceId, (d, entry) -> {
+                    if (entry != null) {
+                        entry.computeIfPresent(bandwidthProfile, (bp, meterData) -> {
+                            if (error == null) {
+                                meterData.setMeterCellId(meter.meterCellId());
+                                meterData.setMeterStatus(MeterState.ADDED);
+                                return meterData;
+                            }
+                            // a failed request must never be reported as an installed meter
+                            return meterData.getMeterStatus() == MeterState.PENDING_ADD ? null : meterData;
+                        });
+                    }
+                    return entry;
+                });
+            } finally {
+                programmedMeterWriteLock.unlock();
+            }
});
-
- return true;
- }
-
- @Override
- public void clearMeters(DeviceId deviceId) {
- log.debug("Removing all meters for device {}", deviceId);
- clearDeviceState(deviceId);
- meterService.purgeMeters(deviceId);
- }
-
- @Override
- public void clearDeviceState(DeviceId deviceId) {
- log.info("Clearing local device state for {}", deviceId);
- pendingRemoveMeters.remove(deviceId);
- removeMetersFromBpMapping(deviceId);
- //Following call handles cornercase of OLT delete during meter provisioning
- pendingMeters.remove(deviceId);
}
private List<Band> createMeterBands(BandwidthProfileInformation bpInfo) {
@@ -408,119 +517,137 @@
.build();
}
- private void removeMeterFromBpMapping(MeterKey meterKey) {
- List<Map.Entry<String, MeterKey>> meters = bpInfoToMeter.stream()
- .filter(e -> e.getValue().equals(meterKey))
- .collect(Collectors.toList());
-
- meters.forEach(e -> bpInfoToMeter.remove(e.getKey(), e.getValue()));
- }
-
- private void removeMetersFromBpMapping(DeviceId deviceId) {
- List<Map.Entry<String, MeterKey>> meters = bpInfoToMeter.stream()
- .filter(e -> e.getValue().deviceId().equals(deviceId))
- .collect(Collectors.toList());
-
- meters.forEach(e -> bpInfoToMeter.remove(e.getKey(), e.getValue()));
- }
-
- /**
- * Checks for mastership or falls back to leadership on deviceId.
- * If the device is available use mastership,
- * otherwise fallback on leadership.
- * Leadership on the device topic is needed because the master can be NONE
- * in case the device went away, we still need to handle events
- * consistently
- */
- private boolean isLocalLeader(DeviceId deviceId) {
- if (deviceService.isAvailable(deviceId)) {
- return mastershipService.isLocalMaster(deviceId);
- } else {
- // Fallback with Leadership service - device id is used as topic
- NodeId leader = leadershipService.runForLeadership(
- deviceId.toString()).leaderNodeId();
- // Verify if this node is the leader
- return clusterService.getLocalNode().id().equals(leader);
+    // Resolves a bandwidth profile id through the bound bandwidth profile service;
+    // yields null when that service is not bound or when the id is null.
+    private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {
+        // the service check comes first so that its warning is logged even for a null id
+        if (!checkSadisRunning() || bandwidthProfile == null) {
+            return null;
}
+        return bpService.get(bandwidthProfile);
+    }
+
+    // Tells whether the bandwidth profile service is currently bound, logging a warning when it is not.
+    private boolean checkSadisRunning() {
+        boolean running = bpService != null;
+        if (!running) {
+            log.warn("Sadis is not running");
+        }
+        return running;
}
private class InternalMeterListener implements MeterListener {
-
@Override
public void event(MeterEvent meterEvent) {
- eventExecutor.execute(() -> {
+ pendingRemovalMetersExecutor.execute(() -> {
+
Meter meter = meterEvent.subject();
- if (meter == null) {
- log.error("Meter in event {} is null", meterEvent);
+ if (!appId.equals(meter.appId())) {
return;
}
- if (isLocalLeader(meter.deviceId())) {
- MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
- if (deleteMeters &&
- MeterEvent.Type.METER_REFERENCE_COUNT_ZERO.equals(meterEvent.type())) {
- log.info("Zero Count Meter Event is received. Meter is {} on {}",
- meter.id(), meter.deviceId());
- incrementMeterCount(meter.deviceId(), key);
- if (appId.equals(meter.appId()) && pendingRemoveMeters.get(meter.deviceId())
- .get(key).get() == zeroReferenceMeterCount) {
- log.info("Deleting unreferenced, no longer programmed Meter {} on {}",
- meter.id(), meter.deviceId());
+ if (log.isTraceEnabled()) {
+ log.trace("Received meter event {}", meterEvent);
+ }
+ MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+ if (meterEvent.type().equals(MeterEvent.Type.METER_REFERENCE_COUNT_ZERO)) {
+ if (!oltDeviceService.isLocalLeader(meter.deviceId())) {
+ if (log.isTraceEnabled()) {
+ log.trace("ignoring meter event {} " +
+ "as not leader for {}", meterEvent, meter.deviceId());
+ }
+ return;
+ }
+ log.info("Zero Count Reference event is received for meter {} on {}, " +
+ "incrementing counter",
+ meter.id(), meter.deviceId());
+ incrementMeterCount(meter.deviceId(), key);
+ if (pendingRemoveMeters.get(meter.deviceId())
+ .get(key).get() == zeroReferenceMeterCount) {
+ // only delete the meters if the app is configured to do so
+ if (deleteMeters) {
+ log.info("Meter {} on device {} is unused, removing it", meter.id(), meter.deviceId());
deleteMeter(meter.deviceId(), meter.id());
}
}
- if (MeterEvent.Type.METER_REMOVED.equals(meterEvent.type())) {
- log.info("Meter Removed Event is received for {} on {}",
- meter.id(), meter.deviceId());
- pendingRemoveMeters.computeIfPresent(meter.deviceId(),
- (id, meters) -> {
- if (meters.get(key) == null) {
- log.info("Meters is not pending " +
- "{} on {}", key, id);
- return meters;
- }
- meters.remove(key);
- return meters;
- });
- removeMeterFromBpMapping(key);
- }
- } else {
- log.trace("Ignoring meter event, not leader of {}, {}", meter.deviceId(), meterEvent);
+ }
+
+ if (meterEvent.type().equals(MeterEvent.Type.METER_REMOVED)) {
+ removeMeterCount(meter, key);
}
});
}
+ private void removeMeterCount(Meter meter, MeterKey key) {
+ pendingRemoveMeters.computeIfPresent(meter.deviceId(),
+ (id, meters) -> {
+ if (meters.get(key) == null) {
+ log.info("Meters is not pending " +
+ "{} on {}", key, id);
+ return meters;
+ }
+ meters.remove(key);
+ return meters;
+ });
+ }
+
+        /**
+         * Counts one more zero-reference event for the given meter.
+         * The first event for a meter yields a count of 1.
+         *
+         * @param deviceId the device the meter belongs to
+         * @param key the key of the meter, ignored when null
+         */
private void incrementMeterCount(DeviceId deviceId, MeterKey key) {
if (key == null) {
return;
}
pendingRemoveMeters.compute(deviceId,
- (id, meters) -> {
- if (meters == null) {
- meters = new HashMap<>();
+                    (id, meters) -> {
+                        if (meters == null) {
+                            meters = new HashMap<>();
- }
- if (meters.get(key) == null) {
- meters.put(key, new AtomicInteger(1));
- }
- meters.get(key).addAndGet(1);
- return meters;
- });
+                        }
+                        // start from zero: seeding the counter with 1 and then adding 1
+                        // made the first event count twice
+                        meters.computeIfAbsent(key, k -> new AtomicInteger()).incrementAndGet();
+                        return meters;
+                    });
+        }
+ }
+
+    /**
+     * Withdraws a meter from the device (if the meter service still knows it)
+     * and removes it from the programmed meters map.
+     *
+     * @param deviceId the device the meter is installed on
+     * @param meterId the id of the meter to remove
+     */
+    private void deleteMeter(DeviceId deviceId, MeterId meterId) {
+        Meter meter = meterService.getMeter(deviceId, meterId);
+        if (meter != null) {
+            MeterRequest meterRequest = DefaultMeterRequest.builder()
+                    .withBands(meter.bands())
+                    .withUnit(meter.unit())
+                    .forDevice(deviceId)
+                    .fromApp(appId)
+                    .burst()
+                    .remove();
+
+            meterService.withdraw(meterRequest, meterId);
}
- private void deleteMeter(DeviceId deviceId, MeterId meterId) {
- Meter meter = meterService.getMeter(deviceId, meterId);
- if (meter != null) {
- MeterRequest meterRequest = DefaultMeterRequest.builder()
- .withBands(meter.bands())
- .withUnit(meter.unit())
- .forDevice(deviceId)
- .fromApp(appId)
- .burst()
- .remove();
-
- meterService.withdraw(meterRequest, meterId);
- }
+        // remove the meter from local caching
+        // (lock before try: unlock() must only run when the lock is held)
+        programmedMeterWriteLock.lock();
+        try {
+            programmedMeters.computeIfPresent(deviceId, (d, deviceMeters) -> {
+                Iterator<Map.Entry<String, MeterData>> iter = deviceMeters.entrySet().iterator();
+                while (iter.hasNext()) {
+                    Map.Entry<String, MeterData> entry = iter.next();
+                    // entries still in PENDING_ADD may not carry a meter id yet,
+                    // hence the comparison starts from the non-null argument
+                    if (meterId.equals(entry.getValue().getMeterId())) {
+                        // removing through the iterator keeps the iteration valid
+                        iter.remove();
+                    }
+                }
+                return deviceMeters;
+            });
+        } finally {
+            programmedMeterWriteLock.unlock();
+        }
}
+
+ /**
+ * Bind callback for the optional SadisService reference:
+ * stores the bandwidth profile service exposed by the given service.
+ *
+ * @param service the SadisService being bound
+ */
+ protected void bindSadisService(SadisService service) {
+ this.bpService = service.getBandwidthProfileService();
+ log.info("Sadis service is loaded");
+ }
+
+ /**
+ * Unbind callback for the optional SadisService reference:
+ * clears the stored bandwidth profile service.
+ *
+ * @param service the SadisService being unbound (not used)
+ */
+ protected void unbindSadisService(SadisService service) {
+ this.bpService = null;
+ log.info("Sadis service is unloaded");
+ }
}