| # Copyright 2017-present Open Networking Foundation |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| # TODO: |
| # Add unit tests: |
| # - 2 sets of Instance, ControllerSlice, ControllerNetworks - delete and create case |
| |
| import time |
| import threading |
| import json |
| |
| from collections import defaultdict |
| from networkx import DiGraph, weakly_connected_component_subgraphs, all_shortest_paths, NetworkXNoPath |
| from networkx.algorithms.dag import topological_sort |
| |
| from synchronizers.new_base.steps import * |
| from syncstep import InnocuousException, DeferredException, SyncStep |
| from synchronizers.new_base.modelaccessor import * |
| |
| from xosconfig import Config |
| from multistructlog import create_logger |
| |
| log = create_logger(Config().get('logging')) |
| |
| class StepNotReady(Exception): |
| pass |
| |
| |
| class ExternalDependencyFailed(Exception): |
| pass |
| |
| # FIXME: Move drivers into a context shared across sync steps. |
| |
| |
| class NoOpDriver: |
| def __init__(self): |
| self.enabled = True |
| self.dependency_graph = None |
| |
| |
| # Everyone gets NoOpDriver by default. To use a different driver, call |
| # set_driver() below. |
| DRIVER = NoOpDriver() |
| |
| DIRECT_EDGE = 1 |
| PROXY_EDGE = 2 |
| |
| |
| def set_driver(x): |
| global DRIVER |
| DRIVER = x |
| |
| |
| class XOSObserver(object): |
| sync_steps = [] |
| |
| def __init__(self, sync_steps, log=log): |
| # The Condition object via which events are received |
| self.log = log |
| |
| self.step_lookup = {} |
| self.sync_steps = sync_steps |
| self.load_sync_steps() |
| |
| self.load_dependency_graph() |
| |
| self.event_cond = threading.Condition() |
| |
| self.driver = DRIVER |
| self.observer_name = Config.get("name") |
| |
| def wait_for_event(self, timeout): |
| self.event_cond.acquire() |
| self.event_cond.wait(timeout) |
| self.event_cond.release() |
| |
| def wake_up(self): |
| self.log.debug('Wake up routine called') |
| self.event_cond.acquire() |
| self.event_cond.notify() |
| self.event_cond.release() |
| |
| def load_dependency_graph(self): |
| |
| try: |
| if Config.get("dependency_graph"): |
| self.log.trace('Loading model dependency graph', path=Config.get("dependency_graph")) |
| dep_graph_str = open(Config.get("dependency_graph")).read() |
| else: |
| self.log.trace('Using default model dependency graph', graph={}) |
| dep_graph_str = '{}' |
| |
| # joint_dependencies is of the form { Model1 -> [(Model2, src_port, dst_port), ...] } |
| # src_port is the field that accesses Model2 from Model1 |
| # dst_port is the field that accesses Model1 from Model2 |
| static_dependencies = json.loads(dep_graph_str) |
| dynamic_dependencies = self.compute_service_dependencies() |
| |
| joint_dependencies = dict( |
| static_dependencies.items() + dynamic_dependencies) |
| |
| model_dependency_graph = DiGraph() |
| for src_model, deps in joint_dependencies.items(): |
| for dep in deps: |
| dst_model, src_accessor, dst_accessor = dep |
| if src_model != dst_model: |
| edge_label = {'src_accessor': src_accessor, |
| 'dst_accessor': dst_accessor} |
| model_dependency_graph.add_edge( |
| src_model, dst_model, edge_label) |
| |
| model_dependency_graph_rev = model_dependency_graph.reverse( |
| copy=True) |
| self.model_dependency_graph = { |
| # deletion |
| True: model_dependency_graph_rev, |
| False: model_dependency_graph |
| } |
| self.log.trace( |
| "Loaded dependencies", |
| edges=model_dependency_graph.edges()) |
| except Exception as e: |
| self.log.exception("Error loading dependency graph", e=e) |
| raise e |
| |
| def load_sync_steps(self): |
| model_to_step = defaultdict(list) |
| external_dependencies = [] |
| |
| for s in self.sync_steps: |
| if not isinstance(s.observes, list): |
| observes = [s.observes] |
| else: |
| observes = s.observes |
| |
| for m in observes: |
| model_to_step[m.__name__].append(s.__name__) |
| |
| try: |
| external_dependencies.extend(s.external_dependencies) |
| except AttributeError: |
| pass |
| |
| self.step_lookup[s.__name__] = s |
| |
| self.model_to_step = model_to_step |
| self.external_dependencies = list(set(external_dependencies)) |
| self.log.info( |
| 'Loaded external dependencies', |
| external_dependencies=external_dependencies) |
| self.log.info('Loaded model_map', **model_to_step) |
| |
| def reset_model_accessor(self, o=None): |
| try: |
| model_accessor.reset_queries() |
| except BaseException: |
| # this shouldn't happen, but in case it does, catch it... |
| if (o): |
| logdict = o.tologdict() |
| else: |
| logdict = {} |
| |
| self.log.error("exception in reset_queries", **logdict) |
| |
| def delete_record(self, o, dr_log = None): |
| |
| if dr_log is None: |
| dr_log = self.log |
| |
| if getattr(o, "backend_need_reap", False): |
| # the object has already been deleted and marked for reaping |
| model_accessor.journal_object( |
| o, "syncstep.call.already_marked_reap") |
| else: |
| step = getattr(o, 'synchronizer_step', None) |
| if not step: |
| raise ExternalDependencyFailed |
| |
| model_accessor.journal_object(o, "syncstep.call.delete_record") |
| |
| dr_log.debug("Deleting object", **o.tologdict()) |
| |
| step.log = dr_log.new(step=step) |
| step.delete_record(o) |
| step.log = dr_log |
| |
| dr_log.debug("Deleted object", **o.tologdict()) |
| |
| model_accessor.journal_object(o, "syncstep.call.delete_set_reap") |
| o.backend_need_reap = True |
| o.save(update_fields=['backend_need_reap']) |
| |
| def sync_record(self, o, sr_log=None): |
| try: |
| step = o.synchronizer_step |
| except AttributeError: |
| step = None |
| |
| if step is None: |
| raise ExternalDependencyFailed |
| |
| if sr_log is None: |
| sr_log = self.log |
| |
| # Mark this as an object that will require delete. Do |
| # this now rather than after the syncstep, |
| if not (o.backend_need_delete): |
| o.backend_need_delete = True |
| o.save(update_fields=['backend_need_delete']) |
| |
| model_accessor.journal_object(o, "syncstep.call.sync_record") |
| |
| sr_log.debug("Syncing object", **o.tologdict()) |
| |
| step.log = sr_log.new(step=step) |
| step.sync_record(o) |
| step.log = sr_log |
| |
| sr_log.debug("Synced object", **o.tologdict()) |
| |
| o.enacted = max(o.updated, o.changed_by_policy) |
| scratchpad = {'next_run': 0, 'exponent': 0, |
| 'last_success': time.time()} |
| o.backend_register = json.dumps(scratchpad) |
| o.backend_status = "OK" |
| o.backend_code = 1 |
| model_accessor.journal_object(o, "syncstep.call.save_update") |
| o.save(update_fields=['enacted', 'backend_status', |
| 'backend_register', 'backend_code']) |
| |
| if hasattr(step, "after_sync_save"): |
| step.log = sr_log.new(step=step) |
| step.after_sync_save(o) |
| step.log = sr_log |
| |
| sr_log.info("Saved sync object", o=o) |
| |
| """ This function needs a cleanup. FIXME: Rethink backend_status, backend_register """ |
| |
| def handle_sync_exception(self, o, e): |
| self.log.exception("sync step failed!", e=e, **o.tologdict()) |
| current_code = o.backend_code |
| |
| if hasattr(e, 'message'): |
| status = str(e.message) |
| else: |
| status = str(e) |
| |
| if isinstance(e, InnocuousException): |
| code = 1 |
| elif isinstance(e, DeferredException): |
| # NOTE if the synchronization is Deferred it means that synchronization is still in progress |
| code = 0 |
| else: |
| code = 2 |
| |
| self.set_object_error(o, status, code) |
| |
| dependency_error = 'Failed due to error in model %s id %d: %s' % ( |
| o.leaf_model_name, o.id, status) |
| return dependency_error, code |
| |
| def set_object_error(self, o, status, code): |
| if o.backend_status: |
| error_list = o.backend_status.split(' // ') |
| else: |
| error_list = [] |
| |
| if status not in error_list: |
| error_list.append(status) |
| |
| # Keep last two errors |
| error_list = error_list[-2:] |
| |
| o.backend_code = code |
| o.backend_status = ' // '.join(error_list) |
| |
| try: |
| scratchpad = json.loads(o.backend_register) |
| scratchpad['exponent'] |
| except BaseException: |
| scratchpad = {'next_run': 0, 'exponent': 0, |
| 'last_success': time.time(), 'failures': 0} |
| |
| # Second failure |
| if (scratchpad['exponent']): |
| if code == 1: |
| delay = scratchpad['exponent'] * 60 # 1 minute |
| else: |
| delay = scratchpad['exponent'] * 600 # 10 minutes |
| |
| # cap delays at 8 hours |
| if (delay > 8 * 60 * 60): |
| delay = 8 * 60 * 60 |
| scratchpad['next_run'] = time.time() + delay |
| |
| scratchpad['exponent'] += 1 |
| |
| try: |
| scratchpad['failures'] += 1 |
| except KeyError: |
| scratchpad['failures'] = 1 |
| |
| scratchpad['last_failure'] = time.time() |
| |
| o.backend_register = json.dumps(scratchpad) |
| |
| # TOFIX: |
| # DatabaseError: value too long for type character varying(140) |
| if (model_accessor.obj_exists(o)): |
| try: |
| o.backend_status = o.backend_status[:1024] |
| o.save(update_fields=['backend_status', |
| 'backend_register'], |
| always_update_timestamp=True) |
| except BaseException as e: |
| self.log.exception( |
| "Could not update backend status field!", e=e) |
| pass |
| |
| def sync_cohort(self, cohort, deletion): |
| threading.current_thread().is_sync_thread=True |
| |
| sc_log = self.log.new(thread_id=threading.current_thread().ident) |
| |
| try: |
| start_time = time.time() |
| sc_log.debug( |
| "Starting to work on cohort", |
| cohort=cohort, |
| deletion=deletion) |
| |
| cohort_emptied = False |
| dependency_error = None |
| dependency_error_code = None |
| |
| itty = iter(cohort) |
| |
| while not cohort_emptied: |
| try: |
| self.reset_model_accessor() |
| o = next(itty) |
| |
| if dependency_error: |
| self.set_object_error( |
| o, dependency_error, dependency_error_code) |
| continue |
| |
| try: |
| if (deletion): |
| self.delete_record(o, sc_log) |
| else: |
| self.sync_record(o, sc_log) |
| except ExternalDependencyFailed: |
| dependency_error = 'External dependency on object %s id %d not met' % ( |
| o.leaf_model_name, o.id) |
| dependency_error_code = 1 |
| except (DeferredException, InnocuousException, Exception) as e: |
| dependency_error, dependency_error_code = self.handle_sync_exception( |
| o, e) |
| |
| except StopIteration: |
| sc_log.debug( |
| "Cohort completed", |
| cohort=cohort, |
| deletion=deletion) |
| cohort_emptied = True |
| finally: |
| self.reset_model_accessor() |
| model_accessor.connection_close() |
| |
| def tenant_class_name_from_service(self, service_name): |
| """ This code supports legacy functionality. To be cleaned up. """ |
| name1 = service_name + 'Instance' |
| if hasattr(Slice().stub, name1): |
| return name1 |
| else: |
| name2 = service_name.replace('Service', 'Tenant') |
| if hasattr(Slice().stub, name2): |
| return name2 |
| else: |
| return None |
| |
| def compute_service_dependencies(self): |
| """ FIXME: Implement more cleanly via xproto """ |
| |
| model_names = self.model_to_step.keys() |
| ugly_tuples = [(m, m.replace('Instance', '').replace('Tenant', 'Service')) |
| for m in model_names if m.endswith('ServiceInstance') or m.endswith('Tenant')] |
| ugly_rtuples = [(v, k) for k, v in ugly_tuples] |
| |
| ugly_map = dict(ugly_tuples) |
| ugly_rmap = dict(ugly_rtuples) |
| |
| s_model_names = [v for k, v in ugly_tuples] |
| s_models0 = [getattr(Slice().stub, model_name, None) |
| for model_name in s_model_names] |
| s_models1 = [model.objects.first() for model in s_models0] |
| s_models = [m for m in s_models1 if m is not None] |
| |
| dependencies = [] |
| for model in s_models: |
| deps = ServiceDependency.objects.filter( |
| subscriber_service_id=model.id) |
| if deps: |
| services = [self.tenant_class_name_from_service( |
| d.provider_service.leaf_model_name) for d in deps] |
| dependencies.append((ugly_rmap[model.leaf_model_name], [ |
| (s, '', '') for s in services])) |
| |
| return dependencies |
| |
| def compute_service_instance_dependencies(self, objects): |
| link_set = [ServiceInstanceLink.objects.filter( |
| subscriber_service_instance_id=o.id) for o in objects] |
| |
| dependencies = [(l.provider_service_instance, l.subscriber_service_instance) |
| for links in link_set for l in links] |
| providers = [] |
| |
| for p, s in dependencies: |
| if not p.enacted or p.enacted<p.updated: |
| p.dependent = s |
| providers.append(p) |
| |
| return providers |
| |
| def run(self): |
| # Cleanup: Move self.driver into a synchronizer context |
| # made available to every sync step. |
| if not self.driver.enabled: |
| self.log.warning('Driver is not enabled. Not running sync steps.') |
| return |
| |
| while True: |
| self.log.trace('Waiting for event or timeout') |
| self.wait_for_event(timeout=5) |
| self.log.trace('Synchronizer awake') |
| |
| self.run_once() |
| |
| def fetch_pending(self, deletion=False): |
| unique_model_list = list(set(self.model_to_step.keys())) |
| pending_objects = [] |
| pending_steps = [] |
| step_list = self.step_lookup.values() |
| |
| for e in self.external_dependencies: |
| s = SyncStep |
| s.observes = e |
| step_list.append(s) |
| |
| for step_class in step_list: |
| step = step_class(driver=self.driver) |
| step.log = self.log.new(step=step) |
| |
| if not hasattr(step, 'call'): |
| pending = step.fetch_pending(deletion) |
| for obj in pending: |
| step = step_class(driver=self.driver) |
| step.log = self.log.new(step=step) |
| obj.synchronizer_step = step |
| |
| pending_service_dependencies = self.compute_service_instance_dependencies( |
| pending) |
| |
| for obj in pending_service_dependencies: |
| obj.synchronizer_step = None |
| |
| pending_objects.extend(pending) |
| pending_objects.extend(pending_service_dependencies) |
| else: |
| # Support old and broken legacy synchronizers |
| # This needs to be dropped soon. |
| pending_steps.append(step) |
| |
| self.log.trace( |
| 'Fetched pending data', |
| pending_objects=pending_objects, |
| legacy_steps=pending_steps) |
| return pending_objects, pending_steps |
| |
| def linked_objects(self, o): |
| if o is None: |
| return [], None |
| try: |
| o_lst = [o for o in o.all()] |
| edge_type = PROXY_EDGE |
| except (AttributeError, TypeError): |
| o_lst = [o] |
| edge_type = DIRECT_EDGE |
| return o_lst, edge_type |
| |
| """ Automatically test if a real dependency path exists between two objects. e.g. |
| given an Instance, and a ControllerSite, the test amounts to: |
| instance.slice.site == controller.site |
| |
| Then the two objects are related, and should be put in the same cohort. |
| If the models of the two objects are not dependent, then the check trivially |
| returns False. |
| """ |
| |
| def same_object(self, o1, o2): |
| if not o1 or not o2: |
| return False, None |
| |
| o1_lst, edge_type = self.linked_objects(o1) |
| |
| try: |
| found = next(obj for obj in o1_lst if obj.leaf_model_name == |
| o2.leaf_model_name and obj.pk == o2.pk) |
| except AttributeError as e: |
| self.log.exception('Compared objects could not be identified', e=e) |
| raise e |
| except StopIteration: |
| # This is a temporary workaround to establish dependencies between |
| # deleted proxy objects. A better solution would be for the ORM to |
| # return the set of deleted objects via foreign keys. At that point, |
| # the following line would change back to found = False |
| # - Sapan |
| |
| found = getattr(o2, 'deleted', False) |
| |
| return found, edge_type |
| |
| def concrete_path_exists(self, o1, o2): |
| try: |
| m1 = o1.leaf_model_name |
| m2 = o2.leaf_model_name |
| except AttributeError: |
| # One of the nodes is not in the dependency graph |
| # No dependency |
| return False, None |
| |
| if m1.endswith('ServiceInstance') and m2.endswith('ServiceInstance'): |
| return getattr(o2, 'dependent', None) == o1, DIRECT_EDGE |
| |
| # FIXME: Dynamic dependency check |
| G = self.model_dependency_graph[False] |
| paths = all_shortest_paths(G, m1, m2) |
| |
| try: |
| any(paths) |
| paths = all_shortest_paths(G, m1, m2) |
| except NetworkXNoPath: |
| # Easy. The two models are unrelated. |
| return False, None |
| |
| for p in paths: |
| src_object = o1 |
| edge_type = DIRECT_EDGE |
| |
| for i in range(len(p) - 1): |
| src = p[i] |
| dst = p[i + 1] |
| edge_label = G[src][dst] |
| sa = edge_label['src_accessor'] |
| try: |
| dst_accessor = getattr(src_object, sa) |
| dst_objects, link_edge_type = self.linked_objects( |
| dst_accessor) |
| if link_edge_type == PROXY_EDGE: |
| edge_type = link_edge_type |
| |
| """ |
| |
| True If no linked objects and deletion |
| False If no linked objects |
| True If multiple linked objects |
| <continue traversal> If single linked object |
| |
| """ |
| |
| if dst_objects == []: |
| # Workaround for ORM not returning linked deleted |
| # objects |
| if o2.deleted: |
| return True, edge_type |
| else: |
| dst_object = None |
| elif len(dst_objects) > 1: |
| # Multiple linked objects. Assume anything could be among those multiple objects. |
| raise AttributeError |
| else: |
| dst_object = dst_objects[0] |
| except AttributeError as e: |
| if sa!='fake_accessor': |
| self.log.debug( |
| 'Could not check object dependencies, making conservative choice %s', e, src_object=src_object, sa=sa, o1=o1, o2=o2) |
| return True, edge_type |
| |
| src_object = dst_object |
| |
| if not src_object: |
| break |
| |
| verdict, edge_type = self.same_object(src_object, o2) |
| if verdict: |
| return verdict, edge_type |
| |
| # Otherwise try other paths |
| |
| return False, None |
| |
| """ |
| |
| This function implements the main scheduling logic |
| of the Synchronizer. It divides incoming work (dirty objects) |
| into cohorts of dependent objects, and runs each such cohort |
| in its own thread. |
| |
| Future work: |
| |
| * Run event thread in parallel to the scheduling thread, and |
| add incoming objects to existing cohorts. Doing so should |
| greatly improve synchronizer performance. |
| * A single object might need to be added to multiple cohorts. |
| In this case, the last cohort handles such an object. |
| * This algorithm is horizontal-scale-ready. Multiple synchronizers |
| could run off a shared runqueue of cohorts. |
| |
| """ |
| |
| def compute_dependent_cohorts(self, objects, deletion): |
| model_map = defaultdict(list) |
| n = len(objects) |
| r = range(n) |
| indexed_objects = zip(r, objects) |
| |
| oG = DiGraph() |
| |
| for i in r: |
| oG.add_node(i) |
| |
| try: |
| for i0 in range(n): |
| for i1 in range(n): |
| if i0 != i1: |
| if deletion: |
| path_args = (objects[i1], objects[i0]) |
| else: |
| path_args = (objects[i0], objects[i1]) |
| |
| is_connected, edge_type = self.concrete_path_exists( |
| *path_args) |
| if is_connected: |
| try: |
| edge_type = oG[i1][i0]['type'] |
| if edge_type == PROXY_EDGE: |
| oG.remove_edge(i1, i0) |
| oG.add_edge(i0, i1, {'type': edge_type}) |
| except KeyError: |
| oG.add_edge(i0, i1, {'type': edge_type}) |
| except KeyError: |
| pass |
| |
| components = weakly_connected_component_subgraphs(oG) |
| cohort_indexes = [reversed(topological_sort(g)) for g in components] |
| cohorts = [[objects[i] for i in cohort_index] |
| for cohort_index in cohort_indexes] |
| |
| return cohorts |
| |
| def run_once(self): |
| self.load_dependency_graph() |
| |
| try: |
| # Why are we checking the DB connection here? |
| model_accessor.check_db_connection_okay() |
| |
| loop_start = time.time() |
| |
| # Two passes. One for sync, the other for deletion. |
| for deletion in (False, True): |
| objects_to_process = [] |
| |
| objects_to_process, steps_to_process = self.fetch_pending( |
| deletion) |
| dependent_cohorts = self.compute_dependent_cohorts( |
| objects_to_process, deletion) |
| |
| threads = [] |
| self.log.trace('In run once inner loop', deletion=deletion) |
| |
| for cohort in dependent_cohorts: |
| thread = threading.Thread( |
| target=self.sync_cohort, name='synchronizer', args=( |
| cohort, deletion)) |
| |
| threads.append(thread) |
| |
| # Start threads |
| for t in threads: |
| t.start() |
| |
| self.reset_model_accessor() |
| |
| # Wait for all threads to finish before continuing with the run |
| # loop |
| for t in threads: |
| t.join() |
| |
| # Run legacy synchronizers, which do everything in call() |
| for step in steps_to_process: |
| try: |
| step.call(deletion=deletion) |
| except Exception as e: |
| self.log.exception( |
| "Legacy step failed", step=step, e=e) |
| |
| loop_end = time.time() |
| |
| except Exception as e: |
| self.log.exception( |
| 'Core error. This seems like a misconfiguration or bug. This error will not be relayed to the user!', |
| e=e) |
| self.log.error("Exception in observer run loop") |