blob: 4a90c2b67c9378861869547df0c831d34da61ba0 [file] [log] [blame]
import re
import base64
import requests
import urllib
import json
import httplib2
import threading
import os
import sys
import time
import traceback
from apiclient.discovery import build
from apiclient.errors import HttpError
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
from oauth2client.tools import run_flow,run
"""
yum -y install python-httplib2
easy_install python_gflags
easy_install google_api_python_client
"""
PROJECT_NUMBER = '549187599759'
try:
FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
scope='https://www.googleapis.com/auth/bigquery')
BIGQUERY_AVAILABLE = True
except:
print >> sys.stderr, "exception while initializing bigquery flow"
traceback.print_exc()
FLOW = None
BIGQUERY_AVAILABLE = False
MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000
# global to hold cached mappings
mappings = {}
reverse_mappings = {}
def to_number(s):
try:
if "." in str(s):
return float(s)
else:
return int(s)
except:
return 0
class MappingException(Exception):
pass
class BigQueryAnalytics:
def __init__(self, table = "demoevents"):
self.projectName = "vicci"
self.tableName = table
def reload_mapping(self):
global mappings, reverse_mappings
mappings[self.tableName] = json.loads(self.fetch_mapping(table=self.tableName))
reverse_mappings[self.tableName] = {v:k for k, v in mappings[self.tableName].items()}
def fetch_mapping(self, m=0, table="events"):
req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table)
resp = requests.get(req)
if (resp.status_code==200):
return resp.text
else:
raise Exception('Error accessing register allocations: %d'%resp.status_code)
def run_query_raw(self, query):
try:
file("/tmp/query_log","a").write("query %s\n" % query)
except:
pass
p = re.compile('%[a-zA-z_]*')
try:
query = p.sub(self.remap, query)
except MappingException:
self.reload_mapping()
query = p.sub(self.remap, query)
try:
file("/tmp/query_log","a").write("remapped query %s\n" % query)
except:
pass
storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
credentials = storage.get()
if credentials is None or credentials.invalid:
credentials = run(FLOW, storage)
http = httplib2.Http()
http = credentials.authorize(http)
service = build('bigquery', 'v2', http=http)
body = {"query": query,
"timeoutMs": 60000}
response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()
return response
def translate_schema(self, response):
for field in response["schema"]["fields"]:
field["name"] = reverse_mappings[self.tableName].get(field["name"], field["name"])
def run_query(self, query):
if not BIGQUERY_AVAILABLE:
print >> sys.stderr, "bigquery_analytics: bigquery flow is not available. returning empty result."
return []
response = self.run_query_raw(query)
fieldNames = []
for field in response["schema"]["fields"]:
fieldNames.append(field["name"])
result = []
if "rows" in response:
for row in response["rows"]:
this_result = {}
for (i,column) in enumerate(row["f"]):
this_result[reverse_mappings[self.tableName].get(fieldNames[i],fieldNames[i])] = column["v"]
result.append(this_result)
return result
""" Filter_results, groupby_results, do_computed_fields, and postprocess_results
are all used for postprocessing queries. The idea is to do one query that
includes the ungrouped and unfiltered data, and cache it for multiple
consumers who will filter and group it as necessary.
TODO: Find a more generalized source for these sorts operations. Perhaps
put the results in SQLite and then run SQL queries against it.
"""
def filter_results(self, rows, name, value):
result = [row for row in rows if row.get(name)==value]
return result
def groupby_results(self, rows, groupBy=[], sum=[], count=[], avg=[], maxi=[]):
new_rows = {}
for row in rows:
groupby_key = [row.get(k, None) for k in groupBy]
if str(groupby_key) not in new_rows:
new_row = {}
for k in groupBy:
new_row[k] = row.get(k, None)
new_rows[str(groupby_key)] = new_row
else:
new_row = new_rows[str(groupby_key)]
for k in sum:
new_row["sum_" + k] = new_row.get("sum_" + k, 0) + to_number(row.get(k,0))
for k in avg:
new_row["avg_" + k] = new_row.get("avg_" + k, 0) + to_number(row.get(k,0))
new_row["avg_base_" + k] = new_row.get("avg_base_"+k,0) + 1
for k in maxi:
new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k,0)))
for k in count:
v = row.get(k,None)
dl = new_row["distinct_" + k] = new_row.get("distinct_" + k, [])
if (v not in dl):
dl.append(v)
#new_row["count_" + k] = new_row.get("count_" + k, 0) + 1
for row in new_rows.values():
for k in avg:
row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k]
del row["avg_base_" + k]
for k in count:
new_row["count_" + k] = len(new_row.get("distinct_" + k, []))
return new_rows.values()
def do_computed_fields(self, rows, computed=[]):
computedFieldNames=[]
for row in rows:
for k in computed:
if "/" in k:
parts = k.split("/")
computedFieldName = "computed_" + parts[0].replace("%","")+"_div_"+parts[1].replace("%","")
try:
row[computedFieldName] = to_number(row[parts[0]]) / to_number(row[parts[1]])
except:
pass
if computedFieldName not in computedFieldNames:
computedFieldNames.append(computedFieldName)
return (computedFieldNames, rows)
def postprocess_results(self, rows, filter={}, groupBy=[], sum=[], count=[], avg=[], computed=[], maxi=[], maxDeltaTime=None):
sum = [x.replace("%","") for x in sum]
count = [x.replace("%","") for x in count]
avg = [x.replace("%","") for x in avg]
computed = [x.replace("%","") for x in computed]
maxi = [x.replace("%","") for x in maxi]
groupBy = [x.replace("%","") for x in groupBy]
for (k,v) in filter.items():
rows = self.filter_results(rows, k, v)
if rows:
if maxDeltaTime is not None:
maxTime = max([float(row["time"]) for row in rows])
rows = [row for row in rows if float(row["time"])>=maxTime-maxDeltaTime]
(computedFieldNames, rows) = self.do_computed_fields(rows, computed)
sum = sum + computedFieldNames
if groupBy:
rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi)
return rows
def remap(self, match):
if not self.tableName in mappings:
raise MappingException("no mapping for table %s" % self.tableName)
mapping = mappings[self.tableName]
token = match.group()[1:]
if token in mapping:
return mapping[token]
else:
raise MappingException('unknown token %s' % token)
def dump_table(self, rows, keys=None):
if not keys:
keys = rows[0].keys()
lens = {}
for key in keys:
lens[key] = len(key)
for row in rows:
for key in keys:
thislen = len(str(row.get(key,"")))
lens[key] = max(lens.get(key,0), thislen)
for key in keys:
print "%*s" % (lens[key], key),
print
for row in rows:
for key in keys:
print "%*s" % (lens[key], str(row.get(key,""))),
print
def schema_to_cols(self, schema):
fields = schema["fields"]
colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number", "TIMESTAMP": "date"}
cols = []
i=0
for field in fields:
col = {"type": colTypes[field["type"]],
"id": "Col%d" % i,
"label": reverse_mappings[self.tableName].get(field["name"],field["name"])}
cols.append(col)
i=i+1
return cols
def main():
bq = BigQueryAnalytics()
rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname")
bq.dump_table(rows)
if __name__ == "__main__":
main()