[SEBA-593] Create only one KafkaProducer when the app is activated (or the config change)
Change-Id: I2c95048ca7ff0add4ed5271bb855df4bb438abe5
diff --git a/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java b/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
index 3610d50..0720a86 100644
--- a/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
@@ -59,6 +59,8 @@
private static final String APP_NAME = "org.opencord.kafka";
private ApplicationId appId;
+ private Boolean kafkaStarted = false;
+
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected CoreService coreService;
@@ -154,18 +156,19 @@
}
private void startKafka(Properties properties) {
- shutdownKafka();
-
- // Kafka client doesn't play nice with the default OSGi classloader
- // This workaround temporarily changes the thread's classloader so that
- // the Kafka client can load the serializer classes.
- ClassLoader original = Thread.currentThread().getContextClassLoader();
- Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
- try {
- log.info("Starting Kafka producer");
- kafkaProducer = new KafkaProducer<>(properties);
- } finally {
- Thread.currentThread().setContextClassLoader(original);
+ if (!kafkaStarted) {
+ // Kafka client doesn't play nice with the default OSGi classloader
+ // This workaround temporarily changes the thread's classloader so that
+ // the Kafka client can load the serializer classes.
+ ClassLoader original = Thread.currentThread().getContextClassLoader();
+ Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
+ try {
+ log.info("Starting Kafka producer");
+ kafkaProducer = new KafkaProducer<>(properties);
+ kafkaStarted = true;
+ } finally {
+ Thread.currentThread().setContextClassLoader(original);
+ }
}
}
@@ -176,6 +179,7 @@
kafkaProducer.close(0, TimeUnit.MILLISECONDS);
kafkaProducer = null;
}
+ kafkaStarted = false;
}
private void logException(Exception e) {
@@ -207,6 +211,7 @@
switch (event.type()) {
case CONFIG_ADDED:
case CONFIG_UPDATED:
+ unconfigure();
configure((KafkaConfig) event.config().get());
break;
case CONFIG_REMOVED: