[VOL-4180] Multi UNI feature implemented to OLT application.
Change-Id: I3d45719ebdce304ba94652ed9de553e40d76a77c
EAPOL flow bug-fixed
review fixes finshed
Multi UNI feature implemented to OLT application.
- It's possible to fetch a meter by annotations. (OltPipeline)
- New meters can be created for bandwidth profiles of OLT device.
- Olt meterId is transported via writeMetadata so that voltha/rw-core can parse it and assign the correct meters to ONU and OLT flows.
Change-Id: Ia6c9909b5f03b0f3fe329bd11580f891bfab3a32
diff --git a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 466d363..1868421 100644
--- a/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/impl/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -27,7 +27,9 @@
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
+import org.onosproject.net.Annotations;
import org.onosproject.net.ConnectPoint;
+import org.onosproject.net.DefaultAnnotations;
import org.onosproject.net.DeviceId;
import org.onosproject.net.PortNumber;
import org.onosproject.net.device.DeviceService;
@@ -66,8 +68,11 @@
import org.osgi.service.component.annotations.ReferencePolicy;
import org.slf4j.Logger;
+import java.util.Arrays;
import java.util.Dictionary;
+import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
@@ -271,6 +276,7 @@
@Override
public void processDhcpFilteringObjectives(AccessDevicePort port,
MeterId upstreamMeterId,
+ MeterId upstreamOltMeterId,
UniTagInformation tagInformation,
boolean install,
boolean upstream,
@@ -306,8 +312,8 @@
byte protocol = IPv4.PROTOCOL_UDP;
addDhcpFilteringObjectives(port, udpSrc, udpDst, ethType,
- upstreamMeterId, techProfileId, protocol, cTag, unitagMatch,
- vlanPcp, upstream, install, dhcpFuture);
+ upstreamMeterId, upstreamOltMeterId, techProfileId, protocol, cTag, unitagMatch,
+ vlanPcp, upstream, install, dhcpFuture);
}
if (enableDhcpV6) {
@@ -318,14 +324,14 @@
byte protocol = IPv6.PROTOCOL_UDP;
addDhcpFilteringObjectives(port, udpSrc, udpDst, ethType,
- upstreamMeterId, techProfileId, protocol, cTag, unitagMatch,
- vlanPcp, upstream, install, dhcpFuture);
+ upstreamMeterId, upstreamOltMeterId, techProfileId, protocol, cTag, unitagMatch,
+ vlanPcp, upstream, install, dhcpFuture);
}
}
private void addDhcpFilteringObjectives(AccessDevicePort port, int udpSrc, int udpDst,
- EthType ethType, MeterId upstreamMeterId, int techProfileId, byte protocol,
- VlanId cTag, VlanId unitagMatch,
+ EthType ethType, MeterId upstreamMeterId, MeterId upstreamOltMeterId,
+ int techProfileId, byte protocol, VlanId cTag, VlanId unitagMatch,
Byte vlanPcp, boolean upstream,
boolean install, Optional<CompletableFuture<ObjectiveError>> dhcpFuture) {
@@ -337,7 +343,7 @@
}
if (techProfileId != NONE_TP_ID) {
- treatmentBuilder.writeMetadata(createTechProfValueForWm(unitagMatch, techProfileId), 0);
+ treatmentBuilder.writeMetadata(createTechProfValueForWm(unitagMatch, techProfileId, upstreamOltMeterId), 0);
}
FilteringObjective.Builder dhcpUpstreamBuilder = (install ? builder.permit() : builder.deny())
@@ -389,6 +395,7 @@
@Override
public void processPPPoEDFilteringObjectives(AccessDevicePort port,
MeterId upstreamMeterId,
+ MeterId upstreamOltMeterId,
UniTagInformation tagInformation,
boolean install,
boolean upstream) {
@@ -420,7 +427,7 @@
}
if (techProfileId != NONE_TP_ID) {
- treatmentBuilder.writeMetadata(createTechProfValueForWm(cTag, techProfileId), 0);
+ treatmentBuilder.writeMetadata(createTechProfValueForWm(cTag, techProfileId, upstreamOltMeterId), 0);
}
DefaultFilteringObjective.Builder pppoedBuilder = (install ? builder.permit() : builder.deny())
@@ -459,6 +466,7 @@
@Override
public void processIgmpFilteringObjectives(AccessDevicePort port,
MeterId upstreamMeterId,
+ MeterId upstreamOltMeterId,
UniTagInformation tagInformation,
boolean install,
boolean upstream) {
@@ -484,7 +492,7 @@
if (tagInformation.getTechnologyProfileId() != NONE_TP_ID) {
treatmentBuilder.writeMetadata(createTechProfValueForWm(null,
- tagInformation.getTechnologyProfileId()), 0);
+ tagInformation.getTechnologyProfileId(), upstreamOltMeterId), 0);
}
@@ -532,7 +540,7 @@
}
@Override
- public void processEapolFilteringObjectives(AccessDevicePort port, String bpId,
+ public void processEapolFilteringObjectives(AccessDevicePort port, String bpId, Optional<String> oltBpId,
CompletableFuture<ObjectiveError> filterFuture,
VlanId vlanId, boolean install) {
@@ -546,6 +554,12 @@
}
log.info("Processing EAPOL with Bandwidth profile {} on {}", bpId, port);
BandwidthProfileInformation bpInfo = getBandwidthProfileInformation(bpId);
+ BandwidthProfileInformation oltBpInfo;
+ if (oltBpId.isPresent()) {
+ oltBpInfo = getBandwidthProfileInformation(oltBpId.get());
+ } else {
+ oltBpInfo = bpInfo;
+ }
if (bpInfo == null) {
log.warn("Bandwidth profile {} is not found. Authentication flow"
+ " will not be installed on {}", bpId, port);
@@ -558,19 +572,22 @@
ConnectPoint cp = new ConnectPoint(port.deviceId(), port.number());
DefaultFilteringObjective.Builder filterBuilder = DefaultFilteringObjective.builder();
TrafficTreatment.Builder treatmentBuilder = DefaultTrafficTreatment.builder();
- CompletableFuture<Object> meterFuture = new CompletableFuture<>();
// check if meter exists and create it only for an install
final MeterId meterId = oltMeterService.getMeterIdFromBpMapping(port.deviceId(), bpInfo.id());
- log.info("Meter id {} for Bandwidth profile {} associated to EAPOL on {}",
- meterId, bpInfo.id(), port.deviceId());
- if (meterId == null) {
+ MeterId oltMeterId = null;
+ if (oltBpId.isPresent()) {
+ oltMeterId = oltBpId.map(id -> oltMeterService.getMeterIdFromBpMapping(port.deviceId(), id)).orElse(null);
+ }
+ log.info("Meter id {} for Bandwidth profile {} and OLT meter id {} for OLT Bandwidth profile {} " +
+ "associated to EAPOL on {}", meterId, bpInfo.id(), oltMeterId, oltBpId, port.deviceId());
+ if (meterId == null || (oltBpId.isPresent() && oltMeterId == null)) {
if (install) {
log.debug("Need to install meter for EAPOL with bwp {} on {}", bpInfo.id(), port);
SubscriberFlowInfo fi = new SubscriberFlowInfo(null, port,
- new UniTagInformation.Builder()
- .setPonCTag(vlanId).build(),
- null, null,
- null, bpInfo.id());
+ new UniTagInformation.Builder()
+ .setPonCTag(vlanId).build(),
+ null, meterId, null, oltMeterId,
+ null, bpInfo.id(), null, oltBpInfo.id());
pendingEapolForDevice.compute(port.deviceId(), (id, queue) -> {
if (queue == null) {
queue = new LinkedBlockingQueue<>();
@@ -580,11 +597,14 @@
});
//If false the meter is already being installed, skipping installation
- if (!oltMeterService.checkAndAddPendingMeter(port.deviceId(), bpInfo)) {
+ if (!oltMeterService.checkAndAddPendingMeter(port.deviceId(), bpInfo) &&
+ !oltMeterService.checkAndAddPendingMeter(port.deviceId(), oltBpInfo)) {
return;
}
- MeterId innerMeterId = oltMeterService.createMeter(port.deviceId(), bpInfo, meterFuture);
- fi.setUpMeterId(innerMeterId);
+ List<BandwidthProfileInformation> bwpList = Arrays.asList(bpInfo, oltBpInfo);
+ bwpList.stream().distinct().filter(Objects::nonNull)
+ .forEach(bwp -> createMeterAndProceedEapol(port, bwp, filterFuture, install,
+ cp, filterBuilder, treatmentBuilder));
} else {
// this case should not happen as the request to remove an eapol
// flow should mean that the flow points to a meter that exists.
@@ -593,54 +613,75 @@
log.warn("Unknown meter id for bp {}, still proceeding with "
+ "delete of eapol flow on {}", bpInfo.id(), port);
SubscriberFlowInfo fi = new SubscriberFlowInfo(null, port,
- new UniTagInformation.Builder()
- .setPonCTag(vlanId).build(),
- null, meterId,
- null, bpInfo.id());
- handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, meterId);
+ new UniTagInformation.Builder().setPonCTag(vlanId).build(),
+ null, meterId, null, oltMeterId, null,
+ bpInfo.id(), null, oltBpInfo.id());
+ handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, meterId, oltMeterId);
}
} else {
log.debug("Meter {} was previously created for bp {} on {}", meterId, bpInfo.id(), port);
SubscriberFlowInfo fi = new SubscriberFlowInfo(null, port,
- new UniTagInformation.Builder()
- .setPonCTag(vlanId).build(),
- null, meterId,
- null, bpInfo.id());
- handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, meterId);
+ new UniTagInformation.Builder().setPonCTag(vlanId).build(),
+ null, meterId, null, oltMeterId, null,
+ bpInfo.id(), null, oltBpInfo.id());
+ handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, meterId, oltMeterId);
//No need for the future, meter is present.
return;
}
- meterFuture.thenAcceptAsync(result -> {
+ }
+
+ private void createMeterAndProceedEapol(AccessDevicePort port, BandwidthProfileInformation bwpInfo,
+ CompletableFuture<ObjectiveError> filterFuture,
+ boolean install, ConnectPoint cp,
+ DefaultFilteringObjective.Builder filterBuilder,
+ TrafficTreatment.Builder treatmentBuilder) {
+ CompletableFuture<Object> meterFuture = new CompletableFuture<>();
+ MeterId meterId = oltMeterService.createMeter(port.deviceId(), bwpInfo, meterFuture);
+ DeviceId deviceId = port.deviceId();
+ meterFuture.thenAccept(result -> {
//for each pending eapol flow we check if the meter is there.
- BlockingQueue<SubscriberFlowInfo> queue = pendingEapolForDevice.get(port.deviceId());
- if (queue != null) {
- while (true) {
- SubscriberFlowInfo fi = queue.remove();
- if (fi == null) {
- pendingEapolForDevice.replace(port.deviceId(), queue);
- break;
- }
- //TODO this might return the reference and not the actual object
- // so it can be actually swapped underneath us.
- log.debug("handing pending eapol on {} for {}", fi.getUniPort(), fi);
- if (result == null) {
- MeterId mId = oltMeterService
- .getMeterIdFromBpMapping(port.deviceId(), fi.getUpBpInfo());
- if (mId != null) {
- log.debug("Meter installation completed for subscriber on {}, " +
- "handling EAPOL trap flow", port);
- handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi, mId);
+ pendingEapolForDevice.compute(deviceId, (id, queue) -> {
+ if (queue != null && !queue.isEmpty()) {
+ while (true) {
+ //TODO this might return the reference and not the actual object
+ // so it can be actually swapped underneath us.
+ SubscriberFlowInfo fi = queue.peek();
+ if (fi == null) {
+ log.debug("No more subscribers eapol flows on {}", deviceId);
+ queue = new LinkedBlockingQueue<>();
+ break;
}
- } else {
- log.warn("Meter installation error while sending EAPOL trap flow to {}. " +
- "Result {} and MeterId {}", port, result,
- meterId);
+ log.debug("handing pending eapol on {} for {}", fi.getUniPort(), fi);
+ if (result == null) {
+ MeterId mId = oltMeterService
+ .getMeterIdFromBpMapping(port.deviceId(), fi.getUpBpInfo());
+ MeterId oltMeterId = oltMeterService
+ .getMeterIdFromBpMapping(port.deviceId(), fi.getUpOltBpInfo());
+ if (mId != null && oltMeterId != null) {
+ log.debug("Meter installation completed for subscriber on {}, " +
+ "handling EAPOL trap flow", port);
+ fi.setUpMeterId(mId);
+ fi.setUpOltMeterId(oltMeterId);
+ handleEapol(filterFuture, install, cp, filterBuilder, treatmentBuilder, fi,
+ mId, oltMeterId);
+ queue.remove(fi);
+ } else {
+ log.debug("Not all meters for {} are yet installed for EAPOL meterID {}, " +
+ "oltMeterId {}", fi, meterId, oltMeterId);
+ }
+ } else {
+ log.warn("Meter installation error while sending EAPOL trap flow to {}. " +
+ "Result {} and MeterId {}", port, result, meterId);
+ queue.remove(fi);
+ }
+ oltMeterService.removeFromPendingMeters(port.deviceId(), bwpInfo);
}
- oltMeterService.removeFromPendingMeters(port.deviceId(), bpInfo);
+ } else {
+ log.info("No pending EAPOLs on {}", port.deviceId());
+ queue = new LinkedBlockingQueue<>();
}
- } else {
- log.info("No pending EAPOLs on {}", port.deviceId());
- }
+ return queue;
+ });
});
}
@@ -648,7 +689,7 @@
boolean install, ConnectPoint cp,
DefaultFilteringObjective.Builder filterBuilder,
TrafficTreatment.Builder treatmentBuilder,
- SubscriberFlowInfo fi, MeterId mId) {
+ SubscriberFlowInfo fi, MeterId mId, MeterId oltMeterId) {
log.info("Meter {} for {} on {} exists. {} EAPOL trap flow",
mId, fi.getUpBpInfo(), fi.getUniPort(),
(install) ? "Installing" : "Removing");
@@ -664,7 +705,7 @@
.withMeta(treatmentBuilder
.writeMetadata(createTechProfValueForWm(
fi.getTagInfo().getPonCTag(),
- techProfileId), 0)
+ techProfileId, oltMeterId), 0)
.setOutput(PortNumber.CONTROLLER)
.pushVlan()
.setVlanId(fi.getTagInfo().getPonCTag())
@@ -707,9 +748,9 @@
log.info("{} flows for NNI port {}",
install ? "Adding" : "Removing", nniPort);
processLldpFilteringObjective(nniPort, install);
- processDhcpFilteringObjectives(nniPort, null, null, install, false, Optional.empty());
- processIgmpFilteringObjectives(nniPort, null, null, install, false);
- processPPPoEDFilteringObjectives(nniPort, null, null, install, false);
+ processDhcpFilteringObjectives(nniPort, null, null, null, install, false, Optional.empty());
+ processIgmpFilteringObjectives(nniPort, null, null, null, install, false);
+ processPPPoEDFilteringObjectives(nniPort, null, null, null, install, false);
}
@@ -765,13 +806,15 @@
upstream ? uplinkPort.number() : subscriberPort.number()), 0)
.build();
- return createForwardingObjectiveBuilder(selector, treatment, MIN_PRIORITY);
+ return createForwardingObjectiveBuilder(selector, treatment, MIN_PRIORITY,
+ DefaultAnnotations.builder().build());
}
@Override
public ForwardingObjective.Builder createUpBuilder(AccessDevicePort uplinkPort,
AccessDevicePort subscriberPort,
MeterId upstreamMeterId,
+ MeterId upstreamOltMeterId,
UniTagInformation uniTagInformation) {
TrafficSelector selector = DefaultTrafficSelector.builder()
@@ -801,17 +844,26 @@
.writeMetadata(createMetadata(uniTagInformation.getPonCTag(),
uniTagInformation.getTechnologyProfileId(), uplinkPort.number()), 0L);
+ DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
+
if (upstreamMeterId != null) {
treatmentBuilder.meter(upstreamMeterId);
+ annotationBuilder.set(UPSTREAM_ONU, upstreamMeterId.toString());
+ }
+ if (upstreamOltMeterId != null) {
+ treatmentBuilder.meter(upstreamOltMeterId);
+ annotationBuilder.set(UPSTREAM_OLT, upstreamOltMeterId.toString());
}
- return createForwardingObjectiveBuilder(selector, treatmentBuilder.build(), MIN_PRIORITY);
+ return createForwardingObjectiveBuilder(selector, treatmentBuilder.build(), MIN_PRIORITY,
+ annotationBuilder.build());
}
@Override
public ForwardingObjective.Builder createDownBuilder(AccessDevicePort uplinkPort,
AccessDevicePort subscriberPort,
MeterId downstreamMeterId,
+ MeterId downstreamOltMeterId,
UniTagInformation tagInformation,
Optional<MacAddress> macAddress) {
@@ -854,11 +906,20 @@
treatmentBuilder.setVlanId(tagInformation.getUniTagMatch());
}
+ DefaultAnnotations.Builder annotationBuilder = DefaultAnnotations.builder();
+
if (downstreamMeterId != null) {
treatmentBuilder.meter(downstreamMeterId);
+ annotationBuilder.set(DOWNSTREAM_ONU, downstreamMeterId.toString());
}
- return createForwardingObjectiveBuilder(selectorBuilder.build(), treatmentBuilder.build(), MIN_PRIORITY);
+ if (downstreamOltMeterId != null) {
+ treatmentBuilder.meter(downstreamOltMeterId);
+ annotationBuilder.set(DOWNSTREAM_OLT, downstreamOltMeterId.toString());
+ }
+
+ return createForwardingObjectiveBuilder(selectorBuilder.build(), treatmentBuilder.build(), MIN_PRIORITY,
+ annotationBuilder.build());
}
@Override
@@ -868,12 +929,14 @@
private DefaultForwardingObjective.Builder createForwardingObjectiveBuilder(TrafficSelector selector,
TrafficTreatment treatment,
- Integer priority) {
+ Integer priority,
+ Annotations annotations) {
return DefaultForwardingObjective.builder()
.withFlag(ForwardingObjective.Flag.VERSATILE)
.withPriority(priority)
.makePermanent()
.withSelector(selector)
+ .withAnnotations(annotations)
.fromApp(appId)
.withTreatment(treatment);
}
@@ -882,15 +945,24 @@
* Returns the write metadata value including tech profile reference and innerVlan.
* For param cVlan, null can be sent
*
- * @param cVlan c (customer) tag of one subscriber
- * @param techProfileId tech profile id of one subscriber
+ * @param cVlan c (customer) tag of one subscriber
+ * @param techProfileId tech profile id of one subscriber
+ * @param upstreamOltMeterId upstream meter id for OLT device.
* @return the write metadata value including tech profile reference and innerVlan
*/
- private Long createTechProfValueForWm(VlanId cVlan, int techProfileId) {
+ private Long createTechProfValueForWm(VlanId cVlan, int techProfileId, MeterId upstreamOltMeterId) {
+ Long writeMetadata;
+
if (cVlan == null || VlanId.NONE.equals(cVlan)) {
- return (long) techProfileId << 32;
+ writeMetadata = (long) techProfileId << 32;
+ } else {
+ writeMetadata = ((long) (cVlan.id()) << 48 | (long) techProfileId << 32);
}
- return ((long) (cVlan.id()) << 48 | (long) techProfileId << 32);
+ if (upstreamOltMeterId == null) {
+ return writeMetadata;
+ } else {
+ return writeMetadata | upstreamOltMeterId.id();
+ }
}
private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) {