Bump ONOS to 2.2 and build with Java 11
This app relies on dynamic OSGi @Reference. New Karaf demands volatile
fields, which required different handling of service bind/unbind events.
Change-Id: I215f7ca5cbded3acd9c440fe723f6f21d77f9ed5
diff --git a/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java b/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
index 0720a86..7dc2c35 100644
--- a/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
@@ -16,12 +16,11 @@
package org.opencord.kafka.impl;
import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -48,7 +47,6 @@
/**
* Sends events to a Kafka event bus.
*/
-@Service
@Component(immediate = true)
public class KafkaIntegration implements EventBusService {
@@ -61,13 +59,13 @@
private Boolean kafkaStarted = false;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ClusterService clusterService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected NetworkConfigRegistry configRegistry;
private static StringSerializer stringSerializer = new StringSerializer();
diff --git a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
index f4efb0c..671f581 100644
--- a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
@@ -15,53 +15,53 @@
*/
package org.opencord.kafka.integrations;
+
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.ReferencePolicy;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.device.DeviceService;
import org.opencord.aaa.AuthenticationEvent;
import org.opencord.aaa.AuthenticationEventListener;
import org.opencord.aaa.AuthenticationService;
-import org.opencord.kafka.EventBusService;
import org.opencord.aaa.AuthenticationStatisticsEvent;
import org.opencord.aaa.AuthenticationStatisticsEventListener;
import org.opencord.aaa.AuthenticationStatisticsService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.opencord.kafka.EventBusService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
import java.time.Instant;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Listens for AAA events and pushes them on a Kafka bus.
*/
@Component(immediate = true)
-public class AaaKafkaIntegration {
+public class AaaKafkaIntegration extends AbstractKafkaIntegration {
- public Logger log = LoggerFactory.getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected EventBusService eventBusService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceService deviceService;
- @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL,
policy = ReferencePolicy.DYNAMIC,
bind = "bindAuthenticationService",
unbind = "unbindAuthenticationService")
- protected AuthenticationService authenticationService;
- @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+ protected volatile AuthenticationService ignore;
+ private final AtomicReference<AuthenticationService> authServiceRef = new AtomicReference<>();
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL,
policy = ReferencePolicy.DYNAMIC,
bind = "bindAuthenticationStatService",
unbind = "unbindAuthenticationStatService")
- protected AuthenticationStatisticsService authenticationStatisticsService;
+ protected volatile AuthenticationStatisticsService ignore2;
+ private final AtomicReference<AuthenticationStatisticsService> authStatServiceRef = new AtomicReference<>();
private final AuthenticationEventListener listener = new InternalAuthenticationListener();
private final AuthenticationStatisticsEventListener authenticationStatisticsEventListener =
@@ -92,52 +92,20 @@
private static final String REQUEST_RTT_MILLIS = "requestRttMillis";
private static final String REQUEST_RE_TX = "requestReTx";
- protected void bindAuthenticationService(AuthenticationService authenticationService) {
- log.info("bindAuthenticationService");
- if (this.authenticationService == null) {
- log.info("Binding AuthenticationService");
- this.authenticationService = authenticationService;
- log.info("Adding listener on AuthenticationService");
- authenticationService.addListener(listener);
- } else {
- log.warn("Trying to bind AuthenticationService but it is already bound");
- }
+ protected void bindAuthenticationService(AuthenticationService incomingService) {
+ bindAndAddListener(incomingService, authServiceRef, listener);
}
- protected void unbindAuthenticationService(AuthenticationService authenticationService) {
- log.info("unbindAuthenticationService");
- if (this.authenticationService == authenticationService) {
- log.info("Unbinding AuthenticationService");
- this.authenticationService = null;
- log.info("Removing listener on AuthenticationService");
- authenticationService.removeListener(listener);
- } else {
- log.warn("Trying to unbind AuthenticationService but it is already unbound");
- }
+ protected void unbindAuthenticationService(AuthenticationService outgoingService) {
+ unbindAndRemoveListener(outgoingService, authServiceRef, listener);
}
- protected void bindAuthenticationStatService(AuthenticationStatisticsService authenticationStatisticsService) {
- log.info("bindAuthenticationStatService");
- if (this.authenticationStatisticsService == null) {
- log.info("Binding AuthenticationStastService");
- this.authenticationStatisticsService = authenticationStatisticsService;
- log.info("Adding listener on AuthenticationStatService");
- authenticationStatisticsService.addListener(authenticationStatisticsEventListener);
- } else {
- log.warn("Trying to bind AuthenticationStatService but it is already bound");
- }
+ protected void bindAuthenticationStatService(AuthenticationStatisticsService incomingService) {
+ bindAndAddListener(incomingService, authStatServiceRef, authenticationStatisticsEventListener);
}
- protected void unbindAuthenticationStatService(AuthenticationStatisticsService authenticationStatisticsService) {
- log.info("unbindAuthenticationStatService");
- if (this.authenticationStatisticsService == authenticationStatisticsService) {
- log.info("Unbinding AuthenticationStatService");
- this.authenticationStatisticsService = null;
- log.info("Removing listener on AuthenticationStatService");
- authenticationStatisticsService.removeListener(authenticationStatisticsEventListener);
- } else {
- log.warn("Trying to unbind AuthenticationStatService but it is already unbound");
- }
+ protected void unbindAuthenticationStatService(AuthenticationStatisticsService outgoingService) {
+ unbindAndRemoveListener(outgoingService, authStatServiceRef, authenticationStatisticsEventListener);
}
@Activate
@@ -147,6 +115,8 @@
@Deactivate
public void deactivate() {
+ unbindAuthenticationService(authServiceRef.get());
+ unbindAuthenticationStatService(authStatServiceRef.get());
log.info("Stopped AaaKafkaIntegration");
}
@@ -156,7 +126,7 @@
private void handleStat(AuthenticationStatisticsEvent event) {
eventBusService.send(AUTHENTICATION_STATISTICS_TOPIC, serializeStat(event));
- log.info("AuthenticationStatisticsEvent sent successfully");
+ log.debug("AuthenticationStatisticsEvent sent successfully");
}
private JsonNode serialize(AuthenticationEvent event) {
@@ -173,7 +143,7 @@
}
private JsonNode serializeStat(AuthenticationStatisticsEvent event) {
- log.info("Serializing AuthenticationStatisticsEvent");
+ log.debug("Serializing AuthenticationStatisticsEvent");
ObjectMapper mapper = new ObjectMapper();
ObjectNode authMetricsEvent = mapper.createObjectNode();
authMetricsEvent.put(TIMESTAMP, Instant.now().toString());
diff --git a/src/main/java/org/opencord/kafka/integrations/AbstractKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AbstractKafkaIntegration.java
new file mode 100644
index 0000000..4badb39
--- /dev/null
+++ b/src/main/java/org/opencord/kafka/integrations/AbstractKafkaIntegration.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.kafka.integrations;
+
+import org.onosproject.event.EventListener;
+import org.onosproject.event.ListenerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Abstract implementation of a service-specific Kafka integration which provides
+ * convenience methods to dynamically bind/unbind event listener services.
+ */
+class AbstractKafkaIntegration {
+
+    Logger log = LoggerFactory.getLogger(getClass());
+
+    // OSGi demands volatile fields for dynamic @Reference, so subclasses keep
+    // the live service in a separate AtomicReference (serviceRef) instead.
+
+    /** Binds incomingService and adds listener, unless serviceRef is already set or the service is null. */
+    <S extends ListenerService<?, L>, L extends EventListener<?>> void bindAndAddListener(
+            S incomingService, AtomicReference<S> serviceRef, L listener) {
+        if (incomingService == null) {
+            return;
+        }
+        if (serviceRef.compareAndSet(null, incomingService)) {
+            log.info("Adding listener on {}", incomingService.getClass().getSimpleName());
+            incomingService.addListener(listener);
+        } else {
+            // Name the rejected service: this helper is shared by every integration.
+            log.warn("Trying to bind {} but it is already bound", incomingService.getClass().getSimpleName());
+        }
+    }
+
+    /** Removes listener and clears serviceRef, but only if outgoingService is the instance currently bound. */
+    <S extends ListenerService<?, L>, L extends EventListener<?>> void unbindAndRemoveListener(
+            S outgoingService, AtomicReference<S> serviceRef, L listener) {
+        if (outgoingService != null &&
+                serviceRef.compareAndSet(outgoingService, null)) {
+            log.info("Removing listener on {}", outgoingService.getClass().getSimpleName());
+            outgoingService.removeListener(listener);
+        }
+        // Else, ignore. Not the instance currently bound, or the outgoing service is null.
+    }
+}
diff --git a/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java
index 36954ad..9ffb90f 100644
--- a/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java
@@ -19,38 +19,41 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.ReferencePolicy;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.Port;
import org.opencord.kafka.EventBusService;
import org.opencord.olt.AccessDeviceEvent;
import org.opencord.olt.AccessDeviceListener;
import org.opencord.olt.AccessDeviceService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.time.Instant;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Listens for access device events and pushes them on a Kafka bus.
*/
@Component(immediate = true)
-public class AccessDeviceKafkaIntegration {
+public class AccessDeviceKafkaIntegration extends AbstractKafkaIntegration {
public Logger log = LoggerFactory.getLogger(getClass());
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected EventBusService eventBusService;
- @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL,
policy = ReferencePolicy.DYNAMIC,
bind = "bindAccessDeviceService",
unbind = "unbindAccessDeviceService")
- protected AccessDeviceService accessDeviceService;
+ protected volatile AccessDeviceService accessDeviceService;
+ private final AtomicReference<AccessDeviceService> accessDeviceServiceRef = new AtomicReference<>();
private final AccessDeviceListener listener = new InternalAccessDeviceListener();
@@ -67,26 +70,12 @@
private static final String ACTIVATED = "activated";
private static final String DISABLED = "disabled";
- protected void bindAccessDeviceService(AccessDeviceService accessDeviceService) {
- if (this.accessDeviceService == null) {
- log.info("Binding AccessDeviceService");
- this.accessDeviceService = accessDeviceService;
- log.info("Adding listener on AccessDeviceService");
- accessDeviceService.addListener(listener);
- } else {
- log.warn("Trying to bind AccessDeviceService but it is already bound");
- }
+ protected void bindAccessDeviceService(AccessDeviceService incomingService) {
+ bindAndAddListener(incomingService, accessDeviceServiceRef, listener);
}
- protected void unbindAccessDeviceService(AccessDeviceService accessDeviceService) {
- if (this.accessDeviceService == accessDeviceService) {
- log.info("Unbinding AccessDeviceService");
- this.accessDeviceService = null;
- log.info("Removing listener on AccessDeviceService");
- accessDeviceService.removeListener(listener);
- } else {
- log.warn("Trying to unbind AccessDeviceService but it is already unbound");
- }
+ protected void unbindAccessDeviceService(AccessDeviceService outgoingService) {
+ unbindAndRemoveListener(outgoingService, accessDeviceServiceRef, listener);
}
@Activate
@@ -96,6 +85,7 @@
@Deactivate
public void deactivate() {
+ unbindAccessDeviceService(accessDeviceServiceRef.get());
log.info("Stopped AccessDeviceKafkaIntegration");
}
diff --git a/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java
index b96bd5d..68450d1 100644
--- a/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java
@@ -20,11 +20,11 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
@@ -48,10 +48,10 @@
public Logger log = LoggerFactory.getLogger(getClass());
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected EventBusService eventBusService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceService deviceService;
private final DeviceListener listener = new InternalDeviceListener();
diff --git a/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java
index e1ca65a..f5f1636 100644
--- a/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java
@@ -19,12 +19,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.ReferencePolicy;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.device.DeviceService;
import org.opencord.dhcpl2relay.DhcpAllocationInfo;
@@ -32,30 +26,34 @@
import org.opencord.dhcpl2relay.DhcpL2RelayListener;
import org.opencord.dhcpl2relay.DhcpL2RelayService;
import org.opencord.kafka.EventBusService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
import java.time.Instant;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Listens for DHCP L2 relay events and pushes them on a Kafka bus.
*/
@Component(immediate = true)
-public class DhcpL2RelayKafkaIntegration {
+public class DhcpL2RelayKafkaIntegration extends AbstractKafkaIntegration {
- public Logger log = LoggerFactory.getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected EventBusService eventBusService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceService deviceService;
- @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL,
policy = ReferencePolicy.DYNAMIC,
bind = "bindDhcpL2RelayService",
unbind = "unbindDhcpL2RelayService")
- protected DhcpL2RelayService dhcpL2RelayService;
+    protected volatile DhcpL2RelayService ignore; // @Reference placeholder; bind/unbind use dhcpL2RelayServiceRef
+ private final AtomicReference<DhcpL2RelayService> dhcpL2RelayServiceRef = new AtomicReference<>();
private final DhcpL2RelayListener listener = new InternalDhcpL2RelayListener();
@@ -70,26 +68,12 @@
private static final String MAC_ADDRESS = "macAddress";
private static final String IP_ADDRESS = "ipAddress";
- protected void bindDhcpL2RelayService(DhcpL2RelayService dhcpL2RelayService) {
- if (this.dhcpL2RelayService == null) {
- log.info("Binding DhcpL2RelayService");
- this.dhcpL2RelayService = dhcpL2RelayService;
- log.info("Adding listener on DhcpL2RelayService");
- dhcpL2RelayService.addListener(listener);
- } else {
- log.warn("Trying to bind DhcpL2RelayService but it is already bound");
- }
+ protected void bindDhcpL2RelayService(DhcpL2RelayService incomingService) {
+ bindAndAddListener(incomingService, dhcpL2RelayServiceRef, listener);
}
- protected void unbindDhcpL2RelayService(DhcpL2RelayService dhcpL2RelayService) {
- if (this.dhcpL2RelayService == dhcpL2RelayService) {
- log.info("Unbinding DhcpL2RelayService");
- this.dhcpL2RelayService = null;
- log.info("Removing listener on DhcpL2RelayService");
- dhcpL2RelayService.removeListener(listener);
- } else {
- log.warn("Trying to unbind DhcpL2RelayService but it is already unbound");
- }
+ protected void unbindDhcpL2RelayService(DhcpL2RelayService outgoingService) {
+ unbindAndRemoveListener(outgoingService, dhcpL2RelayServiceRef, listener);
}
@Activate
@@ -99,6 +83,7 @@
@Deactivate
public void deactivate() {
+ unbindDhcpL2RelayService(dhcpL2RelayServiceRef.get());
log.info("Stopped DhcpL2RelayKafkaIntegration");
}