import re
import base64
import requests
import urllib
import json
import httplib2
import threading
import os
import sys
import time
import traceback

from apiclient.discovery import build
from apiclient.errors import HttpError
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
from oauth2client.tools import run_flow, run

21"""
22yum -y install python-httplib2
23easy_install python_gflags
24easy_install google_api_python_client
25"""
26
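# Google Cloud project number used as the projectId for BigQuery job requests.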
PROJECT_NUMBER = '549187599759'

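# Build the OAuth2 flow from the installed client secrets. If the secrets file
# is missing or unreadable, BigQuery support is disabled instead of breaking
# whatever imports this module.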
try:
    FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
                                   scope='https://www.googleapis.com/auth/bigquery')
    BIGQUERY_AVAILABLE = True
except:
    print >> sys.stderr, "exception while initializing bigquery flow"
    traceback.print_exc()
    FLOW = None
    BIGQUERY_AVAILABLE = False

MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000

# global to hold cached mappings
mappings = {}
reverse_mappings = {}

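# Best-effort numeric conversion: returns a float or int, or 0 if the value
# cannot be parsed as a number.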
def to_number(s):
    try:
        if "." in str(s):
            return float(s)
        else:
            return int(s)
    except:
        return 0

class MappingException(Exception):
    pass

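# Thin wrapper around the BigQuery REST API. Queries are written with %name
# tokens, which are translated to the column names served by the
# cloud-scrutiny mapping service before the query is submitted.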
class BigQueryAnalytics:
    def __init__(self, table = "demoevents"):
        self.projectName = "vicci"
        self.tableName = table

    def reload_mapping(self):
        global mappings, reverse_mappings
        mappings[self.tableName] = json.loads(self.fetch_mapping(table=self.tableName))
        reverse_mappings[self.tableName] = {v: k for k, v in mappings[self.tableName].items()}

    def fetch_mapping(self, m=0, table="events"):
        req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s' % (m, table)
        resp = requests.get(req)
        if (resp.status_code == 200):
            return resp.text
        else:
            raise Exception('Error accessing register allocations: %d' % resp.status_code)

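    # Remap %tokens in the query, authorize against BigQuery with the cached
    # credentials (running the OAuth flow if they are missing or invalid), and
    # return the raw response from the jobs().query() endpoint.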
    def run_query_raw(self, query):
        try:
            file("/tmp/query_log", "a").write("query %s\n" % query)
        except:
            pass

        p = re.compile('%[a-zA-Z_]*')

        try:
            query = p.sub(self.remap, query)
        except MappingException:
            self.reload_mapping()
            query = p.sub(self.remap, query)

        try:
            file("/tmp/query_log", "a").write("remapped query %s\n" % query)
        except:
            pass

        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            credentials = run(FLOW, storage)

        http = httplib2.Http()
        http = credentials.authorize(http)

        service = build('bigquery', 'v2', http=http)

        body = {"query": query,
                "timeoutMs": 60000}
        response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

        return response

    def translate_schema(self, response):
        for field in response["schema"]["fields"]:
            field["name"] = reverse_mappings[self.tableName].get(field["name"], field["name"])

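    # Run a query and flatten the response into a list of dicts keyed by the
    # reverse-mapped field names. Returns an empty list if BigQuery is not
    # available.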
    def run_query(self, query):
        if not BIGQUERY_AVAILABLE:
            print >> sys.stderr, "bigquery_analytics: bigquery flow is not available. returning empty result."
            return []

        response = self.run_query_raw(query)

        fieldNames = []
        for field in response["schema"]["fields"]:
            fieldNames.append(field["name"])

        result = []
        if "rows" in response:
            for row in response["rows"]:
                this_result = {}
                for (i, column) in enumerate(row["f"]):
                    this_result[reverse_mappings[self.tableName].get(fieldNames[i], fieldNames[i])] = column["v"]
                result.append(this_result)

        return result

    """ filter_results, groupby_results, do_computed_fields, and postprocess_results
        are all used for postprocessing queries. The idea is to do one query that
        includes the ungrouped and unfiltered data, and cache it for multiple
        consumers who will filter and group it as necessary.

        TODO: Find a more generalized source for these sorts of operations. Perhaps
              put the results in SQLite and then run SQL queries against it.
    """

    def filter_results(self, rows, name, value):
        result = [row for row in rows if row.get(name) == value]
        return result

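    # Group rows on the groupBy keys and accumulate per-group aggregates:
    # sum_<k>, avg_<k>, max_<k>, and count_<k> (the number of distinct values).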
    def groupby_results(self, rows, groupBy=[], sum=[], count=[], avg=[], maxi=[]):
        new_rows = {}
        for row in rows:
            groupby_key = [row.get(k, None) for k in groupBy]

            if str(groupby_key) not in new_rows:
                new_row = {}
                for k in groupBy:
                    new_row[k] = row.get(k, None)

                new_rows[str(groupby_key)] = new_row
            else:
                new_row = new_rows[str(groupby_key)]

            for k in sum:
                new_row["sum_" + k] = new_row.get("sum_" + k, 0) + to_number(row.get(k, 0))

            for k in avg:
                new_row["avg_" + k] = new_row.get("avg_" + k, 0) + to_number(row.get(k, 0))
                new_row["avg_base_" + k] = new_row.get("avg_base_" + k, 0) + 1

            for k in maxi:
                new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k, 0)))

            for k in count:
                v = row.get(k, None)
                dl = new_row["distinct_" + k] = new_row.get("distinct_" + k, [])
                if (v not in dl):
                    dl.append(v)

                #new_row["count_" + k] = new_row.get("count_" + k, 0) + 1

        for row in new_rows.values():
            for k in avg:
                row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k]
                del row["avg_base_" + k]

            for k in count:
                row["count_" + k] = len(row.get("distinct_" + k, []))

        return new_rows.values()

    def do_computed_fields(self, rows, computed=[]):
        computedFieldNames = []
        for row in rows:
            for k in computed:
                if "/" in k:
                    parts = k.split("/")
                    computedFieldName = "computed_" + parts[0].replace("%", "") + "_div_" + parts[1].replace("%", "")
                    try:
                        row[computedFieldName] = to_number(row[parts[0]]) / to_number(row[parts[1]])
                    except:
                        pass

                    if computedFieldName not in computedFieldNames:
                        computedFieldNames.append(computedFieldName)
        return (computedFieldNames, rows)

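    # Apply filtering, optional time-window trimming, computed fields, and
    # grouping to a cached result set. Field names may be given with or
    # without the leading % marker.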
    def postprocess_results(self, rows, filter={}, groupBy=[], sum=[], count=[], avg=[], computed=[], maxi=[], maxDeltaTime=None):
        sum = [x.replace("%", "") for x in sum]
        count = [x.replace("%", "") for x in count]
        avg = [x.replace("%", "") for x in avg]
        computed = [x.replace("%", "") for x in computed]
        maxi = [x.replace("%", "") for x in maxi]
        groupBy = [x.replace("%", "") for x in groupBy]

        for (k, v) in filter.items():
            rows = self.filter_results(rows, k, v)

        if rows:
            if maxDeltaTime is not None:
                maxTime = max([float(row["time"]) for row in rows])
                rows = [row for row in rows if float(row["time"]) >= maxTime - maxDeltaTime]

        (computedFieldNames, rows) = self.do_computed_fields(rows, computed)
        sum = sum + computedFieldNames
        if groupBy:
            rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi)
        return rows

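    # re.sub callback: translate a single %token into its mapped name, raising
    # MappingException so the caller can reload the mapping and retry.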
    def remap(self, match):
        if self.tableName not in mappings:
            raise MappingException("no mapping for table %s" % self.tableName)

        mapping = mappings[self.tableName]

        token = match.group()[1:]
        if token in mapping:
            return mapping[token]
        else:
            raise MappingException('unknown token %s' % token)

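    # Pretty-print rows to stdout as fixed-width columns sized to the widest
    # value in each column.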
    def dump_table(self, rows, keys=None):
        if not keys:
            keys = rows[0].keys()

        lens = {}
        for key in keys:
            lens[key] = len(key)

        for row in rows:
            for key in keys:
                thislen = len(str(row.get(key, "")))
                lens[key] = max(lens.get(key, 0), thislen)

        for key in keys:
            print "%*s" % (lens[key], key),
        print

        for row in rows:
            for key in keys:
                print "%*s" % (lens[key], str(row.get(key, ""))),
            print

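    # Convert a BigQuery schema into a list of column descriptors
    # (type, id, label), with labels reverse-mapped to their logical names.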
    def schema_to_cols(self, schema):
        fields = schema["fields"]

        colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number", "TIMESTAMP": "date"}

        cols = []
        i = 0
        for field in fields:
            col = {"type": colTypes[field["type"]],
                   "id": "Col%d" % i,
                   "label": reverse_mappings[self.tableName].get(field["name"], field["name"])}
            cols.append(col)
            i = i + 1

        return cols

def main():
    bq = BigQueryAnalytics()

    rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname")

    bq.dump_table(rows)

if __name__ == "__main__":
    main()