Add message routing testcases and related bugfixes
- Handle XOS sensor tasks specially to route events correctly
- Add an optional 'req_id' field to the manager request API for client-side request-response mapping
- Fix several bugs related to message routing
- Rename event names for consistency
- Separate the kickstart callback event from the kickstart request
- Shorten the socket.io ping/pong timeout for faster response
- Add a 'dag_id' field to tasks in essences
- Notify workflow run clients of event arrivals so they can get events as soon as possible
- Small code refinements
Change-Id: Ibc4182027eb5e2854f1603e339fffbe76e9ba621
diff --git a/spec/clients.spec.js b/spec/clients.spec.js
index 9996944..1a40975 100644
--- a/spec/clients.spec.js
+++ b/spec/clients.spec.js
@@ -80,7 +80,7 @@
workflowRunId = message.workflow_run_id;
// call-back
- workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_KICKSTART, {
+ workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_NOTIFY_NEW_RUN, {
workflow_id: workflowId,
workflow_run_id: workflowRunId
})
@@ -128,12 +128,12 @@
if(register) {
let essence = essenceLoader.loadEssence(essenceFileName, true);
- workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE, {
+ workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_REGISTER_ESSENCE, {
essence: essence
});
workflowManagerClient.on(
- eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE,
+ eventrouter.serviceEvents.WORKFLOW_REGISTER_ESSENCE,
(workflowRegResult) => {
callback(null, workflowRegResult);
}
@@ -182,7 +182,7 @@
afterEach(function(done) {
// remove workflow run
- workflowManagerClient.emit(server.serviceEvents.WORKFLOW_RUN_REMOVE, {
+ workflowManagerClient.emit(server.serviceEvents.WORKFLOW_REMOVE_RUN, {
workflow_id: workflowId,
workflow_run_id: workflowRunId
});
@@ -325,4 +325,4 @@
}, 100);
});
});
-})();
\ No newline at end of file
+})();
diff --git a/spec/eventrouter.spec.js b/spec/eventrouter.spec.js
index df594fb..acf1e39 100644
--- a/spec/eventrouter.spec.js
+++ b/spec/eventrouter.spec.js
@@ -87,7 +87,7 @@
setTimeout(() => {
// call-back
- workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_KICKSTART, {
+ workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_NOTIFY_NEW_RUN, {
workflow_id: message.workflow_id,
workflow_run_id: message.workflow_run_id
})
@@ -111,6 +111,14 @@
workflowManagerClient.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
receivedKickstartMessages[1].push(message);
+
+ setTimeout(() => {
+ // call-back
+ workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_NOTIFY_NEW_RUN, {
+ workflow_id: message.workflow_id,
+ workflow_run_id: message.workflow_run_id
+ })
+ }, 1000);
});
workflowManagerClient.on('connect', () => {
@@ -124,7 +132,7 @@
let essence = essenceLoader.loadEssence(essenceFileName, true);
// register the workflow
- workflowManagerClients[0].emit(eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE, {
+ workflowManagerClients[0].emit(eventrouter.serviceEvents.WORKFLOW_REGISTER_ESSENCE, {
essence: essence
});
@@ -135,7 +143,7 @@
// handle return
workflowManagerClients[0].on(
- eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE,
+ eventrouter.serviceEvents.WORKFLOW_REGISTER_ESSENCE,
(workflowRegResult) => {
callback(null, workflowRegResult);
}
@@ -158,7 +166,7 @@
afterEach(function() {
// remove workflow runs
_.forOwn(workflowRunInfos, (workflowRunInfo) => {
- workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_RUN_REMOVE, {
+ workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_REMOVE_RUN, {
workflow_id: workflowRunInfo.workflowId,
workflow_run_id: workflowRunInfo.workflowRunId
});
@@ -213,6 +221,8 @@
});
it('all managers should receive kickstart messages', function(done) {
+ this.timeout(5000);
+
// kickstart the workflow
probeClient.emit('onu.events', {serialNumber: 'testSerialXXX', other: 'test_other_field'});
setTimeout(() => {
@@ -221,7 +231,7 @@
expect(receivedKickstartMessageStore.length, 'num of messages in a store').to.equal(1);
});
done();
- }, 1000);
+ }, 2000);
});
it('should have only one workflow run', function(done) {
@@ -353,4 +363,4 @@
}, 2000);
});
});
-})();
\ No newline at end of file
+})();
diff --git a/spec/test_clients_workflow_essence.json b/spec/test_clients_workflow_essence.json
index bd37499..90d8bfc 100644
--- a/spec/test_clients_workflow_essence.json
+++ b/spec/test_clients_workflow_essence.json
@@ -19,6 +19,7 @@
"tasks": {
"task1": {
"class": "XOSEventSensor",
+ "dag_id": "test_clients_workflow",
"dag": "dag_test_clients_workflow",
"key_field": "serialNumber",
"local_variable": "task1",
@@ -30,6 +31,7 @@
},
"task2": {
"class": "XOSModelSensor",
+ "dag_id": "test_clients_workflow",
"dag": "dag_test_clients_workflow",
"key_field": "serialNumber",
"local_variable": "task2",
diff --git a/spec/test_multi_workflow_essence.json b/spec/test_multi_workflow_essence.json
index 572a432..7d033f5 100644
--- a/spec/test_multi_workflow_essence.json
+++ b/spec/test_multi_workflow_essence.json
@@ -27,6 +27,7 @@
"tasks": {
"must_not_be_called_handler": {
"class": "UnknownSensor",
+ "dag_id": "must_not_be_called",
"dag": "dag_must_not_be_called",
"local_variable": "must_not_be_called_handler",
"poke_interval": 5,
@@ -35,6 +36,7 @@
},
"onu_event_handler": {
"class": "XOSEventSensor",
+ "dag_id": "must_not_be_called",
"dag": "dag_must_not_be_called",
"key_field": "serialNumber",
"local_variable": "onu_event_handler",
@@ -46,6 +48,7 @@
},
"onu_model_event_handler": {
"class": "XOSModelSensor",
+ "dag_id": "must_not_be_called",
"dag": "dag_must_not_be_called",
"key_field": "serialNumber",
"local_variable": "onu_model_event_handler",
@@ -85,6 +88,7 @@
"tasks": {
"onu_event_handler": {
"class": "XOSEventSensor",
+ "dag_id": "should_be_called",
"dag": "dag_should_be_called",
"key_field": "serialNumber",
"local_variable": "onu_event_handler",
@@ -96,6 +100,7 @@
},
"onu_model_event_handler": {
"class": "XOSModelSensor",
+ "dag_id": "should_be_called",
"dag": "dag_should_be_called",
"key_field": "serialNumber",
"local_variable": "onu_model_event_handler",
@@ -107,6 +112,7 @@
},
"can_be_stuck_handler": {
"class": "UnknownSensor",
+ "dag_id": "should_be_called",
"dag": "dag_should_be_called",
"local_variable": "can_be_stuck_handler",
"poke_interval": 5,