Raise InputError when input parameter is wrong
Separate roles of Airflow sensor and operator in CORD Event handling
Update all workflow examples to reflect the sensor/operator design change

Change-Id: I3a00698c7744e67708f3bacc9cd4023ac4fbc02a

Report task status to controller

Change-Id: I3cc5be25421bcd12bd298b363a5131ef33d0f174
diff --git a/VERSION b/VERSION
index 50e6e9d..341cf11 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.2.0-dev1
\ No newline at end of file
+0.2.0
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index f35ec37..1b0f05e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,4 +4,4 @@
 multistructlog~=2.1.0
 requests~=2.22.0
 pyfiglet~=0.7
-cord-workflow-controller-client~=0.1.0
+cord-workflow-controller-client~=0.2.0
diff --git a/src/cord_workflow_airflow_extensions/essence_extractor.py b/src/cord_workflow_airflow_extensions/essence_extractor.py
index 901d40f..72c8fdb 100644
--- a/src/cord_workflow_airflow_extensions/essence_extractor.py
+++ b/src/cord_workflow_airflow_extensions/essence_extractor.py
@@ -21,7 +21,7 @@
 Following information will be extracted from workflow code
 - DAG info
 - Operator info
-    - XOS-related operators
+    - CORD-related operators
     - Airflow operators
 - Dependency info
 """
@@ -388,9 +388,9 @@
                 dags[dagid] = dag
         return dags
 
-    def __extract_XOS_event_sensors(self, tree):
+    def __extract_CORD_event_sensors(self, tree):
         operators = {}
-        calls = self.__extract_func_calls(tree, "XOSEventSensor")
+        calls = self.__extract_func_calls(tree, "CORDEventSensor")
         if calls:
             for call in calls:
                 operator = self.__make_airflow_operator(call)
@@ -398,9 +398,19 @@
                 operators[operatorid] = operator
         return operators
 
-    def __extract_XOS_model_sensors(self, tree):
+    def __extract_CORD_model_sensors(self, tree):
         operators = {}
-        calls = self.__extract_func_calls(tree, "XOSModelSensor")
+        calls = self.__extract_func_calls(tree, "CORDModelSensor")
+        if calls:
+            for call in calls:
+                operator = self.__make_airflow_operator(call)
+                operatorid = operator["task_id"]
+                operators[operatorid] = operator
+        return operators
+
+    def __extract_CORD_model_operators(self, tree):
+        operators = {}
+        calls = self.__extract_func_calls(tree, "CORDModelOperator")
         if calls:
             for call in calls:
                 operator = self.__make_airflow_operator(call)
@@ -420,20 +430,27 @@
 
     def __extract_all_operators(self, tree):
         operators = {}
-        event_sensors = self.__extract_XOS_event_sensors(tree)
+        event_sensors = self.__extract_CORD_event_sensors(tree)
         if event_sensors:
-            for event_sensor in event_sensors:
-                operators[event_sensor] = event_sensors[event_sensor]
+            for task_id in event_sensors:
+                operators[task_id] = event_sensors[task_id]
 
-        model_sensors = self.__extract_XOS_model_sensors(tree)
+        model_sensors = self.__extract_CORD_model_sensors(tree)
         if model_sensors:
-            for model_sensor in model_sensors:
-                operators[model_sensor] = model_sensors[model_sensor]
+            for task_id in model_sensors:
+                operators[task_id] = model_sensors[task_id]
+
+        model_operators = self.__extract_CORD_model_operators(tree)
+        if model_operators:
+            for task_id in model_operators:
+                operators[task_id] = model_operators[task_id]
 
         airflow_operators = self.__extract_airflow_operators(tree)
         if airflow_operators:
-            for airflow_operator in airflow_operators:
-                operators[airflow_operator] = airflow_operators[airflow_operator]
+            for task_id in airflow_operators:
+                # add operators that are not already handled above
+                if task_id not in operators:
+                    operators[task_id] = airflow_operators[task_id]
 
         return operators
 
@@ -582,8 +599,6 @@
 
 # for command-line execution
 def main(args):
-    print_graffiti()
-
     # check if config path is set
     config_file_path = DEFAULT_CONFIG_FILE_PATH
     if args.config:
@@ -601,21 +616,25 @@
     log = create_logger(progargs["logging"])
 
     code_filepath = args.input_file
-    if os.path.exists(code_filepath):
-        raise 'cannot find an input file - %s' % code_filepath
-
-    extractor = EssenceExtractor(logger=log)
-    extractor.parse_codefile(code_filepath)
-    essence = extractor.extract()
+    if not os.path.exists(code_filepath):
+        raise IOError('cannot find an input file - %s' % code_filepath)
 
     output_filepath = './essence.json'
     if args.output:
         output_filepath = args.output
 
-    json_string = pretty_format_json(essence)
+    print_console = False
     if args.stdout or output_filepath == '-':
+        print_console = True
+
+    extractor = EssenceExtractor(logger=log)
+    extractor.parse_codefile(code_filepath)
+    essence = extractor.extract()
+    json_string = pretty_format_json(essence)
+    if print_console:
         print(json_string)
     else:
+        print_graffiti()
         with open(output_filepath, 'w') as f:
             f.write(json_string)
 
diff --git a/src/cord_workflow_airflow_extensions/hook.py b/src/cord_workflow_airflow_extensions/hook.py
new file mode 100644
index 0000000..a2ebb4e
--- /dev/null
+++ b/src/cord_workflow_airflow_extensions/hook.py
@@ -0,0 +1,116 @@
+#!/usr/bin/env python3
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Airflow Hook
+"""
+
+from airflow.hooks.base_hook import BaseHook
+from cord_workflow_controller_client.workflow_run import WorkflowRun
+
+
+class CORDWorkflowControllerException(Exception):
+    """
+    Alias for Exception.
+    """
+
+
+class CORDWorkflowControllerHook(BaseHook):
+    """
+    Hook for accessing CORD Workflow Controller
+    """
+
+    def __init__(
+            self,
+            workflow_id,
+            workflow_run_id,
+            controller_conn_id='cord_controller_default'):
+        super().__init__(source=None)
+        self.workflow_id = workflow_id
+        self.workflow_run_id = workflow_run_id
+        self.controller_conn_id = controller_conn_id
+
+        self.workflow_run_client = None
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        if self.workflow_run_client is not None:
+            self.close_conn()
+
+    def get_conn(self):
+        """
+        Connect a Workflow Run client.
+        """
+        if self.workflow_run_client is None:
+            # find connection info from database or environment
+            # ENV: AIRFLOW_CONN_CORD_CONTROLLER_DEFAULT
+            connection_params = self.get_connection(self.controller_conn_id)
+            # connection_params have three fields
+            # host
+            # login - we don't use this yet
+            # password - we don't use this yet
+            try:
+                self.workflow_run_client = WorkflowRun(self.workflow_id, self.workflow_run_id)
+                self.workflow_run_client.connect(connection_params.host)
+            except BaseException as ex:
+                raise CORDWorkflowControllerException(ex)
+
+        return self.workflow_run_client
+
+    def close_conn(self):
+        """
+        Close the Workflow Run client
+        """
+        if self.workflow_run_client:
+            try:
+                self.workflow_run_client.disconnect()
+            except BaseException as ex:
+                raise CORDWorkflowControllerException(ex)
+
+        self.workflow_run_client = None
+
+    def update_status(self, task_id, status):
+        """
+        Update status of the workflow run.
+        'state' should be one of ['begin', 'end']
+        """
+        client = self.get_conn()
+        try:
+            return client.update_status(task_id, status)
+        except BaseException as ex:
+            raise CORDWorkflowControllerException(ex)
+
+    def count_events(self):
+        """
+        Count queued events for the workflow run.
+        """
+        client = self.get_conn()
+        try:
+            return client.count_events()
+        except BaseException as ex:
+            raise CORDWorkflowControllerException(ex)
+
+    def fetch_event(self, task_id, topic):
+        """
+        Fetch an event for the workflow run.
+        """
+        client = self.get_conn()
+        try:
+            return client.fetch_event(task_id, topic)
+        except BaseException as ex:
+            raise CORDWorkflowControllerException(ex)
diff --git a/src/cord_workflow_airflow_extensions/kickstarter.py b/src/cord_workflow_airflow_extensions/kickstarter.py
index 2a0be45..6f9924e 100644
--- a/src/cord_workflow_airflow_extensions/kickstarter.py
+++ b/src/cord_workflow_airflow_extensions/kickstarter.py
@@ -30,9 +30,11 @@
 
 from multistructlog import create_logger
 from cord_workflow_controller_client.manager import Manager
-from airflow.api.client.json_client import Client as AirflowClient
-from requests.auth import HTTPBasicAuth
+from importlib import import_module
 from urlparse import urlparse
+from airflow import configuration as AirflowConf
+from airflow import api
+from airflow.models import DagRun
 
 
 log = create_logger()
@@ -41,9 +43,6 @@
 
 progargs = {
     'controller_url': 'http://localhost:3030',
-    'airflow_url': 'http://localhost:8080',
-    'airflow_username': '',
-    'airflow_password': '',
     'logging': None
 }
 
@@ -62,9 +61,6 @@
     parser = argparse.ArgumentParser(description='CORD Workflow Kickstarter Daemon.', prog='kickstarter')
     parser.add_argument('--config', help='locate a configuration file')
     parser.add_argument('--controller', help='CORD Workflow Controller URL')
-    parser.add_argument('--airflow', help='Airflow REST URL')
-    parser.add_argument('--airflow_user', help='User Name to access Airflow Web Interface')
-    parser.add_argument('--airflow_passwd', help='Password to access Airlfow Web Interface')
     return parser
 
 
@@ -127,12 +123,46 @@
             log.info('> Kickstarting a workflow (%s) => workflow run (%s)' % (workflow_id, workflow_run_id))
 
             airflow_client.trigger_dag(dag_id=workflow_id, run_id=workflow_run_id)
+            message = airflow_client.trigger_dag(
+                dag_id=workflow_id,
+                run_id=workflow_run_id
+            )
+            log.info('> Airflow Response: %s' % message)
 
             # let controller know that the new workflow run is created
             log.info('> Notifying a workflow (%s), a workflow run (%s)' % (workflow_id, workflow_run_id))
             manager.notify_new_workflow_run(workflow_id, workflow_run_id)
+        except Exception as e:
+            log.error('> Error : %s' % e)
+            log.debug(traceback.format_exc())
 
-            log.info('> OK')
+
+def on_check_state(workflow_id, workflow_run_id):
+    if manager and airflow_client:
+        try:
+            log.info('> Checking state of a workflow (%s) => workflow run (%s)' % (workflow_id, workflow_run_id))
+
+            run = DagRun.find(dag_id=workflow_id, run_id=workflow_run_id)
+            state = 'unknown'
+            if run:
+                # run is an array
+                # this should be one of ['success', 'running', 'failed']
+                state = run[0].state
+            else:
+                log.error(
+                    'Cannot retrieve state of a workflow run (%s, %s)' %
+                    (workflow_id, workflow_run_id)
+                )
+                state = 'unknown'
+
+            log.info('> state : %s' % state)
+
+            # let controller know the state of the workflow run
+            log.info(
+                '> Notifying update of state of a workflow (%s), a workflow run (%s) - state : %s' %
+                (workflow_id, workflow_run_id, state)
+            )
+            manager.report_workflow_run_state(workflow_id, workflow_run_id, state)
         except Exception as e:
             log.error('> Error : %s' % e)
             log.debug(traceback.format_exc())
@@ -159,18 +189,9 @@
     global log
     log = create_logger(progargs["logging"])
 
-    if args.airflow:
-        progargs['airflow_url'] = args.airflow
-
     if args.controller:
         progargs['controller_url'] = args.controller
 
-    if args.airflow_user:
-        progargs['airflow_user'] = args.airflow_user
-
-    if args.airflow_passwd:
-        progargs['airflow_passwd'] = args.airflow_passwd
-
     print('=CONFIG=')
     config_json_string = pretty_format_json(progargs)
     print(config_json_string)
@@ -181,13 +202,7 @@
     controller_live = check_web_live(progargs['controller_url'])
     if not controller_live:
         log.error('Controller (%s) appears to be down' % progargs['controller_url'])
-        raise 'Controller (%s) appears to be down' % progargs['controller_url']
-
-    log.info('Checking if Airflow (%s) is live...' % progargs['airflow_url'])
-    airflow_live = check_web_live(progargs['airflow_url'])
-    if not airflow_live:
-        log.error('Airflow (%s) appears to be down' % progargs['airflow_url'])
-        raise 'Airflow (%s) appears to be down' % progargs['airflow_url']
+        raise IOError('Controller (%s) appears to be down' % progargs['controller_url'])
 
     # connect to workflow controller
     log.info('Connecting to Workflow Controller (%s)...' % progargs['controller_url'])
@@ -198,13 +213,14 @@
 
     # connect to airflow
     global airflow_client
-    log.info('Connecting to Airflow (%s)...' % progargs['airflow_url'])
-    http_auth = None
-    if progargs['airflow_user'] and progargs['airflow_passwd']:
-        log.info('Using a username %s' % progargs['airflow_user'])
-        http_auth = HTTPBasicAuth(progargs['airflow_user'], progargs['airflow_passwd'])
+    log.info('Connecting to Airflow...')
 
-    airflow_client = AirflowClient(progargs['airflow_url'], auth=http_auth)
+    api.load_auth()
+    api_module = import_module(AirflowConf.get('cli', 'api_client'))
+    airflow_client = api_module.Client(
+        api_base_url=AirflowConf.get('cli', 'endpoint_url'),
+        auth=api.api_auth.client_auth
+    )
 
     log.info('Waiting for kickstart events from Workflow Controller...')
     try:
diff --git a/src/cord_workflow_airflow_extensions/operators.py b/src/cord_workflow_airflow_extensions/operators.py
new file mode 100644
index 0000000..ab1ddd9
--- /dev/null
+++ b/src/cord_workflow_airflow_extensions/operators.py
@@ -0,0 +1,67 @@
+#!/usr/bin/env python3
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Airflow Operators
+"""
+
+from airflow.operators import PythonOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class CORDModelOperator(PythonOperator):
+    """
+    Calls a python function with model accessor.
+    """
+
+    # SCARLET
+    # http://bootflat.github.io/color-picker.html
+    ui_color = '#cf3a24'
+
+    @apply_defaults
+    def __init__(
+        self,
+        python_callable,
+        cord_event_sensor_task_id=None,
+        op_args=None,
+        op_kwargs=None,
+        provide_context=True,
+        templates_dict=None,
+        templates_exts=None,
+        *args,
+        **kwargs
+    ):
+        super().__init__(
+            python_callable=python_callable,
+            op_args=op_args,
+            op_kwargs=op_kwargs,
+            provide_context=True,
+            templates_dict=templates_dict,
+            templates_exts=templates_exts,
+            *args,
+            **kwargs)
+        self.cord_event_sensor_task_id = cord_event_sensor_task_id
+
+    def execute_callable(self):
+        # TODO
+        model_accessor = None
+
+        message = None
+        if self.cord_event_sensor_task_id:
+            message = self.op_kwargs['ti'].xcom_pull(task_ids=self.cord_event_sensor_task_id)
+
+        new_op_kwargs = dict(self.op_kwargs, model_accessor=model_accessor, message=message)
+        return self.python_callable(*self.op_args, **new_op_kwargs)
diff --git a/src/cord_workflow_airflow_extensions/sensors.py b/src/cord_workflow_airflow_extensions/sensors.py
new file mode 100644
index 0000000..cba72e3
--- /dev/null
+++ b/src/cord_workflow_airflow_extensions/sensors.py
@@ -0,0 +1,94 @@
+#!/usr/bin/env python3
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Airflow Sensors
+"""
+
+from .hook import CORDWorkflowControllerHook
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class CORDEventSensor(BaseSensorOperator):
+    # STEEL BLUE
+    # http://bootflat.github.io/color-picker.html
+    ui_color = '#4b77be'
+
+    @apply_defaults
+    def __init__(
+            self,
+            topic,
+            key_field,
+            controller_conn_id='cord_controller_default',
+            *args,
+            **kwargs):
+        super().__init__(*args, **kwargs)
+
+        self.topic = topic
+        self.key_field = key_field
+        self.controller_conn_id = controller_conn_id
+        self.message = None
+        self.hook = None
+
+    def __create_hook(self, context):
+        """
+        Return connection hook.
+        """
+        return CORDWorkflowControllerHook(self.dag_id, context['dag_run'].run_id, self.controller_conn_id)
+
+    def execute(self, context):
+        """
+        Overridden to allow messages to be passed to next tasks via XCOM
+        """
+        if self.hook is None:
+            self.hook = self.__create_hook(context)
+
+        self.hook.update_status(self.task_id, 'begin')
+
+        super().execute(context)
+
+        self.hook.update_status(self.task_id, 'end')
+        self.hook.close_conn()
+        self.hook = None
+        return self.message
+
+    def poke(self, context):
+        # we need to use notification to immediately react at event
+        # https://github.com/apache/airflow/blob/master/airflow/sensors/base_sensor_operator.py#L122
+        self.log.info('Poking : trying to fetch a message with a topic %s', self.topic)
+        event = self.hook.fetch_event(self.task_id, self.topic)
+        if event:
+            self.message = event
+            return True
+        return False
+
+
+class CORDModelSensor(CORDEventSensor):
+    # SISKIN SPROUT YELLOW
+    # http://bootflat.github.io/color-picker.html
+    ui_color = '#7a942e'
+
+    @apply_defaults
+    def __init__(
+            self,
+            model_name,
+            key_field,
+            controller_conn_id='cord_controller_default',
+            *args,
+            **kwargs):
+        topic = 'datamodel.%s' % model_name
+        super().__init__(topic=topic, *args, **kwargs)
diff --git a/src/cord_workflow_airflow_extensions/workflow_ctl.py b/src/cord_workflow_airflow_extensions/workflow_ctl.py
index e32215b..563d966 100644
--- a/src/cord_workflow_airflow_extensions/workflow_ctl.py
+++ b/src/cord_workflow_airflow_extensions/workflow_ctl.py
@@ -31,22 +31,27 @@
 log = create_logger()
 progargs = {
     'controller_url': 'http://localhost:3030',
-    'airflow_url': 'http://localhost:8080',
-    'airflow_username': '',
-    'airflow_password': '',
     'logging': None
 }
 
 DEFAULT_CONFIG_FILE_PATH = '/etc/cord_workflow_airflow_extensions/config.json'
 
 
+class InputError(Exception):
+    """Exception raised for errors in the input.
+
+    Attributes:
+        message -- explanation of the error
+    """
+
+    def __init__(self, message):
+        self.message = message
+
+
 def get_arg_parser():
     parser = argparse.ArgumentParser(description='CORD Workflow Control CLI.', prog='workflow_ctl')
     parser.add_argument('--config', help='locate a configuration file')
     parser.add_argument('--controller', help='CORD Workflow Controller URL')
-    parser.add_argument('--airflow', help='Airflow REST URL')
-    parser.add_argument('--airflow_user', help='User Name to access Airflow Web Interface')
-    parser.add_argument('--airflow_passwd', help='Password to access Airlfow Web Interface')
     parser.add_argument('cmd', help='Command')
     parser.add_argument('cmd_args', help='Arguments for the command', nargs='*')
     return parser
@@ -70,7 +75,7 @@
 def register_workflow(args):
     # expect args should be a list of essence files
     if not args:
-        raise 'no essence file is given'
+        raise InputError('no essence file is given')
 
     log.info('Connecting to Workflow Controller (%s)...' % progargs['controller_url'])
     manager = Manager(logger=log)
@@ -121,25 +126,16 @@
     global log
     log = create_logger(progargs["logging"])
 
-    if args.airflow:
-        progargs['airflow_url'] = args.airflow
-
     if args.controller:
         progargs['controller_url'] = args.controller
 
-    if args.airflow_user:
-        progargs['airflow_user'] = args.airflow_user
-
-    if args.airflow_passwd:
-        progargs['airflow_passwd'] = args.airflow_passwd
-
     if args.cmd:
         if args.cmd.strip().lower() in ['reg', 'register', 'register_workflow']:
             results = register_workflow(args.cmd_args)
             print(results)
         else:
             log.error('unknown command %s' % args.cmd)
-            raise 'unknown command %s' % args.cmd
+            raise InputError('unknown command %s' % args.cmd)
 
 
 if __name__ == "__main__":
diff --git a/test/workflow_examples/att_dag.py b/test/workflow_examples/att_dag.py
index c3bd3ea..a464176 100644
--- a/test/workflow_examples/att_dag.py
+++ b/test/workflow_examples/att_dag.py
@@ -16,19 +16,19 @@
 Example AT&T workflow using Airflow
 """
 import json
-import logging  
-from datetime import datetime
-
+import logging
 import airflow
+from datetime import datetime
 from airflow import DAG
 from airflow import AirflowException
-
-import xossynchronizer.airflow.sensors.XOSModelSensor
-import xossynchronizer.airflow.sensors.XOSEventSensor
+from airflow.operators import PythonOperator
+from cord_workflow_airflow_extensions.sensors import CORDEventSensor, CORDModelSensor
+from cord_workflow_airflow_extensions.operators import CORDModelOperator
 
 from att_helpers import *
 from att_service_instance_funcs import *
 
+
 args = {
     'start_date': datetime.utcnow(),
     'owner': 'ATT',
@@ -44,39 +44,67 @@
 dag_att.doc_md = __doc__
 
 
-def ONU_event(model_accessor, event, **kwargs):
+def ONU_event(model_accessor, message, **kwargs):
     #context = kwargs
     #run_id = context['dag_run'].run_id
 
-    logging.info("onu.events: received event", event=event)
+    logging.info('onu.events: received event', message=message)
 
-    si = find_or_create_att_si(model_accessor, logging, event)
-    if event["status"] == "activated":
-        logging.info("onu.events: activated onu", value=event)
+    si = find_or_create_att_si(model_accessor, logging, message)
+    if message['status'] == 'activated':
+        logging.info('onu.events: activated onu', message=message)
         si.no_sync = False
-        si.uni_port_id = long(event["portNumber"])
-        si.of_dpid = event["deviceId"]
-        si.oper_onu_status = "ENABLED"
+        si.uni_port_id = long(message['portNumber'])
+        si.of_dpid = message['deviceId']
+        si.oper_onu_status = 'ENABLED'
         si.save_changed_fields(always_update_timestamp=True)
-    elif event["status"] == "disabled":
-        logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
-        si.oper_onu_status = "DISABLED"
+    elif message['status'] == 'disabled':
+        logging.info('onu.events: disabled onu, resetting the subscriber', value=message)
+        si.oper_onu_status = 'DISABLED'
         si.save_changed_fields(always_update_timestamp=True)
     else:
-        logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
-        raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+        logging.warn('onu.events: Unknown status value: %s' % message['status'], value=message)
+        raise AirflowException('onu.events: Unknown status value: %s' % message['status'], value=message)
 
 
-def DriverService_event(event_type, model_accessor, si, **kwargs):
+def AUTH_event(model_accessor, message, **kwargs):
     #context = kwargs
     #run_id = context['dag_run'].run_id
-    
+
+    logging.info('authentication.events: Got event for subscriber', message=message)
+
+    si = find_or_create_att_si(model_accessor, logging, message)
+    logging.debug('authentication.events: Updating service instance', si=si)
+    si.authentication_state = message['authenticationState']
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, message, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info('dhcp.events: Got event for subscriber', message=message)
+
+    si = find_or_create_att_si(model_accessor, logging, message)
+    logging.debug('dhcp.events: Updating service instance', si=si)
+    si.dhcp_state = message['messageType']
+    si.ip_address = message['ipAddress']
+    si.mac_address = message['macAddress']
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DriverService_event(model_accessor, message, si, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    event_type = message['event_type']
+
     go = False
     if event_type == 'create':
-        logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+        logging.debug('MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s ' % si.id)
         go = True
     elif event_type == 'update':
-        logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+        logging.debug('MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s ' %
                           (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
         go = True
     elif event_type == 'delete':
@@ -107,92 +135,103 @@
     si.save_changed_fields()
 
 
-def Auth_event(model_accessor, event, **kwargs):
-    #context = kwargs
-    #run_id = context['dag_run'].run_id
-
-    logging.info("authentication.events: Got event for subscriber", event_value=event)
-
-    si = find_or_create_att_si(model_accessor, logging, event)
-    logging.debug("authentication.events: Updating service instance", si=si)
-    si.authentication_state = event["authenticationState"]
-    si.save_changed_fields(always_update_timestamp=True)
-
-
-def DHCP_event(model_accessor, event, **kwargs):
-    #context = kwargs
-    #run_id = context['dag_run'].run_id
-
-    logging.info("dhcp.events: Got event for subscriber", event_value=event)
-
-    si = find_or_create_att_si(model_accessor, logging, event)
-    logging.debug("dhcp.events: Updating service instance", si=si)
-    si.dhcp_state = event["messageType"]
-    si.ip_address = event["ipAddress"]
-    si.mac_address = event["macAddress"]
-    si.save_changed_fields(always_update_timestamp=True)
-
-
-onu_event_handler = XOSEventSensor(
-    task_id='onu_event_handler_task',
+onu_event_sensor = CORDEventSensor(
+    task_id='onu_event_sensor',
     topic='onu.events',
     key_field='serialNumber',
-    provide_context=True,
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_att
+)
+
+onu_event_handler = CORDModelOperator(
+    task_id='onu_event_handler',
     python_callable=ONU_event,
-    poke_interval=5,
-    dag=dag_att,
+    cord_event_sensor_task_id='onu_event_sensor',
+    dag=dag_att
 )
 
-onu_model_event_handler = XOSModelSensor(
-    task_id='onu_model_event_handler_task',
-    model_name='AttWorkflowDriverServiceInstance',
+auth_event_sensor = CORDEventSensor(
+    task_id='auth_event_sensor',
+    topic='authentication.events',
     key_field='serialNumber',
-    provide_context=True,
-    python_callable=DriverService_event,
+    controller_conn_id='local_cord_controller',
     poke_interval=5,
-    dag=dag_att,
+    dag=dag_att
 )
 
-auth_event_handler = XOSEventSensor(
-    task_id='auth_event_handler_task',
-    topic="authentication.events",
-    key_field='serialNumber',
-    provide_context=True,
-    python_callable=Auth_event,
-    poke_interval=5,
-    dag=dag_att,
+auth_event_handler = CORDModelOperator(
+    task_id='auth_event_handler',
+    python_callable=AUTH_event,
+    cord_event_sensor_task_id='auth_event_sensor',
+    dag=dag_att
 )
 
-auth_model_event_handler = XOSModelSensor(
-    task_id='auth_model_event_handler_task',
-    model_name='AttWorkflowDriverServiceInstance',
+dhcp_event_sensor = CORDEventSensor(
+    task_id='dhcp_event_sensor',
+    topic='dhcp.events',
     key_field='serialNumber',
-    provide_context=True,
-    python_callable=DriverService_event,
+    controller_conn_id='local_cord_controller',
     poke_interval=5,
-    dag=dag_att,
+    dag=dag_att
 )
 
-dhcp_event_handler = XOSEventSensor(
-    task_id='dhcp_event_handler_task',
-    topic="dhcp.events",
-    key_field='serialNumber',
-    provide_context=True,
+dhcp_event_handler = CORDModelOperator(
+    task_id='dhcp_event_handler',
     python_callable=DHCP_event,
-    poke_interval=5,
-    dag=dag_att,
+    cord_event_sensor_task_id='dhcp_event_sensor',
+    dag=dag_att
 )
 
-dhcp_model_event_handler = XOSModelSensor(
-    task_id='dhcp_model_event_handler_task',
+att_model_event_sensor1 = CORDModelSensor(
+    task_id='att_model_event_sensor1',
     model_name='AttWorkflowDriverServiceInstance',
     key_field='serialNumber',
-    provide_context=True,
-    python_callable=DriverService_event,
+    controller_conn_id='local_cord_controller',
     poke_interval=5,
-    dag=dag_att,
+    dag=dag_att
 )
 
-onu_event_handler >> onu_model_event_handler >> \
-    auth_event_handler >> auth_model_event_handler >> \
-    dhcp_event_handler >> dhcp_model_event_handler
\ No newline at end of file
+att_model_event_handler1 = CORDModelOperator(
+    task_id='att_model_event_handler1',
+    python_callable=DriverService_event,
+    cord_event_sensor_task_id='att_model_event_sensor1',
+    dag=dag_att
+)
+
+att_model_event_sensor2 = CORDModelSensor(
+    task_id='att_model_event_sensor2',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_att
+)
+
+att_model_event_handler2 = CORDModelOperator(
+    task_id='att_model_event_handler2',
+    python_callable=DriverService_event,
+    cord_event_sensor_task_id='att_model_event_sensor2',
+    dag=dag_att
+)
+
+att_model_event_sensor3 = CORDModelSensor(
+    task_id='att_model_event_sensor3',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_att
+)
+
+att_model_event_handler3 = CORDModelOperator(
+    task_id='att_model_event_handler3',
+    python_callable=DriverService_event,
+    cord_event_sensor_task_id='att_model_event_sensor3',
+    dag=dag_att
+)
+
+
+onu_event_sensor >> onu_event_handler >> att_model_event_sensor1 >> att_model_event_handler1 >> \
+    auth_event_sensor >> auth_event_handler >> att_model_event_sensor2 >> att_model_event_handler2 >> \
+    dhcp_event_sensor >> dhcp_event_handler >> att_model_event_sensor3 >> att_model_event_handler3
diff --git a/test/workflow_examples/att_dag.py.expected.json b/test/workflow_examples/att_dag.py.expected.json
index 109b2a9..6d4c367 100644
--- a/test/workflow_examples/att_dag.py.expected.json
+++ b/test/workflow_examples/att_dag.py.expected.json
@@ -5,122 +5,218 @@
             "local_variable": "dag_att"
         },
         "dependencies": {
-            "auth_event_handler_task": {
+            "att_model_event_handler1": {
                 "children": [
-                    "auth_model_event_handler_task"
+                    "auth_event_sensor"
                 ],
                 "parents": [
-                    "onu_model_event_handler_task"
+                    "att_model_event_sensor1"
                 ]
             },
-            "auth_model_event_handler_task": {
+            "att_model_event_handler2": {
                 "children": [
-                    "dhcp_event_handler_task"
+                    "dhcp_event_sensor"
                 ],
                 "parents": [
-                    "auth_event_handler_task"
+                    "att_model_event_sensor2"
                 ]
             },
-            "dhcp_event_handler_task": {
+            "att_model_event_handler3": {
+                "parents": [
+                    "att_model_event_sensor3"
+                ]
+            },
+            "att_model_event_sensor1": {
                 "children": [
-                    "dhcp_model_event_handler_task"
+                    "att_model_event_handler1"
                 ],
                 "parents": [
-                    "auth_model_event_handler_task"
+                    "onu_event_handler"
                 ]
             },
-            "dhcp_model_event_handler_task": {
-                "parents": [
-                    "dhcp_event_handler_task"
-                ]
-            },
-            "onu_event_handler_task": {
+            "att_model_event_sensor2": {
                 "children": [
-                    "onu_model_event_handler_task"
-                ]
-            },
-            "onu_model_event_handler_task": {
-                "children": [
-                    "auth_event_handler_task"
+                    "att_model_event_handler2"
                 ],
                 "parents": [
-                    "onu_event_handler_task"
+                    "auth_event_handler"
+                ]
+            },
+            "att_model_event_sensor3": {
+                "children": [
+                    "att_model_event_handler3"
+                ],
+                "parents": [
+                    "dhcp_event_handler"
+                ]
+            },
+            "auth_event_handler": {
+                "children": [
+                    "att_model_event_sensor2"
+                ],
+                "parents": [
+                    "auth_event_sensor"
+                ]
+            },
+            "auth_event_sensor": {
+                "children": [
+                    "auth_event_handler"
+                ],
+                "parents": [
+                    "att_model_event_handler1"
+                ]
+            },
+            "dhcp_event_handler": {
+                "children": [
+                    "att_model_event_sensor3"
+                ],
+                "parents": [
+                    "dhcp_event_sensor"
+                ]
+            },
+            "dhcp_event_sensor": {
+                "children": [
+                    "dhcp_event_handler"
+                ],
+                "parents": [
+                    "att_model_event_handler2"
+                ]
+            },
+            "onu_event_handler": {
+                "children": [
+                    "att_model_event_sensor1"
+                ],
+                "parents": [
+                    "onu_event_sensor"
+                ]
+            },
+            "onu_event_sensor": {
+                "children": [
+                    "onu_event_handler"
                 ]
             }
         },
         "tasks": {
-            "auth_event_handler_task": {
-                "class": "XOSEventSensor",
+            "att_model_event_handler1": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "att_model_event_sensor1",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "local_variable": "att_model_event_handler1",
+                "python_callable": "DriverService_event",
+                "task_id": "att_model_event_handler1"
+            },
+            "att_model_event_handler2": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "att_model_event_sensor2",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "local_variable": "att_model_event_handler2",
+                "python_callable": "DriverService_event",
+                "task_id": "att_model_event_handler2"
+            },
+            "att_model_event_handler3": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "att_model_event_sensor3",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "local_variable": "att_model_event_handler3",
+                "python_callable": "DriverService_event",
+                "task_id": "att_model_event_handler3"
+            },
+            "att_model_event_sensor1": {
+                "class": "CORDModelSensor",
+                "controller_conn_id": "local_cord_controller",
                 "dag": "dag_att",
                 "dag_id": "att_workflow_onu",
                 "key_field": "serialNumber",
-                "local_variable": "auth_event_handler",
+                "local_variable": "att_model_event_sensor1",
+                "model_name": "AttWorkflowDriverServiceInstance",
                 "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "Auth_event",
-                "task_id": "auth_event_handler_task",
+                "task_id": "att_model_event_sensor1"
+            },
+            "att_model_event_sensor2": {
+                "class": "CORDModelSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "att_model_event_sensor2",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "task_id": "att_model_event_sensor2"
+            },
+            "att_model_event_sensor3": {
+                "class": "CORDModelSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "att_model_event_sensor3",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "task_id": "att_model_event_sensor3"
+            },
+            "auth_event_handler": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "auth_event_sensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "local_variable": "auth_event_handler",
+                "python_callable": "AUTH_event",
+                "task_id": "auth_event_handler"
+            },
+            "auth_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "auth_event_sensor",
+                "poke_interval": 5,
+                "task_id": "auth_event_sensor",
                 "topic": "authentication.events"
             },
-            "auth_model_event_handler_task": {
-                "class": "XOSModelSensor",
+            "dhcp_event_handler": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "dhcp_event_sensor",
                 "dag": "dag_att",
                 "dag_id": "att_workflow_onu",
-                "key_field": "serialNumber",
-                "local_variable": "auth_model_event_handler",
-                "model_name": "AttWorkflowDriverServiceInstance",
-                "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "DriverService_event",
-                "task_id": "auth_model_event_handler_task"
-            },
-            "dhcp_event_handler_task": {
-                "class": "XOSEventSensor",
-                "dag": "dag_att",
-                "dag_id": "att_workflow_onu",
-                "key_field": "serialNumber",
                 "local_variable": "dhcp_event_handler",
-                "poke_interval": 5,
-                "provide_context": true,
                 "python_callable": "DHCP_event",
-                "task_id": "dhcp_event_handler_task",
+                "task_id": "dhcp_event_handler"
+            },
+            "dhcp_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "dhcp_event_sensor",
+                "poke_interval": 5,
+                "task_id": "dhcp_event_sensor",
                 "topic": "dhcp.events"
             },
-            "dhcp_model_event_handler_task": {
-                "class": "XOSModelSensor",
+            "onu_event_handler": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "onu_event_sensor",
                 "dag": "dag_att",
                 "dag_id": "att_workflow_onu",
-                "key_field": "serialNumber",
-                "local_variable": "dhcp_model_event_handler",
-                "model_name": "AttWorkflowDriverServiceInstance",
-                "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "DriverService_event",
-                "task_id": "dhcp_model_event_handler_task"
-            },
-            "onu_event_handler_task": {
-                "class": "XOSEventSensor",
-                "dag": "dag_att",
-                "dag_id": "att_workflow_onu",
-                "key_field": "serialNumber",
                 "local_variable": "onu_event_handler",
-                "poke_interval": 5,
-                "provide_context": true,
                 "python_callable": "ONU_event",
-                "task_id": "onu_event_handler_task",
-                "topic": "onu.events"
+                "task_id": "onu_event_handler"
             },
-            "onu_model_event_handler_task": {
-                "class": "XOSModelSensor",
+            "onu_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
                 "dag": "dag_att",
                 "dag_id": "att_workflow_onu",
                 "key_field": "serialNumber",
-                "local_variable": "onu_model_event_handler",
-                "model_name": "AttWorkflowDriverServiceInstance",
+                "local_variable": "onu_event_sensor",
                 "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "DriverService_event",
-                "task_id": "onu_model_event_handler_task"
+                "task_id": "onu_event_sensor",
+                "topic": "onu.events"
             }
         }
     }
-}
+}
\ No newline at end of file
diff --git a/test/workflow_examples/left_right_mix_dag.py b/test/workflow_examples/left_right_mix_dag.py
index e734d0d..32363b5 100644
--- a/test/workflow_examples/left_right_mix_dag.py
+++ b/test/workflow_examples/left_right_mix_dag.py
@@ -16,19 +16,19 @@
 Example AT&T workflow using Airflow
 """
 import json
-import logging  
-from datetime import datetime
-
+import logging
 import airflow
+from datetime import datetime
 from airflow import DAG
 from airflow import AirflowException
-
-import xossynchronizer.airflow.sensors.XOSModelSensor
-import xossynchronizer.airflow.sensors.XOSEventSensor
+from airflow.operators import PythonOperator
+from cord_workflow_airflow_extensions.sensors import CORDEventSensor, CORDModelSensor
+from cord_workflow_airflow_extensions.operators import CORDModelOperator
 
 from att_helpers import *
 from att_service_instance_funcs import *
 
+
 args = {
     'start_date': datetime.utcnow(),
     'owner': 'ATT',
@@ -44,39 +44,67 @@
 dag_att.doc_md = __doc__
 
 
-def ONU_event(model_accessor, event, **kwargs):
+def ONU_event(model_accessor, message, **kwargs):
     #context = kwargs
     #run_id = context['dag_run'].run_id
 
-    logging.info("onu.events: received event", event=event)
+    logging.info('onu.events: received event', message=message)
 
-    si = find_or_create_att_si(model_accessor, logging, event)
-    if event["status"] == "activated":
-        logging.info("onu.events: activated onu", value=event)
+    si = find_or_create_att_si(model_accessor, logging, message)
+    if message['status'] == 'activated':
+        logging.info('onu.events: activated onu', message=message)
         si.no_sync = False
-        si.uni_port_id = long(event["portNumber"])
-        si.of_dpid = event["deviceId"]
-        si.oper_onu_status = "ENABLED"
+        si.uni_port_id = long(message['portNumber'])
+        si.of_dpid = message['deviceId']
+        si.oper_onu_status = 'ENABLED'
         si.save_changed_fields(always_update_timestamp=True)
-    elif event["status"] == "disabled":
-        logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
-        si.oper_onu_status = "DISABLED"
+    elif message['status'] == 'disabled':
+        logging.info('onu.events: disabled onu, resetting the subscriber', value=message)
+        si.oper_onu_status = 'DISABLED'
         si.save_changed_fields(always_update_timestamp=True)
     else:
-        logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
-        raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+        logging.warn('onu.events: Unknown status value: %s' % message['status'], value=message)
+        raise AirflowException('onu.events: Unknown status value: %s' % message['status'], value=message)
 
 
-def DriverService_event(event_type, model_accessor, si, **kwargs):
+def AUTH_event(model_accessor, message, **kwargs):
     #context = kwargs
     #run_id = context['dag_run'].run_id
-    
+
+    logging.info('authentication.events: Got event for subscriber', message=message)
+
+    si = find_or_create_att_si(model_accessor, logging, message)
+    logging.debug('authentication.events: Updating service instance', si=si)
+    si.authentication_state = message['authenticationState']
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, message, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info('dhcp.events: Got event for subscriber', message=message)
+
+    si = find_or_create_att_si(model_accessor, logging, message)
+    logging.debug('dhcp.events: Updating service instance', si=si)
+    si.dhcp_state = message['messageType']
+    si.ip_address = message['ipAddress']
+    si.mac_address = message['macAddress']
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DriverService_event(model_accessor, message, si, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    event_type = message['event_type']
+
     go = False
     if event_type == 'create':
-        logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+        logging.debug('MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s ' % si.id)
         go = True
     elif event_type == 'update':
-        logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+        logging.debug('MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s ' %
                           (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
         go = True
     elif event_type == 'delete':
@@ -107,94 +135,107 @@
     si.save_changed_fields()
 
 
-def Auth_event(model_accessor, event, **kwargs):
-    #context = kwargs
-    #run_id = context['dag_run'].run_id
-
-    logging.info("authentication.events: Got event for subscriber", event_value=event)
-
-    si = find_or_create_att_si(model_accessor, logging, event)
-    logging.debug("authentication.events: Updating service instance", si=si)
-    si.authentication_state = event["authenticationState"]
-    si.save_changed_fields(always_update_timestamp=True)
-
-
-def DHCP_event(model_accessor, event, **kwargs):
-    #context = kwargs
-    #run_id = context['dag_run'].run_id
-
-    logging.info("dhcp.events: Got event for subscriber", event_value=event)
-
-    si = find_or_create_att_si(model_accessor, logging, event)
-    logging.debug("dhcp.events: Updating service instance", si=si)
-    si.dhcp_state = event["messageType"]
-    si.ip_address = event["ipAddress"]
-    si.mac_address = event["macAddress"]
-    si.save_changed_fields(always_update_timestamp=True)
-
-
-onu_event_handler = XOSEventSensor(
-    task_id='onu_event_handler',
+onu_event_sensor = CORDEventSensor(
+    task_id='onu_event_sensor',
     topic='onu.events',
     key_field='serialNumber',
-    provide_context=True,
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_att
+)
+
+onu_event_handler = CORDModelOperator(
+    task_id='onu_event_handler',
     python_callable=ONU_event,
-    poke_interval=5,
-    dag=dag_att,
+    cord_event_sensor_task_id='onu_event_sensor',
+    dag=dag_att
 )
 
-onu_model_event_handler = XOSModelSensor(
-    task_id='onu_model_event_handler',
-    model_name='AttWorkflowDriverServiceInstance',
+auth_event_sensor = CORDEventSensor(
+    task_id='auth_event_sensor',
+    topic='authentication.events',
     key_field='serialNumber',
-    provide_context=True,
-    python_callable=DriverService_event,
+    controller_conn_id='local_cord_controller',
     poke_interval=5,
-    dag=dag_att,
+    dag=dag_att
 )
 
-auth_event_handler = XOSEventSensor(
+auth_event_handler = CORDModelOperator(
     task_id='auth_event_handler',
-    topic="authentication.events",
-    key_field='serialNumber',
-    provide_context=True,
-    python_callable=Auth_event,
-    poke_interval=5,
-    dag=dag_att,
+    python_callable=AUTH_event,
+    cord_event_sensor_task_id='auth_event_sensor',
+    dag=dag_att
 )
 
-auth_model_event_handler = XOSModelSensor(
-    task_id='auth_model_event_handler',
-    model_name='AttWorkflowDriverServiceInstance',
+dhcp_event_sensor = CORDEventSensor(
+    task_id='dhcp_event_sensor',
+    topic='dhcp.events',
     key_field='serialNumber',
-    provide_context=True,
-    python_callable=DriverService_event,
+    controller_conn_id='local_cord_controller',
     poke_interval=5,
-    dag=dag_att,
+    dag=dag_att
 )
 
-dhcp_event_handler = XOSEventSensor(
+dhcp_event_handler = CORDModelOperator(
     task_id='dhcp_event_handler',
-    topic="dhcp.events",
-    key_field='serialNumber',
-    provide_context=True,
     python_callable=DHCP_event,
-    poke_interval=5,
-    dag=dag_att,
+    cord_event_sensor_task_id='dhcp_event_sensor',
+    dag=dag_att
 )
 
-dhcp_model_event_handler = XOSModelSensor(
-    task_id='dhcp_model_event_handler',
+att_model_event_sensor1 = CORDModelSensor(
+    task_id='att_model_event_sensor1',
     model_name='AttWorkflowDriverServiceInstance',
     key_field='serialNumber',
-    provide_context=True,
-    python_callable=DriverService_event,
+    controller_conn_id='local_cord_controller',
     poke_interval=5,
-    dag=dag_att,
+    dag=dag_att
 )
 
-onu_event_handler >> onu_model_event_handler
-auth_event_handler << onu_model_event_handler
-auth_event_handler >> auth_model_event_handler
-dhcp_event_handler << auth_model_event_handler
-dhcp_event_handler >> dhcp_model_event_handler
\ No newline at end of file
+att_model_event_handler1 = CORDModelOperator(
+    task_id='att_model_event_handler1',
+    python_callable=DriverService_event,
+    cord_event_sensor_task_id='att_model_event_sensor1',
+    dag=dag_att
+)
+
+att_model_event_sensor2 = CORDModelSensor(
+    task_id='att_model_event_sensor2',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_att
+)
+
+att_model_event_handler2 = CORDModelOperator(
+    task_id='att_model_event_handler2',
+    python_callable=DriverService_event,
+    cord_event_sensor_task_id='att_model_event_sensor2',
+    dag=dag_att
+)
+
+att_model_event_sensor3 = CORDModelSensor(
+    task_id='att_model_event_sensor3',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_att
+)
+
+att_model_event_handler3 = CORDModelOperator(
+    task_id='att_model_event_handler3',
+    python_callable=DriverService_event,
+    cord_event_sensor_task_id='att_model_event_sensor3',
+    dag=dag_att
+)
+
+
+onu_event_sensor >> onu_event_handler >> att_model_event_sensor1 >> att_model_event_handler1
+att_model_event_handler1 >> auth_event_sensor
+
+att_model_event_handler2 << att_model_event_sensor2 << auth_event_handler << auth_event_sensor
+
+att_model_event_handler2 >> dhcp_event_sensor
+dhcp_event_sensor >> dhcp_event_handler >> att_model_event_sensor3 >> att_model_event_handler3
diff --git a/test/workflow_examples/left_right_mix_dag.py.expected.json b/test/workflow_examples/left_right_mix_dag.py.expected.json
index 05607ba..6d4c367 100644
--- a/test/workflow_examples/left_right_mix_dag.py.expected.json
+++ b/test/workflow_examples/left_right_mix_dag.py.expected.json
@@ -5,122 +5,218 @@
             "local_variable": "dag_att"
         },
         "dependencies": {
-            "auth_event_handler": {
+            "att_model_event_handler1": {
                 "children": [
-                    "auth_model_event_handler"
+                    "auth_event_sensor"
                 ],
                 "parents": [
-                    "onu_model_event_handler"
+                    "att_model_event_sensor1"
                 ]
             },
-            "auth_model_event_handler": {
+            "att_model_event_handler2": {
                 "children": [
-                    "dhcp_event_handler"
+                    "dhcp_event_sensor"
+                ],
+                "parents": [
+                    "att_model_event_sensor2"
+                ]
+            },
+            "att_model_event_handler3": {
+                "parents": [
+                    "att_model_event_sensor3"
+                ]
+            },
+            "att_model_event_sensor1": {
+                "children": [
+                    "att_model_event_handler1"
+                ],
+                "parents": [
+                    "onu_event_handler"
+                ]
+            },
+            "att_model_event_sensor2": {
+                "children": [
+                    "att_model_event_handler2"
                 ],
                 "parents": [
                     "auth_event_handler"
                 ]
             },
+            "att_model_event_sensor3": {
+                "children": [
+                    "att_model_event_handler3"
+                ],
+                "parents": [
+                    "dhcp_event_handler"
+                ]
+            },
+            "auth_event_handler": {
+                "children": [
+                    "att_model_event_sensor2"
+                ],
+                "parents": [
+                    "auth_event_sensor"
+                ]
+            },
+            "auth_event_sensor": {
+                "children": [
+                    "auth_event_handler"
+                ],
+                "parents": [
+                    "att_model_event_handler1"
+                ]
+            },
             "dhcp_event_handler": {
                 "children": [
-                    "dhcp_model_event_handler"
+                    "att_model_event_sensor3"
                 ],
                 "parents": [
-                    "auth_model_event_handler"
+                    "dhcp_event_sensor"
                 ]
             },
-            "dhcp_model_event_handler": {
-                "parents": [
+            "dhcp_event_sensor": {
+                "children": [
                     "dhcp_event_handler"
+                ],
+                "parents": [
+                    "att_model_event_handler2"
                 ]
             },
             "onu_event_handler": {
                 "children": [
-                    "onu_model_event_handler"
-                ]
-            },
-            "onu_model_event_handler": {
-                "children": [
-                    "auth_event_handler"
+                    "att_model_event_sensor1"
                 ],
                 "parents": [
+                    "onu_event_sensor"
+                ]
+            },
+            "onu_event_sensor": {
+                "children": [
                     "onu_event_handler"
                 ]
             }
         },
         "tasks": {
-            "auth_event_handler": {
-                "class": "XOSEventSensor",
+            "att_model_event_handler1": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "att_model_event_sensor1",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "local_variable": "att_model_event_handler1",
+                "python_callable": "DriverService_event",
+                "task_id": "att_model_event_handler1"
+            },
+            "att_model_event_handler2": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "att_model_event_sensor2",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "local_variable": "att_model_event_handler2",
+                "python_callable": "DriverService_event",
+                "task_id": "att_model_event_handler2"
+            },
+            "att_model_event_handler3": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "att_model_event_sensor3",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "local_variable": "att_model_event_handler3",
+                "python_callable": "DriverService_event",
+                "task_id": "att_model_event_handler3"
+            },
+            "att_model_event_sensor1": {
+                "class": "CORDModelSensor",
+                "controller_conn_id": "local_cord_controller",
                 "dag": "dag_att",
                 "dag_id": "att_workflow_onu",
                 "key_field": "serialNumber",
-                "local_variable": "auth_event_handler",
+                "local_variable": "att_model_event_sensor1",
+                "model_name": "AttWorkflowDriverServiceInstance",
                 "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "Auth_event",
-                "task_id": "auth_event_handler",
+                "task_id": "att_model_event_sensor1"
+            },
+            "att_model_event_sensor2": {
+                "class": "CORDModelSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "att_model_event_sensor2",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "task_id": "att_model_event_sensor2"
+            },
+            "att_model_event_sensor3": {
+                "class": "CORDModelSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "att_model_event_sensor3",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "task_id": "att_model_event_sensor3"
+            },
+            "auth_event_handler": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "auth_event_sensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "local_variable": "auth_event_handler",
+                "python_callable": "AUTH_event",
+                "task_id": "auth_event_handler"
+            },
+            "auth_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "auth_event_sensor",
+                "poke_interval": 5,
+                "task_id": "auth_event_sensor",
                 "topic": "authentication.events"
             },
-            "auth_model_event_handler": {
-                "class": "XOSModelSensor",
-                "dag": "dag_att",
-                "dag_id": "att_workflow_onu",
-                "key_field": "serialNumber",
-                "local_variable": "auth_model_event_handler",
-                "model_name": "AttWorkflowDriverServiceInstance",
-                "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "DriverService_event",
-                "task_id": "auth_model_event_handler"
-            },
             "dhcp_event_handler": {
-                "class": "XOSEventSensor",
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "dhcp_event_sensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "local_variable": "dhcp_event_handler",
+                "python_callable": "DHCP_event",
+                "task_id": "dhcp_event_handler"
+            },
+            "dhcp_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
                 "dag": "dag_att",
                 "dag_id": "att_workflow_onu",
                 "key_field": "serialNumber",
-                "local_variable": "dhcp_event_handler",
+                "local_variable": "dhcp_event_sensor",
                 "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "DHCP_event",
-                "task_id": "dhcp_event_handler",
+                "task_id": "dhcp_event_sensor",
                 "topic": "dhcp.events"
             },
-            "dhcp_model_event_handler": {
-                "class": "XOSModelSensor",
-                "dag": "dag_att",
-                "dag_id": "att_workflow_onu",
-                "key_field": "serialNumber",
-                "local_variable": "dhcp_model_event_handler",
-                "model_name": "AttWorkflowDriverServiceInstance",
-                "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "DriverService_event",
-                "task_id": "dhcp_model_event_handler"
-            },
             "onu_event_handler": {
-                "class": "XOSEventSensor",
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "onu_event_sensor",
                 "dag": "dag_att",
                 "dag_id": "att_workflow_onu",
-                "key_field": "serialNumber",
                 "local_variable": "onu_event_handler",
-                "poke_interval": 5,
-                "provide_context": true,
                 "python_callable": "ONU_event",
-                "task_id": "onu_event_handler",
-                "topic": "onu.events"
+                "task_id": "onu_event_handler"
             },
-            "onu_model_event_handler": {
-                "class": "XOSModelSensor",
+            "onu_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
                 "dag": "dag_att",
                 "dag_id": "att_workflow_onu",
                 "key_field": "serialNumber",
-                "local_variable": "onu_model_event_handler",
-                "model_name": "AttWorkflowDriverServiceInstance",
+                "local_variable": "onu_event_sensor",
                 "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "DriverService_event",
-                "task_id": "onu_model_event_handler"
+                "task_id": "onu_event_sensor",
+                "topic": "onu.events"
             }
         }
     }
-}
+}
\ No newline at end of file
diff --git a/test/workflow_examples/left_right_mix_dag2.py b/test/workflow_examples/left_right_mix_dag2.py
index 6fa1df1..697edc3 100644
--- a/test/workflow_examples/left_right_mix_dag2.py
+++ b/test/workflow_examples/left_right_mix_dag2.py
@@ -16,19 +16,19 @@
 Example AT&T workflow using Airflow
 """
 import json
-import logging  
-from datetime import datetime
-
+import logging
 import airflow
+from datetime import datetime
 from airflow import DAG
 from airflow import AirflowException
-
-import xossynchronizer.airflow.sensors.XOSModelSensor
-import xossynchronizer.airflow.sensors.XOSEventSensor
+from airflow.operators import PythonOperator
+from cord_workflow_airflow_extensions.sensors import CORDEventSensor, CORDModelSensor
+from cord_workflow_airflow_extensions.operators import CORDModelOperator
 
 from att_helpers import *
 from att_service_instance_funcs import *
 
+
 args = {
     'start_date': datetime.utcnow(),
     'owner': 'ATT',
@@ -44,39 +44,67 @@
 dag_att.doc_md = __doc__
 
 
-def ONU_event(model_accessor, event, **kwargs):
+def ONU_event(model_accessor, message, **kwargs):
     #context = kwargs
     #run_id = context['dag_run'].run_id
 
-    logging.info("onu.events: received event", event=event)
+    logging.info('onu.events: received event', message=message)
 
-    si = find_or_create_att_si(model_accessor, logging, event)
-    if event["status"] == "activated":
-        logging.info("onu.events: activated onu", value=event)
+    si = find_or_create_att_si(model_accessor, logging, message)
+    if message['status'] == 'activated':
+        logging.info('onu.events: activated onu', message=message)
         si.no_sync = False
-        si.uni_port_id = long(event["portNumber"])
-        si.of_dpid = event["deviceId"]
-        si.oper_onu_status = "ENABLED"
+        si.uni_port_id = long(message['portNumber'])
+        si.of_dpid = message['deviceId']
+        si.oper_onu_status = 'ENABLED'
         si.save_changed_fields(always_update_timestamp=True)
-    elif event["status"] == "disabled":
-        logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
-        si.oper_onu_status = "DISABLED"
+    elif message['status'] == 'disabled':
+        logging.info('onu.events: disabled onu, resetting the subscriber', value=message)
+        si.oper_onu_status = 'DISABLED'
         si.save_changed_fields(always_update_timestamp=True)
     else:
-        logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
-        raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+        logging.warn('onu.events: Unknown status value: %s' % message['status'], value=message)
+        raise AirflowException('onu.events: Unknown status value: %s' % message['status'], value=message)
 
 
-def DriverService_event(event_type, model_accessor, si, **kwargs):
+def AUTH_event(model_accessor, message, **kwargs):
     #context = kwargs
     #run_id = context['dag_run'].run_id
-    
+
+    logging.info('authentication.events: Got event for subscriber', message=message)
+
+    si = find_or_create_att_si(model_accessor, logging, message)
+    logging.debug('authentication.events: Updating service instance', si=si)
+    si.authentication_state = message['authenticationState']
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, message, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info('dhcp.events: Got event for subscriber', message=message)
+
+    si = find_or_create_att_si(model_accessor, logging, message)
+    logging.debug('dhcp.events: Updating service instance', si=si)
+    si.dhcp_state = message['messageType']
+    si.ip_address = message['ipAddress']
+    si.mac_address = message['macAddress']
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DriverService_event(model_accessor, message, si, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    event_type = message['event_type']
+
     go = False
     if event_type == 'create':
-        logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+        logging.debug('MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s ' % si.id)
         go = True
     elif event_type == 'update':
-        logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+        logging.debug('MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s ' %
                           (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
         go = True
     elif event_type == 'delete':
@@ -107,90 +135,71 @@
     si.save_changed_fields()
 
 
-def Auth_event(model_accessor, event, **kwargs):
-    #context = kwargs
-    #run_id = context['dag_run'].run_id
-
-    logging.info("authentication.events: Got event for subscriber", event_value=event)
-
-    si = find_or_create_att_si(model_accessor, logging, event)
-    logging.debug("authentication.events: Updating service instance", si=si)
-    si.authentication_state = event["authenticationState"]
-    si.save_changed_fields(always_update_timestamp=True)
-
-
-def DHCP_event(model_accessor, event, **kwargs):
-    #context = kwargs
-    #run_id = context['dag_run'].run_id
-
-    logging.info("dhcp.events: Got event for subscriber", event_value=event)
-
-    si = find_or_create_att_si(model_accessor, logging, event)
-    logging.debug("dhcp.events: Updating service instance", si=si)
-    si.dhcp_state = event["messageType"]
-    si.ip_address = event["ipAddress"]
-    si.mac_address = event["macAddress"]
-    si.save_changed_fields(always_update_timestamp=True)
-
-
-onu_event_handler = XOSEventSensor(
-    task_id='onu_event_handler',
+onu_event_sensor = CORDEventSensor(
+    task_id='onu_event_sensor',
     topic='onu.events',
     key_field='serialNumber',
-    provide_context=True,
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_att
+)
+
+onu_event_handler = CORDModelOperator(
+    task_id='onu_event_handler',
     python_callable=ONU_event,
-    poke_interval=5,
-    dag=dag_att,
+    cord_event_sensor_task_id='onu_event_sensor',
+    dag=dag_att
 )
 
-onu_model_event_handler = XOSModelSensor(
-    task_id='onu_model_event_handler',
-    model_name='AttWorkflowDriverServiceInstance',
+auth_event_sensor = CORDEventSensor(
+    task_id='auth_event_sensor',
+    topic='authentication.events',
     key_field='serialNumber',
-    provide_context=True,
-    python_callable=DriverService_event,
+    controller_conn_id='local_cord_controller',
     poke_interval=5,
-    dag=dag_att,
+    dag=dag_att
 )
 
-auth_event_handler = XOSEventSensor(
+auth_event_handler = CORDModelOperator(
     task_id='auth_event_handler',
-    topic="authentication.events",
-    key_field='serialNumber',
-    provide_context=True,
-    python_callable=Auth_event,
-    poke_interval=5,
-    dag=dag_att,
+    python_callable=AUTH_event,
+    cord_event_sensor_task_id='auth_event_sensor',
+    dag=dag_att
 )
 
-auth_model_event_handler = XOSModelSensor(
-    task_id='auth_model_event_handler',
-    model_name='AttWorkflowDriverServiceInstance',
+dhcp_event_sensor = CORDEventSensor(
+    task_id='dhcp_event_sensor',
+    topic='dhcp.events',
     key_field='serialNumber',
-    provide_context=True,
-    python_callable=DriverService_event,
+    controller_conn_id='local_cord_controller',
     poke_interval=5,
-    dag=dag_att,
+    dag=dag_att
 )
 
-dhcp_event_handler = XOSEventSensor(
+dhcp_event_handler = CORDModelOperator(
     task_id='dhcp_event_handler',
-    topic="dhcp.events",
-    key_field='serialNumber',
-    provide_context=True,
     python_callable=DHCP_event,
-    poke_interval=5,
-    dag=dag_att,
+    cord_event_sensor_task_id='dhcp_event_sensor',
+    dag=dag_att
 )
 
-dhcp_model_event_handler = XOSModelSensor(
-    task_id='dhcp_model_event_handler',
+att_model_event_sensor1 = CORDModelSensor(
+    task_id='att_model_event_sensor1',
     model_name='AttWorkflowDriverServiceInstance',
     key_field='serialNumber',
-    provide_context=True,
-    python_callable=DriverService_event,
+    controller_conn_id='local_cord_controller',
     poke_interval=5,
-    dag=dag_att,
+    dag=dag_att
 )
 
-onu_event_handler >> onu_model_event_handler << auth_event_handler
+att_model_event_handler1 = CORDModelOperator(
+    task_id='att_model_event_handler1',
+    python_callable=DriverService_event,
+    cord_event_sensor_task_id='att_model_event_sensor1',
+    dag=dag_att
+)
+
+
+onu_event_sensor >> onu_event_handler >> att_model_event_sensor1 << auth_event_handler << auth_event_sensor
+dhcp_event_sensor >> dhcp_event_handler >> att_model_event_sensor1
+att_model_event_sensor1 >> att_model_event_handler1
diff --git a/test/workflow_examples/left_right_mix_dag2.py.expected.json b/test/workflow_examples/left_right_mix_dag2.py.expected.json
index 7d9430b..ce05e7e 100644
--- a/test/workflow_examples/left_right_mix_dag2.py.expected.json
+++ b/test/workflow_examples/left_right_mix_dag2.py.expected.json
@@ -5,96 +5,142 @@
             "local_variable": "dag_att"
         },
         "dependencies": {
+            "att_model_event_handler1": {
+                "parents": [
+                    "att_model_event_sensor1"
+                ]
+            },
+            "att_model_event_sensor1": {
+                "children": [
+                    "att_model_event_handler1"
+                ],
+                "parents": [
+                    "onu_event_handler",
+                    "dhcp_event_handler",
+                    "auth_event_handler"
+                ]
+            },
             "auth_event_handler": {
                 "children": [
-                    "onu_model_event_handler"
+                    "att_model_event_sensor1"
+                ],
+                "parents": [
+                    "auth_event_sensor"
+                ]
+            },
+            "auth_event_sensor": {
+                "children": [
+                    "auth_event_handler"
+                ]
+            },
+            "dhcp_event_handler": {
+                "children": [
+                    "att_model_event_sensor1"
+                ],
+                "parents": [
+                    "dhcp_event_sensor"
+                ]
+            },
+            "dhcp_event_sensor": {
+                "children": [
+                    "dhcp_event_handler"
                 ]
             },
             "onu_event_handler": {
                 "children": [
-                    "onu_model_event_handler"
+                    "att_model_event_sensor1"
+                ],
+                "parents": [
+                    "onu_event_sensor"
                 ]
             },
-            "onu_model_event_handler": {
-                "parents": [
-                    "onu_event_handler",
-                    "auth_event_handler"
+            "onu_event_sensor": {
+                "children": [
+                    "onu_event_handler"
                 ]
             }
         },
         "tasks": {
-            "auth_event_handler": {
-                "class": "XOSEventSensor",
+            "att_model_event_handler1": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "att_model_event_sensor1",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "local_variable": "att_model_event_handler1",
+                "python_callable": "DriverService_event",
+                "task_id": "att_model_event_handler1"
+            },
+            "att_model_event_sensor1": {
+                "class": "CORDModelSensor",
+                "controller_conn_id": "local_cord_controller",
                 "dag": "dag_att",
                 "dag_id": "att_workflow_onu",
                 "key_field": "serialNumber",
-                "local_variable": "auth_event_handler",
+                "local_variable": "att_model_event_sensor1",
+                "model_name": "AttWorkflowDriverServiceInstance",
                 "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "Auth_event",
-                "task_id": "auth_event_handler",
+                "task_id": "att_model_event_sensor1"
+            },
+            "auth_event_handler": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "auth_event_sensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "local_variable": "auth_event_handler",
+                "python_callable": "AUTH_event",
+                "task_id": "auth_event_handler"
+            },
+            "auth_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "auth_event_sensor",
+                "poke_interval": 5,
+                "task_id": "auth_event_sensor",
                 "topic": "authentication.events"
             },
-            "auth_model_event_handler": {
-                "class": "XOSModelSensor",
-                "dag": "dag_att",
-                "dag_id": "att_workflow_onu",
-                "key_field": "serialNumber",
-                "local_variable": "auth_model_event_handler",
-                "model_name": "AttWorkflowDriverServiceInstance",
-                "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "DriverService_event",
-                "task_id": "auth_model_event_handler"
-            },
             "dhcp_event_handler": {
-                "class": "XOSEventSensor",
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "dhcp_event_sensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "local_variable": "dhcp_event_handler",
+                "python_callable": "DHCP_event",
+                "task_id": "dhcp_event_handler"
+            },
+            "dhcp_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
                 "dag": "dag_att",
                 "dag_id": "att_workflow_onu",
                 "key_field": "serialNumber",
-                "local_variable": "dhcp_event_handler",
+                "local_variable": "dhcp_event_sensor",
                 "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "DHCP_event",
-                "task_id": "dhcp_event_handler",
+                "task_id": "dhcp_event_sensor",
                 "topic": "dhcp.events"
             },
-            "dhcp_model_event_handler": {
-                "class": "XOSModelSensor",
-                "dag": "dag_att",
-                "dag_id": "att_workflow_onu",
-                "key_field": "serialNumber",
-                "local_variable": "dhcp_model_event_handler",
-                "model_name": "AttWorkflowDriverServiceInstance",
-                "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "DriverService_event",
-                "task_id": "dhcp_model_event_handler"
-            },
             "onu_event_handler": {
-                "class": "XOSEventSensor",
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "onu_event_sensor",
                 "dag": "dag_att",
                 "dag_id": "att_workflow_onu",
-                "key_field": "serialNumber",
                 "local_variable": "onu_event_handler",
-                "poke_interval": 5,
-                "provide_context": true,
                 "python_callable": "ONU_event",
-                "task_id": "onu_event_handler",
-                "topic": "onu.events"
+                "task_id": "onu_event_handler"
             },
-            "onu_model_event_handler": {
-                "class": "XOSModelSensor",
+            "onu_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
                 "dag": "dag_att",
                 "dag_id": "att_workflow_onu",
                 "key_field": "serialNumber",
-                "local_variable": "onu_model_event_handler",
-                "model_name": "AttWorkflowDriverServiceInstance",
+                "local_variable": "onu_event_sensor",
                 "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "DriverService_event",
-                "task_id": "onu_model_event_handler"
+                "task_id": "onu_event_sensor",
+                "topic": "onu.events"
             }
         }
     }
-}
+}
\ No newline at end of file
diff --git a/test/workflow_examples/multi_children_parents_dag.py b/test/workflow_examples/multi_children_parents_dag.py
index 8e59e22..88256b6 100644
--- a/test/workflow_examples/multi_children_parents_dag.py
+++ b/test/workflow_examples/multi_children_parents_dag.py
@@ -16,19 +16,19 @@
 Example AT&T workflow using Airflow
 """
 import json
-import logging  
-from datetime import datetime
-
+import logging
 import airflow
+from datetime import datetime
 from airflow import DAG
 from airflow import AirflowException
-
-import xossynchronizer.airflow.sensors.XOSModelSensor
-import xossynchronizer.airflow.sensors.XOSEventSensor
+from airflow.operators import PythonOperator
+from cord_workflow_airflow_extensions.sensors import CORDEventSensor, CORDModelSensor
+from cord_workflow_airflow_extensions.operators import CORDModelOperator
 
 from att_helpers import *
 from att_service_instance_funcs import *
 
+
 args = {
     'start_date': datetime.utcnow(),
     'owner': 'ATT',
@@ -44,39 +44,67 @@
 dag_att.doc_md = __doc__
 
 
-def ONU_event(model_accessor, event, **kwargs):
+def ONU_event(model_accessor, message, **kwargs):
     #context = kwargs
     #run_id = context['dag_run'].run_id
 
-    logging.info("onu.events: received event", event=event)
+    logging.info('onu.events: received event', message=message)
 
-    si = find_or_create_att_si(model_accessor, logging, event)
-    if event["status"] == "activated":
-        logging.info("onu.events: activated onu", value=event)
+    si = find_or_create_att_si(model_accessor, logging, message)
+    if message['status'] == 'activated':
+        logging.info('onu.events: activated onu', message=message)
         si.no_sync = False
-        si.uni_port_id = long(event["portNumber"])
-        si.of_dpid = event["deviceId"]
-        si.oper_onu_status = "ENABLED"
+        si.uni_port_id = long(message['portNumber'])
+        si.of_dpid = message['deviceId']
+        si.oper_onu_status = 'ENABLED'
         si.save_changed_fields(always_update_timestamp=True)
-    elif event["status"] == "disabled":
-        logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
-        si.oper_onu_status = "DISABLED"
+    elif message['status'] == 'disabled':
+        logging.info('onu.events: disabled onu, resetting the subscriber', value=message)
+        si.oper_onu_status = 'DISABLED'
         si.save_changed_fields(always_update_timestamp=True)
     else:
-        logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
-        raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+        logging.warn('onu.events: Unknown status value: %s' % message['status'], value=message)
+        raise AirflowException('onu.events: Unknown status value: %s' % message['status'], value=message)
 
 
-def DriverService_event(event_type, model_accessor, si, **kwargs):
+def AUTH_event(model_accessor, message, **kwargs):
     #context = kwargs
     #run_id = context['dag_run'].run_id
-    
+
+    logging.info('authentication.events: Got event for subscriber', message=message)
+
+    si = find_or_create_att_si(model_accessor, logging, message)
+    logging.debug('authentication.events: Updating service instance', si=si)
+    si.authentication_state = message['authenticationState']
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, message, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info('dhcp.events: Got event for subscriber', message=message)
+
+    si = find_or_create_att_si(model_accessor, logging, message)
+    logging.debug('dhcp.events: Updating service instance', si=si)
+    si.dhcp_state = message['messageType']
+    si.ip_address = message['ipAddress']
+    si.mac_address = message['macAddress']
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DriverService_event(model_accessor, message, si, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    event_type = message['event_type']
+
     go = False
     if event_type == 'create':
-        logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+        logging.debug('MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s ' % si.id)
         go = True
     elif event_type == 'update':
-        logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+        logging.debug('MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s ' %
                           (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
         go = True
     elif event_type == 'delete':
@@ -107,91 +135,73 @@
     si.save_changed_fields()
 
 
-def Auth_event(model_accessor, event, **kwargs):
-    #context = kwargs
-    #run_id = context['dag_run'].run_id
-
-    logging.info("authentication.events: Got event for subscriber", event_value=event)
-
-    si = find_or_create_att_si(model_accessor, logging, event)
-    logging.debug("authentication.events: Updating service instance", si=si)
-    si.authentication_state = event["authenticationState"]
-    si.save_changed_fields(always_update_timestamp=True)
-
-
-def DHCP_event(model_accessor, event, **kwargs):
-    #context = kwargs
-    #run_id = context['dag_run'].run_id
-
-    logging.info("dhcp.events: Got event for subscriber", event_value=event)
-
-    si = find_or_create_att_si(model_accessor, logging, event)
-    logging.debug("dhcp.events: Updating service instance", si=si)
-    si.dhcp_state = event["messageType"]
-    si.ip_address = event["ipAddress"]
-    si.mac_address = event["macAddress"]
-    si.save_changed_fields(always_update_timestamp=True)
-
-
-onu_event_handler = XOSEventSensor(
-    task_id='onu_event_handler',
+onu_event_sensor = CORDEventSensor(
+    task_id='onu_event_sensor',
     topic='onu.events',
     key_field='serialNumber',
-    provide_context=True,
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_att
+)
+
+onu_event_handler = CORDModelOperator(
+    task_id='onu_event_handler',
     python_callable=ONU_event,
-    poke_interval=5,
-    dag=dag_att,
+    cord_event_sensor_task_id='onu_event_sensor',
+    dag=dag_att
 )
 
-onu_model_event_handler = XOSModelSensor(
-    task_id='onu_model_event_handler',
-    model_name='AttWorkflowDriverServiceInstance',
+auth_event_sensor = CORDEventSensor(
+    task_id='auth_event_sensor',
+    topic='authentication.events',
     key_field='serialNumber',
-    provide_context=True,
-    python_callable=DriverService_event,
+    controller_conn_id='local_cord_controller',
     poke_interval=5,
-    dag=dag_att,
+    dag=dag_att
 )
 
-auth_event_handler = XOSEventSensor(
+auth_event_handler = CORDModelOperator(
     task_id='auth_event_handler',
-    topic="authentication.events",
-    key_field='serialNumber',
-    provide_context=True,
-    python_callable=Auth_event,
-    poke_interval=5,
-    dag=dag_att,
+    python_callable=AUTH_event,
+    cord_event_sensor_task_id='auth_event_sensor',
+    dag=dag_att
 )
 
-auth_model_event_handler = XOSModelSensor(
-    task_id='auth_model_event_handler',
-    model_name='AttWorkflowDriverServiceInstance',
+dhcp_event_sensor = CORDEventSensor(
+    task_id='dhcp_event_sensor',
+    topic='dhcp.events',
     key_field='serialNumber',
-    provide_context=True,
-    python_callable=DriverService_event,
+    controller_conn_id='local_cord_controller',
     poke_interval=5,
-    dag=dag_att,
+    dag=dag_att
 )
 
-dhcp_event_handler = XOSEventSensor(
+dhcp_event_handler = CORDModelOperator(
     task_id='dhcp_event_handler',
-    topic="dhcp.events",
-    key_field='serialNumber',
-    provide_context=True,
     python_callable=DHCP_event,
-    poke_interval=5,
-    dag=dag_att,
+    cord_event_sensor_task_id='dhcp_event_sensor',
+    dag=dag_att
 )
 
-dhcp_model_event_handler = XOSModelSensor(
-    task_id='dhcp_model_event_handler',
+att_model_event_sensor1 = CORDModelSensor(
+    task_id='att_model_event_sensor1',
     model_name='AttWorkflowDriverServiceInstance',
     key_field='serialNumber',
-    provide_context=True,
-    python_callable=DriverService_event,
+    controller_conn_id='local_cord_controller',
     poke_interval=5,
-    dag=dag_att,
+    dag=dag_att
 )
 
-onu_event_handler >> [onu_model_event_handler, auth_model_event_handler, dhcp_model_event_handler] >> \
-    auth_event_handler >> dhcp_event_handler
\ No newline at end of file
+att_model_event_handler1 = CORDModelOperator(
+    task_id='att_model_event_handler1',
+    python_callable=DriverService_event,
+    cord_event_sensor_task_id='att_model_event_sensor1',
+    dag=dag_att
+)
+
+
+onu_event_sensor >> onu_event_handler
+auth_event_sensor >> auth_event_handler
+dhcp_event_sensor >> dhcp_event_handler
+
+[onu_event_handler, auth_event_handler, dhcp_event_handler] >> att_model_event_sensor1 >> att_model_event_handler1
diff --git a/test/workflow_examples/multi_children_parents_dag.py.expected.json b/test/workflow_examples/multi_children_parents_dag.py.expected.json
index e0dc284..130b8f8 100644
--- a/test/workflow_examples/multi_children_parents_dag.py.expected.json
+++ b/test/workflow_examples/multi_children_parents_dag.py.expected.json
@@ -5,126 +5,142 @@
             "local_variable": "dag_att"
         },
         "dependencies": {
-            "auth_event_handler": {
-                "children": [
-                    "dhcp_event_handler"
-                ],
+            "att_model_event_handler1": {
                 "parents": [
-                    "onu_model_event_handler",
-                    "auth_model_event_handler",
-                    "dhcp_model_event_handler"
+                    "att_model_event_sensor1"
                 ]
             },
-            "auth_model_event_handler": {
+            "att_model_event_sensor1": {
                 "children": [
-                    "auth_event_handler"
+                    "att_model_event_handler1"
                 ],
                 "parents": [
-                    "onu_event_handler"
+                    "onu_event_handler",
+                    "auth_event_handler",
+                    "dhcp_event_handler"
+                ]
+            },
+            "auth_event_handler": {
+                "children": [
+                    "att_model_event_sensor1"
+                ],
+                "parents": [
+                    "auth_event_sensor"
+                ]
+            },
+            "auth_event_sensor": {
+                "children": [
+                    "auth_event_handler"
                 ]
             },
             "dhcp_event_handler": {
-                "parents": [
-                    "auth_event_handler"
-                ]
-            },
-            "dhcp_model_event_handler": {
                 "children": [
-                    "auth_event_handler"
+                    "att_model_event_sensor1"
                 ],
                 "parents": [
-                    "onu_event_handler"
+                    "dhcp_event_sensor"
+                ]
+            },
+            "dhcp_event_sensor": {
+                "children": [
+                    "dhcp_event_handler"
                 ]
             },
             "onu_event_handler": {
                 "children": [
-                    "onu_model_event_handler",
-                    "auth_model_event_handler",
-                    "dhcp_model_event_handler"
-                ]
-            },
-            "onu_model_event_handler": {
-                "children": [
-                    "auth_event_handler"
+                    "att_model_event_sensor1"
                 ],
                 "parents": [
+                    "onu_event_sensor"
+                ]
+            },
+            "onu_event_sensor": {
+                "children": [
                     "onu_event_handler"
                 ]
             }
         },
         "tasks": {
-            "auth_event_handler": {
-                "class": "XOSEventSensor",
+            "att_model_event_handler1": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "att_model_event_sensor1",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "local_variable": "att_model_event_handler1",
+                "python_callable": "DriverService_event",
+                "task_id": "att_model_event_handler1"
+            },
+            "att_model_event_sensor1": {
+                "class": "CORDModelSensor",
+                "controller_conn_id": "local_cord_controller",
                 "dag": "dag_att",
                 "dag_id": "att_workflow_onu",
                 "key_field": "serialNumber",
-                "local_variable": "auth_event_handler",
+                "local_variable": "att_model_event_sensor1",
+                "model_name": "AttWorkflowDriverServiceInstance",
                 "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "Auth_event",
-                "task_id": "auth_event_handler",
+                "task_id": "att_model_event_sensor1"
+            },
+            "auth_event_handler": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "auth_event_sensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "local_variable": "auth_event_handler",
+                "python_callable": "AUTH_event",
+                "task_id": "auth_event_handler"
+            },
+            "auth_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "auth_event_sensor",
+                "poke_interval": 5,
+                "task_id": "auth_event_sensor",
                 "topic": "authentication.events"
             },
-            "auth_model_event_handler": {
-                "class": "XOSModelSensor",
-                "dag": "dag_att",
-                "dag_id": "att_workflow_onu",
-                "key_field": "serialNumber",
-                "local_variable": "auth_model_event_handler",
-                "model_name": "AttWorkflowDriverServiceInstance",
-                "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "DriverService_event",
-                "task_id": "auth_model_event_handler"
-            },
             "dhcp_event_handler": {
-                "class": "XOSEventSensor",
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "dhcp_event_sensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "local_variable": "dhcp_event_handler",
+                "python_callable": "DHCP_event",
+                "task_id": "dhcp_event_handler"
+            },
+            "dhcp_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
                 "dag": "dag_att",
                 "dag_id": "att_workflow_onu",
                 "key_field": "serialNumber",
-                "local_variable": "dhcp_event_handler",
+                "local_variable": "dhcp_event_sensor",
                 "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "DHCP_event",
-                "task_id": "dhcp_event_handler",
+                "task_id": "dhcp_event_sensor",
                 "topic": "dhcp.events"
             },
-            "dhcp_model_event_handler": {
-                "class": "XOSModelSensor",
-                "dag": "dag_att",
-                "dag_id": "att_workflow_onu",
-                "key_field": "serialNumber",
-                "local_variable": "dhcp_model_event_handler",
-                "model_name": "AttWorkflowDriverServiceInstance",
-                "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "DriverService_event",
-                "task_id": "dhcp_model_event_handler"
-            },
             "onu_event_handler": {
-                "class": "XOSEventSensor",
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "onu_event_sensor",
                 "dag": "dag_att",
                 "dag_id": "att_workflow_onu",
-                "key_field": "serialNumber",
                 "local_variable": "onu_event_handler",
-                "poke_interval": 5,
-                "provide_context": true,
                 "python_callable": "ONU_event",
-                "task_id": "onu_event_handler",
-                "topic": "onu.events"
+                "task_id": "onu_event_handler"
             },
-            "onu_model_event_handler": {
-                "class": "XOSModelSensor",
+            "onu_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
                 "dag": "dag_att",
                 "dag_id": "att_workflow_onu",
                 "key_field": "serialNumber",
-                "local_variable": "onu_model_event_handler",
-                "model_name": "AttWorkflowDriverServiceInstance",
+                "local_variable": "onu_event_sensor",
                 "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "DriverService_event",
-                "task_id": "onu_model_event_handler"
+                "task_id": "onu_event_sensor",
+                "topic": "onu.events"
             }
         }
     }
-}
+}
\ No newline at end of file
diff --git a/test/workflow_examples/multi_dags.py b/test/workflow_examples/multi_dags.py
new file mode 100644
index 0000000..9cda441
--- /dev/null
+++ b/test/workflow_examples/multi_dags.py
@@ -0,0 +1,237 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging
+import airflow
+from datetime import datetime
+from airflow import DAG
+from airflow import AirflowException
+from airflow.operators import PythonOperator
+from cord_workflow_airflow_extensions.sensors import CORDEventSensor, CORDModelSensor
+from cord_workflow_airflow_extensions.operators import CORDModelOperator
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+
+args = {
+    'start_date': datetime.utcnow(),
+    'owner': 'ATT',
+}
+
+dag_att = DAG(
+    dag_id='att_workflow_onu',
+    default_args=args,
+    # this dag will be triggered by external systems
+    schedule_interval=None,
+)
+
+dag_att.doc_md = __doc__
+
+
+def ONU_event(model_accessor, message, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info('onu.events: received event', message=message)
+
+    si = find_or_create_att_si(model_accessor, logging, message)
+    if message['status'] == 'activated':
+        logging.info('onu.events: activated onu', message=message)
+        si.no_sync = False
+        si.uni_port_id = long(message['portNumber'])
+        si.of_dpid = message['deviceId']
+        si.oper_onu_status = 'ENABLED'
+        si.save_changed_fields(always_update_timestamp=True)
+    elif message['status'] == 'disabled':
+        logging.info('onu.events: disabled onu, resetting the subscriber', value=message)
+        si.oper_onu_status = 'DISABLED'
+        si.save_changed_fields(always_update_timestamp=True)
+    else:
+        logging.warn('onu.events: Unknown status value: %s' % message['status'], value=message)
+        raise AirflowException('onu.events: Unknown status value: %s' % message['status'], value=message)
+
+
+def AUTH_event(model_accessor, message, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info('authentication.events: Got event for subscriber', message=message)
+
+    si = find_or_create_att_si(model_accessor, logging, message)
+    logging.debug('authentication.events: Updating service instance', si=si)
+    si.authentication_state = message['authenticationState']
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, message, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info('dhcp.events: Got event for subscriber', message=message)
+
+    si = find_or_create_att_si(model_accessor, logging, message)
+    logging.debug('dhcp.events: Updating service instance', si=si)
+    si.dhcp_state = message['messageType']
+    si.ip_address = message['ipAddress']
+    si.mac_address = message['macAddress']
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DriverService_event(model_accessor, message, si, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    event_type = message['event_type']
+
+    go = False
+    if event_type == 'create':
+        logging.debug('MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s ' % si.id)
+        go = True
+    elif event_type == 'update':
+        logging.debug('MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s ' %
+                          (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+        go = True
+    elif event_type == 'delete':
+        pass
+    else:
+        pass
+
+    if not go:
+        return
+
+    # handle only create & update events
+
+    # Changing ONU state can change auth state
+    # Changing auth state can change DHCP state
+    # So need to process in this order
+    process_onu_state(model_accessor, si)
+    process_auth_state(si)
+    process_dhcp_state(si)
+
+    validate_states(si)
+
+    # handling the subscriber status
+    # It's a combination of all the other states
+    subscriber = get_subscriber(model_accessor, si.serial_number)
+    if subscriber:
+        update_subscriber(model_accessor, subscriber, si)
+
+    si.save_changed_fields()
+
+
+onu_event_sensor = CORDEventSensor(
+    task_id='onu_event_sensor',
+    topic='onu.events',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_att
+)
+
+onu_event_handler = CORDModelOperator(
+    task_id='onu_event_handler',
+    python_callable=ONU_event,
+    cord_event_sensor_task_id='onu_event_sensor',
+    dag=dag_att
+)
+
+auth_event_sensor = CORDEventSensor(
+    task_id='auth_event_sensor',
+    topic='authentication.events',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_att
+)
+
+auth_event_handler = CORDModelOperator(
+    task_id='auth_event_handler',
+    python_callable=AUTH_event,
+    cord_event_sensor_task_id='auth_event_sensor',
+    dag=dag_att
+)
+
+dhcp_event_sensor = CORDEventSensor(
+    task_id='dhcp_event_sensor',
+    topic='dhcp.events',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_att
+)
+
+dhcp_event_handler = CORDModelOperator(
+    task_id='dhcp_event_handler',
+    python_callable=DHCP_event,
+    cord_event_sensor_task_id='dhcp_event_sensor',
+    dag=dag_att
+)
+
+att_model_event_sensor1 = CORDModelSensor(
+    task_id='att_model_event_sensor1',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_att
+)
+
+att_model_event_handler1 = CORDModelOperator(
+    task_id='att_model_event_handler1',
+    python_callable=DriverService_event,
+    cord_event_sensor_task_id='att_model_event_sensor1',
+    dag=dag_att
+)
+
+att_model_event_sensor2 = CORDModelSensor(
+    task_id='att_model_event_sensor2',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_att
+)
+
+att_model_event_handler2 = CORDModelOperator(
+    task_id='att_model_event_handler2',
+    python_callable=DriverService_event,
+    cord_event_sensor_task_id='att_model_event_sensor2',
+    dag=dag_att
+)
+
+att_model_event_sensor3 = CORDModelSensor(
+    task_id='att_model_event_sensor3',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_att
+)
+
+att_model_event_handler3 = CORDModelOperator(
+    task_id='att_model_event_handler3',
+    python_callable=DriverService_event,
+    cord_event_sensor_task_id='att_model_event_sensor3',
+    dag=dag_att
+)
+
+
+onu_event_sensor >> onu_event_handler >> att_model_event_sensor1 >> att_model_event_handler1
+auth_event_sensor >> auth_event_handler >> att_model_event_sensor2 >> att_model_event_handler2
+dhcp_event_sensor >> dhcp_event_handler >> att_model_event_sensor3 >> att_model_event_handler3
diff --git a/test/workflow_examples/multi_dags.py.expected.json b/test/workflow_examples/multi_dags.py.expected.json
new file mode 100644
index 0000000..89f2e4c
--- /dev/null
+++ b/test/workflow_examples/multi_dags.py.expected.json
@@ -0,0 +1,210 @@
+{
+    "att_workflow_onu": {
+        "dag": {
+            "dag_id": "att_workflow_onu",
+            "local_variable": "dag_att"
+        },
+        "dependencies": {
+            "att_model_event_handler1": {
+                "parents": [
+                    "att_model_event_sensor1"
+                ]
+            },
+            "att_model_event_handler2": {
+                "parents": [
+                    "att_model_event_sensor2"
+                ]
+            },
+            "att_model_event_handler3": {
+                "parents": [
+                    "att_model_event_sensor3"
+                ]
+            },
+            "att_model_event_sensor1": {
+                "children": [
+                    "att_model_event_handler1"
+                ],
+                "parents": [
+                    "onu_event_handler"
+                ]
+            },
+            "att_model_event_sensor2": {
+                "children": [
+                    "att_model_event_handler2"
+                ],
+                "parents": [
+                    "auth_event_handler"
+                ]
+            },
+            "att_model_event_sensor3": {
+                "children": [
+                    "att_model_event_handler3"
+                ],
+                "parents": [
+                    "dhcp_event_handler"
+                ]
+            },
+            "auth_event_handler": {
+                "children": [
+                    "att_model_event_sensor2"
+                ],
+                "parents": [
+                    "auth_event_sensor"
+                ]
+            },
+            "auth_event_sensor": {
+                "children": [
+                    "auth_event_handler"
+                ]
+            },
+            "dhcp_event_handler": {
+                "children": [
+                    "att_model_event_sensor3"
+                ],
+                "parents": [
+                    "dhcp_event_sensor"
+                ]
+            },
+            "dhcp_event_sensor": {
+                "children": [
+                    "dhcp_event_handler"
+                ]
+            },
+            "onu_event_handler": {
+                "children": [
+                    "att_model_event_sensor1"
+                ],
+                "parents": [
+                    "onu_event_sensor"
+                ]
+            },
+            "onu_event_sensor": {
+                "children": [
+                    "onu_event_handler"
+                ]
+            }
+        },
+        "tasks": {
+            "att_model_event_handler1": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "att_model_event_sensor1",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "local_variable": "att_model_event_handler1",
+                "python_callable": "DriverService_event",
+                "task_id": "att_model_event_handler1"
+            },
+            "att_model_event_handler2": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "att_model_event_sensor2",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "local_variable": "att_model_event_handler2",
+                "python_callable": "DriverService_event",
+                "task_id": "att_model_event_handler2"
+            },
+            "att_model_event_handler3": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "att_model_event_sensor3",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "local_variable": "att_model_event_handler3",
+                "python_callable": "DriverService_event",
+                "task_id": "att_model_event_handler3"
+            },
+            "att_model_event_sensor1": {
+                "class": "CORDModelSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "att_model_event_sensor1",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "task_id": "att_model_event_sensor1"
+            },
+            "att_model_event_sensor2": {
+                "class": "CORDModelSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "att_model_event_sensor2",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "task_id": "att_model_event_sensor2"
+            },
+            "att_model_event_sensor3": {
+                "class": "CORDModelSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "att_model_event_sensor3",
+                "model_name": "AttWorkflowDriverServiceInstance",
+                "poke_interval": 5,
+                "task_id": "att_model_event_sensor3"
+            },
+            "auth_event_handler": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "auth_event_sensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "local_variable": "auth_event_handler",
+                "python_callable": "AUTH_event",
+                "task_id": "auth_event_handler"
+            },
+            "auth_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "auth_event_sensor",
+                "poke_interval": 5,
+                "task_id": "auth_event_sensor",
+                "topic": "authentication.events"
+            },
+            "dhcp_event_handler": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "dhcp_event_sensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "local_variable": "dhcp_event_handler",
+                "python_callable": "DHCP_event",
+                "task_id": "dhcp_event_handler"
+            },
+            "dhcp_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "dhcp_event_sensor",
+                "poke_interval": 5,
+                "task_id": "dhcp_event_sensor",
+                "topic": "dhcp.events"
+            },
+            "onu_event_handler": {
+                "class": "CORDModelOperator",
+                "cord_event_sensor_task_id": "onu_event_sensor",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "local_variable": "onu_event_handler",
+                "python_callable": "ONU_event",
+                "task_id": "onu_event_handler"
+            },
+            "onu_event_sensor": {
+                "class": "CORDEventSensor",
+                "controller_conn_id": "local_cord_controller",
+                "dag": "dag_att",
+                "dag_id": "att_workflow_onu",
+                "key_field": "serialNumber",
+                "local_variable": "onu_event_sensor",
+                "poke_interval": 5,
+                "task_id": "onu_event_sensor",
+                "topic": "onu.events"
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/test/workflow_examples/two_dags.py b/test/workflow_examples/two_dags.py
deleted file mode 100644
index 1906881..0000000
--- a/test/workflow_examples/two_dags.py
+++ /dev/null
@@ -1,184 +0,0 @@
-# Copyright 2019-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Example AT&T workflow using Airflow
-"""
-import json
-import logging  
-from datetime import datetime
-
-import airflow
-from airflow import DAG
-from airflow import AirflowException
-
-import xossynchronizer.airflow.sensors.XOSModelSensor
-import xossynchronizer.airflow.sensors.XOSEventSensor
-
-from att_helpers import *
-from att_service_instance_funcs import *
-
-args = {
-    'start_date': datetime.utcnow(),
-    'owner': 'ATT',
-}
-
-dag_att1 = DAG(
-    dag_id='att_workflow_onu1',
-    default_args=args,
-    # this dag will be triggered by external systems
-    schedule_interval=None,
-)
-
-dag_att2 = DAG(
-    dag_id='att_workflow_onu2',
-    default_args=args,
-    # this dag will be triggered by external systems
-    schedule_interval=None,
-)
-
-dag_att1.doc_md = __doc__
-dag_att2.doc_md = __doc__
-
-def ONU_event(model_accessor, event, **kwargs):
-    #context = kwargs
-    #run_id = context['dag_run'].run_id
-
-    logging.info("onu.events: received event", event=event)
-
-    si = find_or_create_att_si(model_accessor, logging, event)
-    if event["status"] == "activated":
-        logging.info("onu.events: activated onu", value=event)
-        si.no_sync = False
-        si.uni_port_id = long(event["portNumber"])
-        si.of_dpid = event["deviceId"]
-        si.oper_onu_status = "ENABLED"
-        si.save_changed_fields(always_update_timestamp=True)
-    elif event["status"] == "disabled":
-        logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
-        si.oper_onu_status = "DISABLED"
-        si.save_changed_fields(always_update_timestamp=True)
-    else:
-        logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
-        raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
-
-
-def DriverService_event(event_type, model_accessor, si, **kwargs):
-    #context = kwargs
-    #run_id = context['dag_run'].run_id
-    
-    go = False
-    if event_type == 'create':
-        logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
-        go = True
-    elif event_type == 'update':
-        logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
-                          (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
-        go = True
-    elif event_type == 'delete':
-        pass
-    else:
-        pass
-
-    if not go:
-        return
-
-    # handle only create & update events
-
-    # Changing ONU state can change auth state
-    # Changing auth state can change DHCP state
-    # So need to process in this order
-    process_onu_state(model_accessor, si)
-    process_auth_state(si)
-    process_dhcp_state(si)
-
-    validate_states(si)
-
-    # handling the subscriber status
-    # It's a combination of all the other states
-    subscriber = get_subscriber(model_accessor, si.serial_number)
-    if subscriber:
-        update_subscriber(model_accessor, subscriber, si)
-
-    si.save_changed_fields()
-
-
-def Auth_event(model_accessor, event, **kwargs):
-    #context = kwargs
-    #run_id = context['dag_run'].run_id
-
-    logging.info("authentication.events: Got event for subscriber", event_value=event)
-
-    si = find_or_create_att_si(model_accessor, logging, event)
-    logging.debug("authentication.events: Updating service instance", si=si)
-    si.authentication_state = event["authenticationState"]
-    si.save_changed_fields(always_update_timestamp=True)
-
-
-def DHCP_event(model_accessor, event, **kwargs):
-    #context = kwargs
-    #run_id = context['dag_run'].run_id
-
-    logging.info("dhcp.events: Got event for subscriber", event_value=event)
-
-    si = find_or_create_att_si(model_accessor, logging, event)
-    logging.debug("dhcp.events: Updating service instance", si=si)
-    si.dhcp_state = event["messageType"]
-    si.ip_address = event["ipAddress"]
-    si.mac_address = event["macAddress"]
-    si.save_changed_fields(always_update_timestamp=True)
-
-
-onu_event_handler = XOSEventSensor(
-    task_id='onu_event_handler',
-    topic='onu.events',
-    key_field='serialNumber',
-    provide_context=True,
-    python_callable=ONU_event,
-    poke_interval=5,
-    dag=dag_att1,
-)
-
-onu_model_event_handler = XOSModelSensor(
-    task_id='onu_model_event_handler',
-    model_name='AttWorkflowDriverServiceInstance',
-    key_field='serialNumber',
-    provide_context=True,
-    python_callable=DriverService_event,
-    poke_interval=5,
-    dag=dag_att1,
-)
-
-auth_event_handler = XOSEventSensor(
-    task_id='auth_event_handler',
-    topic="authentication.events",
-    key_field='serialNumber',
-    provide_context=True,
-    python_callable=Auth_event,
-    poke_interval=5,
-    dag=dag_att2,
-)
-
-auth_model_event_handler = XOSModelSensor(
-    task_id='auth_model_event_handler',
-    model_name='AttWorkflowDriverServiceInstance',
-    key_field='serialNumber',
-    provide_context=True,
-    python_callable=DriverService_event,
-    poke_interval=5,
-    dag=dag_att2,
-)
-
-onu_event_handler >> onu_model_event_handler
-auth_event_handler >> auth_model_event_handler
diff --git a/test/workflow_examples/two_dags.py.expected.json b/test/workflow_examples/two_dags.py.expected.json
deleted file mode 100644
index dac7c12..0000000
--- a/test/workflow_examples/two_dags.py.expected.json
+++ /dev/null
@@ -1,90 +0,0 @@
-{
-    "att_workflow_onu1": {
-        "dag": {
-            "dag_id": "att_workflow_onu1",
-            "local_variable": "dag_att1"
-        },
-        "dependencies": {
-            "onu_event_handler": {
-                "children": [
-                    "onu_model_event_handler"
-                ]
-            },
-            "onu_model_event_handler": {
-                "parents": [
-                    "onu_event_handler"
-                ]
-            }
-        },
-        "tasks": {
-            "onu_event_handler": {
-                "class": "XOSEventSensor",
-                "dag": "dag_att1",
-                "dag_id": "att_workflow_onu1",
-                "key_field": "serialNumber",
-                "local_variable": "onu_event_handler",
-                "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "ONU_event",
-                "task_id": "onu_event_handler",
-                "topic": "onu.events"
-            },
-            "onu_model_event_handler": {
-                "class": "XOSModelSensor",
-                "dag": "dag_att1",
-                "dag_id": "att_workflow_onu1",
-                "key_field": "serialNumber",
-                "local_variable": "onu_model_event_handler",
-                "model_name": "AttWorkflowDriverServiceInstance",
-                "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "DriverService_event",
-                "task_id": "onu_model_event_handler"
-            }
-        }
-    },
-    "att_workflow_onu2": {
-        "dag": {
-            "dag_id": "att_workflow_onu2",
-            "local_variable": "dag_att2"
-        },
-        "dependencies": {
-            "auth_event_handler": {
-                "children": [
-                    "auth_model_event_handler"
-                ]
-            },
-            "auth_model_event_handler": {
-                "parents": [
-                    "auth_event_handler"
-                ]
-            }
-        },
-        "tasks": {
-            "auth_event_handler": {
-                "class": "XOSEventSensor",
-                "dag": "dag_att2",
-                "dag_id": "att_workflow_onu2",
-                "key_field": "serialNumber",
-                "local_variable": "auth_event_handler",
-                "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "Auth_event",
-                "task_id": "auth_event_handler",
-                "topic": "authentication.events"
-            },
-            "auth_model_event_handler": {
-                "class": "XOSModelSensor",
-                "dag": "dag_att2",
-                "dag_id": "att_workflow_onu2",
-                "key_field": "serialNumber",
-                "local_variable": "auth_model_event_handler",
-                "model_name": "AttWorkflowDriverServiceInstance",
-                "poke_interval": 5,
-                "provide_context": true,
-                "python_callable": "DriverService_event",
-                "task_id": "auth_model_event_handler"
-            }
-        }
-    }
-}
diff --git a/workflow_examples/att-workflow/att_dag.py b/workflow_examples/att-workflow/att_dag.py
index 7df5d03..a464176 100644
--- a/workflow_examples/att-workflow/att_dag.py
+++ b/workflow_examples/att-workflow/att_dag.py
@@ -16,19 +16,19 @@
 Example AT&T workflow using Airflow
 """
 import json
-import logging  
-from datetime import datetime
-
+import logging
 import airflow
+from datetime import datetime
 from airflow import DAG
 from airflow import AirflowException
-
-import xossynchronizer.airflow.sensors.XOSModelSensor
-import xossynchronizer.airflow.sensors.XOSEventSensor
+from airflow.operators import PythonOperator
+from cord_workflow_airflow_extensions.sensors import CORDEventSensor, CORDModelSensor
+from cord_workflow_airflow_extensions.operators import CORDModelOperator
 
 from att_helpers import *
 from att_service_instance_funcs import *
 
+
 args = {
     'start_date': datetime.utcnow(),
     'owner': 'ATT',
@@ -44,39 +44,67 @@
 dag_att.doc_md = __doc__
 
 
-def ONU_event(model_accessor, event, **kwargs):
+def ONU_event(model_accessor, message, **kwargs):
     #context = kwargs
     #run_id = context['dag_run'].run_id
 
-    logging.info("onu.events: received event", event=event)
+    logging.info('onu.events: received event', message=message)
 
-    si = find_or_create_att_si(model_accessor, logging, event)
-    if event["status"] == "activated":
-        logging.info("onu.events: activated onu", value=event)
+    si = find_or_create_att_si(model_accessor, logging, message)
+    if message['status'] == 'activated':
+        logging.info('onu.events: activated onu', message=message)
         si.no_sync = False
-        si.uni_port_id = long(event["portNumber"])
-        si.of_dpid = event["deviceId"]
-        si.oper_onu_status = "ENABLED"
+        si.uni_port_id = long(message['portNumber'])
+        si.of_dpid = message['deviceId']
+        si.oper_onu_status = 'ENABLED'
         si.save_changed_fields(always_update_timestamp=True)
-    elif event["status"] == "disabled":
-        logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
-        si.oper_onu_status = "DISABLED"
+    elif message['status'] == 'disabled':
+        logging.info('onu.events: disabled onu, resetting the subscriber', value=message)
+        si.oper_onu_status = 'DISABLED'
         si.save_changed_fields(always_update_timestamp=True)
     else:
-        logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
-        raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+        logging.warn('onu.events: Unknown status value: %s' % message['status'], value=message)
+        raise AirflowException('onu.events: Unknown status value: %s' % message['status'], value=message)
 
 
-def DriverService_event(event_type, model_accessor, si, **kwargs):
+def AUTH_event(model_accessor, message, **kwargs):
     #context = kwargs
     #run_id = context['dag_run'].run_id
-    
+
+    logging.info('authentication.events: Got event for subscriber', message=message)
+
+    si = find_or_create_att_si(model_accessor, logging, message)
+    logging.debug('authentication.events: Updating service instance', si=si)
+    si.authentication_state = message['authenticationState']
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, message, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    logging.info('dhcp.events: Got event for subscriber', message=message)
+
+    si = find_or_create_att_si(model_accessor, logging, message)
+    logging.debug('dhcp.events: Updating service instance', si=si)
+    si.dhcp_state = message['messageType']
+    si.ip_address = message['ipAddress']
+    si.mac_address = message['macAddress']
+    si.save_changed_fields(always_update_timestamp=True)
+
+
+def DriverService_event(model_accessor, message, si, **kwargs):
+    #context = kwargs
+    #run_id = context['dag_run'].run_id
+
+    event_type = message['event_type']
+
     go = False
     if event_type == 'create':
-        logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+        logging.debug('MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s ' % si.id)
         go = True
     elif event_type == 'update':
-        logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+        logging.debug('MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s ' %
                           (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
         go = True
     elif event_type == 'delete':
@@ -107,92 +135,103 @@
     si.save_changed_fields()
 
 
-def Auth_event(model_accessor, event, **kwargs):
-    #context = kwargs
-    #run_id = context['dag_run'].run_id
-
-    logging.info("authentication.events: Got event for subscriber", event_value=event)
-
-    si = find_or_create_att_si(model_accessor, logging, event)
-    logging.debug("authentication.events: Updating service instance", si=si)
-    si.authentication_state = event["authenticationState"]
-    si.save_changed_fields(always_update_timestamp=True)
-
-
-def DHCP_event(model_accessor, event, **kwargs):
-    #context = kwargs
-    #run_id = context['dag_run'].run_id
-
-    logging.info("dhcp.events: Got event for subscriber", event_value=event)
-
-    si = find_or_create_att_si(model_accessor, logging, event)
-    logging.debug("dhcp.events: Updating service instance", si=si)
-    si.dhcp_state = event["messageType"]
-    si.ip_address = event["ipAddress"]
-    si.mac_address = event["macAddress"]
-    si.save_changed_fields(always_update_timestamp=True)
-
-
-onu_event_handler = XOSEventSensor(
-    task_id='onu_event_handler',
+onu_event_sensor = CORDEventSensor(
+    task_id='onu_event_sensor',
     topic='onu.events',
     key_field='serialNumber',
-    provide_context=True,
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_att
+)
+
+onu_event_handler = CORDModelOperator(
+    task_id='onu_event_handler',
     python_callable=ONU_event,
-    poke_interval=5,
-    dag=dag_att,
+    cord_event_sensor_task_id='onu_event_sensor',
+    dag=dag_att
 )
 
-onu_model_event_handler = XOSModelSensor(
-    task_id='onu_model_event_handler',
-    model_name='AttWorkflowDriverServiceInstance',
+auth_event_sensor = CORDEventSensor(
+    task_id='auth_event_sensor',
+    topic='authentication.events',
     key_field='serialNumber',
-    provide_context=True,
-    python_callable=DriverService_event,
+    controller_conn_id='local_cord_controller',
     poke_interval=5,
-    dag=dag_att,
+    dag=dag_att
 )
 
-auth_event_handler = XOSEventSensor(
+auth_event_handler = CORDModelOperator(
     task_id='auth_event_handler',
-    topic="authentication.events",
-    key_field='serialNumber',
-    provide_context=True,
-    python_callable=Auth_event,
-    poke_interval=5,
-    dag=dag_att,
+    python_callable=AUTH_event,
+    cord_event_sensor_task_id='auth_event_sensor',
+    dag=dag_att
 )
 
-auth_model_event_handler = XOSModelSensor(
-    task_id='auth_model_event_handler',
-    model_name='AttWorkflowDriverServiceInstance',
+dhcp_event_sensor = CORDEventSensor(
+    task_id='dhcp_event_sensor',
+    topic='dhcp.events',
     key_field='serialNumber',
-    provide_context=True,
-    python_callable=DriverService_event,
+    controller_conn_id='local_cord_controller',
     poke_interval=5,
-    dag=dag_att,
+    dag=dag_att
 )
 
-dhcp_event_handler = XOSEventSensor(
+dhcp_event_handler = CORDModelOperator(
     task_id='dhcp_event_handler',
-    topic="dhcp.events",
-    key_field='serialNumber',
-    provide_context=True,
     python_callable=DHCP_event,
-    poke_interval=5,
-    dag=dag_att,
+    cord_event_sensor_task_id='dhcp_event_sensor',
+    dag=dag_att
 )
 
-dhcp_model_event_handler = XOSModelSensor(
-    task_id='dhcp_model_event_handler',
+att_model_event_sensor1 = CORDModelSensor(
+    task_id='att_model_event_sensor1',
     model_name='AttWorkflowDriverServiceInstance',
     key_field='serialNumber',
-    provide_context=True,
-    python_callable=DriverService_event,
+    controller_conn_id='local_cord_controller',
     poke_interval=5,
-    dag=dag_att,
+    dag=dag_att
 )
 
-onu_event_handler >> onu_model_event_handler >> \
-    auth_event_handler >> auth_model_event_handler >> \
-    dhcp_event_handler >> dhcp_model_event_handler
\ No newline at end of file
+att_model_event_handler1 = CORDModelOperator(
+    task_id='att_model_event_handler1',
+    python_callable=DriverService_event,
+    cord_event_sensor_task_id='att_model_event_sensor1',
+    dag=dag_att
+)
+
+att_model_event_sensor2 = CORDModelSensor(
+    task_id='att_model_event_sensor2',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_att
+)
+
+att_model_event_handler2 = CORDModelOperator(
+    task_id='att_model_event_handler2',
+    python_callable=DriverService_event,
+    cord_event_sensor_task_id='att_model_event_sensor2',
+    dag=dag_att
+)
+
+att_model_event_sensor3 = CORDModelSensor(
+    task_id='att_model_event_sensor3',
+    model_name='AttWorkflowDriverServiceInstance',
+    key_field='serialNumber',
+    controller_conn_id='local_cord_controller',
+    poke_interval=5,
+    dag=dag_att
+)
+
+att_model_event_handler3 = CORDModelOperator(
+    task_id='att_model_event_handler3',
+    python_callable=DriverService_event,
+    cord_event_sensor_task_id='att_model_event_sensor3',
+    dag=dag_att
+)
+
+
+onu_event_sensor >> onu_event_handler >> att_model_event_sensor1 >> att_model_event_handler1 >> \
+    auth_event_sensor >> auth_event_handler >> att_model_event_sensor2 >> att_model_event_handler2 >> \
+    dhcp_event_sensor >> dhcp_event_handler >> att_model_event_sensor3 >> att_model_event_handler3