import re
2import base64
3import requests
4import urllib
5import json
6import httplib2
7import threading
8import os
9import time
10import traceback
11
12from apiclient.discovery import build
13from apiclient.errors import HttpError
14from oauth2client.client import AccessTokenRefreshError
15from oauth2client.client import OAuth2WebServerFlow
16from oauth2client.client import flow_from_clientsecrets
17from oauth2client.file import Storage
18from oauth2client.tools import run_flow,run
19
20"""
21yum -y install python-httplib2
22easy_install python_gflags
23easy_install google_api_python_client
24"""
25
26
# Google BigQuery project number that owns the "vicci" dataset queried below.
PROJECT_NUMBER = '549187599759'

# OAuth2 installed-app flow, built at import time from a client-secrets file.
# NOTE(review): this reads the secrets file as a module-import side effect and
# will raise if the file is missing — confirm that is intended.
FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
               scope='https://www.googleapis.com/auth/bigquery')

# Time-window helpers, expressed in milliseconds (BigQuery table decorators
# below take millisecond offsets).
MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000
34
35class HpcQuery:
36 def __init__(self):
37 self.mapping = json.loads(self.fetch_mapping(table="demoevents"))
38 self.reverse_mapping = {v:k for k, v in self.mapping.items()}
39
40 def fetch_mapping(self, m=0, table="events"):
41 req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table)
42 resp = requests.get(req)
43 if (resp.status_code==200):
44 return resp.text
45 else:
46 raise Exception('Error accessing register allocations: %d'%resp.status_code)
47
48 def run_query_old(self, query):
49 req = 'http://cloud-scrutiny.appspot.com/command?action=send_query&q=%s' % urllib.quote(query)
50 resp = requests.get(req)
51 if (resp.status_code==200):
52 return resp.text
53 else:
54 raise Exception('Error running query: %d'%resp.status_code)
55 return resp
56
57 def run_query(self, query):
58 storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
59 credentials = storage.get()
60
61 if credentials is None or credentials.invalid:
62 credentials = run(FLOW, storage)
63
64 http = httplib2.Http()
65 http = credentials.authorize(http)
66
67 service = build('bigquery', 'v2', http=http)
68
69 body = {"query": query}
70 response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()
71
72 fieldNames = []
73 for field in response["schema"]["fields"]:
74 fieldNames.append(field["name"])
75
76 result = []
77 if "rows" in response:
78 for row in response["rows"]:
79 this_result = {}
80 for (i,column) in enumerate(row["f"]):
81 this_result[self.reverse_mapping.get(fieldNames[i],fieldNames[i])] = column["v"]
82 result.append(this_result)
83
84 return result
85
86 def remap(self, match):
87 token = match.group()[1:]
88 if token in self.mapping:
89 return self.mapping[token]
90 else:
91 raise Exception('unknown token %s' % token)
92
93 def get_usage(self, cp=None, hostname=None, site=None, slice=None, timeStart=-HOUR_MS, timeStop=-1, groupBy=["%hostname", "%cp"]):
94 where = []
95 if slice is not None:
96 where.append("%slice='" + slice + "'")
97 if cp is not None:
98 where.append("%cp='" + cp + "'")
99 if hostname is not None:
100 where.append("%hostname='" + hostname + "'")
101 if site is not None:
102 where.append("%hostname contains " + site)
103 where.append("%bytes_sent>0")
104 where = "WHERE " + " AND ".join(where)
105
106 if timeStart is not None:
107 tableName = "[vicci.demoevents@%d-%d]" % (timeStart,timeStop)
108 else:
109 tableName = "[vicci.demoevents]"
110
111 query = "SELECT %hostname,%cp,sum(%bytes_sent) as sum_bytes_sent,sum(%bytes_hit) as sum_bytes_hit, AVG(%bandwidth) as avg_bandwidth," + \
112 " MAX(TIMESTAMP_TO_MSEC(time))-MIN(TIMESTAMP_TO_MSEC(time)) as time_delta FROM " + \
113 tableName + " " + where
114
115 if groupBy:
116 query = query + " GROUP BY " + ",".join(groupBy)
117
118 p = re.compile('%[a-zA-z_]*')
119 query = p.sub(self.remap, query)
120
121 rows = self.run_query(query)
122
123 for row in rows:
124 row["sum_bytes_sent"] = int(row.get("sum_bytes_sent",0))
125 row["sum_bytes_hit"] = int(row.get("sum_bytes_hit",0))
126 row["avg_bandwidth"] = int(float(row.get("avg_bandwidth",0)))
127 row["time_delta"] = float(row.get("time_delta",0.0))/1000.0
128
129 elapsed = (timeStop-timeStart)/1000
130 KBps = int(row.get("sum_bytes_sent",0)) / elapsed / 1024
131 row["KBps"] = KBps
132
133 return rows
134
135 def sites_from_usage(self, rows, nodes_to_sites={}):
136 sites = {}
137 for row in rows:
138 hostname = row["hostname"]
139
140 if hostname in nodes_to_sites:
141 site_name = nodes_to_sites[hostname]
142 else:
143 parts = hostname.split(".")
144 if len(parts)<=2:
145 continue
146 site_name = parts[1]
147
148 if not (site_name in sites):
149 row = row.copy()
150 row["site"] = site_name
151 row["max_avg_bandwidth"] = row["avg_bandwidth"]
152 # sites table doesn't care about hostnames or avg_bandwidth
153 del row["hostname"]
154 del row["avg_bandwidth"]
155 sites[site_name] = row
156 else:
157 site_row = sites[site_name]
158 site_row["sum_bytes_sent"] = site_row["sum_bytes_sent"] + row["sum_bytes_sent"]
159 site_row["sum_bytes_hit"] = site_row["sum_bytes_hit"] + row["sum_bytes_hit"]
160 site_row["max_avg_bandwidth"] = max(site_row["max_avg_bandwidth"], row["avg_bandwidth"])
161 site_row["time_delta"] = max(site_row["time_delta"], row["time_delta"])
162
163 return sites.values()
164
165 def get_usage_sites(self, cp=None, slice=None, timeStart=-HOUR_MS, timeStop=-1):
166 rows = self.get_usage(cp=cp, slice=slice, timeStart=timeStart, timeStop=timeStop)
167
168 return self.sites_from_usage(rows)
169
170 def dump_table(self, rows, keys=None):
171 if not keys:
172 keys = rows[0].keys()
173
174 lens = {}
175 for key in keys:
176 lens[key] = len(key)
177
178 for row in rows:
179 for key in keys:
180 thislen = len(str(row.get(key,"")))
181 lens[key] = max(lens.get(key,0), thislen)
182
183 for key in keys:
184 print "%*s" % (lens[key], key),
185 print
186
187 for row in rows:
188 for key in keys:
189 print "%*s" % (lens[key], str(row.get(key,""))),
190 print
191
class HpcQueryThread(HpcQuery, threading.Thread):
    """Daemon thread that refreshes get_usage() every `interval` seconds.

    Results are published in self.rows / self.site_rows; self.data_version
    is bumped after each successful refresh so readers can detect updates.
    Failures are appended to /tmp/hpcquery_fail.txt and the loop keeps going.
    """

    def __init__(self, interval=30, slice=None, timeStart=-HOUR_MS, cp=None, nodes_to_sites=None):
        threading.Thread.__init__(self)
        HpcQuery.__init__(self)
        self.daemon = True  # don't keep the process alive at interpreter exit
        self.interval = interval
        self.timeStart = timeStart
        # Avoid the old shared mutable default argument ({}).
        self.nodes_to_sites = {} if nodes_to_sites is None else nodes_to_sites
        self.slice = slice
        self.cp = cp
        self.data_version = 0
        self.please_die = False
        self.update_time = time.time()
        self.start()

    def is_stalled(self):
        """Return True if no successful refresh has happened for 5 minutes."""
        return time.time() - self.update_time > 300

    def run(self):
        # Poll until someone sets please_die.
        while not self.please_die:
            try:
                self.rows = self.get_usage(timeStart=self.timeStart, cp=self.cp, slice=self.slice)
                self.site_rows = self.sites_from_usage(self.rows, self.nodes_to_sites)
                self.update_time = time.time()
                self.new_data()
                self.data_version += 1
            except Exception:
                # BUG FIX: was a bare `except:` (which also swallowed
                # SystemExit/KeyboardInterrupt) and leaked the handle opened
                # via the file() builtin; `with open` closes it reliably.
                with open("/tmp/hpcquery_fail.txt", "a") as f:
                    f.write(traceback.format_exc() + "\n")
            time.sleep(self.interval)

    def new_data(self):
        """Hook for subclasses; called after each successful refresh."""
        pass
227
228class HpcDumpThread(HpcQueryThread):
229 def __init__(self, interval=30, slice=None, timeStart=-HOUR_MS, cp=None):
230 HpcQueryThread.__init__(self, interval, slice, timeStart, cp)
231
232 def new_data(self):
233 os.system("clear")
234
235 print "update %d, data for last %d minutes" % (self.data_version, -self.timeStart/1000/60)
236 print
237
238 self.dump_table(self.rows, ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
239 print
240 self.dump_table(self.site_rows, ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
241 print
242
243
244def main_old():
245 hq = HpcQuery()
246# print hq.mapping
247
248 print "5 minute"
249 hq.dump_table(hq.get_usage(timeStart=-MINUTE_MS*5), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
250 print
251 hq.dump_table(hq.get_usage_sites(timeStart=-MINUTE_MS*5), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
252 print
253
254 print "1 hour"
255 hq.dump_table(hq.get_usage(), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
256 print
257 hq.dump_table(hq.get_usage_sites(), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
258 print
259
260 print "24 hours"
261 hq.dump_table(hq.get_usage(timeStart=-HOUR_MS*24), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
262 hq.dump_table(hq.get_usage_sites(timeStart=-HOUR_MS*24), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
263 print
264
def main():
    """Start the background dump thread, then idle in the foreground."""
    dumper = HpcDumpThread()
    # The daemon thread does all the work; just keep the process alive.
    while True:
        time.sleep(30)
269
# Script entry point: run the live-dashboard loop when executed directly.
if __name__ == "__main__":
    main()