[VOL-3408] Retrying deletion of default eapol flow if VOLTHA is taking too long to respond.
Creating a list of subscribers that failed programming.
This way we can be sure that subscribers eventually get provisioned.
Change-Id: I3dd6707ea0809496c2eb748723d4f50e19770327
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/app/src/main/java/org/opencord/olt/impl/Olt.java
index 42c53b7..3bb561a 100644
--- a/app/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/app/src/main/java/org/opencord/olt/impl/Olt.java
@@ -29,6 +29,8 @@
import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_BP_ID_DEFAULT;
import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME;
import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.EAPOL_DELETE_RETRY_MAX_ATTEMPS;
+import static org.opencord.olt.impl.OsgiPropertyConstants.EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
@@ -107,6 +109,8 @@
property = {
DEFAULT_BP_ID + ":String=" + DEFAULT_BP_ID_DEFAULT,
DEFAULT_MCAST_SERVICE_NAME + ":String=" + DEFAULT_MCAST_SERVICE_NAME_DEFAULT,
+ EAPOL_DELETE_RETRY_MAX_ATTEMPS + ":Integer=" +
+ EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT,
})
public class Olt
extends AbstractListenerManager<AccessDeviceEvent, AccessDeviceListener>
@@ -168,6 +172,11 @@
**/
protected String multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
+ /**
+ * Maximum number of retries when deleting the default EAPOL flow.
+ **/
+ protected int eapolDeleteRetryMaxAttempts = EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT;
+
private final DeviceListener deviceListener = new InternalDeviceListener();
private final ClusterEventListener clusterListener = new InternalClusterListener();
@@ -181,8 +190,10 @@
"olt-installer-%d"));
protected ExecutorService eventExecutor;
+ protected ExecutorService retryExecutor;
private ConsistentMultimap<ConnectPoint, UniTagInformation> programmedSubs;
+ private ConsistentMultimap<ConnectPoint, UniTagInformation> failedSubs;
private Set<SubscriberFlowInfo> pendingSubscribers;
@@ -190,6 +201,7 @@
public void activate(ComponentContext context) {
eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
"events-%d", log));
+ retryExecutor = Executors.newCachedThreadPool();
modified(context);
ApplicationId appId = coreService.registerApplication(APP_NAME);
@@ -206,6 +218,12 @@
.withApplicationId(appId)
.build();
+ failedSubs = storageService.<ConnectPoint, UniTagInformation>consistentMultimapBuilder()
+ .withName("volt-failed-subs")
+ .withSerializer(Serializer.using(serializer))
+ .withApplicationId(appId)
+ .build();
+
pendingSubscribers = Sets.newConcurrentHashSet();
eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
@@ -239,6 +257,7 @@
deviceService.removeListener(deviceListener);
eventDispatcher.removeSink(AccessDeviceEvent.class);
eventExecutor.shutdown();
+ retryExecutor.shutdown();
log.info("Stopped");
}
@@ -253,7 +272,12 @@
String mcastSN = get(properties, DEFAULT_MCAST_SERVICE_NAME);
multicastServiceName = isNullOrEmpty(mcastSN) ? multicastServiceName : mcastSN;
- log.debug("OLT properties: DefaultBpId: {}, MulticastServiceName: {}", defaultBpId, multicastServiceName);
+ String eapolDeleteRetryNew = get(properties, EAPOL_DELETE_RETRY_MAX_ATTEMPS);
+ eapolDeleteRetryMaxAttempts = isNullOrEmpty(eapolDeleteRetryNew) ? EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT :
+ Integer.parseInt(eapolDeleteRetryNew.trim());
+
+ log.debug("OLT properties: DefaultBpId: {}, MulticastServiceName: {}, EapolDeleteRetryMaxAttempts: {}",
+ defaultBpId, multicastServiceName, eapolDeleteRetryMaxAttempts);
} catch (Exception e) {
log.error("Error while modifying the properties", e);
@@ -285,23 +309,62 @@
return false;
}
- //delete Eapol authentication flow with default bandwidth
- //wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
- //install subscriber flows
- CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture();
- oltFlowService.processEapolFilteringObjectives(deviceId, subscriberPortNo, defaultBpId, filterFuture,
- VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
- filterFuture.thenAcceptAsync(filterStatus -> {
- if (filterStatus == null) {
- provisionUniTagList(connectPoint, uplinkPort.number(), sub);
- } else {
- log.error("The filtering future did not complete properly {} " +
- "subscriber on {} is not provisioned", filterStatus, connectPoint);
- }
- });
+ // delete Eapol authentication flow with default bandwidth
+ // wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
+ // retry deletion if it fails/times-out
+ retryExecutor.execute(new DeleteEapolInstallSub(connectPoint,
+ uplinkPort, sub, 1));
return true;
}
+    /**
+     * Task that requests deletion of the default EAPOL flow on a subscriber
+     * port and, once the request completes without an error, provisions the
+     * subscriber's UNI tags.
+     * <p>
+     * When the request completes with an error, a new task carrying the next
+     * attempt number is submitted to the retry executor. Once the attempt
+     * number exceeds {@code eapolDeleteRetryMaxAttempts}, every UNI tag of the
+     * subscriber is recorded in the failed-subscribers store instead.
+     */
+    private class DeleteEapolInstallSub implements Runnable {
+        private final ConnectPoint cp;
+        private final Port uplinkPort;
+        private final SubscriberAndDeviceInformation sub;
+        // 1-based counter of the attempt this task represents.
+        private final int attemptNumber;
+
+        /**
+         * Creates a task for a single deletion attempt.
+         *
+         * @param cp            connect point of the subscriber port
+         * @param uplinkPort    uplink port used when provisioning the UNI tags
+         * @param sub           subscriber information holding the UNI tag list
+         * @param attemptNumber number of this attempt, starting at 1
+         */
+        DeleteEapolInstallSub(ConnectPoint cp, Port uplinkPort,
+                              SubscriberAndDeviceInformation sub,
+                              int attemptNumber) {
+            this.cp = cp;
+            this.uplinkPort = uplinkPort;
+            this.sub = sub;
+            this.attemptNumber = attemptNumber;
+        }
+
+        @Override
+        public void run() {
+            CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture<>();
+            oltFlowService.processEapolFilteringObjectives(cp.deviceId(), cp.port(),
+                                                           defaultBpId, filterFuture,
+                                                           VlanId.vlanId(EAPOL_DEFAULT_VLAN),
+                                                           false);
+            filterFuture.thenAcceptAsync(filterStatus -> {
+                // A null status means the objective completed without an error.
+                if (filterStatus == null) {
+                    log.info("Default eapol flow deleted in attempt {} of {}"
+                                     + "... provisioning subscriber flows {}",
+                             attemptNumber, eapolDeleteRetryMaxAttempts, cp);
+                    provisionUniTagList(cp, uplinkPort.number(), sub);
+                    return;
+                }
+
+                // NOTE(review): attempts start at 1 and the bound is inclusive, so the
+                // initial attempt can be followed by up to eapolDeleteRetryMaxAttempts
+                // retries - confirm this is the intended meaning of the property.
+                if (attemptNumber <= eapolDeleteRetryMaxAttempts) {
+                    log.warn("The filtering future failed {} for subscriber {}"
+                                     + "... retrying {} of {} attempts",
+                             filterStatus, cp, attemptNumber, eapolDeleteRetryMaxAttempts);
+                    retryExecutor.execute(
+                            new DeleteEapolInstallSub(cp, uplinkPort, sub,
+                                                      attemptNumber + 1));
+                } else {
+                    // Retries exhausted: remember every UNI tag of this subscriber as failed.
+                    log.error("The filtering future failed {} for subscriber {}"
+                                      + " after {} attempts. Subscriber provisioning failed",
+                              filterStatus, cp, eapolDeleteRetryMaxAttempts);
+                    sub.uniTagList().forEach(ut -> failedSubs.put(cp, ut));
+                }
+            });
+        }
+
+    }
+
+
@Override
public boolean removeSubscriber(ConnectPoint connectPoint) {
log.info("Call to un-provision subscriber at {}", connectPoint);
@@ -439,6 +502,14 @@
}
@Override
+    public ImmutableMap<ConnectPoint, Set<UniTagInformation>> getFailedSubs() {
+        // Group the (connect point, UNI tag) entries of the failed-subscribers store by connect point.
+        Map<ConnectPoint, Set<UniTagInformation>> tagsByConnectPoint = failedSubs.stream()
+                .collect(groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())));
+        // Return an immutable copy of the grouped view.
+        return ImmutableMap.copyOf(tagsByConnectPoint);
+    }
+
+ @Override
public List<DeviceId> fetchOlts() {
// look through all the devices and find the ones that are OLTs as per Sadis
List<DeviceId> olts = new ArrayList<>();