[SEBA-624] Implementation of radius server operational status

Change-Id: Id40fb124a05f0797dd823efd082bedc2a4744cf7
diff --git a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
index fe7051b..d67b6a1 100644
--- a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
@@ -32,6 +32,9 @@
 import org.opencord.aaa.AuthenticationStatisticsEventListener;
 import org.opencord.aaa.AuthenticationStatisticsService;
 import org.opencord.kafka.EventBusService;
+import org.opencord.aaa.RadiusOperationalStatusEvent;
+import org.opencord.aaa.RadiusOperationalStatusEventListener;
+import org.opencord.aaa.RadiusOperationalStatusService;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -64,14 +67,21 @@
             unbind = "unbindAuthenticationStatService")
     protected AuthenticationStatisticsService authenticationStatisticsService;
 
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+            bind = "bindRadiusOperationalStatusService",
+            unbind = "unbindRadiusOperationalStatusService")
+    protected RadiusOperationalStatusService radiusOperationalStatusService;
+
     private final AuthenticationEventListener listener = new InternalAuthenticationListener();
     private final AuthenticationStatisticsEventListener authenticationStatisticsEventListener =
             new InternalAuthenticationStatisticsListner();
+    private final RadiusOperationalStatusEventListener radiusOperationalStatusEventListener =
+            new InternalRadiusOperationalStatusEventListener();
 
     // topics
     private static final String TOPIC = "authentication.events";
     private static final String AUTHENTICATION_STATISTICS_TOPIC = "onos.aaa.stats.kpis";
-
+    private static final String RADIUS_OPERATION_STATUS_TOPIC = "radiusOperationalStatus.events";
     // auth event params
     private static final String TIMESTAMP = "timestamp";
     private static final String DEVICE_ID = "deviceId";
@@ -110,6 +120,8 @@
     private static final String PENDING_RES_SUPPLICANT = "pendingResSupplicant";
     private static final String RES_ID_EAP_FRAMES_RX = "resIdEapFramesRx";
 
+    private static final String OPERATIONAL_STATUS = "radiusOperationalStatus";
+
     protected void bindAuthenticationService(AuthenticationService authenticationService) {
         log.info("bindAuthenticationService");
         if (this.authenticationService == null) {
@@ -158,6 +170,32 @@
         }
     }
 
+    protected void bindRadiusOperationalStatusService(
+            RadiusOperationalStatusService radiusOperationalStatusService) {
+        log.info("bindRadiusOperationalStatusService");
+        if (this.radiusOperationalStatusService == null) {
+            log.info("Binding RadiusOperationalStatusService");
+            this.radiusOperationalStatusService = radiusOperationalStatusService;
+            log.info("Adding listener on RadiusOperationalStatusService");
+            radiusOperationalStatusService.addListener(radiusOperationalStatusEventListener);
+        } else {
+            log.warn("Trying to bind radiusOperationalStatusService but it is already bound");
+        }
+    }
+
+    protected void unbindRadiusOperationalStatusService(
+            RadiusOperationalStatusService radiusOperationalStatusService) {
+        log.info("unbindRadiusOperationalStatusService");
+        if (this.radiusOperationalStatusService == radiusOperationalStatusService) {
+            log.info("Unbind RadiusOperationalStatusService");
+            this.radiusOperationalStatusService = null;
+            log.info("Removing listener on RadiusOperationalStatusService");
+            radiusOperationalStatusService.removeListener(radiusOperationalStatusEventListener);
+        } else {
+            log.warn("Trying to unbind radiusOperationalStatusService but it is already unbound");
+        }
+    }
+
     @Activate
     public void activate() {
         log.info("Started AaaKafkaIntegration");
@@ -177,6 +215,11 @@
         log.trace("AuthenticationStatisticsEvent sent successfully");
     }
 
+    private void handleOperationalStatus(RadiusOperationalStatusEvent event) {
+        eventBusService.send(RADIUS_OPERATION_STATUS_TOPIC, serializeOperationalStatus(event));
+        log.info("RadiusOperationalStatusEvent sent successfully");
+    }
+
     private JsonNode serialize(AuthenticationEvent event) {
         String sn = deviceService.getPort(event.subject()).annotations().value(AnnotationKeys.PORT_NAME);
 
@@ -227,6 +270,16 @@
         return authMetricsEvent;
     }
 
+    private JsonNode serializeOperationalStatus(RadiusOperationalStatusEvent event) {
+        log.info("Serializing RadiusOperationalStatusEvent");
+        ObjectMapper mapper = new ObjectMapper();
+        ObjectNode authMetricsEvent = mapper.createObjectNode();
+        authMetricsEvent.put(TIMESTAMP, Instant.now().toString());
+        log.info("---OPERATIONAL_STATUS----" + event.subject());
+        authMetricsEvent.put(OPERATIONAL_STATUS, event.subject());
+        return authMetricsEvent;
+    }
+
     private class InternalAuthenticationListener implements
     AuthenticationEventListener {
         @Override
@@ -242,4 +295,13 @@
             handleStat(authenticationStatisticsEvent);
         }
     }
+
+    private class InternalRadiusOperationalStatusEventListener implements
+           RadiusOperationalStatusEventListener {
+        @Override
+        public void event(RadiusOperationalStatusEvent radiusOperationalStatusEvent) {
+            handleOperationalStatus(radiusOperationalStatusEvent);
+        }
+
+    }
 }