SEBA-405 Convert synchronizer framework to library

Change-Id: If8562f23dc15c7d18d7a8b040b33756708b3c5ec
diff --git a/lib/xos-synchronizer/xossynchronizer/event_loop.py b/lib/xos-synchronizer/xossynchronizer/event_loop.py
new file mode 100644
index 0000000..96ce727
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/event_loop.py
@@ -0,0 +1,772 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# TODO:
+# Add unit tests:
+# - 2 sets of Instance, ControllerSlice, ControllerNetworks - delete and create case
+
+import time
+import threading
+import json
+
+from collections import defaultdict
+from networkx import (
+    DiGraph,
+    weakly_connected_component_subgraphs,
+    all_shortest_paths,
+    NetworkXNoPath,
+)
+from networkx.algorithms.dag import topological_sort
+
+from xossynchronizer.steps.syncstep import InnocuousException, DeferredException, SyncStep
+from xossynchronizer.modelaccessor import *
+
+from xosconfig import Config
+from multistructlog import create_logger
+
+log = create_logger(Config().get("logging"))
+
+
class StepNotReady(Exception):
    """Raised when a sync step cannot be run yet."""
    pass
+
+
class ExternalDependencyFailed(Exception):
    """Raised when an object has no synchronizer_step of its own.

    Such objects were pulled in only as dependencies of other pending
    objects (see fetch_pending / compute_service_instance_dependencies)
    and cannot be synced or deleted directly.
    """
    pass
+
+
+# FIXME: Move drivers into a context shared across sync steps.
+
+
class NoOpDriver:
    """Default do-nothing driver.

    Reports itself as enabled so the observer's run loop proceeds, and
    carries no dependency graph of its own.
    """

    def __init__(self):
        self.enabled = True
        self.dependency_graph = None
+
+
# Everyone gets NoOpDriver by default. To use a different driver, call
# set_driver() below.
DRIVER = NoOpDriver()

# Edge-type markers recorded while walking object dependencies:
# DIRECT_EDGE for a single linked object, PROXY_EDGE for a link reached
# through a to-many accessor (see XOSObserver.linked_objects()).
DIRECT_EDGE = 1
PROXY_EDGE = 2
+
+
def set_driver(x):
    """Install *x* as the module-wide driver.

    Affects XOSObserver instances constructed after this call; existing
    observers keep the driver they copied in __init__.
    """
    global DRIVER
    DRIVER = x
+
+
class XOSObserver(object):
    """Scheduler for the synchronizer framework.

    Indexes the given sync steps by the models they observe, builds a model
    dependency graph (static JSON plus dynamically computed service
    dependencies), and on each pass groups dirty objects into cohorts of
    dependent objects, syncing each cohort in its own thread.
    """

    # Class-level default; shadowed by the per-instance value set in __init__.
    sync_steps = []

    def __init__(self, sync_steps, log=log):
        self.log = log

        self.step_lookup = {}
        self.sync_steps = sync_steps
        self.load_sync_steps()

        self.load_dependency_graph()

        # The Condition object via which events are received
        self.event_cond = threading.Condition()

        self.driver = DRIVER
        self.observer_name = Config.get("name")
+
+    def wait_for_event(self, timeout):
+        self.event_cond.acquire()
+        self.event_cond.wait(timeout)
+        self.event_cond.release()
+
+    def wake_up(self):
+        self.log.debug("Wake up routine called")
+        self.event_cond.acquire()
+        self.event_cond.notify()
+        self.event_cond.release()
+
+    def load_dependency_graph(self):
+
+        try:
+            if Config.get("dependency_graph"):
+                self.log.trace(
+                    "Loading model dependency graph",
+                    path=Config.get("dependency_graph"),
+                )
+                dep_graph_str = open(Config.get("dependency_graph")).read()
+            else:
+                self.log.trace("Using default model dependency graph", graph={})
+                dep_graph_str = "{}"
+
+            # joint_dependencies is of the form { Model1 -> [(Model2, src_port, dst_port), ...] }
+            # src_port is the field that accesses Model2 from Model1
+            # dst_port is the field that accesses Model1 from Model2
+            static_dependencies = json.loads(dep_graph_str)
+            dynamic_dependencies = self.compute_service_dependencies()
+
+            joint_dependencies = dict(
+                static_dependencies.items() + dynamic_dependencies
+            )
+
+            model_dependency_graph = DiGraph()
+            for src_model, deps in joint_dependencies.items():
+                for dep in deps:
+                    dst_model, src_accessor, dst_accessor = dep
+                    if src_model != dst_model:
+                        edge_label = {
+                            "src_accessor": src_accessor,
+                            "dst_accessor": dst_accessor,
+                        }
+                        model_dependency_graph.add_edge(
+                            src_model, dst_model, edge_label
+                        )
+
+            model_dependency_graph_rev = model_dependency_graph.reverse(copy=True)
+            self.model_dependency_graph = {
+                # deletion
+                True: model_dependency_graph_rev,
+                False: model_dependency_graph,
+            }
+            self.log.trace("Loaded dependencies", edges=model_dependency_graph.edges())
+        except Exception as e:
+            self.log.exception("Error loading dependency graph", e=e)
+            raise e
+
+    def load_sync_steps(self):
+        model_to_step = defaultdict(list)
+        external_dependencies = []
+
+        for s in self.sync_steps:
+            if not isinstance(s.observes, list):
+                observes = [s.observes]
+            else:
+                observes = s.observes
+
+            for m in observes:
+                model_to_step[m.__name__].append(s.__name__)
+
+            try:
+                external_dependencies.extend(s.external_dependencies)
+            except AttributeError:
+                pass
+
+            self.step_lookup[s.__name__] = s
+
+        self.model_to_step = model_to_step
+        self.external_dependencies = list(set(external_dependencies))
+        self.log.info(
+            "Loaded external dependencies", external_dependencies=external_dependencies
+        )
+        self.log.info("Loaded model_map", **model_to_step)
+
    def reset_model_accessor(self, o=None):
        # Clear cached query state between objects so one object's queries
        # cannot leak into the next sync step. *o* is only used for logging
        # context if the reset itself fails.
        try:
            model_accessor.reset_queries()
        except BaseException:
            # this shouldn't happen, but in case it does, catch it...
            if o:
                logdict = o.tologdict()
            else:
                logdict = {}

            self.log.error("exception in reset_queries", **logdict)
+
    def delete_record(self, o, dr_log=None):
        """Run the object's sync step deletion and mark *o* for reaping.

        Raises ExternalDependencyFailed when *o* carries no
        synchronizer_step (it was pulled in only as a dependency).
        """
        if dr_log is None:
            dr_log = self.log

        if getattr(o, "backend_need_reap", False):
            # the object has already been deleted and marked for reaping
            model_accessor.journal_object(o, "syncstep.call.already_marked_reap")
        else:
            step = getattr(o, "synchronizer_step", None)
            if not step:
                raise ExternalDependencyFailed

            model_accessor.journal_object(o, "syncstep.call.delete_record")

            dr_log.debug("Deleting object", **o.tologdict())

            # Temporarily bind a step-scoped logger for the duration of
            # the delete, then restore it.
            step.log = dr_log.new(step=step)
            step.delete_record(o)
            step.log = dr_log

            dr_log.debug("Deleted object", **o.tologdict())

            # Deletion succeeded: flag the object so the reaper removes it.
            model_accessor.journal_object(o, "syncstep.call.delete_set_reap")
            o.backend_need_reap = True
            o.save(update_fields=["backend_need_reap"])
+
    def sync_record(self, o, sr_log=None):
        """Run the object's sync step and record success state on *o*.

        Raises ExternalDependencyFailed when *o* carries no
        synchronizer_step. On success, updates enacted and the backend_*
        bookkeeping fields and invokes the step's optional
        after_sync_save() hook.
        """
        try:
            step = o.synchronizer_step
        except AttributeError:
            step = None

        if step is None:
            raise ExternalDependencyFailed

        if sr_log is None:
            sr_log = self.log

        # Mark this as an object that will require delete. Do
        # this now rather than after the syncstep,
        if not (o.backend_need_delete):
            o.backend_need_delete = True
            o.save(update_fields=["backend_need_delete"])

        model_accessor.journal_object(o, "syncstep.call.sync_record")

        sr_log.debug("Syncing object", **o.tologdict())

        # Temporarily bind a step-scoped logger for the duration of the
        # sync, then restore it.
        step.log = sr_log.new(step=step)
        step.sync_record(o)
        step.log = sr_log

        sr_log.debug("Synced object", **o.tologdict())

        # Record success: enacted tracks the newer of the two change
        # timestamps, and the backoff scratchpad is reset.
        o.enacted = max(o.updated, o.changed_by_policy)
        scratchpad = {"next_run": 0, "exponent": 0, "last_success": time.time()}
        o.backend_register = json.dumps(scratchpad)
        o.backend_status = "OK"
        o.backend_code = 1
        model_accessor.journal_object(o, "syncstep.call.save_update")
        o.save(
            update_fields=[
                "enacted",
                "backend_status",
                "backend_register",
                "backend_code",
            ]
        )

        if hasattr(step, "after_sync_save"):
            step.log = sr_log.new(step=step)
            step.after_sync_save(o)
            step.log = sr_log

        sr_log.info("Saved sync object", o=o)

    """ This function needs a cleanup. FIXME: Rethink backend_status, backend_register """
+
+    def handle_sync_exception(self, o, e):
+        self.log.exception("sync step failed!", e=e, **o.tologdict())
+        current_code = o.backend_code
+
+        if hasattr(e, "message"):
+            status = str(e.message)
+        else:
+            status = str(e)
+
+        if isinstance(e, InnocuousException):
+            code = 1
+        elif isinstance(e, DeferredException):
+            # NOTE if the synchronization is Deferred it means that synchronization is still in progress
+            code = 0
+        else:
+            code = 2
+
+        self.set_object_error(o, status, code)
+
+        dependency_error = "Failed due to error in model %s id %d: %s" % (
+            o.leaf_model_name,
+            o.id,
+            status,
+        )
+        return dependency_error, code
+
+    def set_object_error(self, o, status, code):
+        if o.backend_status:
+            error_list = o.backend_status.split(" // ")
+        else:
+            error_list = []
+
+        if status not in error_list:
+            error_list.append(status)
+
+        # Keep last two errors
+        error_list = error_list[-2:]
+
+        o.backend_code = code
+        o.backend_status = " // ".join(error_list)
+
+        try:
+            scratchpad = json.loads(o.backend_register)
+            scratchpad["exponent"]
+        except BaseException:
+            scratchpad = {
+                "next_run": 0,
+                "exponent": 0,
+                "last_success": time.time(),
+                "failures": 0,
+            }
+
+        # Second failure
+        if scratchpad["exponent"]:
+            if code == 1:
+                delay = scratchpad["exponent"] * 60  # 1 minute
+            else:
+                delay = scratchpad["exponent"] * 600  # 10 minutes
+
+            # cap delays at 8 hours
+            if delay > 8 * 60 * 60:
+                delay = 8 * 60 * 60
+            scratchpad["next_run"] = time.time() + delay
+
+        scratchpad["exponent"] += 1
+
+        try:
+            scratchpad["failures"] += 1
+        except KeyError:
+            scratchpad["failures"] = 1
+
+        scratchpad["last_failure"] = time.time()
+
+        o.backend_register = json.dumps(scratchpad)
+
+        # TOFIX:
+        # DatabaseError: value too long for type character varying(140)
+        if model_accessor.obj_exists(o):
+            try:
+                o.backend_status = o.backend_status[:1024]
+                o.save(
+                    update_fields=["backend_status", "backend_register"],
+                    always_update_timestamp=True,
+                )
+            except BaseException as e:
+                self.log.exception("Could not update backend status field!", e=e)
+                pass
+
+    def sync_cohort(self, cohort, deletion):
+        threading.current_thread().is_sync_thread = True
+
+        sc_log = self.log.new(thread_id=threading.current_thread().ident)
+
+        try:
+            start_time = time.time()
+            sc_log.debug("Starting to work on cohort", cohort=cohort, deletion=deletion)
+
+            cohort_emptied = False
+            dependency_error = None
+            dependency_error_code = None
+
+            itty = iter(cohort)
+
+            while not cohort_emptied:
+                try:
+                    self.reset_model_accessor()
+                    o = next(itty)
+
+                    if dependency_error:
+                        self.set_object_error(
+                            o, dependency_error, dependency_error_code
+                        )
+                        continue
+
+                    try:
+                        if deletion:
+                            self.delete_record(o, sc_log)
+                        else:
+                            self.sync_record(o, sc_log)
+                    except ExternalDependencyFailed:
+                        dependency_error = (
+                            "External dependency on object %s id %d not met"
+                            % (o.leaf_model_name, o.id)
+                        )
+                        dependency_error_code = 1
+                    except (DeferredException, InnocuousException, Exception) as e:
+                        dependency_error, dependency_error_code = self.handle_sync_exception(
+                            o, e
+                        )
+
+                except StopIteration:
+                    sc_log.debug("Cohort completed", cohort=cohort, deletion=deletion)
+                    cohort_emptied = True
+        finally:
+            self.reset_model_accessor()
+            model_accessor.connection_close()
+
+    def tenant_class_name_from_service(self, service_name):
+        """ This code supports legacy functionality. To be cleaned up. """
+        name1 = service_name + "Instance"
+        if hasattr(Slice().stub, name1):
+            return name1
+        else:
+            name2 = service_name.replace("Service", "Tenant")
+            if hasattr(Slice().stub, name2):
+                return name2
+            else:
+                return None
+
+    def compute_service_dependencies(self):
+        """ FIXME: Implement more cleanly via xproto """
+
+        model_names = self.model_to_step.keys()
+        ugly_tuples = [
+            (m, m.replace("Instance", "").replace("Tenant", "Service"))
+            for m in model_names
+            if m.endswith("ServiceInstance") or m.endswith("Tenant")
+        ]
+        ugly_rtuples = [(v, k) for k, v in ugly_tuples]
+
+        ugly_map = dict(ugly_tuples)
+        ugly_rmap = dict(ugly_rtuples)
+
+        s_model_names = [v for k, v in ugly_tuples]
+        s_models0 = [
+            getattr(Slice().stub, model_name, None) for model_name in s_model_names
+        ]
+        s_models1 = [model.objects.first() for model in s_models0]
+        s_models = [m for m in s_models1 if m is not None]
+
+        dependencies = []
+        for model in s_models:
+            deps = ServiceDependency.objects.filter(subscriber_service_id=model.id)
+            if deps:
+                services = [
+                    self.tenant_class_name_from_service(
+                        d.provider_service.leaf_model_name
+                    )
+                    for d in deps
+                ]
+                dependencies.append(
+                    (ugly_rmap[model.leaf_model_name], [(s, "", "") for s in services])
+                )
+
+        return dependencies
+
+    def compute_service_instance_dependencies(self, objects):
+        link_set = [
+            ServiceInstanceLink.objects.filter(subscriber_service_instance_id=o.id)
+            for o in objects
+        ]
+
+        dependencies = [
+            (l.provider_service_instance, l.subscriber_service_instance)
+            for links in link_set
+            for l in links
+        ]
+        providers = []
+
+        for p, s in dependencies:
+            if not p.enacted or p.enacted < p.updated:
+                p.dependent = s
+                providers.append(p)
+
+        return providers
+
+    def run(self):
+        # Cleanup: Move self.driver into a synchronizer context
+        # made available to every sync step.
+        if not self.driver.enabled:
+            self.log.warning("Driver is not enabled. Not running sync steps.")
+            return
+
+        while True:
+            self.log.trace("Waiting for event or timeout")
+            self.wait_for_event(timeout=5)
+            self.log.trace("Synchronizer awake")
+
+            self.run_once()
+
+    def fetch_pending(self, deletion=False):
+        unique_model_list = list(set(self.model_to_step.keys()))
+        pending_objects = []
+        pending_steps = []
+        step_list = self.step_lookup.values()
+
+        for e in self.external_dependencies:
+            s = SyncStep
+            s.observes = e
+            step_list.append(s)
+
+        for step_class in step_list:
+            step = step_class(driver=self.driver)
+            step.log = self.log.new(step=step)
+
+            if not hasattr(step, "call"):
+                pending = step.fetch_pending(deletion)
+                for obj in pending:
+                    step = step_class(driver=self.driver)
+                    step.log = self.log.new(step=step)
+                    obj.synchronizer_step = step
+
+                pending_service_dependencies = self.compute_service_instance_dependencies(
+                    pending
+                )
+
+                for obj in pending_service_dependencies:
+                    obj.synchronizer_step = None
+
+                pending_objects.extend(pending)
+                pending_objects.extend(pending_service_dependencies)
+            else:
+                # Support old and broken legacy synchronizers
+                # This needs to be dropped soon.
+                pending_steps.append(step)
+
+        self.log.trace(
+            "Fetched pending data",
+            pending_objects=pending_objects,
+            legacy_steps=pending_steps,
+        )
+        return pending_objects, pending_steps
+
+    def linked_objects(self, o):
+        if o is None:
+            return [], None
+        try:
+            o_lst = [o for o in o.all()]
+            edge_type = PROXY_EDGE
+        except (AttributeError, TypeError):
+            o_lst = [o]
+            edge_type = DIRECT_EDGE
+        return o_lst, edge_type
+
    """ Automatically test if a real dependency path exists between two objects. e.g.
        given an Instance, and a ControllerSite, the test amounts to:
            instance.slice.site == controller.site

        Then the two objects are related, and should be put in the same cohort.
        If the models of the two objects are not dependent, then the check trivially
        returns False.
    """

    def same_object(self, o1, o2):
        # Returns (found, edge_type): whether o2 appears among the objects
        # reachable through accessor o1, and whether o1 was a to-many
        # accessor (PROXY_EDGE) or a single object (DIRECT_EDGE).
        if not o1 or not o2:
            return False, None

        o1_lst, edge_type = self.linked_objects(o1)

        try:
            found = next(
                obj
                for obj in o1_lst
                if obj.leaf_model_name == o2.leaf_model_name and obj.pk == o2.pk
            )
        except AttributeError as e:
            self.log.exception("Compared objects could not be identified", e=e)
            raise e
        except StopIteration:
            # This is a temporary workaround to establish dependencies between
            # deleted proxy objects. A better solution would be for the ORM to
            # return the set of deleted objects via foreign keys. At that point,
            # the following line would change back to found = False
            # - Sapan

            found = getattr(o2, "deleted", False)

        return found, edge_type
+
    def concrete_path_exists(self, o1, o2):
        """Check whether a concrete dependency path leads from o1 to o2.

        Walks every shortest path between the two objects' models in the
        model dependency graph, following the per-edge accessors object by
        object, and tests whether the traversal ends at o2. Returns
        (found, edge_type); edge_type is PROXY_EDGE when any hop went
        through a to-many accessor, DIRECT_EDGE otherwise, or None when
        the models are unrelated.
        """
        try:
            m1 = o1.leaf_model_name
            m2 = o2.leaf_model_name
        except AttributeError:
            # One of the nodes is not in the dependency graph
            # No dependency
            return False, None

        if m1.endswith("ServiceInstance") and m2.endswith("ServiceInstance"):
            # Service instances are related via the .dependent annotation
            # set by compute_service_instance_dependencies().
            return getattr(o2, "dependent", None) == o1, DIRECT_EDGE

        # FIXME: Dynamic dependency check
        G = self.model_dependency_graph[False]
        paths = all_shortest_paths(G, m1, m2)

        try:
            # all_shortest_paths is lazy: force evaluation (any) so
            # NetworkXNoPath surfaces here, then recreate the generator
            # since any() consumed it.
            any(paths)
            paths = all_shortest_paths(G, m1, m2)
        except NetworkXNoPath:
            # Easy. The two models are unrelated.
            return False, None

        for p in paths:
            src_object = o1
            edge_type = DIRECT_EDGE

            # Follow the accessor chain along this model path, one hop at
            # a time, starting from o1.
            for i in range(len(p) - 1):
                src = p[i]
                dst = p[i + 1]
                edge_label = G[src][dst]
                sa = edge_label["src_accessor"]
                try:
                    dst_accessor = getattr(src_object, sa)
                    dst_objects, link_edge_type = self.linked_objects(dst_accessor)
                    if link_edge_type == PROXY_EDGE:
                        edge_type = link_edge_type

                    """

                    True       			If no linked objects and deletion
                    False      			If no linked objects
                    True       			If multiple linked objects
                    <continue traversal> 	If single linked object

                    """

                    if dst_objects == []:
                        # Workaround for ORM not returning linked deleted
                        # objects
                        if o2.deleted:
                            return True, edge_type
                        else:
                            dst_object = None
                    elif len(dst_objects) > 1:
                        # Multiple linked objects. Assume anything could be among those multiple objects.
                        raise AttributeError
                    else:
                        dst_object = dst_objects[0]
                except AttributeError as e:
                    if sa != "fake_accessor":
                        self.log.debug(
                            "Could not check object dependencies, making conservative choice %s",
                            e,
                            src_object=src_object,
                            sa=sa,
                            o1=o1,
                            o2=o2,
                        )
                    # Conservatively assume a dependency exists.
                    return True, edge_type

                src_object = dst_object

                if not src_object:
                    break

            verdict, edge_type = self.same_object(src_object, o2)
            if verdict:
                return verdict, edge_type

            # Otherwise try other paths

        return False, None
+
+    """
+
+    This function implements the main scheduling logic
+    of the Synchronizer. It divides incoming work (dirty objects)
+    into cohorts of dependent objects, and runs each such cohort
+    in its own thread.
+
+    Future work:
+
+    * Run event thread in parallel to the scheduling thread, and
+      add incoming objects to existing cohorts. Doing so should
+      greatly improve synchronizer performance.
+    * A single object might need to be added to multiple cohorts.
+      In this case, the last cohort handles such an object.
+    * This algorithm is horizontal-scale-ready. Multiple synchronizers
+      could run off a shared runqueue of cohorts.
+
+    """
+
+    def compute_dependent_cohorts(self, objects, deletion):
+        model_map = defaultdict(list)
+        n = len(objects)
+        r = range(n)
+        indexed_objects = zip(r, objects)
+
+        oG = DiGraph()
+
+        for i in r:
+            oG.add_node(i)
+
+        try:
+            for i0 in range(n):
+                for i1 in range(n):
+                    if i0 != i1:
+                        if deletion:
+                            path_args = (objects[i1], objects[i0])
+                        else:
+                            path_args = (objects[i0], objects[i1])
+
+                        is_connected, edge_type = self.concrete_path_exists(*path_args)
+                        if is_connected:
+                            try:
+                                edge_type = oG[i1][i0]["type"]
+                                if edge_type == PROXY_EDGE:
+                                    oG.remove_edge(i1, i0)
+                                    oG.add_edge(i0, i1, {"type": edge_type})
+                            except KeyError:
+                                oG.add_edge(i0, i1, {"type": edge_type})
+        except KeyError:
+            pass
+
+        components = weakly_connected_component_subgraphs(oG)
+        cohort_indexes = [reversed(topological_sort(g)) for g in components]
+        cohorts = [
+            [objects[i] for i in cohort_index] for cohort_index in cohort_indexes
+        ]
+
+        return cohorts
+
+    def run_once(self):
+        self.load_dependency_graph()
+
+        try:
+            # Why are we checking the DB connection here?
+            model_accessor.check_db_connection_okay()
+
+            loop_start = time.time()
+
+            # Two passes. One for sync, the other for deletion.
+            for deletion in (False, True):
+                objects_to_process = []
+
+                objects_to_process, steps_to_process = self.fetch_pending(deletion)
+                dependent_cohorts = self.compute_dependent_cohorts(
+                    objects_to_process, deletion
+                )
+
+                threads = []
+                self.log.trace("In run once inner loop", deletion=deletion)
+
+                for cohort in dependent_cohorts:
+                    thread = threading.Thread(
+                        target=self.sync_cohort,
+                        name="synchronizer",
+                        args=(cohort, deletion),
+                    )
+
+                    threads.append(thread)
+
+                # Start threads
+                for t in threads:
+                    t.start()
+
+                self.reset_model_accessor()
+
+                # Wait for all threads to finish before continuing with the run
+                # loop
+                for t in threads:
+                    t.join()
+
+                # Run legacy synchronizers, which do everything in call()
+                for step in steps_to_process:
+                    try:
+                        step.call(deletion=deletion)
+                    except Exception as e:
+                        self.log.exception("Legacy step failed", step=step, e=e)
+
+            loop_end = time.time()
+
+        except Exception as e:
+            self.log.exception(
+                "Core error. This seems like a misconfiguration or bug. This error will not be relayed to the user!",
+                e=e,
+            )
+            self.log.error("Exception in observer run loop")