Fixes for strange activation issues:
* Don't reconfigure on activate (we can wait for config event)
* Upgrade Kafka library to 1.1.1_1
* Close the producer immediately, without waiting for in-flight
operations to complete, to avoid leaking resources
Change-Id: I60b02593ac7a5b4ddeab010c45e3b62c14eb4054
diff --git a/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java b/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
index 7a44872..d6e3392 100644
--- a/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
@@ -25,6 +25,7 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.onosproject.cluster.ClusterService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.config.ConfigFactory;
@@ -37,6 +38,7 @@
import java.util.Properties;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
@@ -61,6 +63,9 @@
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected NetworkConfigRegistry configRegistry;
private static StringSerializer stringSerializer = new StringSerializer();
@@ -83,6 +88,7 @@
}
};
+ private static final String CLIENT_ID = "client.id";
private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
private static final String RETRIES = "retries";
private static final String RECONNECT_BACKOFF = "reconnect.backoff.ms";
@@ -102,8 +108,6 @@
configRegistry.registerConfigFactory(kafkaConfigFactory);
configRegistry.addListener(configListener);
- configure();
-
log.info("Started");
}
@@ -132,6 +136,7 @@
checkNotNull(config);
Properties properties = new Properties();
+ properties.put(CLIENT_ID, clusterService.getLocalNode().id().toString());
properties.put(BOOTSTRAP_SERVERS, config.getBootstrapServers());
properties.put(RETRIES, config.getRetries());
properties.put(RECONNECT_BACKOFF, config.getReconnectBackoff());
@@ -156,6 +161,7 @@
ClassLoader original = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
try {
+ log.info("Starting Kafka producer");
kafkaProducer = new KafkaProducer<>(properties);
} finally {
Thread.currentThread().setContextClassLoader(original);
@@ -164,7 +170,9 @@
private void shutdownKafka() {
if (kafkaProducer != null) {
- kafkaProducer.close();
+ log.info("Shutting down Kafka producer");
+ kafkaProducer.flush();
+ kafkaProducer.close(0, TimeUnit.MILLISECONDS);
kafkaProducer = null;
}
}
@@ -193,6 +201,7 @@
@Override
public void event(NetworkConfigEvent event) {
+ log.info("Event type {}", event.type());
switch (event.type()) {
case CONFIG_ADDED:
case CONFIG_UPDATED: