[VOL-4210] Extending compute timeout of the pending subscribers map to avoid getting stuck checking if a subscriber is pending during installation of already pending ones when meter is created

Change-Id: I6748a56e8746301277b28286c0f5e7ee397ba845
diff --git a/impl/src/main/java/org/opencord/olt/impl/Olt.java b/impl/src/main/java/org/opencord/olt/impl/Olt.java
index f11ac21..74c5811 100644
--- a/impl/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/impl/src/main/java/org/opencord/olt/impl/Olt.java
@@ -52,6 +52,7 @@
 import org.onosproject.net.host.HostListener;
 import org.onosproject.net.host.HostService;
 import org.onosproject.net.meter.MeterId;
+import org.onosproject.store.primitives.DefaultConsistentMap;
 import org.onosproject.store.serializers.KryoNamespaces;
 import org.onosproject.store.service.ConsistentMultimap;
 import org.onosproject.store.service.Serializer;
@@ -125,6 +126,7 @@
     private static final String NO_UPLINK_PORT = "No uplink port found for OLT device {}";
 
     public static final int HASH_WEIGHT = 10;
+    public static final long PENDING_SUBS_MAP_TIMEOUT_MILLIS = 30000L;
 
     private final Logger log = getLogger(getClass());
 
@@ -268,12 +270,15 @@
                 .withSerializer(Serializer.using(macSerializer))
                 .withApplicationId(appId)
                 .build();
-
-        pendingSubscribersForDevice = storageService.<DeviceId, BlockingQueue<SubscriberFlowInfo>>consistentMapBuilder()
+        //TODO possibly use consistent multimap with compute on key and element to avoid
+        // lock on all objects of the map, while instead obtaining and releasing the lock
+        // on a per subscriber basis.
+        pendingSubscribersForDevice = new DefaultConsistentMap<>(
+                storageService.<DeviceId, BlockingQueue<SubscriberFlowInfo>>consistentMapBuilder()
                 .withName("volt-pending-subs")
                 .withSerializer(Serializer.using(serializer))
                 .withApplicationId(appId)
-                .build().asJavaMap();
+                .buildAsyncMap(), PENDING_SUBS_MAP_TIMEOUT_MILLIS).asJavaMap();
         eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
 
         if (sadisService != null) {
@@ -483,7 +488,7 @@
 
     private boolean removeSubscriber(AccessDevicePort subscriberPort) {
         log.info("Call to un-provision subscriber at {}", subscriberPort);
-
+        //TODO we need to check if the subscriber is pending
         // Get the subscriber connected to this port from the local cache
         // If we don't know about the subscriber there's no need to remove it
         DeviceId deviceId = subscriberPort.deviceId();
@@ -1005,14 +1010,14 @@
     }
 
     private void createMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
-        log.debug("Creating Meter with {} on {}", bwpInfo, deviceId);
+        log.info("Creating Meter with {} on {}", bwpInfo, deviceId);
         CompletableFuture<Object> meterFuture = new CompletableFuture<>();
 
         MeterId meterId = oltMeterService.createMeter(deviceId, bwpInfo,
                                                       meterFuture);
 
         meterFuture.thenAcceptAsync(result -> {
-            log.debug("Meter Future for {} has completed", meterId);
+            log.info("Meter Future for {} has completed", meterId);
             pendingSubscribersForDevice.compute(deviceId, (id, queue) -> {
                 // iterate through the subscribers on hold
                 if (queue != null && !queue.isEmpty()) {
@@ -1021,7 +1026,7 @@
                         // it can be actually swapped underneath us.
                         SubscriberFlowInfo fi = queue.peek();
                         if (fi == null) {
-                            log.debug("No more subscribers pending on {}", deviceId);
+                            log.info("No more subscribers pending on {}", deviceId);
                             queue = new LinkedBlockingQueue<>();
                             break;
                         }
@@ -1049,6 +1054,7 @@
                                 // created there may or may not have been a meterId
                                 // depending on whether the meter was created or
                                 // not at that time.
+                                //TODO possibly spawn this in a separate thread.
                                 fi.setUpMeterId(upMeterId);
                                 fi.setDownMeterId(downMeterId);
                                 fi.setUpOltMeterId(upOltMeterId);
@@ -1086,7 +1092,7 @@
      * @param subscriberFlowInfo relevant information for subscriber
      */
     private void handleSubFlowsWithMeters(SubscriberFlowInfo subscriberFlowInfo) {
-        log.debug("Provisioning subscriber flows based on {}", subscriberFlowInfo);
+        log.info("Provisioning subscriber flows based on {}", subscriberFlowInfo);
         UniTagInformation tagInfo = subscriberFlowInfo.getTagInfo();
         if (tagInfo.getIsDhcpRequired()) {
             Optional<MacAddress> macAddress =
diff --git a/web/src/main/java/org/opencord/olt/rest/OltWebResource.java b/web/src/main/java/org/opencord/olt/rest/OltWebResource.java
index 54af4ac..7b19b18 100644
--- a/web/src/main/java/org/opencord/olt/rest/OltWebResource.java
+++ b/web/src/main/java/org/opencord/olt/rest/OltWebResource.java
@@ -22,6 +22,7 @@
 import org.onosproject.rest.AbstractWebResource;
 import org.opencord.olt.AccessDeviceService;
 import org.opencord.olt.AccessSubscriberId;
+import org.slf4j.Logger;
 
 import java.util.Optional;
 import javax.ws.rs.Consumes;
@@ -34,6 +35,7 @@
 import javax.ws.rs.core.Response;
 
 import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * OLT REST APIs.
@@ -42,6 +44,9 @@
 @Path("oltapp")
 public class OltWebResource extends AbstractWebResource {
 
+    private final Logger log = getLogger(getClass());
+
+
     /**
      * Provision a subscriber.
      *
@@ -62,7 +67,8 @@
         try {
             service.provisionSubscriber(connectPoint);
         } catch (Exception e) {
-            return Response.status(INTERNAL_SERVER_ERROR).build();
+            log.error("Can't provision subscriber {} due to exception", connectPoint, e);
+            return Response.status(INTERNAL_SERVER_ERROR).entity(e.getMessage()).build();
         }
         return ok("").build();
     }
@@ -83,7 +89,12 @@
         DeviceId deviceId = DeviceId.deviceId(device);
         PortNumber portNumber = PortNumber.portNumber(port);
         ConnectPoint connectPoint = new ConnectPoint(deviceId, portNumber);
-        service.removeSubscriber(connectPoint);
+        try {
+            service.removeSubscriber(connectPoint);
+        } catch (Exception e) {
+            log.error("Can't remove subscriber {} due to exception", connectPoint, e);
+            return Response.status(INTERNAL_SERVER_ERROR).entity(e.getMessage()).build();
+        }
         return Response.noContent().build();
     }