[SEBA-41] Operational Status IGMP Data
Change-Id: I8e3d9bdfafe61d7d357dc1fae8735ed78b9a77eb
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpManager.java b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
index bb134d5..a42d41b 100644
--- a/src/main/java/org/opencord/igmpproxy/IgmpManager.java
+++ b/src/main/java/org/opencord/igmpproxy/IgmpManager.java
@@ -162,6 +162,9 @@
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected SadisService sadisService;
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected IgmpStatisticsService igmpStatisticsManager;
+
private IgmpPacketProcessor processor = new IgmpPacketProcessor();
private Logger log = LoggerFactory.getLogger(getClass());
private ApplicationId coreAppId;
@@ -214,7 +217,6 @@
configListener.reconfigureSsmTable(networkConfig.getConfig(appId, IGMPPROXY_SSM_CONFIG_CLASS));
subsService = sadisService.getSubscriberInfoService();
-
if (connectPointMode) {
provisionConnectPointFlows();
} else {
@@ -230,7 +232,6 @@
scheduledExecutorService.scheduleAtFixedRate(new IgmpProxyTimerTask(), 1000, 1000, TimeUnit.MILLISECONDS);
eventExecutor = newSingleThreadScheduledExecutor(groupedThreads("cord/igmpproxy",
"events-igmp-%d", log));
-
log.info("Started");
}
@@ -246,7 +247,6 @@
deviceService.removeListener(deviceListener);
packetService.removeProcessor(processor);
flowRuleService.removeFlowRulesById(appId);
-
log.info("Stopped");
}
@@ -361,9 +361,11 @@
GroupMember groupMember = groupMemberMap.get(groupMemberKey);
if (join) {
+ igmpStatisticsManager.getIgmpStats().increaseIgmpJoinReq();
if (groupMember == null) {
Optional<ConnectPoint> sourceConfigured = getSource();
if (!sourceConfigured.isPresent()) {
+ igmpStatisticsManager.getIgmpStats().increaseIgmpFailJoinReq();
log.warn("Unable to process IGMP Join from {} since no source " +
"configuration is found.", deviceId);
return;
@@ -384,7 +386,12 @@
HashSet<ConnectPoint> sourceConnectPoints = Sets.newHashSet(sourceConfigured.get());
- StateMachine.join(deviceId, groupIp, srcIp, deviceUplink.get());
+ boolean isJoined = StateMachine.join(deviceId, groupIp, srcIp, deviceUplink.get());
+ if (isJoined) {
+ igmpStatisticsManager.getIgmpStats().increaseIgmpSuccessJoinRejoinReq();
+ } else {
+ igmpStatisticsManager.getIgmpStats().increaseIgmpFailJoinReq();
+ }
groupMemberMap.put(groupMemberKey, groupMember);
groupMember.updateList(recordType, sourceList);
groupMember.getSourceList().forEach(source -> {
@@ -402,6 +409,7 @@
groupMember.updateList(recordType, sourceList);
groupMember.setLeave(false);
} else {
+ igmpStatisticsManager.getIgmpStats().increaseIgmpLeaveReq();
if (groupMember == null) {
log.info("receive leave but no instance, group " + groupIp.toString() +
" device:" + deviceId.toString() + " port:" + portNumber.toString());
@@ -418,6 +426,7 @@
}
private void leaveAction(GroupMember groupMember) {
+ igmpStatisticsManager.getIgmpStats().increaseIgmpDisconnect();
ConnectPoint cp = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
StateMachine.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
groupMember.getSourceList().forEach(source -> multicastService.removeSinks(
@@ -451,6 +460,7 @@
private class IgmpPacketProcessor implements PacketProcessor {
@Override
public void process(PacketContext context) {
+
eventExecutor.execute(() -> {
try {
InboundPacket pkt = context.inPacket();
@@ -458,6 +468,7 @@
if (ethPkt == null) {
return;
}
+ igmpStatisticsManager.getIgmpStats().increaseTotalMsgReceived();
if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
return;
@@ -484,6 +495,7 @@
PortNumber upLinkPort = deviceUpLinkOpt.isPresent() ? deviceUpLinkOpt.get() : null;
switch (igmp.getIgmpType()) {
case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
+ igmpStatisticsManager.getIgmpStats().increaseIgmpv3MembershipQuery();
//Discard Query from OLT’s non-uplink port’s
if (!pkt.receivedFrom().port().equals(upLinkPort)) {
if (isConnectPoint(deviceId, pkt.receivedFrom().port())) {
@@ -504,49 +516,61 @@
0xff & igmp.getMaxRespField());
break;
case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
+ igmpStatisticsManager.getIgmpStats().increaseIgmpv1MembershipReport();
log.debug("IGMP version 1 message types are not currently supported.");
break;
case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
+ igmpStatisticsManager.getIgmpStats().increaseIgmpv3MembershipReport();
+ processIgmpMessage(pkt, igmp, upLinkPort, vlan);
+ break;
case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
+ igmpStatisticsManager.getIgmpStats().increaseIgmpv2MembershipReport();
+ processIgmpMessage(pkt, igmp, upLinkPort, vlan);
+ break;
case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
- //Discard join/leave from OLT’s uplink port’s
- if (pkt.receivedFrom().port().equals(upLinkPort) ||
- isConnectPoint(deviceId, pkt.receivedFrom().port())) {
- log.info("IGMP Picked up join/leave from uplink/connectPoint port");
- return;
- }
-
- Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
- while (itr.hasNext()) {
- IGMPGroup group = itr.next();
- if (group instanceof IGMPMembership) {
- processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
- pkt.receivedFrom(), igmp.getIgmpType());
- } else if (group instanceof IGMPQuery) {
- IGMPMembership mgroup;
- mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
- mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
- IGMPMembership.MODE_IS_EXCLUDE :
- IGMPMembership.MODE_IS_INCLUDE);
- processIgmpReport(mgroup, VlanId.vlanId(vlan),
- pkt.receivedFrom(), igmp.getIgmpType());
- }
- }
+ igmpStatisticsManager.getIgmpStats().increaseIgmpv2LeaveGroup();
+ processIgmpMessage(pkt, igmp, upLinkPort, vlan);
break;
default:
- log.info("wrong IGMP v3 type:" + igmp.getIgmpType());
+ log.warn("wrong IGMP v3 type:" + igmp.getIgmpType());
+ igmpStatisticsManager.getIgmpStats().increaseInvalidIgmpMsgReceived();
break;
}
} catch (Exception ex) {
log.error("igmp process error : {} ", ex);
- ex.printStackTrace();
}
});
}
}
+ private void processIgmpMessage(InboundPacket pkt, IGMP igmp, PortNumber upLinkPort, short vlan) {
+ //Discard join/leave from OLT’s uplink port’s
+ if (pkt.receivedFrom().port().equals(upLinkPort) ||
+ isConnectPoint(pkt.receivedFrom().deviceId(), pkt.receivedFrom().port())) {
+ log.info("IGMP Picked up join/leave from uplink/connectPoint port");
+ return;
+ }
+
+ Iterator<IGMPGroup> itr = igmp.getGroups().iterator();
+ while (itr.hasNext()) {
+ IGMPGroup group = itr.next();
+ if (group instanceof IGMPMembership) {
+ processIgmpReport((IGMPMembership) group, VlanId.vlanId(vlan),
+ pkt.receivedFrom(), igmp.getIgmpType());
+ } else {
+ IGMPMembership mgroup = new IGMPMembership(group.getGaddr().getIp4Address());
+ mgroup.setRecordType(igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT ?
+ IGMPMembership.MODE_IS_EXCLUDE :
+ IGMPMembership.MODE_IS_INCLUDE);
+ processIgmpReport(mgroup, VlanId.vlanId(vlan),
+ pkt.receivedFrom(), igmp.getIgmpType());
+ }
+ }
+
+ }
+
private class IgmpProxyTimerTask extends TimerTask {
public void run() {
try {
@@ -953,4 +977,5 @@
}
processFilterObjective(connectPoint.deviceId(), connectPoint.port(), true);
}
+
}
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpStatistics.java b/src/main/java/org/opencord/igmpproxy/IgmpStatistics.java
new file mode 100644
index 0000000..c7c4650
--- /dev/null
+++ b/src/main/java/org/opencord/igmpproxy/IgmpStatistics.java
@@ -0,0 +1,159 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.opencord.igmpproxy;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ *
+ * Records metrics for IgmpProxy application.
+ *
+ */
+public class IgmpStatistics {
+
+ //Total number of join requests
+ private AtomicLong igmpJoinReq = new AtomicLong();
+ //Total number of successful join and rejoin requests
+ private AtomicLong igmpSuccessJoinRejoinReq = new AtomicLong();
+ //Total number of failed join requests
+ private AtomicLong igmpFailJoinReq = new AtomicLong();
+ //Total number of leaves requests
+ private AtomicLong igmpLeaveReq = new AtomicLong();
+ // Total number of disconnects
+ private AtomicLong igmpDisconnect = new AtomicLong();
+ //Count of Total number of IGMPV3_MEMBERSHIP_QUERY
+ private AtomicLong igmpv3MembershipQuery = new AtomicLong();
+ //Count of IGMPV1_MEMBERSHIP_REPORT
+ private AtomicLong igmpv1MembershipReport = new AtomicLong();
+ //Count of IGMPV3_MEMBERSHIP_REPORT
+ private AtomicLong igmpv3MembershipReport = new AtomicLong();
+ //Count of IGMPV2_MEMBERSHIP_REPORT
+ private AtomicLong igmpv2MembershipReport = new AtomicLong();
+ //Count of TYPE_IGMPV2_LEAVE_GROUP
+ private AtomicLong igmpv2LeaveGroup = new AtomicLong();
+ //Total number of messages received.
+ private AtomicLong totalMsgReceived = new AtomicLong();
+ //Total number of IGMP messages received
+ private AtomicLong igmpMsgReceived = new AtomicLong();
+ //Total number of invalid IGMP messages received
+ private AtomicLong invalidIgmpMsgReceived = new AtomicLong();
+
+ public Long getIgmpJoinReq() {
+ return igmpJoinReq.get();
+ }
+
+ public Long getIgmpSuccessJoinRejoinReq() {
+ return igmpSuccessJoinRejoinReq.get();
+ }
+
+ public Long getIgmpFailJoinReq() {
+ return igmpFailJoinReq.get();
+ }
+
+ public Long getIgmpLeaveReq() {
+ return igmpLeaveReq.get();
+ }
+
+ public Long getIgmpDisconnect() {
+ return igmpDisconnect.get();
+ }
+
+ public Long getIgmpv3MembershipQuery() {
+ return igmpv3MembershipQuery.get();
+ }
+
+ public Long getIgmpv1MemershipReport() {
+ return igmpv1MembershipReport.get();
+ }
+
+ public Long getIgmpv3MembershipReport() {
+ return igmpv3MembershipReport.get();
+ }
+
+ public Long getIgmpv2MembershipReport() {
+ return igmpv2MembershipReport.get();
+ }
+
+ public Long getIgmpv2LeaveGroup() {
+ return igmpv2LeaveGroup.get();
+ }
+
+ public Long getTotalMsgReceived() {
+ return totalMsgReceived.get();
+ }
+
+ public Long getIgmpMsgReceived() {
+ return igmpMsgReceived.get();
+ }
+
+ public Long getInvalidIgmpMsgReceived() {
+ return invalidIgmpMsgReceived.get();
+ }
+
+ public void increaseIgmpJoinReq() {
+ igmpJoinReq.incrementAndGet();
+ }
+
+ public void increaseIgmpSuccessJoinRejoinReq() {
+ igmpSuccessJoinRejoinReq.incrementAndGet();
+ }
+
+ public void increaseIgmpFailJoinReq() {
+ igmpFailJoinReq.incrementAndGet();
+ }
+
+ public void increaseIgmpLeaveReq() {
+ igmpLeaveReq.incrementAndGet();
+ }
+
+ public void increaseIgmpDisconnect() {
+ igmpDisconnect.incrementAndGet();
+ }
+
+ public void increaseIgmpv3MembershipQuery() {
+ igmpv3MembershipQuery.incrementAndGet();
+ igmpMsgReceived.incrementAndGet();
+ }
+
+ public void increaseIgmpv2MembershipReport() {
+ igmpv2MembershipReport.incrementAndGet();
+ igmpMsgReceived.incrementAndGet();
+ }
+
+ public void increaseIgmpv1MembershipReport() {
+ igmpv1MembershipReport.incrementAndGet();
+ igmpMsgReceived.incrementAndGet();
+ }
+
+ public void increaseIgmpv3MembershipReport() {
+ igmpv3MembershipReport.incrementAndGet();
+ igmpMsgReceived.incrementAndGet();
+ }
+
+ public void increaseIgmpv2LeaveGroup() {
+ igmpv2LeaveGroup.incrementAndGet();
+ igmpMsgReceived.incrementAndGet();
+ }
+
+ public void increaseInvalidIgmpMsgReceived() {
+ invalidIgmpMsgReceived.incrementAndGet();
+ }
+
+ public void increaseTotalMsgReceived() {
+ totalMsgReceived.incrementAndGet();
+ }
+
+}
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpStatisticsEvent.java b/src/main/java/org/opencord/igmpproxy/IgmpStatisticsEvent.java
new file mode 100644
index 0000000..6717336
--- /dev/null
+++ b/src/main/java/org/opencord/igmpproxy/IgmpStatisticsEvent.java
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy;
+
+import org.onosproject.event.AbstractEvent;
+
+
+/**
+ * Event indicating the Statistics Data of IGMP.
+ */
+public class IgmpStatisticsEvent extends
+ AbstractEvent<IgmpStatisticsEvent.Type, IgmpStatistics> {
+ /**
+ * Statistics data.
+ * IgmpStatisticsEvent event type.
+ */
+ public enum Type {
+ /**
+ * signifies that the IGMP Statistics Event stats has been updated.
+ */
+ STATS_UPDATE
+ }
+
+ public IgmpStatisticsEvent(Type type, IgmpStatistics stats) {
+ super(type, stats);
+ }
+
+}
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpStatisticsEventListener.java b/src/main/java/org/opencord/igmpproxy/IgmpStatisticsEventListener.java
new file mode 100644
index 0000000..7bdf8e8
--- /dev/null
+++ b/src/main/java/org/opencord/igmpproxy/IgmpStatisticsEventListener.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy;
+
+import org.onosproject.event.EventListener;
+
+/**
+ * Listener for accounting events.
+ */
+public interface IgmpStatisticsEventListener extends
+ EventListener<IgmpStatisticsEvent> {
+
+}
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpStatisticsManager.java b/src/main/java/org/opencord/igmpproxy/IgmpStatisticsManager.java
new file mode 100644
index 0000000..a30c5e8
--- /dev/null
+++ b/src/main/java/org/opencord/igmpproxy/IgmpStatisticsManager.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy;
+
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.annotations.Activate;
+import org.onlab.util.SafeRecurringTask;
+import org.onlab.util.Tools;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.event.AbstractListenerManager;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Modified;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+
+import static org.slf4j.LoggerFactory.getLogger;
+
+import java.util.Dictionary;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+import org.slf4j.Logger;
+
+import com.google.common.base.Strings;
+
+import static org.opencord.igmpproxy.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD;
+import static org.opencord.igmpproxy.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD_DEFAULT;
+
+
+/**
+ *
+ * Process the stats collected in Igmp proxy application. Publish to kafka onos.
+ *
+ */
+@Component(immediate = true, property = {
+ STATISTICS_GENERATION_PERIOD + ":Integer=" + STATISTICS_GENERATION_PERIOD_DEFAULT,
+})
+public class IgmpStatisticsManager extends
+ AbstractListenerManager<IgmpStatisticsEvent, IgmpStatisticsEventListener>
+ implements IgmpStatisticsService {
+ private final Logger log = getLogger(getClass());
+ private IgmpStatistics igmpStats;
+
+ ScheduledExecutorService executorForIgmp;
+ private ScheduledFuture<?> publisherTask;
+
+ protected int statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ComponentConfigService cfgService;
+
+ @Override
+ public IgmpStatistics getIgmpStats() {
+ return igmpStats;
+ }
+
+ @Activate
+ public void activate(ComponentContext context) {
+ igmpStats = new IgmpStatistics();
+
+ eventDispatcher.addSink(IgmpStatisticsEvent.class, listenerRegistry);
+ executorForIgmp = Executors.newScheduledThreadPool(1);
+ cfgService.registerProperties(getClass());
+ modified(context);
+ log.info("IgmpStatisticsManager Activated");
+ }
+
+ @Modified
+ public void modified(ComponentContext context) {
+ Dictionary<String, Object> properties = context.getProperties();
+
+ try {
+ String s = Tools.get(properties, STATISTICS_GENERATION_PERIOD);
+ statisticsGenerationPeriodInSeconds = Strings.isNullOrEmpty(s) ?
+ Integer.parseInt(STATISTICS_GENERATION_PERIOD)
+ : Integer.parseInt(s.trim());
+ } catch (NumberFormatException ne) {
+ log.error("Unable to parse configuration parameter for eventGenerationPeriodInSeconds", ne);
+ statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
+ }
+ if (publisherTask != null) {
+ publisherTask.cancel(true);
+ }
+ publisherTask = executorForIgmp.scheduleAtFixedRate(SafeRecurringTask.wrap(this::publishStats),
+ 0, statisticsGenerationPeriodInSeconds, TimeUnit.SECONDS);
+ }
+
+ @Deactivate
+ public void deactivate() {
+ eventDispatcher.removeSink(IgmpStatisticsEvent.class);
+ publisherTask.cancel(true);
+ executorForIgmp.shutdown();
+ cfgService.unregisterProperties(getClass(), false);
+ igmpStats = null;
+ log.info("IgmpStatisticsManager Deactivated");
+ }
+
+ /**
+ * Publishes stats.
+ */
+ private void publishStats() {
+
+ if (log.isDebugEnabled()) {
+ log.debug("Notifying stats: {}", igmpStats);
+ log.debug("--IgmpDisconnect--" + igmpStats.getIgmpDisconnect());
+ log.debug("--IgmpFailJoinReq--" + igmpStats.getIgmpFailJoinReq());
+ log.debug("--IgmpJoinReq--" + igmpStats.getIgmpJoinReq());
+ log.debug("--IgmpLeaveReq--" + igmpStats.getIgmpLeaveReq());
+ log.debug("--IgmpMsgReceived--" + igmpStats.getIgmpMsgReceived());
+ log.debug("--IgmpSuccessJoinRejoinReq--" + igmpStats.getIgmpSuccessJoinRejoinReq());
+ log.debug("--Igmpv1MemershipReport--" + igmpStats.getIgmpv1MemershipReport());
+ log.debug("--Igmpv2LeaveGroup--" + igmpStats.getIgmpv2LeaveGroup());
+ log.debug("--Igmpv2MembershipReport--" + igmpStats.getIgmpv2MembershipReport());
+ log.debug("--Igmpv3MembershipQuery--" + igmpStats.getIgmpv3MembershipQuery());
+ log.debug("--Igmpv3MembershipReport--" + igmpStats.getIgmpv3MembershipReport());
+ log.debug("--InvalidIgmpMsgReceived--" + igmpStats.getInvalidIgmpMsgReceived());
+ log.debug("--TotalMsgReceived-- " + igmpStats.getTotalMsgReceived());
+ }
+
+ post(new IgmpStatisticsEvent(IgmpStatisticsEvent.Type.STATS_UPDATE, igmpStats));
+ }
+
+}
diff --git a/src/main/java/org/opencord/igmpproxy/IgmpStatisticsService.java b/src/main/java/org/opencord/igmpproxy/IgmpStatisticsService.java
new file mode 100644
index 0000000..3574ad5
--- /dev/null
+++ b/src/main/java/org/opencord/igmpproxy/IgmpStatisticsService.java
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy;
+
+import org.onosproject.event.ListenerService;
+
+/**
+ * Service for interacting with accounting module.
+ */
+public interface IgmpStatisticsService extends
+ ListenerService<IgmpStatisticsEvent, IgmpStatisticsEventListener> {
+ /**
+ * Returns IgmpStatistics object.
+ *
+ * @return IgmpStatistics
+ */
+ public IgmpStatistics getIgmpStats();
+
+}
diff --git a/src/main/java/org/opencord/igmpproxy/OsgiPropertyConstants.java b/src/main/java/org/opencord/igmpproxy/OsgiPropertyConstants.java
new file mode 100644
index 0000000..80d5cf0
--- /dev/null
+++ b/src/main/java/org/opencord/igmpproxy/OsgiPropertyConstants.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy;
+
+/**
+ * Constants for default values of configurable properties.
+ */
+public final class OsgiPropertyConstants {
+
+ private OsgiPropertyConstants() {
+ }
+
+ public static final String STATISTICS_GENERATION_PERIOD = "statisticsGenerationPeriodInSeconds";
+ public static final int STATISTICS_GENERATION_PERIOD_DEFAULT = 20;
+
+}
diff --git a/src/test/java/org/opencord/igmpproxy/IgmpManagerBase.java b/src/test/java/org/opencord/igmpproxy/IgmpManagerBase.java
index 43cc24a..05581ed 100644
--- a/src/test/java/org/opencord/igmpproxy/IgmpManagerBase.java
+++ b/src/test/java/org/opencord/igmpproxy/IgmpManagerBase.java
@@ -20,7 +20,13 @@
import org.onlab.packet.Ip4Address;
import org.onlab.packet.IpAddress;
import org.onlab.packet.MacAddress;
+import org.onosproject.cfg.ComponentConfigService;
+import org.onosproject.cfg.ConfigProperty;
import org.onosproject.core.ApplicationId;
+import org.onosproject.event.DefaultEventSinkRegistry;
+import org.onosproject.event.Event;
+import org.onosproject.event.EventDeliveryService;
+import org.onosproject.event.EventSink;
import org.onosproject.mastership.MastershipServiceAdapter;
import org.onosproject.mcast.api.McastListener;
import org.onosproject.mcast.api.McastRoute;
@@ -57,9 +63,18 @@
import org.opencord.sadis.SubscriberAndDeviceInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.osgi.framework.Bundle;
+import org.osgi.framework.BundleContext;
+import org.osgi.framework.ServiceReference;
+import org.osgi.service.component.ComponentContext;
+import org.osgi.service.component.ComponentInstance;
+
+import static com.google.common.base.Preconditions.checkState;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Dictionary;
+import java.util.Hashtable;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;
@@ -92,6 +107,8 @@
protected static final String CLIENT_CIRCUIT_ID = "CIR-PON 1/1";
protected String dsBpId = "HSIA-DS";
+ private static final int STATISTICS_GEN_PERIOD_IN_SEC = 2;
+
private static final String NNI_PREFIX = "nni";
protected List<Port> lsPorts = new ArrayList<Port>();
@@ -191,6 +208,7 @@
class TestNetworkConfigRegistry extends NetworkConfigRegistryAdapter {
Boolean igmpOnPodFlag = false;
+
TestNetworkConfigRegistry(Boolean igmpFlag) {
igmpOnPodFlag = igmpFlag;
}
@@ -377,28 +395,36 @@
}
/**
- * Sends an Ethernet packet to the process method of the Packet Processor.
+ * Sends Ethernet packet to the process method of the Packet Processor.
*
* @param reply Ethernet packet
* @throws InterruptedException
*/
- void sendPacket(Ethernet reply) {
+ void sendPacket(Ethernet reply, boolean isSingleSend) {
final ByteBuffer byteBuffer = ByteBuffer.wrap(reply.serialize());
- if (flagForPacket) {
- InboundPacket inPacket = new DefaultInboundPacket(CONNECT_POINT_A, reply, byteBuffer);
- context = new TestPacketContext(127L, inPacket, null, false);
- flagForPacket = false;
+ if (isSingleSend) {
+ InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_B, reply, byteBuffer);
+ context = new TestPacketContext(127L, inBoundPacket, null, false);
packetProcessor.process(context);
} else {
- InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_B, reply, byteBuffer);
- context = new TestPacketContext(127L, inBoundPacket, null, false);
- flagForPacket = true;
+ if (flagForPacket) {
+ InboundPacket inPacket = new DefaultInboundPacket(CONNECT_POINT_A, reply, byteBuffer);
+ context = new TestPacketContext(127L, inPacket, null, false);
+ flagForPacket = false;
- packetProcessor.process(context);
+ packetProcessor.process(context);
+ } else {
+ InboundPacket inBoundPacket = new DefaultInboundPacket(CONNECT_POINT_B, reply, byteBuffer);
+ context = new TestPacketContext(127L, inBoundPacket, null, false);
+ flagForPacket = true;
+
+ packetProcessor.process(context);
+ }
}
+
}
protected class MockSadisService implements SadisService {
@@ -484,4 +510,134 @@
}
}
+ protected class MockCfgService implements ComponentConfigService {
+
+ @Override
+ public Set<String> getComponentNames() {
+ return null;
+ }
+
+ @Override
+ public void registerProperties(Class<?> componentClass) {
+
+ }
+
+ @Override
+ public void unregisterProperties(Class<?> componentClass, boolean clear) {
+
+ }
+
+ @Override
+ public Set<ConfigProperty> getProperties(String componentName) {
+ return null;
+ }
+
+ @Override
+ public void setProperty(String componentName, String name, String value) {
+
+ }
+
+ @Override
+ public void preSetProperty(String componentName, String name, String value) {
+
+ }
+
+ @Override
+ public void preSetProperty(String componentName, String name, String value, boolean override) {
+
+ }
+
+ @Override
+ public void unsetProperty(String componentName, String name) {
+
+ }
+
+ @Override
+ public ConfigProperty getProperty(String componentName, String attribute) {
+ return null;
+ }
+
+ }
+
+ public static class TestEventDispatcher extends DefaultEventSinkRegistry implements EventDeliveryService {
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public synchronized void post(Event event) {
+ EventSink sink = getSink(event.getClass());
+ checkState(sink != null, "No sink for event %s", event);
+ sink.process(event);
+ }
+
+ @Override
+ public void setDispatchTimeLimit(long millis) {
+ }
+
+ @Override
+ public long getDispatchTimeLimit() {
+ return 0;
+ }
+ }
+
+ class MockComponentContext implements ComponentContext {
+
+ @Override
+ public Dictionary<String, Object> getProperties() {
+ Dictionary<String, Object> cfgDict = new Hashtable<String, Object>();
+ cfgDict.put("statisticsGenerationPeriodInSeconds", STATISTICS_GEN_PERIOD_IN_SEC);
+ return cfgDict;
+ }
+
+ @Override
+ public Object locateService(String name) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Object locateService(String name, ServiceReference reference) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Object[] locateServices(String name) {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public BundleContext getBundleContext() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public Bundle getUsingBundle() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public ComponentInstance getComponentInstance() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ @Override
+ public void enableComponent(String name) {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public void disableComponent(String name) {
+ // TODO Auto-generated method stub
+ }
+
+ @Override
+ public ServiceReference getServiceReference() {
+ // TODO Auto-generated method stub
+ return null;
+ }
+ }
}
diff --git a/src/test/java/org/opencord/igmpproxy/IgmpManagerTest.java b/src/test/java/org/opencord/igmpproxy/IgmpManagerTest.java
index 21a83fe..b77a09c 100644
--- a/src/test/java/org/opencord/igmpproxy/IgmpManagerTest.java
+++ b/src/test/java/org/opencord/igmpproxy/IgmpManagerTest.java
@@ -18,19 +18,26 @@
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.onlab.junit.TestUtils;
import org.onlab.packet.Ethernet;
import org.onosproject.core.CoreServiceAdapter;
import org.onosproject.net.flow.FlowRuleServiceAdapter;
import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+import org.opencord.igmpproxy.IgmpManagerBase.MockComponentContext;
import static org.junit.Assert.*;
+/**
+ * Set of tests of the ONOS application component.
+ */
public class IgmpManagerTest extends IgmpManagerBase {
private static final int WAIT_TIMEOUT = 1000;
private IgmpManager igmpManager;
+ private IgmpStatisticsManager igmpStatisticsManager;
+
// Set up the IGMP application.
@Before
public void setUp() {
@@ -43,6 +50,11 @@
igmpManager.flowRuleService = new FlowRuleServiceAdapter();
igmpManager.multicastService = new TestMulticastRouteService();
igmpManager.sadisService = new MockSadisService();
+ igmpStatisticsManager = new IgmpStatisticsManager();
+ igmpStatisticsManager.cfgService = new MockCfgService();
+ TestUtils.setField(igmpStatisticsManager, "eventDispatcher", new TestEventDispatcher());
+ igmpStatisticsManager.activate(new MockComponentContext());
+ igmpManager.igmpStatisticsManager = this.igmpStatisticsManager;
// By default - we send query messages
SingleStateMachine.sendQuery = true;
}
@@ -84,16 +96,17 @@
Ethernet firstPacket = IgmpSender.getInstance().buildIgmpV3Join(GROUP_IP, SOURCE_IP_OF_A);
Ethernet secondPacket = IgmpSender.getInstance().buildIgmpV3Join(GROUP_IP, SOURCE_IP_OF_B);
// Sending first packet and here shouldSendjoin flag will be true
- sendPacket(firstPacket);
+ sendPacket(firstPacket, false);
// Emitted packet is stored in list savedPackets
assertNotNull(savedPackets);
synchronized (savedPackets) {
savedPackets.wait(WAIT_TIMEOUT);
}
+
assertNotNull(savedPackets);
assertEquals(1, savedPackets.size());
// Sending the second packet with same group ip address
- sendPacket(secondPacket);
+ sendPacket(secondPacket, false);
synchronized (savedPackets) {
savedPackets.wait(WAIT_TIMEOUT);
}
@@ -113,7 +126,7 @@
Ethernet firstPacket = IgmpSender.getInstance().buildIgmpV3Join(GROUP_IP, SOURCE_IP_OF_A);
Ethernet secondPacket = IgmpSender.getInstance().buildIgmpV3Join(GROUP_IP, SOURCE_IP_OF_B);
// Sending first packet and here shouldSendjoin flag will be true
- sendPacket(firstPacket);
+ sendPacket(firstPacket, false);
// Emitted packet is stored in list savedPackets
synchronized (savedPackets) {
savedPackets.wait(WAIT_TIMEOUT);
@@ -122,10 +135,11 @@
assertEquals(1, savedPackets.size());
// Sending the second packet with same group ip address which will not be emitted
// shouldSendJoin flag will be false.
- sendPacket(secondPacket);
+ sendPacket(secondPacket, false);
synchronized (savedPackets) {
savedPackets.wait(WAIT_TIMEOUT);
}
assertEquals(1, savedPackets.size());
}
+
}
diff --git a/src/test/java/org/opencord/igmpproxy/IgmpStatisticsTest.java b/src/test/java/org/opencord/igmpproxy/IgmpStatisticsTest.java
new file mode 100644
index 0000000..e1e4f35
--- /dev/null
+++ b/src/test/java/org/opencord/igmpproxy/IgmpStatisticsTest.java
@@ -0,0 +1,133 @@
+/*
+ * Copyright 2017-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.igmpproxy;
+
+import static org.junit.Assert.assertEquals;
+import static org.onlab.junit.TestTools.assertAfter;
+
+import java.util.List;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.onlab.junit.TestUtils;
+import org.onlab.packet.Ethernet;
+import org.onosproject.core.CoreServiceAdapter;
+import org.onosproject.net.flow.FlowRuleServiceAdapter;
+import org.onosproject.net.flowobjective.FlowObjectiveServiceAdapter;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Set of tests of the ONOS application component for IGMP Statistics.
+ */
+public class IgmpStatisticsTest extends IgmpManagerBase {
+
+ private static final int WAIT_TIMEOUT = 500;
+
+ private IgmpManager igmpManager;
+
+ private IgmpStatisticsManager igmpStatisticsManager;
+
+ private MockIgmpStatisticsEventListener mockListener = new MockIgmpStatisticsEventListener();
+
+ // Set up the IGMP application.
+ @Before
+ public void setUp() {
+ igmpManager = new IgmpManager();
+ igmpManager.coreService = new CoreServiceAdapter();
+ igmpManager.mastershipService = new MockMastershipService();
+ igmpManager.flowObjectiveService = new FlowObjectiveServiceAdapter();
+ igmpManager.deviceService = new MockDeviceService();
+ igmpManager.packetService = new MockPacketService();
+ igmpManager.flowRuleService = new FlowRuleServiceAdapter();
+ igmpManager.multicastService = new TestMulticastRouteService();
+ igmpManager.sadisService = new MockSadisService();
+ igmpStatisticsManager = new IgmpStatisticsManager();
+ igmpStatisticsManager.cfgService = new MockCfgService();
+ igmpStatisticsManager.addListener(mockListener);
+ TestUtils.setField(igmpStatisticsManager, "eventDispatcher", new TestEventDispatcher());
+ igmpStatisticsManager.activate(new MockComponentContext());
+ igmpManager.igmpStatisticsManager = this.igmpStatisticsManager;
+ // By default - we send query messages
+ SingleStateMachine.sendQuery = true;
+ }
+
+ // Tear Down the IGMP application.
+ @After
+ public void tearDown() {
+ igmpStatisticsManager.removeListener(mockListener);
+ igmpStatisticsManager.deactivate();
+ IgmpManager.groupMemberMap.clear();
+ StateMachine.clearMap();
+ }
+
+ //Test Igmp Statistics.
+ @Test
+ public void testIgmpStatistics() throws InterruptedException {
+ igmpManager.networkConfig = new TestNetworkConfigRegistry(false);
+ igmpManager.activate();
+ //IGMPv3 Join
+ Ethernet igmpv3MembershipReportPkt = IgmpSender.getInstance().buildIgmpV3Join(GROUP_IP, SOURCE_IP_OF_A);
+ sendPacket(igmpv3MembershipReportPkt, true);
+ synchronized (savedPackets) {
+ savedPackets.wait(WAIT_TIMEOUT);
+ }
+ //Leave
+ Ethernet igmpv3LeavePkt = IgmpSender.getInstance().buildIgmpV3Leave(GROUP_IP, SOURCE_IP_OF_A);
+ sendPacket(igmpv3LeavePkt, true);
+ synchronized (savedPackets) {
+ savedPackets.wait(WAIT_TIMEOUT);
+ }
+
+ assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
+ assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getTotalMsgReceived().longValue()));
+ assertEquals((long) 1, igmpStatisticsManager.getIgmpStats().getIgmpJoinReq().longValue());
+ assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getIgmpv3MembershipReport().longValue());
+ assertEquals((long) 1, igmpStatisticsManager.getIgmpStats().getIgmpSuccessJoinRejoinReq().longValue());
+
+ assertEquals((long) 1, igmpStatisticsManager.getIgmpStats().getIgmpLeaveReq().longValue());
+ assertEquals((long) 2, igmpStatisticsManager.getIgmpStats().getIgmpMsgReceived().longValue());
+
+ }
+
+ //Test Events
+ @Test
+ public void testIgmpStatisticsEvent() {
+ final int waitEventGeneration = igmpStatisticsManager.statisticsGenerationPeriodInSeconds * 1000;
+ //assert that event listened as the app activates
+ assertAfter(WAIT_TIMEOUT, WAIT_TIMEOUT * 2, () ->
+ assertEquals(mockListener.events.size(), 1));
+
+ assertAfter(waitEventGeneration / 2, waitEventGeneration, () ->
+ assertEquals(mockListener.events.size(), 2));
+
+ for (IgmpStatisticsEvent event : mockListener.events) {
+ assertEquals(event.type(), IgmpStatisticsEvent.Type.STATS_UPDATE);
+ }
+ }
+
+ public class MockIgmpStatisticsEventListener implements IgmpStatisticsEventListener {
+ protected List<IgmpStatisticsEvent> events = Lists.newArrayList();
+
+ @Override
+ public void event(IgmpStatisticsEvent event) {
+ events.add(event);
+ }
+
+ }
+}