blob: bb62493834d4c5ee18cb14f8ae0fbe9e8cc78999 [file] [log] [blame]
Illyoung Choi5d59ab62019-06-24 16:15:27 -07001#!/usr/bin/env python3
2
3# Copyright 2019-present Open Networking Foundation
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18Workflow Essence Extractor
19
This module extracts the essence of Airflow workflows.
The following information is extracted from workflow code:
22- DAG info
23- Operator info
24 - XOS-related operators
25 - Airflow operators
26- Dependency info
27"""
28
29import ast
30import sys
31import json
32import os.path
33
34
def classname(cls):
    """
    Return the name of the class that *cls* is an instance of.

    Used to turn AST node objects into node-type names, e.g. an
    ``ast.Load`` instance yields ``"Load"``.
    """
    node_type = cls.__class__
    return node_type.__name__
37
38
def jsonify_ast(node, level=0):
    """
    Convert an ``ast`` node into nested, JSON-serializable dictionaries.

    Each node becomes ``{ClassName: {field_name: converted_value, ...}}``.
    Field values are converted as follows:

    - an AST node that has fields is converted recursively;
    - an AST node without fields (``Load``, ``Store``, ``RShift``, ...)
      becomes its class-name string;
    - a list becomes a list in which AST elements are converted
      recursively and any other element (identifier strings such as
      ``Global.names``, ``None`` placeholders such as ``Dict.keys`` for
      ``{**x}``) is converted like a scalar field;
    - ``str``, ``int``, ``float``, ``bool`` and ``None`` are kept as-is;
    - anything else (e.g. ``bytes``) becomes the string ``'unrecognized'``.

    ``level`` is unused; it is kept only so existing callers keep working.
    """
    def convert_scalar(value):
        # bool is an int subclass, so True/False are kept as-is too
        if value is None or isinstance(value, (str, int, float)):
            return value
        return 'unrecognized'

    fields = {}
    for field_name in node._fields:
        value = getattr(node, field_name)
        if isinstance(value, ast.AST):
            if value._fields:
                fields[field_name] = jsonify_ast(value)
            else:
                # field-less nodes are collapsed to their name, e.g. "Store"
                fields[field_name] = value.__class__.__name__
        elif isinstance(value, list):
            # most lists hold AST nodes, but some hold raw identifiers or
            # None, which must not be passed to jsonify_ast (no _fields)
            fields[field_name] = [
                jsonify_ast(item) if isinstance(item, ast.AST)
                else convert_scalar(item)
                for item in value
            ]
        else:
            fields[field_name] = convert_scalar(value)

    return {node.__class__.__name__: fields}
71
72
def parse(code):
    """
    Parse workflow code into the jsonify_ast dictionary form.

    *code* is treated as a path when it is a single line that ends in
    ".py" and names an existing file; otherwise it is treated as Python
    source text.
    """
    looks_like_path = "\n" not in code
    if looks_like_path and code.endswith(".py") and os.path.exists(code):
        return parse_codefile(code)
    return parse_code(code)
79
80
def parse_code(code):
    """Parse the Python source text *code* and return its jsonify_ast form."""
    return jsonify_ast(ast.parse(code))
84
85
def parse_codefile(code_filepath):
    """
    Parse the Python source file at *code_filepath* into the jsonify_ast form.

    The file is read as bytes, so that ``ast.parse`` applies Python's own
    source-decoding rules (UTF-8 by default, or a PEP 263 ``coding:``
    declaration / BOM). Reading in text mode would use the platform's
    locale encoding instead. The path is passed as the filename so that
    syntax errors point at the file.
    """
    with open(code_filepath, "rb") as source_file:
        source = source_file.read()
    tree = ast.parse(source, code_filepath)
    return jsonify_ast(tree)
92
93
def pretty_print_json(j):
    """Print *j* to stdout as JSON with sorted keys and 4-space indentation."""
    print(json.dumps(j, indent=4, sort_keys=True, separators=(",", ": ")))
97
98
def recursively_find_elements(tree, elem):
    """
    Walk a jsonify_ast structure depth-first and yield matches for *elem*.

    For a dict, every key equal to *elem* yields its value. For a list,
    every item equal to *elem* yields the item itself. Any non-empty
    nested dict or list is searched as well, after its own key or item has
    been checked.
    """
    if isinstance(tree, list):
        entries = ((item, item) for item in tree)
    elif isinstance(tree, dict):
        entries = ((key, tree[key]) for key in tree)
    else:
        entries = ((item, None) for item in tree)

    for key, child in entries:
        if key == elem:
            yield child
        if child and isinstance(child, (list, dict)):
            yield from recursively_find_elements(child, elem)
116
117
def extract_func_calls(tree, func_name):
    """
    Yield the field dict of every ``Assign`` node in *tree* whose value
    contains a call to the plain name *func_name*, e.g. ``dag = DAG(...)``
    for func_name "DAG". Calls made through an attribute
    (``models.DAG(...)``) are not matched.
    """
    def calls_target(func):
        if "Name" not in func:
            return False
        name = func["Name"]
        return "ctx" in name and "id" in name and name["id"] == func_name

    for assign in recursively_find_elements(tree, "Assign"):
        called = (
            func
            for call in recursively_find_elements(assign, "Call")
            for func in recursively_find_elements(call, "func")
        )
        if any(calls_target(func) for func in called):
            yield assign
142
143
def extract_func_calls_airflow_operators(tree):
    """
    Yield the field dict of every ``Assign`` node in *tree* whose value
    contains a call to a plain name ending in "Operator" or "Sensor",
    e.g. ``t1 = BashOperator(...)``.
    """
    def is_operator_class(func):
        if "Name" not in func:
            return False
        name = func["Name"]
        return ("ctx" in name and "id" in name
                and name["id"].endswith(("Operator", "Sensor")))

    for assign in recursively_find_elements(tree, "Assign"):
        called = (
            func
            for call in recursively_find_elements(assign, "Call")
            for func in recursively_find_elements(call, "func")
        )
        if any(is_operator_class(func) for func in called):
            yield assign
168
169
def extract_bin_op(tree, op_name):
    """
    Yield every ``BinOp`` body found anywhere in *tree* whose operator
    class name equals *op_name* (e.g. "RShift" for ``>>``, "LShift" for ``<<``).
    """
    matching = (
        bin_op
        for bin_op in recursively_find_elements(tree, "BinOp")
        if bin_op["op"] == op_name
    )
    yield from matching
179
180
def take_string_or_tree(tree):
    """
    Return the text of a string-literal node, or *tree* unchanged.

    Both literal shapes produced by jsonify_ast are accepted: the legacy
    ``{"Str": {"s": ...}}`` (Python < 3.8) and ``{"Constant": {"value": ...}}``
    (Python 3.8+). A ``Constant`` is only unwrapped when its value is a str.
    """
    if "Str" in tree:
        return tree["Str"]["s"]
    if "Constant" in tree and isinstance(tree["Constant"].get("value"), str):
        return tree["Constant"]["value"]
    return tree
185
186
def take_num_or_tree(tree):
    """
    Return the value of a numeric-literal node, or *tree* unchanged.

    Accepts the legacy ``{"Num": {"n": ...}}`` shape (Python < 3.8) and a
    ``{"Constant": {"value": ...}}`` (Python 3.8+) whose value is an int or
    float. Booleans are excluded, because True/False are not numeric literals.
    """
    if "Num" in tree:
        return tree["Num"]["n"]
    if "Constant" in tree:
        value = tree["Constant"].get("value")
        # bool is an int subclass; keep True/False out of the numeric path
        if isinstance(value, (int, float)) and not isinstance(value, bool):
            return value
    return tree
191
192
def take_id_or_tree(tree):
    """Return the identifier of a ``Name`` node, or *tree* unchanged."""
    return tree["Name"]["id"] if "Name" in tree else tree
197
198
def take_name_constant_or_tree(tree):
    """
    Return True/False/None for a constant-name node, or *tree* unchanged.

    Accepts the legacy ``{"NameConstant": {"value": ...}}`` shape
    (Python < 3.8) and a ``{"Constant": {"value": ...}}`` (Python 3.8+)
    whose value is a bool or None.
    """
    if "NameConstant" in tree:
        return tree["NameConstant"]["value"]
    if "Constant" in tree:
        constant = tree["Constant"]
        if "value" in constant:
            value = constant["value"]
            if value is None or isinstance(value, bool):
                return value
    return tree
203
204
def take_value_or_tree(tree):
    """
    Convert a literal-like expression node (jsonify_ast form) to a plain value.

    Recognized forms:
    - string / number literals: the legacy ``Str``/``Num`` shapes and the
      Python 3.8+ ``Constant`` shape -> their value;
    - ``NameConstant`` -> its value;
    - ``Name`` -> the identifier string, except that the spellings "True",
      "False" and "None" map to the corresponding constants;
    - ``List`` -> a list with every element converted recursively.

    Anything else (calls, attributes, f-strings, ...) is returned unchanged.
    """
    def constant_or_identifier(val):
        # Map the built-in constant spellings to real values. bool(val)
        # must not be used here: bool("False") is True.
        if val == "True":
            return True
        if val == "False":
            return False
        if val == "None":
            return None
        return val

    if "Str" in tree:
        return tree["Str"]["s"]
    elif "Num" in tree:
        return tree["Num"]["n"]
    elif "Constant" in tree:
        return tree["Constant"]["value"]
    elif "Name" in tree:
        return constant_or_identifier(tree["Name"]["id"])
    elif "NameConstant" in tree:
        return constant_or_identifier(tree["NameConstant"]["value"])
    elif "List" in tree:
        return [take_value_or_tree(elt) for elt in tree["List"].get("elts", [])]
    return tree
233
234
def make_dag(tree):
    """
    Summarize the field dict of an ``Assign`` node whose value is a DAG call.

    Returns a dict with:
    - ``local_variable``: the name the DAG is assigned to (first target),
      or the raw target node when it is not a plain name;
    - ``dag_id``: taken from the ``dag_id=`` keyword (the string, or the raw
      expression node when it is not a string literal). Without that
      keyword, the first positional argument is used when it is a string
      literal, as in ``DAG('tutorial', ...)``. Otherwise it is ``None``.
    """
    loc_val = None
    dag_id = None

    if "targets" in tree:
        # only the first target is recorded for chained assignments
        loc_val = take_id_or_tree(tree["targets"][0])

    if "value" in tree and "Call" in tree["value"]:
        call = tree["value"]["Call"]
        for keyword in call.get("keywords", []):
            if "keyword" in keyword:
                k = keyword["keyword"]
                if k["arg"] == "dag_id":
                    dag_id = take_string_or_tree(k["value"])

        # dag_id is also accepted as the first positional argument
        args = call.get("args") or []
        if dag_id is None and args:
            positional = take_string_or_tree(args[0])
            # only accept a string literal, so a variable or *args cannot
            # turn into an (unhashable) expression-dict dag_id
            if isinstance(positional, str):
                dag_id = positional

    return {
        'local_variable': loc_val,
        'dag_id': dag_id
    }
259
260
def make_airflow_operator(tree):
    """
    Summarize the field dict of an ``Assign`` node whose value is an
    operator or sensor call.

    The returned dict maps:
    - ``local_variable`` -> the assigned name (first target);
    - ``class`` -> the called name (or the raw ``func`` node when the call
      is not through a plain name);
    - each explicit keyword-argument name -> its value as converted by
      take_value_or_tree.

    ``**kwargs`` unpacking is skipped. It has no keyword name (``arg`` is
    None), and a ``None`` key would break ``json.dumps(..., sort_keys=True)``
    when mixed with the string keys.
    """
    airflow_operator = {}

    if "targets" in tree:
        airflow_operator["local_variable"] = take_id_or_tree(tree["targets"][0])

    if "value" in tree and "Call" in tree["value"]:
        call = tree["value"]["Call"]
        if "func" in call:
            airflow_operator["class"] = take_id_or_tree(call["func"])

        for keyword in call.get("keywords", []):
            if "keyword" not in keyword:
                continue
            k = keyword["keyword"]
            arg = k["arg"]
            if arg is None:
                # ``**kwargs``: no name to record
                continue
            airflow_operator[arg] = take_value_or_tree(k["value"])

    return airflow_operator
286
287
def make_dependencies_bin_op(tree, dependencies):
    """
    Record the parent/child edges expressed by one ``>>`` or ``<<`` BinOp.

    *tree* is the body of a ``BinOp`` node in jsonify_ast form. For
    "RShift" the left operand is the parent and the right operand the
    child; for "LShift" it is the reverse. Each side may be:
    - a name (kept as its identifier string);
    - a ``List`` (each element kept as an identifier or raw node);
    - a nested ``BinOp`` (a chain such as ``a >> b >> c``), which is
      processed recursively and contributes the operands it returns;
    - any other expression, kept as its raw node.

    Every parent-child combination is appended to *dependencies* as
    ``{'parent': p, 'child': c}``.

    Returns the children for "RShift", the parents for "LShift" and an
    empty list for any other operator. This lets an enclosing BinOp chain
    onto the result.
    """
    def expand(operand):
        # Normalize one side of the BinOp into a list of task references.
        if not operand:
            return []
        if not isinstance(operand, dict):
            return [operand]
        if "List" in operand:
            return [take_id_or_tree(elt) for elt in operand["List"]["elts"]]
        if "BinOp" in operand:
            # nested chain: record its own edges, continue from its result
            return list(make_dependencies_bin_op(operand["BinOp"], dependencies))
        return [take_id_or_tree(operand)]

    op_name = tree["op"]
    if op_name == "RShift":
        child = take_id_or_tree(tree["right"])
        parent = take_id_or_tree(tree["left"])
    elif op_name == "LShift":
        child = take_id_or_tree(tree["left"])
        parent = take_id_or_tree(tree["right"])
    else:
        child = parent = None

    # The child side is expanded first, so nested edges are appended to
    # *dependencies* in the same order as before.
    children = expand(child)
    parents = expand(parent)

    # all-vs-all combinations (nothing is added if either side is empty)
    dependencies.extend(
        {'parent': p, 'child': c} for p in parents for c in children
    )

    return parents if op_name == "LShift" else children
344
345
def extract_dep_operations(tree):
    """
    Yield ``{'parent': ..., 'child': ...}`` edges for every dependency
    written with ``>>`` and then with ``<<`` anywhere in *tree*.
    """
    for shift_op in ("RShift", "LShift"):
        for bin_op in extract_bin_op(tree, shift_op):
            edges = []
            make_dependencies_bin_op(bin_op, edges)
            yield from edges
364
365
def extract_dags(tree):
    """Return ``{dag_id: make_dag summary}`` for every ``x = DAG(...)`` assignment in *tree*."""
    summaries = (make_dag(assign) for assign in extract_func_calls(tree, "DAG"))
    return {summary["dag_id"]: summary for summary in summaries}
375
376
def extract_XOS_event_sensors(tree):
    """Return ``{task_id: operator summary}`` for every ``x = XOSEventSensor(...)`` assignment."""
    sensors = (
        make_airflow_operator(assign)
        for assign in extract_func_calls(tree, "XOSEventSensor")
    )
    return {sensor["task_id"]: sensor for sensor in sensors}
386
387
def extract_XOS_model_sensors(tree):
    """Return ``{task_id: operator summary}`` for every ``x = XOSModelSensor(...)`` assignment."""
    sensors = (
        make_airflow_operator(assign)
        for assign in extract_func_calls(tree, "XOSModelSensor")
    )
    return {sensor["task_id"]: sensor for sensor in sensors}
397
398
def extract_airflow_operators(tree):
    """
    Return ``{task_id: operator summary}`` for every assignment whose value
    calls a name ending in "Operator" or "Sensor".
    """
    summaries = (
        make_airflow_operator(assign)
        for assign in extract_func_calls_airflow_operators(tree)
    )
    return {summary["task_id"]: summary for summary in summaries}
408
409
def extract_all_operators(tree):
    """
    Merge XOS event sensors, XOS model sensors and generic Airflow
    operators into one ``{task_id: operator summary}`` dict. On a task_id
    collision, the entry from the later extractor wins.
    """
    merged = {}
    for extractor in (extract_XOS_event_sensors,
                      extract_XOS_model_sensors,
                      extract_airflow_operators):
        merged.update(extractor(tree))
    return merged
428
429
def extract_dependencies(tree):
    """
    Build N-N dependencies from fragmented parent-child relations.

    Returns a dict keyed by task local-variable name. Each value holds a
    "children" list and/or a "parents" list, without duplicates and in
    first-seen order. A node can have multiple parents and multiple children.

    Edges whose endpoints are not plain names are skipped. Examples are
    ``tasks[0] >> t2``, ``t1 >> make_task()`` and unrelated integer shifts
    such as ``1 << 10``. Such endpoints are expression dicts, which are
    unhashable and cannot be used as keys here.
    """
    dependencies = {}
    for op in extract_dep_operations(tree):
        p = op["parent"]
        c = op["child"]
        if not (isinstance(p, str) and isinstance(c, str)):
            continue

        children = dependencies.setdefault(p, {}).setdefault("children", [])
        # prevent duplicates
        if c not in children:
            children.append(c)

        parents = dependencies.setdefault(c, {}).setdefault("parents", [])
        # prevent duplicates
        if p not in parents:
            parents.append(p)

    return dependencies
475
476
def extract_all(tree):
    """
    Build high-level information about the workflow's DAGs, their
    operators and the dependencies between them.

    Returns ``{dag_id: {'dag': ..., 'tasks': ..., 'dependencies': ...}}``:
    - ``dag`` is the make_dag summary;
    - ``tasks`` maps task_id -> operator summary, for operators whose
      ``dag=`` keyword names this DAG's local variable. Each such summary
      gets a ``dag_id`` entry added in place;
    - ``dependencies`` maps task_id -> {"children": [...], "parents": [...]},
      translated from local variable names to task ids and restricted to
      tasks of this DAG.

    Operators that do not pass ``dag=`` are not attributed to any DAG.
    """
    def to_task_ids(task_vars, var_to_operator):
        # translate local variable names to task ids, dropping tasks that
        # are not part of the current DAG
        return [var_to_operator[var]["task_id"]
                for var in task_vars if var in var_to_operator]

    dags = extract_dags(tree)
    operators = extract_all_operators(tree)
    dependencies = extract_dependencies(tree)

    dag_dict = {}
    for dag_id, dag in dags.items():
        dag_var = dag["local_variable"]

        # filter operators that do not belong to the dag
        my_operators = {}
        # local variable name -> operator, for fast dependency lookup
        my_operators_var = {}
        for task_id, operator in operators.items():
            # .get(): an operator need not pass dag= at all
            if operator.get("dag") != dag_var:
                continue
            operator["dag_id"] = dag_id
            my_operators[task_id] = operator

            local_var = operator.get("local_variable")
            # only plain names can be dependency endpoints; other targets
            # (e.g. ``self.task = ...``) are unhashable dicts
            if isinstance(local_var, str):
                my_operators_var[local_var] = operator

        # filter dependencies that do not belong to the dag
        my_dependencies = {}
        for task_var, dependency in dependencies.items():
            if task_var not in my_operators_var:
                continue
            task_id = my_operators_var[task_var]["task_id"]

            dep = {}
            if "children" in dependency:
                dep["children"] = to_task_ids(dependency["children"], my_operators_var)
            if "parents" in dependency:
                dep["parents"] = to_task_ids(dependency["parents"], my_operators_var)
            my_dependencies[task_id] = dep

        dag_dict[dag_id] = {
            'dag': dag,
            'tasks': my_operators,
            'dependencies': my_dependencies
        }

    return dag_dict
537
538
539# for command-line execution
def main(argv):
    """
    Command-line entry point: print the extracted essence of a workflow file.

    *argv* holds the command-line arguments without the program name. Only
    ``argv[0]``, the workflow file path, is used. The function exits with
    an error message when no path is given.
    """
    if not argv:
        sys.exit("Error: Need a filepath")

    code_filepath = argv[0]

    tree = parse_codefile(code_filepath)
    # named to avoid shadowing the built-in all()
    essence = extract_all(tree)
    pretty_print_json(essence)
549
550
if __name__ == "__main__":
    # drop the program name; main() takes only the user-supplied arguments
    cli_arguments = sys.argv[1:]
    main(cli_arguments)