Rework directory structures
Implement kickstarter daemon
Implement a command line interface to manage workflow registration
Refine workflow essence extractor code

Change-Id: I61fd1f497a55af501c579e70a9f6c51f32f5e15c
diff --git a/.gitignore b/.gitignore
index 7e0d89e..5811e16 100644
--- a/.gitignore
+++ b/.gitignore
@@ -8,5 +8,5 @@
 .tox
 .DS_Store
 nose2-results.xml
-venv-service
+venv-cordworkflowairflow
 *.pyc
\ No newline at end of file
diff --git a/MANIFEST.in b/MANIFEST.in
new file mode 100644
index 0000000..45c47fc
--- /dev/null
+++ b/MANIFEST.in
@@ -0,0 +1,3 @@
+include README.rst
+include requirements.txt
+include VERSION
\ No newline at end of file
diff --git a/Makefile b/Makefile
index 5e0520b..7b82614 100644
--- a/Makefile
+++ b/Makefile
@@ -18,24 +18,18 @@
 # Variables
 VERSION                  ?= $(shell cat ./VERSION)
 
-## Testing related
-CORDWORKFLOWAIRFLOW_LIBRARIES            := $(wildcard lib/*)
-
 # Targets
 all: test
 
 # Create a virtualenv and install all the libraries
-venv-workflowengine:
+venv-cordworkflowairflow:
 	virtualenv $@;\
     source ./$@/bin/activate ; set -u ;\
     pip install -r requirements.txt nose2 ;\
-    pip install -e lib/cord-workflow-essence-extractor
+    pip install -e ./
 
 # tests
-test: lib-test unit-test
-
-lib-test:
-	for lib in $(CORDWORKFLOWAIRFLOW_LIBRARIES); do pushd $$lib; tox; popd; done
+test: unit-test
 
 unit-test:
 	tox
@@ -47,12 +41,9 @@
     .coverage \
     coverage.xml \
     nose2-results.xml \
-    venv-workflowengine \
-    lib/*/.tox \
-    lib/*/build \
-    lib/*/dist \
-    lib/*/*.egg-info \
-    lib/*/.coverage \
-    lib/*/coverage.xml \
-    lib/*/*results.xml \
-    lib/*/*/VERSION
+    venv-cordworkflowairflow \
+    .tox \
+    build \
+    dist \
+    *.egg-info \
+    *results.xml
diff --git a/README.md b/README.md
deleted file mode 100644
index ea09804..0000000
--- a/README.md
+++ /dev/null
@@ -1,3 +0,0 @@
-# cord_workflow_airflow
-
-A workflow engine for CORD implemented on top of Airflow.
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..6dd0408
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,27 @@
+Airflow Extensions for CORD Workflow
+====================================
+This library provides airflow extensions and tools for CORD Workflow.
+Three packages are provided, as described below:
+
+Essence Extractor
+-----------------
+A tool to extract **essence** from Airflow workflow code. Extracted essence is
+used to register the workflow to CORD Workflow Controller.
+
+Workflow Kickstarter
+--------------------
+A daemon that monitors kickstart events from CORD Workflow Controller, and
+instantiates Airflow Workflows.
+
+Airflow Extensions
+------------------
+A set of Airflow extensions that provide a Python library to access CORD
+data models and Airflow operators/sensors.
+
+
+
+Note
+----
+Installing Airflow 1.10 with pip 19.1 runs into a PEP 517 issue.
+PEP 517 causes the installation of pendulum 1.4.4, which Airflow requires, to fail.
+Higher versions of pendulum are not compatible with Airflow.
\ No newline at end of file
diff --git a/VERSION b/VERSION
index b9fd26f..50e6e9d 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.1.0-dev1
\ No newline at end of file
+0.2.0-dev1
\ No newline at end of file
diff --git a/lib/cord-workflow-essence-extractor/.gitignore b/lib/cord-workflow-essence-extractor/.gitignore
deleted file mode 100644
index f3a7ffa..0000000
--- a/lib/cord-workflow-essence-extractor/.gitignore
+++ /dev/null
@@ -1,11 +0,0 @@
-.noseids
-build
-cordworkflowessenceextractor.egg-info
-dist
-.coverage
-coverage.xml
-cover
-.DS_Store
-
-# setup.py copies this, don't commit it
-VERSION
\ No newline at end of file
diff --git a/lib/cord-workflow-essence-extractor/MANIFEST.in b/lib/cord-workflow-essence-extractor/MANIFEST.in
deleted file mode 100644
index b3a0b9c..0000000
--- a/lib/cord-workflow-essence-extractor/MANIFEST.in
+++ /dev/null
@@ -1,3 +0,0 @@
-include README.rst
-include requirements.txt
-include cordworkflowessenceextractor/VERSION
\ No newline at end of file
diff --git a/lib/cord-workflow-essence-extractor/README.rst b/lib/cord-workflow-essence-extractor/README.rst
deleted file mode 100644
index 680d130..0000000
--- a/lib/cord-workflow-essence-extractor/README.rst
+++ /dev/null
@@ -1,11 +0,0 @@
-CORD Workflow Essence Extractor
-===============================
-
-Extract workflow essence from Airflow workflow code. The essence information
-will be later passed to Workflow Controller for control.
-
-The essence extractor extracts following information from Airflow workflow code written in python:
-- DAG information (DAG ID)
-- Operators (class name, event and other parameters)
-- Task dependencies (parents and children)
-
diff --git a/lib/cord-workflow-essence-extractor/cordworkflowessenceextractor/__init__.py b/lib/cord-workflow-essence-extractor/cordworkflowessenceextractor/__init__.py
deleted file mode 100644
index a0a748a..0000000
--- a/lib/cord-workflow-essence-extractor/cordworkflowessenceextractor/__init__.py
+++ /dev/null
@@ -1,18 +0,0 @@
-# Copyright 2019-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from .workflow_essence_extractor import parse_codefile, extract_all
-
-__all__ = ["parse_codefile", "extract_all"]
diff --git a/lib/cord-workflow-essence-extractor/cordworkflowessenceextractor/workflow_essence_extractor.py b/lib/cord-workflow-essence-extractor/cordworkflowessenceextractor/workflow_essence_extractor.py
deleted file mode 100644
index bb62493..0000000
--- a/lib/cord-workflow-essence-extractor/cordworkflowessenceextractor/workflow_essence_extractor.py
+++ /dev/null
@@ -1,552 +0,0 @@
-#!/usr/bin/env python3
-
-# Copyright 2019-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Workflow Essence Extractor
-
-This module extracts essence of airflow workflows
-Following information will be extracted from workflow code
-- DAG info
-- Operator info
-    - XOS-related operators
-    - Airflow operators
-- Dependency info
-"""
-
-import ast
-import sys
-import json
-import os.path
-
-
-def classname(cls):
-    return cls.__class__.__name__
-
-
-def jsonify_ast(node, level=0):
-    fields = {}
-    for k in node._fields:
-        fields[k] = '...'
-        v = getattr(node, k)
-        if isinstance(v, ast.AST):
-            if v._fields:
-                fields[k] = jsonify_ast(v)
-            else:
-                fields[k] = classname(v)
-
-        elif isinstance(v, list):
-            fields[k] = []
-            for e in v:
-                fields[k].append(jsonify_ast(e))
-
-        elif isinstance(v, str):
-            fields[k] = v
-
-        elif isinstance(v, int) or isinstance(v, float):
-            fields[k] = v
-
-        elif v is None:
-            fields[k] = None
-
-        else:
-            fields[k] = 'unrecognized'
-
-    ret = {
-        classname(node): fields
-    }
-    return ret
-
-
-def parse(code):
-    lines = code.split("\n")
-    if len(lines) == 1:
-        if code.endswith(".py") and os.path.exists(code):
-            return parse_codefile(code)
-    return parse_code(code)
-
-
-def parse_code(code):
-    tree = ast.parse(code)
-    return jsonify_ast(tree)
-
-
-def parse_codefile(code_filepath):
-    code = None
-    with open(code_filepath, "r") as f:
-        code = f.read()
-    tree = ast.parse(code, code_filepath)
-    return jsonify_ast(tree)
-
-
-def pretty_print_json(j):
-    dumps = json.dumps(j, sort_keys=True, indent=4, separators=(',', ': '))
-    print(dumps)
-
-
-def recursively_find_elements(tree, elem):
-    """
-    traverse AST and find elements
-    """
-    for e in tree:
-        obj = None
-        if isinstance(tree, list):
-            obj = e
-        elif isinstance(tree, dict):
-            obj = tree[e]
-
-        if e == elem:
-            yield obj
-
-        if obj and (isinstance(obj, list) or isinstance(obj, dict)):
-            for y in recursively_find_elements(obj, elem):
-                yield y
-
-
-def extract_func_calls(tree, func_name):
-    """
-    extract function calls with assignment
-    """
-    assigns = recursively_find_elements(tree, "Assign")
-    if assigns:
-        for assign in assigns:
-            found = False
-
-            calls = recursively_find_elements(assign, "Call")
-            if calls:
-                for call in calls:
-                    funcs = recursively_find_elements(call, "func")
-                    if funcs:
-                        for func in funcs:
-                            if "Name" in func:
-                                name = func["Name"]
-                                if "ctx" in name and "id" in name:
-                                    # found function
-                                    if name["id"] == func_name:
-                                        found = True
-
-            if found:
-                yield assign
-
-
-def extract_func_calls_airflow_operators(tree):
-    """
-    extract only airflow operators which end with "*Operator" or "*Sensor"
-    """
-    assigns = recursively_find_elements(tree, "Assign")
-    if assigns:
-        for assign in assigns:
-            found = False
-
-            calls = recursively_find_elements(assign, "Call")
-            if calls:
-                for call in calls:
-                    funcs = recursively_find_elements(call, "func")
-                    if funcs:
-                        for func in funcs:
-                            if "Name" in func:
-                                name = func["Name"]
-                                if "ctx" in name and "id" in name:
-                                    # found function
-                                    if name["id"].endswith(("Operator", "Sensor")):
-                                        found = True
-
-            if found:
-                yield assign
-
-
-def extract_bin_op(tree, op_name):
-    """
-    extract binary operation such as >>, <<
-    """
-    ops = recursively_find_elements(tree, "BinOp")
-    if ops:
-        for op in ops:
-            if op["op"] == op_name:
-                yield op
-
-
-def take_string_or_tree(tree):
-    if "Str" in tree:
-        return tree["Str"]["s"]
-    return tree
-
-
-def take_num_or_tree(tree):
-    if "Num" in tree:
-        return tree["Num"]["n"]
-    return tree
-
-
-def take_id_or_tree(tree):
-    if "Name" in tree:
-        return tree["Name"]["id"]
-    return tree
-
-
-def take_name_constant_or_tree(tree):
-    if "NameConstant" in tree:
-        return tree["NameConstant"]["value"]
-    return tree
-
-
-def take_value_or_tree(tree):
-    if "Str" in tree:
-        return tree["Str"]["s"]
-    elif "Num" in tree:
-        return tree["Num"]["n"]
-    elif "Name" in tree:
-        val = tree["Name"]["id"]
-        if val in ["True", "False"]:
-            return bool(val)
-        elif val == "None":
-            return None
-        return val
-    elif "NameConstant" in tree:
-        val = tree["NameConstant"]["value"]
-        if val in ["True", "False"]:
-            return bool(val)
-        elif val == "None":
-            return None
-        return val
-    elif "List" in tree:
-        vals = []
-        if "elts" in tree["List"]:
-            elts = tree["List"]["elts"]
-            for elt in elts:
-                val = take_value_or_tree(elt)
-                vals.append(val)
-        return vals
-    return tree
-
-
-def make_dag(tree):
-    loc_val = None
-    dag_id = None
-
-    if "targets" in tree:
-        targets = tree["targets"]
-        loc_val = take_id_or_tree(targets[0])
-
-    if "value" in tree:
-        value = tree["value"]
-        if "Call" in value:
-            call = value["Call"]
-            if "keywords" in call:
-                keywords = call["keywords"]
-                for keyword in keywords:
-                    if "keyword" in keyword:
-                        k = keyword["keyword"]
-                        if k["arg"] == "dag_id":
-                            dag_id = take_string_or_tree(k["value"])
-
-    return {
-        'local_variable': loc_val,
-        'dag_id': dag_id
-    }
-
-
-def make_airflow_operator(tree):
-    airflow_operator = {}
-
-    if "targets" in tree:
-        targets = tree["targets"]
-        loc_val = take_id_or_tree(targets[0])
-        airflow_operator["local_variable"] = loc_val
-
-    if "value" in tree:
-        value = tree["value"]
-        if "Call" in value:
-            call = value["Call"]
-            if "func" in call:
-                class_name = take_id_or_tree(call["func"])
-                airflow_operator["class"] = class_name
-
-            if "keywords" in call:
-                keywords = call["keywords"]
-                for keyword in keywords:
-                    if "keyword" in keyword:
-                        k = keyword["keyword"]
-                        arg = k["arg"]
-                        airflow_operator[arg] = take_value_or_tree(k["value"])
-
-    return airflow_operator
-
-
-def make_dependencies_bin_op(tree, dependencies):
-    children = []
-    parents = []
-    child = None
-    parent = None
-
-    if tree["op"] == "RShift":
-        child = take_id_or_tree(tree["right"])
-        parent = take_id_or_tree(tree["left"])
-    elif tree["op"] == "LShift":
-        child = take_id_or_tree(tree["left"])
-        parent = take_id_or_tree(tree["right"])
-
-    if child:
-        if isinstance(child, dict):
-            if "List" in child:
-                for c in child["List"]["elts"]:
-                    children.append(take_id_or_tree(c))
-            elif "BinOp" in child:
-                deps = make_dependencies_bin_op(child["BinOp"], dependencies)
-                for dep in deps:
-                    children.append(dep)
-            else:
-                children.append(take_id_or_tree(child))
-        else:
-            children.append(child)
-
-    if parent:
-        if isinstance(parent, dict):
-            if "List" in parent:
-                for p in parent["List"]["elts"]:
-                    parents.append(take_id_or_tree(p))
-            elif "BinOp" in parent:
-                deps = make_dependencies_bin_op(parent["BinOp"], dependencies)
-                for dep in deps:
-                    parents.append(dep)
-            else:
-                parents.append(take_id_or_tree(parent))
-        else:
-            parents.append(parent)
-
-    if len(parents) > 0 and len(children) > 0:
-        # make all-vs-all combinations
-        for p in parents:
-            for c in children:
-                dep = {
-                    'parent': p,
-                    'child': c
-                }
-                dependencies.append(dep)
-
-    if tree["op"] == "RShift":
-        return children
-    elif tree["op"] == "LShift":
-        return parents
-    return children
-
-
-def extract_dep_operations(tree):
-    # extract dependency definition using ">>"
-    ops = extract_bin_op(tree, "RShift")
-    if ops:
-        for op in ops:
-            deps = []
-            make_dependencies_bin_op(op, deps)
-            for dep in deps:
-                yield dep
-
-    # extract dependency definition using "<<"
-    ops = extract_bin_op(tree, "LShift")
-    if ops:
-        for op in ops:
-            deps = []
-            make_dependencies_bin_op(op, deps)
-            for dep in deps:
-                yield dep
-
-
-def extract_dags(tree):
-    dags = {}
-    calls = extract_func_calls(tree, "DAG")
-    if calls:
-        for call in calls:
-            dag = make_dag(call)
-            dagid = dag["dag_id"]
-            dags[dagid] = dag
-    return dags
-
-
-def extract_XOS_event_sensors(tree):
-    operators = {}
-    calls = extract_func_calls(tree, "XOSEventSensor")
-    if calls:
-        for call in calls:
-            operator = make_airflow_operator(call)
-            operatorid = operator["task_id"]
-            operators[operatorid] = operator
-    return operators
-
-
-def extract_XOS_model_sensors(tree):
-    operators = {}
-    calls = extract_func_calls(tree, "XOSModelSensor")
-    if calls:
-        for call in calls:
-            operator = make_airflow_operator(call)
-            operatorid = operator["task_id"]
-            operators[operatorid] = operator
-    return operators
-
-
-def extract_airflow_operators(tree):
-    operators = {}
-    calls = extract_func_calls_airflow_operators(tree)
-    if calls:
-        for call in calls:
-            operator = make_airflow_operator(call)
-            operatorid = operator["task_id"]
-            operators[operatorid] = operator
-    return operators
-
-
-def extract_all_operators(tree):
-    operators = {}
-    event_sensors = extract_XOS_event_sensors(tree)
-    if event_sensors:
-        for event_sensor in event_sensors:
-            operators[event_sensor] = event_sensors[event_sensor]
-
-    model_sensors = extract_XOS_model_sensors(tree)
-    if model_sensors:
-        for model_sensor in model_sensors:
-            operators[model_sensor] = model_sensors[model_sensor]
-
-    airflow_operators = extract_airflow_operators(tree)
-    if airflow_operators:
-        for airflow_operator in airflow_operators:
-            operators[airflow_operator] = airflow_operators[airflow_operator]
-
-    return operators
-
-
-def extract_dependencies(tree):
-    """
-    Build N-N dependencies from fragmented parent-child relations
-    A node can have multiple parents and multiple children
-    """
-    dependencies = {}
-    ops = extract_dep_operations(tree)
-    if ops:
-        for op in ops:
-            p = op["parent"]
-            c = op["child"]
-
-            if p in dependencies:
-                # append to an existing list
-                node_p = dependencies[p]
-                if "children" in node_p:
-                    # prevent duplicates
-                    if c not in node_p["children"]:
-                        node_p["children"].append(c)
-                else:
-                    node_p["children"] = [c]
-            else:
-                # create a new
-                node_p = {
-                    'children': [c]
-                }
-                dependencies[p] = node_p
-
-            if c in dependencies:
-                # append to an existing list
-                node_c = dependencies[c]
-                if "parents" in node_c:
-                    # prevent duplicates
-                    if p not in node_c["parents"]:
-                        node_c["parents"].append(p)
-                else:
-                    node_c["parents"] = [p]
-            else:
-                # create a new
-                node_c = {
-                    'parents': [p]
-                }
-                dependencies[c] = node_c
-
-    return dependencies
-
-
-def extract_all(tree):
-    """
-    Build highlevel information of workflows dag, operators and dependencies refers to each other
-    """
-    dags = extract_dags(tree)
-    operators = extract_all_operators(tree)
-    dependencies = extract_dependencies(tree)
-
-    dag_dict = {}
-    for dag_id in dags:
-        dag = dags[dag_id]
-        dag_var = dag["local_variable"]
-
-        # filter operators that do not belong to the dag
-        my_operators = {}
-        my_operators_var = {}
-        for task_id in operators:
-            operator = operators[task_id]
-            if operator["dag"] == dag_var:
-                # set dag_id
-                operator["dag_id"] = dag_id
-                my_operators[task_id] = operator
-
-                # this is to help fast search while working with dependencies
-                operator_local_var = operator["local_variable"]
-                my_operators_var[operator_local_var] = operator
-
-        # filter dependencies that do not belong to the dag
-        my_dependencies = {}
-        for task_var in dependencies:
-            if task_var in my_operators_var:
-                dependency = dependencies[task_var]
-                task_id = my_operators_var[task_var]["task_id"]
-
-                # convert dependency task_var to task_id
-                dep = {}
-                if "children" in dependency:
-                    dep["children"] = []
-                    for child in dependency["children"]:
-                        if child in my_operators_var:
-                            child_task_id = my_operators_var[child]["task_id"]
-                            dep["children"].append(child_task_id)
-
-                if "parents" in dependency:
-                    dep["parents"] = []
-                    for parent in dependency["parents"]:
-                        if parent in my_operators_var:
-                            parent_task_id = my_operators_var[parent]["task_id"]
-                            dep["parents"].append(parent_task_id)
-
-                my_dependencies[task_id] = dep
-
-        d = {
-            'dag': dag,
-            'tasks': my_operators,
-            'dependencies': my_dependencies
-        }
-        dag_dict[dag_id] = d
-
-    return dag_dict
-
-
-# for command-line execution
-def main(argv):
-    if len(argv) < 1:
-        sys.exit("Error: Need a filepath")
-
-    code_filepath = argv[0]
-
-    tree = parse_codefile(code_filepath)
-    all = extract_all(tree)
-    pretty_print_json(all)
-
-
-if __name__ == "__main__":
-    main(sys.argv[1:])
diff --git a/lib/cord-workflow-essence-extractor/requirements.txt b/lib/cord-workflow-essence-extractor/requirements.txt
deleted file mode 100644
index e69de29..0000000
--- a/lib/cord-workflow-essence-extractor/requirements.txt
+++ /dev/null
diff --git a/lib/cord-workflow-essence-extractor/tox.ini b/lib/cord-workflow-essence-extractor/tox.ini
deleted file mode 100644
index eb3f3f6..0000000
--- a/lib/cord-workflow-essence-extractor/tox.ini
+++ /dev/null
@@ -1,50 +0,0 @@
-; Copyright 2019-present Open Networking Foundation
-;
-; Licensed under the Apache License, Version 2.0 (the "License");
-; you may not use this file except in compliance with the License.
-; You may obtain a copy of the License at
-;
-; http://www.apache.org/licenses/LICENSE-2.0
-;
-; Unless required by applicable law or agreed to in writing, software
-; distributed under the License is distributed on an "AS IS" BASIS,
-; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-; See the License for the specific language governing permissions and
-; limitations under the License.
-
-[tox]
-envlist = py27,py35,py36,py37
-skip_missing_interpreters = true
-skipsdist = True
-
-[testenv]
-deps =
-  -r requirements.txt
-  nose2
-  flake8
-  
-commands =
-  nose2 -c tox.ini --verbose --junit-xml
-  flake8
-
-[flake8]
-max-line-length = 119
-exclude =
-  .tox
-  build
-  cord-workflow-essence-extractor-tests/workflow-examples
-
-[unittest]
-plugins = nose2.plugins.junitxml
-
-[junit-xml]
-path = nose2-results.xml
-
-[coverage]
-always-on = True
-coverage =
-  cordworkflowessenceextractor
-  cord-workflow-essence-extractor-tests
-coverage-report =
-  term
-  xml
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index e69de29..f35ec37 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -0,0 +1,7 @@
+jinja2~=2.10.1
+werkzeug~=0.15
+apache-airflow~=1.10.2
+multistructlog~=2.1.0
+requests~=2.22.0
+pyfiglet~=0.7
+cord-workflow-controller-client~=0.1.0
diff --git a/lib/cord-workflow-essence-extractor/setup.py b/setup.py
similarity index 71%
rename from lib/cord-workflow-essence-extractor/setup.py
rename to setup.py
index 1e0311d..fc03af8 100644
--- a/lib/cord-workflow-essence-extractor/setup.py
+++ b/setup.py
@@ -1,4 +1,4 @@
-# Copyright 2018-present Open Networking Foundation
+# Copyright 2019-present Open Networking Foundation
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -14,9 +14,6 @@
 
 from __future__ import absolute_import
 
-import os
-from shutil import copyfile
-
 from setuptools import setup
 
 
@@ -26,10 +23,7 @@
 
 
 def version():
-    # Copy VERSION file of parent to module directory if not found
-    if not os.path.exists("cordworkflowessenceextractor/VERSION"):
-        copyfile("../../VERSION", "cordworkflowessenceextractor/VERSION")
-    with open("cordworkflowessenceextractor/VERSION") as f:
+    with open("VERSION") as f:
         return f.read().strip()
 
 
@@ -43,15 +37,20 @@
 
 
 setup(
-    name="cordworkflowessenceextractor",
+    name="cord_workflow_airflow_extensions",
     version=version(),
-    description="Extract workflow essence from airflow workflow code",
+    description="Airflow extensions for CORD Workflow Manager",
     long_description=readme(),
     author="Illyoung Choi",
     author_email="iychoi@opennetworking.org",
     classifiers=["License :: OSI Approved :: Apache Software License"],
     license="Apache v2",
-    packages=["cordworkflowessenceextractor"],
+    packages=[
+        "cord_workflow_airflow_extensions"
+    ],
+    package_dir={
+        "cord_workflow_airflow_extensions": "src/cord_workflow_airflow_extensions"
+    },
     install_requires=parse_requirements("requirements.txt"),
     include_package_data=True,
 )
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/__init__.py b/src/cord_workflow_airflow_extensions/__init__.py
similarity index 100%
rename from lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/__init__.py
rename to src/cord_workflow_airflow_extensions/__init__.py
diff --git a/src/cord_workflow_airflow_extensions/essence_extractor.py b/src/cord_workflow_airflow_extensions/essence_extractor.py
new file mode 100644
index 0000000..901d40f
--- /dev/null
+++ b/src/cord_workflow_airflow_extensions/essence_extractor.py
@@ -0,0 +1,626 @@
+#!/usr/bin/env python3
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow Essence Extractor
+
+This module extracts essence of airflow workflows
+Following information will be extracted from workflow code
+- DAG info
+- Operator info
+    - XOS-related operators
+    - Airflow operators
+- Dependency info
+"""
+
+import ast
+import json
+import os.path
+import argparse
+import pyfiglet
+
+from multistructlog import create_logger
+
+
+progargs = {
+    'logging': None
+}
+
+DEFAULT_CONFIG_FILE_PATH = '/etc/cord_workflow_airflow_extensions/config.json'
+
+
+class NoopLogger(object):
+    def __init__(self):
+        pass
+
+    def info(self, *args):
+        pass
+
+    def debug(self, *args):
+        pass
+
+    def error(self, *args):
+        pass
+
+    def warn(self, *args):
+        pass
+
+
+class EssenceExtractor(object):
+    def __init__(self, logger=None):
+        if logger:
+            self.logger = logger
+        else:
+            self.logger = NoopLogger()
+
+        self.tree = None
+
+    def set_logger(self, logger):
+        self.logger = logger
+
+    def get_logger(self):
+        return self.logger
+
+    def get_ast(self):
+        return self.tree
+
+    def parse_code(self, code):
+        tree = ast.parse(code)
+        self.tree = self.__jsonify_ast(tree)
+
+    def parse_codefile(self, filepath):
+        code = None
+        with open(filepath, "r") as f:
+            code = f.read()
+            tree = ast.parse(code, filepath)
+            self.tree = self.__jsonify_ast(tree)
+
+    def __classname(self, cls):
+        return cls.__class__.__name__
+
+    def __jsonify_ast(self, node, level=0):
+        fields = {}
+        for k in node._fields:
+            fields[k] = '...'
+            v = getattr(node, k)
+            if isinstance(v, ast.AST):
+                if v._fields:
+                    fields[k] = self.__jsonify_ast(v)
+                else:
+                    fields[k] = self.__classname(v)
+
+            elif isinstance(v, list):
+                fields[k] = []
+                for e in v:
+                    fields[k].append(self.__jsonify_ast(e))
+
+            elif isinstance(v, str):
+                fields[k] = v
+
+            elif isinstance(v, int) or isinstance(v, float):
+                fields[k] = v
+
+            elif v is None:
+                fields[k] = None
+
+            else:
+                fields[k] = 'unrecognized'
+
+        ret = {
+            self.__classname(node): fields
+        }
+        return ret
+
+    def __recursively_find_elements(self, tree, elem):
+        """
+        traverse AST and find elements
+        """
+        for e in tree:
+            obj = None
+            if isinstance(tree, list):
+                obj = e
+            elif isinstance(tree, dict):
+                obj = tree[e]
+
+            if e == elem:
+                yield obj
+
+            if obj and (isinstance(obj, list) or isinstance(obj, dict)):
+                for y in self.__recursively_find_elements(obj, elem):
+                    yield y
+
+    def __extract_func_calls(self, tree, func_name):
+        """
+        extract function calls with assignment
+        """
+        assigns = self.__recursively_find_elements(tree, "Assign")
+        if assigns:
+            for assign in assigns:
+                found = False
+
+                calls = self.__recursively_find_elements(assign, "Call")
+                if calls:
+                    for call in calls:
+                        funcs = self.__recursively_find_elements(call, "func")
+                        if funcs:
+                            for func in funcs:
+                                if "Name" in func:
+                                    name = func["Name"]
+                                    if "ctx" in name and "id" in name:
+                                        # found function
+                                        if name["id"] == func_name:
+                                            found = True
+
+                if found:
+                    yield assign
+
+    def __extract_func_calls_airflow_operators(self, tree):
+        """
+        extract only airflow operators which end with "*Operator" or "*Sensor"
+        """
+        assigns = self.__recursively_find_elements(tree, "Assign")
+        if assigns:
+            for assign in assigns:
+                found = False
+
+                calls = self.__recursively_find_elements(assign, "Call")
+                if calls:
+                    for call in calls:
+                        funcs = self.__recursively_find_elements(call, "func")
+                        if funcs:
+                            for func in funcs:
+                                if "Name" in func:
+                                    name = func["Name"]
+                                    if "ctx" in name and "id" in name:
+                                        # found function
+                                        if name["id"].endswith(("Operator", "Sensor")):
+                                            found = True
+
+                if found:
+                    yield assign
+
+    def __extract_bin_op(self, tree, op_name):
+        """
+        extract binary operation such as >>, <<
+        """
+        ops = self.__recursively_find_elements(tree, "BinOp")
+        if ops:
+            for op in ops:
+                if op["op"] == op_name:
+                    yield op
+
+    def __take_string_or_tree(self, tree):
+        if "Str" in tree:
+            return tree["Str"]["s"]
+        return tree
+
+    def __take_num_or_tree(self, tree):
+        if "Num" in tree:
+            return tree["Num"]["n"]
+        return tree
+
+    def __take_id_or_tree(self, tree):
+        if "Name" in tree:
+            return tree["Name"]["id"]
+        return tree
+
+    def __take_name_constant_or_tree(self, tree):
+        if "NameConstant" in tree:
+            return tree["NameConstant"]["value"]
+        return tree
+
+    def __take_value_or_tree(self, tree):
+        if "Str" in tree:
+            return tree["Str"]["s"]
+        elif "Num" in tree:
+            return tree["Num"]["n"]
+        elif "Name" in tree:
+            val = tree["Name"]["id"]
+            if val in ["True", "False"]:
+                return val == "True"
+            elif val == "None":
+                return None
+            return val
+        elif "NameConstant" in tree:
+            val = tree["NameConstant"]["value"]
+            if val in ["True", "False"]:
+                return val == "True"
+            elif val == "None":
+                return None
+            return val
+        elif "List" in tree:
+            vals = []
+            if "elts" in tree["List"]:
+                elts = tree["List"]["elts"]
+                for elt in elts:
+                    val = self.__take_value_or_tree(elt)
+                    vals.append(val)
+            return vals
+        return tree
+
+    def __make_dag(self, tree):
+        loc_val = None
+        dag_id = None
+
+        if "targets" in tree:
+            targets = tree["targets"]
+            loc_val = self.__take_id_or_tree(targets[0])
+
+        if "value" in tree:
+            value = tree["value"]
+            if "Call" in value:
+                call = value["Call"]
+                if "keywords" in call:
+                    keywords = call["keywords"]
+                    for keyword in keywords:
+                        if "keyword" in keyword:
+                            k = keyword["keyword"]
+                            if k["arg"] == "dag_id":
+                                dag_id = self.__take_string_or_tree(k["value"])
+
+        return {
+            'local_variable': loc_val,
+            'dag_id': dag_id
+        }
+
+    def __make_airflow_operator(self, tree):
+        airflow_operator = {}
+
+        if "targets" in tree:
+            targets = tree["targets"]
+            loc_val = self.__take_id_or_tree(targets[0])
+            airflow_operator["local_variable"] = loc_val
+
+        if "value" in tree:
+            value = tree["value"]
+            if "Call" in value:
+                call = value["Call"]
+                if "func" in call:
+                    class_name = self.__take_id_or_tree(call["func"])
+                    airflow_operator["class"] = class_name
+
+                if "keywords" in call:
+                    keywords = call["keywords"]
+                    for keyword in keywords:
+                        if "keyword" in keyword:
+                            k = keyword["keyword"]
+                            arg = k["arg"]
+                            airflow_operator[arg] = self.__take_value_or_tree(k["value"])
+
+        return airflow_operator
+
+    def __make_dependencies_bin_op(self, tree, dependencies):
+        children = []
+        parents = []
+        child = None
+        parent = None
+
+        if tree["op"] == "RShift":
+            child = self.__take_id_or_tree(tree["right"])
+            parent = self.__take_id_or_tree(tree["left"])
+        elif tree["op"] == "LShift":
+            child = self.__take_id_or_tree(tree["left"])
+            parent = self.__take_id_or_tree(tree["right"])
+
+        if child:
+            if isinstance(child, dict):
+                if "List" in child:
+                    for c in child["List"]["elts"]:
+                        children.append(self.__take_id_or_tree(c))
+                elif "BinOp" in child:
+                    deps = self.__make_dependencies_bin_op(child["BinOp"], dependencies)
+                    for dep in deps:
+                        children.append(dep)
+                else:
+                    children.append(self.__take_id_or_tree(child))
+            else:
+                children.append(child)
+
+        if parent:
+            if isinstance(parent, dict):
+                if "List" in parent:
+                    for p in parent["List"]["elts"]:
+                        parents.append(self.__take_id_or_tree(p))
+                elif "BinOp" in parent:
+                    deps = self.__make_dependencies_bin_op(parent["BinOp"], dependencies)
+                    for dep in deps:
+                        parents.append(dep)
+                else:
+                    parents.append(self.__take_id_or_tree(parent))
+            else:
+                parents.append(parent)
+
+        if len(parents) > 0 and len(children) > 0:
+            # make all-vs-all combinations
+            for p in parents:
+                for c in children:
+                    dep = {
+                        'parent': p,
+                        'child': c
+                    }
+                    dependencies.append(dep)
+
+        if tree["op"] == "RShift":
+            return children
+        elif tree["op"] == "LShift":
+            return parents
+        return children
+
+    def __extract_dep_operations(self, tree):
+        # extract dependency definition using ">>"
+        ops = self.__extract_bin_op(tree, "RShift")
+        if ops:
+            for op in ops:
+                deps = []
+                self.__make_dependencies_bin_op(op, deps)
+                for dep in deps:
+                    yield dep
+
+        # extract dependency definition using "<<"
+        ops = self.__extract_bin_op(tree, "LShift")
+        if ops:
+            for op in ops:
+                deps = []
+                self.__make_dependencies_bin_op(op, deps)
+                for dep in deps:
+                    yield dep
+
+    def __extract_dags(self, tree):
+        dags = {}
+        calls = self.__extract_func_calls(tree, "DAG")
+        if calls:
+            for call in calls:
+                dag = self.__make_dag(call)
+                dagid = dag["dag_id"]
+                dags[dagid] = dag
+        return dags
+
+    def __extract_XOS_event_sensors(self, tree):
+        operators = {}
+        calls = self.__extract_func_calls(tree, "XOSEventSensor")
+        if calls:
+            for call in calls:
+                operator = self.__make_airflow_operator(call)
+                operatorid = operator["task_id"]
+                operators[operatorid] = operator
+        return operators
+
+    def __extract_XOS_model_sensors(self, tree):
+        operators = {}
+        calls = self.__extract_func_calls(tree, "XOSModelSensor")
+        if calls:
+            for call in calls:
+                operator = self.__make_airflow_operator(call)
+                operatorid = operator["task_id"]
+                operators[operatorid] = operator
+        return operators
+
+    def __extract_airflow_operators(self, tree):
+        operators = {}
+        calls = self.__extract_func_calls_airflow_operators(tree)
+        if calls:
+            for call in calls:
+                operator = self.__make_airflow_operator(call)
+                operatorid = operator["task_id"]
+                operators[operatorid] = operator
+        return operators
+
+    def __extract_all_operators(self, tree):
+        operators = {}
+        event_sensors = self.__extract_XOS_event_sensors(tree)
+        if event_sensors:
+            for event_sensor in event_sensors:
+                operators[event_sensor] = event_sensors[event_sensor]
+
+        model_sensors = self.__extract_XOS_model_sensors(tree)
+        if model_sensors:
+            for model_sensor in model_sensors:
+                operators[model_sensor] = model_sensors[model_sensor]
+
+        airflow_operators = self.__extract_airflow_operators(tree)
+        if airflow_operators:
+            for airflow_operator in airflow_operators:
+                operators[airflow_operator] = airflow_operators[airflow_operator]
+
+        return operators
+
+    def __extract_dependencies(self, tree):
+        """
+        Build N-N dependencies from fragmented parent-child relations
+        A node can have multiple parents and multiple children
+        """
+        dependencies = {}
+        ops = self.__extract_dep_operations(tree)
+        if ops:
+            for op in ops:
+                p = op["parent"]
+                c = op["child"]
+
+                if p in dependencies:
+                    # append to an existing list
+                    node_p = dependencies[p]
+                    if "children" in node_p:
+                        # prevent duplicates
+                        if c not in node_p["children"]:
+                            node_p["children"].append(c)
+                    else:
+                        node_p["children"] = [c]
+                else:
+                    # create a new
+                    node_p = {
+                        'children': [c]
+                    }
+                    dependencies[p] = node_p
+
+                if c in dependencies:
+                    # append to an existing list
+                    node_c = dependencies[c]
+                    if "parents" in node_c:
+                        # prevent duplicates
+                        if p not in node_c["parents"]:
+                            node_c["parents"].append(p)
+                    else:
+                        node_c["parents"] = [p]
+                else:
+                    # create a new
+                    node_c = {
+                        'parents': [p]
+                    }
+                    dependencies[c] = node_c
+
+        return dependencies
+
+    def extract(self):
+        """
+        Build highlevel information of workflows dag, operators and dependencies refers to each other
+        """
+        if self.tree:
+            dags = self.__extract_dags(self.tree)
+            operators = self.__extract_all_operators(self.tree)
+            dependencies = self.__extract_dependencies(self.tree)
+
+            dag_dict = {}
+            for dag_id in dags:
+                dag = dags[dag_id]
+                dag_var = dag["local_variable"]
+
+                # filter operators that do not belong to the dag
+                my_operators = {}
+                my_operators_var = {}
+                for task_id in operators:
+                    operator = operators[task_id]
+                    if operator["dag"] == dag_var:
+                        # set dag_id
+                        operator["dag_id"] = dag_id
+                        my_operators[task_id] = operator
+
+                        # this is to help fast search while working with dependencies
+                        operator_local_var = operator["local_variable"]
+                        my_operators_var[operator_local_var] = operator
+
+                # filter dependencies that do not belong to the dag
+                my_dependencies = {}
+                for task_var in dependencies:
+                    if task_var in my_operators_var:
+                        dependency = dependencies[task_var]
+                        task_id = my_operators_var[task_var]["task_id"]
+
+                        # convert dependency task_var to task_id
+                        dep = {}
+                        if "children" in dependency:
+                            dep["children"] = []
+                            for child in dependency["children"]:
+                                if child in my_operators_var:
+                                    child_task_id = my_operators_var[child]["task_id"]
+                                    dep["children"].append(child_task_id)
+
+                        if "parents" in dependency:
+                            dep["parents"] = []
+                            for parent in dependency["parents"]:
+                                if parent in my_operators_var:
+                                    parent_task_id = my_operators_var[parent]["task_id"]
+                                    dep["parents"].append(parent_task_id)
+
+                        my_dependencies[task_id] = dep
+
+                d = {
+                    'dag': dag,
+                    'tasks': my_operators,
+                    'dependencies': my_dependencies
+                }
+                dag_dict[dag_id] = d
+
+            return dag_dict
+        else:
+            return None
+
+
+"""
+Command-line tool
+"""
+
+
+def print_graffiti():
+    result = pyfiglet.figlet_format("CORD\nWorkflow\nEssence\nExtractor", font="graffiti")
+    print(result)
+
+
+def get_arg_parser():
+    parser = argparse.ArgumentParser(description='CORD Workflow Essence Extractor.', prog='essence_extractor')
+    parser.add_argument('--config', help='locate a configuration file')
+    parser.add_argument('-o', '--output', help='output file path')
+    parser.add_argument('-c', '--stdout', action='store_true', help='output to console (STDOUT)')
+    parser.add_argument('input_file', help='input airflow dag source file')
+    return parser
+
+
+def read_config(path):
+    if os.path.exists(path):
+        with open(path) as json_config_file:
+            data = json.load(json_config_file)
+            return data
+    return {}
+
+
+def pretty_format_json(j):
+    dumps = json.dumps(j, sort_keys=True, indent=4, separators=(',', ': '))
+    return dumps
+
+
+# for command-line execution
+def main(args):
+    print_graffiti()
+
+    # check if config path is set
+    config_file_path = DEFAULT_CONFIG_FILE_PATH
+    if args.config:
+        config_file_path = args.config
+
+    if os.path.exists(config_file_path):
+        # read config
+        config = read_config(config_file_path)
+        if config:
+            global progargs
+            for k in progargs:
+                # overwrite; keep default when key absent from config
+                progargs[k] = config.get(k, progargs[k])
+
+    log = create_logger(progargs["logging"])
+
+    code_filepath = args.input_file
+    if not os.path.exists(code_filepath):
+        raise IOError('cannot find an input file - %s' % code_filepath)
+
+    extractor = EssenceExtractor(logger=log)
+    extractor.parse_codefile(code_filepath)
+    essence = extractor.extract()
+
+    output_filepath = './essence.json'
+    if args.output:
+        output_filepath = args.output
+
+    json_string = pretty_format_json(essence)
+    if args.stdout or output_filepath == '-':
+        print(json_string)
+    else:
+        with open(output_filepath, 'w') as f:
+            f.write(json_string)
+
+
+if __name__ == "__main__":
+    parser = get_arg_parser()
+    args = parser.parse_args()
+    main(args)
diff --git a/src/cord_workflow_airflow_extensions/kickstarter.py b/src/cord_workflow_airflow_extensions/kickstarter.py
new file mode 100644
index 0000000..2a0be45
--- /dev/null
+++ b/src/cord_workflow_airflow_extensions/kickstarter.py
@@ -0,0 +1,220 @@
+#!/usr/bin/env python3
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow Kickstarter
+
+This module kickstarts Airflow workflows for requests from Workflow Controller
+"""
+
+import json
+import os.path
+import argparse
+import pyfiglet
+import traceback
+import socket
+import time
+
+from multistructlog import create_logger
+from cord_workflow_controller_client.manager import Manager
+from airflow.api.client.json_client import Client as AirflowClient
+from requests.auth import HTTPBasicAuth
+from urllib.parse import urlparse
+
+
+log = create_logger()
+manager = None
+airflow_client = None
+
+progargs = {
+    'controller_url': 'http://localhost:3030',
+    'airflow_url': 'http://localhost:8080',
+    'airflow_user': '',
+    'airflow_passwd': '',
+    'logging': None
+}
+
+DEFAULT_CONFIG_FILE_PATH = '/etc/cord_workflow_airflow_extensions/config.json'
+SOCKET_CONNECTION_TEST_TIMEOUT = 5
+DEFAULT_CONNECTION_TEST_DELAY = 5
+DEFAULT_CONNECTION_TEST_RETRY = 999999
+
+
+def print_graffiti():
+    result = pyfiglet.figlet_format("CORD\nWorkflow\nKickstarter", font="graffiti")
+    print(result)
+
+
+def get_arg_parser():
+    parser = argparse.ArgumentParser(description='CORD Workflow Kickstarter Daemon.', prog='kickstarter')
+    parser.add_argument('--config', help='locate a configuration file')
+    parser.add_argument('--controller', help='CORD Workflow Controller URL')
+    parser.add_argument('--airflow', help='Airflow REST URL')
+    parser.add_argument('--airflow_user', help='User Name to access Airflow Web Interface')
+    parser.add_argument('--airflow_passwd', help='Password to access Airlfow Web Interface')
+    return parser
+
+
+def read_config(path):
+    if os.path.exists(path):
+        with open(path) as json_config_file:
+            data = json.load(json_config_file)
+            return data
+    return {}
+
+
+def pretty_format_json(j):
+    dumps = json.dumps(j, sort_keys=True, indent=4, separators=(',', ': '))
+    return dumps
+
+
+def is_port_open(url, timeout):
+    o = urlparse(url)
+    hostname = o.hostname
+    port = o.port
+
+    if (not port) or port <= 0:
+        if o.scheme.lower() == 'http':
+            port = 80
+        elif o.scheme.lower() == 'https':
+            port = 443
+
+    succeed = False
+    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+    sock.settimeout(timeout)
+    try:
+        sock.connect((hostname, int(port)))
+        sock.shutdown(socket.SHUT_RDWR)
+        succeed = True
+    except BaseException:
+        pass
+    finally:
+        sock.close()
+
+    return succeed
+
+
+def check_web_live(url,
+                   retry=DEFAULT_CONNECTION_TEST_RETRY,
+                   delay=DEFAULT_CONNECTION_TEST_DELAY,
+                   timeout=SOCKET_CONNECTION_TEST_TIMEOUT):
+    ipup = False
+    for _ in range(retry):
+        if is_port_open(url, timeout):
+            ipup = True
+            break
+        else:
+            time.sleep(delay)
+    return ipup
+
+
+def on_kickstart(workflow_id, workflow_run_id):
+    if manager and airflow_client:
+        try:
+            log.info('> Kickstarting a workflow (%s) => workflow run (%s)' % (workflow_id, workflow_run_id))
+
+            airflow_client.trigger_dag(dag_id=workflow_id, run_id=workflow_run_id)
+
+            # let controller know that the new workflow run is created
+            log.info('> Notifying a workflow (%s), a workflow run (%s)' % (workflow_id, workflow_run_id))
+            manager.notify_new_workflow_run(workflow_id, workflow_run_id)
+
+            log.info('> OK')
+        except Exception as e:
+            log.error('> Error : %s' % e)
+            log.debug(traceback.format_exc())
+
+
+# for command-line execution
+def main(args):
+    print_graffiti()
+
+    # check if config path is set
+    config_file_path = DEFAULT_CONFIG_FILE_PATH
+    if args.config:
+        config_file_path = args.config
+
+    if os.path.exists(config_file_path):
+        # read config
+        config = read_config(config_file_path)
+        if config:
+            global progargs
+            for k in progargs:
+                # overwrite; keep default when key absent from config
+                progargs[k] = config.get(k, progargs[k])
+
+    global log
+    log = create_logger(progargs["logging"])
+
+    if args.airflow:
+        progargs['airflow_url'] = args.airflow
+
+    if args.controller:
+        progargs['controller_url'] = args.controller
+
+    if args.airflow_user:
+        progargs['airflow_user'] = args.airflow_user
+
+    if args.airflow_passwd:
+        progargs['airflow_passwd'] = args.airflow_passwd
+
+    print('=CONFIG=')
+    config_json_string = pretty_format_json(progargs)
+    print(config_json_string)
+    print('\n')
+
+    # checking controller and airflow web interface
+    log.info('Checking if Workflow Controller (%s) is live...' % progargs['controller_url'])
+    controller_live = check_web_live(progargs['controller_url'])
+    if not controller_live:
+        log.error('Controller (%s) appears to be down' % progargs['controller_url'])
+        raise IOError('Controller (%s) appears to be down' % progargs['controller_url'])
+
+    log.info('Checking if Airflow (%s) is live...' % progargs['airflow_url'])
+    airflow_live = check_web_live(progargs['airflow_url'])
+    if not airflow_live:
+        log.error('Airflow (%s) appears to be down' % progargs['airflow_url'])
+        raise IOError('Airflow (%s) appears to be down' % progargs['airflow_url'])
+
+    # connect to workflow controller
+    log.info('Connecting to Workflow Controller (%s)...' % progargs['controller_url'])
+    global manager
+    manager = Manager(logger=log)
+    manager.connect(progargs['controller_url'])
+    manager.set_handlers({'kickstart': on_kickstart})
+
+    # connect to airflow
+    global airflow_client
+    log.info('Connecting to Airflow (%s)...' % progargs['airflow_url'])
+    http_auth = None
+    if progargs['airflow_user'] and progargs['airflow_passwd']:
+        log.info('Using a username %s' % progargs['airflow_user'])
+        http_auth = HTTPBasicAuth(progargs['airflow_user'], progargs['airflow_passwd'])
+
+    airflow_client = AirflowClient(progargs['airflow_url'], auth=http_auth)
+
+    log.info('Waiting for kickstart events from Workflow Controller...')
+    try:
+        manager.wait()
+    finally:
+        log.info('Terminating the program...')
+        manager.disconnect()
+
+
+if __name__ == "__main__":
+    parser = get_arg_parser()
+    args = parser.parse_args()
+    main(args)
diff --git a/src/cord_workflow_airflow_extensions/workflow_ctl.py b/src/cord_workflow_airflow_extensions/workflow_ctl.py
new file mode 100644
index 0000000..e32215b
--- /dev/null
+++ b/src/cord_workflow_airflow_extensions/workflow_ctl.py
@@ -0,0 +1,148 @@
+#!/usr/bin/env python3
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow Control CLI
+
+This module kickstarts Airflow workflows for requests from Workflow Controller
+"""
+
+import json
+import os.path
+import argparse
+
+from multistructlog import create_logger
+from cord_workflow_controller_client.manager import Manager
+
+
+log = create_logger()
+progargs = {
+    'controller_url': 'http://localhost:3030',
+    'airflow_url': 'http://localhost:8080',
+    'airflow_user': '',
+    'airflow_passwd': '',
+    'logging': None
+}
+
+DEFAULT_CONFIG_FILE_PATH = '/etc/cord_workflow_airflow_extensions/config.json'
+
+
+def get_arg_parser():
+    parser = argparse.ArgumentParser(description='CORD Workflow Control CLI.', prog='workflow_ctl')
+    parser.add_argument('--config', help='locate a configuration file')
+    parser.add_argument('--controller', help='CORD Workflow Controller URL')
+    parser.add_argument('--airflow', help='Airflow REST URL')
+    parser.add_argument('--airflow_user', help='User Name to access Airflow Web Interface')
+    parser.add_argument('--airflow_passwd', help='Password to access Airlfow Web Interface')
+    parser.add_argument('cmd', help='Command')
+    parser.add_argument('cmd_args', help='Arguments for the command', nargs='*')
+    return parser
+
+
+def read_config(path):
+    if os.path.exists(path):
+        with open(path) as json_config_file:
+            data = json.load(json_config_file)
+            return data
+    return {}
+
+
+def read_json_file(filename):
+    if filename:
+        with open(filename, 'r') as f:
+            return json.load(f)
+    return None
+
+
+def register_workflow(args):
+    # expect args should be a list of essence files
+    if not args:
+        raise ValueError('no essence file is given')
+
+    log.info('Connecting to Workflow Controller (%s)...' % progargs['controller_url'])
+    manager = Manager(logger=log)
+    connected = False
+    results = []
+
+    try:
+        manager.connect(progargs['controller_url'])
+        connected = True
+
+        for essence_file in args:
+            if not os.path.exists(essence_file):
+                log.error('cannot find the essence file (%s)' % essence_file)
+                continue
+
+            essence = read_json_file(essence_file)
+            log.info('Registering an essence file (%s)...' % essence_file)
+            result = manager.register_workflow_essence(essence)
+            if result:
+                log.info('registered an essence file (%s)' % essence_file)
+            else:
+                log.error('cannot register an essence file (%s)' % essence_file)
+
+            results.append(result)
+    finally:
+        if connected:
+            manager.disconnect()
+
+    return results
+
+
+# for command-line execution
+def main(args):
+    # check if config path is set
+    config_file_path = DEFAULT_CONFIG_FILE_PATH
+    if args.config:
+        config_file_path = args.config
+
+    if os.path.exists(config_file_path):
+        # read config
+        config = read_config(config_file_path)
+        if config:
+            global progargs
+            for k in progargs:
+                # overwrite; keep default when key absent from config
+                progargs[k] = config.get(k, progargs[k])
+
+    global log
+    log = create_logger(progargs["logging"])
+
+    if args.airflow:
+        progargs['airflow_url'] = args.airflow
+
+    if args.controller:
+        progargs['controller_url'] = args.controller
+
+    if args.airflow_user:
+        progargs['airflow_user'] = args.airflow_user
+
+    if args.airflow_passwd:
+        progargs['airflow_passwd'] = args.airflow_passwd
+
+    if args.cmd:
+        if args.cmd.strip().lower() in ['reg', 'register', 'register_workflow']:
+            results = register_workflow(args.cmd_args)
+            print(results)
+        else:
+            log.error('unknown command %s' % args.cmd)
+            raise ValueError('unknown command %s' % args.cmd)
+
+
+if __name__ == "__main__":
+    parser = get_arg_parser()
+    args = parser.parse_args()
+    main(args)
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/__init__.py b/test/__init__.py
similarity index 100%
copy from lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/__init__.py
copy to test/__init__.py
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/test_parse.py b/test/test_essence_extractor.py
similarity index 81%
rename from lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/test_parse.py
rename to test/test_essence_extractor.py
index 8190914..8024611 100644
--- a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/test_parse.py
+++ b/test/test_essence_extractor.py
@@ -15,13 +15,14 @@
 from __future__ import absolute_import
 import unittest
 import json
-import cordworkflowessenceextractor.workflow_essence_extractor as extractor
-
 import os
 import collections
 
+from cord_workflow_airflow_extensions.essence_extractor import EssenceExtractor
+
+
 test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
-examples_dir = os.path.join(test_path, "workflow-examples")
+examples_dir = os.path.join(test_path, "workflow_examples")
 extension_expected_result = ".expected.json"
 
 try:
@@ -30,6 +31,7 @@
     basestring = str
 
 
+# convert unicode string object to plain string object
 def convert(data):
     if isinstance(data, basestring):
         return str(data)
@@ -47,10 +49,9 @@
         return data
 
 
-class TestParse(unittest.TestCase):
-
+class TestEssenceExtractor(unittest.TestCase):
     """
-    Try parse all examples under workflow-examples dir.
+    Try extract essence from all examples under workflow-examples dir.
     Then compares results with expected solution.
     """
 
@@ -66,15 +67,16 @@
             return True
         return False
 
-    def test_parse(self):
+    def testExtract(self):
         dags = [f for f in os.listdir(examples_dir) if self.isDagFile(f)]
-
         for dag in dags:
             dag_path = os.path.join(examples_dir, dag)
-            tree = extractor.parse_codefile(dag_path)
-            workflow_info = extractor.extract_all(tree)
 
-            # check if its expected solution fil
+            essence_extractor = EssenceExtractor()
+            essence_extractor.parse_codefile(dag_path)
+            workflow_info = essence_extractor.extract()
+
+            # find its solution file
             expected_result_file = dag_path + extension_expected_result
             self.assertTrue(os.path.exists(expected_result_file))
 
diff --git a/test/test_kickstarter.py b/test/test_kickstarter.py
new file mode 100644
index 0000000..c121a57
--- /dev/null
+++ b/test/test_kickstarter.py
@@ -0,0 +1,41 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+
+from cord_workflow_airflow_extensions.kickstarter import check_web_live
+
+
+class TestKickstarter(unittest.TestCase):
+    """
+    Check if some private functions work.
+    """
+
+    def setUp(self):
+        # no fixture needed; check_web_live takes all inputs as arguments
+        pass
+
+    def tearDown(self):
+        pass
+
+    def testCheckWebLive(self):
+        # NOTE(review): this test requires outbound network access to
+        # google.com and will fail in offline/sandboxed CI — consider
+        # targeting a locally spawned HTTP server instead.
+        # NOTE(review): the three numeric args are presumably retry/interval
+        # settings — confirm against kickstarter.check_web_live's signature.
+        live = check_web_live('http://google.com', 1, 3, 3)
+        self.assertTrue(live, 'failed to connect to http://google.com')
+
+        # a port nothing listens on should report not-live after retries
+        live = check_web_live('http://google.com:1234', 2, 2, 3)
+        self.assertFalse(live, 'should fail but succeeded')
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/test/test_workflow_ctl.py b/test/test_workflow_ctl.py
new file mode 100644
index 0000000..38090d7
--- /dev/null
+++ b/test/test_workflow_ctl.py
@@ -0,0 +1,43 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+
+from cord_workflow_airflow_extensions.workflow_ctl import register_workflow
+
+
+class TestWorkflowCtl(unittest.TestCase):
+    """
+    Check if some private functions work.
+    """
+
+    def setUp(self):
+        # no fixture needed; register_workflow is exercised directly
+        pass
+
+    def tearDown(self):
+        pass
+
+    def testRegisterWorkflow(self):
+        # register_workflow(None) must reject invalid arguments by raising.
+        # NOTE(review): BaseException is overly broad — presumably needed
+        # because the code under test raises a plain string (a TypeError in
+        # Python 3); narrow this once workflow_ctl raises a proper exception.
+        failed = False
+        try:
+            register_workflow(None)
+        except BaseException:
+            failed = True
+
+        self.assertTrue(failed, 'invalid args should fail')
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/att_dag.py b/test/workflow_examples/att_dag.py
similarity index 100%
rename from lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/att_dag.py
rename to test/workflow_examples/att_dag.py
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/att_dag.py.expected.json b/test/workflow_examples/att_dag.py.expected.json
similarity index 100%
rename from lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/att_dag.py.expected.json
rename to test/workflow_examples/att_dag.py.expected.json
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag.py b/test/workflow_examples/left_right_mix_dag.py
similarity index 100%
rename from lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag.py
rename to test/workflow_examples/left_right_mix_dag.py
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag.py.expected.json b/test/workflow_examples/left_right_mix_dag.py.expected.json
similarity index 100%
rename from lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag.py.expected.json
rename to test/workflow_examples/left_right_mix_dag.py.expected.json
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag2.py b/test/workflow_examples/left_right_mix_dag2.py
similarity index 100%
rename from lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag2.py
rename to test/workflow_examples/left_right_mix_dag2.py
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag2.py.expected.json b/test/workflow_examples/left_right_mix_dag2.py.expected.json
similarity index 100%
rename from lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag2.py.expected.json
rename to test/workflow_examples/left_right_mix_dag2.py.expected.json
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/multi_children_parents_dag.py b/test/workflow_examples/multi_children_parents_dag.py
similarity index 100%
rename from lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/multi_children_parents_dag.py
rename to test/workflow_examples/multi_children_parents_dag.py
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/multi_children_parents_dag.py.expected.json b/test/workflow_examples/multi_children_parents_dag.py.expected.json
similarity index 100%
rename from lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/multi_children_parents_dag.py.expected.json
rename to test/workflow_examples/multi_children_parents_dag.py.expected.json
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/two_dags.py b/test/workflow_examples/two_dags.py
similarity index 100%
rename from lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/two_dags.py
rename to test/workflow_examples/two_dags.py
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/two_dags.py.expected.json b/test/workflow_examples/two_dags.py.expected.json
similarity index 100%
rename from lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/two_dags.py.expected.json
rename to test/workflow_examples/two_dags.py.expected.json
diff --git a/tox.ini b/tox.ini
index 2cf47b7..e99d1aa 100644
--- a/tox.ini
+++ b/tox.ini
@@ -13,23 +13,28 @@
 ; limitations under the License.
 
 [tox]
-envlist = py27,py35,py36,py37
-skip_missing_interpreters = True
+envlist = py27
+; other interpreters disabled for now: py35,py36,py37
+skip_missing_interpreters = true
 skipsdist = True
 
 [testenv]
 deps =
   -r requirements.txt
-;  -e lib/cord-workflow-essence-extractor
   nose2
   flake8
-  
+
 commands =
   nose2 -c tox.ini --verbose --junit-xml
-#  flake8
+  flake8
 
 [flake8]
 max-line-length = 119
+exclude =
+  .tox
+  build
+  workflow_examples
+  test/workflow_examples
 
 [unittest]
 plugins = nose2.plugins.junitxml
@@ -38,7 +43,10 @@
 path = nose2-results.xml
 
 [coverage]
-always-on = False
+always-on = True
+coverage =
+  src
+  test
 coverage-report =
   term
   xml
\ No newline at end of file
diff --git a/examples/README.md b/workflow_examples/README.md
similarity index 100%
rename from examples/README.md
rename to workflow_examples/README.md
diff --git a/examples/att-workflow/README.md b/workflow_examples/att-workflow/README.md
similarity index 100%
rename from examples/att-workflow/README.md
rename to workflow_examples/att-workflow/README.md
diff --git a/examples/att-workflow/__init__.py b/workflow_examples/att-workflow/__init__.py
similarity index 100%
rename from examples/att-workflow/__init__.py
rename to workflow_examples/att-workflow/__init__.py
diff --git a/examples/att-workflow/att_dag.py b/workflow_examples/att-workflow/att_dag.py
similarity index 100%
rename from examples/att-workflow/att_dag.py
rename to workflow_examples/att-workflow/att_dag.py
diff --git a/examples/att-workflow/att_helpers.py b/workflow_examples/att-workflow/att_helpers.py
similarity index 100%
rename from examples/att-workflow/att_helpers.py
rename to workflow_examples/att-workflow/att_helpers.py
diff --git a/examples/att-workflow/att_service_instance_funcs.py b/workflow_examples/att-workflow/att_service_instance_funcs.py
similarity index 100%
rename from examples/att-workflow/att_service_instance_funcs.py
rename to workflow_examples/att-workflow/att_service_instance_funcs.py