SEBA-79 Introduce changed_by_policy and changed_by_step timestamps
Change-Id: I545e1fc28328eb8d0a620f422f961973f88caf8e
diff --git a/docs/dev/sync_arch.md b/docs/dev/sync_arch.md
index 5ab8770..82c7260 100644
--- a/docs/dev/sync_arch.md
+++ b/docs/dev/sync_arch.md
@@ -192,3 +192,28 @@
failure, the synchronizer core re-executes the actuator at a later time, and
then again at increasing intervals.
+## Timestamps
+
+XOS models come with a variety of timestamps. The first three timestamps indicate changes that occur to the models:
+
+* `Updated`. Updated is set whenever a model is saved by a non-synchronizer. For example, updating a model via the GUI or the REST API will cause the updated timestamp to be set. The updated timestamp is set regardless of whether or not any actual changes have occurred to the model. This allows a developer or operator to save a model and cause the model to be resynchronized.
+
+* `changed_by_step`. This timestamp is set whenever non-bookkeeping fields in the model are changed during the execution of a syncstep. If no changes occur during a save, then this timestamp is not set.
+
+* `changed_by_policy`. This timestamp is set whenever non-bookkeeping fields in the model are changed during the execution of a model policy. If no changes occur during a save, then this timestamp is not set.
+
+For a given model, if we take the maximum of the three timestamps, `max(model.updated, model.changed_by_step, model.changed.by_policy)`, we can use that calculation as an overall version of the substantive fields of the model. If a user updated the model, or if a policy or syncstep changed the model, then one of those timestamps will be updated.
+
+The following two timestamps are set when a sync or a model_policy is completed.
+
+* `enacted`. Enacted indicates the model has been successfully synced. It is set to `max(model.updated, model.changed_by_policy)`. The enacted timestamp does not indicate the time of the synchronization, but rather indicates the version of the data that was synchronized.
+
+* `policed`. Policed indicates the model has successfully had model policies applied. It is set to `max(model.updated, model.changed_by_step)`. The policed timestamp does not indicate the time the policy completed, but rather indicates the version of the data that had policies applied.
+
+The rules for running steps and policies are as follows:
+
+* Model policies are run if `model.updated > model.policed || model.changed_by_step > model.policed`. In other words, if a user updates the model, or a syncstep changes the model, then policies will run.
+
+* Sync steps are run if `model.udpated > model.enacted || model.changed_by_policy > model.enacted`. In other words, if a user updates the model, or a policy changes the model, then steps will be run.
+
+This means it is possible for a syncstep to trigger a policy, and it is possible for a policy to trigger a syncstep. A cycle is not necessarily bad assuming the cycle does eventually terminate in a steady state. Because the `changed_by_` timestamps are only set when a model changes (i.e. authoritative state change), and not merely when it is saved, simply no longer making changes to a model will break any cycle. It's recommended that developers do exercise caution when modifying models from both syncsteps and policies.
diff --git a/xos/core/models/core.xproto b/xos/core/models/core.xproto
index 7bffdac..f7fcbdd 100644
--- a/xos/core/models/core.xproto
+++ b/xos/core/models/core.xproto
@@ -9,10 +9,10 @@
option custom_header = "xosbase_header";
option abstract = True;
- required string created = 1 [content_type = "date", auto_now_add = True];
- required string updated = 2 [default = "now()", content_type = "date"];
- optional string enacted = 3 [null = True, content_type = "date", blank = True, default = None];
- optional string policed = 4 [null = True, content_type = "date", blank = True, default = None];
+ required string created = 1 [content_type = "date", auto_now_add = True, help_text = "Time this model was created"];
+ required string updated = 2 [default = "now()", content_type = "date", help_text = "Time this model was changed by a non-synchronizer"];
+ optional string enacted = 3 [null = True, content_type = "date", blank = True, default = None, help_text = "When synced, set to the timestamp of the data that was synced"];
+ optional string policed = 4 [null = True, content_type = "date", blank = True, default = None, help_text = "When policed, set to the timestamp of the data that was policed"];
optional string backend_register = 5 [default = "{}", max_length = 1024, feedback_state = True];
required bool backend_need_delete = 6 [default = False, blank = True];
required bool backend_need_reap = 7 [default = False, blank = True];
@@ -29,6 +29,8 @@
required bool backend_need_delete_policy = 18 [default = False, help_text = "True if delete model_policy must be run before object can be reaped", blank = True];
required bool xos_managed = 19 [default = True, help_text = "True if xos is responsible for creating/deleting this object", blank = True, gui_hidden = True];
optional string backend_handle = 20 [max_length = 1024, feedback_state = True, blank=True, null=True, help_text = "Handle used by the backend to track this object", gui_hidden = True];
+ optional string changed_by_step = 21 [null = True, content_type = "date", blank = True, default = None, gui_hidden = True, help_text = "Time this model was changed by a sync step"];
+ optional string changed_by_policy = 22 [null = True, content_type = "date", blank = True, default = None, gui_hidden = True, help_text = "Time this model was changed by a model policy"];
}
// The calling user represents the user being accessed, or is a site admin.
@@ -80,6 +82,11 @@
optional string policy_status = 32 [default = "0 - Policy in process", max_length = 1024];
optional int32 policy_code = 35 [default = 0];
required string leaf_model_name = 33 [null = False, max_length = 1024, help_text = "The most specialized model in this chain of inheritance, often defined by a service developer"];
+ required bool backend_need_delete_policy = 34 [default = False, help_text = "True if delete model_policy must be run before object can be reaped", blank = True];
+ required bool xos_managed = 35 [default = True, help_text = "True if xos is responsible for creating/deleting this object", blank = True, gui_hidden = True];
+ optional string backend_handle = 36 [max_length = 1024, feedback_state = True, blank=True, null=True, help_text = "Handle used by the backend to track this object", gui_hidden = True];
+ optional string changed_by_step = 37 [null = True, content_type = "date", blank = True, default = None, gui_hidden = True, help_text = "Time this model was changed by a sync step"];
+ optional string changed_by_policy = 38 [null = True, content_type = "date", blank = True, default = None, gui_hidden = True, help_text = "Time this model was changed by a model policy"];
}
// A user may give a permission that he has to another user
diff --git a/xos/core/models/user.py b/xos/core/models/user.py
index a2999bf..75c87a4 100644
--- a/xos/core/models/user.py
+++ b/xos/core/models/user.py
@@ -169,6 +169,18 @@
policy_status = models.CharField( default = "0 - Policy in process", max_length = 1024, null = True )
policy_code = models.IntegerField( default = 0, null = True )
+ backend_need_delete_policy = models.BooleanField(
+ help_text="True if delete model_policy must be run before object can be reaped", default=False, null=False,
+ blank=True)
+ xos_managed = models.BooleanField(help_text="True if xos is responsible for creating/deleting this object", default=True,
+ null=False, blank=True)
+ backend_handle = models.CharField(help_text="Handle used by the backend to track this object", max_length=1024, null=True,
+ blank=True)
+ changed_by_step = models.DateTimeField(default=None, help_text="Time this model was changed by a sync step", null=True,
+ blank=True)
+ changed_by_policy = models.DateTimeField(default=None, help_text="Time this model was changed by a model policy",
+ null=True, blank=True)
+
objects = UserManager()
deleted_objects = DeletedUserManager()
diff --git a/xos/core/models/xosbase.py b/xos/core/models/xosbase.py
index 8ccb2b8..0a67657 100644
--- a/xos/core/models/xosbase.py
+++ b/xos/core/models/xosbase.py
@@ -113,6 +113,20 @@
if getattr(f, "deleted", False):
raise Exception("Attempt to save object with deleted foreign key reference")
+ def has_important_changes(self):
+ """ Determine whether the model has changes that should be reflected in one of the changed_by_* timestampes.
+ Ignores varous feedback and bookeeeping state set by synchronizers.
+ """
+ for field_name in self.changed_fields:
+ if field_name in ["policed", "updated", "enacted", "changed_by_step", "changed_by_policy"]:
+ continue
+ if field_name.startswith("backend_"):
+ continue
+ if field_name.startswith("policy_"):
+ continue
+ return True
+ return False
+
def save(self, *args, **kwargs):
# let the user specify silence as either a kwarg or an instance varible
@@ -122,15 +136,28 @@
caller_kind = "unknown"
- if ('synchronizer' in threading.current_thread().name):
- caller_kind = "synchronizer"
-
if "caller_kind" in kwargs:
caller_kind = kwargs.pop("caller_kind")
+ update_fields = None
+ if "update_fields" in kwargs:
+ # NOTE(smbaker): modifying update_fields will cause kwargs["update_fiels"] to be modified. This is
+ # intended, as kwargs will be passed to save() below.
+ update_fields = kwargs["update_fields"]
+
+ # NOTE(smbaker): always_update_timestamp is deprecated, and will be removed when synchronizers are cleaned up.
always_update_timestamp = False
if "always_update_timestamp" in kwargs:
always_update_timestamp = always_update_timestamp or kwargs.pop("always_update_timestamp")
+ log.warning("always_update_timestamp is deprecated, was used on model", model=self)
+
+ is_sync_save = False
+ if "is_sync_save" in kwargs:
+ is_sync_save = kwargs.pop("is_sync_save")
+
+ is_policy_save = False
+ if "is_policy_save" in kwargs:
+ is_policy_save = kwargs.pop("is_policy_save")
# validate that only synchronizers can write feedback state
@@ -146,16 +173,6 @@
log.error('A non Synchronizer is trying to update fields marked as feedback_state', model=self._dict, feedback_state_fields=self.feedback_state_fields, caller_kind=caller_kind, feedback_changed=feedback_changed)
raise XOSPermissionDenied('A non Synchronizer is trying to update fields marked as feedback_state: %s' % feedback_changed)
- # SMBAKER: if an object is trying to delete itself, or if the observer
- # is updating an object's backend_* fields, then let it slip past the
- # composite key check.
- ignore_composite_key_check=False
- if "update_fields" in kwargs:
- ignore_composite_key_check=True
- for field in kwargs["update_fields"]:
- if not (field in ["backend_register", "backend_status", "deleted", "enacted", "updated"]):
- ignore_composite_key_check=False
-
# Django only enforces field.blank=False during form validation. We'd like it to be enforced when saving the
# model.
for field in self._meta.fields:
@@ -164,17 +181,28 @@
if getattr(self, field.name) == "":
raise XOSValidationError("Blank is not allowed on field %s" % field.name)
- if (caller_kind!="synchronizer") or always_update_timestamp:
+ if (caller_kind != "synchronizer") or always_update_timestamp:
+ # Non-synchronizers update the `updated` timestamp
self.updated = timezone.now()
else:
# We're not auto-setting timestamp, but let's check to make sure that the caller hasn't tried to set our
# timestamp backward...
- if (self.updated != self._initial["updated"]) and ((not kwargs.get("update_fields")) or ("updated" in kwargs.get("update_fields"))):
+ if (self.updated != self._initial["updated"]) and ((not update_fields) or ("updated" in update_fields)):
log.info("Synchronizer tried to change `updated` timestamp on model %s from %s to %s. Ignored." % (self, self._initial["updated"], self.updated))
self.updated = self._initial["updated"]
+ if is_sync_save and self.has_important_changes():
+ self.changed_by_step = timezone.now()
+ if update_fields:
+ update_fields.append("changed_by_step")
+
+ if is_policy_save and self.has_important_changes():
+ self.changed_by_policy = timezone.now()
+ if update_fields:
+ update_fields.append("changed_by_policy")
+
with transaction.atomic():
- self.verify_live_keys(update_fields = kwargs.get("update_fields"))
+ self.verify_live_keys(update_fields = update_fields)
super(XOSBase, self).save(*args, **kwargs)
self.push_redis_event()
diff --git a/xos/coreapi/apihelper.py b/xos/coreapi/apihelper.py
index cef1327..afe50b9 100644
--- a/xos/coreapi/apihelper.py
+++ b/xos/coreapi/apihelper.py
@@ -510,6 +510,10 @@
save_kwargs["caller_kind"] = v
elif k == "always_update_timestamp":
save_kwargs["always_update_timestamp"] = True
+ elif k == "is_sync_save":
+ save_kwargs["is_sync_save"] = True
+ elif k == "is_policy_save":
+ save_kwargs["is_policy_save"] = True
obj.save(**save_kwargs)
@@ -585,14 +589,14 @@
query = self.query_element_to_q(element)
queryset = djangoClass.objects.filter(query)
elif request.kind == request.SYNCHRONIZER_DIRTY_OBJECTS:
- query = (Q(enacted__lt=F('updated')) | Q(enacted=None)) & Q(
- lazy_blocked=False) & Q(no_sync=False)
+ query = (Q(enacted=None) | Q(enacted__lt=F('updated')) | Q(enacted__lt=F('changed_by_policy'))) \
+ & Q(lazy_blocked=False) & Q(no_sync=False)
queryset = djangoClass.objects.filter(query)
elif request.kind == request.SYNCHRONIZER_DELETED_OBJECTS:
queryset = djangoClass.deleted_objects.all()
elif request.kind == request.SYNCHRONIZER_DIRTY_POLICIES:
- query = (Q(policed__lt=F('updated')) | Q(
- policed=None)) & Q(no_policy=False)
+ query = (Q(policed=None) | Q(policed__lt=F('updated')) | Q(policed__lt=F('changed_by_step'))) \
+ & Q(no_policy=False)
queryset = djangoClass.objects.filter(query)
elif request.kind == request.SYNCHRONIZER_DELETED_POLICIES:
query = Q(policed__lt=F('updated')) | Q(policed=None)
diff --git a/xos/synchronizers/new_base/backend.py b/xos/synchronizers/new_base/backend.py
index 04cd41f..ee80f2b 100644
--- a/xos/synchronizers/new_base/backend.py
+++ b/xos/synchronizers/new_base/backend.py
@@ -137,6 +137,7 @@
if policies_dir:
policy_engine = XOSPolicyEngine(policies_dir=policies_dir, log = self.log)
model_policy_thread = threading.Thread(target=policy_engine.run, name="policy_engine")
+ model_policy_thread.is_policy_thread=True
model_policy_thread.start()
else:
self.log.info("Skipping model policies thread due to no model_policies dir.")
diff --git a/xos/synchronizers/new_base/event_loop.py b/xos/synchronizers/new_base/event_loop.py
index 54d3fef..4f44cc9 100644
--- a/xos/synchronizers/new_base/event_loop.py
+++ b/xos/synchronizers/new_base/event_loop.py
@@ -210,8 +210,6 @@
if step is None:
raise ExternalDependencyFailed
- new_enacted = model_accessor.now()
-
# Mark this as an object that will require delete. Do
# this now rather than after the syncstep,
if not (o.backend_need_delete):
@@ -230,7 +228,8 @@
model_accessor.update_diag(
syncrecord_start=time.time(), backend_status="Synced Record", backend_code=1)
- o.enacted = new_enacted
+
+ o.enacted = max(o.updated, o.changed_by_policy)
scratchpad = {'next_run': 0, 'exponent': 0,
'last_success': time.time()}
o.backend_register = json.dumps(scratchpad)
@@ -245,7 +244,7 @@
step.after_sync_save(o)
step.log = self.log
- log.info("Saved sync object, new enacted", enacted=new_enacted)
+ log.info("Saved sync object", o=o)
""" This function needs a cleanup. FIXME: Rethink backend_status, backend_register """
@@ -331,6 +330,7 @@
pass
def sync_cohort(self, cohort, deletion):
+ threading.current_thread().is_sync_thread=True
log = self.log.bind(thread_id=threading.current_thread().ident)
try:
start_time = time.time()
diff --git a/xos/synchronizers/new_base/model_policy_loop.py b/xos/synchronizers/new_base/model_policy_loop.py
index b6f5ec4..14ca39c 100644
--- a/xos/synchronizers/new_base/model_policy_loop.py
+++ b/xos/synchronizers/new_base/model_policy_loop.py
@@ -111,7 +111,6 @@
# These are the models whose children get deleted when they are
delete_policy_models = ['Slice','Instance','Network']
sender_name = getattr(instance, "model_name", instance.__class__.__name__)
- new_policed = model_accessor.now()
#if (action != "deleted"):
# walk_inv_deps(self.update_dep, instance)
@@ -140,13 +139,16 @@
if not policies_failed:
try:
- instance.policed=new_policed
+ instance.policed = max(instance.updated, instance.changed_by_step)
instance.policy_status = "done"
instance.policy_code = 1
+
instance.save(update_fields=['policed', 'policy_status', 'policy_code'])
if hasattr(policy, "after_policy_save"):
policy().after_policy_save(instance)
+
+ log.info("MODEL_POLICY: Saved", o=instance)
except:
log.exception('MODEL POLICY: Object failed to update policed timestamp', instance =instance)
diff --git a/xos/xos_client/xosapi/orm.py b/xos/xos_client/xosapi/orm.py
index 32f8128..d0a2a31 100644
--- a/xos/xos_client/xosapi/orm.py
+++ b/xos/xos_client/xosapi/orm.py
@@ -37,6 +37,7 @@
import os
import sys
+import threading
import time
import imp
from xosconfig import Config
@@ -275,7 +276,7 @@
self.cache.clear()
self.reverse_cache.clear()
- def save(self, update_fields=None, always_update_timestamp=False):
+ def save(self, update_fields=None, always_update_timestamp=False, is_sync_save=False, is_policy_save=False):
if self.is_new:
new_class = self.stub.invoke("Create%s" % self._wrapped_class.__class__.__name__, self._wrapped_class)
self._wrapped_class = new_class
@@ -286,6 +287,10 @@
metadata.append( ("update_fields", ",".join(update_fields)) )
if always_update_timestamp:
metadata.append( ("always_update_timestamp", "1") )
+ if is_policy_save:
+ metadata.append( ("is_policy_save", "1") )
+ if is_sync_save:
+ metadata.append( ("is_sync_save", "1") )
self.stub.invoke("Update%s" % self._wrapped_class.__class__.__name__, self._wrapped_class, metadata=metadata)
self.do_post_save_fixups()
@@ -553,6 +558,14 @@
def add_default_metadata(self, metadata):
default_metadata = [ ("caller_kind", self.caller_kind) ]
+ # introspect to see if we're running from a synchronizer thread
+ if getattr(threading.current_thread(), "is_sync_thread", False):
+ default_metadata.append( ("is_sync_save", "1") )
+
+ # introspect to see if we're running from a model_policy thread
+ if getattr(threading.current_thread(), "is_policy_thread", False):
+ default_metadata.append( ("is_policy_save", "1") )
+
# build up a list of metadata keys we already have
md_keys=[x[0] for x in metadata]