[JIRA-3148] install only one meter per bandwidth profile on eapol flows

Change-Id: Ie2f1922cf3854b47d9a2d34a44410ef883b3fe2e
diff --git a/app/src/main/java/org/opencord/olt/impl/OltFlowService.java b/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 83032c0..b4dca74 100644
--- a/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -49,6 +49,7 @@
 import org.onosproject.net.meter.MeterId;
 import org.opencord.olt.internalapi.AccessDeviceFlowService;
 import org.opencord.olt.internalapi.AccessDeviceMeterService;
+import org.opencord.olt.internalapi.DeviceBandwidthProfile;
 import org.opencord.sadis.BandwidthProfileInformation;
 import org.opencord.sadis.BaseInformationService;
 import org.opencord.sadis.SadisService;
@@ -64,6 +65,7 @@
 import org.slf4j.Logger;
 
 import java.util.Dictionary;
+import java.util.Iterator;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -155,6 +157,7 @@
     protected BaseInformationService<BandwidthProfileInformation> bpService;
     protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
     private Set<ConnectPoint> pendingAddEapol = Sets.newConcurrentHashSet();
+    private Set<SubscriberFlowInfo> pendingEapolForMeters = Sets.newConcurrentHashSet();;
 
     @Activate
     public void activate(ComponentContext context) {
@@ -419,11 +422,26 @@
         CompletableFuture<Object> meterFuture = new CompletableFuture<>();
 
         // check if meter exists and create it only for an install
-        MeterId meterId = oltMeterService.getMeterIdFromBpMapping(devId, bpInfo.id());
+        final MeterId meterId = oltMeterService.getMeterIdFromBpMapping(devId, bpInfo.id());
+        DeviceBandwidthProfile dm = new DeviceBandwidthProfile(devId, bpInfo);
         if (meterId == null) {
             if (install) {
-                meterId = oltMeterService.createMeter(devId, bpInfo, meterFuture);
-                treatmentBuilder.meter(meterId);
+                log.debug("Need to install meter for EAPOL with bwp {}", bpInfo.id());
+                SubscriberFlowInfo fi = new SubscriberFlowInfo(devId, null, cp.port(),
+                                                               new UniTagInformation.Builder()
+                                                                       .setPonCTag(vlanId).build(),
+                                                               null, null,
+                                                               null, bpInfo.id());
+                pendingEapolForMeters.add(fi);
+
+                if (oltMeterService.isMeterPending(dm)) {
+                    log.debug("Meter {} is already pending for EAPOL", dm);
+                    return;
+                }
+                oltMeterService.addToPendingMeters(dm);
+                MeterId innerMeterId = oltMeterService.createMeter(devId, bpInfo,
+                                                                   meterFuture);
+                fi.setUpMeterId(innerMeterId);
             } else {
                 // this case should not happen as the request to remove an eapol
                 // flow should mean that the flow points to a meter that exists.
@@ -431,63 +449,100 @@
                 // correct 'match' to do so.
                 log.warn("Unknown meter id for bp {}, still proceeding with "
                         + "delete of eapol flow for {}/{}", bpInfo.id(), devId, portNumber);
-                meterFuture.complete(null);
+                SubscriberFlowInfo fi = new SubscriberFlowInfo(devId, null, cp.port(),
+                                                               new UniTagInformation.Builder()
+                                                                       .setPonCTag(vlanId).build(),
+                                                               null, meterId,
+                                                               null, bpInfo.id());
+                handleEapol(filterFuture, install, cp, builder, treatmentBuilder, fi, meterId);
             }
         } else {
             log.debug("Meter {} was previously created for bp {}", meterId, bpInfo.id());
-            treatmentBuilder.meter(meterId);
-            meterFuture.complete(null);
+            SubscriberFlowInfo fi = new SubscriberFlowInfo(devId, null, cp.port(),
+                                                           new UniTagInformation.Builder()
+                                                                   .setPonCTag(vlanId).build(),
+                                                           null, meterId,
+                                                           null, bpInfo.id());
+            handleEapol(filterFuture, install, cp, builder, treatmentBuilder, fi, meterId);
+            //No need for the future, meter is present.
+            return;
         }
-
-        final MeterId mId = meterId;
         meterFuture.thenAcceptAsync(result -> {
-            if (result == null) {
-                log.info("Meter {} for {} on {}/{} exists. {} EAPOL trap flow",
-                        mId, bpId, devId, portNumber,
-                        (install) ? "Installing" : "Removing");
-                int techProfileId = getDefaultTechProfileId(devId, portNumber);
-
-                //Authentication trap flow uses only tech profile id as write metadata value
-                FilteringObjective eapol = (install ? builder.permit() : builder.deny())
-                        .withKey(Criteria.matchInPort(portNumber))
-                        .addCondition(Criteria.matchEthType(EthType.EtherType.EAPOL.ethType()))
-                        .addCondition(Criteria.matchVlanId(vlanId))
-                        .withMeta(treatmentBuilder
-                                .writeMetadata(createTechProfValueForWm(vlanId, techProfileId), 0)
-                                .setOutput(PortNumber.CONTROLLER).build())
-                        .fromApp(appId)
-                        .withPriority(MAX_PRIORITY)
-                        .add(new ObjectiveContext() {
-                            @Override
-                            public void onSuccess(Objective objective) {
-                                log.info("Eapol filter for {} on {} {} with meter {}.",
-                                        devId, portNumber, (install) ? INSTALLED : REMOVED, mId);
-                                if (filterFuture != null) {
-                                    filterFuture.complete(null);
-                                }
-                                pendingAddEapol.remove(cp);
-                            }
-
-                            @Override
-                            public void onError(Objective objective, ObjectiveError error) {
-                                log.info("Eapol filter for {} on {} with meter {} failed {} because {}",
-                                        devId, portNumber, mId, (install) ? INSTALLATION : REMOVAL,
-                                        error);
-                                if (filterFuture != null) {
-                                    filterFuture.complete(error);
-                                }
-                                pendingAddEapol.remove(cp);
-                            }
-                        });
-
-                flowObjectiveService.filter(devId, eapol);
-            } else {
-                log.warn("Meter installation error while sending eapol trap flow. " +
-                        "Result {} and MeterId {}", result, mId);
+            //for each pending eapol flow we check if the meter is there.
+            Iterator<SubscriberFlowInfo> eapIterator = pendingEapolForMeters.iterator();
+            while (eapIterator.hasNext()) {
+                SubscriberFlowInfo fi = eapIterator.next();
+                if (result == null) {
+                    MeterId mId = oltMeterService
+                            .getMeterIdFromBpMapping(dm.getDevId(), fi.getUpBpInfo());
+                    if (mId != null) {
+                        handleEapol(filterFuture, install, cp, builder, treatmentBuilder, fi, mId);
+                        eapIterator.remove();
+                    }
+                } else {
+                    log.warn("Meter installation error while sending eapol trap flow. " +
+                                     "Result {} and MeterId {}", result, meterId);
+                    eapIterator.remove();
+                }
+                oltMeterService.removeFromPendingMeters(new DeviceBandwidthProfile(
+                        dm.getDevId(), dm.getBwInfo()));
             }
         });
     }
 
+    private void handleEapol(CompletableFuture<ObjectiveError> filterFuture,
+                             boolean install, ConnectPoint cp,
+                             DefaultFilteringObjective.Builder builder,
+                             TrafficTreatment.Builder treatmentBuilder,
+                             SubscriberFlowInfo fi, MeterId mId) {
+        log.info("Meter {} for {} on {}/{} exists. {} EAPOL trap flow",
+                 mId, fi.getUpBpInfo(), fi.getDevId(), fi.getUniPort(),
+                 (install) ? "Installing" : "Removing");
+        int techProfileId = getDefaultTechProfileId(fi.getDevId(), fi.getUniPort());
+        // can happen in case of removal
+        if (mId != null) {
+            treatmentBuilder.meter(mId);
+        }
+        //Authentication trap flow uses only tech profile id as write metadata value
+        FilteringObjective eapol = (install ? builder.permit() : builder.deny())
+                .withKey(Criteria.matchInPort(fi.getUniPort()))
+                .addCondition(Criteria.matchEthType(EthType.EtherType.EAPOL.ethType()))
+                .addCondition(Criteria.matchVlanId(fi.getTagInfo().getPonCTag()))
+                .withMeta(treatmentBuilder
+                                  .writeMetadata(createTechProfValueForWm(
+                                          fi.getTagInfo().getPonCTag(),
+                                          techProfileId), 0)
+                                  .setOutput(PortNumber.CONTROLLER).build())
+                .fromApp(appId)
+                .withPriority(MAX_PRIORITY)
+                .add(new ObjectiveContext() {
+                    @Override
+                    public void onSuccess(Objective objective) {
+                        log.info("Eapol filter for {} on {} {} with meter {}.",
+                                 fi.getDevId(), fi.getUniPort(),
+                                 (install) ? INSTALLED : REMOVED, mId);
+                        if (filterFuture != null) {
+                            filterFuture.complete(null);
+                        }
+                        pendingAddEapol.remove(cp);
+                    }
+
+                    @Override
+                    public void onError(Objective objective, ObjectiveError error) {
+                        log.error("Eapol filter for {} on {} with meter {} " +
+                                         "failed {} because {}",
+                                 fi.getDevId(), fi.getUniPort(), mId,
+                                 (install) ? INSTALLATION : REMOVAL,
+                                 error);
+                        if (filterFuture != null) {
+                            filterFuture.complete(error);
+                        }
+                        pendingAddEapol.remove(cp);
+                    }
+                });
+        flowObjectiveService.filter(fi.getDevId(), eapol);
+    }
+
     /**
      * Installs trap filtering objectives for particular traffic types on an
      * NNI port.