Add message routing test cases and related bug fixes
- Handle XOS sensor tasks specially to route events correctly
- Add message counting API for use in tests
- Add optional req_id field to manager request API for client-side request-response mapping
- Fix several bugs related to message routing
Change-Id: Ie18cbc63926b352bd7655797655194ece9506c6b
diff --git a/spec/eventrouter.spec.js b/spec/eventrouter.spec.js
index 6184f58..df594fb 100644
--- a/spec/eventrouter.spec.js
+++ b/spec/eventrouter.spec.js
@@ -41,6 +41,7 @@
var receivedKickstartMessages = [[],[]];
describe('Workflow kickstart test', function() {
+ this.slow(5000);
before(function() {
// Start our server
@@ -90,7 +91,7 @@
workflow_id: message.workflow_id,
workflow_run_id: message.workflow_run_id
})
- }, 2000);
+ }, 1000);
});
workflowManagerClient.on('connect', () => {
@@ -123,7 +124,9 @@
let essence = essenceLoader.loadEssence(essenceFileName, true);
// register the workflow
- workflowManagerClients[0].emit(eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE, essence);
+ workflowManagerClients[0].emit(eventrouter.serviceEvents.WORKFLOW_REG_ESSENCE, {
+ essence: essence
+ });
_.forOwn(essence, (_value, workflowId) => {
// save
@@ -164,7 +167,9 @@
// remove workflows
_.forOwn(workflowIds, (workflowId) => {
- workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_REMOVE, workflowId);
+ workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_REMOVE, {
+ workflow_id: workflowId
+ });
});
workflowIds.length = 0;
@@ -216,7 +221,7 @@
expect(receivedKickstartMessageStore.length, 'num of messages in a store').to.equal(1);
});
done();
- }, 500);
+ }, 1000);
});
it('should have only one workflow run', function(done) {
@@ -230,10 +235,10 @@
// the workflow must be 'should_be_called'
expect(workflowRunInfos[0].workflowId, 'workflow id').to.equal('should_be_called');
done();
- }, 3000);
+ }, 2000);
});
- it('should be able to read an event that is used for workflow kickstart', function(done) {
+ it('should read an event that is used for workflow kickstart', function(done) {
this.timeout(5000);
// kickstart the workflow
@@ -268,86 +273,84 @@
expect(event.message.serialNumber).to.equal('testSerialXXX');
done();
});
- }, 3000);
+ }, 2000);
});
- /*
- it('should store user details for a new connection', () => {
- const eventrouter = require('../src/controllers/eventrouter.js');
+ it('should map a workflow run using key-field', function(done) {
+ this.timeout(5000);
- const probe = eventrouter.getClients()['probe_id'];
- expect(probe.getParams().name).to.equal('probe@xos.org');
-
- const manager = eventrouter.getClients()['workflow_manager_id'];
- expect(manager.getParams().name).to.equal('manager@xos.org');
-
- const run = eventrouter.getClients()['workflow_run_id'];
- expect(run.getParams().name).to.equal('run@xos.org');
- });
-
- it('should not store the same user twice', (done) => {
- // This test case makes cleaning up process taking long time because it leaves
- // a client socket. It seems there's no way to release it from server-side.
-
- // connect a client to the server
- const client2 = io.connect(`http://localhost:${port}`, {
- query: 'id=probe_id&type=probe' +
- '&name=probe@xos.org&value=different_value'
- });
-
- // when is connected start testing
- client2.on('connect', () => {
- setTimeout(() => {
- const eventrouter = require('../src/controllers/eventrouter.js');
- expect(
- Object.keys(eventrouter.getWorkflowRunClients()).length,
- 'num of workflow run clients'
- ).to.equal(1);
- expect(
- Object.keys(eventrouter.getWorkflowManagerClients()).length,
- 'num of workflow manager clients'
- ).to.equal(1);
- expect(
- Object.keys(eventrouter.getProbeClients()).length,
- 'num of probe clients'
- ).to.equal(1);
- expect(
- Object.keys(eventrouter.getClients()).length,
- 'total num of clients'
- ).to.equal(3);
-
- done();
- }, 100);
- });
- });
-
- it('should remove a user on disconnect', (done) => {
- workflowManagerClient.disconnect();
- workflowRunClient.disconnect();
- probeClient.disconnect();
-
- // we need to wait for the event to be dispatched
+ // kickstart the workflow
+ probeClient.emit(
+ 'onu.events',
+ {serialNumber: 'testSerialXXX', other: 'test_other_field'}
+ );
+ probeClient.emit(
+ 'datamodel.AttWorkflowDriverServiceInstance',
+ {operation: 'update', serialNumber: 'testSerialXXX', other: 'updated_test_other_field'}
+ );
setTimeout(() => {
- const eventrouter = require('../src/controllers/eventrouter.js');
- expect(
- Object.keys(eventrouter.getWorkflowRunClients()).length,
- 'num of workflow run clients'
- ).to.equal(0);
- expect(
- Object.keys(eventrouter.getWorkflowManagerClients()).length,
- 'num of workflow manager clients'
- ).to.equal(0);
- expect(
- Object.keys(eventrouter.getProbeClients()).length,
- 'num of probe clients'
- ).to.equal(0);
- expect(
- Object.keys(eventrouter.getClients()).length,
- 'total num of clients'
- ).to.equal(0);
- done();
- }, 100);
+ // kickstart will take 2 seconds roughly
+ expect(workflowRunInfos.length, 'num of workflow runs').to.equal(1);
+ // the workflow must be 'should_be_called'
+ expect(workflowRunInfos[0].workflowId, 'workflow id').to.equal('should_be_called');
+
+ // connect a workflow run client to the server
+ let workflowRunClient = io.connect(`http://localhost:${port}`, {
+ query: 'id=workflow_run_id1&type=workflow_run' +
+ `&workflow_id=${workflowRunInfos[0].workflowId}` +
+ `&workflow_run_id=${workflowRunInfos[0].workflowRunId}` +
+ '&name=run1@xos.org'
+ });
+ workflowRunClients.push(workflowRunClient);
+
+ workflowRunClient.on('connect', () => {
+ // check message counts
+ workflowRunClient.emit(eventrouter.serviceEvents.WORKFLOW_RUN_COUNT_EVENTS, {
+ workflow_id: workflowRunInfos[0].workflowId,
+ workflow_run_id: workflowRunInfos[0].workflowRunId
+ });
+ });
+
+ let eventRaised = 0;
+ workflowRunClient.on(eventrouter.serviceEvents.WORKFLOW_RUN_COUNT_EVENTS, (result) => {
+ let count = result.result;
+ expect(count, 'number of events queued').to.equal(2);
+
+ // fetch two events
+ workflowRunClient.emit(eventrouter.serviceEvents.WORKFLOW_RUN_FETCH_EVENT, {
+ workflow_id: workflowRunInfos[0].workflowId,
+ workflow_run_id: workflowRunInfos[0].workflowRunId,
+ task_id: 'onu_event_handler',
+ topic: 'onu.events'
+ });
+ eventRaised++;
+
+ workflowRunClient.emit(eventrouter.serviceEvents.WORKFLOW_RUN_FETCH_EVENT, {
+ workflow_id: workflowRunInfos[0].workflowId,
+ workflow_run_id: workflowRunInfos[0].workflowRunId,
+ task_id: 'onu_model_event_handler',
+ topic: 'datamodel.AttWorkflowDriverServiceInstance'
+ });
+ eventRaised++;
+ });
+
+ workflowRunClient.on(eventrouter.serviceEvents.WORKFLOW_RUN_FETCH_EVENT, (result) => {
+ let event = result.result;
+ if(eventRaised === 2) {
+ expect(event.topic).to.equal('onu.events');
+ expect(event.message.serialNumber).to.equal('testSerialXXX');
+ }
+ else if(eventRaised === 1) {
+ expect(event.topic).to.equal('datamodel.AttWorkflowDriverServiceInstance');
+ expect(event.message.serialNumber).to.equal('testSerialXXX');
+ }
+ eventRaised--;
+
+ if(eventRaised === 0) {
+ done();
+ }
+ });
+ }, 2000);
});
- */
});
})();
\ No newline at end of file