blob: 6335dc8c099815fffab4593c85e0e49fd1e471f2 [file] [log] [blame]
Illyoung Choife121d02019-07-16 10:47:41 -07001#!/usr/bin/env python3
2
3# Copyright 2019-present Open Networking Foundation
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
"""
Workflow Essence Extractor

This module extracts the essence of Airflow workflows.
The following information is extracted from workflow code:
- DAG info
- Operator info
    - CORD-related operators
    - Airflow operators
- Dependency info
"""
28
29import ast
30import json
31import os.path
32import argparse
33import pyfiglet
34
35from multistructlog import create_logger
36
37
# Program settings that the configuration file may override (see main()).
# 'logging' is the configuration handed to multistructlog's create_logger;
# it stays None unless a config file supplies one.
progargs = dict(logging=None)

# Configuration file consulted when no --config option is given.
DEFAULT_CONFIG_FILE_PATH = '/etc/cord_workflow_airflow_extensions/config.json'
43
44
class NoopLogger(object):
    """
    Logger stand-in that silently discards every message.

    EssenceExtractor falls back to this class when it is constructed without
    a logger.  Every method accepts arbitrary positional *and* keyword
    arguments, so call sites written for structured loggers
    (``log.info("event", key=value)``) do not fail when this one is used.
    """

    def __init__(self):
        pass

    def info(self, *args, **kwargs):
        pass

    def debug(self, *args, **kwargs):
        pass

    def error(self, *args, **kwargs):
        pass

    def warn(self, *args, **kwargs):
        pass

    # ``logging``-style spellings, so either naming convention works
    warning = warn
    exception = error
60
61
class EssenceExtractor(object):
    """
    Extract the "essence" of an Airflow workflow from its Python source.

    The source is parsed with :mod:`ast` and converted into a JSON-compatible
    tree in which every AST node is a single-key dict ``{NodeClass: fields}``
    (see :meth:`get_ast`).  :meth:`extract` walks that tree to find

    - DAG definitions (``x = DAG(...)``),
    - operator/sensor definitions (CORD operators and any class whose name
      ends with ``Operator`` or ``Sensor``),
    - task dependencies declared with ``>>`` / ``<<``.

    Only assignment statements are recognised as DAG/operator definitions and
    only literal values are resolved; any other expression is kept as its raw
    JSON AST subtree.
    """

    # Python < 3.4 represents True/False/None as plain Name nodes
    _NAME_LITERALS = {"True": True, "False": False, "None": None}

    def __init__(self, logger=None):
        """
        :param logger: logger used for diagnostics; when falsy, a NoopLogger
            that discards all messages is used instead.
        """
        self.logger = logger if logger else NoopLogger()
        # JSON-compatible AST of the last parsed source; None until parsed
        self.tree = None

    def set_logger(self, logger):
        """Replace the logger used for diagnostics."""
        self.logger = logger

    def get_logger(self):
        """Return the logger used for diagnostics."""
        return self.logger

    def get_ast(self):
        """Return the JSON-compatible AST of the last parsed source, or None."""
        return self.tree

    def parse_code(self, code):
        """
        Parse Python source ``code`` and keep its JSON-compatible AST.

        :param code: Python source (``str`` or ``bytes``).
        :raises SyntaxError: if ``code`` is not valid Python.
        """
        self.tree = self.__jsonify_ast(ast.parse(code))

    def parse_codefile(self, filepath):
        """
        Parse the Python source file ``filepath`` and keep its JSON-compatible AST.

        The file is read as bytes so that ``ast.parse`` decodes it the way the
        interpreter would (UTF-8 or a PEP 263 coding declaration) instead of
        with the platform's locale encoding.

        :raises OSError: if the file cannot be read.
        :raises SyntaxError: if the file is not valid Python.
        """
        with open(filepath, "rb") as f:
            code = f.read()
        self.tree = self.__jsonify_ast(ast.parse(code, filepath))

    def __classname(self, cls):
        return cls.__class__.__name__

    def __jsonify_value(self, value):
        """Convert one AST field value into a JSON-compatible value."""
        if isinstance(value, ast.AST):
            # field-less nodes (Load, Store, RShift, ...) collapse to their class name
            return self.__jsonify_ast(value) if value._fields else self.__classname(value)
        if isinstance(value, list):
            # node elements are always expanded; plain values (e.g. the identifier
            # strings of ``global x`` or the None key of ``{**d}``) are kept as-is
            return [self.__jsonify_ast(e) if isinstance(e, ast.AST) else self.__jsonify_value(e)
                    for e in value]
        if value is None or isinstance(value, (str, int, float)):
            return value
        return 'unrecognized'

    def __jsonify_ast(self, node, level=0):
        """Convert an AST node into ``{NodeClass: {field: json_value, ...}}``."""
        fields = {k: self.__jsonify_value(getattr(node, k, None)) for k in node._fields}
        return {self.__classname(node): fields}

    def __recursively_find_elements(self, tree, elem):
        """
        Depth-first search of the JSON AST.

        Yields the value of every dict entry whose key equals ``elem`` (for a
        node class name, that is the node's fields dict).  A match is yielded
        before the matches nested inside it.
        """
        if isinstance(tree, dict):
            for key, value in tree.items():
                if key == elem:
                    yield value
                if value and isinstance(value, (dict, list)):
                    yield from self.__recursively_find_elements(value, elem)
        elif isinstance(tree, list):
            for value in tree:
                if value and isinstance(value, (dict, list)):
                    yield from self.__recursively_find_elements(value, elem)

    def __extract_assigns_calling(self, tree, predicate):
        """
        Yield each Assign (fields dict) whose right-hand side contains a call
        to a plain name for which ``predicate(name)`` is true.
        """
        for assign in self.__recursively_find_elements(tree, "Assign"):
            # "func" is the callee field of Call nodes
            for func in self.__recursively_find_elements(assign, "func"):
                name = func.get("Name") if isinstance(func, dict) else None
                if name and "ctx" in name and "id" in name and predicate(name["id"]):
                    yield assign
                    break

    def __extract_func_calls(self, tree, func_name):
        """Extract assignments whose value calls ``func_name``."""
        return self.__extract_assigns_calling(tree, lambda name: name == func_name)

    def __extract_func_calls_airflow_operators(self, tree):
        """Extract assignments that call a class ending with "Operator" or "Sensor"."""
        return self.__extract_assigns_calling(
            tree, lambda name: name.endswith(("Operator", "Sensor")))

    def __extract_bin_op(self, tree, op_name):
        """Extract binary operations (fields dicts) whose operator is ``op_name``."""
        for op in self.__recursively_find_elements(tree, "BinOp"):
            if op.get("op") == op_name:
                yield op

    def __take_string_or_tree(self, tree):
        if "Str" in tree:
            return tree["Str"]["s"]
        if "Constant" in tree and isinstance(tree["Constant"]["value"], str):
            return tree["Constant"]["value"]
        return tree

    def __take_num_or_tree(self, tree):
        if "Num" in tree:
            return tree["Num"]["n"]
        if "Constant" in tree:
            value = tree["Constant"]["value"]
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                return value
        return tree

    def __take_id_or_tree(self, tree):
        if "Name" in tree:
            return tree["Name"]["id"]
        return tree

    def __take_name_constant_or_tree(self, tree):
        if "NameConstant" in tree:
            return tree["NameConstant"]["value"]
        if "Constant" in tree:
            value = tree["Constant"]["value"]
            if value is None or isinstance(value, bool):
                return value
        return tree

    def __take_value_or_tree(self, tree):
        """
        Resolve a literal expression to its Python value.

        Strings, numbers, booleans, None and lists/tuples of those are
        resolved; a bare name resolves to its identifier string; anything else
        is returned as its JSON AST subtree.
        """
        if "Str" in tree:
            return tree["Str"]["s"]
        if "Num" in tree:
            return tree["Num"]["n"]
        if "Constant" in tree:
            return tree["Constant"]["value"]
        if "Name" in tree:
            val = tree["Name"]["id"]
            return self._NAME_LITERALS.get(val, val)
        if "NameConstant" in tree:
            # already True/False/None after jsonification; map legacy strings too
            val = tree["NameConstant"]["value"]
            return self._NAME_LITERALS.get(val, val) if isinstance(val, str) else val
        for seq in ("List", "Tuple"):
            if seq in tree:
                return [self.__take_value_or_tree(elt) for elt in tree[seq].get("elts") or []]
        return tree

    def __get_call(self, assign):
        """Return the fields of the Call on the right-hand side of ``assign``, or None."""
        value = assign.get("value")
        return value.get("Call") if isinstance(value, dict) else None

    def __make_dag(self, tree):
        """
        Build ``{'local_variable', 'dag_id'}`` from a ``x = DAG(...)`` assignment.

        ``dag_id`` comes from the ``dag_id=`` keyword or, failing that, the first
        positional argument (``DAG("my_dag", ...)``); it is None when neither is
        present.
        """
        loc_val = None
        dag_id = None

        targets = tree.get("targets")
        if targets:
            loc_val = self.__take_id_or_tree(targets[0])

        call = self.__get_call(tree)
        if call:
            for keyword in call.get("keywords") or []:
                k = keyword.get("keyword")
                if k and k.get("arg") == "dag_id":
                    dag_id = self.__take_string_or_tree(k["value"])
            args = call.get("args") or []
            if dag_id is None and args:
                dag_id = self.__take_string_or_tree(args[0])

        return {
            'local_variable': loc_val,
            'dag_id': dag_id
        }

    def __make_airflow_operator(self, tree):
        """
        Build an operator description from a ``x = SomeOperator(...)`` assignment:
        ``local_variable``, ``class`` and one entry per keyword argument.
        """
        airflow_operator = {}

        targets = tree.get("targets")
        if targets:
            airflow_operator["local_variable"] = self.__take_id_or_tree(targets[0])

        call = self.__get_call(tree)
        if call:
            if "func" in call:
                airflow_operator["class"] = self.__take_id_or_tree(call["func"])
            for keyword in call.get("keywords") or []:
                k = keyword.get("keyword")
                # ``**kwargs`` unpacking has arg None: it cannot be resolved
                # statically and a None key would break sorted JSON output
                if not k or k.get("arg") is None:
                    continue
                airflow_operator[k["arg"]] = self.__take_value_or_tree(k["value"])

        return airflow_operator

    def __expand_dependency_operand(self, operand, dependencies):
        """
        Resolve one side of a ``>>``/``<<`` expression to a list of task
        variable names (or raw subtrees for unsupported expressions).

        Lists/tuples expand to their elements; a nested shift expression is
        processed recursively (its edges go into ``dependencies``) and
        contributes the value it evaluates to.
        """
        if not operand:
            return []
        if not isinstance(operand, dict):
            return [operand]
        for seq in ("List", "Tuple"):
            if seq in operand:
                return [self.__take_id_or_tree(e) for e in operand[seq].get("elts") or []]
        if "BinOp" in operand:
            return list(self.__make_dependencies_bin_op(operand["BinOp"], dependencies))
        return [self.__take_id_or_tree(operand)]

    def __make_dependencies_bin_op(self, tree, dependencies):
        """
        Append all-vs-all ``{'parent', 'child'}`` edges of one shift expression
        to ``dependencies``.

        ``a >> b`` makes ``a`` the parent of ``b``; ``a << b`` makes ``b`` the
        parent of ``a``.  Returns the right operand's tasks (children for
        ``>>``, parents for ``<<``) so an enclosing chain can continue from them.
        """
        op = tree.get("op")
        if op == "RShift":
            parent_side, child_side = tree["left"], tree["right"]
        elif op == "LShift":
            parent_side, child_side = tree["right"], tree["left"]
        else:
            return []

        # child side first, matching the order edges were historically emitted
        children = self.__expand_dependency_operand(child_side, dependencies)
        parents = self.__expand_dependency_operand(parent_side, dependencies)

        dependencies.extend({'parent': p, 'child': c} for p in parents for c in children)

        return children if op == "RShift" else parents

    def __extract_dep_operations(self, tree):
        """Yield every parent/child edge declared with ">>" and then with "<<"."""
        for op_name in ("RShift", "LShift"):
            for op in self.__extract_bin_op(tree, op_name):
                deps = []
                self.__make_dependencies_bin_op(op, deps)
                yield from deps

    def __extract_dags(self, tree):
        """Return ``{dag_id: dag}`` for every ``x = DAG(...)`` with a literal string id."""
        dags = {}
        for assign in self.__extract_func_calls(tree, "DAG"):
            dag = self.__make_dag(assign)
            dag_id = dag["dag_id"]
            if isinstance(dag_id, str):
                dags[dag_id] = dag
            else:
                # missing or non-literal ids cannot be used as keys
                self.logger.debug("skipping DAG without a literal dag_id")
        return dags

    def __collect_operators(self, assigns):
        """Build ``{task_id: operator}`` from operator assignments with a string task_id."""
        operators = {}
        for assign in assigns:
            operator = self.__make_airflow_operator(assign)
            task_id = operator.get("task_id")
            if isinstance(task_id, str):
                operators[task_id] = operator
            else:
                # missing or computed task ids cannot be used as keys
                self.logger.debug("skipping operator without a resolvable task_id")
        return operators

    def __extract_CORD_event_sensors(self, tree):
        return self.__collect_operators(self.__extract_func_calls(tree, "CORDEventSensor"))

    def __extract_CORD_model_sensors(self, tree):
        return self.__collect_operators(self.__extract_func_calls(tree, "CORDModelSensor"))

    def __extract_CORD_model_operators(self, tree):
        return self.__collect_operators(self.__extract_func_calls(tree, "CORDModelOperator"))

    def __extract_airflow_operators(self, tree):
        return self.__collect_operators(self.__extract_func_calls_airflow_operators(tree))

    def __extract_all_operators(self, tree):
        """Return ``{task_id: operator}`` for CORD operators and generic Airflow operators."""
        operators = {}
        for extract_cord in (self.__extract_CORD_event_sensors,
                             self.__extract_CORD_model_sensors,
                             self.__extract_CORD_model_operators):
            operators.update(extract_cord(tree))

        # generic *Operator / *Sensor instances, not overriding the CORD ones above
        for task_id, operator in self.__extract_airflow_operators(tree).items():
            operators.setdefault(task_id, operator)

        return operators

    def __extract_dependencies(self, tree):
        """
        Build N-N dependencies from fragmented parent-child relations.

        Returns ``{task_var: {'children': [...], 'parents': [...]}}`` keyed by
        task variable name; a node can have multiple parents and children.
        Edges whose ends are not plain variable names are ignored.
        """
        dependencies = {}
        for dep in self.__extract_dep_operations(tree):
            p = dep["parent"]
            c = dep["child"]
            if not (isinstance(p, str) and isinstance(c, str)):
                continue

            children = dependencies.setdefault(p, {}).setdefault("children", [])
            if c not in children:
                children.append(c)

            parents = dependencies.setdefault(c, {}).setdefault("parents", [])
            if p not in parents:
                parents.append(p)

        return dependencies

    def __build_dag_essence(self, dag_id, dag, operators, dependencies):
        """Select the tasks of one DAG and rewrite their dependencies in task_id terms."""
        dag_var = dag["local_variable"]

        tasks = {}
        tasks_by_var = {}  # local variable name -> operator, for dependency lookup
        for task_id, operator in operators.items():
            if "dag" not in operator or operator["dag"] != dag_var:
                continue
            operator["dag_id"] = dag_id
            tasks[task_id] = operator
            local_var = operator.get("local_variable")
            if isinstance(local_var, str):
                tasks_by_var[local_var] = operator

        task_dependencies = {}
        for task_var, dependency in dependencies.items():
            operator = tasks_by_var.get(task_var)
            if operator is None:
                continue
            dep = {}
            for relation in ("children", "parents"):
                if relation in dependency:
                    dep[relation] = [tasks_by_var[v]["task_id"]
                                     for v in dependency[relation] if v in tasks_by_var]
            task_dependencies[operator["task_id"]] = dep

        return {
            'dag': dag,
            'tasks': tasks,
            'dependencies': task_dependencies
        }

    def extract(self):
        """
        Build high-level information about the workflow's DAGs, their operators
        and the dependencies between them.

        :returns: ``{dag_id: {'dag': ..., 'tasks': {task_id: operator},
            'dependencies': {task_id: {'children': [...], 'parents': [...]}}}}``,
            or None if no source has been parsed.
        """
        if not self.tree:
            return None

        dags = self.__extract_dags(self.tree)
        operators = self.__extract_all_operators(self.tree)
        dependencies = self.__extract_dependencies(self.tree)

        return {dag_id: self.__build_dag_essence(dag_id, dag, operators, dependencies)
                for dag_id, dag in dags.items()}
566
567
"""
Command-line tool: extract the essence of an Airflow DAG source file and
write it out as JSON.
"""
571
572
def print_graffiti():
    """Print the tool's ASCII-art banner, rendered with pyfiglet's "graffiti" font."""
    banner_words = ("CORD", "Workflow", "Essence", "Extractor")
    print(pyfiglet.figlet_format("\n".join(banner_words), font="graffiti"))
576
577
def get_arg_parser():
    """
    Build the command-line parser for the essence extractor.

    Options: ``--config`` (configuration file), ``-o/--output`` (output
    path), ``-c/--stdout`` (print to console) and the positional
    ``input_file`` (Airflow DAG source to analyse).
    """
    parser = argparse.ArgumentParser(prog='essence_extractor',
                                     description='CORD Workflow Essence Extractor.')
    option_specs = (
        (('--config',), dict(help='locate a configuration file')),
        (('-o', '--output'), dict(help='output file path')),
        (('-c', '--stdout'), dict(action='store_true', help='output to console (STDOUT)')),
        (('input_file',), dict(help='input airflow dag source file')),
    )
    for flags, options in option_specs:
        parser.add_argument(*flags, **options)
    return parser
585
586
def read_config(path):
    """
    Load a JSON configuration file.

    :param path: path to the JSON configuration file.
    :returns: the decoded JSON document, or an empty dict if ``path`` does
        not exist.
    :raises ValueError: (``json.JSONDecodeError``) if the file is not valid JSON.
    """
    if not os.path.exists(path):
        return {}
    # JSON text is UTF-8 (RFC 8259); don't depend on the locale's default encoding
    with open(path, encoding='utf-8') as json_config_file:
        return json.load(json_config_file)
593
594
def pretty_format_json(j):
    """Serialize ``j`` as human-readable JSON: sorted keys, 4-space indent, ``': '`` separators."""
    return json.dumps(j, indent=4, sort_keys=True, separators=(',', ': '))
598
599
# Command-line entry point.
def main(args):
    """
    Run the essence extractor for the parsed command-line ``args``.

    The configuration file (``--config`` or DEFAULT_CONFIG_FILE_PATH) may
    override keys already present in ``progargs``.  The essence of
    ``args.input_file`` is then written as pretty-printed JSON, either to the
    console (``-c``/``--stdout`` or ``-o -``) or to the output file (default
    ``./essence.json``, preceded by the ASCII-art banner on the console).

    :raises IOError: if the input file does not exist.
    """
    global progargs

    config_file_path = args.config if args.config else DEFAULT_CONFIG_FILE_PATH
    if os.path.exists(config_file_path):
        config = read_config(config_file_path)
        if config:
            # only keys already declared in progargs may be overridden
            for key in progargs:
                if key in config:
                    progargs[key] = config[key]

    log = create_logger(progargs["logging"])

    code_filepath = args.input_file
    if not os.path.exists(code_filepath):
        raise IOError('cannot find an input file - %s' % code_filepath)

    output_filepath = args.output if args.output else './essence.json'
    print_console = bool(args.stdout or output_filepath == '-')

    extractor = EssenceExtractor(logger=log)
    extractor.parse_codefile(code_filepath)
    json_string = pretty_format_json(extractor.extract())

    if not print_console:
        print_graffiti()
        with open(output_filepath, 'w') as f:
            f.write(json_string)
    else:
        print(json_string)
641
642
if __name__ == "__main__":
    # script entry point: parse sys.argv with the CLI parser and run the extractor
    main(get_arg_parser().parse_args())