SEBA-405 Cleanup synchronizer imports of model_accessor to globals;
Move mock modelaccessor to /tmp;
Easier mock modelaccessor configuration
Change-Id: I67a17b9a72ea69f61d92206f1b520a11c2f18d80
diff --git a/VERSION b/VERSION
index b9975ae..78c4baa 100644
--- a/VERSION
+++ b/VERSION
@@ -1,2 +1,2 @@
-2.1.37
+2.1.38
diff --git a/containers/chameleon/Dockerfile.chameleon b/containers/chameleon/Dockerfile.chameleon
index 56da03e..0c6bd0a 100644
--- a/containers/chameleon/Dockerfile.chameleon
+++ b/containers/chameleon/Dockerfile.chameleon
@@ -13,7 +13,7 @@
# limitations under the License.
# xosproject/chameleon
-FROM xosproject/xos-base:2.1.36
+FROM xosproject/xos-base:2.1.38
# xos-base already has protoc and dependencies installed
diff --git a/containers/xos/Dockerfile.client b/containers/xos/Dockerfile.client
index cc8c026..1cfe9be 100644
--- a/containers/xos/Dockerfile.client
+++ b/containers/xos/Dockerfile.client
@@ -13,7 +13,7 @@
# limitations under the License.
# xosproject/xos-client
-FROM xosproject/xos-libraries:2.1.37
+FROM xosproject/xos-libraries:2.1.38
# Install XOS client
COPY lib/xos-api /tmp/xos-api
diff --git a/containers/xos/Dockerfile.libraries b/containers/xos/Dockerfile.libraries
index 66f9f01..28f06b7 100644
--- a/containers/xos/Dockerfile.libraries
+++ b/containers/xos/Dockerfile.libraries
@@ -13,7 +13,7 @@
# limitations under the License.
# xosproject/xos-libraries
-FROM xosproject/xos-base:2.1.37
+FROM xosproject/xos-base:2.1.38
# Add libraries
COPY lib /opt/xos/lib
diff --git a/containers/xos/Dockerfile.synchronizer-base b/containers/xos/Dockerfile.synchronizer-base
index 9230cd9..457ddb5 100644
--- a/containers/xos/Dockerfile.synchronizer-base
+++ b/containers/xos/Dockerfile.synchronizer-base
@@ -13,7 +13,7 @@
# limitations under the License.
# xosproject/xos-synchronizer-base
-FROM xosproject/xos-client:2.1.37
+FROM xosproject/xos-client:2.1.38
COPY xos/synchronizers/new_base /opt/xos/synchronizers/new_base
COPY xos/xos/logger.py /opt/xos/xos/logger.py
diff --git a/containers/xos/Dockerfile.xos-core b/containers/xos/Dockerfile.xos-core
index 0a92707..23246d2 100644
--- a/containers/xos/Dockerfile.xos-core
+++ b/containers/xos/Dockerfile.xos-core
@@ -13,7 +13,7 @@
# limitations under the License.
# xosproject/xos-core
-FROM xosproject/xos-libraries:2.1.37
+FROM xosproject/xos-libraries:2.1.38
# Install XOS
ADD xos /opt/xos
diff --git a/lib/xos-genx/xosgenx/targets/mock_classes.xtarget b/lib/xos-genx/xosgenx/targets/mock_classes.xtarget
index 69efe92..61c2f1f 100644
--- a/lib/xos-genx/xosgenx/targets/mock_classes.xtarget
+++ b/lib/xos-genx/xosgenx/targets/mock_classes.xtarget
@@ -170,10 +170,20 @@
AllMockObjectStores.append(store)
return store
-class ModelAccessor:
- def check_db_connection_ok(self):
+class ModelAccessor(object):
+ def check_db_connection_okay(self):
return True
+ def connection_close(self):
+ pass
+
+ def journal_object(self, *args, **kwargs):
+ pass
+
+ def obj_exists(self, o):
+ # gRPC will default id to '0' for uninitialized objects
+ return (o.id is not None) and (o.id != 0)
+
def fetch_pending(self, model, deleted = False):
num = random.randint(1, 5)
object_list = []
@@ -200,6 +210,19 @@
def get_model_class(self, classname):
return globals()[classname]
+ def has_model_class(self, classname):
+ return classname in globals()
+
+ def __getattr__(self, name):
+ """ Wrapper for getattr to facilitate retrieval of classes """
+ has_model_class = self.__getattribute__("has_model_class")
+ get_model_class = self.__getattribute__("get_model_class")
+ if has_model_class(name):
+ return get_model_class(name)
+
+ # Default behaviour
+ return self.__getattribute__(name)
+
model_accessor = ModelAccessor()
class ObjectSet(object):
diff --git a/lib/xos-synchronizer/tests/event_steps/event_step.py b/lib/xos-synchronizer/tests/event_steps/event_step.py
index 601b8df..d04f5a5 100644
--- a/lib/xos-synchronizer/tests/event_steps/event_step.py
+++ b/lib/xos-synchronizer/tests/event_steps/event_step.py
@@ -22,8 +22,8 @@
topics = ["sometopic"]
pattern = None
- def __init__(self, log, *args, **kwargs):
- super(TestEventStep, self).__init__(log, *args, **kwargs)
+ def __init__(self, model_accessor, log, *args, **kwargs):
+ super(TestEventStep, self).__init__(model_accessor, log, *args, **kwargs)
def process_event(self, event):
print("received an event", event)
diff --git a/lib/xos-synchronizer/tests/steps/sync_container.py b/lib/xos-synchronizer/tests/steps/sync_container.py
index baf108f..8cbabcb 100644
--- a/lib/xos-synchronizer/tests/steps/sync_container.py
+++ b/lib/xos-synchronizer/tests/steps/sync_container.py
@@ -21,7 +21,6 @@
import time
from xossynchronizer.steps.SyncInstanceUsingAnsible import SyncInstanceUsingAnsible
from xossynchronizer.steps.syncstep import DeferredException
-from xossynchronizer.mock_modelaccessor import *
# hpclibrary will be in steps/..
parentdir = os.path.join(os.path.dirname(__file__), "..")
@@ -29,19 +28,18 @@
class SyncContainer(SyncInstanceUsingAnsible):
- provides = [Instance]
- observes = Instance
+ observes = "Instance"
template_name = "sync_container.yaml"
def __init__(self, *args, **kwargs):
super(SyncContainer, self).__init__(*args, **kwargs)
def fetch_pending(self, deletion=False):
- i = Instance()
+ i = self.model_accessor.Instance()
i.name = "Spectacular Sponge"
- j = Instance()
+ j = self.model_accessor.Instance()
j.name = "Spontaneous Tent"
- k = Instance()
+ k = self.model_accessor.Instance()
k.name = "Embarrassed Cat"
objs = [i, j, k]
diff --git a/lib/xos-synchronizer/tests/steps/sync_controller_images.py b/lib/xos-synchronizer/tests/steps/sync_controller_images.py
index 84a43b1..ef85983 100644
--- a/lib/xos-synchronizer/tests/steps/sync_controller_images.py
+++ b/lib/xos-synchronizer/tests/steps/sync_controller_images.py
@@ -16,17 +16,15 @@
import os
import base64
from xossynchronizer.steps.syncstep import SyncStep
-from xossynchronizer.mock_modelaccessor import *
class SyncControllerImages(SyncStep):
- provides = [ControllerImages]
- observes = ControllerImages
+ observes = "ControllerImages"
requested_interval = 0
playbook = "sync_controller_images.yaml"
def fetch_pending(self, deleted):
- ci = ControllerImages()
- i = Image()
+ ci = self.model_accessor.ControllerImages()
+ i = self.model_accessor.Image()
i.name = "Lush Loss"
ci.i = i
return [ci]
diff --git a/lib/xos-synchronizer/tests/steps/sync_controller_networks.py b/lib/xos-synchronizer/tests/steps/sync_controller_networks.py
index 1133545..55dfe4e 100644
--- a/lib/xos-synchronizer/tests/steps/sync_controller_networks.py
+++ b/lib/xos-synchronizer/tests/steps/sync_controller_networks.py
@@ -19,20 +19,18 @@
import socket
from netaddr import IPAddress, IPNetwork
from xossynchronizer.steps.syncstep import SyncStep
-from xossynchronizer.mock_modelaccessor import *
class SyncControllerNetworks(SyncStep):
requested_interval = 0
- provides = [Network]
- observes = ControllerNetwork
- external_dependencies = [User]
+ observes = "ControllerNetwork"
+ external_dependencies = ["User"]
playbook = "sync_controller_networks.yaml"
def fetch_pending(self, deleted):
- ci = ControllerNetwork()
- i = Network()
+ ci = self.model_accessor.ControllerNetwork()
+ i = self.model_accessor.Network()
i.name = "Lush Loss"
- s = Slice()
+ s = self.model_accessor.Slice()
s.name = "Ghastly Notebook"
i.owner = s
ci.i = i
diff --git a/lib/xos-synchronizer/tests/steps/sync_controller_site_privileges.py b/lib/xos-synchronizer/tests/steps/sync_controller_site_privileges.py
index 65d3985..e286ef8 100644
--- a/lib/xos-synchronizer/tests/steps/sync_controller_site_privileges.py
+++ b/lib/xos-synchronizer/tests/steps/sync_controller_site_privileges.py
@@ -17,12 +17,10 @@
import base64
import json
from xossynchronizer.steps.syncstep import SyncStep
-from xossynchronizer.mock_modelaccessor import *
class SyncControllerSitePrivileges(SyncStep):
- provides = [SitePrivilege]
requested_interval = 0
- observes = ControllerSitePrivilege
+ observes = "ControllerSitePrivilege"
playbook = "sync_controller_users.yaml"
def map_sync_inputs(self, controller_site_privilege):
diff --git a/lib/xos-synchronizer/tests/steps/sync_controller_sites.py b/lib/xos-synchronizer/tests/steps/sync_controller_sites.py
index 509a45c..24aa76f 100644
--- a/lib/xos-synchronizer/tests/steps/sync_controller_sites.py
+++ b/lib/xos-synchronizer/tests/steps/sync_controller_sites.py
@@ -17,12 +17,10 @@
import base64
import json
from xossynchronizer.steps.syncstep import SyncStep
-from xossynchronizer.mock_modelaccessor import *
class SyncControllerSites(SyncStep):
requested_interval = 0
- provides = [Site]
- observes = ControllerSite
+ observes = "ControllerSite"
playbook = "sync_controller_sites.yaml"
def fetch_pending(self, deleted=False):
diff --git a/lib/xos-synchronizer/tests/steps/sync_controller_slice_privileges.py b/lib/xos-synchronizer/tests/steps/sync_controller_slice_privileges.py
index ec0667c..09b63e6 100644
--- a/lib/xos-synchronizer/tests/steps/sync_controller_slice_privileges.py
+++ b/lib/xos-synchronizer/tests/steps/sync_controller_slice_privileges.py
@@ -17,12 +17,10 @@
import base64
import json
from xossynchronizer.steps.syncstep import SyncStep
-from xossynchronizer.mock_modelaccessor import *
class SyncControllerSlicePrivileges(SyncStep):
- provides = [SlicePrivilege]
requested_interval = 0
- observes = ControllerSlicePrivilege
+ observes = "ControllerSlicePrivilege"
playbook = "sync_controller_users.yaml"
def map_sync_inputs(self, controller_slice_privilege):
diff --git a/lib/xos-synchronizer/tests/steps/sync_controller_slices.py b/lib/xos-synchronizer/tests/steps/sync_controller_slices.py
index 0f43bad..31c62f1 100644
--- a/lib/xos-synchronizer/tests/steps/sync_controller_slices.py
+++ b/lib/xos-synchronizer/tests/steps/sync_controller_slices.py
@@ -15,13 +15,12 @@
import os
import base64
-from xossynchronizer.steps.syncstep import SyncStep, DeferredException
-from xossynchronizer.mock_modelaccessor import *
+from xossynchronizer.steps.syncstep import DeferredException
+from xossynchronizer.steps.ansiblesyncstep import AnsibleSyncStep
-class SyncControllerSlices(SyncStep):
- provides = [Slice]
+class SyncControllerSlices(AnsibleSyncStep):
requested_interval = 0
- observes = ControllerSlice
+ observes = "ControllerSlice"
playbook = "sync_controller_slices.yaml"
def map_sync_inputs(self, controller_slice):
diff --git a/lib/xos-synchronizer/tests/steps/sync_controller_users.py b/lib/xos-synchronizer/tests/steps/sync_controller_users.py
index 881e78a..a039257 100644
--- a/lib/xos-synchronizer/tests/steps/sync_controller_users.py
+++ b/lib/xos-synchronizer/tests/steps/sync_controller_users.py
@@ -16,13 +16,11 @@
import os
import base64
from xossynchronizer.steps.syncstep import SyncStep
-from xossynchronizer.mock_modelaccessor import *
class SyncControllerUsers(SyncStep):
- provides = [User]
requested_interval = 0
- observes = ControllerUser
+ observes = "ControllerUser"
playbook = "sync_controller_users.yaml"
def map_sync_inputs(self, controller_user):
diff --git a/lib/xos-synchronizer/tests/steps/sync_images.py b/lib/xos-synchronizer/tests/steps/sync_images.py
index 2284ed2..b3ed9bd 100644
--- a/lib/xos-synchronizer/tests/steps/sync_images.py
+++ b/lib/xos-synchronizer/tests/steps/sync_images.py
@@ -13,15 +13,11 @@
# limitations under the License.
-import os
-import base64
from xossynchronizer.steps.syncstep import SyncStep
-from xossynchronizer.mock_modelaccessor import *
class SyncImages(SyncStep):
- provides = [Image]
requested_interval = 0
- observes = [Image]
+ observes = ["Image"]
def sync_record(self, role):
# do nothing
diff --git a/lib/xos-synchronizer/tests/steps/sync_instances.py b/lib/xos-synchronizer/tests/steps/sync_instances.py
index 479b87d..1a70884 100644
--- a/lib/xos-synchronizer/tests/steps/sync_instances.py
+++ b/lib/xos-synchronizer/tests/steps/sync_instances.py
@@ -14,13 +14,7 @@
import os
-import base64
-import socket
-from xossynchronizer.steps import syncstep
-from xossynchronizer.mock_modelaccessor import *
-
-RESTAPI_HOSTNAME = socket.gethostname()
-RESTAPI_PORT = "8000"
+from xossynchronizer.steps import ansiblesyncstep
def escape(s):
@@ -28,10 +22,10 @@
return s
-class SyncInstances(syncstep.SyncStep):
- provides = [Instance]
+class SyncInstances(ansiblesyncstep.AnsibleSyncStep):
requested_interval = 0
- observes = Instance
+ # This observes is intentionally a list of one string, to test steps where observes is a list of strings.
+ observes = ["Instance"]
playbook = "sync_instances.yaml"
def fetch_pending(self, deletion=False):
diff --git a/lib/xos-synchronizer/tests/steps/sync_ports.py b/lib/xos-synchronizer/tests/steps/sync_ports.py
index 77209a5..a7eb7d1 100644
--- a/lib/xos-synchronizer/tests/steps/sync_ports.py
+++ b/lib/xos-synchronizer/tests/steps/sync_ports.py
@@ -16,13 +16,13 @@
import os
import base64
from xossynchronizer.steps.syncstep import SyncStep
-from xossynchronizer.mock_modelaccessor import *
class SyncPort(SyncStep):
- requested_interval = 0 # 3600
- provides = [Port]
- observes = Port
+ requested_interval = 0
+
+ # This observes is intentionally a string, to test steps where observes is a string
+ observes = "Port"
def call(self, failed=[], deletion=False):
if deletion:
diff --git a/lib/xos-synchronizer/tests/steps/sync_roles.py b/lib/xos-synchronizer/tests/steps/sync_roles.py
index e8b1364..7298a64 100644
--- a/lib/xos-synchronizer/tests/steps/sync_roles.py
+++ b/lib/xos-synchronizer/tests/steps/sync_roles.py
@@ -20,8 +20,9 @@
class SyncRoles(SyncStep):
- provides = [Role]
requested_interval = 0
+
+ # This observes is intentionally a list of three classes, to test steps where observes is a list of classes.
observes = [SiteRole, SliceRole, ControllerRole]
def sync_record(self, role):
diff --git a/lib/xos-synchronizer/tests/test_event_engine.py b/lib/xos-synchronizer/tests/test_event_engine.py
index 13972c6..a09b3d0 100644
--- a/lib/xos-synchronizer/tests/test_event_engine.py
+++ b/lib/xos-synchronizer/tests/test_event_engine.py
@@ -128,6 +128,8 @@
build_mock_modelaccessor(sync_lib_dir, xos_dir, services_dir=None, service_xprotos=[])
+ from xossynchronizer.modelaccessor import model_accessor
+
# The test config.yaml references files in `test/` so make sure we're in the parent directory of the
# test directory.
os.chdir(os.path.join(test_path, ".."))
@@ -135,7 +137,7 @@
from xossynchronizer.event_engine import XOSKafkaThread, XOSEventEngine
self.event_steps_dir = Config.get("event_steps_dir")
- self.event_engine = XOSEventEngine(log)
+ self.event_engine = XOSEventEngine(model_accessor=model_accessor, log=log)
def tearDown(self):
sys.path = self.sys_path_save
diff --git a/lib/xos-synchronizer/tests/test_load.py b/lib/xos-synchronizer/tests/test_load.py
index 8f9813d..3802dd7 100644
--- a/lib/xos-synchronizer/tests/test_load.py
+++ b/lib/xos-synchronizer/tests/test_load.py
@@ -52,10 +52,12 @@
import xossynchronizer.backend
reload(xossynchronizer.backend)
- b = xossynchronizer.backend.Backend()
+ from xossynchronizer.modelaccessor import model_accessor
+
+ b = xossynchronizer.backend.Backend(model_accessor=model_accessor)
steps_dir = Config.get("steps_dir")
self.steps = b.load_sync_step_modules(steps_dir)
- self.synchronizer = xossynchronizer.event_loop.XOSObserver(self.steps)
+ self.synchronizer = xossynchronizer.event_loop.XOSObserver(self.steps, model_accessor)
def tearDown(self):
sys.path = self.sys_path_save
@@ -92,6 +94,9 @@
self.assertIn(
("ControllerSlice", ["SyncControllerSlices"]), model_to_step.items()
)
+ self.assertIn(
+ ("Port", ["SyncPort"]), model_to_step.items()
+ )
self.assertIn(("SiteRole", ["SyncRoles"]), model_to_step.items())
for k, v in model_to_step.items():
@@ -100,7 +105,13 @@
if not isinstance(observes, list):
observes = [observes]
- observed_names = [o.__name__ for o in observes]
+ observed_names = []
+ for o in observes:
+ if isinstance(o,str):
+ observed_names.append(o)
+ else:
+ observed_names.append(o.__name__)
+
self.assertIn(k, observed_names)
diff --git a/lib/xos-synchronizer/tests/test_model_policy_tenantwithcontainer.py b/lib/xos-synchronizer/tests/test_model_policy_tenantwithcontainer.py
index fa3c774..b15240d 100644
--- a/lib/xos-synchronizer/tests/test_model_policy_tenantwithcontainer.py
+++ b/lib/xos-synchronizer/tests/test_model_policy_tenantwithcontainer.py
@@ -60,12 +60,9 @@
) in xossynchronizer.model_policies.model_policy_tenantwithcontainer.model_accessor.all_model_classes.items():
globals()[k] = v
- # TODO: Mock_model_accessor lacks save or delete methods
- # Instance.save = mock.Mock
- # Instance.delete = mock.Mock
- # TenantWithContainer.save = mock.Mock
+ from xossynchronizer.modelaccessor import model_accessor
- self.policy = TenantWithContainerPolicy()
+ self.policy = TenantWithContainerPolicy(model_accessor=model_accessor)
self.user = User(email="testadmin@test.org")
self.tenant = TenantWithContainer(creator=self.user)
self.flavor = Flavor(name="m1.small")
diff --git a/lib/xos-synchronizer/tests/test_payload.py b/lib/xos-synchronizer/tests/test_payload.py
index 6bd1cfc..cfba52d 100644
--- a/lib/xos-synchronizer/tests/test_payload.py
+++ b/lib/xos-synchronizer/tests/test_payload.py
@@ -99,26 +99,25 @@
# import all class names to globals
for (k, v) in model_accessor.all_model_classes.items():
globals()[k] = v
- b = xossynchronizer.backend.Backend()
+ b = xossynchronizer.backend.Backend(model_accessor = model_accessor)
steps_dir = Config.get("steps_dir")
self.steps = b.load_sync_step_modules(steps_dir)
- self.synchronizer = xossynchronizer.event_loop.XOSObserver(self.steps)
+ self.synchronizer = xossynchronizer.event_loop.XOSObserver(self.steps, model_accessor)
def tearDown(self):
sys.path = self.sys_path_save
os.chdir(self.cwd_save)
@mock.patch(
- "steps.sync_instances.syncstep.run_template",
+ "steps.sync_instances.ansiblesyncstep.run_template",
side_effect=run_fake_ansible_template,
)
- @mock.patch("xossynchronizer.event_loop.model_accessor")
- def test_delete_record(self, mock_run_template, mock_modelaccessor):
+ def test_delete_record(self, mock_run_template):
with mock.patch.object(Instance, "save") as instance_save:
o = Instance()
o.name = "Sisi Pascal"
- o.synchronizer_step = steps.sync_instances.SyncInstances()
+ o.synchronizer_step = steps.sync_instances.SyncInstances(model_accessor = self.synchronizer.model_accessor)
self.synchronizer.delete_record(o, log)
a = get_ansible_output()
@@ -126,16 +125,15 @@
o.save.assert_called_with(update_fields=["backend_need_reap"])
@mock.patch(
- "steps.sync_instances.syncstep.run_template",
+ "steps.sync_instances.ansiblesyncstep.run_template",
side_effect=run_fake_ansible_template_fail,
)
- @mock.patch("xossynchronizer.event_loop.model_accessor")
- def test_delete_record_fail(self, mock_run_template, mock_modelaccessor):
+ def test_delete_record_fail(self, mock_run_template):
with mock.patch.object(Instance, "save") as instance_save:
o = Instance()
o.name = "Sisi Pascal"
- o.synchronizer_step = steps.sync_instances.SyncInstances()
+ o.synchronizer_step = steps.sync_instances.SyncInstances(model_accessor = self.synchronizer.model_accessor)
with self.assertRaises(Exception) as e:
self.synchronizer.delete_record(o, log)
@@ -145,16 +143,15 @@
)
@mock.patch(
- "steps.sync_instances.syncstep.run_template",
+ "steps.sync_instances.ansiblesyncstep.run_template",
side_effect=run_fake_ansible_template,
)
- @mock.patch("xossynchronizer.event_loop.model_accessor")
- def test_sync_record(self, mock_run_template, mock_modelaccessor):
+ def test_sync_record(self, mock_run_template):
with mock.patch.object(Instance, "save") as instance_save:
o = Instance()
o.name = "Sisi Pascal"
- o.synchronizer_step = steps.sync_instances.SyncInstances()
+ o.synchronizer_step = steps.sync_instances.SyncInstances(model_accessor = self.synchronizer.model_accessor)
self.synchronizer.sync_record(o, log)
a = get_ansible_output()
@@ -169,11 +166,10 @@
)
@mock.patch(
- "steps.sync_instances.syncstep.run_template",
+ "steps.sync_instances.ansiblesyncstep.run_template",
side_effect=run_fake_ansible_template,
)
- @mock.patch("xossynchronizer.event_loop.model_accessor")
- def test_sync_cohort(self, mock_run_template, mock_modelaccessor):
+ def test_sync_cohort(self, mock_run_template):
with mock.patch.object(Instance, "save") as instance_save, mock.patch.object(
ControllerSlice, "save"
) as controllerslice_save:
@@ -186,8 +182,10 @@
o.slice = s
cohort = [cs, o]
- o.synchronizer_step = steps.sync_instances.SyncInstances()
- cs.synchronizer_step = steps.sync_controller_slices.SyncControllerSlices()
+ o.synchronizer_step = steps.sync_instances.SyncInstances(model_accessor = self.synchronizer.model_accessor)
+ cs.synchronizer_step = steps.sync_controller_slices.SyncControllerSlices(
+ model_accessor = self.synchronizer.model_accessor
+ )
self.synchronizer.sync_cohort(cohort, False)
@@ -211,11 +209,10 @@
)
@mock.patch(
- "steps.sync_instances.syncstep.run_template",
+ "steps.sync_instances.ansiblesyncstep.run_template",
side_effect=run_fake_ansible_template,
)
- @mock.patch("xossynchronizer.event_loop.model_accessor")
- def test_deferred_exception(self, mock_run_template, mock_modelaccessor):
+ def test_deferred_exception(self, mock_run_template):
with mock.patch.object(Instance, "save") as instance_save:
cs = ControllerSlice()
s = Slice(name="SP SP")
@@ -227,8 +224,10 @@
o.slice = s
cohort = [cs, o]
- o.synchronizer_step = steps.sync_instances.SyncInstances()
- cs.synchronizer_step = steps.sync_controller_slices.SyncControllerSlices()
+ o.synchronizer_step = steps.sync_instances.SyncInstances(model_accessor=self.synchronizer.model_accessor)
+ cs.synchronizer_step = steps.sync_controller_slices.SyncControllerSlices(
+ model_accessor=self.synchronizer.model_accessor
+ )
self.synchronizer.sync_cohort(cohort, False)
o.save.assert_called_with(
@@ -241,11 +240,10 @@
self.assertIn("Failed due to", o.backend_status)
@mock.patch(
- "steps.sync_instances.syncstep.run_template",
+ "steps.sync_instances.ansiblesyncstep.run_template",
side_effect=run_fake_ansible_template,
)
- @mock.patch("xossynchronizer.event_loop.model_accessor")
- def test_backend_status(self, mock_run_template, mock_modelaccessor):
+ def test_backend_status(self, mock_run_template):
with mock.patch.object(Instance, "save") as instance_save:
cs = ControllerSlice()
s = Slice(name="SP SP")
@@ -257,8 +255,9 @@
o.slice = s
cohort = [cs, o]
- o.synchronizer_step = steps.sync_instances.SyncInstances()
- cs.synchronizer_step = steps.sync_controller_slices.SyncControllerSlices()
+ o.synchronizer_step = steps.sync_instances.SyncInstances(model_accessor=self.synchronizer.model_accessor)
+ cs.synchronizer_step = steps.sync_controller_slices.SyncControllerSlices(
+ model_accessor=self.synchronizer.model_accessor)
self.synchronizer.sync_cohort(cohort, False)
o.save.assert_called_with(
@@ -269,11 +268,10 @@
self.assertIn("Failed due to", o.backend_status)
@mock.patch(
- "steps.sync_instances.syncstep.run_template",
+ "steps.sync_instances.ansiblesyncstep.run_template",
side_effect=run_fake_ansible_template,
)
- @mock.patch("xossynchronizer.event_loop.model_accessor")
- def test_fetch_pending(self, mock_run_template, mock_accessor, *_other_accessors):
+ def test_fetch_pending(self, mock_run_template):
pending_objects, pending_steps = self.synchronizer.fetch_pending()
pending_objects2 = list(pending_objects)
@@ -295,12 +293,11 @@
self.assertEqual(set(flat_objects), set(pending_objects))
@mock.patch(
- "steps.sync_instances.syncstep.run_template",
+ "steps.sync_instances.ansiblesyncstep.run_template",
side_effect=run_fake_ansible_template,
)
- @mock.patch("xossynchronizer.event_loop.model_accessor")
def test_fetch_pending_with_external_dependencies(
- self, mock_run_template, mock_accessor, *_other_accessors
+ self, mock_run_template,
):
pending_objects, pending_steps = self.synchronizer.fetch_pending()
pending_objects2 = list(pending_objects)
@@ -322,11 +319,10 @@
self.assertIsNotNone(any_user)
@mock.patch(
- "steps.sync_instances.syncstep.run_template",
+ "steps.sync_instances.ansiblesyncstep.run_template",
side_effect=run_fake_ansible_template,
)
- @mock.patch("xossynchronizer.event_loop.model_accessor")
- def test_external_dependency_exception(self, mock_run_template, mock_modelaccessor):
+ def test_external_dependency_exception(self, mock_run_template):
cs = ControllerSlice()
s = Slice(name="SP SP")
cs.slice = s
@@ -337,7 +333,7 @@
cohort = [cs, o]
o.synchronizer_step = None
- o.synchronizer_step = steps.sync_instances.SyncInstances()
+ o.synchronizer_step = steps.sync_instances.SyncInstances(model_accessor=self.synchronizer.model_accessor)
self.synchronizer.sync_cohort(cohort, False)
diff --git a/lib/xos-synchronizer/tests/test_run.py b/lib/xos-synchronizer/tests/test_run.py
index f5815f2..65651d9 100644
--- a/lib/xos-synchronizer/tests/test_run.py
+++ b/lib/xos-synchronizer/tests/test_run.py
@@ -14,10 +14,7 @@
import json
import unittest
-from mock import patch
import mock
-import pdb
-import networkx as nx
import os
import sys
@@ -70,10 +67,12 @@
for (k, v) in model_accessor.all_model_classes.items():
globals()[k] = v
- b = xossynchronizer.backend.Backend()
+ from xossynchronizer.modelaccessor import model_accessor
+
+ b = xossynchronizer.backend.Backend(model_accessor=model_accessor)
steps_dir = Config.get("steps_dir")
self.steps = b.load_sync_step_modules(steps_dir)
- self.synchronizer = xossynchronizer.event_loop.XOSObserver(self.steps)
+ self.synchronizer = xossynchronizer.event_loop.XOSObserver(self.steps, model_accessor)
try:
os.remove("/tmp/sync_ports")
except OSError:
@@ -88,12 +87,10 @@
os.chdir(self.cwd_save)
@mock.patch(
- "steps.sync_instances.syncstep.run_template",
+ "steps.sync_instances.ansiblesyncstep.run_template",
side_effect=run_fake_ansible_template,
)
- @mock.patch("xossynchronizer.event_loop.model_accessor")
- def test_run_once(self, mock_run_template, mock_accessor, *_other_accessors):
-
+ def test_run_once(self, mock_run_template):
pending_objects, pending_steps = self.synchronizer.fetch_pending()
pending_objects2 = list(pending_objects)
diff --git a/lib/xos-synchronizer/tests/test_scheduler.py b/lib/xos-synchronizer/tests/test_scheduler.py
index afbf036..12e5ce0 100644
--- a/lib/xos-synchronizer/tests/test_scheduler.py
+++ b/lib/xos-synchronizer/tests/test_scheduler.py
@@ -65,7 +65,7 @@
b = xossynchronizer.backend.Backend()
steps_dir = Config.get("steps_dir")
self.steps = b.load_sync_step_modules(steps_dir)
- self.synchronizer = xossynchronizer.event_loop.XOSObserver(self.steps)
+ self.synchronizer = xossynchronizer.event_loop.XOSObserver(self.steps, model_accessor)
def tearDown(self):
sys.path = self.sys_path_save
diff --git a/lib/xos-synchronizer/tests/test_services.py b/lib/xos-synchronizer/tests/test_services.py
index 2456c27..c41ed05 100644
--- a/lib/xos-synchronizer/tests/test_services.py
+++ b/lib/xos-synchronizer/tests/test_services.py
@@ -57,10 +57,10 @@
for (k, v) in model_accessor.all_model_classes.items():
globals()[k] = v
- b = xossynchronizer.backend.Backend()
+ b = xossynchronizer.backend.Backend(model_accessor=model_accessor)
steps_dir = Config.get("steps_dir")
self.steps = b.load_sync_step_modules(steps_dir)
- self.synchronizer = xossynchronizer.event_loop.XOSObserver(self.steps)
+ self.synchronizer = xossynchronizer.event_loop.XOSObserver(self.steps, model_accessor)
def tearDown(self):
sys.path = self.sys_path_save
diff --git a/lib/xos-synchronizer/xossynchronizer/backend.py b/lib/xos-synchronizer/xossynchronizer/backend.py
index b404864..d4b7e67 100644
--- a/lib/xos-synchronizer/xossynchronizer/backend.py
+++ b/lib/xos-synchronizer/xossynchronizer/backend.py
@@ -24,7 +24,6 @@
from xossynchronizer.model_policy_loop import XOSPolicyEngine
from xossynchronizer.event_engine import XOSEventEngine
from xossynchronizer.pull_step_engine import XOSPullStepEngine
-from xossynchronizer.modelaccessor import *
from xosconfig import Config
from multistructlog import create_logger
@@ -33,7 +32,8 @@
class Backend:
- def __init__(self, log=log):
+ def __init__(self, model_accessor, log=log):
+ self.model_accessor = model_accessor
self.log = log
def load_sync_step_modules(self, step_dir):
@@ -100,8 +100,8 @@
# if we have at least one sync_step
if len(sync_steps) > 0:
# start the observer
- self.log.info("Starting XOSObserver", sync_steps=sync_steps)
- observer = XOSObserver(sync_steps, self.log)
+ self.log.info("Starting XOSObserver", sync_steps=sync_steps, model_accessor=self.model_accessor)
+ observer = XOSObserver(sync_steps, self.model_accessor, self.log)
observer_thread = threading.Thread(
target=observer.run, name="synchronizer"
)
@@ -113,7 +113,7 @@
pull_steps_dir = Config.get("pull_steps_dir")
if pull_steps_dir:
self.log.info("Starting XOSPullStepEngine", pull_steps_dir=pull_steps_dir)
- pull_steps_engine = XOSPullStepEngine()
+ pull_steps_engine = XOSPullStepEngine(model_accessor=self.model_accessor)
pull_steps_engine.load_pull_step_modules(pull_steps_dir)
pull_steps_thread = threading.Thread(
target=pull_steps_engine.start, name="pull_step_engine"
@@ -125,7 +125,7 @@
event_steps_dir = Config.get("event_steps_dir")
if event_steps_dir:
self.log.info("Starting XOSEventEngine", event_steps_dir=event_steps_dir)
- event_engine = XOSEventEngine(self.log)
+ event_engine = XOSEventEngine(model_accessor=self.model_accessor, log=self.log)
event_engine.load_event_step_modules(event_steps_dir)
event_engine.start()
else:
@@ -134,7 +134,7 @@
# start model policies thread
policies_dir = Config.get("model_policies_dir")
if policies_dir:
- policy_engine = XOSPolicyEngine(policies_dir=policies_dir, log=self.log)
+ policy_engine = XOSPolicyEngine(policies_dir=policies_dir, model_accessor=self.model_accessor, log=self.log)
model_policy_thread = threading.Thread(
target=policy_engine.run, name="policy_engine"
)
diff --git a/lib/xos-synchronizer/xossynchronizer/event_engine.py b/lib/xos-synchronizer/xossynchronizer/event_engine.py
index e5e18d1..694a1a8 100644
--- a/lib/xos-synchronizer/xossynchronizer/event_engine.py
+++ b/lib/xos-synchronizer/xossynchronizer/event_engine.py
@@ -43,11 +43,12 @@
function is called for each event.
"""
- def __init__(self, step, bootstrap_servers, log, *args, **kwargs):
+ def __init__(self, step, bootstrap_servers, model_accessor, log, *args, **kwargs):
super(XOSKafkaThread, self).__init__(*args, **kwargs)
self.consumer = None
self.step = step
self.bootstrap_servers = bootstrap_servers
+ self.model_accessor = model_accessor
self.log = log
self.daemon = True
@@ -129,7 +130,7 @@
)
try:
- self.step(log=self.log).process_event(event_msg)
+ self.step(model_accessor=self.model_accessor, log=self.log).process_event(event_msg)
except BaseException:
self.log.exception(
@@ -151,9 +152,10 @@
will be called before start().
"""
- def __init__(self, log):
+ def __init__(self, model_accessor, log):
self.event_steps = []
self.threads = []
+ self.model_accessor = model_accessor
self.log = log
def load_event_step_modules(self, event_step_dir):
@@ -205,7 +207,7 @@
for step in self.event_steps:
if step.technology == "kafka":
- thread = XOSKafkaThread(step, [eventbus_endpoint], self.log)
+ thread = XOSKafkaThread(step, [eventbus_endpoint], self.model_accessor, self.log)
thread.start()
self.threads.append(thread)
else:
diff --git a/lib/xos-synchronizer/xossynchronizer/event_loop.py b/lib/xos-synchronizer/xossynchronizer/event_loop.py
index 96ce727..bdff10b 100644
--- a/lib/xos-synchronizer/xossynchronizer/event_loop.py
+++ b/lib/xos-synchronizer/xossynchronizer/event_loop.py
@@ -30,7 +30,6 @@
from networkx.algorithms.dag import topological_sort
from xossynchronizer.steps.syncstep import InnocuousException, DeferredException, SyncStep
-from xossynchronizer.modelaccessor import *
from xosconfig import Config
from multistructlog import create_logger
@@ -71,9 +70,9 @@
class XOSObserver(object):
sync_steps = []
- def __init__(self, sync_steps, log=log):
- # The Condition object via which events are received
+ def __init__(self, sync_steps, model_accessor, log=log):
self.log = log
+ self.model_accessor = model_accessor
self.step_lookup = {}
self.sync_steps = sync_steps
@@ -153,9 +152,13 @@
observes = [s.observes]
else:
observes = s.observes
-
for m in observes:
- model_to_step[m.__name__].append(s.__name__)
+ if isinstance(m, str):
+ # observes is a string that names the model
+ model_to_step[m].append(s.__name__)
+ else:
+ # observes is the model class
+ model_to_step[m.__name__].append(s.__name__)
try:
external_dependencies.extend(s.external_dependencies)
@@ -173,7 +176,7 @@
def reset_model_accessor(self, o=None):
try:
- model_accessor.reset_queries()
+ self.model_accessor.reset_queries()
except BaseException:
# this shouldn't happen, but in case it does, catch it...
if o:
@@ -190,13 +193,13 @@
if getattr(o, "backend_need_reap", False):
# the object has already been deleted and marked for reaping
- model_accessor.journal_object(o, "syncstep.call.already_marked_reap")
+ self.model_accessor.journal_object(o, "syncstep.call.already_marked_reap")
else:
step = getattr(o, "synchronizer_step", None)
if not step:
raise ExternalDependencyFailed
- model_accessor.journal_object(o, "syncstep.call.delete_record")
+ self.model_accessor.journal_object(o, "syncstep.call.delete_record")
dr_log.debug("Deleting object", **o.tologdict())
@@ -206,7 +209,7 @@
dr_log.debug("Deleted object", **o.tologdict())
- model_accessor.journal_object(o, "syncstep.call.delete_set_reap")
+ self.model_accessor.journal_object(o, "syncstep.call.delete_set_reap")
o.backend_need_reap = True
o.save(update_fields=["backend_need_reap"])
@@ -228,7 +231,7 @@
o.backend_need_delete = True
o.save(update_fields=["backend_need_delete"])
- model_accessor.journal_object(o, "syncstep.call.sync_record")
+ self.model_accessor.journal_object(o, "syncstep.call.sync_record")
sr_log.debug("Syncing object", **o.tologdict())
@@ -243,7 +246,7 @@
o.backend_register = json.dumps(scratchpad)
o.backend_status = "OK"
o.backend_code = 1
- model_accessor.journal_object(o, "syncstep.call.save_update")
+ self.model_accessor.journal_object(o, "syncstep.call.save_update")
o.save(
update_fields=[
"enacted",
@@ -339,7 +342,7 @@
# TOFIX:
# DatabaseError: value too long for type character varying(140)
- if model_accessor.obj_exists(o):
+ if self.model_accessor.obj_exists(o):
try:
o.backend_status = o.backend_status[:1024]
o.save(
@@ -397,16 +400,16 @@
cohort_emptied = True
finally:
self.reset_model_accessor()
- model_accessor.connection_close()
+ self.model_accessor.connection_close()
def tenant_class_name_from_service(self, service_name):
""" This code supports legacy functionality. To be cleaned up. """
name1 = service_name + "Instance"
- if hasattr(Slice().stub, name1):
+ if hasattr(self.model_accessor.Slice().stub, name1):
return name1
else:
name2 = service_name.replace("Service", "Tenant")
- if hasattr(Slice().stub, name2):
+ if hasattr(self.model_accessor.Slice().stub, name2):
return name2
else:
return None
@@ -427,14 +430,14 @@
s_model_names = [v for k, v in ugly_tuples]
s_models0 = [
- getattr(Slice().stub, model_name, None) for model_name in s_model_names
+ getattr(self.model_accessor.Slice().stub, model_name, None) for model_name in s_model_names
]
s_models1 = [model.objects.first() for model in s_models0]
s_models = [m for m in s_models1 if m is not None]
dependencies = []
for model in s_models:
- deps = ServiceDependency.objects.filter(subscriber_service_id=model.id)
+ deps = self.model_accessor.ServiceDependency.objects.filter(subscriber_service_id=model.id)
if deps:
services = [
self.tenant_class_name_from_service(
@@ -450,7 +453,7 @@
def compute_service_instance_dependencies(self, objects):
link_set = [
- ServiceInstanceLink.objects.filter(subscriber_service_instance_id=o.id)
+ self.model_accessor.ServiceInstanceLink.objects.filter(subscriber_service_instance_id=o.id)
for o in objects
]
@@ -490,17 +493,22 @@
for e in self.external_dependencies:
s = SyncStep
- s.observes = e
+ if isinstance(e,str):
+ # external dependency is a string that names a model class
+ s.observes = self.model_accessor.get_model_class(e)
+ else:
+ # external dependency is a model class
+ s.observes = e
step_list.append(s)
for step_class in step_list:
- step = step_class(driver=self.driver)
+ step = step_class(driver=self.driver, model_accessor=self.model_accessor)
step.log = self.log.new(step=step)
if not hasattr(step, "call"):
pending = step.fetch_pending(deletion)
for obj in pending:
- step = step_class(driver=self.driver)
+ step = step_class(driver=self.driver, model_accessor=self.model_accessor)
step.log = self.log.new(step=step)
obj.synchronizer_step = step
@@ -719,7 +727,7 @@
try:
# Why are we checking the DB connection here?
- model_accessor.check_db_connection_okay()
+ self.model_accessor.check_db_connection_okay()
loop_start = time.time()
diff --git a/lib/xos-synchronizer/xossynchronizer/event_steps/eventstep.py b/lib/xos-synchronizer/xossynchronizer/event_steps/eventstep.py
index 9596248..84ea74b 100644
--- a/lib/xos-synchronizer/xossynchronizer/event_steps/eventstep.py
+++ b/lib/xos-synchronizer/xossynchronizer/event_steps/eventstep.py
@@ -25,12 +25,15 @@
topics = []
pattern = None
- def __init__(self, log, **kwargs):
+ def __init__(self, model_accessor, log, **kwargs):
"""
Initialize a pull step. Override this function to include any initialization. Make sure to call the original
__init__() from your method.
"""
+ # self.model_accessor can be used to create and query models
+ self.model_accessor = model_accessor
+
# self.log can be used to emit logging messages.
self.log = log
diff --git a/lib/xos-synchronizer/xossynchronizer/mock_modelaccessor_build.py b/lib/xos-synchronizer/xossynchronizer/mock_modelaccessor_build.py
index 5c389cf..99b2d46 100644
--- a/lib/xos-synchronizer/xossynchronizer/mock_modelaccessor_build.py
+++ b/lib/xos-synchronizer/xossynchronizer/mock_modelaccessor_build.py
@@ -28,6 +28,12 @@
def build_mock_modelaccessor(
dest_dir, xos_dir, services_dir, service_xprotos, target="mock_classes.xtarget"
):
+ # TODO: deprecate the dest_dir argument
+
+ # force modelaccessor to be found in /tmp
+ dest_dir="/tmp/mock_modelaccessor"
+ if not os.path.exists(dest_dir):
+ os.makedirs(dest_dir)
dest_fn = os.path.join(dest_dir, "mock_modelaccessor.py")
args = ["xosgenx", "--target", target]
@@ -69,3 +75,38 @@
# Save the context of this invocation of xosgenx
open(context_fn, "w").write(cPickle.dumps(this_context))
+
+# generate model from xproto
+def get_models_fn(services_dir, service_name, xproto_name):
+ name = os.path.join(service_name, "xos", xproto_name)
+ if os.path.exists(os.path.join(services_dir, name)):
+ return name
+ else:
+ name = os.path.join(service_name, "xos", "synchronizer", "models", xproto_name)
+ if os.path.exists(os.path.join(services_dir, name)):
+ return name
+ raise Exception("Unable to find service=%s xproto=%s" % (service_name, xproto_name))
+# END generate model from xproto
+
+def mock_modelaccessor_config(test_dir, services):
+ """ Automatically configure the mock modelaccessor.
+
+ We start from the test directory, and then back up until we find the orchestration directory. From there we
+ can find the other xproto (core, services) that we need to build the mock modelaccessor.
+ """
+
+ orchestration_dir = test_dir
+ while not orchestration_dir.endswith("orchestration"):
+ # back up a level
+ orchestration_dir = os.path.dirname(orchestration_dir)
+ if len(orchestration_dir)<10:
+ raise Exception("Failed to autodiscovery repository tree")
+
+ xos_dir = os.path.join(orchestration_dir, "xos", "xos")
+ services_dir = os.path.join(orchestration_dir, "xos_services")
+
+ service_xprotos=[]
+ for (service_name, xproto_name) in services:
+ service_xprotos.append(get_models_fn(services_dir, service_name, xproto_name))
+
+ build_mock_modelaccessor(None, xos_dir, services_dir, service_xprotos)
diff --git a/lib/xos-synchronizer/xossynchronizer/model_policies/policy.py b/lib/xos-synchronizer/xossynchronizer/model_policies/policy.py
index b455c79..5877279 100644
--- a/lib/xos-synchronizer/xossynchronizer/model_policies/policy.py
+++ b/lib/xos-synchronizer/xossynchronizer/model_policies/policy.py
@@ -36,5 +36,6 @@
handle_delete ... called when a model is deleted
"""
- def __init__(self):
+ def __init__(self, model_accessor):
+ self.model_accessor = model_accessor
self.logger = log
diff --git a/lib/xos-synchronizer/xossynchronizer/model_policy_loop.py b/lib/xos-synchronizer/xossynchronizer/model_policy_loop.py
index c23e47c..20144a5 100644
--- a/lib/xos-synchronizer/xossynchronizer/model_policy_loop.py
+++ b/lib/xos-synchronizer/xossynchronizer/model_policy_loop.py
@@ -14,18 +14,18 @@
from __future__ import print_function
-from xossynchronizer.modelaccessor import *
from xossynchronizer.dependency_walker_new import *
-from xossynchronizer.policy import Policy
+from xossynchronizer.model_policies.policy import Policy
import imp
-import pdb
+import inspect
import time
import traceback
class XOSPolicyEngine(object):
- def __init__(self, policies_dir, log):
+ def __init__(self, policies_dir, model_accessor, log):
+ self.model_accessor = model_accessor
self.model_policies = self.load_model_policies(policies_dir)
self.policies_by_name = {}
self.policies_by_class = {}
@@ -94,26 +94,28 @@
# provides field (this eliminates the abstract base classes
# since they don't have a provides)
- if (
- inspect.isclass(c)
- and issubclass(c, Policy)
- and hasattr(c, "model_name")
- and (c not in policies)
- ):
- if not c.model_name:
- log.info(
- "load_model_policies: skipping model policy",
- classname=classname,
- )
- continue
- if not model_accessor.has_model_class(c.model_name):
- log.error(
- "load_model_policies: unable to find model policy",
- classname=classname,
- model=c.model_name,
- )
- c.model = model_accessor.get_model_class(c.model_name)
- policies.append(c)
+ if inspect.isclass(c):
+ base_names = [b.__name__ for b in c.__bases__]
+
+ if (
+ "Policy" in base_names
+ and hasattr(c, "model_name")
+ and (c not in policies)
+ ):
+ if not c.model_name:
+ log.info(
+ "load_model_policies: skipping model policy",
+ classname=classname,
+ )
+ continue
+ if not self.model_accessor.has_model_class(c.model_name):
+ log.error(
+ "load_model_policies: unable to find model policy",
+ classname=classname,
+ model=c.model_name,
+ )
+ c.model = self.model_accessor.get_model_class(c.model_name)
+ policies.append(c)
log.info("Loaded model policies", policies=policies)
return policies
@@ -141,7 +143,7 @@
policy=policy.__name__,
method=method_name,
)
- getattr(policy(), method_name)(instance)
+ getattr(policy(model_accessor=self.model_accessor), method_name)(instance)
log.debug(
"MODEL POLICY: completed handler",
sender_name=sender_name,
@@ -171,7 +173,7 @@
instance.save(update_fields=["policed", "policy_status", "policy_code"])
if hasattr(policy, "after_policy_save"):
- policy().after_policy_save(instance)
+ policy(model_accessor=self.model_accessor).after_policy_save(instance)
log.info("MODEL_POLICY: Saved", o=instance)
except BaseException:
@@ -199,10 +201,10 @@
def run_policy_once(self):
models = self.policies_by_class.keys()
- model_accessor.check_db_connection_okay()
+ self.model_accessor.check_db_connection_okay()
- objects = model_accessor.fetch_policies(models, False)
- deleted_objects = model_accessor.fetch_policies(models, True)
+ objects = self.model_accessor.fetch_policies(models, False)
+ deleted_objects = self.model_accessor.fetch_policies(models, True)
for o in objects:
if o.deleted:
@@ -217,7 +219,7 @@
self.execute_model_policy(o, "delete")
try:
- model_accessor.reset_queries()
+ self.model_accessor.reset_queries()
except Exception as e:
# this shouldn't happen, but in case it does, catch it...
log.exception("MODEL POLICY: exception in reset_queries", e)
diff --git a/lib/xos-synchronizer/xossynchronizer/modelaccessor.py b/lib/xos-synchronizer/xossynchronizer/modelaccessor.py
index 6084579..1418cc6 100644
--- a/lib/xos-synchronizer/xossynchronizer/modelaccessor.py
+++ b/lib/xos-synchronizer/xossynchronizer/modelaccessor.py
@@ -285,6 +285,11 @@
def config_accessor_mock():
global model_accessor
+
+ # the mock model accessor always gets built to a temporary location
+ if not "/tmp/mock_modelaccessor" in sys.path:
+ sys.path.append("/tmp/mock_modelaccessor")
+
from mock_modelaccessor import model_accessor as mock_model_accessor
model_accessor = mock_model_accessor
diff --git a/lib/xos-synchronizer/xossynchronizer/pull_step_engine.py b/lib/xos-synchronizer/xossynchronizer/pull_step_engine.py
index 3f4732d..ece77bc 100644
--- a/lib/xos-synchronizer/xossynchronizer/pull_step_engine.py
+++ b/lib/xos-synchronizer/xossynchronizer/pull_step_engine.py
@@ -30,8 +30,9 @@
The thread's pull_records() function is called for every five seconds.
"""
- def __init__(self, steps, *args, **kwargs):
+ def __init__(self, steps, model_accessor, *args, **kwargs):
self.steps = steps
+ self.model_accessor = model_accessor
def run(self):
while True:
@@ -43,7 +44,7 @@
threads = []
for step in self.steps:
- thread = threading.Thread(target=step().pull_records, name="pull_step")
+ thread = threading.Thread(target=step(model_accessor=self.model_accessor).pull_records, name="pull_step")
threads.append(thread)
for t in threads:
@@ -67,7 +68,8 @@
will be called before start().
"""
- def __init__(self):
+ def __init__(self, model_accessor):
+ self.model_accessor = model_accessor
self.pull_steps = []
def load_pull_step_modules(self, pull_step_dir):
@@ -98,5 +100,5 @@
log.info("Starting pull steps engine", steps=self.pull_steps)
for step in self.pull_steps:
- sched = XOSPullStepScheduler(steps=self.pull_steps)
+ sched = XOSPullStepScheduler(steps=self.pull_steps, model_accessor=self.model_accessor)
sched.run()
diff --git a/lib/xos-synchronizer/xossynchronizer/pull_steps/pullstep.py b/lib/xos-synchronizer/xossynchronizer/pull_steps/pullstep.py
index adbc0b1..290ab92 100644
--- a/lib/xos-synchronizer/xossynchronizer/pull_steps/pullstep.py
+++ b/lib/xos-synchronizer/xossynchronizer/pull_steps/pullstep.py
@@ -23,8 +23,10 @@
Initialize a pull step
:param kwargs:
-- observed_model: name of the model that is being polled
+ -- model_accessor: used to create and update models
"""
self.observed_model = kwargs.get("observed_model")
+ self.model_accessor = kwargs.get("model_accessor")
def pull_records(self):
self.log.debug(
diff --git a/lib/xos-synchronizer/xossynchronizer/steps/SyncInstanceUsingAnsible.py b/lib/xos-synchronizer/xossynchronizer/steps/SyncInstanceUsingAnsible.py
index 6ed656c..1bc54ce 100644
--- a/lib/xos-synchronizer/xossynchronizer/steps/SyncInstanceUsingAnsible.py
+++ b/lib/xos-synchronizer/xossynchronizer/steps/SyncInstanceUsingAnsible.py
@@ -23,7 +23,6 @@
from xossynchronizer.steps.syncstep import SyncStep, DeferredException
from xossynchronizer.ansible_helper import run_template_ssh
-from xossynchronizer.modelaccessor import *
class SyncInstanceUsingAnsible(SyncStep):
diff --git a/lib/xos-synchronizer/xossynchronizer/steps/ansiblesyncstep.py b/lib/xos-synchronizer/xossynchronizer/steps/ansiblesyncstep.py
new file mode 100644
index 0000000..116f8c2
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/steps/ansiblesyncstep.py
@@ -0,0 +1,60 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from xossynchronizer.ansible_helper import run_template
+from syncstep import SyncStep
+
+class AnsibleSyncStep(SyncStep):
+ def sync_record(self, o):
+ self.log.debug("In default sync record", **o.tologdict())
+
+ tenant_fields = self.map_sync_inputs(o)
+ if tenant_fields == SyncStep.SYNC_WITHOUT_RUNNING:
+ return
+
+ main_obj = self.observes_classes[0]
+
+ path = "".join(main_obj.__name__).lower()
+ res = run_template(self.playbook, tenant_fields, path=path, object=o)
+
+ if hasattr(self, "map_sync_outputs"):
+ self.map_sync_outputs(o, res)
+
+ self.log.debug("Finished default sync record", **o.tologdict())
+
+ def delete_record(self, o):
+ self.log.debug("In default delete record", **o.tologdict())
+
+ # If there is no map_delete_inputs, then assume deleting a record is a no-op.
+ if not hasattr(self, "map_delete_inputs"):
+ return
+
+ tenant_fields = self.map_delete_inputs(o)
+
+ main_obj = self.observes_classes[0]
+
+ path = "".join(main_obj.__name__).lower()
+
+ tenant_fields["delete"] = True
+ res = run_template(self.playbook, tenant_fields, path=path)
+
+ if hasattr(self, "map_delete_outputs"):
+ self.map_delete_outputs(o, res)
+ else:
+ # "rc" is often only returned when something bad happens, so assume that no "rc" implies a successful rc
+ # of 0.
+ if res[0].get("rc", 0) != 0:
+ raise Exception("Nonzero rc from Ansible during delete_record")
+
+ self.log.debug("Finished default delete record", **o.tologdict())
\ No newline at end of file
diff --git a/lib/xos-synchronizer/xossynchronizer/steps/sync_object.py b/lib/xos-synchronizer/xossynchronizer/steps/sync_object.py
deleted file mode 100644
index 1fb5894..0000000
--- a/lib/xos-synchronizer/xossynchronizer/steps/sync_object.py
+++ /dev/null
@@ -1,25 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from synchronizers.new_base.syncstep import *
-
-
-class SyncObject(SyncStep):
- provides = [] # Caller fills this in
- requested_interval = 0
- observes = [] # Caller fills this in
-
- def sync_record(self, r):
- raise DeferredException("Waiting for Service dependency: %r" % r)
diff --git a/lib/xos-synchronizer/xossynchronizer/steps/syncstep.py b/lib/xos-synchronizer/xossynchronizer/steps/syncstep.py
index 2f31e3e..7644822 100644
--- a/lib/xos-synchronizer/xossynchronizer/steps/syncstep.py
+++ b/lib/xos-synchronizer/xossynchronizer/steps/syncstep.py
@@ -13,19 +13,6 @@
# limitations under the License.
-import os
-import base64
-
-from xosconfig import Config
-from xossynchronizer.modelaccessor import *
-from xossynchronizer.ansible_helper import run_template
-
-# from tests.steps.mock_modelaccessor import model_accessor
-
-import json
-import time
-import pdb
-
from xosconfig import Config
from functools import reduce
@@ -86,13 +73,16 @@
def __init__(self, **args):
"""Initialize a sync step
Keyword arguments:
- name -- Name of the step
- provides -- XOS models sync'd by this step
+ model_accessor: class used to access models
+ driver: used by openstack synchronizer (DEPRECATED)
+ error_map: used by openstack synchronizer (DEPRECATED)
"""
- dependencies = []
+ self.model_accessor = args.get("model_accessor")
self.driver = args.get("driver")
self.error_map = args.get("error_map")
+ assert self.model_accessor is not None
+
try:
self.soft_deadline = int(self.get_prop("soft_deadline_seconds"))
except BaseException:
@@ -103,56 +93,40 @@
return
+ @property
+ def observes_classes(self):
+ """ Return a list of classes that this syncstep observes. The "observes" class member can be either a list of
+ items or a single item. Those items may be either classes or names of classes. This function always returns
+ a list of classes.
+ """
+ if not self.observes:
+ return []
+ if isinstance(self.observes, list):
+ observes = self.observes
+ else:
+ observes = [self.observes]
+ result = []
+ for class_or_name in observes:
+ if isinstance(class_or_name, str):
+ result.append(self.model_accessor.get_model_class(class_or_name))
+ else:
+ result.append(class_or_name)
+ return result
+
+
def fetch_pending(self, deletion=False):
# This is the most common implementation of fetch_pending
# Steps should override it if they have their own logic
# for figuring out what objects are outstanding.
- return model_accessor.fetch_pending(self.observes, deletion)
+ return self.model_accessor.fetch_pending(self.observes_classes, deletion)
+
def sync_record(self, o):
- self.log.debug("In default sync record", **o.tologdict())
+ self.log.debug("In abstract sync record", **o.tologdict())
+ # This method should be overridden by the service
- tenant_fields = self.map_sync_inputs(o)
- if tenant_fields == SyncStep.SYNC_WITHOUT_RUNNING:
- return
-
- main_objs = self.observes
- if isinstance(main_objs, list):
- main_objs = main_objs[0]
-
- path = "".join(main_objs.__name__).lower()
- res = run_template(self.playbook, tenant_fields, path=path, object=o)
-
- if hasattr(self, "map_sync_outputs"):
- self.map_sync_outputs(o, res)
-
- self.log.debug("Finished default sync record", **o.tologdict())
def delete_record(self, o):
- self.log.debug("In default delete record", **o.tologdict())
-
- # If there is no map_delete_inputs, then assume deleting a record is a no-op.
- if not hasattr(self, "map_delete_inputs"):
- return
-
- tenant_fields = self.map_delete_inputs(o)
-
- main_objs = self.observes
- if isinstance(main_objs, list):
- main_objs = main_objs[0]
-
- path = "".join(main_objs.__name__).lower()
-
- tenant_fields["delete"] = True
- res = run_template(self.playbook, tenant_fields, path=path)
-
- if hasattr(self, "map_delete_outputs"):
- self.map_delete_outputs(o, res)
- else:
- # "rc" is often only returned when something bad happens, so assume that no "rc" implies a successful rc
- # of 0.
- if res[0].get("rc", 0) != 0:
- raise Exception("Nonzero rc from Ansible during delete_record")
-
- self.log.debug("Finished default delete record", **o.tologdict())
+ self.log.debug("In abstract delete record", **o.tologdict())
+ # This method should be overridden by the service
diff --git a/lib/xos-synchronizer/xossynchronizer/synchronizer.py b/lib/xos-synchronizer/xossynchronizer/synchronizer.py
index 9a530d7..30d3ca0 100644
--- a/lib/xos-synchronizer/xossynchronizer/synchronizer.py
+++ b/lib/xos-synchronizer/xossynchronizer/synchronizer.py
@@ -14,8 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import os
-import sys
import time
from xosconfig import Config
@@ -60,7 +58,7 @@
from backend import Backend
log_closure = self.log.bind(synchronizer_name=Config().get("name"))
- backend = Backend(log=log_closure)
+ backend = Backend(log=log_closure, model_accessor=self.model_accessor)
backend.run()