import re
import base64
import requests
import urllib
import json
import httplib2
import threading
import os
import time
import traceback

from apiclient.discovery import build
from apiclient.errors import HttpError
from oauth2client.client import AccessTokenRefreshError
from oauth2client.client import OAuth2WebServerFlow
from oauth2client.client import flow_from_clientsecrets
from oauth2client.file import Storage
from oauth2client.tools import run_flow, run

"""
Prerequisites (install the client libraries before use):
    yum -y install python-httplib2
    easy_install python_gflags
    easy_install google_api_python_client
"""

PROJECT_NUMBER = '549187599759'

try:
    FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
                                   scope='https://www.googleapis.com/auth/bigquery')
except:
    print "exception while initializing bigquery flow"
    traceback.print_exc()
    FLOW = None

MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000

# globals that hold the cached field-name mappings, keyed by table name
mappings = {}
reverse_mappings = {}

def to_number(s):
    # Convert a value to an int or float; unparseable values become 0.
    try:
        if "." in str(s):
            return float(s)
        else:
            return int(s)
    except:
        return 0

class MappingException(Exception):
    pass

class BigQueryAnalytics:
    def __init__(self, table="demoevents"):
        self.projectName = "vicci"
        self.tableName = table

    def reload_mapping(self):
        global mappings, reverse_mappings
        mappings[self.tableName] = json.loads(self.fetch_mapping(table=self.tableName))
        reverse_mappings[self.tableName] = {v: k for k, v in mappings[self.tableName].items()}

    def fetch_mapping(self, m=0, table="events"):
        req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s' % (m, table)
        resp = requests.get(req)
        if (resp.status_code == 200):
            return resp.text
        else:
            raise Exception('Error accessing register allocations: %d' % resp.status_code)
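
    # Illustrative example (hypothetical values): the mapping returned by
    # cloud-scrutiny pairs logical field names with the column names used in
    # the BigQuery table, e.g.
    #     {"hostname": "s", "bytes_sent": "i1", "time": "t"}
    # reload_mapping() caches it and builds the reverse dictionary so results
    # can be translated back to logical names.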

    def run_query_raw(self, query):
        # Log queries for debugging; failure to write the log is not fatal.
        try:
            file("/tmp/query_log","a").write("query %s\n" % query)
        except:
            pass

        # Tokens of the form %name are rewritten to the table's BigQuery
        # column names before the query is submitted.
        p = re.compile('%[a-zA-Z_]*')

        try:
            query = p.sub(self.remap, query)
        except MappingException:
            # The cached mapping may be missing or stale; reload and retry once.
            self.reload_mapping()
            query = p.sub(self.remap, query)

        try:
            file("/tmp/query_log","a").write("remapped query %s\n" % query)
        except:
            pass

        # Load cached OAuth2 credentials, re-running the authorization flow
        # if they are missing or invalid.
        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            credentials = run(FLOW, storage)

        http = httplib2.Http()
        http = credentials.authorize(http)

        service = build('bigquery', 'v2', http=http)

        body = {"query": query,
                "timeoutMs": 60000}
        response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

        return response

    def translate_schema(self, response):
        # Rename BigQuery column names back to their logical names.
        for field in response["schema"]["fields"]:
            field["name"] = reverse_mappings[self.tableName].get(field["name"], field["name"])

    def run_query(self, query):
        response = self.run_query_raw(query)

        fieldNames = []
        for field in response["schema"]["fields"]:
            fieldNames.append(field["name"])

        # Convert the BigQuery response into a list of dictionaries keyed by
        # the logical field names.
        result = []
        if "rows" in response:
            for row in response["rows"]:
                this_result = {}
                for (i, column) in enumerate(row["f"]):
                    this_result[reverse_mappings[self.tableName].get(fieldNames[i], fieldNames[i])] = column["v"]
                result.append(this_result)

        return result

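    # Note: the BigQuery JSON API returns cell values as strings, so a result
    # row looks like (hypothetical values)
    #     {"hostname": "node1", "bytes_sent": "1234"}
    # and callers use to_number() when they need arithmetic.
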
    """ filter_results, groupby_results, do_computed_fields, and postprocess_results
        are all used for postprocessing queries. The idea is to do one query that
        includes the ungrouped and unfiltered data, and cache it for multiple
        consumers who will filter and group it as necessary.

        TODO: Find a more generalized source for these sorts of operations. Perhaps
        put the results in SQLite and then run SQL queries against it.
    """

    def filter_results(self, rows, name, value):
        result = [row for row in rows if row.get(name) == value]
        return result

    def groupby_results(self, rows, groupBy=[], sum=[], count=[], avg=[], maxi=[]):
        new_rows = {}
        for row in rows:
            groupby_key = [row.get(k, None) for k in groupBy]

            if str(groupby_key) not in new_rows:
                new_row = {}
                for k in groupBy:
                    new_row[k] = row.get(k, None)

                new_rows[str(groupby_key)] = new_row
            else:
                new_row = new_rows[str(groupby_key)]

            for k in sum:
                new_row["sum_" + k] = new_row.get("sum_" + k, 0) + to_number(row.get(k, 0))

            for k in avg:
                new_row["avg_" + k] = new_row.get("avg_" + k, 0) + to_number(row.get(k, 0))
                new_row["avg_base_" + k] = new_row.get("avg_base_" + k, 0) + 1

            for k in maxi:
                new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k, 0)))

            for k in count:
                # Count distinct values by accumulating them in a scratch list.
                v = row.get(k, None)
                dl = new_row["distinct_" + k] = new_row.get("distinct_" + k, [])
                if (v not in dl):
                    dl.append(v)

        for row in new_rows.values():
            for k in avg:
                row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k]
                del row["avg_base_" + k]

            for k in count:
                row["count_" + k] = len(row.get("distinct_" + k, []))
                del row["distinct_" + k]

        return new_rows.values()

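    # Illustrative example (hypothetical rows): grouping by "city" with
    # sum=["bytes"] and avg=["latency"],
    #     [{"city": "a", "bytes": "1", "latency": "10"},
    #      {"city": "a", "bytes": "2", "latency": "20"}]
    # collapses to a single row
    #     {"city": "a", "sum_bytes": 3, "avg_latency": 15.0}
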
    def do_computed_fields(self, rows, computed=[]):
        # Only ratio fields of the form "a/b" are supported.
        computedFieldNames = []
        for row in rows:
            for k in computed:
                if "/" in k:
                    parts = k.split("/")
                    computedFieldName = "computed_" + parts[0].replace("%","") + "_div_" + parts[1].replace("%","")
                    try:
                        row[computedFieldName] = to_number(row[parts[0]]) / to_number(row[parts[1]])
                    except:
                        # Skip rows where a field is missing or the divisor is zero.
                        pass

                    if computedFieldName not in computedFieldNames:
                        computedFieldNames.append(computedFieldName)
        return (computedFieldNames, rows)

    def postprocess_results(self, rows, filter={}, groupBy=[], sum=[], count=[], avg=[], computed=[], maxi=[], maxDeltaTime=None):
        # Accept field names with or without the leading "%".
        sum = [x.replace("%","") for x in sum]
        count = [x.replace("%","") for x in count]
        avg = [x.replace("%","") for x in avg]
        computed = [x.replace("%","") for x in computed]
        maxi = [x.replace("%","") for x in maxi]
        groupBy = [x.replace("%","") for x in groupBy]

        for (k, v) in filter.items():
            rows = self.filter_results(rows, k, v)

        if rows:
            if maxDeltaTime is not None:
                # Keep only rows within maxDeltaTime of the newest sample.
                maxTime = max([float(row["time"]) for row in rows])
                rows = [row for row in rows if float(row["time"]) >= maxTime - maxDeltaTime]

        (computedFieldNames, rows) = self.do_computed_fields(rows, computed)
        sum = sum + computedFieldNames
        rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi)
        return rows

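    # Usage sketch (hypothetical field names and units; assumes "time" is in
    # seconds): a consumer of the cached query results might call
    #     bq.postprocess_results(rows,
    #                            filter={"site": "princeton"},
    #                            groupBy=["%hostname"],
    #                            sum=["%bytes_sent"],
    #                            maxDeltaTime=3600)
    # to keep the most recent hour of samples for one site, summed per host.
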
    def remap(self, match):
        if not self.tableName in mappings:
            raise MappingException("no mapping for table %s" % self.tableName)

        mapping = mappings[self.tableName]

        # Strip the leading "%" and look the token up in the mapping.
        token = match.group()[1:]
        if token in mapping:
            return mapping[token]
        else:
            raise MappingException('unknown token %s' % token)

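    # For example (hypothetical mapping): with {"hostname": "s"} loaded for
    # this table, remap() rewrites "select %hostname ..." as "select s ...".
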
    def dump_table(self, rows, keys=None):
        # Print rows as a fixed-width text table.
        if not keys:
            keys = rows[0].keys()

        lens = {}
        for key in keys:
            lens[key] = len(key)

        for row in rows:
            for key in keys:
                thislen = len(str(row.get(key, "")))
                lens[key] = max(lens.get(key, 0), thislen)

        for key in keys:
            print "%*s" % (lens[key], key),
        print

        for row in rows:
            for key in keys:
                print "%*s" % (lens[key], str(row.get(key, ""))),
            print

    def schema_to_cols(self, schema):
        # Convert a BigQuery result schema into a list of column descriptors
        # with the logical (reverse-mapped) field names as labels.
        fields = schema["fields"]

        colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number", "TIMESTAMP": "date"}

        cols = []
        i = 0
        for field in fields:
            col = {"type": colTypes[field["type"]],
                   "id": "Col%d" % i,
                   "label": reverse_mappings[self.tableName].get(field["name"], field["name"])}
            cols.append(col)
            i = i + 1

        return cols

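    # Illustrative output (hypothetical schema): a STRING column "s" whose
    # reverse mapping is "hostname" becomes
    #     {"type": "string", "id": "Col0", "label": "hostname"}
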
def main():
    bq = BigQueryAnalytics()

    rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname")

    bq.dump_table(rows)

if __name__ == "__main__":
    main()