blob: 82d27dbfb6ff2eefa1da497f9c5715603f08e126 [file] [log] [blame]
Illyoung Choi59820ed2019-06-24 17:01:00 -07001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17
18(function () {
19 'use strict';
20
21 const _ = require('lodash');
22 const logger = require('../config/logger.js');
23 const Client = require('../types/client.js');
24 const WorkflowRun = require('../types/workflowrun.js');
25 const ws_manager = require('./ws_manager.js');
26 const ws_workflowrun = require('./ws_workflowrun.js');
27
// Every connected client (publishers and subscribers), keyed by client id.
const allClients = {};
// Per-type views; each one holds a subset of the clients in allClients.
const probeClients = {};
const workflowManagerClients = {};
const workflowRunClients = {};

//let io;

// Registered workflows.
// key: workflow id, value: Workflow instance
const workflows = {};

// Known workflow runs.
// key: workflow run id, value: WorkflowRun instance
const workflowRuns = {};
42
// Event names served by this module: the greeting event plus every event
// declared by the workflow manager and workflow run operation modules.
// Sources are merged left to right, so a later module wins on a key clash.
const serviceEvents = _.assign(
    {
        GREETING: 'cord.workflow.ctlsvc.greeting'
    },
    ws_manager.serviceEvents,
    ws_workflowrun.serviceEvents
);
56
57 //const setIO = (ioInstance) => {
58 // io = ioInstance;
59 //};
60
// Tear down all module state: clients first, then workflow runs, then workflows.
const destroy = () => {
    const teardownSteps = [removeClients, clearWorkflowRuns, clearWorkflows];
    teardownSteps.forEach((step) => {
        step();
    });
};
66
// Return the ids of all registered workflows as a fresh array.
const listWorkflows = () => Object.keys(workflows);
74
/**
 * Tell whether a workflow with the given id is registered.
 *
 * Uses an own-property test: the `in` operator would also report members
 * inherited from Object.prototype (e.g. 'toString') as registered workflows.
 *
 * @param workflowId - id of the workflow to look up
 * @returns {boolean} true if the workflow is registered, false otherwise
 */
const checkWorkflow = (workflowId) => {
    return Object.prototype.hasOwnProperty.call(workflows, workflowId);
};
81
/**
 * Register a workflow under its own id.
 *
 * The duplicate check is an own-property test so that ids which collide with
 * Object.prototype members (e.g. 'toString') are not rejected as duplicates.
 *
 * @param workflow - object exposing getId()
 * @returns {boolean} true if the workflow was registered, false if a
 *     workflow with the same id already exists
 */
const addWorkflow = (workflow) => {
    const workflowId = workflow.getId();

    if (Object.prototype.hasOwnProperty.call(workflows, workflowId)) {
        logger.log('error', `there exists a workflow with the same id - ${workflowId}`);
        return false;
    }

    workflows[workflowId] = workflow;
    return true;
};
92
// Drop every registered workflow, emptying the registry object in place.
const clearWorkflows = () => {
    for (const registeredId of Object.keys(workflows)) {
        delete workflows[registeredId];
    }
};
98
// Return the ids of all known workflow runs as a fresh array.
const listWorkflowRuns = () => Object.keys(workflowRuns);
106
/**
 * Tell whether a workflow run with the given id is known.
 *
 * Uses an own-property test: the `in` operator would also report members
 * inherited from Object.prototype (e.g. 'toString') as existing runs.
 *
 * @param workflowRunId - id of the workflow run to look up
 * @returns {boolean} true if the workflow run exists, false otherwise
 */
const checkWorkflowRun = (workflowRunId) => {
    return Object.prototype.hasOwnProperty.call(workflowRuns, workflowRunId);
};
113
/**
 * Register a workflow run for an already registered workflow.
 *
 * Both lookups are own-property tests, so ids that collide with
 * Object.prototype members (e.g. 'constructor') are neither treated as an
 * existing run nor as a registered workflow.
 *
 * @param workflowRun - object exposing getId() and getWorkflowId()
 * @returns {boolean} true if the run was registered; false if a run with the
 *     same id exists or its workflow is not registered
 */
const addWorkflowRun = (workflowRun) => {
    const hasOwn = Object.prototype.hasOwnProperty;
    const workflowId = workflowRun.getWorkflowId();
    const workflowRunId = workflowRun.getId();

    if (hasOwn.call(workflowRuns, workflowRunId)) {
        logger.log('warn', `there exists a workflow run with the same id - ${workflowRunId}`);
        return false;
    }

    if (!hasOwn.call(workflows, workflowId)) {
        logger.log('warn', `cannot find a workflow with id - ${workflowId}`);
        return false;
    }

    workflowRuns[workflowRunId] = workflowRun;
    return true;
};
131
// Drop every known workflow run, emptying the registry object in place.
const clearWorkflowRuns = () => {
    for (const knownRunId of Object.keys(workflowRuns)) {
        delete workflowRuns[knownRunId];
    }
};
137
/**
 * Forward a task status change to a workflow run.
 *
 * The lookup is an own-property test; with `in`, an id such as 'toString'
 * would resolve to an inherited function and the call below would throw.
 *
 * @param workflowRunId - id of the workflow run to update
 * @param taskId - id of the task whose status changed
 * @param status - new status, passed through to updateTaskStatus()
 * @returns {boolean} true if the run was found and updated, false otherwise
 */
const updateWorkflowRunStatus = (workflowRunId, taskId, status) => {
    if (!Object.prototype.hasOwnProperty.call(workflowRuns, workflowRunId)) {
        logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
        return false;
    }

    workflowRuns[workflowRunId].updateTaskStatus(taskId, status);
    return true;
};
148
/**
 * Mark a workflow run as kickstarted.
 *
 * The lookup is an own-property test; with `in`, an id such as 'toString'
 * would resolve to an inherited function and the call below would throw.
 *
 * @param workflowRunId - id of the workflow run to mark
 * @returns {boolean} true if the run was found and marked, false otherwise
 */
const setWorkflowRunKickstarted = (workflowRunId) => {
    if (!Object.prototype.hasOwnProperty.call(workflowRuns, workflowRunId)) {
        logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
        return false;
    }

    workflowRuns[workflowRunId].setKickstarted();
    return true;
};
159
/**
 * Ask the workflow manager side to kickstart a workflow run.
 *
 * Both ids must refer to registered entries (own-property tests, so ids that
 * collide with Object.prototype members are not accepted by accident).
 *
 * @param workflowId - id of a registered workflow
 * @param workflowRunId - id of a known workflow run
 * @returns {boolean} true once the request was handed to
 *     ws_manager.kickstartWorkflow(), false if either id is unknown
 */
const kickstart = (workflowId, workflowRunId) => {
    const hasOwn = Object.prototype.hasOwnProperty;

    if (!hasOwn.call(workflows, workflowId)) {
        logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
        return false;
    }

    if (!hasOwn.call(workflowRuns, workflowRunId)) {
        logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
        return false;
    }

    ws_manager.kickstartWorkflow(workflowId, workflowRunId);
    // report success explicitly, matching the boolean results of the failure paths
    return true;
};
173
/**
 * Unregister a workflow, unless workflow runs still refer to it.
 *
 * The in-use check must decide the function's result. Returning false from
 * inside a lodash iteration callback only stops the iteration, it does not
 * abort the removal, so the check is computed up front with Array#some.
 *
 * @param workflowId - id of the workflow to remove
 * @returns {boolean} true if the workflow was removed; false if it is not
 *     registered or a workflow run still belongs to it
 */
const removeWorkflow = (workflowId) => {
    if (!Object.prototype.hasOwnProperty.call(workflows, workflowId)) {
        logger.log('warn', `cannot find a workflow with the id - ${workflowId}`);
        return false;
    }

    // refuse to remove a workflow that still has workflow runs
    const hasWorkflowRuns = Object.keys(workflowRuns).some((workflowRunId) => {
        return workflowRuns[workflowRunId].getWorkflowId() === workflowId;
    });
    if (hasWorkflowRuns) {
        logger.log('warn', `there exists a workflow run for a workflow id - ${workflowId}`);
        return false;
    }

    delete workflows[workflowId];
    return true;
};
191
/**
 * Unregister a workflow run and mark it finished.
 *
 * The lookup is an own-property test; with `in`, an id such as 'toString'
 * would resolve to an inherited function and setFinished() would throw.
 *
 * @param workflowRunId - id of the workflow run to remove
 * @returns {boolean} true if the run was found and removed, false otherwise
 */
const removeWorkflowRun = (workflowRunId) => {
    if (!Object.prototype.hasOwnProperty.call(workflowRuns, workflowRunId)) {
        logger.log('warn', `cannot find a workflow run with the id - ${workflowRunId}`);
        return false;
    }

    // unregister first, then notify the run object itself
    const workflowRun = workflowRuns[workflowRunId];
    delete workflowRuns[workflowRunId];

    workflowRun.setFinished();
    return true;
};
204
// Route an event to the workflow runs that accept it and kickstart a new run
// for every workflow that treats the topic as a kickstart topic but had no
// run accepting this event.
const sendEvent = (topic, message) => {
    // ids of workflows that already have a run consuming this event
    const servedWorkflowIds = new Set();

    // pass 1: deliver to existing runs.
    // a run receives the event when isEventAcceptable() agrees, i.e.
    // 1) the workflow is currently interested in the same topic
    //    (already finished tasks are not counted)
    // 2) the task's key field and value
    Object.keys(workflowRuns).forEach((runId) => {
        const run = workflowRuns[runId];
        const ownerId = run.getWorkflowId();
        const owner = workflows[ownerId];

        if (!run.isEventAcceptable(owner, topic, message)) {
            return;
        }

        logger.log('debug', `event ${topic} is routed to workflow run ${runId}`);
        run.enqueueEvent(topic, message);
        servedWorkflowIds.add(ownerId);
    });

    // pass 2: kickstart workflows that start on this topic but have no
    // run that took the event
    Object.keys(workflows).forEach((candidateId) => {
        const candidate = workflows[candidateId];
        if (!candidate.isKickstartTopic(topic) || servedWorkflowIds.has(candidateId)) {
            return;
        }

        // the event is buffered in the new run until that run is brought up
        const freshRun = WorkflowRun.WorkflowRun.makeNewRun(candidate);
        const freshRunId = freshRun.getId();

        // register for management
        workflowRuns[freshRunId] = freshRun;

        // route event
        logger.log('debug', `event ${topic} is routed to workflow run ${freshRunId}`);
        freshRun.enqueueEvent(topic, message);

        // KICKSTART!
        kickstart(candidateId, freshRunId);
    });
};
252
/**
 * Fetch the next queued event of a topic for a task of a workflow run.
 *
 * Lookups are own-property tests; with `in`, an id such as 'toString' would
 * resolve to an inherited function and the method calls below would throw.
 *
 * @param workflowRunId - id of the workflow run asking for an event
 * @param taskId - id of the task asking; must be a task of the run's workflow
 * @param topic - topic passed to the run's dequeueEvent()
 * @returns the dequeued event, an empty object when dequeueEvent() yields
 *     nothing, or null when the run, its workflow or the task is unknown
 */
const fetchEvent = (workflowRunId, taskId, topic) => {
    const hasOwn = Object.prototype.hasOwnProperty;

    if (!hasOwn.call(workflowRuns, workflowRunId)) {
        logger.log('warn', `workflow run ${workflowRunId} does not exist`);
        return null;
    }

    const workflowRun = workflowRuns[workflowRunId];
    const workflowId = workflowRun.getWorkflowId();

    if (!hasOwn.call(workflows, workflowId)) {
        logger.log('warn', `workflow ${workflowId} does not exist`);
        return null;
    }

    const task = workflows[workflowId].getTask(taskId);
    if (!task) {
        logger.log('warn', `workflow ${workflowId} does not have task ${taskId}`);
        return null;
    }

    logger.log('debug', `workflow run ${workflowRunId}, task ${taskId} fetches an event`);

    // callers get an empty object rather than a falsy value when nothing is queued
    return workflowRun.dequeueEvent(topic) || {};
};
286
// Subscribe `socket` to every route of `router`. Each handler result is sent
// back on the route's own topic: {result: false, message: err} on error,
// {result: <value>} on success unless the route sets `return` to a falsy
// value other than undefined.
const attachRouterHandlers = (socket, router) => {
    _.forOwn(router, (routerElem) => {
        socket.on(routerElem.topic, (msg) => {
            routerElem.handler(routerElem.topic, msg, (err, result) => {
                if (err) {
                    logger.log('warn', `unable to handle a message - ${err}`);
                    socket.emit(routerElem.topic, {
                        result: false,
                        message: err
                    });
                    return;
                }

                if (routerElem.return === undefined || routerElem.return) {
                    socket.emit(routerElem.topic, {
                        result: result
                    });
                }
            });
        });
    });
};

/**
 * Register a client and attach the message handlers for its type.
 *
 * All three client types share one protocol shape:
 *   REQ: topic: event topic / operation, message: <data>
 *   RES: topic: topic sent, message: {result: <true/false>, message: <error message>}
 *
 * - PROBE: every message received on '*' is relayed through sendEvent().
 * - WORKFLOW_MANAGER: the routes of ws_manager.getRouter() are attached.
 * - WORKFLOW_RUN: the client is bound to its existing WorkflowRun and the
 *   routes of ws_workflowrun.getRouter() are attached.
 *
 * Registry lookups are own-property tests, so ids that collide with
 * Object.prototype members (e.g. 'toString') are handled like any other id.
 *
 * @param c - client exposing getId(), getSocket(), getType() and, for
 *     workflow run clients, getWorkflowId() / getWorkflowRunId()
 * @returns {boolean} true if the client was registered; false on a duplicate
 *     client id, an unknown workflow or workflow run, or an unknown client type
 */
const addClient = (c) => {
    const hasOwn = Object.prototype.hasOwnProperty;
    const clientId = c.getId();
    const socket = c.getSocket();

    // refuse a client whose id is already registered
    if (hasOwn.call(allClients, clientId)) {
        logger.log('warn', `there exists a client with the same id - ${clientId}`);
        return false;
    }

    const clientType = c.getType();

    if (clientType === Client.Type.PROBE) {
        // probe messages are relayed based on their topic
        allClients[clientId] = c;
        probeClients[clientId] = c;

        socket.on('*', (msg) => {
            const jsonMsg = msg.data;
            if (jsonMsg.length === 2) {
                // must have two parts:
                // first part is topic, second part is message body
                const topic = jsonMsg[0];
                const messageBody = jsonMsg[1];

                sendEvent(topic, messageBody);

                // return true for success
                socket.emit(topic, {
                    result: true
                });
            }
            else {
                logger.log('warn', `unexpected message is received - ${JSON.stringify(jsonMsg)}`);
                socket.emit(jsonMsg[0], {
                    result: false,
                    message: `unexpected message is received - ${JSON.stringify(jsonMsg)}`
                });
            }
        });
        return true;
    }

    if (clientType === Client.Type.WORKFLOW_MANAGER) {
        allClients[clientId] = c;
        workflowManagerClients[clientId] = c;

        // attach manager operations
        attachRouterHandlers(socket, ws_manager.getRouter());
        return true;
    }

    if (clientType === Client.Type.WORKFLOW_RUN) {
        // map to WorkflowRun instance
        const workflowId = c.getWorkflowId();
        const workflowRunId = c.getWorkflowRunId();

        if (!hasOwn.call(workflows, workflowId)) {
            logger.log('warn', `cannot find a workflow ${workflowId}`);
            return false;
        }

        // the workflow run must exist before a client can join it
        if (!hasOwn.call(workflowRuns, workflowRunId)) {
            logger.log('warn', `cannot find a workflow run ${workflowRunId}`);
            return false;
        }

        allClients[clientId] = c;
        workflowRunClients[clientId] = c;

        // register client to workflow run
        workflowRuns[workflowRunId].addClientId(clientId);

        // attach workflow run operations
        attachRouterHandlers(socket, ws_workflowrun.getRouter());
        return true;
    }

    return false;
};
434
/**
 * Unregister a client from the global registry and from its per-type
 * registry. A workflow run client is also detached from its WorkflowRun.
 * Unknown ids are ignored.
 *
 * Lookups are own-property tests; with `in`, an id such as 'toString' would
 * resolve to an inherited function and getType() would throw.
 *
 * @param id - id of the client to remove
 */
const removeClient = (id) => {
    const hasOwn = Object.prototype.hasOwnProperty;
    if (!hasOwn.call(allClients, id)) {
        return;
    }

    const removedClient = allClients[id];
    delete allClients[id];

    const type = removedClient.getType();
    if (type === Client.Type.PROBE) {
        delete probeClients[id];
    }
    else if (type === Client.Type.WORKFLOW_MANAGER) {
        delete workflowManagerClients[id];
    }
    else if (type === Client.Type.WORKFLOW_RUN) {
        delete workflowRunClients[id];

        const workflowRunId = removedClient.getWorkflowRunId();
        if (hasOwn.call(workflowRuns, workflowRunId) && workflowRuns[workflowRunId]) {
            workflowRuns[workflowRunId].removeClientId(id);

            //TODO
            // WorkflowRun can have no clients between tasks
            // So we should not remove the run until the workflow run finishes
        }
    }
};
463
/**
 * Unregister every client and disconnect its socket.
 *
 * The per-type registries are emptied first, then each client in the global
 * registry is disconnected with disconnect(true) and removed. All registries
 * are emptied in place; no local variable may shadow a module-level
 * registry, otherwise that registry would silently keep its entries.
 */
const removeClients = () => {
    const typedRegistries = [probeClients, workflowManagerClients, workflowRunClients];
    typedRegistries.forEach((registry) => {
        Object.keys(registry).forEach((clientId) => {
            delete registry[clientId];
        });
    });

    Object.keys(allClients).forEach((clientId) => {
        allClients[clientId].getSocket().disconnect(true);
        delete allClients[clientId];
    });
};
484
485 module.exports = {
486 serviceEvents: serviceEvents,
487 destroy: destroy,
488 getClients: () => { return allClients; },
489 getProbeClients: () => { return probeClients; },
490 getWorkflowManagerClients: () => { return workflowManagerClients; },
491 getWorkflowRunClients: () => { return workflowRunClients; },
492 clientType: Client.Type,
493 //setIO: setIO,
494 sendEvent: sendEvent,
495 fetchEvent: fetchEvent,
496 addClient: addClient,
497 removeClient: removeClient,
498 removeClients: removeClients,
499 addWorkflow: addWorkflow,
500 listWorkflows: listWorkflows,
501 checkWorkflow: checkWorkflow,
502 removeWorkflow: removeWorkflow,
503 clearWorkflows: clearWorkflows,
504 addWorkflowRun: addWorkflowRun,
505 listWorkflowRuns: listWorkflowRuns,
506 checkWorkflowRun: checkWorkflowRun,
507 removeWorkflowRun: removeWorkflowRun,
508 clearWorkflowRuns: clearWorkflowRuns,
509 updateWorkflowRunStatus: updateWorkflowRunStatus,
510 setWorkflowRunKickstarted: setWorkflowRunKickstarted,
511 };
512})();