CORD-1242 Update to support policy-only synchronizers
Change-Id: Ifd8d73f9eb01feda554daca5484f956a68fc7fcf
diff --git a/xos/synchronizers/new_base/backend.py b/xos/synchronizers/new_base/backend.py
index 72de08f..eccd570 100644
--- a/xos/synchronizers/new_base/backend.py
+++ b/xos/synchronizers/new_base/backend.py
@@ -23,13 +23,8 @@
def __init__(self):
pass
- def load_sync_step_modules(self, step_dir=None):
+ def load_sync_step_modules(self, step_dir):
sync_steps = []
- if step_dir is None:
- try:
- step_dir = Config().observer_steps_dir
- except:
- step_dir = '/opt/xos/synchronizers/openstack/steps'
logger.info("Loading sync steps from %s" % step_dir)
@@ -55,23 +50,28 @@
return sync_steps
def run(self):
+ observer_thread = None
watcher_thread = None
model_policy_thread = None
model_accessor.update_diag(sync_start=time.time(), backend_status="0 - Synchronizer Start")
- sync_steps = self.load_sync_step_modules()
+ steps_dir = Config().observer_steps_dir
+ if steps_dir:
+ sync_steps = self.load_sync_step_modules(steps_dir)
+ if sync_steps:
+ # start the observer
+ observer = XOSObserver(sync_steps)
+ observer_thread = threading.Thread(target=observer.run,name='synchronizer')
+ observer_thread.start()
- # start the observer
- observer = XOSObserver(sync_steps)
- observer_thread = threading.Thread(target=observer.run,name='synchronizer')
- observer_thread.start()
-
- # start the watcher thread
- if (watchers_enabled):
- watcher = XOSWatcher(sync_steps)
- watcher_thread = threading.Thread(target=watcher.run,name='watcher')
- watcher_thread.start()
+ # start the watcher thread
+ if (watchers_enabled):
+ watcher = XOSWatcher(sync_steps)
+ watcher_thread = threading.Thread(target=watcher.run,name='watcher')
+ watcher_thread.start()
+ else:
+ logger.info("Skipping observer and watcher threads due to no steps dir.")
# start model policies thread
policies_dir = getattr(Config(), "observer_model_policies_dir", None)
@@ -80,7 +80,6 @@
model_policy_thread = threading.Thread(target=policy_engine.run, name="policy_engine")
model_policy_thread.start()
else:
- model_policy_thread = None
logger.info("Skipping model policies thread due to no model_policies dir.")
while True:
@@ -89,7 +88,8 @@
except KeyboardInterrupt:
print "exiting due to keyboard interrupt"
# TODO: See about setting the threads as daemons
- observer_thread._Thread__stop()
+ if observer_thread:
+ observer_thread._Thread__stop()
if watcher_thread:
watcher_thread._Thread__stop()
if model_policy_thread:
diff --git a/xos/synchronizers/new_base/model_policy_loop.py b/xos/synchronizers/new_base/model_policy_loop.py
index fe15f09..0c55cb1 100644
--- a/xos/synchronizers/new_base/model_policy_loop.py
+++ b/xos/synchronizers/new_base/model_policy_loop.py
@@ -93,6 +93,7 @@
#elif (sender_name in delete_policy_models):
# walk_inv_deps(self.delete_if_inactive, instance)
+ policies_failed = False
for policy in self.policies_by_name.get(sender_name, None):
method_name= "handle_%s" % action
if hasattr(policy, method_name):
@@ -102,12 +103,14 @@
logger.debug("MODEL POLICY: completed handler %s %s %s %s" % (sender_name, instance, policy.__name__, method_name))
except:
logger.log_exc("MODEL POLICY: Exception when running handler")
+ policies_failed = True
- try:
- instance.policed=model_accessor.now()
- instance.save(update_fields=['policed'])
- except:
- logger.log_exc('MODEL POLICY: Object %r failed to update policed timestamp' % instance)
+ if not policies_failed:
+ try:
+ instance.policed=model_accessor.now()
+ instance.save(update_fields=['policed'])
+ except:
+ logger.log_exc('MODEL POLICY: Object %r failed to update policed timestamp' % instance)
def noop(self, o,p):
pass