Reflect changes on Airflow sensor/operator and essence
Fix spontaneous failures of test cases due to delayed client disconnection
Rename event '*.notify_*' to '*.report_*'
Add a new function to report status of workflow runs
Bump up version
Change-Id: I4fe25ec504751c6ea7a196c56ee4d157bab35abd
diff --git a/spec/websocket_clients.spec.js b/spec/websocket_clients.spec.js
new file mode 100644
index 0000000..b1f6333
--- /dev/null
+++ b/spec/websocket_clients.spec.js
@@ -0,0 +1,328 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+(function () {
+ 'use strict';
+
+ const path = require('path');
+ const chai = require('chai');
+ const expect = chai.expect;
+ const sinonChai = require('sinon-chai');
+ chai.use(sinonChai);
+ const io = require('socket.io-client');
+ const async = require('async');
+ const _ = require('lodash');
+ const server = require('../src/server.js');
+ const port = 4000;
+ const eventrouter = require('../src/controllers/eventrouter.js');
+ const essenceLoader = require('../src/workflows/loader.js');
+ const essenceFileName = path.join(__dirname, 'test_clients_workflow_essence.json');
+ const workflowIdInEssence = 'test_clients_workflow'
+
+ describe('Simple websocket client test', function() {
+
+ var probeClient;
+ var workflowManagerClient;
+ var workflowRunClient;
+ var workflowId;
+ var workflowRunId;
+
+ this.slow(5000);
+
+ before(function() {
+ // Start our server
+ server.start(port);
+ });
+
+ after(function() {
+ server.stop();
+ });
+
+ beforeEach(function(done) {
+ let workflowCheckResults = [];
+ async.series([
+ (callback) => {
+ // connect a probe to the server
+ // to send events for test
+ probeClient = io.connect(`http://localhost:${port}`, {
+ query: 'id=probe_id&type=probe' +
+ '&name=probe@xos.org'
+ });
+
+ probeClient.on('connect', () => {
+ callback(null, true);
+ });
+ return;
+ },
+ (callback) => {
+ // connect a workflow manager to the server
+ // to register a test workflow
+ workflowManagerClient = io.connect(`http://localhost:${port}`, {
+ query: 'id=workflow_manager_id&type=workflow_manager' +
+ '&name=manager@xos.org'
+ });
+
+ workflowManagerClient.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
+ workflowRunId = message.workflow_run_id;
+
+ // call-back
+ workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_REPORT_NEW_RUN, {
+ workflow_id: workflowId,
+ workflow_run_id: workflowRunId
+ })
+ });
+
+ workflowManagerClient.on('connect', () => {
+ callback(null, true);
+ });
+ return;
+ },
+ (callback) => {
+ // check existance of the workflow
+ let essence = essenceLoader.loadEssence(essenceFileName, true);
+ let workflowCnt=0;
+
+ _.forOwn(essence, (_value, essenceWorkflowId) => {
+ workflowId = essenceWorkflowId; // preseve only the last one for test
+
+ workflowCnt++;
+
+ workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_CHECK, {
+ workflow_id: essenceWorkflowId
+ });
+
+ workflowManagerClient.on(eventrouter.serviceEvents.WORKFLOW_CHECK, (workflowCheckResult) => {
+ workflowCnt--;
+ workflowCheckResults.push(workflowCheckResult.result);
+
+ if(workflowCnt <= 0) {
+ callback(null, workflowCheckResults);
+ }
+ });
+ });
+ return;
+ },
+ (callback) => {
+ // register the workflow
+ let register = false;
+ workflowCheckResults.forEach((workflowCheckResult) => {
+ if(!workflowCheckResult) {
+ register = true;
+ }
+ });
+
+ if(register) {
+ let essence = essenceLoader.loadEssence(essenceFileName, true);
+
+ workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_REGISTER_ESSENCE, {
+ essence: essence
+ });
+
+ workflowManagerClient.on(
+ eventrouter.serviceEvents.WORKFLOW_REGISTER_ESSENCE,
+ (workflowRegResult) => {
+ callback(null, workflowRegResult);
+ }
+ );
+ }
+ else {
+ callback(null, true);
+ }
+ return;
+ },
+ (callback) => {
+ // kickstart the test workflow
+ probeClient.emit('onu.events', {serialNumber: 'testSerialXXX', other: 'test_other_field'});
+ setTimeout(() => {
+ expect(workflowRunId).to.not.be.undefined;
+ callback(null, true);
+ }, 1000);
+ return;
+ },
+ (callback) => {
+ // connect a workflow run client to the server
+ workflowRunClient = io.connect(`http://localhost:${port}`, {
+ query: 'id=workflow_run_id&type=workflow_run' +
+ `&workflow_id=${workflowIdInEssence}&workflow_run_id=${workflowRunId}` +
+ '&name=run@xos.org'
+ });
+
+ // when is connected start testing
+ workflowRunClient.on('connect', () => {
+ callback(null, true);
+ });
+ return;
+ }
+ ],
+ function(err, results) {
+ // we do not actually check results
+ if(results.includes(false)) {
+ done.fail(err);
+ }
+ else {
+ done();
+ }
+ });
+ return;
+ });
+
+ afterEach(function(done) {
+ // remove workflow run
+ workflowManagerClient.emit(server.serviceEvents.WORKFLOW_REMOVE_RUN, {
+ workflow_id: workflowId,
+ workflow_run_id: workflowRunId
+ });
+
+ // remove workflow
+ workflowManagerClient.emit(server.serviceEvents.WORKFLOW_REMOVE, {
+ workflow_id: workflowId
+ });
+
+ workflowId = null;
+ workflowRunId = null;
+
+ // disconnect clients
+ if(workflowManagerClient.connected) {
+ workflowManagerClient.disconnect();
+ }
+ workflowManagerClient = null;
+
+ if(workflowRunClient.connected) {
+ workflowRunClient.disconnect();
+ }
+ workflowRunClient = null;
+
+ if(probeClient.connected) {
+ probeClient.disconnect();
+ }
+ probeClient = null;
+
+ done();
+ });
+
+ it('should have a probe, a workflow manager and a workflow run', function(done) {
+ const eventrouter = require('../src/controllers/eventrouter.js');
+ expect(
+ Object.keys(eventrouter.getWorkflowRunClients()).length,
+ 'num of workflow run clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getWorkflowManagerClients()).length,
+ 'num of workflow manager clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getProbeClients()).length,
+ 'num of probe clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getClients()).length,
+ 'total num of clients'
+ ).to.equal(3);
+
+ expect(
+ 'probe_id' in eventrouter.getClients(),
+ 'a client called prove_id exists'
+ ).to.equal(true);
+ expect(
+ 'workflow_manager_id' in eventrouter.getClients(),
+ 'a client called workflow_manager_id exists'
+ ).to.equal(true);
+ expect(
+ 'workflow_run_id' in eventrouter.getClients(),
+ 'a client called workflow_run_id exists'
+ ).to.equal(true);
+ done();
+ });
+
+ it('should store user details for a new connection', function() {
+ const eventrouter = require('../src/controllers/eventrouter.js');
+
+ const probe = eventrouter.getClients()['probe_id'];
+ expect(probe.getParams().name).to.equal('probe@xos.org');
+
+ const manager = eventrouter.getClients()['workflow_manager_id'];
+ expect(manager.getParams().name).to.equal('manager@xos.org');
+
+ const run = eventrouter.getClients()['workflow_run_id'];
+ expect(run.getParams().name).to.equal('run@xos.org');
+ });
+
+ it('should not store the same user twice', function(done) {
+ // This test case makes cleaning up process taking long time because it leaves
+ // a client socket. It seems there's no way to release it from server-side.
+
+ // connect a client to the server
+ const client2 = io.connect(`http://localhost:${port}`, {
+ query: 'id=probe_id&type=probe' +
+ '&name=probe@xos.org&value=different_value'
+ });
+
+ // when is connected start testing
+ client2.on('connect', () => {
+ setTimeout(() => {
+ const eventrouter = require('../src/controllers/eventrouter.js');
+ expect(
+ Object.keys(eventrouter.getWorkflowRunClients()).length,
+ 'num of workflow run clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getWorkflowManagerClients()).length,
+ 'num of workflow manager clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getProbeClients()).length,
+ 'num of probe clients'
+ ).to.equal(1);
+ expect(
+ Object.keys(eventrouter.getClients()).length,
+ 'total num of clients'
+ ).to.equal(3);
+
+ done();
+ }, 100);
+ });
+ });
+
+ it('should remove a user on disconnect', function(done) {
+ workflowManagerClient.disconnect();
+ workflowRunClient.disconnect();
+ probeClient.disconnect();
+
+ // we need to wait for the event to be dispatched
+ setTimeout(() => {
+ const eventrouter = require('../src/controllers/eventrouter.js');
+ expect(
+ Object.keys(eventrouter.getWorkflowRunClients()).length,
+ 'num of workflow run clients'
+ ).to.equal(0);
+ expect(
+ Object.keys(eventrouter.getWorkflowManagerClients()).length,
+ 'num of workflow manager clients'
+ ).to.equal(0);
+ expect(
+ Object.keys(eventrouter.getProbeClients()).length,
+ 'num of probe clients'
+ ).to.equal(0);
+ expect(
+ Object.keys(eventrouter.getClients()).length,
+ 'total num of clients'
+ ).to.equal(0);
+ done();
+ }, 100);
+ });
+ });
+})();