blob: 334c08b3c278b8701f0879b2c153ca6449a5f498 [file] [log] [blame]
Illyoung Choi59820ed2019-06-24 17:01:00 -07001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17
18(function () {
19 'use strict';
20
21 const _ = require('lodash');
22 const logger = require('../config/logger.js');
23 const Client = require('../types/client.js');
24 const WorkflowRun = require('../types/workflowrun.js');
Illyoung Choi16c6d4f2019-07-24 18:09:26 -070025 const ws_probe = require('./ws_probe.js');
Illyoung Choi59820ed2019-06-24 17:01:00 -070026 const ws_manager = require('./ws_manager.js');
27 const ws_workflowrun = require('./ws_workflowrun.js');
28
29 let allClients = {}; // has publishers and subscribers
30 let probeClients = {}; // a subset of clients
31 let workflowManagerClients = {}; // a subset of clients
32 let workflowRunClients = {}; // a subset of clients
33
34 //let io;
35
36 // key: workflow id
37 // value: Workflow instance
38 let workflows = {};
39
40 // key: workflow run id
41 // value: WorkflowRun instance
42 let workflowRuns = {};
43
44 let serviceEvents = {
45 GREETING: 'cord.workflow.ctlsvc.greeting'
46 };
47
Illyoung Choiab109032019-07-29 14:04:10 -070048 setInterval(function () {
49 let requests = [];
50 _.forOwn(workflowRuns, (workflowRun, workflowRunId) => {
51 let obj = {
52 workflow_id: workflowRun.getWorkflowId(),
53 workflow_run_id: workflowRunId
54 };
55 requests.push(obj);
56 });
57
58 checkWorkflowRunStatusBulk(requests);
59 }, 5000);
60
Illyoung Choi16c6d4f2019-07-24 18:09:26 -070061 // add ws_probe events
62 _.forOwn(ws_probe.serviceEvents, (wsServiceEvent, key) => {
63 serviceEvents[key] = wsServiceEvent;
64 });
65
66 // add ws_manager events
Illyoung Choi59820ed2019-06-24 17:01:00 -070067 _.forOwn(ws_manager.serviceEvents, (wsServiceEvent, key) => {
68 serviceEvents[key] = wsServiceEvent;
69 });
70
Illyoung Choi16c6d4f2019-07-24 18:09:26 -070071 // add ws_workflowrun events
Illyoung Choi59820ed2019-06-24 17:01:00 -070072 _.forOwn(ws_workflowrun.serviceEvents, (wsServiceEvent, key) => {
73 serviceEvents[key] = wsServiceEvent;
74 });
75
76 //const setIO = (ioInstance) => {
77 // io = ioInstance;
78 //};
79
Illyoung Choie3ce4cf2019-06-28 11:07:47 -070080 const checkObject = (obj) => {
81 return Object.prototype.toString.call(obj) === '[object Object]';
82 };
83
Illyoung Choi59820ed2019-06-24 17:01:00 -070084 const destroy = () => {
85 removeClients();
86 clearWorkflowRuns();
87 clearWorkflows();
88 };
89
90 const listWorkflows = () => {
91 let workflowList = [];
92 _.forOwn(workflows, (_workflow, workflowId) => {
93 workflowList.push(workflowId);
94 });
95 return workflowList;
96 };
97
98 const checkWorkflow = (workflowId) => {
99 if(workflowId in workflows) {
100 return true;
101 }
102 return false;
103 };
104
105 const addWorkflow = (workflow) => {
106 if(workflow.getId() in workflows) {
107 logger.log('error', `there exists a workflow with the same id - ${workflow.getId()}`);
108 return false;
109 }
110
111 let workflowId = workflow.getId();
112 workflows[workflowId] = workflow;
113 return true;
114 };
115
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700116 const getWorkflow = (workflowId) => {
117 if(workflowId in workflows) {
118 logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
119 return null;
120 }
121
122 return workflows[workflowId];
123 };
124
Illyoung Choi59820ed2019-06-24 17:01:00 -0700125 const clearWorkflows = () => {
126 _.forOwn(workflows, (_workflow, workflowId) => {
127 delete workflows[workflowId];
128 });
129 };
130
131 const listWorkflowRuns = () => {
132 let workflowRunList = [];
133 _.forOwn(workflowRuns, (_workflowRun, workflowRunId) => {
134 workflowRunList.push(workflowRunId);
135 });
136 return workflowRunList;
137 };
138
139 const checkWorkflowRun = (workflowRunId) => {
140 if(workflowRunId in workflowRuns) {
141 return true;
142 }
143 return false;
144 };
145
146 const addWorkflowRun = (workflowRun) => {
147 let workflowId = workflowRun.getWorkflowId();
148 let workflowRunId = workflowRun.getId();
149
150 if(workflowRunId in workflowRuns) {
151 logger.log('warn', `there exists a workflow run with the same id - ${workflowRunId}`);
152 return false;
153 }
154
155 if(!(workflowId in workflows)) {
156 logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
157 return false;
158 }
159
160 workflowRuns[workflowRunId] = workflowRun;
161 return true;
162 };
163
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700164 const getWorkflowRun = (workflowRunId) => {
165 if(workflowRunId in workflowRuns) {
166 logger.log('warn', `cannot find a workflow run with id - ${workflowRunId}`);
167 return null;
168 }
169
170 return workflowRuns[workflowRunId];
171 };
172
Illyoung Choi59820ed2019-06-24 17:01:00 -0700173 const clearWorkflowRuns = () => {
174 _.forOwn(workflowRuns, (_workflowRun, workflowRunId) => {
175 delete workflowRuns[workflowRunId];
176 });
177 };
178
179 const updateWorkflowRunStatus = (workflowRunId, taskId, status) => {
180 if(!(workflowRunId in workflowRuns)) {
181 logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
182 return false;
183 }
184
185 let workflowRun = workflowRuns[workflowRunId];
186 workflowRun.updateTaskStatus(taskId, status);
187 return true;
188 };
189
190 const setWorkflowRunKickstarted = (workflowRunId) => {
191 if(!(workflowRunId in workflowRuns)) {
192 logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
193 return false;
194 }
195
196 let workflowRun = workflowRuns[workflowRunId];
197 workflowRun.setKickstarted();
198 return true;
199 };
200
Illyoung Choiab109032019-07-29 14:04:10 -0700201 const setWorkflowRunStatus = (workflowRunId, status) => {
Illyoung Choic707c052019-07-18 13:50:49 -0700202 if(!(workflowRunId in workflowRuns)) {
203 logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
204 return false;
205 }
206
Illyoung Choiab109032019-07-29 14:04:10 -0700207 if(status in ['success', 'failed', 'end']) {
Illyoung Choic707c052019-07-18 13:50:49 -0700208 removeWorkflowRun(workflowRunId);
209 }
210 return true;
211 };
212
Illyoung Choi59820ed2019-06-24 17:01:00 -0700213 const kickstart = (workflowId, workflowRunId) => {
214 if(!(workflowId in workflows)) {
215 logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
216 return false;
217 }
218
219 if(!(workflowRunId in workflowRuns)) {
220 logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
221 return false;
222 }
223
224 ws_manager.kickstartWorkflow(workflowId, workflowRunId);
Illyoung Choic707c052019-07-18 13:50:49 -0700225 return true;
Illyoung Choi59820ed2019-06-24 17:01:00 -0700226 };
227
Illyoung Choiab109032019-07-29 14:04:10 -0700228 /*
229 const checkWorkflowRunStatus = (workflowId, workflowRunId) => {
230 if(!(workflowId in workflows)) {
231 logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
232 return false;
233 }
234
235 if(!(workflowRunId in workflowRuns)) {
236 logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
237 return false;
238 }
239
240 ws_manager.checkWorkflowRunStatus(workflowId, workflowRunId);
241 return true;
242 };
243 */
244
245 const checkWorkflowRunStatusBulk = (requests) => {
246 if(requests) {
247 ws_manager.checkWorkflowRunStatusBulk(requests);
248 return true;
249 }
250 return false;
251 };
252
Illyoung Choi59820ed2019-06-24 17:01:00 -0700253 const removeWorkflow = (workflowId) => {
254 if(!(workflowId in workflows)) {
255 logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
256 return false;
257 }
258
259 // check if there are workflow runs
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700260 for(let key in workflowRuns) {
261 if (!workflowRuns.hasOwnProperty(key)) {
262 continue;
263 }
264
265 let workflowRun = workflowRuns[key];
Illyoung Choi59820ed2019-06-24 17:01:00 -0700266 if(workflowRun.getWorkflowId() === workflowId) {
267 logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
268 return false;
269 }
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700270 }
271
272 // we don't use below code becuase it cannot properly stop and return value with 'return'
273 // _.forOwn(workflowRuns, (workflowRun, _workflowRunId) => {
274 // if(workflowRun.getWorkflowId() === workflowId) {
275 // logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
276 // return false;
277 // }
278 // });
Illyoung Choi59820ed2019-06-24 17:01:00 -0700279
280 delete workflows[workflowId];
281 return true;
282 };
283
284 const removeWorkflowRun = (workflowRunId) => {
285 if(!(workflowRunId in workflowRuns)) {
286 logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
287 return false;
288 }
289
290 let workflowRun = workflowRuns[workflowRunId];
291 delete workflowRuns[workflowRunId];
292
293 workflowRun.setFinished();
294 return true;
295 };
296
Illyoung Choi16c6d4f2019-07-24 18:09:26 -0700297 const emitEvent = (topic, message) => {
Illyoung Choi59820ed2019-06-24 17:01:00 -0700298 // list of workflowIds
299 // to check if there are workflow runs for the events
300 let workflowIdsRunning = [];
301
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700302 logger.log('debug', `event is raised : topic ${topic}, message ${JSON.stringify(message)}`);
303
Illyoung Choi59820ed2019-06-24 17:01:00 -0700304 // route event to running instances
305 _.forOwn(workflowRuns, (workflowRun, workflowRunId) => {
306 let workflowId = workflowRun.getWorkflowId();
307 let workflow = workflows[workflowId];
308
309 // event will be routed to workflow runs that meet following criteria
310 // 1) the workflow is currently interested in the same topic
311 // (already finished tasks are not counted)
312 // 2) the task's key field and value
313 if(workflowRun.isEventAcceptable(workflow, topic, message)) {
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700314 //console.log(`event ${topic} is routed to workflow run ${workflowRunId}`);
Illyoung Choi59820ed2019-06-24 17:01:00 -0700315 logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
316 workflowRun.enqueueEvent(topic, message);
317
318 if(!workflowIdsRunning.includes(workflowId)) {
319 workflowIdsRunning.push(workflowId);
320 }
321 }
322 });
323
324 // check if the event is a kickstart event
325 _.forOwn(workflows, (workflow, workflowId) => {
326 if(workflow.isKickstartTopic(topic)) {
327 // check if there is a workflow run for the event
328 // kickstart a workflow if there is no workflows runs for the event
329 if(!workflowIdsRunning.includes(workflowId)) {
330 // we need to buffer the event until workflow run is brought up
331 let workflowRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700332 workflowRun.updateEventKeyFieldValueFromMessage(topic, message);
333
Illyoung Choi59820ed2019-06-24 17:01:00 -0700334 let workflowRunId = workflowRun.getId();
335
336 // register for management
337 workflowRuns[workflowRunId] = workflowRun;
338
339 // route event
Illyoung Choid8f79562019-07-25 12:54:55 -0700340 logger.log('debug', `event ${topic} is routed to a new workflow run ${workflowRunId}`);
Illyoung Choi59820ed2019-06-24 17:01:00 -0700341 workflowRun.enqueueEvent(topic, message);
342
343 // KICKSTART!
344 kickstart(workflowId, workflowRunId);
345 }
346 }
347 });
Illyoung Choid8f79562019-07-25 12:54:55 -0700348
349 return true;
Illyoung Choi59820ed2019-06-24 17:01:00 -0700350 };
351
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700352 const countQueuedEvents = (workflowRunId) => {
353 // this counts queued events
354 if(!(workflowRunId in workflowRuns)) {
355 logger.log('warn', `workflow run ${workflowRunId} does not exist`);
356 return null;
357 }
358
359 let workflowRun = workflowRuns[workflowRunId];
360 return workflowRun.lengthEventQueue();
361 };
362
Illyoung Choi59820ed2019-06-24 17:01:00 -0700363 const fetchEvent = (workflowRunId, taskId, topic) => {
364 // this returns an event or an empty obj when there is no message
365 if(!(workflowRunId in workflowRuns)) {
366 logger.log('warn', `workflow run ${workflowRunId} does not exist`);
367 return null;
368 }
369
370 let workflowRun = workflowRuns[workflowRunId];
371 let workflowId = workflowRun.getWorkflowId();
372
373 if(!(workflowId in workflows)) {
374 logger.log('warn', `workflow ${workflowId} does not exist`);
375 return null;
376 }
377
378 let workflow = workflows[workflowId];
379
380 let task = workflow.getTask(taskId);
381 if(!task) {
382 logger.log('warn', `workflow ${workflowId} does not have task ${taskId}`);
383 return null;
384 }
385
386 logger.log('debug', `workflow run ${workflowRunId}, task ${taskId} fetches an event`);
387
388 let event = workflowRun.dequeueEvent(topic);
389 if(event) {
390 return event;
391 }
392 else {
393 return {};
394 }
395 };
396
397 const addClient = (c) => {
398 let clientId = c.getId();
399 let socket = c.getSocket();
400
401 // check id that client is already there
402 if(clientId in allClients) {
403 logger.log('warn', `there exists a client with the same id - ${clientId}`);
404 return false;
405 }
406
407 if(c.getType() === Client.Type.PROBE) {
Illyoung Choi16c6d4f2019-07-24 18:09:26 -0700408 // probe
Illyoung Choi59820ed2019-06-24 17:01:00 -0700409 // probe protocol:
410 // REQ:
Illyoung Choi16c6d4f2019-07-24 18:09:26 -0700411 // topic: operation
412 // message: {
413 // req_id: <req_id>,
414 // topic: <topic>,
415 // message: <data>
416 // }
Illyoung Choi59820ed2019-06-24 17:01:00 -0700417 // RES:
418 // topic: topic sent
Illyoung Choi16c6d4f2019-07-24 18:09:26 -0700419 // message: {
420 // req_id: <req_id>,
421 // error: <true/false>,
422 // result: <true/false>,
423 // message: <error message>
424 // }
Illyoung Choi59820ed2019-06-24 17:01:00 -0700425 allClients[clientId] = c;
426 probeClients[clientId] = c;
427
Illyoung Choi16c6d4f2019-07-24 18:09:26 -0700428 // attach probe operations
429 let router = ws_probe.getRouter();
430 _.forOwn(router, (routerElem, _key) => {
431 socket.on(routerElem.topic, (msg) => {
Illyoung Choiab109032019-07-29 14:04:10 -0700432 logger.log('debug', `received a probe event ${routerElem.topic} - ${JSON.stringify(msg)}`);
433
Illyoung Choi16c6d4f2019-07-24 18:09:26 -0700434 // handle a common parameter - req_id
435 // when we get req_id, return the same req_id in response.
436 // this is to help identify a request from a response at client-side
437 let req_id = 101010; // default number, signiture
438 if(msg && checkObject(msg)) {
439 if('req_id' in msg) {
440 req_id = msg.req_id;
441 }
442 }
Illyoung Choi59820ed2019-06-24 17:01:00 -0700443
Illyoung Choi16c6d4f2019-07-24 18:09:26 -0700444 routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
445 if(err) {
446 logger.log('warn', `unable to handle a message - ${err}`);
447 socket.emit(routerElem.topic, {
448 req_id: req_id,
449 error: true,
450 result: result,
451 message: err
452 });
453 return;
454 }
Illyoung Choi59820ed2019-06-24 17:01:00 -0700455
Illyoung Choi16c6d4f2019-07-24 18:09:26 -0700456 // we return result
457 if(routerElem.return === undefined || routerElem.return) {
458 socket.emit(routerElem.topic, {
459 req_id: req_id,
460 error: false,
461 result: result
462 });
463 }
Illyoung Choi59820ed2019-06-24 17:01:00 -0700464 });
Illyoung Choi16c6d4f2019-07-24 18:09:26 -0700465 });
Illyoung Choi59820ed2019-06-24 17:01:00 -0700466 });
467 return true;
468 }
469 else if(c.getType() === Client.Type.WORKFLOW_MANAGER) {
470 // manager
471 // manager protocol:
472 // REQ:
473 // topic: operation
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700474 // message: {
475 // req_id: <req_id>,
476 // <data>...
477 // }
Illyoung Choi59820ed2019-06-24 17:01:00 -0700478 // RES:
479 // topic: topic sent
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700480 // message: {
481 // req_id: <req_id>,
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700482 // error: <true/false>,
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700483 // result: <true/false>,
484 // message: <error message>
485 // }
Illyoung Choi59820ed2019-06-24 17:01:00 -0700486 allClients[clientId] = c;
487 workflowManagerClients[clientId] = c;
488
489 // attach manager operations
490 let router = ws_manager.getRouter();
491 _.forOwn(router, (routerElem, _key) => {
492 socket.on(routerElem.topic, (msg) => {
Illyoung Choiab109032019-07-29 14:04:10 -0700493 logger.log('debug', `received a manager event ${routerElem.topic} - ${JSON.stringify(msg)}`);
494
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700495 // handle a common parameter - req_id
496 // when we get req_id, return the same req_id in response.
497 // this is to help identify a request from a response at client-side
498 let req_id = 101010; // default number, signiture
499 if(msg && checkObject(msg)) {
500 if('req_id' in msg) {
501 req_id = msg.req_id;
502 }
503 }
504
505 routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
Illyoung Choi59820ed2019-06-24 17:01:00 -0700506 if(err) {
507 logger.log('warn', `unable to handle a message - ${err}`);
508 socket.emit(routerElem.topic, {
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700509 req_id: req_id,
510 error: true,
511 result: result,
Illyoung Choi59820ed2019-06-24 17:01:00 -0700512 message: err
513 });
514 return;
515 }
516
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700517 // we return result
Illyoung Choi59820ed2019-06-24 17:01:00 -0700518 if(routerElem.return === undefined || routerElem.return) {
519 socket.emit(routerElem.topic, {
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700520 req_id: req_id,
521 error: false,
Illyoung Choi59820ed2019-06-24 17:01:00 -0700522 result: result
523 });
524 }
525 });
526 });
527 });
528 return true;
529 }
530 else if(c.getType() === Client.Type.WORKFLOW_RUN) {
531 // workflow run
532 // workflow run protocol:
533 // REQ:
534 // topic: operation
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700535 // message: {
536 // req_id: <req_id>,
537 // <data>...
538 // }
Illyoung Choi59820ed2019-06-24 17:01:00 -0700539 // RES:
540 // topic: topic sent
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700541 // message: {
542 // req_id: <req_id>,
543 // error: <true/false>,
544 // result: <true/false>,
545 // message: <error message>
546 // }
Illyoung Choi59820ed2019-06-24 17:01:00 -0700547
548 // map to WorkflowRun instance
549 let workflowId = c.getWorkflowId();
550 let workflowRunId = c.getWorkflowRunId();
551 let workflowRun;
552
553 if(!(workflowId in workflows)) {
554 logger.log('warn', `cannot find a workflow ${workflowId}`);
555 return false;
556 }
557
558 // register client to workflow run
559 if(!(workflowRunId in workflowRuns)) {
560 // workflow run not exist yet
561 logger.log('warn', `cannot find a workflow run ${workflowRunId}`);
562 return false;
563 }
564
565 //let workflow = workflows[workflowId];
566
567 allClients[clientId] = c;
568 workflowRunClients[clientId] = c;
569
570 // update
571 workflowRun = workflowRuns[workflowRunId];
572 workflowRun.addClientId(clientId);
573
574 // attach workflow run operations
575 let router = ws_workflowrun.getRouter();
576 _.forOwn(router, (routerElem, _key) => {
577 socket.on(routerElem.topic, (msg) => {
Illyoung Choiab109032019-07-29 14:04:10 -0700578 logger.log('debug', `received a workflow run event ${routerElem.topic} - ${JSON.stringify(msg)}`);
579
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700580 // handle a common parameter - req_id
581 // when we get req_id, return the same req_id in response.
582 // this is to help identify a request from a response at client-side
583 let req_id = 101010; // default number, signiture
584 if(msg && checkObject(msg)) {
585 if('req_id' in msg) {
586 req_id = msg.req_id;
587 }
588 }
589
590 routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
Illyoung Choi59820ed2019-06-24 17:01:00 -0700591 if(err) {
592 logger.log('warn', `unable to handle a message - ${err}`);
593 socket.emit(routerElem.topic, {
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700594 req_id: req_id,
595 error: true,
Illyoung Choi59820ed2019-06-24 17:01:00 -0700596 result: false,
597 message: err
598 });
599 return;
600 }
601
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700602 // we return result
Illyoung Choi59820ed2019-06-24 17:01:00 -0700603 if(routerElem.return === undefined || routerElem.return) {
604 socket.emit(routerElem.topic, {
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700605 req_id: req_id,
606 error: false,
Illyoung Choi59820ed2019-06-24 17:01:00 -0700607 result: result
608 });
609 }
610 });
611 });
612 });
613 return true;
614 }
615 return false;
616 };
617
618 const removeClient = (id) => {
619 if(id in allClients) {
620 let removedClient = allClients[id];
621 delete allClients[id];
622
623 let type = removedClient.getType();
624 if(type === Client.Type.PROBE) {
625 delete probeClients[id];
626 }
627 else if(type === Client.Type.WORKFLOW_MANAGER) {
628 delete workflowManagerClients[id];
629 }
630 else if(type === Client.Type.WORKFLOW_RUN) {
631 delete workflowRunClients[id];
632
633 let workflowRunId = removedClient.getWorkflowRunId();
634 let workflowRun = workflowRuns[workflowRunId];
635
636 if(workflowRun) {
637 workflowRun.removeClientId(id);
638
639 //TODO
640 // WorkflowRun can have no clients between tasks
641 // So we should not remove the run until the workflow run finishes
642 }
643 }
644 }
645 };
646
647 const removeClients = () => {
648 let probeClients = {};
649
650 _.forOwn(probeClients, (_probeClient, clientId) => {
651 delete probeClients[clientId];
652 });
653
654 _.forOwn(workflowManagerClients, (_workflowManagerClient, clientId) => {
655 delete workflowManagerClients[clientId];
656 });
657
658 _.forOwn(workflowRunClients, (_workflowRunClients, clientId) => {
659 delete workflowRunClients[clientId];
660 });
661
662 _.forOwn(allClients, (client, clientId) => {
663 client.getSocket().disconnect(true);
664 delete allClients[clientId];
665 });
666 }
667
668 module.exports = {
669 serviceEvents: serviceEvents,
670 destroy: destroy,
671 getClients: () => { return allClients; },
672 getProbeClients: () => { return probeClients; },
673 getWorkflowManagerClients: () => { return workflowManagerClients; },
674 getWorkflowRunClients: () => { return workflowRunClients; },
675 clientType: Client.Type,
676 //setIO: setIO,
Illyoung Choi16c6d4f2019-07-24 18:09:26 -0700677 emitEvent: emitEvent,
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700678 countQueuedEvents: countQueuedEvents,
Illyoung Choi59820ed2019-06-24 17:01:00 -0700679 fetchEvent: fetchEvent,
680 addClient: addClient,
681 removeClient: removeClient,
682 removeClients: removeClients,
683 addWorkflow: addWorkflow,
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700684 getWorkflow: getWorkflow,
Illyoung Choi59820ed2019-06-24 17:01:00 -0700685 listWorkflows: listWorkflows,
686 checkWorkflow: checkWorkflow,
687 removeWorkflow: removeWorkflow,
688 clearWorkflows: clearWorkflows,
689 addWorkflowRun: addWorkflowRun,
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700690 getWorkflowRun: getWorkflowRun,
Illyoung Choi59820ed2019-06-24 17:01:00 -0700691 listWorkflowRuns: listWorkflowRuns,
692 checkWorkflowRun: checkWorkflowRun,
693 removeWorkflowRun: removeWorkflowRun,
694 clearWorkflowRuns: clearWorkflowRuns,
695 updateWorkflowRunStatus: updateWorkflowRunStatus,
696 setWorkflowRunKickstarted: setWorkflowRunKickstarted,
Illyoung Choiab109032019-07-29 14:04:10 -0700697 setWorkflowRunStatus: setWorkflowRunStatus
Illyoung Choi59820ed2019-06-24 17:01:00 -0700698 };
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700699})();