Fixes for strange activation issues:

 * Don't reconfigure on activate (we can wait for config event)
 * Upgrade Kafka library to 1.1.1_1
 * Close immediately without waiting for in-flight operations to
   complete to avoid leaking resources

Change-Id: I60b02593ac7a5b4ddeab010c45e3b62c14eb4054
diff --git a/app.xml b/app.xml
index 84046e3..07b66d2 100644
--- a/app.xml
+++ b/app.xml
@@ -20,5 +20,5 @@
      features="${project.artifactId}" apps="org.opencord.olt,org.opencord.dhcpl2relay,org.opencord.aaa">
     <description>${project.description}</description>
     <artifact>mvn:${project.groupId}/${project.artifactId}/${project.version}</artifact>
-    <artifact>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/1.1.0_1</artifact>
+    <artifact>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/1.1.1_1</artifact>
 </app>
diff --git a/features.xml b/features.xml
index 5d83ff4..2ce58dd 100644
--- a/features.xml
+++ b/features.xml
@@ -19,6 +19,6 @@
              description="${project.description}">
         <feature>onos-api</feature>
         <bundle>mvn:${project.groupId}/${project.artifactId}/${project.version}</bundle>
-        <bundle>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/1.1.0_1</bundle>
+        <bundle>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/1.1.1_1</bundle>
     </feature>
 </features>
diff --git a/pom.xml b/pom.xml
index 94c9485..c3af919 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,7 +68,7 @@
         <dependency>
             <groupId>org.apache.servicemix.bundles</groupId>
             <artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
-            <version>1.1.0_1</version>
+            <version>1.1.1_1</version>
         </dependency>
 
         <dependency>
diff --git a/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java b/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
index 7a44872..d6e3392 100644
--- a/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
@@ -25,6 +25,7 @@
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.StringSerializer;
+import org.onosproject.cluster.ClusterService;
 import org.onosproject.core.ApplicationId;
 import org.onosproject.core.CoreService;
 import org.onosproject.net.config.ConfigFactory;
@@ -37,6 +38,7 @@
 
 import java.util.Properties;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static java.util.concurrent.Executors.newSingleThreadExecutor;
@@ -61,6 +63,9 @@
     protected CoreService coreService;
 
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+    protected ClusterService clusterService;
+
+    @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected NetworkConfigRegistry configRegistry;
 
     private static StringSerializer stringSerializer = new StringSerializer();
@@ -83,6 +88,7 @@
                 }
             };
 
+    private static final String CLIENT_ID = "client.id";
     private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
     private static final String RETRIES = "retries";
     private static final String RECONNECT_BACKOFF = "reconnect.backoff.ms";
@@ -102,8 +108,6 @@
         configRegistry.registerConfigFactory(kafkaConfigFactory);
         configRegistry.addListener(configListener);
 
-        configure();
-
         log.info("Started");
     }
 
@@ -132,6 +136,7 @@
         checkNotNull(config);
 
         Properties properties = new Properties();
+        properties.put(CLIENT_ID, clusterService.getLocalNode().id().toString());
         properties.put(BOOTSTRAP_SERVERS, config.getBootstrapServers());
         properties.put(RETRIES, config.getRetries());
         properties.put(RECONNECT_BACKOFF, config.getReconnectBackoff());
@@ -156,6 +161,7 @@
         ClassLoader original = Thread.currentThread().getContextClassLoader();
         Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
         try {
+            log.info("Starting Kafka producer");
             kafkaProducer = new KafkaProducer<>(properties);
         } finally {
             Thread.currentThread().setContextClassLoader(original);
@@ -164,7 +170,9 @@
 
     private void shutdownKafka() {
         if (kafkaProducer != null) {
-            kafkaProducer.close();
+            log.info("Shutting down Kafka producer");
+            kafkaProducer.flush();
+            kafkaProducer.close(0, TimeUnit.MILLISECONDS);
             kafkaProducer = null;
         }
     }
@@ -193,6 +201,7 @@
 
         @Override
         public void event(NetworkConfigEvent event) {
+            log.info("Event type {}", event.type());
             switch (event.type()) {
             case CONFIG_ADDED:
             case CONFIG_UPDATED: