# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# TODO:
# Add unit tests:
# - 2 sets of Instance, ControllerSlice, ControllerNetworks - delete and create case

import time
import threading
import json

from collections import defaultdict
from networkx import (
    DiGraph,
    weakly_connected_component_subgraphs,
    all_shortest_paths,
    NetworkXNoPath,
)
from networkx.algorithms.dag import topological_sort

from xossynchronizer.steps.syncstep import InnocuousException, DeferredException, SyncStep

from xosconfig import Config
from multistructlog import create_logger

log = create_logger(Config().get("logging"))


class StepNotReady(Exception):
    pass


class ExternalDependencyFailed(Exception):
    pass


# FIXME: Move drivers into a context shared across sync steps.


class NoOpDriver:
    def __init__(self):
        self.enabled = True
        self.dependency_graph = None


# Everyone gets NoOpDriver by default. To use a different driver, call
# set_driver() below.
DRIVER = NoOpDriver()

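# Edge types used when linking objects and models in the dependency graphs
# below: a DIRECT_EDGE is traversed through a scalar accessor, a PROXY_EDGE
# through a collection (see linked_objects()).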
DIRECT_EDGE = 1
PROXY_EDGE = 2


def set_driver(x):
    global DRIVER
    DRIVER = x


class XOSObserver(object):
    sync_steps = []

    def __init__(self, sync_steps, model_accessor, log=log):
        self.log = log
        self.model_accessor = model_accessor

        self.step_lookup = {}
        self.sync_steps = sync_steps
        self.load_sync_steps()

        self.load_dependency_graph()

        self.event_cond = threading.Condition()

        self.driver = DRIVER
        self.observer_name = Config.get("name")

    def wait_for_event(self, timeout):
        with self.event_cond:
            self.event_cond.wait(timeout)

    def wake_up(self):
        self.log.debug("Wake up routine called")
        with self.event_cond:
            self.event_cond.notify()

    def load_dependency_graph(self):

        try:
            if Config.get("dependency_graph"):
                self.log.trace(
                    "Loading model dependency graph",
                    path=Config.get("dependency_graph"),
                )
                dep_graph_str = open(Config.get("dependency_graph")).read()
            else:
                self.log.trace("Using default model dependency graph", graph={})
                dep_graph_str = "{}"

            # joint_dependencies is of the form { Model1 -> [(Model2, src_port, dst_port), ...] }
            # src_port is the field that accesses Model2 from Model1
            # dst_port is the field that accesses Model1 from Model2
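            # Illustrative example only (hypothetical model names): a file
            # containing {"Slice": [["Site", "site", "slices"]]} declares that
            # a Slice reaches its Site via slice.site, and a Site reaches its
            # Slices via site.slices.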
            static_dependencies = json.loads(dep_graph_str)
            dynamic_dependencies = []  # Dropped Service and ServiceInstance dynamic dependencies

            # list() is needed so the concatenation also works on Python 3,
            # where items() returns a view rather than a list.
            joint_dependencies = dict(
                list(static_dependencies.items()) + dynamic_dependencies
            )

            model_dependency_graph = DiGraph()
            for src_model, deps in joint_dependencies.items():
                for dep in deps:
                    dst_model, src_accessor, dst_accessor = dep
                    if src_model != dst_model:
                        edge_label = {
                            "src_accessor": src_accessor,
                            "dst_accessor": dst_accessor,
                        }
                        model_dependency_graph.add_edge(
                            src_model, dst_model, edge_label
                        )

            model_dependency_graph_rev = model_dependency_graph.reverse(copy=True)
            self.model_dependency_graph = {
                # deletion
                True: model_dependency_graph_rev,
                False: model_dependency_graph,
            }
            self.log.trace("Loaded dependencies", edges=model_dependency_graph.edges())
        except Exception as e:
            self.log.exception("Error loading dependency graph", e=e)
            raise e
    def load_sync_steps(self):
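        # Build a map from model name to the names of the sync steps that
        # observe it, and collect any external dependencies declared by steps.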
        model_to_step = defaultdict(list)
        external_dependencies = []

        for s in self.sync_steps:
            if not isinstance(s.observes, list):
                observes = [s.observes]
            else:
                observes = s.observes
            for m in observes:
                if isinstance(m, str):
                    # observes is a string that names the model
                    model_to_step[m].append(s.__name__)
                else:
                    # observes is the model class
                    model_to_step[m.__name__].append(s.__name__)

            try:
                external_dependencies.extend(s.external_dependencies)
            except AttributeError:
                pass

            self.step_lookup[s.__name__] = s

        self.model_to_step = model_to_step
        self.external_dependencies = list(set(external_dependencies))
        self.log.info(
            "Loaded external dependencies", external_dependencies=external_dependencies
        )
        self.log.info("Loaded model_map", **model_to_step)

    def reset_model_accessor(self, o=None):
        try:
            self.model_accessor.reset_queries()
        except BaseException:
            # this shouldn't happen, but in case it does, catch it...
            if o:
                logdict = o.tologdict()
            else:
                logdict = {}

            self.log.error("exception in reset_queries", **logdict)

    def delete_record(self, o, dr_log=None):

        if dr_log is None:
            dr_log = self.log

        if getattr(o, "backend_need_reap", False):
            # the object has already been deleted and marked for reaping
            self.model_accessor.journal_object(o, "syncstep.call.already_marked_reap")
        else:
            step = getattr(o, "synchronizer_step", None)
            if not step:
                raise ExternalDependencyFailed

            self.model_accessor.journal_object(o, "syncstep.call.delete_record")

            dr_log.debug("Deleting object", **o.tologdict())

            step.log = dr_log.new(step=step)
            step.delete_record(o)
            step.log = dr_log

            dr_log.debug("Deleted object", **o.tologdict())

            self.model_accessor.journal_object(o, "syncstep.call.delete_set_reap")
            o.backend_need_reap = True
            o.save(update_fields=["backend_need_reap"])

    def sync_record(self, o, sr_log=None):
        try:
            step = o.synchronizer_step
        except AttributeError:
            step = None

        if step is None:
            raise ExternalDependencyFailed

        if sr_log is None:
            sr_log = self.log

        # Mark this as an object that will require a deletion pass. Do this
        # now, rather than after the syncstep, so the mark survives even if
        # the sync below fails.
        if not o.backend_need_delete:
            o.backend_need_delete = True
            o.save(update_fields=["backend_need_delete"])

        self.model_accessor.journal_object(o, "syncstep.call.sync_record")

        sr_log.debug("Syncing object", **o.tologdict())

        step.log = sr_log.new(step=step)
        step.sync_record(o)
        step.log = sr_log

        sr_log.debug("Synced object", **o.tologdict())

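        # Record a successful sync: advance enacted to the object's latest
        # change timestamp and reset the error/backoff scratchpad.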
        o.enacted = max(o.updated, o.changed_by_policy)
        scratchpad = {"next_run": 0, "exponent": 0, "last_success": time.time()}
        o.backend_register = json.dumps(scratchpad)
        o.backend_status = "OK"
        o.backend_code = 1
        self.model_accessor.journal_object(o, "syncstep.call.save_update")
        o.save(
            update_fields=[
                "enacted",
                "backend_status",
                "backend_register",
                "backend_code",
            ]
        )

        if hasattr(step, "after_sync_save"):
            step.log = sr_log.new(step=step)
            step.after_sync_save(o)
            step.log = sr_log

        sr_log.info("Saved sync object", o=o)

266 """ This function needs a cleanup. FIXME: Rethink backend_status, backend_register """
267
268 def handle_sync_exception(self, o, e):
        self.log.exception("sync step failed!", e=e, **o.tologdict())
        current_code = o.backend_code

        if hasattr(e, "message"):
            status = str(e.message)
        else:
            status = str(e)

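        # Map the exception type to a backend_code: 1 marks an innocuous
        # failure, 0 a deferred sync (still in progress), 2 a hard error.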
        if isinstance(e, InnocuousException):
            code = 1
        elif isinstance(e, DeferredException):
            # NOTE: a deferred sync means synchronization is still in progress
            code = 0
        else:
            code = 2

        self.set_object_error(o, status, code)

        dependency_error = "Failed due to error in model %s id %d: %s" % (
            o.leaf_model_name,
            o.id,
            status,
        )
        return dependency_error, code

    def set_object_error(self, o, status, code):
        if o.backend_status:
            error_list = o.backend_status.split(" // ")
        else:
            error_list = []

        if status not in error_list:
            error_list.append(status)

        # Keep last two errors
        error_list = error_list[-2:]

        o.backend_code = code
        o.backend_status = " // ".join(error_list)

        try:
            scratchpad = json.loads(o.backend_register)
            scratchpad["exponent"]  # probe; raises KeyError if malformed
        except BaseException:
            scratchpad = {
                "next_run": 0,
                "exponent": 0,
                "last_success": time.time(),
                "failures": 0,
            }

        # Second or later failure: back off exponentially before the next
        # attempt, with a shorter base delay for innocuous errors (code 1).
        if scratchpad["exponent"]:
            if code == 1:
                delay = scratchpad["exponent"] * 60  # 1 minute
            else:
                delay = scratchpad["exponent"] * 600  # 10 minutes

            # cap delays at 8 hours
            if delay > 8 * 60 * 60:
                delay = 8 * 60 * 60
            scratchpad["next_run"] = time.time() + delay

        scratchpad["exponent"] += 1

        try:
            scratchpad["failures"] += 1
        except KeyError:
            scratchpad["failures"] = 1

        scratchpad["last_failure"] = time.time()

        o.backend_register = json.dumps(scratchpad)

        # TOFIX:
        # DatabaseError: value too long for type character varying(140)
        if self.model_accessor.obj_exists(o):
            try:
                # truncate the status so it fits in the database field
                o.backend_status = o.backend_status[:1024]
                o.save(
                    update_fields=["backend_status", "backend_register"],
                    always_update_timestamp=True,
                )
            except BaseException as e:
                self.log.exception("Could not update backend status field!", e=e)

    def sync_cohort(self, cohort, deletion):
        threading.current_thread().is_sync_thread = True

        sc_log = self.log.new(thread_id=threading.current_thread().ident)

        try:
            start_time = time.time()
            sc_log.debug("Starting to work on cohort", cohort=cohort, deletion=deletion)

            cohort_emptied = False
            dependency_error = None
            dependency_error_code = None

            itty = iter(cohort)

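            # Objects in a cohort are dependency-ordered. Once one object
            # fails, mark every remaining object in the cohort with the
            # dependency error and skip syncing it.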
            while not cohort_emptied:
                try:
                    self.reset_model_accessor()
                    o = next(itty)

                    if dependency_error:
                        self.set_object_error(
                            o, dependency_error, dependency_error_code
                        )
                        continue

                    try:
                        if deletion:
                            self.delete_record(o, sc_log)
                        else:
                            self.sync_record(o, sc_log)
                    except ExternalDependencyFailed:
                        dependency_error = (
                            "External dependency on object %s id %d not met"
                            % (o.leaf_model_name, o.id)
                        )
                        dependency_error_code = 1
                    except (DeferredException, InnocuousException, Exception) as e:
                        dependency_error, dependency_error_code = self.handle_sync_exception(
                            o, e
                        )

                except StopIteration:
                    sc_log.debug("Cohort completed", cohort=cohort, deletion=deletion)
                    cohort_emptied = True
        finally:
            self.reset_model_accessor()
            self.model_accessor.connection_close()

    def run(self):
        # Cleanup: Move self.driver into a synchronizer context
        # made available to every sync step.
        if not self.driver.enabled:
            self.log.warning("Driver is not enabled. Not running sync steps.")
            return

        while True:
            self.log.trace("Waiting for event or timeout")
            self.wait_for_event(timeout=5)
            self.log.trace("Synchronizer awake")

            self.run_once()

    def fetch_pending(self, deletion=False):
        unique_model_list = list(set(self.model_to_step.keys()))
        pending_objects = []
        pending_steps = []
        # list() so that append() below also works on Python 3, where
        # values() returns a view.
        step_list = list(self.step_lookup.values())

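        # Models named as external dependencies get a generic SyncStep so
        # that their pending objects are still fetched and scheduled.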
        for e in self.external_dependencies:
            # Use a fresh subclass per dependency; assigning observes on
            # SyncStep itself would overwrite it on each iteration.
            s = type("ExternalDependencyStep", (SyncStep,), {})
            if isinstance(e, str):
                # external dependency is a string that names a model class
                s.observes = self.model_accessor.get_model_class(e)
            else:
                # external dependency is a model class
                s.observes = e
            step_list.append(s)

        for step_class in step_list:
            step = step_class(driver=self.driver, model_accessor=self.model_accessor)
            step.log = self.log.new(step=step)

            if not hasattr(step, "call"):
                pending = step.fetch_pending(deletion)
                for obj in pending:
                    step = step_class(driver=self.driver, model_accessor=self.model_accessor)
                    step.log = self.log.new(step=step)
                    obj.synchronizer_step = step

                pending_objects.extend(pending)
            else:
                # Support old and broken legacy synchronizers
                # This needs to be dropped soon.
                pending_steps.append(step)

        self.log.trace(
            "Fetched pending data",
            pending_objects=pending_objects,
            legacy_steps=pending_steps,
        )
        return pending_objects, pending_steps

    def linked_objects(self, o):
        if o is None:
            return [], None
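        # A collection accessor (anything supporting .all()) implies a proxy
        # edge; a plain object reference is a direct edge.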
        try:
            o_lst = list(o.all())
            edge_type = PROXY_EDGE
        except (AttributeError, TypeError):
            o_lst = [o]
            edge_type = DIRECT_EDGE
        return o_lst, edge_type

470 """ Automatically test if a real dependency path exists between two objects. e.g.
471 given an Instance, and a ControllerSite, the test amounts to:
472 instance.slice.site == controller.site
473
474 Then the two objects are related, and should be put in the same cohort.
475 If the models of the two objects are not dependent, then the check trivially
476 returns False.
477 """
478
479 def same_object(self, o1, o2):
480 if not o1 or not o2:
481 return False, None
482
483 o1_lst, edge_type = self.linked_objects(o1)
484
485 try:
486 found = next(
487 obj
488 for obj in o1_lst
489 if obj.leaf_model_name == o2.leaf_model_name and obj.pk == o2.pk
490 )
491 except AttributeError as e:
492 self.log.exception("Compared objects could not be identified", e=e)
493 raise e
494 except StopIteration:
495 # This is a temporary workaround to establish dependencies between
496 # deleted proxy objects. A better solution would be for the ORM to
497 # return the set of deleted objects via foreign keys. At that point,
498 # the following line would change back to found = False
499 # - Sapan
500
501 found = getattr(o2, "deleted", False)
502
503 return found, edge_type
504
    def concrete_path_exists(self, o1, o2):
        try:
            m1 = o1.leaf_model_name
            m2 = o2.leaf_model_name
        except AttributeError:
            # One of the nodes is not in the dependency graph
            # No dependency
            return False, None

        if m1.endswith("ServiceInstance") and m2.endswith("ServiceInstance"):
            return getattr(o2, "dependent", None) == o1, DIRECT_EDGE

        # FIXME: Dynamic dependency check
        G = self.model_dependency_graph[False]
        paths = all_shortest_paths(G, m1, m2)

        try:
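            # all_shortest_paths() is lazy; force it with any() so that
            # NetworkXNoPath surfaces here, then recreate the generator since
            # any() has consumed its first element.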
            any(paths)
            paths = all_shortest_paths(G, m1, m2)
        except NetworkXNoPath:
            # Easy. The two models are unrelated.
            return False, None

        for p in paths:
            src_object = o1
            edge_type = DIRECT_EDGE

            for i in range(len(p) - 1):
                src = p[i]
                dst = p[i + 1]
                edge_label = G[src][dst]
                sa = edge_label["src_accessor"]
                try:
                    dst_accessor = getattr(src_object, sa)
                    dst_objects, link_edge_type = self.linked_objects(dst_accessor)
                    if link_edge_type == PROXY_EDGE:
                        edge_type = link_edge_type

                    # Traversal outcomes:
                    #   no linked objects and deletion -> True
                    #   no linked objects              -> False
                    #   multiple linked objects        -> True
                    #   single linked object           -> continue traversal

                    if dst_objects == []:
                        # Workaround for ORM not returning linked deleted
                        # objects
                        if o2.deleted:
                            return True, edge_type
                        else:
                            dst_object = None
                    elif len(dst_objects) > 1:
                        # Multiple linked objects. Assume anything could be among those multiple objects.
                        raise AttributeError
                    else:
                        dst_object = dst_objects[0]
                except AttributeError as e:
                    if sa != "fake_accessor":
                        self.log.debug(
                            "Could not check object dependencies, making conservative choice %s",
                            e,
                            src_object=src_object,
                            sa=sa,
                            o1=o1,
                            o2=o2,
                        )
                    return True, edge_type

                src_object = dst_object

                if not src_object:
                    break

            verdict, edge_type = self.same_object(src_object, o2)
            if verdict:
                return verdict, edge_type

            # Otherwise try other paths

        return False, None

    def compute_dependent_cohorts(self, objects, deletion):
        """
        This function implements the main scheduling logic
        of the Synchronizer. It divides incoming work (dirty objects)
        into cohorts of dependent objects, and runs each such cohort
        in its own thread.

        Future work:

        * Run event thread in parallel to the scheduling thread, and
          add incoming objects to existing cohorts. Doing so should
          greatly improve synchronizer performance.
        * A single object might need to be added to multiple cohorts.
          In this case, the last cohort handles such an object.
        * This algorithm is horizontal-scale-ready. Multiple synchronizers
          could run off a shared runqueue of cohorts.
        """
        model_map = defaultdict(list)
        n = len(objects)
        r = range(n)
        indexed_objects = zip(r, objects)

        oG = DiGraph()

        for i in r:
            oG.add_node(i)

        try:
            for i0 in range(n):
                for i1 in range(n):
                    if i0 != i1:
                        if deletion:
                            path_args = (objects[i1], objects[i0])
                        else:
                            path_args = (objects[i0], objects[i1])

                        is_connected, edge_type = self.concrete_path_exists(*path_args)
                        if is_connected:
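                            # If an edge already exists in the opposite
                            # direction, this edge replaces it only when the
                            # existing one is a proxy edge; a direct edge in
                            # the opposite direction takes precedence.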
                            try:
                                edge_type = oG[i1][i0]["type"]
                                if edge_type == PROXY_EDGE:
                                    oG.remove_edge(i1, i0)
                                    oG.add_edge(i0, i1, {"type": edge_type})
                            except KeyError:
                                oG.add_edge(i0, i1, {"type": edge_type})
        except KeyError:
            pass

        components = weakly_connected_component_subgraphs(oG)
        cohort_indexes = [reversed(topological_sort(g)) for g in components]
        cohorts = [
            [objects[i] for i in cohort_index] for cohort_index in cohort_indexes
        ]

        return cohorts

    def run_once(self):
        self.load_dependency_graph()

        try:
            # Why are we checking the DB connection here?
            self.model_accessor.check_db_connection_okay()

            loop_start = time.time()

            # Two passes. One for sync, the other for deletion.
            for deletion in (False, True):
                objects_to_process = []

                objects_to_process, steps_to_process = self.fetch_pending(deletion)
                dependent_cohorts = self.compute_dependent_cohorts(
                    objects_to_process, deletion
                )

                threads = []
                self.log.trace("In run once inner loop", deletion=deletion)

                for cohort in dependent_cohorts:
                    thread = threading.Thread(
                        target=self.sync_cohort,
                        name="synchronizer",
                        args=(cohort, deletion),
                    )

                    threads.append(thread)

                # Start threads
                for t in threads:
                    t.start()

                self.reset_model_accessor()

                # Wait for all threads to finish before continuing with the run
                # loop
                for t in threads:
                    t.join()

                # Run legacy synchronizers, which do everything in call()
                for step in steps_to_process:
                    try:
                        step.call(deletion=deletion)
                    except Exception as e:
                        self.log.exception("Legacy step failed", step=step, e=e)

            loop_end = time.time()

        except Exception as e:
            self.log.exception(
                "Core error. This seems like a misconfiguration or bug. This error will not be relayed to the user!",
                e=e,
            )
            self.log.error("Exception in observer run loop")