[SEBA-624] Implementation of radius server operational status

cherry-picked from kafka-onos-1.2

Change-Id: I87002694538ebb6dd59e356b0c99aa335f20caa5
diff --git a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
index e903a98..2ee8366 100644
--- a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
@@ -19,7 +19,6 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import java.time.Instant;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.device.DeviceService;
 import org.opencord.aaa.AuthenticationEvent;
@@ -28,6 +27,9 @@
 import org.opencord.aaa.AuthenticationStatisticsEvent;
 import org.opencord.aaa.AuthenticationStatisticsEventListener;
 import org.opencord.aaa.AuthenticationStatisticsService;
+import org.opencord.aaa.RadiusOperationalStatusEvent;
+import org.opencord.aaa.RadiusOperationalStatusEventListener;
+import org.opencord.aaa.RadiusOperationalStatusService;
 import org.opencord.kafka.EventBusService;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
@@ -36,6 +38,7 @@
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.osgi.service.component.annotations.ReferencePolicy;
 
+import java.time.Instant;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
@@ -63,14 +66,24 @@
     protected volatile AuthenticationStatisticsService ignore2;
     private final AtomicReference<AuthenticationStatisticsService> authStatServiceRef = new AtomicReference<>();
 
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL,
+            policy = ReferencePolicy.DYNAMIC,
+            bind = "bindRadiusOperationalStatusService",
+            unbind = "unbindRadiusOperationalStatusService")
+    protected volatile RadiusOperationalStatusService ignore3;
+    protected final AtomicReference<RadiusOperationalStatusService> radiusOperationalStatusServiceRef
+            = new AtomicReference<>();
+
     private final AuthenticationEventListener listener = new InternalAuthenticationListener();
     private final AuthenticationStatisticsEventListener authenticationStatisticsEventListener =
             new InternalAuthenticationStatisticsListner();
+    private final RadiusOperationalStatusEventListener radiusOperationalStatusEventListener =
+            new InternalRadiusOperationalStatusEventListener();
 
     // topics
     private static final String TOPIC = "authentication.events";
     private static final String AUTHENTICATION_STATISTICS_TOPIC = "onos.aaa.stats.kpis";
-
+    private static final String RADIUS_OPERATION_STATUS_TOPIC = "radiusOperationalStatus.events";
     // auth event params
     private static final String TIMESTAMP = "timestamp";
     private static final String DEVICE_ID = "deviceId";
@@ -109,6 +122,8 @@
     private static final String PENDING_RES_SUPPLICANT = "pendingResSupplicant";
     private static final String RES_ID_EAP_FRAMES_RX = "resIdEapFramesRx";
 
+    private static final String OPERATIONAL_STATUS = "radiusOperationalStatus";
+
     protected void bindAuthenticationService(AuthenticationService incomingService) {
         bindAndAddListener(incomingService, authServiceRef, listener);
     }
@@ -125,6 +140,17 @@
         unbindAndRemoveListener(outgoingService, authStatServiceRef, authenticationStatisticsEventListener);
     }
 
+    protected void bindRadiusOperationalStatusService(
+            RadiusOperationalStatusService radiusOperationalStatusService) {
+        bindAndAddListener(radiusOperationalStatusService, radiusOperationalStatusServiceRef,
+                radiusOperationalStatusEventListener);
+    }
+
+    protected void unbindRadiusOperationalStatusService(RadiusOperationalStatusService radiusOperationalStatusService) {
+        unbindAndRemoveListener(radiusOperationalStatusService, radiusOperationalStatusServiceRef,
+                radiusOperationalStatusEventListener);
+    }
+
     @Activate
     public void activate() {
         log.info("Started AaaKafkaIntegration");
@@ -146,6 +172,11 @@
         log.trace("AuthenticationStatisticsEvent sent successfully");
     }
 
+    private void handleOperationalStatus(RadiusOperationalStatusEvent event) {
+        eventBusService.send(RADIUS_OPERATION_STATUS_TOPIC, serializeOperationalStatus(event));
+        log.info("RadiusOperationalStatusEvent sent successfully");
+    }
+
     private JsonNode serialize(AuthenticationEvent event) {
         String sn = deviceService.getPort(event.subject()).annotations().value(AnnotationKeys.PORT_NAME);
 
@@ -196,6 +227,16 @@
         return authMetricsEvent;
     }
 
+    private JsonNode serializeOperationalStatus(RadiusOperationalStatusEvent event) {
+        log.info("Serializing RadiusOperationalStatusEvent");
+        ObjectMapper mapper = new ObjectMapper();
+        ObjectNode authMetricsEvent = mapper.createObjectNode();
+        authMetricsEvent.put(TIMESTAMP, Instant.now().toString());
+        log.info("---OPERATIONAL_STATUS----" + event.subject());
+        authMetricsEvent.put(OPERATIONAL_STATUS, event.subject());
+        return authMetricsEvent;
+    }
+
     private class InternalAuthenticationListener implements
     AuthenticationEventListener {
         @Override
@@ -211,4 +252,13 @@
             handleStat(authenticationStatisticsEvent);
         }
     }
+
+    private class InternalRadiusOperationalStatusEventListener implements
+           RadiusOperationalStatusEventListener {
+        @Override
+        public void event(RadiusOperationalStatusEvent radiusOperationalStatusEvent) {
+            handleOperationalStatus(radiusOperationalStatusEvent);
+        }
+
+    }
 }