Sync refactored into abstract steps
diff --git a/planetstack/core/models/plcorebase.py b/planetstack/core/models/plcorebase.py
index 30d4df3..00dc2d0 100644
--- a/planetstack/core/models/plcorebase.py
+++ b/planetstack/core/models/plcorebase.py
@@ -8,7 +8,6 @@
created = models.DateTimeField(auto_now_add=True)
updated = models.DateTimeField(auto_now=True)
- enacted = models.DateTimeField(null=True, default=None)
class Meta:
abstract = True
diff --git a/planetstack/observer/event_loop.py b/planetstack/observer/event_loop.py
new file mode 100644
index 0000000..4b11504
--- /dev/null
+++ b/planetstack/observer/event_loop.py
@@ -0,0 +1,161 @@
+import time
+import traceback
+import commands
+import threading
+import json
+
+from datetime import datetime
+from collections import defaultdict
+from core.models import *
+from django.db.models import F, Q
+from openstack.manager import OpenStackManager
+from util.logger import Logger, logging, logger
+#from timeout import timeout
+
+
+logger = Logger(logfile='observer.log', level=logging.INFO)
+
+def toposort(g, steps):
+ reverse = {}
+
+ for k,v in g.items():
+ for rk in v:
+ try:
+ reverse[rk].append(k)
+ except:
+ reverse[rk]=k
+
+ sources = []
+ for k,v in g.items():
+ if not reverse.has_key(k):
+ sources.append(k)
+
+
+ for k,v in reverse.iteritems():
+ if (not v):
+ sources.append(k)
+
+ order = []
+ marked = []
+ while sources:
+ n = sources.pop()
+ try:
+ for m in g[n]:
+ if m not in marked:
+ sources.append(m)
+ marked.append(m)
+ except KeyError:
+ pass
+ if (n in steps):
+ order.append(n)
+
+ return order
+
+class PlanetStackObserver:
+ sync_steps = ['SyncNetworks','SyncNetworkSlivers','SyncSites','SyncSitePrivileges','SyncSlices','SyncSliceMemberships','SyncSlivers','SyncSliverIps']
+
+ def __init__(self):
+ self.manager = OpenStackManager()
+ # The Condition object that gets signalled by Feefie events
+ self.load_sync_steps()
+ self.event_cond = threading.Condition()
+ self.load_enacted()
+
+ def wait_for_event(self, timeout):
+ self.event_cond.acquire()
+ self.event_cond.wait(timeout)
+ self.event_cond.release()
+
+ def wake_up(self):
+ logger.info('Wake up routine called. Event cond %r'%self.event_cond)
+ self.event_cond.acquire()
+ self.event_cond.notify()
+ self.event_cond.release()
+
+ def load_sync_steps(self):
+ dep_path = Config().pl_dependency_path
+ try:
+ # This contains dependencies between records, not sync steps
+ self.model_dependency_graph = json.loads(open(dep_path).read())
+ except Exception,e:
+ raise e
+
+ backend_path = Config().backend_dependency_path
+ try:
+ # This contains dependencies between backend records
+ self.backend_dependency_graph = json.loads(open(backend_path).read())
+ except Exception,e:
+ raise e
+
+ provides_dict = {}
+ for s in sync_steps:
+ for m in s.provides:
+ provides_dict[m]=s.__name__
+
+ step_graph = {}
+ for k,v in model_dependency_graph.iteritems():
+ try:
+ source = provides_dict[k]
+ for m in v:
+ try:
+ dest = provides_dict[m]
+ except KeyError:
+ # no deps, pass
+ step_graph[source]=dest
+
+ except KeyError:
+ pass
+ # no dependencies, pass
+
+ if (backend_dependency_graph):
+ backend_dict = {}
+ for s in sync_steps:
+ for m in s.serves:
+ backend_dict[m]=s.__name__
+
+ for k,v in backend_dependency_graph.iteritems():
+ try:
+ source = backend_dict[k]
+ for m in v:
+ try:
+ dest = backend_dict[m]
+ except KeyError:
+ # no deps, pass
+ step_graph[source]=dest
+
+ except KeyError:
+ pass
+ # no dependencies, pass
+
+ dependency_graph = step_graph
+
+ self.ordered_steps = toposort(dependency_graph, steps)
+
+
+ def run(self):
+ if not self.manager.enabled or not self.manager.has_openstack:
+ return
+
+
+ while True:
+ try:
+ start_time=time.time()
+
+ logger.info('Waiting for event')
+ tBeforeWait = time.time()
+ self.wait_for_event(timeout=300)
+
+ for S in self.ordered_steps:
+ sync_step = S()
+ sync_step()
+
+ # Enforce 5 minutes between wakeups
+ tSleep = 300 - (time.time() - tBeforeWait)
+ if tSleep > 0:
+ logger.info('Sleeping for %d seconds' % tSleep)
+ time.sleep(tSleep)
+
+ logger.info('Observer woke up')
+ except:
+ logger.log_exc("Exception in observer run loop")
+ traceback.print_exc()
diff --git a/planetstack/observer/openstacksyncstep.py b/planetstack/observer/openstacksyncstep.py
new file mode 100644
index 0000000..7bfe9f4
--- /dev/null
+++ b/planetstack/observer/openstacksyncstep.py
@@ -0,0 +1,27 @@
+import os
+import base64
+from syncstep import SyncStep
+
+class OpenStackSyncStep:
+ """ PlanetStack Sync step for copying data to OpenStack
+ """
+
+ def __init__(self, **args):
+ super(SyncStep,self).__init__(**args)
+ return
+
+ def call(self):
+ pending = self.fetch_pending()
+ failed = []
+ for o in pending:
+ if (not self.depends_on(o, failed)):
+ try:
+ self.sync_record(o)
+ o.enacted = datetime.now() # Is this the same timezone? XXX
+ o.save(update_fields=['enacted'])
+ except:
+ failed.append(o)
+
+
+ def __call__(self):
+ return self.call()
diff --git a/planetstack/observer/steps/__init__.py b/planetstack/observer/steps/__init__.py
new file mode 100644
index 0000000..a239587
--- /dev/null
+++ b/planetstack/observer/steps/__init__.py
@@ -0,0 +1,10 @@
+from .syncexternalroutes import SyncExternalRoutes
+from .syncnetworkslivers import SyncNetworkSlivers
+from .syncnetworks import SyncNetworks
+from .syncsiteprivileges import SyncSitePrivileges
+from .syncsites import SyncSites
+from .syncslicememberships import SyncSliceMemberships
+from .syncslices import SyncSlices
+from .syncsliverips import SyncSliverIps
+from .syncslivers import SyncSlivers
+from .syncusers import SyncUsers
diff --git a/planetstack/observer/steps/sync_external_routes.py b/planetstack/observer/steps/sync_external_routes.py
new file mode 100644
index 0000000..ba4f939
--- /dev/null
+++ b/planetstack/observer/steps/sync_external_routes.py
@@ -0,0 +1,14 @@
+import os
+import base64
+from planetstack.config import Config
+
+class SyncExternalRoutes(SyncStep):
+ # XXX what does this provide?
+ def call(self):
+ routes = self.manager.driver.get_external_routes()
+ subnets = self.manager.driver.shell.quantum.list_subnets()['subnets']
+ for subnet in subnets:
+ try:
+ self.manager.driver.add_external_route(subnet, routes)
+ except:
+ logger.log_exc("failed to add external route for subnet %s" % subnet)
diff --git a/planetstack/observer/steps/sync_network_slivers.py b/planetstack/observer/steps/sync_network_slivers.py
new file mode 100644
index 0000000..414a260
--- /dev/null
+++ b/planetstack/observer/steps/sync_network_slivers.py
@@ -0,0 +1,73 @@
+import os
+import base64
+from planetstack.config import Config
+
+class SyncNetworkSlivers(OpenStackSyncStep):
+ slow=True
+ provides=[NetworkSliver]
+
+ def call(self):
+ networkSlivers = NetworkSliver.objects.all()
+ networkSlivers_by_id = {}
+ networkSlivers_by_port = {}
+ for networkSliver in networkSlivers:
+ networkSlivers_by_id[networkSliver.id] = networkSliver
+ networkSlivers_by_port[networkSliver.port_id] = networkSliver
+
+ networks = Network.objects.all()
+ networks_by_id = {}
+ for network in networks:
+ networks_by_id[network.network_id] = network
+
+ slivers = Sliver.objects.all()
+ slivers_by_instance_id = {}
+ for sliver in slivers:
+ slivers_by_instance_id[sliver.instance_id] = sliver
+
+ ports = self.manager.driver.shell.quantum.list_ports()["ports"]
+ for port in ports:
+ if port["id"] in networkSlivers_by_port:
+ # we already have it
+ print "already accounted for port", port["id"]
+ continue
+
+ if port["device_owner"] != "compute:nova":
+ # we only want the ports that connect to instances
+ continue
+
+ network = networks_by_id.get(port['network_id'], None)
+ if not network:
+ #print "no network for port", port["id"], "network", port["network_id"]
+ continue
+
+ sliver = slivers_by_instance_id.get(port['device_id'], None)
+ if not sliver:
+ print "no sliver for port", port["id"], "device_id", port['device_id']
+ continue
+
+ if network.template.sharedNetworkId is not None:
+ # If it's a shared network template, then more than one network
+ # object maps to the quantum network. We have to do a whole bunch
+ # of extra work to find the right one.
+ networks = network.template.network_set.all()
+ network = None
+ for candidate_network in networks:
+ if (candidate_network.owner == sliver.slice):
+ print "found network", candidate_network
+ network = candidate_network
+
+ if not network:
+ print "failed to find the correct network for a shared template for port", port["id"], "network", port["network_id"]
+ continue
+
+ if not port["fixed_ips"]:
+ print "port", port["id"], "has no fixed_ips"
+ continue
+
+# print "XXX", port
+
+ ns = NetworkSliver(network=network,
+ sliver=sliver,
+ ip=port["fixed_ips"][0]["ip_address"],
+ port_id=port["id"])
+ ns.save()
diff --git a/planetstack/observer/steps/sync_networks.py b/planetstack/observer/steps/sync_networks.py
new file mode 100644
index 0000000..7ae7dc2
--- /dev/null
+++ b/planetstack/observer/steps/sync_networks.py
@@ -0,0 +1,50 @@
+import os
+import base64
+from planetstack.config import Config
+
+class SyncNetworks(OpenStackSyncStep):
+ provides=[Network]
+
+ def save_network(self, network):
+ if not network.network_id:
+ if network.template.sharedNetworkName:
+ network.network_id = network.template.sharedNetworkId
+ (network.subnet_id, network.subnet) = self.driver.get_network_subnet(network.network_id)
+ else:
+ network_name = network.name
+
+ # create network
+ os_network = self.driver.create_network(network_name, shared=True)
+ network.network_id = os_network['id']
+
+ # create router
+ router = self.driver.create_router(network_name)
+ network.router_id = router['id']
+
+ # create subnet
+ next_subnet = self.get_next_subnet()
+ cidr = str(next_subnet.cidr)
+ ip_version = next_subnet.version
+ start = str(next_subnet[2])
+ end = str(next_subnet[-2])
+ subnet = self.driver.create_subnet(name=network_name,
+ network_id = network.network_id,
+ cidr_ip = cidr,
+ ip_version = ip_version,
+ start = start,
+ end = end)
+ network.subnet = cidr
+ network.subnet_id = subnet['id']
+
+ @require_enabled
+ def sync_record(self, site):
+ if network.owner and network.owner.creator:
+ try:
+ # update manager context
+ self.driver.init_caller(network.owner.creator, network.owner.name)
+ self.save_network(network)
+ logger.info("saved network: %s" % (network))
+ except Exception,e:
+ logger.log_exc("save network failed: %s" % network)
+ raise e
+
diff --git a/planetstack/observer/steps/sync_site_privileges.py b/planetstack/observer/steps/sync_site_privileges.py
new file mode 100644
index 0000000..8f8f8ac
--- /dev/null
+++ b/planetstack/observer/steps/sync_site_privileges.py
@@ -0,0 +1,11 @@
+import os
+import base64
+from planetstack.config import Config
+
+class SyncSitePrivileges(OpenStackSyncStep):
+ provides=[SitePrivilege]
+ def sync_record(self, user):
+ if site_priv.user.kuser_id and site_priv.site.tenant_id:
+ self.driver.add_user_role(site_priv.user.kuser_id,
+ site_priv.site.tenant_id,
+ site_priv.role.role_type)
diff --git a/planetstack/observer/steps/sync_sites.py b/planetstack/observer/steps/sync_sites.py
new file mode 100644
index 0000000..5d7cc30
--- /dev/null
+++ b/planetstack/observer/steps/sync_sites.py
@@ -0,0 +1,26 @@
+import os
+import base64
+from planetstack.config import Config
+
+class SyncSites(OpenStackSyncStep):
+ provides=[Site]
+ def sync_record(self, site):
+ save_site = False
+ if not site.tenant_id:
+ tenant = self.driver.create_tenant(tenant_name=site.login_base,
+ description=site.name,
+ enabled=site.enabled)
+ site.tenant_id = tenant.id
+ save_site = True
+ # XXX - What's caller?
+ # self.driver.add_user_role(self.caller.kuser_id, tenant.id, 'admin')
+
+ # update the record
+ if site.id and site.tenant_id:
+ self.driver.update_tenant(site.tenant_id,
+ description=site.name,
+ enabled=site.enabled)
+
+ if (save_site):
+ site.save() #
+
diff --git a/planetstack/observer/steps/sync_slice_memberships.py b/planetstack/observer/steps/sync_slice_memberships.py
new file mode 100644
index 0000000..a0c83eb
--- /dev/null
+++ b/planetstack/observer/steps/sync_slice_memberships.py
@@ -0,0 +1,12 @@
+import os
+import base64
+from planetstack.config import Config
+
+class SyncSliceMemberships(OpenStackSyncStep):
+ provides=[SliceMembership]
+ def sync_record(self, user):
+ if slice_memb.user.kuser_id and slice_memb.slice.tenant_id:
+ self.driver.add_user_role(slice_memb.user.kuser_id,
+ slice_memb.slice.tenant_id,
+ slice_memb.role.role_type)
+
diff --git a/planetstack/observer/steps/sync_slices.py b/planetstack/observer/steps/sync_slices.py
new file mode 100644
index 0000000..736fde6
--- /dev/null
+++ b/planetstack/observer/steps/sync_slices.py
@@ -0,0 +1,55 @@
+import os
+import base64
+from planetstack.config import Config
+
+class SyncSlices(OpenStackSyncStep):
+ provides=[Slice]
+ def sync_record(self, slice):
+ if not slice.tenant_id:
+ nova_fields = {'tenant_name': slice.name,
+ 'description': slice.description,
+ 'enabled': slice.enabled}
+ tenant = self.driver.create_tenant(**nova_fields)
+ slice.tenant_id = tenant.id
+
+ # XXX give caller an admin role at the tenant they've created
+ self.driver.add_user_role(self.caller.kuser_id, tenant.id, 'admin')
+
+ # refresh credentials using this tenant
+ self.driver.shell.connect(username=self.driver.shell.keystone.username,
+ password=self.driver.shell.keystone.password,
+ tenant=tenant.name)
+
+ # create network
+ network = self.driver.create_network(slice.name)
+ slice.network_id = network['id']
+
+ # create router
+ router = self.driver.create_router(slice.name)
+ slice.router_id = router['id']
+
+ # create subnet
+ next_subnet = self.get_next_subnet()
+ cidr = str(next_subnet.cidr)
+ ip_version = next_subnet.version
+ start = str(next_subnet[2])
+ end = str(next_subnet[-2])
+ subnet = self.driver.create_subnet(name=slice.name,
+ network_id = network['id'],
+ cidr_ip = cidr,
+ ip_version = ip_version,
+ start = start,
+ end = end)
+ slice.subnet_id = subnet['id']
+ # add subnet as interface to slice's router
+ self.driver.add_router_interface(router['id'], subnet['id'])
+ # add external route
+ self.driver.add_external_route(subnet)
+
+
+ if slice.id and slice.tenant_id:
+ self.driver.update_tenant(slice.tenant_id,
+ description=slice.description,
+ enabled=slice.enabled)
+
+ slice.save()
diff --git a/planetstack/observer/steps/sync_sliver_ips.py b/planetstack/observer/steps/sync_sliver_ips.py
new file mode 100644
index 0000000..4421ca5
--- /dev/null
+++ b/planetstack/observer/steps/sync_sliver_ips.py
@@ -0,0 +1,22 @@
+import os
+import base64
+from planetstack.config import Config
+
+class SyncSliverIps(OpenStackSyncStep):
+ provides=[Sliver]
+ def fetch_pending(self):
+ slivers = Sliver.objects.filter(ip=None)
+ return slivers
+
+ def sync_record(self, sliver):
+ self.manager.init_admin(tenant=sliver.slice.name)
+ servers = self.manager.driver.shell.nova.servers.findall(id=sliver.instance_id)
+ if not servers:
+ continue
+ server = servers[0]
+ ips = server.addresses.get(sliver.slice.name, [])
+ if not ips:
+ continue
+ sliver.ip = ips[0]['addr']
+ sliver.save()
+ logger.info("saved sliver ip: %s %s" % (sliver, ips[0]))
diff --git a/planetstack/observer/steps/sync_slivers.py b/planetstack/observer/steps/sync_slivers.py
new file mode 100644
index 0000000..a8ef822
--- /dev/null
+++ b/planetstack/observer/steps/sync_slivers.py
@@ -0,0 +1,26 @@
+import os
+import base64
+from planetstack.config import Config
+
+class SyncSlivers(OpenStackSyncStep):
+ provides=[Sliver]
+ def sync_record(self, slice):
+ if not sliver.instance_id:
+ nics = self.get_requested_networks(sliver.slice)
+ file("/tmp/scott-manager","a").write("slice: %s\nreq: %s\n" % (str(sliver.slice.name), str(nics)))
+ slice_memberships = SliceMembership.objects.filter(slice=sliver.slice)
+ pubkeys = [sm.user.public_key for sm in slice_memberships if sm.user.public_key]
+ pubkeys.append(sliver.creator.public_key)
+ instance = self.driver.spawn_instance(name=sliver.name,
+ key_name = sliver.creator.keyname,
+ image_id = sliver.image.image_id,
+ hostname = sliver.node.name,
+ pubkeys = pubkeys,
+ nics = nics )
+ sliver.instance_id = instance.id
+ sliver.instance_name = getattr(instance, 'OS-EXT-SRV-ATTR:instance_name')
+
+ if sliver.instance_id and ("numberCores" in sliver.changed_fields):
+ self.driver.update_instance_metadata(sliver.instance_id, {"cpu_cores": str(sliver.numberCores)})
+
+ sliver.save()
diff --git a/planetstack/observer/steps/sync_users.py b/planetstack/observer/steps/sync_users.py
new file mode 100644
index 0000000..af1bc30
--- /dev/null
+++ b/planetstack/observer/steps/sync_users.py
@@ -0,0 +1,32 @@
+import os
+import base64
+from planetstack.config import Config
+
+class SyncUsers(OpenStackSyncStep):
+ provides=[User]
+ def sync_record(self, user):
+ name = user.email[:user.email.find('@')]
+ user_fields = {'name': name,
+ 'email': user.email,
+ 'password': hashlib.md5(user.password).hexdigest()[:6],
+ 'enabled': True}
+ if not user.kuser_id:
+ keystone_user = self.driver.create_user(**user_fields)
+ user.kuser_id = keystone_user.id
+ else:
+ self.driver.update_user(user.kuser_id, user_fields)
+
+ if user.site:
+ self.driver.add_user_role(user.kuser_id, user.site.tenant_id, 'user')
+ if user.is_admin:
+ self.driver.add_user_role(user.kuser_id, user.site.tenant_id, 'admin')
+ else:
+ # may have admin role so attempt to remove it
+ self.driver.delete_user_role(user.kuser_id, user.site.tenant_id, 'admin')
+
+ if user.public_key:
+ self.init_caller(user, user.site.login_base)
+ self.save_key(user.public_key, user.keyname)
+ self.init_admin()
+
+ user.save()
diff --git a/planetstack/observer/syncstep.py b/planetstack/observer/syncstep.py
new file mode 100644
index 0000000..b206106
--- /dev/null
+++ b/planetstack/observer/syncstep.py
@@ -0,0 +1,41 @@
+import os
+import base64
+from planetstack.config import Config
+
+class SyncStep:
+ """ A PlanetStack Sync step.
+
+ Attributes:
+ psmodel Model name the step synchronizes
+ dependencies list of names of models that must be synchronized first if the current model depends on them
+ """
+ slow=False
+ def get_prop(prop):
+ try:
+ sync_config_dir = Config().sync_config_dir
+ except:
+ sync_config_dir = '/etc/planetstack/sync'
+ prop_config_path = '/'.join(sync_config_dir,self.name,prop)
+ return open(prop_config_path).read().rstrip()
+
+ def __init__(self, **args):
+ """Initialize a sync step
+ Keyword arguments:
+ name -- Name of the step
+ provides -- PlanetStack models sync'd by this step
+ """
+ try:
+ self.soft_deadline = int(self.get_prop('soft_deadline_seconds'))
+ except:
+ self.soft_deadline = 5 # 5 seconds
+
+ return
+
+ def fetch_pending(self):
+ return Sliver.objects.filter(ip=None)
+
+ def call(self):
+ return True
+
+ def __call__(self):
+ return self.call()
diff --git a/planetstack/observer/toposort.py b/planetstack/observer/toposort.py
new file mode 100755
index 0000000..34bf6f5
--- /dev/null
+++ b/planetstack/observer/toposort.py
@@ -0,0 +1,48 @@
+#!/usr/bin/python
+
+import time
+import traceback
+import commands
+import threading
+import json
+
+from datetime import datetime
+from collections import defaultdict
+
+def toposort(g, steps):
+ reverse = {}
+
+ for k,v in g.items():
+ for rk in v:
+ try:
+ reverse[rk].append(k)
+ except:
+ reverse[rk]=k
+
+ sources = []
+ for k,v in g.items():
+ if not reverse.has_key(k):
+ sources.append(k)
+
+
+ for k,v in reverse.iteritems():
+ if (not v):
+ sources.append(k)
+
+ order = []
+ marked = []
+ while sources:
+ n = sources.pop()
+ try:
+ for m in g[n]:
+ if m not in marked:
+ sources.append(m)
+ marked.append(m)
+ except KeyError:
+ pass
+ if (n in steps):
+ order.append(n)
+
+ return order
+
+print toposort({'a':'b','b':'c','c':'d','d':'c'},['d','c','b','a'])
diff --git a/planetstack/openstack/observer.py b/planetstack/openstack/observer.py
deleted file mode 100644
index 977aa86..0000000
--- a/planetstack/openstack/observer.py
+++ /dev/null
@@ -1,479 +0,0 @@
-import time
-import traceback
-import commands
-import threading
-
-from datetime import datetime
-from collections import defaultdict
-from core.models import *
-from django.db.models import F, Q
-from openstack.manager import OpenStackManager
-from util.logger import Logger, logging, logger
-#from timeout import timeout
-
-
-logger = Logger(logfile='observer.log', level=logging.INFO)
-
-class PlanetStackObserver:
-
- def __init__(self):
- self.manager = OpenStackManager()
- # The Condition object that gets signalled by Feefie events
- self.event_cond = threading.Condition()
-
- def wait_for_event(self, timeout):
- self.event_cond.acquire()
- self.event_cond.wait(timeout)
- self.event_cond.release()
-
- def wake_up(self):
- logger.info('Wake up routine called. Event cond %r'%self.event_cond)
- self.event_cond.acquire()
- self.event_cond.notify()
- self.event_cond.release()
-
- def run(self):
- if not self.manager.enabled or not self.manager.has_openstack:
- return
- while True:
- try:
- start_time=time.time()
- logger.info('Observer run loop')
- #self.sync_roles()
-
- logger.info('Calling sync tenants')
- try:
- self.sync_tenants()
- except:
- logger.log_exc("Exception in sync_tenants")
- traceback.print_exc()
- finish_time = time.time()
- logger.info('Sync tenants took %f seconds'%(finish_time-start_time))
-
- logger.info('Calling sync users')
- try:
- self.sync_users()
- except:
- logger.log_exc("Exception in sync_users")
- traceback.print_exc()
- finish_time = time.time()
- logger.info('Sync users took %f seconds'%(finish_time-start_time))
-
- logger.info('Calling sync tenant roles')
- try:
- self.sync_user_tenant_roles()
- except:
- logger.log_exc("Exception in sync_users")
- traceback.print_exc()
-
- logger.info('Calling sync slivers')
- try:
- self.sync_slivers()
- except:
- logger.log_exc("Exception in sync slivers")
- traceback.print_exc()
- finish_time = time.time()
- logger.info('Sync slivers took %f seconds'%(finish_time-start_time))
-
- logger.info('Calling sync sliver ips')
- try:
- self.sync_sliver_ips()
- except:
- logger.log_exc("Exception in sync_sliver_ips")
- traceback.print_exc()
- finish_time = time.time()
- logger.info('Sync sliver ips took %f seconds'%(finish_time-start_time))
-
- logger.info('Calling sync networks')
- try:
- self.sync_networks()
- except:
- logger.log_exc("Exception in sync_networks")
- traceback.print_exc()
- finish_time = time.time()
- logger.info('Sync networks took %f seconds'%(finish_time-start_time))
-
- logger.info('Calling sync network slivers')
- try:
- self.sync_network_slivers()
- except:
- logger.log_exc("Exception in sync_network_slivers")
- traceback.print_exc()
- finish_time = time.time()
- logger.info('Sync network sliver ips took %f seconds'%(finish_time-start_time))
-
- logger.info('Calling sync external routes')
- try:
- self.sync_external_routes()
- except:
- logger.log_exc("Exception in sync_external_routes")
- traceback.print_exc()
- finish_time = time.time()
- logger.info('Sync external routes took %f seconds'%(finish_time-start_time))
-
- logger.info('Waiting for event')
- tBeforeWait = time.time()
- self.wait_for_event(timeout=300)
-
- # Enforce 5 minutes between wakeups
- tSleep = 300 - (time.time() - tBeforeWait)
- if tSleep > 0:
- logger.info('Sleeping for %d seconds' % tSleep)
- time.sleep(tSleep)
-
- logger.info('Observer woken up')
- except:
- logger.log_exc("Exception in observer run loop")
- traceback.print_exc()
-
- def sync_roles(self):
- """
- save all role that don't already exist in keystone. Remove keystone roles that
- don't exist in planetstack
- """
- # sync all roles that don't already in keystone
- keystone_roles = self.manager.driver.shell.keystone.roles.findall()
- keystone_role_names = [kr.name for kr in keystone_roles]
- pending_roles = Role.objects.all()
- pending_role_names = [r.role_type for r in pending_roles]
- for role in pending_roles:
- if role.role_type not in keystone_role_names:
- try:
- self.manager.save_role(role)
- logger.info("save role: %s" % (role))
- except:
- logger.log_exc("save role failed: %s" % role)
- traceback.print_exc()
-
- # don't delete roles for now
- """
- # delete keystone roles that don't exist in planetstack
- for keystone_role in keystone_roles:
- if keystone_role.name == 'admin':
- continue
- if keystone_role.name not in pending_role_names:
- try:
- self.manager.driver.delete_role({id: keystone_role.id})
- except:
- traceback.print_exc()
- """
-
- def sync_tenants(self):
- """
- Save all sites and sliceswhere enacted < updated or enacted == None.
- Remove sites and slices that no don't exist in openstack db if they
- have an enacted time (enacted != None).
- """
- # get all sites that need to be synced (enacted < updated or enacted is None)
- pending_sites = Site.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None))
- for site in pending_sites:
- try:
- self.manager.save_site(site)
- logger.info("saved site %s" % site)
- except:
- logger.log_exc("save site failed: %s" % site)
-
- # get all slices that need to be synced (enacted < updated or enacted is None)
- pending_slices = Slice.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None))
- for slice in pending_slices:
- try:
- self.manager.init_caller(slice.creator, slice.creator.site.login_base)
- self.manager.save_slice(slice)
- logger.info("saved slice %s" % slice)
- except:
- logger.log_exc("save slice failed: %s" % slice)
-
- # get all sites that where enacted != null. We can assume these sites
- # have previously been synced and need to be checed for deletion.
- sites = Site.objects.filter(enacted__isnull=False)
- site_dict = {}
- for site in sites:
- site_dict[site.login_base] = site
-
- # get all slices that where enacted != null. We can assume these slices
- # have previously been synced and need to be checed for deletion.
- slices = Slice.objects.filter(enacted__isnull=False)
- slice_dict = {}
- for slice in slices:
- slice_dict[slice.name] = slice
-
- # delete keystone tenants that don't have a site record
- tenants = self.manager.driver.shell.keystone.tenants.findall()
- system_tenants = ['admin','service']
- for tenant in tenants:
- if tenant.name in system_tenants:
- continue
- if tenant.name not in site_dict and tenant.name not in slice_dict:
- try:
- self.manager.driver.delete_tenant(tenant.id)
- logger.info("deleted tenant: %s" % (tenant))
- except:
- logger.log_exc("delete tenant failed: %s" % tenant)
-
-
- def sync_users(self):
- """
- save all users where enacted < updated or enacted == None. Remove users that
- no don't exist in openstack db if they have an enacted time (enacted != None).
- """
- # get all users that need to be synced (enacted < updated or enacted is None)
- pending_users = User.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None))
- for user in pending_users:
- try:
- self.manager.save_user(user)
- logger.info("saved user: %s" % (user))
- except:
- logger.log_exc("save user failed: %s" %user)
-
- # get all users that where enacted != null. We can assume these users
- # have previously been synced and need to be checed for deletion.
- users = User.objects.filter(enacted__isnull=False)
- user_dict = {}
- for user in users:
- user_dict[user.kuser_id] = user
-
- # delete keystone users that don't have a user record
- system_users = ['admin', 'nova', 'quantum', 'glance', 'cinder', 'swift', 'service']
- users = self.manager.driver.shell.keystone.users.findall()
- for user in users:
- if user.name in system_users:
- continue
- if user.id not in user_dict:
- try:
- #self.manager.driver.delete_user(user.id)
- logger.info("deleted user: %s" % user)
- except:
- logger.log_exc("delete user failed: %s" % user)
-
-
- def sync_user_tenant_roles(self):
- """
- Save all site privileges and slice memberships wheree enacted < updated or
- enacted == None. Remove ones that don't exist in openstack db if they have
- an enacted time (enacted != None).
- """
- # sync site privileges
- pending_site_privileges = SitePrivilege.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None))
- for site_priv in pending_site_privileges:
- try:
- self.manager.save_site_privilege(site_priv)
- logger.info("saved site privilege: %s" % (site_priv))
- except: logger.log_exc("save site privilege failed: %s " % site_priv)
-
- # sync slice memberships
- pending_slice_memberships = SliceMembership.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None))
- for slice_memb in pending_slice_memberships:
- try:
- self.manager.save_slice_membership(slice_memb)
- logger.info("saved slice membership: %s" % (slice_memb))
- except: logger.log_exc("save slice membership failed: %s" % slice_memb)
-
- # get all site privileges and slice memberships that have been enacted
- user_tenant_roles = defaultdict(list)
- for site_priv in SitePrivilege.objects.filter(enacted__isnull=False):
- user_tenant_roles[(site_priv.user.kuser_id, site_priv.site.tenant_id)].append(site_priv.role.role)
- for slice_memb in SliceMembership.objects.filter(enacted__isnull=False):
- user_tenant_roles[(slice_memb.user.kuser_id, slice_memb.slice.tenant_id)].append(slice_memb.role.role)
-
- # Some user tenant role aren't stored in planetstack but they must be preserved.
- # Role that fall in this category are
- # 1. Never remove a user's role that their home site
- # 2. Never remove a user's role at a slice they've created.
- # Keep track of all roles that must be preserved.
- users = User.objects.all()
- preserved_roles = {}
- for user in users:
- tenant_ids = [s['tenant_id'] for s in user.slices.values()]
- tenant_ids.append(user.site.tenant_id)
- preserved_roles[user.kuser_id] = tenant_ids
-
-
- # begin removing user tenant roles from keystone. This is stored in the
- # Metadata table.
- for metadata in self.manager.driver.shell.keystone_db.get_metadata():
- # skip admin roles
- if metadata.user_id == self.manager.driver.admin_user.id:
- continue
- # skip preserved tenant ids
- if metadata.user_id in preserved_roles and \
- metadata.tenant_id in preserved_roles[metadata.user_id]:
- continue
- # get roles for user at this tenant
- user_tenant_role_ids = user_tenant_roles.get((metadata.user_id, metadata.tenant_id), [])
-
- if user_tenant_role_ids:
- # The user has roles at the tenant. Check if roles need to
- # be updated.
- user_keystone_role_ids = metadata.data.get('roles', [])
- for role_id in user_keystone_role_ids:
- if role_id not in user_tenant_role_ids:
- user_keystone_role_ids.pop(user_keystone_role_ids.index(role_id))
- else:
- # The user has no roles at this tenant.
- metadata.data['roles'] = []
- #session.add(metadata)
- logger.info("pruning metadata for %s at %s" % (metadata.user_id, metadata.tenant_id))
-
- def sync_slivers(self):
- """
- save all slivers where enacted < updated or enacted == None. Remove slivers that
- no don't exist in openstack db if they have an enacted time (enacted != None).
- """
- # get all users that need to be synced (enacted < updated or enacted is None)
- pending_slivers = Sliver.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None))
- for sliver in pending_slivers:
- if sliver.creator:
- try:
- # update manager context
- self.manager.init_caller(sliver.creator, sliver.slice.name)
- self.manager.save_sliver(sliver)
- logger.info("saved sliver: %s" % (sliver))
- except:
- logger.log_exc("save sliver failed: %s" % sliver)
-
- # get all slivers where enacted != null. We can assume these users
- # have previously been synced and need to be checed for deletion.
- slivers = Sliver.objects.filter(enacted__isnull=False)
- sliver_dict = {}
- for sliver in slivers:
- sliver_dict[sliver.instance_id] = sliver
-
- # delete sliver that don't have a sliver record
- ctx = self.manager.driver.shell.nova_db.ctx
- instances = self.manager.driver.shell.nova_db.instance_get_all(ctx)
- for instance in instances:
- if instance.uuid not in sliver_dict:
- try:
- # lookup tenant and update context
- try:
- tenant = self.manager.driver.shell.keystone.tenants.find(id=instance.project_id)
- tenant_name = tenant.name
- except:
- tenant_name = None
- logger.info("exception while retrieving tenant %s. Deleting instance using root tenant." % instance.project_id)
- self.manager.init_admin(tenant=tenant_name)
- self.manager.driver.destroy_instance(instance.uuid)
- logger.info("destroyed sliver: %s" % (instance.uuid))
- except:
- logger.log_exc("destroy sliver failed: %s" % instance)
-
-
- def sync_sliver_ips(self):
- # fill in null ip addresses
- slivers = Sliver.objects.filter(ip=None)
- for sliver in slivers:
- # update connection
- self.manager.init_admin(tenant=sliver.slice.name)
- servers = self.manager.driver.shell.nova.servers.findall(id=sliver.instance_id)
- if not servers:
- continue
- server = servers[0]
- ips = server.addresses.get(sliver.slice.name, [])
- if not ips:
- continue
- sliver.ip = ips[0]['addr']
- sliver.save()
- logger.info("saved sliver ip: %s %s" % (sliver, ips[0]))
-
- def sync_external_routes(self):
- routes = self.manager.driver.get_external_routes()
- subnets = self.manager.driver.shell.quantum.list_subnets()['subnets']
- for subnet in subnets:
- try:
- self.manager.driver.add_external_route(subnet, routes)
- except:
- logger.log_exc("failed to add external route for subnet %s" % subnet)
-
- def sync_network_slivers(self):
- networkSlivers = NetworkSliver.objects.all()
- networkSlivers_by_id = {}
- networkSlivers_by_port = {}
- for networkSliver in networkSlivers:
- networkSlivers_by_id[networkSliver.id] = networkSliver
- networkSlivers_by_port[networkSliver.port_id] = networkSliver
-
- networks = Network.objects.all()
- networks_by_id = {}
- for network in networks:
- networks_by_id[network.network_id] = network
-
- slivers = Sliver.objects.all()
- slivers_by_instance_id = {}
- for sliver in slivers:
- slivers_by_instance_id[sliver.instance_id] = sliver
-
- ports = self.manager.driver.shell.quantum.list_ports()["ports"]
- for port in ports:
- if port["id"] in networkSlivers_by_port:
- # we already have it
- print "already accounted for port", port["id"]
- continue
-
- if port["device_owner"] != "compute:nova":
- # we only want the ports that connect to instances
- continue
-
- network = networks_by_id.get(port['network_id'], None)
- if not network:
- #print "no network for port", port["id"], "network", port["network_id"]
- continue
-
- sliver = slivers_by_instance_id.get(port['device_id'], None)
- if not sliver:
- print "no sliver for port", port["id"], "device_id", port['device_id']
- continue
-
- if network.template.sharedNetworkId is not None:
- # If it's a shared network template, then more than one network
- # object maps to the quantum network. We have to do a whole bunch
- # of extra work to find the right one.
- networks = network.template.network_set.all()
- network = None
- for candidate_network in networks:
- if (candidate_network.owner == sliver.slice):
- print "found network", candidate_network
- network = candidate_network
-
- if not network:
- print "failed to find the correct network for a shared template for port", port["id"], "network", port["network_id"]
- continue
-
- if not port["fixed_ips"]:
- print "port", port["id"], "has no fixed_ips"
- continue
-
-# print "XXX", port
-
- ns = NetworkSliver(network=network,
- sliver=sliver,
- ip=port["fixed_ips"][0]["ip_address"],
- port_id=port["id"])
- ns.save()
-
- def sync_networks(self):
- """
- save all networks where enacted < updated or enacted == None. Remove networks that
- no don't exist in openstack db if they have an enacted time (enacted != None).
- """
- # get all users that need to be synced (enacted < updated or enacted is None)
- pending_networks = Network.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None))
- for network in pending_networks:
- if network.owner and network.owner.creator:
- try:
- # update manager context
- self.manager.init_caller(network.owner.creator, network.owner.name)
- self.manager.save_network(network)
- logger.info("saved network: %s" % (network))
- except:
- logger.log_exc("save network failed: %s" % network)
-
- # get all networks where enacted != null. We can assume these users
- # have previously been synced and need to be checed for deletion.
- networks = Network.objects.filter(enacted__isnull=False)
- network_dict = {}
- for network in networks:
- network_dict[network.network_id] = network
-
- # TODO: delete Network objects if quantum network doesn't exist
- # (need to write self.manager.driver.shell.quantum_db)
-