[VOL-3408] Retying deletion of default eapol flow if VOLTHA is taking too long to respond.
Creating a list of subscriber that failed programming
This way we can be sure that subscribers eventually get provisioned.
Change-Id: I3dd6707ea0809496c2eb748723d4f50e19770327
diff --git a/api/src/main/java/org/opencord/olt/AccessDeviceService.java b/api/src/main/java/org/opencord/olt/AccessDeviceService.java
index 06c5188..4878d97 100644
--- a/api/src/main/java/org/opencord/olt/AccessDeviceService.java
+++ b/api/src/main/java/org/opencord/olt/AccessDeviceService.java
@@ -96,4 +96,13 @@
*/
ImmutableMap<ConnectPoint, Set<UniTagInformation>> getProgSubs();
+ /**
+ * Returns information about subscribers that have NOT been programmed in the
+ * data-plane. It shows all uni tag information list of the subscribers even if
+ * these have not been programmed, meaning no flows have been sent to the device.
+ *
+ * @return an immutable map of locations and subscriber information
+ */
+ ImmutableMap<ConnectPoint, Set<UniTagInformation>> getFailedSubs();
+
}
diff --git a/app/src/main/java/org/opencord/olt/cli/ShowFailedSubscribersCommand.java b/app/src/main/java/org/opencord/olt/cli/ShowFailedSubscribersCommand.java
new file mode 100644
index 0000000..5d6bf3d
--- /dev/null
+++ b/app/src/main/java/org/opencord/olt/cli/ShowFailedSubscribersCommand.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.net.ConnectPoint;
+import org.opencord.olt.AccessDeviceService;
+import org.opencord.sadis.UniTagInformation;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Shows subscriber information for those subscriber which have been programmed
+ * in the data-plane.
+ */
+@Service
+@Command(scope = "onos", name = "volt-failed-subscribers",
+ description = "Shows subscribers awaiting for programming in the dataplane")
+public class ShowFailedSubscribersCommand extends AbstractShellCommand {
+
+ @Override
+ protected void doExecute() {
+ AccessDeviceService service = AbstractShellCommand.get(AccessDeviceService.class);
+ Map<ConnectPoint, Set<UniTagInformation>> info = service.getFailedSubs();
+ info.forEach(this::display);
+ }
+
+ private void display(ConnectPoint cp, Set<UniTagInformation> uniTagInformation) {
+ uniTagInformation.forEach(uniTag ->
+ print("location=%s tagInformation=%s", cp, uniTag));
+ }
+}
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/app/src/main/java/org/opencord/olt/impl/Olt.java
index 42c53b7..3bb561a 100644
--- a/app/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/app/src/main/java/org/opencord/olt/impl/Olt.java
@@ -29,6 +29,8 @@
import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_BP_ID_DEFAULT;
import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME;
import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.EAPOL_DELETE_RETRY_MAX_ATTEMPS;
+import static org.opencord.olt.impl.OsgiPropertyConstants.EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
@@ -107,6 +109,8 @@
property = {
DEFAULT_BP_ID + ":String=" + DEFAULT_BP_ID_DEFAULT,
DEFAULT_MCAST_SERVICE_NAME + ":String=" + DEFAULT_MCAST_SERVICE_NAME_DEFAULT,
+ EAPOL_DELETE_RETRY_MAX_ATTEMPS + ":Integer=" +
+ EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT,
})
public class Olt
extends AbstractListenerManager<AccessDeviceEvent, AccessDeviceListener>
@@ -168,6 +172,11 @@
**/
protected String multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
+ /**
+ * Default amounts of eapol retry.
+ **/
+ protected int eapolDeleteRetryMaxAttempts = EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT;
+
private final DeviceListener deviceListener = new InternalDeviceListener();
private final ClusterEventListener clusterListener = new InternalClusterListener();
@@ -181,8 +190,10 @@
"olt-installer-%d"));
protected ExecutorService eventExecutor;
+ protected ExecutorService retryExecutor;
private ConsistentMultimap<ConnectPoint, UniTagInformation> programmedSubs;
+ private ConsistentMultimap<ConnectPoint, UniTagInformation> failedSubs;
private Set<SubscriberFlowInfo> pendingSubscribers;
@@ -190,6 +201,7 @@
public void activate(ComponentContext context) {
eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
"events-%d", log));
+ retryExecutor = Executors.newCachedThreadPool();
modified(context);
ApplicationId appId = coreService.registerApplication(APP_NAME);
@@ -206,6 +218,12 @@
.withApplicationId(appId)
.build();
+ failedSubs = storageService.<ConnectPoint, UniTagInformation>consistentMultimapBuilder()
+ .withName("volt-failed-subs")
+ .withSerializer(Serializer.using(serializer))
+ .withApplicationId(appId)
+ .build();
+
pendingSubscribers = Sets.newConcurrentHashSet();
eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
@@ -239,6 +257,7 @@
deviceService.removeListener(deviceListener);
eventDispatcher.removeSink(AccessDeviceEvent.class);
eventExecutor.shutdown();
+ retryExecutor.shutdown();
log.info("Stopped");
}
@@ -253,7 +272,12 @@
String mcastSN = get(properties, DEFAULT_MCAST_SERVICE_NAME);
multicastServiceName = isNullOrEmpty(mcastSN) ? multicastServiceName : mcastSN;
- log.debug("OLT properties: DefaultBpId: {}, MulticastServiceName: {}", defaultBpId, multicastServiceName);
+ String eapolDeleteRetryNew = get(properties, EAPOL_DELETE_RETRY_MAX_ATTEMPS);
+ eapolDeleteRetryMaxAttempts = isNullOrEmpty(eapolDeleteRetryNew) ? EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT :
+ Integer.parseInt(eapolDeleteRetryNew.trim());
+
+ log.debug("OLT properties: DefaultBpId: {}, MulticastServiceName: {}, EapolDeleteRetryMaxAttempts: {}",
+ defaultBpId, multicastServiceName, eapolDeleteRetryMaxAttempts);
} catch (Exception e) {
log.error("Error while modifying the properties", e);
@@ -285,23 +309,62 @@
return false;
}
- //delete Eapol authentication flow with default bandwidth
- //wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
- //install subscriber flows
- CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture();
- oltFlowService.processEapolFilteringObjectives(deviceId, subscriberPortNo, defaultBpId, filterFuture,
- VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
- filterFuture.thenAcceptAsync(filterStatus -> {
- if (filterStatus == null) {
- provisionUniTagList(connectPoint, uplinkPort.number(), sub);
- } else {
- log.error("The filtering future did not complete properly {} " +
- "subscriber on {} is not provisioned", filterStatus, connectPoint);
- }
- });
+ // delete Eapol authentication flow with default bandwidth
+ // wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
+ // retry deletion if it fails/times-out
+ retryExecutor.execute(new DeleteEapolInstallSub(connectPoint,
+ uplinkPort, sub, 1));
return true;
}
+ private class DeleteEapolInstallSub implements Runnable {
+ ConnectPoint cp;
+ Port uplinkPort;
+ SubscriberAndDeviceInformation sub;
+ private int attemptNumber;
+
+ DeleteEapolInstallSub(ConnectPoint cp, Port uplinkPort,
+ SubscriberAndDeviceInformation sub,
+ int attemptNumber) {
+ this.cp = cp;
+ this.uplinkPort = uplinkPort;
+ this.sub = sub;
+ this.attemptNumber = attemptNumber;
+ }
+
+ @Override
+ public void run() {
+ CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture();
+ oltFlowService.processEapolFilteringObjectives(cp.deviceId(), cp.port(),
+ defaultBpId, filterFuture,
+ VlanId.vlanId(EAPOL_DEFAULT_VLAN),
+ false);
+ filterFuture.thenAcceptAsync(filterStatus -> {
+ if (filterStatus == null) {
+ log.info("Default eapol flow deleted in attempt {} of {}"
+ + "... provisioning subscriber flows {}",
+ attemptNumber, eapolDeleteRetryMaxAttempts, cp);
+ provisionUniTagList(cp, uplinkPort.number(), sub);
+ } else {
+ if (attemptNumber <= eapolDeleteRetryMaxAttempts) {
+ log.warn("The filtering future failed {} for subscriber {}"
+ + "... retrying {} of {} attempts",
+ filterStatus, cp, attemptNumber, eapolDeleteRetryMaxAttempts);
+ retryExecutor.execute(
+ new DeleteEapolInstallSub(cp, uplinkPort, sub,
+ attemptNumber + 1));
+ } else {
+ log.error("The filtering future failed {} for subscriber {}"
+ + "after {} attempts. Subscriber provisioning failed",
+ filterStatus, cp, eapolDeleteRetryMaxAttempts);
+ sub.uniTagList().forEach(ut -> failedSubs.put(cp, ut));
+ }
+ }
+ });
+ }
+
+ }
+
@Override
public boolean removeSubscriber(ConnectPoint connectPoint) {
log.info("Call to un-provision subscriber at {}", connectPoint);
@@ -439,6 +502,14 @@
}
@Override
+ public ImmutableMap<ConnectPoint, Set<UniTagInformation>> getFailedSubs() {
+ return failedSubs.stream()
+ .collect(collectingAndThen(
+ groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
+ ImmutableMap::copyOf));
+ }
+
+ @Override
public List<DeviceId> fetchOlts() {
// look through all the devices and find the ones that are OLTs as per Sadis
List<DeviceId> olts = new ArrayList<>();
diff --git a/app/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java b/app/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
index 37f4b3c..67bd5fa 100644
--- a/app/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
+++ b/app/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
@@ -50,4 +50,7 @@
public static final String ENABLE_EAPOL = "enableEapol";
public static final boolean ENABLE_EAPOL_DEFAULT = true;
+
+ public static final String EAPOL_DELETE_RETRY_MAX_ATTEMPS = "eapolDeleteRetryMaxAttempts";
+ public static final int EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT = 3;
}