CORD-1242 Generalize Model Policy Framework

Change-Id: Ic14c4f9be818dcbe0ea98ff562f05b093dda132c
diff --git a/xos/synchronizers/new_base/backend.py b/xos/synchronizers/new_base/backend.py
index 9faddca..72de08f 100644
--- a/xos/synchronizers/new_base/backend.py
+++ b/xos/synchronizers/new_base/backend.py
@@ -6,6 +6,7 @@
 import time
 from synchronizers.new_base.syncstep import SyncStep
 from synchronizers.new_base.event_loop import XOSObserver
+from synchronizers.new_base.model_policy_loop import XOSPolicyEngine
 from synchronizers.new_base.modelaccessor import *
 from xos.logger import Logger, logging
 from xos.config import Config
@@ -75,8 +76,8 @@
         # start model policies thread
         policies_dir = getattr(Config(), "observer_model_policies_dir", None)
         if policies_dir:
-            from synchronizers.new_base.model_policy_loop import run_policy
-            model_policy_thread = threading.Thread(target=run_policy)
+            policy_engine = XOSPolicyEngine(policies_dir=policies_dir)
+            model_policy_thread = threading.Thread(target=policy_engine.run, name="policy_engine")
             model_policy_thread.start()
         else:
             model_policy_thread = None
diff --git a/xos/synchronizers/new_base/model_policy_loop.py b/xos/synchronizers/new_base/model_policy_loop.py
index 61fb7c3..fe15f09 100644
--- a/xos/synchronizers/new_base/model_policy_loop.py
+++ b/xos/synchronizers/new_base/model_policy_loop.py
@@ -1,146 +1,156 @@
 from synchronizers.new_base.modelaccessor import *
 from synchronizers.new_base.dependency_walker_new import *
+from synchronizers.new_base.policy import Policy
 from xos.logger import Logger, logging
 
+import imp
 import pdb
 import time
 import traceback
 
-modelPolicyEnabled = True
-bad_instances=[]
-
-model_policies = {}
-
 logger = Logger(level=logging.DEBUG)
 
-def EnableModelPolicy(x):
-    global modelPolicyEnabled
-    modelPolicyEnabled = x
+class XOSPolicyEngine(object):
+    def __init__(self, policies_dir):
+        self.model_policies = self.load_model_policies(policies_dir)
+        self.policies_by_name = {}
+        self.policies_by_class = {}
 
-def update_wp(d, o):
-    try:
-        save_fields = []
-        if (d.write_protect != o.write_protect):
-            d.write_protect = o.write_protect
-            save_fields.append('write_protect')
-        if (save_fields):
-            d.save(update_fields=save_fields)
-    except AttributeError,e:
-        raise e
+        for policy in self.model_policies:
+            if not policy.model_name in self.policies_by_name:
+                self.policies_by_name[policy.model_name] = []
+            self.policies_by_name[policy.model_name].append(policy)
 
-def update_dep(d, o):
-    try:
-        print 'Trying to update %s'%d
-        save_fields = []
-        if (d.updated < o.updated):
-            save_fields = ['updated']
+            if not policy.model in self.policies_by_class:
+                self.policies_by_class[policy.model] = []
+            self.policies_by_class[policy.model].append(policy)
 
-        if (save_fields):
-            d.save(update_fields=save_fields)
-    except AttributeError,e:
-        logger.log_exc("AttributeError in update_dep")
-        raise e
-    except Exception,e:
-        logger.log_exc("Exception in update_dep")
+    def update_wp(self, d, o):
+        try:
+            save_fields = []
+            if (d.write_protect != o.write_protect):
+                d.write_protect = o.write_protect
+                save_fields.append('write_protect')
+            if (save_fields):
+                d.save(update_fields=save_fields)
+        except AttributeError,e:
+            raise e
 
-def delete_if_inactive(d, o):
-    try:
-        d.delete()
-        print "Deleted %s (%s)"%(d,d.__class__.__name__)
-    except:
-        pass
-    return
+    def update_dep(self, d, o):
+        try:
+            print 'Trying to update %s'%d
+            save_fields = []
+            if (d.updated < o.updated):
+                save_fields = ['updated']
 
-def load_model_policies(policies_dir=None):
-    global model_policies
+            if (save_fields):
+                d.save(update_fields=save_fields)
+        except AttributeError,e:
+            logger.log_exc("AttributeError in update_dep")
+            raise e
+        except Exception,e:
+            logger.log_exc("Exception in update_dep")
 
-    if policies_dir is None:
-            policies_dir = Config().observer_model_policies_dir
-
-    for fn in os.listdir(policies_dir):
-            pathname = os.path.join(policies_dir,fn)
-            if os.path.isfile(pathname) and fn.startswith("model_policy_") and fn.endswith(".py") and (fn!="__init__.py"):
-                model_policies[fn[:-3]] = imp.load_source(fn[:-3],pathname)
-
-    logger.debug("Loaded model polices %s from %s" % (",".join(model_policies.keys()), policies_dir))
-
-def execute_model_policy(instance, deleted):
-    # Automatic dirtying
-    if (instance in bad_instances):
+    def delete_if_inactive(self, d, o):
+        try:
+            d.delete()
+            print "Deleted %s (%s)"%(d,d.__class__.__name__)
+        except:
+            pass
         return
 
-    # These are the models whose children get deleted when they are
-    delete_policy_models = ['Slice','Instance','Network']
-    sender_name = getattr(instance, "model_name", instance.__class__.__name__)
-    policy_name = 'model_policy_%s'%sender_name
-    noargs = False
+    def load_model_policies(self, policies_dir):
+        policies=[]
+        for fn in os.listdir(policies_dir):
+                pathname = os.path.join(policies_dir,fn)
+                if os.path.isfile(pathname) and fn.endswith(".py") and (fn!="__init__.py"):
+                    module = imp.load_source(fn[:-3], pathname)
+                    for classname in dir(module):
+                        c = getattr(module, classname, None)
 
-    if (not deleted):
-        walk_inv_deps(update_dep, instance)
-        walk_deps(update_wp, instance)
-    elif (sender_name in delete_policy_models):
-        walk_inv_deps(delete_if_inactive, instance)
+                        # make sure 'c' is a descendent of Policy and has a
+                        # provides field (this eliminates the abstract base classes
+                        # since they don't have a provides)
 
-    try:
-        policy_handler = model_policies.get(policy_name, None)
-        logger.debug("MODEL POLICY: handler %s %s" % (policy_name, policy_handler))
-        if policy_handler is not None:
-            if (deleted):
+                        if inspect.isclass(c) and issubclass(c, Policy) and hasattr(c, "model_name") and (
+                            c not in policies):
+                            if not model_accessor.has_model_class(c.model_name):
+                                logger.error("load_model_policies: unable to find model policy %s" % c.model_name)
+                            c.model = model_accessor.get_model_class(c.model_name)
+                            policies.append(c)
+
+        logger.info("Loaded %s model policies" % len(policies))
+        return policies
+
+    def execute_model_policy(self, instance, action):
+        # These are the models whose children get deleted when they are
+        delete_policy_models = ['Slice','Instance','Network']
+        sender_name = getattr(instance, "model_name", instance.__class__.__name__)
+
+        #if (action != "deleted"):
+        #    walk_inv_deps(self.update_dep, instance)
+        #    walk_deps(self.update_wp, instance)
+        #elif (sender_name in delete_policy_models):
+        #    walk_inv_deps(self.delete_if_inactive, instance)
+
+        for policy in self.policies_by_name.get(sender_name, None):
+            method_name= "handle_%s" % action
+            if hasattr(policy, method_name):
                 try:
-                    policy_handler.handle_delete(instance)
-                except AttributeError:
-                    pass
-            else:
-                policy_handler.handle(instance)
-        logger.debug("MODEL POLICY: completed handler %s %s" % (policy_name, policy_handler))
-    except:
-        logger.log_exc("MODEL POLICY: Exception when running handler")
-
-    try:
-        instance.policed=model_accessor.now()
-        instance.save(update_fields=['policed'])
-    except:
-        logger.log_exc('MODEL POLICY: Object %r is defective'%instance)
-        bad_instances.append(instance)
-
-def noop(o,p):
-        pass
-
-def run_policy():
-    load_model_policies()
-
-    while (True):
-        start = time.time()
-        try:
-            run_policy_once()
-        except:
-            logger.log_exc("MODEL_POLICY: Exception in run_policy()")
-        if (time.time()-start<1):
-            time.sleep(1)
-
-def run_policy_once():
-        # TODO: Core-specific model list is hardcoded here. These models should
-        # be learned from the model_policy files, not hardcoded.
-
-        models = [Controller, Site, SitePrivilege, Image, ControllerSlice, ControllerSite, ControllerUser, User, Slice, Network, Instance, SlicePrivilege]
-
-        logger.debug("MODEL POLICY: run_policy_once()")
-
-        model_accessor.check_db_connection_okay()
-
-        objects = model_accessor.fetch_policies(models, False)
-        deleted_objects = model_accessor.fetch_policies(models, True)
-
-        for o in objects:
-            execute_model_policy(o, o.deleted)
-
-        for o in deleted_objects:
-            execute_model_policy(o, True)
+                    logger.debug("MODEL POLICY: calling handler %s %s %s %s" % (sender_name, instance, policy.__name__, method_name))
+                    getattr(policy(), method_name)(instance)
+                    logger.debug("MODEL POLICY: completed handler %s %s %s %s" % (sender_name, instance, policy.__name__, method_name))
+                except:
+                    logger.log_exc("MODEL POLICY: Exception when running handler")
 
         try:
-            model_accessor.reset_queries()
+            instance.policed=model_accessor.now()
+            instance.save(update_fields=['policed'])
         except:
-            # this shouldn't happen, but in case it does, catch it...
-            logger.log_exc("MODEL POLICY: exception in reset_queries")
+            logger.log_exc('MODEL POLICY: Object %r failed to update policed timestamp' % instance)
 
-        logger.debug("MODEL POLICY: finished run_policy_once()")
+    def noop(self, o,p):
+            pass
+
+    def run(self):
+        while (True):
+            start = time.time()
+            try:
+                self.run_policy_once()
+            except:
+                logger.log_exc("MODEL_POLICY: Exception in run()")
+            if (time.time() - start < 5):
+                time.sleep(5)
+
+    # TODO: This loop is different from the synchronizer event_loop, but they both do mostly the same thing. Look for
+    # ways to combine them.
+
+    def run_policy_once(self):
+            models = self.policies_by_class.keys()
+
+            logger.debug("MODEL POLICY: run_policy_once()")
+
+            model_accessor.check_db_connection_okay()
+
+            objects = model_accessor.fetch_policies(models, False)
+            deleted_objects = model_accessor.fetch_policies(models, True)
+
+            for o in objects:
+                if o.deleted:
+                    # This shouldn't happen, but previous code was examining o.deleted. Verify.
+                    continue
+                if not o.policed:
+                    self.execute_model_policy(o, "create")
+                else:
+                    self.execute_model_policy(o, "update")
+
+            for o in deleted_objects:
+                self.execute_model_policy(o, "delete")
+
+            try:
+                model_accessor.reset_queries()
+            except:
+                # this shouldn't happen, but in case it does, catch it...
+                logger.log_exc("MODEL POLICY: exception in reset_queries")
+
+            logger.debug("MODEL POLICY: finished run_policy_once()")
diff --git a/xos/synchronizers/new_base/modelaccessor.py b/xos/synchronizers/new_base/modelaccessor.py
index 51c326e..0912ffb 100644
--- a/xos/synchronizers/new_base/modelaccessor.py
+++ b/xos/synchronizers/new_base/modelaccessor.py
@@ -10,12 +10,15 @@
 
 import functools
 import os
+import signal
 from xos.config import Config
 from diag import update_diag
 
 from xos.logger import Logger, logging
 logger = Logger(level=logging.INFO)
 
+orig_sigint = None
+
 class ModelAccessor(object):
     def __init__(self):
         self.all_model_classes = self.get_all_model_classes()
@@ -165,8 +168,12 @@
     # Synchronizer framework isn't ready to embrace reactor yet...
     reactor.stop()
 
+    # Restore the sigint handler
+    signal.signal(signal.SIGINT, orig_sigint)
+
 def config_accessor():
     global model_accessor
+    global orig_sigint
 
     accessor_kind = getattr(Config(), "observer_accessor_kind", "django")
 
@@ -193,6 +200,10 @@
        grpcapi_client.set_reconnect_callback(functools.partial(grpcapi_reconnect, grpcapi_client, reactor))
        grpcapi_client.start()
 
+       # Reactor will take over SIGINT during reactor.run(), but does not return it when reactor.stop() is called.
+
+       orig_sigint = signal.getsignal(signal.SIGINT)
+
        # Start reactor. This will cause the client to connect and then execute
        # grpcapi_callback().
 
diff --git a/xos/synchronizers/new_base/policy.py b/xos/synchronizers/new_base/policy.py
new file mode 100644
index 0000000..03a258b
--- /dev/null
+++ b/xos/synchronizers/new_base/policy.py
@@ -0,0 +1,22 @@
+""" policy.py
+
+    Base Classes for Model Policies
+"""
+
+from xos.logger import Logger, logging
+
+class Policy(object):
+    """ An XOS Model Policy
+
+        Set the class member model_name to the name of the model that this policy will act on.
+
+        The following functions will be invoked if they are defined:
+
+            handle_create ... called when a model is created
+            handle_update ... called when a model is updated
+            handle_delete ... called when a model is deleted
+    """
+
+    def __init__(self):
+        self.logger = Logger(level=logging.DEBUG)
+
diff --git a/xos/synchronizers/new_base/xos-policy.py b/xos/synchronizers/new_base/xos-policy.py
new file mode 100644
index 0000000..d2630ec
--- /dev/null
+++ b/xos/synchronizers/new_base/xos-policy.py
@@ -0,0 +1,89 @@
+#!/usr/bin/env python
+
+""" xos-policy.py
+
+    Standalone interface to model_policy engine.
+
+    Normally model policies are run by the synchronizer. This file allows them to be run independently as an aid
+    to development.
+"""
+
+import os
+import argparse
+import sys
+
+sys.path.append('/opt/xos')
+
+os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
+from xos.config import Config, DEFAULT_CONFIG_FN
+from xos.logger import Logger, logging, logger
+import time
+from synchronizers.new_base.model_policy_loop import XOSPolicyEngine
+from synchronizers.new_base.modelaccessor import *
+
+config = Config()
+
+logger = Logger(level=logging.INFO)
+
+# after http://www.erlenstar.demon.co.uk/unix/faq_2.html
+def daemon():
+    """Daemonize the current process."""
+    if os.fork() != 0: os._exit(0)
+    os.setsid()
+    if os.fork() != 0: os._exit(0)
+    os.umask(0)
+    devnull = os.open(os.devnull, os.O_RDWR)
+    os.dup2(devnull, 0)
+    # xxx fixme - this is just to make sure that nothing gets stupidly lost - should use devnull
+    logdir=os.path.dirname(config.observer_logfile)
+    # when installed in standalone we might not have httpd installed
+    if not os.path.isdir(logdir): os.mkdir(logdir)
+    crashlog = os.open('%s'%config.observer_logfile, os.O_RDWR | os.O_APPEND | os.O_CREAT, 0644)
+    os.dup2(crashlog, 1)
+    os.dup2(crashlog, 2)
+
+    if hasattr(config, "observer_pidfile"):
+        pidfile = config.get("observer_pidfile")
+    else:
+        pidfile = "/var/run/xosobserver.pid"
+    try:
+        file(pidfile,"w").write(str(os.getpid()))
+    except:
+        print "failed to create pidfile %s" % pidfile
+
+def main():
+    # Generate command line parser
+    parser = argparse.ArgumentParser(usage='%(prog)s [options]')
+    parser.add_argument('-d', '--daemon', dest='daemon', action='store_true', default=False,
+                        help='Run as daemon.')
+    # smbaker: util/config.py parses sys.argv[] directly to get config file name; include the option here to avoid
+    #   throwing unrecognized argument exceptions
+    parser.add_argument('-C', '--config', dest='config_file', action='store', default=DEFAULT_CONFIG_FN,
+                        help='Name of config file.')
+    args = parser.parse_args()
+
+    if args.daemon: daemon()
+
+    models_active = False
+    wait = False
+    while not models_active:
+        try:
+            _ = Instance.objects.first()
+            _ = NetworkTemplate.objects.first()
+            models_active = True
+        except Exception,e:
+            logger.info(str(e))
+            logger.info('Waiting for data model to come up before starting...')
+            time.sleep(10)
+            wait = True
+
+    if (wait):
+        time.sleep(60) # Safety factor, seeing that we stumbled waiting for the data model to come up.
+
+    # start model policies thread
+    policies_dir = Config().observer_model_policies_dir
+
+    XOSPolicyEngine(policies_dir=policies_dir).run()
+
+if __name__ == '__main__':
+    main()