[CORD-3175] Move pull_steps in a separate thread
Change-Id: I22fb139c5fb48b0b2a58283fa6898af24a1820b5
diff --git a/xos/synchronizers/new_base/backend.py b/xos/synchronizers/new_base/backend.py
index e9a9a56..7c5b350 100644
--- a/xos/synchronizers/new_base/backend.py
+++ b/xos/synchronizers/new_base/backend.py
@@ -24,6 +24,7 @@
from synchronizers.new_base.event_loop import XOSObserver
from synchronizers.new_base.model_policy_loop import XOSPolicyEngine
from synchronizers.new_base.event_engine import XOSEventEngine
+from synchronizers.new_base.pull_step_engine import XOSPullStepEngine
from synchronizers.new_base.modelaccessor import *
from xosconfig import Config
@@ -43,25 +44,6 @@
self.log = log
pass
- def load_pull_step_modules(self, pull_step_dir):
- pull_steps = []
- self.log.info("Loading pull steps", pull_step_dir=pull_step_dir)
- # NOTE we'll load all the classes that inherit from PullStep
- for fn in os.listdir(pull_step_dir):
- pathname = os.path.join(pull_step_dir, fn)
- if os.path.isfile(pathname) and fn.endswith(".py") and (fn != "__init__.py") and not "test" in fn:
- module = imp.load_source(fn[:-3], pathname)
-
- for classname in dir(module):
- c = getattr(module, classname, None)
-
- if inspect.isclass(c):
- base_names = [b.__name__ for b in c.__bases__]
- if 'PullStep' in base_names:
- pull_steps.append(c)
- self.log.info("Loaded pull steps", steps=pull_steps)
- return pull_steps
-
def load_sync_step_modules(self, step_dir):
sync_steps = []
@@ -107,22 +89,18 @@
model_accessor.update_diag(sync_start=time.time(), backend_status="Synchronizer Start")
steps_dir = Config.get("steps_dir")
- pull_steps_dir = Config.get("pull_steps_dir")
- if steps_dir or pull_steps_dir:
+ if steps_dir:
sync_steps = []
- pull_steps = []
- # load sync_steps and pull_steps
+ # load sync_steps
if steps_dir:
sync_steps = self.load_sync_step_modules(steps_dir)
- if pull_steps_dir:
- pull_steps = self.load_pull_step_modules(pull_steps_dir)
- # if we have at least one sync_step or one pull_step
- if len(sync_steps) > 0 or len(pull_steps) > 0:
+ # if we have at least one sync_step
+ if len(sync_steps) > 0:
# start the observer
- self.log.info("Starting XOSObserver", sync_steps=sync_steps, pull_steps=pull_steps)
- observer = XOSObserver(sync_steps, pull_steps, log = self.log)
+ self.log.info("Starting XOSObserver", sync_steps=sync_steps)
+ observer = XOSObserver(sync_steps, log = self.log)
observer_thread = threading.Thread(target=observer.run,name='synchronizer')
observer_thread.start()
@@ -133,8 +111,15 @@
watcher_thread = threading.Thread(target=watcher.run,name='watcher')
watcher_thread.start()
else:
- self.log.info(""
- "Skipping observer and watcher threads due to no steps dir.")
+ self.log.info("Skipping observer and watcher threads due to no steps dir.")
+
+ pull_steps_dir = Config.get("pull_steps_dir")
+ if pull_steps_dir:
+ pull_steps_engine = XOSPullStepEngine()
+ pull_steps_engine.load_pull_step_modules(pull_steps_dir)
+ pull_steps_engine.start()
+ else:
+ self.log.info("Skipping event engine due to no event_steps dir.")
event_steps_dir = Config.get("event_steps_dir")
if event_steps_dir:
diff --git a/xos/synchronizers/new_base/event_loop.py b/xos/synchronizers/new_base/event_loop.py
index 3273c84..1a1a4f4 100644
--- a/xos/synchronizers/new_base/event_loop.py
+++ b/xos/synchronizers/new_base/event_loop.py
@@ -71,12 +71,11 @@
class XOSObserver:
sync_steps = []
- def __init__(self, sync_steps, pull_steps=[], log=log):
+ def __init__(self, sync_steps, log=log):
# The Condition object via which events are received
self.log = log
self.step_lookup = {}
self.sync_steps = sync_steps
- self.pull_steps = pull_steps
self.load_sync_steps()
self.load_dependency_graph()
@@ -674,27 +673,6 @@
loop_start = time.time()
- # Run pull_steps (just once, before doing everything else)
- self.log.debug('Starting pull steps', steps=self.pull_steps)
- pull_threads = []
- for ps in self.pull_steps:
- i = ps()
- thread = threading.Thread(
- target=i.pull_records, name='pullstep', args=()
- )
- pull_threads.append(thread)
-
- # Start threads
- for t in pull_threads:
- t.start()
-
- # Wait for all threads to finish before continuing with the run
- # loop
- for t in pull_threads:
- t.join()
-
- self.log.debug('Done with pull steps', steps=self.pull_steps)
-
# Two passes. One for sync, the other for deletion.
for deletion in (False, True):
objects_to_process = []
diff --git a/xos/synchronizers/new_base/pull_step_engine.py b/xos/synchronizers/new_base/pull_step_engine.py
new file mode 100644
index 0000000..e695b1a
--- /dev/null
+++ b/xos/synchronizers/new_base/pull_step_engine.py
@@ -0,0 +1,98 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import imp
+import inspect
+import os
+import threading
+import time
+from xosconfig import Config
+from multistructlog import create_logger
+
+log = create_logger(Config().get('logging'))
+
+class XOSPullStepScheduler():
+ """ XOSPullStepThread
+
+ A Thread for servicing pull steps. There is one event_step associated with one XOSPullStepThread.
+ The thread's pull_records() function is called for every five seconds.
+ """
+
+ def __init__(self, steps, *args, **kwargs):
+ self.steps = steps
+
+ def run(self):
+ while True:
+ time.sleep(5)
+ self.run_once()
+
+ def run_once(self):
+ log.debug('Starting pull steps', steps=self.steps)
+
+ threads = []
+ for step in self.steps:
+ thread = threading.Thread(target=step().pull_records, name='pull_step')
+ threads.append(thread)
+
+ for t in threads:
+ t.start()
+
+ for t in threads:
+ t.join()
+
+ log.debug('Done with pull steps', steps=self.steps)
+
+
+
+class XOSPullStepEngine:
+ """ XOSPullStepEngine
+
+ Load pull step modules. Two methods are defined:
+
+ load_pull_step_modules(dir) ... look for step modules in the given directory, and load objects that are
+ descendant from PullStep.
+
+ start() ... Launch threads to handle processing of the PullSteps. It's expected that load_step_modules()
+ will be called before start().
+ """
+
+ def __init__(self):
+ self.pull_steps = []
+
+ def load_pull_step_modules(self, pull_step_dir):
+ self.pull_steps = []
+ log.info("Loading event steps", pull_step_dir=pull_step_dir)
+
+ # NOTE we'll load all the classes that inherit from PullStep
+ for fn in os.listdir(pull_step_dir):
+ pathname = os.path.join(pull_step_dir, fn)
+ if os.path.isfile(pathname) and fn.endswith(".py") and (fn != "__init__.py") and ("test" not in fn):
+ event_module = imp.load_source(fn[:-3], pathname)
+
+ for classname in dir(event_module):
+ c = getattr(event_module, classname, None)
+
+ if inspect.isclass(c):
+ base_names = [b.__name__ for b in c.__bases__]
+ if 'PullStep' in base_names:
+ self.pull_steps.append(c)
+ log.info("Loaded pull steps", steps=self.pull_steps)
+
+ def start(self):
+ log.info("Starting pull steps engine", steps=self.pull_steps)
+
+ for step in self.pull_steps:
+ sched = XOSPullStepScheduler(steps=self.pull_steps)
+ sched.run()
diff --git a/xos/synchronizers/new_base/tests/test_load.py b/xos/synchronizers/new_base/tests/test_load.py
index 06f8275..c42b8af 100644
--- a/xos/synchronizers/new_base/tests/test_load.py
+++ b/xos/synchronizers/new_base/tests/test_load.py
@@ -55,10 +55,8 @@
b = backend.Backend()
steps_dir = Config.get("steps_dir")
- pull_steps_dir = Config.get("pull_steps_dir")
self.steps = b.load_sync_step_modules(steps_dir)
- self.pull_steps = b.load_pull_step_modules(pull_steps_dir)
- self.synchronizer = event_loop.XOSObserver(self.steps, self.pull_steps)
+ self.synchronizer = event_loop.XOSObserver(self.steps)
def tearDown(self):
sys.path = self.sys_path_save
@@ -98,8 +96,6 @@
observed_names = [o.__name__ for o in observes]
self.assertIn(k, observed_names)
- def test_load_pull_steps(self):
- self.assertEqual(len(self.synchronizer.pull_steps), 1)
if __name__ == '__main__':
unittest.main()
diff --git a/xos/synchronizers/new_base/tests/test_run.py b/xos/synchronizers/new_base/tests/test_run.py
index 61147ea..60eb2c0 100644
--- a/xos/synchronizers/new_base/tests/test_run.py
+++ b/xos/synchronizers/new_base/tests/test_run.py
@@ -64,10 +64,8 @@
b = backend.Backend()
steps_dir = Config.get("steps_dir")
- pull_steps_dir = Config.get("pull_steps_dir")
self.steps = b.load_sync_step_modules(steps_dir)
- self.pull_steps = b.load_pull_step_modules(pull_steps_dir)
- self.synchronizer = event_loop.XOSObserver(self.steps, self.pull_steps)
+ self.synchronizer = event_loop.XOSObserver(self.steps)
try:
os.remove('/tmp/sync_ports')
except OSError:
@@ -83,8 +81,7 @@
@mock.patch("steps.sync_instances.syncstep.run_template",side_effect=run_fake_ansible_template)
@mock.patch("event_loop.model_accessor")
- @mock.patch("pull_step.TestPullStep.pull_records")
- def test_run_once(self, mock_pull_records, mock_run_template, mock_accessor, *_other_accessors):
+ def test_run_once(self, mock_run_template, mock_accessor, *_other_accessors):
pending_objects, pending_steps = self.synchronizer.fetch_pending()
@@ -105,8 +102,5 @@
self.assertIn("successful", sync_ports)
self.assertIn("successful", delete_ports)
- mock_pull_records.assert_called_once()
-
-
if __name__ == '__main__':
unittest.main()