blob: 9a54181cf4055edc1dc80068362c0b207f7be529 [file] [log] [blame]
Illyoung Choi59820ed2019-06-24 17:01:00 -07001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17
18(function () {
19 'use strict';
20
21 const _ = require('lodash');
22 const logger = require('../config/logger.js');
23 const Client = require('../types/client.js');
24 const WorkflowRun = require('../types/workflowrun.js');
25 const ws_manager = require('./ws_manager.js');
26 const ws_workflowrun = require('./ws_workflowrun.js');
27
// Client registries, keyed by client id.
// allClients holds every connected client (publishers and subscribers);
// the other three each hold the subset of clients of one type.
const allClients = {};
const probeClients = {};
const workflowManagerClients = {};
const workflowRunClients = {};

//let io;

// workflow id -> Workflow instance
const workflows = {};

// workflow run id -> WorkflowRun instance
const workflowRuns = {};

// Event names published by this service.
// It starts with the greeting event and is extended with the events of
// ws_manager first and ws_workflowrun second, so on a key clash the
// ws_workflowrun entry wins.
const serviceEvents = {
    GREETING: 'cord.workflow.ctlsvc.greeting'
};

[ws_manager.serviceEvents, ws_workflowrun.serviceEvents].forEach((moduleEvents) => {
    _.forOwn(moduleEvents, (eventName, key) => {
        serviceEvents[key] = eventName;
    });
});
56
57 //const setIO = (ioInstance) => {
58 // io = ioInstance;
59 //};
60
// Tells whether a value is tagged as a generic object.
// obj: any value
// returns: true when Object.prototype.toString reports '[object Object]',
//          false for arrays, null, undefined, primitives, etc.
const checkObject = (obj) => {
    const typeTag = Object.prototype.toString.call(obj);
    return typeTag === '[object Object]';
};
64
// Tears down all state held by this module:
// clients first, then workflow runs, then workflows.
const destroy = () => {
    const teardownSteps = [removeClients, clearWorkflowRuns, clearWorkflows];
    teardownSteps.forEach((step) => {
        step();
    });
};
70
// Returns the ids of all registered workflows as a new array.
const listWorkflows = () => {
    return _.keys(workflows);
};
78
// Tells whether a workflow with the given id is registered.
// workflowId: id to look up
// returns: true only when the id is a key of the workflow registry itself;
//          names inherited from Object.prototype (e.g. 'toString') do not count.
const checkWorkflow = (workflowId) => {
    return Object.prototype.hasOwnProperty.call(workflows, workflowId);
};
85
// Registers a workflow under its own id.
// workflow: object exposing getId()
// returns: true when registered, false when the id is already taken.
const addWorkflow = (workflow) => {
    const workflowId = workflow.getId();

    // own-property test: ids such as 'constructor' must not be treated
    // as already registered just because Object.prototype defines them
    if(Object.prototype.hasOwnProperty.call(workflows, workflowId)) {
        logger.log('error', `there exists a workflow with the same id - ${workflowId}`);
        return false;
    }

    workflows[workflowId] = workflow;
    return true;
};
96
// Empties the workflow registry in place (the registry object itself is kept).
const clearWorkflows = () => {
    for(const workflowId of _.keys(workflows)) {
        delete workflows[workflowId];
    }
};
102
// Returns the ids of all registered workflow runs as a new array.
const listWorkflowRuns = () => {
    return _.keys(workflowRuns);
};
110
// Tells whether a workflow run with the given id is registered.
// workflowRunId: id to look up
// returns: true only when the id is a key of the workflow run registry itself;
//          names inherited from Object.prototype (e.g. 'toString') do not count.
const checkWorkflowRun = (workflowRunId) => {
    return Object.prototype.hasOwnProperty.call(workflowRuns, workflowRunId);
};
117
// Registers a workflow run under its own id.
// workflowRun: object exposing getWorkflowId() and getId()
// returns: true when registered; false when the run id is already taken
//          or when the run refers to a workflow that is not registered.
const addWorkflowRun = (workflowRun) => {
    const hasOwn = Object.prototype.hasOwnProperty;
    const workflowId = workflowRun.getWorkflowId();
    const workflowRunId = workflowRun.getId();

    // own-property tests: names inherited from Object.prototype must
    // neither block a registration nor stand in for a workflow
    if(hasOwn.call(workflowRuns, workflowRunId)) {
        logger.log('warn', `there exists a workflow run with the same id - ${workflowRunId}`);
        return false;
    }

    if(!hasOwn.call(workflows, workflowId)) {
        logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
        return false;
    }

    workflowRuns[workflowRunId] = workflowRun;
    return true;
};
135
// Empties the workflow run registry in place (the registry object itself is kept).
const clearWorkflowRuns = () => {
    for(const workflowRunId of _.keys(workflowRuns)) {
        delete workflowRuns[workflowRunId];
    }
};
141
// Forwards a task status change to a registered workflow run.
// workflowRunId: id of the run to update
// taskId, status: passed through to the run's updateTaskStatus()
// returns: true when the run exists and was updated, false otherwise.
const updateWorkflowRunStatus = (workflowRunId, taskId, status) => {
    // own-property test: an id like 'toString' must be reported as missing
    // instead of resolving to an inherited function and failing below
    if(!Object.prototype.hasOwnProperty.call(workflowRuns, workflowRunId)) {
        logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
        return false;
    }

    workflowRuns[workflowRunId].updateTaskStatus(taskId, status);
    return true;
};
152
// Marks a registered workflow run as kickstarted.
// workflowRunId: id of the run to mark
// returns: true when the run exists and setKickstarted() was called, false otherwise.
const setWorkflowRunKickstarted = (workflowRunId) => {
    // own-property test: an id like 'constructor' must be reported as missing
    // instead of resolving to an inherited function and failing below
    if(!Object.prototype.hasOwnProperty.call(workflowRuns, workflowRunId)) {
        logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
        return false;
    }

    workflowRuns[workflowRunId].setKickstarted();
    return true;
};
163
// Asks the workflow manager side to kickstart a workflow run.
// workflowId: id of a registered workflow
// workflowRunId: id of a registered workflow run
// returns: false when either id is not registered; true once the request
//          has been handed to ws_manager.kickstartWorkflow().
const kickstart = (workflowId, workflowRunId) => {
    const hasOwn = Object.prototype.hasOwnProperty;

    if(!hasOwn.call(workflows, workflowId)) {
        logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
        return false;
    }

    if(!hasOwn.call(workflowRuns, workflowRunId)) {
        logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
        return false;
    }

    ws_manager.kickstartWorkflow(workflowId, workflowRunId);
    // report success explicitly so both outcomes yield a boolean
    return true;
};
177
// Unregisters a workflow, provided no workflow run still refers to it.
// workflowId: id of the workflow to remove
// returns: true when removed; false when the workflow is not registered
//          or when at least one registered run belongs to it.
const removeWorkflow = (workflowId) => {
    if(!Object.prototype.hasOwnProperty.call(workflows, workflowId)) {
        logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
        return false;
    }

    // Object.keys lists own keys only, so no hasOwnProperty call through the
    // registry is needed (which would break if a run id shadowed that name).
    // Array#some stops at the first match and gives a value we can branch on.
    const hasRun = Object.keys(workflowRuns).some((workflowRunId) => {
        return workflowRuns[workflowRunId].getWorkflowId() === workflowId;
    });

    if(hasRun) {
        logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
        return false;
    }

    delete workflows[workflowId];
    return true;
};
208
// Unregisters a workflow run and marks it finished.
// workflowRunId: id of the run to remove
// returns: true when the run existed and was removed, false otherwise.
const removeWorkflowRun = (workflowRunId) => {
    // own-property test: an id like 'toString' must be reported as missing
    // instead of resolving to an inherited function and failing below
    if(!Object.prototype.hasOwnProperty.call(workflowRuns, workflowRunId)) {
        logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
        return false;
    }

    const workflowRun = workflowRuns[workflowRunId];
    delete workflowRuns[workflowRunId];

    // the run is flagged only after it has left the registry
    workflowRun.setFinished();
    return true;
};
221
// Routes an event to the workflow runs that accept it and, when the topic is
// a kickstart topic of a workflow that has no accepting run, creates and
// kickstarts a new run for that workflow.
// topic: event topic
// message: event payload (logged via JSON.stringify)
const sendEvent = (topic, message) => {
    // ids of workflows that have at least one run accepting this event
    const servedWorkflowIds = new Set();

    logger.log('debug', `event is raised : topic ${topic}, message ${JSON.stringify(message)}`);

    // pass 1: hand the event to running instances
    _.forOwn(workflowRuns, (run, runId) => {
        const ownerWorkflowId = run.getWorkflowId();

        // a run takes the event when
        // 1) its workflow is currently interested in the topic
        //    (already finished tasks are not counted)
        // 2) the task's key field and value match
        if(!run.isEventAcceptable(workflows[ownerWorkflowId], topic, message)) {
            return;
        }

        logger.log('debug', `event ${topic} is routed to workflow run ${runId}`);
        run.enqueueEvent(topic, message);
        servedWorkflowIds.add(ownerWorkflowId);
    });

    // pass 2: kickstart workflows that listen to the topic but have no accepting run
    _.forOwn(workflows, (workflow, workflowId) => {
        const needsNewRun = workflow.isKickstartTopic(topic) && !servedWorkflowIds.has(workflowId);
        if(!needsNewRun) {
            return;
        }

        // the event is buffered in the new run until the run is brought up
        const newRun = WorkflowRun.WorkflowRun.makeNewRun(workflow);
        newRun.updateEventKeyFieldValueFromMessage(topic, message);

        const newRunId = newRun.getId();

        // register for management
        workflowRuns[newRunId] = newRun;

        // route event
        logger.log('debug', `event ${topic} is routed to workflow run ${newRunId}`);
        newRun.enqueueEvent(topic, message);

        // KICKSTART!
        kickstart(workflowId, newRunId);
    });
};
274
// Reports how many events are queued for a workflow run.
// workflowRunId: id of the run to inspect
// returns: the run's lengthEventQueue() result, or null when the run is not registered.
const countQueuedEvents = (workflowRunId) => {
    // own-property test: an id like 'valueOf' must be reported as missing
    // instead of resolving to an inherited function and failing below
    if(!Object.prototype.hasOwnProperty.call(workflowRuns, workflowRunId)) {
        logger.log('warn', `workflow run ${workflowRunId} does not exist`);
        return null;
    }

    return workflowRuns[workflowRunId].lengthEventQueue();
};
285
// Takes the next queued event of a topic from a workflow run.
// workflowRunId: id of the run to read from
// taskId: id of the requesting task; it must exist in the run's workflow
// topic: topic passed to the run's dequeueEvent()
// returns: the dequeued event, an empty object when nothing is queued,
//          or null when the run, its workflow, or the task cannot be found.
const fetchEvent = (workflowRunId, taskId, topic) => {
    const hasOwn = Object.prototype.hasOwnProperty;

    // own-property tests: names inherited from Object.prototype are not records
    if(!hasOwn.call(workflowRuns, workflowRunId)) {
        logger.log('warn', `workflow run ${workflowRunId} does not exist`);
        return null;
    }

    const workflowRun = workflowRuns[workflowRunId];
    const workflowId = workflowRun.getWorkflowId();

    if(!hasOwn.call(workflows, workflowId)) {
        logger.log('warn', `workflow ${workflowId} does not exist`);
        return null;
    }

    // the task is looked up only to validate the request
    if(!workflows[workflowId].getTask(taskId)) {
        logger.log('warn', `workflow ${workflowId} does not have task ${taskId}`);
        return null;
    }

    logger.log('debug', `workflow run ${workflowRunId}, task ${taskId} fetches an event`);

    return workflowRun.dequeueEvent(topic) || {};
};
319
// Registers a connected client and attaches the socket handlers for its type.
// c: client object exposing getId(), getSocket() and getType(); workflow run
//    clients additionally expose getWorkflowId() and getWorkflowRunId()
// returns: true when the client was registered; false when the id is already
//          taken, the client type is unknown, or (workflow run clients only)
//          the referenced workflow or workflow run is not registered.
const addClient = (c) => {
    const hasOwn = Object.prototype.hasOwnProperty;
    const clientId = c.getId();
    const socket = c.getSocket();

    // reject an id that is already registered
    // (own-property test, so inherited names like 'toString' are not "taken")
    if(hasOwn.call(allClients, clientId)) {
        logger.log('warn', `there exists a client with the same id - ${clientId}`);
        return false;
    }

    const clientType = c.getType();

    if(clientType === Client.Type.PROBE) {
        // probe messages are relayed based on topic
        // probe protocol:
        // REQ:
        //      topic: event topic
        //      message: <data>
        // RES:
        //      topic: topic sent
        //      message: {result: <true/false>, message: <error message> }
        allClients[clientId] = c;
        probeClients[clientId] = c;

        socket.on('*', (msg) => {
            // msg.data is expected to be [topic, message body];
            // anything else is rejected here instead of throwing in the handler
            const jsonMsg = msg ? msg.data : undefined;
            const isArrayMsg = Array.isArray(jsonMsg);

            if(isArrayMsg && jsonMsg.length === 2) {
                const topic = jsonMsg[0];
                const messageBody = jsonMsg[1];

                sendEvent(topic, messageBody);

                // return true for success
                socket.emit(topic, {
                    result: true
                });
                return;
            }

            const errorMessage = `unexpected message is received - ${JSON.stringify(jsonMsg)}`;
            logger.log('warn', errorMessage);

            // an error can be reported back only when a topic is known
            if(isArrayMsg && jsonMsg.length > 0) {
                socket.emit(jsonMsg[0], {
                    result: false,
                    message: errorMessage
                });
            }
        });
        return true;
    }

    if(clientType === Client.Type.WORKFLOW_MANAGER) {
        // manager protocol:
        // REQ:
        //      topic: operation
        //      message: {
        //          req_id: <req_id>,
        //          <data>...
        //      }
        // RES:
        //      topic: topic sent
        //      message: {
        //          req_id: <req_id>,
        //          error: <true/false>,
        //          result: <result>,
        //          message: <error message>
        //      }
        allClients[clientId] = c;
        workflowManagerClients[clientId] = c;

        // attach manager operations
        _.forOwn(ws_manager.getRouter(), (routerElem) => {
            socket.on(routerElem.topic, (msg) => {
                // handle a common parameter - req_id
                // the req_id of a request is echoed in the response
                // so the client side can match a response to its request
                let req_id = 101010; // default number, signature
                if(msg && checkObject(msg) && 'req_id' in msg) {
                    req_id = msg.req_id;
                }

                routerElem.handler(routerElem.topic, msg || {}, (err, result) => {
                    if(err) {
                        logger.log('warn', `unable to handle a message - ${err}`);
                        socket.emit(routerElem.topic, {
                            req_id: req_id,
                            error: true,
                            result: result,
                            message: err
                        });
                        return;
                    }

                    // a route replies unless it sets 'return' to a falsy value
                    if(routerElem.return === undefined || routerElem.return) {
                        socket.emit(routerElem.topic, {
                            req_id: req_id,
                            error: false,
                            result: result
                        });
                    }
                });
            });
        });
        return true;
    }

    if(clientType === Client.Type.WORKFLOW_RUN) {
        // workflow run protocol:
        // REQ:
        //      topic: operation
        //      message: <data>
        // RES:
        //      topic: topic sent
        //      message: {result: <true/false>, message: <error message> }

        // map to WorkflowRun instance
        const workflowId = c.getWorkflowId();
        const workflowRunId = c.getWorkflowRunId();

        if(!hasOwn.call(workflows, workflowId)) {
            logger.log('warn', `cannot find a workflow ${workflowId}`);
            return false;
        }

        // the workflow run must already exist
        if(!hasOwn.call(workflowRuns, workflowRunId)) {
            logger.log('warn', `cannot find a workflow run ${workflowRunId}`);
            return false;
        }

        allClients[clientId] = c;
        workflowRunClients[clientId] = c;

        // register client to workflow run
        workflowRuns[workflowRunId].addClientId(clientId);

        // attach workflow run operations
        _.forOwn(ws_workflowrun.getRouter(), (routerElem) => {
            socket.on(routerElem.topic, (msg) => {
                routerElem.handler(routerElem.topic, msg, (err, result) => {
                    if(err) {
                        logger.log('warn', `unable to handle a message - ${err}`);
                        socket.emit(routerElem.topic, {
                            result: false,
                            message: err
                        });
                        return;
                    }

                    // a route replies unless it sets 'return' to a falsy value
                    if(routerElem.return === undefined || routerElem.return) {
                        socket.emit(routerElem.topic, {
                            result: result
                        });
                    }
                });
            });
        });
        return true;
    }

    return false;
};
488
// Unregisters a client from allClients and from the registry of its type.
// For a workflow run client the id is also detached from its workflow run,
// when that run is still registered. Unknown ids are ignored.
// id: client id
const removeClient = (id) => {
    const hasOwn = Object.prototype.hasOwnProperty;

    // own-property test: an id like 'toString' is not a registered client
    if(!hasOwn.call(allClients, id)) {
        return;
    }

    const removedClient = allClients[id];
    delete allClients[id];

    const type = removedClient.getType();
    if(type === Client.Type.PROBE) {
        delete probeClients[id];
    }
    else if(type === Client.Type.WORKFLOW_MANAGER) {
        delete workflowManagerClients[id];
    }
    else if(type === Client.Type.WORKFLOW_RUN) {
        delete workflowRunClients[id];

        const workflowRunId = removedClient.getWorkflowRunId();
        const workflowRun = hasOwn.call(workflowRuns, workflowRunId) ? workflowRuns[workflowRunId] : undefined;

        if(workflowRun) {
            workflowRun.removeClientId(id);

            //TODO
            // WorkflowRun can have no clients between tasks
            // So we should not remove the run until the workflow run finishes
        }
    }
};
517
// Unregisters every client: empties the three per-type registries, then
// disconnects each client's socket and drops it from allClients.
// The registries are emptied in place, so the objects handed out by the
// exported getters stay valid.
const removeClients = () => {
    // no local 'probeClients' here: a shadowing declaration would leave the
    // module-level probe registry untouched
    [probeClients, workflowManagerClients, workflowRunClients].forEach((registry) => {
        _.forOwn(registry, (_client, clientId) => {
            delete registry[clientId];
        });
    });

    _.forOwn(allClients, (client, clientId) => {
        client.getSocket().disconnect(true);
        delete allClients[clientId];
    });
};
538
539 module.exports = {
540 serviceEvents: serviceEvents,
541 destroy: destroy,
542 getClients: () => { return allClients; },
543 getProbeClients: () => { return probeClients; },
544 getWorkflowManagerClients: () => { return workflowManagerClients; },
545 getWorkflowRunClients: () => { return workflowRunClients; },
546 clientType: Client.Type,
547 //setIO: setIO,
548 sendEvent: sendEvent,
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700549 countQueuedEvents: countQueuedEvents,
Illyoung Choi59820ed2019-06-24 17:01:00 -0700550 fetchEvent: fetchEvent,
551 addClient: addClient,
552 removeClient: removeClient,
553 removeClients: removeClients,
554 addWorkflow: addWorkflow,
555 listWorkflows: listWorkflows,
556 checkWorkflow: checkWorkflow,
557 removeWorkflow: removeWorkflow,
558 clearWorkflows: clearWorkflows,
559 addWorkflowRun: addWorkflowRun,
560 listWorkflowRuns: listWorkflowRuns,
561 checkWorkflowRun: checkWorkflowRun,
562 removeWorkflowRun: removeWorkflowRun,
563 clearWorkflowRuns: clearWorkflowRuns,
564 updateWorkflowRunStatus: updateWorkflowRunStatus,
565 setWorkflowRunKickstarted: setWorkflowRunKickstarted,
566 };
567})();