[VOL-4210] Extending compute timeout of the pending subscribers map to avoid getting stuck checking if a subscriber is pending during installation of already pending ones when meter is created
Change-Id: I6748a56e8746301277b28286c0f5e7ee397ba845
diff --git a/impl/src/main/java/org/opencord/olt/impl/Olt.java b/impl/src/main/java/org/opencord/olt/impl/Olt.java
index f11ac21..74c5811 100644
--- a/impl/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/impl/src/main/java/org/opencord/olt/impl/Olt.java
@@ -52,6 +52,7 @@
import org.onosproject.net.host.HostListener;
import org.onosproject.net.host.HostService;
import org.onosproject.net.meter.MeterId;
+import org.onosproject.store.primitives.DefaultConsistentMap;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMultimap;
import org.onosproject.store.service.Serializer;
@@ -125,6 +126,7 @@
private static final String NO_UPLINK_PORT = "No uplink port found for OLT device {}";
public static final int HASH_WEIGHT = 10;
+ public static final long PENDING_SUBS_MAP_TIMEOUT_MILLIS = 30000L;
private final Logger log = getLogger(getClass());
@@ -268,12 +270,15 @@
.withSerializer(Serializer.using(macSerializer))
.withApplicationId(appId)
.build();
-
- pendingSubscribersForDevice = storageService.<DeviceId, BlockingQueue<SubscriberFlowInfo>>consistentMapBuilder()
+ //TODO possibly use consistent multimap with compute on key and element to avoid
+ // lock on all objects of the map, while instead obtaining and releasing the lock
+ // on a per subscriber basis.
+ pendingSubscribersForDevice = new DefaultConsistentMap<>(
+ storageService.<DeviceId, BlockingQueue<SubscriberFlowInfo>>consistentMapBuilder()
.withName("volt-pending-subs")
.withSerializer(Serializer.using(serializer))
.withApplicationId(appId)
- .build().asJavaMap();
+ .buildAsyncMap(), PENDING_SUBS_MAP_TIMEOUT_MILLIS).asJavaMap();
eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
if (sadisService != null) {
@@ -483,7 +488,7 @@
private boolean removeSubscriber(AccessDevicePort subscriberPort) {
log.info("Call to un-provision subscriber at {}", subscriberPort);
-
+ //TODO we need to check if the subscriber is pending
// Get the subscriber connected to this port from the local cache
// If we don't know about the subscriber there's no need to remove it
DeviceId deviceId = subscriberPort.deviceId();
@@ -1005,14 +1010,14 @@
}
private void createMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
- log.debug("Creating Meter with {} on {}", bwpInfo, deviceId);
+ log.info("Creating Meter with {} on {}", bwpInfo, deviceId);
CompletableFuture<Object> meterFuture = new CompletableFuture<>();
MeterId meterId = oltMeterService.createMeter(deviceId, bwpInfo,
meterFuture);
meterFuture.thenAcceptAsync(result -> {
- log.debug("Meter Future for {} has completed", meterId);
+ log.info("Meter Future for {} has completed", meterId);
pendingSubscribersForDevice.compute(deviceId, (id, queue) -> {
// iterate through the subscribers on hold
if (queue != null && !queue.isEmpty()) {
@@ -1021,7 +1026,7 @@
// it can be actually swapped underneath us.
SubscriberFlowInfo fi = queue.peek();
if (fi == null) {
- log.debug("No more subscribers pending on {}", deviceId);
+ log.info("No more subscribers pending on {}", deviceId);
queue = new LinkedBlockingQueue<>();
break;
}
@@ -1049,6 +1054,7 @@
// created there may or may not have been a meterId
// depending on whether the meter was created or
// not at that time.
+ //TODO possibly spawn this in a separate thread.
fi.setUpMeterId(upMeterId);
fi.setDownMeterId(downMeterId);
fi.setUpOltMeterId(upOltMeterId);
@@ -1086,7 +1092,7 @@
* @param subscriberFlowInfo relevant information for subscriber
*/
private void handleSubFlowsWithMeters(SubscriberFlowInfo subscriberFlowInfo) {
- log.debug("Provisioning subscriber flows based on {}", subscriberFlowInfo);
+ log.info("Provisioning subscriber flows based on {}", subscriberFlowInfo);
UniTagInformation tagInfo = subscriberFlowInfo.getTagInfo();
if (tagInfo.getIsDhcpRequired()) {
Optional<MacAddress> macAddress =
diff --git a/web/src/main/java/org/opencord/olt/rest/OltWebResource.java b/web/src/main/java/org/opencord/olt/rest/OltWebResource.java
index 54af4ac..7b19b18 100644
--- a/web/src/main/java/org/opencord/olt/rest/OltWebResource.java
+++ b/web/src/main/java/org/opencord/olt/rest/OltWebResource.java
@@ -22,6 +22,7 @@
import org.onosproject.rest.AbstractWebResource;
import org.opencord.olt.AccessDeviceService;
import org.opencord.olt.AccessSubscriberId;
+import org.slf4j.Logger;
import java.util.Optional;
import javax.ws.rs.Consumes;
@@ -34,6 +35,7 @@
import javax.ws.rs.core.Response;
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* OLT REST APIs.
@@ -42,6 +44,9 @@
@Path("oltapp")
public class OltWebResource extends AbstractWebResource {
+ private final Logger log = getLogger(getClass());
+
+
/**
* Provision a subscriber.
*
@@ -62,7 +67,8 @@
try {
service.provisionSubscriber(connectPoint);
} catch (Exception e) {
- return Response.status(INTERNAL_SERVER_ERROR).build();
+ log.error("Can't provision subscriber {} due to exception", connectPoint, e);
+ return Response.status(INTERNAL_SERVER_ERROR).entity(e.getMessage()).build();
}
return ok("").build();
}
@@ -83,7 +89,12 @@
DeviceId deviceId = DeviceId.deviceId(device);
PortNumber portNumber = PortNumber.portNumber(port);
ConnectPoint connectPoint = new ConnectPoint(deviceId, portNumber);
- service.removeSubscriber(connectPoint);
+ try {
+ service.removeSubscriber(connectPoint);
+ } catch (Exception e) {
+ log.error("Can't remove subscriber {} due to exception", connectPoint, e);
+ return Response.status(INTERNAL_SERVER_ERROR).entity(e.getMessage()).build();
+ }
return Response.noContent().build();
}