Class and object dependencies, schedules
diff --git a/dmdot b/dmdot
new file mode 100755
index 0000000..2d95e9d
--- /dev/null
+++ b/dmdot
@@ -0,0 +1,49 @@
+#!/usr/bin/python
+
+import os
+import pdb
+import sys
+import json
+
+sys.path.append('.')
+
+os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings")
+
+from django.db.models.fields.related import ForeignKey
+from core.models import *
+
+try:
+ output = sys.args[1]
+except:
+ output = '-json'
+
+g = globals()
+model_classes = []
+class_names = []
+for c in g.values():
+ if type(c)==type(PlCoreBase):
+ model_classes.append(c)
+ class_names.append(c.__name__)
+
+
+if (output=='-dot'):
+ print "digraph plstack {";
+ for c in model_classes:
+ fields = c._meta.fields
+ for f in fields:
+ if type(f)==ForeignKey and f.name.title() in class_names:
+ print '\t"%s"->"%s";'%(c.__name__,f.name.title())
+ print "}\n";
+elif (output=='-json'):
+ d = {}
+ for c in model_classes:
+ fields = c._meta.fields
+ for f in fields:
+ if type(f)==ForeignKey and f.name.title() in class_names:
+ try:
+ d[c.__name__].append(f.name.title())
+ except KeyError:
+ d[c.__name__]=[f.name.title()]
+ print json.dumps(d,indent=4)
+
+
diff --git a/planetstack.deps b/planetstack.deps
new file mode 100644
index 0000000..6eae1fc
--- /dev/null
+++ b/planetstack.deps
@@ -0,0 +1,47 @@
+{
+ "Node": [
+ "Site",
+ "Deployment"
+ ],
+ "Slice": [
+ "Site"
+ ],
+ "ReservedResource": [
+ "Sliver"
+ ],
+ "SliceMembership": [
+ "User",
+ "Slice",
+ "Role"
+ ],
+ "NetworkSlice": [
+ "Network",
+ "Slice"
+ ],
+ "Tag": [
+ "Project"
+ ],
+ "User": [
+ "Site"
+ ],
+ "SliceTag": [
+ "Slice"
+ ],
+ "Reservation": [
+ "Slice"
+ ],
+ "NetworkSliver": [
+ "Network",
+ "Sliver"
+ ],
+ "SitePrivilege": [
+ "User",
+ "Site",
+ "Role"
+ ],
+ "Sliver": [
+ "Image",
+ "Slice",
+ "Node"
+ ]
+}
diff --git a/planetstack/observer/event_loop.py b/planetstack/observer/event_loop.py
index 4b11504..b565a15 100644
--- a/planetstack/observer/event_loop.py
+++ b/planetstack/observer/event_loop.py
@@ -12,9 +12,13 @@
from util.logger import Logger, logging, logger
#from timeout import timeout
+debug_mode = False
logger = Logger(logfile='observer.log', level=logging.INFO)
+class StepNotReady(Exception):
+ pass
+
def toposort(g, steps):
reverse = {}
@@ -54,23 +58,23 @@
class PlanetStackObserver:
sync_steps = ['SyncNetworks','SyncNetworkSlivers','SyncSites','SyncSitePrivileges','SyncSlices','SyncSliceMemberships','SyncSlivers','SyncSliverIps']
- def __init__(self):
- self.manager = OpenStackManager()
- # The Condition object that gets signalled by Feefie events
+ def __init__(self):
+ self.manager = OpenStackManager()
+ # The Condition object that gets signalled by Feefie events
self.load_sync_steps()
- self.event_cond = threading.Condition()
+ self.event_cond = threading.Condition()
self.load_enacted()
- def wait_for_event(self, timeout):
- self.event_cond.acquire()
- self.event_cond.wait(timeout)
- self.event_cond.release()
-
- def wake_up(self):
- logger.info('Wake up routine called. Event cond %r'%self.event_cond)
- self.event_cond.acquire()
- self.event_cond.notify()
- self.event_cond.release()
+ def wait_for_event(self, timeout):
+ self.event_cond.acquire()
+ self.event_cond.wait(timeout)
+ self.event_cond.release()
+
+ def wake_up(self):
+ logger.info('Wake up routine called. Event cond %r'%self.event_cond)
+ self.event_cond.acquire()
+ self.event_cond.notify()
+ self.event_cond.release()
def load_sync_steps(self):
dep_path = Config().pl_dependency_path
@@ -100,6 +104,7 @@
try:
dest = provides_dict[m]
except KeyError:
+ pass
# no deps, pass
step_graph[source]=dest
@@ -121,6 +126,7 @@
dest = backend_dict[m]
except KeyError:
# no deps, pass
+ pass
step_graph[source]=dest
except KeyError:
@@ -130,32 +136,85 @@
dependency_graph = step_graph
self.ordered_steps = toposort(dependency_graph, steps)
-
+ self.last_run_times={}
+ for e in self.ordered_steps:
+ self.last_run_times[e.name]=0
- def run(self):
- if not self.manager.enabled or not self.manager.has_openstack:
- return
+ def check_duration(self):
+ try:
+ if (duration > S.deadline):
+ logger.info('Sync step %s missed deadline, took %.2f seconds'%(S.name,duration))
+ except AttributeError:
+ # S doesn't have a deadline
+ pass
-
- while True:
- try:
- start_time=time.time()
-
- logger.info('Waiting for event')
- tBeforeWait = time.time()
- self.wait_for_event(timeout=300)
+ def update_run_time(self, step):
+ self.last_run_times[step.name]=time.time()
+
+ def check_schedule(self, step):
+ time_since_last_run = time.time() - self.last_run_times[step.name]
+ try:
+ if (time_since_last_run < step.requested_interval):
+ raise StepNotReady
+ except AttributeError:
+ logger.info('Step %s does not have requested_interval set'%step.name)
+ raise StepNotReady
+
+ def check_class_dependency(self, step, failed_steps):
+ for failed_step in failed_steps:
+ if (failed_step in self.dependency_graph[step.name]):
+ raise StepNotReady
+
+ def run(self):
+ if not self.manager.enabled or not self.manager.has_openstack:
+ return
+
+ while True:
+ try:
+ logger.info('Waiting for event')
+ tBeforeWait = time.time()
+ self.wait_for_event(timeout=300)
+ logger.info('Observer woke up')
+
+ # Set of whole steps that failed
+ failed_steps = []
+
+ # Set of individual objects within steps that failed
+ failed_step_objects = []
for S in self.ordered_steps:
+ start_time=time.time()
+
sync_step = S()
- sync_step()
+ sync_step.dependencies = self.dependencies[sync_step.name]
+ sync_step.debug_mode = debug_mode
- # Enforce 5 minutes between wakeups
- tSleep = 300 - (time.time() - tBeforeWait)
- if tSleep > 0:
- logger.info('Sleeping for %d seconds' % tSleep)
- time.sleep(tSleep)
+ should_run = False
+ try:
+ # Various checks that decide whether
+ # this step runs or not
+ self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed
+ self.check_schedule(sync_step) # dont run sync_network_routes if time since last run < 1 hour
+ should_run = True
+ except StepNotReady:
+ logging.info('Step not ready: %s'%sync_step.name)
+ failed_steps.add(sync_step)
+ except:
+ failed_steps.add(sync_step)
- logger.info('Observer woke up')
- except:
- logger.log_exc("Exception in observer run loop")
- traceback.print_exc()
+ if (should_run):
+ try:
+ duration=time.time() - start_time
+
+ # ********* This is the actual sync step
+ failed_objects = sync_step(failed=failed_step_objects)
+
+
+ check_deadline(sync_step, duration)
+ failed_step_objects.extend(failed_objects)
+ self.update_run_time(sync_step)
+ except:
+ failed_steps.add(S)
+ except:
+ logger.log_exc("Exception in observer run loop")
+ traceback.print_exc()
diff --git a/planetstack/observer/openstacksyncstep.py b/planetstack/observer/openstacksyncstep.py
index 7bfe9f4..3ce3c68 100644
--- a/planetstack/observer/openstacksyncstep.py
+++ b/planetstack/observer/openstacksyncstep.py
@@ -10,17 +10,7 @@
super(SyncStep,self).__init__(**args)
return
- def call(self):
- pending = self.fetch_pending()
- failed = []
- for o in pending:
- if (not self.depends_on(o, failed)):
- try:
- self.sync_record(o)
- o.enacted = datetime.now() # Is this the same timezone? XXX
- o.save(update_fields=['enacted'])
- except:
- failed.append(o)
+
def __call__(self):
diff --git a/planetstack/observer/syncstep.py b/planetstack/observer/syncstep.py
index b206106..f3eb4ba 100644
--- a/planetstack/observer/syncstep.py
+++ b/planetstack/observer/syncstep.py
@@ -2,6 +2,9 @@
import base64
from planetstack.config import Config
+class FailedDependency(Exception):
+ pass
+
class SyncStep:
""" A PlanetStack Sync step.
@@ -24,6 +27,7 @@
name -- Name of the step
provides -- PlanetStack models sync'd by this step
"""
+ dependencies = []
try:
self.soft_deadline = int(self.get_prop('soft_deadline_seconds'))
except:
@@ -33,9 +37,26 @@
def fetch_pending(self):
return Sliver.objects.filter(ip=None)
+
+ def check_dependencies(self, obj):
+ for dep in dependencies:
+ peer_object = getattr(obj, dep.name.lowercase())
+ if (peer_object.pk==dep.pk):
+ raise DependencyFailed
- def call(self):
- return True
+ def call(self, failed=failed_objects):
+ pending = self.fetch_pending()
+ failed = []
+ for o in pending:
+ if (not self.depends_on(o, failed)):
+ try:
+ check_dependencies(o) # Raises exception if failed
+ self.sync_record(o)
+ o.enacted = datetime.now() # Is this the same timezone? XXX
+ o.save(update_fields=['enacted'])
+ except:
+ failed.append(o)
+ return failed
def __call__(self):
return self.call()