blob: a464176804fbd6c5278d55222cfd1be61a4d987d [file] [log] [blame]
Illyoung Choi5d59ab62019-06-24 16:15:27 -07001# Copyright 2019-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16Example AT&T workflow using Airflow
17"""
18import json
Illyoung Choi2e971512019-07-18 14:15:19 -070019import logging
Illyoung Choi5d59ab62019-06-24 16:15:27 -070020import airflow
Illyoung Choi2e971512019-07-18 14:15:19 -070021from datetime import datetime
Illyoung Choi5d59ab62019-06-24 16:15:27 -070022from airflow import DAG
23from airflow import AirflowException
Illyoung Choi2e971512019-07-18 14:15:19 -070024from airflow.operators import PythonOperator
25from cord_workflow_airflow_extensions.sensors import CORDEventSensor, CORDModelSensor
26from cord_workflow_airflow_extensions.operators import CORDModelOperator
Illyoung Choi5d59ab62019-06-24 16:15:27 -070027
28from att_helpers import *
29from att_service_instance_funcs import *
30
Illyoung Choi2e971512019-07-18 14:15:19 -070031
# Default arguments handed to the DAG below via default_args.
args = dict(
    owner='ATT',
    start_date=datetime.utcnow(),
)

# schedule_interval is None: this dag will be triggered by external systems.
dag_att = DAG(
    schedule_interval=None,
    default_args=args,
    dag_id='att_workflow_onu',
)

# Reuse the module docstring as the DAG's markdown documentation.
dag_att.doc_md = __doc__
45
46
def ONU_event(model_accessor, message, **kwargs):
    """Handle an 'onu.events' message by updating the AT&T service instance.

    The service instance is looked up (or created) through
    find_or_create_att_si before the status is inspected, so an instance
    may exist even when the status turns out to be unknown.

    Args:
        model_accessor: accessor passed through to find_or_create_att_si.
        message: event payload (mapping). 'status' is always read;
            'portNumber' and 'deviceId' are read when status is 'activated'.
        **kwargs: additional keyword arguments from the caller; unused.

    Raises:
        AirflowException: if message['status'] is neither 'activated'
            nor 'disabled'.
    """
    # NOTE: the stdlib logging functions reject arbitrary keyword arguments
    # (only exc_info/extra/stack_info are accepted), so the payload is
    # passed as a lazy %-format argument instead of message=...
    logging.info('onu.events: received event: %s', message)

    si = find_or_create_att_si(model_accessor, logging, message)
    status = message['status']

    if status == 'activated':
        logging.info('onu.events: activated onu: %s', message)
        si.no_sync = False
        # int() instead of the Python 2-only long(); on Python 2 int()
        # promotes to long automatically for large values.
        si.uni_port_id = int(message['portNumber'])
        si.of_dpid = message['deviceId']
        si.oper_onu_status = 'ENABLED'
        si.save_changed_fields(always_update_timestamp=True)
    elif status == 'disabled':
        logging.info('onu.events: disabled onu, resetting the subscriber: %s', message)
        si.oper_onu_status = 'DISABLED'
        si.save_changed_fields(always_update_timestamp=True)
    else:
        logging.warning('onu.events: Unknown status value: %s (event: %s)', status, message)
        # Exceptions take no keyword arguments; passing value=... would
        # raise TypeError instead of the intended AirflowException.
        raise AirflowException('onu.events: Unknown status value: %s' % status)
Illyoung Choi5d59ab62019-06-24 16:15:27 -070068
69
def AUTH_event(model_accessor, message, **kwargs):
    """Handle an 'authentication.events' message.

    Looks up (or creates) the service instance via find_or_create_att_si,
    copies message['authenticationState'] into its authentication_state
    field and saves the changed fields.

    Args:
        model_accessor: accessor passed through to find_or_create_att_si.
        message: event payload (mapping); 'authenticationState' is read.
        **kwargs: additional keyword arguments from the caller; unused.
    """
    # stdlib logging rejects arbitrary keyword arguments, so the context
    # objects are passed as lazy %-format arguments.
    logging.info('authentication.events: Got event for subscriber: %s', message)

    si = find_or_create_att_si(model_accessor, logging, message)
    logging.debug('authentication.events: Updating service instance: %s', si)
    si.authentication_state = message['authenticationState']
    si.save_changed_fields(always_update_timestamp=True)
80
81
def DHCP_event(model_accessor, message, **kwargs):
    """Handle a 'dhcp.events' message.

    Looks up (or creates) the service instance via find_or_create_att_si,
    stores the DHCP message type, IP address and MAC address from the
    payload on it and saves the changed fields.

    Args:
        model_accessor: accessor passed through to find_or_create_att_si.
        message: event payload (mapping); 'messageType', 'ipAddress' and
            'macAddress' are read.
        **kwargs: additional keyword arguments from the caller; unused.
    """
    # stdlib logging rejects arbitrary keyword arguments, so the context
    # objects are passed as lazy %-format arguments.
    logging.info('dhcp.events: Got event for subscriber: %s', message)

    si = find_or_create_att_si(model_accessor, logging, message)
    logging.debug('dhcp.events: Updating service instance: %s', si)
    si.dhcp_state = message['messageType']
    si.ip_address = message['ipAddress']
    si.mac_address = message['macAddress']
    si.save_changed_fields(always_update_timestamp=True)
94
95
def DriverService_event(model_accessor, message, si, **kwargs):
    """React to a model event on an AttWorkflowDriverServiceInstance.

    Only 'create' and 'update' events are processed; any other event type
    (including 'delete') returns immediately without touching ``si``.

    Args:
        model_accessor: accessor passed to process_onu_state,
            get_subscriber and update_subscriber.
        message: event payload (mapping); 'event_type' is read.
        si: the service instance the event refers to.
        **kwargs: additional keyword arguments from the caller; unused.
    """
    event_type = message['event_type']

    # stdlib logging rejects arbitrary keyword arguments, so the state
    # values are rendered through lazy %-format arguments instead.
    if event_type == 'create':
        logging.debug(
            'MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s ',
            si.id)
    elif event_type == 'update':
        logging.debug(
            'MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s '
            '(onu_state=%s, authentication_state=%s)',
            si.id, si.admin_onu_state, si.authentication_state)
    else:
        # handle only create & update events
        return

    # Changing ONU state can change auth state
    # Changing auth state can change DHCP state
    # So need to process in this order
    process_onu_state(model_accessor, si)
    process_auth_state(si)
    process_dhcp_state(si)

    validate_states(si)

    # handling the subscriber status
    # It's a combination of all the other states
    subscriber = get_subscriber(model_accessor, si.serial_number)
    if subscriber:
        update_subscriber(model_accessor, subscriber, si)

    si.save_changed_fields()
136
137
# Keyword arguments shared by every sensor in this DAG.
_sensor_common = dict(
    key_field='serialNumber',
    controller_conn_id='local_cord_controller',
    poke_interval=5,
    dag=dag_att,
)

# --- ONU events -----------------------------------------------------------
onu_event_sensor = CORDEventSensor(
    task_id='onu_event_sensor',
    topic='onu.events',
    **_sensor_common
)

onu_event_handler = CORDModelOperator(
    dag=dag_att,
    task_id='onu_event_handler',
    cord_event_sensor_task_id='onu_event_sensor',
    python_callable=ONU_event,
)

# --- Authentication events ------------------------------------------------
auth_event_sensor = CORDEventSensor(
    task_id='auth_event_sensor',
    topic='authentication.events',
    **_sensor_common
)

auth_event_handler = CORDModelOperator(
    dag=dag_att,
    task_id='auth_event_handler',
    cord_event_sensor_task_id='auth_event_sensor',
    python_callable=AUTH_event,
)

# --- DHCP events ----------------------------------------------------------
dhcp_event_sensor = CORDEventSensor(
    task_id='dhcp_event_sensor',
    topic='dhcp.events',
    **_sensor_common
)

dhcp_event_handler = CORDModelOperator(
    dag=dag_att,
    task_id='dhcp_event_handler',
    cord_event_sensor_task_id='dhcp_event_sensor',
    python_callable=DHCP_event,
)

# --- Model events on AttWorkflowDriverServiceInstance ---------------------
# Three sensor/handler pairs with the same configuration; each pair has its
# own task ids and is wired after one of the event handlers above.
att_model_event_sensor1 = CORDModelSensor(
    task_id='att_model_event_sensor1',
    model_name='AttWorkflowDriverServiceInstance',
    **_sensor_common
)

att_model_event_handler1 = CORDModelOperator(
    dag=dag_att,
    task_id='att_model_event_handler1',
    cord_event_sensor_task_id='att_model_event_sensor1',
    python_callable=DriverService_event,
)

att_model_event_sensor2 = CORDModelSensor(
    task_id='att_model_event_sensor2',
    model_name='AttWorkflowDriverServiceInstance',
    **_sensor_common
)

att_model_event_handler2 = CORDModelOperator(
    dag=dag_att,
    task_id='att_model_event_handler2',
    cord_event_sensor_task_id='att_model_event_sensor2',
    python_callable=DriverService_event,
)

att_model_event_sensor3 = CORDModelSensor(
    task_id='att_model_event_sensor3',
    model_name='AttWorkflowDriverServiceInstance',
    **_sensor_common
)

att_model_event_handler3 = CORDModelOperator(
    dag=dag_att,
    task_id='att_model_event_handler3',
    cord_event_sensor_task_id='att_model_event_sensor3',
    python_callable=DriverService_event,
)
233
234
# Linear pipeline: each event sensor feeds its handler, followed by a model
# sensor/handler pair, in the order ONU -> authentication -> DHCP.
(
    onu_event_sensor >> onu_event_handler
    >> att_model_event_sensor1 >> att_model_event_handler1
    >> auth_event_sensor >> auth_event_handler
    >> att_model_event_sensor2 >> att_model_event_handler2
    >> dhcp_event_sensor >> dhcp_event_handler
    >> att_model_event_sensor3 >> att_model_event_handler3
)