[VOL-3621] Using a blocking queue for the pending subscriber and eapol flows
Change-Id: I08106752b236ab507f54cf1ded2ccf5c56c4e4b8
diff --git a/app/src/main/java/org/opencord/olt/impl/OltFlowService.java b/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 58344e6..bf99fe4 100644
--- a/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -15,28 +15,6 @@
*/
package org.opencord.olt.impl;
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static org.onlab.util.Tools.get;
-import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_TP_ID;
-import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_TP_ID_DEFAULT;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_ON_NNI;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_ON_NNI_DEFAULT;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V4;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V4_DEFAULT;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V6;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V6_DEFAULT;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_EAPOL;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_EAPOL_DEFAULT;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_IGMP_ON_NNI;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_IGMP_ON_NNI_DEFAULT;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Dictionary;
-import java.util.Iterator;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
import org.onlab.packet.EthType;
import org.onlab.packet.IPv4;
import org.onlab.packet.IPv6;
@@ -84,7 +62,18 @@
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
-import com.google.common.collect.Sets;
+import java.util.Dictionary;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
+import static org.opencord.olt.impl.OsgiPropertyConstants.*;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* Provisions flow rules on access devices.
@@ -167,7 +156,8 @@
protected ApplicationId appId;
protected BaseInformationService<BandwidthProfileInformation> bpService;
protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
- private Set<SubscriberFlowInfo> pendingEapolForMeters = Sets.newConcurrentHashSet();;
+ private ConcurrentMap<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingEapolForDevice
+ = new ConcurrentHashMap<>();
@Activate
public void activate(ComponentContext context) {
@@ -459,7 +449,13 @@
.setPonCTag(vlanId).build(),
null, null,
null, bpInfo.id());
- pendingEapolForMeters.add(fi);
+ pendingEapolForDevice.compute(devId, (id, queue) -> {
+ if (queue == null) {
+ queue = new LinkedBlockingQueue<>();
+ }
+ queue.add(fi);
+ return queue;
+ });
//If false the meter is already being installed, skipping installation
if (!oltMeterService.checkAndAddPendingMeter(devId, bpInfo)) {
@@ -495,27 +491,32 @@
}
meterFuture.thenAcceptAsync(result -> {
//for each pending eapol flow we check if the meter is there.
- //TODO possibly the iterator gets taken at time t0, while the sub is added at time t1
- // and thus never considered
- Iterator<SubscriberFlowInfo> eapIterator = pendingEapolForMeters.iterator();
- while (eapIterator.hasNext()) {
- SubscriberFlowInfo fi = eapIterator.next();
- log.debug("handing pending eapol on {}/{} for {}", fi.getDevId(), fi.getUniPort(), fi);
- if (result == null) {
- MeterId mId = oltMeterService
- .getMeterIdFromBpMapping(devId, fi.getUpBpInfo());
- if (mId != null) {
- log.debug("Meter installation completed for subscriber on {}, handling EAPOL trap flow",
- cp.toString());
- handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, mId);
- eapIterator.remove();
+ BlockingQueue<SubscriberFlowInfo> queue = pendingEapolForDevice.get(devId);
+ if (queue != null) {
+ while (true) {
+ SubscriberFlowInfo fi = queue.remove();
+ if (fi == null) {
+ break;
}
- } else {
- log.warn("Meter installation error while sending EAPOL trap flow to {}. " +
- "Result {} and MeterId {}", cp.toString(), result, meterId);
- eapIterator.remove();
+ //TODO this might return the reference and not the actual object
+ // so it can be actually swapped underneath us.
+ log.debug("handing pending eapol on {}/{} for {}", fi.getDevId(), fi.getUniPort(), fi);
+ if (result == null) {
+ MeterId mId = oltMeterService
+ .getMeterIdFromBpMapping(devId, fi.getUpBpInfo());
+ if (mId != null) {
+ log.debug("Meter installation completed for subscriber on {}, handling EAPOL trap flow",
+ cp.toString());
+ handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, mId);
+ }
+ } else {
+ log.warn("Meter installation error while sending EAPOL trap flow to {}. " +
+ "Result {} and MeterId {}", cp.toString(), result, meterId);
+ }
+ oltMeterService.removeFromPendingMeters(devId, bpInfo);
}
- oltMeterService.removeFromPendingMeters(devId, bpInfo);
+ } else {
+ log.info("No pending EAPOLs on {}", devId);
}
});
}
@@ -741,7 +742,7 @@
@Override
public void clearDeviceState(DeviceId deviceId) {
- pendingEapolForMeters.removeIf(fi -> fi.getDevId().equals(deviceId));
+ pendingEapolForDevice.remove(deviceId);
}
private DefaultForwardingObjective.Builder createForwardingObjectiveBuilder(TrafficSelector selector,