Adding Meter count property and mastership check

Change-Id: I6203292510be3dbfa59429915f86d80c42b71a77
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java b/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
index ab603b1..4b88344 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltMeterService.java
@@ -15,13 +15,14 @@
  */
 package org.opencord.olt.impl;
 
+import static com.google.common.base.Strings.isNullOrEmpty;
 import static java.util.stream.Collectors.collectingAndThen;
 import static java.util.stream.Collectors.groupingBy;
 import static java.util.stream.Collectors.mapping;
 import static java.util.stream.Collectors.toSet;
+import static org.onlab.util.Tools.get;
 import static org.onlab.util.Tools.groupedThreads;
-import static org.opencord.olt.impl.OsgiPropertyConstants.DELETE_METERS;
-import static org.opencord.olt.impl.OsgiPropertyConstants.DELETE_METERS_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.*;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.ArrayList;
@@ -44,9 +45,14 @@
 import org.onlab.util.KryoNamespace;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
+import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
+import org.onosproject.net.device.DeviceService;
 import org.onosproject.net.flowobjective.ObjectiveError;
 import org.onosproject.net.meter.Band;
 import org.onosproject.net.meter.DefaultBand;
@@ -83,6 +89,7 @@
  */
 @Component(immediate = true, property = {
         DELETE_METERS + ":Boolean=" + DELETE_METERS_DEFAULT,
+        ZERO_REFERENCE_METER_COUNT + ":Integer=" + ZERO_REFERENCE_METER_COUNT_DEFAULT,
         })
 public class OltMeterService implements AccessDeviceMeterService {
 
@@ -98,11 +105,28 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected StorageService storageService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected DeviceService deviceService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected MastershipService mastershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
     /**
      * Delete meters when reference count drops to zero.
      */
     protected boolean deleteMeters = DELETE_METERS_DEFAULT;
 
+    /**
+     * Number of Zero References received before deleting the meter.
+     */
+    protected int zeroReferenceMeterCount = ZERO_REFERENCE_METER_COUNT_DEFAULT;
+
     private ApplicationId appId;
     private static final String APP_NAME = "org.opencord.olt";
 
@@ -164,6 +188,11 @@
         if (d != null) {
             deleteMeters = d;
         }
+
+        String zeroReferenceMeterCountNew = get(properties, ZERO_REFERENCE_METER_COUNT);
+        zeroReferenceMeterCount = isNullOrEmpty(zeroReferenceMeterCountNew) ? ZERO_REFERENCE_METER_COUNT_DEFAULT :
+                Integer.parseInt(zeroReferenceMeterCountNew.trim());
+
     }
 
     @Override
@@ -395,6 +424,26 @@
         meters.forEach(e -> bpInfoToMeter.remove(e.getKey(), e.getValue()));
     }
 
+    /**
+     * Checks for mastership or falls back to leadership on deviceId.
+     * If the device is available use mastership,
+     * otherwise fallback on leadership.
+     * Leadership on the device topic is needed because the master can be NONE
+     * in case the device went away, we still need to handle events
+     * consistently
+     */
+    private boolean isLocalLeader(DeviceId deviceId) {
+        if (deviceService.isAvailable(deviceId)) {
+            return mastershipService.isLocalMaster(deviceId);
+        } else {
+            // Fallback with Leadership service - device id is used as topic
+            NodeId leader = leadershipService.runForLeadership(
+                    deviceId.toString()).leaderNodeId();
+            // Verify if this node is the leader
+            return clusterService.getLocalNode().id().equals(leader);
+        }
+    }
+
     private class InternalMeterListener implements MeterListener {
 
         @Override
@@ -405,33 +454,38 @@
                     log.error("Meter in event {} is null", meterEvent);
                     return;
                 }
-                MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
-                if (deleteMeters && MeterEvent.Type.METER_REFERENCE_COUNT_ZERO.equals(meterEvent.type())) {
-                    log.info("Zero Count Meter Event is received. Meter is {} on {}",
-                             meter.id(), meter.deviceId());
-                    incrementMeterCount(meter.deviceId(), key);
-
-                    if (appId.equals(meter.appId()) && pendingRemoveMeters.get(meter.deviceId())
-                            .get(key).get() == 3) {
-                        log.info("Deleting unreferenced, no longer programmed Meter {} on {}",
+                if (isLocalLeader(meter.deviceId())) {
+                    MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+                    if (deleteMeters &&
+                            MeterEvent.Type.METER_REFERENCE_COUNT_ZERO.equals(meterEvent.type())) {
+                        log.info("Zero Count Meter Event is received. Meter is {} on {}",
                                  meter.id(), meter.deviceId());
-                        deleteMeter(meter.deviceId(), meter.id());
+                        incrementMeterCount(meter.deviceId(), key);
+
+                        if (appId.equals(meter.appId()) && pendingRemoveMeters.get(meter.deviceId())
+                                .get(key).get() == zeroReferenceMeterCount) {
+                            log.info("Deleting unreferenced, no longer programmed Meter {} on {}",
+                                     meter.id(), meter.deviceId());
+                            deleteMeter(meter.deviceId(), meter.id());
+                        }
                     }
-                }
-                if (MeterEvent.Type.METER_REMOVED.equals(meterEvent.type())) {
-                    log.info("Meter Removed Event is received for {} on {}",
-                             meter.id(), meter.deviceId());
-                    pendingRemoveMeters.computeIfPresent(meter.deviceId(),
-                                                (id, meters) -> {
-                                                    if (meters.get(key) == null) {
-                                                        log.info("Meters is not pending " +
-                                                                         "{} on {}", key, id);
-                                                        return meters;
-                                                    }
-                                                    meters.remove(key);
-                                                    return meters;
-                                                });
-                    removeMeterFromBpMapping(key);
+                    if (MeterEvent.Type.METER_REMOVED.equals(meterEvent.type())) {
+                        log.info("Meter Removed Event is received for {} on {}",
+                                 meter.id(), meter.deviceId());
+                        pendingRemoveMeters.computeIfPresent(meter.deviceId(),
+                                                             (id, meters) -> {
+                                                                 if (meters.get(key) == null) {
+                                                                     log.info("Meters is not pending " +
+                                                                                      "{} on {}", key, id);
+                                                                     return meters;
+                                                                 }
+                                                                 meters.remove(key);
+                                                                 return meters;
+                                                             });
+                        removeMeterFromBpMapping(key);
+                    }
+                } else {
+                    log.trace("Ignoring meter event, not leader of {}, {}", meter.deviceId(), meterEvent);
                 }
             });
         }
diff --git a/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java b/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
index 0e82f37..3657436 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
@@ -45,6 +45,10 @@
     public static final String DEFAULT_TP_ID = "defaultTechProfileId";
     public static final int DEFAULT_TP_ID_DEFAULT = 64;
 
+    public static final String ZERO_REFERENCE_METER_COUNT = "zeroReferenceMeterCount";
+    public static final int ZERO_REFERENCE_METER_COUNT_DEFAULT = 3;
+
+
     public static final String DEFAULT_BP_ID = "defaultBpId";
     public static final String DEFAULT_BP_ID_DEFAULT = "Default";