[SEBA-36] Operational Status 802.1x Session
Change-Id: I52dfaae6b7a66c0ce61ec3560e4c25f75c67b7d3
diff --git a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
index d67b6a1..3cac0e8 100644
--- a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
@@ -35,6 +35,10 @@
import org.opencord.aaa.RadiusOperationalStatusEvent;
import org.opencord.aaa.RadiusOperationalStatusEventListener;
import org.opencord.aaa.RadiusOperationalStatusService;
+import org.opencord.aaa.AaaMachineStatisticsEvent;
+import org.opencord.aaa.AaaMachineStatisticsEventListener;
+import org.opencord.aaa.AaaMachineStatisticsService;
+import org.opencord.aaa.AaaSupplicantMachineStats;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -72,12 +76,21 @@
unbind = "unbindRadiusOperationalStatusService")
protected RadiusOperationalStatusService radiusOperationalStatusService;
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+ policy = ReferencePolicy.DYNAMIC,
+ bind = "bindAaaMachineStatisticsService",
+ unbind = "unbindAaaMachineStatisticsService")
+ protected AaaMachineStatisticsService machineStatisticsService;
+
private final AuthenticationEventListener listener = new InternalAuthenticationListener();
private final AuthenticationStatisticsEventListener authenticationStatisticsEventListener =
new InternalAuthenticationStatisticsListner();
private final RadiusOperationalStatusEventListener radiusOperationalStatusEventListener =
new InternalRadiusOperationalStatusEventListener();
+ private final AaaMachineStatisticsEventListener machineStatisticsEventListener =
+ new InternalAaaMachineStatisticsListner();
+
// topics
private static final String TOPIC = "authentication.events";
private static final String AUTHENTICATION_STATISTICS_TOPIC = "onos.aaa.stats.kpis";
@@ -122,6 +135,20 @@
private static final String OPERATIONAL_STATUS = "radiusOperationalStatus";
+ //Supplicant machine stats event params
+ private static final String SESSION_ID = "sessionId";
+ private static final String SESSION_NAME = "sessionName";
+ private static final String MAC_ADDRESS = "macAddress";
+ private static final String EAPOL_TYPE = "eapolType";
+ private static final String SESSION_DURATION = "sessionDuration";
+ private static final String TOTAL_FRAMES_RX = "totalFramesRx";
+ private static final String TOTAL_FRAMES_TX = "totalFramesTx";
+ private static final String TOTAL_PACKETS_RX = "totalPacketsRx";
+ private static final String TOTAL_PACKETS_TX = "totalFramesTx";
+ private static final String SESSION_TERMINATE_REASON = "sessionTerminateReason";
+ private static final String TOTAL_OCTETS_TX = "totalOctetsTx";
+ private static final String TOTAL_OCTETS_RX = "totalOctetsRx";
+
protected void bindAuthenticationService(AuthenticationService authenticationService) {
log.info("bindAuthenticationService");
if (this.authenticationService == null) {
@@ -196,6 +223,30 @@
}
}
+ protected void bindAaaMachineStatisticsService(AaaMachineStatisticsService machineStatisticsService) {
+ log.info("bindAaaMachineStatisticsService");
+ if (this.machineStatisticsService == null) {
+ log.info("Binding AaaMachineStatisticsService");
+ this.machineStatisticsService = machineStatisticsService;
+ log.info("Adding listener on AaaMachineStatisticsService");
+ machineStatisticsService.addListener(machineStatisticsEventListener);
+ } else {
+ log.warn("Trying to bind AaaMachineStatisticsService but it is already bound");
+ }
+ }
+
+ protected void unbindAaaMachineStatisticsService(AaaMachineStatisticsService machineStatisticsService) {
+ log.info("unbindAaaMachineStatisticsService");
+ if (this.machineStatisticsService == machineStatisticsService) {
+ log.info("Unbinding AaaMachineStatisticsService");
+ this.machineStatisticsService = null;
+ log.info("Removing listener on AaaMachineStatisticsService");
+ machineStatisticsService.removeListener(machineStatisticsEventListener);
+ } else {
+ log.warn("Trying to unbind AaaMachineStatisticsService but it is already unbound");
+ }
+ }
+
@Activate
public void activate() {
log.info("Started AaaKafkaIntegration");
@@ -220,6 +271,11 @@
log.info("RadiusOperationalStatusEvent sent successfully");
}
+ private void handleMachineStat(AaaMachineStatisticsEvent machineStatEvent) {
+ eventBusService.send(AUTHENTICATION_STATISTICS_TOPIC, serializeMachineStat(machineStatEvent));
+ log.info("MachineStatisticsEvent sent successfully");
+ }
+
private JsonNode serialize(AuthenticationEvent event) {
String sn = deviceService.getPort(event.subject()).annotations().value(AnnotationKeys.PORT_NAME);
@@ -280,6 +336,39 @@
return authMetricsEvent;
}
+ private JsonNode serializeMachineStat(AaaMachineStatisticsEvent machineStatEvent) {
+ log.info("Serializing AuthenticationStatisticsEvent");
+ ObjectMapper mapper = new ObjectMapper();
+ ObjectNode machineStat = mapper.createObjectNode();
+ AaaSupplicantMachineStats subject = machineStatEvent.subject();
+ machineStat.put(TIMESTAMP, Instant.now().toString());
+ machineStat.put(SESSION_ID, subject.getSessionId());
+ machineStat.put(SESSION_NAME, subject.getSessionName());
+ machineStat.put(MAC_ADDRESS, subject.getSrcMacAddress());
+ machineStat.put(SESSION_DURATION, subject.getSessionDuration());
+ machineStat.put(EAPOL_TYPE, subject.getEapolType());
+ machineStat.put(TOTAL_FRAMES_RX, subject.getTotalFramesReceived());
+ machineStat.put(TOTAL_FRAMES_TX, subject.getTotalFramesSent());
+ machineStat.put(TOTAL_PACKETS_RX, subject.getTotalFramesReceived());
+ machineStat.put(TOTAL_PACKETS_TX, subject.getTotalFramesSent());
+ machineStat.put(TOTAL_OCTETS_RX, subject.getTotalOctetRecieved());
+ machineStat.put(TOTAL_OCTETS_TX, subject.getTotalOctetSent());
+ machineStat.put(SESSION_TERMINATE_REASON, subject.getSessionTerminateReason());
+ log.debug(SESSION_ID + " - " + subject.getSessionId());
+ log.debug(SESSION_NAME + " - " + subject.getSessionName());
+ log.debug(MAC_ADDRESS + " - " + subject.getSrcMacAddress());
+ log.debug(SESSION_DURATION + " - " + subject.getSessionDuration());
+ log.debug(EAPOL_TYPE + " - " + subject.getEapolType());
+ log.debug(TOTAL_FRAMES_RX + " - " + subject.getTotalFramesReceived());
+ log.debug(TOTAL_FRAMES_TX + " - " + subject.getTotalFramesSent());
+ log.debug(TOTAL_PACKETS_RX + " - " + subject.getTotalFramesReceived());
+ log.debug(TOTAL_PACKETS_TX + " - " + subject.getTotalFramesSent());
+ log.debug(TOTAL_OCTETS_RX + " - " + subject.getTotalOctetRecieved());
+ log.debug(TOTAL_OCTETS_TX + " - " + subject.getTotalOctetSent());
+ log.debug(SESSION_TERMINATE_REASON + " - " + subject.getSessionTerminateReason());
+ return machineStat;
+ }
+
private class InternalAuthenticationListener implements
AuthenticationEventListener {
@Override
@@ -302,6 +391,13 @@
public void event(RadiusOperationalStatusEvent radiusOperationalStatusEvent) {
handleOperationalStatus(radiusOperationalStatusEvent);
}
+ }
+ private class InternalAaaMachineStatisticsListner implements AaaMachineStatisticsEventListener {
+
+ @Override
+ public void event(AaaMachineStatisticsEvent machineStatEvent) {
+ handleMachineStat(machineStatEvent);
+ }
}
}