Add message routing testcases and related bugfixes
- Handle XOS sensor tasks specially to route events correctly
- Add message counting API for test
- Add req_id optional field to manager request API for client-side req-res mapping
- Fix several bugs related to message routing
Change-Id: Ie18cbc63926b352bd7655797655194ece9506c6b
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
index 82d27db..9a54181 100644
--- a/src/controllers/eventrouter.js
+++ b/src/controllers/eventrouter.js
@@ -58,6 +58,10 @@
// io = ioInstance;
//};
+ const checkObject = (obj) => {
+ return Object.prototype.toString.call(obj) === '[object Object]';
+ };
+
const destroy = () => {
removeClients();
clearWorkflowRuns();
@@ -178,12 +182,25 @@
}
// check if there are workflow runs
- _.forOwn(workflowRuns, (workflowRun, _workflowRunId) => {
+ for(let key in workflowRuns) {
+ if (!workflowRuns.hasOwnProperty(key)) {
+ continue;
+ }
+
+ let workflowRun = workflowRuns[key];
if(workflowRun.getWorkflowId() === workflowId) {
logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
return false;
}
- });
+ }
+
+ // we don't use below code becuase it cannot properly stop and return value with 'return'
+ // _.forOwn(workflowRuns, (workflowRun, _workflowRunId) => {
+ // if(workflowRun.getWorkflowId() === workflowId) {
+ // logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
+ // return false;
+ // }
+ // });
delete workflows[workflowId];
return true;
@@ -207,6 +224,8 @@
// to check if there are workflow runs for the events
let workflowIdsRunning = [];
+ logger.log('debug', `event is raised : topic ${topic}, message ${JSON.stringify(message)}`);
+
// route event to running instances
_.forOwn(workflowRuns, (workflowRun, workflowRunId) => {
let workflowId = workflowRun.getWorkflowId();
@@ -217,6 +236,7 @@
// (already finished tasks are not counted)
// 2) the task's key field and value
if(workflowRun.isEventAcceptable(workflow, topic, message)) {
+ //console.log(`event ${topic} is routed to workflow run ${workflowRunId}`);
logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
workflowRun.enqueueEvent(topic, message);
@@ -234,6 +254,8 @@
if(!workflowIdsRunning.includes(workflowId)) {
// we need to buffer the event until workflow run is brought up
let workflowRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
+ workflowRun.updateEventKeyFieldValueFromMessage(topic, message);
+
let workflowRunId = workflowRun.getId();
// register for management
@@ -250,6 +272,17 @@
});
};
+ const countQueuedEvents = (workflowRunId) => {
+ // this counts queued events
+ if(!(workflowRunId in workflowRuns)) {
+ logger.log('warn', `workflow run ${workflowRunId} does not exist`);
+ return null;
+ }
+
+ let workflowRun = workflowRuns[workflowRunId];
+ return workflowRun.lengthEventQueue();
+ };
+
const fetchEvent = (workflowRunId, taskId, topic) => {
// this returns an event or an empty obj when there is no message
if(!(workflowRunId in workflowRuns)) {
@@ -338,10 +371,17 @@
// manager protocol:
// REQ:
// topic: operation
- // message: <data>
+ // message: {
+ // req_id: <req_id>,
+ // <data>...
+ // }
// RES:
// topic: topic sent
- // message: {result: <true/false>, message: <error message> }F
+ // message: {
+ // req_id: <req_id>,
+ // result: <true/false>,
+ // message: <error message>
+ // }
allClients[clientId] = c;
workflowManagerClients[clientId] = c;
@@ -349,11 +389,23 @@
let router = ws_manager.getRouter();
_.forOwn(router, (routerElem, _key) => {
socket.on(routerElem.topic, (msg) => {
- routerElem.handler(routerElem.topic, msg, (err, result) => {
+ // handle a common parameter - req_id
+ // when we get req_id, return the same req_id in response.
+ // this is to help identify a request from a response at client-side
+ let req_id = 101010; // default number, signiture
+ if(msg && checkObject(msg)) {
+ if('req_id' in msg) {
+ req_id = msg.req_id;
+ }
+ }
+
+ routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
if(err) {
logger.log('warn', `unable to handle a message - ${err}`);
socket.emit(routerElem.topic, {
- result: false,
+ req_id: req_id,
+ error: true,
+ result: result,
message: err
});
return;
@@ -361,6 +413,8 @@
if(routerElem.return === undefined || routerElem.return) {
socket.emit(routerElem.topic, {
+ req_id: req_id,
+ error: false,
result: result
});
}
@@ -492,6 +546,7 @@
clientType: Client.Type,
//setIO: setIO,
sendEvent: sendEvent,
+ countQueuedEvents: countQueuedEvents,
fetchEvent: fetchEvent,
addClient: addClient,
removeClient: removeClient,