Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 1 | import re |
| 2 | import base64 |
| 3 | import requests |
| 4 | import urllib |
| 5 | import json |
| 6 | import httplib2 |
| 7 | import threading |
| 8 | import os |
| 9 | import time |
| 10 | import traceback |
| 11 | |
| 12 | from apiclient.discovery import build |
| 13 | from apiclient.errors import HttpError |
| 14 | from oauth2client.client import AccessTokenRefreshError |
| 15 | from oauth2client.client import OAuth2WebServerFlow |
| 16 | from oauth2client.client import flow_from_clientsecrets |
| 17 | from oauth2client.file import Storage |
| 18 | from oauth2client.tools import run_flow,run |
| 19 | |
| 20 | """ |
| 21 | yum -y install python-httplib2 |
| 22 | easy_install python_gflags |
| 23 | easy_install google_api_python_client |
| 24 | """ |
| 25 | |
| 26 | PROJECT_NUMBER = '549187599759' |
| 27 | |
Scott Baker | 78ab101 | 2014-03-19 23:44:39 -0700 | [diff] [blame] | 28 | try: |
| 29 | FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json', |
| 30 | scope='https://www.googleapis.com/auth/bigquery') |
| 31 | except: |
| 32 | print "exception while initializing bigquery flow" |
| 33 | traceback.print_exc() |
| 34 | FLOW = None |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 35 | |
| 36 | MINUTE_MS = 60*1000 |
| 37 | HOUR_MS = 60*60*1000 |
| 38 | |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 39 | # global to hold cached mappings |
| 40 | mappings = {} |
| 41 | reverse_mappings = {} |
| 42 | |
Scott Baker | c655e66 | 2014-04-18 10:46:25 -0700 | [diff] [blame] | 43 | def to_number(s): |
| 44 | try: |
| 45 | if "." in str(s): |
| 46 | return float(s) |
| 47 | else: |
| 48 | return int(s) |
| 49 | except: |
| 50 | return 0 |
| 51 | |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 52 | class MappingException(Exception): |
| 53 | pass |
| 54 | |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 55 | class BigQueryAnalytics: |
| 56 | def __init__(self, table = "demoevents"): |
| 57 | self.projectName = "vicci" |
| 58 | self.tableName = table |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 59 | |
| 60 | def reload_mapping(self): |
| 61 | global mappings, reverse_mappings |
| 62 | mappings[self.tableName] = json.loads(self.fetch_mapping(table=self.tableName)) |
| 63 | reverse_mappings[self.tableName] = {v:k for k, v in mappings[self.tableName].items()} |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 64 | |
| 65 | def fetch_mapping(self, m=0, table="events"): |
| 66 | req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table) |
| 67 | resp = requests.get(req) |
| 68 | if (resp.status_code==200): |
| 69 | return resp.text |
| 70 | else: |
| 71 | raise Exception('Error accessing register allocations: %d'%resp.status_code) |
| 72 | |
| 73 | def run_query_raw(self, query): |
| 74 | p = re.compile('%[a-zA-z_]*') |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 75 | |
| 76 | try: |
| 77 | query = p.sub(self.remap, query) |
| 78 | except MappingException: |
| 79 | self.reload_mapping() |
| 80 | query = p.sub(self.remap, query) |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 81 | |
| 82 | storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat') |
| 83 | credentials = storage.get() |
| 84 | |
| 85 | if credentials is None or credentials.invalid: |
| 86 | credentials = run(FLOW, storage) |
| 87 | |
| 88 | http = httplib2.Http() |
| 89 | http = credentials.authorize(http) |
| 90 | |
| 91 | service = build('bigquery', 'v2', http=http) |
| 92 | |
Scott Baker | 0817209 | 2014-03-20 15:07:06 -0700 | [diff] [blame] | 93 | body = {"query": query, |
Scott Baker | db403bb | 2014-04-21 00:26:00 -0700 | [diff] [blame] | 94 | "timeoutMs": 60000} |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 95 | response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute() |
| 96 | |
| 97 | return response |
| 98 | |
| 99 | def translate_schema(self, response): |
| 100 | for field in response["schema"]["fields"]: |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 101 | field["name"] = reverse_mappings[self.tableName].get(field["name"], field["name"]) |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 102 | |
| 103 | def run_query(self, query): |
| 104 | response = self.run_query_raw(query) |
| 105 | |
| 106 | fieldNames = [] |
| 107 | for field in response["schema"]["fields"]: |
| 108 | fieldNames.append(field["name"]) |
| 109 | |
| 110 | result = [] |
| 111 | if "rows" in response: |
| 112 | for row in response["rows"]: |
| 113 | this_result = {} |
| 114 | for (i,column) in enumerate(row["f"]): |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 115 | this_result[reverse_mappings[self.tableName].get(fieldNames[i],fieldNames[i])] = column["v"] |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 116 | result.append(this_result) |
| 117 | |
| 118 | return result |
| 119 | |
Scott Baker | c655e66 | 2014-04-18 10:46:25 -0700 | [diff] [blame] | 120 | """ Filter_results, groupby_results, do_computed_fields, and postprocess_results |
| 121 | are all used for postprocessing queries. The idea is to do one query that |
| 122 | includes the ungrouped and unfiltered data, and cache it for multiple |
| 123 | consumers who will filter and group it as necessary. |
| 124 | |
| 125 | TODO: Find a more generalized source for these sorts operations. Perhaps |
| 126 | put the results in SQLite and then run SQL queries against it. |
| 127 | """ |
| 128 | |
| 129 | def filter_results(self, rows, name, value): |
| 130 | result = [row for row in rows if row.get(name)==value] |
| 131 | return result |
| 132 | |
| 133 | def groupby_results(self, rows, groupBy=[], sum=[], count=[], avg=[], maxi=[]): |
| 134 | new_rows = {} |
| 135 | for row in rows: |
| 136 | groupby_key = [row.get(k, None) for k in groupBy] |
| 137 | |
| 138 | if str(groupby_key) not in new_rows: |
| 139 | new_row = {} |
| 140 | for k in groupBy: |
| 141 | new_row[k] = row.get(k, None) |
| 142 | |
| 143 | new_rows[str(groupby_key)] = new_row |
| 144 | else: |
| 145 | new_row = new_rows[str(groupby_key)] |
| 146 | |
| 147 | for k in sum: |
| 148 | new_row["sum_" + k] = new_row.get("sum_" + k, 0) + to_number(row.get(k,0)) |
| 149 | |
| 150 | for k in avg: |
| 151 | new_row["avg_" + k] = new_row.get("avg_" + k, 0) + to_number(row.get(k,0)) |
| 152 | new_row["avg_base_" + k] = new_row.get("avg_base_"+k,0) + 1 |
| 153 | |
| 154 | for k in maxi: |
| 155 | new_row["max_" + k] = max(new_row.get("max_" + k, 0), to_number(row.get(k,0))) |
| 156 | |
| 157 | for k in count: |
| 158 | new_row["count_" + k] = new_row.get("count_" + k, 0) + 1 |
| 159 | |
| 160 | for row in new_rows.values(): |
| 161 | for k in avg: |
| 162 | row["avg_" + k] = float(row["avg_" + k]) / row["avg_base_" + k] |
| 163 | del row["avg_base_" + k] |
| 164 | |
| 165 | return new_rows.values() |
| 166 | |
| 167 | def do_computed_fields(self, rows, computed=[]): |
| 168 | computedFieldNames=[] |
| 169 | for row in rows: |
| 170 | for k in computed: |
| 171 | if "/" in k: |
| 172 | parts = k.split("/") |
| 173 | computedFieldName = "computed_" + parts[0].replace("%","")+"_div_"+parts[1].replace("%","") |
| 174 | try: |
| 175 | row[computedFieldName] = to_number(row[parts[0]]) / to_number(row[parts[1]]) |
| 176 | except: |
| 177 | pass |
| 178 | |
| 179 | if computedFieldName not in computedFieldNames: |
| 180 | computedFieldNames.append(computedFieldName) |
| 181 | return (computedFieldNames, rows) |
| 182 | |
| 183 | def postprocess_results(self, rows, filter={}, groupBy=[], sum=[], count=[], avg=[], computed=[], maxi=[], maxDeltaTime=None): |
| 184 | sum = [x.replace("%","") for x in sum] |
| 185 | count = [x.replace("%","") for x in count] |
| 186 | avg = [x.replace("%","") for x in avg] |
| 187 | computed = [x.replace("%","") for x in computed] |
| 188 | maxi = [x.replace("%","") for x in maxi] |
| 189 | |
| 190 | for (k,v) in filter.items(): |
| 191 | rows = self.filter_results(rows, k, v) |
| 192 | |
| 193 | if maxDeltaTime is not None: |
| 194 | maxTime = max([float(row["time"]) for row in rows]) |
| 195 | rows = [row for row in rows if float(row["time"])>=maxTime-maxDeltaTime] |
| 196 | |
| 197 | (computedFieldNames, rows) = self.do_computed_fields(rows, computed) |
| 198 | sum = sum + computedFieldNames |
| 199 | rows = self.groupby_results(rows, groupBy, sum, count, avg, maxi) |
| 200 | return rows |
| 201 | |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 202 | def remap(self, match): |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 203 | if not self.tableName in mappings: |
| 204 | raise MappingException("no mapping for table %s" % self.tableName) |
| 205 | |
| 206 | mapping = mappings[self.tableName] |
| 207 | |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 208 | token = match.group()[1:] |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 209 | if token in mapping: |
| 210 | return mapping[token] |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 211 | else: |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 212 | raise MappingException('unknown token %s' % token) |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 213 | |
| 214 | def dump_table(self, rows, keys=None): |
| 215 | if not keys: |
| 216 | keys = rows[0].keys() |
| 217 | |
| 218 | lens = {} |
| 219 | for key in keys: |
| 220 | lens[key] = len(key) |
| 221 | |
| 222 | for row in rows: |
| 223 | for key in keys: |
| 224 | thislen = len(str(row.get(key,""))) |
| 225 | lens[key] = max(lens.get(key,0), thislen) |
| 226 | |
| 227 | for key in keys: |
| 228 | print "%*s" % (lens[key], key), |
| 229 | print |
| 230 | |
| 231 | for row in rows: |
| 232 | for key in keys: |
| 233 | print "%*s" % (lens[key], str(row.get(key,""))), |
| 234 | print |
| 235 | |
Scott Baker | ba60d82 | 2014-03-27 09:12:28 -0700 | [diff] [blame] | 236 | def schema_to_cols(self, schema): |
| 237 | fields = schema["fields"] |
| 238 | |
| 239 | colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number", "TIMESTAMP": "date"} |
| 240 | |
| 241 | cols = [] |
| 242 | i=0 |
| 243 | for field in fields: |
| 244 | col = {"type": colTypes[field["type"]], |
| 245 | "id": "Col%d" % i, |
| 246 | "label": reverse_mappings[self.tableName].get(field["name"],field["name"])} |
| 247 | cols.append(col) |
| 248 | i=i+1 |
| 249 | |
| 250 | return cols |
| 251 | |
Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 252 | def main(): |
| 253 | bq = BigQueryAnalytics() |
| 254 | |
| 255 | rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname") |
| 256 | |
| 257 | bq.dump_table(rows) |
| 258 | |
| 259 | if __name__ == "__main__": |
| 260 | main() |