blob: 901d40fe6669d24d0b60502e0c3715eabda77deb [file] [log] [blame]
Illyoung Choife121d02019-07-16 10:47:41 -07001#!/usr/bin/env python3
2
3# Copyright 2019-present Open Networking Foundation
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9# http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""
18Workflow Essence Extractor
19
20This module extracts essence of airflow workflows
21Following information will be extracted from workflow code
22- DAG info
23- Operator info
24 - XOS-related operators
25 - Airflow operators
26- Dependency info
27"""
28
29import ast
30import json
31import os.path
32import argparse
33import pyfiglet
34
35from multistructlog import create_logger
36
37
# Program-wide settings. main() may overwrite these with values read from the
# JSON configuration file.
progargs = dict(logging=None)

# Configuration file consulted when --config is not given on the command line.
DEFAULT_CONFIG_FILE_PATH = '/etc/cord_workflow_airflow_extensions/config.json'
43
44
class NoopLogger(object):
    """Logger stand-in that silently discards every message.

    EssenceExtractor uses it when no logger is supplied. Every method accepts
    arbitrary positional *and* keyword arguments so that callers passing
    keyword context (as structured loggers allow) do not fail when this
    stand-in is in place.
    """

    def __init__(self):
        pass

    def info(self, *args, **kwargs):
        pass

    def debug(self, *args, **kwargs):
        pass

    def error(self, *args, **kwargs):
        pass

    def warn(self, *args, **kwargs):
        pass

    # ``warning`` is the non-deprecated spelling; make it a no-op too.
    warning = warn
60
61
class EssenceExtractor(object):
    """Extract the "essence" of an Airflow workflow from its source code.

    The source is parsed with :mod:`ast` and converted into nested dicts of
    the form ``{NodeClassName: {field_name: value, ...}}``.  :meth:`extract`
    then searches that structure by node class name to find DAG definitions,
    operator/sensor assignments and the ``>>`` / ``<<`` dependency
    expressions between them.

    Literal nodes are recognized both in their legacy form (``Str``, ``Num``,
    ``NameConstant``) and as ``Constant``, which is what ``ast.parse``
    produces on Python 3.8 and later.
    """

    def __init__(self, logger=None):
        # fall back to a logger that discards everything
        self.logger = logger if logger else NoopLogger()

        # jsonified AST of the most recently parsed code; None until parsed
        self.tree = None

    def set_logger(self, logger):
        """Replace the logger used by this extractor."""
        self.logger = logger

    def get_logger(self):
        """Return the logger used by this extractor."""
        return self.logger

    def get_ast(self):
        """Return the jsonified AST of the last parsed code, or None."""
        return self.tree

    def parse_code(self, code):
        """Parse workflow source *code* and store its jsonified AST."""
        tree = ast.parse(code)
        self.tree = self.__jsonify_ast(tree)

    def parse_codefile(self, filepath):
        """Parse the workflow source file at *filepath* and store its jsonified AST.

        The file is read as bytes so that ``ast.parse`` decodes it with
        Python's own source-encoding rules (UTF-8 by default, or a PEP 263
        coding declaration) rather than the platform locale encoding.
        """
        with open(filepath, "rb") as f:
            code = f.read()
        tree = ast.parse(code, filepath)
        self.tree = self.__jsonify_ast(tree)

    def __classname(self, cls):
        """Return the class name of *cls* (an AST node instance)."""
        return cls.__class__.__name__

    def __jsonify_scalar(self, value):
        """Convert a non-AST field value into a JSON-friendly value.

        str, int, float (and therefore bool) and None pass through unchanged;
        anything else (bytes, complex, Ellipsis, ...) becomes 'unrecognized'.
        """
        if value is None or isinstance(value, (str, int, float)):
            return value
        return 'unrecognized'

    def __jsonify_ast(self, node, level=0):
        """Convert an AST node into nested dicts and lists.

        Returns ``{NodeClassName: {field: converted_value, ...}}``.  A field
        holding a node without fields (e.g. ``Load``, ``RShift``) is reduced
        to that node's class name; list fields are converted element by
        element.  ``level`` is unused and kept for backward compatibility.
        """
        fields = {}
        for k in node._fields:
            v = getattr(node, k, None)
            if isinstance(v, ast.AST):
                fields[k] = self.__jsonify_ast(v) if v._fields else self.__classname(v)
            elif isinstance(v, list):
                # lists can hold plain values too, e.g. Global.names (str) or
                # Dict.keys (None for a "**mapping" entry)
                fields[k] = [
                    self.__jsonify_ast(e) if isinstance(e, ast.AST) else self.__jsonify_scalar(e)
                    for e in v
                ]
            else:
                fields[k] = self.__jsonify_scalar(v)
        return {self.__classname(node): fields}

    def __recursively_find_elements(self, tree, elem):
        """Depth-first search of a jsonified AST for entries named *elem*.

        For a dict, the value of every key equal to *elem* is yielded (for
        ``elem="Assign"`` that is each Assign node's field dict); for a list,
        matching entries themselves are yielded.  The search continues into
        every nested list/dict, including the matches.
        """
        for e in tree:
            if isinstance(tree, list):
                obj = e
            elif isinstance(tree, dict):
                obj = tree[e]
            else:
                obj = None

            if e == elem:
                yield obj

            if obj and isinstance(obj, (list, dict)):
                yield from self.__recursively_find_elements(obj, elem)

    def __called_function_names(self, assign):
        """Yield the id of every function called by plain name anywhere inside *assign*."""
        for call in self.__recursively_find_elements(assign, "Call"):
            if not isinstance(call, dict):
                continue
            for func in self.__recursively_find_elements(call, "func"):
                if isinstance(func, dict) and "Name" in func:
                    name = func["Name"]
                    if "ctx" in name and "id" in name:
                        yield name["id"]

    def __extract_assigns_calling(self, tree, predicate):
        """Yield every Assign in *tree* that calls a function whose name satisfies *predicate*."""
        for assign in self.__recursively_find_elements(tree, "Assign"):
            if not isinstance(assign, dict):
                continue
            if any(predicate(name) for name in self.__called_function_names(assign)):
                yield assign

    def __extract_func_calls(self, tree, func_name):
        """Yield assignments that call the function/class named *func_name*."""
        return self.__extract_assigns_calling(tree, lambda name: name == func_name)

    def __extract_func_calls_airflow_operators(self, tree):
        """Yield assignments that call something named ``*Operator`` or ``*Sensor``."""
        return self.__extract_assigns_calling(
            tree, lambda name: name.endswith(("Operator", "Sensor")))

    def __extract_bin_op(self, tree, op_name):
        """Yield the field dict of every BinOp whose operator is *op_name* (e.g. "RShift")."""
        for op in self.__recursively_find_elements(tree, "BinOp"):
            if isinstance(op, dict) and op.get("op") == op_name:
                yield op

    def __take_string_or_tree(self, tree):
        """Return the value of a string literal node, otherwise *tree* unchanged."""
        if "Str" in tree:
            return tree["Str"]["s"]
        if "Constant" in tree and isinstance(tree["Constant"]["value"], str):
            return tree["Constant"]["value"]
        return tree

    def __take_num_or_tree(self, tree):
        """Return the value of a numeric literal node, otherwise *tree* unchanged."""
        if "Num" in tree:
            return tree["Num"]["n"]
        if "Constant" in tree:
            value = tree["Constant"]["value"]
            if isinstance(value, (int, float)) and not isinstance(value, bool):
                return value
        return tree

    def __take_id_or_tree(self, tree):
        """Return the identifier of a Name node, otherwise *tree* unchanged."""
        if "Name" in tree:
            return tree["Name"]["id"]
        return tree

    def __take_name_constant_or_tree(self, tree):
        """Return the value of a True/False/None literal node, otherwise *tree* unchanged."""
        if "NameConstant" in tree:
            return tree["NameConstant"]["value"]
        if "Constant" in tree:
            value = tree["Constant"]["value"]
            if value is None or isinstance(value, bool):
                return value
        return tree

    def __name_to_literal(self, val):
        """Map the names "True"/"False"/"None" to their values; pass anything else through."""
        if val == "True":
            return True
        if val == "False":
            return False
        if val == "None":
            return None
        return val

    def __take_value_or_tree(self, tree):
        """Best-effort conversion of an expression node into a plain Python value.

        Literals become their values, names become their identifiers (with
        True/False/None mapped to the real constants) and list literals are
        converted element-wise.  Any other expression is returned as its
        jsonified tree.
        """
        if "Str" in tree:
            return tree["Str"]["s"]
        if "Num" in tree:
            return tree["Num"]["n"]
        if "Constant" in tree:
            return tree["Constant"]["value"]
        if "Name" in tree:
            return self.__name_to_literal(tree["Name"]["id"])
        if "NameConstant" in tree:
            return self.__name_to_literal(tree["NameConstant"]["value"])
        if "List" in tree:
            return [self.__take_value_or_tree(elt) for elt in tree["List"].get("elts", [])]
        return tree

    def __call_of(self, assign):
        """Return the Call field dict assigned by *assign*, or None if the value is not a call."""
        value = assign.get("value")
        if isinstance(value, dict):
            call = value.get("Call")
            if isinstance(call, dict):
                return call
        return None

    def __make_dag(self, tree):
        """Summarize an ``x = DAG(...)`` assignment.

        Returns ``{'local_variable': <target>, 'dag_id': <id>}``.  The id is
        taken from the ``dag_id=`` keyword or, failing that, from the first
        positional argument; it is None unless it is a string literal (a
        non-hashable tree could not be used as a key by callers).
        """
        loc_val = None
        dag_id = None

        targets = tree.get("targets")
        if targets:
            loc_val = self.__take_id_or_tree(targets[0])

        call = self.__call_of(tree)
        if call is not None:
            for keyword in call.get("keywords", []):
                k = keyword.get("keyword") if isinstance(keyword, dict) else None
                if k and k.get("arg") == "dag_id":
                    dag_id = self.__take_string_or_tree(k["value"])
            if dag_id is None and call.get("args"):
                dag_id = self.__take_string_or_tree(call["args"][0])

        if not isinstance(dag_id, str):
            dag_id = None

        return {
            'local_variable': loc_val,
            'dag_id': dag_id
        }

    def __make_airflow_operator(self, tree):
        """Summarize an ``x = SomeOperator(...)`` assignment.

        The result holds ``local_variable`` (assignment target), ``class``
        (called name) and one entry per keyword argument, converted with
        ``__take_value_or_tree``.
        """
        airflow_operator = {}

        targets = tree.get("targets")
        if targets:
            airflow_operator["local_variable"] = self.__take_id_or_tree(targets[0])

        call = self.__call_of(tree)
        if call is not None:
            if "func" in call:
                airflow_operator["class"] = self.__take_id_or_tree(call["func"])

            for keyword in call.get("keywords", []):
                k = keyword.get("keyword") if isinstance(keyword, dict) else None
                if not k:
                    continue
                arg = k["arg"]
                if arg is None:
                    # "**kwargs" expansion: there is no literal name, and a None
                    # key would break sort_keys JSON serialization
                    continue
                airflow_operator[arg] = self.__take_value_or_tree(k["value"])

        return airflow_operator

    def __expand_operand(self, operand, dependencies):
        """Turn one side of a ``>>``/``<<`` expression into a list of task references.

        Names are already identifiers, list literals are expanded element-wise
        and nested shift expressions are resolved recursively (recording their
        own pairs in *dependencies* and contributing their right-hand operand).
        Anything else is returned as its jsonified tree.
        """
        if not operand:
            return []
        if not isinstance(operand, dict):
            return [operand]
        if "List" in operand:
            return [self.__take_id_or_tree(e) for e in operand["List"]["elts"]]
        if "BinOp" in operand:
            return list(self.__make_dependencies_bin_op(operand["BinOp"], dependencies))
        return [self.__take_id_or_tree(operand)]

    def __make_dependencies_bin_op(self, tree, dependencies):
        """Record parent/child pairs for one ``>>`` or ``<<`` BinOp.

        ``a >> b`` makes ``a`` the parent of ``b``; ``a << b`` makes ``b`` the
        parent of ``a``.  Every parent is paired with every child and each
        pair is appended to *dependencies* as ``{'parent': p, 'child': c}``.

        Returns the references of the right-hand operand so that chains such
        as ``a >> b >> c`` link ``b`` to ``c``; other operators yield ``[]``.
        """
        op = tree["op"]
        if op == "RShift":
            child_node, parent_node = tree["right"], tree["left"]
        elif op == "LShift":
            child_node, parent_node = tree["left"], tree["right"]
        else:
            return []

        # children first, then parents: nested expressions record their pairs
        # in this order
        children = self.__expand_operand(self.__take_id_or_tree(child_node), dependencies)
        parents = self.__expand_operand(self.__take_id_or_tree(parent_node), dependencies)

        # all-vs-all combinations
        for p in parents:
            for c in children:
                dependencies.append({
                    'parent': p,
                    'child': c
                })

        # the right-hand operand: children for ">>", parents for "<<"
        return children if op == "RShift" else parents

    def __extract_dep_operations(self, tree):
        """Yield every parent/child pair defined with ``>>`` and then with ``<<``."""
        for op_name in ("RShift", "LShift"):
            for op in self.__extract_bin_op(tree, op_name):
                deps = []
                self.__make_dependencies_bin_op(op, deps)
                yield from deps

    def __extract_dags(self, tree):
        """Return ``{dag_id: dag_summary}`` for every ``x = DAG(...)`` assignment."""
        dags = {}
        for call in self.__extract_func_calls(tree, "DAG"):
            dag = self.__make_dag(call)
            dags[dag["dag_id"]] = dag
        return dags

    def __collect_operators(self, assigns):
        """Build ``{task_id: operator_summary}`` from operator/sensor assignments.

        Assignments whose ``task_id`` is missing or not a string cannot be
        keyed and are skipped (with a log message).
        """
        operators = {}
        for assign in assigns:
            operator = self.__make_airflow_operator(assign)
            task_id = operator.get("task_id")
            if not isinstance(task_id, str):
                self.logger.info("skipping an operator without a literal string task_id")
                continue
            operators[task_id] = operator
        return operators

    def __extract_XOS_event_sensors(self, tree):
        """Return ``{task_id: summary}`` for ``XOSEventSensor(...)`` assignments."""
        return self.__collect_operators(self.__extract_func_calls(tree, "XOSEventSensor"))

    def __extract_XOS_model_sensors(self, tree):
        """Return ``{task_id: summary}`` for ``XOSModelSensor(...)`` assignments."""
        return self.__collect_operators(self.__extract_func_calls(tree, "XOSModelSensor"))

    def __extract_airflow_operators(self, tree):
        """Return ``{task_id: summary}`` for ``*Operator``/``*Sensor`` assignments."""
        return self.__collect_operators(self.__extract_func_calls_airflow_operators(tree))

    def __extract_all_operators(self, tree):
        """Merge XOS sensors and generic Airflow operators into one ``{task_id: summary}`` dict."""
        operators = {}
        operators.update(self.__extract_XOS_event_sensors(tree))
        operators.update(self.__extract_XOS_model_sensors(tree))
        operators.update(self.__extract_airflow_operators(tree))
        return operators

    def __extract_dependencies(self, tree):
        """
        Build N-N dependencies from fragmented parent-child relations.

        A node can have multiple parents and multiple children.  Returns
        ``{variable: {'children': [...], 'parents': [...]}}`` keyed by the
        tasks' local variable names.  Endpoints that are not plain names
        (attributes, subscripts, calls, ...) cannot be resolved to a task and
        are skipped.
        """
        dependencies = {}
        for op in self.__extract_dep_operations(tree):
            p = op["parent"]
            c = op["child"]
            if not (isinstance(p, str) and isinstance(c, str)):
                continue

            # dedup while keeping first-seen order
            children = dependencies.setdefault(p, {}).setdefault("children", [])
            if c not in children:
                children.append(c)

            parents = dependencies.setdefault(c, {}).setdefault("parents", [])
            if p not in parents:
                parents.append(p)

        return dependencies

    def __dependencies_for(self, dependencies, operators_by_var):
        """Restrict *dependencies* to the given operators, re-keyed by task_id."""
        result = {}
        for task_var, dependency in dependencies.items():
            if task_var not in operators_by_var:
                continue
            dep = {}
            for relation in ("children", "parents"):
                if relation in dependency:
                    dep[relation] = [
                        operators_by_var[var]["task_id"]
                        for var in dependency[relation]
                        if var in operators_by_var
                    ]
            result[operators_by_var[task_var]["task_id"]] = dep
        return result

    def extract(self):
        """
        Build high-level information about the workflow's DAGs, their
        operators and the dependencies between them.

        Returns ``{dag_id: {'dag': ..., 'tasks': {task_id: ...},
        'dependencies': {task_id: ...}}}``, or None if no code was parsed.
        Each task also receives a ``dag_id`` entry.
        """
        if not self.tree:
            return None

        dags = self.__extract_dags(self.tree)
        operators = self.__extract_all_operators(self.tree)
        dependencies = self.__extract_dependencies(self.tree)

        dag_dict = {}
        for dag_id, dag in dags.items():
            dag_var = dag["local_variable"]

            # keep only operators attached to this DAG via dag=<variable>;
            # operators created without a dag= keyword have no "dag" entry
            my_operators = {}
            my_operators_var = {}
            for task_id, operator in operators.items():
                if "dag" not in operator or operator["dag"] != dag_var:
                    continue
                operator["dag_id"] = dag_id
                my_operators[task_id] = operator

                # index by variable name to resolve dependencies quickly
                local_var = operator.get("local_variable")
                if isinstance(local_var, str):
                    my_operators_var[local_var] = operator

            dag_dict[dag_id] = {
                'dag': dag,
                'tasks': my_operators,
                'dependencies': self.__dependencies_for(dependencies, my_operators_var)
            }

        return dag_dict
549
550
"""
Command-line tool
"""


def print_graffiti():
    """Write the "CORD Workflow Essence Extractor" ASCII-art banner to stdout."""
    banner = pyfiglet.figlet_format(
        "CORD\nWorkflow\nEssence\nExtractor",
        font="graffiti",
    )
    print(banner)
559
560
def get_arg_parser():
    """Create the command-line parser for the essence extractor.

    Options: ``--config`` (configuration file), ``-o/--output`` (output
    path), ``-c/--stdout`` (print to the console) and the positional
    ``input_file`` (Airflow DAG source to analyse).
    """
    arg_parser = argparse.ArgumentParser(
        prog='essence_extractor',
        description='CORD Workflow Essence Extractor.',
    )
    option_specs = [
        (('--config',), {'help': 'locate a configuration file'}),
        (('-o', '--output'), {'help': 'output file path'}),
        (('-c', '--stdout'), {'action': 'store_true', 'help': 'output to console (STDOUT)'}),
        (('input_file',), {'help': 'input airflow dag source file'}),
    ]
    for flags, settings in option_specs:
        arg_parser.add_argument(*flags, **settings)
    return arg_parser
568
569
def read_config(path):
    """Load a JSON configuration file.

    :param path: path of the JSON configuration file
    :returns: the decoded JSON document, or ``{}`` if *path* does not exist
    """
    try:
        # Open directly instead of checking os.path.exists() first (avoids a
        # check-then-open race) and decode as UTF-8, the JSON interchange
        # encoding, rather than the platform locale encoding.
        with open(path, encoding='utf-8') as json_config_file:
            return json.load(json_config_file)
    except FileNotFoundError:
        return {}
576
577
def pretty_format_json(j):
    """Serialize *j* as human-readable JSON (sorted keys, 4-space indent, no trailing spaces)."""
    return json.dumps(
        j,
        indent=4,
        sort_keys=True,
        separators=(',', ': '),
    )
581
582
# for command-line execution
def main(args):
    """Command-line entry point.

    Loads optional settings from the JSON configuration file, extracts the
    essence of ``args.input_file`` and writes it as pretty-printed JSON to
    stdout (``--stdout`` or ``--output -``) or to ``args.output`` (default
    ``./essence.json``).

    :param args: namespace produced by ``get_arg_parser().parse_args()``
    :raises FileNotFoundError: if ``args.input_file`` does not exist
    """
    print_graffiti()

    # an explicit --config wins over the default location
    config_file_path = args.config or DEFAULT_CONFIG_FILE_PATH

    # read_config() yields {} for a missing file; only settings the config
    # actually provides override the defaults in progargs
    config = read_config(config_file_path)
    if isinstance(config, dict):
        for k in progargs:
            if k in config:
                progargs[k] = config[k]

    log = create_logger(progargs["logging"])

    code_filepath = args.input_file
    if not os.path.exists(code_filepath):
        raise FileNotFoundError('cannot find an input file - %s' % code_filepath)

    extractor = EssenceExtractor(logger=log)
    extractor.parse_codefile(code_filepath)
    essence = extractor.extract()

    output_filepath = args.output or './essence.json'

    json_string = pretty_format_json(essence)
    if args.stdout or output_filepath == '-':
        print(json_string)
    else:
        with open(output_filepath, 'w') as f:
            f.write(json_string)
621
622
if __name__ == "__main__":
    # parse sys.argv and hand the resulting namespace to main()
    main(get_arg_parser().parse_args())