latest changes to hpc watcher
diff --git a/xos/core/xoslib/methods/hpcview.py b/xos/core/xoslib/methods/hpcview.py
index ba8a348..850e29e 100644
--- a/xos/core/xoslib/methods/hpcview.py
+++ b/xos/core/xoslib/methods/hpcview.py
@@ -73,6 +73,8 @@
         hpc.append( {"name": sliver.node.name,
                      "watcher.HPC-hb.msg": lookup_tag(hpc_service, sliver, "watcher.HPC-hb.msg"),
                      "watcher.HPC-hb.time": lookup_time(hpc_service, sliver, "watcher.HPC-hb.time"),
+                     "watcher.HPC-fetch.msg": lookup_tag(hpc_service, sliver, "watcher.HPC-fetch.msg"),
+                     "watcher.HPC-fetch.time": lookup_time(hpc_service, sliver, "watcher.HPC-fetch.time"),
 
         })
 
diff --git a/xos/core/xoslib/static/js/xosHpc.js b/xos/core/xoslib/static/js/xosHpc.js
index 1d8adf0..15dc404 100644
--- a/xos/core/xoslib/static/js/xosHpc.js
+++ b/xos/core/xoslib/static/js/xosHpc.js
@@ -1,8 +1,16 @@
-function staleCheck(row, time_key, msg_key) {
-    if (parseInt(row[time_key])>30) {
+SC_RR = 60;
+SC_HPC_PROBE = 600;
+SC_HPC_FETCH = 3600;
+
+function staleCheck(row, time_key, msg_key, seconds) {
+    if (parseInt(row[time_key])>seconds) {
         return "stale";
     } else {
-        return row[msg_key];
+        if (! row[msg_key]) {
+            return "null";
+        } else {
+            return row[msg_key];
+        }
     }
 }
 
@@ -15,7 +23,7 @@
     for (rowkey in dnsdemux) {
         row = dnsdemux[rowkey];
 
-        actualEntries.push( [row.name, row.ip, staleCheck(row, "watcher.DNS.time", "watcher.DNS.msg")] );
+        actualEntries.push( [row.name, row.ip, staleCheck(row, "watcher.DNS.time", "watcher.DNS.msg", SC_RR)] );
     }
     console.log(actualEntries);
     oTable = $('#dynamic_dnsdemux').dataTable( {
@@ -27,7 +35,7 @@
         "aoColumns": [
             { "sTitle": "Node" },
             { "sTitle": "IP Address" },
-            { "sTitle": "Status" },
+            { "sTitle": "Record Checker" },
         ]
     } );
 }
@@ -41,7 +49,7 @@
     for (rowkey in dnsdemux) {
         row = dnsdemux[rowkey];
 
-        actualEntries.push( [row.name, staleCheck(row, "watcher.HPC-hb.time", "watcher.HPC-hb.msg")] );
+        actualEntries.push( [row.name, staleCheck(row, "watcher.HPC-hb.time", "watcher.HPC-hb.msg", SC_HPC_PROBE), staleCheck(row, "watcher.HPC-fetch.time", "watcher.HPC-fetch.msg", SC_HPC_FETCH) ] );
     }
     console.log(actualEntries);
     oTable = $('#dynamic_hpc').dataTable( {
@@ -52,7 +60,8 @@
         "bPaginate": false,
         "aoColumns": [
             { "sTitle": "Node", },
-            { "sTitle": "Status" },
+            { "sTitle": "Prober" },
+            { "sTitle": "Fetcher" },
         ]
     } );
 }
diff --git a/xos/hpc_observer/hpc_watcher.py b/xos/hpc_observer/hpc_watcher.py
index 698118d..c6224a6 100644
--- a/xos/hpc_observer/hpc_watcher.py
+++ b/xos/hpc_observer/hpc_watcher.py
@@ -1,3 +1,33 @@
+"""
+    hpc_watcher.py
+
+    Daemon to watch the health of HPC and RR slivers.
+
+    This deamon uses HpcHealthCheck objects in the Data Model to conduct
+    periodic tests of HPC and RR nodes. Two types of Health Checks are
+    supported:
+
+       kind="dns": checks the request routers to make sure that a DNS
+         name is resolveable and returns the right kind of records.
+
+         resource_name should be set to the domain name to lookup.
+
+       kind="http": checks the hpc nodes to make sure that a URL can be
+         retrieved from the node.
+
+         resource_name should be set to the HostName:Url to fetch. For
+         example, cdn-stream.htm.fiu.edu:/hft2441/intro.mp4
+
+     In addition to the above, HPC heartbeat probes are conducted, similar to
+     the ones that dnsredir conducts.
+
+     The results of health checks are stored in a tag attached to the Sliver
+     the healthcheck was conducted against. If all healthchecks of a particular
+     variety were successful for a sliver, then "success" will be stored in
+     the tag. Otherwise, the first healthcheck to fail will be stored in the
+     tag.
+"""
+
 import os
 import sys
 sys.path.append("/opt/xos")
@@ -163,7 +193,7 @@
                 c.close()
 
             if response_code != 200:
-                job["status"] = "error response %d" % c.getinfo(c.RESPONSE_CODE)
+                job["status"] = "error response %d" % response_code
                 return
 
         except Exception, e:
@@ -172,15 +202,82 @@
 
         job["status"] = "success"
 
-class HpcWatcher:
-    def __init__(self):
-        self.resolver_queue = WorkQueue()
-        for i in range(0,10):
-            DnsResolver(queue = self.resolver_queue)
+class HpcFetchUrl(Thread):
+    def __init__(self, queue):
+        Thread.__init__(self)
+        self.queue = queue
+        self.daemon = True
+        self.start()
 
-        self.heartbeat_queue = WorkQueue()
-        for i in range(0, 10):
-            HpcHeartbeat(queue = self.heartbeat_queue)
+    def run(self):
+        while True:
+            job = self.queue.get_job()
+            self.handle_job(job)
+            self.queue.submit_result(job)
+
+    def curl_error_message(self, e):
+        if e.args[0] == 6:
+            return "couldn't resolve host"
+        if e.args[0] == 7:
+            return "failed to connect"
+        return "curl error %d" % e.args[0]
+
+    def handle_job(self, job):
+        server = job["server"]
+        port = job["port"]
+        url = job["url"]
+        domain = job["domain"]
+
+        def progress(download_t, download_d, upload_t, upload_d):
+            # limit download size to a megabyte
+            if (download_d > 1024*1024):
+                return 1
+            else:
+                return 0
+
+        try:
+            buffer = StringIO()
+            c = pycurl.Curl()
+
+            c.setopt(c.URL, "http://%s:%s/%s" % (server, port, url))
+            c.setopt(c.WRITEDATA, buffer)
+            c.setopt(c.HTTPHEADER, ['host: ' + domain])
+            c.setopt(c.TIMEOUT, 10)
+            c.setopt(c.CONNECTTIMEOUT, 10)
+            c.setopt(c.NOSIGNAL, 1)
+            c.setopt(c.NOPROGRESS, 0)
+            c.setopt(c.PROGRESSFUNCTION, progress)
+
+            try:
+                try:
+                    c.perform()
+                except Exception, e:
+                    # prevent callback abort from raising exception
+                    if (e.args[0] != pycurl.E_ABORTED_BY_CALLBACK):
+                        raise
+                response_code = c.getinfo(c.RESPONSE_CODE)
+                bytes_downloaded = int(c.getinfo(c.SIZE_DOWNLOAD))
+            except Exception, e:
+                #traceback.print_exc()
+                job["status"] = self.curl_error_message(e)
+                return
+            finally:
+                c.close()
+
+            if response_code != 200:
+                job["status"] = "error response %s" %  str(response_code)
+                return
+
+        except Exception, e:
+            job["status"] = "Exception: %s" % str(e)
+            return
+
+        job["status"] = "success"
+
+class BaseWatcher(Thread):
+    def __init__(self):
+        Thread.__init__(self)
+        self.daemon = True
 
     def set_status(self, sliver, service, kind, msg):
         #print sliver.node.name, kind, msg
@@ -205,6 +302,23 @@
         else:
             Tag(service=service, name=kind+".time", content_object = sliver, value=str(time.time())).save()
 
+    def get_service_slices(self, service, kind):
+        try:
+            slices = service.slices.all()
+        except:
+            # buggy data model
+            slices = service.service.all()
+
+        return [x for x in slices if (kind in x.name)]
+
+class RRWatcher(BaseWatcher):
+    def __init__(self):
+        BaseWatcher.__init__(self)
+
+        self.resolver_queue = WorkQueue()
+        for i in range(0,10):
+            DnsResolver(queue = self.resolver_queue)
+
     def check_request_routers(self, service, slivers):
         for sliver in slivers:
             sliver.has_error = False
@@ -231,6 +345,28 @@
             if not sliver.has_error:
                 self.set_status(sliver, service, "watcher.DNS", "success")
 
+    def run_once(self):
+        for hpcService in HpcService.objects.all():
+            for slice in self.get_service_slices(hpcService, "dnsdemux"):
+                self.check_request_routers(hpcService, slice.slivers.all())
+
+        for rrService in RequestRouterService.objects.all():
+            for slice in self.get_service_slices(rrService, "dnsdemux"):
+                self.check_request_routers(rrService, slice.slivers.all())
+
+    def run(self):
+        while True:
+            self.run_once()
+            time.sleep(10)
+
+class HpcProber(BaseWatcher):
+    def __init__(self):
+        BaseWatcher.__init__(self)
+
+        self.heartbeat_queue = WorkQueue()
+        for i in range(0, 10):
+            HpcHeartbeat(queue = self.heartbeat_queue)
+
     def probe_hpc(self, service, slivers):
         for sliver in slivers:
             sliver.has_error = False
@@ -247,36 +383,72 @@
             if not sliver.has_error:
                 self.set_status(sliver, service, "watcher.HPC-hb", "success")
 
-    def get_service_slices(self, service, kind):
-        try:
-            slices = service.slices.all()
-        except:
-            # buggy data model
-            slices = service.service.all()
-
-        return [x for x in slices if (kind in x.name)]
-
     def run_once(self):
         for hpcService in HpcService.objects.all():
-            for slice in self.get_service_slices(hpcService, "dnsdemux"):
-                self.check_request_routers(hpcService, slice.slivers.all())
-
-        for rrService in RequestRouterService.objects.all():
-            for slice in self.get_service_slices(rrService, "dnsdemux"):
-                self.check_request_routers(rrService, slice.slivers.all())
-
-        for hpcService in HpcService.objects.all():
             for slice in self.get_service_slices(hpcService, "hpc"):
                 self.probe_hpc(hpcService, slice.slivers.all())
 
-    def run_loop(self):
+    def run(self):
+        while True:
+            self.run_once()
+            time.sleep(10)
+
+class HpcFetcher(BaseWatcher):
+    def __init__(self):
+        BaseWatcher.__init__(self)
+
+        self.fetch_queue = WorkQueue()
+        for i in range(0, 10):
+            HpcFetchUrl(queue = self.fetch_queue)
+
+    def fetch_hpc(self, service, slivers):
+        for sliver in slivers:
+            sliver.has_error = False
+
+            checks = HpcHealthCheck.objects.filter(kind="http")
+            if not checks:
+                self.set_status(sliver, service, "watcher.HPC-fetch", "no HTTP HealthCheck tests configured")
+
+            for check in checks:
+                if (not check.resource_name) or (":" not in check.resource_name):
+                    self.set_status(sliver, service, "watcher.HPC-fetch", "malformed resource_name: " + str(check.resource_name))
+                    break
+
+                (domain, url) = check.resource_name.split(":",1)
+
+                self.fetch_queue.submit_job({"server": sliver.node.name, "port": 80, "sliver": sliver, "domain": domain, "url": url})
+
+        while self.fetch_queue.outstanding > 0:
+            result = self.fetch_queue.get_result()
+            sliver = result["sliver"]
+            if (result["status"]!="success") and (not sliver.has_error):
+                self.set_status(sliver, service, "watcher.HPC-fetch", result["status"])
+
+        for sliver in slivers:
+            if not sliver.has_error:
+                self.set_status(sliver, service, "watcher.HPC-fetch", "success")
+
+    def run_once(self):
+        for hpcService in HpcService.objects.all():
+            for slice in self.get_service_slices(hpcService, "hpc"):
+                self.fetch_hpc(hpcService, slice.slivers.all())
+
+    def run(self):
         while True:
             self.run_once()
             time.sleep(10)
 
 if __name__ == "__main__":
     if "--once" in sys.argv:
-        HpcWatcher().run_once()
+        RRWatcher().run_once()
+        HpcProber().run_once()
+        HpcFetcher().run_once()
     else:
-        HpcWatcher().run_loop()
+        RRWatcher().start()
+        HpcProber().start()
+        HpcFetcher().start()
+
+        print "Running forever..."
+        while True:
+            time.sleep(60)