[JIRA-3148] install only one meter per bandwidth profile on eapol flows

Change-Id: Ie2f1922cf3854b47d9a2d34a44410ef883b3fe2e
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/app/src/main/java/org/opencord/olt/impl/Olt.java
index d5e8e32..fe2675e 100644
--- a/app/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/app/src/main/java/org/opencord/olt/impl/Olt.java
@@ -41,7 +41,6 @@
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 
@@ -84,6 +83,7 @@
 import org.opencord.olt.AccessSubscriberId;
 import org.opencord.olt.internalapi.AccessDeviceFlowService;
 import org.opencord.olt.internalapi.AccessDeviceMeterService;
+import org.opencord.olt.internalapi.DeviceBandwidthProfile;
 import org.opencord.sadis.BandwidthProfileInformation;
 import org.opencord.sadis.BaseInformationService;
 import org.opencord.sadis.SadisService;
@@ -185,7 +185,6 @@
 
     private ConsistentMultimap<ConnectPoint, UniTagInformation> programmedSubs;
 
-    private Set<DeviceBandwidthProfile> pendingMeters;
     private Set<SubscriberFlowInfo> pendingSubscribers;
 
     @Activate
@@ -208,7 +207,6 @@
                 .withApplicationId(appId)
                 .build();
 
-        pendingMeters = ConcurrentHashMap.newKeySet();
         pendingSubscribers = Sets.newConcurrentHashSet();
         eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
 
@@ -267,7 +265,7 @@
 
     @Override
     public boolean provisionSubscriber(ConnectPoint connectPoint) {
-        log.info("Call to provisioning subscriber at {}", connectPoint);
+        log.info("Call to provision subscriber at {}", connectPoint);
         DeviceId deviceId = connectPoint.deviceId();
         PortNumber subscriberPortNo = connectPoint.port();
 
@@ -297,6 +295,9 @@
         filterFuture.thenAcceptAsync(filterStatus -> {
             if (filterStatus == null) {
                 provisionUniTagList(connectPoint, uplinkPort.number(), sub);
+            } else {
+                log.error("The filtering future did not complete properly {} " +
+                                  "subscriber on {} is not provisioned", filterStatus, connectPoint);
             }
         });
         return true;
@@ -703,16 +704,18 @@
         }
     }
     private void checkAndCreateDevMeter(DeviceBandwidthProfile dm) {
-        if (pendingMeters.contains(dm)) {
+        if (oltMeterService.isMeterPending(dm)) {
+            log.debug("Meter is already pending {}", dm);
             return;
         }
-        pendingMeters.add(dm);
+        oltMeterService.addToPendingMeters(dm);
         createMeter(dm);
     }
 
     private void createMeter(DeviceBandwidthProfile dm) {
-        log.debug("Creating Meter {} from queue", dm);
+        log.debug("Creating Meter {} from queue for subscriber", dm);
         CompletableFuture<Object> meterFuture = new CompletableFuture<>();
+
         MeterId meterId = oltMeterService.createMeter(dm.getDevId(), dm.getBwInfo(),
                                                       meterFuture);
 
@@ -743,13 +746,13 @@
                         handleSubFlowsWithMeters(fi);
                         subsIterator.remove();
                     }
-                    pendingMeters.remove(new DeviceBandwidthProfile(dm.getDevId(), dm.getBwInfo()));
+                    oltMeterService.removeFromPendingMeters(new DeviceBandwidthProfile(dm.getDevId(), dm.getBwInfo()));
                 } else {
                     // meter install failed
                     log.error("Addition of subscriber {} failed due to meter " +
                                       "{} with result {}", fi, meterId, result);
                     subsIterator.remove();
-                    pendingMeters.remove(new DeviceBandwidthProfile(dm.getDevId(), dm.getBwInfo()));
+                    oltMeterService.removeFromPendingMeters(new DeviceBandwidthProfile(dm.getDevId(), dm.getBwInfo()));
                 }
             }
         });
diff --git a/app/src/main/java/org/opencord/olt/impl/OltFlowService.java b/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 83032c0..b4dca74 100644
--- a/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -49,6 +49,7 @@
 import org.onosproject.net.meter.MeterId;
 import org.opencord.olt.internalapi.AccessDeviceFlowService;
 import org.opencord.olt.internalapi.AccessDeviceMeterService;
+import org.opencord.olt.internalapi.DeviceBandwidthProfile;
 import org.opencord.sadis.BandwidthProfileInformation;
 import org.opencord.sadis.BaseInformationService;
 import org.opencord.sadis.SadisService;
@@ -64,6 +65,7 @@
 import org.slf4j.Logger;
 
 import java.util.Dictionary;
+import java.util.Iterator;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -155,6 +157,7 @@
     protected BaseInformationService<BandwidthProfileInformation> bpService;
     protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
     private Set<ConnectPoint> pendingAddEapol = Sets.newConcurrentHashSet();
+    private Set<SubscriberFlowInfo> pendingEapolForMeters = Sets.newConcurrentHashSet();;
 
     @Activate
     public void activate(ComponentContext context) {
@@ -419,11 +422,26 @@
         CompletableFuture<Object> meterFuture = new CompletableFuture<>();
 
         // check if meter exists and create it only for an install
-        MeterId meterId = oltMeterService.getMeterIdFromBpMapping(devId, bpInfo.id());
+        final MeterId meterId = oltMeterService.getMeterIdFromBpMapping(devId, bpInfo.id());
+        DeviceBandwidthProfile dm = new DeviceBandwidthProfile(devId, bpInfo);
         if (meterId == null) {
             if (install) {
-                meterId = oltMeterService.createMeter(devId, bpInfo, meterFuture);
-                treatmentBuilder.meter(meterId);
+                log.debug("Need to install meter for EAPOL with bwp {}", bpInfo.id());
+                SubscriberFlowInfo fi = new SubscriberFlowInfo(devId, null, cp.port(),
+                                                               new UniTagInformation.Builder()
+                                                                       .setPonCTag(vlanId).build(),
+                                                               null, null,
+                                                               null, bpInfo.id());
+                pendingEapolForMeters.add(fi);
+
+                if (oltMeterService.isMeterPending(dm)) {
+                    log.debug("Meter {} is already pending for EAPOL", dm);
+                    return;
+                }
+                oltMeterService.addToPendingMeters(dm);
+                MeterId innerMeterId = oltMeterService.createMeter(devId, bpInfo,
+                                                                   meterFuture);
+                fi.setUpMeterId(innerMeterId);
             } else {
                 // this case should not happen as the request to remove an eapol
                 // flow should mean that the flow points to a meter that exists.
@@ -431,63 +449,100 @@
                 // correct 'match' to do so.
                 log.warn("Unknown meter id for bp {}, still proceeding with "
                         + "delete of eapol flow for {}/{}", bpInfo.id(), devId, portNumber);
-                meterFuture.complete(null);
+                SubscriberFlowInfo fi = new SubscriberFlowInfo(devId, null, cp.port(),
+                                                               new UniTagInformation.Builder()
+                                                                       .setPonCTag(vlanId).build(),
+                                                               null, meterId,
+                                                               null, bpInfo.id());
+                handleEapol(filterFuture, install, cp, builder, treatmentBuilder, fi, meterId);
             }
         } else {
             log.debug("Meter {} was previously created for bp {}", meterId, bpInfo.id());
-            treatmentBuilder.meter(meterId);
-            meterFuture.complete(null);
+            SubscriberFlowInfo fi = new SubscriberFlowInfo(devId, null, cp.port(),
+                                                           new UniTagInformation.Builder()
+                                                                   .setPonCTag(vlanId).build(),
+                                                           null, meterId,
+                                                           null, bpInfo.id());
+            handleEapol(filterFuture, install, cp, builder, treatmentBuilder, fi, meterId);
+            //No need for the future, meter is present.
+            return;
         }
-
-        final MeterId mId = meterId;
         meterFuture.thenAcceptAsync(result -> {
-            if (result == null) {
-                log.info("Meter {} for {} on {}/{} exists. {} EAPOL trap flow",
-                        mId, bpId, devId, portNumber,
-                        (install) ? "Installing" : "Removing");
-                int techProfileId = getDefaultTechProfileId(devId, portNumber);
-
-                //Authentication trap flow uses only tech profile id as write metadata value
-                FilteringObjective eapol = (install ? builder.permit() : builder.deny())
-                        .withKey(Criteria.matchInPort(portNumber))
-                        .addCondition(Criteria.matchEthType(EthType.EtherType.EAPOL.ethType()))
-                        .addCondition(Criteria.matchVlanId(vlanId))
-                        .withMeta(treatmentBuilder
-                                .writeMetadata(createTechProfValueForWm(vlanId, techProfileId), 0)
-                                .setOutput(PortNumber.CONTROLLER).build())
-                        .fromApp(appId)
-                        .withPriority(MAX_PRIORITY)
-                        .add(new ObjectiveContext() {
-                            @Override
-                            public void onSuccess(Objective objective) {
-                                log.info("Eapol filter for {} on {} {} with meter {}.",
-                                        devId, portNumber, (install) ? INSTALLED : REMOVED, mId);
-                                if (filterFuture != null) {
-                                    filterFuture.complete(null);
-                                }
-                                pendingAddEapol.remove(cp);
-                            }
-
-                            @Override
-                            public void onError(Objective objective, ObjectiveError error) {
-                                log.info("Eapol filter for {} on {} with meter {} failed {} because {}",
-                                        devId, portNumber, mId, (install) ? INSTALLATION : REMOVAL,
-                                        error);
-                                if (filterFuture != null) {
-                                    filterFuture.complete(error);
-                                }
-                                pendingAddEapol.remove(cp);
-                            }
-                        });
-
-                flowObjectiveService.filter(devId, eapol);
-            } else {
-                log.warn("Meter installation error while sending eapol trap flow. " +
-                        "Result {} and MeterId {}", result, mId);
+            //for each pending eapol flow we check if the meter is there.
+            Iterator<SubscriberFlowInfo> eapIterator = pendingEapolForMeters.iterator();
+            while (eapIterator.hasNext()) {
+                SubscriberFlowInfo fi = eapIterator.next();
+                if (result == null) {
+                    MeterId mId = oltMeterService
+                            .getMeterIdFromBpMapping(dm.getDevId(), fi.getUpBpInfo());
+                    if (mId != null) {
+                        handleEapol(filterFuture, install, cp, builder, treatmentBuilder, fi, mId);
+                        eapIterator.remove();
+                    }
+                } else {
+                    log.warn("Meter installation error while sending eapol trap flow. " +
+                                     "Result {} and MeterId {}", result, meterId);
+                    eapIterator.remove();
+                }
+                oltMeterService.removeFromPendingMeters(new DeviceBandwidthProfile(
+                        dm.getDevId(), dm.getBwInfo()));
             }
         });
     }
 
+    private void handleEapol(CompletableFuture<ObjectiveError> filterFuture,
+                             boolean install, ConnectPoint cp,
+                             DefaultFilteringObjective.Builder builder,
+                             TrafficTreatment.Builder treatmentBuilder,
+                             SubscriberFlowInfo fi, MeterId mId) {
+        log.info("Meter {} for {} on {}/{} exists. {} EAPOL trap flow",
+                 mId, fi.getUpBpInfo(), fi.getDevId(), fi.getUniPort(),
+                 (install) ? "Installing" : "Removing");
+        int techProfileId = getDefaultTechProfileId(fi.getDevId(), fi.getUniPort());
+        // can happen in case of removal
+        if (mId != null) {
+            treatmentBuilder.meter(mId);
+        }
+        //Authentication trap flow uses only tech profile id as write metadata value
+        FilteringObjective eapol = (install ? builder.permit() : builder.deny())
+                .withKey(Criteria.matchInPort(fi.getUniPort()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.EAPOL.ethType()))
+                .addCondition(Criteria.matchVlanId(fi.getTagInfo().getPonCTag()))
+                .withMeta(treatmentBuilder
+                                  .writeMetadata(createTechProfValueForWm(
+                                          fi.getTagInfo().getPonCTag(),
+                                          techProfileId), 0)
+                                  .setOutput(PortNumber.CONTROLLER).build())
+                .fromApp(appId)
+                .withPriority(MAX_PRIORITY)
+                .add(new ObjectiveContext() {
+                    @Override
+                    public void onSuccess(Objective objective) {
+                        log.info("Eapol filter for {} on {} {} with meter {}.",
+                                 fi.getDevId(), fi.getUniPort(),
+                                 (install) ? INSTALLED : REMOVED, mId);
+                        if (filterFuture != null) {
+                            filterFuture.complete(null);
+                        }
+                        pendingAddEapol.remove(cp);
+                    }
+
+                    @Override
+                    public void onError(Objective objective, ObjectiveError error) {
+                        log.error("Eapol filter for {} on {} with meter {} " +
+                                         "failed {} because {}",
+                                 fi.getDevId(), fi.getUniPort(), mId,
+                                 (install) ? INSTALLATION : REMOVAL,
+                                 error);
+                        if (filterFuture != null) {
+                            filterFuture.complete(error);
+                        }
+                        pendingAddEapol.remove(cp);
+                    }
+                });
+        flowObjectiveService.filter(fi.getDevId(), eapol);
+    }
+
     /**
      * Installs trap filtering objectives for particular traffic types on an
      * NNI port.
diff --git a/app/src/main/java/org/opencord/olt/impl/OltMeterService.java b/app/src/main/java/org/opencord/olt/impl/OltMeterService.java
index be15854..65a72f0 100644
--- a/app/src/main/java/org/opencord/olt/impl/OltMeterService.java
+++ b/app/src/main/java/org/opencord/olt/impl/OltMeterService.java
@@ -43,6 +43,7 @@
 import org.onosproject.store.service.Serializer;
 import org.onosproject.store.service.StorageService;
 import org.opencord.olt.internalapi.AccessDeviceMeterService;
+import org.opencord.olt.internalapi.DeviceBandwidthProfile;
 import org.opencord.sadis.BandwidthProfileInformation;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
@@ -60,7 +61,9 @@
 import java.util.Map;
 import java.util.Optional;
 import java.util.Properties;
+import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -109,6 +112,8 @@
 
     protected ExecutorService eventExecutor;
 
+    private Set<DeviceBandwidthProfile> pendingMeters;
+
     @Activate
     public void activate(ComponentContext context) {
         eventExecutor = Executors.newFixedThreadPool(5, groupedThreads("onos/olt",
@@ -129,6 +134,7 @@
 
         meterService.addListener(meterListener);
         componentConfigService.registerProperties(getClass());
+        pendingMeters = ConcurrentHashMap.newKeySet();
         log.info("Olt Meter service started");
     }
 
@@ -195,6 +201,7 @@
     @Override
     public MeterId createMeter(DeviceId deviceId, BandwidthProfileInformation bpInfo,
                                CompletableFuture<Object> meterFuture) {
+        log.debug("Installing meter on {} for {}", deviceId, bpInfo);
         if (bpInfo == null) {
             log.warn("Requested bandwidth profile information is NULL");
             meterFuture.complete(ObjectiveError.BADPARAMS);
@@ -218,7 +225,7 @@
                     @Override
                     public void onSuccess(MeterRequest op) {
                         log.debug("Meter {} is installed on the device {}",
-                                 meterId, deviceId);
+                                 meterIdRef.get(), deviceId);
                         addMeterIdToBpMapping(deviceId, meterIdRef.get(), bpInfo.id());
                         meterFuture.complete(null);
                     }
@@ -244,6 +251,21 @@
     }
 
     @Override
+    public void addToPendingMeters(DeviceBandwidthProfile deviceBandwidthProfile) {
+        pendingMeters.add(deviceBandwidthProfile);
+    }
+
+    @Override
+    public void removeFromPendingMeters(DeviceBandwidthProfile deviceBandwidthProfile) {
+        pendingMeters.remove(deviceBandwidthProfile);
+    }
+
+    @Override
+    public boolean isMeterPending(DeviceBandwidthProfile deviceBandwidthProfile) {
+        return pendingMeters.contains(deviceBandwidthProfile);
+    }
+
+    @Override
     public void clearMeters(DeviceId deviceId) {
         List<Map.Entry<String, MeterKey>> meters = bpInfoToMeter.stream()
                 .filter(e -> e.getValue().deviceId().equals(deviceId))
diff --git a/app/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java b/app/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java
index 48570cd..bad2e5b 100644
--- a/app/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java
+++ b/app/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java
@@ -68,6 +68,30 @@
                         CompletableFuture<Object> meterFuture);
 
     /**
+     * Adds the DeviceBandwidthProfile to the pendingMeters.
+     *
+     * @param deviceBandwidthProfile the device to bandwidth profile mapping
+     */
+    void addToPendingMeters(DeviceBandwidthProfile deviceBandwidthProfile);
+
+    /**
+     * Removes the DeviceBandwidthProfile from the pendingMeters.
+     *
+     * @param deviceBandwidthProfile the device to bandwidth profile mapping
+     *
+     */
+    void removeFromPendingMeters(DeviceBandwidthProfile deviceBandwidthProfile);
+
+    /**
+     * Checks if DeviceBandwidthProfile is pending.
+     *
+     * @param deviceBandwidthProfile the device to bandwidth profile mapping
+     *
+     * @return true if pending.
+     */
+    boolean isMeterPending(DeviceBandwidthProfile deviceBandwidthProfile);
+
+    /**
      * Clears out bandwidth profile to meter mappings for the given device.
      *
      * @param deviceId device ID
diff --git a/app/src/main/java/org/opencord/olt/impl/DeviceBandwidthProfile.java b/app/src/main/java/org/opencord/olt/internalapi/DeviceBandwidthProfile.java
similarity index 88%
rename from app/src/main/java/org/opencord/olt/impl/DeviceBandwidthProfile.java
rename to app/src/main/java/org/opencord/olt/internalapi/DeviceBandwidthProfile.java
index 49a4a34..8f676ec 100644
--- a/app/src/main/java/org/opencord/olt/impl/DeviceBandwidthProfile.java
+++ b/app/src/main/java/org/opencord/olt/internalapi/DeviceBandwidthProfile.java
@@ -13,7 +13,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.opencord.olt.impl;
+package org.opencord.olt.internalapi;
 
 import org.onosproject.net.DeviceId;
 import org.opencord.sadis.BandwidthProfileInformation;
@@ -23,7 +23,7 @@
 /**
  * Class containing a mapping of DeviceId to BandwidthProfileInformation.
  */
-class DeviceBandwidthProfile {
+public class DeviceBandwidthProfile {
     private final DeviceId devId;
     private BandwidthProfileInformation bwInfo;
 
@@ -33,7 +33,7 @@
      * @param devId  the device id
      * @param bwInfo the bandwidth profile information
      */
-    DeviceBandwidthProfile(DeviceId devId, BandwidthProfileInformation bwInfo) {
+    public DeviceBandwidthProfile(DeviceId devId, BandwidthProfileInformation bwInfo) {
         this.devId = devId;
         this.bwInfo = bwInfo;
     }
@@ -43,7 +43,7 @@
      *
      * @return device id.
      */
-    DeviceId getDevId() {
+    public DeviceId getDevId() {
         return devId;
     }
 
@@ -52,7 +52,7 @@
      *
      * @return bandwidth profile information
      */
-    BandwidthProfileInformation getBwInfo() {
+    public BandwidthProfileInformation getBwInfo() {
         return bwInfo;
     }
 
diff --git a/app/src/test/java/org/opencord/olt/impl/OltFlowTest.java b/app/src/test/java/org/opencord/olt/impl/OltFlowTest.java
index e0bd109..f259e6f 100644
--- a/app/src/test/java/org/opencord/olt/impl/OltFlowTest.java
+++ b/app/src/test/java/org/opencord/olt/impl/OltFlowTest.java
@@ -48,6 +48,7 @@
 import org.onosproject.net.flowobjective.Objective;
 import org.onosproject.net.meter.MeterId;
 import org.onosproject.net.meter.MeterKey;
+import org.opencord.olt.internalapi.DeviceBandwidthProfile;
 import org.opencord.sadis.BandwidthProfileInformation;
 import org.opencord.sadis.UniTagInformation;
 
@@ -205,6 +206,20 @@
         }
 
         @Override
+        public void addToPendingMeters(DeviceBandwidthProfile deviceBandwidthProfile) {
+
+        }
+
+        @Override
+        public void removeFromPendingMeters(DeviceBandwidthProfile deviceBandwidthProfile) {
+        }
+
+        @Override
+        public boolean isMeterPending(DeviceBandwidthProfile deviceBandwidthProfile) {
+            return false;
+        }
+
+        @Override
         public void clearMeters(DeviceId deviceId) {
         }
     }