# Copyright 2019-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
"""
Example parallel workflow
"""
import json
import logging
from datetime import datetime

from airflow import DAG
from airflow import AirflowException
from airflow.operators.python_operator import PythonOperator
from airflow.sensors.cord_workflow_plugin import CORDEventSensor, CORDModelSensor
from airflow.operators.cord_workflow_plugin import CORDModelOperator
26
27
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Default arguments handed to every DAG defined in this file.
args = {
    # hard coded date
    'start_date': datetime(2019, 1, 1),
    'owner': 'iychoi',
}
34
# DAG that holds the ONU / authentication / DHCP event tasks.
dag_parallel_cord = DAG(
    dag_id='parallel_cord_workflow',
    default_args=args,
    # No schedule: this dag will be triggered by external systems.
    schedule_interval=None,
)
# Reuse the module docstring as the DAG's Markdown documentation.
dag_parallel_cord.doc_md = __doc__
42
# DAG that holds the AttWorkflowDriverServiceInstance model-event tasks.
dag_parallel_cord_admin = DAG(
    dag_id='parallel_cord_workflow_admin',
    default_args=args,
    # No schedule: this dag will be triggered by external systems.
    schedule_interval=None,
)
# Reuse the module docstring as the DAG's Markdown documentation.
dag_parallel_cord_admin.doc_md = __doc__
51
def on_onu_event(model_accessor, message, **kwargs):
    """Log an event delivered for the 'onu.events' topic.

    Args:
        model_accessor: Passed in by the invoking operator; unused here.
        message: The received event payload; it is only logged.
        **kwargs: Any further keyword arguments from the caller; ignored.
    """
    # Hand the payload to the logger as an argument rather than formatting
    # it eagerly with '%': formatting is deferred until the record is
    # actually emitted, and a tuple payload can no longer break the format.
    log.info('onu.events: received an event - %s', message)
54
55
def on_auth_event(model_accessor, message, **kwargs):
    """Log an event delivered for the 'authentication.events' topic.

    Args:
        model_accessor: Passed in by the invoking operator; unused here.
        message: The received event payload; it is only logged.
        **kwargs: Any further keyword arguments from the caller; ignored.
    """
    # Hand the payload to the logger as an argument rather than formatting
    # it eagerly with '%': formatting is deferred until the record is
    # actually emitted, and a tuple payload can no longer break the format.
    log.info('authentication.events: received an event - %s', message)
58
59
def on_dhcp_event(model_accessor, message, **kwargs):
    """Log an event delivered for the 'dhcp.events' topic.

    Args:
        model_accessor: Passed in by the invoking operator; unused here.
        message: The received event payload; it is only logged.
        **kwargs: Any further keyword arguments from the caller; ignored.
    """
    # Hand the payload to the logger as an argument rather than formatting
    # it eagerly with '%': formatting is deferred until the record is
    # actually emitted, and a tuple payload can no longer break the format.
    log.info('dhcp.events: received an event - %s', message)
62
63
def on_model_event(model_accessor, message, **kwargs):
    """Log a received model event.

    Args:
        model_accessor: Passed in by the invoking operator; unused here.
        message: The received event payload; it is only logged.
        **kwargs: Any further keyword arguments from the caller; ignored.
    """
    # Hand the payload to the logger as an argument rather than formatting
    # it eagerly with '%': formatting is deferred until the record is
    # actually emitted, and a tuple payload can no longer break the format.
    log.info('model event: received an event - %s', message)
66
67
# Sensor task for the 'onu.events' topic in the standard workflow DAG.
onu_event_sensor = CORDEventSensor(
    task_id='onu_event_sensor',
    topic='onu.events',
    key_field='serialNumber',
    controller_conn_id='local_cord_controller',
    poke_interval=5,
    dag=dag_parallel_cord,
)
76
# Handler task: runs on_onu_event, paired with the 'onu_event_sensor' task.
onu_event_handler = CORDModelOperator(
    task_id='onu_event_handler',
    python_callable=on_onu_event,
    cord_event_sensor_task_id='onu_event_sensor',
    dag=dag_parallel_cord,
)
83
# Sensor task for the 'authentication.events' topic in the standard
# workflow DAG.
auth_event_sensor = CORDEventSensor(
    task_id='auth_event_sensor',
    topic='authentication.events',
    key_field='serialNumber',
    controller_conn_id='local_cord_controller',
    poke_interval=5,
    dag=dag_parallel_cord,
)
92
# Handler task: runs on_auth_event, paired with the 'auth_event_sensor' task.
auth_event_handler = CORDModelOperator(
    task_id='auth_event_handler',
    python_callable=on_auth_event,
    cord_event_sensor_task_id='auth_event_sensor',
    dag=dag_parallel_cord,
)
99
# Sensor task for the 'dhcp.events' topic in the standard workflow DAG.
dhcp_event_sensor = CORDEventSensor(
    task_id='dhcp_event_sensor',
    topic='dhcp.events',
    key_field='serialNumber',
    controller_conn_id='local_cord_controller',
    poke_interval=5,
    dag=dag_parallel_cord,
)
108
# Handler task: runs on_dhcp_event, paired with the 'dhcp_event_sensor' task.
dhcp_event_handler = CORDModelOperator(
    task_id='dhcp_event_handler',
    python_callable=on_dhcp_event,
    cord_event_sensor_task_id='dhcp_event_sensor',
    dag=dag_parallel_cord,
)
115
# Sensor task for the 'AttWorkflowDriverServiceInstance' model in the
# admin workflow DAG.
att_model_event_sensor = CORDModelSensor(
    task_id='att_model_event_sensor',
    model_name='AttWorkflowDriverServiceInstance',
    key_field='serialNumber',
    controller_conn_id='local_cord_controller',
    poke_interval=5,
    dag=dag_parallel_cord_admin,
)
124
# Handler task: runs on_model_event, paired with the
# 'att_model_event_sensor' task.
att_model_event_handler = CORDModelOperator(
    task_id='att_model_event_handler',
    python_callable=on_model_event,
    cord_event_sensor_task_id='att_model_event_sensor',
    dag=dag_parallel_cord_admin,
)
131
# handle standard flow: each sensor feeds its handler, and each handler
# gates the next sensor in the chain (onu -> auth -> dhcp).
onu_event_sensor >> onu_event_handler
onu_event_handler >> auth_event_sensor
auth_event_sensor >> auth_event_handler
auth_event_handler >> dhcp_event_sensor
dhcp_event_sensor >> dhcp_event_handler

# handle admin flow
att_model_event_sensor >> att_model_event_handler