Add message routing testcases and related bugfixes
- Handle XOS sensor tasks specially to route events correctly
- Add a 'req_id' optional field to manager request API for client-side req-res mapping
- Fix several bugs related to message routing
- Rename event names for consistency
- Separate kickstart call-back event from kickstart request
- Shorten ping/pong timeout for socket.io for fast response
- Add a 'dag_id' field to tasks in essences
- Notify event arrivals to workflow run clients to let them get events as soon as possible
- Small code refinements
Change-Id: Ibc4182027eb5e2854f1603e339fffbe76e9ba621
diff --git a/package.json b/package.json
index 2604522..676fcb1 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
{
"name": "cord_workflow_controller",
- "version": "0.1.1",
+ "version": "0.1.2",
"description": "CORD Workflow Controller",
"main": "src/server.js",
"scripts": {
diff --git a/spec/clients.spec.js b/spec/clients.spec.js
index 9996944..1a40975 100644
--- a/spec/clients.spec.js
+++ b/spec/clients.spec.js
@@ -80,7 +80,7 @@
workflowRunId = message.workflow_run_id;
// call-back
- workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_KICKSTART, {
+ workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_NOTIFY_NEW_RUN, {
workflow_id: workflowId,
workflow_run_id: workflowRunId
})
@@ -128,12 +128,12 @@
if(register) {
let essence = essenceLoader.loadEssence(essenceFileName, true);
- workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE, {
+ workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_REGISTER_ESSENCE, {
essence: essence
});
workflowManagerClient.on(
- eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE,
+ eventrouter.serviceEvents.WORKFLOW_REGISTER_ESSENCE,
(workflowRegResult) => {
callback(null, workflowRegResult);
}
@@ -182,7 +182,7 @@
afterEach(function(done) {
// remove workflow run
- workflowManagerClient.emit(server.serviceEvents.WORKFLOW_RUN_REMOVE, {
+ workflowManagerClient.emit(server.serviceEvents.WORKFLOW_REMOVE_RUN, {
workflow_id: workflowId,
workflow_run_id: workflowRunId
});
@@ -325,4 +325,4 @@
}, 100);
});
});
-})();
\ No newline at end of file
+})();
diff --git a/spec/eventrouter.spec.js b/spec/eventrouter.spec.js
index df594fb..acf1e39 100644
--- a/spec/eventrouter.spec.js
+++ b/spec/eventrouter.spec.js
@@ -87,7 +87,7 @@
setTimeout(() => {
// call-back
- workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_KICKSTART, {
+ workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_NOTIFY_NEW_RUN, {
workflow_id: message.workflow_id,
workflow_run_id: message.workflow_run_id
})
@@ -111,6 +111,14 @@
workflowManagerClient.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
receivedKickstartMessages[1].push(message);
+
+ setTimeout(() => {
+ // call-back
+ workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_NOTIFY_NEW_RUN, {
+ workflow_id: message.workflow_id,
+ workflow_run_id: message.workflow_run_id
+ })
+ }, 1000);
});
workflowManagerClient.on('connect', () => {
@@ -124,7 +132,7 @@
let essence = essenceLoader.loadEssence(essenceFileName, true);
// register the workflow
- workflowManagerClients[0].emit(eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE, {
+ workflowManagerClients[0].emit(eventrouter.serviceEvents.WORKFLOW_REGISTER_ESSENCE, {
essence: essence
});
@@ -135,7 +143,7 @@
// handle return
workflowManagerClients[0].on(
- eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE,
+ eventrouter.serviceEvents.WORKFLOW_REGISTER_ESSENCE,
(workflowRegResult) => {
callback(null, workflowRegResult);
}
@@ -158,7 +166,7 @@
afterEach(function() {
// remove workflow runs
_.forOwn(workflowRunInfos, (workflowRunInfo) => {
- workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_RUN_REMOVE, {
+ workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_REMOVE_RUN, {
workflow_id: workflowRunInfo.workflowId,
workflow_run_id: workflowRunInfo.workflowRunId
});
@@ -213,6 +221,8 @@
});
it('all managers should receive kickstart messages', function(done) {
+ this.timeout(5000);
+
// kickstart the workflow
probeClient.emit('onu.events', {serialNumber: 'testSerialXXX', other: 'test_other_field'});
setTimeout(() => {
@@ -221,7 +231,7 @@
expect(receivedKickstartMessageStore.length, 'num of messages in a store').to.equal(1);
});
done();
- }, 1000);
+ }, 2000);
});
it('should have only one workflow run', function(done) {
@@ -353,4 +363,4 @@
}, 2000);
});
});
-})();
\ No newline at end of file
+})();
diff --git a/spec/test_clients_workflow_essence.json b/spec/test_clients_workflow_essence.json
index bd37499..90d8bfc 100644
--- a/spec/test_clients_workflow_essence.json
+++ b/spec/test_clients_workflow_essence.json
@@ -19,6 +19,7 @@
"tasks": {
"task1": {
"class": "XOSEventSensor",
+ "dag_id": "test_clients_workflow",
"dag": "dag_test_clients_workflow",
"key_field": "serialNumber",
"local_variable": "task1",
@@ -30,6 +31,7 @@
},
"task2": {
"class": "XOSModelSensor",
+ "dag_id": "test_clients_workflow",
"dag": "dag_test_clients_workflow",
"key_field": "serialNumber",
"local_variable": "task2",
diff --git a/spec/test_multi_workflow_essence.json b/spec/test_multi_workflow_essence.json
index 572a432..7d033f5 100644
--- a/spec/test_multi_workflow_essence.json
+++ b/spec/test_multi_workflow_essence.json
@@ -27,6 +27,7 @@
"tasks": {
"must_not_be_called_handler": {
"class": "UnknownSensor",
+ "dag_id": "must_not_be_called",
"dag": "dag_must_not_be_called",
"local_variable": "must_not_be_called_handler",
"poke_interval": 5,
@@ -35,6 +36,7 @@
},
"onu_event_handler": {
"class": "XOSEventSensor",
+ "dag_id": "must_not_be_called",
"dag": "dag_must_not_be_called",
"key_field": "serialNumber",
"local_variable": "onu_event_handler",
@@ -46,6 +48,7 @@
},
"onu_model_event_handler": {
"class": "XOSModelSensor",
+ "dag_id": "must_not_be_called",
"dag": "dag_must_not_be_called",
"key_field": "serialNumber",
"local_variable": "onu_model_event_handler",
@@ -85,6 +88,7 @@
"tasks": {
"onu_event_handler": {
"class": "XOSEventSensor",
+ "dag_id": "must_not_be_called",
"dag": "dag_should_be_called",
"key_field": "serialNumber",
"local_variable": "onu_event_handler",
@@ -96,6 +100,7 @@
},
"onu_model_event_handler": {
"class": "XOSModelSensor",
+ "dag_id": "must_not_be_called",
"dag": "dag_should_be_called",
"key_field": "serialNumber",
"local_variable": "onu_model_event_handler",
@@ -107,6 +112,7 @@
},
"can_be_stuck_handler": {
"class": "UnknownSensor",
+ "dag_id": "must_not_be_called",
"dag": "dag_should_be_called",
"local_variable": "can_be_stuck_handler",
"poke_interval": 5,
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
index 9a54181..975db2e 100644
--- a/src/controllers/eventrouter.js
+++ b/src/controllers/eventrouter.js
@@ -94,6 +94,15 @@
return true;
};
+ const getWorkflow = (workflowId) => {
+ if(workflowId in workflows) {
+ logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
+ return null;
+ }
+
+ return workflows[workflowId];
+ };
+
const clearWorkflows = () => {
_.forOwn(workflows, (_workflow, workflowId) => {
delete workflows[workflowId];
@@ -133,6 +142,15 @@
return true;
};
+ const getWorkflowRun = (workflowRunId) => {
+ if(workflowRunId in workflowRuns) {
+ logger.log('warn', `cannot find a workflow run with id - ${workflowRunId}`);
+ return null;
+ }
+
+ return workflowRuns[workflowRunId];
+ };
+
const clearWorkflowRuns = () => {
_.forOwn(workflowRuns, (_workflowRun, workflowRunId) => {
delete workflowRuns[workflowRunId];
@@ -379,6 +397,7 @@
// topic: topic sent
// message: {
// req_id: <req_id>,
+ // error: <true/false>,
// result: <true/false>,
// message: <error message>
// }
@@ -411,6 +430,7 @@
return;
}
+ // we return result
if(routerElem.return === undefined || routerElem.return) {
socket.emit(routerElem.topic, {
req_id: req_id,
@@ -428,10 +448,18 @@
// workflow run protocol:
// REQ:
// topic: operation
- // message: <data>
+ // message: {
+ // req_id: <req_id>,
+ // <data>...
+ // }
// RES:
// topic: topic sent
- // message: {result: <true/false>, message: <error message> }
+ // message: {
+ // req_id: <req_id>,
+ // error: <true/false>,
+ // result: <true/false>,
+ // message: <error message>
+ // }
// map to WorkflowRun instance
let workflowId = c.getWorkflowId();
@@ -463,18 +491,33 @@
let router = ws_workflowrun.getRouter();
_.forOwn(router, (routerElem, _key) => {
socket.on(routerElem.topic, (msg) => {
- routerElem.handler(routerElem.topic, msg, (err, result) => {
+ // handle a common parameter - req_id
+ // when we get req_id, return the same req_id in response.
+ // this is to help identify a request from a response at client-side
+ let req_id = 101010; // default number, signiture
+ if(msg && checkObject(msg)) {
+ if('req_id' in msg) {
+ req_id = msg.req_id;
+ }
+ }
+
+ routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
if(err) {
logger.log('warn', `unable to handle a message - ${err}`);
socket.emit(routerElem.topic, {
+ req_id: req_id,
+ error: true,
result: false,
message: err
});
return;
}
+ // we return result
if(routerElem.return === undefined || routerElem.return) {
socket.emit(routerElem.topic, {
+ req_id: req_id,
+ error: false,
result: result
});
}
@@ -552,11 +595,13 @@
removeClient: removeClient,
removeClients: removeClients,
addWorkflow: addWorkflow,
+ getWorkflow: getWorkflow,
listWorkflows: listWorkflows,
checkWorkflow: checkWorkflow,
removeWorkflow: removeWorkflow,
clearWorkflows: clearWorkflows,
addWorkflowRun: addWorkflowRun,
+ getWorkflowRun: getWorkflowRun,
listWorkflowRuns: listWorkflowRuns,
checkWorkflowRun: checkWorkflowRun,
removeWorkflowRun: removeWorkflowRun,
@@ -564,4 +609,4 @@
updateWorkflowRunStatus: updateWorkflowRunStatus,
setWorkflowRunKickstarted: setWorkflowRunKickstarted,
};
-})();
\ No newline at end of file
+})();
diff --git a/src/controllers/websocket.js b/src/controllers/websocket.js
index 3f3216a..b5eaf13 100644
--- a/src/controllers/websocket.js
+++ b/src/controllers/websocket.js
@@ -26,7 +26,10 @@
let io;
const createSocketIO = (server) => {
// INSTANTIATE SOCKET.IO
- io = socketio.listen(server);
+ io = socketio.listen(server, {
+ pingInterval: 500,
+ pingTimeout: 2000,
+ });
io.use(ioWildcard());
// set io to eventrouter
@@ -89,4 +92,4 @@
// const socket = socketIo.get();
// socket.emit('eventName', data);
-})();
\ No newline at end of file
+})();
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
index 5c77ebd..3d81bac 100644
--- a/src/controllers/ws_manager.js
+++ b/src/controllers/ws_manager.js
@@ -23,14 +23,17 @@
const logger = require('../config/logger.js');
let serviceEvents = {
- WORKFLOW_REG: 'cord.workflow.ctlsvc.workflow.reg',
- WORKFLOW_REG_ESSENCE: 'cord.workflow.ctlsvc.workflow.reg_essence',
+ // manager -> controller -> manager
+ WORKFLOW_REGISTER: 'cord.workflow.ctlsvc.workflow.register',
+ WORKFLOW_REGISTER_ESSENCE: 'cord.workflow.ctlsvc.workflow.register_essence',
WORKFLOW_LIST: 'cord.workflow.ctlsvc.workflow.list',
- WORKFLOW_RUN_LIST: 'cord.workflow.ctlsvc.workflow.run.list',
+ WORKFLOW_LIST_RUN: 'cord.workflow.ctlsvc.workflow.run.list',
WORKFLOW_CHECK: 'cord.workflow.ctlsvc.workflow.check',
- WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart',
WORKFLOW_REMOVE: 'cord.workflow.ctlsvc.workflow.remove',
- WORKFLOW_RUN_REMOVE: 'cord.workflow.ctlsvc.workflow.run.remove'
+ WORKFLOW_REMOVE_RUN: 'cord.workflow.ctlsvc.workflow.run.remove',
+ WORKFLOW_NOTIFY_NEW_RUN: 'cord.workflow.ctlsvc.workflow.notify_new_run',
+ // controller -> manager
+ WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart'
};
// WebSocket interface for workflow registration
@@ -38,12 +41,12 @@
// {
// topic: 'cord.workflow.ctlsvc.workflow.reg',
// message: {
- // req_id: <req_id> // optional
+ // req_id: <req_id>, // optional
// workflow: <workflow>
// }
// }
const registerWorkflow = (topic, message, cb) => {
- const distributor = require('./eventrouter.js/index.js');
+ const eventrouter = require('./eventrouter.js');
let errorMessage;
if(!message) {
@@ -66,7 +69,7 @@
logger.log('debug', `workflow - ${JSON.stringify(workflow)}`);
- let result = distributor.addWorkflow(workflow);
+ let result = eventrouter.addWorkflow(workflow);
if(!result) {
errorMessage = `failed to register a workflow ${workflow.getId()}`;
cb(errorMessage, false);
@@ -197,53 +200,6 @@
return;
};
- // WebSocket interface for workflow start notification
- // Message format:
- // {
- // topic: 'cord.workflow.ctlsvc.workflow.kickstart',
- // message: {
- // req_id: <req_id> // optional
- // workflow_id: <workflow_id>,
- // workflow_run_id: <workflow_run_id>
- // }
- // }
- const notifyWorkflowStart = (topic, message, cb) => {
- const eventrouter = require('./eventrouter.js');
-
- let errorMessage;
- if(!message) {
- // error
- errorMessage = `Message body for topic ${topic} is null or empty`;
- logger.log('warn', `Return error - ${errorMessage}`);
- cb(errorMessage, false);
- return;
- }
-
- if(!('workflow_id' in message)) {
- // error
- errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
- logger.log('warn', `Return error - ${errorMessage}`);
- cb(errorMessage, false);
- return;
- }
-
- if(!('workflow_run_id' in message)) {
- // error
- errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
- logger.log('warn', `Return error - ${errorMessage}`);
- cb(errorMessage, false);
- return;
- }
-
- let workflowRunId = message.workflow_run_id;
-
- // there must be a workflow matching
- // set the workflow kickstarted
- eventrouter.setWorkflowRunKickstarted(workflowRunId);
- cb(null, true);
- return;
- }
-
// WebSocket interface for workflow removal
// Message format:
// {
@@ -324,14 +280,61 @@
return;
}
+ // WebSocket interface for notifying a new workflow run
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.notify_new_run',
+ // message: {
+ // req_id: <req_id> // optional
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>
+ // }
+ // }
+ const notifyNewWorkflowRun = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_run_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflowRunId = message.workflow_run_id;
+
+ // there must be a workflow matching
+ // set the workflow kickstarted
+ let result = eventrouter.setWorkflowRunKickstarted(workflowRunId);
+ cb(null, result);
+ return;
+ }
+
const getRouter = () => {
return {
registerWorkflow: {
- topic: serviceEvents.WORKFLOW_REG,
+ topic: serviceEvents.WORKFLOW_REGISTER,
handler: registerWorkflow
},
registerWorkflowEssence: {
- topic: serviceEvents.WORKFLOW_REG_ESSENCE,
+ topic: serviceEvents.WORKFLOW_REGISTER_ESSENCE,
handler: registerWorkflowEssence
},
listWorkflows: {
@@ -339,25 +342,24 @@
handler: listWorkflows
},
listWorkflowRuns: {
- topic: serviceEvents.WORKFLOW_RUN_LIST,
+ topic: serviceEvents.WORKFLOW_LIST_RUN,
handler: listWorkflowRuns
},
checkWorkflow: {
topic: serviceEvents.WORKFLOW_CHECK,
handler: checkWorkflow
},
- notifyWorkflowStart: {
- topic: serviceEvents.WORKFLOW_KICKSTART,
- handler: notifyWorkflowStart,
- return: false
- },
removeWorkflow: {
topic: serviceEvents.WORKFLOW_REMOVE,
handler: removeWorkflow
},
removeWorkflowRun: {
- topic: serviceEvents.WORKFLOW_RUN_REMOVE,
+ topic: serviceEvents.WORKFLOW_REMOVE_RUN,
handler: removeWorkflowRun
+ },
+ notifyNewWorkflowRun: {
+ topic: serviceEvents.WORKFLOW_NOTIFY_NEW_RUN,
+ handler: notifyNewWorkflowRun
}
};
};
@@ -384,4 +386,4 @@
getRouter: getRouter,
kickstartWorkflow: kickstartWorkflow
};
-})();
\ No newline at end of file
+})();
diff --git a/src/controllers/ws_workflowrun.js b/src/controllers/ws_workflowrun.js
index 036b1ef..4000f10 100644
--- a/src/controllers/ws_workflowrun.js
+++ b/src/controllers/ws_workflowrun.js
@@ -18,12 +18,16 @@
(function () {
'use strict';
+ const _ = require('lodash');
const logger = require('../config/logger.js');
let serviceEvents = {
+ // workflow_run -> controller -> workflow_run
WORKFLOW_RUN_UPDATE_STATUS: 'cord.workflow.ctlsvc.workflow.run.status',
WORKFLOW_RUN_COUNT_EVENTS: 'cord.workflow.ctlsvc.workflow.run.count',
- WORKFLOW_RUN_FETCH_EVENT: 'cord.workflow.ctlsvc.workflow.run.fetch'
+ WORKFLOW_RUN_FETCH_EVENT: 'cord.workflow.ctlsvc.workflow.run.fetch',
+ // controller -> workflow_run
+ WORKFLOW_RUN_NOTIFY_EVENT: 'cord.workflow.ctlsvc.workflow.run.notify'
};
// WebSocket interface for workflow status update
@@ -31,10 +35,11 @@
// {
// topic: 'cord.workflow.ctlsvc.workflow.run.status',
// message: {
- // workflow_id: <workflow_id>,
- // workflow_run_id: <workflow_run_id>,
- // task_id: <task_id>,
- // status: 'begin' or 'end'
+ // req_id: <req_id>, // optional
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>,
+ // task_id: <task_id>,
+ // status: 'begin' or 'end'
// }
// }
const updateWorkflowRunStatus = (topic, message, cb) => {
@@ -87,7 +92,13 @@
message.task_id,
message.status.toLowerCase()
);
- cb(null, result);
+ if(!result) {
+ errorMessage = `failed to update workflow run status ${message.workflow_run_id}`;
+ cb(errorMessage, false);
+ }
+ else {
+ cb(null, true);
+ }
return;
};
@@ -96,8 +107,9 @@
// {
// topic: 'cord.workflow.ctlsvc.workflow.run.count',
// message: {
- // workflow_id: <workflow_id>,
- // workflow_run_id: <workflow_run_id>
+ // req_id: <req_id>, // optional
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>
// }
// }
const countQueuedEvents = (topic, message, cb) => {
@@ -138,10 +150,11 @@
// {
// topic: 'cord.workflow.ctlsvc.workflow.run.fetch',
// message: {
- // workflow_id: <workflow_id>,
- // workflow_run_id: <workflow_run_id>,
- // task_id: <task_id>,
- // topic: <expected topic>
+ // req_id: <req_id>, // optional
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>,
+ // task_id: <task_id>,
+ // topic: <expected topic>
// }
// }
const fetchEvent = (topic, message, cb) => {
@@ -223,8 +236,34 @@
};
};
+ // out-going commands
+ const notifyEvent = (topic) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let clients = eventrouter.getWorkflowRunClients();
+ _.forOwn(clients, (client, _clientId) => {
+ let workflowId = client.getWorkflowId();
+ let workflowRunId = client.getWorkflowRunId();
+
+ let workflow = eventrouter.getWorkflow(workflowId);
+ let workflowRun = eventrouter.getWorkflowRun(workflowRunId);
+ if(workflowRun) {
+ if(workflowRun.isTopicAcceptable(workflow, topic)) {
+ let socket = client.getSocket();
+ if(socket) {
+ socket.emit(serviceEvents.WORKFLOW_RUN_NOTIFY_EVENT, {
+ topic: topic
+ });
+ }
+ }
+ }
+ });
+ return;
+ };
+
module.exports = {
serviceEvents: serviceEvents,
- getRouter: getRouter
+ getRouter: getRouter,
+ notifyEvent: notifyEvent
};
-})();
\ No newline at end of file
+})();
diff --git a/src/workflows/hello_workflow.json b/src/workflows/hello_workflow.json
index ebf94af..9de71bc 100644
--- a/src/workflows/hello_workflow.json
+++ b/src/workflows/hello_workflow.json
@@ -10,6 +10,7 @@
"tasks": {
"onu_event_handler": {
"class": "XOSEventSensor",
+ "dag_id": "hello_workflow",
"dag": "dag_hello",
"key_field": "serialNumber",
"local_variable": "onu_event_handler",