blob: 190688179614b5bc96af8743007ebae1903a70d6 [file] [log] [blame]
# Copyright 2019-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15"""
16Example AT&T workflow using Airflow
17"""
18import json
19import logging
20from datetime import datetime
21
22import airflow
23from airflow import DAG
24from airflow import AirflowException
25
26import xossynchronizer.airflow.sensors.XOSModelSensor
27import xossynchronizer.airflow.sensors.XOSEventSensor
28
29from att_helpers import *
30from att_service_instance_funcs import *
31
# Default arguments handed to both DAGs below.
args = {
    # Use a fixed date in the past. Airflow recommends against dynamic
    # values such as datetime.utcnow() here: the DAG file is re-parsed
    # repeatedly, so a moving start_date can end up later than the
    # execution_date of an externally triggered run.
    'start_date': datetime(2019, 1, 1),
    'owner': 'ATT',
}

dag_att1 = DAG(
    dag_id='att_workflow_onu1',
    default_args=args,
    # this dag will be triggered by external systems
    schedule_interval=None,
)

dag_att2 = DAG(
    dag_id='att_workflow_onu2',
    default_args=args,
    # this dag will be triggered by external systems
    schedule_interval=None,
)

# Reuse the module docstring as the documentation of both DAGs.
dag_att1.doc_md = __doc__
dag_att2.doc_md = __doc__
53
def ONU_event(model_accessor, event, **kwargs):
    """Handle a message from the ``onu.events`` topic.

    Obtains the service instance for the event through
    ``find_or_create_att_si`` and records the ONU status reported by the
    event on it.

    Args:
        model_accessor: passed through to ``find_or_create_att_si``.
        event: mapping with a ``"status"`` key. ``"activated"`` events must
            also carry ``"portNumber"`` and ``"deviceId"``.
        **kwargs: extra keyword arguments from the caller; unused.

    Raises:
        AirflowException: if ``event["status"]`` is neither ``"activated"``
            nor ``"disabled"``.
    """
    # The stdlib logging functions reject arbitrary keyword arguments
    # (TypeError), so values are passed as lazy %-style arguments.
    logging.info("onu.events: received event: %s", event)

    si = find_or_create_att_si(model_accessor, logging, event)
    status = event["status"]
    if status == "activated":
        logging.info("onu.events: activated onu: %s", event)
        si.no_sync = False
        # int() replaces the Python-2-only long(); it accepts the same input.
        si.uni_port_id = int(event["portNumber"])
        si.of_dpid = event["deviceId"]
        si.oper_onu_status = "ENABLED"
        si.save_changed_fields(always_update_timestamp=True)
    elif status == "disabled":
        logging.info("onu.events: disabled onu, resetting the subscriber: %s", event)
        si.oper_onu_status = "DISABLED"
        si.save_changed_fields(always_update_timestamp=True)
    else:
        message = "onu.events: Unknown status value: %s" % status
        logging.warning("%s (event: %s)", message, event)
        # Exception constructors take no keyword arguments; passing
        # value=event raised TypeError instead of AirflowException.
        raise AirflowException(message)
75
76
def DriverService_event(event_type, model_accessor, si, **kwargs):
    """Handle a model event for an AttWorkflowDriverServiceInstance.

    Only ``'create'`` and ``'update'`` events are processed; any other
    event type (including ``'delete'``) returns immediately.

    Args:
        event_type: ``'create'``, ``'update'``, ``'delete'`` or anything else.
        model_accessor: passed through to the state and subscriber helpers.
        si: the service instance the event refers to.
        **kwargs: extra keyword arguments from the caller; unused.
    """
    if event_type == 'create':
        logging.debug(
            "MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s ",
            si.id)
    elif event_type == 'update':
        # The stdlib logging functions reject arbitrary keyword arguments
        # (TypeError), so the states are folded into the message.
        logging.debug(
            "MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s "
            "onu_state=%s authentication_state=%s",
            si.id, si.admin_onu_state, si.authentication_state)
    else:
        # handle only create & update events
        return

    # Changing ONU state can change auth state
    # Changing auth state can change DHCP state
    # So need to process in this order
    process_onu_state(model_accessor, si)
    process_auth_state(si)
    process_dhcp_state(si)

    validate_states(si)

    # handling the subscriber status
    # It's a combination of all the other states
    subscriber = get_subscriber(model_accessor, si.serial_number)
    if subscriber:
        update_subscriber(model_accessor, subscriber, si)

    si.save_changed_fields()
115
116
def Auth_event(model_accessor, event, **kwargs):
    """Handle a message from the ``authentication.events`` topic.

    Obtains the service instance for the event through
    ``find_or_create_att_si`` and stores the reported authentication state
    on it.

    Args:
        model_accessor: passed through to ``find_or_create_att_si``.
        event: mapping that must contain ``"authenticationState"``.
        **kwargs: extra keyword arguments from the caller; unused.
    """
    # The stdlib logging functions reject arbitrary keyword arguments
    # (TypeError), so values are passed as lazy %-style arguments.
    logging.info("authentication.events: Got event for subscriber: %s", event)

    si = find_or_create_att_si(model_accessor, logging, event)
    logging.debug("authentication.events: Updating service instance: %s", si)
    si.authentication_state = event["authenticationState"]
    si.save_changed_fields(always_update_timestamp=True)
127
128
def DHCP_event(model_accessor, event, **kwargs):
    """Handle a message from the ``dhcp.events`` topic.

    Obtains the service instance for the event through
    ``find_or_create_att_si`` and copies the DHCP message type, IP address
    and MAC address from the event onto it.

    Args:
        model_accessor: passed through to ``find_or_create_att_si``.
        event: mapping that must contain ``"messageType"``, ``"ipAddress"``
            and ``"macAddress"``.
        **kwargs: extra keyword arguments from the caller; unused.
    """
    # The stdlib logging functions reject arbitrary keyword arguments
    # (TypeError), so values are passed as lazy %-style arguments.
    logging.info("dhcp.events: Got event for subscriber: %s", event)

    si = find_or_create_att_si(model_accessor, logging, event)
    logging.debug("dhcp.events: Updating service instance: %s", si)
    si.dhcp_state = event["messageType"]
    si.ip_address = event["ipAddress"]
    si.mac_address = event["macAddress"]
    si.save_changed_fields(always_update_timestamp=True)
141
142
# NOTE(review): the `import xossynchronizer.airflow.sensors.X...` statements
# at the top of the file bind only the name `xossynchronizer`, so
# XOSEventSensor / XOSModelSensor are presumably supplied by one of the
# star imports -- confirm.

# --- DAG 1: ONU events followed by the driver service-instance model ---
onu_event_handler = XOSEventSensor(
    dag=dag_att1,
    task_id='onu_event_handler',
    topic='onu.events',
    key_field='serialNumber',
    python_callable=ONU_event,
    provide_context=True,
    poke_interval=5,
)

onu_model_event_handler = XOSModelSensor(
    dag=dag_att1,
    task_id='onu_model_event_handler',
    model_name='AttWorkflowDriverServiceInstance',
    key_field='serialNumber',
    python_callable=DriverService_event,
    provide_context=True,
    poke_interval=5,
)

# --- DAG 2: authentication events followed by the same model ---
auth_event_handler = XOSEventSensor(
    dag=dag_att2,
    task_id='auth_event_handler',
    topic='authentication.events',
    key_field='serialNumber',
    python_callable=Auth_event,
    provide_context=True,
    poke_interval=5,
)

auth_model_event_handler = XOSModelSensor(
    dag=dag_att2,
    task_id='auth_model_event_handler',
    model_name='AttWorkflowDriverServiceInstance',
    key_field='serialNumber',
    python_callable=DriverService_event,
    provide_context=True,
    poke_interval=5,
)

# In each DAG the event sensor runs upstream of its model sensor.
onu_event_handler.set_downstream(onu_model_event_handler)
auth_event_handler.set_downstream(auth_model_event_handler)