Fixes for strange activation issues:

 * Don't reconfigure on activate (we can wait for config event)
 * Upgrade Kafka library to 1.1.1_1
 * Close immediately without waiting for in-flight operations to
   complete to avoid leaking resources

Change-Id: I60b02593ac7a5b4ddeab010c45e3b62c14eb4054
diff --git a/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java b/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
index 7a44872..d6e3392 100644
--- a/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
@@ -25,6 +25,7 @@
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.onosproject.cluster.ClusterService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.net.config.ConfigFactory;
@@ -37,6 +38,7 @@
 
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
@@ -61,6 +63,9 @@
     protected CoreService coreService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected NetworkConfigRegistry configRegistry;
 
     private static StringSerializer stringSerializer = new StringSerializer();
@@ -83,6 +88,7 @@
                 }
             };
 
+    private static final String CLIENT_ID = "client.id";
     private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
     private static final String RETRIES = "retries";
     private static final String RECONNECT_BACKOFF = "reconnect.backoff.ms";
@@ -102,8 +108,6 @@
         configRegistry.registerConfigFactory(kafkaConfigFactory);
         configRegistry.addListener(configListener);
 
-        configure();
-
         log.info("Started");
     }
 
@@ -132,6 +136,7 @@
         checkNotNull(config);
 
         Properties properties = new Properties();
+        properties.put(CLIENT_ID, clusterService.getLocalNode().id().toString());
         properties.put(BOOTSTRAP_SERVERS, config.getBootstrapServers());
         properties.put(RETRIES, config.getRetries());
         properties.put(RECONNECT_BACKOFF, config.getReconnectBackoff());
@@ -156,6 +161,7 @@
         ClassLoader original = Thread.currentThread().getContextClassLoader();
         Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
         try {
+            log.info("Starting Kafka producer");
             kafkaProducer = new KafkaProducer<>(properties);
         } finally {
             Thread.currentThread().setContextClassLoader(original);
@@ -164,7 +170,9 @@
 
     private void shutdownKafka() {
         if (kafkaProducer != null) {
-            kafkaProducer.close();
+            log.info("Shutting down Kafka producer");
+            kafkaProducer.flush();
+            kafkaProducer.close(0, TimeUnit.MILLISECONDS);
             kafkaProducer = null;
         }
     }
@@ -193,6 +201,7 @@
 
         @Override
         public void event(NetworkConfigEvent event) {
+            log.info("Event type {}", event.type());
             switch (event.type()) {
             case CONFIG_ADDED:
             case CONFIG_UPDATED: