| # Copyright 2017-present Open Networking Foundation |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # TODO: |
| # Add unit tests: |
| # - 2 sets of Instance, ControllerSlice, ControllerNetworks - delete and create case |
| |
| import time |
| import threading |
| import json |
| |
| from collections import defaultdict |
| from networkx import ( |
| DiGraph, |
| weakly_connected_component_subgraphs, |
| all_shortest_paths, |
| NetworkXNoPath, |
| ) |
| from networkx.algorithms.dag import topological_sort |
| |
| from xossynchronizer.steps.syncstep import InnocuousException, DeferredException, SyncStep |
| |
| from xosconfig import Config |
| from multistructlog import create_logger |
| |
# Module-level structured logger, configured from the "logging" section of the
# XOS configuration. It is also the default value of XOSObserver's ``log``
# constructor argument.
log = create_logger(Config().get("logging"))
| |
| |
class StepNotReady(Exception):
    """Signals that a sync step is not ready to run.

    NOTE(review): nothing visible in this module raises or catches this
    exception; presumably it is kept for code defined elsewhere -- confirm
    before removing.
    """
| |
| |
class ExternalDependencyFailed(Exception):
    """Raised when an object cannot be processed by this synchronizer.

    XOSObserver.sync_record() and XOSObserver.delete_record() raise it when the
    object carries no synchronizer step; XOSObserver.sync_cohort() catches it
    and marks the remaining objects of the cohort with a dependency error.
    """
| |
| |
| # FIXME: Move drivers into a context shared across sync steps. |
| |
| |
class NoOpDriver:
    """Default driver used until set_driver() installs a different one.

    It only exposes the two attributes set below: ``enabled`` (always True)
    and ``dependency_graph`` (always None).
    """

    def __init__(self):
        # No dependency graph is attached to the default driver.
        self.dependency_graph = None
        # XOSObserver.run() refuses to start when the driver is not enabled.
        self.enabled = True
| |
| |
# Everyone gets NoOpDriver by default. To use a different driver, call
# set_driver() below.
DRIVER = NoOpDriver()

# Edge types reported while checking object dependencies:
# DIRECT_EDGE - the accessor yielded a single object.
# PROXY_EDGE  - the accessor yielded a collection (it exposes ``.all()``).
DIRECT_EDGE = 1
PROXY_EDGE = 2
| |
| |
def set_driver(x):
    """Install ``x`` as the module-wide DRIVER.

    XOSObserver copies DRIVER into ``self.driver`` when it is constructed, so
    only observers created after this call pick up the new driver.
    """
    globals()["DRIVER"] = x
| |
| |
| class XOSObserver(object): |
| sync_steps = [] |
| |
def __init__(self, sync_steps, model_accessor, log=log):
    """Index the sync steps, build the dependency graphs and set up wake-up state.

    sync_steps: sync step classes handled by this observer.
    model_accessor: object through which all model/ORM access is made.
    log: structured logger; defaults to the module-level logger.
    """
    self.log = log
    self.model_accessor = model_accessor

    # load_sync_steps() fills step_lookup (and builds model_to_step) from
    # sync_steps, so both attributes must exist before it is called.
    self.sync_steps = sync_steps
    self.step_lookup = {}
    self.load_sync_steps()

    # Reads model_to_step, hence it runs after load_sync_steps().
    self.load_dependency_graph()

    # Condition shared by wait_for_event() and wake_up().
    self.event_cond = threading.Condition()

    self.observer_name = Config.get("name")
    # Snapshot of the module-wide driver at construction time.
    self.driver = DRIVER
| |
| def wait_for_event(self, timeout): |
| self.event_cond.acquire() |
| self.event_cond.wait(timeout) |
| self.event_cond.release() |
| |
| def wake_up(self): |
| self.log.debug("Wake up routine called") |
| self.event_cond.acquire() |
| self.event_cond.notify() |
| self.event_cond.release() |
| |
def load_dependency_graph(self):
    """Build the model dependency graphs and store them on the observer.

    Static dependencies are read as JSON from the file named by the
    "dependency_graph" configuration entry (an empty graph when unset) and
    combined with the dependencies from compute_service_dependencies().
    When both define the same model, the computed entry replaces the static
    one.

    Sets ``self.model_dependency_graph`` to a dict keyed by the ``deletion``
    flag: False maps to the forward graph, True to its reversed copy.

    Raises: any exception hit while loading is logged and re-raised.
    """
    try:
        graph_path = Config.get("dependency_graph")
        if graph_path:
            self.log.trace("Loading model dependency graph", path=graph_path)
            # Context manager so the file handle is closed deterministically.
            with open(graph_path) as graph_file:
                dep_graph_str = graph_file.read()
        else:
            self.log.trace("Using default model dependency graph", graph={})
            dep_graph_str = "{}"

        # joint_dependencies is of the form { Model1 -> [(Model2, src_port, dst_port), ...] }
        # src_port is the field that accesses Model2 from Model1
        # dst_port is the field that accesses Model1 from Model2
        static_dependencies = json.loads(dep_graph_str)
        dynamic_dependencies = self.compute_service_dependencies()

        # items() is a view on Python 3 and cannot be concatenated with a
        # list; materialise it first (harmless on Python 2).
        joint_dependencies = dict(
            list(static_dependencies.items()) + dynamic_dependencies
        )

        model_dependency_graph = DiGraph()
        for src_model, deps in joint_dependencies.items():
            for dst_model, src_accessor, dst_accessor in deps:
                if src_model == dst_model:
                    # Self-dependencies carry no ordering information.
                    continue
                # Attributes go in as keywords: networkx >= 2.0 no longer
                # accepts the attribute dict as a third positional argument.
                model_dependency_graph.add_edge(
                    src_model,
                    dst_model,
                    src_accessor=src_accessor,
                    dst_accessor=dst_accessor,
                )

        model_dependency_graph_rev = model_dependency_graph.reverse(copy=True)
        self.model_dependency_graph = {
            # deletion
            True: model_dependency_graph_rev,
            False: model_dependency_graph,
        }
        self.log.trace("Loaded dependencies", edges=model_dependency_graph.edges())
    except Exception as e:
        self.log.exception("Error loading dependency graph", e=e)
        raise e
| |
| def load_sync_steps(self): |
| model_to_step = defaultdict(list) |
| external_dependencies = [] |
| |
| for s in self.sync_steps: |
| if not isinstance(s.observes, list): |
| observes = [s.observes] |
| else: |
| observes = s.observes |
| for m in observes: |
| if isinstance(m, str): |
| # observes is a string that names the model |
| model_to_step[m].append(s.__name__) |
| else: |
| # observes is the model class |
| model_to_step[m.__name__].append(s.__name__) |
| |
| try: |
| external_dependencies.extend(s.external_dependencies) |
| except AttributeError: |
| pass |
| |
| self.step_lookup[s.__name__] = s |
| |
| self.model_to_step = model_to_step |
| self.external_dependencies = list(set(external_dependencies)) |
| self.log.info( |
| "Loaded external dependencies", external_dependencies=external_dependencies |
| ) |
| self.log.info("Loaded model_map", **model_to_step) |
| |
| def reset_model_accessor(self, o=None): |
| try: |
| self.model_accessor.reset_queries() |
| except BaseException: |
| # this shouldn't happen, but in case it does, catch it... |
| if o: |
| logdict = o.tologdict() |
| else: |
| logdict = {} |
| |
| self.log.error("exception in reset_queries", **logdict) |
| |
| def delete_record(self, o, dr_log=None): |
| |
| if dr_log is None: |
| dr_log = self.log |
| |
| if getattr(o, "backend_need_reap", False): |
| # the object has already been deleted and marked for reaping |
| self.model_accessor.journal_object(o, "syncstep.call.already_marked_reap") |
| else: |
| step = getattr(o, "synchronizer_step", None) |
| if not step: |
| raise ExternalDependencyFailed |
| |
| self.model_accessor.journal_object(o, "syncstep.call.delete_record") |
| |
| dr_log.debug("Deleting object", **o.tologdict()) |
| |
| step.log = dr_log.new(step=step) |
| step.delete_record(o) |
| step.log = dr_log |
| |
| dr_log.debug("Deleted object", **o.tologdict()) |
| |
| self.model_accessor.journal_object(o, "syncstep.call.delete_set_reap") |
| o.backend_need_reap = True |
| o.save(update_fields=["backend_need_reap"]) |
| |
| def sync_record(self, o, sr_log=None): |
| try: |
| step = o.synchronizer_step |
| except AttributeError: |
| step = None |
| |
| if step is None: |
| raise ExternalDependencyFailed |
| |
| if sr_log is None: |
| sr_log = self.log |
| |
| # Mark this as an object that will require delete. Do |
| # this now rather than after the syncstep, |
| if not (o.backend_need_delete): |
| o.backend_need_delete = True |
| o.save(update_fields=["backend_need_delete"]) |
| |
| self.model_accessor.journal_object(o, "syncstep.call.sync_record") |
| |
| sr_log.debug("Syncing object", **o.tologdict()) |
| |
| step.log = sr_log.new(step=step) |
| step.sync_record(o) |
| step.log = sr_log |
| |
| sr_log.debug("Synced object", **o.tologdict()) |
| |
| o.enacted = max(o.updated, o.changed_by_policy) |
| scratchpad = {"next_run": 0, "exponent": 0, "last_success": time.time()} |
| o.backend_register = json.dumps(scratchpad) |
| o.backend_status = "OK" |
| o.backend_code = 1 |
| self.model_accessor.journal_object(o, "syncstep.call.save_update") |
| o.save( |
| update_fields=[ |
| "enacted", |
| "backend_status", |
| "backend_register", |
| "backend_code", |
| ] |
| ) |
| |
| if hasattr(step, "after_sync_save"): |
| step.log = sr_log.new(step=step) |
| step.after_sync_save(o) |
| step.log = sr_log |
| |
| sr_log.info("Saved sync object", o=o) |
| |
| """ This function needs a cleanup. FIXME: Rethink backend_status, backend_register """ |
| |
def handle_sync_exception(self, o, e):
    """Record a failed sync/delete of ``o`` and describe it for dependents.

    o: the object whose step raised.
    e: the exception that was raised.

    Returns ``(dependency_error, code)``: a message naming the failed object
    and the backend code stored on it -- 1 for InnocuousException, 0 for
    DeferredException, 2 for anything else.
    """
    self.log.exception("sync step failed!", e=e, **o.tologdict())
    # The stored code is read here but not consulted: the new code depends
    # only on the exception type.
    previous_code = o.backend_code

    status = str(e.message) if hasattr(e, "message") else str(e)

    if isinstance(e, InnocuousException):
        code = 1
    else:
        # NOTE if the synchronization is Deferred it means that synchronization is still in progress
        code = 0 if isinstance(e, DeferredException) else 2

    self.set_object_error(o, status, code)

    failed_model = o.leaf_model_name
    failed_id = o.id
    dependency_error = "Failed due to error in model %s id %d: %s" % (
        failed_model,
        failed_id,
        status,
    )
    return dependency_error, code
| |
| def set_object_error(self, o, status, code): |
| if o.backend_status: |
| error_list = o.backend_status.split(" // ") |
| else: |
| error_list = [] |
| |
| if status not in error_list: |
| error_list.append(status) |
| |
| # Keep last two errors |
| error_list = error_list[-2:] |
| |
| o.backend_code = code |
| o.backend_status = " // ".join(error_list) |
| |
| try: |
| scratchpad = json.loads(o.backend_register) |
| scratchpad["exponent"] |
| except BaseException: |
| scratchpad = { |
| "next_run": 0, |
| "exponent": 0, |
| "last_success": time.time(), |
| "failures": 0, |
| } |
| |
| # Second failure |
| if scratchpad["exponent"]: |
| if code == 1: |
| delay = scratchpad["exponent"] * 60 # 1 minute |
| else: |
| delay = scratchpad["exponent"] * 600 # 10 minutes |
| |
| # cap delays at 8 hours |
| if delay > 8 * 60 * 60: |
| delay = 8 * 60 * 60 |
| scratchpad["next_run"] = time.time() + delay |
| |
| scratchpad["exponent"] += 1 |
| |
| try: |
| scratchpad["failures"] += 1 |
| except KeyError: |
| scratchpad["failures"] = 1 |
| |
| scratchpad["last_failure"] = time.time() |
| |
| o.backend_register = json.dumps(scratchpad) |
| |
| # TOFIX: |
| # DatabaseError: value too long for type character varying(140) |
| if self.model_accessor.obj_exists(o): |
| try: |
| o.backend_status = o.backend_status[:1024] |
| o.save( |
| update_fields=["backend_status", "backend_register"], |
| always_update_timestamp=True, |
| ) |
| except BaseException as e: |
| self.log.exception("Could not update backend status field!", e=e) |
| pass |
| |
| def sync_cohort(self, cohort, deletion): |
| threading.current_thread().is_sync_thread = True |
| |
| sc_log = self.log.new(thread_id=threading.current_thread().ident) |
| |
| try: |
| start_time = time.time() |
| sc_log.debug("Starting to work on cohort", cohort=cohort, deletion=deletion) |
| |
| cohort_emptied = False |
| dependency_error = None |
| dependency_error_code = None |
| |
| itty = iter(cohort) |
| |
| while not cohort_emptied: |
| try: |
| self.reset_model_accessor() |
| o = next(itty) |
| |
| if dependency_error: |
| self.set_object_error( |
| o, dependency_error, dependency_error_code |
| ) |
| continue |
| |
| try: |
| if deletion: |
| self.delete_record(o, sc_log) |
| else: |
| self.sync_record(o, sc_log) |
| except ExternalDependencyFailed: |
| dependency_error = ( |
| "External dependency on object %s id %d not met" |
| % (o.leaf_model_name, o.id) |
| ) |
| dependency_error_code = 1 |
| except (DeferredException, InnocuousException, Exception) as e: |
| dependency_error, dependency_error_code = self.handle_sync_exception( |
| o, e |
| ) |
| |
| except StopIteration: |
| sc_log.debug("Cohort completed", cohort=cohort, deletion=deletion) |
| cohort_emptied = True |
| finally: |
| self.reset_model_accessor() |
| self.model_accessor.connection_close() |
| |
| def tenant_class_name_from_service(self, service_name): |
| """ This code supports legacy functionality. To be cleaned up. """ |
| name1 = service_name + "Instance" |
| if hasattr(self.model_accessor.Slice().stub, name1): |
| return name1 |
| else: |
| name2 = service_name.replace("Service", "Tenant") |
| if hasattr(self.model_accessor.Slice().stub, name2): |
| return name2 |
| else: |
| return None |
| |
| def compute_service_dependencies(self): |
| """ FIXME: Implement more cleanly via xproto """ |
| |
| model_names = self.model_to_step.keys() |
| ugly_tuples = [ |
| (m, m.replace("Instance", "").replace("Tenant", "Service")) |
| for m in model_names |
| if m.endswith("ServiceInstance") or m.endswith("Tenant") |
| ] |
| ugly_rtuples = [(v, k) for k, v in ugly_tuples] |
| |
| ugly_map = dict(ugly_tuples) |
| ugly_rmap = dict(ugly_rtuples) |
| |
| s_model_names = [v for k, v in ugly_tuples] |
| s_models0 = [ |
| getattr(self.model_accessor.Slice().stub, model_name, None) for model_name in s_model_names |
| ] |
| s_models1 = [model.objects.first() for model in s_models0] |
| s_models = [m for m in s_models1 if m is not None] |
| |
| dependencies = [] |
| for model in s_models: |
| deps = self.model_accessor.ServiceDependency.objects.filter(subscriber_service_id=model.id) |
| if deps: |
| services = [ |
| self.tenant_class_name_from_service( |
| d.provider_service.leaf_model_name |
| ) |
| for d in deps |
| ] |
| dependencies.append( |
| (ugly_rmap[model.leaf_model_name], [(s, "", "") for s in services]) |
| ) |
| |
| return dependencies |
| |
| def compute_service_instance_dependencies(self, objects): |
| link_set = [ |
| self.model_accessor.ServiceInstanceLink.objects.filter(subscriber_service_instance_id=o.id) |
| for o in objects |
| ] |
| |
| dependencies = [ |
| (l.provider_service_instance, l.subscriber_service_instance) |
| for links in link_set |
| for l in links |
| ] |
| providers = [] |
| |
| for p, s in dependencies: |
| if not p.enacted or p.enacted < p.updated: |
| p.dependent = s |
| providers.append(p) |
| |
| return providers |
| |
| def run(self): |
| # Cleanup: Move self.driver into a synchronizer context |
| # made available to every sync step. |
| if not self.driver.enabled: |
| self.log.warning("Driver is not enabled. Not running sync steps.") |
| return |
| |
| while True: |
| self.log.trace("Waiting for event or timeout") |
| self.wait_for_event(timeout=5) |
| self.log.trace("Synchronizer awake") |
| |
| self.run_once() |
| |
| def fetch_pending(self, deletion=False): |
| unique_model_list = list(set(self.model_to_step.keys())) |
| pending_objects = [] |
| pending_steps = [] |
| step_list = self.step_lookup.values() |
| |
| for e in self.external_dependencies: |
| s = SyncStep |
| if isinstance(e,str): |
| # external dependency is a string that names a model class |
| s.observes = self.model_accessor.get_model_class(e) |
| else: |
| # external dependency is a model class |
| s.observes = e |
| step_list.append(s) |
| |
| for step_class in step_list: |
| step = step_class(driver=self.driver, model_accessor=self.model_accessor) |
| step.log = self.log.new(step=step) |
| |
| if not hasattr(step, "call"): |
| pending = step.fetch_pending(deletion) |
| for obj in pending: |
| step = step_class(driver=self.driver, model_accessor=self.model_accessor) |
| step.log = self.log.new(step=step) |
| obj.synchronizer_step = step |
| |
| pending_service_dependencies = self.compute_service_instance_dependencies( |
| pending |
| ) |
| |
| for obj in pending_service_dependencies: |
| obj.synchronizer_step = None |
| |
| pending_objects.extend(pending) |
| pending_objects.extend(pending_service_dependencies) |
| else: |
| # Support old and broken legacy synchronizers |
| # This needs to be dropped soon. |
| pending_steps.append(step) |
| |
| self.log.trace( |
| "Fetched pending data", |
| pending_objects=pending_objects, |
| legacy_steps=pending_steps, |
| ) |
| return pending_objects, pending_steps |
| |
| def linked_objects(self, o): |
| if o is None: |
| return [], None |
| try: |
| o_lst = [o for o in o.all()] |
| edge_type = PROXY_EDGE |
| except (AttributeError, TypeError): |
| o_lst = [o] |
| edge_type = DIRECT_EDGE |
| return o_lst, edge_type |
| |
| """ Automatically test if a real dependency path exists between two objects. e.g. |
| given an Instance, and a ControllerSite, the test amounts to: |
| instance.slice.site == controller.site |
| |
| Then the two objects are related, and should be put in the same cohort. |
| If the models of the two objects are not dependent, then the check trivially |
| returns False. |
| """ |
| |
| def same_object(self, o1, o2): |
| if not o1 or not o2: |
| return False, None |
| |
| o1_lst, edge_type = self.linked_objects(o1) |
| |
| try: |
| found = next( |
| obj |
| for obj in o1_lst |
| if obj.leaf_model_name == o2.leaf_model_name and obj.pk == o2.pk |
| ) |
| except AttributeError as e: |
| self.log.exception("Compared objects could not be identified", e=e) |
| raise e |
| except StopIteration: |
| # This is a temporary workaround to establish dependencies between |
| # deleted proxy objects. A better solution would be for the ORM to |
| # return the set of deleted objects via foreign keys. At that point, |
| # the following line would change back to found = False |
| # - Sapan |
| |
| found = getattr(o2, "deleted", False) |
| |
| return found, edge_type |
| |
def concrete_path_exists(self, o1, o2):
    """Check whether ``o1`` is actually linked to ``o2`` through model accessors.

    Returns ``(verdict, edge_type)``. ``verdict`` is truthy when a link was
    found, or when it could not be ruled out; ``edge_type`` is DIRECT_EDGE or
    PROXY_EDGE, or None when the verdict is False.

    The check walks every shortest path between the two models in the
    (non-deletion) model dependency graph, following the ``src_accessor`` of
    each edge from ``o1`` and comparing the object reached with ``o2``.
    """
    try:
        m1 = o1.leaf_model_name
        m2 = o2.leaf_model_name
    except AttributeError:
        # One of the nodes is not in the dependency graph
        # No dependency
        return False, None

    # Service instances are linked through the ``dependent`` attribute set by
    # compute_service_instance_dependencies(), not through the model graph.
    if m1.endswith("ServiceInstance") and m2.endswith("ServiceInstance"):
        return getattr(o2, "dependent", None) == o1, DIRECT_EDGE

    # FIXME: Dynamic dependency check
    G = self.model_dependency_graph[False]
    paths = all_shortest_paths(G, m1, m2)

    try:
        # all_shortest_paths() is lazy: any() pulls from it so that
        # NetworkXNoPath is raised here. The generator is then recreated
        # because any() has already consumed part of it.
        any(paths)
        paths = all_shortest_paths(G, m1, m2)
    except NetworkXNoPath:
        # Easy. The two models are unrelated.
        return False, None

    for p in paths:
        src_object = o1
        edge_type = DIRECT_EDGE

        # Follow the path one edge at a time, starting from o1.
        for i in range(len(p) - 1):
            src = p[i]
            dst = p[i + 1]
            edge_label = G[src][dst]
            sa = edge_label["src_accessor"]
            try:
                dst_accessor = getattr(src_object, sa)
                dst_objects, link_edge_type = self.linked_objects(dst_accessor)
                # One proxy hop is enough to make the whole path a proxy edge.
                if link_edge_type == PROXY_EDGE:
                    edge_type = link_edge_type

                """

                True If no linked objects and deletion
                False If no linked objects
                True If multiple linked objects
                <continue traversal> If single linked object

                """

                if dst_objects == []:
                    # Workaround for ORM not returning linked deleted
                    # objects
                    if o2.deleted:
                        return True, edge_type
                    else:
                        dst_object = None
                elif len(dst_objects) > 1:
                    # Multiple linked objects. Assume anything could be among those multiple objects.
                    raise AttributeError
                else:
                    dst_object = dst_objects[0]
            except AttributeError as e:
                # Reached for a missing accessor, a missing ``deleted``
                # attribute on o2, or the explicit raise above. In every case
                # the objects are assumed to be linked.
                if sa != "fake_accessor":
                    self.log.debug(
                        "Could not check object dependencies, making conservative choice %s",
                        e,
                        src_object=src_object,
                        sa=sa,
                        o1=o1,
                        o2=o2,
                    )
                return True, edge_type

            src_object = dst_object

            # Dead end on this path: nothing further to follow.
            if not src_object:
                break

        # same_object() returns (False, None) when the walk ended on a falsy
        # object, so a dead end falls through to the next path.
        verdict, edge_type = self.same_object(src_object, o2)
        if verdict:
            return verdict, edge_type

        # Otherwise try other paths

    return False, None
| |
| """ |
| |
| This function implements the main scheduling logic |
| of the Synchronizer. It divides incoming work (dirty objects) |
| into cohorts of dependent objects, and runs each such cohort |
| in its own thread. |
| |
| Future work: |
| |
| * Run event thread in parallel to the scheduling thread, and |
| add incoming objects to existing cohorts. Doing so should |
| greatly improve synchronizer performance. |
| * A single object might need to be added to multiple cohorts. |
| In this case, the last cohort handles such an object. |
| * This algorithm is horizontal-scale-ready. Multiple synchronizers |
| could run off a shared runqueue of cohorts. |
| |
| """ |
| |
def compute_dependent_cohorts(self, objects, deletion):
    """Split ``objects`` into independently schedulable, ordered cohorts.

    objects: list of pending objects.
    deletion: True for the deletion pass; the arguments handed to
              concrete_path_exists() are then swapped.

    Returns a list of cohorts, each a list of objects. A cohort is one weakly
    connected component of the object dependency graph, listed in reverse
    topological order, so an object comes after the objects its edges point
    to.

    The networkx calls are written so they work on both the 1.x and the 2.x
    API: edge attributes are passed as keywords and topological_sort() is
    materialised before being reversed.
    """
    n = len(objects)

    oG = DiGraph()
    for i in range(n):
        oG.add_node(i)

    try:
        for i0 in range(n):
            for i1 in range(n):
                if i0 == i1:
                    continue

                if deletion:
                    path_args = (objects[i1], objects[i0])
                else:
                    path_args = (objects[i0], objects[i1])

                is_connected, edge_type = self.concrete_path_exists(*path_args)
                if not is_connected:
                    continue

                try:
                    reverse_type = oG[i1][i0]["type"]
                except KeyError:
                    # No edge in the opposite direction yet.
                    oG.add_edge(i0, i1, type=edge_type)
                else:
                    # An opposite edge exists. A proxy edge is replaced by
                    # this one (keeping the proxy label, as before); any
                    # other edge is left in place.
                    if reverse_type == PROXY_EDGE:
                        oG.remove_edge(i1, i0)
                        oG.add_edge(i0, i1, type=reverse_type)
    except KeyError as e:
        # NOTE(review): a KeyError escaping concrete_path_exists() stops edge
        # discovery for all remaining pairs -- confirm this is intended. It
        # is at least logged now instead of vanishing.
        self.log.debug("Dependency edge discovery stopped early", e=e)

    components = weakly_connected_component_subgraphs(oG)
    # topological_sort() returns a generator on networkx >= 2.0, which
    # reversed() cannot consume directly.
    cohort_indexes = [reversed(list(topological_sort(g))) for g in components]
    cohorts = [
        [objects[i] for i in cohort_index] for cohort_index in cohort_indexes
    ]

    return cohorts
| |
| def run_once(self): |
| self.load_dependency_graph() |
| |
| try: |
| # Why are we checking the DB connection here? |
| self.model_accessor.check_db_connection_okay() |
| |
| loop_start = time.time() |
| |
| # Two passes. One for sync, the other for deletion. |
| for deletion in (False, True): |
| objects_to_process = [] |
| |
| objects_to_process, steps_to_process = self.fetch_pending(deletion) |
| dependent_cohorts = self.compute_dependent_cohorts( |
| objects_to_process, deletion |
| ) |
| |
| threads = [] |
| self.log.trace("In run once inner loop", deletion=deletion) |
| |
| for cohort in dependent_cohorts: |
| thread = threading.Thread( |
| target=self.sync_cohort, |
| name="synchronizer", |
| args=(cohort, deletion), |
| ) |
| |
| threads.append(thread) |
| |
| # Start threads |
| for t in threads: |
| t.start() |
| |
| self.reset_model_accessor() |
| |
| # Wait for all threads to finish before continuing with the run |
| # loop |
| for t in threads: |
| t.join() |
| |
| # Run legacy synchronizers, which do everything in call() |
| for step in steps_to_process: |
| try: |
| step.call(deletion=deletion) |
| except Exception as e: |
| self.log.exception("Legacy step failed", step=step, e=e) |
| |
| loop_end = time.time() |
| |
| except Exception as e: |
| self.log.exception( |
| "Core error. This seems like a misconfiguration or bug. This error will not be relayed to the user!", |
| e=e, |
| ) |
| self.log.error("Exception in observer run loop") |