SEBA-405 Convert synchronizer framework to library
Change-Id: If8562f23dc15c7d18d7a8b040b33756708b3c5ec
diff --git a/lib/xos-synchronizer/xossynchronizer/event_loop.py b/lib/xos-synchronizer/xossynchronizer/event_loop.py
new file mode 100644
index 0000000..96ce727
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/event_loop.py
@@ -0,0 +1,797 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# TODO:
+# Add unit tests:
+# - 2 sets of Instance, ControllerSlice, ControllerNetworks - delete and create case
+
+import time
+import threading
+import json
+
+from collections import defaultdict
+from networkx import (
+ DiGraph,
+ weakly_connected_component_subgraphs,
+ all_shortest_paths,
+ NetworkXNoPath,
+)
+from networkx.algorithms.dag import topological_sort
+
+from xossynchronizer.steps.syncstep import InnocuousException, DeferredException, SyncStep
+from xossynchronizer.modelaccessor import *
+
+from xosconfig import Config
+from multistructlog import create_logger
+
+log = create_logger(Config().get("logging"))
+
+
+class StepNotReady(Exception):
+ pass
+
+
+class ExternalDependencyFailed(Exception):
+ pass
+
+
+# FIXME: Move drivers into a context shared across sync steps.
+
+
+class NoOpDriver:
+ def __init__(self):
+ self.enabled = True
+ self.dependency_graph = None
+
+
+# Everyone gets NoOpDriver by default. To use a different driver, call
+# set_driver() below.
+DRIVER = NoOpDriver()
+
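+# Edge types recorded when classifying dependencies between objects. DIRECT_EDGE
+# means the path was reached through scalar accessors only; PROXY_EDGE means at
+# least one hop traversed a to-many accessor (see linked_objects below).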
+DIRECT_EDGE = 1
+PROXY_EDGE = 2
+
+
+def set_driver(x):
+ global DRIVER
+ DRIVER = x
+
+
+class XOSObserver(object):
+ sync_steps = []
+
+ def __init__(self, sync_steps, log=log):
+        self.log = log
+
+ self.step_lookup = {}
+ self.sync_steps = sync_steps
+ self.load_sync_steps()
+
+ self.load_dependency_graph()
+
+        # The Condition object via which events are received
+        self.event_cond = threading.Condition()
+
+ self.driver = DRIVER
+ self.observer_name = Config.get("name")
+
+ def wait_for_event(self, timeout):
+ self.event_cond.acquire()
+ self.event_cond.wait(timeout)
+ self.event_cond.release()
+
+ def wake_up(self):
+ self.log.debug("Wake up routine called")
+ self.event_cond.acquire()
+ self.event_cond.notify()
+ self.event_cond.release()
+
+ def load_dependency_graph(self):
+
+ try:
+ if Config.get("dependency_graph"):
+ self.log.trace(
+ "Loading model dependency graph",
+ path=Config.get("dependency_graph"),
+ )
+ dep_graph_str = open(Config.get("dependency_graph")).read()
+ else:
+ self.log.trace("Using default model dependency graph", graph={})
+ dep_graph_str = "{}"
+
+ # joint_dependencies is of the form { Model1 -> [(Model2, src_port, dst_port), ...] }
+ # src_port is the field that accesses Model2 from Model1
+ # dst_port is the field that accesses Model1 from Model2
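+            # An illustrative (hypothetical) entry:
+            #   {"Instance": [["Slice", "slice", "instances"]]}
+            # i.e. Instance depends on Slice, reached via instance.slice, with
+            # slice.instances as the reverse accessor.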
+ static_dependencies = json.loads(dep_graph_str)
+ dynamic_dependencies = self.compute_service_dependencies()
+
+            joint_dependencies = dict(
+                list(static_dependencies.items()) + list(dynamic_dependencies)
+            )
+
+ model_dependency_graph = DiGraph()
+ for src_model, deps in joint_dependencies.items():
+ for dep in deps:
+ dst_model, src_accessor, dst_accessor = dep
+ if src_model != dst_model:
+ edge_label = {
+ "src_accessor": src_accessor,
+ "dst_accessor": dst_accessor,
+ }
+ model_dependency_graph.add_edge(
+ src_model, dst_model, edge_label
+ )
+
+ model_dependency_graph_rev = model_dependency_graph.reverse(copy=True)
+            self.model_dependency_graph = {
+                # keyed by the "deletion" flag: the reversed graph is used when
+                # processing deletions, the forward graph otherwise
+                True: model_dependency_graph_rev,
+                False: model_dependency_graph,
+            }
+ self.log.trace("Loaded dependencies", edges=model_dependency_graph.edges())
+ except Exception as e:
+ self.log.exception("Error loading dependency graph", e=e)
+ raise e
+
+ def load_sync_steps(self):
+ model_to_step = defaultdict(list)
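+        # Maps each observed model name to the names of the steps that sync
+        # it, e.g. (illustrative): {"Instance": ["SyncInstances"]}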
+ external_dependencies = []
+
+ for s in self.sync_steps:
+ if not isinstance(s.observes, list):
+ observes = [s.observes]
+ else:
+ observes = s.observes
+
+ for m in observes:
+ model_to_step[m.__name__].append(s.__name__)
+
+ try:
+ external_dependencies.extend(s.external_dependencies)
+ except AttributeError:
+ pass
+
+ self.step_lookup[s.__name__] = s
+
+ self.model_to_step = model_to_step
+ self.external_dependencies = list(set(external_dependencies))
+ self.log.info(
+ "Loaded external dependencies", external_dependencies=external_dependencies
+ )
+ self.log.info("Loaded model_map", **model_to_step)
+
+ def reset_model_accessor(self, o=None):
+ try:
+ model_accessor.reset_queries()
+ except BaseException:
+ # this shouldn't happen, but in case it does, catch it...
+ if o:
+ logdict = o.tologdict()
+ else:
+ logdict = {}
+
+ self.log.error("exception in reset_queries", **logdict)
+
+ def delete_record(self, o, dr_log=None):
+
+ if dr_log is None:
+ dr_log = self.log
+
+ if getattr(o, "backend_need_reap", False):
+ # the object has already been deleted and marked for reaping
+ model_accessor.journal_object(o, "syncstep.call.already_marked_reap")
+ else:
+ step = getattr(o, "synchronizer_step", None)
+ if not step:
+ raise ExternalDependencyFailed
+
+ model_accessor.journal_object(o, "syncstep.call.delete_record")
+
+ dr_log.debug("Deleting object", **o.tologdict())
+
+ step.log = dr_log.new(step=step)
+ step.delete_record(o)
+ step.log = dr_log
+
+ dr_log.debug("Deleted object", **o.tologdict())
+
+ model_accessor.journal_object(o, "syncstep.call.delete_set_reap")
+ o.backend_need_reap = True
+ o.save(update_fields=["backend_need_reap"])
+
+ def sync_record(self, o, sr_log=None):
+ try:
+ step = o.synchronizer_step
+ except AttributeError:
+ step = None
+
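+        # Objects injected by compute_service_instance_dependencies() carry
+        # synchronizer_step == None; treat them as unmet external dependencies
+        # so that dependent objects in the same cohort are marked accordingly.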
+ if step is None:
+ raise ExternalDependencyFailed
+
+ if sr_log is None:
+ sr_log = self.log
+
+        # Mark this as an object that will require deletion. Do this now,
+        # rather than after the sync step, so the flag is recorded even if
+        # the step fails.
+        if not o.backend_need_delete:
+ o.backend_need_delete = True
+ o.save(update_fields=["backend_need_delete"])
+
+ model_accessor.journal_object(o, "syncstep.call.sync_record")
+
+ sr_log.debug("Syncing object", **o.tologdict())
+
+ step.log = sr_log.new(step=step)
+ step.sync_record(o)
+ step.log = sr_log
+
+ sr_log.debug("Synced object", **o.tologdict())
+
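+        # Record how far the object has been enacted: the most recent of a
+        # direct update and a model-policy change.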
+ o.enacted = max(o.updated, o.changed_by_policy)
+ scratchpad = {"next_run": 0, "exponent": 0, "last_success": time.time()}
+ o.backend_register = json.dumps(scratchpad)
+ o.backend_status = "OK"
+ o.backend_code = 1
+ model_accessor.journal_object(o, "syncstep.call.save_update")
+ o.save(
+ update_fields=[
+ "enacted",
+ "backend_status",
+ "backend_register",
+ "backend_code",
+ ]
+ )
+
+ if hasattr(step, "after_sync_save"):
+ step.log = sr_log.new(step=step)
+ step.after_sync_save(o)
+ step.log = sr_log
+
+ sr_log.info("Saved sync object", o=o)
+
+ """ This function needs a cleanup. FIXME: Rethink backend_status, backend_register """
+
+ def handle_sync_exception(self, o, e):
+ self.log.exception("sync step failed!", e=e, **o.tologdict())
+ current_code = o.backend_code
+
+ if hasattr(e, "message"):
+ status = str(e.message)
+ else:
+ status = str(e)
+
+ if isinstance(e, InnocuousException):
+ code = 1
+ elif isinstance(e, DeferredException):
+            # NOTE: a DeferredException means synchronization is still in progress
+ code = 0
+ else:
+ code = 2
+
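+        # code is now 0 (deferred: still in progress), 1 (innocuous) or 2
+        # (error); set_object_error uses it to choose the backoff interval.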
+ self.set_object_error(o, status, code)
+
+ dependency_error = "Failed due to error in model %s id %d: %s" % (
+ o.leaf_model_name,
+ o.id,
+ status,
+ )
+ return dependency_error, code
+
+ def set_object_error(self, o, status, code):
+ if o.backend_status:
+ error_list = o.backend_status.split(" // ")
+ else:
+ error_list = []
+
+ if status not in error_list:
+ error_list.append(status)
+
+ # Keep last two errors
+ error_list = error_list[-2:]
+
+ o.backend_code = code
+ o.backend_status = " // ".join(error_list)
+
+ try:
+ scratchpad = json.loads(o.backend_register)
+ scratchpad["exponent"]
+ except BaseException:
+ scratchpad = {
+ "next_run": 0,
+ "exponent": 0,
+ "last_success": time.time(),
+ "failures": 0,
+ }
+
+        # An exponent greater than zero means this is at least the second
+        # consecutive failure, so back off before the next attempt.
+ if scratchpad["exponent"]:
+ if code == 1:
+ delay = scratchpad["exponent"] * 60 # 1 minute
+ else:
+ delay = scratchpad["exponent"] * 600 # 10 minutes
+
+ # cap delays at 8 hours
+ if delay > 8 * 60 * 60:
+ delay = 8 * 60 * 60
+ scratchpad["next_run"] = time.time() + delay
+
+ scratchpad["exponent"] += 1
+
+ try:
+ scratchpad["failures"] += 1
+ except KeyError:
+ scratchpad["failures"] = 1
+
+ scratchpad["last_failure"] = time.time()
+
+ o.backend_register = json.dumps(scratchpad)
+
+ # TOFIX:
+ # DatabaseError: value too long for type character varying(140)
+ if model_accessor.obj_exists(o):
+ try:
+ o.backend_status = o.backend_status[:1024]
+ o.save(
+ update_fields=["backend_status", "backend_register"],
+ always_update_timestamp=True,
+ )
+            except BaseException as e:
+                self.log.exception("Could not update backend status field!", e=e)
+
+ def sync_cohort(self, cohort, deletion):
+ threading.current_thread().is_sync_thread = True
+
+ sc_log = self.log.new(thread_id=threading.current_thread().ident)
+
+ try:
+ start_time = time.time()
+ sc_log.debug("Starting to work on cohort", cohort=cohort, deletion=deletion)
+
+ cohort_emptied = False
+ dependency_error = None
+ dependency_error_code = None
+
+ itty = iter(cohort)
+
+ while not cohort_emptied:
+ try:
+ self.reset_model_accessor()
+ o = next(itty)
+
+ if dependency_error:
+ self.set_object_error(
+ o, dependency_error, dependency_error_code
+ )
+ continue
+
+ try:
+ if deletion:
+ self.delete_record(o, sc_log)
+ else:
+ self.sync_record(o, sc_log)
+ except ExternalDependencyFailed:
+ dependency_error = (
+ "External dependency on object %s id %d not met"
+ % (o.leaf_model_name, o.id)
+ )
+ dependency_error_code = 1
+                    except Exception as e:
+                        # DeferredException and InnocuousException are
+                        # distinguished inside handle_sync_exception
+ dependency_error, dependency_error_code = self.handle_sync_exception(
+ o, e
+ )
+
+ except StopIteration:
+ sc_log.debug("Cohort completed", cohort=cohort, deletion=deletion)
+ cohort_emptied = True
+ finally:
+ self.reset_model_accessor()
+ model_accessor.connection_close()
+
+    def tenant_class_name_from_service(self, service_name):
+        """ This code supports legacy functionality. To be cleaned up. """
+        name1 = service_name + "Instance"
+        if hasattr(Slice().stub, name1):
+            return name1
+
+        name2 = service_name.replace("Service", "Tenant")
+        if hasattr(Slice().stub, name2):
+            return name2
+
+        return None
+
+ def compute_service_dependencies(self):
+ """ FIXME: Implement more cleanly via xproto """
+
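+        # Returns entries shaped like the static graph's, e.g. (illustrative):
+        #   [("MyServiceInstance", [("OtherServiceInstance", "", "")])]
+        # The accessor strings are left empty because these dependencies come
+        # from ServiceDependency objects rather than from model fields.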
+ model_names = self.model_to_step.keys()
+ ugly_tuples = [
+ (m, m.replace("Instance", "").replace("Tenant", "Service"))
+ for m in model_names
+ if m.endswith("ServiceInstance") or m.endswith("Tenant")
+ ]
+ ugly_rtuples = [(v, k) for k, v in ugly_tuples]
+
+ ugly_rmap = dict(ugly_rtuples)
+
+ s_model_names = [v for k, v in ugly_tuples]
+ s_models0 = [
+ getattr(Slice().stub, model_name, None) for model_name in s_model_names
+ ]
+ s_models1 = [model.objects.first() for model in s_models0]
+ s_models = [m for m in s_models1 if m is not None]
+
+ dependencies = []
+ for model in s_models:
+ deps = ServiceDependency.objects.filter(subscriber_service_id=model.id)
+ if deps:
+ services = [
+ self.tenant_class_name_from_service(
+ d.provider_service.leaf_model_name
+ )
+ for d in deps
+ ]
+ dependencies.append(
+ (ugly_rmap[model.leaf_model_name], [(s, "", "") for s in services])
+ )
+
+ return dependencies
+
+ def compute_service_instance_dependencies(self, objects):
+ link_set = [
+ ServiceInstanceLink.objects.filter(subscriber_service_instance_id=o.id)
+ for o in objects
+ ]
+
+ dependencies = [
+ (l.provider_service_instance, l.subscriber_service_instance)
+ for links in link_set
+ for l in links
+ ]
+ providers = []
+
+ for p, s in dependencies:
+ if not p.enacted or p.enacted < p.updated:
+ p.dependent = s
+ providers.append(p)
+
+ return providers
+
+ def run(self):
+ # Cleanup: Move self.driver into a synchronizer context
+ # made available to every sync step.
+ if not self.driver.enabled:
+ self.log.warning("Driver is not enabled. Not running sync steps.")
+ return
+
+ while True:
+ self.log.trace("Waiting for event or timeout")
+ self.wait_for_event(timeout=5)
+ self.log.trace("Synchronizer awake")
+
+ self.run_once()
+
+    def fetch_pending(self, deletion=False):
+ pending_objects = []
+ pending_steps = []
+        step_list = list(self.step_lookup.values())
+
+        for e in self.external_dependencies:
+            # Build a distinct step class per external dependency; mutating the
+            # shared SyncStep class would leave every appended entry observing
+            # only the last dependency.
+            s = type("ExternalDependencyStep", (SyncStep,), {"observes": e})
+            step_list.append(s)
+
+ for step_class in step_list:
+ step = step_class(driver=self.driver)
+ step.log = self.log.new(step=step)
+
+ if not hasattr(step, "call"):
+ pending = step.fetch_pending(deletion)
+ for obj in pending:
+ step = step_class(driver=self.driver)
+ step.log = self.log.new(step=step)
+ obj.synchronizer_step = step
+
+ pending_service_dependencies = self.compute_service_instance_dependencies(
+ pending
+ )
+
+ for obj in pending_service_dependencies:
+ obj.synchronizer_step = None
+
+ pending_objects.extend(pending)
+ pending_objects.extend(pending_service_dependencies)
+ else:
+ # Support old and broken legacy synchronizers
+ # This needs to be dropped soon.
+ pending_steps.append(step)
+
+ self.log.trace(
+ "Fetched pending data",
+ pending_objects=pending_objects,
+ legacy_steps=pending_steps,
+ )
+ return pending_objects, pending_steps
+
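+    # For example (illustrative), given an Instance i: linked_objects(i.slice)
+    # returns ([<Slice>], DIRECT_EDGE), while linked_objects(slice.instances)
+    # returns (a list of Instances, PROXY_EDGE).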
+ def linked_objects(self, o):
+ if o is None:
+ return [], None
+ try:
+            o_lst = list(o.all())
+ edge_type = PROXY_EDGE
+ except (AttributeError, TypeError):
+ o_lst = [o]
+ edge_type = DIRECT_EDGE
+ return o_lst, edge_type
+
+ """ Automatically test if a real dependency path exists between two objects. e.g.
+ given an Instance, and a ControllerSite, the test amounts to:
+ instance.slice.site == controller.site
+
+ Then the two objects are related, and should be put in the same cohort.
+ If the models of the two objects are not dependent, then the check trivially
+ returns False.
+ """
+
+ def same_object(self, o1, o2):
+ if not o1 or not o2:
+ return False, None
+
+ o1_lst, edge_type = self.linked_objects(o1)
+
+ try:
+ found = next(
+ obj
+ for obj in o1_lst
+ if obj.leaf_model_name == o2.leaf_model_name and obj.pk == o2.pk
+ )
+ except AttributeError as e:
+ self.log.exception("Compared objects could not be identified", e=e)
+ raise e
+ except StopIteration:
+ # This is a temporary workaround to establish dependencies between
+ # deleted proxy objects. A better solution would be for the ORM to
+ # return the set of deleted objects via foreign keys. At that point,
+ # the following line would change back to found = False
+ # - Sapan
+
+ found = getattr(o2, "deleted", False)
+
+ return found, edge_type
+
+    def concrete_path_exists(self, o1, o2):
+        """ Automatically test whether a real dependency path exists between two
+        objects. e.g. given an Instance and a ControllerSite, the test amounts to:
+
+            instance.slice.site == controller.site
+
+        If so, the two objects are related and should be put in the same cohort.
+        If the models of the two objects are not dependent, then the check
+        trivially returns False.
+        """
+ try:
+ m1 = o1.leaf_model_name
+ m2 = o2.leaf_model_name
+ except AttributeError:
+ # One of the nodes is not in the dependency graph
+ # No dependency
+ return False, None
+
+ if m1.endswith("ServiceInstance") and m2.endswith("ServiceInstance"):
+ return getattr(o2, "dependent", None) == o1, DIRECT_EDGE
+
+ # FIXME: Dynamic dependency check
+ G = self.model_dependency_graph[False]
+ paths = all_shortest_paths(G, m1, m2)
+
+        try:
+            # all_shortest_paths returns a generator; force evaluation so that
+            # NetworkXNoPath is raised here if the models are unrelated, then
+            # recreate the generator for the traversal below.
+            any(paths)
+            paths = all_shortest_paths(G, m1, m2)
+ except NetworkXNoPath:
+ # Easy. The two models are unrelated.
+ return False, None
+
+ for p in paths:
+ src_object = o1
+ edge_type = DIRECT_EDGE
+
+ for i in range(len(p) - 1):
+ src = p[i]
+ dst = p[i + 1]
+ edge_label = G[src][dst]
+ sa = edge_label["src_accessor"]
+ try:
+ dst_accessor = getattr(src_object, sa)
+ dst_objects, link_edge_type = self.linked_objects(dst_accessor)
+ if link_edge_type == PROXY_EDGE:
+ edge_type = link_edge_type
+
+ """
+
+ True If no linked objects and deletion
+ False If no linked objects
+ True If multiple linked objects
+ <continue traversal> If single linked object
+
+ """
+
+ if dst_objects == []:
+ # Workaround for ORM not returning linked deleted
+ # objects
+ if o2.deleted:
+ return True, edge_type
+ else:
+ dst_object = None
+ elif len(dst_objects) > 1:
+ # Multiple linked objects. Assume anything could be among those multiple objects.
+ raise AttributeError
+ else:
+ dst_object = dst_objects[0]
+ except AttributeError as e:
+ if sa != "fake_accessor":
+                            self.log.debug(
+                                "Could not check object dependencies, making conservative choice",
+                                e=e,
+                                src_object=src_object,
+                                sa=sa,
+                                o1=o1,
+                                o2=o2,
+                            )
+ return True, edge_type
+
+ src_object = dst_object
+
+ if not src_object:
+ break
+
+ verdict, edge_type = self.same_object(src_object, o2)
+ if verdict:
+ return verdict, edge_type
+
+ # Otherwise try other paths
+
+ return False, None
+
+ """
+
+ This function implements the main scheduling logic
+ of the Synchronizer. It divides incoming work (dirty objects)
+ into cohorts of dependent objects, and runs each such cohort
+ in its own thread.
+
+ Future work:
+
+ * Run event thread in parallel to the scheduling thread, and
+ add incoming objects to existing cohorts. Doing so should
+ greatly improve synchronizer performance.
+ * A single object might need to be added to multiple cohorts.
+ In this case, the last cohort handles such an object.
+ * This algorithm is horizontal-scale-ready. Multiple synchronizers
+ could run off a shared runqueue of cohorts.
+
+ """
+
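+    # Illustrative example: given dirty objects [site, slice, instance, node],
+    # where instance depends on slice and slice on site while node is
+    # unrelated, compute_dependent_cohorts() returns two cohorts,
+    # [site, slice, instance] and [node], each of which sync_cohort() then
+    # processes in its own thread.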
+    def compute_dependent_cohorts(self, objects, deletion):
+        n = len(objects)
+        r = range(n)
+
+ oG = DiGraph()
+
+ for i in r:
+ oG.add_node(i)
+
+ try:
+ for i0 in range(n):
+ for i1 in range(n):
+ if i0 != i1:
+ if deletion:
+ path_args = (objects[i1], objects[i0])
+ else:
+ path_args = (objects[i0], objects[i1])
+
+ is_connected, edge_type = self.concrete_path_exists(*path_args)
+ if is_connected:
+ try:
+ edge_type = oG[i1][i0]["type"]
+ if edge_type == PROXY_EDGE:
+ oG.remove_edge(i1, i0)
+ oG.add_edge(i0, i1, {"type": edge_type})
+ except KeyError:
+ oG.add_edge(i0, i1, {"type": edge_type})
+ except KeyError:
+ pass
+
+ components = weakly_connected_component_subgraphs(oG)
+ cohort_indexes = [reversed(topological_sort(g)) for g in components]
+ cohorts = [
+ [objects[i] for i in cohort_index] for cohort_index in cohort_indexes
+ ]
+
+ return cohorts
+
+ def run_once(self):
+ self.load_dependency_graph()
+
+ try:
+ # Why are we checking the DB connection here?
+ model_accessor.check_db_connection_okay()
+
+ # Two passes. One for sync, the other for deletion.
+ for deletion in (False, True):
+                objects_to_process, steps_to_process = self.fetch_pending(deletion)
+ dependent_cohorts = self.compute_dependent_cohorts(
+ objects_to_process, deletion
+ )
+
+ threads = []
+ self.log.trace("In run once inner loop", deletion=deletion)
+
+ for cohort in dependent_cohorts:
+ thread = threading.Thread(
+ target=self.sync_cohort,
+ name="synchronizer",
+ args=(cohort, deletion),
+ )
+
+ threads.append(thread)
+
+ # Start threads
+ for t in threads:
+ t.start()
+
+ self.reset_model_accessor()
+
+ # Wait for all threads to finish before continuing with the run
+ # loop
+ for t in threads:
+ t.join()
+
+ # Run legacy synchronizers, which do everything in call()
+ for step in steps_to_process:
+ try:
+ step.call(deletion=deletion)
+ except Exception as e:
+ self.log.exception("Legacy step failed", step=step, e=e)
+
+        except Exception as e:
+            self.log.exception(
+                "Core error in observer run loop. This seems like a misconfiguration or bug. This error will not be relayed to the user!",
+                e=e,
+            )