Add test scripts
Add model-accessor for model-operator
Add more sample workflows
Rework AT&T workflow

Change-Id: I33b5713e221c70bdde5768f1061c06dbbb1dccd6
diff --git a/VERSION b/VERSION
index 79a2734..5d4294b 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.5.0
\ No newline at end of file
+0.5.1
\ No newline at end of file
diff --git a/docker/Dockerfile b/docker/Dockerfile
index c35b212..553b087 100644
--- a/docker/Dockerfile
+++ b/docker/Dockerfile
@@ -111,17 +111,22 @@
 # drop plugin code to plugin dir of airflow
 COPY ./src/cord_workflow_airflow_extensions/cord_workflow_plugin.py ${AIRFLOW_HOME}/plugins/cord_workflow_plugin.py
 
+# make xosapi directory
+RUN mkdir -p /var/run/xosapi \
+    && chmod a+rwx /var/run/xosapi
+
 # drop sample workflow code to dags dir of airflow
-COPY ./workflow_examples/simple-cord-workflow/simple_cord_workflow.py ${AIRFLOW_HOME}/dags/simple_cord_workflow.py
-COPY ./workflow_examples/simple-cord-workflow/simple_cord_workflow_essence.json ${HOME}/simple_cord_workflow_essence.json
+COPY ./workflow_examples/sequential-cord-workflow/sequential_cord_workflow.py ${AIRFLOW_HOME}/dags/sequential_cord_workflow.py
+COPY ./workflow_examples/sequential-cord-workflow/sequential_cord_workflow_essence.json ${HOME}/sequential_cord_workflow_essence.json
+COPY ./workflow_examples/parallel-cord-workflow/parallel_cord_workflow.py ${AIRFLOW_HOME}/dags/parallel_cord_workflow.py
+COPY ./workflow_examples/parallel-cord-workflow/parallel_cord_workflow_essence.json ${HOME}/parallel_cord_workflow_essence.json
+COPY ./workflow_examples/att-workflow/att_workflow.py ${AIRFLOW_HOME}/dags/att_workflow.py
+COPY ./workflow_examples/att-workflow/att_workflow_essence.json ${HOME}/att_workflow_essence.json
 COPY ./workflow_examples/simple-airflow-workflow/simple_airflow_workflow.py ${AIRFLOW_HOME}/dags/simple_airflow_workflow.py
 COPY ./workflow_examples/simple-airflow-workflow/simple_airflow_workflow_essence.json ${HOME}/simple_airflow_workflow_essence.json
 
 # copy scripts
-COPY ./workflow_examples/connection_setup.sh ${HOME}/connection_setup.sh
-COPY ./workflow_examples/register_essence.sh ${HOME}/register_essence.sh
-COPY ./workflow_examples/emit_events_model.sh ${HOME}/emit_events_model.sh
-COPY ./workflow_examples/emit_events_onu.sh ${HOME}/emit_events_onu.sh
+COPY ./workflow_examples/scripts ${HOME}/scripts
 
 # copy kickstarter code & workflow ctl code
 COPY ./src/tools/kickstarter.py ${HOME}/kickstarter.py
@@ -131,10 +136,7 @@
 COPY ./src/tools/config.json /etc/cord_workflow_airflow_extensions/config.json
 
 RUN chown -R ${AIRFLOW_USER}:${AIRFLOW_USER} ${HOME} \
-    && chmod 755 ${HOME}/connection_setup.sh \
-    && chmod 755 ${HOME}/register_essence.sh \
-    && chmod 755 ${HOME}/emit_events_model.sh \
-    && chmod 755 ${HOME}/emit_events_onu.sh \
+    && chmod 755 -R ${HOME}/scripts \
     && chmod 755 ${HOME}/kickstarter.py \
     && chmod 755 ${HOME}/workflow_ctl.py
 
diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml
index 3f14ec5..89ddc3d 100644
--- a/docker/docker-compose.yml
+++ b/docker/docker-compose.yml
@@ -27,13 +27,13 @@
             - "5432:5432"
 
     controller:
-        image: opencord/cord-workflow-controller:0.5.0
+        image: opencord/cord-workflow-controller:0.5.2
         ports:
             - "3030:3030"
 
     airflow:
         # image: opencord/cord-workflow-airflow
-        image: cord-workflow-airflow:0.5.0
+        image: cord-workflow-airflow:0.5.1
         restart: always
         depends_on:
             - postgres
diff --git a/src/cord_workflow_airflow_extensions/cord_workflow_plugin.py b/src/cord_workflow_airflow_extensions/cord_workflow_plugin.py
index 2532477..3a6c83c 100644
--- a/src/cord_workflow_airflow_extensions/cord_workflow_plugin.py
+++ b/src/cord_workflow_airflow_extensions/cord_workflow_plugin.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import time
 from airflow.plugins_manager import AirflowPlugin
 from airflow.hooks.base_hook import BaseHook
 from airflow.operators.python_operator import PythonOperator
@@ -162,16 +163,42 @@
             *args,
             **kwargs)
         self.cord_event_sensor_task_id = cord_event_sensor_task_id
+        self.model_accessor = None
+
+    def create_model_accessor(self):
+        self.log.info("Creating model accessor...")
+        from xossynchronizer.modelaccessor import model_accessor
+        self.model_accessor = model_accessor
+
+    def wait_for_ready(self):
+        self.log.info("Waiting for model accessor to get ready...")
+        models_active = False
+        # wait = False
+        while not models_active:
+            try:
+                # variable is unused
+                _i = self.model_accessor.Site.objects.first()  # noqa: F841
+                models_active = True
+            except Exception as e:
+                self.log.info("Exception", e=e)
+                self.log.info("Waiting for data model to come up before starting...")
+                time.sleep(10)
+                # wait = True
+
+        # if wait:
+        #     # Safety factor, seeing that we stumbled waiting for the data model to come up.
+        #     time.sleep(60)
 
     def execute_callable(self):
-        # TODO
-        model_accessor = None
+        # temporarily comment out this two lines
+        # self.create_model_accessor()
+        # self.wait_for_ready()
 
         message = None
         if self.cord_event_sensor_task_id:
             message = self.op_kwargs['ti'].xcom_pull(task_ids=self.cord_event_sensor_task_id)
 
-        new_op_kwargs = dict(self.op_kwargs, model_accessor=model_accessor, message=message)
+        new_op_kwargs = dict(self.op_kwargs, model_accessor=self.model_accessor, message=message)
         return self.python_callable(*self.op_args, **new_op_kwargs)
 
 
diff --git a/workflow_examples/att-workflow/README.md b/workflow_examples/att-workflow/README.md
index e211d2a..27d2b7d 100644
--- a/workflow_examples/att-workflow/README.md
+++ b/workflow_examples/att-workflow/README.md
@@ -1,5 +1,3 @@
 # AT&T workflow example
 
-This is not a working version. This code is only to show how a new workflow implementation will look like.
-
 Original workfing version of code implemented in Synchronizer is at [att-workflow-driver](https://github.com/opencord/att-workflow-driver).
\ No newline at end of file
diff --git a/workflow_examples/att-workflow/__init__.py b/workflow_examples/att-workflow/__init__.py
deleted file mode 100644
index 1eb71b1..0000000
--- a/workflow_examples/att-workflow/__init__.py
+++ /dev/null
@@ -1,13 +0,0 @@
-# Copyright 2019-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
\ No newline at end of file
diff --git a/workflow_examples/att-workflow/att_dag.py b/workflow_examples/att-workflow/att_dag.py
deleted file mode 100644
index a464176..0000000
--- a/workflow_examples/att-workflow/att_dag.py
+++ /dev/null
@@ -1,237 +0,0 @@
-# Copyright 2019-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Example AT&T workflow using Airflow
-"""
-import json
-import logging
-import airflow
-from datetime import datetime
-from airflow import DAG
-from airflow import AirflowException
-from airflow.operators import PythonOperator
-from cord_workflow_airflow_extensions.sensors import CORDEventSensor, CORDModelSensor
-from cord_workflow_airflow_extensions.operators import CORDModelOperator
-
-from att_helpers import *
-from att_service_instance_funcs import *
-
-
-args = {
-    'start_date': datetime.utcnow(),
-    'owner': 'ATT',
-}
-
-dag_att = DAG(
-    dag_id='att_workflow_onu',
-    default_args=args,
-    # this dag will be triggered by external systems
-    schedule_interval=None,
-)
-
-dag_att.doc_md = __doc__
-
-
-def ONU_event(model_accessor, message, **kwargs):
-    #context = kwargs
-    #run_id = context['dag_run'].run_id
-
-    logging.info('onu.events: received event', message=message)
-
-    si = find_or_create_att_si(model_accessor, logging, message)
-    if message['status'] == 'activated':
-        logging.info('onu.events: activated onu', message=message)
-        si.no_sync = False
-        si.uni_port_id = long(message['portNumber'])
-        si.of_dpid = message['deviceId']
-        si.oper_onu_status = 'ENABLED'
-        si.save_changed_fields(always_update_timestamp=True)
-    elif message['status'] == 'disabled':
-        logging.info('onu.events: disabled onu, resetting the subscriber', value=message)
-        si.oper_onu_status = 'DISABLED'
-        si.save_changed_fields(always_update_timestamp=True)
-    else:
-        logging.warn('onu.events: Unknown status value: %s' % message['status'], value=message)
-        raise AirflowException('onu.events: Unknown status value: %s' % message['status'], value=message)
-
-
-def AUTH_event(model_accessor, message, **kwargs):
-    #context = kwargs
-    #run_id = context['dag_run'].run_id
-
-    logging.info('authentication.events: Got event for subscriber', message=message)
-
-    si = find_or_create_att_si(model_accessor, logging, message)
-    logging.debug('authentication.events: Updating service instance', si=si)
-    si.authentication_state = message['authenticationState']
-    si.save_changed_fields(always_update_timestamp=True)
-
-
-def DHCP_event(model_accessor, message, **kwargs):
-    #context = kwargs
-    #run_id = context['dag_run'].run_id
-
-    logging.info('dhcp.events: Got event for subscriber', message=message)
-
-    si = find_or_create_att_si(model_accessor, logging, message)
-    logging.debug('dhcp.events: Updating service instance', si=si)
-    si.dhcp_state = message['messageType']
-    si.ip_address = message['ipAddress']
-    si.mac_address = message['macAddress']
-    si.save_changed_fields(always_update_timestamp=True)
-
-
-def DriverService_event(model_accessor, message, si, **kwargs):
-    #context = kwargs
-    #run_id = context['dag_run'].run_id
-
-    event_type = message['event_type']
-
-    go = False
-    if event_type == 'create':
-        logging.debug('MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s ' % si.id)
-        go = True
-    elif event_type == 'update':
-        logging.debug('MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s ' %
-                          (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
-        go = True
-    elif event_type == 'delete':
-        pass
-    else:
-        pass
-
-    if not go:
-        return
-
-    # handle only create & update events
-
-    # Changing ONU state can change auth state
-    # Changing auth state can change DHCP state
-    # So need to process in this order
-    process_onu_state(model_accessor, si)
-    process_auth_state(si)
-    process_dhcp_state(si)
-
-    validate_states(si)
-
-    # handling the subscriber status
-    # It's a combination of all the other states
-    subscriber = get_subscriber(model_accessor, si.serial_number)
-    if subscriber:
-        update_subscriber(model_accessor, subscriber, si)
-
-    si.save_changed_fields()
-
-
-onu_event_sensor = CORDEventSensor(
-    task_id='onu_event_sensor',
-    topic='onu.events',
-    key_field='serialNumber',
-    controller_conn_id='local_cord_controller',
-    poke_interval=5,
-    dag=dag_att
-)
-
-onu_event_handler = CORDModelOperator(
-    task_id='onu_event_handler',
-    python_callable=ONU_event,
-    cord_event_sensor_task_id='onu_event_sensor',
-    dag=dag_att
-)
-
-auth_event_sensor = CORDEventSensor(
-    task_id='auth_event_sensor',
-    topic='authentication.events',
-    key_field='serialNumber',
-    controller_conn_id='local_cord_controller',
-    poke_interval=5,
-    dag=dag_att
-)
-
-auth_event_handler = CORDModelOperator(
-    task_id='auth_event_handler',
-    python_callable=AUTH_event,
-    cord_event_sensor_task_id='auth_event_sensor',
-    dag=dag_att
-)
-
-dhcp_event_sensor = CORDEventSensor(
-    task_id='dhcp_event_sensor',
-    topic='dhcp.events',
-    key_field='serialNumber',
-    controller_conn_id='local_cord_controller',
-    poke_interval=5,
-    dag=dag_att
-)
-
-dhcp_event_handler = CORDModelOperator(
-    task_id='dhcp_event_handler',
-    python_callable=DHCP_event,
-    cord_event_sensor_task_id='dhcp_event_sensor',
-    dag=dag_att
-)
-
-att_model_event_sensor1 = CORDModelSensor(
-    task_id='att_model_event_sensor1',
-    model_name='AttWorkflowDriverServiceInstance',
-    key_field='serialNumber',
-    controller_conn_id='local_cord_controller',
-    poke_interval=5,
-    dag=dag_att
-)
-
-att_model_event_handler1 = CORDModelOperator(
-    task_id='att_model_event_handler1',
-    python_callable=DriverService_event,
-    cord_event_sensor_task_id='att_model_event_sensor1',
-    dag=dag_att
-)
-
-att_model_event_sensor2 = CORDModelSensor(
-    task_id='att_model_event_sensor2',
-    model_name='AttWorkflowDriverServiceInstance',
-    key_field='serialNumber',
-    controller_conn_id='local_cord_controller',
-    poke_interval=5,
-    dag=dag_att
-)
-
-att_model_event_handler2 = CORDModelOperator(
-    task_id='att_model_event_handler2',
-    python_callable=DriverService_event,
-    cord_event_sensor_task_id='att_model_event_sensor2',
-    dag=dag_att
-)
-
-att_model_event_sensor3 = CORDModelSensor(
-    task_id='att_model_event_sensor3',
-    model_name='AttWorkflowDriverServiceInstance',
-    key_field='serialNumber',
-    controller_conn_id='local_cord_controller',
-    poke_interval=5,
-    dag=dag_att
-)
-
-att_model_event_handler3 = CORDModelOperator(
-    task_id='att_model_event_handler3',
-    python_callable=DriverService_event,
-    cord_event_sensor_task_id='att_model_event_sensor3',
-    dag=dag_att
-)
-
-
-onu_event_sensor >> onu_event_handler >> att_model_event_sensor1 >> att_model_event_handler1 >> \
-    auth_event_sensor >> auth_event_handler >> att_model_event_sensor2 >> att_model_event_handler2 >> \
-    dhcp_event_sensor >> dhcp_event_handler >> att_model_event_sensor3 >> att_model_event_handler3
diff --git a/workflow_examples/att-workflow/att_helpers.py b/workflow_examples/att-workflow/att_helpers.py
deleted file mode 100644
index 2abd2ab..0000000
--- a/workflow_examples/att-workflow/att_helpers.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from xossynchronizer.steps.syncstep import DeferredException
-
-def validate_onu(model_accessor, logging, att_si):
-    """
-    This method validate an ONU against the whitelist and set the appropriate state.
-    It's expected that the deferred exception is managed in the caller method,
-    for example a model_policy or a sync_step.
-
-    :param att_si: AttWorkflowDriverServiceInstance
-    :return: [boolean, string]
-    """
-
-    oss_service = att_si.owner.leaf_model
-
-    # See if there is a matching entry in the whitelist.
-    matching_entries = model_accessor.AttWorkflowDriverWhiteListEntry.objects.filter(
-        owner_id=oss_service.id,
-    )
-    matching_entries = [e for e in matching_entries if e.serial_number.lower() == att_si.serial_number.lower()]
-
-    if len(matching_entries) == 0:
-        logging.warn("ONU not found in whitelist", object=str(att_si), serial_number=att_si.serial_number, **att_si.tologdict())
-        return [False, "ONU not found in whitelist"]
-
-    whitelisted = matching_entries[0]
-    try:
-        onu = model_accessor.ONUDevice.objects.get(serial_number=att_si.serial_number)
-        pon_port = onu.pon_port
-    except IndexError:
-        raise DeferredException("ONU device %s is not know to XOS yet" % att_si.serial_number)
-
-    if onu.admin_state == "ADMIN_DISABLED":
-        return [False, "ONU has been manually disabled"]
-
-    if pon_port.port_no != whitelisted.pon_port_id or att_si.of_dpid != whitelisted.device_id:
-        logging.warn("ONU disable as location don't match",
-                    object=str(att_si),
-                    serial_number=att_si.serial_number,
-                    pon_port=pon_port.port_no,
-                    whitelisted_pon_port=whitelisted.pon_port_id,
-                    device_id=att_si.of_dpid,
-                    whitelisted_device_id=whitelisted.device_id,
-                    **att_si.tologdict())
-        return [False, "ONU activated in wrong location"]
-
-    return [True, "ONU has been validated"]
-
-def find_or_create_att_si(model_accessor, logging, event):
-    try:
-        att_si = model_accessor.AttWorkflowDriverServiceInstance.objects.get(
-            serial_number=event["serialNumber"]
-        )
-        logging.debug("AttHelpers: Found existing AttWorkflowDriverServiceInstance", si=att_si)
-    except IndexError:
-        # create an AttWorkflowDriverServiceInstance, the validation will be
-        # triggered in the corresponding sync step
-        att_si = model_accessor.AttWorkflowDriverServiceInstance(
-            serial_number=event["serialNumber"],
-            of_dpid=event["deviceId"],
-            uni_port_id=long(event["portNumber"]),
-            # we assume there is only one AttWorkflowDriverService
-            owner=model_accessor.AttWorkflowDriverService.objects.first()
-        )
-        logging.debug("AttHelpers: Created new AttWorkflowDriverServiceInstance", si=att_si)
-    return att_si
\ No newline at end of file
diff --git a/workflow_examples/att-workflow/att_service_instance_funcs.py b/workflow_examples/att-workflow/att_service_instance_funcs.py
deleted file mode 100644
index df179f9..0000000
--- a/workflow_examples/att-workflow/att_service_instance_funcs.py
+++ /dev/null
@@ -1,190 +0,0 @@
-# Copyright 2019-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-from att_helpers import *
-
-# Check the whitelist to see if the ONU is valid.  If it is, make sure that it's enabled.
-def process_onu_state(model_accessor, si):
-    [valid, message] = validate_onu(model_accessor, logging, si)
-    si.status_message = message
-    if valid:
-        si.admin_onu_state = "ENABLED"
-        update_onu(model_accessor, si.serial_number, "ENABLED")
-    else:
-        si.admin_onu_state = "DISABLED"
-        update_onu(model_accessor, si.serial_number, "DISABLED")
-
-
-# If the ONU has been disabled then we force re-authentication when it
-# is re-enabled.
-# Setting si.authentication_state = AWAITING:
-#   -> subscriber status = "awaiting_auth"
-#   -> service chain deleted
-#   -> need authentication to restore connectivity after ONU enabled
-def process_auth_state(si):
-    auth_msgs = {
-        "AWAITING": " - Awaiting Authentication",
-        "REQUESTED": " - Authentication requested",
-        "STARTED": " - Authentication started",
-        "APPROVED": " - Authentication succeeded",
-        "DENIED": " - Authentication denied"
-    }
-    if si.admin_onu_state == "DISABLED" or si.oper_onu_status == "DISABLED":
-        si.authentication_state = "AWAITING"
-    else:
-        si.status_message += auth_msgs[si.authentication_state]
-
-
-# The DhcpL2Relay ONOS app generates events that update the fields below.
-# It only sends events when it processes DHCP packets.  It keeps no internal state.
-# We reset dhcp_state when:
-# si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]
-#   -> subscriber status = "awaiting_auth"
-#   -> service chain not present
-#   -> subscriber's OLT flow rules, xconnect not present
-#   -> DHCP packets won't go through
-# Note, however, that the DHCP state at the endpoints is not changed.
-# A previously issued DHCP lease may still be valid.
-def process_dhcp_state(si):
-    if si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]:
-        si.ip_address = ""
-        si.mac_address = ""
-        si.dhcp_state = "AWAITING"
-
-
-# Make sure the object is in a legitimate state
-# It should be after the above processing steps
-# However this might still fail if an event has fired in the meantime
-# Valid states:
-# ONU       | Auth     | DHCP
-# ===============================
-# AWAITING  | AWAITING | AWAITING
-# ENABLED   | *        | AWAITING
-# ENABLED   | APPROVED | *
-# DISABLED  | AWAITING | AWAITING
-def validate_states(si):
-    if (si.admin_onu_state == "AWAITING" or si.admin_onu_state ==
-            "DISABLED") and si.authentication_state == "AWAITING" and si.dhcp_state == "AWAITING":
-        return
-    if si.admin_onu_state == "ENABLED" and (si.authentication_state == "APPROVED" or si.dhcp_state == "AWAITING"):
-        return
-    logging.warning(
-        "MODEL_POLICY (validate_states): invalid state combination",
-        onu_state=si.admin_onu_state,
-        auth_state=si.authentication_state,
-        dhcp_state=si.dhcp_state)
-
-
-def update_onu(model_accessor, serial_number, admin_state):
-    onu = [onu for onu in model_accessor.ONUDevice.objects.all() if onu.serial_number.lower()
-            == serial_number.lower()][0]
-    if onu.admin_state == "ADMIN_DISABLED":
-        logging.debug(
-            "MODEL_POLICY: ONUDevice [%s] has been manually disabled, not changing state to %s" %
-            (serial_number, admin_state))
-        return
-    if onu.admin_state == admin_state:
-        logging.debug(
-            "MODEL_POLICY: ONUDevice [%s] already has admin_state to %s" %
-            (serial_number, admin_state))
-    else:
-        logging.debug("MODEL_POLICY: setting ONUDevice [%s] admin_state to %s" % (serial_number, admin_state))
-        onu.admin_state = admin_state
-        onu.save_changed_fields(always_update_timestamp=True)
-
-
-def get_subscriber(model_accessor, serial_number):
-    try:
-        return [s for s in model_accessor.RCORDSubscriber.objects.all() if s.onu_device.lower()
-                == serial_number.lower()][0]
-    except IndexError:
-        # If the subscriber doesn't exist we don't do anything
-        logging.debug(
-            "MODEL_POLICY: subscriber does not exists for this SI, doing nothing",
-            onu_device=serial_number)
-        return None
-
-
-def update_subscriber_ip(model_accessor, subscriber, ip):
-    # TODO check if the subscriber has an IP and update it,
-    # or create a new one
-    try:
-        ip = model_accessor.RCORDIpAddress.objects.filter(
-            subscriber_id=subscriber.id,
-            ip=ip
-        )[0]
-        logging.debug("MODEL_POLICY: found existing RCORDIpAddress for subscriber",
-                            onu_device=subscriber.onu_device, subscriber_status=subscriber.status, ip=ip)
-        ip.save_changed_fields()
-    except IndexError:
-        logging.debug(
-            "MODEL_POLICY: Creating new RCORDIpAddress for subscriber",
-            onu_device=subscriber.onu_device,
-            subscriber_status=subscriber.status,
-            ip=ip)
-        ip = model_accessor.RCORDIpAddress(
-            subscriber_id=subscriber.id,
-            ip=ip,
-            description="DHCP Assigned IP Address"
-        )
-        ip.save()
-
-
-def delete_subscriber_ip(model_accessor, subscriber, ip):
-    try:
-        ip = model_accessor.RCORDIpAddress.objects.filter(
-            subscriber_id=subscriber.id,
-            ip=ip
-        )[0]
-        logging.debug(
-            "MODEL_POLICY: delete RCORDIpAddress for subscriber",
-            onu_device=subscriber.onu_device,
-            subscriber_status=subscriber.status,
-            ip=ip)
-        ip.delete()
-    except BaseException:
-        logging.warning("MODEL_POLICY: no RCORDIpAddress object found, cannot delete", ip=ip)
-
-
-def update_subscriber(model_accessor, subscriber, si):
-    cur_status = subscriber.status
-    # Don't change state if someone has disabled the subscriber
-    if subscriber.status != "disabled":
-        if si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]:
-            subscriber.status = "awaiting-auth"
-        elif si.authentication_state == "APPROVED":
-            subscriber.status = "enabled"
-        elif si.authentication_state == "DENIED":
-            subscriber.status = "auth-failed"
-
-        # NOTE we save the subscriber only if:
-        # - the status has changed
-        # - we get a DHCPACK event
-        if cur_status != subscriber.status or si.dhcp_state == "DHCPACK":
-            logging.debug(
-                "MODEL_POLICY: updating subscriber",
-                onu_device=subscriber.onu_device,
-                authentication_state=si.authentication_state,
-                subscriber_status=subscriber.status)
-            if subscriber.status == "awaiting-auth":
-                delete_subscriber_ip(model_accessor, subscriber, si.ip_address)
-                subscriber.mac_address = ""
-            elif si.ip_address and si.mac_address:
-                update_subscriber_ip(model_accessor, subscriber, si.ip_address)
-                subscriber.mac_address = si.mac_address
-            subscriber.save_changed_fields(always_update_timestamp=True)
-        else:
-            logging.debug("MODEL_POLICY: subscriber status has not changed", onu_device=subscriber.onu_device,
-                              authentication_state=si.authentication_state, subscriber_status=subscriber.status)
diff --git a/workflow_examples/att-workflow/att_workflow.py b/workflow_examples/att-workflow/att_workflow.py
new file mode 100644
index 0000000..0ad0ae7
--- /dev/null
+++ b/workflow_examples/att-workflow/att_workflow.py
@@ -0,0 +1,462 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging
+from datetime import datetime
+from airflow import DAG
+from airflow import AirflowException
+from airflow.operators.python_operator import PythonOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.trigger_rule import TriggerRule
+from airflow.sensors.cord_workflow_plugin import CORDEventSensor, CORDModelSensor
+from airflow.operators.cord_workflow_plugin import CORDModelOperator
+
+log = logging.getLogger(__name__)
+args = {
+    # hard coded date
+    'start_date': datetime(2019, 1, 1),
+    'owner': 'ATT'
+}
+
+dag_att = DAG(
+    dag_id='att_workflow',
+    default_args=args,
+    # this dag will be triggered by external systems
+    schedule_interval=None
+)
+dag_att.doc_md = __doc__
+
+dag_att_admin = DAG(
+    dag_id='att_admin_workflow',
+    default_args=args,
+    # this dag will be triggered by external systems
+    schedule_interval=None
+)
+dag_att_admin.doc_md = __doc__
+
+def find_or_create_att_si(model_accessor, event):
+    try:
+        att_si = model_accessor.AttWorkflowDriverServiceInstance.objects.get(
+            serial_number=event["serialNumber"]
+        )
+        log.debug("Found existing AttWorkflowDriverServiceInstance - si = %s" % att_si)
+    except IndexError:
+        # create an AttWorkflowDriverServiceInstance, the validation will be
+        # triggered in the corresponding sync step
+        att_si = model_accessor.AttWorkflowDriverServiceInstance(
+            serial_number=event["serialNumber"],
+            of_dpid=event["deviceId"],
+            uni_port_id=long(event["portNumber"]),
+            # we assume there is only one AttWorkflowDriverService
+            owner=model_accessor.AttWorkflowDriverService.objects.first()
+        )
+        log.debug("Created new AttWorkflowDriverServiceInstance - si = %s" % att_si)
+    return att_si
+
+
+def validate_onu(model_accessor, si):
+    """
+    This method validate an ONU against the whitelist and set the appropriate state.
+    It's expected that the deferred exception is managed in the caller method,
+    for example a model_policy or a sync_step.
+
+    :param si: AttWorkflowDriverServiceInstance
+    :return: [boolean, string]
+    """
+
+    oss_service = si.owner.leaf_model
+
+    # See if there is a matching entry in the whitelist.
+    matching_entries = model_accessor.AttWorkflowDriverWhiteListEntry.objects.filter(
+        owner_id=oss_service.id,
+    )
+    matching_entries = [e for e in matching_entries if e.serial_number.lower() == si.serial_number.lower()]
+
+    if len(matching_entries) == 0:
+        log.warn("ONU not found in whitelist - serial_number = %s" % si.serial_number)
+        return [False, "ONU not found in whitelist"]
+
+    whitelisted = matching_entries[0]
+    try:
+        onu = model_accessor.ONUDevice.objects.get(serial_number=si.serial_number)
+        pon_port = onu.pon_port
+    except IndexError:
+        raise DeferredException("ONU device %s is not know to XOS yet" % si.serial_number)
+
+    if onu.admin_state == "ADMIN_DISABLED":
+        return [False, "ONU has been manually disabled"]
+
+    if pon_port.port_no != whitelisted.pon_port_id or si.of_dpid != whitelisted.device_id:
+        log.warn("ONU disable as location don't match - serial_number = %s, device_id= %s" % (si.serial_number, si.of_dpid))
+        return [False, "ONU activated in wrong location"]
+
+    return [True, "ONU has been validated"]
+
+
+def update_onu(model_accessor, serial_number, admin_state):
+    onu = [onu for onu in model_accessor.ONUDevice.objects.all() if onu.serial_number.lower()
+            == serial_number.lower()][0]
+    if onu.admin_state == "ADMIN_DISABLED":
+        log.debug(
+            "MODEL_POLICY: ONUDevice [%s] has been manually disabled, not changing state to %s" %
+            (serial_number, admin_state))
+        return
+    if onu.admin_state == admin_state:
+        log.debug(
+            "MODEL_POLICY: ONUDevice [%s] already has admin_state to %s" %
+            (serial_number, admin_state))
+    else:
+        log.debug(
+            "MODEL_POLICY: setting ONUDevice [%s] admin_state to %s" %
+            (serial_number, admin_state))
+        onu.admin_state = admin_state
+        onu.save_changed_fields(always_update_timestamp=True)
+
+
+def process_onu_state(model_accessor, si):
+    """
+    Check the whitelist to see if the ONU is valid.  If it is, make sure that it's enabled.
+    """
+
+    [valid, message] = validate_onu(model_accessor, si)
+    si.status_message = message
+    if valid:
+        si.admin_onu_state = "ENABLED"
+        update_onu(model_accessor, si.serial_number, "ENABLED")
+    else:
+        si.admin_onu_state = "DISABLED"
+        update_onu(model_accessor, si.serial_number, "DISABLED")
+
+
+def process_auth_state(si):
+    """
+    If the ONU has been disabled then we force re-authentication when it
+    is re-enabled.
+    Setting si.authentication_state = AWAITING:
+        -> subscriber status = "awaiting_auth"
+        -> service chain deleted
+        -> need authentication to restore connectivity after ONU enabled
+    """
+
+    auth_msgs = {
+        "AWAITING": " - Awaiting Authentication",
+        "REQUESTED": " - Authentication requested",
+        "STARTED": " - Authentication started",
+        "APPROVED": " - Authentication succeeded",
+        "DENIED": " - Authentication denied"
+    }
+    if si.admin_onu_state == "DISABLED" or si.oper_onu_status == "DISABLED":
+        si.authentication_state = "AWAITING"
+    else:
+        si.status_message += auth_msgs[si.authentication_state]
+
+
+def process_dhcp_state(si):
+    """
+    The DhcpL2Relay ONOS app generates events that update the fields below.
+    It only sends events when it processes DHCP packets.  It keeps no internal state.
+    We reset dhcp_state when:
+    si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]
+        -> subscriber status = "awaiting_auth"
+        -> service chain not present
+        -> subscriber's OLT flow rules, xconnect not present
+        -> DHCP packets won't go through
+    Note, however, that the DHCP state at the endpoints is not changed.
+    A previously issued DHCP lease may still be valid.
+    """
+
+    if si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]:
+        si.ip_address = ""
+        si.mac_address = ""
+        si.dhcp_state = "AWAITING"
+
+
+def validate_states(si):
+    """
+    Make sure the object is in a legitimate state
+    It should be after the above processing steps
+    However this might still fail if an event has fired in the meantime
+    Valid states:
+    ONU       | Auth     | DHCP
+    ===============================
+    AWAITING  | AWAITING | AWAITING
+    ENABLED   | *        | AWAITING
+    ENABLED   | APPROVED | *
+    DISABLED  | AWAITING | AWAITING
+    """
+
+    if (si.admin_onu_state == "AWAITING" or si.admin_onu_state == "DISABLED") and \
+        si.authentication_state == "AWAITING" and si.dhcp_state == "AWAITING":
+        return
+
+    if si.admin_onu_state == "ENABLED" and \
+        (si.authentication_state == "APPROVED" or si.dhcp_state == "AWAITING"):
+        return
+
+    log.warning(
+        "validate_states: invalid state combination - onu_state = %s, \
+        auth_state = %s, dhcp_state = %s" %
+        (si.admin_onu_state, si.authentication_state, si.dhcp_state)
+    )
+
+
+def get_subscriber(model_accessor, serial_number):
+    try:
+        return [s for s in model_accessor.RCORDSubscriber.objects.all() if s.onu_device.lower()
+                == serial_number.lower()][0]
+    except IndexError:
+        # If the subscriber doesn't exist we don't do anything
+        log.debug(
+            "subscriber does not exists for this SI, doing nothing - onu_device = %s" %
+            serial_number
+        )
+        return None
+
+
+def update_subscriber_ip(model_accessor, subscriber, ip):
+    # TODO check if the subscriber has an IP and update it,
+    # or create a new one
+    try:
+        ip = model_accessor.RCORDIpAddress.objects.filter(
+            subscriber_id=subscriber.id,
+            ip=ip
+        )[0]
+        log.debug(
+            "found existing RCORDIpAddress for subscriber",
+            onu_device=subscriber.onu_device,
+            subscriber_status=subscriber.status,
+            ip=ip
+        )
+        ip.save_changed_fields()
+    except IndexError:
+        log.debug(
+            "Creating new RCORDIpAddress for subscriber",
+            onu_device=subscriber.onu_device,
+            subscriber_status=subscriber.status,
+            ip=ip)
+        ip = model_accessor.RCORDIpAddress(
+            subscriber_id=subscriber.id,
+            ip=ip,
+            description="DHCP Assigned IP Address"
+        )
+        ip.save()
+
+
+def delete_subscriber_ip(model_accessor, subscriber, ip):
+    try:
+        ip = model_accessor.RCORDIpAddress.objects.filter(
+            subscriber_id=subscriber.id,
+            ip=ip
+        )[0]
+        log.debug(
+            "MODEL_POLICY: delete RCORDIpAddress for subscriber",
+            onu_device=subscriber.onu_device,
+            subscriber_status=subscriber.status,
+            ip=ip)
+        ip.delete()
+    except BaseException:
+        log.warning("MODEL_POLICY: no RCORDIpAddress object found, cannot delete", ip=ip)
+
+
+def update_subscriber(model_accessor, subscriber, si):
+    cur_status = subscriber.status
+    # Don't change state if someone has disabled the subscriber
+    if subscriber.status != "disabled":
+        if si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]:
+            subscriber.status = "awaiting-auth"
+        elif si.authentication_state == "APPROVED":
+            subscriber.status = "enabled"
+        elif si.authentication_state == "DENIED":
+            subscriber.status = "auth-failed"
+
+        # NOTE we save the subscriber only if:
+        # - the status has changed
+        # - we get a DHCPACK event
+        if cur_status != subscriber.status or si.dhcp_state == "DHCPACK":
+            log.debug(
+                "updating subscriber",
+                onu_device=subscriber.onu_device,
+                authentication_state=si.authentication_state,
+                subscriber_status=subscriber.status
+            )
+
+            if subscriber.status == "awaiting-auth":
+                delete_subscriber_ip(model_accessor, subscriber, si.ip_address)
+                subscriber.mac_address = ""
+            elif si.ip_address and si.mac_address:
+                update_subscriber_ip(model_accessor, subscriber, si.ip_address)
+                subscriber.mac_address = si.mac_address
+            subscriber.save_changed_fields(always_update_timestamp=True)
+        else:
+            log.debug(
+                "subscriber status has not changed",
+                onu_device=subscriber.onu_device,
+                authentication_state=si.authentication_state,
+                subscriber_status=subscriber.status
+            )
+
+
+def update_model(model_accessor, si):
+    # Changing ONU state can change auth state
+    # Changing auth state can change DHCP state
+    # So need to process in this order
+    process_onu_state(model_accessor, si)
+    process_auth_state(si)
+    process_dhcp_state(si)
+
+    validate_states(si)
+
+    # handling the subscriber status
+    # It's a combination of all the other states
+    subscriber = get_subscriber(model_accessor, si.serial_number)
+    if subscriber:
+        update_subscriber(model_accessor, subscriber, si)
+
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def on_onu_event(model_accessor, message, **kwargs):
+    log.info('onu.events: received an event - message = %s' % message)
+
+    si = find_or_create_att_si(model_accessor, message)
+    if message['status'] == 'activated':
+        log.info('onu.events: activated onu')
+        si.no_sync = False
+        si.uni_port_id = long(message['portNumber'])
+        si.of_dpid = message['deviceId']
+        si.oper_onu_status = 'ENABLED'
+    elif message['status'] == 'disabled':
+        log.info('onu.events: disabled onu, resetting the subscriber')
+        si.oper_onu_status = 'DISABLED'
+    else:
+        log.error('onu.events: Unknown status value: %s' % message['status'])
+        raise AirflowException('onu.events: Unknown status value: %s' % message['status'])
+
+    update_model(model_accessor, si)
+
+def on_auth_event(model_accessor, message, **kwargs):
+    log.info('authentication.events: received an event - message = %s' % message)
+
+    si = find_or_create_att_si(model_accessor, message)
+    log.debug('authentication.events: Updating service instance')
+    si.authentication_state = message['authenticationState']
+    update_model(model_accessor, si)
+
+
+def on_dhcp_event(model_accessor, message, **kwargs):
+    log.info('dhcp.events: received an event - message = %s' % message)
+
+    si = find_or_create_att_si(model_accessor, message)
+    log.debug('dhcp.events: Updating service instance')
+    si.dhcp_state = message['messageType']
+    si.ip_address = message['ipAddress']
+    si.mac_address = message['macAddress']
+    update_model(model_accessor, si)
+
+
+def DriverService_event(model_accessor, message, **kwargs):
+    log.info('model event: received an event - %s' % message)
+
+    # handle only create & update events
+    event_type = message['event_type']
+    if event_type is None or event_type.lower() not in ['create', 'update']:
+        log.error('can not handle an event type - %s' % event_type)
+        return
+
+    si = find_or_create_att_si(model_accessor, message)
+    update_model(model_accessor, si)
+
+
+onu_event_sensor = CORDEventSensor(
+    task_id='onu_event_sensor',
+    topic='onu.events',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_att
+)
+
+onu_event_handler = CORDModelOperator(
+    task_id='onu_event_handler',
+    python_callable=on_onu_event,
+    cord_event_sensor_task_id='onu_event_sensor',
+    dag=dag_att
+)
+
+auth_event_sensor = CORDEventSensor(
+    task_id='auth_event_sensor',
+    topic='authentication.events',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_att
+)
+
+auth_event_handler = CORDModelOperator(
+    task_id='auth_event_handler',
+    python_callable=on_auth_event,
+    cord_event_sensor_task_id='auth_event_sensor',
+    dag=dag_att
+)
+
+dhcp_event_sensor = CORDEventSensor(
+    task_id='dhcp_event_sensor',
+    topic='dhcp.events',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_att
+)
+
+dhcp_event_handler = CORDModelOperator(
+    task_id='dhcp_event_handler',
+    python_callable=on_dhcp_event,
+    cord_event_sensor_task_id='dhcp_event_sensor',
+    dag=dag_att
+)
+
+join = DummyOperator(
+    task_id='join',
+    trigger_rule=TriggerRule.ALL_DONE,
+    dag=dag_att
+)
+
+att_model_event_sensor = CORDModelSensor(
+    task_id='att_model_event_sensor',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_att_admin
+)
+
+att_model_event_handler = CORDModelOperator(
+    task_id='att_model_event_handler',
+    python_callable=DriverService_event,
+    cord_event_sensor_task_id='att_model_event_sensor',
+    dag=dag_att_admin
+)
+
+# handle standard flow
+onu_event_sensor >> onu_event_handler >> join
+auth_event_sensor >> auth_event_handler >> join
+dhcp_event_sensor >> dhcp_event_handler >> join
+
+# handle admin flow
+att_model_event_sensor >> att_model_event_handler
+
diff --git a/workflow_examples/att-workflow/att_workflow_essence.json b/workflow_examples/att-workflow/att_workflow_essence.json
new file mode 100644
index 0000000..50a9441
--- /dev/null
+++ b/workflow_examples/att-workflow/att_workflow_essence.json
@@ -0,0 +1,177 @@
+{
+    "att_admin_workflow": {
+        "dag": {
+            "dag_id": "att_admin_workflow",
+            "local_variable": "dag_att_admin"
+        },
+        "dependencies": {
+            "att_model_event_handler": {
+                "parents": [
+                    "att_model_event_sensor"
+                ]
+            },
+            "att_model_event_sensor": {
+                "children": [
+                    "att_model_event_handler"
+                ]
+            }
+        },
+        "tasks": {
+            "att_model_event_handler": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "att_model_event_sensor",
+                "dag": "dag_att_admin",
+                "dag_id": "att_admin_workflow",
+                "local_variable": "att_model_event_handler",
+                "python_callable": "DriverService_event",
+                "task_id": "att_model_event_handler"
+            },
+            "att_model_event_sensor": {
+                "class": "CORDModelSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_att_admin",
+                "dag_id": "att_admin_workflow",
+                "key_field": "serialNumber",
+                "local_variable": "att_model_event_sensor",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "task_id": "att_model_event_sensor"
+            }
+        }
+    },
+    "att_workflow": {
+        "dag": {
+            "dag_id": "att_workflow",
+            "local_variable": "dag_att"
+        },
+        "dependencies": {
+            "auth_event_handler": {
+                "children": [
+                    "join"
+                ],
+                "parents": [
+                    "auth_event_sensor"
+                ]
+            },
+            "auth_event_sensor": {
+                "children": [
+                    "auth_event_handler"
+                ]
+            },
+            "dhcp_event_handler": {
+                "children": [
+                    "join"
+                ],
+                "parents": [
+                    "dhcp_event_sensor"
+                ]
+            },
+            "dhcp_event_sensor": {
+                "children": [
+                    "dhcp_event_handler"
+                ]
+            },
+            "join": {
+                "parents": [
+                    "onu_event_handler",
+                    "auth_event_handler",
+                    "dhcp_event_handler"
+                ]
+            },
+            "onu_event_handler": {
+                "children": [
+                    "join"
+                ],
+                "parents": [
+                    "onu_event_sensor"
+                ]
+            },
+            "onu_event_sensor": {
+                "children": [
+                    "onu_event_handler"
+                ]
+            }
+        },
+        "tasks": {
+            "auth_event_handler": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "auth_event_sensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow",
+                "local_variable": "auth_event_handler",
+                "python_callable": "on_auth_event",
+                "task_id": "auth_event_handler"
+            },
+            "auth_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_att",
+                "dag_id": "att_workflow",
+                "key_field": "serialNumber",
+                "local_variable": "auth_event_sensor",
+                "poke_interval": 5,
+                "task_id": "auth_event_sensor",
+                "topic": "authentication.events"
+            },
+            "dhcp_event_handler": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "dhcp_event_sensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow",
+                "local_variable": "dhcp_event_handler",
+                "python_callable": "on_dhcp_event",
+                "task_id": "dhcp_event_handler"
+            },
+            "dhcp_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_att",
+                "dag_id": "att_workflow",
+                "key_field": "serialNumber",
+                "local_variable": "dhcp_event_sensor",
+                "poke_interval": 5,
+                "task_id": "dhcp_event_sensor",
+                "topic": "dhcp.events"
+            },
+            "join": {
+                "class": "DummyOperator",
+                "dag": "dag_att",
+                "dag_id": "att_workflow",
+                "local_variable": "join",
+                "task_id": "join",
+                "trigger_rule": {
+                    "Attribute": {
+                        "attr": "ALL_DONE",
+                        "ctx": "Load",
+                        "value": {
+                            "Name": {
+                                "ctx": "Load",
+                                "id": "TriggerRule"
+                            }
+                        }
+                    }
+                }
+            },
+            "onu_event_handler": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "onu_event_sensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow",
+                "local_variable": "onu_event_handler",
+                "python_callable": "on_onu_event",
+                "task_id": "onu_event_handler"
+            },
+            "onu_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_att",
+                "dag_id": "att_workflow",
+                "key_field": "serialNumber",
+                "local_variable": "onu_event_sensor",
+                "poke_interval": 5,
+                "task_id": "onu_event_sensor",
+                "topic": "onu.events"
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/workflow_examples/emit_events_model.sh b/workflow_examples/emit_events_model.sh
deleted file mode 100644
index 6adbcbb..0000000
--- a/workflow_examples/emit_events_model.sh
+++ /dev/null
@@ -1,18 +0,0 @@
-#! /bin/bash
-
-# Copyright 2019-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-# emit events
-python ./workflow_ctl.py emit "datamodel.cordWorkflowDriverServiceInstance" "{'event_type': 'update', 'serialNumber': 'testSerialXXX', 'other': 'test_other_field'}"
diff --git a/workflow_examples/parallel-cord-workflow/parallel_cord_workflow.py b/workflow_examples/parallel-cord-workflow/parallel_cord_workflow.py
new file mode 100644
index 0000000..42e0e2d
--- /dev/null
+++ b/workflow_examples/parallel-cord-workflow/parallel_cord_workflow.py
@@ -0,0 +1,147 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example parallel workflow
+"""
+import json
+import logging
+from datetime import datetime
+from airflow import DAG
+from airflow import AirflowException
+from airflow.operators.python_operator import PythonOperator
+from airflow.operators.dummy_operator import DummyOperator
+from airflow.utils.trigger_rule import TriggerRule
+from airflow.sensors.cord_workflow_plugin import CORDEventSensor, CORDModelSensor
+from airflow.operators.cord_workflow_plugin import CORDModelOperator
+
+
+log = logging.getLogger(__name__)
+args = {
+    # hard coded date
+    'start_date': datetime(2019, 1, 1),
+    'owner': 'iychoi'
+}
+
+dag_parallel_cord = DAG(
+    dag_id='parallel_cord_workflow',
+    default_args=args,
+    # this dag will be triggered by external systems
+    schedule_interval=None
+)
+dag_parallel_cord.doc_md = __doc__
+
+dag_parallel_cord_admin = DAG(
+    dag_id='parallel_cord_workflow_admin',
+    default_args=args,
+    # this dag will be triggered by external systems
+    schedule_interval=None
+)
+dag_parallel_cord_admin.doc_md = __doc__
+
+
+def on_onu_event(model_accessor, message, **kwargs):
+    log.info('onu.events: received an event - %s' % message)
+
+
+def on_auth_event(model_accessor, message, **kwargs):
+    log.info('authentication.events: received an event - %s' % message)
+
+
+def on_dhcp_event(model_accessor, message, **kwargs):
+    log.info('dhcp.events: received an event - %s' % message)
+
+
+def on_model_event(model_accessor, message, **kwargs):
+    log.info('model event: received an event - %s' % message)
+
+
+onu_event_sensor = CORDEventSensor(
+    task_id='onu_event_sensor',
+    topic='onu.events',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_parallel_cord
+)
+
+onu_event_handler = CORDModelOperator(
+    task_id='onu_event_handler',
+    python_callable=on_onu_event,
+    cord_event_sensor_task_id='onu_event_sensor',
+    dag=dag_parallel_cord
+)
+
+auth_event_sensor = CORDEventSensor(
+    task_id='auth_event_sensor',
+    topic='authentication.events',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_parallel_cord
+)
+
+auth_event_handler = CORDModelOperator(
+    task_id='auth_event_handler',
+    python_callable=on_auth_event,
+    cord_event_sensor_task_id='auth_event_sensor',
+    dag=dag_parallel_cord
+)
+
+dhcp_event_sensor = CORDEventSensor(
+    task_id='dhcp_event_sensor',
+    topic='dhcp.events',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_parallel_cord
+)
+
+dhcp_event_handler = CORDModelOperator(
+    task_id='dhcp_event_handler',
+    python_callable=on_dhcp_event,
+    cord_event_sensor_task_id='dhcp_event_sensor',
+    dag=dag_parallel_cord
+)
+
+join = DummyOperator(
+    task_id='join',
+    trigger_rule=TriggerRule.ALL_DONE,
+    dag=dag_parallel_cord
+)
+
+att_model_event_sensor = CORDModelSensor(
+    task_id='att_model_event_sensor',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_parallel_cord_admin
+)
+
+att_model_event_handler = CORDModelOperator(
+    task_id='att_model_event_handler',
+    python_callable=on_model_event,
+    cord_event_sensor_task_id='att_model_event_sensor',
+    dag=dag_parallel_cord_admin
+)
+
+# handle standard flow
+onu_event_sensor >> onu_event_handler >> join
+auth_event_sensor >> auth_event_handler >> join
+dhcp_event_sensor >> dhcp_event_handler >> join
+
+# handle admin flow
+att_model_event_sensor >> att_model_event_handler
+
diff --git a/workflow_examples/parallel-cord-workflow/parallel_cord_workflow_essence.json b/workflow_examples/parallel-cord-workflow/parallel_cord_workflow_essence.json
new file mode 100644
index 0000000..713426c
--- /dev/null
+++ b/workflow_examples/parallel-cord-workflow/parallel_cord_workflow_essence.json
@@ -0,0 +1,177 @@
+{
+    "parallel_cord_workflow": {
+        "dag": {
+            "dag_id": "parallel_cord_workflow",
+            "local_variable": "dag_parallel_cord"
+        },
+        "dependencies": {
+            "auth_event_handler": {
+                "children": [
+                    "join"
+                ],
+                "parents": [
+                    "auth_event_sensor"
+                ]
+            },
+            "auth_event_sensor": {
+                "children": [
+                    "auth_event_handler"
+                ]
+            },
+            "dhcp_event_handler": {
+                "children": [
+                    "join"
+                ],
+                "parents": [
+                    "dhcp_event_sensor"
+                ]
+            },
+            "dhcp_event_sensor": {
+                "children": [
+                    "dhcp_event_handler"
+                ]
+            },
+            "join": {
+                "parents": [
+                    "onu_event_handler",
+                    "auth_event_handler",
+                    "dhcp_event_handler"
+                ]
+            },
+            "onu_event_handler": {
+                "children": [
+                    "join"
+                ],
+                "parents": [
+                    "onu_event_sensor"
+                ]
+            },
+            "onu_event_sensor": {
+                "children": [
+                    "onu_event_handler"
+                ]
+            }
+        },
+        "tasks": {
+            "auth_event_handler": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "auth_event_sensor",
+                "dag": "dag_parallel_cord",
+                "dag_id": "parallel_cord_workflow",
+                "local_variable": "auth_event_handler",
+                "python_callable": "on_auth_event",
+                "task_id": "auth_event_handler"
+            },
+            "auth_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_parallel_cord",
+                "dag_id": "parallel_cord_workflow",
+                "key_field": "serialNumber",
+                "local_variable": "auth_event_sensor",
+                "poke_interval": 5,
+                "task_id": "auth_event_sensor",
+                "topic": "authentication.events"
+            },
+            "dhcp_event_handler": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "dhcp_event_sensor",
+                "dag": "dag_parallel_cord",
+                "dag_id": "parallel_cord_workflow",
+                "local_variable": "dhcp_event_handler",
+                "python_callable": "on_dhcp_event",
+                "task_id": "dhcp_event_handler"
+            },
+            "dhcp_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_parallel_cord",
+                "dag_id": "parallel_cord_workflow",
+                "key_field": "serialNumber",
+                "local_variable": "dhcp_event_sensor",
+                "poke_interval": 5,
+                "task_id": "dhcp_event_sensor",
+                "topic": "dhcp.events"
+            },
+            "join": {
+                "class": "DummyOperator",
+                "dag": "dag_parallel_cord",
+                "dag_id": "parallel_cord_workflow",
+                "local_variable": "join",
+                "task_id": "join",
+                "trigger_rule": {
+                    "Attribute": {
+                        "attr": "ALL_DONE",
+                        "ctx": "Load",
+                        "value": {
+                            "Name": {
+                                "ctx": "Load",
+                                "id": "TriggerRule"
+                            }
+                        }
+                    }
+                }
+            },
+            "onu_event_handler": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "onu_event_sensor",
+                "dag": "dag_parallel_cord",
+                "dag_id": "parallel_cord_workflow",
+                "local_variable": "onu_event_handler",
+                "python_callable": "on_onu_event",
+                "task_id": "onu_event_handler"
+            },
+            "onu_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_parallel_cord",
+                "dag_id": "parallel_cord_workflow",
+                "key_field": "serialNumber",
+                "local_variable": "onu_event_sensor",
+                "poke_interval": 5,
+                "task_id": "onu_event_sensor",
+                "topic": "onu.events"
+            }
+        }
+    },
+    "parallel_cord_workflow_admin": {
+        "dag": {
+            "dag_id": "parallel_cord_workflow_admin",
+            "local_variable": "dag_parallel_cord_admin"
+        },
+        "dependencies": {
+            "att_model_event_handler": {
+                "parents": [
+                    "att_model_event_sensor"
+                ]
+            },
+            "att_model_event_sensor": {
+                "children": [
+                    "att_model_event_handler"
+                ]
+            }
+        },
+        "tasks": {
+            "att_model_event_handler": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "att_model_event_sensor",
+                "dag": "dag_parallel_cord_admin",
+                "dag_id": "parallel_cord_workflow_admin",
+                "local_variable": "att_model_event_handler",
+                "python_callable": "on_model_event",
+                "task_id": "att_model_event_handler"
+            },
+            "att_model_event_sensor": {
+                "class": "CORDModelSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_parallel_cord_admin",
+                "dag_id": "parallel_cord_workflow_admin",
+                "key_field": "serialNumber",
+                "local_variable": "att_model_event_sensor",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "task_id": "att_model_event_sensor"
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/workflow_examples/connection_setup.sh b/workflow_examples/scripts/connection_setup.sh
similarity index 100%
rename from workflow_examples/connection_setup.sh
rename to workflow_examples/scripts/connection_setup.sh
diff --git a/workflow_examples/emit_events_onu.sh b/workflow_examples/scripts/emit_events_auth.sh
similarity index 83%
copy from workflow_examples/emit_events_onu.sh
copy to workflow_examples/scripts/emit_events_auth.sh
index 03888e7..3edd9de 100644
--- a/workflow_examples/emit_events_onu.sh
+++ b/workflow_examples/scripts/emit_events_auth.sh
@@ -15,4 +15,4 @@
 # limitations under the License.
 
 # emit events
-python ./workflow_ctl.py emit "onu.events" "{'serialNumber': 'testSerialXXX', 'other': 'test_other_field'}"
+python ../workflow_ctl.py emit "authentication.events" "{'serialNumber': 'testSerialXXX', 'other': 'test_other_field'}"
diff --git a/workflow_examples/emit_events_onu.sh b/workflow_examples/scripts/emit_events_dhcp.sh
similarity index 84%
copy from workflow_examples/emit_events_onu.sh
copy to workflow_examples/scripts/emit_events_dhcp.sh
index 03888e7..f112232 100644
--- a/workflow_examples/emit_events_onu.sh
+++ b/workflow_examples/scripts/emit_events_dhcp.sh
@@ -15,4 +15,4 @@
 # limitations under the License.
 
 # emit events
-python ./workflow_ctl.py emit "onu.events" "{'serialNumber': 'testSerialXXX', 'other': 'test_other_field'}"
+python ../workflow_ctl.py emit "dhcp.events" "{'serialNumber': 'testSerialXXX', 'other': 'test_other_field'}"
diff --git a/workflow_examples/emit_events_onu.sh b/workflow_examples/scripts/emit_events_model.sh
similarity index 78%
copy from workflow_examples/emit_events_onu.sh
copy to workflow_examples/scripts/emit_events_model.sh
index 03888e7..8c481c2 100644
--- a/workflow_examples/emit_events_onu.sh
+++ b/workflow_examples/scripts/emit_events_model.sh
@@ -15,4 +15,4 @@
 # limitations under the License.
 
 # emit events
-python ./workflow_ctl.py emit "onu.events" "{'serialNumber': 'testSerialXXX', 'other': 'test_other_field'}"
+python ../workflow_ctl.py emit "datamodel.cordWorkflowDriverServiceInstance" "{'event_type': 'update', 'serialNumber': 'testSerialXXX', 'other': 'test_other_field'}"
diff --git a/workflow_examples/emit_events_onu.sh b/workflow_examples/scripts/emit_events_onu.sh
similarity index 84%
rename from workflow_examples/emit_events_onu.sh
rename to workflow_examples/scripts/emit_events_onu.sh
index 03888e7..a9038d7 100644
--- a/workflow_examples/emit_events_onu.sh
+++ b/workflow_examples/scripts/emit_events_onu.sh
@@ -15,4 +15,4 @@
 # limitations under the License.
 
 # emit events
-python ./workflow_ctl.py emit "onu.events" "{'serialNumber': 'testSerialXXX', 'other': 'test_other_field'}"
+python ../workflow_ctl.py emit "onu.events" "{'serialNumber': 'testSerialXXX', 'other': 'test_other_field'}"
diff --git a/workflow_examples/register_essence.sh b/workflow_examples/scripts/register_essence.sh
similarity index 69%
rename from workflow_examples/register_essence.sh
rename to workflow_examples/scripts/register_essence.sh
index 1d1f4c0..9bd8395 100644
--- a/workflow_examples/register_essence.sh
+++ b/workflow_examples/scripts/register_essence.sh
@@ -15,5 +15,7 @@
 # limitations under the License.
 
 # register essence
-python ./workflow_ctl.py reg ./simple_cord_workflow_essence.json
-python ./workflow_ctl.py reg ./simple_airflow_workflow_essence.json
+python ../workflow_ctl.py reg ../sequential_cord_workflow_essence.json
+python ../workflow_ctl.py reg ../parallel_cord_workflow_essence.json
+python ../workflow_ctl.py reg ../simple_airflow_workflow_essence.json
+python ../workflow_ctl.py reg ../att_workflow_essence.json
diff --git a/workflow_examples/sequential-cord-workflow/sequential_cord_workflow.py b/workflow_examples/sequential-cord-workflow/sequential_cord_workflow.py
new file mode 100644
index 0000000..40dfe6e
--- /dev/null
+++ b/workflow_examples/sequential-cord-workflow/sequential_cord_workflow.py
@@ -0,0 +1,158 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example sequential workflow
+"""
+
+
+import logging
+from datetime import datetime
+from airflow import DAG
+from airflow.sensors.cord_workflow_plugin import CORDEventSensor, CORDModelSensor
+from airflow.operators.cord_workflow_plugin import CORDModelOperator
+
+
+log = logging.getLogger(__name__)
+
+args = {
+    # hard coded date
+    'start_date': datetime(2019, 1, 1),
+    'owner': 'iychoi'
+}
+
+dag_sequential_cord = DAG(
+    dag_id='sequential_cord_workflow',
+    default_args=args,
+    # this dag will be triggered by external systems
+    schedule_interval=None
+)
+dag_sequential_cord.doc_md = __doc__
+
+
+def on_onu_event(model_accessor, message, **kwargs):
+    log.info('onu.events: received an event - %s' % message)
+
+
+def on_auth_event(model_accessor, message, **kwargs):
+    log.info('authentication.events: received an event - %s' % message)
+
+
+def on_dhcp_event(model_accessor, message, **kwargs):
+    log.info('dhcp.events: received an event - %s' % message)
+
+
+def on_model_event(model_accessor, message, **kwargs):
+    log.info('model event: received an event - %s' % message)
+
+
+onu_event_sensor = CORDEventSensor(
+    task_id='onu_event_sensor',
+    topic='onu.events',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_sequential_cord
+)
+
+onu_event_handler = CORDModelOperator(
+    task_id='onu_event_handler',
+    python_callable=on_onu_event,
+    cord_event_sensor_task_id='onu_event_sensor',
+    dag=dag_sequential_cord
+)
+
+auth_event_sensor = CORDEventSensor(
+    task_id='auth_event_sensor',
+    topic='authentication.events',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_sequential_cord
+)
+
+auth_event_handler = CORDModelOperator(
+    task_id='auth_event_handler',
+    python_callable=on_auth_event,
+    cord_event_sensor_task_id='auth_event_sensor',
+    dag=dag_sequential_cord
+)
+
+dhcp_event_sensor = CORDEventSensor(
+    task_id='dhcp_event_sensor',
+    topic='dhcp.events',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_sequential_cord
+)
+
+dhcp_event_handler = CORDModelOperator(
+    task_id='dhcp_event_handler',
+    python_callable=on_dhcp_event,
+    cord_event_sensor_task_id='dhcp_event_sensor',
+    dag=dag_sequential_cord
+)
+
+cord_model_event_sensor1 = CORDModelSensor(
+    task_id='cord_model_event_sensor1',
+    model_name='cordWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_sequential_cord
+)
+
+cord_model_event_handler1 = CORDModelOperator(
+    task_id='cord_model_event_handler1',
+    python_callable=on_model_event,
+    cord_event_sensor_task_id='cord_model_event_sensor1',
+    dag=dag_sequential_cord
+)
+
+cord_model_event_sensor2 = CORDModelSensor(
+    task_id='cord_model_event_sensor2',
+    model_name='cordWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_sequential_cord
+)
+
+cord_model_event_handler2 = CORDModelOperator(
+    task_id='cord_model_event_handler2',
+    python_callable=on_model_event,
+    cord_event_sensor_task_id='cord_model_event_sensor2',
+    dag=dag_sequential_cord
+)
+
+cord_model_event_sensor3 = CORDModelSensor(
+    task_id='cord_model_event_sensor3',
+    model_name='cordWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_sequential_cord
+)
+
+cord_model_event_handler3 = CORDModelOperator(
+    task_id='cord_model_event_handler3',
+    python_callable=on_model_event,
+    cord_event_sensor_task_id='cord_model_event_sensor3',
+    dag=dag_sequential_cord
+)
+
+onu_event_sensor >> onu_event_handler >> cord_model_event_sensor1 >> cord_model_event_handler1 >> \
+    auth_event_sensor >> auth_event_handler >> cord_model_event_sensor2 >> cord_model_event_handler2 >> \
+    dhcp_event_sensor >> dhcp_event_handler >> cord_model_event_sensor3 >> cord_model_event_handler3
diff --git a/workflow_examples/simple-cord-workflow/simple_cord_workflow_essence.json b/workflow_examples/sequential-cord-workflow/sequential_cord_workflow_essence.json
similarity index 90%
rename from workflow_examples/simple-cord-workflow/simple_cord_workflow_essence.json
rename to workflow_examples/sequential-cord-workflow/sequential_cord_workflow_essence.json
index 9d4499b..f8b6ba6 100644
--- a/workflow_examples/simple-cord-workflow/simple_cord_workflow_essence.json
+++ b/workflow_examples/sequential-cord-workflow/sequential_cord_workflow_essence.json
@@ -2,9 +2,25 @@
     "simple_cord_workflow": {
         "dag": {
             "dag_id": "simple_cord_workflow",
-            "local_variable": "dag_cord"
+            "local_variable": "dag_sequential_cord"
         },
         "dependencies": {
+            "auth_event_handler": {
+                "children": [
+                    "cord_model_event_sensor2"
+                ],
+                "parents": [
+                    "auth_event_sensor"
+                ]
+            },
+            "auth_event_sensor": {
+                "children": [
+                    "auth_event_handler"
+                ],
+                "parents": [
+                    "cord_model_event_handler1"
+                ]
+            },
             "cord_model_event_handler1": {
                 "children": [
                     "auth_event_sensor"
@@ -50,22 +66,6 @@
                     "dhcp_event_handler"
                 ]
             },
-            "auth_event_handler": {
-                "children": [
-                    "cord_model_event_sensor2"
-                ],
-                "parents": [
-                    "auth_event_sensor"
-                ]
-            },
-            "auth_event_sensor": {
-                "children": [
-                    "auth_event_handler"
-                ],
-                "parents": [
-                    "cord_model_event_handler1"
-                ]
-            },
             "dhcp_event_handler": {
                 "children": [
                     "cord_model_event_sensor3"
@@ -97,19 +97,39 @@
             }
         },
         "tasks": {
+            "auth_event_handler": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "auth_event_sensor",
+                "dag": "dag_sequential_cord",
+                "dag_id": "simple_cord_workflow",
+                "local_variable": "auth_event_handler",
+                "python_callable": "on_auth_event",
+                "task_id": "auth_event_handler"
+            },
+            "auth_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_sequential_cord",
+                "dag_id": "simple_cord_workflow",
+                "key_field": "serialNumber",
+                "local_variable": "auth_event_sensor",
+                "poke_interval": 5,
+                "task_id": "auth_event_sensor",
+                "topic": "authentication.events"
+            },
             "cord_model_event_handler1": {
                 "class": "CORDModelOperator",
                 "cord_event_sensor_task_id": "cord_model_event_sensor1",
-                "dag": "dag_cord",
+                "dag": "dag_sequential_cord",
                 "dag_id": "simple_cord_workflow",
                 "local_variable": "cord_model_event_handler1",
-                "python_callable": "DriverService_event",
+                "python_callable": "on_model_event",
                 "task_id": "cord_model_event_handler1"
             },
             "cord_model_event_handler2": {
                 "class": "CORDModelOperator",
                 "cord_event_sensor_task_id": "cord_model_event_sensor2",
-                "dag": "dag_cord",
+                "dag": "dag_sequential_cord",
                 "dag_id": "simple_cord_workflow",
                 "local_variable": "cord_model_event_handler2",
                 "python_callable": "DriverService_event",
@@ -118,7 +138,7 @@
             "cord_model_event_handler3": {
                 "class": "CORDModelOperator",
                 "cord_event_sensor_task_id": "cord_model_event_sensor3",
-                "dag": "dag_cord",
+                "dag": "dag_sequential_cord",
                 "dag_id": "simple_cord_workflow",
                 "local_variable": "cord_model_event_handler3",
                 "python_callable": "DriverService_event",
@@ -127,7 +147,7 @@
             "cord_model_event_sensor1": {
                 "class": "CORDModelSensor",
                 "controller_conn_id": "local_cord_controller",
-                "dag": "dag_cord",
+                "dag": "dag_sequential_cord",
                 "dag_id": "simple_cord_workflow",
                 "key_field": "serialNumber",
                 "local_variable": "cord_model_event_sensor1",
@@ -138,7 +158,7 @@
             "cord_model_event_sensor2": {
                 "class": "CORDModelSensor",
                 "controller_conn_id": "local_cord_controller",
-                "dag": "dag_cord",
+                "dag": "dag_sequential_cord",
                 "dag_id": "simple_cord_workflow",
                 "key_field": "serialNumber",
                 "local_variable": "cord_model_event_sensor2",
@@ -149,7 +169,7 @@
             "cord_model_event_sensor3": {
                 "class": "CORDModelSensor",
                 "controller_conn_id": "local_cord_controller",
-                "dag": "dag_cord",
+                "dag": "dag_sequential_cord",
                 "dag_id": "simple_cord_workflow",
                 "key_field": "serialNumber",
                 "local_variable": "cord_model_event_sensor3",
@@ -157,39 +177,19 @@
                 "poke_interval": 5,
                 "task_id": "cord_model_event_sensor3"
             },
-            "auth_event_handler": {
-                "class": "CORDModelOperator",
-                "cord_event_sensor_task_id": "auth_event_sensor",
-                "dag": "dag_cord",
-                "dag_id": "simple_cord_workflow",
-                "local_variable": "auth_event_handler",
-                "python_callable": "AUTH_event",
-                "task_id": "auth_event_handler"
-            },
-            "auth_event_sensor": {
-                "class": "CORDEventSensor",
-                "controller_conn_id": "local_cord_controller",
-                "dag": "dag_cord",
-                "dag_id": "simple_cord_workflow",
-                "key_field": "serialNumber",
-                "local_variable": "auth_event_sensor",
-                "poke_interval": 5,
-                "task_id": "auth_event_sensor",
-                "topic": "authentication.events"
-            },
             "dhcp_event_handler": {
                 "class": "CORDModelOperator",
                 "cord_event_sensor_task_id": "dhcp_event_sensor",
-                "dag": "dag_cord",
+                "dag": "dag_sequential_cord",
                 "dag_id": "simple_cord_workflow",
                 "local_variable": "dhcp_event_handler",
-                "python_callable": "DHCP_event",
+                "python_callable": "on_dhcp_event",
                 "task_id": "dhcp_event_handler"
             },
             "dhcp_event_sensor": {
                 "class": "CORDEventSensor",
                 "controller_conn_id": "local_cord_controller",
-                "dag": "dag_cord",
+                "dag": "dag_sequential_cord",
                 "dag_id": "simple_cord_workflow",
                 "key_field": "serialNumber",
                 "local_variable": "dhcp_event_sensor",
@@ -200,16 +200,16 @@
             "onu_event_handler": {
                 "class": "CORDModelOperator",
                 "cord_event_sensor_task_id": "onu_event_sensor",
-                "dag": "dag_cord",
+                "dag": "dag_sequential_cord",
                 "dag_id": "simple_cord_workflow",
                 "local_variable": "onu_event_handler",
-                "python_callable": "ONU_event",
+                "python_callable": "on_onu_event",
                 "task_id": "onu_event_handler"
             },
             "onu_event_sensor": {
                 "class": "CORDEventSensor",
                 "controller_conn_id": "local_cord_controller",
-                "dag": "dag_cord",
+                "dag": "dag_sequential_cord",
                 "dag_id": "simple_cord_workflow",
                 "key_field": "serialNumber",
                 "local_variable": "onu_event_sensor",
diff --git a/workflow_examples/simple-cord-workflow/simple_cord_workflow.py b/workflow_examples/simple-cord-workflow/simple_cord_workflow.py
deleted file mode 100644
index 0eee50d..0000000
--- a/workflow_examples/simple-cord-workflow/simple_cord_workflow.py
+++ /dev/null
@@ -1,235 +0,0 @@
-# Copyright 2019-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Example CORD workflow using Airflow
-"""
-
-
-import logging
-from datetime import datetime
-from airflow import DAG
-from airflow.sensors.cord_workflow_plugin import CORDEventSensor, CORDModelSensor
-from airflow.operators.cord_workflow_plugin import CORDModelOperator
-
-
-log = logging.getLogger(__name__)
-
-args = {
-    # hard coded date
-    'start_date': datetime(2019, 1, 1),
-    'owner': 'iychoi'
-}
-
-dag_cord = DAG(
-    dag_id='simple_cord_workflow',
-    default_args=args,
-    # this dag will be triggered by external systems
-    schedule_interval=None
-)
-
-dag_cord.doc_md = __doc__
-
-
-def ONU_event(model_accessor, message, **kwargs):
-    log.info('onu.events: received an event - %s' % message)
-
-    """
-    si = find_or_create_cord_si(model_accessor, logging, message)
-    if message['status'] == 'activated':
-        logging.info('onu.events: activated onu', message=message)
-        si.no_sync = False
-        si.uni_port_id = long(message['portNumber'])
-        si.of_dpid = message['deviceId']
-        si.oper_onu_status = 'ENABLED'
-        si.save_changed_fields(always_update_timestamp=True)
-    elif message['status'] == 'disabled':
-        logging.info('onu.events: disabled onu, resetting the subscriber', value=message)
-        si.oper_onu_status = 'DISABLED'
-        si.save_changed_fields(always_update_timestamp=True)
-    else:
-        logging.warn('onu.events: Unknown status value: %s' % message['status'], value=message)
-        raise AirflowException('onu.events: Unknown status value: %s' % message['status'], value=message)
-    """
-
-
-def AUTH_event(model_accessor, message, **kwargs):
-    log.info('authentication.events: received an event - %s' % message)
-
-    """
-    si = find_or_create_cord_si(model_accessor, logging, message)
-    logging.debug('authentication.events: Updating service instance', si=si)
-    si.authentication_state = message['authenticationState']
-    si.save_changed_fields(always_update_timestamp=True)
-    """
-
-
-def DHCP_event(model_accessor, message, **kwargs):
-    log.info('dhcp.events: received an event - %s' % message)
-
-    """
-    si = find_or_create_cord_si(model_accessor, logging, message)
-    logging.debug('dhcp.events: Updating service instance', si=si)
-    si.dhcp_state = message['messageType']
-    si.ip_address = message['ipAddress']
-    si.mac_address = message['macAddress']
-    si.save_changed_fields(always_update_timestamp=True)
-    """
-
-
-def DriverService_event(model_accessor, message, **kwargs):
-    log.info('model event: received an event - %s' % message)
-
-    """
-    event_type = message['event_type']
-
-    go = False
-    si = find_or_create_cord_si(model_accessor, logging, message)
-
-    if event_type == 'create':
-        logging.debug('MODEL_POLICY: handle_create for cordWorkflowDriverServiceInstance %s ' % si.id)
-        go = True
-    elif event_type == 'update':
-        logging.debug('MODEL_POLICY: handle_update for cordWorkflowDriverServiceInstance %s ' %
-                          (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
-        go = True
-    elif event_type == 'delete':
-        pass
-    else:
-        pass
-
-    if not go:
-        return
-
-    # handle only create & update events
-
-    # Changing ONU state can change auth state
-    # Changing auth state can change DHCP state
-    # So need to process in this order
-    process_onu_state(model_accessor, si)
-    process_auth_state(si)
-    process_dhcp_state(si)
-
-    validate_states(si)
-
-    # handling the subscriber status
-    # It's a combination of all the other states
-    subscriber = get_subscriber(model_accessor, si.serial_number)
-    if subscriber:
-        update_subscriber(model_accessor, subscriber, si)
-
-    si.save_changed_fields()
-    """
-
-
-onu_event_sensor = CORDEventSensor(
-    task_id='onu_event_sensor',
-    topic='onu.events',
-    key_field='serialNumber',
-    controller_conn_id='local_cord_controller',
-    poke_interval=5,
-    dag=dag_cord
-)
-
-onu_event_handler = CORDModelOperator(
-    task_id='onu_event_handler',
-    python_callable=ONU_event,
-    cord_event_sensor_task_id='onu_event_sensor',
-    dag=dag_cord
-)
-
-auth_event_sensor = CORDEventSensor(
-    task_id='auth_event_sensor',
-    topic='authentication.events',
-    key_field='serialNumber',
-    controller_conn_id='local_cord_controller',
-    poke_interval=5,
-    dag=dag_cord
-)
-
-auth_event_handler = CORDModelOperator(
-    task_id='auth_event_handler',
-    python_callable=AUTH_event,
-    cord_event_sensor_task_id='auth_event_sensor',
-    dag=dag_cord
-)
-
-dhcp_event_sensor = CORDEventSensor(
-    task_id='dhcp_event_sensor',
-    topic='dhcp.events',
-    key_field='serialNumber',
-    controller_conn_id='local_cord_controller',
-    poke_interval=5,
-    dag=dag_cord
-)
-
-dhcp_event_handler = CORDModelOperator(
-    task_id='dhcp_event_handler',
-    python_callable=DHCP_event,
-    cord_event_sensor_task_id='dhcp_event_sensor',
-    dag=dag_cord
-)
-
-cord_model_event_sensor1 = CORDModelSensor(
-    task_id='cord_model_event_sensor1',
-    model_name='cordWorkflowDriverServiceInstance',
-    key_field='serialNumber',
-    controller_conn_id='local_cord_controller',
-    poke_interval=5,
-    dag=dag_cord
-)
-
-cord_model_event_handler1 = CORDModelOperator(
-    task_id='cord_model_event_handler1',
-    python_callable=DriverService_event,
-    cord_event_sensor_task_id='cord_model_event_sensor1',
-    dag=dag_cord
-)
-
-cord_model_event_sensor2 = CORDModelSensor(
-    task_id='cord_model_event_sensor2',
-    model_name='cordWorkflowDriverServiceInstance',
-    key_field='serialNumber',
-    controller_conn_id='local_cord_controller',
-    poke_interval=5,
-    dag=dag_cord
-)
-
-cord_model_event_handler2 = CORDModelOperator(
-    task_id='cord_model_event_handler2',
-    python_callable=DriverService_event,
-    cord_event_sensor_task_id='cord_model_event_sensor2',
-    dag=dag_cord
-)
-
-cord_model_event_sensor3 = CORDModelSensor(
-    task_id='cord_model_event_sensor3',
-    model_name='cordWorkflowDriverServiceInstance',
-    key_field='serialNumber',
-    controller_conn_id='local_cord_controller',
-    poke_interval=5,
-    dag=dag_cord
-)
-
-cord_model_event_handler3 = CORDModelOperator(
-    task_id='cord_model_event_handler3',
-    python_callable=DriverService_event,
-    cord_event_sensor_task_id='cord_model_event_sensor3',
-    dag=dag_cord
-)
-
-
-onu_event_sensor >> onu_event_handler >> cord_model_event_sensor1 >> cord_model_event_handler1 >> \
-    auth_event_sensor >> auth_event_handler >> cord_model_event_sensor2 >> cord_model_event_handler2 >> \
-    dhcp_event_sensor >> dhcp_event_handler >> cord_model_event_sensor3 >> cord_model_event_handler3