[SEBA-36] Operational Status 802.1x Session
cherry-picked from kafka-onos-1.2
Change-Id: I52dfaae6b7a66c0ce61ec3560e4c25f75c67b7d3
diff --git a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
index 2ee8366..63cfb16 100644
--- a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
@@ -37,6 +37,10 @@
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
+import org.opencord.aaa.AaaMachineStatisticsEvent;
+import org.opencord.aaa.AaaMachineStatisticsEventListener;
+import org.opencord.aaa.AaaMachineStatisticsService;
+import org.opencord.aaa.AaaSupplicantMachineStats;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;
@@ -74,11 +78,20 @@
protected final AtomicReference<RadiusOperationalStatusService> radiusOperationalStatusServiceRef
= new AtomicReference<>();
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL,
+ policy = ReferencePolicy.DYNAMIC,
+ bind = "bindAaaMachineStatisticsService",
+ unbind = "unbindAaaMachineStatisticsService")
+ protected volatile AaaMachineStatisticsService ignore4;
+ protected final AtomicReference<AaaMachineStatisticsService> machineStatisticsServiceRef = new AtomicReference<>();
+
private final AuthenticationEventListener listener = new InternalAuthenticationListener();
private final AuthenticationStatisticsEventListener authenticationStatisticsEventListener =
new InternalAuthenticationStatisticsListner();
private final RadiusOperationalStatusEventListener radiusOperationalStatusEventListener =
new InternalRadiusOperationalStatusEventListener();
+ private final AaaMachineStatisticsEventListener machineStatisticsEventListener =
+ new InternalAaaMachineStatisticsListner();
// topics
private static final String TOPIC = "authentication.events";
@@ -124,6 +137,20 @@
private static final String OPERATIONAL_STATUS = "radiusOperationalStatus";
+ //Supplicant machine stats event params
+ private static final String SESSION_ID = "sessionId";
+ private static final String SESSION_NAME = "sessionName";
+ private static final String MAC_ADDRESS = "macAddress";
+ private static final String EAPOL_TYPE = "eapolType";
+ private static final String SESSION_DURATION = "sessionDuration";
+ private static final String TOTAL_FRAMES_RX = "totalFramesRx";
+ private static final String TOTAL_FRAMES_TX = "totalFramesTx";
+ private static final String TOTAL_PACKETS_RX = "totalPacketsRx";
+ private static final String TOTAL_PACKETS_TX = "totalFramesTx";
+ private static final String SESSION_TERMINATE_REASON = "sessionTerminateReason";
+ private static final String TOTAL_OCTETS_TX = "totalOctetsTx";
+ private static final String TOTAL_OCTETS_RX = "totalOctetsRx";
+
protected void bindAuthenticationService(AuthenticationService incomingService) {
bindAndAddListener(incomingService, authServiceRef, listener);
}
@@ -151,6 +178,14 @@
radiusOperationalStatusEventListener);
}
+ protected void bindAaaMachineStatisticsService(AaaMachineStatisticsService machineStatisticsService) {
+ bindAndAddListener(machineStatisticsService, machineStatisticsServiceRef, machineStatisticsEventListener);
+ }
+
+ protected void unbindAaaMachineStatisticsService(AaaMachineStatisticsService machineStatisticsService) {
+ unbindAndRemoveListener(machineStatisticsService, machineStatisticsServiceRef, machineStatisticsEventListener);
+ }
+
@Activate
public void activate() {
log.info("Started AaaKafkaIntegration");
@@ -177,6 +212,11 @@
log.info("RadiusOperationalStatusEvent sent successfully");
}
+ private void handleMachineStat(AaaMachineStatisticsEvent machineStatEvent) {
+ eventBusService.send(AUTHENTICATION_STATISTICS_TOPIC, serializeMachineStat(machineStatEvent));
+ log.info("MachineStatisticsEvent sent successfully");
+ }
+
private JsonNode serialize(AuthenticationEvent event) {
String sn = deviceService.getPort(event.subject()).annotations().value(AnnotationKeys.PORT_NAME);
@@ -237,6 +277,39 @@
return authMetricsEvent;
}
+ private JsonNode serializeMachineStat(AaaMachineStatisticsEvent machineStatEvent) {
+ log.info("Serializing AuthenticationStatisticsEvent");
+ ObjectMapper mapper = new ObjectMapper();
+ ObjectNode machineStat = mapper.createObjectNode();
+ AaaSupplicantMachineStats subject = machineStatEvent.subject();
+ machineStat.put(TIMESTAMP, Instant.now().toString());
+ machineStat.put(SESSION_ID, subject.getSessionId());
+ machineStat.put(SESSION_NAME, subject.getSessionName());
+ machineStat.put(MAC_ADDRESS, subject.getSrcMacAddress());
+ machineStat.put(SESSION_DURATION, subject.getSessionDuration());
+ machineStat.put(EAPOL_TYPE, subject.getEapolType());
+ machineStat.put(TOTAL_FRAMES_RX, subject.getTotalFramesReceived());
+ machineStat.put(TOTAL_FRAMES_TX, subject.getTotalFramesSent());
+ machineStat.put(TOTAL_PACKETS_RX, subject.getTotalFramesReceived());
+ machineStat.put(TOTAL_PACKETS_TX, subject.getTotalFramesSent());
+ machineStat.put(TOTAL_OCTETS_RX, subject.getTotalOctetRecieved());
+ machineStat.put(TOTAL_OCTETS_TX, subject.getTotalOctetSent());
+ machineStat.put(SESSION_TERMINATE_REASON, subject.getSessionTerminateReason());
+ log.debug(SESSION_ID + " - " + subject.getSessionId());
+ log.debug(SESSION_NAME + " - " + subject.getSessionName());
+ log.debug(MAC_ADDRESS + " - " + subject.getSrcMacAddress());
+ log.debug(SESSION_DURATION + " - " + subject.getSessionDuration());
+ log.debug(EAPOL_TYPE + " - " + subject.getEapolType());
+ log.debug(TOTAL_FRAMES_RX + " - " + subject.getTotalFramesReceived());
+ log.debug(TOTAL_FRAMES_TX + " - " + subject.getTotalFramesSent());
+ log.debug(TOTAL_PACKETS_RX + " - " + subject.getTotalFramesReceived());
+ log.debug(TOTAL_PACKETS_TX + " - " + subject.getTotalFramesSent());
+ log.debug(TOTAL_OCTETS_RX + " - " + subject.getTotalOctetRecieved());
+ log.debug(TOTAL_OCTETS_TX + " - " + subject.getTotalOctetSent());
+ log.debug(SESSION_TERMINATE_REASON + " - " + subject.getSessionTerminateReason());
+ return machineStat;
+ }
+
private class InternalAuthenticationListener implements
AuthenticationEventListener {
@Override
@@ -259,6 +332,13 @@
public void event(RadiusOperationalStatusEvent radiusOperationalStatusEvent) {
handleOperationalStatus(radiusOperationalStatusEvent);
}
+ }
+ private class InternalAaaMachineStatisticsListner implements AaaMachineStatisticsEventListener {
+
+ @Override
+ public void event(AaaMachineStatisticsEvent machineStatEvent) {
+ handleMachineStat(machineStatEvent);
+ }
}
}