Add message routing test cases and related bug fixes
- Handle XOS sensor tasks specially to route events correctly
- Add message counting API for tests
- Add optional req_id field to manager request API for client-side request-response mapping
- Fix several bugs related to message routing

Change-Id: Ie18cbc63926b352bd7655797655194ece9506c6b
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
index b3e8877..5c77ebd 100644
--- a/src/controllers/ws_manager.js
+++ b/src/controllers/ws_manager.js
@@ -37,9 +37,12 @@
     // Message format:
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.reg',
-    //     message: <workflow>
+    //     message: {
+    //         req_id: <req_id>, // optional
+    //         workflow: <workflow>
+    //     }
     // }
-    const registWorkflow = (topic, message, cb) => {
+    const registerWorkflow = (topic, message, cb) => {
         const distributor = require('./eventrouter.js/index.js');
 
         let errorMessage;
@@ -51,7 +54,15 @@
             return;
         }
 
-        let workflow = message;
+        if(!('workflow' in message)) {
+            // error
+            errorMessage = `field 'workflow' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflow = message.workflow;
 
         logger.log('debug', `workflow - ${JSON.stringify(workflow)}`);
 
@@ -69,8 +80,11 @@
     // WebSocket interface for workflow registration (via essence)
     // Message format:
     // {
-    //     topic: 'cord.workflow.ctlsvc.workflow.reg',
-    //     message: <workflow essence>
+    //     topic: 'cord.workflow.ctlsvc.workflow.reg_essence',
+    //     message: {
+    //         req_id: <req_id>, // optional
+    //         essence: <workflow essence>
+    //     }
     // }
     const registerWorkflowEssence = (topic, message, cb) => {
         const eventrouter = require('./eventrouter.js');
@@ -83,7 +97,15 @@
             return;
         }
 
-        let essence = message;
+        if(!('essence' in message)) {
+            // error
+            errorMessage = `field 'essence' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let essence = message.essence;
         let result = true;
         let errorResults = [];
 
@@ -112,9 +134,11 @@
     // Message format:
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.list',
-    //     message: null
+    //     message: {
+    //         req_id: <req_id> // optional
+    //     }
     // }
-    const listWorkflows = (topic, message, cb) => {
+    const listWorkflows = (_topic, _message, cb) => {
         const eventrouter = require('./eventrouter.js');
 
         let result = eventrouter.listWorkflows();
@@ -126,9 +150,11 @@
     // Message format:
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.list',
-    //     message: null
+    //     message: {
+    //         req_id: <req_id> // optional
+    //     }
     // }
-    const listWorkflowRuns = (topic, message, cb) => {
+    const listWorkflowRuns = (_topic, _message, cb) => {
         const eventrouter = require('./eventrouter.js');
 
         let result = eventrouter.listWorkflowRuns();
@@ -140,7 +166,10 @@
     // Message format:
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.check',
-    //     message: <workflow_id>
+    //     message: {
+    //         req_id: <req_id>, // optional
+    //         workflow_id: <workflow_id>
+    //     }
     // }
     const checkWorkflow = (topic, message, cb) => {
         const eventrouter = require('./eventrouter.js');
@@ -154,7 +183,15 @@
             return;
         }
 
-        let workflowId = message;
+        if(!('workflow_id' in message)) {
+            // error
+            errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflowId = message.workflow_id;
         let result = eventrouter.checkWorkflow(workflowId);
         cb(null, result);
         return;
@@ -165,8 +202,9 @@
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.kickstart',
     //     message: {
-    //          workflow_id: <workflow_id>,
-    //          workflow_run_id: <workflow_run_id>
+    //         req_id: <req_id>, // optional
+    //         workflow_id: <workflow_id>,
+    //         workflow_run_id: <workflow_run_id>
     //     }
     // }
     const notifyWorkflowStart = (topic, message, cb) => {
@@ -210,12 +248,32 @@
     // Message format:
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.remove',
-    //     message: <workflow_id>
+    //     message: {
+    //         req_id: <req_id>, // optional
+    //         workflow_id: <workflow_id>
+    //     }
     // }
     const removeWorkflow = (topic, message, cb) => {
         const eventrouter = require('./eventrouter.js');
 
-        let workflowId = message;
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_id' in message)) {
+            // error
+            errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflowId = message.workflow_id;
         let result = eventrouter.removeWorkflow(workflowId);
         cb(null, result);
         return;
@@ -226,8 +284,9 @@
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.run.remove',
     //     message: {
-    //          workflow_id: <workflow_id>,
-    //          workflow_run_id: <workflow_run_id>
+    //         req_id: <req_id>, // optional
+    //         workflow_id: <workflow_id>,
+    //         workflow_run_id: <workflow_run_id>
     //     }
     // }
     const removeWorkflowRun = (topic, message, cb) => {
@@ -267,9 +326,9 @@
 
     const getRouter = () => {
         return {
-            registWorkflow: {
+            registerWorkflow: {
                 topic: serviceEvents.WORKFLOW_REG,
-                handler: registWorkflow
+                handler: registerWorkflow
             },
             registerWorkflowEssence: {
                 topic: serviceEvents.WORKFLOW_REG_ESSENCE,