[SEBA-36] Operational Status 802.1x Session

Change-Id: I52dfaae6b7a66c0ce61ec3560e4c25f75c67b7d3
diff --git a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
index d67b6a1..3cac0e8 100644
--- a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
@@ -35,6 +35,10 @@
 import org.opencord.aaa.RadiusOperationalStatusEvent;
 import org.opencord.aaa.RadiusOperationalStatusEventListener;
 import org.opencord.aaa.RadiusOperationalStatusService;
+import org.opencord.aaa.AaaMachineStatisticsEvent;
+import org.opencord.aaa.AaaMachineStatisticsEventListener;
+import org.opencord.aaa.AaaMachineStatisticsService;
+import org.opencord.aaa.AaaSupplicantMachineStats;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -72,12 +76,21 @@
             unbind = "unbindRadiusOperationalStatusService")
     protected RadiusOperationalStatusService radiusOperationalStatusService;
 
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+            policy = ReferencePolicy.DYNAMIC,
+            bind = "bindAaaMachineStatisticsService",
+            unbind = "unbindAaaMachineStatisticsService")
+    protected AaaMachineStatisticsService machineStatisticsService;
+
     private final AuthenticationEventListener listener = new InternalAuthenticationListener();
     private final AuthenticationStatisticsEventListener authenticationStatisticsEventListener =
             new InternalAuthenticationStatisticsListner();
     private final RadiusOperationalStatusEventListener radiusOperationalStatusEventListener =
             new InternalRadiusOperationalStatusEventListener();
 
+    private final AaaMachineStatisticsEventListener machineStatisticsEventListener =
+            new InternalAaaMachineStatisticsListner();
+
     // topics
     private static final String TOPIC = "authentication.events";
     private static final String AUTHENTICATION_STATISTICS_TOPIC = "onos.aaa.stats.kpis";
@@ -122,6 +135,20 @@
 
     private static final String OPERATIONAL_STATUS = "radiusOperationalStatus";
 
+    //Supplicant machine stats event params
+    private static final String SESSION_ID = "sessionId";
+    private static final String SESSION_NAME = "sessionName";
+    private static final String MAC_ADDRESS = "macAddress";
+    private static final String EAPOL_TYPE = "eapolType";
+    private static final String SESSION_DURATION = "sessionDuration";
+    private static final String TOTAL_FRAMES_RX = "totalFramesRx";
+    private static final String TOTAL_FRAMES_TX = "totalFramesTx";
+    private static final String TOTAL_PACKETS_RX = "totalPacketsRx";
+    private static final String TOTAL_PACKETS_TX = "totalFramesTx";
+    private static final String SESSION_TERMINATE_REASON = "sessionTerminateReason";
+    private static final String TOTAL_OCTETS_TX = "totalOctetsTx";
+    private static final String TOTAL_OCTETS_RX = "totalOctetsRx";
+
     protected void bindAuthenticationService(AuthenticationService authenticationService) {
         log.info("bindAuthenticationService");
         if (this.authenticationService == null) {
@@ -196,6 +223,30 @@
         }
     }
 
+    protected void bindAaaMachineStatisticsService(AaaMachineStatisticsService machineStatisticsService) {
+        log.info("bindAaaMachineStatisticsService");
+        if (this.machineStatisticsService == null) {
+            log.info("Binding AaaMachineStatisticsService");
+            this.machineStatisticsService = machineStatisticsService;
+            log.info("Adding listener on AaaMachineStatisticsService");
+            machineStatisticsService.addListener(machineStatisticsEventListener);
+        } else {
+            log.warn("Trying to bind AaaMachineStatisticsService but it is already bound");
+        }
+    }
+
+    protected void unbindAaaMachineStatisticsService(AaaMachineStatisticsService machineStatisticsService) {
+        log.info("unbindAaaMachineStatisticsService");
+        if (this.machineStatisticsService == machineStatisticsService) {
+            log.info("Unbinding AaaMachineStatisticsService");
+            this.machineStatisticsService = null;
+            log.info("Removing listener on AaaMachineStatisticsService");
+            machineStatisticsService.removeListener(machineStatisticsEventListener);
+        } else {
+            log.warn("Trying to unbind AaaMachineStatisticsService but it is already unbound");
+        }
+    }
+
     @Activate
     public void activate() {
         log.info("Started AaaKafkaIntegration");
@@ -220,6 +271,11 @@
         log.info("RadiusOperationalStatusEvent sent successfully");
     }
 
+    private void handleMachineStat(AaaMachineStatisticsEvent machineStatEvent) {
+        eventBusService.send(AUTHENTICATION_STATISTICS_TOPIC, serializeMachineStat(machineStatEvent));
+        log.info("MachineStatisticsEvent sent successfully");
+    }
+
     private JsonNode serialize(AuthenticationEvent event) {
         String sn = deviceService.getPort(event.subject()).annotations().value(AnnotationKeys.PORT_NAME);
 
@@ -280,6 +336,39 @@
         return authMetricsEvent;
     }
 
+    private JsonNode serializeMachineStat(AaaMachineStatisticsEvent machineStatEvent) {
+        log.info("Serializing AuthenticationStatisticsEvent");
+        ObjectMapper mapper = new ObjectMapper();
+        ObjectNode machineStat = mapper.createObjectNode();
+        AaaSupplicantMachineStats subject = machineStatEvent.subject();
+        machineStat.put(TIMESTAMP, Instant.now().toString());
+        machineStat.put(SESSION_ID, subject.getSessionId());
+        machineStat.put(SESSION_NAME, subject.getSessionName());
+        machineStat.put(MAC_ADDRESS, subject.getSrcMacAddress());
+        machineStat.put(SESSION_DURATION, subject.getSessionDuration());
+        machineStat.put(EAPOL_TYPE, subject.getEapolType());
+        machineStat.put(TOTAL_FRAMES_RX, subject.getTotalFramesReceived());
+        machineStat.put(TOTAL_FRAMES_TX, subject.getTotalFramesSent());
+        machineStat.put(TOTAL_PACKETS_RX, subject.getTotalFramesReceived());
+        machineStat.put(TOTAL_PACKETS_TX, subject.getTotalFramesSent());
+        machineStat.put(TOTAL_OCTETS_RX, subject.getTotalOctetRecieved());
+        machineStat.put(TOTAL_OCTETS_TX, subject.getTotalOctetSent());
+        machineStat.put(SESSION_TERMINATE_REASON, subject.getSessionTerminateReason());
+        log.debug(SESSION_ID + " - " + subject.getSessionId());
+        log.debug(SESSION_NAME + " - " + subject.getSessionName());
+        log.debug(MAC_ADDRESS + " - " + subject.getSrcMacAddress());
+        log.debug(SESSION_DURATION + " - " + subject.getSessionDuration());
+        log.debug(EAPOL_TYPE + " - " + subject.getEapolType());
+        log.debug(TOTAL_FRAMES_RX + " - " + subject.getTotalFramesReceived());
+        log.debug(TOTAL_FRAMES_TX + " - " + subject.getTotalFramesSent());
+        log.debug(TOTAL_PACKETS_RX + " - " + subject.getTotalFramesReceived());
+        log.debug(TOTAL_PACKETS_TX + " - " + subject.getTotalFramesSent());
+        log.debug(TOTAL_OCTETS_RX + " - " + subject.getTotalOctetRecieved());
+        log.debug(TOTAL_OCTETS_TX + " - " + subject.getTotalOctetSent());
+        log.debug(SESSION_TERMINATE_REASON + " - " + subject.getSessionTerminateReason());
+        return machineStat;
+    }
+
     private class InternalAuthenticationListener implements
     AuthenticationEventListener {
         @Override
@@ -302,6 +391,13 @@
         public void event(RadiusOperationalStatusEvent radiusOperationalStatusEvent) {
             handleOperationalStatus(radiusOperationalStatusEvent);
         }
+    }
 
+    private class InternalAaaMachineStatisticsListner implements AaaMachineStatisticsEventListener {
+
+        @Override
+        public void event(AaaMachineStatisticsEvent machineStatEvent) {
+            handleMachineStat(machineStatEvent);
+        }
     }
 }