Illyoung Choi | 18e656a | 2019-07-30 11:27:36 -0700 | [diff] [blame] | 1 | # Copyright 2019-present Open Networking Foundation |
| 2 | # |
| 3 | # Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | # you may not use this file except in compliance with the License. |
| 5 | # You may obtain a copy of the License at |
| 6 | # |
| 7 | # http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | # |
| 9 | # Unless required by applicable law or agreed to in writing, software |
| 10 | # distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | # See the License for the specific language governing permissions and |
| 13 | # limitations under the License. |
| 14 | |
| 15 | """ |
Illyoung Choi | 4fed65e | 2019-07-31 13:06:24 -0700 | [diff] [blame^] | 16 | Example parallel workflow |
Illyoung Choi | 18e656a | 2019-07-30 11:27:36 -0700 | [diff] [blame] | 17 | """ |
Illyoung Choi | 4fed65e | 2019-07-31 13:06:24 -0700 | [diff] [blame^] | 18 | import json |
Illyoung Choi | 18e656a | 2019-07-30 11:27:36 -0700 | [diff] [blame] | 19 | import logging |
| 20 | from datetime import datetime |
| 21 | from airflow import DAG |
Illyoung Choi | 4fed65e | 2019-07-31 13:06:24 -0700 | [diff] [blame^] | 22 | from airflow import AirflowException |
| 23 | from airflow.operators.python_operator import PythonOperator |
Illyoung Choi | 18e656a | 2019-07-30 11:27:36 -0700 | [diff] [blame] | 24 | from airflow.sensors.cord_workflow_plugin import CORDEventSensor, CORDModelSensor |
| 25 | from airflow.operators.cord_workflow_plugin import CORDModelOperator |
| 26 | |
| 27 | |
| 28 | log = logging.getLogger(__name__) |
Illyoung Choi | 18e656a | 2019-07-30 11:27:36 -0700 | [diff] [blame] | 29 | args = { |
| 30 | # hard coded date |
| 31 | 'start_date': datetime(2019, 1, 1), |
| 32 | 'owner': 'iychoi' |
| 33 | } |
| 34 | |
Illyoung Choi | 4fed65e | 2019-07-31 13:06:24 -0700 | [diff] [blame^] | 35 | dag_parallel_cord = DAG( |
| 36 | dag_id='parallel_cord_workflow', |
Illyoung Choi | 18e656a | 2019-07-30 11:27:36 -0700 | [diff] [blame] | 37 | default_args=args, |
| 38 | # this dag will be triggered by external systems |
| 39 | schedule_interval=None |
| 40 | ) |
Illyoung Choi | 4fed65e | 2019-07-31 13:06:24 -0700 | [diff] [blame^] | 41 | dag_parallel_cord.doc_md = __doc__ |
| 42 | |
| 43 | dag_parallel_cord_admin = DAG( |
| 44 | dag_id='parallel_cord_workflow_admin', |
| 45 | default_args=args, |
| 46 | # this dag will be triggered by external systems |
| 47 | schedule_interval=None |
| 48 | ) |
| 49 | dag_parallel_cord_admin.doc_md = __doc__ |
Illyoung Choi | 18e656a | 2019-07-30 11:27:36 -0700 | [diff] [blame] | 50 | |
| 51 | |
| 52 | def on_onu_event(model_accessor, message, **kwargs): |
| 53 | log.info('onu.events: received an event - %s' % message) |
| 54 | |
| 55 | |
| 56 | def on_auth_event(model_accessor, message, **kwargs): |
| 57 | log.info('authentication.events: received an event - %s' % message) |
| 58 | |
| 59 | |
| 60 | def on_dhcp_event(model_accessor, message, **kwargs): |
| 61 | log.info('dhcp.events: received an event - %s' % message) |
| 62 | |
| 63 | |
| 64 | def on_model_event(model_accessor, message, **kwargs): |
| 65 | log.info('model event: received an event - %s' % message) |
| 66 | |
| 67 | |
| 68 | onu_event_sensor = CORDEventSensor( |
| 69 | task_id='onu_event_sensor', |
| 70 | topic='onu.events', |
| 71 | key_field='serialNumber', |
| 72 | controller_conn_id='local_cord_controller', |
| 73 | poke_interval=5, |
Illyoung Choi | 4fed65e | 2019-07-31 13:06:24 -0700 | [diff] [blame^] | 74 | dag=dag_parallel_cord |
Illyoung Choi | 18e656a | 2019-07-30 11:27:36 -0700 | [diff] [blame] | 75 | ) |
| 76 | |
| 77 | onu_event_handler = CORDModelOperator( |
| 78 | task_id='onu_event_handler', |
| 79 | python_callable=on_onu_event, |
| 80 | cord_event_sensor_task_id='onu_event_sensor', |
Illyoung Choi | 4fed65e | 2019-07-31 13:06:24 -0700 | [diff] [blame^] | 81 | dag=dag_parallel_cord |
Illyoung Choi | 18e656a | 2019-07-30 11:27:36 -0700 | [diff] [blame] | 82 | ) |
| 83 | |
| 84 | auth_event_sensor = CORDEventSensor( |
| 85 | task_id='auth_event_sensor', |
| 86 | topic='authentication.events', |
| 87 | key_field='serialNumber', |
| 88 | controller_conn_id='local_cord_controller', |
| 89 | poke_interval=5, |
Illyoung Choi | 4fed65e | 2019-07-31 13:06:24 -0700 | [diff] [blame^] | 90 | dag=dag_parallel_cord |
Illyoung Choi | 18e656a | 2019-07-30 11:27:36 -0700 | [diff] [blame] | 91 | ) |
| 92 | |
| 93 | auth_event_handler = CORDModelOperator( |
| 94 | task_id='auth_event_handler', |
| 95 | python_callable=on_auth_event, |
| 96 | cord_event_sensor_task_id='auth_event_sensor', |
Illyoung Choi | 4fed65e | 2019-07-31 13:06:24 -0700 | [diff] [blame^] | 97 | dag=dag_parallel_cord |
Illyoung Choi | 18e656a | 2019-07-30 11:27:36 -0700 | [diff] [blame] | 98 | ) |
| 99 | |
| 100 | dhcp_event_sensor = CORDEventSensor( |
| 101 | task_id='dhcp_event_sensor', |
| 102 | topic='dhcp.events', |
| 103 | key_field='serialNumber', |
| 104 | controller_conn_id='local_cord_controller', |
| 105 | poke_interval=5, |
Illyoung Choi | 4fed65e | 2019-07-31 13:06:24 -0700 | [diff] [blame^] | 106 | dag=dag_parallel_cord |
Illyoung Choi | 18e656a | 2019-07-30 11:27:36 -0700 | [diff] [blame] | 107 | ) |
| 108 | |
| 109 | dhcp_event_handler = CORDModelOperator( |
| 110 | task_id='dhcp_event_handler', |
| 111 | python_callable=on_dhcp_event, |
| 112 | cord_event_sensor_task_id='dhcp_event_sensor', |
Illyoung Choi | 4fed65e | 2019-07-31 13:06:24 -0700 | [diff] [blame^] | 113 | dag=dag_parallel_cord |
Illyoung Choi | 18e656a | 2019-07-30 11:27:36 -0700 | [diff] [blame] | 114 | ) |
| 115 | |
Illyoung Choi | 4fed65e | 2019-07-31 13:06:24 -0700 | [diff] [blame^] | 116 | att_model_event_sensor = CORDModelSensor( |
| 117 | task_id='att_model_event_sensor', |
| 118 | model_name='AttWorkflowDriverServiceInstance', |
Illyoung Choi | 18e656a | 2019-07-30 11:27:36 -0700 | [diff] [blame] | 119 | key_field='serialNumber', |
| 120 | controller_conn_id='local_cord_controller', |
| 121 | poke_interval=5, |
Illyoung Choi | 4fed65e | 2019-07-31 13:06:24 -0700 | [diff] [blame^] | 122 | dag=dag_parallel_cord_admin |
Illyoung Choi | 18e656a | 2019-07-30 11:27:36 -0700 | [diff] [blame] | 123 | ) |
| 124 | |
Illyoung Choi | 4fed65e | 2019-07-31 13:06:24 -0700 | [diff] [blame^] | 125 | att_model_event_handler = CORDModelOperator( |
| 126 | task_id='att_model_event_handler', |
Illyoung Choi | 18e656a | 2019-07-30 11:27:36 -0700 | [diff] [blame] | 127 | python_callable=on_model_event, |
Illyoung Choi | 4fed65e | 2019-07-31 13:06:24 -0700 | [diff] [blame^] | 128 | cord_event_sensor_task_id='att_model_event_sensor', |
| 129 | dag=dag_parallel_cord_admin |
Illyoung Choi | 18e656a | 2019-07-30 11:27:36 -0700 | [diff] [blame] | 130 | ) |
| 131 | |
Illyoung Choi | 4fed65e | 2019-07-31 13:06:24 -0700 | [diff] [blame^] | 132 | # handle standard flow |
| 133 | onu_event_sensor >> onu_event_handler >> auth_event_sensor >> auth_event_handler >> dhcp_event_sensor >> dhcp_event_handler |
| 134 | # handle admin flow |
| 135 | att_model_event_sensor >> att_model_event_handler |
Illyoung Choi | 18e656a | 2019-07-30 11:27:36 -0700 | [diff] [blame] | 136 | |