store watcherd state
diff --git a/xos/hpc_observer/hpc_watcher.py b/xos/hpc_observer/hpc_watcher.py
index b0587f5..15adce9 100644
--- a/xos/hpc_observer/hpc_watcher.py
+++ b/xos/hpc_observer/hpc_watcher.py
@@ -46,6 +46,7 @@
import time
import pycurl
import traceback
+import json
from StringIO import StringIO
from dnslib.dns import DNSRecord,DNSHeader,DNSQuestion,QTYPE
@@ -279,6 +280,62 @@
job["status"] = "success"
+class WatcherWorker(Thread):
+ def __init__(self, queue):
+ Thread.__init__(self)
+ self.queue = queue
+ self.daemon = True
+ self.start()
+
+ def run(self):
+ while True:
+ job = self.queue.get_job()
+ self.handle_job(job)
+ self.queue.submit_result(job)
+
+ def curl_error_message(self, e):
+ if e.args[0] == 6:
+ return "couldn't resolve host"
+ if e.args[0] == 7:
+ return "failed to connect"
+ return "curl error %d" % e.args[0]
+
+ def handle_job(self, job):
+ server = job["server"]
+ port = job["port"]
+
+ try:
+ buffer = StringIO()
+ c = pycurl.Curl()
+
+ c.setopt(c.URL, "http://%s:%s/" % (server, port))
+ c.setopt(c.WRITEDATA, buffer)
+ c.setopt(c.TIMEOUT, 10)
+ c.setopt(c.CONNECTTIMEOUT, 10)
+ c.setopt(c.NOSIGNAL, 1)
+
+ try:
+ c.perform()
+ response_code = c.getinfo(c.RESPONSE_CODE)
+ except Exception, e:
+ #traceback.print_exc()
+ job["status"] = json.dumps( {"status": self.curl_error_message(e)} )
+ return
+ finally:
+ c.close()
+
+ if response_code != 200:
+ job["status"] = json.dumps( {"status": "error response %d" % response_code} )
+ return
+
+ d = json.loads(buffer.getvalue())
+ d["status"] = "success";
+ job["status"] = json.dumps(d)
+
+ except Exception, e:
+ job["status"] = json.dumps( {"status": "Exception: %s" % str(e)} )
+ return
+
class BaseWatcher(Thread):
def __init__(self):
Thread.__init__(self)
@@ -307,14 +364,17 @@
else:
Tag(service=service, name=kind+".time", content_object = sliver, value=str(time.time())).save()
- def get_service_slices(self, service, kind):
+ def get_service_slices(self, service, kind=None):
try:
slices = service.slices.all()
except:
# buggy data model
slices = service.service.all()
- return [x for x in slices if (kind in x.name)]
+ if kind:
+ return [x for x in slices if (kind in x.name)]
+ else:
+ return list(slices)
class RRWatcher(BaseWatcher):
def __init__(self):
@@ -446,15 +506,54 @@
self.run_once()
time.sleep(10)
+class WatcherFetcher(BaseWatcher):
+ def __init__(self):
+ BaseWatcher.__init__(self)
+
+ self.fetch_queue = WorkQueue()
+ for i in range(0, 10):
+ WatcherWorker(queue = self.fetch_queue)
+
+ def fetch_watcher(self, service, slivers):
+ for sliver in slivers:
+ ip = sliver.get_public_ip()
+ if not ip:
+ ip = socket.gethostbyname(sliver.node.name)
+
+ port = 8015
+ if ("redir" in sliver.slice.name):
+ port = 8016
+ elif ("demux" in sliver.slice.name):
+ port = 8017
+
+ self.fetch_queue.submit_job({"server": ip, "port": port, "sliver": sliver})
+
+ while self.fetch_queue.outstanding > 0:
+ result = self.fetch_queue.get_result()
+ sliver = result["sliver"]
+ self.set_status(sliver, service, "watcher.watcher", result["status"])
+
+ def run_once(self):
+ for hpcService in HpcService.objects.all():
+ for slice in self.get_service_slices(hpcService):
+ self.fetch_watcher(hpcService, slice.slivers.all())
+
+ def run(self):
+ while True:
+ self.run_once()
+ time.sleep(10)
+
if __name__ == "__main__":
if "--once" in sys.argv:
RRWatcher().run_once()
HpcProber().run_once()
HpcFetcher().run_once()
+ WatcherFetcher().run_once()
else:
RRWatcher().start()
HpcProber().start()
HpcFetcher().start()
+ WatcherFetcher().start()
print "Running forever..."
while True: