[SEBA-41] Operational Status IGMP Data
Change-Id: I8e3d9bdfafe61d7d357dc1fae8735ed78b9a77eb
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpManager.java b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
index bb134d5..a42d41b 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpManager.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
@@ -162,6 +162,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected SadisService sadisService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected IgmpStatisticsService igmpStatisticsManager;
+
private IgmpPacketProcessor processor = new IgmpPacketProcessor();
private Logger log = LoggerFactory.getLogger(getClass());
private ApplicationId coreAppId;
@@ -214,7 +217,6 @@
configListener.reconfigureSsmTable(networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS));
subsService = sadisService.getSubscriberInfoService();
-
if (connectPointMode) {
provisionConnectPointFlows();
} else {
@@ -230,7 +232,6 @@
scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 1000, 1000, TimeUnit.MILLISECONDS);
eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/igmpproxy",
"events-igmp-%d", log));
-
log.info("Started");
}
@@ -246,7 +247,6 @@
deviceService.removeListener(deviceListener);
packetService.removeProcessor(processor);
flowRuleService.removeFlowRulesById(appId);
-
log.info("Stopped");
}
@@ -361,9 +361,11 @@
GroupMember groupMember = groupMemberMap.get(groupMemberKey);
if (join) {
+ igmpStatisticsManager.getIgmpStats().increaseIgmpJoinReq();
if (groupMember == null) {
Optional<ConnectPoint> sourceConfigured = getSource();
if (!sourceConfigured.isPresent()) {
+ igmpStatisticsManager.getIgmpStats().increaseIgmpFailJoinReq();
log.warn("Unable to process IGMP Join from {} since no source " +
"configuration is found.", deviceId);
return;
@@ -384,7 +386,12 @@
HashSet<ConnectPoint> sourceConnectPoints = Sets.newHashSet(sourceConfigured.get());
- StateMachine.join(deviceId, groupIp, srcIp, deviceUplink.get());
+ boolean isJoined = StateMachine.join(deviceId, groupIp, srcIp, deviceUplink.get());
+ if (isJoined) {
+ igmpStatisticsManager.getIgmpStats().increaseIgmpSuccessJoinRejoinReq();
+ } else {
+ igmpStatisticsManager.getIgmpStats().increaseIgmpFailJoinReq();
+ }
groupMemberMap.put(groupMemberKey, groupMember);
groupMember.updateList(recordType, sourceList);
groupMember.getSourceList().forEach(source -> {
@@ -402,6 +409,7 @@
groupMember.updateList(recordType, sourceList);
groupMember.setLeave(false);
} else {
+ igmpStatisticsManager.getIgmpStats().increaseIgmpLeaveReq();
if (groupMember == null) {
log.info("receive leave but no instance, group " + groupIp.toString() +
" device:" + deviceId.toString() + " port:" + portNumber.toString());
@@ -418,6 +426,7 @@
}
private void leaveAction(GroupMember groupMember) {
+ igmpStatisticsManager.getIgmpStats().increaseIgmpDisconnect();
ConnectPoint cp = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
StateMachine.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
groupMember.getSourceList().forEach(source -> multicastService.removeSinks(
@@ -451,6 +460,7 @@
private class IgmpPacketProcessor implements PacketProcessor {
@Override
public void process(PacketContext context) {
+
eventExecutor.execute(() -> {
try {
InboundPacket pkt = context.inPacket();
@@ -458,6 +468,7 @@
if (ethPkt == null) {
return;
}
+ igmpStatisticsManager.getIgmpStats().increaseTotalMsgReceived();
if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
return;
@@ -484,6 +495,7 @@
PortNumber upLinkPort = deviceUpLinkOpt.isPresent() ? deviceUpLinkOpt.get() : null;
switch (igmp.getIgmpType()) {
case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
+ igmpStatisticsManager.getIgmpStats().increaseIgmpv3MembershipQuery();
//Discard Query from OLT’s non-uplink port’s
if (!pkt.receivedFrom().port().equals(upLinkPort)) {
if (isConnectPoint(deviceId, pkt.receivedFrom().port())) {
@@ -504,49 +516,61 @@
0xff & igmp.getMaxRespField());
break;
case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
+ igmpStatisticsManager.getIgmpStats().increaseIgmpv1MembershipReport();
log.debug("IGMP version 1 message types are not currently supported.");
break;
case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
+ igmpStatisticsManager.getIgmpStats().increaseIgmpv3MembershipReport();
+ processIgmpMessage(pkt, igmp, upLinkPort, vlan);
+ break;
case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
+ igmpStatisticsManager.getIgmpStats().increaseIgmpv2MembershipReport();
+ processIgmpMessage(pkt, igmp, upLinkPort, vlan);
+ break;
case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
- //Discard join/leave from OLT’s uplink port’s
- if (pkt.receivedFrom().port().equals(upLinkPort) ||
- isConnectPoint(deviceId, pkt.receivedFrom().port())) {
- log.info("IGMP Picked up join/leave from uplink/connectPoint port");
- return;
- }
-
- Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
- while (itr.hasNext()) {
- IGMPGroup group = itr.next();
- if (group instanceof IGMPMembership) {
- processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
- pkt.receivedFrom(), igmp.getIgmpType());
- } else if (group instanceof IGMPQuery) {
- IGMPMembership mgroup;
- mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
- mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
- IGMPMembership.MODE_IS_EXCLUDE :
- IGMPMembership.MODE_IS_INCLUDE);
- processIgmpReport(mgroup, VlanId.vlanId(vlan),
- pkt.receivedFrom(), igmp.getIgmpType());
- }
- }
+ igmpStatisticsManager.getIgmpStats().increaseIgmpv2LeaveGroup();
+ processIgmpMessage(pkt, igmp, upLinkPort, vlan);
break;
default:
- log.info("wrong IGMP v3 type:" + igmp.getIgmpType());
+ log.warn("wrong IGMP v3 type:" + igmp.getIgmpType());
+ igmpStatisticsManager.getIgmpStats().increaseInvalidIgmpMsgReceived();
break;
}
} catch (Exception ex) {
log.error("igmp process error : {} ", ex);
- ex.printStackTrace();
}
});
}
}
+    private void processIgmpMessage(InboundPacket pkt, IGMP igmp, PortNumber upLinkPort, short vlan) {
+        // Discard joins/leaves that were received on the OLT's uplink port or on a
+        // connect point port; only the remaining ports are processed below.
+        PortNumber ingressPort = pkt.receivedFrom().port();
+        if (ingressPort.equals(upLinkPort) || isConnectPoint(pkt.receivedFrom().deviceId(), ingressPort)) {
+            log.info("IGMP Picked up join/leave from uplink/connectPoint port");
+            return;
+        }
+
+        for (IGMPGroup entry : igmp.getGroups()) {
+            IGMPMembership membership;
+            if (entry instanceof IGMPMembership) {
+                membership = (IGMPMembership) entry;
+            } else {
+                // Not a membership record: build one from the group address, in EXCLUDE
+                // mode for an IGMPv2 report and in INCLUDE mode for any other message type.
+                membership = new IGMPMembership(entry.getGaddr().getIp4Address());
+                boolean v2Report = igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT;
+                membership.setRecordType(v2Report ?
+                        IGMPMembership.MODE_IS_EXCLUDE :
+                        IGMPMembership.MODE_IS_INCLUDE);
+            }
+            processIgmpReport(membership, VlanId.vlanId(vlan), pkt.receivedFrom(), igmp.getIgmpType());
+        }
+    }
+
private class IgmpProxyTimerTask extends TimerTask {
public void run() {
try {
@@ -953,4 +977,5 @@
}
processFilterObjective(connectPoint.deviceId(), connectPoint.port(), true);
}
+
}