blob: d2efdccedc3d3d3b6191e77d529855cf13513027 [file] [log] [blame]
Sapan Bhatia4d6cd132016-01-15 10:43:19 -05001"""
2 hpc_watcher.py
3
4 Daemon to watch the health of HPC and RR instances.
5
6 This deamon uses HpcHealthCheck objects in the Data Model to conduct
7 periodic tests of HPC and RR nodes. Two types of Health Checks are
8 supported:
9
10 kind="dns": checks the request routers to make sure that a DNS
11 name is resolveable and returns the right kind of records.
12
13 resource_name should be set to the domain name to lookup.
14
15 result_contains is option and can be used to hold "A", "CNAME", or
16 a particular address or hostname that should be contained in the
17 query's answer.
18
19 kind="http": checks the hpc nodes to make sure that a URL can be
20 retrieved from the node.
21
22 resource_name should be set to the HostName:Url to fetch. For
23 example, cdn-stream.htm.fiu.edu:/hft2441/intro.mp4
24
25 In addition to the above, HPC heartbeat probes are conducted, similar to
26 the ones that dnsredir conducts.
27
28 The results of health checks are stored in a tag attached to the Instance
29 the healthcheck was conducted against. If all healthchecks of a particular
30 variety were successful for a instance, then "success" will be stored in
31 the tag. Otherwise, the first healthcheck to fail will be stored in the
32 tag.
33
34 Ubuntu prereqs:
35 apt-get install python-pycurl
36 pip install dnslib
37"""
38
39import os
40import socket
41import sys
42sys.path.append("/opt/xos")
43os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
44import django
45from django.contrib.contenttypes.models import ContentType
46from core.models import *
47from services.hpc.models import *
48from services.requestrouter.models import *
49django.setup()
50import time
51import pycurl
52import traceback
53import json
54from StringIO import StringIO
55
56from dnslib.dns import DNSRecord,DNSHeader,DNSQuestion,QTYPE
57from dnslib.digparser import DigParser
58
59from threading import Thread, Condition
60
61"""
62from dnslib import *
63q = DNSRecord(q=DNSQuestion("cdn-stream.htm.fiu.edu"))
64a_pkt = q.send("150.135.65.10", tcp=False, timeout=10)
65a = DNSRecord.parse(a_pkt)
66
67from dnslib import *
68q = DNSRecord(q=DNSQuestion("onlab.vicci.org"))
69a_pkt = q.send("150.135.65.10", tcp=False, timeout=10)
70a = DNSRecord.parse(a_pkt)
71"""
72
73class WorkQueue:
74 def __init__(self):
75 self.job_cv = Condition()
76 self.jobs = []
77 self.result_cv = Condition()
78 self.results = []
79 self.outstanding = 0
80
81 def get_job(self):
82 self.job_cv.acquire()
83 while not self.jobs:
84 self.job_cv.wait()
85 result = self.jobs.pop()
86 self.job_cv.release()
87 return result
88
89 def submit_job(self, job):
90 self.job_cv.acquire()
91 self.jobs.append(job)
92 self.job_cv.notify()
93 self.job_cv.release()
94 self.outstanding = self.outstanding + 1
95
96 def get_result(self):
97 self.result_cv.acquire()
98 while not self.results:
99 self.result_cv.wait()
100 result = self.results.pop()
101 self.result_cv.release()
102 self.outstanding = self.outstanding - 1
103 return result
104
105 def submit_result(self, result):
106 self.result_cv.acquire()
107 self.results.append(result)
108 self.result_cv.notify()
109 self.result_cv.release()
110
111class DnsResolver(Thread):
112 def __init__(self, queue):
113 Thread.__init__(self)
114 self.queue = queue
115 self.daemon = True
116 self.start()
117
118 def run(self):
119 while True:
120 job = self.queue.get_job()
121 self.handle_job(job)
122 self.queue.submit_result(job)
123
124 def handle_job(self, job):
125 domain = job["domain"]
126 server = job["server"]
127 port = job["port"]
128 result_contains = job.get("result_contains", None)
129
130 try:
131 q = DNSRecord(q=DNSQuestion(domain)) #, getattr(QTYPE,"A")))
132
133 a_pkt = q.send(server, port, tcp=False, timeout=10)
134 a = DNSRecord.parse(a_pkt)
135
136 found_record = False
137 for record in a.rr:
138 if (not result_contains):
139 QTYPE_A = getattr(QTYPE,"A")
140 QTYPE_CNAME = getattr(QTYPE, "CNAME")
141 if ((record.rtype==QTYPE_A) or (record.qtype==QTYPE_CNAME)):
142 found_record = True
143 else:
144 tmp = QTYPE.get(record.rtype) + str(record.rdata)
145 if (result_contains in tmp):
146 found_record = True
147
148 if not found_record:
149 if result_contains:
150 job["status"] = "%s,No %s records" % (domain, result_contains)
151 else:
152 job["status"] = "%s,No A or CNAME records" % domain
153
154 return
155
156 except Exception, e:
157 job["status"] = "%s,Exception: %s" % (domain, str(e))
158 return
159
160 job["status"] = "success"
161
162class HpcHeartbeat(Thread):
163 def __init__(self, queue):
164 Thread.__init__(self)
165 self.queue = queue
166 self.daemon = True
167 self.start()
168
169 def run(self):
170 while True:
171 job = self.queue.get_job()
172 self.handle_job(job)
173 self.queue.submit_result(job)
174
175 def curl_error_message(self, e):
176 if e.args[0] == 6:
177 return "couldn't resolve host"
178 if e.args[0] == 7:
179 return "failed to connect"
180 return "curl error %d" % e.args[0]
181
182 def handle_job(self, job):
183 server = job["server"]
184 port = job["port"]
185
186 try:
187 buffer = StringIO()
188 c = pycurl.Curl()
189
190 c.setopt(c.URL, "http://%s:%s/heartbeat" % (server, port))
191 c.setopt(c.WRITEDATA, buffer)
192 c.setopt(c.HTTPHEADER, ['host: hpc-heartbeat', 'X-heartbeat: 1'])
193 c.setopt(c.TIMEOUT, 10)
194 c.setopt(c.CONNECTTIMEOUT, 10)
195 c.setopt(c.NOSIGNAL, 1)
196
197 try:
198 c.perform()
199 response_code = c.getinfo(c.RESPONSE_CODE)
200 except Exception, e:
201 #traceback.print_exc()
202 job["status"] = self.curl_error_message(e)
203 return
204 finally:
205 c.close()
206
207 if response_code != 200:
208 job["status"] = "error response %d" % response_code
209 return
210
211 except Exception, e:
212 job["status"] = "Exception: %s" % str(e)
213 return
214
215 job["status"] = "success"
216
217class HpcFetchUrl(Thread):
218 def __init__(self, queue):
219 Thread.__init__(self)
220 self.queue = queue
221 self.daemon = True
222 self.start()
223
224 def run(self):
225 while True:
226 job = self.queue.get_job()
227 self.handle_job(job)
228 self.queue.submit_result(job)
229
230 def curl_error_message(self, e):
231 if e.args[0] == 6:
232 return "couldn't resolve host"
233 if e.args[0] == 7:
234 return "failed to connect"
235 return "curl error %d" % e.args[0]
236
237 def handle_job(self, job):
238 server = job["server"]
239 port = job["port"]
240 url = job["url"]
241 domain = job["domain"]
242
243 def progress(download_t, download_d, upload_t, upload_d):
244 # limit download size to a megabyte
245 if (download_d > 1024*1024):
246 return 1
247 else:
248 return 0
249
250 try:
251 buffer = StringIO()
252 c = pycurl.Curl()
253
254 c.setopt(c.URL, "http://%s:%s/%s" % (server, port, url))
255 c.setopt(c.WRITEDATA, buffer)
256 c.setopt(c.HTTPHEADER, ['host: ' + domain])
257 c.setopt(c.TIMEOUT, 10)
258 c.setopt(c.CONNECTTIMEOUT, 10)
259 c.setopt(c.NOSIGNAL, 1)
260 c.setopt(c.NOPROGRESS, 0)
261 c.setopt(c.PROGRESSFUNCTION, progress)
262
263 try:
264 try:
265 c.perform()
266 except Exception, e:
267 # prevent callback abort from raising exception
268 if (e.args[0] != pycurl.E_ABORTED_BY_CALLBACK):
269 raise
270 response_code = c.getinfo(c.RESPONSE_CODE)
271 bytes_downloaded = int(c.getinfo(c.SIZE_DOWNLOAD))
272 total_time = float(c.getinfo(c.TOTAL_TIME))
273 except Exception, e:
274 #traceback.print_exc()
275 job["status"] = self.curl_error_message(e)
276 return
277 finally:
278 c.close()
279
280 if response_code != 200:
281 job["status"] = "error response %s" % str(response_code)
282 return
283
284 except Exception, e:
285 #traceback.print_exc()
286 job["status"] = "Exception: %s" % str(e)
287 return
288
289 job["status"] = "success"
290 job["bytes_downloaded"] = bytes_downloaded
291 job["total_time"] = total_time
292
293class WatcherWorker(Thread):
294 def __init__(self, queue):
295 Thread.__init__(self)
296 self.queue = queue
297 self.daemon = True
298 self.start()
299
300 def run(self):
301 while True:
302 job = self.queue.get_job()
303 self.handle_job(job)
304 self.queue.submit_result(job)
305
306 def curl_error_message(self, e):
307 if e.args[0] == 6:
308 return "couldn't resolve host"
309 if e.args[0] == 7:
310 return "failed to connect"
311 return "curl error %d" % e.args[0]
312
313 def handle_job(self, job):
314 server = job["server"]
315 port = job["port"]
316
317 try:
318 buffer = StringIO()
319 c = pycurl.Curl()
320
321 c.setopt(c.URL, "http://%s:%s/" % (server, port))
322 c.setopt(c.WRITEDATA, buffer)
323 c.setopt(c.TIMEOUT, 10)
324 c.setopt(c.CONNECTTIMEOUT, 10)
325 c.setopt(c.NOSIGNAL, 1)
326
327 try:
328 c.perform()
329 response_code = c.getinfo(c.RESPONSE_CODE)
330 except Exception, e:
331 #traceback.print_exc()
332 job["status"] = json.dumps( {"status": self.curl_error_message(e)} )
333 return
334 finally:
335 c.close()
336
337 if response_code != 200:
338 job["status"] = json.dumps( {"status": "error response %d" % response_code} )
339 return
340
341 d = json.loads(buffer.getvalue())
342 d["status"] = "success";
343 job["status"] = json.dumps(d)
344
345 except Exception, e:
346 job["status"] = json.dumps( {"status": "Exception: %s" % str(e)} )
347 return
348
349class BaseWatcher(Thread):
350 def __init__(self):
351 Thread.__init__(self)
352 self.daemon = True
353
354 def get_public_ip(self, service, instance):
355 network_name = None
356 if "hpc" in instance.slice.name:
357 network_name = getattr(service, "watcher_hpc_network", None)
358 elif "demux" in instance.slice.name:
359 network_name = getattr(service, "watcher_dnsdemux_network", None)
360 elif "redir" in instance.slice.name:
361 network_name = getattr(service, "watcher_dnsredir_network", None)
362
363 if network_name and network_name.lower()=="nat":
364 return None
365
366 if (network_name is None) or (network_name=="") or (network_name.lower()=="public"):
367 return instance.get_public_ip()
368
369 for ns in instance.ports.all():
370 if (ns.ip) and (ns.network.name==network_name):
371 return ns.ip
372
373 raise ValueError("Couldn't find network %s" % str(network_name))
374
375 def set_status(self, instance, service, kind, msg, check_error=True):
376 #print instance.node.name, kind, msg
377 if check_error:
378 instance.has_error = (msg!="success")
379
380 instance_type = ContentType.objects.get_for_model(instance)
381
382 t = Tag.objects.filter(service=service, name=kind+".msg", content_type__pk=instance_type.id, object_id=instance.id)
383 if t:
384 t=t[0]
385 if (t.value != msg):
386 t.value = msg
387 t.save()
388 else:
389 Tag(service=service, name=kind+".msg", content_object = instance, value=msg).save()
390
391 t = Tag.objects.filter(service=service, name=kind+".time", content_type__pk=instance_type.id, object_id=instance.id)
392 if t:
393 t=t[0]
394 t.value = str(time.time())
395 t.save()
396 else:
397 Tag(service=service, name=kind+".time", content_object = instance, value=str(time.time())).save()
398
399 def get_service_slices(self, service, kind=None):
400 try:
401 slices = service.slices.all()
402 except:
403 # buggy data model
404 slices = service.service.all()
405
406 if kind:
407 return [x for x in slices if (kind in x.name)]
408 else:
409 return list(slices)
410
411class RRWatcher(BaseWatcher):
412 def __init__(self):
413 BaseWatcher.__init__(self)
414
415 self.resolver_queue = WorkQueue()
416 for i in range(0,10):
417 DnsResolver(queue = self.resolver_queue)
418
419 def check_request_routers(self, service, instances):
420 for instance in instances:
421 instance.has_error = False
422
423 try:
424 ip = self.get_public_ip(service, instance)
425 except Exception, e:
426 self.set_status(instance, service, "watcher.DNS", "exception: %s" % str(e))
427 continue
428 if not ip:
429 try:
430 ip = socket.gethostbyname(instance.node.name)
431 except:
432 self.set_status(instance, service, "watcher.DNS", "dns resolution failure")
433 continue
434
435 if not ip:
436 self.set_status(instance, service, "watcher.DNS", "no IP address")
437 continue
438
439 checks = HpcHealthCheck.objects.filter(kind="dns")
440 if not checks:
441 self.set_status(instance, service, "watcher.DNS", "no DNS HealthCheck tests configured")
442
443 for check in checks:
444 self.resolver_queue.submit_job({"domain": check.resource_name, "server": ip, "port": 53, "instance": instance, "result_contains": check.result_contains})
445
446 while self.resolver_queue.outstanding > 0:
447 result = self.resolver_queue.get_result()
448 instance = result["instance"]
449 if (result["status"]!="success") and (not instance.has_error):
450 self.set_status(instance, service, "watcher.DNS", result["status"])
451
452 for instance in instances:
453 if not instance.has_error:
454 self.set_status(instance, service, "watcher.DNS", "success")
455
456 def run_once(self):
457 for hpcService in HpcService.objects.all():
458 for slice in self.get_service_slices(hpcService, "dnsdemux"):
459 self.check_request_routers(hpcService, slice.instances.all())
460
461 for rrService in RequestRouterService.objects.all():
462 for slice in self.get_service_slices(rrService, "dnsdemux"):
463 self.check_request_routers(rrService, slice.instances.all())
464
465 def run(self):
466 while True:
467 self.run_once()
468 time.sleep(10)
469
470 django.db.reset_queries()
471
472class HpcProber(BaseWatcher):
473 def __init__(self):
474 BaseWatcher.__init__(self)
475
476 self.heartbeat_queue = WorkQueue()
477 for i in range(0, 10):
478 HpcHeartbeat(queue = self.heartbeat_queue)
479
480 def probe_hpc(self, service, instances):
481 for instance in instances:
482 instance.has_error = False
483
484 self.heartbeat_queue.submit_job({"server": instance.node.name, "port": 8009, "instance": instance})
485
486 while self.heartbeat_queue.outstanding > 0:
487 result = self.heartbeat_queue.get_result()
488 instance = result["instance"]
489 if (result["status"]!="success") and (not instance.has_error):
490 self.set_status(instance, service, "watcher.HPC-hb", result["status"])
491
492 for instance in instances:
493 if not instance.has_error:
494 self.set_status(instance, service, "watcher.HPC-hb", "success")
495
496 def run_once(self):
497 for hpcService in HpcService.objects.all():
498 for slice in self.get_service_slices(hpcService, "hpc"):
499 self.probe_hpc(hpcService, slice.instances.all())
500
501 def run(self):
502 while True:
503 self.run_once()
504 time.sleep(10)
505
506 django.db.reset_queries()
507
508class HpcFetcher(BaseWatcher):
509 def __init__(self):
510 BaseWatcher.__init__(self)
511
512 self.fetch_queue = WorkQueue()
513 for i in range(0, 10):
514 HpcFetchUrl(queue = self.fetch_queue)
515
516 def fetch_hpc(self, service, instances):
517 for instance in instances:
518 instance.has_error = False
519 instance.url_status = []
520
521 checks = HpcHealthCheck.objects.filter(kind="http")
522 if not checks:
523 self.set_status(instance, service, "watcher.HPC-fetch", "no HTTP HealthCheck tests configured")
524
525 for check in checks:
526 if (not check.resource_name) or (":" not in check.resource_name):
527 self.set_status(instance, service, "watcher.HPC-fetch", "malformed resource_name: " + str(check.resource_name))
528 break
529
530 (domain, url) = check.resource_name.split(":",1)
531
532 self.fetch_queue.submit_job({"server": instance.node.name, "port": 80, "instance": instance, "domain": domain, "url": url})
533
534 while self.fetch_queue.outstanding > 0:
535 result = self.fetch_queue.get_result()
536 instance = result["instance"]
537 if (result["status"] == "success"):
538 instance.url_status.append( (result["domain"] + result["url"], "success", result["bytes_downloaded"], result["total_time"]) )
539 if (result["status"]!="success") and (not instance.has_error):
540 self.set_status(instance, service, "watcher.HPC-fetch", result["status"])
541
542 for instance in instances:
543 self.set_status(instance, service, "watcher.HPC-fetch-urls", json.dumps(instance.url_status), check_error=False)
544 if not instance.has_error:
545 self.set_status(instance, service, "watcher.HPC-fetch", "success")
546
547 def run_once(self):
548 for hpcService in HpcService.objects.all():
549 for slice in self.get_service_slices(hpcService, "hpc"):
550 try:
551 self.fetch_hpc(hpcService, slice.instances.all())
552 except:
553 traceback.print_exc()
554
555 def run(self):
556 while True:
557 self.run_once()
558 time.sleep(10)
559
560 django.db.reset_queries()
561
562class WatcherFetcher(BaseWatcher):
563 def __init__(self):
564 BaseWatcher.__init__(self)
565
566 self.fetch_queue = WorkQueue()
567 for i in range(0, 10):
568 WatcherWorker(queue = self.fetch_queue)
569
570 def fetch_watcher(self, service, instances):
571 for instance in instances:
572 try:
573 ip = self.get_public_ip(service, instance)
574 except Exception, e:
575 self.set_status(instance, service, "watcher.watcher", json.dumps({"status": "exception: %s" % str(e)}) )
576 continue
577 if not ip:
578 try:
579 ip = socket.gethostbyname(instance.node.name)
580 except:
581 self.set_status(instance, service, "watcher.watcher", json.dumps({"status": "dns resolution failure"}) )
582 continue
583
584 if not ip:
585 self.set_status(instance, service, "watcher.watcher", json.dumps({"status": "no IP address"}) )
586 continue
587
588 port = 8015
589 if ("redir" in instance.slice.name):
590 port = 8016
591 elif ("demux" in instance.slice.name):
592 port = 8017
593
594 self.fetch_queue.submit_job({"server": ip, "port": port, "instance": instance})
595
596 while self.fetch_queue.outstanding > 0:
597 result = self.fetch_queue.get_result()
598 instance = result["instance"]
599 self.set_status(instance, service, "watcher.watcher", result["status"])
600
601 def run_once(self):
602 for hpcService in HpcService.objects.all():
603 for slice in self.get_service_slices(hpcService):
604 self.fetch_watcher(hpcService, slice.instances.all())
605
606 def run(self):
607 while True:
608 self.run_once()
609 time.sleep(10)
610
611 django.db.reset_queries()
612
613
614if __name__ == "__main__":
615 if "--once" in sys.argv:
616 RRWatcher().run_once()
617 HpcProber().run_once()
618 HpcFetcher().run_once()
619 WatcherFetcher().run_once()
620 else:
621 RRWatcher().start()
622 HpcProber().start()
623 HpcFetcher().start()
624 WatcherFetcher().start()
625
626 print "Running forever..."
627 while True:
628 time.sleep(60)
629