Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 1 | import re |
| 2 | import base64 |
| 3 | import requests |
| 4 | import urllib |
| 5 | import json |
| 6 | import httplib2 |
| 7 | import threading |
| 8 | import os |
Scott Baker | c337ac3 | 2014-06-10 19:54:17 -0700 | [diff] [blame] | 9 | import sys |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 10 | import time |
| 11 | import traceback |
| 12 | |
| 13 | from apiclient.discovery import build |
| 14 | from apiclient.errors import HttpError |
| 15 | from oauth2client.client import AccessTokenRefreshError |
| 16 | from oauth2client.client import OAuth2WebServerFlow |
| 17 | from oauth2client.client import flow_from_clientsecrets |
| 18 | from oauth2client.file import Storage |
| 19 | from oauth2client.tools import run_flow,run |
| 20 | |
Scott Baker | b969c46 | 2015-02-04 16:22:05 -0800 | [diff] [blame] | 21 | from bigquery_config import BIGQUERY_SECRETS_FN, BIGQUERY_CREDENTIALS_FN |
| 22 | |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 23 | """ |
| 24 | yum -y install python-httplib2 |
| 25 | easy_install python_gflags |
| 26 | easy_install google_api_python_client |
| 27 | """ |
| 28 | |
| 29 | PROJECT_NUMBER = '549187599759' |
| 30 | |
Scott Baker | 78ab101 | 2014-03-19 23:44:39 -0700 | [diff] [blame] | 31 | try: |
Scott Baker | b969c46 | 2015-02-04 16:22:05 -0800 | [diff] [blame] | 32 | FLOW = flow_from_clientsecrets(BIGQUERY_SECRETS_FN, |
Scott Baker | 78ab101 | 2014-03-19 23:44:39 -0700 | [diff] [blame] | 33 | scope='https://www.googleapis.com/auth/bigquery') |
Scott Baker | c337ac3 | 2014-06-10 19:54:17 -0700 | [diff] [blame] | 34 | BIGQUERY_AVAILABLE = True |
Scott Baker | 78ab101 | 2014-03-19 23:44:39 -0700 | [diff] [blame] | 35 | except: |
Scott Baker | c337ac3 | 2014-06-10 19:54:17 -0700 | [diff] [blame] | 36 | print >> sys.stderr, "exception while initializing bigquery flow" |
Scott Baker | 78ab101 | 2014-03-19 23:44:39 -0700 | [diff] [blame] | 37 | traceback.print_exc() |
| 38 | FLOW = None |
Scott Baker | c337ac3 | 2014-06-10 19:54:17 -0700 | [diff] [blame] | 39 | BIGQUERY_AVAILABLE = False |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 40 | |
# time-unit constants, in milliseconds (for use in query time windows)
MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000

# global to hold cached mappings
# mappings[tableName]: logical column name -> physical BigQuery column name
# reverse_mappings[tableName]: physical column name -> logical column name
mappings = {}
reverse_mappings = {}
| 47 | |
Scott Baker | c655e66 | 2014-04-18 10:46:25 -0700 | [diff] [blame] | 48 | def to_number(s): |
| 49 | try: |
| 50 | if "." in str(s): |
| 51 | return float(s) |
| 52 | else: |
| 53 | return int(s) |
| 54 | except: |
| 55 | return 0 |
| 56 | |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 57 | class MappingException(Exception): |
| 58 | pass |
| 59 | |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 60 | class BigQueryAnalytics: |
| 61 | def __init__(self, table = "demoevents"): |
| 62 | self.projectName = "vicci" |
| 63 | self.tableName = table |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 64 | |
| 65 | def reload_mapping(self): |
| 66 | global mappings, reverse_mappings |
| 67 | mappings[self.tableName] = json.loads(self.fetch_mapping(table=self.tableName)) |
| 68 | reverse_mappings[self.tableName] = {v:k for k, v in mappings[self.tableName].items()} |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 69 | |
| 70 | def fetch_mapping(self, m=0, table="events"): |
| 71 | req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table) |
| 72 | resp = requests.get(req) |
| 73 | if (resp.status_code==200): |
| 74 | return resp.text |
| 75 | else: |
| 76 | raise Exception('Error accessing register allocations: %d'%resp.status_code) |
| 77 | |
| 78 | def run_query_raw(self, query): |
Scott Baker | 58c8396 | 2014-04-24 17:04:55 -0700 | [diff] [blame] | 79 | try: |
| 80 | file("/tmp/query_log","a").write("query %s\n" % query) |
| 81 | except: |
| 82 | pass |
| 83 | |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 84 | p = re.compile('%[a-zA-z_]*') |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 85 | |
| 86 | try: |
| 87 | query = p.sub(self.remap, query) |
| 88 | except MappingException: |
| 89 | self.reload_mapping() |
| 90 | query = p.sub(self.remap, query) |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 91 | |
Scott Baker | 58c8396 | 2014-04-24 17:04:55 -0700 | [diff] [blame] | 92 | try: |
| 93 | file("/tmp/query_log","a").write("remapped query %s\n" % query) |
| 94 | except: |
| 95 | pass |
| 96 | |
Scott Baker | b969c46 | 2015-02-04 16:22:05 -0800 | [diff] [blame] | 97 | storage = Storage(BIGQUERY_CREDENTIALS_FN) |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 98 | credentials = storage.get() |
| 99 | |
| 100 | if credentials is None or credentials.invalid: |
Scott Baker | c337ac3 | 2014-06-10 19:54:17 -0700 | [diff] [blame] | 101 | credentials = run(FLOW, storage) |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 102 | |
| 103 | http = httplib2.Http() |
| 104 | http = credentials.authorize(http) |
| 105 | |
| 106 | service = build('bigquery', 'v2', http=http) |
| 107 | |
Scott Baker | 0817209 | 2014-03-20 15:07:06 -0700 | [diff] [blame] | 108 | body = {"query": query, |
Scott Baker | db403bb | 2014-04-21 00:26:00 -0700 | [diff] [blame] | 109 | "timeoutMs": 60000} |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 110 | response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute() |
| 111 | |
| 112 | return response |
| 113 | |
| 114 | def translate_schema(self, response): |
| 115 | for field in response["schema"]["fields"]: |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 116 | field["name"] = reverse_mappings[self.tableName].get(field["name"], field["name"]) |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 117 | |
| 118 | def run_query(self, query): |
Scott Baker | c337ac3 | 2014-06-10 19:54:17 -0700 | [diff] [blame] | 119 | if not BIGQUERY_AVAILABLE: |
| 120 | print >> sys.stderr, "bigquery_analytics: bigquery flow is not available. returning empty result." |
| 121 | return [] |
| 122 | |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 123 | response = self.run_query_raw(query) |
| 124 | |
| 125 | fieldNames = [] |
| 126 | for field in response["schema"]["fields"]: |
| 127 | fieldNames.append(field["name"]) |
| 128 | |
| 129 | result = [] |
| 130 | if "rows" in response: |
| 131 | for row in response["rows"]: |
| 132 | this_result = {} |
| 133 | for (i,column) in enumerate(row["f"]): |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 134 | this_result[reverse_mappings[self.tableName].get(fieldNames[i],fieldNames[i])] = column["v"] |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 135 | result.append(this_result) |
| 136 | |
| 137 | return result |
| 138 | |
Scott Baker | c655e66 | 2014-04-18 10:46:25 -0700 | [diff] [blame] | 139 | """ Filter_results, groupby_results, do_computed_fields, and postprocess_results |
| 140 | are all used for postprocessing queries. The idea is to do one query that |
| 141 | includes the ungrouped and unfiltered data, and cache it for multiple |
| 142 | consumers who will filter and group it as necessary. |
| 143 | |
| 144 | TODO: Find a more generalized source for these sorts operations. Perhaps |
| 145 | put the results in SQLite and then run SQL queries against it. |
| 146 | """ |
| 147 | |
| 148 | def filter_results(self, rows, name, value): |
| 149 | result = [row for row in rows if row.get(name)==value] |
| 150 | return result |
| 151 | |
| 152 | def groupby_results(self, rows, groupBy=[], sum=[], count=[], avg=[], maxi=[]): |
| 153 | new_rows = {} |
| 154 | for row in rows: |
| 155 | groupby_key = [row.get(k, None) for k in groupBy] |
| 156 | |
| 157 | if str(groupby_key) not in new_rows: |
| 158 | new_row = {} |
| 159 | for k in groupBy: |
| 160 | new_row[k] = row.get(k, None) |
| 161 | |
| 162 | new_rows[str(groupby_key)] = new_row |
| 163 | else: |
| 164 | new_row = new_rows[str(groupby_key)] |
| 165 | |
| 166 | for k in sum: |
| 167 | new_row["sum_" + k] = new_row.get("sum_" + k, 0) + to_number(row.get(k,0)) |
| 168 | |
| 169 | for k in avg: |
| 170 | new_row["avg_" + k] = new_row.get("avg_" + k, 0) + to_number(row.get(k,0)) |
| 171 | new_row["avg_base_" + k] = new_row.get("avg_base_"+k,0) + 1 |
| 172 | |
| 173 | for k in maxi: |
| 174 | new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k,0))) |
| 175 | |
| 176 | for k in count: |
Scott Baker | 58c8396 | 2014-04-24 17:04:55 -0700 | [diff] [blame] | 177 | v = row.get(k,None) |
| 178 | dl = new_row["distinct_" + k] = new_row.get("distinct_" + k, []) |
| 179 | if (v not in dl): |
| 180 | dl.append(v) |
| 181 | |
| 182 | #new_row["count_" + k] = new_row.get("count_" + k, 0) + 1 |
Scott Baker | c655e66 | 2014-04-18 10:46:25 -0700 | [diff] [blame] | 183 | |
| 184 | for row in new_rows.values(): |
| 185 | for k in avg: |
| 186 | row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k] |
| 187 | del row["avg_base_" + k] |
| 188 | |
Scott Baker | 58c8396 | 2014-04-24 17:04:55 -0700 | [diff] [blame] | 189 | for k in count: |
| 190 | new_row["count_" + k] = len(new_row.get("distinct_" + k, [])) |
| 191 | |
Scott Baker | c655e66 | 2014-04-18 10:46:25 -0700 | [diff] [blame] | 192 | return new_rows.values() |
| 193 | |
| 194 | def do_computed_fields(self, rows, computed=[]): |
| 195 | computedFieldNames=[] |
| 196 | for row in rows: |
| 197 | for k in computed: |
| 198 | if "/" in k: |
| 199 | parts = k.split("/") |
| 200 | computedFieldName = "computed_" + parts[0].replace("%","")+"_div_"+parts[1].replace("%","") |
| 201 | try: |
| 202 | row[computedFieldName] = to_number(row[parts[0]]) / to_number(row[parts[1]]) |
| 203 | except: |
| 204 | pass |
| 205 | |
| 206 | if computedFieldName not in computedFieldNames: |
| 207 | computedFieldNames.append(computedFieldName) |
| 208 | return (computedFieldNames, rows) |
| 209 | |
| 210 | def postprocess_results(self, rows, filter={}, groupBy=[], sum=[], count=[], avg=[], computed=[], maxi=[], maxDeltaTime=None): |
| 211 | sum = [x.replace("%","") for x in sum] |
| 212 | count = [x.replace("%","") for x in count] |
| 213 | avg = [x.replace("%","") for x in avg] |
| 214 | computed = [x.replace("%","") for x in computed] |
| 215 | maxi = [x.replace("%","") for x in maxi] |
Scott Baker | 3a3b4df | 2014-04-28 23:30:52 -0700 | [diff] [blame] | 216 | groupBy = [x.replace("%","") for x in groupBy] |
Scott Baker | c655e66 | 2014-04-18 10:46:25 -0700 | [diff] [blame] | 217 | |
| 218 | for (k,v) in filter.items(): |
| 219 | rows = self.filter_results(rows, k, v) |
| 220 | |
Scott Baker | 58c8396 | 2014-04-24 17:04:55 -0700 | [diff] [blame] | 221 | if rows: |
| 222 | if maxDeltaTime is not None: |
| 223 | maxTime = max([float(row["time"]) for row in rows]) |
| 224 | rows = [row for row in rows if float(row["time"])>=maxTime-maxDeltaTime] |
Scott Baker | c655e66 | 2014-04-18 10:46:25 -0700 | [diff] [blame] | 225 | |
| 226 | (computedFieldNames, rows) = self.do_computed_fields(rows, computed) |
| 227 | sum = sum + computedFieldNames |
Scott Baker | 0fd787d | 2014-05-13 17:03:47 -0700 | [diff] [blame] | 228 | if groupBy: |
| 229 | rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi) |
Scott Baker | c655e66 | 2014-04-18 10:46:25 -0700 | [diff] [blame] | 230 | return rows |
| 231 | |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 232 | def remap(self, match): |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 233 | if not self.tableName in mappings: |
| 234 | raise MappingException("no mapping for table %s" % self.tableName) |
| 235 | |
| 236 | mapping = mappings[self.tableName] |
| 237 | |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 238 | token = match.group()[1:] |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 239 | if token in mapping: |
| 240 | return mapping[token] |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 241 | else: |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 242 | raise MappingException('unknown token %s' % token) |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 243 | |
| 244 | def dump_table(self, rows, keys=None): |
| 245 | if not keys: |
| 246 | keys = rows[0].keys() |
| 247 | |
| 248 | lens = {} |
| 249 | for key in keys: |
| 250 | lens[key] = len(key) |
| 251 | |
| 252 | for row in rows: |
| 253 | for key in keys: |
| 254 | thislen = len(str(row.get(key,""))) |
| 255 | lens[key] = max(lens.get(key,0), thislen) |
| 256 | |
| 257 | for key in keys: |
| 258 | print "%*s" % (lens[key], key), |
| 259 | print |
| 260 | |
| 261 | for row in rows: |
| 262 | for key in keys: |
| 263 | print "%*s" % (lens[key], str(row.get(key,""))), |
| 264 | print |
| 265 | |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 266 | def schema_to_cols(self, schema): |
| 267 | fields = schema["fields"] |
| 268 | |
| 269 | colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number", "TIMESTAMP": "date"} |
| 270 | |
| 271 | cols = [] |
| 272 | i=0 |
| 273 | for field in fields: |
| 274 | col = {"type": colTypes[field["type"]], |
| 275 | "id": "Col%d" % i, |
| 276 | "label": reverse_mappings[self.tableName].get(field["name"],field["name"])} |
| 277 | cols.append(col) |
| 278 | i=i+1 |
| 279 | |
| 280 | return cols |
| 281 | |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 282 | def main(): |
| 283 | bq = BigQueryAnalytics() |
| 284 | |
| 285 | rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname") |
| 286 | |
| 287 | bq.dump_table(rows) |
| 288 | |
| 289 | if __name__ == "__main__": |
| 290 | main() |