blob: b3e88776681fd7bd2da7e2df7534f0cee02f1957 [file] [log] [blame]
Illyoung Choi59820ed2019-06-24 17:01:00 -07001/*
2 * Copyright 2019-present Open Networking Foundation
3
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7
8 * http://www.apache.org/licenses/LICENSE-2.0
9
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17
18(function () {
19 'use strict';
20
21 const _ = require('lodash');
22 const Workflow = require('../types/workflow.js');
23 const logger = require('../config/logger.js');
24
// Topic names of the workflow service interface.
// The route table built by getRouter() maps each in-coming topic to its
// handler, and kickstartWorkflow() emits WORKFLOW_KICKSTART to clients.
const serviceEvents = {
    // registration
    WORKFLOW_REG: 'cord.workflow.ctlsvc.workflow.reg',
    WORKFLOW_REG_ESSENCE: 'cord.workflow.ctlsvc.workflow.reg_essence',

    // listing
    WORKFLOW_LIST: 'cord.workflow.ctlsvc.workflow.list',
    WORKFLOW_RUN_LIST: 'cord.workflow.ctlsvc.workflow.run.list',

    // status check and kickstart notification
    WORKFLOW_CHECK: 'cord.workflow.ctlsvc.workflow.check',
    WORKFLOW_KICKSTART: 'cord.workflow.ctlsvc.workflow.kickstart',

    // removal
    WORKFLOW_REMOVE: 'cord.workflow.ctlsvc.workflow.remove',
    WORKFLOW_RUN_REMOVE: 'cord.workflow.ctlsvc.workflow.run.remove'
};
35
36 // WebSocket interface for workflow registration
37 // Message format:
38 // {
39 // topic: 'cord.workflow.ctlsvc.workflow.reg',
40 // message: <workflow>
41 // }
42 const registWorkflow = (topic, message, cb) => {
43 const distributor = require('./eventrouter.js/index.js');
44
45 let errorMessage;
46 if(!message) {
47 // error
48 errorMessage = `Message body for topic ${topic} is null or empty`;
49 logger.log('warn', `Return error - ${errorMessage}`);
50 cb(errorMessage, false);
51 return;
52 }
53
54 let workflow = message;
55
56 logger.log('debug', `workflow - ${JSON.stringify(workflow)}`);
57
58 let result = distributor.addWorkflow(workflow);
59 if(!result) {
60 errorMessage = `failed to register a workflow ${workflow.getId()}`;
61 cb(errorMessage, false);
62 }
63 else {
64 cb(null, true);
65 }
66 return;
67 };
68
69 // WebSocket interface for workflow registration (via essence)
70 // Message format:
71 // {
72 // topic: 'cord.workflow.ctlsvc.workflow.reg',
73 // message: <workflow essence>
74 // }
75 const registerWorkflowEssence = (topic, message, cb) => {
76 const eventrouter = require('./eventrouter.js');
77 let errorMessage;
78 if(!message) {
79 // error
80 errorMessage = `Message body for topic ${topic} is null or empty`;
81 logger.log('warn', `Return error - ${errorMessage}`);
82 cb(errorMessage, false);
83 return;
84 }
85
86 let essence = message;
87 let result = true;
88 let errorResults = [];
89
90 logger.log('debug', `workflow essence - ${JSON.stringify(essence)}`);
91
92 let workflows = Workflow.loadWorkflowsFromEssence(essence);
93 workflows.forEach((workflow) => {
94 if(workflow) {
95 let localResult = eventrouter.addWorkflow(workflow);
96 errorResults.push(localResult);
97 result = result && localResult; // false if any of registrations fails
98 }
99 });
100
101 if(!result) {
102 errorMessage = `failed to register workflows ${errorResults}`;
103 cb(errorMessage, false);
104 }
105 else {
106 cb(null, true);
107 }
108 return;
109 };
110
111 // WebSocket interface for workflow listing
112 // Message format:
113 // {
114 // topic: 'cord.workflow.ctlsvc.workflow.list',
115 // message: null
116 // }
117 const listWorkflows = (topic, message, cb) => {
118 const eventrouter = require('./eventrouter.js');
119
120 let result = eventrouter.listWorkflows();
121 cb(null, result);
122 return;
123 };
124
125 // WebSocket interface for workflow run listing
126 // Message format:
127 // {
128 // topic: 'cord.workflow.ctlsvc.workflow.list',
129 // message: null
130 // }
131 const listWorkflowRuns = (topic, message, cb) => {
132 const eventrouter = require('./eventrouter.js');
133
134 let result = eventrouter.listWorkflowRuns();
135 cb(null, result);
136 return;
137 };
138
139 // WebSocket interface for workflow check
140 // Message format:
141 // {
142 // topic: 'cord.workflow.ctlsvc.workflow.check',
143 // message: <workflow_id>
144 // }
145 const checkWorkflow = (topic, message, cb) => {
146 const eventrouter = require('./eventrouter.js');
147
148 let errorMessage;
149 if(!message) {
150 // error
151 errorMessage = `Message body for topic ${topic} is null or empty`;
152 logger.log('warn', `Return error - ${errorMessage}`);
153 cb(errorMessage, false);
154 return;
155 }
156
157 let workflowId = message;
158 let result = eventrouter.checkWorkflow(workflowId);
159 cb(null, result);
160 return;
161 };
162
163 // WebSocket interface for workflow start notification
164 // Message format:
165 // {
166 // topic: 'cord.workflow.ctlsvc.workflow.kickstart',
167 // message: {
168 // workflow_id: <workflow_id>,
169 // workflow_run_id: <workflow_run_id>
170 // }
171 // }
172 const notifyWorkflowStart = (topic, message, cb) => {
173 const eventrouter = require('./eventrouter.js');
174
175 let errorMessage;
176 if(!message) {
177 // error
178 errorMessage = `Message body for topic ${topic} is null or empty`;
179 logger.log('warn', `Return error - ${errorMessage}`);
180 cb(errorMessage, false);
181 return;
182 }
183
184 if(!('workflow_id' in message)) {
185 // error
186 errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
187 logger.log('warn', `Return error - ${errorMessage}`);
188 cb(errorMessage, false);
189 return;
190 }
191
192 if(!('workflow_run_id' in message)) {
193 // error
194 errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
195 logger.log('warn', `Return error - ${errorMessage}`);
196 cb(errorMessage, false);
197 return;
198 }
199
200 let workflowRunId = message.workflow_run_id;
201
202 // there must be a workflow matching
203 // set the workflow kickstarted
204 eventrouter.setWorkflowRunKickstarted(workflowRunId);
205 cb(null, true);
206 return;
207 }
208
209 // WebSocket interface for workflow removal
210 // Message format:
211 // {
212 // topic: 'cord.workflow.ctlsvc.workflow.remove',
213 // message: <workflow_id>
214 // }
215 const removeWorkflow = (topic, message, cb) => {
216 const eventrouter = require('./eventrouter.js');
217
218 let workflowId = message;
219 let result = eventrouter.removeWorkflow(workflowId);
220 cb(null, result);
221 return;
222 }
223
224 // WebSocket interface for workflow run removal
225 // Message format:
226 // {
227 // topic: 'cord.workflow.ctlsvc.workflow.run.remove',
228 // message: {
229 // workflow_id: <workflow_id>,
230 // workflow_run_id: <workflow_run_id>
231 // }
232 // }
233 const removeWorkflowRun = (topic, message, cb) => {
234 const eventrouter = require('./eventrouter.js');
235
236 let errorMessage;
237 if(!message) {
238 // error
239 errorMessage = `Message body for topic ${topic} is null or empty`;
240 logger.log('warn', `Return error - ${errorMessage}`);
241 cb(errorMessage, false);
242 return;
243 }
244
245 if(!('workflow_id' in message)) {
246 // error
247 errorMessage = `field 'workflow_id' does not exist in message body - ${JSON.stringify(message)}`;
248 logger.log('warn', `Return error - ${errorMessage}`);
249 cb(errorMessage, false);
250 return;
251 }
252
253 if(!('workflow_run_id' in message)) {
254 // error
255 errorMessage = `field 'workflow_run_id' does not exist in message body - ${JSON.stringify(message)}`;
256 logger.log('warn', `Return error - ${errorMessage}`);
257 cb(errorMessage, false);
258 return;
259 }
260
261 let workflowRunId = message.workflow_run_id;
262
263 let result = eventrouter.removeWorkflowRun(workflowRunId);
264 cb(null, result);
265 return;
266 }
267
// Builds the route table for in-coming messages: one entry per handler,
// keyed by handler name, pairing the topic to listen on with the function
// that serves it. A fresh table object is created on every call.
const getRouter = () => {
    // single route entry
    const route = (topic, handler) => ({ topic: topic, handler: handler });

    const kickstartRoute = route(serviceEvents.WORKFLOW_KICKSTART, notifyWorkflowStart);
    // NOTE(review): only this entry carries 'return: false' - presumably it
    // tells the dispatcher not to send a reply for this topic; confirm
    // against the code that consumes the route table.
    kickstartRoute.return = false;

    const table = {};
    table.registWorkflow = route(serviceEvents.WORKFLOW_REG, registWorkflow);
    table.registerWorkflowEssence = route(serviceEvents.WORKFLOW_REG_ESSENCE, registerWorkflowEssence);
    table.listWorkflows = route(serviceEvents.WORKFLOW_LIST, listWorkflows);
    table.listWorkflowRuns = route(serviceEvents.WORKFLOW_RUN_LIST, listWorkflowRuns);
    table.checkWorkflow = route(serviceEvents.WORKFLOW_CHECK, checkWorkflow);
    table.notifyWorkflowStart = kickstartRoute;
    table.removeWorkflow = route(serviceEvents.WORKFLOW_REMOVE, removeWorkflow);
    table.removeWorkflowRun = route(serviceEvents.WORKFLOW_RUN_REMOVE, removeWorkflowRun);
    return table;
};
305
306 // out-going commands
307 const kickstartWorkflow = (workflowId, workflowRunId) => {
308 const eventrouter = require('./eventrouter.js');
309
310 let clients = eventrouter.getWorkflowManagerClients();
311 _.forOwn(clients, (client, _clientId) => {
312 let socket = client.getSocket();
313 if(socket) {
314 socket.emit(serviceEvents.WORKFLOW_KICKSTART, {
315 workflow_id: workflowId,
316 workflow_run_id: workflowRunId
317 });
318 }
319 });
320 return;
321 };
322
323 module.exports = {
324 serviceEvents: serviceEvents,
325 getRouter: getRouter,
326 kickstartWorkflow: kickstartWorkflow
327 };
328})();