SEBA-989-Instance coordination and state distribution mechanism in IgmpStatisticsManager
Change-Id: Ibf3f3a2c5c91c010ef909692eea913f95ee7a92e
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpLeadershipManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpLeadershipManager.java
index 4f2e7bb..c40bca2 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpLeadershipManager.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpLeadershipManager.java
@@ -17,6 +17,7 @@
import org.onosproject.cluster.ClusterService;
import org.onosproject.cluster.LeadershipService;
+import org.onosproject.cluster.Leadership;
import org.onosproject.cluster.NodeId;
import org.onosproject.mastership.MastershipService;
import org.onosproject.net.DeviceId;
@@ -69,4 +70,24 @@
}
return true;
}
+
+ @Override
+ public NodeId getLocalNodeId() {
+ return clusterService.getLocalNode().id();
+ }
+
+ @Override
+ public NodeId getLeader(String topic) {
+ return leadershipService.getLeader(topic);
+ }
+
+ @Override
+ public Leadership runForLeadership(String topic) {
+ return leadershipService.runForLeadership(topic);
+ }
+
+ @Override
+ public void withdraw(String topic) {
+ leadershipService.withdraw(topic);
+ }
}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
index ed3dad8..e32bc4a 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpManager.java
@@ -18,6 +18,7 @@
import com.google.common.collect.Sets;
import org.onosproject.net.Device;
import org.opencord.igmpproxy.IgmpLeadershipService;
+import org.opencord.igmpproxy.IgmpStatisticType;
import org.opencord.igmpproxy.IgmpStatisticsService;
import org.opencord.igmpproxy.impl.store.groupmember.GroupMember;
import org.opencord.igmpproxy.GroupMemberId;
@@ -302,10 +303,10 @@
maxResp = calculateMaxResp(maxResp);
if (gAddr != null && !gAddr.isZero()) {
stateMachineService.specialQuery(deviceId, gAddr, maxResp);
- igmpStatisticsManager.getIgmpStats().increaseIgmpGrpSpecificMembershipQuery();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_GRP_SPECIFIC_MEMBERSHIP_QUERY);
} else {
stateMachineService.generalQuery(deviceId, maxResp);
- igmpStatisticsManager.getIgmpStats().increaseIgmpGeneralMembershipQuery();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_GENERAL_MEMBERSHIP_QUERY);
}
}
@@ -320,15 +321,15 @@
Optional<SubscriberAndDeviceInformation> accessDevice = getSubscriberAndDeviceInformation(device.id());
if (accessDevice.isPresent()) {
stateMachineService.specialQuery(device.id(), gAddr, maxResponseTime);
- igmpStatisticsManager.getIgmpStats().increaseIgmpGrpAndSrcSpecificMembershipQuery();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_GRP_AND_SRC_SPESIFIC_MEMBERSHIP_QUERY);
}
});
- igmpStatisticsManager.getIgmpStats().increaseCurrentGrpNumCounter();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.CURRENT_GRP_NUMBER_COUNTER);
} else {
//Don't know which group is targeted by the query
//So query all the members(in all the OLTs) and proxy their reports
stateMachineService.generalQuery(maxResponseTime);
- igmpStatisticsManager.getIgmpStats().increaseIgmpGeneralMembershipQuery();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_GENERAL_MEMBERSHIP_QUERY);
}
}
@@ -354,7 +355,7 @@
Ip4Address groupIp = igmpGroup.getGaddr().getIp4Address();
if (!groupIp.isMulticast()) {
log.info(groupIp.toString() + " is not a valid group address");
- igmpStatisticsManager.getIgmpStats().increaseFailJoinReqUnknownMulticastIpCounter();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.FAIL_JOIN_REQ_UNKNOWN_MULTICAST_IP_COUNTER);
return;
}
Ip4Address srcIp = getDeviceIp(deviceId);
@@ -365,7 +366,7 @@
ArrayList<Ip4Address> sourceList = new ArrayList<>();
if (!validMembershipModes.contains(recordType)) {
- igmpStatisticsManager.getIgmpStats().increaseReportsRxWithWrongModeCounter();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.REPORTS_RX_WITH_WRONG_MODE_COUNTER);
}
if (igmpGroup.getSources().size() > 0) {
igmpGroup.getSources().forEach(source -> sourceList.add(source.getIp4Address()));
@@ -404,14 +405,15 @@
GroupMember groupMember = groupMemberStore.getGroupMember(groupMemberKey);
if (join) {
- igmpStatisticsManager.getIgmpStats().increaseIgmpJoinReq();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_JOIN_REQ);
if (groupMember == null) {
Optional<ConnectPoint> sourceConfigured = getSource();
if (!sourceConfigured.isPresent()) {
- igmpStatisticsManager.getIgmpStats().increaseIgmpFailJoinReq();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_FAIL_JOIN_REQ);
log.warn("Unable to process IGMP Join from {} since no source " +
"configuration is found.", deviceId);
- igmpStatisticsManager.getIgmpStats().increaseFailJoinReqInsuffPermissionAccessCounter();
+ igmpStatisticsManager
+ .increaseStat(IgmpStatisticType.FAIL_JOIN_REQ_INSUFF_PERMISSION_ACCESS_COUNTER);
return;
}
@@ -432,10 +434,10 @@
boolean isJoined = stateMachineService.join(deviceId, groupIp, srcIp, deviceUplink.get());
if (isJoined) {
- igmpStatisticsManager.getIgmpStats().increaseIgmpSuccessJoinRejoinReq();
- igmpStatisticsManager.getIgmpStats().increaseIgmpChannelJoinCounter();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_SUCCESS_JOIN_RE_JOIN_REQ);
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_CHANNEL_JOIN_COUNTER);
} else {
- igmpStatisticsManager.getIgmpStats().increaseIgmpFailJoinReq();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_FAIL_JOIN_REQ);
}
groupMemberStore.putGroupMember(groupMember);
log.debug("Group member created with id: {}", groupMember.getGroupMemberId());
@@ -449,7 +451,7 @@
//add sink to the route
multicastService.addSinks(route, Sets.newHashSet(cp));
});
- igmpStatisticsManager.getIgmpStats().increaseUnconfiguredGroupCounter();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.UNCONFIGURED_GROUP_COUNTER);
}
groupMember.resetAllTimers();
@@ -458,11 +460,11 @@
//put updated member to the store
groupMemberStore.putGroupMember(groupMember);
} else {
- igmpStatisticsManager.getIgmpStats().increaseIgmpLeaveReq();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_LEAVE_REQ);
if (groupMember == null) {
log.info("receive leave but no instance, group {} device: {} port:{}",
groupIp, deviceId, portNumber);
- igmpStatisticsManager.getIgmpStats().increaseUnconfiguredGroupCounter();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.UNCONFIGURED_GROUP_COUNTER);
return;
} else {
groupMember.setLeave(true);
@@ -478,7 +480,7 @@
}
private void leaveAction(GroupMember groupMember) {
- igmpStatisticsManager.getIgmpStats().increaseIgmpDisconnect();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_DISCONNECT);
ConnectPoint cp = new ConnectPoint(groupMember.getDeviceId(), groupMember.getPortNumber());
stateMachineService.leave(groupMember.getDeviceId(), groupMember.getGroupIp());
groupMember.getSourceList().forEach(source -> multicastService.removeSinks(
@@ -524,7 +526,7 @@
if (ethPkt == null) {
return;
}
- igmpStatisticsManager.getIgmpStats().increaseTotalMsgReceived();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.TOTAL_MSG_RECEIVED);
if (ethPkt.getEtherType() != Ethernet.TYPE_IPV4) {
return;
@@ -536,14 +538,15 @@
return;
}
- igmpStatisticsManager.getIgmpStats().increaseIgmpValidChecksumCounter();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_VALID_CHECKSUM_COUNTER);
short vlan = ethPkt.getVlanID();
DeviceId deviceId = pkt.receivedFrom().deviceId();
if (!isConnectPoint(deviceId, pkt.receivedFrom().port()) &&
!getSubscriberAndDeviceInformation(deviceId).isPresent()) {
log.error("Device not registered in netcfg : {}", deviceId);
- igmpStatisticsManager.getIgmpStats().increaseFailJoinReqInsuffPermissionAccessCounter();
+ igmpStatisticsManager
+ .increaseStat(IgmpStatisticType.FAIL_JOIN_REQ_INSUFF_PERMISSION_ACCESS_COUNTER);
return;
}
@@ -553,7 +556,7 @@
PortNumber upLinkPort = deviceUpLinkOpt.isPresent() ? deviceUpLinkOpt.get() : null;
switch (igmp.getIgmpType()) {
case IGMP.TYPE_IGMPV3_MEMBERSHIP_QUERY:
- igmpStatisticsManager.getIgmpStats().increaseIgmpv3MembershipQuery();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_V3_MEMBERSHIP_QUERY);
//Discard Query from OLT’s non-uplink port’s
if (!pkt.receivedFrom().port().equals(upLinkPort)) {
if (isConnectPoint(deviceId, pkt.receivedFrom().port())) {
@@ -574,26 +577,26 @@
0xff & igmp.getMaxRespField());
break;
case IGMP.TYPE_IGMPV1_MEMBERSHIP_REPORT:
- igmpStatisticsManager.getIgmpStats().increaseIgmpv1MembershipReport();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_V1_MEMBERSHIP_REPORT);
log.debug("IGMP version 1 message types are not currently supported.");
break;
case IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT:
- igmpStatisticsManager.getIgmpStats().increaseIgmpv3MembershipReport();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_V3_MEMBERSHIP_REPORT);
processIgmpMessage(pkt, igmp, upLinkPort, vlan);
break;
case IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT:
- igmpStatisticsManager.getIgmpStats().increaseIgmpv2MembershipReport();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_V2_MEMBERSHIP_REPORT);
processIgmpMessage(pkt, igmp, upLinkPort, vlan);
break;
case IGMP.TYPE_IGMPV2_LEAVE_GROUP:
- igmpStatisticsManager.getIgmpStats().increaseIgmpv2LeaveGroup();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.IGMP_V2_LEAVE_GROUP);
processIgmpMessage(pkt, igmp, upLinkPort, vlan);
break;
default:
log.warn("Unknown IGMP message type: {}", igmp.getIgmpType());
- igmpStatisticsManager.getIgmpStats().increaseInvalidIgmpMsgReceived();
- igmpStatisticsManager.getIgmpStats().increaseUnknownIgmpTypePacketsRxCounter();
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.INVALID_IGMP_MSG_RECEIVED);
+ igmpStatisticsManager.increaseStat(IgmpStatisticType.UNKNOWN_IGMP_TYPE_PACKETS_RX_COUNTER);
break;
}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpSender.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpSender.java
index e6584ad..9bda1c9 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpSender.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpSender.java
@@ -33,6 +33,7 @@
import org.onosproject.net.packet.OutboundPacket;
import org.onosproject.net.packet.PacketService;
import org.opencord.igmpproxy.IgmpLeadershipService;
+import org.opencord.igmpproxy.IgmpStatisticType;
import org.opencord.igmpproxy.IgmpStatisticsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -213,7 +214,7 @@
break;
default:
log.debug("Unknown igmp type: {} ", type);
- igmpStatisticsService.getIgmpStats().increaseUnknownIgmpTypePacketsRxCounter();
+ igmpStatisticsService.increaseStat(IgmpStatisticType.UNKNOWN_IGMP_TYPE_PACKETS_RX_COUNTER);
return null;
}
@@ -280,16 +281,16 @@
// This counter will be useful in future if we change the procedure to generate the packets.
if ((igmp.getIgmpType() == IGMP.TYPE_IGMPV2_MEMBERSHIP_REPORT
|| igmp.getIgmpType() == IGMP.TYPE_IGMPV2_LEAVE_GROUP) && igmp.serialize().length < IGMPv2.HEADER_LENGTH) {
- igmpStatisticsService.getIgmpStats().increaseInvalidIgmpLength();
+ igmpStatisticsService.increaseStat(IgmpStatisticType.INVALID_IGMP_LENGTH);
} else if (igmp.getIgmpType() == IGMP.TYPE_IGMPV3_MEMBERSHIP_REPORT
&& igmp.serialize().length < IGMPv3.MINIMUM_HEADER_LEN) {
- igmpStatisticsService.getIgmpStats().increaseInvalidIgmpLength();
+ igmpStatisticsService.increaseStat(IgmpStatisticType.INVALID_IGMP_LENGTH);
}
TrafficTreatment treatment = DefaultTrafficTreatment.builder()
.setOutput(portNumber).build();
OutboundPacket packet = new DefaultOutboundPacket(deviceId,
treatment, ByteBuffer.wrap(ethPkt.serialize()));
- igmpStatisticsService.getIgmpStats().increaseValidIgmpPacketCounter();
+ igmpStatisticsService.increaseStat(IgmpStatisticType.VALID_IGMP_PACKET_COUNTER);
packetService.emit(packet);
}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpStatisticsManager.java b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpStatisticsManager.java
index 21d0011..69ba6b6 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/IgmpStatisticsManager.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/IgmpStatisticsManager.java
@@ -16,6 +16,22 @@
package org.opencord.igmpproxy.impl;
+import org.onlab.util.KryoNamespace;
+import org.onosproject.cluster.NodeId;
+import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
+import org.onosproject.store.cluster.messaging.ClusterMessage;
+import org.onosproject.store.cluster.messaging.MessageSubject;
+import org.onosproject.store.serializers.KryoNamespaces;
+import org.onosproject.store.service.EventuallyConsistentMap;
+import org.onosproject.store.service.Serializer;
+import org.onosproject.store.service.StorageService;
+import org.onosproject.store.service.WallClockTimestamp;
+import org.opencord.igmpproxy.IgmpStatisticType;
+import org.opencord.igmpproxy.IgmpStatisticsEvent;
+import org.opencord.igmpproxy.IgmpStatisticsEventListener;
+import org.opencord.igmpproxy.IgmpStatisticsService;
+import org.opencord.igmpproxy.IgmpLeadershipService;
+import org.opencord.igmpproxy.IgmpStatistics;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
@@ -28,61 +44,95 @@
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
-import org.opencord.igmpproxy.IgmpStatistics;
-import org.opencord.igmpproxy.IgmpStatisticsService;
-import org.opencord.igmpproxy.IgmpStatisticsEvent;
-import org.opencord.igmpproxy.IgmpStatisticsEventListener;
-
import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD;
+import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_SYNC_PERIOD;
import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD_DEFAULT;
+import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_SYNC_PERIOD_DEFAULT;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Dictionary;
+import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import com.google.common.base.Strings;
-
/**
- *
* Process the stats collected in Igmp proxy application. Publish to kafka onos.
- *
*/
@Component(immediate = true, property = {
STATISTICS_GENERATION_PERIOD + ":Integer=" + STATISTICS_GENERATION_PERIOD_DEFAULT,
+ STATISTICS_SYNC_PERIOD + ":Integer=" + STATISTICS_SYNC_PERIOD_DEFAULT,
})
public class IgmpStatisticsManager extends
- AbstractListenerManager<IgmpStatisticsEvent, IgmpStatisticsEventListener>
- implements IgmpStatisticsService {
+ AbstractListenerManager<IgmpStatisticsEvent, IgmpStatisticsEventListener>
+ implements IgmpStatisticsService {
+ private static final String IGMP_STATISTICS = "igmp-statistics";
+ private static final String IGMP_STATISTICS_LEADERSHIP = "igmp-statistics";
+
private final Logger log = getLogger(getClass());
private IgmpStatistics igmpStats;
- ScheduledExecutorService executorForIgmp;
+ private ScheduledExecutorService executorForIgmp;
private ScheduledFuture<?> publisherTask;
+ private ScheduledFuture<?> syncTask;
protected int statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
+ protected int statisticsSyncPeriodInSeconds = STATISTICS_SYNC_PERIOD_DEFAULT;
+
+ private EventuallyConsistentMap<NodeId, IgmpStatistics> statistics;
+
+ private static final MessageSubject RESET_SUBJECT = new MessageSubject("igmp-statistics-reset");
+
+ private KryoNamespace statSerializer = KryoNamespace.newBuilder()
+ .register(KryoNamespaces.API)
+ .register(IgmpStatistics.class)
+ .build();
+
+ //Statistics values are valid or invalid
+ private AtomicBoolean validityCheck = new AtomicBoolean(false);
@Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ComponentConfigService cfgService;
- @Override
- public IgmpStatistics getIgmpStats() {
- return igmpStats;
- }
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected StorageService storageService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected IgmpLeadershipService leadershipManager;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
+ protected ClusterCommunicationService clusterCommunicationService;
@Activate
public void activate(ComponentContext context) {
- igmpStats = new IgmpStatistics();
+ igmpStats = getIgmpStatsInstance();
+
+
+ statistics = storageService.<NodeId, IgmpStatistics>eventuallyConsistentMapBuilder()
+ .withName(IGMP_STATISTICS)
+ .withSerializer(statSerializer)
+ .withTimestampProvider((k, v) -> new WallClockTimestamp())
+ .build();
+
+ initStats(statistics.get(leadershipManager.getLocalNodeId()));
+ syncStats();
+
+ leadershipManager.runForLeadership(IGMP_STATISTICS_LEADERSHIP);
eventDispatcher.addSink(IgmpStatisticsEvent.class, listenerRegistry);
executorForIgmp = Executors.newScheduledThreadPool(1);
cfgService.registerProperties(getClass());
+
+ clusterCommunicationService.addSubscriber(RESET_SUBJECT, Serializer.using(statSerializer)::decode,
+ this::resetLocal, executorForIgmp);
+
modified(context);
log.info("IgmpStatisticsManager Activated");
}
@@ -90,69 +140,175 @@
@Modified
public void modified(ComponentContext context) {
Dictionary<String, Object> properties = context.getProperties();
-
try {
String s = Tools.get(properties, STATISTICS_GENERATION_PERIOD);
statisticsGenerationPeriodInSeconds = Strings.isNullOrEmpty(s) ?
- Integer.parseInt(STATISTICS_GENERATION_PERIOD)
+ Integer.parseInt(STATISTICS_GENERATION_PERIOD)
: Integer.parseInt(s.trim());
+ log.debug("statisticsGenerationPeriodInSeconds: {}", statisticsGenerationPeriodInSeconds);
+ statisticsSyncPeriodInSeconds = Strings.isNullOrEmpty(s) ?
+ Integer.parseInt(STATISTICS_SYNC_PERIOD)
+ : Integer.parseInt(s.trim());
+ log.debug("statisticsSyncPeriodInSeconds: {}", statisticsSyncPeriodInSeconds);
} catch (NumberFormatException ne) {
- log.error("Unable to parse configuration parameter for eventGenerationPeriodInSeconds", ne);
+ log.error("Unable to parse configuration parameter", ne);
statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
+ statisticsSyncPeriodInSeconds = STATISTICS_SYNC_PERIOD_DEFAULT;
}
- if (publisherTask != null) {
- publisherTask.cancel(true);
- }
- publisherTask = executorForIgmp.scheduleAtFixedRate(SafeRecurringTask.wrap(this::publishStats),
- 0, statisticsGenerationPeriodInSeconds, TimeUnit.SECONDS);
+ stopPublishTask();
+ stopSyncTask();
+
+ startPublishTask();
+ startSyncTask();
}
@Deactivate
public void deactivate() {
eventDispatcher.removeSink(IgmpStatisticsEvent.class);
- publisherTask.cancel(true);
+ stopPublishTask();
+ stopSyncTask();
executorForIgmp.shutdown();
cfgService.unregisterProperties(getClass(), false);
igmpStats = null;
+ clusterCommunicationService.removeSubscriber(RESET_SUBJECT);
+ leadershipManager.withdraw(IGMP_STATISTICS_LEADERSHIP);
log.info("IgmpStatisticsManager Deactivated");
}
+ private IgmpStatistics getIgmpStatsInstance() {
+ if (igmpStats == null) {
+ igmpStats = new IgmpStatistics();
+ log.info("Instance of igmp-statistics created.");
+ }
+ return igmpStats;
+ }
+
+ private void syncStats() {
+ if (!validityCheck.get()) {
+ //sync with valid values
+ statistics.put(leadershipManager.getLocalNodeId(), snapshot());
+ validityCheck.set(true);
+ log.debug("Valid statistic values are put.");
+ }
+ }
+
+ private void initStats(IgmpStatistics init) {
+ if (init == null) {
+ log.warn("Igmp statistics was not created.");
+ return;
+ }
+ igmpStats.setStats(init);
+ }
+
+ private IgmpStatistics snapshot() {
+ return getIgmpStatsInstance();
+ }
+
+ private void startSyncTask() {
+ syncTask = startTask(this::syncStats, statisticsSyncPeriodInSeconds);
+ log.debug("Sync task started. period in seconds: {}", statisticsSyncPeriodInSeconds);
+ }
+
+ private void stopSyncTask() {
+ stopTask(syncTask);
+ log.debug("Sync task stopped.");
+ }
+
+ private void startPublishTask() {
+ publisherTask = startTask(this::publishStats, statisticsGenerationPeriodInSeconds);
+ log.debug("Publisher task started. period in seconds: {}", statisticsGenerationPeriodInSeconds);
+ }
+
+ private void stopPublishTask() {
+ stopTask(publisherTask);
+ log.debug("Publisher task stopped.");
+ }
+
+ private ScheduledFuture<?> startTask(Runnable r, int rate) {
+ return executorForIgmp.scheduleAtFixedRate(SafeRecurringTask.wrap(r),
+ 0, rate, TimeUnit.SECONDS);
+ }
+
+ private void stopTask(ScheduledFuture<?> task) {
+ if (task != null) {
+ task.cancel(true);
+ }
+ }
+
+ private void resetLocal(ClusterMessage message) {
+ //reset all-statistics
+ igmpStats.resetAll();
+ validityCheck.set(false);
+ }
+
/**
* Publishes stats.
*/
private void publishStats() {
+ // Only publish events if we are the cluster leader for Igmp-stats
+ if (!Objects.equals(leadershipManager.getLeader(IGMP_STATISTICS_LEADERSHIP),
+ leadershipManager.getLocalNodeId())) {
+ log.debug("This is not leader of : {}", IGMP_STATISTICS_LEADERSHIP);
+ return;
+ }
if (log.isDebugEnabled()) {
log.debug("Notifying stats: {}", igmpStats);
- log.debug("--IgmpDisconnect--" + igmpStats.getIgmpDisconnect());
- log.debug("--IgmpFailJoinReq--" + igmpStats.getIgmpFailJoinReq());
- log.debug("--IgmpJoinReq--" + igmpStats.getIgmpJoinReq());
- log.debug("--IgmpLeaveReq--" + igmpStats.getIgmpLeaveReq());
- log.debug("--IgmpMsgReceived--" + igmpStats.getIgmpMsgReceived());
- log.debug("--IgmpSuccessJoinRejoinReq--" + igmpStats.getIgmpSuccessJoinRejoinReq());
- log.debug("--Igmpv1MemershipReport--" + igmpStats.getIgmpv1MemershipReport());
- log.debug("--Igmpv2LeaveGroup--" + igmpStats.getIgmpv2LeaveGroup());
- log.debug("--Igmpv2MembershipReport--" + igmpStats.getIgmpv2MembershipReport());
- log.debug("--Igmpv3MembershipQuery--" + igmpStats.getIgmpv3MembershipQuery());
- log.debug("--Igmpv3MembershipReport--" + igmpStats.getIgmpv3MembershipReport());
- log.debug("--InvalidIgmpMsgReceived--" + igmpStats.getInvalidIgmpMsgReceived());
- log.debug("--TotalMsgReceived-- " + igmpStats.getTotalMsgReceived());
- log.debug("--UnknownIgmpTypePacketsRx--" + igmpStats.getUnknownIgmpTypePacketsRxCounter());
- log.debug("--ReportsRxWithWrongMode--" + igmpStats.getReportsRxWithWrongModeCounter());
- log.debug("--FailJoinReqInsuffPermission--" + igmpStats.getFailJoinReqInsuffPermissionAccessCounter());
- log.debug("--FailJoinReqUnknownMulticastIp--" + igmpStats.getFailJoinReqUnknownMulticastIpCounter());
- log.debug("--UnconfiguredGroupCounter--" + igmpStats.getUnconfiguredGroupCounter());
- log.debug("--ValidIgmpPacketCounter--" + igmpStats.getValidIgmpPacketCounter());
- log.debug("--IgmpChannelJoinCounter--" + igmpStats.getIgmpChannelJoinCounter());
- log.debug("--CurrentGrpNumCounter--" + igmpStats.getCurrentGrpNumCounter());
- log.debug("--IgmpValidChecksumCounter--" + igmpStats.getIgmpValidChecksumCounter());
- log.debug("--InvalidIgmpLength--" + igmpStats.getInvalidIgmpLength());
- log.debug("--IgmpGeneralMembershipQuery--" + igmpStats.getIgmpGeneralMembershipQuery());
- log.debug("--IgmpGrpSpecificMembershipQuery--" + igmpStats.getIgmpGrpSpecificMembershipQuery());
- log.debug("--IgmpGrpAndSrcSpecificMembershipQuery--" + igmpStats.getIgmpGrpAndSrcSpecificMembershipQuery());
+ log.debug("--IgmpDisconnect--" + igmpStats.getStat(IgmpStatisticType.IGMP_DISCONNECT));
+ log.debug("--IgmpFailJoinReq--" + igmpStats.getStat(IgmpStatisticType.IGMP_FAIL_JOIN_REQ));
+ log.debug("--IgmpJoinReq--" + igmpStats.getStat(IgmpStatisticType.IGMP_JOIN_REQ));
+ log.debug("--IgmpLeaveReq--" + igmpStats.getStat(IgmpStatisticType.IGMP_LEAVE_REQ));
+ log.debug("--IgmpMsgReceived--" + igmpStats.getStat(IgmpStatisticType.IGMP_MSG_RECEIVED));
+ log.debug("--IgmpSuccessJoinRejoinReq--" +
+ igmpStats.getStat(IgmpStatisticType.IGMP_SUCCESS_JOIN_RE_JOIN_REQ));
+ log.debug("--Igmpv1MemershipReport--" + igmpStats.getStat(IgmpStatisticType.IGMP_V1_MEMBERSHIP_REPORT));
+ log.debug("--Igmpv2LeaveGroup--" + igmpStats.getStat(IgmpStatisticType.IGMP_V2_LEAVE_GROUP));
+ log.debug("--Igmpv2MembershipReport--" + igmpStats.getStat(IgmpStatisticType.IGMP_V2_MEMBERSHIP_REPORT));
+ log.debug("--Igmpv3MembershipQuery--" + igmpStats.getStat(IgmpStatisticType.IGMP_V3_MEMBERSHIP_QUERY));
+ log.debug("--Igmpv3MembershipReport--" + igmpStats.getStat(IgmpStatisticType.IGMP_V3_MEMBERSHIP_REPORT));
+ log.debug("--InvalidIgmpMsgReceived--" + igmpStats.getStat(IgmpStatisticType.INVALID_IGMP_MSG_RECEIVED));
+ log.debug("--TotalMsgReceived-- " + igmpStats.getStat(IgmpStatisticType.TOTAL_MSG_RECEIVED));
+ log.debug("--UnknownIgmpTypePacketsRx--" +
+ igmpStats.getStat(IgmpStatisticType.UNKNOWN_IGMP_TYPE_PACKETS_RX_COUNTER));
+ log.debug("--ReportsRxWithWrongMode--" +
+ igmpStats.getStat(IgmpStatisticType.REPORTS_RX_WITH_WRONG_MODE_COUNTER));
+ log.debug("--FailJoinReqInsuffPermission--" +
+ igmpStats.getStat(IgmpStatisticType.FAIL_JOIN_REQ_INSUFF_PERMISSION_ACCESS_COUNTER));
+ log.debug("--FailJoinReqUnknownMulticastIp--" +
+ igmpStats.getStat(IgmpStatisticType.FAIL_JOIN_REQ_UNKNOWN_MULTICAST_IP_COUNTER));
+ log.debug("--UnconfiguredGroupCounter--" + igmpStats.getStat(IgmpStatisticType.UNCONFIGURED_GROUP_COUNTER));
+ log.debug("--ValidIgmpPacketCounter--" + igmpStats.getStat(IgmpStatisticType.VALID_IGMP_PACKET_COUNTER));
+ log.debug("--IgmpChannelJoinCounter--" + igmpStats.getStat(IgmpStatisticType.IGMP_CHANNEL_JOIN_COUNTER));
+ log.debug("--CurrentGrpNumCounter--" + igmpStats.getStat(IgmpStatisticType.CURRENT_GRP_NUMBER_COUNTER));
+ log.debug("--IgmpValidChecksumCounter--" +
+ igmpStats.getStat(IgmpStatisticType.IGMP_VALID_CHECKSUM_COUNTER));
+ log.debug("--InvalidIgmpLength--" + igmpStats.getStat(IgmpStatisticType.INVALID_IGMP_LENGTH));
+ log.debug("--IgmpGeneralMembershipQuery--" +
+ igmpStats.getStat(IgmpStatisticType.IGMP_GENERAL_MEMBERSHIP_QUERY));
+ log.debug("--IgmpGrpSpecificMembershipQuery--" +
+ igmpStats.getStat(IgmpStatisticType.IGMP_GRP_SPECIFIC_MEMBERSHIP_QUERY));
+ log.debug("--IgmpGrpAndSrcSpecificMembershipQuery--" +
+ igmpStats.getStat(IgmpStatisticType.IGMP_GRP_AND_SRC_SPESIFIC_MEMBERSHIP_QUERY));
}
post(new IgmpStatisticsEvent(IgmpStatisticsEvent.Type.STATS_UPDATE, igmpStats));
}
+ @Override
+ public void increaseStat(IgmpStatisticType type) {
+ igmpStats.increaseStat(type);
+ validityCheck.set(false);
+ }
+
+ @Override
+ public void resetAllStats() {
+ ClusterMessage reset = new ClusterMessage(leadershipManager.getLocalNodeId(), RESET_SUBJECT, new byte[]{});
+ clusterCommunicationService.broadcastIncludeSelf(reset, RESET_SUBJECT,
+ Serializer.using(statSerializer)::encode);
+ }
+
+ @Override
+ public Long getStat(IgmpStatisticType type) {
+ return igmpStats.getStat(type);
+ }
}
diff --git a/app/src/main/java/org/opencord/igmpproxy/impl/OsgiPropertyConstants.java b/app/src/main/java/org/opencord/igmpproxy/impl/OsgiPropertyConstants.java
index c99a3b7..26bc419 100644
--- a/app/src/main/java/org/opencord/igmpproxy/impl/OsgiPropertyConstants.java
+++ b/app/src/main/java/org/opencord/igmpproxy/impl/OsgiPropertyConstants.java
@@ -26,4 +26,7 @@
public static final String STATISTICS_GENERATION_PERIOD = "statisticsGenerationPeriodInSeconds";
public static final int STATISTICS_GENERATION_PERIOD_DEFAULT = 20;
+
+ public static final String STATISTICS_SYNC_PERIOD = "statisticsSyncPeriodInSeconds";
+ public static final int STATISTICS_SYNC_PERIOD_DEFAULT = 10;
}