Bump ONOS to 2.2 and build with Java 11

This app relies on dynamic OSGi @Reference. New Karaf demands volatile
fields, which required different handling of service bind/unbind events.

Change-Id: I215f7ca5cbded3acd9c440fe723f6f21d77f9ed5
diff --git a/pom.xml b/pom.xml
index eaf987b..e77cd0b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -19,30 +19,23 @@
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
     <modelVersion>4.0.0</modelVersion>
 
-    <parent>
-        <groupId>org.onosproject</groupId>
-        <artifactId>onos-dependencies</artifactId>
-        <version>1.13.9</version>
-        <relativePath></relativePath>
-    </parent>
-
     <groupId>org.opencord</groupId>
     <artifactId>kafka</artifactId>
-    <version>1.1.0</version>
+    <version>2.0.0-SNAPSHOT</version>
     <packaging>bundle</packaging>
 
     <description>Kafka integration</description>
 
     <properties>
         <onos.app.name>org.opencord.kafka</onos.app.name>
-        <onos.version>1.13.9</onos.version>
+        <onos.version>2.2.0</onos.version>
         <onos.app.title>Kafka integration</onos.app.title>
         <onos.app.url>http://opencord.org</onos.app.url>
         <onos.app.readme>Integration with Kafka event bus</onos.app.readme>
-        <aaa.api.version>1.9.0</aaa.api.version>
-        <olt.api.version>3.0.1</olt.api.version>
-        <dhcpl2relay.api.version>1.6.0</dhcpl2relay.api.version>
-        <sadis.api.version>3.1.0</sadis.api.version>
+        <aaa.api.version>2.0.0-SNAPSHOT</aaa.api.version>
+        <olt.api.version>4.0.0-SNAPSHOT</olt.api.version>
+        <dhcpl2relay.api.version>2.0.0-SNAPSHOT</dhcpl2relay.api.version>
+        <sadis.api.version>4.0.0-SNAPSHOT</sadis.api.version>
     </properties>
 
     <dependencies>
@@ -50,21 +43,14 @@
             <groupId>org.onosproject</groupId>
             <artifactId>onos-api</artifactId>
             <version>${onos.version}</version>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.onosproject</groupId>
-            <artifactId>onos-api</artifactId>
+            <artifactId>onlab-misc</artifactId>
             <version>${onos.version}</version>
-            <classifier>tests</classifier>
-            <scope>test</scope>
-        </dependency>
-
-
-        <dependency>
-            <groupId>org.onosproject</groupId>
-            <artifactId>onlab-junit</artifactId>
-            <version>${onos.version}</version>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
@@ -77,20 +63,43 @@
             <groupId>org.opencord</groupId>
             <artifactId>aaa-api</artifactId>
             <version>${aaa.api.version}</version>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.opencord</groupId>
             <artifactId>olt-api</artifactId>
             <version>${olt.api.version}</version>
+            <scope>provided</scope>
         </dependency>
 
         <dependency>
             <groupId>org.opencord</groupId>
             <artifactId>dhcpl2relay-api</artifactId>
             <version>${dhcpl2relay.api.version}</version>
+            <scope>provided</scope>
         </dependency>
 
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+            <version>2.9.5</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.osgi</groupId>
+            <artifactId>org.osgi.service.component.annotations</artifactId>
+            <version>1.4.0</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.25</version>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 
     <build>
@@ -98,17 +107,51 @@
             <plugin>
                 <groupId>org.apache.felix</groupId>
                 <artifactId>maven-bundle-plugin</artifactId>
-            </plugin>
-
-            <plugin>
-                <groupId>org.apache.felix</groupId>
-                <artifactId>maven-scr-plugin</artifactId>
+                <version>4.1.0</version>
+                <extensions>true</extensions>
+                <configuration>
+                    <instructions>
+                        <_dsannotations-options>inherit</_dsannotations-options>
+                    </instructions>
+                </configuration>
             </plugin>
 
             <plugin>
                 <groupId>org.onosproject</groupId>
                 <artifactId>onos-maven-plugin</artifactId>
-                <version>1.11</version>
+                <version>2.2</version>
+                <executions>
+                    <execution>
+                        <id>cfg</id>
+                        <phase>generate-resources</phase>
+                        <goals>
+                            <goal>cfg</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>swagger</id>
+                        <phase>generate-sources</phase>
+                        <goals>
+                            <goal>swagger</goal>
+                        </goals>
+                    </execution>
+                    <execution>
+                        <id>app</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>app</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.8.0</version>
+                <configuration>
+                    <release>11</release>
+                </configuration>
             </plugin>
         </plugins>
     </build>
@@ -139,4 +182,10 @@
             </snapshots>
         </repository>
     </repositories>
+    <distributionManagement>
+        <snapshotRepository>
+            <id>ossrh</id>
+            <url>https://oss.sonatype.org/content/repositories/snapshots</url>
+        </snapshotRepository>
+    </distributionManagement>
 </project>
diff --git a/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java b/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
index 0720a86..7dc2c35 100644
--- a/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
@@ -16,12 +16,11 @@
 package org.opencord.kafka.impl;
 
 import com.fasterxml.jackson.databind.JsonNode;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.Service;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -48,7 +47,6 @@
 /**
  * Sends events to a Kafka event bus.
  */
-@Service
 @Component(immediate = true)
 public class KafkaIntegration implements EventBusService {
 
@@ -61,13 +59,13 @@
 
     private Boolean kafkaStarted = false;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected CoreService coreService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected ClusterService clusterService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected NetworkConfigRegistry configRegistry;
 
     private static StringSerializer stringSerializer = new StringSerializer();
diff --git a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
index f4efb0c..671f581 100644
--- a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
@@ -15,53 +15,53 @@
  */
 
 package org.opencord.kafka.integrations;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.ReferencePolicy;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.device.DeviceService;
 import org.opencord.aaa.AuthenticationEvent;
 import org.opencord.aaa.AuthenticationEventListener;
 import org.opencord.aaa.AuthenticationService;
-import org.opencord.kafka.EventBusService;
 import org.opencord.aaa.AuthenticationStatisticsEvent;
 import org.opencord.aaa.AuthenticationStatisticsEventListener;
 import org.opencord.aaa.AuthenticationStatisticsService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.opencord.kafka.EventBusService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
 
 import java.time.Instant;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Listens for AAA events and pushes them on a Kafka bus.
  */
 @Component(immediate = true)
-public class AaaKafkaIntegration {
+public class AaaKafkaIntegration extends AbstractKafkaIntegration {
 
-    public Logger log = LoggerFactory.getLogger(getClass());
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected EventBusService eventBusService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected DeviceService deviceService;
 
-    @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL,
             policy = ReferencePolicy.DYNAMIC,
             bind = "bindAuthenticationService",
             unbind = "unbindAuthenticationService")
-    protected AuthenticationService authenticationService;
-    @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+    protected volatile AuthenticationService ignore;
+    private final AtomicReference<AuthenticationService> authServiceRef = new AtomicReference<>();
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL,
             policy = ReferencePolicy.DYNAMIC,
             bind = "bindAuthenticationStatService",
             unbind = "unbindAuthenticationStatService")
-    protected AuthenticationStatisticsService authenticationStatisticsService;
+    protected volatile AuthenticationStatisticsService ignore2;
+    private final AtomicReference<AuthenticationStatisticsService> authStatServiceRef = new AtomicReference<>();
 
     private final AuthenticationEventListener listener = new InternalAuthenticationListener();
     private final AuthenticationStatisticsEventListener authenticationStatisticsEventListener =
@@ -92,52 +92,20 @@
     private static final String REQUEST_RTT_MILLIS = "requestRttMillis";
     private static final String REQUEST_RE_TX = "requestReTx";
 
-    protected void bindAuthenticationService(AuthenticationService authenticationService) {
-        log.info("bindAuthenticationService");
-        if (this.authenticationService == null) {
-            log.info("Binding AuthenticationService");
-            this.authenticationService = authenticationService;
-            log.info("Adding listener on AuthenticationService");
-            authenticationService.addListener(listener);
-        } else {
-            log.warn("Trying to bind AuthenticationService but it is already bound");
-        }
+    protected void bindAuthenticationService(AuthenticationService incomingService) {
+        bindAndAddListener(incomingService, authServiceRef, listener);
     }
 
-    protected void unbindAuthenticationService(AuthenticationService authenticationService) {
-        log.info("unbindAuthenticationService");
-        if (this.authenticationService == authenticationService) {
-            log.info("Unbinding AuthenticationService");
-            this.authenticationService = null;
-            log.info("Removing listener on AuthenticationService");
-            authenticationService.removeListener(listener);
-        } else {
-            log.warn("Trying to unbind AuthenticationService but it is already unbound");
-        }
+    protected void unbindAuthenticationService(AuthenticationService outgoingService) {
+        unbindAndRemoveListener(outgoingService, authServiceRef, listener);
     }
 
-    protected void bindAuthenticationStatService(AuthenticationStatisticsService authenticationStatisticsService) {
-        log.info("bindAuthenticationStatService");
-        if (this.authenticationStatisticsService == null) {
-            log.info("Binding AuthenticationStastService");
-            this.authenticationStatisticsService = authenticationStatisticsService;
-            log.info("Adding listener on AuthenticationStatService");
-            authenticationStatisticsService.addListener(authenticationStatisticsEventListener);
-        } else {
-            log.warn("Trying to bind AuthenticationStatService but it is already bound");
-        }
+    protected void bindAuthenticationStatService(AuthenticationStatisticsService incomingService) {
+        bindAndAddListener(incomingService, authStatServiceRef, authenticationStatisticsEventListener);
     }
 
-    protected void unbindAuthenticationStatService(AuthenticationStatisticsService authenticationStatisticsService) {
-        log.info("unbindAuthenticationStatService");
-        if (this.authenticationStatisticsService == authenticationStatisticsService) {
-            log.info("Unbinding AuthenticationStatService");
-            this.authenticationStatisticsService = null;
-            log.info("Removing listener on AuthenticationStatService");
-            authenticationStatisticsService.removeListener(authenticationStatisticsEventListener);
-        } else {
-            log.warn("Trying to unbind AuthenticationStatService but it is already unbound");
-        }
+    protected void unbindAuthenticationStatService(AuthenticationStatisticsService outgoingService) {
+        unbindAndRemoveListener(outgoingService, authStatServiceRef, authenticationStatisticsEventListener);
     }
 
     @Activate
@@ -147,6 +115,8 @@
 
     @Deactivate
     public void deactivate() {
+        unbindAuthenticationService(authServiceRef.get());
+        unbindAuthenticationStatService(authStatServiceRef.get());
         log.info("Stopped AaaKafkaIntegration");
     }
 
@@ -156,7 +126,7 @@
 
     private void handleStat(AuthenticationStatisticsEvent event) {
         eventBusService.send(AUTHENTICATION_STATISTICS_TOPIC, serializeStat(event));
-        log.info("AuthenticationStatisticsEvent sent successfully");
+        log.debug("AuthenticationStatisticsEvent sent successfully");
     }
 
     private JsonNode serialize(AuthenticationEvent event) {
@@ -173,7 +143,7 @@
     }
 
     private JsonNode serializeStat(AuthenticationStatisticsEvent event) {
-        log.info("Serializing AuthenticationStatisticsEvent");
+        log.debug("Serializing AuthenticationStatisticsEvent");
         ObjectMapper mapper = new ObjectMapper();
         ObjectNode authMetricsEvent = mapper.createObjectNode();
         authMetricsEvent.put(TIMESTAMP, Instant.now().toString());
diff --git a/src/main/java/org/opencord/kafka/integrations/AbstractKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AbstractKafkaIntegration.java
new file mode 100644
index 0000000..4badb39
--- /dev/null
+++ b/src/main/java/org/opencord/kafka/integrations/AbstractKafkaIntegration.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.opencord.kafka.integrations;
+
+import org.onosproject.event.EventListener;
+import org.onosproject.event.ListenerService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Abstract implementation of a service-specific Kafka integration which provide
+ * convenience methods to dynamically bind/unbind event listener services.
+ */
+class AbstractKafkaIntegration {
+
+    Logger log = LoggerFactory.getLogger(getClass());
+
+    // OSGi demands dynamic @Reference to use volatile fields. We use a second
+    // field to store the actual service implementation reference and use that
+    // in the bind/unbind methods. We make sure to add listeners only if one was
+    // not already added.
+
+    <S extends ListenerService<?, L>, L extends EventListener<?>> void bindAndAddListener(
+            S incomingService, AtomicReference<S> serviceRef, L listener) {
+        if (incomingService == null) {
+            return;
+        }
+        if (serviceRef.compareAndSet(null, incomingService)) {
+            log.info("Adding listener on {}", incomingService.getClass().getSimpleName());
+            incomingService.addListener(listener);
+        } else {
+            log.warn("Trying to bind AccessDeviceService but it is already bound");
+        }
+    }
+
+    <S extends ListenerService<?, L>, L extends EventListener<?>> void unbindAndRemoveListener(
+            S outgoingService, AtomicReference<S> serviceRef, L listener) {
+        if (outgoingService != null &&
+                serviceRef.compareAndSet(outgoingService, null)) {
+            log.info("Removing listener on {}", outgoingService.getClass().getSimpleName());
+            outgoingService.removeListener(listener);
+        }
+        // Else, ignore. This is not the instance currently bound, or the
+        // outgoing service is null.
+    }
+}
diff --git a/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java
index 36954ad..9ffb90f 100644
--- a/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/AccessDeviceKafkaIntegration.java
@@ -19,38 +19,41 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.ReferencePolicy;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.Port;
 import org.opencord.kafka.EventBusService;
 import org.opencord.olt.AccessDeviceEvent;
 import org.opencord.olt.AccessDeviceListener;
 import org.opencord.olt.AccessDeviceService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+
 import java.time.Instant;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Listens for access device events and pushes them on a Kafka bus.
  */
 @Component(immediate = true)
-public class AccessDeviceKafkaIntegration {
+public class AccessDeviceKafkaIntegration extends AbstractKafkaIntegration {
 
     public Logger log = LoggerFactory.getLogger(getClass());
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected EventBusService eventBusService;
 
-    @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL,
             policy = ReferencePolicy.DYNAMIC,
             bind = "bindAccessDeviceService",
             unbind = "unbindAccessDeviceService")
-    protected AccessDeviceService accessDeviceService;
+    protected volatile AccessDeviceService accessDeviceService;
+    private final AtomicReference<AccessDeviceService> accessDeviceServiceRef = new AtomicReference<>();
 
     private final AccessDeviceListener listener = new InternalAccessDeviceListener();
 
@@ -67,26 +70,12 @@
     private static final String ACTIVATED = "activated";
     private static final String DISABLED = "disabled";
 
-    protected void bindAccessDeviceService(AccessDeviceService accessDeviceService) {
-        if (this.accessDeviceService == null) {
-            log.info("Binding AccessDeviceService");
-            this.accessDeviceService = accessDeviceService;
-            log.info("Adding listener on AccessDeviceService");
-            accessDeviceService.addListener(listener);
-        } else {
-            log.warn("Trying to bind AccessDeviceService but it is already bound");
-        }
+    protected void bindAccessDeviceService(AccessDeviceService incomingService) {
+        bindAndAddListener(incomingService, accessDeviceServiceRef, listener);
     }
 
-    protected void unbindAccessDeviceService(AccessDeviceService accessDeviceService) {
-        if (this.accessDeviceService == accessDeviceService) {
-            log.info("Unbinding AccessDeviceService");
-            this.accessDeviceService = null;
-            log.info("Removing listener on AccessDeviceService");
-            accessDeviceService.removeListener(listener);
-        } else {
-            log.warn("Trying to unbind AccessDeviceService but it is already unbound");
-        }
+    protected void unbindAccessDeviceService(AccessDeviceService outgoingService) {
+        unbindAndRemoveListener(outgoingService, accessDeviceServiceRef, listener);
     }
 
     @Activate
@@ -96,6 +85,7 @@
 
     @Deactivate
     public void deactivate() {
+        unbindAccessDeviceService(accessDeviceServiceRef.get());
         log.info("Stopped AccessDeviceKafkaIntegration");
     }
 
diff --git a/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java
index b96bd5d..68450d1 100644
--- a/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/DeviceKafkaIntegration.java
@@ -20,11 +20,11 @@
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
 import org.onosproject.net.DeviceId;
 import org.onosproject.net.device.DeviceEvent;
 import org.onosproject.net.device.DeviceListener;
@@ -48,10 +48,10 @@
 
     public Logger log = LoggerFactory.getLogger(getClass());
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected EventBusService eventBusService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected DeviceService deviceService;
 
     private final DeviceListener listener = new InternalDeviceListener();
diff --git a/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java
index e1ca65a..f5f1636 100644
--- a/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/DhcpL2RelayKafkaIntegration.java
@@ -19,12 +19,6 @@
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.ReferencePolicy;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.device.DeviceService;
 import org.opencord.dhcpl2relay.DhcpAllocationInfo;
@@ -32,30 +26,34 @@
 import org.opencord.dhcpl2relay.DhcpL2RelayListener;
 import org.opencord.dhcpl2relay.DhcpL2RelayService;
 import org.opencord.kafka.EventBusService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
 
 import java.time.Instant;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Listens for DHCP L2 relay events and pushes them on a Kafka bus.
  */
 @Component(immediate = true)
-public class DhcpL2RelayKafkaIntegration {
+public class DhcpL2RelayKafkaIntegration extends AbstractKafkaIntegration {
 
-    public Logger log = LoggerFactory.getLogger(getClass());
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected EventBusService eventBusService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected DeviceService deviceService;
 
-    @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL,
             policy = ReferencePolicy.DYNAMIC,
             bind = "bindDhcpL2RelayService",
             unbind = "unbindDhcpL2RelayService")
-    protected DhcpL2RelayService dhcpL2RelayService;
+    volatile protected DhcpL2RelayService ignore;
+    private final AtomicReference<DhcpL2RelayService> dhcpL2RelayServiceRef = new AtomicReference<>();
 
     private final DhcpL2RelayListener listener = new InternalDhcpL2RelayListener();
 
@@ -70,26 +68,12 @@
     private static final String MAC_ADDRESS = "macAddress";
     private static final String IP_ADDRESS = "ipAddress";
 
-    protected void bindDhcpL2RelayService(DhcpL2RelayService dhcpL2RelayService) {
-        if (this.dhcpL2RelayService == null) {
-            log.info("Binding DhcpL2RelayService");
-            this.dhcpL2RelayService = dhcpL2RelayService;
-            log.info("Adding listener on DhcpL2RelayService");
-            dhcpL2RelayService.addListener(listener);
-        } else {
-            log.warn("Trying to bind DhcpL2RelayService but it is already bound");
-        }
+    protected void bindDhcpL2RelayService(DhcpL2RelayService incomingService) {
+        bindAndAddListener(incomingService, dhcpL2RelayServiceRef, listener);
     }
 
-    protected void unbindDhcpL2RelayService(DhcpL2RelayService dhcpL2RelayService) {
-        if (this.dhcpL2RelayService == dhcpL2RelayService) {
-            log.info("Unbinding DhcpL2RelayService");
-            this.dhcpL2RelayService = null;
-            log.info("Removing listener on DhcpL2RelayService");
-            dhcpL2RelayService.removeListener(listener);
-        } else {
-            log.warn("Trying to unbind DhcpL2RelayService but it is already unbound");
-        }
+    protected void unbindDhcpL2RelayService(DhcpL2RelayService outgoingService) {
+        unbindAndRemoveListener(outgoingService, dhcpL2RelayServiceRef, listener);
     }
 
     @Activate
@@ -99,6 +83,7 @@
 
     @Deactivate
     public void deactivate() {
+        unbindDhcpL2RelayService(dhcpL2RelayServiceRef.get());
         log.info("Stopped DhcpL2RelayKafkaIntegration");
     }