# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# TODO:
# Add unit tests:
# - 2 sets of Instance, ControllerSlice, ControllerNetworks - delete and create case
18
Zack Williams5c2ea232019-01-30 15:23:01 -070019from __future__ import absolute_import
Scott Bakerbba67b62019-01-28 17:38:21 -080020
Zack Williams5c2ea232019-01-30 15:23:01 -070021import json
22import threading
23import time
Scott Bakerbba67b62019-01-28 17:38:21 -080024from collections import defaultdict
Zack Williams5c2ea232019-01-30 15:23:01 -070025
26from multistructlog import create_logger
Scott Bakerbba67b62019-01-28 17:38:21 -080027from networkx import (
28 DiGraph,
Scott Bakerbba67b62019-01-28 17:38:21 -080029 NetworkXNoPath,
Scott Baker810c44a2019-03-28 08:01:50 -070030 NodeNotFound,
Zack Williams5c2ea232019-01-30 15:23:01 -070031 all_shortest_paths,
32 weakly_connected_component_subgraphs,
Scott Bakerbba67b62019-01-28 17:38:21 -080033)
34from networkx.algorithms.dag import topological_sort
Scott Bakerbba67b62019-01-28 17:38:21 -080035from xosconfig import Config
Zack Williams5c2ea232019-01-30 15:23:01 -070036from xossynchronizer.steps.syncstep import (
37 DeferredException,
38 InnocuousException,
39 SyncStep,
40)
41from six.moves import range
Scott Bakerbba67b62019-01-28 17:38:21 -080042
# Module-level logger, built from the "logging" section of the configuration.
# It is also the default value of XOSObserver.__init__'s `log` parameter.
log = create_logger(Config().get("logging"))
44
45
class StepNotReady(Exception):
    """Exception type for a step that is not ready.

    It is not raised anywhere in the code visible in this module.
    """
48
49
class ExternalDependencyFailed(Exception):
    """Signals that an object has no synchronizer step attached.

    Raised by XOSObserver.delete_record() and XOSObserver.sync_record();
    XOSObserver.sync_cohort() catches it and records a dependency error for
    the remaining objects of the cohort.
    """
52
53
54# FIXME: Move drivers into a context shared across sync steps.
55
56
class NoOpDriver:
    """Placeholder driver: always enabled and carrying no dependency graph."""

    def __init__(self):
        self.dependency_graph = None
        # XOSObserver.run() refuses to run when this flag is false.
        self.enabled = True
61
62
# Everyone gets NoOpDriver by default. To use a different driver, call
# set_driver() below.
DRIVER = NoOpDriver()

# Edge kinds returned by XOSObserver.linked_objects(): DIRECT_EDGE when the
# accessor value is used as the single linked object, PROXY_EDGE when the
# linked objects are obtained by calling .all() on it.
DIRECT_EDGE = 1
PROXY_EDGE = 2
69
70
def set_driver(x):
    """Replace the module-wide DRIVER.

    XOSObserver instances read DRIVER when they are constructed, so only
    observers created after this call see the new driver.

    :param x: the driver object to install
    """
    global DRIVER
    DRIVER = x
74
75
76class XOSObserver(object):
77 sync_steps = []
78
Scott Bakerc2fddaa2019-01-30 15:45:03 -080079 def __init__(self, sync_steps, model_accessor, log=log):
Scott Bakerbba67b62019-01-28 17:38:21 -080080 self.log = log
Scott Bakerc2fddaa2019-01-30 15:45:03 -080081 self.model_accessor = model_accessor
Scott Bakerbba67b62019-01-28 17:38:21 -080082
83 self.step_lookup = {}
84 self.sync_steps = sync_steps
85 self.load_sync_steps()
86
87 self.load_dependency_graph()
88
89 self.event_cond = threading.Condition()
90
91 self.driver = DRIVER
92 self.observer_name = Config.get("name")
93
94 def wait_for_event(self, timeout):
95 self.event_cond.acquire()
96 self.event_cond.wait(timeout)
97 self.event_cond.release()
98
99 def wake_up(self):
100 self.log.debug("Wake up routine called")
101 self.event_cond.acquire()
102 self.event_cond.notify()
103 self.event_cond.release()
104
105 def load_dependency_graph(self):
106
107 try:
108 if Config.get("dependency_graph"):
Zack Williamsda69db22019-01-29 16:44:52 -0700109 self.log.debug(
Scott Bakerbba67b62019-01-28 17:38:21 -0800110 "Loading model dependency graph",
111 path=Config.get("dependency_graph"),
112 )
113 dep_graph_str = open(Config.get("dependency_graph")).read()
114 else:
Zack Williamsda69db22019-01-29 16:44:52 -0700115 self.log.debug("Using default model dependency graph", graph={})
Scott Bakerbba67b62019-01-28 17:38:21 -0800116 dep_graph_str = "{}"
117
118 # joint_dependencies is of the form { Model1 -> [(Model2, src_port, dst_port), ...] }
119 # src_port is the field that accesses Model2 from Model1
120 # dst_port is the field that accesses Model1 from Model2
121 static_dependencies = json.loads(dep_graph_str)
Zack Williams5c2ea232019-01-30 15:23:01 -0700122 dynamic_dependencies = (
123 []
124 ) # Dropped Service and ServiceInstance dynamic dependencies
Scott Bakerbba67b62019-01-28 17:38:21 -0800125
126 joint_dependencies = dict(
Zack Williams5c2ea232019-01-30 15:23:01 -0700127 list(static_dependencies.items()) + dynamic_dependencies
Scott Bakerbba67b62019-01-28 17:38:21 -0800128 )
129
130 model_dependency_graph = DiGraph()
131 for src_model, deps in joint_dependencies.items():
132 for dep in deps:
133 dst_model, src_accessor, dst_accessor = dep
134 if src_model != dst_model:
135 edge_label = {
136 "src_accessor": src_accessor,
137 "dst_accessor": dst_accessor,
138 }
139 model_dependency_graph.add_edge(
Zack Williamsbe5ee1c2019-03-18 15:33:07 -0700140 src_model, dst_model, **edge_label
Scott Bakerbba67b62019-01-28 17:38:21 -0800141 )
142
143 model_dependency_graph_rev = model_dependency_graph.reverse(copy=True)
144 self.model_dependency_graph = {
145 # deletion
146 True: model_dependency_graph_rev,
147 False: model_dependency_graph,
148 }
Zack Williamsda69db22019-01-29 16:44:52 -0700149 self.log.debug("Loaded dependencies", edges=model_dependency_graph.edges())
Scott Bakerbba67b62019-01-28 17:38:21 -0800150 except Exception as e:
151 self.log.exception("Error loading dependency graph", e=e)
152 raise e
153
154 def load_sync_steps(self):
155 model_to_step = defaultdict(list)
156 external_dependencies = []
157
158 for s in self.sync_steps:
159 if not isinstance(s.observes, list):
160 observes = [s.observes]
161 else:
162 observes = s.observes
Scott Bakerbba67b62019-01-28 17:38:21 -0800163 for m in observes:
Scott Bakerc2fddaa2019-01-30 15:45:03 -0800164 if isinstance(m, str):
165 # observes is a string that names the model
166 model_to_step[m].append(s.__name__)
167 else:
168 # observes is the model class
169 model_to_step[m.__name__].append(s.__name__)
Scott Bakerbba67b62019-01-28 17:38:21 -0800170
171 try:
172 external_dependencies.extend(s.external_dependencies)
173 except AttributeError:
174 pass
175
176 self.step_lookup[s.__name__] = s
177
178 self.model_to_step = model_to_step
179 self.external_dependencies = list(set(external_dependencies))
180 self.log.info(
181 "Loaded external dependencies", external_dependencies=external_dependencies
182 )
183 self.log.info("Loaded model_map", **model_to_step)
184
185 def reset_model_accessor(self, o=None):
186 try:
Scott Bakerc2fddaa2019-01-30 15:45:03 -0800187 self.model_accessor.reset_queries()
Scott Bakerbba67b62019-01-28 17:38:21 -0800188 except BaseException:
189 # this shouldn't happen, but in case it does, catch it...
190 if o:
191 logdict = o.tologdict()
192 else:
193 logdict = {}
194
195 self.log.error("exception in reset_queries", **logdict)
196
197 def delete_record(self, o, dr_log=None):
198
199 if dr_log is None:
200 dr_log = self.log
201
202 if getattr(o, "backend_need_reap", False):
203 # the object has already been deleted and marked for reaping
Scott Bakerc2fddaa2019-01-30 15:45:03 -0800204 self.model_accessor.journal_object(o, "syncstep.call.already_marked_reap")
Scott Bakerbba67b62019-01-28 17:38:21 -0800205 else:
206 step = getattr(o, "synchronizer_step", None)
207 if not step:
208 raise ExternalDependencyFailed
209
Scott Bakerc2fddaa2019-01-30 15:45:03 -0800210 self.model_accessor.journal_object(o, "syncstep.call.delete_record")
Scott Bakerbba67b62019-01-28 17:38:21 -0800211
212 dr_log.debug("Deleting object", **o.tologdict())
213
214 step.log = dr_log.new(step=step)
215 step.delete_record(o)
216 step.log = dr_log
217
218 dr_log.debug("Deleted object", **o.tologdict())
219
Scott Bakerc2fddaa2019-01-30 15:45:03 -0800220 self.model_accessor.journal_object(o, "syncstep.call.delete_set_reap")
Scott Bakerbba67b62019-01-28 17:38:21 -0800221 o.backend_need_reap = True
222 o.save(update_fields=["backend_need_reap"])
223
224 def sync_record(self, o, sr_log=None):
225 try:
226 step = o.synchronizer_step
227 except AttributeError:
228 step = None
229
230 if step is None:
231 raise ExternalDependencyFailed
232
233 if sr_log is None:
234 sr_log = self.log
235
236 # Mark this as an object that will require delete. Do
237 # this now rather than after the syncstep,
238 if not (o.backend_need_delete):
239 o.backend_need_delete = True
240 o.save(update_fields=["backend_need_delete"])
241
Scott Bakerc2fddaa2019-01-30 15:45:03 -0800242 self.model_accessor.journal_object(o, "syncstep.call.sync_record")
Scott Bakerbba67b62019-01-28 17:38:21 -0800243
244 sr_log.debug("Syncing object", **o.tologdict())
245
246 step.log = sr_log.new(step=step)
247 step.sync_record(o)
248 step.log = sr_log
249
250 sr_log.debug("Synced object", **o.tologdict())
251
252 o.enacted = max(o.updated, o.changed_by_policy)
253 scratchpad = {"next_run": 0, "exponent": 0, "last_success": time.time()}
254 o.backend_register = json.dumps(scratchpad)
255 o.backend_status = "OK"
256 o.backend_code = 1
Scott Bakerc2fddaa2019-01-30 15:45:03 -0800257 self.model_accessor.journal_object(o, "syncstep.call.save_update")
Scott Bakerbba67b62019-01-28 17:38:21 -0800258 o.save(
259 update_fields=[
260 "enacted",
261 "backend_status",
262 "backend_register",
263 "backend_code",
264 ]
265 )
266
267 if hasattr(step, "after_sync_save"):
268 step.log = sr_log.new(step=step)
269 step.after_sync_save(o)
270 step.log = sr_log
271
272 sr_log.info("Saved sync object", o=o)
273
274 """ This function needs a cleanup. FIXME: Rethink backend_status, backend_register """
275
276 def handle_sync_exception(self, o, e):
277 self.log.exception("sync step failed!", e=e, **o.tologdict())
Scott Bakerbba67b62019-01-28 17:38:21 -0800278
279 if hasattr(e, "message"):
280 status = str(e.message)
281 else:
282 status = str(e)
283
284 if isinstance(e, InnocuousException):
285 code = 1
286 elif isinstance(e, DeferredException):
287 # NOTE if the synchronization is Deferred it means that synchronization is still in progress
288 code = 0
289 else:
290 code = 2
291
292 self.set_object_error(o, status, code)
293
294 dependency_error = "Failed due to error in model %s id %d: %s" % (
295 o.leaf_model_name,
296 o.id,
297 status,
298 )
299 return dependency_error, code
300
301 def set_object_error(self, o, status, code):
302 if o.backend_status:
303 error_list = o.backend_status.split(" // ")
304 else:
305 error_list = []
306
307 if status not in error_list:
308 error_list.append(status)
309
310 # Keep last two errors
311 error_list = error_list[-2:]
312
313 o.backend_code = code
314 o.backend_status = " // ".join(error_list)
315
316 try:
317 scratchpad = json.loads(o.backend_register)
318 scratchpad["exponent"]
319 except BaseException:
320 scratchpad = {
321 "next_run": 0,
322 "exponent": 0,
323 "last_success": time.time(),
324 "failures": 0,
325 }
326
327 # Second failure
328 if scratchpad["exponent"]:
329 if code == 1:
330 delay = scratchpad["exponent"] * 60 # 1 minute
331 else:
332 delay = scratchpad["exponent"] * 600 # 10 minutes
333
334 # cap delays at 8 hours
335 if delay > 8 * 60 * 60:
336 delay = 8 * 60 * 60
337 scratchpad["next_run"] = time.time() + delay
338
339 scratchpad["exponent"] += 1
340
341 try:
342 scratchpad["failures"] += 1
343 except KeyError:
344 scratchpad["failures"] = 1
345
346 scratchpad["last_failure"] = time.time()
347
348 o.backend_register = json.dumps(scratchpad)
349
350 # TOFIX:
351 # DatabaseError: value too long for type character varying(140)
Scott Bakerc2fddaa2019-01-30 15:45:03 -0800352 if self.model_accessor.obj_exists(o):
Scott Bakerbba67b62019-01-28 17:38:21 -0800353 try:
354 o.backend_status = o.backend_status[:1024]
355 o.save(
356 update_fields=["backend_status", "backend_register"],
357 always_update_timestamp=True,
358 )
359 except BaseException as e:
360 self.log.exception("Could not update backend status field!", e=e)
361 pass
362
363 def sync_cohort(self, cohort, deletion):
364 threading.current_thread().is_sync_thread = True
365
366 sc_log = self.log.new(thread_id=threading.current_thread().ident)
367
368 try:
Scott Bakerbba67b62019-01-28 17:38:21 -0800369 sc_log.debug("Starting to work on cohort", cohort=cohort, deletion=deletion)
370
371 cohort_emptied = False
372 dependency_error = None
373 dependency_error_code = None
374
375 itty = iter(cohort)
376
377 while not cohort_emptied:
378 try:
379 self.reset_model_accessor()
380 o = next(itty)
381
382 if dependency_error:
383 self.set_object_error(
384 o, dependency_error, dependency_error_code
385 )
386 continue
387
388 try:
389 if deletion:
390 self.delete_record(o, sc_log)
391 else:
392 self.sync_record(o, sc_log)
393 except ExternalDependencyFailed:
394 dependency_error = (
395 "External dependency on object %s id %d not met"
396 % (o.leaf_model_name, o.id)
397 )
398 dependency_error_code = 1
399 except (DeferredException, InnocuousException, Exception) as e:
400 dependency_error, dependency_error_code = self.handle_sync_exception(
401 o, e
402 )
403
404 except StopIteration:
405 sc_log.debug("Cohort completed", cohort=cohort, deletion=deletion)
406 cohort_emptied = True
407 finally:
408 self.reset_model_accessor()
Scott Bakerc2fddaa2019-01-30 15:45:03 -0800409 self.model_accessor.connection_close()
Scott Bakerbba67b62019-01-28 17:38:21 -0800410
Scott Bakerbba67b62019-01-28 17:38:21 -0800411 def run(self):
412 # Cleanup: Move self.driver into a synchronizer context
413 # made available to every sync step.
414 if not self.driver.enabled:
415 self.log.warning("Driver is not enabled. Not running sync steps.")
416 return
417
418 while True:
Zack Williamsda69db22019-01-29 16:44:52 -0700419 self.log.debug("Waiting for event or timeout")
Scott Bakerbba67b62019-01-28 17:38:21 -0800420 self.wait_for_event(timeout=5)
Zack Williamsda69db22019-01-29 16:44:52 -0700421 self.log.debug("Synchronizer awake")
Scott Bakerbba67b62019-01-28 17:38:21 -0800422
423 self.run_once()
424
425 def fetch_pending(self, deletion=False):
Scott Bakerbba67b62019-01-28 17:38:21 -0800426 pending_objects = []
427 pending_steps = []
Zack Williams5c2ea232019-01-30 15:23:01 -0700428 step_list = list(self.step_lookup.values())
Scott Bakerbba67b62019-01-28 17:38:21 -0800429
430 for e in self.external_dependencies:
431 s = SyncStep
Zack Williams5c2ea232019-01-30 15:23:01 -0700432 if isinstance(e, str):
Scott Bakerc2fddaa2019-01-30 15:45:03 -0800433 # external dependency is a string that names a model class
434 s.observes = self.model_accessor.get_model_class(e)
435 else:
436 # external dependency is a model class
437 s.observes = e
Scott Bakerbba67b62019-01-28 17:38:21 -0800438 step_list.append(s)
439
440 for step_class in step_list:
Scott Bakerc2fddaa2019-01-30 15:45:03 -0800441 step = step_class(driver=self.driver, model_accessor=self.model_accessor)
Scott Bakerbba67b62019-01-28 17:38:21 -0800442 step.log = self.log.new(step=step)
443
444 if not hasattr(step, "call"):
445 pending = step.fetch_pending(deletion)
446 for obj in pending:
Zack Williams5c2ea232019-01-30 15:23:01 -0700447 step = step_class(
448 driver=self.driver, model_accessor=self.model_accessor
449 )
Scott Bakerbba67b62019-01-28 17:38:21 -0800450 step.log = self.log.new(step=step)
451 obj.synchronizer_step = step
452
Scott Bakerbba67b62019-01-28 17:38:21 -0800453 pending_objects.extend(pending)
Scott Bakerbba67b62019-01-28 17:38:21 -0800454 else:
455 # Support old and broken legacy synchronizers
456 # This needs to be dropped soon.
457 pending_steps.append(step)
458
Zack Williamsda69db22019-01-29 16:44:52 -0700459 self.log.debug(
Scott Bakerbba67b62019-01-28 17:38:21 -0800460 "Fetched pending data",
461 pending_objects=pending_objects,
462 legacy_steps=pending_steps,
463 )
464 return pending_objects, pending_steps
465
466 def linked_objects(self, o):
467 if o is None:
468 return [], None
469 try:
Zack Williams5c2ea232019-01-30 15:23:01 -0700470 o_lst = [oa for oa in o.all()]
Scott Bakerbba67b62019-01-28 17:38:21 -0800471 edge_type = PROXY_EDGE
472 except (AttributeError, TypeError):
473 o_lst = [o]
474 edge_type = DIRECT_EDGE
475 return o_lst, edge_type
476
477 """ Automatically test if a real dependency path exists between two objects. e.g.
478 given an Instance, and a ControllerSite, the test amounts to:
479 instance.slice.site == controller.site
480
481 Then the two objects are related, and should be put in the same cohort.
482 If the models of the two objects are not dependent, then the check trivially
483 returns False.
484 """
485
486 def same_object(self, o1, o2):
487 if not o1 or not o2:
488 return False, None
489
490 o1_lst, edge_type = self.linked_objects(o1)
491
492 try:
493 found = next(
494 obj
495 for obj in o1_lst
496 if obj.leaf_model_name == o2.leaf_model_name and obj.pk == o2.pk
497 )
498 except AttributeError as e:
499 self.log.exception("Compared objects could not be identified", e=e)
500 raise e
501 except StopIteration:
502 # This is a temporary workaround to establish dependencies between
503 # deleted proxy objects. A better solution would be for the ORM to
504 # return the set of deleted objects via foreign keys. At that point,
505 # the following line would change back to found = False
506 # - Sapan
507
508 found = getattr(o2, "deleted", False)
509
510 return found, edge_type
511
512 def concrete_path_exists(self, o1, o2):
513 try:
514 m1 = o1.leaf_model_name
515 m2 = o2.leaf_model_name
516 except AttributeError:
517 # One of the nodes is not in the dependency graph
518 # No dependency
519 return False, None
520
521 if m1.endswith("ServiceInstance") and m2.endswith("ServiceInstance"):
522 return getattr(o2, "dependent", None) == o1, DIRECT_EDGE
523
524 # FIXME: Dynamic dependency check
525 G = self.model_dependency_graph[False]
526 paths = all_shortest_paths(G, m1, m2)
527
528 try:
529 any(paths)
530 paths = all_shortest_paths(G, m1, m2)
Scott Baker810c44a2019-03-28 08:01:50 -0700531 except (NetworkXNoPath, NodeNotFound):
Scott Bakerbba67b62019-01-28 17:38:21 -0800532 # Easy. The two models are unrelated.
533 return False, None
534
535 for p in paths:
536 src_object = o1
537 edge_type = DIRECT_EDGE
538
539 for i in range(len(p) - 1):
540 src = p[i]
541 dst = p[i + 1]
542 edge_label = G[src][dst]
543 sa = edge_label["src_accessor"]
544 try:
545 dst_accessor = getattr(src_object, sa)
546 dst_objects, link_edge_type = self.linked_objects(dst_accessor)
547 if link_edge_type == PROXY_EDGE:
548 edge_type = link_edge_type
549
550 """
551
552 True If no linked objects and deletion
553 False If no linked objects
554 True If multiple linked objects
555 <continue traversal> If single linked object
556
557 """
558
559 if dst_objects == []:
560 # Workaround for ORM not returning linked deleted
561 # objects
562 if o2.deleted:
563 return True, edge_type
564 else:
565 dst_object = None
566 elif len(dst_objects) > 1:
567 # Multiple linked objects. Assume anything could be among those multiple objects.
568 raise AttributeError
569 else:
570 dst_object = dst_objects[0]
571 except AttributeError as e:
572 if sa != "fake_accessor":
573 self.log.debug(
574 "Could not check object dependencies, making conservative choice %s",
575 e,
576 src_object=src_object,
577 sa=sa,
578 o1=o1,
579 o2=o2,
580 )
581 return True, edge_type
582
583 src_object = dst_object
584
585 if not src_object:
586 break
587
588 verdict, edge_type = self.same_object(src_object, o2)
589 if verdict:
590 return verdict, edge_type
591
592 # Otherwise try other paths
593
594 return False, None
595
596 """
597
598 This function implements the main scheduling logic
599 of the Synchronizer. It divides incoming work (dirty objects)
600 into cohorts of dependent objects, and runs each such cohort
601 in its own thread.
602
603 Future work:
604
605 * Run event thread in parallel to the scheduling thread, and
606 add incoming objects to existing cohorts. Doing so should
607 greatly improve synchronizer performance.
608 * A single object might need to be added to multiple cohorts.
609 In this case, the last cohort handles such an object.
610 * This algorithm is horizontal-scale-ready. Multiple synchronizers
611 could run off a shared runqueue of cohorts.
612
613 """
614
615 def compute_dependent_cohorts(self, objects, deletion):
Scott Bakerbba67b62019-01-28 17:38:21 -0800616 n = len(objects)
Zack Williams5c2ea232019-01-30 15:23:01 -0700617 r = list(range(n))
Scott Bakerbba67b62019-01-28 17:38:21 -0800618
619 oG = DiGraph()
620
621 for i in r:
622 oG.add_node(i)
623
624 try:
625 for i0 in range(n):
626 for i1 in range(n):
627 if i0 != i1:
628 if deletion:
629 path_args = (objects[i1], objects[i0])
630 else:
631 path_args = (objects[i0], objects[i1])
632
633 is_connected, edge_type = self.concrete_path_exists(*path_args)
634 if is_connected:
635 try:
636 edge_type = oG[i1][i0]["type"]
637 if edge_type == PROXY_EDGE:
638 oG.remove_edge(i1, i0)
Zack Williamsbe5ee1c2019-03-18 15:33:07 -0700639 oG.add_edge(i0, i1, type=edge_type)
Scott Bakerbba67b62019-01-28 17:38:21 -0800640 except KeyError:
Zack Williamsbe5ee1c2019-03-18 15:33:07 -0700641 oG.add_edge(i0, i1, type=edge_type)
Scott Bakerbba67b62019-01-28 17:38:21 -0800642 except KeyError:
643 pass
644
645 components = weakly_connected_component_subgraphs(oG)
Zack Williamsbe5ee1c2019-03-18 15:33:07 -0700646 cohort_indexes = [reversed(list(topological_sort(g))) for g in components]
Scott Bakerbba67b62019-01-28 17:38:21 -0800647 cohorts = [
648 [objects[i] for i in cohort_index] for cohort_index in cohort_indexes
649 ]
650
651 return cohorts
652
653 def run_once(self):
654 self.load_dependency_graph()
655
656 try:
657 # Why are we checking the DB connection here?
Scott Bakerc2fddaa2019-01-30 15:45:03 -0800658 self.model_accessor.check_db_connection_okay()
Scott Bakerbba67b62019-01-28 17:38:21 -0800659
Scott Bakerbba67b62019-01-28 17:38:21 -0800660 # Two passes. One for sync, the other for deletion.
661 for deletion in (False, True):
662 objects_to_process = []
663
664 objects_to_process, steps_to_process = self.fetch_pending(deletion)
665 dependent_cohorts = self.compute_dependent_cohorts(
666 objects_to_process, deletion
667 )
668
669 threads = []
Zack Williamsda69db22019-01-29 16:44:52 -0700670 self.log.debug("In run once inner loop", deletion=deletion)
Scott Bakerbba67b62019-01-28 17:38:21 -0800671
672 for cohort in dependent_cohorts:
673 thread = threading.Thread(
674 target=self.sync_cohort,
675 name="synchronizer",
676 args=(cohort, deletion),
677 )
678
679 threads.append(thread)
680
681 # Start threads
682 for t in threads:
683 t.start()
684
685 self.reset_model_accessor()
686
687 # Wait for all threads to finish before continuing with the run
688 # loop
689 for t in threads:
690 t.join()
691
692 # Run legacy synchronizers, which do everything in call()
693 for step in steps_to_process:
694 try:
695 step.call(deletion=deletion)
696 except Exception as e:
697 self.log.exception("Legacy step failed", step=step, e=e)
698
Scott Bakerbba67b62019-01-28 17:38:21 -0800699 except Exception as e:
700 self.log.exception(
701 "Core error. This seems like a misconfiguration or bug. This error will not be relayed to the user!",
702 e=e,
703 )
704 self.log.error("Exception in observer run loop")