initial code migration from xos repo
Change-Id: I8c848929ec4583a7a18ba9da44095f8f688f96c0
diff --git a/xos/synchronizer/fsck.py b/xos/synchronizer/fsck.py
new file mode 100644
index 0000000..448bfb7
--- /dev/null
+++ b/xos/synchronizer/fsck.py
@@ -0,0 +1,79 @@
+#!/usr/bin/env python
+import argparse
+import imp
+import inspect
+import os
+import sys
+os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
+sys.path.append("/opt/xos")
+from xos.config import Config, DEFAULT_CONFIG_FN, XOS_DIR
+from xos.logger import Logger, logging
+from synchronizers.base.syncstep import SyncStep
+
+try:
+ from django import setup as django_setup # django 1.7
+except:
+ django_setup = False
+
+logger = Logger(level=logging.INFO)
+
+class XOSConsistencyCheck:
+ def __init__(self):
+ self.sync_steps = []
+ self.load_sync_step_modules()
+
+ def load_sync_step_modules(self, step_dir=None):
+ if step_dir is None:
+ if hasattr(Config(), "observer_steps_dir"):
+ step_dir = Config().observer_steps_dir
+ else:
+ step_dir = XOS_DIR+"/observer/steps"
+
+ for fn in os.listdir(step_dir):
+ pathname = os.path.join(step_dir,fn)
+ if os.path.isfile(pathname) and fn.endswith(".py") and (fn!="__init__.py"):
+ module = imp.load_source(fn[:-3],pathname)
+ for classname in dir(module):
+ c = getattr(module, classname, None)
+
+            # make sure 'c' is a descendant of SyncStep and has a
+ # provides field (this eliminates the abstract base classes
+ # since they don't have a provides)
+
+ if inspect.isclass(c) and issubclass(c, SyncStep) and hasattr(c,"provides") and (c not in self.sync_steps):
+ self.sync_steps.append(c)
+ logger.info('loaded sync steps: %s' % ",".join([x.__name__ for x in self.sync_steps]))
+
+ def run(self):
+ updated = True
+ while updated:
+ updated = False
+
+ for step in self.sync_steps:
+ if hasattr(step, "consistency_check"):
+ updated = updated or step(driver=None).consistency_check()
+
+ if updated:
+ logger.info('re-running consistency checks because something changed')
+
+def main():
+ if not "-C" in sys.argv:
+ print >> sys.stderr, "You probably wanted to use -C " + XOS_DIR + "/hpc_observer/hpc_observer_config"
+
+ # Generate command line parser
+ parser = argparse.ArgumentParser(usage='%(prog)s [options]')
+ # smbaker: util/config.py parses sys.argv[] directly to get config file name; include the option here to avoid
+ # throwing unrecognized argument exceptions
+ parser.add_argument('-C', '--config', dest='config_file', action='store', default=DEFAULT_CONFIG_FN,
+ help='Name of config file.')
+ args = parser.parse_args()
+
+ if django_setup: # 1.7
+ django_setup()
+
+ cc = XOSConsistencyCheck()
+ cc.run()
+
+if __name__ == '__main__':
+ main()
+
diff --git a/xos/synchronizer/hpc-synchronizer.py b/xos/synchronizer/hpc-synchronizer.py
new file mode 100755
index 0000000..84bec4f
--- /dev/null
+++ b/xos/synchronizer/hpc-synchronizer.py
@@ -0,0 +1,11 @@
+#!/usr/bin/env python
+
+# This imports and runs ../../synchronizers/base/xos-synchronizer.py
+
+import importlib
+import os
+import sys
+observer_path = os.path.join(os.path.dirname(os.path.realpath(__file__)),"../../synchronizers/base")
+sys.path.append(observer_path)
+mod = importlib.import_module("xos-synchronizer")
+mod.main()
diff --git a/xos/synchronizer/hpc_synchronizer_config b/xos/synchronizer/hpc_synchronizer_config
new file mode 100644
index 0000000..9d4e70a
--- /dev/null
+++ b/xos/synchronizer/hpc_synchronizer_config
@@ -0,0 +1,36 @@
+
+[plc]
+name=plc
+deployment=VICCI
+
+[db]
+name=xos
+user=postgres
+password=password
+host=localhost
+port=5432
+
+[api]
+host=128.112.171.237
+port=8000
+ssl_key=None
+ssl_cert=None
+ca_ssl_cert=None
+ratelimit_enabled=0
+omf_enabled=0
+mail_support_address=support@localhost
+nova_enabled=True
+
+[observer]
+name=hpc
+dependency_graph=/opt/xos/synchronizers/hpc/model-deps
+steps_dir=/opt/xos/synchronizers/hpc/steps
+deleters_dir=/opt/xos/synchronizers/hpc/deleters
+log_file=console
+#/var/log/hpc.log
+driver=None
+#cmi_hostname=openclouddev0.internet2.edu
+
+[feefie]
+client_id='vicci_dev_central'
+user_id='pl'
diff --git a/xos/synchronizer/hpc_watcher.py b/xos/synchronizer/hpc_watcher.py
new file mode 100644
index 0000000..d2efdcc
--- /dev/null
+++ b/xos/synchronizer/hpc_watcher.py
@@ -0,0 +1,629 @@
+"""
+ hpc_watcher.py
+
+ Daemon to watch the health of HPC and RR instances.
+
+    This daemon uses HpcHealthCheck objects in the Data Model to conduct
+ periodic tests of HPC and RR nodes. Two types of Health Checks are
+ supported:
+
+ kind="dns": checks the request routers to make sure that a DNS
+        name is resolvable and returns the right kind of records.
+
+ resource_name should be set to the domain name to lookup.
+
+        result_contains is optional and can be used to hold "A", "CNAME", or
+ a particular address or hostname that should be contained in the
+ query's answer.
+
+ kind="http": checks the hpc nodes to make sure that a URL can be
+ retrieved from the node.
+
+ resource_name should be set to the HostName:Url to fetch. For
+ example, cdn-stream.htm.fiu.edu:/hft2441/intro.mp4
+
+ In addition to the above, HPC heartbeat probes are conducted, similar to
+ the ones that dnsredir conducts.
+
+ The results of health checks are stored in a tag attached to the Instance
+ the healthcheck was conducted against. If all healthchecks of a particular
+    variety were successful for an instance, then "success" will be stored in
+ the tag. Otherwise, the first healthcheck to fail will be stored in the
+ tag.
+
+ Ubuntu prereqs:
+ apt-get install python-pycurl
+ pip install dnslib
+"""
+
+import os
+import socket
+import sys
+sys.path.append("/opt/xos")
+os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
+import django
+from django.contrib.contenttypes.models import ContentType
+from core.models import *
+from services.hpc.models import *
+from services.requestrouter.models import *
+django.setup()
+import time
+import pycurl
+import traceback
+import json
+from StringIO import StringIO
+
+from dnslib.dns import DNSRecord,DNSHeader,DNSQuestion,QTYPE
+from dnslib.digparser import DigParser
+
+from threading import Thread, Condition
+
+"""
+from dnslib import *
+q = DNSRecord(q=DNSQuestion("cdn-stream.htm.fiu.edu"))
+a_pkt = q.send("150.135.65.10", tcp=False, timeout=10)
+a = DNSRecord.parse(a_pkt)
+
+from dnslib import *
+q = DNSRecord(q=DNSQuestion("onlab.vicci.org"))
+a_pkt = q.send("150.135.65.10", tcp=False, timeout=10)
+a = DNSRecord.parse(a_pkt)
+"""
+
+class WorkQueue:
+ def __init__(self):
+ self.job_cv = Condition()
+ self.jobs = []
+ self.result_cv = Condition()
+ self.results = []
+ self.outstanding = 0
+
+ def get_job(self):
+ self.job_cv.acquire()
+ while not self.jobs:
+ self.job_cv.wait()
+ result = self.jobs.pop()
+ self.job_cv.release()
+ return result
+
+ def submit_job(self, job):
+ self.job_cv.acquire()
+ self.jobs.append(job)
+ self.job_cv.notify()
+ self.job_cv.release()
+ self.outstanding = self.outstanding + 1
+
+ def get_result(self):
+ self.result_cv.acquire()
+ while not self.results:
+ self.result_cv.wait()
+ result = self.results.pop()
+ self.result_cv.release()
+ self.outstanding = self.outstanding - 1
+ return result
+
+ def submit_result(self, result):
+ self.result_cv.acquire()
+ self.results.append(result)
+ self.result_cv.notify()
+ self.result_cv.release()
+
+class DnsResolver(Thread):
+ def __init__(self, queue):
+ Thread.__init__(self)
+ self.queue = queue
+ self.daemon = True
+ self.start()
+
+ def run(self):
+ while True:
+ job = self.queue.get_job()
+ self.handle_job(job)
+ self.queue.submit_result(job)
+
+ def handle_job(self, job):
+ domain = job["domain"]
+ server = job["server"]
+ port = job["port"]
+ result_contains = job.get("result_contains", None)
+
+ try:
+ q = DNSRecord(q=DNSQuestion(domain)) #, getattr(QTYPE,"A")))
+
+ a_pkt = q.send(server, port, tcp=False, timeout=10)
+ a = DNSRecord.parse(a_pkt)
+
+ found_record = False
+ for record in a.rr:
+ if (not result_contains):
+ QTYPE_A = getattr(QTYPE,"A")
+ QTYPE_CNAME = getattr(QTYPE, "CNAME")
+ if ((record.rtype==QTYPE_A) or (record.qtype==QTYPE_CNAME)):
+ found_record = True
+ else:
+ tmp = QTYPE.get(record.rtype) + str(record.rdata)
+ if (result_contains in tmp):
+ found_record = True
+
+ if not found_record:
+ if result_contains:
+ job["status"] = "%s,No %s records" % (domain, result_contains)
+ else:
+ job["status"] = "%s,No A or CNAME records" % domain
+
+ return
+
+ except Exception, e:
+ job["status"] = "%s,Exception: %s" % (domain, str(e))
+ return
+
+ job["status"] = "success"
+
+class HpcHeartbeat(Thread):
+ def __init__(self, queue):
+ Thread.__init__(self)
+ self.queue = queue
+ self.daemon = True
+ self.start()
+
+ def run(self):
+ while True:
+ job = self.queue.get_job()
+ self.handle_job(job)
+ self.queue.submit_result(job)
+
+ def curl_error_message(self, e):
+ if e.args[0] == 6:
+ return "couldn't resolve host"
+ if e.args[0] == 7:
+ return "failed to connect"
+ return "curl error %d" % e.args[0]
+
+ def handle_job(self, job):
+ server = job["server"]
+ port = job["port"]
+
+ try:
+ buffer = StringIO()
+ c = pycurl.Curl()
+
+ c.setopt(c.URL, "http://%s:%s/heartbeat" % (server, port))
+ c.setopt(c.WRITEDATA, buffer)
+ c.setopt(c.HTTPHEADER, ['host: hpc-heartbeat', 'X-heartbeat: 1'])
+ c.setopt(c.TIMEOUT, 10)
+ c.setopt(c.CONNECTTIMEOUT, 10)
+ c.setopt(c.NOSIGNAL, 1)
+
+ try:
+ c.perform()
+ response_code = c.getinfo(c.RESPONSE_CODE)
+ except Exception, e:
+ #traceback.print_exc()
+ job["status"] = self.curl_error_message(e)
+ return
+ finally:
+ c.close()
+
+ if response_code != 200:
+ job["status"] = "error response %d" % response_code
+ return
+
+ except Exception, e:
+ job["status"] = "Exception: %s" % str(e)
+ return
+
+ job["status"] = "success"
+
+class HpcFetchUrl(Thread):
+ def __init__(self, queue):
+ Thread.__init__(self)
+ self.queue = queue
+ self.daemon = True
+ self.start()
+
+ def run(self):
+ while True:
+ job = self.queue.get_job()
+ self.handle_job(job)
+ self.queue.submit_result(job)
+
+ def curl_error_message(self, e):
+ if e.args[0] == 6:
+ return "couldn't resolve host"
+ if e.args[0] == 7:
+ return "failed to connect"
+ return "curl error %d" % e.args[0]
+
+ def handle_job(self, job):
+ server = job["server"]
+ port = job["port"]
+ url = job["url"]
+ domain = job["domain"]
+
+ def progress(download_t, download_d, upload_t, upload_d):
+ # limit download size to a megabyte
+ if (download_d > 1024*1024):
+ return 1
+ else:
+ return 0
+
+ try:
+ buffer = StringIO()
+ c = pycurl.Curl()
+
+ c.setopt(c.URL, "http://%s:%s/%s" % (server, port, url))
+ c.setopt(c.WRITEDATA, buffer)
+ c.setopt(c.HTTPHEADER, ['host: ' + domain])
+ c.setopt(c.TIMEOUT, 10)
+ c.setopt(c.CONNECTTIMEOUT, 10)
+ c.setopt(c.NOSIGNAL, 1)
+ c.setopt(c.NOPROGRESS, 0)
+ c.setopt(c.PROGRESSFUNCTION, progress)
+
+ try:
+ try:
+ c.perform()
+ except Exception, e:
+ # prevent callback abort from raising exception
+ if (e.args[0] != pycurl.E_ABORTED_BY_CALLBACK):
+ raise
+ response_code = c.getinfo(c.RESPONSE_CODE)
+ bytes_downloaded = int(c.getinfo(c.SIZE_DOWNLOAD))
+ total_time = float(c.getinfo(c.TOTAL_TIME))
+ except Exception, e:
+ #traceback.print_exc()
+ job["status"] = self.curl_error_message(e)
+ return
+ finally:
+ c.close()
+
+ if response_code != 200:
+ job["status"] = "error response %s" % str(response_code)
+ return
+
+ except Exception, e:
+ #traceback.print_exc()
+ job["status"] = "Exception: %s" % str(e)
+ return
+
+ job["status"] = "success"
+ job["bytes_downloaded"] = bytes_downloaded
+ job["total_time"] = total_time
+
+class WatcherWorker(Thread):
+ def __init__(self, queue):
+ Thread.__init__(self)
+ self.queue = queue
+ self.daemon = True
+ self.start()
+
+ def run(self):
+ while True:
+ job = self.queue.get_job()
+ self.handle_job(job)
+ self.queue.submit_result(job)
+
+ def curl_error_message(self, e):
+ if e.args[0] == 6:
+ return "couldn't resolve host"
+ if e.args[0] == 7:
+ return "failed to connect"
+ return "curl error %d" % e.args[0]
+
+ def handle_job(self, job):
+ server = job["server"]
+ port = job["port"]
+
+ try:
+ buffer = StringIO()
+ c = pycurl.Curl()
+
+ c.setopt(c.URL, "http://%s:%s/" % (server, port))
+ c.setopt(c.WRITEDATA, buffer)
+ c.setopt(c.TIMEOUT, 10)
+ c.setopt(c.CONNECTTIMEOUT, 10)
+ c.setopt(c.NOSIGNAL, 1)
+
+ try:
+ c.perform()
+ response_code = c.getinfo(c.RESPONSE_CODE)
+ except Exception, e:
+ #traceback.print_exc()
+ job["status"] = json.dumps( {"status": self.curl_error_message(e)} )
+ return
+ finally:
+ c.close()
+
+ if response_code != 200:
+ job["status"] = json.dumps( {"status": "error response %d" % response_code} )
+ return
+
+ d = json.loads(buffer.getvalue())
+ d["status"] = "success";
+ job["status"] = json.dumps(d)
+
+ except Exception, e:
+ job["status"] = json.dumps( {"status": "Exception: %s" % str(e)} )
+ return
+
+class BaseWatcher(Thread):
+ def __init__(self):
+ Thread.__init__(self)
+ self.daemon = True
+
+ def get_public_ip(self, service, instance):
+ network_name = None
+ if "hpc" in instance.slice.name:
+ network_name = getattr(service, "watcher_hpc_network", None)
+ elif "demux" in instance.slice.name:
+ network_name = getattr(service, "watcher_dnsdemux_network", None)
+ elif "redir" in instance.slice.name:
+ network_name = getattr(service, "watcher_dnsredir_network", None)
+
+ if network_name and network_name.lower()=="nat":
+ return None
+
+ if (network_name is None) or (network_name=="") or (network_name.lower()=="public"):
+ return instance.get_public_ip()
+
+ for ns in instance.ports.all():
+ if (ns.ip) and (ns.network.name==network_name):
+ return ns.ip
+
+ raise ValueError("Couldn't find network %s" % str(network_name))
+
+ def set_status(self, instance, service, kind, msg, check_error=True):
+ #print instance.node.name, kind, msg
+ if check_error:
+ instance.has_error = (msg!="success")
+
+ instance_type = ContentType.objects.get_for_model(instance)
+
+ t = Tag.objects.filter(service=service, name=kind+".msg", content_type__pk=instance_type.id, object_id=instance.id)
+ if t:
+ t=t[0]
+ if (t.value != msg):
+ t.value = msg
+ t.save()
+ else:
+ Tag(service=service, name=kind+".msg", content_object = instance, value=msg).save()
+
+ t = Tag.objects.filter(service=service, name=kind+".time", content_type__pk=instance_type.id, object_id=instance.id)
+ if t:
+ t=t[0]
+ t.value = str(time.time())
+ t.save()
+ else:
+ Tag(service=service, name=kind+".time", content_object = instance, value=str(time.time())).save()
+
+ def get_service_slices(self, service, kind=None):
+ try:
+ slices = service.slices.all()
+ except:
+ # buggy data model
+ slices = service.service.all()
+
+ if kind:
+ return [x for x in slices if (kind in x.name)]
+ else:
+ return list(slices)
+
+class RRWatcher(BaseWatcher):
+ def __init__(self):
+ BaseWatcher.__init__(self)
+
+ self.resolver_queue = WorkQueue()
+ for i in range(0,10):
+ DnsResolver(queue = self.resolver_queue)
+
+ def check_request_routers(self, service, instances):
+ for instance in instances:
+ instance.has_error = False
+
+ try:
+ ip = self.get_public_ip(service, instance)
+ except Exception, e:
+ self.set_status(instance, service, "watcher.DNS", "exception: %s" % str(e))
+ continue
+ if not ip:
+ try:
+ ip = socket.gethostbyname(instance.node.name)
+ except:
+ self.set_status(instance, service, "watcher.DNS", "dns resolution failure")
+ continue
+
+ if not ip:
+ self.set_status(instance, service, "watcher.DNS", "no IP address")
+ continue
+
+ checks = HpcHealthCheck.objects.filter(kind="dns")
+ if not checks:
+ self.set_status(instance, service, "watcher.DNS", "no DNS HealthCheck tests configured")
+
+ for check in checks:
+ self.resolver_queue.submit_job({"domain": check.resource_name, "server": ip, "port": 53, "instance": instance, "result_contains": check.result_contains})
+
+ while self.resolver_queue.outstanding > 0:
+ result = self.resolver_queue.get_result()
+ instance = result["instance"]
+ if (result["status"]!="success") and (not instance.has_error):
+ self.set_status(instance, service, "watcher.DNS", result["status"])
+
+ for instance in instances:
+ if not instance.has_error:
+ self.set_status(instance, service, "watcher.DNS", "success")
+
+ def run_once(self):
+ for hpcService in HpcService.objects.all():
+ for slice in self.get_service_slices(hpcService, "dnsdemux"):
+ self.check_request_routers(hpcService, slice.instances.all())
+
+ for rrService in RequestRouterService.objects.all():
+ for slice in self.get_service_slices(rrService, "dnsdemux"):
+ self.check_request_routers(rrService, slice.instances.all())
+
+ def run(self):
+ while True:
+ self.run_once()
+ time.sleep(10)
+
+ django.db.reset_queries()
+
+class HpcProber(BaseWatcher):
+ def __init__(self):
+ BaseWatcher.__init__(self)
+
+ self.heartbeat_queue = WorkQueue()
+ for i in range(0, 10):
+ HpcHeartbeat(queue = self.heartbeat_queue)
+
+ def probe_hpc(self, service, instances):
+ for instance in instances:
+ instance.has_error = False
+
+ self.heartbeat_queue.submit_job({"server": instance.node.name, "port": 8009, "instance": instance})
+
+ while self.heartbeat_queue.outstanding > 0:
+ result = self.heartbeat_queue.get_result()
+ instance = result["instance"]
+ if (result["status"]!="success") and (not instance.has_error):
+ self.set_status(instance, service, "watcher.HPC-hb", result["status"])
+
+ for instance in instances:
+ if not instance.has_error:
+ self.set_status(instance, service, "watcher.HPC-hb", "success")
+
+ def run_once(self):
+ for hpcService in HpcService.objects.all():
+ for slice in self.get_service_slices(hpcService, "hpc"):
+ self.probe_hpc(hpcService, slice.instances.all())
+
+ def run(self):
+ while True:
+ self.run_once()
+ time.sleep(10)
+
+ django.db.reset_queries()
+
+class HpcFetcher(BaseWatcher):
+ def __init__(self):
+ BaseWatcher.__init__(self)
+
+ self.fetch_queue = WorkQueue()
+ for i in range(0, 10):
+ HpcFetchUrl(queue = self.fetch_queue)
+
+ def fetch_hpc(self, service, instances):
+ for instance in instances:
+ instance.has_error = False
+ instance.url_status = []
+
+ checks = HpcHealthCheck.objects.filter(kind="http")
+ if not checks:
+ self.set_status(instance, service, "watcher.HPC-fetch", "no HTTP HealthCheck tests configured")
+
+ for check in checks:
+ if (not check.resource_name) or (":" not in check.resource_name):
+ self.set_status(instance, service, "watcher.HPC-fetch", "malformed resource_name: " + str(check.resource_name))
+ break
+
+ (domain, url) = check.resource_name.split(":",1)
+
+ self.fetch_queue.submit_job({"server": instance.node.name, "port": 80, "instance": instance, "domain": domain, "url": url})
+
+ while self.fetch_queue.outstanding > 0:
+ result = self.fetch_queue.get_result()
+ instance = result["instance"]
+ if (result["status"] == "success"):
+ instance.url_status.append( (result["domain"] + result["url"], "success", result["bytes_downloaded"], result["total_time"]) )
+ if (result["status"]!="success") and (not instance.has_error):
+ self.set_status(instance, service, "watcher.HPC-fetch", result["status"])
+
+ for instance in instances:
+ self.set_status(instance, service, "watcher.HPC-fetch-urls", json.dumps(instance.url_status), check_error=False)
+ if not instance.has_error:
+ self.set_status(instance, service, "watcher.HPC-fetch", "success")
+
+ def run_once(self):
+ for hpcService in HpcService.objects.all():
+ for slice in self.get_service_slices(hpcService, "hpc"):
+ try:
+ self.fetch_hpc(hpcService, slice.instances.all())
+ except:
+ traceback.print_exc()
+
+ def run(self):
+ while True:
+ self.run_once()
+ time.sleep(10)
+
+ django.db.reset_queries()
+
+class WatcherFetcher(BaseWatcher):
+ def __init__(self):
+ BaseWatcher.__init__(self)
+
+ self.fetch_queue = WorkQueue()
+ for i in range(0, 10):
+ WatcherWorker(queue = self.fetch_queue)
+
+ def fetch_watcher(self, service, instances):
+ for instance in instances:
+ try:
+ ip = self.get_public_ip(service, instance)
+ except Exception, e:
+ self.set_status(instance, service, "watcher.watcher", json.dumps({"status": "exception: %s" % str(e)}) )
+ continue
+ if not ip:
+ try:
+ ip = socket.gethostbyname(instance.node.name)
+ except:
+ self.set_status(instance, service, "watcher.watcher", json.dumps({"status": "dns resolution failure"}) )
+ continue
+
+ if not ip:
+ self.set_status(instance, service, "watcher.watcher", json.dumps({"status": "no IP address"}) )
+ continue
+
+ port = 8015
+ if ("redir" in instance.slice.name):
+ port = 8016
+ elif ("demux" in instance.slice.name):
+ port = 8017
+
+ self.fetch_queue.submit_job({"server": ip, "port": port, "instance": instance})
+
+ while self.fetch_queue.outstanding > 0:
+ result = self.fetch_queue.get_result()
+ instance = result["instance"]
+ self.set_status(instance, service, "watcher.watcher", result["status"])
+
+ def run_once(self):
+ for hpcService in HpcService.objects.all():
+ for slice in self.get_service_slices(hpcService):
+ self.fetch_watcher(hpcService, slice.instances.all())
+
+ def run(self):
+ while True:
+ self.run_once()
+ time.sleep(10)
+
+ django.db.reset_queries()
+
+
+if __name__ == "__main__":
+ if "--once" in sys.argv:
+ RRWatcher().run_once()
+ HpcProber().run_once()
+ HpcFetcher().run_once()
+ WatcherFetcher().run_once()
+ else:
+ RRWatcher().start()
+ HpcProber().start()
+ HpcFetcher().start()
+ WatcherFetcher().start()
+
+ print "Running forever..."
+ while True:
+ time.sleep(60)
+
diff --git a/xos/synchronizer/hpclib.py b/xos/synchronizer/hpclib.py
new file mode 100644
index 0000000..bb1c263
--- /dev/null
+++ b/xos/synchronizer/hpclib.py
@@ -0,0 +1,126 @@
+import os
+import base64
+import string
+import sys
+import xmlrpclib
+
+if __name__ == '__main__':
+ sys.path.append("/opt/xos")
+ os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
+
+from xos.config import Config
+from core.models import Service
+from services.hpc.models import HpcService
+from services.requestrouter.models import RequestRouterService
+from xos.logger import Logger, logging
+
+logger = Logger(level=logging.INFO)
+
+class APIHelper:
+ def __init__(self, proxy, auth, method=None):
+ self.proxy = proxy
+ self.auth = auth
+ self.method = method
+
+ def __getattr__(self, name):
+ if name.startswith("_"):
+ return getattr(self, name)
+ else:
+ return APIHelper(self.proxy, self.auth, name)
+
+ def __call__(self, *args):
+ method = getattr(self.proxy, self.method)
+ return method(self.auth, *args)
+
+class CmiClient:
+ def __init__(self, hostname, port=8003, username="apiuser", password="apiuser"):
+ self.connect_api(hostname, port, username, password)
+
+ def connect_api(self, hostname, port=8003, username="apiuser", password="apiuser"):
+ #print "https://%s:%d/COAPI/" % (hostname, port)
+ cob = xmlrpclib.ServerProxy("https://%s:%d/COAPI/" % (hostname, port), allow_none=True)
+ cob_auth = {}
+ cob_auth["Username"] = username
+ cob_auth["AuthString"] = password
+ cob_auth["AuthMethod"] = "password"
+
+ onev = xmlrpclib.ServerProxy("https://%s:%d/ONEV_API/" % (hostname, port), allow_none=True)
+ onev_auth = {}
+ onev_auth["Username"] = username
+ onev_auth["AuthString"] = password
+ onev_auth["AuthMethod"] = "password"
+
+ self.cob = APIHelper(cob, cob_auth)
+ self.onev = APIHelper(onev, onev_auth)
+
+class HpcLibrary:
+ def __init__(self):
+ self._client = None
+
+ def make_account_name(self, x):
+ x=x.lower()
+ y = ""
+ for c in x:
+ if (c in (string.lowercase + string.digits)):
+ y = y + c
+ return y[:20]
+
+ def get_hpc_service(self):
+ hpc_service_name = getattr(Config(), "observer_hpc_service", None)
+ if hpc_service_name:
+ hpc_service = HpcService.objects.filter(name = hpc_service_name)
+ else:
+ hpc_service = HpcService.objects.all()
+
+ if not hpc_service:
+ if hpc_service_name:
+ raise Exception("No HPC Service with name %s" % hpc_service_name)
+ else:
+ raise Exception("No HPC Services")
+ hpc_service = hpc_service[0]
+
+ return hpc_service
+
+ def get_cmi_hostname(self, hpc_service=None):
+ if getattr(Config(),"observer_cmi_hostname",None):
+ return getattr(Config(),"observer_cmi_hostname")
+
+ if (hpc_service is None):
+ hpc_service = self.get_hpc_service()
+
+ if hpc_service.cmi_hostname:
+ return hpc_service.cmi_hostname
+
+ try:
+ slices = hpc_service.slices.all()
+ except:
+ # deal with buggy data model
+ slices = hpc_service.service.all()
+
+ for slice in slices:
+ if slice.name.endswith("cmi"):
+ for instance in slice.instances.all():
+ if instance.node:
+ return instance.node.name
+
+ raise Exception("Failed to find a CMI instance")
+
+ @property
+ def client(self):
+ if self._client is None:
+ self._client = CmiClient(self.get_cmi_hostname())
+ return self._client
+
+if __name__ == '__main__':
+ import django
+ django.setup()
+
+ lib = HpcLibrary()
+
+ print "testing API connection to", lib.get_cmi_hostname()
+ lib.client.cob.GetNewObjects()
+ lib.client.onev.ListAll("CDN")
+
+
+
+
diff --git a/xos/synchronizer/manifest b/xos/synchronizer/manifest
new file mode 100644
index 0000000..f0686a7
--- /dev/null
+++ b/xos/synchronizer/manifest
@@ -0,0 +1,17 @@
+manifest
+hpc_watcher.py
+steps/sync_hpcservices.py
+steps/sync_serviceprovider.py
+steps/sync_contentprovider.py
+steps/sync_cdnprefix.py
+steps/sync_originserver.py
+steps/garbage_collector.py
+steps/sync_sitemap.py
+hpc_synchronizer_config
+start.sh
+stop.sh
+hpc-synchronizer.py
+model-deps
+run.sh
+fsck.py
+hpclib.py
diff --git a/xos/synchronizer/model-deps b/xos/synchronizer/model-deps
new file mode 100644
index 0000000..63188f0
--- /dev/null
+++ b/xos/synchronizer/model-deps
@@ -0,0 +1,19 @@
+{
+ "OriginServer": [
+ "ContentProvider"
+ ],
+ "ContentProvider": [
+ "ServiceProvider"
+ ],
+ "CDNPrefix": [
+ "ContentProvider"
+ ],
+ "AccessMap": [
+ "ContentProvider"
+ ],
+ "SiteMap": [
+ "ContentProvider",
+ "ServiceProvider",
+ "CDNPrefix"
+ ]
+}
diff --git a/xos/synchronizer/run.sh b/xos/synchronizer/run.sh
new file mode 100755
index 0000000..9d22047
--- /dev/null
+++ b/xos/synchronizer/run.sh
@@ -0,0 +1,2 @@
+export XOS_DIR=/opt/xos
+python hpc-synchronizer.py -C $XOS_DIR/synchronizers/hpc/hpc_synchronizer_config
diff --git a/xos/synchronizer/start.sh b/xos/synchronizer/start.sh
new file mode 100755
index 0000000..3153a7d
--- /dev/null
+++ b/xos/synchronizer/start.sh
@@ -0,0 +1,2 @@
+export XOS_DIR=/opt/xos
+nohup python hpc-synchronizer.py -C $XOS_DIR/synchronizers/hpc/hpc_synchronizer_config > /dev/null 2>&1 &
diff --git a/xos/synchronizer/steps/garbage_collector.py b/xos/synchronizer/steps/garbage_collector.py
new file mode 100644
index 0000000..658f7a1
--- /dev/null
+++ b/xos/synchronizer/steps/garbage_collector.py
@@ -0,0 +1,67 @@
+import os
+import sys
+import base64
+import traceback
+from collections import defaultdict
+from django.db.models import F, Q
+from xos.config import Config
+from xos.logger import Logger, logging
+from synchronizers.base.syncstep import SyncStep
+from services.hpc.models import ServiceProvider, ContentProvider, CDNPrefix, OriginServer
+from core.models import *
+
+# hpclibrary will be in steps/..
+parentdir = os.path.join(os.path.dirname(__file__),"..")
+sys.path.insert(0,parentdir)
+
+from hpclib import HpcLibrary
+
+logger = Logger(level=logging.INFO)
+
+class GarbageCollector(SyncStep, HpcLibrary):
+# requested_interval = 86400
+ requested_interval = 0
+ provides=[]
+
+ def __init__(self, **args):
+ SyncStep.__init__(self, **args)
+ HpcLibrary.__init__(self)
+
+ def call(self, **args):
+ logger.info("running garbage collector")
+ try:
+ self.gc_originservers()
+ self.gc_cdnprefixes()
+ self.gc_contentproviders()
+ self.gc_serviceproviders()
+ except:
+ traceback.print_exc()
+
+ def gc_onev(self, ps_class, ps_idField, onev_className, onev_idField):
+ # get the CMI's objects
+ onev_objs = self.client.onev.ListAll(onev_className)
+
+ # get the data model's objects,
+ ps_objs = ps_class.objects.filter(enacted__isnull=False)
+ ps_ids = [str(getattr(x,ps_idField,None)) for x in ps_objs]
+
+        # for each onev object, if its id does not exist in a data model
+ # object, then delete it.
+ for onev_obj in onev_objs:
+ onev_id = onev_obj[onev_idField]
+ if str(onev_id) not in ps_ids:
+ logger.info("garbage collecting %s %s" % (onev_className, str(onev_id)))
+ self.client.onev.Delete(onev_className, onev_id)
+
+ def gc_originservers(self):
+ self.gc_onev(OriginServer, "origin_server_id", "OriginServer", "origin_server_id")
+
+ def gc_cdnprefixes(self):
+ self.gc_onev(CDNPrefix, "cdn_prefix_id", "CDNPrefix", "cdn_prefix_id")
+
+ def gc_contentproviders(self):
+ self.gc_onev(ContentProvider, "content_provider_id", "ContentProvider", "content_provider_id")
+
+ def gc_serviceproviders(self):
+ self.gc_onev(ServiceProvider, "service_provider_id", "ServiceProvider", "service_provider_id")
+
diff --git a/xos/synchronizer/steps/sync_cdnprefix.py b/xos/synchronizer/steps/sync_cdnprefix.py
new file mode 100644
index 0000000..eff3b5d
--- /dev/null
+++ b/xos/synchronizer/steps/sync_cdnprefix.py
@@ -0,0 +1,101 @@
+import os
+import sys
+import base64
+from django.db.models import F, Q
+from xos.config import Config
+from synchronizers.base.syncstep import SyncStep
+from core.models import Service
+from services.hpc.models import ServiceProvider, ContentProvider, CDNPrefix
+from xos.logger import Logger, logging
+
+# hpclibrary will be in steps/..
+parentdir = os.path.join(os.path.dirname(__file__),"..")
+sys.path.insert(0,parentdir)
+
+from hpclib import HpcLibrary
+
+logger = Logger(level=logging.INFO)
+
+class SyncCDNPrefix(SyncStep, HpcLibrary):
+ provides=[CDNPrefix]
+ observes=CDNPrefix
+ requested_interval=0
+
+ def __init__(self, **args):
+ SyncStep.__init__(self, **args)
+ HpcLibrary.__init__(self)
+
+ def filter_hpc_service(self, objs):
+ hpcService = self.get_hpc_service()
+
+ return [x for x in objs if x.contentProvider.serviceProvider.hpcService == hpcService]
+
+ def fetch_pending(self, deleted):
+ #self.consistency_check()
+
+ return self.filter_hpc_service(SyncStep.fetch_pending(self, deleted))
+
+ def consistency_check(self):
+ # set to true if something changed
+ result=False
+
+ # sanity check to make sure our PS objects have CMI objects behind them
+ all_p_ids = [x["cdn_prefix_id"] for x in self.client.onev.ListAll("CDNPrefix")]
+
+ all_p_ids = []
+ all_origins = {}
+ for x in self.client.onev.ListAll("CDNPrefix"):
+ id = x["cdn_prefix_id"]
+ all_p_ids.append(id)
+ all_origins[id] = x.get("default_origin_server", None)
+
+ for p in CDNPrefix.objects.all():
+ if (p.cdn_prefix_id is None):
+ continue
+
+ if (p.cdn_prefix_id not in all_p_ids):
+ logger.info("CDN Prefix %s was not found on CMI" % p.cdn_prefix_id)
+ p.cdn_prefix_id=None
+ p.save()
+ result = True
+
+ if (p.defaultOriginServer!=None) and (all_origins.get(p.cdn_prefix_id,None) != p.defaultOriginServer.url):
+ logger.info("CDN Prefix %s does not have default origin server on CMI" % str(p))
+ p.save() # this will set updated>enacted and force observer to re-sync
+ result = True
+
+ return result
+
+ def sync_record(self, cp):
+ logger.info("sync'ing cdn prefix %s" % str(cp),extra=cp.tologdict())
+
+ if (not cp.contentProvider) or (not cp.contentProvider.content_provider_id):
+ raise Exception("CDN Prefix %s is linked to a contentProvider without an id" % str(cp))
+
+ cpid = cp.contentProvider.content_provider_id
+
+ cp_dict = {"service": "HyperCache", "enabled": cp.enabled, "content_provider_id": cpid, "cdn_prefix": cp.prefix}
+
+ if cp.defaultOriginServer and cp.defaultOriginServer.url:
+ if (not cp.defaultOriginServer.origin_server_id):
+ # It's probably a bad idea to try to set defaultOriginServer before
+                # we've created defaultOriginServer.
+ raise Exception("cdn prefix %s is waiting for it's default origin server to get an id" % str(cp))
+
+ cp_dict["default_origin_server"] = cp.defaultOriginServer.url
+
+ #print cp_dict
+
+ if not cp.cdn_prefix_id:
+ id = self.client.onev.Create("CDNPrefix", cp_dict)
+ cp.cdn_prefix_id = id
+ else:
+ del cp_dict["content_provider_id"] # this can't be updated
+ del cp_dict["cdn_prefix"] # this can't be updated either
+ self.client.onev.Update("CDNPrefix", cp.cdn_prefix_id, cp_dict)
+
+ cp.save()
+
+ def delete_record(self, m):
+ if m.cdn_prefix_id is not None:
+ self.client.onev.Delete("CDNPrefix", m.cdn_prefix_id)
diff --git a/xos/synchronizer/steps/sync_contentprovider.py b/xos/synchronizer/steps/sync_contentprovider.py
new file mode 100644
index 0000000..3e30ed3
--- /dev/null
+++ b/xos/synchronizer/steps/sync_contentprovider.py
@@ -0,0 +1,78 @@
+import os
+import sys
+import base64
+from django.db.models import F, Q
+from xos.config import Config
+from synchronizers.base.syncstep import SyncStep
+from core.models import Service
+from services.hpc.models import ServiceProvider, ContentProvider
+from xos.logger import Logger, logging
+
+# hpclibrary will be in steps/..
+parentdir = os.path.join(os.path.dirname(__file__),"..")
+sys.path.insert(0,parentdir)
+
+from hpclib import HpcLibrary
+
+logger = Logger(level=logging.INFO)
+
class SyncContentProvider(SyncStep, HpcLibrary):
    """Keep ContentProvider models in step with ContentProvider objects on the CMI."""

    provides=[ContentProvider]
    observes=ContentProvider
    requested_interval=0

    def __init__(self, **args):
        SyncStep.__init__(self, **args)
        HpcLibrary.__init__(self)

    def filter_hpc_service(self, objs):
        """Keep only objects whose serviceProvider belongs to our HPC service."""
        our_service = self.get_hpc_service()
        kept = []
        for obj in objs:
            if obj.serviceProvider.hpcService == our_service:
                kept.append(obj)
        return kept

    def fetch_pending(self, deleted):
        """Return pending ContentProviders, restricted to our HPC service."""
        pending = SyncStep.fetch_pending(self, deleted)
        return self.filter_hpc_service(pending)

    def consistency_check(self):
        """Clear content_provider_id on models whose CMI object vanished.

        Returns True when any model changed, so the caller re-runs the checks.
        """
        changed = False

        # every ContentProvider id the CMI currently knows about
        cmi_ids = [entry["content_provider_id"] for entry in self.client.onev.ListAll("ContentProvider")]
        for model in ContentProvider.objects.all():
            if model.content_provider_id is None:
                continue
            if model.content_provider_id not in cmi_ids:
                logger.info("Content provider %s was not found on CMI" % model.content_provider_id)
                model.content_provider_id = None
                model.save()
                changed = True

        return changed

    def sync_record(self, cp):
        """Create or update the CMI ContentProvider backing this model."""
        logger.info("sync'ing content provider %s" % str(cp), extra=cp.tologdict())
        account_name = self.make_account_name(cp.name)

        if (not cp.serviceProvider) or (not cp.serviceProvider.service_provider_id):
            raise Exception("ContentProvider %s is linked to a serviceProvider with no id" % str(cp))

        spid = cp.serviceProvider.service_provider_id

        fields = {"account": account_name, "name": cp.name, "enabled": cp.enabled}

        if cp.content_provider_id:
            self.client.onev.Update("ContentProvider", cp.content_provider_id, fields)
        else:
            # service_provider_id is only supplied at creation time
            fields["service_provider_id"] = spid
            cp.content_provider_id = self.client.onev.Create("ContentProvider", fields)

        cp.save()

    def delete_record(self, m):
        """Remove the CMI object, if one was ever created."""
        if m.content_provider_id is not None:
            self.client.onev.Delete("ContentProvider", m.content_provider_id)
+
diff --git a/xos/synchronizer/steps/sync_hpcservices.py b/xos/synchronizer/steps/sync_hpcservices.py
new file mode 100644
index 0000000..63bf19b
--- /dev/null
+++ b/xos/synchronizer/steps/sync_hpcservices.py
@@ -0,0 +1,43 @@
+import os
+import sys
+import base64
+from django.db.models import F, Q
+from xos.config import Config
+from synchronizers.base.syncstep import SyncStep
+from core.models import Service
+from services.hpc.models import HpcService
+from services.requestrouter.models import RequestRouterService
+from xos.logger import Logger, logging
+
+# hpclibrary will be in steps/..
+parentdir = os.path.join(os.path.dirname(__file__),"..")
+sys.path.insert(0,parentdir)
+
+from hpclib import HpcLibrary
+
+logger = Logger(level=logging.INFO)
+
class SyncHpcService(SyncStep, HpcLibrary):
    """Track the HpcService model itself; the sync is a save-only no-op."""

    provides=[HpcService]
    observes=HpcService
    requested_interval=0

    def __init__(self, **args):
        SyncStep.__init__(self, **args)
        HpcLibrary.__init__(self)

    def filter_hpc_service(self, objs):
        """Keep only the HpcService instance this synchronizer is bound to."""
        our_service = self.get_hpc_service()
        return [obj for obj in objs if obj == our_service]

    def fetch_pending(self, deleted):
        # Looks like deletion is not supported for this object - Sapan
        if deleted:
            return []
        dirty = HpcService.objects.filter(Q(enacted__lt=F('updated')) | Q(enacted=None))
        return self.filter_hpc_service(dirty)

    def sync_record(self, hpc_service):
        """Nothing is pushed to the CMI for the service; just save the model."""
        logger.info("sync'ing hpc_service %s" % str(hpc_service),extra=hpc_service.tologdict())
        hpc_service.save()
diff --git a/xos/synchronizer/steps/sync_originserver.py b/xos/synchronizer/steps/sync_originserver.py
new file mode 100644
index 0000000..bd5b227
--- /dev/null
+++ b/xos/synchronizer/steps/sync_originserver.py
@@ -0,0 +1,92 @@
+import os
+import sys
+import base64
+
+from django.db.models import F, Q
+from xos.config import Config
+from synchronizers.base.syncstep import SyncStep
+from core.models import Service
+from services.hpc.models import ServiceProvider, ContentProvider, CDNPrefix, OriginServer
+from xos.logger import Logger, logging
+
+# hpclibrary will be in steps/..
+parentdir = os.path.join(os.path.dirname(__file__),"..")
+sys.path.insert(0,parentdir)
+
+from hpclib import HpcLibrary
+
+logger = Logger(level=logging.INFO)
+
class SyncOriginServer(SyncStep, HpcLibrary):
    """Keep OriginServer models in step with OriginServer objects on the CMI."""

    provides=[OriginServer]
    observes=OriginServer
    requested_interval=0

    def __init__(self, **args):
        SyncStep.__init__(self, **args)
        HpcLibrary.__init__(self)

    def filter_hpc_service(self, objs):
        """Keep only origin servers whose content provider belongs to our HPC service."""
        hpcService = self.get_hpc_service()

        return [x for x in objs if x.contentProvider.serviceProvider.hpcService == hpcService]

    def fetch_pending(self, deleted):
        """Return pending OriginServers, restricted to our HPC service."""
        return self.filter_hpc_service(SyncStep.fetch_pending(self, deleted))

    def consistency_check(self):
        """Reset origin_server_id on models whose CMI object disappeared.

        Returns True when something changed, so the caller re-runs the checks.
        """
        result = False

        # sanity check to make sure our PS objects have CMI objects behind them
        # (set gives O(1) membership tests instead of O(n) per model)
        all_ors_ids = set(x["origin_server_id"] for x in self.client.onev.ListAll("OriginServer"))
        for ors in OriginServer.objects.all():
            if (ors.origin_server_id is not None) and (ors.origin_server_id not in all_ors_ids):
                # we have an origin server ID, but it doesn't exist in the CMI;
                # something went wrong -- start over
                logger.info("origin server %s was not found on CMI" % ors.origin_server_id)
                ors.origin_server_id=None
                ors.save()
                result = True

        return result

    def sync_record(self, ors):
        """Create or update the CMI OriginServer backing this model.

        Raises Exception if the linked contentProvider has no CMI id yet.
        """
        logger.info("sync'ing origin server %s" % str(ors),extra=ors.tologdict())

        if (not ors.contentProvider) or (not ors.contentProvider.content_provider_id):
            raise Exception("Origin Server %s is linked to a contentProvider with no id" % str(ors))

        cpid = ors.contentProvider.content_provider_id

        # validation requires URL start with http://
        http_prefix = "http://"
        url = ors.url
        if not url.startswith(http_prefix):
            url = http_prefix + url

        ors_dict = {"authenticated_content": ors.authenticated, "zone_redirects": ors.redirects, "content_provider_id": cpid, "url": url, "service_type": "HyperCache", "caching_type": "Optimistic", "description": ors.description}
        # presumably the CMI rejects an empty description -- substitute a placeholder
        if not ors_dict["description"]:
            ors_dict["description"] = "blank"

        if not ors.origin_server_id:
            # renamed from `id` to avoid shadowing the builtin
            new_id = self.client.onev.Create("OriginServer", ors_dict)
            ors.origin_server_id = new_id
        else:
            self.client.onev.Update("OriginServer", ors.origin_server_id, ors_dict)

        # ... something breaks (analytics) if the URL starts with http://, so we
        # change it in cob after we added it via onev.
        url = url[len(http_prefix):]
        self.client.cob.UpdateContent(ors.origin_server_id, {"url": url})

        # NOTE(review): `silent` presumably suppresses re-marking the record as
        # dirty on save -- confirm against the model's save() implementation
        ors.silent = True
        ors.save()

    def delete_record(self, m):
        """Remove the CMI object, if one was ever created."""
        if m.origin_server_id is not None:
            self.client.onev.Delete("OriginServer", m.origin_server_id)
diff --git a/xos/synchronizer/steps/sync_serviceprovider.py b/xos/synchronizer/steps/sync_serviceprovider.py
new file mode 100644
index 0000000..af6d685
--- /dev/null
+++ b/xos/synchronizer/steps/sync_serviceprovider.py
@@ -0,0 +1,67 @@
+import os
+import sys
+import base64
+from django.db.models import F, Q
+from xos.config import Config
+from synchronizers.base.syncstep import SyncStep
+from core.models import Service
+from services.hpc.models import ServiceProvider
+from xos.logger import Logger, logging
+
+# hpclibrary will be in steps/..
+parentdir = os.path.join(os.path.dirname(__file__),"..")
+sys.path.insert(0,parentdir)
+
+from hpclib import HpcLibrary
+
+logger = Logger(level=logging.INFO)
+
class SyncServiceProvider(SyncStep, HpcLibrary):
    """Keep ServiceProvider models in step with ServiceProvider objects on the CMI."""

    provides=[ServiceProvider]
    observes=ServiceProvider
    requested_interval=0

    def __init__(self, **args):
        SyncStep.__init__(self, **args)
        HpcLibrary.__init__(self)

    def filter_hpc_service(self, objs):
        """Keep only service providers attached to our HPC service."""
        our_service = self.get_hpc_service()
        return [obj for obj in objs if obj.hpcService == our_service]

    def fetch_pending(self, deleted):
        """Return pending ServiceProviders, restricted to our HPC service."""
        pending = SyncStep.fetch_pending(self, deleted)
        return self.filter_hpc_service(pending)

    def consistency_check(self):
        """Clear service_provider_id on models whose CMI object vanished.

        Returns True when any model changed, so the caller re-runs the checks.
        """
        changed = False

        # every ServiceProvider id the CMI currently knows about
        cmi_ids = [entry["service_provider_id"] for entry in self.client.onev.ListAll("ServiceProvider")]
        for model in ServiceProvider.objects.all():
            if model.service_provider_id is None:
                continue
            if model.service_provider_id not in cmi_ids:
                logger.info("Service provider %s was not found on CMI" % model.service_provider_id)
                model.service_provider_id = None
                model.save()
                changed = True

        return changed

    def sync_record(self, sp):
        """Create or update the CMI ServiceProvider backing this model."""
        logger.info("sync'ing service provider %s" % str(sp),extra=sp.tologdict())
        fields = {"account": self.make_account_name(sp.name), "name": sp.name, "enabled": sp.enabled}
        if sp.service_provider_id:
            self.client.onev.Update("ServiceProvider", sp.service_provider_id, fields)
        else:
            sp.service_provider_id = self.client.onev.Create("ServiceProvider", fields)

        sp.save()

    def delete_record(self, m):
        """Remove the CMI object, if one was ever created."""
        if m.service_provider_id is not None:
            self.client.onev.Delete("ServiceProvider", m.service_provider_id)
diff --git a/xos/synchronizer/steps/sync_sitemap.py b/xos/synchronizer/steps/sync_sitemap.py
new file mode 100644
index 0000000..a1d177b
--- /dev/null
+++ b/xos/synchronizer/steps/sync_sitemap.py
@@ -0,0 +1,118 @@
+import os
+import sys
+import base64
+from django.db.models import F, Q
+from xos.config import Config
+from synchronizers.base.syncstep import SyncStep
+from core.models import Service
+from services.hpc.models import ServiceProvider, ContentProvider, CDNPrefix, SiteMap
+from xos.logger import Logger, logging
+
+# hpclibrary will be in steps/..
+parentdir = os.path.join(os.path.dirname(__file__),"..")
+sys.path.insert(0,parentdir)
+
+from hpclib import HpcLibrary
+
+logger = Logger(level=logging.INFO)
+
+class SyncSiteMap(SyncStep, HpcLibrary):
+ provides=[SiteMap]
+ observes=SiteMap
+ requested_interval=0
+
+ def __init__(self, **args):
+ SyncStep.__init__(self, **args)
+ HpcLibrary.__init__(self)
+
+ def filter_hpc_service(self, objs):
+ hpcService = self.get_hpc_service()
+
+ filtered_objs = []
+ for x in objs:
+ if ((x.hpcService == hpcService) or
+ ((x.serviceProvider != None) and (x.serviceProvider.hpcService == hpcService)) or
+ ((x.contentProvider != None) and (x.contentProvider.serviceProvider.hpcService == hpcService)) or
+ ((x.cdnPrefix!= None) and (x.cdnPrefix.contentProvider.serviceProvider.hpcService == hpcService))):
+ filtered_objs.append(x)
+
+ return filtered_objs
+
+ def fetch_pending(self, deleted):
+ return self.filter_hpc_service(SyncStep.fetch_pending(self, deleted))
+
+ def consistency_check(self):
+ # set to true if something changed
+ result=False
+
+ # sanity check to make sure our PS objects have CMI objects behind them
+ all_map_ids = [x["map_id"] for x in self.client.onev.ListAll("Map")]
+ for map in SiteMap.objects.all():
+ if (map.map_id is not None) and (map.map_id not in all_map_ids):
+ logger.info("Map %s was not found on CMI" % map.map_id,extra=map.tologdict())
+ map.map_id=None
+ map.save()
+ result = True
+
+ return result
+
+ def update_bind(self, map, map_dict, field_name, to_name, ids):
+ for id in ids:
+ if (not id in map_dict.get(field_name, [])):
+ print "Bind Map", map.map_id, "to", to_name, id
+ self.client.onev.Bind("Map", map.map_id, to_name, id)
+
+ for id in map_dict.get(field_name, []):
+ if (not id in ids):
+ print "Unbind Map", map.map_id, "from", to_name, id
+ self.client.onev.UnBind("map", map.map_id, to_name, id)
+
+ def sync_record(self, map):
+ logger.info("sync'ing SiteMap %s" % str(map),extra=map.tologdict())
+
+ if not map.map:
+ # no contents
+ return
+
+ content = map.map.read()
+
+ map_dict = {"name": map.name, "type": "site", "content": content}
+
+ cdn_prefix_ids=[]
+ service_provider_ids=[]
+ content_provider_ids=[]
+
+ if (map.contentProvider):
+ if not map.contentProvider.content_provider_id:
+ raise Exception("Map %s links to a contentProvider with no id" % map.name)
+ conent_provider_ids = [map.contentProvider.content_provider_id]
+
+ if (map.serviceProvider):
+ if not map.serviceProvider.service_provider_id:
+ raise Exception("Map %s links to a serviceProvider with no id" % map.name)
+ service_provider_ids = [map.serviceProvider.service_provider_id]
+
+ if (map.cdnPrefix):
+ if not map.cdnPrefix.cdn_prefix_id:
+ raise Exception("Map %s links to a cdnPrefix with no id" % map.name)
+ cdn_prefix_ids = [map.cdnPrefix.cdn_prefix_id]
+
+ if not map.map_id:
+ print "Create Map", map_dict
+ id = self.client.onev.Create("Map", map_dict)
+ map.map_id = id
+ else:
+ print "Update Map", map_dict
+ # these things we probably cannot update
+ del map_dict["name"]
+ self.client.onev.Update("Map", map.map_id, map_dict)
+
+ cmi_map_dict = self.client.onev.Read("Map", map.map_id)
+
+ self.update_bind(map, cmi_map_dict, "cdn_prefix_ids", "CDNPrefix", cdn_prefix_ids)
+
+ map.save()
+
+ def delete_record(self, m):
+ if m.map_id is not None:
+ self.client.onev.Delete("Map", m.map_id)
diff --git a/xos/synchronizer/stop.sh b/xos/synchronizer/stop.sh
new file mode 100755
index 0000000..780e25c
--- /dev/null
+++ b/xos/synchronizer/stop.sh
@@ -0,0 +1 @@
+pkill -9 -f hpc-synchronizer.py
diff --git a/xos/synchronizer/supervisor/hpc-observer.conf b/xos/synchronizer/supervisor/hpc-observer.conf
new file mode 100644
index 0000000..f2c79d4
--- /dev/null
+++ b/xos/synchronizer/supervisor/hpc-observer.conf
@@ -0,0 +1,2 @@
+[program:hpc-observer]
+command=python /opt/xos/observers/hpc/hpc-observer.py -C /opt/xos/observers/hpc/hpc_observer_config
diff --git a/xos/synchronizer/supervisor/hpc-watcher.conf b/xos/synchronizer/supervisor/hpc-watcher.conf
new file mode 100644
index 0000000..e0f4eb1
--- /dev/null
+++ b/xos/synchronizer/supervisor/hpc-watcher.conf
@@ -0,0 +1,2 @@
+[program:hpc-watcher]
+command=python /opt/xos/observers/hpc/hpc_watcher.py