CORD-1010 add model_policy support to new_base synchronizer

Change-Id: I0ba137a0f65257709ab09be1a22668e09c5deb22
diff --git a/xos/coreapi/apihelper.py b/xos/coreapi/apihelper.py
index b389f65..3ef547f 100644
--- a/xos/coreapi/apihelper.py
+++ b/xos/coreapi/apihelper.py
@@ -233,6 +233,12 @@
             queryset = djangoClass.objects.filter(query)
         elif request.kind == request.SYNCHRONIZER_DELETED_OBJECTS:
             queryset = djangoClass.deleted_objects.all()
+        elif request.kind == request.SYNCHRONIZER_DIRTY_POLICIES:
+            query = (Q(policed__lt=F('updated')) | Q(policed=None)) & Q(no_policy=False)
+            queryset = djangoClass.objects.filter(query)
+        elif request.kind == request.SYNCHRONIZER_DELETED_POLICIES:
+            query = Q(policed__lt=F('updated')) | Q(policed=None)
+            queryset = djangoClass.deleted_objects.filter(query)
         elif request.kind == request.ALL:
             queryset = djangoClass.objects.all()
 
diff --git a/xos/coreapi/protos/common.proto b/xos/coreapi/protos/common.proto
index 0cf84c2..f6b2940 100644
--- a/xos/coreapi/protos/common.proto
+++ b/xos/coreapi/protos/common.proto
@@ -29,6 +29,8 @@
         ALL=1;
         SYNCHRONIZER_DIRTY_OBJECTS = 2;
         SYNCHRONIZER_DELETED_OBJECTS = 3;
+        SYNCHRONIZER_DIRTY_POLICIES = 4;
+        SYNCHRONIZER_DELETED_POLICIES = 5;
     }
     QueryKind kind = 1;
     repeated QueryElement elements = 2;
diff --git a/xos/synchronizers/new_base/apiaccessor.py b/xos/synchronizers/new_base/apiaccessor.py
index c721ae1..9f40162 100644
--- a/xos/synchronizers/new_base/apiaccessor.py
+++ b/xos/synchronizers/new_base/apiaccessor.py
@@ -28,6 +28,20 @@
 
         return objs
 
+    def fetch_policies(self, main_objs, deletion=False):
+        if (type(main_objs) is not list):
+                main_objs=[main_objs]
+
+        objs = []
+        for main_obj in main_objs:
+            if (not deletion):
+                lobjs = main_obj.objects.filter_special(main_obj.objects.SYNCHRONIZER_DIRTY_POLICIES)
+            else:
+                lobjs = main_obj.objects.filter_special(main_obj.objects.SYNCHRONIZER_DELETED_POLICIES)
+            objs.extend(lobjs)
+
+        return objs
+
     def obj_exists(self, o):
         # gRPC will default id to '0' for uninitialized objects
         return (o.id is not None) and (o.id != 0)
diff --git a/xos/synchronizers/new_base/backend.py b/xos/synchronizers/new_base/backend.py
index 660fd21..9faddca 100644
--- a/xos/synchronizers/new_base/backend.py
+++ b/xos/synchronizers/new_base/backend.py
@@ -72,6 +72,16 @@
             watcher_thread = threading.Thread(target=watcher.run,name='watcher')
             watcher_thread.start()
 
+        # start model policies thread
+        policies_dir = getattr(Config(), "observer_model_policies_dir", None)
+        if policies_dir:
+            from synchronizers.new_base.model_policy_loop import run_policy
+            model_policy_thread = threading.Thread(target=run_policy)
+            model_policy_thread.start()
+        else:
+            model_policy_thread = None
+            logger.info("Skipping model policies thread due to no model_policies dir.")
+
         while True:
             try:
                 time.sleep(1000)
diff --git a/xos/synchronizers/new_base/dependency_walker_new.py b/xos/synchronizers/new_base/dependency_walker_new.py
new file mode 100644
index 0000000..b5ffff8
--- /dev/null
+++ b/xos/synchronizers/new_base/dependency_walker_new.py
@@ -0,0 +1,97 @@
+#!/usr/bin/env python
+
+# TODO: Moved this into the synchronizer, as it appeared to require model
+#       access. Verify whether or not that's true and reconcile with
+#       generate/dependency_walker.py
+
+import os
+import imp
+from xos.config import Config, XOS_DIR
+import inspect
+import time
+import traceback
+import commands
+import threading
+import json
+
+from xos.logger import Logger, logging
+logger = Logger(level=logging.INFO)
+
+missing_links={}
+
+try:
+	dep_data = open(Config().dependency_graph).read()
+except:
+	dep_data = open(XOS_DIR + '/model-deps').read()
+
+dependencies = json.loads(dep_data)
+
+inv_dependencies = {}
+for k, lst in dependencies.items():
+	for v in lst:
+		try:
+			inv_dependencies[v].append(k)
+		except KeyError:
+			inv_dependencies[v]=[k]
+	
+
+def plural(name):
+	if (name.endswith('s')):
+		return name+'es'
+	else:
+		return name+'s'
+
+
+def walk_deps(fn, object):
+	model = object.__class__.__name__
+	try:	
+		deps = dependencies[model]
+	except:
+		deps = []
+	return __walk_deps(fn, object, deps)
+
+def walk_inv_deps(fn, object):
+	model = object.__class__.__name__
+	try:	
+		deps = inv_dependencies[model]
+	except:
+		deps = []
+	return __walk_deps(fn, object, deps)
+
+def __walk_deps(fn, object, deps):
+	model = object.__class__.__name__
+	ret = []
+	for dep in deps:
+		#print "Checking dep %s"%dep
+		peer=None
+		link = dep.lower()
+		try:
+			peer = getattr(object, link)
+		except AttributeError:
+			link = plural(link)
+			try:
+				peer = getattr(object, link)
+			except AttributeError:
+				if not missing_links.has_key(model+'.'+link):
+					print "Model %s missing link for dependency %s"%(model, link)
+                                        logger.log_exc("WARNING: Model %s missing link for dependency %s."%(model, link))
+					missing_links[model+'.'+link]=True
+
+
+		if (peer):
+			try:
+				peer_objects = peer.all()
+			except AttributeError:
+				peer_objects = [peer]
+			except:
+				peer_objects = []
+
+			for o in peer_objects:
+				if (hasattr(o,'updated')):
+					fn(o, object)
+					ret.append(o)
+				# Uncomment the following line to enable recursion
+				# walk_inv_deps(fn, o)
+	return ret
+
+
diff --git a/xos/synchronizers/new_base/djangoaccessor.py b/xos/synchronizers/new_base/djangoaccessor.py
index 6703ffb..aa916c0 100644
--- a/xos/synchronizers/new_base/djangoaccessor.py
+++ b/xos/synchronizers/new_base/djangoaccessor.py
@@ -38,6 +38,20 @@
 
         return objs
 
+    def fetch_policies(self, main_objs, deletion=False):
+        if (type(main_objs) is not list):
+                main_objs=[main_objs]
+
+        objs = []
+        for main_obj in main_objs:
+            if (not deletion):
+                res = main_obj.objects.filter((Q(policed__lt=F('updated')) | Q(policed=None)) & Q(no_policy=False))
+            else:
+                res = main_obj.deleted_objects.filter(Q(policed__lt=F('updated')) | Q(policed=None))
+            objs.extend(res)
+
+        return objs
+
     def reset_queries(self):
         reset_queries()
 
diff --git a/xos/synchronizers/new_base/event_loop.py b/xos/synchronizers/new_base/event_loop.py
index 54d8acb..0c89a3c 100644
--- a/xos/synchronizers/new_base/event_loop.py
+++ b/xos/synchronizers/new_base/event_loop.py
@@ -16,7 +16,7 @@
 from xos.logger import Logger, logging, logger
 from xos.config import Config, XOS_DIR
 from synchronizers.new_base.steps import *
-from syncstep import SyncStep
+from syncstep import SyncStep, NullSyncStep
 from toposort import toposort
 from synchronizers.new_base.error_mapper import *
 from synchronizers.new_base.steps.sync_object import SyncObject
@@ -241,17 +241,24 @@
 
     def lookup_step_class(self, s):
         if ('#' in s):
-            raise Exception("# is not supported anymore. I hope it wasn't important.")
-
-        step = self.step_lookup[s]
+            return NullSyncStep
+        else:
+            step = self.step_lookup[s]
         return step
 
     def lookup_step(self, s):
         if ('#' in s):
-            raise Exception("# is not supported anymore. I hope it wasn't important.")
+            objname = s[1:]
+            so = NullSyncStep()
 
-        step_class = self.step_lookup[s]
-        step = step_class(driver=self.driver, error_map=self.error_mapper)
+            obj = model_accessor.get_model_class(objname)
+
+            so.provides = [obj]
+            so.observes = [obj]
+            step = so
+        else:
+            step_class = self.step_lookup[s]
+            step = step_class(driver=self.driver, error_map=self.error_mapper)
         return step
 
     def save_run_times(self):
diff --git a/xos/synchronizers/new_base/model_policy_loop.py b/xos/synchronizers/new_base/model_policy_loop.py
new file mode 100644
index 0000000..eee49b1
--- /dev/null
+++ b/xos/synchronizers/new_base/model_policy_loop.py
@@ -0,0 +1,146 @@
+from synchronizers.new_base.modelaccessor import *
+from synchronizers.new_base.dependency_walker_new import *
+from xos.logger import Logger, logging
+
+import pdb
+import time
+import traceback
+
+modelPolicyEnabled = True
+bad_instances=[]
+
+model_policies = {}
+
+logger = Logger(level=logging.DEBUG)
+
+def EnableModelPolicy(x):
+    global modelPolicyEnabled
+    modelPolicyEnabled = x
+
+def update_wp(d, o):
+    try:
+        save_fields = []
+        if (d.write_protect != o.write_protect):
+            d.write_protect = o.write_protect
+            save_fields.append('write_protect')
+        if (save_fields):
+            d.save(update_fields=save_fields)
+    except AttributeError,e:
+        raise e
+
+def update_dep(d, o):
+    try:
+        print 'Trying to update %s'%d
+        save_fields = []
+        if (d.updated < o.updated):
+            save_fields = ['updated']
+
+        if (save_fields):
+            d.save(update_fields=save_fields)
+    except AttributeError,e:
+        logger.log_exc("AttributeError in update_dep")
+        raise e
+    except Exception,e:
+        logger.log_exc("Exception in update_dep")
+
+def delete_if_inactive(d, o):
+    try:
+        d.delete()
+        print "Deleted %s (%s)"%(d,d.__class__.__name__)
+    except:
+        pass
+    return
+
+def load_model_policies(policies_dir=None):
+    global model_policies
+
+    if policies_dir is None:
+            policies_dir = Config().observer_model_policies_dir
+
+    for fn in os.listdir(policies_dir):
+            pathname = os.path.join(policies_dir,fn)
+            if os.path.isfile(pathname) and fn.startswith("model_policy_") and fn.endswith(".py") and (fn!="__init__.py"):
+                model_policies[fn[:-3]] = imp.load_source(fn[:-3],pathname)
+
+    logger.debug("Loaded model polices %s from %s" % (",".join(model_policies.keys()), policies_dir))
+
+def execute_model_policy(instance, deleted):
+    # Automatic dirtying
+    if (instance in bad_instances):
+        return
+
+    # These are the models whose children get deleted when they are
+    delete_policy_models = ['Slice','Instance','Network']
+    sender_name = instance.__class__.__name__
+    policy_name = 'model_policy_%s'%sender_name
+    noargs = False
+
+    if (not deleted):
+        walk_inv_deps(update_dep, instance)
+        walk_deps(update_wp, instance)
+    elif (sender_name in delete_policy_models):
+        walk_inv_deps(delete_if_inactive, instance)
+
+    try:
+        policy_handler = model_policies.get(policy_name, None) # getattr(model_policies, policy_name, None)
+        logger.debug("MODEL POLICY: handler %s %s" % (policy_name, policy_handler))
+        if policy_handler is not None:
+            if (deleted):
+                try:
+                    policy_handler.handle_delete(instance)
+                except AttributeError:
+                    pass
+            else:
+                policy_handler.handle(instance)
+        logger.debug("MODEL POLICY: completed handler %s %s" % (policy_name, policy_handler))
+    except:
+        logger.log_exc("MODEL POLICY: Exception when running handler")
+
+    try:
+        instance.policed=model_accessor.now()
+        instance.save(update_fields=['policed'])
+    except:
+        logger.log_exc('MODEL POLICY: Object %r is defective'%instance)
+        bad_instances.append(instance)
+
+def noop(o,p):
+        pass
+
+def run_policy():
+    load_model_policies()
+
+    while (True):
+        start = time.time()
+        try:
+            run_policy_once()
+        except:
+            logger.log_exc("MODEL_POLICY: Exception in run_policy()")
+        if (time.time()-start<1):
+            time.sleep(1)
+
+def run_policy_once():
+        # TODO: Core-specific model list is hardcoded here. These models should
+        # be learned from the model_policy files, not hardcoded.
+
+        models = [Controller, Site, SitePrivilege, Image, ControllerSlice, ControllerSite, ControllerUser, User, Slice, Network, Instance, SlicePrivilege]
+
+        logger.debug("MODEL POLICY: run_policy_once()")
+
+        model_accessor.check_db_connection_okay()
+
+        objects = model_accessor.fetch_policies(models, False)
+        deleted_objects = model_accessor.fetch_policies(models, True)
+
+        for o in objects:
+            execute_model_policy(o, o.deleted)
+
+        for o in deleted_objects:
+            execute_model_policy(o, True)
+
+        try:
+            model_accessor.reset_queries()
+        except:
+            # this shouldn't happen, but in case it does, catch it...
+            logger.log_exc("MODEL POLICY: exception in reset_queries")
+
+        logger.debug("MODEL POLICY: finished run_policy_once()")
diff --git a/xos/synchronizers/new_base/modelaccessor.py b/xos/synchronizers/new_base/modelaccessor.py
index a7c69bc..9b9e657 100644
--- a/xos/synchronizers/new_base/modelaccessor.py
+++ b/xos/synchronizers/new_base/modelaccessor.py
@@ -32,7 +32,11 @@
         """ Given a class name, return that model class """
         return name in self.all_model_classes
 
-    def fetch_pending(self, name, deletion=False):
+    def fetch_pending(self, main_objs, deletion=False):
+        """ Execute the default fetch_pending query """
+        raise Exception("Not Implemented")
+
+    def fetch_policies(self, main_objs, deletion=False):
         """ Execute the default fetch_pending query """
         raise Exception("Not Implemented")
 
diff --git a/xos/synchronizers/new_base/syncstep.py b/xos/synchronizers/new_base/syncstep.py
index 56ad412..cf5b40a 100644
--- a/xos/synchronizers/new_base/syncstep.py
+++ b/xos/synchronizers/new_base/syncstep.py
@@ -312,3 +312,13 @@
 
     def __call__(self, **args):
         return self.call(**args)
+
+# TODO: What does this do? It seems like it's just going to toss exceptions.
+
+class NullSyncStep(SyncStep):   # was SyncObject
+    provides=[] # Caller fills this in
+    requested_interval=0
+    observes=[] # Caller fills this in
+
+    def sync_record(self, r):
+        raise DeferredException('Waiting for Service dependency: %r'%r)
diff --git a/xos/xos_client/xosapi/orm.py b/xos/xos_client/xosapi/orm.py
index f8bcc63..31a20a2 100644
--- a/xos/xos_client/xosapi/orm.py
+++ b/xos/xos_client/xosapi/orm.py
@@ -264,8 +264,10 @@
     """ Manages a remote list of objects """
 
     # constants better agree with common.proto
-    SYNCHRONIZER_DIRTY_OBJECTS = 2;
-    SYNCHRONIZER_DELETED_OBJECTS = 3;
+    SYNCHRONIZER_DIRTY_OBJECTS = 2
+    SYNCHRONIZER_DELETED_OBJECTS = 3
+    SYNCHRONIZER_DIRTY_POLICIES = 4
+    SYNCHRONIZER_DELETED_POLICIES = 5
 
     def __init__(self, stub, modelName, packageName):
         self._stub = stub