[VOL-3621] Using a blocking queue for the pending subscriber and eapol flows

Change-Id: I08106752b236ab507f54cf1ded2ccf5c56c4e4b8
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/app/src/main/java/org/opencord/olt/impl/Olt.java
index 2a8c6e8..408004c 100644
--- a/app/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/app/src/main/java/org/opencord/olt/impl/Olt.java
@@ -15,34 +15,8 @@
  */
 package org.opencord.olt.impl;
 
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
-import static java.util.stream.Collectors.collectingAndThen;
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.mapping;
-import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toSet;
-import static org.onlab.util.Tools.get;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.opencord.olt.impl.OsgiPropertyConstants.*;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Dictionary;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
 import org.onlab.packet.VlanId;
 import org.onlab.util.KryoNamespace;
 import org.onosproject.cfg.ComponentConfigService;
@@ -96,8 +70,33 @@
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
 
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static java.util.stream.Collectors.*;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.opencord.olt.impl.OsgiPropertyConstants.*;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Provisions rules on access devices.
@@ -199,7 +198,7 @@
     private ConsistentMultimap<ConnectPoint, UniTagInformation> programmedSubs;
     private ConsistentMultimap<ConnectPoint, UniTagInformation> failedSubs;
 
-    private Set<SubscriberFlowInfo> pendingSubscribers;
+    private ConcurrentMap<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingSubscribersForDevice;
 
     @Activate
     public void activate(ComponentContext context) {
@@ -230,7 +229,7 @@
                 .withApplicationId(appId)
                 .build();
 
-        pendingSubscribers = Sets.newConcurrentHashSet();
+        pendingSubscribersForDevice = new ConcurrentHashMap<>();
         eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
 
         subsService = sadisService.getSubscriberInfoService();
@@ -336,15 +335,20 @@
         if (!uniTagInformationSet.isEmpty()) {
             return true;
         }
-
-        for (SubscriberFlowInfo fi : pendingSubscribers) {
-            if (fi.getDevId().equals(connectPoint.deviceId())
-                    && fi.getUniPort().equals(connectPoint.port())) {
-                return true;
+        //Check if the subscriber is already getting provisioned
+        // so we do not provision twice
+        AtomicBoolean isPending = new AtomicBoolean(false);
+        pendingSubscribersForDevice.computeIfPresent(connectPoint.deviceId(), (id, infos) -> {
+            for (SubscriberFlowInfo fi : infos) {
+                if (fi.getUniPort().equals(connectPoint.port())) {
+                    isPending.set(true);
+                    break;
+                }
             }
-        }
+            return infos;
+        });
 
-        return false;
+        return isPending.get();
     }
 
     private class DeleteEapolInstallSub implements Runnable {
@@ -800,7 +804,14 @@
             log.debug("Adding {} on {}/{} to pending subs", fi, deviceId, subscriberPort);
             // one or both meters are not ready. It's possible they are in the process of being
             // created for other subscribers that share the same bandwidth profile.
-            pendingSubscribers.add(fi);
+            pendingSubscribersForDevice.compute(deviceId, (id, queue) -> {
+                if (queue == null) {
+                    queue = new LinkedBlockingQueue<>();
+                }
+                queue.add(fi);
+                log.info("Added {} to pending subscribers on {}/{}", fi, deviceId, subscriberPort);
+                return queue;
+            });
 
             // queue up the meters to be created
             if (upMeterId == null) {
@@ -829,42 +840,54 @@
                                                       meterFuture);
 
         meterFuture.thenAcceptAsync(result -> {
+            BlockingQueue<SubscriberFlowInfo> queue = pendingSubscribersForDevice.get(deviceId);
             // iterate through the subscribers on hold
-            Iterator<SubscriberFlowInfo> subsIterator = pendingSubscribers.iterator();
-            while (subsIterator.hasNext()) {
-                SubscriberFlowInfo fi = subsIterator.next();
-                if (result == null) {
-                    // meter install sent to device
-                    log.debug("Meter {} installed for bw {} on {}", meterId, bwpInfo, deviceId);
-
-                    MeterId upMeterId = oltMeterService
-                            .getMeterIdFromBpMapping(deviceId, fi.getUpBpInfo());
-                    MeterId downMeterId = oltMeterService
-                            .getMeterIdFromBpMapping(deviceId, fi.getDownBpInfo());
-                    if (upMeterId != null && downMeterId != null) {
-                        log.debug("Provisioning subscriber after meter {} " +
-                                          "installation and both meters are present " +
-                                          "upstream {} and downstream {} on {}/{}",
-                                  meterId, upMeterId, downMeterId, deviceId, fi.getUniPort());
-                        // put in the meterIds  because when fi was first
-                        // created there may or may not have been a meterId
-                        // depending on whether the meter was created or
-                        // not at that time.
-                        fi.setUpMeterId(upMeterId);
-                        fi.setDownMeterId(downMeterId);
-                        handleSubFlowsWithMeters(fi);
-                        subsIterator.remove();
+            if (queue != null) {
+                while (true) {
+                    //TODO this might return the reference and not the actual object so
+                    // it can be actually swapped underneath us.
+                    SubscriberFlowInfo fi = queue.peek();
+                    if (fi == null) {
+                        log.debug("No more subscribers pending on {}", deviceId);
+                        break;
                     }
-                    oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
-                } else {
-                    // meter install failed
-                    log.error("Addition of subscriber {} on {}/{} failed due to meter " +
-                                      "{} with result {}", fi, deviceId, fi.getUniPort(), meterId, result);
-                    subsIterator.remove();
-                    oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
+                    if (result == null) {
+                        // meter install sent to device
+                        log.debug("Meter {} installed for bw {} on {}", meterId, bwpInfo, deviceId);
+
+                        MeterId upMeterId = oltMeterService
+                                .getMeterIdFromBpMapping(deviceId, fi.getUpBpInfo());
+                        MeterId downMeterId = oltMeterService
+                                .getMeterIdFromBpMapping(deviceId, fi.getDownBpInfo());
+                        if (upMeterId != null && downMeterId != null) {
+                            log.debug("Provisioning subscriber after meter {} " +
+                                              "installation and both meters are present " +
+                                              "upstream {} and downstream {} on {}/{}",
+                                      meterId, upMeterId, downMeterId, deviceId, fi.getUniPort());
+                            // put in the meterIds  because when fi was first
+                            // created there may or may not have been a meterId
+                            // depending on whether the meter was created or
+                            // not at that time.
+                            fi.setUpMeterId(upMeterId);
+                            fi.setDownMeterId(downMeterId);
+                            handleSubFlowsWithMeters(fi);
+                            queue.remove(fi);
+                        }
+                        oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
+                    } else {
+                        // meter install failed
+                        log.error("Addition of subscriber {} on {}/{} failed due to meter " +
+                                          "{} with result {}", fi, deviceId, fi.getUniPort(),
+                                  meterId, result);
+                        queue.remove(fi);
+                        oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
+                    }
                 }
+            } else {
+                log.info("No pending subscribers on {}", deviceId);
             }
         });
+
     }
     /**
      * Add subscriber flows given meter information for both upstream and
@@ -1310,7 +1333,7 @@
             programmedDevices.remove(device.id());
             removeAllSubscribers(device.id());
             //Handle case where OLT disconnects during subscriber provisioning
-            pendingSubscribers.removeIf(fi -> fi.getDevId().equals(device.id()));
+            pendingSubscribersForDevice.remove(device.id());
             oltFlowService.clearDeviceState(device.id());
 
             //Complete meter and flow purge
diff --git a/app/src/main/java/org/opencord/olt/impl/OltFlowService.java b/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 58344e6..bf99fe4 100644
--- a/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -15,28 +15,6 @@
  */
 package org.opencord.olt.impl;
 
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static org.onlab.util.Tools.get;
-import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_TP_ID;
-import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_TP_ID_DEFAULT;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_ON_NNI;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_ON_NNI_DEFAULT;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V4;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V4_DEFAULT;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V6;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V6_DEFAULT;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_EAPOL;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_EAPOL_DEFAULT;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_IGMP_ON_NNI;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_IGMP_ON_NNI_DEFAULT;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Dictionary;
-import java.util.Iterator;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
 import org.onlab.packet.EthType;
 import org.onlab.packet.IPv4;
 import org.onlab.packet.IPv6;
@@ -84,7 +62,18 @@
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
 
-import com.google.common.collect.Sets;
+import java.util.Dictionary;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
+import static org.opencord.olt.impl.OsgiPropertyConstants.*;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * Provisions flow rules on access devices.
@@ -167,7 +156,8 @@
     protected ApplicationId appId;
     protected BaseInformationService<BandwidthProfileInformation> bpService;
     protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
-    private Set<SubscriberFlowInfo> pendingEapolForMeters = Sets.newConcurrentHashSet();;
+    private ConcurrentMap<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingEapolForDevice
+            = new ConcurrentHashMap<>();
 
     @Activate
     public void activate(ComponentContext context) {
@@ -459,7 +449,13 @@
                                                                        .setPonCTag(vlanId).build(),
                                                                null, null,
                                                                null, bpInfo.id());
-                pendingEapolForMeters.add(fi);
+                pendingEapolForDevice.compute(devId, (id, queue) -> {
+                    if (queue == null) {
+                        queue = new LinkedBlockingQueue<>();
+                    }
+                    queue.add(fi);
+                    return queue;
+                });
 
                 //If false the meter is already being installed, skipping installation
                 if (!oltMeterService.checkAndAddPendingMeter(devId, bpInfo)) {
@@ -495,27 +491,32 @@
         }
         meterFuture.thenAcceptAsync(result -> {
             //for each pending eapol flow we check if the meter is there.
-            //TODO possibly the iterator gets taken at time t0, while the sub is added at time t1
-            // and thus never considered
-            Iterator<SubscriberFlowInfo> eapIterator = pendingEapolForMeters.iterator();
-            while (eapIterator.hasNext()) {
-                SubscriberFlowInfo fi = eapIterator.next();
-                log.debug("handing pending eapol on {}/{} for {}", fi.getDevId(), fi.getUniPort(), fi);
-                if (result == null) {
-                    MeterId mId = oltMeterService
-                            .getMeterIdFromBpMapping(devId, fi.getUpBpInfo());
-                    if (mId != null) {
-                        log.debug("Meter installation completed for subscriber on {}, handling EAPOL trap flow",
-                                cp.toString());
-                        handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, mId);
-                        eapIterator.remove();
+            BlockingQueue<SubscriberFlowInfo> queue = pendingEapolForDevice.get(devId);
+            if (queue != null) {
+                while (true) {
+                    SubscriberFlowInfo fi = queue.remove();
+                    if (fi == null) {
+                        break;
                     }
-                } else {
-                    log.warn("Meter installation error while sending EAPOL trap flow to {}. " +
-                                     "Result {} and MeterId {}", cp.toString(), result, meterId);
-                    eapIterator.remove();
+                    //TODO this might return the reference and not the actual object
+                    // so it can be actually swapped underneath us.
+                    log.debug("handing pending eapol on {}/{} for {}", fi.getDevId(), fi.getUniPort(), fi);
+                    if (result == null) {
+                        MeterId mId = oltMeterService
+                                .getMeterIdFromBpMapping(devId, fi.getUpBpInfo());
+                        if (mId != null) {
+                            log.debug("Meter installation completed for subscriber on {}, handling EAPOL trap flow",
+                                      cp.toString());
+                            handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, mId);
+                        }
+                    } else {
+                        log.warn("Meter installation error while sending EAPOL trap flow to {}. " +
+                                         "Result {} and MeterId {}", cp.toString(), result, meterId);
+                    }
+                    oltMeterService.removeFromPendingMeters(devId, bpInfo);
                 }
-                oltMeterService.removeFromPendingMeters(devId, bpInfo);
+            } else {
+                log.info("No pending EAPOLs on {}", devId);
             }
         });
     }
@@ -741,7 +742,7 @@
 
     @Override
     public void clearDeviceState(DeviceId deviceId) {
-        pendingEapolForMeters.removeIf(fi -> fi.getDevId().equals(deviceId));
+        pendingEapolForDevice.remove(deviceId);
     }
 
     private DefaultForwardingObjective.Builder createForwardingObjectiveBuilder(TrafficSelector selector,