blob: a23efdf147117ca2694bb026a36bced017c15c43 [file] [log] [blame]
import os
import inspect
import imp
import sys
import threading
import time
from syncstep import SyncStep
from synchronizers.base.event_loop import XOSObserver
from xos.logger import Logger, logging
from xos.config import Config
from django.utils import timezone
from diag import update_diag
# Optional feature flag: the watcher class is only imported when the
# configuration exposes a truthy ``observer_enable_watchers`` value, so the
# name XOSWatcher exists in this module only in that case.
watchers_enabled = getattr(Config(), "observer_enable_watchers", None)
if watchers_enabled:
    from synchronizers.base.watchers import XOSWatcher

# Module-wide logger, constructed with the INFO level.
logger = Logger(level=logging.INFO)
class Backend:
    """Discover the sync step classes and run the synchronizer threads."""

    def load_sync_step_modules(self, step_dir=None):
        """Load every step module in ``step_dir`` and collect its sync steps.

        Args:
            step_dir: Directory scanned for ``*.py`` files (``__init__.py`` is
                skipped). When ``None``, the ``observer_steps_dir``
                configuration value is used, falling back to
                ``/opt/xos/synchronizers/openstack/steps`` if that value
                cannot be read.

        Returns:
            A list, without duplicates, of the classes found in those modules
            that subclass ``SyncStep`` and define a ``provides`` attribute.

        Raises:
            OSError: If ``step_dir`` cannot be listed.
        """
        if step_dir is None:
            try:
                step_dir = Config().observer_steps_dir
            except Exception:
                # Best effort: any configuration failure selects the default
                # directory. Catching Exception (rather than a bare except)
                # lets KeyboardInterrupt and SystemExit propagate.
                step_dir = '/opt/xos/synchronizers/openstack/steps'

        sync_steps = []
        for fn in os.listdir(step_dir):
            pathname = os.path.join(step_dir, fn)
            if not (os.path.isfile(pathname) and fn.endswith(".py") and fn != "__init__.py"):
                continue

            # The module is registered under its file name minus ".py".
            module = imp.load_source(fn[:-3], pathname)
            for classname in dir(module):
                c = getattr(module, classname, None)
                # Keep only SyncStep descendants that have a 'provides'
                # field; this eliminates the abstract base classes, since
                # they don't have a provides.
                if (inspect.isclass(c) and issubclass(c, SyncStep)
                        and hasattr(c, "provides") and c not in sync_steps):
                    sync_steps.append(c)
        return sync_steps

    def _start_daemon_thread(self, target, name=None):
        """Start ``target`` in a daemon thread and return the Thread object."""
        thread = threading.Thread(target=target, name=name)
        # Daemon threads do not keep the interpreter alive, so the process
        # can exit on Ctrl-C without touching private Thread internals.
        thread.daemon = True
        thread.start()
        return thread

    def run(self):
        """Start the worker threads, then block until a keyboard interrupt.

        Starts the observer thread, the watcher thread when
        ``watchers_enabled`` is truthy, and the model policy thread when the
        ``observer_model_policies_dir`` configuration value is set. The
        calling thread then sleeps in a loop; on ``KeyboardInterrupt`` it
        prints a message and exits the process with status 1.

        Raises:
            SystemExit: With code 1 after a keyboard interrupt.
        """
        update_diag(sync_start=time.time(), backend_status="0 - Synchronizer Start")
        sync_steps = self.load_sync_step_modules()

        # start the observer
        observer = XOSObserver(sync_steps)
        self._start_daemon_thread(observer.run, name='synchronizer')

        # start the watcher thread
        if watchers_enabled:
            watcher = XOSWatcher(sync_steps)
            self._start_daemon_thread(watcher.run, name='watcher')

        # start model policies thread
        policies_dir = getattr(Config(), "observer_model_policies_dir", None)
        if policies_dir:
            from synchronizers.model_policy import run_policy
            self._start_daemon_thread(run_policy)
        else:
            logger.info("Skipping model policies thread due to no model_policies dir.")

        while True:
            try:
                time.sleep(1000)
            except KeyboardInterrupt:
                print("exiting due to keyboard interrupt")
                # All worker threads (observer, watcher, model policy) are
                # daemons, so none of them can block this exit.
                sys.exit(1)