SEBA-405 Convert synchronizer framework to library
Change-Id: If8562f23dc15c7d18d7a8b040b33756708b3c5ec
diff --git a/lib/xos-synchronizer/xossynchronizer/pull_step_engine.py b/lib/xos-synchronizer/xossynchronizer/pull_step_engine.py
new file mode 100644
index 0000000..3f4732d
--- /dev/null
+++ b/lib/xos-synchronizer/xossynchronizer/pull_step_engine.py
@@ -0,0 +1,102 @@
+# Copyright 2017-present Open Networking Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import imp
+import inspect
+import os
+import threading
+import time
+from xosconfig import Config
+from multistructlog import create_logger
+
+log = create_logger(Config().get("logging"))
+
+
+class XOSPullStepScheduler:
+    """ XOSPullStepScheduler
+
+    Periodically runs a list of pull step classes. Every five seconds each class is instantiated and its
+    pull_records() method is called in a thread of its own; a round ends once all threads have finished.
+    """
+
+    def __init__(self, steps, *args, **kwargs):
+        # steps holds pull step classes (run_once() instantiates them); extra arguments are ignored
+        self.steps = steps
+
+    def run(self):
+        """ Loop forever: sleep five seconds, then run one round of pull steps. Does not return. """
+        while True:
+            time.sleep(5)
+            self.run_once()
+
+    def run_once(self):
+        """ Call pull_records() on a fresh instance of every step, one thread each, and wait for all. """
+        log.trace("Starting pull steps", steps=self.steps)
+        threads = [
+            threading.Thread(target=step().pull_records, name="pull_step")
+            for step in self.steps
+        ]
+        for t in threads:
+            t.start()
+        for t in threads:
+            t.join()
+
+        log.trace("Done with pull steps", steps=self.steps)
+
+
+class XOSPullStepEngine:
+    """ XOSPullStepEngine
+
+    Load pull step modules. Two methods are defined:
+
+    load_pull_step_modules(dir) ... look for step modules in the given directory, and load classes that
+    directly inherit from a class named PullStep.
+
+    start() ... Run the loaded PullSteps periodically; blocks the caller. It's expected that
+    load_pull_step_modules() will be called before start().
+    """
+
+    def __init__(self):
+        self.pull_steps = []
+
+    def load_pull_step_modules(self, pull_step_dir):
+        """ Replace self.pull_steps with the PullStep subclasses found in the .py files of pull_step_dir. """
+        self.pull_steps = []
+        log.info("Loading pull steps", pull_step_dir=pull_step_dir)
+
+        for fn in os.listdir(pull_step_dir):
+            pathname = os.path.join(pull_step_dir, fn)
+            # skip directories, non-python files, __init__.py and any file with "test" in its name
+            if (
+                not os.path.isfile(pathname)
+                or not fn.endswith(".py")
+                or fn == "__init__.py"
+                or "test" in fn
+            ):
+                continue
+            event_module = imp.load_source(fn[:-3], pathname)
+            # NOTE only direct subclasses match: the check looks at the class names in c.__bases__
+            for classname in dir(event_module):
+                c = getattr(event_module, classname, None)
+                if inspect.isclass(c) and "PullStep" in [b.__name__ for b in c.__bases__]:
+                    self.pull_steps.append(c)
+        log.info("Loaded pull steps", steps=self.pull_steps)
+
+    def start(self):
+        """ Run every loaded pull step in an endless loop; returns at once if no steps are loaded. """
+        log.info("Starting pull steps engine", steps=self.pull_steps)
+
+        # run() never returns, so one scheduler covering all steps is the only one that can ever run
+        if self.pull_steps:
+            XOSPullStepScheduler(steps=self.pull_steps).run()