Implement workflow essence extractor
- Extract useful information from Airflow workflow code
- Produce the "essence" as JSON output (see the example below)
- The output is passed to the Workflow Controller for workflow management
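
For the bundled AT&T example DAG, the extracted essence looks like the
following (trimmed from the expected-output fixture added in this change):

  {
      "att_workflow_onu": {
          "dag": {
              "dag_id": "att_workflow_onu",
              "local_variable": "dag_att"
          },
          "dependencies": {
              "onu_event_handler_task": {
                  "children": ["onu_model_event_handler_task"]
              },
              ...
          },
          "tasks": {
              "onu_event_handler_task": {
                  "class": "XOSEventSensor",
                  "topic": "onu.events",
                  "python_callable": "ONU_event",
                  ...
              },
              ...
          }
      }
  }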

No changes to existing source code; this change only adds new files.

Change-Id: I01de9939fdf699522e81c369676c33c73a38b4bc
diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..7e0d89e
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,12 @@
+.noseids
+.vscode
+build
+dist
+.coverage
+coverage.xml
+cover
+.tox
+.DS_Store
+nose2-results.xml
+venv-service
+*.pyc
\ No newline at end of file
diff --git a/.gitreview b/.gitreview
new file mode 100644
index 0000000..1c0dbdc
--- /dev/null
+++ b/.gitreview
@@ -0,0 +1,5 @@
+[gerrit]
+host=gerrit.opencord.org
+port=29418
+project=cord-workflow-airflow.git
+defaultremote=origin
diff --git a/LICENSE.txt b/LICENSE.txt
new file mode 100644
index 0000000..261eeb9
--- /dev/null
+++ b/LICENSE.txt
@@ -0,0 +1,201 @@
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..5e0520b
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,58 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# set default shell
+SHELL = bash -e -o pipefail
+
+# Variables
+VERSION                  ?= $(shell cat ./VERSION)
+
+## Testing related
+CORDWORKFLOWAIRFLOW_LIBRARIES            := $(wildcard lib/*)
+
+# Targets
+all: test
+
+# Create a virtualenv and install all the libraries
+venv-workflowengine:
+	virtualenv $@;\
+    source ./$@/bin/activate ; set -u ;\
+    pip install -r requirements.txt nose2 ;\
+    pip install -e lib/cord-workflow-essence-extractor
+
+# tests
+test: lib-test unit-test
+
+lib-test:
+	for lib in $(CORDWORKFLOWAIRFLOW_LIBRARIES); do pushd $$lib; tox; popd; done
+
+unit-test:
+	tox
+
+clean:
+	find . -name '*.pyc' | xargs rm -f
+	find . -name '__pycache__' | xargs rm -rf
+	rm -rf \
+    .coverage \
+    coverage.xml \
+    nose2-results.xml \
+    venv-workflowengine \
+    lib/*/.tox \
+    lib/*/build \
+    lib/*/dist \
+    lib/*/*.egg-info \
+    lib/*/.coverage \
+    lib/*/coverage.xml \
+    lib/*/*results.xml \
+    lib/*/*/VERSION
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..ea09804
--- /dev/null
+++ b/README.md
@@ -0,0 +1,3 @@
+# cord_workflow_airflow
+
+A workflow engine for CORD implemented on top of Airflow.
diff --git a/VERSION b/VERSION
new file mode 100644
index 0000000..b9fd26f
--- /dev/null
+++ b/VERSION
@@ -0,0 +1 @@
+0.1.0-dev1
\ No newline at end of file
diff --git a/examples/README.md b/examples/README.md
new file mode 100644
index 0000000..cd1437d
--- /dev/null
+++ b/examples/README.md
@@ -0,0 +1,3 @@
+# Workflow examples
+
+This directory contains workflow examples.
diff --git a/examples/att-workflow/README.md b/examples/att-workflow/README.md
new file mode 100644
index 0000000..e211d2a
--- /dev/null
+++ b/examples/att-workflow/README.md
@@ -0,0 +1,5 @@
+# AT&T workflow example
+
+This is not a working version. This code only shows what a new workflow implementation will look like.
+
+The original working version of this code, implemented as a Synchronizer, is at [att-workflow-driver](https://github.com/opencord/att-workflow-driver).
\ No newline at end of file
diff --git a/examples/att-workflow/__init__.py b/examples/att-workflow/__init__.py
new file mode 100644
index 0000000..1eb71b1
--- /dev/null
+++ b/examples/att-workflow/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file
diff --git a/examples/att-workflow/att_dag.py b/examples/att-workflow/att_dag.py
new file mode 100644
index 0000000..7df5d03
--- /dev/null
+++ b/examples/att-workflow/att_dag.py
@@ -0,0 +1,198 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging  
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {
+    'start_date': datetime.utcnow(),
+    'owner': 'ATT',
+}
+
+dag_att = DAG(
+    dag_id='att_workflow_onu',
+    default_args=args,
+    # this dag will be triggered by external systems
+    schedule_interval=None,
+)
+
+dag_att.doc_md = __doc__
+
+
+def ONU_event(model_accessor, event, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info("onu.events: received event", event=event)
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    if event["status"] == "activated":
+        logging.info("onu.events: activated onu", value=event)
+        si.no_sync = False
+        si.uni_port_id = long(event["portNumber"])
+        si.of_dpid = event["deviceId"]
+        si.oper_onu_status = "ENABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    elif event["status"] == "disabled":
+        logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
+        si.oper_onu_status = "DISABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    else:
+        logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
+        raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+    
+    go = False
+    if event_type == 'create':
+        logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+        go = True
+    elif event_type == 'update':
+        logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+                          (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+        go = True
+    elif event_type == 'delete':
+        pass
+    else:
+        pass
+
+    if not go:
+        return
+
+    # handle only create & update events
+
+    # Changing ONU state can change auth state
+    # Changing auth state can change DHCP state
+    # So need to process in this order
+    process_onu_state(model_accessor, si)
+    process_auth_state(si)
+    process_dhcp_state(si)
+
+    validate_states(si)
+
+    # handling the subscriber status
+    # It's a combination of all the other states
+    subscriber = get_subscriber(model_accessor, si.serial_number)
+    if subscriber:
+        update_subscriber(model_accessor, subscriber, si)
+
+    si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info("authentication.events: Got event for subscriber", event_value=event)
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("authentication.events: Updating service instance", si=si)
+    si.authentication_state = event["authenticationState"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info("dhcp.events: Got event for subscriber", event_value=event)
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("dhcp.events: Updating service instance", si=si)
+    si.dhcp_state = event["messageType"]
+    si.ip_address = event["ipAddress"]
+    si.mac_address = event["macAddress"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(
+    task_id='onu_event_handler',
+    topic='onu.events',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=ONU_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+onu_model_event_handler = XOSModelSensor(
+    task_id='onu_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+auth_event_handler = XOSEventSensor(
+    task_id='auth_event_handler',
+    topic="authentication.events",
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=Auth_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+auth_model_event_handler = XOSModelSensor(
+    task_id='auth_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+dhcp_event_handler = XOSEventSensor(
+    task_id='dhcp_event_handler',
+    topic="dhcp.events",
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DHCP_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+dhcp_model_event_handler = XOSModelSensor(
+    task_id='dhcp_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+onu_event_handler >> onu_model_event_handler >> \
+    auth_event_handler >> auth_model_event_handler >> \
+    dhcp_event_handler >> dhcp_model_event_handler
\ No newline at end of file
diff --git a/examples/att-workflow/att_helpers.py b/examples/att-workflow/att_helpers.py
new file mode 100644
index 0000000..2abd2ab
--- /dev/null
+++ b/examples/att-workflow/att_helpers.py
@@ -0,0 +1,79 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from xossynchronizer.steps.syncstep import DeferredException
+
+def validate_onu(model_accessor, logging, att_si):
+    """
+    This method validates an ONU against the whitelist and sets the appropriate state.
+    It's expected that the deferred exception is managed in the caller method,
+    for example a model_policy or a sync_step.
+
+    :param att_si: AttWorkflowDriverServiceInstance
+    :return: [boolean, string]
+    """
+
+    oss_service = att_si.owner.leaf_model
+
+    # See if there is a matching entry in the whitelist.
+    matching_entries = model_accessor.AttWorkflowDriverWhiteListEntry.objects.filter(
+        owner_id=oss_service.id,
+    )
+    matching_entries = [e for e in matching_entries if e.serial_number.lower() == att_si.serial_number.lower()]
+
+    if len(matching_entries) == 0:
+        logging.warn("ONU not found in whitelist", object=str(att_si), serial_number=att_si.serial_number, **att_si.tologdict())
+        return [False, "ONU not found in whitelist"]
+
+    whitelisted = matching_entries[0]
+    try:
+        onu = model_accessor.ONUDevice.objects.get(serial_number=att_si.serial_number)
+        pon_port = onu.pon_port
+    except IndexError:
+        raise DeferredException("ONU device %s is not known to XOS yet" % att_si.serial_number)
+
+    if onu.admin_state == "ADMIN_DISABLED":
+        return [False, "ONU has been manually disabled"]
+
+    if pon_port.port_no != whitelisted.pon_port_id or att_si.of_dpid != whitelisted.device_id:
+        logging.warn("ONU disable as location don't match",
+                    object=str(att_si),
+                    serial_number=att_si.serial_number,
+                    pon_port=pon_port.port_no,
+                    whitelisted_pon_port=whitelisted.pon_port_id,
+                    device_id=att_si.of_dpid,
+                    whitelisted_device_id=whitelisted.device_id,
+                    **att_si.tologdict())
+        return [False, "ONU activated in wrong location"]
+
+    return [True, "ONU has been validated"]
+
+def find_or_create_att_si(model_accessor, logging, event):
+    try:
+        att_si = model_accessor.AttWorkflowDriverServiceInstance.objects.get(
+            serial_number=event["serialNumber"]
+        )
+        logging.debug("AttHelpers: Found existing AttWorkflowDriverServiceInstance", si=att_si)
+    except IndexError:
+        # create an AttWorkflowDriverServiceInstance, the validation will be
+        # triggered in the corresponding sync step
+        att_si = model_accessor.AttWorkflowDriverServiceInstance(
+            serial_number=event["serialNumber"],
+            of_dpid=event["deviceId"],
+            uni_port_id=long(event["portNumber"]),
+            # we assume there is only one AttWorkflowDriverService
+            owner=model_accessor.AttWorkflowDriverService.objects.first()
+        )
+        logging.debug("AttHelpers: Created new AttWorkflowDriverServiceInstance", si=att_si)
+    return att_si
\ No newline at end of file
diff --git a/examples/att-workflow/att_service_instance_funcs.py b/examples/att-workflow/att_service_instance_funcs.py
new file mode 100644
index 0000000..df179f9
--- /dev/null
+++ b/examples/att-workflow/att_service_instance_funcs.py
@@ -0,0 +1,190 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from att_helpers import *
+
+# Check the whitelist to see if the ONU is valid.  If it is, make sure that it's enabled.
+def process_onu_state(model_accessor, si):
+    [valid, message] = validate_onu(model_accessor, logging, si)
+    si.status_message = message
+    if valid:
+        si.admin_onu_state = "ENABLED"
+        update_onu(model_accessor, si.serial_number, "ENABLED")
+    else:
+        si.admin_onu_state = "DISABLED"
+        update_onu(model_accessor, si.serial_number, "DISABLED")
+
+
+# If the ONU has been disabled then we force re-authentication when it
+# is re-enabled.
+# Setting si.authentication_state = AWAITING:
+#   -> subscriber status = "awaiting_auth"
+#   -> service chain deleted
+#   -> need authentication to restore connectivity after ONU enabled
+def process_auth_state(si):
+    auth_msgs = {
+        "AWAITING": " - Awaiting Authentication",
+        "REQUESTED": " - Authentication requested",
+        "STARTED": " - Authentication started",
+        "APPROVED": " - Authentication succeeded",
+        "DENIED": " - Authentication denied"
+    }
+    if si.admin_onu_state == "DISABLED" or si.oper_onu_status == "DISABLED":
+        si.authentication_state = "AWAITING"
+    else:
+        si.status_message += auth_msgs[si.authentication_state]
+
+
+# The DhcpL2Relay ONOS app generates events that update the fields below.
+# It only sends events when it processes DHCP packets.  It keeps no internal state.
+# We reset dhcp_state when:
+# si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]
+#   -> subscriber status = "awaiting_auth"
+#   -> service chain not present
+#   -> subscriber's OLT flow rules, xconnect not present
+#   -> DHCP packets won't go through
+# Note, however, that the DHCP state at the endpoints is not changed.
+# A previously issued DHCP lease may still be valid.
+def process_dhcp_state(si):
+    if si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]:
+        si.ip_address = ""
+        si.mac_address = ""
+        si.dhcp_state = "AWAITING"
+
+
+# Make sure the object is in a legitimate state
+# It should be after the above processing steps
+# However this might still fail if an event has fired in the meantime
+# Valid states:
+# ONU       | Auth     | DHCP
+# ===============================
+# AWAITING  | AWAITING | AWAITING
+# ENABLED   | *        | AWAITING
+# ENABLED   | APPROVED | *
+# DISABLED  | AWAITING | AWAITING
+def validate_states(si):
+    if (si.admin_onu_state == "AWAITING" or si.admin_onu_state ==
+            "DISABLED") and si.authentication_state == "AWAITING" and si.dhcp_state == "AWAITING":
+        return
+    if si.admin_onu_state == "ENABLED" and (si.authentication_state == "APPROVED" or si.dhcp_state == "AWAITING"):
+        return
+    logging.warning(
+        "MODEL_POLICY (validate_states): invalid state combination",
+        onu_state=si.admin_onu_state,
+        auth_state=si.authentication_state,
+        dhcp_state=si.dhcp_state)
+
+
+def update_onu(model_accessor, serial_number, admin_state):
+    onu = [onu for onu in model_accessor.ONUDevice.objects.all() if onu.serial_number.lower()
+            == serial_number.lower()][0]
+    if onu.admin_state == "ADMIN_DISABLED":
+        logging.debug(
+            "MODEL_POLICY: ONUDevice [%s] has been manually disabled, not changing state to %s" %
+            (serial_number, admin_state))
+        return
+    if onu.admin_state == admin_state:
+        logging.debug(
+            "MODEL_POLICY: ONUDevice [%s] already has admin_state to %s" %
+            (serial_number, admin_state))
+    else:
+        logging.debug("MODEL_POLICY: setting ONUDevice [%s] admin_state to %s" % (serial_number, admin_state))
+        onu.admin_state = admin_state
+        onu.save_changed_fields(always_update_timestamp=True)
+
+
+def get_subscriber(model_accessor, serial_number):
+    try:
+        return [s for s in model_accessor.RCORDSubscriber.objects.all() if s.onu_device.lower()
+                == serial_number.lower()][0]
+    except IndexError:
+        # If the subscriber doesn't exist we don't do anything
+        logging.debug(
+            "MODEL_POLICY: subscriber does not exists for this SI, doing nothing",
+            onu_device=serial_number)
+        return None
+
+
+def update_subscriber_ip(model_accessor, subscriber, ip):
+    # TODO check if the subscriber has an IP and update it,
+    # or create a new one
+    try:
+        ip = model_accessor.RCORDIpAddress.objects.filter(
+            subscriber_id=subscriber.id,
+            ip=ip
+        )[0]
+        logging.debug("MODEL_POLICY: found existing RCORDIpAddress for subscriber",
+                            onu_device=subscriber.onu_device, subscriber_status=subscriber.status, ip=ip)
+        ip.save_changed_fields()
+    except IndexError:
+        logging.debug(
+            "MODEL_POLICY: Creating new RCORDIpAddress for subscriber",
+            onu_device=subscriber.onu_device,
+            subscriber_status=subscriber.status,
+            ip=ip)
+        ip = model_accessor.RCORDIpAddress(
+            subscriber_id=subscriber.id,
+            ip=ip,
+            description="DHCP Assigned IP Address"
+        )
+        ip.save()
+
+
+def delete_subscriber_ip(model_accessor, subscriber, ip):
+    try:
+        ip = model_accessor.RCORDIpAddress.objects.filter(
+            subscriber_id=subscriber.id,
+            ip=ip
+        )[0]
+        logging.debug(
+            "MODEL_POLICY: delete RCORDIpAddress for subscriber",
+            onu_device=subscriber.onu_device,
+            subscriber_status=subscriber.status,
+            ip=ip)
+        ip.delete()
+    except BaseException:
+        logging.warning("MODEL_POLICY: no RCORDIpAddress object found, cannot delete", ip=ip)
+
+
+def update_subscriber(model_accessor, subscriber, si):
+    cur_status = subscriber.status
+    # Don't change state if someone has disabled the subscriber
+    if subscriber.status != "disabled":
+        if si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]:
+            subscriber.status = "awaiting-auth"
+        elif si.authentication_state == "APPROVED":
+            subscriber.status = "enabled"
+        elif si.authentication_state == "DENIED":
+            subscriber.status = "auth-failed"
+
+        # NOTE we save the subscriber only if:
+        # - the status has changed
+        # - we get a DHCPACK event
+        if cur_status != subscriber.status or si.dhcp_state == "DHCPACK":
+            logging.debug(
+                "MODEL_POLICY: updating subscriber",
+                onu_device=subscriber.onu_device,
+                authentication_state=si.authentication_state,
+                subscriber_status=subscriber.status)
+            if subscriber.status == "awaiting-auth":
+                delete_subscriber_ip(model_accessor, subscriber, si.ip_address)
+                subscriber.mac_address = ""
+            elif si.ip_address and si.mac_address:
+                update_subscriber_ip(model_accessor, subscriber, si.ip_address)
+                subscriber.mac_address = si.mac_address
+            subscriber.save_changed_fields(always_update_timestamp=True)
+        else:
+            logging.debug("MODEL_POLICY: subscriber status has not changed", onu_device=subscriber.onu_device,
+                              authentication_state=si.authentication_state, subscriber_status=subscriber.status)
diff --git a/lib/cord-workflow-essence-extractor/.gitignore b/lib/cord-workflow-essence-extractor/.gitignore
new file mode 100644
index 0000000..f3a7ffa
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/.gitignore
@@ -0,0 +1,11 @@
+.noseids
+build
+cordworkflowessenceextractor.egg-info
+dist
+.coverage
+coverage.xml
+cover
+.DS_Store
+
+# setup.py copies this, don't commit it
+VERSION
\ No newline at end of file
diff --git a/lib/cord-workflow-essence-extractor/MANIFEST.in b/lib/cord-workflow-essence-extractor/MANIFEST.in
new file mode 100644
index 0000000..b3a0b9c
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/MANIFEST.in
@@ -0,0 +1,3 @@
+include README.rst
+include requirements.txt
+include cordworkflowessenceextractor/VERSION
\ No newline at end of file
diff --git a/lib/cord-workflow-essence-extractor/README.rst b/lib/cord-workflow-essence-extractor/README.rst
new file mode 100644
index 0000000..680d130
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/README.rst
@@ -0,0 +1,23 @@
+CORD Workflow Essence Extractor
+===============================
+
+Extracts the workflow essence from Airflow workflow code. The extracted essence
+is later passed to the Workflow Controller for workflow management.
+
+The essence extractor extracts the following information from Airflow workflow
+code written in Python (see the usage example below):
+
+- DAG information (DAG ID)
+- Operators (class name, event, and other parameters)
+- Task dependencies (parents and children)
+
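+Example usage (a minimal sketch; the module path and function names follow the
+test code bundled with this library)::
+
+    import json
+    import cordworkflowessenceextractor.workflow_essence_extractor as extractor
+
+    # parse a DAG file and extract its essence as a dict, then print it as JSON
+    tree = extractor.parse_codefile("att_dag.py")
+    workflow_info = extractor.extract_all(tree)
+    print(json.dumps(workflow_info, indent=4))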
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/__init__.py b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/__init__.py
new file mode 100644
index 0000000..19d1424
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/test_parse.py b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/test_parse.py
new file mode 100644
index 0000000..8190914
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/test_parse.py
@@ -0,0 +1,96 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+import json
+import cordworkflowessenceextractor.workflow_essence_extractor as extractor
+
+import os
+import collections
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
+examples_dir = os.path.join(test_path, "workflow-examples")
+extension_expected_result = ".expected.json"
+
+try:
+    basestring
+except NameError:
+    basestring = str
+
+
+def convert(data):
+    if isinstance(data, basestring):
+        return str(data)
+    elif isinstance(data, collections.Mapping):
+        v = {}
+        for item in data:
+            v[convert(item)] = convert(data[item])
+        return v
+    elif isinstance(data, collections.Iterable):
+        v = []
+        for item in data:
+            v.append(convert(item))
+        return v
+    else:
+        return data
+
+
+class TestParse(unittest.TestCase):
+
+    """
+    Try to parse all examples under the workflow-examples dir,
+    then compare the results against the expected output.
+    """
+
+    def setUp(self):
+        pass
+
+    def tearDown(self):
+        pass
+
+    def isDagFile(self, filepath):
+        _, file_extension = os.path.splitext(filepath)
+        if file_extension == ".py":
+            return True
+        return False
+
+    def test_parse(self):
+        dags = [f for f in os.listdir(examples_dir) if self.isDagFile(f)]
+
+        for dag in dags:
+            dag_path = os.path.join(examples_dir, dag)
+            tree = extractor.parse_codefile(dag_path)
+            workflow_info = extractor.extract_all(tree)
+
+            # check that its expected result file exists
+            expected_result_file = dag_path + extension_expected_result
+            self.assertTrue(os.path.exists(expected_result_file))
+
+            # compare content
+            with open(dag_path + extension_expected_result) as json_file:
+                # this builds a dict with unicode strings
+                expected_workflow_info_uni = json.load(json_file)
+                expected_workflow_info = convert(expected_workflow_info_uni)
+                if workflow_info != expected_workflow_info:
+                    print("Expected")
+                    print(expected_workflow_info)
+
+                    print("We got")
+                    print(workflow_info)
+                    self.fail("produced result is different")
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/att_dag.py b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/att_dag.py
new file mode 100644
index 0000000..c3bd3ea
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/att_dag.py
@@ -0,0 +1,198 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging  
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {
+    'start_date': datetime.utcnow(),
+    'owner': 'ATT',
+}
+
+dag_att = DAG(
+    dag_id='att_workflow_onu',
+    default_args=args,
+    # this dag will be triggered by external systems
+    schedule_interval=None,
+)
+
+dag_att.doc_md = __doc__
+
+
+def ONU_event(model_accessor, event, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info("onu.events: received event", event=event)
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    if event["status"] == "activated":
+        logging.info("onu.events: activated onu", value=event)
+        si.no_sync = False
+        si.uni_port_id = long(event["portNumber"])
+        si.of_dpid = event["deviceId"]
+        si.oper_onu_status = "ENABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    elif event["status"] == "disabled":
+        logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
+        si.oper_onu_status = "DISABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    else:
+        logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
+        raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+    
+    go = False
+    if event_type == 'create':
+        logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+        go = True
+    elif event_type == 'update':
+        logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+                          (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+        go = True
+    elif event_type == 'delete':
+        pass
+    else:
+        pass
+
+    if not go:
+        return
+
+    # handle only create & update events
+
+    # Changing ONU state can change auth state
+    # Changing auth state can change DHCP state
+    # So need to process in this order
+    process_onu_state(model_accessor, si)
+    process_auth_state(si)
+    process_dhcp_state(si)
+
+    validate_states(si)
+
+    # handling the subscriber status
+    # It's a combination of all the other states
+    subscriber = get_subscriber(model_accessor, si.serial_number)
+    if subscriber:
+        update_subscriber(model_accessor, subscriber, si)
+
+    si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info("authentication.events: Got event for subscriber", event_value=event)
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("authentication.events: Updating service instance", si=si)
+    si.authentication_state = event["authenticationState"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info("dhcp.events: Got event for subscriber", event_value=event)
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("dhcp.events: Updating service instance", si=si)
+    si.dhcp_state = event["messageType"]
+    si.ip_address = event["ipAddress"]
+    si.mac_address = event["macAddress"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(
+    task_id='onu_event_handler_task',
+    topic='onu.events',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=ONU_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+onu_model_event_handler = XOSModelSensor(
+    task_id='onu_model_event_handler_task',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+auth_event_handler = XOSEventSensor(
+    task_id='auth_event_handler_task',
+    topic="authentication.events",
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=Auth_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+auth_model_event_handler = XOSModelSensor(
+    task_id='auth_model_event_handler_task',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+dhcp_event_handler = XOSEventSensor(
+    task_id='dhcp_event_handler_task',
+    topic="dhcp.events",
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DHCP_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+dhcp_model_event_handler = XOSModelSensor(
+    task_id='dhcp_model_event_handler_task',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+onu_event_handler >> onu_model_event_handler >> \
+    auth_event_handler >> auth_model_event_handler >> \
+    dhcp_event_handler >> dhcp_model_event_handler
\ No newline at end of file
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/att_dag.py.expected.json b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/att_dag.py.expected.json
new file mode 100644
index 0000000..109b2a9
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/att_dag.py.expected.json
@@ -0,0 +1,126 @@
+{
+    "att_workflow_onu": {
+        "dag": {
+            "dag_id": "att_workflow_onu",
+            "local_variable": "dag_att"
+        },
+        "dependencies": {
+            "auth_event_handler_task": {
+                "children": [
+                    "auth_model_event_handler_task"
+                ],
+                "parents": [
+                    "onu_model_event_handler_task"
+                ]
+            },
+            "auth_model_event_handler_task": {
+                "children": [
+                    "dhcp_event_handler_task"
+                ],
+                "parents": [
+                    "auth_event_handler_task"
+                ]
+            },
+            "dhcp_event_handler_task": {
+                "children": [
+                    "dhcp_model_event_handler_task"
+                ],
+                "parents": [
+                    "auth_model_event_handler_task"
+                ]
+            },
+            "dhcp_model_event_handler_task": {
+                "parents": [
+                    "dhcp_event_handler_task"
+                ]
+            },
+            "onu_event_handler_task": {
+                "children": [
+                    "onu_model_event_handler_task"
+                ]
+            },
+            "onu_model_event_handler_task": {
+                "children": [
+                    "auth_event_handler_task"
+                ],
+                "parents": [
+                    "onu_event_handler_task"
+                ]
+            }
+        },
+        "tasks": {
+            "auth_event_handler_task": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "auth_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "Auth_event",
+                "task_id": "auth_event_handler_task",
+                "topic": "authentication.events"
+            },
+            "auth_model_event_handler_task": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "auth_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "auth_model_event_handler_task"
+            },
+            "dhcp_event_handler_task": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "dhcp_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DHCP_event",
+                "task_id": "dhcp_event_handler_task",
+                "topic": "dhcp.events"
+            },
+            "dhcp_model_event_handler_task": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "dhcp_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "dhcp_model_event_handler_task"
+            },
+            "onu_event_handler_task": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "onu_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "ONU_event",
+                "task_id": "onu_event_handler_task",
+                "topic": "onu.events"
+            },
+            "onu_model_event_handler_task": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "onu_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "onu_model_event_handler_task"
+            }
+        }
+    }
+}
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag.py b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag.py
new file mode 100644
index 0000000..e734d0d
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag.py
@@ -0,0 +1,200 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging  
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {
+    'start_date': datetime.utcnow(),
+    'owner': 'ATT',
+}
+
+dag_att = DAG(
+    dag_id='att_workflow_onu',
+    default_args=args,
+    # this dag will be triggered by external systems
+    schedule_interval=None,
+)
+
+dag_att.doc_md = __doc__
+
+
+def ONU_event(model_accessor, event, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info("onu.events: received event", event=event)
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    if event["status"] == "activated":
+        logging.info("onu.events: activated onu", value=event)
+        si.no_sync = False
+        si.uni_port_id = long(event["portNumber"])
+        si.of_dpid = event["deviceId"]
+        si.oper_onu_status = "ENABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    elif event["status"] == "disabled":
+        logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
+        si.oper_onu_status = "DISABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    else:
+        logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
+        raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+    
+    go = False
+    if event_type == 'create':
+        logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+        go = True
+    elif event_type == 'update':
+        logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+                          (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+        go = True
+    elif event_type == 'delete':
+        pass
+    else:
+        pass
+
+    if not go:
+        return
+
+    # handle only create & update events
+
+    # Changing ONU state can change auth state
+    # Changing auth state can change DHCP state
+    # So need to process in this order
+    process_onu_state(model_accessor, si)
+    process_auth_state(si)
+    process_dhcp_state(si)
+
+    validate_states(si)
+
+    # handling the subscriber status
+    # It's a combination of all the other states
+    subscriber = get_subscriber(model_accessor, si.serial_number)
+    if subscriber:
+        update_subscriber(model_accessor, subscriber, si)
+
+    si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info("authentication.events: Got event for subscriber", event_value=event)
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("authentication.events: Updating service instance", si=si)
+    si.authentication_state = event["authenticationState"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info("dhcp.events: Got event for subscriber", event_value=event)
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("dhcp.events: Updating service instance", si=si)
+    si.dhcp_state = event["messageType"]
+    si.ip_address = event["ipAddress"]
+    si.mac_address = event["macAddress"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(
+    task_id='onu_event_handler',
+    topic='onu.events',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=ONU_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+onu_model_event_handler = XOSModelSensor(
+    task_id='onu_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+auth_event_handler = XOSEventSensor(
+    task_id='auth_event_handler',
+    topic="authentication.events",
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=Auth_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+auth_model_event_handler = XOSModelSensor(
+    task_id='auth_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+dhcp_event_handler = XOSEventSensor(
+    task_id='dhcp_event_handler',
+    topic="dhcp.events",
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DHCP_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+dhcp_model_event_handler = XOSModelSensor(
+    task_id='dhcp_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+onu_event_handler >> onu_model_event_handler
+auth_event_handler << onu_model_event_handler
+auth_event_handler >> auth_model_event_handler
+dhcp_event_handler << auth_model_event_handler
+dhcp_event_handler >> dhcp_model_event_handler
\ No newline at end of file
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag.py.expected.json b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag.py.expected.json
new file mode 100644
index 0000000..05607ba
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag.py.expected.json
@@ -0,0 +1,126 @@
+{
+    "att_workflow_onu": {
+        "dag": {
+            "dag_id": "att_workflow_onu",
+            "local_variable": "dag_att"
+        },
+        "dependencies": {
+            "auth_event_handler": {
+                "children": [
+                    "auth_model_event_handler"
+                ],
+                "parents": [
+                    "onu_model_event_handler"
+                ]
+            },
+            "auth_model_event_handler": {
+                "children": [
+                    "dhcp_event_handler"
+                ],
+                "parents": [
+                    "auth_event_handler"
+                ]
+            },
+            "dhcp_event_handler": {
+                "children": [
+                    "dhcp_model_event_handler"
+                ],
+                "parents": [
+                    "auth_model_event_handler"
+                ]
+            },
+            "dhcp_model_event_handler": {
+                "parents": [
+                    "dhcp_event_handler"
+                ]
+            },
+            "onu_event_handler": {
+                "children": [
+                    "onu_model_event_handler"
+                ]
+            },
+            "onu_model_event_handler": {
+                "children": [
+                    "auth_event_handler"
+                ],
+                "parents": [
+                    "onu_event_handler"
+                ]
+            }
+        },
+        "tasks": {
+            "auth_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "auth_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "Auth_event",
+                "task_id": "auth_event_handler",
+                "topic": "authentication.events"
+            },
+            "auth_model_event_handler": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "auth_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "auth_model_event_handler"
+            },
+            "dhcp_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "dhcp_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DHCP_event",
+                "task_id": "dhcp_event_handler",
+                "topic": "dhcp.events"
+            },
+            "dhcp_model_event_handler": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "dhcp_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "dhcp_model_event_handler"
+            },
+            "onu_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "onu_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "ONU_event",
+                "task_id": "onu_event_handler",
+                "topic": "onu.events"
+            },
+            "onu_model_event_handler": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "onu_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "onu_model_event_handler"
+            }
+        }
+    }
+}
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag2.py b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag2.py
new file mode 100644
index 0000000..6fa1df1
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag2.py
@@ -0,0 +1,196 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging  
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {
+    'start_date': datetime.utcnow(),
+    'owner': 'ATT',
+}
+
+dag_att = DAG(
+    dag_id='att_workflow_onu',
+    default_args=args,
+    # this dag will be triggered by external systems
+    schedule_interval=None,
+)
+
+dag_att.doc_md = __doc__
+
+
+def ONU_event(model_accessor, event, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info("onu.events: received event", event=event)
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    if event["status"] == "activated":
+        logging.info("onu.events: activated onu", value=event)
+        si.no_sync = False
+        si.uni_port_id = long(event["portNumber"])
+        si.of_dpid = event["deviceId"]
+        si.oper_onu_status = "ENABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    elif event["status"] == "disabled":
+        logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
+        si.oper_onu_status = "DISABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    else:
+        logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
+        raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+    
+    go = False
+    if event_type == 'create':
+        logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+        go = True
+    elif event_type == 'update':
+        logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+                          (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+        go = True
+    elif event_type == 'delete':
+        pass
+    else:
+        pass
+
+    if not go:
+        return
+
+    # handle only create & update events
+
+    # Changing ONU state can change auth state
+    # Changing auth state can change DHCP state
+    # So need to process in this order
+    process_onu_state(model_accessor, si)
+    process_auth_state(si)
+    process_dhcp_state(si)
+
+    validate_states(si)
+
+    # handling the subscriber status
+    # It's a combination of all the other states
+    subscriber = get_subscriber(model_accessor, si.serial_number)
+    if subscriber:
+        update_subscriber(model_accessor, subscriber, si)
+
+    si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info("authentication.events: Got event for subscriber", event_value=event)
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("authentication.events: Updating service instance", si=si)
+    si.authentication_state = event["authenticationState"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info("dhcp.events: Got event for subscriber", event_value=event)
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("dhcp.events: Updating service instance", si=si)
+    si.dhcp_state = event["messageType"]
+    si.ip_address = event["ipAddress"]
+    si.mac_address = event["macAddress"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(
+    task_id='onu_event_handler',
+    topic='onu.events',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=ONU_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+onu_model_event_handler = XOSModelSensor(
+    task_id='onu_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+auth_event_handler = XOSEventSensor(
+    task_id='auth_event_handler',
+    topic="authentication.events",
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=Auth_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+auth_model_event_handler = XOSModelSensor(
+    task_id='auth_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+dhcp_event_handler = XOSEventSensor(
+    task_id='dhcp_event_handler',
+    topic="dhcp.events",
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DHCP_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+dhcp_model_event_handler = XOSModelSensor(
+    task_id='dhcp_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+onu_event_handler >> onu_model_event_handler << auth_event_handler
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag2.py.expected.json b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag2.py.expected.json
new file mode 100644
index 0000000..7d9430b
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/left_right_mix_dag2.py.expected.json
@@ -0,0 +1,100 @@
+{
+    "att_workflow_onu": {
+        "dag": {
+            "dag_id": "att_workflow_onu",
+            "local_variable": "dag_att"
+        },
+        "dependencies": {
+            "auth_event_handler": {
+                "children": [
+                    "onu_model_event_handler"
+                ]
+            },
+            "onu_event_handler": {
+                "children": [
+                    "onu_model_event_handler"
+                ]
+            },
+            "onu_model_event_handler": {
+                "parents": [
+                    "onu_event_handler",
+                    "auth_event_handler"
+                ]
+            }
+        },
+        "tasks": {
+            "auth_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "auth_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "Auth_event",
+                "task_id": "auth_event_handler",
+                "topic": "authentication.events"
+            },
+            "auth_model_event_handler": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "auth_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "auth_model_event_handler"
+            },
+            "dhcp_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "dhcp_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DHCP_event",
+                "task_id": "dhcp_event_handler",
+                "topic": "dhcp.events"
+            },
+            "dhcp_model_event_handler": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "dhcp_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "dhcp_model_event_handler"
+            },
+            "onu_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "onu_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "ONU_event",
+                "task_id": "onu_event_handler",
+                "topic": "onu.events"
+            },
+            "onu_model_event_handler": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "onu_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "onu_model_event_handler"
+            }
+        }
+    }
+}
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/multi_children_parents_dag.py b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/multi_children_parents_dag.py
new file mode 100644
index 0000000..8e59e22
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/multi_children_parents_dag.py
@@ -0,0 +1,197 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging  
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {
+    'start_date': datetime.utcnow(),
+    'owner': 'ATT',
+}
+
+dag_att = DAG(
+    dag_id='att_workflow_onu',
+    default_args=args,
+    # this dag will be triggered by external systems
+    schedule_interval=None,
+)
+
+dag_att.doc_md = __doc__
+
+
+def ONU_event(model_accessor, event, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info("onu.events: received event", event=event)
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    if event["status"] == "activated":
+        logging.info("onu.events: activated onu", value=event)
+        si.no_sync = False
+        si.uni_port_id = long(event["portNumber"])
+        si.of_dpid = event["deviceId"]
+        si.oper_onu_status = "ENABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    elif event["status"] == "disabled":
+        logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
+        si.oper_onu_status = "DISABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    else:
+        logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
+        raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+    
+    go = False
+    if event_type == 'create':
+        logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+        go = True
+    elif event_type == 'update':
+        logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+                          (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+        go = True
+    elif event_type == 'delete':
+        pass
+    else:
+        pass
+
+    if not go:
+        return
+
+    # handle only create & update events
+
+    # Changing ONU state can change auth state
+    # Changing auth state can change DHCP state
+    # So need to process in this order
+    process_onu_state(model_accessor, si)
+    process_auth_state(si)
+    process_dhcp_state(si)
+
+    validate_states(si)
+
+    # handling the subscriber status
+    # It's a combination of all the other states
+    subscriber = get_subscriber(model_accessor, si.serial_number)
+    if subscriber:
+        update_subscriber(model_accessor, subscriber, si)
+
+    si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info("authentication.events: Got event for subscriber", event_value=event)
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("authentication.events: Updating service instance", si=si)
+    si.authentication_state = event["authenticationState"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info("dhcp.events: Got event for subscriber", event_value=event)
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("dhcp.events: Updating service instance", si=si)
+    si.dhcp_state = event["messageType"]
+    si.ip_address = event["ipAddress"]
+    si.mac_address = event["macAddress"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(
+    task_id='onu_event_handler',
+    topic='onu.events',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=ONU_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+onu_model_event_handler = XOSModelSensor(
+    task_id='onu_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+auth_event_handler = XOSEventSensor(
+    task_id='auth_event_handler',
+    topic="authentication.events",
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=Auth_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+auth_model_event_handler = XOSModelSensor(
+    task_id='auth_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+dhcp_event_handler = XOSEventSensor(
+    task_id='dhcp_event_handler',
+    topic="dhcp.events",
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DHCP_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+dhcp_model_event_handler = XOSModelSensor(
+    task_id='dhcp_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+onu_event_handler >> [onu_model_event_handler, auth_model_event_handler, dhcp_model_event_handler] >> \
+    auth_event_handler >> dhcp_event_handler
\ No newline at end of file
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/multi_children_parents_dag.py.expected.json b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/multi_children_parents_dag.py.expected.json
new file mode 100644
index 0000000..e0dc284
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/multi_children_parents_dag.py.expected.json
@@ -0,0 +1,130 @@
+{
+    "att_workflow_onu": {
+        "dag": {
+            "dag_id": "att_workflow_onu",
+            "local_variable": "dag_att"
+        },
+        "dependencies": {
+            "auth_event_handler": {
+                "children": [
+                    "dhcp_event_handler"
+                ],
+                "parents": [
+                    "onu_model_event_handler",
+                    "auth_model_event_handler",
+                    "dhcp_model_event_handler"
+                ]
+            },
+            "auth_model_event_handler": {
+                "children": [
+                    "auth_event_handler"
+                ],
+                "parents": [
+                    "onu_event_handler"
+                ]
+            },
+            "dhcp_event_handler": {
+                "parents": [
+                    "auth_event_handler"
+                ]
+            },
+            "dhcp_model_event_handler": {
+                "children": [
+                    "auth_event_handler"
+                ],
+                "parents": [
+                    "onu_event_handler"
+                ]
+            },
+            "onu_event_handler": {
+                "children": [
+                    "onu_model_event_handler",
+                    "auth_model_event_handler",
+                    "dhcp_model_event_handler"
+                ]
+            },
+            "onu_model_event_handler": {
+                "children": [
+                    "auth_event_handler"
+                ],
+                "parents": [
+                    "onu_event_handler"
+                ]
+            }
+        },
+        "tasks": {
+            "auth_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "auth_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "Auth_event",
+                "task_id": "auth_event_handler",
+                "topic": "authentication.events"
+            },
+            "auth_model_event_handler": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "auth_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "auth_model_event_handler"
+            },
+            "dhcp_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "dhcp_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DHCP_event",
+                "task_id": "dhcp_event_handler",
+                "topic": "dhcp.events"
+            },
+            "dhcp_model_event_handler": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "dhcp_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "dhcp_model_event_handler"
+            },
+            "onu_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "onu_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "ONU_event",
+                "task_id": "onu_event_handler",
+                "topic": "onu.events"
+            },
+            "onu_model_event_handler": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "onu_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "onu_model_event_handler"
+            }
+        }
+    }
+}
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/two_dags.py b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/two_dags.py
new file mode 100644
index 0000000..1906881
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/two_dags.py
@@ -0,0 +1,184 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging  
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {
+    'start_date': datetime.utcnow(),
+    'owner': 'ATT',
+}
+
+dag_att1 = DAG(
+    dag_id='att_workflow_onu1',
+    default_args=args,
+    # this dag will be triggered by external systems
+    schedule_interval=None,
+)
+
+dag_att2 = DAG(
+    dag_id='att_workflow_onu2',
+    default_args=args,
+    # this dag will be triggered by external systems
+    schedule_interval=None,
+)
+
+dag_att1.doc_md = __doc__
+dag_att2.doc_md = __doc__
+
+def ONU_event(model_accessor, event, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info("onu.events: received event", event=event)
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    if event["status"] == "activated":
+        logging.info("onu.events: activated onu", value=event)
+        si.no_sync = False
+        si.uni_port_id = long(event["portNumber"])
+        si.of_dpid = event["deviceId"]
+        si.oper_onu_status = "ENABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    elif event["status"] == "disabled":
+        logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
+        si.oper_onu_status = "DISABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    else:
+        logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
+        raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+    
+    go = False
+    if event_type == 'create':
+        logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+        go = True
+    elif event_type == 'update':
+        logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+                          (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+        go = True
+    elif event_type == 'delete':
+        pass
+    else:
+        pass
+
+    if not go:
+        return
+
+    # handle only create & update events
+
+    # Changing ONU state can change auth state
+    # Changing auth state can change DHCP state
+    # So need to process in this order
+    process_onu_state(model_accessor, si)
+    process_auth_state(si)
+    process_dhcp_state(si)
+
+    validate_states(si)
+
+    # handling the subscriber status
+    # It's a combination of all the other states
+    subscriber = get_subscriber(model_accessor, si.serial_number)
+    if subscriber:
+        update_subscriber(model_accessor, subscriber, si)
+
+    si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info("authentication.events: Got event for subscriber", event_value=event)
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("authentication.events: Updating service instance", si=si)
+    si.authentication_state = event["authenticationState"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info("dhcp.events: Got event for subscriber", event_value=event)
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("dhcp.events: Updating service instance", si=si)
+    si.dhcp_state = event["messageType"]
+    si.ip_address = event["ipAddress"]
+    si.mac_address = event["macAddress"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(
+    task_id='onu_event_handler',
+    topic='onu.events',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=ONU_event,
+    poke_interval=5,
+    dag=dag_att1,
+)
+
+onu_model_event_handler = XOSModelSensor(
+    task_id='onu_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att1,
+)
+
+auth_event_handler = XOSEventSensor(
+    task_id='auth_event_handler',
+    topic="authentication.events",
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=Auth_event,
+    poke_interval=5,
+    dag=dag_att2,
+)
+
+auth_model_event_handler = XOSModelSensor(
+    task_id='auth_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att2,
+)
+
+onu_event_handler >> onu_model_event_handler
+auth_event_handler >> auth_model_event_handler
diff --git a/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/two_dags.py.expected.json b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/two_dags.py.expected.json
new file mode 100644
index 0000000..dac7c12
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cord-workflow-essence-extractor-tests/workflow-examples/two_dags.py.expected.json
@@ -0,0 +1,90 @@
+{
+    "att_workflow_onu1": {
+        "dag": {
+            "dag_id": "att_workflow_onu1",
+            "local_variable": "dag_att1"
+        },
+        "dependencies": {
+            "onu_event_handler": {
+                "children": [
+                    "onu_model_event_handler"
+                ]
+            },
+            "onu_model_event_handler": {
+                "parents": [
+                    "onu_event_handler"
+                ]
+            }
+        },
+        "tasks": {
+            "onu_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att1",
+                "dag_id": "att_workflow_onu1",
+                "key_field": "serialNumber",
+                "local_variable": "onu_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "ONU_event",
+                "task_id": "onu_event_handler",
+                "topic": "onu.events"
+            },
+            "onu_model_event_handler": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att1",
+                "dag_id": "att_workflow_onu1",
+                "key_field": "serialNumber",
+                "local_variable": "onu_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "onu_model_event_handler"
+            }
+        }
+    },
+    "att_workflow_onu2": {
+        "dag": {
+            "dag_id": "att_workflow_onu2",
+            "local_variable": "dag_att2"
+        },
+        "dependencies": {
+            "auth_event_handler": {
+                "children": [
+                    "auth_model_event_handler"
+                ]
+            },
+            "auth_model_event_handler": {
+                "parents": [
+                    "auth_event_handler"
+                ]
+            }
+        },
+        "tasks": {
+            "auth_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att2",
+                "dag_id": "att_workflow_onu2",
+                "key_field": "serialNumber",
+                "local_variable": "auth_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "Auth_event",
+                "task_id": "auth_event_handler",
+                "topic": "authentication.events"
+            },
+            "auth_model_event_handler": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att2",
+                "dag_id": "att_workflow_onu2",
+                "key_field": "serialNumber",
+                "local_variable": "auth_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "auth_model_event_handler"
+            }
+        }
+    }
+}
diff --git a/lib/cord-workflow-essence-extractor/cordworkflowessenceextractor/__init__.py b/lib/cord-workflow-essence-extractor/cordworkflowessenceextractor/__init__.py
new file mode 100644
index 0000000..a0a748a
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cordworkflowessenceextractor/__init__.py
@@ -0,0 +1,18 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from .workflow_essence_extractor import parse_codefile, extract_all
+
+__all__ = ["parse_codefile", "extract_all"]
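+
+# Example usage (a minimal sketch; the input path is hypothetical):
+#
+#   from cordworkflowessenceextractor import parse_codefile, extract_all
+#
+#   tree = parse_codefile("att_workflow_onu.py")  # JSON-ified AST of the workflow code
+#   essence = extract_all(tree)                   # dict keyed by dag_id: dag/tasks/dependencies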
diff --git a/lib/cord-workflow-essence-extractor/cordworkflowessenceextractor/workflow_essence_extractor.py b/lib/cord-workflow-essence-extractor/cordworkflowessenceextractor/workflow_essence_extractor.py
new file mode 100644
index 0000000..bb62493
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/cordworkflowessenceextractor/workflow_essence_extractor.py
@@ -0,0 +1,552 @@
+#!/usr/bin/env python3
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow Essence Extractor
+
+This module extracts the "essence" of Airflow workflow code.
+The following information is extracted from the workflow source:
+- DAG info
+- Operator info
+    - XOS-related operators
+    - Airflow operators
+- Dependency info
+"""
+
+import ast
+import sys
+import json
+import os.path
+
+
+def classname(cls):
+    return cls.__class__.__name__
+
+
+def jsonify_ast(node, level=0):
+    fields = {}
+    for k in node._fields:
+        fields[k] = '...'
+        v = getattr(node, k)
+        if isinstance(v, ast.AST):
+            if v._fields:
+                fields[k] = jsonify_ast(v)
+            else:
+                fields[k] = classname(v)
+
+        elif isinstance(v, list):
+            fields[k] = []
+            for e in v:
+                fields[k].append(jsonify_ast(e))
+
+        elif isinstance(v, str):
+            fields[k] = v
+
+        elif isinstance(v, int) or isinstance(v, float):
+            fields[k] = v
+
+        elif v is None:
+            fields[k] = None
+
+        else:
+            fields[k] = 'unrecognized'
+
+    ret = {
+        classname(node): fields
+    }
+    return ret
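+
+# For illustration, "x = 1" is converted into nested dicts keyed by AST class names,
+# roughly (on Python versions where integer literals appear as Num nodes, which is what
+# the helper functions below assume):
+#   {"Module": {"body": [{"Assign": {"targets": [...], "value": {"Num": {"n": 1}}}}]}}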
+
+
+def parse(code):
+    lines = code.split("\n")
+    if len(lines) == 1:
+        if code.endswith(".py") and os.path.exists(code):
+            return parse_codefile(code)
+    return parse_code(code)
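+
+# Example: parse() accepts either inline code or a path to an existing .py file, e.g.
+#   parse("dag_att = DAG(dag_id='att_workflow_onu')")
+#   parse("workflow-examples/att_dag.py")  # hypothetical path; must exist and end with ".py"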
+
+
+def parse_code(code):
+    tree = ast.parse(code)
+    return jsonify_ast(tree)
+
+
+def parse_codefile(code_filepath):
+    code = None
+    with open(code_filepath, "r") as f:
+        code = f.read()
+    tree = ast.parse(code, code_filepath)
+    return jsonify_ast(tree)
+
+
+def pretty_print_json(j):
+    dumps = json.dumps(j, sort_keys=True, indent=4, separators=(',', ': '))
+    print(dumps)
+
+
+def recursively_find_elements(tree, elem):
+    """
+    traverse the JSON-ified AST and yield every subtree stored under the given element name
+    """
+    for e in tree:
+        obj = None
+        if isinstance(tree, list):
+            obj = e
+        elif isinstance(tree, dict):
+            obj = tree[e]
+
+        if e == elem:
+            yield obj
+
+        if obj and (isinstance(obj, list) or isinstance(obj, dict)):
+            for y in recursively_find_elements(obj, elem):
+                yield y
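+
+# Example: recursively_find_elements(tree, "Assign") yields the subtree stored under
+# every "Assign" key in the JSON-ified AST, i.e. every assignment statement.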
+
+
+def extract_func_calls(tree, func_name):
+    """
+    extract assignments whose right-hand side is a call to the given function
+    """
+    assigns = recursively_find_elements(tree, "Assign")
+    if assigns:
+        for assign in assigns:
+            found = False
+
+            calls = recursively_find_elements(assign, "Call")
+            if calls:
+                for call in calls:
+                    funcs = recursively_find_elements(call, "func")
+                    if funcs:
+                        for func in funcs:
+                            if "Name" in func:
+                                name = func["Name"]
+                                if "ctx" in name and "id" in name:
+                                    # found function
+                                    if name["id"] == func_name:
+                                        found = True
+
+            if found:
+                yield assign
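+
+# Example: for the workflow samples in this repository, extract_func_calls(tree, "DAG")
+# yields the "Assign" subtree of a statement such as
+#   dag_att = DAG(dag_id='att_workflow_onu', ...)
+# and extract_func_calls(tree, "XOSEventSensor") yields the sensor assignments.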
+
+
+def extract_func_calls_airflow_operators(tree):
+    """
+    extract assignments that instantiate Airflow operators (class names ending with "Operator" or "Sensor")
+    """
+    assigns = recursively_find_elements(tree, "Assign")
+    if assigns:
+        for assign in assigns:
+            found = False
+
+            calls = recursively_find_elements(assign, "Call")
+            if calls:
+                for call in calls:
+                    funcs = recursively_find_elements(call, "func")
+                    if funcs:
+                        for func in funcs:
+                            if "Name" in func:
+                                name = func["Name"]
+                                if "ctx" in name and "id" in name:
+                                    # found function
+                                    if name["id"].endswith(("Operator", "Sensor")):
+                                        found = True
+
+            if found:
+                yield assign
+
+
+def extract_bin_op(tree, op_name):
+    """
+    extract binary operations such as >> (RShift) and << (LShift)
+    """
+    ops = recursively_find_elements(tree, "BinOp")
+    if ops:
+        for op in ops:
+            if op["op"] == op_name:
+                yield op
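+
+# Example: a dependency line such as
+#   onu_event_handler >> onu_model_event_handler
+# parses into a "BinOp" subtree whose "op" is "RShift"; extract_bin_op(tree, "RShift")
+# yields that subtree.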
+
+
+def take_string_or_tree(tree):
+    if "Str" in tree:
+        return tree["Str"]["s"]
+    return tree
+
+
+def take_num_or_tree(tree):
+    if "Num" in tree:
+        return tree["Num"]["n"]
+    return tree
+
+
+def take_id_or_tree(tree):
+    if "Name" in tree:
+        return tree["Name"]["id"]
+    return tree
+
+
+def take_name_constant_or_tree(tree):
+    if "NameConstant" in tree:
+        return tree["NameConstant"]["value"]
+    return tree
+
+
+def take_value_or_tree(tree):
+    if "Str" in tree:
+        return tree["Str"]["s"]
+    elif "Num" in tree:
+        return tree["Num"]["n"]
+    elif "Name" in tree:
+        val = tree["Name"]["id"]
+        if val in ["True", "False"]:
+            # bool("False") would be True; compare the string explicitly
+            return val == "True"
+        elif val == "None":
+            return None
+        return val
+    elif "NameConstant" in tree:
+        val = tree["NameConstant"]["value"]
+        if val in ["True", "False"]:
+            return val == "True"
+        elif val == "None":
+            return None
+        return val
+    elif "List" in tree:
+        vals = []
+        if "elts" in tree["List"]:
+            elts = tree["List"]["elts"]
+            for elt in elts:
+                val = take_value_or_tree(elt)
+                vals.append(val)
+        return vals
+    return tree
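+
+# Conversions performed above: string literals return their text, numbers their value,
+# True/False/None their Python equivalents, and lists a list of converted elements;
+# anything unrecognized is returned as the raw subtree.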
+
+
+def make_dag(tree):
+    loc_val = None
+    dag_id = None
+
+    if "targets" in tree:
+        targets = tree["targets"]
+        loc_val = take_id_or_tree(targets[0])
+
+    if "value" in tree:
+        value = tree["value"]
+        if "Call" in value:
+            call = value["Call"]
+            if "keywords" in call:
+                keywords = call["keywords"]
+                for keyword in keywords:
+                    if "keyword" in keyword:
+                        k = keyword["keyword"]
+                        if k["arg"] == "dag_id":
+                            dag_id = take_string_or_tree(k["value"])
+
+    return {
+        'local_variable': loc_val,
+        'dag_id': dag_id
+    }
+
+
+def make_airflow_operator(tree):
+    airflow_operator = {}
+
+    if "targets" in tree:
+        targets = tree["targets"]
+        loc_val = take_id_or_tree(targets[0])
+        airflow_operator["local_variable"] = loc_val
+
+    if "value" in tree:
+        value = tree["value"]
+        if "Call" in value:
+            call = value["Call"]
+            if "func" in call:
+                class_name = take_id_or_tree(call["func"])
+                airflow_operator["class"] = class_name
+
+            if "keywords" in call:
+                keywords = call["keywords"]
+                for keyword in keywords:
+                    if "keyword" in keyword:
+                        k = keyword["keyword"]
+                        arg = k["arg"]
+                        airflow_operator[arg] = take_value_or_tree(k["value"])
+
+    return airflow_operator
+
+
+def make_dependencies_bin_op(tree, dependencies):
+    children = []
+    parents = []
+    child = None
+    parent = None
+
+    if tree["op"] == "RShift":
+        child = take_id_or_tree(tree["right"])
+        parent = take_id_or_tree(tree["left"])
+    elif tree["op"] == "LShift":
+        child = take_id_or_tree(tree["left"])
+        parent = take_id_or_tree(tree["right"])
+
+    if child:
+        if isinstance(child, dict):
+            if "List" in child:
+                for c in child["List"]["elts"]:
+                    children.append(take_id_or_tree(c))
+            elif "BinOp" in child:
+                deps = make_dependencies_bin_op(child["BinOp"], dependencies)
+                for dep in deps:
+                    children.append(dep)
+            else:
+                children.append(take_id_or_tree(child))
+        else:
+            children.append(child)
+
+    if parent:
+        if isinstance(parent, dict):
+            if "List" in parent:
+                for p in parent["List"]["elts"]:
+                    parents.append(take_id_or_tree(p))
+            elif "BinOp" in parent:
+                deps = make_dependencies_bin_op(parent["BinOp"], dependencies)
+                for dep in deps:
+                    parents.append(dep)
+            else:
+                parents.append(take_id_or_tree(parent))
+        else:
+            parents.append(parent)
+
+    if len(parents) > 0 and len(children) > 0:
+        # make all-vs-all combinations
+        for p in parents:
+            for c in children:
+                dep = {
+                    'parent': p,
+                    'child': c
+                }
+                dependencies.append(dep)
+
+    if tree["op"] == "RShift":
+        return children
+    elif tree["op"] == "LShift":
+        return parents
+    return children
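+
+# Example: for a line such as
+#   onu_event_handler >> [auth_event_handler, dhcp_event_handler]
+# this appends {'parent': 'onu_event_handler', 'child': 'auth_event_handler'} and
+# {'parent': 'onu_event_handler', 'child': 'dhcp_event_handler'} to `dependencies`.
+# Chained expressions (a >> b >> c) are handled by recursing into the nested BinOp.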
+
+
+def extract_dep_operations(tree):
+    # extract dependency definition using ">>"
+    ops = extract_bin_op(tree, "RShift")
+    if ops:
+        for op in ops:
+            deps = []
+            make_dependencies_bin_op(op, deps)
+            for dep in deps:
+                yield dep
+
+    # extract dependency definition using "<<"
+    ops = extract_bin_op(tree, "LShift")
+    if ops:
+        for op in ops:
+            deps = []
+            make_dependencies_bin_op(op, deps)
+            for dep in deps:
+                yield dep
+
+
+def extract_dags(tree):
+    dags = {}
+    calls = extract_func_calls(tree, "DAG")
+    if calls:
+        for call in calls:
+            dag = make_dag(call)
+            dagid = dag["dag_id"]
+            dags[dagid] = dag
+    return dags
+
+
+def extract_XOS_event_sensors(tree):
+    operators = {}
+    calls = extract_func_calls(tree, "XOSEventSensor")
+    if calls:
+        for call in calls:
+            operator = make_airflow_operator(call)
+            operatorid = operator["task_id"]
+            operators[operatorid] = operator
+    return operators
+
+
+def extract_XOS_model_sensors(tree):
+    operators = {}
+    calls = extract_func_calls(tree, "XOSModelSensor")
+    if calls:
+        for call in calls:
+            operator = make_airflow_operator(call)
+            operatorid = operator["task_id"]
+            operators[operatorid] = operator
+    return operators
+
+
+def extract_airflow_operators(tree):
+    operators = {}
+    calls = extract_func_calls_airflow_operators(tree)
+    if calls:
+        for call in calls:
+            operator = make_airflow_operator(call)
+            operatorid = operator["task_id"]
+            operators[operatorid] = operator
+    return operators
+
+
+def extract_all_operators(tree):
+    operators = {}
+    event_sensors = extract_XOS_event_sensors(tree)
+    if event_sensors:
+        for event_sensor in event_sensors:
+            operators[event_sensor] = event_sensors[event_sensor]
+
+    model_sensors = extract_XOS_model_sensors(tree)
+    if model_sensors:
+        for model_sensor in model_sensors:
+            operators[model_sensor] = model_sensors[model_sensor]
+
+    airflow_operators = extract_airflow_operators(tree)
+    if airflow_operators:
+        for airflow_operator in airflow_operators:
+            operators[airflow_operator] = airflow_operators[airflow_operator]
+
+    return operators
+
+
+def extract_dependencies(tree):
+    """
+    Build an N-to-N dependency map from the fragmented parent-child pairs.
+    A node can have multiple parents and multiple children.
+    """
+    dependencies = {}
+    ops = extract_dep_operations(tree)
+    if ops:
+        for op in ops:
+            p = op["parent"]
+            c = op["child"]
+
+            if p in dependencies:
+                # append to an existing list
+                node_p = dependencies[p]
+                if "children" in node_p:
+                    # prevent duplicates
+                    if c not in node_p["children"]:
+                        node_p["children"].append(c)
+                else:
+                    node_p["children"] = [c]
+            else:
+                # create a new node
+                node_p = {
+                    'children': [c]
+                }
+                dependencies[p] = node_p
+
+            if c in dependencies:
+                # append to an existing list
+                node_c = dependencies[c]
+                if "parents" in node_c:
+                    # prevent duplicates
+                    if p not in node_c["parents"]:
+                        node_c["parents"].append(p)
+                else:
+                    node_c["parents"] = [p]
+            else:
+                # create a new node
+                node_c = {
+                    'parents': [p]
+                }
+                dependencies[c] = node_c
+
+    return dependencies
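+
+# Example: merging the pairs a->b and a->c yields
+#   {'a': {'children': ['b', 'c']}, 'b': {'parents': ['a']}, 'c': {'parents': ['a']}}
+# Keys are task local-variable names at this stage; extract_all() maps them to task_ids.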
+
+
+def extract_all(tree):
+    """
+    Build high-level workflow information: for each DAG, its operators (tasks)
+    and the dependencies between them, cross-referenced by task_id.
+    """
+    dags = extract_dags(tree)
+    operators = extract_all_operators(tree)
+    dependencies = extract_dependencies(tree)
+
+    dag_dict = {}
+    for dag_id in dags:
+        dag = dags[dag_id]
+        dag_var = dag["local_variable"]
+
+        # filter operators that do not belong to the dag
+        my_operators = {}
+        my_operators_var = {}
+        for task_id in operators:
+            operator = operators[task_id]
+            if operator["dag"] == dag_var:
+                # set dag_id
+                operator["dag_id"] = dag_id
+                my_operators[task_id] = operator
+
+                # index operators by local variable name for fast lookup when resolving dependencies
+                operator_local_var = operator["local_variable"]
+                my_operators_var[operator_local_var] = operator
+
+        # filter dependencies that do not belong to the dag
+        my_dependencies = {}
+        for task_var in dependencies:
+            if task_var in my_operators_var:
+                dependency = dependencies[task_var]
+                task_id = my_operators_var[task_var]["task_id"]
+
+                # convert dependency task_var to task_id
+                dep = {}
+                if "children" in dependency:
+                    dep["children"] = []
+                    for child in dependency["children"]:
+                        if child in my_operators_var:
+                            child_task_id = my_operators_var[child]["task_id"]
+                            dep["children"].append(child_task_id)
+
+                if "parents" in dependency:
+                    dep["parents"] = []
+                    for parent in dependency["parents"]:
+                        if parent in my_operators_var:
+                            parent_task_id = my_operators_var[parent]["task_id"]
+                            dep["parents"].append(parent_task_id)
+
+                my_dependencies[task_id] = dep
+
+        d = {
+            'dag': dag,
+            'tasks': my_operators,
+            'dependencies': my_dependencies
+        }
+        dag_dict[dag_id] = d
+
+    return dag_dict
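+
+# A rough sketch of the returned structure (hypothetical ids):
+#     {"some_dag_id": {
+#         "dag": {...},            # DAG attributes, including its local variable name
+#         "tasks": {...},          # operators keyed by task_id, each tagged with its dag_id
+#         "dependencies": {...}    # per task_id: {"children": [...], "parents": [...]}
+#     }}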
+
+
+# for command-line execution
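+# A hypothetical invocation (the module filename is not shown in this hunk):
+#     python <essence_extractor_module>.py /path/to/airflow_workflow.py
+# It parses the given workflow code and pretty-prints the extracted essence as JSON.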
+def main(argv):
+    if len(argv) < 1:
+        sys.exit("Error: Need a filepath")
+
+    code_filepath = argv[0]
+
+    tree = parse_codefile(code_filepath)
+    essence = extract_all(tree)
+    pretty_print_json(essence)
+
+
+if __name__ == "__main__":
+    main(sys.argv[1:])
diff --git a/lib/cord-workflow-essence-extractor/requirements.txt b/lib/cord-workflow-essence-extractor/requirements.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/requirements.txt
diff --git a/lib/cord-workflow-essence-extractor/setup.py b/lib/cord-workflow-essence-extractor/setup.py
new file mode 100644
index 0000000..1e0311d
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/setup.py
@@ -0,0 +1,57 @@
+# Copyright 2018-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+
+import os
+from shutil import copyfile
+
+from setuptools import setup
+
+
+def readme():
+    with open("README.rst") as f:
+        return f.read()
+
+
+def version():
+    # Copy VERSION file of parent to module directory if not found
+    if not os.path.exists("cordworkflowessenceextractor/VERSION"):
+        copyfile("../../VERSION", "cordworkflowessenceextractor/VERSION")
+    with open("cordworkflowessenceextractor/VERSION") as f:
+        return f.read().strip()
+
+
+def parse_requirements(filename):
+    # parse a requirements.txt file, allowing for blank lines and comments
+    requirements = []
+    for line in open(filename):
+        if line and not line.startswith("#"):
+            requirements.append(line)
+    return requirements
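+
+# For example (illustrative content only), a requirements file containing
+#     # pinned dependencies
+#     some-package==1.0.0
+#
+#     another-package>=2.1
+# would be parsed into ["some-package==1.0.0", "another-package>=2.1"].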
+
+
+setup(
+    name="cordworkflowessenceextractor",
+    version=version(),
+    description="Extract workflow essence from airflow workflow code",
+    long_description=readme(),
+    author="Illyoung Choi",
+    author_email="iychoi@opennetworking.org",
+    classifiers=["License :: OSI Approved :: Apache Software License"],
+    license="Apache v2",
+    packages=["cordworkflowessenceextractor"],
+    install_requires=parse_requirements("requirements.txt"),
+    include_package_data=True,
+)
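+
+# Not packaging metadata -- just a reminder of typical usage from this directory:
+#     pip install .          # install the package locally
+#     python setup.py sdist  # build a source distribution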
diff --git a/lib/cord-workflow-essence-extractor/tox.ini b/lib/cord-workflow-essence-extractor/tox.ini
new file mode 100644
index 0000000..eb3f3f6
--- /dev/null
+++ b/lib/cord-workflow-essence-extractor/tox.ini
@@ -0,0 +1,50 @@
+; Copyright 2019-present Open Networking Foundation
+;
+; Licensed under the Apache License, Version 2.0 (the "License");
+; you may not use this file except in compliance with the License.
+; You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing, software
+; distributed under the License is distributed on an "AS IS" BASIS,
+; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+; See the License for the specific language governing permissions and
+; limitations under the License.
+
+[tox]
+envlist = py27,py35,py36,py37
+skip_missing_interpreters = true
+skipsdist = True
+
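+; For example (assuming tox is installed), "tox -e py36" would run the nose2
+; suite with JUnit XML output and then flake8 under Python 3.6.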
+[testenv]
+deps =
+  -r requirements.txt
+  nose2
+  flake8
+
+commands =
+  nose2 -c tox.ini --verbose --junit-xml
+  flake8
+
+[flake8]
+max-line-length = 119
+exclude =
+  .tox
+  build
+  cord-workflow-essence-extractor-tests/workflow-examples
+
+[unittest]
+plugins = nose2.plugins.junitxml
+
+[junit-xml]
+path = nose2-results.xml
+
+[coverage]
+always-on = True
+coverage =
+  cordworkflowessenceextractor
+  cord-workflow-essence-extractor-tests
+coverage-report =
+  term
+  xml
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/requirements.txt
diff --git a/tox.ini b/tox.ini
new file mode 100644
index 0000000..2cf47b7
--- /dev/null
+++ b/tox.ini
@@ -0,0 +1,44 @@
+; Copyright 2019-present Open Networking Foundation
+;
+; Licensed under the Apache License, Version 2.0 (the "License");
+; you may not use this file except in compliance with the License.
+; You may obtain a copy of the License at
+;
+; http://www.apache.org/licenses/LICENSE-2.0
+;
+; Unless required by applicable law or agreed to in writing, software
+; distributed under the License is distributed on an "AS IS" BASIS,
+; WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+; See the License for the specific language governing permissions and
+; limitations under the License.
+
+[tox]
+envlist = py27,py35,py36,py37
+skip_missing_interpreters = True
+skipsdist = True
+
+[testenv]
+deps =
+  -r requirements.txt
+;  -e lib/cord-workflow-essence-extractor
+  nose2
+  flake8
+
+commands =
+  nose2 -c tox.ini --verbose --junit-xml
+#  flake8
+
+[flake8]
+max-line-length = 119
+
+[unittest]
+plugins = nose2.plugins.junitxml
+
+[junit-xml]
+path = nose2-results.xml
+
+[coverage]
+always-on = False
+coverage-report =
+  term
+  xml
\ No newline at end of file