Add message routing testcases and related bugfixes
- Handle XOS sensor tasks specially to route events correctly
- Add a 'req_id' optional field to manager request API for client-side req-res mapping
- Fix several bugs related to message routing
- Rename event names for consistency
- Separate kickstart callback event from kickstart request
- Shorten socket.io ping/pong timeout for faster response
- Add a 'dag_id' field to tasks in essences
- Notify event arrivals to workflow run clients to let them get events as soon as possible
- Small code refinements

Change-Id: Ibc4182027eb5e2854f1603e339fffbe76e9ba621
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
index 9a54181..975db2e 100644
--- a/src/controllers/eventrouter.js
+++ b/src/controllers/eventrouter.js
@@ -94,6 +94,15 @@
         return true;
     };
 
+    const getWorkflow = (workflowId) => { // returns workflows[workflowId], or null (with a warning) when the id is not registered
+        // the guard must fire when the id is ABSENT; the un-negated check returned null for every registered workflow
+        if(!(workflowId in workflows)) {
+            logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
+            return null;
+        }
+        return workflows[workflowId];
+    };
+
     const clearWorkflows = () => {
         _.forOwn(workflows, (_workflow, workflowId) => {
             delete workflows[workflowId];
@@ -133,6 +142,15 @@
         return true;
     };
 
+    const getWorkflowRun = (workflowRunId) => { // returns workflowRuns[workflowRunId], or null (with a warning) when the id is not registered
+        // the guard must fire when the id is ABSENT; the un-negated check returned null for every registered workflow run
+        if(!(workflowRunId in workflowRuns)) {
+            logger.log('warn', `cannot find a workflow run with id - ${workflowRunId}`);
+            return null;
+        }
+        return workflowRuns[workflowRunId];
+    };
+
     const clearWorkflowRuns = () => {
         _.forOwn(workflowRuns, (_workflowRun, workflowRunId) => {
             delete workflowRuns[workflowRunId];
@@ -379,6 +397,7 @@
             //      topic: topic sent
             //      message: {
             //          req_id: <req_id>,
+            //          error: <true/false>,
             //          result: <true/false>,
             //          message: <error message>
             //      }
@@ -411,6 +430,7 @@
                             return;
                         }
 
+                        // we return result
                         if(routerElem.return === undefined || routerElem.return) {
                             socket.emit(routerElem.topic, {
                                 req_id: req_id,
@@ -428,10 +448,18 @@
             // workflow run protocol:
             // REQ:
             //      topic: operation
-            //      message: <data>
+            //      message: {
+            //          req_id: <req_id>,
+            //          <data>...
+            //      }
             // RES:
             //      topic: topic sent
-            //      message: {result: <true/false>, message: <error message> }
+            //      message: {
+            //          req_id: <req_id>,
+            //          error: <true/false>,
+            //          result: <true/false>,
+            //          message: <error message>
+            //      }
 
             // map to WorkflowRun instance
             let workflowId = c.getWorkflowId();
@@ -463,18 +491,33 @@
             let router = ws_workflowrun.getRouter();
             _.forOwn(router, (routerElem, _key) => {
                 socket.on(routerElem.topic, (msg) => {
-                    routerElem.handler(routerElem.topic, msg, (err, result) => {
+                    // handle a common parameter - req_id
+                    // when we get req_id, return the same req_id in response.
+                    // this is to help identify a request from a response at client-side
+                    let req_id = 101010; // default number, signature
+                    if(msg && checkObject(msg)) {
+                        if('req_id' in msg) {
+                            req_id = msg.req_id;
+                        }
+                    }
+
+                    routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
                         if(err) {
                             logger.log('warn', `unable to handle a message - ${err}`);
                             socket.emit(routerElem.topic, {
+                                req_id: req_id,
+                                error: true,
                                 result: false,
                                 message: err
                             });
                             return;
                         }
 
+                        // we return result
                         if(routerElem.return === undefined || routerElem.return) {
                             socket.emit(routerElem.topic, {
+                                req_id: req_id,
+                                error: false,
                                 result: result
                             });
                         }
@@ -552,11 +595,13 @@
         removeClient: removeClient,
         removeClients: removeClients,
         addWorkflow: addWorkflow,
+        getWorkflow: getWorkflow,
         listWorkflows: listWorkflows,
         checkWorkflow: checkWorkflow,
         removeWorkflow: removeWorkflow,
         clearWorkflows: clearWorkflows,
         addWorkflowRun: addWorkflowRun,
+        getWorkflowRun: getWorkflowRun,
         listWorkflowRuns: listWorkflowRuns,
         checkWorkflowRun: checkWorkflowRun,
         removeWorkflowRun: removeWorkflowRun,
@@ -564,4 +609,4 @@
         updateWorkflowRunStatus: updateWorkflowRunStatus,
         setWorkflowRunKickstarted: setWorkflowRunKickstarted,
     };
-})();
\ No newline at end of file
+})();