Aggregate stats from all cluster nodes and publish

Change-Id: Ic41cdcc8fc17845dabffc42b817b8bb7439a0b52
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
index 42664cc..4106ef1 100755
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
@@ -16,7 +16,6 @@
 package org.opencord.dhcpl2relay.impl;
 
 import com.google.common.collect.Lists;
-import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -27,20 +26,21 @@
 import org.onlab.packet.UDP;
 import org.onlab.packet.dhcp.DhcpOption;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterServiceAdapter;
 import org.onosproject.cluster.LeadershipServiceAdapter;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
 import org.onosproject.store.service.TestStorageService;
 import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
+import org.opencord.dhcpl2relay.DhcpL2RelayStoreDelegate;
 import org.opencord.dhcpl2relay.impl.packet.DhcpOption82;
 
 import java.nio.ByteBuffer;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
+import static org.easymock.EasyMock.createMock;
 import static org.junit.Assert.assertEquals;
 
 public class DhcpL2RelayTest extends DhcpL2RelayTestBase {
@@ -49,7 +49,7 @@
     private SimpleDhcpL2RelayCountersStore store;
 
     ComponentConfigService mockConfigService =
-            EasyMock.createMock(ComponentConfigService.class);
+            createMock(ComponentConfigService.class);
 
     /**
      * Sets up the services required by the dhcpl2relay app.
@@ -73,8 +73,13 @@
         dhcpL2Relay.refreshService = new MockExecutor(dhcpL2Relay.refreshService);
         dhcpL2Relay.activate(new DhcpL2RelayTestBase.MockComponentContext());
         store = new SimpleDhcpL2RelayCountersStore();
+        store.storageService = new TestStorageService();
+        store.leadershipService = new LeadershipServiceAdapter();
+        store.clusterService = new ClusterServiceAdapter();
+        store.clusterCommunicationService = new ClusterCommunicationServiceAdapter();
+        store.componentConfigService = mockConfigService;
         TestUtils.setField(store, "eventDispatcher", new TestEventDispatcher());
-        store.activate();
+        store.activate(new MockComponentContext());
         dhcpL2Relay.dhcpL2RelayCounters = this.store;
     }
 
@@ -207,20 +212,20 @@
         sendPacket(requestPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
         sendPacket(ackPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
 
-        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+        Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
         discoveryValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
-                DhcpL2RelayCounters.valueOf("DHCPDISCOVER"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER")));
         offerValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
-                DhcpL2RelayCounters.valueOf("DHCPOFFER"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPOFFER")));
         requestValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
-                DhcpL2RelayCounters.valueOf("DHCPREQUEST"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPREQUEST")));
         ackValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
-                DhcpL2RelayCounters.valueOf("DHCPACK"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPACK")));
 
-        assertEquals((long) 1, discoveryValue);
-        assertEquals((long) 1, offerValue);
-        assertEquals((long) 1, requestValue);
-        assertEquals((long) 1, ackValue);
+        assertEquals(1, discoveryValue);
+        assertEquals(1, offerValue);
+        assertEquals(1, requestValue);
+        assertEquals(1, ackValue);
     }
 
     /**
@@ -246,32 +251,20 @@
         sendPacket(requestPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
         sendPacket(ackPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
 
-        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+        Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
         discoveryValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                DhcpL2RelayCounters.valueOf("DHCPDISCOVER"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER")));
         offerValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                DhcpL2RelayCounters.valueOf("DHCPOFFER"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPOFFER")));
         requestValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                DhcpL2RelayCounters.valueOf("DHCPREQUEST"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPREQUEST")));
         ackValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                DhcpL2RelayCounters.valueOf("DHCPACK"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPACK")));
 
-        assertEquals((long) 1, discoveryValue);
-        assertEquals((long) 1, offerValue);
-        assertEquals((long) 1, requestValue);
-        assertEquals((long) 1, ackValue);
-    }
-
-    /**
-     * Tests the schedule function to publish the counters to kafka.
-     *
-     */
-    @Test
-    public void testSchedulePublishCountersToKafka() {
-        MockExecutor executor = new MockExecutor(dhcpL2Relay.refreshService);
-        dhcpL2Relay.refreshTask = executor.scheduleWithFixedDelay(
-                dhcpL2Relay.publishCountersToKafka, 0, 10, TimeUnit.SECONDS);
-        executor.assertLastMethodCalled("scheduleWithFixedDelay", 0, 10, TimeUnit.SECONDS);
+        assertEquals(1, discoveryValue);
+        assertEquals(1, offerValue);
+        assertEquals(1, requestValue);
+        assertEquals(1, ackValue);
     }
 
     public void compareClientPackets(Ethernet sent, Ethernet relayed) {
@@ -317,29 +310,38 @@
     }
 
     private class MockDhcpL2RelayCountersStore implements DhcpL2RelayCountersStore {
+
         @Override
-        public void initCounters(String counterClass) {
+        public void incrementCounter(String counterClass, DhcpL2RelayCounterNames counterType) {
 
         }
 
         @Override
-        public void incrementCounter(String counterClass, DhcpL2RelayCounters counterType) {
+        public void setCounter(String counterClass, DhcpL2RelayCounterNames counterType, Long value) {
 
         }
 
         @Override
-        public void setCounter(String counterClass, DhcpL2RelayCounters counterType, Long value) {
-
-        }
-
-        @Override
-        public Map<DhcpL2RelayCountersIdentifier, AtomicLong> getCountersMap() {
-            return new HashMap<>();
+        public DhcpL2RelayStatistics getCounters() {
+            return new DhcpL2RelayStatistics();
         }
 
         @Override
         public void resetCounters(String counterClass) {
 
         }
+
+        @Override
+        public void setDelegate(DhcpL2RelayStoreDelegate delegate) {
+        }
+
+        @Override
+        public void unsetDelegate(DhcpL2RelayStoreDelegate delegate) {
+        }
+
+        @Override
+        public boolean hasDelegate() {
+            return false;
+        }
     }
 }
\ No newline at end of file