blob: 0c696943883e3823c62973c75a3ff31ff911eaee [file] [log] [blame]
Scott Bakerbba67b62019-01-28 17:38:21 -08001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15# TODO:
16# Add unit tests:
17# - 2 sets of Instance, ControllerSlice, ControllerNetworks - delete and create case
18
Zack Williams5c2ea232019-01-30 15:23:01 -070019from __future__ import absolute_import
Scott Bakerbba67b62019-01-28 17:38:21 -080020
Zack Williams5c2ea232019-01-30 15:23:01 -070021import json
22import threading
23import time
Scott Bakerbba67b62019-01-28 17:38:21 -080024from collections import defaultdict
Zack Williams5c2ea232019-01-30 15:23:01 -070025
26from multistructlog import create_logger
Scott Bakerbba67b62019-01-28 17:38:21 -080027from networkx import (
28 DiGraph,
Scott Bakerbba67b62019-01-28 17:38:21 -080029 NetworkXNoPath,
Zack Williams5c2ea232019-01-30 15:23:01 -070030 all_shortest_paths,
31 weakly_connected_component_subgraphs,
Scott Bakerbba67b62019-01-28 17:38:21 -080032)
33from networkx.algorithms.dag import topological_sort
Scott Bakerbba67b62019-01-28 17:38:21 -080034from xosconfig import Config
Zack Williams5c2ea232019-01-30 15:23:01 -070035from xossynchronizer.steps.syncstep import (
36 DeferredException,
37 InnocuousException,
38 SyncStep,
39)
40from six.moves import range
Scott Bakerbba67b62019-01-28 17:38:21 -080041
# Module-wide structured logger, configured from the synchronizer's Config.
log = create_logger(Config().get("logging"))
43
44
class StepNotReady(Exception):
    """Raised when a sync step cannot be run yet."""
47
48
class ExternalDependencyFailed(Exception):
    """Raised when an object has no synchronizer step attached, i.e. its
    synchronization depends on something outside this synchronizer."""
51
52
53# FIXME: Move drivers into a context shared across sync steps.
54
55
class NoOpDriver:
    """Default driver used when no real backend driver is installed.

    It reports itself as enabled (so the observer's run loop proceeds)
    and carries no dependency graph.
    """

    def __init__(self):
        # Enabled by default so XOSObserver.run() does not bail out.
        self.enabled = True
        # No backend-specific dependency graph is provided.
        self.dependency_graph = None
60
61
62# Everyone gets NoOpDriver by default. To use a different driver, call
63# set_driver() below.
# Module-wide driver instance; replaced via set_driver() below.
DRIVER = NoOpDriver()

# Edge-type tags for dependency edges between objects:
# DIRECT_EDGE - the link resolved to a single related object.
# PROXY_EDGE - the link traversed a to-many accessor (one with .all()).
DIRECT_EDGE = 1
PROXY_EDGE = 2
68
69
def set_driver(x):
    """Install *x* as the module-wide DRIVER picked up by new XOSObserver
    instances (XOSObserver.__init__ copies DRIVER into self.driver)."""
    global DRIVER
    DRIVER = x
73
74
class XOSObserver(object):
    """Main synchronizer scheduling loop.

    Divides pending ("dirty") model objects into cohorts of mutually
    dependent objects (using a model-level dependency graph plus concrete
    object-path checks) and syncs or deletes each cohort in its own thread.
    """

    # Class-level default; overwritten per instance in __init__.
    sync_steps = []

    def __init__(self, sync_steps, model_accessor, log=log):
        """Store the step list and model accessor, then build the step
        lookup tables and the model dependency graph."""
        self.log = log
        self.model_accessor = model_accessor

        self.step_lookup = {}
        self.sync_steps = sync_steps
        self.load_sync_steps()

        self.load_dependency_graph()

        # Condition used by run() to sleep between passes; wake_up() notifies it.
        self.event_cond = threading.Condition()

        self.driver = DRIVER
        self.observer_name = Config.get("name")

    def wait_for_event(self, timeout):
        """Block for up to *timeout* seconds, or until wake_up() notifies."""
        self.event_cond.acquire()
        self.event_cond.wait(timeout)
        self.event_cond.release()

    def wake_up(self):
        """Wake the run() loop early (e.g. when new work arrives)."""
        self.log.debug("Wake up routine called")
        self.event_cond.acquire()
        self.event_cond.notify()
        self.event_cond.release()

    def load_dependency_graph(self):
        """Build self.model_dependency_graph from the JSON file named by the
        "dependency_graph" config option (or an empty graph if unset).

        The result is a dict keyed by deletion flag: False maps to the
        forward model graph, True to its reverse (deletions run in reverse
        dependency order).
        """

        try:
            if Config.get("dependency_graph"):
                self.log.debug(
                    "Loading model dependency graph",
                    path=Config.get("dependency_graph"),
                )
                dep_graph_str = open(Config.get("dependency_graph")).read()
            else:
                self.log.debug("Using default model dependency graph", graph={})
                dep_graph_str = "{}"

            # joint_dependencies is of the form { Model1 -> [(Model2, src_port, dst_port), ...] }
            # src_port is the field that accesses Model2 from Model1
            # dst_port is the field that accesses Model1 from Model2
            static_dependencies = json.loads(dep_graph_str)
            dynamic_dependencies = (
                []
            )  # Dropped Service and ServiceInstance dynamic dependencies

            joint_dependencies = dict(
                list(static_dependencies.items()) + dynamic_dependencies
            )

            model_dependency_graph = DiGraph()
            for src_model, deps in joint_dependencies.items():
                for dep in deps:
                    dst_model, src_accessor, dst_accessor = dep
                    # Self-edges are dropped; only cross-model dependencies matter.
                    if src_model != dst_model:
                        edge_label = {
                            "src_accessor": src_accessor,
                            "dst_accessor": dst_accessor,
                        }
                        model_dependency_graph.add_edge(
                            src_model, dst_model, **edge_label
                        )

            model_dependency_graph_rev = model_dependency_graph.reverse(copy=True)
            self.model_dependency_graph = {
                # deletion
                True: model_dependency_graph_rev,
                False: model_dependency_graph,
            }
            self.log.debug("Loaded dependencies", edges=model_dependency_graph.edges())
        except Exception as e:
            self.log.exception("Error loading dependency graph", e=e)
            raise e

    def load_sync_steps(self):
        """Index the configured sync steps.

        Populates self.model_to_step (model name -> list of step names),
        self.step_lookup (step name -> step class) and
        self.external_dependencies (deduplicated list collected from steps'
        optional ``external_dependencies`` attribute).
        """
        model_to_step = defaultdict(list)
        external_dependencies = []

        for s in self.sync_steps:
            # A step may observe a single model or a list of models.
            if not isinstance(s.observes, list):
                observes = [s.observes]
            else:
                observes = s.observes
            for m in observes:
                if isinstance(m, str):
                    # observes is a string that names the model
                    model_to_step[m].append(s.__name__)
                else:
                    # observes is the model class
                    model_to_step[m.__name__].append(s.__name__)

            try:
                external_dependencies.extend(s.external_dependencies)
            except AttributeError:
                # Steps without external_dependencies are fine.
                pass

            self.step_lookup[s.__name__] = s

        self.model_to_step = model_to_step
        self.external_dependencies = list(set(external_dependencies))
        self.log.info(
            "Loaded external dependencies", external_dependencies=external_dependencies
        )
        self.log.info("Loaded model_map", **model_to_step)

    def reset_model_accessor(self, o=None):
        """Best-effort reset of the model accessor's accumulated queries;
        logs (with *o*'s context if given) instead of raising on failure."""
        try:
            self.model_accessor.reset_queries()
        except BaseException:
            # this shouldn't happen, but in case it does, catch it...
            if o:
                logdict = o.tologdict()
            else:
                logdict = {}

            self.log.error("exception in reset_queries", **logdict)

    def delete_record(self, o, dr_log=None):
        """Run the deletion step for object *o* and mark it for reaping.

        Raises ExternalDependencyFailed if *o* has no synchronizer_step.
        No-op (beyond journaling) if *o* is already marked backend_need_reap.
        """

        if dr_log is None:
            dr_log = self.log

        if getattr(o, "backend_need_reap", False):
            # the object has already been deleted and marked for reaping
            self.model_accessor.journal_object(o, "syncstep.call.already_marked_reap")
        else:
            step = getattr(o, "synchronizer_step", None)
            if not step:
                raise ExternalDependencyFailed

            self.model_accessor.journal_object(o, "syncstep.call.delete_record")

            dr_log.debug("Deleting object", **o.tologdict())

            # Temporarily bind a step-scoped logger for the duration of the call.
            step.log = dr_log.new(step=step)
            step.delete_record(o)
            step.log = dr_log

            dr_log.debug("Deleted object", **o.tologdict())

            self.model_accessor.journal_object(o, "syncstep.call.delete_set_reap")
            o.backend_need_reap = True
            o.save(update_fields=["backend_need_reap"])

    def sync_record(self, o, sr_log=None):
        """Run the sync step for object *o* and record success state.

        Raises ExternalDependencyFailed if *o* has no synchronizer_step.
        On success sets enacted, backend_register, backend_status="OK" and
        backend_code=1, then calls the step's optional after_sync_save hook.
        """
        try:
            step = o.synchronizer_step
        except AttributeError:
            step = None

        if step is None:
            raise ExternalDependencyFailed

        if sr_log is None:
            sr_log = self.log

        # Mark this as an object that will require delete. Do
        # this now rather than after the syncstep,
        if not (o.backend_need_delete):
            o.backend_need_delete = True
            o.save(update_fields=["backend_need_delete"])

        self.model_accessor.journal_object(o, "syncstep.call.sync_record")

        sr_log.debug("Syncing object", **o.tologdict())

        step.log = sr_log.new(step=step)
        step.sync_record(o)
        step.log = sr_log

        sr_log.debug("Synced object", **o.tologdict())

        # Record success: enacted advances to the latest change timestamp,
        # and the backoff scratchpad is reset.
        o.enacted = max(o.updated, o.changed_by_policy)
        scratchpad = {"next_run": 0, "exponent": 0, "last_success": time.time()}
        o.backend_register = json.dumps(scratchpad)
        o.backend_status = "OK"
        o.backend_code = 1
        self.model_accessor.journal_object(o, "syncstep.call.save_update")
        o.save(
            update_fields=[
                "enacted",
                "backend_status",
                "backend_register",
                "backend_code",
            ]
        )

        if hasattr(step, "after_sync_save"):
            step.log = sr_log.new(step=step)
            step.after_sync_save(o)
            step.log = sr_log

        sr_log.info("Saved sync object", o=o)

    """ This function needs a cleanup. FIXME: Rethink backend_status, backend_register """

    def handle_sync_exception(self, o, e):
        """Record a sync failure on *o* and translate exception *e* into a
        (dependency_error_message, code) pair for the rest of the cohort.

        Codes: 1 for InnocuousException, 0 for DeferredException (sync is
        still in progress), 2 for anything else.
        """
        self.log.exception("sync step failed!", e=e, **o.tologdict())

        if hasattr(e, "message"):
            status = str(e.message)
        else:
            status = str(e)

        if isinstance(e, InnocuousException):
            code = 1
        elif isinstance(e, DeferredException):
            # NOTE if the synchronization is Deferred it means that synchronization is still in progress
            code = 0
        else:
            code = 2

        self.set_object_error(o, status, code)

        dependency_error = "Failed due to error in model %s id %d: %s" % (
            o.leaf_model_name,
            o.id,
            status,
        )
        return dependency_error, code

    def set_object_error(self, o, status, code):
        """Append *status* to *o*'s backend_status (keeping the last two
        errors), set backend_code, and update the exponential-backoff
        scratchpad stored as JSON in backend_register."""
        if o.backend_status:
            error_list = o.backend_status.split(" // ")
        else:
            error_list = []

        if status not in error_list:
            error_list.append(status)

        # Keep last two errors
        error_list = error_list[-2:]

        o.backend_code = code
        o.backend_status = " // ".join(error_list)

        try:
            scratchpad = json.loads(o.backend_register)
            # Probe for a well-formed scratchpad; KeyError falls through to reset.
            scratchpad["exponent"]
        except BaseException:
            scratchpad = {
                "next_run": 0,
                "exponent": 0,
                "last_success": time.time(),
                "failures": 0,
            }

        # Second failure
        if scratchpad["exponent"]:
            if code == 1:
                delay = scratchpad["exponent"] * 60  # 1 minute
            else:
                delay = scratchpad["exponent"] * 600  # 10 minutes

            # cap delays at 8 hours
            if delay > 8 * 60 * 60:
                delay = 8 * 60 * 60
            scratchpad["next_run"] = time.time() + delay

        scratchpad["exponent"] += 1

        try:
            scratchpad["failures"] += 1
        except KeyError:
            scratchpad["failures"] = 1

        scratchpad["last_failure"] = time.time()

        o.backend_register = json.dumps(scratchpad)

        # TOFIX:
        # DatabaseError: value too long for type character varying(140)
        if self.model_accessor.obj_exists(o):
            try:
                # NOTE(review): truncation limit here is 1024 while the error
                # above mentions varying(140) — confirm the column width.
                o.backend_status = o.backend_status[:1024]
                o.save(
                    update_fields=["backend_status", "backend_register"],
                    always_update_timestamp=True,
                )
            except BaseException as e:
                self.log.exception("Could not update backend status field!", e=e)
                pass

    def sync_cohort(self, cohort, deletion):
        """Thread target: process one cohort of dependent objects in order.

        Once any object fails, the remaining objects in the cohort are not
        synced; they are marked with the propagated dependency error instead.
        The DB connection is closed when the thread finishes.
        """
        threading.current_thread().is_sync_thread = True

        sc_log = self.log.new(thread_id=threading.current_thread().ident)

        try:
            sc_log.debug("Starting to work on cohort", cohort=cohort, deletion=deletion)

            cohort_emptied = False
            dependency_error = None
            dependency_error_code = None

            itty = iter(cohort)

            while not cohort_emptied:
                try:
                    self.reset_model_accessor()
                    o = next(itty)

                    # A previous object in this cohort failed: propagate the
                    # error instead of attempting to sync this one.
                    if dependency_error:
                        self.set_object_error(
                            o, dependency_error, dependency_error_code
                        )
                        continue

                    try:
                        if deletion:
                            self.delete_record(o, sc_log)
                        else:
                            self.sync_record(o, sc_log)
                    except ExternalDependencyFailed:
                        dependency_error = (
                            "External dependency on object %s id %d not met"
                            % (o.leaf_model_name, o.id)
                        )
                        dependency_error_code = 1
                    except (DeferredException, InnocuousException, Exception) as e:
                        dependency_error, dependency_error_code = self.handle_sync_exception(
                            o, e
                        )

                except StopIteration:
                    sc_log.debug("Cohort completed", cohort=cohort, deletion=deletion)
                    cohort_emptied = True
        finally:
            self.reset_model_accessor()
            self.model_accessor.connection_close()

    def run(self):
        """Main loop: wait up to 5s (or an explicit wake_up()) between
        run_once() passes. Returns immediately if the driver is disabled."""
        # Cleanup: Move self.driver into a synchronizer context
        # made available to every sync step.
        if not self.driver.enabled:
            self.log.warning("Driver is not enabled. Not running sync steps.")
            return

        while True:
            self.log.debug("Waiting for event or timeout")
            self.wait_for_event(timeout=5)
            self.log.debug("Synchronizer awake")

            self.run_once()

    def fetch_pending(self, deletion=False):
        """Collect work for one pass.

        Returns (pending_objects, pending_steps): objects needing sync or
        deletion, each with a fresh step instance attached as
        ``synchronizer_step``, plus legacy steps that implement call().
        """
        pending_objects = []
        pending_steps = []
        step_list = list(self.step_lookup.values())

        for e in self.external_dependencies:
            s = SyncStep
            # NOTE(review): this assigns `observes` on the shared SyncStep
            # class itself, so each loop iteration overwrites the previous
            # value and step_list holds the same class repeatedly — confirm
            # whether this is intended.
            if isinstance(e, str):
                # external dependency is a string that names a model class
                s.observes = self.model_accessor.get_model_class(e)
            else:
                # external dependency is a model class
                s.observes = e
            step_list.append(s)

        for step_class in step_list:
            step = step_class(driver=self.driver, model_accessor=self.model_accessor)
            step.log = self.log.new(step=step)

            if not hasattr(step, "call"):
                pending = step.fetch_pending(deletion)
                # Each pending object gets its own step instance so per-object
                # step state (e.g. the bound logger) is independent.
                for obj in pending:
                    step = step_class(
                        driver=self.driver, model_accessor=self.model_accessor
                    )
                    step.log = self.log.new(step=step)
                    obj.synchronizer_step = step

                pending_objects.extend(pending)
            else:
                # Support old and broken legacy synchronizers
                # This needs to be dropped soon.
                pending_steps.append(step)

        self.log.debug(
            "Fetched pending data",
            pending_objects=pending_objects,
            legacy_steps=pending_steps,
        )
        return pending_objects, pending_steps

    def linked_objects(self, o):
        """Normalize an accessor value into (list_of_objects, edge_type).

        A to-many manager (anything with .all()) yields PROXY_EDGE; a plain
        object yields a single-element list and DIRECT_EDGE; None yields
        ([], None).
        """
        if o is None:
            return [], None
        try:
            o_lst = [oa for oa in o.all()]
            edge_type = PROXY_EDGE
        except (AttributeError, TypeError):
            o_lst = [o]
            edge_type = DIRECT_EDGE
        return o_lst, edge_type

    """ Automatically test if a real dependency path exists between two objects. e.g.
        given an Instance, and a ControllerSite, the test amounts to:
            instance.slice.site == controller.site

        Then the two objects are related, and should be put in the same cohort.
        If the models of the two objects are not dependent, then the check trivially
        returns False.
    """

    def same_object(self, o1, o2):
        """Return (found, edge_type): whether *o2* is among the objects that
        *o1* resolves to via linked_objects(), matching on leaf_model_name
        and primary key."""
        if not o1 or not o2:
            return False, None

        o1_lst, edge_type = self.linked_objects(o1)

        try:
            found = next(
                obj
                for obj in o1_lst
                if obj.leaf_model_name == o2.leaf_model_name and obj.pk == o2.pk
            )
        except AttributeError as e:
            self.log.exception("Compared objects could not be identified", e=e)
            raise e
        except StopIteration:
            # This is a temporary workaround to establish dependencies between
            # deleted proxy objects. A better solution would be for the ORM to
            # return the set of deleted objects via foreign keys. At that point,
            # the following line would change back to found = False
            # - Sapan

            found = getattr(o2, "deleted", False)

        return found, edge_type

    def concrete_path_exists(self, o1, o2):
        """Return (exists, edge_type): whether a concrete dependency path
        from *o1* to *o2* exists, by walking every shortest model-graph path
        and following each edge's src_accessor on the live objects.

        Makes a conservative True choice when a traversal step cannot be
        resolved (missing accessor or multiple linked objects).
        """
        try:
            m1 = o1.leaf_model_name
            m2 = o2.leaf_model_name
        except AttributeError:
            # One of the nodes is not in the dependency graph
            # No dependency
            return False, None

        # ServiceInstance chains are linked explicitly via `dependent`.
        if m1.endswith("ServiceInstance") and m2.endswith("ServiceInstance"):
            return getattr(o2, "dependent", None) == o1, DIRECT_EDGE

        # FIXME: Dynamic dependency check
        G = self.model_dependency_graph[False]
        paths = all_shortest_paths(G, m1, m2)

        try:
            # all_shortest_paths is lazy: force evaluation to trigger
            # NetworkXNoPath, then recreate the generator for iteration.
            any(paths)
            paths = all_shortest_paths(G, m1, m2)
        except NetworkXNoPath:
            # Easy. The two models are unrelated.
            return False, None

        for p in paths:
            src_object = o1
            edge_type = DIRECT_EDGE

            for i in range(len(p) - 1):
                src = p[i]
                dst = p[i + 1]
                edge_label = G[src][dst]
                sa = edge_label["src_accessor"]
                try:
                    dst_accessor = getattr(src_object, sa)
                    dst_objects, link_edge_type = self.linked_objects(dst_accessor)
                    # A proxy edge anywhere on the path makes the whole path a proxy path.
                    if link_edge_type == PROXY_EDGE:
                        edge_type = link_edge_type

                    """

                    True If no linked objects and deletion
                    False If no linked objects
                    True If multiple linked objects
                    <continue traversal> If single linked object

                    """

                    if dst_objects == []:
                        # Workaround for ORM not returning linked deleted
                        # objects
                        if o2.deleted:
                            return True, edge_type
                        else:
                            dst_object = None
                    elif len(dst_objects) > 1:
                        # Multiple linked objects. Assume anything could be among those multiple objects.
                        raise AttributeError
                    else:
                        dst_object = dst_objects[0]
                except AttributeError as e:
                    if sa != "fake_accessor":
                        self.log.debug(
                            "Could not check object dependencies, making conservative choice %s",
                            e,
                            src_object=src_object,
                            sa=sa,
                            o1=o1,
                            o2=o2,
                        )
                    # Conservative: assume a dependency exists.
                    return True, edge_type

                src_object = dst_object

                if not src_object:
                    break

            verdict, edge_type = self.same_object(src_object, o2)
            if verdict:
                return verdict, edge_type

            # Otherwise try other paths

        return False, None

    """

    This function implements the main scheduling logic
    of the Synchronizer. It divides incoming work (dirty objects)
    into cohorts of dependent objects, and runs each such cohort
    in its own thread.

    Future work:

    * Run event thread in parallel to the scheduling thread, and
      add incoming objects to existing cohorts. Doing so should
      greatly improve synchronizer performance.
    * A single object might need to be added to multiple cohorts.
      In this case, the last cohort handles such an object.
    * This algorithm is horizontal-scale-ready. Multiple synchronizers
      could run off a shared runqueue of cohorts.

    """

    def compute_dependent_cohorts(self, objects, deletion):
        """Partition *objects* into cohorts of mutually dependent objects.

        Builds a DiGraph over object indices with an edge wherever a
        concrete dependency path exists (reversed argument order when
        *deletion* is True), then returns one list per weakly connected
        component, ordered by reversed topological sort.
        """
        n = len(objects)
        r = list(range(n))

        oG = DiGraph()

        for i in r:
            oG.add_node(i)

        try:
            for i0 in range(n):
                for i1 in range(n):
                    if i0 != i1:
                        if deletion:
                            path_args = (objects[i1], objects[i0])
                        else:
                            path_args = (objects[i0], objects[i1])

                        is_connected, edge_type = self.concrete_path_exists(*path_args)
                        if is_connected:
                            try:
                                # If the reverse edge already exists and is a
                                # proxy edge, replace it with this direction.
                                edge_type = oG[i1][i0]["type"]
                                if edge_type == PROXY_EDGE:
                                    oG.remove_edge(i1, i0)
                                    oG.add_edge(i0, i1, type=edge_type)
                            except KeyError:
                                oG.add_edge(i0, i1, type=edge_type)
        except KeyError:
            pass

        components = weakly_connected_component_subgraphs(oG)
        cohort_indexes = [reversed(list(topological_sort(g))) for g in components]
        cohorts = [
            [objects[i] for i in cohort_index] for cohort_index in cohort_indexes
        ]

        return cohorts

    def run_once(self):
        """One full pass: reload the dependency graph, then for each of the
        sync and deletion phases fetch pending work, spawn one thread per
        cohort, wait for them, and finally run legacy call()-style steps.
        All errors are logged, never propagated to the caller."""
        self.load_dependency_graph()

        try:
            # Why are we checking the DB connection here?
            self.model_accessor.check_db_connection_okay()

            # Two passes. One for sync, the other for deletion.
            for deletion in (False, True):
                objects_to_process = []

                objects_to_process, steps_to_process = self.fetch_pending(deletion)
                dependent_cohorts = self.compute_dependent_cohorts(
                    objects_to_process, deletion
                )

                threads = []
                self.log.debug("In run once inner loop", deletion=deletion)

                for cohort in dependent_cohorts:
                    thread = threading.Thread(
                        target=self.sync_cohort,
                        name="synchronizer",
                        args=(cohort, deletion),
                    )

                    threads.append(thread)

                # Start threads
                for t in threads:
                    t.start()

                self.reset_model_accessor()

                # Wait for all threads to finish before continuing with the run
                # loop
                for t in threads:
                    t.join()

                # Run legacy synchronizers, which do everything in call()
                for step in steps_to_process:
                    try:
                        step.call(deletion=deletion)
                    except Exception as e:
                        self.log.exception("Legacy step failed", step=step, e=e)

        except Exception as e:
            self.log.exception(
                "Core error. This seems like a misconfiguration or bug. This error will not be relayed to the user!",
                e=e,
            )
            self.log.error("Exception in observer run loop")