SEBA-405 Clean up synchronizer imports of model_accessor into globals;
Move mock modelaccessor to /tmp;
Easier mock modelaccessor configuration
Change-Id: I67a17b9a72ea69f61d92206f1b520a11c2f18d80
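The net effect of this change is to stop star-importing the model accessor and generated
model classes into the synchronizer's module globals; the accessor is instead passed
explicitly into XOSObserver and from there into each sync step. A rough sketch of the
resulting caller-side wiring, assuming the modelaccessor module still exports a
model_accessor object as the old star-import implies:

    # Hypothetical wiring under this change; the empty step list keeps the
    # sketch self-contained (steps are normally discovered from the
    # synchronizer's steps directory).
    from xossynchronizer.modelaccessor import model_accessor  # real or mock accessor
    from xossynchronizer.event_loop import XOSObserver

    observer = XOSObserver(sync_steps=[], model_accessor=model_accessor)

    # Model classes are then reached through the injected accessor, e.g.
    # observer.model_accessor.Slice, rather than through module-level globals.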
diff --git a/lib/xos-synchronizer/xossynchronizer/event_loop.py b/lib/xos-synchronizer/xossynchronizer/event_loop.py
index 96ce727..bdff10b 100644
--- a/lib/xos-synchronizer/xossynchronizer/event_loop.py
+++ b/lib/xos-synchronizer/xossynchronizer/event_loop.py
@@ -30,7 +30,6 @@
from networkx.algorithms.dag import topological_sort
from xossynchronizer.steps.syncstep import InnocuousException, DeferredException, SyncStep
-from xossynchronizer.modelaccessor import *
from xosconfig import Config
from multistructlog import create_logger
@@ -71,9 +70,9 @@
class XOSObserver(object):
sync_steps = []
- def __init__(self, sync_steps, log=log):
- # The Condition object via which events are received
+ def __init__(self, sync_steps, model_accessor, log=log):
self.log = log
+ self.model_accessor = model_accessor
self.step_lookup = {}
self.sync_steps = sync_steps
@@ -153,9 +152,13 @@
observes = [s.observes]
else:
observes = s.observes
-
for m in observes:
- model_to_step[m.__name__].append(s.__name__)
+ if isinstance(m, str):
+ # observes is a string that names the model
+ model_to_step[m].append(s.__name__)
+ else:
+ # observes is the model class
+ model_to_step[m.__name__].append(s.__name__)
try:
external_dependencies.extend(s.external_dependencies)
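With this hunk, a step's observes attribute may be either a generated model class or a
string naming one; the string form lets a step module avoid importing model classes at
load time. A minimal illustration, where "ONOSApp" is only an example model name and not
part of this patch:

    class SyncONOSApp(SyncStep):
        observes = "ONOSApp"   # string naming the model; indexed directly by name
    # The older form, observes = <model class>, is still accepted and is mapped
    # through m.__name__ exactly as before.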
@@ -173,7 +176,7 @@
def reset_model_accessor(self, o=None):
try:
- model_accessor.reset_queries()
+ self.model_accessor.reset_queries()
except BaseException:
# this shouldn't happen, but in case it does, catch it...
if o:
@@ -190,13 +193,13 @@
if getattr(o, "backend_need_reap", False):
# the object has already been deleted and marked for reaping
- model_accessor.journal_object(o, "syncstep.call.already_marked_reap")
+ self.model_accessor.journal_object(o, "syncstep.call.already_marked_reap")
else:
step = getattr(o, "synchronizer_step", None)
if not step:
raise ExternalDependencyFailed
- model_accessor.journal_object(o, "syncstep.call.delete_record")
+ self.model_accessor.journal_object(o, "syncstep.call.delete_record")
dr_log.debug("Deleting object", **o.tologdict())
@@ -206,7 +209,7 @@
dr_log.debug("Deleted object", **o.tologdict())
- model_accessor.journal_object(o, "syncstep.call.delete_set_reap")
+ self.model_accessor.journal_object(o, "syncstep.call.delete_set_reap")
o.backend_need_reap = True
o.save(update_fields=["backend_need_reap"])
@@ -228,7 +231,7 @@
o.backend_need_delete = True
o.save(update_fields=["backend_need_delete"])
- model_accessor.journal_object(o, "syncstep.call.sync_record")
+ self.model_accessor.journal_object(o, "syncstep.call.sync_record")
sr_log.debug("Syncing object", **o.tologdict())
@@ -243,7 +246,7 @@
o.backend_register = json.dumps(scratchpad)
o.backend_status = "OK"
o.backend_code = 1
- model_accessor.journal_object(o, "syncstep.call.save_update")
+ self.model_accessor.journal_object(o, "syncstep.call.save_update")
o.save(
update_fields=[
"enacted",
@@ -339,7 +342,7 @@
# TOFIX:
# DatabaseError: value too long for type character varying(140)
- if model_accessor.obj_exists(o):
+ if self.model_accessor.obj_exists(o):
try:
o.backend_status = o.backend_status[:1024]
o.save(
@@ -397,16 +400,16 @@
cohort_emptied = True
finally:
self.reset_model_accessor()
- model_accessor.connection_close()
+ self.model_accessor.connection_close()
def tenant_class_name_from_service(self, service_name):
""" This code supports legacy functionality. To be cleaned up. """
name1 = service_name + "Instance"
- if hasattr(Slice().stub, name1):
+ if hasattr(self.model_accessor.Slice().stub, name1):
return name1
else:
name2 = service_name.replace("Service", "Tenant")
- if hasattr(Slice().stub, name2):
+ if hasattr(self.model_accessor.Slice().stub, name2):
return name2
else:
return None
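For reference, the legacy name derivation this helper performs (now reached through
self.model_accessor) works as follows, using a hypothetical service name:

    # tenant_class_name_from_service("VolcanoService"), hypothetically:
    #   name1 = "VolcanoService" + "Instance"                 -> "VolcanoServiceInstance"
    #     returned if the accessor's Slice().stub defines it; otherwise
    #   name2 = "VolcanoService".replace("Service", "Tenant") -> "VolcanoTenant"
    #     returned if defined, else None.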
@@ -427,14 +430,14 @@
s_model_names = [v for k, v in ugly_tuples]
s_models0 = [
- getattr(Slice().stub, model_name, None) for model_name in s_model_names
+ getattr(self.model_accessor.Slice().stub, model_name, None) for model_name in s_model_names
]
s_models1 = [model.objects.first() for model in s_models0]
s_models = [m for m in s_models1 if m is not None]
dependencies = []
for model in s_models:
- deps = ServiceDependency.objects.filter(subscriber_service_id=model.id)
+ deps = self.model_accessor.ServiceDependency.objects.filter(subscriber_service_id=model.id)
if deps:
services = [
self.tenant_class_name_from_service(
@@ -450,7 +453,7 @@
def compute_service_instance_dependencies(self, objects):
link_set = [
- ServiceInstanceLink.objects.filter(subscriber_service_instance_id=o.id)
+ self.model_accessor.ServiceInstanceLink.objects.filter(subscriber_service_instance_id=o.id)
for o in objects
]
@@ -490,17 +493,22 @@
for e in self.external_dependencies:
s = SyncStep
- s.observes = e
+ if isinstance(e,str):
+ # external dependency is a string that names a model class
+ s.observes = self.model_accessor.get_model_class(e)
+ else:
+ # external dependency is a model class
+ s.observes = e
step_list.append(s)
for step_class in step_list:
- step = step_class(driver=self.driver)
+ step = step_class(driver=self.driver, model_accessor=self.model_accessor)
step.log = self.log.new(step=step)
if not hasattr(step, "call"):
pending = step.fetch_pending(deletion)
for obj in pending:
- step = step_class(driver=self.driver)
+ step = step_class(driver=self.driver, model_accessor=self.model_accessor)
step.log = self.log.new(step=step)
obj.synchronizer_step = step
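Every step is now constructed with the accessor as well as the driver, so steps resolve
models through self.model_accessor rather than module globals. A sketch of the step-side
constructor this implies (syncstep.py is not part of this hunk, so treat the exact
signature as an assumption):

    # Assumed shape of the SyncStep constructor after this change.
    class SyncStep(object):
        def __init__(self, driver=None, model_accessor=None, **kwargs):
            self.driver = driver
            self.model_accessor = model_accessor

        def fetch_pending(self, deletion=False):
            # Models are looked up via the injected accessor, e.g.
            # self.model_accessor.get_model_class("Instance"), never via globals.
            return []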
@@ -719,7 +727,7 @@
try:
# Why are we checking the DB connection here?
- model_accessor.check_db_connection_okay()
+ self.model_accessor.check_db_connection_okay()
loop_start = time.time()