# Copyright 2019-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15"""
16Example AT&T workflow using Airflow
17"""
18import json
19import logging
20from datetime import datetime
21
22import airflow
23from airflow import DAG
24from airflow import AirflowException
25
26import xossynchronizer.airflow.sensors.XOSModelSensor
27import xossynchronizer.airflow.sensors.XOSEventSensor
28
29from att_helpers import *
30from att_service_instance_funcs import *
31
# Arguments handed to the DAG as ``default_args``.
# NOTE(review): start_date is re-evaluated every time this file is parsed;
# Airflow's documentation advises a fixed start_date -- confirm this is intended.
args = dict(
    start_date=datetime.utcnow(),
    owner='ATT',
)

# schedule_interval=None: this dag will be triggered by external systems.
dag_att = DAG(dag_id='att_workflow_onu', default_args=args, schedule_interval=None)

# Reuse the module docstring as the DAG's markdown documentation.
dag_att.doc_md = __doc__
45
46
def ONU_event(model_accessor, event, **kwargs):
    """Handle one message from the ``onu.events`` topic.

    Looks up (or creates) the service instance for the event through
    ``find_or_create_att_si`` and records the ONU state reported by the event.

    Args:
        model_accessor: passed through to ``find_or_create_att_si``.
        event: mapping describing the ONU event. ``event["status"]`` is
            always read; ``event["portNumber"]`` and ``event["deviceId"]``
            are read when the status is ``"activated"``.
        **kwargs: accepted and ignored (presumably the Airflow task context).

    Raises:
        AirflowException: if ``event["status"]`` is neither ``"activated"``
            nor ``"disabled"``.
    """
    # The stdlib logger rejects arbitrary keyword arguments, so the event is
    # passed as a lazy %-style argument instead of ``event=...``.
    logging.info("onu.events: received event: %s", event)

    # The lookup happens before the status check, exactly as before, so the
    # service instance is found/created even for an unknown status.
    si = find_or_create_att_si(model_accessor, logging, event)
    status = event["status"]

    if status == "activated":
        logging.info("onu.events: activated onu: %s", event)
        si.no_sync = False
        # int() is available on both Python 2 and 3 (long() is Python 2 only).
        si.uni_port_id = int(event["portNumber"])
        si.of_dpid = event["deviceId"]
        si.oper_onu_status = "ENABLED"
        si.save_changed_fields(always_update_timestamp=True)
    elif status == "disabled":
        logging.info("onu.events: disabled onu, resetting the subscriber: %s", event)
        si.oper_onu_status = "DISABLED"
        si.save_changed_fields(always_update_timestamp=True)
    else:
        message = "onu.events: Unknown status value: %s" % status
        logging.warning("%s (event: %s)", message, event)
        # Exceptions take no keyword arguments; pass the message only.
        raise AirflowException(message)
68
69
def DriverService_event(event_type, model_accessor, si, **kwargs):
    """React to a model event on an AttWorkflowDriverServiceInstance.

    Only ``'create'`` and ``'update'`` events are processed. Any other
    ``event_type`` (including ``'delete'``) returns immediately without
    touching ``si``.

    Args:
        event_type: kind of model event: ``'create'``, ``'update'``,
            ``'delete'`` or anything else.
        model_accessor: passed to ``process_onu_state``, ``get_subscriber``
            and ``update_subscriber``.
        si: the service instance being processed; its ``id``,
            ``admin_onu_state``, ``authentication_state`` and
            ``serial_number`` are read and ``save_changed_fields()`` is called.
        **kwargs: accepted and ignored (presumably the Airflow task context).
    """
    # The stdlib logger rejects arbitrary keyword arguments, so the state
    # values are folded into the message as lazy %-style arguments.
    if event_type == 'create':
        logging.debug(
            "MODEL_POLICY: handle_create for AttWorkflowDriverServiceInstance %s ",
            si.id)
    elif event_type == 'update':
        logging.debug(
            "MODEL_POLICY: handle_update for AttWorkflowDriverServiceInstance %s "
            "(onu_state=%s, authentication_state=%s)",
            si.id, si.admin_onu_state, si.authentication_state)
    else:
        # handle only create & update events
        return

    # Changing ONU state can change auth state
    # Changing auth state can change DHCP state
    # So need to process in this order
    process_onu_state(model_accessor, si)
    process_auth_state(si)
    process_dhcp_state(si)

    validate_states(si)

    # handling the subscriber status
    # It's a combination of all the other states
    subscriber = get_subscriber(model_accessor, si.serial_number)
    if subscriber:
        update_subscriber(model_accessor, subscriber, si)

    si.save_changed_fields()
108
109
def Auth_event(model_accessor, event, **kwargs):
    """Handle one message from the ``authentication.events`` topic.

    Looks up (or creates) the service instance for the event through
    ``find_or_create_att_si`` and stores the reported authentication state.

    Args:
        model_accessor: passed through to ``find_or_create_att_si``.
        event: mapping describing the event; ``event["authenticationState"]``
            is read.
        **kwargs: accepted and ignored (presumably the Airflow task context).
    """
    # The stdlib logger rejects arbitrary keyword arguments, so values are
    # passed as lazy %-style arguments.
    logging.info("authentication.events: Got event for subscriber: %s", event)

    si = find_or_create_att_si(model_accessor, logging, event)
    logging.debug("authentication.events: Updating service instance: %s", si)
    si.authentication_state = event["authenticationState"]
    si.save_changed_fields(always_update_timestamp=True)
120
121
def DHCP_event(model_accessor, event, **kwargs):
    """Handle one message from the ``dhcp.events`` topic.

    Looks up (or creates) the service instance for the event through
    ``find_or_create_att_si`` and copies the DHCP details onto it.

    Args:
        model_accessor: passed through to ``find_or_create_att_si``.
        event: mapping describing the event; ``event["messageType"]``,
            ``event["ipAddress"]`` and ``event["macAddress"]`` are read.
        **kwargs: accepted and ignored (presumably the Airflow task context).
    """
    # The stdlib logger rejects arbitrary keyword arguments, so values are
    # passed as lazy %-style arguments.
    logging.info("dhcp.events: Got event for subscriber: %s", event)

    si = find_or_create_att_si(model_accessor, logging, event)
    logging.debug("dhcp.events: Updating service instance: %s", si)
    si.dhcp_state = event["messageType"]
    si.ip_address = event["ipAddress"]
    si.mac_address = event["macAddress"]
    si.save_changed_fields(always_update_timestamp=True)
134
135
# NOTE(review): XOSEventSensor / XOSModelSensor are used as bare names, but the
# visible `import xossynchronizer.airflow.sensors...` lines only bind the
# `xossynchronizer` package -- presumably the star imports supply them; confirm.

# Keyword arguments shared by every sensor in this DAG.
_common_sensor_kwargs = dict(
    key_field='serialNumber',
    provide_context=True,
    poke_interval=5,
    dag=dag_att,
)

# --- ONU stage ---
onu_event_handler = XOSEventSensor(
    task_id='onu_event_handler',
    topic='onu.events',
    python_callable=ONU_event,
    **_common_sensor_kwargs
)
onu_model_event_handler = XOSModelSensor(
    task_id='onu_model_event_handler',
    model_name='AttWorkflowDriverServiceInstance',
    python_callable=DriverService_event,
    **_common_sensor_kwargs
)

# --- authentication stage ---
auth_event_handler = XOSEventSensor(
    task_id='auth_event_handler',
    topic="authentication.events",
    python_callable=Auth_event,
    **_common_sensor_kwargs
)
auth_model_event_handler = XOSModelSensor(
    task_id='auth_model_event_handler',
    model_name='AttWorkflowDriverServiceInstance',
    python_callable=DriverService_event,
    **_common_sensor_kwargs
)

# --- DHCP stage ---
dhcp_event_handler = XOSEventSensor(
    task_id='dhcp_event_handler',
    topic="dhcp.events",
    python_callable=DHCP_event,
    **_common_sensor_kwargs
)
dhcp_model_event_handler = XOSModelSensor(
    task_id='dhcp_model_event_handler',
    model_name='AttWorkflowDriverServiceInstance',
    python_callable=DriverService_event,
    **_common_sensor_kwargs
)

# Linear pipeline: each event sensor feeds its model sensor, which in turn
# gates the next stage's event sensor.
(onu_event_handler
 >> onu_model_event_handler
 >> auth_event_handler
 >> auth_model_event_handler
 >> dhcp_event_handler
 >> dhcp_model_event_handler)