[SEBA-37] Added 11 Statistics counters for Accounting Server operations
Change-Id: I46f73de9a4a5c8156e028079d71688e7d63cfcdb
diff --git a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
index 316032e..f4efb0c 100644
--- a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
@@ -15,7 +15,6 @@
*/
package org.opencord.kafka.integrations;
-
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
@@ -31,6 +30,9 @@
import org.opencord.aaa.AuthenticationEventListener;
import org.opencord.aaa.AuthenticationService;
import org.opencord.kafka.EventBusService;
+import org.opencord.aaa.AuthenticationStatisticsEvent;
+import org.opencord.aaa.AuthenticationStatisticsEventListener;
+import org.opencord.aaa.AuthenticationStatisticsService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,17 +57,41 @@
bind = "bindAuthenticationService",
unbind = "unbindAuthenticationService")
protected AuthenticationService authenticationService;
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+ policy = ReferencePolicy.DYNAMIC,
+ bind = "bindAuthenticationStatService",
+ unbind = "unbindAuthenticationStatService")
+ protected AuthenticationStatisticsService authenticationStatisticsService;
private final AuthenticationEventListener listener = new InternalAuthenticationListener();
+ private final AuthenticationStatisticsEventListener authenticationStatisticsEventListener =
+ new InternalAuthenticationStatisticsListner();
+ // topics
private static final String TOPIC = "authentication.events";
+ private static final String AUTHENTICATION_STATISTICS_TOPIC = "onos.aaa.stats.kpis";
+ // auth event params
private static final String TIMESTAMP = "timestamp";
private static final String DEVICE_ID = "deviceId";
private static final String PORT_NUMBER = "portNumber";
private static final String SERIAL_NUMBER = "serialNumber";
private static final String AUTHENTICATION_STATE = "authenticationState";
+ // auth stats event params
+ private static final String ACCEPT_RESPONSES_RX = "acceptResponsesRx";
+ private static final String REJECT_RESPONSES_RX = "rejectResponsesRx";
+ private static final String CHALLENGE_RESPONSES_RX = "challengeResponsesRx";
+ private static final String ACCESS_REQUESTS_TX = "accessRequestsTx";
+ private static final String INVALID_VALIDATORS_RX = "invalidValidatorsRx";
+ private static final String UNKNOWN_TYPE_RX = "unknownTypeRx";
+ private static final String PENDING_REQUESTS = "pendingRequests";
+ private static final String DROPPED_RESPONSES_RX = "droppedResponsesRx";
+ private static final String MALFORMED_RESPONSES_RX = "malformedResponsesRx";
+ private static final String UNKNOWN_SERVER_RX = "unknownServerRx";
+ private static final String REQUEST_RTT_MILLIS = "requestRttMillis";
+ private static final String REQUEST_RE_TX = "requestReTx";
+
protected void bindAuthenticationService(AuthenticationService authenticationService) {
log.info("bindAuthenticationService");
if (this.authenticationService == null) {
@@ -90,6 +116,30 @@
}
}
+ protected void bindAuthenticationStatService(AuthenticationStatisticsService authenticationStatisticsService) {
+ log.info("bindAuthenticationStatService");
+ if (this.authenticationStatisticsService == null) {
+ log.info("Binding AuthenticationStastService");
+ this.authenticationStatisticsService = authenticationStatisticsService;
+ log.info("Adding listener on AuthenticationStatService");
+ authenticationStatisticsService.addListener(authenticationStatisticsEventListener);
+ } else {
+ log.warn("Trying to bind AuthenticationStatService but it is already bound");
+ }
+ }
+
+ protected void unbindAuthenticationStatService(AuthenticationStatisticsService authenticationStatisticsService) {
+ log.info("unbindAuthenticationStatService");
+ if (this.authenticationStatisticsService == authenticationStatisticsService) {
+ log.info("Unbinding AuthenticationStatService");
+ this.authenticationStatisticsService = null;
+ log.info("Removing listener on AuthenticationStatService");
+ authenticationStatisticsService.removeListener(authenticationStatisticsEventListener);
+ } else {
+ log.warn("Trying to unbind AuthenticationStatService but it is already unbound");
+ }
+ }
+
@Activate
public void activate() {
log.info("Started AaaKafkaIntegration");
@@ -104,8 +154,12 @@
eventBusService.send(TOPIC, serialize(event));
}
- private JsonNode serialize(AuthenticationEvent event) {
+ private void handleStat(AuthenticationStatisticsEvent event) {
+ eventBusService.send(AUTHENTICATION_STATISTICS_TOPIC, serializeStat(event));
+ log.info("AuthenticationStatisticsEvent sent successfully");
+ }
+ private JsonNode serialize(AuthenticationEvent event) {
String sn = deviceService.getPort(event.subject()).annotations().value(AnnotationKeys.PORT_NAME);
ObjectMapper mapper = new ObjectMapper();
@@ -118,12 +172,39 @@
return authEvent;
}
+ private JsonNode serializeStat(AuthenticationStatisticsEvent event) {
+ log.info("Serializing AuthenticationStatisticsEvent");
+ ObjectMapper mapper = new ObjectMapper();
+ ObjectNode authMetricsEvent = mapper.createObjectNode();
+ authMetricsEvent.put(TIMESTAMP, Instant.now().toString());
+ authMetricsEvent.put(ACCEPT_RESPONSES_RX, event.subject().getAcceptResponsesRx());
+ authMetricsEvent.put(REJECT_RESPONSES_RX, event.subject().getRejectResponsesRx());
+ authMetricsEvent.put(CHALLENGE_RESPONSES_RX, event.subject().getChallengeResponsesRx());
+ authMetricsEvent.put(ACCESS_REQUESTS_TX, event.subject().getAccessRequestsTx());
+ authMetricsEvent.put(INVALID_VALIDATORS_RX, event.subject().getInvalidValidatorsRx());
+ authMetricsEvent.put(UNKNOWN_TYPE_RX, event.subject().getUnknownTypeRx());
+ authMetricsEvent.put(PENDING_REQUESTS, event.subject().getPendingRequests());
+ authMetricsEvent.put(DROPPED_RESPONSES_RX, event.subject().getDroppedResponsesRx());
+ authMetricsEvent.put(MALFORMED_RESPONSES_RX, event.subject().getMalformedResponsesRx());
+ authMetricsEvent.put(UNKNOWN_SERVER_RX, event.subject().getUnknownServerRx());
+ authMetricsEvent.put(REQUEST_RTT_MILLIS, event.subject().getRequestRttMilis());
+ authMetricsEvent.put(REQUEST_RE_TX, event.subject().getRequestReTx());
+ return authMetricsEvent;
+ }
+
private class InternalAuthenticationListener implements
AuthenticationEventListener {
-
@Override
public void event(AuthenticationEvent authenticationEvent) {
handle(authenticationEvent);
}
}
-}
+
+ private class InternalAuthenticationStatisticsListner implements
+ AuthenticationStatisticsEventListener {
+ @Override
+ public void event(AuthenticationStatisticsEvent authenticationStatisticsEvent) {
+ handleStat(authenticationStatisticsEvent);
+ }
+ }
+}
\ No newline at end of file