Add message routing testcases and related bugfixes
- Handle XOS sensor tasks specially to route events correctly
- Add message counting API for test
- Add req_id optional field to manager request API for client-side req-res mapping
- Fix several bugs related to message routing

Change-Id: Ie18cbc63926b352bd7655797655194ece9506c6b
diff --git a/spec/eventrouter.spec.js b/spec/eventrouter.spec.js
index 6184f58..df594fb 100644
--- a/spec/eventrouter.spec.js
+++ b/spec/eventrouter.spec.js
@@ -41,6 +41,7 @@
     var receivedKickstartMessages = [[],[]];
 
     describe('Workflow kickstart test', function() {
+        this.slow(5000);
 
         before(function() {
             // Start our server
@@ -90,7 +91,7 @@
                                 workflow_id: message.workflow_id,
                                 workflow_run_id: message.workflow_run_id
                             })
-                        }, 2000);
+                        }, 1000);
                     });
 
                     workflowManagerClient.on('connect', () => {
@@ -123,7 +124,9 @@
                     let essence = essenceLoader.loadEssence(essenceFileName, true);
 
                     // register the workflow
-                    workflowManagerClients[0].emit(eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE, essence);
+                    workflowManagerClients[0].emit(eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE, {
+                        essence: essence
+                    });
 
                     _.forOwn(essence, (_value, workflowId) => {
                         // save
@@ -164,7 +167,9 @@
 
             // remove workflows
             _.forOwn(workflowIds, (workflowId) => {
-                workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_REMOVE, workflowId);
+                workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_REMOVE, {
+                    workflow_id: workflowId
+                });
             });
             workflowIds.length = 0;
 
@@ -216,7 +221,7 @@
                     expect(receivedKickstartMessageStore.length, 'num of messages in a store').to.equal(1);
                 });
                 done();
-            }, 500);
+            }, 1000);
         });
 
         it('should have only one workflow run', function(done) {
@@ -230,10 +235,10 @@
                 // the workflow must be 'should_be_called'
                 expect(workflowRunInfos[0].workflowId, 'workflow id').to.equal('should_be_called');
                 done();
-            }, 3000);
+            }, 2000);
         });
 
-        it('should be able to read an event that is used for workflow kickstart', function(done) {
+        it('should read an event that is used for workflow kickstart', function(done) {
             this.timeout(5000);
 
             // kickstart the workflow
@@ -268,86 +273,84 @@
                     expect(event.message.serialNumber).to.equal('testSerialXXX');
                     done();
                 });
-            }, 3000);
+            }, 2000);
         });
 
-        /*
-        it('should store user details for a new connection', () => {
-            const eventrouter = require('../src/controllers/eventrouter.js');
+        it('should map a workflow run using key-field', function(done) {
+            this.timeout(5000);
 
-            const probe = eventrouter.getClients()['probe_id'];
-            expect(probe.getParams().name).to.equal('probe@xos.org');
-
-            const manager = eventrouter.getClients()['workflow_manager_id'];
-            expect(manager.getParams().name).to.equal('manager@xos.org');
-
-            const run = eventrouter.getClients()['workflow_run_id'];
-            expect(run.getParams().name).to.equal('run@xos.org');
-        });
-
-        it('should not store the same user twice', (done) => {
-            // This test case makes cleaning up process taking long time because it leaves
-            // a client socket. It seems there's no way to release it from server-side.
-
-            // connect a client to the server
-            const client2 = io.connect(`http://localhost:${port}`, {
-                query: 'id=probe_id&type=probe' +
-                        '&name=probe@xos.org&value=different_value'
-            });
-
-            // when is connected start testing
-            client2.on('connect', () => {
-                setTimeout(() => {
-                    const eventrouter = require('../src/controllers/eventrouter.js');
-                    expect(
-                        Object.keys(eventrouter.getWorkflowRunClients()).length,
-                        'num of workflow run clients'
-                    ).to.equal(1);
-                    expect(
-                        Object.keys(eventrouter.getWorkflowManagerClients()).length,
-                        'num of workflow manager clients'
-                    ).to.equal(1);
-                    expect(
-                        Object.keys(eventrouter.getProbeClients()).length,
-                        'num of probe clients'
-                    ).to.equal(1);
-                    expect(
-                        Object.keys(eventrouter.getClients()).length,
-                        'total num of clients'
-                    ).to.equal(3);
-
-                    done();
-                }, 100);
-            });
-        });
-
-        it('should remove a user on disconnect', (done) => {
-            workflowManagerClient.disconnect();
-            workflowRunClient.disconnect();
-            probeClient.disconnect();
-
-            // we need to wait for the event to be dispatched
+            // kickstart the workflow
+            probeClient.emit(
+                'onu.events',
+                {serialNumber: 'testSerialXXX', other: 'test_other_field'}
+            );
+            probeClient.emit(
+                'datamodel.AttWorkflowDriverServiceInstance',
+                {operation: 'update', serialNumber: 'testSerialXXX', other: 'updated_test_other_field'}
+            );
             setTimeout(() => {
-                const eventrouter = require('../src/controllers/eventrouter.js');
-                expect(
-                    Object.keys(eventrouter.getWorkflowRunClients()).length,
-                    'num of workflow run clients'
-                ).to.equal(0);
-                expect(
-                    Object.keys(eventrouter.getWorkflowManagerClients()).length,
-                    'num of workflow manager clients'
-                ).to.equal(0);
-                expect(
-                    Object.keys(eventrouter.getProbeClients()).length,
-                    'num of probe clients'
-                ).to.equal(0);
-                expect(
-                    Object.keys(eventrouter.getClients()).length,
-                    'total num of clients'
-                ).to.equal(0);
-                done();
-            }, 100);
+                // kickstart will take 2 seconds roughly
+                expect(workflowRunInfos.length, 'num of workflow runs').to.equal(1);
+                // the workflow must be 'should_be_called'
+                expect(workflowRunInfos[0].workflowId, 'workflow id').to.equal('should_be_called');
+
+                // connect a workflow run client to the server
+                let workflowRunClient = io.connect(`http://localhost:${port}`, {
+                    query: 'id=workflow_run_id1&type=workflow_run' +
+                            `&workflow_id=${workflowRunInfos[0].workflowId}` +
+                            `&workflow_run_id=${workflowRunInfos[0].workflowRunId}` +
+                            '&name=run1@xos.org'
+                });
+                workflowRunClients.push(workflowRunClient);
+
+                workflowRunClient.on('connect', () => {
+                    // check message counts
+                    workflowRunClient.emit(eventrouter.serviceEvents.WORKFLOW_RUN_COUNT_EVENTS, {
+                        workflow_id: workflowRunInfos[0].workflowId,
+                        workflow_run_id: workflowRunInfos[0].workflowRunId
+                    });
+                });
+
+                let eventRaised = 0;
+                workflowRunClient.on(eventrouter.serviceEvents.WORKFLOW_RUN_COUNT_EVENTS, (result) => {
+                    let count = result.result;
+                    expect(count, 'number of events queued').to.equal(2);
+
+                    // fetch two events
+                    workflowRunClient.emit(eventrouter.serviceEvents.WORKFLOW_RUN_FETCH_EVENT, {
+                        workflow_id: workflowRunInfos[0].workflowId,
+                        workflow_run_id: workflowRunInfos[0].workflowRunId,
+                        task_id: 'onu_event_handler',
+                        topic: 'onu.events'
+                    });
+                    eventRaised++;
+
+                    workflowRunClient.emit(eventrouter.serviceEvents.WORKFLOW_RUN_FETCH_EVENT, {
+                        workflow_id: workflowRunInfos[0].workflowId,
+                        workflow_run_id: workflowRunInfos[0].workflowRunId,
+                        task_id: 'onu_model_event_handler',
+                        topic: 'datamodel.AttWorkflowDriverServiceInstance'
+                    });
+                    eventRaised++;
+                });
+
+                workflowRunClient.on(eventrouter.serviceEvents.WORKFLOW_RUN_FETCH_EVENT, (result) => {
+                    let event = result.result;
+                    if(eventRaised === 2) {
+                        expect(event.topic).to.equal('onu.events');
+                        expect(event.message.serialNumber).to.equal('testSerialXXX');
+                    }
+                    else if(eventRaised === 1) {
+                        expect(event.topic).to.equal('datamodel.AttWorkflowDriverServiceInstance');
+                        expect(event.message.serialNumber).to.equal('testSerialXXX');
+                    }
+                    eventRaised--;
+
+                    if(eventRaised === 0) {
+                        done();
+                    }
+                });
+            }, 2000);
         });
-        */
     });
 })();
\ No newline at end of file