blob: ca08025133071c50d340d8f753838fa86821e930 [file] [log] [blame]
Scott Baker43adf1b2014-03-19 21:54:55 -07001import re
2import base64
3import requests
4import urllib
5import json
6import httplib2
7import threading
8import os
9import time
10import traceback
11
12from apiclient.discovery import build
13from apiclient.errors import HttpError
14from oauth2client.client import AccessTokenRefreshError
15from oauth2client.client import OAuth2WebServerFlow
16from oauth2client.client import flow_from_clientsecrets
17from oauth2client.file import Storage
18from oauth2client.tools import run_flow,run
19
# Dependencies (install notes):
#   yum -y install python-httplib2
#   easy_install python_gflags
#   easy_install google_api_python_client

# Google Cloud project that BigQuery query jobs are submitted under
# (passed as projectId to jobs().query).
PROJECT_NUMBER = '549187599759'

# OAuth2 flow built from the client secrets file, requesting the BigQuery
# scope. NOTE: this is constructed at import time, so the secrets file is
# presumably required to exist whenever this module is imported.
FLOW = flow_from_clientsecrets(
    '/opt/planetstack/hpc_wizard/client_secrets.json',
    scope='https://www.googleapis.com/auth/bigquery',
)

# Time spans expressed in milliseconds.
MINUTE_MS = 60 * 1000
HOUR_MS = 60 * MINUTE_MS
33
class BigQueryAnalytics:
    """Run BigQuery queries written with symbolic ``%name`` column tokens.

    On construction, a mapping from friendly names to BigQuery column names
    is fetched from the cloud-scrutiny ``get_allocations`` command. Queries
    may refer to columns as ``%friendly_name``; the tokens are replaced by
    the mapped column names before the query is sent, and result column
    names are translated back to friendly names where a mapping exists.
    """

    # A "%" followed by one or more letters/underscores is a column token.
    # "A-Z" (not "A-z", which also matches the characters [ \ ] ^ `) keeps
    # e.g. the closing "]" of "[%name]" out of the token, and "+" (not "*")
    # leaves a bare "%" such as the modulo operator untouched.
    _TOKEN_RE = re.compile(r'%[a-zA-Z_]+')

    def __init__(self, table="demoevents"):
        """Fetch the column mapping for ``table``.

        Raises:
            Exception: if the mapping service does not answer with HTTP 200.
        """
        self.projectName = "vicci"
        self.tableName = table
        # friendly name -> BigQuery column name
        self.mapping = json.loads(self.fetch_mapping(table=self.tableName))
        # BigQuery column name -> friendly name (used to translate results)
        self.reverse_mapping = {v: k for k, v in self.mapping.items()}

    def fetch_mapping(self, m=0, table="events"):
        """Return the raw JSON text of the column allocations for ``table``.

        Args:
            m: multiplexer number, sent formatted with ``%d``.
            table: name of the table whose allocations are requested.

        Raises:
            Exception: if the service responds with a non-200 status code.
        """
        # Let requests URL-encode the query string so unusual table names
        # cannot break (or inject extra parameters into) the request.
        params = [("action", "get_allocations"),
                  ("multiplexer", "%d" % m),
                  ("table", table)]
        resp = requests.get("http://cloud-scrutiny.appspot.com/command",
                            params=params)
        if resp.status_code != 200:
            raise Exception('Error accessing register allocations: %d'
                            % resp.status_code)
        return resp.text

    def _substitute_tokens(self, query):
        """Return ``query`` with every ``%name`` token replaced via ``remap``."""
        return self._TOKEN_RE.sub(self.remap, query)

    def run_query_raw(self, query):
        """Substitute ``%name`` tokens in ``query`` and run it on BigQuery.

        Credentials are loaded from the credentials file below; if they are
        missing or invalid they are obtained with ``run(FLOW, storage)``.

        Returns:
            The result of the BigQuery v2 ``jobs().query(...).execute()`` call.

        Raises:
            Exception: if ``query`` contains a token missing from the mapping.
        """
        query = self._substitute_tokens(query)

        storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
        credentials = storage.get()
        if credentials is None or credentials.invalid:
            # NOTE(review): oauth2client.tools.run is deprecated in favour of
            # run_flow(flow, storage, flags); kept to preserve current behavior.
            credentials = run(FLOW, storage)

        http = credentials.authorize(httplib2.Http())
        service = build('bigquery', 'v2', http=http)
        body = {"query": query}
        return service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

    def translate_schema(self, response):
        """Rename, in place, the schema fields of ``response`` to friendly names.

        Field names without a reverse mapping are left unchanged.
        """
        for field in response["schema"]["fields"]:
            field["name"] = self.reverse_mapping.get(field["name"], field["name"])

    def _rows_to_dicts(self, response):
        """Convert a query response into a list of ``{column: value}`` dicts.

        Column names are translated to friendly names where a reverse mapping
        exists; cell values are taken from each cell's ``"v"`` entry.
        """
        names = [self.reverse_mapping.get(field["name"], field["name"])
                 for field in response["schema"]["fields"]]
        return [dict(zip(names, [cell["v"] for cell in row["f"]]))
                for row in response.get("rows", [])]

    def run_query(self, query):
        """Run ``query`` and return its rows as a list of dicts.

        An empty list is returned when the response contains no rows.
        """
        return self._rows_to_dicts(self.run_query_raw(query))

    def remap(self, match):
        """``re.sub`` callback: map a ``%name`` match to its column name.

        Raises:
            Exception: if ``name`` is not a key of ``self.mapping``.
        """
        token = match.group()[1:]
        if token not in self.mapping:
            raise Exception('unknown token %s' % token)
        return self.mapping[token]

    def dump_table(self, rows, keys=None):
        """Print ``rows`` (a list of dicts) as right-aligned text columns.

        ``keys`` selects and orders the columns; by default the keys of the
        first row are used. Missing cells print as empty strings. Nothing is
        printed when there are no rows and no explicit keys.
        """
        if not keys:
            if not rows:
                # An empty result has no first row to take column names from.
                return
            keys = list(rows[0].keys())

        widths = {}
        for key in keys:
            widths[key] = len(key)
        for row in rows:
            for key in keys:
                widths[key] = max(widths[key], len(str(row.get(key, ""))))

        # One space between cells, matching the output of the former
        # Python 2 "print a, b, ..." statements; works on Python 2 and 3.
        print(" ".join("%*s" % (widths[key], key) for key in keys))
        for row in rows:
            print(" ".join("%*s" % (widths[key], str(row.get(key, "")))
                           for key in keys))
118
def main():
    """Print per-host totals of bytes_sent from the demoevents table."""
    analytics = BigQueryAnalytics()
    query = ("select %hostname,SUM(%bytes_sent) "
             "from [vicci.demoevents] group by %hostname")
    analytics.dump_table(analytics.run_query(query))


if __name__ == "__main__":
    main()