Aggregate stats from all cluster nodes and publish
Change-Id: Ic41cdcc8fc17845dabffc42b817b8bb7439a0b52
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
index 30f651f..179d6b6 100644
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
@@ -22,14 +22,25 @@
import org.onlab.junit.TestUtils;
import org.onlab.osgi.ComponentContextAdapter;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterServiceAdapter;
import org.onosproject.cluster.LeadershipServiceAdapter;
+import org.onosproject.cluster.NodeId;
import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.service.TestStorageService;
import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
+import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
import static org.junit.Assert.assertEquals;
@@ -59,11 +70,16 @@
dhcpL2Relay.storageService = new TestStorageService();
dhcpL2Relay.leadershipService = new LeadershipServiceAdapter();
TestUtils.setField(dhcpL2Relay, "eventDispatcher", new TestEventDispatcher());
- dhcpL2Relay.activate(new ComponentContextAdapter());
store = new SimpleDhcpL2RelayCountersStore();
+ store.storageService = new TestStorageService();
+ store.clusterService = new ClusterServiceAdapter();
+ store.leadershipService = new LeadershipServiceAdapter();
+ store.clusterCommunicationService = new TestClusterCommunicationService<>();
+ store.componentConfigService = mockConfigService;
TestUtils.setField(store, "eventDispatcher", new TestEventDispatcher());
- store.activate();
+ store.activate(new MockComponentContext());
dhcpL2Relay.dhcpL2RelayCounters = this.store;
+ dhcpL2Relay.activate(new ComponentContextAdapter());
}
/**
@@ -80,20 +96,20 @@
@Test
public void testInitCounter() {
// Init the supported global counter
- dhcpL2Relay.dhcpL2RelayCounters.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER);
+ store.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER, new DhcpL2RelayStatistics());
// Init the supported counter for a specific subscriber
- dhcpL2Relay.dhcpL2RelayCounters.initCounters(CLIENT_ID_1);
+ store.initCounters(CLIENT_ID_1, new DhcpL2RelayStatistics()); // each identifier gets its own new statistics instance
- Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
- for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+ Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap(); // values are Long now
+ for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
it.hasNext();) {
- DhcpL2RelayCounters counterType = it.next();
+ DhcpL2RelayCounterNames counterType = it.next();
long globalCounterValue = countersMap.get(new DhcpL2RelayCountersIdentifier(
- DhcpL2RelayEvent.GLOBAL_COUNTER, counterType)).longValue();
+ DhcpL2RelayEvent.GLOBAL_COUNTER, counterType)); // Long is auto-unboxed into the long local
long perSubscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
- counterType)).longValue();
- assertEquals((long) 0, globalCounterValue);
- assertEquals((long) 0, perSubscriberValue);
+ counterType)); // a missing key would surface here as an NPE while unboxing
+ assertEquals(0, globalCounterValue); // counters must read zero right after initCounters
+ assertEquals(0, perSubscriberValue); // int literal widens to the (long, long) overload
}
}
@@ -104,27 +120,27 @@
@Test
public void testIncrementCounter() {
// Init the supported global counter
- dhcpL2Relay.dhcpL2RelayCounters.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER);
+ store.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER, new DhcpL2RelayStatistics());
- for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+ for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
it.hasNext();) {
- DhcpL2RelayCounters counterType = it.next();
+ DhcpL2RelayCounterNames counterType = it.next(); // every supported counter is bumped exactly once
// Increment of existing supported global counter
dhcpL2Relay.dhcpL2RelayCounters.incrementCounter(DhcpL2RelayEvent.GLOBAL_COUNTER, counterType);
// Add of a Subscriber entry that is not already in the set
dhcpL2Relay.dhcpL2RelayCounters.incrementCounter(CLIENT_ID_1, counterType);
}
- Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
- for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+ Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap(); // read back via the store
+ for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
it.hasNext();) {
- DhcpL2RelayCounters counterType = it.next();
+ DhcpL2RelayCounterNames counterType = it.next();
long globalCounterValue = countersMap.get(new DhcpL2RelayCountersIdentifier(
- DhcpL2RelayEvent.GLOBAL_COUNTER, counterType)).longValue();
+ DhcpL2RelayEvent.GLOBAL_COUNTER, counterType)); // Long is auto-unboxed into the long local
long perSubscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
- counterType)).longValue();
- assertEquals((long) 1, globalCounterValue);
- assertEquals((long) 1, perSubscriberValue);
+ counterType)); // CLIENT_ID_1 was never initialised in this test; the increment must have created it
+ assertEquals(1, globalCounterValue); // exactly one increment was applied above
+ assertEquals(1, perSubscriberValue);
}
}
@@ -133,12 +149,12 @@
*/
@Test
public void testIncrementAndResetCounter() {
- DhcpL2RelayCounters counterType;
+ DhcpL2RelayCounterNames counterType;
long subscriberValue;
- Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap;
+ Map<DhcpL2RelayCountersIdentifier, Long> countersMap; // re-fetched from the store before each check
// First start incrementing the counter of a specific subscriber
- for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+ for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
it.hasNext();) {
counterType = it.next();
// Insert of a Subscriber entry that is not already in the set
@@ -146,24 +162,24 @@
}
// Make sure that the counter is incremented
- countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
- for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+ countersMap = store.getCountersMap();
+ for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
it.hasNext();) {
counterType = it.next();
subscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
- counterType)).longValue();
- assertEquals((long) 1, subscriberValue);
+ counterType)); // Long is auto-unboxed into the long local
+ assertEquals(1, subscriberValue); // value expected before the reset
}
// Reset the counter
dhcpL2Relay.dhcpL2RelayCounters.resetCounters(CLIENT_ID_1);
- countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
- for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+ countersMap = store.getCountersMap(); // fetched again after resetCounters
+ for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
it.hasNext();) {
counterType = it.next();
subscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
- counterType)).longValue();
- assertEquals((long) 0, subscriberValue);
+ counterType)); // the entry must still exist after the reset, otherwise unboxing fails
+ assertEquals(0, subscriberValue); // reset must bring every supported counter back to zero
}
}
@@ -173,13 +189,84 @@
*/
@Test
public void testInsertOrUpdateCounter() {
- dhcpL2Relay.dhcpL2RelayCounters.setCounter(CLIENT_ID_1, DhcpL2RelayCounters.valueOf("DHCPDISCOVER"), (long) 50);
+ dhcpL2Relay.dhcpL2RelayCounters.setCounter(CLIENT_ID_1,
+ DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER"), (long) 50); // set directly, no initCounters call in this test
- Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+ Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap(); // read back via the store
long subscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(
- CLIENT_ID_1, DhcpL2RelayCounters.valueOf("DHCPDISCOVER"))).longValue();
+ CLIENT_ID_1, DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER"))); // same key that was set above
- assertEquals((long) 50, subscriberValue);
+ assertEquals(50, subscriberValue); // the value written by setCounter must be readable back
+ }
+
+    /**
+     * Minimal in-memory {@link ClusterCommunicationService} used by the tests in this class.
+     * <p>
+     * Only two operations do any work: the {@link Consumer}-based {@code addSubscriber}
+     * overload remembers the consumer it is given (a later registration replaces an earlier
+     * one, whatever the subject), and {@code broadcastIncludeSelf} hands the message object
+     * straight to that consumer on the calling thread. Subjects, encoders, decoders and
+     * executors are ignored. Every other operation is a no-op; the future-returning ones
+     * yield an already-completed future holding {@code null}.
+     * <p>
+     * The class-level type parameter is unused: each generic method declares its own
+     * {@code M}, which shadows it.
+     */
+    public class TestClusterCommunicationService<M> implements ClusterCommunicationService {
+
+        // Consumer registered through the Consumer-based addSubscriber overload;
+        // stays null until a subscriber has been registered.
+        private Consumer<Object> handler;
+
+        /** No-op: handlers registered through this overload are never invoked. */
+        @Override
+        public void addSubscriber(MessageSubject subject,
+                                  ClusterMessageHandler subscriber, ExecutorService executor) {
+
+        }
+
+        /** No-op: there are no peer nodes to deliver to. */
+        @Override
+        public <M> void broadcast(M message, MessageSubject subject, Function<M, byte[]> encoder) {
+
+        }
+
+        /**
+         * Delivers {@code message} synchronously to the registered consumer, if any.
+         * The message is passed as-is; {@code encoder} is not applied.
+         */
+        @Override
+        public <M> void broadcastIncludeSelf(M message, MessageSubject subject, Function<M, byte[]> encoder) {
+            // A broadcast issued before any subscriber exists is dropped instead of
+            // failing with a NullPointerException.
+            if (handler != null) {
+                handler.accept(message);
+            }
+        }
+
+        /** No-op. Returns a completed future so callers can chain on the result. */
+        @Override
+        public <M> CompletableFuture<Void> unicast(M message, MessageSubject subject,
+                                                   Function<M, byte[]> encoder, NodeId toNodeId) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        /** No-op: there are no peer nodes to deliver to. */
+        @Override
+        public <M> void multicast(M message, MessageSubject subject,
+                                  Function<M, byte[]> encoder, Set<NodeId> nodeIds) {
+
+        }
+
+        /** No-op. Returns a future already completed with {@code null}; no reply is produced. */
+        @Override
+        public <M, R> CompletableFuture<R> sendAndReceive(M message, MessageSubject subject,
+                                                          Function<M, byte[]> encoder,
+                                                          Function<byte[], R> decoder, NodeId toNodeId) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        /** No-op. Returns a future already completed with {@code null}; {@code timeout} is ignored. */
+        @Override
+        public <M, R> CompletableFuture<R> sendAndReceive(M message, MessageSubject subject,
+                                                          Function<M, byte[]> encoder, Function<byte[], R> decoder,
+                                                          NodeId toNodeId, Duration timeout) {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        /** No-op: request/reply handlers are never invoked. */
+        @Override
+        public <M, R> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+                                         Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
+
+        }
+
+        /** No-op: asynchronous request/reply handlers are never invoked. */
+        @Override
+        public <M, R> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+                                         Function<M, CompletableFuture<R>> handler, Function<R, byte[]> encoder) {
+
+        }
+
+        /**
+         * Remembers {@code handler} as the target of later {@code broadcastIncludeSelf}
+         * calls, replacing any previously registered consumer.
+         */
+        @Override
+        public <M> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+                                      Consumer<M> handler, Executor executor) {
+            // The method-level M cannot be captured in a field, so the consumer is stored
+            // as Consumer<Object>. Runtime behaviour matches the previous raw-typed field:
+            // a message of the wrong type fails inside the consumer with a ClassCastException.
+            @SuppressWarnings("unchecked")
+            Consumer<Object> untyped = (Consumer<Object>) handler;
+            this.handler = untyped;
+        }
+
+        /** No-op: the remembered consumer is kept. */
+        @Override
+        public void removeSubscriber(MessageSubject subject) {
+
+        }
+    }
}
\ No newline at end of file