SEBA-815 Multi Tcont support by OLT app.
Change-Id: I024ef2fcb3d3e59cc86bd2088726ae513fcff796
diff --git a/app/src/main/java/org/opencord/olt/impl/OltMeterService.java b/app/src/main/java/org/opencord/olt/impl/OltMeterService.java
new file mode 100644
index 0000000..df434eb
--- /dev/null
+++ b/app/src/main/java/org/opencord/olt/impl/OltMeterService.java
@@ -0,0 +1,306 @@
+/*
+ * Copyright 2016-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.olt.impl;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+import org.apache.felix.scr.annotations.Activate;
+import org.apache.felix.scr.annotations.Component;
+import org.apache.felix.scr.annotations.Deactivate;
+import org.apache.felix.scr.annotations.Modified;
+import org.apache.felix.scr.annotations.Property;
+import org.apache.felix.scr.annotations.Reference;
+import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.apache.felix.scr.annotations.Service;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.core.ApplicationId;
+import org.onosproject.core.CoreService;
+import org.onosproject.net.DeviceId;
+import org.onosproject.net.flowobjective.ObjectiveError;
+import org.onosproject.net.meter.Band;
+import org.onosproject.net.meter.DefaultBand;
+import org.onosproject.net.meter.DefaultMeterRequest;
+import org.onosproject.net.meter.Meter;
+import org.onosproject.net.meter.MeterContext;
+import org.onosproject.net.meter.MeterEvent;
+import org.onosproject.net.meter.MeterFailReason;
+import org.onosproject.net.meter.MeterId;
+import org.onosproject.net.meter.MeterKey;
+import org.onosproject.net.meter.MeterListener;
+import org.onosproject.net.meter.MeterRequest;
+import org.onosproject.net.meter.MeterService;
+import org.opencord.olt.internalapi.AccessDeviceMeterService;
+import org.opencord.sadis.BandwidthProfileInformation;
+import org.osgi.service.component.ComponentContext;
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+
+import java.util.Dictionary;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.onlab.util.Tools.groupedThreads;
+import static org.slf4j.LoggerFactory.getLogger;
+
+@Service
+@Component(immediate = true)
+public class OltMeterService implements AccessDeviceMeterService {
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected MeterService meterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected CoreService coreService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ComponentConfigService componentConfigService;
+
+ @Property(name = "deleteMeters", boolValue = true,
+ label = "Deleting Meters based on flow count statistics")
+ protected boolean deleteMeters = true;
+
+ protected Map<String, Set<MeterKey>> bpInfoToMeter;
+ protected Set<MeterKey> programmedMeters;
+ private ApplicationId appId;
+ private static final String APP_NAME = "org.opencord.olt";
+
+ private final MeterListener meterListener = new InternalMeterListener();
+
+ private final Logger log = getLogger(getClass());
+
+ protected ExecutorService eventExecutor;
+
+ @Activate
+ public void activate(ComponentContext context) {
+ eventExecutor = Executors.newFixedThreadPool(5, groupedThreads("onos/olt",
+ "events-%d", log));
+ appId = coreService.registerApplication(APP_NAME);
+ bpInfoToMeter = Maps.newConcurrentMap();
+ programmedMeters = Sets.newConcurrentHashSet();
+ meterService.addListener(meterListener);
+ componentConfigService.registerProperties(getClass());
+ log.info("Olt Meter service started");
+ }
+
+ @Deactivate
+ public void deactivate() {
+ meterService.removeListener(meterListener);
+ }
+
+
+ @Modified
+ public void modified(ComponentContext context) {
+ Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
+
+ Boolean d = Tools.isPropertyEnabled(properties, "deleteMeters");
+ if (d != null) {
+ deleteMeters = d;
+ }
+ }
+
+ @Override
+ public ImmutableMap<String, Set<MeterKey>> getBpMeterMappings() {
+ return ImmutableMap.copyOf(bpInfoToMeter);
+ }
+
+ @Override
+ public void addMeterIdToBpMapping(DeviceId deviceId, MeterId meterId, String bandwidthProfile) {
+ bpInfoToMeter.compute(bandwidthProfile, (k, v) -> {
+ if (v == null) {
+ return Sets.newHashSet(MeterKey.key(deviceId, meterId));
+ } else {
+ v.add(MeterKey.key(deviceId, meterId));
+ return v;
+ }
+ });
+ }
+
+ @Override
+ public MeterId getMeterIdFromBpMapping(DeviceId deviceId, String bandwidthProfile) {
+ if (bpInfoToMeter.get(bandwidthProfile) == null) {
+ log.warn("Bandwidth Profile '{}' is not currently mapped to a meter",
+ bandwidthProfile);
+ return null;
+ }
+
+ Optional<MeterKey> meterKeyForDevice = bpInfoToMeter.get(bandwidthProfile)
+ .stream()
+ .filter(meterKey -> meterKey.deviceId().equals(deviceId))
+ .findFirst();
+ if (meterKeyForDevice.isPresent()) {
+ log.debug("Found meter {} for bandwidth profile {}",
+ meterKeyForDevice.get().meterId(), bandwidthProfile);
+ return meterKeyForDevice.get().meterId();
+ } else {
+ log.warn("Bandwidth profile '{}' is not currently mapped to a meter",
+ bandwidthProfile);
+ return null;
+ }
+ }
+
+ @Override
+ public ImmutableSet<MeterKey> getProgMeters() {
+ return ImmutableSet.copyOf(programmedMeters);
+ }
+
+ @Override
+ public MeterId createMeter(DeviceId deviceId, BandwidthProfileInformation bpInfo,
+ CompletableFuture<Object> meterFuture) {
+ if (bpInfo == null) {
+ log.warn("Requested bandwidth profile information is NULL");
+ meterFuture.complete(ObjectiveError.BADPARAMS);
+ return null;
+ }
+
+ MeterId meterId = getMeterIdFromBpMapping(deviceId, bpInfo.id());
+ if (meterId != null) {
+ log.debug("Meter {} was previously created for bp {}", meterId, bpInfo.id());
+ meterFuture.complete(null);
+ return meterId;
+ }
+
+ List<Band> meterBands = createMeterBands(bpInfo);
+
+ final AtomicReference<MeterId> meterIdRef = new AtomicReference<>();
+ MeterRequest meterRequest = DefaultMeterRequest.builder()
+ .withBands(meterBands)
+ .withUnit(Meter.Unit.KB_PER_SEC)
+ .withContext(new MeterContext() {
+ @Override
+ public void onSuccess(MeterRequest op) {
+ meterFuture.complete(null);
+ }
+
+ @Override
+ public void onError(MeterRequest op, MeterFailReason reason) {
+ bpInfoToMeter.remove(MeterKey.key(deviceId, meterIdRef.get()));
+ meterFuture.complete(reason);
+ }
+ })
+ .forDevice(deviceId)
+ .fromApp(appId)
+ .burst()
+ .add();
+
+ Meter meter = meterService.submit(meterRequest);
+ meterIdRef.set(meter.id());
+ addMeterIdToBpMapping(deviceId, meterIdRef.get(), bpInfo.id());
+ programmedMeters.add(MeterKey.key(deviceId, meter.id()));
+ log.info("Meter is created. Meter Id {}", meter.id());
+ return meter.id();
+ }
+
+ private List<Band> createMeterBands(BandwidthProfileInformation bpInfo) {
+ List<Band> meterBands = new ArrayList<>();
+
+ meterBands.add(createMeterBand(bpInfo.committedInformationRate(), bpInfo.committedBurstSize()));
+ meterBands.add(createMeterBand(bpInfo.exceededInformationRate(), bpInfo.exceededBurstSize()));
+ meterBands.add(createMeterBand(bpInfo.assuredInformationRate(), 0L));
+
+ return meterBands;
+ }
+
+ private Band createMeterBand(long rate, Long burst) {
+ return DefaultBand.builder()
+ .withRate(rate) //already Kbps
+ .burstSize(burst) // already Kbits
+ .ofType(Band.Type.DROP) // no matter
+ .build();
+ }
+
+ private class InternalMeterListener implements MeterListener {
+
+ Map<MeterKey, AtomicInteger> pendingRemoveMeters = Maps.newConcurrentMap();
+
+ @Override
+ public void event(MeterEvent meterEvent) {
+ eventExecutor.execute(() -> {
+ Meter meter = meterEvent.subject();
+ MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
+ if (deleteMeters && MeterEvent.Type.METER_REFERENCE_COUNT_ZERO.equals(meterEvent.type())) {
+ log.info("Zero Count Meter Event is received. Meter is {}", meter.id());
+ incrementMeterCount(key);
+
+ if (meter != null && appId.equals(meter.appId()) && pendingRemoveMeters.get(key).get() == 3) {
+ log.info("Deleting unreferenced, no longer programmed Meter {}", meter.id());
+ deleteMeter(meter.deviceId(), meter.id());
+ }
+ }
+ if (MeterEvent.Type.METER_REMOVED.equals(meterEvent.type())) {
+ log.info("Meter Removed Event is received for {}", meter.id());
+ programmedMeters.remove(key);
+ pendingRemoveMeters.remove(key);
+ removeMeterFromBpMapping(meter);
+ }
+ });
+ }
+
+ private void incrementMeterCount(MeterKey key) {
+ if (key == null) {
+ return;
+ }
+ pendingRemoveMeters.compute(key,
+ (k, v) -> {
+ if (v == null) {
+ return new AtomicInteger(1);
+ }
+ v.addAndGet(1);
+ return v;
+ });
+ }
+
+ private void deleteMeter(DeviceId deviceId, MeterId meterId) {
+ Meter meter = meterService.getMeter(deviceId, meterId);
+ if (meter != null) {
+ MeterRequest meterRequest = DefaultMeterRequest.builder()
+ .withBands(meter.bands())
+ .withUnit(meter.unit())
+ .forDevice(deviceId)
+ .fromApp(appId)
+ .burst()
+ .remove();
+
+ meterService.withdraw(meterRequest, meterId);
+ }
+ }
+
+ private void removeMeterFromBpMapping(Meter meter) {
+ MeterKey meterKey = MeterKey.key(meter.deviceId(), meter.id());
+ Iterator<Map.Entry<String, Set<MeterKey>>> iterator = bpInfoToMeter.entrySet().iterator();
+ while (iterator.hasNext()) {
+ Map.Entry<String, Set<MeterKey>> entry = iterator.next();
+ if (entry.getValue().contains(meterKey)) {
+ iterator.remove();
+ log.info("Deleted meter for MeterKey {} - Last prog meters {}", meterKey, programmedMeters);
+ break;
+ }
+ }
+ }
+ }
+}