[SEBA-41] Collect IGMP operational status statistics

Change-Id: I8e3d9bdfafe61d7d357dc1fae8735ed78b9a77eb
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpManager.java b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
index bb134d5..a42d41b 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpManager.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
@@ -162,6 +162,9 @@
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected SadisService sadisService;
 
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
+    protected IgmpStatisticsService igmpStatisticsManager;
+
     private IgmpPacketProcessor processor = new IgmpPacketProcessor();
     private Logger log = LoggerFactory.getLogger(getClass());
     private ApplicationId coreAppId;
@@ -214,7 +217,6 @@
         configListener.reconfigureSsmTable(networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS));
 
         subsService = sadisService.getSubscriberInfoService();
-
         if (connectPointMode) {
             provisionConnectPointFlows();
         } else {
@@ -230,7 +232,6 @@
         scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 1000, 1000, TimeUnit.MILLISECONDS);
         eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/igmpproxy",
                                                                         "events-igmp-%d", log));
-
         log.info("Started");
     }
 
@@ -246,7 +247,6 @@
         deviceService.removeListener(deviceListener);
         packetService.removeProcessor(processor);
         flowRuleService.removeFlowRulesById(appId);
-
         log.info("Stopped");
     }
 
@@ -361,9 +361,11 @@
         GroupMember groupMember = groupMemberMap.get(groupMemberKey);
 
         if (join) {
+            igmpStatisticsManager.getIgmpStats().increaseIgmpJoinReq();
             if (groupMember == null) {
                 Optional<ConnectPoint> sourceConfigured = getSource();
                 if (!sourceConfigured.isPresent()) {
+                    igmpStatisticsManager.getIgmpStats().increaseIgmpFailJoinReq();
                     log.warn("Unable to process IGMP Join from {} since no source " +
                                      "configuration is found.", deviceId);
                     return;
@@ -384,7 +386,12 @@
 
                 HashSet<ConnectPoint> sourceConnectPoints = Sets.newHashSet(sourceConfigured.get());
 
-                StateMachine.join(deviceId, groupIp, srcIp, deviceUplink.get());
+                boolean isJoined = StateMachine.join(deviceId, groupIp, srcIp, deviceUplink.get());
+                if (isJoined) {
+                    igmpStatisticsManager.getIgmpStats().increaseIgmpSuccessJoinRejoinReq();
+                } else {
+                    igmpStatisticsManager.getIgmpStats().increaseIgmpFailJoinReq();
+                }
                 groupMemberMap.put(groupMemberKey, groupMember);
                 groupMember.updateList(recordType, sourceList);
                 groupMember.getSourceList().forEach(source -> {
@@ -402,6 +409,7 @@
             groupMember.updateList(recordType, sourceList);
             groupMember.setLeave(false);
         } else {
+            igmpStatisticsManager.getIgmpStats().increaseIgmpLeaveReq();
             if (groupMember == null) {
                 log.info("receive leave but no instance, group " + groupIp.toString() +
                         " device:" + deviceId.toString() + " port:" + portNumber.toString());
@@ -418,6 +426,7 @@
     }
 
     private void leaveAction(GroupMember groupMember) {
+        igmpStatisticsManager.getIgmpStats().increaseIgmpDisconnect();
         ConnectPoint cp = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
         StateMachine.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
         groupMember.getSourceList().forEach(source -> multicastService.removeSinks(
@@ -451,6 +460,7 @@
     private class IgmpPacketProcessor implements PacketProcessor {
         @Override
         public void process(PacketContext context) {
+
             eventExecutor.execute(() -> {
                 try {
                     InboundPacket pkt = context.inPacket();
@@ -458,6 +468,7 @@
                     if (ethPkt == null) {
                         return;
                     }
+                    igmpStatisticsManager.getIgmpStats().increaseTotalMsgReceived();
 
                     if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
                         return;
@@ -484,6 +495,7 @@
                     PortNumber upLinkPort =  deviceUpLinkOpt.isPresent() ? deviceUpLinkOpt.get() : null;
                     switch (igmp.getIgmpType()) {
                         case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
+                            igmpStatisticsManager.getIgmpStats().increaseIgmpv3MembershipQuery();
                             //Discard Query from OLT’s non-uplink port’s
                             if (!pkt.receivedFrom().port().equals(upLinkPort)) {
                                 if (isConnectPoint(deviceId, pkt.receivedFrom().port())) {
@@ -504,49 +516,61 @@
                                              0xff & igmp.getMaxRespField());
                             break;
                         case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
+                            igmpStatisticsManager.getIgmpStats().increaseIgmpv1MembershipReport();
                             log.debug("IGMP version 1  message types are not currently supported.");
                             break;
                         case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
+                            igmpStatisticsManager.getIgmpStats().increaseIgmpv3MembershipReport();
+                            processIgmpMessage(pkt, igmp, upLinkPort, vlan);
+                            break;
                         case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
+                            igmpStatisticsManager.getIgmpStats().increaseIgmpv2MembershipReport();
+                            processIgmpMessage(pkt, igmp, upLinkPort, vlan);
+                            break;
                         case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
-                            //Discard join/leave from OLT’s uplink port’s
-                            if (pkt.receivedFrom().port().equals(upLinkPort) ||
-                                    isConnectPoint(deviceId, pkt.receivedFrom().port())) {
-                                log.info("IGMP Picked up join/leave from uplink/connectPoint port");
-                                return;
-                            }
-
-                            Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
-                            while (itr.hasNext()) {
-                                IGMPGroup group = itr.next();
-                                if (group instanceof IGMPMembership) {
-                                    processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
-                                                      pkt.receivedFrom(), igmp.getIgmpType());
-                                } else if (group instanceof IGMPQuery) {
-                                    IGMPMembership mgroup;
-                                    mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
-                                    mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
-                                                                 IGMPMembership.MODE_IS_EXCLUDE :
-                                                                 IGMPMembership.MODE_IS_INCLUDE);
-                                    processIgmpReport(mgroup, VlanId.vlanId(vlan),
-                                                      pkt.receivedFrom(), igmp.getIgmpType());
-                                }
-                            }
+                            igmpStatisticsManager.getIgmpStats().increaseIgmpv2LeaveGroup();
+                            processIgmpMessage(pkt, igmp, upLinkPort, vlan);
                             break;
 
                         default:
-                            log.info("wrong IGMP v3 type:" + igmp.getIgmpType());
+                            log.warn("wrong IGMP v3 type:" + igmp.getIgmpType());
+                            igmpStatisticsManager.getIgmpStats().increaseInvalidIgmpMsgReceived();
                             break;
                     }
 
                 } catch (Exception ex) {
                     log.error("igmp process error : {} ", ex);
-                    ex.printStackTrace();
                 }
             });
         }
     }
 
+    /**
+     * Handles an IGMP membership report or leave. The message is ignored when it was
+     * received on the uplink or a connect point port; otherwise every group is processed.
+     */
+    private void processIgmpMessage(InboundPacket pkt, IGMP igmp, PortNumber upLinkPort, short vlan) {
+        ConnectPoint ingress = pkt.receivedFrom();
+        if (ingress.port().equals(upLinkPort) || isConnectPoint(ingress.deviceId(), ingress.port())) {
+            log.info("IGMP Picked up join/leave from uplink/connectPoint port");
+            return;
+        }
+
+        for (Iterator<IGMPGroup> groups = igmp.getGroups().iterator(); groups.hasNext();) {
+            IGMPGroup group = groups.next();
+            IGMPMembership membership;
+            if (group instanceof IGMPMembership) {
+                membership = (IGMPMembership) group;
+            } else {
+                // Wrap the bare group address: EXCLUDE mode for a v2 report, INCLUDE otherwise.
+                membership = new IGMPMembership(group.getGaddr().getIp4Address());
+                membership.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
+                        IGMPMembership.MODE_IS_EXCLUDE : IGMPMembership.MODE_IS_INCLUDE);
+            }
+            processIgmpReport(membership, VlanId.vlanId(vlan), ingress, igmp.getIgmpType());
+        }
+    }
+
     private class IgmpProxyTimerTask extends TimerTask {
         public void run() {
             try {
@@ -953,4 +977,5 @@
         }
         processFilterObjective(connectPoint.deviceId(), connectPoint.port(), true);
     }
+
 }