blob: d127cbde54a59090d02ed31808a5a445c9cbed43 [file] [log] [blame]
Illyoung Choia9d2c2c2019-07-12 13:29:42 -07001# Copyright 2019-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15from __future__ import absolute_import
16import socketio
17import psutil
18import time
19import datetime
20
21from threading import Timer
22from gevent import pywsgi
23from geventwebsocket.handler import WebSocketHandler
24from multiprocessing import Process
25from multistructlog import create_logger
Illyoung Choi67e54e72019-07-25 10:44:59 -070026from cord_workflow_controller_client.probe import GREETING, EVENT_EMIT
Illyoung Choia9d2c2c2019-07-12 13:29:42 -070027from cord_workflow_controller_client.manager \
28 import (WORKFLOW_KICKSTART,
29 WORKFLOW_REGISTER, WORKFLOW_REGISTER_ESSENCE, WORKFLOW_LIST, WORKFLOW_LIST_RUN,
Illyoung Choi4df34b72019-07-18 13:55:18 -070030 WORKFLOW_CHECK, WORKFLOW_REMOVE, WORKFLOW_REMOVE_RUN, WORKFLOW_REPORT_NEW_RUN)
Illyoung Choia9d2c2c2019-07-12 13:29:42 -070031from cord_workflow_controller_client.workflow_run \
32 import (WORKFLOW_RUN_NOTIFY_EVENT,
33 WORKFLOW_RUN_UPDATE_STATUS, WORKFLOW_RUN_COUNT_EVENTS, WORKFLOW_RUN_FETCH_EVENT)
34
35
36"""
37Run a dummy socket.io server as a separate process.
38serve_forever() blocks until the process is killed,
39so I had to use multi-process approach.
40"""
41
42log = create_logger()
43
44# Socket IO
45sio = None
46
47manager_clients = {}
48workflows = {}
49workflow_essences = {}
50workflow_runs = {}
51workflow_run_clients = {}
52seq_no = 1
53
54
55class repeatableTimer():
56 def __init__(self, time, handler, arg):
57 self.time = time
58 self.handler = handler
59 self.arg = arg
60 self.thread = Timer(self.time, self.on_tick)
61
62 def on_tick(self):
63 self.handler(self.arg)
64 self.thread = Timer(self.time, self.on_tick)
65 self.thread.start()
66
67 def start(self):
68 self.thread.start()
69
70 def cancel(self):
71 self.thread.cancel()
72
73
74def make_query_string_dict(query_string):
75 obj = {}
76 params = query_string.split('&')
77 for param in params:
78 kv = param.split('=')
79 key = kv[0]
80 val = kv[1]
81 obj[key] = val
82
83 return obj
84
85
86def _send_kickstart_event(sid):
87 global seq_no
88
89 workflow_id = 'dummy_workflow_%d' % seq_no
90 workflow_run_id = 'dummy_workflow_run_%d' % seq_no
91
92 seq_no += 1
93 log.info('sending a kickstart event to sid %s' % sid)
94 sio.emit(
95 event=WORKFLOW_KICKSTART,
96 data={
97 'workflow_id': workflow_id,
98 'workflow_run_id': workflow_run_id,
99 'timestamp': str(datetime.datetime.now())
100 },
101 room=sid
102 )
103
104
105def _send_notify_event(sid):
106 global seq_no
107
108 topic = 'topic_%s' % seq_no
109 message = {
110 'sample_key': 'sample_value'
111 }
112 seq_no += 1
113
114 run_client = workflow_run_clients[sid]
115 if run_client:
116 workflow_run_id = run_client['workflow_run_id']
117 workflow_run = workflow_runs[workflow_run_id]
118 if workflow_run:
119 workflow_run['queue'].append({
120 'topic': topic,
121 'message': message
122 })
123
124 log.info('sending a notify event to sid %s' % sid)
125 sio.emit(
126 event=WORKFLOW_RUN_NOTIFY_EVENT,
127 data={
128 'topic': topic,
129 'timestamp': str(datetime.datetime.now())
130 },
131 room=sid
132 )
133
134
135def _handle_event_connect(sid, query):
136 sio.emit(GREETING, {})
137
138 global last_client_action_time
139 last_client_action_time = datetime.datetime.now
140
141 # if the client is a manager, send kickstart events every 3 sec
142 if query['type'] == 'workflow_manager':
143 log.info('manager (%s) is connected' % sid)
144 kickstart_timer = repeatableTimer(2, _send_kickstart_event, sid)
145 manager_clients[sid] = {
146 'kickstart_timer': kickstart_timer
147 }
148
149 kickstart_timer.start()
150 elif query['type'] == 'workflow_run':
151 log.info('workflow run (%s) is connected' % sid)
152 notify_event_timer = repeatableTimer(2, _send_notify_event, sid)
153 workflow_run_clients[sid] = {
154 'workflow_id': query['workflow_id'],
155 'workflow_run_id': query['workflow_run_id'],
156 'notify_event_timer': notify_event_timer
157 }
158
159 notify_event_timer.start()
160
161
162def _handle_event_disconnect(sid):
163 if sid in manager_clients:
164 log.info('manager (%s) is disconnected' % sid)
165 if manager_clients[sid]['kickstart_timer']:
166 manager_clients[sid]['kickstart_timer'].cancel()
167
168 del manager_clients[sid]
169
170 if sid in workflow_run_clients:
171 log.info('workflow run (%s) is disconnected' % sid)
172 if workflow_run_clients[sid]['notify_event_timer']:
173 workflow_run_clients[sid]['notify_event_timer'].cancel()
174
175 del workflow_run_clients[sid]
176
177 global last_client_action_time
178 last_client_action_time = datetime.datetime.now
179
180
181def _get_req_id(body):
182 req_id = 101010
183 if 'req_id' in body:
184 req_id = int(body['req_id'])
185 return req_id
186
187
188def _handle_event_workflow_reg(sid, body):
189 data = {
190 'req_id': _get_req_id(body)
191 }
192
193 if 'workflow' in body:
194 workflow = body['workflow']
195 workflow_id = workflow['id']
196
197 if workflow_id in workflows:
198 # already exist
199 data['error'] = True
200 data['result'] = False
201 data['message'] = 'workflow is already registered'
202 else:
203 log.info('manager (%s) registers a workflow (%s)' % (sid, workflow_id))
204 workflows[workflow_id] = workflow
205
206 data['error'] = False
207 data['result'] = True
208 else:
209 data['error'] = True
210 data['result'] = False
211 data['message'] = 'workflow is not in the message body'
212
213 log.info('returning a result for workflow register event to sid %s' % sid)
214 sio.emit(
215 event=WORKFLOW_REGISTER,
216 data=data,
217 room=sid
218 )
219
220
221def _handle_event_workflow_reg_essence(sid, body):
222 data = {
223 'req_id': _get_req_id(body)
224 }
225
226 if 'essence' in body:
227 essence = body['essence']
228 for wid in essence:
229 workflow_essence = essence[wid]
230 if 'dag' in workflow_essence and 'dag_id' in workflow_essence['dag']:
231 dag = workflow_essence['dag']
232 workflow_id = dag['dag_id']
233
234 if workflow_id in workflow_essences or workflow_id in workflows:
235 # already exist
236 data['error'] = True
237 data['result'] = False
238 data['message'] = 'workflow is already registered'
239 else:
240 log.info('manager (%s) registers a workflow (%s)' % (sid, workflow_id))
241 workflow_essences[workflow_id] = workflow_essence
242
243 data['error'] = False
244 data['result'] = True
245 else:
246 data['error'] = True
247 data['result'] = False
248 data['message'] = 'essence is not in the message body'
249 else:
250 data['error'] = True
251 data['result'] = False
252 data['message'] = 'essence is not in the message body'
253
254 log.info('returning a result for workflow essence register event to sid %s' % sid)
255 sio.emit(
256 event=WORKFLOW_REGISTER_ESSENCE,
257 data=data,
258 room=sid
259 )
260
261
262def _handle_event_workflow_list(sid, body):
263 data = {
264 'req_id': _get_req_id(body)
265 }
266
267 workflow_ids = []
268
269 for workflow_id in workflows:
270 workflow_ids.append(workflow_id)
271
272 for workflow_id in workflow_essences:
273 workflow_ids.append(workflow_id)
274
275 data['error'] = False
276 data['result'] = workflow_ids
277
278 log.info('returning a result for workflow list event to sid %s' % sid)
279 sio.emit(
280 event=WORKFLOW_LIST,
281 data=data,
282 room=sid
283 )
284
285
286def _handle_event_workflow_run_list(sid, body):
287 data = {
288 'req_id': _get_req_id(body)
289 }
290
291 workflow_run_ids = []
292
293 for workflow_run_id in workflow_runs:
294 workflow_run_ids.append(workflow_run_id)
295
296 data['error'] = False
297 data['result'] = workflow_run_ids
298
299 log.info('returning a result for workflow run list event to sid %s' % sid)
300 sio.emit(
301 event=WORKFLOW_LIST_RUN,
302 data=data,
303 room=sid
304 )
305
306
307def _handle_event_workflow_check(sid, body):
308 data = {
309 'req_id': _get_req_id(body)
310 }
311
312 if 'workflow_id' in body:
313 workflow_id = body['workflow_id']
314 if workflow_id in workflows:
315 data['error'] = False
316 data['result'] = True
317 else:
318 data['error'] = False
319 data['result'] = False
320 else:
321 data['error'] = True
322 data['result'] = False
323 data['message'] = 'workflow_id is not in the message body'
324
325 log.info('returning a result for workflow check event to sid %s' % sid)
326 sio.emit(
327 event=WORKFLOW_CHECK,
328 data=data,
329 room=sid
330 )
331
332
333def _handle_event_workflow_remove(sid, body):
334 data = {
335 'req_id': _get_req_id(body)
336 }
337
338 if 'workflow_id' in body:
339 workflow_id = body['workflow_id']
340 if workflow_id in workflows:
341
342 hasWorkflowRuns = False
343 for workflow_run_id in workflow_runs:
344 workflow_run = workflow_runs[workflow_run_id]
345 wid = workflow_run['workflow_id']
346 if wid == workflow_id:
347 # there is a workflow run for the workflow id
348 hasWorkflowRuns = True
349 break
350
351 if hasWorkflowRuns:
352 data['error'] = False
353 data['result'] = False
354 else:
355 del workflows[workflow_id]
356
357 data['error'] = False
358 data['result'] = True
359 else:
360 data['error'] = False
361 data['result'] = False
362 else:
363 data['error'] = True
364 data['result'] = False
365 data['message'] = 'workflow_id is not in the message body'
366
367 log.info('returning a result for workflow remove event to sid %s' % sid)
368 sio.emit(
369 event=WORKFLOW_REMOVE,
370 data=data,
371 room=sid
372 )
373
374
375def _handle_event_workflow_run_remove(sid, body):
376 data = {
377 'req_id': _get_req_id(body)
378 }
379
380 if 'workflow_id' in body and 'workflow_run_id' in body:
381 # workflow_id = body['workflow_id']
382 workflow_run_id = body['workflow_run_id']
383
384 if workflow_run_id in workflow_runs:
385 del workflow_runs[workflow_run_id]
386
387 data['error'] = False
388 data['result'] = True
389 else:
390 data['error'] = False
391 data['result'] = False
392 else:
393 data['error'] = True
394 data['result'] = False
395 data['message'] = 'workflow_id or workflow_run_id is not in the message body'
396
397 log.info('returning a result for workflow run remove event to sid %s' % sid)
398 sio.emit(
399 event=WORKFLOW_REMOVE_RUN,
400 data=data,
401 room=sid
402 )
403
404
405def _handle_event_new_workflow_run(sid, body):
406 data = {
407 'req_id': _get_req_id(body)
408 }
409
410 if 'workflow_id' in body and 'workflow_run_id' in body:
411 workflow_id = body['workflow_id']
412 workflow_run_id = body['workflow_run_id']
413
414 log.info('manager (%s) started a new workflow (%s), workflow_run (%s)' % (sid, workflow_id, workflow_run_id))
415 workflow_runs[workflow_run_id] = {
416 'workflow_id': workflow_id,
417 'workflow_run_id': workflow_run_id,
418 'queue': []
419 }
420
421 data['error'] = False
422 data['result'] = True
423 else:
424 data['error'] = True
425 data['result'] = False
426 data['message'] = 'workflow_id or workflow_run_id is not in the message body'
427
428 log.info('returning a result for a new workflow run event to sid %s' % sid)
429 sio.emit(
Illyoung Choi4df34b72019-07-18 13:55:18 -0700430 event=WORKFLOW_REPORT_NEW_RUN,
Illyoung Choia9d2c2c2019-07-12 13:29:42 -0700431 data=data,
432 room=sid
433 )
434
435
436def _handle_event_workflow_run_update_status(sid, body):
437 data = {
438 'req_id': _get_req_id(body)
439 }
440
441 if 'workflow_id' in body and 'workflow_run_id' in body and 'task_id' in body and 'status' in body:
442 # workflow_id = body['workflow_id']
443 workflow_run_id = body['workflow_run_id']
444 task_id = body['task_id']
445 status = body['status']
446
447 if workflow_run_id in workflow_runs:
448 workflow_run = workflow_runs[workflow_run_id]
449 workflow_run[task_id] = status
450
451 data['error'] = False
452 data['result'] = True
453 else:
454 data['error'] = True
455 data['result'] = False
456 data['message'] = 'cannot find workflow run'
457 else:
458 data['error'] = True
459 data['result'] = False
460 data['message'] = 'workflow_id, workflow_run_id, task_id or status is not in the message body'
461
462 log.info('returning a result for workflow run update status event to sid %s' % sid)
463 sio.emit(
464 event=WORKFLOW_RUN_UPDATE_STATUS,
465 data=data,
466 room=sid
467 )
468
469
470def _handle_event_workflow_run_count_events(sid, body):
471 data = {
472 'req_id': _get_req_id(body)
473 }
474
475 if 'workflow_id' in body and 'workflow_run_id' in body:
476 # workflow_id = body['workflow_id']
477 workflow_run_id = body['workflow_run_id']
478
479 if workflow_run_id in workflow_runs:
480 workflow_run = workflow_runs[workflow_run_id]
481 queue = workflow_run['queue']
482 count = len(queue)
483
484 data['error'] = False
485 data['result'] = count
486 else:
487 data['error'] = True
488 data['result'] = 0
489 data['message'] = 'cannot find workflow run'
490 else:
491 data['error'] = True
492 data['result'] = 0
493 data['message'] = 'workflow_id, workflow_run_id, task_id or status is not in the message body'
494
495 log.info('returning a result for workflow run count events to sid %s' % sid)
496 sio.emit(
497 event=WORKFLOW_RUN_COUNT_EVENTS,
498 data=data,
499 room=sid
500 )
501
502
503def _handle_event_workflow_run_fetch_event(sid, body):
504 data = {
505 'req_id': _get_req_id(body)
506 }
507
508 if 'workflow_id' in body and 'workflow_run_id' in body and 'task_id' in body and 'topic' in body:
509 # workflow_id = body['workflow_id']
510 workflow_run_id = body['workflow_run_id']
511 # task_id = body['task_id']
512 topic = body['topic']
513
514 if workflow_run_id in workflow_runs:
515 workflow_run = workflow_runs[workflow_run_id]
516 queue = workflow_run['queue']
517
518 event = None
519 for idx in range(len(queue)):
520 if queue[idx]['topic'] == topic:
521 # found
522 event = queue.pop(idx)
523 break
524
525 if event:
526 data['error'] = False
527 data['result'] = event
528 else:
529 data['error'] = False
530 data['result'] = {}
531 else:
532 data['error'] = False
533 data['result'] = False
534 data['message'] = 'cannot find workflow run'
535 else:
536 data['error'] = True
537 data['result'] = False
538 data['message'] = 'workflow_id, workflow_run_id, task_id or topic is not in the message body'
539
540 log.info('returning a result for workflow run fetch event to sid %s' % sid)
541 sio.emit(
542 event=WORKFLOW_RUN_FETCH_EVENT,
543 data=data,
544 room=sid
545 )
546
547
Illyoung Choi67e54e72019-07-25 10:44:59 -0700548def _handle_event_emit(sid, body):
549 data = {
550 'req_id': _get_req_id(body)
551 }
552
553 if 'topic' in body and 'message' in body:
554 # workflow_id = body['workflow_id']
555 topic = body['topic']
556 message = body['message']
557
558 log.info('probe topic %s - message %s' % (topic, message))
559
560 data['error'] = False
561 data['result'] = True
562 else:
563 data['error'] = True
564 data['result'] = False
565 data['message'] = 'topic or message is not in the message body'
566
567 log.info('returning a result for event emit to sid %s' % sid)
568 sio.emit(
569 event=EVENT_EMIT,
570 data=data,
571 room=sid
572 )
573
574
Illyoung Choia9d2c2c2019-07-12 13:29:42 -0700575def _handle_event(event, sid, body):
576 log.info('event %s - body %s (%s)' % (event, body, type(body)))
577
578
579class ServerEventHandler(socketio.namespace.Namespace):
580 def trigger_event(self, event, *args):
581 sid = args[0]
582 if event == 'connect':
583 querystr = args[1]['QUERY_STRING']
584 query = make_query_string_dict(querystr)
585 _handle_event_connect(sid, query)
586 elif event == 'disconnect':
587 _handle_event_disconnect(sid)
588
589 # manager
Illyoung Choi4df34b72019-07-18 13:55:18 -0700590 elif event == WORKFLOW_REPORT_NEW_RUN:
Illyoung Choia9d2c2c2019-07-12 13:29:42 -0700591 _handle_event_new_workflow_run(sid, args[1])
592 elif event == WORKFLOW_REGISTER_ESSENCE:
593 _handle_event_workflow_reg_essence(sid, args[1])
594 elif event == WORKFLOW_REGISTER:
595 _handle_event_workflow_reg(sid, args[1])
596 elif event == WORKFLOW_LIST:
597 _handle_event_workflow_list(sid, args[1])
598 elif event == WORKFLOW_LIST_RUN:
599 _handle_event_workflow_run_list(sid, args[1])
600 elif event == WORKFLOW_CHECK:
601 _handle_event_workflow_check(sid, args[1])
602 elif event == WORKFLOW_REMOVE:
603 _handle_event_workflow_remove(sid, args[1])
604 elif event == WORKFLOW_REMOVE_RUN:
605 _handle_event_workflow_run_remove(sid, args[1])
606
607 # workflow run
608 elif event == WORKFLOW_RUN_UPDATE_STATUS:
609 _handle_event_workflow_run_update_status(sid, args[1])
610 elif event == WORKFLOW_RUN_COUNT_EVENTS:
611 _handle_event_workflow_run_count_events(sid, args[1])
612 elif event == WORKFLOW_RUN_FETCH_EVENT:
613 _handle_event_workflow_run_fetch_event(sid, args[1])
Illyoung Choi67e54e72019-07-25 10:44:59 -0700614 elif event == EVENT_EMIT:
615 _handle_event_emit(sid, args[1])
Illyoung Choia9d2c2c2019-07-12 13:29:42 -0700616 else:
617 _handle_event(event, args[0], args[1])
618
619
620def _run(port):
621 global sio
622 sio = socketio.Server(ping_timeout=5, ping_interval=1)
623 app = socketio.WSGIApp(sio)
624 sio.register_namespace(ServerEventHandler('/'))
625
626 server = pywsgi.WSGIServer(
627 ('', port),
628 app,
629 handler_class=WebSocketHandler
630 )
631
632 server.serve_forever()
633
634
635def start(port):
636 p = Process(target=_run, args=(port, ))
637 p.start()
638 time.sleep(3)
639
640 log.info('Dummy server is started!')
641 return p
642
643
644def stop(p):
645 log.info('Stopping dummy server!')
646
647 try:
648 process = psutil.Process(p.pid)
649 for proc in process.children(recursive=True):
650 proc.kill()
651 process.kill()
652 p.join()
Illyoung Choi4df34b72019-07-18 13:55:18 -0700653 except BaseException:
654 pass
Illyoung Choia9d2c2c2019-07-12 13:29:42 -0700655 except psutil.NoSuchProcess:
656 pass
657
658 # clean-up
659 global sio, manager_clients, workflow_runs, seq_no
660 sio = None
661 manager_clients = {}
662 workflow_runs = {}
663 seq_no = 1
664
665 time.sleep(3)
666
667 log.info('Dummy server is stopped!')