[CORD-2888] Adding pull steps to the synchronization loop
Change-Id: I768c5a32739a27764ee79e545b895be6273b3dc8
diff --git a/lib/xos-config/xosconfig/synchronizer-config-schema.yaml b/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
index dd25888..bdaf938 100644
--- a/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
+++ b/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
@@ -57,6 +57,8 @@
type: str
steps_dir:
type: str
+ pull_steps_dir:
+ type: str
sys_dir:
type: str
models_dir:
diff --git a/lib/xos-util/.gitignore b/lib/xos-util/.gitignore
new file mode 100644
index 0000000..7fa3589
--- /dev/null
+++ b/lib/xos-util/.gitignore
@@ -0,0 +1,6 @@
+build
+dist
+XosUtil.egg-info
+.coverage
+coverage.xml
+cover
\ No newline at end of file
diff --git a/scripts/setup_venv.sh b/scripts/setup_venv.sh
index 56e87fe..73effb1 100644
--- a/scripts/setup_venv.sh
+++ b/scripts/setup_venv.sh
@@ -15,7 +15,7 @@
# limitations under the License.
BASEDIR=$(pwd)
-REQUIREMENTS=$BASEDIR/containers/xos/pip_requirements.txt
+REQUIREMENTS=$BASEDIR/containers/xos/pip_requested.txt
VENVDIR=venv-xos
echo $BASEDIR
@@ -48,7 +48,12 @@
cd $BASEDIR/xos/xos_client; python setup.py install && \
chmod 777 $BASEDIR/venv-xos/lib/python2.7/site-packages/xosapi/chameleon/protoc_plugins/gw_gen.py && \
chmod 777 $BASEDIR/venv-xos/lib/python2.7/site-packages/xosapi/chameleon/protoc_plugins/swagger_gen.py && \
-cd $BASEDIR/lib/xos-genx; python setup.py install
+
+#install xos-genx
+cd $BASEDIR/lib/xos-genx; python setup.py install && \
+
+#install xos-util
+cd $BASEDIR/lib/xos-util; python setup.py install
then
echo "Requirements installed."
echo "Virtualenv ready"
diff --git a/xos/synchronizers/.gitignore b/xos/synchronizers/.gitignore
index 6aed14b..f0650b7 100644
--- a/xos/synchronizers/.gitignore
+++ b/xos/synchronizers/.gitignore
@@ -2,4 +2,5 @@
new_base/mock_modelaccessor.py
new_base/mock_modelaccessor.py.context
new_base/synchronizers.new_base.ansible_helper
-new_base/xos.synchronizers.new_base.tests.test_payload
\ No newline at end of file
+new_base/xos.synchronizers.new_base.tests.test_payload
+new_base/synchronizers.new_base.diag
\ No newline at end of file
diff --git a/xos/synchronizers/new_base/backend.py b/xos/synchronizers/new_base/backend.py
index d2a1112..32de955 100644
--- a/xos/synchronizers/new_base/backend.py
+++ b/xos/synchronizers/new_base/backend.py
@@ -42,6 +42,25 @@
self.log = log
pass
+ def load_pull_step_modules(self, pull_step_dir):
+ pull_steps = []
+ self.log.info("Loading pull steps", pull_step_dir=pull_step_dir)
+ # NOTE we'll load all the classes that inherit from PullStep
+ for fn in os.listdir(pull_step_dir):
+ pathname = os.path.join(pull_step_dir, fn)
+ if os.path.isfile(pathname) and fn.endswith(".py") and (fn != "__init__.py") and not "test" in fn:
+ module = imp.load_source(fn[:-3], pathname)
+
+ for classname in dir(module):
+ c = getattr(module, classname, None)
+
+ if inspect.isclass(c):
+ base_names = [b.__name__ for b in c.__bases__]
+ if 'PullStep' in base_names:
+ pull_steps.append(c)
+ self.log.info("Loaded pull steps", steps=pull_steps)
+ return pull_steps
+
def load_sync_step_modules(self, step_dir):
sync_steps = []
@@ -49,7 +68,7 @@
for fn in os.listdir(step_dir):
pathname = os.path.join(step_dir,fn)
- if os.path.isfile(pathname) and fn.endswith(".py") and (fn!="__init__.py"):
+ if os.path.isfile(pathname) and fn.endswith(".py") and (fn!="__init__.py") and not "test" in fn:
module = imp.load_source(fn[:-3],pathname)
for classname in dir(module):
@@ -79,21 +98,34 @@
model_accessor.update_diag(sync_start=time.time(), backend_status="Synchronizer Start")
steps_dir = Config.get("steps_dir")
- if steps_dir:
- sync_steps = self.load_sync_step_modules(steps_dir)
- if sync_steps:
+ pull_steps_dir = Config.get("pull_steps_dir")
+ if steps_dir or pull_steps_dir:
+ sync_steps = []
+ pull_steps = []
+
+ # load sync_steps and pull_steps
+ if steps_dir:
+ sync_steps = self.load_sync_step_modules(steps_dir)
+ if pull_steps_dir:
+ pull_steps = self.load_pull_step_modules(pull_steps_dir)
+
+ # if we have at least one sync_step or one pull_step
+ if len(sync_steps) > 0 or len(pull_steps) > 0:
# start the observer
- observer = XOSObserver(sync_steps, log = self.log)
+ self.log.info("Starting XOSObserver", sync_steps=sync_steps, pull_steps=pull_steps)
+ observer = XOSObserver(sync_steps, pull_steps, log = self.log)
observer_thread = threading.Thread(target=observer.run,name='synchronizer')
observer_thread.start()
# start the watcher thread
if (watchers_enabled):
+ self.log.info("Starting XOSWatcher", sync_steps=sync_steps)
watcher = XOSWatcher(sync_steps)
watcher_thread = threading.Thread(target=watcher.run,name='watcher')
watcher_thread.start()
else:
- self.log.info("Skipping observer and watcher threads due to no steps dir.")
+ self.log.info(""
+ "Skipping observer and watcher threads due to no steps dir.")
# start model policies thread
policies_dir = Config.get("model_policies_dir")
diff --git a/xos/synchronizers/new_base/event_loop.py b/xos/synchronizers/new_base/event_loop.py
index 2e5e6e1..fdf3671 100644
--- a/xos/synchronizers/new_base/event_loop.py
+++ b/xos/synchronizers/new_base/event_loop.py
@@ -71,11 +71,12 @@
class XOSObserver:
sync_steps = []
- def __init__(self, sync_steps, log=log):
+ def __init__(self, sync_steps, pull_steps=[], log=log):
# The Condition object via which events are received
self.log = log
self.step_lookup = {}
self.sync_steps = sync_steps
+ self.pull_steps = pull_steps
self.load_sync_steps()
self.load_dependency_graph()
@@ -669,6 +670,27 @@
loop_start = time.time()
+ # Run pull_steps (just once, before doing everything else)
+ self.log.debug('Starting pull steps', steps=self.pull_steps)
+ pull_threads = []
+ for ps in self.pull_steps:
+ i = ps()
+ thread = threading.Thread(
+ target=i.pull_records, name='pullstep', args=()
+ )
+ pull_threads.append(thread)
+
+ # Start threads
+ for t in pull_threads:
+ t.start()
+
+ # Wait for all threads to finish before continuing with the run
+ # loop
+ for t in pull_threads:
+ t.join()
+
+ self.log.debug('Done with pull steps', steps=self.pull_steps)
+
# Two passes. One for sync, the other for deletion.
for deletion in (False, True):
objects_to_process = []
diff --git a/xos/synchronizers/new_base/pullstep.py b/xos/synchronizers/new_base/pullstep.py
new file mode 100644
index 0000000..1300ea6
--- /dev/null
+++ b/xos/synchronizers/new_base/pullstep.py
@@ -0,0 +1,28 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+class PullStep(object):
+ """
+ All the pull steps defined in each synchronizer needs to inherit from this class in order to be loaded
+ """
+ def __init__(self, **kwargs):
+ """
+ Initialize a pull step
+ :param kwargs:
+ -- observed_model: name of the model that is being polled
+ """
+ self.observed_model = kwargs.get('observed_model')
+
+ def pull_records(self):
+ self.log.debug("There is no default pull_records, please provide a pull_records method for %s" % self.observed_model)
\ No newline at end of file
diff --git a/xos/synchronizers/new_base/tests/pull_steps/__init__.py b/xos/synchronizers/new_base/tests/pull_steps/__init__.py
new file mode 100644
index 0000000..eb28b96
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/pull_steps/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file
diff --git a/xos/synchronizers/new_base/tests/pull_steps/pull_step.py b/xos/synchronizers/new_base/tests/pull_steps/pull_step.py
new file mode 100644
index 0000000..2209b99
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/pull_steps/pull_step.py
@@ -0,0 +1,20 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synchronizers.new_base.pullstep import PullStep
+from mock_modelaccessor import *
+
+class TestPullStep(PullStep):
+ def __init__(self):
+ super(TestPullStep, self).__init__(observed_model=Instance)
\ No newline at end of file
diff --git a/xos/synchronizers/new_base/tests/test_config.yaml b/xos/synchronizers/new_base/tests/test_config.yaml
index 5dc99fe..cd3dd64 100644
--- a/xos/synchronizers/new_base/tests/test_config.yaml
+++ b/xos/synchronizers/new_base/tests/test_config.yaml
@@ -30,3 +30,4 @@
level: DEBUG
dependency_graph: "tests/model-deps"
steps_dir: "tests/steps"
+pull_steps_dir: "tests/pull_steps"
diff --git a/xos/synchronizers/new_base/tests/test_load.py b/xos/synchronizers/new_base/tests/test_load.py
index ae267a5..06f8275 100644
--- a/xos/synchronizers/new_base/tests/test_load.py
+++ b/xos/synchronizers/new_base/tests/test_load.py
@@ -55,8 +55,10 @@
b = backend.Backend()
steps_dir = Config.get("steps_dir")
+ pull_steps_dir = Config.get("pull_steps_dir")
self.steps = b.load_sync_step_modules(steps_dir)
- self.synchronizer = event_loop.XOSObserver(self.steps)
+ self.pull_steps = b.load_pull_step_modules(pull_steps_dir)
+ self.synchronizer = event_loop.XOSObserver(self.steps, self.pull_steps)
def tearDown(self):
sys.path = self.sys_path_save
@@ -96,5 +98,8 @@
observed_names = [o.__name__ for o in observes]
self.assertIn(k, observed_names)
+ def test_load_pull_steps(self):
+ self.assertEqual(len(self.synchronizer.pull_steps), 1)
+
if __name__ == '__main__':
unittest.main()
diff --git a/xos/synchronizers/new_base/tests/test_run.py b/xos/synchronizers/new_base/tests/test_run.py
index c4ca3af..61147ea 100644
--- a/xos/synchronizers/new_base/tests/test_run.py
+++ b/xos/synchronizers/new_base/tests/test_run.py
@@ -64,8 +64,10 @@
b = backend.Backend()
steps_dir = Config.get("steps_dir")
+ pull_steps_dir = Config.get("pull_steps_dir")
self.steps = b.load_sync_step_modules(steps_dir)
- self.synchronizer = event_loop.XOSObserver(self.steps)
+ self.pull_steps = b.load_pull_step_modules(pull_steps_dir)
+ self.synchronizer = event_loop.XOSObserver(self.steps, self.pull_steps)
try:
os.remove('/tmp/sync_ports')
except OSError:
@@ -81,7 +83,10 @@
@mock.patch("steps.sync_instances.syncstep.run_template",side_effect=run_fake_ansible_template)
@mock.patch("event_loop.model_accessor")
- def test_run_once(self, mock_run_template, mock_accessor, *_other_accessors):
+ @mock.patch("pull_step.TestPullStep.pull_records")
+ def test_run_once(self, mock_pull_records, mock_run_template, mock_accessor, *_other_accessors):
+
+
pending_objects, pending_steps = self.synchronizer.fetch_pending()
pending_objects2 = list(pending_objects)
@@ -93,12 +98,15 @@
any_cs.slice = slice
self.synchronizer.run_once()
+
sync_ports = open('/tmp/sync_ports').read()
delete_ports = open('/tmp/delete_ports').read()
self.assertIn("successful", sync_ports)
self.assertIn("successful", delete_ports)
-
+
+ mock_pull_records.assert_called_once()
+
if __name__ == '__main__':
unittest.main()
diff --git a/xos/xos_client/.gitignore b/xos/xos_client/.gitignore
index 608c120..f82b325 100644
--- a/xos/xos_client/.gitignore
+++ b/xos/xos_client/.gitignore
@@ -1,2 +1,3 @@
build
xosapi/chameleon
+xosapi.egg-info