Redis and watchers support patch for the synchronizer

Change-Id: If2931070ddbe06a05233ebc6b41b0f1368f5fb0e
diff --git a/xos/synchronizers/base/backend.py b/xos/synchronizers/base/backend.py
index 075a7b9..6f91c59 100644
--- a/xos/synchronizers/base/backend.py
+++ b/xos/synchronizers/base/backend.py
@@ -8,6 +8,12 @@
 from django.utils import timezone
 from diag import update_diag
 
+
+watchers_enabled = getattr(Config(), "observer_enable_watchers", None)
+
+if (watchers_enabled):
+    from synchronizers.base.watchers import XOSWatcher
+
 logger = Logger(level=logging.INFO)
 
 class Backend:
@@ -20,6 +26,12 @@
         observer_thread = threading.Thread(target=observer.run,name='synchronizer')
         observer_thread.start()
 
+        # start the watcher thread
+        if (watchers_enabled):
+            watcher = XOSWatcher()
+            watcher_thread = threading.Thread(target=watcher.run,name='watcher')
+            watcher_thread.start()
+
         # start model policies thread
         policies_dir = getattr(Config(), "observer_model_policies_dir", None)
         if policies_dir:
diff --git a/xos/synchronizers/base/watchers.py b/xos/synchronizers/base/watchers.py
new file mode 100644
index 0000000..c29db32
--- /dev/null
+++ b/xos/synchronizers/base/watchers.py
@@ -0,0 +1,91 @@
+import os
+import inspect
+import imp
+import time
+import sys
+import traceback
+import commands
+import threading
+import json
+import pdb
+import pprint
+import traceback
+
+
+from datetime import datetime
+from collections import defaultdict
+from core.models import *
+from django.db.models import F, Q
+from django.db import connection
+from django.db import reset_queries
+from xos.logger import Logger, logging, logger
+from xos.config import Config, XOS_DIR
+from synchronizers.base.steps import *
+from syncstep import SyncStep
+from synchronizers.base.error_mapper import *
+from synchronizers.base.steps.sync_object import SyncObject
+from django.utils import timezone
+from diag import update_diag
+import redis
+
+class XOSWatcher:
+    def load_sync_step_modules(self, step_dir=None):
+        if step_dir is None:
+            try:
+                step_dir = Config().observer_steps_dir
+            except:
+                step_dir = '/opt/xos/synchronizers/openstack/steps'
+
+
+        for fn in os.listdir(step_dir):
+            pathname = os.path.join(step_dir,fn)
+            if os.path.isfile(pathname) and fn.endswith(".py") and (fn!="__init__.py"):
+                module = imp.load_source(fn[:-3],pathname)
+                for classname in dir(module):
+                    c = getattr(module, classname, None)
+
+                    # make sure 'c' is a descendent of SyncStep and has a
+                    # provides field (this eliminates the abstract base classes
+                    # since they don't have a provides)
+
+                    if inspect.isclass(c) and issubclass(c, SyncStep) and hasattr(c,"provides") and (c not in self.sync_steps):
+                        self.sync_steps.append(c)
+
+    def load_sync_steps(self):
+        for s in self.sync_steps:
+            if hasattr(s,'watches'):
+                for w in s.watches:
+                    w.source = s
+                    try:
+                        self.watch_map[w.dest.__name__].append(w)
+                    except:
+                        self.watch_map[w.dest.__name__]=[w]
+
+    def __init__(self):
+        self.watch_map = {}
+        self.sync_steps = []
+        self.load_sync_step_modules()
+        self.load_sync_steps()
+        r = redis.Redis("redis")
+        channels = self.watch_map.keys()
+        self.redis = r
+        self.pubsub = self.redis.pubsub()
+        self.pubsub.subscribe(channels)
+
+    def run(self):
+        for item in self.pubsub.listen():
+            channel = item['channel']
+            try:
+                entry = self.watch_map[channel]
+                data = json.loads(item['data'])
+                pk = data['pk']
+                changed_fields = data['changed_fields']
+                pdb.set_trace()
+                for w in entry:
+                    if w.into in changed_fields or not w.into:
+                        if (hasattr(w.source, 'handle_watched_object')):
+                            o = w.dest.objects.get(pk=data['pk'])
+                            step = w.source()
+                            step.handle_watched_object(o)
+            except:
+                pass