blob: ca08025133071c50d340d8f753838fa86821e930 [file] [log] [blame]
import re
import base64
import requests
import urllib
import json
import httplib2
import threading
import os
import time
import traceback
from apiclient.discovery import build
from apiclient.errors import HttpError
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
from oauth2client.tools import run_flow,run
"""
yum -y install python-httplib2
easy_install python_gflags
easy_install google_api_python_client
"""
PROJECT_NUMBER = '549187599759'
FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
scope='https://www.googleapis.com/auth/bigquery')
MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000
class BigQueryAnalytics:
def __init__(self, table = "demoevents"):
self.projectName = "vicci"
self.tableName = table
self.mapping = json.loads(self.fetch_mapping(table=self.tableName))
self.reverse_mapping = {v:k for k, v in self.mapping.items()}
def fetch_mapping(self, m=0, table="events"):
req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table)
resp = requests.get(req)
if (resp.status_code==200):
return resp.text
else:
raise Exception('Error accessing register allocations: %d'%resp.status_code)
def run_query_raw(self, query):
p = re.compile('%[a-zA-z_]*')
query = p.sub(self.remap, query)
storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
credentials = storage.get()
if credentials is None or credentials.invalid:
credentials = run(FLOW, storage)
http = httplib2.Http()
http = credentials.authorize(http)
service = build('bigquery', 'v2', http=http)
body = {"query": query}
response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()
return response
def translate_schema(self, response):
for field in response["schema"]["fields"]:
field["name"] = self.reverse_mapping.get(field["name"], field["name"])
def run_query(self, query):
response = self.run_query_raw(query)
fieldNames = []
for field in response["schema"]["fields"]:
fieldNames.append(field["name"])
result = []
if "rows" in response:
for row in response["rows"]:
this_result = {}
for (i,column) in enumerate(row["f"]):
this_result[self.reverse_mapping.get(fieldNames[i],fieldNames[i])] = column["v"]
result.append(this_result)
return result
def remap(self, match):
token = match.group()[1:]
if token in self.mapping:
return self.mapping[token]
else:
raise Exception('unknown token %s' % token)
def dump_table(self, rows, keys=None):
if not keys:
keys = rows[0].keys()
lens = {}
for key in keys:
lens[key] = len(key)
for row in rows:
for key in keys:
thislen = len(str(row.get(key,"")))
lens[key] = max(lens.get(key,0), thislen)
for key in keys:
print "%*s" % (lens[key], key),
print
for row in rows:
for key in keys:
print "%*s" % (lens[key], str(row.get(key,""))),
print
def main():
bq = BigQueryAnalytics()
rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname")
bq.dump_table(rows)
if __name__ == "__main__":
main()