# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO:
# Add unit tests:
# - 2 sets of Instance, ControllerSlice, ControllerNetworks - delete and create case
import time
import threading
import json
from collections import defaultdict
from networkx import (
DiGraph,
weakly_connected_component_subgraphs,
all_shortest_paths,
NetworkXNoPath,
)
from networkx.algorithms.dag import topological_sort
from xossynchronizer.steps.syncstep import InnocuousException, DeferredException, SyncStep
from xosconfig import Config
from multistructlog import create_logger
log = create_logger(Config().get("logging"))
class StepNotReady(Exception):
pass
class ExternalDependencyFailed(Exception):
pass
# FIXME: Move drivers into a context shared across sync steps.
class NoOpDriver:
def __init__(self):
self.enabled = True
self.dependency_graph = None
# Everyone gets NoOpDriver by default. To use a different driver, call
# set_driver() below.
DRIVER = NoOpDriver()
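# Edge types recorded while tracing dependency paths between objects:
# DIRECT_EDGE marks a hop over a plain foreign key, PROXY_EDGE a hop through
# a to-many accessor (see linked_objects below).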
DIRECT_EDGE = 1
PROXY_EDGE = 2
def set_driver(x):
global DRIVER
DRIVER = x
class XOSObserver(object):
sync_steps = []
def __init__(self, sync_steps, model_accessor, log=log):
self.log = log
self.model_accessor = model_accessor
self.step_lookup = {}
self.sync_steps = sync_steps
self.load_sync_steps()
self.load_dependency_graph()
self.event_cond = threading.Condition()
self.driver = DRIVER
self.observer_name = Config.get("name")
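    # run() sleeps in wait_for_event(); callers (e.g. an event listener) can
    # invoke wake_up() to cut the wait short and trigger an immediate pass.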
def wait_for_event(self, timeout):
self.event_cond.acquire()
self.event_cond.wait(timeout)
self.event_cond.release()
def wake_up(self):
self.log.debug("Wake up routine called")
self.event_cond.acquire()
self.event_cond.notify()
self.event_cond.release()
def load_dependency_graph(self):
try:
if Config.get("dependency_graph"):
self.log.debug(
"Loading model dependency graph",
path=Config.get("dependency_graph"),
)
                with open(Config.get("dependency_graph")) as f:
                    dep_graph_str = f.read()
else:
self.log.debug("Using default model dependency graph", graph={})
dep_graph_str = "{}"
# joint_dependencies is of the form { Model1 -> [(Model2, src_port, dst_port), ...] }
# src_port is the field that accesses Model2 from Model1
# dst_port is the field that accesses Model1 from Model2
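            # For example (hypothetical models), the entry
            #   {"Slice": [["Site", "site", "slices"]]}
            # declares that Slice depends on Site, that slice.site reaches
            # the Site, and that site.slices reaches back to the Slice objects.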
static_dependencies = json.loads(dep_graph_str)
dynamic_dependencies = [] # Dropped Service and ServiceInstance dynamic dependencies
            joint_dependencies = dict(
                list(static_dependencies.items()) + dynamic_dependencies
            )
model_dependency_graph = DiGraph()
for src_model, deps in joint_dependencies.items():
for dep in deps:
dst_model, src_accessor, dst_accessor = dep
if src_model != dst_model:
edge_label = {
"src_accessor": src_accessor,
"dst_accessor": dst_accessor,
}
model_dependency_graph.add_edge(
src_model, dst_model, edge_label
)
model_dependency_graph_rev = model_dependency_graph.reverse(copy=True)
            self.model_dependency_graph = {
                # deletion == True uses the reversed graph, so that dependency
                # checks run in the opposite direction and dependents are
                # removed before the objects they depend on
                True: model_dependency_graph_rev,
                False: model_dependency_graph,
            }
self.log.debug("Loaded dependencies", edges=model_dependency_graph.edges())
except Exception as e:
self.log.exception("Error loading dependency graph", e=e)
raise e
def load_sync_steps(self):
model_to_step = defaultdict(list)
external_dependencies = []
for s in self.sync_steps:
if not isinstance(s.observes, list):
observes = [s.observes]
else:
observes = s.observes
for m in observes:
if isinstance(m, str):
# observes is a string that names the model
model_to_step[m].append(s.__name__)
else:
# observes is the model class
model_to_step[m.__name__].append(s.__name__)
try:
external_dependencies.extend(s.external_dependencies)
except AttributeError:
pass
self.step_lookup[s.__name__] = s
self.model_to_step = model_to_step
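        # model_to_step now maps a model name to the names of the steps that
        # observe it, e.g. {"Instance": ["SyncInstances"]} (hypothetical
        # step name).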
self.external_dependencies = list(set(external_dependencies))
self.log.info(
"Loaded external dependencies", external_dependencies=external_dependencies
)
self.log.info("Loaded model_map", **model_to_step)
def reset_model_accessor(self, o=None):
try:
self.model_accessor.reset_queries()
except BaseException:
# this shouldn't happen, but in case it does, catch it...
if o:
logdict = o.tologdict()
else:
logdict = {}
self.log.error("exception in reset_queries", **logdict)
def delete_record(self, o, dr_log=None):
if dr_log is None:
dr_log = self.log
if getattr(o, "backend_need_reap", False):
# the object has already been deleted and marked for reaping
self.model_accessor.journal_object(o, "syncstep.call.already_marked_reap")
else:
            step = getattr(o, "synchronizer_step", None)
            if not step:
                # No local step observes this model, so it must be satisfied
                # by an external synchronizer.
                raise ExternalDependencyFailed
self.model_accessor.journal_object(o, "syncstep.call.delete_record")
dr_log.debug("Deleting object", **o.tologdict())
step.log = dr_log.new(step=step)
step.delete_record(o)
step.log = dr_log
dr_log.debug("Deleted object", **o.tologdict())
self.model_accessor.journal_object(o, "syncstep.call.delete_set_reap")
o.backend_need_reap = True
o.save(update_fields=["backend_need_reap"])
def sync_record(self, o, sr_log=None):
try:
step = o.synchronizer_step
except AttributeError:
step = None
        if step is None:
            # As in delete_record: no local step observes this model, so it
            # must be satisfied by an external synchronizer.
            raise ExternalDependencyFailed
if sr_log is None:
sr_log = self.log
        # Mark this as an object that will eventually require a backend
        # delete. Do this now, rather than after the sync step, so the mark
        # is not lost if the sync step fails partway through.
        if not o.backend_need_delete:
            o.backend_need_delete = True
            o.save(update_fields=["backend_need_delete"])
self.model_accessor.journal_object(o, "syncstep.call.sync_record")
sr_log.debug("Syncing object", **o.tologdict())
step.log = sr_log.new(step=step)
step.sync_record(o)
step.log = sr_log
sr_log.debug("Synced object", **o.tologdict())
o.enacted = max(o.updated, o.changed_by_policy)
scratchpad = {"next_run": 0, "exponent": 0, "last_success": time.time()}
o.backend_register = json.dumps(scratchpad)
o.backend_status = "OK"
o.backend_code = 1
self.model_accessor.journal_object(o, "syncstep.call.save_update")
o.save(
update_fields=[
"enacted",
"backend_status",
"backend_register",
"backend_code",
]
)
if hasattr(step, "after_sync_save"):
step.log = sr_log.new(step=step)
step.after_sync_save(o)
step.log = sr_log
sr_log.info("Saved sync object", o=o)
""" This function needs a cleanup. FIXME: Rethink backend_status, backend_register """
def handle_sync_exception(self, o, e):
self.log.exception("sync step failed!", e=e, **o.tologdict())
if hasattr(e, "message"):
status = str(e.message)
else:
status = str(e)
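        # backend_code semantics: 1 = innocuous failure, 0 = synchronization
        # still in progress (deferred), 2 = hard error.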
if isinstance(e, InnocuousException):
code = 1
elif isinstance(e, DeferredException):
            # NOTE: a DeferredException means the synchronization is still in
            # progress
code = 0
else:
code = 2
self.set_object_error(o, status, code)
dependency_error = "Failed due to error in model %s id %d: %s" % (
o.leaf_model_name,
o.id,
status,
)
return dependency_error, code
def set_object_error(self, o, status, code):
if o.backend_status:
error_list = o.backend_status.split(" // ")
else:
error_list = []
if status not in error_list:
error_list.append(status)
# Keep last two errors
error_list = error_list[-2:]
o.backend_code = code
o.backend_status = " // ".join(error_list)
try:
            scratchpad = json.loads(o.backend_register)
            scratchpad["exponent"]  # probe: raises if the scratchpad is malformed
        except BaseException:
scratchpad = {
"next_run": 0,
"exponent": 0,
"last_success": time.time(),
"failures": 0,
}
        # Back off on the second and subsequent failures (exponent > 0).
        # Despite the name, the delay grows linearly with the failure count.
if scratchpad["exponent"]:
if code == 1:
delay = scratchpad["exponent"] * 60 # 1 minute
else:
delay = scratchpad["exponent"] * 600 # 10 minutes
# cap delays at 8 hours
if delay > 8 * 60 * 60:
delay = 8 * 60 * 60
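            # e.g. with code == 2 the successive delays are 600 s, 1200 s,
            # 1800 s, ... (exponent * 600), capped at 28800 s (8 hours)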
scratchpad["next_run"] = time.time() + delay
scratchpad["exponent"] += 1
try:
scratchpad["failures"] += 1
except KeyError:
scratchpad["failures"] = 1
scratchpad["last_failure"] = time.time()
o.backend_register = json.dumps(scratchpad)
        # FIXME:
        # DatabaseError: value too long for type character varying(140)
if self.model_accessor.obj_exists(o):
try:
o.backend_status = o.backend_status[:1024]
o.save(
update_fields=["backend_status", "backend_register"],
always_update_timestamp=True,
)
            except BaseException as e:
                self.log.exception("Could not update backend status field!", e=e)
def sync_cohort(self, cohort, deletion):
threading.current_thread().is_sync_thread = True
sc_log = self.log.new(thread_id=threading.current_thread().ident)
try:
start_time = time.time()
sc_log.debug("Starting to work on cohort", cohort=cohort, deletion=deletion)
cohort_emptied = False
dependency_error = None
dependency_error_code = None
itty = iter(cohort)
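            # Objects arrive in dependency order. Once one object fails, every
            # remaining object in the cohort is marked with the same dependency
            # error instead of being synced.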
while not cohort_emptied:
try:
self.reset_model_accessor()
o = next(itty)
if dependency_error:
self.set_object_error(
o, dependency_error, dependency_error_code
)
continue
try:
if deletion:
self.delete_record(o, sc_log)
else:
self.sync_record(o, sc_log)
except ExternalDependencyFailed:
dependency_error = (
"External dependency on object %s id %d not met"
% (o.leaf_model_name, o.id)
)
dependency_error_code = 1
except (DeferredException, InnocuousException, Exception) as e:
dependency_error, dependency_error_code = self.handle_sync_exception(
o, e
)
except StopIteration:
sc_log.debug("Cohort completed", cohort=cohort, deletion=deletion)
cohort_emptied = True
finally:
self.reset_model_accessor()
self.model_accessor.connection_close()
def run(self):
# Cleanup: Move self.driver into a synchronizer context
# made available to every sync step.
if not self.driver.enabled:
self.log.warning("Driver is not enabled. Not running sync steps.")
return
while True:
self.log.debug("Waiting for event or timeout")
self.wait_for_event(timeout=5)
self.log.debug("Synchronizer awake")
self.run_once()
def fetch_pending(self, deletion=False):
pending_objects = []
pending_steps = []
        step_list = list(self.step_lookup.values())
        for e in self.external_dependencies:
            if isinstance(e, str):
                # external dependency is a string that names a model class
                observes = self.model_accessor.get_model_class(e)
            else:
                # external dependency is a model class
                observes = e
            # Build a distinct placeholder step class per dependency.
            # Assigning to SyncStep.observes directly would overwrite the
            # class attribute shared by every entry, leaving all of them
            # observing only the last model in the list.
            s = type("ExternalDependencyStep", (SyncStep,), {"observes": observes})
            step_list.append(s)
for step_class in step_list:
step = step_class(driver=self.driver, model_accessor=self.model_accessor)
step.log = self.log.new(step=step)
if not hasattr(step, "call"):
pending = step.fetch_pending(deletion)
for obj in pending:
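                    # Give each pending object its own step instance so that
                    # per-object logging and state do not interfere.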
step = step_class(driver=self.driver, model_accessor=self.model_accessor)
step.log = self.log.new(step=step)
obj.synchronizer_step = step
pending_objects.extend(pending)
else:
# Support old and broken legacy synchronizers
# This needs to be dropped soon.
pending_steps.append(step)
self.log.debug(
"Fetched pending data",
pending_objects=pending_objects,
legacy_steps=pending_steps,
)
return pending_objects, pending_steps
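    # Normalize an accessor result to a list of objects, tagging how the link
    # was traversed: PROXY_EDGE for a to-many accessor (anything exposing
    # .all()), DIRECT_EDGE for a plain foreign-key accessor.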
def linked_objects(self, o):
if o is None:
return [], None
        try:
            o_lst = list(o.all())
            edge_type = PROXY_EDGE
        except (AttributeError, TypeError):
            o_lst = [o]
            edge_type = DIRECT_EDGE
return o_lst, edge_type
""" Automatically test if a real dependency path exists between two objects. e.g.
given an Instance, and a ControllerSite, the test amounts to:
instance.slice.site == controller.site
Then the two objects are related, and should be put in the same cohort.
If the models of the two objects are not dependent, then the check trivially
returns False.
"""
def same_object(self, o1, o2):
if not o1 or not o2:
return False, None
o1_lst, edge_type = self.linked_objects(o1)
try:
found = next(
obj
for obj in o1_lst
if obj.leaf_model_name == o2.leaf_model_name and obj.pk == o2.pk
)
except AttributeError as e:
self.log.exception("Compared objects could not be identified", e=e)
raise e
except StopIteration:
# This is a temporary workaround to establish dependencies between
# deleted proxy objects. A better solution would be for the ORM to
# return the set of deleted objects via foreign keys. At that point,
# the following line would change back to found = False
# - Sapan
found = getattr(o2, "deleted", False)
return found, edge_type
def concrete_path_exists(self, o1, o2):
try:
m1 = o1.leaf_model_name
m2 = o2.leaf_model_name
except AttributeError:
# One of the nodes is not in the dependency graph
# No dependency
return False, None
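        # ServiceInstance-to-ServiceInstance dependencies are expressed
        # directly through the "dependent" field rather than through the
        # model dependency graph.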
if m1.endswith("ServiceInstance") and m2.endswith("ServiceInstance"):
return getattr(o2, "dependent", None) == o1, DIRECT_EDGE
# FIXME: Dynamic dependency check
G = self.model_dependency_graph[False]
        # all_shortest_paths returns a lazy generator that only raises
        # NetworkXNoPath once it is consumed, so probe it first and then
        # recreate it for the traversal below.
        paths = all_shortest_paths(G, m1, m2)
        try:
            any(paths)
            paths = all_shortest_paths(G, m1, m2)
except NetworkXNoPath:
# Easy. The two models are unrelated.
return False, None
for p in paths:
src_object = o1
edge_type = DIRECT_EDGE
for i in range(len(p) - 1):
src = p[i]
dst = p[i + 1]
edge_label = G[src][dst]
sa = edge_label["src_accessor"]
try:
dst_accessor = getattr(src_object, sa)
dst_objects, link_edge_type = self.linked_objects(dst_accessor)
if link_edge_type == PROXY_EDGE:
edge_type = link_edge_type
"""
True If no linked objects and deletion
False If no linked objects
True If multiple linked objects
<continue traversal> If single linked object
"""
if dst_objects == []:
# Workaround for ORM not returning linked deleted
# objects
if o2.deleted:
return True, edge_type
else:
dst_object = None
elif len(dst_objects) > 1:
# Multiple linked objects. Assume anything could be among those multiple objects.
raise AttributeError
else:
dst_object = dst_objects[0]
except AttributeError as e:
if sa != "fake_accessor":
self.log.debug(
"Could not check object dependencies, making conservative choice %s",
e,
src_object=src_object,
sa=sa,
o1=o1,
o2=o2,
)
return True, edge_type
src_object = dst_object
if not src_object:
break
verdict, edge_type = self.same_object(src_object, o2)
if verdict:
return verdict, edge_type
# Otherwise try other paths
return False, None
"""
This function implements the main scheduling logic
of the Synchronizer. It divides incoming work (dirty objects)
into cohorts of dependent objects, and runs each such cohort
in its own thread.
Future work:
* Run event thread in parallel to the scheduling thread, and
add incoming objects to existing cohorts. Doing so should
greatly improve synchronizer performance.
* A single object might need to be added to multiple cohorts.
In this case, the last cohort handles such an object.
* This algorithm is horizontal-scale-ready. Multiple synchronizers
could run off a shared runqueue of cohorts.
"""
def compute_dependent_cohorts(self, objects, deletion):
        n = len(objects)
        r = range(n)
oG = DiGraph()
for i in r:
oG.add_node(i)
try:
for i0 in range(n):
for i1 in range(n):
if i0 != i1:
if deletion:
path_args = (objects[i1], objects[i0])
else:
path_args = (objects[i0], objects[i1])
                        is_connected, edge_type = self.concrete_path_exists(*path_args)
                        if is_connected:
                            try:
                                # If a reverse edge already exists and is only
                                # a proxy link, replace it with this forward
                                # edge; a direct reverse edge takes precedence.
                                edge_type = oG[i1][i0]["type"]
                                if edge_type == PROXY_EDGE:
                                    oG.remove_edge(i1, i0)
                                    oG.add_edge(i0, i1, {"type": edge_type})
                            except KeyError:
                                # No reverse edge yet; record the forward edge.
                                oG.add_edge(i0, i1, {"type": edge_type})
except KeyError:
pass
components = weakly_connected_component_subgraphs(oG)
cohort_indexes = [reversed(topological_sort(g)) for g in components]
cohorts = [
[objects[i] for i in cohort_index] for cohort_index in cohort_indexes
]
return cohorts
def run_once(self):
self.load_dependency_graph()
try:
# Why are we checking the DB connection here?
self.model_accessor.check_db_connection_okay()
loop_start = time.time()
# Two passes. One for sync, the other for deletion.
for deletion in (False, True):
                objects_to_process, steps_to_process = self.fetch_pending(deletion)
dependent_cohorts = self.compute_dependent_cohorts(
objects_to_process, deletion
)
threads = []
self.log.debug("In run once inner loop", deletion=deletion)
for cohort in dependent_cohorts:
thread = threading.Thread(
target=self.sync_cohort,
name="synchronizer",
args=(cohort, deletion),
)
threads.append(thread)
# Start threads
for t in threads:
t.start()
self.reset_model_accessor()
# Wait for all threads to finish before continuing with the run
# loop
for t in threads:
t.join()
# Run legacy synchronizers, which do everything in call()
for step in steps_to_process:
try:
step.call(deletion=deletion)
except Exception as e:
self.log.exception("Legacy step failed", step=step, e=e)
loop_end = time.time()
except Exception as e:
self.log.exception(
"Core error. This seems like a misconfiguration or bug. This error will not be relayed to the user!",
e=e,
)
self.log.error("Exception in observer run loop")