blob: 5294e5900eeac9bcbb113a2baab3e44d261a21bd [file] [log] [blame]
Matteo Scandoloede125b2017-08-08 13:05:25 -07001
2# Copyright 2017-present Open Networking Foundation
3#
4# Licensed under the Apache License, Version 2.0 (the "License");
5# you may not use this file except in compliance with the License.
6# You may obtain a copy of the License at
7#
8# http://www.apache.org/licenses/LICENSE-2.0
9#
10# Unless required by applicable law or agreed to in writing, software
11# distributed under the License is distributed on an "AS IS" BASIS,
12# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13# See the License for the specific language governing permissions and
14# limitations under the License.
15
16
Scott Baker25467ff2016-08-04 09:50:22 -070017"""
18 hpc_watcher.py
19
20 Daemon to watch the health of HPC and RR instances.
21
22 This daemon uses HpcHealthCheck objects in the Data Model to conduct
23 periodic tests of HPC and RR nodes. Two types of Health Checks are
24 supported:
25
26 kind="dns": checks the request routers to make sure that a DNS
27 name is resolvable and returns the right kind of records.
28
29 resource_name should be set to the domain name to lookup.
30
31 result_contains is optional and can be used to hold "A", "CNAME", or
32 a particular address or hostname that should be contained in the
33 query's answer.
34
35 kind="http": checks the hpc nodes to make sure that a URL can be
36 retrieved from the node.
37
38 resource_name should be set to the HostName:Url to fetch. For
39 example, cdn-stream.htm.fiu.edu:/hft2441/intro.mp4
40
41 In addition to the above, HPC heartbeat probes are conducted, similar to
42 the ones that dnsredir conducts.
43
44 The results of health checks are stored in a tag attached to the Instance
45 the healthcheck was conducted against. If all healthchecks of a particular
46 variety were successful for a instance, then "success" will be stored in
47 the tag. Otherwise, the first healthcheck to fail will be stored in the
48 tag.
49
50 Ubuntu prereqs:
51 apt-get install python-pycurl
52 pip install dnslib
53"""
54
55import os
56import socket
57import sys
58sys.path.append("/opt/xos")
59os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
60import django
61from django.contrib.contenttypes.models import ContentType
62from core.models import *
63from services.hpc.models import *
Scott Baker6ecad3b2016-08-09 09:01:55 -070064#from services.requestrouter.models import *
Scott Baker25467ff2016-08-04 09:50:22 -070065django.setup()
66import time
67import pycurl
68import traceback
69import json
70from StringIO import StringIO
71
72from dnslib.dns import DNSRecord,DNSHeader,DNSQuestion,QTYPE
73from dnslib.digparser import DigParser
74
75from threading import Thread, Condition
76
77"""
78from dnslib import *
79q = DNSRecord(q=DNSQuestion("cdn-stream.htm.fiu.edu"))
80a_pkt = q.send("150.135.65.10", tcp=False, timeout=10)
81a = DNSRecord.parse(a_pkt)
82
83from dnslib import *
84q = DNSRecord(q=DNSQuestion("onlab.vicci.org"))
85a_pkt = q.send("150.135.65.10", tcp=False, timeout=10)
86a = DNSRecord.parse(a_pkt)
87"""
88
class WorkQueue:
    """Job/result hand-off between one producer thread and a pool of workers.

    The producer calls submit_job() for each unit of work and then calls
    get_result() while ``outstanding`` is positive. Worker threads loop on
    get_job() / submit_result().

    Attributes:
        job_cv / jobs: pending jobs and the condition guarding them. Jobs are
            popped from the end of the list, so dispatch order is LIFO.
        result_cv / results: finished jobs and the condition guarding them.
        outstanding: jobs submitted whose result has not been collected yet.
            It is updated without a lock, so submit_job() and get_result()
            should be called from a single producer thread (as every watcher
            in this module does).
    """

    def __init__(self):
        self.job_cv = Condition()
        self.jobs = []
        self.result_cv = Condition()
        self.results = []
        self.outstanding = 0

    def get_job(self):
        """Block until a job is queued, then remove and return it."""
        # ``with`` releases the lock even if wait() raises; the previous
        # acquire()/release() pair would have leaked it.
        with self.job_cv:
            while not self.jobs:
                self.job_cv.wait()
            return self.jobs.pop()

    def submit_job(self, job):
        """Queue *job* and wake one waiting worker."""
        with self.job_cv:
            self.jobs.append(job)
            self.job_cv.notify()
        self.outstanding += 1

    def get_result(self):
        """Block until a finished job is available, then remove and return it."""
        with self.result_cv:
            while not self.results:
                self.result_cv.wait()
            result = self.results.pop()
        self.outstanding -= 1
        return result

    def submit_result(self, result):
        """Publish a finished job and wake one waiting collector."""
        with self.result_cv:
            self.results.append(result)
            self.result_cv.notify()
126
class DnsResolver(Thread):
    """Worker thread that executes "dns" health-check jobs.

    The thread marks itself as a daemon and starts itself on construction.
    It loops forever taking job dicts from ``queue`` (a WorkQueue), running
    handle_job() on each and handing the same dict back through
    ``queue.submit_result()``.
    """

    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue
        self.daemon = True
        self.start()

    def run(self):
        while True:
            job = self.queue.get_job()
            self.handle_job(job)
            # The job is returned whether or not the check passed.
            self.queue.submit_result(job)

    def handle_job(self, job):
        """Query job["server"]:job["port"] over UDP for job["domain"].

        Reads ``domain``, ``server``, ``port`` and the optional
        ``result_contains`` key, and writes the verdict to ``job["status"]``:

        * without ``result_contains`` the check passes when the answer
          section holds at least one A or CNAME record;
        * with ``result_contains`` it passes when that string occurs in an
          answer record rendered as its type name followed by ``str(rdata)``.

        The status is "success" on a pass, otherwise "<domain>,<reason>".
        Exceptions are reported in the status, never raised.
        """
        domain = job["domain"]
        server = job["server"]
        port = job["port"]
        result_contains = job.get("result_contains", None)

        try:
            query = DNSRecord(q=DNSQuestion(domain))
            reply = DNSRecord.parse(query.send(server, port, tcp=False, timeout=10))

            wanted_types = (QTYPE.A, QTYPE.CNAME)
            found_record = False
            for record in reply.rr:
                if not result_contains:
                    # Resource records carry their type in ``rtype``; ``qtype``
                    # exists only on DNSQuestion, so testing record.qtype raised
                    # AttributeError for every non-A answer.
                    matched = record.rtype in wanted_types
                else:
                    rendered = QTYPE.get(record.rtype) + str(record.rdata)
                    matched = result_contains in rendered
                if matched:
                    found_record = True
                    break
        except Exception as e:
            job["status"] = "%s,Exception: %s" % (domain, str(e))
            return

        if found_record:
            job["status"] = "success"
        elif result_contains:
            job["status"] = "%s,No %s records" % (domain, result_contains)
        else:
            job["status"] = "%s,No A or CNAME records" % domain
177
class HpcHeartbeat(Thread):
    """Worker thread that probes the heartbeat endpoint of HPC nodes.

    Jobs carry ``server`` and ``port``. handle_job() GETs
    http://<server>:<port>/heartbeat with the headers "host: hpc-heartbeat"
    and "X-heartbeat: 1", and stores "success" or an error description in
    ``job["status"]``. The thread is a daemon and starts itself on
    construction.
    """

    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue
        self.daemon = True
        self.start()

    def run(self):
        while True:
            job = self.queue.get_job()
            self.handle_job(job)
            self.queue.submit_result(job)

    def curl_error_message(self, e):
        """Turn a curl error (libcurl error code in ``e.args[0]``) into text."""
        if e.args[0] == 6:      # CURLE_COULDNT_RESOLVE_HOST
            return "couldn't resolve host"
        if e.args[0] == 7:      # CURLE_COULDNT_CONNECT
            return "failed to connect"
        return "curl error %d" % e.args[0]

    def handle_job(self, job):
        """Fetch the heartbeat URL; status is "success" only on HTTP 200."""
        server = job["server"]
        port = job["port"]

        try:
            body = StringIO()
            c = pycurl.Curl()
            try:
                c.setopt(c.URL, "http://%s:%s/heartbeat" % (server, port))
                c.setopt(c.WRITEDATA, body)
                c.setopt(c.HTTPHEADER, ['host: hpc-heartbeat', 'X-heartbeat: 1'])
                c.setopt(c.TIMEOUT, 10)
                c.setopt(c.CONNECTTIMEOUT, 10)
                c.setopt(c.NOSIGNAL, 1)

                try:
                    c.perform()
                    response_code = c.getinfo(c.RESPONSE_CODE)
                except Exception as e:
                    job["status"] = self.curl_error_message(e)
                    return
            finally:
                # Release the handle on every path, including a failing setopt().
                c.close()

            if response_code != 200:
                job["status"] = "error response %d" % response_code
                return

        except Exception as e:
            job["status"] = "Exception: %s" % str(e)
            return

        job["status"] = "success"
232
class HpcFetchUrl(Thread):
    """Worker thread that fetches health-check URLs from HPC nodes.

    Jobs carry ``server``, ``port``, ``domain`` and ``url``. handle_job()
    requests the URL from the node with a "host: <domain>" header and writes
    "success" (plus ``bytes_downloaded`` and ``total_time``) or an error
    description to ``job["status"]``. The thread is a daemon and starts
    itself on construction.
    """

    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue
        self.daemon = True
        self.start()

    def run(self):
        while True:
            job = self.queue.get_job()
            self.handle_job(job)
            self.queue.submit_result(job)

    def curl_error_message(self, e):
        """Turn a curl error (libcurl error code in ``e.args[0]``) into text."""
        if e.args[0] == 6:      # CURLE_COULDNT_RESOLVE_HOST
            return "couldn't resolve host"
        if e.args[0] == 7:      # CURLE_COULDNT_CONNECT
            return "failed to connect"
        return "curl error %d" % e.args[0]

    @staticmethod
    def _request_url(server, port, url):
        """Build http://<server>:<port>/<url> with exactly one slash before the path.

        Health checks store "HostName:Url" and the path normally starts with
        "/" (e.g. "cdn-stream.htm.fiu.edu:/hft2441/intro.mp4"); joining it
        verbatim would request "//hft2441/intro.mp4".
        """
        return "http://%s:%s/%s" % (server, port, url.lstrip("/"))

    def handle_job(self, job):
        """Download up to ~1 MiB of the URL; status is "success" only on HTTP 200."""
        server = job["server"]
        port = job["port"]
        url = job["url"]
        domain = job["domain"]

        def progress(download_t, download_d, upload_t, upload_d):
            # A non-zero return makes libcurl abort the transfer; this caps
            # each fetch at one megabyte.
            return 1 if download_d > 1024 * 1024 else 0

        try:
            body = StringIO()
            c = pycurl.Curl()
            try:
                c.setopt(c.URL, self._request_url(server, port, url))
                c.setopt(c.WRITEDATA, body)
                c.setopt(c.HTTPHEADER, ['host: ' + domain])
                c.setopt(c.TIMEOUT, 10)
                c.setopt(c.CONNECTTIMEOUT, 10)
                c.setopt(c.NOSIGNAL, 1)
                c.setopt(c.NOPROGRESS, 0)
                c.setopt(c.PROGRESSFUNCTION, progress)

                try:
                    try:
                        c.perform()
                    except Exception as e:
                        # Our own size-cap abort is expected: keep the partial download.
                        if e.args[0] != pycurl.E_ABORTED_BY_CALLBACK:
                            raise
                    response_code = c.getinfo(c.RESPONSE_CODE)
                    bytes_downloaded = int(c.getinfo(c.SIZE_DOWNLOAD))
                    total_time = float(c.getinfo(c.TOTAL_TIME))
                except Exception as e:
                    job["status"] = self.curl_error_message(e)
                    return
            finally:
                # Release the handle on every path, including a failing setopt().
                c.close()

            if response_code != 200:
                job["status"] = "error response %s" % str(response_code)
                return

        except Exception as e:
            job["status"] = "Exception: %s" % str(e)
            return

        job["status"] = "success"
        job["bytes_downloaded"] = bytes_downloaded
        job["total_time"] = total_time
308
class WatcherWorker(Thread):
    """Worker thread that polls the status endpoint of a node-side watcher.

    Jobs carry ``server`` and ``port``. handle_job() GETs
    http://<server>:<port>/ and stores a JSON string in ``job["status"]``:
    the decoded response body with its "status" key set to "success", or
    {"status": <error description>} on failure. The thread is a daemon and
    starts itself on construction.
    """

    def __init__(self, queue):
        Thread.__init__(self)
        self.queue = queue
        self.daemon = True
        self.start()

    def run(self):
        while True:
            job = self.queue.get_job()
            self.handle_job(job)
            self.queue.submit_result(job)

    def curl_error_message(self, e):
        """Turn a curl error (libcurl error code in ``e.args[0]``) into text."""
        if e.args[0] == 6:      # CURLE_COULDNT_RESOLVE_HOST
            return "couldn't resolve host"
        if e.args[0] == 7:      # CURLE_COULDNT_CONNECT
            return "failed to connect"
        return "curl error %d" % e.args[0]

    def handle_job(self, job):
        """Fetch and re-encode the watcher's JSON status into job["status"]."""
        server = job["server"]
        port = job["port"]

        try:
            body = StringIO()
            c = pycurl.Curl()
            try:
                c.setopt(c.URL, "http://%s:%s/" % (server, port))
                c.setopt(c.WRITEDATA, body)
                c.setopt(c.TIMEOUT, 10)
                c.setopt(c.CONNECTTIMEOUT, 10)
                c.setopt(c.NOSIGNAL, 1)

                try:
                    c.perform()
                    response_code = c.getinfo(c.RESPONSE_CODE)
                except Exception as e:
                    job["status"] = json.dumps({"status": self.curl_error_message(e)})
                    return
            finally:
                # Release the handle on every path, including a failing setopt().
                c.close()

            if response_code != 200:
                job["status"] = json.dumps({"status": "error response %d" % response_code})
                return

            # Expects a JSON object; invalid JSON or a non-object body is
            # reported through the except clause below.
            d = json.loads(body.getvalue())
            d["status"] = "success"
            job["status"] = json.dumps(d)

        except Exception as e:
            job["status"] = json.dumps({"status": "Exception: %s" % str(e)})
364
class BaseWatcher(Thread):
    """Common base for the watcher threads: address lookup, result tags, slices.

    Instances are daemon threads; subclasses provide run().
    """

    def __init__(self):
        Thread.__init__(self)
        self.daemon = True

    def get_public_ip(self, service, instance):
        """Return the IP address a watcher should probe for *instance*.

        The network name comes from the service attribute
        ``watcher_hpc_network``, ``watcher_dnsdemux_network`` or
        ``watcher_dnsredir_network``, chosen by whether the instance's slice
        name contains "hpc", "demux" or "redir" (tested in that order).

        Returns:
            None when the network is "nat" (any case); the watchers then fall
            back to resolving the node's hostname. ``instance.get_public_ip()``
            when no network is configured, it is empty, or it is "public" (any
            case). Otherwise the IP of the instance port on the named network.

        Raises:
            ValueError: no port with an IP is attached to the named network.
        """
        slice_name = instance.slice.name
        network_name = None
        if "hpc" in slice_name:
            network_name = getattr(service, "watcher_hpc_network", None)
        elif "demux" in slice_name:
            network_name = getattr(service, "watcher_dnsdemux_network", None)
        elif "redir" in slice_name:
            network_name = getattr(service, "watcher_dnsredir_network", None)

        if network_name and network_name.lower() == "nat":
            return None

        if not network_name or network_name.lower() == "public":
            return instance.get_public_ip()

        for port in instance.ports.all():
            if port.ip and port.network.name == network_name:
                return port.ip

        raise ValueError("Couldn't find network %s" % str(network_name))

    def set_status(self, instance, service, kind, msg, check_error=True):
        """Record *msg* as the latest result of check *kind* for *instance*.

        Maintains two Tags on the instance scoped to *service*:
        "<kind>.msg" holds *msg* (saved only when the value changes) and
        "<kind>.time" holds the current ``time.time()`` (saved every call).

        If *check_error* is true, also sets ``instance.has_error`` to whether
        *msg* differs from "success". Only the attribute is set; the instance
        itself is not saved here.
        """
        if check_error:
            instance.has_error = (msg != "success")

        instance_type = ContentType.objects.get_for_model(instance)
        self._upsert_tag(service, instance, instance_type, kind + ".msg", msg,
                         only_if_changed=True)
        self._upsert_tag(service, instance, instance_type, kind + ".time",
                         str(time.time()), only_if_changed=False)

    def _upsert_tag(self, service, instance, instance_type, name, value, only_if_changed):
        """Set Tag *name* on *instance* to *value*, creating it if missing.

        With *only_if_changed*, an existing tag already holding *value* is
        left alone (no save).
        """
        existing = Tag.objects.filter(service=service, name=name,
                                      content_type__pk=instance_type.id,
                                      object_id=instance.id)
        if existing:
            tag = existing[0]
            if only_if_changed and tag.value == value:
                return
            tag.value = value
            tag.save()
        else:
            Tag(service=service, name=name, content_object=instance, value=value).save()

    def get_service_slices(self, service, kind=None):
        """Return the service's slices as a list.

        If *kind* is given, only slices whose name contains *kind* are kept.
        """
        try:
            slices = service.slices.all()
        except Exception:
            # Buggy data model: some service objects apparently expose their
            # slices through a ``service`` relation instead of ``slices``.
            slices = service.service.all()

        if kind:
            return [s for s in slices if kind in s.name]
        return list(slices)
426
class RRWatcher(BaseWatcher):
    """Watcher thread that runs the "dns" HpcHealthChecks against request routers.

    Each round checks the instances of every HpcService slice whose name
    contains "dnsdemux" and records the outcome under the "watcher.DNS" tags.
    """

    def __init__(self):
        BaseWatcher.__init__(self)

        self.resolver_queue = WorkQueue()
        # Ten resolver threads; each DnsResolver starts itself when created.
        for _ in range(10):
            DnsResolver(queue=self.resolver_queue)

    def check_request_routers(self, service, instances):
        """Run every "dns" health check against each of *instances*.

        Instances whose address cannot be determined, or for which no DNS
        checks are configured, get that error as their status. Otherwise the
        first failing check's status is recorded, or "success" if none failed.

        *instances* is iterated twice and the second pass reads the
        ``has_error`` flag set during the first, so it must yield the same
        objects both times.
        """
        # The configured DNS checks do not depend on the instance; fetch them once.
        checks = list(HpcHealthCheck.objects.filter(kind="dns"))

        for instance in instances:
            instance.has_error = False

            try:
                ip = self.get_public_ip(service, instance)
            except Exception as e:
                self.set_status(instance, service, "watcher.DNS", "exception: %s" % str(e))
                continue
            if not ip:
                # No direct address (e.g. "nat" network): use the node's hostname.
                try:
                    ip = socket.gethostbyname(instance.node.name)
                except Exception:
                    self.set_status(instance, service, "watcher.DNS", "dns resolution failure")
                    continue

            if not ip:
                self.set_status(instance, service, "watcher.DNS", "no IP address")
                continue

            if not checks:
                self.set_status(instance, service, "watcher.DNS", "no DNS HealthCheck tests configured")

            for check in checks:
                self.resolver_queue.submit_job({"domain": check.resource_name, "server": ip, "port": 53, "instance": instance, "result_contains": check.result_contains})

        # Collect results, keeping only the first failure per instance.
        while self.resolver_queue.outstanding > 0:
            result = self.resolver_queue.get_result()
            instance = result["instance"]
            if result["status"] != "success" and not instance.has_error:
                self.set_status(instance, service, "watcher.DNS", result["status"])

        for instance in instances:
            if not instance.has_error:
                self.set_status(instance, service, "watcher.DNS", "success")

    def run_once(self):
        """Check the "dnsdemux" slices of every HpcService once."""
        for hpc_service in HpcService.objects.all():
            for dns_slice in self.get_service_slices(hpc_service, "dnsdemux"):
                self.check_request_routers(hpc_service, dns_slice.instances.all())

        # RequestRouterService checks are disabled, matching the commented-out
        # services.requestrouter import at the top of the file:
        # for rrService in RequestRouterService.objects.all():
        #     for slice in self.get_service_slices(rrService, "dnsdemux"):
        #         self.check_request_routers(rrService, slice.instances.all())

    def run(self):
        """Call run_once() forever, sleeping 10 seconds between rounds."""
        while True:
            try:
                self.run_once()
            except Exception:
                # An exception escaping run() would end this daemon thread for
                # good and silently stop the watcher; log it and retry next round.
                traceback.print_exc()
            time.sleep(10)

            django.db.reset_queries()
487
class HpcProber(BaseWatcher):
    """Watcher thread that sends heartbeat probes to HPC instances.

    Each round probes port 8009 on the node of every instance in the
    HpcService slices whose name contains "hpc", recording results under
    the "watcher.HPC-hb" tags.
    """

    def __init__(self):
        BaseWatcher.__init__(self)

        self.heartbeat_queue = WorkQueue()
        # Ten probe threads; each HpcHeartbeat starts itself when created.
        for _ in range(10):
            HpcHeartbeat(queue=self.heartbeat_queue)

    def probe_hpc(self, service, instances):
        """Heartbeat-probe *instances*, recording the first failure or "success".

        *instances* is iterated twice and the second pass reads the
        ``has_error`` flag set during the first, so it must yield the same
        objects both times.
        """
        for instance in instances:
            instance.has_error = False
            self.heartbeat_queue.submit_job({"server": instance.node.name, "port": 8009, "instance": instance})

        while self.heartbeat_queue.outstanding > 0:
            result = self.heartbeat_queue.get_result()
            instance = result["instance"]
            if result["status"] != "success" and not instance.has_error:
                self.set_status(instance, service, "watcher.HPC-hb", result["status"])

        for instance in instances:
            if not instance.has_error:
                self.set_status(instance, service, "watcher.HPC-hb", "success")

    def run_once(self):
        """Probe the "hpc" slices of every HpcService once."""
        for hpc_service in HpcService.objects.all():
            for hpc_slice in self.get_service_slices(hpc_service, "hpc"):
                self.probe_hpc(hpc_service, hpc_slice.instances.all())

    def run(self):
        """Call run_once() forever, sleeping 10 seconds between rounds."""
        while True:
            try:
                self.run_once()
            except Exception:
                # An exception escaping run() would end this daemon thread for
                # good and silently stop the watcher; log it and retry next round.
                traceback.print_exc()
            time.sleep(10)

            django.db.reset_queries()
523
class HpcFetcher(BaseWatcher):
    """Watcher thread that runs the "http" HpcHealthChecks against HPC instances.

    Each round fetches every configured URL from port 80 of each instance's
    node in the HpcService slices whose name contains "hpc". Results go to
    the "watcher.HPC-fetch" tags, and the per-URL successes are stored as a
    JSON list in the "watcher.HPC-fetch-urls" tags.
    """

    def __init__(self):
        BaseWatcher.__init__(self)

        self.fetch_queue = WorkQueue()
        # Ten fetch threads; each HpcFetchUrl starts itself when created.
        for _ in range(10):
            HpcFetchUrl(queue=self.fetch_queue)

    def fetch_hpc(self, service, instances):
        """Run every "http" health check against each of *instances*.

        Each check's resource_name must be "HostName:Url"; the first
        malformed one marks the instance as failed and stops queueing further
        checks for it. For each instance, ``url_status`` collects
        (url, "success", bytes_downloaded, total_time) tuples for the
        successful fetches.

        *instances* is iterated twice and the second pass reads flags set
        during the first, so it must yield the same objects both times.
        """
        # The configured HTTP checks do not depend on the instance; fetch them once.
        checks = list(HpcHealthCheck.objects.filter(kind="http"))

        for instance in instances:
            instance.has_error = False
            instance.url_status = []

            if not checks:
                self.set_status(instance, service, "watcher.HPC-fetch", "no HTTP HealthCheck tests configured")

            for check in checks:
                if not check.resource_name or ":" not in check.resource_name:
                    self.set_status(instance, service, "watcher.HPC-fetch", "malformed resource_name: " + str(check.resource_name))
                    break

                domain, url = check.resource_name.split(":", 1)
                self.fetch_queue.submit_job({"server": instance.node.name, "port": 80, "instance": instance, "domain": domain, "url": url})

        while self.fetch_queue.outstanding > 0:
            result = self.fetch_queue.get_result()
            instance = result["instance"]
            if result["status"] == "success":
                instance.url_status.append((result["domain"] + result["url"], "success", result["bytes_downloaded"], result["total_time"]))
            elif not instance.has_error:
                # Keep only the first failure per instance.
                self.set_status(instance, service, "watcher.HPC-fetch", result["status"])

        for instance in instances:
            self.set_status(instance, service, "watcher.HPC-fetch-urls", json.dumps(instance.url_status), check_error=False)
            if not instance.has_error:
                self.set_status(instance, service, "watcher.HPC-fetch", "success")

    def run_once(self):
        """Fetch-check the "hpc" slices of every HpcService once.

        A failure in one slice is printed and does not stop the others.
        """
        for hpc_service in HpcService.objects.all():
            for hpc_slice in self.get_service_slices(hpc_service, "hpc"):
                try:
                    self.fetch_hpc(hpc_service, hpc_slice.instances.all())
                except Exception:
                    traceback.print_exc()

    def run(self):
        """Call run_once() forever, sleeping 10 seconds between rounds."""
        while True:
            try:
                self.run_once()
            except Exception:
                # Failures outside the per-slice guard (e.g. the HpcService
                # query) would otherwise end this daemon thread for good.
                traceback.print_exc()
            time.sleep(10)

            django.db.reset_queries()
577
class WatcherFetcher(BaseWatcher):
    """Watcher thread that collects the status reports of node-side watchers.

    Each round polls every instance of every slice of each HpcService and
    stores the JSON reply under the "watcher.watcher" tags.
    """

    def __init__(self):
        BaseWatcher.__init__(self)

        self.fetch_queue = WorkQueue()
        # Ten poll threads; each WatcherWorker starts itself when created.
        for _ in range(10):
            WatcherWorker(queue=self.fetch_queue)

    def fetch_watcher(self, service, instances):
        """Poll the watcher endpoint of each of *instances*.

        The port depends on the slice name: 8016 if it contains "redir",
        otherwise 8017 if it contains "demux", otherwise 8015. Every status
        recorded here is a JSON string.
        """
        for instance in instances:
            try:
                ip = self.get_public_ip(service, instance)
            except Exception as e:
                self.set_status(instance, service, "watcher.watcher", json.dumps({"status": "exception: %s" % str(e)}))
                continue
            if not ip:
                # No direct address (e.g. "nat" network): use the node's hostname.
                try:
                    ip = socket.gethostbyname(instance.node.name)
                except Exception:
                    self.set_status(instance, service, "watcher.watcher", json.dumps({"status": "dns resolution failure"}))
                    continue

            if not ip:
                self.set_status(instance, service, "watcher.watcher", json.dumps({"status": "no IP address"}))
                continue

            slice_name = instance.slice.name
            if "redir" in slice_name:
                port = 8016
            elif "demux" in slice_name:
                port = 8017
            else:
                port = 8015

            self.fetch_queue.submit_job({"server": ip, "port": port, "instance": instance})

        while self.fetch_queue.outstanding > 0:
            result = self.fetch_queue.get_result()
            self.set_status(result["instance"], service, "watcher.watcher", result["status"])

    def run_once(self):
        """Poll every slice of every HpcService once."""
        for hpc_service in HpcService.objects.all():
            for any_slice in self.get_service_slices(hpc_service):
                self.fetch_watcher(hpc_service, any_slice.instances.all())

    def run(self):
        """Call run_once() forever, sleeping 10 seconds between rounds."""
        while True:
            try:
                self.run_once()
            except Exception:
                # An exception escaping run() would end this daemon thread for
                # good and silently stop the watcher; log it and retry next round.
                traceback.print_exc()
            time.sleep(10)

            django.db.reset_queries()
628
629
if __name__ == "__main__":
    # Every watcher creates its own worker-thread pool when constructed.
    watcher_classes = (RRWatcher, HpcProber, HpcFetcher, WatcherFetcher)

    if "--once" in sys.argv:
        # One pass of every check, run sequentially on this thread.
        for watcher_class in watcher_classes:
            watcher_class().run_once()
    else:
        # The watchers are daemon threads, so keep the main thread alive.
        for watcher_class in watcher_classes:
            watcher_class().start()

        print("Running forever...")
        while True:
            time.sleep(60)
645