Remove task status tracking
Events are not going to be routed as per status of tasks anymore
Change-Id: Ib9c714d84fbb0052f92a40ac7674c2a2b0ce5313
diff --git a/package.json b/package.json
index 1ec5d68..b43adb4 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
{
"name": "cord_workflow_controller",
- "version": "0.4.1",
+ "version": "0.5.0",
"description": "CORD Workflow Controller",
"main": "src/server.js",
"scripts": {
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
index 334c08b..548bc48 100644
--- a/src/controllers/eventrouter.js
+++ b/src/controllers/eventrouter.js
@@ -176,17 +176,6 @@
});
};
- const updateWorkflowRunStatus = (workflowRunId, taskId, status) => {
- if(!(workflowRunId in workflowRuns)) {
- logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
- return false;
- }
-
- let workflowRun = workflowRuns[workflowRunId];
- workflowRun.updateTaskStatus(taskId, status);
- return true;
- };
-
const setWorkflowRunKickstarted = (workflowRunId) => {
if(!(workflowRunId in workflowRuns)) {
logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
@@ -295,10 +284,6 @@
};
const emitEvent = (topic, message) => {
- // list of workflowIds
- // to check if there are workflow runs for the events
- let workflowIdsRunning = [];
-
logger.log('debug', `event is raised : topic ${topic}, message ${JSON.stringify(message)}`);
// route event to running instances
@@ -306,17 +291,22 @@
let workflowId = workflowRun.getWorkflowId();
let workflow = workflows[workflowId];
- // event will be routed to workflow runs that meet following criteria
- // 1) the workflow is currently interested in the same topic
- // (already finished tasks are not counted)
- // 2) the task's key field and value
- if(workflowRun.isEventAcceptable(workflow, topic, message)) {
- //console.log(`event ${topic} is routed to workflow run ${workflowRunId}`);
- logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
- workflowRun.enqueueEvent(topic, message);
+ if(workflow.isEventAcceptable(topic)) {
+ logger.log('debug', `workflow ${workflowId} accept the event : topic ${topic}`);
- if(!workflowIdsRunning.includes(workflowId)) {
- workflowIdsRunning.push(workflowId);
+ // event is acceped if event has
+ // the same key field and its value as workflow_run
+ if(workflowRun.isEventAcceptable(topic, message)) {
+ logger.log('debug', `workflow run ${workflowRunId} accept the event : \
+ topic ${topic}, message ${JSON.stringify(message)}`);
+ workflowRun.updateEventKeyFieldValueFromMessage(topic, message);
+
+ logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
+ workflowRun.enqueueEvent(topic, message);
+ }
+ else {
+ logger.log('debug', `workflow run ${workflowRunId} reject the event : \
+ topic ${topic}, message ${JSON.stringify(message)}`);
}
}
});
@@ -324,25 +314,21 @@
// check if the event is a kickstart event
_.forOwn(workflows, (workflow, workflowId) => {
if(workflow.isKickstartTopic(topic)) {
- // check if there is a workflow run for the event
- // kickstart a workflow if there is no workflows runs for the event
- if(!workflowIdsRunning.includes(workflowId)) {
- // we need to buffer the event until workflow run is brought up
- let workflowRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
- workflowRun.updateEventKeyFieldValueFromMessage(topic, message);
+ // we need to buffer the event until workflow run is brought up
+ let workflowRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
+ workflowRun.updateEventKeyFieldValueFromMessage(topic, message);
- let workflowRunId = workflowRun.getId();
+ let workflowRunId = workflowRun.getId();
- // register for management
- workflowRuns[workflowRunId] = workflowRun;
+ // register for management
+ workflowRuns[workflowRunId] = workflowRun;
- // route event
- logger.log('debug', `event ${topic} is routed to a new workflow run ${workflowRunId}`);
- workflowRun.enqueueEvent(topic, message);
+ // route event
+ logger.log('debug', `event ${topic} is routed to a new workflow run ${workflowRunId}`);
+ workflowRun.enqueueEvent(topic, message);
- // KICKSTART!
- kickstart(workflowId, workflowRunId);
- }
+ // KICKSTART!
+ kickstart(workflowId, workflowRunId);
}
});
@@ -692,7 +678,6 @@
checkWorkflowRun: checkWorkflowRun,
removeWorkflowRun: removeWorkflowRun,
clearWorkflowRuns: clearWorkflowRuns,
- updateWorkflowRunStatus: updateWorkflowRunStatus,
setWorkflowRunKickstarted: setWorkflowRunKickstarted,
setWorkflowRunStatus: setWorkflowRunStatus
};
diff --git a/src/controllers/ws_workflowrun.js b/src/controllers/ws_workflowrun.js
index 0f058cb..c947bb9 100644
--- a/src/controllers/ws_workflowrun.js
+++ b/src/controllers/ws_workflowrun.js
@@ -23,84 +23,12 @@
let serviceEvents = {
// workflow_run -> controller -> workflow_run
- WORKFLOW_RUN_UPDATE_STATUS: 'cord.workflow.ctlsvc.workflow.run.status',
WORKFLOW_RUN_COUNT_EVENTS: 'cord.workflow.ctlsvc.workflow.run.count',
WORKFLOW_RUN_FETCH_EVENT: 'cord.workflow.ctlsvc.workflow.run.fetch',
// controller -> workflow_run
WORKFLOW_RUN_NOTIFY_EVENT: 'cord.workflow.ctlsvc.workflow.run.notify'
};
- // WebSocket interface for workflow status update
- // Message format:
- // {
- // topic: 'cord.workflow.ctlsvc.workflow.run.status',
- // message: {
- // req_id: <req_id>, // optional
- // workflow_id: <workflow_id>,
- // workflow_run_id: <workflow_run_id>,
- // task_id: <task_id>,
- // status: 'begin' or 'end'
- // }
- // }
- const updateWorkflowRunStatus = (topic, message, cb) => {
- const eventrouter = require('./eventrouter.js');
-
- let errorMessage;
- if(!message) {
- // error
- errorMessage = `Message body for topic ${topic} is null or empty`;
- logger.log('warn', `Return error - ${errorMessage}`);
- cb(errorMessage, false);
- return;
- }
-
- if(!('workflow_id' in message)) {
- // error
- errorMessage = `workflow_id field is not in message body - ${message}`;
- logger.log('warn', `Return error - ${errorMessage}`);
- cb(errorMessage, false);
- return;
- }
-
- if(!('workflow_run_id' in message)) {
- // error
- errorMessage = `workflow_run_id field is not in message body - ${message}`;
- logger.log('warn', `Return error - ${errorMessage}`);
- cb(errorMessage, false);
- return;
- }
-
- if(!('task_id' in message)) {
- // error
- errorMessage = `task_id field is not in message body - ${message}`;
- logger.log('warn', `Return error - ${errorMessage}`);
- cb(errorMessage, false);
- return;
- }
-
- if(!('status' in message)) {
- // error
- errorMessage = `status field is not in message body - ${message}`;
- logger.log('warn', `Return error - ${errorMessage}`);
- cb(errorMessage, false);
- return;
- }
-
- let result = eventrouter.updateWorkflowRunStatus(
- message.workflow_run_id,
- message.task_id,
- message.status.toLowerCase()
- );
- if(!result) {
- errorMessage = `failed to update workflow run status ${message.workflow_run_id}`;
- cb(errorMessage, false);
- }
- else {
- cb(null, true);
- }
- return;
- };
-
// WebSocket interface for counting queued events
// Message format:
// {
@@ -220,10 +148,6 @@
const getRouter = () => {
return {
- updateWorkflowRunStatus: {
- topic: serviceEvents.WORKFLOW_RUN_UPDATE_STATUS,
- handler: updateWorkflowRunStatus
- },
countQueuedEvents: {
topic: serviceEvents.WORKFLOW_RUN_COUNT_EVENTS,
handler: countQueuedEvents
diff --git a/src/types/workflow.js b/src/types/workflow.js
index 11ace2f..8ced05b 100644
--- a/src/types/workflow.js
+++ b/src/types/workflow.js
@@ -121,7 +121,6 @@
getTopics() {
let allTopics = [];
_.forOwn(this.topics, (_tasks, topic) => {
- // value is an array
if(!allTopics.includes(topic)) {
allTopics.push(topic);
}
@@ -129,6 +128,20 @@
return allTopics;
}
+ isEventAcceptable(topic) {
+ for(let key in this.topics) {
+ if (!this.topics.hasOwnProperty(key)) {
+ continue;
+ }
+
+ if(key === topic) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
getTasksForTopic(topic) {
if(topic in this.topics) {
let workflowTasks = this.topics[topic];
diff --git a/src/types/workflowrun.js b/src/types/workflowrun.js
index 8975490..5666dbb 100644
--- a/src/types/workflowrun.js
+++ b/src/types/workflowrun.js
@@ -20,7 +20,6 @@
const _ = require('lodash');
const dateformat = require('dateformat');
- const WorkflowRunTask = require('./workflowruntask.js');
const logger = require('../config/logger.js');
class WorkflowRun {
@@ -30,11 +29,6 @@
// workflow id
this.workflowId = workflowId;
- // workflow run tasks - for storing status
- // id: task id
- // value : workflow run task obj
- this.runTasks = {};
-
// storing key-field, key-value pairs for <event, workflow run> mapping
// key: topic
// value: [{
@@ -73,11 +67,7 @@
let workflowRun = new WorkflowRun(workflowId, workflowRunId);
let tasks = workflow.getTasks();
- _.forOwn(tasks, (task, taskId) => {
- // set run tasks
- let runTask = new WorkflowRunTask.WorkflowRunTask(taskId);
- workflowRun.addRunTask(runTask);
-
+ _.forOwn(tasks, (task, _taskId) => {
// set key_field / value
if(task.isCORDTask()) {
workflowRun.setEventKeyFieldValue(task.getTopic(), task.getKeyField(), null); // init
@@ -102,26 +92,6 @@
return this.workflowId;
}
- addRunTask(runTask) {
- this.runTasks[runTask.getTaskId()] = runTask;
- }
-
- getRunTask(taskId) {
- if(taskId in this.runTasks) {
- return this.runTasks[taskId];
- }
- return undefined;
- }
-
- getTaskStatus(taskId) {
- return this.runTasks[taskId].getStatus();
- }
-
- updateTaskStatus(taskId, status) {
- let runTask = this.runTasks[taskId];
- runTask.setStatus(status);
- }
-
setEventKeyFieldValue(topic, field, value=null) {
let keyFieldValues;
if(!(topic in this.eventKeyFieldValues)) {
@@ -217,113 +187,10 @@
return false;
}
- getFilteredRunTasks(includes, excludes) {
- // returns tasks with filters
- let includeStatuses=[];
- let excludeStatuses=[];
- let includeAll = false;
-
- if(includes) {
- if(Array.isArray(includes)) {
- // array
- includes.forEach((include) => {
- if(!includeStatuses.includes(include)) {
- includeStatuses.push(include);
- }
- });
- }
- else {
- includeStatuses.push(includes);
- }
- }
- else {
- // undefined or null
- // include all
- includeAll = true;
- }
-
- if(excludes) {
- if(Array.isArray(excludes)) {
- // array
- excludes.forEach((exclude) => {
- if(!excludeStatuses.includes(exclude)) {
- excludeStatuses.push(exclude);
- }
- });
- }
- else {
- excludeStatuses.push(excludes);
- }
- }
- else {
- // in this case, nothing will be excluded
- // leave the array empty
- }
-
- let filteredRunTasks = [];
- _.forOwn(this.runTasks, (runTask, _runTaskId) => {
- // 'excludes' has a higher priority than 'includes'
- if(!excludes.includes(runTask.getStatus())) {
- if(includeAll || includes.includes(runTask.getStatus())) {
- // screen tasks that are not finished
- filteredRunTasks.push(runTask);
- }
- }
- });
- return filteredRunTasks;
- }
-
- getFilteredTopics(workflow, includes, excludes) {
- // returns topics with filters
- let filteredRunTasks = this.getFilteredRunTasks(includes, excludes);
- let filteredTopics = [];
-
- filteredRunTasks.forEach((runTask) => {
- let taskId = runTask.getTaskId();
- let task = workflow.getTask(taskId);
- let topic = task.getTopic();
- if(!filteredTopics.includes(topic)) {
- filteredTopics.push(topic);
- }
- });
- return filteredTopics;
- }
-
- getAllTopics(workflow) {
- return this.getFilteredTopics(workflow, null, null);
- }
-
- getAcceptableTopics(workflow) {
- // return topics for tasks that are running or to be run in the future
- // include all tasks that are not ended
- return this.getFilteredTopics(workflow, null, [WorkflowRunTask.TaskStatus.END]);
- }
-
- isTopicAcceptable(workflow, topic) {
- // get topics of tasks that are not completed yet
- let filteredTopics = this.getFilteredTopics(
- workflow,
- null,
- [WorkflowRunTask.TaskStatus.END]
- );
-
- if(filteredTopics.includes(topic)) {
- return true;
- }
- else {
- return false;
- }
- }
-
- isEventAcceptable(workflow, topic, message) {
- // event is acceptable if it meets following criteria
- // 1) the workflow is currently interested in the same topic
- // (finished tasks are not counted)
- // 2) the task's key field and value
- if(this.isTopicAcceptable(workflow, topic) &&
- this.isEventAcceptableByKeyFieldValue(topic, message)) {
- // update key-field values for my topic
- this.updateEventKeyFieldValueFromMessage(topic, message);
+ isEventAcceptable(topic, message) {
+ // event is acceped if event has
+ // the same key field and its value as workflow_run
+ if(this.isEventAcceptableByKeyFieldValue(topic, message)) {
return true;
}
diff --git a/src/types/workflowruntask.js b/src/types/workflowruntask.js
deleted file mode 100644
index fbaa604..0000000
--- a/src/types/workflowruntask.js
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Copyright 2019-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-
-(function () {
- 'use strict';
-
- const logger = require('../config/logger.js');
-
- const TaskStatus = {
- INIT: 'init',
- BEGIN: 'begin',
- END: 'end',
- UNKNOWN: 'unknown'
- };
-
- class WorkflowRunTask {
- constructor(taskId) {
- this.taskId = taskId;
- this.status = TaskStatus.UNKNOWN;
- }
-
- static parseStatus(strTaskStatus) {
- if(!strTaskStatus) {
- return TaskStatus.UNKNOWN;
- }
- else if(['i', 'init'].includes(strTaskStatus.toLowerCase())) {
- return TaskStatus.END;
- }
- else if(['b', 'begin', 'start'].includes(strTaskStatus.toLowerCase())) {
- return TaskStatus.BEGIN;
- }
- else if(['e', 'end', 'finish'].includes(strTaskStatus.toLowerCase())) {
- return TaskStatus.END;
- }
- else {
- return TaskStatus.UNKNOWN;
- }
- }
-
- setTaskId(id) {
- this.taskId = id;
- }
-
- getTaskId() {
- return this.taskId;
- }
-
- setStatus(status) {
- let taskStatus = WorkflowRunTask.parseStatus(status);
- this.status = taskStatus;
- }
-
- getStatus() {
- return this.status;
- }
-
- validate() {
- if(!this.taskId) {
- logger.log('error', 'id is not given');
- return false;
- }
-
- if(!this.status) {
- logger.log('error', 'status is not given');
- return false;
- }
-
- return true;
- }
- }
-
- module.exports = {
- TaskStatus: TaskStatus,
- WorkflowRunTask: WorkflowRunTask
- };
-})();
\ No newline at end of file