Implement bulk status update of workflow runs
Change-Id: I6b67048f502d42a2572a936944c9e54300076478
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
index 4d9ee47..334c08b 100644
--- a/src/controllers/eventrouter.js
+++ b/src/controllers/eventrouter.js
@@ -45,6 +45,19 @@
GREETING: 'cord.workflow.ctlsvc.greeting'
};
+ setInterval(function () {
+ let requests = [];
+ _.forOwn(workflowRuns, (workflowRun, workflowRunId) => {
+ let obj = {
+ workflow_id: workflowRun.getWorkflowId(),
+ workflow_run_id: workflowRunId
+ };
+ requests.push(obj);
+ });
+
+ checkWorkflowRunStatusBulk(requests);
+ }, 5000);
+
// add ws_probe events
_.forOwn(ws_probe.serviceEvents, (wsServiceEvent, key) => {
serviceEvents[key] = wsServiceEvent;
@@ -185,13 +198,13 @@
return true;
};
- const setWorkflowRunState = (workflowRunId, state) => {
+ const setWorkflowRunStatus = (workflowRunId, status) => {
if(!(workflowRunId in workflowRuns)) {
logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
return false;
}
- if(state in ['success', 'failed', 'end']) {
+ if(status in ['success', 'failed', 'end']) {
removeWorkflowRun(workflowRunId);
}
return true;
@@ -212,6 +225,31 @@
return true;
};
+ /*
+ const checkWorkflowRunStatus = (workflowId, workflowRunId) => {
+ if(!(workflowId in workflows)) {
+ logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
+ return false;
+ }
+
+ if(!(workflowRunId in workflowRuns)) {
+ logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+ return false;
+ }
+
+ ws_manager.checkWorkflowRunStatus(workflowId, workflowRunId);
+ return true;
+ };
+ */
+
+ const checkWorkflowRunStatusBulk = (requests) => {
+ if(requests) {
+ ws_manager.checkWorkflowRunStatusBulk(requests);
+ return true;
+ }
+ return false;
+ };
+
const removeWorkflow = (workflowId) => {
if(!(workflowId in workflows)) {
logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
@@ -391,6 +429,8 @@
let router = ws_probe.getRouter();
_.forOwn(router, (routerElem, _key) => {
socket.on(routerElem.topic, (msg) => {
+ logger.log('debug', `received a probe event ${routerElem.topic} - ${JSON.stringify(msg)}`);
+
// handle a common parameter - req_id
// when we get req_id, return the same req_id in response.
// this is to help identify a request from a response at client-side
@@ -450,6 +490,8 @@
let router = ws_manager.getRouter();
_.forOwn(router, (routerElem, _key) => {
socket.on(routerElem.topic, (msg) => {
+ logger.log('debug', `received a manager event ${routerElem.topic} - ${JSON.stringify(msg)}`);
+
// handle a common parameter - req_id
// when we get req_id, return the same req_id in response.
// this is to help identify a request from a response at client-side
@@ -533,6 +575,8 @@
let router = ws_workflowrun.getRouter();
_.forOwn(router, (routerElem, _key) => {
socket.on(routerElem.topic, (msg) => {
+ logger.log('debug', `received a workflow run event ${routerElem.topic} - ${JSON.stringify(msg)}`);
+
// handle a common parameter - req_id
// when we get req_id, return the same req_id in response.
// this is to help identify a request from a response at client-side
@@ -650,6 +694,6 @@
clearWorkflowRuns: clearWorkflowRuns,
updateWorkflowRunStatus: updateWorkflowRunStatus,
setWorkflowRunKickstarted: setWorkflowRunKickstarted,
- setWorkflowRunState: setWorkflowRunState
+ setWorkflowRunStatus: setWorkflowRunStatus
};
})();