[CORD-3175] Move pull_steps in a separate thread

Change-Id: I22fb139c5fb48b0b2a58283fa6898af24a1820b5
diff --git a/xos/synchronizers/new_base/backend.py b/xos/synchronizers/new_base/backend.py
index e9a9a56..7c5b350 100644
--- a/xos/synchronizers/new_base/backend.py
+++ b/xos/synchronizers/new_base/backend.py
@@ -24,6 +24,7 @@
 from synchronizers.new_base.event_loop import XOSObserver
 from synchronizers.new_base.model_policy_loop import XOSPolicyEngine
 from synchronizers.new_base.event_engine import XOSEventEngine
+from synchronizers.new_base.pull_step_engine import XOSPullStepEngine
 from synchronizers.new_base.modelaccessor import *
 
 from xosconfig import Config
@@ -43,25 +44,6 @@
         self.log = log
         pass
 
-    def load_pull_step_modules(self, pull_step_dir):
-        pull_steps = []
-        self.log.info("Loading pull steps", pull_step_dir=pull_step_dir)
-        # NOTE we'll load all the classes that inherit from PullStep
-        for fn in os.listdir(pull_step_dir):
-            pathname = os.path.join(pull_step_dir, fn)
-            if os.path.isfile(pathname) and fn.endswith(".py") and (fn != "__init__.py") and not "test" in fn:
-                module = imp.load_source(fn[:-3], pathname)
-
-                for classname in dir(module):
-                    c = getattr(module, classname, None)
-
-                    if inspect.isclass(c):
-                        base_names = [b.__name__ for b in c.__bases__]
-                        if 'PullStep' in base_names:
-                            pull_steps.append(c)
-        self.log.info("Loaded pull steps", steps=pull_steps)
-        return pull_steps
-
     def load_sync_step_modules(self, step_dir):
         sync_steps = []
 
@@ -107,22 +89,18 @@
         model_accessor.update_diag(sync_start=time.time(), backend_status="Synchronizer Start")
 
         steps_dir = Config.get("steps_dir")
-        pull_steps_dir = Config.get("pull_steps_dir")
-        if steps_dir or pull_steps_dir:
+        if steps_dir:
             sync_steps = []
-            pull_steps = []
 
-            # load sync_steps and pull_steps
+            # load sync_steps
             if steps_dir:
                 sync_steps = self.load_sync_step_modules(steps_dir)
-            if pull_steps_dir:
-                pull_steps = self.load_pull_step_modules(pull_steps_dir)
 
-            # if we have at least one sync_step or one pull_step
-            if len(sync_steps) > 0 or len(pull_steps) > 0:
+            # if we have at least one sync_step
+            if len(sync_steps) > 0:
                 # start the observer
-                self.log.info("Starting XOSObserver", sync_steps=sync_steps, pull_steps=pull_steps)
-                observer = XOSObserver(sync_steps, pull_steps, log = self.log)
+                self.log.info("Starting XOSObserver", sync_steps=sync_steps)
+                observer = XOSObserver(sync_steps, log = self.log)
                 observer_thread = threading.Thread(target=observer.run,name='synchronizer')
                 observer_thread.start()
 
@@ -133,8 +111,15 @@
                     watcher_thread = threading.Thread(target=watcher.run,name='watcher')
                     watcher_thread.start()
         else:
-            self.log.info(""
-                          "Skipping observer and watcher threads due to no steps dir.")
+            self.log.info("Skipping observer and watcher threads due to no steps dir.")
+
+        pull_steps_dir = Config.get("pull_steps_dir")
+        if pull_steps_dir:
+            pull_steps_engine = XOSPullStepEngine()
+            pull_steps_engine.load_pull_step_modules(pull_steps_dir)
+            pull_steps_engine.start()
+        else:
+            self.log.info("Skipping event engine due to no event_steps dir.")
 
         event_steps_dir = Config.get("event_steps_dir")
         if event_steps_dir:
diff --git a/xos/synchronizers/new_base/event_loop.py b/xos/synchronizers/new_base/event_loop.py
index 3273c84..1a1a4f4 100644
--- a/xos/synchronizers/new_base/event_loop.py
+++ b/xos/synchronizers/new_base/event_loop.py
@@ -71,12 +71,11 @@
 class XOSObserver:
     sync_steps = []
 
-    def __init__(self, sync_steps, pull_steps=[], log=log):
+    def __init__(self, sync_steps, log=log):
         # The Condition object via which events are received
         self.log = log
         self.step_lookup = {}
         self.sync_steps = sync_steps
-        self.pull_steps = pull_steps
         self.load_sync_steps()
 
         self.load_dependency_graph()
@@ -674,27 +673,6 @@
 
             loop_start = time.time()
 
-            # Run pull_steps (just once, before doing everything else)
-            self.log.debug('Starting pull steps', steps=self.pull_steps)
-            pull_threads = []
-            for ps in self.pull_steps:
-                i = ps()
-                thread = threading.Thread(
-                    target=i.pull_records, name='pullstep', args=()
-                )
-                pull_threads.append(thread)
-
-            # Start threads
-            for t in pull_threads:
-                t.start()
-
-            # Wait for all threads to finish before continuing with the run
-            # loop
-            for t in pull_threads:
-                t.join()
-
-            self.log.debug('Done with pull steps', steps=self.pull_steps)
-
             # Two passes. One for sync, the other for deletion.
             for deletion in (False, True):
                 objects_to_process = []
diff --git a/xos/synchronizers/new_base/pull_step_engine.py b/xos/synchronizers/new_base/pull_step_engine.py
new file mode 100644
index 0000000..e695b1a
--- /dev/null
+++ b/xos/synchronizers/new_base/pull_step_engine.py
@@ -0,0 +1,98 @@
+
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import imp
+import inspect
+import os
+import threading
+import time
+from xosconfig import Config
+from multistructlog import create_logger
+
+log = create_logger(Config().get('logging'))
+
+class XOSPullStepScheduler():
+    """ XOSPullStepThread
+
+        A Thread for servicing pull steps. There is one event_step associated with one XOSPullStepThread.
+        The thread's pull_records() function is called for every five seconds.
+    """
+
+    def __init__(self, steps, *args, **kwargs):
+        self.steps = steps
+
+    def run(self):
+        while True:
+            time.sleep(5)
+            self.run_once()
+
+    def run_once(self):
+        log.debug('Starting pull steps', steps=self.steps)
+
+        threads = []
+        for step in self.steps:
+            thread = threading.Thread(target=step().pull_records, name='pull_step')
+            threads.append(thread)
+
+        for t in threads:
+            t.start()
+
+        for t in threads:
+            t.join()
+
+        log.debug('Done with pull steps', steps=self.steps)
+
+
+
+class XOSPullStepEngine:
+    """ XOSPullStepEngine
+
+        Load pull step modules. Two methods are defined:
+
+            load_pull_step_modules(dir) ... look for step modules in the given directory, and load objects that are
+                                       descendant from PullStep.
+
+            start() ... Launch threads to handle processing of the PullSteps. It's expected that load_step_modules()
+                        will be called before start().
+    """
+
+    def __init__(self):
+        self.pull_steps = []
+
+    def load_pull_step_modules(self, pull_step_dir):
+        self.pull_steps = []
+        log.info("Loading event steps", pull_step_dir=pull_step_dir)
+
+        # NOTE we'll load all the classes that inherit from PullStep
+        for fn in os.listdir(pull_step_dir):
+            pathname = os.path.join(pull_step_dir, fn)
+            if os.path.isfile(pathname) and fn.endswith(".py") and (fn != "__init__.py") and ("test" not in fn):
+                event_module = imp.load_source(fn[:-3], pathname)
+
+                for classname in dir(event_module):
+                    c = getattr(event_module, classname, None)
+
+                    if inspect.isclass(c):
+                        base_names = [b.__name__ for b in c.__bases__]
+                        if 'PullStep' in base_names:
+                            self.pull_steps.append(c)
+        log.info("Loaded pull steps", steps=self.pull_steps)
+
+    def start(self):
+        log.info("Starting pull steps engine", steps=self.pull_steps)
+
+        for step in self.pull_steps:
+            sched = XOSPullStepScheduler(steps=self.pull_steps)
+            sched.run()
diff --git a/xos/synchronizers/new_base/tests/test_load.py b/xos/synchronizers/new_base/tests/test_load.py
index 06f8275..c42b8af 100644
--- a/xos/synchronizers/new_base/tests/test_load.py
+++ b/xos/synchronizers/new_base/tests/test_load.py
@@ -55,10 +55,8 @@
 
         b = backend.Backend()
         steps_dir = Config.get("steps_dir")
-        pull_steps_dir = Config.get("pull_steps_dir")
         self.steps = b.load_sync_step_modules(steps_dir)
-        self.pull_steps = b.load_pull_step_modules(pull_steps_dir)
-        self.synchronizer = event_loop.XOSObserver(self.steps, self.pull_steps)
+        self.synchronizer = event_loop.XOSObserver(self.steps)
 
     def tearDown(self):
         sys.path = self.sys_path_save
@@ -98,8 +96,6 @@
             observed_names = [o.__name__ for o in observes]
             self.assertIn(k, observed_names)
 
-    def test_load_pull_steps(self):
-        self.assertEqual(len(self.synchronizer.pull_steps), 1)
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/xos/synchronizers/new_base/tests/test_run.py b/xos/synchronizers/new_base/tests/test_run.py
index 61147ea..60eb2c0 100644
--- a/xos/synchronizers/new_base/tests/test_run.py
+++ b/xos/synchronizers/new_base/tests/test_run.py
@@ -64,10 +64,8 @@
 
         b = backend.Backend()
         steps_dir = Config.get("steps_dir")
-        pull_steps_dir = Config.get("pull_steps_dir")
         self.steps = b.load_sync_step_modules(steps_dir)
-        self.pull_steps = b.load_pull_step_modules(pull_steps_dir)
-        self.synchronizer = event_loop.XOSObserver(self.steps, self.pull_steps)
+        self.synchronizer = event_loop.XOSObserver(self.steps)
         try:
             os.remove('/tmp/sync_ports')
         except OSError:
@@ -83,8 +81,7 @@
 
     @mock.patch("steps.sync_instances.syncstep.run_template",side_effect=run_fake_ansible_template)
     @mock.patch("event_loop.model_accessor")
-    @mock.patch("pull_step.TestPullStep.pull_records")
-    def test_run_once(self, mock_pull_records, mock_run_template, mock_accessor, *_other_accessors):
+    def test_run_once(self, mock_run_template, mock_accessor, *_other_accessors):
 
 
         pending_objects, pending_steps = self.synchronizer.fetch_pending()
@@ -105,8 +102,5 @@
         self.assertIn("successful", sync_ports)
         self.assertIn("successful", delete_ports)
 
-        mock_pull_records.assert_called_once()
-
-
 if __name__ == '__main__':
     unittest.main()