blob: 4919669ee836f03cf037fc146adf04b1314f08a8 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17
18(function () {
19 'use strict';
20
21 const _ = require('lodash');
22 const logger = require('../config/logger.js');
23 const Client = require('../types/client.js');
24 const WorkflowRun = require('../types/workflowrun.js');
25 const ws_manager = require('./ws_manager.js');
26 const ws_workflowrun = require('./ws_workflowrun.js');
27
// Client registries, each keyed by client id.
// allClients holds every registered client (publishers and subscribers);
// the three registries below each hold the subset of one client type.
let allClients = {};
let probeClients = {};
let workflowManagerClients = {};
let workflowRunClients = {};

//let io;

// Registered workflows.
//   key: workflow id
//   value: Workflow instance
let workflows = {};

// Workflow runs currently under management.
//   key: workflow run id
//   value: WorkflowRun instance
let workflowRuns = {};
42
// Event topics exposed by this module: the greeting topic plus every
// topic declared by the manager and the workflow-run websocket modules.
let serviceEvents = {
    GREETING: 'cord.workflow.ctlsvc.greeting'
};

// copy ws_manager events first, then ws_workflowrun events;
// when both declare the same key, the later copy overwrites the earlier one
[ws_manager.serviceEvents, ws_workflowrun.serviceEvents].forEach((wsServiceEvents) => {
    _.forOwn(wsServiceEvents, (wsServiceEvent, key) => {
        serviceEvents[key] = wsServiceEvent;
    });
});
56
57 //const setIO = (ioInstance) => {
58 // io = ioInstance;
59 //};
60
// Returns true when the value's Object.prototype.toString tag is
// '[object Object]'. Arrays, null, undefined, strings and numbers
// all yield false.
const checkObject = (obj) => {
    const typeTag = Object.prototype.toString.call(obj);
    return typeTag === '[object Object]';
};
64
// Tears down all state held by this module, in this order:
// clients (which also disconnects their sockets), workflow runs, workflows.
const destroy = () => {
    const teardownSteps = [removeClients, clearWorkflowRuns, clearWorkflows];
    teardownSteps.forEach((teardown) => {
        teardown();
    });
};
70
// Returns a new array holding the ids of all registered workflows.
const listWorkflows = () => {
    // the registry is keyed by workflow id, so its own keys are the ids
    return Object.keys(workflows);
};
78
// Returns true when a workflow with the given id is registered,
// false otherwise.
const checkWorkflow = (workflowId) => {
    return workflowId in workflows;
};
85
// Registers a workflow under its own id (workflow.getId()).
// Returns true on success, or false (after logging an error) when a
// workflow with the same id is already registered.
const addWorkflow = (workflow) => {
    const workflowId = workflow.getId();

    if(!(workflowId in workflows)) {
        workflows[workflowId] = workflow;
        return true;
    }

    logger.log('error', `there exists a workflow with the same id - ${workflowId}`);
    return false;
};
96
// Looks up a registered workflow.
//   workflowId: id the workflow was registered under
// Returns the Workflow instance, or null (after logging a warning) when
// no workflow with that id is registered.
const getWorkflow = (workflowId) => {
    // the guard must fire for a MISSING id; the membership test is the
    // same 'in' check the other workflow helpers of this module use
    if(!(workflowId in workflows)) {
        logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
        return null;
    }

    return workflows[workflowId];
};
105
// Unregisters every workflow. The registry object is emptied in place.
const clearWorkflows = () => {
    for(const workflowId of Object.keys(workflows)) {
        delete workflows[workflowId];
    }
};
111
// Returns a new array holding the ids of all managed workflow runs.
const listWorkflowRuns = () => {
    // the registry is keyed by workflow run id, so its own keys are the ids
    return Object.keys(workflowRuns);
};
119
// Returns true when a workflow run with the given id is being managed,
// false otherwise.
const checkWorkflowRun = (workflowRunId) => {
    return workflowRunId in workflowRuns;
};
126
// Puts a workflow run under management, keyed by workflowRun.getId().
// Returns false (after logging a warning) when a run with the same id
// already exists or when the run's workflow is not registered;
// returns true once the run is stored.
const addWorkflowRun = (workflowRun) => {
    const workflowId = workflowRun.getWorkflowId();
    const workflowRunId = workflowRun.getId();

    const isDuplicate = workflowRunId in workflowRuns;
    if(isDuplicate) {
        logger.log('warn', `there exists a workflow run with the same id - ${workflowRunId}`);
        return false;
    }

    const hasWorkflow = workflowId in workflows;
    if(!hasWorkflow) {
        logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
        return false;
    }

    workflowRuns[workflowRunId] = workflowRun;
    return true;
};
144
// Looks up a managed workflow run.
//   workflowRunId: id the run is managed under
// Returns the WorkflowRun instance, or null (after logging a warning)
// when no run with that id is being managed.
const getWorkflowRun = (workflowRunId) => {
    // the guard must fire for a MISSING id; the membership test is the
    // same 'in' check the other workflow run helpers of this module use
    if(!(workflowRunId in workflowRuns)) {
        logger.log('warn', `cannot find a workflow run with id - ${workflowRunId}`);
        return null;
    }

    return workflowRuns[workflowRunId];
};
153
// Drops every managed workflow run. The registry object is emptied in
// place; the runs themselves are not notified.
const clearWorkflowRuns = () => {
    for(const workflowRunId of Object.keys(workflowRuns)) {
        delete workflowRuns[workflowRunId];
    }
};
159
// Forwards a task status change to a managed workflow run via
// WorkflowRun.updateTaskStatus(taskId, status).
// Returns true when the run exists, or false (after logging a warning)
// when it does not.
const updateWorkflowRunStatus = (workflowRunId, taskId, status) => {
    if(workflowRunId in workflowRuns) {
        workflowRuns[workflowRunId].updateTaskStatus(taskId, status);
        return true;
    }

    logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
    return false;
};
170
// Marks a managed workflow run as kickstarted via
// WorkflowRun.setKickstarted().
// Returns true when the run exists, or false (after logging a warning)
// when it does not.
const setWorkflowRunKickstarted = (workflowRunId) => {
    if(workflowRunId in workflowRuns) {
        workflowRuns[workflowRunId].setKickstarted();
        return true;
    }

    logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
    return false;
};
181
// Reacts to a state report for a managed workflow run.
//   workflowRunId: id of the run the state belongs to
//   state: reported state string
// The state itself is not stored; a terminal state ('success', 'failed'
// or 'end') makes the run be removed from management.
// Returns true when the run exists, or false (after logging a warning)
// when it does not.
const setWorkflowRunState = (workflowRunId, state) => {
    if(!(workflowRunId in workflowRuns)) {
        logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
        return false;
    }

    // membership must be tested on the VALUES of the list; the 'in'
    // operator would test the array's indices ('0', '1', '2') instead
    const terminalStates = ['success', 'failed', 'end'];
    if(terminalStates.includes(state)) {
        removeWorkflowRun(workflowRunId);
    }
    return true;
};
193
// Asks ws_manager to kickstart a workflow run.
// Both the workflow and the workflow run must already be known here;
// otherwise a warning is logged and false is returned.
// Returns true once the kickstart request has been handed to ws_manager.
const kickstart = (workflowId, workflowRunId) => {
    const workflowKnown = workflowId in workflows;
    if(!workflowKnown) {
        logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
        return false;
    }

    const workflowRunKnown = workflowRunId in workflowRuns;
    if(!workflowRunKnown) {
        logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
        return false;
    }

    ws_manager.kickstartWorkflow(workflowId, workflowRunId);
    return true;
};
208
// Unregisters a workflow.
// Returns false (after logging a warning) when the workflow is unknown or
// when at least one managed workflow run still belongs to it;
// returns true once the workflow has been removed.
const removeWorkflow = (workflowId) => {
    if(!(workflowId in workflows)) {
        logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
        return false;
    }

    // check if there are workflow runs
    // (_.forOwn is not used here because returning from its callback
    // cannot return a value from this function; some() stops at the
    // first run that belongs to the workflow)
    const hasWorkflowRun = Object.keys(workflowRuns).some((workflowRunId) => {
        return workflowRuns[workflowRunId].getWorkflowId() === workflowId;
    });

    if(hasWorkflowRun) {
        logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
        return false;
    }

    delete workflows[workflowId];
    return true;
};
239
// Takes a workflow run out of management and then marks it finished via
// WorkflowRun.setFinished().
// Returns true when the run existed, or false (after logging a warning)
// when it did not.
const removeWorkflowRun = (workflowRunId) => {
    if(workflowRunId in workflowRuns) {
        const finishedRun = workflowRuns[workflowRunId];

        // forget the run first, then let it know that it is finished
        delete workflowRuns[workflowRunId];
        finishedRun.setFinished();
        return true;
    }

    logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
    return false;
};
252
// Delivers an event to the workflow runs that accept it and kickstarts a
// new run for every workflow that treats the topic as a kickstart topic
// but had no run accepting the event.
//   topic: event topic
//   message: event body
const sendEvent = (topic, message) => {
    // ids of the workflows that already have a run consuming this event
    const workflowIdsRunning = [];

    logger.log('debug', `event is raised : topic ${topic}, message ${JSON.stringify(message)}`);

    // step 1: route the event to running instances
    for(const workflowRunId of Object.keys(workflowRuns)) {
        const workflowRun = workflowRuns[workflowRunId];
        const workflowId = workflowRun.getWorkflowId();

        // a workflow run receives the event when
        // 1) its workflow is currently interested in the same topic
        //    (already finished tasks are not counted)
        // 2) the task's key field and value match
        if(!workflowRun.isEventAcceptable(workflows[workflowId], topic, message)) {
            continue;
        }

        logger.log('debug', `event ${topic} is routed to workflow run ${workflowRunId}`);
        workflowRun.enqueueEvent(topic, message);

        if(!workflowIdsRunning.includes(workflowId)) {
            workflowIdsRunning.push(workflowId);
        }
    }

    // step 2: kickstart workflows for which this is a kickstart event
    for(const workflowId of Object.keys(workflows)) {
        const workflow = workflows[workflowId];

        // nothing to do unless this is a kickstart topic of the workflow
        // and no run of that workflow has taken the event in step 1
        const needsKickstart = workflow.isKickstartTopic(topic) &&
            !workflowIdsRunning.includes(workflowId);
        if(!needsKickstart) {
            continue;
        }

        // the event is buffered in a fresh run until that run is brought up
        const newRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
        newRun.updateEventKeyFieldValueFromMessage(topic, message);

        const newRunId = newRun.getId();

        // register for management
        workflowRuns[newRunId] = newRun;

        // route event
        logger.log('debug', `event ${topic} is routed to workflow run ${newRunId}`);
        newRun.enqueueEvent(topic, message);

        // KICKSTART!
        kickstart(workflowId, newRunId);
    }
};
305
// Returns the number of events queued on a workflow run, as reported by
// WorkflowRun.lengthEventQueue(), or null (after logging a warning) when
// the run does not exist.
const countQueuedEvents = (workflowRunId) => {
    if(workflowRunId in workflowRuns) {
        return workflowRuns[workflowRunId].lengthEventQueue();
    }

    logger.log('warn', `workflow run ${workflowRunId} does not exist`);
    return null;
};
316
// Dequeues one event of the given topic from a workflow run on behalf of
// a task.
// Returns the event, an empty object when no event is available, or null
// (after logging a warning) when the run, its workflow or the task
// cannot be found.
const fetchEvent = (workflowRunId, taskId, topic) => {
    if(!(workflowRunId in workflowRuns)) {
        logger.log('warn', `workflow run ${workflowRunId} does not exist`);
        return null;
    }

    const workflowRun = workflowRuns[workflowRunId];
    const workflowId = workflowRun.getWorkflowId();

    if(!(workflowId in workflows)) {
        logger.log('warn', `workflow ${workflowId} does not exist`);
        return null;
    }

    // the task is only looked up to verify that it belongs to the workflow
    if(!workflows[workflowId].getTask(taskId)) {
        logger.log('warn', `workflow ${workflowId} does not have task ${taskId}`);
        return null;
    }

    logger.log('debug', `workflow run ${workflowRunId}, task ${taskId} fetches an event`);

    const event = workflowRun.dequeueEvent(topic);
    return event ? event : {};
};
350
// Returns the req_id of a request message so that it can be echoed in
// the response; this helps the client side match a response to its
// request. Falls back to a fixed default when the message is not an
// object or carries no req_id.
const getRequestId = (msg) => {
    if(msg && checkObject(msg) && 'req_id' in msg) {
        return msg.req_id;
    }
    return 101010; // default number, signature
};

// Relays every message of a probe socket as an event.
// probe protocol:
//   REQ:
//     topic: event topic
//     message: <data>
//   RES:
//     topic: topic sent
//     message: {result: <true/false>, message: <error message> }
const attachProbeRelay = (socket) => {
    // NOTE(review): the '*' event is presumably produced by a wildcard
    // middleware installed on the socket elsewhere - confirm
    socket.on('*', (msg) => {
        const jsonMsg = msg.data;

        // must have two parts: [topic, message body]
        if(jsonMsg.length !== 2) {
            logger.log('warn', `unexpected message is received - ${JSON.stringify(jsonMsg)}`);
            socket.emit(jsonMsg[0], {
                result: false,
                message: `unexpected message is received - ${JSON.stringify(jsonMsg)}`
            });
            return;
        }

        const topic = jsonMsg[0];
        const messageBody = jsonMsg[1];

        sendEvent(topic, messageBody);

        // return true for success
        socket.emit(topic, {
            result: true
        });
    });
};

// Subscribes a socket to every topic of a router and answers each
// request on the topic it arrived on.
//   socket: client socket
//   router: map of router elements ({topic, handler, return})
//   errorResultOf: maps the handler's result to the 'result' field of an
//                  error response
// request/response protocol:
//   REQ:
//     topic: operation
//     message: {
//       req_id: <req_id>,
//       <data>...
//     }
//   RES:
//     topic: topic sent
//     message: {
//       req_id: <req_id>,
//       error: <true/false>,
//       result: <true/false>,
//       message: <error message>
//     }
const attachRouter = (socket, router, errorResultOf) => {
    _.forOwn(router, (routerElem, _key) => {
        socket.on(routerElem.topic, (msg) => {
            const req_id = getRequestId(msg);

            routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
                if(err) {
                    logger.log('warn', `unable to handle a message - ${err}`);
                    socket.emit(routerElem.topic, {
                        req_id: req_id,
                        error: true,
                        result: errorResultOf(result),
                        message: err
                    });
                    return;
                }

                // we return result, unless the router element opts out
                // with a falsy 'return' attribute
                if(routerElem.return === undefined || routerElem.return) {
                    socket.emit(routerElem.topic, {
                        req_id: req_id,
                        error: false,
                        result: result
                    });
                }
            });
        });
    });
};

// Registers a client and wires its socket according to the client type.
//   c: Client instance
// Returns true when the client has been registered. Returns false when
// a client with the same id already exists, when a workflow-run client
// refers to an unknown workflow or workflow run, or when the client type
// is none of PROBE, WORKFLOW_MANAGER and WORKFLOW_RUN.
const addClient = (c) => {
    const clientId = c.getId();
    const socket = c.getSocket();

    // check if the client is already there
    if(clientId in allClients) {
        logger.log('warn', `there exists a client with the same id - ${clientId}`);
        return false;
    }

    const clientType = c.getType();

    if(clientType === Client.Type.PROBE) {
        // probe messages are relayed as events, based on their topic
        allClients[clientId] = c;
        probeClients[clientId] = c;

        attachProbeRelay(socket);
        return true;
    }

    if(clientType === Client.Type.WORKFLOW_MANAGER) {
        allClients[clientId] = c;
        workflowManagerClients[clientId] = c;

        // attach manager operations;
        // an error response carries the handler's own result
        attachRouter(socket, ws_manager.getRouter(), (result) => result);
        return true;
    }

    if(clientType === Client.Type.WORKFLOW_RUN) {
        // map to WorkflowRun instance
        const workflowId = c.getWorkflowId();
        const workflowRunId = c.getWorkflowRunId();

        if(!(workflowId in workflows)) {
            logger.log('warn', `cannot find a workflow ${workflowId}`);
            return false;
        }

        if(!(workflowRunId in workflowRuns)) {
            // workflow run not exist yet
            logger.log('warn', `cannot find a workflow run ${workflowRunId}`);
            return false;
        }

        allClients[clientId] = c;
        workflowRunClients[clientId] = c;

        // register client to workflow run
        workflowRuns[workflowRunId].addClientId(clientId);

        // attach workflow run operations;
        // an error response always carries result: false
        attachRouter(socket, ws_workflowrun.getRouter(), () => false);
        return true;
    }

    return false;
};
544
// Unregisters a client by id. Unknown ids are ignored.
// The client is dropped from the registry of all clients and from the
// registry of its type; a workflow-run client is also detached from its
// workflow run when that run is still managed.
const removeClient = (id) => {
    if(!(id in allClients)) {
        return;
    }

    const removedClient = allClients[id];
    delete allClients[id];

    switch(removedClient.getType()) {
        case Client.Type.PROBE:
            delete probeClients[id];
            break;
        case Client.Type.WORKFLOW_MANAGER:
            delete workflowManagerClients[id];
            break;
        case Client.Type.WORKFLOW_RUN: {
            delete workflowRunClients[id];

            const workflowRun = workflowRuns[removedClient.getWorkflowRunId()];
            if(workflowRun) {
                workflowRun.removeClientId(id);

                //TODO
                // WorkflowRun can have no clients between tasks
                // So we should not remove the run until the workflow run finishes
            }
            break;
        }
    }
};
573
// Unregisters every client.
// The per-type registries are emptied first; then every client's socket
// is disconnected with disconnect(true) and the client is dropped from
// the registry of all clients. All registries are emptied in place.
const removeClients = () => {
    // these are the module-level registries; a local declaration of the
    // same name here would shadow them and leave the real registry untouched
    const typedRegistries = [probeClients, workflowManagerClients, workflowRunClients];
    typedRegistries.forEach((registry) => {
        Object.keys(registry).forEach((clientId) => {
            delete registry[clientId];
        });
    });

    Object.keys(allClients).forEach((clientId) => {
        allClients[clientId].getSocket().disconnect(true);
        delete allClients[clientId];
    });
};
594
595 module.exports = {
596 serviceEvents: serviceEvents,
597 destroy: destroy,
598 getClients: () => { return allClients; },
599 getProbeClients: () => { return probeClients; },
600 getWorkflowManagerClients: () => { return workflowManagerClients; },
601 getWorkflowRunClients: () => { return workflowRunClients; },
602 clientType: Client.Type,
603 //setIO: setIO,
604 sendEvent: sendEvent,
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700605 countQueuedEvents: countQueuedEvents,
Illyoung Choi59820ed2019-06-24 17:01:00 -0700606 fetchEvent: fetchEvent,
607 addClient: addClient,
608 removeClient: removeClient,
609 removeClients: removeClients,
610 addWorkflow: addWorkflow,
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700611 getWorkflow: getWorkflow,
Illyoung Choi59820ed2019-06-24 17:01:00 -0700612 listWorkflows: listWorkflows,
613 checkWorkflow: checkWorkflow,
614 removeWorkflow: removeWorkflow,
615 clearWorkflows: clearWorkflows,
616 addWorkflowRun: addWorkflowRun,
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700617 getWorkflowRun: getWorkflowRun,
Illyoung Choi59820ed2019-06-24 17:01:00 -0700618 listWorkflowRuns: listWorkflowRuns,
619 checkWorkflowRun: checkWorkflowRun,
620 removeWorkflowRun: removeWorkflowRun,
621 clearWorkflowRuns: clearWorkflowRuns,
622 updateWorkflowRunStatus: updateWorkflowRunStatus,
623 setWorkflowRunKickstarted: setWorkflowRunKickstarted,
Illyoung Choic707c052019-07-18 13:50:49 -0700624 setWorkflowRunState: setWorkflowRunState
Illyoung Choi59820ed2019-06-24 17:01:00 -0700625 };
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700626})();