# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# TODO:
# Add unit tests:
# - 2 sets of Instance, ControllerSlice, ControllerNetworks - delete and create case

import time
import threading
import json

from collections import defaultdict
from networkx import (
    DiGraph,
    weakly_connected_component_subgraphs,
    all_shortest_paths,
    NetworkXNoPath,
)
from networkx.algorithms.dag import topological_sort

from xossynchronizer.steps.syncstep import InnocuousException, DeferredException, SyncStep

from xosconfig import Config
from multistructlog import create_logger

log = create_logger(Config().get("logging"))


class StepNotReady(Exception):
    pass


class ExternalDependencyFailed(Exception):
    pass


# FIXME: Move drivers into a context shared across sync steps.


class NoOpDriver:
    def __init__(self):
        self.enabled = True
        self.dependency_graph = None


# Everyone gets NoOpDriver by default. To use a different driver, call
# set_driver() below.
DRIVER = NoOpDriver()

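# Edge types recorded while grouping objects into cohorts: DIRECT_EDGE marks a
# dependency reached through a scalar accessor, PROXY_EDGE one reached through
# a to-many accessor (see linked_objects() below).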
DIRECT_EDGE = 1
PROXY_EDGE = 2


def set_driver(x):
    global DRIVER
    DRIVER = x

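# Example (illustrative; OpenStackDriver is a hypothetical driver class
# supplied by a particular synchronizer):
#
#   set_driver(OpenStackDriver())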

class XOSObserver(object):
    sync_steps = []

    def __init__(self, sync_steps, model_accessor, log=log):
        self.log = log
        self.model_accessor = model_accessor

        self.step_lookup = {}
        self.sync_steps = sync_steps
        self.load_sync_steps()

        self.load_dependency_graph()

        self.event_cond = threading.Condition()

        self.driver = DRIVER
        self.observer_name = Config.get("name")

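    # wait_for_event()/wake_up() form a simple condition-variable handshake:
    # the run loop blocks for up to `timeout` seconds, and any other thread
    # (for example, an event listener) can call wake_up() to cut the wait
    # short and trigger an immediate pass.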
    def wait_for_event(self, timeout):
        self.event_cond.acquire()
        self.event_cond.wait(timeout)
        self.event_cond.release()

    def wake_up(self):
        self.log.debug("Wake up routine called")
        self.event_cond.acquire()
        self.event_cond.notify()
        self.event_cond.release()

    def load_dependency_graph(self):

        try:
            if Config.get("dependency_graph"):
                self.log.trace(
                    "Loading model dependency graph",
                    path=Config.get("dependency_graph"),
                )
                dep_graph_str = open(Config.get("dependency_graph")).read()
            else:
                self.log.trace("Using default model dependency graph", graph={})
                dep_graph_str = "{}"

            # joint_dependencies is of the form { Model1 -> [(Model2, src_port, dst_port), ...] }
            # src_port is the field that accesses Model2 from Model1
            # dst_port is the field that accesses Model1 from Model2
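            # For example (illustrative): {"Instance": [["Slice", "slice", "instances"]]}
            # says an Instance reaches its Slice through instance.slice, and a
            # Slice reaches its Instances through slice.instances.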
            static_dependencies = json.loads(dep_graph_str)
            dynamic_dependencies = self.compute_service_dependencies()

            joint_dependencies = dict(
                list(static_dependencies.items()) + list(dynamic_dependencies)
            )

            model_dependency_graph = DiGraph()
            for src_model, deps in joint_dependencies.items():
                for dep in deps:
                    dst_model, src_accessor, dst_accessor = dep
                    if src_model != dst_model:
                        edge_label = {
                            "src_accessor": src_accessor,
                            "dst_accessor": dst_accessor,
                        }
                        model_dependency_graph.add_edge(
                            src_model, dst_model, edge_label
                        )

            model_dependency_graph_rev = model_dependency_graph.reverse(copy=True)
            self.model_dependency_graph = {
                # deletion
                True: model_dependency_graph_rev,
                False: model_dependency_graph,
            }
            self.log.trace("Loaded dependencies", edges=model_dependency_graph.edges())
        except Exception as e:
            self.log.exception("Error loading dependency graph", e=e)
            raise e

    def load_sync_steps(self):
        model_to_step = defaultdict(list)
        external_dependencies = []

        for s in self.sync_steps:
            if not isinstance(s.observes, list):
                observes = [s.observes]
            else:
                observes = s.observes
            for m in observes:
                if isinstance(m, str):
                    # observes is a string that names the model
                    model_to_step[m].append(s.__name__)
                else:
                    # observes is the model class
                    model_to_step[m.__name__].append(s.__name__)

            try:
                external_dependencies.extend(s.external_dependencies)
            except AttributeError:
                pass

            self.step_lookup[s.__name__] = s

        self.model_to_step = model_to_step
        self.external_dependencies = list(set(external_dependencies))
        self.log.info(
            "Loaded external dependencies", external_dependencies=external_dependencies
        )
        self.log.info("Loaded model_map", **model_to_step)

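    # A minimal sync step, as this loader expects it (illustrative sketch;
    # SyncWidget and Widget are hypothetical names):
    #
    #   class SyncWidget(SyncStep):
    #       observes = "Widget"  # a model name or model class, or a list of them
    #
    #       def sync_record(self, o): ...
    #       def delete_record(self, o): ...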
    def reset_model_accessor(self, o=None):
        try:
            self.model_accessor.reset_queries()
        except BaseException:
            # this shouldn't happen, but in case it does, catch it...
            if o:
                logdict = o.tologdict()
            else:
                logdict = {}

            self.log.error("exception in reset_queries", **logdict)

    def delete_record(self, o, dr_log=None):

        if dr_log is None:
            dr_log = self.log

        if getattr(o, "backend_need_reap", False):
            # the object has already been deleted and marked for reaping
            self.model_accessor.journal_object(o, "syncstep.call.already_marked_reap")
        else:
            step = getattr(o, "synchronizer_step", None)
            if not step:
                raise ExternalDependencyFailed

            self.model_accessor.journal_object(o, "syncstep.call.delete_record")

            dr_log.debug("Deleting object", **o.tologdict())

            step.log = dr_log.new(step=step)
            step.delete_record(o)
            step.log = dr_log

            dr_log.debug("Deleted object", **o.tologdict())

            self.model_accessor.journal_object(o, "syncstep.call.delete_set_reap")
            o.backend_need_reap = True
            o.save(update_fields=["backend_need_reap"])

    def sync_record(self, o, sr_log=None):
        try:
            step = o.synchronizer_step
        except AttributeError:
            step = None

        if step is None:
            raise ExternalDependencyFailed

        if sr_log is None:
            sr_log = self.log

        # Mark this as an object that will require a delete. Do this now,
        # rather than after the syncstep, so the flag is recorded even if the
        # sync fails partway.
        if not o.backend_need_delete:
            o.backend_need_delete = True
            o.save(update_fields=["backend_need_delete"])

        self.model_accessor.journal_object(o, "syncstep.call.sync_record")

        sr_log.debug("Syncing object", **o.tologdict())

        step.log = sr_log.new(step=step)
        step.sync_record(o)
        step.log = sr_log

        sr_log.debug("Synced object", **o.tologdict())

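        # Record success: the object is enacted as of its most recent change,
        # whether that change came from a direct save or from a model policy.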
        o.enacted = max(o.updated, o.changed_by_policy)
        scratchpad = {"next_run": 0, "exponent": 0, "last_success": time.time()}
        o.backend_register = json.dumps(scratchpad)
        o.backend_status = "OK"
        o.backend_code = 1
        self.model_accessor.journal_object(o, "syncstep.call.save_update")
        o.save(
            update_fields=[
                "enacted",
                "backend_status",
                "backend_register",
                "backend_code",
            ]
        )

        if hasattr(step, "after_sync_save"):
            step.log = sr_log.new(step=step)
            step.after_sync_save(o)
            step.log = sr_log

        sr_log.info("Saved sync object", o=o)

    """ This function needs a cleanup. FIXME: Rethink backend_status, backend_register """

    def handle_sync_exception(self, o, e):
        self.log.exception("sync step failed!", e=e, **o.tologdict())
        current_code = o.backend_code

        if hasattr(e, "message"):
            status = str(e.message)
        else:
            status = str(e)

        if isinstance(e, InnocuousException):
            code = 1
        elif isinstance(e, DeferredException):
            # NOTE if the synchronization is deferred, it means synchronization is still in progress
            code = 0
        else:
            code = 2

        self.set_object_error(o, status, code)

        dependency_error = "Failed due to error in model %s id %d: %s" % (
            o.leaf_model_name,
            o.id,
            status,
        )
        return dependency_error, code

    def set_object_error(self, o, status, code):
        if o.backend_status:
            error_list = o.backend_status.split(" // ")
        else:
            error_list = []

        if status not in error_list:
            error_list.append(status)

        # Keep last two errors
        error_list = error_list[-2:]

        o.backend_code = code
        o.backend_status = " // ".join(error_list)

        try:
            scratchpad = json.loads(o.backend_register)
            scratchpad["exponent"]
        except BaseException:
            scratchpad = {
                "next_run": 0,
                "exponent": 0,
                "last_success": time.time(),
                "failures": 0,
            }

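        # Exponential backoff: with exponent == 3, for example, an innocuous
        # failure (code 1) defers the next run by 3 * 60s == 3 minutes, any
        # other failure by 3 * 600s == 30 minutes, capped at 8 hours.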
        # Second failure
        if scratchpad["exponent"]:
            if code == 1:
                delay = scratchpad["exponent"] * 60  # 1 minute
            else:
                delay = scratchpad["exponent"] * 600  # 10 minutes

            # cap delays at 8 hours
            if delay > 8 * 60 * 60:
                delay = 8 * 60 * 60
            scratchpad["next_run"] = time.time() + delay

        scratchpad["exponent"] += 1

        try:
            scratchpad["failures"] += 1
        except KeyError:
            scratchpad["failures"] = 1

        scratchpad["last_failure"] = time.time()

        o.backend_register = json.dumps(scratchpad)

        # TOFIX:
        # DatabaseError: value too long for type character varying(140)
        if self.model_accessor.obj_exists(o):
            try:
                o.backend_status = o.backend_status[:1024]
                o.save(
                    update_fields=["backend_status", "backend_register"],
                    always_update_timestamp=True,
                )
            except BaseException as e:
                self.log.exception("Could not update backend status field!", e=e)
                pass

    def sync_cohort(self, cohort, deletion):
        threading.current_thread().is_sync_thread = True

        sc_log = self.log.new(thread_id=threading.current_thread().ident)

        try:
            start_time = time.time()
            sc_log.debug("Starting to work on cohort", cohort=cohort, deletion=deletion)

            cohort_emptied = False
            dependency_error = None
            dependency_error_code = None

            itty = iter(cohort)

            while not cohort_emptied:
                try:
                    self.reset_model_accessor()
                    o = next(itty)

                    if dependency_error:
                        self.set_object_error(
                            o, dependency_error, dependency_error_code
                        )
                        continue

                    try:
                        if deletion:
                            self.delete_record(o, sc_log)
                        else:
                            self.sync_record(o, sc_log)
                    except ExternalDependencyFailed:
                        dependency_error = (
                            "External dependency on object %s id %d not met"
                            % (o.leaf_model_name, o.id)
                        )
                        dependency_error_code = 1
                    except (DeferredException, InnocuousException, Exception) as e:
                        dependency_error, dependency_error_code = self.handle_sync_exception(
                            o, e
                        )

                except StopIteration:
                    sc_log.debug("Cohort completed", cohort=cohort, deletion=deletion)
                    cohort_emptied = True
        finally:
            self.reset_model_accessor()
            self.model_accessor.connection_close()

    def tenant_class_name_from_service(self, service_name):
        """ This code supports legacy functionality. To be cleaned up. """
        name1 = service_name + "Instance"
        if hasattr(self.model_accessor.Slice().stub, name1):
            return name1
        else:
            name2 = service_name.replace("Service", "Tenant")
            if hasattr(self.model_accessor.Slice().stub, name2):
                return name2
            else:
                return None

    def compute_service_dependencies(self):
        """ FIXME: Implement more cleanly via xproto """

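        # Returns a list of (subscriber_model_name, [(provider_model_name, "", ""), ...])
        # pairs derived from ServiceDependency objects; the empty strings stand
        # in for the src/dst accessors used by the static dependency format above.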
        model_names = self.model_to_step.keys()
        ugly_tuples = [
            (m, m.replace("Instance", "").replace("Tenant", "Service"))
            for m in model_names
            if m.endswith("ServiceInstance") or m.endswith("Tenant")
        ]
        ugly_rtuples = [(v, k) for k, v in ugly_tuples]

        ugly_map = dict(ugly_tuples)
        ugly_rmap = dict(ugly_rtuples)

        s_model_names = [v for k, v in ugly_tuples]
        s_models0 = [
            getattr(self.model_accessor.Slice().stub, model_name, None)
            for model_name in s_model_names
        ]
        s_models1 = [model.objects.first() for model in s_models0 if model is not None]
        s_models = [m for m in s_models1 if m is not None]

        dependencies = []
        for model in s_models:
            deps = self.model_accessor.ServiceDependency.objects.filter(
                subscriber_service_id=model.id
            )
            if deps:
                services = [
                    self.tenant_class_name_from_service(
                        d.provider_service.leaf_model_name
                    )
                    for d in deps
                ]
                dependencies.append(
                    (ugly_rmap[model.leaf_model_name], [(s, "", "") for s in services])
                )

        return dependencies

    def compute_service_instance_dependencies(self, objects):
        link_set = [
            self.model_accessor.ServiceInstanceLink.objects.filter(
                subscriber_service_instance_id=o.id
            )
            for o in objects
        ]

        dependencies = [
            (l.provider_service_instance, l.subscriber_service_instance)
            for links in link_set
            for l in links
        ]
        providers = []

        for p, s in dependencies:
            if not p.enacted or p.enacted < p.updated:
                p.dependent = s
                providers.append(p)

        return providers

    def run(self):
        # Cleanup: Move self.driver into a synchronizer context
        # made available to every sync step.
        if not self.driver.enabled:
            self.log.warning("Driver is not enabled. Not running sync steps.")
            return

        while True:
            self.log.trace("Waiting for event or timeout")
            self.wait_for_event(timeout=5)
            self.log.trace("Synchronizer awake")

            self.run_once()

    def fetch_pending(self, deletion=False):
        unique_model_list = list(set(self.model_to_step.keys()))
        pending_objects = []
        pending_steps = []
        step_list = list(self.step_lookup.values())

        for e in self.external_dependencies:
            s = SyncStep
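            # NOTE: this rebinds `observes` on the shared SyncStep class
            # itself, so every placeholder appended below observes whichever
            # dependency was processed last.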
            if isinstance(e, str):
                # external dependency is a string that names a model class
                s.observes = self.model_accessor.get_model_class(e)
            else:
                # external dependency is a model class
                s.observes = e
            step_list.append(s)

        for step_class in step_list:
            step = step_class(driver=self.driver, model_accessor=self.model_accessor)
            step.log = self.log.new(step=step)

            if not hasattr(step, "call"):
                pending = step.fetch_pending(deletion)
                for obj in pending:
                    step = step_class(driver=self.driver, model_accessor=self.model_accessor)
                    step.log = self.log.new(step=step)
                    obj.synchronizer_step = step

                pending_service_dependencies = self.compute_service_instance_dependencies(
                    pending
                )

                for obj in pending_service_dependencies:
                    obj.synchronizer_step = None

                pending_objects.extend(pending)
                pending_objects.extend(pending_service_dependencies)
            else:
                # Support old and broken legacy synchronizers
                # This needs to be dropped soon.
                pending_steps.append(step)

        self.log.trace(
            "Fetched pending data",
            pending_objects=pending_objects,
            legacy_steps=pending_steps,
        )
        return pending_objects, pending_steps

    def linked_objects(self, o):
        if o is None:
            return [], None
        try:
            o_lst = [o for o in o.all()]
            edge_type = PROXY_EDGE
        except (AttributeError, TypeError):
            o_lst = [o]
            edge_type = DIRECT_EDGE
        return o_lst, edge_type

547 """ Automatically test if a real dependency path exists between two objects. e.g.
548 given an Instance, and a ControllerSite, the test amounts to:
549 instance.slice.site == controller.site
550
551 Then the two objects are related, and should be put in the same cohort.
552 If the models of the two objects are not dependent, then the check trivially
553 returns False.
554 """
555
    def same_object(self, o1, o2):
        if not o1 or not o2:
            return False, None

        o1_lst, edge_type = self.linked_objects(o1)

        try:
            found = next(
                obj
                for obj in o1_lst
                if obj.leaf_model_name == o2.leaf_model_name and obj.pk == o2.pk
            )
        except AttributeError as e:
            self.log.exception("Compared objects could not be identified", e=e)
            raise e
        except StopIteration:
            # This is a temporary workaround to establish dependencies between
            # deleted proxy objects. A better solution would be for the ORM to
            # return the set of deleted objects via foreign keys. At that point,
            # the following line would change back to found = False
            # - Sapan

            found = getattr(o2, "deleted", False)

        return found, edge_type

    def concrete_path_exists(self, o1, o2):
        try:
            m1 = o1.leaf_model_name
            m2 = o2.leaf_model_name
        except AttributeError:
            # One of the nodes is not in the dependency graph
            # No dependency
            return False, None

        if m1.endswith("ServiceInstance") and m2.endswith("ServiceInstance"):
            return getattr(o2, "dependent", None) == o1, DIRECT_EDGE

        # FIXME: Dynamic dependency check
        G = self.model_dependency_graph[False]
        paths = all_shortest_paths(G, m1, m2)

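        # all_shortest_paths returns a lazy generator: draining it with any()
        # forces NetworkXNoPath to surface here if the models are unrelated;
        # the generator is then recreated for the traversal below.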
        try:
            any(paths)
            paths = all_shortest_paths(G, m1, m2)
        except NetworkXNoPath:
            # Easy. The two models are unrelated.
            return False, None

        for p in paths:
            src_object = o1
            edge_type = DIRECT_EDGE

            for i in range(len(p) - 1):
                src = p[i]
                dst = p[i + 1]
                edge_label = G[src][dst]
                sa = edge_label["src_accessor"]
                try:
                    dst_accessor = getattr(src_object, sa)
                    dst_objects, link_edge_type = self.linked_objects(dst_accessor)
                    if link_edge_type == PROXY_EDGE:
                        edge_type = link_edge_type

                    """
                    True                 If no linked objects and deletion
                    False                If no linked objects
                    True                 If multiple linked objects
                    <continue traversal> If single linked object
                    """

                    if dst_objects == []:
                        # Workaround for ORM not returning linked deleted
                        # objects
                        if o2.deleted:
                            return True, edge_type
                        else:
                            dst_object = None
                    elif len(dst_objects) > 1:
                        # Multiple linked objects. Assume anything could be among those multiple objects.
                        raise AttributeError
                    else:
                        dst_object = dst_objects[0]
                except AttributeError as e:
                    if sa != "fake_accessor":
                        self.log.debug(
                            "Could not check object dependencies, making conservative choice %s",
                            e,
                            src_object=src_object,
                            sa=sa,
                            o1=o1,
                            o2=o2,
                        )
                    return True, edge_type

                src_object = dst_object

                if not src_object:
                    break

            verdict, edge_type = self.same_object(src_object, o2)
            if verdict:
                return verdict, edge_type

            # Otherwise try other paths

        return False, None

666 """
667
668 This function implements the main scheduling logic
669 of the Synchronizer. It divides incoming work (dirty objects)
670 into cohorts of dependent objects, and runs each such cohort
671 in its own thread.
672
673 Future work:
674
675 * Run event thread in parallel to the scheduling thread, and
676 add incoming objects to existing cohorts. Doing so should
677 greatly improve synchronizer performance.
678 * A single object might need to be added to multiple cohorts.
679 In this case, the last cohort handles such an object.
680 * This algorithm is horizontal-scale-ready. Multiple synchronizers
681 could run off a shared runqueue of cohorts.
682
683 """
684
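    # For example (illustrative): given a dirty Slice and a dirty Instance
    # that refers to it, both land in one cohort with the Slice synced first;
    # an unrelated dirty object forms its own single-element cohort.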
    def compute_dependent_cohorts(self, objects, deletion):
        model_map = defaultdict(list)
        n = len(objects)
        r = range(n)
        indexed_objects = zip(r, objects)

        oG = DiGraph()

        for i in r:
            oG.add_node(i)

        try:
            for i0 in range(n):
                for i1 in range(n):
                    if i0 != i1:
                        if deletion:
                            path_args = (objects[i1], objects[i0])
                        else:
                            path_args = (objects[i0], objects[i1])

                        is_connected, edge_type = self.concrete_path_exists(*path_args)
                        if is_connected:
                            try:
                                edge_type = oG[i1][i0]["type"]
                                if edge_type == PROXY_EDGE:
                                    oG.remove_edge(i1, i0)
                                    oG.add_edge(i0, i1, {"type": edge_type})
                            except KeyError:
                                oG.add_edge(i0, i1, {"type": edge_type})
        except KeyError:
            pass

        components = weakly_connected_component_subgraphs(oG)
        cohort_indexes = [reversed(topological_sort(g)) for g in components]
        cohorts = [
            [objects[i] for i in cohort_index] for cohort_index in cohort_indexes
        ]

        return cohorts

    def run_once(self):
        self.load_dependency_graph()

        try:
            # Why are we checking the DB connection here?
            self.model_accessor.check_db_connection_okay()

            loop_start = time.time()

            # Two passes. One for sync, the other for deletion.
            for deletion in (False, True):
                objects_to_process = []

                objects_to_process, steps_to_process = self.fetch_pending(deletion)
                dependent_cohorts = self.compute_dependent_cohorts(
                    objects_to_process, deletion
                )

                threads = []
                self.log.trace("In run once inner loop", deletion=deletion)

                for cohort in dependent_cohorts:
                    thread = threading.Thread(
                        target=self.sync_cohort,
                        name="synchronizer",
                        args=(cohort, deletion),
                    )

                    threads.append(thread)

                # Start threads
                for t in threads:
                    t.start()

                self.reset_model_accessor()

                # Wait for all threads to finish before continuing with the run
                # loop
                for t in threads:
                    t.join()

                # Run legacy synchronizers, which do everything in call()
                for step in steps_to_process:
                    try:
                        step.call(deletion=deletion)
                    except Exception as e:
                        self.log.exception("Legacy step failed", step=step, e=e)

                loop_end = time.time()

        except Exception as e:
            self.log.exception(
                "Core error. This seems like a misconfiguration or bug. This error will not be relayed to the user!",
                e=e,
            )
            self.log.error("Exception in observer run loop")
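
# Typical use (illustrative): the synchronizer framework constructs the
# observer with its sync steps and model accessor, then hands over control:
#
#   observer = XOSObserver(sync_steps, model_accessor)
#   observer.run()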