CORD-2145: Mirror ServiceDependency and ServiceInstanceLink in the
synchronizer
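
The static dependency graph loaded in load_dependency_graph() is now
merged with dynamic dependencies mirrored from ServiceDependency
models, and ServiceInstanceLink records are used to order pending
service instances behind their providers. The graph is reloaded on
every run_once() pass so dependencies created at runtime take effect
without a restart.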
Change-Id: I19b3b8c68dccf3ff35c8e0daeba7b268ea077a68
diff --git a/xos/coreapi/orm.py b/xos/coreapi/orm.py
index 7f1ec68..a59e736 100644
--- a/xos/coreapi/orm.py
+++ b/xos/coreapi/orm.py
@@ -49,6 +49,7 @@
super(ORMWrapper, self).__setattr__("stub", stub)
super(ORMWrapper, self).__setattr__("cache", {})
super(ORMWrapper, self).__setattr__("synchronizer_step", None)
+ super(ORMWrapper, self).__setattr__("dependent", None)
super(ORMWrapper, self).__setattr__("reverse_cache", {})
super(ORMWrapper, self).__setattr__("is_new", is_new)
fkmap=self.gen_fkmap()
diff --git a/xos/synchronizers/new_base/event_loop.py b/xos/synchronizers/new_base/event_loop.py
index 242cd19..2e5e6e1 100644
--- a/xos/synchronizers/new_base/event_loop.py
+++ b/xos/synchronizers/new_base/event_loop.py
@@ -22,7 +22,7 @@
import json
import pprint
import traceback
-
+
from collections import defaultdict
from networkx import DiGraph, dfs_edges, weakly_connected_component_subgraphs, all_shortest_paths, NetworkXNoPath
from networkx.algorithms.dag import topological_sort
@@ -62,6 +62,7 @@
DIRECT_EDGE = 1
PROXY_EDGE = 2
+
def set_driver(x):
global DRIVER
DRIVER = x
@@ -76,7 +77,9 @@
self.step_lookup = {}
self.sync_steps = sync_steps
self.load_sync_steps()
+
self.load_dependency_graph()
+
self.event_cond = threading.Condition()
self.driver = DRIVER
@@ -103,7 +106,11 @@
# joint_dependencies is of the form { Model1 -> [(Model2, src_port, dst_port), ...] }
# src_port is the field that accesses Model2 from Model1
# dst_port is the field that accesses Model1 from Model2
- joint_dependencies = json.loads(dep_graph_str)
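+ # Merge the static dependency graph with dynamic dependencies
+ # mirrored from ServiceDependency models.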
+ static_dependencies = json.loads(dep_graph_str)
+ dynamic_dependencies = self.compute_service_dependencies()
+
+ joint_dependencies = dict(
+ list(static_dependencies.items()) + dynamic_dependencies)
model_dependency_graph = DiGraph()
for src_model, deps in joint_dependencies.items():
@@ -195,6 +202,9 @@
try:
step = o.synchronizer_step
except AttributeError:
+ step = None
+
+ if step is None:
raise ExternalDependencyFailed
new_enacted = model_accessor.now()
@@ -224,7 +234,8 @@
o.backend_status = "OK"
o.backend_code = 1
model_accessor.journal_object(o, "syncstep.call.save_update")
- o.save(update_fields=['enacted', 'backend_status', 'backend_register'])
+ o.save(update_fields=['enacted', 'backend_status',
+ 'backend_register', 'backend_code'])
log.info("Saved sync object, new enacted", enacted=new_enacted)
""" This function needs a cleanup. FIXME: Rethink backend_status, backend_register """
@@ -342,7 +353,7 @@
self.sync_record(o, log)
except ExternalDependencyFailed:
dependency_error = 'External dependency on object %s id %d not met' % (
- o.__class__.__name__, o.id)
+ o.leaf_model_name, o.id)
dependency_error_code = 1
except (DeferredException, InnocuousException, Exception) as e:
dependency_error, dependency_error_code = self.handle_sync_exception(
@@ -358,6 +369,62 @@
self.reset_model_accessor()
model_accessor.connection_close()
+ def tenant_class_name_from_service(self, service_name):
+ """ This code supports legacy functionality. To be cleaned up. """
+ name1 = service_name + 'Instance'
+ if hasattr(Slice().stub, name1):
+ return name1
+ name2 = service_name.replace('Service', 'Tenant')
+ if hasattr(Slice().stub, name2):
+ return name2
+ return None
+
+ def compute_service_dependencies(self):
+ """ FIXME: Implement more cleanly via xproto """
+
+ model_names = self.model_to_step.keys()
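+ # Pair each ServiceInstance/Tenant model name with its owning Service
+ # model name, e.g. a hypothetical 'FooServiceInstance' -> 'FooService'.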
+ ugly_tuples = [(m, m.replace('Instance', '').replace('Tenant', 'Service'))
+ for m in model_names if m.endswith('ServiceInstance') or m.endswith('Tenant')]
+ ugly_rtuples = [(v, k) for k, v in ugly_tuples]
+
+ ugly_rmap = dict(ugly_rtuples)
+
+ s_model_names = [v for k, v in ugly_tuples]
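+ # Resolve each Service model and fetch one representative instance;
+ # services with no instances in the data model are skipped.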
+ s_models0 = [getattr(Slice().stub, model_name, None)
+ for model_name in s_model_names]
+ s_models1 = [model.objects.first()
+ for model in s_models0 if model is not None]
+ s_models = [m for m in s_models1 if m is not None]
+
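+ # Mirror each ServiceDependency record as a synchronizer dependency
+ # between the corresponding ServiceInstance/Tenant models.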
+ dependencies = []
+ for model in s_models:
+ deps = ServiceDependency.objects.filter(
+ subscriber_service_id=model.id)
+ if deps:
+ services = [self.tenant_class_name_from_service(
+ d.provider_service.leaf_model_name) for d in deps]
+ dependencies.append((ugly_rmap[model.leaf_model_name], [
+ (s, '', '') for s in services if s is not None]))
+
+ return dependencies
+
+ def compute_service_instance_dependencies(self, objects):
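+ """ Mirror ServiceInstanceLink records: for each pending subscriber
+ instance, return the provider instances that still need to be
+ synced, each tagged with its subscriber via .dependent. """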
+ link_set = [ServiceInstanceLink.objects.filter(
+ subscriber_service_instance_id=o.id) for o in objects]
+
+ dependencies = [(l.provider_service_instance, l.subscriber_service_instance)
+ for links in link_set for l in links]
+ providers = []
+
+ for p, s in dependencies:
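+ # Only providers that still need sync are returned: never
+ # enacted, or updated since they were last enacted.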
+ if not p.enacted or p.enacted<p.updated:
+ p.dependent = s
+ providers.append(p)
+
+ return providers
+
def run(self):
# Cleanup: Move self.driver into a synchronizer context
# made available to every sync step.
@@ -392,7 +459,15 @@
step = step_class(driver=self.driver)
step.log = self.log.bind(step=step)
obj.synchronizer_step = step
+
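+ # Providers of pending service instances are queued with no
+ # sync step, so sync_record() raises ExternalDependencyFailed
+ # for them and their subscribers are deferred until the
+ # provider has been enacted.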
+ pending_service_dependencies = self.compute_service_instance_dependencies(
+ pending)
+
+ for obj in pending_service_dependencies:
+ obj.synchronizer_step = None
+
pending_objects.extend(pending)
+ pending_objects.extend(pending_service_dependencies)
else:
# Support old and broken legacy synchronizers
# This needs to be dropped soon.
@@ -405,9 +480,9 @@
return pending_objects, pending_steps
def linked_objects(self, o):
- if o is None:
+ if o is None:
return [], None
- try:
+ try:
o_lst = [o for o in o.all()]
edge_type = PROXY_EDGE
except (AttributeError, TypeError):
@@ -427,8 +502,8 @@
def same_object(self, o1, o2):
if not o1 or not o2:
return False, None
-
- o1_lst,edge_type = self.linked_objects(o1)
+
+ o1_lst, edge_type = self.linked_objects(o1)
try:
found = next(obj for obj in o1_lst if obj.leaf_model_name ==
@@ -440,7 +515,7 @@
# This is a temporary workaround to establish dependencies between
# deleted proxy objects. A better solution would be for the ORM to
# return the set of deleted objects via foreign keys. At that point,
- # the following line would change back to found = False
+ # the following line would change back to found = False
# - Sapan
found = getattr(o2, 'deleted', False)
@@ -456,6 +531,9 @@
# No dependency
return False, None
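+ # Dynamic instance-level dependency: the provider was tagged with
+ # its subscriber in compute_service_instance_dependencies().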
+ if m1.endswith('ServiceInstance') and m2.endswith('ServiceInstance'):
+ return getattr(o2, 'dependent', None) == o1, DIRECT_EDGE
+
# FIXME: Dynamic dependency check
G = self.model_dependency_graph[False]
paths = all_shortest_paths(G, m1, m2)
@@ -478,7 +556,8 @@
sa = edge_label['src_accessor']
try:
dst_accessor = getattr(src_object, sa)
- dst_objects,link_edge_type = self.linked_objects(dst_accessor)
+ dst_objects, link_edge_type = self.linked_objects(
+ dst_accessor)
if link_edge_type == PROXY_EDGE:
edge_type = link_edge_type
@@ -491,16 +570,18 @@
"""
- if dst_objects==[]:
+ if dst_objects == []:
+ # Workaround for the ORM not returning linked deleted objects.
if o2.deleted:
return True, edge_type
else:
dst_object = None
- elif len(dst_objects)>1:
- # Multiple linked objects. Assume anything could be among those multiple objects.
- raise AttributeError
+ elif len(dst_objects) > 1:
+ # Multiple linked objects. Assume anything could be among those multiple objects.
+ raise AttributeError
else:
- dst_object = dst_objects[0]
+ dst_object = dst_objects[0]
except AttributeError as e:
if sa!='fake_accessor':
self.log.debug(
@@ -577,10 +658,11 @@
cohorts = [[objects[i] for i in cohort_index]
for cohort_index in cohort_indexes]
-
return cohorts
def run_once(self):
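+ # Reload the dependency graph on every pass so that ServiceDependency
+ # models created at runtime are picked up.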
+ self.load_dependency_graph()
+
try:
# Why are we checking the DB connection here?
model_accessor.check_db_connection_okay()
diff --git a/xos/synchronizers/new_base/tests/test_payload.py b/xos/synchronizers/new_base/tests/test_payload.py
index b893c85..524c5ea 100644
--- a/xos/synchronizers/new_base/tests/test_payload.py
+++ b/xos/synchronizers/new_base/tests/test_payload.py
@@ -105,7 +105,7 @@
a = get_ansible_output()
self.assertDictContainsSubset({'delete':False, 'name':o.name}, a)
- o.save.assert_called_with(update_fields=['enacted', 'backend_status', 'backend_register'])
+ o.save.assert_called_with(update_fields=['enacted', 'backend_status', 'backend_register', 'backend_code'])
@mock.patch("steps.sync_instances.syncstep.run_template",side_effect=run_fake_ansible_template)
@mock.patch("event_loop.model_accessor")
@@ -128,8 +128,8 @@
a = get_ansible_output()
self.assertDictContainsSubset({'delete':False, 'name':o.name}, a)
- o.save.assert_called_with(update_fields=['enacted', 'backend_status', 'backend_register'])
- cs.save.assert_called_with(update_fields=['enacted', 'backend_status', 'backend_register'])
+ o.save.assert_called_with(update_fields=['enacted', 'backend_status', 'backend_register', 'backend_code'])
+ cs.save.assert_called_with(update_fields=['enacted', 'backend_status', 'backend_register', 'backend_code'])
@mock.patch("steps.sync_instances.syncstep.run_template",side_effect=run_fake_ansible_template)
@mock.patch("event_loop.model_accessor")
diff --git a/xos/xos_client/xosapi/orm.py b/xos/xos_client/xosapi/orm.py
index ae3278d..e4be772 100644
--- a/xos/xos_client/xosapi/orm.py
+++ b/xos/xos_client/xosapi/orm.py
@@ -49,6 +49,7 @@
super(ORMWrapper, self).__setattr__("cache", {})
super(ORMWrapper, self).__setattr__("reverse_cache", {})
super(ORMWrapper, self).__setattr__("synchronizer_step", None)
+ super(ORMWrapper, self).__setattr__("dependent", None)
super(ORMWrapper, self).__setattr__("is_new", is_new)
super(ORMWrapper, self).__setattr__("post_save_fixups", [])
fkmap=self.gen_fkmap()