#!/usr/bin/env python3

# Copyright 2019-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

17"""
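Workflow Manager

This module implements the Workflow Manager interface: a Socket.IO client
that registers, lists, checks and removes workflows (and workflow runs) on
the workflow controller, and receives kickstart events from it.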
21"""
22
23import json
24import socketio
25
26from .countdown_latch import CountDownLatch
27from .utils import get_noop_logger, gen_id, gen_seq_id
28from .errors import ClientRPCError, ClientInputError, ClientResponseError
29
# How long a request waits for its response before it is reported as
# timed out (10 seconds).
WAIT_TIMEOUT = 10

# Events sent by the controller to the manager (controller -> manager).
GREETING = 'cord.workflow.ctlsvc.greeting'
WORKFLOW_KICKSTART = 'cord.workflow.ctlsvc.workflow.kickstart'

# Request/response events: the manager emits the event and the controller
# answers on the same event name (manager -> controller -> manager).
WORKFLOW_REGISTER = 'cord.workflow.ctlsvc.workflow.register'
WORKFLOW_REGISTER_ESSENCE = 'cord.workflow.ctlsvc.workflow.register_essence'
WORKFLOW_LIST = 'cord.workflow.ctlsvc.workflow.list'
WORKFLOW_LIST_RUN = 'cord.workflow.ctlsvc.workflow.run.list'
WORKFLOW_CHECK = 'cord.workflow.ctlsvc.workflow.check'
WORKFLOW_REMOVE = 'cord.workflow.ctlsvc.workflow.remove'
WORKFLOW_REMOVE_RUN = 'cord.workflow.ctlsvc.workflow.run.remove'
WORKFLOW_NOTIFY_NEW_RUN = 'cord.workflow.ctlsvc.workflow.notify_new_run'
45
46
class Manager(object):
    """
    Workflow Manager client.

    Wraps a Socket.IO client. Each API call emits a request event, parks a
    pending request keyed by req_id, and blocks (up to WAIT_TIMEOUT) until
    the response event carrying the same req_id completes it.
    User callbacks for 'connect', 'disconnect' and 'kickstart' can be
    installed with set_handlers().
    """

    def __init__(self, logger=None, name=None):
        """
        Create a manager and register its Socket.IO event handlers.
        A no-op logger is used when logger is not given, and a name of the
        form 'manager_<id>' is generated when name is not given.
        """
        self.sio = socketio.Client()
        self.logger = logger if logger else get_noop_logger()
        self.name = name if name else 'manager_%s' % gen_id()
        self.req_id = gen_seq_id()

        # set sio handlers
        self.logger.debug('Setting event handlers to Socket.IO')
        self.sio.on('connect', self.__on_sio_connect)
        self.sio.on('disconnect', self.__on_sio_disconnect)
        self.sio.on(WORKFLOW_KICKSTART, self.__on_kickstart_message)
        self.sio.on(GREETING, self.__on_greeting_message)
        self.sio.on(WORKFLOW_REGISTER, self.__on_workflow_reg_message)
        self.sio.on(WORKFLOW_REGISTER_ESSENCE, self.__on_workflow_reg_essence_message)
        self.sio.on(WORKFLOW_LIST, self.__on_workflow_list_message)
        self.sio.on(WORKFLOW_LIST_RUN, self.__on_workflow_list_run_message)
        self.sio.on(WORKFLOW_CHECK, self.__on_workflow_check_message)
        self.sio.on(WORKFLOW_REMOVE, self.__on_workflow_remove_message)
        self.sio.on(WORKFLOW_REMOVE_RUN, self.__on_workflow_remove_run_message)
        self.sio.on(WORKFLOW_NOTIFY_NEW_RUN, self.__on_workflow_notify_new_run_message)

        # user-replaceable callbacks; see set_handlers()
        self.handlers = {
            'connect': self.__noop_connect_handler,
            'disconnect': self.__noop_disconnect_handler,
            'kickstart': self.__noop_kickstart_handler
        }

        # key is req_id
        self.pending_requests = {}

    def set_logger(self, logger):
        """
        Replace the logger used by this manager
        """
        self.logger = logger

    def get_logger(self):
        """
        Return the logger used by this manager
        """
        return self.logger

    def __on_sio_connect(self):
        """
        Socket.IO 'connect' event: forward to the 'connect' handler
        """
        self.logger.debug('connected to the server')
        handler = self.handlers['connect']
        if callable(handler):
            handler()

    def __noop_connect_handler(self):
        self.logger.debug('no-op connect handler')

    def __on_sio_disconnect(self):
        """
        Socket.IO 'disconnect' event: forward to the 'disconnect' handler
        """
        self.logger.debug('disconnected from the server')
        handler = self.handlers['disconnect']
        if callable(handler):
            handler()

    def __noop_disconnect_handler(self):
        self.logger.debug('no-op disconnect handler')

    def __noop_kickstart_handler(self, workflow_id, workflow_run_id):
        self.logger.debug('no-op kickstart handler')

    def __get_next_req_id(self):
        """
        Return the current request id and advance the counter
        """
        req_id = self.req_id
        self.req_id += 1
        return req_id

    def __on_greeting_message(self, data):
        self.logger.debug('received a gretting message from the server')

    def __on_kickstart_message(self, data):
        """
        Handler for a kickstart event
        REQ = {
            'workflow_id': <workflow_id>,
            'workflow_run_id': <workflow_run_id>
        }
        The 'kickstart' handler is called only when both ids are non-empty.
        """
        self.logger.info('received a kickstart message from the server')
        workflow_id = data['workflow_id']
        workflow_run_id = data['workflow_run_id']

        self.logger.info(
            'a kickstart message - workflow_id (%s), workflow_run_id (%s)' %
            (workflow_id, workflow_run_id)
        )
        if workflow_id and workflow_run_id:
            handler = self.handlers['kickstart']
            if callable(handler):
                self.logger.info('calling a kickstart handler - %s' % handler)
                handler(workflow_id, workflow_run_id)

    # Response events: each one completes the pending request whose req_id
    # is carried in the response.

    def __on_workflow_reg_message(self, data):
        self.__on_response(WORKFLOW_REGISTER, data)

    def __on_workflow_reg_essence_message(self, data):
        self.__on_response(WORKFLOW_REGISTER_ESSENCE, data)

    def __on_workflow_list_message(self, data):
        self.__on_response(WORKFLOW_LIST, data)

    def __on_workflow_list_run_message(self, data):
        self.__on_response(WORKFLOW_LIST_RUN, data)

    def __on_workflow_check_message(self, data):
        self.__on_response(WORKFLOW_CHECK, data)

    def __on_workflow_remove_message(self, data):
        self.__on_response(WORKFLOW_REMOVE, data)

    def __on_workflow_remove_run_message(self, data):
        self.__on_response(WORKFLOW_REMOVE_RUN, data)

    def __on_workflow_notify_new_run_message(self, data):
        self.__on_response(WORKFLOW_NOTIFY_NEW_RUN, data)

    def __check_pending_request(self, req_id):
        """
        Check a pending request
        """
        return req_id in self.pending_requests

    def __put_pending_request(self, api, params):
        """
        Put a pending request to a queue and emit it.
        The req_id is injected into params. Returns the req_id.
        """
        req_id = self.__get_next_req_id()
        params['req_id'] = req_id  # inject req_id

        # Register before emitting: the response handler looks the request
        # up by req_id, so the entry has to exist before the response can
        # possibly arrive.
        self.pending_requests[req_id] = {
            'req_id': req_id,
            'latch': CountDownLatch(),
            'api': api,
            'params': params,
            'result': None
        }
        try:
            self.sio.emit(api, params)
        except Exception:
            # nothing was sent, so no response will ever complete this entry
            self.pending_requests.pop(req_id, None)
            raise
        return req_id

    def __wait_response(self, req_id):
        """
        Wait for completion of a request
        Raises ClientRPCError when req_id is not pending.
        """
        if req_id not in self.pending_requests:
            message = 'cannot find a pending request (%s) from a queue' % req_id
            self.logger.error(message)
            raise ClientRPCError(req_id, message)

        # python v 3.2 or below does not return a result
        # that tells whether it is timedout or not
        return self.pending_requests[req_id]['latch'].wait(WAIT_TIMEOUT)

    def __complete_request(self, req_id, result):
        """
        Complete a pending request
        Raises ClientRPCError when req_id is not pending.
        """
        if req_id not in self.pending_requests:
            message = 'cannot find a pending request (%s) from a queue' % req_id
            self.logger.error(message)
            raise ClientRPCError(req_id, message)

        req = self.pending_requests[req_id]
        # Store the result BEFORE releasing the latch, otherwise the waiter
        # can wake up and read a result that has not been written yet.
        req['result'] = result
        req['latch'].count_down()

    def __pop_pending_request(self, req_name):
        """
        Pop a pending request from a queue
        Returns None when it is not there.
        """
        return self.pending_requests.pop(req_name, None)

    def connect(self, url):
        """
        Connect to the given url
        The manager name is sent as both 'id' and 'name' in the query
        string; 'http://' is prepended when the url has no http(s) scheme.
        """
        query_string = 'id=%s&type=workflow_manager&name=%s' % (self.name, self.name)
        connect_url = '%s?%s' % (url, query_string)

        if not connect_url.startswith(('http://', 'https://')):
            connect_url = 'http://%s' % connect_url

        self.logger.debug('Connecting to a Socket.IO server (%s)' % connect_url)
        self.sio.connect(url=connect_url, transports=['websocket'])

    def disconnect(self):
        """
        Disconnect from the server
        """
        self.sio.disconnect()

    def wait(self):
        """
        Delegate to the Socket.IO client's wait()
        """
        self.sio.wait()

    def sleep(self, sec):
        """
        Delegate to the Socket.IO client's sleep()
        """
        self.sio.sleep(sec)

    def get_handlers(self):
        """
        Return the handler table ('connect', 'disconnect', 'kickstart')
        """
        return self.handlers

    def set_handlers(self, new_handlers):
        """
        Replace handlers by name; unknown names in new_handlers are ignored
        """
        for k in self.handlers:
            if k in new_handlers:
                self.handlers[k] = new_handlers[k]

    def __request(self, api, params=None):
        """
        Send a request and block until its response arrives.
        Empty params are allowed (e.g. list requests carry no arguments).
        Returns the response; raises ClientInputError when api is empty and
        ClientRPCError when the request timed out or got lost.
        """
        if not api:
            raise self.__input_error(
                'invalid arguments api (%s), params (%s)' %
                (api, json.dumps(params))
            )

        # Work on a copy: the req_id is injected into the payload and must
        # not leak into the caller's dict.
        payload = dict(params) if params else {}

        req_id = self.__put_pending_request(api, payload)
        self.logger.debug('waiting for a response for req_id (%s)' % req_id)
        try:
            self.__wait_response(req_id)  # wait for completion
        finally:
            # always take the entry off the queue, even if waiting failed
            req = self.__pop_pending_request(req_id)

        if not req:
            message = 'cannot find a pending request (%s) from a queue' % req_id
            self.logger.error(message)
            raise ClientRPCError(req_id, message)

        if req['latch'].get_count() > 0:
            # timed out
            message = 'request (%s) timed out' % req_id
            self.logger.error(message)
            raise ClientRPCError(req_id, message)

        return req['result']

    def __on_response(self, api, result):
        """
        Complete the pending request named by result['req_id'].
        Raises ClientInputError when the response carries no req_id.
        """
        if result and 'req_id' in result:
            self.logger.debug('completing a request (%s)' % result['req_id'])
            self.__complete_request(result['req_id'], result)
            return

        raise self.__input_error(
            'invalid arguments api (%s), result (%s)' %
            (api, json.dumps(result))
        )

    def __input_error(self, message):
        """
        Log message as an error and return a ClientInputError for it
        """
        self.logger.error(message)
        return ClientInputError(message)

    def __call_api(self, api, params):
        """
        Send a request and unwrap its response.
        Returns response['result']; raises ClientResponseError when the
        response has its 'error' field set.
        """
        result = self.__request(api, params)
        if result['error']:
            message = (
                'request (%s) failed with an error - %s' %
                (result['req_id'], result['message'])
            )
            self.logger.error(message)
            raise ClientResponseError(message)
        return result['result']

    def register_workflow(self, workflow):
        """
        Register a workflow.
        Workflow parameter is a workflow object
        """
        if not workflow:
            raise self.__input_error(
                'invalid arguments workflow (%s)' %
                json.dumps(workflow)
            )
        return self.__call_api(WORKFLOW_REGISTER, {
            'workflow': workflow
        })

    def register_workflow_essence(self, essence):
        """
        Register a workflow by essence.
        """
        if not essence:
            raise self.__input_error(
                'invalid arguments workflow essence (%s)' %
                json.dumps(essence)
            )
        return self.__call_api(WORKFLOW_REGISTER_ESSENCE, {
            'essence': essence
        })

    def list_workflows(self):
        """
        List workflows.
        """
        return self.__call_api(WORKFLOW_LIST, {})

    def list_workflow_runs(self):
        """
        List workflow runs.
        """
        return self.__call_api(WORKFLOW_LIST_RUN, {})

    def check_workflow(self, workflow_id):
        """
        Check a workflow.
        """
        if not workflow_id:
            raise self.__input_error(
                'invalid arguments workflow_id (%s)' %
                workflow_id
            )
        return self.__call_api(WORKFLOW_CHECK, {
            'workflow_id': workflow_id
        })

    def remove_workflow(self, workflow_id):
        """
        Remove a workflow.
        """
        if not workflow_id:
            raise self.__input_error(
                'invalid arguments workflow_id (%s)' %
                workflow_id
            )
        return self.__call_api(WORKFLOW_REMOVE, {
            'workflow_id': workflow_id
        })

    def remove_workflow_run(self, workflow_id, workflow_run_id):
        """
        Remove a workflow run.
        """
        if not (workflow_id and workflow_run_id):
            raise self.__input_error(
                'invalid arguments workflow_id (%s) workflow_run_id (%s)' %
                (workflow_id, workflow_run_id)
            )
        return self.__call_api(WORKFLOW_REMOVE_RUN, {
            'workflow_id': workflow_id,
            'workflow_run_id': workflow_run_id
        })

    def notify_new_workflow_run(self, workflow_id, workflow_run_id):
        """
        Notify a new workflow run
        """
        if not (workflow_id and workflow_run_id):
            raise self.__input_error(
                'invalid arguments workflow_id (%s), workflow_run_id (%s)' %
                (workflow_id, workflow_run_id)
            )
        return self.__call_api(WORKFLOW_NOTIFY_NEW_RUN, {
            'workflow_id': workflow_id,
            'workflow_run_id': workflow_run_id
        })