Merge branch 'master' of ssh://git.planet-lab.org/git/plstackapi
diff --git a/planetstack/ec2_observer/event_loop.py b/planetstack/ec2_observer/event_loop.py
index a579a94..12e88ab 100644
--- a/planetstack/ec2_observer/event_loop.py
+++ b/planetstack/ec2_observer/event_loop.py
@@ -16,21 +16,21 @@
 from util.logger import Logger, logging, logger
 #from timeout import timeout
 from planetstack.config import Config
-from ec2_observer.steps import *
+from observer.steps import *
 from syncstep import SyncStep
 from toposort import toposort
-from ec2_observer.error_mapper import *
+from observer.error_mapper import *
 
 debug_mode = False
 
 logger = Logger(level=logging.INFO)
 
 class StepNotReady(Exception):
-    pass
+	pass
 
 class NoOpDriver:
-    def __init__(self):
-         self.enabled = True
+	def __init__(self):
+		 self.enabled = True
 
 class PlanetStackObserver:
 	#sync_steps = [SyncNetworks,SyncNetworkSlivers,SyncSites,SyncSitePrivileges,SyncSlices,SyncSliceMemberships,SyncSlivers,SyncSliverIps,SyncExternalRoutes,SyncUsers,SyncRoles,SyncNodes,SyncImages,GarbageCollector]
@@ -98,7 +98,7 @@
 			# This contains dependencies between backend records
 			self.backend_dependency_graph = json.loads(open(backend_path).read())
 		except Exception,e:
-			logger.info('Backend dependency graph not loaded: %s'%str(e))
+			logger.info('Backend dependency graph not loaded')
 			# We can work without a backend graph
 			self.backend_dependency_graph = {}
 
@@ -111,7 +111,6 @@
 				except KeyError:
 					provides_dict[m.__name__]=[s.__name__]
 
-				
 		step_graph = {}
 		for k,v in self.model_dependency_graph.iteritems():
 			try:
@@ -221,6 +220,65 @@
 			if (failed_step in step.dependencies):
 				raise StepNotReady
 
+	def sync(self, ordered_steps, error_mapper, deletion):
+		# Set of whole steps that failed
+		failed_steps = []
+
+		# Set of individual objects within steps that failed
+		failed_step_objects = set()
+
+		for S in ordered_steps:
+			step = self.step_lookup[S]
+			start_time=time.time()
+			
+			sync_step = step(driver=self.driver,error_map=error_mapper)
+			sync_step.__name__ = step.__name__
+			sync_step.dependencies = []
+			try:
+				mlist = sync_step.provides
+				
+				for m in mlist:
+					sync_step.dependencies.extend(self.model_dependency_graph[m.__name__])
+			except KeyError:
+				pass
+			sync_step.debug_mode = debug_mode
+
+			should_run = False
+			try:
+				# Various checks that decide whether
+				# this step runs or not
+				self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed
+				self.check_schedule(sync_step, deletion) # dont run sync_network_routes if time since last run < 1 hour
+				should_run = True
+			except StepNotReady:
+				logging.info('Step not ready: %s'%sync_step.__name__)
+				failed_steps.append(sync_step)
+			except Exception,e:
+				logging.error('%r',e)
+				logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion))
+				failed_steps.append(sync_step)
+
+			if (should_run):
+				try:
+					duration=time.time() - start_time
+
+					logger.info('Executing step %s' % sync_step.__name__)
+
+					# ********* This is the actual sync step
+					#import pdb
+					#pdb.set_trace()
+					failed_objects = sync_step(failed=list(failed_step_objects), deletion=deletion)
+
+
+					self.check_duration(sync_step, duration)
+					if failed_objects:
+						failed_step_objects.update(failed_objects)
+
+					self.update_run_time(sync_step,deletion)
+				except Exception,e:
+					logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e)
+					logger.log_exc(e)
+					failed_steps.append(S)
 	def run(self):
 		if not self.driver.enabled:
 			return
@@ -238,68 +296,22 @@
 				logger.info('Observer woke up')
 
 				# Two passes. One for sync, the other for deletion.
-				for deletion in (False,True):
+				for deletion in [False,True]:
+					threads = []
 					logger.info('Deletion=%r...'%deletion)
-					# Set of whole steps that failed
-					failed_steps = []
-
-					# Set of individual objects within steps that failed
-					failed_step_objects = set()
-
 					ordered_steps = self.ordered_steps if not deletion else reversed(self.ordered_steps)
+					thread = threading.Thread(target=self.sync, args=(ordered_steps,error_mapper,deletion))
+					logger.info('Deletion=%r...'%deletion)
+					threads.append(thread)
 
-					for S in ordered_steps:
-						step = self.step_lookup[S]
-						start_time=time.time()
-						
-						sync_step = step(driver=self.driver,error_map=error_mapper)
-						sync_step.__name__ = step.__name__
-						sync_step.dependencies = []
-						try:
-							mlist = sync_step.provides
-							
-							for m in mlist:
-								sync_step.dependencies.extend(self.model_dependency_graph[m.__name__])
-						except KeyError:
-							pass
-						sync_step.debug_mode = debug_mode
+					# Start threads 
+					for t in threads:
+						t.start()
 
-						should_run = False
-						try:
-							# Various checks that decide whether
-							# this step runs or not
-							self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed
-							self.check_schedule(sync_step, deletion) # dont run sync_network_routes if time since last run < 1 hour
-							should_run = True
-						except StepNotReady:
-							logging.info('Step not ready: %s'%sync_step.__name__)
-							failed_steps.append(sync_step)
-						except Exception,e:
-							logging.error('%r',e)
-							logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion))
-							failed_steps.append(sync_step)
+					# Wait for all threads to finish before continuing with the run loop
+					for t in threads:
+						t.join()
 
-						if (should_run):
-							try:
-								duration=time.time() - start_time
-
-								logger.info('Executing step %s' % sync_step.__name__)
-
-								# ********* This is the actual sync step
-								#import pdb
-								#pdb.set_trace()
-								failed_objects = sync_step(failed=list(failed_step_objects), deletion=deletion)
-
-
-								self.check_duration(sync_step, duration)
-								if failed_objects:
-									failed_step_objects.update(failed_objects)
-
-								self.update_run_time(sync_step,deletion)
-							except Exception,e:
-								logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e)
-								logger.log_exc(e)
-								failed_steps.append(S)
 				self.save_run_times()
 			except Exception, e:
 				logging.error('Core error. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e)
diff --git a/planetstack/ec2_observer/syncstep.py b/planetstack/ec2_observer/syncstep.py
index d5f7523..31fec04 100644
--- a/planetstack/ec2_observer/syncstep.py
+++ b/planetstack/ec2_observer/syncstep.py
@@ -86,7 +86,8 @@
                 except:
                     o.backend_status = str(e)
 
-                o.save(update_fields=['backend_status'])
+                if (o.pk):
+                    o.save(update_fields=['backend_status'])
 
                 logger.log_exc("sync step failed!")
                 failed.append(o)
diff --git a/planetstack/ec2_observer/toposort.py b/planetstack/ec2_observer/toposort.py
index a2c9389..6d28f4a 100644
--- a/planetstack/ec2_observer/toposort.py
+++ b/planetstack/ec2_observer/toposort.py
@@ -10,6 +10,103 @@
 from datetime import datetime
 from collections import defaultdict
 
+# Assumes that there are no empty dependencies
+# in the graph. E.g. Foo -> []
+def dfs(graph, visit):
+	nodes = graph.keys()
+	edge_nodes = set()
+
+	for n in nodes:
+		edge_nodes|=set(graph[n])
+
+	print 'Edge nodes=',edge_nodes
+	print 'Nodes=',nodes
+
+	sinks = list(edge_nodes - set(nodes))
+	sources = list(set(nodes) - edge_nodes)
+	
+	print 'Sinks =',sinks
+	print 'Sources =',sources
+
+	nodes.extend(sinks)
+
+	for s in sinks:
+		graph[s]=[]
+
+	visited = set(sources)
+	stack = sources
+	while stack:
+		current = stack.pop()
+		visit(current)
+		for node in graph[current]:
+			if node not in visited:
+				stack.append(node)
+				visited.add(node)
+
+
+# Topological sort + find connected components
+# Notes:
+# - Uses a stack instead of recursion
+# - Forfeits optimization involving tracking currently visited nodes
+
+def toposort_with_components(g, steps=None):
+	# Get set of all nodes, including those without outgoing edges
+	keys = set(g.keys())
+	values = set({})
+	for v in g.values():
+		values=values | set(v)
+	
+	all_nodes=list(keys|values)
+	if (not steps):
+		steps = all_nodes
+
+	# Final order
+	order = {}
+
+	# Index of newest component
+	cidx = 0
+
+	# DFS stack, not using recursion
+	stack = []
+
+	# Unmarked set
+	unmarked = all_nodes
+
+	# visiting = [] - skip, don't expect 1000s of nodes, |E|/|V| is small
+
+	while unmarked:
+		stack.insert(0,unmarked[0]) # push first unmarked
+
+		while (stack):
+			n = stack[0]
+			add = True
+			try:
+				for m in g[n]:
+					if (m in unmarked):
+						if (m not in stack):
+							add = False
+							stack.insert(0,m)
+						else:
+							# Should not happen, if so there's a loop
+							print 'Loop at %s'%m
+			except KeyError:
+				pass
+			if (add):
+				if (n in steps):
+					order.append(n)
+				item = stack.pop(0)
+				unmarked.remove(item)
+
+	noorder = list(set(steps) - set(order))
+
+	# Steps that do not have an order get pushed as
+	# separate components that run in parallel.
+	for s in noorder:
+		order['%d'%cidx] = [s]
+		cidx+=1
+
+	return order + noorder
+
 # Topological sort
 # Notes:
 # - Uses a stack instead of recursion
diff --git a/planetstack/openstack_observer/event_loop.py b/planetstack/openstack_observer/event_loop.py
index 9f4658e..12e88ab 100644
--- a/planetstack/openstack_observer/event_loop.py
+++ b/planetstack/openstack_observer/event_loop.py
@@ -111,7 +111,6 @@
 				except KeyError:
 					provides_dict[m.__name__]=[s.__name__]
 
-				
 		step_graph = {}
 		for k,v in self.model_dependency_graph.iteritems():
 			try:
@@ -221,6 +220,65 @@
 			if (failed_step in step.dependencies):
 				raise StepNotReady
 
+	def sync(self, ordered_steps, error_mapper, deletion):
+		# Set of whole steps that failed
+		failed_steps = []
+
+		# Set of individual objects within steps that failed
+		failed_step_objects = set()
+
+		for S in ordered_steps:
+			step = self.step_lookup[S]
+			start_time=time.time()
+			
+			sync_step = step(driver=self.driver,error_map=error_mapper)
+			sync_step.__name__ = step.__name__
+			sync_step.dependencies = []
+			try:
+				mlist = sync_step.provides
+				
+				for m in mlist:
+					sync_step.dependencies.extend(self.model_dependency_graph[m.__name__])
+			except KeyError:
+				pass
+			sync_step.debug_mode = debug_mode
+
+			should_run = False
+			try:
+				# Various checks that decide whether
+				# this step runs or not
+				self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed
+				self.check_schedule(sync_step, deletion) # dont run sync_network_routes if time since last run < 1 hour
+				should_run = True
+			except StepNotReady:
+				logging.info('Step not ready: %s'%sync_step.__name__)
+				failed_steps.append(sync_step)
+			except Exception,e:
+				logging.error('%r',e)
+				logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion))
+				failed_steps.append(sync_step)
+
+			if (should_run):
+				try:
+					duration=time.time() - start_time
+
+					logger.info('Executing step %s' % sync_step.__name__)
+
+					# ********* This is the actual sync step
+					#import pdb
+					#pdb.set_trace()
+					failed_objects = sync_step(failed=list(failed_step_objects), deletion=deletion)
+
+
+					self.check_duration(sync_step, duration)
+					if failed_objects:
+						failed_step_objects.update(failed_objects)
+
+					self.update_run_time(sync_step,deletion)
+				except Exception,e:
+					logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e)
+					logger.log_exc(e)
+					failed_steps.append(S)
 	def run(self):
 		if not self.driver.enabled:
 			return
@@ -238,68 +296,22 @@
 				logger.info('Observer woke up')
 
 				# Two passes. One for sync, the other for deletion.
-				for deletion in (False,True):
+				for deletion in [False,True]:
+					threads = []
 					logger.info('Deletion=%r...'%deletion)
-					# Set of whole steps that failed
-					failed_steps = []
-
-					# Set of individual objects within steps that failed
-					failed_step_objects = set()
-
 					ordered_steps = self.ordered_steps if not deletion else reversed(self.ordered_steps)
+					thread = threading.Thread(target=self.sync, args=(ordered_steps,error_mapper,deletion))
+					logger.info('Deletion=%r...'%deletion)
+					threads.append(thread)
 
-					for S in ordered_steps:
-						step = self.step_lookup[S]
-						start_time=time.time()
-						
-						sync_step = step(driver=self.driver,error_map=error_mapper)
-						sync_step.__name__ = step.__name__
-						sync_step.dependencies = []
-						try:
-							mlist = sync_step.provides
-							
-							for m in mlist:
-								sync_step.dependencies.extend(self.model_dependency_graph[m.__name__])
-						except KeyError:
-							pass
-						sync_step.debug_mode = debug_mode
+					# Start threads 
+					for t in threads:
+						t.start()
 
-						should_run = False
-						try:
-							# Various checks that decide whether
-							# this step runs or not
-							self.check_class_dependency(sync_step, failed_steps) # dont run Slices if Sites failed
-							self.check_schedule(sync_step, deletion) # dont run sync_network_routes if time since last run < 1 hour
-							should_run = True
-						except StepNotReady:
-							logging.info('Step not ready: %s'%sync_step.__name__)
-							failed_steps.append(sync_step)
-						except Exception,e:
-							logging.error('%r',e)
-							logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion))
-							failed_steps.append(sync_step)
+					# Wait for all threads to finish before continuing with the run loop
+					for t in threads:
+						t.join()
 
-						if (should_run):
-							try:
-								duration=time.time() - start_time
-
-								logger.info('Executing step %s' % sync_step.__name__)
-
-								# ********* This is the actual sync step
-								#import pdb
-								#pdb.set_trace()
-								failed_objects = sync_step(failed=list(failed_step_objects), deletion=deletion)
-
-
-								self.check_duration(sync_step, duration)
-								if failed_objects:
-									failed_step_objects.update(failed_objects)
-
-								self.update_run_time(sync_step,deletion)
-							except Exception,e:
-								logging.error('Model step failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e)
-								logger.log_exc(e)
-								failed_steps.append(S)
 				self.save_run_times()
 			except Exception, e:
 				logging.error('Core error. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!',e)
diff --git a/planetstack/openstack_observer/steps/sync_networks.py b/planetstack/openstack_observer/steps/sync_networks.py
index 84257f7..5174fe6 100644
--- a/planetstack/openstack_observer/steps/sync_networks.py
+++ b/planetstack/openstack_observer/steps/sync_networks.py
@@ -5,6 +5,7 @@
 from observer.openstacksyncstep import OpenStackSyncStep
 from core.models.network import *
 from util.logger import Logger, logging
+from observer.steps.sync_network_deployments import *
 
 logger = Logger(level=logging.INFO)
 
@@ -16,10 +17,10 @@
         network.save()
 
     def delete_record(self, network):
-        network_deployment_deleter = NetworkDeploymentDeleter()
+        network_deployment_deleter = SyncNetworkDeployments().delete_record
         for network_deployment in NetworkDeployments.objects.filter(network=network):
             try:
-                network_deployment_deleter(network_deployment.id)    
+                network_deployment_deleter(network_deployment)    
             except Exeption,e:
                 logger.log_exc("Failed to delete network deployment %s" % network_deployment)
                 raise e
diff --git a/planetstack/openstack_observer/steps/sync_sites.py b/planetstack/openstack_observer/steps/sync_sites.py
index e7c645e..c560a6a 100644
--- a/planetstack/openstack_observer/steps/sync_sites.py
+++ b/planetstack/openstack_observer/steps/sync_sites.py
@@ -4,6 +4,7 @@
 from planetstack.config import Config
 from observer.openstacksyncstep import OpenStackSyncStep
 from core.models.site import Site
+from observer.steps.sync_site_deployments import *
 
 class SyncSites(OpenStackSyncStep):
     provides=[Site]
@@ -14,6 +15,6 @@
 
     def delete_record(self, site):
         site_deployments = SiteDeployments.objects.filter(site=site)
-        site_deployment_deleter = SiteDeploymentDeleter()
+        site_deployment_deleter = SyncSiteDeployments().delete_record
         for site_deployment in site_deployments:
-            site_deployment_deleter(site_deployment.id)
+            site_deployment_deleter(site_deployment)
diff --git a/planetstack/openstack_observer/steps/sync_slices.py b/planetstack/openstack_observer/steps/sync_slices.py
index c0b8abe..a6073b6 100644
--- a/planetstack/openstack_observer/steps/sync_slices.py
+++ b/planetstack/openstack_observer/steps/sync_slices.py
@@ -6,6 +6,7 @@
 from observer.openstacksyncstep import OpenStackSyncStep
 from core.models.slice import Slice, SliceDeployments
 from util.logger import Logger, logging
+from observer.steps.sync_slice_deployments import *
 
 logger = Logger(level=logging.INFO)
 
@@ -20,10 +21,10 @@
             slice_deployment.save()    
 
     def delete_record(self, slice):
-        slice_deployment_deleter = SliceDeploymentDeleter()
+        slice_deployment_deleter = SyncSliceDeployments().delete_record
         for slice_deployment in SliceDeployments.objects.filter(slice=slice):
             try:
-                slice_deployment_deleter(slice_deployment.id)
+                slice_deployment_deleter(slice_deployment)
             except Exception,e:
                 logger.log_exc("Failed to delete slice_deployment %s" % slice_deployment) 
                 raise e
diff --git a/planetstack/openstack_observer/steps/sync_sliver_ips.py b/planetstack/openstack_observer/steps/sync_sliver_ips.py
deleted file mode 100644
index d723da5..0000000
--- a/planetstack/openstack_observer/steps/sync_sliver_ips.py
+++ /dev/null
@@ -1,50 +0,0 @@
-import os
-import base64
-from django.db.models import F, Q
-from planetstack.config import Config
-from observer.openstacksyncstep import OpenStackSyncStep
-from core.models.sliver import Sliver
-from util.logger import Logger, logging
-
-logger = Logger(level=logging.INFO)
-
-class SyncSliverIps(OpenStackSyncStep):
-    provides=[Sliver]
-    requested_interval=0
-
-    def fetch_pending(self, deleted):
-        return [] # XXX smbaker - disabling this sync_step, since sliver.ip is obsoleted by sync_network_slivers()
-
-        # Not supported yet
-        if (deleted):
-            return []
-        slivers = Sliver.objects.filter(ip=None)
-        return slivers
-
-    def sync_record(self, sliver):
-        driver = self.driver.client_driver(tenant=sliver.slice.name,
-                                           deployment=sliver.node.deployment.name)
-        servers = driver.shell.nova.servers.findall(id=sliver.instance_id)
-        if not servers:
-            return
-        server = servers[0]
-
-        # First try to grab the dedicated public address
-        # NOTE: "ext-net" is hardcoded here.
-        ip = None
-        ext_net_addrs = server.addresses.get("ext-net")

-        if ext_net_addrs:

-            ip = ext_net_addrs[0]["addr"]

-

-        # If there was no public address, then grab the first address in the

-        # list.

-        if not ip:

-            if server.addresses:

-                addrs = server.addresses.values()[0]

-                if addrs:

-                    ip = addrs[0]["addr"]
-
-        if ip and ip!=sliver.ip:
-            sliver.ip = ip
-            sliver.save()
-            logger.info("saved sliver ip: %s %s" % (sliver, ip))
diff --git a/planetstack/openstack_observer/steps/sync_users.py b/planetstack/openstack_observer/steps/sync_users.py
index 2852b73..a22c213 100644
--- a/planetstack/openstack_observer/steps/sync_users.py
+++ b/planetstack/openstack_observer/steps/sync_users.py
@@ -6,6 +6,7 @@
 from observer.openstacksyncstep import OpenStackSyncStep
 from core.models.user import User
 from core.models.userdeployments import  UserDeployments
+from observer.steps.sync_user_deployments import SyncUserDeployments
 
 class SyncUsers(OpenStackSyncStep):
     provides=[User]
@@ -18,6 +19,6 @@
             user_deployment.save()
 
     def delete_record(self, user):
-        user_deployment_deleter = UserDeploymentDeleter()
+        user_deployment_deleter = SyncUserDeployments().delete_record
         for user_deployment in UserDeployments.objects.filter(user=user):
-            user_deployment_deleter(user_deployment.id)
+            user_deployment_deleter(user_deployment)
diff --git a/planetstack/openstack_observer/syncstep.py b/planetstack/openstack_observer/syncstep.py
index c77c8d5..ad148b5 100644
--- a/planetstack/openstack_observer/syncstep.py
+++ b/planetstack/openstack_observer/syncstep.py
@@ -4,6 +4,7 @@
 from planetstack.config import Config
 from util.logger import Logger, logging
 from observer.steps import *
+from django.db.models import F, Q
 
 logger = Logger(level=logging.INFO)
 
@@ -48,7 +49,7 @@
         # Steps should override it if they have their own logic
         # for figuring out what objects are outstanding.
         main_obj = self.provides[0]
-        if (not deleted):
+        if (not deletion):
             objs = main_obj.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None))
         else:
             objs = main_obj.deleted_objects.all()
@@ -59,10 +60,16 @@
     def check_dependencies(self, obj, failed):
         for dep in self.dependencies:
             peer_name = dep[0].lower() + dep[1:]    # django names are camelCased with the first letter lower
-            peer_object = getattr(obj, peer_name)
+            try:
+                peer_object = getattr(obj, peer_name)
+            except:
+                peer_object = None
+
             if (peer_object and peer_object.pk==failed.pk and type(peer_object)==type(failed)):
-                raise FailedDependency("Failed dependency for %s:%s peer %s:%s failed  %s:%s" % (obj.__class__.__name__, str(obj.pk\

-), peer_object.__class__.__name__, str(peer_object.pk), failed.__class__.__name__, str(failed.pk)))
+                if (obj.backend_status!=peer_object.backend_status):
+                    obj.backend_status = peer_object.backend_status
+                    obj.save(update_fields=['backend_status'])
+                raise FailedDependency("Failed dependency for %s:%s peer %s:%s failed  %s:%s" % (obj.__class__.__name__, str(obj.pk), peer_object.__class__.__name__, str(peer_object.pk), failed.__class__.__name__, str(failed.pk)))
 
     def call(self, failed=[], deletion=False):
         pending = self.fetch_pending(deletion)
@@ -79,14 +86,22 @@
                     o.backend_status = "OK"
                     o.save(update_fields=['enacted'])
             except Exception,e:
-                try:
-                    o.backend_status = self.error_map.map(str(e))
-                except:
-                    o.backend_status = str(e)
-
-                o.save(update_fields=['backend_status'])
-
                 logger.log_exc("sync step failed!")
+                str_e = '%r'%e
+                try:
+                    o.backend_status = self.error_map.map(str_e)
+                except:
+                    o.backend_status = str_e
+
+                # TOFIX:
+                # DatabaseError: value too long for type character varying(140)
+                if (o.pk):
+                    try:
+                        o.save(update_fields=['backend_status'])
+                    except:
+                        print "Could not update backend status field!"
+                        pass
+
                 failed.append(o)
 
         return failed