SEBA-993-Update IGMPKafkaIntegration

Change-Id: Ia8d36960b9ded9e912b0e564f70b8b5aab2613c5
diff --git a/pom.xml b/pom.xml
index 9c129d7..3edf74f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -42,7 +42,7 @@
         <dhcpl2relay.api.version>2.0.2</dhcpl2relay.api.version>
         <bng.api.version>1.0.0</bng.api.version>
         <sadis.api.version>5.0.1</sadis.api.version>
-        <igmp.api.version>2.0.0</igmp.api.version>
+        <igmp.api.version>2.1.0-SNAPSHOT</igmp.api.version>
         <mcast.api.version>2.0.0</mcast.api.version>
     </properties>
 
diff --git a/src/main/java/org/opencord/kafka/integrations/IgmpKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/IgmpKafkaIntegration.java
index c424d67..40ebb54 100644
--- a/src/main/java/org/opencord/kafka/integrations/IgmpKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/IgmpKafkaIntegration.java
@@ -19,6 +19,7 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
+import org.opencord.igmpproxy.IgmpStatisticType;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -76,31 +77,31 @@
     private static final String INVALID_IGMP_MSG_RECEIVED = "invalidIgmpMsgReceived";
 
     private static final String UNKNOWN_IGMP_TYPE_PACKETS_RX_COUNTER =
-       "unknownIgmpTypePacketsRxCounter";
+            "unknownIgmpTypePacketsRxCounter";
     private static final String REPORTS_RX_WITH_WRONG_MODE_COUNTER =
-       "reportsRxWithWrongModeCounter";
+            "reportsRxWithWrongModeCounter";
     private static final String FAIL_JOIN_REQ_INSUFF_PERMISSION_ACCESS_COUNTER =
-       "failJoinReqInsuffPermissionAccessCounter";
+            "failJoinReqInsuffPermissionAccessCounter";
     private static final String FAIL_JOIN_REQ_UNKNOWN_MULTICAST_IP_COUNTER =
-       "failJoinReqUnknownMulticastIpCounter";
+            "failJoinReqUnknownMulticastIpCounter";
     private static final String UNCONFIGURED_GROUP_COUNTER =
-       "unconfiguredGroupCounter";
+            "unconfiguredGroupCounter";
     private static final String VALID_IGMP_PACKET_COUNTER =
-       "validIgmpPacketCounter";
+            "validIgmpPacketCounter";
     private static final String IGMP_CHANNEL_JOIN_COUNTER =
-       "igmpChannelJoinCounter";
+            "igmpChannelJoinCounter";
     private static final String CURRENT_GRP_NUM_COUNTER =
-       "currentGrpNumCounter";
+            "currentGrpNumCounter";
     private static final String IGMP_VALID_CHECKSUM_COUNTER =
-       "igmpValidChecksumCounter";
+            "igmpValidChecksumCounter";
     private static final String INVALID_IGMP_LENGTH =
-       "invalidIgmpLength";
+            "invalidIgmpLength";
     private static final String IGMP_GENERAL_MEMBERSHIP_QUERY =
-       "igmpGeneralMembershipQuery";
+            "igmpGeneralMembershipQuery";
     private static final String IGMP_GRP_SPECIFIC_MEMBERSHIP_QUERY =
-       "igmpGrpSpecificMembershipQuery";
+            "igmpGrpSpecificMembershipQuery";
     private static final String IGMP_GRP_AND_SRC_SPECIFIC_MEMBERSHIP_QUERY =
-       "igmpGrpAndSrcSpecificMembershipQuery";
+            "igmpGrpAndSrcSpecificMembershipQuery";
 
     protected void bindIgmpStatService(IgmpStatisticsService incomingService) {
         bindAndAddListener(incomingService, igmpStatServiceRef, igmpStatisticsEventListener);
@@ -130,40 +131,63 @@
     private JsonNode serializeStat(IgmpStatisticsEvent event) {
         ObjectMapper mapper = new ObjectMapper();
         ObjectNode igmpStatEvent = mapper.createObjectNode();
-        igmpStatEvent.put(IGMP_JOIN_REQ, event.subject().getIgmpJoinReq());
-        igmpStatEvent.put(IGMP_SUCCESS_JOIN_REJOIN_REQ, event.subject().getIgmpSuccessJoinRejoinReq());
-        igmpStatEvent.put(IGMP_FAIL_JOIN_REQ, event.subject().getIgmpFailJoinReq());
-        igmpStatEvent.put(IGMP_LEAVE_REQ, event.subject().getIgmpLeaveReq());
-        igmpStatEvent.put(IGMP_DISCONNECT, event.subject().getIgmpDisconnect());
-        igmpStatEvent.put(IGMP_V3_MEMBERSHIP_QUERY, event.subject().getIgmpv3MembershipQuery());
-        igmpStatEvent.put(IGMP_V1_MEMBERSHIP_REPORT, event.subject().getIgmpv1MemershipReport());
-        igmpStatEvent.put(IGMP_V2_MEMBERSHIP_REPORT, event.subject().getIgmpv2MembershipReport());
-        igmpStatEvent.put(IGMP_V3_MEMBERSHIP_REPORT, event.subject().getIgmpv3MembershipReport());
-        igmpStatEvent.put(IGMP_V2_LEAVE_GROUP, event.subject().getIgmpv2LeaveGroup());
-        igmpStatEvent.put(TOTAL_MSG_RECEIVED, event.subject().getTotalMsgReceived());
-        igmpStatEvent.put(IGMP_MSG_RECEIVED, event.subject().getIgmpMsgReceived());
-        igmpStatEvent.put(INVALID_IGMP_MSG_RECEIVED, event.subject().getInvalidIgmpMsgReceived());
-        igmpStatEvent.put(UNKNOWN_IGMP_TYPE_PACKETS_RX_COUNTER, event.subject().getUnknownIgmpTypePacketsRxCounter());
-        igmpStatEvent.put(REPORTS_RX_WITH_WRONG_MODE_COUNTER, event.subject().getReportsRxWithWrongModeCounter());
+        igmpStatEvent.put(IGMP_JOIN_REQ, event.subject()
+                .getStat(IgmpStatisticType.IGMP_JOIN_REQ));
+        igmpStatEvent.put(IGMP_SUCCESS_JOIN_REJOIN_REQ, event.subject()
+                .getStat(IgmpStatisticType.IGMP_SUCCESS_JOIN_RE_JOIN_REQ));
+        igmpStatEvent.put(IGMP_FAIL_JOIN_REQ, event.subject()
+                .getStat(IgmpStatisticType.IGMP_FAIL_JOIN_REQ));
+        igmpStatEvent.put(IGMP_LEAVE_REQ, event.subject()
+                .getStat(IgmpStatisticType.IGMP_LEAVE_REQ));
+        igmpStatEvent.put(IGMP_DISCONNECT, event.subject()
+                .getStat(IgmpStatisticType.IGMP_DISCONNECT));
+        igmpStatEvent.put(IGMP_V3_MEMBERSHIP_QUERY, event.subject()
+                .getStat(IgmpStatisticType.IGMP_V3_MEMBERSHIP_QUERY));
+        igmpStatEvent.put(IGMP_V1_MEMBERSHIP_REPORT, event.subject()
+                .getStat(IgmpStatisticType.IGMP_V1_MEMBERSHIP_REPORT));
+        igmpStatEvent.put(IGMP_V2_MEMBERSHIP_REPORT, event.subject()
+                .getStat(IgmpStatisticType.IGMP_V2_MEMBERSHIP_REPORT));
+        igmpStatEvent.put(IGMP_V3_MEMBERSHIP_REPORT, event.subject()
+                .getStat(IgmpStatisticType.IGMP_V3_MEMBERSHIP_REPORT));
+        igmpStatEvent.put(IGMP_V2_LEAVE_GROUP, event.subject()
+                .getStat(IgmpStatisticType.IGMP_V2_LEAVE_GROUP));
+        igmpStatEvent.put(TOTAL_MSG_RECEIVED, event.subject()
+                .getStat(IgmpStatisticType.TOTAL_MSG_RECEIVED));
+        igmpStatEvent.put(IGMP_MSG_RECEIVED, event.subject()
+                .getStat(IgmpStatisticType.IGMP_MSG_RECEIVED));
+        igmpStatEvent.put(INVALID_IGMP_MSG_RECEIVED, event.subject()
+                .getStat(IgmpStatisticType.INVALID_IGMP_MSG_RECEIVED));
+        igmpStatEvent.put(UNKNOWN_IGMP_TYPE_PACKETS_RX_COUNTER, event.subject()
+                .getStat(IgmpStatisticType.UNKNOWN_IGMP_TYPE_PACKETS_RX_COUNTER));
+        igmpStatEvent.put(REPORTS_RX_WITH_WRONG_MODE_COUNTER, event.subject()
+                .getStat(IgmpStatisticType.REPORTS_RX_WITH_WRONG_MODE_COUNTER));
         igmpStatEvent.put(FAIL_JOIN_REQ_INSUFF_PERMISSION_ACCESS_COUNTER,
-            event.subject().getFailJoinReqInsuffPermissionAccessCounter());
+                event.subject().getStat(IgmpStatisticType.FAIL_JOIN_REQ_INSUFF_PERMISSION_ACCESS_COUNTER));
         igmpStatEvent.put(FAIL_JOIN_REQ_UNKNOWN_MULTICAST_IP_COUNTER,
-            event.subject().getFailJoinReqUnknownMulticastIpCounter());
-        igmpStatEvent.put(UNCONFIGURED_GROUP_COUNTER, event.subject().getUnconfiguredGroupCounter());
-        igmpStatEvent.put(VALID_IGMP_PACKET_COUNTER, event.subject().getValidIgmpPacketCounter());
-        igmpStatEvent.put(IGMP_CHANNEL_JOIN_COUNTER, event.subject().getIgmpChannelJoinCounter());
-        igmpStatEvent.put(CURRENT_GRP_NUM_COUNTER, event.subject().getCurrentGrpNumCounter());
-        igmpStatEvent.put(IGMP_VALID_CHECKSUM_COUNTER, event.subject().getIgmpValidChecksumCounter());
-        igmpStatEvent.put(INVALID_IGMP_LENGTH, event.subject().getInvalidIgmpLength());
-        igmpStatEvent.put(IGMP_GENERAL_MEMBERSHIP_QUERY, event.subject().getIgmpGeneralMembershipQuery());
-        igmpStatEvent.put(IGMP_GRP_SPECIFIC_MEMBERSHIP_QUERY, event.subject().getIgmpGrpSpecificMembershipQuery());
+                event.subject().getStat(IgmpStatisticType.FAIL_JOIN_REQ_UNKNOWN_MULTICAST_IP_COUNTER));
+        igmpStatEvent.put(UNCONFIGURED_GROUP_COUNTER, event.subject()
+                .getStat(IgmpStatisticType.UNCONFIGURED_GROUP_COUNTER));
+        igmpStatEvent.put(VALID_IGMP_PACKET_COUNTER, event.subject()
+                .getStat(IgmpStatisticType.VALID_IGMP_PACKET_COUNTER));
+        igmpStatEvent.put(IGMP_CHANNEL_JOIN_COUNTER, event.subject()
+                .getStat(IgmpStatisticType.IGMP_CHANNEL_JOIN_COUNTER));
+        igmpStatEvent.put(CURRENT_GRP_NUM_COUNTER, event.subject()
+                .getStat(IgmpStatisticType.CURRENT_GRP_NUMBER_COUNTER));
+        igmpStatEvent.put(IGMP_VALID_CHECKSUM_COUNTER, event.subject()
+                .getStat(IgmpStatisticType.IGMP_VALID_CHECKSUM_COUNTER));
+        igmpStatEvent.put(INVALID_IGMP_LENGTH, event.subject()
+                .getStat(IgmpStatisticType.INVALID_IGMP_LENGTH));
+        igmpStatEvent.put(IGMP_GENERAL_MEMBERSHIP_QUERY, event.subject()
+                .getStat(IgmpStatisticType.IGMP_GENERAL_MEMBERSHIP_QUERY));
+        igmpStatEvent.put(IGMP_GRP_SPECIFIC_MEMBERSHIP_QUERY, event.subject()
+                .getStat(IgmpStatisticType.IGMP_GRP_SPECIFIC_MEMBERSHIP_QUERY));
         igmpStatEvent.put(IGMP_GRP_AND_SRC_SPECIFIC_MEMBERSHIP_QUERY,
-            event.subject().getIgmpGrpAndSrcSpecificMembershipQuery());
+                event.subject().getStat(IgmpStatisticType.IGMP_GRP_AND_SRC_SPESIFIC_MEMBERSHIP_QUERY));
         return igmpStatEvent;
     }
 
     public class InternalIgmpStatisticsListener implements
-                              IgmpStatisticsEventListener {
+            IgmpStatisticsEventListener {
 
         @Override
         public void event(IgmpStatisticsEvent igmpStatisticsEvent) {
diff --git a/src/test/java/org/opencord/kafka/integrations/IgmpKafkaIntegrationTest.java b/src/test/java/org/opencord/kafka/integrations/IgmpKafkaIntegrationTest.java
index b97fc89..e721014 100644
--- a/src/test/java/org/opencord/kafka/integrations/IgmpKafkaIntegrationTest.java
+++ b/src/test/java/org/opencord/kafka/integrations/IgmpKafkaIntegrationTest.java
@@ -19,7 +19,7 @@
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
-import org.opencord.igmpproxy.IgmpStatistics;
+import org.opencord.igmpproxy.IgmpStatisticType;
 import org.opencord.igmpproxy.IgmpStatisticsEventListener;
 import org.opencord.igmpproxy.IgmpStatisticsService;
 import org.opencord.kafka.EventBusService;
@@ -78,10 +78,6 @@
     }
 
     private class MockIgmpStatisticsService implements IgmpStatisticsService {
-        @Override
-        public IgmpStatistics getIgmpStats() {
-            return new IgmpStatistics();
-        }
 
         @Override
         public void addListener(IgmpStatisticsEventListener listener) {
@@ -92,5 +88,20 @@
         public void removeListener(IgmpStatisticsEventListener listener) {
             igmpStatisticsEventListener = null;
         }
+
+        @Override
+        public void increaseStat(IgmpStatisticType igmpStatisticType) {
+
+        }
+
+        @Override
+        public void resetAllStats() {
+
+        }
+
+        @Override
+        public Long getStat(IgmpStatisticType igmpStatisticType) {
+            return 0L;
+        }
     }
 }