Add message routing testcases and related bugfixes
- Handle XOS sensor tasks specially to route events correctly
- Add message counting API for tests
- Add optional req_id field to manager request API for client-side request/response mapping
- Fix several bugs related to message routing
Change-Id: Ie18cbc63926b352bd7655797655194ece9506c6b
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
index b3e8877..5c77ebd 100644
--- a/src/controllers/ws_manager.js
+++ b/src/controllers/ws_manager.js
@@ -37,9 +37,12 @@
// Message format:
// {
// topic: 'cord.workflow.ctlsvc.workflow.reg',
- // message: <workflow>
+ // message: {
+ // req_id: <req_id>, // optional
+ // workflow: <workflow>
+ // }
// }
- const registWorkflow = (topic, message, cb) => {
+ const registerWorkflow = (topic, message, cb) => {
const distributor = require('./eventrouter.js/index.js');
let errorMessage;
@@ -51,7 +54,15 @@
return;
}
- let workflow = message;
+ if(!('workflow' in message)) {
+ // error
+ errorMessage = `field 'workflow' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflow = message.workflow;
logger.log('debug', `workflow - ${JSON.stringify(workflow)}`);
@@ -69,8 +80,11 @@
// WebSocket interface for workflow registration (via essence)
// Message format:
// {
- // topic: 'cord.workflow.ctlsvc.workflow.reg',
- // message: <workflow essence>
+ // topic: 'cord.workflow.ctlsvc.workflow.reg_essence',
+ // message: {
+ // req_id: <req_id>, // optional
+ // essence: <workflow essence>
+ // }
// }
const registerWorkflowEssence = (topic, message, cb) => {
const eventrouter = require('./eventrouter.js');
@@ -83,7 +97,15 @@
return;
}
- let essence = message;
+ if(!('essence' in message)) {
+ // error
+ errorMessage = `field 'essence' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let essence = message.essence;
let result = true;
let errorResults = [];
@@ -112,9 +134,11 @@
// Message format:
// {
// topic: 'cord.workflow.ctlsvc.workflow.list',
- // message: null
+ // message: {
+ // req_id: <req_id> // optional
+ // }
// }
- const listWorkflows = (topic, message, cb) => {
+ const listWorkflows = (_topic, _message, cb) => {
const eventrouter = require('./eventrouter.js');
let result = eventrouter.listWorkflows();
@@ -126,9 +150,11 @@
// Message format:
// {
// topic: 'cord.workflow.ctlsvc.workflow.list',
- // message: null
+ // message: {
+ // req_id: <req_id> // optional
+ // }
// }
- const listWorkflowRuns = (topic, message, cb) => {
+ const listWorkflowRuns = (_topic, _message, cb) => {
const eventrouter = require('./eventrouter.js');
let result = eventrouter.listWorkflowRuns();
@@ -140,7 +166,10 @@
// Message format:
// {
// topic: 'cord.workflow.ctlsvc.workflow.check',
- // message: <workflow_id>
+ // message: {
+ // req_id: <req_id>, // optional
+ // workflow_id: <workflow_id>
+ // }
// }
const checkWorkflow = (topic, message, cb) => {
const eventrouter = require('./eventrouter.js');
@@ -154,7 +183,15 @@
return;
}
- let workflowId = message;
+ if(!('workflow_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflowId = message.workflow_id;
let result = eventrouter.checkWorkflow(workflowId);
cb(null, result);
return;
@@ -165,8 +202,9 @@
// {
// topic: 'cord.workflow.ctlsvc.workflow.kickstart',
// message: {
- // workflow_id: <workflow_id>,
- // workflow_run_id: <workflow_run_id>
+ // req_id: <req_id>, // optional
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>
// }
// }
const notifyWorkflowStart = (topic, message, cb) => {
@@ -210,12 +248,32 @@
// Message format:
// {
// topic: 'cord.workflow.ctlsvc.workflow.remove',
- // message: <workflow_id>
+ // message: {
+ // req_id: <req_id>, // optional
+ // workflow_id: <workflow_id>
+ // }
// }
const removeWorkflow = (topic, message, cb) => {
const eventrouter = require('./eventrouter.js');
- let workflowId = message;
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflowId = message.workflow_id;
let result = eventrouter.removeWorkflow(workflowId);
cb(null, result);
return;
@@ -226,8 +284,9 @@
// {
// topic: 'cord.workflow.ctlsvc.workflow.run.remove',
// message: {
- // workflow_id: <workflow_id>,
- // workflow_run_id: <workflow_run_id>
+ // req_id: <req_id>, // optional
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>
// }
// }
const removeWorkflowRun = (topic, message, cb) => {
@@ -267,9 +326,9 @@
const getRouter = () => {
return {
- registWorkflow: {
+ registerWorkflow: {
topic: serviceEvents.WORKFLOW_REG,
- handler: registWorkflow
+ handler: registerWorkflow
},
registerWorkflowEssence: {
topic: serviceEvents.WORKFLOW_REG_ESSENCE,