blob: 0ad0ae793d8ec4cf83fc27d3bbd51e86f6b0c1fc [file] [log] [blame]
Illyoung Choi18e656a2019-07-30 11:27:36 -07001# Copyright 2019-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15"""
16Example AT&T workflow using Airflow
17"""
18import json
19import logging
20from datetime import datetime
21from airflow import DAG
22from airflow import AirflowException
23from airflow.operators.python_operator import PythonOperator
24from airflow.operators.dummy_operator import DummyOperator
25from airflow.utils.trigger_rule import TriggerRule
26from airflow.sensors.cord_workflow_plugin import CORDEventSensor, CORDModelSensor
27from airflow.operators.cord_workflow_plugin import CORDModelOperator
28
29log = logging.getLogger(__name__)
# Default arguments handed to both DAGs declared below.
args = dict(
    # hard coded date
    start_date=datetime(2019, 1, 1),
    owner='ATT',
)

# DAG that carries the ONU / authentication / DHCP event tasks.
dag_att = DAG(
    # this dag will be triggered by external systems
    schedule_interval=None,
    default_args=args,
    dag_id='att_workflow',
)
dag_att.doc_md = __doc__

# DAG that carries the AttWorkflowDriverServiceInstance model-event tasks.
dag_att_admin = DAG(
    # this dag will be triggered by external systems
    schedule_interval=None,
    default_args=args,
    dag_id='att_admin_workflow',
)
dag_att_admin.doc_md = __doc__
51
def find_or_create_att_si(model_accessor, event):
    """
    Return the AttWorkflowDriverServiceInstance for the ONU named in an event.

    The instance is looked up by serial number. When the lookup raises
    IndexError a new instance is constructed from the event; it is NOT saved
    here, the caller is responsible for persisting it.

    :param model_accessor: object exposing the AttWorkflowDriverServiceInstance
                           and AttWorkflowDriverService models
    :param event: mapping holding "serialNumber"; "deviceId" and "portNumber"
                  are also read when a new instance has to be built
    :return: the existing or the newly constructed service instance
    """
    try:
        att_si = model_accessor.AttWorkflowDriverServiceInstance.objects.get(
            serial_number=event["serialNumber"]
        )
    except IndexError:
        # create an AttWorkflowDriverServiceInstance, the validation will be
        # triggered in the corresponding sync step
        att_si = model_accessor.AttWorkflowDriverServiceInstance(
            serial_number=event["serialNumber"],
            of_dpid=event["deviceId"],
            # int() instead of long(): long does not exist on Python 3 and
            # int is unbounded on Python 3 / auto-promotes on Python 2
            uni_port_id=int(event["portNumber"]),
            # we assume there is only one AttWorkflowDriverService
            owner=model_accessor.AttWorkflowDriverService.objects.first()
        )
        log.debug("Created new AttWorkflowDriverServiceInstance - si = %s" % att_si)
    else:
        log.debug("Found existing AttWorkflowDriverServiceInstance - si = %s" % att_si)
    return att_si
70
71
def validate_onu(model_accessor, si):
    """
    Validate an ONU against the whitelist and report the outcome.

    The whitelist entries owned by the service instance's owner are compared
    (case-insensitively) with the instance serial number. The matching
    ONUDevice is then checked for a manual disable and for its location
    (PON port number and device id) against the first whitelist match.

    :param model_accessor: object exposing the AttWorkflowDriverWhiteListEntry
                           and ONUDevice models
    :param si: AttWorkflowDriverServiceInstance
    :return: [boolean, string] - validity flag and a status message
    :raises AirflowException: when the ONU is whitelisted but no ONUDevice
                              with that serial number exists yet
    """

    oss_service = si.owner.leaf_model

    # See if there is a matching entry in the whitelist.
    whitelist = model_accessor.AttWorkflowDriverWhiteListEntry.objects.filter(
        owner_id=oss_service.id,
    )
    matching_entries = [
        e for e in whitelist
        if e.serial_number.lower() == si.serial_number.lower()
    ]

    if not matching_entries:
        log.warning("ONU not found in whitelist - serial_number = %s", si.serial_number)
        return [False, "ONU not found in whitelist"]

    whitelisted = matching_entries[0]
    try:
        # NOTE(review): this lookup is an exact match on serial_number while
        # the whitelist comparison above ignores case - confirm this is intended
        onu = model_accessor.ONUDevice.objects.get(serial_number=si.serial_number)
        pon_port = onu.pon_port
    except IndexError:
        # DeferredException is not defined anywhere in this module, so the
        # original raise could only ever produce a NameError. AirflowException
        # is imported at the top of the file and fails the task explicitly.
        raise AirflowException("ONU device %s is not know to XOS yet" % si.serial_number)

    if onu.admin_state == "ADMIN_DISABLED":
        return [False, "ONU has been manually disabled"]

    if pon_port.port_no != whitelisted.pon_port_id or si.of_dpid != whitelisted.device_id:
        log.warning(
            "ONU disable as location don't match - serial_number = %s, device_id= %s",
            si.serial_number, si.of_dpid
        )
        return [False, "ONU activated in wrong location"]

    return [True, "ONU has been validated"]
109
110
def update_onu(model_accessor, serial_number, admin_state):
    """
    Move the ONUDevice with the given serial number to `admin_state`.

    The device is picked as the first ONUDevice whose serial number matches
    case-insensitively (IndexError if there is none). A device in
    "ADMIN_DISABLED" is left untouched, and nothing is saved when the device
    already has the requested state.

    :param model_accessor: object exposing the ONUDevice model
    :param serial_number: serial number of the ONU to update
    :param admin_state: admin state to apply
    """
    candidates = [
        device for device in model_accessor.ONUDevice.objects.all()
        if device.serial_number.lower() == serial_number.lower()
    ]
    target = candidates[0]

    # A manual disable always wins over the workflow.
    if target.admin_state == "ADMIN_DISABLED":
        log.debug(
            "MODEL_POLICY: ONUDevice [%s] has been manually disabled, not changing state to %s" %
            (serial_number, admin_state))
        return

    if target.admin_state != admin_state:
        log.debug(
            "MODEL_POLICY: setting ONUDevice [%s] admin_state to %s" %
            (serial_number, admin_state))
        target.admin_state = admin_state
        target.save_changed_fields(always_update_timestamp=True)
        return

    log.debug(
        "MODEL_POLICY: ONUDevice [%s] already has admin_state to %s" %
        (serial_number, admin_state))
129
130
def process_onu_state(model_accessor, si):
    """
    Validate the ONU and align both the service instance and the ONUDevice.

    The validation message is stored in si.status_message; a valid ONU is
    set to "ENABLED", an invalid one to "DISABLED".
    """

    valid, message = validate_onu(model_accessor, si)
    si.status_message = message

    target_state = "ENABLED" if valid else "DISABLED"
    si.admin_onu_state = target_state
    update_onu(model_accessor, si.serial_number, target_state)
144
145
def process_auth_state(si):
    """
    Derive the authentication state / status message from the ONU state.

    A disabled ONU (admin or operational) forces re-authentication once it
    is re-enabled, so si.authentication_state is reset to AWAITING:
        -> subscriber status = "awaiting_auth"
        -> service chain deleted
        -> need authentication to restore connectivity after ONU enabled
    Otherwise the text describing the current authentication state is
    appended to si.status_message.
    """

    if si.admin_onu_state == "DISABLED" or si.oper_onu_status == "DISABLED":
        si.authentication_state = "AWAITING"
        return

    # Suffix appended to the status message for each authentication state.
    suffix_by_state = {
        "AWAITING": " - Awaiting Authentication",
        "REQUESTED": " - Authentication requested",
        "STARTED": " - Authentication started",
        "APPROVED": " - Authentication succeeded",
        "DENIED": " - Authentication denied"
    }
    si.status_message += suffix_by_state[si.authentication_state]
167
168
def process_dhcp_state(si):
    """
    Reset the DHCP related fields while the subscriber is not authenticated.

    The DhcpL2Relay ONOS app generates events that update the fields below.
    It only sends events when it processes DHCP packets. It keeps no internal state.
    We reset dhcp_state when:
        si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]
            -> subscriber status = "awaiting_auth"
            -> service chain not present
            -> subscriber's OLT flow rules, xconnect not present
            -> DHCP packets won't go through
    Note, however, that the DHCP state at the endpoints is not changed.
    A previously issued DHCP lease may still be valid.
    """

    pending_auth_states = ["AWAITING", "REQUESTED", "STARTED"]
    if si.authentication_state not in pending_auth_states:
        return

    si.ip_address = ""
    si.mac_address = ""
    si.dhcp_state = "AWAITING"
187
188
def validate_states(si):
    """
    Make sure the object is in a legitimate state
    It should be after the above processing steps
    However this might still fail if an event has fired in the meantime
    Valid states:
        ONU       | Auth     | DHCP
        ===============================
        AWAITING  | AWAITING | AWAITING
        ENABLED   | *        | AWAITING
        ENABLED   | APPROVED | *
        DISABLED  | AWAITING | AWAITING

    An invalid combination is only reported with a warning; nothing is
    raised and the instance is not modified.

    :param si: object exposing admin_onu_state, authentication_state and dhcp_state
    """

    onu_state = si.admin_onu_state
    auth_state = si.authentication_state
    dhcp_state = si.dhcp_state

    # ONU not (yet) in service: nothing else may have progressed
    not_in_service = onu_state in ("AWAITING", "DISABLED") and \
        auth_state == "AWAITING" and dhcp_state == "AWAITING"

    # ONU in service: DHCP may only progress once authentication is approved
    in_service = onu_state == "ENABLED" and \
        (auth_state == "APPROVED" or dhcp_state == "AWAITING")

    if not_in_service or in_service:
        return

    # The message is built from adjacent literals: a backslash continuation
    # inside a single literal would embed the source indentation in the text.
    log.warning(
        "validate_states: invalid state combination - onu_state = %s, "
        "auth_state = %s, dhcp_state = %s",
        onu_state, auth_state, dhcp_state
    )
216
217
def get_subscriber(model_accessor, serial_number):
    """
    Return the first RCORDSubscriber whose onu_device equals `serial_number`
    (compared case-insensitively), or None when there is no such subscriber.
    """
    matches = [
        candidate for candidate in model_accessor.RCORDSubscriber.objects.all()
        if candidate.onu_device.lower() == serial_number.lower()
    ]
    if matches:
        return matches[0]

    # If the subscriber doesn't exist we don't do anything
    log.debug(
        "subscriber does not exists for this SI, doing nothing - onu_device = %s" %
        serial_number
    )
    return None
229
230
def update_subscriber_ip(model_accessor, subscriber, ip):
    """
    Make sure the subscriber owns an RCORDIpAddress record for `ip`.

    If a record with this subscriber id and ip already exists its changed
    fields are saved; otherwise a new record described as
    "DHCP Assigned IP Address" is created and saved.

    :param model_accessor: object exposing the RCORDIpAddress model
    :param subscriber: subscriber owning the address (id, onu_device and
                       status are read)
    :param ip: the IP address value
    """
    # TODO check if the subscriber has an IP and update it,
    # or create a new one
    try:
        # Only the lookup is guarded: an IndexError here means "no record".
        ip_record = model_accessor.RCORDIpAddress.objects.filter(
            subscriber_id=subscriber.id,
            ip=ip
        )[0]
    except IndexError:
        # `log` is a stdlib logger: context has to go through %-arguments,
        # arbitrary keyword arguments raise TypeError once DEBUG is enabled.
        log.debug(
            "Creating new RCORDIpAddress for subscriber - "
            "onu_device = %s, subscriber_status = %s, ip = %s",
            subscriber.onu_device, subscriber.status, ip
        )
        ip_record = model_accessor.RCORDIpAddress(
            subscriber_id=subscriber.id,
            ip=ip,
            description="DHCP Assigned IP Address"
        )
        ip_record.save()
    else:
        log.debug(
            "found existing RCORDIpAddress for subscriber - "
            "onu_device = %s, subscriber_status = %s, ip = %s",
            subscriber.onu_device, subscriber.status, ip
        )
        ip_record.save_changed_fields()
258
259
def delete_subscriber_ip(model_accessor, subscriber, ip):
    """
    Delete the subscriber's RCORDIpAddress record for `ip`, if there is one.

    A missing record is reported with a warning and otherwise ignored.
    Errors raised while deleting an existing record are not swallowed.

    :param model_accessor: object exposing the RCORDIpAddress model
    :param subscriber: subscriber owning the address (id, onu_device and
                       status are read)
    :param ip: the IP address value to remove
    """
    try:
        ip_record = model_accessor.RCORDIpAddress.objects.filter(
            subscriber_id=subscriber.id,
            ip=ip
        )[0]
    except IndexError:
        # `log` is a stdlib logger: the keyword-argument form used before
        # raised TypeError right here, inside the handler, at the default level.
        log.warning(
            "MODEL_POLICY: no RCORDIpAddress object found, cannot delete - ip = %s",
            ip
        )
        return

    log.debug(
        "MODEL_POLICY: delete RCORDIpAddress for subscriber - "
        "onu_device = %s, subscriber_status = %s, ip = %s",
        subscriber.onu_device, subscriber.status, ip
    )
    ip_record.delete()
274
275
def update_subscriber(model_accessor, subscriber, si):
    """
    Propagate the service instance state to its subscriber.

    The subscriber status follows si.authentication_state unless the
    subscriber is "disabled". The subscriber is saved only when its status
    changed or when si.dhcp_state is "DHCPACK"; in that case its IP record
    and MAC address are aligned with the service instance first.

    :param model_accessor: object passed on to the RCORDIpAddress helpers
    :param subscriber: the subscriber to update
    :param si: AttWorkflowDriverServiceInstance providing authentication_state,
               dhcp_state, ip_address and mac_address
    """
    previous_status = subscriber.status
    # Don't change state if someone has disabled the subscriber
    if subscriber.status != "disabled":
        if si.authentication_state in ["AWAITING", "REQUESTED", "STARTED"]:
            subscriber.status = "awaiting-auth"
        elif si.authentication_state == "APPROVED":
            subscriber.status = "enabled"
        elif si.authentication_state == "DENIED":
            subscriber.status = "auth-failed"

    # NOTE we save the subscriber only if:
    # - the status has changed
    # - we get a DHCPACK event
    if previous_status == subscriber.status and si.dhcp_state != "DHCPACK":
        # `log` is a stdlib logger: context goes through %-arguments, keyword
        # arguments would raise TypeError as soon as DEBUG is enabled.
        log.debug(
            "subscriber status has not changed - "
            "onu_device = %s, authentication_state = %s, subscriber_status = %s",
            subscriber.onu_device, si.authentication_state, subscriber.status
        )
        return

    log.debug(
        "updating subscriber - "
        "onu_device = %s, authentication_state = %s, subscriber_status = %s",
        subscriber.onu_device, si.authentication_state, subscriber.status
    )

    if subscriber.status == "awaiting-auth":
        delete_subscriber_ip(model_accessor, subscriber, si.ip_address)
        subscriber.mac_address = ""
    elif si.ip_address and si.mac_address:
        update_subscriber_ip(model_accessor, subscriber, si.ip_address)
        subscriber.mac_address = si.mac_address
    subscriber.save_changed_fields(always_update_timestamp=True)
312
313
def update_model(model_accessor, si):
    """
    Recompute every derived state of a service instance and persist it.

    The ONU, authentication and DHCP states are processed in that order,
    the resulting combination is checked, the matching subscriber (if any)
    is updated and finally the service instance itself is saved.
    """
    # Changing ONU state can change auth state
    # Changing auth state can change DHCP state
    # So need to process in this order
    process_onu_state(model_accessor, si)
    for stage in (process_auth_state, process_dhcp_state, validate_states):
        stage(si)

    # handling the subscriber status
    # It's a combination of all the other states
    subscriber = get_subscriber(model_accessor, si.serial_number)
    if subscriber:
        update_subscriber(model_accessor, subscriber, si)

    si.save_changed_fields(always_update_timestamp=True)
331
332
def on_onu_event(model_accessor, message, **kwargs):
    """
    Handle an 'onu.events' message.

    An 'activated' ONU refreshes the port / device information of its
    service instance and marks it operationally ENABLED; a 'disabled' ONU
    marks it DISABLED. The model is then updated from the new state.

    :param model_accessor: object exposing the XOS models
    :param message: mapping holding 'status' and the keys read by
                    find_or_create_att_si
    :raises AirflowException: when 'status' is neither 'activated' nor 'disabled'
    """
    log.info('onu.events: received an event - message = %s' % message)

    si = find_or_create_att_si(model_accessor, message)
    status = message['status']
    if status == 'activated':
        log.info('onu.events: activated onu')
        si.no_sync = False
        # int() instead of long(): long does not exist on Python 3
        si.uni_port_id = int(message['portNumber'])
        si.of_dpid = message['deviceId']
        si.oper_onu_status = 'ENABLED'
    elif status == 'disabled':
        log.info('onu.events: disabled onu, resetting the subscriber')
        si.oper_onu_status = 'DISABLED'
    else:
        log.error('onu.events: Unknown status value: %s' % status)
        raise AirflowException('onu.events: Unknown status value: %s' % status)

    update_model(model_accessor, si)
351
def on_auth_event(model_accessor, message, **kwargs):
    """
    Handle an 'authentication.events' message: store the reported
    authentication state on the service instance and update the model.
    """
    log.info('authentication.events: received an event - message = %s' % message)

    service_instance = find_or_create_att_si(model_accessor, message)
    log.debug('authentication.events: Updating service instance')
    new_state = message['authenticationState']
    service_instance.authentication_state = new_state
    update_model(model_accessor, service_instance)
359
360
def on_dhcp_event(model_accessor, message, **kwargs):
    """
    Handle a 'dhcp.events' message: copy the DHCP message type, IP address
    and MAC address onto the service instance and update the model.
    """
    log.info('dhcp.events: received an event - message = %s' % message)

    service_instance = find_or_create_att_si(model_accessor, message)
    log.debug('dhcp.events: Updating service instance')

    # (service instance attribute, event key), applied in this order
    field_map = (
        ('dhcp_state', 'messageType'),
        ('ip_address', 'ipAddress'),
        ('mac_address', 'macAddress'),
    )
    for attribute, key in field_map:
        setattr(service_instance, attribute, message[key])

    update_model(model_accessor, service_instance)
370
371
def DriverService_event(model_accessor, message, **kwargs):
    """
    Handle a model event: for 'create' and 'update' events (any letter case)
    the matching service instance is fetched or built and the model updated.
    Any other event type is logged as an error and ignored.
    """
    log.info('model event: received an event - %s' % message)

    # handle only create & update events
    event_type = message['event_type']
    supported = event_type is not None and event_type.lower() in ['create', 'update']
    if not supported:
        log.error('can not handle an event type - %s' % event_type)
        return

    service_instance = find_or_create_att_si(model_accessor, message)
    update_model(model_accessor, service_instance)
383
384
# --- att_workflow: one sensor + handler pair per event topic ---------------

# ONU events
onu_event_sensor = CORDEventSensor(
    dag=dag_att,
    task_id="onu_event_sensor",
    controller_conn_id="local_cord_controller",
    topic="onu.events",
    key_field="serialNumber",
    poke_interval=5,
)

onu_event_handler = CORDModelOperator(
    dag=dag_att,
    task_id="onu_event_handler",
    cord_event_sensor_task_id="onu_event_sensor",
    python_callable=on_onu_event,
)

# Authentication events
auth_event_sensor = CORDEventSensor(
    dag=dag_att,
    task_id="auth_event_sensor",
    controller_conn_id="local_cord_controller",
    topic="authentication.events",
    key_field="serialNumber",
    poke_interval=5,
)

auth_event_handler = CORDModelOperator(
    dag=dag_att,
    task_id="auth_event_handler",
    cord_event_sensor_task_id="auth_event_sensor",
    python_callable=on_auth_event,
)

# DHCP events
dhcp_event_sensor = CORDEventSensor(
    dag=dag_att,
    task_id="dhcp_event_sensor",
    controller_conn_id="local_cord_controller",
    topic="dhcp.events",
    key_field="serialNumber",
    poke_interval=5,
)

dhcp_event_handler = CORDModelOperator(
    dag=dag_att,
    task_id="dhcp_event_handler",
    cord_event_sensor_task_id="dhcp_event_sensor",
    python_callable=on_dhcp_event,
)

# Common end point of the three branches; ALL_DONE lets it run once every
# upstream task has finished, whatever its outcome.
join = DummyOperator(
    dag=dag_att,
    task_id="join",
    trigger_rule=TriggerRule.ALL_DONE,
)
438
# --- att_admin_workflow: AttWorkflowDriverServiceInstance model events -----

att_model_event_sensor = CORDModelSensor(
    dag=dag_att_admin,
    task_id="att_model_event_sensor",
    controller_conn_id="local_cord_controller",
    model_name="AttWorkflowDriverServiceInstance",
    key_field="serialNumber",
    poke_interval=5,
)

att_model_event_handler = CORDModelOperator(
    dag=dag_att_admin,
    task_id="att_model_event_handler",
    cord_event_sensor_task_id="att_model_event_sensor",
    python_callable=DriverService_event,
)
454
# handle standard flow: each sensor feeds its handler, every handler feeds join
onu_event_sensor >> onu_event_handler
onu_event_handler >> join

auth_event_sensor >> auth_event_handler
auth_event_handler >> join

dhcp_event_sensor >> dhcp_event_handler
dhcp_event_handler >> join

# handle admin flow
att_model_event_sensor >> att_model_event_handler
462