Reflect changes on Airflow sensor/operator and essence
Fix spontaneous failures of test cases due to delayed client disconnection
Rename event '*.notify_*' to '*.report_*'
Add a new function to report status of workflow runs
Bump up version
Change-Id: I4fe25ec504751c6ea7a196c56ee4d157bab35abd
diff --git a/spec/eventrouter.spec.js b/spec/eventrouter.spec.js
index acf1e39..0b2a217 100644
--- a/spec/eventrouter.spec.js
+++ b/spec/eventrouter.spec.js
@@ -40,7 +40,7 @@
var receivedKickstartMessages = [[],[]];
- describe('Workflow kickstart test', function() {
+ describe('Event Router test', function() {
this.slow(5000);
before(function() {
@@ -70,12 +70,12 @@
(callback) => {
// connect first workflow manager to the server
// this manager will kickstart a workflow
- let workflowManagerClient = io.connect(`http://localhost:${port}`, {
+ let workflowManagerClient1 = io.connect(`http://localhost:${port}`, {
query: 'id=workflow_manager_id1&type=workflow_manager' +
'&name=manager1@xos.org'
});
- workflowManagerClient.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
+ workflowManagerClient1.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
// save it for check
receivedKickstartMessages[0].push(message);
@@ -87,45 +87,45 @@
setTimeout(() => {
// call-back
- workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_NOTIFY_NEW_RUN, {
+ workflowManagerClient1.emit(eventrouter.serviceEvents.WORKFLOW_REPORT_NEW_RUN, {
workflow_id: message.workflow_id,
workflow_run_id: message.workflow_run_id
})
}, 1000);
});
- workflowManagerClient.on('connect', () => {
+ workflowManagerClient1.on('connect', () => {
callback(null, true);
});
- workflowManagerClients.push(workflowManagerClient);
+ workflowManagerClients.push(workflowManagerClient1);
return;
},
(callback) => {
// connect second workflow manager to the server
// this manager will not kickstart a workflow
- let workflowManagerClient = io.connect(`http://localhost:${port}`, {
+ let workflowManagerClient2 = io.connect(`http://localhost:${port}`, {
query: 'id=workflow_manager_id2&type=workflow_manager' +
'&name=manager2@xos.org'
});
- workflowManagerClient.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
+ workflowManagerClient2.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
receivedKickstartMessages[1].push(message);
setTimeout(() => {
// call-back
- workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_NOTIFY_NEW_RUN, {
+ workflowManagerClient2.emit(eventrouter.serviceEvents.WORKFLOW_REPORT_NEW_RUN, {
workflow_id: message.workflow_id,
workflow_run_id: message.workflow_run_id
})
}, 1000);
});
- workflowManagerClient.on('connect', () => {
+ workflowManagerClient2.on('connect', () => {
callback(null, true);
});
- workflowManagerClients.push(workflowManagerClient);
+ workflowManagerClients.push(workflowManagerClient2);
return;
},
(callback) => {
@@ -163,7 +163,9 @@
return;
});
- afterEach(function() {
+ afterEach(function(done) {
+ this.timeout(5000);
+
// remove workflow runs
_.forOwn(workflowRunInfos, (workflowRunInfo) => {
workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_REMOVE_RUN, {
@@ -205,6 +207,11 @@
probeClient.disconnect();
}
probeClient = null;
+
+ setTimeout(() => {
+ // this gives enough time to complete disconnection for clients
+ done();
+ }, 2000);
});
it('should have two workflows', function(done) {
diff --git a/spec/test_clients_workflow_essence.json b/spec/test_clients_workflow_essence.json
index 90d8bfc..2d49877 100644
--- a/spec/test_clients_workflow_essence.json
+++ b/spec/test_clients_workflow_essence.json
@@ -18,27 +18,25 @@
},
"tasks": {
"task1": {
- "class": "XOSEventSensor",
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
"dag_id": "test_clients_workflow",
"dag": "dag_test_clients_workflow",
"key_field": "serialNumber",
"local_variable": "task1",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "task1_func",
"task_id": "task1",
"topic": "onu.events"
},
"task2": {
- "class": "XOSModelSensor",
+ "class": "CORDModelSensor",
+ "controller_conn_id": "local_cord_controller",
"dag_id": "test_clients_workflow",
"dag": "dag_test_clients_workflow",
"key_field": "serialNumber",
"local_variable": "task2",
"model_name": "AttWorkflowDriverServiceInstance",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "task2_func",
"task_id": "task2"
}
}
diff --git a/spec/test_multi_workflow_essence.json b/spec/test_multi_workflow_essence.json
index 7d033f5..46b11c1 100644
--- a/spec/test_multi_workflow_essence.json
+++ b/spec/test_multi_workflow_essence.json
@@ -31,31 +31,28 @@
"dag": "dag_must_not_be_called",
"local_variable": "must_not_be_called_handler",
"poke_interval": 5,
- "provide_context": true,
"task_id": "must_not_be_called_handler"
},
"onu_event_handler": {
- "class": "XOSEventSensor",
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
"dag_id": "must_not_be_called",
"dag": "dag_must_not_be_called",
"key_field": "serialNumber",
"local_variable": "onu_event_handler",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "ONU_event",
"task_id": "onu_event_handler",
"topic": "onu.events"
},
"onu_model_event_handler": {
- "class": "XOSModelSensor",
+ "class": "CORDModelSensor",
+ "controller_conn_id": "local_cord_controller",
"dag_id": "must_not_be_called",
"dag": "dag_must_not_be_called",
"key_field": "serialNumber",
"local_variable": "onu_model_event_handler",
"model_name": "AttWorkflowDriverServiceInstance",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "DriverService_event",
"task_id": "onu_model_event_handler"
}
}
@@ -87,27 +84,25 @@
},
"tasks": {
"onu_event_handler": {
- "class": "XOSEventSensor",
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
"dag_id": "must_not_be_called",
"dag": "dag_should_be_called",
"key_field": "serialNumber",
"local_variable": "onu_event_handler",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "ONU_event",
"task_id": "onu_event_handler",
"topic": "onu.events"
},
"onu_model_event_handler": {
- "class": "XOSModelSensor",
+ "class": "CORDModelSensor",
+ "controller_conn_id": "local_cord_controller",
"dag_id": "must_not_be_called",
"dag": "dag_should_be_called",
"key_field": "serialNumber",
"local_variable": "onu_model_event_handler",
"model_name": "AttWorkflowDriverServiceInstance",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "DriverService_event",
"task_id": "onu_model_event_handler"
},
"can_be_stuck_handler": {
@@ -116,7 +111,6 @@
"dag": "dag_should_be_called",
"local_variable": "can_be_stuck_handler",
"poke_interval": 5,
- "provide_context": true,
"task_id": "can_be_stuck_handler"
}
}
diff --git a/spec/clients.spec.js b/spec/websocket_clients.spec.js
similarity index 99%
rename from spec/clients.spec.js
rename to spec/websocket_clients.spec.js
index 1a40975..b1f6333 100644
--- a/spec/clients.spec.js
+++ b/spec/websocket_clients.spec.js
@@ -80,7 +80,7 @@
workflowRunId = message.workflow_run_id;
// call-back
- workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_NOTIFY_NEW_RUN, {
+ workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_REPORT_NEW_RUN, {
workflow_id: workflowId,
workflow_run_id: workflowRunId
})