# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# TODO:
# Add unit tests:
# - 2 sets of Instance, ControllerSlice, ControllerNetworks - delete and create case

import time
import threading
import json

from collections import defaultdict
from networkx import (
    DiGraph,
    weakly_connected_component_subgraphs,
    all_shortest_paths,
    NetworkXNoPath,
)
from networkx.algorithms.dag import topological_sort

from xossynchronizer.steps.syncstep import InnocuousException, DeferredException, SyncStep
from xossynchronizer.modelaccessor import *

from xosconfig import Config
from multistructlog import create_logger

log = create_logger(Config().get("logging"))


class StepNotReady(Exception):
    pass


class ExternalDependencyFailed(Exception):
    pass


# FIXME: Move drivers into a context shared across sync steps.


class NoOpDriver:
    def __init__(self):
        self.enabled = True
        self.dependency_graph = None


# Everyone gets NoOpDriver by default. To use a different driver, call
# set_driver() below.
DRIVER = NoOpDriver()

DIRECT_EDGE = 1
PROXY_EDGE = 2
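# Reading of this file's own convention (see linked_objects() below):
# DIRECT_EDGE marks a dependency reached through a scalar accessor (a single
# related object), while PROXY_EDGE marks one reached through a collection
# accessor, i.e. anything exposing .all().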


def set_driver(x):
    global DRIVER
    DRIVER = x


class XOSObserver(object):
    sync_steps = []

    def __init__(self, sync_steps, log=log):
        self.log = log

        self.step_lookup = {}
        self.sync_steps = sync_steps
        self.load_sync_steps()

        self.load_dependency_graph()

        # The Condition object via which events are received
        self.event_cond = threading.Condition()

        self.driver = DRIVER
        self.observer_name = Config.get("name")

    def wait_for_event(self, timeout):
        self.event_cond.acquire()
        self.event_cond.wait(timeout)
        self.event_cond.release()

    def wake_up(self):
        self.log.debug("Wake up routine called")
        self.event_cond.acquire()
        self.event_cond.notify()
        self.event_cond.release()

    def load_dependency_graph(self):

        try:
            if Config.get("dependency_graph"):
                self.log.trace(
                    "Loading model dependency graph",
                    path=Config.get("dependency_graph"),
                )
                dep_graph_str = open(Config.get("dependency_graph")).read()
            else:
                self.log.trace("Using default model dependency graph", graph={})
                dep_graph_str = "{}"

            # joint_dependencies is of the form { Model1 -> [(Model2, src_port, dst_port), ...] }
            # src_port is the field that accesses Model2 from Model1
            # dst_port is the field that accesses Model1 from Model2
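            # A hypothetical example of what the dependency-graph file might
            # contain (model and field names are illustrative only):
            #   {"Instance": [["Slice", "slice", "instances"]]}
            # i.e. Instance reaches its Slice via instance.slice, and Slice
            # reaches its Instances via slice.instances.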
            static_dependencies = json.loads(dep_graph_str)
            dynamic_dependencies = self.compute_service_dependencies()

            # list() keeps the merge working on Python 3, where dict.items()
            # returns a view that cannot be concatenated with +
            joint_dependencies = dict(
                list(static_dependencies.items()) + dynamic_dependencies
            )

            model_dependency_graph = DiGraph()
            for src_model, deps in joint_dependencies.items():
                for dep in deps:
                    dst_model, src_accessor, dst_accessor = dep
                    if src_model != dst_model:
                        edge_label = {
                            "src_accessor": src_accessor,
                            "dst_accessor": dst_accessor,
                        }
                        model_dependency_graph.add_edge(
                            src_model, dst_model, edge_label
                        )

            model_dependency_graph_rev = model_dependency_graph.reverse(copy=True)
            self.model_dependency_graph = {
                # deletion
                True: model_dependency_graph_rev,
                False: model_dependency_graph,
            }
            self.log.trace("Loaded dependencies", edges=model_dependency_graph.edges())
        except Exception as e:
            self.log.exception("Error loading dependency graph", e=e)
            raise e

    def load_sync_steps(self):
        model_to_step = defaultdict(list)
        external_dependencies = []

        for s in self.sync_steps:
            if not isinstance(s.observes, list):
                observes = [s.observes]
            else:
                observes = s.observes

            for m in observes:
                model_to_step[m.__name__].append(s.__name__)

            try:
                external_dependencies.extend(s.external_dependencies)
            except AttributeError:
                pass

            self.step_lookup[s.__name__] = s

        self.model_to_step = model_to_step
        self.external_dependencies = list(set(external_dependencies))
        self.log.info(
            "Loaded external dependencies", external_dependencies=external_dependencies
        )
        self.log.info("Loaded model_map", **model_to_step)

    def reset_model_accessor(self, o=None):
        try:
            model_accessor.reset_queries()
        except BaseException:
            # this shouldn't happen, but in case it does, catch it...
            if o:
                logdict = o.tologdict()
            else:
                logdict = {}

            self.log.error("exception in reset_queries", **logdict)

    def delete_record(self, o, dr_log=None):

        if dr_log is None:
            dr_log = self.log

        if getattr(o, "backend_need_reap", False):
            # the object has already been deleted and marked for reaping
            model_accessor.journal_object(o, "syncstep.call.already_marked_reap")
        else:
            step = getattr(o, "synchronizer_step", None)
            if not step:
                raise ExternalDependencyFailed

            model_accessor.journal_object(o, "syncstep.call.delete_record")

            dr_log.debug("Deleting object", **o.tologdict())

            step.log = dr_log.new(step=step)
            step.delete_record(o)
            step.log = dr_log

            dr_log.debug("Deleted object", **o.tologdict())

            model_accessor.journal_object(o, "syncstep.call.delete_set_reap")
            o.backend_need_reap = True
            o.save(update_fields=["backend_need_reap"])

    def sync_record(self, o, sr_log=None):
        try:
            step = o.synchronizer_step
        except AttributeError:
            step = None

        if step is None:
            raise ExternalDependencyFailed

        if sr_log is None:
            sr_log = self.log

        # Mark this as an object that will require a delete. Do this now,
        # rather than after the sync step.
        if not o.backend_need_delete:
            o.backend_need_delete = True
            o.save(update_fields=["backend_need_delete"])

        model_accessor.journal_object(o, "syncstep.call.sync_record")

        sr_log.debug("Syncing object", **o.tologdict())

        step.log = sr_log.new(step=step)
        step.sync_record(o)
        step.log = sr_log

        sr_log.debug("Synced object", **o.tologdict())

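        # Recording enacted as the newest of updated/changed_by_policy marks
        # the object as fully synced; elsewhere in this file an object with
        # enacted < updated is treated as still pending (see
        # compute_service_instance_dependencies).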
        o.enacted = max(o.updated, o.changed_by_policy)
        scratchpad = {"next_run": 0, "exponent": 0, "last_success": time.time()}
        o.backend_register = json.dumps(scratchpad)
        o.backend_status = "OK"
        o.backend_code = 1
        model_accessor.journal_object(o, "syncstep.call.save_update")
        o.save(
            update_fields=[
                "enacted",
                "backend_status",
                "backend_register",
                "backend_code",
            ]
        )

        if hasattr(step, "after_sync_save"):
            step.log = sr_log.new(step=step)
            step.after_sync_save(o)
            step.log = sr_log

        sr_log.info("Saved sync object", o=o)

    def handle_sync_exception(self, o, e):
        """ This function needs a cleanup. FIXME: Rethink backend_status, backend_register """
        self.log.exception("sync step failed!", e=e, **o.tologdict())
        current_code = o.backend_code

        if hasattr(e, "message"):
            status = str(e.message)
        else:
            status = str(e)

        if isinstance(e, InnocuousException):
            code = 1
        elif isinstance(e, DeferredException):
            # NOTE if the synchronization is deferred, it means that synchronization is still in progress
            code = 0
        else:
            code = 2
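        # backend_code conventions as used in this file: 0 marks a deferred
        # sync that is still in progress, 1 an innocuous outcome (sync_record
        # also writes 1 together with status "OK" on success), and 2 a hard
        # error.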

        self.set_object_error(o, status, code)

        dependency_error = "Failed due to error in model %s id %d: %s" % (
            o.leaf_model_name,
            o.id,
            status,
        )
        return dependency_error, code

    def set_object_error(self, o, status, code):
        if o.backend_status:
            error_list = o.backend_status.split(" // ")
        else:
            error_list = []

        if status not in error_list:
            error_list.append(status)

        # Keep the last two errors
        error_list = error_list[-2:]

        o.backend_code = code
        o.backend_status = " // ".join(error_list)

        try:
            scratchpad = json.loads(o.backend_register)
            # probe for the key; a missing or malformed scratchpad falls
            # through to the fresh one below
            scratchpad["exponent"]
        except BaseException:
            scratchpad = {
                "next_run": 0,
                "exponent": 0,
                "last_success": time.time(),
                "failures": 0,
            }

        # Second or later failure: back off exponentially before retrying
        if scratchpad["exponent"]:
            if code == 1:
                delay = scratchpad["exponent"] * 60  # 1 minute
            else:
                delay = scratchpad["exponent"] * 600  # 10 minutes

            # cap delays at 8 hours
            if delay > 8 * 60 * 60:
                delay = 8 * 60 * 60
            scratchpad["next_run"] = time.time() + delay

        scratchpad["exponent"] += 1
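        # Resulting schedule, e.g. for repeated hard errors (code 2): the
        # second failure is retried after 600 s, the third after 1200 s, and
        # so on, capped at 28800 s (8 hours).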

        try:
            scratchpad["failures"] += 1
        except KeyError:
            scratchpad["failures"] = 1

        scratchpad["last_failure"] = time.time()

        o.backend_register = json.dumps(scratchpad)

        # TOFIX:
        # DatabaseError: value too long for type character varying(140)
        if model_accessor.obj_exists(o):
            try:
                o.backend_status = o.backend_status[:1024]
                o.save(
                    update_fields=["backend_status", "backend_register"],
                    always_update_timestamp=True,
                )
            except BaseException as e:
                self.log.exception("Could not update backend status field!", e=e)

    def sync_cohort(self, cohort, deletion):
        threading.current_thread().is_sync_thread = True

        sc_log = self.log.new(thread_id=threading.current_thread().ident)

        try:
            start_time = time.time()
            sc_log.debug("Starting to work on cohort", cohort=cohort, deletion=deletion)

            cohort_emptied = False
            dependency_error = None
            dependency_error_code = None

            itty = iter(cohort)

            while not cohort_emptied:
                try:
                    self.reset_model_accessor()
                    o = next(itty)

                    if dependency_error:
                        self.set_object_error(
                            o, dependency_error, dependency_error_code
                        )
                        continue

                    try:
                        if deletion:
                            self.delete_record(o, sc_log)
                        else:
                            self.sync_record(o, sc_log)
                    except ExternalDependencyFailed:
                        dependency_error = (
                            "External dependency on object %s id %d not met"
                            % (o.leaf_model_name, o.id)
                        )
                        dependency_error_code = 1
                    except (DeferredException, InnocuousException, Exception) as e:
                        dependency_error, dependency_error_code = self.handle_sync_exception(
                            o, e
                        )

                except StopIteration:
                    sc_log.debug("Cohort completed", cohort=cohort, deletion=deletion)
                    cohort_emptied = True
        finally:
            self.reset_model_accessor()
            model_accessor.connection_close()

    def tenant_class_name_from_service(self, service_name):
        """ This code supports legacy functionality. To be cleaned up. """
        name1 = service_name + "Instance"
        if hasattr(Slice().stub, name1):
            return name1
        else:
            name2 = service_name.replace("Service", "Tenant")
            if hasattr(Slice().stub, name2):
                return name2
            else:
                return None

    def compute_service_dependencies(self):
        """ FIXME: Implement more cleanly via xproto """

        model_names = self.model_to_step.keys()
        ugly_tuples = [
            (m, m.replace("Instance", "").replace("Tenant", "Service"))
            for m in model_names
            if m.endswith("ServiceInstance") or m.endswith("Tenant")
        ]
        ugly_rtuples = [(v, k) for k, v in ugly_tuples]

        ugly_map = dict(ugly_tuples)
        ugly_rmap = dict(ugly_rtuples)

        s_model_names = [v for k, v in ugly_tuples]
        s_models0 = [
            getattr(Slice().stub, model_name, None) for model_name in s_model_names
        ]
        # guard against names that didn't resolve to a stub model class
        s_models1 = [model.objects.first() for model in s_models0 if model is not None]
        s_models = [m for m in s_models1 if m is not None]

        dependencies = []
        for model in s_models:
            deps = ServiceDependency.objects.filter(subscriber_service_id=model.id)
            if deps:
                services = [
                    self.tenant_class_name_from_service(
                        d.provider_service.leaf_model_name
                    )
                    for d in deps
                ]
                dependencies.append(
                    (ugly_rmap[model.leaf_model_name], [(s, "", "") for s in services])
                )

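        # The result mirrors the static graph entries, e.g. (hypothetical
        # names): [("FooServiceInstance", [("BarServiceInstance", "", "")])];
        # the accessor strings are empty because service-level dependencies
        # carry no field information.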
        return dependencies

    def compute_service_instance_dependencies(self, objects):
        link_set = [
            ServiceInstanceLink.objects.filter(subscriber_service_instance_id=o.id)
            for o in objects
        ]

        dependencies = [
            (l.provider_service_instance, l.subscriber_service_instance)
            for links in link_set
            for l in links
        ]
        providers = []

        for p, s in dependencies:
            if not p.enacted or p.enacted < p.updated:
                p.dependent = s
                providers.append(p)

        return providers

    def run(self):
        # Cleanup: Move self.driver into a synchronizer context
        # made available to every sync step.
        if not self.driver.enabled:
            self.log.warning("Driver is not enabled. Not running sync steps.")
            return

        while True:
            self.log.trace("Waiting for event or timeout")
            self.wait_for_event(timeout=5)
            self.log.trace("Synchronizer awake")

            self.run_once()

    def fetch_pending(self, deletion=False):
        unique_model_list = list(set(self.model_to_step.keys()))
        pending_objects = []
        pending_steps = []
        # list() so we can append below; on Python 3, dict.values() is a view
        step_list = list(self.step_lookup.values())

        for e in self.external_dependencies:
            s = SyncStep
            s.observes = e
            step_list.append(s)

        for step_class in step_list:
            step = step_class(driver=self.driver)
            step.log = self.log.new(step=step)

            if not hasattr(step, "call"):
                pending = step.fetch_pending(deletion)
                for obj in pending:
                    step = step_class(driver=self.driver)
                    step.log = self.log.new(step=step)
                    obj.synchronizer_step = step

                pending_service_dependencies = self.compute_service_instance_dependencies(
                    pending
                )

                for obj in pending_service_dependencies:
                    obj.synchronizer_step = None

                pending_objects.extend(pending)
                pending_objects.extend(pending_service_dependencies)
            else:
                # Support old and broken legacy synchronizers
                # This needs to be dropped soon.
                pending_steps.append(step)

        self.log.trace(
            "Fetched pending data",
            pending_objects=pending_objects,
            legacy_steps=pending_steps,
        )
        return pending_objects, pending_steps

    def linked_objects(self, o):
        if o is None:
            return [], None
        try:
            o_lst = list(o.all())
            edge_type = PROXY_EDGE
        except (AttributeError, TypeError):
            o_lst = [o]
            edge_type = DIRECT_EDGE
        return o_lst, edge_type

    """ Automatically test if a real dependency path exists between two objects. e.g.
        given an Instance and a ControllerSite, the test amounts to:
            instance.slice.site == controller.site

        Then the two objects are related, and should be put in the same cohort.
        If the models of the two objects are not dependent, then the check trivially
        returns False.
    """

    def same_object(self, o1, o2):
        if not o1 or not o2:
            return False, None

        o1_lst, edge_type = self.linked_objects(o1)

        try:
            found = next(
                obj
                for obj in o1_lst
                if obj.leaf_model_name == o2.leaf_model_name and obj.pk == o2.pk
            )
        except AttributeError as e:
            self.log.exception("Compared objects could not be identified", e=e)
            raise e
        except StopIteration:
            # This is a temporary workaround to establish dependencies between
            # deleted proxy objects. A better solution would be for the ORM to
            # return the set of deleted objects via foreign keys. At that point,
            # the following line would change back to found = False
            # - Sapan

            found = getattr(o2, "deleted", False)

        return found, edge_type

    def concrete_path_exists(self, o1, o2):
        try:
            m1 = o1.leaf_model_name
            m2 = o2.leaf_model_name
        except AttributeError:
            # One of the nodes is not in the dependency graph
            # No dependency
            return False, None

        if m1.endswith("ServiceInstance") and m2.endswith("ServiceInstance"):
            return getattr(o2, "dependent", None) == o1, DIRECT_EDGE

        # FIXME: Dynamic dependency check
        G = self.model_dependency_graph[False]
        paths = all_shortest_paths(G, m1, m2)

        try:
            # all_shortest_paths returns a lazy generator, so force one step
            # of evaluation to learn whether a path exists at all, then
            # recreate the generator for the traversal below
            any(paths)
            paths = all_shortest_paths(G, m1, m2)
        except NetworkXNoPath:
            # Easy. The two models are unrelated.
            return False, None

        for p in paths:
            src_object = o1
            edge_type = DIRECT_EDGE

            for i in range(len(p) - 1):
                src = p[i]
                dst = p[i + 1]
                edge_label = G[src][dst]
                sa = edge_label["src_accessor"]
                try:
                    dst_accessor = getattr(src_object, sa)
                    dst_objects, link_edge_type = self.linked_objects(dst_accessor)
                    if link_edge_type == PROXY_EDGE:
                        edge_type = link_edge_type

                    # Outcome of the link traversal:
                    #   True                 if no linked objects and deletion
                    #   False                if no linked objects
                    #   True                 if multiple linked objects
                    #   <continue traversal> if single linked object

                    if dst_objects == []:
                        # Workaround for ORM not returning linked deleted
                        # objects
                        if o2.deleted:
                            return True, edge_type
                        else:
                            dst_object = None
                    elif len(dst_objects) > 1:
                        # Multiple linked objects. Assume anything could be among those multiple objects.
                        raise AttributeError
                    else:
                        dst_object = dst_objects[0]
                except AttributeError as e:
                    if sa != "fake_accessor":
                        self.log.debug(
                            "Could not check object dependencies, making conservative choice %s",
                            e,
                            src_object=src_object,
                            sa=sa,
                            o1=o1,
                            o2=o2,
                        )
                    return True, edge_type

                src_object = dst_object

                if not src_object:
                    break

            verdict, edge_type = self.same_object(src_object, o2)
            if verdict:
                return verdict, edge_type

            # Otherwise try other paths

        return False, None

    """
    This function implements the main scheduling logic
    of the Synchronizer. It divides incoming work (dirty objects)
    into cohorts of dependent objects, and runs each such cohort
    in its own thread.

    Future work:

    * Run the event thread in parallel to the scheduling thread, and
      add incoming objects to existing cohorts. Doing so should
      greatly improve synchronizer performance.
    * A single object might need to be added to multiple cohorts.
      In this case, the last cohort handles such an object.
    * This algorithm is horizontal-scale-ready. Multiple synchronizers
      could run off a shared runqueue of cohorts.
    """

    def compute_dependent_cohorts(self, objects, deletion):
        model_map = defaultdict(list)
        n = len(objects)
        r = range(n)
        indexed_objects = zip(r, objects)

        oG = DiGraph()

        for i in r:
            oG.add_node(i)

        try:
            for i0 in range(n):
                for i1 in range(n):
                    if i0 != i1:
                        if deletion:
                            path_args = (objects[i1], objects[i0])
                        else:
                            path_args = (objects[i0], objects[i1])

                        is_connected, edge_type = self.concrete_path_exists(*path_args)
                        if is_connected:
                            try:
                                edge_type = oG[i1][i0]["type"]
                                if edge_type == PROXY_EDGE:
                                    oG.remove_edge(i1, i0)
                                    oG.add_edge(i0, i1, {"type": edge_type})
                            except KeyError:
                                oG.add_edge(i0, i1, {"type": edge_type})
        except KeyError:
            pass

        components = weakly_connected_component_subgraphs(oG)
        cohort_indexes = [reversed(topological_sort(g)) for g in components]
        cohorts = [
            [objects[i] for i in cohort_index] for cohort_index in cohort_indexes
        ]

        return cohorts

    def run_once(self):
        self.load_dependency_graph()

        try:
            # Why are we checking the DB connection here?
            model_accessor.check_db_connection_okay()

            loop_start = time.time()

            # Two passes. One for sync, the other for deletion.
            for deletion in (False, True):
                objects_to_process = []

                objects_to_process, steps_to_process = self.fetch_pending(deletion)
                dependent_cohorts = self.compute_dependent_cohorts(
                    objects_to_process, deletion
                )

                threads = []
                self.log.trace("In run once inner loop", deletion=deletion)

                for cohort in dependent_cohorts:
                    thread = threading.Thread(
                        target=self.sync_cohort,
                        name="synchronizer",
                        args=(cohort, deletion),
                    )

                    threads.append(thread)

                # Start threads
                for t in threads:
                    t.start()

                self.reset_model_accessor()

                # Wait for all threads to finish before continuing with the run
                # loop
                for t in threads:
                    t.join()

                # Run legacy synchronizers, which do everything in call()
                for step in steps_to_process:
                    try:
                        step.call(deletion=deletion)
                    except Exception as e:
                        self.log.exception("Legacy step failed", step=step, e=e)

            loop_end = time.time()

        except Exception as e:
            self.log.exception(
                "Core error. This seems like a misconfiguration or bug. This error will not be relayed to the user!",
                e=e,
            )
            self.log.error("Exception in observer run loop")