blob: 752ee8888b102c59916f0b2c76b46210ed7f10df [file] [log] [blame]
Illyoung Choi59820ed2019-06-24 17:01:00 -07001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17
18(function () {
19 'use strict';
20
21 const _ = require('lodash');
22 const Workflow = require('../types/workflow.js');
23 const logger = require('../config/logger.js');
24
// Topic names of the workflow service events handled or emitted by this module.
// The binding is never reassigned, so it is declared const.
const serviceEvents = {
    // manager -> controller -> manager
    WORKFLOW_REGISTER: 'cord.workflow.ctlsvc.workflow.register',
    WORKFLOW_REGISTER_ESSENCE: 'cord.workflow.ctlsvc.workflow.register_essence',
    WORKFLOW_LIST: 'cord.workflow.ctlsvc.workflow.list',
    WORKFLOW_LIST_RUN: 'cord.workflow.ctlsvc.workflow.run.list',
    WORKFLOW_CHECK: 'cord.workflow.ctlsvc.workflow.check',
    WORKFLOW_REMOVE: 'cord.workflow.ctlsvc.workflow.remove',
    WORKFLOW_REMOVE_RUN: 'cord.workflow.ctlsvc.workflow.run.remove',
    WORKFLOW_REPORT_NEW_RUN: 'cord.workflow.ctlsvc.workflow.report_new_run',
    WORKFLOW_REPORT_RUN_STATE: 'cord.workflow.ctlsvc.workflow.report_run_state',
    // controller -> manager
    WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart',
    WORKFLOW_CHECK_STATE: 'cord.workflow.ctlsvc.workflow.check.state'
};
40
41 // WebSocket interface for workflow registration
42 // Message format:
43 // {
44 // topic: 'cord.workflow.ctlsvc.workflow.reg',
Illyoung Choie3ce4cf2019-06-28 11:07:47 -070045 // message: {
Illyoung Choib4fc0d82019-07-16 10:29:39 -070046 // req_id: <req_id>, // optional
Illyoung Choie3ce4cf2019-06-28 11:07:47 -070047 // workflow: <workflow>
48 // }
Illyoung Choi59820ed2019-06-24 17:01:00 -070049 // }
Illyoung Choie3ce4cf2019-06-28 11:07:47 -070050 const registerWorkflow = (topic, message, cb) => {
Illyoung Choib4fc0d82019-07-16 10:29:39 -070051 const eventrouter = require('./eventrouter.js');
Illyoung Choi59820ed2019-06-24 17:01:00 -070052
53 let errorMessage;
54 if(!message) {
55 // error
56 errorMessage = `Message body for topic ${topic} is null or empty`;
57 logger.log('warn', `Return error - ${errorMessage}`);
58 cb(errorMessage, false);
59 return;
60 }
61
Illyoung Choie3ce4cf2019-06-28 11:07:47 -070062 if(!('workflow' in message)) {
63 // error
64 errorMessage = `field 'workflow' does not exist in message body - ${JSON.stringify(message)}`;
65 logger.log('warn', `Return error - ${errorMessage}`);
66 cb(errorMessage, false);
67 return;
68 }
69
70 let workflow = message.workflow;
Illyoung Choi59820ed2019-06-24 17:01:00 -070071
72 logger.log('debug', `workflow - ${JSON.stringify(workflow)}`);
73
Illyoung Choib4fc0d82019-07-16 10:29:39 -070074 let result = eventrouter.addWorkflow(workflow);
Illyoung Choi59820ed2019-06-24 17:01:00 -070075 if(!result) {
76 errorMessage = `failed to register a workflow ${workflow.getId()}`;
77 cb(errorMessage, false);
78 }
79 else {
80 cb(null, true);
81 }
82 return;
83 };
84
85 // WebSocket interface for workflow registration (via essence)
86 // Message format:
87 // {
Illyoung Choie3ce4cf2019-06-28 11:07:47 -070088 // topic: 'cord.workflow.ctlsvc.workflow.reg_essence',
89 // message: {
90 // req_id: <req_id> // optional
91 // essence: <workflow essence>
92 // }
Illyoung Choi59820ed2019-06-24 17:01:00 -070093 // }
94 const registerWorkflowEssence = (topic, message, cb) => {
95 const eventrouter = require('./eventrouter.js');
96 let errorMessage;
97 if(!message) {
98 // error
99 errorMessage = `Message body for topic ${topic} is null or empty`;
100 logger.log('warn', `Return error - ${errorMessage}`);
101 cb(errorMessage, false);
102 return;
103 }
104
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700105 if(!('essence' in message)) {
106 // error
107 errorMessage = `field 'essence' does not exist in message body - ${JSON.stringify(message)}`;
108 logger.log('warn', `Return error - ${errorMessage}`);
109 cb(errorMessage, false);
110 return;
111 }
112
113 let essence = message.essence;
Illyoung Choi59820ed2019-06-24 17:01:00 -0700114 let result = true;
115 let errorResults = [];
116
117 logger.log('debug', `workflow essence - ${JSON.stringify(essence)}`);
118
119 let workflows = Workflow.loadWorkflowsFromEssence(essence);
120 workflows.forEach((workflow) => {
121 if(workflow) {
122 let localResult = eventrouter.addWorkflow(workflow);
123 errorResults.push(localResult);
124 result = result && localResult; // false if any of registrations fails
125 }
126 });
127
128 if(!result) {
129 errorMessage = `failed to register workflows ${errorResults}`;
130 cb(errorMessage, false);
131 }
132 else {
133 cb(null, true);
134 }
135 return;
136 };
137
// WebSocket interface for workflow listing
// Message format:
// {
//     topic: 'cord.workflow.ctlsvc.workflow.list',
//     message: {
//         req_id: <req_id> // optional
//     }
// }
//
// Topic and message are ignored; the event router's workflow list is
// passed to cb as the result.
const listWorkflows = (_topic, _message, cb) => {
    const router = require('./eventrouter.js');
    cb(null, router.listWorkflows());
};
153
// WebSocket interface for workflow run listing
// Message format:
// {
//     topic: 'cord.workflow.ctlsvc.workflow.run.list',
//     message: {
//         req_id: <req_id> // optional
//     }
// }
//
// Topic and message are ignored; the event router's workflow run list is
// passed to cb as the result.
const listWorkflowRuns = (_topic, _message, cb) => {
    const router = require('./eventrouter.js');
    cb(null, router.listWorkflowRuns());
};
169
170 // WebSocket interface for workflow check
171 // Message format:
172 // {
173 // topic: 'cord.workflow.ctlsvc.workflow.check',
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700174 // message: {
175 // req_id: <req_id> // optional
176 // workflow_id: <workflow_id>
177 // }
Illyoung Choi59820ed2019-06-24 17:01:00 -0700178 // }
179 const checkWorkflow = (topic, message, cb) => {
180 const eventrouter = require('./eventrouter.js');
181
182 let errorMessage;
183 if(!message) {
184 // error
185 errorMessage = `Message body for topic ${topic} is null or empty`;
186 logger.log('warn', `Return error - ${errorMessage}`);
187 cb(errorMessage, false);
188 return;
189 }
190
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700191 if(!('workflow_id' in message)) {
192 // error
193 errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
194 logger.log('warn', `Return error - ${errorMessage}`);
195 cb(errorMessage, false);
196 return;
197 }
198
199 let workflowId = message.workflow_id;
Illyoung Choi59820ed2019-06-24 17:01:00 -0700200 let result = eventrouter.checkWorkflow(workflowId);
201 cb(null, result);
202 return;
203 };
204
Illyoung Choi59820ed2019-06-24 17:01:00 -0700205 // WebSocket interface for workflow removal
206 // Message format:
207 // {
208 // topic: 'cord.workflow.ctlsvc.workflow.remove',
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700209 // message: {
210 // req_id: <req_id> // optional
211 // workflow_id: <workflow_id>
212 // }
Illyoung Choi59820ed2019-06-24 17:01:00 -0700213 // }
214 const removeWorkflow = (topic, message, cb) => {
215 const eventrouter = require('./eventrouter.js');
216
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700217 let errorMessage;
218 if(!message) {
219 // error
220 errorMessage = `Message body for topic ${topic} is null or empty`;
221 logger.log('warn', `Return error - ${errorMessage}`);
222 cb(errorMessage, false);
223 return;
224 }
225
226 if(!('workflow_id' in message)) {
227 // error
228 errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
229 logger.log('warn', `Return error - ${errorMessage}`);
230 cb(errorMessage, false);
231 return;
232 }
233
234 let workflowId = message.workflow_id;
Illyoung Choi59820ed2019-06-24 17:01:00 -0700235 let result = eventrouter.removeWorkflow(workflowId);
236 cb(null, result);
237 return;
Illyoung Choic707c052019-07-18 13:50:49 -0700238 };
Illyoung Choi59820ed2019-06-24 17:01:00 -0700239
240 // WebSocket interface for workflow run removal
241 // Message format:
242 // {
243 // topic: 'cord.workflow.ctlsvc.workflow.run.remove',
244 // message: {
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700245 // req_id: <req_id> // optional
246 // workflow_id: <workflow_id>,
247 // workflow_run_id: <workflow_run_id>
Illyoung Choi59820ed2019-06-24 17:01:00 -0700248 // }
249 // }
250 const removeWorkflowRun = (topic, message, cb) => {
251 const eventrouter = require('./eventrouter.js');
252
253 let errorMessage;
254 if(!message) {
255 // error
256 errorMessage = `Message body for topic ${topic} is null or empty`;
257 logger.log('warn', `Return error - ${errorMessage}`);
258 cb(errorMessage, false);
259 return;
260 }
261
262 if(!('workflow_id' in message)) {
263 // error
264 errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
265 logger.log('warn', `Return error - ${errorMessage}`);
266 cb(errorMessage, false);
267 return;
268 }
269
270 if(!('workflow_run_id' in message)) {
271 // error
272 errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
273 logger.log('warn', `Return error - ${errorMessage}`);
274 cb(errorMessage, false);
275 return;
276 }
277
278 let workflowRunId = message.workflow_run_id;
279
280 let result = eventrouter.removeWorkflowRun(workflowRunId);
281 cb(null, result);
282 return;
Illyoung Choic707c052019-07-18 13:50:49 -0700283 };
Illyoung Choi59820ed2019-06-24 17:01:00 -0700284
Illyoung Choic707c052019-07-18 13:50:49 -0700285 // WebSocket interface for reporting a new workflow run
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700286 // Message format:
287 // {
Illyoung Choic707c052019-07-18 13:50:49 -0700288 // topic: 'cord.workflow.ctlsvc.workflow.report_new_run',
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700289 // message: {
290 // req_id: <req_id> // optional
291 // workflow_id: <workflow_id>,
292 // workflow_run_id: <workflow_run_id>
293 // }
294 // }
Illyoung Choic707c052019-07-18 13:50:49 -0700295 const reportNewWorkflowRun = (topic, message, cb) => {
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700296 const eventrouter = require('./eventrouter.js');
297
298 let errorMessage;
299 if(!message) {
300 // error
301 errorMessage = `Message body for topic ${topic} is null or empty`;
302 logger.log('warn', `Return error - ${errorMessage}`);
303 cb(errorMessage, false);
304 return;
305 }
306
307 if(!('workflow_id' in message)) {
308 // error
309 errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
310 logger.log('warn', `Return error - ${errorMessage}`);
311 cb(errorMessage, false);
312 return;
313 }
314
315 if(!('workflow_run_id' in message)) {
316 // error
317 errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
318 logger.log('warn', `Return error - ${errorMessage}`);
319 cb(errorMessage, false);
320 return;
321 }
322
323 let workflowRunId = message.workflow_run_id;
324
325 // there must be a workflow matching
326 // set the workflow kickstarted
327 let result = eventrouter.setWorkflowRunKickstarted(workflowRunId);
328 cb(null, result);
329 return;
Illyoung Choic707c052019-07-18 13:50:49 -0700330 };
331
332 // WebSocket interface for reporting workflow run state
333 // Message format:
334 // {
335 // topic: 'cord.workflow.ctlsvc.workflow.report_run_state',
336 // message: {
337 // req_id: <req_id> // optional
338 // workflow_id: <workflow_id>,
339 // workflow_run_id: <workflow_run_id>,
340 // state: one of ['success', 'running', 'failed', 'unknown']
341 // }
342 // }
343 const reportWorkflowRunState = (topic, message, cb) => {
344 const eventrouter = require('./eventrouter.js');
345
346 let errorMessage;
347 if(!message) {
348 // error
349 errorMessage = `Message body for topic ${topic} is null or empty`;
350 logger.log('warn', `Return error - ${errorMessage}`);
351 cb(errorMessage, false);
352 return;
353 }
354
355 if(!('workflow_id' in message)) {
356 // error
357 errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
358 logger.log('warn', `Return error - ${errorMessage}`);
359 cb(errorMessage, false);
360 return;
361 }
362
363 if(!('workflow_run_id' in message)) {
364 // error
365 errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
366 logger.log('warn', `Return error - ${errorMessage}`);
367 cb(errorMessage, false);
368 return;
369 }
370
371 if(!('state' in message)) {
372 // error
373 errorMessage = `field 'state' does not exist in message body - ${JSON.stringify(message)}`;
374 logger.log('warn', `Return error - ${errorMessage}`);
375 cb(errorMessage, false);
376 return;
377 }
378
379 let workflowRunId = message.workflow_run_id;
380 let state = message.state;
381
382 // there must be a workflow matching
383 // set workflow state
384 let result = eventrouter.setWorkflowRunState(workflowRunId, state);
385 cb(null, result);
386 return;
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700387 }
388
// Builds the routing table of this service: one entry per incoming
// event, each pairing the event topic with the handler that serves it.
const getRouter = () => {
    const route = (topic, handler) => ({ topic: topic, handler: handler });

    const routes = {};
    routes.registerWorkflow = route(serviceEvents.WORKFLOW_REGISTER, registerWorkflow);
    routes.registerWorkflowEssence = route(serviceEvents.WORKFLOW_REGISTER_ESSENCE, registerWorkflowEssence);
    routes.listWorkflows = route(serviceEvents.WORKFLOW_LIST, listWorkflows);
    routes.listWorkflowRuns = route(serviceEvents.WORKFLOW_LIST_RUN, listWorkflowRuns);
    routes.checkWorkflow = route(serviceEvents.WORKFLOW_CHECK, checkWorkflow);
    routes.removeWorkflow = route(serviceEvents.WORKFLOW_REMOVE, removeWorkflow);
    routes.removeWorkflowRun = route(serviceEvents.WORKFLOW_REMOVE_RUN, removeWorkflowRun);
    routes.reportNewWorkflowRun = route(serviceEvents.WORKFLOW_REPORT_NEW_RUN, reportNewWorkflowRun);
    routes.reportWorkflowRunState = route(serviceEvents.WORKFLOW_REPORT_RUN_STATE, reportWorkflowRunState);
    return routes;
};
429
430 // out-going commands
// Emits a WORKFLOW_KICKSTART event, carrying the workflow id and the
// workflow run id, on the socket of every workflow manager client
// known to the event router. Clients without a socket are skipped.
const kickstartWorkflow = (workflowId, workflowRunId) => {
    const managers = require('./eventrouter.js').getWorkflowManagerClients();

    _.forOwn(managers, (manager) => {
        const managerSocket = manager.getSocket();
        if(!managerSocket) {
            return;
        }

        managerSocket.emit(serviceEvents.WORKFLOW_KICKSTART, {
            workflow_id: workflowId,
            workflow_run_id: workflowRunId
        });
    });
};
446
// Emits a WORKFLOW_CHECK_STATE event, carrying the workflow id and the
// workflow run id, on the socket of every workflow manager client
// known to the event router. Clients without a socket are skipped.
const checkWorkflowState = (workflowId, workflowRunId) => {
    const managers = require('./eventrouter.js').getWorkflowManagerClients();

    _.forOwn(managers, (manager) => {
        const managerSocket = manager.getSocket();
        if(!managerSocket) {
            return;
        }

        managerSocket.emit(serviceEvents.WORKFLOW_CHECK_STATE, {
            workflow_id: workflowId,
            workflow_run_id: workflowRunId
        });
    });
};
462
Illyoung Choi59820ed2019-06-24 17:01:00 -0700463 module.exports = {
464 serviceEvents: serviceEvents,
465 getRouter: getRouter,
Illyoung Choic707c052019-07-18 13:50:49 -0700466 kickstartWorkflow: kickstartWorkflow,
467 checkWorkflowState: checkWorkflowState
Illyoung Choi59820ed2019-06-24 17:01:00 -0700468 };
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700469})();