blob: 5469fc25efa66e0deb9ad60fa4fa02f315c95844 [file] [log] [blame]
Illyoung Choi59820ed2019-06-24 17:01:00 -07001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17
18(function () {
19 'use strict';
20
21 const _ = require('lodash');
22 const logger = require('../config/logger.js');
23 const Client = require('../types/client.js');
24 const WorkflowRun = require('../types/workflowrun.js');
Illyoung Choi16c6d4f2019-07-24 18:09:26 -070025 const ws_probe = require('./ws_probe.js');
Illyoung Choi59820ed2019-06-24 17:01:00 -070026 const ws_manager = require('./ws_manager.js');
27 const ws_workflowrun = require('./ws_workflowrun.js');
28
// ---- connected clients, all keyed by client id ----
// every connected client (publishers and subscribers)
const allClients = {};
// the three maps below each hold a subset of allClients, split by client type
const probeClients = {};
const workflowManagerClients = {};
const workflowRunClients = {};

//let io;

// registered workflows
//   key: workflow id
//   value: Workflow instance
const workflows = {};

// workflow runs currently being managed
//   key: workflow run id
//   value: WorkflowRun instance
const workflowRuns = {};

// event names served by this module; the ws_* sub-modules add their own
// entries to this table right after it is created
const serviceEvents = {
    GREETING: 'cord.workflow.ctlsvc.greeting'
};
47
// Every 5 seconds, build one status request per managed workflow run and
// hand the whole batch to checkWorkflowRunStatusBulk.
setInterval(() => {
    const statusRequests = Object.keys(workflowRuns).map((runId) => ({
        workflow_id: workflowRuns[runId].getWorkflowId(),
        workflow_run_id: runId
    }));

    checkWorkflowRunStatusBulk(statusRequests);
}, 5000);
60
// Copy the events advertised by each websocket sub-module into serviceEvents.
// Order matters when keys collide: probe first, then manager, then workflow
// run, so a later module overrides an earlier one.
for (const wsModule of [ws_probe, ws_manager, ws_workflowrun]) {
    _.forOwn(wsModule.serviceEvents, (eventName, eventKey) => {
        serviceEvents[eventKey] = eventName;
    });
}
75
76 //const setIO = (ioInstance) => {
77 // io = ioInstance;
78 //};
79
// Returns true only when Object.prototype.toString tags the value as
// "[object Object]"; arrays, null, functions and primitives yield false.
const checkObject = (obj) => {
    const typeTag = Object.prototype.toString.call(obj);
    return typeTag === '[object Object]';
};
83
// Tears down all module state: clients first, then workflow runs, then
// workflows (same order as the individual calls).
const destroy = () => {
    const teardownSteps = [removeClients, clearWorkflowRuns, clearWorkflows];
    for (const step of teardownSteps) {
        step();
    }
};
89
// Returns a fresh array holding the ids of all registered workflows.
const listWorkflows = () => Object.keys(workflows);
97
// Reports whether a workflow with the given id is registered.
const checkWorkflow = (workflowId) => workflowId in workflows;
104
// Registers a workflow under its own id.
// Returns true when stored; logs an error and returns false when a workflow
// with the same id is already registered (the existing one is kept).
const addWorkflow = (workflow) => {
    const id = workflow.getId();

    if (!(id in workflows)) {
        workflows[id] = workflow;
        return true;
    }

    logger.log('error', `there exists a workflow with the same id - ${id}`);
    return false;
};
115
// Looks up a registered workflow by id.
// Returns the Workflow instance, or null (after logging a warning) when no
// workflow with that id is registered.
const getWorkflow = (workflowId) => {
    // the membership test must be negated: the warning/null path is for
    // ids that are NOT registered
    if (!(workflowId in workflows)) {
        logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
        return null;
    }

    return workflows[workflowId];
};
124
// Empties the workflow registry in place (the object identity is kept).
const clearWorkflows = () => {
    for (const registeredId of Object.keys(workflows)) {
        delete workflows[registeredId];
    }
};
130
// Returns a fresh array holding the ids of all managed workflow runs.
const listWorkflowRuns = () => Object.keys(workflowRuns);
138
// Reports whether a workflow run with the given id is being managed.
const checkWorkflowRun = (workflowRunId) => workflowRunId in workflowRuns;
145
// Registers a workflow run under its own id.
// Rejected (warning logged, false returned) when a run with the same id is
// already managed, or when the run's parent workflow is not registered.
// The duplicate-id check is made before the parent-workflow check.
const addWorkflowRun = (workflowRun) => {
    const parentId = workflowRun.getWorkflowId();
    const runId = workflowRun.getId();

    let rejection = null;
    if (runId in workflowRuns) {
        rejection = `there exists a workflow run with the same id - ${runId}`;
    }
    else if (!(parentId in workflows)) {
        rejection = `cannot find a workflow with id - ${parentId}`;
    }

    if (rejection !== null) {
        logger.log('warn', rejection);
        return false;
    }

    workflowRuns[runId] = workflowRun;
    return true;
};
163
// Looks up a managed workflow run by id.
// Returns the WorkflowRun instance, or null (after logging a warning) when
// no run with that id is being managed.
const getWorkflowRun = (workflowRunId) => {
    // the membership test must be negated: the warning/null path is for
    // ids that are NOT managed
    if (!(workflowRunId in workflowRuns)) {
        logger.log('warn', `cannot find a workflow run with id - ${workflowRunId}`);
        return null;
    }

    return workflowRuns[workflowRunId];
};
172
// Empties the workflow run registry in place (the object identity is kept).
const clearWorkflowRuns = () => {
    for (const managedRunId of Object.keys(workflowRuns)) {
        delete workflowRuns[managedRunId];
    }
};
178
// Marks a managed workflow run as kickstarted.
// Returns true when the run exists; otherwise logs a warning and returns false.
const setWorkflowRunKickstarted = (workflowRunId) => {
    const isManaged = workflowRunId in workflowRuns;

    if (isManaged) {
        workflowRuns[workflowRunId].setKickstarted();
    }
    else {
        logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
    }

    return isManaged;
};
189
// Applies a reported status to a managed workflow run.
// When the status is terminal ('success', 'failed' or 'end') the run is
// removed from management via removeWorkflowRun; any other status leaves
// the run untouched.
// Returns true when the run exists; otherwise logs a warning and returns false.
const setWorkflowRunStatus = (workflowRunId, status) => {
    if (!(workflowRunId in workflowRuns)) {
        logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
        return false;
    }

    // membership must be tested by value; the `in` operator on an array
    // checks indices ('0', '1', '2'), never the status strings themselves
    const terminalStatuses = ['success', 'failed', 'end'];
    if (terminalStatuses.includes(status)) {
        removeWorkflowRun(workflowRunId);
    }
    return true;
};
201
// Asks the workflow manager (ws_manager) to kickstart a workflow run.
// Both the workflow and the run must already be registered; otherwise a
// warning is logged and false is returned without contacting the manager.
// The workflow is checked before the run.
const kickstart = (workflowId, workflowRunId) => {
    let problem = null;
    if (!(workflowId in workflows)) {
        problem = `cannot find a workflow with the id - ${workflowId}`;
    }
    else if (!(workflowRunId in workflowRuns)) {
        problem = `cannot find a workflow run with the id - ${workflowRunId}`;
    }

    if (problem !== null) {
        logger.log('warn', problem);
        return false;
    }

    ws_manager.kickstartWorkflow(workflowId, workflowRunId);
    return true;
};
216
Illyoung Choiab109032019-07-29 14:04:10 -0700217 /*
218 const checkWorkflowRunStatus = (workflowId, workflowRunId) => {
219 if(!(workflowId in workflows)) {
220 logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
221 return false;
222 }
223
224 if(!(workflowRunId in workflowRuns)) {
225 logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
226 return false;
227 }
228
229 ws_manager.checkWorkflowRunStatus(workflowId, workflowRunId);
230 return true;
231 };
232 */
233
// Forwards a batch of status requests to ws_manager.
// Returns false without forwarding when `requests` is falsy; an empty array
// is truthy and is therefore still forwarded.
const checkWorkflowRunStatusBulk = (requests) => {
    if (!requests) {
        return false;
    }

    ws_manager.checkWorkflowRunStatusBulk(requests);
    return true;
};
241
// Unregisters a workflow.
// Refused (warning logged, false returned) when the workflow is unknown or
// when at least one managed workflow run still belongs to it.
const removeWorkflow = (workflowId) => {
    if (!(workflowId in workflows)) {
        logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
        return false;
    }

    // stop scanning at the first run that references this workflow
    const hasActiveRun = Object.values(workflowRuns)
        .some((run) => run.getWorkflowId() === workflowId);
    if (hasActiveRun) {
        logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
        return false;
    }

    delete workflows[workflowId];
    return true;
};
272
// Stops managing a workflow run: it is dropped from the registry first and
// then told it has finished.
// Returns true on success; logs a warning and returns false for unknown ids.
const removeWorkflowRun = (workflowRunId) => {
    const isManaged = workflowRunId in workflowRuns;
    if (!isManaged) {
        logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
        return false;
    }

    const finishedRun = workflowRuns[workflowRunId];
    delete workflowRuns[workflowRunId];
    finishedRun.setFinished();

    return true;
};
285
// Routes an incoming event to workflow runs.
//
// Phase 1: every managed run whose workflow accepts the topic, and which
//          itself accepts the topic/message, gets the event enqueued.
// Phase 2: every workflow that did NOT receive the event through one of its
//          runs in phase 1, and for which the topic is a kickstart topic,
//          gets a brand-new run: the run is registered, the event is
//          buffered in its queue, and the workflow is kickstarted.
//
// topic:   event topic
// message: event payload (logged via JSON.stringify)
// Always returns true.
const emitEvent = (topic, message) => {
    logger.log('debug', `event is raised : topic ${topic}, message ${JSON.stringify(message)}`);

    // ids of workflows that already consumed this event through an existing run
    const runningWorkflows = new Set();

    // route event to running instances
    for (const [workflowRunId, workflowRun] of Object.entries(workflowRuns)) {
        const workflowId = workflowRun.getWorkflowId();
        const workflow = workflows[workflowId];

        if (!workflow) {
            // a run without its workflow cannot be asked about the topic;
            // skip it instead of failing the whole dispatch
            logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
            continue;
        }

        if (!workflow.isEventAcceptable(topic)) {
            continue;
        }
        logger.log('debug', `workflow ${workflowId} accept the event : topic ${topic}`);

        // event is accepted if it has the same key field and value as the workflow run
        if (workflowRun.isEventAcceptable(topic, message)) {
            // log messages are built on one logical line so that no source
            // indentation leaks into the log output
            logger.log('debug',
                `workflow run ${workflowRunId} accept the event : ` +
                `topic ${topic}, message ${JSON.stringify(message)}`);
            workflowRun.updateEventKeyFieldValueFromMessage(topic, message);

            logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
            workflowRun.enqueueEvent(topic, message);

            // mark to not kickstart a new one
            runningWorkflows.add(workflowId);
        }
        else {
            logger.log('debug',
                `workflow run ${workflowRunId} reject the event : ` +
                `topic ${topic}, message ${JSON.stringify(message)}`);
        }
    }

    // check if the event is a kickstart event
    for (const [workflowId, workflow] of Object.entries(workflows)) {
        if (runningWorkflows.has(workflowId) || !workflow.isKickstartTopic(topic)) {
            continue;
        }

        // we need to buffer the event until workflow run is brought up
        const workflowRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
        workflowRun.updateEventKeyFieldValueFromMessage(topic, message);

        const workflowRunId = workflowRun.getId();

        // register for management
        workflowRuns[workflowRunId] = workflowRun;

        // route event
        logger.log('debug', `event ${topic} is routed to a new workflow run ${workflowRunId}`);
        workflowRun.enqueueEvent(topic, message);

        // KICKSTART!
        kickstart(workflowId, workflowRunId);
    }

    return true;
};
344
// Returns the value of lengthEventQueue() for the given workflow run, or
// null (after logging a warning) when the run is not managed.
const countQueuedEvents = (workflowRunId) => {
    if (workflowRunId in workflowRuns) {
        return workflowRuns[workflowRunId].lengthEventQueue();
    }

    logger.log('warn', `workflow run ${workflowRunId} does not exist`);
    return null;
};
355
// Dequeues the next event of the given topic for a workflow run task.
// Returns:
//   - null when the run, its workflow, or the task cannot be found
//     (a warning is logged in each case)
//   - the dequeued event when there is one
//   - an empty object when no event is available
const fetchEvent = (workflowRunId, taskId, topic) => {
    // logs the reason at 'warn' level and yields the "not found" result
    const bail = (reason) => {
        logger.log('warn', reason);
        return null;
    };

    if (!(workflowRunId in workflowRuns)) {
        return bail(`workflow run ${workflowRunId} does not exist`);
    }
    const workflowRun = workflowRuns[workflowRunId];

    const workflowId = workflowRun.getWorkflowId();
    if (!(workflowId in workflows)) {
        return bail(`workflow ${workflowId} does not exist`);
    }

    if (!workflows[workflowId].getTask(taskId)) {
        return bail(`workflow ${workflowId} does not have task ${taskId}`);
    }

    logger.log('debug', `workflow run ${workflowRunId}, task ${taskId} fetches an event`);

    return workflowRun.dequeueEventByTopic(topic) || {};
};
389
// req_id echoed back when a request does not carry one (signature value)
const DEFAULT_REQ_ID = 101010;

// Wires every route of a ws_* router onto a client socket.
//
// Protocol shared by probe, manager and workflow run clients:
//   REQ:
//     topic: operation
//     message: {
//       req_id: <req_id>,
//       <data>...
//     }
//   RES:
//     topic: topic sent
//     message: {
//       req_id: <req_id>,
//       error: <true/false>,
//       result: <handler result>,
//       message: <error message>   (only on error)
//     }
//
// socket:            client socket exposing on()/emit()
// router:            object whose values are { topic, handler, return }
// clientLabel:       client kind used in the debug log ('probe', 'manager', ...)
// passResultOnError: when true the handler's result is echoed in an error
//                    response; when false the error response carries
//                    result: false
const attachRouter = (socket, router, clientLabel, passResultOnError) => {
    _.forOwn(router, (routerElem, _key) => {
        socket.on(routerElem.topic, (msg) => {
            logger.log('debug', `received a ${clientLabel} event ${routerElem.topic} - ${JSON.stringify(msg)}`);

            // handle a common parameter - req_id
            // when we get req_id, return the same req_id in response.
            // this is to help identify a request from a response at client-side
            let req_id = DEFAULT_REQ_ID;
            if(msg && checkObject(msg) && 'req_id' in msg) {
                req_id = msg.req_id;
            }

            routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
                if(err) {
                    logger.log('warn', `unable to handle a message - ${err}`);
                    socket.emit(routerElem.topic, {
                        req_id: req_id,
                        error: true,
                        result: passResultOnError ? result : false,
                        message: err
                    });
                    return;
                }

                // we return result unless the route explicitly opts out
                if(routerElem.return === undefined || routerElem.return) {
                    socket.emit(routerElem.topic, {
                        req_id: req_id,
                        error: false,
                        result: result
                    });
                }
            });
        });
    });
};

// Registers a connected client and attaches the socket handlers matching
// its type (probe, workflow manager, or workflow run).
//
// A workflow run client is only accepted when both its workflow and its
// workflow run are already registered; on success its id is also recorded
// on the WorkflowRun instance.
//
// Returns true when the client was registered. Returns false (with a
// warning logged) when the id is already taken, the workflow / workflow run
// is unknown, or the client type is not recognised.
const addClient = (c) => {
    const clientId = c.getId();
    const socket = c.getSocket();

    // check id that client is already there
    if(clientId in allClients) {
        logger.log('warn', `there exists a client with the same id - ${clientId}`);
        return false;
    }

    const clientType = c.getType();

    if(clientType === Client.Type.PROBE) {
        allClients[clientId] = c;
        probeClients[clientId] = c;

        attachRouter(socket, ws_probe.getRouter(), 'probe', true);
        return true;
    }

    if(clientType === Client.Type.WORKFLOW_MANAGER) {
        allClients[clientId] = c;
        workflowManagerClients[clientId] = c;

        attachRouter(socket, ws_manager.getRouter(), 'manager', true);
        return true;
    }

    if(clientType === Client.Type.WORKFLOW_RUN) {
        // map to WorkflowRun instance
        const workflowId = c.getWorkflowId();
        const workflowRunId = c.getWorkflowRunId();

        if(!(workflowId in workflows)) {
            logger.log('warn', `cannot find a workflow ${workflowId}`);
            return false;
        }

        if(!(workflowRunId in workflowRuns)) {
            // workflow run not exist yet
            logger.log('warn', `cannot find a workflow run ${workflowRunId}`);
            return false;
        }

        allClients[clientId] = c;
        workflowRunClients[clientId] = c;

        // register client to workflow run
        workflowRuns[workflowRunId].addClientId(clientId);

        // workflow run error responses always carry result: false
        attachRouter(socket, ws_workflowrun.getRouter(), 'workflow run', false);
        return true;
    }

    logger.log('warn', `unknown client type - ${clientType}`);
    return false;
};
610
// Unregisters a client by id; unknown ids are ignored.
// The client is dropped from allClients and from the per-type map matching
// its type. A workflow run client is additionally detached from its
// WorkflowRun instance when that run is still managed.
const removeClient = (id) => {
    if(!(id in allClients)) {
        return;
    }

    const departing = allClients[id];
    delete allClients[id];

    switch(departing.getType()) {
    case Client.Type.PROBE:
        delete probeClients[id];
        break;
    case Client.Type.WORKFLOW_MANAGER:
        delete workflowManagerClients[id];
        break;
    case Client.Type.WORKFLOW_RUN: {
        delete workflowRunClients[id];

        const owningRun = workflowRuns[departing.getWorkflowRunId()];
        if(owningRun) {
            owningRun.removeClientId(id);

            //TODO
            // WorkflowRun can have no clients between tasks
            // So we should not remove the run until the workflow run finishes
        }
        break;
    }
    default:
        break;
    }
};
639
// Drops every registered client: the per-type maps are emptied first, then
// each client's socket is disconnected (disconnect(true)) and the client is
// removed from allClients. All maps are cleared in place.
const removeClients = () => {
    // NOTE: no local `probeClients` may be declared here; shadowing the
    // module-level map would leave the real probe client map untouched
    for (const perTypeClients of [probeClients, workflowManagerClients, workflowRunClients]) {
        for (const clientId of Object.keys(perTypeClients)) {
            delete perTypeClients[clientId];
        }
    }

    // iterate over a snapshot so that anything the disconnect triggers
    // cannot disturb the loop by mutating allClients
    for (const [clientId, client] of Object.entries(allClients)) {
        client.getSocket().disconnect(true);
        delete allClients[clientId];
    }
};
660
661 module.exports = {
662 serviceEvents: serviceEvents,
663 destroy: destroy,
664 getClients: () => { return allClients; },
665 getProbeClients: () => { return probeClients; },
666 getWorkflowManagerClients: () => { return workflowManagerClients; },
667 getWorkflowRunClients: () => { return workflowRunClients; },
668 clientType: Client.Type,
669 //setIO: setIO,
Illyoung Choi16c6d4f2019-07-24 18:09:26 -0700670 emitEvent: emitEvent,
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700671 countQueuedEvents: countQueuedEvents,
Illyoung Choi59820ed2019-06-24 17:01:00 -0700672 fetchEvent: fetchEvent,
673 addClient: addClient,
674 removeClient: removeClient,
675 removeClients: removeClients,
676 addWorkflow: addWorkflow,
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700677 getWorkflow: getWorkflow,
Illyoung Choi59820ed2019-06-24 17:01:00 -0700678 listWorkflows: listWorkflows,
679 checkWorkflow: checkWorkflow,
680 removeWorkflow: removeWorkflow,
681 clearWorkflows: clearWorkflows,
682 addWorkflowRun: addWorkflowRun,
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700683 getWorkflowRun: getWorkflowRun,
Illyoung Choi59820ed2019-06-24 17:01:00 -0700684 listWorkflowRuns: listWorkflowRuns,
685 checkWorkflowRun: checkWorkflowRun,
686 removeWorkflowRun: removeWorkflowRun,
687 clearWorkflowRuns: clearWorkflowRuns,
Illyoung Choi59820ed2019-06-24 17:01:00 -0700688 setWorkflowRunKickstarted: setWorkflowRunKickstarted,
Illyoung Choiab109032019-07-29 14:04:10 -0700689 setWorkflowRunStatus: setWorkflowRunStatus
Illyoung Choi59820ed2019-06-24 17:01:00 -0700690 };
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700691})();