import re
import base64
import requests
import urllib
import json
import httplib2
import threading
import os
import time
import traceback

from apiclient.discovery import build
from apiclient.errors import HttpError
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
from oauth2client.tools import run_flow, run

20"""
21yum -y install python-httplib2
22easy_install python_gflags
23easy_install google_api_python_client
24"""
25
26PROJECT_NUMBER = '549187599759'
27
try:
    FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
                                   scope='https://www.googleapis.com/auth/bigquery')
except:
    print "exception while initializing bigquery flow"
    traceback.print_exc()
    FLOW = None

MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000

# globals that hold the cached column-name mappings, keyed by table name
mappings = {}
reverse_mappings = {}

def to_number(s):
    try:
        if "." in str(s):
            return float(s)
        else:
            return int(s)
    except:
        return 0
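
# For illustration: to_number("3.5") -> 3.5, to_number("7") -> 7, and any
# value that fails to parse falls back to 0, e.g. to_number("n/a") -> 0.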

class MappingException(Exception):
    pass

class BigQueryAnalytics:
    def __init__(self, table = "demoevents"):
        self.projectName = "vicci"
        self.tableName = table

    def reload_mapping(self):
        global mappings, reverse_mappings
        mappings[self.tableName] = json.loads(self.fetch_mapping(table=self.tableName))
        reverse_mappings[self.tableName] = {v:k for k, v in mappings[self.tableName].items()}

    def fetch_mapping(self, m=0, table="events"):
        req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s' % (m, table)
        resp = requests.get(req)
        if (resp.status_code==200):
            return resp.text
        else:
            raise Exception('Error accessing register allocations: %d' % resp.status_code)
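
    # For illustration, fetch_mapping(table="demoevents") GETs
    # http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=0&table=demoevents
    # and returns the response body: a JSON object mapping logical column
    # names to physical ones.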

    def run_query_raw(self, query):
        try:
            file("/tmp/query_log","a").write("query %s\n" % query)
        except:
            pass

        # tokens of the form %name are remapped to physical column names
        p = re.compile('%[a-zA-Z_]*')

        try:
            query = p.sub(self.remap, query)
        except MappingException:
            self.reload_mapping()
            query = p.sub(self.remap, query)

        try:
            file("/tmp/query_log","a").write("remapped query %s\n" % query)
        except:
            pass

        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            credentials = run(FLOW, storage)

        http = httplib2.Http()
        http = credentials.authorize(http)

        service = build('bigquery', 'v2', http=http)

        body = {"query": query,
                "timeoutMs": 60000}
        response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

        return response

    def translate_schema(self, response):
        for field in response["schema"]["fields"]:
            field["name"] = reverse_mappings[self.tableName].get(field["name"], field["name"])

    def run_query(self, query):
        response = self.run_query_raw(query)

        fieldNames = []
        for field in response["schema"]["fields"]:
            fieldNames.append(field["name"])

        result = []
        if "rows" in response:
            for row in response["rows"]:
                this_result = {}
                for (i,column) in enumerate(row["f"]):
                    this_result[reverse_mappings[self.tableName].get(fieldNames[i],fieldNames[i])] = column["v"]
                result.append(this_result)

        return result

    """ filter_results, groupby_results, do_computed_fields, and postprocess_results
    are all used for postprocessing queries. The idea is to do one query that
    includes the ungrouped and unfiltered data, and cache it for multiple
    consumers who will filter and group it as necessary.

    TODO: Find a more generalized source for these sorts of operations. Perhaps
    put the results in SQLite and then run SQL queries against it.
    """

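    # A minimal sketch of the intended flow (hypothetical rows; the field
    # names are illustrative only):
    #
    #   rows = [{"hostname": "node1", "bytes_sent": "10"},
    #           {"hostname": "node1", "bytes_sent": "20"}]
    #   postprocess_results(rows, groupBy=["hostname"], sum=["bytes_sent"])
    #   # -> [{"hostname": "node1", "sum_bytes_sent": 30}]
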
    def filter_results(self, rows, name, value):
        result = [row for row in rows if row.get(name)==value]
        return result

143 def groupby_results(self, rows, groupBy=[], sum=[], count=[], avg=[], maxi=[]):
144 new_rows = {}
145 for row in rows:
146 groupby_key = [row.get(k, None) for k in groupBy]
147
148 if str(groupby_key) not in new_rows:
149 new_row = {}
150 for k in groupBy:
151 new_row[k] = row.get(k, None)
152
153 new_rows[str(groupby_key)] = new_row
154 else:
155 new_row = new_rows[str(groupby_key)]
156
157 for k in sum:
158 new_row["sum_" + k] = new_row.get("sum_" + k, 0) + to_number(row.get(k,0))
159
160 for k in avg:
161 new_row["avg_" + k] = new_row.get("avg_" + k, 0) + to_number(row.get(k,0))
162 new_row["avg_base_" + k] = new_row.get("avg_base_"+k,0) + 1
163
164 for k in maxi:
165 new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k,0)))
166
167 for k in count:
Scott Baker58c83962014-04-24 17:04:55 -0700168 v = row.get(k,None)
169 dl = new_row["distinct_" + k] = new_row.get("distinct_" + k, [])
170 if (v not in dl):
171 dl.append(v)
172
173 #new_row["count_" + k] = new_row.get("count_" + k, 0) + 1
Scott Bakerc655e662014-04-18 10:46:25 -0700174
175 for row in new_rows.values():
176 for k in avg:
177 row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k]
178 del row["avg_base_" + k]
179
Scott Baker58c83962014-04-24 17:04:55 -0700180 for k in count:
181 new_row["count_" + k] = len(new_row.get("distinct_" + k, []))
182
Scott Bakerc655e662014-04-18 10:46:25 -0700183 return new_rows.values()
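
    # Illustration (hypothetical field names): groupby_results(rows,
    # groupBy=["hostname"], avg=["cpu"]) yields one row per hostname carrying
    # the "hostname" value plus an "avg_cpu" field.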

    def do_computed_fields(self, rows, computed=[]):
        computedFieldNames=[]
        for row in rows:
            for k in computed:
                if "/" in k:
                    parts = k.split("/")
                    computedFieldName = "computed_" + parts[0].replace("%","") + "_div_" + parts[1].replace("%","")
                    try:
                        # float() avoids silent integer division under python 2
                        row[computedFieldName] = to_number(row[parts[0]]) / float(to_number(row[parts[1]]))
                    except:
                        pass

                    if computedFieldName not in computedFieldNames:
                        computedFieldNames.append(computedFieldName)
        return (computedFieldNames, rows)
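
    # e.g. computed=["bytes_sent/elapsed"] (hypothetical field names) adds a
    # "computed_bytes_sent_div_elapsed" value to every row that carries both
    # fields.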

    def postprocess_results(self, rows, filter={}, groupBy=[], sum=[], count=[], avg=[], computed=[], maxi=[], maxDeltaTime=None):
        sum = [x.replace("%","") for x in sum]
        count = [x.replace("%","") for x in count]
        avg = [x.replace("%","") for x in avg]
        computed = [x.replace("%","") for x in computed]
        maxi = [x.replace("%","") for x in maxi]
        groupBy = [x.replace("%","") for x in groupBy]

        for (k,v) in filter.items():
            rows = self.filter_results(rows, k, v)

        if rows:
            if maxDeltaTime is not None:
                maxTime = max([float(row["time"]) for row in rows])
                rows = [row for row in rows if float(row["time"])>=maxTime-maxDeltaTime]

        (computedFieldNames, rows) = self.do_computed_fields(rows, computed)
        sum = sum + computedFieldNames
        rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi)
        return rows

    def remap(self, match):
        if not self.tableName in mappings:
            raise MappingException("no mapping for table %s" % self.tableName)

        mapping = mappings[self.tableName]

        token = match.group()[1:]
        if token in mapping:
            return mapping[token]
        else:
            raise MappingException('unknown token %s' % token)
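
    # Sketch of the remapping (hypothetical mapping values; the real ones come
    # from the fetched allocation table):
    #   mappings["demoevents"] = {"hostname": "s1", ...}
    #   so "%hostname" in a query is rewritten to "s1", and an unknown token
    #   raises MappingException.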

    def dump_table(self, rows, keys=None):
        if not keys:
            keys = rows[0].keys()

        lens = {}
        for key in keys:
            lens[key] = len(key)

        for row in rows:
            for key in keys:
                thislen = len(str(row.get(key,"")))
                lens[key] = max(lens.get(key,0), thislen)

        for key in keys:
            print "%*s" % (lens[key], key),
        print

        for row in rows:
            for key in keys:
                print "%*s" % (lens[key], str(row.get(key,""))),
            print

    def schema_to_cols(self, schema):
        fields = schema["fields"]

        colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number", "TIMESTAMP": "date"}

        cols = []
        i = 0
        for field in fields:
            col = {"type": colTypes[field["type"]],
                   "id": "Col%d" % i,
                   "label": reverse_mappings[self.tableName].get(field["name"],field["name"])}
            cols.append(col)
            i = i + 1

        return cols
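
    # For illustration (hypothetical names): a schema field
    # {"name": "s1", "type": "STRING"} becomes
    # {"type": "string", "id": "Col0", "label": "hostname"} once the cached
    # reverse mapping translates "s1" back to "hostname".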

def main():
    bq = BigQueryAnalytics()

    rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname")

    bq.dump_table(rows)

if __name__ == "__main__":
    main()