Aggregate stats from all cluster nodes and publish

Change-Id: Ic41cdcc8fc17845dabffc42b817b8bb7439a0b52
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java
index d4cb8bd..e218088 100644
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java
@@ -66,6 +66,9 @@
         dhcpL2Relay.mastershipService = new MockMastershipService();
         dhcpL2Relay.storageService = new TestStorageService();
         dhcpL2Relay.leadershipService = new LeadershipServiceAdapter();
+        SimpleDhcpL2RelayCountersStore store = new SimpleDhcpL2RelayCountersStore();
+        store.componentConfigService = mockConfigService;
+        dhcpL2Relay.dhcpL2RelayCounters = store;
         TestUtils.setField(dhcpL2Relay, "eventDispatcher", new TestEventDispatcher());
         dhcpL2Relay.activate(new ComponentContextAdapter());
     }
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
index 30f651f..179d6b6 100644
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
@@ -22,14 +22,25 @@
 import org.onlab.junit.TestUtils;
 import org.onlab.osgi.ComponentContextAdapter;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterServiceAdapter;
 import org.onosproject.cluster.LeadershipServiceAdapter;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.service.TestStorageService;
 import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
 
+import java.time.Duration;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 import static org.junit.Assert.assertEquals;
 
@@ -59,11 +70,16 @@
         dhcpL2Relay.storageService = new TestStorageService();
         dhcpL2Relay.leadershipService = new LeadershipServiceAdapter();
         TestUtils.setField(dhcpL2Relay, "eventDispatcher", new TestEventDispatcher());
-        dhcpL2Relay.activate(new ComponentContextAdapter());
         store = new SimpleDhcpL2RelayCountersStore();
+        store.storageService = new TestStorageService();
+        store.clusterService = new ClusterServiceAdapter();
+        store.leadershipService = new LeadershipServiceAdapter();
+        store.clusterCommunicationService = new TestClusterCommunicationService<>();
+        store.componentConfigService = mockConfigService;
         TestUtils.setField(store, "eventDispatcher", new TestEventDispatcher());
-        store.activate();
+        store.activate(new MockComponentContext());
         dhcpL2Relay.dhcpL2RelayCounters = this.store;
+        dhcpL2Relay.activate(new ComponentContextAdapter());
     }
 
     /**
@@ -80,20 +96,20 @@
     @Test
     public void testInitCounter() {
         // Init the supported global counter
-        dhcpL2Relay.dhcpL2RelayCounters.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER);
+        store.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER, new DhcpL2RelayStatistics());
         // Init the supported counter for a specific subscriber
-        dhcpL2Relay.dhcpL2RelayCounters.initCounters(CLIENT_ID_1);
+        store.initCounters(CLIENT_ID_1, new DhcpL2RelayStatistics());
 
-        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
-        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+        Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
+        for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
              it.hasNext();) {
-            DhcpL2RelayCounters counterType = it.next();
+            DhcpL2RelayCounterNames counterType = it.next();
             long globalCounterValue = countersMap.get(new DhcpL2RelayCountersIdentifier(
-                    DhcpL2RelayEvent.GLOBAL_COUNTER, counterType)).longValue();
+                    DhcpL2RelayEvent.GLOBAL_COUNTER, counterType));
             long perSubscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                    counterType)).longValue();
-            assertEquals((long) 0, globalCounterValue);
-            assertEquals((long) 0, perSubscriberValue);
+                    counterType));
+            assertEquals(0, globalCounterValue);
+            assertEquals(0, perSubscriberValue);
         }
     }
 
@@ -104,27 +120,27 @@
     @Test
     public void testIncrementCounter() {
         // Init the supported global counter
-        dhcpL2Relay.dhcpL2RelayCounters.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER);
+        store.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER, new DhcpL2RelayStatistics());
 
-        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+        for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
              it.hasNext();) {
-            DhcpL2RelayCounters counterType = it.next();
+            DhcpL2RelayCounterNames counterType = it.next();
             // Increment of existing supported global counter
             dhcpL2Relay.dhcpL2RelayCounters.incrementCounter(DhcpL2RelayEvent.GLOBAL_COUNTER, counterType);
             // Add of a Subscriber entry that is not already in the set
             dhcpL2Relay.dhcpL2RelayCounters.incrementCounter(CLIENT_ID_1, counterType);
         }
 
-        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
-        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+        Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
+        for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
              it.hasNext();) {
-            DhcpL2RelayCounters counterType = it.next();
+            DhcpL2RelayCounterNames counterType = it.next();
             long globalCounterValue = countersMap.get(new DhcpL2RelayCountersIdentifier(
-                    DhcpL2RelayEvent.GLOBAL_COUNTER, counterType)).longValue();
+                    DhcpL2RelayEvent.GLOBAL_COUNTER, counterType));
             long perSubscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                    counterType)).longValue();
-            assertEquals((long) 1, globalCounterValue);
-            assertEquals((long) 1, perSubscriberValue);
+                    counterType));
+            assertEquals(1, globalCounterValue);
+            assertEquals(1, perSubscriberValue);
         }
     }
 
@@ -133,12 +149,12 @@
      */
     @Test
     public void testIncrementAndResetCounter() {
-        DhcpL2RelayCounters counterType;
+        DhcpL2RelayCounterNames counterType;
         long subscriberValue;
-        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap;
+        Map<DhcpL2RelayCountersIdentifier, Long> countersMap;
 
         // First start incrementing the counter of a specific subscriber
-        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+        for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
              it.hasNext();) {
             counterType = it.next();
             // Insert of a Subscriber entry that is not already in the set
@@ -146,24 +162,24 @@
         }
 
         // Make sure that the counter is incremented
-        countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
-        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+        countersMap = store.getCountersMap();
+        for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
              it.hasNext();) {
             counterType = it.next();
             subscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                    counterType)).longValue();
-            assertEquals((long) 1, subscriberValue);
+                    counterType));
+            assertEquals(1, subscriberValue);
         }
 
         // Reset the counter
         dhcpL2Relay.dhcpL2RelayCounters.resetCounters(CLIENT_ID_1);
-        countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
-        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+        countersMap = store.getCountersMap();
+        for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
              it.hasNext();) {
             counterType = it.next();
             subscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                    counterType)).longValue();
-            assertEquals((long) 0, subscriberValue);
+                    counterType));
+            assertEquals(0, subscriberValue);
         }
     }
 
@@ -173,13 +189,84 @@
      */
     @Test
     public void testInsertOrUpdateCounter() {
-        dhcpL2Relay.dhcpL2RelayCounters.setCounter(CLIENT_ID_1, DhcpL2RelayCounters.valueOf("DHCPDISCOVER"), (long) 50);
+        dhcpL2Relay.dhcpL2RelayCounters.setCounter(CLIENT_ID_1,
+                DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER"), (long) 50);
 
-        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+        Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
         long subscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(
-                CLIENT_ID_1, DhcpL2RelayCounters.valueOf("DHCPDISCOVER"))).longValue();
+                CLIENT_ID_1, DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER")));
 
-        assertEquals((long) 50, subscriberValue);
+        assertEquals(50, subscriberValue);
+    }
+
+    public class TestClusterCommunicationService<M> implements ClusterCommunicationService {
+
+        private Consumer handler;
+
+        @Override
+        public void addSubscriber(MessageSubject subject,
+                                  ClusterMessageHandler subscriber, ExecutorService executor) {
+
+        }
+
+        @Override
+        public <M> void broadcast(M message, MessageSubject subject, Function<M, byte[]> encoder) {
+
+        }
+
+        @Override
+        public <M> void broadcastIncludeSelf(M message, MessageSubject subject, Function<M, byte[]> encoder) {
+            handler.accept(message);
+        }
+
+        @Override
+        public <M> CompletableFuture<Void> unicast(M message, MessageSubject subject,
+                                                   Function<M, byte[]> encoder, NodeId toNodeId) {
+            return null;
+        }
+
+        @Override
+        public <M> void multicast(M message, MessageSubject subject,
+                                  Function<M, byte[]> encoder, Set<NodeId> nodeIds) {
+
+        }
+
+        @Override
+        public <M, R> CompletableFuture<R> sendAndReceive(M message, MessageSubject subject,
+                                                          Function<M, byte[]> encoder,
+                                                          Function<byte[], R> decoder, NodeId toNodeId) {
+            return null;
+        }
+
+        @Override
+        public <M, R> CompletableFuture<R> sendAndReceive(M message, MessageSubject subject,
+                                                          Function<M, byte[]> encoder, Function<byte[], R> decoder,
+                                                          NodeId toNodeId, Duration timeout) {
+            return null;
+        }
+
+        @Override
+        public <M, R> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+                                         Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
+
+        }
+
+        @Override
+        public <M, R> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+                                         Function<M, CompletableFuture<R>> handler, Function<R, byte[]> encoder) {
+
+        }
+
+        @Override
+        public <M> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+                                      Consumer<M> handler, Executor executor) {
+            this.handler = handler;
+        }
+
+        @Override
+        public void removeSubscriber(MessageSubject subject) {
+
+        }
     }
 
 }
\ No newline at end of file
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
index 42664cc..4106ef1 100755
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
@@ -16,7 +16,6 @@
 package org.opencord.dhcpl2relay.impl;
 
 import com.google.common.collect.Lists;
-import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -27,20 +26,21 @@
 import org.onlab.packet.UDP;
 import org.onlab.packet.dhcp.DhcpOption;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterServiceAdapter;
 import org.onosproject.cluster.LeadershipServiceAdapter;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
 import org.onosproject.store.service.TestStorageService;
 import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
+import org.opencord.dhcpl2relay.DhcpL2RelayStoreDelegate;
 import org.opencord.dhcpl2relay.impl.packet.DhcpOption82;
 
 import java.nio.ByteBuffer;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
+import static org.easymock.EasyMock.createMock;
 import static org.junit.Assert.assertEquals;
 
 public class DhcpL2RelayTest extends DhcpL2RelayTestBase {
@@ -49,7 +49,7 @@
     private SimpleDhcpL2RelayCountersStore store;
 
     ComponentConfigService mockConfigService =
-            EasyMock.createMock(ComponentConfigService.class);
+            createMock(ComponentConfigService.class);
 
     /**
      * Sets up the services required by the dhcpl2relay app.
@@ -73,8 +73,13 @@
         dhcpL2Relay.refreshService = new MockExecutor(dhcpL2Relay.refreshService);
         dhcpL2Relay.activate(new DhcpL2RelayTestBase.MockComponentContext());
         store = new SimpleDhcpL2RelayCountersStore();
+        store.storageService = new TestStorageService();
+        store.leadershipService = new LeadershipServiceAdapter();
+        store.clusterService = new ClusterServiceAdapter();
+        store.clusterCommunicationService = new ClusterCommunicationServiceAdapter();
+        store.componentConfigService = mockConfigService;
         TestUtils.setField(store, "eventDispatcher", new TestEventDispatcher());
-        store.activate();
+        store.activate(new MockComponentContext());
         dhcpL2Relay.dhcpL2RelayCounters = this.store;
     }
 
@@ -207,20 +212,20 @@
         sendPacket(requestPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
         sendPacket(ackPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
 
-        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+        Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
         discoveryValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
-                DhcpL2RelayCounters.valueOf("DHCPDISCOVER"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER")));
         offerValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
-                DhcpL2RelayCounters.valueOf("DHCPOFFER"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPOFFER")));
         requestValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
-                DhcpL2RelayCounters.valueOf("DHCPREQUEST"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPREQUEST")));
         ackValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
-                DhcpL2RelayCounters.valueOf("DHCPACK"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPACK")));
 
-        assertEquals((long) 1, discoveryValue);
-        assertEquals((long) 1, offerValue);
-        assertEquals((long) 1, requestValue);
-        assertEquals((long) 1, ackValue);
+        assertEquals(1, discoveryValue);
+        assertEquals(1, offerValue);
+        assertEquals(1, requestValue);
+        assertEquals(1, ackValue);
     }
 
     /**
@@ -246,32 +251,20 @@
         sendPacket(requestPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
         sendPacket(ackPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
 
-        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+        Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
         discoveryValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                DhcpL2RelayCounters.valueOf("DHCPDISCOVER"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER")));
         offerValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                DhcpL2RelayCounters.valueOf("DHCPOFFER"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPOFFER")));
         requestValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                DhcpL2RelayCounters.valueOf("DHCPREQUEST"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPREQUEST")));
         ackValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                DhcpL2RelayCounters.valueOf("DHCPACK"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPACK")));
 
-        assertEquals((long) 1, discoveryValue);
-        assertEquals((long) 1, offerValue);
-        assertEquals((long) 1, requestValue);
-        assertEquals((long) 1, ackValue);
-    }
-
-    /**
-     * Tests the schedule function to publish the counters to kafka.
-     *
-     */
-    @Test
-    public void testSchedulePublishCountersToKafka() {
-        MockExecutor executor = new MockExecutor(dhcpL2Relay.refreshService);
-        dhcpL2Relay.refreshTask = executor.scheduleWithFixedDelay(
-                dhcpL2Relay.publishCountersToKafka, 0, 10, TimeUnit.SECONDS);
-        executor.assertLastMethodCalled("scheduleWithFixedDelay", 0, 10, TimeUnit.SECONDS);
+        assertEquals(1, discoveryValue);
+        assertEquals(1, offerValue);
+        assertEquals(1, requestValue);
+        assertEquals(1, ackValue);
     }
 
     public void compareClientPackets(Ethernet sent, Ethernet relayed) {
@@ -317,29 +310,38 @@
     }
 
     private class MockDhcpL2RelayCountersStore implements DhcpL2RelayCountersStore {
+
         @Override
-        public void initCounters(String counterClass) {
+        public void incrementCounter(String counterClass, DhcpL2RelayCounterNames counterType) {
 
         }
 
         @Override
-        public void incrementCounter(String counterClass, DhcpL2RelayCounters counterType) {
+        public void setCounter(String counterClass, DhcpL2RelayCounterNames counterType, Long value) {
 
         }
 
         @Override
-        public void setCounter(String counterClass, DhcpL2RelayCounters counterType, Long value) {
-
-        }
-
-        @Override
-        public Map<DhcpL2RelayCountersIdentifier, AtomicLong> getCountersMap() {
-            return new HashMap<>();
+        public DhcpL2RelayStatistics getCounters() {
+            return new DhcpL2RelayStatistics();
         }
 
         @Override
         public void resetCounters(String counterClass) {
 
         }
+
+        @Override
+        public void setDelegate(DhcpL2RelayStoreDelegate delegate) {
+        }
+
+        @Override
+        public void unsetDelegate(DhcpL2RelayStoreDelegate delegate) {
+        }
+
+        @Override
+        public boolean hasDelegate() {
+            return false;
+        }
     }
 }
\ No newline at end of file