Add message routing testcases and related bugfixes
- Handle XOS sensor tasks specially to route events correctly
- Add a 'req_id' optional field to manager request API for client-side req-res mapping
- Fix several bugs related to message routing
- Rename event names for consistency
- Separate kickstart callback event from kickstart request
- Shorten socket.io ping/pong timeouts for faster response
- Add a 'dag_id' field to tasks in essences
- Notify workflow run clients of event arrivals so that they can get events as soon as possible
- Small code refinements

Change-Id: Ibc4182027eb5e2854f1603e339fffbe76e9ba621
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
index 5c77ebd..3d81bac 100644
--- a/src/controllers/ws_manager.js
+++ b/src/controllers/ws_manager.js
@@ -23,14 +23,17 @@
     const logger = require('../config/logger.js');
 
     let serviceEvents = {
-        WORKFLOW_REG: 'cord.workflow.ctlsvc.workflow.reg',
-        WORKFLOW_REG_ESSENCE: 'cord.workflow.ctlsvc.workflow.reg_essence',
+        // manager -> controller -> manager
+        WORKFLOW_REGISTER: 'cord.workflow.ctlsvc.workflow.register',
+        WORKFLOW_REGISTER_ESSENCE: 'cord.workflow.ctlsvc.workflow.register_essence',
         WORKFLOW_LIST: 'cord.workflow.ctlsvc.workflow.list',
-        WORKFLOW_RUN_LIST: 'cord.workflow.ctlsvc.workflow.run.list',
+        WORKFLOW_LIST_RUN: 'cord.workflow.ctlsvc.workflow.run.list',
         WORKFLOW_CHECK: 'cord.workflow.ctlsvc.workflow.check',
-        WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart',
         WORKFLOW_REMOVE: 'cord.workflow.ctlsvc.workflow.remove',
-        WORKFLOW_RUN_REMOVE: 'cord.workflow.ctlsvc.workflow.run.remove'
+        WORKFLOW_REMOVE_RUN: 'cord.workflow.ctlsvc.workflow.run.remove',
+        WORKFLOW_NOTIFY_NEW_RUN: 'cord.workflow.ctlsvc.workflow.notify_new_run',
+        // controller -> manager
+        WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart'
     };
 
     // WebSocket interface for workflow registration
@@ -38,12 +41,12 @@
     // {
     //     topic: 'cord.workflow.ctlsvc.workflow.reg',
     //     message: {
-    //         req_id: <req_id> // optional
+    //         req_id: <req_id>, // optional
     //         workflow: <workflow>
     //     }
     // }
     const registerWorkflow = (topic, message, cb) => {
-        const distributor = require('./eventrouter.js/index.js');
+        const eventrouter = require('./eventrouter.js');
 
         let errorMessage;
         if(!message) {
@@ -66,7 +69,7 @@
 
         logger.log('debug', `workflow - ${JSON.stringify(workflow)}`);
 
-        let result = distributor.addWorkflow(workflow);
+        let result = eventrouter.addWorkflow(workflow);
         if(!result) {
             errorMessage = `failed to register a workflow ${workflow.getId()}`;
             cb(errorMessage, false);
@@ -197,53 +200,6 @@
         return;
     };
 
-    // WebSocket interface for workflow start notification
-    // Message format:
-    // {
-    //     topic: 'cord.workflow.ctlsvc.workflow.kickstart',
-    //     message: {
-    //         req_id: <req_id> // optional
-    //         workflow_id: <workflow_id>,
-    //         workflow_run_id: <workflow_run_id>
-    //     }
-    // }
-    const notifyWorkflowStart = (topic, message, cb) => {
-        const eventrouter = require('./eventrouter.js');
-
-        let errorMessage;
-        if(!message) {
-            // error
-            errorMessage = `Message body for topic ${topic} is null or empty`;
-            logger.log('warn', `Return error - ${errorMessage}`);
-            cb(errorMessage, false);
-            return;
-        }
-
-        if(!('workflow_id' in message)) {
-            // error
-            errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
-            logger.log('warn', `Return error - ${errorMessage}`);
-            cb(errorMessage, false);
-            return;
-        }
-
-        if(!('workflow_run_id' in message)) {
-            // error
-            errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
-            logger.log('warn', `Return error - ${errorMessage}`);
-            cb(errorMessage, false);
-            return;
-        }
-
-        let workflowRunId = message.workflow_run_id;
-
-        // there must be a workflow matching
-        // set the workflow kickstarted
-        eventrouter.setWorkflowRunKickstarted(workflowRunId);
-        cb(null, true);
-        return;
-    }
-
     // WebSocket interface for workflow removal
     // Message format:
     // {
@@ -324,14 +280,61 @@
         return;
     }
 
+    // WebSocket interface for notifying a new workflow run
+    // Message format:
+    // {
+    //     topic: 'cord.workflow.ctlsvc.workflow.notify_new_run',
+    //     message: {
+    //         req_id: <req_id>, // optional
+    //         workflow_id: <workflow_id>,
+    //         workflow_run_id: <workflow_run_id>
+    //     }
+    // }
+    const notifyNewWorkflowRun = (topic, message, cb) => {
+        const eventrouter = require('./eventrouter.js');
+
+        let errorMessage;
+        if(!message) {
+            // error
+            errorMessage = `Message body for topic ${topic} is null or empty`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_id' in message)) {
+            // error
+            errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        if(!('workflow_run_id' in message)) {
+            // error
+            errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
+            logger.log('warn', `Return error - ${errorMessage}`);
+            cb(errorMessage, false);
+            return;
+        }
+
+        let workflowRunId = message.workflow_run_id;
+
+        // there must be a workflow matching
+        // set the workflow kickstarted
+        let result = eventrouter.setWorkflowRunKickstarted(workflowRunId);
+        cb(null, result);
+        return;
+    }
+
     const getRouter = () => {
         return {
             registerWorkflow: {
-                topic: serviceEvents.WORKFLOW_REG,
+                topic: serviceEvents.WORKFLOW_REGISTER,
                 handler: registerWorkflow
             },
             registerWorkflowEssence: {
-                topic: serviceEvents.WORKFLOW_REG_ESSENCE,
+                topic: serviceEvents.WORKFLOW_REGISTER_ESSENCE,
                 handler: registerWorkflowEssence
             },
             listWorkflows: {
@@ -339,25 +342,24 @@
                 handler: listWorkflows
             },
             listWorkflowRuns: {
-                topic: serviceEvents.WORKFLOW_RUN_LIST,
+                topic: serviceEvents.WORKFLOW_LIST_RUN,
                 handler: listWorkflowRuns
             },
             checkWorkflow: {
                 topic: serviceEvents.WORKFLOW_CHECK,
                 handler: checkWorkflow
             },
-            notifyWorkflowStart: {
-                topic: serviceEvents.WORKFLOW_KICKSTART,
-                handler: notifyWorkflowStart,
-                return: false
-            },
             removeWorkflow: {
                 topic: serviceEvents.WORKFLOW_REMOVE,
                 handler: removeWorkflow
             },
             removeWorkflowRun: {
-                topic: serviceEvents.WORKFLOW_RUN_REMOVE,
+                topic: serviceEvents.WORKFLOW_REMOVE_RUN,
                 handler: removeWorkflowRun
+            },
+            notifyNewWorkflowRun: {
+                topic: serviceEvents.WORKFLOW_NOTIFY_NEW_RUN,
+                handler: notifyNewWorkflowRun
             }
         };
     };
@@ -384,4 +386,4 @@
         getRouter: getRouter,
         kickstartWorkflow: kickstartWorkflow
     };
-})();
\ No newline at end of file
+})();