# blob: 4ddf4f668d4ad66dcf142b6c5e5eec45f7ce8d49 [file] [log] [blame]
import re
import base64
import requests
import urllib
import json
import httplib2
import threading
import os
import time
import traceback
from apiclient.discovery import build
from apiclient.errors import HttpError
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
from oauth2client.tools import run_flow,run
"""
yum -y install python-httplib2
easy_install python_gflags
easy_install google_api_python_client
"""
# BigQuery project that jobs().query() calls are issued against.
PROJECT_NUMBER = '549187599759'

# Build the OAuth2 flow once at import time.  A failure here (missing
# client_secrets.json, missing library, ...) must not prevent the module
# from being imported, so it is reported and FLOW is left as None.
# Only Exception is caught so that KeyboardInterrupt / SystemExit still
# propagate.
try:
    FLOW = flow_from_clientsecrets(
        '/opt/planetstack/hpc_wizard/client_secrets.json',
        scope='https://www.googleapis.com/auth/bigquery')
except Exception:
    print("exception while initializing bigquery flow")
    traceback.print_exc()
    FLOW = None

# Time spans in milliseconds; negated they are used as relative table
# decorator offsets (e.g. timeStart=-HOUR_MS).
MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000
class HpcQuery:
    """Run HPC usage queries against BigQuery.

    Queries are written with ``%token`` placeholders in place of column
    names.  ``self.mapping`` (fetched from the cloud-scrutiny service)
    translates each token to the column name used in the query, and
    ``self.reverse_mapping`` translates column names in the result back
    to tokens.
    """

    # A "%token" placeholder.  The original pattern used the range "A-z",
    # which also matches "[", "\\", "]", "^" and "`"; "A-Z" is what was meant.
    _TOKEN_RE = re.compile(r'%[a-zA-Z_]*')

    def __init__(self):
        """Fetch the token mapping for the "demoevents" table."""
        self.mapping = json.loads(self.fetch_mapping(table="demoevents"))
        self.reverse_mapping = {v: k for k, v in self.mapping.items()}

    def fetch_mapping(self, m=0, table="events"):
        """Return the raw allocation mapping text for `table`.

        Raises Exception if the service does not answer with HTTP 200.
        """
        req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s' % (m, table)
        resp = requests.get(req)
        if resp.status_code != 200:
            raise Exception('Error accessing register allocations: %d' % resp.status_code)
        return resp.text

    def run_query_old(self, query):
        """Send `query` through the cloud-scrutiny proxy and return the body.

        Raises Exception if the service does not answer with HTTP 200.
        """
        req = 'http://cloud-scrutiny.appspot.com/command?action=send_query&q=%s' % urllib.quote(query)
        resp = requests.get(req)
        if resp.status_code != 200:
            raise Exception('Error running query: %d' % resp.status_code)
        return resp.text

    def run_query(self, query):
        """Execute `query` on BigQuery and return a list of row dicts.

        Each row dict is keyed by the token for the column when the column
        name appears in ``reverse_mapping``, otherwise by the column name
        itself.  Values are whatever the API returned for the cell.
        """
        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()
        if credentials is None or credentials.invalid:
            if FLOW is None:
                # Flow creation failed at import time; fail with a clear
                # message instead of an obscure error inside the oauth code.
                raise Exception('bigquery flow is not initialized; cannot obtain credentials')
            credentials = run(FLOW, storage)

        http = credentials.authorize(httplib2.Http())
        service = build('bigquery', 'v2', http=http)

        response = service.jobs().query(projectId=PROJECT_NUMBER,
                                        body={"query": query}).execute()

        fieldNames = [field["name"] for field in response["schema"]["fields"]]

        result = []
        for row in response.get("rows", []):
            this_result = {}
            for (i, column) in enumerate(row["f"]):
                key = self.reverse_mapping.get(fieldNames[i], fieldNames[i])
                this_result[key] = column["v"]
            result.append(this_result)
        return result

    def remap(self, match):
        """re.sub callback: replace a ``%token`` match by its mapped name.

        Raises Exception for a token that is not in ``self.mapping``.
        """
        token = match.group()[1:]
        if token in self.mapping:
            return self.mapping[token]
        raise Exception('unknown token %s' % token)

    def _expand(self, text):
        """Replace every ``%token`` placeholder in `text` via remap()."""
        return self._TOKEN_RE.sub(self.remap, text)

    @staticmethod
    def _sql_string(value):
        """Return `value` as a single-quoted SQL string literal.

        Backslashes and single quotes are backslash-escaped so that a
        value cannot terminate the literal early.
        """
        return "'" + value.replace("\\", "\\\\").replace("'", "\\'") + "'"

    def get_usage(self, cp=None, hostname=None, site=None, slice=None,
                  timeStart=-HOUR_MS, timeStop=-1,
                  groupBy=("%hostname", "%cp")):
        """Return per-group usage rows for the given filters and time window.

        cp, hostname, slice -- optional exact-match filters
        site                -- optional substring filter on the hostname
        timeStart, timeStop -- table decorator range in milliseconds; pass
                               timeStart=None to query the whole table
        groupBy             -- sequence of ``%token`` columns to group by

        Every returned row has sum_bytes_sent / sum_bytes_hit /
        avg_bandwidth converted to int, time_delta converted to seconds,
        and an integer "KBps" entry.
        """
        # Placeholders are expanded on the column fragments only, before
        # the literal values are appended, so a '%' inside a filter value
        # is never mistaken for a token.
        clauses = []
        if slice is not None:
            clauses.append(self._expand("%slice=") + self._sql_string(slice))
        if cp is not None:
            clauses.append(self._expand("%cp=") + self._sql_string(cp))
        if hostname is not None:
            clauses.append(self._expand("%hostname=") + self._sql_string(hostname))
        if site is not None:
            # The original emitted the site unquoted, which is not a valid
            # string operand for "contains".
            clauses.append(self._expand("%hostname contains ") + self._sql_string(site))
        clauses.append(self._expand("%bytes_sent>0"))
        where = "WHERE " + " AND ".join(clauses)

        if timeStart is not None:
            tableName = "[vicci.demoevents@%d-%d]" % (timeStart, timeStop)
        else:
            tableName = "[vicci.demoevents]"

        select = self._expand(
            "SELECT %hostname,%cp,sum(%bytes_sent) as sum_bytes_sent,sum(%bytes_hit) as sum_bytes_hit, AVG(%bandwidth) as avg_bandwidth,"
            " MAX(TIMESTAMP_TO_MSEC(time))-MIN(TIMESTAMP_TO_MSEC(time)) as time_delta FROM ")
        query = select + tableName + " " + where

        if groupBy:
            query = query + " GROUP BY " + self._expand(",".join(groupBy))

        rows = self.run_query(query)

        # Length of the requested window in whole seconds; unknown when the
        # whole table is queried.
        if timeStart is not None:
            window_secs = (timeStop - timeStart) // 1000
        else:
            window_secs = None

        for row in rows:
            # "or 0" covers both a missing key and a NULL (None) cell.
            row["sum_bytes_sent"] = int(row.get("sum_bytes_sent") or 0)
            row["sum_bytes_hit"] = int(row.get("sum_bytes_hit") or 0)
            row["avg_bandwidth"] = int(float(row.get("avg_bandwidth") or 0))
            row["time_delta"] = float(row.get("time_delta") or 0.0) / 1000.0

            if window_secs is not None:
                elapsed = window_secs
            else:
                # No explicit window: fall back to the observed time span.
                elapsed = int(row["time_delta"])

            # Integer arithmetic, matching the original Python 2 behaviour;
            # a zero or negative span yields 0 rather than a division error.
            if elapsed > 0:
                row["KBps"] = row["sum_bytes_sent"] // elapsed // 1024
            else:
                row["KBps"] = 0

        return rows

    def sites_from_usage(self, rows, nodes_to_sites=None):
        """Aggregate per-host usage `rows` into one row per site.

        The site of a host is looked up in `nodes_to_sites`; hosts not
        listed there use the second dot-separated component of the
        hostname, and hosts with two or fewer components are skipped.
        Byte counters and KBps are summed, avg_bandwidth and time_delta are
        reduced with max().  Other columns (e.g. "cp") keep the value of
        the first row seen for the site.
        """
        if nodes_to_sites is None:
            nodes_to_sites = {}

        sites = {}
        for row in rows:
            hostname = row["hostname"]

            if hostname in nodes_to_sites:
                site_name = nodes_to_sites[hostname]
            else:
                parts = hostname.split(".")
                if len(parts) <= 2:
                    continue
                site_name = parts[1]

            if site_name not in sites:
                site_row = row.copy()
                site_row["site"] = site_name
                site_row["max_avg_bandwidth"] = site_row["avg_bandwidth"]
                # sites table doesn't care about hostnames or avg_bandwidth
                del site_row["hostname"]
                del site_row["avg_bandwidth"]
                sites[site_name] = site_row
            else:
                site_row = sites[site_name]
                site_row["sum_bytes_sent"] = site_row["sum_bytes_sent"] + row["sum_bytes_sent"]
                site_row["sum_bytes_hit"] = site_row["sum_bytes_hit"] + row["sum_bytes_hit"]
                site_row["max_avg_bandwidth"] = max(site_row["max_avg_bandwidth"], row["avg_bandwidth"])
                site_row["time_delta"] = max(site_row["time_delta"], row["time_delta"])
                # The rate is additive across hosts just like the byte
                # counters; without this the site showed only the first
                # host's KBps.
                if "KBps" in row:
                    site_row["KBps"] = site_row.get("KBps", 0) + row["KBps"]

        return list(sites.values())

    def get_usage_sites(self, cp=None, slice=None, timeStart=-HOUR_MS, timeStop=-1):
        """Return get_usage() results aggregated per site."""
        rows = self.get_usage(cp=cp, slice=slice, timeStart=timeStart, timeStop=timeStop)
        return self.sites_from_usage(rows)

    def dump_table(self, rows, keys=None):
        """Print `rows` as a right-aligned text table.

        keys -- columns to print, in order; defaults to the keys of the
                first row (or no columns when `rows` is empty)
        """
        if not keys:
            keys = list(rows[0].keys()) if rows else []

        # Column width = widest of the header and every cell.
        lens = {}
        for key in keys:
            lens[key] = len(key)
        for row in rows:
            for key in keys:
                lens[key] = max(lens[key], len(str(row.get(key, ""))))

        print(" ".join("%*s" % (lens[key], key) for key in keys))
        for row in rows:
            print(" ".join("%*s" % (lens[key], str(row.get(key, ""))) for key in keys))
class HpcQueryThread(HpcQuery, threading.Thread):
    """Daemon thread that re-runs the usage query every `interval` seconds.

    After each successful query ``rows`` and ``site_rows`` hold the latest
    results, ``update_time`` is refreshed, new_data() is called and
    ``data_version`` is incremented.  Set ``please_die`` to True to make
    the loop exit after its current iteration.
    """

    def __init__(self, interval=30, slice=None, timeStart=-HOUR_MS, cp=None, nodes_to_sites=None):
        """Store the query parameters and start the thread immediately."""
        threading.Thread.__init__(self)
        HpcQuery.__init__(self)
        self.daemon = True
        self.interval = interval
        self.timeStart = timeStart
        # Avoid sharing one mutable default dict between instances.
        self.nodes_to_sites = {} if nodes_to_sites is None else nodes_to_sites
        self.slice = slice
        self.cp = cp
        # Defined up front so readers never hit AttributeError before the
        # first query has completed.
        self.rows = []
        self.site_rows = []
        self.data_version = 0
        self.please_die = False
        self.update_time = time.time()
        self.start()

    def is_stalled(self):
        """Return True if no query has succeeded for more than 300 seconds."""
        return time.time() - self.update_time > 300

    def run(self):
        """Thread body: query, publish the results, sleep, repeat."""
        while not self.please_die:
            try:
                self.rows = self.get_usage(timeStart=self.timeStart, cp=self.cp, slice=self.slice)
                self.site_rows = self.sites_from_usage(self.rows, self.nodes_to_sites)
                self.update_time = time.time()
                self.new_data()
                self.data_version += 1
            except Exception:
                # Best effort: record the failure and keep polling.  Only
                # Exception is caught so interpreter-exit signals are not
                # swallowed, and the log file is closed deterministically.
                with open("/tmp/hpcquery_fail.txt", "a") as log:
                    log.write(traceback.format_exc() + "\n")
            time.sleep(self.interval)

    def new_data(self):
        """Hook called after each successful query; subclasses override it."""
        pass
class HpcDumpThread(HpcQueryThread):
    """Query thread that redraws the usage tables on the terminal."""

    def __init__(self, interval=30, slice=None, timeStart=-HOUR_MS, cp=None, nodes_to_sites=None):
        """Same parameters as HpcQueryThread.

        nodes_to_sites is optional; when omitted an empty mapping is passed
        on, which is what the base class used by default.
        """
        if nodes_to_sites is None:
            nodes_to_sites = {}
        HpcQueryThread.__init__(self, interval, slice, timeStart, cp, nodes_to_sites)

    def new_data(self):
        """Clear the screen and print the per-host and per-site tables."""
        os.system("clear")

        # timeStart is a negative millisecond offset; show it as minutes.
        print("update %d, data for last %d minutes" % (self.data_version, -self.timeStart // 1000 // 60))
        print("")

        self.dump_table(self.rows, ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
        print("")
        self.dump_table(self.site_rows, ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
        print("")
def main_old():
    """Print per-host and per-site usage for the last 5 min, 1 h and 24 h."""
    hq = HpcQuery()
    # print hq.mapping

    host_columns = ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"]
    site_columns = ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"]

    windows = [
        ("5 minute", -MINUTE_MS*5),
        ("1 hour", -HOUR_MS),
        ("24 hours", -HOUR_MS*24),
    ]

    for label, time_start in windows:
        print(label)

        # Query once per window and derive the site table from the same
        # rows, instead of issuing the identical query a second time.
        rows = hq.get_usage(timeStart=time_start)
        hq.dump_table(rows, host_columns)
        print("")
        hq.dump_table(hq.sites_from_usage(rows), site_columns)
        print("")
def main():
    """Run the dump thread until interrupted with Ctrl-C."""
    hd = HpcDumpThread()
    try:
        # The worker is a daemon thread; the main thread only has to stay
        # alive.
        while True:
            time.sleep(30)
    except KeyboardInterrupt:
        # Ask the worker loop to stop and exit without a traceback.
        hd.please_die = True


if __name__ == "__main__":
    main()