CORD-1801: [1/2] Drop extra loop and call deletion routine for legacy steps
Change-Id: If64f5cdf7d5a3a14317b314df78023a31eac33ab
(cherry picked from commit 4a6c307110491afccf5131e90c0c0ce42711e32f)
diff --git a/xos/synchronizers/new_base/event_loop.py b/xos/synchronizers/new_base/event_loop.py
index 924a45b..9a87b56 100644
--- a/xos/synchronizers/new_base/event_loop.py
+++ b/xos/synchronizers/new_base/event_loop.py
@@ -35,9 +35,11 @@
log = create_logger(Config().get('logging'))
+
class StepNotReady(Exception):
pass
+
class ExternalDependencyFailed(Exception):
pass
@@ -55,7 +57,6 @@
DRIVER = NoOpDriver()
-
def set_driver(x):
global DRIVER
DRIVER = x
@@ -64,7 +65,7 @@
class XOSObserver:
sync_steps = []
- def __init__(self, sync_steps, log = log):
+ def __init__(self, sync_steps, log=log):
# The Condition object via which events are received
self.log = log
self.step_lookup = {}
@@ -89,7 +90,7 @@
def load_dependency_graph(self):
dep_path = Config.get("dependency_graph")
- self.log.info('Loading model dependency graph', path = dep_path)
+ self.log.info('Loading model dependency graph', path=dep_path)
try:
dep_graph_str = open(dep_path).read()
@@ -116,14 +117,16 @@
True: model_dependency_graph_rev,
False: model_dependency_graph
}
- self.log.info("Loaded dependencies", edges = model_dependency_graph.edges())
+ self.log.info(
+ "Loaded dependencies",
+ edges=model_dependency_graph.edges())
except Exception as e:
- self.log.exception("Error loading dependency graph", e = e)
+ self.log.exception("Error loading dependency graph", e=e)
raise e
def load_sync_steps(self):
model_to_step = defaultdict(list)
- external_dependencies = []
+ external_dependencies = []
for s in self.sync_steps:
if not isinstance(s.observes, list):
@@ -143,7 +146,9 @@
self.model_to_step = model_to_step
self.external_dependencies = list(set(external_dependencies))
- self.log.info('Loaded external dependencies', external_dependencies = external_dependencies)
+ self.log.info(
+ 'Loaded external dependencies',
+ external_dependencies=external_dependencies)
self.log.info('Loaded model_map', **model_to_step)
def reset_model_accessor(self, o=None):
@@ -171,7 +176,7 @@
model_accessor.journal_object(o, "syncstep.call.delete_record")
log.debug("Deleting object", **o.tologdict())
- step.log = log.bind(step = step)
+ step.log = log.bind(step=step)
step.delete_record(o)
step.log = self.log
@@ -180,7 +185,6 @@
model_accessor.journal_object(o, "syncstep.call.delete_set_reap")
o.backend_need_reap = True
o.save(update_fields=['backend_need_reap'])
-
def sync_record(self, o, log):
try:
@@ -200,7 +204,7 @@
log.debug("Syncing object", **o.tologdict())
- step.log = log.bind(step = step)
+ step.log = log.bind(step=step)
step.sync_record(o)
step.log = self.log
@@ -216,11 +220,12 @@
o.backend_code = 1
model_accessor.journal_object(o, "syncstep.call.save_update")
o.save(update_fields=['enacted', 'backend_status', 'backend_register'])
- log.info("Saved sync object, new enacted", enacted = new_enacted)
+ log.info("Saved sync object, new enacted", enacted=new_enacted)
""" This function needs a cleanup. FIXME: Rethink backend_status, backend_register """
+
def handle_sync_exception(self, o, e):
- self.log.exception("sync step failed!", e = e, **o.tologdict())
+ self.log.exception("sync step failed!", e=e, **o.tologdict())
current_code = o.backend_code
if hasattr(e, 'message'):
@@ -228,7 +233,8 @@
else:
status = str(e)
- if isinstance(e, InnocuousException) or isinstance(e, DeferredException):
+ if isinstance(e, InnocuousException) or isinstance(
+ e, DeferredException):
code = 1
else:
code = 2
@@ -291,15 +297,19 @@
o.backend_status = o.backend_status[:1024]
o.save(update_fields=['backend_status',
'backend_register', 'updated'])
- except BaseException,e:
- self.log.exception("Could not update backend status field!", e = e)
+ except BaseException as e:
+ self.log.exception(
+ "Could not update backend status field!", e=e)
pass
def sync_cohort(self, cohort, deletion):
- log = self.log.bind(thread_id = threading.current_thread().ident)
+ log = self.log.bind(thread_id=threading.current_thread().ident)
try:
start_time = time.time()
- log.debug("Starting to work on cohort", cohort = cohort, deletion = deletion)
+ log.debug(
+ "Starting to work on cohort",
+ cohort=cohort,
+ deletion=deletion)
cohort_emptied = False
dependency_error = None
@@ -323,14 +333,18 @@
else:
self.sync_record(o, log)
except ExternalDependencyFailed:
- dependency_error = 'External dependency on object %s id %d not met'%(o.__class__.__name__, o.id)
+ dependency_error = 'External dependency on object %s id %d not met' % (
+ o.__class__.__name__, o.id)
dependency_error_code = 1
except (DeferredException, InnocuousException, Exception) as e:
dependency_error, dependency_error_code = self.handle_sync_exception(
o, e)
except StopIteration:
- log.debug("Cohort completed", cohort = cohort, deletion = deletion)
+ log.debug(
+ "Cohort completed",
+ cohort=cohort,
+ deletion=deletion)
cohort_emptied = True
finally:
self.reset_model_accessor()
@@ -362,7 +376,7 @@
for step_class in step_list:
step = step_class(driver=self.driver)
- step.log = self.log.bind(step = step)
+ step.log = self.log.bind(step=step)
if not hasattr(step, 'call'):
pending = step.fetch_pending(deletion)
@@ -374,7 +388,10 @@
# This needs to be dropped soon.
pending_steps.append(step)
- self.log.debug('Fetched pending data', pending_objects = pending_objects, legacy_steps = pending_steps)
+ self.log.debug(
+ 'Fetched pending data',
+ pending_objects=pending_objects,
+ legacy_steps=pending_steps)
return pending_objects, pending_steps
""" Automatically test if a real dependency path exists between two objects. e.g.
@@ -424,7 +441,7 @@
raise AttributeError
except AttributeError as e:
self.log.debug(
- 'Could not check object dependencies, making conservative choice', src_object = src_object, sa = sa, o1 = o1, o2 = o2)
+ 'Could not check object dependencies, making conservative choice', src_object=src_object, sa=sa, o1=o1, o2=o2)
return True
src_object = dst_object
@@ -468,18 +485,17 @@
for i in r:
oG.add_node(i)
- for v0, v1 in mG.edges():
- try:
- for i0 in range(n):
- for i1 in range(n):
- if i0 != i1:
- if not deletion and self.concrete_path_exists(
- objects[i0], objects[i1]):
- oG.add_edge(i0, i1)
- elif deletion and self.concrete_path_exists(objects[i1], objects[i0]):
- oG.add_edge(i0, i1)
- except KeyError:
- pass
+ try:
+ for i0 in range(n):
+ for i1 in range(n):
+ if i0 != i1:
+ if not deletion and self.concrete_path_exists(
+ objects[i0], objects[i1]):
+ oG.add_edge(i0, i1)
+ elif deletion and self.concrete_path_exists(objects[i1], objects[i0]):
+ oG.add_edge(i0, i1)
+ except KeyError:
+ pass
components = weakly_connected_component_subgraphs(oG)
cohort_indexes = [reversed(topological_sort(g)) for g in components]
@@ -499,12 +515,13 @@
for deletion in (False, True):
objects_to_process = []
- objects_to_process, steps_to_process = self.fetch_pending(deletion)
+ objects_to_process, steps_to_process = self.fetch_pending(
+ deletion)
dependent_cohorts = self.compute_dependent_cohorts(
objects_to_process, deletion)
threads = []
- self.log.debug('In run once inner loop', deletion = deletion)
+ self.log.debug('In run once inner loop', deletion=deletion)
for cohort in dependent_cohorts:
thread = threading.Thread(
@@ -527,9 +544,10 @@
# Run legacy synchronizers, which do everything in call()
for step in steps_to_process:
try:
- step.call()
- except Exception,e:
- self.log.exception("Legacy step failed", step = step, e = e)
+ step.call(deletion=deletion)
+ except Exception as e:
+ self.log.exception(
+ "Legacy step failed", step=step, e=e)
loop_end = time.time()
@@ -542,7 +560,7 @@
except Exception as e:
self.log.exception(
'Core error. This seems like a misconfiguration or bug. This error will not be relayed to the user!',
- e = e)
+ e=e)
self.log.error("Exception in observer run loop")
model_accessor.update_diag(