[VOL-3621] Using a blocking queue for the pending subscriber and eapol flows
Change-Id: I08106752b236ab507f54cf1ded2ccf5c56c4e4b8
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/app/src/main/java/org/opencord/olt/impl/Olt.java
index 2a8c6e8..408004c 100644
--- a/app/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/app/src/main/java/org/opencord/olt/impl/Olt.java
@@ -15,34 +15,8 @@
*/
package org.opencord.olt.impl;
-import static com.google.common.base.Preconditions.checkNotNull;
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
-import static java.util.stream.Collectors.collectingAndThen;
-import static java.util.stream.Collectors.groupingBy;
-import static java.util.stream.Collectors.mapping;
-import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toSet;
-import static org.onlab.util.Tools.get;
-import static org.onlab.util.Tools.groupedThreads;
-import static org.opencord.olt.impl.OsgiPropertyConstants.*;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Dictionary;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
import org.onlab.packet.VlanId;
import org.onlab.util.KryoNamespace;
import org.onosproject.cfg.ComponentConfigService;
@@ -96,8 +70,33 @@
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.Sets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Dictionary;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor;
+import static java.util.stream.Collectors.*;
+import static org.onlab.util.Tools.get;
+import static org.onlab.util.Tools.groupedThreads;
+import static org.opencord.olt.impl.OsgiPropertyConstants.*;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* Provisions rules on access devices.
@@ -199,7 +198,7 @@
private ConsistentMultimap<ConnectPoint, UniTagInformation> programmedSubs;
private ConsistentMultimap<ConnectPoint, UniTagInformation> failedSubs;
- private Set<SubscriberFlowInfo> pendingSubscribers;
+ private ConcurrentMap<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingSubscribersForDevice;
@Activate
public void activate(ComponentContext context) {
@@ -230,7 +229,7 @@
.withApplicationId(appId)
.build();
- pendingSubscribers = Sets.newConcurrentHashSet();
+ pendingSubscribersForDevice = new ConcurrentHashMap<>();
eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
subsService = sadisService.getSubscriberInfoService();
@@ -336,15 +335,20 @@
if (!uniTagInformationSet.isEmpty()) {
return true;
}
-
- for (SubscriberFlowInfo fi : pendingSubscribers) {
- if (fi.getDevId().equals(connectPoint.deviceId())
- && fi.getUniPort().equals(connectPoint.port())) {
- return true;
+ //Check if the subscriber is already getting provisioned
+ // so we do not provision twice
+ AtomicBoolean isPending = new AtomicBoolean(false);
+ pendingSubscribersForDevice.computeIfPresent(connectPoint.deviceId(), (id, infos) -> {
+ for (SubscriberFlowInfo fi : infos) {
+ if (fi.getUniPort().equals(connectPoint.port())) {
+ isPending.set(true);
+ break;
+ }
}
- }
+ return infos;
+ });
- return false;
+ return isPending.get();
}
private class DeleteEapolInstallSub implements Runnable {
@@ -800,7 +804,14 @@
log.debug("Adding {} on {}/{} to pending subs", fi, deviceId, subscriberPort);
// one or both meters are not ready. It's possible they are in the process of being
// created for other subscribers that share the same bandwidth profile.
- pendingSubscribers.add(fi);
+ pendingSubscribersForDevice.compute(deviceId, (id, queue) -> {
+ if (queue == null) {
+ queue = new LinkedBlockingQueue<>();
+ }
+ queue.add(fi);
+ log.info("Added {} to pending subscribers on {}/{}", fi, deviceId, subscriberPort);
+ return queue;
+ });
// queue up the meters to be created
if (upMeterId == null) {
@@ -829,42 +840,54 @@
meterFuture);
meterFuture.thenAcceptAsync(result -> {
+ BlockingQueue<SubscriberFlowInfo> queue = pendingSubscribersForDevice.get(deviceId);
// iterate through the subscribers on hold
- Iterator<SubscriberFlowInfo> subsIterator = pendingSubscribers.iterator();
- while (subsIterator.hasNext()) {
- SubscriberFlowInfo fi = subsIterator.next();
- if (result == null) {
- // meter install sent to device
- log.debug("Meter {} installed for bw {} on {}", meterId, bwpInfo, deviceId);
-
- MeterId upMeterId = oltMeterService
- .getMeterIdFromBpMapping(deviceId, fi.getUpBpInfo());
- MeterId downMeterId = oltMeterService
- .getMeterIdFromBpMapping(deviceId, fi.getDownBpInfo());
- if (upMeterId != null && downMeterId != null) {
- log.debug("Provisioning subscriber after meter {} " +
- "installation and both meters are present " +
- "upstream {} and downstream {} on {}/{}",
- meterId, upMeterId, downMeterId, deviceId, fi.getUniPort());
- // put in the meterIds because when fi was first
- // created there may or may not have been a meterId
- // depending on whether the meter was created or
- // not at that time.
- fi.setUpMeterId(upMeterId);
- fi.setDownMeterId(downMeterId);
- handleSubFlowsWithMeters(fi);
- subsIterator.remove();
+ if (queue != null) {
+ while (true) {
+ //TODO this might return the reference and not the actual object so
+ // it can be actually swapped underneath us.
+ SubscriberFlowInfo fi = queue.peek();
+ if (fi == null) {
+ log.debug("No more subscribers pending on {}", deviceId);
+ break;
}
- oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
- } else {
- // meter install failed
- log.error("Addition of subscriber {} on {}/{} failed due to meter " +
- "{} with result {}", fi, deviceId, fi.getUniPort(), meterId, result);
- subsIterator.remove();
- oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
+ if (result == null) {
+ // meter install sent to device
+ log.debug("Meter {} installed for bw {} on {}", meterId, bwpInfo, deviceId);
+
+ MeterId upMeterId = oltMeterService
+ .getMeterIdFromBpMapping(deviceId, fi.getUpBpInfo());
+ MeterId downMeterId = oltMeterService
+ .getMeterIdFromBpMapping(deviceId, fi.getDownBpInfo());
+ if (upMeterId != null && downMeterId != null) {
+ log.debug("Provisioning subscriber after meter {} " +
+ "installation and both meters are present " +
+ "upstream {} and downstream {} on {}/{}",
+ meterId, upMeterId, downMeterId, deviceId, fi.getUniPort());
+ // put in the meterIds because when fi was first
+ // created there may or may not have been a meterId
+ // depending on whether the meter was created or
+ // not at that time.
+ fi.setUpMeterId(upMeterId);
+ fi.setDownMeterId(downMeterId);
+ handleSubFlowsWithMeters(fi);
+ queue.remove(fi);
+ }
+ oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
+ } else {
+ // meter install failed
+ log.error("Addition of subscriber {} on {}/{} failed due to meter " +
+ "{} with result {}", fi, deviceId, fi.getUniPort(),
+ meterId, result);
+ queue.remove(fi);
+ oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
+ }
}
+ } else {
+ log.info("No pending subscribers on {}", deviceId);
}
});
+
}
/**
* Add subscriber flows given meter information for both upstream and
@@ -1310,7 +1333,7 @@
programmedDevices.remove(device.id());
removeAllSubscribers(device.id());
//Handle case where OLT disconnects during subscriber provisioning
- pendingSubscribers.removeIf(fi -> fi.getDevId().equals(device.id()));
+ pendingSubscribersForDevice.remove(device.id());
oltFlowService.clearDeviceState(device.id());
//Complete meter and flow purge
diff --git a/app/src/main/java/org/opencord/olt/impl/OltFlowService.java b/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 58344e6..bf99fe4 100644
--- a/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -15,28 +15,6 @@
*/
package org.opencord.olt.impl;
-import static com.google.common.base.Strings.isNullOrEmpty;
-import static org.onlab.util.Tools.get;
-import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_TP_ID;
-import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_TP_ID_DEFAULT;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_ON_NNI;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_ON_NNI_DEFAULT;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V4;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V4_DEFAULT;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V6;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_DHCP_V6_DEFAULT;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_EAPOL;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_EAPOL_DEFAULT;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_IGMP_ON_NNI;
-import static org.opencord.olt.impl.OsgiPropertyConstants.ENABLE_IGMP_ON_NNI_DEFAULT;
-import static org.slf4j.LoggerFactory.getLogger;
-
-import java.util.Dictionary;
-import java.util.Iterator;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.CompletableFuture;
-
import org.onlab.packet.EthType;
import org.onlab.packet.IPv4;
import org.onlab.packet.IPv6;
@@ -84,7 +62,18 @@
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
-import com.google.common.collect.Sets;
+import java.util.Dictionary;
+import java.util.Properties;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import static com.google.common.base.Strings.isNullOrEmpty;
+import static org.onlab.util.Tools.get;
+import static org.opencord.olt.impl.OsgiPropertyConstants.*;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* Provisions flow rules on access devices.
@@ -167,7 +156,8 @@
protected ApplicationId appId;
protected BaseInformationService<BandwidthProfileInformation> bpService;
protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
- private Set<SubscriberFlowInfo> pendingEapolForMeters = Sets.newConcurrentHashSet();;
+ private ConcurrentMap<DeviceId, BlockingQueue<SubscriberFlowInfo>> pendingEapolForDevice
+ = new ConcurrentHashMap<>();
@Activate
public void activate(ComponentContext context) {
@@ -459,7 +449,13 @@
.setPonCTag(vlanId).build(),
null, null,
null, bpInfo.id());
- pendingEapolForMeters.add(fi);
+ pendingEapolForDevice.compute(devId, (id, queue) -> {
+ if (queue == null) {
+ queue = new LinkedBlockingQueue<>();
+ }
+ queue.add(fi);
+ return queue;
+ });
//If false the meter is already being installed, skipping installation
if (!oltMeterService.checkAndAddPendingMeter(devId, bpInfo)) {
@@ -495,27 +491,32 @@
}
meterFuture.thenAcceptAsync(result -> {
//for each pending eapol flow we check if the meter is there.
- //TODO possibly the iterator gets taken at time t0, while the sub is added at time t1
- // and thus never considered
- Iterator<SubscriberFlowInfo> eapIterator = pendingEapolForMeters.iterator();
- while (eapIterator.hasNext()) {
- SubscriberFlowInfo fi = eapIterator.next();
- log.debug("handing pending eapol on {}/{} for {}", fi.getDevId(), fi.getUniPort(), fi);
- if (result == null) {
- MeterId mId = oltMeterService
- .getMeterIdFromBpMapping(devId, fi.getUpBpInfo());
- if (mId != null) {
- log.debug("Meter installation completed for subscriber on {}, handling EAPOL trap flow",
- cp.toString());
- handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, mId);
- eapIterator.remove();
+ BlockingQueue<SubscriberFlowInfo> queue = pendingEapolForDevice.get(devId);
+ if (queue != null) {
+ while (true) {
+ SubscriberFlowInfo fi = queue.remove();
+ if (fi == null) {
+ break;
}
- } else {
- log.warn("Meter installation error while sending EAPOL trap flow to {}. " +
- "Result {} and MeterId {}", cp.toString(), result, meterId);
- eapIterator.remove();
+ //TODO this might return the reference and not the actual object
+ // so it can be actually swapped underneath us.
+ log.debug("handing pending eapol on {}/{} for {}", fi.getDevId(), fi.getUniPort(), fi);
+ if (result == null) {
+ MeterId mId = oltMeterService
+ .getMeterIdFromBpMapping(devId, fi.getUpBpInfo());
+ if (mId != null) {
+ log.debug("Meter installation completed for subscriber on {}, handling EAPOL trap flow",
+ cp.toString());
+ handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, mId);
+ }
+ } else {
+ log.warn("Meter installation error while sending EAPOL trap flow to {}. " +
+ "Result {} and MeterId {}", cp.toString(), result, meterId);
+ }
+ oltMeterService.removeFromPendingMeters(devId, bpInfo);
}
- oltMeterService.removeFromPendingMeters(devId, bpInfo);
+ } else {
+ log.info("No pending EAPOLs on {}", devId);
}
});
}
@@ -741,7 +742,7 @@
@Override
public void clearDeviceState(DeviceId deviceId) {
- pendingEapolForMeters.removeIf(fi -> fi.getDevId().equals(deviceId));
+ pendingEapolForDevice.remove(deviceId);
}
private DefaultForwardingObjective.Builder createForwardingObjectiveBuilder(TrafficSelector selector,