Add message routing testcases and related bugfixes
- Handle XOS sensor tasks specially to route events correctly
- Add a 'req_id' optional field to manager request API for client-side req-res mapping
- Fix several bugs related to message routing
- Rename event names for consistency
- Separate kickstart call-back event from kickstart request
- Shorten the socket.io ping/pong timeout for faster response
- Add a 'dag_id' field to tasks in essences
- Notify workflow run clients of event arrivals so they can fetch events as soon as possible
- Small code refinements
Change-Id: Ibc4182027eb5e2854f1603e339fffbe76e9ba621
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
index 9a54181..975db2e 100644
--- a/src/controllers/eventrouter.js
+++ b/src/controllers/eventrouter.js
@@ -94,6 +94,15 @@
return true;
};
+ const getWorkflow = (workflowId) => { // look up a registered workflow by id; returns null when the id is unknown
+ if(!(workflowId in workflows)) { // a miss is an id that is NOT registered (the test was inverted before)
+ logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
+ return null;
+ }
+
+ return workflows[workflowId];
+ };
+
const clearWorkflows = () => {
_.forOwn(workflows, (_workflow, workflowId) => {
delete workflows[workflowId];
@@ -133,6 +142,15 @@
return true;
};
+ const getWorkflowRun = (workflowRunId) => { // look up a registered workflow run by id; returns null when the id is unknown
+ if(!(workflowRunId in workflowRuns)) { // a miss is an id that is NOT registered (the test was inverted before)
+ logger.log('warn', `cannot find a workflow run with id - ${workflowRunId}`);
+ return null;
+ }
+
+ return workflowRuns[workflowRunId];
+ };
+
const clearWorkflowRuns = () => {
_.forOwn(workflowRuns, (_workflowRun, workflowRunId) => {
delete workflowRuns[workflowRunId];
@@ -379,6 +397,7 @@
// topic: topic sent
// message: {
// req_id: <req_id>,
+ // error: <true/false>,
// result: <true/false>,
// message: <error message>
// }
@@ -411,6 +430,7 @@
return;
}
+ // we return result
if(routerElem.return === undefined || routerElem.return) {
socket.emit(routerElem.topic, {
req_id: req_id,
@@ -428,10 +448,18 @@
// workflow run protocol:
// REQ:
// topic: operation
- // message: <data>
+ // message: {
+ // req_id: <req_id>,
+ // <data>...
+ // }
// RES:
// topic: topic sent
- // message: {result: <true/false>, message: <error message> }
+ // message: {
+ // req_id: <req_id>,
+ // error: <true/false>,
+ // result: <true/false>,
+ // message: <error message>
+ // }
// map to WorkflowRun instance
let workflowId = c.getWorkflowId();
@@ -463,18 +491,33 @@
let router = ws_workflowrun.getRouter();
_.forOwn(router, (routerElem, _key) => {
socket.on(routerElem.topic, (msg) => {
- routerElem.handler(routerElem.topic, msg, (err, result) => {
+ // handle a common parameter - req_id
+ // when we get req_id, return the same req_id in response.
+ // this is to help identify a request from a response at client-side
+ let req_id = 101010; // default number, signature
+ if(msg && checkObject(msg)) {
+ if('req_id' in msg) {
+ req_id = msg.req_id;
+ }
+ }
+
+ routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
if(err) {
logger.log('warn', `unable to handle a message - ${err}`);
socket.emit(routerElem.topic, {
+ req_id: req_id,
+ error: true,
result: false,
message: err
});
return;
}
+ // we return result
if(routerElem.return === undefined || routerElem.return) {
socket.emit(routerElem.topic, {
+ req_id: req_id,
+ error: false,
result: result
});
}
@@ -552,11 +595,13 @@
removeClient: removeClient,
removeClients: removeClients,
addWorkflow: addWorkflow,
+ getWorkflow: getWorkflow,
listWorkflows: listWorkflows,
checkWorkflow: checkWorkflow,
removeWorkflow: removeWorkflow,
clearWorkflows: clearWorkflows,
addWorkflowRun: addWorkflowRun,
+ getWorkflowRun: getWorkflowRun,
listWorkflowRuns: listWorkflowRuns,
checkWorkflowRun: checkWorkflowRun,
removeWorkflowRun: removeWorkflowRun,
@@ -564,4 +609,4 @@
updateWorkflowRunStatus: updateWorkflowRunStatus,
setWorkflowRunKickstarted: setWorkflowRunKickstarted,
};
-})();
\ No newline at end of file
+})();