blob: acf1e39137c308184d03668f1432c8f98bd3dee6 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17
18(function () {
19 'use strict';
20
21 const path = require('path');
22 const chai = require('chai');
23 const expect = chai.expect;
24 const sinonChai = require('sinon-chai');
25 chai.use(sinonChai);
26 const io = require('socket.io-client');
27 const async = require('async');
28 const _ = require('lodash');
29 const server = require('../src/server.js');
30 const port = 4000;
31 const eventrouter = require('../src/controllers/eventrouter.js');
32 const essenceLoader = require('../src/workflows/loader.js');
33 const essenceFileName = path.join(__dirname, 'test_multi_workflow_essence.json');
34
// Shared test state. The arrays are only ever mutated in place (push /
// `.length = 0`), so they are constants; only the probe handle is reassigned.
let probeClient;
const workflowManagerClients = [];
const workflowRunClients = [];
const workflowIds = [];
const workflowRunInfos = [];

// One message store per workflow manager: store [0] is filled by the first
// manager's kickstart handler, store [1] by the second manager's.
const receivedKickstartMessages = [[], []];
42
43 describe('Workflow kickstart test', function() {
Illyoung Choie3ce4cf2019-06-28 11:07:47 -070044 this.slow(5000);
Illyoung Choi59820ed2019-06-24 17:01:00 -070045
// Suite-level setup: bring up the server under test on the test port.
before(() => {
    server.start(port);
});
50
// Suite-level teardown: shut the server down again.
after(() => {
    server.stop();
});
54
// Per-test setup. Runs four steps in series:
//   1. connect the probe client that the tests use to send events,
//   2. connect the first workflow manager, which also records the
//      workflow/run ids of every kickstart it receives,
//   3. connect the second workflow manager, which only records the message,
//   4. register the test essence through the first manager and wait for the
//      server's reply.
// Both managers answer each kickstart with WORKFLOW_NOTIFY_NEW_RUN after 1s.
beforeEach(function(done) {
    const serverUrl = `http://localhost:${port}`;

    // Connects workflow manager number `managerNo` (1-based), stores every
    // kickstart message in that manager's message store, hands the message to
    // `onKickstart`, and calls `callback` once the socket is connected.
    const connectWorkflowManager = (managerNo, onKickstart, callback) => {
        const workflowManagerClient = io.connect(serverUrl, {
            query: `id=workflow_manager_id${managerNo}&type=workflow_manager` +
                `&name=manager${managerNo}@xos.org`
        });

        workflowManagerClient.on(eventrouter.serviceEvents.WORKFLOW_KICKSTART, (message) => {
            // save it for check
            receivedKickstartMessages[managerNo - 1].push(message);
            onKickstart(message);

            // call back with the new-run notification after a short delay
            setTimeout(() => {
                workflowManagerClient.emit(eventrouter.serviceEvents.WORKFLOW_NOTIFY_NEW_RUN, {
                    workflow_id: message.workflow_id,
                    workflow_run_id: message.workflow_run_id
                });
            }, 1000);
        });

        workflowManagerClient.on('connect', () => {
            callback(null, true);
        });

        workflowManagerClients.push(workflowManagerClient);
    };

    async.series([
        (callback) => {
            // connect a probe to the server
            // to send events for test
            probeClient = io.connect(serverUrl, {
                query: 'id=probe_id&type=probe' +
                    '&name=probe@xos.org'
            });

            probeClient.on('connect', () => {
                callback(null, true);
            });
        },
        (callback) => {
            // first workflow manager: additionally saves workflow_id and
            // workflow_run_id of each kickstart for the assertions and teardown
            connectWorkflowManager(1, (message) => {
                workflowRunInfos.push({
                    workflowId: message.workflow_id,
                    workflowRunId: message.workflow_run_id
                });
            }, callback);
        },
        (callback) => {
            // second workflow manager: records the kickstart message only
            connectWorkflowManager(2, () => {}, callback);
        },
        (callback) => {
            const essence = essenceLoader.loadEssence(essenceFileName, true);
            const registerEvent = eventrouter.serviceEvents.WORKFLOW_REGISTER_ESSENCE;

            // listen for the reply before the request is sent
            workflowManagerClients[0].on(registerEvent, (workflowRegResult) => {
                callback(null, workflowRegResult);
            });

            // the keys of the essence are used as the workflow ids
            _.forOwn(essence, (_value, workflowId) => {
                workflowIds.push(workflowId);
            });

            // register the workflow
            workflowManagerClients[0].emit(registerEvent, {
                essence: essence
            });
        }
    ],
    (err, results) => {
        // Mocha's `done` has no `.fail()` (that is Jasmine's API); a failure is
        // reported by passing an Error to `done`. An error raised by any step
        // must fail the hook as well, not only a literal `false` result.
        if (err || results.includes(false)) {
            done(err || new Error('test setup failed: a setup step reported false'));
            return;
        }
        done();
    });
});
165
// Per-test teardown: ask the server (through the first manager) to remove the
// workflow runs and workflows recorded during the test, empty the shared
// bookkeeping arrays, and disconnect every client that is still connected.
afterEach(function() {
    // NOTE(review): the event names here come from `server.serviceEvents`,
    // while the rest of this file uses `eventrouter.serviceEvents` — confirm
    // that both expose WORKFLOW_REMOVE_RUN / WORKFLOW_REMOVE.

    // remove workflow runs
    for (const runInfo of workflowRunInfos) {
        workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_REMOVE_RUN, {
            workflow_id: runInfo.workflowId,
            workflow_run_id: runInfo.workflowRunId
        });
    }
    workflowRunInfos.length = 0;

    // remove workflows
    for (const registeredId of workflowIds) {
        workflowManagerClients[0].emit(server.serviceEvents.WORKFLOW_REMOVE, {
            workflow_id: registeredId
        });
    }
    workflowIds.length = 0;

    // empty each message store in place, keeping the stores themselves
    for (const messageStore of receivedKickstartMessages) {
        messageStore.length = 0;
    }

    // disconnect clients
    for (const managerClient of workflowManagerClients) {
        if (managerClient.connected) {
            managerClient.disconnect();
        }
    }
    workflowManagerClients.length = 0;

    for (const runClient of workflowRunClients) {
        if (runClient.connected) {
            runClient.disconnect();
        }
    }
    workflowRunClients.length = 0;

    if (probeClient.connected) {
        probeClient.disconnect();
    }
    probeClient = null;
});
209
// WORKFLOW_LIST must report exactly two workflows, each of which is one of the
// ids collected while the essence was registered.
it('should have two workflows', function(done) {
    const firstManager = workflowManagerClients[0];
    const listEvent = eventrouter.serviceEvents.WORKFLOW_LIST;

    firstManager.on(listEvent, (reply) => {
        const listedIds = reply.result;
        expect(listedIds.length).to.equal(2);
        listedIds.forEach((listedId) => {
            expect(workflowIds).to.includes(listedId);
        });
        done();
    });

    firstManager.emit(listEvent, {});
});
222
223 it('all managers should receive kickstart messages', function(done) {
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700224 this.timeout(5000);
225
Illyoung Choi59820ed2019-06-24 17:01:00 -0700226 // kickstart the workflow
227 probeClient.emit('onu.events', {serialNumber: 'testSerialXXX', other: 'test_other_field'});
228 setTimeout(() => {
229 expect(receivedKickstartMessages.length, 'num of message stores').to.equal(2);
230 receivedKickstartMessages.forEach((receivedKickstartMessageStore) => {
231 expect(receivedKickstartMessageStore.length, 'num of messages in a store').to.equal(1);
232 });
233 done();
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700234 }, 2000);
Illyoung Choi59820ed2019-06-24 17:01:00 -0700235 });
236
// One 'onu.events' event must produce a single workflow run, and that run must
// belong to the workflow named 'should_be_called'.
it('should have only one workflow run', function(done) {
    this.timeout(5000);

    const verifyRuns = () => {
        // kickstart will take 2 seconds roughly
        expect(workflowRunInfos.length, 'num of workflow runs').to.equal(1);
        // the workflow must be 'should_be_called'
        const [onlyRun] = workflowRunInfos;
        expect(onlyRun.workflowId, 'workflow id').to.equal('should_be_called');
        done();
    };

    // kickstart the workflow
    const onuEvent = {serialNumber: 'testSerialXXX', other: 'test_other_field'};
    probeClient.emit('onu.events', onuEvent);
    setTimeout(verifyRuns, 2000);
});
250
// A workflow-run client attached to the kickstarted run must be able to fetch
// the 'onu.events' event that was sent by the probe.
it('should read an event that is used for workflow kickstart', function(done) {
    this.timeout(5000);

    const fetchEvent = eventrouter.serviceEvents.WORKFLOW_RUN_FETCH_EVENT;
    // read lazily, exactly when a handler needs it
    const firstRun = () => workflowRunInfos[0];

    const afterKickstart = () => {
        // kickstart will take 2 seconds roughly
        expect(workflowRunInfos.length, 'num of workflow runs').to.equal(1);
        // the workflow must be 'should_be_called'
        expect(firstRun().workflowId, 'workflow id').to.equal('should_be_called');

        // connect a workflow run client to the server
        const runClient = io.connect(`http://localhost:${port}`, {
            query: 'id=workflow_run_id1&type=workflow_run' +
                `&workflow_id=${firstRun().workflowId}` +
                `&workflow_run_id=${firstRun().workflowRunId}` +
                '&name=run1@xos.org'
        });
        workflowRunClients.push(runClient);

        // once connected, ask for the event queued for the onu event handler
        runClient.on('connect', () => {
            runClient.emit(fetchEvent, {
                workflow_id: firstRun().workflowId,
                workflow_run_id: firstRun().workflowRunId,
                task_id: 'onu_event_handler',
                topic: 'onu.events'
            });
        });

        runClient.on(fetchEvent, (reply) => {
            const fetched = reply.result;
            expect(fetched.topic).to.equal('onu.events');
            expect(fetched.message.serialNumber).to.equal('testSerialXXX');
            done();
        });
    };

    // kickstart the workflow
    probeClient.emit('onu.events', {serialNumber: 'testSerialXXX', other: 'test_other_field'});
    setTimeout(afterKickstart, 2000);
});
288
// Sends an 'onu.events' event and a datamodel update event carrying the same
// serialNumber, then checks that a single workflow run exists, that two events
// are queued on it, and that both can be fetched by topic.
// NOTE(review): presumably serialNumber is the key field that maps both events
// to the same run — confirm against the essence file.
it('should map a workflow run using key-field', function(done) {
    this.timeout(5000);

    const countEvent = eventrouter.serviceEvents.WORKFLOW_RUN_COUNT_EVENTS;
    const fetchEvent = eventrouter.serviceEvents.WORKFLOW_RUN_FETCH_EVENT;

    // fetch requests issued after the count check, in this order
    const fetchRequests = [
        {task_id: 'onu_event_handler', topic: 'onu.events'},
        {task_id: 'onu_model_event_handler', topic: 'datamodel.AttWorkflowDriverServiceInstance'}
    ];

    const afterKickstart = () => {
        // kickstart will take 2 seconds roughly
        expect(workflowRunInfos.length, 'num of workflow runs').to.equal(1);
        // the workflow must be 'should_be_called'
        expect(workflowRunInfos[0].workflowId, 'workflow id').to.equal('should_be_called');

        // connect a workflow run client to the server
        const runClient = io.connect(`http://localhost:${port}`, {
            query: 'id=workflow_run_id1&type=workflow_run' +
                `&workflow_id=${workflowRunInfos[0].workflowId}` +
                `&workflow_run_id=${workflowRunInfos[0].workflowRunId}` +
                '&name=run1@xos.org'
        });
        workflowRunClients.push(runClient);

        runClient.on('connect', () => {
            // check message counts
            runClient.emit(countEvent, {
                workflow_id: workflowRunInfos[0].workflowId,
                workflow_run_id: workflowRunInfos[0].workflowRunId
            });
        });

        // number of fetch requests whose reply has not been handled yet
        let pendingFetches = 0;
        runClient.on(countEvent, (reply) => {
            const queued = reply.result;
            expect(queued, 'number of events queued').to.equal(2);

            // fetch two events
            fetchRequests.forEach((request) => {
                runClient.emit(fetchEvent, {
                    workflow_id: workflowRunInfos[0].workflowId,
                    workflow_run_id: workflowRunInfos[0].workflowRunId,
                    task_id: request.task_id,
                    topic: request.topic
                });
                pendingFetches++;
            });
        });

        runClient.on(fetchEvent, (reply) => {
            const fetched = reply.result;
            // the first reply (2 pending) is checked as the onu event, the
            // second reply (1 pending) as the datamodel event
            switch (pendingFetches) {
            case 2:
                expect(fetched.topic).to.equal('onu.events');
                expect(fetched.message.serialNumber).to.equal('testSerialXXX');
                break;
            case 1:
                expect(fetched.topic).to.equal('datamodel.AttWorkflowDriverServiceInstance');
                expect(fetched.message.serialNumber).to.equal('testSerialXXX');
                break;
            default:
                break;
            }
            pendingFetches--;

            if (pendingFetches === 0) {
                done();
            }
        });
    };

    // kickstart the workflow
    probeClient.emit(
        'onu.events',
        {serialNumber: 'testSerialXXX', other: 'test_other_field'}
    );
    probeClient.emit(
        'datamodel.AttWorkflowDriverServiceInstance',
        {operation: 'update', serialNumber: 'testSerialXXX', other: 'updated_test_other_field'}
    );
    setTimeout(afterKickstart, 2000);
});
Illyoung Choi59820ed2019-06-24 17:01:00 -0700365 });
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700366})();