blob: 29b8c288642c1fbdc8172922bd82bb34cc4173a6 [file] [log] [blame]
Scott Baker43adf1b2014-03-19 21:54:55 -07001import re
2import base64
3import requests
4import urllib
5import json
6import httplib2
7import threading
8import os
9import time
10import traceback
11
12from apiclient.discovery import build
13from apiclient.errors import HttpError
14from oauth2client.client import AccessTokenRefreshError
15from oauth2client.client import OAuth2WebServerFlow
16from oauth2client.client import flow_from_clientsecrets
17from oauth2client.file import Storage
18from oauth2client.tools import run_flow,run
19
20"""
21yum -y install python-httplib2
22easy_install python_gflags
23easy_install google_api_python_client
24"""
25
# Project number passed as ``projectId`` when submitting BigQuery jobs.
PROJECT_NUMBER = '549187599759'

# Build the OAuth2 flow once, at import time.  Failure here is deliberately
# non-fatal: the error is reported and FLOW is left as None so the module can
# still be imported.  Only ordinary errors are caught -- a bare ``except:``
# would also swallow KeyboardInterrupt and SystemExit during import.
try:
    FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
                                   scope='https://www.googleapis.com/auth/bigquery')
except Exception:
    # Single-argument print(...) emits the same text whether print is a
    # statement (Python 2) or a function (Python 3).
    print("exception while initializing bigquery flow")
    traceback.print_exc()
    FLOW = None

# Durations expressed in milliseconds.
MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000
38
class BigQueryAnalytics:
    """Run BigQuery queries whose field names are written as ``%token`` placeholders.

    On construction the token mapping for the chosen table is downloaded.
    ``%token`` placeholders in query text are replaced with the mapped names
    before the query is submitted, and column names in results are translated
    back through the reverse mapping.
    """

    # '%' followed by a (possibly empty) run of ASCII letters/underscores.
    # The upper bound of the second range must be 'Z': the range 'A-z' also
    # covers the punctuation that sits between 'Z' and 'a' in ASCII, so a
    # closing bracket right after a token would be swallowed into its name.
    _TOKEN_RE = re.compile(r'%[a-zA-Z_]*')

    def __init__(self, table = "demoevents"):
        """Fetch the token mapping for *table* and build its inverse.

        Args:
            table: table name sent to the mapping service and stored as
                ``self.tableName``.

        Raises:
            Exception: if the mapping service does not answer with HTTP 200.
        """
        self.projectName = "vicci"
        self.tableName = table
        # token name -> name substituted into the query text
        self.mapping = json.loads(self.fetch_mapping(table=self.tableName))
        # substituted name -> token name, used to label result columns
        self.reverse_mapping = {v:k for k, v in self.mapping.items()}

    def fetch_mapping(self, m=0, table="events"):
        """Download the allocation mapping and return the response body as text.

        Args:
            m: value sent as the ``multiplexer`` query parameter.
            table: value sent as the ``table`` query parameter.

        Returns:
            The response text (parsed as JSON by ``__init__``).

        Raises:
            Exception: if the HTTP status code is not 200.
        """
        req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table)
        resp = requests.get(req)
        if resp.status_code != 200:
            raise Exception('Error accessing register allocations: %d'%resp.status_code)
        return resp.text

    def _expand_tokens(self, query):
        """Return *query* with every ``%token`` replaced via ``remap``."""
        return self._TOKEN_RE.sub(self.remap, query)

    def run_query_raw(self, query):
        """Expand placeholders in *query*, submit it, and return the raw response.

        Stored credentials are loaded from disk; when they are missing or
        invalid, new ones are obtained through the module-level OAuth flow.

        Args:
            query: query text, optionally containing ``%token`` placeholders.

        Returns:
            The response object returned by the BigQuery ``jobs().query`` call.

        Raises:
            Exception: if a placeholder is unknown, or if new credentials are
                required but the module-level ``FLOW`` failed to initialize.
        """
        query = self._expand_tokens(query)

        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            # FLOW is None when module initialization failed; fail with a
            # clear message instead of handing None to the OAuth helper.
            if FLOW is None:
                raise Exception('bigquery flow is not initialized; cannot obtain credentials')
            credentials = run(FLOW, storage)

        http = credentials.authorize(httplib2.Http())
        service = build('bigquery', 'v2', http=http)

        body = {"query": query,
                "timeoutMs": 30000}
        return service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

    def translate_schema(self, response):
        """Rename the schema fields of *response* in place to their token names.

        Field names without an entry in the reverse mapping are left as is.
        """
        for field in response["schema"]["fields"]:
            field["name"] = self.reverse_mapping.get(field["name"], field["name"])

    def run_query(self, query):
        """Run *query* and return its rows as a list of dicts.

        Args:
            query: query text, optionally containing ``%token`` placeholders.

        Returns:
            One dict per result row, keyed by column name (translated through
            the reverse mapping where possible) with the cell's ``"v"`` value.
            An empty list when the response carries no ``"rows"``.
        """
        response = self.run_query_raw(query)

        # Translate each column name once rather than once per cell.
        names = [self.reverse_mapping.get(field["name"], field["name"])
                 for field in response["schema"]["fields"]]

        result = []
        for row in response.get("rows", []):
            result.append({names[i]: column["v"]
                           for (i, column) in enumerate(row["f"])})
        return result

    def remap(self, match):
        """Regex substitution callback: map a matched ``%token`` to its replacement.

        Args:
            match: a regex match whose text is ``%`` followed by the token name.

        Raises:
            Exception: if the token is not present in the mapping.
        """
        token = match.group()[1:]  # drop the leading '%'
        if token not in self.mapping:
            raise Exception('unknown token %s' % token)
        return self.mapping[token]

    def dump_table(self, rows, keys=None):
        """Print *rows* as a right-aligned, space-separated text table.

        Args:
            rows: list of dicts, e.g. as returned by ``run_query``.
            keys: column names to print, in order. When omitted or empty, the
                keys of the first row are used.

        Prints a header line followed by one line per row. Prints nothing
        when there are no rows and no explicit keys, since there are no
        columns to derive.
        """
        if not keys:
            if not rows:
                return
            keys = list(rows[0].keys())

        # Each column is as wide as its widest cell or its header.
        widths = {key: len(key) for key in keys}
        for row in rows:
            for key in keys:
                widths[key] = max(widths[key], len(str(row.get(key, ""))))

        # Single-argument print(...) works both as a Python 2 statement and a
        # Python 3 function call.
        print(" ".join("%*s" % (widths[key], key) for key in keys))
        for row in rows:
            print(" ".join("%*s" % (widths[key], str(row.get(key, "")))
                           for key in keys))
124
def main():
    """Run a sample query summing bytes sent per hostname and print the table."""
    analytics = BigQueryAnalytics()
    query = "select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname"
    analytics.dump_table(analytics.run_query(query))


if __name__ == "__main__":
    main()