blob: f50a9aca8123d2d4b9268ee0abc7e75a5286c354 [file] [log] [blame]
Scott Baker43adf1b2014-03-19 21:54:55 -07001import re
2import base64
3import requests
4import urllib
5import json
6import httplib2
7import threading
8import os
9import time
10import traceback
11
12from apiclient.discovery import build
13from apiclient.errors import HttpError
14from oauth2client.client import AccessTokenRefreshError
15from oauth2client.client import OAuth2WebServerFlow
16from oauth2client.client import flow_from_clientsecrets
17from oauth2client.file import Storage
18from oauth2client.tools import run_flow,run
19
20"""
21yum -y install python-httplib2
22easy_install python_gflags
23easy_install google_api_python_client
24"""
25
# Google Cloud project identifier passed as projectId when submitting
# BigQuery query jobs.
PROJECT_NUMBER = '549187599759'

# Build the OAuth2 flow once, at import time, from the client-secrets file.
# Initialisation is best-effort: on any ordinary failure the module still
# imports, the traceback is printed and FLOW is left as None.  Only Exception
# is caught so that KeyboardInterrupt / SystemExit are not swallowed.
try:
    FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
                                   scope='https://www.googleapis.com/auth/bigquery')
except Exception:
    # Single-argument print(...) behaves the same as the print statement on
    # Python 2 and is also valid on Python 3.
    print("exception while initializing bigquery flow")
    traceback.print_exc()
    FLOW = None

# Durations expressed in milliseconds.
MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000
38
# Module-level caches shared by every BigQueryAnalytics instance, keyed by
# table name.  mappings[table] maps a query token (written "%token" in a
# query) to the text substituted for it; reverse_mappings[table] is the
# inverse, used to turn result column names back into tokens.
mappings = {}
reverse_mappings = {}


class MappingException(Exception):
    """Raised by BigQueryAnalytics.remap when a %token cannot be translated."""
    pass


class BigQueryAnalytics:
    """Run BigQuery queries written with symbolic %token column names.

    Tokens are translated through a per-table mapping fetched over HTTP (see
    fetch_mapping) and cached in the module-level ``mappings`` /
    ``reverse_mappings`` dicts.
    """

    # A token is '%' followed by ASCII letters or underscores.  The range is
    # spelled A-Z deliberately: 'A-z' would also match [ \ ] ^ and `.
    _TOKEN_RE = re.compile(r'%[a-zA-Z_]*')

    def __init__(self, table = "demoevents"):
        """Remember the table whose token mapping this instance uses."""
        self.projectName = "vicci"
        self.tableName = table

    def reload_mapping(self):
        """Fetch the mapping for self.tableName and refresh both caches."""
        forward = json.loads(self.fetch_mapping(table=self.tableName))
        mappings[self.tableName] = forward
        reverse_mappings[self.tableName] = {v: k for k, v in forward.items()}

    def fetch_mapping(self, m=0, table="events"):
        """Return the raw response text of the get_allocations request.

        m is sent as the ``multiplexer`` query parameter and table as
        ``table``.  Raises Exception if the HTTP status is not 200.
        """
        req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s' % (m, table)
        resp = requests.get(req)
        if resp.status_code != 200:
            raise Exception('Error accessing register allocations: %d' % resp.status_code)
        return resp.text

    def _expand_tokens(self, query):
        """Return query with every %token replaced through remap.

        If translation fails the mapping is reloaded once and the
        substitution retried; a second failure propagates MappingException.
        """
        try:
            return self._TOKEN_RE.sub(self.remap, query)
        except MappingException:
            self.reload_mapping()
            return self._TOKEN_RE.sub(self.remap, query)

    def run_query_raw(self, query):
        """Expand tokens in query, run it on BigQuery, return the raw response.

        Credentials are read from the on-disk Storage file; when they are
        missing or invalid the OAuth flow is run via run(FLOW, storage).
        """
        query = self._expand_tokens(query)

        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            credentials = run(FLOW, storage)

        http = credentials.authorize(httplib2.Http())
        service = build('bigquery', 'v2', http=http)

        body = {"query": query,
                "timeoutMs": 30000}
        return service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

    def translate_schema(self, response):
        """Rename, in place, each schema field of response back to its token.

        Fields without a reverse mapping (or when no mapping has been loaded
        for this table) keep their name.
        """
        reverse = reverse_mappings.get(self.tableName, {})
        for field in response["schema"]["fields"]:
            field["name"] = reverse.get(field["name"], field["name"])

    def run_query(self, query):
        """Run query and return its rows as a list of {column name: value} dicts.

        Column names are translated back to tokens where a reverse mapping
        exists.  Returns an empty list when the response has no "rows".
        """
        response = self.run_query_raw(query)

        # A query without any %token never triggers reload_mapping, so the
        # reverse mapping for this table may legitimately be absent.
        reverse = reverse_mappings.get(self.tableName, {})
        names = [reverse.get(field["name"], field["name"])
                 for field in response["schema"]["fields"]]

        result = []
        for row in response.get("rows", []):
            values = [column["v"] for column in row["f"]]
            result.append(dict(zip(names, values)))
        return result

    def remap(self, match):
        """re.sub callback: return the replacement text for one %token match.

        Raises MappingException when no mapping is cached for this table or
        the token is not present in it.
        """
        if self.tableName not in mappings:
            raise MappingException("no mapping for table %s" % self.tableName)

        mapping = mappings[self.tableName]

        token = match.group()[1:]  # drop the leading '%'
        if token not in mapping:
            raise MappingException('unknown token %s' % token)
        return mapping[token]

    def dump_table(self, rows, keys=None):
        """Print rows (a list of dicts) as a right-aligned text table.

        keys selects and orders the columns; when omitted, the keys of the
        first row are used.  Nothing is printed when there are neither rows
        nor keys.
        """
        if not keys:
            if not rows:
                # Empty result and no explicit columns: nothing to show.
                return
            keys = list(rows[0])

        # Each column is as wide as its header or its widest value.
        lens = {}
        for key in keys:
            lens[key] = len(key)
        for row in rows:
            for key in keys:
                lens[key] = max(lens[key], len(str(row.get(key, ""))))

        # One print per line, cells separated by a single space.
        print(" ".join("%*s" % (lens[key], key) for key in keys))
        for row in rows:
            print(" ".join("%*s" % (lens[key], str(row.get(key, ""))) for key in keys))

    def schema_to_cols(self, schema):
        """Convert a BigQuery schema into a list of column descriptors.

        Each descriptor is a dict with "type" (string / number / boolean /
        date), "id" ("Col<index>") and "label" (the field name translated
        back to its token where possible).  Field types not listed in the
        table below are reported as "string".
        """
        colTypes = {"STRING": "string", "INTEGER": "number", "FLOAT": "number",
                    "BOOLEAN": "boolean", "TIMESTAMP": "date"}
        reverse = reverse_mappings.get(self.tableName, {})

        cols = []
        for i, field in enumerate(schema["fields"]):
            cols.append({"type": colTypes.get(field["type"], "string"),
                         "id": "Col%d" % i,
                         "label": reverse.get(field["name"], field["name"])})
        return cols
160
def main():
    """Run a sample per-hostname bytes-sent query and print the result table."""
    analytics = BigQueryAnalytics()
    query = "select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname"
    analytics.dump_table(analytics.run_query(query))


# Only run the sample query when executed as a script, not on import.
if __name__ == "__main__":
    main()