import re
import base64
import requests
import urllib
import json
import httplib2
import threading
import os
import sys
import time
import traceback

from apiclient.discovery import build
from apiclient.errors import HttpError
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
from oauth2client.tools import run_flow, run

from bigquery_config import BIGQUERY_SECRETS_FN, BIGQUERY_CREDENTIALS_FN

"""
yum -y install python-httplib2
easy_install python_gflags
easy_install google_api_python_client
"""

PROJECT_NUMBER = '549187599759'

try:
    FLOW = flow_from_clientsecrets(BIGQUERY_SECRETS_FN,
                                   scope='https://www.googleapis.com/auth/bigquery')
    BIGQUERY_AVAILABLE = True
except:
    print >> sys.stderr, "exception while initializing bigquery flow"
    traceback.print_exc()
    FLOW = None
    BIGQUERY_AVAILABLE = False

MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000

# globals to hold cached field-name mappings
mappings = {}
reverse_mappings = {}

def to_number(s):
    try:
        if "." in str(s):
            return float(s)
        else:
            return int(s)
    except:
        return 0

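# Illustrative behavior: to_number("3.5") -> 3.5, to_number("42") -> 42,
# and anything unparseable (e.g. "n/a" or None) -> 0.
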
class MappingException(Exception):
    pass

class BigQueryAnalytics:
    def __init__(self, table="demoevents"):
        self.projectName = "vicci"
        self.tableName = table

    def reload_mapping(self):
        global mappings, reverse_mappings
        mappings[self.tableName] = json.loads(self.fetch_mapping(table=self.tableName))
        reverse_mappings[self.tableName] = {v: k for k, v in mappings[self.tableName].items()}

    def fetch_mapping(self, m=0, table="events"):
        req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s' % (m, table)
        resp = requests.get(req)
        if (resp.status_code == 200):
            return resp.text
        else:
            raise Exception('Error accessing register allocations: %d' % resp.status_code)

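    # The mapping service is expected to return a JSON object of friendly
    # field names to per-table column allocations (values hypothetical):
    #   {"hostname": "s0", "bytes_sent": "i2"}
    # reload_mapping() caches it in `mappings` and inverts it into
    # `reverse_mappings` for translating results back.
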
    def run_query_raw(self, query):
        try:
            file("/tmp/query_log", "a").write("query %s\n" % query)
        except:
            pass

        # match %field tokens so they can be remapped to real column names
        p = re.compile('%[a-zA-Z_]*')

        try:
            query = p.sub(self.remap, query)
        except MappingException:
            self.reload_mapping()
            query = p.sub(self.remap, query)

        try:
            file("/tmp/query_log", "a").write("remapped query %s\n" % query)
        except:
            pass

        storage = Storage(BIGQUERY_CREDENTIALS_FN)
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            credentials = run(FLOW, storage)

        http = httplib2.Http()
        http = credentials.authorize(http)

        service = build('bigquery', 'v2', http=http)

        body = {"query": query,
                "timeoutMs": 60000}
        response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

        return response

    def translate_schema(self, response):
        for field in response["schema"]["fields"]:
            field["name"] = reverse_mappings[self.tableName].get(field["name"], field["name"])

118 def run_query(self, query):
Scott Bakerc337ac32014-06-10 19:54:17 -0700119 if not BIGQUERY_AVAILABLE:
120 print >> sys.stderr, "bigquery_analytics: bigquery flow is not available. returning empty result."
121 return []
122
Scott Baker43adf1b2014-03-19 21:54:55 -0700123 response = self.run_query_raw(query)
124
125 fieldNames = []
126 for field in response["schema"]["fields"]:
127 fieldNames.append(field["name"])
128
129 result = []
130 if "rows" in response:
131 for row in response["rows"]:
132 this_result = {}
133 for (i,column) in enumerate(row["f"]):
Scott Bakerba60d822014-03-27 09:12:28 -0700134 this_result[reverse_mappings[self.tableName].get(fieldNames[i],fieldNames[i])] = column["v"]
Scott Baker43adf1b2014-03-19 21:54:55 -0700135 result.append(this_result)
136
137 return result
138
    """ filter_results, groupby_results, do_computed_fields, and postprocess_results
        are all used for postprocessing queries. The idea is to do one query that
        includes the ungrouped and unfiltered data, and cache it for multiple
        consumers who will filter and group it as necessary.

        TODO: Find a more generalized source for these sorts of operations. Perhaps
        put the results in SQLite and then run SQL queries against it.
    """

    def filter_results(self, rows, name, value):
        result = [row for row in rows if row.get(name) == value]
        return result

    def groupby_results(self, rows, groupBy=[], sum=[], count=[], avg=[], maxi=[]):
        new_rows = {}
        for row in rows:
            groupby_key = [row.get(k, None) for k in groupBy]

            if str(groupby_key) not in new_rows:
                new_row = {}
                for k in groupBy:
                    new_row[k] = row.get(k, None)

                new_rows[str(groupby_key)] = new_row
            else:
                new_row = new_rows[str(groupby_key)]

            for k in sum:
                new_row["sum_" + k] = new_row.get("sum_" + k, 0) + to_number(row.get(k, 0))

            for k in avg:
                new_row["avg_" + k] = new_row.get("avg_" + k, 0) + to_number(row.get(k, 0))
                new_row["avg_base_" + k] = new_row.get("avg_base_" + k, 0) + 1

            for k in maxi:
                new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k, 0)))

            for k in count:
                # track distinct values per group; counted after the main loop
                v = row.get(k, None)
                dl = new_row["distinct_" + k] = new_row.get("distinct_" + k, [])
                if (v not in dl):
                    dl.append(v)

        for row in new_rows.values():
            for k in avg:
                row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k]
                del row["avg_base_" + k]

            for k in count:
                row["count_" + k] = len(row.get("distinct_" + k, []))

        return new_rows.values()

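    # Usage sketch with hypothetical rows: grouping on "site" while summing
    # "bytes" collapses the input to one aggregate row per group, e.g.
    #   bq.groupby_results([{"site": "a", "bytes": "5"},
    #                       {"site": "a", "bytes": "7"}],
    #                      groupBy=["site"], sum=["bytes"])
    #   -> [{"site": "a", "sum_bytes": 12}]
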
    def do_computed_fields(self, rows, computed=[]):
        computedFieldNames = []
        for row in rows:
            for k in computed:
                if "/" in k:
                    parts = k.split("/")
                    computedFieldName = "computed_" + parts[0].replace("%", "") + "_div_" + parts[1].replace("%", "")
                    try:
                        # float() avoids python2 integer truncation for ratios
                        row[computedFieldName] = float(to_number(row[parts[0]])) / to_number(row[parts[1]])
                    except:
                        pass

                    if computedFieldName not in computedFieldNames:
                        computedFieldNames.append(computedFieldName)
        return (computedFieldNames, rows)

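    # Sketch (field names hypothetical): computed=["bytes_sent/elapsed"] adds
    # a "computed_bytes_sent_div_elapsed" ratio to every row carrying both
    # fields, and returns that name so callers can aggregate it.
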
    def postprocess_results(self, rows, filter={}, groupBy=[], sum=[], count=[], avg=[], computed=[], maxi=[], maxDeltaTime=None):
        sum = [x.replace("%", "") for x in sum]
        count = [x.replace("%", "") for x in count]
        avg = [x.replace("%", "") for x in avg]
        computed = [x.replace("%", "") for x in computed]
        maxi = [x.replace("%", "") for x in maxi]
        groupBy = [x.replace("%", "") for x in groupBy]

        for (k, v) in filter.items():
            rows = self.filter_results(rows, k, v)

        if rows:
            if maxDeltaTime is not None:
                maxTime = max([float(row["time"]) for row in rows])
                rows = [row for row in rows if float(row["time"]) >= maxTime - maxDeltaTime]

            (computedFieldNames, rows) = self.do_computed_fields(rows, computed)
            sum = sum + computedFieldNames
            if groupBy:
                rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi)
        return rows

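    # Usage sketch with hypothetical cached rows ("slice" is an assumed field
    # name; %hostname and %bytes_sent appear in main() below):
    #   bq.postprocess_results(rows, filter={"slice": "syndicate"},
    #                          groupBy=["%hostname"], sum=["%bytes_sent"],
    #                          maxDeltaTime=HOUR_MS)
    # filters to one slice, keeps rows whose "time" is within maxDeltaTime of
    # the newest row (same units as the "time" field), then sums bytes_sent
    # per hostname.
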
    def remap(self, match):
        if not self.tableName in mappings:
            raise MappingException("no mapping for table %s" % self.tableName)

        mapping = mappings[self.tableName]

        token = match.group()[1:]
        if token in mapping:
            return mapping[token]
        else:
            raise MappingException('unknown token %s' % token)

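    # Sketch of the remapping round trip (mapping values are hypothetical):
    # with mappings[self.tableName] == {"hostname": "s0"}, run_query_raw
    # rewrites "select %hostname ..." to "select s0 ...", and run_query /
    # translate_schema use reverse_mappings to rename "s0" back to "hostname"
    # in the results.
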
    def dump_table(self, rows, keys=None):
        if not keys:
            keys = rows[0].keys()

        lens = {}
        for key in keys:
            lens[key] = len(key)

        for row in rows:
            for key in keys:
                thislen = len(str(row.get(key, "")))
                lens[key] = max(lens.get(key, 0), thislen)

        for key in keys:
            print "%*s" % (lens[key], key),
        print

        for row in rows:
            for key in keys:
                print "%*s" % (lens[key], str(row.get(key, ""))),
            print

    def schema_to_cols(self, schema):
        fields = schema["fields"]

        colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number", "TIMESTAMP": "date"}

        cols = []
        i = 0
        for field in fields:
            col = {"type": colTypes[field["type"]],
                   "id": "Col%d" % i,
                   "label": reverse_mappings[self.tableName].get(field["name"], field["name"])}
            cols.append(col)
            i = i + 1

        return cols

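    # Sketch of the output shape (names hypothetical): a FLOAT field stored
    # as "s3" but reverse-mapped to "cpu" yields a Google-Visualization-style
    # column descriptor:
    #   {"type": "number", "id": "Col0", "label": "cpu"}
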
def main():
    bq = BigQueryAnalytics()

    rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname")

    bq.dump_table(rows)

if __name__ == "__main__":
    main()