Merge branch 'master' of github.com:open-cloud/xos
diff --git a/xos/hpc_observer/hpc_watcher.py b/xos/hpc_observer/hpc_watcher.py
index b0587f5..15adce9 100644
--- a/xos/hpc_observer/hpc_watcher.py
+++ b/xos/hpc_observer/hpc_watcher.py
@@ -46,6 +46,7 @@
 import time
 import pycurl
 import traceback
+import json
 from StringIO import StringIO
 
 from dnslib.dns import DNSRecord,DNSHeader,DNSQuestion,QTYPE
@@ -279,6 +280,85 @@
 
         job["status"] = "success"
 
+class WatcherWorker(Thread):
+    """Worker thread that fetches watcher status over HTTP for queued jobs.
+
+    Each job is a dict carrying "server" and "port". The worker requests
+    http://<server>:<port>/ and stores a JSON-encoded string in
+    job["status"] before handing the job back to the queue.
+    """
+
+    def __init__(self, queue):
+        """Remember the work queue and start the thread as a daemon."""
+        Thread.__init__(self)
+        self.queue = queue
+        self.daemon = True
+        self.start()
+
+    def run(self):
+        """Process jobs forever: take a job, handle it, submit the result."""
+        while True:
+            job = self.queue.get_job()
+            self.handle_job(job)
+            self.queue.submit_result(job)
+
+    def curl_error_message(self, e):
+        """Return a short description of the pycurl error e.
+
+        e.args[0] is taken to be the numeric libcurl error code.
+        """
+        code = e.args[0]
+        if code == 6:  # CURLE_COULDNT_RESOLVE_HOST
+            return "couldn't resolve host"
+        if code == 7:  # CURLE_COULDNT_CONNECT
+            return "failed to connect"
+        return "curl error %d" % code
+
+    def handle_job(self, job):
+        """Fetch the watcher page for job and record the outcome.
+
+        On success job["status"] is the JSON object returned by the server
+        with "status" set to "success"; on any handled failure it is a JSON
+        object whose "status" describes the problem.
+        """
+        try:
+            # Looked up inside the try so that a malformed job is reported
+            # through job["status"] instead of killing the worker thread.
+            server = job["server"]
+            port = job["port"]
+
+            buffer = StringIO()
+            c = pycurl.Curl()
+
+            c.setopt(c.URL, "http://%s:%s/" % (server, port))
+            c.setopt(c.WRITEDATA, buffer)
+            c.setopt(c.TIMEOUT, 10)
+            c.setopt(c.CONNECTTIMEOUT, 10)
+            # No signals: libcurl timeouts must not rely on them in threads.
+            c.setopt(c.NOSIGNAL, 1)
+
+            try:
+                c.perform()
+                response_code = c.getinfo(c.RESPONSE_CODE)
+            except pycurl.error, e:
+                # Only genuine curl failures carry a curl error code; anything
+                # else falls through to the generic handler below.
+                job["status"] = json.dumps( {"status": self.curl_error_message(e)} )
+                return
+            finally:
+                c.close()
+
+            if response_code != 200:
+                job["status"] = json.dumps( {"status": "error response %d" % response_code} )
+                return
+
+            d = json.loads(buffer.getvalue())
+            d["status"] = "success"
+            job["status"] = json.dumps(d)
+
+        except Exception, e:
+            job["status"] = json.dumps( {"status": "Exception: %s" % str(e)} )
+
 class BaseWatcher(Thread):
     def __init__(self):
         Thread.__init__(self)
@@ -307,14 +364,21 @@
         else:
             Tag(service=service, name=kind+".time", content_object = sliver, value=str(time.time())).save()
 
-    def get_service_slices(self, service, kind):
+    def get_service_slices(self, service, kind=None):
+        """Return the slices of service, optionally filtered by name.
+
+        If kind is truthy only slices whose name contains kind are kept;
+        otherwise all slices are returned as a list.
+        """
         try:
             slices = service.slices.all()
         except:
             # buggy data model
             slices = service.service.all()
 
-        return [x for x in slices if (kind in x.name)]
+        if not kind:
+            return list(slices)
+        return [s for s in slices if kind in s.name]
 
 class RRWatcher(BaseWatcher):
     def __init__(self):
@@ -446,15 +506,74 @@
             self.run_once()
             time.sleep(10)
 
+class WatcherFetcher(BaseWatcher):
+    """Collect watcher status from the slivers of every HpcService.
+
+    Jobs are fanned out to a pool of WatcherWorker threads and each result
+    is stored with set_status() under the "watcher.watcher" kind.
+    """
+
+    def __init__(self):
+        """Create the work queue and start the pool of worker threads."""
+        BaseWatcher.__init__(self)
+
+        self.fetch_queue = WorkQueue()
+        for i in range(0, 10):
+            WatcherWorker(queue = self.fetch_queue)
+
+    def fetch_watcher(self, service, slivers):
+        """Query every sliver in slivers and record its status for service."""
+        for sliver in slivers:
+            ip = sliver.get_public_ip()
+            if not ip:
+                try:
+                    ip = socket.gethostbyname(sliver.node.name)
+                except socket.error:
+                    # One unresolvable node must not abort the pass for the
+                    # remaining slivers; report it in the same JSON shape
+                    # the workers use.
+                    status = json.dumps( {"status": "couldn't resolve host"} )
+                    self.set_status(sliver, service, "watcher.watcher", status)
+                    continue
+
+            # The port is chosen from the slice name; 8015 is the default.
+            port = 8015
+            if ("redir" in sliver.slice.name):
+                port = 8016
+            elif ("demux" in sliver.slice.name):
+                port = 8017
+
+            self.fetch_queue.submit_job({"server": ip, "port": port, "sliver": sliver})
+
+        # Collect results while the queue still reports outstanding jobs.
+        while self.fetch_queue.outstanding > 0:
+            result = self.fetch_queue.get_result()
+            sliver = result["sliver"]
+            self.set_status(sliver, service, "watcher.watcher", result["status"])
+
+    def run_once(self):
+        """Run one fetch pass over all slices of all HpcService objects."""
+        for hpcService in HpcService.objects.all():
+            for service_slice in self.get_service_slices(hpcService):
+                self.fetch_watcher(hpcService, service_slice.slivers.all())
+
+    def run(self):
+        """Thread body: call run_once() forever, pausing 10 seconds between passes."""
+        while True:
+            self.run_once()
+            time.sleep(10)
+
 if __name__ == "__main__":
     if "--once" in sys.argv:
         RRWatcher().run_once()
         HpcProber().run_once()
         HpcFetcher().run_once()
+        WatcherFetcher().run_once()
     else:
         RRWatcher().start()
         HpcProber().start()
         HpcFetcher().start()
+        WatcherFetcher().start()
 
         print "Running forever..."
         while True: