add run_once methods for model_policy and event_loop
diff --git a/xos/model_policy.py b/xos/model_policy.py
index ce59a32..ced785e 100644
--- a/xos/model_policy.py
+++ b/xos/model_policy.py
@@ -97,9 +97,14 @@
pass
def run_policy():
- from core.models import Instance,Slice,Controller,Network,User,SlicePrivilege,Site,SitePrivilege,Image,ControllerSlice,ControllerUser,ControllerSite
while (True):
start = time.time()
+ run_policy_once()
+ if (time.time()-start<1):
+ time.sleep(1)
+
+def run_policy_once():
+ from core.models import Instance,Slice,Controller,Network,User,SlicePrivilege,Site,SitePrivilege,Image,ControllerSlice,ControllerUser,ControllerSite
models = [Instance,Slice, Controller, Network, User, SlicePrivilege, Site, SitePrivilege, Image, ControllerSlice, ControllerSite, ControllerUser]
objects = []
deleted_objects = []
@@ -132,6 +137,3 @@
except:
# this shouldn't happen, but in case it does, catch it...
logger.log_exc("exception in reset_queries")
-
- if (time.time()-start<1):
- time.sleep(1)
diff --git a/xos/openstack_observer/event_loop.py b/xos/openstack_observer/event_loop.py
index 31db875..aec667b 100644
--- a/xos/openstack_observer/event_loop.py
+++ b/xos/openstack_observer/event_loop.py
@@ -410,72 +410,74 @@
return
while True:
- try:
- loop_start = time.time()
- error_map_file = getattr(Config(), "error_map_path", XOS_DIR + "/error_map.txt")
- self.error_mapper = ErrorMapper(error_map_file)
+ logger.info('Waiting for event')
+ self.wait_for_event(timeout=5)
+ logger.info('Observer woke up')
- logger.info('Waiting for event')
- tBeforeWait = time.time()
- self.wait_for_event(timeout=5)
- logger.info('Observer woke up')
+ self.run_once()
- # Two passes. One for sync, the other for deletion.
- for deletion in [False,True]:
- # Set of individual objects within steps that failed
- self.failed_step_objects = set()
+ def run_once(self):
+ try:
+ loop_start = time.time()
+ error_map_file = getattr(Config(), "error_map_path", XOS_DIR + "/error_map.txt")
+ self.error_mapper = ErrorMapper(error_map_file)
- # Set up conditions and step status
- # This is needed for steps to run in parallel
- # while obeying dependencies.
+ # Two passes. One for sync, the other for deletion.
+ for deletion in [False,True]:
+ # Set of individual objects within steps that failed
+ self.failed_step_objects = set()
- providers = set()
- dependency_graph = self.dependency_graph if not deletion else self.deletion_dependency_graph
+ # Set up conditions and step status
+ # This is needed for steps to run in parallel
+ # while obeying dependencies.
- for v in dependency_graph.values():
- if (v):
- providers.update(v)
+ providers = set()
+ dependency_graph = self.dependency_graph if not deletion else self.deletion_dependency_graph
- self.step_conditions = {}
- self.step_status = {}
+ for v in dependency_graph.values():
+ if (v):
+ providers.update(v)
- for p in list(providers):
- self.step_conditions[p] = threading.Condition()
+ self.step_conditions = {}
+ self.step_status = {}
- self.step_status[p] = STEP_STATUS_WORKING
+ for p in list(providers):
+ self.step_conditions[p] = threading.Condition()
- self.failed_steps = []
+ self.step_status[p] = STEP_STATUS_WORKING
- threads = []
- logger.info('Deletion=%r...'%deletion)
- schedule = self.ordered_steps if not deletion else reversed(self.ordered_steps)
+ self.failed_steps = []
- for S in schedule:
- thread = threading.Thread(target=self.sync, args=(S, deletion))
+ threads = []
+ logger.info('Deletion=%r...'%deletion)
+ schedule = self.ordered_steps if not deletion else reversed(self.ordered_steps)
- logger.info('Deletion=%r...'%deletion)
- threads.append(thread)
+ for S in schedule:
+ thread = threading.Thread(target=self.sync, args=(S, deletion))
- # Start threads
- for t in threads:
- t.start()
+ logger.info('Deletion=%r...'%deletion)
+ threads.append(thread)
- # another spot to clean up debug state
- try:
- reset_queries()
- except:
- # this shouldn't happen, but in case it does, catch it...
- logger.log_exc("exception in reset_queries")
+ # Start threads
+ for t in threads:
+ t.start()
- # Wait for all threads to finish before continuing with the run loop
- for t in threads:
- t.join()
+ # another spot to clean up debug state
+ try:
+ reset_queries()
+ except:
+ # this shouldn't happen, but in case it does, catch it...
+ logger.log_exc("exception in reset_queries")
- self.save_run_times()
+ # Wait for all threads to finish before continuing with the run loop
+ for t in threads:
+ t.join()
- loop_end = time.time()
- open('/tmp/%sobserver_last_run'%self.observer_name,'w').write(json.dumps({'last_run': loop_end, 'last_duration':loop_end - loop_start}))
- except Exception, e:
- logger.error('Core error. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!' % e)
- logger.log_exc("Exception in observer run loop")
- traceback.print_exc()
+ self.save_run_times()
+
+ loop_end = time.time()
+ open('/tmp/%sobserver_last_run'%self.observer_name,'w').write(json.dumps({'last_run': loop_end, 'last_duration':loop_end - loop_start}))
+ except Exception, e:
+ logger.error('Core error. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!' % e)
+ logger.log_exc("Exception in observer run loop")
+ traceback.print_exc()