Retry the Kafka connection after 5 seconds when connecting fails
Change-Id: Id1a4974b4c5664eff2c3231d935922aab85e709c
diff --git a/src/controllers/kafka.js b/src/controllers/kafka.js
index f754a65..80e64b8 100644
--- a/src/controllers/kafka.js
+++ b/src/controllers/kafka.js
@@ -25,60 +25,69 @@
// docs: https://github.com/Blizzard/node-rdkafka
var kafka = require('node-rdkafka');
- logger.log('debug',`Using librdkafka version: ${kafka.librdkafkaVersion}, kafka features: '${kafka.features}'`);
- logger.log('debug',`Connecting to broker: ${config.kafka_bootstrap_servers}`);
-
- var stream = kafka.KafkaConsumer.createReadStream({
- 'metadata.broker.list': config.kafka_bootstrap_servers,
- 'group.id': 'xos-ws',
- 'socket.keepalive.enable': true,
- 'enable.auto.commit': false
- }, {}, {
- topics: ['xos.gui_events'],
- });
-
- stream.on('ready', function () {
- logger.log('info', 'Kafka connected');
- });
-
- stream.on('error', function (err) {
- logger.log('error', err);
- });
-
- stream.consumer.on('event.error', function (err) {
- logger.log('error', err);
- });
-
- stream.on('data', function (msg) {
- logger.log('debug', `Topic: ${msg.topic}, Key: ${msg.key}, Timestamp: ${msg.timestamp}`);
-
- // strip diag messages
- // NOTE: have to coerce to string (due to FFI?)
- if (msg.key.toString() === 'Diag') {
- return;
- }
-
- let msgobj;
+ const connect = () => {
try {
- // TODO find the user that needs to be notified for msg.object update
- msgobj = JSON.parse(msg.value)
- }
+ logger.log('debug',`Using librdkafka version: ${kafka.librdkafkaVersion}, kafka features: '${kafka.features}'`);
+ logger.log('debug',`Connecting to broker: ${config.kafka_bootstrap_servers}`);
+ var stream = kafka.KafkaConsumer.createReadStream({
+ 'metadata.broker.list': config.kafka_bootstrap_servers,
+ 'group.id': 'xos-ws',
+ 'socket.keepalive.enable': true,
+ 'enable.auto.commit': false
+ }, {}, {
+ topics: ['xos.gui_events'],
+ });
+
+ stream.on('ready', function () {
+ logger.log('info', 'Kafka connected');
+ });
+
+ stream.on('error', function (err) {
+ logger.log('error', err);
+ });
+
+ stream.consumer.on('event.error', function (err) {
+ logger.log('error', err);
+ });
+
+ stream.on('data', function (msg) {
+ logger.log('debug', `Topic: ${msg.topic}, Key: ${msg.key}, Timestamp: ${msg.timestamp}`);
+
+ // strip diag messages
+ // NOTE: have to coerce to string (due to FFI?)
+ if (msg.key.toString() === 'Diag') {
+ return;
+ }
+ let msgobj;
+
+ try {
+ // TODO find the user that needs to be notified for msg.object update
+ msgobj = JSON.parse(msg.value)
+ }
+ catch(e) {
+ // stringify the event if it is not JSON
+ msgobj = msg.value.toString()
+ }
+
+ if (msgobj.deleted) {
+ logger.log('info', 'Remove on: ' + msg.key + ': ' + msg.value);
+ socket.emit('remove', {model: msg.key.toString(), msg: msgobj, deleted: true});
+ }
+ else {
+ logger.log('info', 'Update on: ' + msg.key + ': ' + msg.value);
+ socket.emit('update', {model: msg.key.toString(), msg: msgobj});
+ }
+
+ });
+ }
catch(e) {
- // stringify the event if it is not JSON
- msgobj = msg.value.toString()
+ logger.log('warning', 'Failed to connect to kafka, reconnecting in 5 sec', e)
+ setTimeout(connect, 5 * 1000);
}
+ }
- if (msgobj.deleted) {
- logger.log('info', 'Remove on: ' + msg.key + ': ' + msg.value);
- socket.emit('remove', {model: msg.key.toString(), msg: msgobj, deleted: true});
- }
- else {
- logger.log('info', 'Update on: ' + msg.key + ': ' + msg.value);
- socket.emit('update', {model: msg.key.toString(), msg: msgobj});
- }
-
- });
+ connect()
})();