blob: 3d81bac962bcc2a34041f0c24261107d02e60297 [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation

 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at

 * http://www.apache.org/licenses/LICENSE-2.0

 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
17
18(function () {
19 'use strict';
20
21 const _ = require('lodash');
22 const Workflow = require('../types/workflow.js');
23 const logger = require('../config/logger.js');
24
// Topic names used on the workflow-manager interface.
// The direction comments describe who sends each topic.
const serviceEvents = {
    // manager -> controller -> manager
    WORKFLOW_REGISTER: 'cord.workflow.ctlsvc.workflow.register',
    WORKFLOW_REGISTER_ESSENCE: 'cord.workflow.ctlsvc.workflow.register_essence',
    WORKFLOW_LIST: 'cord.workflow.ctlsvc.workflow.list',
    WORKFLOW_LIST_RUN: 'cord.workflow.ctlsvc.workflow.run.list',
    WORKFLOW_CHECK: 'cord.workflow.ctlsvc.workflow.check',
    WORKFLOW_REMOVE: 'cord.workflow.ctlsvc.workflow.remove',
    WORKFLOW_REMOVE_RUN: 'cord.workflow.ctlsvc.workflow.run.remove',
    WORKFLOW_NOTIFY_NEW_RUN: 'cord.workflow.ctlsvc.workflow.notify_new_run',
    // controller -> manager
    WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart'
};
38
39 // WebSocket interface for workflow registration
40 // Message format:
41 // {
42 // topic: 'cord.workflow.ctlsvc.workflow.reg',
Illyoung Choie3ce4cf2019-06-28 11:07:47 -070043 // message: {
Illyoung Choib4fc0d82019-07-16 10:29:39 -070044 // req_id: <req_id>, // optional
Illyoung Choie3ce4cf2019-06-28 11:07:47 -070045 // workflow: <workflow>
46 // }
Illyoung Choi59820ed2019-06-24 17:01:00 -070047 // }
Illyoung Choie3ce4cf2019-06-28 11:07:47 -070048 const registerWorkflow = (topic, message, cb) => {
Illyoung Choib4fc0d82019-07-16 10:29:39 -070049 const eventrouter = require('./eventrouter.js');
Illyoung Choi59820ed2019-06-24 17:01:00 -070050
51 let errorMessage;
52 if(!message) {
53 // error
54 errorMessage = `Message body for topic ${topic} is null or empty`;
55 logger.log('warn', `Return error - ${errorMessage}`);
56 cb(errorMessage, false);
57 return;
58 }
59
Illyoung Choie3ce4cf2019-06-28 11:07:47 -070060 if(!('workflow' in message)) {
61 // error
62 errorMessage = `field 'workflow' does not exist in message body - ${JSON.stringify(message)}`;
63 logger.log('warn', `Return error - ${errorMessage}`);
64 cb(errorMessage, false);
65 return;
66 }
67
68 let workflow = message.workflow;
Illyoung Choi59820ed2019-06-24 17:01:00 -070069
70 logger.log('debug', `workflow - ${JSON.stringify(workflow)}`);
71
Illyoung Choib4fc0d82019-07-16 10:29:39 -070072 let result = eventrouter.addWorkflow(workflow);
Illyoung Choi59820ed2019-06-24 17:01:00 -070073 if(!result) {
74 errorMessage = `failed to register a workflow ${workflow.getId()}`;
75 cb(errorMessage, false);
76 }
77 else {
78 cb(null, true);
79 }
80 return;
81 };
82
83 // WebSocket interface for workflow registration (via essence)
84 // Message format:
85 // {
Illyoung Choie3ce4cf2019-06-28 11:07:47 -070086 // topic: 'cord.workflow.ctlsvc.workflow.reg_essence',
87 // message: {
88 // req_id: <req_id> // optional
89 // essence: <workflow essence>
90 // }
Illyoung Choi59820ed2019-06-24 17:01:00 -070091 // }
92 const registerWorkflowEssence = (topic, message, cb) => {
93 const eventrouter = require('./eventrouter.js');
94 let errorMessage;
95 if(!message) {
96 // error
97 errorMessage = `Message body for topic ${topic} is null or empty`;
98 logger.log('warn', `Return error - ${errorMessage}`);
99 cb(errorMessage, false);
100 return;
101 }
102
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700103 if(!('essence' in message)) {
104 // error
105 errorMessage = `field 'essence' does not exist in message body - ${JSON.stringify(message)}`;
106 logger.log('warn', `Return error - ${errorMessage}`);
107 cb(errorMessage, false);
108 return;
109 }
110
111 let essence = message.essence;
Illyoung Choi59820ed2019-06-24 17:01:00 -0700112 let result = true;
113 let errorResults = [];
114
115 logger.log('debug', `workflow essence - ${JSON.stringify(essence)}`);
116
117 let workflows = Workflow.loadWorkflowsFromEssence(essence);
118 workflows.forEach((workflow) => {
119 if(workflow) {
120 let localResult = eventrouter.addWorkflow(workflow);
121 errorResults.push(localResult);
122 result = result && localResult; // false if any of registrations fails
123 }
124 });
125
126 if(!result) {
127 errorMessage = `failed to register workflows ${errorResults}`;
128 cb(errorMessage, false);
129 }
130 else {
131 cb(null, true);
132 }
133 return;
134 };
135
136 // WebSocket interface for workflow listing
137 // Message format:
138 // {
139 // topic: 'cord.workflow.ctlsvc.workflow.list',
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700140 // message: {
141 // req_id: <req_id> // optional
142 // }
Illyoung Choi59820ed2019-06-24 17:01:00 -0700143 // }
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700144 const listWorkflows = (_topic, _message, cb) => {
Illyoung Choi59820ed2019-06-24 17:01:00 -0700145 const eventrouter = require('./eventrouter.js');
146
147 let result = eventrouter.listWorkflows();
148 cb(null, result);
149 return;
150 };
151
152 // WebSocket interface for workflow run listing
153 // Message format:
154 // {
155 // topic: 'cord.workflow.ctlsvc.workflow.list',
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700156 // message: {
157 // req_id: <req_id> // optional
158 // }
Illyoung Choi59820ed2019-06-24 17:01:00 -0700159 // }
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700160 const listWorkflowRuns = (_topic, _message, cb) => {
Illyoung Choi59820ed2019-06-24 17:01:00 -0700161 const eventrouter = require('./eventrouter.js');
162
163 let result = eventrouter.listWorkflowRuns();
164 cb(null, result);
165 return;
166 };
167
168 // WebSocket interface for workflow check
169 // Message format:
170 // {
171 // topic: 'cord.workflow.ctlsvc.workflow.check',
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700172 // message: {
173 // req_id: <req_id> // optional
174 // workflow_id: <workflow_id>
175 // }
Illyoung Choi59820ed2019-06-24 17:01:00 -0700176 // }
177 const checkWorkflow = (topic, message, cb) => {
178 const eventrouter = require('./eventrouter.js');
179
180 let errorMessage;
181 if(!message) {
182 // error
183 errorMessage = `Message body for topic ${topic} is null or empty`;
184 logger.log('warn', `Return error - ${errorMessage}`);
185 cb(errorMessage, false);
186 return;
187 }
188
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700189 if(!('workflow_id' in message)) {
190 // error
191 errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
192 logger.log('warn', `Return error - ${errorMessage}`);
193 cb(errorMessage, false);
194 return;
195 }
196
197 let workflowId = message.workflow_id;
Illyoung Choi59820ed2019-06-24 17:01:00 -0700198 let result = eventrouter.checkWorkflow(workflowId);
199 cb(null, result);
200 return;
201 };
202
Illyoung Choi59820ed2019-06-24 17:01:00 -0700203 // WebSocket interface for workflow removal
204 // Message format:
205 // {
206 // topic: 'cord.workflow.ctlsvc.workflow.remove',
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700207 // message: {
208 // req_id: <req_id> // optional
209 // workflow_id: <workflow_id>
210 // }
Illyoung Choi59820ed2019-06-24 17:01:00 -0700211 // }
212 const removeWorkflow = (topic, message, cb) => {
213 const eventrouter = require('./eventrouter.js');
214
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700215 let errorMessage;
216 if(!message) {
217 // error
218 errorMessage = `Message body for topic ${topic} is null or empty`;
219 logger.log('warn', `Return error - ${errorMessage}`);
220 cb(errorMessage, false);
221 return;
222 }
223
224 if(!('workflow_id' in message)) {
225 // error
226 errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
227 logger.log('warn', `Return error - ${errorMessage}`);
228 cb(errorMessage, false);
229 return;
230 }
231
232 let workflowId = message.workflow_id;
Illyoung Choi59820ed2019-06-24 17:01:00 -0700233 let result = eventrouter.removeWorkflow(workflowId);
234 cb(null, result);
235 return;
236 }
237
238 // WebSocket interface for workflow run removal
239 // Message format:
240 // {
241 // topic: 'cord.workflow.ctlsvc.workflow.run.remove',
242 // message: {
Illyoung Choie3ce4cf2019-06-28 11:07:47 -0700243 // req_id: <req_id> // optional
244 // workflow_id: <workflow_id>,
245 // workflow_run_id: <workflow_run_id>
Illyoung Choi59820ed2019-06-24 17:01:00 -0700246 // }
247 // }
248 const removeWorkflowRun = (topic, message, cb) => {
249 const eventrouter = require('./eventrouter.js');
250
251 let errorMessage;
252 if(!message) {
253 // error
254 errorMessage = `Message body for topic ${topic} is null or empty`;
255 logger.log('warn', `Return error - ${errorMessage}`);
256 cb(errorMessage, false);
257 return;
258 }
259
260 if(!('workflow_id' in message)) {
261 // error
262 errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
263 logger.log('warn', `Return error - ${errorMessage}`);
264 cb(errorMessage, false);
265 return;
266 }
267
268 if(!('workflow_run_id' in message)) {
269 // error
270 errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
271 logger.log('warn', `Return error - ${errorMessage}`);
272 cb(errorMessage, false);
273 return;
274 }
275
276 let workflowRunId = message.workflow_run_id;
277
278 let result = eventrouter.removeWorkflowRun(workflowRunId);
279 cb(null, result);
280 return;
281 }
282
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700283 // WebSocket interface for notifying a new workflow run
284 // Message format:
285 // {
286 // topic: 'cord.workflow.ctlsvc.workflow.notify_new_run',
287 // message: {
288 // req_id: <req_id> // optional
289 // workflow_id: <workflow_id>,
290 // workflow_run_id: <workflow_run_id>
291 // }
292 // }
293 const notifyNewWorkflowRun = (topic, message, cb) => {
294 const eventrouter = require('./eventrouter.js');
295
296 let errorMessage;
297 if(!message) {
298 // error
299 errorMessage = `Message body for topic ${topic} is null or empty`;
300 logger.log('warn', `Return error - ${errorMessage}`);
301 cb(errorMessage, false);
302 return;
303 }
304
305 if(!('workflow_id' in message)) {
306 // error
307 errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
308 logger.log('warn', `Return error - ${errorMessage}`);
309 cb(errorMessage, false);
310 return;
311 }
312
313 if(!('workflow_run_id' in message)) {
314 // error
315 errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
316 logger.log('warn', `Return error - ${errorMessage}`);
317 cb(errorMessage, false);
318 return;
319 }
320
321 let workflowRunId = message.workflow_run_id;
322
323 // there must be a workflow matching
324 // set the workflow kickstarted
325 let result = eventrouter.setWorkflowRunKickstarted(workflowRunId);
326 cb(null, result);
327 return;
328 }
329
// Returns the routing table of this module: one entry per incoming request,
// each pairing the topic from serviceEvents with the function handling it.
// A new object is built on every call.
const getRouter = () => {
    return {
        registerWorkflow: {
            topic: serviceEvents.WORKFLOW_REGISTER,
            handler: registerWorkflow
        },
        registerWorkflowEssence: {
            topic: serviceEvents.WORKFLOW_REGISTER_ESSENCE,
            handler: registerWorkflowEssence
        },
        listWorkflows: {
            topic: serviceEvents.WORKFLOW_LIST,
            handler: listWorkflows
        },
        listWorkflowRuns: {
            topic: serviceEvents.WORKFLOW_LIST_RUN,
            handler: listWorkflowRuns
        },
        checkWorkflow: {
            topic: serviceEvents.WORKFLOW_CHECK,
            handler: checkWorkflow
        },
        removeWorkflow: {
            topic: serviceEvents.WORKFLOW_REMOVE,
            handler: removeWorkflow
        },
        removeWorkflowRun: {
            topic: serviceEvents.WORKFLOW_REMOVE_RUN,
            handler: removeWorkflowRun
        },
        notifyNewWorkflowRun: {
            topic: serviceEvents.WORKFLOW_NOTIFY_NEW_RUN,
            handler: notifyNewWorkflowRun
        }
    };
};
366
367 // out-going commands
368 const kickstartWorkflow = (workflowId, workflowRunId) => {
369 const eventrouter = require('./eventrouter.js');
370
371 let clients = eventrouter.getWorkflowManagerClients();
372 _.forOwn(clients, (client, _clientId) => {
373 let socket = client.getSocket();
374 if(socket) {
375 socket.emit(serviceEvents.WORKFLOW_KICKSTART, {
376 workflow_id: workflowId,
377 workflow_run_id: workflowRunId
378 });
379 }
380 });
381 return;
382 };
383
384 module.exports = {
385 serviceEvents: serviceEvents,
386 getRouter: getRouter,
387 kickstartWorkflow: kickstartWorkflow
388 };
Illyoung Choib4fc0d82019-07-16 10:29:39 -0700389})();