blob: 975db2e58ad946e5612d45bf08d378e9ade103a1 [file] [log] [blame]
Illyoung Choi59820ed2019-06-24 17:01:00 -07001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17
18(function () {
19 'use strict';
20
21 const _ = require('lodash');
22 const logger = require('../config/logger.js');
23 const Client = require('../types/client.js');
24 const WorkflowRun = require('../types/workflowrun.js');
25 const ws_manager = require('./ws_manager.js');
26 const ws_workflowrun = require('./ws_workflowrun.js');
27
// Client registries: plain objects used as dictionaries keyed by client id.
// addClient stores every accepted client in allClients and additionally in
// the per-type registry that matches the client's type.
let allClients = {}; // has publishers and subscribers
let probeClients = {}; // a subset of clients
let workflowManagerClients = {}; // a subset of clients
let workflowRunClients = {}; // a subset of clients

//let io;

// Registered workflows.
// key: workflow id
// value: Workflow instance
let workflows = {};

// Workflow runs currently under management (added by addWorkflowRun or
// created by sendEvent when a kickstart event arrives).
// key: workflow run id
// value: WorkflowRun instance
let workflowRuns = {};
42
// Table of service event names exported by this module. It starts with the
// greeting event and is then extended with every event declared by the
// manager and the workflow-run websocket modules.
let serviceEvents = {
    GREETING: 'cord.workflow.ctlsvc.greeting'
};

// Copy ws_manager events first, then ws_workflowrun events; when both
// declare the same key the later copy overwrites the earlier one.
[ws_manager.serviceEvents, ws_workflowrun.serviceEvents].forEach((sourceEvents) => {
    _.forOwn(sourceEvents, (eventName, eventKey) => {
        serviceEvents[eventKey] = eventName;
    });
});
56
57 //const setIO = (ioInstance) => {
58 // io = ioInstance;
59 //};
60
/**
 * Tells whether a value is reported as a generic object by
 * Object.prototype.toString (arrays, null, dates, functions and primitives
 * all yield a different tag and are therefore rejected).
 *
 * @param {*} obj - value to inspect
 * @returns {boolean} true when the value's tag is '[object Object]'
 */
const checkObject = (obj) => {
    const typeTag = Object.prototype.toString.call(obj);
    return typeTag === '[object Object]';
};
64
/**
 * Tears down all module state: clients are removed first, then the workflow
 * runs are cleared, then the workflows.
 */
const destroy = () => {
    const teardownSteps = [removeClients, clearWorkflowRuns, clearWorkflows];
    teardownSteps.forEach((teardown) => {
        teardown();
    });
};
70
/**
 * Lists the ids of all registered workflows.
 *
 * @returns {string[]} a new array holding every key of the workflow registry
 */
const listWorkflows = () => Object.keys(workflows);
78
/**
 * Checks whether a workflow id is present in the workflow registry.
 *
 * @param {string} workflowId - id to look up
 * @returns {boolean} result of the `in` test against the registry
 */
const checkWorkflow = (workflowId) => workflowId in workflows;
85
/**
 * Registers a workflow under its own id.
 *
 * @param {Object} workflow - workflow object exposing getId()
 * @returns {boolean} true when stored; false (with an error log) when a
 *     workflow with the same id is already registered
 */
const addWorkflow = (workflow) => {
    const newId = workflow.getId();

    const alreadyRegistered = newId in workflows;
    if(alreadyRegistered) {
        logger.log('error', `there exists a workflow with the same id - ${newId}`);
        return false;
    }

    workflows[newId] = workflow;
    return true;
};
96
/**
 * Looks up a registered workflow by id.
 *
 * @param {string} workflowId - id of the workflow to fetch
 * @returns {Object|null} the registered workflow, or null (with a warning
 *     log) when no workflow is registered under that id
 */
const getWorkflow = (workflowId) => {
    // the "not found" branch must fire only when the id is absent
    if(!(workflowId in workflows)) {
        logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
        return null;
    }

    return workflows[workflowId];
};
105
/**
 * Empties the workflow registry in place. The registry object itself is
 * kept, only its entries are deleted.
 */
const clearWorkflows = () => {
    for(const registeredId of Object.keys(workflows)) {
        delete workflows[registeredId];
    }
};
111
/**
 * Lists the ids of all workflow runs currently in the run registry.
 *
 * @returns {string[]} a new array holding every key of the run registry
 */
const listWorkflowRuns = () => Object.keys(workflowRuns);
119
/**
 * Checks whether a workflow run id is present in the run registry.
 *
 * @param {string} workflowRunId - id to look up
 * @returns {boolean} result of the `in` test against the registry
 */
const checkWorkflowRun = (workflowRunId) => workflowRunId in workflowRuns;
126
/**
 * Registers a workflow run under its own id.
 *
 * @param {Object} workflowRun - run object exposing getWorkflowId() and getId()
 * @returns {boolean} true when stored; false (with a warning log) when the
 *     run id is already taken or the run's workflow is not registered
 */
const addWorkflowRun = (workflowRun) => {
    const ownerWorkflowId = workflowRun.getWorkflowId();
    const runId = workflowRun.getId();

    // the duplicate check comes before the owner check
    const isDuplicate = runId in workflowRuns;
    if(isDuplicate) {
        logger.log('warn', `there exists a workflow run with the same id - ${runId}`);
        return false;
    }

    const ownerIsKnown = ownerWorkflowId in workflows;
    if(!ownerIsKnown) {
        logger.log('warn', `cannot find a workflow with id - ${ownerWorkflowId}`);
        return false;
    }

    workflowRuns[runId] = workflowRun;
    return true;
};
144
/**
 * Looks up a workflow run by id.
 *
 * @param {string} workflowRunId - id of the workflow run to fetch
 * @returns {Object|null} the registered run, or null (with a warning log)
 *     when no run is registered under that id
 */
const getWorkflowRun = (workflowRunId) => {
    // the "not found" branch must fire only when the id is absent
    if(!(workflowRunId in workflowRuns)) {
        logger.log('warn', `cannot find a workflow run with id - ${workflowRunId}`);
        return null;
    }

    return workflowRuns[workflowRunId];
};
153
/**
 * Empties the workflow run registry in place. The registry object itself is
 * kept, only its entries are deleted.
 */
const clearWorkflowRuns = () => {
    for(const registeredRunId of Object.keys(workflowRuns)) {
        delete workflowRuns[registeredRunId];
    }
};
159
/**
 * Forwards a task status change to a registered workflow run.
 *
 * @param {string} workflowRunId - id of the run to update
 * @param {string} taskId - task whose status changed
 * @param {*} status - new status, passed through to the run unchanged
 * @returns {boolean} true when the run exists and was updated; false (with
 *     a warning log) when the run id is unknown
 */
const updateWorkflowRunStatus = (workflowRunId, taskId, status) => {
    if(workflowRunId in workflowRuns) {
        workflowRuns[workflowRunId].updateTaskStatus(taskId, status);
        return true;
    }

    logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
    return false;
};
170
/**
 * Marks a registered workflow run as kickstarted.
 *
 * @param {string} workflowRunId - id of the run to mark
 * @returns {boolean} true when the run exists and was marked; false (with a
 *     warning log) when the run id is unknown
 */
const setWorkflowRunKickstarted = (workflowRunId) => {
    if(workflowRunId in workflowRuns) {
        workflowRuns[workflowRunId].setKickstarted();
        return true;
    }

    logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
    return false;
};
181
/**
 * Asks the workflow manager side to kickstart a workflow run.
 *
 * Both the workflow and the workflow run must already be registered.
 *
 * @param {string} workflowId - id of the workflow to start
 * @param {string} workflowRunId - id of the run created for it
 * @returns {boolean} true once the request has been handed to
 *     ws_manager.kickstartWorkflow; false (with a warning log) when either
 *     id is unknown
 */
const kickstart = (workflowId, workflowRunId) => {
    if(!(workflowId in workflows)) {
        logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
        return false;
    }

    if(!(workflowRunId in workflowRuns)) {
        logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
        return false;
    }

    ws_manager.kickstartWorkflow(workflowId, workflowRunId);
    // report success explicitly so it is distinguishable from the
    // `false` returned by the failure paths above
    return true;
};
195
/**
 * Unregisters a workflow, provided no workflow run still refers to it.
 *
 * @param {string} workflowId - id of the workflow to remove
 * @returns {boolean} true when removed; false (with a warning log) when the
 *     workflow is unknown or at least one registered run belongs to it
 */
const removeWorkflow = (workflowId) => {
    if(!(workflowId in workflows)) {
        logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
        return false;
    }

    // `some` stops at the first run that belongs to this workflow, which is
    // something a `_.forOwn` callback cannot express with a plain `return`
    const hasRun = Object.keys(workflowRuns).some((runId) => {
        return workflowRuns[runId].getWorkflowId() === workflowId;
    });

    if(hasRun) {
        logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
        return false;
    }

    delete workflows[workflowId];
    return true;
};
226
/**
 * Unregisters a workflow run and marks it finished.
 *
 * @param {string} workflowRunId - id of the run to remove
 * @returns {boolean} true when removed; false (with a warning log) when the
 *     run id is unknown
 */
const removeWorkflowRun = (workflowRunId) => {
    const isRegistered = workflowRunId in workflowRuns;
    if(!isRegistered) {
        logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
        return false;
    }

    // drop the registry entry first, then flag the run as finished
    const finishedRun = workflowRuns[workflowRunId];
    delete workflowRuns[workflowRunId];
    finishedRun.setFinished();

    return true;
};
239
/**
 * Routes an event to the workflow runs that accept it and kickstarts new
 * runs for workflows that use the topic as a kickstart topic.
 *
 * Pass 1: every registered run whose isEventAcceptable() check passes gets
 * the event enqueued.
 * Pass 2: for every workflow for which this is a kickstart topic and that
 * had no accepting run in pass 1, a new run is created, registered, given
 * the event and kickstarted.
 *
 * @param {string} topic - event topic
 * @param {*} message - event payload (logged via JSON.stringify)
 */
const sendEvent = (topic, message) => {
    // ids of workflows that already have a run accepting this event
    const servedWorkflowIds = new Set();

    logger.log('debug', `event is raised : topic ${topic}, message ${JSON.stringify(message)}`);

    // pass 1: route event to running instances
    _.forOwn(workflowRuns, (run, runId) => {
        const ownerId = run.getWorkflowId();

        // the run decides whether it takes the event, given its workflow,
        // the topic and the message
        if(!run.isEventAcceptable(workflows[ownerId], topic, message)) {
            return;
        }

        logger.log('debug', `event ${topic} is routed to workflow run ${runId}`);
        run.enqueueEvent(topic, message);
        servedWorkflowIds.add(ownerId);
    });

    // pass 2: kickstart workflows nobody is serving yet
    _.forOwn(workflows, (workflow, workflowId) => {
        if(!workflow.isKickstartTopic(topic) || servedWorkflowIds.has(workflowId)) {
            return;
        }

        // the event is buffered in the new run until the run is brought up
        const freshRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
        freshRun.updateEventKeyFieldValueFromMessage(topic, message);

        // register for management
        const freshRunId = freshRun.getId();
        workflowRuns[freshRunId] = freshRun;

        // route event
        logger.log('debug', `event ${topic} is routed to workflow run ${freshRunId}`);
        freshRun.enqueueEvent(topic, message);

        kickstart(workflowId, freshRunId);
    });
};
292
/**
 * Reports how many events are queued for a workflow run.
 *
 * @param {string} workflowRunId - id of the run to inspect
 * @returns {*} whatever the run's lengthEventQueue() returns, or null (with
 *     a warning log) when the run id is unknown
 */
const countQueuedEvents = (workflowRunId) => {
    if(workflowRunId in workflowRuns) {
        return workflowRuns[workflowRunId].lengthEventQueue();
    }

    logger.log('warn', `workflow run ${workflowRunId} does not exist`);
    return null;
};
303
/**
 * Dequeues one event of the given topic from a workflow run on behalf of a
 * task.
 *
 * @param {string} workflowRunId - id of the run holding the event queue
 * @param {string} taskId - id of the requesting task; must exist in the
 *     run's workflow
 * @param {string} topic - topic to dequeue
 * @returns {Object|null} the dequeued event, an empty object when the run
 *     has nothing to deliver, or null (with a warning log) when the run,
 *     its workflow or the task cannot be found
 */
const fetchEvent = (workflowRunId, taskId, topic) => {
    if(!(workflowRunId in workflowRuns)) {
        logger.log('warn', `workflow run ${workflowRunId} does not exist`);
        return null;
    }

    const run = workflowRuns[workflowRunId];
    const ownerWorkflowId = run.getWorkflowId();

    if(!(ownerWorkflowId in workflows)) {
        logger.log('warn', `workflow ${ownerWorkflowId} does not exist`);
        return null;
    }

    // the task itself is not used further; only its existence is required
    const taskExists = Boolean(workflows[ownerWorkflowId].getTask(taskId));
    if(!taskExists) {
        logger.log('warn', `workflow ${ownerWorkflowId} does not have task ${taskId}`);
        return null;
    }

    logger.log('debug', `workflow run ${workflowRunId}, task ${taskId} fetches an event`);

    // a falsy dequeue result is reported as an empty object
    return run.dequeueEvent(topic) || {};
};
337
/**
 * Picks the request id to echo back in a response, so a client can match a
 * response to its request.
 *
 * @param {*} msg - incoming message
 * @returns {*} msg.req_id when msg is a generic object carrying that key,
 *     otherwise the default signature number 101010
 */
const extractReqId = (msg) => {
    if(msg && checkObject(msg) && 'req_id' in msg) {
        return msg.req_id;
    }
    return 101010;
};

/**
 * Relays every message received from a probe socket as an event.
 *
 * probe protocol:
 * REQ:
 *      topic: event topic
 *      message: <data>
 * RES:
 *      topic: topic sent
 *      message: {result: <true/false>, message: <error message> }
 *
 * @param {Object} socket - probe socket exposing on() and emit()
 */
const attachProbeRelay = (socket) => {
    socket.on('*', (msg) => {
        const parts = msg.data;

        // must have exactly two parts: the topic, then the message body
        if(parts.length !== 2) {
            const complaint = `unexpected message is received - ${JSON.stringify(parts)}`;
            logger.log('warn', complaint);
            socket.emit(parts[0], {
                result: false,
                message: complaint
            });
            return;
        }

        const topic = parts[0];
        sendEvent(topic, parts[1]);

        // return true for success
        socket.emit(topic, {
            result: true
        });
    });
};

/**
 * Subscribes a socket to every topic of a router and answers each request
 * on the topic it arrived on.
 *
 * protocol (shared by manager and workflow run clients):
 * REQ:
 *      topic: operation
 *      message: { req_id: <req_id>, <data>... }
 * RES:
 *      topic: topic sent
 *      message: {
 *          req_id: <req_id>,
 *          error: <true/false>,
 *          result: <true/false>,
 *          message: <error message>
 *      }
 *
 * @param {Object} socket - client socket exposing on() and emit()
 * @param {Object} router - router elements, each with topic, handler and an
 *     optional return flag
 * @param {boolean} forwardResultOnError - when true the handler's result is
 *     sent along with an error response, when false `false` is sent instead
 */
const attachRouter = (socket, router, forwardResultOnError) => {
    _.forOwn(router, (routerElem, _key) => {
        socket.on(routerElem.topic, (msg) => {
            const req_id = extractReqId(msg);

            routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
                if(err) {
                    logger.log('warn', `unable to handle a message - ${err}`);
                    socket.emit(routerElem.topic, {
                        req_id: req_id,
                        error: true,
                        result: forwardResultOnError ? result : false,
                        message: err
                    });
                    return;
                }

                // a response is sent unless the element opts out with a
                // falsy (but defined) `return` flag
                const wantsResponse = routerElem.return === undefined || routerElem.return;
                if(wantsResponse) {
                    socket.emit(routerElem.topic, {
                        req_id: req_id,
                        error: false,
                        result: result
                    });
                }
            });
        });
    });
};

/**
 * Registers a client and wires its socket according to the client type.
 *
 * - PROBE: messages are relayed as events (see attachProbeRelay).
 * - WORKFLOW_MANAGER: the ws_manager router is attached.
 * - WORKFLOW_RUN: the client is bound to its (already registered) workflow
 *   run and the ws_workflowrun router is attached.
 *
 * @param {Object} c - client exposing getId(), getSocket(), getType() and,
 *     for workflow run clients, getWorkflowId() / getWorkflowRunId()
 * @returns {boolean} true when the client was registered; false when the id
 *     is already in use, the type is not one of the three above, or a
 *     workflow run client refers to an unknown workflow or workflow run
 */
const addClient = (c) => {
    const clientId = c.getId();
    const socket = c.getSocket();

    // refuse a second client with the same id
    if(clientId in allClients) {
        logger.log('warn', `there exists a client with the same id - ${clientId}`);
        return false;
    }

    switch(c.getType()) {
        case Client.Type.PROBE: {
            allClients[clientId] = c;
            probeClients[clientId] = c;

            attachProbeRelay(socket);
            return true;
        }
        case Client.Type.WORKFLOW_MANAGER: {
            allClients[clientId] = c;
            workflowManagerClients[clientId] = c;

            // NOTE(review): on error the manager path forwards the handler's
            // result while the workflow run path sends `false` - confirm
            // whether this difference is intended
            attachRouter(socket, ws_manager.getRouter(), true);
            return true;
        }
        case Client.Type.WORKFLOW_RUN: {
            // map to WorkflowRun instance
            const ownerWorkflowId = c.getWorkflowId();
            const runId = c.getWorkflowRunId();

            if(!(ownerWorkflowId in workflows)) {
                logger.log('warn', `cannot find a workflow ${ownerWorkflowId}`);
                return false;
            }

            // the workflow run must exist before its client connects
            if(!(runId in workflowRuns)) {
                logger.log('warn', `cannot find a workflow run ${runId}`);
                return false;
            }

            allClients[clientId] = c;
            workflowRunClients[clientId] = c;

            // register client to workflow run
            workflowRuns[runId].addClientId(clientId);

            attachRouter(socket, ws_workflowrun.getRouter(), false);
            return true;
        }
        default:
            return false;
    }
};
531
/**
 * Unregisters a single client. Unknown ids are ignored silently.
 *
 * The client is removed from the registry of all clients and from the
 * registry of its type; a workflow run client is additionally detached from
 * its workflow run when that run is still registered.
 *
 * @param {string} id - id of the client to remove
 */
const removeClient = (id) => {
    if(!(id in allClients)) {
        return;
    }

    const leavingClient = allClients[id];
    delete allClients[id];

    switch(leavingClient.getType()) {
        case Client.Type.PROBE:
            delete probeClients[id];
            break;
        case Client.Type.WORKFLOW_MANAGER:
            delete workflowManagerClients[id];
            break;
        case Client.Type.WORKFLOW_RUN: {
            delete workflowRunClients[id];

            const boundRun = workflowRuns[leavingClient.getWorkflowRunId()];
            if(boundRun) {
                boundRun.removeClientId(id);

                //TODO
                // WorkflowRun can have no clients between tasks
                // So we should not remove the run until the workflow run finishes
            }
            break;
        }
        default:
            break;
    }
};
560
/**
 * Unregisters every client and disconnects its socket.
 *
 * The three per-type registries are emptied first; then each client in the
 * registry of all clients has its socket disconnected (disconnect(true))
 * and is removed. All registries are emptied in place, so the objects
 * handed out by the module's getters stay valid.
 */
const removeClients = () => {
    // operate on the module-level registries; a local variable named
    // probeClients here would shadow the real one and leave it untouched
    const typedRegistries = [probeClients, workflowManagerClients, workflowRunClients];
    typedRegistries.forEach((registry) => {
        Object.keys(registry).forEach((clientId) => {
            delete registry[clientId];
        });
    });

    // the key list is taken up front, so entries may be deleted while looping
    Object.keys(allClients).forEach((clientId) => {
        allClients[clientId].getSocket().disconnect(true);
        delete allClients[clientId];
    });
};
581
582 module.exports = {
583 serviceEvents: serviceEvents,
584 destroy: destroy,
585 getClients: () => { return allClients; },
586 getProbeClients: () => { return probeClients; },
587 getWorkflowManagerClients: () => { return workflowManagerClients; },
588 getWorkflowRunClients: () => { return workflowRunClients; },
589 clientType: Client.Type,
590 //setIO: setIO,
591 sendEvent: sendEvent,
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700592 countQueuedEvents: countQueuedEvents,
Illyoung Choi59820ed2019-06-24 17:01:00 -0700593 fetchEvent: fetchEvent,
594 addClient: addClient,
595 removeClient: removeClient,
596 removeClients: removeClients,
597 addWorkflow: addWorkflow,
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700598 getWorkflow: getWorkflow,
Illyoung Choi59820ed2019-06-24 17:01:00 -0700599 listWorkflows: listWorkflows,
600 checkWorkflow: checkWorkflow,
601 removeWorkflow: removeWorkflow,
602 clearWorkflows: clearWorkflows,
603 addWorkflowRun: addWorkflowRun,
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700604 getWorkflowRun: getWorkflowRun,
Illyoung Choi59820ed2019-06-24 17:01:00 -0700605 listWorkflowRuns: listWorkflowRuns,
606 checkWorkflowRun: checkWorkflowRun,
607 removeWorkflowRun: removeWorkflowRun,
608 clearWorkflowRuns: clearWorkflowRuns,
609 updateWorkflowRunStatus: updateWorkflowRunStatus,
610 setWorkflowRunKickstarted: setWorkflowRunKickstarted,
611 };
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700612})();