blob: f4efb0c17836b5520b1e3bd8dfa22ed75b19ffeb [file] [log] [blame]
Jonathan Hart501f7882018-07-24 14:39:57 -07001/*
2 * Copyright 2018-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17package org.opencord.kafka.integrations;
Jonathan Hart501f7882018-07-24 14:39:57 -070018import com.fasterxml.jackson.databind.JsonNode;
19import com.fasterxml.jackson.databind.ObjectMapper;
20import com.fasterxml.jackson.databind.node.ObjectNode;
21import org.apache.felix.scr.annotations.Activate;
22import org.apache.felix.scr.annotations.Component;
23import org.apache.felix.scr.annotations.Deactivate;
24import org.apache.felix.scr.annotations.Reference;
25import org.apache.felix.scr.annotations.ReferenceCardinality;
Matteo Scandolod50a4d32019-04-24 12:10:21 -070026import org.apache.felix.scr.annotations.ReferencePolicy;
Matteo Scandolo580fb7f2019-04-17 15:33:15 -070027import org.onosproject.net.AnnotationKeys;
28import org.onosproject.net.device.DeviceService;
Jonathan Hart501f7882018-07-24 14:39:57 -070029import org.opencord.aaa.AuthenticationEvent;
30import org.opencord.aaa.AuthenticationEventListener;
31import org.opencord.aaa.AuthenticationService;
32import org.opencord.kafka.EventBusService;
kartikey dubey6e903092019-05-22 13:35:28 +000033import org.opencord.aaa.AuthenticationStatisticsEvent;
34import org.opencord.aaa.AuthenticationStatisticsEventListener;
35import org.opencord.aaa.AuthenticationStatisticsService;
Jonathan Hart501f7882018-07-24 14:39:57 -070036import org.slf4j.Logger;
37import org.slf4j.LoggerFactory;
38
Jonathan Hart2aad7792018-07-31 15:09:17 -040039import java.time.Instant;
40
Jonathan Hart501f7882018-07-24 14:39:57 -070041/**
42 * Listens for AAA events and pushes them on a Kafka bus.
43 */
44@Component(immediate = true)
45public class AaaKafkaIntegration {
46
47 public Logger log = LoggerFactory.getLogger(getClass());
48
49 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
50 protected EventBusService eventBusService;
51
Matteo Scandolo580fb7f2019-04-17 15:33:15 -070052 @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
53 protected DeviceService deviceService;
54
Matteo Scandolo03f13c12019-03-20 14:38:12 -070055 @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
Matteo Scandolod50a4d32019-04-24 12:10:21 -070056 policy = ReferencePolicy.DYNAMIC,
Matteo Scandolo03f13c12019-03-20 14:38:12 -070057 bind = "bindAuthenticationService",
58 unbind = "unbindAuthenticationService")
Jonathan Hart501f7882018-07-24 14:39:57 -070059 protected AuthenticationService authenticationService;
kartikey dubey6e903092019-05-22 13:35:28 +000060 @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
61 policy = ReferencePolicy.DYNAMIC,
62 bind = "bindAuthenticationStatService",
63 unbind = "unbindAuthenticationStatService")
64 protected AuthenticationStatisticsService authenticationStatisticsService;
Jonathan Hart501f7882018-07-24 14:39:57 -070065
66 private final AuthenticationEventListener listener = new InternalAuthenticationListener();
kartikey dubey6e903092019-05-22 13:35:28 +000067 private final AuthenticationStatisticsEventListener authenticationStatisticsEventListener =
68 new InternalAuthenticationStatisticsListner();
Jonathan Hart501f7882018-07-24 14:39:57 -070069
kartikey dubey6e903092019-05-22 13:35:28 +000070 // topics
Jonathan Hart501f7882018-07-24 14:39:57 -070071 private static final String TOPIC = "authentication.events";
kartikey dubey6e903092019-05-22 13:35:28 +000072 private static final String AUTHENTICATION_STATISTICS_TOPIC = "onos.aaa.stats.kpis";
Jonathan Hart501f7882018-07-24 14:39:57 -070073
kartikey dubey6e903092019-05-22 13:35:28 +000074 // auth event params
Jonathan Hart2aad7792018-07-31 15:09:17 -040075 private static final String TIMESTAMP = "timestamp";
Jonathan Hartf54e5ba2018-07-31 14:57:22 -040076 private static final String DEVICE_ID = "deviceId";
77 private static final String PORT_NUMBER = "portNumber";
Matteo Scandolo580fb7f2019-04-17 15:33:15 -070078 private static final String SERIAL_NUMBER = "serialNumber";
Jonathan Hartf54e5ba2018-07-31 14:57:22 -040079 private static final String AUTHENTICATION_STATE = "authenticationState";
Jonathan Hart501f7882018-07-24 14:39:57 -070080
kartikey dubey6e903092019-05-22 13:35:28 +000081 // auth stats event params
82 private static final String ACCEPT_RESPONSES_RX = "acceptResponsesRx";
83 private static final String REJECT_RESPONSES_RX = "rejectResponsesRx";
84 private static final String CHALLENGE_RESPONSES_RX = "challengeResponsesRx";
85 private static final String ACCESS_REQUESTS_TX = "accessRequestsTx";
86 private static final String INVALID_VALIDATORS_RX = "invalidValidatorsRx";
87 private static final String UNKNOWN_TYPE_RX = "unknownTypeRx";
88 private static final String PENDING_REQUESTS = "pendingRequests";
89 private static final String DROPPED_RESPONSES_RX = "droppedResponsesRx";
90 private static final String MALFORMED_RESPONSES_RX = "malformedResponsesRx";
91 private static final String UNKNOWN_SERVER_RX = "unknownServerRx";
92 private static final String REQUEST_RTT_MILLIS = "requestRttMillis";
93 private static final String REQUEST_RE_TX = "requestReTx";
94
Matteo Scandolo03f13c12019-03-20 14:38:12 -070095 protected void bindAuthenticationService(AuthenticationService authenticationService) {
Matteo Scandolod50a4d32019-04-24 12:10:21 -070096 log.info("bindAuthenticationService");
Matteo Scandolo03f13c12019-03-20 14:38:12 -070097 if (this.authenticationService == null) {
98 log.info("Binding AuthenticationService");
99 this.authenticationService = authenticationService;
100 log.info("Adding listener on AuthenticationService");
101 authenticationService.addListener(listener);
102 } else {
103 log.warn("Trying to bind AuthenticationService but it is already bound");
104 }
105 }
106
107 protected void unbindAuthenticationService(AuthenticationService authenticationService) {
Matteo Scandolod50a4d32019-04-24 12:10:21 -0700108 log.info("unbindAuthenticationService");
Matteo Scandolo03f13c12019-03-20 14:38:12 -0700109 if (this.authenticationService == authenticationService) {
110 log.info("Unbinding AuthenticationService");
111 this.authenticationService = null;
112 log.info("Removing listener on AuthenticationService");
113 authenticationService.removeListener(listener);
114 } else {
115 log.warn("Trying to unbind AuthenticationService but it is already unbound");
116 }
117 }
118
kartikey dubey6e903092019-05-22 13:35:28 +0000119 protected void bindAuthenticationStatService(AuthenticationStatisticsService authenticationStatisticsService) {
120 log.info("bindAuthenticationStatService");
121 if (this.authenticationStatisticsService == null) {
122 log.info("Binding AuthenticationStastService");
123 this.authenticationStatisticsService = authenticationStatisticsService;
124 log.info("Adding listener on AuthenticationStatService");
125 authenticationStatisticsService.addListener(authenticationStatisticsEventListener);
126 } else {
127 log.warn("Trying to bind AuthenticationStatService but it is already bound");
128 }
129 }
130
131 protected void unbindAuthenticationStatService(AuthenticationStatisticsService authenticationStatisticsService) {
132 log.info("unbindAuthenticationStatService");
133 if (this.authenticationStatisticsService == authenticationStatisticsService) {
134 log.info("Unbinding AuthenticationStatService");
135 this.authenticationStatisticsService = null;
136 log.info("Removing listener on AuthenticationStatService");
137 authenticationStatisticsService.removeListener(authenticationStatisticsEventListener);
138 } else {
139 log.warn("Trying to unbind AuthenticationStatService but it is already unbound");
140 }
141 }
142
Jonathan Hart501f7882018-07-24 14:39:57 -0700143 @Activate
144 public void activate() {
Matteo Scandolod50a4d32019-04-24 12:10:21 -0700145 log.info("Started AaaKafkaIntegration");
Jonathan Hart501f7882018-07-24 14:39:57 -0700146 }
147
148 @Deactivate
149 public void deactivate() {
Matteo Scandolod50a4d32019-04-24 12:10:21 -0700150 log.info("Stopped AaaKafkaIntegration");
Jonathan Hart501f7882018-07-24 14:39:57 -0700151 }
152
153 private void handle(AuthenticationEvent event) {
154 eventBusService.send(TOPIC, serialize(event));
155 }
156
kartikey dubey6e903092019-05-22 13:35:28 +0000157 private void handleStat(AuthenticationStatisticsEvent event) {
158 eventBusService.send(AUTHENTICATION_STATISTICS_TOPIC, serializeStat(event));
159 log.info("AuthenticationStatisticsEvent sent successfully");
160 }
Matteo Scandolo580fb7f2019-04-17 15:33:15 -0700161
kartikey dubey6e903092019-05-22 13:35:28 +0000162 private JsonNode serialize(AuthenticationEvent event) {
Matteo Scandolo580fb7f2019-04-17 15:33:15 -0700163 String sn = deviceService.getPort(event.subject()).annotations().value(AnnotationKeys.PORT_NAME);
164
Jonathan Hart501f7882018-07-24 14:39:57 -0700165 ObjectMapper mapper = new ObjectMapper();
166 ObjectNode authEvent = mapper.createObjectNode();
Jonathan Hart2aad7792018-07-31 15:09:17 -0400167 authEvent.put(TIMESTAMP, Instant.now().toString());
Jonathan Hart501f7882018-07-24 14:39:57 -0700168 authEvent.put(DEVICE_ID, event.subject().deviceId().toString());
169 authEvent.put(PORT_NUMBER, event.subject().port().toString());
Matteo Scandolo580fb7f2019-04-17 15:33:15 -0700170 authEvent.put(SERIAL_NUMBER, sn);
Jonathan Hart501f7882018-07-24 14:39:57 -0700171 authEvent.put(AUTHENTICATION_STATE, event.type().toString());
172 return authEvent;
173 }
174
kartikey dubey6e903092019-05-22 13:35:28 +0000175 private JsonNode serializeStat(AuthenticationStatisticsEvent event) {
176 log.info("Serializing AuthenticationStatisticsEvent");
177 ObjectMapper mapper = new ObjectMapper();
178 ObjectNode authMetricsEvent = mapper.createObjectNode();
179 authMetricsEvent.put(TIMESTAMP, Instant.now().toString());
180 authMetricsEvent.put(ACCEPT_RESPONSES_RX, event.subject().getAcceptResponsesRx());
181 authMetricsEvent.put(REJECT_RESPONSES_RX, event.subject().getRejectResponsesRx());
182 authMetricsEvent.put(CHALLENGE_RESPONSES_RX, event.subject().getChallengeResponsesRx());
183 authMetricsEvent.put(ACCESS_REQUESTS_TX, event.subject().getAccessRequestsTx());
184 authMetricsEvent.put(INVALID_VALIDATORS_RX, event.subject().getInvalidValidatorsRx());
185 authMetricsEvent.put(UNKNOWN_TYPE_RX, event.subject().getUnknownTypeRx());
186 authMetricsEvent.put(PENDING_REQUESTS, event.subject().getPendingRequests());
187 authMetricsEvent.put(DROPPED_RESPONSES_RX, event.subject().getDroppedResponsesRx());
188 authMetricsEvent.put(MALFORMED_RESPONSES_RX, event.subject().getMalformedResponsesRx());
189 authMetricsEvent.put(UNKNOWN_SERVER_RX, event.subject().getUnknownServerRx());
190 authMetricsEvent.put(REQUEST_RTT_MILLIS, event.subject().getRequestRttMilis());
191 authMetricsEvent.put(REQUEST_RE_TX, event.subject().getRequestReTx());
192 return authMetricsEvent;
193 }
194
Jonathan Hart501f7882018-07-24 14:39:57 -0700195 private class InternalAuthenticationListener implements
196 AuthenticationEventListener {
Jonathan Hart501f7882018-07-24 14:39:57 -0700197 @Override
198 public void event(AuthenticationEvent authenticationEvent) {
199 handle(authenticationEvent);
200 }
201 }
kartikey dubey6e903092019-05-22 13:35:28 +0000202
203 private class InternalAuthenticationStatisticsListner implements
204 AuthenticationStatisticsEventListener {
205 @Override
206 public void event(AuthenticationStatisticsEvent authenticationStatisticsEvent) {
207 handleStat(authenticationStatisticsEvent);
208 }
209 }
210}