Add message routing testcases and related bugfixes
- Handle XOS sensor tasks specially to route events correctly
- Add message counting API for test
- Add req_id optional field to manager request API for client-side req-res mapping
- Fix several bugs related to message routing
Change-Id: Ie18cbc63926b352bd7655797655194ece9506c6b
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
index 82d27db..9a54181 100644
--- a/src/controllers/eventrouter.js
+++ b/src/controllers/eventrouter.js
@@ -58,6 +58,10 @@
// io = ioInstance;
//};
+ const checkObject = (obj) => { // returns true only when obj's built-in type tag is '[object Object]'
+ return Object.prototype.toString.call(obj) === '[object Object]'; // arrays, null and primitives produce other tags
+ };
+
const destroy = () => {
removeClients();
clearWorkflowRuns();
@@ -178,12 +182,25 @@
}
// check if there are workflow runs
- _.forOwn(workflowRuns, (workflowRun, _workflowRunId) => {
+ for(let key in workflowRuns) {
+ if (!workflowRuns.hasOwnProperty(key)) {
+ continue;
+ }
+
+ let workflowRun = workflowRuns[key];
if(workflowRun.getWorkflowId() === workflowId) {
logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
return false;
}
- });
+ }
+
+ // we don't use the code below because a 'return' inside the _.forOwn callback cannot make the enclosing function stop and return a value
+ // _.forOwn(workflowRuns, (workflowRun, _workflowRunId) => {
+ // if(workflowRun.getWorkflowId() === workflowId) {
+ // logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
+ // return false;
+ // }
+ // });
delete workflows[workflowId];
return true;
@@ -207,6 +224,8 @@
// to check if there are workflow runs for the events
let workflowIdsRunning = [];
+ logger.log('debug', `event is raised : topic ${topic}, message ${JSON.stringify(message)}`);
+
// route event to running instances
_.forOwn(workflowRuns, (workflowRun, workflowRunId) => {
let workflowId = workflowRun.getWorkflowId();
@@ -217,6 +236,7 @@
// (already finished tasks are not counted)
// 2) the task's key field and value
if(workflowRun.isEventAcceptable(workflow, topic, message)) {
+ //console.log(`event ${topic} is routed to workflow run ${workflowRunId}`);
logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
workflowRun.enqueueEvent(topic, message);
@@ -234,6 +254,8 @@
if(!workflowIdsRunning.includes(workflowId)) {
// we need to buffer the event until workflow run is brought up
let workflowRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
+ workflowRun.updateEventKeyFieldValueFromMessage(topic, message);
+
let workflowRunId = workflowRun.getId();
// register for management
@@ -250,6 +272,17 @@
});
};
+ const countQueuedEvents = (workflowRunId) => {
+ // returns workflowRun.lengthEventQueue() for the given run id, or null (after a warning log) when the id is not a key of workflowRuns
+ if(!(workflowRunId in workflowRuns)) {
+ logger.log('warn', `workflow run ${workflowRunId} does not exist`);
+ return null;
+ }
+
+ let workflowRun = workflowRuns[workflowRunId];
+ return workflowRun.lengthEventQueue();
+ };
+
const fetchEvent = (workflowRunId, taskId, topic) => {
// this returns an event or an empty obj when there is no message
if(!(workflowRunId in workflowRuns)) {
@@ -338,10 +371,17 @@
// manager protocol:
// REQ:
// topic: operation
- // message: <data>
+ // message: {
+ // req_id: <req_id>,
+ // <data>...
+ // }
// RES:
// topic: topic sent
- // message: {result: <true/false>, message: <error message> }F
+ // message: {
+ // req_id: <req_id>,
+ // result: <true/false>,
+ // message: <error message>
+ // }
allClients[clientId] = c;
workflowManagerClients[clientId] = c;
@@ -349,11 +389,23 @@
let router = ws_manager.getRouter();
_.forOwn(router, (routerElem, _key) => {
socket.on(routerElem.topic, (msg) => {
- routerElem.handler(routerElem.topic, msg, (err, result) => {
+ // handle a common parameter - req_id
+ // when we get req_id, return the same req_id in response.
+ // this is to help identify a request from a response at client-side
+ let req_id = 101010; // default req_id (a recognizable signature value) used when the request carries none
+ if(msg && checkObject(msg)) {
+ if('req_id' in msg) {
+ req_id = msg.req_id;
+ }
+ }
+
+ routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
if(err) {
logger.log('warn', `unable to handle a message - ${err}`);
socket.emit(routerElem.topic, {
- result: false,
+ req_id: req_id,
+ error: true,
+ result: result,
message: err
});
return;
@@ -361,6 +413,8 @@
if(routerElem.return === undefined || routerElem.return) {
socket.emit(routerElem.topic, {
+ req_id: req_id,
+ error: false,
result: result
});
}
@@ -492,6 +546,7 @@
clientType: Client.Type,
//setIO: setIO,
sendEvent: sendEvent,
+ countQueuedEvents: countQueuedEvents,
fetchEvent: fetchEvent,
addClient: addClient,
removeClient: removeClient,
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
index b3e8877..5c77ebd 100644
--- a/src/controllers/ws_manager.js
+++ b/src/controllers/ws_manager.js
@@ -37,9 +37,12 @@
// Message format:
// {
// topic: 'cord.workflow.ctlsvc.workflow.reg',
- // message: <workflow>
+ // message: {
+ // req_id: <req_id> // optional
+ // workflow: <workflow>
+ // }
// }
- const registWorkflow = (topic, message, cb) => {
+ const registerWorkflow = (topic, message, cb) => {
const distributor = require('./eventrouter.js/index.js');
let errorMessage;
@@ -51,7 +54,15 @@
return;
}
- let workflow = message;
+ if(!('workflow' in message)) {
+ // error
+ errorMessage = `field 'workflow' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflow = message.workflow;
logger.log('debug', `workflow - ${JSON.stringify(workflow)}`);
@@ -69,8 +80,11 @@
// WebSocket interface for workflow registration (via essence)
// Message format:
// {
- // topic: 'cord.workflow.ctlsvc.workflow.reg',
- // message: <workflow essence>
+ // topic: 'cord.workflow.ctlsvc.workflow.reg_essence',
+ // message: {
+ // req_id: <req_id> // optional
+ // essence: <workflow essence>
+ // }
// }
const registerWorkflowEssence = (topic, message, cb) => {
const eventrouter = require('./eventrouter.js');
@@ -83,7 +97,15 @@
return;
}
- let essence = message;
+ if(!('essence' in message)) {
+ // error
+ errorMessage = `field 'essence' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let essence = message.essence;
let result = true;
let errorResults = [];
@@ -112,9 +134,11 @@
// Message format:
// {
// topic: 'cord.workflow.ctlsvc.workflow.list',
- // message: null
+ // message: {
+ // req_id: <req_id> // optional
+ // }
// }
- const listWorkflows = (topic, message, cb) => {
+ const listWorkflows = (_topic, _message, cb) => {
const eventrouter = require('./eventrouter.js');
let result = eventrouter.listWorkflows();
@@ -126,9 +150,11 @@
// Message format:
// {
// topic: 'cord.workflow.ctlsvc.workflow.list',
- // message: null
+ // message: {
+ // req_id: <req_id> // optional
+ // }
// }
- const listWorkflowRuns = (topic, message, cb) => {
+ const listWorkflowRuns = (_topic, _message, cb) => {
const eventrouter = require('./eventrouter.js');
let result = eventrouter.listWorkflowRuns();
@@ -140,7 +166,10 @@
// Message format:
// {
// topic: 'cord.workflow.ctlsvc.workflow.check',
- // message: <workflow_id>
+ // message: {
+ // req_id: <req_id> // optional
+ // workflow_id: <workflow_id>
+ // }
// }
const checkWorkflow = (topic, message, cb) => {
const eventrouter = require('./eventrouter.js');
@@ -154,7 +183,15 @@
return;
}
- let workflowId = message;
+ if(!('workflow_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflowId = message.workflow_id;
let result = eventrouter.checkWorkflow(workflowId);
cb(null, result);
return;
@@ -165,8 +202,9 @@
// {
// topic: 'cord.workflow.ctlsvc.workflow.kickstart',
// message: {
- // workflow_id: <workflow_id>,
- // workflow_run_id: <workflow_run_id>
+ // req_id: <req_id> // optional
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>
// }
// }
const notifyWorkflowStart = (topic, message, cb) => {
@@ -210,12 +248,32 @@
// Message format:
// {
// topic: 'cord.workflow.ctlsvc.workflow.remove',
- // message: <workflow_id>
+ // message: {
+ // req_id: <req_id> // optional
+ // workflow_id: <workflow_id>
+ // }
// }
const removeWorkflow = (topic, message, cb) => {
const eventrouter = require('./eventrouter.js');
- let workflowId = message;
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflowId = message.workflow_id;
let result = eventrouter.removeWorkflow(workflowId);
cb(null, result);
return;
@@ -226,8 +284,9 @@
// {
// topic: 'cord.workflow.ctlsvc.workflow.run.remove',
// message: {
- // workflow_id: <workflow_id>,
- // workflow_run_id: <workflow_run_id>
+ // req_id: <req_id> // optional
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>
// }
// }
const removeWorkflowRun = (topic, message, cb) => {
@@ -267,9 +326,9 @@
const getRouter = () => {
return {
- registWorkflow: {
+ registerWorkflow: {
topic: serviceEvents.WORKFLOW_REG,
- handler: registWorkflow
+ handler: registerWorkflow
},
registerWorkflowEssence: {
topic: serviceEvents.WORKFLOW_REG_ESSENCE,
diff --git a/src/controllers/ws_workflowrun.js b/src/controllers/ws_workflowrun.js
index f7ee474..036b1ef 100644
--- a/src/controllers/ws_workflowrun.js
+++ b/src/controllers/ws_workflowrun.js
@@ -22,13 +22,14 @@
let serviceEvents = {
WORKFLOW_RUN_UPDATE_STATUS: 'cord.workflow.ctlsvc.workflow.run.status',
+ WORKFLOW_RUN_COUNT_EVENTS: 'cord.workflow.ctlsvc.workflow.run.count',
WORKFLOW_RUN_FETCH_EVENT: 'cord.workflow.ctlsvc.workflow.run.fetch'
};
// WebSocket interface for workflow status update
// Message format:
// {
- // topic: 'cord.workflow.ctlsvc.workflowrun.status',
+ // topic: 'cord.workflow.ctlsvc.workflow.run.status',
// message: {
// workflow_id: <workflow_id>,
// workflow_run_id: <workflow_run_id>,
@@ -90,10 +91,52 @@
return;
};
- // WebSocket interface for workflow status update
+ // WebSocket interface for counting queued events
// Message format:
// {
- // topic: 'cord.workflow.ctlsvc.workflowrun.fetch',
+ // topic: 'cord.workflow.ctlsvc.workflow.run.count',
+ // message: {
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>
+ // }
+ // }
+ const countQueuedEvents = (topic, message, cb) => { // calls cb(errorMessage, false) on a validation failure, cb(null, count) on success
+ const eventrouter = require('./eventrouter.js');
+
+ let errorMessage;
+ if(!message) {
+ // reject: no message body was supplied
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_id' in message)) {
+ // reject: required field missing; stringify the body so the log shows its content instead of '[object Object]'
+ errorMessage = `workflow_id field is not in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_run_id' in message)) {
+ // reject: required field missing; stringify the body so the log shows its content instead of '[object Object]'
+ errorMessage = `workflow_run_id field is not in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let count = eventrouter.countQueuedEvents(message.workflow_run_id);
+ cb(null, count);
+ return;
+ };
+
+ // WebSocket interface for fetching an event
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.run.fetch',
// message: {
// workflow_id: <workflow_id>,
// workflow_run_id: <workflow_run_id>,
@@ -169,6 +212,10 @@
topic: serviceEvents.WORKFLOW_RUN_UPDATE_STATUS,
handler: updateWorkflowRunStatus
},
+ countQueuedEvents: {
+ topic: serviceEvents.WORKFLOW_RUN_COUNT_EVENTS,
+ handler: countQueuedEvents
+ },
fetchEvent: {
topic: serviceEvents.WORKFLOW_RUN_FETCH_EVENT,
handler: fetchEvent