Bump ONOS to 2.2 and build with Java 11
This app relies on dynamic OSGi @Reference fields. The newer Karaf requires
such dynamically bound fields to be declared volatile, which in turn required
different handling of service bind/unbind events.
Change-Id: I215f7ca5cbded3acd9c440fe723f6f21d77f9ed5
diff --git a/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java
index 36954ad..9ffb90f 100644
--- a/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java
@@ -19,38 +19,41 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.ReferencePolicy;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.Port;
import org.opencord.kafka.EventBusService;
import org.opencord.olt.AccessDeviceEvent;
import org.opencord.olt.AccessDeviceListener;
import org.opencord.olt.AccessDeviceService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.time.Instant;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Listens for access device events and pushes them on a Kafka bus.
*/
@Component(immediate = true)
-public class AccessDeviceKafkaIntegration {
+public class AccessDeviceKafkaIntegration extends AbstractKafkaIntegration {
public Logger log = LoggerFactory.getLogger(getClass());
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected EventBusService eventBusService;
- @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL,
policy = ReferencePolicy.DYNAMIC,
bind = "bindAccessDeviceService",
unbind = "unbindAccessDeviceService")
- protected AccessDeviceService accessDeviceService;
+ protected volatile AccessDeviceService accessDeviceService;
+ private final AtomicReference<AccessDeviceService> accessDeviceServiceRef = new AtomicReference<>();
private final AccessDeviceListener listener = new InternalAccessDeviceListener();
@@ -67,26 +70,12 @@
private static final String ACTIVATED = "activated";
private static final String DISABLED = "disabled";
- protected void bindAccessDeviceService(AccessDeviceService accessDeviceService) {
- if (this.accessDeviceService == null) {
- log.info("Binding AccessDeviceService");
- this.accessDeviceService = accessDeviceService;
- log.info("Adding listener on AccessDeviceService");
- accessDeviceService.addListener(listener);
- } else {
- log.warn("Trying to bind AccessDeviceService but it is already bound");
- }
+ protected void bindAccessDeviceService(AccessDeviceService incomingService) {
+ bindAndAddListener(incomingService, accessDeviceServiceRef, listener);
}
- protected void unbindAccessDeviceService(AccessDeviceService accessDeviceService) {
- if (this.accessDeviceService == accessDeviceService) {
- log.info("Unbinding AccessDeviceService");
- this.accessDeviceService = null;
- log.info("Removing listener on AccessDeviceService");
- accessDeviceService.removeListener(listener);
- } else {
- log.warn("Trying to unbind AccessDeviceService but it is already unbound");
- }
+ protected void unbindAccessDeviceService(AccessDeviceService outgoingService) {
+ unbindAndRemoveListener(outgoingService, accessDeviceServiceRef, listener);
}
@Activate
@@ -96,6 +85,7 @@
@Deactivate
public void deactivate() {
+ unbindAccessDeviceService(accessDeviceServiceRef.get());
log.info("Stopped AccessDeviceKafkaIntegration");
}