hpc_watcher: add HTTP fetch health checks, split the watcher into RRWatcher, HpcProber, and HpcFetcher, and show fetch status in the HPC view
diff --git a/xos/core/xoslib/methods/hpcview.py b/xos/core/xoslib/methods/hpcview.py
index ba8a348..850e29e 100644
--- a/xos/core/xoslib/methods/hpcview.py
+++ b/xos/core/xoslib/methods/hpcview.py
@@ -73,6 +73,8 @@
hpc.append( {"name": sliver.node.name,
"watcher.HPC-hb.msg": lookup_tag(hpc_service, sliver, "watcher.HPC-hb.msg"),
"watcher.HPC-hb.time": lookup_time(hpc_service, sliver, "watcher.HPC-hb.time"),
+ "watcher.HPC-fetch.msg": lookup_tag(hpc_service, sliver, "watcher.HPC-fetch.msg"),
+ "watcher.HPC-fetch.time": lookup_time(hpc_service, sliver, "watcher.HPC-fetch.time"),
})
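
For context, a row emitted by the view above (and consumed by xosHpc.js below) would look roughly like the sketch that follows. The values are illustrative only, and the reading of the *.time fields as "seconds since the tag was last written" is an assumption inferred from the staleCheck() thresholds, not something stated in this diff:

    # hypothetical example row; field values are illustrative only
    row = {"name": "node1.example.org",        # sliver's node name
           "watcher.HPC-hb.msg": "success",    # last heartbeat probe result
           "watcher.HPC-hb.time": "12",        # assumed: seconds since last heartbeat update
           "watcher.HPC-fetch.msg": "success", # last HTTP fetch result
           "watcher.HPC-fetch.time": "45"}     # assumed: seconds since last fetch update
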
diff --git a/xos/core/xoslib/static/js/xosHpc.js b/xos/core/xoslib/static/js/xosHpc.js
index 1d8adf0..15dc404 100644
--- a/xos/core/xoslib/static/js/xosHpc.js
+++ b/xos/core/xoslib/static/js/xosHpc.js
@@ -1,8 +1,16 @@
-function staleCheck(row, time_key, msg_key) {
- if (parseInt(row[time_key])>30) {
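+// staleness thresholds, in seconds, passed to staleCheck() below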
+SC_RR = 60;
+SC_HPC_PROBE = 600;
+SC_HPC_FETCH = 3600;
+
+function staleCheck(row, time_key, msg_key, seconds) {
+ if (parseInt(row[time_key])>seconds) {
return "stale";
} else {
- return row[msg_key];
+ if (! row[msg_key]) {
+ return "null";
+ } else {
+ return row[msg_key];
+ }
}
}
@@ -15,7 +23,7 @@
for (rowkey in dnsdemux) {
row = dnsdemux[rowkey];
- actualEntries.push( [row.name, row.ip, staleCheck(row, "watcher.DNS.time", "watcher.DNS.msg")] );
+ actualEntries.push( [row.name, row.ip, staleCheck(row, "watcher.DNS.time", "watcher.DNS.msg", SC_RR)] );
}
console.log(actualEntries);
oTable = $('#dynamic_dnsdemux').dataTable( {
@@ -27,7 +35,7 @@
"aoColumns": [
{ "sTitle": "Node" },
{ "sTitle": "IP Address" },
- { "sTitle": "Status" },
+ { "sTitle": "Record Checker" },
]
} );
}
@@ -41,7 +49,7 @@
for (rowkey in dnsdemux) {
row = dnsdemux[rowkey];
- actualEntries.push( [row.name, staleCheck(row, "watcher.HPC-hb.time", "watcher.HPC-hb.msg")] );
+ actualEntries.push( [row.name, staleCheck(row, "watcher.HPC-hb.time", "watcher.HPC-hb.msg", SC_HPC_PROBE), staleCheck(row, "watcher.HPC-fetch.time", "watcher.HPC-fetch.msg", SC_HPC_FETCH) ] );
}
console.log(actualEntries);
oTable = $('#dynamic_hpc').dataTable( {
@@ -52,7 +60,8 @@
"bPaginate": false,
"aoColumns": [
{ "sTitle": "Node", },
- { "sTitle": "Status" },
+ { "sTitle": "Prober" },
+ { "sTitle": "Fetcher" },
]
} );
}
diff --git a/xos/hpc_observer/hpc_watcher.py b/xos/hpc_observer/hpc_watcher.py
index 698118d..c6224a6 100644
--- a/xos/hpc_observer/hpc_watcher.py
+++ b/xos/hpc_observer/hpc_watcher.py
@@ -1,3 +1,33 @@
+"""
+ hpc_watcher.py
+
+ Daemon to watch the health of HPC and RR slivers.
+
+ This daemon uses HpcHealthCheck objects in the Data Model to conduct
+ periodic tests of HPC and RR nodes. Two types of Health Checks are
+ supported:
+
+ kind="dns": checks the request routers to make sure that a DNS
+ name is resolvable and returns the right kind of records.
+
+ resource_name should be set to the domain name to look up.
+
+ kind="http": checks the hpc nodes to make sure that a URL can be
+ retrieved from the node.
+
+ resource_name should be set to the HostName:Url to fetch. For
+ example, cdn-stream.htm.fiu.edu:/hft2441/intro.mp4
+
+ In addition to the above, HPC heartbeat probes are conducted, similar to
+ the ones that dnsredir conducts.
+
+ The results of health checks are stored in a tag attached to the Sliver
+ the healthcheck was conducted against. If all healthchecks of a particular
+ variety were successful for a sliver, then "success" will be stored in
+ the tag. Otherwise, the error message from the first healthcheck to fail
+ will be stored in the tag.
+"""
+
import os
import sys
sys.path.append("/opt/xos")
@@ -163,7 +193,7 @@
c.close()
if response_code != 200:
- job["status"] = "error response %d" % c.getinfo(c.RESPONSE_CODE)
+ job["status"] = "error response %d" % response_code
return
except Exception, e:
@@ -172,15 +202,82 @@
job["status"] = "success"
-class HpcWatcher:
- def __init__(self):
- self.resolver_queue = WorkQueue()
- for i in range(0,10):
- DnsResolver(queue = self.resolver_queue)
+class HpcFetchUrl(Thread):
+ def __init__(self, queue):
+ Thread.__init__(self)
+ self.queue = queue
+ self.daemon = True
+ self.start()
- self.heartbeat_queue = WorkQueue()
- for i in range(0, 10):
- HpcHeartbeat(queue = self.heartbeat_queue)
+ def run(self):
+ while True:
+ job = self.queue.get_job()
+ self.handle_job(job)
+ self.queue.submit_result(job)
+
+ def curl_error_message(self, e):
+ if e.args[0] == 6:
+ return "couldn't resolve host"
+ if e.args[0] == 7:
+ return "failed to connect"
+ return "curl error %d" % e.args[0]
+
+ def handle_job(self, job):
+ server = job["server"]
+ port = job["port"]
+ url = job["url"]
+ domain = job["domain"]
+
+ def progress(download_t, download_d, upload_t, upload_d):
+ # limit download size to a megabyte
+ if (download_d > 1024*1024):
+ return 1
+ else:
+ return 0
+
+ try:
+ buffer = StringIO()
+ c = pycurl.Curl()
+
+ c.setopt(c.URL, "http://%s:%s/%s" % (server, port, url))
+ c.setopt(c.WRITEDATA, buffer)
+ c.setopt(c.HTTPHEADER, ['host: ' + domain])
+ c.setopt(c.TIMEOUT, 10)
+ c.setopt(c.CONNECTTIMEOUT, 10)
+ c.setopt(c.NOSIGNAL, 1)
+ c.setopt(c.NOPROGRESS, 0)
+ c.setopt(c.PROGRESSFUNCTION, progress)
+
+ try:
+ try:
+ c.perform()
+ except Exception, e:
+ # prevent callback abort from raising exception
+ if (e.args[0] != pycurl.E_ABORTED_BY_CALLBACK):
+ raise
+ response_code = c.getinfo(c.RESPONSE_CODE)
+ bytes_downloaded = int(c.getinfo(c.SIZE_DOWNLOAD))
+ except Exception, e:
+ #traceback.print_exc()
+ job["status"] = self.curl_error_message(e)
+ return
+ finally:
+ c.close()
+
+ if response_code != 200:
+ job["status"] = "error response %s" % str(response_code)
+ return
+
+ except Exception, e:
+ job["status"] = "Exception: %s" % str(e)
+ return
+
+ job["status"] = "success"
+
+class BaseWatcher(Thread):
+ def __init__(self):
+ Thread.__init__(self)
+ self.daemon = True
def set_status(self, sliver, service, kind, msg):
#print sliver.node.name, kind, msg
@@ -205,6 +302,23 @@
else:
Tag(service=service, name=kind+".time", content_object = sliver, value=str(time.time())).save()
+ def get_service_slices(self, service, kind):
+ try:
+ slices = service.slices.all()
+ except:
+ # buggy data model
+ slices = service.service.all()
+
+ return [x for x in slices if (kind in x.name)]
+
+class RRWatcher(BaseWatcher):
+ def __init__(self):
+ BaseWatcher.__init__(self)
+
+ self.resolver_queue = WorkQueue()
+ for i in range(0,10):
+ DnsResolver(queue = self.resolver_queue)
+
def check_request_routers(self, service, slivers):
for sliver in slivers:
sliver.has_error = False
@@ -231,6 +345,28 @@
if not sliver.has_error:
self.set_status(sliver, service, "watcher.DNS", "success")
+ def run_once(self):
+ for hpcService in HpcService.objects.all():
+ for slice in self.get_service_slices(hpcService, "dnsdemux"):
+ self.check_request_routers(hpcService, slice.slivers.all())
+
+ for rrService in RequestRouterService.objects.all():
+ for slice in self.get_service_slices(rrService, "dnsdemux"):
+ self.check_request_routers(rrService, slice.slivers.all())
+
+ def run(self):
+ while True:
+ self.run_once()
+ time.sleep(10)
+
+class HpcProber(BaseWatcher):
+ def __init__(self):
+ BaseWatcher.__init__(self)
+
+ self.heartbeat_queue = WorkQueue()
+ for i in range(0, 10):
+ HpcHeartbeat(queue = self.heartbeat_queue)
+
def probe_hpc(self, service, slivers):
for sliver in slivers:
sliver.has_error = False
@@ -247,36 +383,72 @@
if not sliver.has_error:
self.set_status(sliver, service, "watcher.HPC-hb", "success")
- def get_service_slices(self, service, kind):
- try:
- slices = service.slices.all()
- except:
- # buggy data model
- slices = service.service.all()
-
- return [x for x in slices if (kind in x.name)]
-
def run_once(self):
for hpcService in HpcService.objects.all():
- for slice in self.get_service_slices(hpcService, "dnsdemux"):
- self.check_request_routers(hpcService, slice.slivers.all())
-
- for rrService in RequestRouterService.objects.all():
- for slice in self.get_service_slices(rrService, "dnsdemux"):
- self.check_request_routers(rrService, slice.slivers.all())
-
- for hpcService in HpcService.objects.all():
for slice in self.get_service_slices(hpcService, "hpc"):
self.probe_hpc(hpcService, slice.slivers.all())
- def run_loop(self):
+ def run(self):
+ while True:
+ self.run_once()
+ time.sleep(10)
+
+class HpcFetcher(BaseWatcher):
+ def __init__(self):
+ BaseWatcher.__init__(self)
+
+ self.fetch_queue = WorkQueue()
+ for i in range(0, 10):
+ HpcFetchUrl(queue = self.fetch_queue)
+
+ def fetch_hpc(self, service, slivers):
+ for sliver in slivers:
+ sliver.has_error = False
+
+ checks = HpcHealthCheck.objects.filter(kind="http")
+ if not checks:
+ self.set_status(sliver, service, "watcher.HPC-fetch", "no HTTP HealthCheck tests configured")
+
+ for check in checks:
+ if (not check.resource_name) or (":" not in check.resource_name):
+ self.set_status(sliver, service, "watcher.HPC-fetch", "malformed resource_name: " + str(check.resource_name))
+ break
+
+ (domain, url) = check.resource_name.split(":",1)
+
+ self.fetch_queue.submit_job({"server": sliver.node.name, "port": 80, "sliver": sliver, "domain": domain, "url": url})
+
+ while self.fetch_queue.outstanding > 0:
+ result = self.fetch_queue.get_result()
+ sliver = result["sliver"]
+ if (result["status"]!="success") and (not sliver.has_error):
+ self.set_status(sliver, service, "watcher.HPC-fetch", result["status"])
+
+ for sliver in slivers:
+ if not sliver.has_error:
+ self.set_status(sliver, service, "watcher.HPC-fetch", "success")
+
+ def run_once(self):
+ for hpcService in HpcService.objects.all():
+ for slice in self.get_service_slices(hpcService, "hpc"):
+ self.fetch_hpc(hpcService, slice.slivers.all())
+
+ def run(self):
while True:
self.run_once()
time.sleep(10)
if __name__ == "__main__":
if "--once" in sys.argv:
- HpcWatcher().run_once()
+ RRWatcher().run_once()
+ HpcProber().run_once()
+ HpcFetcher().run_once()
else:
- HpcWatcher().run_loop()
+ RRWatcher().start()
+ HpcProber().start()
+ HpcFetcher().start()
+
+ print "Running forever..."
+ while True:
+ time.sleep(60)
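
For reference, a minimal sketch of how the health checks described in the module docstring might be configured. The import path is an assumption (the diff only shows HpcHealthCheck being queried, not where the model is defined), and the dns domain below is a placeholder:

    # hypothetical configuration sketch -- import path is an assumption
    from core.models import HpcHealthCheck

    # kind="dns": RRWatcher checks that this name resolves on each dnsdemux sliver
    HpcHealthCheck(kind="dns", resource_name="example.com").save()  # placeholder domain

    # kind="http": HpcFetcher retrieves HostName:Url from each hpc sliver
    # (resource_name example taken from the docstring)
    HpcHealthCheck(kind="http",
                   resource_name="cdn-stream.htm.fiu.edu:/hft2441/intro.mp4").save()
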