[SEBA-624] Implementation of radius server operational status
cherry-picked from kafka-onos-1.2
Change-Id: I87002694538ebb6dd59e356b0c99aa335f20caa5
diff --git a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
index e903a98..2ee8366 100644
--- a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
@@ -19,7 +19,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import java.time.Instant;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.device.DeviceService;
import org.opencord.aaa.AuthenticationEvent;
@@ -28,6 +27,9 @@
import org.opencord.aaa.AuthenticationStatisticsEvent;
import org.opencord.aaa.AuthenticationStatisticsEventListener;
import org.opencord.aaa.AuthenticationStatisticsService;
+import org.opencord.aaa.RadiusOperationalStatusEvent;
+import org.opencord.aaa.RadiusOperationalStatusEventListener;
+import org.opencord.aaa.RadiusOperationalStatusService;
import org.opencord.kafka.EventBusService;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
@@ -36,6 +38,7 @@
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
+import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -63,14 +66,24 @@
protected volatile AuthenticationStatisticsService ignore2;
private final AtomicReference<AuthenticationStatisticsService> authStatServiceRef = new AtomicReference<>();
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL,
+ policy = ReferencePolicy.DYNAMIC,
+ bind = "bindRadiusOperationalStatusService",
+ unbind = "unbindRadiusOperationalStatusService")
+ protected volatile RadiusOperationalStatusService ignore3;
+ protected final AtomicReference<RadiusOperationalStatusService> radiusOperationalStatusServiceRef
+ = new AtomicReference<>();
+
private final AuthenticationEventListener listener = new InternalAuthenticationListener();
private final AuthenticationStatisticsEventListener authenticationStatisticsEventListener =
new InternalAuthenticationStatisticsListner();
+ private final RadiusOperationalStatusEventListener radiusOperationalStatusEventListener =
+ new InternalRadiusOperationalStatusEventListener();
// topics
private static final String TOPIC = "authentication.events";
private static final String AUTHENTICATION_STATISTICS_TOPIC = "onos.aaa.stats.kpis";
-
+ private static final String RADIUS_OPERATION_STATUS_TOPIC = "radiusOperationalStatus.events";
// auth event params
private static final String TIMESTAMP = "timestamp";
private static final String DEVICE_ID = "deviceId";
@@ -109,6 +122,8 @@
private static final String PENDING_RES_SUPPLICANT = "pendingResSupplicant";
private static final String RES_ID_EAP_FRAMES_RX = "resIdEapFramesRx";
+ private static final String OPERATIONAL_STATUS = "radiusOperationalStatus";
+
protected void bindAuthenticationService(AuthenticationService incomingService) {
bindAndAddListener(incomingService, authServiceRef, listener);
}
@@ -125,6 +140,17 @@
unbindAndRemoveListener(outgoingService, authStatServiceRef, authenticationStatisticsEventListener);
}
+ protected void bindRadiusOperationalStatusService(
+ RadiusOperationalStatusService radiusOperationalStatusService) {
+ bindAndAddListener(radiusOperationalStatusService, radiusOperationalStatusServiceRef,
+ radiusOperationalStatusEventListener);
+ }
+
+ protected void unbindRadiusOperationalStatusService(RadiusOperationalStatusService radiusOperationalStatusService) {
+ unbindAndRemoveListener(radiusOperationalStatusService, radiusOperationalStatusServiceRef,
+ radiusOperationalStatusEventListener);
+ }
+
@Activate
public void activate() {
log.info("Started AaaKafkaIntegration");
@@ -146,6 +172,11 @@
log.trace("AuthenticationStatisticsEvent sent successfully");
}
+ private void handleOperationalStatus(RadiusOperationalStatusEvent event) {
+ eventBusService.send(RADIUS_OPERATION_STATUS_TOPIC, serializeOperationalStatus(event));
+ log.info("RadiusOperationalStatusEvent sent successfully");
+ }
+
private JsonNode serialize(AuthenticationEvent event) {
String sn = deviceService.getPort(event.subject()).annotations().value(AnnotationKeys.PORT_NAME);
@@ -196,6 +227,16 @@
return authMetricsEvent;
}
+ private JsonNode serializeOperationalStatus(RadiusOperationalStatusEvent event) {
+ log.info("Serializing RadiusOperationalStatusEvent");
+ ObjectMapper mapper = new ObjectMapper();
+ ObjectNode authMetricsEvent = mapper.createObjectNode();
+ authMetricsEvent.put(TIMESTAMP, Instant.now().toString());
+ log.info("---OPERATIONAL_STATUS----" + event.subject());
+ authMetricsEvent.put(OPERATIONAL_STATUS, event.subject());
+ return authMetricsEvent;
+ }
+
private class InternalAuthenticationListener implements
AuthenticationEventListener {
@Override
@@ -211,4 +252,13 @@
handleStat(authenticationStatisticsEvent);
}
}
+
+ private class InternalRadiusOperationalStatusEventListener implements
+ RadiusOperationalStatusEventListener {
+ @Override
+ public void event(RadiusOperationalStatusEvent radiusOperationalStatusEvent) {
+ handleOperationalStatus(radiusOperationalStatusEvent);
+ }
+
+ }
}