Fixes for strange activation issues:
* Don't reconfigure on activate (we can wait for config event)
* Upgrade Kafka library to 1.1.1_1
* Close immediately without waiting for in-flight operations to
complete to avoid leaking resources
Change-Id: I60b02593ac7a5b4ddeab010c45e3b62c14eb4054
diff --git a/app.xml b/app.xml
index 84046e3..07b66d2 100644
--- a/app.xml
+++ b/app.xml
@@ -20,5 +20,5 @@
features="${project.artifactId}" apps="org.opencord.olt,org.opencord.dhcpl2relay,org.opencord.aaa">
<description>${project.description}</description>
<artifact>mvn:${project.groupId}/${project.artifactId}/${project.version}</artifact>
- <artifact>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/1.1.0_1</artifact>
+ <artifact>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/1.1.1_1</artifact>
</app>
diff --git a/features.xml b/features.xml
index 5d83ff4..2ce58dd 100644
--- a/features.xml
+++ b/features.xml
@@ -19,6 +19,6 @@
description="${project.description}">
<feature>onos-api</feature>
<bundle>mvn:${project.groupId}/${project.artifactId}/${project.version}</bundle>
- <bundle>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/1.1.0_1</bundle>
+ <bundle>mvn:org.apache.servicemix.bundles/org.apache.servicemix.bundles.kafka-clients/1.1.1_1</bundle>
</feature>
</features>
diff --git a/pom.xml b/pom.xml
index 94c9485..c3af919 100644
--- a/pom.xml
+++ b/pom.xml
@@ -68,7 +68,7 @@
<dependency>
<groupId>org.apache.servicemix.bundles</groupId>
<artifactId>org.apache.servicemix.bundles.kafka-clients</artifactId>
- <version>1.1.0_1</version>
+ <version>1.1.1_1</version>
</dependency>
<dependency>
diff --git a/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java b/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
index 7a44872..d6e3392 100644
--- a/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
+++ b/src/main/java/org/opencord/kafka/impl/KafkaIntegration.java
@@ -25,6 +25,7 @@
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
+import org.onosproject.cluster.ClusterService;
import org.onosproject.core.ApplicationId;
import org.onosproject.core.CoreService;
import org.onosproject.net.config.ConfigFactory;
@@ -37,6 +38,7 @@
import java.util.Properties;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import static com.google.common.base.Preconditions.checkNotNull;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
@@ -61,6 +63,9 @@
protected CoreService coreService;
@Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
+ protected ClusterService clusterService;
+
+ @Reference(cardinality = ReferenceCardinality.MANDATORY_UNARY)
protected NetworkConfigRegistry configRegistry;
private static StringSerializer stringSerializer = new StringSerializer();
@@ -83,6 +88,7 @@
}
};
+ private static final String CLIENT_ID = "client.id";
private static final String BOOTSTRAP_SERVERS = "bootstrap.servers";
private static final String RETRIES = "retries";
private static final String RECONNECT_BACKOFF = "reconnect.backoff.ms";
@@ -102,8 +108,6 @@
configRegistry.registerConfigFactory(kafkaConfigFactory);
configRegistry.addListener(configListener);
- configure();
-
log.info("Started");
}
@@ -132,6 +136,7 @@
checkNotNull(config);
Properties properties = new Properties();
+ properties.put(CLIENT_ID, clusterService.getLocalNode().id().toString());
properties.put(BOOTSTRAP_SERVERS, config.getBootstrapServers());
properties.put(RETRIES, config.getRetries());
properties.put(RECONNECT_BACKOFF, config.getReconnectBackoff());
@@ -156,6 +161,7 @@
ClassLoader original = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(this.getClass().getClassLoader());
try {
+ log.info("Starting Kafka producer");
kafkaProducer = new KafkaProducer<>(properties);
} finally {
Thread.currentThread().setContextClassLoader(original);
@@ -164,7 +170,9 @@
private void shutdownKafka() {
if (kafkaProducer != null) {
- kafkaProducer.close();
+ log.info("Shutting down Kafka producer");
+ kafkaProducer.flush();
+ kafkaProducer.close(0, TimeUnit.MILLISECONDS);
kafkaProducer = null;
}
}
@@ -193,6 +201,7 @@
@Override
public void event(NetworkConfigEvent event) {
+ log.info("Event type {}", event.type());
switch (event.type()) {
case CONFIG_ADDED:
case CONFIG_UPDATED: