CORD-1010 add model_policy support to new_base synchronizer
Change-Id: I0ba137a0f65257709ab09be1a22668e09c5deb22
diff --git a/xos/coreapi/apihelper.py b/xos/coreapi/apihelper.py
index b389f65..3ef547f 100644
--- a/xos/coreapi/apihelper.py
+++ b/xos/coreapi/apihelper.py
@@ -233,6 +233,12 @@
queryset = djangoClass.objects.filter(query)
elif request.kind == request.SYNCHRONIZER_DELETED_OBJECTS:
queryset = djangoClass.deleted_objects.all()
+ elif request.kind == request.SYNCHRONIZER_DIRTY_POLICIES:
+ query = (Q(policed__lt=F('updated')) | Q(policed=None)) & Q(no_policy=False)
+ queryset = djangoClass.objects.filter(query)
+ elif request.kind == request.SYNCHRONIZER_DELETED_POLICIES:
+ query = Q(policed__lt=F('updated')) | Q(policed=None)
+ queryset = djangoClass.deleted_objects.filter(query)
elif request.kind == request.ALL:
queryset = djangoClass.objects.all()
diff --git a/xos/coreapi/protos/common.proto b/xos/coreapi/protos/common.proto
index 0cf84c2..f6b2940 100644
--- a/xos/coreapi/protos/common.proto
+++ b/xos/coreapi/protos/common.proto
@@ -29,6 +29,8 @@
ALL=1;
SYNCHRONIZER_DIRTY_OBJECTS = 2;
SYNCHRONIZER_DELETED_OBJECTS = 3;
+ SYNCHRONIZER_DIRTY_POLICIES = 4;
+ SYNCHRONIZER_DELETED_POLICIES = 5;
}
QueryKind kind = 1;
repeated QueryElement elements = 2;
diff --git a/xos/synchronizers/new_base/apiaccessor.py b/xos/synchronizers/new_base/apiaccessor.py
index c721ae1..9f40162 100644
--- a/xos/synchronizers/new_base/apiaccessor.py
+++ b/xos/synchronizers/new_base/apiaccessor.py
@@ -28,6 +28,20 @@
return objs
+ def fetch_policies(self, main_objs, deletion=False):
+ if (type(main_objs) is not list):
+ main_objs=[main_objs]
+
+ objs = []
+ for main_obj in main_objs:
+ if (not deletion):
+ lobjs = main_obj.objects.filter_special(main_obj.objects.SYNCHRONIZER_DIRTY_POLICIES)
+ else:
+ lobjs = main_obj.objects.filter_special(main_obj.objects.SYNCHRONIZER_DELETED_POLICIES)
+ objs.extend(lobjs)
+
+ return objs
+
def obj_exists(self, o):
# gRPC will default id to '0' for uninitialized objects
return (o.id is not None) and (o.id != 0)
diff --git a/xos/synchronizers/new_base/backend.py b/xos/synchronizers/new_base/backend.py
index 660fd21..9faddca 100644
--- a/xos/synchronizers/new_base/backend.py
+++ b/xos/synchronizers/new_base/backend.py
@@ -72,6 +72,16 @@
watcher_thread = threading.Thread(target=watcher.run,name='watcher')
watcher_thread.start()
+ # start model policies thread
+ policies_dir = getattr(Config(), "observer_model_policies_dir", None)
+ if policies_dir:
+ from synchronizers.new_base.model_policy_loop import run_policy
+ model_policy_thread = threading.Thread(target=run_policy)
+ model_policy_thread.start()
+ else:
+ model_policy_thread = None
+ logger.info("Skipping model policies thread due to no model_policies dir.")
+
while True:
try:
time.sleep(1000)
diff --git a/xos/synchronizers/new_base/dependency_walker_new.py b/xos/synchronizers/new_base/dependency_walker_new.py
new file mode 100644
index 0000000..b5ffff8
--- /dev/null
+++ b/xos/synchronizers/new_base/dependency_walker_new.py
@@ -0,0 +1,97 @@
+#!/usr/bin/env python
+
+# TODO: Moved this into the synchronizer, as it appeared to require model
+# access. Verify whether or not that's true and reconcile with
+# generate/dependency_walker.py
+
+import os
+import imp
+from xos.config import Config, XOS_DIR
+import inspect
+import time
+import traceback
+import commands
+import threading
+import json
+
+from xos.logger import Logger, logging
+logger = Logger(level=logging.INFO)
+
+missing_links={}
+
+try:
+ dep_data = open(Config().dependency_graph).read()
+except:
+ dep_data = open(XOS_DIR + '/model-deps').read()
+
+dependencies = json.loads(dep_data)
+
+inv_dependencies = {}
+for k, lst in dependencies.items():
+ for v in lst:
+ try:
+ inv_dependencies[v].append(k)
+ except KeyError:
+ inv_dependencies[v]=[k]
+
+
+def plural(name):
+ if (name.endswith('s')):
+ return name+'es'
+ else:
+ return name+'s'
+
+
+def walk_deps(fn, object):
+ model = object.__class__.__name__
+ try:
+ deps = dependencies[model]
+ except:
+ deps = []
+ return __walk_deps(fn, object, deps)
+
+def walk_inv_deps(fn, object):
+ model = object.__class__.__name__
+ try:
+ deps = inv_dependencies[model]
+ except:
+ deps = []
+ return __walk_deps(fn, object, deps)
+
+def __walk_deps(fn, object, deps):
+ model = object.__class__.__name__
+ ret = []
+ for dep in deps:
+ #print "Checking dep %s"%dep
+ peer=None
+ link = dep.lower()
+ try:
+ peer = getattr(object, link)
+ except AttributeError:
+ link = plural(link)
+ try:
+ peer = getattr(object, link)
+ except AttributeError:
+ if not missing_links.has_key(model+'.'+link):
+ print "Model %s missing link for dependency %s"%(model, link)
+ logger.log_exc("WARNING: Model %s missing link for dependency %s."%(model, link))
+ missing_links[model+'.'+link]=True
+
+
+ if (peer):
+ try:
+ peer_objects = peer.all()
+ except AttributeError:
+ peer_objects = [peer]
+ except:
+ peer_objects = []
+
+ for o in peer_objects:
+ if (hasattr(o,'updated')):
+ fn(o, object)
+ ret.append(o)
+ # Uncomment the following line to enable recursion
+ # walk_inv_deps(fn, o)
+ return ret
+
+
diff --git a/xos/synchronizers/new_base/djangoaccessor.py b/xos/synchronizers/new_base/djangoaccessor.py
index 6703ffb..aa916c0 100644
--- a/xos/synchronizers/new_base/djangoaccessor.py
+++ b/xos/synchronizers/new_base/djangoaccessor.py
@@ -38,6 +38,20 @@
return objs
+ def fetch_policies(self, main_objs, deletion=False):
+ if (type(main_objs) is not list):
+ main_objs=[main_objs]
+
+ objs = []
+ for main_obj in main_objs:
+ if (not deletion):
+ res = main_obj.objects.filter((Q(policed__lt=F('updated')) | Q(policed=None)) & Q(no_policy=False))
+ else:
+ res = main_obj.deleted_objects.filter(Q(policed__lt=F('updated')) | Q(policed=None))
+ objs.extend(res)
+
+ return objs
+
def reset_queries(self):
reset_queries()
diff --git a/xos/synchronizers/new_base/event_loop.py b/xos/synchronizers/new_base/event_loop.py
index 54d8acb..0c89a3c 100644
--- a/xos/synchronizers/new_base/event_loop.py
+++ b/xos/synchronizers/new_base/event_loop.py
@@ -16,7 +16,7 @@
from xos.logger import Logger, logging, logger
from xos.config import Config, XOS_DIR
from synchronizers.new_base.steps import *
-from syncstep import SyncStep
+from syncstep import SyncStep, NullSyncStep
from toposort import toposort
from synchronizers.new_base.error_mapper import *
from synchronizers.new_base.steps.sync_object import SyncObject
@@ -241,17 +241,24 @@
def lookup_step_class(self, s):
if ('#' in s):
- raise Exception("# is not supported anymore. I hope it wasn't important.")
-
- step = self.step_lookup[s]
+ return NullSyncStep
+ else:
+ step = self.step_lookup[s]
return step
def lookup_step(self, s):
if ('#' in s):
- raise Exception("# is not supported anymore. I hope it wasn't important.")
+ objname = s[1:]
+ so = NullSyncStep()
- step_class = self.step_lookup[s]
- step = step_class(driver=self.driver, error_map=self.error_mapper)
+ obj = model_accessor.get_model_class(objname)
+
+ so.provides = [obj]
+ so.observes = [obj]
+ step = so
+ else:
+ step_class = self.step_lookup[s]
+ step = step_class(driver=self.driver, error_map=self.error_mapper)
return step
def save_run_times(self):
diff --git a/xos/synchronizers/new_base/model_policy_loop.py b/xos/synchronizers/new_base/model_policy_loop.py
new file mode 100644
index 0000000..eee49b1
--- /dev/null
+++ b/xos/synchronizers/new_base/model_policy_loop.py
@@ -0,0 +1,146 @@
+from synchronizers.new_base.modelaccessor import *
+from synchronizers.new_base.dependency_walker_new import *
+from xos.logger import Logger, logging
+
+import pdb
+import time
+import traceback
+
+modelPolicyEnabled = True
+bad_instances=[]
+
+model_policies = {}
+
+logger = Logger(level=logging.DEBUG)
+
+def EnableModelPolicy(x):
+ global modelPolicyEnabled
+ modelPolicyEnabled = x
+
+def update_wp(d, o):
+ try:
+ save_fields = []
+ if (d.write_protect != o.write_protect):
+ d.write_protect = o.write_protect
+ save_fields.append('write_protect')
+ if (save_fields):
+ d.save(update_fields=save_fields)
+ except AttributeError,e:
+ raise e
+
+def update_dep(d, o):
+ try:
+ print 'Trying to update %s'%d
+ save_fields = []
+ if (d.updated < o.updated):
+ save_fields = ['updated']
+
+ if (save_fields):
+ d.save(update_fields=save_fields)
+ except AttributeError,e:
+ logger.log_exc("AttributeError in update_dep")
+ raise e
+ except Exception,e:
+ logger.log_exc("Exception in update_dep")
+
+def delete_if_inactive(d, o):
+ try:
+ d.delete()
+ print "Deleted %s (%s)"%(d,d.__class__.__name__)
+ except:
+ pass
+ return
+
+def load_model_policies(policies_dir=None):
+ global model_policies
+
+ if policies_dir is None:
+ policies_dir = Config().observer_model_policies_dir
+
+ for fn in os.listdir(policies_dir):
+ pathname = os.path.join(policies_dir,fn)
+ if os.path.isfile(pathname) and fn.startswith("model_policy_") and fn.endswith(".py") and (fn!="__init__.py"):
+ model_policies[fn[:-3]] = imp.load_source(fn[:-3],pathname)
+
+ logger.debug("Loaded model polices %s from %s" % (",".join(model_policies.keys()), policies_dir))
+
+def execute_model_policy(instance, deleted):
+ # Automatic dirtying
+ if (instance in bad_instances):
+ return
+
+ # These are the models whose children get deleted when they are
+ delete_policy_models = ['Slice','Instance','Network']
+ sender_name = instance.__class__.__name__
+ policy_name = 'model_policy_%s'%sender_name
+ noargs = False
+
+ if (not deleted):
+ walk_inv_deps(update_dep, instance)
+ walk_deps(update_wp, instance)
+ elif (sender_name in delete_policy_models):
+ walk_inv_deps(delete_if_inactive, instance)
+
+ try:
+ policy_handler = model_policies.get(policy_name, None) # getattr(model_policies, policy_name, None)
+ logger.debug("MODEL POLICY: handler %s %s" % (policy_name, policy_handler))
+ if policy_handler is not None:
+ if (deleted):
+ try:
+ policy_handler.handle_delete(instance)
+ except AttributeError:
+ pass
+ else:
+ policy_handler.handle(instance)
+ logger.debug("MODEL POLICY: completed handler %s %s" % (policy_name, policy_handler))
+ except:
+ logger.log_exc("MODEL POLICY: Exception when running handler")
+
+ try:
+ instance.policed=model_accessor.now()
+ instance.save(update_fields=['policed'])
+ except:
+ logger.log_exc('MODEL POLICY: Object %r is defective'%instance)
+ bad_instances.append(instance)
+
+def noop(o,p):
+ pass
+
+def run_policy():
+ load_model_policies()
+
+ while (True):
+ start = time.time()
+ try:
+ run_policy_once()
+ except:
+ logger.log_exc("MODEL_POLICY: Exception in run_policy()")
+ if (time.time()-start<1):
+ time.sleep(1)
+
+def run_policy_once():
+ # TODO: Core-specific model list is hardcoded here. These models should
+ # be learned from the model_policy files, not hardcoded.
+
+ models = [Controller, Site, SitePrivilege, Image, ControllerSlice, ControllerSite, ControllerUser, User, Slice, Network, Instance, SlicePrivilege]
+
+ logger.debug("MODEL POLICY: run_policy_once()")
+
+ model_accessor.check_db_connection_okay()
+
+ objects = model_accessor.fetch_policies(models, False)
+ deleted_objects = model_accessor.fetch_policies(models, True)
+
+ for o in objects:
+ execute_model_policy(o, o.deleted)
+
+ for o in deleted_objects:
+ execute_model_policy(o, True)
+
+ try:
+ model_accessor.reset_queries()
+ except:
+ # this shouldn't happen, but in case it does, catch it...
+ logger.log_exc("MODEL POLICY: exception in reset_queries")
+
+ logger.debug("MODEL POLICY: finished run_policy_once()")
diff --git a/xos/synchronizers/new_base/modelaccessor.py b/xos/synchronizers/new_base/modelaccessor.py
index a7c69bc..9b9e657 100644
--- a/xos/synchronizers/new_base/modelaccessor.py
+++ b/xos/synchronizers/new_base/modelaccessor.py
@@ -32,7 +32,11 @@
""" Given a class name, return that model class """
return name in self.all_model_classes
- def fetch_pending(self, name, deletion=False):
+ def fetch_pending(self, main_objs, deletion=False):
+ """ Execute the default fetch_pending query """
+ raise Exception("Not Implemented")
+
+ def fetch_policies(self, main_objs, deletion=False):
""" Execute the default fetch_pending query """
raise Exception("Not Implemented")
diff --git a/xos/synchronizers/new_base/syncstep.py b/xos/synchronizers/new_base/syncstep.py
index 56ad412..cf5b40a 100644
--- a/xos/synchronizers/new_base/syncstep.py
+++ b/xos/synchronizers/new_base/syncstep.py
@@ -312,3 +312,13 @@
def __call__(self, **args):
return self.call(**args)
+
+# TODO: What does this do? It seems like it's just going to toss exceptions.
+
+class NullSyncStep(SyncStep): # was SyncObject
+ provides=[] # Caller fills this in
+ requested_interval=0
+ observes=[] # Caller fills this in
+
+ def sync_record(self, r):
+ raise DeferredException('Waiting for Service dependency: %r'%r)
diff --git a/xos/xos_client/xosapi/orm.py b/xos/xos_client/xosapi/orm.py
index f8bcc63..31a20a2 100644
--- a/xos/xos_client/xosapi/orm.py
+++ b/xos/xos_client/xosapi/orm.py
@@ -264,8 +264,10 @@
""" Manages a remote list of objects """
# constants better agree with common.proto
- SYNCHRONIZER_DIRTY_OBJECTS = 2;
- SYNCHRONIZER_DELETED_OBJECTS = 3;
+ SYNCHRONIZER_DIRTY_OBJECTS = 2
+ SYNCHRONIZER_DELETED_OBJECTS = 3
+ SYNCHRONIZER_DIRTY_POLICIES = 4
+ SYNCHRONIZER_DELETED_POLICIES = 5
def __init__(self, stub, modelName, packageName):
self._stub = stub