/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
developere400c582020-03-24 19:42:08 +010017package org.opencord.igmpproxy.impl;
Shubham Sharma47f2caf2020-02-18 12:13:40 +000018
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030019import org.onlab.util.KryoNamespace;
20import org.onosproject.cluster.NodeId;
21import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
22import org.onosproject.store.cluster.messaging.ClusterMessage;
23import org.onosproject.store.cluster.messaging.MessageSubject;
24import org.onosproject.store.serializers.KryoNamespaces;
25import org.onosproject.store.service.EventuallyConsistentMap;
26import org.onosproject.store.service.Serializer;
27import org.onosproject.store.service.StorageService;
28import org.onosproject.store.service.WallClockTimestamp;
29import org.opencord.igmpproxy.IgmpStatisticType;
30import org.opencord.igmpproxy.IgmpStatisticsEvent;
31import org.opencord.igmpproxy.IgmpStatisticsEventListener;
32import org.opencord.igmpproxy.IgmpStatisticsService;
33import org.opencord.igmpproxy.IgmpLeadershipService;
34import org.opencord.igmpproxy.IgmpStatistics;
Shubham Sharma47f2caf2020-02-18 12:13:40 +000035import org.osgi.service.component.annotations.Component;
36import org.osgi.service.component.ComponentContext;
37import org.osgi.service.component.annotations.Activate;
38import org.onlab.util.SafeRecurringTask;
39import org.onlab.util.Tools;
40import org.onosproject.cfg.ComponentConfigService;
41import org.onosproject.event.AbstractListenerManager;
42import org.osgi.service.component.annotations.Deactivate;
43import org.osgi.service.component.annotations.Modified;
44import org.osgi.service.component.annotations.Reference;
45import org.osgi.service.component.annotations.ReferenceCardinality;
46
developere400c582020-03-24 19:42:08 +010047import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030048import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_SYNC_PERIOD;
developere400c582020-03-24 19:42:08 +010049import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD_DEFAULT;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030050import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_SYNC_PERIOD_DEFAULT;
Shubham Sharma47f2caf2020-02-18 12:13:40 +000051import static org.slf4j.LoggerFactory.getLogger;
52
53import java.util.Dictionary;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030054import java.util.Objects;
Shubham Sharma47f2caf2020-02-18 12:13:40 +000055import java.util.concurrent.Executors;
56import java.util.concurrent.ScheduledExecutorService;
57import java.util.concurrent.ScheduledFuture;
58import java.util.concurrent.TimeUnit;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030059import java.util.concurrent.atomic.AtomicBoolean;
Shubham Sharma47f2caf2020-02-18 12:13:40 +000060
61import org.slf4j.Logger;
62
63import com.google.common.base.Strings;
64
Shubham Sharma47f2caf2020-02-18 12:13:40 +000065
Shubham Sharma47f2caf2020-02-18 12:13:40 +000066/**
Shubham Sharma47f2caf2020-02-18 12:13:40 +000067 * Process the stats collected in Igmp proxy application. Publish to kafka onos.
Shubham Sharma47f2caf2020-02-18 12:13:40 +000068 */
69@Component(immediate = true, property = {
70 STATISTICS_GENERATION_PERIOD + ":Integer=" + STATISTICS_GENERATION_PERIOD_DEFAULT,
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030071 STATISTICS_SYNC_PERIOD + ":Integer=" + STATISTICS_SYNC_PERIOD_DEFAULT,
Shubham Sharma47f2caf2020-02-18 12:13:40 +000072})
73public class IgmpStatisticsManager extends
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030074 AbstractListenerManager<IgmpStatisticsEvent, IgmpStatisticsEventListener>
75 implements IgmpStatisticsService {
76 private static final String IGMP_STATISTICS = "igmp-statistics";
77 private static final String IGMP_STATISTICS_LEADERSHIP = "igmp-statistics";
78
Shubham Sharma47f2caf2020-02-18 12:13:40 +000079 private final Logger log = getLogger(getClass());
80 private IgmpStatistics igmpStats;
81
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030082 private ScheduledExecutorService executorForIgmp;
Shubham Sharma47f2caf2020-02-18 12:13:40 +000083 private ScheduledFuture<?> publisherTask;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030084 private ScheduledFuture<?> syncTask;
Shubham Sharma47f2caf2020-02-18 12:13:40 +000085
86 protected int statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030087 protected int statisticsSyncPeriodInSeconds = STATISTICS_SYNC_PERIOD_DEFAULT;
88
89 private EventuallyConsistentMap<NodeId, IgmpStatistics> statistics;
90
91 private static final MessageSubject RESET_SUBJECT = new MessageSubject("igmp-statistics-reset");
92
93 private KryoNamespace statSerializer = KryoNamespace.newBuilder()
94 .register(KryoNamespaces.API)
95 .register(IgmpStatistics.class)
pier257da462020-06-18 12:19:30 +020096 .register(ClusterMessage.class)
97 .register(MessageSubject.class)
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030098 .build();
99
100 //Statistics values are valid or invalid
101 private AtomicBoolean validityCheck = new AtomicBoolean(false);
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000102
103 @Reference(cardinality = ReferenceCardinality.MANDATORY)
104 protected ComponentConfigService cfgService;
105
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300106 @Reference(cardinality = ReferenceCardinality.MANDATORY)
107 protected StorageService storageService;
108
109 @Reference(cardinality = ReferenceCardinality.MANDATORY)
110 protected IgmpLeadershipService leadershipManager;
111
112 @Reference(cardinality = ReferenceCardinality.MANDATORY)
113 protected ClusterCommunicationService clusterCommunicationService;
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000114
115 @Activate
116 public void activate(ComponentContext context) {
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300117 igmpStats = getIgmpStatsInstance();
118
119
120 statistics = storageService.<NodeId, IgmpStatistics>eventuallyConsistentMapBuilder()
121 .withName(IGMP_STATISTICS)
122 .withSerializer(statSerializer)
123 .withTimestampProvider((k, v) -> new WallClockTimestamp())
124 .build();
125
126 initStats(statistics.get(leadershipManager.getLocalNodeId()));
127 syncStats();
128
129 leadershipManager.runForLeadership(IGMP_STATISTICS_LEADERSHIP);
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000130
131 eventDispatcher.addSink(IgmpStatisticsEvent.class, listenerRegistry);
132 executorForIgmp = Executors.newScheduledThreadPool(1);
133 cfgService.registerProperties(getClass());
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300134
135 clusterCommunicationService.addSubscriber(RESET_SUBJECT, Serializer.using(statSerializer)::decode,
136 this::resetLocal, executorForIgmp);
137
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000138 modified(context);
139 log.info("IgmpStatisticsManager Activated");
140 }
141
142 @Modified
143 public void modified(ComponentContext context) {
144 Dictionary<String, Object> properties = context.getProperties();
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000145 try {
146 String s = Tools.get(properties, STATISTICS_GENERATION_PERIOD);
147 statisticsGenerationPeriodInSeconds = Strings.isNullOrEmpty(s) ?
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300148 Integer.parseInt(STATISTICS_GENERATION_PERIOD)
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000149 : Integer.parseInt(s.trim());
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300150 log.debug("statisticsGenerationPeriodInSeconds: {}", statisticsGenerationPeriodInSeconds);
151 statisticsSyncPeriodInSeconds = Strings.isNullOrEmpty(s) ?
152 Integer.parseInt(STATISTICS_SYNC_PERIOD)
153 : Integer.parseInt(s.trim());
154 log.debug("statisticsSyncPeriodInSeconds: {}", statisticsSyncPeriodInSeconds);
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000155 } catch (NumberFormatException ne) {
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300156 log.error("Unable to parse configuration parameter", ne);
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000157 statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300158 statisticsSyncPeriodInSeconds = STATISTICS_SYNC_PERIOD_DEFAULT;
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000159 }
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300160 stopPublishTask();
161 stopSyncTask();
162
163 startPublishTask();
164 startSyncTask();
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000165 }
166
167 @Deactivate
168 public void deactivate() {
169 eventDispatcher.removeSink(IgmpStatisticsEvent.class);
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300170 stopPublishTask();
171 stopSyncTask();
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000172 executorForIgmp.shutdown();
173 cfgService.unregisterProperties(getClass(), false);
174 igmpStats = null;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300175 clusterCommunicationService.removeSubscriber(RESET_SUBJECT);
176 leadershipManager.withdraw(IGMP_STATISTICS_LEADERSHIP);
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000177 log.info("IgmpStatisticsManager Deactivated");
178 }
179
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300180 private IgmpStatistics getIgmpStatsInstance() {
181 if (igmpStats == null) {
182 igmpStats = new IgmpStatistics();
183 log.info("Instance of igmp-statistics created.");
184 }
185 return igmpStats;
186 }
187
188 private void syncStats() {
189 if (!validityCheck.get()) {
190 //sync with valid values
191 statistics.put(leadershipManager.getLocalNodeId(), snapshot());
192 validityCheck.set(true);
193 log.debug("Valid statistic values are put.");
194 }
195 }
196
197 private void initStats(IgmpStatistics init) {
198 if (init == null) {
199 log.warn("Igmp statistics was not created.");
200 return;
201 }
202 igmpStats.setStats(init);
203 }
204
205 private IgmpStatistics snapshot() {
206 return getIgmpStatsInstance();
207 }
208
209 private void startSyncTask() {
210 syncTask = startTask(this::syncStats, statisticsSyncPeriodInSeconds);
211 log.debug("Sync task started. period in seconds: {}", statisticsSyncPeriodInSeconds);
212 }
213
214 private void stopSyncTask() {
215 stopTask(syncTask);
216 log.debug("Sync task stopped.");
217 }
218
219 private void startPublishTask() {
220 publisherTask = startTask(this::publishStats, statisticsGenerationPeriodInSeconds);
221 log.debug("Publisher task started. period in seconds: {}", statisticsGenerationPeriodInSeconds);
222 }
223
224 private void stopPublishTask() {
225 stopTask(publisherTask);
226 log.debug("Publisher task stopped.");
227 }
228
229 private ScheduledFuture<?> startTask(Runnable r, int rate) {
230 return executorForIgmp.scheduleAtFixedRate(SafeRecurringTask.wrap(r),
231 0, rate, TimeUnit.SECONDS);
232 }
233
234 private void stopTask(ScheduledFuture<?> task) {
235 if (task != null) {
236 task.cancel(true);
237 }
238 }
239
240 private void resetLocal(ClusterMessage message) {
241 //reset all-statistics
242 igmpStats.resetAll();
243 validityCheck.set(false);
244 }
245
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000246 /**
247 * Publishes stats.
248 */
249 private void publishStats() {
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300250 // Only publish events if we are the cluster leader for Igmp-stats
251 if (!Objects.equals(leadershipManager.getLeader(IGMP_STATISTICS_LEADERSHIP),
252 leadershipManager.getLocalNodeId())) {
253 log.debug("This is not leader of : {}", IGMP_STATISTICS_LEADERSHIP);
254 return;
255 }
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000256
257 if (log.isDebugEnabled()) {
258 log.debug("Notifying stats: {}", igmpStats);
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300259 log.debug("--IgmpDisconnect--" + igmpStats.getStat(IgmpStatisticType.IGMP_DISCONNECT));
260 log.debug("--IgmpFailJoinReq--" + igmpStats.getStat(IgmpStatisticType.IGMP_FAIL_JOIN_REQ));
261 log.debug("--IgmpJoinReq--" + igmpStats.getStat(IgmpStatisticType.IGMP_JOIN_REQ));
262 log.debug("--IgmpLeaveReq--" + igmpStats.getStat(IgmpStatisticType.IGMP_LEAVE_REQ));
263 log.debug("--IgmpMsgReceived--" + igmpStats.getStat(IgmpStatisticType.IGMP_MSG_RECEIVED));
264 log.debug("--IgmpSuccessJoinRejoinReq--" +
265 igmpStats.getStat(IgmpStatisticType.IGMP_SUCCESS_JOIN_RE_JOIN_REQ));
266 log.debug("--Igmpv1MemershipReport--" + igmpStats.getStat(IgmpStatisticType.IGMP_V1_MEMBERSHIP_REPORT));
267 log.debug("--Igmpv2LeaveGroup--" + igmpStats.getStat(IgmpStatisticType.IGMP_V2_LEAVE_GROUP));
268 log.debug("--Igmpv2MembershipReport--" + igmpStats.getStat(IgmpStatisticType.IGMP_V2_MEMBERSHIP_REPORT));
269 log.debug("--Igmpv3MembershipQuery--" + igmpStats.getStat(IgmpStatisticType.IGMP_V3_MEMBERSHIP_QUERY));
270 log.debug("--Igmpv3MembershipReport--" + igmpStats.getStat(IgmpStatisticType.IGMP_V3_MEMBERSHIP_REPORT));
271 log.debug("--InvalidIgmpMsgReceived--" + igmpStats.getStat(IgmpStatisticType.INVALID_IGMP_MSG_RECEIVED));
272 log.debug("--TotalMsgReceived-- " + igmpStats.getStat(IgmpStatisticType.TOTAL_MSG_RECEIVED));
273 log.debug("--UnknownIgmpTypePacketsRx--" +
274 igmpStats.getStat(IgmpStatisticType.UNKNOWN_IGMP_TYPE_PACKETS_RX_COUNTER));
275 log.debug("--ReportsRxWithWrongMode--" +
276 igmpStats.getStat(IgmpStatisticType.REPORTS_RX_WITH_WRONG_MODE_COUNTER));
277 log.debug("--FailJoinReqInsuffPermission--" +
278 igmpStats.getStat(IgmpStatisticType.FAIL_JOIN_REQ_INSUFF_PERMISSION_ACCESS_COUNTER));
279 log.debug("--FailJoinReqUnknownMulticastIp--" +
280 igmpStats.getStat(IgmpStatisticType.FAIL_JOIN_REQ_UNKNOWN_MULTICAST_IP_COUNTER));
281 log.debug("--UnconfiguredGroupCounter--" + igmpStats.getStat(IgmpStatisticType.UNCONFIGURED_GROUP_COUNTER));
282 log.debug("--ValidIgmpPacketCounter--" + igmpStats.getStat(IgmpStatisticType.VALID_IGMP_PACKET_COUNTER));
283 log.debug("--IgmpChannelJoinCounter--" + igmpStats.getStat(IgmpStatisticType.IGMP_CHANNEL_JOIN_COUNTER));
284 log.debug("--CurrentGrpNumCounter--" + igmpStats.getStat(IgmpStatisticType.CURRENT_GRP_NUMBER_COUNTER));
285 log.debug("--IgmpValidChecksumCounter--" +
286 igmpStats.getStat(IgmpStatisticType.IGMP_VALID_CHECKSUM_COUNTER));
287 log.debug("--InvalidIgmpLength--" + igmpStats.getStat(IgmpStatisticType.INVALID_IGMP_LENGTH));
288 log.debug("--IgmpGeneralMembershipQuery--" +
289 igmpStats.getStat(IgmpStatisticType.IGMP_GENERAL_MEMBERSHIP_QUERY));
290 log.debug("--IgmpGrpSpecificMembershipQuery--" +
291 igmpStats.getStat(IgmpStatisticType.IGMP_GRP_SPECIFIC_MEMBERSHIP_QUERY));
292 log.debug("--IgmpGrpAndSrcSpecificMembershipQuery--" +
293 igmpStats.getStat(IgmpStatisticType.IGMP_GRP_AND_SRC_SPESIFIC_MEMBERSHIP_QUERY));
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000294 }
295
296 post(new IgmpStatisticsEvent(IgmpStatisticsEvent.Type.STATS_UPDATE, igmpStats));
297 }
298
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300299 @Override
300 public void increaseStat(IgmpStatisticType type) {
301 igmpStats.increaseStat(type);
302 validityCheck.set(false);
303 }
304
305 @Override
306 public void resetAllStats() {
307 ClusterMessage reset = new ClusterMessage(leadershipManager.getLocalNodeId(), RESET_SUBJECT, new byte[]{});
308 clusterCommunicationService.broadcastIncludeSelf(reset, RESET_SUBJECT,
309 Serializer.using(statSerializer)::encode);
310 }
311
312 @Override
313 public Long getStat(IgmpStatisticType type) {
314 return igmpStats.getStat(type);
315 }
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000316}