Add message routing testcases and related bugfixes
- Handle XOS sensor tasks specially to route events correctly
- Add a 'req_id' optional field to manager request API for client-side req-res mapping
- Fix several bugs related to message routing
- Rename event names for consistency
- Separate kickstart call-back event from kickstart request
- Shorten ping/pong timeout for socket.io for fast response
- Add a 'dag_id' field to tasks in essences
- Notify event arrivals to workflow run clients to let them get events as soon as possible
- Small code refinements
Change-Id: Ibc4182027eb5e2854f1603e339fffbe76e9ba621
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
index 9a54181..975db2e 100644
--- a/src/controllers/eventrouter.js
+++ b/src/controllers/eventrouter.js
@@ -94,6 +94,15 @@
return true;
};
+ // Look up a registered workflow by its id.
+ // Returns the entry stored in `workflows` under workflowId, or null
+ // (after logging a warning) when no entry exists for that id.
+ const getWorkflow = (workflowId) => {
+     // the warning/null path is for the *missing* key only
+     if(!(workflowId in workflows)) {
+         logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
+         return null;
+     }
+
+     return workflows[workflowId];
+ };
+
const clearWorkflows = () => {
_.forOwn(workflows, (_workflow, workflowId) => {
delete workflows[workflowId];
@@ -133,6 +142,15 @@
return true;
};
+ // Look up a registered workflow run by its id.
+ // Returns the entry stored in `workflowRuns` under workflowRunId, or null
+ // (after logging a warning) when no entry exists for that id.
+ const getWorkflowRun = (workflowRunId) => {
+     // the warning/null path is for the *missing* key only
+     if(!(workflowRunId in workflowRuns)) {
+         logger.log('warn', `cannot find a workflow run with id - ${workflowRunId}`);
+         return null;
+     }
+
+     return workflowRuns[workflowRunId];
+ };
+
const clearWorkflowRuns = () => {
_.forOwn(workflowRuns, (_workflowRun, workflowRunId) => {
delete workflowRuns[workflowRunId];
@@ -379,6 +397,7 @@
// topic: topic sent
// message: {
// req_id: <req_id>,
+ // error: <true/false>,
// result: <true/false>,
// message: <error message>
// }
@@ -411,6 +430,7 @@
return;
}
+ // send the result back unless the route sets 'return' to false
if(routerElem.return === undefined || routerElem.return) {
socket.emit(routerElem.topic, {
req_id: req_id,
@@ -428,10 +448,18 @@
// workflow run protocol:
// REQ:
// topic: operation
- // message: <data>
+ // message: {
+ // req_id: <req_id>,
+ // <data>...
+ // }
// RES:
// topic: topic sent
- // message: {result: <true/false>, message: <error message> }
+ // message: {
+ // req_id: <req_id>,
+ // error: <true/false>,
+ // result: <true/false>,
+ // message: <error message>
+ // }
// map to WorkflowRun instance
let workflowId = c.getWorkflowId();
@@ -463,18 +491,33 @@
let router = ws_workflowrun.getRouter();
_.forOwn(router, (routerElem, _key) => {
socket.on(routerElem.topic, (msg) => {
- routerElem.handler(routerElem.topic, msg, (err, result) => {
+ // handle a common parameter - req_id
+ // when we get req_id, return the same req_id in response.
+ // this is to help identify a request from a response at client-side
+ let req_id = 101010; // default number, signature
+ if(msg && checkObject(msg)) {
+ if('req_id' in msg) {
+ req_id = msg.req_id;
+ }
+ }
+
+ routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
if(err) {
logger.log('warn', `unable to handle a message - ${err}`);
socket.emit(routerElem.topic, {
+ req_id: req_id,
+ error: true,
result: false,
message: err
});
return;
}
+ // send the result back unless the route sets 'return' to false
if(routerElem.return === undefined || routerElem.return) {
socket.emit(routerElem.topic, {
+ req_id: req_id,
+ error: false,
result: result
});
}
@@ -552,11 +595,13 @@
removeClient: removeClient,
removeClients: removeClients,
addWorkflow: addWorkflow,
+ getWorkflow: getWorkflow,
listWorkflows: listWorkflows,
checkWorkflow: checkWorkflow,
removeWorkflow: removeWorkflow,
clearWorkflows: clearWorkflows,
addWorkflowRun: addWorkflowRun,
+ getWorkflowRun: getWorkflowRun,
listWorkflowRuns: listWorkflowRuns,
checkWorkflowRun: checkWorkflowRun,
removeWorkflowRun: removeWorkflowRun,
@@ -564,4 +609,4 @@
updateWorkflowRunStatus: updateWorkflowRunStatus,
setWorkflowRunKickstarted: setWorkflowRunKickstarted,
};
-})();
\ No newline at end of file
+})();
diff --git a/src/controllers/websocket.js b/src/controllers/websocket.js
index 3f3216a..b5eaf13 100644
--- a/src/controllers/websocket.js
+++ b/src/controllers/websocket.js
@@ -26,7 +26,10 @@
let io;
const createSocketIO = (server) => {
// INSTANTIATE SOCKET.IO
- io = socketio.listen(server);
+ io = socketio.listen(server, {
+ pingInterval: 500,
+ pingTimeout: 2000,
+ });
io.use(ioWildcard());
// set io to eventrouter
@@ -89,4 +92,4 @@
// const socket = socketIo.get();
// socket.emit('eventName', data);
-})();
\ No newline at end of file
+})();
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
index 5c77ebd..3d81bac 100644
--- a/src/controllers/ws_manager.js
+++ b/src/controllers/ws_manager.js
@@ -23,14 +23,17 @@
const logger = require('../config/logger.js');
let serviceEvents = {
- WORKFLOW_REG: 'cord.workflow.ctlsvc.workflow.reg',
- WORKFLOW_REG_ESSENCE: 'cord.workflow.ctlsvc.workflow.reg_essence',
+ // manager -> controller -> manager
+ WORKFLOW_REGISTER: 'cord.workflow.ctlsvc.workflow.register',
+ WORKFLOW_REGISTER_ESSENCE: 'cord.workflow.ctlsvc.workflow.register_essence',
WORKFLOW_LIST: 'cord.workflow.ctlsvc.workflow.list',
- WORKFLOW_RUN_LIST: 'cord.workflow.ctlsvc.workflow.run.list',
+ WORKFLOW_LIST_RUN: 'cord.workflow.ctlsvc.workflow.run.list',
WORKFLOW_CHECK: 'cord.workflow.ctlsvc.workflow.check',
- WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart',
WORKFLOW_REMOVE: 'cord.workflow.ctlsvc.workflow.remove',
- WORKFLOW_RUN_REMOVE: 'cord.workflow.ctlsvc.workflow.run.remove'
+ WORKFLOW_REMOVE_RUN: 'cord.workflow.ctlsvc.workflow.run.remove',
+ WORKFLOW_NOTIFY_NEW_RUN: 'cord.workflow.ctlsvc.workflow.notify_new_run',
+ // controller -> manager
+ WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart'
};
// WebSocket interface for workflow registration
@@ -38,12 +41,12 @@
// {
// topic: 'cord.workflow.ctlsvc.workflow.reg',
// message: {
- // req_id: <req_id> // optional
+ // req_id: <req_id>, // optional
// workflow: <workflow>
// }
// }
const registerWorkflow = (topic, message, cb) => {
- const distributor = require('./eventrouter.js/index.js');
+ const eventrouter = require('./eventrouter.js');
let errorMessage;
if(!message) {
@@ -66,7 +69,7 @@
logger.log('debug', `workflow - ${JSON.stringify(workflow)}`);
- let result = distributor.addWorkflow(workflow);
+ let result = eventrouter.addWorkflow(workflow);
if(!result) {
errorMessage = `failed to register a workflow ${workflow.getId()}`;
cb(errorMessage, false);
@@ -197,53 +200,6 @@
return;
};
- // WebSocket interface for workflow start notification
- // Message format:
- // {
- // topic: 'cord.workflow.ctlsvc.workflow.kickstart',
- // message: {
- // req_id: <req_id> // optional
- // workflow_id: <workflow_id>,
- // workflow_run_id: <workflow_run_id>
- // }
- // }
- const notifyWorkflowStart = (topic, message, cb) => {
- const eventrouter = require('./eventrouter.js');
-
- let errorMessage;
- if(!message) {
- // error
- errorMessage = `Message body for topic ${topic} is null or empty`;
- logger.log('warn', `Return error - ${errorMessage}`);
- cb(errorMessage, false);
- return;
- }
-
- if(!('workflow_id' in message)) {
- // error
- errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
- logger.log('warn', `Return error - ${errorMessage}`);
- cb(errorMessage, false);
- return;
- }
-
- if(!('workflow_run_id' in message)) {
- // error
- errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
- logger.log('warn', `Return error - ${errorMessage}`);
- cb(errorMessage, false);
- return;
- }
-
- let workflowRunId = message.workflow_run_id;
-
- // there must be a workflow matching
- // set the workflow kickstarted
- eventrouter.setWorkflowRunKickstarted(workflowRunId);
- cb(null, true);
- return;
- }
-
// WebSocket interface for workflow removal
// Message format:
// {
@@ -324,14 +280,61 @@
return;
}
+ // WebSocket interface for notifying a new workflow run
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.notify_new_run',
+ // message: {
+ // req_id: <req_id>, // optional
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>
+ // }
+ // }
+ // Handler for the 'notify new run' request.
+ //   topic:   topic the request arrived on (used in the error text only)
+ //   message: request body; must carry 'workflow_id' and 'workflow_run_id'
+ //   cb:      callback invoked as cb(errorMessage, false) on a validation
+ //            failure, or cb(null, result) with the value returned by
+ //            eventrouter.setWorkflowRunKickstarted
+ const notifyNewWorkflowRun = (topic, message, cb) => {
+     const eventrouter = require('./eventrouter.js');
+
+     // log the problem, then report it to the caller as a failed request
+     const reject = (errorMessage) => {
+         logger.log('warn', `Return error - ${errorMessage}`);
+         cb(errorMessage, false);
+     };
+
+     if(!message) {
+         reject(`Message body for topic ${topic} is null or empty`);
+         return;
+     }
+
+     if(!('workflow_id' in message)) {
+         reject(`field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`);
+         return;
+     }
+
+     if(!('workflow_run_id' in message)) {
+         reject(`field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`);
+         return;
+     }
+
+     // mark the run as kickstarted and pass the router's answer to the caller
+     const kickstarted = eventrouter.setWorkflowRunKickstarted(message.workflow_run_id);
+     cb(null, kickstarted);
+ };
+
const getRouter = () => {
return {
registerWorkflow: {
- topic: serviceEvents.WORKFLOW_REG,
+ topic: serviceEvents.WORKFLOW_REGISTER,
handler: registerWorkflow
},
registerWorkflowEssence: {
- topic: serviceEvents.WORKFLOW_REG_ESSENCE,
+ topic: serviceEvents.WORKFLOW_REGISTER_ESSENCE,
handler: registerWorkflowEssence
},
listWorkflows: {
@@ -339,25 +342,24 @@
handler: listWorkflows
},
listWorkflowRuns: {
- topic: serviceEvents.WORKFLOW_RUN_LIST,
+ topic: serviceEvents.WORKFLOW_LIST_RUN,
handler: listWorkflowRuns
},
checkWorkflow: {
topic: serviceEvents.WORKFLOW_CHECK,
handler: checkWorkflow
},
- notifyWorkflowStart: {
- topic: serviceEvents.WORKFLOW_KICKSTART,
- handler: notifyWorkflowStart,
- return: false
- },
removeWorkflow: {
topic: serviceEvents.WORKFLOW_REMOVE,
handler: removeWorkflow
},
removeWorkflowRun: {
- topic: serviceEvents.WORKFLOW_RUN_REMOVE,
+ topic: serviceEvents.WORKFLOW_REMOVE_RUN,
handler: removeWorkflowRun
+ },
+ notifyNewWorkflowRun: {
+ topic: serviceEvents.WORKFLOW_NOTIFY_NEW_RUN,
+ handler: notifyNewWorkflowRun
}
};
};
@@ -384,4 +386,4 @@
getRouter: getRouter,
kickstartWorkflow: kickstartWorkflow
};
-})();
\ No newline at end of file
+})();
diff --git a/src/controllers/ws_workflowrun.js b/src/controllers/ws_workflowrun.js
index 036b1ef..4000f10 100644
--- a/src/controllers/ws_workflowrun.js
+++ b/src/controllers/ws_workflowrun.js
@@ -18,12 +18,16 @@
(function () {
'use strict';
+ const _ = require('lodash');
const logger = require('../config/logger.js');
let serviceEvents = {
+ // workflow_run -> controller -> workflow_run
WORKFLOW_RUN_UPDATE_STATUS: 'cord.workflow.ctlsvc.workflow.run.status',
WORKFLOW_RUN_COUNT_EVENTS: 'cord.workflow.ctlsvc.workflow.run.count',
- WORKFLOW_RUN_FETCH_EVENT: 'cord.workflow.ctlsvc.workflow.run.fetch'
+ WORKFLOW_RUN_FETCH_EVENT: 'cord.workflow.ctlsvc.workflow.run.fetch',
+ // controller -> workflow_run
+ WORKFLOW_RUN_NOTIFY_EVENT: 'cord.workflow.ctlsvc.workflow.run.notify'
};
// WebSocket interface for workflow status update
@@ -31,10 +35,11 @@
// {
// topic: 'cord.workflow.ctlsvc.workflow.run.status',
// message: {
- // workflow_id: <workflow_id>,
- // workflow_run_id: <workflow_run_id>,
- // task_id: <task_id>,
- // status: 'begin' or 'end'
+ // req_id: <req_id>, // optional
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>,
+ // task_id: <task_id>,
+ // status: 'begin' or 'end'
// }
// }
const updateWorkflowRunStatus = (topic, message, cb) => {
@@ -87,7 +92,13 @@
message.task_id,
message.status.toLowerCase()
);
- cb(null, result);
+ if(!result) {
+ errorMessage = `failed to update workflow run status ${message.workflow_run_id}`;
+ cb(errorMessage, false);
+ }
+ else {
+ cb(null, true);
+ }
return;
};
@@ -96,8 +107,9 @@
// {
// topic: 'cord.workflow.ctlsvc.workflow.run.count',
// message: {
- // workflow_id: <workflow_id>,
- // workflow_run_id: <workflow_run_id>
+ // req_id: <req_id>, // optional
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>
// }
// }
const countQueuedEvents = (topic, message, cb) => {
@@ -138,10 +150,11 @@
// {
// topic: 'cord.workflow.ctlsvc.workflow.run.fetch',
// message: {
- // workflow_id: <workflow_id>,
- // workflow_run_id: <workflow_run_id>,
- // task_id: <task_id>,
- // topic: <expected topic>
+ // req_id: <req_id>, // optional
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>,
+ // task_id: <task_id>,
+ // topic: <expected topic>
// }
// }
const fetchEvent = (topic, message, cb) => {
@@ -223,8 +236,34 @@
};
};
+ // out-going commands
+ // Tell workflow run clients that an event with the given topic has arrived.
+ // For every client returned by eventrouter.getWorkflowRunClients(), emit
+ // WORKFLOW_RUN_NOTIFY_EVENT with { topic } on its socket when its workflow
+ // run is found, the run reports the topic as acceptable, and a socket exists.
+ const notifyEvent = (topic) => {
+     const eventrouter = require('./eventrouter.js');
+
+     _.forOwn(eventrouter.getWorkflowRunClients(), (runClient) => {
+         const ownerWorkflowId = runClient.getWorkflowId();
+         const ownerRunId = runClient.getWorkflowRunId();
+
+         const workflow = eventrouter.getWorkflow(ownerWorkflowId);
+         const workflowRun = eventrouter.getWorkflowRun(ownerRunId);
+
+         // skip clients whose run is unknown or does not accept this topic
+         if(!workflowRun || !workflowRun.isTopicAcceptable(workflow, topic)) {
+             return;
+         }
+
+         const socket = runClient.getSocket();
+         if(!socket) {
+             return;
+         }
+
+         socket.emit(serviceEvents.WORKFLOW_RUN_NOTIFY_EVENT, {
+             topic: topic
+         });
+     });
+ };
+
module.exports = {
serviceEvents: serviceEvents,
- getRouter: getRouter
+ getRouter: getRouter,
+ notifyEvent: notifyEvent
};
-})();
\ No newline at end of file
+})();