[JIRA-3148] install only one meter per bandwidth profile on eapol flows
Change-Id: Ie2f1922cf3854b47d9a2d34a44410ef883b3fe2e
diff --git a/app/src/main/java/org/opencord/olt/impl/Olt.java b/app/src/main/java/org/opencord/olt/impl/Olt.java
index d5e8e32..fe2675e 100644
--- a/app/src/main/java/org/opencord/olt/impl/Olt.java
+++ b/app/src/main/java/org/opencord/olt/impl/Olt.java
@@ -41,7 +41,6 @@
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -84,6 +83,7 @@
import org.opencord.olt.AccessSubscriberId;
import org.opencord.olt.internalapi.AccessDeviceFlowService;
import org.opencord.olt.internalapi.AccessDeviceMeterService;
+import org.opencord.olt.internalapi.DeviceBandwidthProfile;
import org.opencord.sadis.BandwidthProfileInformation;
import org.opencord.sadis.BaseInformationService;
import org.opencord.sadis.SadisService;
@@ -185,7 +185,6 @@
private ConsistentMultimap<ConnectPoint, UniTagInformation> programmedSubs;
- private Set<DeviceBandwidthProfile> pendingMeters;
private Set<SubscriberFlowInfo> pendingSubscribers;
@Activate
@@ -208,7 +207,6 @@
.withApplicationId(appId)
.build();
- pendingMeters = ConcurrentHashMap.newKeySet();
pendingSubscribers = Sets.newConcurrentHashSet();
eventDispatcher.addSink(AccessDeviceEvent.class, listenerRegistry);
@@ -267,7 +265,7 @@
@Override
public boolean provisionSubscriber(ConnectPoint connectPoint) {
- log.info("Call to provisioning subscriber at {}", connectPoint);
+ log.info("Call to provision subscriber at {}", connectPoint);
DeviceId deviceId = connectPoint.deviceId();
PortNumber subscriberPortNo = connectPoint.port();
@@ -297,6 +295,9 @@
filterFuture.thenAcceptAsync(filterStatus -> {
if (filterStatus == null) {
provisionUniTagList(connectPoint, uplinkPort.number(), sub);
+ } else {
+ log.error("The filtering future did not complete properly {} " +
+ "subscriber on {} is not provisioned", filterStatus, connectPoint);
}
});
return true;
@@ -703,16 +704,18 @@
}
}
private void checkAndCreateDevMeter(DeviceBandwidthProfile dm) {
- if (pendingMeters.contains(dm)) {
+ if (oltMeterService.isMeterPending(dm)) {
+ log.debug("Meter is already pending {}", dm);
return;
}
- pendingMeters.add(dm);
+ oltMeterService.addToPendingMeters(dm);
createMeter(dm);
}
private void createMeter(DeviceBandwidthProfile dm) {
- log.debug("Creating Meter {} from queue", dm);
+ log.debug("Creating Meter {} from queue for subscriber", dm);
CompletableFuture<Object> meterFuture = new CompletableFuture<>();
+
MeterId meterId = oltMeterService.createMeter(dm.getDevId(), dm.getBwInfo(),
meterFuture);
@@ -743,13 +746,13 @@
handleSubFlowsWithMeters(fi);
subsIterator.remove();
}
- pendingMeters.remove(new DeviceBandwidthProfile(dm.getDevId(), dm.getBwInfo()));
+ oltMeterService.removeFromPendingMeters(new DeviceBandwidthProfile(dm.getDevId(), dm.getBwInfo()));
} else {
// meter install failed
log.error("Addition of subscriber {} failed due to meter " +
"{} with result {}", fi, meterId, result);
subsIterator.remove();
- pendingMeters.remove(new DeviceBandwidthProfile(dm.getDevId(), dm.getBwInfo()));
+ oltMeterService.removeFromPendingMeters(new DeviceBandwidthProfile(dm.getDevId(), dm.getBwInfo()));
}
}
});
diff --git a/app/src/main/java/org/opencord/olt/impl/OltFlowService.java b/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
index 83032c0..b4dca74 100644
--- a/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
+++ b/app/src/main/java/org/opencord/olt/impl/OltFlowService.java
@@ -49,6 +49,7 @@
import org.onosproject.net.meter.MeterId;
import org.opencord.olt.internalapi.AccessDeviceFlowService;
import org.opencord.olt.internalapi.AccessDeviceMeterService;
+import org.opencord.olt.internalapi.DeviceBandwidthProfile;
import org.opencord.sadis.BandwidthProfileInformation;
import org.opencord.sadis.BaseInformationService;
import org.opencord.sadis.SadisService;
@@ -64,6 +65,7 @@
import org.slf4j.Logger;
import java.util.Dictionary;
+import java.util.Iterator;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -155,6 +157,7 @@
protected BaseInformationService<BandwidthProfileInformation> bpService;
protected BaseInformationService<SubscriberAndDeviceInformation> subsService;
private Set<ConnectPoint> pendingAddEapol = Sets.newConcurrentHashSet();
+ private Set<SubscriberFlowInfo> pendingEapolForMeters = Sets.newConcurrentHashSet();;
@Activate
public void activate(ComponentContext context) {
@@ -419,11 +422,26 @@
CompletableFuture<Object> meterFuture = new CompletableFuture<>();
// check if meter exists and create it only for an install
- MeterId meterId = oltMeterService.getMeterIdFromBpMapping(devId, bpInfo.id());
+ final MeterId meterId = oltMeterService.getMeterIdFromBpMapping(devId, bpInfo.id());
+ DeviceBandwidthProfile dm = new DeviceBandwidthProfile(devId, bpInfo);
if (meterId == null) {
if (install) {
- meterId = oltMeterService.createMeter(devId, bpInfo, meterFuture);
- treatmentBuilder.meter(meterId);
+ log.debug("Need to install meter for EAPOL with bwp {}", bpInfo.id());
+ SubscriberFlowInfo fi = new SubscriberFlowInfo(devId, null, cp.port(),
+ new UniTagInformation.Builder()
+ .setPonCTag(vlanId).build(),
+ null, null,
+ null, bpInfo.id());
+ pendingEapolForMeters.add(fi);
+
+ if (oltMeterService.isMeterPending(dm)) {
+ log.debug("Meter {} is already pending for EAPOL", dm);
+ return;
+ }
+ oltMeterService.addToPendingMeters(dm);
+ MeterId innerMeterId = oltMeterService.createMeter(devId, bpInfo,
+ meterFuture);
+ fi.setUpMeterId(innerMeterId);
} else {
// this case should not happen as the request to remove an eapol
// flow should mean that the flow points to a meter that exists.
@@ -431,63 +449,100 @@
// correct 'match' to do so.
log.warn("Unknown meter id for bp {}, still proceeding with "
+ "delete of eapol flow for {}/{}", bpInfo.id(), devId, portNumber);
- meterFuture.complete(null);
+ SubscriberFlowInfo fi = new SubscriberFlowInfo(devId, null, cp.port(),
+ new UniTagInformation.Builder()
+ .setPonCTag(vlanId).build(),
+ null, meterId,
+ null, bpInfo.id());
+ handleEapol(filterFuture, install, cp, builder, treatmentBuilder, fi, meterId);
}
} else {
log.debug("Meter {} was previously created for bp {}", meterId, bpInfo.id());
- treatmentBuilder.meter(meterId);
- meterFuture.complete(null);
+ SubscriberFlowInfo fi = new SubscriberFlowInfo(devId, null, cp.port(),
+ new UniTagInformation.Builder()
+ .setPonCTag(vlanId).build(),
+ null, meterId,
+ null, bpInfo.id());
+ handleEapol(filterFuture, install, cp, builder, treatmentBuilder, fi, meterId);
+ //No need for the future, meter is present.
+ return;
}
-
- final MeterId mId = meterId;
meterFuture.thenAcceptAsync(result -> {
- if (result == null) {
- log.info("Meter {} for {} on {}/{} exists. {} EAPOL trap flow",
- mId, bpId, devId, portNumber,
- (install) ? "Installing" : "Removing");
- int techProfileId = getDefaultTechProfileId(devId, portNumber);
-
- //Authentication trap flow uses only tech profile id as write metadata value
- FilteringObjective eapol = (install ? builder.permit() : builder.deny())
- .withKey(Criteria.matchInPort(portNumber))
- .addCondition(Criteria.matchEthType(EthType.EtherType.EAPOL.ethType()))
- .addCondition(Criteria.matchVlanId(vlanId))
- .withMeta(treatmentBuilder
- .writeMetadata(createTechProfValueForWm(vlanId, techProfileId), 0)
- .setOutput(PortNumber.CONTROLLER).build())
- .fromApp(appId)
- .withPriority(MAX_PRIORITY)
- .add(new ObjectiveContext() {
- @Override
- public void onSuccess(Objective objective) {
- log.info("Eapol filter for {} on {} {} with meter {}.",
- devId, portNumber, (install) ? INSTALLED : REMOVED, mId);
- if (filterFuture != null) {
- filterFuture.complete(null);
- }
- pendingAddEapol.remove(cp);
- }
-
- @Override
- public void onError(Objective objective, ObjectiveError error) {
- log.info("Eapol filter for {} on {} with meter {} failed {} because {}",
- devId, portNumber, mId, (install) ? INSTALLATION : REMOVAL,
- error);
- if (filterFuture != null) {
- filterFuture.complete(error);
- }
- pendingAddEapol.remove(cp);
- }
- });
-
- flowObjectiveService.filter(devId, eapol);
- } else {
- log.warn("Meter installation error while sending eapol trap flow. " +
- "Result {} and MeterId {}", result, mId);
+ //for each pending eapol flow we check if the meter is there.
+ Iterator<SubscriberFlowInfo> eapIterator = pendingEapolForMeters.iterator();
+ while (eapIterator.hasNext()) {
+ SubscriberFlowInfo fi = eapIterator.next();
+ if (result == null) {
+ MeterId mId = oltMeterService
+ .getMeterIdFromBpMapping(dm.getDevId(), fi.getUpBpInfo());
+ if (mId != null) {
+ handleEapol(filterFuture, install, cp, builder, treatmentBuilder, fi, mId);
+ eapIterator.remove();
+ }
+ } else {
+ log.warn("Meter installation error while sending eapol trap flow. " +
+ "Result {} and MeterId {}", result, meterId);
+ eapIterator.remove();
+ }
+ oltMeterService.removeFromPendingMeters(new DeviceBandwidthProfile(
+ dm.getDevId(), dm.getBwInfo()));
}
});
}
+ private void handleEapol(CompletableFuture<ObjectiveError> filterFuture,
+ boolean install, ConnectPoint cp,
+ DefaultFilteringObjective.Builder builder,
+ TrafficTreatment.Builder treatmentBuilder,
+ SubscriberFlowInfo fi, MeterId mId) {
+ log.info("Meter {} for {} on {}/{} exists. {} EAPOL trap flow",
+ mId, fi.getUpBpInfo(), fi.getDevId(), fi.getUniPort(),
+ (install) ? "Installing" : "Removing");
+ int techProfileId = getDefaultTechProfileId(fi.getDevId(), fi.getUniPort());
+ // can happen in case of removal
+ if (mId != null) {
+ treatmentBuilder.meter(mId);
+ }
+ //Authentication trap flow uses only tech profile id as write metadata value
+ FilteringObjective eapol = (install ? builder.permit() : builder.deny())
+ .withKey(Criteria.matchInPort(fi.getUniPort()))
+ .addCondition(Criteria.matchEthType(EthType.EtherType.EAPOL.ethType()))
+ .addCondition(Criteria.matchVlanId(fi.getTagInfo().getPonCTag()))
+ .withMeta(treatmentBuilder
+ .writeMetadata(createTechProfValueForWm(
+ fi.getTagInfo().getPonCTag(),
+ techProfileId), 0)
+ .setOutput(PortNumber.CONTROLLER).build())
+ .fromApp(appId)
+ .withPriority(MAX_PRIORITY)
+ .add(new ObjectiveContext() {
+ @Override
+ public void onSuccess(Objective objective) {
+ log.info("Eapol filter for {} on {} {} with meter {}.",
+ fi.getDevId(), fi.getUniPort(),
+ (install) ? INSTALLED : REMOVED, mId);
+ if (filterFuture != null) {
+ filterFuture.complete(null);
+ }
+ pendingAddEapol.remove(cp);
+ }
+
+ @Override
+ public void onError(Objective objective, ObjectiveError error) {
+ log.error("Eapol filter for {} on {} with meter {} " +
+ "failed {} because {}",
+ fi.getDevId(), fi.getUniPort(), mId,
+ (install) ? INSTALLATION : REMOVAL,
+ error);
+ if (filterFuture != null) {
+ filterFuture.complete(error);
+ }
+ pendingAddEapol.remove(cp);
+ }
+ });
+ flowObjectiveService.filter(fi.getDevId(), eapol);
+ }
+
/**
* Installs trap filtering objectives for particular traffic types on an
* NNI port.
diff --git a/app/src/main/java/org/opencord/olt/impl/OltMeterService.java b/app/src/main/java/org/opencord/olt/impl/OltMeterService.java
index be15854..65a72f0 100644
--- a/app/src/main/java/org/opencord/olt/impl/OltMeterService.java
+++ b/app/src/main/java/org/opencord/olt/impl/OltMeterService.java
@@ -43,6 +43,7 @@
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.opencord.olt.internalapi.AccessDeviceMeterService;
+import org.opencord.olt.internalapi.DeviceBandwidthProfile;
import org.opencord.sadis.BandwidthProfileInformation;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
@@ -60,7 +61,9 @@
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
@@ -109,6 +112,8 @@
protected ExecutorService eventExecutor;
+ private Set<DeviceBandwidthProfile> pendingMeters;
+
@Activate
public void activate(ComponentContext context) {
eventExecutor = Executors.newFixedThreadPool(5, groupedThreads("onos/olt",
@@ -129,6 +134,7 @@
meterService.addListener(meterListener);
componentConfigService.registerProperties(getClass());
+ pendingMeters = ConcurrentHashMap.newKeySet();
log.info("Olt Meter service started");
}
@@ -195,6 +201,7 @@
@Override
public MeterId createMeter(DeviceId deviceId, BandwidthProfileInformation bpInfo,
CompletableFuture<Object> meterFuture) {
+ log.debug("Installing meter on {} for {}", deviceId, bpInfo);
if (bpInfo == null) {
log.warn("Requested bandwidth profile information is NULL");
meterFuture.complete(ObjectiveError.BADPARAMS);
@@ -218,7 +225,7 @@
@Override
public void onSuccess(MeterRequest op) {
log.debug("Meter {} is installed on the device {}",
- meterId, deviceId);
+ meterIdRef.get(), deviceId);
addMeterIdToBpMapping(deviceId, meterIdRef.get(), bpInfo.id());
meterFuture.complete(null);
}
@@ -244,6 +251,21 @@
}
@Override
+ public void addToPendingMeters(DeviceBandwidthProfile deviceBandwidthProfile) {
+ pendingMeters.add(deviceBandwidthProfile);
+ }
+
+ @Override
+ public void removeFromPendingMeters(DeviceBandwidthProfile deviceBandwidthProfile) {
+ pendingMeters.remove(deviceBandwidthProfile);
+ }
+
+ @Override
+ public boolean isMeterPending(DeviceBandwidthProfile deviceBandwidthProfile) {
+ return pendingMeters.contains(deviceBandwidthProfile);
+ }
+
+ @Override
public void clearMeters(DeviceId deviceId) {
List<Map.Entry<String, MeterKey>> meters = bpInfoToMeter.stream()
.filter(e -> e.getValue().deviceId().equals(deviceId))
diff --git a/app/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java b/app/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java
index 48570cd..bad2e5b 100644
--- a/app/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java
+++ b/app/src/main/java/org/opencord/olt/internalapi/AccessDeviceMeterService.java
@@ -68,6 +68,30 @@
CompletableFuture<Object> meterFuture);
/**
+ * Adds the DeviceBandwidthProfile to the pendingMeters.
+ *
+ * @param deviceBandwidthProfile the device to bandwidth profile mapping
+ */
+ void addToPendingMeters(DeviceBandwidthProfile deviceBandwidthProfile);
+
+ /**
+ * Removes the DeviceBandwidthProfile from the pendingMeters.
+ *
+ * @param deviceBandwidthProfile the device to bandwidth profile mapping
+ *
+ */
+ void removeFromPendingMeters(DeviceBandwidthProfile deviceBandwidthProfile);
+
+ /**
+ * Checks if DeviceBandwidthProfile is pending.
+ *
+ * @param deviceBandwidthProfile the device to bandwidth profile mapping
+ *
+ * @return true if pending.
+ */
+ boolean isMeterPending(DeviceBandwidthProfile deviceBandwidthProfile);
+
+ /**
* Clears out bandwidth profile to meter mappings for the given device.
*
* @param deviceId device ID
diff --git a/app/src/main/java/org/opencord/olt/impl/DeviceBandwidthProfile.java b/app/src/main/java/org/opencord/olt/internalapi/DeviceBandwidthProfile.java
similarity index 88%
rename from app/src/main/java/org/opencord/olt/impl/DeviceBandwidthProfile.java
rename to app/src/main/java/org/opencord/olt/internalapi/DeviceBandwidthProfile.java
index 49a4a34..8f676ec 100644
--- a/app/src/main/java/org/opencord/olt/impl/DeviceBandwidthProfile.java
+++ b/app/src/main/java/org/opencord/olt/internalapi/DeviceBandwidthProfile.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.opencord.olt.impl;
+package org.opencord.olt.internalapi;
import org.onosproject.net.DeviceId;
import org.opencord.sadis.BandwidthProfileInformation;
@@ -23,7 +23,7 @@
/**
* Class containing a mapping of DeviceId to BandwidthProfileInformation.
*/
-class DeviceBandwidthProfile {
+public class DeviceBandwidthProfile {
private final DeviceId devId;
private BandwidthProfileInformation bwInfo;
@@ -33,7 +33,7 @@
* @param devId the device id
* @param bwInfo the bandwidth profile information
*/
- DeviceBandwidthProfile(DeviceId devId, BandwidthProfileInformation bwInfo) {
+ public DeviceBandwidthProfile(DeviceId devId, BandwidthProfileInformation bwInfo) {
this.devId = devId;
this.bwInfo = bwInfo;
}
@@ -43,7 +43,7 @@
*
* @return device id.
*/
- DeviceId getDevId() {
+ public DeviceId getDevId() {
return devId;
}
@@ -52,7 +52,7 @@
*
* @return bandwidth profile information
*/
- BandwidthProfileInformation getBwInfo() {
+ public BandwidthProfileInformation getBwInfo() {
return bwInfo;
}
diff --git a/app/src/test/java/org/opencord/olt/impl/OltFlowTest.java b/app/src/test/java/org/opencord/olt/impl/OltFlowTest.java
index e0bd109..f259e6f 100644
--- a/app/src/test/java/org/opencord/olt/impl/OltFlowTest.java
+++ b/app/src/test/java/org/opencord/olt/impl/OltFlowTest.java
@@ -48,6 +48,7 @@
import org.onosproject.net.flowobjective.Objective;
import org.onosproject.net.meter.MeterId;
import org.onosproject.net.meter.MeterKey;
+import org.opencord.olt.internalapi.DeviceBandwidthProfile;
import org.opencord.sadis.BandwidthProfileInformation;
import org.opencord.sadis.UniTagInformation;
@@ -205,6 +206,20 @@
}
@Override
+ public void addToPendingMeters(DeviceBandwidthProfile deviceBandwidthProfile) {
+
+ }
+
+ @Override
+ public void removeFromPendingMeters(DeviceBandwidthProfile deviceBandwidthProfile) {
+ }
+
+ @Override
+ public boolean isMeterPending(DeviceBandwidthProfile deviceBandwidthProfile) {
+ return false;
+ }
+
+ @Override
public void clearMeters(DeviceId deviceId) {
}
}