Scott Baker | 43adf1b | 2014-03-19 21:54:55 -0700 | [diff] [blame] | 1 | import datetime |
| 2 | import os |
| 3 | import operator |
| 4 | import socket |
| 5 | import pytz |
| 6 | import json |
| 7 | import random |
| 8 | import sys |
| 9 | import time |
| 10 | |
# Make the planetstack package importable: use the developer checkout when it
# exists on this machine, otherwise the installed location.
_planetstack_dir = "/home/smbaker/projects/vicci/plstackapi/planetstack"
if not os.path.exists(_planetstack_dir):
    _planetstack_dir = "/opt/planetstack"
sys.path.append(_planetstack_dir)

# Only sets the variable when the environment has not already chosen a
# settings module.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "planetstack.settings")
| 17 | from django import db |
| 18 | from django.db import connection |
| 19 | from core.models import Slice, Sliver, ServiceClass, Reservation, Tag, Network, User, Node, Image, Deployment, Site, NetworkTemplate, NetworkSlice, Service |
| 20 | from hpc.models import HpcService, ServiceProvider, ContentProvider, OriginServer, CDNPrefix, HpcService |
| 21 | |
# Length of the window, in milliseconds, over which HPC statistics are queried.
QUERY_TIME = 150000

# Thresholds used for computing 'hotness':
#   BLUE_LOAD = load which should be a "0" on the hotness scale
#   RED_LOAD  = load which should be a "1" on the hotness scale
# NOTE(review): these were originally described as MB/s, but they are compared
# against bytes_sent / duration, which suggests bytes per second -- confirm.
BLUE_LOAD = 5000000
RED_LOAD = 15000000

# Load that corresponds to 100% when computing a site's percentage load.
MAX_LOAD = RED_LOAD
| 32 | |
def log(what, showdate=True):
    """Append one line to the debug log, never raising.

    Args:
        what: value to log; it is formatted with ``%s``.
        showdate: when true, prefix the line with a UTC timestamp of the
            form ``YYYY-MM-DD HH:MM:SS``.

    Logging is best-effort: any failure to format or write the line is
    swallowed so that a broken log file cannot take the caller down.
    """
    try:
        prefix = ""
        if showdate:
            prefix = time.strftime("%Y-%m-%d %H:%M:%S ", time.gmtime())
        # One open per call, closed deterministically by the context manager
        # (the previous code opened the file twice and never closed it).
        with open("/tmp/scott-hpcwizard.log", "a") as logfile:
            logfile.write(prefix)
            logfile.write("%s\n" % what)
    except Exception:
        # Deliberately ignored (see docstring). Unlike a bare ``except``,
        # this still lets KeyboardInterrupt / SystemExit propagate.
        pass
| 40 | |
def log_exc(what):
    """Log a message followed by the traceback of the current exception.

    Intended to be called from inside an ``except`` block.

    Args:
        what: message to log (with timestamp) before the traceback.
    """
    # Imported here because the module's import block does not provide
    # ``traceback``; without this the call below raised NameError.
    import traceback

    log(what)
    log(traceback.format_exc(), showdate=False)
| 44 | |
def avg(x):
    """Return the arithmetic mean of the numbers in *x* as a float.

    Args:
        x: any iterable of numbers (list, tuple, generator, ...).

    Returns:
        The mean, or 0.0 when *x* is empty. The empty case used to raise
        ZeroDivisionError, which callers summarising an empty set of sites
        would hit.
    """
    values = list(x)
    if not values:
        return 0.0
    return float(sum(values)) / len(values)
| 47 | |
def format_float(x):
    """Format *x* as a fixed-point number, falling back to ``str(x)``.

    Args:
        x: value to format, normally a number.

    Returns:
        ``x`` rendered with ``"%10.5f"`` (width 10, five decimals) when it can
        be formatted as a float; otherwise ``str(x)`` (for example for None
        or a non-numeric string).
    """
    try:
        return "%10.5f" % x
    except (TypeError, ValueError):
        # Not formattable as a float; show it verbatim instead of failing.
        return str(x)
| 53 | |
class HpcWizard:
    """Helper for inspecting and resizing the HPC service's slivers per site.

    Attributes:
        hpcService: the service record whose slices are treated as HPC slices.
        hpcQueryThread: background statistics thread, or None until
            initialize_statistics() has been called.
    """

    def __init__(self):
        """Look up the HPC service record; does not start the statistics thread."""
        try:
            self.hpcService = HpcService.objects.get()
        except Exception:
            # OpenCloud.us currently has a Service object instantiated instead
            # of a HpcService. Fallback for now.
            self.hpcService = Service.objects.get(name="HPC Service")

        self.hpcQueryThread = None

    @staticmethod
    def _append_debug_line(path, text):
        """Append *text* to the debug file at *path* and close the handle."""
        with open(path, "a") as handle:
            handle.write(text)

    def get_hpc_slices(self):
        """Return the slices attached to the HPC service."""
        try:
            return self.hpcService.slices.all()
        except AttributeError:
            # BUG in data model -- Slice.service has related name 'service' and
            # it should be 'slices'
            return self.hpcService.service.all()

    def get_hpc_slivers(self):
        """Return a list of every sliver belonging to any HPC slice."""
        slivers = []
        for hpc_slice in self.get_hpc_slices():
            slivers.extend(hpc_slice.slivers.all())
        return slivers

    def fill_site_nodes(self, site, hpc_slivers=None):
        """Partition the nodes of *site* into ``hpcNodes`` and ``availNodes``.

        Args:
            site: object exposing ``nodes.all()``; it gains two list
                attributes, ``hpcNodes`` (nodes hosting at least one of
                *hpc_slivers*) and ``availNodes`` (all other nodes).
            hpc_slivers: slivers that mark a node as an HPC node. Defaults to
                the slivers of all HPC slices.
        """
        if hpc_slivers is None:
            hpc_slivers = self.get_hpc_slivers()

        site.availNodes = []
        site.hpcNodes = []
        for node in site.nodes.all():
            if any(sliver in hpc_slivers for sliver in node.slivers.all()):
                site.hpcNodes.append(node)
            else:
                site.availNodes.append(node)

    def merge_site_statistics(self, sites):
        """Merge query-thread statistics into *sites*, based on the sum of all
        bandwidth.

        The issue here is that the computed load reacts immediately to the
        addition or deletion of nodes, i.e. 5 nodes at 80% + 1 node at
        0% = average load 66%.

        Sites without a matching statistics row are left untouched. For the
        others ``bytes_sent`` is always set; ``bandwidth`` is set when the
        computed duration is positive; ``hotness`` and ``load`` are set when
        additionally the site has HPC nodes.
        """
        rows_by_site = {}
        for row in self.hpcQueryThread.site_rows:
            rows_by_site[row["site"]] = row

        for site in sites:
            if site.name not in rows_by_site:
                continue
            row = rows_by_site[site.name]

            site.bytes_sent = row["sum_bytes_sent"]
            time_delta = row["time_delta"]
            # Round the observed interval up to the next multiple of 30.
            computed_duration = (int(time_delta/30)+1)*30
            if computed_duration <= 0:
                # A non-positive duration would make the thresholds below
                # zero (division by zero) or negative; keep the defaults.
                continue

            site.bandwidth = site.bytes_sent/computed_duration

            node_count = len(site.hpcNodes)
            if node_count == 0:
                continue

            # figure out how many bytes_sent would be represented
            # by blue and red
            blue_load = node_count * BLUE_LOAD * computed_duration
            red_load = node_count * RED_LOAD * computed_duration
            max_load = node_count * MAX_LOAD * computed_duration

            # Clamp to [blue, red] and rescale to [0, 1].
            clamped = min(red_load, max(blue_load, float(site.bytes_sent)))
            site.hotness = float(clamped - blue_load)/(red_load-blue_load)
            site.load = int(min(100, site.bytes_sent*100/max_load))

            self._append_debug_line(
                "/tmp/scott2.txt",
                "%s %d %0.2f %0.2f %0.2f %0.2f %d\n" % (site.name, site.bytes_sent, blue_load, red_load, site.hotness, time_delta, computed_duration))

    def merge_site_statistics_new(self, sites):
        """Merge query-thread statistics into *sites*, based on max load.

        Advantage of this method is that since we're effectively reporting
        the maximally loaded node, we don't get instantaneous reactions
        to adding additional nodes. On the contrary, it will take a while
        for the load to balance from the loaded node to the new less-loaded
        node.

        NOTE(review): unlike merge_site_statistics, ``load`` is not capped at
        100 here -- confirm whether that is intended.
        """
        rows_by_site = {}
        for row in self.hpcQueryThread.site_rows:
            rows_by_site[row["site"]] = row

        for site in sites:
            if site.name not in rows_by_site:
                continue
            row = rows_by_site[site.name]

            site.max_avg_bandwidth = row["max_avg_bandwidth"]
            site.bytes_sent = row["sum_bytes_sent"]

            site.hotness = min(1.0, float(max(BLUE_LOAD, site.max_avg_bandwidth) - BLUE_LOAD) / (RED_LOAD-BLUE_LOAD))
            site.load = int(site.max_avg_bandwidth*100/MAX_LOAD)

            # we still need site.bandwidth for the summary statistics
            time_delta = row["time_delta"]
            computed_duration = (int(time_delta/30)+1)*30
            if computed_duration > 0:
                site.bandwidth = site.bytes_sent/computed_duration
            else:
                site.bandwidth = 0

            if len(site.hpcNodes) > 0:
                self._append_debug_line(
                    "/tmp/scott3.txt",
                    "%s %d %0.2f %d %0.2f\n" % (site.name, site.bytes_sent, site.hotness, site.load, site.bandwidth))

    def get_sites(self):
        """Return all sites annotated with node lists and, if available, load.

        Each site gets ``hpcNodes``, ``availNodes``, ``numNodes`` and zeroed
        ``load`` / ``hotness`` / ``bandwidth``; statistics are merged on top
        when the query thread has produced data. A stalled query thread is
        restarted first.
        """
        sites = list(Site.objects.all())

        # Loop-invariant: fetch the HPC slivers once instead of once per site.
        hpc_slivers = self.get_hpc_slivers()

        for site in sites:
            self.fill_site_nodes(site, hpc_slivers)
            site.load = 0
            site.hotness = 0
            site.bandwidth = 0
            site.numNodes = len(site.hpcNodes) + len(site.availNodes)

        if (self.hpcQueryThread is not None) and self.hpcQueryThread.is_stalled():
            self.initialize_statistics()

        # merge in the statistics data if it is available
        if self.hpcQueryThread and self.hpcQueryThread.data_version > 0:
            self.merge_site_statistics(sites)

        # django will leak extraordinary amounts of memory without this line
        db.reset_queries()

        return sites

    def get_nodes_to_sites(self):
        """Return a dict mapping each node name to the name of its site."""
        nodes_to_sites = {}
        for site in Site.objects.all():
            for node in site.nodes.all():
                nodes_to_sites[node.name] = site.name
        return nodes_to_sites

    def get_slice_sites(self, slice_name):
        """Return all sites, partitioned by the slivers of slice *slice_name*.

        Same as fill_site_nodes() applied to every site, but using the named
        slice's slivers instead of the HPC slivers.
        """
        sites = list(Site.objects.all())
        slivers = list(Slice.objects.get(name=slice_name).slivers.all())
        for site in sites:
            self.fill_site_nodes(site, slivers)
        return sites

    def get_sites_for_view(self):
        """Return ``{site name: info dict}`` for display.

        Sites named "ON.Lab" and "I2 Atlanta" are skipped. Each info dict has
        the keys lat, long, health, numNodes, numHPCSlivers, siteUrl, hot and
        load. The result is also pretty-printed to /tmp/scott.txt.
        """
        sites = {}
        for site in self.get_sites():
            if site.name in ["ON.Lab", "I2 Atlanta"]:
                continue

            sites[str(site.name)] = {
                "lat": float(site.location.latitude),
                "long": float(site.location.longitude),
                "health": 0,
                "numNodes": site.numNodes,
                "numHPCSlivers": len(site.hpcNodes),
                "siteUrl": str(site.site_url),
                "hot": getattr(site, "hotness", 0.0),
                "load": getattr(site, "load", 0),
            }

        import pprint
        # Debug dump; the context manager closes the file even if pprint fails.
        with open("/tmp/scott.txt", "w") as f:
            pprint.pprint(sites, f)

        return sites

    def get_summary_for_view(self):
        """Return totals over the sites that have at least one HPC node.

        Returns:
            dict with ``total_slivers`` (number of HPC nodes),
            ``total_bandwidth`` (sum of site bandwidth) and ``average_cpu``
            (integer mean of site load; 0 when no site has HPC nodes).
        """
        sites = [site for site in self.get_sites() if len(site.hpcNodes) > 0]

        total_slivers = sum([len(site.hpcNodes) for site in sites])
        total_bandwidth = sum([site.bandwidth for site in sites])
        # Guard the empty case: averaging nothing used to divide by zero.
        if sites:
            average_cpu = int(avg([site.load for site in sites]))
        else:
            average_cpu = 0

        return {"total_slivers": total_slivers,
                "total_bandwidth": total_bandwidth,
                "average_cpu": average_cpu}

    def initialize_statistics(self):
        """(Re)start the statistics query thread, asking any old one to stop."""
        from query import HpcQueryThread

        if self.hpcQueryThread is not None:
            log("dropping old query thread")
            self.hpcQueryThread.please_die = True
            self.hpcQueryThread = None

        log("launching new query thread")

        nodes_to_sites = self.get_nodes_to_sites()
        self.hpcQueryThread = HpcQueryThread(nodes_to_sites = nodes_to_sites, timeStart=-QUERY_TIME, slice="HyperCache")

    def get_site(self, site_name):
        """Return the site named *site_name* with hpcNodes/availNodes filled."""
        site = Site.objects.get(name=site_name)
        self.fill_site_nodes(site)
        return site

    def increase_slivers(self, site_name, count):
        """Create up to *count* HPC slivers on available nodes of a site.

        Stops early when the site runs out of available nodes. New slivers
        are attached to the first HPC slice.
        """
        site = self.get_site(site_name)
        hpc_slice = self.get_hpc_slices()[0]
        if count <= 0 or not site.availNodes:
            return

        # Loop-invariant lookups, done once and only when a sliver will
        # actually be created.
        image = Image.objects.all()[0]
        creator = User.objects.get(email="scott@onlab.us")

        while (len(site.availNodes) > 0) and (count > 0):
            node = site.availNodes.pop()
            sliver = Sliver(name=node.name,
                            slice=hpc_slice,
                            node=node,
                            image=image,
                            creator=creator,
                            deploymentNetwork=node.deployment,
                            numberCores=1,
                            ip=socket.gethostbyname(node.name))
            sliver.save()

            # Single pre-formatted argument so the call prints the same text
            # whether ``print`` is a statement or a function.
            print("created sliver %s" % (sliver,))

            site.hpcNodes.append(node)
            count = count - 1

    def decrease_slivers(self, site_name, count):
        """Delete the HPC slivers from up to *count* HPC nodes of a site."""
        site = self.get_site(site_name)
        # Evaluate once; membership is tested for every sliver below.
        hpc_slices = list(self.get_hpc_slices())
        while (len(site.hpcNodes) > 0) and (count > 0):
            node = site.hpcNodes.pop()
            for sliver in node.slivers.all():
                if sliver.slice in hpc_slices:
                    print("deleting sliver %s" % (sliver,))
                    sliver.delete()

            site.availNodes.append(node)
            count = count - 1

    def dump(self):
        """Print the HPC slices and a per-site table to stdout."""
        print("slices:")
        for hpc_slice in self.get_hpc_slices():
            print("  %s" % (hpc_slice,))

        print("sites:")
        print("%20s %10s %10s %10s %10s %10s %10s" % ("name", "avail", "hpc", "lat", "long", "sent", "hot"))
        for site in self.get_sites():
            print("%20s %10d %10d %10s %10s %10d %10.2f" % (
                site.name,
                len(site.availNodes),
                len(site.hpcNodes),
                format_float(site.location.latitude),
                format_float(site.location.longitude),
                getattr(site, "bytes_sent", 0),
                getattr(site, "hotness", 0.5)))

        # To also list the slivers, iterate self.get_hpc_slivers() here.
| 311 | |
# Process-wide HpcWizard instance, created on first use by get_hpc_wizard().
glo_hpc_wizard = None


def get_hpc_wizard():
    """Return the shared HpcWizard, creating and initialising it on first call.

    On the first call a wizard is constructed, stored in ``glo_hpc_wizard``
    and its statistics thread is started; later calls return the stored
    instance.
    """
    global glo_hpc_wizard

    if glo_hpc_wizard is not None:
        return glo_hpc_wizard

    glo_hpc_wizard = HpcWizard()
    glo_hpc_wizard.initialize_statistics()
    return glo_hpc_wizard
| 322 | |
def main():
    """Manual smoke test: dump the current state, then add and remove a sliver."""
    wizard = HpcWizard()

    # Start the statistics thread and poll once a second until it reports
    # its first batch of data.
    wizard.initialize_statistics()
    while True:
        if wizard.hpcQueryThread.data_version != 0:
            break
        time.sleep(1)

    wizard.dump()

    # quick test of the increase / decrease functions
    test_site = "Princeton"
    wizard.increase_slivers(test_site, 1)
    wizard.decrease_slivers(test_site, 1)


if __name__ == "__main__":
    main()
| 340 | |