Bump ONOS to 2.2 and build with Java 11
This app relies on dynamic OSGi @Reference. New Karaf demands volatile
fields, which required different handling of service bind/unbind events.
Change-Id: I215f7ca5cbded3acd9c440fe723f6f21d77f9ed5
diff --git a/pom.xml b/pom.xml
index eaf987b..e77cd0b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,30 +19,23 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.onosproject</groupId>
- <artifactId>onos-dependencies</artifactId>
- <version>1.13.9</version>
- <relativePath></relativePath>
- </parent>
-
<groupId>org.opencord</groupId>
<artifactId>kafka</artifactId>
- <version>1.1.0</version>
+ <version>2.0.0-SNAPSHOT</version>
<packaging>bundle</packaging>
<description>Kafka integration</description>
<properties>
<onos.app.name>org.opencord.kafka</onos.app.name>
- <onos.version>1.13.9</onos.version>
+ <onos.version>2.2.0</onos.version>
<onos.app.title>Kafka integration</onos.app.title>
<onos.app.url>http://opencord.org</onos.app.url>
<onos.app.readme>Integration with Kafka event bus</onos.app.readme>
- <aaa.api.version>1.9.0</aaa.api.version>
- <olt.api.version>3.0.1</olt.api.version>
- <dhcpl2relay.api.version>1.6.0</dhcpl2relay.api.version>
- <sadis.api.version>3.1.0</sadis.api.version>
+ <aaa.api.version>2.0.0-SNAPSHOT</aaa.api.version>
+ <olt.api.version>4.0.0-SNAPSHOT</olt.api.version>
+ <dhcpl2relay.api.version>2.0.0-SNAPSHOT</dhcpl2relay.api.version>
+ <sadis.api.version>4.0.0-SNAPSHOT</sadis.api.version>
</properties>
<dependencies>
@@ -50,21 +43,14 @@
<groupId>org.onosproject</groupId>
<artifactId>onos-api</artifactId>
<version>${onos.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.onosproject</groupId>
- <artifactId>onos-api</artifactId>
+ <artifactId>onlab-misc</artifactId>
<version>${onos.version}</version>
- <classifier>tests</classifier>
- <scope>test</scope>
- </dependency>
-
-
- <dependency>
- <groupId>org.onosproject</groupId>
- <artifactId>onlab-junit</artifactId>
- <version>${onos.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
@@ -77,20 +63,43 @@
<groupId>org.opencord</groupId>
<artifactId>aaa-api</artifactId>
<version>${aaa.api.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.opencord</groupId>
<artifactId>olt-api</artifactId>
<version>${olt.api.version}</version>
+ <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.opencord</groupId>
<artifactId>dhcpl2relay-api</artifactId>
<version>${dhcpl2relay.api.version}</version>
+ <scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ <version>2.9.5</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.osgi</groupId>
+ <artifactId>org.osgi.service.component.annotations</artifactId>
+ <version>1.4.0</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ <version>1.7.25</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>
@@ -98,17 +107,51 @@
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
- </plugin>
-
- <plugin>
- <groupId>org.apache.felix</groupId>
- <artifactId>maven-scr-plugin</artifactId>
+ <version>4.1.0</version>
+ <extensions>true</extensions>
+ <configuration>
+ <instructions>
+ <_dsannotations-options>inherit</_dsannotations-options>
+ </instructions>
+ </configuration>
</plugin>
<plugin>
<groupId>org.onosproject</groupId>
<artifactId>onos-maven-plugin</artifactId>
- <version>1.11</version>
+ <version>2.2</version>
+ <executions>
+ <execution>
+ <id>cfg</id>
+ <phase>generate-resources</phase>
+ <goals>
+ <goal>cfg</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>swagger</id>
+ <phase>generate-sources</phase>
+ <goals>
+ <goal>swagger</goal>
+ </goals>
+ </execution>
+ <execution>
+ <id>app</id>
+ <phase>package</phase>
+ <goals>
+ <goal>app</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.8.0</version>
+ <configuration>
+ <release>11</release>
+ </configuration>
</plugin>
</plugins>
</build>
@@ -139,4 +182,10 @@
</snapshots>
</repository>
</repositories>
+ <distributionManagement>
+ <snapshotRepository>
+ <id>ossrh</id>
+ <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+ </snapshotRepository>
+ </distributionManagement>
</project>
diff --git a/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java b/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
index 0720a86..7dc2c35 100644
--- a/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
@@ -16,12 +16,11 @@
package org.opencord.kafka.impl;
import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
@@ -48,7 +47,6 @@
/**
* Sends events to a Kafka event bus.
*/
-@Service
@Component(immediate = true)
public class KafkaIntegration implements EventBusService {
@@ -61,13 +59,13 @@
private Boolean kafkaStarted = false;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected CoreService coreService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected ClusterService clusterService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected NetworkConfigRegistry configRegistry;
private static StringSerializer stringSerializer = new StringSerializer();
diff --git a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
index f4efb0c..671f581 100644
--- a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
@@ -15,53 +15,53 @@
*/
package org.opencord.kafka.integrations;
+
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.ReferencePolicy;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.device.DeviceService;
import org.opencord.aaa.AuthenticationEvent;
import org.opencord.aaa.AuthenticationEventListener;
import org.opencord.aaa.AuthenticationService;
-import org.opencord.kafka.EventBusService;
import org.opencord.aaa.AuthenticationStatisticsEvent;
import org.opencord.aaa.AuthenticationStatisticsEventListener;
import org.opencord.aaa.AuthenticationStatisticsService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.opencord.kafka.EventBusService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
import java.time.Instant;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Listens for AAA events and pushes them on a Kafka bus.
*/
@Component(immediate = true)
-public class AaaKafkaIntegration {
+public class AaaKafkaIntegration extends AbstractKafkaIntegration {
- public Logger log = LoggerFactory.getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected EventBusService eventBusService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceService deviceService;
- @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL,
policy = ReferencePolicy.DYNAMIC,
bind = "bindAuthenticationService",
unbind = "unbindAuthenticationService")
- protected AuthenticationService authenticationService;
- @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+ protected volatile AuthenticationService ignore;
+ private final AtomicReference<AuthenticationService> authServiceRef = new AtomicReference<>();
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL,
policy = ReferencePolicy.DYNAMIC,
bind = "bindAuthenticationStatService",
unbind = "unbindAuthenticationStatService")
- protected AuthenticationStatisticsService authenticationStatisticsService;
+ protected volatile AuthenticationStatisticsService ignore2;
+ private final AtomicReference<AuthenticationStatisticsService> authStatServiceRef = new AtomicReference<>();
private final AuthenticationEventListener listener = new InternalAuthenticationListener();
private final AuthenticationStatisticsEventListener authenticationStatisticsEventListener =
@@ -92,52 +92,20 @@
private static final String REQUEST_RTT_MILLIS = "requestRttMillis";
private static final String REQUEST_RE_TX = "requestReTx";
- protected void bindAuthenticationService(AuthenticationService authenticationService) {
- log.info("bindAuthenticationService");
- if (this.authenticationService == null) {
- log.info("Binding AuthenticationService");
- this.authenticationService = authenticationService;
- log.info("Adding listener on AuthenticationService");
- authenticationService.addListener(listener);
- } else {
- log.warn("Trying to bind AuthenticationService but it is already bound");
- }
+ protected void bindAuthenticationService(AuthenticationService incomingService) {
+ bindAndAddListener(incomingService, authServiceRef, listener);
}
- protected void unbindAuthenticationService(AuthenticationService authenticationService) {
- log.info("unbindAuthenticationService");
- if (this.authenticationService == authenticationService) {
- log.info("Unbinding AuthenticationService");
- this.authenticationService = null;
- log.info("Removing listener on AuthenticationService");
- authenticationService.removeListener(listener);
- } else {
- log.warn("Trying to unbind AuthenticationService but it is already unbound");
- }
+ protected void unbindAuthenticationService(AuthenticationService outgoingService) {
+ unbindAndRemoveListener(outgoingService, authServiceRef, listener);
}
- protected void bindAuthenticationStatService(AuthenticationStatisticsService authenticationStatisticsService) {
- log.info("bindAuthenticationStatService");
- if (this.authenticationStatisticsService == null) {
- log.info("Binding AuthenticationStastService");
- this.authenticationStatisticsService = authenticationStatisticsService;
- log.info("Adding listener on AuthenticationStatService");
- authenticationStatisticsService.addListener(authenticationStatisticsEventListener);
- } else {
- log.warn("Trying to bind AuthenticationStatService but it is already bound");
- }
+ protected void bindAuthenticationStatService(AuthenticationStatisticsService incomingService) {
+ bindAndAddListener(incomingService, authStatServiceRef, authenticationStatisticsEventListener);
}
- protected void unbindAuthenticationStatService(AuthenticationStatisticsService authenticationStatisticsService) {
- log.info("unbindAuthenticationStatService");
- if (this.authenticationStatisticsService == authenticationStatisticsService) {
- log.info("Unbinding AuthenticationStatService");
- this.authenticationStatisticsService = null;
- log.info("Removing listener on AuthenticationStatService");
- authenticationStatisticsService.removeListener(authenticationStatisticsEventListener);
- } else {
- log.warn("Trying to unbind AuthenticationStatService but it is already unbound");
- }
+ protected void unbindAuthenticationStatService(AuthenticationStatisticsService outgoingService) {
+ unbindAndRemoveListener(outgoingService, authStatServiceRef, authenticationStatisticsEventListener);
}
@Activate
@@ -147,6 +115,8 @@
@Deactivate
public void deactivate() {
+ unbindAuthenticationService(authServiceRef.get());
+ unbindAuthenticationStatService(authStatServiceRef.get());
log.info("Stopped AaaKafkaIntegration");
}
@@ -156,7 +126,7 @@
private void handleStat(AuthenticationStatisticsEvent event) {
eventBusService.send(AUTHENTICATION_STATISTICS_TOPIC, serializeStat(event));
- log.info("AuthenticationStatisticsEvent sent successfully");
+ log.debug("AuthenticationStatisticsEvent sent successfully");
}
private JsonNode serialize(AuthenticationEvent event) {
@@ -173,7 +143,7 @@
}
private JsonNode serializeStat(AuthenticationStatisticsEvent event) {
- log.info("Serializing AuthenticationStatisticsEvent");
+ log.debug("Serializing AuthenticationStatisticsEvent");
ObjectMapper mapper = new ObjectMapper();
ObjectNode authMetricsEvent = mapper.createObjectNode();
authMetricsEvent.put(TIMESTAMP, Instant.now().toString());
diff --git a/src/main/java/org/opencord/kafka/integrations/AbstractKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AbstractKafkaIntegration.java
new file mode 100644
index 0000000..4badb39
--- /dev/null
+++ b/src/main/java/org/opencord/kafka/integrations/AbstractKafkaIntegration.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.kafka.integrations;
+
+import org.onosproject.event.EventListener;
+import org.onosproject.event.ListenerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Abstract implementation of a service-specific Kafka integration which provide
+ * convenience methods to dynamically bind/unbind event listener services.
+ */
+class AbstractKafkaIntegration {
+
+ Logger log = LoggerFactory.getLogger(getClass());
+
+ // OSGi demands dynamic @Reference to use volatile fields. We use a second
+ // field to store the actual service implementation reference and use that
+ // in the bind/unbind methods. We make sure to add listeners only if one was
+ // not already added.
+
+ <S extends ListenerService<?, L>, L extends EventListener<?>> void bindAndAddListener(
+ S incomingService, AtomicReference<S> serviceRef, L listener) {
+ if (incomingService == null) {
+ return;
+ }
+ if (serviceRef.compareAndSet(null, incomingService)) {
+ log.info("Adding listener on {}", incomingService.getClass().getSimpleName());
+ incomingService.addListener(listener);
+ } else {
+ log.warn("Trying to bind AccessDeviceService but it is already bound");
+ }
+ }
+
+ <S extends ListenerService<?, L>, L extends EventListener<?>> void unbindAndRemoveListener(
+ S outgoingService, AtomicReference<S> serviceRef, L listener) {
+ if (outgoingService != null &&
+ serviceRef.compareAndSet(outgoingService, null)) {
+ log.info("Removing listener on {}", outgoingService.getClass().getSimpleName());
+ outgoingService.removeListener(listener);
+ }
+ // Else, ignore. This is not the instance currently bound, or the
+ // outgoing service is null.
+ }
+}
diff --git a/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java
index 36954ad..9ffb90f 100644
--- a/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java
@@ -19,38 +19,41 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.ReferencePolicy;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.Port;
import org.opencord.kafka.EventBusService;
import org.opencord.olt.AccessDeviceEvent;
import org.opencord.olt.AccessDeviceListener;
import org.opencord.olt.AccessDeviceService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import java.time.Instant;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Listens for access device events and pushes them on a Kafka bus.
*/
@Component(immediate = true)
-public class AccessDeviceKafkaIntegration {
+public class AccessDeviceKafkaIntegration extends AbstractKafkaIntegration {
public Logger log = LoggerFactory.getLogger(getClass());
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected EventBusService eventBusService;
- @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL,
policy = ReferencePolicy.DYNAMIC,
bind = "bindAccessDeviceService",
unbind = "unbindAccessDeviceService")
- protected AccessDeviceService accessDeviceService;
+ protected volatile AccessDeviceService accessDeviceService;
+ private final AtomicReference<AccessDeviceService> accessDeviceServiceRef = new AtomicReference<>();
private final AccessDeviceListener listener = new InternalAccessDeviceListener();
@@ -67,26 +70,12 @@
private static final String ACTIVATED = "activated";
private static final String DISABLED = "disabled";
- protected void bindAccessDeviceService(AccessDeviceService accessDeviceService) {
- if (this.accessDeviceService == null) {
- log.info("Binding AccessDeviceService");
- this.accessDeviceService = accessDeviceService;
- log.info("Adding listener on AccessDeviceService");
- accessDeviceService.addListener(listener);
- } else {
- log.warn("Trying to bind AccessDeviceService but it is already bound");
- }
+ protected void bindAccessDeviceService(AccessDeviceService incomingService) {
+ bindAndAddListener(incomingService, accessDeviceServiceRef, listener);
}
- protected void unbindAccessDeviceService(AccessDeviceService accessDeviceService) {
- if (this.accessDeviceService == accessDeviceService) {
- log.info("Unbinding AccessDeviceService");
- this.accessDeviceService = null;
- log.info("Removing listener on AccessDeviceService");
- accessDeviceService.removeListener(listener);
- } else {
- log.warn("Trying to unbind AccessDeviceService but it is already unbound");
- }
+ protected void unbindAccessDeviceService(AccessDeviceService outgoingService) {
+ unbindAndRemoveListener(outgoingService, accessDeviceServiceRef, listener);
}
@Activate
@@ -96,6 +85,7 @@
@Deactivate
public void deactivate() {
+ unbindAccessDeviceService(accessDeviceServiceRef.get());
log.info("Stopped AccessDeviceKafkaIntegration");
}
diff --git a/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java
index b96bd5d..68450d1 100644
--- a/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java
@@ -20,11 +20,11 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
import org.onosproject.net.DeviceId;
import org.onosproject.net.device.DeviceEvent;
import org.onosproject.net.device.DeviceListener;
@@ -48,10 +48,10 @@
public Logger log = LoggerFactory.getLogger(getClass());
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected EventBusService eventBusService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceService deviceService;
private final DeviceListener listener = new InternalDeviceListener();
diff --git a/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java
index e1ca65a..f5f1636 100644
--- a/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java
@@ -19,12 +19,6 @@
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.ReferencePolicy;
import org.onosproject.net.AnnotationKeys;
import org.onosproject.net.device.DeviceService;
import org.opencord.dhcpl2relay.DhcpAllocationInfo;
@@ -32,30 +26,34 @@
import org.opencord.dhcpl2relay.DhcpL2RelayListener;
import org.opencord.dhcpl2relay.DhcpL2RelayService;
import org.opencord.kafka.EventBusService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
import java.time.Instant;
+import java.util.concurrent.atomic.AtomicReference;
/**
* Listens for DHCP L2 relay events and pushes them on a Kafka bus.
*/
@Component(immediate = true)
-public class DhcpL2RelayKafkaIntegration {
+public class DhcpL2RelayKafkaIntegration extends AbstractKafkaIntegration {
- public Logger log = LoggerFactory.getLogger(getClass());
-
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected EventBusService eventBusService;
- @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ @Reference(cardinality = ReferenceCardinality.MANDATORY)
protected DeviceService deviceService;
- @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+ @Reference(cardinality = ReferenceCardinality.OPTIONAL,
policy = ReferencePolicy.DYNAMIC,
bind = "bindDhcpL2RelayService",
unbind = "unbindDhcpL2RelayService")
- protected DhcpL2RelayService dhcpL2RelayService;
+ volatile protected DhcpL2RelayService ignore;
+ private final AtomicReference<DhcpL2RelayService> dhcpL2RelayServiceRef = new AtomicReference<>();
private final DhcpL2RelayListener listener = new InternalDhcpL2RelayListener();
@@ -70,26 +68,12 @@
private static final String MAC_ADDRESS = "macAddress";
private static final String IP_ADDRESS = "ipAddress";
- protected void bindDhcpL2RelayService(DhcpL2RelayService dhcpL2RelayService) {
- if (this.dhcpL2RelayService == null) {
- log.info("Binding DhcpL2RelayService");
- this.dhcpL2RelayService = dhcpL2RelayService;
- log.info("Adding listener on DhcpL2RelayService");
- dhcpL2RelayService.addListener(listener);
- } else {
- log.warn("Trying to bind DhcpL2RelayService but it is already bound");
- }
+ protected void bindDhcpL2RelayService(DhcpL2RelayService incomingService) {
+ bindAndAddListener(incomingService, dhcpL2RelayServiceRef, listener);
}
- protected void unbindDhcpL2RelayService(DhcpL2RelayService dhcpL2RelayService) {
- if (this.dhcpL2RelayService == dhcpL2RelayService) {
- log.info("Unbinding DhcpL2RelayService");
- this.dhcpL2RelayService = null;
- log.info("Removing listener on DhcpL2RelayService");
- dhcpL2RelayService.removeListener(listener);
- } else {
- log.warn("Trying to unbind DhcpL2RelayService but it is already unbound");
- }
+ protected void unbindDhcpL2RelayService(DhcpL2RelayService outgoingService) {
+ unbindAndRemoveListener(outgoingService, dhcpL2RelayServiceRef, listener);
}
@Activate
@@ -99,6 +83,7 @@
@Deactivate
public void deactivate() {
+ unbindDhcpL2RelayService(dhcpL2RelayServiceRef.get());
log.info("Stopped DhcpL2RelayKafkaIntegration");
}