SEBA-405 Clean up synchronizer imports of model_accessor into globals;
Move mock modelaccessor to /tmp;
Easier mock modelaccessor configuration
Change-Id: I67a17b9a72ea69f61d92206f1b520a11c2f18d80
diff --git a/lib/xos-synchronizer/xossynchronizer/backend.py b/lib/xos-synchronizer/xossynchronizer/backend.py
index b404864..d4b7e67 100644
--- a/lib/xos-synchronizer/xossynchronizer/backend.py
+++ b/lib/xos-synchronizer/xossynchronizer/backend.py
@@ -24,7 +24,6 @@
from xossynchronizer.model_policy_loop import XOSPolicyEngine
from xossynchronizer.event_engine import XOSEventEngine
from xossynchronizer.pull_step_engine import XOSPullStepEngine
-from xossynchronizer.modelaccessor import *
from xosconfig import Config
from multistructlog import create_logger
@@ -33,7 +32,8 @@
class Backend:
- def __init__(self, log=log):
+ def __init__(self, model_accessor, log=log):
+ self.model_accessor = model_accessor
self.log = log
def load_sync_step_modules(self, step_dir):
@@ -100,8 +100,8 @@
# if we have at least one sync_step
if len(sync_steps) > 0:
# start the observer
- self.log.info("Starting XOSObserver", sync_steps=sync_steps)
- observer = XOSObserver(sync_steps, self.log)
+ self.log.info("Starting XOSObserver", sync_steps=sync_steps, model_accessor=self.model_accessor)
+ observer = XOSObserver(sync_steps, self.model_accessor, self.log)
observer_thread = threading.Thread(
target=observer.run, name="synchronizer"
)
@@ -113,7 +113,7 @@
pull_steps_dir = Config.get("pull_steps_dir")
if pull_steps_dir:
self.log.info("Starting XOSPullStepEngine", pull_steps_dir=pull_steps_dir)
- pull_steps_engine = XOSPullStepEngine()
+ pull_steps_engine = XOSPullStepEngine(model_accessor=self.model_accessor)
pull_steps_engine.load_pull_step_modules(pull_steps_dir)
pull_steps_thread = threading.Thread(
target=pull_steps_engine.start, name="pull_step_engine"
@@ -125,7 +125,7 @@
event_steps_dir = Config.get("event_steps_dir")
if event_steps_dir:
self.log.info("Starting XOSEventEngine", event_steps_dir=event_steps_dir)
- event_engine = XOSEventEngine(self.log)
+ event_engine = XOSEventEngine(model_accessor=self.model_accessor, log=self.log)
event_engine.load_event_step_modules(event_steps_dir)
event_engine.start()
else:
@@ -134,7 +134,7 @@
# start model policies thread
policies_dir = Config.get("model_policies_dir")
if policies_dir:
- policy_engine = XOSPolicyEngine(policies_dir=policies_dir, log=self.log)
+ policy_engine = XOSPolicyEngine(policies_dir=policies_dir, model_accessor=self.model_accessor, log=self.log)
model_policy_thread = threading.Thread(
target=policy_engine.run, name="policy_engine"
)
diff --git a/lib/xos-synchronizer/xossynchronizer/event_engine.py b/lib/xos-synchronizer/xossynchronizer/event_engine.py
index e5e18d1..694a1a8 100644
--- a/lib/xos-synchronizer/xossynchronizer/event_engine.py
+++ b/lib/xos-synchronizer/xossynchronizer/event_engine.py
@@ -43,11 +43,12 @@
function is called for each event.
"""
- def __init__(self, step, bootstrap_servers, log, *args, **kwargs):
+ def __init__(self, step, bootstrap_servers, model_accessor, log, *args, **kwargs):
super(XOSKafkaThread, self).__init__(*args, **kwargs)
self.consumer = None
self.step = step
self.bootstrap_servers = bootstrap_servers
+ self.model_accessor = model_accessor
self.log = log
self.daemon = True
@@ -129,7 +130,7 @@
)
try:
- self.step(log=self.log).process_event(event_msg)
+ self.step(model_accessor=self.model_accessor, log=self.log).process_event(event_msg)
except BaseException:
self.log.exception(
@@ -151,9 +152,10 @@
will be called before start().
"""
- def __init__(self, log):
+ def __init__(self, model_accessor, log):
self.event_steps = []
self.threads = []
+ self.model_accessor = model_accessor
self.log = log
def load_event_step_modules(self, event_step_dir):
@@ -205,7 +207,7 @@
for step in self.event_steps:
if step.technology == "kafka":
- thread = XOSKafkaThread(step, [eventbus_endpoint], self.log)
+ thread = XOSKafkaThread(step, [eventbus_endpoint], self.model_accessor, self.log)
thread.start()
self.threads.append(thread)
else:
diff --git a/lib/xos-synchronizer/xossynchronizer/event_loop.py b/lib/xos-synchronizer/xossynchronizer/event_loop.py
index 96ce727..bdff10b 100644
--- a/lib/xos-synchronizer/xossynchronizer/event_loop.py
+++ b/lib/xos-synchronizer/xossynchronizer/event_loop.py
@@ -30,7 +30,6 @@
from networkx.algorithms.dag import topological_sort
from xossynchronizer.steps.syncstep import InnocuousException, DeferredException, SyncStep
-from xossynchronizer.modelaccessor import *
from xosconfig import Config
from multistructlog import create_logger
@@ -71,9 +70,9 @@
class XOSObserver(object):
sync_steps = []
- def __init__(self, sync_steps, log=log):
- # The Condition object via which events are received
+ def __init__(self, sync_steps, model_accessor, log=log):
self.log = log
+ self.model_accessor = model_accessor
self.step_lookup = {}
self.sync_steps = sync_steps
@@ -153,9 +152,13 @@
observes = [s.observes]
else:
observes = s.observes
-
for m in observes:
- model_to_step[m.__name__].append(s.__name__)
+ if isinstance(m, str):
+ # observes is a string that names the model
+ model_to_step[m].append(s.__name__)
+ else:
+ # observes is the model class
+ model_to_step[m.__name__].append(s.__name__)
try:
external_dependencies.extend(s.external_dependencies)
@@ -173,7 +176,7 @@
def reset_model_accessor(self, o=None):
try:
- model_accessor.reset_queries()
+ self.model_accessor.reset_queries()
except BaseException:
# this shouldn't happen, but in case it does, catch it...
if o:
@@ -190,13 +193,13 @@
if getattr(o, "backend_need_reap", False):
# the object has already been deleted and marked for reaping
- model_accessor.journal_object(o, "syncstep.call.already_marked_reap")
+ self.model_accessor.journal_object(o, "syncstep.call.already_marked_reap")
else:
step = getattr(o, "synchronizer_step", None)
if not step:
raise ExternalDependencyFailed
- model_accessor.journal_object(o, "syncstep.call.delete_record")
+ self.model_accessor.journal_object(o, "syncstep.call.delete_record")
dr_log.debug("Deleting object", **o.tologdict())
@@ -206,7 +209,7 @@
dr_log.debug("Deleted object", **o.tologdict())
- model_accessor.journal_object(o, "syncstep.call.delete_set_reap")
+ self.model_accessor.journal_object(o, "syncstep.call.delete_set_reap")
o.backend_need_reap = True
o.save(update_fields=["backend_need_reap"])
@@ -228,7 +231,7 @@
o.backend_need_delete = True
o.save(update_fields=["backend_need_delete"])
- model_accessor.journal_object(o, "syncstep.call.sync_record")
+ self.model_accessor.journal_object(o, "syncstep.call.sync_record")
sr_log.debug("Syncing object", **o.tologdict())
@@ -243,7 +246,7 @@
o.backend_register = json.dumps(scratchpad)
o.backend_status = "OK"
o.backend_code = 1
- model_accessor.journal_object(o, "syncstep.call.save_update")
+ self.model_accessor.journal_object(o, "syncstep.call.save_update")
o.save(
update_fields=[
"enacted",
@@ -339,7 +342,7 @@
# TOFIX:
# DatabaseError: value too long for type character varying(140)
- if model_accessor.obj_exists(o):
+ if self.model_accessor.obj_exists(o):
try:
o.backend_status = o.backend_status[:1024]
o.save(
@@ -397,16 +400,16 @@
cohort_emptied = True
finally:
self.reset_model_accessor()
- model_accessor.connection_close()
+ self.model_accessor.connection_close()
def tenant_class_name_from_service(self, service_name):
""" This code supports legacy functionality. To be cleaned up. """
name1 = service_name + "Instance"
- if hasattr(Slice().stub, name1):
+ if hasattr(self.model_accessor.Slice().stub, name1):
return name1
else:
name2 = service_name.replace("Service", "Tenant")
- if hasattr(Slice().stub, name2):
+ if hasattr(self.model_accessor.Slice().stub, name2):
return name2
else:
return None
@@ -427,14 +430,14 @@
s_model_names = [v for k, v in ugly_tuples]
s_models0 = [
- getattr(Slice().stub, model_name, None) for model_name in s_model_names
+ getattr(self.model_accessor.Slice().stub, model_name, None) for model_name in s_model_names
]
s_models1 = [model.objects.first() for model in s_models0]
s_models = [m for m in s_models1 if m is not None]
dependencies = []
for model in s_models:
- deps = ServiceDependency.objects.filter(subscriber_service_id=model.id)
+ deps = self.model_accessor.ServiceDependency.objects.filter(subscriber_service_id=model.id)
if deps:
services = [
self.tenant_class_name_from_service(
@@ -450,7 +453,7 @@
def compute_service_instance_dependencies(self, objects):
link_set = [
- ServiceInstanceLink.objects.filter(subscriber_service_instance_id=o.id)
+ self.model_accessor.ServiceInstanceLink.objects.filter(subscriber_service_instance_id=o.id)
for o in objects
]
@@ -490,17 +493,22 @@
for e in self.external_dependencies:
s = SyncStep
- s.observes = e
+ if isinstance(e,str):
+ # external dependency is a string that names a model class
+ s.observes = self.model_accessor.get_model_class(e)
+ else:
+ # external dependency is a model class
+ s.observes = e
step_list.append(s)
for step_class in step_list:
- step = step_class(driver=self.driver)
+ step = step_class(driver=self.driver, model_accessor=self.model_accessor)
step.log = self.log.new(step=step)
if not hasattr(step, "call"):
pending = step.fetch_pending(deletion)
for obj in pending:
- step = step_class(driver=self.driver)
+ step = step_class(driver=self.driver, model_accessor=self.model_accessor)
step.log = self.log.new(step=step)
obj.synchronizer_step = step
@@ -719,7 +727,7 @@
try:
# Why are we checking the DB connection here?
- model_accessor.check_db_connection_okay()
+ self.model_accessor.check_db_connection_okay()
loop_start = time.time()
diff --git a/lib/xos-synchronizer/xossynchronizer/event_steps/eventstep.py b/lib/xos-synchronizer/xossynchronizer/event_steps/eventstep.py
index 9596248..84ea74b 100644
--- a/lib/xos-synchronizer/xossynchronizer/event_steps/eventstep.py
+++ b/lib/xos-synchronizer/xossynchronizer/event_steps/eventstep.py
@@ -25,12 +25,15 @@
topics = []
pattern = None
- def __init__(self, log, **kwargs):
+ def __init__(self, model_accessor, log, **kwargs):
"""
Initialize a pull step. Override this function to include any initialization. Make sure to call the original
__init__() from your method.
"""
+ # self.model_accessor can be used to create and query models
+ self.model_accessor = model_accessor
+
# self.log can be used to emit logging messages.
self.log = log
diff --git a/lib/xos-synchronizer/xossynchronizer/mock_modelaccessor_build.py b/lib/xos-synchronizer/xossynchronizer/mock_modelaccessor_build.py
index 5c389cf..99b2d46 100644
--- a/lib/xos-synchronizer/xossynchronizer/mock_modelaccessor_build.py
+++ b/lib/xos-synchronizer/xossynchronizer/mock_modelaccessor_build.py
@@ -28,6 +28,12 @@
def build_mock_modelaccessor(
dest_dir, xos_dir, services_dir, service_xprotos, target="mock_classes.xtarget"
):
+ # TODO: deprecate the dest_dir argument
+
+ # force modelaccessor to be found in /tmp
+ dest_dir="/tmp/mock_modelaccessor"
+ if not os.path.exists(dest_dir):
+ os.makedirs(dest_dir)
dest_fn = os.path.join(dest_dir, "mock_modelaccessor.py")
args = ["xosgenx", "--target", target]
@@ -69,3 +75,38 @@
# Save the context of this invocation of xosgenx
open(context_fn, "w").write(cPickle.dumps(this_context))
+
+# locate a service's xproto file (returned path is relative to services_dir)
+def get_models_fn(services_dir, service_name, xproto_name):
+ name = os.path.join(service_name, "xos", xproto_name)
+ if os.path.exists(os.path.join(services_dir, name)):
+ return name
+ else:
+ name = os.path.join(service_name, "xos", "synchronizer", "models", xproto_name)
+ if os.path.exists(os.path.join(services_dir, name)):
+ return name
+ raise Exception("Unable to find service=%s xproto=%s" % (service_name, xproto_name))
+# END locate a service's xproto file
+
+def mock_modelaccessor_config(test_dir, services):
+ """ Automatically configure and build the mock modelaccessor.
+
+ Back up from test_dir until a path ending in "orchestration" is found; xos/xos and xos_services beneath it
+ supply the xproto. services is a list of (service_name, xproto_name) pairs; raises Exception if no tree is found.
+ """
+
+ orchestration_dir = test_dir
+ while not orchestration_dir.endswith("orchestration"):
+ # back up a level
+ orchestration_dir = os.path.dirname(orchestration_dir)
+ if len(orchestration_dir)<10:
+ raise Exception("Failed to autodiscovery repository tree")
+
+ xos_dir = os.path.join(orchestration_dir, "xos", "xos")
+ services_dir = os.path.join(orchestration_dir, "xos_services")
+
+ service_xprotos=[]
+ for (service_name, xproto_name) in services:
+ service_xprotos.append(get_models_fn(services_dir, service_name, xproto_name))
+
+ build_mock_modelaccessor(None, xos_dir, services_dir, service_xprotos)
diff --git a/lib/xos-synchronizer/xossynchronizer/model_policies/policy.py b/lib/xos-synchronizer/xossynchronizer/model_policies/policy.py
index b455c79..5877279 100644
--- a/lib/xos-synchronizer/xossynchronizer/model_policies/policy.py
+++ b/lib/xos-synchronizer/xossynchronizer/model_policies/policy.py
@@ -36,5 +36,6 @@
handle_delete ... called when a model is deleted
"""
- def __init__(self):
+ def __init__(self, model_accessor):
+ self.model_accessor = model_accessor
self.logger = log
diff --git a/lib/xos-synchronizer/xossynchronizer/model_policy_loop.py b/lib/xos-synchronizer/xossynchronizer/model_policy_loop.py
index c23e47c..20144a5 100644
--- a/lib/xos-synchronizer/xossynchronizer/model_policy_loop.py
+++ b/lib/xos-synchronizer/xossynchronizer/model_policy_loop.py
@@ -14,18 +14,18 @@
from __future__ import print_function
-from xossynchronizer.modelaccessor import *
from xossynchronizer.dependency_walker_new import *
-from xossynchronizer.policy import Policy
+from xossynchronizer.model_policies.policy import Policy
import imp
-import pdb
+import inspect
import time
import traceback
class XOSPolicyEngine(object):
- def __init__(self, policies_dir, log):
+ def __init__(self, policies_dir, model_accessor, log):
+ self.model_accessor = model_accessor
self.model_policies = self.load_model_policies(policies_dir)
self.policies_by_name = {}
self.policies_by_class = {}
@@ -94,26 +94,28 @@
# provides field (this eliminates the abstract base classes
# since they don't have a provides)
- if (
- inspect.isclass(c)
- and issubclass(c, Policy)
- and hasattr(c, "model_name")
- and (c not in policies)
- ):
- if not c.model_name:
- log.info(
- "load_model_policies: skipping model policy",
- classname=classname,
- )
- continue
- if not model_accessor.has_model_class(c.model_name):
- log.error(
- "load_model_policies: unable to find model policy",
- classname=classname,
- model=c.model_name,
- )
- c.model = model_accessor.get_model_class(c.model_name)
- policies.append(c)
+ if inspect.isclass(c):
+ base_names = [b.__name__ for b in c.__bases__]
+
+ if (
+ "Policy" in base_names
+ and hasattr(c, "model_name")
+ and (c not in policies)
+ ):
+ if not c.model_name:
+ log.info(
+ "load_model_policies: skipping model policy",
+ classname=classname,
+ )
+ continue
+ if not self.model_accessor.has_model_class(c.model_name):
+ log.error(
+ "load_model_policies: unable to find model policy",
+ classname=classname,
+ model=c.model_name,
+ )
+ c.model = self.model_accessor.get_model_class(c.model_name)
+ policies.append(c)
log.info("Loaded model policies", policies=policies)
return policies
@@ -141,7 +143,7 @@
policy=policy.__name__,
method=method_name,
)
- getattr(policy(), method_name)(instance)
+ getattr(policy(model_accessor=self.model_accessor), method_name)(instance)
log.debug(
"MODEL POLICY: completed handler",
sender_name=sender_name,
@@ -171,7 +173,7 @@
instance.save(update_fields=["policed", "policy_status", "policy_code"])
if hasattr(policy, "after_policy_save"):
- policy().after_policy_save(instance)
+ policy(model_accessor=self.model_accessor).after_policy_save(instance)
log.info("MODEL_POLICY: Saved", o=instance)
except BaseException:
@@ -199,10 +201,10 @@
def run_policy_once(self):
models = self.policies_by_class.keys()
- model_accessor.check_db_connection_okay()
+ self.model_accessor.check_db_connection_okay()
- objects = model_accessor.fetch_policies(models, False)
- deleted_objects = model_accessor.fetch_policies(models, True)
+ objects = self.model_accessor.fetch_policies(models, False)
+ deleted_objects = self.model_accessor.fetch_policies(models, True)
for o in objects:
if o.deleted:
@@ -217,7 +219,7 @@
self.execute_model_policy(o, "delete")
try:
- model_accessor.reset_queries()
+ self.model_accessor.reset_queries()
except Exception as e:
# this shouldn't happen, but in case it does, catch it...
log.exception("MODEL POLICY: exception in reset_queries", e)
diff --git a/lib/xos-synchronizer/xossynchronizer/modelaccessor.py b/lib/xos-synchronizer/xossynchronizer/modelaccessor.py
index 6084579..1418cc6 100644
--- a/lib/xos-synchronizer/xossynchronizer/modelaccessor.py
+++ b/lib/xos-synchronizer/xossynchronizer/modelaccessor.py
@@ -285,6 +285,11 @@
def config_accessor_mock():
global model_accessor
+
+ # the mock model accessor always gets built to a temporary location
+ if not "/tmp/mock_modelaccessor" in sys.path:
+ sys.path.append("/tmp/mock_modelaccessor")
+
from mock_modelaccessor import model_accessor as mock_model_accessor
model_accessor = mock_model_accessor
diff --git a/lib/xos-synchronizer/xossynchronizer/pull_step_engine.py b/lib/xos-synchronizer/xossynchronizer/pull_step_engine.py
index 3f4732d..ece77bc 100644
--- a/lib/xos-synchronizer/xossynchronizer/pull_step_engine.py
+++ b/lib/xos-synchronizer/xossynchronizer/pull_step_engine.py
@@ -30,8 +30,9 @@
The thread's pull_records() function is called for every five seconds.
"""
- def __init__(self, steps, *args, **kwargs):
+ def __init__(self, steps, model_accessor, *args, **kwargs):
self.steps = steps
+ self.model_accessor = model_accessor
def run(self):
while True:
@@ -43,7 +44,7 @@
threads = []
for step in self.steps:
- thread = threading.Thread(target=step().pull_records, name="pull_step")
+ thread = threading.Thread(target=step(model_accessor=self.model_accessor).pull_records, name="pull_step")
threads.append(thread)
for t in threads:
@@ -67,7 +68,8 @@
will be called before start().
"""
- def __init__(self):
+ def __init__(self, model_accessor):
+ self.model_accessor = model_accessor
self.pull_steps = []
def load_pull_step_modules(self, pull_step_dir):
@@ -98,5 +100,5 @@
log.info("Starting pull steps engine", steps=self.pull_steps)
for step in self.pull_steps:
- sched = XOSPullStepScheduler(steps=self.pull_steps)
+ sched = XOSPullStepScheduler(steps=self.pull_steps, model_accessor=self.model_accessor)
sched.run()
diff --git a/lib/xos-synchronizer/xossynchronizer/pull_steps/pullstep.py b/lib/xos-synchronizer/xossynchronizer/pull_steps/pullstep.py
index adbc0b1..290ab92 100644
--- a/lib/xos-synchronizer/xossynchronizer/pull_steps/pullstep.py
+++ b/lib/xos-synchronizer/xossynchronizer/pull_steps/pullstep.py
@@ -23,8 +23,10 @@
Initialize a pull step
:param kwargs:
-- observed_model: name of the model that is being polled
+ -- model_accessor: used to create and update models
"""
self.observed_model = kwargs.get("observed_model")
+ self.model_accessor = kwargs.get("model_accessor")
def pull_records(self):
self.log.debug(
diff --git a/lib/xos-synchronizer/xossynchronizer/steps/SyncInstanceUsingAnsible.py b/lib/xos-synchronizer/xossynchronizer/steps/SyncInstanceUsingAnsible.py
index 6ed656c..1bc54ce 100644
--- a/lib/xos-synchronizer/xossynchronizer/steps/SyncInstanceUsingAnsible.py
+++ b/lib/xos-synchronizer/xossynchronizer/steps/SyncInstanceUsingAnsible.py
@@ -23,7 +23,6 @@
from xossynchronizer.steps.syncstep import SyncStep, DeferredException
from xossynchronizer.ansible_helper import run_template_ssh
-from xossynchronizer.modelaccessor import *
class SyncInstanceUsingAnsible(SyncStep):
diff --git a/lib/xos-synchronizer/xossynchronizer/steps/ansiblesyncstep.py b/lib/xos-synchronizer/xossynchronizer/steps/ansiblesyncstep.py
new file mode 100644
index 0000000..116f8c2
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/steps/ansiblesyncstep.py
@@ -0,0 +1,60 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from xossynchronizer.ansible_helper import run_template
+from syncstep import SyncStep
+
+class AnsibleSyncStep(SyncStep):
+ def sync_record(self, o):
+ self.log.debug("In default sync record", **o.tologdict())
+
+ tenant_fields = self.map_sync_inputs(o)
+ if tenant_fields == SyncStep.SYNC_WITHOUT_RUNNING:
+ return
+
+ main_obj = self.observes_classes[0]
+
+ path = "".join(main_obj.__name__).lower()
+ res = run_template(self.playbook, tenant_fields, path=path, object=o)
+
+ if hasattr(self, "map_sync_outputs"):
+ self.map_sync_outputs(o, res)
+
+ self.log.debug("Finished default sync record", **o.tologdict())
+
+ def delete_record(self, o):
+ self.log.debug("In default delete record", **o.tologdict())
+
+ # If there is no map_delete_inputs, then assume deleting a record is a no-op.
+ if not hasattr(self, "map_delete_inputs"):
+ return
+
+ tenant_fields = self.map_delete_inputs(o)
+
+ main_obj = self.observes_classes[0]
+
+ path = "".join(main_obj.__name__).lower()
+
+ tenant_fields["delete"] = True
+ res = run_template(self.playbook, tenant_fields, path=path)
+
+ if hasattr(self, "map_delete_outputs"):
+ self.map_delete_outputs(o, res)
+ else:
+ # "rc" is often only returned when something bad happens, so assume that no "rc" implies a successful rc
+ # of 0.
+ if res[0].get("rc", 0) != 0:
+ raise Exception("Nonzero rc from Ansible during delete_record")
+
+ self.log.debug("Finished default delete record", **o.tologdict())
\ No newline at end of file
diff --git a/lib/xos-synchronizer/xossynchronizer/steps/sync_object.py b/lib/xos-synchronizer/xossynchronizer/steps/sync_object.py
deleted file mode 100644
index 1fb5894..0000000
--- a/lib/xos-synchronizer/xossynchronizer/steps/sync_object.py
+++ /dev/null
@@ -1,25 +0,0 @@
-# Copyright 2017-present Open Networking Foundation
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-from synchronizers.new_base.syncstep import *
-
-
-class SyncObject(SyncStep):
- provides = [] # Caller fills this in
- requested_interval = 0
- observes = [] # Caller fills this in
-
- def sync_record(self, r):
- raise DeferredException("Waiting for Service dependency: %r" % r)
diff --git a/lib/xos-synchronizer/xossynchronizer/steps/syncstep.py b/lib/xos-synchronizer/xossynchronizer/steps/syncstep.py
index 2f31e3e..7644822 100644
--- a/lib/xos-synchronizer/xossynchronizer/steps/syncstep.py
+++ b/lib/xos-synchronizer/xossynchronizer/steps/syncstep.py
@@ -13,19 +13,6 @@
# limitations under the License.
-import os
-import base64
-
-from xosconfig import Config
-from xossynchronizer.modelaccessor import *
-from xossynchronizer.ansible_helper import run_template
-
-# from tests.steps.mock_modelaccessor import model_accessor
-
-import json
-import time
-import pdb
-
from xosconfig import Config
from functools import reduce
@@ -86,13 +73,16 @@
def __init__(self, **args):
"""Initialize a sync step
Keyword arguments:
- name -- Name of the step
- provides -- XOS models sync'd by this step
+ model_accessor: class used to access models
+ driver: used by openstack synchronizer (DEPRECATED)
+ error_map: used by openstack synchronizer (DEPRECATED)
"""
- dependencies = []
+ self.model_accessor = args.get("model_accessor")
self.driver = args.get("driver")
self.error_map = args.get("error_map")
+ assert self.model_accessor is not None
+
try:
self.soft_deadline = int(self.get_prop("soft_deadline_seconds"))
except BaseException:
@@ -103,56 +93,40 @@
return
+ @property
+ def observes_classes(self):
+ """ Return a list of classes that this syncstep observes. The "observes" class member can be either a list of
+ items or a single item. Those items may be either classes or names of classes. This function always returns
+ a list of classes.
+ """
+ if not self.observes:
+ return []
+ if isinstance(self.observes, list):
+ observes = self.observes
+ else:
+ observes = [self.observes]
+ result = []
+ for class_or_name in observes:
+ if isinstance(class_or_name, str):
+ result.append(self.model_accessor.get_model_class(class_or_name))
+ else:
+ result.append(class_or_name)
+ return result
+
+
def fetch_pending(self, deletion=False):
# This is the most common implementation of fetch_pending
# Steps should override it if they have their own logic
# for figuring out what objects are outstanding.
- return model_accessor.fetch_pending(self.observes, deletion)
+ return self.model_accessor.fetch_pending(self.observes_classes, deletion)
+
def sync_record(self, o):
- self.log.debug("In default sync record", **o.tologdict())
+ self.log.debug("In abstract sync record", **o.tologdict())
+ # This method should be overridden by the service
- tenant_fields = self.map_sync_inputs(o)
- if tenant_fields == SyncStep.SYNC_WITHOUT_RUNNING:
- return
-
- main_objs = self.observes
- if isinstance(main_objs, list):
- main_objs = main_objs[0]
-
- path = "".join(main_objs.__name__).lower()
- res = run_template(self.playbook, tenant_fields, path=path, object=o)
-
- if hasattr(self, "map_sync_outputs"):
- self.map_sync_outputs(o, res)
-
- self.log.debug("Finished default sync record", **o.tologdict())
def delete_record(self, o):
- self.log.debug("In default delete record", **o.tologdict())
-
- # If there is no map_delete_inputs, then assume deleting a record is a no-op.
- if not hasattr(self, "map_delete_inputs"):
- return
-
- tenant_fields = self.map_delete_inputs(o)
-
- main_objs = self.observes
- if isinstance(main_objs, list):
- main_objs = main_objs[0]
-
- path = "".join(main_objs.__name__).lower()
-
- tenant_fields["delete"] = True
- res = run_template(self.playbook, tenant_fields, path=path)
-
- if hasattr(self, "map_delete_outputs"):
- self.map_delete_outputs(o, res)
- else:
- # "rc" is often only returned when something bad happens, so assume that no "rc" implies a successful rc
- # of 0.
- if res[0].get("rc", 0) != 0:
- raise Exception("Nonzero rc from Ansible during delete_record")
-
- self.log.debug("Finished default delete record", **o.tologdict())
+ self.log.debug("In abstract delete record", **o.tologdict())
+ # This method should be overridden by the service
diff --git a/lib/xos-synchronizer/xossynchronizer/synchronizer.py b/lib/xos-synchronizer/xossynchronizer/synchronizer.py
index 9a530d7..30d3ca0 100644
--- a/lib/xos-synchronizer/xossynchronizer/synchronizer.py
+++ b/lib/xos-synchronizer/xossynchronizer/synchronizer.py
@@ -14,8 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import os
-import sys
import time
from xosconfig import Config
@@ -60,7 +58,7 @@
from backend import Backend
log_closure = self.log.bind(synchronizer_name=Config().get("name"))
- backend = Backend(log=log_closure)
+ backend = Backend(log=log_closure, model_accessor=self.model_accessor)
backend.run()