Implement bulk status update of workflow runs
Change-Id: I6b67048f502d42a2572a936944c9e54300076478
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
index 4d9ee47..334c08b 100644
--- a/src/controllers/eventrouter.js
+++ b/src/controllers/eventrouter.js
@@ -45,6 +45,19 @@
GREETING: 'cord.workflow.ctlsvc.greeting'
};
+ setInterval(function () {
+ let requests = [];
+ _.forOwn(workflowRuns, (workflowRun, workflowRunId) => {
+ let obj = {
+ workflow_id: workflowRun.getWorkflowId(),
+ workflow_run_id: workflowRunId
+ };
+ requests.push(obj);
+ });
+
+ checkWorkflowRunStatusBulk(requests);
+ }, 5000);
+
// add ws_probe events
_.forOwn(ws_probe.serviceEvents, (wsServiceEvent, key) => {
serviceEvents[key] = wsServiceEvent;
@@ -185,13 +198,13 @@
return true;
};
- const setWorkflowRunState = (workflowRunId, state) => {
+ const setWorkflowRunStatus = (workflowRunId, status) => {
if(!(workflowRunId in workflowRuns)) {
logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
return false;
}
- if(state in ['success', 'failed', 'end']) {
+ if(status in ['success', 'failed', 'end']) {
removeWorkflowRun(workflowRunId);
}
return true;
@@ -212,6 +225,31 @@
return true;
};
+ /*
+ const checkWorkflowRunStatus = (workflowId, workflowRunId) => {
+ if(!(workflowId in workflows)) {
+ logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
+ return false;
+ }
+
+ if(!(workflowRunId in workflowRuns)) {
+ logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+ return false;
+ }
+
+ ws_manager.checkWorkflowRunStatus(workflowId, workflowRunId);
+ return true;
+ };
+ */
+
+ const checkWorkflowRunStatusBulk = (requests) => {
+ if(requests) {
+ ws_manager.checkWorkflowRunStatusBulk(requests);
+ return true;
+ }
+ return false;
+ };
+
const removeWorkflow = (workflowId) => {
if(!(workflowId in workflows)) {
logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
@@ -391,6 +429,8 @@
let router = ws_probe.getRouter();
_.forOwn(router, (routerElem, _key) => {
socket.on(routerElem.topic, (msg) => {
+ logger.log('debug', `received a probe event ${routerElem.topic} - ${JSON.stringify(msg)}`);
+
// handle a common parameter - req_id
// when we get req_id, return the same req_id in response.
// this is to help identify a request from a response at client-side
@@ -450,6 +490,8 @@
let router = ws_manager.getRouter();
_.forOwn(router, (routerElem, _key) => {
socket.on(routerElem.topic, (msg) => {
+ logger.log('debug', `received a manager event ${routerElem.topic} - ${JSON.stringify(msg)}`);
+
// handle a common parameter - req_id
// when we get req_id, return the same req_id in response.
// this is to help identify a request from a response at client-side
@@ -533,6 +575,8 @@
let router = ws_workflowrun.getRouter();
_.forOwn(router, (routerElem, _key) => {
socket.on(routerElem.topic, (msg) => {
+ logger.log('debug', `received a workflow run event ${routerElem.topic} - ${JSON.stringify(msg)}`);
+
// handle a common parameter - req_id
// when we get req_id, return the same req_id in response.
// this is to help identify a request from a response at client-side
@@ -650,6 +694,6 @@
clearWorkflowRuns: clearWorkflowRuns,
updateWorkflowRunStatus: updateWorkflowRunStatus,
setWorkflowRunKickstarted: setWorkflowRunKickstarted,
- setWorkflowRunState: setWorkflowRunState
+ setWorkflowRunStatus: setWorkflowRunStatus
};
})();
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
index 752ee88..4ce95c5 100644
--- a/src/controllers/ws_manager.js
+++ b/src/controllers/ws_manager.js
@@ -32,10 +32,12 @@
WORKFLOW_REMOVE: 'cord.workflow.ctlsvc.workflow.remove',
WORKFLOW_REMOVE_RUN: 'cord.workflow.ctlsvc.workflow.run.remove',
WORKFLOW_REPORT_NEW_RUN: 'cord.workflow.ctlsvc.workflow.report_new_run',
- WORKFLOW_REPORT_RUN_STATE: 'cord.workflow.ctlsvc.workflow.report_run_state',
+ WORKFLOW_REPORT_RUN_STATUS: 'cord.workflow.ctlsvc.workflow.report_run_status',
+ WORKFLOW_REPORT_RUN_STATUS_BULK: 'cord.workflow.ctlsvc.workflow.report_run_status_bulk',
// controller -> manager
WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart',
- WORKFLOW_CHECK_STATE: 'cord.workflow.ctlsvc.workflow.check.state'
+ WORKFLOW_CHECK_STATUS: 'cord.workflow.ctlsvc.workflow.check.status',
+ WORKFLOW_CHECK_STATUS_BULK: 'cord.workflow.ctlsvc.workflow.check.status_bulk',
};
// WebSocket interface for workflow registration
@@ -329,18 +331,18 @@
return;
};
- // WebSocket interface for reporting workflow run state
+ // WebSocket interface for reporting workflow run status
// Message format:
// {
- // topic: 'cord.workflow.ctlsvc.workflow.report_run_state',
+ // topic: 'cord.workflow.ctlsvc.workflow.report_run_status',
// message: {
// req_id: <req_id> // optional
// workflow_id: <workflow_id>,
// workflow_run_id: <workflow_run_id>,
- // state: one of ['success', 'running', 'failed', 'unknown']
+ // status: one of ['success', 'running', 'failed', 'unknown']
// }
// }
- const reportWorkflowRunState = (topic, message, cb) => {
+ const reportWorkflowRunStatus = (topic, message, cb) => {
const eventrouter = require('./eventrouter.js');
let errorMessage;
@@ -368,24 +370,96 @@
return;
}
- if(!('state' in message)) {
+ if(!('status' in message)) {
// error
- errorMessage = `field 'state' does not exist in message body - ${JSON.stringify(message)}`;
+ errorMessage = `field 'status' does not exist in message body - ${JSON.stringify(message)}`;
logger.log('warn', `Return error - ${errorMessage}`);
cb(errorMessage, false);
return;
}
let workflowRunId = message.workflow_run_id;
- let state = message.state;
+ let status = message.status;
// there must be a workflow matching
- // set workflow state
- let result = eventrouter.setWorkflowRunState(workflowRunId, state);
+ // set workflow status
+ let result = eventrouter.setWorkflowRunStatus(workflowRunId, status);
cb(null, result);
return;
}
+ // WebSocket interface for reporting workflow run status bulk
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.report_run_status_bulk',
+ // message: {
+ // req_id: <req_id> // optional
+ // data: [{
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>,
+ // status: one of ['success', 'running', 'failed', 'unknown']
+ // }, ...]
+ // }
+ // }
+ const reportWorkflowRunStatusBulk = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('data' in message)) {
+ // error
+ errorMessage = `field 'data' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let results = [];
+ for(let d in message.data) {
+ if(!('workflow_id' in d)) {
+ // error
+ errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(d)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_run_id' in d)) {
+ // error
+ errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(d)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('status' in d)) {
+ // error
+ errorMessage = `field 'status' does not exist in message body - ${JSON.stringify(d)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflowRunId = d.workflow_run_id;
+ let status = d.status;
+
+ // there must be a workflow matching
+ // set workflow status
+ let result = eventrouter.setWorkflowRunStatus(workflowRunId, status);
+ results.append(result);
+ }
+
+ cb(null, results);
+ return;
+ }
+
const getRouter = () => {
return {
registerWorkflow: {
@@ -420,9 +494,13 @@
topic: serviceEvents.WORKFLOW_REPORT_NEW_RUN,
handler: reportNewWorkflowRun
},
- reportWorkflowRunState: {
- topic: serviceEvents.WORKFLOW_REPORT_RUN_STATE,
- handler: reportWorkflowRunState
+ reportWorkflowRunStatus: {
+ topic: serviceEvents.WORKFLOW_REPORT_RUN_STATUS,
+ handler: reportWorkflowRunStatus
+ },
+ reportWorkflowRunStatusBulk: {
+ topic: serviceEvents.WORKFLOW_REPORT_RUN_STATUS_BULK,
+ handler: reportWorkflowRunStatusBulk
}
};
};
@@ -444,14 +522,14 @@
return;
};
- const checkWorkflowState = (workflowId, workflowRunId) => {
+ const checkWorkflowRunStatus = (workflowId, workflowRunId) => {
const eventrouter = require('./eventrouter.js');
let clients = eventrouter.getWorkflowManagerClients();
_.forOwn(clients, (client, _clientId) => {
let socket = client.getSocket();
if(socket) {
- socket.emit(serviceEvents.WORKFLOW_CHECK_STATE, {
+ socket.emit(serviceEvents.WORKFLOW_CHECK_STATUS, {
workflow_id: workflowId,
workflow_run_id: workflowRunId
});
@@ -460,10 +538,29 @@
return;
};
+ const checkWorkflowRunStatusBulk = (requests) => {
+ // input is an array of
+ // {
+ // workflow_id: <workflowId>,
+ // workflow_run_id: <workflowRunId>
+ // }
+ const eventrouter = require('./eventrouter.js');
+
+ let clients = eventrouter.getWorkflowManagerClients();
+ _.forOwn(clients, (client, _clientId) => {
+ let socket = client.getSocket();
+ if(socket) {
+ socket.emit(serviceEvents.WORKFLOW_CHECK_STATUS_BULK, requests);
+ }
+ });
+ return;
+ };
+
module.exports = {
serviceEvents: serviceEvents,
getRouter: getRouter,
kickstartWorkflow: kickstartWorkflow,
- checkWorkflowState: checkWorkflowState
+ checkWorkflowRunStatus: checkWorkflowRunStatus,
+ checkWorkflowRunStatusBulk: checkWorkflowRunStatusBulk
};
})();
diff --git a/src/controllers/ws_workflowrun.js b/src/controllers/ws_workflowrun.js
index 4000f10..0f058cb 100644
--- a/src/controllers/ws_workflowrun.js
+++ b/src/controllers/ws_workflowrun.js
@@ -87,7 +87,6 @@
}
let result = eventrouter.updateWorkflowRunStatus(
- message.workflow_id,
message.workflow_run_id,
message.task_id,
message.status.toLowerCase()