blob: 69ba6b6de879ba7c31c1edd0ce621faffcee1142 [file] [log] [blame]
Shubham Sharma47f2caf2020-02-18 12:13:40 +00001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
developere400c582020-03-24 19:42:08 +010017package org.opencord.igmpproxy.impl;
Shubham Sharma47f2caf2020-02-18 12:13:40 +000018
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030019import org.onlab.util.KryoNamespace;
20import org.onosproject.cluster.NodeId;
21import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
22import org.onosproject.store.cluster.messaging.ClusterMessage;
23import org.onosproject.store.cluster.messaging.MessageSubject;
24import org.onosproject.store.serializers.KryoNamespaces;
25import org.onosproject.store.service.EventuallyConsistentMap;
26import org.onosproject.store.service.Serializer;
27import org.onosproject.store.service.StorageService;
28import org.onosproject.store.service.WallClockTimestamp;
29import org.opencord.igmpproxy.IgmpStatisticType;
30import org.opencord.igmpproxy.IgmpStatisticsEvent;
31import org.opencord.igmpproxy.IgmpStatisticsEventListener;
32import org.opencord.igmpproxy.IgmpStatisticsService;
33import org.opencord.igmpproxy.IgmpLeadershipService;
34import org.opencord.igmpproxy.IgmpStatistics;
Shubham Sharma47f2caf2020-02-18 12:13:40 +000035import org.osgi.service.component.annotations.Component;
36import org.osgi.service.component.ComponentContext;
37import org.osgi.service.component.annotations.Activate;
38import org.onlab.util.SafeRecurringTask;
39import org.onlab.util.Tools;
40import org.onosproject.cfg.ComponentConfigService;
41import org.onosproject.event.AbstractListenerManager;
42import org.osgi.service.component.annotations.Deactivate;
43import org.osgi.service.component.annotations.Modified;
44import org.osgi.service.component.annotations.Reference;
45import org.osgi.service.component.annotations.ReferenceCardinality;
46
developere400c582020-03-24 19:42:08 +010047import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030048import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_SYNC_PERIOD;
developere400c582020-03-24 19:42:08 +010049import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD_DEFAULT;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030050import static org.opencord.igmpproxy.impl.OsgiPropertyConstants.STATISTICS_SYNC_PERIOD_DEFAULT;
Shubham Sharma47f2caf2020-02-18 12:13:40 +000051import static org.slf4j.LoggerFactory.getLogger;
52
53import java.util.Dictionary;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030054import java.util.Objects;
Shubham Sharma47f2caf2020-02-18 12:13:40 +000055import java.util.concurrent.Executors;
56import java.util.concurrent.ScheduledExecutorService;
57import java.util.concurrent.ScheduledFuture;
58import java.util.concurrent.TimeUnit;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030059import java.util.concurrent.atomic.AtomicBoolean;
Shubham Sharma47f2caf2020-02-18 12:13:40 +000060
61import org.slf4j.Logger;
62
63import com.google.common.base.Strings;
64
Shubham Sharma47f2caf2020-02-18 12:13:40 +000065
Shubham Sharma47f2caf2020-02-18 12:13:40 +000066/**
Shubham Sharma47f2caf2020-02-18 12:13:40 +000067 * Process the stats collected in Igmp proxy application. Publish to kafka onos.
Shubham Sharma47f2caf2020-02-18 12:13:40 +000068 */
69@Component(immediate = true, property = {
70 STATISTICS_GENERATION_PERIOD + ":Integer=" + STATISTICS_GENERATION_PERIOD_DEFAULT,
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030071 STATISTICS_SYNC_PERIOD + ":Integer=" + STATISTICS_SYNC_PERIOD_DEFAULT,
Shubham Sharma47f2caf2020-02-18 12:13:40 +000072})
73public class IgmpStatisticsManager extends
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030074 AbstractListenerManager<IgmpStatisticsEvent, IgmpStatisticsEventListener>
75 implements IgmpStatisticsService {
76 private static final String IGMP_STATISTICS = "igmp-statistics";
77 private static final String IGMP_STATISTICS_LEADERSHIP = "igmp-statistics";
78
Shubham Sharma47f2caf2020-02-18 12:13:40 +000079 private final Logger log = getLogger(getClass());
80 private IgmpStatistics igmpStats;
81
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030082 private ScheduledExecutorService executorForIgmp;
Shubham Sharma47f2caf2020-02-18 12:13:40 +000083 private ScheduledFuture<?> publisherTask;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030084 private ScheduledFuture<?> syncTask;
Shubham Sharma47f2caf2020-02-18 12:13:40 +000085
86 protected int statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +030087 protected int statisticsSyncPeriodInSeconds = STATISTICS_SYNC_PERIOD_DEFAULT;
88
89 private EventuallyConsistentMap<NodeId, IgmpStatistics> statistics;
90
91 private static final MessageSubject RESET_SUBJECT = new MessageSubject("igmp-statistics-reset");
92
93 private KryoNamespace statSerializer = KryoNamespace.newBuilder()
94 .register(KryoNamespaces.API)
95 .register(IgmpStatistics.class)
96 .build();
97
98 //Statistics values are valid or invalid
99 private AtomicBoolean validityCheck = new AtomicBoolean(false);
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000100
101 @Reference(cardinality = ReferenceCardinality.MANDATORY)
102 protected ComponentConfigService cfgService;
103
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300104 @Reference(cardinality = ReferenceCardinality.MANDATORY)
105 protected StorageService storageService;
106
107 @Reference(cardinality = ReferenceCardinality.MANDATORY)
108 protected IgmpLeadershipService leadershipManager;
109
110 @Reference(cardinality = ReferenceCardinality.MANDATORY)
111 protected ClusterCommunicationService clusterCommunicationService;
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000112
113 @Activate
114 public void activate(ComponentContext context) {
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300115 igmpStats = getIgmpStatsInstance();
116
117
118 statistics = storageService.<NodeId, IgmpStatistics>eventuallyConsistentMapBuilder()
119 .withName(IGMP_STATISTICS)
120 .withSerializer(statSerializer)
121 .withTimestampProvider((k, v) -> new WallClockTimestamp())
122 .build();
123
124 initStats(statistics.get(leadershipManager.getLocalNodeId()));
125 syncStats();
126
127 leadershipManager.runForLeadership(IGMP_STATISTICS_LEADERSHIP);
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000128
129 eventDispatcher.addSink(IgmpStatisticsEvent.class, listenerRegistry);
130 executorForIgmp = Executors.newScheduledThreadPool(1);
131 cfgService.registerProperties(getClass());
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300132
133 clusterCommunicationService.addSubscriber(RESET_SUBJECT, Serializer.using(statSerializer)::decode,
134 this::resetLocal, executorForIgmp);
135
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000136 modified(context);
137 log.info("IgmpStatisticsManager Activated");
138 }
139
140 @Modified
141 public void modified(ComponentContext context) {
142 Dictionary<String, Object> properties = context.getProperties();
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000143 try {
144 String s = Tools.get(properties, STATISTICS_GENERATION_PERIOD);
145 statisticsGenerationPeriodInSeconds = Strings.isNullOrEmpty(s) ?
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300146 Integer.parseInt(STATISTICS_GENERATION_PERIOD)
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000147 : Integer.parseInt(s.trim());
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300148 log.debug("statisticsGenerationPeriodInSeconds: {}", statisticsGenerationPeriodInSeconds);
149 statisticsSyncPeriodInSeconds = Strings.isNullOrEmpty(s) ?
150 Integer.parseInt(STATISTICS_SYNC_PERIOD)
151 : Integer.parseInt(s.trim());
152 log.debug("statisticsSyncPeriodInSeconds: {}", statisticsSyncPeriodInSeconds);
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000153 } catch (NumberFormatException ne) {
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300154 log.error("Unable to parse configuration parameter", ne);
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000155 statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300156 statisticsSyncPeriodInSeconds = STATISTICS_SYNC_PERIOD_DEFAULT;
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000157 }
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300158 stopPublishTask();
159 stopSyncTask();
160
161 startPublishTask();
162 startSyncTask();
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000163 }
164
165 @Deactivate
166 public void deactivate() {
167 eventDispatcher.removeSink(IgmpStatisticsEvent.class);
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300168 stopPublishTask();
169 stopSyncTask();
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000170 executorForIgmp.shutdown();
171 cfgService.unregisterProperties(getClass(), false);
172 igmpStats = null;
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300173 clusterCommunicationService.removeSubscriber(RESET_SUBJECT);
174 leadershipManager.withdraw(IGMP_STATISTICS_LEADERSHIP);
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000175 log.info("IgmpStatisticsManager Deactivated");
176 }
177
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300178 private IgmpStatistics getIgmpStatsInstance() {
179 if (igmpStats == null) {
180 igmpStats = new IgmpStatistics();
181 log.info("Instance of igmp-statistics created.");
182 }
183 return igmpStats;
184 }
185
186 private void syncStats() {
187 if (!validityCheck.get()) {
188 //sync with valid values
189 statistics.put(leadershipManager.getLocalNodeId(), snapshot());
190 validityCheck.set(true);
191 log.debug("Valid statistic values are put.");
192 }
193 }
194
195 private void initStats(IgmpStatistics init) {
196 if (init == null) {
197 log.warn("Igmp statistics was not created.");
198 return;
199 }
200 igmpStats.setStats(init);
201 }
202
203 private IgmpStatistics snapshot() {
204 return getIgmpStatsInstance();
205 }
206
207 private void startSyncTask() {
208 syncTask = startTask(this::syncStats, statisticsSyncPeriodInSeconds);
209 log.debug("Sync task started. period in seconds: {}", statisticsSyncPeriodInSeconds);
210 }
211
212 private void stopSyncTask() {
213 stopTask(syncTask);
214 log.debug("Sync task stopped.");
215 }
216
217 private void startPublishTask() {
218 publisherTask = startTask(this::publishStats, statisticsGenerationPeriodInSeconds);
219 log.debug("Publisher task started. period in seconds: {}", statisticsGenerationPeriodInSeconds);
220 }
221
222 private void stopPublishTask() {
223 stopTask(publisherTask);
224 log.debug("Publisher task stopped.");
225 }
226
227 private ScheduledFuture<?> startTask(Runnable r, int rate) {
228 return executorForIgmp.scheduleAtFixedRate(SafeRecurringTask.wrap(r),
229 0, rate, TimeUnit.SECONDS);
230 }
231
232 private void stopTask(ScheduledFuture<?> task) {
233 if (task != null) {
234 task.cancel(true);
235 }
236 }
237
238 private void resetLocal(ClusterMessage message) {
239 //reset all-statistics
240 igmpStats.resetAll();
241 validityCheck.set(false);
242 }
243
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000244 /**
245 * Publishes stats.
246 */
247 private void publishStats() {
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300248 // Only publish events if we are the cluster leader for Igmp-stats
249 if (!Objects.equals(leadershipManager.getLeader(IGMP_STATISTICS_LEADERSHIP),
250 leadershipManager.getLocalNodeId())) {
251 log.debug("This is not leader of : {}", IGMP_STATISTICS_LEADERSHIP);
252 return;
253 }
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000254
255 if (log.isDebugEnabled()) {
256 log.debug("Notifying stats: {}", igmpStats);
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300257 log.debug("--IgmpDisconnect--" + igmpStats.getStat(IgmpStatisticType.IGMP_DISCONNECT));
258 log.debug("--IgmpFailJoinReq--" + igmpStats.getStat(IgmpStatisticType.IGMP_FAIL_JOIN_REQ));
259 log.debug("--IgmpJoinReq--" + igmpStats.getStat(IgmpStatisticType.IGMP_JOIN_REQ));
260 log.debug("--IgmpLeaveReq--" + igmpStats.getStat(IgmpStatisticType.IGMP_LEAVE_REQ));
261 log.debug("--IgmpMsgReceived--" + igmpStats.getStat(IgmpStatisticType.IGMP_MSG_RECEIVED));
262 log.debug("--IgmpSuccessJoinRejoinReq--" +
263 igmpStats.getStat(IgmpStatisticType.IGMP_SUCCESS_JOIN_RE_JOIN_REQ));
264 log.debug("--Igmpv1MemershipReport--" + igmpStats.getStat(IgmpStatisticType.IGMP_V1_MEMBERSHIP_REPORT));
265 log.debug("--Igmpv2LeaveGroup--" + igmpStats.getStat(IgmpStatisticType.IGMP_V2_LEAVE_GROUP));
266 log.debug("--Igmpv2MembershipReport--" + igmpStats.getStat(IgmpStatisticType.IGMP_V2_MEMBERSHIP_REPORT));
267 log.debug("--Igmpv3MembershipQuery--" + igmpStats.getStat(IgmpStatisticType.IGMP_V3_MEMBERSHIP_QUERY));
268 log.debug("--Igmpv3MembershipReport--" + igmpStats.getStat(IgmpStatisticType.IGMP_V3_MEMBERSHIP_REPORT));
269 log.debug("--InvalidIgmpMsgReceived--" + igmpStats.getStat(IgmpStatisticType.INVALID_IGMP_MSG_RECEIVED));
270 log.debug("--TotalMsgReceived-- " + igmpStats.getStat(IgmpStatisticType.TOTAL_MSG_RECEIVED));
271 log.debug("--UnknownIgmpTypePacketsRx--" +
272 igmpStats.getStat(IgmpStatisticType.UNKNOWN_IGMP_TYPE_PACKETS_RX_COUNTER));
273 log.debug("--ReportsRxWithWrongMode--" +
274 igmpStats.getStat(IgmpStatisticType.REPORTS_RX_WITH_WRONG_MODE_COUNTER));
275 log.debug("--FailJoinReqInsuffPermission--" +
276 igmpStats.getStat(IgmpStatisticType.FAIL_JOIN_REQ_INSUFF_PERMISSION_ACCESS_COUNTER));
277 log.debug("--FailJoinReqUnknownMulticastIp--" +
278 igmpStats.getStat(IgmpStatisticType.FAIL_JOIN_REQ_UNKNOWN_MULTICAST_IP_COUNTER));
279 log.debug("--UnconfiguredGroupCounter--" + igmpStats.getStat(IgmpStatisticType.UNCONFIGURED_GROUP_COUNTER));
280 log.debug("--ValidIgmpPacketCounter--" + igmpStats.getStat(IgmpStatisticType.VALID_IGMP_PACKET_COUNTER));
281 log.debug("--IgmpChannelJoinCounter--" + igmpStats.getStat(IgmpStatisticType.IGMP_CHANNEL_JOIN_COUNTER));
282 log.debug("--CurrentGrpNumCounter--" + igmpStats.getStat(IgmpStatisticType.CURRENT_GRP_NUMBER_COUNTER));
283 log.debug("--IgmpValidChecksumCounter--" +
284 igmpStats.getStat(IgmpStatisticType.IGMP_VALID_CHECKSUM_COUNTER));
285 log.debug("--InvalidIgmpLength--" + igmpStats.getStat(IgmpStatisticType.INVALID_IGMP_LENGTH));
286 log.debug("--IgmpGeneralMembershipQuery--" +
287 igmpStats.getStat(IgmpStatisticType.IGMP_GENERAL_MEMBERSHIP_QUERY));
288 log.debug("--IgmpGrpSpecificMembershipQuery--" +
289 igmpStats.getStat(IgmpStatisticType.IGMP_GRP_SPECIFIC_MEMBERSHIP_QUERY));
290 log.debug("--IgmpGrpAndSrcSpecificMembershipQuery--" +
291 igmpStats.getStat(IgmpStatisticType.IGMP_GRP_AND_SRC_SPESIFIC_MEMBERSHIP_QUERY));
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000292 }
293
294 post(new IgmpStatisticsEvent(IgmpStatisticsEvent.Type.STATS_UPDATE, igmpStats));
295 }
296
Ilayda Ozdemir0872abd2020-06-03 20:20:20 +0300297 @Override
298 public void increaseStat(IgmpStatisticType type) {
299 igmpStats.increaseStat(type);
300 validityCheck.set(false);
301 }
302
303 @Override
304 public void resetAllStats() {
305 ClusterMessage reset = new ClusterMessage(leadershipManager.getLocalNodeId(), RESET_SUBJECT, new byte[]{});
306 clusterCommunicationService.broadcastIncludeSelf(reset, RESET_SUBJECT,
307 Serializer.using(statSerializer)::encode);
308 }
309
310 @Override
311 public Long getStat(IgmpStatisticType type) {
312 return igmpStats.getStat(type);
313 }
Shubham Sharma47f2caf2020-02-18 12:13:40 +0000314}