blob: 4b883441f4e5c46fc11e429e9df031af47215cbf [file] [log] [blame]
/*
* Copyright 2016-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opencord.olt.impl;
import static com.google.common.base.Strings.isNullOrEmpty;
import static java.util.stream.Collectors.collectingAndThen;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.mapping;
import static java.util.stream.Collectors.toSet;
import static org.onlab.util.Tools.get;
import static org.onlab.util.Tools.groupedThreads;
import static org.opencord.olt.impl.OsgiPropertyConstants.*;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Dictionary;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.onlab.util.KryoNamespace;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
import org.onosproject.cluster.NodeId;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceService;
import org.onosproject.net.flowobjective.ObjectiveError;
import org.onosproject.net.meter.Band;
import org.onosproject.net.meter.DefaultBand;
import org.onosproject.net.meter.DefaultMeterRequest;
import org.onosproject.net.meter.Meter;
import org.onosproject.net.meter.MeterContext;
import org.onosproject.net.meter.MeterEvent;
import org.onosproject.net.meter.MeterFailReason;
import org.onosproject.net.meter.MeterId;
import org.onosproject.net.meter.MeterKey;
import org.onosproject.net.meter.MeterListener;
import org.onosproject.net.meter.MeterRequest;
import org.onosproject.net.meter.MeterService;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.ConsistentMultimap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.opencord.olt.internalapi.AccessDeviceMeterService;
import org.opencord.sadis.BandwidthProfileInformation;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
/**
* Provisions Meters on access devices.
*/
@Component(immediate = true, property = {
DELETE_METERS + ":Boolean=" + DELETE_METERS_DEFAULT,
ZERO_REFERENCE_METER_COUNT + ":Integer=" + ZERO_REFERENCE_METER_COUNT_DEFAULT,
})
public class OltMeterService implements AccessDeviceMeterService {
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected MeterService meterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ComponentConfigService componentConfigService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected StorageService storageService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceService deviceService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ClusterService clusterService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected MastershipService mastershipService;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected LeadershipService leadershipService;
/**
* Delete meters when reference count drops to zero.
*/
protected boolean deleteMeters = DELETE_METERS_DEFAULT;
/**
* Number of Zero References received before deleting the meter.
*/
protected int zeroReferenceMeterCount = ZERO_REFERENCE_METER_COUNT_DEFAULT;
private ApplicationId appId;
private static final String APP_NAME = "org.opencord.olt";
private final MeterListener meterListener = new InternalMeterListener();
private final Logger log = getLogger(getClass());
protected ExecutorService eventExecutor;
protected Map<DeviceId, Set<BandwidthProfileInformation>> pendingMeters;
protected Map<DeviceId, Map<MeterKey, AtomicInteger>> pendingRemoveMeters;
protected ConsistentMultimap<String, MeterKey> bpInfoToMeter;
@Activate
public void activate(ComponentContext context) {
eventExecutor = Executors.newFixedThreadPool(5, groupedThreads("onos/olt",
"events-%d", log));
appId = coreService.registerApplication(APP_NAME);
modified(context);
KryoNamespace serializer = KryoNamespace.newBuilder()
.register(KryoNamespaces.API)
.register(MeterKey.class)
.register(BandwidthProfileInformation.class)
.build();
bpInfoToMeter = storageService.<String, MeterKey>consistentMultimapBuilder()
.withName("volt-bp-info-to-meter")
.withSerializer(Serializer.using(serializer))
.withApplicationId(appId)
.build();
meterService.addListener(meterListener);
componentConfigService.registerProperties(getClass());
pendingMeters = storageService.<DeviceId, Set<BandwidthProfileInformation>>consistentMapBuilder()
.withName("volt-pending-meters")
.withSerializer(Serializer.using(serializer))
.withApplicationId(appId)
.build().asJavaMap();
pendingRemoveMeters = storageService.<DeviceId, Map<MeterKey, AtomicInteger>>consistentMapBuilder()
.withName("volt-pending-remove-meters")
.withSerializer(Serializer.using(serializer))
.withApplicationId(appId)
.build().asJavaMap();
log.info("Olt Meter service started");
}
@Deactivate
public void deactivate() {
meterService.removeListener(meterListener);
}
@Modified
public void modified(ComponentContext context) {
Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
Boolean d = Tools.isPropertyEnabled(properties, "deleteMeters");
if (d != null) {
deleteMeters = d;
}
String zeroReferenceMeterCountNew = get(properties, ZERO_REFERENCE_METER_COUNT);
zeroReferenceMeterCount = isNullOrEmpty(zeroReferenceMeterCountNew) ? ZERO_REFERENCE_METER_COUNT_DEFAULT :
Integer.parseInt(zeroReferenceMeterCountNew.trim());
}
@Override
public ImmutableMap<String, Collection<MeterKey>> getBpMeterMappings() {
return bpInfoToMeter.stream()
.collect(collectingAndThen(
groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
ImmutableMap::copyOf));
}
boolean addMeterIdToBpMapping(DeviceId deviceId, MeterId meterId, String bandwidthProfile) {
log.debug("adding bp {} to meter {} mapping for device {}",
bandwidthProfile, meterId, deviceId);
return bpInfoToMeter.put(bandwidthProfile, MeterKey.key(deviceId, meterId));
}
@Override
public MeterId getMeterIdFromBpMapping(DeviceId deviceId, String bandwidthProfile) {
if (bandwidthProfile == null) {
log.warn("Bandwidth Profile requested is null");
return null;
}
if (bpInfoToMeter.get(bandwidthProfile) == null) {
log.warn("Bandwidth Profile '{}' is not present in the map",
bandwidthProfile);
return null;
}
if (bpInfoToMeter.get(bandwidthProfile).value().isEmpty()) {
log.warn("Bandwidth Profile '{}' is not currently mapped to a meter",
bandwidthProfile);
return null;
}
Optional<? extends MeterKey> meterKeyForDevice = bpInfoToMeter.get(bandwidthProfile).value()
.stream()
.filter(meterKey -> meterKey.deviceId().equals(deviceId))
.findFirst();
if (meterKeyForDevice.isPresent()) {
log.debug("Found meter {} for bandwidth profile {} on {}",
meterKeyForDevice.get().meterId(), bandwidthProfile, deviceId);
return meterKeyForDevice.get().meterId();
} else {
log.warn("Bandwidth Profile '{}' is not currently mapped to a meter on {} , {}",
bandwidthProfile, deviceId, bpInfoToMeter.get(bandwidthProfile).value());
return null;
}
}
@Override
public ImmutableSet<MeterKey> getProgMeters() {
return bpInfoToMeter.stream()
.map(Map.Entry::getValue)
.collect(ImmutableSet.toImmutableSet());
}
@Override
public MeterId createMeter(DeviceId deviceId, BandwidthProfileInformation bpInfo,
CompletableFuture<Object> meterFuture) {
log.debug("Creating meter on {} for {}", deviceId, bpInfo);
if (bpInfo == null) {
log.warn("Requested bandwidth profile on {} information is NULL", deviceId);
meterFuture.complete(ObjectiveError.BADPARAMS);
return null;
}
MeterId meterId = getMeterIdFromBpMapping(deviceId, bpInfo.id());
if (meterId != null) {
log.debug("Meter {} was previously created for bp {}", meterId, bpInfo.id());
meterFuture.complete(null);
return meterId;
}
List<Band> meterBands = createMeterBands(bpInfo);
final AtomicReference<MeterId> meterIdRef = new AtomicReference<>();
MeterRequest meterRequest = DefaultMeterRequest.builder()
.withBands(meterBands)
.withUnit(Meter.Unit.KB_PER_SEC)
.withContext(new MeterContext() {
@Override
public void onSuccess(MeterRequest op) {
log.debug("Meter {} for {} is installed on the device {}",
meterIdRef.get(), bpInfo.id(), deviceId);
boolean added = addMeterIdToBpMapping(deviceId, meterIdRef.get(), bpInfo.id());
if (added) {
meterFuture.complete(null);
} else {
log.error("Failed to add Meter {} for {} on {} to the meter-bandwidth mapping",
meterIdRef.get(), bpInfo.id(), deviceId);
meterFuture.complete(ObjectiveError.UNKNOWN);
}
}
@Override
public void onError(MeterRequest op, MeterFailReason reason) {
log.error("Failed installing meter {} on {} for {}",
meterIdRef.get(), deviceId, bpInfo.id());
bpInfoToMeter.remove(bpInfo.id(),
MeterKey.key(deviceId, meterIdRef.get()));
meterFuture.complete(reason);
}
})
.forDevice(deviceId)
.fromApp(appId)
.burst()
.add();
Meter meter = meterService.submit(meterRequest);
meterIdRef.set(meter.id());
log.info("Meter {} created and sent for installation on {} for {}",
meter.id(), deviceId, bpInfo);
return meter.id();
}
@Override
public void removeFromPendingMeters(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
if (deviceId == null) {
return;
}
pendingMeters.computeIfPresent(deviceId, (id, bwps) -> {
bwps.remove(bwpInfo);
return bwps;
});
}
@Override
public synchronized boolean checkAndAddPendingMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
if (bwpInfo == null) {
log.debug("Bandwidth profile is null for device: {}", deviceId);
return false;
}
if (pendingMeters.containsKey(deviceId)
&& pendingMeters.get(deviceId).contains(bwpInfo)) {
log.debug("Meter is already pending on {} with bp {}",
deviceId, bwpInfo);
return false;
}
log.debug("Adding bandwidth profile {} to pending on {}",
bwpInfo, deviceId);
pendingMeters.compute(deviceId, (id, bwps) -> {
if (bwps == null) {
bwps = new HashSet<>();
}
bwps.add(bwpInfo);
return bwps;
});
return true;
}
@Override
public void clearMeters(DeviceId deviceId) {
log.debug("Removing all meters for device {}", deviceId);
clearDeviceState(deviceId);
meterService.purgeMeters(deviceId);
}
@Override
public void clearDeviceState(DeviceId deviceId) {
log.info("Clearing local device state for {}", deviceId);
pendingRemoveMeters.remove(deviceId);
removeMetersFromBpMapping(deviceId);
//Following call handles cornercase of OLT delete during meter provisioning
pendingMeters.remove(deviceId);
}
private List<Band> createMeterBands(BandwidthProfileInformation bpInfo) {
List<Band> meterBands = new ArrayList<>();
// add cir
if (bpInfo.committedInformationRate() != 0) {
meterBands.add(createMeterBand(bpInfo.committedInformationRate(), bpInfo.committedBurstSize()));
}
// check if both air and gir are set together in sadis
// if they are, set air to 0
if (bpInfo.assuredInformationRate() != 0 && bpInfo.guaranteedInformationRate() != 0) {
bpInfo.setAssuredInformationRate(0);
}
// add pir
long pir = bpInfo.peakInformationRate() != 0 ? bpInfo.peakInformationRate() : (bpInfo.exceededInformationRate()
+ bpInfo.committedInformationRate() + bpInfo.guaranteedInformationRate()
+ bpInfo.assuredInformationRate());
Long pbs = bpInfo.peakBurstSize() != null ? bpInfo.peakBurstSize() :
(bpInfo.exceededBurstSize() != null ? bpInfo.exceededBurstSize() : 0) +
(bpInfo.committedBurstSize() != null ? bpInfo.committedBurstSize() : 0);
meterBands.add(createMeterBand(pir, pbs));
// add gir
if (bpInfo.guaranteedInformationRate() != 0) {
meterBands.add(createMeterBand(bpInfo.guaranteedInformationRate(), 0L));
}
// add air
// air is used in place of gir only if gir is
// not present and air is not 0, see line 330.
// Included for backwards compatibility, will be removed in VOLTHA 2.9.
if (bpInfo.assuredInformationRate() != 0) {
meterBands.add(createMeterBand(bpInfo.assuredInformationRate(), 0L));
}
return meterBands;
}
private Band createMeterBand(long rate, Long burst) {
return DefaultBand.builder()
.withRate(rate) //already Kbps
.burstSize(burst) // already Kbits
.ofType(Band.Type.DROP) // no matter
.build();
}
private void removeMeterFromBpMapping(MeterKey meterKey) {
List<Map.Entry<String, MeterKey>> meters = bpInfoToMeter.stream()
.filter(e -> e.getValue().equals(meterKey))
.collect(Collectors.toList());
meters.forEach(e -> bpInfoToMeter.remove(e.getKey(), e.getValue()));
}
private void removeMetersFromBpMapping(DeviceId deviceId) {
List<Map.Entry<String, MeterKey>> meters = bpInfoToMeter.stream()
.filter(e -> e.getValue().deviceId().equals(deviceId))
.collect(Collectors.toList());
meters.forEach(e -> bpInfoToMeter.remove(e.getKey(), e.getValue()));
}
/**
* Checks for mastership or falls back to leadership on deviceId.
* If the device is available use mastership,
* otherwise fallback on leadership.
* Leadership on the device topic is needed because the master can be NONE
* in case the device went away, we still need to handle events
* consistently
*/
private boolean isLocalLeader(DeviceId deviceId) {
if (deviceService.isAvailable(deviceId)) {
return mastershipService.isLocalMaster(deviceId);
} else {
// Fallback with Leadership service - device id is used as topic
NodeId leader = leadershipService.runForLeadership(
deviceId.toString()).leaderNodeId();
// Verify if this node is the leader
return clusterService.getLocalNode().id().equals(leader);
}
}
private class InternalMeterListener implements MeterListener {
@Override
public void event(MeterEvent meterEvent) {
eventExecutor.execute(() -> {
Meter meter = meterEvent.subject();
if (meter == null) {
log.error("Meter in event {} is null", meterEvent);
return;
}
if (isLocalLeader(meter.deviceId())) {
MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
if (deleteMeters &&
MeterEvent.Type.METER_REFERENCE_COUNT_ZERO.equals(meterEvent.type())) {
log.info("Zero Count Meter Event is received. Meter is {} on {}",
meter.id(), meter.deviceId());
incrementMeterCount(meter.deviceId(), key);
if (appId.equals(meter.appId()) && pendingRemoveMeters.get(meter.deviceId())
.get(key).get() == zeroReferenceMeterCount) {
log.info("Deleting unreferenced, no longer programmed Meter {} on {}",
meter.id(), meter.deviceId());
deleteMeter(meter.deviceId(), meter.id());
}
}
if (MeterEvent.Type.METER_REMOVED.equals(meterEvent.type())) {
log.info("Meter Removed Event is received for {} on {}",
meter.id(), meter.deviceId());
pendingRemoveMeters.computeIfPresent(meter.deviceId(),
(id, meters) -> {
if (meters.get(key) == null) {
log.info("Meters is not pending " +
"{} on {}", key, id);
return meters;
}
meters.remove(key);
return meters;
});
removeMeterFromBpMapping(key);
}
} else {
log.trace("Ignoring meter event, not leader of {}, {}", meter.deviceId(), meterEvent);
}
});
}
private void incrementMeterCount(DeviceId deviceId, MeterKey key) {
if (key == null) {
return;
}
pendingRemoveMeters.compute(deviceId,
(id, meters) -> {
if (meters == null) {
meters = new HashMap<>();
}
if (meters.get(key) == null) {
meters.put(key, new AtomicInteger(1));
}
meters.get(key).addAndGet(1);
return meters;
});
}
private void deleteMeter(DeviceId deviceId, MeterId meterId) {
Meter meter = meterService.getMeter(deviceId, meterId);
if (meter != null) {
MeterRequest meterRequest = DefaultMeterRequest.builder()
.withBands(meter.bands())
.withUnit(meter.unit())
.forDevice(deviceId)
.fromApp(appId)
.burst()
.remove();
meterService.withdraw(meterRequest, meterId);
}
}
}
}