[SEBA-593] Create only one KafkaProducer when the app is activated (or the config change)

Change-Id: I2c95048ca7ff0add4ed5271bb855df4bb438abe5
diff --git a/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java b/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
index 3610d50..0720a86 100644
--- a/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
@@ -59,6 +59,8 @@
     private static final String APP_NAME = "org.opencord.kafka";
     private ApplicationId appId;
 
+    private Boolean kafkaStarted = false;
+
     @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
     protected CoreService coreService;
 
@@ -154,18 +156,19 @@
     }
 
     private void startKafka(Properties properties) {
-        shutdownKafka();
-
-        // Kafka client doesn't play nice with the default OSGi classloader
-        // This workaround temporarily changes the thread's classloader so that
-        // the Kafka client can load the serializer classes.
-        ClassLoader original = Thread.currentThread().getContextClassLoader();
-        Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
-        try {
-            log.info("Starting Kafka producer");
-            kafkaProducer = new KafkaProducer<>(properties);
-        } finally {
-            Thread.currentThread().setContextClassLoader(original);
+        if (!kafkaStarted) {
+            // Kafka client doesn't play nice with the default OSGi classloader
+            // This workaround temporarily changes the thread's classloader so that
+            // the Kafka client can load the serializer classes.
+            ClassLoader original = Thread.currentThread().getContextClassLoader();
+            Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+            try {
+                log.info("Starting Kafka producer");
+                kafkaProducer = new KafkaProducer<>(properties);
+                kafkaStarted = true;
+            } finally {
+                Thread.currentThread().setContextClassLoader(original);
+            }
         }
     }
 
@@ -176,6 +179,7 @@
             kafkaProducer.close(0, TimeUnit.MILLISECONDS);
             kafkaProducer = null;
         }
+        kafkaStarted = false;
     }
 
     private void logException(Exception e) {
@@ -207,6 +211,7 @@
             switch (event.type()) {
             case CONFIG_ADDED:
             case CONFIG_UPDATED:
+                unconfigure();
                 configure((KafkaConfig) event.config().get());
                 break;
             case CONFIG_REMOVED: