Synchronization logic for parallel steps
diff --git a/planetstack/ec2_observer/event_loop.py b/planetstack/ec2_observer/event_loop.py
index 5a3dae9..f6b92ea 100644
--- a/planetstack/ec2_observer/event_loop.py
+++ b/planetstack/ec2_observer/event_loop.py
@@ -234,59 +234,65 @@
for d in deps:
cond = self.step_conditions[d]
acquire(cond)
- if (self.step_status is STEP_STATUS_WORKING):
+ if (self.step_status[S] is STEP_STATUS_WORKING):
cond.wait()
cond.release()
- sync_step = step(driver=self.driver,error_map=error_mapper)
- sync_step.__name__ = step.__name__
- sync_step.dependencies = []
- try:
- mlist = sync_step.provides
-
- for m in mlist:
- sync_step.dependencies.extend(self.model_dependency_graph[m.__name__])
- except KeyError:
- pass
- sync_step.debug_mode = debug_mode
-
- should_run = False
- try:
- # Various checks that decide whether
- # this step runs or not
- self.check_class_dependency(sync_step, self.failed_steps) # dont run Slices if Sites failed
- self.check_schedule(sync_step, deletion) # dont run sync_network_routes if time since last run < 1 hour
- should_run = True
- except StepNotReady:
- logging.info('Step not ready: %s'%sync_step.__name__)
+ if (self.step_status[S] is not STEP_STATUS_OK):
self.failed_steps.append(sync_step)
- except Exception,e:
- logging.error('%r',e)
- logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion))
- self.failed_steps.append(sync_step)
-
- if (should_run):
- try:
- duration=time.time() - start_time
-
- logger.info('Executing step %s' % sync_step.__name__)
-
- failed_objects = sync_step(failed=list(self.failed_step_objects), deletion=deletion)
-
- self.check_duration(sync_step, duration)
-
- if failed_objects:
- self.failed_step_objects.update(failed_objects)
-
- my_status = STEP_STATUS_OK
- self.update_run_time(sync_step,deletion)
- except Exception,e:
- logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e)
- logger.log_exc(e)
- self.failed_steps.append(S)
- my_status = STEP_STATUS_KO
+ my_status = STEP_STATUS_KO
else:
- my_status = STEP_STATUS_OK
+ sync_step = step(driver=self.driver,error_map=error_mapper)
+ sync_step.__name__ = step.__name__
+ sync_step.dependencies = []
+ try:
+ mlist = sync_step.provides
+
+ for m in mlist:
+ sync_step.dependencies.extend(self.model_dependency_graph[m.__name__])
+ except KeyError:
+ pass
+ sync_step.debug_mode = debug_mode
+
+ should_run = False
+ try:
+ # Various checks that decide whether
+ # this step runs or not
+ self.check_class_dependency(sync_step, self.failed_steps) # dont run Slices if Sites failed
+ self.check_schedule(sync_step, deletion) # dont run sync_network_routes if time since last run < 1 hour
+ should_run = True
+ except StepNotReady:
+ logging.info('Step not ready: %s'%sync_step.__name__)
+ self.failed_steps.append(sync_step)
+ my_status = STEP_STATUS_KO
+ except Exception,e:
+ logging.error('%r',e)
+ logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion))
+ self.failed_steps.append(sync_step)
+ my_status = STEP_STATUS_KO
+
+ if (should_run):
+ try:
+ duration=time.time() - start_time
+
+ logger.info('Executing step %s' % sync_step.__name__)
+
+ failed_objects = sync_step(failed=list(self.failed_step_objects), deletion=deletion)
+
+ self.check_duration(sync_step, duration)
+
+ if failed_objects:
+ self.failed_step_objects.update(failed_objects)
+
+ my_status = STEP_STATUS_OK
+ self.update_run_time(sync_step,deletion)
+ except Exception,e:
+ logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e)
+ logger.log_exc(e)
+ self.failed_steps.append(S)
+ my_status = STEP_STATUS_KO
+ else:
+ my_status = STEP_STATUS_OK
try:
my_cond = self.step_conditions[S]
@@ -294,11 +300,9 @@
self.step_status[S]=my_status
my_cond.notify_all()
my_cond.release()
- except:
+ except KeyError,e:
+ logging.info('Step %r is a leaf')
pass
- if (self.step_conditions.has_key(S)):
-
-
def run(self):
if not self.driver.enabled: