blob: c6224a60f74f6148017d076245a708e39f655a09 [file] [log] [blame]
Scott Bakere98a1332015-03-21 15:04:30 -07001"""
2 hpc_watcher.py
3
4 Daemon to watch the health of HPC and RR slivers.
5
6 This daemon uses HpcHealthCheck objects in the Data Model to conduct
7 periodic tests of HPC and RR nodes. Two types of Health Checks are
8 supported:
9
10 kind="dns": checks the request routers to make sure that a DNS
11 name is resolvable and returns the right kind of records.
12
13 resource_name should be set to the domain name to lookup.
14
15 kind="http": checks the hpc nodes to make sure that a URL can be
16 retrieved from the node.
17
18 resource_name should be set to the HostName:Url to fetch. For
19 example, cdn-stream.htm.fiu.edu:/hft2441/intro.mp4
20
21 In addition to the above, HPC heartbeat probes are conducted, similar to
22 the ones that dnsredir conducts.
23
24 The results of health checks are stored in a tag attached to the Sliver
25 the healthcheck was conducted against. If all healthchecks of a particular
26 variety were successful for a sliver, then "success" will be stored in
27 the tag. Otherwise, the first healthcheck to fail will be stored in the
28 tag.
29"""
30
Scott Baker43e9f2c2015-03-16 16:43:51 -070031import os
32import sys
33sys.path.append("/opt/xos")
34os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
35import django
36from django.contrib.contenttypes.models import ContentType
37from core.models import *
38from hpc.models import *
39from requestrouter.models import *
40django.setup()
41import time
Scott Baker42f80bc2015-03-16 22:52:13 -070042import pycurl
43import traceback
44from StringIO import StringIO
Scott Baker43e9f2c2015-03-16 16:43:51 -070045
46from dnslib.dns import DNSRecord,DNSHeader,DNSQuestion,QTYPE
47from dnslib.digparser import DigParser
48
49from threading import Thread, Condition
50
Scott Baker6d1eb3e2015-03-18 23:21:04 -070051"""
52from dnslib import *
53q = DNSRecord(q=DNSQuestion("cdn-stream.htm.fiu.edu"))
54a_pkt = q.send("150.135.65.10", tcp=False, timeout=10)
55a = DNSRecord.parse(a_pkt)
56
57from dnslib import *
58q = DNSRecord(q=DNSQuestion("onlab.vicci.org"))
59a_pkt = q.send("150.135.65.10", tcp=False, timeout=10)
60a = DNSRecord.parse(a_pkt)
61"""
62
class WorkQueue:
    """Job/result exchange between one watcher thread and its worker threads.

    The owning watcher hands jobs to workers with ``submit_job``; workers
    block in ``get_job``, process the job, and hand it back with
    ``submit_result``; the watcher collects finished jobs with ``get_result``.

    ``outstanding`` counts jobs submitted but not yet collected. It is not
    lock-protected: it is only meant to be updated by the single thread that
    calls ``submit_job``/``get_result``.

    Both lists are used as stacks (``list.pop()``), so jobs and results are
    handed out most-recent-first.
    """

    def __init__(self):
        self.job_cv = Condition()
        self.jobs = []
        self.result_cv = Condition()
        self.results = []
        self.outstanding = 0

    def get_job(self):
        """Block until a job is queued, then remove and return it."""
        # ``with`` guarantees the lock is released even if wait() raises.
        with self.job_cv:
            while not self.jobs:
                self.job_cv.wait()
            return self.jobs.pop()

    def submit_job(self, job):
        """Queue *job* for a worker thread and count it as outstanding."""
        with self.job_cv:
            self.jobs.append(job)
            self.job_cv.notify()
        self.outstanding = self.outstanding + 1

    def get_result(self):
        """Block until a finished job is available, then remove and return it."""
        with self.result_cv:
            while not self.results:
                self.result_cv.wait()
            result = self.results.pop()
        self.outstanding = self.outstanding - 1
        return result

    def submit_result(self, result):
        """Hand a finished job back to the owning watcher."""
        with self.result_cv:
            self.results.append(result)
            self.result_cv.notify()
100
class DnsResolver(Thread):
    """Worker thread that runs DNS health-check jobs taken from a WorkQueue.

    The thread is a daemon and starts itself on construction. Each job is a
    dict with keys ``domain``, ``server``, ``port`` and optionally
    ``result_contains``; the outcome is written into ``job["status"]`` and
    the job is returned through ``queue.submit_result``.
    """

    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue
        self.daemon = True
        self.start()

    def run(self):
        while True:
            job = self.queue.get_job()
            self.handle_job(job)
            self.queue.submit_result(job)

    def handle_job(self, job):
        """Resolve ``job["domain"]`` against ``job["server"]`` and record the outcome.

        Without ``result_contains`` the check passes when the answer section
        holds at least one A or CNAME record. With it, the check passes when,
        for some answer record, the record-type name followed by its rdata
        (e.g. "A1.2.3.4") contains ``result_contains`` as a substring.

        Sets ``job["status"]`` to "success", or to "<domain>,<reason>" on failure.
        """
        domain = job["domain"]
        server = job["server"]
        port = job["port"]
        result_contains = job.get("result_contains", None)

        try:
            q = DNSRecord(q=DNSQuestion(domain))

            a_pkt = q.send(server, port, tcp=False, timeout=10)
            a = DNSRecord.parse(a_pkt)

            found_record = False
            for record in a.rr:
                if not result_contains:
                    # QTYPE is a dnslib Bimap: attribute access yields the
                    # numeric type code that record.rtype is compared against.
                    if record.rtype in (QTYPE.A, QTYPE.CNAME):
                        found_record = True
                        break
                else:
                    tmp = QTYPE.get(record.rtype) + str(record.rdata)
                    if result_contains in tmp:
                        found_record = True
                        break

            if not found_record:
                if result_contains:
                    job["status"] = "%s,No %s records" % (domain, result_contains)
                else:
                    job["status"] = "%s,No A or CNAME records" % domain

                return

        except Exception as e:
            job["status"] = "%s,Exception: %s" % (domain, str(e))
            return

        job["status"] = "success"
149
class HpcHeartbeat(Thread):
    """Worker thread that probes HPC nodes' heartbeat URL from a WorkQueue.

    Each job dict supplies ``server`` and ``port``; the outcome is written
    into ``job["status"]`` and the job is returned through
    ``queue.submit_result``. The thread is a daemon and starts itself on
    construction.
    """

    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue
        self.daemon = True
        self.start()

    def run(self):
        while True:
            job = self.queue.get_job()
            self.handle_job(job)
            self.queue.submit_result(job)

    def curl_error_message(self, e):
        """Turn an exception from a curl transfer into a short status string.

        pycurl errors carry the libcurl error code as ``e.args[0]``. Codes 6
        and 7 get descriptive text, other integer codes become
        "curl error <code>", and exceptions without an integer first
        argument are reported as "Exception: <str(e)>".
        """
        code = e.args[0] if e.args else None
        if code == 6:
            return "couldn't resolve host"
        if code == 7:
            return "failed to connect"
        if isinstance(code, int):
            return "curl error %d" % code
        return "Exception: %s" % str(e)

    def handle_job(self, job):
        """GET http://<server>:<port>/heartbeat and record the outcome.

        Sets ``job["status"]`` to "success" on HTTP 200, otherwise to a short
        description of the curl failure or the unexpected response code.
        """
        server = job["server"]
        port = job["port"]

        try:
            body = StringIO()
            c = pycurl.Curl()
            try:
                c.setopt(c.URL, "http://%s:%s/heartbeat" % (server, port))
                c.setopt(c.WRITEDATA, body)
                c.setopt(c.HTTPHEADER, ['host: hpc-heartbeat', 'X-heartbeat: 1'])
                c.setopt(c.TIMEOUT, 10)
                c.setopt(c.CONNECTTIMEOUT, 10)
                # libcurl recommends NOSIGNAL when used from multiple threads.
                c.setopt(c.NOSIGNAL, 1)

                try:
                    c.perform()
                    response_code = c.getinfo(c.RESPONSE_CODE)
                except Exception as e:
                    job["status"] = self.curl_error_message(e)
                    return
            finally:
                # Release the handle on every path, including setopt failures.
                c.close()

            if response_code != 200:
                job["status"] = "error response %d" % response_code
                return

        except Exception as e:
            job["status"] = "Exception: %s" % str(e)
            return

        job["status"] = "success"
204
class HpcFetchUrl(Thread):
    """Worker thread that fetches a URL from HPC nodes, fed by a WorkQueue.

    Each job dict supplies ``server``, ``port``, ``url`` and ``domain``; the
    request goes to ``server`` with a ``Host: <domain>`` header and the
    outcome is written into ``job["status"]``. The thread is a daemon and
    starts itself on construction.
    """

    # The progress callback aborts the transfer once more than this many
    # bytes have been downloaded; an aborted transfer with HTTP 200 still
    # counts as success.
    max_download_bytes = 1024 * 1024

    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue
        self.daemon = True
        self.start()

    def run(self):
        while True:
            job = self.queue.get_job()
            self.handle_job(job)
            self.queue.submit_result(job)

    def curl_error_message(self, e):
        """Turn an exception from a curl transfer into a short status string.

        pycurl errors carry the libcurl error code as ``e.args[0]``. Codes 6
        and 7 get descriptive text, other integer codes become
        "curl error <code>", and exceptions without an integer first
        argument are reported as "Exception: <str(e)>".
        """
        code = e.args[0] if e.args else None
        if code == 6:
            return "couldn't resolve host"
        if code == 7:
            return "failed to connect"
        if isinstance(code, int):
            return "curl error %d" % code
        return "Exception: %s" % str(e)

    def _build_url(self, server, port, url):
        """Return the http URL for path *url* on *server*:*port*.

        Leading slashes on *url* are dropped so that the documented
        resource_name form "host:/dir/file" does not produce
        "http://server:port//dir/file".
        """
        return "http://%s:%s/%s" % (server, port, url.lstrip("/"))

    def handle_job(self, job):
        """Fetch ``job["url"]`` from ``job["server"]`` and record the outcome.

        Sets ``job["status"]`` to "success" on HTTP 200, otherwise to a short
        description of the curl failure or the unexpected response code.
        """
        server = job["server"]
        port = job["port"]
        url = job["url"]
        domain = job["domain"]
        limit = self.max_download_bytes

        def progress(download_t, download_d, upload_t, upload_d):
            # Returning non-zero tells libcurl to abort the transfer.
            if download_d > limit:
                return 1
            return 0

        try:
            body = StringIO()
            c = pycurl.Curl()
            try:
                c.setopt(c.URL, self._build_url(server, port, url))
                c.setopt(c.WRITEDATA, body)
                c.setopt(c.HTTPHEADER, ['host: ' + domain])
                c.setopt(c.TIMEOUT, 10)
                c.setopt(c.CONNECTTIMEOUT, 10)
                c.setopt(c.NOSIGNAL, 1)
                # NOPROGRESS must be off for PROGRESSFUNCTION to be called.
                c.setopt(c.NOPROGRESS, 0)
                c.setopt(c.PROGRESSFUNCTION, progress)

                try:
                    try:
                        c.perform()
                    except Exception as e:
                        # An abort from our own progress callback is expected.
                        if not e.args or e.args[0] != pycurl.E_ABORTED_BY_CALLBACK:
                            raise
                    response_code = c.getinfo(c.RESPONSE_CODE)
                except Exception as e:
                    job["status"] = self.curl_error_message(e)
                    return
            finally:
                # Release the handle on every path, including setopt failures.
                c.close()

            if response_code != 200:
                job["status"] = "error response %s" % str(response_code)
                return

        except Exception as e:
            job["status"] = "Exception: %s" % str(e)
            return

        job["status"] = "success"
276
class BaseWatcher(Thread):
    """Common base for the watcher threads: result tagging and slice lookup.

    Instances are daemon threads; subclasses supply the actual checks.
    """

    def __init__(self):
        Thread.__init__(self)
        self.daemon = True

    def set_status(self, sliver, service, kind, msg):
        """Record a health-check result for *sliver* as Tags owned by *service*.

        Two tags are maintained: ``<kind>.msg`` holds *msg* (saved only when
        the value changed) and ``<kind>.time`` holds the current Unix time
        (always refreshed). Also sets ``sliver.has_error`` to True unless
        *msg* is "success", so callers can tell a failure was already recorded.
        """
        sliver.has_error = (msg != "success")

        sliver_type = ContentType.objects.get_for_model(sliver)

        self._save_tag(service, sliver, sliver_type, kind + ".msg", msg,
                       only_if_changed=True)
        self._save_tag(service, sliver, sliver_type, kind + ".time", str(time.time()),
                       only_if_changed=False)

    def _save_tag(self, service, sliver, sliver_type, name, value, only_if_changed):
        """Create or update the Tag *name* attached to *sliver* under *service*."""
        existing = Tag.objects.filter(service=service, name=name,
                                      content_type__pk=sliver_type.id,
                                      object_id=sliver.id)
        if existing:
            tag = existing[0]
            if (not only_if_changed) or (tag.value != value):
                tag.value = value
                tag.save()
        else:
            Tag(service=service, name=name, content_object=sliver, value=value).save()

    def get_service_slices(self, service, kind):
        """Return the slices of *service* whose name contains *kind*."""
        try:
            slices = service.slices.all()
        except Exception:
            # buggy data model: some services expose their slices as
            # ``service`` instead of ``slices``. Catch Exception (not a bare
            # except) so KeyboardInterrupt/SystemExit still propagate.
            slices = service.service.all()

        return [x for x in slices if (kind in x.name)]
313
class RRWatcher(BaseWatcher):
    """Watcher thread that runs the "dns" HpcHealthChecks against request routers.

    Every 10 seconds it resolves each configured domain against the public
    IP of every sliver in the "dnsdemux" slices of all HpcServices and
    RequestRouterServices, using 10 DnsResolver worker threads, and records
    the outcome under the "watcher.DNS" tag kind.
    """

    def __init__(self):
        BaseWatcher.__init__(self)

        self.resolver_queue = WorkQueue()
        for i in range(0, 10):
            DnsResolver(queue=self.resolver_queue)

    def check_request_routers(self, service, slivers):
        """Run every "dns" health check against each sliver and tag the results.

        A sliver is tagged with the first failure reported for it, or with
        "success" if all of its checks passed.
        """
        # Materialize once so has_error is set and read on the same objects.
        slivers = list(slivers)
        # The configured checks do not depend on the sliver; fetch them once.
        checks = list(HpcHealthCheck.objects.filter(kind="dns"))

        for sliver in slivers:
            sliver.has_error = False

            ip = sliver.get_public_ip()
            if not ip:
                self.set_status(sliver, service, "watcher.DNS", "no public IP")
                continue

            if not checks:
                self.set_status(sliver, service, "watcher.DNS", "no DNS HealthCheck tests configured")

            for check in checks:
                self.resolver_queue.submit_job({"domain": check.resource_name, "server": ip, "port": 53, "sliver": sliver, "result_contains": check.result_contains})

        while self.resolver_queue.outstanding > 0:
            result = self.resolver_queue.get_result()
            sliver = result["sliver"]
            # set_status marks the sliver as errored, so later failures are skipped.
            if (result["status"] != "success") and (not sliver.has_error):
                self.set_status(sliver, service, "watcher.DNS", result["status"])

        for sliver in slivers:
            if not sliver.has_error:
                self.set_status(sliver, service, "watcher.DNS", "success")

    def run_once(self):
        for hpcService in HpcService.objects.all():
            for slice in self.get_service_slices(hpcService, "dnsdemux"):
                self.check_request_routers(hpcService, slice.slivers.all())

        for rrService in RequestRouterService.objects.all():
            for slice in self.get_service_slices(rrService, "dnsdemux"):
                self.check_request_routers(rrService, slice.slivers.all())

    def run(self):
        while True:
            try:
                self.run_once()
            except Exception:
                # Keep the watcher alive across transient failures (e.g. a
                # lost DB connection); otherwise this thread would die
                # silently while the main thread keeps running.
                traceback.print_exc()
            time.sleep(10)
361
class HpcProber(BaseWatcher):
    """Watcher thread that sends heartbeat probes to HPC slivers.

    Every 10 seconds it probes port 8009 on the node of every sliver in the
    "hpc" slices of all HpcServices, using 10 HpcHeartbeat worker threads,
    and records the outcome under the "watcher.HPC-hb" tag kind.
    """

    def __init__(self):
        BaseWatcher.__init__(self)

        self.heartbeat_queue = WorkQueue()
        for i in range(0, 10):
            HpcHeartbeat(queue=self.heartbeat_queue)

    def probe_hpc(self, service, slivers):
        """Heartbeat-probe each sliver's node and tag the result on the sliver."""
        # Materialize once so has_error is set and read on the same objects.
        slivers = list(slivers)

        for sliver in slivers:
            sliver.has_error = False

            self.heartbeat_queue.submit_job({"server": sliver.node.name, "port": 8009, "sliver": sliver})

        while self.heartbeat_queue.outstanding > 0:
            result = self.heartbeat_queue.get_result()
            sliver = result["sliver"]
            # set_status marks the sliver as errored, so later failures are skipped.
            if (result["status"] != "success") and (not sliver.has_error):
                self.set_status(sliver, service, "watcher.HPC-hb", result["status"])

        for sliver in slivers:
            if not sliver.has_error:
                self.set_status(sliver, service, "watcher.HPC-hb", "success")

    def run_once(self):
        for hpcService in HpcService.objects.all():
            for slice in self.get_service_slices(hpcService, "hpc"):
                self.probe_hpc(hpcService, slice.slivers.all())

    def run(self):
        while True:
            try:
                self.run_once()
            except Exception:
                # Keep the watcher alive across transient failures instead
                # of letting the thread die silently.
                traceback.print_exc()
            time.sleep(10)
395
class HpcFetcher(BaseWatcher):
    """Watcher thread that runs the "http" HpcHealthChecks against HPC slivers.

    Every 10 seconds it fetches each configured "domain:url" resource_name
    from port 80 of the node of every sliver in the "hpc" slices of all
    HpcServices, using 10 HpcFetchUrl worker threads, and records the
    outcome under the "watcher.HPC-fetch" tag kind.
    """

    def __init__(self):
        BaseWatcher.__init__(self)

        self.fetch_queue = WorkQueue()
        for i in range(0, 10):
            HpcFetchUrl(queue=self.fetch_queue)

    def fetch_hpc(self, service, slivers):
        """Run every "http" health check against each sliver and tag the results.

        A malformed resource_name (missing or without ":") marks the sliver
        as failed and stops submitting further checks for that sliver.
        """
        # Materialize once so has_error is set and read on the same objects.
        slivers = list(slivers)
        # The configured checks do not depend on the sliver; fetch them once.
        checks = list(HpcHealthCheck.objects.filter(kind="http"))

        for sliver in slivers:
            sliver.has_error = False

            if not checks:
                self.set_status(sliver, service, "watcher.HPC-fetch", "no HTTP HealthCheck tests configured")

            for check in checks:
                if (not check.resource_name) or (":" not in check.resource_name):
                    self.set_status(sliver, service, "watcher.HPC-fetch", "malformed resource_name: " + str(check.resource_name))
                    break

                (domain, url) = check.resource_name.split(":", 1)

                self.fetch_queue.submit_job({"server": sliver.node.name, "port": 80, "sliver": sliver, "domain": domain, "url": url})

        while self.fetch_queue.outstanding > 0:
            result = self.fetch_queue.get_result()
            sliver = result["sliver"]
            # set_status marks the sliver as errored, so later failures are skipped.
            if (result["status"] != "success") and (not sliver.has_error):
                self.set_status(sliver, service, "watcher.HPC-fetch", result["status"])

        for sliver in slivers:
            if not sliver.has_error:
                self.set_status(sliver, service, "watcher.HPC-fetch", "success")

    def run_once(self):
        for hpcService in HpcService.objects.all():
            for slice in self.get_service_slices(hpcService, "hpc"):
                self.fetch_hpc(hpcService, slice.slivers.all())

    def run(self):
        while True:
            try:
                self.run_once()
            except Exception:
                # Keep the watcher alive across transient failures instead
                # of letting the thread die silently.
                traceback.print_exc()
            time.sleep(10)
Scott Baker43e9f2c2015-03-16 16:43:51 -0700440
if __name__ == "__main__":
    # Each watcher builds its own worker pool when constructed; with --once
    # every watcher runs a single synchronous pass, in this order.
    watcher_classes = (RRWatcher, HpcProber, HpcFetcher)

    if "--once" in sys.argv:
        for watcher_class in watcher_classes:
            watcher_class().run_once()
    else:
        for watcher_class in watcher_classes:
            watcher_class().start()

        print("Running forever...")
        # The watchers are daemon threads, so the main thread must stay alive.
        while True:
            time.sleep(60)
Scott Baker43e9f2c2015-03-16 16:43:51 -0700454