Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 1 | import re |
| 2 | import base64 |
| 3 | import requests |
| 4 | import urllib |
| 5 | import json |
| 6 | import httplib2 |
| 7 | import threading |
| 8 | import os |
Scott Baker | c337ac3 | 2014-06-10 19:54:17 -0700 | [diff] [blame] | 9 | import sys |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 10 | import time |
| 11 | import traceback |
| 12 | |
| 13 | from apiclient.discovery import build |
| 14 | from apiclient.errors import HttpError |
| 15 | from oauth2client.client import AccessTokenRefreshError |
| 16 | from oauth2client.client import OAuth2WebServerFlow |
| 17 | from oauth2client.client import flow_from_clientsecrets |
| 18 | from oauth2client.file import Storage |
| 19 | from oauth2client.tools import run_flow,run |
| 20 | |
| 21 | """ |
| 22 | yum -y install python-httplib2 |
| 23 | easy_install python_gflags |
| 24 | easy_install google_api_python_client |
| 25 | """ |
| 26 | |
# Google Cloud project number that BigQuery jobs are submitted under
# (see service.jobs().query(projectId=...) in run_query_raw)
PROJECT_NUMBER = '549187599759'
| 28 | |
Scott Baker | 78ab101 | 2014-03-19 23:44:39 -0700 | [diff] [blame] | 29 | try: |
| 30 | FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json', |
| 31 | scope='https://www.googleapis.com/auth/bigquery') |
Scott Baker | c337ac3 | 2014-06-10 19:54:17 -0700 | [diff] [blame] | 32 | BIGQUERY_AVAILABLE = True |
Scott Baker | 78ab101 | 2014-03-19 23:44:39 -0700 | [diff] [blame] | 33 | except: |
Scott Baker | c337ac3 | 2014-06-10 19:54:17 -0700 | [diff] [blame] | 34 | print >> sys.stderr, "exception while initializing bigquery flow" |
Scott Baker | 78ab101 | 2014-03-19 23:44:39 -0700 | [diff] [blame] | 35 | traceback.print_exc() |
| 36 | FLOW = None |
Scott Baker | c337ac3 | 2014-06-10 19:54:17 -0700 | [diff] [blame] | 37 | BIGQUERY_AVAILABLE = False |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 38 | |
# millisecond-per-unit constants, handy when building time-window queries
MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000

# module-level caches of the %token -> physical-column-name mapping (and its
# reverse), keyed by table name; populated lazily by reload_mapping()
mappings = {}
reverse_mappings = {}
| 45 | |
Scott Baker | c655e66 | 2014-04-18 10:46:25 -0700 | [diff] [blame] | 46 | def to_number(s): |
| 47 | try: |
| 48 | if "." in str(s): |
| 49 | return float(s) |
| 50 | else: |
| 51 | return int(s) |
| 52 | except: |
| 53 | return 0 |
| 54 | |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 55 | class MappingException(Exception): |
| 56 | pass |
| 57 | |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 58 | class BigQueryAnalytics: |
| 59 | def __init__(self, table = "demoevents"): |
| 60 | self.projectName = "vicci" |
| 61 | self.tableName = table |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 62 | |
| 63 | def reload_mapping(self): |
| 64 | global mappings, reverse_mappings |
| 65 | mappings[self.tableName] = json.loads(self.fetch_mapping(table=self.tableName)) |
| 66 | reverse_mappings[self.tableName] = {v:k for k, v in mappings[self.tableName].items()} |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 67 | |
| 68 | def fetch_mapping(self, m=0, table="events"): |
| 69 | req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table) |
| 70 | resp = requests.get(req) |
| 71 | if (resp.status_code==200): |
| 72 | return resp.text |
| 73 | else: |
| 74 | raise Exception('Error accessing register allocations: %d'%resp.status_code) |
| 75 | |
| 76 | def run_query_raw(self, query): |
Scott Baker | 58c8396 | 2014-04-24 17:04:55 -0700 | [diff] [blame] | 77 | try: |
| 78 | file("/tmp/query_log","a").write("query %s\n" % query) |
| 79 | except: |
| 80 | pass |
| 81 | |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 82 | p = re.compile('%[a-zA-z_]*') |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 83 | |
| 84 | try: |
| 85 | query = p.sub(self.remap, query) |
| 86 | except MappingException: |
| 87 | self.reload_mapping() |
| 88 | query = p.sub(self.remap, query) |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 89 | |
Scott Baker | 58c8396 | 2014-04-24 17:04:55 -0700 | [diff] [blame] | 90 | try: |
| 91 | file("/tmp/query_log","a").write("remapped query %s\n" % query) |
| 92 | except: |
| 93 | pass |
| 94 | |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 95 | storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat') |
| 96 | credentials = storage.get() |
| 97 | |
| 98 | if credentials is None or credentials.invalid: |
Scott Baker | c337ac3 | 2014-06-10 19:54:17 -0700 | [diff] [blame] | 99 | credentials = run(FLOW, storage) |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 100 | |
| 101 | http = httplib2.Http() |
| 102 | http = credentials.authorize(http) |
| 103 | |
| 104 | service = build('bigquery', 'v2', http=http) |
| 105 | |
Scott Baker | 0817209 | 2014-03-20 15:07:06 -0700 | [diff] [blame] | 106 | body = {"query": query, |
Scott Baker | db403bb | 2014-04-21 00:26:00 -0700 | [diff] [blame] | 107 | "timeoutMs": 60000} |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 108 | response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute() |
| 109 | |
| 110 | return response |
| 111 | |
| 112 | def translate_schema(self, response): |
| 113 | for field in response["schema"]["fields"]: |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 114 | field["name"] = reverse_mappings[self.tableName].get(field["name"], field["name"]) |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 115 | |
| 116 | def run_query(self, query): |
Scott Baker | c337ac3 | 2014-06-10 19:54:17 -0700 | [diff] [blame] | 117 | if not BIGQUERY_AVAILABLE: |
| 118 | print >> sys.stderr, "bigquery_analytics: bigquery flow is not available. returning empty result." |
| 119 | return [] |
| 120 | |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 121 | response = self.run_query_raw(query) |
| 122 | |
| 123 | fieldNames = [] |
| 124 | for field in response["schema"]["fields"]: |
| 125 | fieldNames.append(field["name"]) |
| 126 | |
| 127 | result = [] |
| 128 | if "rows" in response: |
| 129 | for row in response["rows"]: |
| 130 | this_result = {} |
| 131 | for (i,column) in enumerate(row["f"]): |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 132 | this_result[reverse_mappings[self.tableName].get(fieldNames[i],fieldNames[i])] = column["v"] |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 133 | result.append(this_result) |
| 134 | |
| 135 | return result |
| 136 | |
Scott Baker | c655e66 | 2014-04-18 10:46:25 -0700 | [diff] [blame] | 137 | """ Filter_results, groupby_results, do_computed_fields, and postprocess_results |
| 138 | are all used for postprocessing queries. The idea is to do one query that |
| 139 | includes the ungrouped and unfiltered data, and cache it for multiple |
| 140 | consumers who will filter and group it as necessary. |
| 141 | |
| 142 | TODO: Find a more generalized source for these sorts operations. Perhaps |
| 143 | put the results in SQLite and then run SQL queries against it. |
| 144 | """ |
| 145 | |
| 146 | def filter_results(self, rows, name, value): |
| 147 | result = [row for row in rows if row.get(name)==value] |
| 148 | return result |
| 149 | |
| 150 | def groupby_results(self, rows, groupBy=[], sum=[], count=[], avg=[], maxi=[]): |
| 151 | new_rows = {} |
| 152 | for row in rows: |
| 153 | groupby_key = [row.get(k, None) for k in groupBy] |
| 154 | |
| 155 | if str(groupby_key) not in new_rows: |
| 156 | new_row = {} |
| 157 | for k in groupBy: |
| 158 | new_row[k] = row.get(k, None) |
| 159 | |
| 160 | new_rows[str(groupby_key)] = new_row |
| 161 | else: |
| 162 | new_row = new_rows[str(groupby_key)] |
| 163 | |
| 164 | for k in sum: |
| 165 | new_row["sum_" + k] = new_row.get("sum_" + k, 0) + to_number(row.get(k,0)) |
| 166 | |
| 167 | for k in avg: |
| 168 | new_row["avg_" + k] = new_row.get("avg_" + k, 0) + to_number(row.get(k,0)) |
| 169 | new_row["avg_base_" + k] = new_row.get("avg_base_"+k,0) + 1 |
| 170 | |
| 171 | for k in maxi: |
| 172 | new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k,0))) |
| 173 | |
| 174 | for k in count: |
Scott Baker | 58c8396 | 2014-04-24 17:04:55 -0700 | [diff] [blame] | 175 | v = row.get(k,None) |
| 176 | dl = new_row["distinct_" + k] = new_row.get("distinct_" + k, []) |
| 177 | if (v not in dl): |
| 178 | dl.append(v) |
| 179 | |
| 180 | #new_row["count_" + k] = new_row.get("count_" + k, 0) + 1 |
Scott Baker | c655e66 | 2014-04-18 10:46:25 -0700 | [diff] [blame] | 181 | |
| 182 | for row in new_rows.values(): |
| 183 | for k in avg: |
| 184 | row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k] |
| 185 | del row["avg_base_" + k] |
| 186 | |
Scott Baker | 58c8396 | 2014-04-24 17:04:55 -0700 | [diff] [blame] | 187 | for k in count: |
| 188 | new_row["count_" + k] = len(new_row.get("distinct_" + k, [])) |
| 189 | |
Scott Baker | c655e66 | 2014-04-18 10:46:25 -0700 | [diff] [blame] | 190 | return new_rows.values() |
| 191 | |
| 192 | def do_computed_fields(self, rows, computed=[]): |
| 193 | computedFieldNames=[] |
| 194 | for row in rows: |
| 195 | for k in computed: |
| 196 | if "/" in k: |
| 197 | parts = k.split("/") |
| 198 | computedFieldName = "computed_" + parts[0].replace("%","")+"_div_"+parts[1].replace("%","") |
| 199 | try: |
| 200 | row[computedFieldName] = to_number(row[parts[0]]) / to_number(row[parts[1]]) |
| 201 | except: |
| 202 | pass |
| 203 | |
| 204 | if computedFieldName not in computedFieldNames: |
| 205 | computedFieldNames.append(computedFieldName) |
| 206 | return (computedFieldNames, rows) |
| 207 | |
| 208 | def postprocess_results(self, rows, filter={}, groupBy=[], sum=[], count=[], avg=[], computed=[], maxi=[], maxDeltaTime=None): |
| 209 | sum = [x.replace("%","") for x in sum] |
| 210 | count = [x.replace("%","") for x in count] |
| 211 | avg = [x.replace("%","") for x in avg] |
| 212 | computed = [x.replace("%","") for x in computed] |
| 213 | maxi = [x.replace("%","") for x in maxi] |
Scott Baker | 3a3b4df | 2014-04-28 23:30:52 -0700 | [diff] [blame] | 214 | groupBy = [x.replace("%","") for x in groupBy] |
Scott Baker | c655e66 | 2014-04-18 10:46:25 -0700 | [diff] [blame] | 215 | |
| 216 | for (k,v) in filter.items(): |
| 217 | rows = self.filter_results(rows, k, v) |
| 218 | |
Scott Baker | 58c8396 | 2014-04-24 17:04:55 -0700 | [diff] [blame] | 219 | if rows: |
| 220 | if maxDeltaTime is not None: |
| 221 | maxTime = max([float(row["time"]) for row in rows]) |
| 222 | rows = [row for row in rows if float(row["time"])>=maxTime-maxDeltaTime] |
Scott Baker | c655e66 | 2014-04-18 10:46:25 -0700 | [diff] [blame] | 223 | |
| 224 | (computedFieldNames, rows) = self.do_computed_fields(rows, computed) |
| 225 | sum = sum + computedFieldNames |
Scott Baker | 0fd787d | 2014-05-13 17:03:47 -0700 | [diff] [blame] | 226 | if groupBy: |
| 227 | rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi) |
Scott Baker | c655e66 | 2014-04-18 10:46:25 -0700 | [diff] [blame] | 228 | return rows |
| 229 | |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 230 | def remap(self, match): |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 231 | if not self.tableName in mappings: |
| 232 | raise MappingException("no mapping for table %s" % self.tableName) |
| 233 | |
| 234 | mapping = mappings[self.tableName] |
| 235 | |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 236 | token = match.group()[1:] |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 237 | if token in mapping: |
| 238 | return mapping[token] |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 239 | else: |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 240 | raise MappingException('unknown token %s' % token) |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 241 | |
| 242 | def dump_table(self, rows, keys=None): |
| 243 | if not keys: |
| 244 | keys = rows[0].keys() |
| 245 | |
| 246 | lens = {} |
| 247 | for key in keys: |
| 248 | lens[key] = len(key) |
| 249 | |
| 250 | for row in rows: |
| 251 | for key in keys: |
| 252 | thislen = len(str(row.get(key,""))) |
| 253 | lens[key] = max(lens.get(key,0), thislen) |
| 254 | |
| 255 | for key in keys: |
| 256 | print "%*s" % (lens[key], key), |
| 257 | print |
| 258 | |
| 259 | for row in rows: |
| 260 | for key in keys: |
| 261 | print "%*s" % (lens[key], str(row.get(key,""))), |
| 262 | print |
| 263 | |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 264 | def schema_to_cols(self, schema): |
| 265 | fields = schema["fields"] |
| 266 | |
| 267 | colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number", "TIMESTAMP": "date"} |
| 268 | |
| 269 | cols = [] |
| 270 | i=0 |
| 271 | for field in fields: |
| 272 | col = {"type": colTypes[field["type"]], |
| 273 | "id": "Col%d" % i, |
| 274 | "label": reverse_mappings[self.tableName].get(field["name"],field["name"])} |
| 275 | cols.append(col) |
| 276 | i=i+1 |
| 277 | |
| 278 | return cols |
| 279 | |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 280 | def main(): |
| 281 | bq = BigQueryAnalytics() |
| 282 | |
| 283 | rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname") |
| 284 | |
| 285 | bq.dump_table(rows) |
| 286 | |
| 287 | if __name__ == "__main__": |
| 288 | main() |