Handle producer creation failure when brokers can't be resolved

The Kafka client won't retry by itself, so we need to explicitly try to
recreate the client in case the configured servers have since become
resolvable.
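
The retry itself is just a scheduled call back into producer creation
when the KafkaProducer constructor throws a KafkaException (for example
when the bootstrap servers do not resolve yet). A minimal,
self-contained sketch of the pattern, with illustrative class and field
names rather than the ones used in KafkaIntegration:

    import java.util.Properties;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.atomic.AtomicReference;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.common.KafkaException;

    class ProducerRetrySketch {
        private final AtomicReference<KafkaProducer<String, String>> producerRef =
                new AtomicReference<>();
        private final ScheduledExecutorService scheduler =
                Executors.newScheduledThreadPool(1);
        private ScheduledFuture<?> retry;

        // The KafkaProducer constructor throws a KafkaException when the
        // producer cannot be built (e.g. unresolvable bootstrap servers),
        // so catch it and schedule another attempt instead of giving up.
        synchronized void createProducer(Properties properties) {
            if (producerRef.get() != null) {
                return;
            }
            try {
                producerRef.set(new KafkaProducer<>(properties));
            } catch (KafkaException e) {
                retry = scheduler.schedule(() -> createProducer(properties),
                        30, TimeUnit.SECONDS);
            }
        }

        // On shutdown, cancel any pending retry and close the producer.
        synchronized void shutdown() {
            if (retry != null) {
                retry.cancel(true);
                retry = null;
            }
            KafkaProducer<String, String> producer = producerRef.getAndSet(null);
            if (producer != null) {
                producer.flush();
                producer.close();
            }
        }
    }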

Change-Id: I575b07b858eb8dada03c6f13eb460ab3ebc2ffab
diff --git a/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java b/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
index 7dc2c35..6e34ceb 100644
--- a/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
@@ -16,6 +16,7 @@
 package org.opencord.kafka.impl;
 
 import com.fasterxml.jackson.databind.JsonNode;
+import org.apache.kafka.common.KafkaException;
 import org.osgi.service.component.annotations.Activate;
 import org.osgi.service.component.annotations.Component;
 import org.osgi.service.component.annotations.Deactivate;
@@ -37,9 +38,13 @@
 
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+import static java.util.concurrent.Executors.newScheduledThreadPool;
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
 import static org.onlab.util.Tools.groupedThreads;
 import static org.slf4j.LoggerFactory.getLogger;
@@ -57,8 +62,6 @@
     private static final String APP_NAME = "org.opencord.kafka";
     private ApplicationId appId;
 
-    private Boolean kafkaStarted = false;
-
     @Reference(cardinality = ReferenceCardinality.MANDATORY)
     protected CoreService coreService;
 
@@ -70,13 +73,16 @@
 
     private static StringSerializer stringSerializer = new StringSerializer();
 
-    private KafkaProducer<String, String> kafkaProducer;
+    private AtomicReference<KafkaProducer<String, String>> kafkaProducerRef = new AtomicReference<>();
 
     private InternalNetworkConfigListener configListener =
             new InternalNetworkConfigListener();
 
     private final ExecutorService executor = newSingleThreadExecutor(
             groupedThreads(this.getClass().getSimpleName(), "events", log));
+    private final ScheduledExecutorService scheduledExecutorService = newScheduledThreadPool(1,
+            groupedThreads(this.getClass().getSimpleName(), "kafka", log));
+    private ScheduledFuture<?> producerCreate;
 
     private ConfigFactory<ApplicationId, KafkaConfig> kafkaConfigFactory =
             new ConfigFactory<ApplicationId, KafkaConfig>(
@@ -153,8 +159,8 @@
         shutdownKafka();
     }
 
-    private void startKafka(Properties properties) {
-        if (!kafkaStarted) {
+    private synchronized void startKafka(Properties properties) {
+        if (kafkaProducerRef.get() == null) {
             // Kafka client doesn't play nice with the default OSGi classloader
             // This workaround temporarily changes the thread's classloader so that
             // the Kafka client can load the serializer classes.
@@ -162,22 +168,35 @@
             Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
             try {
                 log.info("Starting Kafka producer");
-                kafkaProducer = new KafkaProducer<>(properties);
-                kafkaStarted = true;
+                try {
+                    KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
+                    kafkaProducerRef.set(kafkaProducer);
+                } catch (KafkaException e) {
+                    log.error("Unable to create the Kafka producer", e);
+                    // Try again in a little bit
+                    producerCreate = scheduledExecutorService.schedule(() -> configure(), 30, TimeUnit.SECONDS);
+                }
             } finally {
                 Thread.currentThread().setContextClassLoader(original);
             }
         }
     }
 
-    private void shutdownKafka() {
-        if (kafkaProducer != null) {
+    private synchronized void shutdownKafka() {
+        if (producerCreate != null) {
+            producerCreate.cancel(true);
+            producerCreate = null;
+        }
+
+        if (kafkaProducerRef.get() != null) {
+            KafkaProducer<String, String> kafkaProducer = kafkaProducerRef.get();
+
+            kafkaProducerRef.set(null);
+
             log.info("Shutting down Kafka producer");
             kafkaProducer.flush();
             kafkaProducer.close(0, TimeUnit.MILLISECONDS);
-            kafkaProducer = null;
         }
-        kafkaStarted = false;
     }
 
     private void logException(Exception e) {
@@ -188,7 +207,8 @@
 
     @Override
     public void send(String topic, JsonNode data) {
-        if (kafkaProducer == null) {
+        KafkaProducer<String, String> producer = kafkaProducerRef.get();
+        if (producer == null) {
             log.warn("Not sending event as kafkaProducer is not defined: {}", data.toString());
             return;
         }
@@ -197,8 +217,12 @@
             log.trace("Sending event to Kafka: {}", data.toString());
         }
 
-        kafkaProducer.send(new ProducerRecord<>(topic, data.toString()),
-                (r, e) -> logException(e));
+        try {
+            producer.send(new ProducerRecord<>(topic, data.toString()),
+                    (r, e) -> logException(e));
+        } catch (KafkaException e) {
+            log.warn("Sending to Kafka failed", e);
+        }
     }
 
     private class InternalNetworkConfigListener implements NetworkConfigListener {