Adding Meter count property and mastership check
Change-Id: I6203292510be3dbfa59429915f86d80c42b71a77
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java b/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
index ab603b1..4b88344 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
@@ -15,13 +15,14 @@
*/
package org.opencord.olt.impl;
+import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toSet;
+import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
-import static org.opencord.olt.impl.OsgiPropertyConstants.DELETE_METERS;
-import static org.opencord.olt.impl.OsgiPropertyConstants.DELETE_METERS_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.*;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
@@ -44,9 +45,14 @@
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
+import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.meter.Band;
import org.onosproject.net.meter.DefaultBand;
@@ -83,6 +89,7 @@
*/
@Component(immediate = true, property = {
DELETE_METERS + ":Boolean=" + DELETE_METERS_DEFAULT,
+ ZERO_REFERENCE_METER_COUNT + ":Integer=" + ZERO_REFERENCE_METER_COUNT_DEFAULT,
})
public class OltMeterService implements AccessDeviceMeterService {
@@ -98,11 +105,28 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected StorageService storageService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected DeviceService deviceService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected MastershipService mastershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
/**
* Delete meters when reference count drops to zero.
*/
protected boolean deleteMeters = DELETE_METERS_DEFAULT;
+ /**
+ * Number of Zero References received before deleting the meter.
+ */
+ protected int zeroReferenceMeterCount = ZERO_REFERENCE_METER_COUNT_DEFAULT;
+
private ApplicationId appId;
private static final String APP_NAME = "org.opencord.olt";
@@ -164,6 +188,11 @@
if (d != null) {
deleteMeters = d;
}
+
+ String zeroReferenceMeterCountNew = get(properties, ZERO_REFERENCE_METER_COUNT);
+ zeroReferenceMeterCount = isNullOrEmpty(zeroReferenceMeterCountNew) ? ZERO_REFERENCE_METER_COUNT_DEFAULT :
+ Integer.parseInt(zeroReferenceMeterCountNew.trim());
+
}
@Override
@@ -395,6 +424,26 @@
meters.forEach(e -> bpInfoToMeter.remove(e.getKey(), e.getValue()));
}
+ /**
+ * Checks for mastership or falls back to leadership on deviceId.
+ * If the device is available use mastership,
+ * otherwise fallback on leadership.
+ * Leadership on the device topic is needed because the master can be NONE
+ * in case the device went away, we still need to handle events
+ * consistently
+ */
+ private boolean isLocalLeader(DeviceId deviceId) {
+ if (deviceService.isAvailable(deviceId)) {
+ return mastershipService.isLocalMaster(deviceId);
+ } else {
+ // Fallback with Leadership service - device id is used as topic
+ NodeId leader = leadershipService.runForLeadership(
+ deviceId.toString()).leaderNodeId();
+ // Verify if this node is the leader
+ return clusterService.getLocalNode().id().equals(leader);
+ }
+ }
+
private class InternalMeterListener implements MeterListener {
@Override
@@ -405,33 +454,38 @@
log.error("Meter in event {} is null", meterEvent);
return;
}
- MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
- if (deleteMeters && MeterEvent.Type.METER_REFERENCE_COUNT_ZERO.equals(meterEvent.type())) {
- log.info("Zero Count Meter Event is received. Meter is {} on {}",
- meter.id(), meter.deviceId());
- incrementMeterCount(meter.deviceId(), key);
-
- if (appId.equals(meter.appId()) && pendingRemoveMeters.get(meter.deviceId())
- .get(key).get() == 3) {
- log.info("Deleting unreferenced, no longer programmed Meter {} on {}",
+ if (isLocalLeader(meter.deviceId())) {
+ MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+ if (deleteMeters &&
+ MeterEvent.Type.METER_REFERENCE_COUNT_ZERO.equals(meterEvent.type())) {
+ log.info("Zero Count Meter Event is received. Meter is {} on {}",
meter.id(), meter.deviceId());
- deleteMeter(meter.deviceId(), meter.id());
+ incrementMeterCount(meter.deviceId(), key);
+
+ if (appId.equals(meter.appId()) && pendingRemoveMeters.get(meter.deviceId())
+ .get(key).get() == zeroReferenceMeterCount) {
+ log.info("Deleting unreferenced, no longer programmed Meter {} on {}",
+ meter.id(), meter.deviceId());
+ deleteMeter(meter.deviceId(), meter.id());
+ }
}
- }
- if (MeterEvent.Type.METER_REMOVED.equals(meterEvent.type())) {
- log.info("Meter Removed Event is received for {} on {}",
- meter.id(), meter.deviceId());
- pendingRemoveMeters.computeIfPresent(meter.deviceId(),
- (id, meters) -> {
- if (meters.get(key) == null) {
- log.info("Meters is not pending " +
- "{} on {}", key, id);
- return meters;
- }
- meters.remove(key);
- return meters;
- });
- removeMeterFromBpMapping(key);
+ if (MeterEvent.Type.METER_REMOVED.equals(meterEvent.type())) {
+ log.info("Meter Removed Event is received for {} on {}",
+ meter.id(), meter.deviceId());
+ pendingRemoveMeters.computeIfPresent(meter.deviceId(),
+ (id, meters) -> {
+ if (meters.get(key) == null) {
+ log.info("Meters is not pending " +
+ "{} on {}", key, id);
+ return meters;
+ }
+ meters.remove(key);
+ return meters;
+ });
+ removeMeterFromBpMapping(key);
+ }
+ } else {
+ log.trace("Ignoring meter event, not leader of {}, {}", meter.deviceId(), meterEvent);
}
});
}
diff --git a/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java b/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
index 0e82f37..3657436 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
@@ -45,6 +45,10 @@
public static final String DEFAULT_TP_ID = "defaultTechProfileId";
public static final int DEFAULT_TP_ID_DEFAULT = 64;
+ public static final String ZERO_REFERENCE_METER_COUNT = "zeroReferenceMeterCount";
+ public static final int ZERO_REFERENCE_METER_COUNT_DEFAULT = 3;
+
+
public static final String DEFAULT_BP_ID = "defaultBpId";
public static final String DEFAULT_BP_ID_DEFAULT = "Default";