import re
import base64
import requests
import urllib
import json
import httplib2
import threading
import os
import time
import traceback

from apiclient.discovery import build
from apiclient.errors import HttpError
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
from oauth2client.tools import run_flow, run

"""
Prerequisites:
    yum -y install python-httplib2
    easy_install python_gflags
    easy_install google_api_python_client
"""

PROJECT_NUMBER = '549187599759'

try:
    FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
                                   scope='https://www.googleapis.com/auth/bigquery')
except:
    print "exception while initializing bigquery flow"
    traceback.print_exc()
    FLOW = None

MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000

# globals to hold cached mappings between friendly field names and
# BigQuery column allocations
mappings = {}
reverse_mappings = {}

def to_number(s):
    """ Best-effort conversion of s to a float or int; returns 0 on failure. """
    try:
        if "." in str(s):
            return float(s)
        else:
            return int(s)
    except:
        return 0

class MappingException(Exception):
    pass

class BigQueryAnalytics:
    def __init__(self, table = "demoevents"):
        self.projectName = "vicci"
        self.tableName = table

    def reload_mapping(self):
        global mappings, reverse_mappings
        mappings[self.tableName] = json.loads(self.fetch_mapping(table=self.tableName))
        reverse_mappings[self.tableName] = {v: k for k, v in mappings[self.tableName].items()}

    def fetch_mapping(self, m=0, table="events"):
        req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s' % (m, table)
        resp = requests.get(req)
        if (resp.status_code == 200):
            return resp.text
        else:
            raise Exception('Error accessing register allocations: %d' % resp.status_code)

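    # A sketch of the mapping flow, with made-up allocations for illustration:
    # fetch_mapping() might return a JSON document such as
    #     {"hostname": "s0", "bytes_sent": "i3"}
    # in which case remap() (below) would rewrite a query like
    #     select %hostname, SUM(%bytes_sent) from [vicci.demoevents]
    # into
    #     select s0, SUM(i3) from [vicci.demoevents]
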
    def run_query_raw(self, query):
        # match %tokens so they can be rewritten via the cached mapping
        p = re.compile('%[a-zA-Z_]*')

        try:
            query = p.sub(self.remap, query)
        except MappingException:
            # the mapping may be missing or stale; reload it and retry once
            self.reload_mapping()
            query = p.sub(self.remap, query)

        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            credentials = run(FLOW, storage)

        http = httplib2.Http()
        http = credentials.authorize(http)

        service = build('bigquery', 'v2', http=http)

        body = {"query": query,
                "timeoutMs": 30000}
        response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

        return response

    def translate_schema(self, response):
        """ Replace mapped column names in the response schema with their friendly names. """
        for field in response["schema"]["fields"]:
            field["name"] = reverse_mappings[self.tableName].get(field["name"], field["name"])

    def run_query(self, query):
        response = self.run_query_raw(query)

        fieldNames = []
        for field in response["schema"]["fields"]:
            fieldNames.append(field["name"])

        result = []
        if "rows" in response:
            for row in response["rows"]:
                this_result = {}
                for (i, column) in enumerate(row["f"]):
                    this_result[reverse_mappings[self.tableName].get(fieldNames[i], fieldNames[i])] = column["v"]
                result.append(this_result)

        return result

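    # run_query() returns the result set as a list of dicts keyed by friendly
    # (un-mapped) field names, e.g. (values made up):
    #     [{"hostname": "node1", "bytes_sent": "12345"}, ...]
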
    """ filter_results, groupby_results, do_computed_fields, and postprocess_results
        are all used for postprocessing queries. The idea is to do one query that
        includes the ungrouped and unfiltered data, and cache it for multiple
        consumers who will filter and group it as necessary.

        TODO: Find a more generalized source for these sorts of operations. Perhaps
        put the results in SQLite and then run SQL queries against it.
    """

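    # A sketch of how a consumer might chain these helpers; %cpu and %time are
    # hypothetical fields, and maxDeltaTime must use the same units as the
    # table's time column:
    #     rows = bq.run_query("select %hostname, %cpu, %time from [vicci.demoevents]")
    #     rows = bq.postprocess_results(rows,
    #                                   groupBy=["hostname"],
    #                                   avg=["%cpu"],
    #                                   maxDeltaTime=3600)
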
    def filter_results(self, rows, name, value):
        """ Keep only the rows whose 'name' field equals 'value'. """
        result = [row for row in rows if row.get(name) == value]
        return result

    def groupby_results(self, rows, groupBy=[], sum=[], count=[], avg=[], maxi=[]):
        """ Group rows by the fields in groupBy, computing sum_*, count_*,
            avg_*, and max_* aggregates for the named fields.
        """
        new_rows = {}
        for row in rows:
            groupby_key = [row.get(k, None) for k in groupBy]

            if str(groupby_key) not in new_rows:
                new_row = {}
                for k in groupBy:
                    new_row[k] = row.get(k, None)

                new_rows[str(groupby_key)] = new_row
            else:
                new_row = new_rows[str(groupby_key)]

            for k in sum:
                new_row["sum_" + k] = new_row.get("sum_" + k, 0) + to_number(row.get(k, 0))

            for k in avg:
                new_row["avg_" + k] = new_row.get("avg_" + k, 0) + to_number(row.get(k, 0))
                new_row["avg_base_" + k] = new_row.get("avg_base_" + k, 0) + 1

            for k in maxi:
                new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k, 0)))

            for k in count:
                new_row["count_" + k] = new_row.get("count_" + k, 0) + 1

        # turn the accumulated avg sums into true averages
        for row in new_rows.values():
            for k in avg:
                row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k]
                del row["avg_base_" + k]

        return new_rows.values()

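    # A small worked example with made-up rows: grouping
    #     [{"site": "a", "cpu": "10"}, {"site": "a", "cpu": "20"}]
    # with groupBy=["site"] and avg=["cpu"] yields
    #     [{"site": "a", "avg_cpu": 15.0}]
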
    def do_computed_fields(self, rows, computed=[]):
        """ Add a computed_<a>_div_<b> field for each "a/b" entry in computed. """
        computedFieldNames = []
        for row in rows:
            for k in computed:
                if "/" in k:
                    parts = k.split("/")
                    computedFieldName = "computed_" + parts[0].replace("%", "") + "_div_" + parts[1].replace("%", "")
                    try:
                        row[computedFieldName] = to_number(row[parts[0]]) / to_number(row[parts[1]])
                    except:
                        pass

                    if computedFieldName not in computedFieldNames:
                        computedFieldNames.append(computedFieldName)
        return (computedFieldNames, rows)

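    # For example (hypothetical fields), computed=["bytes_sent/elapsed"] adds a
    # "computed_bytes_sent_div_elapsed" field to every row that has both inputs.
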
    def postprocess_results(self, rows, filter={}, groupBy=[], sum=[], count=[], avg=[], computed=[], maxi=[], maxDeltaTime=None):
        # the "%" prefix is only meaningful to remap(); strip it here
        sum = [x.replace("%", "") for x in sum]
        count = [x.replace("%", "") for x in count]
        avg = [x.replace("%", "") for x in avg]
        computed = [x.replace("%", "") for x in computed]
        maxi = [x.replace("%", "") for x in maxi]

        for (k, v) in filter.items():
            rows = self.filter_results(rows, k, v)

        if maxDeltaTime is not None:
            # keep only the rows within maxDeltaTime of the newest row
            maxTime = max([float(row["time"]) for row in rows])
            rows = [row for row in rows if float(row["time"]) >= maxTime - maxDeltaTime]

        (computedFieldNames, rows) = self.do_computed_fields(rows, computed)
        sum = sum + computedFieldNames
        rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi)
        return rows

    def remap(self, match):
        if not self.tableName in mappings:
            raise MappingException("no mapping for table %s" % self.tableName)

        mapping = mappings[self.tableName]

        token = match.group()[1:]  # strip the leading "%"
        if token in mapping:
            return mapping[token]
        else:
            raise MappingException('unknown token %s' % token)

    def dump_table(self, rows, keys=None):
        """ Pretty-print rows as a fixed-width text table. """
        if not keys:
            keys = rows[0].keys()

        # compute the display width of each column
        lens = {}
        for key in keys:
            lens[key] = len(key)

        for row in rows:
            for key in keys:
                thislen = len(str(row.get(key, "")))
                lens[key] = max(lens.get(key, 0), thislen)

        for key in keys:
            print "%*s" % (lens[key], key),
        print

        for row in rows:
            for key in keys:
                print "%*s" % (lens[key], str(row.get(key, ""))),
            print

    def schema_to_cols(self, schema):
        fields = schema["fields"]

        colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number", "TIMESTAMP": "date"}

        cols = []
        i = 0
        for field in fields:
            col = {"type": colTypes[field["type"]],
                   "id": "Col%d" % i,
                   "label": reverse_mappings[self.tableName].get(field["name"], field["name"])}
            cols.append(col)
            i = i + 1

        return cols

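    # The cols appear intended as Google Visualization column descriptors,
    # e.g. (made-up schema):
    #     [{"type": "string", "id": "Col0", "label": "hostname"}, ...]
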
def main():
    bq = BigQueryAnalytics()

    rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname")

    bq.dump_table(rows)

if __name__ == "__main__":
    main()