blob: 2e5645c85b62dc6957961f7012431e2fa7aee69c [file] [log] [blame]
Illyoung Choia9d2c2c2019-07-12 13:29:42 -07001#!/usr/bin/env python3
2
3# Copyright 2019-present Open Networking Foundation
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18Workflow Manager
19
20This module implements Workflow Manager interface
21"""
22
23import json
24import socketio
25
26from .countdown_latch import CountDownLatch
27from .utils import get_noop_logger, gen_id, gen_seq_id
28from .errors import ClientRPCError, ClientInputError, ClientResponseError
29
# Timeout, in seconds, passed to the latch wait for each pending request
WAIT_TIMEOUT = 10

# Events sent by the controller to the manager (controller -> manager)
GREETING = 'cord.workflow.ctlsvc.greeting'
WORKFLOW_KICKSTART = 'cord.workflow.ctlsvc.workflow.kickstart'
WORKFLOW_CHECK_STATE = 'cord.workflow.ctlsvc.workflow.check.state'

# Request/response events (manager -> controller -> manager)
WORKFLOW_REGISTER = 'cord.workflow.ctlsvc.workflow.register'
WORKFLOW_REGISTER_ESSENCE = 'cord.workflow.ctlsvc.workflow.register_essence'
WORKFLOW_LIST = 'cord.workflow.ctlsvc.workflow.list'
WORKFLOW_LIST_RUN = 'cord.workflow.ctlsvc.workflow.run.list'
WORKFLOW_CHECK = 'cord.workflow.ctlsvc.workflow.check'
WORKFLOW_REMOVE = 'cord.workflow.ctlsvc.workflow.remove'
WORKFLOW_REMOVE_RUN = 'cord.workflow.ctlsvc.workflow.run.remove'
WORKFLOW_REPORT_NEW_RUN = 'cord.workflow.ctlsvc.workflow.report_new_run'
WORKFLOW_REPORT_RUN_STATE = 'cord.workflow.ctlsvc.workflow.report_run_state'
Illyoung Choia9d2c2c2019-07-12 13:29:42 -070047
48
class Manager(object):
    """
    Socket.IO client implementing the Workflow Manager interface.

    The manager registers Socket.IO handlers for events pushed by the
    controller (greeting, kickstart, check-state) and for the responses to
    the requests it sends itself. Each outgoing request is tagged with a
    req_id and tracked in ``pending_requests`` together with a latch; the
    matching response handler stores the response and releases the latch so
    that the requesting call can return.
    """

    def __init__(self, logger=None, name=None):
        """
        Create a manager and bind its Socket.IO event handlers.

        :param logger: logger to use; a no-op logger is used when falsy
        :param name: manager name; 'manager_<generated id>' when falsy
        """
        self.sio = socketio.Client()

        if logger:
            self.logger = logger
        else:
            self.logger = get_noop_logger()

        if name:
            self.name = name
        else:
            self.name = 'manager_%s' % gen_id()

        # next request id; incremented for every request that is sent
        self.req_id = gen_seq_id()

        # set sio handlers
        self.logger.debug('Setting event handlers to Socket.IO')
        self.sio.on('connect', self.__on_sio_connect)
        self.sio.on('disconnect', self.__on_sio_disconnect)
        self.sio.on(WORKFLOW_KICKSTART, self.__on_kickstart_message)
        self.sio.on(WORKFLOW_CHECK_STATE, self.__on_check_state_message)
        self.sio.on(GREETING, self.__on_greeting_message)
        self.sio.on(WORKFLOW_REGISTER, self.__on_workflow_reg_message)
        self.sio.on(WORKFLOW_REGISTER_ESSENCE, self.__on_workflow_reg_essence_message)
        self.sio.on(WORKFLOW_LIST, self.__on_workflow_list_message)
        self.sio.on(WORKFLOW_LIST_RUN, self.__on_workflow_list_run_message)
        self.sio.on(WORKFLOW_CHECK, self.__on_workflow_check_message)
        self.sio.on(WORKFLOW_REMOVE, self.__on_workflow_remove_message)
        self.sio.on(WORKFLOW_REMOVE_RUN, self.__on_workflow_remove_run_message)
        self.sio.on(WORKFLOW_REPORT_NEW_RUN, self.__on_workflow_report_new_run_message)
        self.sio.on(WORKFLOW_REPORT_RUN_STATE, self.__on_workflow_report_run_state_message)

        # user-replaceable callbacks; see set_handlers()
        self.handlers = {
            'connect': self.__noop_connect_handler,
            'disconnect': self.__noop_disconnect_handler,
            'kickstart': self.__noop_kickstart_handler,
            'check_state': self.__noop_check_state_handler
        }

        # key is req_id
        self.pending_requests = {}

    def set_logger(self, logger):
        """
        Replace the logger used by this manager
        """
        self.logger = logger

    def get_logger(self):
        """
        Return the logger used by this manager
        """
        return self.logger

    def __on_sio_connect(self):
        """
        Socket.IO 'connect' event: call the 'connect' handler if callable
        """
        self.logger.debug('connected to the server')
        handler = self.handlers['connect']
        if callable(handler):
            handler()

    def __noop_connect_handler(self):
        self.logger.debug('no-op connect handler')

    def __on_sio_disconnect(self):
        """
        Socket.IO 'disconnect' event: call the 'disconnect' handler if callable
        """
        self.logger.debug('disconnected from the server')
        handler = self.handlers['disconnect']
        if callable(handler):
            handler()

    def __noop_disconnect_handler(self):
        self.logger.debug('no-op disconnect handler')

    def __noop_kickstart_handler(self, workflow_id, workflow_run_id):
        self.logger.debug('no-op kickstart handler')

    def __noop_check_state_handler(self, workflow_id, workflow_run_id):
        self.logger.debug('no-op check-state handler')

    def __get_next_req_id(self):
        """
        Return the current request id and advance the counter by one
        """
        req_id = self.req_id
        self.req_id += 1
        return req_id

    def __on_greeting_message(self, data):
        self.logger.debug('received a gretting message from the server')

    def __dispatch_run_event(self, handler_key, label, data):
        """
        Forward a controller event about a workflow run to a user handler.

        The handler registered under handler_key is called with
        (workflow_id, workflow_run_id) only when both values are truthy and
        the handler is callable. label is the event name used in log lines.
        """
        self.logger.info('received a %s message from the server' % label)
        workflow_id = data['workflow_id']
        workflow_run_id = data['workflow_run_id']

        self.logger.info(
            'a %s message - workflow_id (%s), workflow_run_id (%s)' %
            (label, workflow_id, workflow_run_id)
        )
        if workflow_id and workflow_run_id:
            handler = self.handlers[handler_key]
            if callable(handler):
                self.logger.info('calling a %s handler - %s' % (label, handler))
                handler(workflow_id, workflow_run_id)

    def __on_kickstart_message(self, data):
        """
        Handler for a kickstart event
        REQ = {
            'workflow_id': <workflow_id>,
            'workflow_run_id': <workflow_run_id>
        }
        """
        self.__dispatch_run_event('kickstart', 'kickstart', data)

    def __on_check_state_message(self, data):
        """
        Handler for a check-state event
        REQ = {
            'workflow_id': <workflow_id>,
            'workflow_run_id': <workflow_run_id>
        }
        """
        self.__dispatch_run_event('check_state', 'check-state', data)

    # Response handlers: one per request type, all funnel into __on_response

    def __on_workflow_reg_message(self, data):
        self.__on_response(WORKFLOW_REGISTER, data)

    def __on_workflow_reg_essence_message(self, data):
        self.__on_response(WORKFLOW_REGISTER_ESSENCE, data)

    def __on_workflow_list_message(self, data):
        self.__on_response(WORKFLOW_LIST, data)

    def __on_workflow_list_run_message(self, data):
        self.__on_response(WORKFLOW_LIST_RUN, data)

    def __on_workflow_check_message(self, data):
        self.__on_response(WORKFLOW_CHECK, data)

    def __on_workflow_remove_message(self, data):
        self.__on_response(WORKFLOW_REMOVE, data)

    def __on_workflow_remove_run_message(self, data):
        self.__on_response(WORKFLOW_REMOVE_RUN, data)

    def __on_workflow_report_new_run_message(self, data):
        self.__on_response(WORKFLOW_REPORT_NEW_RUN, data)

    def __on_workflow_report_run_state_message(self, data):
        self.__on_response(WORKFLOW_REPORT_RUN_STATE, data)

    def __raise_input_error(self, message):
        """
        Log message as an error and raise it as a ClientInputError
        """
        self.logger.error(message)
        raise ClientInputError(message)

    def __raise_missing_request(self, req_id):
        """
        Log and raise a ClientRPCError for a req_id that is not pending
        """
        message = 'cannot find a pending request (%s) from a queue' % req_id
        self.logger.error(message)
        raise ClientRPCError(req_id, message)

    def __unwrap_result(self, result):
        """
        Return the 'result' field of a response.

        Raises ClientResponseError when the response's 'error' field is
        truthy, using the response's 'req_id' and 'message' fields.
        """
        if result['error']:
            message = (
                'request (%s) failed with an error - %s' %
                (result['req_id'], result['message'])
            )
            self.logger.error(message)
            raise ClientResponseError(message)
        return result['result']

    def __check_pending_request(self, req_id):
        """
        Check a pending request
        """
        return req_id in self.pending_requests

    def __put_pending_request(self, api, params):
        """
        Put a pending request to a queue and send it to the server.

        Returns the req_id assigned to the request.
        """
        req_id = self.__get_next_req_id()

        # work on a copy so that the caller's dict is not modified
        payload = dict(params)
        payload['req_id'] = req_id  # inject req_id

        # register before emitting: the response handler looks the request
        # up by req_id and must be able to find it as soon as it is sent
        self.pending_requests[req_id] = {
            'req_id': req_id,
            'latch': CountDownLatch(),
            'api': api,
            'params': payload,
            'result': None
        }
        try:
            self.sio.emit(api, payload)
        except Exception:
            # nothing was sent; do not leave a stale entry behind
            self.pending_requests.pop(req_id, None)
            raise
        return req_id

    def __wait_response(self, req_id):
        """
        Wait for completion of a request for up to WAIT_TIMEOUT.

        Returns whatever the latch's wait() returns. __request does not rely
        on that value; it inspects the latch count to detect a timeout.
        Raises ClientRPCError when req_id is not pending.
        """
        if req_id not in self.pending_requests:
            self.__raise_missing_request(req_id)

        req = self.pending_requests[req_id]
        return req['latch'].wait(WAIT_TIMEOUT)

    def __complete_request(self, req_id, result):
        """
        Complete a pending request.

        Raises ClientRPCError when req_id is not pending.
        """
        if req_id not in self.pending_requests:
            self.__raise_missing_request(req_id)

        req = self.pending_requests[req_id]
        # store the result before releasing the latch so that the waiter
        # never observes a completed request without its result
        req['result'] = result
        req['latch'].count_down()

    def __pop_pending_request(self, req_name):
        """
        Pop a pending request from a queue (None when it is not there)
        """
        return self.pending_requests.pop(req_name, None)

    def connect(self, url):
        """
        Connect to the given url.

        The manager name is sent as both 'id' and 'name' in the query string,
        and 'http://' is prepended when the url has no http(s) scheme.
        """
        query_string = 'id=%s&type=workflow_manager&name=%s' % (self.name, self.name)
        connect_url = '%s?%s' % (url, query_string)

        if not connect_url.startswith(('http://', 'https://')):
            connect_url = 'http://%s' % connect_url

        self.logger.debug('Connecting to a Socket.IO server (%s)' % connect_url)
        self.sio.connect(url=connect_url, transports=['websocket'])

    def disconnect(self):
        """
        Disconnect from the server
        """
        self.sio.disconnect()

    def wait(self):
        """
        Delegate to the Socket.IO client's wait()
        """
        self.sio.wait()

    def sleep(self, sec):
        """
        Delegate to the Socket.IO client's sleep()
        """
        self.sio.sleep(sec)

    def get_handlers(self):
        """
        Return the dictionary of registered handlers
        """
        return self.handlers

    def set_handlers(self, new_handlers):
        """
        Replace handlers by name.

        Only the keys that already exist in the handler table are taken from
        new_handlers; other keys are ignored.
        """
        for k in self.handlers:
            if k in new_handlers:
                self.handlers[k] = new_handlers[k]

    def __request(self, api, params=None):
        """
        Send a request and wait for its response.

        :param api: event name of the request
        :param params: request parameters; may be empty or omitted
        :return: the response stored by the response handler
        :raises ClientInputError: when api is falsy
        :raises ClientRPCError: when the request times out or is not pending
        """
        if params is None:
            params = {}

        # parameterless requests (e.g. listing) are valid, so only api is
        # required here
        if not api:
            self.__raise_input_error(
                'invalid arguments api (%s), params (%s)' %
                (api, json.dumps(params))
            )

        req_id = self.__put_pending_request(api, params)
        self.logger.debug('waiting for a response for req_id (%s)' % req_id)
        self.__wait_response(req_id)  # wait for completion
        req = self.__pop_pending_request(req_id)
        if not req:
            self.__raise_missing_request(req_id)

        if req['latch'].get_count() > 0:
            # timed out
            message = 'request (%s) timed out' % req_id
            self.logger.error(message)
            raise ClientRPCError(req_id, message)

        return req['result']

    def __on_response(self, api, result):
        """
        Complete the pending request named by the response's 'req_id'.

        Raises ClientInputError when the response is empty or has no req_id.
        """
        if result and 'req_id' in result:
            self.logger.debug('completing a request (%s)' % result['req_id'])
            self.__complete_request(result['req_id'], result)
        else:
            self.__raise_input_error(
                'invalid arguments api (%s), result (%s)' %
                (api, json.dumps(result))
            )

    def register_workflow(self, workflow):
        """
        Register a workflow.
        Workflow parameter is a workflow object

        Raises ClientInputError when workflow is falsy and
        ClientResponseError when the server reports an error.
        """
        if not workflow:
            self.__raise_input_error(
                'invalid arguments workflow (%s)' %
                json.dumps(workflow)
            )

        return self.__unwrap_result(
            self.__request(WORKFLOW_REGISTER, {
                'workflow': workflow
            })
        )

    def register_workflow_essence(self, essence):
        """
        Register a workflow by essence.

        Raises ClientInputError when essence is falsy and
        ClientResponseError when the server reports an error.
        """
        if not essence:
            self.__raise_input_error(
                'invalid arguments workflow essence (%s)' %
                json.dumps(essence)
            )

        return self.__unwrap_result(
            self.__request(WORKFLOW_REGISTER_ESSENCE, {
                'essence': essence
            })
        )

    def list_workflows(self):
        """
        List workflows.

        Raises ClientResponseError when the server reports an error.
        """
        return self.__unwrap_result(self.__request(WORKFLOW_LIST, {}))

    def list_workflow_runs(self):
        """
        List workflow runs.

        Raises ClientResponseError when the server reports an error.
        """
        return self.__unwrap_result(self.__request(WORKFLOW_LIST_RUN, {}))

    def check_workflow(self, workflow_id):
        """
        Check a workflow.

        Raises ClientInputError when workflow_id is falsy and
        ClientResponseError when the server reports an error.
        """
        if not workflow_id:
            self.__raise_input_error(
                'invalid arguments workflow_id (%s)' %
                workflow_id
            )

        return self.__unwrap_result(
            self.__request(WORKFLOW_CHECK, {
                'workflow_id': workflow_id
            })
        )

    def remove_workflow(self, workflow_id):
        """
        Remove a workflow.

        Raises ClientInputError when workflow_id is falsy and
        ClientResponseError when the server reports an error.
        """
        if not workflow_id:
            self.__raise_input_error(
                'invalid arguments workflow_id (%s)' %
                workflow_id
            )

        return self.__unwrap_result(
            self.__request(WORKFLOW_REMOVE, {
                'workflow_id': workflow_id
            })
        )

    def remove_workflow_run(self, workflow_id, workflow_run_id):
        """
        Remove a workflow run.

        Raises ClientInputError when either argument is falsy and
        ClientResponseError when the server reports an error.
        """
        if not (workflow_id and workflow_run_id):
            self.__raise_input_error(
                'invalid arguments workflow_id (%s) workflow_run_id (%s)' %
                (workflow_id, workflow_run_id)
            )

        return self.__unwrap_result(
            self.__request(WORKFLOW_REMOVE_RUN, {
                'workflow_id': workflow_id,
                'workflow_run_id': workflow_run_id
            })
        )

    def report_new_workflow_run(self, workflow_id, workflow_run_id):
        """
        Report a new workflow run

        Raises ClientInputError when either argument is falsy and
        ClientResponseError when the server reports an error.
        """
        if not (workflow_id and workflow_run_id):
            self.__raise_input_error(
                'invalid arguments workflow_id (%s), workflow_run_id (%s)' %
                (workflow_id, workflow_run_id)
            )

        return self.__unwrap_result(
            self.__request(WORKFLOW_REPORT_NEW_RUN, {
                'workflow_id': workflow_id,
                'workflow_run_id': workflow_run_id
            })
        )

    def report_workflow_run_state(self, workflow_id, workflow_run_id, state):
        """
        Report the state of a workflow run

        Raises ClientInputError when any argument is falsy and
        ClientResponseError when the server reports an error.
        """
        if not (workflow_id and workflow_run_id and state):
            self.__raise_input_error(
                'invalid arguments workflow_id (%s), workflow_run_id (%s), state (%s)' %
                (workflow_id, workflow_run_id, state)
            )

        return self.__unwrap_result(
            self.__request(WORKFLOW_REPORT_RUN_STATE, {
                'workflow_id': workflow_id,
                'workflow_run_id': workflow_run_id,
                'state': state
            })
        )