[VOL-3621] Using a blocking queue for the pending subscriber and eapol flows

Change-Id: I08106752b236ab507f54cf1ded2ccf5c56c4e4b8
diff --git a/app/src/main/java/org/opencord/olt/impl/OltFlowService.java b/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 58344e6..bf99fe4 100644
--- a/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -15,28 +15,6 @@
  */
 package org.opencord.olt.impl;
 
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static org.onlab.util.Tools.get;
-import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_TP_ID;
-import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_TP_ID_DEFAULT;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_ON_NNI;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_ON_NNI_DEFAULT;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V4;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V4_DEFAULT;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V6;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V6_DEFAULT;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_EAPOL;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_EAPOL_DEFAULT;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_IGMP_ON_NNI;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_IGMP_ON_NNI_DEFAULT;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Dictionary;
-import java.util.Iterator;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
 import org.onlab.packet.EthType;
 import org.onlab.packet.IPv4;
 import org.onlab.packet.IPv6;
@@ -84,7 +62,18 @@
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
 
-import com.google.common.collect.Sets;
+import java.util.Dictionary;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
+import static org.opencord.olt.impl.OsgiPropertyConstants.*;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Provisions flow rules on access devices.
@@ -167,7 +156,8 @@
     protected ApplicationId appId;
     protected BaseInformationService<BandwidthProfileInformation> bpService;
     protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
-    private Set<SubscriberFlowInfo> pendingEapolForMeters = Sets.newConcurrentHashSet();;
+    private ConcurrentMap<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingEapolForDevice
+            = new ConcurrentHashMap<>();
 
     @Activate
     public void activate(ComponentContext context) {
@@ -459,7 +449,13 @@
                                                                        .setPonCTag(vlanId).build(),
                                                                null, null,
                                                                null, bpInfo.id());
-                pendingEapolForMeters.add(fi);
+                pendingEapolForDevice.compute(devId, (id, queue) -> {
+                    if (queue == null) {
+                        queue = new LinkedBlockingQueue<>();
+                    }
+                    queue.add(fi);
+                    return queue;
+                });
 
                 //If false the meter is already being installed, skipping installation
                 if (!oltMeterService.checkAndAddPendingMeter(devId, bpInfo)) {
@@ -495,27 +491,32 @@
         }
         meterFuture.thenAcceptAsync(result -> {
             //for each pending eapol flow we check if the meter is there.
-            //TODO possibly the iterator gets taken at time t0, while the sub is added at time t1
-            // and thus never considered
-            Iterator<SubscriberFlowInfo> eapIterator = pendingEapolForMeters.iterator();
-            while (eapIterator.hasNext()) {
-                SubscriberFlowInfo fi = eapIterator.next();
-                log.debug("handing pending eapol on {}/{} for {}", fi.getDevId(), fi.getUniPort(), fi);
-                if (result == null) {
-                    MeterId mId = oltMeterService
-                            .getMeterIdFromBpMapping(devId, fi.getUpBpInfo());
-                    if (mId != null) {
-                        log.debug("Meter installation completed for subscriber on {}, handling EAPOL trap flow",
-                                cp.toString());
-                        handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, mId);
-                        eapIterator.remove();
+            BlockingQueue<SubscriberFlowInfo> queue = pendingEapolForDevice.get(devId);
+            if (queue != null) {
+                while (true) {
+                    SubscriberFlowInfo fi = queue.remove();
+                    if (fi == null) {
+                        break;
                     }
-                } else {
-                    log.warn("Meter installation error while sending EAPOL trap flow to {}. " +
-                                     "Result {} and MeterId {}", cp.toString(), result, meterId);
-                    eapIterator.remove();
+                    //TODO this might return the reference and not the actual object
+                    // so it can be actually swapped underneath us.
+                    log.debug("handing pending eapol on {}/{} for {}", fi.getDevId(), fi.getUniPort(), fi);
+                    if (result == null) {
+                        MeterId mId = oltMeterService
+                                .getMeterIdFromBpMapping(devId, fi.getUpBpInfo());
+                        if (mId != null) {
+                            log.debug("Meter installation completed for subscriber on {}, handling EAPOL trap flow",
+                                      cp.toString());
+                            handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, mId);
+                        }
+                    } else {
+                        log.warn("Meter installation error while sending EAPOL trap flow to {}. " +
+                                         "Result {} and MeterId {}", cp.toString(), result, meterId);
+                    }
+                    oltMeterService.removeFromPendingMeters(devId, bpInfo);
                 }
-                oltMeterService.removeFromPendingMeters(devId, bpInfo);
+            } else {
+                log.info("No pending EAPOLs on {}", devId);
             }
         });
     }
@@ -741,7 +742,7 @@
 
     @Override
     public void clearDeviceState(DeviceId deviceId) {
-        pendingEapolForMeters.removeIf(fi -> fi.getDevId().equals(deviceId));
+        pendingEapolForDevice.remove(deviceId);
     }
 
     private DefaultForwardingObjective.Builder createForwardingObjectiveBuilder(TrafficSelector selector,