blob: f95867ca2aef4d6258c522504c8967e2b3e53976 [file] [log] [blame]
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -03001/*
2 * Copyright 2017-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package org.opencord.dhcpl2relay.impl;
17
Jonathan Hart77ca3152020-02-21 14:31:21 -080018import com.google.common.base.Strings;
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -030019import com.google.common.collect.ImmutableMap;
Jonathan Hart77ca3152020-02-21 14:31:21 -080020import org.onlab.util.KryoNamespace;
21import org.onlab.util.SafeRecurringTask;
22import org.onlab.util.Tools;
23import org.onosproject.cfg.ComponentConfigService;
24import org.onosproject.cluster.ClusterService;
25import org.onosproject.cluster.LeadershipService;
26import org.onosproject.cluster.NodeId;
27import org.onosproject.store.AbstractStore;
28import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
29import org.onosproject.store.cluster.messaging.ClusterMessage;
30import org.onosproject.store.cluster.messaging.MessageSubject;
31import org.onosproject.store.serializers.KryoNamespaces;
32import org.onosproject.store.service.EventuallyConsistentMap;
33import org.onosproject.store.service.Serializer;
34import org.onosproject.store.service.StorageService;
35import org.onosproject.store.service.WallClockTimestamp;
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -030036import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
Jonathan Hart77ca3152020-02-21 14:31:21 -080037import org.opencord.dhcpl2relay.DhcpL2RelayStoreDelegate;
38import org.osgi.service.component.ComponentContext;
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -030039import org.osgi.service.component.annotations.Activate;
40import org.osgi.service.component.annotations.Component;
Jonathan Hart77ca3152020-02-21 14:31:21 -080041import org.osgi.service.component.annotations.Deactivate;
42import org.osgi.service.component.annotations.Modified;
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -030043import org.osgi.service.component.annotations.Reference;
44import org.osgi.service.component.annotations.ReferenceCardinality;
45import org.slf4j.Logger;
46
Jonathan Hart77ca3152020-02-21 14:31:21 -080047import java.nio.charset.StandardCharsets;
48import java.util.AbstractMap;
49import java.util.Dictionary;
50import java.util.Objects;
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -030051import java.util.concurrent.ConcurrentHashMap;
Jonathan Hart77ca3152020-02-21 14:31:21 -080052import java.util.concurrent.ConcurrentMap;
53import java.util.concurrent.Executors;
54import java.util.concurrent.ScheduledExecutorService;
55import java.util.concurrent.ScheduledFuture;
56import java.util.concurrent.TimeUnit;
57import java.util.concurrent.atomic.AtomicBoolean;
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -030058import java.util.concurrent.atomic.AtomicLong;
59
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -030060import static com.google.common.base.Preconditions.checkNotNull;
Jonathan Hart77ca3152020-02-21 14:31:21 -080061import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE;
62import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE_DEFAULT;
63import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.SYNC_COUNTERS_RATE;
64import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.SYNC_COUNTERS_RATE_DEFAULT;
65import static org.slf4j.LoggerFactory.getLogger;
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -030066
67/**
68 * DHCP Relay Agent Counters Manager Component.
69 */
Jonathan Hart77ca3152020-02-21 14:31:21 -080070@Component(immediate = true,
71property = {
72 PUBLISH_COUNTERS_RATE + ":Integer=" + PUBLISH_COUNTERS_RATE_DEFAULT,
73 SYNC_COUNTERS_RATE + ":Integer=" + SYNC_COUNTERS_RATE_DEFAULT,
74}
75)
76public class SimpleDhcpL2RelayCountersStore extends AbstractStore<DhcpL2RelayEvent, DhcpL2RelayStoreDelegate>
77 implements DhcpL2RelayCountersStore {
78
79 private static final String DHCP_STATISTICS_LEADERSHIP = "dhcpl2relay-statistics";
80 private static final MessageSubject RESET_SUBJECT = new MessageSubject("dhcpl2relay-statistics-reset");
81
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -030082 private final Logger log = getLogger(getClass());
Jonathan Hart77ca3152020-02-21 14:31:21 -080083 private ConcurrentMap<DhcpL2RelayCountersIdentifier, Long> countersMap;
84
85 private EventuallyConsistentMap<NodeId, DhcpL2RelayStatistics> statistics;
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -030086
87 @Reference(cardinality = ReferenceCardinality.MANDATORY)
Jonathan Hart77ca3152020-02-21 14:31:21 -080088 protected StorageService storageService;
89
90 @Reference(cardinality = ReferenceCardinality.MANDATORY)
91 protected ClusterService clusterService;
92
93 @Reference(cardinality = ReferenceCardinality.MANDATORY)
94 protected LeadershipService leadershipService;
95
96 @Reference(cardinality = ReferenceCardinality.MANDATORY)
97 protected ComponentConfigService componentConfigService;
98
99 @Reference(cardinality = ReferenceCardinality.MANDATORY)
100 protected ClusterCommunicationService clusterCommunicationService;
101
102 protected int publishCountersRate = PUBLISH_COUNTERS_RATE_DEFAULT;
103 protected int syncCountersRate = SYNC_COUNTERS_RATE_DEFAULT;
104
105 KryoNamespace serializer = KryoNamespace.newBuilder()
106 .register(KryoNamespaces.API)
107 .register(DhcpL2RelayStatistics.class)
108 .register(DhcpL2RelayCountersIdentifier.class)
109 .register(DhcpL2RelayCounterNames.class)
110 .register(ClusterMessage.class)
111 .register(MessageSubject.class)
112 .build();
113
114 private ScheduledExecutorService executor;
115
116 private ScheduledFuture<?> publisherTask;
117 private ScheduledFuture<?> syncTask;
118
119 private AtomicBoolean dirty = new AtomicBoolean(true);
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -0300120
121 @Activate
Jonathan Hart77ca3152020-02-21 14:31:21 -0800122 public void activate(ComponentContext context) {
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -0300123 log.info("Activate Dhcp L2 Counters Manager");
Jonathan Hart77ca3152020-02-21 14:31:21 -0800124 countersMap = new ConcurrentHashMap<>();
125 componentConfigService.registerProperties(getClass());
126
127 modified(context);
128
129 statistics = storageService.<NodeId, DhcpL2RelayStatistics>eventuallyConsistentMapBuilder()
130 .withName("dhcpl2relay-statistics")
131 .withSerializer(serializer)
132 .withTimestampProvider((k, v) -> new WallClockTimestamp())
133 .build();
134
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -0300135 // Initialize counter values for the global counters
Jonathan Hart77ca3152020-02-21 14:31:21 -0800136 initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER, statistics.get(clusterService.getLocalNode().id()));
137 syncStats();
138
139 leadershipService.runForLeadership(DHCP_STATISTICS_LEADERSHIP);
140
141 executor = Executors.newScheduledThreadPool(1);
142
143 clusterCommunicationService.addSubscriber(RESET_SUBJECT, Serializer.using(serializer)::decode,
144 this::resetLocal, executor);
145
146 startSyncTask();
147 startPublishTask();
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -0300148 }
149
Jonathan Hart77ca3152020-02-21 14:31:21 -0800150 @Deactivate
151 public void deactivate() {
152 clusterCommunicationService.removeSubscriber(RESET_SUBJECT);
153 leadershipService.withdraw(DHCP_STATISTICS_LEADERSHIP);
154
155 stopPublishTask();
156 stopSyncTask();
157 executor.shutdownNow();
158 componentConfigService.unregisterProperties(getClass(), false);
159 }
160
161 @Modified
162 public void modified(ComponentContext context) {
163 Dictionary<String, Object> properties = context.getProperties();
164
165 String s = Tools.get(properties, PUBLISH_COUNTERS_RATE);
166 int oldPublishCountersRate = publishCountersRate;
167 publishCountersRate = Strings.isNullOrEmpty(s) ? PUBLISH_COUNTERS_RATE_DEFAULT
168 : Integer.parseInt(s.trim());
169 if (oldPublishCountersRate != publishCountersRate) {
170 stopPublishTask();
171 startPublishTask();
172 }
173
174 s = Tools.get(properties, SYNC_COUNTERS_RATE);
175 int oldSyncCountersRate = syncCountersRate;
176 syncCountersRate = Strings.isNullOrEmpty(s) ? SYNC_COUNTERS_RATE_DEFAULT
177 : Integer.parseInt(s.trim());
178 if (oldSyncCountersRate != syncCountersRate) {
179 stopSyncTask();
180 startSyncTask();
181 }
182 }
183
184 private ScheduledFuture<?> startTask(Runnable r, int rate) {
185 return executor.scheduleAtFixedRate(SafeRecurringTask.wrap(r),
186 0, rate, TimeUnit.SECONDS);
187 }
188
189 private void stopTask(ScheduledFuture<?> task) {
190 task.cancel(true);
191 }
192
193 private void startSyncTask() {
194 syncTask = startTask(this::syncStats, syncCountersRate);
195 }
196
197 private void stopSyncTask() {
198 stopTask(syncTask);
199 }
200
201 private void startPublishTask() {
202 publisherTask = startTask(this::publishStats, publishCountersRate);
203 }
204
205 private void stopPublishTask() {
206 stopTask(publisherTask);
207 }
208
209 ImmutableMap<DhcpL2RelayCountersIdentifier, Long> getCountersMap() {
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -0300210 return ImmutableMap.copyOf(countersMap);
211 }
212
Jonathan Hart77ca3152020-02-21 14:31:21 -0800213 public DhcpL2RelayStatistics getCounters() {
214 return aggregate();
215 }
216
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -0300217 /**
218 * Initialize the supported counters map for the given counter class.
219 * @param counterClass class of counters (global, per subscriber)
Jonathan Hart77ca3152020-02-21 14:31:21 -0800220 * @param existingStats existing values to intialise the counters to
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -0300221 */
Jonathan Hart77ca3152020-02-21 14:31:21 -0800222 public void initCounters(String counterClass, DhcpL2RelayStatistics existingStats) {
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -0300223 checkNotNull(counterClass, "counter class can't be null");
Jonathan Hart77ca3152020-02-21 14:31:21 -0800224 for (DhcpL2RelayCounterNames counterType : DhcpL2RelayCounterNames.SUPPORTED_COUNTERS) {
225 DhcpL2RelayCountersIdentifier id = new DhcpL2RelayCountersIdentifier(counterClass, counterType);
226 countersMap.put(id, existingStats == null ? 0L : existingStats.get(id));
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -0300227 }
228 }
229
230 /**
231 * Inserts the counter entry if it is not already in the set otherwise increment the existing counter entry.
232 * @param counterClass class of counters (global, per subscriber)
233 * @param counterType name of counter
234 */
Jonathan Hart77ca3152020-02-21 14:31:21 -0800235 public void incrementCounter(String counterClass, DhcpL2RelayCounterNames counterType) {
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -0300236 checkNotNull(counterClass, "counter class can't be null");
Jonathan Hart77ca3152020-02-21 14:31:21 -0800237 if (DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.contains(counterType)) {
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -0300238 DhcpL2RelayCountersIdentifier counterIdentifier =
239 new DhcpL2RelayCountersIdentifier(counterClass, counterType);
240 countersMap.compute(counterIdentifier, (key, counterValue) ->
Jonathan Hart77ca3152020-02-21 14:31:21 -0800241 (counterValue != null) ? counterValue + 1 : 1L);
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -0300242 } else {
243 log.error("Failed to increment counter class {} of type {}", counterClass, counterType);
244 }
Jonathan Hart77ca3152020-02-21 14:31:21 -0800245 dirty.set(true);
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -0300246 }
247
248 /**
249 * Reset the counters map for the given counter class.
250 * @param counterClass class of counters (global, per subscriber)
251 */
252 public void resetCounters(String counterClass) {
Jonathan Hart77ca3152020-02-21 14:31:21 -0800253 byte[] payload = counterClass.getBytes(StandardCharsets.UTF_8);
254 ClusterMessage reset = new ClusterMessage(clusterService.getLocalNode().id(), RESET_SUBJECT, payload);
255 clusterCommunicationService.broadcastIncludeSelf(reset, RESET_SUBJECT, Serializer.using(serializer)::encode);
256 }
257
258 private void resetLocal(ClusterMessage m) {
259 String counterClass = new String(m.payload(), StandardCharsets.UTF_8);
260
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -0300261 checkNotNull(counterClass, "counter class can't be null");
Jonathan Hart77ca3152020-02-21 14:31:21 -0800262 for (DhcpL2RelayCounterNames counterType : DhcpL2RelayCounterNames.SUPPORTED_COUNTERS) {
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -0300263 DhcpL2RelayCountersIdentifier counterIdentifier =
264 new DhcpL2RelayCountersIdentifier(counterClass, counterType);
Jonathan Hart77ca3152020-02-21 14:31:21 -0800265 countersMap.computeIfPresent(counterIdentifier, (key, counterValue) -> 0L);
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -0300266 }
Jonathan Hart77ca3152020-02-21 14:31:21 -0800267 dirty.set(true);
268 syncStats();
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -0300269 }
270
271 /**
272 * Inserts the counter entry if it is not already in the set otherwise update the existing counter entry.
273 * @param counterClass class of counters (global, per subscriber).
274 * @param counterType name of counter
Jonathan Hart77ca3152020-02-21 14:31:21 -0800275 * @param value counter value
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -0300276 */
Jonathan Hart77ca3152020-02-21 14:31:21 -0800277 public void setCounter(String counterClass, DhcpL2RelayCounterNames counterType, Long value) {
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -0300278 checkNotNull(counterClass, "counter class can't be null");
Jonathan Hart77ca3152020-02-21 14:31:21 -0800279 if (DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.contains(counterType)) {
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -0300280 DhcpL2RelayCountersIdentifier counterIdentifier =
281 new DhcpL2RelayCountersIdentifier(counterClass, counterType);
Jonathan Hart77ca3152020-02-21 14:31:21 -0800282 countersMap.put(counterIdentifier, value);
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -0300283 } else {
284 log.error("Failed to increment counter class {} of type {}", counterClass, counterType);
285 }
Jonathan Hart77ca3152020-02-21 14:31:21 -0800286 dirty.set(true);
287 syncStats();
288 }
289
290 private DhcpL2RelayStatistics aggregate() {
291 return statistics.values().stream()
292 .reduce(new DhcpL2RelayStatistics(), DhcpL2RelayStatistics::add);
293 }
294
295 /**
296 * Creates a snapshot of the current in-memory statistics.
297 *
298 * @return snapshot of statistics
299 */
300 private DhcpL2RelayStatistics snapshot() {
301 return DhcpL2RelayStatistics.withCounters(countersMap);
302 }
303
304 /**
305 * Syncs in-memory stats to the eventually consistent map.
306 */
307 private void syncStats() {
308 if (dirty.get()) {
309 statistics.put(clusterService.getLocalNode().id(), snapshot());
310 dirty.set(false);
311 }
312 }
313
314 private void publishStats() {
315 // Only publish events if we are the cluster leader for DHCP L2 relay stats
316 if (!Objects.equals(leadershipService.getLeader(DHCP_STATISTICS_LEADERSHIP),
317 clusterService.getLocalNode().id())) {
318 return;
319 }
320
321 aggregate().counters().forEach((counterKey, counterValue) -> {
322 // Subscriber-specific counters have the subscriber ID set
323 String subscriberId = null;
324 if (!counterKey.counterClassKey.equals(DhcpL2RelayEvent.GLOBAL_COUNTER)) {
325 subscriberId = counterKey.counterClassKey;
326 }
327
328 delegate.notify(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE, null, null,
329 new AbstractMap.SimpleEntry<>(counterKey.counterTypeKey.toString(),
330 new AtomicLong(counterValue)), subscriberId));
331 });
Marcos Aurelio Carreroeaf02b82019-11-25 13:34:25 -0300332 }
333}