Aggregate stats from all cluster nodes and publish

Change-Id: Ic41cdcc8fc17845dabffc42b817b8bb7439a0b52
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
index 30f651f..179d6b6 100644
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
@@ -22,14 +22,25 @@
 import org.onlab.junit.TestUtils;
 import org.onlab.osgi.ComponentContextAdapter;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterServiceAdapter;
 import org.onosproject.cluster.LeadershipServiceAdapter;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.service.TestStorageService;
 import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
 
+import java.time.Duration;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 import static org.junit.Assert.assertEquals;
 
@@ -59,11 +70,16 @@
         dhcpL2Relay.storageService = new TestStorageService();
         dhcpL2Relay.leadershipService = new LeadershipServiceAdapter();
         TestUtils.setField(dhcpL2Relay, "eventDispatcher", new TestEventDispatcher());
-        dhcpL2Relay.activate(new ComponentContextAdapter());
         store = new SimpleDhcpL2RelayCountersStore();
+        store.storageService = new TestStorageService();
+        store.clusterService = new ClusterServiceAdapter();
+        store.leadershipService = new LeadershipServiceAdapter();
+        store.clusterCommunicationService = new TestClusterCommunicationService<>();
+        store.componentConfigService = mockConfigService;
         TestUtils.setField(store, "eventDispatcher", new TestEventDispatcher());
-        store.activate();
+        store.activate(new MockComponentContext());
         dhcpL2Relay.dhcpL2RelayCounters = this.store;
+        dhcpL2Relay.activate(new ComponentContextAdapter());
     }
 
     /**
@@ -80,20 +96,20 @@
     @Test
     public void testInitCounter() {
         // Init the supported global counter
-        dhcpL2Relay.dhcpL2RelayCounters.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER);
+        store.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER, new DhcpL2RelayStatistics());
         // Init the supported counter for a specific subscriber
-        dhcpL2Relay.dhcpL2RelayCounters.initCounters(CLIENT_ID_1);
+        store.initCounters(CLIENT_ID_1, new DhcpL2RelayStatistics());
 
-        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
-        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+        Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
+        for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
              it.hasNext();) {
-            DhcpL2RelayCounters counterType = it.next();
+            DhcpL2RelayCounterNames counterType = it.next();
             long globalCounterValue = countersMap.get(new DhcpL2RelayCountersIdentifier(
-                    DhcpL2RelayEvent.GLOBAL_COUNTER, counterType)).longValue();
+                    DhcpL2RelayEvent.GLOBAL_COUNTER, counterType));
             long perSubscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                    counterType)).longValue();
-            assertEquals((long) 0, globalCounterValue);
-            assertEquals((long) 0, perSubscriberValue);
+                    counterType));
+            assertEquals(0, globalCounterValue);
+            assertEquals(0, perSubscriberValue);
         }
     }
 
@@ -104,27 +120,27 @@
     @Test
     public void testIncrementCounter() {
         // Init the supported global counter
-        dhcpL2Relay.dhcpL2RelayCounters.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER);
+        store.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER, new DhcpL2RelayStatistics());
 
-        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+        for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
              it.hasNext();) {
-            DhcpL2RelayCounters counterType = it.next();
+            DhcpL2RelayCounterNames counterType = it.next();
             // Increment of existing supported global counter
             dhcpL2Relay.dhcpL2RelayCounters.incrementCounter(DhcpL2RelayEvent.GLOBAL_COUNTER, counterType);
             // Add of a Subscriber entry that is not already in the set
             dhcpL2Relay.dhcpL2RelayCounters.incrementCounter(CLIENT_ID_1, counterType);
         }
 
-        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
-        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+        Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
+        for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
              it.hasNext();) {
-            DhcpL2RelayCounters counterType = it.next();
+            DhcpL2RelayCounterNames counterType = it.next();
             long globalCounterValue = countersMap.get(new DhcpL2RelayCountersIdentifier(
-                    DhcpL2RelayEvent.GLOBAL_COUNTER, counterType)).longValue();
+                    DhcpL2RelayEvent.GLOBAL_COUNTER, counterType));
             long perSubscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                    counterType)).longValue();
-            assertEquals((long) 1, globalCounterValue);
-            assertEquals((long) 1, perSubscriberValue);
+                    counterType));
+            assertEquals(1, globalCounterValue);
+            assertEquals(1, perSubscriberValue);
         }
     }
 
@@ -133,12 +149,12 @@
      */
     @Test
     public void testIncrementAndResetCounter() {
-        DhcpL2RelayCounters counterType;
+        DhcpL2RelayCounterNames counterType;
         long subscriberValue;
-        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap;
+        Map<DhcpL2RelayCountersIdentifier, Long> countersMap;
 
         // First start incrementing the counter of a specific subscriber
-        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+        for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
              it.hasNext();) {
             counterType = it.next();
             // Insert of a Subscriber entry that is not already in the set
@@ -146,24 +162,24 @@
         }
 
         // Make sure that the counter is incremented
-        countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
-        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+        countersMap = store.getCountersMap();
+        for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
              it.hasNext();) {
             counterType = it.next();
             subscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                    counterType)).longValue();
-            assertEquals((long) 1, subscriberValue);
+                    counterType));
+            assertEquals(1, subscriberValue);
         }
 
         // Reset the counter
         dhcpL2Relay.dhcpL2RelayCounters.resetCounters(CLIENT_ID_1);
-        countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
-        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+        countersMap = store.getCountersMap();
+        for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
              it.hasNext();) {
             counterType = it.next();
             subscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                    counterType)).longValue();
-            assertEquals((long) 0, subscriberValue);
+                    counterType));
+            assertEquals(0, subscriberValue);
         }
     }
 
@@ -173,13 +189,84 @@
      */
     @Test
     public void testInsertOrUpdateCounter() {
-        dhcpL2Relay.dhcpL2RelayCounters.setCounter(CLIENT_ID_1, DhcpL2RelayCounters.valueOf("DHCPDISCOVER"), (long) 50);
+        dhcpL2Relay.dhcpL2RelayCounters.setCounter(CLIENT_ID_1,
+                DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER"), (long) 50);
 
-        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+        Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
         long subscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(
-                CLIENT_ID_1, DhcpL2RelayCounters.valueOf("DHCPDISCOVER"))).longValue();
+                CLIENT_ID_1, DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER")));
 
-        assertEquals((long) 50, subscriberValue);
+        assertEquals(50, subscriberValue);
+    }
+
+    public class TestClusterCommunicationService<M> implements ClusterCommunicationService {
+
+        private Consumer handler;
+
+        @Override
+        public void addSubscriber(MessageSubject subject,
+                                  ClusterMessageHandler subscriber, ExecutorService executor) {
+
+        }
+
+        @Override
+        public <M> void broadcast(M message, MessageSubject subject, Function<M, byte[]> encoder) {
+
+        }
+
+        @Override
+        public <M> void broadcastIncludeSelf(M message, MessageSubject subject, Function<M, byte[]> encoder) {
+            handler.accept(message);
+        }
+
+        @Override
+        public <M> CompletableFuture<Void> unicast(M message, MessageSubject subject,
+                                                   Function<M, byte[]> encoder, NodeId toNodeId) {
+            return null;
+        }
+
+        @Override
+        public <M> void multicast(M message, MessageSubject subject,
+                                  Function<M, byte[]> encoder, Set<NodeId> nodeIds) {
+
+        }
+
+        @Override
+        public <M, R> CompletableFuture<R> sendAndReceive(M message, MessageSubject subject,
+                                                          Function<M, byte[]> encoder,
+                                                          Function<byte[], R> decoder, NodeId toNodeId) {
+            return null;
+        }
+
+        @Override
+        public <M, R> CompletableFuture<R> sendAndReceive(M message, MessageSubject subject,
+                                                          Function<M, byte[]> encoder, Function<byte[], R> decoder,
+                                                          NodeId toNodeId, Duration timeout) {
+            return null;
+        }
+
+        @Override
+        public <M, R> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+                                         Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
+
+        }
+
+        @Override
+        public <M, R> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+                                         Function<M, CompletableFuture<R>> handler, Function<R, byte[]> encoder) {
+
+        }
+
+        @Override
+        public <M> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+                                      Consumer<M> handler, Executor executor) {
+            this.handler = handler;
+        }
+
+        @Override
+        public void removeSubscriber(MessageSubject subject) {
+
+        }
     }
 
 }
\ No newline at end of file