import re
import base64
import requests
import urllib
import json
import httplib2
import threading
import os
import time
import traceback

from apiclient.discovery import build
from apiclient.errors import HttpError
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
from oauth2client.tools import run_flow, run

"""
yum -y install python-httplib2
easy_install python_gflags
easy_install google_api_python_client
"""

PROJECT_NUMBER = '549187599759'

try:
    FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
                                   scope='https://www.googleapis.com/auth/bigquery')
except:
    print "exception while initializing bigquery flow"
    traceback.print_exc()
    FLOW = None

MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000

# globals holding the cached column-name mappings, keyed by table name
mappings = {}
reverse_mappings = {}
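
# Illustrative shape of the cached mappings (the physical column names are
# hypothetical; the real ones come from the cloud-scrutiny service):
#   mappings["demoevents"]         == {"hostname": "s0", "bytes_sent": "i2", ...}
#   reverse_mappings["demoevents"] == {"s0": "hostname", "i2": "bytes_sent", ...}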

def to_number(s):
    """ Coerce s to a float or int; return 0 if it cannot be parsed. """
    try:
        if "." in str(s):
            return float(s)
        else:
            return int(s)
    except:
        return 0

class MappingException(Exception):
    pass

class BigQueryAnalytics:
    def __init__(self, table="demoevents"):
        self.projectName = "vicci"
        self.tableName = table

    def reload_mapping(self):
        global mappings, reverse_mappings
        mappings[self.tableName] = json.loads(self.fetch_mapping(table=self.tableName))
        reverse_mappings[self.tableName] = {v: k for k, v in mappings[self.tableName].items()}

    def fetch_mapping(self, m=0, table="events"):
        req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s' % (m, table)
        resp = requests.get(req)
        if (resp.status_code == 200):
            return resp.text
        else:
            raise Exception('Error accessing register allocations: %d' % resp.status_code)

    def run_query_raw(self, query):
        # log the query for debugging, but never let logging break the query path
        try:
            file("/tmp/query_log", "a").write("query %s\n" % query)
        except:
            pass

        # %name tokens in the query are rewritten to physical column names
        # (the original character class [a-zA-z_] accidentally matched
        # punctuation; [a-zA-Z_] is the intended range)
        p = re.compile('%[a-zA-Z_]*')

        try:
            query = p.sub(self.remap, query)
        except MappingException:
            # mapping may be missing or stale; fetch it and retry once
            self.reload_mapping()
            query = p.sub(self.remap, query)

        try:
            file("/tmp/query_log", "a").write("remapped query %s\n" % query)
        except:
            pass

        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            credentials = run(FLOW, storage)

        http = httplib2.Http()
        http = credentials.authorize(http)

        service = build('bigquery', 'v2', http=http)

        body = {"query": query,
                "timeoutMs": 60000}
        response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

        return response
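
    # The raw response follows the BigQuery v2 jobs.query format, which the
    # methods below consume; abridged sketch:
    #   {"schema": {"fields": [{"name": ..., "type": ...}, ...]},
    #    "rows": [{"f": [{"v": ...}, ...]}, ...]}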

    def translate_schema(self, response):
        for field in response["schema"]["fields"]:
            field["name"] = reverse_mappings[self.tableName].get(field["name"], field["name"])

    def run_query(self, query):
        response = self.run_query_raw(query)

        fieldNames = []
        for field in response["schema"]["fields"]:
            fieldNames.append(field["name"])

        result = []
        if "rows" in response:
            for row in response["rows"]:
                this_result = {}
                for (i, column) in enumerate(row["f"]):
                    this_result[reverse_mappings[self.tableName].get(fieldNames[i], fieldNames[i])] = column["v"]
                result.append(this_result)

        return result

    """ filter_results, groupby_results, do_computed_fields, and postprocess_results
        are all used for postprocessing queries. The idea is to run one query that
        includes the ungrouped and unfiltered data, and to cache it for multiple
        consumers, who then filter and group it as necessary.

        TODO: Find a more generalized solution for these sorts of operations.
        Perhaps put the results in SQLite and then run SQL queries against it.
    """
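
    # Illustrative use of the postprocessing pipeline (the table fields named
    # here are hypothetical):
    #   bq = BigQueryAnalytics()
    #   rows = bq.run_query("select %hostname, %site, %bytes_sent from [vicci.demoevents]")
    #   rows = bq.postprocess_results(rows,
    #                                 filter={"site": "princeton"},
    #                                 groupBy=["hostname"],
    #                                 sum=["bytes_sent"])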

    def filter_results(self, rows, name, value):
        result = [row for row in rows if row.get(name) == value]
        return result

    def groupby_results(self, rows, groupBy=[], sum=[], count=[], avg=[], maxi=[]):
        new_rows = {}
        for row in rows:
            groupby_key = [row.get(k, None) for k in groupBy]

            if str(groupby_key) not in new_rows:
                new_row = {}
                for k in groupBy:
                    new_row[k] = row.get(k, None)

                new_rows[str(groupby_key)] = new_row
            else:
                new_row = new_rows[str(groupby_key)]

            for k in sum:
                new_row["sum_" + k] = new_row.get("sum_" + k, 0) + to_number(row.get(k, 0))

            for k in avg:
                # accumulate a running total and a count; divided out below
                new_row["avg_" + k] = new_row.get("avg_" + k, 0) + to_number(row.get(k, 0))
                new_row["avg_base_" + k] = new_row.get("avg_base_" + k, 0) + 1

            for k in maxi:
                new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k, 0)))

            for k in count:
                # count distinct values rather than rows
                v = row.get(k, None)
                dl = new_row["distinct_" + k] = new_row.get("distinct_" + k, [])
                if (v not in dl):
                    dl.append(v)

        for row in new_rows.values():
            for k in avg:
                row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k]
                del row["avg_base_" + k]

            for k in count:
                row["count_" + k] = len(row.get("distinct_" + k, []))

        return new_rows.values()

    def do_computed_fields(self, rows, computed=[]):
        # computed fields are ratio expressions of the form "a/b"
        computedFieldNames = []
        for row in rows:
            for k in computed:
                if "/" in k:
                    parts = k.split("/")
                    computedFieldName = "computed_" + parts[0].replace("%", "") + "_div_" + parts[1].replace("%", "")
                    try:
                        # float() avoids Python 2 integer truncation in the ratio
                        row[computedFieldName] = float(to_number(row[parts[0]])) / to_number(row[parts[1]])
                    except:
                        pass

                    if computedFieldName not in computedFieldNames:
                        computedFieldNames.append(computedFieldName)
        return (computedFieldNames, rows)

    def postprocess_results(self, rows, filter={}, groupBy=[], sum=[], count=[], avg=[], computed=[], maxi=[], maxDeltaTime=None):
        sum = [x.replace("%", "") for x in sum]
        count = [x.replace("%", "") for x in count]
        avg = [x.replace("%", "") for x in avg]
        computed = [x.replace("%", "") for x in computed]
        maxi = [x.replace("%", "") for x in maxi]

        for (k, v) in filter.items():
            rows = self.filter_results(rows, k, v)

        if rows:
            if maxDeltaTime is not None:
                # keep only rows within maxDeltaTime of the newest row
                maxTime = max([float(row["time"]) for row in rows])
                rows = [row for row in rows if float(row["time"]) >= maxTime - maxDeltaTime]

        (computedFieldNames, rows) = self.do_computed_fields(rows, computed)
        sum = sum + computedFieldNames
        rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi)
        return rows

    def remap(self, match):
        if not self.tableName in mappings:
            raise MappingException("no mapping for table %s" % self.tableName)

        mapping = mappings[self.tableName]

        token = match.group()[1:]
        if token in mapping:
            return mapping[token]
        else:
            raise MappingException('unknown token %s' % token)
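
    # Illustrative remapping (physical column names are hypothetical):
    #   "select %hostname, SUM(%bytes_sent)"  ->  "select s0, SUM(i2)"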

    def dump_table(self, rows, keys=None):
        if not keys:
            keys = rows[0].keys()

        lens = {}
        for key in keys:
            lens[key] = len(key)

        for row in rows:
            for key in keys:
                thislen = len(str(row.get(key, "")))
                lens[key] = max(lens.get(key, 0), thislen)

        for key in keys:
            print "%*s" % (lens[key], key),
        print

        for row in rows:
            for key in keys:
                print "%*s" % (lens[key], str(row.get(key, ""))),
            print

    def schema_to_cols(self, schema):
        fields = schema["fields"]

        colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number", "TIMESTAMP": "date"}

        cols = []
        i = 0
        for field in fields:
            # e.g. {"type": "string", "id": "Col0", "label": "hostname"}
            col = {"type": colTypes[field["type"]],
                   "id": "Col%d" % i,
                   "label": reverse_mappings[self.tableName].get(field["name"], field["name"])}
            cols.append(col)
            i = i + 1

        return cols

def main():
    bq = BigQueryAnalytics()

    rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname")

    bq.dump_table(rows)

if __name__ == "__main__":
    main()