blob: 6335dc8c099815fffab4593c85e0e49fd1e471f2 [file] [log] [blame]
#!/usr/bin/env python3
# Copyright 2019-present Open Networking Foundation
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# See the License for the specific language governing permissions and
# limitations under the License.
Workflow Essence Extractor
This module extracts essence of airflow workflows
Following information will be extracted from workflow code
- DAG info
- Operator info
- CORD-related operators
- Airflow operators
- Dependency info
import ast
import json
import os.path
import argparse
import pyfiglet
from multistructlog import create_logger
progargs = {
'logging': None
DEFAULT_CONFIG_FILE_PATH = '/etc/cord_workflow_airflow_extensions/config.json'
class NoopLogger(object):
def __init__(self):
def info(self, *args):
def debug(self, *args):
def error(self, *args):
def warn(self, *args):
class EssenceExtractor(object):
def __init__(self, logger=None):
if logger:
self.logger = logger
self.logger = NoopLogger()
self.tree = None
def set_logger(self, logger):
self.logger = logger
def get_logger(self):
return self.logger
def get_ast(self):
return self.tree
def parse_code(self, code):
tree = ast.parse(code)
self.tree = self.__jsonify_ast(tree)
def parse_codefile(self, filepath):
code = None
with open(filepath, "r") as f:
code =
tree = ast.parse(code, filepath)
self.tree = self.__jsonify_ast(tree)
def __classname(self, cls):
return cls.__class__.__name__
def __jsonify_ast(self, node, level=0):
fields = {}
for k in node._fields:
fields[k] = '...'
v = getattr(node, k)
if isinstance(v, ast.AST):
if v._fields:
fields[k] = self.__jsonify_ast(v)
fields[k] = self.__classname(v)
elif isinstance(v, list):
fields[k] = []
for e in v:
elif isinstance(v, str):
fields[k] = v
elif isinstance(v, int) or isinstance(v, float):
fields[k] = v
elif v is None:
fields[k] = None
fields[k] = 'unrecognized'
ret = {
self.__classname(node): fields
return ret
def __recursively_find_elements(self, tree, elem):
traverse AST and find elements
for e in tree:
obj = None
if isinstance(tree, list):
obj = e
elif isinstance(tree, dict):
obj = tree[e]
if e == elem:
yield obj
if obj and (isinstance(obj, list) or isinstance(obj, dict)):
for y in self.__recursively_find_elements(obj, elem):
yield y
def __extract_func_calls(self, tree, func_name):
extract function calls with assignment
assigns = self.__recursively_find_elements(tree, "Assign")
if assigns:
for assign in assigns:
found = False
calls = self.__recursively_find_elements(assign, "Call")
if calls:
for call in calls:
funcs = self.__recursively_find_elements(call, "func")
if funcs:
for func in funcs:
if "Name" in func:
name = func["Name"]
if "ctx" in name and "id" in name:
# found function
if name["id"] == func_name:
found = True
if found:
yield assign
def __extract_func_calls_airflow_operators(self, tree):
extract only airflow operators which end with "*Operator" or "*Sensor"
assigns = self.__recursively_find_elements(tree, "Assign")
if assigns:
for assign in assigns:
found = False
calls = self.__recursively_find_elements(assign, "Call")
if calls:
for call in calls:
funcs = self.__recursively_find_elements(call, "func")
if funcs:
for func in funcs:
if "Name" in func:
name = func["Name"]
if "ctx" in name and "id" in name:
# found function
if name["id"].endswith(("Operator", "Sensor")):
found = True
if found:
yield assign
def __extract_bin_op(self, tree, op_name):
extract binary operation such as >>, <<
ops = self.__recursively_find_elements(tree, "BinOp")
if ops:
for op in ops:
if op["op"] == op_name:
yield op
def __take_string_or_tree(self, tree):
if "Str" in tree:
return tree["Str"]["s"]
return tree
def __take_num_or_tree(self, tree):
if "Num" in tree:
return tree["Num"]["n"]
return tree
def __take_id_or_tree(self, tree):
if "Name" in tree:
return tree["Name"]["id"]
return tree
def __take_name_constant_or_tree(self, tree):
if "NameConstant" in tree:
return tree["NameConstant"]["value"]
return tree
def __take_value_or_tree(self, tree):
if "Str" in tree:
return tree["Str"]["s"]
elif "Num" in tree:
return tree["Num"]["n"]
elif "Name" in tree:
val = tree["Name"]["id"]
if val in ["True", "False"]:
return bool(val)
elif val == "None":
return None
return val
elif "NameConstant" in tree:
val = tree["NameConstant"]["value"]
if val in ["True", "False"]:
return bool(val)
elif val == "None":
return None
return val
elif "List" in tree:
vals = []
if "elts" in tree["List"]:
elts = tree["List"]["elts"]
for elt in elts:
val = self.__take_value_or_tree(elt)
return vals
return tree
def __make_dag(self, tree):
loc_val = None
dag_id = None
if "targets" in tree:
targets = tree["targets"]
loc_val = self.__take_id_or_tree(targets[0])
if "value" in tree:
value = tree["value"]
if "Call" in value:
call = value["Call"]
if "keywords" in call:
keywords = call["keywords"]
for keyword in keywords:
if "keyword" in keyword:
k = keyword["keyword"]
if k["arg"] == "dag_id":
dag_id = self.__take_string_or_tree(k["value"])
return {
'local_variable': loc_val,
'dag_id': dag_id
def __make_airflow_operator(self, tree):
airflow_operator = {}
if "targets" in tree:
targets = tree["targets"]
loc_val = self.__take_id_or_tree(targets[0])
airflow_operator["local_variable"] = loc_val
if "value" in tree:
value = tree["value"]
if "Call" in value:
call = value["Call"]
if "func" in call:
class_name = self.__take_id_or_tree(call["func"])
airflow_operator["class"] = class_name
if "keywords" in call:
keywords = call["keywords"]
for keyword in keywords:
if "keyword" in keyword:
k = keyword["keyword"]
arg = k["arg"]
airflow_operator[arg] = self.__take_value_or_tree(k["value"])
return airflow_operator
def __make_dependencies_bin_op(self, tree, dependencies):
children = []
parents = []
child = None
parent = None
if tree["op"] == "RShift":
child = self.__take_id_or_tree(tree["right"])
parent = self.__take_id_or_tree(tree["left"])
elif tree["op"] == "LShift":
child = self.__take_id_or_tree(tree["left"])
parent = self.__take_id_or_tree(tree["right"])
if child:
if isinstance(child, dict):
if "List" in child:
for c in child["List"]["elts"]:
elif "BinOp" in child:
deps = self.__make_dependencies_bin_op(child["BinOp"], dependencies)
for dep in deps:
if parent:
if isinstance(parent, dict):
if "List" in parent:
for p in parent["List"]["elts"]:
elif "BinOp" in parent:
deps = self.__make_dependencies_bin_op(parent["BinOp"], dependencies)
for dep in deps:
if len(parents) > 0 and len(children) > 0:
# make all-vs-all combinations
for p in parents:
for c in children:
dep = {
'parent': p,
'child': c
if tree["op"] == "RShift":
return children
elif tree["op"] == "LShift":
return parents
return children
def __extract_dep_operations(self, tree):
# extract dependency definition using ">>"
ops = self.__extract_bin_op(tree, "RShift")
if ops:
for op in ops:
deps = []
self.__make_dependencies_bin_op(op, deps)
for dep in deps:
yield dep
# extract dependency definition using "<<"
ops = self.__extract_bin_op(tree, "LShift")
if ops:
for op in ops:
deps = []
self.__make_dependencies_bin_op(op, deps)
for dep in deps:
yield dep
def __extract_dags(self, tree):
dags = {}
calls = self.__extract_func_calls(tree, "DAG")
if calls:
for call in calls:
dag = self.__make_dag(call)
dagid = dag["dag_id"]
dags[dagid] = dag
return dags
def __extract_CORD_event_sensors(self, tree):
operators = {}
calls = self.__extract_func_calls(tree, "CORDEventSensor")
if calls:
for call in calls:
operator = self.__make_airflow_operator(call)
operatorid = operator["task_id"]
operators[operatorid] = operator
return operators
def __extract_CORD_model_sensors(self, tree):
operators = {}
calls = self.__extract_func_calls(tree, "CORDModelSensor")
if calls:
for call in calls:
operator = self.__make_airflow_operator(call)
operatorid = operator["task_id"]
operators[operatorid] = operator
return operators
def __extract_CORD_model_operators(self, tree):
operators = {}
calls = self.__extract_func_calls(tree, "CORDModelOperator")
if calls:
for call in calls:
operator = self.__make_airflow_operator(call)
operatorid = operator["task_id"]
operators[operatorid] = operator
return operators
def __extract_airflow_operators(self, tree):
operators = {}
calls = self.__extract_func_calls_airflow_operators(tree)
if calls:
for call in calls:
operator = self.__make_airflow_operator(call)
operatorid = operator["task_id"]
operators[operatorid] = operator
return operators
def __extract_all_operators(self, tree):
operators = {}
event_sensors = self.__extract_CORD_event_sensors(tree)
if event_sensors:
for task_id in event_sensors:
operators[task_id] = event_sensors[task_id]
model_sensors = self.__extract_CORD_model_sensors(tree)
if model_sensors:
for task_id in model_sensors:
operators[task_id] = model_sensors[task_id]
model_operators = self.__extract_CORD_model_operators(tree)
if model_operators:
for task_id in model_operators:
operators[task_id] = model_operators[task_id]
airflow_operators = self.__extract_airflow_operators(tree)
if airflow_operators:
for task_id in airflow_operators:
# add operators that are not already handled above
if task_id not in operators:
operators[task_id] = airflow_operators[task_id]
return operators
def __extract_dependencies(self, tree):
Build N-N dependencies from fragmented parent-child relations
A node can have multiple parents and multiple children
dependencies = {}
ops = self.__extract_dep_operations(tree)
if ops:
for op in ops:
p = op["parent"]
c = op["child"]
if p in dependencies:
# append to an existing list
node_p = dependencies[p]
if "children" in node_p:
# prevent duplicates
if c not in node_p["children"]:
node_p["children"] = [c]
# create a new
node_p = {
'children': [c]
dependencies[p] = node_p
if c in dependencies:
# append to an existing list
node_c = dependencies[c]
if "parents" in node_c:
# prevent duplicates
if p not in node_c["parents"]:
node_c["parents"] = [p]
# create a new
node_c = {
'parents': [p]
dependencies[c] = node_c
return dependencies
def extract(self):
Build highlevel information of workflows dag, operators and dependencies refers to each other
if self.tree:
dags = self.__extract_dags(self.tree)
operators = self.__extract_all_operators(self.tree)
dependencies = self.__extract_dependencies(self.tree)
dag_dict = {}
for dag_id in dags:
dag = dags[dag_id]
dag_var = dag["local_variable"]
# filter operators that do not belong to the dag
my_operators = {}
my_operators_var = {}
for task_id in operators:
operator = operators[task_id]
if operator["dag"] == dag_var:
# set dag_id
operator["dag_id"] = dag_id
my_operators[task_id] = operator
# this is to help fast search while working with dependencies
operator_local_var = operator["local_variable"]
my_operators_var[operator_local_var] = operator
# filter dependencies that do not belong to the dag
my_dependencies = {}
for task_var in dependencies:
if task_var in my_operators_var:
dependency = dependencies[task_var]
task_id = my_operators_var[task_var]["task_id"]
# convert dependency task_var to task_id
dep = {}
if "children" in dependency:
dep["children"] = []
for child in dependency["children"]:
if child in my_operators_var:
child_task_id = my_operators_var[child]["task_id"]
if "parents" in dependency:
dep["parents"] = []
for parent in dependency["parents"]:
if parent in my_operators_var:
parent_task_id = my_operators_var[parent]["task_id"]
my_dependencies[task_id] = dep
d = {
'dag': dag,
'tasks': my_operators,
'dependencies': my_dependencies
dag_dict[dag_id] = d
return dag_dict
return None
Command-line tool
def print_graffiti():
result = pyfiglet.figlet_format("CORD\nWorkflow\nEssence\nExtractor", font="graffiti")
def get_arg_parser():
parser = argparse.ArgumentParser(description='CORD Workflow Essence Extractor.', prog='essence_extractor')
parser.add_argument('--config', help='locate a configuration file')
parser.add_argument('-o', '--output', help='output file path')
parser.add_argument('-c', '--stdout', action='store_true', help='output to console (STDOUT)')
parser.add_argument('input_file', help='input airflow dag source file')
return parser
def read_config(path):
if os.path.exists(path):
with open(path) as json_config_file:
data = json.load(json_config_file)
return data
return {}
def pretty_format_json(j):
dumps = json.dumps(j, sort_keys=True, indent=4, separators=(',', ': '))
return dumps
# for command-line execution
def main(args):
# check if config path is set
config_file_path = DEFAULT_CONFIG_FILE_PATH
if args.config:
config_file_path = args.config
if os.path.exists(config_file_path):
# read config
config = read_config(config_file_path)
if config:
global progargs
for k in progargs:
# overwrite
if k in config:
progargs[k] = config[k]
log = create_logger(progargs["logging"])
code_filepath = args.input_file
if not os.path.exists(code_filepath):
raise IOError('cannot find an input file - %s' % code_filepath)
output_filepath = './essence.json'
if args.output:
output_filepath = args.output
print_console = False
if args.stdout or output_filepath == '-':
print_console = True
extractor = EssenceExtractor(logger=log)
essence = extractor.extract()
json_string = pretty_format_json(essence)
if print_console:
with open(output_filepath, 'w') as f:
if __name__ == "__main__":
parser = get_arg_parser()
args = parser.parse_args()