blob: 4ddf4f668d4ad66dcf142b6c5e5eec45f7ce8d49 [file] [log] [blame]
Scott Baker43adf1b2014-03-19 21:54:55 -07001import re
2import base64
3import requests
4import urllib
5import json
6import httplib2
7import threading
8import os
9import time
10import traceback
11
12from apiclient.discovery import build
13from apiclient.errors import HttpError
14from oauth2client.client import AccessTokenRefreshError
15from oauth2client.client import OAuth2WebServerFlow
16from oauth2client.client import flow_from_clientsecrets
17from oauth2client.file import Storage
18from oauth2client.tools import run_flow,run
19
20"""
21yum -y install python-httplib2
22easy_install python_gflags
23easy_install google_api_python_client
24"""
25
26
27PROJECT_NUMBER = '549187599759'
28
Scott Baker78ab1012014-03-19 23:44:39 -070029try:
30 FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
31 scope='https://www.googleapis.com/auth/bigquery')
32except:
33 print "exception while initializing bigquery flow"
34 traceback.print_exc()
35 FLOW = None
Scott Baker43adf1b2014-03-19 21:54:55 -070036
37MINUTE_MS = 60*1000
38HOUR_MS = 60*60*1000
39
40class HpcQuery:
41 def __init__(self):
42 self.mapping = json.loads(self.fetch_mapping(table="demoevents"))
43 self.reverse_mapping = {v:k for k, v in self.mapping.items()}
44
45 def fetch_mapping(self, m=0, table="events"):
46 req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table)
47 resp = requests.get(req)
48 if (resp.status_code==200):
49 return resp.text
50 else:
51 raise Exception('Error accessing register allocations: %d'%resp.status_code)
52
53 def run_query_old(self, query):
54 req = 'http://cloud-scrutiny.appspot.com/command?action=send_query&q=%s' % urllib.quote(query)
55 resp = requests.get(req)
56 if (resp.status_code==200):
57 return resp.text
58 else:
59 raise Exception('Error running query: %d'%resp.status_code)
60 return resp
61
62 def run_query(self, query):
63 storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
64 credentials = storage.get()
65
66 if credentials is None or credentials.invalid:
67 credentials = run(FLOW, storage)
68
69 http = httplib2.Http()
70 http = credentials.authorize(http)
71
72 service = build('bigquery', 'v2', http=http)
73
74 body = {"query": query}
75 response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()
76
77 fieldNames = []
78 for field in response["schema"]["fields"]:
79 fieldNames.append(field["name"])
80
81 result = []
82 if "rows" in response:
83 for row in response["rows"]:
84 this_result = {}
85 for (i,column) in enumerate(row["f"]):
86 this_result[self.reverse_mapping.get(fieldNames[i],fieldNames[i])] = column["v"]
87 result.append(this_result)
88
89 return result
90
91 def remap(self, match):
92 token = match.group()[1:]
93 if token in self.mapping:
94 return self.mapping[token]
95 else:
96 raise Exception('unknown token %s' % token)
97
98 def get_usage(self, cp=None, hostname=None, site=None, slice=None, timeStart=-HOUR_MS, timeStop=-1, groupBy=["%hostname", "%cp"]):
99 where = []
100 if slice is not None:
101 where.append("%slice='" + slice + "'")
102 if cp is not None:
103 where.append("%cp='" + cp + "'")
104 if hostname is not None:
105 where.append("%hostname='" + hostname + "'")
106 if site is not None:
107 where.append("%hostname contains " + site)
108 where.append("%bytes_sent>0")
109 where = "WHERE " + " AND ".join(where)
110
111 if timeStart is not None:
112 tableName = "[vicci.demoevents@%d-%d]" % (timeStart,timeStop)
113 else:
114 tableName = "[vicci.demoevents]"
115
116 query = "SELECT %hostname,%cp,sum(%bytes_sent) as sum_bytes_sent,sum(%bytes_hit) as sum_bytes_hit, AVG(%bandwidth) as avg_bandwidth," + \
117 " MAX(TIMESTAMP_TO_MSEC(time))-MIN(TIMESTAMP_TO_MSEC(time)) as time_delta FROM " + \
118 tableName + " " + where
119
120 if groupBy:
121 query = query + " GROUP BY " + ",".join(groupBy)
122
123 p = re.compile('%[a-zA-z_]*')
124 query = p.sub(self.remap, query)
125
126 rows = self.run_query(query)
127
128 for row in rows:
129 row["sum_bytes_sent"] = int(row.get("sum_bytes_sent",0))
130 row["sum_bytes_hit"] = int(row.get("sum_bytes_hit",0))
131 row["avg_bandwidth"] = int(float(row.get("avg_bandwidth",0)))
132 row["time_delta"] = float(row.get("time_delta",0.0))/1000.0
133
134 elapsed = (timeStop-timeStart)/1000
135 KBps = int(row.get("sum_bytes_sent",0)) / elapsed / 1024
136 row["KBps"] = KBps
137
138 return rows
139
140 def sites_from_usage(self, rows, nodes_to_sites={}):
141 sites = {}
142 for row in rows:
143 hostname = row["hostname"]
144
145 if hostname in nodes_to_sites:
146 site_name = nodes_to_sites[hostname]
147 else:
148 parts = hostname.split(".")
149 if len(parts)<=2:
150 continue
151 site_name = parts[1]
152
153 if not (site_name in sites):
154 row = row.copy()
155 row["site"] = site_name
156 row["max_avg_bandwidth"] = row["avg_bandwidth"]
157 # sites table doesn't care about hostnames or avg_bandwidth
158 del row["hostname"]
159 del row["avg_bandwidth"]
160 sites[site_name] = row
161 else:
162 site_row = sites[site_name]
163 site_row["sum_bytes_sent"] = site_row["sum_bytes_sent"] + row["sum_bytes_sent"]
164 site_row["sum_bytes_hit"] = site_row["sum_bytes_hit"] + row["sum_bytes_hit"]
165 site_row["max_avg_bandwidth"] = max(site_row["max_avg_bandwidth"], row["avg_bandwidth"])
166 site_row["time_delta"] = max(site_row["time_delta"], row["time_delta"])
167
168 return sites.values()
169
170 def get_usage_sites(self, cp=None, slice=None, timeStart=-HOUR_MS, timeStop=-1):
171 rows = self.get_usage(cp=cp, slice=slice, timeStart=timeStart, timeStop=timeStop)
172
173 return self.sites_from_usage(rows)
174
175 def dump_table(self, rows, keys=None):
176 if not keys:
177 keys = rows[0].keys()
178
179 lens = {}
180 for key in keys:
181 lens[key] = len(key)
182
183 for row in rows:
184 for key in keys:
185 thislen = len(str(row.get(key,"")))
186 lens[key] = max(lens.get(key,0), thislen)
187
188 for key in keys:
189 print "%*s" % (lens[key], key),
190 print
191
192 for row in rows:
193 for key in keys:
194 print "%*s" % (lens[key], str(row.get(key,""))),
195 print
196
class HpcQueryThread(HpcQuery, threading.Thread):
    """Daemon thread that periodically re-runs the usage query.

    Results are published in self.rows / self.site_rows, and
    self.data_version is bumped after each successful refresh.
    """

    def __init__(self, interval=30, slice=None, timeStart=-HOUR_MS, cp=None, nodes_to_sites=None):
        threading.Thread.__init__(self)
        HpcQuery.__init__(self)
        self.daemon = True
        self.interval = interval    # seconds between refreshes
        self.timeStart = timeStart  # ms offset of the query window
        # normalize None to {} so sites_from_usage always gets a dict
        # (also avoids the shared-mutable-default-argument pattern)
        self.nodes_to_sites = {} if nodes_to_sites is None else nodes_to_sites
        self.slice = slice
        self.cp = cp
        self.data_version = 0
        self.please_die = False
        self.update_time = time.time()
        self.start()

    def is_stalled(self):
        """Return True if no successful refresh happened for 5 minutes."""
        return time.time()-self.update_time > 300

    def run(self):
        """Thread main loop: refresh, notify, sleep; repeat until told to die."""
        while not self.please_die:
            try:
                self.rows = self.get_usage(timeStart=self.timeStart, cp=self.cp, slice=self.slice)
                self.site_rows = self.sites_from_usage(self.rows, self.nodes_to_sites)
                self.update_time = time.time()
                self.new_data()
                self.data_version += 1
            except Exception:
                # was a bare "except:"; keep the thread alive on errors but
                # don't swallow SystemExit/KeyboardInterrupt, and close the
                # log file instead of leaking the handle via file().write()
                with open("/tmp/hpcquery_fail.txt","a") as log:
                    log.write(traceback.format_exc() + "\n")
            time.sleep(self.interval)

    def new_data(self):
        """Hook for subclasses; called after each successful refresh."""
        pass
232
233class HpcDumpThread(HpcQueryThread):
234 def __init__(self, interval=30, slice=None, timeStart=-HOUR_MS, cp=None):
235 HpcQueryThread.__init__(self, interval, slice, timeStart, cp)
236
237 def new_data(self):
238 os.system("clear")
239
240 print "update %d, data for last %d minutes" % (self.data_version, -self.timeStart/1000/60)
241 print
242
243 self.dump_table(self.rows, ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
244 print
245 self.dump_table(self.site_rows, ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
246 print
247
248
249def main_old():
250 hq = HpcQuery()
251# print hq.mapping
252
253 print "5 minute"
254 hq.dump_table(hq.get_usage(timeStart=-MINUTE_MS*5), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
255 print
256 hq.dump_table(hq.get_usage_sites(timeStart=-MINUTE_MS*5), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
257 print
258
259 print "1 hour"
260 hq.dump_table(hq.get_usage(), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
261 print
262 hq.dump_table(hq.get_usage_sites(), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
263 print
264
265 print "24 hours"
266 hq.dump_table(hq.get_usage(timeStart=-HOUR_MS*24), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
267 hq.dump_table(hq.get_usage_sites(timeStart=-HOUR_MS*24), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
268 print
269
def main():
    """Start the periodic dump thread and keep the main thread alive.

    HpcDumpThread is a daemon thread, so the process exits when this
    loop is interrupted.
    """
    dumper = HpcDumpThread()
    while True:
        time.sleep(30)

if __name__ == "__main__":
    main()