[VOL-3408] Retrying deletion of default eapol flow if VOLTHA is taking too long to respond.
Creating a list of subscribers that failed programming.

This way we can be sure that subscribers eventually get provisioned.

Change-Id: I3dd6707ea0809496c2eb748723d4f50e19770327
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/app/src/main/java/org/opencord/olt/impl/Olt.java
index 42c53b7..3bb561a 100644
--- a/app/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/app/src/main/java/org/opencord/olt/impl/Olt.java
@@ -29,6 +29,8 @@
 import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_BP_ID_DEFAULT;
 import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME;
 import static org.opencord.olt.impl.OsgiPropertyConstants.DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
+import static org.opencord.olt.impl.OsgiPropertyConstants.EAPOL_DELETE_RETRY_MAX_ATTEMPS;
+import static org.opencord.olt.impl.OsgiPropertyConstants.EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.ArrayList;
@@ -107,6 +109,8 @@
         property = {
                 DEFAULT_BP_ID + ":String=" + DEFAULT_BP_ID_DEFAULT,
                 DEFAULT_MCAST_SERVICE_NAME + ":String=" + DEFAULT_MCAST_SERVICE_NAME_DEFAULT,
+                EAPOL_DELETE_RETRY_MAX_ATTEMPS + ":Integer=" +
+                        EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT,
         })
 public class Olt
         extends AbstractListenerManager<AccessDeviceEvent, AccessDeviceListener>
@@ -168,6 +172,11 @@
      **/
     protected String multicastServiceName = DEFAULT_MCAST_SERVICE_NAME_DEFAULT;
 
+    /**
+     * Maximum number of retries when deleting the default EAPOL flow.
+     **/
+    protected int eapolDeleteRetryMaxAttempts = EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT;
+
     private final DeviceListener deviceListener = new InternalDeviceListener();
     private final ClusterEventListener clusterListener = new InternalClusterListener();
 
@@ -181,8 +190,10 @@
                                                                                         "olt-installer-%d"));
 
     protected ExecutorService eventExecutor;
+    protected ExecutorService retryExecutor;
 
     private ConsistentMultimap<ConnectPoint, UniTagInformation> programmedSubs;
+    private ConsistentMultimap<ConnectPoint, UniTagInformation> failedSubs;
 
     private Set<SubscriberFlowInfo> pendingSubscribers;
 
@@ -190,6 +201,7 @@
     public void activate(ComponentContext context) {
         eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
                                                                         "events-%d", log));
+        retryExecutor = Executors.newCachedThreadPool();
 
         modified(context);
         ApplicationId appId = coreService.registerApplication(APP_NAME);
@@ -206,6 +218,12 @@
                 .withApplicationId(appId)
                 .build();
 
+        failedSubs = storageService.<ConnectPoint, UniTagInformation>consistentMultimapBuilder()
+                .withName("volt-failed-subs")
+                .withSerializer(Serializer.using(serializer))
+                .withApplicationId(appId)
+                .build();
+
         pendingSubscribers = Sets.newConcurrentHashSet();
         eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
 
@@ -239,6 +257,7 @@
         deviceService.removeListener(deviceListener);
         eventDispatcher.removeSink(AccessDeviceEvent.class);
         eventExecutor.shutdown();
+        retryExecutor.shutdown();
         log.info("Stopped");
     }
 
@@ -253,7 +272,12 @@
             String mcastSN = get(properties, DEFAULT_MCAST_SERVICE_NAME);
             multicastServiceName = isNullOrEmpty(mcastSN) ? multicastServiceName : mcastSN;
 
-            log.debug("OLT properties: DefaultBpId: {}, MulticastServiceName: {}", defaultBpId, multicastServiceName);
+            String eapolDeleteRetryNew = get(properties, EAPOL_DELETE_RETRY_MAX_ATTEMPS);
+            eapolDeleteRetryMaxAttempts = isNullOrEmpty(eapolDeleteRetryNew) ? EAPOL_DELETE_RETRY_MAX_ATTEMPS_DEFAULT :
+                    Integer.parseInt(eapolDeleteRetryNew.trim());
+
+            log.debug("OLT properties: DefaultBpId: {}, MulticastServiceName: {}, EapolDeleteRetryMaxAttempts: {}",
+                      defaultBpId, multicastServiceName, eapolDeleteRetryMaxAttempts);
 
         } catch (Exception e) {
             log.error("Error while modifying the properties", e);
@@ -285,23 +309,62 @@
             return false;
         }
 
-        //delete Eapol authentication flow with default bandwidth
-        //wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
-        //install subscriber flows
-        CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture();
-        oltFlowService.processEapolFilteringObjectives(deviceId, subscriberPortNo, defaultBpId, filterFuture,
-                                                       VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
-        filterFuture.thenAcceptAsync(filterStatus -> {
-            if (filterStatus == null) {
-                provisionUniTagList(connectPoint, uplinkPort.number(), sub);
-            } else {
-                log.error("The filtering future did not complete properly {} " +
-                                  "subscriber on {} is not provisioned", filterStatus, connectPoint);
-            }
-        });
+        // delete Eapol authentication flow with default bandwidth
+        // wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
+        // retry deletion if it fails/times-out
+        retryExecutor.execute(new DeleteEapolInstallSub(connectPoint,
+                                                        uplinkPort, sub, 1));
         return true;
     }
 
+    /**
+     * Deletes the default EAPOL flow on a subscriber port and, when the deletion
+     * completes with a {@code null} status, provisions the subscriber flows.
+     * A non-null status re-submits the task to {@code retryExecutor} while
+     * {@code attemptNumber <= eapolDeleteRetryMaxAttempts}; once that bound is
+     * passed the subscriber's UNI tags are stored in {@code failedSubs}.
+     */
+    private class DeleteEapolInstallSub implements Runnable {
+        private final ConnectPoint cp;
+        private final Port uplinkPort;
+        private final SubscriberAndDeviceInformation sub;
+        private final int attemptNumber;
+
+        DeleteEapolInstallSub(ConnectPoint cp, Port uplinkPort,
+                              SubscriberAndDeviceInformation sub, int attemptNumber) {
+            this.cp = cp;
+            this.uplinkPort = uplinkPort;
+            this.sub = sub;
+            this.attemptNumber = attemptNumber;
+        }
+
+        @Override
+        public void run() {
+            CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture<>();
+            oltFlowService.processEapolFilteringObjectives(cp.deviceId(), cp.port(), defaultBpId,
+                    filterFuture, VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
+            filterFuture.thenAcceptAsync(filterStatus -> {
+                if (filterStatus == null) {
+                    log.info("Default eapol flow deleted in attempt {} of {}"
+                            + "... provisioning subscriber flows {}",
+                             attemptNumber, eapolDeleteRetryMaxAttempts, cp);
+                    provisionUniTagList(cp, uplinkPort.number(), sub);
+                } else if (attemptNumber <= eapolDeleteRetryMaxAttempts) {
+                    log.warn("The filtering future failed {} for subscriber {}"
+                            + "... retrying {} of {} attempts",
+                             filterStatus, cp, attemptNumber, eapolDeleteRetryMaxAttempts);
+                    retryExecutor.execute(
+                            new DeleteEapolInstallSub(cp, uplinkPort, sub, attemptNumber + 1));
+                } else {
+                    log.error("The filtering future failed {} for subscriber {}"
+                            + " after {} attempts. Subscriber provisioning failed",
+                              filterStatus, cp, eapolDeleteRetryMaxAttempts);
+                    sub.uniTagList().forEach(ut -> failedSubs.put(cp, ut));
+                }
+            });
+        }
+    }
+
     @Override
     public boolean removeSubscriber(ConnectPoint connectPoint) {
         log.info("Call to un-provision subscriber at {}", connectPoint);
@@ -439,6 +502,14 @@
     }
 
     @Override
+    public ImmutableMap<ConnectPoint, Set<UniTagInformation>> getFailedSubs() {
+        // Group every (connect point, UNI tag) entry of the failed-subscriber store by connect point.
+        Map<ConnectPoint, Set<UniTagInformation>> byPort = failedSubs.stream()
+                .collect(groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())));
+        return ImmutableMap.copyOf(byPort);
+    }
+
+    @Override
     public List<DeviceId> fetchOlts() {
         // look through all the devices and find the ones that are OLTs as per Sadis
         List<DeviceId> olts = new ArrayList<>();