Bump ONOS to 2.2 and build with Java 11
This app relies on dynamic OSGi @Reference. New Karaf demands volatile
fields, which required different handling of service bind/unbind events.
Change-Id: I215f7ca5cbded3acd9c440fe723f6f21d77f9ed5
diff --git a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
index f4efb0c..671f581 100644
--- a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
@@ -15,53 +15,53 @@
*/
package org.opencord.kafka.integrations;
+
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.ReferencePolicy;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.device.DeviceService;
import org.opencord.aaa.AuthenticationEvent;
import org.opencord.aaa.AuthenticationEventListener;
import org.opencord.aaa.AuthenticationService;
-import org.opencord.kafka.EventBusService;
import org.opencord.aaa.AuthenticationStatisticsEvent;
import org.opencord.aaa.AuthenticationStatisticsEventListener;
import org.opencord.aaa.AuthenticationStatisticsService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.opencord.kafka.EventBusService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
import java.time.Instant;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Listens for AAA events and pushes them on a Kafka bus.
*/
@Component(immediate = true)
-public class AaaKafkaIntegration {
+public class AaaKafkaIntegration extends AbstractKafkaIntegration {
- public Logger log = LoggerFactory.getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected EventBusService eventBusService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceService deviceService;
- @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL,
policy = ReferencePolicy.DYNAMIC,
bind = "bindAuthenticationService",
unbind = "unbindAuthenticationService")
- protected AuthenticationService authenticationService;
- @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+ protected volatile AuthenticationService ignore;
+ private final AtomicReference<AuthenticationService> authServiceRef = new AtomicReference<>();
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL,
policy = ReferencePolicy.DYNAMIC,
bind = "bindAuthenticationStatService",
unbind = "unbindAuthenticationStatService")
- protected AuthenticationStatisticsService authenticationStatisticsService;
+ protected volatile AuthenticationStatisticsService ignore2;
+ private final AtomicReference<AuthenticationStatisticsService> authStatServiceRef = new AtomicReference<>();
private final AuthenticationEventListener listener = new InternalAuthenticationListener();
private final AuthenticationStatisticsEventListener authenticationStatisticsEventListener =
@@ -92,52 +92,20 @@
private static final String REQUEST_RTT_MILLIS = "requestRttMillis";
private static final String REQUEST_RE_TX = "requestReTx";
- protected void bindAuthenticationService(AuthenticationService authenticationService) {
- log.info("bindAuthenticationService");
- if (this.authenticationService == null) {
- log.info("Binding AuthenticationService");
- this.authenticationService = authenticationService;
- log.info("Adding listener on AuthenticationService");
- authenticationService.addListener(listener);
- } else {
- log.warn("Trying to bind AuthenticationService but it is already bound");
- }
+ protected void bindAuthenticationService(AuthenticationService incomingService) {
+ bindAndAddListener(incomingService, authServiceRef, listener);
}
- protected void unbindAuthenticationService(AuthenticationService authenticationService) {
- log.info("unbindAuthenticationService");
- if (this.authenticationService == authenticationService) {
- log.info("Unbinding AuthenticationService");
- this.authenticationService = null;
- log.info("Removing listener on AuthenticationService");
- authenticationService.removeListener(listener);
- } else {
- log.warn("Trying to unbind AuthenticationService but it is already unbound");
- }
+ protected void unbindAuthenticationService(AuthenticationService outgoingService) {
+ unbindAndRemoveListener(outgoingService, authServiceRef, listener);
}
- protected void bindAuthenticationStatService(AuthenticationStatisticsService authenticationStatisticsService) {
- log.info("bindAuthenticationStatService");
- if (this.authenticationStatisticsService == null) {
- log.info("Binding AuthenticationStastService");
- this.authenticationStatisticsService = authenticationStatisticsService;
- log.info("Adding listener on AuthenticationStatService");
- authenticationStatisticsService.addListener(authenticationStatisticsEventListener);
- } else {
- log.warn("Trying to bind AuthenticationStatService but it is already bound");
- }
+ protected void bindAuthenticationStatService(AuthenticationStatisticsService incomingService) {
+ bindAndAddListener(incomingService, authStatServiceRef, authenticationStatisticsEventListener);
}
- protected void unbindAuthenticationStatService(AuthenticationStatisticsService authenticationStatisticsService) {
- log.info("unbindAuthenticationStatService");
- if (this.authenticationStatisticsService == authenticationStatisticsService) {
- log.info("Unbinding AuthenticationStatService");
- this.authenticationStatisticsService = null;
- log.info("Removing listener on AuthenticationStatService");
- authenticationStatisticsService.removeListener(authenticationStatisticsEventListener);
- } else {
- log.warn("Trying to unbind AuthenticationStatService but it is already unbound");
- }
+ protected void unbindAuthenticationStatService(AuthenticationStatisticsService outgoingService) {
+ unbindAndRemoveListener(outgoingService, authStatServiceRef, authenticationStatisticsEventListener);
}
@Activate
@@ -147,6 +115,8 @@
@Deactivate
public void deactivate() {
+ unbindAuthenticationService(authServiceRef.get());
+ unbindAuthenticationStatService(authStatServiceRef.get());
log.info("Stopped AaaKafkaIntegration");
}
@@ -156,7 +126,7 @@
private void handleStat(AuthenticationStatisticsEvent event) {
eventBusService.send(AUTHENTICATION_STATISTICS_TOPIC, serializeStat(event));
- log.info("AuthenticationStatisticsEvent sent successfully");
+ log.debug("AuthenticationStatisticsEvent sent successfully");
}
private JsonNode serialize(AuthenticationEvent event) {
@@ -173,7 +143,7 @@
}
private JsonNode serializeStat(AuthenticationStatisticsEvent event) {
- log.info("Serializing AuthenticationStatisticsEvent");
+ log.debug("Serializing AuthenticationStatisticsEvent");
ObjectMapper mapper = new ObjectMapper();
ObjectNode authMetricsEvent = mapper.createObjectNode();
authMetricsEvent.put(TIMESTAMP, Instant.now().toString());