Bump ONOS to 2.2 and build with Java 11

This app relies on dynamic OSGi @Reference. New Karaf demands volatile
fields, which required different handling of service bind/unbind events.

Change-Id: I215f7ca5cbded3acd9c440fe723f6f21d77f9ed5
diff --git a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
index f4efb0c..671f581 100644
--- a/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/integrations/AaaKafkaIntegration.java
@@ -15,53 +15,53 @@
  */
 
 package org.opencord.kafka.integrations;
+
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.databind.node.ObjectNode;
-import org.apache.felix.scr.annotations.Activate;
-import org.apache.felix.scr.annotations.Component;
-import org.apache.felix.scr.annotations.Deactivate;
-import org.apache.felix.scr.annotations.Reference;
-import org.apache.felix.scr.annotations.ReferenceCardinality;
-import org.apache.felix.scr.annotations.ReferencePolicy;
 import org.onosproject.net.AnnotationKeys;
 import org.onosproject.net.device.DeviceService;
 import org.opencord.aaa.AuthenticationEvent;
 import org.opencord.aaa.AuthenticationEventListener;
 import org.opencord.aaa.AuthenticationService;
-import org.opencord.kafka.EventBusService;
 import org.opencord.aaa.AuthenticationStatisticsEvent;
 import org.opencord.aaa.AuthenticationStatisticsEventListener;
 import org.opencord.aaa.AuthenticationStatisticsService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.opencord.kafka.EventBusService;
+import org.osgi.service.component.annotations.Activate;
+import org.osgi.service.component.annotations.Component;
+import org.osgi.service.component.annotations.Deactivate;
+import org.osgi.service.component.annotations.Reference;
+import org.osgi.service.component.annotations.ReferenceCardinality;
+import org.osgi.service.component.annotations.ReferencePolicy;
 
 import java.time.Instant;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Listens for AAA events and pushes them on a Kafka bus.
  */
 @Component(immediate = true)
-public class AaaKafkaIntegration {
+public class AaaKafkaIntegration extends AbstractKafkaIntegration {
 
-    public Logger log = LoggerFactory.getLogger(getClass());
-
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected EventBusService eventBusService;
 
-    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected DeviceService deviceService;
 
-    @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL,
             policy = ReferencePolicy.DYNAMIC,
             bind = "bindAuthenticationService",
             unbind = "unbindAuthenticationService")
-    protected AuthenticationService authenticationService;
-    @Reference(cardinality = ReferenceCardinality.OPTIONAL_UNARY,
+    protected volatile AuthenticationService ignore;
+    private final AtomicReference<AuthenticationService> authServiceRef = new AtomicReference<>();
+    @Reference(cardinality = ReferenceCardinality.OPTIONAL,
             policy = ReferencePolicy.DYNAMIC,
             bind = "bindAuthenticationStatService",
             unbind = "unbindAuthenticationStatService")
-    protected AuthenticationStatisticsService authenticationStatisticsService;
+    protected volatile AuthenticationStatisticsService ignore2;
+    private final AtomicReference<AuthenticationStatisticsService> authStatServiceRef = new AtomicReference<>();
 
     private final AuthenticationEventListener listener = new InternalAuthenticationListener();
     private final AuthenticationStatisticsEventListener authenticationStatisticsEventListener =
@@ -92,52 +92,20 @@
     private static final String REQUEST_RTT_MILLIS = "requestRttMillis";
     private static final String REQUEST_RE_TX = "requestReTx";
 
-    protected void bindAuthenticationService(AuthenticationService authenticationService) {
-        log.info("bindAuthenticationService");
-        if (this.authenticationService == null) {
-            log.info("Binding AuthenticationService");
-            this.authenticationService = authenticationService;
-            log.info("Adding listener on AuthenticationService");
-            authenticationService.addListener(listener);
-        } else {
-            log.warn("Trying to bind AuthenticationService but it is already bound");
-        }
+    protected void bindAuthenticationService(AuthenticationService incomingService) {
+        bindAndAddListener(incomingService, authServiceRef, listener);
     }
 
-    protected void unbindAuthenticationService(AuthenticationService authenticationService) {
-        log.info("unbindAuthenticationService");
-        if (this.authenticationService == authenticationService) {
-            log.info("Unbinding AuthenticationService");
-            this.authenticationService = null;
-            log.info("Removing listener on AuthenticationService");
-            authenticationService.removeListener(listener);
-        } else {
-            log.warn("Trying to unbind AuthenticationService but it is already unbound");
-        }
+    protected void unbindAuthenticationService(AuthenticationService outgoingService) {
+        unbindAndRemoveListener(outgoingService, authServiceRef, listener);
     }
 
-    protected void bindAuthenticationStatService(AuthenticationStatisticsService authenticationStatisticsService) {
-        log.info("bindAuthenticationStatService");
-        if (this.authenticationStatisticsService == null) {
-            log.info("Binding AuthenticationStastService");
-            this.authenticationStatisticsService = authenticationStatisticsService;
-            log.info("Adding listener on AuthenticationStatService");
-            authenticationStatisticsService.addListener(authenticationStatisticsEventListener);
-        } else {
-            log.warn("Trying to bind AuthenticationStatService but it is already bound");
-        }
+    protected void bindAuthenticationStatService(AuthenticationStatisticsService incomingService) {
+        bindAndAddListener(incomingService, authStatServiceRef, authenticationStatisticsEventListener);
     }
 
-    protected void unbindAuthenticationStatService(AuthenticationStatisticsService authenticationStatisticsService) {
-        log.info("unbindAuthenticationStatService");
-        if (this.authenticationStatisticsService == authenticationStatisticsService) {
-            log.info("Unbinding AuthenticationStatService");
-            this.authenticationStatisticsService = null;
-            log.info("Removing listener on AuthenticationStatService");
-            authenticationStatisticsService.removeListener(authenticationStatisticsEventListener);
-        } else {
-            log.warn("Trying to unbind AuthenticationStatService but it is already unbound");
-        }
+    protected void unbindAuthenticationStatService(AuthenticationStatisticsService outgoingService) {
+        unbindAndRemoveListener(outgoingService, authStatServiceRef, authenticationStatisticsEventListener);
     }
 
     @Activate
@@ -147,6 +115,8 @@
 
     @Deactivate
     public void deactivate() {
+        unbindAuthenticationService(authServiceRef.get());
+        unbindAuthenticationStatService(authStatServiceRef.get());
         log.info("Stopped AaaKafkaIntegration");
     }
 
@@ -156,7 +126,7 @@
 
     private void handleStat(AuthenticationStatisticsEvent event) {
         eventBusService.send(AUTHENTICATION_STATISTICS_TOPIC, serializeStat(event));
-        log.info("AuthenticationStatisticsEvent sent successfully");
+        log.debug("AuthenticationStatisticsEvent sent successfully");
     }
 
     private JsonNode serialize(AuthenticationEvent event) {
@@ -173,7 +143,7 @@
     }
 
     private JsonNode serializeStat(AuthenticationStatisticsEvent event) {
-        log.info("Serializing AuthenticationStatisticsEvent");
+        log.debug("Serializing AuthenticationStatisticsEvent");
         ObjectMapper mapper = new ObjectMapper();
         ObjectNode authMetricsEvent = mapper.createObjectNode();
         authMetricsEvent.put(TIMESTAMP, Instant.now().toString());