Aggregate stats from all cluster nodes and publish
Change-Id: Ic41cdcc8fc17845dabffc42b817b8bb7439a0b52
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/SimpleDhcpL2RelayCountersStore.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/SimpleDhcpL2RelayCountersStore.java
index eedbace..f95867c 100644
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/SimpleDhcpL2RelayCountersStore.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/SimpleDhcpL2RelayCountersStore.java
@@ -15,57 +15,215 @@
*/
package org.opencord.dhcpl2relay.impl;
+import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
-import org.onosproject.core.ApplicationId;
-import org.onosproject.core.CoreService;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.SafeRecurringTask;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
+import org.opencord.dhcpl2relay.DhcpL2RelayStoreDelegate;
+import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.slf4j.Logger;
-import java.util.Iterator;
-import java.util.Map;
+import java.nio.charset.StandardCharsets;
+import java.util.AbstractMap;
+import java.util.Dictionary;
+import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
-import static org.slf4j.LoggerFactory.getLogger;
import static com.google.common.base.Preconditions.checkNotNull;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE_DEFAULT;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.SYNC_COUNTERS_RATE;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.SYNC_COUNTERS_RATE_DEFAULT;
+import static org.slf4j.LoggerFactory.getLogger;
/**
* DHCP Relay Agent Counters Manager Component.
*/
-@Component(immediate = true)
-public class SimpleDhcpL2RelayCountersStore implements DhcpL2RelayCountersStore {
- private ApplicationId appId;
+@Component(immediate = true,
+property = {
+ PUBLISH_COUNTERS_RATE + ":Integer=" + PUBLISH_COUNTERS_RATE_DEFAULT,
+ SYNC_COUNTERS_RATE + ":Integer=" + SYNC_COUNTERS_RATE_DEFAULT,
+}
+)
+public class SimpleDhcpL2RelayCountersStore extends AbstractStore<DhcpL2RelayEvent, DhcpL2RelayStoreDelegate>
+ implements DhcpL2RelayCountersStore {
+
+ private static final String DHCP_STATISTICS_LEADERSHIP = "dhcpl2relay-statistics";
+ private static final MessageSubject RESET_SUBJECT = new MessageSubject("dhcpl2relay-statistics-reset");
+
private final Logger log = getLogger(getClass());
- private Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap;
+ private ConcurrentMap<DhcpL2RelayCountersIdentifier, Long> countersMap;
+
+ private EventuallyConsistentMap<NodeId, DhcpL2RelayStatistics> statistics;
@Reference(cardinality = ReferenceCardinality.MANDATORY)
- protected CoreService coreService;
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected LeadershipService leadershipService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ComponentConfigService componentConfigService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterCommunicationService clusterCommunicationService;
+
+ protected int publishCountersRate = PUBLISH_COUNTERS_RATE_DEFAULT;
+ protected int syncCountersRate = SYNC_COUNTERS_RATE_DEFAULT;
+
+ KryoNamespace serializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(DhcpL2RelayStatistics.class)
+ .register(DhcpL2RelayCountersIdentifier.class)
+ .register(DhcpL2RelayCounterNames.class)
+ .register(ClusterMessage.class)
+ .register(MessageSubject.class)
+ .build();
+
+ private ScheduledExecutorService executor;
+
+ private ScheduledFuture<?> publisherTask;
+ private ScheduledFuture<?> syncTask;
+
+ private AtomicBoolean dirty = new AtomicBoolean(true);
@Activate
- public void activate() {
+ public void activate(ComponentContext context) {
log.info("Activate Dhcp L2 Counters Manager");
- //appId = coreService.getAppId(DhcpL2Relay.DHCP_L2RELAY_APP);
- countersMap = new ConcurrentHashMap();
+ countersMap = new ConcurrentHashMap<>();
+ componentConfigService.registerProperties(getClass());
+
+ modified(context);
+
+ statistics = storageService.<NodeId, DhcpL2RelayStatistics>eventuallyConsistentMapBuilder()
+ .withName("dhcpl2relay-statistics")
+ .withSerializer(serializer)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp())
+ .build();
+
// Initialize counter values for the global counters
- initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER);
+ initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER, statistics.get(clusterService.getLocalNode().id()));
+ syncStats();
+
+ leadershipService.runForLeadership(DHCP_STATISTICS_LEADERSHIP);
+
+ executor = Executors.newScheduledThreadPool(1);
+
+ clusterCommunicationService.addSubscriber(RESET_SUBJECT, Serializer.using(serializer)::decode,
+ this::resetLocal, executor);
+
+ startSyncTask();
+ startPublishTask();
}
- public ImmutableMap<DhcpL2RelayCountersIdentifier, AtomicLong> getCountersMap() {
+ @Deactivate
+ public void deactivate() {
+ clusterCommunicationService.removeSubscriber(RESET_SUBJECT);
+ leadershipService.withdraw(DHCP_STATISTICS_LEADERSHIP);
+
+ stopPublishTask();
+ stopSyncTask();
+ executor.shutdownNow();
+ componentConfigService.unregisterProperties(getClass(), false);
+ }
+
+ @Modified
+ public void modified(ComponentContext context) {
+ Dictionary<String, Object> properties = context.getProperties();
+
+ String s = Tools.get(properties, PUBLISH_COUNTERS_RATE);
+ int oldPublishCountersRate = publishCountersRate;
+ publishCountersRate = Strings.isNullOrEmpty(s) ? PUBLISH_COUNTERS_RATE_DEFAULT
+ : Integer.parseInt(s.trim());
+ if (oldPublishCountersRate != publishCountersRate) {
+ stopPublishTask();
+ startPublishTask();
+ }
+
+ s = Tools.get(properties, SYNC_COUNTERS_RATE);
+ int oldSyncCountersRate = syncCountersRate;
+ syncCountersRate = Strings.isNullOrEmpty(s) ? SYNC_COUNTERS_RATE_DEFAULT
+ : Integer.parseInt(s.trim());
+ if (oldSyncCountersRate != syncCountersRate) {
+ stopSyncTask();
+ startSyncTask();
+ }
+ }
+
+ private ScheduledFuture<?> startTask(Runnable r, int rate) {
+ return executor.scheduleAtFixedRate(SafeRecurringTask.wrap(r),
+ 0, rate, TimeUnit.SECONDS);
+ }
+
+ private void stopTask(ScheduledFuture<?> task) {
+ task.cancel(true);
+ }
+
+ private void startSyncTask() {
+ syncTask = startTask(this::syncStats, syncCountersRate);
+ }
+
+ private void stopSyncTask() {
+ stopTask(syncTask);
+ }
+
+ private void startPublishTask() {
+ publisherTask = startTask(this::publishStats, publishCountersRate);
+ }
+
+ private void stopPublishTask() {
+ stopTask(publisherTask);
+ }
+
+ ImmutableMap<DhcpL2RelayCountersIdentifier, Long> getCountersMap() {
return ImmutableMap.copyOf(countersMap);
}
+ public DhcpL2RelayStatistics getCounters() {
+ return aggregate();
+ }
+
/**
* Initialize the supported counters map for the given counter class.
* @param counterClass class of counters (global, per subscriber)
+ * @param existingStats existing values to initialize the counters to
*/
- public void initCounters(String counterClass) {
+ public void initCounters(String counterClass, DhcpL2RelayStatistics existingStats) {
checkNotNull(counterClass, "counter class can't be null");
- for (DhcpL2RelayCounters counterType : DhcpL2RelayCounters.SUPPORTED_COUNTERS) {
- countersMap.put(new DhcpL2RelayCountersIdentifier(counterClass, counterType), new AtomicLong(0));
+ for (DhcpL2RelayCounterNames counterType : DhcpL2RelayCounterNames.SUPPORTED_COUNTERS) {
+ DhcpL2RelayCountersIdentifier id = new DhcpL2RelayCountersIdentifier(counterClass, counterType);
+ countersMap.put(id, existingStats == null ? 0L : existingStats.get(id));
}
}
@@ -74,17 +232,17 @@
* @param counterClass class of counters (global, per subscriber)
* @param counterType name of counter
*/
- public void incrementCounter(String counterClass, DhcpL2RelayCounters counterType) {
+ public void incrementCounter(String counterClass, DhcpL2RelayCounterNames counterType) {
checkNotNull(counterClass, "counter class can't be null");
- if (DhcpL2RelayCounters.SUPPORTED_COUNTERS.contains(counterType)) {
+ if (DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.contains(counterType)) {
DhcpL2RelayCountersIdentifier counterIdentifier =
new DhcpL2RelayCountersIdentifier(counterClass, counterType);
countersMap.compute(counterIdentifier, (key, counterValue) ->
- (counterValue != null) ? new AtomicLong(counterValue.incrementAndGet()) : new AtomicLong(1)
- );
+ (counterValue != null) ? counterValue + 1 : 1L);
} else {
log.error("Failed to increment counter class {} of type {}", counterClass, counterType);
}
+ dirty.set(true);
}
/**
@@ -92,31 +250,84 @@
* @param counterClass class of counters (global, per subscriber)
*/
public void resetCounters(String counterClass) {
+ byte[] payload = counterClass.getBytes(StandardCharsets.UTF_8);
+ ClusterMessage reset = new ClusterMessage(clusterService.getLocalNode().id(), RESET_SUBJECT, payload);
+ clusterCommunicationService.broadcastIncludeSelf(reset, RESET_SUBJECT, Serializer.using(serializer)::encode);
+ }
+
+ private void resetLocal(ClusterMessage m) {
+ String counterClass = new String(m.payload(), StandardCharsets.UTF_8);
+
checkNotNull(counterClass, "counter class can't be null");
- for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator(); it.hasNext();) {
- DhcpL2RelayCounters counterType = it.next();
+ for (DhcpL2RelayCounterNames counterType : DhcpL2RelayCounterNames.SUPPORTED_COUNTERS) {
DhcpL2RelayCountersIdentifier counterIdentifier =
new DhcpL2RelayCountersIdentifier(counterClass, counterType);
- countersMap.computeIfPresent(counterIdentifier, (key, counterValue) ->
- new AtomicLong(0)
- );
+ countersMap.computeIfPresent(counterIdentifier, (key, counterValue) -> 0L);
}
+ dirty.set(true);
+ syncStats();
}
/**
* Inserts the counter entry if it is not already in the set otherwise update the existing counter entry.
* @param counterClass class of counters (global, per subscriber).
* @param counterType name of counter
- * @param value conter value
+ * @param value counter value
*/
- public void setCounter(String counterClass, DhcpL2RelayCounters counterType, Long value) {
+ public void setCounter(String counterClass, DhcpL2RelayCounterNames counterType, Long value) {
checkNotNull(counterClass, "counter class can't be null");
- if (DhcpL2RelayCounters.SUPPORTED_COUNTERS.contains(counterType)) {
+ if (DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.contains(counterType)) {
DhcpL2RelayCountersIdentifier counterIdentifier =
new DhcpL2RelayCountersIdentifier(counterClass, counterType);
- countersMap.put(counterIdentifier, new AtomicLong(value));
+ countersMap.put(counterIdentifier, value);
} else {
log.error("Failed to increment counter class {} of type {}", counterClass, counterType);
}
+ dirty.set(true);
+ syncStats();
+ }
+
+ private DhcpL2RelayStatistics aggregate() {
+ return statistics.values().stream()
+ .reduce(new DhcpL2RelayStatistics(), DhcpL2RelayStatistics::add);
+ }
+
+ /**
+ * Creates a snapshot of the current in-memory statistics.
+ *
+ * @return snapshot of statistics
+ */
+ private DhcpL2RelayStatistics snapshot() {
+ return DhcpL2RelayStatistics.withCounters(countersMap);
+ }
+
+ /**
+ * Syncs in-memory stats to the eventually consistent map.
+ */
+ private void syncStats() {
+ if (dirty.get()) {
+ statistics.put(clusterService.getLocalNode().id(), snapshot());
+ dirty.set(false);
+ }
+ }
+
+ private void publishStats() {
+ // Only publish events if we are the cluster leader for DHCP L2 relay stats
+ if (!Objects.equals(leadershipService.getLeader(DHCP_STATISTICS_LEADERSHIP),
+ clusterService.getLocalNode().id())) {
+ return;
+ }
+
+ aggregate().counters().forEach((counterKey, counterValue) -> {
+ // Subscriber-specific counters have the subscriber ID set
+ String subscriberId = null;
+ if (!counterKey.counterClassKey.equals(DhcpL2RelayEvent.GLOBAL_COUNTER)) {
+ subscriberId = counterKey.counterClassKey;
+ }
+
+ delegate.notify(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE, null, null,
+ new AbstractMap.SimpleEntry<>(counterKey.counterTypeKey.toString(),
+ new AtomicLong(counterValue)), subscriberId));
+ });
}
}