Retrying kafka connection

Change-Id: Id1a4974b4c5664eff2c3231d935922aab85e709c
diff --git a/spec/kafka.spec.js b/spec/kafka.spec.js
index 17df04e..2f6a8fb 100644
--- a/spec/kafka.spec.js
+++ b/spec/kafka.spec.js
@@ -1,12 +1,9 @@
 /*
  * Copyright 2017-present Open Networking Foundation
-
  * Licensed under the Apache License, Version 2.0 (the "License");
  * you may not use this file except in compliance with the License.
  * You may obtain a copy of the License at
-
  * http://www.apache.org/licenses/LICENSE-2.0
-
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -34,20 +31,9 @@
     }
   };
 
-  const trigger = {}
-
-  const mockStream = {
-    on: (event, cb) => {
-      trigger[event] = cb
-    },
-    consumer: {
-      on: sinon.spy()
-    }
-  }
-
   const fakekafka = {
     KafkaConsumer: {
-      createReadStream: () => mockStream
+      createReadStream: sinon.stub()
     }
   }
 
@@ -56,7 +42,7 @@
 
   describe('The event system', () => {
 
-    before((done) => {
+    before(() => {
 
       // Enable mockery to mock objects
       mockery.enable({
@@ -70,10 +56,6 @@
       // mock the socketIo client to have a spy
       mockery.registerMock('./websocket.js', mockSocket);
 
-      require('../src/controllers/kafka.js');
-      setTimeout(() => {
-        done();
-      }, 1000);
     });
 
     after(() => {
@@ -85,74 +67,123 @@
       socketSpy.reset();
     });
 
-    it('should send a websocket event when text Kafka event is received', (done) => {
-      trigger.data({topic:msgTopic,
-                    key:channelName,
-                    timestamp:1234,
-                    value:'I am sending a message.',
-                    });
-
-      setTimeout(() => {
-        expect(socketSpy).to.have.been.called;
-        expect(socketSpy).to.have.been.calledWith('update', {
-          model: channelName,
-          msg: 'I am sending a message.'
-        });
-        done();
-      }, 500)
+    afterEach(() => {
+      var name = require.resolve('../src/controllers/kafka.js');
+      delete require.cache[name];
     });
 
-    it('should send a websocket event when JSON Kafka event is received', (done) => {
-      trigger.data({topic:msgTopic,
-                    key:channelName,
-                    timestamp:2345,
-                    value:JSON.stringify({msg: 'JSON Message'}),
-                    });
+    describe('when connection fails', () => {
+      beforeEach((done) => {
+        fakekafka.KafkaConsumer.createReadStream
+          .onFirstCall().throws()
+          .onSecondCall().returns(true)
 
-      setTimeout(() => {
-        expect(socketSpy).to.have.been.called;
-        expect(socketSpy).to.have.been.calledWith('update', {
-          model: channelName,
-          msg: {msg: 'JSON Message'}
-        });
-        done();
-      }, 1000)
+        require('../src/controllers/kafka.js');
+        setTimeout(() => {
+          done();
+        }, 10);
+      });
+
+      it('should try to reconnect after 5 seconds', (done) => {
+        setTimeout(() => {
+          expect(fakekafka.KafkaConsumer.createReadStream.calledTwice).to.be.true;
+          done();
+        }, 6 * 1000);
+      }).timeout(7 * 1000);
+
+      afterEach(() => {
+        fakekafka.KafkaConsumer.createReadStream = sinon.stub()
+      })
     });
 
-    it('should send a websocket event with msg: Deleted when JSON object has deleted:true', (done) => {
-      trigger.data({topic:msgTopic,
-                    key:channelName,
-                    timestamp:3456,
-                    value:JSON.stringify({msg: 'Deleted', deleted: true}),
-                    });
+    describe('when is connected', () => {
+      const trigger = {}
 
-      setTimeout(() => {
-        expect(socketSpy).to.have.been.called;
-        expect(socketSpy).to.have.been.calledWith('remove', {
-          model: channelName,
-          msg: {
-            msg: 'Deleted',
+      const mockStream = {
+        on: (event, cb) => {
+          trigger[event] = cb
+        },
+        consumer: {
+          on: sinon.spy()
+        }
+      }
+      beforeEach((done) => {
+        fakekafka.KafkaConsumer.createReadStream.returns(mockStream)
+        require('../src/controllers/kafka.js');
+
+        setTimeout(() => {
+          done();
+        }, 10);
+      });
+
+      it('should send a websocket event when text Kafka event is received', (done) => {
+        trigger.data({topic:msgTopic,
+                      key:channelName,
+                      timestamp:1234,
+                      value:'I am sending a message.',
+                      });
+  
+        setTimeout(() => {
+          expect(socketSpy).to.have.been.called;
+          expect(socketSpy).to.have.been.calledWith('update', {
+            model: channelName,
+            msg: 'I am sending a message.'
+          });
+          done();
+        }, 500)
+      });
+  
+      it('should send a websocket event when JSON Kafka event is received', (done) => {
+        trigger.data({topic:msgTopic,
+                      key:channelName,
+                      timestamp:2345,
+                      value:JSON.stringify({msg: 'JSON Message'}),
+                      });
+  
+        setTimeout(() => {
+          expect(socketSpy).to.have.been.called;
+          expect(socketSpy).to.have.been.calledWith('update', {
+            model: channelName,
+            msg: {msg: 'JSON Message'}
+          });
+          done();
+        }, 1000)
+      });
+  
+      it('should send a websocket event with msg: Deleted when JSON object has deleted:true', (done) => {
+        trigger.data({topic:msgTopic,
+                      key:channelName,
+                      timestamp:3456,
+                      value:JSON.stringify({msg: 'Deleted', deleted: true}),
+                      });
+  
+        setTimeout(() => {
+          expect(socketSpy).to.have.been.called;
+          expect(socketSpy).to.have.been.calledWith('remove', {
+            model: channelName,
+            msg: {
+              msg: 'Deleted',
+              deleted: true
+            },
             deleted: true
-          },
-          deleted: true
-        });
-
-        done();
-      }, 1000)
+          });
+  
+          done();
+        }, 1000)
+      });
+  
+      it('should not send a websocket event if the Kafka key is Diag', (done) => {
+        trigger.data({topic:msgTopic,
+                      key:'Diag',
+                      timestamp:4567,
+                      value:JSON.stringify({msg: 'Diag Message'}),
+                      });
+  
+        setTimeout(() => {
+          expect(socketSpy).not.to.have.been.called;
+          done();
+        }, 1000)
+      });
     });
-
-    it('should not send a websocket event if the Kafka key is Diag', (done) => {
-      trigger.data({topic:msgTopic,
-                    key:'Diag',
-                    timestamp:4567,
-                    value:JSON.stringify({msg: 'Diag Message'}),
-                    });
-
-      setTimeout(() => {
-        expect(socketSpy).not.to.have.been.called;
-        done();
-      }, 1000)
-    });
-
   });
-})();
+})();
\ No newline at end of file