Add message routing testcases and related bugfixes
- Handle XOS sensor tasks specially to route events correctly
- Add message counting API for test
- Add req_id optional field to manager request API for client-side req-res mapping
- Fix several bugs related to message routing
Change-Id: Ie18cbc63926b352bd7655797655194ece9506c6b
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
index 82d27db..9a54181 100644
--- a/src/controllers/eventrouter.js
+++ b/src/controllers/eventrouter.js
@@ -58,6 +58,10 @@
// io = ioInstance;
//};
+ const checkObject = (obj) => { // true only when obj's built-in type tag is '[object Object]'
+ return Object.prototype.toString.call(obj) === '[object Object]'; // arrays, null and strings produce other tags
+ };
+
const destroy = () => {
removeClients();
clearWorkflowRuns();
@@ -178,12 +182,25 @@
}
// check if there are workflow runs
- _.forOwn(workflowRuns, (workflowRun, _workflowRunId) => {
+ for(let key in workflowRuns) {
+ if (!workflowRuns.hasOwnProperty(key)) {
+ continue;
+ }
+
+ let workflowRun = workflowRuns[key];
if(workflowRun.getWorkflowId() === workflowId) {
logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
return false;
}
- });
+ }
+
+ // we don't use the code below because 'return' inside the _.forOwn callback cannot return a value from the enclosing function
+ // _.forOwn(workflowRuns, (workflowRun, _workflowRunId) => {
+ // if(workflowRun.getWorkflowId() === workflowId) {
+ // logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
+ // return false;
+ // }
+ // });
delete workflows[workflowId];
return true;
@@ -207,6 +224,8 @@
// to check if there are workflow runs for the events
let workflowIdsRunning = [];
+ logger.log('debug', `event is raised : topic ${topic}, message ${JSON.stringify(message)}`);
+
// route event to running instances
_.forOwn(workflowRuns, (workflowRun, workflowRunId) => {
let workflowId = workflowRun.getWorkflowId();
@@ -217,6 +236,7 @@
// (already finished tasks are not counted)
// 2) the task's key field and value
if(workflowRun.isEventAcceptable(workflow, topic, message)) {
+ //console.log(`event ${topic} is routed to workflow run ${workflowRunId}`);
logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
workflowRun.enqueueEvent(topic, message);
@@ -234,6 +254,8 @@
if(!workflowIdsRunning.includes(workflowId)) {
// we need to buffer the event until workflow run is brought up
let workflowRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
+ workflowRun.updateEventKeyFieldValueFromMessage(topic, message);
+
let workflowRunId = workflowRun.getId();
// register for management
@@ -250,6 +272,17 @@
});
};
+ const countQueuedEvents = (workflowRunId) => {
+ // returns the result of lengthEventQueue() for the given workflow run, or null when the run id is unknown
+ if(!(workflowRunId in workflowRuns)) {
+ logger.log('warn', `workflow run ${workflowRunId} does not exist`);
+ return null;
+ }
+
+ let workflowRun = workflowRuns[workflowRunId];
+ return workflowRun.lengthEventQueue();
+ };
+
const fetchEvent = (workflowRunId, taskId, topic) => {
// this returns an event or an empty obj when there is no message
if(!(workflowRunId in workflowRuns)) {
@@ -338,10 +371,17 @@
// manager protocol:
// REQ:
// topic: operation
- // message: <data>
+ // message: {
+ // req_id: <req_id>,
+ // <data>...
+ // }
// RES:
// topic: topic sent
- // message: {result: <true/false>, message: <error message> }F
+ // message: {
+ // req_id: <req_id>,
+ // result: <true/false>,
+ // message: <error message>
+ // }
allClients[clientId] = c;
workflowManagerClients[clientId] = c;
@@ -349,11 +389,23 @@
let router = ws_manager.getRouter();
_.forOwn(router, (routerElem, _key) => {
socket.on(routerElem.topic, (msg) => {
- routerElem.handler(routerElem.topic, msg, (err, result) => {
+ // handle a common parameter - req_id
+ // when we get req_id, return the same req_id in response.
+ // this is to help identify a request from a response at client-side
+ let req_id = 101010; // default number, used as a signature when the request carries no req_id
+ if(msg && checkObject(msg)) {
+ if('req_id' in msg) {
+ req_id = msg.req_id;
+ }
+ }
+
+ routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
if(err) {
logger.log('warn', `unable to handle a message - ${err}`);
socket.emit(routerElem.topic, {
- result: false,
+ req_id: req_id,
+ error: true,
+ result: result,
message: err
});
return;
@@ -361,6 +413,8 @@
if(routerElem.return === undefined || routerElem.return) {
socket.emit(routerElem.topic, {
+ req_id: req_id,
+ error: false,
result: result
});
}
@@ -492,6 +546,7 @@
clientType: Client.Type,
//setIO: setIO,
sendEvent: sendEvent,
+ countQueuedEvents: countQueuedEvents,
fetchEvent: fetchEvent,
addClient: addClient,
removeClient: removeClient,
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
index b3e8877..5c77ebd 100644
--- a/src/controllers/ws_manager.js
+++ b/src/controllers/ws_manager.js
@@ -37,9 +37,12 @@
// Message format:
// {
// topic: 'cord.workflow.ctlsvc.workflow.reg',
- // message: <workflow>
+ // message: {
+ // req_id: <req_id> // optional
+ // workflow: <workflow>
+ // }
// }
- const registWorkflow = (topic, message, cb) => {
+ const registerWorkflow = (topic, message, cb) => {
const distributor = require('./eventrouter.js/index.js');
let errorMessage;
@@ -51,7 +54,15 @@
return;
}
- let workflow = message;
+ if(!('workflow' in message)) {
+ // error
+ errorMessage = `field 'workflow' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflow = message.workflow;
logger.log('debug', `workflow - ${JSON.stringify(workflow)}`);
@@ -69,8 +80,11 @@
// WebSocket interface for workflow registration (via essence)
// Message format:
// {
- // topic: 'cord.workflow.ctlsvc.workflow.reg',
- // message: <workflow essence>
+ // topic: 'cord.workflow.ctlsvc.workflow.reg_essence',
+ // message: {
+ // req_id: <req_id> // optional
+ // essence: <workflow essence>
+ // }
// }
const registerWorkflowEssence = (topic, message, cb) => {
const eventrouter = require('./eventrouter.js');
@@ -83,7 +97,15 @@
return;
}
- let essence = message;
+ if(!('essence' in message)) {
+ // error
+ errorMessage = `field 'essence' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let essence = message.essence;
let result = true;
let errorResults = [];
@@ -112,9 +134,11 @@
// Message format:
// {
// topic: 'cord.workflow.ctlsvc.workflow.list',
- // message: null
+ // message: {
+ // req_id: <req_id> // optional
+ // }
// }
- const listWorkflows = (topic, message, cb) => {
+ const listWorkflows = (_topic, _message, cb) => {
const eventrouter = require('./eventrouter.js');
let result = eventrouter.listWorkflows();
@@ -126,9 +150,11 @@
// Message format:
// {
// topic: 'cord.workflow.ctlsvc.workflow.list',
- // message: null
+ // message: {
+ // req_id: <req_id> // optional
+ // }
// }
- const listWorkflowRuns = (topic, message, cb) => {
+ const listWorkflowRuns = (_topic, _message, cb) => {
const eventrouter = require('./eventrouter.js');
let result = eventrouter.listWorkflowRuns();
@@ -140,7 +166,10 @@
// Message format:
// {
// topic: 'cord.workflow.ctlsvc.workflow.check',
- // message: <workflow_id>
+ // message: {
+ // req_id: <req_id> // optional
+ // workflow_id: <workflow_id>
+ // }
// }
const checkWorkflow = (topic, message, cb) => {
const eventrouter = require('./eventrouter.js');
@@ -154,7 +183,15 @@
return;
}
- let workflowId = message;
+ if(!('workflow_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflowId = message.workflow_id;
let result = eventrouter.checkWorkflow(workflowId);
cb(null, result);
return;
@@ -165,8 +202,9 @@
// {
// topic: 'cord.workflow.ctlsvc.workflow.kickstart',
// message: {
- // workflow_id: <workflow_id>,
- // workflow_run_id: <workflow_run_id>
+ // req_id: <req_id> // optional
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>
// }
// }
const notifyWorkflowStart = (topic, message, cb) => {
@@ -210,12 +248,32 @@
// Message format:
// {
// topic: 'cord.workflow.ctlsvc.workflow.remove',
- // message: <workflow_id>
+ // message: {
+ // req_id: <req_id> // optional
+ // workflow_id: <workflow_id>
+ // }
// }
const removeWorkflow = (topic, message, cb) => {
const eventrouter = require('./eventrouter.js');
- let workflowId = message;
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflowId = message.workflow_id;
let result = eventrouter.removeWorkflow(workflowId);
cb(null, result);
return;
@@ -226,8 +284,9 @@
// {
// topic: 'cord.workflow.ctlsvc.workflow.run.remove',
// message: {
- // workflow_id: <workflow_id>,
- // workflow_run_id: <workflow_run_id>
+ // req_id: <req_id> // optional
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>
// }
// }
const removeWorkflowRun = (topic, message, cb) => {
@@ -267,9 +326,9 @@
const getRouter = () => {
return {
- registWorkflow: {
+ registerWorkflow: {
topic: serviceEvents.WORKFLOW_REG,
- handler: registWorkflow
+ handler: registerWorkflow
},
registerWorkflowEssence: {
topic: serviceEvents.WORKFLOW_REG_ESSENCE,
diff --git a/src/controllers/ws_workflowrun.js b/src/controllers/ws_workflowrun.js
index f7ee474..036b1ef 100644
--- a/src/controllers/ws_workflowrun.js
+++ b/src/controllers/ws_workflowrun.js
@@ -22,13 +22,14 @@
let serviceEvents = {
WORKFLOW_RUN_UPDATE_STATUS: 'cord.workflow.ctlsvc.workflow.run.status',
+ WORKFLOW_RUN_COUNT_EVENTS: 'cord.workflow.ctlsvc.workflow.run.count',
WORKFLOW_RUN_FETCH_EVENT: 'cord.workflow.ctlsvc.workflow.run.fetch'
};
// WebSocket interface for workflow status update
// Message format:
// {
- // topic: 'cord.workflow.ctlsvc.workflowrun.status',
+ // topic: 'cord.workflow.ctlsvc.workflow.run.status',
// message: {
// workflow_id: <workflow_id>,
// workflow_run_id: <workflow_run_id>,
@@ -90,10 +91,52 @@
return;
};
- // WebSocket interface for workflow status update
+ // WebSocket interface for counting queued events
// Message format:
// {
- // topic: 'cord.workflow.ctlsvc.workflowrun.fetch',
+ // topic: 'cord.workflow.ctlsvc.workflow.run.count',
+ // message: {
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>
+ // }
+ // }
+ const countQueuedEvents = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_id' in message)) {
+ // error
+ errorMessage = `workflow_id field is not in message body - ${message}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_run_id' in message)) {
+ // error
+ errorMessage = `workflow_run_id field is not in message body - ${message}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let count = eventrouter.countQueuedEvents(message.workflow_run_id);
+ cb(null, count);
+ return;
+ };
+
+ // WebSocket interface for fetching an event
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.run.fetch',
// message: {
// workflow_id: <workflow_id>,
// workflow_run_id: <workflow_run_id>,
@@ -169,6 +212,10 @@
topic: serviceEvents.WORKFLOW_RUN_UPDATE_STATUS,
handler: updateWorkflowRunStatus
},
+ countQueuedEvents: {
+ topic: serviceEvents.WORKFLOW_RUN_COUNT_EVENTS,
+ handler: countQueuedEvents
+ },
fetchEvent: {
topic: serviceEvents.WORKFLOW_RUN_FETCH_EVENT,
handler: fetchEvent
diff --git a/src/types/workflowrun.js b/src/types/workflowrun.js
index 65027ff..4b0b6b0 100644
--- a/src/types/workflowrun.js
+++ b/src/types/workflowrun.js
@@ -79,7 +79,9 @@
workflowRun.addRunTask(runTask);
// set key_field / value
- workflowRun.setEventKeyFieldValue(task.getTopic(), task.getKeyField(), null); // init
+ if(task.isCORDTask()) {
+ workflowRun.setEventKeyFieldValue(task.getTopic(), task.getKeyField(), null); // init
+ }
});
return workflowRun;
}
@@ -152,22 +154,21 @@
return true;
}
- isEventKeyFieldValueAcceptable(topic, field, value) {
+ updateEventKeyFieldValueFromMessage(topic, message) {
if(!(topic in this.eventKeyFieldValues)) {
- // topic does not exist
+ logger.log('warn', `cannot find a topic ${topic} in event key field values`);
return false;
}
let keyFieldValues = this.eventKeyFieldValues[topic];
- let index = _.findIndex(keyFieldValues, (keyFieldValue) => {
- return (keyFieldValue.field === field) &&
- ((!keyFieldValue.value) || (keyFieldValue.value === value));
+ keyFieldValues.forEach((keyFieldValue) => {
+ if(keyFieldValue.field in message) {
+ // has same field in the message
+ // set value
+ keyFieldValue['value'] = message[keyFieldValue.field];
+ }
});
-
- if(index >= 0) {
- return true;
- }
- return false;
+ return true;
}
isEventAcceptableByKeyFieldValue(topic, message) {
@@ -176,17 +177,43 @@
return false;
}
- let keyFieldValues = this.eventKeyFieldValues[topic];
- keyFieldValues.forEach((keyFieldValue) => {
- if(keyFieldValue.field in message) {
- // has same field in the message
- // check value
- if(keyFieldValue.value === message[keyFieldValue.field]) {
- // has the same value
- return true;
+ // check all key-field values
+ for(let key in this.eventKeyFieldValues) {
+ if (!this.eventKeyFieldValues.hasOwnProperty(key)) {
+ continue;
+ }
+
+ let keyFieldValues = this.eventKeyFieldValues[key];
+ let arrayLength = keyFieldValues.length;
+ for (var i = 0; i < arrayLength; i++) {
+ let keyFieldValue = keyFieldValues[i];
+
+ if(keyFieldValue.field in message) {
+ // has same field in the message
+ // check value
+ if(keyFieldValue.value === message[keyFieldValue.field]) {
+ // has the same value
+ return true;
+ }
}
}
- });
+ }
+
+ // We cannot break out of the loops below once we have the result,
+ // because return/break do not work inside handler functions
+ // _.forOwn(this.eventKeyFieldValues, (keyFieldValues, _topic) => {
+ // keyFieldValues.forEach((keyFieldValue) => {
+ // if(keyFieldValue.field in message) {
+ // // has same field in the message
+ // // check value
+ // if(keyFieldValue.value === message[keyFieldValue.field]) {
+ // // has the same value
+ // result = true;
+ // }
+ // }
+ // });
+ // });
+
return false;
}
@@ -295,8 +322,11 @@
// 2) the task's key field and value
if(this.isTopicAcceptable(workflow, topic) &&
this.isEventAcceptableByKeyFieldValue(topic, message)) {
+ // update key-field values for my topic
+ this.updateEventKeyFieldValueFromMessage(topic, message);
return true;
}
+
return false;
}
diff --git a/src/types/workflowtask.js b/src/types/workflowtask.js
index efde42b..8e18700 100644
--- a/src/types/workflowtask.js
+++ b/src/types/workflowtask.js
@@ -20,9 +20,15 @@
const logger = require('../config/logger.js');
+ const CORD_SENSORS = [
+ 'XOSEventSensor', 'XOSModelSensor',
+ 'CORDEventSensor', 'CORDModelSensor'
+ ];
+
class WorkflowTask {
constructor(id, kickstart=false) {
this.id = id;
+ this.sensorClass = null;
this.topic = null;
this.kickstart = kickstart;
this.keyField = null;
@@ -40,20 +46,22 @@
return null;
}
+ if('class' in essence) {
+ workflowTask.setSensorClass(essence.class);
+ }
+
if('topic' in essence) {
workflowTask.setTopic(essence.topic);
}
if('model_name' in essence) {
- workflowTask.setTopic('datamodel.' + essence.model_name + '.create');
- workflowTask.setTopic('datamodel.' + essence.model_name + '.update');
- workflowTask.setTopic('datamodel.' + essence.model_name + '.delete');
+ workflowTask.setTopic('datamodel.' + essence.model_name);
}
if('key_field' in essence) {
workflowTask.setKeyField(essence.key_field);
}
-
+
workflowTask.setEssence(essence);
return workflowTask;
}
@@ -70,6 +78,23 @@
return this.id;
}
+ setSensorClass(sensorClass) { // records the sensor class name; isCORDTask() compares it against CORD_SENSORS
+ this.sensorClass = sensorClass;
+ }
+
+ getSensorClass() { // value last stored by setSensorClass(); the constructor initializes it to null
+ return this.sensorClass;
+ }
+
+ isCORDTask() { // true when a sensor class is set and it is listed in CORD_SENSORS
+ if(this.sensorClass) { // a null or empty sensor class is never a CORD task
+ if(CORD_SENSORS.includes(this.sensorClass)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
setTopic(topic) {
this.topic = topic;
}
@@ -89,7 +114,7 @@
setKeyField(keyField) {
this.keyField = keyField;
}
-
+
getKeyField() {
return this.keyField;
}
@@ -109,21 +134,21 @@
}
// general Airflow operators other than XOS operators don't have these fields.
- //
+ //
// if(!this.topic) {
// logger.log('error', 'topic is not given');
// return false;
// }
-
+
// if(!this.keyField) {
// logger.log('error', 'keyField is not given');
// return false;
// }
-
+
return true;
}
}
-
+
module.exports = {
WorkflowTask: WorkflowTask
};