tabs, bugfixes
diff --git a/planetstack/observer/event_loop.py b/planetstack/observer/event_loop.py
index 492cd9a..9884390 100644
--- a/planetstack/observer/event_loop.py
+++ b/planetstack/observer/event_loop.py
@@ -20,228 +20,228 @@
logger = Logger(logfile='observer.log', level=logging.INFO)
class StepNotReady(Exception):
- pass
+ pass
def toposort(g, steps):
- reverse = {}
+ reverse = {}
- for k,v in g.items():
- for rk in v:
- try:
- reverse[rk].append(k)
- except:
- reverse[rk]=k
+ for k,v in g.items():
+ for rk in v:
+ try:
+ reverse[rk].append(k)
+ except:
+				reverse[rk]=[k]
- sources = []
- for k,v in g.items():
- if not reverse.has_key(k):
- sources.append(k)
+ sources = []
+ for k,v in g.items():
+ if not reverse.has_key(k):
+ sources.append(k)
- for k,v in reverse.iteritems():
- if (not v):
- sources.append(k)
+ for k,v in reverse.iteritems():
+ if (not v):
+ sources.append(k)
- order = []
- marked = []
+ order = []
+ marked = []
- while sources:
- n = sources.pop()
- try:
- for m in g[n]:
- if m not in marked:
- sources.append(m)
- marked.append(m)
- except KeyError:
- pass
- order.append(n)
- return order
+ while sources:
+ n = sources.pop()
+ try:
+ for m in g[n]:
+ if m not in marked:
+ sources.append(m)
+ marked.append(m)
+ except KeyError:
+ pass
+ order.append(n)
+ return order
class PlanetStackObserver:
- sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps]
+ sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps]
- def __init__(self):
- # The Condition object that gets signalled by Feefie events
- self.load_sync_steps()
- self.event_cond = threading.Condition()
+ def __init__(self):
+ # The Condition object that gets signalled by Feefie events
+ self.load_sync_steps()
+ self.event_cond = threading.Condition()
self.driver = OpenStackDriver()
- def wait_for_event(self, timeout):
- self.event_cond.acquire()
- self.event_cond.wait(timeout)
- self.event_cond.release()
-
- def wake_up(self):
- logger.info('Wake up routine called. Event cond %r'%self.event_cond)
- self.event_cond.acquire()
- self.event_cond.notify()
- self.event_cond.release()
+ def wait_for_event(self, timeout):
+ self.event_cond.acquire()
+ self.event_cond.wait(timeout)
+ self.event_cond.release()
+
+ def wake_up(self):
+ logger.info('Wake up routine called. Event cond %r'%self.event_cond)
+ self.event_cond.acquire()
+ self.event_cond.notify()
+ self.event_cond.release()
- def load_sync_steps(self):
- dep_path = Config().observer_dependency_path
- try:
- # This contains dependencies between records, not sync steps
- self.model_dependency_graph = json.loads(open(dep_path).read())
- except Exception,e:
- raise e
+ def load_sync_steps(self):
+		dep_path = Config().observer_pl_dependency_graph
+ try:
+ # This contains dependencies between records, not sync steps
+ self.model_dependency_graph = json.loads(open(dep_path).read())
+ except Exception,e:
+ raise e
- backend_path = Config().observer_backend_dependency_path
- try:
- # This contains dependencies between backend records
- self.backend_dependency_graph = json.loads(open(backend_path).read())
- except Exception,e:
- # We can work without a backend graph
- self.backend_dependency_graph = {}
+ backend_path = Config().observer_backend_dependency_graph
+ try:
+ # This contains dependencies between backend records
+ self.backend_dependency_graph = json.loads(open(backend_path).read())
+ except Exception,e:
+ # We can work without a backend graph
+ self.backend_dependency_graph = {}
- provides_dict = {}
- for s in self.sync_steps:
- for m in s.provides:
- try:
- provides_dict[m.__name__].append(s.__name__)
- except KeyError:
- provides_dict[m.__name__]=[s.__name__]
+ provides_dict = {}
+ for s in self.sync_steps:
+ for m in s.provides:
+ try:
+ provides_dict[m.__name__].append(s.__name__)
+ except KeyError:
+ provides_dict[m.__name__]=[s.__name__]
-
- step_graph = {}
- for k,v in self.model_dependency_graph.iteritems():
- try:
- for source in provides_dict[k]:
- for m in v:
- try:
- for dest in provides_dict[m]:
- # no deps, pass
- try:
- step_graph[source].append(dest)
- except:
- step_graph[source]=[dest]
- except KeyError:
- pass
-
- except KeyError:
- pass
- # no dependencies, pass
-
- import pdb
- pdb.set_trace()
- if (self.backend_dependency_graph):
- backend_dict = {}
- for s in sync_steps:
- for m in s.serves:
- backend_dict[m]=s.__name__
-
- for k,v in backend_dependency_graph.iteritems():
- try:
- source = backend_dict[k]
- for m in v:
- try:
- dest = backend_dict[m]
- except KeyError:
- # no deps, pass
- pass
- step_graph[source]=dest
-
- except KeyError:
- pass
- # no dependencies, pass
+
+ step_graph = {}
+ for k,v in self.model_dependency_graph.iteritems():
+ try:
+ for source in provides_dict[k]:
+ for m in v:
+ try:
+ for dest in provides_dict[m]:
+ # no deps, pass
+ try:
+ step_graph[source].append(dest)
+ except:
+ step_graph[source]=[dest]
+ except KeyError:
+ pass
+
+ except KeyError:
+ pass
+ # no dependencies, pass
+
+ #import pdb
+ #pdb.set_trace()
+ if (self.backend_dependency_graph):
+ backend_dict = {}
+ for s in self.sync_steps:
+ for m in s.serves:
+ backend_dict[m]=s.__name__
+
+			for k,v in self.backend_dependency_graph.iteritems():
+ try:
+ source = backend_dict[k]
+ for m in v:
+						try:
+							dest = backend_dict[m]
+							step_graph.setdefault(source, []).append(dest)
+						except KeyError:
+							# no deps, pass
+							pass
+
+ except KeyError:
+ pass
+ # no dependencies, pass
- dependency_graph = step_graph
+		self.dependency_graph = dependency_graph = step_graph
- self.ordered_steps = toposort(dependency_graph, self.sync_steps)
- print "Order of steps=",self.ordered_steps
- self.load_run_times()
-
+ self.ordered_steps = toposort(dependency_graph, self.sync_steps)
+ print "Order of steps=",self.ordered_steps
+ self.load_run_times()
+
- def check_duration(self):
- try:
- if (duration > S.deadline):
- logger.info('Sync step %s missed deadline, took %.2f seconds'%(S.name,duration))
- except AttributeError:
- # S doesn't have a deadline
- pass
+	def check_duration(self, S, duration):
+ try:
+ if (duration > S.deadline):
+ logger.info('Sync step %s missed deadline, took %.2f seconds'%(S.name,duration))
+ except AttributeError:
+ # S doesn't have a deadline
+ pass
- def update_run_time(self, step):
- self.last_run_times[step.name]=time.time()
+ def update_run_time(self, step):
+ self.last_run_times[step.name]=time.time()
- def check_schedule(self, step):
- time_since_last_run = time.time() - self.last_run_times[step.name]
- try:
- if (time_since_last_run < step.requested_interval):
- raise StepNotReady
- except AttributeError:
- logger.info('Step %s does not have requested_interval set'%step.name)
- raise StepNotReady
-
- def load_run_times(self):
- try:
- jrun_times = open('/tmp/observer_run_times').read()
- self.last_run_times = json.loads(jrun_times)
- except:
- self.last_run_times={}
- for e in self.ordered_steps:
- self.last_run_times[e.name]=0
+ def check_schedule(self, step):
+ time_since_last_run = time.time() - self.last_run_times[step.name]
+ try:
+ if (time_since_last_run < step.requested_interval):
+ raise StepNotReady
+ except AttributeError:
+ logger.info('Step %s does not have requested_interval set'%step.name)
+ raise StepNotReady
+
+ def load_run_times(self):
+ try:
+ jrun_times = open('/tmp/observer_run_times').read()
+ self.last_run_times = json.loads(jrun_times)
+ except:
+ self.last_run_times={}
+ for e in self.ordered_steps:
+ self.last_run_times[e.name]=0
- def save_run_times(self):
- run_times = json.dumps(self.last_run_times)
- open('/tmp/observer_run_times','w').write(run_times)
+ def save_run_times(self):
+ run_times = json.dumps(self.last_run_times)
+ open('/tmp/observer_run_times','w').write(run_times)
- def check_class_dependency(self, step, failed_steps):
- for failed_step in failed_steps:
- if (failed_step in self.dependency_graph[step.name]):
- raise StepNotReady
+ def check_class_dependency(self, step, failed_steps):
+ for failed_step in failed_steps:
+ if (failed_step in self.dependency_graph[step.name]):
+ raise StepNotReady
- def run(self):
- if not self.driver.enabled or not self.driver.has_openstack:
- return
+ def run(self):
+ if not self.driver.enabled or not self.driver.has_openstack:
+ return
- while True:
- try:
- logger.info('Waiting for event')
- tBeforeWait = time.time()
- self.wait_for_event(timeout=300)
- logger.info('Observer woke up')
+ while True:
+ try:
+ logger.info('Waiting for event')
+ tBeforeWait = time.time()
+ self.wait_for_event(timeout=300)
+ logger.info('Observer woke up')
- # Set of whole steps that failed
- failed_steps = []
+ # Set of whole steps that failed
+ failed_steps = []
- # Set of individual objects within steps that failed
- failed_step_objects = []
+ # Set of individual objects within steps that failed
+ failed_step_objects = []
- for S in self.ordered_steps:
- start_time=time.time()
-
- sync_step = S(driver=self.driver)
- sync_step.dependencies = self.dependencies[sync_step.name]
- sync_step.debug_mode = debug_mode
+ for S in self.ordered_steps:
+ start_time=time.time()
+
+ sync_step = S(driver=self.driver)
+					sync_step.dependencies = self.dependency_graph.get(sync_step.name, [])
+ sync_step.debug_mode = debug_mode
- should_run = False
- try:
- # Various checks that decide whether
- # this step runs or not
- self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed
- self.check_schedule(sync_step) # dont run sync_network_routes if time since last run < 1 hour
- should_run = True
- except StepNotReady:
- logging.info('Step not ready: %s'%sync_step.name)
- failed_steps.add(sync_step)
- except:
- failed_steps.add(sync_step)
+ should_run = False
+ try:
+ # Various checks that decide whether
+ # this step runs or not
+ self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed
+ self.check_schedule(sync_step) # dont run sync_network_routes if time since last run < 1 hour
+ should_run = True
+ except StepNotReady:
+						logger.info('Step not ready: %s'%sync_step.name)
+						failed_steps.append(sync_step)
+ except:
+						failed_steps.append(sync_step)
- if (should_run):
- try:
- duration=time.time() - start_time
+ if (should_run):
+ try:
+ duration=time.time() - start_time
- # ********* This is the actual sync step
- failed_objects = sync_step(failed=failed_step_objects)
+ # ********* This is the actual sync step
+ failed_objects = sync_step(failed=failed_step_objects)
- check_deadline(sync_step, duration)
- failed_step_objects.extend(failed_objects)
- self.update_run_time(sync_step)
- except:
- failed_steps.add(S)
- self.save_run_times()
- except:
- logger.log_exc("Exception in observer run loop")
- traceback.print_exc()
+							self.check_duration(sync_step, duration)
+ failed_step_objects.extend(failed_objects)
+ self.update_run_time(sync_step)
+ except:
+							failed_steps.append(S)
+ self.save_run_times()
+ except:
+ logger.log_exc("Exception in observer run loop")
+ traceback.print_exc()
diff --git a/planetstack/observer/event_manager.py b/planetstack/observer/event_manager.py
index 857452b..de622f5 100644
--- a/planetstack/observer/event_manager.py
+++ b/planetstack/observer/event_manager.py
@@ -10,80 +10,80 @@
from fofum import Fofum
import json
-# decorator that marks dispatachable event methods
+# decorator that marks dispatachable event methods
def event(func):
- setattr(func, 'event', func.__name__)
- return func
+ setattr(func, 'event', func.__name__)
+ return func
class EventHandler:
- # This code is currently not in use.
- def __init__(self):
+ # This code is currently not in use.
+ def __init__(self):
pass
- @staticmethod
- def get_events():
- events = []
- for name in dir(EventHandler):
- attribute = getattr(EventHandler, name)
- if hasattr(attribute, 'event'):
- events.append(getattr(attribute, 'event'))
- return events
+ @staticmethod
+ def get_events():
+ events = []
+ for name in dir(EventHandler):
+ attribute = getattr(EventHandler, name)
+ if hasattr(attribute, 'event'):
+ events.append(getattr(attribute, 'event'))
+ return events
- def dispatch(self, event, *args, **kwds):
- if hasattr(self, event):
- return getattr(self, event)(*args, **kwds)
-
-
+ def dispatch(self, event, *args, **kwds):
+ if hasattr(self, event):
+ return getattr(self, event)(*args, **kwds)
+
+
class EventSender:
- def __init__(self,user=None,clientid=None):
- try:
- clid = Config().feefie_client_id
- user = Config().feefie_client_user
- except:
- clid = 'planetstack_core_team'
- user = 'pl'
+ def __init__(self,user=None,clientid=None):
+ try:
+ clid = Config().feefie_client_id
+ user = Config().feefie_client_user
+ except:
+ clid = 'planetstack_core_team'
+ user = 'pl'
- self.fofum = Fofum(user=user)
- self.fofum.make(clid)
+ self.fofum = Fofum(user=user)
+ self.fofum.make(clid)
- def fire(self,**args):
- self.fofum.fire(json.dumps(args))
+ def fire(self,**args):
+ self.fofum.fire(json.dumps(args))
class EventListener:
- def __init__(self,wake_up=None):
- self.handler = EventHandler()
- self.wake_up = wake_up
+ def __init__(self,wake_up=None):
+ self.handler = EventHandler()
+ self.wake_up = wake_up
- def handle_event(self, payload):
- payload_dict = json.loads(payload)
+ def handle_event(self, payload):
+ payload_dict = json.loads(payload)
- try:
- deletion = payload_dict['deletion_flag']
- if (deletion):
- model = payload_dict['model']
- pk = payload_dict['pk']
+ try:
+ deletion = payload_dict['deletion_flag']
+ if (deletion):
+ model = payload_dict['model']
+ pk = payload_dict['pk']
- for deleter in deleters[model]:
- deleter(pk)
- except:
- deletion = False
+ for deleter in deleters[model]:
+ deleter(pk)
+ except:
+ deletion = False
- if (not deletion and self.wake_up):
- self.wake_up()
-
+ if (not deletion and self.wake_up):
+ self.wake_up()
+
- def run(self):
- # This is our unique client id, to be used when firing and receiving events
- # It needs to be generated once and placed in the config file
+ def run(self):
+ # This is our unique client id, to be used when firing and receiving events
+ # It needs to be generated once and placed in the config file
- try:
- clid = Config().feefie_client_id
- user = Config().feefie_client_user
- except:
- clid = 'planetstack_core_team'
- user = 'pl'
+ try:
+ clid = Config().feefie_client_id
+ user = Config().feefie_client_user
+ except:
+ clid = 'planetstack_core_team'
+ user = 'pl'
- f = Fofum(user=user)
-
- listener_thread = threading.Thread(target=f.listen_for_event,args=(clid,self.handle_event))
- listener_thread.start()
+ f = Fofum(user=user)
+
+ listener_thread = threading.Thread(target=f.listen_for_event,args=(clid,self.handle_event))
+ listener_thread.start()
diff --git a/planetstack/observer/steps/sync_networks.py b/planetstack/observer/steps/sync_networks.py
index f87d241..656ae68 100644
--- a/planetstack/observer/steps/sync_networks.py
+++ b/planetstack/observer/steps/sync_networks.py
@@ -5,54 +5,54 @@
from core.models.network import *
class SyncNetworks(OpenStackSyncStep):
- provides=[Network]
- requested_interval = 0
+ provides=[Network]
+ requested_interval = 0
- def save_network(self, network):
- if not network.network_id:
- if network.template.sharedNetworkName:
- network.network_id = network.template.sharedNetworkId
- (network.subnet_id, network.subnet) = self.driver.get_network_subnet(network.network_id)
- else:
- network_name = network.name
+ def save_network(self, network):
+ if not network.network_id:
+ if network.template.sharedNetworkName:
+ network.network_id = network.template.sharedNetworkId
+ (network.subnet_id, network.subnet) = self.driver.get_network_subnet(network.network_id)
+ else:
+ network_name = network.name
- # create network
- os_network = self.driver.create_network(network_name, shared=True)
- network.network_id = os_network['id']
+ # create network
+ os_network = self.driver.create_network(network_name, shared=True)
+ network.network_id = os_network['id']
- # create router
- router = self.driver.create_router(network_name)
- network.router_id = router['id']
+ # create router
+ router = self.driver.create_router(network_name)
+ network.router_id = router['id']
- # create subnet
- next_subnet = self.get_next_subnet()
- cidr = str(next_subnet.cidr)
- ip_version = next_subnet.version
- start = str(next_subnet[2])
- end = str(next_subnet[-2])
- subnet = self.driver.create_subnet(name=network_name,
- network_id = network.network_id,
- cidr_ip = cidr,
- ip_version = ip_version,
- start = start,
- end = end)
- network.subnet = cidr
- network.subnet_id = subnet['id']
+ # create subnet
+ next_subnet = self.get_next_subnet()
+ cidr = str(next_subnet.cidr)
+ ip_version = next_subnet.version
+ start = str(next_subnet[2])
+ end = str(next_subnet[-2])
+ subnet = self.driver.create_subnet(name=network_name,
+ network_id = network.network_id,
+ cidr_ip = cidr,
+ ip_version = ip_version,
+ start = start,
+ end = end)
+ network.subnet = cidr
+ network.subnet_id = subnet['id']
# add subnet as interface to slice's router
self.driver.add_router_interface(router['id'], subnet['id'])
# add external route
self.driver.add_external_route(subnet)
- def sync_record(self, site):
- if network.owner and network.owner.creator:
- try:
- # update manager context
+	def sync_record(self, network):
+ if network.owner and network.owner.creator:
+ try:
+ # update manager context
real_driver = self.driver
self.driver = self.driver.client_driver(network.owner.creator, network.owner.name)
- self.save_network(network)
+ self.save_network(network)
self.driver = real_driver
- logger.info("saved network: %s" % (network))
- except Exception,e:
- logger.log_exc("save network failed: %s" % network)
- raise e
+ logger.info("saved network: %s" % (network))
+ except Exception,e:
+ logger.log_exc("save network failed: %s" % network)
+ raise e
diff --git a/planetstack/observer/steps/sync_slice_memberships.py b/planetstack/observer/steps/sync_slice_memberships.py
index 66953f1..1ec3a96 100644
--- a/planetstack/observer/steps/sync_slice_memberships.py
+++ b/planetstack/observer/steps/sync_slice_memberships.py
@@ -5,10 +5,10 @@
from core.models.slice import *
class SyncSliceMemberships(OpenStackSyncStep):
- requested_interval=0
- provides=[SliceMembership]
- def sync_record(self, user):
- if slice_memb.user.kuser_id and slice_memb.slice.tenant_id:
- self.driver.add_user_role(slice_memb.user.kuser_id,
- slice_memb.slice.tenant_id,
- slice_memb.role.role_type)
+ requested_interval=0
+ provides=[SliceRole]
+	def sync_record(self, slice_memb):
+ if slice_memb.user.kuser_id and slice_memb.slice.tenant_id:
+ self.driver.add_user_role(slice_memb.user.kuser_id,
+ slice_memb.slice.tenant_id,
+ slice_memb.role.role_type)
diff --git a/planetstack/observer/steps/sync_sliver_ips.py b/planetstack/observer/steps/sync_sliver_ips.py
index d231d13..50ec6ad 100644
--- a/planetstack/observer/steps/sync_sliver_ips.py
+++ b/planetstack/observer/steps/sync_sliver_ips.py
@@ -5,21 +5,21 @@
from core.models.sliver import Sliver
class SyncSliverIps(OpenStackSyncStep):
- provides=[Sliver]
- requested_interval=0
- def fetch_pending(self):
- slivers = Sliver.objects.filter(ip=None)
- return slivers
+ provides=[Sliver]
+ requested_interval=0
+ def fetch_pending(self):
+ slivers = Sliver.objects.filter(ip=None)
+ return slivers
- def sync_record(self, sliver):
+ def sync_record(self, sliver):
driver = self.driver.client_driver(tenant=sliver.slice.name)
- servers = driver.shell.nova.servers.findall(id=sliver.instance_id)
- if not servers:
- return
- server = servers[0]
- ips = server.addresses.get(sliver.slice.name, [])
- if not ips:
- return
- sliver.ip = ips[0]['addr']
- sliver.save()
- logger.info("saved sliver ip: %s %s" % (sliver, ips[0]))
+ servers = driver.shell.nova.servers.findall(id=sliver.instance_id)
+ if not servers:
+ return
+ server = servers[0]
+ ips = server.addresses.get(sliver.slice.name, [])
+ if not ips:
+ return
+ sliver.ip = ips[0]['addr']
+ sliver.save()
+ logger.info("saved sliver ip: %s %s" % (sliver, ips[0]))
diff --git a/planetstack/observer/steps/sync_users.py b/planetstack/observer/steps/sync_users.py
index aa665be..dde8a24 100644
--- a/planetstack/observer/steps/sync_users.py
+++ b/planetstack/observer/steps/sync_users.py
@@ -5,32 +5,32 @@
from core.models.user import User
class SyncUsers(OpenStackSyncStep):
- provides=[User]
- requested_interval=0
- def sync_record(self, user):
- name = user.email[:user.email.find('@')]
- user_fields = {'name': name,
- 'email': user.email,
- 'password': hashlib.md5(user.password).hexdigest()[:6],
- 'enabled': True}
- if not user.kuser_id:
- keystone_user = self.driver.create_user(**user_fields)
- user.kuser_id = keystone_user.id
- else:
- self.driver.update_user(user.kuser_id, user_fields)
+ provides=[User]
+ requested_interval=0
+ def sync_record(self, user):
+ name = user.email[:user.email.find('@')]
+ user_fields = {'name': name,
+ 'email': user.email,
+ 'password': hashlib.md5(user.password).hexdigest()[:6],
+ 'enabled': True}
+ if not user.kuser_id:
+ keystone_user = self.driver.create_user(**user_fields)
+ user.kuser_id = keystone_user.id
+ else:
+ self.driver.update_user(user.kuser_id, user_fields)
- if user.site:
- self.driver.add_user_role(user.kuser_id, user.site.tenant_id, 'user')
- if user.is_admin:
- self.driver.add_user_role(user.kuser_id, user.site.tenant_id, 'admin')
- else:
- # may have admin role so attempt to remove it
- self.driver.delete_user_role(user.kuser_id, user.site.tenant_id, 'admin')
+ if user.site:
+ self.driver.add_user_role(user.kuser_id, user.site.tenant_id, 'user')
+ if user.is_admin:
+ self.driver.add_user_role(user.kuser_id, user.site.tenant_id, 'admin')
+ else:
+ # may have admin role so attempt to remove it
+ self.driver.delete_user_role(user.kuser_id, user.site.tenant_id, 'admin')
- if user.public_key:
+ if user.public_key:
driver = self.driver.client_driver(caller=user, tenant=user.site.login_base)
key_fields = {'name': user.keyname,
'public_key': user.public_key}
driver.create_keypair(**key_fields)
- user.save()
+ user.save()
diff --git a/planetstack/observer/syncstep.py b/planetstack/observer/syncstep.py
index 9f32621..c8d3e42 100644
--- a/planetstack/observer/syncstep.py
+++ b/planetstack/observer/syncstep.py
@@ -3,60 +3,60 @@
from planetstack.config import Config
class FailedDependency(Exception):
- pass
+ pass
class SyncStep:
- """ A PlanetStack Sync step.
+ """ A PlanetStack Sync step.
- Attributes:
- psmodel Model name the step synchronizes
- dependencies list of names of models that must be synchronized first if the current model depends on them
- """
- slow=False
- def get_prop(prop):
- try:
- sync_config_dir = Config().sync_config_dir
- except:
- sync_config_dir = '/etc/planetstack/sync'
- prop_config_path = '/'.join(sync_config_dir,self.name,prop)
- return open(prop_config_path).read().rstrip()
+ Attributes:
+ psmodel Model name the step synchronizes
+ dependencies list of names of models that must be synchronized first if the current model depends on them
+ """
+ slow=False
+ def get_prop(prop):
+ try:
+ sync_config_dir = Config().sync_config_dir
+ except:
+ sync_config_dir = '/etc/planetstack/sync'
+		prop_config_path = '/'.join([sync_config_dir,self.name,prop])
+ return open(prop_config_path).read().rstrip()
- def __init__(self, **args):
- """Initialize a sync step
- Keyword arguments:
- name -- Name of the step
- provides -- PlanetStack models sync'd by this step
- """
- dependencies = []
+ def __init__(self, **args):
+ """Initialize a sync step
+ Keyword arguments:
+ name -- Name of the step
+ provides -- PlanetStack models sync'd by this step
+ """
+ dependencies = []
self.driver = args.get('driver')
- try:
- self.soft_deadline = int(self.get_prop('soft_deadline_seconds'))
- except:
- self.soft_deadline = 5 # 5 seconds
+ try:
+ self.soft_deadline = int(self.get_prop('soft_deadline_seconds'))
+ except:
+ self.soft_deadline = 5 # 5 seconds
- return
+ return
- def fetch_pending(self):
- return Sliver.objects.filter(ip=None)
-
- def check_dependencies(self, obj):
- for dep in self.dependencies:
- peer_object = getattr(obj, dep.name.lowercase())
- if (peer_object.pk==dep.pk):
- raise DependencyFailed
+ def fetch_pending(self):
+ return Sliver.objects.filter(ip=None)
+
+ def check_dependencies(self, obj):
+ for dep in self.dependencies:
+			peer_object = getattr(obj, dep.name.lower())
+ if (peer_object.pk==dep.pk):
+				raise FailedDependency
- def call(self, failed=[]):
- pending = self.fetch_pending()
- for o in pending:
- if (not self.depends_on(o, failed)):
- try:
- check_dependencies(o) # Raises exception if failed
- self.sync_record(o)
- o.enacted = datetime.now() # Is this the same timezone? XXX
- o.save(update_fields=['enacted'])
- except:
- failed.append(o)
- return failed
+ def call(self, failed=[]):
+ pending = self.fetch_pending()
+ for o in pending:
+ if (not self.depends_on(o, failed)):
+ try:
+					self.check_dependencies(o) # Raises exception if failed
+ self.sync_record(o)
+ o.enacted = datetime.now() # Is this the same timezone? XXX
+ o.save(update_fields=['enacted'])
+ except:
+ failed.append(o)
+ return failed
- def __call__(self):
- return self.call()
+	def __call__(self, **args):
+		return self.call(**args)
diff --git a/planetstack/plstackapi_config b/planetstack/plstackapi_config
index deaf2e3..6e0b26c 100644
--- a/planetstack/plstackapi_config
+++ b/planetstack/plstackapi_config
@@ -29,4 +29,4 @@
default_security_group=default
[observer]
-pl_dependency_graph='/opt/planetstack/model-deps'
+pl_dependency_graph=/opt/planetstack/model-deps