Raise InputError when input parameter is wrong
Separate roles of Airflow sensor and operator in CORD Event handling
Update all workflow examples to reflect the sensor/operator design change
Change-Id: I3a00698c7744e67708f3bacc9cd4023ac4fbc02a
Report task status to controller
Change-Id: I3cc5be25421bcd12bd298b363a5131ef33d0f174
diff --git a/VERSION b/VERSION
index 50e6e9d..341cf11 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.2.0-dev1
\ No newline at end of file
+0.2.0
\ No newline at end of file
diff --git a/requirements.txt b/requirements.txt
index f35ec37..1b0f05e 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -4,4 +4,4 @@
multistructlog~=2.1.0
requests~=2.22.0
pyfiglet~=0.7
-cord-workflow-controller-client~=0.1.0
+cord-workflow-controller-client~=0.2.0
diff --git a/src/cord_workflow_airflow_extensions/essence_extractor.py b/src/cord_workflow_airflow_extensions/essence_extractor.py
index 901d40f..72c8fdb 100644
--- a/src/cord_workflow_airflow_extensions/essence_extractor.py
+++ b/src/cord_workflow_airflow_extensions/essence_extractor.py
@@ -21,7 +21,7 @@
Following information will be extracted from workflow code
- DAG info
- Operator info
- - XOS-related operators
+ - CORD-related operators
- Airflow operators
- Dependency info
"""
@@ -388,9 +388,9 @@
dags[dagid] = dag
return dags
- def __extract_XOS_event_sensors(self, tree):
+ def __extract_CORD_event_sensors(self, tree):
operators = {}
- calls = self.__extract_func_calls(tree, "XOSEventSensor")
+ calls = self.__extract_func_calls(tree, "CORDEventSensor")
if calls:
for call in calls:
operator = self.__make_airflow_operator(call)
@@ -398,9 +398,19 @@
operators[operatorid] = operator
return operators
- def __extract_XOS_model_sensors(self, tree):
+ def __extract_CORD_model_sensors(self, tree):
operators = {}
- calls = self.__extract_func_calls(tree, "XOSModelSensor")
+ calls = self.__extract_func_calls(tree, "CORDModelSensor")
+ if calls:
+ for call in calls:
+ operator = self.__make_airflow_operator(call)
+ operatorid = operator["task_id"]
+ operators[operatorid] = operator
+ return operators
+
+ def __extract_CORD_model_operators(self, tree):
+ operators = {}
+ calls = self.__extract_func_calls(tree, "CORDModelOperator")
if calls:
for call in calls:
operator = self.__make_airflow_operator(call)
@@ -420,20 +430,27 @@
def __extract_all_operators(self, tree):
operators = {}
- event_sensors = self.__extract_XOS_event_sensors(tree)
+ event_sensors = self.__extract_CORD_event_sensors(tree)
if event_sensors:
- for event_sensor in event_sensors:
- operators[event_sensor] = event_sensors[event_sensor]
+ for task_id in event_sensors:
+ operators[task_id] = event_sensors[task_id]
- model_sensors = self.__extract_XOS_model_sensors(tree)
+ model_sensors = self.__extract_CORD_model_sensors(tree)
if model_sensors:
- for model_sensor in model_sensors:
- operators[model_sensor] = model_sensors[model_sensor]
+ for task_id in model_sensors:
+ operators[task_id] = model_sensors[task_id]
+
+ model_operators = self.__extract_CORD_model_operators(tree)
+ if model_operators:
+ for task_id in model_operators:
+ operators[task_id] = model_operators[task_id]
airflow_operators = self.__extract_airflow_operators(tree)
if airflow_operators:
- for airflow_operator in airflow_operators:
- operators[airflow_operator] = airflow_operators[airflow_operator]
+ for task_id in airflow_operators:
+ # add operators that are not already handled above
+ if task_id not in operators:
+ operators[task_id] = airflow_operators[task_id]
return operators
@@ -582,8 +599,6 @@
# for command-line execution
def main(args):
- print_graffiti()
-
# check if config path is set
config_file_path = DEFAULT_CONFIG_FILE_PATH
if args.config:
@@ -601,21 +616,25 @@
log = create_logger(progargs["logging"])
code_filepath = args.input_file
- if os.path.exists(code_filepath):
- raise 'cannot find an input file - %s' % code_filepath
-
- extractor = EssenceExtractor(logger=log)
- extractor.parse_codefile(code_filepath)
- essence = extractor.extract()
+ if not os.path.exists(code_filepath):
+ raise IOError('cannot find an input file - %s' % code_filepath)
output_filepath = './essence.json'
if args.output:
output_filepath = args.output
- json_string = pretty_format_json(essence)
+ print_console = False
if args.stdout or output_filepath == '-':
+ print_console = True
+
+ extractor = EssenceExtractor(logger=log)
+ extractor.parse_codefile(code_filepath)
+ essence = extractor.extract()
+ json_string = pretty_format_json(essence)
+ if print_console:
print(json_string)
else:
+ print_graffiti()
with open(output_filepath, 'w') as f:
f.write(json_string)
diff --git a/src/cord_workflow_airflow_extensions/hook.py b/src/cord_workflow_airflow_extensions/hook.py
new file mode 100644
index 0000000..a2ebb4e
--- /dev/null
+++ b/src/cord_workflow_airflow_extensions/hook.py
@@ -0,0 +1,116 @@
+#!/usr/bin/env python3
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Airflow Hook
+"""
+
+from airflow.hooks.base_hook import BaseHook
+from cord_workflow_controller_client.workflow_run import WorkflowRun
+
+
+class CORDWorkflowControllerException(Exception):
+ """
+ Alias for Exception.
+ """
+
+
+class CORDWorkflowControllerHook(BaseHook):
+ """
+ Hook for accessing CORD Workflow Controller
+ """
+
+ def __init__(
+ self,
+ workflow_id,
+ workflow_run_id,
+ controller_conn_id='cord_controller_default'):
+ super().__init__(source=None)
+ self.workflow_id = workflow_id
+ self.workflow_run_id = workflow_run_id
+ self.controller_conn_id = controller_conn_id
+
+ self.workflow_run_client = None
+
+ def __enter__(self):
+ return self
+
+ def __exit__(self, exc_type, exc_val, exc_tb):
+ if self.workflow_run_client is not None:
+ self.close_conn()
+
+ def get_conn(self):
+ """
+ Connect a Workflow Run client.
+ """
+ if self.workflow_run_client is None:
+ # find connection info from database or environment
+ # ENV: AIRFLOW_CONN_CORD_CONTROLLER_DEFAULT
+ connection_params = self.get_connection(self.controller_conn_id)
+ # connection_params have three fields
+ # host
+ # login - we don't use this yet
+ # password - we don't use this yet
+ try:
+ self.workflow_run_client = WorkflowRun(self.workflow_id, self.workflow_run_id)
+ self.workflow_run_client.connect(connection_params.host)
+ except BaseException as ex:
+ raise CORDWorkflowControllerException(ex)
+
+ return self.workflow_run_client
+
+ def close_conn(self):
+ """
+ Close the Workflow Run client
+ """
+ if self.workflow_run_client:
+ try:
+ self.workflow_run_client.disconnect()
+ except BaseException as ex:
+ raise CORDWorkflowControllerException(ex)
+
+ self.workflow_run_client = None
+
+ def update_status(self, task_id, status):
+ """
+ Update status of the workflow run.
+ 'state' should be one of ['begin', 'end']
+ """
+ client = self.get_conn()
+ try:
+ return client.update_status(task_id, status)
+ except BaseException as ex:
+ raise CORDWorkflowControllerException(ex)
+
+ def count_events(self):
+ """
+ Count queued events for the workflow run.
+ """
+ client = self.get_conn()
+ try:
+ return client.count_events()
+ except BaseException as ex:
+ raise CORDWorkflowControllerException(ex)
+
+ def fetch_event(self, task_id, topic):
+ """
+ Fetch an event for the workflow run.
+ """
+ client = self.get_conn()
+ try:
+ return client.fetch_event(task_id, topic)
+ except BaseException as ex:
+ raise CORDWorkflowControllerException(ex)
diff --git a/src/cord_workflow_airflow_extensions/kickstarter.py b/src/cord_workflow_airflow_extensions/kickstarter.py
index 2a0be45..6f9924e 100644
--- a/src/cord_workflow_airflow_extensions/kickstarter.py
+++ b/src/cord_workflow_airflow_extensions/kickstarter.py
@@ -30,9 +30,11 @@
from multistructlog import create_logger
from cord_workflow_controller_client.manager import Manager
-from airflow.api.client.json_client import Client as AirflowClient
-from requests.auth import HTTPBasicAuth
+from importlib import import_module
from urlparse import urlparse
+from airflow import configuration as AirflowConf
+from airflow import api
+from airflow.models import DagRun
log = create_logger()
@@ -41,9 +43,6 @@
progargs = {
'controller_url': 'http://localhost:3030',
- 'airflow_url': 'http://localhost:8080',
- 'airflow_username': '',
- 'airflow_password': '',
'logging': None
}
@@ -62,9 +61,6 @@
parser = argparse.ArgumentParser(description='CORD Workflow Kickstarter Daemon.', prog='kickstarter')
parser.add_argument('--config', help='locate a configuration file')
parser.add_argument('--controller', help='CORD Workflow Controller URL')
- parser.add_argument('--airflow', help='Airflow REST URL')
- parser.add_argument('--airflow_user', help='User Name to access Airflow Web Interface')
- parser.add_argument('--airflow_passwd', help='Password to access Airlfow Web Interface')
return parser
@@ -127,12 +123,46 @@
log.info('> Kickstarting a workflow (%s) => workflow run (%s)' % (workflow_id, workflow_run_id))
airflow_client.trigger_dag(dag_id=workflow_id, run_id=workflow_run_id)
+ message = airflow_client.trigger_dag(
+ dag_id=workflow_id,
+ run_id=workflow_run_id
+ )
+ log.info('> Airflow Response: %s' % message)
# let controller know that the new workflow run is created
log.info('> Notifying a workflow (%s), a workflow run (%s)' % (workflow_id, workflow_run_id))
manager.notify_new_workflow_run(workflow_id, workflow_run_id)
+ except Exception as e:
+ log.error('> Error : %s' % e)
+ log.debug(traceback.format_exc())
- log.info('> OK')
+
+def on_check_state(workflow_id, workflow_run_id):
+ if manager and airflow_client:
+ try:
+ log.info('> Checking state of a workflow (%s) => workflow run (%s)' % (workflow_id, workflow_run_id))
+
+ run = DagRun.find(dag_id=workflow_id, run_id=workflow_run_id)
+ state = 'unknown'
+ if run:
+ # run is an array
+ # this should be one of ['success', 'running', 'failed']
+ state = run[0].state
+ else:
+ log.error(
+ 'Cannot retrieve state of a workflow run (%s, %s)' %
+ (workflow_id, workflow_run_id)
+ )
+ state = 'unknown'
+
+ log.info('> state : %s' % state)
+
+ # let controller know the state of the workflow run
+ log.info(
+ '> Notifying update of state of a workflow (%s), a workflow run (%s) - state : %s' %
+ (workflow_id, workflow_run_id, state)
+ )
+ manager.report_workflow_run_state(workflow_id, workflow_run_id, state)
except Exception as e:
log.error('> Error : %s' % e)
log.debug(traceback.format_exc())
@@ -159,18 +189,9 @@
global log
log = create_logger(progargs["logging"])
- if args.airflow:
- progargs['airflow_url'] = args.airflow
-
if args.controller:
progargs['controller_url'] = args.controller
- if args.airflow_user:
- progargs['airflow_user'] = args.airflow_user
-
- if args.airflow_passwd:
- progargs['airflow_passwd'] = args.airflow_passwd
-
print('=CONFIG=')
config_json_string = pretty_format_json(progargs)
print(config_json_string)
@@ -181,13 +202,7 @@
controller_live = check_web_live(progargs['controller_url'])
if not controller_live:
log.error('Controller (%s) appears to be down' % progargs['controller_url'])
- raise 'Controller (%s) appears to be down' % progargs['controller_url']
-
- log.info('Checking if Airflow (%s) is live...' % progargs['airflow_url'])
- airflow_live = check_web_live(progargs['airflow_url'])
- if not airflow_live:
- log.error('Airflow (%s) appears to be down' % progargs['airflow_url'])
- raise 'Airflow (%s) appears to be down' % progargs['airflow_url']
+ raise IOError('Controller (%s) appears to be down' % progargs['controller_url'])
# connect to workflow controller
log.info('Connecting to Workflow Controller (%s)...' % progargs['controller_url'])
@@ -198,13 +213,14 @@
# connect to airflow
global airflow_client
- log.info('Connecting to Airflow (%s)...' % progargs['airflow_url'])
- http_auth = None
- if progargs['airflow_user'] and progargs['airflow_passwd']:
- log.info('Using a username %s' % progargs['airflow_user'])
- http_auth = HTTPBasicAuth(progargs['airflow_user'], progargs['airflow_passwd'])
+ log.info('Connecting to Airflow...')
- airflow_client = AirflowClient(progargs['airflow_url'], auth=http_auth)
+ api.load_auth()
+ api_module = import_module(AirflowConf.get('cli', 'api_client'))
+ airflow_client = api_module.Client(
+ api_base_url=AirflowConf.get('cli', 'endpoint_url'),
+ auth=api.api_auth.client_auth
+ )
log.info('Waiting for kickstart events from Workflow Controller...')
try:
diff --git a/src/cord_workflow_airflow_extensions/operators.py b/src/cord_workflow_airflow_extensions/operators.py
new file mode 100644
index 0000000..ab1ddd9
--- /dev/null
+++ b/src/cord_workflow_airflow_extensions/operators.py
@@ -0,0 +1,67 @@
+#!/usr/bin/env python3
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Airflow Operators
+"""
+
+from airflow.operators import PythonOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class CORDModelOperator(PythonOperator):
+ """
+ Calls a python function with model accessor.
+ """
+
+ # SCARLET
+ # http://bootflat.github.io/color-picker.html
+ ui_color = '#cf3a24'
+
+ @apply_defaults
+ def __init__(
+ self,
+ python_callable,
+ cord_event_sensor_task_id=None,
+ op_args=None,
+ op_kwargs=None,
+ provide_context=True,
+ templates_dict=None,
+ templates_exts=None,
+ *args,
+ **kwargs
+ ):
+ super().__init__(
+ python_callable=python_callable,
+ op_args=op_args,
+ op_kwargs=op_kwargs,
+ provide_context=True,
+ templates_dict=templates_dict,
+ templates_exts=templates_exts,
+ *args,
+ **kwargs)
+ self.cord_event_sensor_task_id = cord_event_sensor_task_id
+
+ def execute_callable(self):
+ # TODO
+ model_accessor = None
+
+ message = None
+ if self.cord_event_sensor_task_id:
+ message = self.op_kwargs['ti'].xcom_pull(task_ids=self.cord_event_sensor_task_id)
+
+ new_op_kwargs = dict(self.op_kwargs, model_accessor=model_accessor, message=message)
+ return self.python_callable(*self.op_args, **new_op_kwargs)
diff --git a/src/cord_workflow_airflow_extensions/sensors.py b/src/cord_workflow_airflow_extensions/sensors.py
new file mode 100644
index 0000000..cba72e3
--- /dev/null
+++ b/src/cord_workflow_airflow_extensions/sensors.py
@@ -0,0 +1,94 @@
+#!/usr/bin/env python3
+
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Airflow Sensors
+"""
+
+from .hook import CORDWorkflowControllerHook
+from airflow.sensors.base_sensor_operator import BaseSensorOperator
+from airflow.utils.decorators import apply_defaults
+
+
+class CORDEventSensor(BaseSensorOperator):
+ # STEEL BLUE
+ # http://bootflat.github.io/color-picker.html
+ ui_color = '#4b77be'
+
+ @apply_defaults
+ def __init__(
+ self,
+ topic,
+ key_field,
+ controller_conn_id='cord_controller_default',
+ *args,
+ **kwargs):
+ super().__init__(*args, **kwargs)
+
+ self.topic = topic
+ self.key_field = key_field
+ self.controller_conn_id = controller_conn_id
+ self.message = None
+ self.hook = None
+
+ def __create_hook(self, context):
+ """
+ Return connection hook.
+ """
+ return CORDWorkflowControllerHook(self.dag_id, context['dag_run'].run_id, self.controller_conn_id)
+
+ def execute(self, context):
+ """
+ Overridden to allow messages to be passed to next tasks via XCOM
+ """
+ if self.hook is None:
+ self.hook = self.__create_hook(context)
+
+ self.hook.update_status(self.task_id, 'begin')
+
+ super().execute(context)
+
+ self.hook.update_status(self.task_id, 'end')
+ self.hook.close_conn()
+ self.hook = None
+ return self.message
+
+ def poke(self, context):
+ # we need to use notification to immediately react at event
+ # https://github.com/apache/airflow/blob/master/airflow/sensors/base_sensor_operator.py#L122
+ self.log.info('Poking : trying to fetch a message with a topic %s', self.topic)
+ event = self.hook.fetch_event(self.task_id, self.topic)
+ if event:
+ self.message = event
+ return True
+ return False
+
+
+class CORDModelSensor(CORDEventSensor):
+ # SISKIN SPROUT YELLOW
+ # http://bootflat.github.io/color-picker.html
+ ui_color = '#7a942e'
+
+ @apply_defaults
+ def __init__(
+ self,
+ model_name,
+ key_field,
+ controller_conn_id='cord_controller_default',
+ *args,
+ **kwargs):
+ topic = 'datamodel.%s' % model_name
+ super().__init__(topic=topic, *args, **kwargs)
diff --git a/src/cord_workflow_airflow_extensions/workflow_ctl.py b/src/cord_workflow_airflow_extensions/workflow_ctl.py
index e32215b..563d966 100644
--- a/src/cord_workflow_airflow_extensions/workflow_ctl.py
+++ b/src/cord_workflow_airflow_extensions/workflow_ctl.py
@@ -31,22 +31,27 @@
log = create_logger()
progargs = {
'controller_url': 'http://localhost:3030',
- 'airflow_url': 'http://localhost:8080',
- 'airflow_username': '',
- 'airflow_password': '',
'logging': None
}
DEFAULT_CONFIG_FILE_PATH = '/etc/cord_workflow_airflow_extensions/config.json'
+class InputError(Exception):
+ """Exception raised for errors in the input.
+
+ Attributes:
+ message -- explanation of the error
+ """
+
+ def __init__(self, message):
+ self.message = message
+
+
def get_arg_parser():
parser = argparse.ArgumentParser(description='CORD Workflow Control CLI.', prog='workflow_ctl')
parser.add_argument('--config', help='locate a configuration file')
parser.add_argument('--controller', help='CORD Workflow Controller URL')
- parser.add_argument('--airflow', help='Airflow REST URL')
- parser.add_argument('--airflow_user', help='User Name to access Airflow Web Interface')
- parser.add_argument('--airflow_passwd', help='Password to access Airlfow Web Interface')
parser.add_argument('cmd', help='Command')
parser.add_argument('cmd_args', help='Arguments for the command', nargs='*')
return parser
@@ -70,7 +75,7 @@
def register_workflow(args):
# expect args should be a list of essence files
if not args:
- raise 'no essence file is given'
+ raise InputError('no essence file is given')
log.info('Connecting to Workflow Controller (%s)...' % progargs['controller_url'])
manager = Manager(logger=log)
@@ -121,25 +126,16 @@
global log
log = create_logger(progargs["logging"])
- if args.airflow:
- progargs['airflow_url'] = args.airflow
-
if args.controller:
progargs['controller_url'] = args.controller
- if args.airflow_user:
- progargs['airflow_user'] = args.airflow_user
-
- if args.airflow_passwd:
- progargs['airflow_passwd'] = args.airflow_passwd
-
if args.cmd:
if args.cmd.strip().lower() in ['reg', 'register', 'register_workflow']:
results = register_workflow(args.cmd_args)
print(results)
else:
log.error('unknown command %s' % args.cmd)
- raise 'unknown command %s' % args.cmd
+ raise InputError('unknown command %s' % args.cmd)
if __name__ == "__main__":
diff --git a/test/workflow_examples/att_dag.py b/test/workflow_examples/att_dag.py
index c3bd3ea..a464176 100644
--- a/test/workflow_examples/att_dag.py
+++ b/test/workflow_examples/att_dag.py
@@ -16,19 +16,19 @@
Example AT&T workflow using Airflow
"""
import json
-import logging
-from datetime import datetime
-
+import logging
import airflow
+from datetime import datetime
from airflow import DAG
from airflow import AirflowException
-
-import xossynchronizer.airflow.sensors.XOSModelSensor
-import xossynchronizer.airflow.sensors.XOSEventSensor
+from airflow.operators import PythonOperator
+from cord_workflow_airflow_extensions.sensors import CORDEventSensor, CORDModelSensor
+from cord_workflow_airflow_extensions.operators import CORDModelOperator
from att_helpers import *
from att_service_instance_funcs import *
+
args = {
'start_date': datetime.utcnow(),
'owner': 'ATT',
@@ -44,39 +44,67 @@
dag_att.doc_md = __doc__
-def ONU_event(model_accessor, event, **kwargs):
+def ONU_event(model_accessor, message, **kwargs):
#context = kwargs
#run_id = context['dag_run'].run_id
- logging.info("onu.events: received event", event=event)
+ logging.info('onu.events: received event', message=message)
- si = find_or_create_att_si(model_accessor, logging, event)
- if event["status"] == "activated":
- logging.info("onu.events: activated onu", value=event)
+ si = find_or_create_att_si(model_accessor, logging, message)
+ if message['status'] == 'activated':
+ logging.info('onu.events: activated onu', message=message)
si.no_sync = False
- si.uni_port_id = long(event["portNumber"])
- si.of_dpid = event["deviceId"]
- si.oper_onu_status = "ENABLED"
+ si.uni_port_id = long(message['portNumber'])
+ si.of_dpid = message['deviceId']
+ si.oper_onu_status = 'ENABLED'
si.save_changed_fields(always_update_timestamp=True)
- elif event["status"] == "disabled":
- logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
- si.oper_onu_status = "DISABLED"
+ elif message['status'] == 'disabled':
+ logging.info('onu.events: disabled onu, resetting the subscriber', value=message)
+ si.oper_onu_status = 'DISABLED'
si.save_changed_fields(always_update_timestamp=True)
else:
- logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
- raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+ logging.warn('onu.events: Unknown status value: %s' % message['status'], value=message)
+ raise AirflowException('onu.events: Unknown status value: %s' % message['status'], value=message)
-def DriverService_event(event_type, model_accessor, si, **kwargs):
+def AUTH_event(model_accessor, message, **kwargs):
#context = kwargs
#run_id = context['dag_run'].run_id
-
+
+ logging.info('authentication.events: Got event for subscriber', message=message)
+
+ si = find_or_create_att_si(model_accessor, logging, message)
+ logging.debug('authentication.events: Updating service instance', si=si)
+ si.authentication_state = message['authenticationState']
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, message, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info('dhcp.events: Got event for subscriber', message=message)
+
+ si = find_or_create_att_si(model_accessor, logging, message)
+ logging.debug('dhcp.events: Updating service instance', si=si)
+ si.dhcp_state = message['messageType']
+ si.ip_address = message['ipAddress']
+ si.mac_address = message['macAddress']
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DriverService_event(model_accessor, message, si, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ event_type = message['event_type']
+
go = False
if event_type == 'create':
- logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+ logging.debug('MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s ' % si.id)
go = True
elif event_type == 'update':
- logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+ logging.debug('MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s ' %
(si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
go = True
elif event_type == 'delete':
@@ -107,92 +135,103 @@
si.save_changed_fields()
-def Auth_event(model_accessor, event, **kwargs):
- #context = kwargs
- #run_id = context['dag_run'].run_id
-
- logging.info("authentication.events: Got event for subscriber", event_value=event)
-
- si = find_or_create_att_si(model_accessor, logging, event)
- logging.debug("authentication.events: Updating service instance", si=si)
- si.authentication_state = event["authenticationState"]
- si.save_changed_fields(always_update_timestamp=True)
-
-
-def DHCP_event(model_accessor, event, **kwargs):
- #context = kwargs
- #run_id = context['dag_run'].run_id
-
- logging.info("dhcp.events: Got event for subscriber", event_value=event)
-
- si = find_or_create_att_si(model_accessor, logging, event)
- logging.debug("dhcp.events: Updating service instance", si=si)
- si.dhcp_state = event["messageType"]
- si.ip_address = event["ipAddress"]
- si.mac_address = event["macAddress"]
- si.save_changed_fields(always_update_timestamp=True)
-
-
-onu_event_handler = XOSEventSensor(
- task_id='onu_event_handler_task',
+onu_event_sensor = CORDEventSensor(
+ task_id='onu_event_sensor',
topic='onu.events',
key_field='serialNumber',
- provide_context=True,
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_att
+)
+
+onu_event_handler = CORDModelOperator(
+ task_id='onu_event_handler',
python_callable=ONU_event,
- poke_interval=5,
- dag=dag_att,
+ cord_event_sensor_task_id='onu_event_sensor',
+ dag=dag_att
)
-onu_model_event_handler = XOSModelSensor(
- task_id='onu_model_event_handler_task',
- model_name='AttWorkflowDriverServiceInstance',
+auth_event_sensor = CORDEventSensor(
+ task_id='auth_event_sensor',
+ topic='authentication.events',
key_field='serialNumber',
- provide_context=True,
- python_callable=DriverService_event,
+ controller_conn_id='local_cord_controller',
poke_interval=5,
- dag=dag_att,
+ dag=dag_att
)
-auth_event_handler = XOSEventSensor(
- task_id='auth_event_handler_task',
- topic="authentication.events",
- key_field='serialNumber',
- provide_context=True,
- python_callable=Auth_event,
- poke_interval=5,
- dag=dag_att,
+auth_event_handler = CORDModelOperator(
+ task_id='auth_event_handler',
+ python_callable=AUTH_event,
+ cord_event_sensor_task_id='auth_event_sensor',
+ dag=dag_att
)
-auth_model_event_handler = XOSModelSensor(
- task_id='auth_model_event_handler_task',
- model_name='AttWorkflowDriverServiceInstance',
+dhcp_event_sensor = CORDEventSensor(
+ task_id='dhcp_event_sensor',
+ topic='dhcp.events',
key_field='serialNumber',
- provide_context=True,
- python_callable=DriverService_event,
+ controller_conn_id='local_cord_controller',
poke_interval=5,
- dag=dag_att,
+ dag=dag_att
)
-dhcp_event_handler = XOSEventSensor(
- task_id='dhcp_event_handler_task',
- topic="dhcp.events",
- key_field='serialNumber',
- provide_context=True,
+dhcp_event_handler = CORDModelOperator(
+ task_id='dhcp_event_handler',
python_callable=DHCP_event,
- poke_interval=5,
- dag=dag_att,
+ cord_event_sensor_task_id='dhcp_event_sensor',
+ dag=dag_att
)
-dhcp_model_event_handler = XOSModelSensor(
- task_id='dhcp_model_event_handler_task',
+att_model_event_sensor1 = CORDModelSensor(
+ task_id='att_model_event_sensor1',
model_name='AttWorkflowDriverServiceInstance',
key_field='serialNumber',
- provide_context=True,
- python_callable=DriverService_event,
+ controller_conn_id='local_cord_controller',
poke_interval=5,
- dag=dag_att,
+ dag=dag_att
)
-onu_event_handler >> onu_model_event_handler >> \
- auth_event_handler >> auth_model_event_handler >> \
- dhcp_event_handler >> dhcp_model_event_handler
\ No newline at end of file
+att_model_event_handler1 = CORDModelOperator(
+ task_id='att_model_event_handler1',
+ python_callable=DriverService_event,
+ cord_event_sensor_task_id='att_model_event_sensor1',
+ dag=dag_att
+)
+
+att_model_event_sensor2 = CORDModelSensor(
+ task_id='att_model_event_sensor2',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_att
+)
+
+att_model_event_handler2 = CORDModelOperator(
+ task_id='att_model_event_handler2',
+ python_callable=DriverService_event,
+ cord_event_sensor_task_id='att_model_event_sensor2',
+ dag=dag_att
+)
+
+att_model_event_sensor3 = CORDModelSensor(
+ task_id='att_model_event_sensor3',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_att
+)
+
+att_model_event_handler3 = CORDModelOperator(
+ task_id='att_model_event_handler3',
+ python_callable=DriverService_event,
+ cord_event_sensor_task_id='att_model_event_sensor3',
+ dag=dag_att
+)
+
+
+onu_event_sensor >> onu_event_handler >> att_model_event_sensor1 >> att_model_event_handler1 >> \
+ auth_event_sensor >> auth_event_handler >> att_model_event_sensor2 >> att_model_event_handler2 >> \
+ dhcp_event_sensor >> dhcp_event_handler >> att_model_event_sensor3 >> att_model_event_handler3
diff --git a/test/workflow_examples/att_dag.py.expected.json b/test/workflow_examples/att_dag.py.expected.json
index 109b2a9..6d4c367 100644
--- a/test/workflow_examples/att_dag.py.expected.json
+++ b/test/workflow_examples/att_dag.py.expected.json
@@ -5,122 +5,218 @@
"local_variable": "dag_att"
},
"dependencies": {
- "auth_event_handler_task": {
+ "att_model_event_handler1": {
"children": [
- "auth_model_event_handler_task"
+ "auth_event_sensor"
],
"parents": [
- "onu_model_event_handler_task"
+ "att_model_event_sensor1"
]
},
- "auth_model_event_handler_task": {
+ "att_model_event_handler2": {
"children": [
- "dhcp_event_handler_task"
+ "dhcp_event_sensor"
],
"parents": [
- "auth_event_handler_task"
+ "att_model_event_sensor2"
]
},
- "dhcp_event_handler_task": {
+ "att_model_event_handler3": {
+ "parents": [
+ "att_model_event_sensor3"
+ ]
+ },
+ "att_model_event_sensor1": {
"children": [
- "dhcp_model_event_handler_task"
+ "att_model_event_handler1"
],
"parents": [
- "auth_model_event_handler_task"
+ "onu_event_handler"
]
},
- "dhcp_model_event_handler_task": {
- "parents": [
- "dhcp_event_handler_task"
- ]
- },
- "onu_event_handler_task": {
+ "att_model_event_sensor2": {
"children": [
- "onu_model_event_handler_task"
- ]
- },
- "onu_model_event_handler_task": {
- "children": [
- "auth_event_handler_task"
+ "att_model_event_handler2"
],
"parents": [
- "onu_event_handler_task"
+ "auth_event_handler"
+ ]
+ },
+ "att_model_event_sensor3": {
+ "children": [
+ "att_model_event_handler3"
+ ],
+ "parents": [
+ "dhcp_event_handler"
+ ]
+ },
+ "auth_event_handler": {
+ "children": [
+ "att_model_event_sensor2"
+ ],
+ "parents": [
+ "auth_event_sensor"
+ ]
+ },
+ "auth_event_sensor": {
+ "children": [
+ "auth_event_handler"
+ ],
+ "parents": [
+ "att_model_event_handler1"
+ ]
+ },
+ "dhcp_event_handler": {
+ "children": [
+ "att_model_event_sensor3"
+ ],
+ "parents": [
+ "dhcp_event_sensor"
+ ]
+ },
+ "dhcp_event_sensor": {
+ "children": [
+ "dhcp_event_handler"
+ ],
+ "parents": [
+ "att_model_event_handler2"
+ ]
+ },
+ "onu_event_handler": {
+ "children": [
+ "att_model_event_sensor1"
+ ],
+ "parents": [
+ "onu_event_sensor"
+ ]
+ },
+ "onu_event_sensor": {
+ "children": [
+ "onu_event_handler"
]
}
},
"tasks": {
- "auth_event_handler_task": {
- "class": "XOSEventSensor",
+ "att_model_event_handler1": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "att_model_event_sensor1",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "local_variable": "att_model_event_handler1",
+ "python_callable": "DriverService_event",
+ "task_id": "att_model_event_handler1"
+ },
+ "att_model_event_handler2": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "att_model_event_sensor2",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "local_variable": "att_model_event_handler2",
+ "python_callable": "DriverService_event",
+ "task_id": "att_model_event_handler2"
+ },
+ "att_model_event_handler3": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "att_model_event_sensor3",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "local_variable": "att_model_event_handler3",
+ "python_callable": "DriverService_event",
+ "task_id": "att_model_event_handler3"
+ },
+ "att_model_event_sensor1": {
+ "class": "CORDModelSensor",
+ "controller_conn_id": "local_cord_controller",
"dag": "dag_att",
"dag_id": "att_workflow_onu",
"key_field": "serialNumber",
- "local_variable": "auth_event_handler",
+ "local_variable": "att_model_event_sensor1",
+ "model_name": "AttWorkflowDriverServiceInstance",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "Auth_event",
- "task_id": "auth_event_handler_task",
+ "task_id": "att_model_event_sensor1"
+ },
+ "att_model_event_sensor2": {
+ "class": "CORDModelSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "att_model_event_sensor2",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "task_id": "att_model_event_sensor2"
+ },
+ "att_model_event_sensor3": {
+ "class": "CORDModelSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "att_model_event_sensor3",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "task_id": "att_model_event_sensor3"
+ },
+ "auth_event_handler": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "auth_event_sensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "local_variable": "auth_event_handler",
+ "python_callable": "AUTH_event",
+ "task_id": "auth_event_handler"
+ },
+ "auth_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "auth_event_sensor",
+ "poke_interval": 5,
+ "task_id": "auth_event_sensor",
"topic": "authentication.events"
},
- "auth_model_event_handler_task": {
- "class": "XOSModelSensor",
+ "dhcp_event_handler": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "dhcp_event_sensor",
"dag": "dag_att",
"dag_id": "att_workflow_onu",
- "key_field": "serialNumber",
- "local_variable": "auth_model_event_handler",
- "model_name": "AttWorkflowDriverServiceInstance",
- "poke_interval": 5,
- "provide_context": true,
- "python_callable": "DriverService_event",
- "task_id": "auth_model_event_handler_task"
- },
- "dhcp_event_handler_task": {
- "class": "XOSEventSensor",
- "dag": "dag_att",
- "dag_id": "att_workflow_onu",
- "key_field": "serialNumber",
"local_variable": "dhcp_event_handler",
- "poke_interval": 5,
- "provide_context": true,
"python_callable": "DHCP_event",
- "task_id": "dhcp_event_handler_task",
+ "task_id": "dhcp_event_handler"
+ },
+ "dhcp_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "dhcp_event_sensor",
+ "poke_interval": 5,
+ "task_id": "dhcp_event_sensor",
"topic": "dhcp.events"
},
- "dhcp_model_event_handler_task": {
- "class": "XOSModelSensor",
+ "onu_event_handler": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "onu_event_sensor",
"dag": "dag_att",
"dag_id": "att_workflow_onu",
- "key_field": "serialNumber",
- "local_variable": "dhcp_model_event_handler",
- "model_name": "AttWorkflowDriverServiceInstance",
- "poke_interval": 5,
- "provide_context": true,
- "python_callable": "DriverService_event",
- "task_id": "dhcp_model_event_handler_task"
- },
- "onu_event_handler_task": {
- "class": "XOSEventSensor",
- "dag": "dag_att",
- "dag_id": "att_workflow_onu",
- "key_field": "serialNumber",
"local_variable": "onu_event_handler",
- "poke_interval": 5,
- "provide_context": true,
"python_callable": "ONU_event",
- "task_id": "onu_event_handler_task",
- "topic": "onu.events"
+ "task_id": "onu_event_handler"
},
- "onu_model_event_handler_task": {
- "class": "XOSModelSensor",
+ "onu_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
"dag": "dag_att",
"dag_id": "att_workflow_onu",
"key_field": "serialNumber",
- "local_variable": "onu_model_event_handler",
- "model_name": "AttWorkflowDriverServiceInstance",
+ "local_variable": "onu_event_sensor",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "DriverService_event",
- "task_id": "onu_model_event_handler_task"
+ "task_id": "onu_event_sensor",
+ "topic": "onu.events"
}
}
}
-}
+}
\ No newline at end of file
diff --git a/test/workflow_examples/left_right_mix_dag.py b/test/workflow_examples/left_right_mix_dag.py
index e734d0d..32363b5 100644
--- a/test/workflow_examples/left_right_mix_dag.py
+++ b/test/workflow_examples/left_right_mix_dag.py
@@ -16,19 +16,19 @@
Example AT&T workflow using Airflow
"""
import json
-import logging
-from datetime import datetime
-
+import logging
import airflow
+from datetime import datetime
from airflow import DAG
from airflow import AirflowException
-
-import xossynchronizer.airflow.sensors.XOSModelSensor
-import xossynchronizer.airflow.sensors.XOSEventSensor
+from airflow.operators import PythonOperator
+from cord_workflow_airflow_extensions.sensors import CORDEventSensor, CORDModelSensor
+from cord_workflow_airflow_extensions.operators import CORDModelOperator
from att_helpers import *
from att_service_instance_funcs import *
+
args = {
'start_date': datetime.utcnow(),
'owner': 'ATT',
@@ -44,39 +44,67 @@
dag_att.doc_md = __doc__
-def ONU_event(model_accessor, event, **kwargs):
+def ONU_event(model_accessor, message, **kwargs):
#context = kwargs
#run_id = context['dag_run'].run_id
- logging.info("onu.events: received event", event=event)
+ logging.info('onu.events: received event', message=message)
- si = find_or_create_att_si(model_accessor, logging, event)
- if event["status"] == "activated":
- logging.info("onu.events: activated onu", value=event)
+ si = find_or_create_att_si(model_accessor, logging, message)
+ if message['status'] == 'activated':
+ logging.info('onu.events: activated onu', message=message)
si.no_sync = False
- si.uni_port_id = long(event["portNumber"])
- si.of_dpid = event["deviceId"]
- si.oper_onu_status = "ENABLED"
+ si.uni_port_id = long(message['portNumber'])
+ si.of_dpid = message['deviceId']
+ si.oper_onu_status = 'ENABLED'
si.save_changed_fields(always_update_timestamp=True)
- elif event["status"] == "disabled":
- logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
- si.oper_onu_status = "DISABLED"
+ elif message['status'] == 'disabled':
+ logging.info('onu.events: disabled onu, resetting the subscriber', value=message)
+ si.oper_onu_status = 'DISABLED'
si.save_changed_fields(always_update_timestamp=True)
else:
- logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
- raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+ logging.warn('onu.events: Unknown status value: %s' % message['status'], value=message)
+ raise AirflowException('onu.events: Unknown status value: %s' % message['status'], value=message)
-def DriverService_event(event_type, model_accessor, si, **kwargs):
+def AUTH_event(model_accessor, message, **kwargs):
#context = kwargs
#run_id = context['dag_run'].run_id
-
+
+ logging.info('authentication.events: Got event for subscriber', message=message)
+
+ si = find_or_create_att_si(model_accessor, logging, message)
+ logging.debug('authentication.events: Updating service instance', si=si)
+ si.authentication_state = message['authenticationState']
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, message, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info('dhcp.events: Got event for subscriber', message=message)
+
+ si = find_or_create_att_si(model_accessor, logging, message)
+ logging.debug('dhcp.events: Updating service instance', si=si)
+ si.dhcp_state = message['messageType']
+ si.ip_address = message['ipAddress']
+ si.mac_address = message['macAddress']
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DriverService_event(model_accessor, message, si, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ event_type = message['event_type']
+
go = False
if event_type == 'create':
- logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+ logging.debug('MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s ' % si.id)
go = True
elif event_type == 'update':
- logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+ logging.debug('MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s ' %
(si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
go = True
elif event_type == 'delete':
@@ -107,94 +135,107 @@
si.save_changed_fields()
-def Auth_event(model_accessor, event, **kwargs):
- #context = kwargs
- #run_id = context['dag_run'].run_id
-
- logging.info("authentication.events: Got event for subscriber", event_value=event)
-
- si = find_or_create_att_si(model_accessor, logging, event)
- logging.debug("authentication.events: Updating service instance", si=si)
- si.authentication_state = event["authenticationState"]
- si.save_changed_fields(always_update_timestamp=True)
-
-
-def DHCP_event(model_accessor, event, **kwargs):
- #context = kwargs
- #run_id = context['dag_run'].run_id
-
- logging.info("dhcp.events: Got event for subscriber", event_value=event)
-
- si = find_or_create_att_si(model_accessor, logging, event)
- logging.debug("dhcp.events: Updating service instance", si=si)
- si.dhcp_state = event["messageType"]
- si.ip_address = event["ipAddress"]
- si.mac_address = event["macAddress"]
- si.save_changed_fields(always_update_timestamp=True)
-
-
-onu_event_handler = XOSEventSensor(
- task_id='onu_event_handler',
+onu_event_sensor = CORDEventSensor(
+ task_id='onu_event_sensor',
topic='onu.events',
key_field='serialNumber',
- provide_context=True,
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_att
+)
+
+onu_event_handler = CORDModelOperator(
+ task_id='onu_event_handler',
python_callable=ONU_event,
- poke_interval=5,
- dag=dag_att,
+ cord_event_sensor_task_id='onu_event_sensor',
+ dag=dag_att
)
-onu_model_event_handler = XOSModelSensor(
- task_id='onu_model_event_handler',
- model_name='AttWorkflowDriverServiceInstance',
+auth_event_sensor = CORDEventSensor(
+ task_id='auth_event_sensor',
+ topic='authentication.events',
key_field='serialNumber',
- provide_context=True,
- python_callable=DriverService_event,
+ controller_conn_id='local_cord_controller',
poke_interval=5,
- dag=dag_att,
+ dag=dag_att
)
-auth_event_handler = XOSEventSensor(
+auth_event_handler = CORDModelOperator(
task_id='auth_event_handler',
- topic="authentication.events",
- key_field='serialNumber',
- provide_context=True,
- python_callable=Auth_event,
- poke_interval=5,
- dag=dag_att,
+ python_callable=AUTH_event,
+ cord_event_sensor_task_id='auth_event_sensor',
+ dag=dag_att
)
-auth_model_event_handler = XOSModelSensor(
- task_id='auth_model_event_handler',
- model_name='AttWorkflowDriverServiceInstance',
+dhcp_event_sensor = CORDEventSensor(
+ task_id='dhcp_event_sensor',
+ topic='dhcp.events',
key_field='serialNumber',
- provide_context=True,
- python_callable=DriverService_event,
+ controller_conn_id='local_cord_controller',
poke_interval=5,
- dag=dag_att,
+ dag=dag_att
)
-dhcp_event_handler = XOSEventSensor(
+dhcp_event_handler = CORDModelOperator(
task_id='dhcp_event_handler',
- topic="dhcp.events",
- key_field='serialNumber',
- provide_context=True,
python_callable=DHCP_event,
- poke_interval=5,
- dag=dag_att,
+ cord_event_sensor_task_id='dhcp_event_sensor',
+ dag=dag_att
)
-dhcp_model_event_handler = XOSModelSensor(
- task_id='dhcp_model_event_handler',
+att_model_event_sensor1 = CORDModelSensor(
+ task_id='att_model_event_sensor1',
model_name='AttWorkflowDriverServiceInstance',
key_field='serialNumber',
- provide_context=True,
- python_callable=DriverService_event,
+ controller_conn_id='local_cord_controller',
poke_interval=5,
- dag=dag_att,
+ dag=dag_att
)
-onu_event_handler >> onu_model_event_handler
-auth_event_handler << onu_model_event_handler
-auth_event_handler >> auth_model_event_handler
-dhcp_event_handler << auth_model_event_handler
-dhcp_event_handler >> dhcp_model_event_handler
\ No newline at end of file
+att_model_event_handler1 = CORDModelOperator(
+ task_id='att_model_event_handler1',
+ python_callable=DriverService_event,
+ cord_event_sensor_task_id='att_model_event_sensor1',
+ dag=dag_att
+)
+
+att_model_event_sensor2 = CORDModelSensor(
+ task_id='att_model_event_sensor2',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_att
+)
+
+att_model_event_handler2 = CORDModelOperator(
+ task_id='att_model_event_handler2',
+ python_callable=DriverService_event,
+ cord_event_sensor_task_id='att_model_event_sensor2',
+ dag=dag_att
+)
+
+att_model_event_sensor3 = CORDModelSensor(
+ task_id='att_model_event_sensor3',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_att
+)
+
+att_model_event_handler3 = CORDModelOperator(
+ task_id='att_model_event_handler3',
+ python_callable=DriverService_event,
+ cord_event_sensor_task_id='att_model_event_sensor3',
+ dag=dag_att
+)
+
+
+onu_event_sensor >> onu_event_handler >> att_model_event_sensor1 >> att_model_event_handler1
+att_model_event_handler1 >> auth_event_sensor
+
+att_model_event_handler2 << att_model_event_sensor2 << auth_event_handler << auth_event_sensor
+
+att_model_event_handler2 >> dhcp_event_sensor
+dhcp_event_sensor >> dhcp_event_handler >> att_model_event_sensor3 >> att_model_event_handler3
diff --git a/test/workflow_examples/left_right_mix_dag.py.expected.json b/test/workflow_examples/left_right_mix_dag.py.expected.json
index 05607ba..6d4c367 100644
--- a/test/workflow_examples/left_right_mix_dag.py.expected.json
+++ b/test/workflow_examples/left_right_mix_dag.py.expected.json
@@ -5,122 +5,218 @@
"local_variable": "dag_att"
},
"dependencies": {
- "auth_event_handler": {
+ "att_model_event_handler1": {
"children": [
- "auth_model_event_handler"
+ "auth_event_sensor"
],
"parents": [
- "onu_model_event_handler"
+ "att_model_event_sensor1"
]
},
- "auth_model_event_handler": {
+ "att_model_event_handler2": {
"children": [
- "dhcp_event_handler"
+ "dhcp_event_sensor"
+ ],
+ "parents": [
+ "att_model_event_sensor2"
+ ]
+ },
+ "att_model_event_handler3": {
+ "parents": [
+ "att_model_event_sensor3"
+ ]
+ },
+ "att_model_event_sensor1": {
+ "children": [
+ "att_model_event_handler1"
+ ],
+ "parents": [
+ "onu_event_handler"
+ ]
+ },
+ "att_model_event_sensor2": {
+ "children": [
+ "att_model_event_handler2"
],
"parents": [
"auth_event_handler"
]
},
+ "att_model_event_sensor3": {
+ "children": [
+ "att_model_event_handler3"
+ ],
+ "parents": [
+ "dhcp_event_handler"
+ ]
+ },
+ "auth_event_handler": {
+ "children": [
+ "att_model_event_sensor2"
+ ],
+ "parents": [
+ "auth_event_sensor"
+ ]
+ },
+ "auth_event_sensor": {
+ "children": [
+ "auth_event_handler"
+ ],
+ "parents": [
+ "att_model_event_handler1"
+ ]
+ },
"dhcp_event_handler": {
"children": [
- "dhcp_model_event_handler"
+ "att_model_event_sensor3"
],
"parents": [
- "auth_model_event_handler"
+ "dhcp_event_sensor"
]
},
- "dhcp_model_event_handler": {
- "parents": [
+ "dhcp_event_sensor": {
+ "children": [
"dhcp_event_handler"
+ ],
+ "parents": [
+ "att_model_event_handler2"
]
},
"onu_event_handler": {
"children": [
- "onu_model_event_handler"
- ]
- },
- "onu_model_event_handler": {
- "children": [
- "auth_event_handler"
+ "att_model_event_sensor1"
],
"parents": [
+ "onu_event_sensor"
+ ]
+ },
+ "onu_event_sensor": {
+ "children": [
"onu_event_handler"
]
}
},
"tasks": {
- "auth_event_handler": {
- "class": "XOSEventSensor",
+ "att_model_event_handler1": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "att_model_event_sensor1",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "local_variable": "att_model_event_handler1",
+ "python_callable": "DriverService_event",
+ "task_id": "att_model_event_handler1"
+ },
+ "att_model_event_handler2": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "att_model_event_sensor2",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "local_variable": "att_model_event_handler2",
+ "python_callable": "DriverService_event",
+ "task_id": "att_model_event_handler2"
+ },
+ "att_model_event_handler3": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "att_model_event_sensor3",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "local_variable": "att_model_event_handler3",
+ "python_callable": "DriverService_event",
+ "task_id": "att_model_event_handler3"
+ },
+ "att_model_event_sensor1": {
+ "class": "CORDModelSensor",
+ "controller_conn_id": "local_cord_controller",
"dag": "dag_att",
"dag_id": "att_workflow_onu",
"key_field": "serialNumber",
- "local_variable": "auth_event_handler",
+ "local_variable": "att_model_event_sensor1",
+ "model_name": "AttWorkflowDriverServiceInstance",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "Auth_event",
- "task_id": "auth_event_handler",
+ "task_id": "att_model_event_sensor1"
+ },
+ "att_model_event_sensor2": {
+ "class": "CORDModelSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "att_model_event_sensor2",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "task_id": "att_model_event_sensor2"
+ },
+ "att_model_event_sensor3": {
+ "class": "CORDModelSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "att_model_event_sensor3",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "task_id": "att_model_event_sensor3"
+ },
+ "auth_event_handler": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "auth_event_sensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "local_variable": "auth_event_handler",
+ "python_callable": "AUTH_event",
+ "task_id": "auth_event_handler"
+ },
+ "auth_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "auth_event_sensor",
+ "poke_interval": 5,
+ "task_id": "auth_event_sensor",
"topic": "authentication.events"
},
- "auth_model_event_handler": {
- "class": "XOSModelSensor",
- "dag": "dag_att",
- "dag_id": "att_workflow_onu",
- "key_field": "serialNumber",
- "local_variable": "auth_model_event_handler",
- "model_name": "AttWorkflowDriverServiceInstance",
- "poke_interval": 5,
- "provide_context": true,
- "python_callable": "DriverService_event",
- "task_id": "auth_model_event_handler"
- },
"dhcp_event_handler": {
- "class": "XOSEventSensor",
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "dhcp_event_sensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "local_variable": "dhcp_event_handler",
+ "python_callable": "DHCP_event",
+ "task_id": "dhcp_event_handler"
+ },
+ "dhcp_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
"dag": "dag_att",
"dag_id": "att_workflow_onu",
"key_field": "serialNumber",
- "local_variable": "dhcp_event_handler",
+ "local_variable": "dhcp_event_sensor",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "DHCP_event",
- "task_id": "dhcp_event_handler",
+ "task_id": "dhcp_event_sensor",
"topic": "dhcp.events"
},
- "dhcp_model_event_handler": {
- "class": "XOSModelSensor",
- "dag": "dag_att",
- "dag_id": "att_workflow_onu",
- "key_field": "serialNumber",
- "local_variable": "dhcp_model_event_handler",
- "model_name": "AttWorkflowDriverServiceInstance",
- "poke_interval": 5,
- "provide_context": true,
- "python_callable": "DriverService_event",
- "task_id": "dhcp_model_event_handler"
- },
"onu_event_handler": {
- "class": "XOSEventSensor",
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "onu_event_sensor",
"dag": "dag_att",
"dag_id": "att_workflow_onu",
- "key_field": "serialNumber",
"local_variable": "onu_event_handler",
- "poke_interval": 5,
- "provide_context": true,
"python_callable": "ONU_event",
- "task_id": "onu_event_handler",
- "topic": "onu.events"
+ "task_id": "onu_event_handler"
},
- "onu_model_event_handler": {
- "class": "XOSModelSensor",
+ "onu_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
"dag": "dag_att",
"dag_id": "att_workflow_onu",
"key_field": "serialNumber",
- "local_variable": "onu_model_event_handler",
- "model_name": "AttWorkflowDriverServiceInstance",
+ "local_variable": "onu_event_sensor",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "DriverService_event",
- "task_id": "onu_model_event_handler"
+ "task_id": "onu_event_sensor",
+ "topic": "onu.events"
}
}
}
-}
+}
\ No newline at end of file
diff --git a/test/workflow_examples/left_right_mix_dag2.py b/test/workflow_examples/left_right_mix_dag2.py
index 6fa1df1..697edc3 100644
--- a/test/workflow_examples/left_right_mix_dag2.py
+++ b/test/workflow_examples/left_right_mix_dag2.py
@@ -16,19 +16,19 @@
Example AT&T workflow using Airflow
"""
import json
-import logging
-from datetime import datetime
-
+import logging
import airflow
+from datetime import datetime
from airflow import DAG
from airflow import AirflowException
-
-import xossynchronizer.airflow.sensors.XOSModelSensor
-import xossynchronizer.airflow.sensors.XOSEventSensor
+from airflow.operators import PythonOperator
+from cord_workflow_airflow_extensions.sensors import CORDEventSensor, CORDModelSensor
+from cord_workflow_airflow_extensions.operators import CORDModelOperator
from att_helpers import *
from att_service_instance_funcs import *
+
args = {
'start_date': datetime.utcnow(),
'owner': 'ATT',
@@ -44,39 +44,67 @@
dag_att.doc_md = __doc__
-def ONU_event(model_accessor, event, **kwargs):
+def ONU_event(model_accessor, message, **kwargs):
#context = kwargs
#run_id = context['dag_run'].run_id
- logging.info("onu.events: received event", event=event)
+ logging.info('onu.events: received event', message=message)
- si = find_or_create_att_si(model_accessor, logging, event)
- if event["status"] == "activated":
- logging.info("onu.events: activated onu", value=event)
+ si = find_or_create_att_si(model_accessor, logging, message)
+ if message['status'] == 'activated':
+ logging.info('onu.events: activated onu', message=message)
si.no_sync = False
- si.uni_port_id = long(event["portNumber"])
- si.of_dpid = event["deviceId"]
- si.oper_onu_status = "ENABLED"
+ si.uni_port_id = long(message['portNumber'])
+ si.of_dpid = message['deviceId']
+ si.oper_onu_status = 'ENABLED'
si.save_changed_fields(always_update_timestamp=True)
- elif event["status"] == "disabled":
- logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
- si.oper_onu_status = "DISABLED"
+ elif message['status'] == 'disabled':
+ logging.info('onu.events: disabled onu, resetting the subscriber', value=message)
+ si.oper_onu_status = 'DISABLED'
si.save_changed_fields(always_update_timestamp=True)
else:
- logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
- raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+ logging.warn('onu.events: Unknown status value: %s' % message['status'], value=message)
+ raise AirflowException('onu.events: Unknown status value: %s' % message['status'], value=message)
-def DriverService_event(event_type, model_accessor, si, **kwargs):
+def AUTH_event(model_accessor, message, **kwargs):
#context = kwargs
#run_id = context['dag_run'].run_id
-
+
+ logging.info('authentication.events: Got event for subscriber', message=message)
+
+ si = find_or_create_att_si(model_accessor, logging, message)
+ logging.debug('authentication.events: Updating service instance', si=si)
+ si.authentication_state = message['authenticationState']
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, message, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info('dhcp.events: Got event for subscriber', message=message)
+
+ si = find_or_create_att_si(model_accessor, logging, message)
+ logging.debug('dhcp.events: Updating service instance', si=si)
+ si.dhcp_state = message['messageType']
+ si.ip_address = message['ipAddress']
+ si.mac_address = message['macAddress']
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DriverService_event(model_accessor, message, si, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ event_type = message['event_type']
+
go = False
if event_type == 'create':
- logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+ logging.debug('MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s ' % si.id)
go = True
elif event_type == 'update':
- logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+ logging.debug('MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s ' %
(si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
go = True
elif event_type == 'delete':
@@ -107,90 +135,71 @@
si.save_changed_fields()
-def Auth_event(model_accessor, event, **kwargs):
- #context = kwargs
- #run_id = context['dag_run'].run_id
-
- logging.info("authentication.events: Got event for subscriber", event_value=event)
-
- si = find_or_create_att_si(model_accessor, logging, event)
- logging.debug("authentication.events: Updating service instance", si=si)
- si.authentication_state = event["authenticationState"]
- si.save_changed_fields(always_update_timestamp=True)
-
-
-def DHCP_event(model_accessor, event, **kwargs):
- #context = kwargs
- #run_id = context['dag_run'].run_id
-
- logging.info("dhcp.events: Got event for subscriber", event_value=event)
-
- si = find_or_create_att_si(model_accessor, logging, event)
- logging.debug("dhcp.events: Updating service instance", si=si)
- si.dhcp_state = event["messageType"]
- si.ip_address = event["ipAddress"]
- si.mac_address = event["macAddress"]
- si.save_changed_fields(always_update_timestamp=True)
-
-
-onu_event_handler = XOSEventSensor(
- task_id='onu_event_handler',
+onu_event_sensor = CORDEventSensor(
+ task_id='onu_event_sensor',
topic='onu.events',
key_field='serialNumber',
- provide_context=True,
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_att
+)
+
+onu_event_handler = CORDModelOperator(
+ task_id='onu_event_handler',
python_callable=ONU_event,
- poke_interval=5,
- dag=dag_att,
+ cord_event_sensor_task_id='onu_event_sensor',
+ dag=dag_att
)
-onu_model_event_handler = XOSModelSensor(
- task_id='onu_model_event_handler',
- model_name='AttWorkflowDriverServiceInstance',
+auth_event_sensor = CORDEventSensor(
+ task_id='auth_event_sensor',
+ topic='authentication.events',
key_field='serialNumber',
- provide_context=True,
- python_callable=DriverService_event,
+ controller_conn_id='local_cord_controller',
poke_interval=5,
- dag=dag_att,
+ dag=dag_att
)
-auth_event_handler = XOSEventSensor(
+auth_event_handler = CORDModelOperator(
task_id='auth_event_handler',
- topic="authentication.events",
- key_field='serialNumber',
- provide_context=True,
- python_callable=Auth_event,
- poke_interval=5,
- dag=dag_att,
+ python_callable=AUTH_event,
+ cord_event_sensor_task_id='auth_event_sensor',
+ dag=dag_att
)
-auth_model_event_handler = XOSModelSensor(
- task_id='auth_model_event_handler',
- model_name='AttWorkflowDriverServiceInstance',
+dhcp_event_sensor = CORDEventSensor(
+ task_id='dhcp_event_sensor',
+ topic='dhcp.events',
key_field='serialNumber',
- provide_context=True,
- python_callable=DriverService_event,
+ controller_conn_id='local_cord_controller',
poke_interval=5,
- dag=dag_att,
+ dag=dag_att
)
-dhcp_event_handler = XOSEventSensor(
+dhcp_event_handler = CORDModelOperator(
task_id='dhcp_event_handler',
- topic="dhcp.events",
- key_field='serialNumber',
- provide_context=True,
python_callable=DHCP_event,
- poke_interval=5,
- dag=dag_att,
+ cord_event_sensor_task_id='dhcp_event_sensor',
+ dag=dag_att
)
-dhcp_model_event_handler = XOSModelSensor(
- task_id='dhcp_model_event_handler',
+att_model_event_sensor1 = CORDModelSensor(
+ task_id='att_model_event_sensor1',
model_name='AttWorkflowDriverServiceInstance',
key_field='serialNumber',
- provide_context=True,
- python_callable=DriverService_event,
+ controller_conn_id='local_cord_controller',
poke_interval=5,
- dag=dag_att,
+ dag=dag_att
)
-onu_event_handler >> onu_model_event_handler << auth_event_handler
+att_model_event_handler1 = CORDModelOperator(
+ task_id='att_model_event_handler1',
+ python_callable=DriverService_event,
+ cord_event_sensor_task_id='att_model_event_sensor1',
+ dag=dag_att
+)
+
+
+onu_event_sensor >> onu_event_handler >> att_model_event_sensor1 << auth_event_handler << auth_event_sensor
+dhcp_event_sensor >> dhcp_event_handler >> att_model_event_sensor1
+att_model_event_sensor1 >> att_model_event_handler1
diff --git a/test/workflow_examples/left_right_mix_dag2.py.expected.json b/test/workflow_examples/left_right_mix_dag2.py.expected.json
index 7d9430b..ce05e7e 100644
--- a/test/workflow_examples/left_right_mix_dag2.py.expected.json
+++ b/test/workflow_examples/left_right_mix_dag2.py.expected.json
@@ -5,96 +5,142 @@
"local_variable": "dag_att"
},
"dependencies": {
+ "att_model_event_handler1": {
+ "parents": [
+ "att_model_event_sensor1"
+ ]
+ },
+ "att_model_event_sensor1": {
+ "children": [
+ "att_model_event_handler1"
+ ],
+ "parents": [
+ "onu_event_handler",
+ "dhcp_event_handler",
+ "auth_event_handler"
+ ]
+ },
"auth_event_handler": {
"children": [
- "onu_model_event_handler"
+ "att_model_event_sensor1"
+ ],
+ "parents": [
+ "auth_event_sensor"
+ ]
+ },
+ "auth_event_sensor": {
+ "children": [
+ "auth_event_handler"
+ ]
+ },
+ "dhcp_event_handler": {
+ "children": [
+ "att_model_event_sensor1"
+ ],
+ "parents": [
+ "dhcp_event_sensor"
+ ]
+ },
+ "dhcp_event_sensor": {
+ "children": [
+ "dhcp_event_handler"
]
},
"onu_event_handler": {
"children": [
- "onu_model_event_handler"
+ "att_model_event_sensor1"
+ ],
+ "parents": [
+ "onu_event_sensor"
]
},
- "onu_model_event_handler": {
- "parents": [
- "onu_event_handler",
- "auth_event_handler"
+ "onu_event_sensor": {
+ "children": [
+ "onu_event_handler"
]
}
},
"tasks": {
- "auth_event_handler": {
- "class": "XOSEventSensor",
+ "att_model_event_handler1": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "att_model_event_sensor1",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "local_variable": "att_model_event_handler1",
+ "python_callable": "DriverService_event",
+ "task_id": "att_model_event_handler1"
+ },
+ "att_model_event_sensor1": {
+ "class": "CORDModelSensor",
+ "controller_conn_id": "local_cord_controller",
"dag": "dag_att",
"dag_id": "att_workflow_onu",
"key_field": "serialNumber",
- "local_variable": "auth_event_handler",
+ "local_variable": "att_model_event_sensor1",
+ "model_name": "AttWorkflowDriverServiceInstance",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "Auth_event",
- "task_id": "auth_event_handler",
+ "task_id": "att_model_event_sensor1"
+ },
+ "auth_event_handler": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "auth_event_sensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "local_variable": "auth_event_handler",
+ "python_callable": "AUTH_event",
+ "task_id": "auth_event_handler"
+ },
+ "auth_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "auth_event_sensor",
+ "poke_interval": 5,
+ "task_id": "auth_event_sensor",
"topic": "authentication.events"
},
- "auth_model_event_handler": {
- "class": "XOSModelSensor",
- "dag": "dag_att",
- "dag_id": "att_workflow_onu",
- "key_field": "serialNumber",
- "local_variable": "auth_model_event_handler",
- "model_name": "AttWorkflowDriverServiceInstance",
- "poke_interval": 5,
- "provide_context": true,
- "python_callable": "DriverService_event",
- "task_id": "auth_model_event_handler"
- },
"dhcp_event_handler": {
- "class": "XOSEventSensor",
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "dhcp_event_sensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "local_variable": "dhcp_event_handler",
+ "python_callable": "DHCP_event",
+ "task_id": "dhcp_event_handler"
+ },
+ "dhcp_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
"dag": "dag_att",
"dag_id": "att_workflow_onu",
"key_field": "serialNumber",
- "local_variable": "dhcp_event_handler",
+ "local_variable": "dhcp_event_sensor",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "DHCP_event",
- "task_id": "dhcp_event_handler",
+ "task_id": "dhcp_event_sensor",
"topic": "dhcp.events"
},
- "dhcp_model_event_handler": {
- "class": "XOSModelSensor",
- "dag": "dag_att",
- "dag_id": "att_workflow_onu",
- "key_field": "serialNumber",
- "local_variable": "dhcp_model_event_handler",
- "model_name": "AttWorkflowDriverServiceInstance",
- "poke_interval": 5,
- "provide_context": true,
- "python_callable": "DriverService_event",
- "task_id": "dhcp_model_event_handler"
- },
"onu_event_handler": {
- "class": "XOSEventSensor",
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "onu_event_sensor",
"dag": "dag_att",
"dag_id": "att_workflow_onu",
- "key_field": "serialNumber",
"local_variable": "onu_event_handler",
- "poke_interval": 5,
- "provide_context": true,
"python_callable": "ONU_event",
- "task_id": "onu_event_handler",
- "topic": "onu.events"
+ "task_id": "onu_event_handler"
},
- "onu_model_event_handler": {
- "class": "XOSModelSensor",
+ "onu_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
"dag": "dag_att",
"dag_id": "att_workflow_onu",
"key_field": "serialNumber",
- "local_variable": "onu_model_event_handler",
- "model_name": "AttWorkflowDriverServiceInstance",
+ "local_variable": "onu_event_sensor",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "DriverService_event",
- "task_id": "onu_model_event_handler"
+ "task_id": "onu_event_sensor",
+ "topic": "onu.events"
}
}
}
-}
+}
\ No newline at end of file
diff --git a/test/workflow_examples/multi_children_parents_dag.py b/test/workflow_examples/multi_children_parents_dag.py
index 8e59e22..88256b6 100644
--- a/test/workflow_examples/multi_children_parents_dag.py
+++ b/test/workflow_examples/multi_children_parents_dag.py
@@ -16,19 +16,19 @@
Example AT&T workflow using Airflow
"""
import json
-import logging
-from datetime import datetime
-
+import logging
import airflow
+from datetime import datetime
from airflow import DAG
from airflow import AirflowException
-
-import xossynchronizer.airflow.sensors.XOSModelSensor
-import xossynchronizer.airflow.sensors.XOSEventSensor
+from airflow.operators import PythonOperator
+from cord_workflow_airflow_extensions.sensors import CORDEventSensor, CORDModelSensor
+from cord_workflow_airflow_extensions.operators import CORDModelOperator
from att_helpers import *
from att_service_instance_funcs import *
+
args = {
'start_date': datetime.utcnow(),
'owner': 'ATT',
@@ -44,39 +44,67 @@
dag_att.doc_md = __doc__
-def ONU_event(model_accessor, event, **kwargs):
+def ONU_event(model_accessor, message, **kwargs):
#context = kwargs
#run_id = context['dag_run'].run_id
- logging.info("onu.events: received event", event=event)
+ logging.info('onu.events: received event', message=message)
- si = find_or_create_att_si(model_accessor, logging, event)
- if event["status"] == "activated":
- logging.info("onu.events: activated onu", value=event)
+ si = find_or_create_att_si(model_accessor, logging, message)
+ if message['status'] == 'activated':
+ logging.info('onu.events: activated onu', message=message)
si.no_sync = False
- si.uni_port_id = long(event["portNumber"])
- si.of_dpid = event["deviceId"]
- si.oper_onu_status = "ENABLED"
+ si.uni_port_id = long(message['portNumber'])
+ si.of_dpid = message['deviceId']
+ si.oper_onu_status = 'ENABLED'
si.save_changed_fields(always_update_timestamp=True)
- elif event["status"] == "disabled":
- logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
- si.oper_onu_status = "DISABLED"
+ elif message['status'] == 'disabled':
+ logging.info('onu.events: disabled onu, resetting the subscriber', value=message)
+ si.oper_onu_status = 'DISABLED'
si.save_changed_fields(always_update_timestamp=True)
else:
- logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
- raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+ logging.warn('onu.events: Unknown status value: %s' % message['status'], value=message)
+ raise AirflowException('onu.events: Unknown status value: %s' % message['status'], value=message)
-def DriverService_event(event_type, model_accessor, si, **kwargs):
+def AUTH_event(model_accessor, message, **kwargs):
#context = kwargs
#run_id = context['dag_run'].run_id
-
+
+ logging.info('authentication.events: Got event for subscriber', message=message)
+
+ si = find_or_create_att_si(model_accessor, logging, message)
+ logging.debug('authentication.events: Updating service instance', si=si)
+ si.authentication_state = message['authenticationState']
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, message, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info('dhcp.events: Got event for subscriber', message=message)
+
+ si = find_or_create_att_si(model_accessor, logging, message)
+ logging.debug('dhcp.events: Updating service instance', si=si)
+ si.dhcp_state = message['messageType']
+ si.ip_address = message['ipAddress']
+ si.mac_address = message['macAddress']
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DriverService_event(model_accessor, message, si, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ event_type = message['event_type']
+
go = False
if event_type == 'create':
- logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+ logging.debug('MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s ' % si.id)
go = True
elif event_type == 'update':
- logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+ logging.debug('MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s ' %
(si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
go = True
elif event_type == 'delete':
@@ -107,91 +135,73 @@
si.save_changed_fields()
-def Auth_event(model_accessor, event, **kwargs):
- #context = kwargs
- #run_id = context['dag_run'].run_id
-
- logging.info("authentication.events: Got event for subscriber", event_value=event)
-
- si = find_or_create_att_si(model_accessor, logging, event)
- logging.debug("authentication.events: Updating service instance", si=si)
- si.authentication_state = event["authenticationState"]
- si.save_changed_fields(always_update_timestamp=True)
-
-
-def DHCP_event(model_accessor, event, **kwargs):
- #context = kwargs
- #run_id = context['dag_run'].run_id
-
- logging.info("dhcp.events: Got event for subscriber", event_value=event)
-
- si = find_or_create_att_si(model_accessor, logging, event)
- logging.debug("dhcp.events: Updating service instance", si=si)
- si.dhcp_state = event["messageType"]
- si.ip_address = event["ipAddress"]
- si.mac_address = event["macAddress"]
- si.save_changed_fields(always_update_timestamp=True)
-
-
-onu_event_handler = XOSEventSensor(
- task_id='onu_event_handler',
+onu_event_sensor = CORDEventSensor(
+ task_id='onu_event_sensor',
topic='onu.events',
key_field='serialNumber',
- provide_context=True,
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_att
+)
+
+onu_event_handler = CORDModelOperator(
+ task_id='onu_event_handler',
python_callable=ONU_event,
- poke_interval=5,
- dag=dag_att,
+ cord_event_sensor_task_id='onu_event_sensor',
+ dag=dag_att
)
-onu_model_event_handler = XOSModelSensor(
- task_id='onu_model_event_handler',
- model_name='AttWorkflowDriverServiceInstance',
+auth_event_sensor = CORDEventSensor(
+ task_id='auth_event_sensor',
+ topic='authentication.events',
key_field='serialNumber',
- provide_context=True,
- python_callable=DriverService_event,
+ controller_conn_id='local_cord_controller',
poke_interval=5,
- dag=dag_att,
+ dag=dag_att
)
-auth_event_handler = XOSEventSensor(
+auth_event_handler = CORDModelOperator(
task_id='auth_event_handler',
- topic="authentication.events",
- key_field='serialNumber',
- provide_context=True,
- python_callable=Auth_event,
- poke_interval=5,
- dag=dag_att,
+ python_callable=AUTH_event,
+ cord_event_sensor_task_id='auth_event_sensor',
+ dag=dag_att
)
-auth_model_event_handler = XOSModelSensor(
- task_id='auth_model_event_handler',
- model_name='AttWorkflowDriverServiceInstance',
+dhcp_event_sensor = CORDEventSensor(
+ task_id='dhcp_event_sensor',
+ topic='dhcp.events',
key_field='serialNumber',
- provide_context=True,
- python_callable=DriverService_event,
+ controller_conn_id='local_cord_controller',
poke_interval=5,
- dag=dag_att,
+ dag=dag_att
)
-dhcp_event_handler = XOSEventSensor(
+dhcp_event_handler = CORDModelOperator(
task_id='dhcp_event_handler',
- topic="dhcp.events",
- key_field='serialNumber',
- provide_context=True,
python_callable=DHCP_event,
- poke_interval=5,
- dag=dag_att,
+ cord_event_sensor_task_id='dhcp_event_sensor',
+ dag=dag_att
)
-dhcp_model_event_handler = XOSModelSensor(
- task_id='dhcp_model_event_handler',
+att_model_event_sensor1 = CORDModelSensor(
+ task_id='att_model_event_sensor1',
model_name='AttWorkflowDriverServiceInstance',
key_field='serialNumber',
- provide_context=True,
- python_callable=DriverService_event,
+ controller_conn_id='local_cord_controller',
poke_interval=5,
- dag=dag_att,
+ dag=dag_att
)
-onu_event_handler >> [onu_model_event_handler, auth_model_event_handler, dhcp_model_event_handler] >> \
- auth_event_handler >> dhcp_event_handler
\ No newline at end of file
+att_model_event_handler1 = CORDModelOperator(
+ task_id='att_model_event_handler1',
+ python_callable=DriverService_event,
+ cord_event_sensor_task_id='att_model_event_sensor1',
+ dag=dag_att
+)
+
+
+onu_event_sensor >> onu_event_handler
+auth_event_sensor >> auth_event_handler
+dhcp_event_sensor >> dhcp_event_handler
+
+[onu_event_handler, auth_event_handler, dhcp_event_handler] >> att_model_event_sensor1 >> att_model_event_handler1
diff --git a/test/workflow_examples/multi_children_parents_dag.py.expected.json b/test/workflow_examples/multi_children_parents_dag.py.expected.json
index e0dc284..130b8f8 100644
--- a/test/workflow_examples/multi_children_parents_dag.py.expected.json
+++ b/test/workflow_examples/multi_children_parents_dag.py.expected.json
@@ -5,126 +5,142 @@
"local_variable": "dag_att"
},
"dependencies": {
- "auth_event_handler": {
- "children": [
- "dhcp_event_handler"
- ],
+ "att_model_event_handler1": {
"parents": [
- "onu_model_event_handler",
- "auth_model_event_handler",
- "dhcp_model_event_handler"
+ "att_model_event_sensor1"
]
},
- "auth_model_event_handler": {
+ "att_model_event_sensor1": {
"children": [
- "auth_event_handler"
+ "att_model_event_handler1"
],
"parents": [
- "onu_event_handler"
+ "onu_event_handler",
+ "auth_event_handler",
+ "dhcp_event_handler"
+ ]
+ },
+ "auth_event_handler": {
+ "children": [
+ "att_model_event_sensor1"
+ ],
+ "parents": [
+ "auth_event_sensor"
+ ]
+ },
+ "auth_event_sensor": {
+ "children": [
+ "auth_event_handler"
]
},
"dhcp_event_handler": {
- "parents": [
- "auth_event_handler"
- ]
- },
- "dhcp_model_event_handler": {
"children": [
- "auth_event_handler"
+ "att_model_event_sensor1"
],
"parents": [
- "onu_event_handler"
+ "dhcp_event_sensor"
+ ]
+ },
+ "dhcp_event_sensor": {
+ "children": [
+ "dhcp_event_handler"
]
},
"onu_event_handler": {
"children": [
- "onu_model_event_handler",
- "auth_model_event_handler",
- "dhcp_model_event_handler"
- ]
- },
- "onu_model_event_handler": {
- "children": [
- "auth_event_handler"
+ "att_model_event_sensor1"
],
"parents": [
+ "onu_event_sensor"
+ ]
+ },
+ "onu_event_sensor": {
+ "children": [
"onu_event_handler"
]
}
},
"tasks": {
- "auth_event_handler": {
- "class": "XOSEventSensor",
+ "att_model_event_handler1": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "att_model_event_sensor1",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "local_variable": "att_model_event_handler1",
+ "python_callable": "DriverService_event",
+ "task_id": "att_model_event_handler1"
+ },
+ "att_model_event_sensor1": {
+ "class": "CORDModelSensor",
+ "controller_conn_id": "local_cord_controller",
"dag": "dag_att",
"dag_id": "att_workflow_onu",
"key_field": "serialNumber",
- "local_variable": "auth_event_handler",
+ "local_variable": "att_model_event_sensor1",
+ "model_name": "AttWorkflowDriverServiceInstance",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "Auth_event",
- "task_id": "auth_event_handler",
+ "task_id": "att_model_event_sensor1"
+ },
+ "auth_event_handler": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "auth_event_sensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "local_variable": "auth_event_handler",
+ "python_callable": "AUTH_event",
+ "task_id": "auth_event_handler"
+ },
+ "auth_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "auth_event_sensor",
+ "poke_interval": 5,
+ "task_id": "auth_event_sensor",
"topic": "authentication.events"
},
- "auth_model_event_handler": {
- "class": "XOSModelSensor",
- "dag": "dag_att",
- "dag_id": "att_workflow_onu",
- "key_field": "serialNumber",
- "local_variable": "auth_model_event_handler",
- "model_name": "AttWorkflowDriverServiceInstance",
- "poke_interval": 5,
- "provide_context": true,
- "python_callable": "DriverService_event",
- "task_id": "auth_model_event_handler"
- },
"dhcp_event_handler": {
- "class": "XOSEventSensor",
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "dhcp_event_sensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "local_variable": "dhcp_event_handler",
+ "python_callable": "DHCP_event",
+ "task_id": "dhcp_event_handler"
+ },
+ "dhcp_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
"dag": "dag_att",
"dag_id": "att_workflow_onu",
"key_field": "serialNumber",
- "local_variable": "dhcp_event_handler",
+ "local_variable": "dhcp_event_sensor",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "DHCP_event",
- "task_id": "dhcp_event_handler",
+ "task_id": "dhcp_event_sensor",
"topic": "dhcp.events"
},
- "dhcp_model_event_handler": {
- "class": "XOSModelSensor",
- "dag": "dag_att",
- "dag_id": "att_workflow_onu",
- "key_field": "serialNumber",
- "local_variable": "dhcp_model_event_handler",
- "model_name": "AttWorkflowDriverServiceInstance",
- "poke_interval": 5,
- "provide_context": true,
- "python_callable": "DriverService_event",
- "task_id": "dhcp_model_event_handler"
- },
"onu_event_handler": {
- "class": "XOSEventSensor",
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "onu_event_sensor",
"dag": "dag_att",
"dag_id": "att_workflow_onu",
- "key_field": "serialNumber",
"local_variable": "onu_event_handler",
- "poke_interval": 5,
- "provide_context": true,
"python_callable": "ONU_event",
- "task_id": "onu_event_handler",
- "topic": "onu.events"
+ "task_id": "onu_event_handler"
},
- "onu_model_event_handler": {
- "class": "XOSModelSensor",
+ "onu_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
"dag": "dag_att",
"dag_id": "att_workflow_onu",
"key_field": "serialNumber",
- "local_variable": "onu_model_event_handler",
- "model_name": "AttWorkflowDriverServiceInstance",
+ "local_variable": "onu_event_sensor",
"poke_interval": 5,
- "provide_context": true,
- "python_callable": "DriverService_event",
- "task_id": "onu_model_event_handler"
+ "task_id": "onu_event_sensor",
+ "topic": "onu.events"
}
}
}
-}
+}
\ No newline at end of file
diff --git a/test/workflow_examples/multi_dags.py b/test/workflow_examples/multi_dags.py
new file mode 100644
index 0000000..9cda441
--- /dev/null
+++ b/test/workflow_examples/multi_dags.py
@@ -0,0 +1,237 @@
+# Copyright 2019-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Example AT&T workflow using Airflow
+"""
+import json
+import logging
+import airflow
+from datetime import datetime
+from airflow import DAG
+from airflow import AirflowException
+from airflow.operators import PythonOperator
+from cord_workflow_airflow_extensions.sensors import CORDEventSensor, CORDModelSensor
+from cord_workflow_airflow_extensions.operators import CORDModelOperator
+
+from att_helpers import *
+from att_service_instance_funcs import *
+
+
+args = {
+ 'start_date': datetime.utcnow(),
+ 'owner': 'ATT',
+}
+
+dag_att = DAG(
+ dag_id='att_workflow_onu',
+ default_args=args,
+ # this dag will be triggered by external systems
+ schedule_interval=None,
+)
+
+dag_att.doc_md = __doc__
+
+
+def ONU_event(model_accessor, message, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info('onu.events: received event', message=message)
+
+ si = find_or_create_att_si(model_accessor, logging, message)
+ if message['status'] == 'activated':
+ logging.info('onu.events: activated onu', message=message)
+ si.no_sync = False
+ si.uni_port_id = long(message['portNumber'])
+ si.of_dpid = message['deviceId']
+ si.oper_onu_status = 'ENABLED'
+ si.save_changed_fields(always_update_timestamp=True)
+ elif message['status'] == 'disabled':
+ logging.info('onu.events: disabled onu, resetting the subscriber', value=message)
+ si.oper_onu_status = 'DISABLED'
+ si.save_changed_fields(always_update_timestamp=True)
+ else:
+ logging.warn('onu.events: Unknown status value: %s' % message['status'], value=message)
+ raise AirflowException('onu.events: Unknown status value: %s' % message['status'], value=message)
+
+
+def AUTH_event(model_accessor, message, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info('authentication.events: Got event for subscriber', message=message)
+
+ si = find_or_create_att_si(model_accessor, logging, message)
+ logging.debug('authentication.events: Updating service instance', si=si)
+ si.authentication_state = message['authenticationState']
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, message, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info('dhcp.events: Got event for subscriber', message=message)
+
+ si = find_or_create_att_si(model_accessor, logging, message)
+ logging.debug('dhcp.events: Updating service instance', si=si)
+ si.dhcp_state = message['messageType']
+ si.ip_address = message['ipAddress']
+ si.mac_address = message['macAddress']
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DriverService_event(model_accessor, message, si, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ event_type = message['event_type']
+
+ go = False
+ if event_type == 'create':
+ logging.debug('MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s ' % si.id)
+ go = True
+ elif event_type == 'update':
+ logging.debug('MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s ' %
+ (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
+ go = True
+ elif event_type == 'delete':
+ pass
+ else:
+ pass
+
+ if not go:
+ return
+
+ # handle only create & update events
+
+ # Changing ONU state can change auth state
+ # Changing auth state can change DHCP state
+ # So need to process in this order
+ process_onu_state(model_accessor, si)
+ process_auth_state(si)
+ process_dhcp_state(si)
+
+ validate_states(si)
+
+ # handling the subscriber status
+ # It's a combination of all the other states
+ subscriber = get_subscriber(model_accessor, si.serial_number)
+ if subscriber:
+ update_subscriber(model_accessor, subscriber, si)
+
+ si.save_changed_fields()
+
+
+onu_event_sensor = CORDEventSensor(
+ task_id='onu_event_sensor',
+ topic='onu.events',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_att
+)
+
+onu_event_handler = CORDModelOperator(
+ task_id='onu_event_handler',
+ python_callable=ONU_event,
+ cord_event_sensor_task_id='onu_event_sensor',
+ dag=dag_att
+)
+
+auth_event_sensor = CORDEventSensor(
+ task_id='auth_event_sensor',
+ topic='authentication.events',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_att
+)
+
+auth_event_handler = CORDModelOperator(
+ task_id='auth_event_handler',
+ python_callable=AUTH_event,
+ cord_event_sensor_task_id='auth_event_sensor',
+ dag=dag_att
+)
+
+dhcp_event_sensor = CORDEventSensor(
+ task_id='dhcp_event_sensor',
+ topic='dhcp.events',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_att
+)
+
+dhcp_event_handler = CORDModelOperator(
+ task_id='dhcp_event_handler',
+ python_callable=DHCP_event,
+ cord_event_sensor_task_id='dhcp_event_sensor',
+ dag=dag_att
+)
+
+att_model_event_sensor1 = CORDModelSensor(
+ task_id='att_model_event_sensor1',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_att
+)
+
+att_model_event_handler1 = CORDModelOperator(
+ task_id='att_model_event_handler1',
+ python_callable=DriverService_event,
+ cord_event_sensor_task_id='att_model_event_sensor1',
+ dag=dag_att
+)
+
+att_model_event_sensor2 = CORDModelSensor(
+ task_id='att_model_event_sensor2',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_att
+)
+
+att_model_event_handler2 = CORDModelOperator(
+ task_id='att_model_event_handler2',
+ python_callable=DriverService_event,
+ cord_event_sensor_task_id='att_model_event_sensor2',
+ dag=dag_att
+)
+
+att_model_event_sensor3 = CORDModelSensor(
+ task_id='att_model_event_sensor3',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_att
+)
+
+att_model_event_handler3 = CORDModelOperator(
+ task_id='att_model_event_handler3',
+ python_callable=DriverService_event,
+ cord_event_sensor_task_id='att_model_event_sensor3',
+ dag=dag_att
+)
+
+
+onu_event_sensor >> onu_event_handler >> att_model_event_sensor1 >> att_model_event_handler1
+auth_event_sensor >> auth_event_handler >> att_model_event_sensor2 >> att_model_event_handler2
+dhcp_event_sensor >> dhcp_event_handler >> att_model_event_sensor3 >> att_model_event_handler3
diff --git a/test/workflow_examples/multi_dags.py.expected.json b/test/workflow_examples/multi_dags.py.expected.json
new file mode 100644
index 0000000..89f2e4c
--- /dev/null
+++ b/test/workflow_examples/multi_dags.py.expected.json
@@ -0,0 +1,210 @@
+{
+ "att_workflow_onu": {
+ "dag": {
+ "dag_id": "att_workflow_onu",
+ "local_variable": "dag_att"
+ },
+ "dependencies": {
+ "att_model_event_handler1": {
+ "parents": [
+ "att_model_event_sensor1"
+ ]
+ },
+ "att_model_event_handler2": {
+ "parents": [
+ "att_model_event_sensor2"
+ ]
+ },
+ "att_model_event_handler3": {
+ "parents": [
+ "att_model_event_sensor3"
+ ]
+ },
+ "att_model_event_sensor1": {
+ "children": [
+ "att_model_event_handler1"
+ ],
+ "parents": [
+ "onu_event_handler"
+ ]
+ },
+ "att_model_event_sensor2": {
+ "children": [
+ "att_model_event_handler2"
+ ],
+ "parents": [
+ "auth_event_handler"
+ ]
+ },
+ "att_model_event_sensor3": {
+ "children": [
+ "att_model_event_handler3"
+ ],
+ "parents": [
+ "dhcp_event_handler"
+ ]
+ },
+ "auth_event_handler": {
+ "children": [
+ "att_model_event_sensor2"
+ ],
+ "parents": [
+ "auth_event_sensor"
+ ]
+ },
+ "auth_event_sensor": {
+ "children": [
+ "auth_event_handler"
+ ]
+ },
+ "dhcp_event_handler": {
+ "children": [
+ "att_model_event_sensor3"
+ ],
+ "parents": [
+ "dhcp_event_sensor"
+ ]
+ },
+ "dhcp_event_sensor": {
+ "children": [
+ "dhcp_event_handler"
+ ]
+ },
+ "onu_event_handler": {
+ "children": [
+ "att_model_event_sensor1"
+ ],
+ "parents": [
+ "onu_event_sensor"
+ ]
+ },
+ "onu_event_sensor": {
+ "children": [
+ "onu_event_handler"
+ ]
+ }
+ },
+ "tasks": {
+ "att_model_event_handler1": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "att_model_event_sensor1",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "local_variable": "att_model_event_handler1",
+ "python_callable": "DriverService_event",
+ "task_id": "att_model_event_handler1"
+ },
+ "att_model_event_handler2": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "att_model_event_sensor2",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "local_variable": "att_model_event_handler2",
+ "python_callable": "DriverService_event",
+ "task_id": "att_model_event_handler2"
+ },
+ "att_model_event_handler3": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "att_model_event_sensor3",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "local_variable": "att_model_event_handler3",
+ "python_callable": "DriverService_event",
+ "task_id": "att_model_event_handler3"
+ },
+ "att_model_event_sensor1": {
+ "class": "CORDModelSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "att_model_event_sensor1",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "task_id": "att_model_event_sensor1"
+ },
+ "att_model_event_sensor2": {
+ "class": "CORDModelSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "att_model_event_sensor2",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "task_id": "att_model_event_sensor2"
+ },
+ "att_model_event_sensor3": {
+ "class": "CORDModelSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "att_model_event_sensor3",
+ "model_name": "AttWorkflowDriverServiceInstance",
+ "poke_interval": 5,
+ "task_id": "att_model_event_sensor3"
+ },
+ "auth_event_handler": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "auth_event_sensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "local_variable": "auth_event_handler",
+ "python_callable": "AUTH_event",
+ "task_id": "auth_event_handler"
+ },
+ "auth_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "auth_event_sensor",
+ "poke_interval": 5,
+ "task_id": "auth_event_sensor",
+ "topic": "authentication.events"
+ },
+ "dhcp_event_handler": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "dhcp_event_sensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "local_variable": "dhcp_event_handler",
+ "python_callable": "DHCP_event",
+ "task_id": "dhcp_event_handler"
+ },
+ "dhcp_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "dhcp_event_sensor",
+ "poke_interval": 5,
+ "task_id": "dhcp_event_sensor",
+ "topic": "dhcp.events"
+ },
+ "onu_event_handler": {
+ "class": "CORDModelOperator",
+ "cord_event_sensor_task_id": "onu_event_sensor",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "local_variable": "onu_event_handler",
+ "python_callable": "ONU_event",
+ "task_id": "onu_event_handler"
+ },
+ "onu_event_sensor": {
+ "class": "CORDEventSensor",
+ "controller_conn_id": "local_cord_controller",
+ "dag": "dag_att",
+ "dag_id": "att_workflow_onu",
+ "key_field": "serialNumber",
+ "local_variable": "onu_event_sensor",
+ "poke_interval": 5,
+ "task_id": "onu_event_sensor",
+ "topic": "onu.events"
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/test/workflow_examples/two_dags.py b/test/workflow_examples/two_dags.py
deleted file mode 100644
index 1906881..0000000
--- a/test/workflow_examples/two_dags.py
+++ /dev/null
@@ -1,184 +0,0 @@
-# Copyright 2019-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Example AT&T workflow using Airflow
-"""
-import json
-import logging
-from datetime import datetime
-
-import airflow
-from airflow import DAG
-from airflow import AirflowException
-
-import xossynchronizer.airflow.sensors.XOSModelSensor
-import xossynchronizer.airflow.sensors.XOSEventSensor
-
-from att_helpers import *
-from att_service_instance_funcs import *
-
-args = {
- 'start_date': datetime.utcnow(),
- 'owner': 'ATT',
-}
-
-dag_att1 = DAG(
- dag_id='att_workflow_onu1',
- default_args=args,
- # this dag will be triggered by external systems
- schedule_interval=None,
-)
-
-dag_att2 = DAG(
- dag_id='att_workflow_onu2',
- default_args=args,
- # this dag will be triggered by external systems
- schedule_interval=None,
-)
-
-dag_att1.doc_md = __doc__
-dag_att2.doc_md = __doc__
-
-def ONU_event(model_accessor, event, **kwargs):
- #context = kwargs
- #run_id = context['dag_run'].run_id
-
- logging.info("onu.events: received event", event=event)
-
- si = find_or_create_att_si(model_accessor, logging, event)
- if event["status"] == "activated":
- logging.info("onu.events: activated onu", value=event)
- si.no_sync = False
- si.uni_port_id = long(event["portNumber"])
- si.of_dpid = event["deviceId"]
- si.oper_onu_status = "ENABLED"
- si.save_changed_fields(always_update_timestamp=True)
- elif event["status"] == "disabled":
- logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
- si.oper_onu_status = "DISABLED"
- si.save_changed_fields(always_update_timestamp=True)
- else:
- logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
- raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
-
-
-def DriverService_event(event_type, model_accessor, si, **kwargs):
- #context = kwargs
- #run_id = context['dag_run'].run_id
-
- go = False
- if event_type == 'create':
- logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
- go = True
- elif event_type == 'update':
- logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
- (si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
- go = True
- elif event_type == 'delete':
- pass
- else:
- pass
-
- if not go:
- return
-
- # handle only create & update events
-
- # Changing ONU state can change auth state
- # Changing auth state can change DHCP state
- # So need to process in this order
- process_onu_state(model_accessor, si)
- process_auth_state(si)
- process_dhcp_state(si)
-
- validate_states(si)
-
- # handling the subscriber status
- # It's a combination of all the other states
- subscriber = get_subscriber(model_accessor, si.serial_number)
- if subscriber:
- update_subscriber(model_accessor, subscriber, si)
-
- si.save_changed_fields()
-
-
-def Auth_event(model_accessor, event, **kwargs):
- #context = kwargs
- #run_id = context['dag_run'].run_id
-
- logging.info("authentication.events: Got event for subscriber", event_value=event)
-
- si = find_or_create_att_si(model_accessor, logging, event)
- logging.debug("authentication.events: Updating service instance", si=si)
- si.authentication_state = event["authenticationState"]
- si.save_changed_fields(always_update_timestamp=True)
-
-
-def DHCP_event(model_accessor, event, **kwargs):
- #context = kwargs
- #run_id = context['dag_run'].run_id
-
- logging.info("dhcp.events: Got event for subscriber", event_value=event)
-
- si = find_or_create_att_si(model_accessor, logging, event)
- logging.debug("dhcp.events: Updating service instance", si=si)
- si.dhcp_state = event["messageType"]
- si.ip_address = event["ipAddress"]
- si.mac_address = event["macAddress"]
- si.save_changed_fields(always_update_timestamp=True)
-
-
-onu_event_handler = XOSEventSensor(
- task_id='onu_event_handler',
- topic='onu.events',
- key_field='serialNumber',
- provide_context=True,
- python_callable=ONU_event,
- poke_interval=5,
- dag=dag_att1,
-)
-
-onu_model_event_handler = XOSModelSensor(
- task_id='onu_model_event_handler',
- model_name='AttWorkflowDriverServiceInstance',
- key_field='serialNumber',
- provide_context=True,
- python_callable=DriverService_event,
- poke_interval=5,
- dag=dag_att1,
-)
-
-auth_event_handler = XOSEventSensor(
- task_id='auth_event_handler',
- topic="authentication.events",
- key_field='serialNumber',
- provide_context=True,
- python_callable=Auth_event,
- poke_interval=5,
- dag=dag_att2,
-)
-
-auth_model_event_handler = XOSModelSensor(
- task_id='auth_model_event_handler',
- model_name='AttWorkflowDriverServiceInstance',
- key_field='serialNumber',
- provide_context=True,
- python_callable=DriverService_event,
- poke_interval=5,
- dag=dag_att2,
-)
-
-onu_event_handler >> onu_model_event_handler
-auth_event_handler >> auth_model_event_handler
diff --git a/test/workflow_examples/two_dags.py.expected.json b/test/workflow_examples/two_dags.py.expected.json
deleted file mode 100644
index dac7c12..0000000
--- a/test/workflow_examples/two_dags.py.expected.json
+++ /dev/null
@@ -1,90 +0,0 @@
-{
- "att_workflow_onu1": {
- "dag": {
- "dag_id": "att_workflow_onu1",
- "local_variable": "dag_att1"
- },
- "dependencies": {
- "onu_event_handler": {
- "children": [
- "onu_model_event_handler"
- ]
- },
- "onu_model_event_handler": {
- "parents": [
- "onu_event_handler"
- ]
- }
- },
- "tasks": {
- "onu_event_handler": {
- "class": "XOSEventSensor",
- "dag": "dag_att1",
- "dag_id": "att_workflow_onu1",
- "key_field": "serialNumber",
- "local_variable": "onu_event_handler",
- "poke_interval": 5,
- "provide_context": true,
- "python_callable": "ONU_event",
- "task_id": "onu_event_handler",
- "topic": "onu.events"
- },
- "onu_model_event_handler": {
- "class": "XOSModelSensor",
- "dag": "dag_att1",
- "dag_id": "att_workflow_onu1",
- "key_field": "serialNumber",
- "local_variable": "onu_model_event_handler",
- "model_name": "AttWorkflowDriverServiceInstance",
- "poke_interval": 5,
- "provide_context": true,
- "python_callable": "DriverService_event",
- "task_id": "onu_model_event_handler"
- }
- }
- },
- "att_workflow_onu2": {
- "dag": {
- "dag_id": "att_workflow_onu2",
- "local_variable": "dag_att2"
- },
- "dependencies": {
- "auth_event_handler": {
- "children": [
- "auth_model_event_handler"
- ]
- },
- "auth_model_event_handler": {
- "parents": [
- "auth_event_handler"
- ]
- }
- },
- "tasks": {
- "auth_event_handler": {
- "class": "XOSEventSensor",
- "dag": "dag_att2",
- "dag_id": "att_workflow_onu2",
- "key_field": "serialNumber",
- "local_variable": "auth_event_handler",
- "poke_interval": 5,
- "provide_context": true,
- "python_callable": "Auth_event",
- "task_id": "auth_event_handler",
- "topic": "authentication.events"
- },
- "auth_model_event_handler": {
- "class": "XOSModelSensor",
- "dag": "dag_att2",
- "dag_id": "att_workflow_onu2",
- "key_field": "serialNumber",
- "local_variable": "auth_model_event_handler",
- "model_name": "AttWorkflowDriverServiceInstance",
- "poke_interval": 5,
- "provide_context": true,
- "python_callable": "DriverService_event",
- "task_id": "auth_model_event_handler"
- }
- }
- }
-}
diff --git a/workflow_examples/att-workflow/att_dag.py b/workflow_examples/att-workflow/att_dag.py
index 7df5d03..a464176 100644
--- a/workflow_examples/att-workflow/att_dag.py
+++ b/workflow_examples/att-workflow/att_dag.py
@@ -16,19 +16,19 @@
Example AT&T workflow using Airflow
"""
import json
-import logging
-from datetime import datetime
-
+import logging
import airflow
+from datetime import datetime
from airflow import DAG
from airflow import AirflowException
-
-import xossynchronizer.airflow.sensors.XOSModelSensor
-import xossynchronizer.airflow.sensors.XOSEventSensor
+from airflow.operators import PythonOperator
+from cord_workflow_airflow_extensions.sensors import CORDEventSensor, CORDModelSensor
+from cord_workflow_airflow_extensions.operators import CORDModelOperator
from att_helpers import *
from att_service_instance_funcs import *
+
args = {
'start_date': datetime.utcnow(),
'owner': 'ATT',
@@ -44,39 +44,67 @@
dag_att.doc_md = __doc__
-def ONU_event(model_accessor, event, **kwargs):
+def ONU_event(model_accessor, message, **kwargs):
#context = kwargs
#run_id = context['dag_run'].run_id
- logging.info("onu.events: received event", event=event)
+ logging.info('onu.events: received event', message=message)
- si = find_or_create_att_si(model_accessor, logging, event)
- if event["status"] == "activated":
- logging.info("onu.events: activated onu", value=event)
+ si = find_or_create_att_si(model_accessor, logging, message)
+ if message['status'] == 'activated':
+ logging.info('onu.events: activated onu', message=message)
si.no_sync = False
- si.uni_port_id = long(event["portNumber"])
- si.of_dpid = event["deviceId"]
- si.oper_onu_status = "ENABLED"
+ si.uni_port_id = long(message['portNumber'])
+ si.of_dpid = message['deviceId']
+ si.oper_onu_status = 'ENABLED'
si.save_changed_fields(always_update_timestamp=True)
- elif event["status"] == "disabled":
- logging.info("onu.events: disabled onu, resetting the subscriber", value=event)
- si.oper_onu_status = "DISABLED"
+ elif message['status'] == 'disabled':
+ logging.info('onu.events: disabled onu, resetting the subscriber', value=message)
+ si.oper_onu_status = 'DISABLED'
si.save_changed_fields(always_update_timestamp=True)
else:
- logging.warn("onu.events: Unknown status value: %s" % event["status"], value=event)
- raise AirflowException("onu.events: Unknown status value: %s" % event["status"], value=event)
+ logging.warn('onu.events: Unknown status value: %s' % message['status'], value=message)
+ raise AirflowException('onu.events: Unknown status value: %s' % message['status'], value=message)
-def DriverService_event(event_type, model_accessor, si, **kwargs):
+def AUTH_event(model_accessor, message, **kwargs):
#context = kwargs
#run_id = context['dag_run'].run_id
-
+
+ logging.info('authentication.events: Got event for subscriber', message=message)
+
+ si = find_or_create_att_si(model_accessor, logging, message)
+ logging.debug('authentication.events: Updating service instance', si=si)
+ si.authentication_state = message['authenticationState']
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DHCP_event(model_accessor, message, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ logging.info('dhcp.events: Got event for subscriber', message=message)
+
+ si = find_or_create_att_si(model_accessor, logging, message)
+ logging.debug('dhcp.events: Updating service instance', si=si)
+ si.dhcp_state = message['messageType']
+ si.ip_address = message['ipAddress']
+ si.mac_address = message['macAddress']
+ si.save_changed_fields(always_update_timestamp=True)
+
+
+def DriverService_event(model_accessor, message, si, **kwargs):
+ #context = kwargs
+ #run_id = context['dag_run'].run_id
+
+ event_type = message['event_type']
+
go = False
if event_type == 'create':
- logging.debug("MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s " % si.id)
+ logging.debug('MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s ' % si.id)
go = True
elif event_type == 'update':
- logging.debug("MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s " %
+ logging.debug('MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s ' %
(si.id), onu_state=si.admin_onu_state, authentication_state=si.authentication_state)
go = True
elif event_type == 'delete':
@@ -107,92 +135,103 @@
si.save_changed_fields()
-def Auth_event(model_accessor, event, **kwargs):
- #context = kwargs
- #run_id = context['dag_run'].run_id
-
- logging.info("authentication.events: Got event for subscriber", event_value=event)
-
- si = find_or_create_att_si(model_accessor, logging, event)
- logging.debug("authentication.events: Updating service instance", si=si)
- si.authentication_state = event["authenticationState"]
- si.save_changed_fields(always_update_timestamp=True)
-
-
-def DHCP_event(model_accessor, event, **kwargs):
- #context = kwargs
- #run_id = context['dag_run'].run_id
-
- logging.info("dhcp.events: Got event for subscriber", event_value=event)
-
- si = find_or_create_att_si(model_accessor, logging, event)
- logging.debug("dhcp.events: Updating service instance", si=si)
- si.dhcp_state = event["messageType"]
- si.ip_address = event["ipAddress"]
- si.mac_address = event["macAddress"]
- si.save_changed_fields(always_update_timestamp=True)
-
-
-onu_event_handler = XOSEventSensor(
- task_id='onu_event_handler',
+onu_event_sensor = CORDEventSensor(
+ task_id='onu_event_sensor',
topic='onu.events',
key_field='serialNumber',
- provide_context=True,
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_att
+)
+
+onu_event_handler = CORDModelOperator(
+ task_id='onu_event_handler',
python_callable=ONU_event,
- poke_interval=5,
- dag=dag_att,
+ cord_event_sensor_task_id='onu_event_sensor',
+ dag=dag_att
)
-onu_model_event_handler = XOSModelSensor(
- task_id='onu_model_event_handler',
- model_name='AttWorkflowDriverServiceInstance',
+auth_event_sensor = CORDEventSensor(
+ task_id='auth_event_sensor',
+ topic='authentication.events',
key_field='serialNumber',
- provide_context=True,
- python_callable=DriverService_event,
+ controller_conn_id='local_cord_controller',
poke_interval=5,
- dag=dag_att,
+ dag=dag_att
)
-auth_event_handler = XOSEventSensor(
+auth_event_handler = CORDModelOperator(
task_id='auth_event_handler',
- topic="authentication.events",
- key_field='serialNumber',
- provide_context=True,
- python_callable=Auth_event,
- poke_interval=5,
- dag=dag_att,
+ python_callable=AUTH_event,
+ cord_event_sensor_task_id='auth_event_sensor',
+ dag=dag_att
)
-auth_model_event_handler = XOSModelSensor(
- task_id='auth_model_event_handler',
- model_name='AttWorkflowDriverServiceInstance',
+dhcp_event_sensor = CORDEventSensor(
+ task_id='dhcp_event_sensor',
+ topic='dhcp.events',
key_field='serialNumber',
- provide_context=True,
- python_callable=DriverService_event,
+ controller_conn_id='local_cord_controller',
poke_interval=5,
- dag=dag_att,
+ dag=dag_att
)
-dhcp_event_handler = XOSEventSensor(
+dhcp_event_handler = CORDModelOperator(
task_id='dhcp_event_handler',
- topic="dhcp.events",
- key_field='serialNumber',
- provide_context=True,
python_callable=DHCP_event,
- poke_interval=5,
- dag=dag_att,
+ cord_event_sensor_task_id='dhcp_event_sensor',
+ dag=dag_att
)
-dhcp_model_event_handler = XOSModelSensor(
- task_id='dhcp_model_event_handler',
+att_model_event_sensor1 = CORDModelSensor(
+ task_id='att_model_event_sensor1',
model_name='AttWorkflowDriverServiceInstance',
key_field='serialNumber',
- provide_context=True,
- python_callable=DriverService_event,
+ controller_conn_id='local_cord_controller',
poke_interval=5,
- dag=dag_att,
+ dag=dag_att
)
-onu_event_handler >> onu_model_event_handler >> \
- auth_event_handler >> auth_model_event_handler >> \
- dhcp_event_handler >> dhcp_model_event_handler
\ No newline at end of file
+att_model_event_handler1 = CORDModelOperator(
+ task_id='att_model_event_handler1',
+ python_callable=DriverService_event,
+ cord_event_sensor_task_id='att_model_event_sensor1',
+ dag=dag_att
+)
+
+att_model_event_sensor2 = CORDModelSensor(
+ task_id='att_model_event_sensor2',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_att
+)
+
+att_model_event_handler2 = CORDModelOperator(
+ task_id='att_model_event_handler2',
+ python_callable=DriverService_event,
+ cord_event_sensor_task_id='att_model_event_sensor2',
+ dag=dag_att
+)
+
+att_model_event_sensor3 = CORDModelSensor(
+ task_id='att_model_event_sensor3',
+ model_name='AttWorkflowDriverServiceInstance',
+ key_field='serialNumber',
+ controller_conn_id='local_cord_controller',
+ poke_interval=5,
+ dag=dag_att
+)
+
+att_model_event_handler3 = CORDModelOperator(
+ task_id='att_model_event_handler3',
+ python_callable=DriverService_event,
+ cord_event_sensor_task_id='att_model_event_sensor3',
+ dag=dag_att
+)
+
+
+onu_event_sensor >> onu_event_handler >> att_model_event_sensor1 >> att_model_event_handler1 >> \
+ auth_event_sensor >> auth_event_handler >> att_model_event_sensor2 >> att_model_event_handler2 >> \
+ dhcp_event_sensor >> dhcp_event_handler >> att_model_event_sensor3 >> att_model_event_handler3