| /* |
| * Copyright 2021-present Open Networking Foundation |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.opencord.olt.impl; |
| |
| import com.google.common.collect.ImmutableMap; |
| import org.onlab.util.Identifier; |
| import org.onlab.util.KryoNamespace; |
| import org.onlab.util.Tools; |
| import org.onosproject.cfg.ComponentConfigService; |
| import org.onosproject.core.ApplicationId; |
| import org.onosproject.core.CoreService; |
| import org.onosproject.net.DeviceId; |
| import org.onosproject.net.flow.FlowEntry; |
| import org.onosproject.net.flow.FlowRule; |
| import org.onosproject.net.flow.FlowRuleService; |
| import org.onosproject.net.flow.TrafficTreatment; |
| import org.onosproject.net.flow.instructions.Instructions; |
| import org.onosproject.net.meter.Band; |
| import org.onosproject.net.meter.DefaultBand; |
| import org.onosproject.net.meter.DefaultMeterRequest; |
| import org.onosproject.net.meter.Meter; |
| import org.onosproject.net.meter.MeterContext; |
| import org.onosproject.net.meter.MeterEvent; |
| import org.onosproject.net.meter.MeterFailReason; |
| import org.onosproject.net.meter.MeterId; |
| import org.onosproject.net.meter.MeterKey; |
| import org.onosproject.net.meter.MeterListener; |
| import org.onosproject.net.meter.MeterRequest; |
| import org.onosproject.net.meter.MeterService; |
| import org.onosproject.net.meter.MeterState; |
| import org.onosproject.store.serializers.KryoNamespaces; |
| import org.onosproject.store.service.Serializer; |
| import org.onosproject.store.service.StorageService; |
| import org.opencord.olt.MeterData; |
| import org.opencord.olt.OltDeviceServiceInterface; |
| import org.opencord.olt.OltMeterServiceInterface; |
| import org.opencord.sadis.BandwidthProfileInformation; |
| import org.opencord.sadis.BaseInformationService; |
| import org.opencord.sadis.SadisService; |
| import org.opencord.sadis.SubscriberAndDeviceInformation; |
| import org.osgi.service.component.ComponentContext; |
| import org.osgi.service.component.annotations.Activate; |
| import org.osgi.service.component.annotations.Component; |
| import org.osgi.service.component.annotations.Deactivate; |
| import org.osgi.service.component.annotations.Modified; |
| import org.osgi.service.component.annotations.Reference; |
| import org.osgi.service.component.annotations.ReferenceCardinality; |
| import org.osgi.service.component.annotations.ReferencePolicy; |
| import org.slf4j.Logger; |
| |
| import java.util.ArrayList; |
| import java.util.Collection; |
| import java.util.Dictionary; |
| import java.util.HashMap; |
| import java.util.Iterator; |
| import java.util.LinkedList; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.Properties; |
| import java.util.concurrent.CompletableFuture; |
| import java.util.concurrent.ExecutorService; |
| import java.util.concurrent.Executors; |
| import java.util.concurrent.atomic.AtomicBoolean; |
| import java.util.concurrent.atomic.AtomicInteger; |
| import java.util.concurrent.locks.Lock; |
| import java.util.concurrent.locks.ReentrantReadWriteLock; |
| import java.util.stream.StreamSupport; |
| |
| import static com.google.common.base.Strings.isNullOrEmpty; |
| import static org.onlab.util.Tools.get; |
| import static org.onlab.util.Tools.groupedThreads; |
| import static org.opencord.olt.impl.OsgiPropertyConstants.*; |
| import static org.slf4j.LoggerFactory.getLogger; |
| |
| @Component(immediate = true, property = { |
| DELETE_METERS + ":Boolean=" + DELETE_METERS_DEFAULT, |
| ZERO_REFERENCE_METER_COUNT + ":Integer=" + ZERO_REFERENCE_METER_COUNT_DEFAULT, |
| }) |
| public class OltMeterService implements OltMeterServiceInterface { |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| protected CoreService coreService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| protected ComponentConfigService cfgService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| protected StorageService storageService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| protected FlowRuleService flowRuleService; |
| |
| @Reference(cardinality = ReferenceCardinality.OPTIONAL, |
| bind = "bindSadisService", |
| unbind = "unbindSadisService", |
| policy = ReferencePolicy.DYNAMIC) |
| protected volatile SadisService sadisService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| protected MeterService meterService; |
| |
| @Reference(cardinality = ReferenceCardinality.MANDATORY) |
| protected OltDeviceServiceInterface oltDeviceService; |
| |
| private final Logger log = getLogger(getClass()); |
| protected BaseInformationService<BandwidthProfileInformation> bpService; |
| private ApplicationId appId; |
| private static final String APP_NAME = "org.opencord.olt"; |
| private final ReentrantReadWriteLock programmedMeterLock = new ReentrantReadWriteLock(); |
| private final Lock programmedMeterWriteLock = programmedMeterLock.writeLock(); |
| private final Lock programmedMeterReadLock = programmedMeterLock.readLock(); |
| |
| /** |
| * Programmed Meters status map. |
| * Keeps track of which meter is programmed on which device for which BandwidthProfile. |
| * The String key is the BandwidthProfile |
| */ |
| protected Map<DeviceId, Map<String, MeterData>> programmedMeters; |
| |
| private final MeterListener meterListener = new InternalMeterListener(); |
| protected ExecutorService pendingRemovalMetersExecutor = |
| Executors.newFixedThreadPool(5, groupedThreads("onos/olt", |
| "pending-removal-meters-%d", log)); |
| |
| /** |
| * Map that contains a list of meters that needs to be removed. |
| * We wait to get 3 METER_REFERENCE_COUNT_ZERO events before removing the meter |
| * so that we're sure no flow is referencing it. |
| */ |
| protected Map<DeviceId, Map<MeterKey, AtomicInteger>> pendingRemoveMeters; |
| |
| /** |
| * Number of consecutive meter events with empty reference count |
| * after which a meter gets removed from the device. |
| */ |
| protected int zeroReferenceMeterCount = 3; |
| |
| /** |
| * Delete meters when reference count drops to zero. |
| */ |
| protected boolean deleteMeters = DELETE_METERS_DEFAULT; |
| |
| @Activate |
| public void activate(ComponentContext context) { |
| appId = coreService.registerApplication(APP_NAME); |
| modified(context); |
| KryoNamespace serializer = KryoNamespace.newBuilder() |
| .register(KryoNamespaces.API) |
| .register(List.class) |
| .register(MeterData.class) |
| .register(MeterState.class) |
| .register(MeterKey.class) |
| .build(); |
| |
| programmedMeters = storageService.<DeviceId, Map<String, MeterData>>consistentMapBuilder() |
| .withName("volt-programmed-meters") |
| .withSerializer(Serializer.using(serializer)) |
| .withApplicationId(appId) |
| .build().asJavaMap(); |
| |
| pendingRemoveMeters = storageService.<DeviceId, Map<MeterKey, AtomicInteger>>consistentMapBuilder() |
| .withName("volt-pending-remove-meters") |
| .withSerializer(Serializer.using(serializer)) |
| .withApplicationId(appId) |
| .build().asJavaMap(); |
| |
| cfgService.registerProperties(getClass()); |
| |
| meterService.addListener(meterListener); |
| |
| log.info("Started"); |
| } |
| |
| @Modified |
| public void modified(ComponentContext context) { |
| Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties(); |
| |
| Boolean d = Tools.isPropertyEnabled(properties, DELETE_METERS); |
| if (d != null) { |
| deleteMeters = d; |
| } |
| |
| String zeroCount = get(properties, ZERO_REFERENCE_METER_COUNT); |
| int oldSubscriberProcessingThreads = zeroReferenceMeterCount; |
| zeroReferenceMeterCount = isNullOrEmpty(zeroCount) ? |
| oldSubscriberProcessingThreads : Integer.parseInt(zeroCount.trim()); |
| |
| log.info("Modified. Values = deleteMeters: {}, zeroReferenceMeterCount: {}", |
| deleteMeters, zeroReferenceMeterCount); |
| } |
| |
| @Deactivate |
| public void deactivate(ComponentContext context) { |
| cfgService.unregisterProperties(getClass(), false); |
| meterService.removeListener(meterListener); |
| log.info("Stopped"); |
| } |
| |
| @Override |
| public Map<DeviceId, Map<String, MeterData>> getProgrammedMeters() { |
| try { |
| programmedMeterReadLock.lock(); |
| return ImmutableMap.copyOf(programmedMeters); |
| } finally { |
| programmedMeterReadLock.unlock(); |
| } |
| } |
| |
| /** |
| * Will create a meter if needed and return true once available. |
| * |
| * @param deviceId DeviceId |
| * @param bandwidthProfile Bandwidth Profile Id |
| * @return true |
| */ |
| @Override |
| public synchronized boolean createMeter(DeviceId deviceId, String bandwidthProfile) { |
| |
| // NOTE it is possible that hasMeterByBandwidthProfile returns false has the meter is in PENDING_ADD |
| // then a different thread changes the meter to ADDED |
| // and thus hasPendingMeterByBandwidthProfile return false as well and we install the meter a second time |
| // this causes an inconsistency between the existing meter and meterId stored in the map |
| |
| if (!hasMeterByBandwidthProfile(deviceId, bandwidthProfile)) { |
| // NOTE this is at trace level as it's constantly called by the queue processor |
| if (log.isTraceEnabled()) { |
| log.trace("Missing meter for BandwidthProfile {} on device {}", bandwidthProfile, deviceId); |
| } |
| |
| if (!hasPendingMeterByBandwidthProfile(deviceId, bandwidthProfile)) { |
| createMeterForBp(deviceId, bandwidthProfile); |
| } |
| if (log.isTraceEnabled()) { |
| log.trace("Meter is not yet available for {} on device {}", |
| bandwidthProfile, deviceId); |
| } |
| return false; |
| } |
| log.debug("Meter found for BandwidthProfile {} on device {}", bandwidthProfile, deviceId); |
| return true; |
| } |
| |
| @Override |
| public boolean createMeters(DeviceId deviceId, SubscriberAndDeviceInformation si, String multicastServiceName) { |
| // Each UniTagInformation has up to 4 meters, |
| // check and/or create all of them |
| AtomicBoolean waitingOnMeter = new AtomicBoolean(); |
| waitingOnMeter.set(false); |
| Map<String, List<String>> pendingMeters = new HashMap<>(); |
| si.uniTagList().forEach(uniTagInfo -> { |
| String serviceName = uniTagInfo.getServiceName(); |
| |
| if (multicastServiceName.equals(uniTagInfo.getServiceName())) { |
| log.debug("This is the multicast service ({}) for subscriber {} on {}, " + |
| "meters are not needed", |
| uniTagInfo.getServiceName(), si.id(), deviceId); |
| return; |
| } |
| |
| pendingMeters.put(serviceName, new LinkedList<>()); |
| String usBp = uniTagInfo.getUpstreamBandwidthProfile(); |
| String dsBp = uniTagInfo.getDownstreamBandwidthProfile(); |
| String oltUBp = uniTagInfo.getDownstreamOltBandwidthProfile(); |
| String oltDsBp = uniTagInfo.getUpstreamOltBandwidthProfile(); |
| if (!createMeter(deviceId, usBp)) { |
| pendingMeters.get(serviceName).add(usBp); |
| waitingOnMeter.set(true); |
| } |
| if (!createMeter(deviceId, dsBp)) { |
| pendingMeters.get(serviceName).add(usBp); |
| waitingOnMeter.set(true); |
| } |
| if (!createMeter(deviceId, oltUBp)) { |
| pendingMeters.get(serviceName).add(usBp); |
| waitingOnMeter.set(true); |
| } |
| if (!createMeter(deviceId, oltDsBp)) { |
| pendingMeters.get(serviceName).add(usBp); |
| waitingOnMeter.set(true); |
| } |
| }); |
| if (waitingOnMeter.get()) { |
| if (log.isTraceEnabled()) { |
| log.trace("Meters {} on device {} are not " + |
| "installed yet (requested by subscriber {})", |
| pendingMeters, deviceId, si.id()); |
| } |
| return false; |
| } |
| return true; |
| } |
| |
| /** |
| * Returns true if a meter is present in the programmed meters map, only if status is ADDED. |
| * |
| * @param deviceId the DeviceId on which to look for the meter |
| * @param bandwidthProfile the Bandwidth profile associated with this meter |
| * @return true if the meter is found |
| */ |
| public boolean hasMeterByBandwidthProfile(DeviceId deviceId, String bandwidthProfile) { |
| try { |
| programmedMeterReadLock.lock(); |
| Map<String, MeterData> metersOnDevice = programmedMeters.get(deviceId); |
| if (metersOnDevice == null || metersOnDevice.isEmpty()) { |
| return false; |
| } |
| if (log.isTraceEnabled()) { |
| log.trace("added metersOnDevice {}: {}", deviceId, metersOnDevice); |
| } |
| return metersOnDevice.get(bandwidthProfile) != null && |
| metersOnDevice.get(bandwidthProfile).getMeterStatus().equals(MeterState.ADDED); |
| } finally { |
| programmedMeterReadLock.unlock(); |
| } |
| } |
| |
| public boolean hasPendingMeterByBandwidthProfile(DeviceId deviceId, String bandwidthProfile) { |
| try { |
| programmedMeterReadLock.lock(); |
| Map<String, MeterData> metersOnDevice = programmedMeters.get(deviceId); |
| if (metersOnDevice == null || metersOnDevice.isEmpty()) { |
| return false; |
| } |
| if (log.isTraceEnabled()) { |
| log.trace("pending metersOnDevice {}: {}", deviceId, metersOnDevice); |
| } |
| // NOTE that we check in order if the meter was ADDED and if it wasn't we check for PENDING_ADD |
| // it is possible that a different thread move the meter state from PENDING_ADD |
| // to ADDED between these two checks |
| // to avoid creating the meter twice we return true event if the meter is already added |
| return metersOnDevice.get(bandwidthProfile) != null && ( |
| metersOnDevice.get(bandwidthProfile).getMeterStatus().equals(MeterState.ADDED) || |
| metersOnDevice.get(bandwidthProfile).getMeterStatus().equals(MeterState.PENDING_ADD) |
| ); |
| |
| } finally { |
| programmedMeterReadLock.unlock(); |
| } |
| } |
| |
| public MeterId getMeterIdForBandwidthProfile(DeviceId deviceId, String bandwidthProfile) { |
| try { |
| programmedMeterReadLock.lock(); |
| Map<String, MeterData> metersOnDevice = programmedMeters.get(deviceId); |
| if (metersOnDevice == null || metersOnDevice.isEmpty()) { |
| return null; |
| } |
| MeterData meterData = metersOnDevice.get(bandwidthProfile); |
| if (meterData == null || meterData.getMeterStatus() != MeterState.ADDED) { |
| return null; |
| } |
| if (log.isTraceEnabled()) { |
| log.debug("Found meter {} on device {} for bandwidth profile {}", |
| meterData.getMeterId(), deviceId, bandwidthProfile); |
| } |
| return meterData.getMeterId(); |
| } finally { |
| programmedMeterReadLock.unlock(); |
| } |
| } |
| |
| @Override |
| public void purgeDeviceMeters(DeviceId deviceId) { |
| log.debug("Purging meters on device {}", deviceId); |
| meterService.purgeMeters(deviceId); |
| |
| // after we purge the meters we also need to clear the map |
| try { |
| programmedMeterWriteLock.lock(); |
| programmedMeters.remove(deviceId); |
| } finally { |
| programmedMeterWriteLock.unlock(); |
| } |
| |
| // and clear the event count |
| // NOTE do we need a lock? |
| pendingRemoveMeters.remove(deviceId); |
| } |
| |
| /** |
| * Creates of a meter for a given Bandwidth Profile on a given device. |
| * |
| * @param deviceId the DeviceId |
| * @param bandwidthProfile the BandwidthProfile ID |
| */ |
| public void createMeterForBp(DeviceId deviceId, String bandwidthProfile) { |
| // adding meter in pending state to the programmedMeter map |
| try { |
| programmedMeterWriteLock.lock(); |
| programmedMeters.compute(deviceId, (d, deviceMeters) -> { |
| |
| if (deviceMeters == null) { |
| deviceMeters = new HashMap<>(); |
| } |
| // NOTE that this method is only called after verifying a |
| // meter for this BP does not already exist |
| MeterData meterData = new MeterData( |
| null, |
| MeterState.PENDING_ADD, |
| bandwidthProfile |
| ); |
| deviceMeters.put(bandwidthProfile, meterData); |
| |
| return deviceMeters; |
| }); |
| } finally { |
| programmedMeterWriteLock.unlock(); |
| } |
| |
| BandwidthProfileInformation bpInfo = getBandwidthProfileInformation(bandwidthProfile); |
| if (bpInfo == null) { |
| log.error("BandwidthProfile {} information not found in sadis", bandwidthProfile); |
| return; |
| } |
| |
| log.info("Creating meter for BandwidthProfile {} on device {}", bpInfo.id(), deviceId); |
| |
| if (log.isTraceEnabled()) { |
| log.trace("BandwidthProfile: {}", bpInfo); |
| } |
| try { |
| List<Band> meterBands = createMeterBands(bpInfo); |
| |
| log.info("Meter bands {} for bwp {}", meterBands, bpInfo); |
| |
| CompletableFuture<Object> meterFuture = new CompletableFuture<>(); |
| |
| MeterRequest meterRequest = DefaultMeterRequest.builder() |
| .withBands(meterBands) |
| .withUnit(Meter.Unit.KB_PER_SEC) |
| .withContext(new MeterContext() { |
| @Override |
| public void onSuccess(MeterRequest op) { |
| log.info("Meter for BandwidthProfile {} is installed on the device {}", |
| bandwidthProfile, deviceId); |
| meterFuture.complete(null); |
| } |
| |
| @Override |
| public void onError(MeterRequest op, MeterFailReason reason) { |
| log.error("Failed installing meter on {} for {}", |
| deviceId, bandwidthProfile); |
| meterFuture.complete(reason); |
| } |
| }) |
| .forDevice(deviceId) |
| .fromApp(appId) |
| .burst() |
| .add(); |
| |
| // creating the meter |
| Meter meter = meterService.submit(meterRequest); |
| |
| // wait for the meter to be completed |
| meterFuture.thenAccept(error -> { |
| if (error != null) { |
| log.error("Cannot create meter, TODO address me"); |
| } |
| |
| // then update the map with the MeterId |
| try { |
| programmedMeterWriteLock.lock(); |
| programmedMeters.compute(deviceId, (d, entry) -> { |
| if (entry != null) { |
| entry.compute(bandwidthProfile, (bp, meterData) -> { |
| if (meterData != null) { |
| meterData.setMeterCellId(meter.meterCellId()); |
| meterData.setMeterStatus(MeterState.ADDED); |
| } |
| return meterData; |
| }); |
| } |
| return entry; |
| }); |
| } finally { |
| programmedMeterWriteLock.unlock(); |
| } |
| }); |
| } catch (Exception e) { |
| log.error("", e); |
| } |
| } |
| |
| private List<Band> createMeterBands(BandwidthProfileInformation bpInfo) { |
| List<Band> meterBands = new ArrayList<>(); |
| |
| // add cir |
| if (bpInfo.committedInformationRate() != 0) { |
| meterBands.add(createMeterBand(bpInfo.committedInformationRate(), |
| bpInfo.committedBurstSize(), Band.Type.DROP, null)); |
| } |
| |
| // check if both air and gir are set together in sadis |
| // if they are, set air to 0 |
| if (bpInfo.assuredInformationRate() != 0 && bpInfo.guaranteedInformationRate() != 0) { |
| bpInfo.setAssuredInformationRate(0); |
| } |
| |
| // add pir |
| long pir = bpInfo.peakInformationRate() != 0 ? bpInfo.peakInformationRate() : (bpInfo.exceededInformationRate() |
| + bpInfo.committedInformationRate() + bpInfo.guaranteedInformationRate() |
| + bpInfo.assuredInformationRate()); |
| |
| Long pbs = bpInfo.peakBurstSize() != null ? bpInfo.peakBurstSize() : |
| (bpInfo.exceededBurstSize() != null ? bpInfo.exceededBurstSize() : 0) + |
| (bpInfo.committedBurstSize() != null ? bpInfo.committedBurstSize() : 0); |
| |
| meterBands.add(createMeterBand(pir, pbs, Band.Type.REMARK, (short) 1)); |
| |
| // add gir |
| //We can use DROP here because it GIr will never be equals to cir so rate will always be different. |
| if (bpInfo.guaranteedInformationRate() != 0) { |
| meterBands.add(createMeterBand(bpInfo.guaranteedInformationRate(), 0L, Band.Type.DROP, null)); |
| } |
| |
| // add air |
| // air is used in place of gir only if gir is |
| // not present and air is not 0, see line 330. |
| // Included for backwards compatibility, will be removed in VOLTHA 2.9. |
| // Using Band.Type.NONE is ok because this will be removed. |
| if (bpInfo.assuredInformationRate() != 0) { |
| meterBands.add(createMeterBand(bpInfo.assuredInformationRate(), 0L, Band.Type.DROP, null)); |
| } |
| |
| return meterBands; |
| } |
| |
| private Band createMeterBand(long rate, Long burst, Band.Type type, Short precedence) { |
| Band.Builder bandBuilder = DefaultBand.builder() |
| .withRate(rate) //already Kbps |
| .burstSize(burst) // already Kbits |
| .ofType(type); // no matter |
| if (precedence != null) { |
| bandBuilder.dropPrecedence(precedence); |
| } |
| |
| return bandBuilder.build(); |
| } |
| |
| private BandwidthProfileInformation getBandwidthProfileInformation(String bandwidthProfile) { |
| if (!checkSadisRunning()) { |
| return null; |
| } |
| if (bandwidthProfile == null) { |
| return null; |
| } |
| return bpService.get(bandwidthProfile); |
| } |
| |
| private boolean checkSadisRunning() { |
| if (bpService == null) { |
| log.warn("Sadis is not running"); |
| return false; |
| } |
| return true; |
| } |
| |
| private class InternalMeterListener implements MeterListener { |
| @Override |
| public void event(MeterEvent meterEvent) { |
| pendingRemovalMetersExecutor.execute(() -> { |
| |
| Meter meter = meterEvent.subject(); |
| if (!appId.equals(meter.appId())) { |
| return; |
| } |
| |
| if (log.isTraceEnabled()) { |
| log.trace("Received meter event {}", meterEvent); |
| } |
| MeterKey key = MeterKey.key(meter.deviceId(), meter.id()); |
| if (meterEvent.type().equals(MeterEvent.Type.METER_REFERENCE_COUNT_ZERO)) { |
| if (!oltDeviceService.isLocalLeader(meter.deviceId())) { |
| if (log.isTraceEnabled()) { |
| log.trace("ignoring meter event {} " + |
| "as not leader for {}", meterEvent, meter.deviceId()); |
| } |
| return; |
| } |
| log.info("Zero Count Reference event is received for meter {} on {}, " + |
| "incrementing counter", |
| meter.id(), meter.deviceId()); |
| incrementMeterCount(meter.deviceId(), key); |
| if (pendingRemoveMeters.get(meter.deviceId()) |
| .get(key).get() >= zeroReferenceMeterCount) { |
| // only delete the meters if the app is configured to do so |
| if (deleteMeters) { |
| // Check if there's any pending flow referencing that meter. |
| if (isUsedByPendingAddFlow(meter)) { |
| log.info("Meter {} is still being referenced by pending flows, avoiding removal.", |
| meter.id()); |
| removeMeterCount(meter, key); |
| return; |
| } |
| log.info("Meter {} on device {} is unused, removing it", meter.id(), meter.deviceId()); |
| deleteMeter(meter.deviceId(), meter.id()); |
| } |
| } |
| } |
| |
| if (meterEvent.type().equals(MeterEvent.Type.METER_REMOVED)) { |
| removeMeterCount(meter, key); |
| } |
| }); |
| } |
| |
| private boolean isUsedByPendingAddFlow(Meter meter) { |
| Long meterId = meter.id().id(); |
| Iterable<FlowEntry> pendingAddFlows = flowRuleService.getFlowEntriesByState(meter.deviceId(), |
| FlowEntry.FlowEntryState.PENDING_ADD); |
| return StreamSupport.stream(pendingAddFlows.spliterator(), true) |
| .map(FlowRule::treatment) |
| .map(TrafficTreatment::meters) |
| .flatMap(Collection::parallelStream) |
| .map(Instructions.MeterInstruction::meterId) |
| .map(Identifier::id) |
| .anyMatch(meterId::equals); |
| } |
| private void removeMeterCount(Meter meter, MeterKey key) { |
| pendingRemoveMeters.computeIfPresent(meter.deviceId(), |
| (id, meters) -> { |
| if (meters.get(key) == null) { |
| log.info("Meters is not pending " + |
| "{} on {}", key, id); |
| return meters; |
| } |
| meters.remove(key); |
| return meters; |
| }); |
| } |
| |
| private void incrementMeterCount(DeviceId deviceId, MeterKey key) { |
| if (key == null) { |
| return; |
| } |
| pendingRemoveMeters.compute(deviceId, |
| (id, meters) -> { |
| if (meters == null) { |
| meters = new HashMap<>(); |
| |
| } |
| if (meters.get(key) == null) { |
| meters.put(key, new AtomicInteger(1)); |
| } |
| meters.get(key).addAndGet(1); |
| return meters; |
| }); |
| } |
| } |
| |
| private void deleteMeter(DeviceId deviceId, MeterId meterId) { |
| Meter meter = meterService.getMeter(deviceId, meterId); |
| if (meter != null) { |
| MeterRequest meterRequest = DefaultMeterRequest.builder() |
| .withBands(meter.bands()) |
| .withUnit(meter.unit()) |
| .forDevice(deviceId) |
| .fromApp(appId) |
| .burst() |
| .remove(); |
| |
| meterService.withdraw(meterRequest, meterId); |
| } |
| |
| // remove the meter from local caching |
| try { |
| programmedMeterWriteLock.lock(); |
| programmedMeters.computeIfPresent(deviceId, (d, deviceMeters) -> { |
| Iterator<Map.Entry<String, MeterData>> iter = deviceMeters.entrySet().iterator(); |
| while (iter.hasNext()) { |
| Map.Entry<String, MeterData> entry = iter.next(); |
| if (entry.getValue().getMeterId().equals(meterId)) { |
| deviceMeters.remove(entry.getKey()); |
| } |
| } |
| return deviceMeters; |
| }); |
| } finally { |
| programmedMeterWriteLock.unlock(); |
| } |
| } |
| |
| protected void bindSadisService(SadisService service) { |
| this.bpService = service.getBandwidthProfileService(); |
| log.info("Sadis service is loaded"); |
| } |
| |
| protected void unbindSadisService(SadisService service) { |
| this.bpService = null; |
| log.info("Sadis service is unloaded"); |
| } |
| } |