Redis and watchers support patch for the synchronizer
Change-Id: If2931070ddbe06a05233ebc6b41b0f1368f5fb0e
diff --git a/xos/synchronizers/base/backend.py b/xos/synchronizers/base/backend.py
index 075a7b9..6f91c59 100644
--- a/xos/synchronizers/base/backend.py
+++ b/xos/synchronizers/base/backend.py
@@ -8,6 +8,12 @@
from django.utils import timezone
from diag import update_diag
+
+watchers_enabled = getattr(Config(), "observer_enable_watchers", None)
+
+if (watchers_enabled):
+ from synchronizers.base.watchers import XOSWatcher
+
logger = Logger(level=logging.INFO)
class Backend:
@@ -20,6 +26,12 @@
observer_thread = threading.Thread(target=observer.run,name='synchronizer')
observer_thread.start()
+ # start the watcher thread
+ if (watchers_enabled):
+ watcher = XOSWatcher()
+ watcher_thread = threading.Thread(target=watcher.run,name='watcher')
+ watcher_thread.start()
+
# start model policies thread
policies_dir = getattr(Config(), "observer_model_policies_dir", None)
if policies_dir:
diff --git a/xos/synchronizers/base/watchers.py b/xos/synchronizers/base/watchers.py
new file mode 100644
index 0000000..c29db32
--- /dev/null
+++ b/xos/synchronizers/base/watchers.py
@@ -0,0 +1,91 @@
+import os
+import inspect
+import imp
+import time
+import sys
+import traceback
+import commands
+import threading
+import json
+import pdb
+import pprint
+import traceback
+
+
+from datetime import datetime
+from collections import defaultdict
+from core.models import *
+from django.db.models import F, Q
+from django.db import connection
+from django.db import reset_queries
+from xos.logger import Logger, logging, logger
+from xos.config import Config, XOS_DIR
+from synchronizers.base.steps import *
+from syncstep import SyncStep
+from synchronizers.base.error_mapper import *
+from synchronizers.base.steps.sync_object import SyncObject
+from django.utils import timezone
+from diag import update_diag
+import redis
+
+class XOSWatcher:
+ def load_sync_step_modules(self, step_dir=None):
+ if step_dir is None:
+ try:
+ step_dir = Config().observer_steps_dir
+ except:
+ step_dir = '/opt/xos/synchronizers/openstack/steps'
+
+
+ for fn in os.listdir(step_dir):
+ pathname = os.path.join(step_dir,fn)
+ if os.path.isfile(pathname) and fn.endswith(".py") and (fn!="__init__.py"):
+ module = imp.load_source(fn[:-3],pathname)
+ for classname in dir(module):
+ c = getattr(module, classname, None)
+
+ # make sure 'c' is a descendent of SyncStep and has a
+ # provides field (this eliminates the abstract base classes
+ # since they don't have a provides)
+
+ if inspect.isclass(c) and issubclass(c, SyncStep) and hasattr(c,"provides") and (c not in self.sync_steps):
+ self.sync_steps.append(c)
+
+ def load_sync_steps(self):
+ for s in self.sync_steps:
+ if hasattr(s,'watches'):
+ for w in s.watches:
+ w.source = s
+ try:
+ self.watch_map[w.dest.__name__].append(w)
+ except:
+ self.watch_map[w.dest.__name__]=[w]
+
+ def __init__(self):
+ self.watch_map = {}
+ self.sync_steps = []
+ self.load_sync_step_modules()
+ self.load_sync_steps()
+ r = redis.Redis("redis")
+ channels = self.watch_map.keys()
+ self.redis = r
+ self.pubsub = self.redis.pubsub()
+ self.pubsub.subscribe(channels)
+
+ def run(self):
+ for item in self.pubsub.listen():
+ channel = item['channel']
+ try:
+ entry = self.watch_map[channel]
+ data = json.loads(item['data'])
+ pk = data['pk']
+ changed_fields = data['changed_fields']
+ pdb.set_trace()
+ for w in entry:
+ if w.into in changed_fields or not w.into:
+ if (hasattr(w.source, 'handle_watched_object')):
+ o = w.dest.objects.get(pk=data['pk'])
+ step = w.source()
+ step.handle_watched_object(o)
+ except:
+ pass