Add message routing testcases and related bugfixes
- Handle XOS sensor tasks specially to route events correctly
- Add a 'req_id' optional field to manager request API for client-side req-res mapping
- Fix several bugs related to message routing
- Rename event names for consistency
- Separate kickstart call-back event from kickstart request
- Shorten ping/pong timeout for socket.io for fast response
- Add a 'dag_id' field to tasks in essences
- Notify event arrivals to workflow run clients to let them get events as soon as possible
- Small code refinements

Change-Id: Ibc4182027eb5e2854f1603e339fffbe76e9ba621
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
index 9a54181..975db2e 100644
--- a/src/controllers/eventrouter.js
+++ b/src/controllers/eventrouter.js
@@ -94,6 +94,15 @@
         return true;
     };
 
+    const getWorkflow = (workflowId) => {
+        if(workflowId in workflows) {
+            logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
+            return null;
+        }
+
+        return workflows[workflowId];
+    };
+
     const clearWorkflows = () => {
         _.forOwn(workflows, (_workflow, workflowId) => {
             delete workflows[workflowId];
@@ -133,6 +142,15 @@
         return true;
     };
 
+    const getWorkflowRun = (workflowRunId) => {
+        if(workflowRunId in workflowRuns) {
+            logger.log('warn', `cannot find a workflow run with id - ${workflowRunId}`);
+            return null;
+        }
+
+        return workflowRuns[workflowRunId];
+    };
+
     const clearWorkflowRuns = () => {
         _.forOwn(workflowRuns, (_workflowRun, workflowRunId) => {
             delete workflowRuns[workflowRunId];
@@ -379,6 +397,7 @@
             //      topic: topic sent
             //      message: {
             //          req_id: <req_id>,
+            //          error: <true/false>,
             //          result: <true/false>,
             //          message: <error message>
             //      }
@@ -411,6 +430,7 @@
                             return;
                         }
 
+                        // we return result
                         if(routerElem.return === undefined || routerElem.return) {
                             socket.emit(routerElem.topic, {
                                 req_id: req_id,
@@ -428,10 +448,18 @@
             // workflow run protocol:
             // REQ:
             //      topic: operation
-            //      message: <data>
+            //      message: {
+            //          req_id: <req_id>,
+            //          <data>...
+            //      }
             // RES:
             //      topic: topic sent
-            //      message: {result: <true/false>, message: <error message> }
+            //      message: {
+            //          req_id: <req_id>,
+            //          error: <true/false>,
+            //          result: <true/false>,
+            //          message: <error message>
+            //      }
 
             // map to WorkflowRun instance
             let workflowId = c.getWorkflowId();
@@ -463,18 +491,33 @@
             let router = ws_workflowrun.getRouter();
             _.forOwn(router, (routerElem, _key) => {
                 socket.on(routerElem.topic, (msg) => {
-                    routerElem.handler(routerElem.topic, msg, (err, result) => {
+                    // handle a common parameter - req_id
+                    // when we get req_id, return the same req_id in response.
+                    // this is to help identify a request from a response at client-side
+                    let req_id = 101010; // default number, signiture
+                    if(msg && checkObject(msg)) {
+                        if('req_id' in msg) {
+                            req_id = msg.req_id;
+                        }
+                    }
+
+                    routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
                         if(err) {
                             logger.log('warn', `unable to handle a message - ${err}`);
                             socket.emit(routerElem.topic, {
+                                req_id: req_id,
+                                error: true,
                                 result: false,
                                 message: err
                             });
                             return;
                         }
 
+                        // we return result
                         if(routerElem.return === undefined || routerElem.return) {
                             socket.emit(routerElem.topic, {
+                                req_id: req_id,
+                                error: false,
                                 result: result
                             });
                         }
@@ -552,11 +595,13 @@
         removeClient: removeClient,
         removeClients: removeClients,
         addWorkflow: addWorkflow,
+        getWorkflow: getWorkflow,
         listWorkflows: listWorkflows,
         checkWorkflow: checkWorkflow,
         removeWorkflow: removeWorkflow,
         clearWorkflows: clearWorkflows,
         addWorkflowRun: addWorkflowRun,
+        getWorkflowRun: getWorkflowRun,
         listWorkflowRuns: listWorkflowRuns,
         checkWorkflowRun: checkWorkflowRun,
         removeWorkflowRun: removeWorkflowRun,
@@ -564,4 +609,4 @@
         updateWorkflowRunStatus: updateWorkflowRunStatus,
         setWorkflowRunKickstarted: setWorkflowRunKickstarted,
     };
-})();
\ No newline at end of file
+})();
diff --git a/src/controllers/websocket.js b/src/controllers/websocket.js
index 3f3216a..b5eaf13 100644
--- a/src/controllers/websocket.js
+++ b/src/controllers/websocket.js
@@ -26,7 +26,10 @@
     let io;
     const createSocketIO = (server) => {
         // INSTANTIATE SOCKET.IO
-        io = socketio.listen(server);
+        io = socketio.listen(server, {
+            pingInterval: 500,
+            pingTimeout: 2000,
+        });
         io.use(ioWildcard());
 
         // set io to eventrouter
@@ -89,4 +92,4 @@
     // const socket = socketIo.get();
     // socket.emit('eventName', data);
 
-})();
\ No newline at end of file
+})();
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
index 5c77ebd..3d81bac 100644
--- a/src/controllers/ws_manager.js
+++ b/src/controllers/ws_manager.js
@@ -23,14 +23,17 @@
     const logger = require('../config/logger.js');
 
     let serviceEvents = {
-        WORKFLOW_REG: 'cord.workflow.ctlsvc.workflow.reg',
-        WORKFLOW_REG_ESSENCE: 'cord.workflow.ctlsvc.workflow.reg_essence',
+        // manager -> controller -> manager
+        WORKFLOW_REGISTER: 'cord.workflow.ctlsvc.workflow.register',
+        WORKFLOW_REGISTER_ESSENCE: 'cord.workflow.ctlsvc.workflow.register_essence',
         WORKFLOW_LIST: 'cord.workflow.ctlsvc.workflow.list',
-        WORKFLOW_RUN_LIST: 'cord.workflow.ctlsvc.workflow.run.list',
+        WORKFLOW_LIST_RUN: 'cord.workflow.ctlsvc.workflow.run.list',
         WORKFLOW_CHECK: 'cord.workflow.ctlsvc.workflow.check',
-        WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart',
         WORKFLOW_REMOVE: 'cord.workflow.ctlsvc.workflow.remove',
-        WORKFLOW_RUN_REMOVE: 'cord.workflow.ctlsvc.workflow.run.remove'
+        WORKFLOW_REMOVE_RUN: 'cord.workflow.ctlsvc.workflow.run.remove',
+        WORKFLOW_NOTIFY_NEW_RUN: 'cord.workflow.ctlsvc.workflow.notify_new_run',
+        // controller -> manager
+        WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart'
     };
 
     // WebSocket interface for workflow registration
@@ -38,12 +41,12 @@
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.reg',
     //     message: {
-    //         req_id: <req_id> // optional
+    //         req_id: <req_id>, // optional
     //         workflow: <workflow>
     //     }
     // }
     const registerWorkflow = (topic, message, cb) => {
-        const distributor = require('./eventrouter.js/index.js');
+        const eventrouter = require('./eventrouter.js');
 
         let errorMessage;
         if(!message) {
@@ -66,7 +69,7 @@
 
         logger.log('debug', `workflow - ${JSON.stringify(workflow)}`);
 
-        let result = distributor.addWorkflow(workflow);
+        let result = eventrouter.addWorkflow(workflow);
         if(!result) {
             errorMessage = `failed to register a workflow ${workflow.getId()}`;
             cb(errorMessage, false);
@@ -197,53 +200,6 @@
         return;
     };
 
-    // WebSocket interface for workflow start notification
-    // Message format:
-    // {
-    //     topic: 'cord.workflow.ctlsvc.workflow.kickstart',
-    //     message: {
-    //         req_id: <req_id> // optional
-    //         workflow_id: <workflow_id>,
-    //         workflow_run_id: <workflow_run_id>
-    //     }
-    // }
-    const notifyWorkflowStart = (topic, message, cb) => {
-        const eventrouter = require('./eventrouter.js');
-
-        let errorMessage;
-        if(!message) {
-            // error
-            errorMessage = `Message body for topic ${topic} is null or empty`;
-            logger.log('warn', `Return error - ${errorMessage}`);
-            cb(errorMessage, false);
-            return;
-        }
-
-        if(!('workflow_id' in message)) {
-            // error
-            errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
-            logger.log('warn', `Return error - ${errorMessage}`);
-            cb(errorMessage, false);
-            return;
-        }
-
-        if(!('workflow_run_id' in message)) {
-            // error
-            errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
-            logger.log('warn', `Return error - ${errorMessage}`);
-            cb(errorMessage, false);
-            return;
-        }
-
-        let workflowRunId = message.workflow_run_id;
-
-        // there must be a workflow matching
-        // set the workflow kickstarted
-        eventrouter.setWorkflowRunKickstarted(workflowRunId);
-        cb(null, true);
-        return;
-    }
-
     // WebSocket interface for workflow removal
     // Message format:
     // {
@@ -324,14 +280,61 @@
         return;
     }
 
+    // WebSocket interface for notifying a new workflow run
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.notify_new_run',
+    //     message: {
+    //         req_id: <req_id> // optional
+    //         workflow_id: <workflow_id>,
+    //         workflow_run_id: <workflow_run_id>
+    //     }
+    // }
+    const notifyNewWorkflowRun = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_id' in message)) {
+            // error
+            errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_run_id' in message)) {
+            // error
+            errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflowRunId = message.workflow_run_id;
+
+        // there must be a workflow matching
+        // set the workflow kickstarted
+        let result = eventrouter.setWorkflowRunKickstarted(workflowRunId);
+        cb(null, result);
+        return;
+    }
+
     const getRouter = () => {
         return {
             registerWorkflow: {
-                topic: serviceEvents.WORKFLOW_REG,
+                topic: serviceEvents.WORKFLOW_REGISTER,
                 handler: registerWorkflow
             },
             registerWorkflowEssence: {
-                topic: serviceEvents.WORKFLOW_REG_ESSENCE,
+                topic: serviceEvents.WORKFLOW_REGISTER_ESSENCE,
                 handler: registerWorkflowEssence
             },
             listWorkflows: {
@@ -339,25 +342,24 @@
                 handler: listWorkflows
             },
             listWorkflowRuns: {
-                topic: serviceEvents.WORKFLOW_RUN_LIST,
+                topic: serviceEvents.WORKFLOW_LIST_RUN,
                 handler: listWorkflowRuns
             },
             checkWorkflow: {
                 topic: serviceEvents.WORKFLOW_CHECK,
                 handler: checkWorkflow
             },
-            notifyWorkflowStart: {
-                topic: serviceEvents.WORKFLOW_KICKSTART,
-                handler: notifyWorkflowStart,
-                return: false
-            },
             removeWorkflow: {
                 topic: serviceEvents.WORKFLOW_REMOVE,
                 handler: removeWorkflow
             },
             removeWorkflowRun: {
-                topic: serviceEvents.WORKFLOW_RUN_REMOVE,
+                topic: serviceEvents.WORKFLOW_REMOVE_RUN,
                 handler: removeWorkflowRun
+            },
+            notifyNewWorkflowRun: {
+                topic: serviceEvents.WORKFLOW_NOTIFY_NEW_RUN,
+                handler: notifyNewWorkflowRun
             }
         };
     };
@@ -384,4 +386,4 @@
         getRouter: getRouter,
         kickstartWorkflow: kickstartWorkflow
     };
-})();
\ No newline at end of file
+})();
diff --git a/src/controllers/ws_workflowrun.js b/src/controllers/ws_workflowrun.js
index 036b1ef..4000f10 100644
--- a/src/controllers/ws_workflowrun.js
+++ b/src/controllers/ws_workflowrun.js
@@ -18,12 +18,16 @@
 (function () {
     'use strict';
 
+    const _ = require('lodash');
     const logger = require('../config/logger.js');
 
     let serviceEvents = {
+        // workflow_run -> controller -> workflow_run
         WORKFLOW_RUN_UPDATE_STATUS: 'cord.workflow.ctlsvc.workflow.run.status',
         WORKFLOW_RUN_COUNT_EVENTS: 'cord.workflow.ctlsvc.workflow.run.count',
-        WORKFLOW_RUN_FETCH_EVENT: 'cord.workflow.ctlsvc.workflow.run.fetch'
+        WORKFLOW_RUN_FETCH_EVENT: 'cord.workflow.ctlsvc.workflow.run.fetch',
+        // controller -> workflow_run
+        WORKFLOW_RUN_NOTIFY_EVENT: 'cord.workflow.ctlsvc.workflow.run.notify'
     };
 
     // WebSocket interface for workflow status update
@@ -31,10 +35,11 @@
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.run.status',
     //     message: {
-    //          workflow_id: <workflow_id>,
-    //          workflow_run_id: <workflow_run_id>,
-    //          task_id: <task_id>,
-    //          status: 'begin' or 'end'
+    //         req_id: <req_id>, // optional
+    //         workflow_id: <workflow_id>,
+    //         workflow_run_id: <workflow_run_id>,
+    //         task_id: <task_id>,
+    //         status: 'begin' or 'end'
     //     }
     // }
     const updateWorkflowRunStatus = (topic, message, cb) => {
@@ -87,7 +92,13 @@
             message.task_id,
             message.status.toLowerCase()
         );
-        cb(null, result);
+        if(!result) {
+            errorMessage = `failed to update workflow run status ${message.workflow_run_id}`;
+            cb(errorMessage, false);
+        }
+        else {
+            cb(null, true);
+        }
         return;
     };
 
@@ -96,8 +107,9 @@
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.run.count',
     //     message: {
-    //          workflow_id: <workflow_id>,
-    //          workflow_run_id: <workflow_run_id>
+    //         req_id: <req_id>, // optional
+    //         workflow_id: <workflow_id>,
+    //         workflow_run_id: <workflow_run_id>
     //     }
     // }
     const countQueuedEvents = (topic, message, cb) => {
@@ -138,10 +150,11 @@
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.run.fetch',
     //     message: {
-    //          workflow_id: <workflow_id>,
-    //          workflow_run_id: <workflow_run_id>,
-    //          task_id: <task_id>,
-    //          topic: <expected topic>
+    //         req_id: <req_id>, // optional
+    //         workflow_id: <workflow_id>,
+    //         workflow_run_id: <workflow_run_id>,
+    //         task_id: <task_id>,
+    //         topic: <expected topic>
     //     }
     // }
     const fetchEvent = (topic, message, cb) => {
@@ -223,8 +236,34 @@
         };
     };
 
+    // out-going commands
+    const notifyEvent = (topic) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let clients = eventrouter.getWorkflowRunClients();
+        _.forOwn(clients, (client, _clientId) => {
+            let workflowId = client.getWorkflowId();
+            let workflowRunId = client.getWorkflowRunId();
+
+            let workflow = eventrouter.getWorkflow(workflowId);
+            let workflowRun = eventrouter.getWorkflowRun(workflowRunId);
+            if(workflowRun) {
+                if(workflowRun.isTopicAcceptable(workflow, topic)) {
+                    let socket = client.getSocket();
+                    if(socket) {
+                        socket.emit(serviceEvents.WORKFLOW_RUN_NOTIFY_EVENT, {
+                            topic: topic
+                        });
+                    }
+                }
+            }
+        });
+        return;
+    };
+
     module.exports = {
         serviceEvents: serviceEvents,
-        getRouter: getRouter
+        getRouter: getRouter,
+        notifyEvent: notifyEvent
     };
-})();
\ No newline at end of file
+})();