import re
import base64
import requests
import urllib
import json
import httplib2
import threading
import os
import time
import traceback

from apiclient.discovery import build
from apiclient.errors import HttpError
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
from oauth2client.tools import run_flow, run

"""
yum -y install python-httplib2
easy_install python_gflags
easy_install google_api_python_client
"""

PROJECT_NUMBER = '549187599759'

try:
    FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
                                   scope='https://www.googleapis.com/auth/bigquery')
except:
    print "exception while initializing bigquery flow"
    traceback.print_exc()
    FLOW = None

MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000

# globals to hold cached mappings
mappings = {}
reverse_mappings = {}
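# NOTE (assumption): the mapping JSON fetched by fetch_mapping() is taken to be
# a flat dict of logical field name -> physical BigQuery column name, e.g.
# (hypothetical values) {"hostname": "s4", "bytes_sent": "i2"}.  remap()
# substitutes "%hostname"-style tokens in queries with the physical names, and
# reverse_mappings translates result columns back to logical names.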

def to_number(s):
    try:
        if "." in str(s):
            return float(s)
        else:
            return int(s)
    except:
        return 0

class MappingException(Exception):
    pass

class BigQueryAnalytics:
    def __init__(self, table="demoevents"):
        self.projectName = "vicci"
        self.tableName = table

    def reload_mapping(self):
        global mappings, reverse_mappings
        mappings[self.tableName] = json.loads(self.fetch_mapping(table=self.tableName))
        reverse_mappings[self.tableName] = {v: k for k, v in mappings[self.tableName].items()}

    def fetch_mapping(self, m=0, table="events"):
        req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s' % (m, table)
        resp = requests.get(req)
        if resp.status_code == 200:
            return resp.text
        else:
            raise Exception('Error accessing register allocations: %d' % resp.status_code)

    def run_query_raw(self, query):
        try:
            file("/tmp/query_log", "a").write("query %s\n" % query)
        except:
            pass

        p = re.compile('%[a-zA-Z_]*')

        try:
            query = p.sub(self.remap, query)
        except MappingException:
            self.reload_mapping()
            query = p.sub(self.remap, query)

        try:
            file("/tmp/query_log", "a").write("remapped query %s\n" % query)
        except:
            pass

        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            credentials = run(FLOW, storage)

        http = httplib2.Http()
        http = credentials.authorize(http)

        service = build('bigquery', 'v2', http=http)

        body = {"query": query,
                "timeoutMs": 60000}
        response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

        return response
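
    # Responses from run_query_raw() follow the BigQuery jobs.query format: a
    # "schema" holding a list of "fields", and (when data is returned) a list
    # of "rows", each with a list "f" of cells whose values sit under "v".
    # translate_schema() and run_query() below consume that shape.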
    def translate_schema(self, response):
        for field in response["schema"]["fields"]:
            field["name"] = reverse_mappings[self.tableName].get(field["name"], field["name"])

    def run_query(self, query):
        response = self.run_query_raw(query)

        fieldNames = []
        for field in response["schema"]["fields"]:
            fieldNames.append(field["name"])

        result = []
        if "rows" in response:
            for row in response["rows"]:
                this_result = {}
                for (i, column) in enumerate(row["f"]):
                    this_result[reverse_mappings[self.tableName].get(fieldNames[i], fieldNames[i])] = column["v"]
                result.append(this_result)

        return result

    """ filter_results, groupby_results, do_computed_fields, and postprocess_results
    are all used for postprocessing queries. The idea is to do one query that
    includes the ungrouped and unfiltered data, and cache it for multiple
    consumers who will filter and group it as necessary.

    TODO: Find a more generalized source for these sorts of operations. Perhaps
    put the results in SQLite and then run SQL queries against it.
    """
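
    # A minimal usage sketch (field values such as "node1" are hypothetical;
    # "hostname", "bytes_sent", and "time" are fields referenced elsewhere in
    # this module):
    #
    #   bq = BigQueryAnalytics()
    #   rows = bq.run_query("select %hostname, %bytes_sent, %time from [vicci.demoevents]")
    #   recent = bq.postprocess_results(rows,
    #                                   filter={"hostname": "node1"},
    #                                   groupBy=["hostname"],
    #                                   sum=["bytes_sent"],
    #                                   maxDeltaTime=10*MINUTE_MS)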

    def filter_results(self, rows, name, value):
        result = [row for row in rows if row.get(name) == value]
        return result

    def groupby_results(self, rows, groupBy=[], sum=[], count=[], avg=[], maxi=[]):
        new_rows = {}
        for row in rows:
            groupby_key = [row.get(k, None) for k in groupBy]

            if str(groupby_key) not in new_rows:
                new_row = {}
                for k in groupBy:
                    new_row[k] = row.get(k, None)

                new_rows[str(groupby_key)] = new_row
            else:
                new_row = new_rows[str(groupby_key)]

            for k in sum:
                new_row["sum_" + k] = new_row.get("sum_" + k, 0) + to_number(row.get(k, 0))

            for k in avg:
                new_row["avg_" + k] = new_row.get("avg_" + k, 0) + to_number(row.get(k, 0))
                new_row["avg_base_" + k] = new_row.get("avg_base_" + k, 0) + 1

            for k in maxi:
                new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k, 0)))

            for k in count:
                v = row.get(k, None)
                dl = new_row["distinct_" + k] = new_row.get("distinct_" + k, [])
                if v not in dl:
                    dl.append(v)

                #new_row["count_" + k] = new_row.get("count_" + k, 0) + 1

        for row in new_rows.values():
            for k in avg:
                row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k]
                del row["avg_base_" + k]

            # derive count_<k> for each grouped row from its distinct value list
            for k in count:
                row["count_" + k] = len(row.get("distinct_" + k, []))

        return new_rows.values()

    def do_computed_fields(self, rows, computed=[]):
        computedFieldNames = []
        for row in rows:
            for k in computed:
                if "/" in k:
                    parts = k.split("/")
                    computedFieldName = "computed_" + parts[0].replace("%", "") + "_div_" + parts[1].replace("%", "")
                    try:
                        # use float division so integer metrics don't truncate
                        row[computedFieldName] = float(to_number(row[parts[0]])) / to_number(row[parts[1]])
                    except:
                        pass

                    if computedFieldName not in computedFieldNames:
                        computedFieldNames.append(computedFieldName)
        return (computedFieldNames, rows)

    def postprocess_results(self, rows, filter={}, groupBy=[], sum=[], count=[], avg=[], computed=[], maxi=[], maxDeltaTime=None):
        sum = [x.replace("%", "") for x in sum]
        count = [x.replace("%", "") for x in count]
        avg = [x.replace("%", "") for x in avg]
        computed = [x.replace("%", "") for x in computed]
        maxi = [x.replace("%", "") for x in maxi]
        groupBy = [x.replace("%", "") for x in groupBy]

        for (k, v) in filter.items():
            rows = self.filter_results(rows, k, v)

        if rows:
            if maxDeltaTime is not None:
                maxTime = max([float(row["time"]) for row in rows])
                rows = [row for row in rows if float(row["time"]) >= maxTime - maxDeltaTime]

        (computedFieldNames, rows) = self.do_computed_fields(rows, computed)
        sum = sum + computedFieldNames
        if groupBy:
            rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi)
        return rows

    def remap(self, match):
        if self.tableName not in mappings:
            raise MappingException("no mapping for table %s" % self.tableName)

        mapping = mappings[self.tableName]

        token = match.group()[1:]
        if token in mapping:
            return mapping[token]
        else:
            raise MappingException('unknown token %s' % token)

    def dump_table(self, rows, keys=None):
        if not keys:
            keys = rows[0].keys()

        lens = {}
        for key in keys:
            lens[key] = len(key)

        for row in rows:
            for key in keys:
                thislen = len(str(row.get(key, "")))
                lens[key] = max(lens.get(key, 0), thislen)

        for key in keys:
            print "%*s" % (lens[key], key),
        print

        for row in rows:
            for key in keys:
                print "%*s" % (lens[key], str(row.get(key, ""))),
            print
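
    # schema_to_cols() turns a BigQuery schema into Google Visualization-style
    # column descriptors.  A sketch of one resulting element (field name taken
    # from the sample query in main(), exact values will vary):
    #   {"type": "number", "id": "Col1", "label": "bytes_sent"}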
    def schema_to_cols(self, schema):
        fields = schema["fields"]

        colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number", "TIMESTAMP": "date"}

        cols = []
        i = 0
        for field in fields:
            col = {"type": colTypes[field["type"]],
                   "id": "Col%d" % i,
                   "label": reverse_mappings[self.tableName].get(field["name"], field["name"])}
            cols.append(col)
            i = i + 1

        return cols

def main():
    bq = BigQueryAnalytics()

    rows = bq.run_query("select %hostname, SUM(%bytes_sent) from [vicci.demoevents] group by %hostname")

    bq.dump_table(rows)

if __name__ == "__main__":
    main()