[SEBA-41] Operational Status IGMP Data

Change-Id: I8e3d9bdfafe61d7d357dc1fae8735ed78b9a77eb
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpManager.java b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
index bb134d5..a42d41b 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpManager.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
@@ -162,6 +162,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected SadisService sadisService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected IgmpStatisticsService igmpStatisticsManager;
+
     private IgmpPacketProcessor processor = new IgmpPacketProcessor();
     private Logger log = LoggerFactory.getLogger(getClass());
     private ApplicationId coreAppId;
@@ -214,7 +217,6 @@
         configListener.reconfigureSsmTable(networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS));
 
         subsService = sadisService.getSubscriberInfoService();
-
         if (connectPointMode) {
             provisionConnectPointFlows();
         } else {
@@ -230,7 +232,6 @@
         scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 1000, 1000, TimeUnit.MILLISECONDS);
         eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/igmpproxy",
                                                                         "events-igmp-%d", log));
-
         log.info("Started");
     }
 
@@ -246,7 +247,6 @@
         deviceService.removeListener(deviceListener);
         packetService.removeProcessor(processor);
         flowRuleService.removeFlowRulesById(appId);
-
         log.info("Stopped");
     }
 
@@ -361,9 +361,11 @@
         GroupMember groupMember = groupMemberMap.get(groupMemberKey);
 
         if (join) {
+            igmpStatisticsManager.getIgmpStats().increaseIgmpJoinReq();
             if (groupMember == null) {
                 Optional<ConnectPoint> sourceConfigured = getSource();
                 if (!sourceConfigured.isPresent()) {
+                    igmpStatisticsManager.getIgmpStats().increaseIgmpFailJoinReq();
                     log.warn("Unable to process IGMP Join from {} since no source " +
                                      "configuration is found.", deviceId);
                     return;
@@ -384,7 +386,12 @@
 
                 HashSet<ConnectPoint> sourceConnectPoints = Sets.newHashSet(sourceConfigured.get());
 
-                StateMachine.join(deviceId, groupIp, srcIp, deviceUplink.get());
+                boolean isJoined = StateMachine.join(deviceId, groupIp, srcIp, deviceUplink.get());
+                if (isJoined) {
+                    igmpStatisticsManager.getIgmpStats().increaseIgmpSuccessJoinRejoinReq();
+                } else {
+                    igmpStatisticsManager.getIgmpStats().increaseIgmpFailJoinReq();
+                }
                 groupMemberMap.put(groupMemberKey, groupMember);
                 groupMember.updateList(recordType, sourceList);
                 groupMember.getSourceList().forEach(source -> {
@@ -402,6 +409,7 @@
             groupMember.updateList(recordType, sourceList);
             groupMember.setLeave(false);
         } else {
+            igmpStatisticsManager.getIgmpStats().increaseIgmpLeaveReq();
             if (groupMember == null) {
                 log.info("receive leave but no instance, group " + groupIp.toString() +
                         " device:" + deviceId.toString() + " port:" + portNumber.toString());
@@ -418,6 +426,7 @@
     }
 
     private void leaveAction(GroupMember groupMember) {
+        igmpStatisticsManager.getIgmpStats().increaseIgmpDisconnect();
         ConnectPoint cp = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
         StateMachine.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
         groupMember.getSourceList().forEach(source -> multicastService.removeSinks(
@@ -451,6 +460,7 @@
     private class IgmpPacketProcessor implements PacketProcessor {
         @Override
         public void process(PacketContext context) {
+
             eventExecutor.execute(() -> {
                 try {
                     InboundPacket pkt = context.inPacket();
@@ -458,6 +468,7 @@
                     if (ethPkt == null) {
                         return;
                     }
+                    igmpStatisticsManager.getIgmpStats().increaseTotalMsgReceived();
 
                     if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
                         return;
@@ -484,6 +495,7 @@
                     PortNumber upLinkPort =  deviceUpLinkOpt.isPresent() ? deviceUpLinkOpt.get() : null;
                     switch (igmp.getIgmpType()) {
                         case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
+                            igmpStatisticsManager.getIgmpStats().increaseIgmpv3MembershipQuery();
                             //Discard Query from OLT’s non-uplink port’s
                             if (!pkt.receivedFrom().port().equals(upLinkPort)) {
                                 if (isConnectPoint(deviceId, pkt.receivedFrom().port())) {
@@ -504,49 +516,61 @@
                                              0xff & igmp.getMaxRespField());
                             break;
                         case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
+                            igmpStatisticsManager.getIgmpStats().increaseIgmpv1MembershipReport();
                             log.debug("IGMP version 1  message types are not currently supported.");
                             break;
                         case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
+                            igmpStatisticsManager.getIgmpStats().increaseIgmpv3MembershipReport();
+                            processIgmpMessage(pkt, igmp, upLinkPort, vlan);
+                            break;
                         case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
+                            igmpStatisticsManager.getIgmpStats().increaseIgmpv2MembershipReport();
+                            processIgmpMessage(pkt, igmp, upLinkPort, vlan);
+                            break;
                         case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
-                            //Discard join/leave from OLT’s uplink port’s
-                            if (pkt.receivedFrom().port().equals(upLinkPort) ||
-                                    isConnectPoint(deviceId, pkt.receivedFrom().port())) {
-                                log.info("IGMP Picked up join/leave from uplink/connectPoint port");
-                                return;
-                            }
-
-                            Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
-                            while (itr.hasNext()) {
-                                IGMPGroup group = itr.next();
-                                if (group instanceof IGMPMembership) {
-                                    processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
-                                                      pkt.receivedFrom(), igmp.getIgmpType());
-                                } else if (group instanceof IGMPQuery) {
-                                    IGMPMembership mgroup;
-                                    mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
-                                    mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
-                                                                 IGMPMembership.MODE_IS_EXCLUDE :
-                                                                 IGMPMembership.MODE_IS_INCLUDE);
-                                    processIgmpReport(mgroup, VlanId.vlanId(vlan),
-                                                      pkt.receivedFrom(), igmp.getIgmpType());
-                                }
-                            }
+                            igmpStatisticsManager.getIgmpStats().increaseIgmpv2LeaveGroup();
+                            processIgmpMessage(pkt, igmp, upLinkPort, vlan);
                             break;
 
                         default:
-                            log.info("wrong IGMP v3 type:" + igmp.getIgmpType());
+                            log.warn("wrong IGMP v3 type:" + igmp.getIgmpType());
+                            igmpStatisticsManager.getIgmpStats().increaseInvalidIgmpMsgReceived();
                             break;
                     }
 
                 } catch (Exception ex) {
                     log.error("igmp process error : {} ", ex);
-                    ex.printStackTrace();
                 }
             });
         }
     }
 
+    private void processIgmpMessage(InboundPacket pkt, IGMP igmp, PortNumber upLinkPort, short vlan) {
+        //Discard join/leave from OLT’s uplink port’s
+        if (pkt.receivedFrom().port().equals(upLinkPort) ||
+                isConnectPoint(pkt.receivedFrom().deviceId(), pkt.receivedFrom().port())) {
+            log.info("IGMP Picked up join/leave from uplink/connectPoint port");
+            return;
+        }
+
+        Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
+        while (itr.hasNext()) {
+            IGMPGroup group = itr.next();
+            if (group instanceof IGMPMembership) {
+                processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
+                                  pkt.receivedFrom(), igmp.getIgmpType());
+            } else {
+                IGMPMembership mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
+                mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
+                                             IGMPMembership.MODE_IS_EXCLUDE :
+                                             IGMPMembership.MODE_IS_INCLUDE);
+                processIgmpReport(mgroup, VlanId.vlanId(vlan),
+                                  pkt.receivedFrom(), igmp.getIgmpType());
+            }
+        }
+
+    }
+
     private class IgmpProxyTimerTask extends TimerTask {
         public void run() {
             try {
@@ -953,4 +977,5 @@
         }
         processFilterObjective(connectPoint.deviceId(), connectPoint.port(), true);
     }
+
 }
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpStatistics.java b/src/main/java/org/opencord/igmpproxy/IgmpStatistics.java
new file mode 100644
index 0000000..c7c4650
--- /dev/null
+++ b/src/main/java/org/opencord/igmpproxy/IgmpStatistics.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ *
+ * Records metrics for IgmpProxy application.
+ *
+ */
+public class IgmpStatistics {
+
+    //Total number of join requests
+    private AtomicLong igmpJoinReq = new AtomicLong();
+    //Total number of successful join and rejoin requests
+    private AtomicLong igmpSuccessJoinRejoinReq = new AtomicLong();
+    //Total number of failed join requests
+    private AtomicLong igmpFailJoinReq = new AtomicLong();
+    //Total number of leaves requests
+    private AtomicLong igmpLeaveReq = new AtomicLong();
+    // Total number of disconnects
+    private AtomicLong igmpDisconnect = new AtomicLong();
+    //Count of Total number of IGMPV3_MEMBERSHIP_QUERY
+    private AtomicLong igmpv3MembershipQuery = new AtomicLong();
+    //Count of IGMPV1_MEMBERSHIP_REPORT
+    private AtomicLong igmpv1MembershipReport = new AtomicLong();
+    //Count of IGMPV3_MEMBERSHIP_REPORT
+    private AtomicLong igmpv3MembershipReport = new AtomicLong();
+    //Count of IGMPV2_MEMBERSHIP_REPORT
+    private AtomicLong igmpv2MembershipReport = new AtomicLong();
+    //Count of TYPE_IGMPV2_LEAVE_GROUP
+    private AtomicLong igmpv2LeaveGroup = new AtomicLong();
+    //Total number of messages received.
+    private AtomicLong totalMsgReceived = new AtomicLong();
+    //Total number of IGMP messages received
+    private AtomicLong igmpMsgReceived = new AtomicLong();
+    //Total number of invalid IGMP messages received
+    private AtomicLong invalidIgmpMsgReceived = new AtomicLong();
+
+    public Long getIgmpJoinReq() {
+        return igmpJoinReq.get();
+    }
+
+    public Long getIgmpSuccessJoinRejoinReq() {
+        return igmpSuccessJoinRejoinReq.get();
+    }
+
+    public Long getIgmpFailJoinReq() {
+        return igmpFailJoinReq.get();
+    }
+
+    public Long getIgmpLeaveReq() {
+        return igmpLeaveReq.get();
+    }
+
+    public Long getIgmpDisconnect() {
+        return igmpDisconnect.get();
+    }
+
+    public Long getIgmpv3MembershipQuery() {
+        return igmpv3MembershipQuery.get();
+    }
+
+    public Long getIgmpv1MemershipReport() {
+        return igmpv1MembershipReport.get();
+    }
+
+    public Long getIgmpv3MembershipReport() {
+        return igmpv3MembershipReport.get();
+    }
+
+    public Long getIgmpv2MembershipReport() {
+        return igmpv2MembershipReport.get();
+    }
+
+    public Long getIgmpv2LeaveGroup() {
+        return igmpv2LeaveGroup.get();
+    }
+
+    public Long getTotalMsgReceived() {
+        return totalMsgReceived.get();
+    }
+
+    public Long getIgmpMsgReceived() {
+        return igmpMsgReceived.get();
+    }
+
+    public Long getInvalidIgmpMsgReceived() {
+        return invalidIgmpMsgReceived.get();
+    }
+
+    public void increaseIgmpJoinReq() {
+        igmpJoinReq.incrementAndGet();
+    }
+
+    public void increaseIgmpSuccessJoinRejoinReq() {
+        igmpSuccessJoinRejoinReq.incrementAndGet();
+    }
+
+    public void increaseIgmpFailJoinReq() {
+        igmpFailJoinReq.incrementAndGet();
+    }
+
+    public void increaseIgmpLeaveReq() {
+        igmpLeaveReq.incrementAndGet();
+    }
+
+    public void increaseIgmpDisconnect() {
+        igmpDisconnect.incrementAndGet();
+    }
+
+    public void increaseIgmpv3MembershipQuery() {
+        igmpv3MembershipQuery.incrementAndGet();
+        igmpMsgReceived.incrementAndGet();
+    }
+
+    public void increaseIgmpv2MembershipReport() {
+        igmpv2MembershipReport.incrementAndGet();
+        igmpMsgReceived.incrementAndGet();
+    }
+
+    public void increaseIgmpv1MembershipReport() {
+        igmpv1MembershipReport.incrementAndGet();
+        igmpMsgReceived.incrementAndGet();
+    }
+
+    public void increaseIgmpv3MembershipReport() {
+        igmpv3MembershipReport.incrementAndGet();
+        igmpMsgReceived.incrementAndGet();
+    }
+
+    public void increaseIgmpv2LeaveGroup() {
+        igmpv2LeaveGroup.incrementAndGet();
+        igmpMsgReceived.incrementAndGet();
+    }
+
+    public void increaseInvalidIgmpMsgReceived() {
+        invalidIgmpMsgReceived.incrementAndGet();
+    }
+
+    public void increaseTotalMsgReceived() {
+        totalMsgReceived.incrementAndGet();
+    }
+
+}
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpStatisticsEvent.java b/src/main/java/org/opencord/igmpproxy/IgmpStatisticsEvent.java
new file mode 100644
index 0000000..6717336
--- /dev/null
+++ b/src/main/java/org/opencord/igmpproxy/IgmpStatisticsEvent.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy;
+
+import org.onosproject.event.AbstractEvent;
+
+
+/**
+ * Event indicating the Statistics Data of IGMP.
+ */
+public class IgmpStatisticsEvent extends
+                           AbstractEvent<IgmpStatisticsEvent.Type, IgmpStatistics> {
+    /**
+     * Statistics data.
+     * IgmpStatisticsEvent event type.
+     */
+    public enum Type {
+        /**
+         * signifies that the IGMP Statistics Event stats has been updated.
+         */
+        STATS_UPDATE
+    }
+
+    public IgmpStatisticsEvent(Type type, IgmpStatistics stats) {
+        super(type, stats);
+    }
+
+}
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpStatisticsEventListener.java b/src/main/java/org/opencord/igmpproxy/IgmpStatisticsEventListener.java
new file mode 100644
index 0000000..7bdf8e8
--- /dev/null
+++ b/src/main/java/org/opencord/igmpproxy/IgmpStatisticsEventListener.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy;
+
+import org.onosproject.event.EventListener;
+
+/**
+ * Listener for accounting events.
+ */
+public interface IgmpStatisticsEventListener extends
+        EventListener<IgmpStatisticsEvent> {
+
+}
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpStatisticsManager.java b/src/main/java/org/opencord/igmpproxy/IgmpStatisticsManager.java
new file mode 100644
index 0000000..a30c5e8
--- /dev/null
+++ b/src/main/java/org/opencord/igmpproxy/IgmpStatisticsManager.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy;
+
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.onlab.util.SafeRecurringTask;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.event.AbstractListenerManager;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Dictionary;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+
+import com.google.common.base.Strings;
+
+import static org.opencord.igmpproxy.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD;
+import static org.opencord.igmpproxy.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD_DEFAULT;
+
+
+/**
+ *
+ * Process the stats collected in Igmp proxy application. Publish to kafka onos.
+ *
+ */
+@Component(immediate = true, property = {
+        STATISTICS_GENERATION_PERIOD + ":Integer=" + STATISTICS_GENERATION_PERIOD_DEFAULT,
+})
+public class IgmpStatisticsManager extends
+                 AbstractListenerManager<IgmpStatisticsEvent, IgmpStatisticsEventListener>
+                         implements IgmpStatisticsService {
+    private final Logger log = getLogger(getClass());
+    private IgmpStatistics igmpStats;
+
+    ScheduledExecutorService executorForIgmp;
+    private ScheduledFuture<?> publisherTask;
+
+    protected int statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected ComponentConfigService cfgService;
+
+    @Override
+    public IgmpStatistics getIgmpStats() {
+        return igmpStats;
+    }
+
+    @Activate
+    public void activate(ComponentContext context) {
+        igmpStats = new IgmpStatistics();
+
+        eventDispatcher.addSink(IgmpStatisticsEvent.class, listenerRegistry);
+        executorForIgmp = Executors.newScheduledThreadPool(1);
+        cfgService.registerProperties(getClass());
+        modified(context);
+        log.info("IgmpStatisticsManager Activated");
+    }
+
+    @Modified
+    public void modified(ComponentContext context) {
+        Dictionary<String, Object> properties = context.getProperties();
+
+        try {
+            String s = Tools.get(properties, STATISTICS_GENERATION_PERIOD);
+            statisticsGenerationPeriodInSeconds = Strings.isNullOrEmpty(s) ?
+                Integer.parseInt(STATISTICS_GENERATION_PERIOD)
+                    : Integer.parseInt(s.trim());
+        } catch (NumberFormatException ne) {
+            log.error("Unable to parse configuration parameter for eventGenerationPeriodInSeconds", ne);
+            statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
+        }
+        if (publisherTask != null) {
+            publisherTask.cancel(true);
+        }
+        publisherTask = executorForIgmp.scheduleAtFixedRate(SafeRecurringTask.wrap(this::publishStats),
+                0, statisticsGenerationPeriodInSeconds, TimeUnit.SECONDS);
+    }
+
+    @Deactivate
+    public void deactivate() {
+        eventDispatcher.removeSink(IgmpStatisticsEvent.class);
+        publisherTask.cancel(true);
+        executorForIgmp.shutdown();
+        cfgService.unregisterProperties(getClass(), false);
+        igmpStats = null;
+        log.info("IgmpStatisticsManager Deactivated");
+    }
+
+    /**
+     * Publishes stats.
+     */
+    private void publishStats() {
+
+        if (log.isDebugEnabled()) {
+            log.debug("Notifying stats: {}", igmpStats);
+            log.debug("--IgmpDisconnect--" + igmpStats.getIgmpDisconnect());
+            log.debug("--IgmpFailJoinReq--" + igmpStats.getIgmpFailJoinReq());
+            log.debug("--IgmpJoinReq--" + igmpStats.getIgmpJoinReq());
+            log.debug("--IgmpLeaveReq--" + igmpStats.getIgmpLeaveReq());
+            log.debug("--IgmpMsgReceived--" + igmpStats.getIgmpMsgReceived());
+            log.debug("--IgmpSuccessJoinRejoinReq--" + igmpStats.getIgmpSuccessJoinRejoinReq());
+            log.debug("--Igmpv1MemershipReport--" + igmpStats.getIgmpv1MemershipReport());
+            log.debug("--Igmpv2LeaveGroup--" + igmpStats.getIgmpv2LeaveGroup());
+            log.debug("--Igmpv2MembershipReport--" + igmpStats.getIgmpv2MembershipReport());
+            log.debug("--Igmpv3MembershipQuery--" + igmpStats.getIgmpv3MembershipQuery());
+            log.debug("--Igmpv3MembershipReport--" + igmpStats.getIgmpv3MembershipReport());
+            log.debug("--InvalidIgmpMsgReceived--" + igmpStats.getInvalidIgmpMsgReceived());
+            log.debug("--TotalMsgReceived--  " + igmpStats.getTotalMsgReceived());
+        }
+
+        post(new IgmpStatisticsEvent(IgmpStatisticsEvent.Type.STATS_UPDATE, igmpStats));
+    }
+
+}
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpStatisticsService.java b/src/main/java/org/opencord/igmpproxy/IgmpStatisticsService.java
new file mode 100644
index 0000000..3574ad5
--- /dev/null
+++ b/src/main/java/org/opencord/igmpproxy/IgmpStatisticsService.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy;
+
+import org.onosproject.event.ListenerService;
+
+/**
+ * Service for interacting with accounting module.
+ */
+public interface IgmpStatisticsService extends
+        ListenerService<IgmpStatisticsEvent, IgmpStatisticsEventListener> {
+    /**
+     * Returns IgmpStatistics object.
+     *
+     * @return IgmpStatistics
+    */
+    public IgmpStatistics getIgmpStats();
+
+}
diff --git a/src/main/java/org/opencord/igmpproxy/OsgiPropertyConstants.java b/src/main/java/org/opencord/igmpproxy/OsgiPropertyConstants.java
new file mode 100644
index 0000000..80d5cf0
--- /dev/null
+++ b/src/main/java/org/opencord/igmpproxy/OsgiPropertyConstants.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy;
+
+/**
+ * Constants for default values of configurable properties.
+ */
+public final class OsgiPropertyConstants {
+
+    private OsgiPropertyConstants() {
+    }
+
+    public static final String STATISTICS_GENERATION_PERIOD = "statisticsGenerationPeriodInSeconds";
+    public static final int STATISTICS_GENERATION_PERIOD_DEFAULT = 20;
+
+}
diff --git a/src/test/java/org/opencord/igmpproxy/IgmpManagerBase.java b/src/test/java/org/opencord/igmpproxy/IgmpManagerBase.java
index 43cc24a..05581ed 100644
--- a/src/test/java/org/opencord/igmpproxy/IgmpManagerBase.java
+++ b/src/test/java/org/opencord/igmpproxy/IgmpManagerBase.java
@@ -20,7 +20,13 @@
 import org.onlab.packet.Ip4Address;
 import org.onlab.packet.IpAddress;
 import org.onlab.packet.MacAddress;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cfg.ConfigProperty;
 import org.onosproject.core.ApplicationId;
+import org.onosproject.event.DefaultEventSinkRegistry;
+import org.onosproject.event.Event;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.event.EventSink;
 import org.onosproject.mastership.MastershipServiceAdapter;
 import org.onosproject.mcast.api.McastListener;
 import org.onosproject.mcast.api.McastRoute;
@@ -57,9 +63,18 @@
 import org.opencord.sadis.SubscriberAndDeviceInformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.ComponentInstance;
+
+import static com.google.common.base.Preconditions.checkState;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.Hashtable;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
@@ -92,6 +107,8 @@
     protected static final String CLIENT_CIRCUIT_ID = "CIR-PON 1/1";
     protected String dsBpId = "HSIA-DS";
 
+    private static final int STATISTICS_GEN_PERIOD_IN_SEC = 2;
+
     private static final String NNI_PREFIX = "nni";
 
     protected List<Port> lsPorts = new ArrayList<Port>();
@@ -191,6 +208,7 @@
 
      class TestNetworkConfigRegistry extends NetworkConfigRegistryAdapter {
          Boolean igmpOnPodFlag = false;
+
          TestNetworkConfigRegistry(Boolean igmpFlag) {
              igmpOnPodFlag = igmpFlag;
          }
@@ -377,28 +395,36 @@
     }
 
     /**
-     * Sends an Ethernet packet to the process method of the Packet Processor.
+     * Sends Ethernet packet to the process method of the Packet Processor.
      *
      * @param reply Ethernet packet
      * @throws InterruptedException
      */
-   void sendPacket(Ethernet reply) {
+   void sendPacket(Ethernet reply, boolean isSingleSend) {
 
         final ByteBuffer byteBuffer = ByteBuffer.wrap(reply.serialize());
 
-        if (flagForPacket) {
-            InboundPacket inPacket = new DefaultInboundPacket(CONNECT_POINT_A, reply, byteBuffer);
-            context = new TestPacketContext(127L, inPacket, null, false);
-            flagForPacket = false;
+        if (isSingleSend) {
+            InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_B, reply, byteBuffer);
+            context = new TestPacketContext(127L, inBoundPacket, null, false);
 
             packetProcessor.process(context);
         } else {
-            InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_B, reply, byteBuffer);
-            context = new TestPacketContext(127L, inBoundPacket, null, false);
-            flagForPacket = true;
+            if (flagForPacket) {
+                InboundPacket inPacket = new DefaultInboundPacket(CONNECT_POINT_A, reply, byteBuffer);
+                context = new TestPacketContext(127L, inPacket, null, false);
+                flagForPacket = false;
 
-            packetProcessor.process(context);
+                packetProcessor.process(context);
+            } else {
+                InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_B, reply, byteBuffer);
+                context = new TestPacketContext(127L, inBoundPacket, null, false);
+                flagForPacket = true;
+
+                packetProcessor.process(context);
+            }
         }
+
    }
 
     protected class MockSadisService implements SadisService {
@@ -484,4 +510,134 @@
         }
     }
 
+    protected class MockCfgService implements ComponentConfigService {
+
+        @Override
+        public Set<String> getComponentNames() {
+            return null;
+        }
+
+        @Override
+        public void registerProperties(Class<?> componentClass) {
+
+        }
+
+        @Override
+        public void unregisterProperties(Class<?> componentClass, boolean clear) {
+
+        }
+
+        @Override
+        public Set<ConfigProperty> getProperties(String componentName) {
+            return null;
+        }
+
+        @Override
+        public void setProperty(String componentName, String name, String value) {
+
+        }
+
+        @Override
+        public void preSetProperty(String componentName, String name, String value) {
+
+        }
+
+        @Override
+        public void preSetProperty(String componentName, String name, String value, boolean override) {
+
+        }
+
+        @Override
+        public void unsetProperty(String componentName, String name) {
+
+        }
+
+        @Override
+        public ConfigProperty getProperty(String componentName, String attribute) {
+            return null;
+        }
+
+    }
+
+    public static class TestEventDispatcher extends DefaultEventSinkRegistry implements EventDeliveryService {
+
+        @Override
+        @SuppressWarnings("unchecked")
+        public synchronized void post(Event event) {
+            EventSink sink = getSink(event.getClass());
+            checkState(sink != null, "No sink for event %s", event);
+            sink.process(event);
+        }
+
+        @Override
+        public void setDispatchTimeLimit(long millis) {
+        }
+
+        @Override
+        public long getDispatchTimeLimit() {
+            return 0;
+        }
+    }
+
+    class MockComponentContext implements ComponentContext {
+
+        @Override
+        public Dictionary<String, Object> getProperties() {
+            Dictionary<String, Object> cfgDict = new Hashtable<String, Object>();
+            cfgDict.put("statisticsGenerationPeriodInSeconds", STATISTICS_GEN_PERIOD_IN_SEC);
+            return cfgDict;
+        }
+
+        @Override
+        public Object locateService(String name) {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Object locateService(String name, ServiceReference reference) {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Object[] locateServices(String name) {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public BundleContext getBundleContext() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public Bundle getUsingBundle() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public ComponentInstance getComponentInstance() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+
+        @Override
+        public void enableComponent(String name) {
+            // TODO Auto-generated method stub
+        }
+
+        @Override
+         public void disableComponent(String name) {
+             // TODO Auto-generated method stub
+         }
+
+         @Override
+         public ServiceReference getServiceReference() {
+             // TODO Auto-generated method stub
+             return null;
+         }
+    }
 }
diff --git a/src/test/java/org/opencord/igmpproxy/IgmpManagerTest.java b/src/test/java/org/opencord/igmpproxy/IgmpManagerTest.java
index 21a83fe..b77a09c 100644
--- a/src/test/java/org/opencord/igmpproxy/IgmpManagerTest.java
+++ b/src/test/java/org/opencord/igmpproxy/IgmpManagerTest.java
@@ -18,19 +18,26 @@
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.onlab.junit.TestUtils;
 import org.onlab.packet.Ethernet;
 import org.onosproject.core.CoreServiceAdapter;
 import org.onosproject.net.flow.FlowRuleServiceAdapter;
 import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.opencord.igmpproxy.IgmpManagerBase.MockComponentContext;
 
 import static org.junit.Assert.*;
 
+/**
+ * Set of tests of the ONOS application component.
+ */
 public class IgmpManagerTest extends IgmpManagerBase {
 
     private static final int WAIT_TIMEOUT = 1000;
 
     private IgmpManager igmpManager;
 
+    private IgmpStatisticsManager igmpStatisticsManager;
+
     // Set up the IGMP application.
     @Before
     public void setUp() {
@@ -43,6 +50,11 @@
         igmpManager.flowRuleService = new FlowRuleServiceAdapter();
         igmpManager.multicastService = new TestMulticastRouteService();
         igmpManager.sadisService = new MockSadisService();
+        igmpStatisticsManager = new IgmpStatisticsManager();
+        igmpStatisticsManager.cfgService = new MockCfgService();
+        TestUtils.setField(igmpStatisticsManager, "eventDispatcher", new TestEventDispatcher());
+        igmpStatisticsManager.activate(new MockComponentContext());
+        igmpManager.igmpStatisticsManager = this.igmpStatisticsManager;
         // By default - we send query messages
         SingleStateMachine.sendQuery = true;
     }
@@ -84,16 +96,17 @@
         Ethernet firstPacket = IgmpSender.getInstance().buildIgmpV3Join(GROUP_IP, SOURCE_IP_OF_A);
         Ethernet secondPacket = IgmpSender.getInstance().buildIgmpV3Join(GROUP_IP, SOURCE_IP_OF_B);
         // Sending first packet and here shouldSendjoin flag will be true
-        sendPacket(firstPacket);
+        sendPacket(firstPacket, false);
         // Emitted packet is stored in list savedPackets
         assertNotNull(savedPackets);
         synchronized (savedPackets) {
             savedPackets.wait(WAIT_TIMEOUT);
         }
+
         assertNotNull(savedPackets);
         assertEquals(1, savedPackets.size());
         // Sending the second packet with same group ip address
-        sendPacket(secondPacket);
+        sendPacket(secondPacket, false);
         synchronized (savedPackets) {
             savedPackets.wait(WAIT_TIMEOUT);
         }
@@ -113,7 +126,7 @@
         Ethernet firstPacket = IgmpSender.getInstance().buildIgmpV3Join(GROUP_IP, SOURCE_IP_OF_A);
         Ethernet secondPacket = IgmpSender.getInstance().buildIgmpV3Join(GROUP_IP, SOURCE_IP_OF_B);
         // Sending first packet and here shouldSendjoin flag will be true
-        sendPacket(firstPacket);
+        sendPacket(firstPacket, false);
         // Emitted packet is stored in list savedPackets
         synchronized (savedPackets) {
           savedPackets.wait(WAIT_TIMEOUT);
@@ -122,10 +135,11 @@
         assertEquals(1, savedPackets.size());
         // Sending the second packet with same group ip address which will not be emitted
         // shouldSendJoin flag will be false.
-        sendPacket(secondPacket);
+        sendPacket(secondPacket, false);
         synchronized (savedPackets) {
             savedPackets.wait(WAIT_TIMEOUT);
         }
         assertEquals(1, savedPackets.size());
     }
+
 }
diff --git a/src/test/java/org/opencord/igmpproxy/IgmpStatisticsTest.java b/src/test/java/org/opencord/igmpproxy/IgmpStatisticsTest.java
new file mode 100644
index 0000000..e1e4f35
--- /dev/null
+++ b/src/test/java/org/opencord/igmpproxy/IgmpStatisticsTest.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy;
+
+import static org.junit.Assert.assertEquals;
+import static org.onlab.junit.TestTools.assertAfter;
+
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.junit.TestUtils;
+import org.onlab.packet.Ethernet;
+import org.onosproject.core.CoreServiceAdapter;
+import org.onosproject.net.flow.FlowRuleServiceAdapter;
+import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Set of tests of the ONOS application component for IGMP Statistics.
+ */
+public class IgmpStatisticsTest extends IgmpManagerBase {
+
+    private static final int WAIT_TIMEOUT = 500;
+
+    private IgmpManager igmpManager;
+
+    private IgmpStatisticsManager igmpStatisticsManager;
+
+    private MockIgmpStatisticsEventListener mockListener = new MockIgmpStatisticsEventListener();
+
+    // Set up the IGMP application.
+    @Before
+    public void setUp() {
+        igmpManager = new IgmpManager();
+        igmpManager.coreService = new CoreServiceAdapter();
+        igmpManager.mastershipService = new MockMastershipService();
+        igmpManager.flowObjectiveService = new FlowObjectiveServiceAdapter();
+        igmpManager.deviceService = new MockDeviceService();
+        igmpManager.packetService = new MockPacketService();
+        igmpManager.flowRuleService = new FlowRuleServiceAdapter();
+        igmpManager.multicastService = new TestMulticastRouteService();
+        igmpManager.sadisService = new MockSadisService();
+        igmpStatisticsManager = new IgmpStatisticsManager();
+        igmpStatisticsManager.cfgService = new MockCfgService();
+        igmpStatisticsManager.addListener(mockListener);
+        TestUtils.setField(igmpStatisticsManager, "eventDispatcher", new TestEventDispatcher());
+        igmpStatisticsManager.activate(new MockComponentContext());
+        igmpManager.igmpStatisticsManager = this.igmpStatisticsManager;
+        // By default - we send query messages
+        SingleStateMachine.sendQuery = true;
+    }
+
+    // Tear Down the IGMP application.
+    @After
+    public void tearDown() {
+        igmpStatisticsManager.removeListener(mockListener);
+        igmpStatisticsManager.deactivate();
+        IgmpManager.groupMemberMap.clear();
+        StateMachine.clearMap();
+    }
+
+    //Test Igmp Statistics.
+    @Test
+    public void testIgmpStatistics() throws InterruptedException {
+        igmpManager.networkConfig = new TestNetworkConfigRegistry(false);
+        igmpManager.activate();
+        //IGMPv3 Join
+        Ethernet igmpv3MembershipReportPkt = IgmpSender.getInstance().buildIgmpV3Join(GROUP_IP, SOURCE_IP_OF_A);
+        sendPacket(igmpv3MembershipReportPkt, true);
+        synchronized (savedPackets) {
+            savedPackets.wait(WAIT_TIMEOUT);
+        }
+        //Leave
+        Ethernet igmpv3LeavePkt = IgmpSender.getInstance().buildIgmpV3Leave(GROUP_IP, SOURCE_IP_OF_A);
+        sendPacket(igmpv3LeavePkt, true);
+        synchronized (savedPackets) {
+            savedPackets.wait(WAIT_TIMEOUT);
+        }
+
+        assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
+            assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getTotalMsgReceived().longValue()));
+        assertEquals((long) 1, igmpStatisticsManager.getIgmpStats().getIgmpJoinReq().longValue());
+        assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getIgmpv3MembershipReport().longValue());
+        assertEquals((long) 1, igmpStatisticsManager.getIgmpStats().getIgmpSuccessJoinRejoinReq().longValue());
+
+        assertEquals((long) 1, igmpStatisticsManager.getIgmpStats().getIgmpLeaveReq().longValue());
+        assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getIgmpMsgReceived().longValue());
+
+    }
+
+    //Test Events
+    @Test
+    public void testIgmpStatisticsEvent() {
+        final int waitEventGeneration = igmpStatisticsManager.statisticsGenerationPeriodInSeconds * 1000;
+        //assert that event listened as the app activates
+        assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
+            assertEquals(mockListener.events.size(), 1));
+
+        assertAfter(waitEventGeneration / 2, waitEventGeneration, () ->
+            assertEquals(mockListener.events.size(), 2));
+
+        for (IgmpStatisticsEvent event : mockListener.events) {
+            assertEquals(event.type(), IgmpStatisticsEvent.Type.STATS_UPDATE);
+        }
+    }
+
+    public class MockIgmpStatisticsEventListener implements IgmpStatisticsEventListener {
+        protected List<IgmpStatisticsEvent> events = Lists.newArrayList();
+
+        @Override
+        public void event(IgmpStatisticsEvent event) {
+            events.add(event);
+        }
+
+    }
+}