blob: 874022e4231596f7a2deed6770992b4a5f12d3b7 [file] [log] [blame]
Scott Baker43adf1b2014-03-19 21:54:55 -07001import re
2import base64
3import requests
4import urllib
5import json
6import httplib2
7import threading
8import os
9import time
10import traceback
11
12from apiclient.discovery import build
13from apiclient.errors import HttpError
14from oauth2client.client import AccessTokenRefreshError
15from oauth2client.client import OAuth2WebServerFlow
16from oauth2client.client import flow_from_clientsecrets
17from oauth2client.file import Storage
18from oauth2client.tools import run_flow,run
19
Scott Bakerb969c462015-02-04 16:22:05 -080020from bigquery_config import BIGQUERY_SECRETS_FN, BIGQUERY_CREDENTIALS_FN
21
"""
Install the third-party dependencies before using this module:

yum -y install python-httplib2
easy_install python_gflags
easy_install google_api_python_client
"""
27
28
# Project identifier passed as projectId when submitting BigQuery query jobs.
PROJECT_NUMBER = '549187599759'

# Build the OAuth2 flow once at import time.  A failure (for example an
# unreadable secrets file) must not prevent the module from being imported, so
# it is reported and FLOW is left as None.  Only Exception subclasses are
# caught so that KeyboardInterrupt / SystemExit still propagate.
try:
    FLOW = flow_from_clientsecrets(BIGQUERY_SECRETS_FN,
                                   scope='https://www.googleapis.com/auth/bigquery')
except Exception:
    print("exception while initializing bigquery flow")
    traceback.print_exc()
    FLOW = None

# Durations in milliseconds, used to express query time windows.
MINUTE_MS = 60*1000
HOUR_MS = 60*60*1000
41
class HpcQuery:
    """Run usage queries against the "vicci.demoevents" BigQuery table.

    Queries are written with symbolic ``%name`` tokens.  ``self.mapping``
    (fetched from the cloud-scrutiny service) translates each token to the
    column name used in the query, and ``self.reverse_mapping`` translates
    returned column names back to the symbolic names.
    """

    # Matches a '%' followed by letters/underscores.  The '*' is kept so that
    # a bare '%' is still matched and reported by remap() as an unknown token.
    _TOKEN_RE = re.compile(r'%[a-zA-Z_]*')

    def __init__(self):
        self.mapping = json.loads(self.fetch_mapping(table="demoevents"))
        self.reverse_mapping = {v: k for k, v in self.mapping.items()}

    def fetch_mapping(self, m=0, table="events"):
        """Fetch the register allocations for a table.

        m     -- value sent as the ``multiplexer`` request parameter
        table -- value sent as the ``table`` request parameter

        Returns the response body text.  Raises Exception when the service
        answers with a status other than 200.
        """
        req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s' % (m, table)
        resp = requests.get(req)
        if resp.status_code != 200:
            raise Exception('Error accessing register allocations: %d' % resp.status_code)
        return resp.text

    def run_query_old(self, query):
        """Send a query through the cloud-scrutiny ``send_query`` action.

        Returns the response body text.  Raises Exception when the service
        answers with a status other than 200.
        """
        req = 'http://cloud-scrutiny.appspot.com/command?action=send_query&q=%s' % urllib.quote(query)
        resp = requests.get(req)
        if resp.status_code != 200:
            raise Exception('Error running query: %d' % resp.status_code)
        return resp.text

    def run_query(self, query):
        """Execute a query with the BigQuery v2 API.

        Stored credentials are loaded from BIGQUERY_CREDENTIALS_FN; when they
        are missing or invalid the OAuth flow is run to obtain new ones.

        Returns a list of dicts, one per result row, keyed by the symbolic
        field name when the column appears in the reverse mapping and by the
        raw column name otherwise.  Raises Exception when new credentials are
        needed but the module-level FLOW could not be initialized.
        """
        storage = Storage(BIGQUERY_CREDENTIALS_FN)
        credentials = storage.get()

        if credentials is None or credentials.invalid:
            if FLOW is None:
                # Fail with a clear message instead of handing None to run().
                raise Exception('bigquery flow is not initialized; cannot obtain credentials')
            credentials = run(FLOW, storage)

        http = credentials.authorize(httplib2.Http())
        service = build('bigquery', 'v2', http=http)

        body = {"query": query}
        response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()

        fieldNames = [field["name"] for field in response["schema"]["fields"]]

        result = []
        # "rows" is absent from the response when the query matched nothing.
        for row in response.get("rows", []):
            this_result = {}
            for (i, column) in enumerate(row["f"]):
                name = fieldNames[i]
                this_result[self.reverse_mapping.get(name, name)] = column["v"]
            result.append(this_result)

        return result

    def remap(self, match):
        """Regex substitution callback: translate a ``%token`` match.

        Returns the mapped name for the token (the match without its leading
        '%').  Raises Exception when the token is not in the mapping.
        """
        token = match.group()[1:]
        if token not in self.mapping:
            raise Exception('unknown token %s' % token)
        return self.mapping[token]

    def get_usage(self, cp=None, hostname=None, site=None, slice=None, timeStart=-HOUR_MS, timeStop=-1, groupBy=("%hostname", "%cp")):
        """Query aggregated usage, optionally filtered and grouped.

        cp, hostname, slice -- when not None, restrict rows to that exact value
        site      -- when not None, added as a "%hostname contains" filter
        timeStart -- start of the table range; None queries the whole table
        timeStop  -- end of the table range (only used with timeStart)
                     NOTE(review): values look like milliseconds, negative
                     meaning relative to now -- confirm against the BigQuery
                     table decorator documentation.
        groupBy   -- sequence of ``%token`` fields to group by; an empty
                     sequence or None disables grouping

        Returns the list of row dicts from run_query() with sum_bytes_sent,
        sum_bytes_hit and avg_bandwidth converted to int, time_delta converted
        to seconds (float), and a computed integer "KBps" added.

        NOTE(review): filter values are concatenated into the query text
        without escaping; callers should pass trusted values only.
        """
        where = []
        if slice is not None:
            where.append("%slice='" + slice + "'")
        if cp is not None:
            where.append("%cp='" + cp + "'")
        if hostname is not None:
            where.append("%hostname='" + hostname + "'")
        if site is not None:
            # NOTE(review): unlike the filters above, the value is not wrapped
            # in quotes here -- confirm whether callers pass it pre-quoted.
            where.append("%hostname contains " + site)
        where.append("%bytes_sent>0")
        where = "WHERE " + " AND ".join(where)

        if timeStart is not None:
            tableName = "[vicci.demoevents@%d-%d]" % (timeStart, timeStop)
        else:
            tableName = "[vicci.demoevents]"

        query = "SELECT %hostname,%cp,sum(%bytes_sent) as sum_bytes_sent,sum(%bytes_hit) as sum_bytes_hit, AVG(%bandwidth) as avg_bandwidth," + \
                "    MAX(TIMESTAMP_TO_MSEC(time))-MIN(TIMESTAMP_TO_MSEC(time)) as time_delta FROM " + \
                tableName + " " + where

        if groupBy:
            query = query + " GROUP BY " + ",".join(groupBy)

        # Replace every %token with its mapped column name.
        query = self._TOKEN_RE.sub(self.remap, query)

        rows = self.run_query(query)

        # Length of the requested window in whole seconds.  It is the same for
        # every row, so compute it once; it is unknown when the whole table
        # was queried.
        if timeStart is not None:
            window_secs = (timeStop - timeStart) // 1000
        else:
            window_secs = None

        for row in rows:
            row["sum_bytes_sent"] = int(row.get("sum_bytes_sent", 0))
            row["sum_bytes_hit"] = int(row.get("sum_bytes_hit", 0))
            row["avg_bandwidth"] = int(float(row.get("avg_bandwidth", 0)))
            row["time_delta"] = float(row.get("time_delta", 0.0)) / 1000.0

            if window_secs is not None:
                elapsed = window_secs
            else:
                # No explicit window: fall back to the time span covered by
                # the row's own events.
                elapsed = int(row["time_delta"])

            # A zero-length interval has no meaningful rate; report 0 rather
            # than dividing by zero.
            if elapsed:
                row["KBps"] = row["sum_bytes_sent"] // elapsed // 1024
            else:
                row["KBps"] = 0

        return rows

    def sites_from_usage(self, rows, nodes_to_sites=None):
        """Aggregate per-host usage rows into one row per site.

        rows           -- row dicts as returned by get_usage()
        nodes_to_sites -- optional dict mapping hostname to site name; hosts
                          not listed use the second dot-separated component of
                          the hostname, and hosts with two or fewer components
                          are skipped

        Returns a list of site row dicts.  Each starts as a copy of the first
        row seen for the site (minus "hostname" and "avg_bandwidth", plus
        "site" and "max_avg_bandwidth"); later rows add to the byte sums and
        raise max_avg_bandwidth / time_delta to the maximum seen.

        NOTE(review): other copied fields such as "cp" and "KBps" keep the
        first host's value and are not aggregated -- confirm this is intended.
        """
        if nodes_to_sites is None:
            nodes_to_sites = {}

        sites = {}
        for row in rows:
            hostname = row["hostname"]

            if hostname in nodes_to_sites:
                site_name = nodes_to_sites[hostname]
            else:
                parts = hostname.split(".")
                if len(parts) <= 2:
                    continue
                site_name = parts[1]

            if site_name not in sites:
                site_row = row.copy()
                site_row["site"] = site_name
                site_row["max_avg_bandwidth"] = site_row["avg_bandwidth"]
                # sites table doesn't care about hostnames or avg_bandwidth
                del site_row["hostname"]
                del site_row["avg_bandwidth"]
                sites[site_name] = site_row
            else:
                site_row = sites[site_name]
                site_row["sum_bytes_sent"] = site_row["sum_bytes_sent"] + row["sum_bytes_sent"]
                site_row["sum_bytes_hit"] = site_row["sum_bytes_hit"] + row["sum_bytes_hit"]
                site_row["max_avg_bandwidth"] = max(site_row["max_avg_bandwidth"], row["avg_bandwidth"])
                site_row["time_delta"] = max(site_row["time_delta"], row["time_delta"])

        return list(sites.values())

    def get_usage_sites(self, cp=None, slice=None, timeStart=-HOUR_MS, timeStop=-1):
        """Return per-site usage rows for the given filters and time range."""
        rows = self.get_usage(cp=cp, slice=slice, timeStart=timeStart, timeStop=timeStop)

        return self.sites_from_usage(rows)

    def dump_table(self, rows, keys=None):
        """Print rows to stdout as a right-aligned, space-separated table.

        rows -- list of row dicts
        keys -- column names to print, in order; defaults to the keys of the
                first row.  Missing values are printed as empty strings.

        Nothing is printed when there are no rows and no keys were given.
        """
        if not keys:
            if not rows:
                # No rows to infer the columns from.
                return
            keys = list(rows[0].keys())

        # Each column is as wide as its widest cell, header included.
        lens = {}
        for key in keys:
            lens[key] = max([len(key)] + [len(str(row.get(key, ""))) for row in rows])

        print(" ".join(["%*s" % (lens[key], key) for key in keys]))

        for row in rows:
            print(" ".join(["%*s" % (lens[key], str(row.get(key, ""))) for key in keys]))
198
class HpcQueryThread(HpcQuery, threading.Thread):
    """Daemon thread that re-runs the usage query at a fixed interval.

    After each successful query the results are stored in ``self.rows`` and
    ``self.site_rows`` (these attributes do not exist until the first query
    succeeds), ``new_data()`` is called, and ``data_version`` is incremented.
    The thread starts itself from the constructor.
    """

    def __init__(self, interval=30, slice=None, timeStart=-HOUR_MS, cp=None, nodes_to_sites=None):
        """Store the query settings and start the thread.

        interval       -- seconds to sleep between queries
        slice, cp      -- filters passed to get_usage()
        timeStart      -- start of the query window, passed to get_usage()
        nodes_to_sites -- optional hostname-to-site dict passed to
                          sites_from_usage()
        """
        threading.Thread.__init__(self)
        HpcQuery.__init__(self)
        self.daemon = True
        self.interval = interval
        self.timeStart = timeStart
        # Use a fresh dict per instance rather than a shared default object.
        self.nodes_to_sites = {} if nodes_to_sites is None else nodes_to_sites
        self.slice = slice
        self.cp = cp
        self.data_version = 0
        self.please_die = False
        self.update_time = time.time()
        self.start()

    def is_stalled(self):
        """Return True when no query has succeeded in the last 300 seconds."""
        return time.time() - self.update_time > 300

    def run(self):
        """Query repeatedly until ``please_die`` is set.

        A failed iteration must not kill the thread, so its traceback is
        appended to /tmp/hpcquery_fail.txt and the loop carries on.
        """
        while not self.please_die:
            try:
                self.rows = self.get_usage(timeStart=self.timeStart, cp=self.cp, slice=self.slice)
                self.site_rows = self.sites_from_usage(self.rows, self.nodes_to_sites)
                self.update_time = time.time()
                self.new_data()
                self.data_version += 1
            except Exception:
                # The context manager closes the log file after every write.
                with open("/tmp/hpcquery_fail.txt", "a") as fail_log:
                    fail_log.write(traceback.format_exc() + "\n")
            time.sleep(self.interval)

    def new_data(self):
        """Hook called after each successful query; subclasses override it."""
        pass
234
class HpcDumpThread(HpcQueryThread):
    """Query thread that redraws the host and site tables on every update."""

    def __init__(self, interval=30, slice=None, timeStart=-HOUR_MS, cp=None):
        HpcQueryThread.__init__(self, interval, slice, timeStart, cp)

    def new_data(self):
        """Clear the terminal and print the latest host and site tables."""
        os.system("clear")

        window_minutes = -self.timeStart/1000/60
        print("update %d, data for last %d minutes" % (self.data_version, window_minutes))
        print("")

        # Both tables share every column except the leading one.
        for table_rows, first_column in ((self.rows, "hostname"), (self.site_rows, "site")):
            self.dump_table(table_rows, [first_column, "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
            print("")
249
250
def main_old():
    """Print host and site usage tables for 5 minutes, 1 hour and 24 hours."""
    hq = HpcQuery()

    host_columns = ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"]
    site_columns = ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"]

    # (title, query keyword arguments, blank line between the two tables?)
    reports = (
        ("5 minute", {"timeStart": -MINUTE_MS*5}, True),
        ("1 hour", {}, True),
        ("24 hours", {"timeStart": -HOUR_MS*24}, False),
    )

    for title, query_args, blank_between in reports:
        print(title)
        hq.dump_table(hq.get_usage(**query_args), host_columns)
        if blank_between:
            print("")
        hq.dump_table(hq.get_usage_sites(**query_args), site_columns)
        print("")
271
def main():
    """Start the dump thread and keep the main thread alive indefinitely."""
    # The worker is a daemon thread, so the process lives only as long as
    # this loop keeps running.
    dump_thread = HpcDumpThread()
    while True:
        time.sleep(30)


if __name__ == "__main__":
    main()