Aggregate stats from all cluster nodes and publish

Change-Id: Ic41cdcc8fc17845dabffc42b817b8bb7439a0b52
diff --git a/api/src/main/java/org/opencord/dhcpl2relay/DhcpL2RelayEvent.java b/api/src/main/java/org/opencord/dhcpl2relay/DhcpL2RelayEvent.java
index 596fb6c..a143681 100644
--- a/api/src/main/java/org/opencord/dhcpl2relay/DhcpL2RelayEvent.java
+++ b/api/src/main/java/org/opencord/dhcpl2relay/DhcpL2RelayEvent.java
@@ -33,8 +33,6 @@
 
     private final Map.Entry<String, AtomicLong> countersEntry;
 
-    private final String dhcpCountersTopic;
-
     private final String subscriberId;
 
     /**
@@ -64,16 +62,13 @@
      * @param allocationInfo DHCP allocation info
      * @param connectPoint connect point the client is on
      * @param countersEntry an entry that represents the counters (used for STATS events)
-     * @param dhcpCountersTopic Kafka topic where the dhcp counters are published
      * @param subscriberId the subscriber identifier information
      */
     public DhcpL2RelayEvent(Type type, DhcpAllocationInfo allocationInfo, ConnectPoint connectPoint,
-                            Map.Entry<String, AtomicLong> countersEntry,
-                            String dhcpCountersTopic, String subscriberId) {
+                            Map.Entry<String, AtomicLong> countersEntry, String subscriberId) {
         super(type, allocationInfo);
         this.connectPoint = connectPoint;
         this.countersEntry = countersEntry;
-        this.dhcpCountersTopic = dhcpCountersTopic;
         this.subscriberId = subscriberId;
     }
 
@@ -88,7 +83,6 @@
         super(type, allocationInfo);
         this.connectPoint = connectPoint;
         this.countersEntry = null;
-        this.dhcpCountersTopic = null;
         this.subscriberId = null;
     }
 
@@ -111,15 +105,6 @@
     }
 
     /**
-     * Gets the Kafka topic where the dhcp counters are published.
-     *
-     * @return the dhcp kafka topic
-     */
-    public String getDhcpCountersTopic() {
-        return dhcpCountersTopic;
-    }
-
-    /**
      * Gets the subscriber identifier information.
      *
      * @return the Id from subscriber
diff --git a/api/src/main/java/org/opencord/dhcpl2relay/DhcpL2RelayStoreDelegate.java b/api/src/main/java/org/opencord/dhcpl2relay/DhcpL2RelayStoreDelegate.java
new file mode 100644
index 0000000..3ff6a49
--- /dev/null
+++ b/api/src/main/java/org/opencord/dhcpl2relay/DhcpL2RelayStoreDelegate.java
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ */
+
+package org.opencord.dhcpl2relay;
+
+import org.onosproject.store.StoreDelegate;
+
+/**
+ * Store delegate for DHCP L2 Relay store.
+ */
+public interface DhcpL2RelayStoreDelegate extends StoreDelegate<DhcpL2RelayEvent> {
+}
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/cli/DhcpL2RelayStatsCommand.java b/app/src/main/java/org/opencord/dhcpl2relay/cli/DhcpL2RelayStatsCommand.java
index 0f4901f..0335dff 100644
--- a/app/src/main/java/org/opencord/dhcpl2relay/cli/DhcpL2RelayStatsCommand.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/cli/DhcpL2RelayStatsCommand.java
@@ -19,17 +19,16 @@
 
 import org.apache.karaf.shell.api.action.Argument;
 import org.apache.karaf.shell.api.action.Command;
-import org.apache.karaf.shell.api.action.lifecycle.Service;
 import org.apache.karaf.shell.api.action.Option;
+import org.apache.karaf.shell.api.action.lifecycle.Service;
 import org.onosproject.cli.AbstractShellCommand;
 import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
-import org.opencord.dhcpl2relay.impl.DhcpL2RelayCounters;
+import org.opencord.dhcpl2relay.impl.DhcpL2RelayCounterNames;
 import org.opencord.dhcpl2relay.impl.DhcpL2RelayCountersIdentifier;
 import org.opencord.dhcpl2relay.impl.DhcpL2RelayCountersStore;
 
 import java.util.Collections;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * Display/Reset the DHCP L2 relay application statistics.
@@ -57,7 +56,7 @@
             description = "The counter to display (or reset). In case not specified, all counters\nwill be " +
                     "displayed (or reset in case the -r option is specified).",
             required = false, multiValued = false)
-    DhcpL2RelayCounters counter = null;
+    DhcpL2RelayCounterNames counter = null;
 
     @Override
     protected void doExecute() {
@@ -69,7 +68,7 @@
             subscriberId = DhcpL2RelayEvent.GLOBAL_COUNTER;
         }
 
-       if (reset) {
+        if (reset) {
             if (please == null || !please.equals(CONFIRM_PHRASE)) {
                 print("WARNING: Be aware that you are going to reset the counters. " +
                         "Enter confirmation phrase to continue.");
@@ -83,58 +82,57 @@
                 dhcpCounters.setCounter(subscriberId, counter, (long) 0);
             }
         } else {
-           Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpCounters.getCountersMap();
-           if (countersMap.size() > 0) {
-               if (counter == null) {
-                   String jsonString = "";
-                   if (outputJson()) {
-                       jsonString = String.format("{\"%s\":{", dhcpCounters.NAME);
-                   } else {
-                       print("%s [%s] :", dhcpCounters.NAME, subscriberId);
-                   }
-                   DhcpL2RelayCounters[] counters = DhcpL2RelayCounters.values();
-                   for (int i = 0; i < counters.length; i++) {
-                       DhcpL2RelayCounters counterType = counters[i];
-                       AtomicLong v = countersMap.get(new DhcpL2RelayCountersIdentifier(subscriberId, counterType));
-                       if (v == null) {
-                           v = new AtomicLong(0);
-                       }
-                       if (outputJson()) {
-                           jsonString += String.format("\"%s\":%d", counterType, v.longValue());
-                           if (i < counters.length - 1) {
-                               jsonString += ",";
-                           }
-                       } else {
-                           printCounter(counterType, v);
-                       }
-                   }
-                   if (outputJson()) {
-                       jsonString += "}}";
-                       print("%s", jsonString);
-                   }
-               } else {
-                   // Show only the specified counter
-                   AtomicLong v = countersMap.get(new DhcpL2RelayCountersIdentifier(subscriberId, counter));
-                   if (v == null) {
-                       v = new AtomicLong(0);
-                   }
-                   if (outputJson()) {
-                       print("{\"%s\":%d}", counter, v.longValue());
-                   } else {
-                       printCounter(counter, v);
-                   }
-               }
-           } else {
-               print("No DHCP L2 Relay Counters were created yet for counter class [%s]",
-                       DhcpL2RelayEvent.GLOBAL_COUNTER);
-           }
-       }
+            Map<DhcpL2RelayCountersIdentifier, Long> countersMap = dhcpCounters.getCounters().counters();
+            if (countersMap.size() > 0) {
+                if (counter == null) {
+                    String jsonString = "";
+                    if (outputJson()) {
+                        jsonString = String.format("{\"%s\":{", dhcpCounters.NAME);
+                    } else {
+                        print("%s [%s] :", dhcpCounters.NAME, subscriberId);
+                    }
+                    DhcpL2RelayCounterNames[] counters = DhcpL2RelayCounterNames.values();
+                    for (int i = 0; i < counters.length; i++) {
+                        DhcpL2RelayCounterNames counterType = counters[i];
+                        Long value = countersMap.get(new DhcpL2RelayCountersIdentifier(subscriberId, counterType));
+                        if (value == null) {
+                            value = 0L;
+                        }
+                        if (outputJson()) {
+                            jsonString += String.format("\"%s\":%d", counterType, value);
+                            if (i < counters.length - 1) {
+                                jsonString += ",";
+                            }
+                        } else {
+                            printCounter(counterType, value);
+                        }
+                    }
+                    if (outputJson()) {
+                        jsonString += "}}";
+                        print("%s", jsonString);
+                    }
+                } else {
+                    // Show only the specified counter
+                    Long value = countersMap.get(new DhcpL2RelayCountersIdentifier(subscriberId, counter));
+                    if (value == null) {
+                        value = 0L;
+                    }
+                    if (outputJson()) {
+                        print("{\"%s\":%d}", counter, value);
+                    } else {
+                        printCounter(counter, value);
+                    }
+                }
+            } else {
+                print("No DHCP L2 Relay Counters were created yet for counter class [%s]",
+                        DhcpL2RelayEvent.GLOBAL_COUNTER);
+            }
+        }
     }
 
-    void printCounter(DhcpL2RelayCounters c, AtomicLong a) {
+    private void printCounter(DhcpL2RelayCounterNames counterNames, long value) {
         // print in non-JSON format
-        print("  %s %s %-4d", c,
-                String.join("", Collections.nCopies(50 - c.toString().length(), ".")),
-                a.longValue());
+        print("  %s %s %-4d", counterNames,
+                String.join("", Collections.nCopies(50 - counterNames.toString().length(), ".")), value);
     }
 }
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
index 2d17579..c34d817 100755
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2Relay.java
@@ -30,7 +30,6 @@
 import org.onlab.packet.VlanId;
 import org.onlab.packet.dhcp.DhcpOption;
 import org.onlab.util.KryoNamespace;
-import org.onlab.util.SafeRecurringTask;
 import org.onlab.util.Tools;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cluster.ClusterService;
@@ -76,6 +75,7 @@
 import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
 import org.opencord.dhcpl2relay.DhcpL2RelayListener;
 import org.opencord.dhcpl2relay.DhcpL2RelayService;
+import org.opencord.dhcpl2relay.DhcpL2RelayStoreDelegate;
 import org.opencord.dhcpl2relay.impl.packet.DhcpOption82;
 import org.opencord.sadis.BaseInformationService;
 import org.opencord.sadis.SadisService;
@@ -94,7 +94,6 @@
 import java.io.ByteArrayOutputStream;
 import java.nio.ByteBuffer;
 import java.time.Instant;
-import java.util.AbstractMap;
 import java.util.ArrayList;
 import java.util.Dictionary;
 import java.util.List;
@@ -105,25 +104,17 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Predicate;
 import java.util.stream.Collectors;
 
 import static org.onlab.packet.DHCP.DHCPOptionCode.OptionCode_MessageType;
 import static org.onlab.packet.MacAddress.valueOf;
-import static org.onlab.util.Tools.get;
-import static org.onlab.util.Tools.getIntegerProperty;
 import static org.onosproject.net.config.basics.SubjectFactories.APP_SUBJECT_FACTORY;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.DHCP_COUNTERS_TOPIC;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.DHCP_COUNTERS_TOPIC_DEFAULT;
 import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.ENABLE_DHCP_BROADCAST_REPLIES;
 import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT;
 import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.OPTION_82;
 import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.OPTION_82_DEFAULT;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE;
-import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE_DEFAULT;
 
 /**
  * DHCP Relay Agent Application Component.
@@ -132,8 +123,6 @@
 property = {
         OPTION_82 + ":Boolean=" + OPTION_82_DEFAULT,
         ENABLE_DHCP_BROADCAST_REPLIES + ":Boolean=" + ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT,
-        PUBLISH_COUNTERS_RATE + ":Integer=" + PUBLISH_COUNTERS_RATE_DEFAULT,
-        DHCP_COUNTERS_TOPIC + ":String=" + DHCP_COUNTERS_TOPIC_DEFAULT
 })
 public class DhcpL2Relay
         extends AbstractListenerManager<DhcpL2RelayEvent, DhcpL2RelayListener>
@@ -203,12 +192,6 @@
     /** Ask the DHCP Server to send back replies as L2 broadcast. */
     protected boolean enableDhcpBroadcastReplies = ENABLE_DHCP_BROADCAST_REPLIES_DEFAULT;
 
-    protected int publishCountersRate = PUBLISH_COUNTERS_RATE_DEFAULT;
-    private String dhcpCountersTopic = DHCP_COUNTERS_TOPIC_DEFAULT;
-
-
-    protected PublishCountersToKafka publishCountersToKafka;
-
     ScheduledFuture<?> refreshTask;
     ScheduledExecutorService refreshService = Executors.newSingleThreadScheduledExecutor();
 
@@ -231,6 +214,8 @@
 
     private BaseInformationService<SubscriberAndDeviceInformation> subsService;
 
+    private DhcpL2RelayStoreDelegate delegate = new InnerDhcpL2RelayStoreDelegate();
+
     @Activate
     protected void activate(ComponentContext context) {
         //start the dhcp relay agent
@@ -256,6 +241,8 @@
 
         leadershipService.runForLeadership(LEADER_TOPIC);
 
+        dhcpL2RelayCounters.setDelegate(delegate);
+
         cfgService.addListener(cfgListener);
         mastershipService.addListener(changeListener);
         deviceService.addListener(deviceListener);
@@ -272,9 +259,6 @@
             modified(context);
         }
 
-        publishCountersToKafka = new PublishCountersToKafka();
-        restartPublishCountersTask();
-
         log.info("DHCP-L2-RELAY Started");
     }
 
@@ -286,6 +270,7 @@
         if (refreshService != null) {
             refreshService.shutdownNow();
         }
+        dhcpL2RelayCounters.unsetDelegate(delegate);
         cfgService.removeListener(cfgListener);
         factories.forEach(cfgService::unregisterConfigFactory);
         packetService.removeProcessor(dhcpRelayPacketProcessor);
@@ -313,41 +298,6 @@
         if (o != null) {
             enableDhcpBroadcastReplies = o;
         }
-
-        Integer newPublishCountersRate = getIntegerProperty(properties, "publishCountersRate");
-        if (newPublishCountersRate != null) {
-            if (newPublishCountersRate != publishCountersRate && newPublishCountersRate >= 0) {
-                log.info("publishCountersRate modified from {} to {}", publishCountersRate, newPublishCountersRate);
-                publishCountersRate = newPublishCountersRate;
-            } else if (newPublishCountersRate < 0) {
-                log.error("Invalid newPublishCountersRate : {}, defaulting to 0", newPublishCountersRate);
-                publishCountersRate = 0;
-            }
-            restartPublishCountersTask();
-        }
-
-        String newDhcpCountersTopic = get(properties, "dhcpCountersTopic");
-        if (newDhcpCountersTopic != null && !newDhcpCountersTopic.equals(dhcpCountersTopic)) {
-            log.info("Property dhcpCountersTopic modified from {} to {}", dhcpCountersTopic, newDhcpCountersTopic);
-            dhcpCountersTopic = newDhcpCountersTopic;
-        }
-    }
-
-    /**
-     * Starts a thread to publish the counters to kafka at a certain rate time.
-     */
-    private void restartPublishCountersTask() {
-        if (refreshTask != null) {
-            refreshTask.cancel(true);
-        }
-        if (publishCountersRate > 0) {
-            log.info("Refresh Rate set to {}, publishCountersToKafka will be called every {} seconds",
-                    publishCountersRate, publishCountersRate);
-            refreshTask = refreshService.scheduleWithFixedDelay(SafeRecurringTask.wrap(publishCountersToKafka),
-                    publishCountersRate, publishCountersRate, TimeUnit.SECONDS);
-        } else {
-            log.info("Refresh Rate set to 0, disabling calls to publishCountersToKafka");
-        }
     }
 
     @Override
@@ -356,27 +306,6 @@
     }
 
     /**
-     * Publish the counters to kafka.
-     */
-    private class PublishCountersToKafka implements Runnable {
-        public void run() {
-            dhcpL2RelayCounters.getCountersMap().forEach((counterKey, counterValue) -> {
-                // Publish the global counters
-                if (counterKey.counterClassKey.equals(DhcpL2RelayEvent.GLOBAL_COUNTER)) {
-                    post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE, null, null,
-                            new AbstractMap.SimpleEntry<String, AtomicLong>(counterKey.counterTypeKey.toString(),
-                                    counterValue), dhcpCountersTopic, null));
-                } else { // Publish the counters per subscriber
-                    DhcpAllocationInfo info = Versioned.valueOrNull(allocations.get(counterKey.counterClassKey));
-                    post(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE, info, null,
-                            new AbstractMap.SimpleEntry<String, AtomicLong>(counterKey.counterTypeKey.toString(),
-                                    counterValue), dhcpCountersTopic, counterKey.counterClassKey));
-                }
-            });
-        }
-    }
-
-    /**
      * Checks if this app has been configured.
      *
      * @return true if all information we need have been initialized
@@ -707,7 +636,7 @@
                 packetService.emit(o);
 
                 SubscriberAndDeviceInformation entry = getSubscriberInfoFromClient(context);
-                updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("PACKETS_TO_SERVER"));
+                updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("PACKETS_TO_SERVER"));
             } else {
                 log.error("No connect point to send msg to DHCP Server");
             }
@@ -726,7 +655,7 @@
         }
 
         private void  updateDhcpRelayCountersStore(SubscriberAndDeviceInformation entry,
-                                                   DhcpL2RelayCounters counterType) {
+                                                   DhcpL2RelayCounterNames counterType) {
             // Update global counter stats
             dhcpL2RelayCounters.incrementCounter(DhcpL2RelayEvent.GLOBAL_COUNTER, counterType);
             if (entry == null) {
@@ -796,7 +725,7 @@
                         forwardPacket(ethernetPacketDiscover, context);
                     }
                     entry = getSubscriberInfoFromClient(context);
-                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPDISCOVER"));
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER"));
                     break;
                 case DHCPOFFER:
                     //reply to dhcp client.
@@ -806,7 +735,7 @@
                         sendReply(ethernetPacketOffer, dhcpPayload, context);
                     }
                     entry = getSubscriberInfoFromServer(dhcpPayload, context);
-                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPOFFER"));
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPOFFER"));
                     break;
                 case DHCPREQUEST:
                     Ethernet ethernetPacketRequest =
@@ -815,7 +744,7 @@
                         forwardPacket(ethernetPacketRequest, context);
                     }
                     entry = getSubscriberInfoFromClient(context);
-                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPREQUEST"));
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPREQUEST"));
                     break;
                 case DHCPACK:
                     //reply to dhcp client.
@@ -825,7 +754,7 @@
                         sendReply(ethernetPacketAck, dhcpPayload, context);
                     }
                     entry = getSubscriberInfoFromServer(dhcpPayload, context);
-                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPACK"));
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPACK"));
                     break;
                 case DHCPDECLINE:
                     Ethernet ethernetPacketDecline =
@@ -834,7 +763,7 @@
                         forwardPacket(ethernetPacketDecline, context);
                     }
                     entry = getSubscriberInfoFromClient(context);
-                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPDECLINE"));
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPDECLINE"));
                     break;
                 case DHCPNAK:
                     //reply to dhcp client.
@@ -844,7 +773,7 @@
                         sendReply(ethernetPacketNak, dhcpPayload, context);
                     }
                     entry = getSubscriberInfoFromServer(dhcpPayload, context);
-                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPNACK"));
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPNACK"));
                     break;
                 case DHCPRELEASE:
                     Ethernet ethernetPacketRelease =
@@ -853,7 +782,7 @@
                         forwardPacket(ethernetPacketRelease, context);
                     }
                     entry = getSubscriberInfoFromClient(context);
-                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("DHCPRELEASE"));
+                    updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("DHCPRELEASE"));
                     break;
                 default:
                     break;
@@ -975,7 +904,7 @@
                 return null;
             }
 
-            updateDhcpRelayCountersStore(entry, DhcpL2RelayCounters.valueOf("PACKETS_FROM_SERVER"));
+            updateDhcpRelayCountersStore(entry, DhcpL2RelayCounterNames.valueOf("PACKETS_FROM_SERVER"));
 
             // we leave the srcMac from the original packet
             etherReply.setQinQVID(VlanId.NO_VID);
@@ -1216,4 +1145,21 @@
             }
         }
     }
+
+    private class InnerDhcpL2RelayStoreDelegate implements DhcpL2RelayStoreDelegate {
+        @Override
+        public void notify(DhcpL2RelayEvent event) {
+            if (event.type().equals(DhcpL2RelayEvent.Type.STATS_UPDATE)) {
+                DhcpL2RelayEvent toPost = event;
+                if (event.getSubscriberId() != null) {
+                    // infuse the event with the allocation info before posting
+                    DhcpAllocationInfo info = Versioned.valueOrNull(allocations.get(event.getSubscriberId()));
+                    toPost = new DhcpL2RelayEvent(event.type(), info, event.connectPoint(),
+                            event.getCountersEntry(), event.getSubscriberId());
+                }
+                post(toPost);
+            }
+
+        }
+    }
 }
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCounters.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCounterNames.java
similarity index 74%
rename from app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCounters.java
rename to app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCounterNames.java
index 71f7adc..8c2b694 100644
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCounters.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCounterNames.java
@@ -23,7 +23,7 @@
 /**
  * Represents DHCP relay counters type.
  */
-public enum DhcpL2RelayCounters {
+public enum DhcpL2RelayCounterNames {
     /**
      *  DHCP relay counter of type Discover.
      */
@@ -64,10 +64,10 @@
     /**
      * Supported types of DHCP relay counters.
      */
-    static final Set<DhcpL2RelayCounters> SUPPORTED_COUNTERS = ImmutableSet.of(DhcpL2RelayCounters.DHCPDISCOVER,
-            DhcpL2RelayCounters.DHCPRELEASE, DhcpL2RelayCounters.DHCPDECLINE,
-            DhcpL2RelayCounters.DHCPREQUEST, DhcpL2RelayCounters.DHCPOFFER,
-            DhcpL2RelayCounters.DHCPACK, DhcpL2RelayCounters.DHCPNACK,
-            DhcpL2RelayCounters.PACKETS_TO_SERVER,
-            DhcpL2RelayCounters.PACKETS_FROM_SERVER);
+    static final Set<DhcpL2RelayCounterNames> SUPPORTED_COUNTERS = ImmutableSet.of(DhcpL2RelayCounterNames.DHCPDISCOVER,
+            DhcpL2RelayCounterNames.DHCPRELEASE, DhcpL2RelayCounterNames.DHCPDECLINE,
+            DhcpL2RelayCounterNames.DHCPREQUEST, DhcpL2RelayCounterNames.DHCPOFFER,
+            DhcpL2RelayCounterNames.DHCPACK, DhcpL2RelayCounterNames.DHCPNACK,
+            DhcpL2RelayCounterNames.PACKETS_TO_SERVER,
+            DhcpL2RelayCounterNames.PACKETS_FROM_SERVER);
     }
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersIdentifier.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersIdentifier.java
index 63587e1..d6c7ef9 100644
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersIdentifier.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersIdentifier.java
@@ -24,14 +24,14 @@
  */
 public final class DhcpL2RelayCountersIdentifier {
     final String counterClassKey;
-    final Enum<DhcpL2RelayCounters> counterTypeKey;
+    final Enum<DhcpL2RelayCounterNames> counterTypeKey;
 
     /**
      * Creates a default global counter identifier for a given counterType.
      *
      * @param counterTypeKey Identifies the supported type of DHCP relay counters
      */
-    public DhcpL2RelayCountersIdentifier(DhcpL2RelayCounters counterTypeKey) {
+    public DhcpL2RelayCountersIdentifier(DhcpL2RelayCounterNames counterTypeKey) {
         this.counterClassKey = DhcpL2RelayEvent.GLOBAL_COUNTER;
         this.counterTypeKey = counterTypeKey;
     }
@@ -43,7 +43,7 @@
      * @param counterClassKey Identifies which class the counter is assigned (global or per subscriber)
      * @param counterTypeKey Identifies the supported type of DHCP relay counters
      */
-    public DhcpL2RelayCountersIdentifier(String counterClassKey, DhcpL2RelayCounters counterTypeKey) {
+    public DhcpL2RelayCountersIdentifier(String counterClassKey, DhcpL2RelayCounterNames counterTypeKey) {
         this.counterClassKey = counterClassKey;
         this.counterTypeKey = counterTypeKey;
     }
@@ -66,4 +66,8 @@
     public int hashCode() {
         return Objects.hash(counterClassKey, counterTypeKey);
     }
+
+    public String toString() {
+        return this.counterClassKey + "/" + this.counterTypeKey;
+    }
 }
\ No newline at end of file
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStore.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStore.java
index b79006b..a8d20ce 100644
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStore.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStore.java
@@ -16,31 +16,25 @@
 
 package org.opencord.dhcpl2relay.impl;
 
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import org.onosproject.store.Store;
+import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
+import org.opencord.dhcpl2relay.DhcpL2RelayStoreDelegate;
 
 /**
  * Represents a stored DHCP Relay Counters. A counter entry is defined by the pair &lt;counterClass, counterType&gt;,
  * where counterClass can be maybe global or subscriber ID and counterType is the DHCP message type.
  */
-public interface DhcpL2RelayCountersStore {
+public interface DhcpL2RelayCountersStore extends Store<DhcpL2RelayEvent, DhcpL2RelayStoreDelegate> {
 
     String NAME = "DHCP_L2_Relay_stats";
 
     /**
-     * Init counter values for a given counter class.
-     *
-     * @param counterClass class of counters (global, per subscriber).
-     */
-    void initCounters(String counterClass);
-
-    /**
      * Creates or updates DHCP L2 Relay counter.
      *
      * @param counterClass class of counters (global, per subscriber).
      * @param counterType name of counter
      */
-    void incrementCounter(String counterClass, DhcpL2RelayCounters counterType);
+    void incrementCounter(String counterClass, DhcpL2RelayCounterNames counterType);
 
     /**
      * Sets the value of a DHCP L2 Relay counter.
@@ -49,14 +43,14 @@
      * @param counterType name of counter
      * @param value The value of the counter
      */
-    void setCounter(String counterClass, DhcpL2RelayCounters counterType, Long value);
+    void setCounter(String counterClass, DhcpL2RelayCounterNames counterType, Long value);
 
     /**
-     * Gets the DHCP L2 Relay counters map.
+     * Gets the current DHCP L2 relay counter values.
      *
-     * @return the DHCP counter map
+     * @return DHCP L2 relay counter values
      */
-    public Map<DhcpL2RelayCountersIdentifier, AtomicLong> getCountersMap();
+    DhcpL2RelayStatistics getCounters();
 
     /**
      * Resets counter values for a given counter class.
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayStatistics.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayStatistics.java
new file mode 100644
index 0000000..c719253
--- /dev/null
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayStatistics.java
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ */
+
+package org.opencord.dhcpl2relay.impl;
+
+import com.google.common.base.MoreObjects;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Snapshot of DHCP L2 Relay statistics.
+ */
+public class DhcpL2RelayStatistics {
+
+    private final ImmutableMap<DhcpL2RelayCountersIdentifier, Long> counters;
+
+    private DhcpL2RelayStatistics(ImmutableMap<DhcpL2RelayCountersIdentifier, Long> counters) {
+        this.counters = counters;
+    }
+
+    /**
+     * Creates a new empty statistics instance.
+     */
+    public DhcpL2RelayStatistics() {
+        counters = ImmutableMap.of();
+    }
+
+    /**
+     * Gets the value of the counter with the given ID. Defaults to 0 if counter is not present.
+     *
+     * @param id counter ID
+     * @return counter value
+     */
+    public long get(DhcpL2RelayCountersIdentifier id) {
+        return counters.getOrDefault(id, 0L);
+    }
+
+    /**
+     * Gets the map of counters.
+     *
+     * @return map of counters
+     */
+    public Map<DhcpL2RelayCountersIdentifier, Long> counters() {
+        return counters;
+    }
+
+    /**
+     * Creates a new statistics instance with the given counter values.
+     *
+     * @param counters counters
+     * @return statistics
+     */
+    public static DhcpL2RelayStatistics withCounters(Map<DhcpL2RelayCountersIdentifier, Long> counters) {
+        ImmutableMap.Builder<DhcpL2RelayCountersIdentifier, Long> builder = ImmutableMap.builder();
+
+        counters.forEach(builder::put);
+
+        return new DhcpL2RelayStatistics(builder.build());
+    }
+
+    /**
+     * Adds the given statistics instance to this one (sums the common counters) and returns
+     * a new instance containing the result.
+     *
+     * @param other other instance
+     * @return result
+     */
+    public DhcpL2RelayStatistics add(DhcpL2RelayStatistics other) {
+        ImmutableMap.Builder<DhcpL2RelayCountersIdentifier, Long> builder = ImmutableMap.builder();
+
+        Set<DhcpL2RelayCountersIdentifier> keys = Sets.newHashSet(other.counters.keySet());
+
+        counters.forEach((id, value) -> {
+            builder.put(id, value + other.counters.getOrDefault(id, 0L));
+            keys.remove(id);
+        });
+
+        keys.forEach(i -> builder.put(i, other.counters.get(i)));
+
+        return new DhcpL2RelayStatistics(builder.build());
+    }
+
+    @Override
+    public String toString() {
+        MoreObjects.ToStringHelper helper = MoreObjects.toStringHelper(this.getClass());
+        counters.forEach((id, v) -> helper.add(id.toString(), v));
+        return helper.toString();
+    }
+}
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/OsgiPropertyConstants.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/OsgiPropertyConstants.java
index af89aa3..3c47467 100644
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/OsgiPropertyConstants.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/OsgiPropertyConstants.java
@@ -33,6 +33,6 @@
     public static final String PUBLISH_COUNTERS_RATE = "publishCountersRate";
     public static final int PUBLISH_COUNTERS_RATE_DEFAULT = 10;
 
-    public static final String DHCP_COUNTERS_TOPIC = "dhcpCountersTopic";
-    public static final String DHCP_COUNTERS_TOPIC_DEFAULT = "onos_traffic.stats";
+    public static final String SYNC_COUNTERS_RATE = "syncCountersRate";
+    public static final int SYNC_COUNTERS_RATE_DEFAULT = 5;
 }
diff --git a/app/src/main/java/org/opencord/dhcpl2relay/impl/SimpleDhcpL2RelayCountersStore.java b/app/src/main/java/org/opencord/dhcpl2relay/impl/SimpleDhcpL2RelayCountersStore.java
index eedbace..f95867c 100644
--- a/app/src/main/java/org/opencord/dhcpl2relay/impl/SimpleDhcpL2RelayCountersStore.java
+++ b/app/src/main/java/org/opencord/dhcpl2relay/impl/SimpleDhcpL2RelayCountersStore.java
@@ -15,57 +15,215 @@
  */
 package org.opencord.dhcpl2relay.impl;
 
+import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
-import org.onosproject.core.ApplicationId;
-import org.onosproject.core.CoreService;
+import org.onlab.util.KryoNamespace;
+import org.onlab.util.SafeRecurringTask;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterService;
+import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.AbstractStore;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
 import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
+import org.opencord.dhcpl2relay.DhcpL2RelayStoreDelegate;
+import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.slf4j.Logger;
 
-import java.util.Iterator;
-import java.util.Map;
+import java.nio.charset.StandardCharsets;
+import java.util.AbstractMap;
+import java.util.Dictionary;
+import java.util.Objects;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 
-import static org.slf4j.LoggerFactory.getLogger;
 import static com.google.common.base.Preconditions.checkNotNull;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.PUBLISH_COUNTERS_RATE_DEFAULT;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.SYNC_COUNTERS_RATE;
+import static org.opencord.dhcpl2relay.impl.OsgiPropertyConstants.SYNC_COUNTERS_RATE_DEFAULT;
+import static org.slf4j.LoggerFactory.getLogger;
 
 /**
  * DHCP Relay Agent Counters Manager Component.
  */
-@Component(immediate = true)
-public class SimpleDhcpL2RelayCountersStore implements DhcpL2RelayCountersStore {
-    private ApplicationId appId;
+@Component(immediate = true,
+property = {
+        PUBLISH_COUNTERS_RATE + ":Integer=" + PUBLISH_COUNTERS_RATE_DEFAULT,
+        SYNC_COUNTERS_RATE + ":Integer=" + SYNC_COUNTERS_RATE_DEFAULT,
+}
+)
+public class SimpleDhcpL2RelayCountersStore extends AbstractStore<DhcpL2RelayEvent, DhcpL2RelayStoreDelegate>
+        implements DhcpL2RelayCountersStore {
+
+    private static final String DHCP_STATISTICS_LEADERSHIP = "dhcpl2relay-statistics";
+    private static final MessageSubject RESET_SUBJECT = new MessageSubject("dhcpl2relay-statistics-reset");
+
     private final Logger log = getLogger(getClass());
-    private Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap;
+    private ConcurrentMap<DhcpL2RelayCountersIdentifier, Long> countersMap;
+
+    private EventuallyConsistentMap<NodeId, DhcpL2RelayStatistics> statistics;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
-    protected CoreService coreService;
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected LeadershipService leadershipService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ComponentConfigService componentConfigService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterCommunicationService clusterCommunicationService;
+
+    protected int publishCountersRate = PUBLISH_COUNTERS_RATE_DEFAULT;
+    protected int syncCountersRate = SYNC_COUNTERS_RATE_DEFAULT;
+
+    KryoNamespace serializer = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .register(DhcpL2RelayStatistics.class)
+            .register(DhcpL2RelayCountersIdentifier.class)
+            .register(DhcpL2RelayCounterNames.class)
+            .register(ClusterMessage.class)
+            .register(MessageSubject.class)
+            .build();
+
+    private ScheduledExecutorService executor;
+
+    private ScheduledFuture<?> publisherTask;
+    private ScheduledFuture<?> syncTask;
+
+    private AtomicBoolean dirty = new AtomicBoolean(true);
 
     @Activate
-    public void activate() {
+    public void activate(ComponentContext context) {
         log.info("Activate Dhcp L2 Counters Manager");
-        //appId = coreService.getAppId(DhcpL2Relay.DHCP_L2RELAY_APP);
-        countersMap = new ConcurrentHashMap();
+        countersMap = new ConcurrentHashMap<>();
+        componentConfigService.registerProperties(getClass());
+
+        modified(context);
+
+        statistics = storageService.<NodeId, DhcpL2RelayStatistics>eventuallyConsistentMapBuilder()
+                .withName("dhcpl2relay-statistics")
+                .withSerializer(serializer)
+                .withTimestampProvider((k, v) -> new WallClockTimestamp())
+                .build();
+
         // Initialize counter values for the global counters
-        initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER);
+        initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER, statistics.get(clusterService.getLocalNode().id()));
+        syncStats();
+
+        leadershipService.runForLeadership(DHCP_STATISTICS_LEADERSHIP);
+
+        executor = Executors.newScheduledThreadPool(1);
+
+        clusterCommunicationService.addSubscriber(RESET_SUBJECT, Serializer.using(serializer)::decode,
+                this::resetLocal, executor);
+
+        startSyncTask();
+        startPublishTask();
     }
 
-    public ImmutableMap<DhcpL2RelayCountersIdentifier, AtomicLong> getCountersMap() {
+    @Deactivate
+    public void deactivate() {
+        clusterCommunicationService.removeSubscriber(RESET_SUBJECT);
+        leadershipService.withdraw(DHCP_STATISTICS_LEADERSHIP);
+
+        stopPublishTask();
+        stopSyncTask();
+        executor.shutdownNow();
+        componentConfigService.unregisterProperties(getClass(), false);
+    }
+
+    @Modified
+    public void modified(ComponentContext context) {
+        Dictionary<String, Object> properties = context.getProperties();
+
+        String s = Tools.get(properties, PUBLISH_COUNTERS_RATE);
+        int oldPublishCountersRate = publishCountersRate;
+        publishCountersRate = Strings.isNullOrEmpty(s) ? PUBLISH_COUNTERS_RATE_DEFAULT
+                : Integer.parseInt(s.trim());
+        if (oldPublishCountersRate != publishCountersRate) {
+            stopPublishTask();
+            startPublishTask();
+        }
+
+        s = Tools.get(properties, SYNC_COUNTERS_RATE);
+        int oldSyncCountersRate = syncCountersRate;
+        syncCountersRate = Strings.isNullOrEmpty(s) ? SYNC_COUNTERS_RATE_DEFAULT
+                : Integer.parseInt(s.trim());
+        if (oldSyncCountersRate != syncCountersRate) {
+            stopSyncTask();
+            startSyncTask();
+        }
+    }
+
+    private ScheduledFuture<?> startTask(Runnable r, int rate) {
+        return executor.scheduleAtFixedRate(SafeRecurringTask.wrap(r),
+                0, rate, TimeUnit.SECONDS);
+    }
+
+    private void stopTask(ScheduledFuture<?> task) {
+        task.cancel(true);
+    }
+
+    private void startSyncTask() {
+        syncTask = startTask(this::syncStats, syncCountersRate);
+    }
+
+    private void stopSyncTask() {
+        stopTask(syncTask);
+    }
+
+    private void startPublishTask() {
+        publisherTask = startTask(this::publishStats, publishCountersRate);
+    }
+
+    private void stopPublishTask() {
+        stopTask(publisherTask);
+    }
+
+    ImmutableMap<DhcpL2RelayCountersIdentifier, Long> getCountersMap() {
         return ImmutableMap.copyOf(countersMap);
     }
 
+    public DhcpL2RelayStatistics getCounters() {
+        return aggregate();
+    }
+
     /**
      * Initialize the supported counters map for the given counter class.
      * @param counterClass class of counters (global, per subscriber)
+     * @param existingStats existing values to intialise the counters to
      */
-    public void initCounters(String counterClass) {
+    public void initCounters(String counterClass, DhcpL2RelayStatistics existingStats) {
         checkNotNull(counterClass, "counter class can't be null");
-        for (DhcpL2RelayCounters counterType : DhcpL2RelayCounters.SUPPORTED_COUNTERS) {
-            countersMap.put(new DhcpL2RelayCountersIdentifier(counterClass, counterType), new AtomicLong(0));
+        for (DhcpL2RelayCounterNames counterType : DhcpL2RelayCounterNames.SUPPORTED_COUNTERS) {
+            DhcpL2RelayCountersIdentifier id = new DhcpL2RelayCountersIdentifier(counterClass, counterType);
+            countersMap.put(id, existingStats == null ? 0L : existingStats.get(id));
         }
     }
 
@@ -74,17 +232,17 @@
      * @param counterClass class of counters (global, per subscriber)
      * @param counterType name of counter
      */
-    public void incrementCounter(String counterClass, DhcpL2RelayCounters counterType) {
+    public void incrementCounter(String counterClass, DhcpL2RelayCounterNames counterType) {
         checkNotNull(counterClass, "counter class can't be null");
-        if (DhcpL2RelayCounters.SUPPORTED_COUNTERS.contains(counterType)) {
+        if (DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.contains(counterType)) {
             DhcpL2RelayCountersIdentifier counterIdentifier =
                     new DhcpL2RelayCountersIdentifier(counterClass, counterType);
             countersMap.compute(counterIdentifier, (key, counterValue) ->
-                (counterValue != null) ? new AtomicLong(counterValue.incrementAndGet()) : new AtomicLong(1)
-            );
+                (counterValue != null) ? counterValue + 1 : 1L);
         } else {
             log.error("Failed to increment counter class {} of type {}", counterClass, counterType);
         }
+        dirty.set(true);
     }
 
     /**
@@ -92,31 +250,84 @@
      * @param counterClass class of counters (global, per subscriber)
      */
     public void resetCounters(String counterClass) {
+        byte[] payload = counterClass.getBytes(StandardCharsets.UTF_8);
+        ClusterMessage reset = new ClusterMessage(clusterService.getLocalNode().id(), RESET_SUBJECT, payload);
+        clusterCommunicationService.broadcastIncludeSelf(reset, RESET_SUBJECT, Serializer.using(serializer)::encode);
+    }
+
+    private void resetLocal(ClusterMessage m) {
+        String counterClass = new String(m.payload(), StandardCharsets.UTF_8);
+
         checkNotNull(counterClass, "counter class can't be null");
-        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator(); it.hasNext();) {
-            DhcpL2RelayCounters counterType = it.next();
+        for (DhcpL2RelayCounterNames counterType : DhcpL2RelayCounterNames.SUPPORTED_COUNTERS) {
             DhcpL2RelayCountersIdentifier counterIdentifier =
                     new DhcpL2RelayCountersIdentifier(counterClass, counterType);
-            countersMap.computeIfPresent(counterIdentifier, (key, counterValue) ->
-                    new AtomicLong(0)
-            );
+            countersMap.computeIfPresent(counterIdentifier, (key, counterValue) -> 0L);
         }
+        dirty.set(true);
+        syncStats();
     }
 
     /**
      * Inserts the counter entry if it is not already in the set otherwise update the existing counter entry.
      * @param counterClass class of counters (global, per subscriber).
      * @param counterType name of counter
-     * @param value conter value
+     * @param value counter value
      */
-    public void setCounter(String counterClass, DhcpL2RelayCounters counterType, Long value) {
+    public void setCounter(String counterClass, DhcpL2RelayCounterNames counterType, Long value) {
         checkNotNull(counterClass, "counter class can't be null");
-        if (DhcpL2RelayCounters.SUPPORTED_COUNTERS.contains(counterType)) {
+        if (DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.contains(counterType)) {
             DhcpL2RelayCountersIdentifier counterIdentifier =
                     new DhcpL2RelayCountersIdentifier(counterClass, counterType);
-            countersMap.put(counterIdentifier, new AtomicLong(value));
+            countersMap.put(counterIdentifier, value);
         } else {
             log.error("Failed to increment counter class {} of type {}", counterClass, counterType);
         }
+        dirty.set(true);
+        syncStats();
+    }
+
+    private DhcpL2RelayStatistics aggregate() {
+        return statistics.values().stream()
+                .reduce(new DhcpL2RelayStatistics(), DhcpL2RelayStatistics::add);
+    }
+
+    /**
+     * Creates a snapshot of the current in-memory statistics.
+     *
+     * @return snapshot of statistics
+     */
+    private DhcpL2RelayStatistics snapshot() {
+        return DhcpL2RelayStatistics.withCounters(countersMap);
+    }
+
+    /**
+     * Syncs in-memory stats to the eventually consistent map.
+     */
+    private void syncStats() {
+        if (dirty.get()) {
+            statistics.put(clusterService.getLocalNode().id(), snapshot());
+            dirty.set(false);
+        }
+    }
+
+    private void publishStats() {
+        // Only publish events if we are the cluster leader for DHCP L2 relay stats
+        if (!Objects.equals(leadershipService.getLeader(DHCP_STATISTICS_LEADERSHIP),
+                clusterService.getLocalNode().id())) {
+            return;
+        }
+
+        aggregate().counters().forEach((counterKey, counterValue) -> {
+            // Subscriber-specific counters have the subscriber ID set
+            String subscriberId = null;
+            if (!counterKey.counterClassKey.equals(DhcpL2RelayEvent.GLOBAL_COUNTER)) {
+                subscriberId = counterKey.counterClassKey;
+            }
+
+            delegate.notify(new DhcpL2RelayEvent(DhcpL2RelayEvent.Type.STATS_UPDATE, null, null,
+                    new AbstractMap.SimpleEntry<>(counterKey.counterTypeKey.toString(),
+                            new AtomicLong(counterValue)), subscriberId));
+        });
     }
 }
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java
index d4cb8bd..e218088 100644
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayConfigTest.java
@@ -66,6 +66,9 @@
         dhcpL2Relay.mastershipService = new MockMastershipService();
         dhcpL2Relay.storageService = new TestStorageService();
         dhcpL2Relay.leadershipService = new LeadershipServiceAdapter();
+        SimpleDhcpL2RelayCountersStore store = new SimpleDhcpL2RelayCountersStore();
+        store.componentConfigService = mockConfigService;
+        dhcpL2Relay.dhcpL2RelayCounters = store;
         TestUtils.setField(dhcpL2Relay, "eventDispatcher", new TestEventDispatcher());
         dhcpL2Relay.activate(new ComponentContextAdapter());
     }
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
index 30f651f..179d6b6 100644
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayCountersStoreTest.java
@@ -22,14 +22,25 @@
 import org.onlab.junit.TestUtils;
 import org.onlab.osgi.ComponentContextAdapter;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterServiceAdapter;
 import org.onosproject.cluster.LeadershipServiceAdapter;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessageHandler;
+import org.onosproject.store.cluster.messaging.MessageSubject;
 import org.onosproject.store.service.TestStorageService;
 import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
 
+import java.time.Duration;
 import java.util.Iterator;
 import java.util.Map;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.function.Consumer;
+import java.util.function.Function;
 
 import static org.junit.Assert.assertEquals;
 
@@ -59,11 +70,16 @@
         dhcpL2Relay.storageService = new TestStorageService();
         dhcpL2Relay.leadershipService = new LeadershipServiceAdapter();
         TestUtils.setField(dhcpL2Relay, "eventDispatcher", new TestEventDispatcher());
-        dhcpL2Relay.activate(new ComponentContextAdapter());
         store = new SimpleDhcpL2RelayCountersStore();
+        store.storageService = new TestStorageService();
+        store.clusterService = new ClusterServiceAdapter();
+        store.leadershipService = new LeadershipServiceAdapter();
+        store.clusterCommunicationService = new TestClusterCommunicationService<>();
+        store.componentConfigService = mockConfigService;
         TestUtils.setField(store, "eventDispatcher", new TestEventDispatcher());
-        store.activate();
+        store.activate(new MockComponentContext());
         dhcpL2Relay.dhcpL2RelayCounters = this.store;
+        dhcpL2Relay.activate(new ComponentContextAdapter());
     }
 
     /**
@@ -80,20 +96,20 @@
     @Test
     public void testInitCounter() {
         // Init the supported global counter
-        dhcpL2Relay.dhcpL2RelayCounters.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER);
+        store.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER, new DhcpL2RelayStatistics());
         // Init the supported counter for a specific subscriber
-        dhcpL2Relay.dhcpL2RelayCounters.initCounters(CLIENT_ID_1);
+        store.initCounters(CLIENT_ID_1, new DhcpL2RelayStatistics());
 
-        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
-        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+        Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
+        for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
              it.hasNext();) {
-            DhcpL2RelayCounters counterType = it.next();
+            DhcpL2RelayCounterNames counterType = it.next();
             long globalCounterValue = countersMap.get(new DhcpL2RelayCountersIdentifier(
-                    DhcpL2RelayEvent.GLOBAL_COUNTER, counterType)).longValue();
+                    DhcpL2RelayEvent.GLOBAL_COUNTER, counterType));
             long perSubscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                    counterType)).longValue();
-            assertEquals((long) 0, globalCounterValue);
-            assertEquals((long) 0, perSubscriberValue);
+                    counterType));
+            assertEquals(0, globalCounterValue);
+            assertEquals(0, perSubscriberValue);
         }
     }
 
@@ -104,27 +120,27 @@
     @Test
     public void testIncrementCounter() {
         // Init the supported global counter
-        dhcpL2Relay.dhcpL2RelayCounters.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER);
+        store.initCounters(DhcpL2RelayEvent.GLOBAL_COUNTER, new DhcpL2RelayStatistics());
 
-        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+        for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
              it.hasNext();) {
-            DhcpL2RelayCounters counterType = it.next();
+            DhcpL2RelayCounterNames counterType = it.next();
             // Increment of existing supported global counter
             dhcpL2Relay.dhcpL2RelayCounters.incrementCounter(DhcpL2RelayEvent.GLOBAL_COUNTER, counterType);
             // Add of a Subscriber entry that is not already in the set
             dhcpL2Relay.dhcpL2RelayCounters.incrementCounter(CLIENT_ID_1, counterType);
         }
 
-        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
-        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+        Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
+        for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
              it.hasNext();) {
-            DhcpL2RelayCounters counterType = it.next();
+            DhcpL2RelayCounterNames counterType = it.next();
             long globalCounterValue = countersMap.get(new DhcpL2RelayCountersIdentifier(
-                    DhcpL2RelayEvent.GLOBAL_COUNTER, counterType)).longValue();
+                    DhcpL2RelayEvent.GLOBAL_COUNTER, counterType));
             long perSubscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                    counterType)).longValue();
-            assertEquals((long) 1, globalCounterValue);
-            assertEquals((long) 1, perSubscriberValue);
+                    counterType));
+            assertEquals(1, globalCounterValue);
+            assertEquals(1, perSubscriberValue);
         }
     }
 
@@ -133,12 +149,12 @@
      */
     @Test
     public void testIncrementAndResetCounter() {
-        DhcpL2RelayCounters counterType;
+        DhcpL2RelayCounterNames counterType;
         long subscriberValue;
-        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap;
+        Map<DhcpL2RelayCountersIdentifier, Long> countersMap;
 
         // First start incrementing the counter of a specific subscriber
-        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+        for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
              it.hasNext();) {
             counterType = it.next();
             // Insert of a Subscriber entry that is not already in the set
@@ -146,24 +162,24 @@
         }
 
         // Make sure that the counter is incremented
-        countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
-        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+        countersMap = store.getCountersMap();
+        for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
              it.hasNext();) {
             counterType = it.next();
             subscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                    counterType)).longValue();
-            assertEquals((long) 1, subscriberValue);
+                    counterType));
+            assertEquals(1, subscriberValue);
         }
 
         // Reset the counter
         dhcpL2Relay.dhcpL2RelayCounters.resetCounters(CLIENT_ID_1);
-        countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
-        for (Iterator<DhcpL2RelayCounters> it = DhcpL2RelayCounters.SUPPORTED_COUNTERS.iterator();
+        countersMap = store.getCountersMap();
+        for (Iterator<DhcpL2RelayCounterNames> it = DhcpL2RelayCounterNames.SUPPORTED_COUNTERS.iterator();
              it.hasNext();) {
             counterType = it.next();
             subscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                    counterType)).longValue();
-            assertEquals((long) 0, subscriberValue);
+                    counterType));
+            assertEquals(0, subscriberValue);
         }
     }
 
@@ -173,13 +189,84 @@
      */
     @Test
     public void testInsertOrUpdateCounter() {
-        dhcpL2Relay.dhcpL2RelayCounters.setCounter(CLIENT_ID_1, DhcpL2RelayCounters.valueOf("DHCPDISCOVER"), (long) 50);
+        dhcpL2Relay.dhcpL2RelayCounters.setCounter(CLIENT_ID_1,
+                DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER"), (long) 50);
 
-        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+        Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
         long subscriberValue = countersMap.get(new DhcpL2RelayCountersIdentifier(
-                CLIENT_ID_1, DhcpL2RelayCounters.valueOf("DHCPDISCOVER"))).longValue();
+                CLIENT_ID_1, DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER")));
 
-        assertEquals((long) 50, subscriberValue);
+        assertEquals(50, subscriberValue);
+    }
+
+    public class TestClusterCommunicationService<M> implements ClusterCommunicationService {
+
+        private Consumer handler;
+
+        @Override
+        public void addSubscriber(MessageSubject subject,
+                                  ClusterMessageHandler subscriber, ExecutorService executor) {
+
+        }
+
+        @Override
+        public <M> void broadcast(M message, MessageSubject subject, Function<M, byte[]> encoder) {
+
+        }
+
+        @Override
+        public <M> void broadcastIncludeSelf(M message, MessageSubject subject, Function<M, byte[]> encoder) {
+            handler.accept(message);
+        }
+
+        @Override
+        public <M> CompletableFuture<Void> unicast(M message, MessageSubject subject,
+                                                   Function<M, byte[]> encoder, NodeId toNodeId) {
+            return null;
+        }
+
+        @Override
+        public <M> void multicast(M message, MessageSubject subject,
+                                  Function<M, byte[]> encoder, Set<NodeId> nodeIds) {
+
+        }
+
+        @Override
+        public <M, R> CompletableFuture<R> sendAndReceive(M message, MessageSubject subject,
+                                                          Function<M, byte[]> encoder,
+                                                          Function<byte[], R> decoder, NodeId toNodeId) {
+            return null;
+        }
+
+        @Override
+        public <M, R> CompletableFuture<R> sendAndReceive(M message, MessageSubject subject,
+                                                          Function<M, byte[]> encoder, Function<byte[], R> decoder,
+                                                          NodeId toNodeId, Duration timeout) {
+            return null;
+        }
+
+        @Override
+        public <M, R> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+                                         Function<M, R> handler, Function<R, byte[]> encoder, Executor executor) {
+
+        }
+
+        @Override
+        public <M, R> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+                                         Function<M, CompletableFuture<R>> handler, Function<R, byte[]> encoder) {
+
+        }
+
+        @Override
+        public <M> void addSubscriber(MessageSubject subject, Function<byte[], M> decoder,
+                                      Consumer<M> handler, Executor executor) {
+            this.handler = handler;
+        }
+
+        @Override
+        public void removeSubscriber(MessageSubject subject) {
+
+        }
     }
 
 }
\ No newline at end of file
diff --git a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
index 42664cc..4106ef1 100755
--- a/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
+++ b/app/src/test/java/org/opencord/dhcpl2relay/impl/DhcpL2RelayTest.java
@@ -16,7 +16,6 @@
 package org.opencord.dhcpl2relay.impl;
 
 import com.google.common.collect.Lists;
-import org.easymock.EasyMock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -27,20 +26,21 @@
 import org.onlab.packet.UDP;
 import org.onlab.packet.dhcp.DhcpOption;
 import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cluster.ClusterServiceAdapter;
 import org.onosproject.cluster.LeadershipServiceAdapter;
 import org.onosproject.net.ConnectPoint;
 import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
 import org.onosproject.store.service.TestStorageService;
 import org.opencord.dhcpl2relay.DhcpL2RelayEvent;
+import org.opencord.dhcpl2relay.DhcpL2RelayStoreDelegate;
 import org.opencord.dhcpl2relay.impl.packet.DhcpOption82;
 
 import java.nio.ByteBuffer;
-import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
 
+import static org.easymock.EasyMock.createMock;
 import static org.junit.Assert.assertEquals;
 
 public class DhcpL2RelayTest extends DhcpL2RelayTestBase {
@@ -49,7 +49,7 @@
     private SimpleDhcpL2RelayCountersStore store;
 
     ComponentConfigService mockConfigService =
-            EasyMock.createMock(ComponentConfigService.class);
+            createMock(ComponentConfigService.class);
 
     /**
      * Sets up the services required by the dhcpl2relay app.
@@ -73,8 +73,13 @@
         dhcpL2Relay.refreshService = new MockExecutor(dhcpL2Relay.refreshService);
         dhcpL2Relay.activate(new DhcpL2RelayTestBase.MockComponentContext());
         store = new SimpleDhcpL2RelayCountersStore();
+        store.storageService = new TestStorageService();
+        store.leadershipService = new LeadershipServiceAdapter();
+        store.clusterService = new ClusterServiceAdapter();
+        store.clusterCommunicationService = new ClusterCommunicationServiceAdapter();
+        store.componentConfigService = mockConfigService;
         TestUtils.setField(store, "eventDispatcher", new TestEventDispatcher());
-        store.activate();
+        store.activate(new MockComponentContext());
         dhcpL2Relay.dhcpL2RelayCounters = this.store;
     }
 
@@ -207,20 +212,20 @@
         sendPacket(requestPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
         sendPacket(ackPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
 
-        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+        Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
         discoveryValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
-                DhcpL2RelayCounters.valueOf("DHCPDISCOVER"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER")));
         offerValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
-                DhcpL2RelayCounters.valueOf("DHCPOFFER"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPOFFER")));
         requestValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
-                DhcpL2RelayCounters.valueOf("DHCPREQUEST"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPREQUEST")));
         ackValue = countersMap.get(new DhcpL2RelayCountersIdentifier(DhcpL2RelayEvent.GLOBAL_COUNTER,
-                DhcpL2RelayCounters.valueOf("DHCPACK"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPACK")));
 
-        assertEquals((long) 1, discoveryValue);
-        assertEquals((long) 1, offerValue);
-        assertEquals((long) 1, requestValue);
-        assertEquals((long) 1, ackValue);
+        assertEquals(1, discoveryValue);
+        assertEquals(1, offerValue);
+        assertEquals(1, requestValue);
+        assertEquals(1, ackValue);
     }
 
     /**
@@ -246,32 +251,20 @@
         sendPacket(requestPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
         sendPacket(ackPacket, ConnectPoint.deviceConnectPoint(OLT_DEV_ID + "/" + 1));
 
-        Map<DhcpL2RelayCountersIdentifier, AtomicLong> countersMap = dhcpL2Relay.dhcpL2RelayCounters.getCountersMap();
+        Map<DhcpL2RelayCountersIdentifier, Long> countersMap = store.getCountersMap();
         discoveryValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                DhcpL2RelayCounters.valueOf("DHCPDISCOVER"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPDISCOVER")));
         offerValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                DhcpL2RelayCounters.valueOf("DHCPOFFER"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPOFFER")));
         requestValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                DhcpL2RelayCounters.valueOf("DHCPREQUEST"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPREQUEST")));
         ackValue = countersMap.get(new DhcpL2RelayCountersIdentifier(CLIENT_ID_1,
-                DhcpL2RelayCounters.valueOf("DHCPACK"))).longValue();
+                DhcpL2RelayCounterNames.valueOf("DHCPACK")));
 
-        assertEquals((long) 1, discoveryValue);
-        assertEquals((long) 1, offerValue);
-        assertEquals((long) 1, requestValue);
-        assertEquals((long) 1, ackValue);
-    }
-
-    /**
-     * Tests the schedule function to publish the counters to kafka.
-     *
-     */
-    @Test
-    public void testSchedulePublishCountersToKafka() {
-        MockExecutor executor = new MockExecutor(dhcpL2Relay.refreshService);
-        dhcpL2Relay.refreshTask = executor.scheduleWithFixedDelay(
-                dhcpL2Relay.publishCountersToKafka, 0, 10, TimeUnit.SECONDS);
-        executor.assertLastMethodCalled("scheduleWithFixedDelay", 0, 10, TimeUnit.SECONDS);
+        assertEquals(1, discoveryValue);
+        assertEquals(1, offerValue);
+        assertEquals(1, requestValue);
+        assertEquals(1, ackValue);
     }
 
     public void compareClientPackets(Ethernet sent, Ethernet relayed) {
@@ -317,29 +310,38 @@
     }
 
     private class MockDhcpL2RelayCountersStore implements DhcpL2RelayCountersStore {
+
         @Override
-        public void initCounters(String counterClass) {
+        public void incrementCounter(String counterClass, DhcpL2RelayCounterNames counterType) {
 
         }
 
         @Override
-        public void incrementCounter(String counterClass, DhcpL2RelayCounters counterType) {
+        public void setCounter(String counterClass, DhcpL2RelayCounterNames counterType, Long value) {
 
         }
 
         @Override
-        public void setCounter(String counterClass, DhcpL2RelayCounters counterType, Long value) {
-
-        }
-
-        @Override
-        public Map<DhcpL2RelayCountersIdentifier, AtomicLong> getCountersMap() {
-            return new HashMap<>();
+        public DhcpL2RelayStatistics getCounters() {
+            return new DhcpL2RelayStatistics();
         }
 
         @Override
         public void resetCounters(String counterClass) {
 
         }
+
+        @Override
+        public void setDelegate(DhcpL2RelayStoreDelegate delegate) {
+        }
+
+        @Override
+        public void unsetDelegate(DhcpL2RelayStoreDelegate delegate) {
+        }
+
+        @Override
+        public boolean hasDelegate() {
+            return false;
+        }
     }
 }
\ No newline at end of file