Remove task status tracking
Events are no longer routed according to the status of tasks.
Change-Id: Ib9c714d84fbb0052f92a40ac7674c2a2b0ce5313
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
index 334c08b..548bc48 100644
--- a/src/controllers/eventrouter.js
+++ b/src/controllers/eventrouter.js
@@ -176,17 +176,6 @@
});
};
- const updateWorkflowRunStatus = (workflowRunId, taskId, status) => {
- if(!(workflowRunId in workflowRuns)) {
- logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
- return false;
- }
-
- let workflowRun = workflowRuns[workflowRunId];
- workflowRun.updateTaskStatus(taskId, status);
- return true;
- };
-
const setWorkflowRunKickstarted = (workflowRunId) => {
if(!(workflowRunId in workflowRuns)) {
logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
@@ -295,10 +284,6 @@
};
const emitEvent = (topic, message) => {
- // list of workflowIds
- // to check if there are workflow runs for the events
- let workflowIdsRunning = [];
-
logger.log('debug', `event is raised : topic ${topic}, message ${JSON.stringify(message)}`);
// route event to running instances
@@ -306,17 +291,22 @@
let workflowId = workflowRun.getWorkflowId();
let workflow = workflows[workflowId];
- // event will be routed to workflow runs that meet following criteria
- // 1) the workflow is currently interested in the same topic
- // (already finished tasks are not counted)
- // 2) the task's key field and value
- if(workflowRun.isEventAcceptable(workflow, topic, message)) {
- //console.log(`event ${topic} is routed to workflow run ${workflowRunId}`);
- logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
- workflowRun.enqueueEvent(topic, message);
+ if(workflow.isEventAcceptable(topic)) {
+ logger.log('debug', `workflow ${workflowId} accept the event : topic ${topic}`);
- if(!workflowIdsRunning.includes(workflowId)) {
- workflowIdsRunning.push(workflowId);
+ // the event is accepted if it has
+ // the same key field and value as the workflow run
+ if(workflowRun.isEventAcceptable(topic, message)) {
+ logger.log('debug', `workflow run ${workflowRunId} accept the event : \
+ topic ${topic}, message ${JSON.stringify(message)}`);
+ workflowRun.updateEventKeyFieldValueFromMessage(topic, message);
+
+ logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
+ workflowRun.enqueueEvent(topic, message);
+ }
+ else {
+ logger.log('debug', `workflow run ${workflowRunId} reject the event : \
+ topic ${topic}, message ${JSON.stringify(message)}`);
}
}
});
@@ -324,25 +314,21 @@
// check if the event is a kickstart event
_.forOwn(workflows, (workflow, workflowId) => {
if(workflow.isKickstartTopic(topic)) {
- // check if there is a workflow run for the event
- // kickstart a workflow if there is no workflows runs for the event
- if(!workflowIdsRunning.includes(workflowId)) {
- // we need to buffer the event until workflow run is brought up
- let workflowRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
- workflowRun.updateEventKeyFieldValueFromMessage(topic, message);
+ // we need to buffer the event until workflow run is brought up
+ let workflowRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
+ workflowRun.updateEventKeyFieldValueFromMessage(topic, message);
- let workflowRunId = workflowRun.getId();
+ let workflowRunId = workflowRun.getId();
- // register for management
- workflowRuns[workflowRunId] = workflowRun;
+ // register for management
+ workflowRuns[workflowRunId] = workflowRun;
- // route event
- logger.log('debug', `event ${topic} is routed to a new workflow run ${workflowRunId}`);
- workflowRun.enqueueEvent(topic, message);
+ // route event
+ logger.log('debug', `event ${topic} is routed to a new workflow run ${workflowRunId}`);
+ workflowRun.enqueueEvent(topic, message);
- // KICKSTART!
- kickstart(workflowId, workflowRunId);
- }
+ // KICKSTART!
+ kickstart(workflowId, workflowRunId);
}
});
@@ -692,7 +678,6 @@
checkWorkflowRun: checkWorkflowRun,
removeWorkflowRun: removeWorkflowRun,
clearWorkflowRuns: clearWorkflowRuns,
- updateWorkflowRunStatus: updateWorkflowRunStatus,
setWorkflowRunKickstarted: setWorkflowRunKickstarted,
setWorkflowRunStatus: setWorkflowRunStatus
};
diff --git a/src/controllers/ws_workflowrun.js b/src/controllers/ws_workflowrun.js
index 0f058cb..c947bb9 100644
--- a/src/controllers/ws_workflowrun.js
+++ b/src/controllers/ws_workflowrun.js
@@ -23,84 +23,12 @@
let serviceEvents = {
// workflow_run -> controller -> workflow_run
- WORKFLOW_RUN_UPDATE_STATUS: 'cord.workflow.ctlsvc.workflow.run.status',
WORKFLOW_RUN_COUNT_EVENTS: 'cord.workflow.ctlsvc.workflow.run.count',
WORKFLOW_RUN_FETCH_EVENT: 'cord.workflow.ctlsvc.workflow.run.fetch',
// controller -> workflow_run
WORKFLOW_RUN_NOTIFY_EVENT: 'cord.workflow.ctlsvc.workflow.run.notify'
};
- // WebSocket interface for workflow status update
- // Message format:
- // {
- // topic: 'cord.workflow.ctlsvc.workflow.run.status',
- // message: {
- // req_id: <req_id>, // optional
- // workflow_id: <workflow_id>,
- // workflow_run_id: <workflow_run_id>,
- // task_id: <task_id>,
- // status: 'begin' or 'end'
- // }
- // }
- const updateWorkflowRunStatus = (topic, message, cb) => {
- const eventrouter = require('./eventrouter.js');
-
- let errorMessage;
- if(!message) {
- // error
- errorMessage = `Message body for topic ${topic} is null or empty`;
- logger.log('warn', `Return error - ${errorMessage}`);
- cb(errorMessage, false);
- return;
- }
-
- if(!('workflow_id' in message)) {
- // error
- errorMessage = `workflow_id field is not in message body - ${message}`;
- logger.log('warn', `Return error - ${errorMessage}`);
- cb(errorMessage, false);
- return;
- }
-
- if(!('workflow_run_id' in message)) {
- // error
- errorMessage = `workflow_run_id field is not in message body - ${message}`;
- logger.log('warn', `Return error - ${errorMessage}`);
- cb(errorMessage, false);
- return;
- }
-
- if(!('task_id' in message)) {
- // error
- errorMessage = `task_id field is not in message body - ${message}`;
- logger.log('warn', `Return error - ${errorMessage}`);
- cb(errorMessage, false);
- return;
- }
-
- if(!('status' in message)) {
- // error
- errorMessage = `status field is not in message body - ${message}`;
- logger.log('warn', `Return error - ${errorMessage}`);
- cb(errorMessage, false);
- return;
- }
-
- let result = eventrouter.updateWorkflowRunStatus(
- message.workflow_run_id,
- message.task_id,
- message.status.toLowerCase()
- );
- if(!result) {
- errorMessage = `failed to update workflow run status ${message.workflow_run_id}`;
- cb(errorMessage, false);
- }
- else {
- cb(null, true);
- }
- return;
- };
-
// WebSocket interface for counting queued events
// Message format:
// {
@@ -220,10 +148,6 @@
const getRouter = () => {
return {
- updateWorkflowRunStatus: {
- topic: serviceEvents.WORKFLOW_RUN_UPDATE_STATUS,
- handler: updateWorkflowRunStatus
- },
countQueuedEvents: {
topic: serviceEvents.WORKFLOW_RUN_COUNT_EVENTS,
handler: countQueuedEvents