blob: 548bc48beae8711ed5868462760b6f113a1a1015 [file] [log] [blame]
Illyoung Choi59820ed2019-06-24 17:01:00 -07001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17
18(function () {
19 'use strict';
20
21 const _ = require('lodash');
22 const logger = require('../config/logger.js');
23 const Client = require('../types/client.js');
24 const WorkflowRun = require('../types/workflowrun.js');
Illyoung Choi16c6d4f2019-07-24 18:09:26 -070025 const ws_probe = require('./ws_probe.js');
Illyoung Choi59820ed2019-06-24 17:01:00 -070026 const ws_manager = require('./ws_manager.js');
27 const ws_workflowrun = require('./ws_workflowrun.js');
28
// Connected clients keyed by client id. allClients holds publishers and
// subscribers alike; each of the three maps below is a subset of it.
const allClients = {};
const probeClients = {};
const workflowManagerClients = {};
const workflowRunClients = {};

//let io;

// key: workflow id
// value: Workflow instance
const workflows = {};

// key: workflow run id
// value: WorkflowRun instance
const workflowRuns = {};

// Names of the service events known to this module.
const serviceEvents = {
    GREETING: 'cord.workflow.ctlsvc.greeting'
};
47
// Every 5000 ms, build one status request per managed workflow run and hand
// the whole batch to checkWorkflowRunStatusBulk.
setInterval(() => {
    const requests = Object.keys(workflowRuns).map((workflowRunId) => {
        return {
            workflow_id: workflowRuns[workflowRunId].getWorkflowId(),
            workflow_run_id: workflowRunId
        };
    });

    checkWorkflowRunStatusBulk(requests);
}, 5000);
60
// Copy the service events declared by ws_probe, ws_manager and ws_workflowrun
// (in that order) into serviceEvents; on a key clash the later module wins.
for (const wsModule of [ws_probe, ws_manager, ws_workflowrun]) {
    _.forOwn(wsModule.serviceEvents, (wsServiceEvent, key) => {
        serviceEvents[key] = wsServiceEvent;
    });
}
75
76 //const setIO = (ioInstance) => {
77 // io = ioInstance;
78 //};
79
// Returns true only for values whose Object.prototype.toString tag is
// '[object Object]' (so arrays, null, dates and primitives yield false).
const checkObject = (obj) => Object.prototype.toString.call(obj) === '[object Object]';
83
// Tears down module state by calling removeClients, clearWorkflowRuns and
// clearWorkflows, in that order.
const destroy = () => {
    for (const teardown of [removeClients, clearWorkflowRuns, clearWorkflows]) {
        teardown();
    }
};
89
// Returns a new array holding the id of every registered workflow.
const listWorkflows = () => Object.keys(workflows);
97
// Returns true when a workflow is registered under workflowId, false otherwise.
const checkWorkflow = (workflowId) => workflowId in workflows;
104
// Registers a workflow under the id reported by workflow.getId().
// Returns true on success; logs an error and returns false when that id is
// already registered (the existing entry is left untouched).
const addWorkflow = (workflow) => {
    const id = workflow.getId();

    if(!(id in workflows)) {
        workflows[id] = workflow;
        return true;
    }

    logger.log('error', `there exists a workflow with the same id - ${id}`);
    return false;
};
115
// Looks up a registered workflow by id.
// Returns the Workflow instance stored under workflowId, or null (after
// logging a warning) when no workflow is registered under that id.
const getWorkflow = (workflowId) => {
    // The membership test must be negated: without the negation every known
    // id yielded null and every unknown id yielded undefined.
    if(!(workflowId in workflows)) {
        logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
        return null;
    }

    return workflows[workflowId];
};
124
// Empties the workflow registry in place (the workflows object itself is kept).
const clearWorkflows = () => {
    Object.keys(workflows).forEach((workflowId) => {
        delete workflows[workflowId];
    });
};
130
// Returns a new array holding the id of every managed workflow run.
const listWorkflowRuns = () => Object.keys(workflowRuns);
138
// Returns true when a workflow run is managed under workflowRunId, false otherwise.
const checkWorkflowRun = (workflowRunId) => workflowRunId in workflowRuns;
145
// Puts a workflow run under management, keyed by workflowRun.getId().
// Returns false (with a warning) when a run with that id already exists or
// when the run's workflow is not registered; returns true once stored.
const addWorkflowRun = (workflowRun) => {
    const parentWorkflowId = workflowRun.getWorkflowId();
    const runId = workflowRun.getId();

    const duplicate = runId in workflowRuns;
    if(duplicate) {
        logger.log('warn', `there exists a workflow run with the same id - ${runId}`);
        return false;
    }

    const parentKnown = parentWorkflowId in workflows;
    if(!parentKnown) {
        logger.log('warn', `cannot find a workflow with id - ${parentWorkflowId}`);
        return false;
    }

    workflowRuns[runId] = workflowRun;
    return true;
};
163
// Looks up a managed workflow run by id.
// Returns the WorkflowRun instance stored under workflowRunId, or null (after
// logging a warning) when no run is managed under that id.
const getWorkflowRun = (workflowRunId) => {
    // The membership test must be negated: without the negation every known
    // id yielded null and every unknown id yielded undefined.
    if(!(workflowRunId in workflowRuns)) {
        logger.log('warn', `cannot find a workflow run with id - ${workflowRunId}`);
        return null;
    }

    return workflowRuns[workflowRunId];
};
172
// Empties the workflow-run registry in place (the workflowRuns object itself is kept).
const clearWorkflowRuns = () => {
    Object.keys(workflowRuns).forEach((workflowRunId) => {
        delete workflowRuns[workflowRunId];
    });
};
178
// Calls setKickstarted() on the managed workflow run with the given id.
// Returns true when the run exists; logs a warning and returns false otherwise.
const setWorkflowRunKickstarted = (workflowRunId) => {
    const known = workflowRunId in workflowRuns;
    if(!known) {
        logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
        return false;
    }

    workflowRuns[workflowRunId].setKickstarted();
    return true;
};
189
// Reacts to a status report for a managed workflow run.
// Returns false (with a warning) when the run is unknown. Otherwise returns
// true, and when status is one of 'success', 'failed' or 'end' the run is
// dropped via removeWorkflowRun. Any other status is accepted but not stored.
const setWorkflowRunStatus = (workflowRunId, status) => {
    if(!(workflowRunId in workflowRuns)) {
        logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
        return false;
    }

    // `status in [...]` would test the array's indices (0, 1, 2, 'length'),
    // never its values, so terminal statuses must be matched with includes().
    if(['success', 'failed', 'end'].includes(status)) {
        removeWorkflowRun(workflowRunId);
    }
    return true;
};
201
// Asks ws_manager to kickstart a workflow run.
// Both the workflow and the workflow run must already be known here; when
// either is missing a warning is logged and false is returned. Returns true
// after ws_manager.kickstartWorkflow has been called.
const kickstart = (workflowId, workflowRunId) => {
    const workflowKnown = workflowId in workflows;
    if(!workflowKnown) {
        logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
        return false;
    }

    const runKnown = workflowRunId in workflowRuns;
    if(!runKnown) {
        logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
        return false;
    }

    ws_manager.kickstartWorkflow(workflowId, workflowRunId);
    return true;
};
216
Illyoung Choiab109032019-07-29 14:04:10 -0700217 /*
218 const checkWorkflowRunStatus = (workflowId, workflowRunId) => {
219 if(!(workflowId in workflows)) {
220 logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
221 return false;
222 }
223
224 if(!(workflowRunId in workflowRuns)) {
225 logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
226 return false;
227 }
228
229 ws_manager.checkWorkflowRunStatus(workflowId, workflowRunId);
230 return true;
231 };
232 */
233
// Forwards a batch of status requests to ws_manager.checkWorkflowRunStatusBulk.
// Returns false without forwarding when requests is falsy, true otherwise
// (an empty array is truthy and is still forwarded).
const checkWorkflowRunStatusBulk = (requests) => {
    if(!requests) {
        return false;
    }

    ws_manager.checkWorkflowRunStatusBulk(requests);
    return true;
};
241
// Unregisters a workflow.
// Returns false (with a warning) when the id is unknown or when any managed
// workflow run still belongs to the workflow; returns true once removed.
const removeWorkflow = (workflowId) => {
    if(!(workflowId in workflows)) {
        logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
        return false;
    }

    // some() stops at the first run that belongs to this workflow, which a
    // _.forOwn callback could not do while also returning from this function.
    const hasRuns = Object.keys(workflowRuns).some((runId) => {
        return workflowRuns[runId].getWorkflowId() === workflowId;
    });
    if(hasRuns) {
        logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
        return false;
    }

    delete workflows[workflowId];
    return true;
};
272
// Stops managing a workflow run: the run is deleted from workflowRuns and
// then setFinished() is called on it.
// Returns true on success; logs a warning and returns false for an unknown id.
const removeWorkflowRun = (workflowRunId) => {
    const known = workflowRunId in workflowRuns;
    if(!known) {
        logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
        return false;
    }

    const finishedRun = workflowRuns[workflowRunId];
    delete workflowRuns[workflowRunId];
    finishedRun.setFinished();

    return true;
};
285
// Routes an event to workflow runs.
//
// Step 1: for every managed workflow run whose workflow accepts the topic,
//         the run itself is asked whether it accepts (topic, message); if so
//         its key-field value is updated from the message and the event is
//         enqueued on the run.
// Step 2: for every registered workflow that treats the topic as a kickstart
//         topic, a new workflow run is created, registered, given the event,
//         and kickstarted.
//
// topic   - event topic
// message - event payload (serialized with JSON.stringify for logging)
// Always returns true.
const emitEvent = (topic, message) => {
    const serialized = JSON.stringify(message);
    logger.log('debug', `event is raised : topic ${topic}, message ${serialized}`);

    // route event to running instances
    _.forOwn(workflowRuns, (workflowRun, workflowRunId) => {
        const workflowId = workflowRun.getWorkflowId();
        const workflow = workflows[workflowId];

        // The workflow registry can be emptied independently of the runs
        // (clearWorkflows is exported), so skip runs whose workflow is gone
        // rather than throwing on an undefined workflow.
        if(!workflow) {
            logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
            return;
        }

        if(!workflow.isEventAcceptable(topic)) {
            return;
        }

        logger.log('debug', `workflow ${workflowId} accept the event : topic ${topic}`);

        // event is accepted if event has
        // the same key field and its value as workflow_run
        if(workflowRun.isEventAcceptable(topic, message)) {
            // single-line templates: a backslash continuation would embed the
            // source indentation in the logged message
            logger.log('debug', `workflow run ${workflowRunId} accept the event : topic ${topic}, message ${serialized}`);
            workflowRun.updateEventKeyFieldValueFromMessage(topic, message);

            logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
            workflowRun.enqueueEvent(topic, message);
        }
        else {
            logger.log('debug', `workflow run ${workflowRunId} reject the event : topic ${topic}, message ${serialized}`);
        }
    });

    // check if the event is a kickstart event
    _.forOwn(workflows, (workflow, workflowId) => {
        if(!workflow.isKickstartTopic(topic)) {
            return;
        }

        // we need to buffer the event until workflow run is brought up
        const workflowRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
        workflowRun.updateEventKeyFieldValueFromMessage(topic, message);

        const workflowRunId = workflowRun.getId();

        // register for management
        workflowRuns[workflowRunId] = workflowRun;

        // route event
        logger.log('debug', `event ${topic} is routed to a new workflow run ${workflowRunId}`);
        workflowRun.enqueueEvent(topic, message);

        // KICKSTART!
        kickstart(workflowId, workflowRunId);
    });

    return true;
};
337
// Returns lengthEventQueue() of the managed workflow run with the given id,
// or null (after logging a warning) when no such run exists.
const countQueuedEvents = (workflowRunId) => {
    const known = workflowRunId in workflowRuns;
    if(!known) {
        logger.log('warn', `workflow run ${workflowRunId} does not exist`);
        return null;
    }

    return workflowRuns[workflowRunId].lengthEventQueue();
};
348
// Dequeues one event of the given topic from a workflow run on behalf of a task.
// Returns null (with a warning) when the run, its workflow, or the task within
// that workflow cannot be found. Otherwise returns whatever the run's
// dequeueEvent(topic) yields, or an empty object when that value is falsy.
const fetchEvent = (workflowRunId, taskId, topic) => {
    if(!(workflowRunId in workflowRuns)) {
        logger.log('warn', `workflow run ${workflowRunId} does not exist`);
        return null;
    }

    const run = workflowRuns[workflowRunId];
    const workflowId = run.getWorkflowId();

    if(!(workflowId in workflows)) {
        logger.log('warn', `workflow ${workflowId} does not exist`);
        return null;
    }

    const task = workflows[workflowId].getTask(taskId);
    if(!task) {
        logger.log('warn', `workflow ${workflowId} does not have task ${taskId}`);
        return null;
    }

    logger.log('debug', `workflow run ${workflowRunId}, task ${taskId} fetches an event`);

    return run.dequeueEvent(topic) || {};
};
382
// Subscribes a client socket to every route of a ws_* router and relays each
// handler outcome back to the client on the same topic.
//
// Wire protocol shared by probe, manager and workflow-run clients:
//   REQ: topic: operation
//        message: { req_id: <req_id>, <data>... }
//   RES: topic: topic sent
//        message: { req_id: <req_id>, error: <true/false>,
//                   result: <result>, message: <error message> }
//
// socket             - the client's socket (must provide on() and emit())
// router             - object whose values are { topic, handler, return }
// label              - client kind used in the debug log line
// forceFalseOnError  - when true an error reply carries `result: false`;
//                      otherwise it carries the handler's own result
const attachRouter = (socket, router, label, forceFalseOnError) => {
    _.forOwn(router, (routerElem) => {
        socket.on(routerElem.topic, (msg) => {
            logger.log('debug', `received a ${label} event ${routerElem.topic} - ${JSON.stringify(msg)}`);

            // handle a common parameter - req_id
            // when we get req_id, return the same req_id in response.
            // this is to help identify a request from a response at client-side
            let req_id = 101010; // default number, signature
            if(msg && checkObject(msg) && 'req_id' in msg) {
                req_id = msg.req_id;
            }

            routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
                if(err) {
                    logger.log('warn', `unable to handle a message - ${err}`);
                    socket.emit(routerElem.topic, {
                        req_id: req_id,
                        error: true,
                        result: forceFalseOnError ? false : result,
                        message: err
                    });
                    return;
                }

                // we return result, unless the route explicitly opts out
                if(routerElem.return === undefined || routerElem.return) {
                    socket.emit(routerElem.topic, {
                        req_id: req_id,
                        error: false,
                        result: result
                    });
                }
            });
        });
    });
};

// Registers a connected client and wires its socket to the router that
// matches the client's type.
//
// c - client exposing getId(), getSocket(), getType() and, for workflow-run
//     clients, getWorkflowId() and getWorkflowRunId()
//
// Returns true when the client was registered. Returns false when a client
// with the same id already exists, when a workflow-run client names an unknown
// workflow or workflow run, or when the client type is not recognized.
const addClient = (c) => {
    const clientId = c.getId();
    const socket = c.getSocket();

    // check if that client is already there
    if(clientId in allClients) {
        logger.log('warn', `there exists a client with the same id - ${clientId}`);
        return false;
    }

    const type = c.getType();

    if(type === Client.Type.PROBE) {
        allClients[clientId] = c;
        probeClients[clientId] = c;

        // attach probe operations
        attachRouter(socket, ws_probe.getRouter(), 'probe', false);
        return true;
    }

    if(type === Client.Type.WORKFLOW_MANAGER) {
        allClients[clientId] = c;
        workflowManagerClients[clientId] = c;

        // attach manager operations
        attachRouter(socket, ws_manager.getRouter(), 'manager', false);
        return true;
    }

    if(type === Client.Type.WORKFLOW_RUN) {
        // map to WorkflowRun instance
        const workflowId = c.getWorkflowId();
        const workflowRunId = c.getWorkflowRunId();

        if(!(workflowId in workflows)) {
            logger.log('warn', `cannot find a workflow ${workflowId}`);
            return false;
        }

        // the workflow run must already be under management
        if(!(workflowRunId in workflowRuns)) {
            logger.log('warn', `cannot find a workflow run ${workflowRunId}`);
            return false;
        }

        allClients[clientId] = c;
        workflowRunClients[clientId] = c;

        // register client to workflow run
        workflowRuns[workflowRunId].addClientId(clientId);

        // attach workflow run operations; error replies for this client type
        // always report `result: false`
        attachRouter(socket, ws_workflowrun.getRouter(), 'workflow run', true);
        return true;
    }

    logger.log('warn', `unknown client type - ${type}`);
    return false;
};
603
// Unregisters the client with the given id from allClients and from the
// per-type map matching its getType(). For a workflow-run client whose run is
// still managed, the client id is also removed from that run.
// Does nothing when the id is unknown.
const removeClient = (id) => {
    if(!(id in allClients)) {
        return;
    }

    const client = allClients[id];
    delete allClients[id];

    switch(client.getType()) {
        case Client.Type.PROBE:
            delete probeClients[id];
            break;
        case Client.Type.WORKFLOW_MANAGER:
            delete workflowManagerClients[id];
            break;
        case Client.Type.WORKFLOW_RUN: {
            delete workflowRunClients[id];

            const workflowRun = workflowRuns[client.getWorkflowRunId()];
            if(workflowRun) {
                workflowRun.removeClientId(id);

                //TODO
                // WorkflowRun can have no clients between tasks
                // So we should not remove the run until the workflow run finishes
            }
            break;
        }
        default:
            break;
    }
};
632
// Drops every registered client: the three per-type maps are emptied, then
// each client in allClients has its socket disconnected with
// disconnect(true) and is removed from allClients.
const removeClients = () => {
    // No local `probeClients` may be declared here: a local would shadow the
    // module-level map and leave the real probe clients registered.
    _.forOwn(probeClients, (_probeClient, clientId) => {
        delete probeClients[clientId];
    });

    _.forOwn(workflowManagerClients, (_workflowManagerClient, clientId) => {
        delete workflowManagerClients[clientId];
    });

    _.forOwn(workflowRunClients, (_workflowRunClient, clientId) => {
        delete workflowRunClients[clientId];
    });

    _.forOwn(allClients, (client, clientId) => {
        client.getSocket().disconnect(true);
        delete allClients[clientId];
    });
};
653
654 module.exports = {
655 serviceEvents: serviceEvents,
656 destroy: destroy,
657 getClients: () => { return allClients; },
658 getProbeClients: () => { return probeClients; },
659 getWorkflowManagerClients: () => { return workflowManagerClients; },
660 getWorkflowRunClients: () => { return workflowRunClients; },
661 clientType: Client.Type,
662 //setIO: setIO,
Illyoung Choi16c6d4f2019-07-24 18:09:26 -0700663 emitEvent: emitEvent,
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700664 countQueuedEvents: countQueuedEvents,
Illyoung Choi59820ed2019-06-24 17:01:00 -0700665 fetchEvent: fetchEvent,
666 addClient: addClient,
667 removeClient: removeClient,
668 removeClients: removeClients,
669 addWorkflow: addWorkflow,
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700670 getWorkflow: getWorkflow,
Illyoung Choi59820ed2019-06-24 17:01:00 -0700671 listWorkflows: listWorkflows,
672 checkWorkflow: checkWorkflow,
673 removeWorkflow: removeWorkflow,
674 clearWorkflows: clearWorkflows,
675 addWorkflowRun: addWorkflowRun,
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700676 getWorkflowRun: getWorkflowRun,
Illyoung Choi59820ed2019-06-24 17:01:00 -0700677 listWorkflowRuns: listWorkflowRuns,
678 checkWorkflowRun: checkWorkflowRun,
679 removeWorkflowRun: removeWorkflowRun,
680 clearWorkflowRuns: clearWorkflowRuns,
Illyoung Choi59820ed2019-06-24 17:01:00 -0700681 setWorkflowRunKickstarted: setWorkflowRunKickstarted,
Illyoung Choiab109032019-07-29 14:04:10 -0700682 setWorkflowRunStatus: setWorkflowRunStatus
Illyoung Choi59820ed2019-06-24 17:01:00 -0700683 };
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700684})();