[SEBA-36] Operational Status 802.1x Session

cherry-picked from kafka-onos-1.2

Change-Id: I52dfaae6b7a66c0ce61ec3560e4c25f75c67b7d3
diff --git a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
index 2ee8366..63cfb16 100644
--- a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
@@ -37,6 +37,10 @@
 import org.osgi.service.component.annotations.Reference;
 import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.osgi.service.component.annotations.ReferencePolicy;
+import org.opencord.aaa.AaaMachineStatisticsEvent;
+import org.opencord.aaa.AaaMachineStatisticsEventListener;
+import org.opencord.aaa.AaaMachineStatisticsService;
+import org.opencord.aaa.AaaSupplicantMachineStats;
 
 import java.time.Instant;
 import java.util.concurrent.atomic.AtomicReference;
@@ -74,11 +78,20 @@
     protected final AtomicReference<RadiusOperationalStatusService> radiusOperationalStatusServiceRef
             = new AtomicReference<>();
 
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL,
+            policy = ReferencePolicy.DYNAMIC,
+            bind = "bindAaaMachineStatisticsService",
+            unbind = "unbindAaaMachineStatisticsService")
+    protected volatile AaaMachineStatisticsService ignore4;
+    protected final AtomicReference<AaaMachineStatisticsService> machineStatisticsServiceRef = new AtomicReference<>();
+
     private final AuthenticationEventListener listener = new InternalAuthenticationListener();
     private final AuthenticationStatisticsEventListener authenticationStatisticsEventListener =
             new InternalAuthenticationStatisticsListner();
     private final RadiusOperationalStatusEventListener radiusOperationalStatusEventListener =
             new InternalRadiusOperationalStatusEventListener();
+    private final AaaMachineStatisticsEventListener machineStatisticsEventListener =
+            new InternalAaaMachineStatisticsListner();
 
     // topics
     private static final String TOPIC = "authentication.events";
@@ -124,6 +137,20 @@
 
     private static final String OPERATIONAL_STATUS = "radiusOperationalStatus";
 
+    //Supplicant machine stats event params
+    private static final String SESSION_ID = "sessionId";
+    private static final String SESSION_NAME = "sessionName";
+    private static final String MAC_ADDRESS = "macAddress";
+    private static final String EAPOL_TYPE = "eapolType";
+    private static final String SESSION_DURATION = "sessionDuration";
+    private static final String TOTAL_FRAMES_RX = "totalFramesRx";
+    private static final String TOTAL_FRAMES_TX = "totalFramesTx";
+    private static final String TOTAL_PACKETS_RX = "totalPacketsRx";
+    private static final String TOTAL_PACKETS_TX = "totalFramesTx";
+    private static final String SESSION_TERMINATE_REASON = "sessionTerminateReason";
+    private static final String TOTAL_OCTETS_TX = "totalOctetsTx";
+    private static final String TOTAL_OCTETS_RX = "totalOctetsRx";
+
     protected void bindAuthenticationService(AuthenticationService incomingService) {
         bindAndAddListener(incomingService, authServiceRef, listener);
     }
@@ -151,6 +178,14 @@
                 radiusOperationalStatusEventListener);
     }
 
+    protected void bindAaaMachineStatisticsService(AaaMachineStatisticsService machineStatisticsService) {
+        bindAndAddListener(machineStatisticsService, machineStatisticsServiceRef, machineStatisticsEventListener);
+    }
+
+    protected void unbindAaaMachineStatisticsService(AaaMachineStatisticsService machineStatisticsService) {
+        unbindAndRemoveListener(machineStatisticsService, machineStatisticsServiceRef, machineStatisticsEventListener);
+    }
+
     @Activate
     public void activate() {
         log.info("Started AaaKafkaIntegration");
@@ -177,6 +212,11 @@
         log.info("RadiusOperationalStatusEvent sent successfully");
     }
 
+    private void handleMachineStat(AaaMachineStatisticsEvent machineStatEvent) {
+        eventBusService.send(AUTHENTICATION_STATISTICS_TOPIC, serializeMachineStat(machineStatEvent));
+        log.info("MachineStatisticsEvent sent successfully");
+    }
+
     private JsonNode serialize(AuthenticationEvent event) {
         String sn = deviceService.getPort(event.subject()).annotations().value(AnnotationKeys.PORT_NAME);
 
@@ -237,6 +277,39 @@
         return authMetricsEvent;
     }
 
+    private JsonNode serializeMachineStat(AaaMachineStatisticsEvent machineStatEvent) {
+        log.info("Serializing AuthenticationStatisticsEvent");
+        ObjectMapper mapper = new ObjectMapper();
+        ObjectNode machineStat = mapper.createObjectNode();
+        AaaSupplicantMachineStats subject = machineStatEvent.subject();
+        machineStat.put(TIMESTAMP, Instant.now().toString());
+        machineStat.put(SESSION_ID, subject.getSessionId());
+        machineStat.put(SESSION_NAME, subject.getSessionName());
+        machineStat.put(MAC_ADDRESS, subject.getSrcMacAddress());
+        machineStat.put(SESSION_DURATION, subject.getSessionDuration());
+        machineStat.put(EAPOL_TYPE, subject.getEapolType());
+        machineStat.put(TOTAL_FRAMES_RX, subject.getTotalFramesReceived());
+        machineStat.put(TOTAL_FRAMES_TX, subject.getTotalFramesSent());
+        machineStat.put(TOTAL_PACKETS_RX, subject.getTotalFramesReceived());
+        machineStat.put(TOTAL_PACKETS_TX, subject.getTotalFramesSent());
+        machineStat.put(TOTAL_OCTETS_RX, subject.getTotalOctetRecieved());
+        machineStat.put(TOTAL_OCTETS_TX, subject.getTotalOctetSent());
+        machineStat.put(SESSION_TERMINATE_REASON, subject.getSessionTerminateReason());
+        log.debug(SESSION_ID + " - " + subject.getSessionId());
+        log.debug(SESSION_NAME + " - " + subject.getSessionName());
+        log.debug(MAC_ADDRESS + " - " + subject.getSrcMacAddress());
+        log.debug(SESSION_DURATION + " - " + subject.getSessionDuration());
+        log.debug(EAPOL_TYPE + " - " + subject.getEapolType());
+        log.debug(TOTAL_FRAMES_RX + " - " + subject.getTotalFramesReceived());
+        log.debug(TOTAL_FRAMES_TX + " - " + subject.getTotalFramesSent());
+        log.debug(TOTAL_PACKETS_RX + " - " + subject.getTotalFramesReceived());
+        log.debug(TOTAL_PACKETS_TX + " - " + subject.getTotalFramesSent());
+        log.debug(TOTAL_OCTETS_RX + " - " + subject.getTotalOctetRecieved());
+        log.debug(TOTAL_OCTETS_TX + " - " + subject.getTotalOctetSent());
+        log.debug(SESSION_TERMINATE_REASON + " - " + subject.getSessionTerminateReason());
+        return machineStat;
+    }
+
     private class InternalAuthenticationListener implements
     AuthenticationEventListener {
         @Override
@@ -259,6 +332,13 @@
         public void event(RadiusOperationalStatusEvent radiusOperationalStatusEvent) {
             handleOperationalStatus(radiusOperationalStatusEvent);
         }
+    }
 
+    private class InternalAaaMachineStatisticsListner implements AaaMachineStatisticsEventListener {
+
+        @Override
+        public void event(AaaMachineStatisticsEvent machineStatEvent) {
+            handleMachineStat(machineStatEvent);
+        }
     }
 }