Merge branch 'master' of github.com:open-cloud/xos
diff --git a/xos/configurations/cord-pod/pod-exampleservice.yaml b/xos/configurations/cord-pod/pod-exampleservice.yaml
index 4e4835c..ec45bb8 100644
--- a/xos/configurations/cord-pod/pod-exampleservice.yaml
+++ b/xos/configurations/cord-pod/pod-exampleservice.yaml
@@ -18,7 +18,7 @@
no-delete: true
no-update: true
- service_vrouter:
+ service#vrouter:
type: tosca.nodes.Service
properties:
no-create: true
@@ -40,7 +40,7 @@
node: mysite_exampleservice
relationship: tosca.relationships.ConnectsToSlice
- vrouter_tenant:
- node: service_vrouter
+ node: service#vrouter
relationship: tosca.relationships.TenantOfService
mysite:
@@ -59,10 +59,10 @@
node: management
relationship: tosca.relationships.ConnectsToNetwork
- exampleserver:
- node: service_exampleservice
+ node: service#exampleservice
relationship: tosca.relationships.MemberOfService
- service_exampleservice:
+ service#exampleservice:
type: tosca.nodes.ExampleService
requirements:
- management:
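Note on the renames above: the node templates move from a `service_vrouter` style to `service#vrouter`, which suggests a `type#name` convention where the text after `#` names the referenced object. A minimal sketch of splitting such a name, under that assumption (the helper is hypothetical, not part of this tree):

    # Hypothetical helper; assumes the text after '#' is the object name.
    def tosca_object_name(template_name):
        # "service#vrouter" -> "vrouter"; names without '#' pass through.
        return template_name.split("#", 1)[-1]

    assert tosca_object_name("service#vrouter") == "vrouter"
    assert tosca_object_name("mysite") == "mysite"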
diff --git a/xos/synchronizers/openstack/ansible.py b/xos/synchronizers/openstack/ansible.py
deleted file mode 120000
index c20dc8b..0000000
--- a/xos/synchronizers/openstack/ansible.py
+++ /dev/null
@@ -1 +0,0 @@
-../base/ansible.py
\ No newline at end of file
diff --git a/xos/synchronizers/openstack/backend.py b/xos/synchronizers/openstack/backend.py
deleted file mode 100644
index 5f11d46..0000000
--- a/xos/synchronizers/openstack/backend.py
+++ /dev/null
@@ -1,46 +0,0 @@
-import os
-import sys
-import threading
-import time
-from synchronizers.base.event_loop import XOSObserver
-from synchronizers.base.event_manager import EventListener
-from xos.logger import Logger, logging
-from synchronizers.model_policy import run_policy
-from xos.config import Config
-
-logger = Logger(level=logging.INFO)
-
-class Backend:
-
- def run(self):
- # start the openstack observer
- observer = XOSObserver()
- observer_thread = threading.Thread(target=observer.run)
- observer_thread.start()
-
- # start model policies thread
- observer_name = getattr(Config(), "observer_name", "")
- if (not observer_name):
- model_policy_thread = threading.Thread(target=run_policy)
- model_policy_thread.start()
- else:
- model_policy_thread = None
- print "Skipping model policies thread for service observer."
-
-
- # start event listener
- #event_manager = EventListener(wake_up=observer.wake_up)
- #event_manager_thread = threading.Thread(target=event_manager.run)
- #event_manager_thread.start()
-
- while True:
- try:
- time.sleep(1000)
- except KeyboardInterrupt:
- print "exiting due to keyboard interrupt"
- # TODO: See about setting the threads as daemons
- observer_thread._Thread__stop()
- if model_policy_thread:
- model_policy_thread._Thread__stop()
- sys.exit(1)
-
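The TODO in backend.py above asks about making the worker threads daemons rather than calling the private `_Thread__stop()`. A minimal sketch of that alternative, using only the standard library (an assumption, not the replacement the tree actually adopted):

    import sys
    import threading
    import time

    def run_with_daemon_threads(targets):
        # Daemon threads die with the main thread, so no private
        # _Thread__stop() calls are needed on shutdown.
        for target in targets:
            t = threading.Thread(target=target)
            t.daemon = True
            t.start()
        try:
            while True:
                time.sleep(1000)
        except KeyboardInterrupt:
            print("exiting due to keyboard interrupt")
            sys.exit(1)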
diff --git a/xos/synchronizers/openstack/deleter.py b/xos/synchronizers/openstack/deleter.py
deleted file mode 100644
index 93fa572..0000000
--- a/xos/synchronizers/openstack/deleter.py
+++ /dev/null
@@ -1,16 +0,0 @@
-import os
-import base64
-from xos.config import Config
-
-class Deleter:
- model=None # Must be overridden
-
- def __init__(self, *args, **kwargs):
- pass
-
- def call(self, pk, model_dict):
- # Fetch object from XOS db and delete it
- pass
-
- def __call__(self, *args, **kwargs):
- return self.call(*args, **kwargs)
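Deleter above is a stub intended to be subclassed per model. A hedged sketch of a concrete subclass; the model name and ORM calls are illustrative assumptions, not code from this tree:

    # Hypothetical subclass: fetch the object by primary key and delete it.
    class InstanceDeleter(Deleter):
        model = "Instance"

        def call(self, pk, model_dict):
            from core.models import Instance  # import path assumed
            Instance.objects.get(pk=pk).delete()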
diff --git a/xos/synchronizers/openstack/event_loop.py b/xos/synchronizers/openstack/event_loop.py
deleted file mode 100644
index db78f07..0000000
--- a/xos/synchronizers/openstack/event_loop.py
+++ /dev/null
@@ -1,527 +0,0 @@
-import os
-import imp
-import inspect
-import time
-import sys
-import traceback
-import commands
-import threading
-import json
-import pdb
-import pprint
-
-
-from datetime import datetime
-from collections import defaultdict
-from core.models import *
-from django.db.models import F, Q
-from django.db import connection
-from django.db import reset_queries
-#from openstack.manager import OpenStackManager
-from openstack.driver import OpenStackDriver
-from xos.logger import Logger, logging, logger
-#from timeout import timeout
-from xos.config import Config, XOS_DIR
-from synchronizers.base.steps import *
-from synchronizers.base.syncstep import SyncStep
-from synchronizers.base.toposort import toposort
-from synchronizers.base.error_mapper import *
-from synchronizers.openstack.openstacksyncstep import OpenStackSyncStep
-from synchronizers.base.steps.sync_object import SyncObject
-
-# Load app models
-
-try:
- app_module_names = Config().observer_applist.split(',')
-except AttributeError:
- app_module_names = []
-
-if (type(app_module_names)!=list):
- app_module_names=[app_module_names]
-
-app_modules = []
-
-for m in app_module_names:
- model_path = m+'.models'
- module = __import__(model_path,fromlist=[m])
- app_modules.append(module)
-
-
-debug_mode = False
-
-class bcolors:
- HEADER = '\033[95m'
- OKBLUE = '\033[94m'
- OKGREEN = '\033[92m'
- WARNING = '\033[93m'
- FAIL = '\033[91m'
- ENDC = '\033[0m'
- BOLD = '\033[1m'
- UNDERLINE = '\033[4m'
-
-logger = Logger(level=logging.INFO)
-
-class StepNotReady(Exception):
- pass
-
-class NoOpDriver:
- def __init__(self):
- self.enabled = True
- self.dependency_graph = None
-
-STEP_STATUS_WORKING=1
-STEP_STATUS_OK=2
-STEP_STATUS_KO=3
-
-def invert_graph(g):
- ig = {}
- for k,v in g.items():
- for v0 in v:
- try:
- ig[v0].append(k)
- except:
- ig[v0]=[k]
- return ig
-
-class XOSObserver:
- sync_steps = []
-
-
- def __init__(self):
- # The Condition object that gets signalled by Feefie events
- self.step_lookup = {}
- self.load_sync_step_modules()
- self.load_sync_steps()
- self.event_cond = threading.Condition()
-
- self.driver_kind = getattr(Config(), "observer_driver", "openstack")
- self.observer_name = getattr(Config(), "observer_name", "")
- if self.driver_kind=="openstack":
- self.driver = OpenStackDriver()
- else:
- self.driver = NoOpDriver()
-
- def consolePrint(self, what):
- if getattr(Config(), "observer_console_print", True):
- print what
-
- def wait_for_event(self, timeout):
- self.event_cond.acquire()
- self.event_cond.wait(timeout)
- self.event_cond.release()
-
- def wake_up(self):
- logger.info('Wake up routine called. Event cond %r'%self.event_cond)
- self.event_cond.acquire()
- self.event_cond.notify()
- self.event_cond.release()
-
- def load_sync_step_modules(self, step_dir=None):
- if step_dir is None:
- if hasattr(Config(), "observer_steps_dir"):
- step_dir = Config().observer_steps_dir
- else:
- step_dir = XOS_DIR + "/synchronizers/openstack/steps"
-
- for fn in os.listdir(step_dir):
- pathname = os.path.join(step_dir,fn)
- if os.path.isfile(pathname) and fn.endswith(".py") and (fn!="__init__.py"):
- module = imp.load_source(fn[:-3],pathname)
- for classname in dir(module):
- c = getattr(module, classname, None)
-
- # make sure 'c' is a descendant of SyncStep and has a
- # provides field (this eliminates the abstract base classes
- # since they don't have a provides)
-
- if inspect.isclass(c) and (issubclass(c, SyncStep) or issubclass(c,OpenStackSyncStep)) and hasattr(c,"provides") and (c not in self.sync_steps):
- self.sync_steps.append(c)
- logger.info('loaded sync steps: %s' % ",".join([x.__name__ for x in self.sync_steps]))
-
- def load_sync_steps(self):
- dep_path = Config().observer_dependency_graph
- logger.info('Loading model dependency graph from %s' % dep_path)
- try:
- # This contains dependencies between records, not sync steps
- self.model_dependency_graph = json.loads(open(dep_path).read())
- for left,lst in self.model_dependency_graph.items():
- new_lst = []
- for k in lst:
- try:
- tup = (k,k.lower())
- new_lst.append(tup)
- deps = self.model_dependency_graph[k]
- except:
- self.model_dependency_graph[k] = []
-
- self.model_dependency_graph[left] = new_lst
- except Exception,e:
- raise e
-
- try:
- backend_path = Config().observer_pl_dependency_graph
- logger.info('Loading backend dependency graph from %s' % backend_path)
- # This contains dependencies between backend records
- self.backend_dependency_graph = json.loads(open(backend_path).read())
- for k,v in self.backend_dependency_graph.items():
- try:
- self.model_dependency_graph[k].extend(v)
- except KeyError:
- self.model_dependency_graph[k] = v
-
- except Exception,e:
- logger.info('Backend dependency graph not loaded')
- # We can work without a backend graph
- self.backend_dependency_graph = {}
-
- provides_dict = {}
- for s in self.sync_steps:
- self.step_lookup[s.__name__] = s
- for m in s.provides:
- try:
- provides_dict[m.__name__].append(s.__name__)
- except KeyError:
- provides_dict[m.__name__]=[s.__name__]
-
- step_graph = {}
- phantom_steps = []
- for k,v in self.model_dependency_graph.items():
- try:
- for source in provides_dict[k]:
- if (not v):
- step_graph[source] = []
-
- for m,_ in v:
- try:
- for dest in provides_dict[m]:
- # no deps, pass
- try:
- if (dest not in step_graph[source]):
- step_graph[source].append(dest)
- except:
- step_graph[source]=[dest]
- except KeyError:
- if (not provides_dict.has_key(m)):
- try:
- step_graph[source]+=['#%s'%m]
- except:
- step_graph[source]=['#%s'%m]
-
- phantom_steps+=['#%s'%m]
- pass
-
- except KeyError:
- pass
- # no dependencies, pass
-
-
- self.dependency_graph = step_graph
- self.deletion_dependency_graph = invert_graph(step_graph)
-
- pp = pprint.PrettyPrinter(indent=4)
- logger.info(pp.pformat(step_graph))
- self.ordered_steps = toposort(self.dependency_graph, phantom_steps+map(lambda s:s.__name__,self.sync_steps))
- self.ordered_steps = [i for i in self.ordered_steps if i!='SyncObject']
-
- logger.info("Order of steps=%s" % self.ordered_steps)
-
- self.load_run_times()
-
-
- def check_duration(self, step, duration):
- try:
- if (duration > step.deadline):
- logger.info('Sync step %s missed deadline, took %.2f seconds'%(step.name,duration))
- except AttributeError:
- # step doesn't have a deadline
- pass
-
- def update_run_time(self, step, deletion):
- if (not deletion):
- self.last_run_times[step.__name__]=time.time()
- else:
- self.last_deletion_run_times[step.__name__]=time.time()
-
-
- def check_schedule(self, step, deletion):
- last_run_times = self.last_run_times if not deletion else self.last_deletion_run_times
-
- time_since_last_run = time.time() - last_run_times.get(step.__name__, 0)
- try:
- if (time_since_last_run < step.requested_interval):
- raise StepNotReady
- except AttributeError:
- logger.info('Step %s does not have requested_interval set'%step.__name__)
- raise StepNotReady
-
- def load_run_times(self):
- try:
- jrun_times = open('/tmp/%sobserver_run_times'%self.observer_name).read()
- self.last_run_times = json.loads(jrun_times)
- except:
- self.last_run_times={}
- for e in self.ordered_steps:
- self.last_run_times[e]=0
- try:
- jrun_times = open('/tmp/%sobserver_deletion_run_times'%self.observer_name).read()
- self.last_deletion_run_times = json.loads(jrun_times)
- except:
- self.last_deletion_run_times={}
- for e in self.ordered_steps:
- self.last_deletion_run_times[e]=0
-
- def lookup_step_class(self,s):
- if ('#' in s):
- return SyncObject
- else:
- step = self.step_lookup[s]
- return step
-
- def lookup_step(self,s):
- if ('#' in s):
- objname = s[1:]
- so = SyncObject()
-
- try:
- obj = globals()[objname]
- except:
- for m in app_modules:
- if (hasattr(m,objname)):
- obj = getattr(m,objname)
-
- so.provides=[obj]
- so.observes=[obj]
- step = so
- else:
- step_class = self.step_lookup[s]
- step = step_class(driver=self.driver,error_map=self.error_mapper)
- return step
-
- def save_run_times(self):
- run_times = json.dumps(self.last_run_times)
- open('/tmp/%sobserver_run_times'%self.observer_name,'w').write(run_times)
-
- deletion_run_times = json.dumps(self.last_deletion_run_times)
- open('/tmp/%sobserver_deletion_run_times'%self.observer_name,'w').write(deletion_run_times)
-
- def check_class_dependency(self, step, failed_steps):
- step.dependencies = []
- for obj in step.provides:
- lst = self.model_dependency_graph.get(obj.__name__, [])
- nlst = map(lambda(a,b):b,lst)
- step.dependencies.extend(nlst)
- for failed_step in failed_steps:
- if (failed_step in step.dependencies):
- raise StepNotReady
-
- def sync(self, S, deletion):
- try:
- step = self.lookup_step_class(S)
- start_time=time.time()
-
- logger.info("Starting to work on step %s, deletion=%s" % (step.__name__, str(deletion)))
-
- dependency_graph = self.dependency_graph if not deletion else self.deletion_dependency_graph
- step_conditions = self.step_conditions  # if not deletion else self.deletion_step_conditions
- step_status = self.step_status  # if not deletion else self.deletion_step_status
-
- # Wait for step dependencies to be met
- try:
- deps = dependency_graph[S]
- has_deps = True
- except KeyError:
- has_deps = False
-
- go = True
-
- failed_dep = None
- if (has_deps):
- for d in deps:
- if d==step.__name__:
- logger.info(" step %s self-wait skipped" % step.__name__)
- go = True
- continue
-
- cond = step_conditions[d]
- cond.acquire()
- while (step_status[d] == STEP_STATUS_WORKING):
- logger.info(" step %s wait on dep %s" % (step.__name__, d))
- cond.wait()
- if step_status[d] == STEP_STATUS_OK:
- go = True
- else:
- go = False
- failed_dep = d
- cond.release()
- if (not go):
- break
- else:
- go = True
-
- if (not go):
- self.consolePrint(bcolors.FAIL + "Step %r skipped on %r" % (step,failed_dep) + bcolors.ENDC)
- # SMBAKER: sync_step was not defined here, so I changed
- # this from 'sync_step' to 'step'. Verify.
- self.failed_steps.append(step)
- my_status = STEP_STATUS_KO
- else:
- sync_step = self.lookup_step(S)
- sync_step.__name__ = step.__name__
- sync_step.dependencies = []
- try:
- mlist = sync_step.provides
-
- try:
- for m in mlist:
- lst = self.model_dependency_graph[m.__name__]
- nlst = map(lambda(a,b):b,lst)
- sync_step.dependencies.extend(nlst)
- except Exception,e:
- raise e
-
- except KeyError:
- pass
- sync_step.debug_mode = debug_mode
-
- should_run = False
- try:
- # Various checks that decide whether
- # this step runs or not
- self.check_class_dependency(sync_step, self.failed_steps) # don't run Slices if Sites failed
- self.check_schedule(sync_step, deletion) # don't run sync_network_routes if time since last run < 1 hour
- should_run = True
- except StepNotReady:
- logger.info('Step not ready: %s'%sync_step.__name__)
- self.failed_steps.append(sync_step)
- my_status = STEP_STATUS_KO
- except Exception,e:
- logger.error('%r' % e)
- logger.log_exc("sync step failed: %r. Deletion: %r"%(sync_step,deletion))
- self.failed_steps.append(sync_step)
- my_status = STEP_STATUS_KO
-
- if (should_run):
- try:
-
-
- logger.info('Executing step %s, deletion=%s' % (sync_step.__name__, deletion))
-
- self.consolePrint(bcolors.OKBLUE + "Executing step %s" % sync_step.__name__ + bcolors.ENDC)
- failed_objects = sync_step(failed=list(self.failed_step_objects), deletion=deletion)
-
- self.check_duration(sync_step, time.time() - start_time)
-
- if failed_objects:
- self.failed_step_objects.update(failed_objects)
-
- logger.info("Step %r succeeded" % sync_step.__name__)
- self.consolePrint(bcolors.OKGREEN + "Step %r succeeded" % sync_step.__name__ + bcolors.ENDC)
- my_status = STEP_STATUS_OK
- self.update_run_time(sync_step,deletion)
- except Exception,e:
- self.consolePrint(bcolors.FAIL + "Model step %r failed" % (sync_step.__name__) + bcolors.ENDC)
- logger.error('Model step %r failed. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!' % (sync_step.__name__, e))
- logger.log_exc("Exception in sync step")
- self.failed_steps.append(S)
- my_status = STEP_STATUS_KO
- else:
- logger.info("Step %r succeeded due to non-run" % step)
- my_status = STEP_STATUS_OK
-
- try:
- my_cond = step_conditions[S]
- my_cond.acquire()
- step_status[S]=my_status
- my_cond.notify_all()
- my_cond.release()
- except KeyError,e:
- logger.info('Step %r is a leaf' % step)
- pass
- finally:
- try:
- reset_queries()
- except:
- # this shouldn't happen, but in case it does, catch it...
- logger.log_exc("exception in reset_queries")
-
- connection.close()
-
- def run(self):
- if not self.driver.enabled:
- return
-
- if (self.driver_kind=="openstack") and (not self.driver.has_openstack):
- return
-
- while True:
- logger.info('Waiting for event')
- self.wait_for_event(timeout=5)
- logger.info('Observer woke up')
-
- self.run_once()
-
- def run_once(self):
- try:
- loop_start = time.time()
- error_map_file = getattr(Config(), "error_map_path", XOS_DIR + "/error_map.txt")
- self.error_mapper = ErrorMapper(error_map_file)
-
- # Two passes. One for sync, the other for deletion.
- for deletion in [False,True]:
- # Set of individual objects within steps that failed
- self.failed_step_objects = set()
-
- # Set up conditions and step status
- # This is needed for steps to run in parallel
- # while obeying dependencies.
-
- providers = set()
- dependency_graph = self.dependency_graph if not deletion else self.deletion_dependency_graph
-
- for v in dependency_graph.values():
- if (v):
- providers.update(v)
-
- self.step_conditions = {}
- self.step_status = {}
-
- for p in list(providers):
- self.step_conditions[p] = threading.Condition()
-
- self.step_status[p] = STEP_STATUS_WORKING
-
- self.failed_steps = []
-
- threads = []
- logger.info('Deletion=%r...'%deletion)
- schedule = self.ordered_steps if not deletion else reversed(self.ordered_steps)
-
- for S in schedule:
- thread = threading.Thread(target=self.sync, args=(S, deletion))
-
- logger.info('Deletion=%r...'%deletion)
- threads.append(thread)
-
- # Start threads
- for t in threads:
- t.start()
-
- # another spot to clean up debug state
- try:
- reset_queries()
- except:
- # this shouldn't happen, but in case it does, catch it...
- logger.log_exc("exception in reset_queries")
-
- # Wait for all threads to finish before continuing with the run loop
- for t in threads:
- t.join()
-
- self.save_run_times()
-
- loop_end = time.time()
- open('/tmp/%sobserver_last_run'%self.observer_name,'w').write(json.dumps({'last_run': loop_end, 'last_duration':loop_end - loop_start}))
- except Exception, e:
- logger.error('Core error. This seems like a misconfiguration or bug: %r. This error will not be relayed to the user!' % e)
- logger.log_exc("Exception in observer run loop")
- traceback.print_exc()
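The core of event_loop.py is the per-step handshake in sync(): every step that provides a model gets a threading.Condition and a status (WORKING/OK/KO); dependents acquire the condition, wait while the dependency is still WORKING, then either proceed or record the failed dependency. A stripped-down sketch of that protocol with hypothetical step names (waiting in a loop, which also re-checks status after wakeup):

    import threading

    STEP_STATUS_WORKING, STEP_STATUS_OK, STEP_STATUS_KO = 1, 2, 3

    # step -> steps it depends on (hypothetical graph)
    deps = {"SyncSlices": ["SyncSites"]}
    conditions = dict((s, threading.Condition()) for s in ("SyncSites", "SyncSlices"))
    status = dict((s, STEP_STATUS_WORKING) for s in conditions)

    def wait_for_deps(step):
        # Mirrors the acquire/wait/release dance in sync(): block until every
        # dependency has left WORKING, then report whether all of them succeeded.
        for d in deps.get(step, []):
            cond = conditions[d]
            cond.acquire()
            while status[d] == STEP_STATUS_WORKING:
                cond.wait()
            ok = (status[d] == STEP_STATUS_OK)
            cond.release()
            if not ok:
                return False
        return True

    def finish(step, new_status):
        # Publish the step's outcome and wake all waiting dependents.
        cond = conditions[step]
        cond.acquire()
        status[step] = new_status
        cond.notify_all()
        cond.release()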
diff --git a/xos/synchronizers/openstack/event_manager.py b/xos/synchronizers/openstack/event_manager.py
deleted file mode 100644
index fce2b68..0000000
--- a/xos/synchronizers/openstack/event_manager.py
+++ /dev/null
@@ -1,120 +0,0 @@
-import threading
-import requests, json
-
-from xos.config import Config, XOS_DIR
-
-import uuid
-import os
-import imp
-import inspect
-import base64
-import json
-import traceback
-
-if getattr(Config(),"observer_fofum_disabled", False) != True:
- from fofum import Fofum
- fofum_enabled = True
-else:
- fofum_enabled = False
-
-random_client_id=None
-def get_random_client_id():
- global random_client_id
-
- if (random_client_id is None) and os.path.exists(XOS_DIR + "/random_client_id"):
- # try to use the last one we used, if we saved it
- try:
- random_client_id = open(XOS_DIR+"/random_client_id","r").readline().strip()
- print "get_random_client_id: loaded %s" % random_client_id
- except:
- print "get_random_client_id: failed to read " + XOS_DIR + "/random_client_id"
-
- if random_client_id is None:
- random_client_id = base64.urlsafe_b64encode(os.urandom(12))
- print "get_random_client_id: generated new id %s" % random_client_id
-
- # try to save it for later (XXX: could race with another client here)
- try:
- open(XOS_DIR + "/random_client_id","w").write("%s\n" % random_client_id)
- except:
- print "get_random_client_id: failed to write " + XOS_DIR + "/random_client_id"
-
- return random_client_id
-
- # decorator that marks dispatchable event methods
-def event(func):
- setattr(func, 'event', func.__name__)
- return func
-
-class EventHandler:
- # This code is currently not in use.
- def __init__(self):
- pass
-
- @staticmethod
- def get_events():
- events = []
- for name in dir(EventHandler):
- attribute = getattr(EventHandler, name)
- if hasattr(attribute, 'event'):
- events.append(getattr(attribute, 'event'))
- return events
-
- def dispatch(self, event, *args, **kwds):
- if hasattr(self, event):
- return getattr(self, event)(*args, **kwds)
-
-
-class EventSender:
- def __init__(self,user=None,clientid=None):
- try:
- user = Config().feefie_client_user
- except:
- user = 'pl'
-
- try:
- clid = Config().feefie_client_id
- except:
- clid = get_random_client_id()
- print "EventSender: no feefie_client_id configured. Using random id %s" % clid
-
- if fofum_enabled:
- self.fofum = Fofum(user=user)
- self.fofum.make(clid)
-
- def fire(self,**kwargs):
- kwargs["uuid"] = str(uuid.uuid1())
- if fofum_enabled:
- self.fofum.fire(json.dumps(kwargs))
-
-class EventListener:
- def __init__(self,wake_up=None):
- self.handler = EventHandler()
- self.wake_up = wake_up
-
- def handle_event(self, payload):
- payload_dict = json.loads(payload)
-
- if (self.wake_up):
- self.wake_up()
-
- def run(self):
- # This is our unique client id, to be used when firing and receiving events
- # It needs to be generated once and placed in the config file
-
- try:
- user = Config().feefie_client_user
- except:
- user = 'pl'
-
- try:
- clid = Config().feefie_client_id
- except:
- clid = get_random_client_id()
- print "EventListener: no feefie_client_id configured. Using random id %s" % clid
-
- if fofum_enabled:
- f = Fofum(user=user)
-
- listener_thread = threading.Thread(target=f.listen_for_event,args=(clid,self.handle_event))
- listener_thread.start()
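The @event decorator in event_manager.py only tags a method with its own name; EventHandler.dispatch() then routes an incoming event to the method of the same name. A short usage sketch (the handler class and event name are hypothetical):

    class MyHandler(EventHandler):
        @event
        def instance_created(self, payload):
            print("instance_created: %r" % (payload,))

    MyHandler().dispatch("instance_created", {"pk": 1})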
diff --git a/xos/synchronizers/openstack/run_ansible b/xos/synchronizers/openstack/run_ansible
deleted file mode 100755
index a504ec3..0000000
--- a/xos/synchronizers/openstack/run_ansible
+++ /dev/null
@@ -1,4 +0,0 @@
-#!/bin/bash
-
-source /opt/ansible/hacking/env-setup >> /dev/null
-ansible-playbook -v "$@"
diff --git a/xos/synchronizers/openstack/run_ansible_verbose b/xos/synchronizers/openstack/run_ansible_verbose
deleted file mode 100755
index d17cad7..0000000
--- a/xos/synchronizers/openstack/run_ansible_verbose
+++ /dev/null
@@ -1,4 +0,0 @@
-#!/bin/bash
-
-source /opt/ansible/hacking/env-setup >> /dev/null
-ansible-playbook -vvv "$@"
diff --git a/xos/synchronizers/openstack/syncstep.py b/xos/synchronizers/openstack/syncstep.py
deleted file mode 100644
index 0a01356..0000000
--- a/xos/synchronizers/openstack/syncstep.py
+++ /dev/null
@@ -1,307 +0,0 @@
-import os
-import base64
-from datetime import datetime
-from xos.config import Config
-from xos.logger import Logger, logging
-from synchronizers.base.steps import *
-from django.db.models import F, Q
-from core.models import *
-from django.db import reset_queries
-from synchronizers.base.ansible import *
-from generate.dependency_walker import *
-
-from time import time
-import json
-import time
-import pdb
-
-logger = Logger(level=logging.INFO)
-
-def f7(seq):
- seen = set()
- seen_add = seen.add
- return [ x for x in seq if not (x in seen or seen_add(x))]
-
-def elim_dups(backend_str):
- strs = backend_str.split(' // ')
- strs2 = f7(strs)
- return ' // '.join(strs2)
-
-def deepgetattr(obj, attr):
- return reduce(getattr, attr.split('.'), obj)
-
-
-class InnocuousException(Exception):
- pass
-
-class DeferredException(Exception):
- pass
-
-class FailedDependency(Exception):
- pass
-
-class SyncStep(object):
- """ An XOS Sync step.
-
- Attributes:
- psmodel Model name the step synchronizes
- dependencies list of names of models that must be synchronized first if the current model depends on them
- """
-
- # map_sync_outputs can return this value to cause a step to be marked
- # successful without running ansible. Used for sync_network_controllers
- # on nat networks.
- SYNC_WITHOUT_RUNNING = "sync_without_running"
-
- slow=False
- def get_prop(self, prop):
- try:
- sync_config_dir = Config().sync_config_dir
- except:
- sync_config_dir = '/etc/xos/sync'
- prop_config_path = '/'.join([sync_config_dir, self.name, prop])
- return open(prop_config_path).read().rstrip()
-
- def __init__(self, **args):
- """Initialize a sync step
- Keyword arguments:
- name -- Name of the step
- provides -- XOS models sync'd by this step
- """
- dependencies = []
- self.driver = args.get('driver')
- self.error_map = args.get('error_map')
-
- try:
- self.soft_deadline = int(self.get_prop('soft_deadline_seconds'))
- except:
- self.soft_deadline = 5 # 5 seconds
-
- return
-
- def fetch_pending(self, deletion=False):
- # This is the most common implementation of fetch_pending
- # Steps should override it if they have their own logic
- # for figuring out what objects are outstanding.
-
- main_objs = self.observes
- if (type(main_objs) is not list):
- main_objs=[main_objs]
-
- objs = []
- for main_obj in main_objs:
- if (not deletion):
- lobjs = main_obj.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None),Q(lazy_blocked=False),Q(no_sync=False))
- else:
- lobjs = main_obj.deleted_objects.all()
- objs.extend(lobjs)
-
- return objs
- #return Instance.objects.filter(ip=None)
-
- def check_dependencies(self, obj, failed):
- for dep in self.dependencies:
- peer_name = dep[0].lower() + dep[1:] # django names are camelCased with the first letter lower
-
- peer_objects=[]
- try:
- peer_names = plural(peer_name)
- peer_object_list=[]
-
- try:
- peer_object_list.append(deepgetattr(obj, peer_name))
- except:
- pass
-
- try:
- peer_object_list.append(deepgetattr(obj, peer_names))
- except:
- pass
-
- for peer_object in peer_object_list:
- try:
- peer_objects.extend(peer_object.all())
- except AttributeError:
- peer_objects.append(peer_object)
- except:
- peer_objects = []
-
- if (hasattr(obj,'controller')):
- try:
- peer_objects = filter(lambda o:o.controller==obj.controller, peer_objects)
- except AttributeError:
- pass
-
- if (failed in peer_objects):
- if (obj.backend_status!=failed.backend_status):
- obj.backend_status = failed.backend_status
- obj.save(update_fields=['backend_status'])
- raise FailedDependency("Failed dependency for %s:%s peer %s:%s failed %s:%s" % (obj.__class__.__name__, str(getattr(obj,"pk","no_pk")), peer_object.__class__.__name__, str(getattr(peer_object,"pk","no_pk")), failed.__class__.__name__, str(getattr(failed,"pk","no_pk"))))
-
-
- def sync_record(self, o):
- try:
- controller = o.get_controller()
- controller_register = json.loads(controller.backend_register)
-
- if (controller_register.get('disabled',False)):
- raise InnocuousException('Controller %s is disabled'%controller.name)
- except AttributeError:
- pass
-
- tenant_fields = self.map_sync_inputs(o)
- if tenant_fields == SyncStep.SYNC_WITHOUT_RUNNING:
- return
- main_objs=self.observes
- if (type(main_objs) is list):
- main_objs=main_objs[0]
-
- path = main_objs.__name__.lower()
- res = run_template(self.playbook,tenant_fields,path=path)
-
- try:
- self.map_sync_outputs(o,res)
- except AttributeError:
- pass
-
- def delete_record(self, o):
- try:
- controller = o.get_controller()
- controller_register = json.loads(o.node.site_deployment.controller.backend_register)
-
- if (controller_register.get('disabled',False)):
- raise InnocuousException('Controller %s is disabled'%o.node.site_deployment.controller.name)
- except AttributeError:
- pass
-
- tenant_fields = self.map_delete_inputs(o)
-
- main_objs=self.observes
- if (type(main_objs) is list):
- main_objs=main_objs[0]
-
- path = main_objs.__name__.lower()
-
- tenant_fields['delete']=True
- res = run_template(self.playbook,tenant_fields,path=path)
- try:
- self.map_delete_outputs(o,res)
- except AttributeError:
- pass
-
- def call(self, failed=[], deletion=False):
- #if ('Instance' in self.__class__.__name__):
- # pdb.set_trace()
-
- pending = self.fetch_pending(deletion)
-
- for o in pending:
- # another spot to clean up debug state
- try:
- reset_queries()
- except:
- # this shouldn't happen, but in case it does, catch it...
- logger.log_exc("exception in reset_queries",extra=o.tologdict())
-
- sync_failed = False
- try:
- backoff_disabled = Config().observer_backoff_disabled
- except:
- backoff_disabled = 0
-
- try:
- scratchpad = json.loads(o.backend_register)
- if (scratchpad):
- next_run = scratchpad['next_run']
- if (not backoff_disabled and next_run>time.time()):
- sync_failed = True
- except:
- logger.log_exc("Exception while loading scratchpad",extra=o.tologdict())
- pass
-
- if (not sync_failed):
- try:
- for f in failed:
- self.check_dependencies(o,f) # Raises exception if failed
- if (deletion):
- self.delete_record(o)
- o.delete(purge=True)
- else:
- self.sync_record(o)
- o.enacted = datetime.now() # Is this the same timezone? XXX
- scratchpad = {'next_run':0, 'exponent':0, 'last_success':time.time()}
- o.backend_register = json.dumps(scratchpad)
- o.backend_status = "1 - OK"
- o.save(update_fields=['enacted','backend_status','backend_register'])
- except (InnocuousException,Exception,DeferredException) as e:
- logger.log_exc("sync step failed!",extra=o.tologdict())
- try:
- if (o.backend_status.startswith('2 - ')):
- str_e = '%s // %r'%(o.backend_status[4:],e)
- str_e = elim_dups(str_e)
- else:
- str_e = '%r'%e
- except:
- str_e = '%r'%e
-
- try:
- error = self.error_map.map(str_e)
- except:
- error = '%s'%str_e
-
- if isinstance(e, InnocuousException):
- o.backend_status = '1 - %s'%error
- else:
- o.backend_status = '2 - %s'%error
-
- try:
- scratchpad = json.loads(o.backend_register)
- scratchpad['exponent']
- except:
- logger.log_exc("Exception while updating scratchpad",extra=o.tologdict())
- scratchpad = {'next_run':0, 'exponent':0, 'last_success':time.time(),'failures':0}
-
- # Second failure
- if (scratchpad['exponent']):
- if isinstance(e,DeferredException):
- delay = scratchpad['exponent'] * 60 # 1 minute
- else:
- delay = scratchpad['exponent'] * 600 # 10 minutes
- # cap delays at 8 hours
- if (delay>8*60*60):
- delay=8*60*60
- scratchpad['next_run'] = time.time() + delay
-
- try:
- scratchpad['exponent']+=1
- except:
- scratchpad['exponent']=1
-
- try:
- scratchpad['failures']+=1
- except KeyError:
- scratchpad['failures']=1
-
- scratchpad['last_failure']=time.time()
-
- o.backend_register = json.dumps(scratchpad)
-
- # TOFIX:
- # DatabaseError: value too long for type character varying(140)
- if (o.pk):
- try:
- o.backend_status = o.backend_status[:1024]
- o.save(update_fields=['backend_status','backend_register','updated'])
- except:
- print "Could not update backend status field!"
- pass
- sync_failed = True
-
-
- if (sync_failed):
- failed.append(o)
-
- return failed
-
- def __call__(self, **args):
- return self.call(**args)
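The scratchpad bookkeeping in call() above implements exponential backoff: each consecutive failure bumps 'exponent', and the next run is deferred by exponent * 60 seconds for a DeferredException or exponent * 600 seconds otherwise, capped at 8 hours. The delay calculation in isolation:

    def next_run_delay(exponent, deferred):
        # Deferred errors back off in 1-minute units, hard errors in
        # 10-minute units; both are capped at 8 hours, as in call().
        unit = 60 if deferred else 600
        return min(exponent * unit, 8 * 60 * 60)

    assert next_run_delay(2, deferred=False) == 1200
    assert next_run_delay(1000, deferred=True) == 8 * 60 * 60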
diff --git a/xos/synchronizers/openstack/toposort.py b/xos/synchronizers/openstack/toposort.py
deleted file mode 100644
index 6839861..0000000
--- a/xos/synchronizers/openstack/toposort.py
+++ /dev/null
@@ -1,72 +0,0 @@
-#!/usr/bin/env python
-
-import time
-import traceback
-import commands
-import threading
-import json
-import pdb
-
-from datetime import datetime
-from collections import defaultdict
-
-# Topological sort
-# Notes:
-# - Uses a stack instead of recursion
-# - Forfeits optimization involving tracking currently visited nodes
-def toposort(g, steps=None):
- # Get set of all nodes, including those without outgoing edges
- keys = set(g.keys())
- values = set({})
- for v in g.values():
- values=values | set(v)
-
- all_nodes=list(keys|values)
- if (not steps):
- steps = all_nodes
-
- # Final order
- order = []
-
- # DFS stack, not using recursion
- stack = []
-
- # Unmarked set
- unmarked = all_nodes
-
- # visiting = [] - skip, don't expect 1000s of nodes, |E|/|V| is small
-
- while unmarked:
- stack.insert(0,unmarked[0]) # push first unmarked
-
- while (stack):
- n = stack[0]
- add = True
- try:
- for m in g[n]:
- if (m in unmarked):
- add = False
- stack.insert(0,m)
- except KeyError:
- pass
- if (add):
- if (n in steps and n not in order):
- order.append(n)
- item = stack.pop(0)
- try:
- unmarked.remove(item)
- except ValueError:
- pass
-
- noorder = list(set(steps) - set(order))
- return order + noorder
-
-def main():
- graph_file=open('xos.deps').read()
- g = json.loads(graph_file)
- print toposort(g)
-
-if (__name__=='__main__'):
- main()
-
-#print toposort({'a':'b','b':'c','c':'d','d':'c'},['d','c','b','a'])
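For reference, a small usage example for toposort above: g maps each node to its prerequisites, prerequisites come out first, and requested steps absent from the graph are appended at the end:

    print(toposort({'a': ['b'], 'b': ['c']}))             # ['c', 'b', 'a']
    print(toposort({'a': ['b']}, steps=['a', 'b', 'x']))  # ['b', 'a', 'x']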