Add message routing testcases and related bugfixes
- Handle XOS sensor tasks specially to route events correctly
- Add message counting API for test
- Add req_id optional field to manager request API for client-side req-res mapping
- Fix several bugs related to message routing

Change-Id: Ie18cbc63926b352bd7655797655194ece9506c6b
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
index 82d27db..9a54181 100644
--- a/src/controllers/eventrouter.js
+++ b/src/controllers/eventrouter.js
@@ -58,6 +58,10 @@
     //    io = ioInstance;
     //};
 
+    const checkObject = (obj) => {
+        return Object.prototype.toString.call(obj) === '[object Object]';
+    };
+
     const destroy = () => {
         removeClients();
         clearWorkflowRuns();
@@ -178,12 +182,25 @@
         }
 
         // check if there are workflow runs
-        _.forOwn(workflowRuns, (workflowRun, _workflowRunId) => {
+        for(let key in workflowRuns) {
+            if (!workflowRuns.hasOwnProperty(key)) {
+                continue;
+            }
+
+            let workflowRun = workflowRuns[key];
             if(workflowRun.getWorkflowId() === workflowId) {
                 logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
                 return false;
             }
-        });
+        }
+
+        // we don't use below code becuase it cannot properly stop and return value with 'return'
+        // _.forOwn(workflowRuns, (workflowRun, _workflowRunId) => {
+        //     if(workflowRun.getWorkflowId() === workflowId) {
+        //         logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
+        //         return false;
+        //     }
+        // });
 
         delete workflows[workflowId];
         return true;
@@ -207,6 +224,8 @@
         // to check if there are workflow runs for the events
         let workflowIdsRunning = [];
 
+        logger.log('debug', `event is raised : topic ${topic}, message ${JSON.stringify(message)}`);
+
         // route event to running instances
         _.forOwn(workflowRuns, (workflowRun, workflowRunId) => {
             let workflowId = workflowRun.getWorkflowId();
@@ -217,6 +236,7 @@
             //      (already finished tasks are not counted)
             // 2) the task's key field and value
             if(workflowRun.isEventAcceptable(workflow, topic, message)) {
+                //console.log(`event ${topic} is routed to workflow run ${workflowRunId}`);
                 logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
                 workflowRun.enqueueEvent(topic, message);
 
@@ -234,6 +254,8 @@
                 if(!workflowIdsRunning.includes(workflowId)) {
                     // we need to buffer the event until workflow run is brought up
                     let workflowRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
+                    workflowRun.updateEventKeyFieldValueFromMessage(topic, message);
+
                     let workflowRunId = workflowRun.getId();
 
                     // register for management
@@ -250,6 +272,17 @@
         });
     };
 
+    const countQueuedEvents = (workflowRunId) => {
+        // this counts queued events
+        if(!(workflowRunId in workflowRuns)) {
+            logger.log('warn', `workflow run ${workflowRunId} does not exist`);
+            return null;
+        }
+
+        let workflowRun = workflowRuns[workflowRunId];
+        return workflowRun.lengthEventQueue();
+    };
+
     const fetchEvent = (workflowRunId, taskId, topic) => {
         // this returns an event or an empty obj when there is no message
         if(!(workflowRunId in workflowRuns)) {
@@ -338,10 +371,17 @@
             // manager protocol:
             // REQ:
             //      topic: operation
-            //      message: <data>
+            //      message: {
+            //          req_id: <req_id>,
+            //          <data>...
+            //      }
             // RES:
             //      topic: topic sent
-            //      message: {result: <true/false>, message: <error message> }F
+            //      message: {
+            //          req_id: <req_id>,
+            //          result: <true/false>,
+            //          message: <error message>
+            //      }
             allClients[clientId] = c;
             workflowManagerClients[clientId] = c;
 
@@ -349,11 +389,23 @@
             let router = ws_manager.getRouter();
             _.forOwn(router, (routerElem, _key) => {
                 socket.on(routerElem.topic, (msg) => {
-                    routerElem.handler(routerElem.topic, msg, (err, result) => {
+                    // handle a common parameter - req_id
+                    // when we get req_id, return the same req_id in response.
+                    // this is to help identify a request from a response at client-side
+                    let req_id = 101010; // default number, signiture
+                    if(msg && checkObject(msg)) {
+                        if('req_id' in msg) {
+                            req_id = msg.req_id;
+                        }
+                    }
+
+                    routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
                         if(err) {
                             logger.log('warn', `unable to handle a message - ${err}`);
                             socket.emit(routerElem.topic, {
-                                result: false,
+                                req_id: req_id,
+                                error: true,
+                                result: result,
                                 message: err
                             });
                             return;
@@ -361,6 +413,8 @@
 
                         if(routerElem.return === undefined || routerElem.return) {
                             socket.emit(routerElem.topic, {
+                                req_id: req_id,
+                                error: false,
                                 result: result
                             });
                         }
@@ -492,6 +546,7 @@
         clientType: Client.Type,
         //setIO: setIO,
         sendEvent: sendEvent,
+        countQueuedEvents: countQueuedEvents,
         fetchEvent: fetchEvent,
         addClient: addClient,
         removeClient: removeClient,
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
index b3e8877..5c77ebd 100644
--- a/src/controllers/ws_manager.js
+++ b/src/controllers/ws_manager.js
@@ -37,9 +37,12 @@
     // Message format:
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.reg',
-    //     message: <workflow>
+    //     message: {
+    //         req_id: <req_id> // optional
+    //         workflow: <workflow>
+    //     }
     // }
-    const registWorkflow = (topic, message, cb) => {
+    const registerWorkflow = (topic, message, cb) => {
         const distributor = require('./eventrouter.js/index.js');
 
         let errorMessage;
@@ -51,7 +54,15 @@
             return;
         }
 
-        let workflow = message;
+        if(!('workflow' in message)) {
+            // error
+            errorMessage = `field 'workflow' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflow = message.workflow;
 
         logger.log('debug', `workflow - ${JSON.stringify(workflow)}`);
 
@@ -69,8 +80,11 @@
     // WebSocket interface for workflow registration (via essence)
     // Message format:
     // {
-    //     topic: 'cord.workflow.ctlsvc.workflow.reg',
-    //     message: <workflow essence>
+    //     topic: 'cord.workflow.ctlsvc.workflow.reg_essence',
+    //     message: {
+    //         req_id: <req_id> // optional
+    //         essence: <workflow essence>
+    //     }
     // }
     const registerWorkflowEssence = (topic, message, cb) => {
         const eventrouter = require('./eventrouter.js');
@@ -83,7 +97,15 @@
             return;
         }
 
-        let essence = message;
+        if(!('essence' in message)) {
+            // error
+            errorMessage = `field 'essence' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let essence = message.essence;
         let result = true;
         let errorResults = [];
 
@@ -112,9 +134,11 @@
     // Message format:
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.list',
-    //     message: null
+    //     message: {
+    //         req_id: <req_id> // optional
+    //     }
     // }
-    const listWorkflows = (topic, message, cb) => {
+    const listWorkflows = (_topic, _message, cb) => {
         const eventrouter = require('./eventrouter.js');
 
         let result = eventrouter.listWorkflows();
@@ -126,9 +150,11 @@
     // Message format:
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.list',
-    //     message: null
+    //     message: {
+    //         req_id: <req_id> // optional
+    //     }
     // }
-    const listWorkflowRuns = (topic, message, cb) => {
+    const listWorkflowRuns = (_topic, _message, cb) => {
         const eventrouter = require('./eventrouter.js');
 
         let result = eventrouter.listWorkflowRuns();
@@ -140,7 +166,10 @@
     // Message format:
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.check',
-    //     message: <workflow_id>
+    //     message: {
+    //         req_id: <req_id> // optional
+    //         workflow_id: <workflow_id>
+    //     }
     // }
     const checkWorkflow = (topic, message, cb) => {
         const eventrouter = require('./eventrouter.js');
@@ -154,7 +183,15 @@
             return;
         }
 
-        let workflowId = message;
+        if(!('workflow_id' in message)) {
+            // error
+            errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflowId = message.workflow_id;
         let result = eventrouter.checkWorkflow(workflowId);
         cb(null, result);
         return;
@@ -165,8 +202,9 @@
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.kickstart',
     //     message: {
-    //          workflow_id: <workflow_id>,
-    //          workflow_run_id: <workflow_run_id>
+    //         req_id: <req_id> // optional
+    //         workflow_id: <workflow_id>,
+    //         workflow_run_id: <workflow_run_id>
     //     }
     // }
     const notifyWorkflowStart = (topic, message, cb) => {
@@ -210,12 +248,32 @@
     // Message format:
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.remove',
-    //     message: <workflow_id>
+    //     message: {
+    //         req_id: <req_id> // optional
+    //         workflow_id: <workflow_id>
+    //     }
     // }
     const removeWorkflow = (topic, message, cb) => {
         const eventrouter = require('./eventrouter.js');
 
-        let workflowId = message;
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_id' in message)) {
+            // error
+            errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflowId = message.workflow_id;
         let result = eventrouter.removeWorkflow(workflowId);
         cb(null, result);
         return;
@@ -226,8 +284,9 @@
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.run.remove',
     //     message: {
-    //          workflow_id: <workflow_id>,
-    //          workflow_run_id: <workflow_run_id>
+    //         req_id: <req_id> // optional
+    //         workflow_id: <workflow_id>,
+    //         workflow_run_id: <workflow_run_id>
     //     }
     // }
     const removeWorkflowRun = (topic, message, cb) => {
@@ -267,9 +326,9 @@
 
     const getRouter = () => {
         return {
-            registWorkflow: {
+            registerWorkflow: {
                 topic: serviceEvents.WORKFLOW_REG,
-                handler: registWorkflow
+                handler: registerWorkflow
             },
             registerWorkflowEssence: {
                 topic: serviceEvents.WORKFLOW_REG_ESSENCE,
diff --git a/src/controllers/ws_workflowrun.js b/src/controllers/ws_workflowrun.js
index f7ee474..036b1ef 100644
--- a/src/controllers/ws_workflowrun.js
+++ b/src/controllers/ws_workflowrun.js
@@ -22,13 +22,14 @@
 
     let serviceEvents = {
         WORKFLOW_RUN_UPDATE_STATUS: 'cord.workflow.ctlsvc.workflow.run.status',
+        WORKFLOW_RUN_COUNT_EVENTS: 'cord.workflow.ctlsvc.workflow.run.count',
         WORKFLOW_RUN_FETCH_EVENT: 'cord.workflow.ctlsvc.workflow.run.fetch'
     };
 
     // WebSocket interface for workflow status update
     // Message format:
     // {
-    //     topic: 'cord.workflow.ctlsvc.workflowrun.status',
+    //     topic: 'cord.workflow.ctlsvc.workflow.run.status',
     //     message: {
     //          workflow_id: <workflow_id>,
     //          workflow_run_id: <workflow_run_id>,
@@ -90,10 +91,52 @@
         return;
     };
 
-    // WebSocket interface for workflow status update
+    // WebSocket interface for counting queued events
     // Message format:
     // {
-    //     topic: 'cord.workflow.ctlsvc.workflowrun.fetch',
+    //     topic: 'cord.workflow.ctlsvc.workflow.run.count',
+    //     message: {
+    //          workflow_id: <workflow_id>,
+    //          workflow_run_id: <workflow_run_id>
+    //     }
+    // }
+    const countQueuedEvents = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_id' in message)) {
+            // error
+            errorMessage = `workflow_id field is not in message body - ${message}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_run_id' in message)) {
+            // error
+            errorMessage = `workflow_run_id field is not in message body - ${message}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let count = eventrouter.countQueuedEvents(message.workflow_run_id);
+        cb(null, count);
+        return;
+    };
+
+    // WebSocket interface for fetching an event
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.run.fetch',
     //     message: {
     //          workflow_id: <workflow_id>,
     //          workflow_run_id: <workflow_run_id>,
@@ -169,6 +212,10 @@
                 topic: serviceEvents.WORKFLOW_RUN_UPDATE_STATUS,
                 handler: updateWorkflowRunStatus
             },
+            countQueuedEvents: {
+                topic: serviceEvents.WORKFLOW_RUN_COUNT_EVENTS,
+                handler: countQueuedEvents
+            },
             fetchEvent: {
                 topic: serviceEvents.WORKFLOW_RUN_FETCH_EVENT,
                 handler: fetchEvent