blob: 44b5dbdb9cb08c91654b4ed6c72c191eecafd47a [file] [log] [blame]
Scott Baker43adf1b2014-03-19 21:54:55 -07001import re
2import base64
3import requests
4import urllib
5import json
6import httplib2
7import threading
8import os
9import time
10import traceback
11
12from apiclient.discovery import build
13from apiclient.errors import HttpError
14from oauth2client.client import AccessTokenRefreshError
15from oauth2client.client import OAuth2WebServerFlow
16from oauth2client.client import flow_from_clientsecrets
17from oauth2client.file import Storage
18from oauth2client.tools import run_flow,run
19
20"""
21yum -y install python-httplib2
22easy_install python_gflags
23easy_install google_api_python_client
24"""
25
26PROJECT_NUMBER = '549187599759'
27
Scott Baker78ab1012014-03-19 23:44:39 -070028try:
29 FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
30 scope='https://www.googleapis.com/auth/bigquery')
31except:
32 print "exception while initializing bigquery flow"
33 traceback.print_exc()
34 FLOW = None
Scott Baker43adf1b2014-03-19 21:54:55 -070035
36MINUTE_MS = 60*1000
37HOUR_MS = 60*60*1000
38
class BigQueryAnalytics(object):
    """Run BigQuery queries written with symbolic ``%name`` column tokens.

    On construction the token -> physical-column mapping for a table is
    fetched from the cloud-scrutiny service. Queries have their ``%name``
    tokens replaced through that mapping before submission, and physical
    column names in results are translated back via ``reverse_mapping``.
    """

    # "%" followed by letters/underscores marks a symbolic column name.
    # Note "A-Z", not "A-z": the latter also spans "[", "\\", "]", "^" and "`",
    # which would swallow e.g. the closing bracket of "[%name]".
    _TOKEN_RE = re.compile(r"%[a-zA-Z_]*")

    def __init__(self, table="demoevents"):
        """Fetch and index the column mapping for *table*.

        This performs an HTTP request (see ``fetch_mapping``).

        Raises:
            Exception: if the mapping service returns a non-200 status.
        """
        self.projectName = "vicci"
        self.tableName = table
        self.mapping = json.loads(self.fetch_mapping(table=self.tableName))
        # Physical column name -> symbolic name, used to translate results.
        self.reverse_mapping = {v: k for k, v in self.mapping.items()}

    def fetch_mapping(self, m=0, table="events", timeout=60):
        """Return the raw text of the ``get_allocations`` response.

        Args:
            m: multiplexer number, formatted with ``%d``.
            table: table whose allocations are requested.
            timeout: seconds to wait for the service (passed to
                ``requests.get``); None waits indefinitely.

        Returns:
            The response body as text; ``__init__`` parses it as JSON.

        Raises:
            Exception: if the service answers with a status other than 200.
        """
        url = 'http://cloud-scrutiny.appspot.com/command'
        # A list of pairs keeps the parameter order stable, and requests
        # URL-encodes the values (the table name in particular).
        params = [("action", "get_allocations"),
                  ("multiplexer", "%d" % m),
                  ("table", table)]
        resp = requests.get(url, params=params, timeout=timeout)
        if resp.status_code == 200:
            return resp.text
        raise Exception('Error accessing register allocations: %d' % resp.status_code)

    def _expand_tokens(self, query):
        """Return *query* with every ``%name`` token replaced via ``remap``."""
        return self._TOKEN_RE.sub(self.remap, query)

    def run_query_raw(self, query):
        """Expand tokens in *query*, run it on BigQuery, return the raw response.

        Cached OAuth credentials are read from bigquery_credentials.dat. When
        they are missing or invalid, ``run(FLOW, storage)`` is called to
        obtain new ones.

        Returns:
            The dict returned by ``service.jobs().query(...).execute()``.

        Raises:
            Exception: for an unknown token, or when new credentials are
                needed but the module-level OAuth flow failed to initialize.
        """
        query = self._expand_tokens(query)

        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            if FLOW is None:
                # Without this check, run() would fail obscurely on a None flow.
                raise Exception("BigQuery OAuth flow is not initialized; check "
                                "/opt/planetstack/hpc_wizard/client_secrets.json")
            credentials = run(FLOW, storage)

        http = credentials.authorize(httplib2.Http())
        service = build('bigquery', 'v2', http=http)

        body = {"query": query}
        return service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

    def translate_schema(self, response):
        """Rename, in place, schema fields of *response* to their symbolic names.

        Fields without a reverse mapping keep their original name.
        """
        for field in response["schema"]["fields"]:
            field["name"] = self.reverse_mapping.get(field["name"], field["name"])

    def run_query(self, query):
        """Run *query* and return its rows as a list of dicts.

        Each dict maps a column name (translated back to its symbolic name
        when a reverse mapping exists) to the cell's ``"v"`` value. Returns an
        empty list when the response contains no ``"rows"``.
        """
        response = self.run_query_raw(query)
        # NOTE(review): per the BigQuery jobs.query API, only the first page of
        # rows is returned (see "pageToken"), and "schema" is absent when
        # "jobComplete" is false. Neither case is handled here.

        # Translate the column names once rather than once per cell.
        names = [self.reverse_mapping.get(field["name"], field["name"])
                 for field in response["schema"]["fields"]]

        return [dict(zip(names, [cell["v"] for cell in row["f"]]))
                for row in response.get("rows", [])]

    def remap(self, match):
        """``re.sub`` callback: replace a ``%token`` match with its column.

        Raises:
            Exception: if the token is not in ``self.mapping``.
        """
        token = match.group()[1:]  # drop the leading "%"
        try:
            return self.mapping[token]
        except KeyError:
            raise Exception('unknown token %s' % token)

    def dump_table(self, rows, keys=None):
        """Print *rows* (a list of dicts) as right-aligned, space-separated columns.

        Args:
            rows: records to print; missing keys print as empty cells.
            keys: column order. Defaults to the keys of the first row; when
                neither keys nor rows are given, nothing is printed.
        """
        if not keys:
            if not rows:
                return
            keys = list(rows[0].keys())

        # Stringify each cell once; reuse it for width measurement and printing.
        table = [[str(row.get(key, "")) for key in keys] for row in rows]

        widths = {}
        for key in keys:
            widths[key] = len(key)
        for cells in table:
            for key, cell in zip(keys, cells):
                widths[key] = max(widths[key], len(cell))

        print(" ".join("%*s" % (widths[key], key) for key in keys))
        for cells in table:
            print(" ".join("%*s" % (widths[key], cell)
                           for key, cell in zip(keys, cells)))
123
124def main():
125 bq = BigQueryAnalytics()
126
127 rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname")
128
129 bq.dump_table(rows)
130
131if __name__ == "__main__":
132 main()