[SEBA-624] Implementation of radius server operational status
Change-Id: Id40fb124a05f0797dd823efd082bedc2a4744cf7
diff --git a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
index fe7051b..d67b6a1 100644
--- a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
@@ -32,6 +32,9 @@
import org.opencord.aaa.AuthenticationStatisticsEventListener;
import org.opencord.aaa.AuthenticationStatisticsService;
import org.opencord.kafka.EventBusService;
+import org.opencord.aaa.RadiusOperationalStatusEvent;
+import org.opencord.aaa.RadiusOperationalStatusEventListener;
+import org.opencord.aaa.RadiusOperationalStatusService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,14 +67,21 @@
unbind = "unbindAuthenticationStatService")
protected AuthenticationStatisticsService authenticationStatisticsService;
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+ bind = "bindRadiusOperationalStatusService",
+ unbind = "unbindRadiusOperationalStatusService")
+ protected RadiusOperationalStatusService radiusOperationalStatusService;
+
private final AuthenticationEventListener listener = new InternalAuthenticationListener();
private final AuthenticationStatisticsEventListener authenticationStatisticsEventListener =
new InternalAuthenticationStatisticsListner();
+ private final RadiusOperationalStatusEventListener radiusOperationalStatusEventListener =
+ new InternalRadiusOperationalStatusEventListener();
// topics
private static final String TOPIC = "authentication.events";
private static final String AUTHENTICATION_STATISTICS_TOPIC = "onos.aaa.stats.kpis";
-
+ private static final String RADIUS_OPERATION_STATUS_TOPIC = "radiusOperationalStatus.events";
// auth event params
private static final String TIMESTAMP = "timestamp";
private static final String DEVICE_ID = "deviceId";
@@ -110,6 +120,8 @@
private static final String PENDING_RES_SUPPLICANT = "pendingResSupplicant";
private static final String RES_ID_EAP_FRAMES_RX = "resIdEapFramesRx";
+ private static final String OPERATIONAL_STATUS = "radiusOperationalStatus";
+
protected void bindAuthenticationService(AuthenticationService authenticationService) {
log.info("bindAuthenticationService");
if (this.authenticationService == null) {
@@ -158,6 +170,32 @@
}
}
+ protected void bindRadiusOperationalStatusService(
+ RadiusOperationalStatusService radiusOperationalStatusService) {
+ log.info("bindRadiusOperationalStatusService");
+ if (this.radiusOperationalStatusService == null) {
+ log.info("Binding RadiusOperationalStatusService");
+ this.radiusOperationalStatusService = radiusOperationalStatusService;
+ log.info("Adding listener on RadiusOperationalStatusService");
+ radiusOperationalStatusService.addListener(radiusOperationalStatusEventListener);
+ } else {
+ log.warn("Trying to bind radiusOperationalStatusService but it is already bound");
+ }
+ }
+
+ protected void unbindRadiusOperationalStatusService(
+ RadiusOperationalStatusService radiusOperationalStatusService) {
+ log.info("unbindRadiusOperationalStatusService");
+ if (this.radiusOperationalStatusService == radiusOperationalStatusService) {
+ log.info("Unbind RadiusOperationalStatusService");
+ this.radiusOperationalStatusService = null;
+ log.info("Removing listener on RadiusOperationalStatusService");
+ radiusOperationalStatusService.removeListener(radiusOperationalStatusEventListener);
+ } else {
+ log.warn("Trying to unbind radiusOperationalStatusService but it is already unbound");
+ }
+ }
+
@Activate
public void activate() {
log.info("Started AaaKafkaIntegration");
@@ -177,6 +215,11 @@
log.trace("AuthenticationStatisticsEvent sent successfully");
}
+ private void handleOperationalStatus(RadiusOperationalStatusEvent event) {
+ eventBusService.send(RADIUS_OPERATION_STATUS_TOPIC, serializeOperationalStatus(event));
+ log.info("RadiusOperationalStatusEvent sent successfully");
+ }
+
private JsonNode serialize(AuthenticationEvent event) {
String sn = deviceService.getPort(event.subject()).annotations().value(AnnotationKeys.PORT_NAME);
@@ -227,6 +270,16 @@
return authMetricsEvent;
}
+ private JsonNode serializeOperationalStatus(RadiusOperationalStatusEvent event) {
+ log.info("Serializing RadiusOperationalStatusEvent");
+ ObjectMapper mapper = new ObjectMapper();
+ ObjectNode authMetricsEvent = mapper.createObjectNode();
+ authMetricsEvent.put(TIMESTAMP, Instant.now().toString());
+ log.info("---OPERATIONAL_STATUS----" + event.subject());
+ authMetricsEvent.put(OPERATIONAL_STATUS, event.subject());
+ return authMetricsEvent;
+ }
+
private class InternalAuthenticationListener implements
AuthenticationEventListener {
@Override
@@ -242,4 +295,13 @@
handleStat(authenticationStatisticsEvent);
}
}
+
+ private class InternalRadiusOperationalStatusEventListener implements
+ RadiusOperationalStatusEventListener {
+ @Override
+ public void event(RadiusOperationalStatusEvent radiusOperationalStatusEvent) {
+ handleOperationalStatus(radiusOperationalStatusEvent);
+ }
+
+ }
}