CORD-1242 Generalize Model Policy Framework
Change-Id: Ic14c4f9be818dcbe0ea98ff562f05b093dda132c
diff --git a/xos/synchronizers/new_base/backend.py b/xos/synchronizers/new_base/backend.py
index 9faddca..72de08f 100644
--- a/xos/synchronizers/new_base/backend.py
+++ b/xos/synchronizers/new_base/backend.py
@@ -6,6 +6,7 @@
import time
from synchronizers.new_base.syncstep import SyncStep
from synchronizers.new_base.event_loop import XOSObserver
+from synchronizers.new_base.model_policy_loop import XOSPolicyEngine
from synchronizers.new_base.modelaccessor import *
from xos.logger import Logger, logging
from xos.config import Config
@@ -75,8 +76,8 @@
# start model policies thread
policies_dir = getattr(Config(), "observer_model_policies_dir", None)
if policies_dir:
- from synchronizers.new_base.model_policy_loop import run_policy
- model_policy_thread = threading.Thread(target=run_policy)
+ policy_engine = XOSPolicyEngine(policies_dir=policies_dir)
+ model_policy_thread = threading.Thread(target=policy_engine.run, name="policy_engine")
model_policy_thread.start()
else:
model_policy_thread = None
diff --git a/xos/synchronizers/new_base/model_policy_loop.py b/xos/synchronizers/new_base/model_policy_loop.py
index 61fb7c3..fe15f09 100644
--- a/xos/synchronizers/new_base/model_policy_loop.py
+++ b/xos/synchronizers/new_base/model_policy_loop.py
@@ -1,146 +1,156 @@
from synchronizers.new_base.modelaccessor import *
from synchronizers.new_base.dependency_walker_new import *
+from synchronizers.new_base.policy import Policy
from xos.logger import Logger, logging
+import imp
import pdb
import time
import traceback
-modelPolicyEnabled = True
-bad_instances=[]
-
-model_policies = {}
-
logger = Logger(level=logging.DEBUG)
-def EnableModelPolicy(x):
- global modelPolicyEnabled
- modelPolicyEnabled = x
+class XOSPolicyEngine(object):
+ def __init__(self, policies_dir):
+ self.model_policies = self.load_model_policies(policies_dir)
+ self.policies_by_name = {}
+ self.policies_by_class = {}
-def update_wp(d, o):
- try:
- save_fields = []
- if (d.write_protect != o.write_protect):
- d.write_protect = o.write_protect
- save_fields.append('write_protect')
- if (save_fields):
- d.save(update_fields=save_fields)
- except AttributeError,e:
- raise e
+ for policy in self.model_policies:
+ if not policy.model_name in self.policies_by_name:
+ self.policies_by_name[policy.model_name] = []
+ self.policies_by_name[policy.model_name].append(policy)
-def update_dep(d, o):
- try:
- print 'Trying to update %s'%d
- save_fields = []
- if (d.updated < o.updated):
- save_fields = ['updated']
+ if not policy.model in self.policies_by_class:
+ self.policies_by_class[policy.model] = []
+ self.policies_by_class[policy.model].append(policy)
- if (save_fields):
- d.save(update_fields=save_fields)
- except AttributeError,e:
- logger.log_exc("AttributeError in update_dep")
- raise e
- except Exception,e:
- logger.log_exc("Exception in update_dep")
+ def update_wp(self, d, o):
+ try:
+ save_fields = []
+ if (d.write_protect != o.write_protect):
+ d.write_protect = o.write_protect
+ save_fields.append('write_protect')
+ if (save_fields):
+ d.save(update_fields=save_fields)
+ except AttributeError,e:
+ raise e
-def delete_if_inactive(d, o):
- try:
- d.delete()
- print "Deleted %s (%s)"%(d,d.__class__.__name__)
- except:
- pass
- return
+ def update_dep(self, d, o):
+ try:
+ print 'Trying to update %s'%d
+ save_fields = []
+ if (d.updated < o.updated):
+ save_fields = ['updated']
-def load_model_policies(policies_dir=None):
- global model_policies
+ if (save_fields):
+ d.save(update_fields=save_fields)
+ except AttributeError,e:
+ logger.log_exc("AttributeError in update_dep")
+ raise e
+ except Exception,e:
+ logger.log_exc("Exception in update_dep")
- if policies_dir is None:
- policies_dir = Config().observer_model_policies_dir
-
- for fn in os.listdir(policies_dir):
- pathname = os.path.join(policies_dir,fn)
- if os.path.isfile(pathname) and fn.startswith("model_policy_") and fn.endswith(".py") and (fn!="__init__.py"):
- model_policies[fn[:-3]] = imp.load_source(fn[:-3],pathname)
-
- logger.debug("Loaded model polices %s from %s" % (",".join(model_policies.keys()), policies_dir))
-
-def execute_model_policy(instance, deleted):
- # Automatic dirtying
- if (instance in bad_instances):
+ def delete_if_inactive(self, d, o):
+ try:
+ d.delete()
+ print "Deleted %s (%s)"%(d,d.__class__.__name__)
+ except:
+ pass
return
- # These are the models whose children get deleted when they are
- delete_policy_models = ['Slice','Instance','Network']
- sender_name = getattr(instance, "model_name", instance.__class__.__name__)
- policy_name = 'model_policy_%s'%sender_name
- noargs = False
+ def load_model_policies(self, policies_dir):
+ policies=[]
+ for fn in os.listdir(policies_dir):
+ pathname = os.path.join(policies_dir,fn)
+ if os.path.isfile(pathname) and fn.endswith(".py") and (fn!="__init__.py"):
+ module = imp.load_source(fn[:-3], pathname)
+ for classname in dir(module):
+ c = getattr(module, classname, None)
- if (not deleted):
- walk_inv_deps(update_dep, instance)
- walk_deps(update_wp, instance)
- elif (sender_name in delete_policy_models):
- walk_inv_deps(delete_if_inactive, instance)
+ # make sure 'c' is a descendent of Policy and has a
+ # provides field (this eliminates the abstract base classes
+ # since they don't have a provides)
- try:
- policy_handler = model_policies.get(policy_name, None)
- logger.debug("MODEL POLICY: handler %s %s" % (policy_name, policy_handler))
- if policy_handler is not None:
- if (deleted):
+ if inspect.isclass(c) and issubclass(c, Policy) and hasattr(c, "model_name") and (
+ c not in policies):
+ if not model_accessor.has_model_class(c.model_name):
+ logger.error("load_model_policies: unable to find model policy %s" % c.model_name)
+ c.model = model_accessor.get_model_class(c.model_name)
+ policies.append(c)
+
+ logger.info("Loaded %s model policies" % len(policies))
+ return policies
+
+ def execute_model_policy(self, instance, action):
+ # These are the models whose children get deleted when they are
+ delete_policy_models = ['Slice','Instance','Network']
+ sender_name = getattr(instance, "model_name", instance.__class__.__name__)
+
+ #if (action != "deleted"):
+ # walk_inv_deps(self.update_dep, instance)
+ # walk_deps(self.update_wp, instance)
+ #elif (sender_name in delete_policy_models):
+ # walk_inv_deps(self.delete_if_inactive, instance)
+
+ for policy in self.policies_by_name.get(sender_name, None):
+ method_name= "handle_%s" % action
+ if hasattr(policy, method_name):
try:
- policy_handler.handle_delete(instance)
- except AttributeError:
- pass
- else:
- policy_handler.handle(instance)
- logger.debug("MODEL POLICY: completed handler %s %s" % (policy_name, policy_handler))
- except:
- logger.log_exc("MODEL POLICY: Exception when running handler")
-
- try:
- instance.policed=model_accessor.now()
- instance.save(update_fields=['policed'])
- except:
- logger.log_exc('MODEL POLICY: Object %r is defective'%instance)
- bad_instances.append(instance)
-
-def noop(o,p):
- pass
-
-def run_policy():
- load_model_policies()
-
- while (True):
- start = time.time()
- try:
- run_policy_once()
- except:
- logger.log_exc("MODEL_POLICY: Exception in run_policy()")
- if (time.time()-start<1):
- time.sleep(1)
-
-def run_policy_once():
- # TODO: Core-specific model list is hardcoded here. These models should
- # be learned from the model_policy files, not hardcoded.
-
- models = [Controller, Site, SitePrivilege, Image, ControllerSlice, ControllerSite, ControllerUser, User, Slice, Network, Instance, SlicePrivilege]
-
- logger.debug("MODEL POLICY: run_policy_once()")
-
- model_accessor.check_db_connection_okay()
-
- objects = model_accessor.fetch_policies(models, False)
- deleted_objects = model_accessor.fetch_policies(models, True)
-
- for o in objects:
- execute_model_policy(o, o.deleted)
-
- for o in deleted_objects:
- execute_model_policy(o, True)
+ logger.debug("MODEL POLICY: calling handler %s %s %s %s" % (sender_name, instance, policy.__name__, method_name))
+ getattr(policy(), method_name)(instance)
+ logger.debug("MODEL POLICY: completed handler %s %s %s %s" % (sender_name, instance, policy.__name__, method_name))
+ except:
+ logger.log_exc("MODEL POLICY: Exception when running handler")
try:
- model_accessor.reset_queries()
+ instance.policed=model_accessor.now()
+ instance.save(update_fields=['policed'])
except:
- # this shouldn't happen, but in case it does, catch it...
- logger.log_exc("MODEL POLICY: exception in reset_queries")
+ logger.log_exc('MODEL POLICY: Object %r failed to update policed timestamp' % instance)
- logger.debug("MODEL POLICY: finished run_policy_once()")
+ def noop(self, o,p):
+ pass
+
+ def run(self):
+ while (True):
+ start = time.time()
+ try:
+ self.run_policy_once()
+ except:
+ logger.log_exc("MODEL_POLICY: Exception in run()")
+ if (time.time() - start < 5):
+ time.sleep(5)
+
+ # TODO: This loop is different from the synchronizer event_loop, but they both do mostly the same thing. Look for
+ # ways to combine them.
+
+ def run_policy_once(self):
+ models = self.policies_by_class.keys()
+
+ logger.debug("MODEL POLICY: run_policy_once()")
+
+ model_accessor.check_db_connection_okay()
+
+ objects = model_accessor.fetch_policies(models, False)
+ deleted_objects = model_accessor.fetch_policies(models, True)
+
+ for o in objects:
+ if o.deleted:
+ # This shouldn't happen, but previous code was examining o.deleted. Verify.
+ continue
+ if not o.policed:
+ self.execute_model_policy(o, "create")
+ else:
+ self.execute_model_policy(o, "update")
+
+ for o in deleted_objects:
+ self.execute_model_policy(o, "delete")
+
+ try:
+ model_accessor.reset_queries()
+ except:
+ # this shouldn't happen, but in case it does, catch it...
+ logger.log_exc("MODEL POLICY: exception in reset_queries")
+
+ logger.debug("MODEL POLICY: finished run_policy_once()")
diff --git a/xos/synchronizers/new_base/modelaccessor.py b/xos/synchronizers/new_base/modelaccessor.py
index 51c326e..0912ffb 100644
--- a/xos/synchronizers/new_base/modelaccessor.py
+++ b/xos/synchronizers/new_base/modelaccessor.py
@@ -10,12 +10,15 @@
import functools
import os
+import signal
from xos.config import Config
from diag import update_diag
from xos.logger import Logger, logging
logger = Logger(level=logging.INFO)
+orig_sigint = None
+
class ModelAccessor(object):
def __init__(self):
self.all_model_classes = self.get_all_model_classes()
@@ -165,8 +168,12 @@
# Synchronizer framework isn't ready to embrace reactor yet...
reactor.stop()
+ # Restore the sigint handler
+ signal.signal(signal.SIGINT, orig_sigint)
+
def config_accessor():
global model_accessor
+ global orig_sigint
accessor_kind = getattr(Config(), "observer_accessor_kind", "django")
@@ -193,6 +200,10 @@
grpcapi_client.set_reconnect_callback(functools.partial(grpcapi_reconnect, grpcapi_client, reactor))
grpcapi_client.start()
+ # Reactor will take over SIGINT during reactor.run(), but does not return it when reactor.stop() is called.
+
+ orig_sigint = signal.getsignal(signal.SIGINT)
+
# Start reactor. This will cause the client to connect and then execute
# grpcapi_callback().
diff --git a/xos/synchronizers/new_base/policy.py b/xos/synchronizers/new_base/policy.py
new file mode 100644
index 0000000..03a258b
--- /dev/null
+++ b/xos/synchronizers/new_base/policy.py
@@ -0,0 +1,22 @@
+""" policy.py
+
+ Base Classes for Model Policies
+"""
+
+from xos.logger import Logger, logging
+
+class Policy(object):
+ """ An XOS Model Policy
+
+ Set the class member model_name to the name of the model that this policy will act on.
+
+ The following functions will be invoked if they are defined:
+
+ handle_create ... called when a model is created
+ handle_update ... called when a model is updated
+ handle_delete ... called when a model is deleted
+ """
+
+ def __init__(self):
+ self.logger = Logger(level=logging.DEBUG)
+
diff --git a/xos/synchronizers/new_base/xos-policy.py b/xos/synchronizers/new_base/xos-policy.py
new file mode 100644
index 0000000..d2630ec
--- /dev/null
+++ b/xos/synchronizers/new_base/xos-policy.py
@@ -0,0 +1,89 @@
+#!/usr/bin/env python
+
+""" xos-policy.py
+
+ Standalone interface to model_policy engine.
+
+ Normally model policies are run by the synchronizer. This file allows them to be run independently as an aid
+ to development.
+"""
+
+import os
+import argparse
+import sys
+
+sys.path.append('/opt/xos')
+
+os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
+from xos.config import Config, DEFAULT_CONFIG_FN
+from xos.logger import Logger, logging, logger
+import time
+from synchronizers.new_base.model_policy_loop import XOSPolicyEngine
+from synchronizers.new_base.modelaccessor import *
+
+config = Config()
+
+logger = Logger(level=logging.INFO)
+
+# after http://www.erlenstar.demon.co.uk/unix/faq_2.html
+def daemon():
+ """Daemonize the current process."""
+ if os.fork() != 0: os._exit(0)
+ os.setsid()
+ if os.fork() != 0: os._exit(0)
+ os.umask(0)
+ devnull = os.open(os.devnull, os.O_RDWR)
+ os.dup2(devnull, 0)
+ # xxx fixme - this is just to make sure that nothing gets stupidly lost - should use devnull
+ logdir=os.path.dirname(config.observer_logfile)
+ # when installed in standalone we might not have httpd installed
+ if not os.path.isdir(logdir): os.mkdir(logdir)
+ crashlog = os.open('%s'%config.observer_logfile, os.O_RDWR | os.O_APPEND | os.O_CREAT, 0644)
+ os.dup2(crashlog, 1)
+ os.dup2(crashlog, 2)
+
+ if hasattr(config, "observer_pidfile"):
+ pidfile = config.get("observer_pidfile")
+ else:
+ pidfile = "/var/run/xosobserver.pid"
+ try:
+ file(pidfile,"w").write(str(os.getpid()))
+ except:
+ print "failed to create pidfile %s" % pidfile
+
+def main():
+ # Generate command line parser
+ parser = argparse.ArgumentParser(usage='%(prog)s [options]')
+ parser.add_argument('-d', '--daemon', dest='daemon', action='store_true', default=False,
+ help='Run as daemon.')
+ # smbaker: util/config.py parses sys.argv[] directly to get config file name; include the option here to avoid
+ # throwing unrecognized argument exceptions
+ parser.add_argument('-C', '--config', dest='config_file', action='store', default=DEFAULT_CONFIG_FN,
+ help='Name of config file.')
+ args = parser.parse_args()
+
+ if args.daemon: daemon()
+
+ models_active = False
+ wait = False
+ while not models_active:
+ try:
+ _ = Instance.objects.first()
+ _ = NetworkTemplate.objects.first()
+ models_active = True
+ except Exception,e:
+ logger.info(str(e))
+ logger.info('Waiting for data model to come up before starting...')
+ time.sleep(10)
+ wait = True
+
+ if (wait):
+ time.sleep(60) # Safety factor, seeing that we stumbled waiting for the data model to come up.
+
+ # start model policies thread
+ policies_dir = Config().observer_model_policies_dir
+
+ XOSPolicyEngine(policies_dir=policies_dir).run()
+
+if __name__ == '__main__':
+ main()