[JIRA-3148] install only one meter per bandwidth profile on eapol flows

Change-Id: Ie2f1922cf3854b47d9a2d34a44410ef883b3fe2e
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/app/src/main/java/org/opencord/olt/impl/Olt.java
index d5e8e32..fe2675e 100644
--- a/app/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/app/src/main/java/org/opencord/olt/impl/Olt.java
@@ -41,7 +41,6 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -84,6 +83,7 @@
 import org.opencord.olt.AccessSubscriberId;
 import org.opencord.olt.internalapi.AccessDeviceFlowService;
 import org.opencord.olt.internalapi.AccessDeviceMeterService;
+import org.opencord.olt.internalapi.DeviceBandwidthProfile;
 import org.opencord.sadis.BandwidthProfileInformation;
 import org.opencord.sadis.BaseInformationService;
 import org.opencord.sadis.SadisService;
@@ -185,7 +185,6 @@
 
     private ConsistentMultimap<ConnectPoint, UniTagInformation> programmedSubs;
 
-    private Set<DeviceBandwidthProfile> pendingMeters;
     private Set<SubscriberFlowInfo> pendingSubscribers;
 
     @Activate
@@ -208,7 +207,6 @@
                 .withApplicationId(appId)
                 .build();
 
-        pendingMeters = ConcurrentHashMap.newKeySet();
         pendingSubscribers = Sets.newConcurrentHashSet();
         eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
 
@@ -267,7 +265,7 @@
 
     @Override
     public boolean provisionSubscriber(ConnectPoint connectPoint) {
-        log.info("Call to provisioning subscriber at {}", connectPoint);
+        log.info("Call to provision subscriber at {}", connectPoint);
         DeviceId deviceId = connectPoint.deviceId();
         PortNumber subscriberPortNo = connectPoint.port();
 
@@ -297,6 +295,9 @@
         filterFuture.thenAcceptAsync(filterStatus -> {
             if (filterStatus == null) {
                 provisionUniTagList(connectPoint, uplinkPort.number(), sub);
+            } else {
+                log.error("The filtering future did not complete properly {} " +
+                                  "subscriber on {} is not provisioned", filterStatus, connectPoint);
             }
         });
         return true;
@@ -703,16 +704,18 @@
         }
     }
     private void checkAndCreateDevMeter(DeviceBandwidthProfile dm) {
-        if (pendingMeters.contains(dm)) {
+        if (oltMeterService.isMeterPending(dm)) {
+            log.debug("Meter is already pending {}", dm);
             return;
         }
-        pendingMeters.add(dm);
+        oltMeterService.addToPendingMeters(dm);
         createMeter(dm);
     }
 
     private void createMeter(DeviceBandwidthProfile dm) {
-        log.debug("Creating Meter {} from queue", dm);
+        log.debug("Creating Meter {} from queue for subscriber", dm);
         CompletableFuture<Object> meterFuture = new CompletableFuture<>();
+
         MeterId meterId = oltMeterService.createMeter(dm.getDevId(), dm.getBwInfo(),
                                                       meterFuture);
 
@@ -743,13 +746,13 @@
                         handleSubFlowsWithMeters(fi);
                         subsIterator.remove();
                     }
-                    pendingMeters.remove(new DeviceBandwidthProfile(dm.getDevId(), dm.getBwInfo()));
+                    oltMeterService.removeFromPendingMeters(new DeviceBandwidthProfile(dm.getDevId(), dm.getBwInfo()));
                 } else {
                     // meter install failed
                     log.error("Addition of subscriber {} failed due to meter " +
                                       "{} with result {}", fi, meterId, result);
                     subsIterator.remove();
-                    pendingMeters.remove(new DeviceBandwidthProfile(dm.getDevId(), dm.getBwInfo()));
+                    oltMeterService.removeFromPendingMeters(new DeviceBandwidthProfile(dm.getDevId(), dm.getBwInfo()));
                 }
             }
         });