[CORD-2888] Adding pull steps to the synchronization loop

Change-Id: I768c5a32739a27764ee79e545b895be6273b3dc8
diff --git a/lib/xos-config/xosconfig/synchronizer-config-schema.yaml b/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
index dd25888..bdaf938 100644
--- a/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
+++ b/lib/xos-config/xosconfig/synchronizer-config-schema.yaml
@@ -57,6 +57,8 @@
     type: str
   steps_dir:
     type: str
+  pull_steps_dir:
+    type: str
   sys_dir:
     type: str
   models_dir:
diff --git a/lib/xos-util/.gitignore b/lib/xos-util/.gitignore
new file mode 100644
index 0000000..7fa3589
--- /dev/null
+++ b/lib/xos-util/.gitignore
@@ -0,0 +1,6 @@
+build
+dist
+XosUtil.egg-info
+.coverage
+coverage.xml
+cover
\ No newline at end of file
diff --git a/scripts/setup_venv.sh b/scripts/setup_venv.sh
index 56e87fe..73effb1 100644
--- a/scripts/setup_venv.sh
+++ b/scripts/setup_venv.sh
@@ -15,7 +15,7 @@
 # limitations under the License.
 
 BASEDIR=$(pwd)
-REQUIREMENTS=$BASEDIR/containers/xos/pip_requirements.txt
+REQUIREMENTS=$BASEDIR/containers/xos/pip_requested.txt
 VENVDIR=venv-xos
 
 echo $BASEDIR
@@ -48,7 +48,12 @@
 cd $BASEDIR/xos/xos_client; python setup.py install && \
 chmod 777 $BASEDIR/venv-xos/lib/python2.7/site-packages/xosapi/chameleon/protoc_plugins/gw_gen.py && \
 chmod 777 $BASEDIR/venv-xos/lib/python2.7/site-packages/xosapi/chameleon/protoc_plugins/swagger_gen.py && \
-cd $BASEDIR/lib/xos-genx; python setup.py install
+
+#install xos-genx
+cd $BASEDIR/lib/xos-genx; python setup.py install && \
+
+#install xos-util
+cd $BASEDIR/lib/xos-util; python setup.py install
  then
    echo "Requirements installed."
    echo "Virtualenv ready"
diff --git a/xos/synchronizers/.gitignore b/xos/synchronizers/.gitignore
index 6aed14b..f0650b7 100644
--- a/xos/synchronizers/.gitignore
+++ b/xos/synchronizers/.gitignore
@@ -2,4 +2,5 @@
 new_base/mock_modelaccessor.py
 new_base/mock_modelaccessor.py.context
 new_base/synchronizers.new_base.ansible_helper
-new_base/xos.synchronizers.new_base.tests.test_payload
\ No newline at end of file
+new_base/xos.synchronizers.new_base.tests.test_payload
+new_base/synchronizers.new_base.diag
\ No newline at end of file
diff --git a/xos/synchronizers/new_base/backend.py b/xos/synchronizers/new_base/backend.py
index d2a1112..32de955 100644
--- a/xos/synchronizers/new_base/backend.py
+++ b/xos/synchronizers/new_base/backend.py
@@ -42,6 +42,25 @@
         self.log = log
         pass
 
+    def load_pull_step_modules(self, pull_step_dir):
+        pull_steps = []
+        self.log.info("Loading pull steps", pull_step_dir=pull_step_dir)
+        # NOTE we'll load all the classes that inherit from PullStep
+        for fn in os.listdir(pull_step_dir):
+            pathname = os.path.join(pull_step_dir, fn)
+            if os.path.isfile(pathname) and fn.endswith(".py") and (fn != "__init__.py") and not "test" in fn:
+                module = imp.load_source(fn[:-3], pathname)
+
+                for classname in dir(module):
+                    c = getattr(module, classname, None)
+
+                    if inspect.isclass(c):
+                        base_names = [b.__name__ for b in c.__bases__]
+                        if 'PullStep' in base_names:
+                            pull_steps.append(c)
+        self.log.info("Loaded pull steps", steps=pull_steps)
+        return pull_steps
+
     def load_sync_step_modules(self, step_dir):
         sync_steps = []
 
@@ -49,7 +68,7 @@
 
         for fn in os.listdir(step_dir):
             pathname = os.path.join(step_dir,fn)
-            if os.path.isfile(pathname) and fn.endswith(".py") and (fn!="__init__.py"):
+            if os.path.isfile(pathname) and fn.endswith(".py") and (fn!="__init__.py") and not "test" in fn:
                 module = imp.load_source(fn[:-3],pathname)
 
                 for classname in dir(module):
@@ -79,21 +98,34 @@
         model_accessor.update_diag(sync_start=time.time(), backend_status="Synchronizer Start")
 
         steps_dir = Config.get("steps_dir")
-        if steps_dir:
-            sync_steps = self.load_sync_step_modules(steps_dir)
-            if sync_steps:
+        pull_steps_dir = Config.get("pull_steps_dir")
+        if steps_dir or pull_steps_dir:
+            sync_steps = []
+            pull_steps = []
+
+            # load sync_steps and pull_steps
+            if steps_dir:
+                sync_steps = self.load_sync_step_modules(steps_dir)
+            if pull_steps_dir:
+                pull_steps = self.load_pull_step_modules(pull_steps_dir)
+
+            # if we have at least one sync_step or one pull_step
+            if len(sync_steps) > 0 or len(pull_steps) > 0:
                 # start the observer
-                observer = XOSObserver(sync_steps, log = self.log)
+                self.log.info("Starting XOSObserver", sync_steps=sync_steps, pull_steps=pull_steps)
+                observer = XOSObserver(sync_steps, pull_steps, log = self.log)
                 observer_thread = threading.Thread(target=observer.run,name='synchronizer')
                 observer_thread.start()
 
                 # start the watcher thread
                 if (watchers_enabled):
+                    self.log.info("Starting XOSWatcher", sync_steps=sync_steps)
                     watcher = XOSWatcher(sync_steps)
                     watcher_thread = threading.Thread(target=watcher.run,name='watcher')
                     watcher_thread.start()
         else:
-            self.log.info("Skipping observer and watcher threads due to no steps dir.")
+            self.log.info(""
+                          "Skipping observer and watcher threads due to no steps dir.")
 
         # start model policies thread
         policies_dir = Config.get("model_policies_dir")
diff --git a/xos/synchronizers/new_base/event_loop.py b/xos/synchronizers/new_base/event_loop.py
index 2e5e6e1..fdf3671 100644
--- a/xos/synchronizers/new_base/event_loop.py
+++ b/xos/synchronizers/new_base/event_loop.py
@@ -71,11 +71,12 @@
 class XOSObserver:
     sync_steps = []
 
-    def __init__(self, sync_steps, log=log):
+    def __init__(self, sync_steps, pull_steps=[], log=log):
         # The Condition object via which events are received
         self.log = log
         self.step_lookup = {}
         self.sync_steps = sync_steps
+        self.pull_steps = pull_steps
         self.load_sync_steps()
 
         self.load_dependency_graph()
@@ -669,6 +670,27 @@
 
             loop_start = time.time()
 
+            # Run pull_steps (just once, before doing everything else)
+            self.log.debug('Starting pull steps', steps=self.pull_steps)
+            pull_threads = []
+            for ps in self.pull_steps:
+                i = ps()
+                thread = threading.Thread(
+                    target=i.pull_records, name='pullstep', args=()
+                )
+                pull_threads.append(thread)
+
+            # Start threads
+            for t in pull_threads:
+                t.start()
+
+            # Wait for all threads to finish before continuing with the run
+            # loop
+            for t in pull_threads:
+                t.join()
+
+            self.log.debug('Done with pull steps', steps=self.pull_steps)
+
             # Two passes. One for sync, the other for deletion.
             for deletion in (False, True):
                 objects_to_process = []
diff --git a/xos/synchronizers/new_base/pullstep.py b/xos/synchronizers/new_base/pullstep.py
new file mode 100644
index 0000000..1300ea6
--- /dev/null
+++ b/xos/synchronizers/new_base/pullstep.py
@@ -0,0 +1,28 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+class PullStep(object):
+    """
+    All the pull steps defined in each synchronizer needs to inherit from this class in order to be loaded
+    """
+    def __init__(self, **kwargs):
+        """
+        Initialize a pull step
+        :param kwargs:
+        -- observed_model: name of the model that is being polled
+        """
+        self.observed_model = kwargs.get('observed_model')
+
+    def pull_records(self):
+        self.log.debug("There is no default pull_records, please provide a pull_records method for %s" % self.observed_model)
\ No newline at end of file
diff --git a/xos/synchronizers/new_base/tests/pull_steps/__init__.py b/xos/synchronizers/new_base/tests/pull_steps/__init__.py
new file mode 100644
index 0000000..eb28b96
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/pull_steps/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
\ No newline at end of file
diff --git a/xos/synchronizers/new_base/tests/pull_steps/pull_step.py b/xos/synchronizers/new_base/tests/pull_steps/pull_step.py
new file mode 100644
index 0000000..2209b99
--- /dev/null
+++ b/xos/synchronizers/new_base/tests/pull_steps/pull_step.py
@@ -0,0 +1,20 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synchronizers.new_base.pullstep import PullStep
+from mock_modelaccessor import *
+
+class TestPullStep(PullStep):
+    def __init__(self):
+        super(TestPullStep, self).__init__(observed_model=Instance)
\ No newline at end of file
diff --git a/xos/synchronizers/new_base/tests/test_config.yaml b/xos/synchronizers/new_base/tests/test_config.yaml
index 5dc99fe..cd3dd64 100644
--- a/xos/synchronizers/new_base/tests/test_config.yaml
+++ b/xos/synchronizers/new_base/tests/test_config.yaml
@@ -30,3 +30,4 @@
       level: DEBUG 
 dependency_graph: "tests/model-deps"
 steps_dir: "tests/steps"
+pull_steps_dir: "tests/pull_steps"
diff --git a/xos/synchronizers/new_base/tests/test_load.py b/xos/synchronizers/new_base/tests/test_load.py
index ae267a5..06f8275 100644
--- a/xos/synchronizers/new_base/tests/test_load.py
+++ b/xos/synchronizers/new_base/tests/test_load.py
@@ -55,8 +55,10 @@
 
         b = backend.Backend()
         steps_dir = Config.get("steps_dir")
+        pull_steps_dir = Config.get("pull_steps_dir")
         self.steps = b.load_sync_step_modules(steps_dir)
-        self.synchronizer = event_loop.XOSObserver(self.steps)
+        self.pull_steps = b.load_pull_step_modules(pull_steps_dir)
+        self.synchronizer = event_loop.XOSObserver(self.steps, self.pull_steps)
 
     def tearDown(self):
         sys.path = self.sys_path_save
@@ -96,5 +98,8 @@
             observed_names = [o.__name__ for o in observes]
             self.assertIn(k, observed_names)
 
+    def test_load_pull_steps(self):
+        self.assertEqual(len(self.synchronizer.pull_steps), 1)
+
 if __name__ == '__main__':
     unittest.main()
diff --git a/xos/synchronizers/new_base/tests/test_run.py b/xos/synchronizers/new_base/tests/test_run.py
index c4ca3af..61147ea 100644
--- a/xos/synchronizers/new_base/tests/test_run.py
+++ b/xos/synchronizers/new_base/tests/test_run.py
@@ -64,8 +64,10 @@
 
         b = backend.Backend()
         steps_dir = Config.get("steps_dir")
+        pull_steps_dir = Config.get("pull_steps_dir")
         self.steps = b.load_sync_step_modules(steps_dir)
-        self.synchronizer = event_loop.XOSObserver(self.steps)
+        self.pull_steps = b.load_pull_step_modules(pull_steps_dir)
+        self.synchronizer = event_loop.XOSObserver(self.steps, self.pull_steps)
         try:
             os.remove('/tmp/sync_ports')
         except OSError:
@@ -81,7 +83,10 @@
 
     @mock.patch("steps.sync_instances.syncstep.run_template",side_effect=run_fake_ansible_template)
     @mock.patch("event_loop.model_accessor")
-    def test_run_once(self, mock_run_template, mock_accessor, *_other_accessors):
+    @mock.patch("pull_step.TestPullStep.pull_records")
+    def test_run_once(self, mock_pull_records, mock_run_template, mock_accessor, *_other_accessors):
+
+
         pending_objects, pending_steps = self.synchronizer.fetch_pending()
         pending_objects2 = list(pending_objects)
 
@@ -93,12 +98,15 @@
         any_cs.slice = slice
 
         self.synchronizer.run_once()
+
         sync_ports = open('/tmp/sync_ports').read()
         delete_ports = open('/tmp/delete_ports').read()
 
         self.assertIn("successful", sync_ports)
         self.assertIn("successful", delete_ports)
-    
+
+        mock_pull_records.assert_called_once()
+
 
 if __name__ == '__main__':
     unittest.main()
diff --git a/xos/xos_client/.gitignore b/xos/xos_client/.gitignore
index 608c120..f82b325 100644
--- a/xos/xos_client/.gitignore
+++ b/xos/xos_client/.gitignore
@@ -1,2 +1,3 @@
 build
 xosapi/chameleon
+xosapi.egg-info