Updated OpenStack observer to a parallel implementation
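
Each sync step now runs in its own thread instead of being driven
sequentially by a single sync() loop. Dependencies between steps are
enforced with one threading.Condition per provider step: a provider
starts in STEP_STATUS_WORKING, and its dependents block on the
Condition until the status flips to STEP_STATUS_OK (run) or
STEP_STATUS_KO (skip and record as failed). The deletion pass walks an
inverted copy of the dependency graph so teardown happens in the
reverse order of creation. Shared bookkeeping (failed_steps,
failed_step_objects, error_mapper) moves onto the observer instance so
that all step threads can see it.

The gating pattern, in isolation (a minimal sketch for illustration,
not code from this patch):

    import threading

    STEP_STATUS_WORKING, STEP_STATUS_OK, STEP_STATUS_KO = 1, 2, 3

    status = {'provider': STEP_STATUS_WORKING}
    cond = threading.Condition()

    def finish_provider(ok):
        # The provider publishes its terminal status, then wakes waiters.
        cond.acquire()
        status['provider'] = STEP_STATUS_OK if ok else STEP_STATUS_KO
        cond.notify_all()
        cond.release()

    def wait_for_provider():
        # A dependent blocks until the provider reaches a terminal status.
        cond.acquire()
        while status['provider'] == STEP_STATUS_WORKING:
            cond.wait()
        cond.release()
        return status['provider'] == STEP_STATUS_OK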
diff --git a/planetstack/openstack_observer/event_loop.py b/planetstack/openstack_observer/event_loop.py
index 12e88ab..1f15a8e 100644
--- a/planetstack/openstack_observer/event_loop.py
+++ b/planetstack/openstack_observer/event_loop.py
@@ -31,11 +32,27 @@
class NoOpDriver:
def __init__(self):
self.enabled = True
+ self.dependency_graph = None
+
+STEP_STATUS_WORKING=1
+STEP_STATUS_OK=2
+STEP_STATUS_KO=3
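+# A provider starts WORKING and ends OK (dependents run) or KO (skipped).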
+
+def invert_graph(g):
+ ig = {}
+ for k,v in g.items():
+ for v0 in v:
+            try:
+                ig[v0].append(k)
+            except KeyError:
+                # First dependent seen for v0: start its list.
+                ig[v0] = [k]
+ return ig
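+
+# Example: if SyncSlivers depends on SyncSlices, the forward graph
+# {'SyncSlivers': ['SyncSlices']} inverts to {'SyncSlices': ['SyncSlivers']},
+# which is the order deletions must follow.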
class PlanetStackObserver:
#sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps,SyncExternalRoutes,SyncUsers,SyncRoles,SyncNodes,SyncImages,GarbageCollector]
sync_steps = []
+
def __init__(self):
# The Condition object that gets signalled by Feefie events
self.step_lookup = {}
@@ -154,9 +171,10 @@
pass
# no dependencies, pass
- dependency_graph = step_graph
+ self.dependency_graph = step_graph
+ self.deletion_dependency_graph = invert_graph(step_graph)
- self.ordered_steps = toposort(dependency_graph, map(lambda s:s.__name__,self.sync_steps))
+ self.ordered_steps = toposort(self.dependency_graph, map(lambda s:s.__name__,self.sync_steps))
print "Order of steps=",self.ordered_steps
self.load_run_times()
@@ -204,7 +222,6 @@
self.last_deletion_run_times[e]=0
-
def save_run_times(self):
run_times = json.dumps(self.last_run_times)
open('/tmp/observer_run_times','w').write(run_times)
@@ -220,18 +237,35 @@
if (failed_step in step.dependencies):
raise StepNotReady
- def sync(self, ordered_steps, error_mapper, deletion):
- # Set of whole steps that failed
- failed_steps = []
+ def sync(self, S, deletion):
+ step = self.step_lookup[S]
+ start_time=time.time()
+
+ dependency_graph = self.dependency_graph if not deletion else self.deletion_dependency_graph
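+        # Additions walk the forward graph; deletions walk the inverted
+        # graph so dependents are torn down before their providers.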
- # Set of individual objects within steps that failed
- failed_step_objects = set()
+ # Wait for step dependencies to be met
+ try:
+            deps = dependency_graph[S]
+ has_deps = True
+ except KeyError:
+ has_deps = False
- for S in ordered_steps:
- step = self.step_lookup[S]
- start_time=time.time()
-
- sync_step = step(driver=self.driver,error_map=error_mapper)
+ if (has_deps):
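+            # Block until every provider has published a terminal status.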
+ for d in deps:
+ cond = self.step_conditions[d]
+ cond.acquire()
+                while (self.step_status[d] == STEP_STATUS_WORKING):
+                    # Loop, not 'if': Condition.wait() can wake spuriously.
+                    cond.wait()
+ cond.release()
+            # Every dependency must have succeeded, not just the last one.
+            go = all(self.step_status[d] == STEP_STATUS_OK for d in deps)
+ else:
+ go = True
+
+ if (not go):
+        self.failed_steps.append(step)
+ my_status = STEP_STATUS_KO
+ else:
+ sync_step = step(driver=self.driver,error_map=self.error_mapper)
sync_step.__name__ = step.__name__
sync_step.dependencies = []
try:
@@ -247,16 +281,18 @@
try:
# Various checks that decide whether
# this step runs or not
- self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed
+            self.check_class_dependency(sync_step, self.failed_steps) # don't run Slices if Sites failed
             self.check_schedule(sync_step, deletion) # don't run sync_network_routes if time since last run < 1 hour
should_run = True
except StepNotReady:
logging.info('Step not ready: %s'%sync_step.__name__)
- failed_steps.append(sync_step)
+ self.failed_steps.append(sync_step)
+ my_status = STEP_STATUS_KO
except Exception,e:
logging.error('%r',e)
logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion))
- failed_steps.append(sync_step)
+ self.failed_steps.append(sync_step)
+ my_status = STEP_STATUS_KO
if (should_run):
try:
@@ -264,31 +300,66 @@
logger.info('Executing step %s' % sync_step.__name__)
- # ********* This is the actual sync step
- #import pdb
- #pdb.set_trace()
- failed_objects = sync_step(failed=list(failed_step_objects), deletion=deletion)
-
+ failed_objects = sync_step(failed=list(self.failed_step_objects), deletion=deletion)
self.check_duration(sync_step, duration)
- if failed_objects:
- failed_step_objects.update(failed_objects)
+ if failed_objects:
+ self.failed_step_objects.update(failed_objects)
+
+ my_status = STEP_STATUS_OK
self.update_run_time(sync_step,deletion)
except Exception,e:
logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e)
logger.log_exc(e)
- failed_steps.append(S)
+ self.failed_steps.append(S)
+ my_status = STEP_STATUS_KO
+ else:
+ my_status = STEP_STATUS_OK
+
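+        # Publish this step's terminal status and wake any waiting threads.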
+ try:
+ my_cond = self.step_conditions[S]
+ my_cond.acquire()
+ self.step_status[S]=my_status
+ my_cond.notify_all()
+ my_cond.release()
+ except KeyError,e:
+            logging.info('Step %r is a leaf' % S)
+
def run(self):
if not self.driver.enabled:
return
+
if (self.driver_kind=="openstack") and (not self.driver.has_openstack):
return
while True:
try:
error_map_file = getattr(Config(), "error_map_path", "/opt/planetstack/error_map.txt")
- error_mapper = ErrorMapper(error_map_file)
+ self.error_mapper = ErrorMapper(error_map_file)
+
+ # Set of whole steps that failed
+ self.failed_steps = []
+
+ # Set of individual objects within steps that failed
+ self.failed_step_objects = set()
+
+ # Set up conditions and step status
+ # This is needed for steps to run in parallel
+ # while obeying dependencies.
+
+ providers = set()
+ for v in self.dependency_graph.values():
+ if (v):
+ providers.update(v)
+
+ self.step_conditions = {}
+ self.step_status = {}
+ for p in list(providers):
+ self.step_conditions[p] = threading.Condition()
+ self.step_status[p] = STEP_STATUS_WORKING
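+                # Steps that nothing depends on never appear in 'providers',
+                # so they get no Condition; sync() treats the resulting
+                # KeyError as a leaf step.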
+
logger.info('Waiting for event')
tBeforeWait = time.time()
@@ -299,10 +370,13 @@
for deletion in [False,True]:
threads = []
                 logger.info('Deletion=%r...'%deletion)
+                # Reset statuses so this pass does not observe terminal
+                # statuses published by the previous pass.
+                for p in self.step_conditions.keys():
+                    self.step_status[p] = STEP_STATUS_WORKING
- ordered_steps = self.ordered_steps if not deletion else reversed(self.ordered_steps)
- thread = threading.Thread(target=self.sync, args=(ordered_steps,error_mapper,deletion))
- logger.info('Deletion=%r...'%deletion)
- threads.append(thread)
+ schedule = self.ordered_steps if not deletion else reversed(self.ordered_steps)
+
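+                # One thread per step; the per-provider Conditions, not this
+                # loop order, enforce the dependencies.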
+                for S in schedule:
+                    thread = threading.Thread(target=self.sync, args=(S, deletion))
+                    threads.append(thread)
# Start threads
for t in threads: