Don Newton | 98fd881 | 2019-09-23 15:15:02 -0400 | [diff] [blame] | 1 | // Copyright 2015 The Go Authors. All rights reserved. |
| 2 | // Use of this source code is governed by a BSD-style |
| 3 | // license that can be found in the LICENSE file. |
| 4 | |
| 5 | // Package timeseries implements a time series structure for stats collection. |
| 6 | package timeseries // import "golang.org/x/net/internal/timeseries" |
| 7 | |
| 8 | import ( |
| 9 | "fmt" |
| 10 | "log" |
| 11 | "time" |
| 12 | ) |
| 13 | |
const (
	// timeSeriesNumBuckets is the number of buckets kept at every level of a
	// TimeSeries.
	timeSeriesNumBuckets = 64
	// minuteHourSeriesNumBuckets is the number of buckets kept at every level
	// of a MinuteHourSeries; with its 1s and 1m resolutions the two levels
	// span one minute and one hour respectively.
	minuteHourSeriesNumBuckets = 60
)

// timeSeriesResolutions lists the bucket width of each TimeSeries level,
// finest first. The widths must be strictly increasing.
var timeSeriesResolutions = []time.Duration{
	time.Second,
	10 * time.Second,
	time.Minute,
	10 * time.Minute,
	time.Hour,
	6 * time.Hour,
	24 * time.Hour,      // one day
	7 * 24 * time.Hour,  // one week
	28 * 24 * time.Hour, // four weeks
	112 * 24 * time.Hour, // sixteen weeks
}

// minuteHourSeriesResolutions lists the bucket width of each
// MinuteHourSeries level, finest first.
var minuteHourSeriesResolutions = []time.Duration{
	time.Second,
	time.Minute,
}
| 36 | |
// An Observable is a kind of data that can be aggregated in a time series.
type Observable interface {
	// Multiply scales the data in the receiver by ratio.
	Multiply(ratio float64)
	// Add accumulates the data from other into the receiver.
	Add(other Observable)
	// Clear resets the receiver so it can be reused.
	Clear()
	// CopyFrom overwrites the receiver with the contents of other.
	CopyFrom(other Observable)
}

// Float is a float64 that implements Observable.
type Float float64

// NewFloat returns a new zero-valued *Float as an Observable. Its signature
// matches the provider function expected by NewTimeSeries and
// NewMinuteHourSeries.
func NewFloat() Observable {
	return new(Float)
}

// String formats the value using the %g verb.
func (f *Float) String() string {
	v := f.Value()
	return fmt.Sprintf("%g", v)
}

// Value returns the value as a plain float64.
func (f *Float) Value() float64 {
	return float64(*f)
}

// Multiply scales f by ratio.
func (f *Float) Multiply(ratio float64) {
	*f = Float(float64(*f) * ratio)
}

// Add adds other to f. other must be a *Float; any other type panics.
func (f *Float) Add(other Observable) {
	*f += *other.(*Float)
}

// Clear sets f to zero.
func (f *Float) Clear() {
	*f = Float(0)
}

// CopyFrom sets f to the value of other. other must be a *Float; any other
// type panics.
func (f *Float) CopyFrom(other Observable) {
	*f = *other.(*Float)
}
| 73 | |
// A Clock reports the current time. A custom Clock can be supplied through
// NewTimeSeriesWithClock or NewMinuteHourSeriesWithClock.
type Clock interface {
	Time() time.Time
}

// defaultClock is the Clock used when none is supplied; it reads the wall
// clock with time.Now.
type defaultClock int

var defaultClockInstance defaultClock

// Time returns time.Now().
func (defaultClock) Time() time.Time {
	now := time.Now()
	return now
}
| 84 | |
| 85 | // Information kept per level. Each level consists of a circular list of |
| 86 | // observations. The start of the level may be derived from end and the |
| 87 | // len(buckets) * sizeInMillis. |
| 88 | type tsLevel struct { |
| 89 | oldest int // index to oldest bucketed Observable |
| 90 | newest int // index to newest bucketed Observable |
| 91 | end time.Time // end timestamp for this level |
| 92 | size time.Duration // duration of the bucketed Observable |
| 93 | buckets []Observable // collections of observations |
| 94 | provider func() Observable // used for creating new Observable |
| 95 | } |
| 96 | |
| 97 | func (l *tsLevel) Clear() { |
| 98 | l.oldest = 0 |
| 99 | l.newest = len(l.buckets) - 1 |
| 100 | l.end = time.Time{} |
| 101 | for i := range l.buckets { |
| 102 | if l.buckets[i] != nil { |
| 103 | l.buckets[i].Clear() |
| 104 | l.buckets[i] = nil |
| 105 | } |
| 106 | } |
| 107 | } |
| 108 | |
| 109 | func (l *tsLevel) InitLevel(size time.Duration, numBuckets int, f func() Observable) { |
| 110 | l.size = size |
| 111 | l.provider = f |
| 112 | l.buckets = make([]Observable, numBuckets) |
| 113 | } |
| 114 | |
| 115 | // Keeps a sequence of levels. Each level is responsible for storing data at |
| 116 | // a given resolution. For example, the first level stores data at a one |
| 117 | // minute resolution while the second level stores data at a one hour |
| 118 | // resolution. |
| 119 | |
| 120 | // Each level is represented by a sequence of buckets. Each bucket spans an |
| 121 | // interval equal to the resolution of the level. New observations are added |
| 122 | // to the last bucket. |
| 123 | type timeSeries struct { |
| 124 | provider func() Observable // make more Observable |
| 125 | numBuckets int // number of buckets in each level |
| 126 | levels []*tsLevel // levels of bucketed Observable |
| 127 | lastAdd time.Time // time of last Observable tracked |
| 128 | total Observable // convenient aggregation of all Observable |
| 129 | clock Clock // Clock for getting current time |
| 130 | pending Observable // observations not yet bucketed |
| 131 | pendingTime time.Time // what time are we keeping in pending |
| 132 | dirty bool // if there are pending observations |
| 133 | } |
| 134 | |
| 135 | // init initializes a level according to the supplied criteria. |
| 136 | func (ts *timeSeries) init(resolutions []time.Duration, f func() Observable, numBuckets int, clock Clock) { |
| 137 | ts.provider = f |
| 138 | ts.numBuckets = numBuckets |
| 139 | ts.clock = clock |
| 140 | ts.levels = make([]*tsLevel, len(resolutions)) |
| 141 | |
| 142 | for i := range resolutions { |
| 143 | if i > 0 && resolutions[i-1] >= resolutions[i] { |
| 144 | log.Print("timeseries: resolutions must be monotonically increasing") |
| 145 | break |
| 146 | } |
| 147 | newLevel := new(tsLevel) |
| 148 | newLevel.InitLevel(resolutions[i], ts.numBuckets, ts.provider) |
| 149 | ts.levels[i] = newLevel |
| 150 | } |
| 151 | |
| 152 | ts.Clear() |
| 153 | } |
| 154 | |
| 155 | // Clear removes all observations from the time series. |
| 156 | func (ts *timeSeries) Clear() { |
| 157 | ts.lastAdd = time.Time{} |
| 158 | ts.total = ts.resetObservation(ts.total) |
| 159 | ts.pending = ts.resetObservation(ts.pending) |
| 160 | ts.pendingTime = time.Time{} |
| 161 | ts.dirty = false |
| 162 | |
| 163 | for i := range ts.levels { |
| 164 | ts.levels[i].Clear() |
| 165 | } |
| 166 | } |
| 167 | |
| 168 | // Add records an observation at the current time. |
| 169 | func (ts *timeSeries) Add(observation Observable) { |
| 170 | ts.AddWithTime(observation, ts.clock.Time()) |
| 171 | } |
| 172 | |
| 173 | // AddWithTime records an observation at the specified time. |
| 174 | func (ts *timeSeries) AddWithTime(observation Observable, t time.Time) { |
| 175 | |
| 176 | smallBucketDuration := ts.levels[0].size |
| 177 | |
| 178 | if t.After(ts.lastAdd) { |
| 179 | ts.lastAdd = t |
| 180 | } |
| 181 | |
| 182 | if t.After(ts.pendingTime) { |
| 183 | ts.advance(t) |
| 184 | ts.mergePendingUpdates() |
| 185 | ts.pendingTime = ts.levels[0].end |
| 186 | ts.pending.CopyFrom(observation) |
| 187 | ts.dirty = true |
| 188 | } else if t.After(ts.pendingTime.Add(-1 * smallBucketDuration)) { |
| 189 | // The observation is close enough to go into the pending bucket. |
| 190 | // This compensates for clock skewing and small scheduling delays |
| 191 | // by letting the update stay in the fast path. |
| 192 | ts.pending.Add(observation) |
| 193 | ts.dirty = true |
| 194 | } else { |
| 195 | ts.mergeValue(observation, t) |
| 196 | } |
| 197 | } |
| 198 | |
| 199 | // mergeValue inserts the observation at the specified time in the past into all levels. |
| 200 | func (ts *timeSeries) mergeValue(observation Observable, t time.Time) { |
| 201 | for _, level := range ts.levels { |
| 202 | index := (ts.numBuckets - 1) - int(level.end.Sub(t)/level.size) |
| 203 | if 0 <= index && index < ts.numBuckets { |
| 204 | bucketNumber := (level.oldest + index) % ts.numBuckets |
| 205 | if level.buckets[bucketNumber] == nil { |
| 206 | level.buckets[bucketNumber] = level.provider() |
| 207 | } |
| 208 | level.buckets[bucketNumber].Add(observation) |
| 209 | } |
| 210 | } |
| 211 | ts.total.Add(observation) |
| 212 | } |
| 213 | |
| 214 | // mergePendingUpdates applies the pending updates into all levels. |
| 215 | func (ts *timeSeries) mergePendingUpdates() { |
| 216 | if ts.dirty { |
| 217 | ts.mergeValue(ts.pending, ts.pendingTime) |
| 218 | ts.pending = ts.resetObservation(ts.pending) |
| 219 | ts.dirty = false |
| 220 | } |
| 221 | } |
| 222 | |
| 223 | // advance cycles the buckets at each level until the latest bucket in |
| 224 | // each level can hold the time specified. |
| 225 | func (ts *timeSeries) advance(t time.Time) { |
| 226 | if !t.After(ts.levels[0].end) { |
| 227 | return |
| 228 | } |
| 229 | for i := 0; i < len(ts.levels); i++ { |
| 230 | level := ts.levels[i] |
| 231 | if !level.end.Before(t) { |
| 232 | break |
| 233 | } |
| 234 | |
| 235 | // If the time is sufficiently far, just clear the level and advance |
| 236 | // directly. |
| 237 | if !t.Before(level.end.Add(level.size * time.Duration(ts.numBuckets))) { |
| 238 | for _, b := range level.buckets { |
| 239 | ts.resetObservation(b) |
| 240 | } |
| 241 | level.end = time.Unix(0, (t.UnixNano()/level.size.Nanoseconds())*level.size.Nanoseconds()) |
| 242 | } |
| 243 | |
| 244 | for t.After(level.end) { |
| 245 | level.end = level.end.Add(level.size) |
| 246 | level.newest = level.oldest |
| 247 | level.oldest = (level.oldest + 1) % ts.numBuckets |
| 248 | ts.resetObservation(level.buckets[level.newest]) |
| 249 | } |
| 250 | |
| 251 | t = level.end |
| 252 | } |
| 253 | } |
| 254 | |
| 255 | // Latest returns the sum of the num latest buckets from the level. |
| 256 | func (ts *timeSeries) Latest(level, num int) Observable { |
| 257 | now := ts.clock.Time() |
| 258 | if ts.levels[0].end.Before(now) { |
| 259 | ts.advance(now) |
| 260 | } |
| 261 | |
| 262 | ts.mergePendingUpdates() |
| 263 | |
| 264 | result := ts.provider() |
| 265 | l := ts.levels[level] |
| 266 | index := l.newest |
| 267 | |
| 268 | for i := 0; i < num; i++ { |
| 269 | if l.buckets[index] != nil { |
| 270 | result.Add(l.buckets[index]) |
| 271 | } |
| 272 | if index == 0 { |
| 273 | index = ts.numBuckets |
| 274 | } |
| 275 | index-- |
| 276 | } |
| 277 | |
| 278 | return result |
| 279 | } |
| 280 | |
| 281 | // LatestBuckets returns a copy of the num latest buckets from level. |
| 282 | func (ts *timeSeries) LatestBuckets(level, num int) []Observable { |
| 283 | if level < 0 || level > len(ts.levels) { |
| 284 | log.Print("timeseries: bad level argument: ", level) |
| 285 | return nil |
| 286 | } |
| 287 | if num < 0 || num >= ts.numBuckets { |
| 288 | log.Print("timeseries: bad num argument: ", num) |
| 289 | return nil |
| 290 | } |
| 291 | |
| 292 | results := make([]Observable, num) |
| 293 | now := ts.clock.Time() |
| 294 | if ts.levels[0].end.Before(now) { |
| 295 | ts.advance(now) |
| 296 | } |
| 297 | |
| 298 | ts.mergePendingUpdates() |
| 299 | |
| 300 | l := ts.levels[level] |
| 301 | index := l.newest |
| 302 | |
| 303 | for i := 0; i < num; i++ { |
| 304 | result := ts.provider() |
| 305 | results[i] = result |
| 306 | if l.buckets[index] != nil { |
| 307 | result.CopyFrom(l.buckets[index]) |
| 308 | } |
| 309 | |
| 310 | if index == 0 { |
| 311 | index = ts.numBuckets |
| 312 | } |
| 313 | index -= 1 |
| 314 | } |
| 315 | return results |
| 316 | } |
| 317 | |
| 318 | // ScaleBy updates observations by scaling by factor. |
| 319 | func (ts *timeSeries) ScaleBy(factor float64) { |
| 320 | for _, l := range ts.levels { |
| 321 | for i := 0; i < ts.numBuckets; i++ { |
| 322 | l.buckets[i].Multiply(factor) |
| 323 | } |
| 324 | } |
| 325 | |
| 326 | ts.total.Multiply(factor) |
| 327 | ts.pending.Multiply(factor) |
| 328 | } |
| 329 | |
| 330 | // Range returns the sum of observations added over the specified time range. |
| 331 | // If start or finish times don't fall on bucket boundaries of the same |
| 332 | // level, then return values are approximate answers. |
| 333 | func (ts *timeSeries) Range(start, finish time.Time) Observable { |
| 334 | return ts.ComputeRange(start, finish, 1)[0] |
| 335 | } |
| 336 | |
| 337 | // Recent returns the sum of observations from the last delta. |
| 338 | func (ts *timeSeries) Recent(delta time.Duration) Observable { |
| 339 | now := ts.clock.Time() |
| 340 | return ts.Range(now.Add(-delta), now) |
| 341 | } |
| 342 | |
| 343 | // Total returns the total of all observations. |
| 344 | func (ts *timeSeries) Total() Observable { |
| 345 | ts.mergePendingUpdates() |
| 346 | return ts.total |
| 347 | } |
| 348 | |
| 349 | // ComputeRange computes a specified number of values into a slice using |
| 350 | // the observations recorded over the specified time period. The return |
| 351 | // values are approximate if the start or finish times don't fall on the |
| 352 | // bucket boundaries at the same level or if the number of buckets spanning |
| 353 | // the range is not an integral multiple of num. |
| 354 | func (ts *timeSeries) ComputeRange(start, finish time.Time, num int) []Observable { |
| 355 | if start.After(finish) { |
| 356 | log.Printf("timeseries: start > finish, %v>%v", start, finish) |
| 357 | return nil |
| 358 | } |
| 359 | |
| 360 | if num < 0 { |
| 361 | log.Printf("timeseries: num < 0, %v", num) |
| 362 | return nil |
| 363 | } |
| 364 | |
| 365 | results := make([]Observable, num) |
| 366 | |
| 367 | for _, l := range ts.levels { |
| 368 | if !start.Before(l.end.Add(-l.size * time.Duration(ts.numBuckets))) { |
| 369 | ts.extract(l, start, finish, num, results) |
| 370 | return results |
| 371 | } |
| 372 | } |
| 373 | |
| 374 | // Failed to find a level that covers the desired range. So just |
| 375 | // extract from the last level, even if it doesn't cover the entire |
| 376 | // desired range. |
| 377 | ts.extract(ts.levels[len(ts.levels)-1], start, finish, num, results) |
| 378 | |
| 379 | return results |
| 380 | } |
| 381 | |
| 382 | // RecentList returns the specified number of values in slice over the most |
| 383 | // recent time period of the specified range. |
| 384 | func (ts *timeSeries) RecentList(delta time.Duration, num int) []Observable { |
| 385 | if delta < 0 { |
| 386 | return nil |
| 387 | } |
| 388 | now := ts.clock.Time() |
| 389 | return ts.ComputeRange(now.Add(-delta), now, num) |
| 390 | } |
| 391 | |
| 392 | // extract returns a slice of specified number of observations from a given |
| 393 | // level over a given range. |
| 394 | func (ts *timeSeries) extract(l *tsLevel, start, finish time.Time, num int, results []Observable) { |
| 395 | ts.mergePendingUpdates() |
| 396 | |
| 397 | srcInterval := l.size |
| 398 | dstInterval := finish.Sub(start) / time.Duration(num) |
| 399 | dstStart := start |
| 400 | srcStart := l.end.Add(-srcInterval * time.Duration(ts.numBuckets)) |
| 401 | |
| 402 | srcIndex := 0 |
| 403 | |
| 404 | // Where should scanning start? |
| 405 | if dstStart.After(srcStart) { |
Don Newton | e0d34a8 | 2019-11-14 10:58:06 -0500 | [diff] [blame] | 406 | advance := int(dstStart.Sub(srcStart) / srcInterval) |
| 407 | srcIndex += advance |
| 408 | srcStart = srcStart.Add(time.Duration(advance) * srcInterval) |
Don Newton | 98fd881 | 2019-09-23 15:15:02 -0400 | [diff] [blame] | 409 | } |
| 410 | |
| 411 | // The i'th value is computed as show below. |
| 412 | // interval = (finish/start)/num |
| 413 | // i'th value = sum of observation in range |
| 414 | // [ start + i * interval, |
| 415 | // start + (i + 1) * interval ) |
| 416 | for i := 0; i < num; i++ { |
| 417 | results[i] = ts.resetObservation(results[i]) |
| 418 | dstEnd := dstStart.Add(dstInterval) |
| 419 | for srcIndex < ts.numBuckets && srcStart.Before(dstEnd) { |
| 420 | srcEnd := srcStart.Add(srcInterval) |
| 421 | if srcEnd.After(ts.lastAdd) { |
| 422 | srcEnd = ts.lastAdd |
| 423 | } |
| 424 | |
| 425 | if !srcEnd.Before(dstStart) { |
| 426 | srcValue := l.buckets[(srcIndex+l.oldest)%ts.numBuckets] |
| 427 | if !srcStart.Before(dstStart) && !srcEnd.After(dstEnd) { |
| 428 | // dst completely contains src. |
| 429 | if srcValue != nil { |
| 430 | results[i].Add(srcValue) |
| 431 | } |
| 432 | } else { |
| 433 | // dst partially overlaps src. |
| 434 | overlapStart := maxTime(srcStart, dstStart) |
| 435 | overlapEnd := minTime(srcEnd, dstEnd) |
| 436 | base := srcEnd.Sub(srcStart) |
| 437 | fraction := overlapEnd.Sub(overlapStart).Seconds() / base.Seconds() |
| 438 | |
| 439 | used := ts.provider() |
| 440 | if srcValue != nil { |
| 441 | used.CopyFrom(srcValue) |
| 442 | } |
| 443 | used.Multiply(fraction) |
| 444 | results[i].Add(used) |
| 445 | } |
| 446 | |
| 447 | if srcEnd.After(dstEnd) { |
| 448 | break |
| 449 | } |
| 450 | } |
| 451 | srcIndex++ |
| 452 | srcStart = srcStart.Add(srcInterval) |
| 453 | } |
| 454 | dstStart = dstStart.Add(dstInterval) |
| 455 | } |
| 456 | } |
| 457 | |
| 458 | // resetObservation clears the content so the struct may be reused. |
| 459 | func (ts *timeSeries) resetObservation(observation Observable) Observable { |
| 460 | if observation == nil { |
| 461 | observation = ts.provider() |
| 462 | } else { |
| 463 | observation.Clear() |
| 464 | } |
| 465 | return observation |
| 466 | } |
| 467 | |
| 468 | // TimeSeries tracks data at granularities from 1 second to 16 weeks. |
| 469 | type TimeSeries struct { |
| 470 | timeSeries |
| 471 | } |
| 472 | |
| 473 | // NewTimeSeries creates a new TimeSeries using the function provided for creating new Observable. |
| 474 | func NewTimeSeries(f func() Observable) *TimeSeries { |
| 475 | return NewTimeSeriesWithClock(f, defaultClockInstance) |
| 476 | } |
| 477 | |
| 478 | // NewTimeSeriesWithClock creates a new TimeSeries using the function provided for creating new Observable and the clock for |
| 479 | // assigning timestamps. |
| 480 | func NewTimeSeriesWithClock(f func() Observable, clock Clock) *TimeSeries { |
| 481 | ts := new(TimeSeries) |
| 482 | ts.timeSeries.init(timeSeriesResolutions, f, timeSeriesNumBuckets, clock) |
| 483 | return ts |
| 484 | } |
| 485 | |
| 486 | // MinuteHourSeries tracks data at granularities of 1 minute and 1 hour. |
| 487 | type MinuteHourSeries struct { |
| 488 | timeSeries |
| 489 | } |
| 490 | |
| 491 | // NewMinuteHourSeries creates a new MinuteHourSeries using the function provided for creating new Observable. |
| 492 | func NewMinuteHourSeries(f func() Observable) *MinuteHourSeries { |
| 493 | return NewMinuteHourSeriesWithClock(f, defaultClockInstance) |
| 494 | } |
| 495 | |
| 496 | // NewMinuteHourSeriesWithClock creates a new MinuteHourSeries using the function provided for creating new Observable and the clock for |
| 497 | // assigning timestamps. |
| 498 | func NewMinuteHourSeriesWithClock(f func() Observable, clock Clock) *MinuteHourSeries { |
| 499 | ts := new(MinuteHourSeries) |
| 500 | ts.timeSeries.init(minuteHourSeriesResolutions, f, |
| 501 | minuteHourSeriesNumBuckets, clock) |
| 502 | return ts |
| 503 | } |
| 504 | |
| 505 | func (ts *MinuteHourSeries) Minute() Observable { |
| 506 | return ts.timeSeries.Latest(0, 60) |
| 507 | } |
| 508 | |
| 509 | func (ts *MinuteHourSeries) Hour() Observable { |
| 510 | return ts.timeSeries.Latest(1, 60) |
| 511 | } |
| 512 | |
// minTime returns the earlier of a and b. If they are the same instant, b is
// returned.
func minTime(a, b time.Time) time.Time {
	if !a.Before(b) {
		return b
	}
	return a
}

// maxTime returns the later of a and b. If they are the same instant, b is
// returned.
func maxTime(a, b time.Time) time.Time {
	if !a.After(b) {
		return b
	}
	return a
}