SEBA-989-Instance coordination and state distribution mechanism in IgmpStatisticsManager

Change-Id: Ibf3f3a2c5c91c010ef909692eea913f95ee7a92e
diff --git a/api/src/main/java/org/opencord/igmpproxy/IgmpLeadershipService.java b/api/src/main/java/org/opencord/igmpproxy/IgmpLeadershipService.java
index 1b8d0fe..c3563a4 100644
--- a/api/src/main/java/org/opencord/igmpproxy/IgmpLeadershipService.java
+++ b/api/src/main/java/org/opencord/igmpproxy/IgmpLeadershipService.java
@@ -15,6 +15,8 @@
  */
 package org.opencord.igmpproxy;
 
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.net.DeviceId;
 
 /**
@@ -28,4 +30,34 @@
      * @return if it is leadership of this device, return true
      */
     boolean isLocalLeader(DeviceId deviceId);
+
+    /**
+     * Gets local node id.
+     *
+     * @return node id
+     */
+    NodeId getLocalNodeId();
+
+    /**
+     * Gets leader for topic.
+     *
+     * @param topic topic name
+     * @return leader of topic
+     */
+    NodeId getLeader(String topic);
+
+    /**
+     * Enters a leadership contest.
+     *
+     * @param topic leadership topic
+     * @return {@code Leadership} future
+     */
+    Leadership runForLeadership(String topic);
+
+    /**
+     * Withdraws from a leadership contest.
+     *
+     * @param topic leadership topic
+     */
+    void withdraw(String topic);
 }
diff --git a/api/src/main/java/org/opencord/igmpproxy/IgmpStatisticType.java b/api/src/main/java/org/opencord/igmpproxy/IgmpStatisticType.java
new file mode 100644
index 0000000..0a94569
--- /dev/null
+++ b/api/src/main/java/org/opencord/igmpproxy/IgmpStatisticType.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy;
+
+/**
+ * Types of igmp-statistics.
+ */
+public enum IgmpStatisticType {
+    /**
+     * Join request.
+     */
+    IGMP_JOIN_REQ,
+    /**
+     * Success re-join.
+     */
+    IGMP_SUCCESS_JOIN_RE_JOIN_REQ,
+    /**
+     * Fail join request.
+     */
+    IGMP_FAIL_JOIN_REQ,
+    /**
+     * Leave request.
+     */
+    IGMP_LEAVE_REQ,
+    /**
+     * Igmp disconnect.
+     */
+    IGMP_DISCONNECT,
+    /**
+     * Igmp v3 membership query.
+     */
+    IGMP_V3_MEMBERSHIP_QUERY,
+    /**
+     * Igmp v1 membership report.
+     */
+    IGMP_V1_MEMBERSHIP_REPORT,
+    /**
+     * Igmp v2 membeship report.
+     */
+    IGMP_V2_MEMBERSHIP_REPORT,
+    /**
+     * Igmp v3 membeship report.
+     */
+    IGMP_V3_MEMBERSHIP_REPORT,
+    /**
+     * Igmp v2 leave group.
+     */
+    IGMP_V2_LEAVE_GROUP,
+    /**
+     * Received total message.
+     */
+    TOTAL_MSG_RECEIVED,
+    /**
+     * Received igmp-message.
+     */
+    IGMP_MSG_RECEIVED,
+    /**
+     * Received invalid igmp-message.
+     */
+    INVALID_IGMP_MSG_RECEIVED,
+    /**
+     * Unknown igmp rx packets counter.
+     */
+    UNKNOWN_IGMP_TYPE_PACKETS_RX_COUNTER,
+    /**
+     * Wrong mode rx counter reports.
+     */
+    REPORTS_RX_WITH_WRONG_MODE_COUNTER,
+    /**
+     * Insuff permission access counter of fail join request.
+     */
+    FAIL_JOIN_REQ_INSUFF_PERMISSION_ACCESS_COUNTER,
+    /**
+     * Unknown mcast ip counter of fail join.
+     */
+    FAIL_JOIN_REQ_UNKNOWN_MULTICAST_IP_COUNTER,
+    /**
+     * Unconfigured group counter.
+     */
+    UNCONFIGURED_GROUP_COUNTER,
+    /**
+     * Valid igmp packet counter.
+     */
+    VALID_IGMP_PACKET_COUNTER,
+    /**
+     * Igmp channel join counter.
+     */
+    IGMP_CHANNEL_JOIN_COUNTER,
+    /**
+     * Current grp number counter.
+     */
+    CURRENT_GRP_NUMBER_COUNTER,
+    /**
+     * Igmp valid checksum counter.
+     */
+    IGMP_VALID_CHECKSUM_COUNTER,
+    /**
+     * Invalid igmp length.
+     */
+    INVALID_IGMP_LENGTH,
+    /**
+     * Igmp general membership query.
+     */
+    IGMP_GENERAL_MEMBERSHIP_QUERY,
+    /**
+     * Igmp grp specific membership query.
+     */
+    IGMP_GRP_SPECIFIC_MEMBERSHIP_QUERY,
+    /**
+     * Igmp grp and src spesific membership query.
+     */
+    IGMP_GRP_AND_SRC_SPESIFIC_MEMBERSHIP_QUERY;
+}
diff --git a/api/src/main/java/org/opencord/igmpproxy/IgmpStatistics.java b/api/src/main/java/org/opencord/igmpproxy/IgmpStatistics.java
index f330e9f..d101d60 100644
--- a/api/src/main/java/org/opencord/igmpproxy/IgmpStatistics.java
+++ b/api/src/main/java/org/opencord/igmpproxy/IgmpStatistics.java
@@ -15,14 +15,18 @@
  */
 package org.opencord.igmpproxy;
 
+import org.slf4j.Logger;
+
 import java.util.concurrent.atomic.AtomicLong;
 
+import static org.slf4j.LoggerFactory.getLogger;
+
 /**
- *
  * Records metrics for IgmpProxy application.
- *
  */
 public class IgmpStatistics {
+    private final Logger log = getLogger(getClass());
+    private static final long RESET_VALUE = 0L;
 
     //Total number of join requests
     private AtomicLong igmpJoinReq = new AtomicLong();
@@ -77,212 +81,239 @@
     //Total number of group and source specific IGMP membership query messages received
     private AtomicLong igmpGrpAndSrcSpecificMembershipQuery = new AtomicLong();
 
-    public Long getIgmpJoinReq() {
-        return igmpJoinReq.get();
+    public void setStats(IgmpStatistics current) {
+        igmpJoinReq.set(current.igmpJoinReq.get());
+        igmpSuccessJoinRejoinReq.set(current.igmpSuccessJoinRejoinReq.get());
+        igmpFailJoinReq.set(current.igmpFailJoinReq.get());
+        igmpLeaveReq.set(current.igmpLeaveReq.get());
+        igmpDisconnect.set(current.igmpDisconnect.get());
+        igmpv3MembershipQuery.set(current.igmpv3MembershipQuery.get());
+        igmpv1MembershipReport.set(current.igmpv1MembershipReport.get());
+        igmpv3MembershipReport.set(current.igmpv3MembershipReport.get());
+        igmpv2MembershipReport.set(current.igmpv2MembershipReport.get());
+        igmpv2LeaveGroup.set(current.igmpv2LeaveGroup.get());
+        totalMsgReceived.set(current.totalMsgReceived.get());
+        igmpMsgReceived.set(current.igmpMsgReceived.get());
+        invalidIgmpMsgReceived.set(current.invalidIgmpMsgReceived.get());
+        unknownIgmpTypePacketsRxCounter.set(current.unknownIgmpTypePacketsRxCounter.get());
+        reportsRxWithWrongModeCounter.set(current.reportsRxWithWrongModeCounter.get());
+        failJoinReqInsuffPermissionAccessCounter.set(current.failJoinReqInsuffPermissionAccessCounter.get());
+        failJoinReqUnknownMulticastIpCounter.set(current.failJoinReqUnknownMulticastIpCounter.get());
+        unconfiguredGroupCounter.set(current.unconfiguredGroupCounter.get());
+        validIgmpPacketCounter.set(current.validIgmpPacketCounter.get());
+        igmpChannelJoinCounter.set(current.igmpChannelJoinCounter.get());
+        currentGrpNumCounter.set(current.currentGrpNumCounter.get());
+        igmpValidChecksumCounter.set(current.igmpValidChecksumCounter.get());
+        invalidIgmpLength.set(current.invalidIgmpLength.get());
+        igmpGeneralMembershipQuery.set(current.igmpGeneralMembershipQuery.get());
+        igmpGrpSpecificMembershipQuery.set(current.igmpGrpSpecificMembershipQuery.get());
+        igmpGrpAndSrcSpecificMembershipQuery.set(current.igmpGrpAndSrcSpecificMembershipQuery.get());
     }
 
-    public Long getIgmpSuccessJoinRejoinReq() {
-        return igmpSuccessJoinRejoinReq.get();
+    public void resetAll() {
+        igmpJoinReq.set(RESET_VALUE);
+        igmpLeaveReq.set(RESET_VALUE);
+        igmpDisconnect.set(RESET_VALUE);
+        totalMsgReceived.set(RESET_VALUE);
+        igmpv2LeaveGroup.set(RESET_VALUE);
+        invalidIgmpLength.set(RESET_VALUE);
+        igmpv3MembershipQuery.set(RESET_VALUE);
+        igmpChannelJoinCounter.set(RESET_VALUE);
+        igmpv1MembershipReport.set(RESET_VALUE);
+        igmpv2MembershipReport.set(RESET_VALUE);
+        igmpv3MembershipReport.set(RESET_VALUE);
+        invalidIgmpMsgReceived.set(RESET_VALUE);
+        validIgmpPacketCounter.set(RESET_VALUE);
+        currentGrpNumCounter.set(RESET_VALUE);
+        igmpFailJoinReq.set(RESET_VALUE);
+        unconfiguredGroupCounter.set(RESET_VALUE);
+        igmpValidChecksumCounter.set(RESET_VALUE);
+        igmpGeneralMembershipQuery.set(RESET_VALUE);
+        igmpSuccessJoinRejoinReq.set(RESET_VALUE);
+        igmpGrpSpecificMembershipQuery.set(RESET_VALUE);
+        reportsRxWithWrongModeCounter.set(RESET_VALUE);
+        unknownIgmpTypePacketsRxCounter.set(RESET_VALUE);
+        failJoinReqUnknownMulticastIpCounter.set(RESET_VALUE);
+        igmpGrpAndSrcSpecificMembershipQuery.set(RESET_VALUE);
+        failJoinReqInsuffPermissionAccessCounter.set(RESET_VALUE);
     }
 
-    public Long getIgmpFailJoinReq() {
-        return igmpFailJoinReq.get();
+    public void increaseStat(IgmpStatisticType type) {
+        switch (type) {
+            case IGMP_JOIN_REQ:
+                igmpJoinReq.incrementAndGet();
+                break;
+            case IGMP_LEAVE_REQ:
+                igmpLeaveReq.incrementAndGet();
+                break;
+            case IGMP_DISCONNECT:
+                igmpDisconnect.incrementAndGet();
+                break;
+            case IGMP_MSG_RECEIVED:
+                break;
+            case TOTAL_MSG_RECEIVED:
+                totalMsgReceived.incrementAndGet();
+                break;
+            case IGMP_V2_LEAVE_GROUP:
+                igmpv2LeaveGroup.incrementAndGet();
+                igmpMsgReceived.incrementAndGet();
+                break;
+            case INVALID_IGMP_LENGTH:
+                invalidIgmpLength.incrementAndGet();
+                break;
+            case IGMP_V3_MEMBERSHIP_QUERY:
+                igmpv3MembershipQuery.incrementAndGet();
+                igmpMsgReceived.incrementAndGet();
+                break;
+            case IGMP_CHANNEL_JOIN_COUNTER:
+                igmpChannelJoinCounter.incrementAndGet();
+                break;
+            case IGMP_V1_MEMBERSHIP_REPORT:
+                igmpv1MembershipReport.incrementAndGet();
+                igmpMsgReceived.incrementAndGet();
+                break;
+            case IGMP_V2_MEMBERSHIP_REPORT:
+                igmpv2MembershipReport.incrementAndGet();
+                igmpMsgReceived.incrementAndGet();
+                break;
+            case IGMP_V3_MEMBERSHIP_REPORT:
+                igmpv3MembershipReport.incrementAndGet();
+                igmpMsgReceived.incrementAndGet();
+                break;
+            case INVALID_IGMP_MSG_RECEIVED:
+                invalidIgmpMsgReceived.incrementAndGet();
+                break;
+            case VALID_IGMP_PACKET_COUNTER:
+                validIgmpPacketCounter.incrementAndGet();
+                break;
+            case CURRENT_GRP_NUMBER_COUNTER:
+                currentGrpNumCounter.incrementAndGet();
+                break;
+            case IGMP_FAIL_JOIN_REQ:
+                igmpFailJoinReq.incrementAndGet();
+                break;
+            case UNCONFIGURED_GROUP_COUNTER:
+                unconfiguredGroupCounter.incrementAndGet();
+                break;
+            case IGMP_VALID_CHECKSUM_COUNTER:
+                igmpValidChecksumCounter.incrementAndGet();
+                break;
+            case IGMP_GENERAL_MEMBERSHIP_QUERY:
+                igmpGeneralMembershipQuery.incrementAndGet();
+                break;
+            case IGMP_SUCCESS_JOIN_RE_JOIN_REQ:
+                igmpSuccessJoinRejoinReq.incrementAndGet();
+                break;
+            case IGMP_GRP_SPECIFIC_MEMBERSHIP_QUERY:
+                igmpGrpSpecificMembershipQuery.incrementAndGet();
+                break;
+            case REPORTS_RX_WITH_WRONG_MODE_COUNTER:
+                reportsRxWithWrongModeCounter.incrementAndGet();
+                break;
+            case UNKNOWN_IGMP_TYPE_PACKETS_RX_COUNTER:
+                unknownIgmpTypePacketsRxCounter.incrementAndGet();
+                break;
+            case FAIL_JOIN_REQ_UNKNOWN_MULTICAST_IP_COUNTER:
+                failJoinReqUnknownMulticastIpCounter.incrementAndGet();
+                break;
+            case IGMP_GRP_AND_SRC_SPESIFIC_MEMBERSHIP_QUERY:
+                igmpGrpAndSrcSpecificMembershipQuery.incrementAndGet();
+                break;
+            case FAIL_JOIN_REQ_INSUFF_PERMISSION_ACCESS_COUNTER:
+                failJoinReqInsuffPermissionAccessCounter.incrementAndGet();
+                break;
+            default:
+                log.warn("Unhandled statistic type. {}", type);
+                break;
+        }
     }
 
-    public Long getIgmpLeaveReq() {
-        return igmpLeaveReq.get();
+    public Long getStat(IgmpStatisticType type) {
+        Long value;
+        switch (type) {
+            case IGMP_JOIN_REQ:
+                value = igmpJoinReq.get();
+                break;
+            case IGMP_LEAVE_REQ:
+                value = igmpLeaveReq.get();
+                break;
+            case IGMP_DISCONNECT:
+                value = igmpDisconnect.get();
+                break;
+            case IGMP_MSG_RECEIVED:
+                value = igmpMsgReceived.get();
+                break;
+            case TOTAL_MSG_RECEIVED:
+                value = totalMsgReceived.get();
+                break;
+            case IGMP_V2_LEAVE_GROUP:
+                value = igmpv2LeaveGroup.get();
+                break;
+            case INVALID_IGMP_LENGTH:
+                value = invalidIgmpLength.get();
+                break;
+            case IGMP_V3_MEMBERSHIP_QUERY:
+                value = igmpv3MembershipQuery.get();
+                break;
+            case IGMP_CHANNEL_JOIN_COUNTER:
+                value = igmpChannelJoinCounter.get();
+                break;
+            case IGMP_V1_MEMBERSHIP_REPORT:
+                value = igmpv1MembershipReport.get();
+                break;
+            case IGMP_V2_MEMBERSHIP_REPORT:
+                value = igmpv2MembershipReport.get();
+                break;
+            case IGMP_V3_MEMBERSHIP_REPORT:
+                value = igmpv3MembershipReport.get();
+                break;
+            case INVALID_IGMP_MSG_RECEIVED:
+                value = invalidIgmpMsgReceived.get();
+                break;
+            case VALID_IGMP_PACKET_COUNTER:
+                value = validIgmpPacketCounter.get();
+                break;
+            case CURRENT_GRP_NUMBER_COUNTER:
+                value = currentGrpNumCounter.get();
+                break;
+            case IGMP_FAIL_JOIN_REQ:
+                value = igmpFailJoinReq.get();
+                break;
+            case UNCONFIGURED_GROUP_COUNTER:
+                value = unconfiguredGroupCounter.get();
+                break;
+            case IGMP_VALID_CHECKSUM_COUNTER:
+                value = igmpValidChecksumCounter.get();
+                break;
+            case IGMP_GENERAL_MEMBERSHIP_QUERY:
+                value = igmpGeneralMembershipQuery.get();
+                break;
+            case IGMP_SUCCESS_JOIN_RE_JOIN_REQ:
+                value = igmpSuccessJoinRejoinReq.get();
+                break;
+            case IGMP_GRP_SPECIFIC_MEMBERSHIP_QUERY:
+                value = igmpGrpSpecificMembershipQuery.get();
+                break;
+            case REPORTS_RX_WITH_WRONG_MODE_COUNTER:
+                value = reportsRxWithWrongModeCounter.get();
+                break;
+            case UNKNOWN_IGMP_TYPE_PACKETS_RX_COUNTER:
+                value = unknownIgmpTypePacketsRxCounter.get();
+                break;
+            case FAIL_JOIN_REQ_UNKNOWN_MULTICAST_IP_COUNTER:
+                value = failJoinReqUnknownMulticastIpCounter.get();
+                break;
+            case IGMP_GRP_AND_SRC_SPESIFIC_MEMBERSHIP_QUERY:
+                value = igmpGrpAndSrcSpecificMembershipQuery.get();
+                break;
+            case FAIL_JOIN_REQ_INSUFF_PERMISSION_ACCESS_COUNTER:
+                value = failJoinReqInsuffPermissionAccessCounter.get();
+                break;
+            default:
+                value = null;
+                log.warn("Unhandled statistic type. {}", type);
+                break;
+        }
+        return value;
     }
-
-    public Long getIgmpDisconnect() {
-        return igmpDisconnect.get();
-    }
-
-    public Long getIgmpv3MembershipQuery() {
-        return igmpv3MembershipQuery.get();
-    }
-
-    public Long getIgmpv1MemershipReport() {
-        return igmpv1MembershipReport.get();
-    }
-
-    public Long getIgmpv3MembershipReport() {
-        return igmpv3MembershipReport.get();
-    }
-
-    public Long getIgmpv2MembershipReport() {
-        return igmpv2MembershipReport.get();
-    }
-
-    public Long getIgmpv2LeaveGroup() {
-        return igmpv2LeaveGroup.get();
-    }
-
-    public Long getTotalMsgReceived() {
-        return totalMsgReceived.get();
-    }
-
-    public Long getIgmpMsgReceived() {
-        return igmpMsgReceived.get();
-    }
-
-    public Long getInvalidIgmpMsgReceived() {
-        return invalidIgmpMsgReceived.get();
-    }
-
-    public void increaseIgmpJoinReq() {
-        igmpJoinReq.incrementAndGet();
-    }
-
-    public void increaseIgmpSuccessJoinRejoinReq() {
-        igmpSuccessJoinRejoinReq.incrementAndGet();
-    }
-
-    public void increaseIgmpFailJoinReq() {
-        igmpFailJoinReq.incrementAndGet();
-    }
-
-    public void increaseIgmpLeaveReq() {
-        igmpLeaveReq.incrementAndGet();
-    }
-
-    public void increaseIgmpDisconnect() {
-        igmpDisconnect.incrementAndGet();
-    }
-
-    public void increaseIgmpv3MembershipQuery() {
-        igmpv3MembershipQuery.incrementAndGet();
-        igmpMsgReceived.incrementAndGet();
-    }
-
-    public void increaseIgmpv2MembershipReport() {
-        igmpv2MembershipReport.incrementAndGet();
-        igmpMsgReceived.incrementAndGet();
-    }
-
-    public void increaseIgmpv1MembershipReport() {
-        igmpv1MembershipReport.incrementAndGet();
-        igmpMsgReceived.incrementAndGet();
-    }
-
-    public void increaseIgmpv3MembershipReport() {
-        igmpv3MembershipReport.incrementAndGet();
-        igmpMsgReceived.incrementAndGet();
-    }
-
-    public void increaseIgmpv2LeaveGroup() {
-        igmpv2LeaveGroup.incrementAndGet();
-        igmpMsgReceived.incrementAndGet();
-    }
-
-    public void increaseInvalidIgmpMsgReceived() {
-        invalidIgmpMsgReceived.incrementAndGet();
-    }
-
-    public void increaseTotalMsgReceived() {
-        totalMsgReceived.incrementAndGet();
-    }
-
-    public Long getValidIgmpPacketCounter() {
-        return validIgmpPacketCounter.get();
-    }
-
-    public void increaseValidIgmpPacketCounter() {
-        validIgmpPacketCounter.incrementAndGet();
-    }
-
-    public Long getCurrentGrpNumCounter() {
-        return currentGrpNumCounter.get();
-    }
-
-    public void increaseCurrentGrpNumCounter() {
-        currentGrpNumCounter.incrementAndGet();
-    }
-
-    public Long getIgmpChannelJoinCounter() {
-        return igmpChannelJoinCounter.get();
-    }
-    public Long getIgmpValidChecksumCounter() {
-        return igmpValidChecksumCounter.get();
-    }
-
-    public void increaseIgmpChannelJoinCounter() {
-        igmpChannelJoinCounter.incrementAndGet();
-    }
-
-    public void increaseIgmpValidChecksumCounter() {
-        igmpValidChecksumCounter.incrementAndGet();
-    }
-
-    public Long getUnconfiguredGroupCounter() {
-        return unconfiguredGroupCounter.get();
-    }
-
-    public void increaseUnconfiguredGroupCounter() {
-        unconfiguredGroupCounter.incrementAndGet();
-    }
-
-    public Long getFailJoinReqUnknownMulticastIpCounter() {
-        return failJoinReqUnknownMulticastIpCounter.get();
-    }
-
-    public void increaseFailJoinReqUnknownMulticastIpCounter() {
-        failJoinReqUnknownMulticastIpCounter.incrementAndGet();
-    }
-
-    public Long getFailJoinReqInsuffPermissionAccessCounter() {
-        return failJoinReqInsuffPermissionAccessCounter.get();
-    }
-
-    public void increaseFailJoinReqInsuffPermissionAccessCounter() {
-        failJoinReqInsuffPermissionAccessCounter.incrementAndGet();
-    }
-
-    public Long getReportsRxWithWrongModeCounter() {
-        return reportsRxWithWrongModeCounter.get();
-    }
-
-    public Long getUnknownIgmpTypePacketsRxCounter() {
-        return unknownIgmpTypePacketsRxCounter.get();
-    }
-
-    public void increaseUnknownIgmpTypePacketsRxCounter() {
-        unknownIgmpTypePacketsRxCounter.incrementAndGet();
-    }
-
-    public void increaseReportsRxWithWrongModeCounter() {
-        reportsRxWithWrongModeCounter.incrementAndGet();
-    }
-
-    public Long getInvalidIgmpLength() {
-        return invalidIgmpLength.get();
-    }
-
-    public void increaseInvalidIgmpLength() {
-        invalidIgmpLength.incrementAndGet();
-    }
-
-    public Long getIgmpGeneralMembershipQuery() {
-        return igmpGeneralMembershipQuery.get();
-    }
-
-    public Long getIgmpGrpSpecificMembershipQuery() {
-        return igmpGrpSpecificMembershipQuery.get();
-    }
-
-    public Long getIgmpGrpAndSrcSpecificMembershipQuery() {
-        return igmpGrpAndSrcSpecificMembershipQuery.get();
-    }
-
-    public void increaseIgmpGeneralMembershipQuery() {
-        igmpGeneralMembershipQuery.incrementAndGet();
-    }
-
-    public void increaseIgmpGrpSpecificMembershipQuery() {
-        igmpGrpSpecificMembershipQuery.incrementAndGet();
-    }
-
-    public void increaseIgmpGrpAndSrcSpecificMembershipQuery() {
-        igmpGrpAndSrcSpecificMembershipQuery.incrementAndGet();
-    }
-
 }
diff --git a/api/src/main/java/org/opencord/igmpproxy/IgmpStatisticsService.java b/api/src/main/java/org/opencord/igmpproxy/IgmpStatisticsService.java
index 3574ad5..2a5b228 100644
--- a/api/src/main/java/org/opencord/igmpproxy/IgmpStatisticsService.java
+++ b/api/src/main/java/org/opencord/igmpproxy/IgmpStatisticsService.java
@@ -23,11 +23,25 @@
  */
 public interface IgmpStatisticsService extends
         ListenerService<IgmpStatisticsEvent, IgmpStatisticsEventListener> {
+
     /**
-     * Returns IgmpStatistics object.
+     * Increases statistic-value for given type.
      *
-     * @return IgmpStatistics
-    */
-    public IgmpStatistics getIgmpStats();
+     * @param type type of igmp-statistic.
+     */
+    void increaseStat(IgmpStatisticType type);
+
+    /**
+     * Resets all statistic-values.
+     */
+    void resetAllStats();
+
+    /**
+     * Get statistic-value for given type.
+     *
+     * @param type type of igmp-statistic
+     * @return value of statistic
+     */
+    Long getStat(IgmpStatisticType type);
 
 }
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpLeadershipManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpLeadershipManager.java
index 4f2e7bb..c40bca2 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpLeadershipManager.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpLeadershipManager.java
@@ -17,6 +17,7 @@
 
 import org.onosproject.cluster.ClusterService;
 import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.Leadership;
 import org.onosproject.cluster.NodeId;
 import org.onosproject.mastership.MastershipService;
 import org.onosproject.net.DeviceId;
@@ -69,4 +70,24 @@
         }
         return true;
     }
+
+    @Override
+    public NodeId getLocalNodeId() {
+        return clusterService.getLocalNode().id();
+    }
+
+    @Override
+    public NodeId getLeader(String topic) {
+        return leadershipService.getLeader(topic);
+    }
+
+    @Override
+    public Leadership runForLeadership(String topic) {
+        return leadershipService.runForLeadership(topic);
+    }
+
+    @Override
+    public void withdraw(String topic) {
+        leadershipService.withdraw(topic);
+    }
 }
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
index ed3dad8..e32bc4a 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
@@ -18,6 +18,7 @@
 import com.google.common.collect.Sets;
 import org.onosproject.net.Device;
 import org.opencord.igmpproxy.IgmpLeadershipService;
+import org.opencord.igmpproxy.IgmpStatisticType;
 import org.opencord.igmpproxy.IgmpStatisticsService;
 import org.opencord.igmpproxy.impl.store.groupmember.GroupMember;
 import org.opencord.igmpproxy.GroupMemberId;
@@ -302,10 +303,10 @@
         maxResp = calculateMaxResp(maxResp);
         if (gAddr != null && !gAddr.isZero()) {
             stateMachineService.specialQuery(deviceId, gAddr, maxResp);
-            igmpStatisticsManager.getIgmpStats().increaseIgmpGrpSpecificMembershipQuery();
+            igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_GRP_SPECIFIC_MEMBERSHIP_QUERY);
         } else {
             stateMachineService.generalQuery(deviceId, maxResp);
-            igmpStatisticsManager.getIgmpStats().increaseIgmpGeneralMembershipQuery();
+            igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_GENERAL_MEMBERSHIP_QUERY);
         }
     }
 
@@ -320,15 +321,15 @@
                 Optional<SubscriberAndDeviceInformation> accessDevice = getSubscriberAndDeviceInformation(device.id());
                 if (accessDevice.isPresent()) {
                     stateMachineService.specialQuery(device.id(), gAddr, maxResponseTime);
-                    igmpStatisticsManager.getIgmpStats().increaseIgmpGrpAndSrcSpecificMembershipQuery();
+                    igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_GRP_AND_SRC_SPESIFIC_MEMBERSHIP_QUERY);
                 }
             });
-            igmpStatisticsManager.getIgmpStats().increaseCurrentGrpNumCounter();
+            igmpStatisticsManager.increaseStat(IgmpStatisticType.CURRENT_GRP_NUMBER_COUNTER);
         } else {
             //Don't know which group is targeted by the query
             //So query all the members(in all the OLTs) and proxy their reports
             stateMachineService.generalQuery(maxResponseTime);
-            igmpStatisticsManager.getIgmpStats().increaseIgmpGeneralMembershipQuery();
+            igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_GENERAL_MEMBERSHIP_QUERY);
         }
     }
 
@@ -354,7 +355,7 @@
         Ip4Address groupIp = igmpGroup.getGaddr().getIp4Address();
         if (!groupIp.isMulticast()) {
             log.info(groupIp.toString() + " is not a valid group address");
-            igmpStatisticsManager.getIgmpStats().increaseFailJoinReqUnknownMulticastIpCounter();
+            igmpStatisticsManager.increaseStat(IgmpStatisticType.FAIL_JOIN_REQ_UNKNOWN_MULTICAST_IP_COUNTER);
             return;
         }
         Ip4Address srcIp = getDeviceIp(deviceId);
@@ -365,7 +366,7 @@
         ArrayList<Ip4Address> sourceList = new ArrayList<>();
 
         if (!validMembershipModes.contains(recordType)) {
-            igmpStatisticsManager.getIgmpStats().increaseReportsRxWithWrongModeCounter();
+            igmpStatisticsManager.increaseStat(IgmpStatisticType.REPORTS_RX_WITH_WRONG_MODE_COUNTER);
         }
         if (igmpGroup.getSources().size() > 0) {
             igmpGroup.getSources().forEach(source -> sourceList.add(source.getIp4Address()));
@@ -404,14 +405,15 @@
         GroupMember groupMember = groupMemberStore.getGroupMember(groupMemberKey);
 
         if (join) {
-            igmpStatisticsManager.getIgmpStats().increaseIgmpJoinReq();
+            igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_JOIN_REQ);
             if (groupMember == null) {
                 Optional<ConnectPoint> sourceConfigured = getSource();
                 if (!sourceConfigured.isPresent()) {
-                    igmpStatisticsManager.getIgmpStats().increaseIgmpFailJoinReq();
+                    igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_FAIL_JOIN_REQ);
                     log.warn("Unable to process IGMP Join from {} since no source " +
                             "configuration is found.", deviceId);
-                    igmpStatisticsManager.getIgmpStats().increaseFailJoinReqInsuffPermissionAccessCounter();
+                    igmpStatisticsManager
+                            .increaseStat(IgmpStatisticType.FAIL_JOIN_REQ_INSUFF_PERMISSION_ACCESS_COUNTER);
                     return;
                 }
 
@@ -432,10 +434,10 @@
 
                 boolean isJoined = stateMachineService.join(deviceId, groupIp, srcIp, deviceUplink.get());
                 if (isJoined) {
-                    igmpStatisticsManager.getIgmpStats().increaseIgmpSuccessJoinRejoinReq();
-                    igmpStatisticsManager.getIgmpStats().increaseIgmpChannelJoinCounter();
+                    igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_SUCCESS_JOIN_RE_JOIN_REQ);
+                    igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_CHANNEL_JOIN_COUNTER);
                 } else {
-                    igmpStatisticsManager.getIgmpStats().increaseIgmpFailJoinReq();
+                    igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_FAIL_JOIN_REQ);
                 }
                 groupMemberStore.putGroupMember(groupMember);
                 log.debug("Group member created with id: {}", groupMember.getGroupMemberId());
@@ -449,7 +451,7 @@
                     //add sink to the route
                     multicastService.addSinks(route, Sets.newHashSet(cp));
                 });
-                igmpStatisticsManager.getIgmpStats().increaseUnconfiguredGroupCounter();
+                igmpStatisticsManager.increaseStat(IgmpStatisticType.UNCONFIGURED_GROUP_COUNTER);
 
             }
             groupMember.resetAllTimers();
@@ -458,11 +460,11 @@
             //put updated member to the store
             groupMemberStore.putGroupMember(groupMember);
         } else {
-            igmpStatisticsManager.getIgmpStats().increaseIgmpLeaveReq();
+            igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_LEAVE_REQ);
             if (groupMember == null) {
                 log.info("receive leave but no instance, group {} device: {} port:{}",
                         groupIp, deviceId, portNumber);
-                igmpStatisticsManager.getIgmpStats().increaseUnconfiguredGroupCounter();
+                igmpStatisticsManager.increaseStat(IgmpStatisticType.UNCONFIGURED_GROUP_COUNTER);
                 return;
             } else {
                 groupMember.setLeave(true);
@@ -478,7 +480,7 @@
     }
 
     private void leaveAction(GroupMember groupMember) {
-        igmpStatisticsManager.getIgmpStats().increaseIgmpDisconnect();
+        igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_DISCONNECT);
         ConnectPoint cp = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
         stateMachineService.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
         groupMember.getSourceList().forEach(source -> multicastService.removeSinks(
@@ -524,7 +526,7 @@
                     if (ethPkt == null) {
                         return;
                     }
-                    igmpStatisticsManager.getIgmpStats().increaseTotalMsgReceived();
+                    igmpStatisticsManager.increaseStat(IgmpStatisticType.TOTAL_MSG_RECEIVED);
 
                     if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
                         return;
@@ -536,14 +538,15 @@
                         return;
                     }
 
-                    igmpStatisticsManager.getIgmpStats().increaseIgmpValidChecksumCounter();
+                    igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_VALID_CHECKSUM_COUNTER);
                     short vlan = ethPkt.getVlanID();
                     DeviceId deviceId = pkt.receivedFrom().deviceId();
 
                     if (!isConnectPoint(deviceId, pkt.receivedFrom().port()) &&
                             !getSubscriberAndDeviceInformation(deviceId).isPresent()) {
                         log.error("Device not registered in netcfg : {}", deviceId);
-                        igmpStatisticsManager.getIgmpStats().increaseFailJoinReqInsuffPermissionAccessCounter();
+                        igmpStatisticsManager
+                                .increaseStat(IgmpStatisticType.FAIL_JOIN_REQ_INSUFF_PERMISSION_ACCESS_COUNTER);
                         return;
                     }
 
@@ -553,7 +556,7 @@
                     PortNumber upLinkPort = deviceUpLinkOpt.isPresent() ? deviceUpLinkOpt.get() : null;
                     switch (igmp.getIgmpType()) {
                         case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
-                            igmpStatisticsManager.getIgmpStats().increaseIgmpv3MembershipQuery();
+                            igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_V3_MEMBERSHIP_QUERY);
                             //Discard Query from OLT’s non-uplink port’s
                             if (!pkt.receivedFrom().port().equals(upLinkPort)) {
                                 if (isConnectPoint(deviceId, pkt.receivedFrom().port())) {
@@ -574,26 +577,26 @@
                                     0xff & igmp.getMaxRespField());
                             break;
                         case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
-                            igmpStatisticsManager.getIgmpStats().increaseIgmpv1MembershipReport();
+                            igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_V1_MEMBERSHIP_REPORT);
                             log.debug("IGMP version 1  message types are not currently supported.");
                             break;
                         case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
-                            igmpStatisticsManager.getIgmpStats().increaseIgmpv3MembershipReport();
+                            igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_V3_MEMBERSHIP_REPORT);
                             processIgmpMessage(pkt, igmp, upLinkPort, vlan);
                             break;
                         case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
-                            igmpStatisticsManager.getIgmpStats().increaseIgmpv2MembershipReport();
+                            igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_V2_MEMBERSHIP_REPORT);
                             processIgmpMessage(pkt, igmp, upLinkPort, vlan);
                             break;
                         case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
-                            igmpStatisticsManager.getIgmpStats().increaseIgmpv2LeaveGroup();
+                            igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_V2_LEAVE_GROUP);
                             processIgmpMessage(pkt, igmp, upLinkPort, vlan);
                             break;
 
                         default:
                             log.warn("Unknown IGMP message type: {}", igmp.getIgmpType());
-                            igmpStatisticsManager.getIgmpStats().increaseInvalidIgmpMsgReceived();
-                            igmpStatisticsManager.getIgmpStats().increaseUnknownIgmpTypePacketsRxCounter();
+                            igmpStatisticsManager.increaseStat(IgmpStatisticType.INVALID_IGMP_MSG_RECEIVED);
+                            igmpStatisticsManager.increaseStat(IgmpStatisticType.UNKNOWN_IGMP_TYPE_PACKETS_RX_COUNTER);
                             break;
                     }
 
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpSender.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpSender.java
index e6584ad..9bda1c9 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpSender.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpSender.java
@@ -33,6 +33,7 @@
 import org.onosproject.net.packet.OutboundPacket;
 import org.onosproject.net.packet.PacketService;
 import org.opencord.igmpproxy.IgmpLeadershipService;
+import org.opencord.igmpproxy.IgmpStatisticType;
 import org.opencord.igmpproxy.IgmpStatisticsService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -213,7 +214,7 @@
                 break;
             default:
                 log.debug("Unknown igmp type: {} ", type);
-                igmpStatisticsService.getIgmpStats().increaseUnknownIgmpTypePacketsRxCounter();
+                igmpStatisticsService.increaseStat(IgmpStatisticType.UNKNOWN_IGMP_TYPE_PACKETS_RX_COUNTER);
                 return null;
         }
 
@@ -280,16 +281,16 @@
         // This counter will be useful in future if we change the procedure to generate the packets.
         if ((igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT
              || igmp.getIgmpType() == IGMP.TYPE_IGMPV2_LEAVE_GROUP) && igmp.serialize().length < IGMPv2.HEADER_LENGTH) {
-                 igmpStatisticsService.getIgmpStats().increaseInvalidIgmpLength();
+                 igmpStatisticsService.increaseStat(IgmpStatisticType.INVALID_IGMP_LENGTH);
         } else if (igmp.getIgmpType() == IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT
             && igmp.serialize().length < IGMPv3.MINIMUM_HEADER_LEN) {
-                 igmpStatisticsService.getIgmpStats().increaseInvalidIgmpLength();
+                 igmpStatisticsService.increaseStat(IgmpStatisticType.INVALID_IGMP_LENGTH);
         }
         TrafficTreatment treatment = DefaultTrafficTreatment.builder()
                 .setOutput(portNumber).build();
         OutboundPacket packet = new DefaultOutboundPacket(deviceId,
                 treatment, ByteBuffer.wrap(ethPkt.serialize()));
-        igmpStatisticsService.getIgmpStats().increaseValidIgmpPacketCounter();
+        igmpStatisticsService.increaseStat(IgmpStatisticType.VALID_IGMP_PACKET_COUNTER);
         packetService.emit(packet);
 
     }
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpStatisticsManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpStatisticsManager.java
index 21d0011..69ba6b6 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpStatisticsManager.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpStatisticsManager.java
@@ -16,6 +16,22 @@
 
 package org.opencord.igmpproxy.impl;
 
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.opencord.igmpproxy.IgmpStatisticType;
+import org.opencord.igmpproxy.IgmpStatisticsEvent;
+import org.opencord.igmpproxy.IgmpStatisticsEventListener;
+import org.opencord.igmpproxy.IgmpStatisticsService;
+import org.opencord.igmpproxy.IgmpLeadershipService;
+import org.opencord.igmpproxy.IgmpStatistics;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.ComponentContext;
 import org.osgi.service.component.annotations.Activate;
@@ -28,61 +44,95 @@
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
 
-import org.opencord.igmpproxy.IgmpStatistics;
-import org.opencord.igmpproxy.IgmpStatisticsService;
-import org.opencord.igmpproxy.IgmpStatisticsEvent;
-import org.opencord.igmpproxy.IgmpStatisticsEventListener;
-
 import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD;
+import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_SYNC_PERIOD;
 import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD_DEFAULT;
+import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_SYNC_PERIOD_DEFAULT;
 import static org.slf4j.LoggerFactory.getLogger;
 
 import java.util.Dictionary;
+import java.util.Objects;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.slf4j.Logger;
 
 import com.google.common.base.Strings;
 
 
-
 /**
- *
  * Process the stats collected in Igmp proxy application. Publish to kafka onos.
- *
  */
 @Component(immediate = true, property = {
         STATISTICS_GENERATION_PERIOD + ":Integer=" + STATISTICS_GENERATION_PERIOD_DEFAULT,
+        STATISTICS_SYNC_PERIOD + ":Integer=" + STATISTICS_SYNC_PERIOD_DEFAULT,
 })
 public class IgmpStatisticsManager extends
-                 AbstractListenerManager<IgmpStatisticsEvent, IgmpStatisticsEventListener>
-                         implements IgmpStatisticsService {
+        AbstractListenerManager<IgmpStatisticsEvent, IgmpStatisticsEventListener>
+        implements IgmpStatisticsService {
+    private static final String IGMP_STATISTICS = "igmp-statistics";
+    private static final String IGMP_STATISTICS_LEADERSHIP = "igmp-statistics";
+
     private final Logger log = getLogger(getClass());
     private IgmpStatistics igmpStats;
 
-    ScheduledExecutorService executorForIgmp;
+    private ScheduledExecutorService executorForIgmp;
     private ScheduledFuture<?> publisherTask;
+    private ScheduledFuture<?> syncTask;
 
     protected int statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
+    protected int statisticsSyncPeriodInSeconds = STATISTICS_SYNC_PERIOD_DEFAULT;
+
+    private EventuallyConsistentMap<NodeId, IgmpStatistics> statistics;
+
+    private static final MessageSubject RESET_SUBJECT = new MessageSubject("igmp-statistics-reset");
+
+    private KryoNamespace statSerializer = KryoNamespace.newBuilder()
+            .register(KryoNamespaces.API)
+            .register(IgmpStatistics.class)
+            .build();
+
+    //Statistics values are valid or invalid
+    private AtomicBoolean validityCheck = new AtomicBoolean(false);
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected ComponentConfigService cfgService;
 
-    @Override
-    public IgmpStatistics getIgmpStats() {
-        return igmpStats;
-    }
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected StorageService storageService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected IgmpLeadershipService leadershipManager;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ClusterCommunicationService clusterCommunicationService;
 
     @Activate
     public void activate(ComponentContext context) {
-        igmpStats = new IgmpStatistics();
+        igmpStats = getIgmpStatsInstance();
+
+
+        statistics = storageService.<NodeId, IgmpStatistics>eventuallyConsistentMapBuilder()
+                .withName(IGMP_STATISTICS)
+                .withSerializer(statSerializer)
+                .withTimestampProvider((k, v) -> new WallClockTimestamp())
+                .build();
+
+        initStats(statistics.get(leadershipManager.getLocalNodeId()));
+        syncStats();
+
+        leadershipManager.runForLeadership(IGMP_STATISTICS_LEADERSHIP);
 
         eventDispatcher.addSink(IgmpStatisticsEvent.class, listenerRegistry);
         executorForIgmp = Executors.newScheduledThreadPool(1);
         cfgService.registerProperties(getClass());
+
+        clusterCommunicationService.addSubscriber(RESET_SUBJECT, Serializer.using(statSerializer)::decode,
+                this::resetLocal, executorForIgmp);
+
         modified(context);
         log.info("IgmpStatisticsManager Activated");
     }
@@ -90,69 +140,175 @@
     @Modified
     public void modified(ComponentContext context) {
         Dictionary<String, Object> properties = context.getProperties();
-
         try {
             String s = Tools.get(properties, STATISTICS_GENERATION_PERIOD);
             statisticsGenerationPeriodInSeconds = Strings.isNullOrEmpty(s) ?
-                Integer.parseInt(STATISTICS_GENERATION_PERIOD)
+                    Integer.parseInt(STATISTICS_GENERATION_PERIOD)
                     : Integer.parseInt(s.trim());
+            log.debug("statisticsGenerationPeriodInSeconds: {}", statisticsGenerationPeriodInSeconds);
+            statisticsSyncPeriodInSeconds = Strings.isNullOrEmpty(s) ?
+                    Integer.parseInt(STATISTICS_SYNC_PERIOD)
+                    : Integer.parseInt(s.trim());
+            log.debug("statisticsSyncPeriodInSeconds: {}", statisticsSyncPeriodInSeconds);
         } catch (NumberFormatException ne) {
-            log.error("Unable to parse configuration parameter for eventGenerationPeriodInSeconds", ne);
+            log.error("Unable to parse configuration parameter", ne);
             statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
+            statisticsSyncPeriodInSeconds = STATISTICS_SYNC_PERIOD_DEFAULT;
         }
-        if (publisherTask != null) {
-            publisherTask.cancel(true);
-        }
-        publisherTask = executorForIgmp.scheduleAtFixedRate(SafeRecurringTask.wrap(this::publishStats),
-                0, statisticsGenerationPeriodInSeconds, TimeUnit.SECONDS);
+        stopPublishTask();
+        stopSyncTask();
+
+        startPublishTask();
+        startSyncTask();
     }
 
     @Deactivate
     public void deactivate() {
         eventDispatcher.removeSink(IgmpStatisticsEvent.class);
-        publisherTask.cancel(true);
+        stopPublishTask();
+        stopSyncTask();
         executorForIgmp.shutdown();
         cfgService.unregisterProperties(getClass(), false);
         igmpStats = null;
+        clusterCommunicationService.removeSubscriber(RESET_SUBJECT);
+        leadershipManager.withdraw(IGMP_STATISTICS_LEADERSHIP);
         log.info("IgmpStatisticsManager Deactivated");
     }
 
+    private IgmpStatistics getIgmpStatsInstance() {
+        if (igmpStats == null) {
+            igmpStats = new IgmpStatistics();
+            log.info("Instance of igmp-statistics created.");
+        }
+        return igmpStats;
+    }
+
+    private void syncStats() {
+        if (!validityCheck.get()) {
+            //sync with valid values
+            statistics.put(leadershipManager.getLocalNodeId(), snapshot());
+            validityCheck.set(true);
+            log.debug("Valid statistic values are put.");
+        }
+    }
+
+    private void initStats(IgmpStatistics init) {
+        if (init == null) {
+            log.warn("Igmp statistics was not created.");
+            return;
+        }
+        igmpStats.setStats(init);
+    }
+
+    private IgmpStatistics snapshot() {
+        return getIgmpStatsInstance();
+    }
+
+    private void startSyncTask() {
+        syncTask = startTask(this::syncStats, statisticsSyncPeriodInSeconds);
+        log.debug("Sync task started. period in seconds: {}", statisticsSyncPeriodInSeconds);
+    }
+
+    private void stopSyncTask() {
+        stopTask(syncTask);
+        log.debug("Sync task stopped.");
+    }
+
+    private void startPublishTask() {
+        publisherTask = startTask(this::publishStats, statisticsGenerationPeriodInSeconds);
+        log.debug("Publisher task started. period in seconds: {}", statisticsGenerationPeriodInSeconds);
+    }
+
+    private void stopPublishTask() {
+        stopTask(publisherTask);
+        log.debug("Publisher task stopped.");
+    }
+
+    private ScheduledFuture<?> startTask(Runnable r, int rate) {
+        return executorForIgmp.scheduleAtFixedRate(SafeRecurringTask.wrap(r),
+                0, rate, TimeUnit.SECONDS);
+    }
+
+    private void stopTask(ScheduledFuture<?> task) {
+        if (task != null) {
+            task.cancel(true);
+        }
+    }
+
+    private void resetLocal(ClusterMessage message) {
+        //reset all-statistics
+        igmpStats.resetAll();
+        validityCheck.set(false);
+    }
+
     /**
      * Publishes stats.
      */
     private void publishStats() {
+        // Only publish events if we are the cluster leader for Igmp-stats
+        if (!Objects.equals(leadershipManager.getLeader(IGMP_STATISTICS_LEADERSHIP),
+                leadershipManager.getLocalNodeId())) {
+            log.debug("This is not leader of : {}", IGMP_STATISTICS_LEADERSHIP);
+            return;
+        }
 
         if (log.isDebugEnabled()) {
             log.debug("Notifying stats: {}", igmpStats);
-            log.debug("--IgmpDisconnect--" + igmpStats.getIgmpDisconnect());
-            log.debug("--IgmpFailJoinReq--" + igmpStats.getIgmpFailJoinReq());
-            log.debug("--IgmpJoinReq--" + igmpStats.getIgmpJoinReq());
-            log.debug("--IgmpLeaveReq--" + igmpStats.getIgmpLeaveReq());
-            log.debug("--IgmpMsgReceived--" + igmpStats.getIgmpMsgReceived());
-            log.debug("--IgmpSuccessJoinRejoinReq--" + igmpStats.getIgmpSuccessJoinRejoinReq());
-            log.debug("--Igmpv1MemershipReport--" + igmpStats.getIgmpv1MemershipReport());
-            log.debug("--Igmpv2LeaveGroup--" + igmpStats.getIgmpv2LeaveGroup());
-            log.debug("--Igmpv2MembershipReport--" + igmpStats.getIgmpv2MembershipReport());
-            log.debug("--Igmpv3MembershipQuery--" + igmpStats.getIgmpv3MembershipQuery());
-            log.debug("--Igmpv3MembershipReport--" + igmpStats.getIgmpv3MembershipReport());
-            log.debug("--InvalidIgmpMsgReceived--" + igmpStats.getInvalidIgmpMsgReceived());
-            log.debug("--TotalMsgReceived--  " + igmpStats.getTotalMsgReceived());
-            log.debug("--UnknownIgmpTypePacketsRx--" + igmpStats.getUnknownIgmpTypePacketsRxCounter());
-            log.debug("--ReportsRxWithWrongMode--" + igmpStats.getReportsRxWithWrongModeCounter());
-            log.debug("--FailJoinReqInsuffPermission--" + igmpStats.getFailJoinReqInsuffPermissionAccessCounter());
-            log.debug("--FailJoinReqUnknownMulticastIp--" + igmpStats.getFailJoinReqUnknownMulticastIpCounter());
-            log.debug("--UnconfiguredGroupCounter--" + igmpStats.getUnconfiguredGroupCounter());
-            log.debug("--ValidIgmpPacketCounter--" + igmpStats.getValidIgmpPacketCounter());
-            log.debug("--IgmpChannelJoinCounter--" + igmpStats.getIgmpChannelJoinCounter());
-            log.debug("--CurrentGrpNumCounter--" + igmpStats.getCurrentGrpNumCounter());
-            log.debug("--IgmpValidChecksumCounter--" + igmpStats.getIgmpValidChecksumCounter());
-            log.debug("--InvalidIgmpLength--" + igmpStats.getInvalidIgmpLength());
-            log.debug("--IgmpGeneralMembershipQuery--" + igmpStats.getIgmpGeneralMembershipQuery());
-            log.debug("--IgmpGrpSpecificMembershipQuery--" + igmpStats.getIgmpGrpSpecificMembershipQuery());
-            log.debug("--IgmpGrpAndSrcSpecificMembershipQuery--" + igmpStats.getIgmpGrpAndSrcSpecificMembershipQuery());
+            log.debug("--IgmpDisconnect--" + igmpStats.getStat(IgmpStatisticType.IGMP_DISCONNECT));
+            log.debug("--IgmpFailJoinReq--" + igmpStats.getStat(IgmpStatisticType.IGMP_FAIL_JOIN_REQ));
+            log.debug("--IgmpJoinReq--" + igmpStats.getStat(IgmpStatisticType.IGMP_JOIN_REQ));
+            log.debug("--IgmpLeaveReq--" + igmpStats.getStat(IgmpStatisticType.IGMP_LEAVE_REQ));
+            log.debug("--IgmpMsgReceived--" + igmpStats.getStat(IgmpStatisticType.IGMP_MSG_RECEIVED));
+            log.debug("--IgmpSuccessJoinRejoinReq--" +
+                    igmpStats.getStat(IgmpStatisticType.IGMP_SUCCESS_JOIN_RE_JOIN_REQ));
+            log.debug("--Igmpv1MemershipReport--" + igmpStats.getStat(IgmpStatisticType.IGMP_V1_MEMBERSHIP_REPORT));
+            log.debug("--Igmpv2LeaveGroup--" + igmpStats.getStat(IgmpStatisticType.IGMP_V2_LEAVE_GROUP));
+            log.debug("--Igmpv2MembershipReport--" + igmpStats.getStat(IgmpStatisticType.IGMP_V2_MEMBERSHIP_REPORT));
+            log.debug("--Igmpv3MembershipQuery--" + igmpStats.getStat(IgmpStatisticType.IGMP_V3_MEMBERSHIP_QUERY));
+            log.debug("--Igmpv3MembershipReport--" + igmpStats.getStat(IgmpStatisticType.IGMP_V3_MEMBERSHIP_REPORT));
+            log.debug("--InvalidIgmpMsgReceived--" + igmpStats.getStat(IgmpStatisticType.INVALID_IGMP_MSG_RECEIVED));
+            log.debug("--TotalMsgReceived--  " + igmpStats.getStat(IgmpStatisticType.TOTAL_MSG_RECEIVED));
+            log.debug("--UnknownIgmpTypePacketsRx--" +
+                    igmpStats.getStat(IgmpStatisticType.UNKNOWN_IGMP_TYPE_PACKETS_RX_COUNTER));
+            log.debug("--ReportsRxWithWrongMode--" +
+                    igmpStats.getStat(IgmpStatisticType.REPORTS_RX_WITH_WRONG_MODE_COUNTER));
+            log.debug("--FailJoinReqInsuffPermission--" +
+                    igmpStats.getStat(IgmpStatisticType.FAIL_JOIN_REQ_INSUFF_PERMISSION_ACCESS_COUNTER));
+            log.debug("--FailJoinReqUnknownMulticastIp--" +
+                    igmpStats.getStat(IgmpStatisticType.FAIL_JOIN_REQ_UNKNOWN_MULTICAST_IP_COUNTER));
+            log.debug("--UnconfiguredGroupCounter--" + igmpStats.getStat(IgmpStatisticType.UNCONFIGURED_GROUP_COUNTER));
+            log.debug("--ValidIgmpPacketCounter--" + igmpStats.getStat(IgmpStatisticType.VALID_IGMP_PACKET_COUNTER));
+            log.debug("--IgmpChannelJoinCounter--" + igmpStats.getStat(IgmpStatisticType.IGMP_CHANNEL_JOIN_COUNTER));
+            log.debug("--CurrentGrpNumCounter--" + igmpStats.getStat(IgmpStatisticType.CURRENT_GRP_NUMBER_COUNTER));
+            log.debug("--IgmpValidChecksumCounter--" +
+                    igmpStats.getStat(IgmpStatisticType.IGMP_VALID_CHECKSUM_COUNTER));
+            log.debug("--InvalidIgmpLength--" + igmpStats.getStat(IgmpStatisticType.INVALID_IGMP_LENGTH));
+            log.debug("--IgmpGeneralMembershipQuery--" +
+                    igmpStats.getStat(IgmpStatisticType.IGMP_GENERAL_MEMBERSHIP_QUERY));
+            log.debug("--IgmpGrpSpecificMembershipQuery--" +
+                    igmpStats.getStat(IgmpStatisticType.IGMP_GRP_SPECIFIC_MEMBERSHIP_QUERY));
+            log.debug("--IgmpGrpAndSrcSpecificMembershipQuery--" +
+                    igmpStats.getStat(IgmpStatisticType.IGMP_GRP_AND_SRC_SPESIFIC_MEMBERSHIP_QUERY));
         }
 
         post(new IgmpStatisticsEvent(IgmpStatisticsEvent.Type.STATS_UPDATE, igmpStats));
     }
 
+    @Override
+    public void increaseStat(IgmpStatisticType type) {
+        igmpStats.increaseStat(type);
+        validityCheck.set(false);
+    }
+
+    @Override
+    public void resetAllStats() {
+        ClusterMessage reset = new ClusterMessage(leadershipManager.getLocalNodeId(), RESET_SUBJECT, new byte[]{});
+        clusterCommunicationService.broadcastIncludeSelf(reset, RESET_SUBJECT,
+                Serializer.using(statSerializer)::encode);
+    }
+
+    @Override
+    public Long getStat(IgmpStatisticType type) {
+        return igmpStats.getStat(type);
+    }
 }
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/OsgiPropertyConstants.java b/app/src/main/java/org/opencord/igmpproxy/impl/OsgiPropertyConstants.java
index c99a3b7..26bc419 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/OsgiPropertyConstants.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/OsgiPropertyConstants.java
@@ -26,4 +26,7 @@
 
     public static final String STATISTICS_GENERATION_PERIOD = "statisticsGenerationPeriodInSeconds";
     public static final int STATISTICS_GENERATION_PERIOD_DEFAULT = 20;
+
+    public static final String STATISTICS_SYNC_PERIOD = "statisticsSyncPeriodInSeconds";
+    public static final int STATISTICS_SYNC_PERIOD_DEFAULT = 10;
 }
diff --git a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
index 2595911..1905ec2 100644
--- a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
+++ b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerBase.java
@@ -16,6 +16,7 @@
 package org.opencord.igmpproxy.impl;
 
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.onlab.packet.Ethernet;
 import org.onlab.packet.IGMP;
@@ -26,6 +27,9 @@
 import org.onlab.packet.VlanId;
 import org.onosproject.cfg.ComponentConfigService;
 import org.onosproject.cfg.ConfigProperty;
+import org.onosproject.cluster.Leader;
+import org.onosproject.cluster.Leadership;
+import org.onosproject.cluster.NodeId;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.event.DefaultEventSinkRegistry;
 import org.onosproject.event.Event;
@@ -295,10 +299,34 @@
     }
 
     class TestIgmpLeaderShipService implements IgmpLeadershipService {
+        private NodeId nodeId = NodeId.nodeId("test-id");
+        private Leader leader = new Leader(nodeId, 0, 0);
+        private static final String TEST_TOPIC = "igmp-statistics";
+
         @Override
         public boolean isLocalLeader(DeviceId deviceId) {
             return true;
         }
+
+        @Override
+        public NodeId getLocalNodeId() {
+            return nodeId;
+        }
+
+        @Override
+        public NodeId getLeader(String topic) {
+            return nodeId;
+        }
+
+        @Override
+        public Leadership runForLeadership(String topic) {
+            return new Leadership(TEST_TOPIC, leader, Lists.newArrayList(nodeId));
+        }
+
+        @Override
+        public void withdraw(String topic) {
+
+        }
     }
 
     class MockMastershipService extends MastershipServiceAdapter {
diff --git a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerTest.java b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerTest.java
index e0daa14..3ce7760 100644
--- a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerTest.java
+++ b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpManagerTest.java
@@ -24,6 +24,8 @@
 import org.onosproject.core.CoreServiceAdapter;
 import org.onosproject.net.flow.FlowRuleServiceAdapter;
 import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
+import org.onosproject.store.service.TestStorageService;
 
 import static org.junit.Assert.*;
 
@@ -54,6 +56,9 @@
         igmpStatisticsManager = new IgmpStatisticsManager();
         igmpStatisticsManager.cfgService = new MockCfgService();
         TestUtils.setField(igmpStatisticsManager, "eventDispatcher", new TestEventDispatcher());
+        igmpStatisticsManager.storageService = new TestStorageService();
+        igmpStatisticsManager.leadershipManager = new TestIgmpLeaderShipService();
+        igmpStatisticsManager.clusterCommunicationService = new ClusterCommunicationServiceAdapter();
         igmpStatisticsManager.activate(new MockComponentContext());
         igmpManager.igmpStatisticsManager = this.igmpStatisticsManager;
 
diff --git a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpStatisticsTest.java b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpStatisticsTest.java
index 4421bde..3c9f5aa 100644
--- a/app/src/test/java/org/opencord/igmpproxy/impl/IgmpStatisticsTest.java
+++ b/app/src/test/java/org/opencord/igmpproxy/impl/IgmpStatisticsTest.java
@@ -33,6 +33,9 @@
 import org.onosproject.net.flow.FlowRuleServiceAdapter;
 import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
 
+import org.onosproject.store.cluster.messaging.ClusterCommunicationServiceAdapter;
+import org.onosproject.store.service.TestStorageService;
+import org.opencord.igmpproxy.IgmpStatisticType;
 import org.opencord.igmpproxy.IgmpStatisticsEvent;
 
 import com.google.common.collect.Lists;
@@ -73,6 +76,9 @@
         igmpStatisticsManager.cfgService = new MockCfgService();
         igmpStatisticsManager.addListener(mockListener);
         TestUtils.setField(igmpStatisticsManager, "eventDispatcher", new TestEventDispatcher());
+        igmpStatisticsManager.storageService = new TestStorageService();
+        igmpStatisticsManager.leadershipManager = new TestIgmpLeaderShipService();
+        igmpStatisticsManager.clusterCommunicationService = new ClusterCommunicationServiceAdapter();
         igmpStatisticsManager.activate(new MockComponentContext());
         igmpManager.igmpStatisticsManager = this.igmpStatisticsManager;
         // By default - we send query messages
@@ -110,16 +116,25 @@
         }
 
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-                assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getTotalMsgReceived().longValue()));
-        assertEquals((long) 1, igmpStatisticsManager.getIgmpStats().getIgmpJoinReq().longValue());
-        assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getIgmpv3MembershipReport().longValue());
-        assertEquals((long) 1, igmpStatisticsManager.getIgmpStats().getIgmpSuccessJoinRejoinReq().longValue());
-        assertEquals((long) 1, igmpStatisticsManager.getIgmpStats().getUnconfiguredGroupCounter().longValue());
-        assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getValidIgmpPacketCounter().longValue());
-        assertEquals((long) 1, igmpStatisticsManager.getIgmpStats().getIgmpChannelJoinCounter().longValue());
-        assertEquals((long) 1, igmpStatisticsManager.getIgmpStats().getIgmpLeaveReq().longValue());
-        assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getIgmpMsgReceived().longValue());
-        assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getIgmpValidChecksumCounter().longValue());
+                assertEquals((long) 2, igmpStatisticsManager
+                        .getStat(IgmpStatisticType.TOTAL_MSG_RECEIVED).longValue()));
+        assertEquals(1L, igmpStatisticsManager.getStat(IgmpStatisticType.IGMP_JOIN_REQ).longValue());
+        assertEquals(2L, igmpStatisticsManager
+                .getStat(IgmpStatisticType.IGMP_V3_MEMBERSHIP_REPORT).longValue());
+        assertEquals(1L, igmpStatisticsManager
+                .getStat(IgmpStatisticType.IGMP_SUCCESS_JOIN_RE_JOIN_REQ).longValue());
+        assertEquals(1L, igmpStatisticsManager
+                .getStat(IgmpStatisticType.UNCONFIGURED_GROUP_COUNTER).longValue());
+        assertEquals(2L, igmpStatisticsManager
+                .getStat(IgmpStatisticType.VALID_IGMP_PACKET_COUNTER).longValue());
+        assertEquals(1L, igmpStatisticsManager
+                .getStat(IgmpStatisticType.IGMP_CHANNEL_JOIN_COUNTER).longValue());
+        assertEquals(1L, igmpStatisticsManager
+                .getStat(IgmpStatisticType.IGMP_LEAVE_REQ).longValue());
+        assertEquals(2L, igmpStatisticsManager
+                .getStat(IgmpStatisticType.IGMP_MSG_RECEIVED).longValue());
+        assertEquals(2L, igmpStatisticsManager
+                .getStat(IgmpStatisticType.IGMP_VALID_CHECKSUM_COUNTER).longValue());
 
     }
 
@@ -136,8 +151,8 @@
         // Sending first packet
         sendPacket(firstPacket);
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-                assertEquals((long) 1,
-                        igmpStatisticsManager.getIgmpStats().getFailJoinReqUnknownMulticastIpCounter().longValue()));
+                assertEquals((long) 1, igmpStatisticsManager
+                        .getStat(IgmpStatisticType.FAIL_JOIN_REQ_UNKNOWN_MULTICAST_IP_COUNTER).longValue()));
     }
 
     //Test Igmp Query Statistics.
@@ -157,12 +172,11 @@
                 IgmpSender.getInstance().buildIgmpV3Query(Ip4Address.valueOf(0), SOURCE_IP_OF_A, VlanId.MAX_VLAN);
         sendPacket(igmpv3MembershipQueryPkt1);
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-                assertEquals(igmpStatisticsManager.getIgmpStats()
-                        .getIgmpGrpAndSrcSpecificMembershipQuery().longValue(), 1));
-        assertEquals(igmpStatisticsManager.getIgmpStats()
-                .getIgmpGeneralMembershipQuery().longValue(), 1);
-        assertEquals(igmpStatisticsManager.getIgmpStats()
-                .getCurrentGrpNumCounter().longValue(), 1);
+                assertEquals(igmpStatisticsManager
+                        .getStat(IgmpStatisticType.IGMP_GRP_AND_SRC_SPESIFIC_MEMBERSHIP_QUERY).longValue(), 1));
+        assertEquals(igmpStatisticsManager
+                .getStat(IgmpStatisticType.IGMP_GENERAL_MEMBERSHIP_QUERY).longValue(), 1);
+        assertEquals(igmpStatisticsManager.getStat(IgmpStatisticType.CURRENT_GRP_NUMBER_COUNTER).longValue(), 1);
     }
 
     //Test Events
@@ -193,8 +207,8 @@
         // Sending first packet
         sendPacket(firstPacket);
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-                assertEquals((long) 1,
-                        igmpStatisticsManager.getIgmpStats().getReportsRxWithWrongModeCounter().longValue()));
+                assertEquals((long) 1, igmpStatisticsManager
+                        .getStat(IgmpStatisticType.REPORTS_RX_WITH_WRONG_MODE_COUNTER).longValue()));
     }
 
     //Test packet with Unknown IGMP type.
@@ -209,8 +223,8 @@
         // Sending first packet
         sendPacket(firstPacket);
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-                assertEquals((long) 1,
-                        igmpStatisticsManager.getIgmpStats().getUnknownIgmpTypePacketsRxCounter().longValue()));
+                assertEquals((long) 1, igmpStatisticsManager
+                        .getStat(IgmpStatisticType.UNKNOWN_IGMP_TYPE_PACKETS_RX_COUNTER).longValue()));
     }
 
     //Test packet with Insufficient Permission.
@@ -226,9 +240,8 @@
         // Sending first packet
         sendPacket(firstPacket);
         assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
-                assertEquals((long) 1,
-                        igmpStatisticsManager.getIgmpStats()
-                                .getFailJoinReqInsuffPermissionAccessCounter().longValue()));
+                assertEquals((long) 1, igmpStatisticsManager
+                        .getStat(IgmpStatisticType.FAIL_JOIN_REQ_INSUFF_PERMISSION_ACCESS_COUNTER).longValue()));
     }
 
     public class MockIgmpStatisticsEventListener implements IgmpStatisticsEventListener {