[JIRA-3148] install only one meter per bandwidth profile on eapol flows
Change-Id: Ie2f1922cf3854b47d9a2d34a44410ef883b3fe2e
diff --git a/app/src/main/java/org/opencord/olt/impl/OltFlowService.java b/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 83032c0..b4dca74 100644
--- a/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -49,6 +49,7 @@
import org.onosproject.net.meter.MeterId;
import org.opencord.olt.internalapi.AccessDeviceFlowService;
import org.opencord.olt.internalapi.AccessDeviceMeterService;
+import org.opencord.olt.internalapi.DeviceBandwidthProfile;
import org.opencord.sadis.BandwidthProfileInformation;
import org.opencord.sadis.BaseInformationService;
import org.opencord.sadis.SadisService;
@@ -64,6 +65,7 @@
import org.slf4j.Logger;
import java.util.Dictionary;
+import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -155,6 +157,7 @@
protected BaseInformationService<BandwidthProfileInformation> bpService;
protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
private Set<ConnectPoint> pendingAddEapol = Sets.newConcurrentHashSet();
+ private Set<SubscriberFlowInfo> pendingEapolForMeters = Sets.newConcurrentHashSet();;
@Activate
public void activate(ComponentContext context) {
@@ -419,11 +422,26 @@
CompletableFuture<Object> meterFuture = new CompletableFuture<>();
// check if meter exists and create it only for an install
- MeterId meterId = oltMeterService.getMeterIdFromBpMapping(devId, bpInfo.id());
+ final MeterId meterId = oltMeterService.getMeterIdFromBpMapping(devId, bpInfo.id());
+ DeviceBandwidthProfile dm = new DeviceBandwidthProfile(devId, bpInfo);
if (meterId == null) {
if (install) {
- meterId = oltMeterService.createMeter(devId, bpInfo, meterFuture);
- treatmentBuilder.meter(meterId);
+ log.debug("Need to install meter for EAPOL with bwp {}", bpInfo.id());
+ SubscriberFlowInfo fi = new SubscriberFlowInfo(devId, null, cp.port(),
+ new UniTagInformation.Builder()
+ .setPonCTag(vlanId).build(),
+ null, null,
+ null, bpInfo.id());
+ pendingEapolForMeters.add(fi);
+
+ if (oltMeterService.isMeterPending(dm)) {
+ log.debug("Meter {} is already pending for EAPOL", dm);
+ return;
+ }
+ oltMeterService.addToPendingMeters(dm);
+ MeterId innerMeterId = oltMeterService.createMeter(devId, bpInfo,
+ meterFuture);
+ fi.setUpMeterId(innerMeterId);
} else {
// this case should not happen as the request to remove an eapol
// flow should mean that the flow points to a meter that exists.
@@ -431,63 +449,100 @@
// correct 'match' to do so.
log.warn("Unknown meter id for bp {}, still proceeding with "
+ "delete of eapol flow for {}/{}", bpInfo.id(), devId, portNumber);
- meterFuture.complete(null);
+ SubscriberFlowInfo fi = new SubscriberFlowInfo(devId, null, cp.port(),
+ new UniTagInformation.Builder()
+ .setPonCTag(vlanId).build(),
+ null, meterId,
+ null, bpInfo.id());
+ handleEapol(filterFuture, install, cp, builder, treatmentBuilder, fi, meterId);
}
} else {
log.debug("Meter {} was previously created for bp {}", meterId, bpInfo.id());
- treatmentBuilder.meter(meterId);
- meterFuture.complete(null);
+ SubscriberFlowInfo fi = new SubscriberFlowInfo(devId, null, cp.port(),
+ new UniTagInformation.Builder()
+ .setPonCTag(vlanId).build(),
+ null, meterId,
+ null, bpInfo.id());
+ handleEapol(filterFuture, install, cp, builder, treatmentBuilder, fi, meterId);
+ //No need for the future, meter is present.
+ return;
}
-
- final MeterId mId = meterId;
meterFuture.thenAcceptAsync(result -> {
- if (result == null) {
- log.info("Meter {} for {} on {}/{} exists. {} EAPOL trap flow",
- mId, bpId, devId, portNumber,
- (install) ? "Installing" : "Removing");
- int techProfileId = getDefaultTechProfileId(devId, portNumber);
-
- //Authentication trap flow uses only tech profile id as write metadata value
- FilteringObjective eapol = (install ? builder.permit() : builder.deny())
- .withKey(Criteria.matchInPort(portNumber))
- .addCondition(Criteria.matchEthType(EthType.EtherType.EAPOL.ethType()))
- .addCondition(Criteria.matchVlanId(vlanId))
- .withMeta(treatmentBuilder
- .writeMetadata(createTechProfValueForWm(vlanId, techProfileId), 0)
- .setOutput(PortNumber.CONTROLLER).build())
- .fromApp(appId)
- .withPriority(MAX_PRIORITY)
- .add(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- log.info("Eapol filter for {} on {} {} with meter {}.",
- devId, portNumber, (install) ? INSTALLED : REMOVED, mId);
- if (filterFuture != null) {
- filterFuture.complete(null);
- }
- pendingAddEapol.remove(cp);
- }
-
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- log.info("Eapol filter for {} on {} with meter {} failed {} because {}",
- devId, portNumber, mId, (install) ? INSTALLATION : REMOVAL,
- error);
- if (filterFuture != null) {
- filterFuture.complete(error);
- }
- pendingAddEapol.remove(cp);
- }
- });
-
- flowObjectiveService.filter(devId, eapol);
- } else {
- log.warn("Meter installation error while sending eapol trap flow. " +
- "Result {} and MeterId {}", result, mId);
+ //for each pending eapol flow we check if the meter is there.
+ Iterator<SubscriberFlowInfo> eapIterator = pendingEapolForMeters.iterator();
+ while (eapIterator.hasNext()) {
+ SubscriberFlowInfo fi = eapIterator.next();
+ if (result == null) {
+ MeterId mId = oltMeterService
+ .getMeterIdFromBpMapping(dm.getDevId(), fi.getUpBpInfo());
+ if (mId != null) {
+ handleEapol(filterFuture, install, cp, builder, treatmentBuilder, fi, mId);
+ eapIterator.remove();
+ }
+ } else {
+ log.warn("Meter installation error while sending eapol trap flow. " +
+ "Result {} and MeterId {}", result, meterId);
+ eapIterator.remove();
+ }
+ oltMeterService.removeFromPendingMeters(new DeviceBandwidthProfile(
+ dm.getDevId(), dm.getBwInfo()));
}
});
}
+ private void handleEapol(CompletableFuture<ObjectiveError> filterFuture,
+ boolean install, ConnectPoint cp,
+ DefaultFilteringObjective.Builder builder,
+ TrafficTreatment.Builder treatmentBuilder,
+ SubscriberFlowInfo fi, MeterId mId) {
+ log.info("Meter {} for {} on {}/{} exists. {} EAPOL trap flow",
+ mId, fi.getUpBpInfo(), fi.getDevId(), fi.getUniPort(),
+ (install) ? "Installing" : "Removing");
+ int techProfileId = getDefaultTechProfileId(fi.getDevId(), fi.getUniPort());
+ // can happen in case of removal
+ if (mId != null) {
+ treatmentBuilder.meter(mId);
+ }
+ //Authentication trap flow uses only tech profile id as write metadata value
+ FilteringObjective eapol = (install ? builder.permit() : builder.deny())
+ .withKey(Criteria.matchInPort(fi.getUniPort()))
+ .addCondition(Criteria.matchEthType(EthType.EtherType.EAPOL.ethType()))
+ .addCondition(Criteria.matchVlanId(fi.getTagInfo().getPonCTag()))
+ .withMeta(treatmentBuilder
+ .writeMetadata(createTechProfValueForWm(
+ fi.getTagInfo().getPonCTag(),
+ techProfileId), 0)
+ .setOutput(PortNumber.CONTROLLER).build())
+ .fromApp(appId)
+ .withPriority(MAX_PRIORITY)
+ .add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.info("Eapol filter for {} on {} {} with meter {}.",
+ fi.getDevId(), fi.getUniPort(),
+ (install) ? INSTALLED : REMOVED, mId);
+ if (filterFuture != null) {
+ filterFuture.complete(null);
+ }
+ pendingAddEapol.remove(cp);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.error("Eapol filter for {} on {} with meter {} " +
+ "failed {} because {}",
+ fi.getDevId(), fi.getUniPort(), mId,
+ (install) ? INSTALLATION : REMOVAL,
+ error);
+ if (filterFuture != null) {
+ filterFuture.complete(error);
+ }
+ pendingAddEapol.remove(cp);
+ }
+ });
+ flowObjectiveService.filter(fi.getDevId(), eapol);
+ }
+
/**
* Installs trap filtering objectives for particular traffic types on an
* NNI port.