Add message routing testcases and related bugfixes
- Handle XOS sensor tasks specially to route events correctly
- Add a 'req_id' optional field to manager request API for client-side req-res mapping
- Fix several bugs related to message routing
- Rename event names for consistency
- Separate kickstart callback event from kickstart request
- Shorten the socket.io ping/pong timeout for faster response
- Add a 'dag_id' field to tasks in essences
- Notify workflow run clients of event arrivals so they can fetch events as soon as possible
- Small code refinements
Change-Id: Ibc4182027eb5e2854f1603e339fffbe76e9ba621
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
index 5c77ebd..3d81bac 100644
--- a/src/controllers/ws_manager.js
+++ b/src/controllers/ws_manager.js
@@ -23,14 +23,17 @@
const logger = require('../config/logger.js');
let serviceEvents = {
- WORKFLOW_REG: 'cord.workflow.ctlsvc.workflow.reg',
- WORKFLOW_REG_ESSENCE: 'cord.workflow.ctlsvc.workflow.reg_essence',
+ // manager -> controller -> manager
+ WORKFLOW_REGISTER: 'cord.workflow.ctlsvc.workflow.register',
+ WORKFLOW_REGISTER_ESSENCE: 'cord.workflow.ctlsvc.workflow.register_essence',
WORKFLOW_LIST: 'cord.workflow.ctlsvc.workflow.list',
- WORKFLOW_RUN_LIST: 'cord.workflow.ctlsvc.workflow.run.list',
+ WORKFLOW_LIST_RUN: 'cord.workflow.ctlsvc.workflow.run.list',
WORKFLOW_CHECK: 'cord.workflow.ctlsvc.workflow.check',
- WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart',
WORKFLOW_REMOVE: 'cord.workflow.ctlsvc.workflow.remove',
- WORKFLOW_RUN_REMOVE: 'cord.workflow.ctlsvc.workflow.run.remove'
+ WORKFLOW_REMOVE_RUN: 'cord.workflow.ctlsvc.workflow.run.remove',
+ WORKFLOW_NOTIFY_NEW_RUN: 'cord.workflow.ctlsvc.workflow.notify_new_run',
+ // controller -> manager
+ WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart'
};
// WebSocket interface for workflow registration
@@ -38,12 +41,12 @@
// {
// topic: 'cord.workflow.ctlsvc.workflow.reg',
// message: {
- // req_id: <req_id> // optional
+ // req_id: <req_id>, // optional
// workflow: <workflow>
// }
// }
const registerWorkflow = (topic, message, cb) => {
- const distributor = require('./eventrouter.js/index.js');
+ const eventrouter = require('./eventrouter.js');
let errorMessage;
if(!message) {
@@ -66,7 +69,7 @@
logger.log('debug', `workflow - ${JSON.stringify(workflow)}`);
- let result = distributor.addWorkflow(workflow);
+ let result = eventrouter.addWorkflow(workflow);
if(!result) {
errorMessage = `failed to register a workflow ${workflow.getId()}`;
cb(errorMessage, false);
@@ -197,53 +200,6 @@
return;
};
- // WebSocket interface for workflow start notification
- // Message format:
- // {
- // topic: 'cord.workflow.ctlsvc.workflow.kickstart',
- // message: {
- // req_id: <req_id> // optional
- // workflow_id: <workflow_id>,
- // workflow_run_id: <workflow_run_id>
- // }
- // }
- const notifyWorkflowStart = (topic, message, cb) => {
- const eventrouter = require('./eventrouter.js');
-
- let errorMessage;
- if(!message) {
- // error
- errorMessage = `Message body for topic ${topic} is null or empty`;
- logger.log('warn', `Return error - ${errorMessage}`);
- cb(errorMessage, false);
- return;
- }
-
- if(!('workflow_id' in message)) {
- // error
- errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
- logger.log('warn', `Return error - ${errorMessage}`);
- cb(errorMessage, false);
- return;
- }
-
- if(!('workflow_run_id' in message)) {
- // error
- errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
- logger.log('warn', `Return error - ${errorMessage}`);
- cb(errorMessage, false);
- return;
- }
-
- let workflowRunId = message.workflow_run_id;
-
- // there must be a workflow matching
- // set the workflow kickstarted
- eventrouter.setWorkflowRunKickstarted(workflowRunId);
- cb(null, true);
- return;
- }
-
// WebSocket interface for workflow removal
// Message format:
// {
@@ -324,14 +280,61 @@
return;
}
+ // WebSocket interface for notifying a new workflow run
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.notify_new_run',
+ // message: {
+ // req_id: <req_id>, // optional
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>
+ // }
+ // }
+ const notifyNewWorkflowRun = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_run_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflowRunId = message.workflow_run_id;
+
+ // there must be a workflow matching
+ // set the workflow kickstarted
+ let result = eventrouter.setWorkflowRunKickstarted(workflowRunId);
+ cb(null, result);
+ return;
+ }
+
const getRouter = () => {
return {
registerWorkflow: {
- topic: serviceEvents.WORKFLOW_REG,
+ topic: serviceEvents.WORKFLOW_REGISTER,
handler: registerWorkflow
},
registerWorkflowEssence: {
- topic: serviceEvents.WORKFLOW_REG_ESSENCE,
+ topic: serviceEvents.WORKFLOW_REGISTER_ESSENCE,
handler: registerWorkflowEssence
},
listWorkflows: {
@@ -339,25 +342,24 @@
handler: listWorkflows
},
listWorkflowRuns: {
- topic: serviceEvents.WORKFLOW_RUN_LIST,
+ topic: serviceEvents.WORKFLOW_LIST_RUN,
handler: listWorkflowRuns
},
checkWorkflow: {
topic: serviceEvents.WORKFLOW_CHECK,
handler: checkWorkflow
},
- notifyWorkflowStart: {
- topic: serviceEvents.WORKFLOW_KICKSTART,
- handler: notifyWorkflowStart,
- return: false
- },
removeWorkflow: {
topic: serviceEvents.WORKFLOW_REMOVE,
handler: removeWorkflow
},
removeWorkflowRun: {
- topic: serviceEvents.WORKFLOW_RUN_REMOVE,
+ topic: serviceEvents.WORKFLOW_REMOVE_RUN,
handler: removeWorkflowRun
+ },
+ notifyNewWorkflowRun: {
+ topic: serviceEvents.WORKFLOW_NOTIFY_NEW_RUN,
+ handler: notifyNewWorkflowRun
}
};
};
@@ -384,4 +386,4 @@
getRouter: getRouter,
kickstartWorkflow: kickstartWorkflow
};
-})();
\ No newline at end of file
+})();