blob: 688eb5bd634883ba4365bd582968e47fce2661df [file] [log] [blame]
# Copyright 2019-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
"""
Example AT&T workflow using Airflow
"""
18import json
19import logging
20from datetime import datetime
21from airflow import DAG
22from airflow import AirflowException
23from airflow.operators.python_operator import PythonOperator
Illyoung Choi18e656a2019-07-30 11:27:36 -070024from airflow.sensors.cord_workflow_plugin import CORDEventSensor, CORDModelSensor
25from airflow.operators.cord_workflow_plugin import CORDModelOperator
Illyoung Choi4b5ebfd2019-08-01 10:39:06 -070026from xossynchronizer.steps.syncstep import DeferredException
Illyoung Choi18e656a2019-07-30 11:27:36 -070027
# Module-level logger, named after this module.
log = logging.getLogger(__name__)

# Default arguments handed to every DAG declared in this file.
args = dict(
    # hard coded date
    start_date=datetime(2019, 1, 1),
    owner='ATT'
)
34
# Settings common to both DAGs. There is no schedule because these DAGs
# are triggered by external systems.
_dag_settings = {
    'default_args': args,
    'schedule_interval': None,
}

dag_att = DAG(dag_id='att_workflow', **_dag_settings)
dag_att.doc_md = __doc__

dag_att_admin = DAG(dag_id='att_admin_workflow', **_dag_settings)
dag_att_admin.doc_md = __doc__
50
def find_or_create_att_si(model_accessor, event):
    """
    Return the AttWorkflowDriverServiceInstance that matches an event.

    The instance is looked up by the serial number carried in the event.
    When the lookup raises IndexError, a new instance is constructed from
    the event fields; it is not saved by this function.

    :param model_accessor: accessor exposing the XOS model classes
    :param event: mapping with "serialNumber"; "deviceId" and "portNumber"
                  are also read when a new instance has to be constructed
    :return: the existing or the newly constructed service instance
    """
    serial_number = event["serialNumber"]
    try:
        att_si = model_accessor.AttWorkflowDriverServiceInstance.objects.get(
            serial_number=serial_number
        )
    except IndexError:
        # create an AttWorkflowDriverServiceInstance, the validation will be
        # triggered in the corresponding sync step
        att_si = model_accessor.AttWorkflowDriverServiceInstance(
            serial_number=serial_number,
            of_dpid=event["deviceId"],
            # int() rather than the Python 2-only long(): int() promotes to
            # an arbitrary-precision value on Python 2 and is the only
            # integer type on Python 3
            uni_port_id=int(event["portNumber"]),
            # we assume there is only one AttWorkflowDriverService
            owner=model_accessor.AttWorkflowDriverService.objects.first()
        )
        log.debug("Created new AttWorkflowDriverServiceInstance - si = %s", att_si)
    else:
        log.debug("Found existing AttWorkflowDriverServiceInstance - si = %s", att_si)
    return att_si
69
70
def validate_onu(model_accessor, si):
    """
    Validate an ONU against the whitelist and report the outcome.

    This function only computes the result; applying it to the service
    instance is left to the caller. It's expected that the deferred
    exception is managed in the caller method, for example a model_policy
    or a sync_step.

    :param model_accessor: accessor exposing the XOS model classes
    :param si: AttWorkflowDriverServiceInstance
    :return: [boolean, string] - validity flag and a status message
    :raises DeferredException: when looking up the ONUDevice (or reading its
                               pon_port) raises IndexError
    """

    oss_service = si.owner.leaf_model
    serial_number = si.serial_number
    wanted = serial_number.lower()

    # See if there is a matching entry in the whitelist.
    # The serial number comparison is case-insensitive.
    whitelist = model_accessor.AttWorkflowDriverWhiteListEntry.objects.filter(
        owner_id=oss_service.id,
    )
    matching_entries = [e for e in whitelist if e.serial_number.lower() == wanted]

    if not matching_entries:
        log.warning("ONU not found in whitelist - serial_number = %s", serial_number)
        return [False, "ONU not found in whitelist"]

    whitelisted = matching_entries[0]
    try:
        onu = model_accessor.ONUDevice.objects.get(serial_number=serial_number)
        pon_port = onu.pon_port
    except IndexError:
        raise DeferredException("ONU device %s is not know to XOS yet" % serial_number)

    if onu.admin_state == "ADMIN_DISABLED":
        return [False, "ONU has been manually disabled"]

    if pon_port.port_no != whitelisted.pon_port_id or si.of_dpid != whitelisted.device_id:
        log.warning(
            "ONU disable as location don't match - serial_number = %s, device_id= %s",
            serial_number, si.of_dpid
        )
        return [False, "ONU activated in wrong location"]

    return [True, "ONU has been validated"]
108
109
def update_onu(model_accessor, serial_number, admin_state):
    """
    Move the ONUDevice with the given serial number to admin_state.

    The serial number is matched case-insensitively against every
    ONUDevice. A device in "ADMIN_DISABLED" state is left untouched, and
    nothing is saved when the device already has the requested state.

    :param model_accessor: accessor exposing the XOS model classes
    :param serial_number: serial number of the ONU to update
    :param admin_state: admin state to set on the ONU
    :raises IndexError: when no ONUDevice matches the serial number
    """
    candidates = [
        device for device in model_accessor.ONUDevice.objects.all()
        if device.serial_number.lower() == serial_number.lower()
    ]
    # Taking the first match raises IndexError when nothing matched
    target = candidates[0]

    if target.admin_state == "ADMIN_DISABLED":
        log.debug(
            "MODEL_POLICY: ONUDevice [%s] has been manually disabled, not changing state to %s" %
            (serial_number, admin_state))
        return

    if target.admin_state != admin_state:
        log.debug(
            "MODEL_POLICY: setting ONUDevice [%s] admin_state to %s" %
            (serial_number, admin_state))
        target.admin_state = admin_state
        target.save_changed_fields(always_update_timestamp=True)
        return

    log.debug(
        "MODEL_POLICY: ONUDevice [%s] already has admin_state to %s" %
        (serial_number, admin_state))
128
129
def process_onu_state(model_accessor, si):
    """
    Validate the ONU and propagate the result.

    The validation message is stored in si.status_message. Both
    si.admin_onu_state and the ONUDevice are set to "ENABLED" when the ONU
    is valid and to "DISABLED" otherwise.
    """

    valid, message = validate_onu(model_accessor, si)
    si.status_message = message

    target_state = "ENABLED" if valid else "DISABLED"
    si.admin_onu_state = target_state
    update_onu(model_accessor, si.serial_number, target_state)
143
144
def process_auth_state(si):
    """
    Reset or report the authentication state of the service instance.

    When the ONU is disabled (admin or operational state) the
    authentication state goes back to AWAITING, so that authentication is
    required again once the ONU is re-enabled:
        -> subscriber status = "awaiting_auth"
        -> service chain deleted
        -> need authentication to restore connectivity after ONU enabled
    Otherwise a description of the current authentication state is appended
    to si.status_message.
    """

    suffix_by_state = {
        "AWAITING": " - Awaiting Authentication",
        "REQUESTED": " - Authentication requested",
        "STARTED": " - Authentication started",
        "APPROVED": " - Authentication succeeded",
        "DENIED": " - Authentication denied"
    }

    if si.admin_onu_state != "DISABLED" and si.oper_onu_status != "DISABLED":
        # ONU is not disabled: only describe the current state
        si.status_message += suffix_by_state[si.authentication_state]
        return

    si.authentication_state = "AWAITING"
166
167
def process_dhcp_state(si):
    """
    Clear the DHCP fields while the subscriber is not authenticated.

    The DhcpL2Relay ONOS app generates events that update the fields below.
    It only sends events when it processes DHCP packets. It keeps no internal state.
    The DHCP fields are reset when:
       si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]
       -> subscriber status = "awaiting_auth"
       -> service chain not present
       -> subscriber's OLT flow rules, xconnect not present
       -> DHCP packets won't go through
    Note, however, that the DHCP state at the endpoints is not changed.
    A previously issued DHCP lease may still be valid.
    """

    if si.authentication_state not in ("AWAITING", "REQUESTED", "STARTED"):
        return

    si.ip_address = ""
    si.mac_address = ""
    si.dhcp_state = "AWAITING"
186
187
def validate_states(si):
    """
    Make sure the object is in a legitimate state
    It should be after the above processing steps
    However this might still fail if an event has fired in the meantime
    Valid states:
    ONU       | Auth     | DHCP
    ===============================
    AWAITING  | AWAITING | AWAITING
    ENABLED   | *        | AWAITING
    ENABLED   | APPROVED | *
    DISABLED  | AWAITING | AWAITING

    An invalid combination is only reported with a warning; nothing is
    raised and the object is not modified.

    :param si: AttWorkflowDriverServiceInstance
    """

    onu_state = si.admin_onu_state
    auth_state = si.authentication_state
    dhcp_state = si.dhcp_state

    if onu_state in ("AWAITING", "DISABLED") and \
            auth_state == "AWAITING" and dhcp_state == "AWAITING":
        return

    if onu_state == "ENABLED" and \
            (auth_state == "APPROVED" or dhcp_state == "AWAITING"):
        return

    # Adjacent literals are used instead of a backslash inside the string:
    # the backslash form embeds the next line's indentation in the message.
    log.warning(
        "validate_states: invalid state combination - onu_state = %s, "
        "auth_state = %s, dhcp_state = %s",
        onu_state, auth_state, dhcp_state
    )
215
216
def get_subscriber(model_accessor, serial_number):
    """
    Return the first RCORDSubscriber whose onu_device matches serial_number.

    The comparison is case-insensitive and goes through every subscriber.

    :param model_accessor: accessor exposing the XOS model classes
    :param serial_number: ONU serial number to look for
    :return: the matching subscriber, or None when there is none
    """
    try:
        candidates = [
            sub for sub in model_accessor.RCORDSubscriber.objects.all()
            if sub.onu_device.lower() == serial_number.lower()
        ]
        found = candidates[0]
    except IndexError:
        # If the subscriber doesn't exist we don't do anything
        log.debug(
            "subscriber does not exists for this SI, doing nothing - onu_device = %s" %
            serial_number
        )
        return None
    return found
228
229
def update_subscriber_ip(model_accessor, subscriber, ip):
    """
    Make sure the subscriber owns an RCORDIpAddress for the given address.

    An existing RCORDIpAddress (same subscriber id and ip) has its changed
    fields saved; when none is found a new one is created and saved.

    :param model_accessor: accessor exposing the XOS model classes
    :param subscriber: RCORDSubscriber the address belongs to
    :param ip: IP address value to look up or create
    """
    # TODO check if the subscriber has an IP and update it,
    # or create a new one
    try:
        # Only the lookup is guarded, so that an IndexError raised while
        # saving is not mistaken for "address not found"
        ip_obj = model_accessor.RCORDIpAddress.objects.filter(
            subscriber_id=subscriber.id,
            ip=ip
        )[0]
    except IndexError:
        # The stdlib logger does not accept arbitrary keyword arguments,
        # so the context is formatted into the message
        log.debug(
            "Creating new RCORDIpAddress for subscriber - "
            "onu_device = %s, subscriber_status = %s, ip = %s",
            subscriber.onu_device, subscriber.status, ip
        )
        ip_obj = model_accessor.RCORDIpAddress(
            subscriber_id=subscriber.id,
            ip=ip,
            description="DHCP Assigned IP Address"
        )
        ip_obj.save()
    else:
        log.debug(
            "found existing RCORDIpAddress for subscriber - "
            "onu_device = %s, subscriber_status = %s, ip = %s",
            subscriber.onu_device, subscriber.status, ip
        )
        ip_obj.save_changed_fields()
257
258
def delete_subscriber_ip(model_accessor, subscriber, ip):
    """
    Delete the subscriber's RCORDIpAddress for the given address, if any.

    This is best-effort: a missing address or a failed deletion is logged
    and never propagated to the caller.

    :param model_accessor: accessor exposing the XOS model classes
    :param subscriber: RCORDSubscriber the address belongs to
    :param ip: IP address value to delete
    """
    try:
        ip_obj = model_accessor.RCORDIpAddress.objects.filter(
            subscriber_id=subscriber.id,
            ip=ip
        )[0]
        # The stdlib logger does not accept arbitrary keyword arguments,
        # so the context is formatted into the message
        log.debug(
            "MODEL_POLICY: delete RCORDIpAddress for subscriber - "
            "onu_device = %s, subscriber_status = %s, ip = %s",
            subscriber.onu_device, subscriber.status, ip
        )
        ip_obj.delete()
    except IndexError:
        log.warning(
            "MODEL_POLICY: no RCORDIpAddress object found, cannot delete - ip = %s",
            ip
        )
    except Exception:
        # Exception rather than BaseException, so KeyboardInterrupt and
        # SystemExit are no longer swallowed
        log.exception("MODEL_POLICY: failed to delete RCORDIpAddress - ip = %s", ip)
273
274
def update_subscriber(model_accessor, subscriber, si):
    """
    Align an RCORDSubscriber with the state of its service instance.

    Unless the subscriber is "disabled", its status is derived from
    si.authentication_state. The subscriber is saved only when its status
    changed or when si.dhcp_state is "DHCPACK"; in that case its IP and MAC
    address are removed (status "awaiting-auth") or refreshed from the
    service instance.

    :param model_accessor: accessor exposing the XOS model classes
    :param subscriber: RCORDSubscriber to update
    :param si: AttWorkflowDriverServiceInstance providing the new state
    """
    cur_status = subscriber.status
    # Don't change state if someone has disabled the subscriber
    if subscriber.status != "disabled":
        if si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]:
            subscriber.status = "awaiting-auth"
        elif si.authentication_state == "APPROVED":
            subscriber.status = "enabled"
        elif si.authentication_state == "DENIED":
            subscriber.status = "auth-failed"

    # NOTE we save the subscriber only if:
    # - the status has changed
    # - we get a DHCPACK event
    if cur_status == subscriber.status and si.dhcp_state != "DHCPACK":
        # The stdlib logger does not accept arbitrary keyword arguments,
        # so the context is formatted into the message
        log.debug(
            "subscriber status has not changed - "
            "onu_device = %s, authentication_state = %s, subscriber_status = %s",
            subscriber.onu_device, si.authentication_state, subscriber.status
        )
        return

    log.debug(
        "updating subscriber - "
        "onu_device = %s, authentication_state = %s, subscriber_status = %s",
        subscriber.onu_device, si.authentication_state, subscriber.status
    )

    if subscriber.status == "awaiting-auth":
        delete_subscriber_ip(model_accessor, subscriber, si.ip_address)
        subscriber.mac_address = ""
    elif si.ip_address and si.mac_address:
        update_subscriber_ip(model_accessor, subscriber, si.ip_address)
        subscriber.mac_address = si.mac_address
    subscriber.save_changed_fields(always_update_timestamp=True)
311
312
def update_model(model_accessor, si):
    """
    Run every processing step on the service instance and save it.

    :param model_accessor: accessor exposing the XOS model classes
    :param si: AttWorkflowDriverServiceInstance to process
    """
    # Changing ONU state can change auth state
    # Changing auth state can change DHCP state
    # So need to process in this order
    process_onu_state(model_accessor, si)
    for step in (process_auth_state, process_dhcp_state, validate_states):
        step(si)

    # handling the subscriber status
    # It's a combination of all the other states
    rcord_subscriber = get_subscriber(model_accessor, si.serial_number)
    if rcord_subscriber:
        update_subscriber(model_accessor, rcord_subscriber, si)

    si.save_changed_fields(always_update_timestamp=True)
330
331
def on_onu_event(model_accessor, message, **kwargs):
    """
    Handle an event received on the onu.events topic.

    An "activated" status refreshes the port, device id and operational
    status of the service instance; a "disabled" status marks it as
    operationally disabled. The model is then re-processed.

    :param model_accessor: accessor exposing the XOS model classes
    :param message: event mapping; "status" and "serialNumber" are read,
                    plus "portNumber" and "deviceId" on activation
    :param kwargs: extra keyword arguments, unused here
    :raises AirflowException: when the status is neither "activated" nor
                              "disabled"
    """
    log.info('onu.events: received an event - message = %s', message)

    si = find_or_create_att_si(model_accessor, message)
    status = message['status']
    if status == 'activated':
        log.info('onu.events: activated onu')
        si.no_sync = False
        # int() rather than the Python 2-only long(): int() promotes to an
        # arbitrary-precision value on Python 2 and also exists on Python 3
        si.uni_port_id = int(message['portNumber'])
        si.of_dpid = message['deviceId']
        si.oper_onu_status = 'ENABLED'
    elif status == 'disabled':
        log.info('onu.events: disabled onu, resetting the subscriber')
        si.oper_onu_status = 'DISABLED'
    else:
        log.error('onu.events: Unknown status value: %s', status)
        raise AirflowException('onu.events: Unknown status value: %s' % status)

    update_model(model_accessor, si)
350
def on_auth_event(model_accessor, message, **kwargs):
    """
    Handle an event received on the authentication.events topic.

    The authentication state carried by the event is copied to the service
    instance, which is then re-processed.

    :param model_accessor: accessor exposing the XOS model classes
    :param message: event mapping; "authenticationState" is read here
    :param kwargs: extra keyword arguments, unused here
    """
    log.info('authentication.events: received an event - message = %s' % message)

    service_instance = find_or_create_att_si(model_accessor, message)
    log.debug('authentication.events: Updating service instance')

    new_auth_state = message['authenticationState']
    service_instance.authentication_state = new_auth_state
    update_model(model_accessor, service_instance)
358
359
def on_dhcp_event(model_accessor, message, **kwargs):
    """
    Handle an event received on the dhcp.events topic.

    The DHCP message type, IP address and MAC address carried by the event
    are copied to the service instance, which is then re-processed.

    :param model_accessor: accessor exposing the XOS model classes
    :param message: event mapping; "messageType", "ipAddress" and
                    "macAddress" are read here
    :param kwargs: extra keyword arguments, unused here
    """
    log.info('dhcp.events: received an event - message = %s' % message)

    service_instance = find_or_create_att_si(model_accessor, message)
    log.debug('dhcp.events: Updating service instance')

    # (service instance attribute, event key), applied in this order
    field_map = (
        ('dhcp_state', 'messageType'),
        ('ip_address', 'ipAddress'),
        ('mac_address', 'macAddress'),
    )
    for attribute, key in field_map:
        setattr(service_instance, attribute, message[key])

    update_model(model_accessor, service_instance)
369
370
def DriverService_event(model_accessor, message, **kwargs):
    """
    Handle a model event by re-processing the related service instance.

    Only events whose "event_type" is "create" or "update" (any letter
    case) are handled; any other event is logged as an error and ignored.

    :param model_accessor: accessor exposing the XOS model classes
    :param message: event mapping; "event_type" is read here
    :param kwargs: extra keyword arguments, unused here
    """
    log.info('model event: received an event - %s' % message)

    # handle only create & update events
    event_type = message['event_type']
    supported = event_type is not None and event_type.lower() in ['create', 'update']
    if not supported:
        log.error('can not handle an event type - %s' % event_type)
        return

    service_instance = find_or_create_att_si(model_accessor, message)
    update_model(model_accessor, service_instance)
382
383
# Settings shared by every sensor declared below.
_sensor_settings = {
    'key_field': 'serialNumber',
    'controller_conn_id': 'local_cord_controller',
    'poke_interval': 5,
}

# ONU events
onu_event_sensor = CORDEventSensor(
    task_id='onu_event_sensor',
    topic='onu.events',
    dag=dag_att,
    **_sensor_settings
)

onu_event_handler = CORDModelOperator(
    task_id='onu_event_handler',
    cord_event_sensor_task_id='onu_event_sensor',
    python_callable=on_onu_event,
    dag=dag_att
)

# Authentication events
auth_event_sensor = CORDEventSensor(
    task_id='auth_event_sensor',
    topic='authentication.events',
    dag=dag_att,
    **_sensor_settings
)

auth_event_handler = CORDModelOperator(
    task_id='auth_event_handler',
    cord_event_sensor_task_id='auth_event_sensor',
    python_callable=on_auth_event,
    dag=dag_att
)

# DHCP events
dhcp_event_sensor = CORDEventSensor(
    task_id='dhcp_event_sensor',
    topic='dhcp.events',
    dag=dag_att,
    **_sensor_settings
)

dhcp_event_handler = CORDModelOperator(
    task_id='dhcp_event_handler',
    cord_event_sensor_task_id='dhcp_event_sensor',
    python_callable=on_dhcp_event,
    dag=dag_att
)

# Model events on AttWorkflowDriverServiceInstance (admin DAG)
att_model_event_sensor = CORDModelSensor(
    task_id='att_model_event_sensor',
    model_name='AttWorkflowDriverServiceInstance',
    dag=dag_att_admin,
    **_sensor_settings
)

att_model_event_handler = CORDModelOperator(
    task_id='att_model_event_handler',
    cord_event_sensor_task_id='att_model_event_sensor',
    python_callable=DriverService_event,
    dag=dag_att_admin
)
447
# handle standard flow
# (each sensor feeds its handler, which feeds the next sensor)
onu_event_sensor >> onu_event_handler
onu_event_handler >> auth_event_sensor
auth_event_sensor >> auth_event_handler
auth_event_handler >> dhcp_event_sensor
dhcp_event_sensor >> dhcp_event_handler

# handle admin flow
att_model_event_sensor >> att_model_event_handler
453