[VOL-3408] Retrying deletion of default eapol flow if VOLTHA is taking too long to respond.
Creating a list of subscribers that failed programming

This way we can be sure that subscribers eventually get provisioned.

Change-Id: I3dd6707ea0809496c2eb748723d4f50e19770327
diff --git a/api/src/main/java/org/opencord/olt/AccessDeviceService.java b/api/src/main/java/org/opencord/olt/AccessDeviceService.java
index 06c5188..4878d97 100644
--- a/api/src/main/java/org/opencord/olt/AccessDeviceService.java
+++ b/api/src/main/java/org/opencord/olt/AccessDeviceService.java
@@ -96,4 +96,13 @@
      */
     ImmutableMap<ConnectPoint, Set<UniTagInformation>> getProgSubs();
 
+    /**
+     * Returns information about subscribers that have NOT been programmed in the
+     * data-plane. It lists the uni tag information of each of these subscribers,
+     * for which no flows have been sent to the device.
+     *
+     * @return an immutable map of locations and subscriber information
+     */
+    ImmutableMap<ConnectPoint, Set<UniTagInformation>> getFailedSubs();
+
 }
diff --git a/app/src/main/java/org/opencord/olt/cli/ShowFailedSubscribersCommand.java b/app/src/main/java/org/opencord/olt/cli/ShowFailedSubscribersCommand.java
new file mode 100644
index 0000000..5d6bf3d
--- /dev/null
+++ b/app/src/main/java/org/opencord/olt/cli/ShowFailedSubscribersCommand.java
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.olt.cli;
+
+import org.apache.karaf.shell.api.action.Command;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
+import org.onosproject.cli.AbstractShellCommand;
+import org.onosproject.net.ConnectPoint;
+import org.opencord.olt.AccessDeviceService;
+import org.opencord.sadis.UniTagInformation;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Shows subscriber information for those subscribers which have NOT been
+ * programmed in the data-plane, as returned by {@code getFailedSubs()}.
+ */
+@Service
+@Command(scope = "onos", name = "volt-failed-subscribers",
+        description = "Shows subscribers awaiting for programming in the dataplane")
+public class ShowFailedSubscribersCommand extends AbstractShellCommand {
+
+    @Override
+    protected void doExecute() {
+        AccessDeviceService service = AbstractShellCommand.get(AccessDeviceService.class);
+        Map<ConnectPoint, Set<UniTagInformation>> info = service.getFailedSubs();
+        info.forEach(this::display);
+    }
+
+    private void display(ConnectPoint cp, Set<UniTagInformation> uniTagInformation) {
+        uniTagInformation.forEach(uniTag ->
+                                          print("location=%s tagInformation=%s", cp, uniTag));
+    }
+}
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/app/src/main/java/org/opencord/olt/impl/Olt.java
index 42c53b7..3bb561a 100644
--- a/app/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/app/src/main/java/org/opencord/olt/impl/Olt.java
@@ -29,6 +29,8 @@
 import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_BP_ID_DEFAULT;
 import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME;
 import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.EAPOL_DELETE_RETRY_MAX_ATTEMPS;
+import static org.opencord.olt.impl.OsgiPropertyConstants.EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.ArrayList;
@@ -107,6 +109,8 @@
         property = {
                 DEFAULT_BP_ID + ":String=" + DEFAULT_BP_ID_DEFAULT,
                 DEFAULT_MCAST_SERVICE_NAME + ":String=" + DEFAULT_MCAST_SERVICE_NAME_DEFAULT,
+                EAPOL_DELETE_RETRY_MAX_ATTEMPS + ":Integer=" +
+                        EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT,
         })
 public class Olt
         extends AbstractListenerManager<AccessDeviceEvent, AccessDeviceListener>
@@ -168,6 +172,11 @@
      **/
     protected String multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
 
+    /**
+     * Maximum number of times the deletion of the default EAPOL flow is retried.
+     **/
+    protected int eapolDeleteRetryMaxAttempts = EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT;
+
     private final DeviceListener deviceListener = new InternalDeviceListener();
     private final ClusterEventListener clusterListener = new InternalClusterListener();
 
@@ -181,8 +190,10 @@
                                                                                         "olt-installer-%d"));
 
     protected ExecutorService eventExecutor;
+    protected ExecutorService retryExecutor;
 
     private ConsistentMultimap<ConnectPoint, UniTagInformation> programmedSubs;
+    private ConsistentMultimap<ConnectPoint, UniTagInformation> failedSubs;
 
     private Set<SubscriberFlowInfo> pendingSubscribers;
 
@@ -190,6 +201,7 @@
     public void activate(ComponentContext context) {
         eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
                                                                         "events-%d", log));
+        retryExecutor = Executors.newCachedThreadPool();
 
         modified(context);
         ApplicationId appId = coreService.registerApplication(APP_NAME);
@@ -206,6 +218,12 @@
                 .withApplicationId(appId)
                 .build();
 
+        failedSubs = storageService.<ConnectPoint, UniTagInformation>consistentMultimapBuilder()
+                .withName("volt-failed-subs")
+                .withSerializer(Serializer.using(serializer))
+                .withApplicationId(appId)
+                .build();
+
         pendingSubscribers = Sets.newConcurrentHashSet();
         eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
 
@@ -239,6 +257,7 @@
         deviceService.removeListener(deviceListener);
         eventDispatcher.removeSink(AccessDeviceEvent.class);
         eventExecutor.shutdown();
+        retryExecutor.shutdown();
         log.info("Stopped");
     }
 
@@ -253,7 +272,12 @@
             String mcastSN = get(properties, DEFAULT_MCAST_SERVICE_NAME);
             multicastServiceName = isNullOrEmpty(mcastSN) ? multicastServiceName : mcastSN;
 
-            log.debug("OLT properties: DefaultBpId: {}, MulticastServiceName: {}", defaultBpId, multicastServiceName);
+            String eapolDeleteRetryNew = get(properties, EAPOL_DELETE_RETRY_MAX_ATTEMPS);
+            eapolDeleteRetryMaxAttempts = isNullOrEmpty(eapolDeleteRetryNew) ? EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT :
+                    Integer.parseInt(eapolDeleteRetryNew.trim());
+
+            log.debug("OLT properties: DefaultBpId: {}, MulticastServiceName: {}, EapolDeleteRetryMaxAttempts: {}",
+                      defaultBpId, multicastServiceName, eapolDeleteRetryMaxAttempts);
 
         } catch (Exception e) {
             log.error("Error while modifying the properties", e);
@@ -285,23 +309,62 @@
             return false;
         }
 
-        //delete Eapol authentication flow with default bandwidth
-        //wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
-        //install subscriber flows
-        CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture();
-        oltFlowService.processEapolFilteringObjectives(deviceId, subscriberPortNo, defaultBpId, filterFuture,
-                                                       VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
-        filterFuture.thenAcceptAsync(filterStatus -> {
-            if (filterStatus == null) {
-                provisionUniTagList(connectPoint, uplinkPort.number(), sub);
-            } else {
-                log.error("The filtering future did not complete properly {} " +
-                                  "subscriber on {} is not provisioned", filterStatus, connectPoint);
-            }
-        });
+        // delete Eapol authentication flow with default bandwidth
+        // wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
+        // retry deletion if it fails/times-out
+        retryExecutor.execute(new DeleteEapolInstallSub(connectPoint,
+                                                        uplinkPort, sub, 1));
         return true;
     }
 
+    /**
+     * Removes the default EAPOL flow of a connect point and, on success, provisions the subscriber flows.
+     * Failed removals are retried; once retries run out the uni tags are stored in {@code failedSubs}.
+     */
+    private class DeleteEapolInstallSub implements Runnable {
+        private final ConnectPoint cp;
+        private final Port uplinkPort;
+        private final SubscriberAndDeviceInformation sub;
+        private final int attemptNumber;
+
+        DeleteEapolInstallSub(ConnectPoint cp, Port uplinkPort,
+                              SubscriberAndDeviceInformation sub, int attemptNumber) {
+            this.cp = cp;
+            this.uplinkPort = uplinkPort;
+            this.sub = sub;
+            this.attemptNumber = attemptNumber;
+        }
+
+        @Override
+        public void run() {
+            CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture<>();
+            oltFlowService.processEapolFilteringObjectives(cp.deviceId(), cp.port(),
+                                                     defaultBpId, filterFuture,
+                                                     VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
+            filterFuture.thenAcceptAsync(filterStatus -> {
+                if (filterStatus == null) {
+                    log.info("Default eapol flow deleted in attempt {} of {}"
+                            + "... provisioning subscriber flows {}",
+                             attemptNumber, eapolDeleteRetryMaxAttempts, cp);
+                    provisionUniTagList(cp, uplinkPort.number(), sub);
+                } else {
+                    if (attemptNumber <= eapolDeleteRetryMaxAttempts) {
+                        log.warn("The filtering future failed {} for subscriber {}"
+                                + "... retrying {} of {} attempts",
+                                 filterStatus, cp, attemptNumber, eapolDeleteRetryMaxAttempts);
+                        retryExecutor.execute(new DeleteEapolInstallSub(cp, uplinkPort, sub,
+                                                                        attemptNumber + 1));
+                    } else {
+                        log.error("The filtering future failed {} for subscriber {}"
+                                + " after {} attempts. Subscriber provisioning failed",
+                                  filterStatus, cp, eapolDeleteRetryMaxAttempts);
+                        sub.uniTagList().forEach(ut -> failedSubs.put(cp, ut));
+                    }
+                }
+            });
+        }
+    }
+
     @Override
     public boolean removeSubscriber(ConnectPoint connectPoint) {
         log.info("Call to un-provision subscriber at {}", connectPoint);
@@ -439,6 +502,14 @@
     }
 
     @Override
+    public ImmutableMap<ConnectPoint, Set<UniTagInformation>> getFailedSubs() {
+        return failedSubs.stream()
+                .collect(collectingAndThen(
+                        groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
+                        ImmutableMap::copyOf));
+    }
+
+    @Override
     public List<DeviceId> fetchOlts() {
         // look through all the devices and find the ones that are OLTs as per Sadis
         List<DeviceId> olts = new ArrayList<>();
diff --git a/app/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java b/app/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
index 37f4b3c..67bd5fa 100644
--- a/app/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
+++ b/app/src/main/java/org/opencord/olt/impl/OsgiPropertyConstants.java
@@ -50,4 +50,7 @@
 
     public static final String ENABLE_EAPOL = "enableEapol";
     public static final boolean ENABLE_EAPOL_DEFAULT = true;
+
+    public static final String EAPOL_DELETE_RETRY_MAX_ATTEMPS = "eapolDeleteRetryMaxAttempts";
+    public static final int EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT = 3;
 }