Reflect changes on Airflow sensor/operator and essence
Fix spontaneous failures of test cases due to delayed client disconnection
Rename event '*.notify_*' to '*.report_*'
Add a new function to report status of workflow runs
Bump up version
Change-Id: I4fe25ec504751c6ea7a196c56ee4d157bab35abd
diff --git a/package.json b/package.json
index 676fcb1..fe064f5 100644
--- a/package.json
+++ b/package.json
@@ -1,6 +1,6 @@
{
"name": "cord_workflow_controller",
- "version": "0.1.2",
+ "version": "0.2.0",
"description": "CORD Workflow Controller",
"main": "src/server.js",
"scripts": {
@@ -45,4 +45,4 @@
"mocha-multi-reporters": "^1.1.7",
"mocha-junit-reporter": "^1.23.0"
}
- }
\ No newline at end of file
+ }
diff --git a/spec/eventrouter.spec.js b/spec/eventrouter.spec.js
index acf1e39..0b2a217 100644
--- a/spec/eventrouter.spec.js
+++ b/spec/eventrouter.spec.js
@@ -40,7 +40,7 @@
var receivedKickstartMessages = [[],[]];
- describe('Workflow kickstart test', function() {
+ describe('Event Router test', function() {
this.slow(5000);
before(function() {
@@ -70,12 +70,12 @@
(callback) => {
// connect first workflow manager to the server
// this manager will kickstart a workflow
- let workflowManagerClient = io.connect(`http://localhost:${port}`, {
+ let workflowManagerClient1 = io.connect(`http://localhost:${port}`, {
query: 'id=workflow_manager_id1&type=workflow_manager' +
'&name=manager1@xos.org'
});
- workflowManagerClient.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
+ workflowManagerClient1.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
// save it for check
receivedKickstartMessages[0].push(message);
@@ -87,45 +87,45 @@
setTimeout(() => {
// call-back
- workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_NOTIFY_NEW_RUN, {
+ workflowManagerClient1.emit(eventrouter.serviceEvents.WORKFLOW_REPORT_NEW_RUN, {
workflow_id: message.workflow_id,
workflow_run_id: message.workflow_run_id
})
}, 1000);
});
- workflowManagerClient.on('connect', () => {
+ workflowManagerClient1.on('connect', () => {
callback(null, true);
});
- workflowManagerClients.push(workflowManagerClient);
+ workflowManagerClients.push(workflowManagerClient1);
return;
},
(callback) => {
// connect second workflow manager to the server
// this manager will not kickstart a workflow
- let workflowManagerClient = io.connect(`http://localhost:${port}`, {
+ let workflowManagerClient2 = io.connect(`http://localhost:${port}`, {
query: 'id=workflow_manager_id2&type=workflow_manager' +
'&name=manager2@xos.org'
});
- workflowManagerClient.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
+ workflowManagerClient2.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
receivedKickstartMessages[1].push(message);
setTimeout(() => {
// call-back
- workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_NOTIFY_NEW_RUN, {
+ workflowManagerClient2.emit(eventrouter.serviceEvents.WORKFLOW_REPORT_NEW_RUN, {
workflow_id: message.workflow_id,
workflow_run_id: message.workflow_run_id
})
}, 1000);
});
- workflowManagerClient.on('connect', () => {
+ workflowManagerClient2.on('connect', () => {
callback(null, true);
});
- workflowManagerClients.push(workflowManagerClient);
+ workflowManagerClients.push(workflowManagerClient2);
return;
},
(callback) => {
@@ -163,7 +163,9 @@
return;
});
- afterEach(function() {
+ afterEach(function(done) {
+ this.timeout(5000);
+
// remove workflow runs
_.forOwn(workflowRunInfos, (workflowRunInfo) => {
workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_REMOVE_RUN, {
@@ -205,6 +207,11 @@
probeClient.disconnect();
}
probeClient = null;
+
+ setTimeout(() => {
+ // this gives enough time to complete disconnection for clients
+ done();
+ }, 2000);
});
it('should have two workflows', function(done) {
diff --git a/spec/test_clients_workflow_essence.json b/spec/test_clients_workflow_essence.json
index 90d8bfc..2d49877 100644
--- a/spec/test_clients_workflow_essence.json
+++ b/spec/test_clients_workflow_essence.json
@@ -18,27 +18,25 @@
},
"tasks": {
"task1": {
- "class": "XOSEventSensor",
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
"dag_id": "test_clients_workflow",
"dag": "dag_test_clients_workflow",
"key_field": "serialNumber",
"local_variable": "task1",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "task1_func",
"task_id": "task1",
"topic": "onu.events"
},
"task2": {
- "class": "XOSModelSensor",
+ "class": "CORDModelSensor",
+ "controller_conn_id": "local_cord_controller",
"dag_id": "test_clients_workflow",
"dag": "dag_test_clients_workflow",
"key_field": "serialNumber",
"local_variable": "task2",
"model_name": "AttWorkflowDriverServiceInstance",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "task2_func",
"task_id": "task2"
}
}
diff --git a/spec/test_multi_workflow_essence.json b/spec/test_multi_workflow_essence.json
index 7d033f5..46b11c1 100644
--- a/spec/test_multi_workflow_essence.json
+++ b/spec/test_multi_workflow_essence.json
@@ -31,31 +31,28 @@
"dag": "dag_must_not_be_called",
"local_variable": "must_not_be_called_handler",
"poke_interval": 5,
- "provide_context": true,
"task_id": "must_not_be_called_handler"
},
"onu_event_handler": {
- "class": "XOSEventSensor",
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
"dag_id": "must_not_be_called",
"dag": "dag_must_not_be_called",
"key_field": "serialNumber",
"local_variable": "onu_event_handler",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "ONU_event",
"task_id": "onu_event_handler",
"topic": "onu.events"
},
"onu_model_event_handler": {
- "class": "XOSModelSensor",
+ "class": "CORDModelSensor",
+ "controller_conn_id": "local_cord_controller",
"dag_id": "must_not_be_called",
"dag": "dag_must_not_be_called",
"key_field": "serialNumber",
"local_variable": "onu_model_event_handler",
"model_name": "AttWorkflowDriverServiceInstance",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "DriverService_event",
"task_id": "onu_model_event_handler"
}
}
@@ -87,27 +84,25 @@
},
"tasks": {
"onu_event_handler": {
- "class": "XOSEventSensor",
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
"dag_id": "must_not_be_called",
"dag": "dag_should_be_called",
"key_field": "serialNumber",
"local_variable": "onu_event_handler",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "ONU_event",
"task_id": "onu_event_handler",
"topic": "onu.events"
},
"onu_model_event_handler": {
- "class": "XOSModelSensor",
+ "class": "CORDModelSensor",
+ "controller_conn_id": "local_cord_controller",
"dag_id": "must_not_be_called",
"dag": "dag_should_be_called",
"key_field": "serialNumber",
"local_variable": "onu_model_event_handler",
"model_name": "AttWorkflowDriverServiceInstance",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "DriverService_event",
"task_id": "onu_model_event_handler"
},
"can_be_stuck_handler": {
@@ -116,7 +111,6 @@
"dag": "dag_should_be_called",
"local_variable": "can_be_stuck_handler",
"poke_interval": 5,
- "provide_context": true,
"task_id": "can_be_stuck_handler"
}
}
diff --git a/spec/clients.spec.js b/spec/websocket_clients.spec.js
similarity index 99%
rename from spec/clients.spec.js
rename to spec/websocket_clients.spec.js
index 1a40975..b1f6333 100644
--- a/spec/clients.spec.js
+++ b/spec/websocket_clients.spec.js
@@ -80,7 +80,7 @@
workflowRunId = message.workflow_run_id;
// call-back
- workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_NOTIFY_NEW_RUN, {
+ workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_REPORT_NEW_RUN, {
workflow_id: workflowId,
workflow_run_id: workflowRunId
})
diff --git a/src/controllers/eventrouter.js b/src/controllers/eventrouter.js
index 975db2e..4919669 100644
--- a/src/controllers/eventrouter.js
+++ b/src/controllers/eventrouter.js
@@ -179,6 +179,18 @@
return true;
};
+ const setWorkflowRunState = (workflowRunId, state) => {
+ if(!(workflowRunId in workflowRuns)) {
+ logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
+ return false;
+ }
+
+ if(state in ['success', 'failed', 'end']) {
+ removeWorkflowRun(workflowRunId);
+ }
+ return true;
+ };
+
const kickstart = (workflowId, workflowRunId) => {
if(!(workflowId in workflows)) {
logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
@@ -191,6 +203,7 @@
}
ws_manager.kickstartWorkflow(workflowId, workflowRunId);
+ return true;
};
const removeWorkflow = (workflowId) => {
@@ -608,5 +621,6 @@
clearWorkflowRuns: clearWorkflowRuns,
updateWorkflowRunStatus: updateWorkflowRunStatus,
setWorkflowRunKickstarted: setWorkflowRunKickstarted,
+ setWorkflowRunState: setWorkflowRunState
};
})();
diff --git a/src/controllers/ws_manager.js b/src/controllers/ws_manager.js
index 3d81bac..752ee88 100644
--- a/src/controllers/ws_manager.js
+++ b/src/controllers/ws_manager.js
@@ -31,9 +31,11 @@
WORKFLOW_CHECK: 'cord.workflow.ctlsvc.workflow.check',
WORKFLOW_REMOVE: 'cord.workflow.ctlsvc.workflow.remove',
WORKFLOW_REMOVE_RUN: 'cord.workflow.ctlsvc.workflow.run.remove',
- WORKFLOW_NOTIFY_NEW_RUN: 'cord.workflow.ctlsvc.workflow.notify_new_run',
+ WORKFLOW_REPORT_NEW_RUN: 'cord.workflow.ctlsvc.workflow.report_new_run',
+ WORKFLOW_REPORT_RUN_STATE: 'cord.workflow.ctlsvc.workflow.report_run_state',
// controller -> manager
- WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart'
+ WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart',
+ WORKFLOW_CHECK_STATE: 'cord.workflow.ctlsvc.workflow.check.state'
};
// WebSocket interface for workflow registration
@@ -233,7 +235,7 @@
let result = eventrouter.removeWorkflow(workflowId);
cb(null, result);
return;
- }
+ };
// WebSocket interface for workflow run removal
// Message format:
@@ -278,19 +280,19 @@
let result = eventrouter.removeWorkflowRun(workflowRunId);
cb(null, result);
return;
- }
+ };
- // WebSocket interface for notifying a new workflow run
+ // WebSocket interface for reporting a new workflow run
// Message format:
// {
- // topic: 'cord.workflow.ctlsvc.workflow.notify_new_run',
+ // topic: 'cord.workflow.ctlsvc.workflow.report_new_run',
// message: {
// req_id: <req_id> // optional
// workflow_id: <workflow_id>,
// workflow_run_id: <workflow_run_id>
// }
// }
- const notifyNewWorkflowRun = (topic, message, cb) => {
+ const reportNewWorkflowRun = (topic, message, cb) => {
const eventrouter = require('./eventrouter.js');
let errorMessage;
@@ -325,6 +327,63 @@
let result = eventrouter.setWorkflowRunKickstarted(workflowRunId);
cb(null, result);
return;
+ };
+
+ // WebSocket interface for reporting workflow run state
+ // Message format:
+ // {
+ // topic: 'cord.workflow.ctlsvc.workflow.report_run_state',
+ // message: {
+ // req_id: <req_id> // optional
+ // workflow_id: <workflow_id>,
+ // workflow_run_id: <workflow_run_id>,
+ // state: one of ['success', 'running', 'failed', 'unknown']
+ // }
+ // }
+ const reportWorkflowRunState = (topic, message, cb) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let errorMessage;
+ if(!message) {
+ // error
+ errorMessage = `Message body for topic ${topic} is null or empty`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('workflow_run_id' in message)) {
+ // error
+ errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ if(!('state' in message)) {
+ // error
+ errorMessage = `field 'state' does not exist in message body - ${JSON.stringify(message)}`;
+ logger.log('warn', `Return error - ${errorMessage}`);
+ cb(errorMessage, false);
+ return;
+ }
+
+ let workflowRunId = message.workflow_run_id;
+ let state = message.state;
+
+ // there must be a workflow matching
+ // set workflow state
+ let result = eventrouter.setWorkflowRunState(workflowRunId, state);
+ cb(null, result);
+ return;
}
const getRouter = () => {
@@ -357,9 +416,13 @@
topic: serviceEvents.WORKFLOW_REMOVE_RUN,
handler: removeWorkflowRun
},
- notifyNewWorkflowRun: {
- topic: serviceEvents.WORKFLOW_NOTIFY_NEW_RUN,
- handler: notifyNewWorkflowRun
+ reportNewWorkflowRun: {
+ topic: serviceEvents.WORKFLOW_REPORT_NEW_RUN,
+ handler: reportNewWorkflowRun
+ },
+ reportWorkflowRunState: {
+ topic: serviceEvents.WORKFLOW_REPORT_RUN_STATE,
+ handler: reportWorkflowRunState
}
};
};
@@ -381,9 +444,26 @@
return;
};
+ const checkWorkflowState = (workflowId, workflowRunId) => {
+ const eventrouter = require('./eventrouter.js');
+
+ let clients = eventrouter.getWorkflowManagerClients();
+ _.forOwn(clients, (client, _clientId) => {
+ let socket = client.getSocket();
+ if(socket) {
+ socket.emit(serviceEvents.WORKFLOW_CHECK_STATE, {
+ workflow_id: workflowId,
+ workflow_run_id: workflowRunId
+ });
+ }
+ });
+ return;
+ };
+
module.exports = {
serviceEvents: serviceEvents,
getRouter: getRouter,
- kickstartWorkflow: kickstartWorkflow
+ kickstartWorkflow: kickstartWorkflow,
+ checkWorkflowState: checkWorkflowState
};
})();
diff --git a/src/types/workflowtask.js b/src/types/workflowtask.js
index 8e18700..9216a92 100644
--- a/src/types/workflowtask.js
+++ b/src/types/workflowtask.js
@@ -21,7 +21,6 @@
const logger = require('../config/logger.js');
const CORD_SENSORS = [
- 'XOSEventSensor', 'XOSModelSensor',
'CORDEventSensor', 'CORDModelSensor'
];
@@ -133,7 +132,7 @@
return false;
}
- // general Airflow operators other than XOS operators don't have these fields.
+ // general Airflow operators other than XOS/CORD operators don't have these fields.
//
// if(!this.topic) {
// logger.log('error', 'topic is not given');
diff --git a/src/workflows/hello_workflow.json b/src/workflows/hello_workflow.json
index 9de71bc..4d86250 100644
--- a/src/workflows/hello_workflow.json
+++ b/src/workflows/hello_workflow.json
@@ -9,14 +9,13 @@
},
"tasks": {
"onu_event_handler": {
- "class": "XOSEventSensor",
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
"dag_id": "hello_workflow",
"dag": "dag_hello",
"key_field": "serialNumber",
"local_variable": "onu_event_handler",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "ONU_event",
"task_id": "onu_event_handler",
"topic": "onu.events"
}