Aggregate stats from all cluster nodes and publish
Change-Id: Ic41cdcc8fc17845dabffc42b817b8bb7439a0b52
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java
index d4cb8bd..e218088 100644
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java
@@ -66,6 +66,9 @@
dhcpL2Relay.mastershipService = new MockMastershipService();
dhcpL2Relay.storageService = new TestStorageService();
dhcpL2Relay.leadershipService = new LeadershipServiceAdapter();
+ SimpleDhcpL2RelayCountersStore store = new SimpleDhcpL2RelayCountersStore();
+ store.componentConfigService = mockConfigService;
+ dhcpL2Relay.dhcpL2RelayCounters = store;
TestUtils.setField(dhcpL2Relay, "eventDispatcher", new TestEventDispatcher());
dhcpL2Relay.activate(new ComponentContextAdapter());
}
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
index 30f651f..179d6b6 100644
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
@@ -22,14 +22,25 @@
import org.onlab.junit.TestUtils;
import org.onlab.osgi.ComponentContextAdapter;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterServiceAdapter;
import org.onosproject.cluster.LeadershipServiceAdapter;
+import org.onosproject.cluster.NodeId;
import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.service.TestStorageService;
import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
+import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
import static org.junit.Assert.assertEquals;
@@ -59,11 +70,16 @@
dhcpL2Relay.storageService = new TestStorageService();
dhcpL2Relay.leadershipService = new LeadershipServiceAdapter();
TestUtils.setField(dhcpL2Relay, "eventDispatcher", new TestEventDispatcher());
- dhcpL2Relay.activate(new ComponentContextAdapter());
store = new SimpleDhcpL2RelayCountersStore();
+ store.storageService = new TestStorageService();
+ store.clusterService = new ClusterServiceAdapter();
+ store.leadershipService = new LeadershipServiceAdapter();
+ store.clusterCommunicationService = new TestClusterCommunicationService<>();
+ store.componentConfigService = mockConfigService;
TestUtils.setField(store, "eventDispatcher", new TestEventDispatcher());
- store.activate();
+ store.activate(new MockComponentContext());
dhcpL2Relay.dhcpL2RelayCounters = this.store;
+ dhcpL2Relay.activate(new ComponentContextAdapter());
}
/**
@@ -80,20 +96,20 @@
@Test
public void testInitCounter() {
// Init the supported global counter
- dhcpL2Relay.dhcpL2RelayCounters.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER);
+ store.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER, new DhcpL2RelayStatistics());
// Init the supported counter for a specific subscriber
- dhcpL2Relay.dhcpL2RelayCounters.initCounters(CLIENT_ID_1);
+ store.initCounters(CLIENT_ID_1, new DhcpL2RelayStatistics());
- Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
- for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+ Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
+ for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
it.hasNext();) {
- DhcpL2RelayCounters counterType = it.next();
+ DhcpL2RelayCounterNames counterType = it.next();
long globalCounterValue = countersMap.get(new DhcpL2RelayCountersIdentifier(
- DhcpL2RelayEvent.GLOBAL_COUNTER, counterType)).longValue();
+ DhcpL2RelayEvent.GLOBAL_COUNTER, counterType));
long perSubscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
- counterType)).longValue();
- assertEquals((long) 0, globalCounterValue);
- assertEquals((long) 0, perSubscriberValue);
+ counterType));
+ assertEquals(0, globalCounterValue);
+ assertEquals(0, perSubscriberValue);
}
}
@@ -104,27 +120,27 @@
@Test
public void testIncrementCounter() {
// Init the supported global counter
- dhcpL2Relay.dhcpL2RelayCounters.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER);
+ store.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER, new DhcpL2RelayStatistics());
- for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+ for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
it.hasNext();) {
- DhcpL2RelayCounters counterType = it.next();
+ DhcpL2RelayCounterNames counterType = it.next();
// Increment of existing supported global counter
dhcpL2Relay.dhcpL2RelayCounters.incrementCounter(DhcpL2RelayEvent.GLOBAL_COUNTER, counterType);
// Add of a Subscriber entry that is not already in the set
dhcpL2Relay.dhcpL2RelayCounters.incrementCounter(CLIENT_ID_1, counterType);
}
- Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
- for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+ Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
+ for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
it.hasNext();) {
- DhcpL2RelayCounters counterType = it.next();
+ DhcpL2RelayCounterNames counterType = it.next();
long globalCounterValue = countersMap.get(new DhcpL2RelayCountersIdentifier(
- DhcpL2RelayEvent.GLOBAL_COUNTER, counterType)).longValue();
+ DhcpL2RelayEvent.GLOBAL_COUNTER, counterType));
long perSubscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
- counterType)).longValue();
- assertEquals((long) 1, globalCounterValue);
- assertEquals((long) 1, perSubscriberValue);
+ counterType));
+ assertEquals(1, globalCounterValue);
+ assertEquals(1, perSubscriberValue);
}
}
@@ -133,12 +149,12 @@
*/
@Test
public void testIncrementAndResetCounter() {
- DhcpL2RelayCounters counterType;
+ DhcpL2RelayCounterNames counterType;
long subscriberValue;
- Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap;
+ Map<DhcpL2RelayCountersIdentifier, Long> countersMap;
// First start incrementing the counter of a specific subscriber
- for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+ for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
it.hasNext();) {
counterType = it.next();
// Insert of a Subscriber entry that is not already in the set
@@ -146,24 +162,24 @@
}
// Make sure that the counter is incremented
- countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
- for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+ countersMap = store.getCountersMap();
+ for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
it.hasNext();) {
counterType = it.next();
subscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
- counterType)).longValue();
- assertEquals((long) 1, subscriberValue);
+ counterType));
+ assertEquals(1, subscriberValue);
}
// Reset the counter
dhcpL2Relay.dhcpL2RelayCounters.resetCounters(CLIENT_ID_1);
- countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
- for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+ countersMap = store.getCountersMap();
+ for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
it.hasNext();) {
counterType = it.next();
subscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
- counterType)).longValue();
- assertEquals((long) 0, subscriberValue);
+ counterType));
+ assertEquals(0, subscriberValue);
}
}
@@ -173,13 +189,84 @@
*/
@Test
public void testInsertOrUpdateCounter() {
- dhcpL2Relay.dhcpL2RelayCounters.setCounter(CLIENT_ID_1, DhcpL2RelayCounters.valueOf("DHCPDISCOVER"), (long) 50);
+ dhcpL2Relay.dhcpL2RelayCounters.setCounter(CLIENT_ID_1,
+ DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER"), (long) 50);
- Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+ Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
long subscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(
- CLIENT_ID_1, DhcpL2RelayCounters.valueOf("DHCPDISCOVER"))).longValue();
+ CLIENT_ID_1, DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER")));
- assertEquals((long) 50, subscriberValue);
+ assertEquals(50, subscriberValue);
+ }
+
+ public class TestClusterCommunicationService<M> implements ClusterCommunicationService {
+
+ private Consumer handler;
+
+ @Override
+ public void addSubscriber(MessageSubject subject,
+ ClusterMessageHandler subscriber, ExecutorService executor) {
+
+ }
+
+ @Override
+ public <M> void broadcast(M message, MessageSubject subject, Function<M, byte[]> encoder) {
+
+ }
+
+ @Override
+ public <M> void broadcastIncludeSelf(M message, MessageSubject subject, Function<M, byte[]> encoder) {
+ handler.accept(message);
+ }
+
+ @Override
+ public <M> CompletableFuture<Void> unicast(M message, MessageSubject subject,
+ Function<M, byte[]> encoder, NodeId toNodeId) {
+ return null;
+ }
+
+ @Override
+ public <M> void multicast(M message, MessageSubject subject,
+ Function<M, byte[]> encoder, Set<NodeId> nodeIds) {
+
+ }
+
+ @Override
+ public <M, R> CompletableFuture<R> sendAndReceive(M message, MessageSubject subject,
+ Function<M, byte[]> encoder,
+ Function<byte[], R> decoder, NodeId toNodeId) {
+ return null;
+ }
+
+ @Override
+ public <M, R> CompletableFuture<R> sendAndReceive(M message, MessageSubject subject,
+ Function<M, byte[]> encoder, Function<byte[], R> decoder,
+ NodeId toNodeId, Duration timeout) {
+ return null;
+ }
+
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+ Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
+
+ }
+
+ @Override
+ public <M, R> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+ Function<M, CompletableFuture<R>> handler, Function<R, byte[]> encoder) {
+
+ }
+
+ @Override
+ public <M> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+ Consumer<M> handler, Executor executor) {
+ this.handler = handler;
+ }
+
+ @Override
+ public void removeSubscriber(MessageSubject subject) {
+
+ }
}
}
\ No newline at end of file
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
index 42664cc..4106ef1 100755
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
@@ -16,7 +16,6 @@
package org.opencord.dhcpl2relay.impl;
import com.google.common.collect.Lists;
-import org.easymock.EasyMock;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -27,20 +26,21 @@
import org.onlab.packet.UDP;
import org.onlab.packet.dhcp.DhcpOption;
import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterServiceAdapter;
import org.onosproject.cluster.LeadershipServiceAdapter;
import org.onosproject.net.ConnectPoint;
import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
import org.onosproject.store.service.TestStorageService;
import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
+import org.opencord.dhcpl2relay.DhcpL2RelayStoreDelegate;
import org.opencord.dhcpl2relay.impl.packet.DhcpOption82;
import java.nio.ByteBuffer;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
+import static org.easymock.EasyMock.createMock;
import static org.junit.Assert.assertEquals;
public class DhcpL2RelayTest extends DhcpL2RelayTestBase {
@@ -49,7 +49,7 @@
private SimpleDhcpL2RelayCountersStore store;
ComponentConfigService mockConfigService =
- EasyMock.createMock(ComponentConfigService.class);
+ createMock(ComponentConfigService.class);
/**
* Sets up the services required by the dhcpl2relay app.
@@ -73,8 +73,13 @@
dhcpL2Relay.refreshService = new MockExecutor(dhcpL2Relay.refreshService);
dhcpL2Relay.activate(new DhcpL2RelayTestBase.MockComponentContext());
store = new SimpleDhcpL2RelayCountersStore();
+ store.storageService = new TestStorageService();
+ store.leadershipService = new LeadershipServiceAdapter();
+ store.clusterService = new ClusterServiceAdapter();
+ store.clusterCommunicationService = new ClusterCommunicationServiceAdapter();
+ store.componentConfigService = mockConfigService;
TestUtils.setField(store, "eventDispatcher", new TestEventDispatcher());
- store.activate();
+ store.activate(new MockComponentContext());
dhcpL2Relay.dhcpL2RelayCounters = this.store;
}
@@ -207,20 +212,20 @@
sendPacket(requestPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
sendPacket(ackPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
- Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+ Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
discoveryValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
- DhcpL2RelayCounters.valueOf("DHCPDISCOVER"))).longValue();
+ DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER")));
offerValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
- DhcpL2RelayCounters.valueOf("DHCPOFFER"))).longValue();
+ DhcpL2RelayCounterNames.valueOf("DHCPOFFER")));
requestValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
- DhcpL2RelayCounters.valueOf("DHCPREQUEST"))).longValue();
+ DhcpL2RelayCounterNames.valueOf("DHCPREQUEST")));
ackValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
- DhcpL2RelayCounters.valueOf("DHCPACK"))).longValue();
+ DhcpL2RelayCounterNames.valueOf("DHCPACK")));
- assertEquals((long) 1, discoveryValue);
- assertEquals((long) 1, offerValue);
- assertEquals((long) 1, requestValue);
- assertEquals((long) 1, ackValue);
+ assertEquals(1, discoveryValue);
+ assertEquals(1, offerValue);
+ assertEquals(1, requestValue);
+ assertEquals(1, ackValue);
}
/**
@@ -246,32 +251,20 @@
sendPacket(requestPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
sendPacket(ackPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
- Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+ Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
discoveryValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
- DhcpL2RelayCounters.valueOf("DHCPDISCOVER"))).longValue();
+ DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER")));
offerValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
- DhcpL2RelayCounters.valueOf("DHCPOFFER"))).longValue();
+ DhcpL2RelayCounterNames.valueOf("DHCPOFFER")));
requestValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
- DhcpL2RelayCounters.valueOf("DHCPREQUEST"))).longValue();
+ DhcpL2RelayCounterNames.valueOf("DHCPREQUEST")));
ackValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
- DhcpL2RelayCounters.valueOf("DHCPACK"))).longValue();
+ DhcpL2RelayCounterNames.valueOf("DHCPACK")));
- assertEquals((long) 1, discoveryValue);
- assertEquals((long) 1, offerValue);
- assertEquals((long) 1, requestValue);
- assertEquals((long) 1, ackValue);
- }
-
- /**
- * Tests the schedule function to publish the counters to kafka.
- *
- */
- @Test
- public void testSchedulePublishCountersToKafka() {
- MockExecutor executor = new MockExecutor(dhcpL2Relay.refreshService);
- dhcpL2Relay.refreshTask = executor.scheduleWithFixedDelay(
- dhcpL2Relay.publishCountersToKafka, 0, 10, TimeUnit.SECONDS);
- executor.assertLastMethodCalled("scheduleWithFixedDelay", 0, 10, TimeUnit.SECONDS);
+ assertEquals(1, discoveryValue);
+ assertEquals(1, offerValue);
+ assertEquals(1, requestValue);
+ assertEquals(1, ackValue);
}
public void compareClientPackets(Ethernet sent, Ethernet relayed) {
@@ -317,29 +310,38 @@
}
private class MockDhcpL2RelayCountersStore implements DhcpL2RelayCountersStore {
+
@Override
- public void initCounters(String counterClass) {
+ public void incrementCounter(String counterClass, DhcpL2RelayCounterNames counterType) {
}
@Override
- public void incrementCounter(String counterClass, DhcpL2RelayCounters counterType) {
+ public void setCounter(String counterClass, DhcpL2RelayCounterNames counterType, Long value) {
}
@Override
- public void setCounter(String counterClass, DhcpL2RelayCounters counterType, Long value) {
-
- }
-
- @Override
- public Map<DhcpL2RelayCountersIdentifier, AtomicLong> getCountersMap() {
- return new HashMap<>();
+ public DhcpL2RelayStatistics getCounters() {
+ return new DhcpL2RelayStatistics();
}
@Override
public void resetCounters(String counterClass) {
}
+
+ @Override
+ public void setDelegate(DhcpL2RelayStoreDelegate delegate) {
+ }
+
+ @Override
+ public void unsetDelegate(DhcpL2RelayStoreDelegate delegate) {
+ }
+
+ @Override
+ public boolean hasDelegate() {
+ return false;
+ }
}
}
\ No newline at end of file