remove obsolete hpc_wizard
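
The xos/hpc_wizard directory held the old BigQuery-backed analytics helpers
(bigquery_analytics.py, bigquery_config.py, query.py, xos_analytics.py) plus a
README describing the manually installed credential files
(bigquery_credentials.dat, client_secrets.json) and the python-httplib2 /
python_gflags / google_api_python_client dependencies. This code is obsolete
and no longer used, so the whole directory is deleted.
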
diff --git a/xos/hpc_wizard/README b/xos/hpc_wizard/README
deleted file mode 100644
index 06b12b5..0000000
--- a/xos/hpc_wizard/README
+++ /dev/null
@@ -1,10 +0,0 @@
-Two files are purposely not included in the git repository:
- bigquery_credentials.dat
- client_secrets.json
-
-These files must be manually installed.
-
-Additionally, the following packages must be installed:
- yum -y install python-httplib2
- easy_install python_gflags
- easy_install google_api_python_client
diff --git a/xos/hpc_wizard/bigquery_analytics.py b/xos/hpc_wizard/bigquery_analytics.py
deleted file mode 100644
index cb3038a..0000000
--- a/xos/hpc_wizard/bigquery_analytics.py
+++ /dev/null
@@ -1,290 +0,0 @@
-import re
-import base64
-import requests
-import urllib
-import json
-import httplib2
-import threading
-import os
-import sys
-import time
-import traceback
-
-from apiclient.discovery import build
-from apiclient.errors import HttpError
-from oauth2client.client import AccessTokenRefreshError
-from oauth2client.client import OAuth2WebServerFlow
-from oauth2client.client import flow_from_clientsecrets
-from oauth2client.file import Storage
-from oauth2client.tools import run_flow,run
-
-from bigquery_config import BIGQUERY_SECRETS_FN, BIGQUERY_CREDENTIALS_FN
-
-"""
-yum -y install python-httplib2
-easy_install python_gflags
-easy_install google_api_python_client
-"""
-
-PROJECT_NUMBER = '549187599759'
-
-try:
- FLOW = flow_from_clientsecrets(BIGQUERY_SECRETS_FN,
- scope='https://www.googleapis.com/auth/bigquery')
- BIGQUERY_AVAILABLE = True
-except:
- print >> sys.stderr, "exception while initializing bigquery flow"
- traceback.print_exc()
- FLOW = None
- BIGQUERY_AVAILABLE = False
-
-MINUTE_MS = 60*1000
-HOUR_MS = 60*60*1000
-
-# global to hold cached mappings
-mappings = {}
-reverse_mappings = {}
-
-def to_number(s):
- try:
- if "." in str(s):
- return float(s)
- else:
- return int(s)
- except:
- return 0
-
-class MappingException(Exception):
- pass
-
-class BigQueryAnalytics:
- def __init__(self, table = "demoevents"):
- self.projectName = "vicci"
- self.tableName = table
-
- def reload_mapping(self):
- global mappings, reverse_mappings
- mappings[self.tableName] = json.loads(self.fetch_mapping(table=self.tableName))
- reverse_mappings[self.tableName] = {v:k for k, v in mappings[self.tableName].items()}
-
- def fetch_mapping(self, m=0, table="events"):
- req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table)
- resp = requests.get(req)
- if (resp.status_code==200):
- return resp.text
- else:
- raise Exception('Error accessing register allocations: %d'%resp.status_code)
-
- def run_query_raw(self, query):
- try:
- file("/tmp/query_log","a").write("query %s\n" % query)
- except:
- pass
-
- p = re.compile('%[a-zA-z_]*')
-
- try:
- query = p.sub(self.remap, query)
- except MappingException:
- self.reload_mapping()
- query = p.sub(self.remap, query)
-
- try:
- file("/tmp/query_log","a").write("remapped query %s\n" % query)
- except:
- pass
-
- storage = Storage(BIGQUERY_CREDENTIALS_FN)
- credentials = storage.get()
-
- if credentials is None or credentials.invalid:
- credentials = run(FLOW, storage)
-
- http = httplib2.Http()
- http = credentials.authorize(http)
-
- service = build('bigquery', 'v2', http=http)
-
- body = {"query": query,
- "timeoutMs": 60000}
- response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()
-
- return response
-
- def translate_schema(self, response):
- for field in response["schema"]["fields"]:
- field["name"] = reverse_mappings[self.tableName].get(field["name"], field["name"])
-
- def run_query(self, query):
- if not BIGQUERY_AVAILABLE:
- print >> sys.stderr, "bigquery_analytics: bigquery flow is not available. returning empty result."
- return []
-
- response = self.run_query_raw(query)
-
- fieldNames = []
- for field in response["schema"]["fields"]:
- fieldNames.append(field["name"])
-
- result = []
- if "rows" in response:
- for row in response["rows"]:
- this_result = {}
- for (i,column) in enumerate(row["f"]):
- this_result[reverse_mappings[self.tableName].get(fieldNames[i],fieldNames[i])] = column["v"]
- result.append(this_result)
-
- return result
-
- """ Filter_results, groupby_results, do_computed_fields, and postprocess_results
- are all used for postprocessing queries. The idea is to do one query that
- includes the ungrouped and unfiltered data, and cache it for multiple
- consumers who will filter and group it as necessary.
-
- TODO: Find a more generalized source for these sorts operations. Perhaps
- put the results in SQLite and then run SQL queries against it.
- """
-
- def filter_results(self, rows, name, value):
- result = [row for row in rows if row.get(name)==value]
- return result
-
- def groupby_results(self, rows, groupBy=[], sum=[], count=[], avg=[], maxi=[]):
- new_rows = {}
- for row in rows:
- groupby_key = [row.get(k, None) for k in groupBy]
-
- if str(groupby_key) not in new_rows:
- new_row = {}
- for k in groupBy:
- new_row[k] = row.get(k, None)
-
- new_rows[str(groupby_key)] = new_row
- else:
- new_row = new_rows[str(groupby_key)]
-
- for k in sum:
- new_row["sum_" + k] = new_row.get("sum_" + k, 0) + to_number(row.get(k,0))
-
- for k in avg:
- new_row["avg_" + k] = new_row.get("avg_" + k, 0) + to_number(row.get(k,0))
- new_row["avg_base_" + k] = new_row.get("avg_base_"+k,0) + 1
-
- for k in maxi:
- new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k,0)))
-
- for k in count:
- v = row.get(k,None)
- dl = new_row["distinct_" + k] = new_row.get("distinct_" + k, [])
- if (v not in dl):
- dl.append(v)
-
- #new_row["count_" + k] = new_row.get("count_" + k, 0) + 1
-
- for row in new_rows.values():
- for k in avg:
- row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k]
- del row["avg_base_" + k]
-
- for k in count:
- new_row["count_" + k] = len(new_row.get("distinct_" + k, []))
-
- return new_rows.values()
-
- def do_computed_fields(self, rows, computed=[]):
- computedFieldNames=[]
- for row in rows:
- for k in computed:
- if "/" in k:
- parts = k.split("/")
- computedFieldName = "computed_" + parts[0].replace("%","")+"_div_"+parts[1].replace("%","")
- try:
- row[computedFieldName] = to_number(row[parts[0]]) / to_number(row[parts[1]])
- except:
- pass
-
- if computedFieldName not in computedFieldNames:
- computedFieldNames.append(computedFieldName)
- return (computedFieldNames, rows)
-
- def postprocess_results(self, rows, filter={}, groupBy=[], sum=[], count=[], avg=[], computed=[], maxi=[], maxDeltaTime=None):
- sum = [x.replace("%","") for x in sum]
- count = [x.replace("%","") for x in count]
- avg = [x.replace("%","") for x in avg]
- computed = [x.replace("%","") for x in computed]
- maxi = [x.replace("%","") for x in maxi]
- groupBy = [x.replace("%","") for x in groupBy]
-
- for (k,v) in filter.items():
- rows = self.filter_results(rows, k, v)
-
- if rows:
- if maxDeltaTime is not None:
- maxTime = max([float(row["time"]) for row in rows])
- rows = [row for row in rows if float(row["time"])>=maxTime-maxDeltaTime]
-
- (computedFieldNames, rows) = self.do_computed_fields(rows, computed)
- sum = sum + computedFieldNames
- if groupBy:
- rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi)
- return rows
-
- def remap(self, match):
- if not self.tableName in mappings:
- raise MappingException("no mapping for table %s" % self.tableName)
-
- mapping = mappings[self.tableName]
-
- token = match.group()[1:]
- if token in mapping:
- return mapping[token]
- else:
- raise MappingException('unknown token %s' % token)
-
- def dump_table(self, rows, keys=None):
- if not keys:
- keys = rows[0].keys()
-
- lens = {}
- for key in keys:
- lens[key] = len(key)
-
- for row in rows:
- for key in keys:
- thislen = len(str(row.get(key,"")))
- lens[key] = max(lens.get(key,0), thislen)
-
- for key in keys:
- print "%*s" % (lens[key], key),
- print
-
- for row in rows:
- for key in keys:
- print "%*s" % (lens[key], str(row.get(key,""))),
- print
-
- def schema_to_cols(self, schema):
- fields = schema["fields"]
-
- colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number", "TIMESTAMP": "date"}
-
- cols = []
- i=0
- for field in fields:
- col = {"type": colTypes[field["type"]],
- "id": "Col%d" % i,
- "label": reverse_mappings[self.tableName].get(field["name"],field["name"])}
- cols.append(col)
- i=i+1
-
- return cols
-
-def main():
- bq = BigQueryAnalytics()
-
- rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname")
-
- bq.dump_table(rows)
-
-if __name__ == "__main__":
- main()
diff --git a/xos/hpc_wizard/bigquery_config.py b/xos/hpc_wizard/bigquery_config.py
deleted file mode 100644
index 345acf2..0000000
--- a/xos/hpc_wizard/bigquery_config.py
+++ /dev/null
@@ -1,3 +0,0 @@
-BIGQUERY_DIR = "/opt/xos"
-BIGQUERY_SECRETS_FN = os.path.join(BIGQUERY_DIR, "hpc_wizard/client_secrets.json")
-BIGQUERY_CREDENTIALS_FN = os.path.join(BIGQUERY_DIR, "hpc_wizard/bigquery_credentials.dat")
diff --git a/xos/hpc_wizard/query.py b/xos/hpc_wizard/query.py
deleted file mode 100644
index 874022e..0000000
--- a/xos/hpc_wizard/query.py
+++ /dev/null
@@ -1,278 +0,0 @@
-import re
-import base64
-import requests
-import urllib
-import json
-import httplib2
-import threading
-import os
-import time
-import traceback
-
-from apiclient.discovery import build
-from apiclient.errors import HttpError
-from oauth2client.client import AccessTokenRefreshError
-from oauth2client.client import OAuth2WebServerFlow
-from oauth2client.client import flow_from_clientsecrets
-from oauth2client.file import Storage
-from oauth2client.tools import run_flow,run
-
-from bigquery_config import BIGQUERY_SECRETS_FN, BIGQUERY_CREDENTIALS_FN
-
-"""
-yum -y install python-httplib2
-easy_install python_gflags
-easy_install google_api_python_client
-"""
-
-
-PROJECT_NUMBER = '549187599759'
-
-try:
- FLOW = flow_from_clientsecrets(BIGQUERY_SECRETS_FN,
- scope='https://www.googleapis.com/auth/bigquery')
-except:
- print "exception while initializing bigquery flow"
- traceback.print_exc()
- FLOW = None
-
-MINUTE_MS = 60*1000
-HOUR_MS = 60*60*1000
-
-class HpcQuery:
- def __init__(self):
- self.mapping = json.loads(self.fetch_mapping(table="demoevents"))
- self.reverse_mapping = {v:k for k, v in self.mapping.items()}
-
- def fetch_mapping(self, m=0, table="events"):
- req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table)
- resp = requests.get(req)
- if (resp.status_code==200):
- return resp.text
- else:
- raise Exception('Error accessing register allocations: %d'%resp.status_code)
-
- def run_query_old(self, query):
- req = 'http://cloud-scrutiny.appspot.com/command?action=send_query&q=%s' % urllib.quote(query)
- resp = requests.get(req)
- if (resp.status_code==200):
- return resp.text
- else:
- raise Exception('Error running query: %d'%resp.status_code)
- return resp
-
- def run_query(self, query):
- storage = Storage(BIGQUERY_CREDENTIALS_FN)
- credentials = storage.get()
-
- if credentials is None or credentials.invalid:
- credentials = run(FLOW, storage)
-
- http = httplib2.Http()
- http = credentials.authorize(http)
-
- service = build('bigquery', 'v2', http=http)
-
- body = {"query": query}
- response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()
-
- fieldNames = []
- for field in response["schema"]["fields"]:
- fieldNames.append(field["name"])
-
- result = []
- if "rows" in response:
- for row in response["rows"]:
- this_result = {}
- for (i,column) in enumerate(row["f"]):
- this_result[self.reverse_mapping.get(fieldNames[i],fieldNames[i])] = column["v"]
- result.append(this_result)
-
- return result
-
- def remap(self, match):
- token = match.group()[1:]
- if token in self.mapping:
- return self.mapping[token]
- else:
- raise Exception('unknown token %s' % token)
-
- def get_usage(self, cp=None, hostname=None, site=None, slice=None, timeStart=-HOUR_MS, timeStop=-1, groupBy=["%hostname", "%cp"]):
- where = []
- if slice is not None:
- where.append("%slice='" + slice + "'")
- if cp is not None:
- where.append("%cp='" + cp + "'")
- if hostname is not None:
- where.append("%hostname='" + hostname + "'")
- if site is not None:
- where.append("%hostname contains " + site)
- where.append("%bytes_sent>0")
- where = "WHERE " + " AND ".join(where)
-
- if timeStart is not None:
- tableName = "[vicci.demoevents@%d-%d]" % (timeStart,timeStop)
- else:
- tableName = "[vicci.demoevents]"
-
- query = "SELECT %hostname,%cp,sum(%bytes_sent) as sum_bytes_sent,sum(%bytes_hit) as sum_bytes_hit, AVG(%bandwidth) as avg_bandwidth," + \
- " MAX(TIMESTAMP_TO_MSEC(time))-MIN(TIMESTAMP_TO_MSEC(time)) as time_delta FROM " + \
- tableName + " " + where
-
- if groupBy:
- query = query + " GROUP BY " + ",".join(groupBy)
-
- p = re.compile('%[a-zA-z_]*')
- query = p.sub(self.remap, query)
-
- rows = self.run_query(query)
-
- for row in rows:
- row["sum_bytes_sent"] = int(row.get("sum_bytes_sent",0))
- row["sum_bytes_hit"] = int(row.get("sum_bytes_hit",0))
- row["avg_bandwidth"] = int(float(row.get("avg_bandwidth",0)))
- row["time_delta"] = float(row.get("time_delta",0.0))/1000.0
-
- elapsed = (timeStop-timeStart)/1000
- KBps = int(row.get("sum_bytes_sent",0)) / elapsed / 1024
- row["KBps"] = KBps
-
- return rows
-
- def sites_from_usage(self, rows, nodes_to_sites={}):
- sites = {}
- for row in rows:
- hostname = row["hostname"]
-
- if hostname in nodes_to_sites:
- site_name = nodes_to_sites[hostname]
- else:
- parts = hostname.split(".")
- if len(parts)<=2:
- continue
- site_name = parts[1]
-
- if not (site_name in sites):
- row = row.copy()
- row["site"] = site_name
- row["max_avg_bandwidth"] = row["avg_bandwidth"]
- # sites table doesn't care about hostnames or avg_bandwidth
- del row["hostname"]
- del row["avg_bandwidth"]
- sites[site_name] = row
- else:
- site_row = sites[site_name]
- site_row["sum_bytes_sent"] = site_row["sum_bytes_sent"] + row["sum_bytes_sent"]
- site_row["sum_bytes_hit"] = site_row["sum_bytes_hit"] + row["sum_bytes_hit"]
- site_row["max_avg_bandwidth"] = max(site_row["max_avg_bandwidth"], row["avg_bandwidth"])
- site_row["time_delta"] = max(site_row["time_delta"], row["time_delta"])
-
- return sites.values()
-
- def get_usage_sites(self, cp=None, slice=None, timeStart=-HOUR_MS, timeStop=-1):
- rows = self.get_usage(cp=cp, slice=slice, timeStart=timeStart, timeStop=timeStop)
-
- return self.sites_from_usage(rows)
-
- def dump_table(self, rows, keys=None):
- if not keys:
- keys = rows[0].keys()
-
- lens = {}
- for key in keys:
- lens[key] = len(key)
-
- for row in rows:
- for key in keys:
- thislen = len(str(row.get(key,"")))
- lens[key] = max(lens.get(key,0), thislen)
-
- for key in keys:
- print "%*s" % (lens[key], key),
- print
-
- for row in rows:
- for key in keys:
- print "%*s" % (lens[key], str(row.get(key,""))),
- print
-
-class HpcQueryThread(HpcQuery, threading.Thread):
- def __init__(self, interval=30, slice=None, timeStart=-HOUR_MS, cp=None, nodes_to_sites={}):
- threading.Thread.__init__(self)
- HpcQuery.__init__(self)
- self.daemon = True
- self.interval = interval
- self.timeStart = timeStart
- self.nodes_to_sites = nodes_to_sites
- self.slice = slice
- self.cp = cp
- self.data_version = 0
- self.please_die = False
- self.update_time = time.time()
- self.start()
-
- def is_stalled(self):
- if time.time()-self.update_time > 300:
- return True
- else:
- return False
-
- def run(self):
- while not self.please_die:
- try:
- self.rows = self.get_usage(timeStart=self.timeStart, cp=self.cp, slice=self.slice)
- self.site_rows = self.sites_from_usage(self.rows, self.nodes_to_sites)
- self.update_time = time.time()
- self.new_data()
- self.data_version += 1
- except:
- file("/tmp/hpcquery_fail.txt","a").write(traceback.format_exc() + "\n")
- time.sleep(self.interval)
-
- def new_data(self):
- pass
-
-class HpcDumpThread(HpcQueryThread):
- def __init__(self, interval=30, slice=None, timeStart=-HOUR_MS, cp=None):
- HpcQueryThread.__init__(self, interval, slice, timeStart, cp)
-
- def new_data(self):
- os.system("clear")
-
- print "update %d, data for last %d minutes" % (self.data_version, -self.timeStart/1000/60)
- print
-
- self.dump_table(self.rows, ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
- print
- self.dump_table(self.site_rows, ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
- print
-
-
-def main_old():
- hq = HpcQuery()
-# print hq.mapping
-
- print "5 minute"
- hq.dump_table(hq.get_usage(timeStart=-MINUTE_MS*5), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
- print
- hq.dump_table(hq.get_usage_sites(timeStart=-MINUTE_MS*5), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
- print
-
- print "1 hour"
- hq.dump_table(hq.get_usage(), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
- print
- hq.dump_table(hq.get_usage_sites(), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
- print
-
- print "24 hours"
- hq.dump_table(hq.get_usage(timeStart=-HOUR_MS*24), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
- hq.dump_table(hq.get_usage_sites(timeStart=-HOUR_MS*24), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
- print
-
-def main():
- hd = HpcDumpThread()
- while True:
- time.sleep(30)
-
-if __name__ == "__main__":
- main()
diff --git a/xos/hpc_wizard/xos_analytics.py b/xos/hpc_wizard/xos_analytics.py
deleted file mode 100644
index 9502aa1..0000000
--- a/xos/hpc_wizard/xos_analytics.py
+++ /dev/null
@@ -1,475 +0,0 @@
-from bigquery_analytics import BigQueryAnalytics, BIGQUERY_AVAILABLE
-import datetime
-import re
-import os
-import sys
-import time
-import json
-import traceback
-import urllib2
-
-# XXX hardcoded path
-sys.path.append("/opt/xos")
-
-os.environ.setdefault("DJANGO_SETTINGS_MODULE", "xos.settings")
-from django.conf import settings
-from django import db
-from django.db import connection
-from core.models import Slice, Instance, ServiceClass, Reservation, Tag, Network, User, Node, Image, Deployment, Site, NetworkTemplate, NetworkSlice, Service
-
-BLUE_LOAD=5000000
-RED_LOAD=15000000
-
-glo_cached_queries = {}
-
-class XOSAnalytics(BigQueryAnalytics):
- def __init__(self, tableName=None):
- if not tableName:
- tableName = settings.BIGQUERY_TABLE
-
- BigQueryAnalytics.__init__(self, tableName)
-
- def service_to_sliceNames(self, serviceName):
- service=Service.objects.get(name=serviceName)
- try:
- slices = service.slices.all()
- except:
- # BUG in data model -- Slice.service has related name 'service' and
- # it should be 'slices'
- slices = service.service.all()
-
- return [slice.name for slice in slices]
-
- def compose_query(self, filter={}, timeBucket="60", avg=[], sum=[], count=[], computed=[], val=[], groupBy=["Time"], orderBy=["Time"], tableName=None, latest=False, maxAge=60*60):
- if tableName is None:
- tableName = self.tableName
-
- maxAge = maxAge * 1000
- tablePart = "[%s.%s@-%d--1]" % ("vicci", tableName, maxAge)
-
- fields = []
- fieldNames = []
- srcFieldNames = ["time"]
-
- fields.append("SEC_TO_TIMESTAMP(INTEGER(TIMESTAMP_TO_SEC(time)/%s)*%s) as Time" % (str(timeBucket),str(timeBucket)))
- #fields.append("INTEGER(TIMESTAMP_TO_SEC(time)/%s)*%s as Time" % (str(timeBucket),str(timeBucket)))
-
- for fieldName in avg:
- fields.append("AVG(%s) as avg_%s" % (fieldName, fieldName.replace("%","")))
- fieldNames.append("avg_%s" % fieldName.replace("%",""))
- srcFieldNames.append(fieldName)
-
- for fieldName in sum:
- fields.append("SUM(%s) as sum_%s" % (fieldName, fieldName.replace("%","")))
- fieldNames.append("sum_%s" % fieldName.replace("%",""))
- srcFieldNames.append(fieldName)
-
- for fieldName in count:
- fields.append("COUNT(distinct %s) as count_%s" % (fieldName, fieldName.replace("%","")))
- fieldNames.append("count_%s" % fieldName.replace("%",""))
- srcFieldNames.append(fieldName)
-
- for fieldName in val:
- fields.append(fieldName)
- fieldNames.append(fieldName)
- srcFieldNames.append(fieldName)
-
- for fieldName in computed:
- operator = "/"
- parts = fieldName.split("/")
- computedFieldName = "computed_" + parts[0].replace("%","")+"_div_"+parts[1].replace("%","")
- if len(parts)==1:
- operator = "*"
- parts = computed.split("*")
- computedFieldName = "computed_" + parts[0].replace("%","")+"_mult_"+parts[1].replace("%","")
- fields.append("SUM(%s)%sSUM(%s) as %s" % (parts[0], operator, parts[1], computedFieldName))
- fieldNames.append(computedFieldName)
- srcFieldNames.append(parts[0])
- srcFieldNames.append(parts[1])
-
- for fieldName in groupBy:
- if (fieldName not in ["Time"]):
- fields.append(fieldName)
- fieldNames.append(fieldName)
- srcFieldNames.append(fieldName)
-
- fields = ", ".join(fields)
-
- where = []
-
- if filter.get("slice",None):
- where.append("%%slice='%s'" % filter["slice"])
- if filter.get("site",None):
- where.append("%%site='%s'" % filter["site"])
- if filter.get("node",None):
- where.append("%%hostname='%s'" % filter["node"])
- if filter.get("event",None):
- where.append("event='%s'" % filter["event"])
- if filter.get("service",None):
- sliceNames = self.service_to_sliceNames(filter["service"])
- if sliceNames:
- where.append("(" + " OR ".join(["%%slice='%s'" % sliceName for sliceName in sliceNames]) +")")
-
- if where:
- where = " WHERE " + " AND ".join(where)
- else:
- where =""
-
- if groupBy:
- groupBySub = " GROUP BY " + ",".join(groupBy + ["%hostname"])
- groupBy = " GROUP BY " + ",".join(groupBy)
- else:
- groupBySub = " GROUP BY %hostname"
- groupBy = ""
-
- if orderBy:
- orderBy = " ORDER BY " + ",".join(orderBy)
- else:
- orderBy = ""
-
- if latest:
- latestFields = ["table1.%s as %s" % (x,x) for x in srcFieldNames]
- latestFields = ", ".join(latestFields)
- tablePart = """(SELECT %s FROM %s AS table1
- JOIN
- (SELECT %%hostname, event, max(time) as maxtime from %s GROUP BY %%hostname, event) AS latest
- ON
- table1.%%hostname = latest.%%hostname AND table1.event = latest.event AND table1.time = latest.maxtime)""" % (latestFields, tablePart, tablePart)
-
- if computed:
- subQuery = "SELECT %%hostname, %s FROM %s" % (fields, tablePart)
- if where:
- subQuery = subQuery + where
- subQuery = subQuery + groupBySub
-
- sumFields = []
- for fieldName in fieldNames:
- if fieldName.startswith("avg"):
- sumFields.append("AVG(%s) as avg_%s"%(fieldName,fieldName))
- sumFields.append("MAX(%s) as max_%s"%(fieldName,fieldName))
- elif (fieldName.startswith("count")) or (fieldName.startswith("sum")) or (fieldName.startswith("computed")):
- sumFields.append("SUM(%s) as sum_%s"%(fieldName,fieldName))
- else:
- sumFields.append(fieldName)
-
- sumFields = ",".join(sumFields)
-
- query = "SELECT %s, %s FROM (%s)" % ("Time", sumFields, subQuery)
- if groupBy:
- query = query + groupBy
- if orderBy:
- query = query + orderBy
- else:
- query = "SELECT %s FROM %s" % (fields, tablePart)
- if where:
- query = query + " " + where
- if groupBy:
- query = query + groupBy
- if orderBy:
- query = query + orderBy
-
- return query
-
- def get_list_from_req(self, req, name, default=[]):
- value = req.GET.get(name, None)
- if not value:
- return default
- value=value.replace("@","%")
- return value.split(",")
-
- def format_result(self, format, result, query, dataSourceUrl):
- if not BIGQUERY_AVAILABLE:
- msg = "BigQuery Statistics Unavaiable"
- else:
- msg = None
-
- if (format == "json_dicts"):
- result = {"query": query, "rows": result, "dataSourceUrl": dataSourceUrl, "msg": msg}
- return ("application/javascript", json.dumps(result))
-
- elif (format == "json_arrays"):
- new_result = []
- for row in result:
- new_row = []
- for key in sorted(row.keys()):
- new_row.append(row[key])
- new_result.append(new_row)
- new_result = {"query": query, "rows": new_result, "msg": msg}
- return ("application/javascript", json.dumps(new_result))
-
- elif (format == "html_table"):
- new_rows = []
- for row in result:
- new_row = []
- for key in sorted(row.keys()):
- new_row.append("<TD>%s</TD>" % str(row[key]))
- new_rows.append("<TR>%s</TR>" % "".join(new_row))
-
- new_result = "<TABLE>%s</TABLE>" % "\n".join(new_rows)
-
- return ("text/html", new_result)
-
- def merge_datamodel_sites(self, rows, slice=None):
- """ For a query that included "site" in its groupby, merge in the
- opencloud site information.
- """
-
- if slice:
- try:
- slice = Slice.objects.get(name=slice)
- except:
- slice = None
-
- for row in rows:
- sitename = row["site"]
- try:
- model_site = Site.objects.get(name=sitename)
- except:
- # we didn't find it in the data model
- continue
-
- allocated_instances = 0
- if model_site and slice:
- for instance in slice.instances.all():
- if instance.node.site == model_site:
- allocated_instances = allocated_instances + 1
-
- row["lat"] = float(model_site.location.latitude)
- row["long"] = float(model_site.location.longitude)
- row["url"] = model_site.site_url
- row["numNodes"] = model_site.nodes.count()
- row["allocated_instances"] = allocated_instances
-
- max_cpu = row.get("max_avg_cpu", row.get("max_cpu",0))
- cpu=float(max_cpu)/100.0
- row["hotness"] = max(0.0, ((cpu*RED_LOAD) - BLUE_LOAD)/(RED_LOAD-BLUE_LOAD))
-
- def compose_cached_query(self, querySpec='default'):
- """ Compose a query that returns the 'most recent' row for each (hostname, event)
- pair.
-
- Note that groupByFields cannot contain any values that are 'Null' or those
- rows will be excluded. For example, if groupByFields includes cp, then
- there will be no libvirt_event rows, since libvirt_event does not have
- cp.
-
- This means we can't really have 'one query to rule them'. Settle on
- having a couple of different queries, and have the caller specify
- which one he wants.
- """
-
- fieldNames = ["%hostname", "%bytes_sent", "%bytes_hit", "%healthy", "time", "event", "%site", "%elapsed", "%cpu"]
-
- if querySpec=="default":
- groupByFields = ["%hostname", "event"]
- elif (querySpec=="hpc"):
- fieldNames.append("%cp")
- groupByFields = ["%hostname", "event", "%cp"]
- else:
- raise ValueError("Unknown queryspec %s" % querySpec)
-
- fields = ["table1.%s AS %s" % (x,x) for x in fieldNames]
- fields = ", ".join(fields)
-
- tableDesc = "%s.%s" % (self.projectName, self.tableName)
-
- groupByOn = ["table1.time = latest.maxtime"]
- for field in groupByFields:
- groupByOn.append("table1.%s = latest.%s" % (field, field))
-
- groupByOn = " AND ".join(groupByOn)
- groupByFields = ", ".join(groupByFields)
-
- base_query = "SELECT %s FROM [%s@-3600000--1] AS table1 JOIN (SELECT %s, max(time) as maxtime from [%s@-3600000--1] GROUP BY %s) AS latest ON %s" % \
- (fields, tableDesc, groupByFields, tableDesc, groupByFields, groupByOn)
-
- return base_query
-
- def get_cached_query_results(self, q, wait=True):
- global glo_cached_queries
-
- if q in glo_cached_queries:
- if (time.time() - glo_cached_queries[q]["time"]) <= 60:
- print "using cached query"
- return glo_cached_queries[q]["rows"]
-
- if not wait:
- return None
-
- print "refreshing cached query"
- result = self.run_query(q)
- glo_cached_queries[q] = {"time": time.time(), "rows": result}
-
- return result
-
- def process_request(self, req):
- print req.GET
-
- tqx = req.GET.get("tqx", None)
-
- slice = req.GET.get("slice", None)
- site = req.GET.get("site", None)
- node = req.GET.get("node", None)
- service = req.GET.get("service", None)
- event = req.GET.get("event", "libvirt_heartbeat")
- cp = req.GET.get("cp", None)
-
- format = req.GET.get("format", "json_dicts")
-
- timeBucket = int(req.GET.get("timeBucket", 60))
- avg = self.get_list_from_req(req, "avg")
- sum = self.get_list_from_req(req, "sum")
- count = self.get_list_from_req(req, "count")
- computed = self.get_list_from_req(req, "computed")
- groupBy = self.get_list_from_req(req, "groupBy", ["Time"])
- orderBy = self.get_list_from_req(req, "orderBy", ["Time"])
-
- maxRows = req.GET.get("maxRows", None)
- mergeDataModelSites = req.GET.get("mergeDataModelSites", None)
-
- maxAge = int(req.GET.get("maxAge", 60*60))
-
- cached = req.GET.get("cached", None)
- cachedGroupBy = self.get_list_from_req(req, "cachedGroupBy", ["doesnotexist"])
-
- filter={}
- if slice:
- filter["slice"] = slice
- if site:
- filter["site"] = site
- if node:
- filter["hostname"] = node
- if event:
- filter["event"] = event
- if cp:
- filter["cp"] = cp
-
- q = self.compose_query(filter, timeBucket, avg, sum, count, computed, [], groupBy, orderBy, maxAge=maxAge)
-
- print q
-
- dataSourceUrl = "http://" + req.META["SERVER_NAME"] + ":" + req.META["SERVER_PORT"] + req.META["PATH_INFO"] + "?" + req.META["QUERY_STRING"].replace("format=","origFormat=").replace("%","%25") + "&format=charts";
-
- if (format=="dataSourceUrl"):
- result = {"dataSourceUrl": dataSourceUrl}
- return ("application/javascript", result)
-
- elif (format=="raw"):
- result = self.run_query_raw(q)
- result["dataSourceUrl"] = dataSourceUrl
-
- result = json.dumps(result);
-
- return ("application/javascript", result)
-
- elif (format=="nodata"):
- result = {"dataSourceUrl": dataSourceUrl, "query": q}
- result = json.dumps(result);
- return {"application/javascript", result}
-
- elif (format=="charts"):
- bq_result = self.run_query_raw(q)
-
- # cloudscrutiny code is probably better!
- table = {}
- table["cols"] = self.schema_to_cols(bq_result["schema"])
- rows = []
- if "rows" in bq_result:
- for row in bq_result["rows"]:
- rowcols = []
- for (colnum,col) in enumerate(row["f"]):
- if (colnum==0):
- dt = datetime.datetime.fromtimestamp(float(col["v"]))
- rowcols.append({"v": 'new Date("%s")' % dt.isoformat()})
- else:
- try:
- rowcols.append({"v": float(col["v"])})
- except:
- rowcols.append({"v": col["v"]})
- rows.append({"c": rowcols})
- table["rows"] = rows
-
- if tqx:
- reqId = tqx.strip("reqId:")
- else:
- reqId = "0"
-
- result = {"status": "okColumnChart", "reqId": reqId, "table": table, "version": "0.6"}
-
- result = "google.visualization.Query.setResponse(" + json.dumps(result) + ");"
-
- def unquote_it(x): return x.group()[1:-1].replace('\\"', '"')
-
- p = re.compile(r'"new Date\(\\"[^"]*\\"\)"')
- result=p.sub(unquote_it, result)
-
- return ("application/javascript", result)
-
- else:
- if cached:
- results = self.get_cached_query_results(self.compose_cached_query(cached))
-
- result = self.postprocess_results(results, filter=filter, sum=sum, count=count, avg=avg, computed=computed, maxDeltaTime=120, groupBy=cachedGroupBy)
- else:
- result = self.run_query(q)
-
- if maxRows:
- result = result[-int(maxRows):]
-
- if mergeDataModelSites:
- self.merge_datamodel_sites(result)
-
- return self.format_result(format, result, q, dataSourceUrl)
-
-def DoXOSAnalytics(request):
- bq = XOSAnalytics()
- result = bq.process_request(request)
-
- return result
-
-def main():
- bq = XOSAnalytics(tableName="demoevents")
-
- q = bq.compose_cached_query()
- results = bq.run_query(q)
-
- #results = bq.postprocess_results(results,
- # filter={"slice": "HyperCache"},
- # groupBy=["site"],
- # computed=["bytes_sent/elapsed"],
- # sum=["bytes_sent", "computed_bytes_sent_div_elapsed"], avg=["cpu"],
- # maxDeltaTime=60)
-
- #results = bq.postprocess_results(results, filter={"slice": "HyperCache"}, maxi=["cpu"], count=["hostname"], computed=["bytes_sent/elapsed"], groupBy=["Time", "site"], maxDeltaTime=80)
-
- results = bq.postprocess_results(results,filter={"event": "libvirt_heartbeat"}, avg=["cpu"], count=["hostname"], groupBy=["doesnotexist"])
-
- bq.dump_table(results)
-
- sys.exit(0)
-
- q=bq.compose_query(sum=["%bytes_sent"], avg=["%cpu"], latest=True, groupBy=["Time", "%site"])
- print q
- bq.dump_table(bq.run_query(q))
-
- q=bq.compose_query(avg=["%cpu","%bandwidth"], count=["%hostname"], slice="HyperCache")
- print q
- bq.dump_table(bq.run_query(q))
-
- q=bq.compose_query(computed=["%bytes_sent/%elapsed"])
- print
- print q
- bq.dump_table(bq.run_query(q))
-
- q=bq.compose_query(timeBucket=60*60, avg=["%cpu"], count=["%hostname"], computed=["%bytes_sent/%elapsed"])
- print
- print q
- bq.dump_table(bq.run_query(q))
-
-if __name__ == "__main__":
- main()
-
-
-
-
-