[VOL-3148] Only one meter creation is attempted for a given bandwidth profile.
Achieved through coordination of subscribers and meters.
Change-Id: I0377633a4ff5f34e817ec53382431d4a74d974c1
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/app/src/main/java/org/opencord/olt/impl/Olt.java
index 2886025..d5e8e32 100644
--- a/app/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/app/src/main/java/org/opencord/olt/impl/Olt.java
@@ -34,12 +34,14 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.Dictionary;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -183,10 +185,14 @@
private ConsistentMultimap<ConnectPoint, UniTagInformation> programmedSubs;
+ private Set<DeviceBandwidthProfile> pendingMeters;
+ private Set<SubscriberFlowInfo> pendingSubscribers;
+
@Activate
public void activate(ComponentContext context) {
eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("onos/olt",
"events-%d", log));
+
modified(context);
ApplicationId appId = coreService.registerApplication(APP_NAME);
componentConfigService.registerProperties(getClass());
@@ -202,6 +208,8 @@
.withApplicationId(appId)
.build();
+ pendingMeters = ConcurrentHashMap.newKeySet();
+ pendingSubscribers = Sets.newConcurrentHashSet();
eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
subsService = sadisService.getSubscriberInfoService();
@@ -233,6 +241,7 @@
clusterService.removeListener(clusterListener);
deviceService.removeListener(deviceListener);
eventDispatcher.removeSink(AccessDeviceEvent.class);
+ eventExecutor.shutdown();
log.info("Stopped");
}
@@ -258,7 +267,7 @@
@Override
public boolean provisionSubscriber(ConnectPoint connectPoint) {
- log.info("Call to provision subscriber at {}", connectPoint);
+ log.info("Call to provisioning subscriber at {}", connectPoint);
DeviceId deviceId = connectPoint.deviceId();
PortNumber subscriberPortNo = connectPoint.port();
@@ -569,7 +578,7 @@
private void provisionUniTagList(ConnectPoint connectPoint, PortNumber uplinkPort,
SubscriberAndDeviceInformation sub) {
- log.info("Provisioning vlans for subscriber {} on dev/port: {}", sub, connectPoint);
+ log.debug("Provisioning vlans for subscriber {} on dev/port: {}", sub, connectPoint);
if (sub.uniTagList() == null || sub.uniTagList().isEmpty()) {
log.warn("Unitaglist doesn't exist for the subscriber {}", sub.id());
@@ -629,7 +638,7 @@
private void handleSubscriberFlows(DeviceId deviceId, PortNumber uplinkPort, PortNumber subscriberPort,
UniTagInformation tagInfo) {
- log.info("Provisioning vlan-based flows for the uniTagInformation {}", tagInfo);
+ log.debug("Provisioning vlan-based flows for the uniTagInformation {}", tagInfo);
Port port = deviceService.getPort(deviceId, subscriberPort);
@@ -642,122 +651,192 @@
return;
}
- ConnectPoint cp = new ConnectPoint(deviceId, subscriberPort);
-
BandwidthProfileInformation upstreamBpInfo =
getBandwidthProfileInformation(tagInfo.getUpstreamBandwidthProfile());
BandwidthProfileInformation downstreamBpInfo =
getBandwidthProfileInformation(tagInfo.getDownstreamBandwidthProfile());
+ if (upstreamBpInfo == null) {
+ log.warn("No meter installed since no Upstream BW Profile definition found for "
+ + "ctag {} stag {} tpId {} and Device/port: {}:{}",
+ tagInfo.getPonCTag(), tagInfo.getPonSTag(),
+ tagInfo.getTechnologyProfileId(), deviceId,
+ subscriberPort);
+ return;
+ }
+ if (downstreamBpInfo == null) {
+ log.warn("No meter installed since no Downstream BW Profile definition found for "
+ + "ctag {} stag {} tpId {} and Device/port: {}:{}",
+ tagInfo.getPonCTag(), tagInfo.getPonSTag(),
+ tagInfo.getTechnologyProfileId(), deviceId,
+ subscriberPort);
+ return;
+ }
- CompletableFuture<Object> upstreamMeterFuture = new CompletableFuture<>();
- CompletableFuture<Object> downsteamMeterFuture = new CompletableFuture<>();
+ // check for meterIds for the upstream and downstream bandwidth profiles
+ MeterId upMeterId = oltMeterService
+ .getMeterIdFromBpMapping(deviceId, upstreamBpInfo.id());
+ MeterId downMeterId = oltMeterService
+ .getMeterIdFromBpMapping(deviceId, downstreamBpInfo.id());
+ SubscriberFlowInfo fi = new SubscriberFlowInfo(deviceId, uplinkPort, subscriberPort,
+ tagInfo, downMeterId, upMeterId,
+ downstreamBpInfo.id(), upstreamBpInfo.id());
+
+ if (upMeterId != null && downMeterId != null) {
+ log.debug("Meters are existing for upstream {} and downstream {}",
+ upstreamBpInfo.id(), downstreamBpInfo.id());
+ handleSubFlowsWithMeters(fi);
+ } else {
+ log.debug("Adding {} to pending subs", fi);
+ // one or both meters are not ready. It's possible they are in the process of being
+ // created for other subscribers that share the same bandwidth profile.
+ pendingSubscribers.add(fi);
+
+ // queue up the meters to be created
+ if (upMeterId == null) {
+ log.debug("Missing meter for upstream {}", upstreamBpInfo.id());
+ checkAndCreateDevMeter(new DeviceBandwidthProfile(deviceId, upstreamBpInfo));
+ }
+ if (downMeterId == null) {
+ log.debug("Missing meter for downstream {}", downstreamBpInfo.id());
+ checkAndCreateDevMeter(new DeviceBandwidthProfile(deviceId, downstreamBpInfo));
+ }
+ }
+ }
+ private void checkAndCreateDevMeter(DeviceBandwidthProfile dm) {
+ if (pendingMeters.contains(dm)) {
+ return;
+ }
+ pendingMeters.add(dm);
+ createMeter(dm);
+ }
+
+ private void createMeter(DeviceBandwidthProfile dm) {
+ log.debug("Creating Meter {} from queue", dm);
+ CompletableFuture<Object> meterFuture = new CompletableFuture<>();
+ MeterId meterId = oltMeterService.createMeter(dm.getDevId(), dm.getBwInfo(),
+ meterFuture);
+
+ meterFuture.thenAcceptAsync(result -> {
+ // iterate through the subscribers on hold
+ Iterator<SubscriberFlowInfo> subsIterator = pendingSubscribers.iterator();
+ while (subsIterator.hasNext()) {
+ SubscriberFlowInfo fi = subsIterator.next();
+ if (result == null) {
+ // meter install sent to device
+ log.debug("Meter {} installed for bw {}", meterId, dm.getBwInfo());
+
+ MeterId upMeterId = oltMeterService
+ .getMeterIdFromBpMapping(dm.getDevId(), fi.getUpBpInfo());
+ MeterId downMeterId = oltMeterService
+ .getMeterIdFromBpMapping(dm.getDevId(), fi.getDownBpInfo());
+ if (upMeterId != null && downMeterId != null) {
+ log.debug("Provisioning subscriber after meter {}" +
+ "installation and both meters are present " +
+ "upstream {} and downstream {}",
+ meterId, upMeterId, downMeterId);
+ // put in the meterIds because when fi was first
+ // created there may or may not have been a meterId
+ // depending on whether the meter was created or
+ // not at that time.
+ fi.setUpMeterId(upMeterId);
+ fi.setDownMeterId(downMeterId);
+ handleSubFlowsWithMeters(fi);
+ subsIterator.remove();
+ }
+ pendingMeters.remove(new DeviceBandwidthProfile(dm.getDevId(), dm.getBwInfo()));
+ } else {
+ // meter install failed
+ log.error("Addition of subscriber {} failed due to meter " +
+ "{} with result {}", fi, meterId, result);
+ subsIterator.remove();
+ pendingMeters.remove(new DeviceBandwidthProfile(dm.getDevId(), dm.getBwInfo()));
+ }
+ }
+ });
+ }
+ /**
+ * Add subscriber flows given meter information for both upstream and
+ * downstream directions.
+ *
+ * @param subscriberFlowInfo relevant information for subscriber
+ */
+ private void handleSubFlowsWithMeters(SubscriberFlowInfo subscriberFlowInfo) {
+ log.debug("Provisioning subscriber flows based on {}", subscriberFlowInfo);
+ UniTagInformation tagInfo = subscriberFlowInfo.getTagInfo();
CompletableFuture<ObjectiveError> upFuture = new CompletableFuture<>();
CompletableFuture<ObjectiveError> downFuture = new CompletableFuture<>();
- MeterId upstreamMeterId = oltMeterService.createMeter(deviceId, upstreamBpInfo, upstreamMeterFuture);
-
- MeterId downstreamMeterId = oltMeterService.createMeter(deviceId, downstreamBpInfo, downsteamMeterFuture);
-
- upstreamMeterFuture.thenAcceptAsync(result -> {
- if (result == null) {
- log.info("Upstream Meter {} is sent to the device {}. " +
- "Sending subscriber flows.", upstreamMeterId, deviceId);
-
- ForwardingObjective.Builder upFwd =
- oltFlowService.createUpBuilder(uplinkPort, subscriberPort, upstreamMeterId, tagInfo);
-
- flowObjectiveService.forward(deviceId, upFwd.add(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- log.info("Upstream flow installed successfully");
- upFuture.complete(null);
- }
-
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- upFuture.complete(error);
- }
- }));
-
- } else if (upstreamBpInfo == null) {
- log.warn("No meter installed since no Upstream BW Profile definition found for " +
- "ctag {} stag {} tpId {} and Device/port: {}:{}",
- tagInfo.getPonCTag(), tagInfo.getPonSTag(),
- tagInfo.getTechnologyProfileId(),
- deviceId, subscriberPort);
- } else {
- log.warn("Meter installation error while sending upstream flows. " +
- "Result {} and MeterId {}", result, upstreamMeterId);
+ ForwardingObjective.Builder upFwd =
+ oltFlowService.createUpBuilder(subscriberFlowInfo.getNniPort(), subscriberFlowInfo.getUniPort(),
+ subscriberFlowInfo.getUpId(), subscriberFlowInfo.getTagInfo());
+ flowObjectiveService.forward(subscriberFlowInfo.getDevId(), upFwd.add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.debug("Upstream flow installed successfully {}", subscriberFlowInfo);
+ upFuture.complete(null);
}
- }).exceptionally(ex -> {
- log.error("Upstream flow failed: " + ex.getMessage());
- upFuture.complete(ObjectiveError.UNKNOWN);
- return null;
- });
- downsteamMeterFuture.thenAcceptAsync(result -> {
- if (result == null) {
- log.info("Downstream Meter {} is sent to the device {}. " +
- "Sending subscriber flows.", downstreamMeterId, deviceId);
-
- ForwardingObjective.Builder downFwd =
- oltFlowService.createDownBuilder(uplinkPort, subscriberPort, downstreamMeterId, tagInfo);
-
- flowObjectiveService.forward(deviceId, downFwd.add(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- log.info("Downstream flow installed successfully");
- downFuture.complete(null);
- }
-
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- downFuture.complete(error);
- }
- }));
-
- } else if (downstreamBpInfo == null) {
- log.warn("No meter installed since no Downstream BW Profile definition found for " +
- "ctag {} stag {} tpId {} and Device/port: {}:{}",
- tagInfo.getPonCTag(), tagInfo.getPonSTag(),
- tagInfo.getTechnologyProfileId(),
- deviceId, subscriberPort);
- } else {
- log.warn("Meter installation error while sending upstream flows. " +
- "Result {} and MeterId {}", result, downstreamMeterId);
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ upFuture.complete(error);
}
- }).exceptionally(ex -> {
- log.error("Downstream flow failed: " + ex.getMessage());
- downFuture.complete(ObjectiveError.UNKNOWN);
- return null;
- });
+ }));
+
+ ForwardingObjective.Builder downFwd =
+ oltFlowService.createDownBuilder(subscriberFlowInfo.getNniPort(), subscriberFlowInfo.getUniPort(),
+ subscriberFlowInfo.getDownId(), subscriberFlowInfo.getTagInfo());
+ flowObjectiveService.forward(subscriberFlowInfo.getDevId(), downFwd.add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.debug("Downstream flow installed successfully {}", subscriberFlowInfo);
+ downFuture.complete(null);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ downFuture.complete(error);
+ }
+ }));
upFuture.thenAcceptBothAsync(downFuture, (upStatus, downStatus) -> {
AccessDeviceEvent.Type type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTERED;
if (downStatus != null) {
log.error("Flow with innervlan {} and outerVlan {} on device {} " +
"on port {} failed downstream installation: {}",
- tagInfo.getPonCTag(), tagInfo.getPonSTag(), deviceId, cp, downStatus);
+ tagInfo.getPonCTag(), tagInfo.getPonSTag(), subscriberFlowInfo.getDevId(),
+ subscriberFlowInfo.getUniPort(), downStatus);
type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED;
} else if (upStatus != null) {
log.error("Flow with innerVlan {} and outerVlan {} on device {} " +
"on port {} failed upstream installation: {}",
- tagInfo.getPonCTag(), tagInfo.getPonSTag(), deviceId, cp, upStatus);
+ tagInfo.getPonCTag(), tagInfo.getPonSTag(), subscriberFlowInfo.getDevId(),
+ subscriberFlowInfo.getUniPort(), upStatus);
type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED;
} else {
- log.info("Upstream and downstream data plane flows are installed successfully.");
- oltFlowService.processEapolFilteringObjectives(deviceId, subscriberPort,
+ log.debug("Upstream and downstream data plane flows are installed successfully " +
+ "for {}", subscriberFlowInfo);
+ oltFlowService.processEapolFilteringObjectives(subscriberFlowInfo.getDevId(),
+ subscriberFlowInfo.getUniPort(),
tagInfo.getUpstreamBandwidthProfile(),
null, tagInfo.getPonCTag(), true);
- oltFlowService.processDhcpFilteringObjectives(deviceId, subscriberPort,
- upstreamMeterId, tagInfo, true, true);
+ oltFlowService.processDhcpFilteringObjectives(subscriberFlowInfo.getDevId(),
+ subscriberFlowInfo.getUniPort(),
+ subscriberFlowInfo.getUpId(),
+ tagInfo, true, true);
- oltFlowService.processIgmpFilteringObjectives(deviceId, subscriberPort, upstreamMeterId, tagInfo,
- true, true);
- updateProgrammedSubscriber(cp, tagInfo, true);
- post(new AccessDeviceEvent(type, deviceId, port, tagInfo.getPonSTag(), tagInfo.getPonCTag(),
- tagInfo.getTechnologyProfileId()));
+ oltFlowService.processIgmpFilteringObjectives(subscriberFlowInfo.getDevId(),
+ subscriberFlowInfo.getUniPort(),
+ subscriberFlowInfo.getUpId(),
+ tagInfo, true, true);
+ updateProgrammedSubscriber(new ConnectPoint(subscriberFlowInfo.getDevId(),
+ subscriberFlowInfo.getUniPort()),
+ tagInfo, true);
}
+ post(new AccessDeviceEvent(type, subscriberFlowInfo.getDevId(),
+ deviceService.getPort(subscriberFlowInfo.getDevId(),
+ subscriberFlowInfo.getUniPort()),
+ tagInfo.getPonSTag(), tagInfo.getPonCTag(),
+ tagInfo.getTechnologyProfileId()));
}, oltInstallers);
}
@@ -860,9 +939,9 @@
}
// Return the port that has been configured as the uplink port of this OLT in Sadis
Optional<Port> optionalPort = deviceService.getPorts(dev.id()).stream()
- .filter(port -> isNniPort(port) ||
- (port.number().toLong() == deviceInfo.uplinkPort()))
- .findFirst();
+ .filter(port -> isNniPort(port) ||
+ (port.number().toLong() == deviceInfo.uplinkPort()))
+ .findFirst();
if (optionalPort.isPresent()) {
log.trace("getUplinkPort: Found port {}", optionalPort.get());
return optionalPort.get();
@@ -964,7 +1043,7 @@
if (!isLocalLeader && event.type().equals(DeviceEvent.Type.DEVICE_AVAILABILITY_CHANGED)
&& !deviceService.isAvailable(devId) && deviceService.getPorts(devId).isEmpty()) {
log.info("Cleaning local state for non master instance upon " +
- "device disconnection {}", devId);
+ "device disconnection {}", devId);
programmedDevices.remove(devId);
// Since no mastership of the device is present upon disconnection
// the method in the FlowRuleManager only empties the local copy
@@ -1073,12 +1152,12 @@
} else {
if (deviceService.getPorts(devId).isEmpty()) {
log.info("Handling controlled device disconnection .. "
- + "flushing all state for dev:{}", devId);
+ + "flushing all state for dev:{}", devId);
handleDeviceDisconnection(dev, false);
} else {
log.info("Disconnected device has available ports .. "
- + "assuming temporary disconnection, "
- + "retaining state for device {}", devId);
+ + "assuming temporary disconnection, "
+ + "retaining state for device {}", devId);
}
}
break;
@@ -1141,4 +1220,5 @@
}
}
}
+
}