blob: 5c77ebdae5dd24feab066e5487106ab1deb15ff8 [file] [log] [blame]
Illyoung Choi59820ed2019-06-24 17:01:00 -07001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17
18(function () {
19 'use strict';
20
21 const _ = require('lodash');
22 const Workflow = require('../types/workflow.js');
23 const logger = require('../config/logger.js');
24
// Topic names (WebSocket event names) used by this module.
// getRouter() binds each incoming topic to its handler; kickstartWorkflow()
// emits WORKFLOW_KICKSTART to workflow manager clients.
const serviceEvents = {
    WORKFLOW_REG: 'cord.workflow.ctlsvc.workflow.reg',
    WORKFLOW_REG_ESSENCE: 'cord.workflow.ctlsvc.workflow.reg_essence',
    WORKFLOW_LIST: 'cord.workflow.ctlsvc.workflow.list',
    WORKFLOW_RUN_LIST: 'cord.workflow.ctlsvc.workflow.run.list',
    WORKFLOW_CHECK: 'cord.workflow.ctlsvc.workflow.check',
    WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart',
    WORKFLOW_REMOVE: 'cord.workflow.ctlsvc.workflow.remove',
    WORKFLOW_RUN_REMOVE: 'cord.workflow.ctlsvc.workflow.run.remove'
};
35
36 // WebSocket interface for workflow registration
37 // Message format:
38 // {
39 // topic: 'cord.workflow.ctlsvc.workflow.reg',
Illyoung Choie3ce4cf2019-06-28 11:07:47 -070040 // message: {
41 // req_id: <req_id> // optional
42 // workflow: <workflow>
43 // }
Illyoung Choi59820ed2019-06-24 17:01:00 -070044 // }
Illyoung Choie3ce4cf2019-06-28 11:07:47 -070045 const registerWorkflow = (topic, message, cb) => {
Illyoung Choi59820ed2019-06-24 17:01:00 -070046 const distributor = require('./eventrouter.js/index.js');
47
48 let errorMessage;
49 if(!message) {
50 // error
51 errorMessage = `Message body for topic ${topic} is null or empty`;
52 logger.log('warn', `Return error - ${errorMessage}`);
53 cb(errorMessage, false);
54 return;
55 }
56
Illyoung Choie3ce4cf2019-06-28 11:07:47 -070057 if(!('workflow' in message)) {
58 // error
59 errorMessage = `field 'workflow' does not exist in message body - ${JSON.stringify(message)}`;
60 logger.log('warn', `Return error - ${errorMessage}`);
61 cb(errorMessage, false);
62 return;
63 }
64
65 let workflow = message.workflow;
Illyoung Choi59820ed2019-06-24 17:01:00 -070066
67 logger.log('debug', `workflow - ${JSON.stringify(workflow)}`);
68
69 let result = distributor.addWorkflow(workflow);
70 if(!result) {
71 errorMessage = `failed to register a workflow ${workflow.getId()}`;
72 cb(errorMessage, false);
73 }
74 else {
75 cb(null, true);
76 }
77 return;
78 };
79
80 // WebSocket interface for workflow registration (via essence)
81 // Message format:
82 // {
Illyoung Choie3ce4cf2019-06-28 11:07:47 -070083 // topic: 'cord.workflow.ctlsvc.workflow.reg_essence',
84 // message: {
85 // req_id: <req_id> // optional
86 // essence: <workflow essence>
87 // }
Illyoung Choi59820ed2019-06-24 17:01:00 -070088 // }
89 const registerWorkflowEssence = (topic, message, cb) => {
90 const eventrouter = require('./eventrouter.js');
91 let errorMessage;
92 if(!message) {
93 // error
94 errorMessage = `Message body for topic ${topic} is null or empty`;
95 logger.log('warn', `Return error - ${errorMessage}`);
96 cb(errorMessage, false);
97 return;
98 }
99
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700100 if(!('essence' in message)) {
101 // error
102 errorMessage = `field 'essence' does not exist in message body - ${JSON.stringify(message)}`;
103 logger.log('warn', `Return error - ${errorMessage}`);
104 cb(errorMessage, false);
105 return;
106 }
107
108 let essence = message.essence;
Illyoung Choi59820ed2019-06-24 17:01:00 -0700109 let result = true;
110 let errorResults = [];
111
112 logger.log('debug', `workflow essence - ${JSON.stringify(essence)}`);
113
114 let workflows = Workflow.loadWorkflowsFromEssence(essence);
115 workflows.forEach((workflow) => {
116 if(workflow) {
117 let localResult = eventrouter.addWorkflow(workflow);
118 errorResults.push(localResult);
119 result = result && localResult; // false if any of registrations fails
120 }
121 });
122
123 if(!result) {
124 errorMessage = `failed to register workflows ${errorResults}`;
125 cb(errorMessage, false);
126 }
127 else {
128 cb(null, true);
129 }
130 return;
131 };
132
133 // WebSocket interface for workflow listing
134 // Message format:
135 // {
136 // topic: 'cord.workflow.ctlsvc.workflow.list',
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700137 // message: {
138 // req_id: <req_id> // optional
139 // }
Illyoung Choi59820ed2019-06-24 17:01:00 -0700140 // }
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700141 const listWorkflows = (_topic, _message, cb) => {
Illyoung Choi59820ed2019-06-24 17:01:00 -0700142 const eventrouter = require('./eventrouter.js');
143
144 let result = eventrouter.listWorkflows();
145 cb(null, result);
146 return;
147 };
148
149 // WebSocket interface for workflow run listing
150 // Message format:
151 // {
152 // topic: 'cord.workflow.ctlsvc.workflow.list',
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700153 // message: {
154 // req_id: <req_id> // optional
155 // }
Illyoung Choi59820ed2019-06-24 17:01:00 -0700156 // }
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700157 const listWorkflowRuns = (_topic, _message, cb) => {
Illyoung Choi59820ed2019-06-24 17:01:00 -0700158 const eventrouter = require('./eventrouter.js');
159
160 let result = eventrouter.listWorkflowRuns();
161 cb(null, result);
162 return;
163 };
164
165 // WebSocket interface for workflow check
166 // Message format:
167 // {
168 // topic: 'cord.workflow.ctlsvc.workflow.check',
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700169 // message: {
170 // req_id: <req_id> // optional
171 // workflow_id: <workflow_id>
172 // }
Illyoung Choi59820ed2019-06-24 17:01:00 -0700173 // }
174 const checkWorkflow = (topic, message, cb) => {
175 const eventrouter = require('./eventrouter.js');
176
177 let errorMessage;
178 if(!message) {
179 // error
180 errorMessage = `Message body for topic ${topic} is null or empty`;
181 logger.log('warn', `Return error - ${errorMessage}`);
182 cb(errorMessage, false);
183 return;
184 }
185
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700186 if(!('workflow_id' in message)) {
187 // error
188 errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
189 logger.log('warn', `Return error - ${errorMessage}`);
190 cb(errorMessage, false);
191 return;
192 }
193
194 let workflowId = message.workflow_id;
Illyoung Choi59820ed2019-06-24 17:01:00 -0700195 let result = eventrouter.checkWorkflow(workflowId);
196 cb(null, result);
197 return;
198 };
199
200 // WebSocket interface for workflow start notification
201 // Message format:
202 // {
203 // topic: 'cord.workflow.ctlsvc.workflow.kickstart',
204 // message: {
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700205 // req_id: <req_id> // optional
206 // workflow_id: <workflow_id>,
207 // workflow_run_id: <workflow_run_id>
Illyoung Choi59820ed2019-06-24 17:01:00 -0700208 // }
209 // }
210 const notifyWorkflowStart = (topic, message, cb) => {
211 const eventrouter = require('./eventrouter.js');
212
213 let errorMessage;
214 if(!message) {
215 // error
216 errorMessage = `Message body for topic ${topic} is null or empty`;
217 logger.log('warn', `Return error - ${errorMessage}`);
218 cb(errorMessage, false);
219 return;
220 }
221
222 if(!('workflow_id' in message)) {
223 // error
224 errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
225 logger.log('warn', `Return error - ${errorMessage}`);
226 cb(errorMessage, false);
227 return;
228 }
229
230 if(!('workflow_run_id' in message)) {
231 // error
232 errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
233 logger.log('warn', `Return error - ${errorMessage}`);
234 cb(errorMessage, false);
235 return;
236 }
237
238 let workflowRunId = message.workflow_run_id;
239
240 // there must be a workflow matching
241 // set the workflow kickstarted
242 eventrouter.setWorkflowRunKickstarted(workflowRunId);
243 cb(null, true);
244 return;
245 }
246
247 // WebSocket interface for workflow removal
248 // Message format:
249 // {
250 // topic: 'cord.workflow.ctlsvc.workflow.remove',
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700251 // message: {
252 // req_id: <req_id> // optional
253 // workflow_id: <workflow_id>
254 // }
Illyoung Choi59820ed2019-06-24 17:01:00 -0700255 // }
256 const removeWorkflow = (topic, message, cb) => {
257 const eventrouter = require('./eventrouter.js');
258
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700259 let errorMessage;
260 if(!message) {
261 // error
262 errorMessage = `Message body for topic ${topic} is null or empty`;
263 logger.log('warn', `Return error - ${errorMessage}`);
264 cb(errorMessage, false);
265 return;
266 }
267
268 if(!('workflow_id' in message)) {
269 // error
270 errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
271 logger.log('warn', `Return error - ${errorMessage}`);
272 cb(errorMessage, false);
273 return;
274 }
275
276 let workflowId = message.workflow_id;
Illyoung Choi59820ed2019-06-24 17:01:00 -0700277 let result = eventrouter.removeWorkflow(workflowId);
278 cb(null, result);
279 return;
280 }
281
282 // WebSocket interface for workflow run removal
283 // Message format:
284 // {
285 // topic: 'cord.workflow.ctlsvc.workflow.run.remove',
286 // message: {
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700287 // req_id: <req_id> // optional
288 // workflow_id: <workflow_id>,
289 // workflow_run_id: <workflow_run_id>
Illyoung Choi59820ed2019-06-24 17:01:00 -0700290 // }
291 // }
292 const removeWorkflowRun = (topic, message, cb) => {
293 const eventrouter = require('./eventrouter.js');
294
295 let errorMessage;
296 if(!message) {
297 // error
298 errorMessage = `Message body for topic ${topic} is null or empty`;
299 logger.log('warn', `Return error - ${errorMessage}`);
300 cb(errorMessage, false);
301 return;
302 }
303
304 if(!('workflow_id' in message)) {
305 // error
306 errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
307 logger.log('warn', `Return error - ${errorMessage}`);
308 cb(errorMessage, false);
309 return;
310 }
311
312 if(!('workflow_run_id' in message)) {
313 // error
314 errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
315 logger.log('warn', `Return error - ${errorMessage}`);
316 cb(errorMessage, false);
317 return;
318 }
319
320 let workflowRunId = message.workflow_run_id;
321
322 let result = eventrouter.removeWorkflowRun(workflowRunId);
323 cb(null, result);
324 return;
325 }
326
// Returns the routing table for the incoming topics handled by this module.
// Each entry maps a name to { topic, handler }, where handler has the
// signature (topic, message, cb). The notifyWorkflowStart entry additionally
// carries `return: false`.
// NOTE(review): `return: false` presumably tells the dispatcher not to send a
// reply for kickstart notifications - confirm against the code that consumes
// this table.
const getRouter = () => {
    return {
        registerWorkflow: {
            topic: serviceEvents.WORKFLOW_REG,
            handler: registerWorkflow
        },
        registerWorkflowEssence: {
            topic: serviceEvents.WORKFLOW_REG_ESSENCE,
            handler: registerWorkflowEssence
        },
        listWorkflows: {
            topic: serviceEvents.WORKFLOW_LIST,
            handler: listWorkflows
        },
        listWorkflowRuns: {
            topic: serviceEvents.WORKFLOW_RUN_LIST,
            handler: listWorkflowRuns
        },
        checkWorkflow: {
            topic: serviceEvents.WORKFLOW_CHECK,
            handler: checkWorkflow
        },
        notifyWorkflowStart: {
            topic: serviceEvents.WORKFLOW_KICKSTART,
            handler: notifyWorkflowStart,
            return: false
        },
        removeWorkflow: {
            topic: serviceEvents.WORKFLOW_REMOVE,
            handler: removeWorkflow
        },
        removeWorkflowRun: {
            topic: serviceEvents.WORKFLOW_RUN_REMOVE,
            handler: removeWorkflowRun
        }
    };
};
364
365 // out-going commands
366 const kickstartWorkflow = (workflowId, workflowRunId) => {
367 const eventrouter = require('./eventrouter.js');
368
369 let clients = eventrouter.getWorkflowManagerClients();
370 _.forOwn(clients, (client, _clientId) => {
371 let socket = client.getSocket();
372 if(socket) {
373 socket.emit(serviceEvents.WORKFLOW_KICKSTART, {
374 workflow_id: workflowId,
375 workflow_run_id: workflowRunId
376 });
377 }
378 });
379 return;
380 };
381
382 module.exports = {
383 serviceEvents: serviceEvents,
384 getRouter: getRouter,
385 kickstartWorkflow: kickstartWorkflow
386 };
387})();