Retry Kafka connection on failure

Wrap the node-rdkafka consumer setup in a connect() helper so that a
failure while creating the read stream is caught, logged, and retried
after 5 seconds instead of failing on startup.

Change-Id: Id1a4974b4c5664eff2c3231d935922aab85e709c
diff --git a/src/controllers/kafka.js b/src/controllers/kafka.js
index f754a65..80e64b8 100644
--- a/src/controllers/kafka.js
+++ b/src/controllers/kafka.js
@@ -25,60 +25,69 @@
   // docs: https://github.com/Blizzard/node-rdkafka
   var kafka = require('node-rdkafka');
 
-  logger.log('debug',`Using librdkafka version: ${kafka.librdkafkaVersion}, kafka features: '${kafka.features}'`);
-  logger.log('debug',`Connecting to broker: ${config.kafka_bootstrap_servers}`);
-
-  var stream = kafka.KafkaConsumer.createReadStream({
-    'metadata.broker.list': config.kafka_bootstrap_servers,
-    'group.id': 'xos-ws',
-    'socket.keepalive.enable': true,
-    'enable.auto.commit': false
-  }, {}, {
-    topics: ['xos.gui_events'],
-  });
-
-  stream.on('ready', function () {
-    logger.log('info', 'Kafka connected');
-  });
-
-  stream.on('error', function (err) {
-    logger.log('error', err);
-  });
-
-  stream.consumer.on('event.error', function (err) {
-    logger.log('error', err);
-  });
-
-  stream.on('data', function (msg) {
-    logger.log('debug', `Topic: ${msg.topic}, Key: ${msg.key}, Timestamp: ${msg.timestamp}`);
-
-    // strip diag messages
-    // NOTE: have to coerce to string (due to FFI?)
-    if (msg.key.toString() === 'Diag') {
-      return;
-    }
-
-    let msgobj;
+  const connect = () => {
 
     try {
-      // TODO find the user that needs to be notified for msg.object update
-      msgobj = JSON.parse(msg.value)
-    }
+      logger.log('debug',`Using librdkafka version: ${kafka.librdkafkaVersion}, kafka features: '${kafka.features}'`);
+      logger.log('debug',`Connecting to broker: ${config.kafka_bootstrap_servers}`);
 
+      var stream = kafka.KafkaConsumer.createReadStream({
+        'metadata.broker.list': config.kafka_bootstrap_servers,
+        'group.id': 'xos-ws',
+        'socket.keepalive.enable': true,
+        'enable.auto.commit': false
+      }, {}, {
+        topics: ['xos.gui_events'],
+      });
+
+      stream.on('ready', function () {
+        logger.log('info', 'Kafka connected');
+      });
+
+      stream.on('error', function (err) {
+        logger.log('error', err);
+      });
+
+      stream.consumer.on('event.error', function (err) {
+        logger.log('error', err);
+      });
+
+      stream.on('data', function (msg) {
+        logger.log('debug', `Topic: ${msg.topic}, Key: ${msg.key}, Timestamp: ${msg.timestamp}`);
+
+        // strip diag messages
+        // NOTE: have to coerce to string (due to FFI?)
+        if (msg.key.toString() === 'Diag') {
+          return;
+        }
+        let msgobj;
+
+        try {
+          // TODO find the user that needs to be notified for msg.object update
+          msgobj = JSON.parse(msg.value);
+        }
+        catch(e) {
+          // stringify the event if it is not JSON
+          msgobj = msg.value.toString();
+        }
+
+        if (msgobj.deleted) {
+          logger.log('info', 'Remove on: ' + msg.key + ': ' + msg.value);
+          socket.emit('remove', {model: msg.key.toString(), msg: msgobj, deleted: true});
+        }
+        else {
+          logger.log('info', 'Update on: ' + msg.key + ': ' + msg.value);
+          socket.emit('update', {model: msg.key.toString(), msg: msgobj});
+        }
+
+      });
+    }
     catch(e) {
-      // stringify the event if it is not JSON
-      msgobj = msg.value.toString()
+      logger.log('warning', 'Failed to connect to Kafka, reconnecting in 5 sec', e);
+      setTimeout(connect, 5 * 1000);
     }
+  }
 
-    if (msgobj.deleted) {
-      logger.log('info', 'Remove on: ' + msg.key + ': ' + msg.value);
-      socket.emit('remove', {model: msg.key.toString(), msg: msgobj, deleted: true});
-    }
-    else {
-      logger.log('info', 'Update on: ' + msg.key + ': ' + msg.value);
-      socket.emit('update', {model: msg.key.toString(), msg: msgobj});
-    }
-
-  });
+  connect();
 
 })();
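
For reference, the retry added above boils down to the small pattern sketched below. This is a minimal, self-contained illustration rather than the project's actual module: setupConsumer() is a hypothetical stand-in for the kafka.KafkaConsumer.createReadStream() call and its event wiring, and the logger is assumed to expose a variadic log(level, ...args), matching how it is used in the patch.

    // Minimal sketch of the connect-and-retry pattern from this change.
    // setupConsumer() is a placeholder for the real consumer setup; any
    // exception it throws schedules another attempt 5 seconds later.
    const RETRY_DELAY_MS = 5 * 1000;

    const connectWithRetry = (setupConsumer, logger) => {
      try {
        setupConsumer();
        logger.log('info', 'Kafka consumer stream created');
      } catch (e) {
        logger.log('warning', 'Failed to connect to Kafka, reconnecting in 5 sec', e);
        setTimeout(() => connectWithRetry(setupConsumer, logger), RETRY_DELAY_MS);
      }
    };

    // Example: a setup function that fails once, then succeeds.
    // console serves as the logger here, since console.log accepts any arguments.
    let attempts = 0;
    connectWithRetry(() => {
      attempts += 1;
      if (attempts < 2) {
        throw new Error('broker not reachable');
      }
    }, console);

As in the patch itself, only an exception thrown while setting up the consumer triggers a retry; 'error' events emitted by the stream after a successful connect are logged by the handlers but do not reconnect.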