Rework directory structures
Implement kickstarter daemon
Implement a command line interface to manage workflow registration
Refine workflow essence extractor code

Change-Id: I61fd1f497a55af501c579e70a9f6c51f32f5e15c
diff --git a/test/__init__.py b/test/__init__.py
new file mode 100644
index 0000000..19d1424
--- /dev/null
+++ b/test/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
diff --git a/test/test_essence_extractor.py b/test/test_essence_extractor.py
new file mode 100644
index 0000000..8024611
--- /dev/null
+++ b/test/test_essence_extractor.py
@@ -0,0 +1,98 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+import json
+import os
+import collections
+
+from cord_workflow_airflow_extensions.essence_extractor import EssenceExtractor
+
+
+test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))  # directory of this test module
+examples_dir = os.path.join(test_path, "workflow_examples")  # example DAG fixtures live here
+extension_expected_result = ".expected.json"  # appended to a DAG path to name its expected output
+
+try:
+    basestring  # only defined on Python 2
+except NameError:
+    basestring = str  # Python 3 fallback so convert() can use one name
+
+
+# convert unicode string object to plain string object
+def convert(data):
+    """Recursively rebuild ``data`` with every string coerced to ``str``."""
+    try:  # the ABC aliases left the ``collections`` namespace in Python 3.10
+        from collections import abc
+    except ImportError:  # Python 2 only exposes them on ``collections``
+        abc = collections
+    if isinstance(data, basestring):
+        return str(data)
+    if isinstance(data, abc.Mapping):
+        # keys are converted too, so unicode keys become plain strings
+        return {convert(key): convert(value) for key, value in data.items()}
+    if isinstance(data, abc.Iterable):
+        return [convert(item) for item in data]
+    # scalars (numbers, booleans, None) pass through untouched
+    return data
+
+
+class TestEssenceExtractor(unittest.TestCase):
+    """
+    Extract the essence of every example DAG under the workflow_examples
+    directory and compare each result with its ``.expected.json`` file.
+    """
+
+    def setUp(self):
+        pass
+
+    def tearDown(self):
+        pass
+
+    def isDagFile(self, filepath):
+        """Return True when ``filepath`` has a ``.py`` extension."""
+        _, file_extension = os.path.splitext(filepath)
+        return file_extension == ".py"
+
+    def testExtract(self):
+        """Check each example DAG against its expected essence."""
+        # sorted() keeps the run order stable; os.listdir() order is arbitrary
+        dags = sorted(f for f in os.listdir(examples_dir) if self.isDagFile(f))
+        # an empty fixture directory must not let the test pass vacuously
+        self.assertTrue(dags, "no example DAG found in %s" % examples_dir)
+        for dag in dags:
+            dag_path = os.path.join(examples_dir, dag)
+
+            essence_extractor = EssenceExtractor()
+            essence_extractor.parse_codefile(dag_path)
+            workflow_info = essence_extractor.extract()
+
+            # every example must ship with a solution file next to it
+            expected_result_file = dag_path + extension_expected_result
+            self.assertTrue(
+                os.path.exists(expected_result_file),
+                "missing expected result file %s" % expected_result_file)
+
+            with open(expected_result_file) as json_file:
+                # json.load yields unicode strings on Python 2; normalise them
+                expected_workflow_info = convert(json.load(json_file))
+            # name the offending example so a failure is actionable
+            self.assertEqual(
+                workflow_info, expected_workflow_info,
+                "produced result is different for %s" % dag)
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/test/test_kickstarter.py b/test/test_kickstarter.py
new file mode 100644
index 0000000..c121a57
--- /dev/null
+++ b/test/test_kickstarter.py
@@ -0,0 +1,41 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+
+from cord_workflow_airflow_extensions.kickstarter import check_web_live
+
+
+class TestKickstarter(unittest.TestCase):
+    """
+    Exercise check_web_live() from the kickstarter module against real URLs.
+    """
+
+    def setUp(self):
+        pass
+
+    def tearDown(self):
+        pass
+
+    def testCheckWebLive(self):  # NOTE(review): needs outbound network access to google.com — confirm this is acceptable in CI
+        live = check_web_live('http://google.com', 1, 3, 3)  # NOTE(review): meaning of the three numeric arguments is not visible here
+        self.assertTrue(live, 'failed to connect to http://google.com')
+
+        live = check_web_live('http://google.com:1234', 2, 2, 3)  # port 1234 is expected not to answer
+        self.assertFalse(live, 'should fail but succeeded')
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/test/test_workflow_ctl.py b/test/test_workflow_ctl.py
new file mode 100644
index 0000000..38090d7
--- /dev/null
+++ b/test/test_workflow_ctl.py
@@ -0,0 +1,43 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from __future__ import absolute_import
+import unittest
+
+from cord_workflow_airflow_extensions.workflow_ctl import register_workflow
+
+
+class TestWorkflowCtl(unittest.TestCase):
+    """
+    Verify that register_workflow() refuses invalid arguments.
+    """
+
+    def setUp(self):
+        pass
+
+    def tearDown(self):
+        pass
+
+    def testRegisterWorkflow(self):
+        try:
+            register_workflow(None)  # None stands in for invalid args
+        except BaseException:
+            rejected = True
+        else:
+            rejected = False
+        self.assertTrue(rejected, 'invalid args should fail')
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/test/workflow_examples/att_dag.py b/test/workflow_examples/att_dag.py
new file mode 100644
index 0000000..c3bd3ea
--- /dev/null
+++ b/test/workflow_examples/att_dag.py
@@ -0,0 +1,198 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging  
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {
+    'start_date': datetime.utcnow(),  # evaluated when this module is executed
+    'owner': 'ATT',
+}
+
+dag_att = DAG(
+    dag_id='att_workflow_onu',
+    default_args=args,
+    # no schedule: this dag will be triggered by external systems
+    schedule_interval=None,
+)
+
+dag_att.doc_md = __doc__  # reuse the module docstring as the DAG documentation
+
+
+def ONU_event(model_accessor, event, **kwargs):
+    # Sensor callback for 'onu.events': applies the ONU status in `event` to the
+    # service instance from find_or_create_att_si(); kwargs is not used.
+
+    logging.info("onu.events: received event", event=event)  # NOTE(review): stdlib logging takes no arbitrary kwargs — confirm the intended logger
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    if event["status"] == "activated":
+        logging.info("onu.events: activated onu", value=event)
+        si.no_sync = False
+        si.uni_port_id = long(event["portNumber"])  # NOTE(review): long() exists only on Python 2
+        si.of_dpid = event["deviceId"]
+        si.oper_onu_status = "ENABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    elif event["status"] == "disabled":
+        logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
+        si.oper_onu_status = "DISABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    else:
+        logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
+        raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+    # Model-sensor callback: on 'create'/'update' runs the ONU, auth and DHCP
+    # state processing for `si` and saves it; other event types return early.
+    
+    go = False
+    if event_type == 'create':
+        logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+        go = True
+    elif event_type == 'update':
+        logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+                          (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+        go = True
+    elif event_type == 'delete':
+        pass  # deletions are ignored
+    else:
+        pass  # any other event type is ignored as well
+
+    if not go:
+        return
+
+    # only create & update events reach this point
+
+    # Changing ONU state can change auth state
+    # Changing auth state can change DHCP state
+    # So the three steps must run in this order
+    process_onu_state(model_accessor, si)
+    process_auth_state(si)
+    process_dhcp_state(si)
+
+    validate_states(si)
+
+    # handle the subscriber status,
+    # which is a combination of all the other states
+    subscriber = get_subscriber(model_accessor, si.serial_number)
+    if subscriber:
+        update_subscriber(model_accessor, subscriber, si)
+
+    si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+    # Sensor callback for 'authentication.events': stores the event's
+    # authenticationState on the service instance; kwargs is not used.
+
+    logging.info("authentication.events: Got event for subscriber", event_value=event)  # NOTE(review): stdlib logging takes no arbitrary kwargs — confirm the intended logger
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("authentication.events: Updating service instance", si=si)
+    si.authentication_state = event["authenticationState"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+    # Sensor callback for 'dhcp.events': stores the event's messageType,
+    # ipAddress and macAddress on the service instance; kwargs is not used.
+
+    logging.info("dhcp.events: Got event for subscriber", event_value=event)  # NOTE(review): stdlib logging takes no arbitrary kwargs — confirm the intended logger
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("dhcp.events: Updating service instance", si=si)
+    si.dhcp_state = event["messageType"]
+    si.ip_address = event["ipAddress"]
+    si.mac_address = event["macAddress"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(  # NOTE(review): the imports above bind only `xossynchronizer`; confirm the star imports supply XOSEventSensor/XOSModelSensor
+    task_id='onu_event_handler_task',
+    topic='onu.events',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=ONU_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+onu_model_event_handler = XOSModelSensor(
+    task_id='onu_model_event_handler_task',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+auth_event_handler = XOSEventSensor(
+    task_id='auth_event_handler_task',
+    topic="authentication.events",
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=Auth_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+auth_model_event_handler = XOSModelSensor(
+    task_id='auth_model_event_handler_task',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+dhcp_event_handler = XOSEventSensor(
+    task_id='dhcp_event_handler_task',
+    topic="dhcp.events",
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DHCP_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+dhcp_model_event_handler = XOSModelSensor(
+    task_id='dhcp_model_event_handler_task',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+onu_event_handler >> onu_model_event_handler >> \
+    auth_event_handler >> auth_model_event_handler >> \
+    dhcp_event_handler >> dhcp_model_event_handler  # one linear chain: each task is the parent of the task to its right
\ No newline at end of file
diff --git a/test/workflow_examples/att_dag.py.expected.json b/test/workflow_examples/att_dag.py.expected.json
new file mode 100644
index 0000000..109b2a9
--- /dev/null
+++ b/test/workflow_examples/att_dag.py.expected.json
@@ -0,0 +1,126 @@
+{
+    "att_workflow_onu": {
+        "dag": {
+            "dag_id": "att_workflow_onu",
+            "local_variable": "dag_att"
+        },
+        "dependencies": {
+            "auth_event_handler_task": {
+                "children": [
+                    "auth_model_event_handler_task"
+                ],
+                "parents": [
+                    "onu_model_event_handler_task"
+                ]
+            },
+            "auth_model_event_handler_task": {
+                "children": [
+                    "dhcp_event_handler_task"
+                ],
+                "parents": [
+                    "auth_event_handler_task"
+                ]
+            },
+            "dhcp_event_handler_task": {
+                "children": [
+                    "dhcp_model_event_handler_task"
+                ],
+                "parents": [
+                    "auth_model_event_handler_task"
+                ]
+            },
+            "dhcp_model_event_handler_task": {
+                "parents": [
+                    "dhcp_event_handler_task"
+                ]
+            },
+            "onu_event_handler_task": {
+                "children": [
+                    "onu_model_event_handler_task"
+                ]
+            },
+            "onu_model_event_handler_task": {
+                "children": [
+                    "auth_event_handler_task"
+                ],
+                "parents": [
+                    "onu_event_handler_task"
+                ]
+            }
+        },
+        "tasks": {
+            "auth_event_handler_task": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "auth_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "Auth_event",
+                "task_id": "auth_event_handler_task",
+                "topic": "authentication.events"
+            },
+            "auth_model_event_handler_task": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "auth_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "auth_model_event_handler_task"
+            },
+            "dhcp_event_handler_task": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "dhcp_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DHCP_event",
+                "task_id": "dhcp_event_handler_task",
+                "topic": "dhcp.events"
+            },
+            "dhcp_model_event_handler_task": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "dhcp_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "dhcp_model_event_handler_task"
+            },
+            "onu_event_handler_task": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "onu_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "ONU_event",
+                "task_id": "onu_event_handler_task",
+                "topic": "onu.events"
+            },
+            "onu_model_event_handler_task": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "onu_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "onu_model_event_handler_task"
+            }
+        }
+    }
+}
diff --git a/test/workflow_examples/left_right_mix_dag.py b/test/workflow_examples/left_right_mix_dag.py
new file mode 100644
index 0000000..e734d0d
--- /dev/null
+++ b/test/workflow_examples/left_right_mix_dag.py
@@ -0,0 +1,200 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging  
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {
+    'start_date': datetime.utcnow(),  # evaluated when this module is executed
+    'owner': 'ATT',
+}
+
+dag_att = DAG(
+    dag_id='att_workflow_onu',
+    default_args=args,
+    # no schedule: this dag will be triggered by external systems
+    schedule_interval=None,
+)
+
+dag_att.doc_md = __doc__  # reuse the module docstring as the DAG documentation
+
+
+def ONU_event(model_accessor, event, **kwargs):
+    # Sensor callback for 'onu.events': applies the ONU status in `event` to the
+    # service instance from find_or_create_att_si(); kwargs is not used.
+
+    logging.info("onu.events: received event", event=event)  # NOTE(review): stdlib logging takes no arbitrary kwargs — confirm the intended logger
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    if event["status"] == "activated":
+        logging.info("onu.events: activated onu", value=event)
+        si.no_sync = False
+        si.uni_port_id = long(event["portNumber"])  # NOTE(review): long() exists only on Python 2
+        si.of_dpid = event["deviceId"]
+        si.oper_onu_status = "ENABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    elif event["status"] == "disabled":
+        logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
+        si.oper_onu_status = "DISABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    else:
+        logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
+        raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+    # Model-sensor callback: on 'create'/'update' runs the ONU, auth and DHCP
+    # state processing for `si` and saves it; other event types return early.
+    
+    go = False
+    if event_type == 'create':
+        logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+        go = True
+    elif event_type == 'update':
+        logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+                          (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+        go = True
+    elif event_type == 'delete':
+        pass  # deletions are ignored
+    else:
+        pass  # any other event type is ignored as well
+
+    if not go:
+        return
+
+    # only create & update events reach this point
+
+    # Changing ONU state can change auth state
+    # Changing auth state can change DHCP state
+    # So the three steps must run in this order
+    process_onu_state(model_accessor, si)
+    process_auth_state(si)
+    process_dhcp_state(si)
+
+    validate_states(si)
+
+    # handle the subscriber status,
+    # which is a combination of all the other states
+    subscriber = get_subscriber(model_accessor, si.serial_number)
+    if subscriber:
+        update_subscriber(model_accessor, subscriber, si)
+
+    si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+    # Sensor callback for 'authentication.events': stores the event's
+    # authenticationState on the service instance; kwargs is not used.
+
+    logging.info("authentication.events: Got event for subscriber", event_value=event)  # NOTE(review): stdlib logging takes no arbitrary kwargs — confirm the intended logger
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("authentication.events: Updating service instance", si=si)
+    si.authentication_state = event["authenticationState"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+    # Sensor callback for 'dhcp.events': stores the event's messageType,
+    # ipAddress and macAddress on the service instance; kwargs is not used.
+
+    logging.info("dhcp.events: Got event for subscriber", event_value=event)  # NOTE(review): stdlib logging takes no arbitrary kwargs — confirm the intended logger
+
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("dhcp.events: Updating service instance", si=si)
+    si.dhcp_state = event["messageType"]
+    si.ip_address = event["ipAddress"]
+    si.mac_address = event["macAddress"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(  # NOTE(review): the imports above bind only `xossynchronizer`; confirm the star imports supply XOSEventSensor/XOSModelSensor
+    task_id='onu_event_handler',
+    topic='onu.events',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=ONU_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+onu_model_event_handler = XOSModelSensor(
+    task_id='onu_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+auth_event_handler = XOSEventSensor(
+    task_id='auth_event_handler',
+    topic="authentication.events",
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=Auth_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+auth_model_event_handler = XOSModelSensor(
+    task_id='auth_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+dhcp_event_handler = XOSEventSensor(
+    task_id='dhcp_event_handler',
+    topic="dhcp.events",
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DHCP_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+dhcp_model_event_handler = XOSModelSensor(
+    task_id='dhcp_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+onu_event_handler >> onu_model_event_handler
+auth_event_handler << onu_model_event_handler  # `a << b`: b is the parent of a
+auth_event_handler >> auth_model_event_handler
+dhcp_event_handler << auth_model_event_handler
+dhcp_event_handler >> dhcp_model_event_handler  # net effect: one linear chain from onu_event_handler to this task
\ No newline at end of file
diff --git a/test/workflow_examples/left_right_mix_dag.py.expected.json b/test/workflow_examples/left_right_mix_dag.py.expected.json
new file mode 100644
index 0000000..05607ba
--- /dev/null
+++ b/test/workflow_examples/left_right_mix_dag.py.expected.json
@@ -0,0 +1,126 @@
+{
+    "att_workflow_onu": {
+        "dag": {
+            "dag_id": "att_workflow_onu",
+            "local_variable": "dag_att"
+        },
+        "dependencies": {
+            "auth_event_handler": {
+                "children": [
+                    "auth_model_event_handler"
+                ],
+                "parents": [
+                    "onu_model_event_handler"
+                ]
+            },
+            "auth_model_event_handler": {
+                "children": [
+                    "dhcp_event_handler"
+                ],
+                "parents": [
+                    "auth_event_handler"
+                ]
+            },
+            "dhcp_event_handler": {
+                "children": [
+                    "dhcp_model_event_handler"
+                ],
+                "parents": [
+                    "auth_model_event_handler"
+                ]
+            },
+            "dhcp_model_event_handler": {
+                "parents": [
+                    "dhcp_event_handler"
+                ]
+            },
+            "onu_event_handler": {
+                "children": [
+                    "onu_model_event_handler"
+                ]
+            },
+            "onu_model_event_handler": {
+                "children": [
+                    "auth_event_handler"
+                ],
+                "parents": [
+                    "onu_event_handler"
+                ]
+            }
+        },
+        "tasks": {
+            "auth_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "auth_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "Auth_event",
+                "task_id": "auth_event_handler",
+                "topic": "authentication.events"
+            },
+            "auth_model_event_handler": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "auth_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "auth_model_event_handler"
+            },
+            "dhcp_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "dhcp_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DHCP_event",
+                "task_id": "dhcp_event_handler",
+                "topic": "dhcp.events"
+            },
+            "dhcp_model_event_handler": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "dhcp_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "dhcp_model_event_handler"
+            },
+            "onu_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "onu_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "ONU_event",
+                "task_id": "onu_event_handler",
+                "topic": "onu.events"
+            },
+            "onu_model_event_handler": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "onu_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "onu_model_event_handler"
+            }
+        }
+    }
+}
diff --git a/test/workflow_examples/left_right_mix_dag2.py b/test/workflow_examples/left_right_mix_dag2.py
new file mode 100644
index 0000000..6fa1df1
--- /dev/null
+++ b/test/workflow_examples/left_right_mix_dag2.py
@@ -0,0 +1,196 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging  
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {  # handed to the DAG below through default_args
+    'start_date': datetime.utcnow(),  # evaluated once, when this module is executed
+    'owner': 'ATT',
+}
+
+dag_att = DAG(
+    dag_id='att_workflow_onu',
+    default_args=args,
+    # no schedule: this dag will only run when triggered by external systems
+    schedule_interval=None,
+)
+
+dag_att.doc_md = __doc__  # reuse the module docstring as the DAG's doc_md
+
+
+def ONU_event(model_accessor, event, **kwargs):
+    """Apply an onu.events message to the service instance it refers to.
+
+    Raises AirflowException unless event["status"] is "activated" or "disabled".
+    """
+    logging.info("onu.events: received event: %s", event)  # stdlib logging rejects arbitrary kwargs
+    si = find_or_create_att_si(model_accessor, logging, event)
+    if event["status"] == "activated":
+        logging.info("onu.events: activated onu: %s", event)
+        si.no_sync = False
+        si.uni_port_id = int(event["portNumber"])  # long() is undefined on Python 3
+        si.of_dpid = event["deviceId"]
+        si.oper_onu_status = "ENABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    elif event["status"] == "disabled":
+        logging.info("onu.events: disabled onu, resetting the subscriber: %s", event)
+        si.oper_onu_status = "DISABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    else:
+        logging.warning("onu.events: Unknown status value: %s, event: %s", event["status"], event)
+        raise AirflowException("onu.events: Unknown status value: %s" % event["status"])
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+    """Re-process a service instance's states after a model create/update event.
+
+    event_type: 'create' and 'update' trigger processing; any other value
+        (including 'delete') returns immediately without touching si.
+    model_accessor: handed to the ONU-state and subscriber helpers.
+    si: instance whose states are processed, validated and then saved.
+    kwargs: accepted but not used.
+    """
+    # stdlib logging accepts no arbitrary keyword arguments, so the state
+    # values are folded into the message as lazy %-style arguments.
+    if event_type == 'create':
+        logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s ", si.id)
+    elif event_type == 'update':
+        logging.debug(
+            "MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s "
+            "onu_state=%s authentication_state=%s",
+            si.id, si.admin_onu_state, si.authentication_state)
+    else:
+        # handle only create & update events
+        return
+
+    # Changing ONU state can change auth state
+    # Changing auth state can change DHCP state
+    # So need to process in this order
+    process_onu_state(model_accessor, si)
+    process_auth_state(si)
+    process_dhcp_state(si)
+
+    validate_states(si)
+
+    # handling the subscriber status
+    # It's a combination of all the other states
+    subscriber = get_subscriber(model_accessor, si.serial_number)
+    if subscriber:
+        update_subscriber(model_accessor, subscriber, si)
+
+    si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+    """Copy event["authenticationState"] onto the event's service instance.
+
+    The instance comes from find_or_create_att_si and is saved afterwards.
+    """
+    logging.info("authentication.events: Got event for subscriber: %s", event)  # no custom kwargs in stdlib logging
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("authentication.events: Updating service instance: %s", si)
+    si.authentication_state = event["authenticationState"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+    """Copy the DHCP state and addresses of a dhcp.events message onto its service instance.
+
+    Reads event["messageType"], event["ipAddress"] and event["macAddress"], then saves the instance.
+    """
+    logging.info("dhcp.events: Got event for subscriber: %s", event)  # no custom kwargs in stdlib logging
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("dhcp.events: Updating service instance: %s", si)
+    si.dhcp_state = event["messageType"]
+    si.ip_address = event["ipAddress"]
+    si.mac_address = event["macAddress"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(  # event sensor; left operand of the dependency expression below
+    task_id='onu_event_handler',
+    topic='onu.events',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=ONU_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+onu_model_event_handler = XOSModelSensor(  # model sensor; middle operand of the dependency expression below
+    task_id='onu_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+auth_event_handler = XOSEventSensor(  # event sensor; right operand, attached with << below
+    task_id='auth_event_handler',
+    topic="authentication.events",
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=Auth_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+auth_model_event_handler = XOSModelSensor(  # declared on dag_att but absent from the dependency expression
+    task_id='auth_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+dhcp_event_handler = XOSEventSensor(  # declared on dag_att but absent from the dependency expression
+    task_id='dhcp_event_handler',
+    topic="dhcp.events",
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DHCP_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+dhcp_model_event_handler = XOSModelSensor(  # declared on dag_att but absent from the dependency expression
+    task_id='dhcp_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+# Mixed >> and <<: the expected JSON lists onu_event_handler and auth_event_handler as parents of onu_model_event_handler.
+onu_event_handler >> onu_model_event_handler << auth_event_handler
diff --git a/test/workflow_examples/left_right_mix_dag2.py.expected.json b/test/workflow_examples/left_right_mix_dag2.py.expected.json
new file mode 100644
index 0000000..7d9430b
--- /dev/null
+++ b/test/workflow_examples/left_right_mix_dag2.py.expected.json
@@ -0,0 +1,100 @@
+{
+    "att_workflow_onu": {
+        "dag": {
+            "dag_id": "att_workflow_onu",
+            "local_variable": "dag_att"
+        },
+        "dependencies": {
+            "auth_event_handler": {
+                "children": [
+                    "onu_model_event_handler"
+                ]
+            },
+            "onu_event_handler": {
+                "children": [
+                    "onu_model_event_handler"
+                ]
+            },
+            "onu_model_event_handler": {
+                "parents": [
+                    "onu_event_handler",
+                    "auth_event_handler"
+                ]
+            }
+        },
+        "tasks": {
+            "auth_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "auth_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "Auth_event",
+                "task_id": "auth_event_handler",
+                "topic": "authentication.events"
+            },
+            "auth_model_event_handler": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "auth_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "auth_model_event_handler"
+            },
+            "dhcp_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "dhcp_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DHCP_event",
+                "task_id": "dhcp_event_handler",
+                "topic": "dhcp.events"
+            },
+            "dhcp_model_event_handler": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "dhcp_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "dhcp_model_event_handler"
+            },
+            "onu_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "onu_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "ONU_event",
+                "task_id": "onu_event_handler",
+                "topic": "onu.events"
+            },
+            "onu_model_event_handler": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "onu_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "onu_model_event_handler"
+            }
+        }
+    }
+}
diff --git a/test/workflow_examples/multi_children_parents_dag.py b/test/workflow_examples/multi_children_parents_dag.py
new file mode 100644
index 0000000..8e59e22
--- /dev/null
+++ b/test/workflow_examples/multi_children_parents_dag.py
@@ -0,0 +1,197 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging  
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {  # handed to the DAG below through default_args
+    'start_date': datetime.utcnow(),  # evaluated once, when this module is executed
+    'owner': 'ATT',
+}
+
+dag_att = DAG(
+    dag_id='att_workflow_onu',
+    default_args=args,
+    # no schedule: this dag will only run when triggered by external systems
+    schedule_interval=None,
+)
+
+dag_att.doc_md = __doc__  # reuse the module docstring as the DAG's doc_md
+
+
+def ONU_event(model_accessor, event, **kwargs):
+    """Apply an onu.events message to the service instance it refers to.
+
+    Raises AirflowException unless event["status"] is "activated" or "disabled".
+    """
+    logging.info("onu.events: received event: %s", event)  # stdlib logging rejects arbitrary kwargs
+    si = find_or_create_att_si(model_accessor, logging, event)
+    if event["status"] == "activated":
+        logging.info("onu.events: activated onu: %s", event)
+        si.no_sync = False
+        si.uni_port_id = int(event["portNumber"])  # long() is undefined on Python 3
+        si.of_dpid = event["deviceId"]
+        si.oper_onu_status = "ENABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    elif event["status"] == "disabled":
+        logging.info("onu.events: disabled onu, resetting the subscriber: %s", event)
+        si.oper_onu_status = "DISABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    else:
+        logging.warning("onu.events: Unknown status value: %s, event: %s", event["status"], event)
+        raise AirflowException("onu.events: Unknown status value: %s" % event["status"])
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+    """Re-process a service instance's states after a model create/update event.
+
+    event_type: 'create' and 'update' trigger processing; any other value
+        (including 'delete') returns immediately without touching si.
+    model_accessor: handed to the ONU-state and subscriber helpers.
+    si: instance whose states are processed, validated and then saved.
+    kwargs: accepted but not used.
+    """
+    # stdlib logging accepts no arbitrary keyword arguments, so the state
+    # values are folded into the message as lazy %-style arguments.
+    if event_type == 'create':
+        logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s ", si.id)
+    elif event_type == 'update':
+        logging.debug(
+            "MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s "
+            "onu_state=%s authentication_state=%s",
+            si.id, si.admin_onu_state, si.authentication_state)
+    else:
+        # handle only create & update events
+        return
+
+    # Changing ONU state can change auth state
+    # Changing auth state can change DHCP state
+    # So need to process in this order
+    process_onu_state(model_accessor, si)
+    process_auth_state(si)
+    process_dhcp_state(si)
+
+    validate_states(si)
+
+    # handling the subscriber status
+    # It's a combination of all the other states
+    subscriber = get_subscriber(model_accessor, si.serial_number)
+    if subscriber:
+        update_subscriber(model_accessor, subscriber, si)
+
+    si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+    """Copy event["authenticationState"] onto the event's service instance.
+
+    The instance comes from find_or_create_att_si and is saved afterwards.
+    """
+    logging.info("authentication.events: Got event for subscriber: %s", event)  # no custom kwargs in stdlib logging
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("authentication.events: Updating service instance: %s", si)
+    si.authentication_state = event["authenticationState"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+    """Copy the DHCP state and addresses of a dhcp.events message onto its service instance.
+
+    Reads event["messageType"], event["ipAddress"] and event["macAddress"], then saves the instance.
+    """
+    logging.info("dhcp.events: Got event for subscriber: %s", event)  # no custom kwargs in stdlib logging
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("dhcp.events: Updating service instance: %s", si)
+    si.dhcp_state = event["messageType"]
+    si.ip_address = event["ipAddress"]
+    si.mac_address = event["macAddress"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(  # event sensor; head of the dependency chain below
+    task_id='onu_event_handler',
+    topic='onu.events',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=ONU_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+onu_model_event_handler = XOSModelSensor(  # model sensor; first member of the list in the chain below
+    task_id='onu_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+auth_event_handler = XOSEventSensor(  # event sensor; placed after the list in the chain below
+    task_id='auth_event_handler',
+    topic="authentication.events",
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=Auth_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+auth_model_event_handler = XOSModelSensor(  # model sensor; second member of the list in the chain below
+    task_id='auth_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+dhcp_event_handler = XOSEventSensor(  # event sensor; last element of the chain below
+    task_id='dhcp_event_handler',
+    topic="dhcp.events",
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DHCP_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+
+dhcp_model_event_handler = XOSModelSensor(  # model sensor; third member of the list in the chain below
+    task_id='dhcp_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att,
+)
+# Fan-out from onu_event_handler to the three model sensors, fan-in to auth_event_handler, then dhcp_event_handler.
+onu_event_handler >> [onu_model_event_handler, auth_model_event_handler, dhcp_model_event_handler] >> \
+    auth_event_handler >> dhcp_event_handler
\ No newline at end of file
diff --git a/test/workflow_examples/multi_children_parents_dag.py.expected.json b/test/workflow_examples/multi_children_parents_dag.py.expected.json
new file mode 100644
index 0000000..e0dc284
--- /dev/null
+++ b/test/workflow_examples/multi_children_parents_dag.py.expected.json
@@ -0,0 +1,130 @@
+{
+    "att_workflow_onu": {
+        "dag": {
+            "dag_id": "att_workflow_onu",
+            "local_variable": "dag_att"
+        },
+        "dependencies": {
+            "auth_event_handler": {
+                "children": [
+                    "dhcp_event_handler"
+                ],
+                "parents": [
+                    "onu_model_event_handler",
+                    "auth_model_event_handler",
+                    "dhcp_model_event_handler"
+                ]
+            },
+            "auth_model_event_handler": {
+                "children": [
+                    "auth_event_handler"
+                ],
+                "parents": [
+                    "onu_event_handler"
+                ]
+            },
+            "dhcp_event_handler": {
+                "parents": [
+                    "auth_event_handler"
+                ]
+            },
+            "dhcp_model_event_handler": {
+                "children": [
+                    "auth_event_handler"
+                ],
+                "parents": [
+                    "onu_event_handler"
+                ]
+            },
+            "onu_event_handler": {
+                "children": [
+                    "onu_model_event_handler",
+                    "auth_model_event_handler",
+                    "dhcp_model_event_handler"
+                ]
+            },
+            "onu_model_event_handler": {
+                "children": [
+                    "auth_event_handler"
+                ],
+                "parents": [
+                    "onu_event_handler"
+                ]
+            }
+        },
+        "tasks": {
+            "auth_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "auth_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "Auth_event",
+                "task_id": "auth_event_handler",
+                "topic": "authentication.events"
+            },
+            "auth_model_event_handler": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "auth_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "auth_model_event_handler"
+            },
+            "dhcp_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "dhcp_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DHCP_event",
+                "task_id": "dhcp_event_handler",
+                "topic": "dhcp.events"
+            },
+            "dhcp_model_event_handler": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "dhcp_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "dhcp_model_event_handler"
+            },
+            "onu_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "onu_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "ONU_event",
+                "task_id": "onu_event_handler",
+                "topic": "onu.events"
+            },
+            "onu_model_event_handler": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "onu_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "onu_model_event_handler"
+            }
+        }
+    }
+}
diff --git a/test/workflow_examples/two_dags.py b/test/workflow_examples/two_dags.py
new file mode 100644
index 0000000..1906881
--- /dev/null
+++ b/test/workflow_examples/two_dags.py
@@ -0,0 +1,184 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging  
+from datetime import datetime
+
+import airflow
+from airflow import DAG
+from airflow import AirflowException
+
+import xossynchronizer.airflow.sensors.XOSModelSensor
+import xossynchronizer.airflow.sensors.XOSEventSensor
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+args = {  # shared by both DAGs below through default_args
+    'start_date': datetime.utcnow(),  # evaluated once, when this module is executed
+    'owner': 'ATT',
+}
+
+dag_att1 = DAG(  # first of the two DAGs declared in this file
+    dag_id='att_workflow_onu1',
+    default_args=args,
+    # no schedule: this dag will only run when triggered by external systems
+    schedule_interval=None,
+)
+
+dag_att2 = DAG(  # second DAG; same arguments apart from dag_id
+    dag_id='att_workflow_onu2',
+    default_args=args,
+    # no schedule: this dag will only run when triggered by external systems
+    schedule_interval=None,
+)
+
+dag_att1.doc_md = __doc__  # both DAGs reuse the module docstring as doc_md
+dag_att2.doc_md = __doc__
+
+def ONU_event(model_accessor, event, **kwargs):
+    """Apply an onu.events message to the service instance it refers to.
+
+    Raises AirflowException unless event["status"] is "activated" or "disabled".
+    """
+    logging.info("onu.events: received event: %s", event)  # stdlib logging rejects arbitrary kwargs
+    si = find_or_create_att_si(model_accessor, logging, event)
+    if event["status"] == "activated":
+        logging.info("onu.events: activated onu: %s", event)
+        si.no_sync = False
+        si.uni_port_id = int(event["portNumber"])  # long() is undefined on Python 3
+        si.of_dpid = event["deviceId"]
+        si.oper_onu_status = "ENABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    elif event["status"] == "disabled":
+        logging.info("onu.events: disabled onu, resetting the subscriber: %s", event)
+        si.oper_onu_status = "DISABLED"
+        si.save_changed_fields(always_update_timestamp=True)
+    else:
+        logging.warning("onu.events: Unknown status value: %s, event: %s", event["status"], event)
+        raise AirflowException("onu.events: Unknown status value: %s" % event["status"])
+
+
+def DriverService_event(event_type, model_accessor, si, **kwargs):
+    """Re-process a service instance's states after a model create/update event.
+
+    event_type: 'create' and 'update' trigger processing; any other value
+        (including 'delete') returns immediately without touching si.
+    model_accessor: handed to the ONU-state and subscriber helpers.
+    si: instance whose states are processed, validated and then saved.
+    kwargs: accepted but not used.
+    """
+    # stdlib logging accepts no arbitrary keyword arguments, so the state
+    # values are folded into the message as lazy %-style arguments.
+    if event_type == 'create':
+        logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s ", si.id)
+    elif event_type == 'update':
+        logging.debug(
+            "MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s "
+            "onu_state=%s authentication_state=%s",
+            si.id, si.admin_onu_state, si.authentication_state)
+    else:
+        # handle only create & update events
+        return
+
+    # Changing ONU state can change auth state
+    # Changing auth state can change DHCP state
+    # So need to process in this order
+    process_onu_state(model_accessor, si)
+    process_auth_state(si)
+    process_dhcp_state(si)
+
+    validate_states(si)
+
+    # handling the subscriber status
+    # It's a combination of all the other states
+    subscriber = get_subscriber(model_accessor, si.serial_number)
+    if subscriber:
+        update_subscriber(model_accessor, subscriber, si)
+
+    si.save_changed_fields()
+
+
+def Auth_event(model_accessor, event, **kwargs):
+    """Copy event["authenticationState"] onto the event's service instance.
+
+    The instance comes from find_or_create_att_si and is saved afterwards.
+    """
+    logging.info("authentication.events: Got event for subscriber: %s", event)  # no custom kwargs in stdlib logging
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("authentication.events: Updating service instance: %s", si)
+    si.authentication_state = event["authenticationState"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, event, **kwargs):
+    """Copy the DHCP state and addresses of a dhcp.events message onto its service instance.
+
+    Reads event["messageType"], event["ipAddress"] and event["macAddress"], then saves the instance.
+    """
+    logging.info("dhcp.events: Got event for subscriber: %s", event)  # no custom kwargs in stdlib logging
+    si = find_or_create_att_si(model_accessor, logging, event)
+    logging.debug("dhcp.events: Updating service instance: %s", si)
+    si.dhcp_state = event["messageType"]
+    si.ip_address = event["ipAddress"]
+    si.mac_address = event["macAddress"]
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+onu_event_handler = XOSEventSensor(  # event sensor attached to dag_att1
+    task_id='onu_event_handler',
+    topic='onu.events',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=ONU_event,
+    poke_interval=5,
+    dag=dag_att1,
+)
+
+onu_model_event_handler = XOSModelSensor(  # model sensor attached to dag_att1
+    task_id='onu_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att1,
+)
+
+auth_event_handler = XOSEventSensor(  # event sensor attached to dag_att2
+    task_id='auth_event_handler',
+    topic="authentication.events",
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=Auth_event,
+    poke_interval=5,
+    dag=dag_att2,
+)
+
+auth_model_event_handler = XOSModelSensor(  # model sensor attached to dag_att2
+    task_id='auth_model_event_handler',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    provide_context=True,
+    python_callable=DriverService_event,
+    poke_interval=5,
+    dag=dag_att2,
+)
+# One two-task chain per DAG: the first line links the dag_att1 sensors, the second the dag_att2 sensors.
+onu_event_handler >> onu_model_event_handler
+auth_event_handler >> auth_model_event_handler
diff --git a/test/workflow_examples/two_dags.py.expected.json b/test/workflow_examples/two_dags.py.expected.json
new file mode 100644
index 0000000..dac7c12
--- /dev/null
+++ b/test/workflow_examples/two_dags.py.expected.json
@@ -0,0 +1,90 @@
+{
+    "att_workflow_onu1": {
+        "dag": {
+            "dag_id": "att_workflow_onu1",
+            "local_variable": "dag_att1"
+        },
+        "dependencies": {
+            "onu_event_handler": {
+                "children": [
+                    "onu_model_event_handler"
+                ]
+            },
+            "onu_model_event_handler": {
+                "parents": [
+                    "onu_event_handler"
+                ]
+            }
+        },
+        "tasks": {
+            "onu_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att1",
+                "dag_id": "att_workflow_onu1",
+                "key_field": "serialNumber",
+                "local_variable": "onu_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "ONU_event",
+                "task_id": "onu_event_handler",
+                "topic": "onu.events"
+            },
+            "onu_model_event_handler": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att1",
+                "dag_id": "att_workflow_onu1",
+                "key_field": "serialNumber",
+                "local_variable": "onu_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "onu_model_event_handler"
+            }
+        }
+    },
+    "att_workflow_onu2": {
+        "dag": {
+            "dag_id": "att_workflow_onu2",
+            "local_variable": "dag_att2"
+        },
+        "dependencies": {
+            "auth_event_handler": {
+                "children": [
+                    "auth_model_event_handler"
+                ]
+            },
+            "auth_model_event_handler": {
+                "parents": [
+                    "auth_event_handler"
+                ]
+            }
+        },
+        "tasks": {
+            "auth_event_handler": {
+                "class": "XOSEventSensor",
+                "dag": "dag_att2",
+                "dag_id": "att_workflow_onu2",
+                "key_field": "serialNumber",
+                "local_variable": "auth_event_handler",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "Auth_event",
+                "task_id": "auth_event_handler",
+                "topic": "authentication.events"
+            },
+            "auth_model_event_handler": {
+                "class": "XOSModelSensor",
+                "dag": "dag_att2",
+                "dag_id": "att_workflow_onu2",
+                "key_field": "serialNumber",
+                "local_variable": "auth_model_event_handler",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "provide_context": true,
+                "python_callable": "DriverService_event",
+                "task_id": "auth_model_event_handler"
+            }
+        }
+    }
+}