blob: 4b883441f4e5c46fc11e429e9df031af47215cbf [file] [log] [blame]
Andrea Campanellacbbb7952019-11-25 06:38:41 +00001/*
2 * Copyright 2016-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.opencord.olt.impl;
17
Andrea Campanella7e1eb712021-09-22 14:27:35 +020018import static com.google.common.base.Strings.isNullOrEmpty;
Saurav Dasf62cea82020-08-26 17:43:04 -070019import static java.util.stream.Collectors.collectingAndThen;
20import static java.util.stream.Collectors.groupingBy;
21import static java.util.stream.Collectors.mapping;
22import static java.util.stream.Collectors.toSet;
Andrea Campanella7e1eb712021-09-22 14:27:35 +020023import static org.onlab.util.Tools.get;
Saurav Dasf62cea82020-08-26 17:43:04 -070024import static org.onlab.util.Tools.groupedThreads;
Andrea Campanella7e1eb712021-09-22 14:27:35 +020025import static org.opencord.olt.impl.OsgiPropertyConstants.*;
Saurav Dasf62cea82020-08-26 17:43:04 -070026import static org.slf4j.LoggerFactory.getLogger;
27
28import java.util.ArrayList;
29import java.util.Collection;
30import java.util.Dictionary;
31import java.util.HashMap;
32import java.util.HashSet;
33import java.util.List;
34import java.util.Map;
35import java.util.Optional;
36import java.util.Properties;
37import java.util.Set;
38import java.util.concurrent.CompletableFuture;
39import java.util.concurrent.ExecutorService;
40import java.util.concurrent.Executors;
41import java.util.concurrent.atomic.AtomicInteger;
42import java.util.concurrent.atomic.AtomicReference;
43import java.util.stream.Collectors;
44
Jonathan Hart4f178fa2020-02-03 10:46:01 -080045import org.onlab.util.KryoNamespace;
Andrea Campanellacbbb7952019-11-25 06:38:41 +000046import org.onlab.util.Tools;
47import org.onosproject.cfg.ComponentConfigService;
Andrea Campanella7e1eb712021-09-22 14:27:35 +020048import org.onosproject.cluster.ClusterService;
49import org.onosproject.cluster.LeadershipService;
50import org.onosproject.cluster.NodeId;
Andrea Campanellacbbb7952019-11-25 06:38:41 +000051import org.onosproject.core.ApplicationId;
52import org.onosproject.core.CoreService;
Andrea Campanella7e1eb712021-09-22 14:27:35 +020053import org.onosproject.mastership.MastershipService;
Andrea Campanellacbbb7952019-11-25 06:38:41 +000054import org.onosproject.net.DeviceId;
Andrea Campanella7e1eb712021-09-22 14:27:35 +020055import org.onosproject.net.device.DeviceService;
Andrea Campanellacbbb7952019-11-25 06:38:41 +000056import org.onosproject.net.flowobjective.ObjectiveError;
57import org.onosproject.net.meter.Band;
58import org.onosproject.net.meter.DefaultBand;
59import org.onosproject.net.meter.DefaultMeterRequest;
60import org.onosproject.net.meter.Meter;
61import org.onosproject.net.meter.MeterContext;
62import org.onosproject.net.meter.MeterEvent;
63import org.onosproject.net.meter.MeterFailReason;
64import org.onosproject.net.meter.MeterId;
65import org.onosproject.net.meter.MeterKey;
66import org.onosproject.net.meter.MeterListener;
67import org.onosproject.net.meter.MeterRequest;
68import org.onosproject.net.meter.MeterService;
Jonathan Hart4f178fa2020-02-03 10:46:01 -080069import org.onosproject.store.serializers.KryoNamespaces;
70import org.onosproject.store.service.ConsistentMultimap;
71import org.onosproject.store.service.Serializer;
72import org.onosproject.store.service.StorageService;
Andrea Campanellacbbb7952019-11-25 06:38:41 +000073import org.opencord.olt.internalapi.AccessDeviceMeterService;
74import org.opencord.sadis.BandwidthProfileInformation;
75import org.osgi.service.component.ComponentContext;
76import org.osgi.service.component.annotations.Activate;
77import org.osgi.service.component.annotations.Component;
78import org.osgi.service.component.annotations.Deactivate;
79import org.osgi.service.component.annotations.Modified;
80import org.osgi.service.component.annotations.Reference;
81import org.osgi.service.component.annotations.ReferenceCardinality;
82import org.slf4j.Logger;
83
Saurav Dasf62cea82020-08-26 17:43:04 -070084import com.google.common.collect.ImmutableMap;
85import com.google.common.collect.ImmutableSet;
Andrea Campanellacbbb7952019-11-25 06:38:41 +000086
87/**
88 * Provisions Meters on access devices.
89 */
90@Component(immediate = true, property = {
91 DELETE_METERS + ":Boolean=" + DELETE_METERS_DEFAULT,
Andrea Campanella7e1eb712021-09-22 14:27:35 +020092 ZERO_REFERENCE_METER_COUNT + ":Integer=" + ZERO_REFERENCE_METER_COUNT_DEFAULT,
Andrea Campanellacbbb7952019-11-25 06:38:41 +000093 })
94public class OltMeterService implements AccessDeviceMeterService {
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY)
97 protected MeterService meterService;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY)
100 protected CoreService coreService;
101
102 @Reference(cardinality = ReferenceCardinality.MANDATORY)
103 protected ComponentConfigService componentConfigService;
104
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800105 @Reference(cardinality = ReferenceCardinality.MANDATORY)
106 protected StorageService storageService;
107
Andrea Campanella7e1eb712021-09-22 14:27:35 +0200108 @Reference(cardinality = ReferenceCardinality.MANDATORY)
109 protected DeviceService deviceService;
110
111 @Reference(cardinality = ReferenceCardinality.MANDATORY)
112 protected ClusterService clusterService;
113
114 @Reference(cardinality = ReferenceCardinality.MANDATORY)
115 protected MastershipService mastershipService;
116
117 @Reference(cardinality = ReferenceCardinality.MANDATORY)
118 protected LeadershipService leadershipService;
119
Saurav Dasf62cea82020-08-26 17:43:04 -0700120 /**
121 * Delete meters when reference count drops to zero.
122 */
123 protected boolean deleteMeters = DELETE_METERS_DEFAULT;
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000124
Andrea Campanella7e1eb712021-09-22 14:27:35 +0200125 /**
126 * Number of Zero References received before deleting the meter.
127 */
128 protected int zeroReferenceMeterCount = ZERO_REFERENCE_METER_COUNT_DEFAULT;
129
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000130 private ApplicationId appId;
131 private static final String APP_NAME = "org.opencord.olt";
132
133 private final MeterListener meterListener = new InternalMeterListener();
134
135 private final Logger log = getLogger(getClass());
136
137 protected ExecutorService eventExecutor;
138
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000139 protected Map<DeviceId, Set<BandwidthProfileInformation>> pendingMeters;
140 protected Map<DeviceId, Map<MeterKey, AtomicInteger>> pendingRemoveMeters;
141 protected ConsistentMultimap<String, MeterKey> bpInfoToMeter;
Andrea Campanella3ce4d282020-06-09 13:46:58 +0200142
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000143 @Activate
144 public void activate(ComponentContext context) {
145 eventExecutor = Executors.newFixedThreadPool(5, groupedThreads("onos/olt",
146 "events-%d", log));
147 appId = coreService.registerApplication(APP_NAME);
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800148 modified(context);
149
150 KryoNamespace serializer = KryoNamespace.newBuilder()
151 .register(KryoNamespaces.API)
152 .register(MeterKey.class)
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000153 .register(BandwidthProfileInformation.class)
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800154 .build();
155
156 bpInfoToMeter = storageService.<String, MeterKey>consistentMultimapBuilder()
157 .withName("volt-bp-info-to-meter")
158 .withSerializer(Serializer.using(serializer))
159 .withApplicationId(appId)
160 .build();
161
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000162 meterService.addListener(meterListener);
163 componentConfigService.registerProperties(getClass());
Ilayda Ozdemir90a93622021-02-25 09:40:58 +0000164 pendingMeters = storageService.<DeviceId, Set<BandwidthProfileInformation>>consistentMapBuilder()
165 .withName("volt-pending-meters")
166 .withSerializer(Serializer.using(serializer))
167 .withApplicationId(appId)
168 .build().asJavaMap();
169 pendingRemoveMeters = storageService.<DeviceId, Map<MeterKey, AtomicInteger>>consistentMapBuilder()
170 .withName("volt-pending-remove-meters")
171 .withSerializer(Serializer.using(serializer))
172 .withApplicationId(appId)
173 .build().asJavaMap();
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000174 log.info("Olt Meter service started");
175 }
176
177 @Deactivate
178 public void deactivate() {
179 meterService.removeListener(meterListener);
180 }
181
182
183 @Modified
184 public void modified(ComponentContext context) {
185 Dictionary<?, ?> properties = context != null ? context.getProperties() : new Properties();
186
187 Boolean d = Tools.isPropertyEnabled(properties, "deleteMeters");
188 if (d != null) {
189 deleteMeters = d;
190 }
Andrea Campanella7e1eb712021-09-22 14:27:35 +0200191
192 String zeroReferenceMeterCountNew = get(properties, ZERO_REFERENCE_METER_COUNT);
193 zeroReferenceMeterCount = isNullOrEmpty(zeroReferenceMeterCountNew) ? ZERO_REFERENCE_METER_COUNT_DEFAULT :
194 Integer.parseInt(zeroReferenceMeterCountNew.trim());
195
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000196 }
197
198 @Override
199 public ImmutableMap<String, Collection<MeterKey>> getBpMeterMappings() {
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800200 return bpInfoToMeter.stream()
201 .collect(collectingAndThen(
202 groupingBy(Map.Entry::getKey, mapping(Map.Entry::getValue, toSet())),
203 ImmutableMap::copyOf));
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000204 }
205
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700206 boolean addMeterIdToBpMapping(DeviceId deviceId, MeterId meterId, String bandwidthProfile) {
Andrea Campanella0c3309d2020-05-29 01:51:18 -0700207 log.debug("adding bp {} to meter {} mapping for device {}",
208 bandwidthProfile, meterId, deviceId);
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700209 return bpInfoToMeter.put(bandwidthProfile, MeterKey.key(deviceId, meterId));
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000210 }
211
212 @Override
213 public MeterId getMeterIdFromBpMapping(DeviceId deviceId, String bandwidthProfile) {
yasin saplib4b8ee12021-06-13 18:25:20 +0000214 if (bandwidthProfile == null) {
215 log.warn("Bandwidth Profile requested is null");
216 return null;
217 }
218 if (bpInfoToMeter.get(bandwidthProfile) == null) {
219 log.warn("Bandwidth Profile '{}' is not present in the map",
220 bandwidthProfile);
221 return null;
222 }
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800223 if (bpInfoToMeter.get(bandwidthProfile).value().isEmpty()) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000224 log.warn("Bandwidth Profile '{}' is not currently mapped to a meter",
225 bandwidthProfile);
226 return null;
227 }
228
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800229 Optional<? extends MeterKey> meterKeyForDevice = bpInfoToMeter.get(bandwidthProfile).value()
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000230 .stream()
231 .filter(meterKey -> meterKey.deviceId().equals(deviceId))
232 .findFirst();
233 if (meterKeyForDevice.isPresent()) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700234 log.debug("Found meter {} for bandwidth profile {} on {}",
235 meterKeyForDevice.get().meterId(), bandwidthProfile, deviceId);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000236 return meterKeyForDevice.get().meterId();
237 } else {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700238 log.warn("Bandwidth Profile '{}' is not currently mapped to a meter on {} , {}",
239 bandwidthProfile, deviceId, bpInfoToMeter.get(bandwidthProfile).value());
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000240 return null;
241 }
242 }
243
244 @Override
245 public ImmutableSet<MeterKey> getProgMeters() {
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800246 return bpInfoToMeter.stream()
247 .map(Map.Entry::getValue)
248 .collect(ImmutableSet.toImmutableSet());
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000249 }
250
251 @Override
252 public MeterId createMeter(DeviceId deviceId, BandwidthProfileInformation bpInfo,
253 CompletableFuture<Object> meterFuture) {
Saurav Dasf62cea82020-08-26 17:43:04 -0700254 log.debug("Creating meter on {} for {}", deviceId, bpInfo);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000255 if (bpInfo == null) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700256 log.warn("Requested bandwidth profile on {} information is NULL", deviceId);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000257 meterFuture.complete(ObjectiveError.BADPARAMS);
258 return null;
259 }
260
261 MeterId meterId = getMeterIdFromBpMapping(deviceId, bpInfo.id());
262 if (meterId != null) {
263 log.debug("Meter {} was previously created for bp {}", meterId, bpInfo.id());
264 meterFuture.complete(null);
265 return meterId;
266 }
267
268 List<Band> meterBands = createMeterBands(bpInfo);
269
270 final AtomicReference<MeterId> meterIdRef = new AtomicReference<>();
271 MeterRequest meterRequest = DefaultMeterRequest.builder()
272 .withBands(meterBands)
273 .withUnit(Meter.Unit.KB_PER_SEC)
274 .withContext(new MeterContext() {
275 @Override
276 public void onSuccess(MeterRequest op) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700277 log.debug("Meter {} for {} is installed on the device {}",
278 meterIdRef.get(), bpInfo.id(), deviceId);
279 boolean added = addMeterIdToBpMapping(deviceId, meterIdRef.get(), bpInfo.id());
280 if (added) {
281 meterFuture.complete(null);
282 } else {
283 log.error("Failed to add Meter {} for {} on {} to the meter-bandwidth mapping",
284 meterIdRef.get(), bpInfo.id(), deviceId);
285 meterFuture.complete(ObjectiveError.UNKNOWN);
286 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000287 }
288
289 @Override
290 public void onError(MeterRequest op, MeterFailReason reason) {
Andrea Campanellac727a372020-06-09 17:34:38 +0200291 log.error("Failed installing meter {} on {} for {}",
292 meterIdRef.get(), deviceId, bpInfo.id());
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000293 bpInfoToMeter.remove(bpInfo.id(),
294 MeterKey.key(deviceId, meterIdRef.get()));
295 meterFuture.complete(reason);
296 }
297 })
298 .forDevice(deviceId)
299 .fromApp(appId)
300 .burst()
301 .add();
302
303 Meter meter = meterService.submit(meterRequest);
304 meterIdRef.set(meter.id());
Saurav Dasf62cea82020-08-26 17:43:04 -0700305 log.info("Meter {} created and sent for installation on {} for {}",
306 meter.id(), deviceId, bpInfo);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000307 return meter.id();
308 }
309
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800310 @Override
Andrea Campanella600d2e22020-06-22 11:00:31 +0200311 public void removeFromPendingMeters(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
312 if (deviceId == null) {
313 return;
314 }
315 pendingMeters.computeIfPresent(deviceId, (id, bwps) -> {
316 bwps.remove(bwpInfo);
317 return bwps;
318 });
Andrea Campanella3ce4d282020-06-09 13:46:58 +0200319 }
320
321 @Override
Andrea Campanellad1e26642020-10-23 12:08:32 +0200322 public synchronized boolean checkAndAddPendingMeter(DeviceId deviceId, BandwidthProfileInformation bwpInfo) {
yasin saplib4b8ee12021-06-13 18:25:20 +0000323 if (bwpInfo == null) {
324 log.debug("Bandwidth profile is null for device: {}", deviceId);
325 return false;
326 }
Andrea Campanellad1e26642020-10-23 12:08:32 +0200327 if (pendingMeters.containsKey(deviceId)
328 && pendingMeters.get(deviceId).contains(bwpInfo)) {
Matteo Scandolo19b56f62020-10-29 13:29:21 -0700329 log.debug("Meter is already pending on {} with bp {}",
Andrea Campanellad1e26642020-10-23 12:08:32 +0200330 deviceId, bwpInfo);
Andrea Campanella600d2e22020-06-22 11:00:31 +0200331 return false;
332 }
Andrea Campanellad1e26642020-10-23 12:08:32 +0200333 log.debug("Adding bandwidth profile {} to pending on {}",
334 bwpInfo, deviceId);
335 pendingMeters.compute(deviceId, (id, bwps) -> {
336 if (bwps == null) {
337 bwps = new HashSet<>();
338 }
339 bwps.add(bwpInfo);
340 return bwps;
341 });
342
343 return true;
Andrea Campanella3ce4d282020-06-09 13:46:58 +0200344 }
345
346 @Override
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800347 public void clearMeters(DeviceId deviceId) {
Andrea Campanella65487ba2020-06-17 11:31:30 +0200348 log.debug("Removing all meters for device {}", deviceId);
Andrea Campanella600d2e22020-06-22 11:00:31 +0200349 clearDeviceState(deviceId);
Andrea Campanella65487ba2020-06-17 11:31:30 +0200350 meterService.purgeMeters(deviceId);
Jonathan Hart4f178fa2020-02-03 10:46:01 -0800351 }
352
Andrea Campanella600d2e22020-06-22 11:00:31 +0200353 @Override
354 public void clearDeviceState(DeviceId deviceId) {
355 log.info("Clearing local device state for {}", deviceId);
356 pendingRemoveMeters.remove(deviceId);
357 removeMetersFromBpMapping(deviceId);
358 //Following call handles cornercase of OLT delete during meter provisioning
359 pendingMeters.remove(deviceId);
360 }
361
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000362 private List<Band> createMeterBands(BandwidthProfileInformation bpInfo) {
363 List<Band> meterBands = new ArrayList<>();
364
Gamze Abakaf46ab432021-03-03 10:51:17 +0000365 // add cir
366 if (bpInfo.committedInformationRate() != 0) {
367 meterBands.add(createMeterBand(bpInfo.committedInformationRate(), bpInfo.committedBurstSize()));
368 }
369
370 // check if both air and gir are set together in sadis
371 // if they are, set air to 0
372 if (bpInfo.assuredInformationRate() != 0 && bpInfo.guaranteedInformationRate() != 0) {
373 bpInfo.setAssuredInformationRate(0);
374 }
375
376 // add pir
377 long pir = bpInfo.peakInformationRate() != 0 ? bpInfo.peakInformationRate() : (bpInfo.exceededInformationRate()
378 + bpInfo.committedInformationRate() + bpInfo.guaranteedInformationRate()
379 + bpInfo.assuredInformationRate());
380
381 Long pbs = bpInfo.peakBurstSize() != null ? bpInfo.peakBurstSize() :
382 (bpInfo.exceededBurstSize() != null ? bpInfo.exceededBurstSize() : 0) +
383 (bpInfo.committedBurstSize() != null ? bpInfo.committedBurstSize() : 0);
384
385 meterBands.add(createMeterBand(pir, pbs));
386
387 // add gir
388 if (bpInfo.guaranteedInformationRate() != 0) {
389 meterBands.add(createMeterBand(bpInfo.guaranteedInformationRate(), 0L));
390 }
391
392 // add air
393 // air is used in place of gir only if gir is
394 // not present and air is not 0, see line 330.
395 // Included for backwards compatibility, will be removed in VOLTHA 2.9.
396 if (bpInfo.assuredInformationRate() != 0) {
397 meterBands.add(createMeterBand(bpInfo.assuredInformationRate(), 0L));
398 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000399
400 return meterBands;
401 }
402
403 private Band createMeterBand(long rate, Long burst) {
404 return DefaultBand.builder()
405 .withRate(rate) //already Kbps
406 .burstSize(burst) // already Kbits
407 .ofType(Band.Type.DROP) // no matter
408 .build();
409 }
410
Andrea Campanella600d2e22020-06-22 11:00:31 +0200411 private void removeMeterFromBpMapping(MeterKey meterKey) {
412 List<Map.Entry<String, MeterKey>> meters = bpInfoToMeter.stream()
413 .filter(e -> e.getValue().equals(meterKey))
414 .collect(Collectors.toList());
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000415
Andrea Campanella600d2e22020-06-22 11:00:31 +0200416 meters.forEach(e -> bpInfoToMeter.remove(e.getKey(), e.getValue()));
417 }
418
419 private void removeMetersFromBpMapping(DeviceId deviceId) {
420 List<Map.Entry<String, MeterKey>> meters = bpInfoToMeter.stream()
421 .filter(e -> e.getValue().deviceId().equals(deviceId))
422 .collect(Collectors.toList());
423
424 meters.forEach(e -> bpInfoToMeter.remove(e.getKey(), e.getValue()));
425 }
426
Andrea Campanella7e1eb712021-09-22 14:27:35 +0200427 /**
428 * Checks for mastership or falls back to leadership on deviceId.
429 * If the device is available use mastership,
430 * otherwise fallback on leadership.
431 * Leadership on the device topic is needed because the master can be NONE
432 * in case the device went away, we still need to handle events
433 * consistently
434 */
435 private boolean isLocalLeader(DeviceId deviceId) {
436 if (deviceService.isAvailable(deviceId)) {
437 return mastershipService.isLocalMaster(deviceId);
438 } else {
439 // Fallback with Leadership service - device id is used as topic
440 NodeId leader = leadershipService.runForLeadership(
441 deviceId.toString()).leaderNodeId();
442 // Verify if this node is the leader
443 return clusterService.getLocalNode().id().equals(leader);
444 }
445 }
446
Andrea Campanella600d2e22020-06-22 11:00:31 +0200447 private class InternalMeterListener implements MeterListener {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000448
449 @Override
450 public void event(MeterEvent meterEvent) {
451 eventExecutor.execute(() -> {
452 Meter meter = meterEvent.subject();
453 if (meter == null) {
454 log.error("Meter in event {} is null", meterEvent);
455 return;
456 }
Andrea Campanella7e1eb712021-09-22 14:27:35 +0200457 if (isLocalLeader(meter.deviceId())) {
458 MeterKey key = MeterKey.key(meter.deviceId(), meter.id());
459 if (deleteMeters &&
460 MeterEvent.Type.METER_REFERENCE_COUNT_ZERO.equals(meterEvent.type())) {
461 log.info("Zero Count Meter Event is received. Meter is {} on {}",
Andrea Campanella600d2e22020-06-22 11:00:31 +0200462 meter.id(), meter.deviceId());
Andrea Campanella7e1eb712021-09-22 14:27:35 +0200463 incrementMeterCount(meter.deviceId(), key);
464
465 if (appId.equals(meter.appId()) && pendingRemoveMeters.get(meter.deviceId())
466 .get(key).get() == zeroReferenceMeterCount) {
467 log.info("Deleting unreferenced, no longer programmed Meter {} on {}",
468 meter.id(), meter.deviceId());
469 deleteMeter(meter.deviceId(), meter.id());
470 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000471 }
Andrea Campanella7e1eb712021-09-22 14:27:35 +0200472 if (MeterEvent.Type.METER_REMOVED.equals(meterEvent.type())) {
473 log.info("Meter Removed Event is received for {} on {}",
474 meter.id(), meter.deviceId());
475 pendingRemoveMeters.computeIfPresent(meter.deviceId(),
476 (id, meters) -> {
477 if (meters.get(key) == null) {
478 log.info("Meters is not pending " +
479 "{} on {}", key, id);
480 return meters;
481 }
482 meters.remove(key);
483 return meters;
484 });
485 removeMeterFromBpMapping(key);
486 }
487 } else {
488 log.trace("Ignoring meter event, not leader of {}, {}", meter.deviceId(), meterEvent);
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000489 }
490 });
491 }
492
Andrea Campanella600d2e22020-06-22 11:00:31 +0200493 private void incrementMeterCount(DeviceId deviceId, MeterKey key) {
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000494 if (key == null) {
495 return;
496 }
Andrea Campanella600d2e22020-06-22 11:00:31 +0200497 pendingRemoveMeters.compute(deviceId,
498 (id, meters) -> {
499 if (meters == null) {
500 meters = new HashMap<>();
501
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000502 }
Andrea Campanella600d2e22020-06-22 11:00:31 +0200503 if (meters.get(key) == null) {
504 meters.put(key, new AtomicInteger(1));
505 }
506 meters.get(key).addAndGet(1);
507 return meters;
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000508 });
509 }
510
511 private void deleteMeter(DeviceId deviceId, MeterId meterId) {
512 Meter meter = meterService.getMeter(deviceId, meterId);
513 if (meter != null) {
514 MeterRequest meterRequest = DefaultMeterRequest.builder()
515 .withBands(meter.bands())
516 .withUnit(meter.unit())
517 .forDevice(deviceId)
518 .fromApp(appId)
519 .burst()
520 .remove();
521
522 meterService.withdraw(meterRequest, meterId);
523 }
524 }
Andrea Campanellacbbb7952019-11-25 06:38:41 +0000525 }
526}