[VOL-4180] Multi-UNI feature implemented in the OLT application.
Change-Id: I3d45719ebdce304ba94652ed9de553e40d76a77c
Fixed EAPOL flow bug.
Review fixes finished.
Multi-UNI feature implemented in the OLT application.
- It's possible to fetch a meter by annotations. (OltPipeline)
- New meters can be created for bandwidth profiles of OLT device.
- The OLT meterId is transported via writeMetadata so that VOLTHA/rw-core can parse it and assign the correct meters to the ONU and OLT flows.
Change-Id: Ia6c9909b5f03b0f3fe329bd11580f891bfab3a32
diff --git a/impl/src/main/java/org/opencord/olt/impl/Olt.java b/impl/src/main/java/org/opencord/olt/impl/Olt.java
index 70a3753..f11ac21 100644
--- a/impl/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/impl/src/main/java/org/opencord/olt/impl/Olt.java
@@ -405,6 +405,7 @@
pendingSubscribersForDevice.computeIfPresent(connectPoint.deviceId(), (id, infos) -> {
for (SubscriberFlowInfo fi : infos) {
if (fi.getUniPort().equals(connectPoint.port())) {
+ log.debug("Subscriber is already pending, {}", fi);
isPending.set(true);
break;
}
@@ -434,7 +435,7 @@
public void run() {
CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture<>();
oltFlowService.processEapolFilteringObjectives(subscriberPort,
- defaultBpId, filterFuture,
+ defaultBpId, Optional.empty(), filterFuture,
VlanId.vlanId(EAPOL_DEFAULT_VLAN),
false);
filterFuture.thenAcceptAsync(filterStatus -> {
@@ -511,14 +512,17 @@
unprovisionVlans(uplinkPort, subscriberPort, uniTag);
// remove eapol with subscriber bandwidth profile
+ Optional<String> upstreamOltBw = uniTag.getUpstreamOltBandwidthProfile() == null ?
+ Optional.empty() : Optional.of(uniTag.getUpstreamOltBandwidthProfile());
oltFlowService.processEapolFilteringObjectives(subscriberPort,
uniTag.getUpstreamBandwidthProfile(),
+ upstreamOltBw,
null, uniTag.getPonCTag(), false);
if (subscriberPort.port() != null && subscriberPort.isEnabled()) {
// reinstall eapol with default bandwidth profile
- oltFlowService.processEapolFilteringObjectives(subscriberPort, defaultBpId,
- null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
+ oltFlowService.processEapolFilteringObjectives(subscriberPort, defaultBpId, Optional.empty(),
+ null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
} else {
log.debug("Port {} is no longer enabled or it's unavailable. Not "
+ "reprogramming default eapol flow", connectPoint);
@@ -556,8 +560,8 @@
//wait until Eapol rule with defaultBpId is removed to install subscriber-based rules
//install subscriber flows
CompletableFuture<ObjectiveError> filterFuture = new CompletableFuture<>();
- oltFlowService.processEapolFilteringObjectives(subscriberPort, defaultBpId,
- filterFuture, VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
+ oltFlowService.processEapolFilteringObjectives(subscriberPort, defaultBpId, Optional.empty(),
+ filterFuture, VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
filterFuture.thenAcceptAsync(filterStatus -> {
if (filterStatus == null) {
provisionUniTagInformation(uplinkPort, subscriberPort, cTag.get(), sTag.get(), tpId.get());
@@ -697,6 +701,10 @@
.getMeterIdFromBpMapping(deviceId, uniTag.getUpstreamBandwidthProfile());
MeterId downstreamMeterId = oltMeterService
.getMeterIdFromBpMapping(deviceId, uniTag.getDownstreamBandwidthProfile());
+ MeterId upstreamOltMeterId = oltMeterService
+ .getMeterIdFromBpMapping(deviceId, uniTag.getUpstreamOltBandwidthProfile());
+ MeterId downstreamOltMeterId = oltMeterService
+ .getMeterIdFromBpMapping(deviceId, uniTag.getDownstreamOltBandwidthProfile());
Optional<SubscriberFlowInfo> waitingMacSubFlowInfo =
getAndRemoveWaitingMacSubFlowInfoForCTag(new ConnectPoint(deviceId, subscriberPort.number()),
@@ -707,7 +715,7 @@
"waitingMacSubFlowInfo:{}", waitingMacSubFlowInfo.get());
CompletableFuture<ObjectiveError> dhcpFuture = new CompletableFuture<>();
oltFlowService.processDhcpFilteringObjectives(subscriberPort,
- upstreamMeterId, uniTag, false, true, Optional.of(dhcpFuture));
+ upstreamMeterId, upstreamOltMeterId, uniTag, false, true, Optional.of(dhcpFuture));
dhcpFuture.thenAcceptAsync(dhcpStatus -> {
AccessDeviceEvent.Type type;
if (dhcpStatus == null) {
@@ -731,16 +739,19 @@
}
ForwardingObjective.Builder upFwd =
- oltFlowService.createUpBuilder(uplink, subscriberPort, upstreamMeterId, uniTag);
+ oltFlowService.createUpBuilder(uplink, subscriberPort, upstreamMeterId, upstreamOltMeterId, uniTag);
Optional<MacAddress> macAddress = getMacAddress(deviceId, subscriberPort, uniTag);
ForwardingObjective.Builder downFwd =
- oltFlowService.createDownBuilder(uplink, subscriberPort, downstreamMeterId, uniTag, macAddress);
+ oltFlowService.createDownBuilder(uplink, subscriberPort, downstreamMeterId, downstreamOltMeterId,
+ uniTag, macAddress);
- oltFlowService.processIgmpFilteringObjectives(subscriberPort, upstreamMeterId, uniTag, false, true);
+ oltFlowService.processIgmpFilteringObjectives(subscriberPort, upstreamMeterId, upstreamOltMeterId, uniTag,
+ false, true);
oltFlowService.processDhcpFilteringObjectives(subscriberPort,
- upstreamMeterId, uniTag, false, true, Optional.empty());
- oltFlowService.processPPPoEDFilteringObjectives(subscriberPort, upstreamMeterId, uniTag, false, true);
+ upstreamMeterId, upstreamOltMeterId, uniTag, false, true, Optional.empty());
+ oltFlowService.processPPPoEDFilteringObjectives(subscriberPort, upstreamMeterId, upstreamOltMeterId, uniTag,
+ false, true);
flowObjectiveService.forward(deviceId, upFwd.remove(new ObjectiveContext() {
@Override
@@ -885,6 +896,10 @@
getBandwidthProfileInformation(tagInfo.getUpstreamBandwidthProfile());
BandwidthProfileInformation downstreamBpInfo =
getBandwidthProfileInformation(tagInfo.getDownstreamBandwidthProfile());
+ BandwidthProfileInformation upstreamOltBpInfo =
+ getBandwidthProfileInformation(tagInfo.getUpstreamOltBandwidthProfile());
+ BandwidthProfileInformation downstreamOltBpInfo =
+ getBandwidthProfileInformation(tagInfo.getDownstreamOltBandwidthProfile());
if (upstreamBpInfo == null) {
log.warn("No meter installed since no Upstream BW Profile definition found for "
+ "ctag {} stag {} tpId {} on {}",
@@ -898,17 +913,45 @@
tagInfo.getTechnologyProfileId(), subscriberPort);
return;
}
+ if ((upstreamOltBpInfo != null && downstreamOltBpInfo == null) ||
+ (upstreamOltBpInfo == null && downstreamOltBpInfo != null)) {
+ log.warn("No meter installed since only one olt BW Profile definition found for "
+ + "ctag {} stag {} tpId {} and Device/port: {}:{}",
+ tagInfo.getPonCTag(), tagInfo.getPonSTag(),
+ tagInfo.getTechnologyProfileId(), deviceId,
+ subscriberPort);
+ return;
+ }
+
+ MeterId upOltMeterId = null;
+ MeterId downOltMeterId = null;
// check for meterIds for the upstream and downstream bandwidth profiles
MeterId upMeterId = oltMeterService
.getMeterIdFromBpMapping(deviceId, upstreamBpInfo.id());
MeterId downMeterId = oltMeterService
.getMeterIdFromBpMapping(deviceId, downstreamBpInfo.id());
- SubscriberFlowInfo fi = new SubscriberFlowInfo(uplinkPort, subscriberPort,
- tagInfo, downMeterId, upMeterId,
- downstreamBpInfo.id(), upstreamBpInfo.id());
- if (upMeterId != null && downMeterId != null) {
+ if (upstreamOltBpInfo != null) {
+ // Multi UNI service
+ upOltMeterId = oltMeterService
+ .getMeterIdFromBpMapping(deviceId, upstreamOltBpInfo.id());
+ downOltMeterId = oltMeterService
+ .getMeterIdFromBpMapping(deviceId, downstreamOltBpInfo.id());
+ } else {
+ // NOT Multi UNI service
+ log.debug("OLT bandwidth profiles fields are set to ONU bandwidth profiles");
+ upstreamOltBpInfo = upstreamBpInfo;
+ downstreamOltBpInfo = downstreamBpInfo;
+ upOltMeterId = upMeterId;
+ downOltMeterId = downMeterId;
+ }
+ SubscriberFlowInfo fi = new SubscriberFlowInfo(uplinkPort, subscriberPort,
+ tagInfo, downMeterId, upMeterId, downOltMeterId, upOltMeterId,
+ downstreamBpInfo.id(), upstreamBpInfo.id(),
+ downstreamOltBpInfo.id(), upstreamOltBpInfo.id());
+
+ if (upMeterId != null && downMeterId != null && upOltMeterId != null && downOltMeterId != null) {
log.debug("Meters are existing for upstream {} and downstream {} on {}",
upstreamBpInfo.id(), downstreamBpInfo.id(), subscriberPort);
handleSubFlowsWithMeters(fi);
@@ -925,21 +968,37 @@
return queue;
});
+ List<BandwidthProfileInformation> bws = new ArrayList<>();
// queue up the meters to be created
if (upMeterId == null) {
log.debug("Missing meter for upstream {} on {}", upstreamBpInfo.id(), subscriberPort);
- checkAndCreateDevMeter(deviceId, upstreamBpInfo);
+ bws.add(upstreamBpInfo);
}
if (downMeterId == null) {
log.debug("Missing meter for downstream {} on {}", downstreamBpInfo.id(), subscriberPort);
- checkAndCreateDevMeter(deviceId, downstreamBpInfo);
+ bws.add(downstreamBpInfo);
}
+ if (upOltMeterId == null) {
+ log.debug("Missing meter for upstreamOlt {} on {}", upstreamOltBpInfo.id(), subscriberPort);
+ bws.add(upstreamOltBpInfo);
+ }
+ if (downOltMeterId == null) {
+ log.debug("Missing meter for downstreamOlt {} on {}", downstreamOltBpInfo.id(), subscriberPort);
+ bws.add(downstreamOltBpInfo);
+ }
+ bws.stream().distinct().forEach(bw -> checkAndCreateDevMeter(deviceId, bw));
}
}
private void checkAndCreateDevMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
+ log.debug("Checking and Creating Meter with {} on {}", bwpInfo, deviceId);
+ if (bwpInfo == null) {
+ log.error("Can't create meter. Bandwidth profile is null for device : {}", deviceId);
+ return;
+ }
//If false the meter is already being installed, skipping installation
if (!oltMeterService.checkAndAddPendingMeter(deviceId, bwpInfo)) {
+ log.debug("Meter is already being installed on {} for {}", deviceId, bwpInfo);
return;
}
createMeter(deviceId, bwpInfo);
@@ -953,52 +1012,69 @@
meterFuture);
meterFuture.thenAcceptAsync(result -> {
- BlockingQueue<SubscriberFlowInfo> queue = pendingSubscribersForDevice.get(deviceId);
- // iterate through the subscribers on hold
- if (queue != null) {
- while (true) {
- //TODO this might return the reference and not the actual object so
- // it can be actually swapped underneath us.
- SubscriberFlowInfo fi = queue.peek();
- if (fi == null) {
- log.debug("No more subscribers pending on {}", deviceId);
- pendingSubscribersForDevice.replace(deviceId, queue);
- break;
- }
- if (result == null) {
- // meter install sent to device
- log.debug("Meter {} installed for bw {} on {}", meterId, bwpInfo, deviceId);
+ log.debug("Meter Future for {} has completed", meterId);
+ pendingSubscribersForDevice.compute(deviceId, (id, queue) -> {
+ // iterate through the subscribers on hold
+ if (queue != null && !queue.isEmpty()) {
+ while (true) {
+ //TODO this might return the reference and not the actual object so
+ // it can be actually swapped underneath us.
+ SubscriberFlowInfo fi = queue.peek();
+ if (fi == null) {
+ log.debug("No more subscribers pending on {}", deviceId);
+ queue = new LinkedBlockingQueue<>();
+ break;
+ }
+ if (result == null) {
+ // meter install sent to device
+ log.debug("Meter {} installed for bw {} on {}", meterId, bwpInfo, deviceId);
- MeterId upMeterId = oltMeterService
- .getMeterIdFromBpMapping(deviceId, fi.getUpBpInfo());
- MeterId downMeterId = oltMeterService
- .getMeterIdFromBpMapping(deviceId, fi.getDownBpInfo());
- if (upMeterId != null && downMeterId != null) {
- log.debug("Provisioning subscriber after meter {} " +
- "installation and both meters are present " +
- "upstream {} and downstream {} on {}",
- meterId, upMeterId, downMeterId, fi.getUniPort());
- // put in the meterIds because when fi was first
- // created there may or may not have been a meterId
- // depending on whether the meter was created or
- // not at that time.
- fi.setUpMeterId(upMeterId);
- fi.setDownMeterId(downMeterId);
- handleSubFlowsWithMeters(fi);
+ MeterId upMeterId = oltMeterService
+ .getMeterIdFromBpMapping(deviceId, fi.getUpBpInfo());
+ MeterId downMeterId = oltMeterService
+ .getMeterIdFromBpMapping(deviceId, fi.getDownBpInfo());
+ MeterId upOltMeterId = oltMeterService
+ .getMeterIdFromBpMapping(deviceId, fi.getUpOltBpInfo());
+ MeterId downOltMeterId = oltMeterService
+ .getMeterIdFromBpMapping(deviceId, fi.getDownOltBpInfo());
+ if (upMeterId != null && downMeterId != null &&
+ upOltMeterId != null && downOltMeterId != null) {
+ log.debug("Provisioning subscriber after meter {} " +
+ "installation and all meters are present " +
+ "upstream {} , downstream {} , oltUpstream {} " +
+ "and oltDownstream {} on {}",
+ meterId, upMeterId, downMeterId, upOltMeterId,
+ downOltMeterId, fi.getUniPort());
+ // put in the meterIds because when fi was first
+ // created there may or may not have been a meterId
+ // depending on whether the meter was created or
+ // not at that time.
+ fi.setUpMeterId(upMeterId);
+ fi.setDownMeterId(downMeterId);
+ fi.setUpOltMeterId(upOltMeterId);
+ fi.setDownOltMeterId(downOltMeterId);
+ handleSubFlowsWithMeters(fi);
+ queue.remove(fi);
+ } else {
+ log.debug("Not all meters for {} are yet installed up {}, " +
+ "down {}, oltUp {}, oltDown {}", fi, upMeterId,
+ downMeterId, upOltMeterId, downOltMeterId);
+ }
+ oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
+ } else {
+ // meter install failed
+ log.error("Addition of subscriber {} on {} failed due to meter " +
+ "{} with result {}", fi, fi.getUniPort(), meterId, result);
+ oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
queue.remove(fi);
}
- oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
- } else {
- // meter install failed
- log.error("Addition of subscriber {} on {} failed due to meter " +
- "{} with result {}", fi, fi.getUniPort(), meterId, result);
- queue.remove(fi);
- oltMeterService.removeFromPendingMeters(deviceId, bwpInfo);
}
+ } else {
+ log.info("No pending subscribers on {}", deviceId);
+ queue = new LinkedBlockingQueue<>();
}
- } else {
- log.info("No pending subscribers on {}", deviceId);
- }
+ return queue;
+ });
});
}
@@ -1027,7 +1103,8 @@
CompletableFuture<ObjectiveError> dhcpFuture = new CompletableFuture<>();
oltFlowService.processDhcpFilteringObjectives(subscriberFlowInfo.getUniPort(),
- subscriberFlowInfo.getUpId(), tagInfo, true, true, Optional.of(dhcpFuture));
+ subscriberFlowInfo.getUpId(), subscriberFlowInfo.getUpOltId(),
+ tagInfo, true, true, Optional.of(dhcpFuture));
dhcpFuture.thenAcceptAsync(dhcpStatus -> {
if (dhcpStatus != null) {
log.error("Dhcp Objective failed for {}: {}", subscriberFlowInfo, dhcpStatus);
@@ -1063,8 +1140,8 @@
CompletableFuture<ObjectiveError> downFuture = new CompletableFuture<>();
ForwardingObjective.Builder upFwd =
- oltFlowService.createUpBuilder(subscriberFlowInfo.getNniPort(), uniPort,
- subscriberFlowInfo.getUpId(), subscriberFlowInfo.getTagInfo());
+ oltFlowService.createUpBuilder(subscriberFlowInfo.getNniPort(), uniPort, subscriberFlowInfo.getUpId(),
+ subscriberFlowInfo.getUpOltId(), subscriberFlowInfo.getTagInfo());
flowObjectiveService.forward(subscriberFlowInfo.getDevId(), upFwd.add(new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
@@ -1080,7 +1157,8 @@
ForwardingObjective.Builder downFwd =
oltFlowService.createDownBuilder(subscriberFlowInfo.getNniPort(), uniPort,
- subscriberFlowInfo.getDownId(), subscriberFlowInfo.getTagInfo(), macAddress);
+ subscriberFlowInfo.getDownId(), subscriberFlowInfo.getDownOltId(),
+ subscriberFlowInfo.getTagInfo(), macAddress);
flowObjectiveService.forward(subscriberFlowInfo.getDevId(), downFwd.add(new ObjectiveContext() {
@Override
public void onSuccess(Objective objective) {
@@ -1106,20 +1184,23 @@
type = AccessDeviceEvent.Type.SUBSCRIBER_UNI_TAG_REGISTRATION_FAILED;
} else {
log.debug("Upstream and downstream data plane flows are installed successfully on {}", uniPort);
+ Optional<String> upstreamOltBw = tagInfo.getUpstreamOltBandwidthProfile() == null ?
+ Optional.empty() : Optional.of(tagInfo.getUpstreamOltBandwidthProfile());
oltFlowService.processEapolFilteringObjectives(uniPort, tagInfo.getUpstreamBandwidthProfile(),
- null, tagInfo.getPonCTag(), true);
-
+ upstreamOltBw, null,
+ tagInfo.getPonCTag(), true);
if (!tagInfo.getEnableMacLearning()) {
oltFlowService.processDhcpFilteringObjectives(uniPort, subscriberFlowInfo.getUpId(),
- tagInfo, true, true, Optional.empty());
+ subscriberFlowInfo.getUpOltId(), tagInfo, true, true, Optional.empty());
}
oltFlowService.processIgmpFilteringObjectives(uniPort, subscriberFlowInfo.getUpId(),
- tagInfo, true, true);
+ subscriberFlowInfo.getUpOltId(), tagInfo, true, true);
oltFlowService.processPPPoEDFilteringObjectives(uniPort, subscriberFlowInfo.getUpId(),
- tagInfo, true, true);
+ subscriberFlowInfo.getUpOltId(), tagInfo, true, true);
+
updateProgrammedSubscriber(uniPort, tagInfo, true);
}
post(new AccessDeviceEvent(type, subscriberFlowInfo.getDevId(), uniPort.port(),
@@ -1228,8 +1309,8 @@
AccessDevicePort port = new AccessDevicePort(p, AccessDevicePort.Type.UNI);
if (!programmedSubs.containsKey(new ConnectPoint(dev.id(), p.number()))) {
log.info("Creating Eapol on {}", port);
- oltFlowService.processEapolFilteringObjectives(port, defaultBpId, null,
- VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
+ oltFlowService.processEapolFilteringObjectives(port, defaultBpId, Optional.empty(),
+ null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), true);
} else {
log.debug("Subscriber Eapol on {} is already provisioned, not installing default", port);
}
@@ -1465,7 +1546,7 @@
if (port.isEnabled() && !port.number().equals(PortNumber.LOCAL)) {
log.info("eapol will be sent for port added {}", port);
- oltFlowService.processEapolFilteringObjectives(port, defaultBpId,
+ oltFlowService.processEapolFilteringObjectives(port, defaultBpId, Optional.empty(),
null,
VlanId.vlanId(EAPOL_DEFAULT_VLAN),
true);
@@ -1487,9 +1568,8 @@
if (uniTagInformationSet == null || uniTagInformationSet.isEmpty()) {
log.info("No subscriber provisioned on port {} in PORT_REMOVED event, " +
"removing default EAPOL flow", port);
- oltFlowService.processEapolFilteringObjectives(port, defaultBpId, null,
- VlanId.vlanId(EAPOL_DEFAULT_VLAN),
- false);
+ oltFlowService.processEapolFilteringObjectives(port, defaultBpId, Optional.empty(),
+ null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), false);
} else {
removeSubscriber(port);
}
@@ -1516,17 +1596,17 @@
if (!port.number().equals(PortNumber.LOCAL)) {
log.info("eapol will be {} updated for {} with default vlan {}",
(port.isEnabled()) ? "added" : "removed", port, EAPOL_DEFAULT_VLAN);
- oltFlowService.processEapolFilteringObjectives(port, defaultBpId, null,
- VlanId.vlanId(EAPOL_DEFAULT_VLAN),
- port.isEnabled());
+ oltFlowService.processEapolFilteringObjectives(port, defaultBpId, Optional.empty(),
+ null, VlanId.vlanId(EAPOL_DEFAULT_VLAN), port.isEnabled());
}
} else {
log.info("eapol will be {} updated for {}", (port.isEnabled()) ? "added" : "removed",
port);
for (UniTagInformation uniTag : uniTagInformationSet) {
oltFlowService.processEapolFilteringObjectives(port,
- uniTag.getUpstreamBandwidthProfile(), null,
- uniTag.getPonCTag(), port.isEnabled());
+ uniTag.getUpstreamBandwidthProfile(),
+ Optional.of(uniTag.getUpstreamOltBandwidthProfile()),
+ null, uniTag.getPonCTag(), port.isEnabled());
}
}
if (port.isEnabled()) {