blob: 8bb2f53e38dd65abd00f00bc6284f181562b9c10 [file] [log] [blame]
/*
 * Copyright 2018-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17package org.opencord.aaa.impl;
18
Jonathan Hartc41227c2020-01-28 16:56:49 -080019import com.google.common.base.Strings;
20import org.onlab.util.KryoNamespace;
21import org.onlab.util.SafeRecurringTask;
22import org.onlab.util.Tools;
23import org.onosproject.cluster.ClusterService;
24import org.onosproject.cluster.LeadershipService;
25import org.onosproject.cluster.NodeId;
kartikey dubeye1545422019-05-22 12:53:45 +000026import org.onosproject.event.AbstractListenerManager;
Jonathan Hartc41227c2020-01-28 16:56:49 -080027import org.onosproject.store.cluster.messaging.ClusterCommunicationService;
28import org.onosproject.store.cluster.messaging.ClusterMessage;
29import org.onosproject.store.cluster.messaging.MessageSubject;
30import org.onosproject.store.serializers.KryoNamespaces;
31import org.onosproject.store.service.EventuallyConsistentMap;
32import org.onosproject.store.service.Serializer;
33import org.onosproject.store.service.StorageService;
34import org.onosproject.store.service.WallClockTimestamp;
kartikey dubeye1545422019-05-22 12:53:45 +000035import org.opencord.aaa.AaaStatistics;
Jonathan Hartc41227c2020-01-28 16:56:49 -080036import org.opencord.aaa.AaaStatisticsSnapshot;
kartikey dubeye1545422019-05-22 12:53:45 +000037import org.opencord.aaa.AuthenticationStatisticsEvent;
38import org.opencord.aaa.AuthenticationStatisticsEventListener;
39import org.opencord.aaa.AuthenticationStatisticsService;
Jonathan Hartc41227c2020-01-28 16:56:49 -080040import org.osgi.service.component.ComponentContext;
Carmelo Cascone58b53292019-09-30 12:35:31 -070041import org.osgi.service.component.annotations.Activate;
Jonathan Hart612651f2019-11-25 09:21:43 -080042import org.osgi.service.component.annotations.Component;
Carmelo Cascone58b53292019-09-30 12:35:31 -070043import org.osgi.service.component.annotations.Deactivate;
Jonathan Hartc41227c2020-01-28 16:56:49 -080044import org.osgi.service.component.annotations.Modified;
45import org.osgi.service.component.annotations.Reference;
46import org.osgi.service.component.annotations.ReferenceCardinality;
kartikey dubeye1545422019-05-22 12:53:45 +000047import org.slf4j.Logger;
48
Jonathan Hartc41227c2020-01-28 16:56:49 -080049import java.util.Dictionary;
Jonathan Hart612651f2019-11-25 09:21:43 -080050import java.util.HashMap;
51import java.util.Map;
Jonathan Hartc41227c2020-01-28 16:56:49 -080052import java.util.Objects;
53import java.util.concurrent.Executors;
54import java.util.concurrent.ScheduledExecutorService;
55import java.util.concurrent.ScheduledFuture;
56import java.util.concurrent.TimeUnit;
Jonathan Hart612651f2019-11-25 09:21:43 -080057import java.util.concurrent.atomic.AtomicLong;
58
Jonathan Hartc41227c2020-01-28 16:56:49 -080059import static org.opencord.aaa.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD;
60import static org.opencord.aaa.impl.OsgiPropertyConstants.STATISTICS_GENERATION_PERIOD_DEFAULT;
61import static org.opencord.aaa.impl.OsgiPropertyConstants.STATISTICS_SYNC_PERIOD;
62import static org.opencord.aaa.impl.OsgiPropertyConstants.STATISTICS_SYNC_PERIOD_DEFAULT;
Jonathan Hart612651f2019-11-25 09:21:43 -080063import static org.slf4j.LoggerFactory.getLogger;
64
Jonathan Hartc41227c2020-01-28 16:56:49 -080065/**
66 * Manages collection and publishing of statistics for the AAA application.
67 */
68@Component(immediate = true, property = {
69 STATISTICS_GENERATION_PERIOD + ":Integer=" + STATISTICS_GENERATION_PERIOD_DEFAULT,
70 STATISTICS_SYNC_PERIOD + ":Integer=" + STATISTICS_SYNC_PERIOD_DEFAULT,
71})
kartikey dubeye1545422019-05-22 12:53:45 +000072public class AaaStatisticsManager
Jonathan Hartc41227c2020-01-28 16:56:49 -080073 extends AbstractListenerManager<AuthenticationStatisticsEvent, AuthenticationStatisticsEventListener>
74 implements AuthenticationStatisticsService {
kartikey dubeye1545422019-05-22 12:53:45 +000075
Jonathan Hartc41227c2020-01-28 16:56:49 -080076 private static final String AAA_STATISTICS_LEADERSHIP = "aaa-statistics";
kartikey dubeye1545422019-05-22 12:53:45 +000077
Jonathan Hartc41227c2020-01-28 16:56:49 -080078 private static final MessageSubject RESET_SUBJECT = new MessageSubject("aaa-statistics-reset");
79
80 private int statisticsGenerationPeriodInSeconds = STATISTICS_GENERATION_PERIOD_DEFAULT;
81 private int statisticsSyncPeriodInSeconds = STATISTICS_SYNC_PERIOD_DEFAULT;
82
83 @Reference(cardinality = ReferenceCardinality.MANDATORY)
84 protected StorageService storageService;
85
86 @Reference(cardinality = ReferenceCardinality.MANDATORY)
87 protected ClusterService clusterService;
88
89 @Reference(cardinality = ReferenceCardinality.MANDATORY)
90 protected LeadershipService leadershipService;
91
92 @Reference(cardinality = ReferenceCardinality.MANDATORY)
93 protected ClusterCommunicationService clusterCommunicationService;
94
95 private ScheduledExecutorService executor;
96
97 private ScheduledFuture<?> publisherTask;
98 private ScheduledFuture<?> syncTask;
99
100 private EventuallyConsistentMap<NodeId, AaaStatisticsSnapshot> statistics;
kartikey dubeye1545422019-05-22 12:53:45 +0000101
102 private final Logger log = getLogger(getClass());
103 private AaaStatistics aaaStats;
Jonathan Hartc41227c2020-01-28 16:56:49 -0800104 private Map<Byte, Long> outgoingPacketMap = new HashMap<>();
kartikey dubeye1545422019-05-22 12:53:45 +0000105 private static final int PACKET_COUNT_FOR_AVERAGE_RTT_CALCULATION = 5;
106
Jonathan Hartc41227c2020-01-28 16:56:49 -0800107 KryoNamespace serializer = KryoNamespace.newBuilder()
108 .register(KryoNamespaces.API)
109 .register(AaaStatisticsSnapshot.class)
110 .register(ClusterMessage.class)
111 .register(MessageSubject.class)
112 .build();
113
Vijaykumar Kushwahaa54ce552019-06-18 09:37:42 +0000114 @Override
kartikey dubeye1545422019-05-22 12:53:45 +0000115 public AaaStatistics getAaaStats() {
116 return aaaStats;
117 }
118
Jonathan Hartc41227c2020-01-28 16:56:49 -0800119 @Override
120 public AaaStatisticsSnapshot getClusterStatistics() {
121 return aggregate();
122 }
123
kartikey dubeye1545422019-05-22 12:53:45 +0000124 @Activate
Jonathan Hartc41227c2020-01-28 16:56:49 -0800125 public void activate(ComponentContext context) {
kartikey dubeye1545422019-05-22 12:53:45 +0000126 log.info("Activate aaaStatisticsManager");
Jonathan Hartc41227c2020-01-28 16:56:49 -0800127 modified(context);
128
129 statistics = storageService.<NodeId, AaaStatisticsSnapshot>eventuallyConsistentMapBuilder()
130 .withName("aaa-statistics")
131 .withSerializer(serializer)
132 .withTimestampProvider((k, v) -> new WallClockTimestamp())
133 .build();
134
135 AaaStatisticsSnapshot snapshot = statistics.get(clusterService.getLocalNode().id());
136 if (snapshot == null) {
137 aaaStats = new AaaStatistics();
138 } else {
139 aaaStats = AaaStatistics.fromSnapshot(snapshot);
140 }
141
142 leadershipService.runForLeadership(AAA_STATISTICS_LEADERSHIP);
143
kartikey dubeye1545422019-05-22 12:53:45 +0000144 eventDispatcher.addSink(AuthenticationStatisticsEvent.class, listenerRegistry);
Jonathan Hartc41227c2020-01-28 16:56:49 -0800145
146 executor = Executors.newScheduledThreadPool(1);
147
148 clusterCommunicationService.addSubscriber(RESET_SUBJECT, Serializer.using(serializer)::decode,
149 this::resetLocal, executor);
150
151 syncTask = executor.scheduleAtFixedRate(SafeRecurringTask.wrap(this::syncStats),
152 0, statisticsSyncPeriodInSeconds, TimeUnit.SECONDS);
153
154 publisherTask = executor.scheduleAtFixedRate(SafeRecurringTask.wrap(this::publishStats),
155 0, statisticsGenerationPeriodInSeconds, TimeUnit.SECONDS);
kartikey dubeye1545422019-05-22 12:53:45 +0000156 }
157
158 @Deactivate
159 public void deactivate() {
Jonathan Hartc41227c2020-01-28 16:56:49 -0800160 clusterCommunicationService.removeSubscriber(RESET_SUBJECT);
161
162 publisherTask.cancel(true);
163 syncTask.cancel(true);
164 executor.shutdownNow();
165
166 leadershipService.withdraw(AAA_STATISTICS_LEADERSHIP);
167
kartikey dubeye1545422019-05-22 12:53:45 +0000168 eventDispatcher.removeSink(AuthenticationStatisticsEvent.class);
169 }
170
Jonathan Hartc41227c2020-01-28 16:56:49 -0800171 @Modified
172 public void modified(ComponentContext context) {
173 Dictionary<String, Object> properties = context.getProperties();
174
175 String s = Tools.get(properties, "statisticsGenerationPeriodInSeconds");
176 statisticsGenerationPeriodInSeconds = Strings.isNullOrEmpty(s) ? STATISTICS_GENERATION_PERIOD_DEFAULT
177 : Integer.parseInt(s.trim());
178
179 s = Tools.get(properties, "statisticsSyncPeriodInSeconds");
180 statisticsSyncPeriodInSeconds = Strings.isNullOrEmpty(s) ? STATISTICS_SYNC_PERIOD_DEFAULT
181 : Integer.parseInt(s.trim());
182 }
183
Vijaykumar Kushwahaa54ce552019-06-18 09:37:42 +0000184 @Override
kartikey dubeye1545422019-05-22 12:53:45 +0000185 public void handleRoundtripTime(byte inPacketIdentifier) {
186 long inTimeInMilis = System.currentTimeMillis();
187 if (outgoingPacketMap.containsKey(inPacketIdentifier)) {
Vijaykumar Kushwahaa54ce552019-06-18 09:37:42 +0000188 if (aaaStats.getPacketRoundTripTimeListSize() > PACKET_COUNT_FOR_AVERAGE_RTT_CALCULATION) {
189 aaaStats.getPacketRoundTripTimeListRemoveFirst();
kartikey dubeye1545422019-05-22 12:53:45 +0000190 }
Vijaykumar Kushwahaa54ce552019-06-18 09:37:42 +0000191 aaaStats.getPacketRoundTripTimeListAdd(inTimeInMilis - outgoingPacketMap.get(inPacketIdentifier));
kartikey dubeye1545422019-05-22 12:53:45 +0000192 }
193 }
194
Vijaykumar Kushwahaa54ce552019-06-18 09:37:42 +0000195 @Override
196 public void resetAllCounters() {
Jonathan Hartc41227c2020-01-28 16:56:49 -0800197 ClusterMessage reset = new ClusterMessage(clusterService.getLocalNode().id(), RESET_SUBJECT, new byte[]{});
198 clusterCommunicationService.broadcastIncludeSelf(reset, RESET_SUBJECT, Serializer.using(serializer)::encode);
Vijaykumar Kushwahaa54ce552019-06-18 09:37:42 +0000199 }
200
201 @Override
kartikey dubeye1545422019-05-22 12:53:45 +0000202 public void calculatePacketRoundtripTime() {
Vijaykumar Kushwahaa54ce552019-06-18 09:37:42 +0000203 if (aaaStats.getPacketRoundTripTimeListSize() > 0) {
204 long avg = (long) aaaStats.getPacketRoundTripTimeList().stream().mapToLong(i -> i).average().getAsDouble();
kartikey dubeye1545422019-05-22 12:53:45 +0000205 aaaStats.setRequestRttMilis(new AtomicLong(avg));
206 }
207 }
208
Vijaykumar Kushwahaa54ce552019-06-18 09:37:42 +0000209 @Override
kartikey dubeye1545422019-05-22 12:53:45 +0000210 public void putOutgoingIdentifierToMap(byte outPacketIdentifier) {
211 outgoingPacketMap.put(outPacketIdentifier, System.currentTimeMillis());
212 }
213
214 /**
Jonathan Hartc41227c2020-01-28 16:56:49 -0800215 * Pushes in-memory stats into the eventually-consistent map for cluster-wide retention.
kartikey dubeye1545422019-05-22 12:53:45 +0000216 */
Jonathan Hartc41227c2020-01-28 16:56:49 -0800217 private void syncStats() {
218 calculatePacketRoundtripTime();
219
220 statistics.put(clusterService.getLocalNode().id(), aaaStats.snapshot());
221 }
222
223 /**
224 * Aggregates cluster-wise stats from the ec-map.
225 *
226 * @return aggregate stats
227 */
228 private AaaStatisticsSnapshot aggregate() {
229 return statistics.values().stream()
230 .reduce(new AaaStatisticsSnapshot(), AaaStatisticsSnapshot::add);
231 }
232
233 /**
234 * Publishes cluster-wide stats.
235 */
236 private void publishStats() {
237 // only publish if we are the leader
238 if (!Objects.equals(leadershipService.getLeader(AAA_STATISTICS_LEADERSHIP),
239 clusterService.getLocalNode().id())) {
240 return;
kartikey dubeye1545422019-05-22 12:53:45 +0000241 }
Jonathan Hartc41227c2020-01-28 16:56:49 -0800242
243 AaaStatisticsSnapshot clusterStats = aggregate();
244
245 if (log.isDebugEnabled()) {
246 log.debug("Notifying stats: {}", clusterStats);
247 }
248
249 post(new AuthenticationStatisticsEvent(AuthenticationStatisticsEvent.Type.STATS_UPDATE,
250 AaaStatistics.fromSnapshot(clusterStats)));
251 }
252
253 private void resetLocal(ClusterMessage m) {
254 aaaStats.resetAllCounters();
255 syncStats();
kartikey dubeye1545422019-05-22 12:53:45 +0000256 }
257}