blob: 7d0f98ee75779d10a9ddd847d5ec9d7a0f04078f [file] [log] [blame]
Shubham Sharma47f2caf2020-02-18 12:13:40 +00001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
developere400c582020-03-24 19:42:08 +010017package org.opencord.igmpproxy.impl;
Shubham Sharma47f2caf2020-02-18 12:13:40 +000018
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030019import org.onlab.util.KryoNamespace;
20import org.onosproject.cluster.NodeId;
21import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
22import org.onosproject.store.cluster.messaging.ClusterMessage;
23import org.onosproject.store.cluster.messaging.MessageSubject;
24import org.onosproject.store.serializers.KryoNamespaces;
25import org.onosproject.store.service.EventuallyConsistentMap;
26import org.onosproject.store.service.Serializer;
27import org.onosproject.store.service.StorageService;
28import org.onosproject.store.service.WallClockTimestamp;
29import org.opencord.igmpproxy.IgmpStatisticType;
30import org.opencord.igmpproxy.IgmpStatisticsEvent;
31import org.opencord.igmpproxy.IgmpStatisticsEventListener;
32import org.opencord.igmpproxy.IgmpStatisticsService;
33import org.opencord.igmpproxy.IgmpLeadershipService;
34import org.opencord.igmpproxy.IgmpStatistics;
Shubham Sharma47f2caf2020-02-18 12:13:40 +000035import org.osgi.service.component.annotations.Component;
36import org.osgi.service.component.ComponentContext;
37import org.osgi.service.component.annotations.Activate;
38import org.onlab.util.SafeRecurringTask;
39import org.onlab.util.Tools;
40import org.onosproject.cfg.ComponentConfigService;
41import org.onosproject.event.AbstractListenerManager;
42import org.osgi.service.component.annotations.Deactivate;
43import org.osgi.service.component.annotations.Modified;
44import org.osgi.service.component.annotations.Reference;
45import org.osgi.service.component.annotations.ReferenceCardinality;
46
developere400c582020-03-24 19:42:08 +010047import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030048import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_SYNC_PERIOD;
developere400c582020-03-24 19:42:08 +010049import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD_DEFAULT;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030050import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_SYNC_PERIOD_DEFAULT;
Shubham Sharma47f2caf2020-02-18 12:13:40 +000051import static org.slf4j.LoggerFactory.getLogger;
52
53import java.util.Dictionary;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030054import java.util.Objects;
Shubham Sharma47f2caf2020-02-18 12:13:40 +000055import java.util.concurrent.Executors;
56import java.util.concurrent.ScheduledExecutorService;
57import java.util.concurrent.ScheduledFuture;
58import java.util.concurrent.TimeUnit;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030059import java.util.concurrent.atomic.AtomicBoolean;
Shubham Sharma47f2caf2020-02-18 12:13:40 +000060
61import org.slf4j.Logger;
62
63import com.google.common.base.Strings;
64
Shubham Sharma47f2caf2020-02-18 12:13:40 +000065
Shubham Sharma47f2caf2020-02-18 12:13:40 +000066/**
Shubham Sharma47f2caf2020-02-18 12:13:40 +000067 * Process the stats collected in Igmp proxy application. Publish to kafka onos.
Shubham Sharma47f2caf2020-02-18 12:13:40 +000068 */
69@Component(immediate = true, property = {
70 STATISTICS_GENERATION_PERIOD + ":Integer=" + STATISTICS_GENERATION_PERIOD_DEFAULT,
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030071 STATISTICS_SYNC_PERIOD + ":Integer=" + STATISTICS_SYNC_PERIOD_DEFAULT,
Shubham Sharma47f2caf2020-02-18 12:13:40 +000072})
73public class IgmpStatisticsManager extends
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030074 AbstractListenerManager<IgmpStatisticsEvent, IgmpStatisticsEventListener>
75 implements IgmpStatisticsService {
76 private static final String IGMP_STATISTICS = "igmp-statistics";
77 private static final String IGMP_STATISTICS_LEADERSHIP = "igmp-statistics";
78
Shubham Sharma47f2caf2020-02-18 12:13:40 +000079 private final Logger log = getLogger(getClass());
80 private IgmpStatistics igmpStats;
81
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030082 private ScheduledExecutorService executorForIgmp;
Shubham Sharma47f2caf2020-02-18 12:13:40 +000083 private ScheduledFuture<?> publisherTask;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030084 private ScheduledFuture<?> syncTask;
Shubham Sharma47f2caf2020-02-18 12:13:40 +000085
86 protected int statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030087 protected int statisticsSyncPeriodInSeconds = STATISTICS_SYNC_PERIOD_DEFAULT;
88
89 private EventuallyConsistentMap<NodeId, IgmpStatistics> statistics;
90
91 private static final MessageSubject RESET_SUBJECT = new MessageSubject("igmp-statistics-reset");
92
93 private KryoNamespace statSerializer = KryoNamespace.newBuilder()
94 .register(KryoNamespaces.API)
95 .register(IgmpStatistics.class)
pier257da462020-06-18 12:19:30 +020096 .register(ClusterMessage.class)
97 .register(MessageSubject.class)
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030098 .build();
99
100 //Statistics values are valid or invalid
101 private AtomicBoolean validityCheck = new AtomicBoolean(false);
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY)
104 protected ComponentConfigService cfgService;
105
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300106 @Reference(cardinality = ReferenceCardinality.MANDATORY)
107 protected StorageService storageService;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY)
110 protected IgmpLeadershipService leadershipManager;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY)
113 protected ClusterCommunicationService clusterCommunicationService;
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000114
115 @Activate
116 public void activate(ComponentContext context) {
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300117 igmpStats = getIgmpStatsInstance();
118
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300119 statistics = storageService.<NodeId, IgmpStatistics>eventuallyConsistentMapBuilder()
120 .withName(IGMP_STATISTICS)
121 .withSerializer(statSerializer)
122 .withTimestampProvider((k, v) -> new WallClockTimestamp())
123 .build();
124
125 initStats(statistics.get(leadershipManager.getLocalNodeId()));
126 syncStats();
127
128 leadershipManager.runForLeadership(IGMP_STATISTICS_LEADERSHIP);
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000129
130 eventDispatcher.addSink(IgmpStatisticsEvent.class, listenerRegistry);
131 executorForIgmp = Executors.newScheduledThreadPool(1);
132 cfgService.registerProperties(getClass());
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300133
134 clusterCommunicationService.addSubscriber(RESET_SUBJECT, Serializer.using(statSerializer)::decode,
135 this::resetLocal, executorForIgmp);
136
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000137 modified(context);
138 log.info("IgmpStatisticsManager Activated");
139 }
140
141 @Modified
142 public void modified(ComponentContext context) {
143 Dictionary<String, Object> properties = context.getProperties();
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000144 try {
145 String s = Tools.get(properties, STATISTICS_GENERATION_PERIOD);
146 statisticsGenerationPeriodInSeconds = Strings.isNullOrEmpty(s) ?
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300147 Integer.parseInt(STATISTICS_GENERATION_PERIOD)
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000148 : Integer.parseInt(s.trim());
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300149 log.debug("statisticsGenerationPeriodInSeconds: {}", statisticsGenerationPeriodInSeconds);
150 statisticsSyncPeriodInSeconds = Strings.isNullOrEmpty(s) ?
151 Integer.parseInt(STATISTICS_SYNC_PERIOD)
152 : Integer.parseInt(s.trim());
153 log.debug("statisticsSyncPeriodInSeconds: {}", statisticsSyncPeriodInSeconds);
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000154 } catch (NumberFormatException ne) {
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300155 log.error("Unable to parse configuration parameter", ne);
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000156 statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300157 statisticsSyncPeriodInSeconds = STATISTICS_SYNC_PERIOD_DEFAULT;
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000158 }
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300159 stopPublishTask();
160 stopSyncTask();
161
162 startPublishTask();
163 startSyncTask();
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000164 }
165
166 @Deactivate
167 public void deactivate() {
168 eventDispatcher.removeSink(IgmpStatisticsEvent.class);
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300169 stopPublishTask();
170 stopSyncTask();
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000171 executorForIgmp.shutdown();
172 cfgService.unregisterProperties(getClass(), false);
173 igmpStats = null;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300174 clusterCommunicationService.removeSubscriber(RESET_SUBJECT);
175 leadershipManager.withdraw(IGMP_STATISTICS_LEADERSHIP);
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000176 log.info("IgmpStatisticsManager Deactivated");
177 }
178
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300179 private IgmpStatistics getIgmpStatsInstance() {
180 if (igmpStats == null) {
181 igmpStats = new IgmpStatistics();
182 log.info("Instance of igmp-statistics created.");
183 }
184 return igmpStats;
185 }
186
187 private void syncStats() {
188 if (!validityCheck.get()) {
189 //sync with valid values
190 statistics.put(leadershipManager.getLocalNodeId(), snapshot());
191 validityCheck.set(true);
192 log.debug("Valid statistic values are put.");
193 }
194 }
195
196 private void initStats(IgmpStatistics init) {
197 if (init == null) {
198 log.warn("Igmp statistics was not created.");
199 return;
200 }
201 igmpStats.setStats(init);
202 }
203
204 private IgmpStatistics snapshot() {
205 return getIgmpStatsInstance();
206 }
207
208 private void startSyncTask() {
209 syncTask = startTask(this::syncStats, statisticsSyncPeriodInSeconds);
210 log.debug("Sync task started. period in seconds: {}", statisticsSyncPeriodInSeconds);
211 }
212
213 private void stopSyncTask() {
214 stopTask(syncTask);
215 log.debug("Sync task stopped.");
216 }
217
218 private void startPublishTask() {
219 publisherTask = startTask(this::publishStats, statisticsGenerationPeriodInSeconds);
220 log.debug("Publisher task started. period in seconds: {}", statisticsGenerationPeriodInSeconds);
221 }
222
223 private void stopPublishTask() {
224 stopTask(publisherTask);
225 log.debug("Publisher task stopped.");
226 }
227
228 private ScheduledFuture<?> startTask(Runnable r, int rate) {
229 return executorForIgmp.scheduleAtFixedRate(SafeRecurringTask.wrap(r),
230 0, rate, TimeUnit.SECONDS);
231 }
232
233 private void stopTask(ScheduledFuture<?> task) {
234 if (task != null) {
235 task.cancel(true);
236 }
237 }
238
239 private void resetLocal(ClusterMessage message) {
240 //reset all-statistics
241 igmpStats.resetAll();
242 validityCheck.set(false);
243 }
244
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000245 /**
246 * Publishes stats.
247 */
248 private void publishStats() {
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300249 // Only publish events if we are the cluster leader for Igmp-stats
250 if (!Objects.equals(leadershipManager.getLeader(IGMP_STATISTICS_LEADERSHIP),
251 leadershipManager.getLocalNodeId())) {
252 log.debug("This is not leader of : {}", IGMP_STATISTICS_LEADERSHIP);
253 return;
254 }
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000255
256 if (log.isDebugEnabled()) {
257 log.debug("Notifying stats: {}", igmpStats);
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300258 log.debug("--IgmpDisconnect--" + igmpStats.getStat(IgmpStatisticType.IGMP_DISCONNECT));
259 log.debug("--IgmpFailJoinReq--" + igmpStats.getStat(IgmpStatisticType.IGMP_FAIL_JOIN_REQ));
260 log.debug("--IgmpJoinReq--" + igmpStats.getStat(IgmpStatisticType.IGMP_JOIN_REQ));
261 log.debug("--IgmpLeaveReq--" + igmpStats.getStat(IgmpStatisticType.IGMP_LEAVE_REQ));
262 log.debug("--IgmpMsgReceived--" + igmpStats.getStat(IgmpStatisticType.IGMP_MSG_RECEIVED));
263 log.debug("--IgmpSuccessJoinRejoinReq--" +
264 igmpStats.getStat(IgmpStatisticType.IGMP_SUCCESS_JOIN_RE_JOIN_REQ));
265 log.debug("--Igmpv1MemershipReport--" + igmpStats.getStat(IgmpStatisticType.IGMP_V1_MEMBERSHIP_REPORT));
266 log.debug("--Igmpv2LeaveGroup--" + igmpStats.getStat(IgmpStatisticType.IGMP_V2_LEAVE_GROUP));
267 log.debug("--Igmpv2MembershipReport--" + igmpStats.getStat(IgmpStatisticType.IGMP_V2_MEMBERSHIP_REPORT));
268 log.debug("--Igmpv3MembershipQuery--" + igmpStats.getStat(IgmpStatisticType.IGMP_V3_MEMBERSHIP_QUERY));
269 log.debug("--Igmpv3MembershipReport--" + igmpStats.getStat(IgmpStatisticType.IGMP_V3_MEMBERSHIP_REPORT));
270 log.debug("--InvalidIgmpMsgReceived--" + igmpStats.getStat(IgmpStatisticType.INVALID_IGMP_MSG_RECEIVED));
271 log.debug("--TotalMsgReceived-- " + igmpStats.getStat(IgmpStatisticType.TOTAL_MSG_RECEIVED));
272 log.debug("--UnknownIgmpTypePacketsRx--" +
273 igmpStats.getStat(IgmpStatisticType.UNKNOWN_IGMP_TYPE_PACKETS_RX_COUNTER));
274 log.debug("--ReportsRxWithWrongMode--" +
275 igmpStats.getStat(IgmpStatisticType.REPORTS_RX_WITH_WRONG_MODE_COUNTER));
276 log.debug("--FailJoinReqInsuffPermission--" +
277 igmpStats.getStat(IgmpStatisticType.FAIL_JOIN_REQ_INSUFF_PERMISSION_ACCESS_COUNTER));
278 log.debug("--FailJoinReqUnknownMulticastIp--" +
279 igmpStats.getStat(IgmpStatisticType.FAIL_JOIN_REQ_UNKNOWN_MULTICAST_IP_COUNTER));
280 log.debug("--UnconfiguredGroupCounter--" + igmpStats.getStat(IgmpStatisticType.UNCONFIGURED_GROUP_COUNTER));
281 log.debug("--ValidIgmpPacketCounter--" + igmpStats.getStat(IgmpStatisticType.VALID_IGMP_PACKET_COUNTER));
282 log.debug("--IgmpChannelJoinCounter--" + igmpStats.getStat(IgmpStatisticType.IGMP_CHANNEL_JOIN_COUNTER));
283 log.debug("--CurrentGrpNumCounter--" + igmpStats.getStat(IgmpStatisticType.CURRENT_GRP_NUMBER_COUNTER));
284 log.debug("--IgmpValidChecksumCounter--" +
285 igmpStats.getStat(IgmpStatisticType.IGMP_VALID_CHECKSUM_COUNTER));
286 log.debug("--InvalidIgmpLength--" + igmpStats.getStat(IgmpStatisticType.INVALID_IGMP_LENGTH));
287 log.debug("--IgmpGeneralMembershipQuery--" +
288 igmpStats.getStat(IgmpStatisticType.IGMP_GENERAL_MEMBERSHIP_QUERY));
289 log.debug("--IgmpGrpSpecificMembershipQuery--" +
290 igmpStats.getStat(IgmpStatisticType.IGMP_GRP_SPECIFIC_MEMBERSHIP_QUERY));
291 log.debug("--IgmpGrpAndSrcSpecificMembershipQuery--" +
292 igmpStats.getStat(IgmpStatisticType.IGMP_GRP_AND_SRC_SPESIFIC_MEMBERSHIP_QUERY));
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000293 }
294
295 post(new IgmpStatisticsEvent(IgmpStatisticsEvent.Type.STATS_UPDATE, igmpStats));
296 }
297
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300298 @Override
299 public void increaseStat(IgmpStatisticType type) {
300 igmpStats.increaseStat(type);
301 validityCheck.set(false);
302 }
303
304 @Override
305 public void resetAllStats() {
306 ClusterMessage reset = new ClusterMessage(leadershipManager.getLocalNodeId(), RESET_SUBJECT, new byte[]{});
307 clusterCommunicationService.broadcastIncludeSelf(reset, RESET_SUBJECT,
308 Serializer.using(statSerializer)::encode);
309 }
310
311 @Override
312 public Long getStat(IgmpStatisticType type) {
313 return igmpStats.getStat(type);
314 }
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000315}