Check in hpc_wizard and analytics Python source
diff --git a/planetstack/hpc_wizard/README b/planetstack/hpc_wizard/README
new file mode 100644
index 0000000..06b12b5
--- /dev/null
+++ b/planetstack/hpc_wizard/README
@@ -0,0 +1,10 @@
+Two files are purposely not included in the git repository:
+ bigquery_credentials.dat
+ client_secrets.json
+
+These files must be manually installed.
+
+Additionally, the following packages must be installed:
+ yum -y install python-httplib2
+ easy_install python_gflags
+ easy_install google_api_python_client
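+
+A note on obtaining these files (inferred from the OAuth flow in
+bigquery_analytics.py, not from any project documentation): client_secrets.json
+is the OAuth client secrets file downloaded from the Google API console for the
+BigQuery project, and bigquery_credentials.dat can be generated by running
+bigquery_analytics.py once interactively, which triggers the oauth2client
+authorization flow and writes the credentials to that file.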
diff --git a/planetstack/hpc_wizard/bigquery_analytics.py b/planetstack/hpc_wizard/bigquery_analytics.py
new file mode 100644
index 0000000..ca08025
--- /dev/null
+++ b/planetstack/hpc_wizard/bigquery_analytics.py
@@ -0,0 +1,127 @@
+import re
+import base64
+import requests
+import urllib
+import json
+import httplib2
+import threading
+import os
+import time
+import traceback
+
+from apiclient.discovery import build
+from apiclient.errors import HttpError
+from oauth2client.client import AccessTokenRefreshError
+from oauth2client.client import OAuth2WebServerFlow
+from oauth2client.client import flow_from_clientsecrets
+from oauth2client.file import Storage
+from oauth2client.tools import run_flow, run
+
+"""
+yum -y install python-httplib2
+easy_install python_gflags
+easy_install google_api_python_client
+"""
+
+PROJECT_NUMBER = '549187599759'
+
+FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
+ scope='https://www.googleapis.com/auth/bigquery')
+
+MINUTE_MS = 60*1000
+HOUR_MS = 60*60*1000
+
+class BigQueryAnalytics:
+ def __init__(self, table = "demoevents"):
+ self.projectName = "vicci"
+ self.tableName = table
+ self.mapping = json.loads(self.fetch_mapping(table=self.tableName))
+ self.reverse_mapping = {v:k for k, v in self.mapping.items()}
+
+ def fetch_mapping(self, m=0, table="events"):
+ req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table)
+ resp = requests.get(req)
+ if (resp.status_code==200):
+ return resp.text
+ else:
+ raise Exception('Error accessing register allocations: %d'%resp.status_code)
+
+ def run_query_raw(self, query):
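+		# %tokens in the query are rewritten into the BigQuery column names
+		# allocated for this table. For example (hypothetical mapping), a query
+		# like "SELECT %hostname, SUM(%bytes_sent) FROM [vicci.demoevents]"
+		# might become "SELECT s_hostname, SUM(i_bytes_sent) FROM [vicci.demoevents]",
+		# depending on the allocations returned by cloud-scrutiny.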
+		p = re.compile('%[a-zA-Z_]*')
+ query = p.sub(self.remap, query)
+
+ storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
+ credentials = storage.get()
+
+ if credentials is None or credentials.invalid:
+ credentials = run(FLOW, storage)
+
+ http = httplib2.Http()
+ http = credentials.authorize(http)
+
+ service = build('bigquery', 'v2', http=http)
+
+ body = {"query": query}
+ response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()
+
+ return response
+
+ def translate_schema(self, response):
+ for field in response["schema"]["fields"]:
+ field["name"] = self.reverse_mapping.get(field["name"], field["name"])
+
+ def run_query(self, query):
+ response = self.run_query_raw(query)
+
+ fieldNames = []
+ for field in response["schema"]["fields"]:
+ fieldNames.append(field["name"])
+
+ result = []
+ if "rows" in response:
+ for row in response["rows"]:
+ this_result = {}
+ for (i,column) in enumerate(row["f"]):
+ this_result[self.reverse_mapping.get(fieldNames[i],fieldNames[i])] = column["v"]
+ result.append(this_result)
+
+ return result
+
+ def remap(self, match):
+ token = match.group()[1:]
+ if token in self.mapping:
+ return self.mapping[token]
+ else:
+ raise Exception('unknown token %s' % token)
+
+ def dump_table(self, rows, keys=None):
+ if not keys:
+ keys = rows[0].keys()
+
+ lens = {}
+ for key in keys:
+ lens[key] = len(key)
+
+ for row in rows:
+ for key in keys:
+ thislen = len(str(row.get(key,"")))
+ lens[key] = max(lens.get(key,0), thislen)
+
+ for key in keys:
+ print "%*s" % (lens[key], key),
+ print
+
+ for row in rows:
+ for key in keys:
+ print "%*s" % (lens[key], str(row.get(key,""))),
+ print
+
+def main():
+ bq = BigQueryAnalytics()
+
+ rows = bq.run_query("select %hostname,SUM(%bytes_sent) from [vicci.demoevents] group by %hostname")
+
+ bq.dump_table(rows)
+
+if __name__ == "__main__":
+ main()
diff --git a/planetstack/hpc_wizard/hpc_wizard.py b/planetstack/hpc_wizard/hpc_wizard.py
new file mode 100644
index 0000000..d619f64
--- /dev/null
+++ b/planetstack/hpc_wizard/hpc_wizard.py
@@ -0,0 +1,340 @@
+import datetime
+import os
+import operator
+import socket
+import pytz
+import json
+import random
+import sys
+import time
+import traceback
+
+if os.path.exists("/home/smbaker/projects/vicci/plstackapi/planetstack"):
+ sys.path.append("/home/smbaker/projects/vicci/plstackapi/planetstack")
+else:
+ sys.path.append("/opt/planetstack")
+
+os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings")
+from django import db
+from django.db import connection
+from core.models import Slice, Sliver, ServiceClass, Reservation, Tag, Network, User, Node, Image, Deployment, Site, NetworkTemplate, NetworkSlice, Service
+from hpc.models import HpcService, ServiceProvider, ContentProvider, OriginServer, CDNPrefix
+
+# Amount of time, in milliseconds, that will be queried for HPC statistics.
+QUERY_TIME=150000
+
+# Constants used for computing 'hotness'
+# BLUE_LOAD = bandwidth (bytes/sec) that should register as "0" on the hotness scale
+# RED_LOAD = bandwidth (bytes/sec) that should register as "1" on the hotness scale
+BLUE_LOAD=5000000
+RED_LOAD=15000000
+
+MAX_LOAD=RED_LOAD
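+
+# Worked example (see merge_site_statistics below): a site whose most-loaded
+# node has max_avg_bandwidth = 10000000 gets
+#   hotness = min(1.0, (max(BLUE_LOAD, 10000000) - BLUE_LOAD) / RED_LOAD)
+#           = 5000000.0 / 15000000 ~= 0.33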
+
+def log(what, showdate=True):
+ try:
+ if showdate:
+ file("/tmp/scott-hpcwizard.log", "a").write(time.strftime("%Y-%m-%d %H:%M:%S ", time.gmtime()))
+ file("/tmp/scott-hpcwizard.log", "a").write("%s\n" % what)
+ except:
+ pass # uh oh
+
+def log_exc(what):
+ log(what)
+ log(traceback.format_exc(), showdate=False)
+
+def avg(x):
+ return float(sum(x))/len(x)
+
+def format_float(x):
+ try:
+ return "%10.5f" % x
+ except:
+ return str(x)
+
+class HpcWizard:
+ def __init__(self):
+ try:
+ self.hpcService = HpcService.objects.get()
+ except:
+ # OpenCloud.us currently has a Service object instantiated instead
+ # of a HpcService. Fallback for now.
+ self.hpcService = Service.objects.get(name="HPC Service")
+
+ self.hpcQueryThread = None
+
+ def get_hpc_slices(self):
+ try:
+ slices = self.hpcService.slices.all()
+ except:
+ # BUG in data model -- Slice.service has related name 'service' and
+ # it should be 'slices'
+ slices = self.hpcService.service.all()
+ return slices
+
+ def get_hpc_slivers(self):
+ slivers = []
+ for slice in self.get_hpc_slices():
+ for sliver in slice.slivers.all():
+ slivers.append(sliver)
+ return slivers
+
+ def fill_site_nodes(self, site, hpc_slivers=None):
+ if hpc_slivers is None:
+ hpc_slivers = self.get_hpc_slivers()
+
+ site.availNodes = []
+ site.hpcNodes = []
+ for node in site.nodes.all():
+ has_hpc = False
+
+ for sliver in node.slivers.all():
+ if sliver in hpc_slivers:
+ has_hpc = True
+
+ if has_hpc:
+ site.hpcNodes.append(node)
+ else:
+ site.availNodes.append(node)
+
+	def merge_site_statistics_old(self, sites):
+		""" Compute site load from the sum of all bandwidth.
+
+		The issue here is that the computed load reacts immediately to
+		the addition or deletion of nodes, i.e. 5 nodes at 80% + 1 node at
+		0% = average load 66%.
+		"""
+ site_dict = {}
+ for site in self.hpcQueryThread.site_rows:
+ site_dict[site["site"]] = site
+
+ for site in sites:
+ if site.name in site_dict:
+ site.bytes_sent = site_dict[site.name]["sum_bytes_sent"]
+ time_delta = site_dict[site.name]["time_delta"]
+ computed_duration = (int(time_delta/30)+1)*30
+ if (computed_duration > 0):
+ site.bandwidth = site.bytes_sent/computed_duration
+ if len(site.hpcNodes)>0:
+ # figure out how many bytes_sent would be represented
+ # by blue and red
+ blue_load = len(site.hpcNodes) * BLUE_LOAD * computed_duration
+ red_load = len(site.hpcNodes) * RED_LOAD * computed_duration
+ max_load = len(site.hpcNodes) * MAX_LOAD * computed_duration
+
+ site.hotness = (min(red_load, max(blue_load, float(site.bytes_sent))) - blue_load)/(red_load-blue_load)
+ site.load = int(min(100, site.bytes_sent*100/max_load))
+
+ file("/tmp/scott2.txt","a").write("%s %d %0.2f %0.2f %0.2f %0.2f %d\n" % (site.name, site.bytes_sent, blue_load, red_load, site.hotness, time_delta, computed_duration))
+
+	def merge_site_statistics(self, sites):
+		""" Compute site load from the maximally loaded node.
+
+		The advantage of this method is that, since we effectively report the
+		maximally loaded node, the load does not react instantaneously when
+		nodes are added. Instead, it takes a while for load to shift from the
+		loaded node to the new, less-loaded node.
+		"""
+ site_dict = {}
+ for site in self.hpcQueryThread.site_rows:
+ site_dict[site["site"]] = site
+
+ for site in sites:
+ if site.name in site_dict:
+ site.max_avg_bandwidth = site_dict[site.name]["max_avg_bandwidth"]
+ site.bytes_sent = site_dict[site.name]["sum_bytes_sent"]
+
+ site.hotness = min(1.0, float(max(BLUE_LOAD, site.max_avg_bandwidth) - BLUE_LOAD) / RED_LOAD)
+ site.load = int(site.max_avg_bandwidth*100/MAX_LOAD)
+
+ # we still need site["bandwidth"] for the summary statistics
+ time_delta = site_dict[site.name]["time_delta"]
+ computed_duration = (int(time_delta/30)+1)*30
+ if (computed_duration > 0):
+ site.bandwidth = site.bytes_sent/computed_duration
+ else:
+ site.bandwidth = 0
+
+ if len(site.hpcNodes)>0:
+ file("/tmp/scott3.txt","a").write("%s %d %0.2f %d %0.2f\n" % (site.name, site.bytes_sent, site.hotness, site.load, site.bandwidth))
+
+ def get_sites(self):
+ sites = list(Site.objects.all())
+
+ for site in sites:
+ self.fill_site_nodes(site, self.get_hpc_slivers())
+ site.load = 0
+ site.hotness = 0
+ site.bandwidth = 0
+ site.numNodes = len(site.hpcNodes) + len(site.availNodes)
+
+ if (self.hpcQueryThread is not None) and (self.hpcQueryThread.is_stalled()):
+ self.initialize_statistics()
+
+ # merge in the statistics data if it is available
+ if self.hpcQueryThread and self.hpcQueryThread.data_version>0:
+ self.merge_site_statistics(sites)
+
+ # django will leak extraordinary amounts of memory without this line
+ db.reset_queries()
+
+ return sites
+
+ def get_nodes_to_sites(self):
+ nodes_to_sites = {}
+
+ sites = list(Site.objects.all())
+
+ for site in sites:
+ for node in site.nodes.all():
+ nodes_to_sites[node.name] = site.name
+
+ return nodes_to_sites
+
+ def get_slice_sites(self, slice_name):
+ sites = list(Site.objects.all())
+ slivers = list(Slice.objects.get(name=slice_name).slivers.all())
+ for site in sites:
+ self.fill_site_nodes(site, slivers)
+ return sites
+
+ def get_sites_for_view(self):
+ sites = {}
+ for site in self.get_sites():
+ if site.name in ["ON.Lab", "I2 Atlanta"]:
+ continue
+
+ d = {"lat": float(site.location.latitude),
+ "long": float(site.location.longitude),
+ "health": 0,
+ "numNodes": site.numNodes,
+ "numHPCSlivers": len(site.hpcNodes),
+ "siteUrl": str(site.site_url),
+ "hot": getattr(site,"hotness",0.0),
+ "load": getattr(site,"load",0)}
+ sites[str(site.name)] = d
+
+ import pprint
+ f = file("/tmp/scott.txt","w")
+ pprint.pprint(sites, f)
+ f.close()
+
+ return sites
+
+ def get_summary_for_view(self):
+ total_slivers = 0
+ total_bandwidth = 0
+ average_cpu = 0
+
+ sites = [site for site in self.get_sites() if len(site.hpcNodes)>0]
+
+ total_slivers = sum( [len(site.hpcNodes) for site in sites] )
+ total_bandwidth = sum( [site.bandwidth for site in sites] )
+		# avoid a divide-by-zero if there are no HPC sites yet
+		average_cpu = int(avg( [site.load for site in sites] )) if sites else 0
+
+ return {"total_slivers": total_slivers,
+ "total_bandwidth": total_bandwidth,
+ "average_cpu": average_cpu}
+
+ def initialize_statistics(self):
+ from query import HpcQueryThread
+
+ if (self.hpcQueryThread is not None):
+ log("dropping old query thread")
+ self.hpcQueryThread.please_die = True
+ self.hpcQueryThread = None
+
+ log("launching new query thread")
+
+ nodes_to_sites = self.get_nodes_to_sites()
+ self.hpcQueryThread = HpcQueryThread(nodes_to_sites = nodes_to_sites, timeStart=-QUERY_TIME, slice="HyperCache")
+
+ def get_site(self, site_name):
+ site = Site.objects.get(name=site_name)
+ self.fill_site_nodes(site)
+ return site
+
+ def increase_slivers(self, site_name, count):
+ site = self.get_site(site_name)
+ hpc_slice = self.get_hpc_slices()[0]
+ while (len(site.availNodes) > 0) and (count > 0):
+ node = site.availNodes.pop()
+ hostname = node.name
+ sliver = Sliver(name=node.name,
+ slice=hpc_slice,
+ node=node,
+ image = Image.objects.all()[0],
+ creator = User.objects.get(email="scott@onlab.us"),
+ deploymentNetwork=node.deployment,
+ numberCores = 1,
+ ip=socket.gethostbyname(hostname))
+ sliver.save()
+
+ print "created sliver", sliver
+
+ site.hpcNodes.append(node)
+
+ count = count - 1
+
+ def decrease_slivers(self, site_name, count):
+ site = self.get_site(site_name)
+ hpc_slices = self.get_hpc_slices()
+ while (len(site.hpcNodes) > 0) and (count > 0):
+ node = site.hpcNodes.pop()
+ for sliver in node.slivers.all():
+ if sliver.slice in hpc_slices:
+ print "deleting sliver", sliver
+ sliver.delete()
+
+ site.availNodes.append(node)
+ count = count - 1
+
+ def dump(self):
+ print "slices:"
+ for slice in self.get_hpc_slices():
+ print " ", slice
+
+ print "sites:"
+ print "%20s %10s %10s %10s %10s %10s %10s" % ("name", "avail", "hpc", "lat", "long", "sent", "hot")
+ for site in self.get_sites():
+ print "%20s %10d %10d %10s %10s %10d %10.2f" % (site.name,
+ len(site.availNodes),
+ len(site.hpcNodes),
+ format_float(site.location.latitude),
+ format_float(site.location.longitude),
+ getattr(site,"bytes_sent",0),
+ getattr(site,"hotness",0.5))
+
+ #print "slivers:"
+ #for sliver in self.get_hpc_slivers():
+ # print " ", sliver
+
+glo_hpc_wizard = None
+
+def get_hpc_wizard():
+ global glo_hpc_wizard
+
+ if (glo_hpc_wizard is None):
+ glo_hpc_wizard = HpcWizard()
+ glo_hpc_wizard.initialize_statistics()
+
+ return glo_hpc_wizard
+
+def main():
+ x = HpcWizard()
+
+	# initialize the statistics thread and wait for some data to show up
+ x.initialize_statistics()
+ while x.hpcQueryThread.data_version==0:
+ time.sleep(1)
+
+ x.dump()
+
+ # quick test of the increase / decrease functions
+
+ x.increase_slivers("Princeton", 1)
+ x.decrease_slivers("Princeton", 1)
+
+if __name__=="__main__":
+ main()
+
diff --git a/planetstack/hpc_wizard/planetstack_analytics.py b/planetstack/hpc_wizard/planetstack_analytics.py
new file mode 100644
index 0000000..cc34933
--- /dev/null
+++ b/planetstack/hpc_wizard/planetstack_analytics.py
@@ -0,0 +1,202 @@
+from bigquery_analytics import BigQueryAnalytics
+import json
+
+class PlanetStackAnalytics(BigQueryAnalytics):
+ def __init__(self, tableName="demoevents"):
+ BigQueryAnalytics.__init__(self, tableName)
+
+ def compose_query(self, slice=None, site=None, node=None, timeField="MinuteTime", avg=[], sum=[], count=[], computed=[], groupBy=["MinuteTime"], orderBy=["MinuteTime"], tableName="demoevents"):
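+		# Example (taken from main() below): compose_query(slice="HyperCache",
+		# avg=["%cpu"], count=["%hostname"]) yields roughly
+		#   SELECT INTEGER(TIMESTAMP_TO_SEC(time)/60)*60 as MinuteTime, AVG(%cpu) as avg_cpu,
+		#          COUNT(distinct %hostname) as count_hostname
+		#   FROM [vicci.demoevents@-3600000--1]
+		#   WHERE %slice='HyperCache' GROUP BY MinuteTime ORDER BY MinuteTime
+		# The %tokens are remapped to real column names when the query is run.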
+ tablePart = "%s.%s@-3600000--1" % ("vicci", tableName)
+
+ fields = []
+ fieldNames = []
+
+ if (timeField=="MinuteTime"):
+ fields.append("INTEGER(TIMESTAMP_TO_SEC(time)/60)*60 as MinuteTime")
+ elif (timeField=="HourTime"):
+ fields.append("INTEGER(TIMESTAMP_TO_SEC(time)/60/60)*60*60 as HourTime")
+ elif (timeField=="DayTime"):
+ fields.append("INTEGER(TIMESTAMP_TO_SEC(time)/60/60/24)*60*60*24 as DayTime")
+
+ for fieldName in avg:
+ fields.append("AVG(%s) as avg_%s" % (fieldName, fieldName.replace("%","")))
+ fieldNames.append("avg_%s" % fieldName.replace("%",""))
+
+ for fieldName in sum:
+ fields.append("SUM(%s) as sum_%s" % (fieldName, fieldName.replace("%","")))
+ fieldNames.append("sum_%s" % fieldName.replace("%",""))
+
+ for fieldName in count:
+ fields.append("COUNT(distinct %s) as count_%s" % (fieldName, fieldName.replace("%","")))
+ fieldNames.append("count_%s" % fieldName.replace("%",""))
+
+		for fieldName in computed:
+			if "/" in fieldName:
+				operator = "/"
+				parts = fieldName.split("/")
+				computedFieldName = "computed_" + parts[0].replace("%","") + "_div_" + parts[1].replace("%","")
+			else:
+				# no "/", so treat the expression as a product of the two fields
+				operator = "*"
+				parts = fieldName.split("*")
+				computedFieldName = "computed_" + parts[0].replace("%","") + "_mult_" + parts[1].replace("%","")
+			fields.append("SUM(%s)%sSUM(%s) as %s" % (parts[0], operator, parts[1], computedFieldName))
+			fieldNames.append(computedFieldName)
+
+ fields = ", ".join(fields)
+
+ where = []
+
+ if slice:
+ where.append("%%slice='%s'" % slice)
+ if site:
+ where.append("%%site='%s'" % site)
+ if node:
+ where.append("%%hostname='%s'" % node)
+
+ if where:
+ where = " WHERE " + " AND ".join(where)
+ else:
+ where =""
+
+ if groupBy:
+ groupBy = " GROUP BY " + ",".join(groupBy)
+ else:
+ groupBy = ""
+
+ if orderBy:
+ orderBy = " ORDER BY " + ",".join(orderBy)
+ else:
+ orderBy = ""
+
+ if computed:
+ subQuery = "SELECT %%hostname, %s FROM [%s]" % (fields, tablePart)
+ if where:
+ subQuery = subQuery + where
+ subQuery = subQuery + " GROUP BY %s,%%hostname" % timeField
+
+ sumFields = []
+ for fieldName in fieldNames:
+ if fieldName.startswith("avg"):
+ sumFields.append("AVG(%s) as avg_%s"%(fieldName,fieldName))
+ else:
+ sumFields.append("SUM(%s) as sum_%s"%(fieldName,fieldName))
+
+ sumFields = ",".join(sumFields)
+
+ query = "SELECT %s, %s FROM (%s)" % (timeField, sumFields, subQuery)
+ if groupBy:
+ query = query + groupBy
+ if orderBy:
+ query = query + orderBy
+ else:
+ query = "SELECT %s FROM [%s]" % (fields, tablePart)
+ if where:
+ query = query + " " + where
+ if groupBy:
+ query = query + groupBy
+ if orderBy:
+ query = query + orderBy
+
+ return query
+
+ def get_list_from_req(self, req, name, default=[]):
+ value = req.GET.get(name, None)
+ if not value:
+ return default
+ return value.split(",")
+
+ def format_result(self, format, result, query):
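+		# Returns a (content_type, body) tuple: "json_dicts" wraps the rows as
+		# {"query": <query>, "rows": [{...}, ...]}, "json_arrays" emits each row
+		# as a list of values sorted by key, and "html_table" renders a bare
+		# HTML table.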
+ if (format == "json_dicts"):
+ result = {"query": query, "rows": result}
+ return ("application/javascript", json.dumps(result))
+
+ elif (format == "json_arrays"):
+ new_result = []
+ for row in result:
+ new_row = []
+ for key in sorted(row.keys()):
+ new_row.append(row[key])
+ new_result.append(new_row)
+ new_result = {"query": query, "rows": new_result}
+ return ("application/javascript", json.dumps(new_result))
+
+ elif (format == "html_table"):
+ new_rows = []
+ for row in result:
+ new_row = []
+ for key in sorted(row.keys()):
+ new_row.append("<TD>%s</TD>" % str(row[key]))
+ new_rows.append("<TR>%s</TR>" % "".join(new_row))
+
+ new_result = "<TABLE>%s</TABLE>" % "\n".join(new_rows)
+
+ return ("text/html", new_result)
+
+ def process_request(self, req):
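+		# The GET parameters map directly onto compose_query() arguments, e.g. a
+		# (hypothetical) request with slice=HyperCache, avg=%cpu, count=%hostname
+		# and format=json_dicts; list-valued parameters are comma-separated.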
+ print req.GET
+
+ tqx = req.GET.get("reqId", None)
+
+ slice = req.GET.get("slice", None)
+ site = req.GET.get("site", None)
+ node = req.GET.get("node", None)
+
+ format = req.GET.get("format", "json_dicts")
+
+ timeField = req.GET.get("timeField", "MinuteTime")
+ avg = self.get_list_from_req(req, "avg")
+ sum = self.get_list_from_req(req, "sum")
+ count = self.get_list_from_req(req, "count")
+ computed = self.get_list_from_req(req, "computed")
+ groupBy = self.get_list_from_req(req, "groupBy", ["MinuteTime"])
+ orderBy = self.get_list_from_req(req, "orderBy", ["MinuteTime"])
+
+ maxRows = req.GET.get("maxRows", None)
+
+ q = self.compose_query(slice, site, node, timeField, avg, sum, count, computed, groupBy, orderBy)
+
+ print q
+
+ if (format=="raw"):
+ result = self.run_query_raw(q)
+ result["reqId"] = 0 # XXX FIXME
+ return ("application/javascript", json.dumps(result))
+ else:
+ result = self.run_query(q)
+
+ if maxRows:
+ result = result[-int(maxRows):]
+
+ return self.format_result(format, result, q)
+
+
+def DoPlanetStackAnalytics(request):
+ bq = PlanetStackAnalytics()
+ result = bq.process_request(request)
+
+ return result
+
+def main():
+ bq = PlanetStackAnalytics()
+
+ q=bq.compose_query(avg=["%cpu"], count=["%hostname"], slice="HyperCache")
+ print q
+ bq.dump_table(bq.run_query(q))
+
+ q=bq.compose_query(computed=["%bytes_sent/%elapsed"])
+ print
+ print q
+ bq.dump_table(bq.run_query(q))
+ #print bq.run_query_raw(q)
+
+ q=bq.compose_query(timeField="HourTime", avg=["%cpu"], count=["%hostname"], computed=["%bytes_sent/%elapsed"], groupBy=["HourTime"], orderBy=["HourTime"])
+ print
+ print q
+ bq.dump_table(bq.run_query(q))
+
+if __name__ == "__main__":
+ main()
+
+
+
+
+
diff --git a/planetstack/hpc_wizard/query.py b/planetstack/hpc_wizard/query.py
new file mode 100644
index 0000000..3570a56
--- /dev/null
+++ b/planetstack/hpc_wizard/query.py
@@ -0,0 +1,271 @@
+import re
+import base64
+import requests
+import urllib
+import json
+import httplib2
+import threading
+import os
+import time
+import traceback
+
+from apiclient.discovery import build
+from apiclient.errors import HttpError
+from oauth2client.client import AccessTokenRefreshError
+from oauth2client.client import OAuth2WebServerFlow
+from oauth2client.client import flow_from_clientsecrets
+from oauth2client.file import Storage
+from oauth2client.tools import run_flow, run
+
+"""
+yum -y install python-httplib2
+easy_install python_gflags
+easy_install google_api_python_client
+"""
+
+
+PROJECT_NUMBER = '549187599759'
+
+FLOW = flow_from_clientsecrets('/opt/planetstack/hpc_wizard/client_secrets.json',
+ scope='https://www.googleapis.com/auth/bigquery')
+
+MINUTE_MS = 60*1000
+HOUR_MS = 60*60*1000
+
+class HpcQuery:
+ def __init__(self):
+ self.mapping = json.loads(self.fetch_mapping(table="demoevents"))
+ self.reverse_mapping = {v:k for k, v in self.mapping.items()}
+
+ def fetch_mapping(self, m=0, table="events"):
+ req = 'http://cloud-scrutiny.appspot.com/command?action=get_allocations&multiplexer=%d&table=%s'% (m,table)
+ resp = requests.get(req)
+ if (resp.status_code==200):
+ return resp.text
+ else:
+ raise Exception('Error accessing register allocations: %d'%resp.status_code)
+
+ def run_query_old(self, query):
+ req = 'http://cloud-scrutiny.appspot.com/command?action=send_query&q=%s' % urllib.quote(query)
+ resp = requests.get(req)
+ if (resp.status_code==200):
+ return resp.text
+ else:
+ raise Exception('Error running query: %d'%resp.status_code)
+
+ def run_query(self, query):
+ storage = Storage('/opt/planetstack/hpc_wizard/bigquery_credentials.dat')
+ credentials = storage.get()
+
+ if credentials is None or credentials.invalid:
+ credentials = run(FLOW, storage)
+
+ http = httplib2.Http()
+ http = credentials.authorize(http)
+
+ service = build('bigquery', 'v2', http=http)
+
+ body = {"query": query}
+ response = service.jobs().query(projectId=PROJECT_NUMBER, body=body).execute()
+
+ fieldNames = []
+ for field in response["schema"]["fields"]:
+ fieldNames.append(field["name"])
+
+ result = []
+ if "rows" in response:
+ for row in response["rows"]:
+ this_result = {}
+ for (i,column) in enumerate(row["f"]):
+ this_result[self.reverse_mapping.get(fieldNames[i],fieldNames[i])] = column["v"]
+ result.append(this_result)
+
+ return result
+
+ def remap(self, match):
+ token = match.group()[1:]
+ if token in self.mapping:
+ return self.mapping[token]
+ else:
+ raise Exception('unknown token %s' % token)
+
+ def get_usage(self, cp=None, hostname=None, site=None, slice=None, timeStart=-HOUR_MS, timeStop=-1, groupBy=["%hostname", "%cp"]):
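+		# Builds a legacy-SQL query that sums %bytes_sent/%bytes_hit and averages
+		# %bandwidth per (hostname, cp) over the window [timeStart, timeStop]
+		# (milliseconds relative to now, via a table decorator), then converts
+		# the returned values to numbers and adds a derived KBps column.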
+ where = []
+ if slice is not None:
+ where.append("%slice='" + slice + "'")
+ if cp is not None:
+ where.append("%cp='" + cp + "'")
+ if hostname is not None:
+ where.append("%hostname='" + hostname + "'")
+ if site is not None:
+ where.append("%hostname contains " + site)
+ where.append("%bytes_sent>0")
+ where = "WHERE " + " AND ".join(where)
+
+ if timeStart is not None:
+ tableName = "[vicci.demoevents@%d-%d]" % (timeStart,timeStop)
+ else:
+ tableName = "[vicci.demoevents]"
+
+ query = "SELECT %hostname,%cp,sum(%bytes_sent) as sum_bytes_sent,sum(%bytes_hit) as sum_bytes_hit, AVG(%bandwidth) as avg_bandwidth," + \
+ " MAX(TIMESTAMP_TO_MSEC(time))-MIN(TIMESTAMP_TO_MSEC(time)) as time_delta FROM " + \
+ tableName + " " + where
+
+ if groupBy:
+ query = query + " GROUP BY " + ",".join(groupBy)
+
+		p = re.compile('%[a-zA-Z_]*')
+ query = p.sub(self.remap, query)
+
+ rows = self.run_query(query)
+
+ for row in rows:
+ row["sum_bytes_sent"] = int(row.get("sum_bytes_sent",0))
+ row["sum_bytes_hit"] = int(row.get("sum_bytes_hit",0))
+ row["avg_bandwidth"] = int(float(row.get("avg_bandwidth",0)))
+ row["time_delta"] = float(row.get("time_delta",0.0))/1000.0
+
+ elapsed = (timeStop-timeStart)/1000
+ KBps = int(row.get("sum_bytes_sent",0)) / elapsed / 1024
+ row["KBps"] = KBps
+
+ return rows
+
+ def sites_from_usage(self, rows, nodes_to_sites={}):
+ sites = {}
+ for row in rows:
+ hostname = row["hostname"]
+
+ if hostname in nodes_to_sites:
+ site_name = nodes_to_sites[hostname]
+ else:
+ parts = hostname.split(".")
+ if len(parts)<=2:
+ continue
+ site_name = parts[1]
+
+ if not (site_name in sites):
+ row = row.copy()
+ row["site"] = site_name
+ row["max_avg_bandwidth"] = row["avg_bandwidth"]
+ # sites table doesn't care about hostnames or avg_bandwidth
+ del row["hostname"]
+ del row["avg_bandwidth"]
+ sites[site_name] = row
+ else:
+ site_row = sites[site_name]
+ site_row["sum_bytes_sent"] = site_row["sum_bytes_sent"] + row["sum_bytes_sent"]
+ site_row["sum_bytes_hit"] = site_row["sum_bytes_hit"] + row["sum_bytes_hit"]
+ site_row["max_avg_bandwidth"] = max(site_row["max_avg_bandwidth"], row["avg_bandwidth"])
+ site_row["time_delta"] = max(site_row["time_delta"], row["time_delta"])
+
+ return sites.values()
+
+ def get_usage_sites(self, cp=None, slice=None, timeStart=-HOUR_MS, timeStop=-1):
+ rows = self.get_usage(cp=cp, slice=slice, timeStart=timeStart, timeStop=timeStop)
+
+ return self.sites_from_usage(rows)
+
+ def dump_table(self, rows, keys=None):
+ if not keys:
+ keys = rows[0].keys()
+
+ lens = {}
+ for key in keys:
+ lens[key] = len(key)
+
+ for row in rows:
+ for key in keys:
+ thislen = len(str(row.get(key,"")))
+ lens[key] = max(lens.get(key,0), thislen)
+
+ for key in keys:
+ print "%*s" % (lens[key], key),
+ print
+
+ for row in rows:
+ for key in keys:
+ print "%*s" % (lens[key], str(row.get(key,""))),
+ print
+
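+# HpcQueryThread re-runs get_usage() every `interval` seconds in a daemon
+# thread, refreshes rows/site_rows, and bumps data_version so callers (e.g.
+# HpcWizard) can tell when fresh data has arrived; is_stalled() reports whether
+# no update has landed for 5 minutes.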
+class HpcQueryThread(HpcQuery, threading.Thread):
+ def __init__(self, interval=30, slice=None, timeStart=-HOUR_MS, cp=None, nodes_to_sites={}):
+ threading.Thread.__init__(self)
+ HpcQuery.__init__(self)
+ self.daemon = True
+ self.interval = interval
+ self.timeStart = timeStart
+ self.nodes_to_sites = nodes_to_sites
+ self.slice = slice
+ self.cp = cp
+ self.data_version = 0
+ self.please_die = False
+ self.update_time = time.time()
+ self.start()
+
+ def is_stalled(self):
+ if time.time()-self.update_time > 300:
+ return True
+ else:
+ return False
+
+ def run(self):
+ while not self.please_die:
+ try:
+ self.rows = self.get_usage(timeStart=self.timeStart, cp=self.cp, slice=self.slice)
+ self.site_rows = self.sites_from_usage(self.rows, self.nodes_to_sites)
+ self.update_time = time.time()
+ self.new_data()
+ self.data_version += 1
+ except:
+ file("/tmp/hpcquery_fail.txt","a").write(traceback.format_exc() + "\n")
+ time.sleep(self.interval)
+
+ def new_data(self):
+ pass
+
+class HpcDumpThread(HpcQueryThread):
+ def __init__(self, interval=30, slice=None, timeStart=-HOUR_MS, cp=None):
+ HpcQueryThread.__init__(self, interval, slice, timeStart, cp)
+
+ def new_data(self):
+ os.system("clear")
+
+ print "update %d, data for last %d minutes" % (self.data_version, -self.timeStart/1000/60)
+ print
+
+ self.dump_table(self.rows, ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
+ print
+ self.dump_table(self.site_rows, ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
+ print
+
+
+def main_old():
+ hq = HpcQuery()
+# print hq.mapping
+
+ print "5 minute"
+ hq.dump_table(hq.get_usage(timeStart=-MINUTE_MS*5), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
+ print
+ hq.dump_table(hq.get_usage_sites(timeStart=-MINUTE_MS*5), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
+ print
+
+ print "1 hour"
+ hq.dump_table(hq.get_usage(), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
+ print
+ hq.dump_table(hq.get_usage_sites(), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
+ print
+
+ print "24 hours"
+ hq.dump_table(hq.get_usage(timeStart=-HOUR_MS*24), ["hostname", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
+ hq.dump_table(hq.get_usage_sites(timeStart=-HOUR_MS*24), ["site", "cp", "sum_bytes_sent", "sum_bytes_hit", "KBps"])
+ print
+
+def main():
+ hd = HpcDumpThread()
+ while True:
+ time.sleep(30)
+
+if __name__ == "__main__":
+ main()