blob: ef932ef96fd625325a9b10fd95f6aa9657d240d4 [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.opencord.igmpproxy.impl;
import org.onlab.util.KryoNamespace;
import org.onosproject.cluster.NodeId;
import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
import org.onosproject.store.cluster.messaging.ClusterMessage;
import org.onosproject.store.cluster.messaging.MessageSubject;
import org.onosproject.store.serializers.KryoNamespaces;
import org.onosproject.store.service.EventuallyConsistentMap;
import org.onosproject.store.service.Serializer;
import org.onosproject.store.service.StorageService;
import org.onosproject.store.service.WallClockTimestamp;
import org.opencord.igmpproxy.IgmpStatisticType;
import org.opencord.igmpproxy.IgmpStatisticsEvent;
import org.opencord.igmpproxy.IgmpStatisticsEventListener;
import org.opencord.igmpproxy.IgmpStatisticsService;
import org.opencord.igmpproxy.IgmpLeadershipService;
import org.opencord.igmpproxy.IgmpStatistics;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.onlab.util.SafeRecurringTask;
import org.onlab.util.Tools;
import org.onosproject.cfg.ComponentConfigService;
import org.onosproject.event.AbstractListenerManager;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Modified;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD;
import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_SYNC_PERIOD;
import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD_DEFAULT;
import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_SYNC_PERIOD_DEFAULT;
import static org.slf4j.LoggerFactory.getLogger;
import java.util.Dictionary;
import java.util.Objects;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import com.google.common.base.Strings;
/**
 * Keeps the statistics collected by the IGMP proxy application and periodically
 * posts them as {@link IgmpStatisticsEvent}s.
 *
 * <p>Two recurring tasks run on a single-threaded scheduler:
 * <ul>
 *   <li>a <em>publish</em> task that posts a {@code STATS_UPDATE} event, but only
 *       on the node currently leading the {@code igmp-statistics} topic;</li>
 *   <li>a <em>sync</em> task that writes the local statistics into an eventually
 *       consistent map keyed by node id whenever they changed since the last write.</li>
 * </ul>
 * Both periods are component properties and are re-read in {@link #modified(ComponentContext)}.
 * Consumers of the posted events (the original header mentions publication to Kafka)
 * live outside this class.
 */
@Component(immediate = true, property = {
        STATISTICS_GENERATION_PERIOD + ":Integer=" + STATISTICS_GENERATION_PERIOD_DEFAULT,
        STATISTICS_SYNC_PERIOD + ":Integer=" + STATISTICS_SYNC_PERIOD_DEFAULT,
})
public class IgmpStatisticsManager extends
        AbstractListenerManager<IgmpStatisticsEvent, IgmpStatisticsEventListener>
        implements IgmpStatisticsService {

    /** Name of the eventually consistent map holding per-node statistics. */
    private static final String IGMP_STATISTICS = "igmp-statistics";
    /** Leadership topic that decides which node publishes statistics events. */
    private static final String IGMP_STATISTICS_LEADERSHIP = "igmp-statistics";

    private final Logger log = getLogger(getClass());

    /** Local, in-memory statistics; created lazily and cleared on deactivation. */
    private IgmpStatistics igmpStats;

    private ScheduledExecutorService executorForIgmp;
    private ScheduledFuture<?> publisherTask;
    private ScheduledFuture<?> syncTask;

    protected int statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
    protected int statisticsSyncPeriodInSeconds = STATISTICS_SYNC_PERIOD_DEFAULT;

    /** Distributed copy of the statistics, one entry per cluster node. */
    private EventuallyConsistentMap<NodeId, IgmpStatistics> statistics;

    /** Subject of the cluster message asking every node to reset its statistics. */
    private static final MessageSubject RESET_SUBJECT = new MessageSubject("igmp-statistics-reset");

    private final KryoNamespace statSerializer = KryoNamespace.newBuilder()
            .register(KryoNamespaces.API)
            .register(IgmpStatistics.class)
            .register(ClusterMessage.class)
            .register(MessageSubject.class)
            .build();

    /**
     * {@code true} while the distributed map already holds the current local values;
     * flipped to {@code false} by every increment or reset so the next sync writes again.
     */
    private final AtomicBoolean validityCheck = new AtomicBoolean(false);

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected ComponentConfigService cfgService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected StorageService storageService;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected IgmpLeadershipService leadershipManager;

    @Reference(cardinality = ReferenceCardinality.MANDATORY)
    protected ClusterCommunicationService clusterCommunicationService;

    /**
     * Builds the distributed map, restores this node's previously stored statistics,
     * joins the leadership election, registers the reset subscriber and starts the
     * recurring tasks via {@link #modified(ComponentContext)}.
     *
     * @param context component context carrying the configured properties
     */
    @Activate
    public void activate(ComponentContext context) {
        igmpStats = getIgmpStatsInstance();

        statistics = storageService.<NodeId, IgmpStatistics>eventuallyConsistentMapBuilder()
                .withName(IGMP_STATISTICS)
                .withSerializer(statSerializer)
                .withTimestampProvider((k, v) -> new WallClockTimestamp())
                .build();

        // Load what this node stored earlier (if anything), then write the values back.
        initStats(statistics.get(leadershipManager.getLocalNodeId()));
        syncStats();

        leadershipManager.runForLeadership(IGMP_STATISTICS_LEADERSHIP);
        eventDispatcher.addSink(IgmpStatisticsEvent.class, listenerRegistry);

        executorForIgmp = Executors.newScheduledThreadPool(1);
        cfgService.registerProperties(getClass());
        clusterCommunicationService.addSubscriber(RESET_SUBJECT, Serializer.using(statSerializer)::decode,
                this::resetLocal, executorForIgmp);

        modified(context);
        log.info("IgmpStatisticsManager Activated");
    }

    /**
     * Re-reads both period properties and restarts the publish and sync tasks with them.
     *
     * <p>Each property is read on its own; a missing, unparsable or non-positive value
     * falls back to that property's default.
     *
     * @param context component context carrying the configured properties
     */
    @Modified
    public void modified(ComponentContext context) {
        Dictionary<String, Object> properties = context.getProperties();

        statisticsGenerationPeriodInSeconds = readPeriod(properties,
                STATISTICS_GENERATION_PERIOD, STATISTICS_GENERATION_PERIOD_DEFAULT);
        log.debug("statisticsGenerationPeriodInSeconds: {}", statisticsGenerationPeriodInSeconds);

        statisticsSyncPeriodInSeconds = readPeriod(properties,
                STATISTICS_SYNC_PERIOD, STATISTICS_SYNC_PERIOD_DEFAULT);
        log.debug("statisticsSyncPeriodInSeconds: {}", statisticsSyncPeriodInSeconds);

        stopPublishTask();
        stopSyncTask();
        startPublishTask();
        startSyncTask();
    }

    /**
     * Reads one period property, in seconds.
     *
     * @param properties   component properties
     * @param propertyName name of the property to read
     * @param defaultValue value used when the property is absent, unparsable or not positive
     * @return the configured period, or {@code defaultValue}
     */
    private int readPeriod(Dictionary<String, Object> properties, String propertyName, int defaultValue) {
        String raw = Tools.get(properties, propertyName);
        if (Strings.isNullOrEmpty(raw)) {
            return defaultValue;
        }
        try {
            int period = Integer.parseInt(raw.trim());
            if (period > 0) {
                return period;
            }
            // scheduleAtFixedRate rejects periods <= 0, so never hand one to the scheduler.
            log.error("Configuration parameter {} must be positive but was {}; using default {}",
                    propertyName, period, defaultValue);
        } catch (NumberFormatException ne) {
            log.error("Unable to parse configuration parameter {}", propertyName, ne);
        }
        return defaultValue;
    }

    /**
     * Unregisters from the event dispatcher, cluster messaging, configuration and
     * leadership services, and stops the recurring tasks and their executor.
     */
    @Deactivate
    public void deactivate() {
        eventDispatcher.removeSink(IgmpStatisticsEvent.class);
        // Detach the reset handler before shutting down the executor it is dispatched on.
        clusterCommunicationService.removeSubscriber(RESET_SUBJECT);
        stopPublishTask();
        stopSyncTask();
        executorForIgmp.shutdown();
        cfgService.unregisterProperties(getClass(), false);
        leadershipManager.withdraw(IGMP_STATISTICS_LEADERSHIP);
        igmpStats = null;
        log.info("IgmpStatisticsManager Deactivated");
    }

    /**
     * Returns the local statistics object, creating it on first use.
     *
     * @return the local statistics instance
     */
    private IgmpStatistics getIgmpStatsInstance() {
        if (igmpStats == null) {
            igmpStats = new IgmpStatistics();
            log.info("Instance of igmp-statistics created.");
        }
        return igmpStats;
    }

    /**
     * Writes the local statistics into the distributed map under this node's id,
     * but only if they changed since the previous write.
     */
    private void syncStats() {
        if (!validityCheck.get()) {
            //sync with valid values
            statistics.put(leadershipManager.getLocalNodeId(), snapshot());
            validityCheck.set(true);
            log.debug("Valid statistic values are put.");
        }
    }

    /**
     * Seeds the local statistics from a previously stored value.
     *
     * @param init stored statistics for this node, or {@code null} if none exist
     */
    private void initStats(IgmpStatistics init) {
        if (init == null) {
            log.warn("Igmp statistics was not created.");
            return;
        }
        igmpStats.setStats(init);
    }

    /**
     * Returns the object that is stored in the distributed map.
     * NOTE(review): this is the live local instance, not a copy.
     *
     * @return the local statistics instance
     */
    private IgmpStatistics snapshot() {
        return getIgmpStatsInstance();
    }

    /** Schedules the sync task with the current sync period. */
    private void startSyncTask() {
        syncTask = startTask(this::syncStats, statisticsSyncPeriodInSeconds);
        log.debug("Sync task started. period in seconds: {}", statisticsSyncPeriodInSeconds);
    }

    /** Cancels the sync task if one is scheduled. */
    private void stopSyncTask() {
        stopTask(syncTask);
        log.debug("Sync task stopped.");
    }

    /** Schedules the publish task with the current generation period. */
    private void startPublishTask() {
        publisherTask = startTask(this::publishStats, statisticsGenerationPeriodInSeconds);
        log.debug("Publisher task started. period in seconds: {}", statisticsGenerationPeriodInSeconds);
    }

    /** Cancels the publish task if one is scheduled. */
    private void stopPublishTask() {
        stopTask(publisherTask);
        log.debug("Publisher task stopped.");
    }

    /**
     * Schedules a recurring task, starting immediately.
     *
     * @param r    work to run, wrapped by {@link SafeRecurringTask}
     * @param rate period between runs, in seconds; must be positive
     * @return handle used to cancel the task
     */
    private ScheduledFuture<?> startTask(Runnable r, int rate) {
        return executorForIgmp.scheduleAtFixedRate(SafeRecurringTask.wrap(r),
                0, rate, TimeUnit.SECONDS);
    }

    /**
     * Cancels a scheduled task, interrupting it if it is running.
     *
     * @param task task to cancel; {@code null} is ignored
     */
    private void stopTask(ScheduledFuture<?> task) {
        if (task != null) {
            task.cancel(true);
        }
    }

    /**
     * Handles a reset message: clears every local counter and marks the
     * distributed copy as stale so the next sync overwrites it.
     *
     * @param message the received reset message (content is not inspected)
     */
    private void resetLocal(ClusterMessage message) {
        //reset all-statistics
        igmpStats.resetAll();
        validityCheck.set(false);
    }

    /**
     * Publishes stats.
     * Does nothing unless this node leads the statistics leadership topic.
     */
    private void publishStats() {
        // Only publish events if we are the cluster leader for Igmp-stats
        if (!Objects.equals(leadershipManager.getLeader(IGMP_STATISTICS_LEADERSHIP),
                leadershipManager.getLocalNodeId())) {
            log.debug("This is not leader of : {}", IGMP_STATISTICS_LEADERSHIP);
            return;
        }

        if (log.isDebugEnabled()) {
            log.debug("Notifying stats: {}", igmpStats);
            log.debug("--IgmpDisconnect--{}", igmpStats.getStat(IgmpStatisticType.IGMP_DISCONNECT));
            log.debug("--IgmpFailJoinReq--{}", igmpStats.getStat(IgmpStatisticType.IGMP_FAIL_JOIN_REQ));
            log.debug("--IgmpJoinReq--{}", igmpStats.getStat(IgmpStatisticType.IGMP_JOIN_REQ));
            log.debug("--IgmpLeaveReq--{}", igmpStats.getStat(IgmpStatisticType.IGMP_LEAVE_REQ));
            log.debug("--IgmpMsgReceived--{}", igmpStats.getStat(IgmpStatisticType.IGMP_MSG_RECEIVED));
            log.debug("--IgmpSuccessJoinRejoinReq--{}",
                    igmpStats.getStat(IgmpStatisticType.IGMP_SUCCESS_JOIN_RE_JOIN_REQ));
            log.debug("--Igmpv1MemershipReport--{}",
                    igmpStats.getStat(IgmpStatisticType.IGMP_V1_MEMBERSHIP_REPORT));
            log.debug("--Igmpv2LeaveGroup--{}", igmpStats.getStat(IgmpStatisticType.IGMP_V2_LEAVE_GROUP));
            log.debug("--Igmpv2MembershipReport--{}",
                    igmpStats.getStat(IgmpStatisticType.IGMP_V2_MEMBERSHIP_REPORT));
            log.debug("--Igmpv3MembershipQuery--{}",
                    igmpStats.getStat(IgmpStatisticType.IGMP_V3_MEMBERSHIP_QUERY));
            log.debug("--Igmpv3MembershipReport--{}",
                    igmpStats.getStat(IgmpStatisticType.IGMP_V3_MEMBERSHIP_REPORT));
            log.debug("--InvalidIgmpMsgReceived--{}",
                    igmpStats.getStat(IgmpStatisticType.INVALID_IGMP_MSG_RECEIVED));
            log.debug("--TotalMsgReceived-- {}", igmpStats.getStat(IgmpStatisticType.TOTAL_MSG_RECEIVED));
            log.debug("--UnknownIgmpTypePacketsRx--{}",
                    igmpStats.getStat(IgmpStatisticType.UNKNOWN_IGMP_TYPE_PACKETS_RX_COUNTER));
            log.debug("--ReportsRxWithWrongMode--{}",
                    igmpStats.getStat(IgmpStatisticType.REPORTS_RX_WITH_WRONG_MODE_COUNTER));
            log.debug("--FailJoinReqInsuffPermission--{}",
                    igmpStats.getStat(IgmpStatisticType.FAIL_JOIN_REQ_INSUFF_PERMISSION_ACCESS_COUNTER));
            log.debug("--FailJoinReqUnknownMulticastIp--{}",
                    igmpStats.getStat(IgmpStatisticType.FAIL_JOIN_REQ_UNKNOWN_MULTICAST_IP_COUNTER));
            log.debug("--UnconfiguredGroupCounter--{}",
                    igmpStats.getStat(IgmpStatisticType.UNCONFIGURED_GROUP_COUNTER));
            log.debug("--ValidIgmpPacketCounter--{}",
                    igmpStats.getStat(IgmpStatisticType.VALID_IGMP_PACKET_COUNTER));
            log.debug("--IgmpChannelJoinCounter--{}",
                    igmpStats.getStat(IgmpStatisticType.IGMP_CHANNEL_JOIN_COUNTER));
            log.debug("--CurrentGrpNumCounter--{}",
                    igmpStats.getStat(IgmpStatisticType.CURRENT_GRP_NUMBER_COUNTER));
            log.debug("--IgmpValidChecksumCounter--{}",
                    igmpStats.getStat(IgmpStatisticType.IGMP_VALID_CHECKSUM_COUNTER));
            log.debug("--InvalidIgmpLength--{}", igmpStats.getStat(IgmpStatisticType.INVALID_IGMP_LENGTH));
            log.debug("--IgmpGeneralMembershipQuery--{}",
                    igmpStats.getStat(IgmpStatisticType.IGMP_GENERAL_MEMBERSHIP_QUERY));
            log.debug("--IgmpGrpSpecificMembershipQuery--{}",
                    igmpStats.getStat(IgmpStatisticType.IGMP_GRP_SPECIFIC_MEMBERSHIP_QUERY));
            log.debug("--IgmpGrpAndSrcSpecificMembershipQuery--{}",
                    igmpStats.getStat(IgmpStatisticType.IGMP_GRP_AND_SRC_SPESIFIC_MEMBERSHIP_QUERY));
        }

        post(new IgmpStatisticsEvent(IgmpStatisticsEvent.Type.STATS_UPDATE, igmpStats));
    }

    /**
     * Increments the given counter in the local statistics and marks the
     * distributed copy as stale.
     *
     * @param type counter to increment
     */
    @Override
    public void increaseStat(IgmpStatisticType type) {
        igmpStats.increaseStat(type);
        validityCheck.set(false);
    }

    /**
     * Broadcasts a reset message to the cluster, this node included; each
     * receiver clears its counters in {@link #resetLocal(ClusterMessage)}.
     */
    @Override
    public void resetAllStats() {
        ClusterMessage reset = new ClusterMessage(leadershipManager.getLocalNodeId(), RESET_SUBJECT, new byte[]{});
        clusterCommunicationService.broadcastIncludeSelf(reset, RESET_SUBJECT,
                Serializer.using(statSerializer)::encode);
    }

    /**
     * Returns the current value of the given counter from the local statistics.
     *
     * @param type counter to read
     * @return the counter value
     */
    @Override
    public Long getStat(IgmpStatisticType type) {
        return igmpStats.getStat(type);
    }
}