Read airflow connection parameters properly
Read configuration json file properly even if it doesn't define all parameters
Add emit command to workflow_ctl
Add example commands for emitting messages and registering essences
Update docker-compose configuration files to allow communication between containers
Rework kickstarter as api_client does not work properly

Change-Id: I59d00bfe17027a7ab367e6acde6a9eaaed3b6937
diff --git a/src/tools/__init__.py b/src/tools/__init__.py
new file mode 100644
index 0000000..19d1424
--- /dev/null
+++ b/src/tools/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/src/tools/config.json b/src/tools/config.json
new file mode 100644
index 0000000..601d686
--- /dev/null
+++ b/src/tools/config.json
@@ -0,0 +1,4 @@
+{
+    "controller_url": "http://controller:3030",
+    "airflow_bin": "/usr/local/bin"
+}
diff --git a/src/tools/essence_extractor.py b/src/tools/essence_extractor.py
new file mode 100644
index 0000000..6335dc8
--- /dev/null
+++ b/src/tools/essence_extractor.py
@@ -0,0 +1,646 @@
+#!/usr/bin/env python3
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow Essence Extractor
+
+This module extracts the essence of Airflow workflows.
+The following information is extracted from workflow code:
+- DAG info
+- Operator info
+    - CORD-related operators
+    - Airflow operators
+- Dependency info
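+
+Example essence output (illustrative; extract() below defines the exact keys):
+
+{
+    "some_dag_id": {
+        "dag": {"local_variable": "dag", "dag_id": "some_dag_id"},
+        "tasks": {"some_task": {"task_id": "some_task", "class": "BashOperator", ...}},
+        "dependencies": {"some_task": {"children": [...], "parents": [...]}}
+    }
+}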
+"""
+
+import ast
+import json
+import os.path
+import argparse
+import pyfiglet
+
+from multistructlog import create_logger
+
+
+progargs = {
+    'logging': None
+}
+
+DEFAULT_CONFIG_FILE_PATH = '/etc/cord_workflow_airflow_extensions/config.json'
+
+
+class NoopLogger(object):
+    def __init__(self):
+        pass
+
+    def info(self, *args):
+        pass
+
+    def debug(self, *args):
+        pass
+
+    def error(self, *args):
+        pass
+
+    def warn(self, *args):
+        pass
+
+
+class EssenceExtractor(object):
+    def __init__(self, logger=None):
+        if logger:
+            self.logger = logger
+        else:
+            self.logger = NoopLogger()
+
+        self.tree = None
+
+    def set_logger(self, logger):
+        self.logger = logger
+
+    def get_logger(self):
+        return self.logger
+
+    def get_ast(self):
+        return self.tree
+
+    def parse_code(self, code):
+        tree = ast.parse(code)
+        self.tree = self.__jsonify_ast(tree)
+
+    def parse_codefile(self, filepath):
+        code = None
+        with open(filepath, "r") as f:
+            code = f.read()
+            tree = ast.parse(code, filepath)
+            self.tree = self.__jsonify_ast(tree)
+
+    def __classname(self, cls):
+        return cls.__class__.__name__
+
+    def __jsonify_ast(self, node, level=0):
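+        # Convert an AST into nested dicts keyed by node class name.
+        # For example (illustrative; exact fields vary by Python version),
+        # the AST of "x = 1" roughly becomes:
+        #   {"Module": {"body": [{"Assign": {
+        #       "targets": [{"Name": {"id": "x", "ctx": "Store"}}],
+        #       "value": {"Num": {"n": 1}}}}]}}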
+        fields = {}
+        for k in node._fields:
+            fields[k] = '...'
+            v = getattr(node, k)
+            if isinstance(v, ast.AST):
+                if v._fields:
+                    fields[k] = self.__jsonify_ast(v)
+                else:
+                    fields[k] = self.__classname(v)
+
+            elif isinstance(v, list):
+                fields[k] = []
+                for e in v:
+                    fields[k].append(self.__jsonify_ast(e))
+
+            elif isinstance(v, str):
+                fields[k] = v
+
+            elif isinstance(v, int) or isinstance(v, float):
+                fields[k] = v
+
+            elif v is None:
+                fields[k] = None
+
+            else:
+                fields[k] = 'unrecognized'
+
+        ret = {
+            self.__classname(node): fields
+        }
+        return ret
+
+    def __recursively_find_elements(self, tree, elem):
+        """
+        traverse AST and find elements
+        """
+        for e in tree:
+            obj = None
+            if isinstance(tree, list):
+                obj = e
+            elif isinstance(tree, dict):
+                obj = tree[e]
+
+            if e == elem:
+                yield obj
+
+            if obj and (isinstance(obj, list) or isinstance(obj, dict)):
+                for y in self.__recursively_find_elements(obj, elem):
+                    yield y
+
+    def __extract_func_calls(self, tree, func_name):
+        """
+        extract function calls with assignment
+        """
+        assigns = self.__recursively_find_elements(tree, "Assign")
+        if assigns:
+            for assign in assigns:
+                found = False
+
+                calls = self.__recursively_find_elements(assign, "Call")
+                if calls:
+                    for call in calls:
+                        funcs = self.__recursively_find_elements(call, "func")
+                        if funcs:
+                            for func in funcs:
+                                if "Name" in func:
+                                    name = func["Name"]
+                                    if "ctx" in name and "id" in name:
+                                        # found function
+                                        if name["id"] == func_name:
+                                            found = True
+
+                if found:
+                    yield assign
+
+    def __extract_func_calls_airflow_operators(self, tree):
+        """
+        extract only Airflow operators whose class names end with "Operator" or "Sensor"
+        """
+        assigns = self.__recursively_find_elements(tree, "Assign")
+        if assigns:
+            for assign in assigns:
+                found = False
+
+                calls = self.__recursively_find_elements(assign, "Call")
+                if calls:
+                    for call in calls:
+                        funcs = self.__recursively_find_elements(call, "func")
+                        if funcs:
+                            for func in funcs:
+                                if "Name" in func:
+                                    name = func["Name"]
+                                    if "ctx" in name and "id" in name:
+                                        # found function
+                                        if name["id"].endswith(("Operator", "Sensor")):
+                                            found = True
+
+                if found:
+                    yield assign
+
+    def __extract_bin_op(self, tree, op_name):
+        """
+        extract binary operations such as ">>" and "<<"
+        """
+        ops = self.__recursively_find_elements(tree, "BinOp")
+        if ops:
+            for op in ops:
+                if op["op"] == op_name:
+                    yield op
+
+    def __take_string_or_tree(self, tree):
+        if "Str" in tree:
+            return tree["Str"]["s"]
+        return tree
+
+    def __take_num_or_tree(self, tree):
+        if "Num" in tree:
+            return tree["Num"]["n"]
+        return tree
+
+    def __take_id_or_tree(self, tree):
+        if "Name" in tree:
+            return tree["Name"]["id"]
+        return tree
+
+    def __take_name_constant_or_tree(self, tree):
+        if "NameConstant" in tree:
+            return tree["NameConstant"]["value"]
+        return tree
+
+    def __take_value_or_tree(self, tree):
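+        # Unwrap a literal AST node into its Python value, e.g.
+        # {"Str": {"s": "foo"}} -> "foo", {"Num": {"n": 3}} -> 3,
+        # {"List": ...} -> a Python list; unrecognized nodes are returned as-is.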
+        if "Str" in tree:
+            return tree["Str"]["s"]
+        elif "Num" in tree:
+            return tree["Num"]["n"]
+        elif "Name" in tree:
+            val = tree["Name"]["id"]
+            if val in ["True", "False"]:
+                # bool("False") would be True; compare against the literal instead
+                return val == "True"
+            elif val == "None":
+                return None
+            return val
+        elif "NameConstant" in tree:
+            val = tree["NameConstant"]["value"]
+            if val in ["True", "False"]:
+                # bool("False") would be True; compare against the literal instead
+                return val == "True"
+            elif val == "None":
+                return None
+            return val
+        elif "List" in tree:
+            vals = []
+            if "elts" in tree["List"]:
+                elts = tree["List"]["elts"]
+                for elt in elts:
+                    val = self.__take_value_or_tree(elt)
+                    vals.append(val)
+            return vals
+        return tree
+
+    def __make_dag(self, tree):
+        loc_val = None
+        dag_id = None
+
+        if "targets" in tree:
+            targets = tree["targets"]
+            loc_val = self.__take_id_or_tree(targets[0])
+
+        if "value" in tree:
+            value = tree["value"]
+            if "Call" in value:
+                call = value["Call"]
+                if "keywords" in call:
+                    keywords = call["keywords"]
+                    for keyword in keywords:
+                        if "keyword" in keyword:
+                            k = keyword["keyword"]
+                            if k["arg"] == "dag_id":
+                                dag_id = self.__take_string_or_tree(k["value"])
+
+        return {
+            'local_variable': loc_val,
+            'dag_id': dag_id
+        }
+
+    def __make_airflow_operator(self, tree):
+        airflow_operator = {}
+
+        if "targets" in tree:
+            targets = tree["targets"]
+            loc_val = self.__take_id_or_tree(targets[0])
+            airflow_operator["local_variable"] = loc_val
+
+        if "value" in tree:
+            value = tree["value"]
+            if "Call" in value:
+                call = value["Call"]
+                if "func" in call:
+                    class_name = self.__take_id_or_tree(call["func"])
+                    airflow_operator["class"] = class_name
+
+                if "keywords" in call:
+                    keywords = call["keywords"]
+                    for keyword in keywords:
+                        if "keyword" in keyword:
+                            k = keyword["keyword"]
+                            arg = k["arg"]
+                            airflow_operator[arg] = self.__take_value_or_tree(k["value"])
+
+        return airflow_operator
+
+    def __make_dependencies_bin_op(self, tree, dependencies):
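+        # In Airflow, "a >> b" (RShift) makes a the upstream parent of b and
+        # "a << b" (LShift) is the reverse. A chain "a >> b >> c" parses as
+        # "(a >> b) >> c"; recursing into the left BinOp returns its children
+        # ([b]), which then become the parents of c.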
+        children = []
+        parents = []
+        child = None
+        parent = None
+
+        if tree["op"] == "RShift":
+            child = self.__take_id_or_tree(tree["right"])
+            parent = self.__take_id_or_tree(tree["left"])
+        elif tree["op"] == "LShift":
+            child = self.__take_id_or_tree(tree["left"])
+            parent = self.__take_id_or_tree(tree["right"])
+
+        if child:
+            if isinstance(child, dict):
+                if "List" in child:
+                    for c in child["List"]["elts"]:
+                        children.append(self.__take_id_or_tree(c))
+                elif "BinOp" in child:
+                    deps = self.__make_dependencies_bin_op(child["BinOp"], dependencies)
+                    for dep in deps:
+                        children.append(dep)
+                else:
+                    children.append(self.__take_id_or_tree(child))
+            else:
+                children.append(child)
+
+        if parent:
+            if isinstance(parent, dict):
+                if "List" in parent:
+                    for p in parent["List"]["elts"]:
+                        parents.append(self.__take_id_or_tree(p))
+                elif "BinOp" in parent:
+                    deps = self.__make_dependencies_bin_op(parent["BinOp"], dependencies)
+                    for dep in deps:
+                        parents.append(dep)
+                else:
+                    parents.append(self.__take_id_or_tree(parent))
+            else:
+                parents.append(parent)
+
+        if len(parents) > 0 and len(children) > 0:
+            # make all-vs-all combinations
+            for p in parents:
+                for c in children:
+                    dep = {
+                        'parent': p,
+                        'child': c
+                    }
+                    dependencies.append(dep)
+
+        if tree["op"] == "RShift":
+            return children
+        elif tree["op"] == "LShift":
+            return parents
+        return children
+
+    def __extract_dep_operations(self, tree):
+        # extract dependency definition using ">>"
+        ops = self.__extract_bin_op(tree, "RShift")
+        if ops:
+            for op in ops:
+                deps = []
+                self.__make_dependencies_bin_op(op, deps)
+                for dep in deps:
+                    yield dep
+
+        # extract dependency definition using "<<"
+        ops = self.__extract_bin_op(tree, "LShift")
+        if ops:
+            for op in ops:
+                deps = []
+                self.__make_dependencies_bin_op(op, deps)
+                for dep in deps:
+                    yield dep
+
+    def __extract_dags(self, tree):
+        dags = {}
+        calls = self.__extract_func_calls(tree, "DAG")
+        if calls:
+            for call in calls:
+                dag = self.__make_dag(call)
+                dagid = dag["dag_id"]
+                dags[dagid] = dag
+        return dags
+
+    def __extract_CORD_event_sensors(self, tree):
+        operators = {}
+        calls = self.__extract_func_calls(tree, "CORDEventSensor")
+        if calls:
+            for call in calls:
+                operator = self.__make_airflow_operator(call)
+                operatorid = operator["task_id"]
+                operators[operatorid] = operator
+        return operators
+
+    def __extract_CORD_model_sensors(self, tree):
+        operators = {}
+        calls = self.__extract_func_calls(tree, "CORDModelSensor")
+        if calls:
+            for call in calls:
+                operator = self.__make_airflow_operator(call)
+                operatorid = operator["task_id"]
+                operators[operatorid] = operator
+        return operators
+
+    def __extract_CORD_model_operators(self, tree):
+        operators = {}
+        calls = self.__extract_func_calls(tree, "CORDModelOperator")
+        if calls:
+            for call in calls:
+                operator = self.__make_airflow_operator(call)
+                operatorid = operator["task_id"]
+                operators[operatorid] = operator
+        return operators
+
+    def __extract_airflow_operators(self, tree):
+        operators = {}
+        calls = self.__extract_func_calls_airflow_operators(tree)
+        if calls:
+            for call in calls:
+                operator = self.__make_airflow_operator(call)
+                operatorid = operator["task_id"]
+                operators[operatorid] = operator
+        return operators
+
+    def __extract_all_operators(self, tree):
+        operators = {}
+        event_sensors = self.__extract_CORD_event_sensors(tree)
+        if event_sensors:
+            for task_id in event_sensors:
+                operators[task_id] = event_sensors[task_id]
+
+        model_sensors = self.__extract_CORD_model_sensors(tree)
+        if model_sensors:
+            for task_id in model_sensors:
+                operators[task_id] = model_sensors[task_id]
+
+        model_operators = self.__extract_CORD_model_operators(tree)
+        if model_operators:
+            for task_id in model_operators:
+                operators[task_id] = model_operators[task_id]
+
+        airflow_operators = self.__extract_airflow_operators(tree)
+        if airflow_operators:
+            for task_id in airflow_operators:
+                # add operators that are not already handled above
+                if task_id not in operators:
+                    operators[task_id] = airflow_operators[task_id]
+
+        return operators
+
+    def __extract_dependencies(self, tree):
+        """
+        Build N-N dependencies from fragmented parent-child relations
+        A node can have multiple parents and multiple children
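+
+        Illustrative example:
+            [{'parent': 'a', 'child': 'b'}, {'parent': 'a', 'child': 'c'}]
+        is merged into
+            {'a': {'children': ['b', 'c']},
+             'b': {'parents': ['a']},
+             'c': {'parents': ['a']}}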
+        """
+        dependencies = {}
+        ops = self.__extract_dep_operations(tree)
+        if ops:
+            for op in ops:
+                p = op["parent"]
+                c = op["child"]
+
+                if p in dependencies:
+                    # append to an existing list
+                    node_p = dependencies[p]
+                    if "children" in node_p:
+                        # prevent duplicates
+                        if c not in node_p["children"]:
+                            node_p["children"].append(c)
+                    else:
+                        node_p["children"] = [c]
+                else:
+                    # create a new entry
+                    node_p = {
+                        'children': [c]
+                    }
+                    dependencies[p] = node_p
+
+                if c in dependencies:
+                    # append to an existing list
+                    node_c = dependencies[c]
+                    if "parents" in node_c:
+                        # prevent duplicates
+                        if p not in node_c["parents"]:
+                            node_c["parents"].append(p)
+                    else:
+                        node_c["parents"] = [p]
+                else:
+                    # create a new entry
+                    node_c = {
+                        'parents': [p]
+                    }
+                    dependencies[c] = node_c
+
+        return dependencies
+
+    def extract(self):
+        """
+        Build high-level information of workflows: DAGs, operators, and dependencies, cross-referenced with each other
+        """
+        if self.tree:
+            dags = self.__extract_dags(self.tree)
+            operators = self.__extract_all_operators(self.tree)
+            dependencies = self.__extract_dependencies(self.tree)
+
+            dag_dict = {}
+            for dag_id in dags:
+                dag = dags[dag_id]
+                dag_var = dag["local_variable"]
+
+                # filter operators that do not belong to the dag
+                my_operators = {}
+                my_operators_var = {}
+                for task_id in operators:
+                    operator = operators[task_id]
+                    if operator["dag"] == dag_var:
+                        # set dag_id
+                        operator["dag_id"] = dag_id
+                        my_operators[task_id] = operator
+
+                        # this is to help fast search while working with dependencies
+                        operator_local_var = operator["local_variable"]
+                        my_operators_var[operator_local_var] = operator
+
+                # filter dependencies that do not belong to the dag
+                my_dependencies = {}
+                for task_var in dependencies:
+                    if task_var in my_operators_var:
+                        dependency = dependencies[task_var]
+                        task_id = my_operators_var[task_var]["task_id"]
+
+                        # convert dependency task_var to task_id
+                        dep = {}
+                        if "children" in dependency:
+                            dep["children"] = []
+                            for child in dependency["children"]:
+                                if child in my_operators_var:
+                                    child_task_id = my_operators_var[child]["task_id"]
+                                    dep["children"].append(child_task_id)
+
+                        if "parents" in dependency:
+                            dep["parents"] = []
+                            for parent in dependency["parents"]:
+                                if parent in my_operators_var:
+                                    parent_task_id = my_operators_var[parent]["task_id"]
+                                    dep["parents"].append(parent_task_id)
+
+                        my_dependencies[task_id] = dep
+
+                d = {
+                    'dag': dag,
+                    'tasks': my_operators,
+                    'dependencies': my_dependencies
+                }
+                dag_dict[dag_id] = d
+
+            return dag_dict
+        else:
+            return None
+
+
+"""
+Command-line tool
+"""
+
+
+def print_graffiti():
+    result = pyfiglet.figlet_format("CORD\nWorkflow\nEssence\nExtractor", font="graffiti")
+    print(result)
+
+
+def get_arg_parser():
+    parser = argparse.ArgumentParser(description='CORD Workflow Essence Extractor.', prog='essence_extractor')
+    parser.add_argument('--config', help='locate a configuration file')
+    parser.add_argument('-o', '--output', help='output file path')
+    parser.add_argument('-c', '--stdout', action='store_true', help='output to console (STDOUT)')
+    parser.add_argument('input_file', help='input airflow dag source file')
+    return parser
+
+
+def read_config(path):
+    if os.path.exists(path):
+        with open(path) as json_config_file:
+            data = json.load(json_config_file)
+            return data
+    return {}
+
+
+def pretty_format_json(j):
+    dumps = json.dumps(j, sort_keys=True, indent=4, separators=(',', ': '))
+    return dumps
+
+
+# for command-line execution
+def main(args):
+    # check if config path is set
+    config_file_path = DEFAULT_CONFIG_FILE_PATH
+    if args.config:
+        config_file_path = args.config
+
+    if os.path.exists(config_file_path):
+        # read config
+        config = read_config(config_file_path)
+        if config:
+            global progargs
+            for k in progargs:
+                # overwrite
+                if k in config:
+                    progargs[k] = config[k]
+
+    log = create_logger(progargs["logging"])
+
+    code_filepath = args.input_file
+    if not os.path.exists(code_filepath):
+        raise IOError('cannot find an input file - %s' % code_filepath)
+
+    output_filepath = './essence.json'
+    if args.output:
+        output_filepath = args.output
+
+    print_console = False
+    if args.stdout or output_filepath == '-':
+        print_console = True
+
+    extractor = EssenceExtractor(logger=log)
+    extractor.parse_codefile(code_filepath)
+    essence = extractor.extract()
+    json_string = pretty_format_json(essence)
+    if print_console:
+        print(json_string)
+    else:
+        print_graffiti()
+        with open(output_filepath, 'w') as f:
+            f.write(json_string)
+
+
+if __name__ == "__main__":
+    parser = get_arg_parser()
+    args = parser.parse_args()
+    main(args)
diff --git a/src/tools/kickstarter.py b/src/tools/kickstarter.py
new file mode 100644
index 0000000..7c5a9de
--- /dev/null
+++ b/src/tools/kickstarter.py
@@ -0,0 +1,377 @@
+#!/usr/bin/env python3
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow Kickstarter
+
+This module kickstarts Airflow workflows for requests from Workflow Controller
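+
+Example (assuming the controller is reachable at http://controller:3030,
+as in the sample config.json):
+    kickstarter.py --controller http://controller:3030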
+"""
+
+import json
+import os
+import os.path
+import argparse
+import pyfiglet
+import traceback
+import socket
+import time
+import subprocess
+
+from multistructlog import create_logger
+from cord_workflow_controller_client.manager import Manager
+
+# We can't use Airflow's experimental APIs for managing workflows/workflow runs
+# - The REST API does not provide sufficient features in this version
+# - api_client does not work when the caller is not in the main thread
+
+# from importlib import import_module
+# from airflow import configuration as AirflowConf
+# from airflow import api
+# from airflow.models import DagRun
+
+try:
+    from urllib.parse import urlparse
+except ImportError:
+    from urlparse import urlparse
+
+log = create_logger()
+manager = None
+# airflow_client = None
+
+airflow_bin = os.getenv('AIRFLOW_BIN', '/usr/local/bin')
+
+progargs = {
+    'controller_url': 'http://localhost:3030',
+    'airflow_bin': airflow_bin,
+    'logging': None
+}
+
+DEFAULT_CONFIG_FILE_PATH = '/etc/cord_workflow_airflow_extensions/config.json'
+SOCKET_CONNECTION_TEST_TIMEOUT = 5
+DEFAULT_CONNECTION_TEST_DELAY = 5
+DEFAULT_CONNECTION_TEST_RETRY = 999999
+
+
+def print_graffiti():
+    result = pyfiglet.figlet_format("CORD\nWorkflow\nKickstarter", font="graffiti")
+    print(result)
+
+
+def get_arg_parser():
+    parser = argparse.ArgumentParser(description='CORD Workflow Kickstarter Daemon.', prog='kickstarter')
+    parser.add_argument('--config', help='locate a configuration file')
+    parser.add_argument('--controller', help='CORD Workflow Controller URL')
+    return parser
+
+
+def read_config(path):
+    if os.path.exists(path):
+        with open(path) as json_config_file:
+            data = json.load(json_config_file)
+            return data
+    return {}
+
+
+def pretty_format_json(j):
+    dumps = json.dumps(j, sort_keys=True, indent=4, separators=(',', ': '))
+    return dumps
+
+
+def is_port_open(url, timeout):
+    o = urlparse(url)
+    hostname = o.hostname
+    port = o.port
+
+    if (not port) or port <= 0:
+        if o.scheme.lower() == 'http':
+            port = 80
+        elif o.scheme.lower() == 'https':
+            port = 443
+
+    succeed = False
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    sock.settimeout(timeout)
+    try:
+        sock.connect((hostname, int(port)))
+        sock.shutdown(socket.SHUT_RDWR)
+        succeed = True
+    except Exception:
+        pass
+    finally:
+        sock.close()
+
+    return succeed
+
+
+def check_web_live(url,
+                   retry=DEFAULT_CONNECTION_TEST_RETRY,
+                   delay=DEFAULT_CONNECTION_TEST_DELAY,
+                   timeout=SOCKET_CONNECTION_TEST_TIMEOUT):
+    ipup = False
+    for _ in range(retry):
+        if is_port_open(url, timeout):
+            ipup = True
+            break
+        else:
+            time.sleep(delay)
+    return ipup
+
+
+def get_airflow_cli():
+    return os.path.join(progargs['airflow_bin'], 'airflow')
+
+
+def check_airflow_live():
+    try:
+        subprocess.check_call([get_airflow_cli(), 'list_dags'])
+        return True
+    except subprocess.CalledProcessError as e:
+        log.error(e)
+        return False
+
+
+def on_kickstart(workflow_id, workflow_run_id):
+    # if manager and airflow_client:
+    if manager:
+        try:
+            created = False
+            log.info('> Kickstarting a workflow (%s) => workflow run (%s)' % (workflow_id, workflow_run_id))
+            # message = airflow_client.trigger_dag(
+            #     dag_id=workflow_id,
+            #     run_id=workflow_run_id
+            # )
+            # log.info('> Airflow Response: %s' % message)
+
+            output = subprocess.Popen(
+                [get_airflow_cli(), 'trigger_dag', '-r', workflow_run_id, workflow_id],
+                stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+                encoding='utf8'
+            )
+
+            for line in output.stdout.readlines():
+                if 'Created <DagRun ' in line:
+                    created = True
+                    break
+
+            if created:
+                # let controller know that the new workflow run is created
+                log.info('> Notifying a new workflow run (%s) for a workflow (%s)' % (workflow_run_id, workflow_id))
+                manager.report_new_workflow_run(workflow_id, workflow_run_id)
+        except subprocess.CalledProcessError as e:
+            # when shell exited with non-zero code
+            log.error('> Error : %s' % e)
+        except Exception as e:
+            log.error('> Error : %s' % e)
+            log.debug(traceback.format_exc())
+
+
+def on_check_status(workflow_id, workflow_run_id):
+    # if manager and airflow_client:
+    if manager:
+        try:
+            status = 'unknown'
+            log.info('> Checking status of workflow run (%s)' % (workflow_run_id))
+
+            # run = DagRun.find(dag_id=workflow_id, run_id=workflow_run_id)
+            # status = 'unknown'
+            # if run:
+            #     # run is an array
+            #     # this should be one of ['success', 'running', 'failed']
+            #     status = run[0].state
+            # else:
+            #     log.error(
+            #         'Cannot retrieve status of a workflow run (%s, %s)' %
+            #         (workflow_id, workflow_run_id)
+            #     )
+            #     status = 'unknown'
+
+            output = subprocess.Popen(
+                [get_airflow_cli(), 'list_dag_runs', workflow_id],
+                stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+                encoding='utf8'
+            )
+
+            title = False
+            body = False
+            for line in output.stdout.readlines():
+                if 'DAG RUNS' in line:
+                    title = True
+                elif title and ('--------' in line):
+                    body = True
+                elif body:
+                    # id  | run_id | state | execution_date | state_date |
+                    if workflow_run_id in line:
+                        # found the line
+                        # 1 | ssss | running | 2019-07-25T21:35:06+00:00 |
+                        # 2019-07-25T21:35:06.242130+00:00 |
+                        fields = line.split('|')
+                        status = fields[2].strip()
+                        break
+
+            log.info('> status : %s' % status)
+
+            # let controller know the status of the workflow run
+            log.info(
+                '> Updating status of a workflow run (%s) - status : %s' %
+                (workflow_run_id, status)
+            )
+            manager.report_workflow_run_status(workflow_id, workflow_run_id, status)
+        except subprocess.CalledProcessError as e:
+            # when shell exited with non-zero code
+            log.error('> Error : %s' % e)
+        except Exception as e:
+            log.error('> Error : %s' % e)
+            log.debug(traceback.format_exc())
+
+
+def on_check_status_bulk(requests):
+    # if manager and airflow_client:
+    if requests:
+        # group requested workflow run IDs by workflow ID
+        requests_by_workflow = {}
+        for req in requests:
+            workflow_id = req['workflow_id']
+            workflow_run_id = req['workflow_run_id']
+
+            if workflow_id not in requests_by_workflow:
+                requests_by_workflow[workflow_id] = []
+
+            requests_by_workflow[workflow_id].append(workflow_run_id)
+
+        if manager:
+            try:
+                log.info('> Checking status of workflow runs')
+
+                statuses = []
+                for workflow_id in requests_by_workflow:
+                    workflow_run_ids = requests_by_workflow[workflow_id]
+
+                    output = subprocess.Popen(
+                        [get_airflow_cli(), 'list_dag_runs', workflow_id],
+                        stdout=subprocess.PIPE, stderr=subprocess.STDOUT,
+                        encoding='utf8'
+                    )
+
+                    title = False
+                    body = False
+                    for line in output.stdout.readlines():
+                        if 'DAG RUNS' in line:
+                            title = True
+                        elif title and ('--------' in line):
+                            body = True
+                        elif body:
+                            # id  | run_id | state | execution_date | state_date |
+                            for workflow_run_id in workflow_run_ids:
+                                if workflow_run_id in line:
+                                    # found the line
+                                    # 1 | ssss | running | 2019-07-25T21:35:06+00:00 |
+                                    # 2019-07-25T21:35:06.242130+00:00 |
+                                    fields = line.split('|')
+                                    status = fields[2].strip()
+
+                                    log.info('> status of a workflow run (%s) : %s' % (workflow_run_id, status))
+                                    statuses.append({
+                                        'workflow_id': workflow_id,
+                                        'workflow_run_id': workflow_run_id,
+                                        'status': status
+                                    })
+
+                # let controller know statuses of workflow runs
+                log.info('> Updating status of workflow runs')
+                manager.report_workflow_run_status_bulk(statuses)
+            except subprocess.CalledProcessError as e:
+                # when shell exited with non-zero code
+                log.error('> Error : %s' % e)
+            except Exception as e:
+                log.error('> Error : %s' % e)
+                log.debug(traceback.format_exc())
+
+
+# for command-line execution
+def main(args):
+    print_graffiti()
+
+    # check if config path is set
+    config_file_path = DEFAULT_CONFIG_FILE_PATH
+    if args.config:
+        config_file_path = args.config
+
+    if os.path.exists(config_file_path):
+        # read config
+        config = read_config(config_file_path)
+        if config:
+            global progargs
+            for k in progargs:
+                # overwrite
+                if k in config:
+                    progargs[k] = config[k]
+
+    global log
+    log = create_logger(progargs["logging"])
+
+    if args.controller:
+        progargs['controller_url'] = args.controller
+
+    print('=CONFIG=')
+    config_json_string = pretty_format_json(progargs)
+    print(config_json_string)
+    print('\n')
+
+    # checking controller and airflow web interface
+    log.info('Checking if Workflow Controller (%s) is live...' % progargs['controller_url'])
+    controller_live = check_web_live(progargs['controller_url'])
+    if not controller_live:
+        log.error('Controller (%s) appears to be down' % progargs['controller_url'])
+        raise IOError('Controller (%s) appears to be down' % progargs['controller_url'])
+
+    airflow_live = check_airflow_live()
+    if not airflow_live:
+        log.error('Airflow appears to be down')
+        raise IOError('Airflow appears to be down')
+
+    # connect to workflow controller
+    log.info('Connecting to Workflow Controller (%s)...' % progargs['controller_url'])
+    global manager
+    manager = Manager(logger=log)
+    manager.connect(progargs['controller_url'])
+    manager.set_handlers({
+        'kickstart': on_kickstart,
+        'check_status': on_check_status,
+        'check_status_bulk': on_check_status_bulk
+    })
+
+    # connect to airflow
+    # global airflow_client
+    # log.info('Connecting to Airflow...')
+
+    # api.load_auth()
+    # api_module = import_module(AirflowConf.get('cli', 'api_client'))
+    # airflow_client = api_module.Client(
+    #     api_base_url=AirflowConf.get('cli', 'endpoint_url'),
+    #     auth=api.api_auth.client_auth
+    # )
+
+    log.info('Waiting for kickstart events from Workflow Controller...')
+    try:
+        manager.wait()
+    finally:
+        log.info('Terminating the program...')
+        manager.disconnect()
+
+
+if __name__ == "__main__":
+    parser = get_arg_parser()
+    args = parser.parse_args()
+    main(args)
diff --git a/src/tools/workflow_ctl.py b/src/tools/workflow_ctl.py
new file mode 100644
index 0000000..751356a
--- /dev/null
+++ b/src/tools/workflow_ctl.py
@@ -0,0 +1,198 @@
+#!/usr/bin/env python3
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow Control CLI
+
+This module provides CLI commands to register workflow essences and emit events to Workflow Controller
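+
+Example commands (the topic, message, and file path are illustrative):
+    workflow_ctl.py register /path/to/essence.json
+    workflow_ctl.py emit some.event.topic '{"status": "done"}'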
+"""
+
+import json
+import os.path
+import argparse
+import re
+
+from multistructlog import create_logger
+from cord_workflow_controller_client.manager import Manager
+from cord_workflow_controller_client.probe import Probe
+
+
+log = create_logger()
+progargs = {
+    'controller_url': 'http://localhost:3030',
+    'logging': None
+}
+
+DEFAULT_CONFIG_FILE_PATH = '/etc/cord_workflow_airflow_extensions/config.json'
+
+
+class InputError(Exception):
+    """Exception raised for errors in the input.
+
+    Attributes:
+        message -- explanation of the error
+    """
+
+    def __init__(self, message):
+        self.message = message
+
+
+def get_arg_parser():
+    parser = argparse.ArgumentParser(description='CORD Workflow Control CLI.', prog='workflow_ctl')
+    parser.add_argument('--config', help='locate a configuration file')
+    parser.add_argument('--controller', help='CORD Workflow Controller URL')
+    parser.add_argument('cmd', help='Command')
+    parser.add_argument('cmd_args', help='Arguments for the command', nargs='*')
+    return parser
+
+
+def read_config(path):
+    if os.path.exists(path):
+        with open(path) as json_config_file:
+            data = json.load(json_config_file)
+            return data
+    return {}
+
+
+def read_json_file(filename):
+    if filename:
+        with open(filename, 'r') as f:
+            return json.load(f)
+    return None
+
+
+def read_json_string(json_str):
+    if json_str:
+        try:
+            return json.loads(json_str)
+        except json.decoder.JSONDecodeError:
+            return load_dirty_json(json_str)
+    return None
+
+
+def load_dirty_json(dirty_json):
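+    # Best-effort conversion of Python-style dict strings to JSON, e.g.
+    #   "{u'status': 'running', 'done': False}"
+    # becomes
+    #   '{"status": "running", "done": false}'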
+    regex_replace = [
+        (r"([ \{,:\[])(u)?'([^']+)'", r'\1"\3"'),
+        (r" False([, \}\]])", r' false\1'),
+        (r" True([, \}\]])", r' true\1')
+    ]
+    for r, s in regex_replace:
+        dirty_json = re.sub(r, s, dirty_json)
+    clean_json = json.loads(dirty_json)
+    return clean_json
+
+
+def register_workflow(args):
+    # args is expected to be a list of essence files
+    if not args:
+        raise InputError('no essence file is given')
+
+    log.info('Connecting to Workflow Controller (%s)...' % progargs['controller_url'])
+    manager = Manager(logger=log)
+    connected = False
+    results = []
+
+    try:
+        manager.connect(progargs['controller_url'])
+        connected = True
+
+        for essence_file in args:
+            if not os.path.exists(essence_file):
+                log.error('cannot find the essence file (%s)' % essence_file)
+                continue
+
+            essence = read_json_file(essence_file)
+            log.info('Registering an essence file (%s)...' % essence_file)
+            result = manager.register_workflow_essence(essence)
+            if result:
+                log.info('registered an essence file (%s)' % essence_file)
+            else:
+                log.error('cannot register an essence file (%s)' % essence_file)
+
+            results.append(result)
+    finally:
+        if connected:
+            manager.disconnect()
+
+    return results
+
+
+def emit_event(args):
+    # args is expected to be a pair: <topic> <JSON message>
+    if not args or len(args) != 2:
+        raise InputError('parameter should be <topic> <message>')
+
+    log.info('Connecting to Workflow Controller (%s)...' % progargs['controller_url'])
+    probe = Probe(logger=log)
+    connected = False
+
+    try:
+        probe.connect(progargs['controller_url'])
+        connected = True
+
+        topic = args[0]
+        message = read_json_string(args[1])
+
+        log.info('Emitting an event (%s - %s)...' % (topic, message))
+        probe.emit_event(topic, message)
+        log.info('Emitted an event (%s - %s)' % (topic, message))
+        return True
+    finally:
+        if connected:
+            probe.disconnect()
+
+
+# for command-line execution
+def main(args):
+    # check if config path is set
+    config_file_path = DEFAULT_CONFIG_FILE_PATH
+    if args.config:
+        config_file_path = args.config
+
+    if os.path.exists(config_file_path):
+        # read config
+        config = read_config(config_file_path)
+        if config:
+            global progargs
+            for k in progargs:
+                # overwrite
+                if k in config:
+                    progargs[k] = config[k]
+
+    global log
+    log = create_logger(progargs["logging"])
+
+    if args.controller:
+        progargs['controller_url'] = args.controller
+
+    if args.cmd:
+        if args.cmd.strip().lower() in ['reg', 'register', 'register_workflow']:
+            results = register_workflow(args.cmd_args)
+            print(results)
+        elif args.cmd.strip().lower() in ['emit', 'send', 'event', 'message']:
+            results = emit_event(args.cmd_args)
+            print(results)
+        else:
+            log.error('unknown command %s' % args.cmd)
+            raise InputError('unknown command %s' % args.cmd)
+
+
+if __name__ == "__main__":
+    parser = get_arg_parser()
+    args = parser.parse_args()
+    main(args)