blob: fecee5ef68ba78000a53a6aee4c9a5c10137fe4e [file] [log] [blame]
Prince Pereirac1c21d62021-04-22 08:38:15 +00001package metrics
2
3import (
4 "math"
5 "math/rand"
6 "sort"
7 "sync"
8 "time"
9)
10
// rescaleThreshold is how far past the current landmark time an update must
// fall before an ExpDecaySample rescales its stored priorities against a new
// landmark.
const rescaleThreshold time.Duration = time.Hour
12
// Sample maintains a statistically-significant selection of values from a
// stream and exposes summary statistics over the values it currently holds.
type Sample interface {
	// Recording and resetting.
	Update(int64)
	Clear()

	// Bookkeeping.
	Count() int64
	Size() int
	Values() []int64
	Snapshot() Sample

	// Summary statistics.
	Min() int64
	Max() int64
	Sum() int64
	Mean() float64
	Variance() float64
	StdDev() float64
	Percentile(float64) float64
	Percentiles([]float64) []float64
}
31
32// ExpDecaySample is an exponentially-decaying sample using a forward-decaying
33// priority reservoir. See Cormode et al's "Forward Decay: A Practical Time
34// Decay Model for Streaming Systems".
35//
36// <http://dimacs.rutgers.edu/~graham/pubs/papers/fwddecay.pdf>
37type ExpDecaySample struct {
38 alpha float64
39 count int64
40 mutex sync.Mutex
41 reservoirSize int
42 t0, t1 time.Time
43 values *expDecaySampleHeap
44}
45
46// NewExpDecaySample constructs a new exponentially-decaying sample with the
47// given reservoir size and alpha.
48func NewExpDecaySample(reservoirSize int, alpha float64) Sample {
49 if UseNilMetrics {
50 return NilSample{}
51 }
52 s := &ExpDecaySample{
53 alpha: alpha,
54 reservoirSize: reservoirSize,
55 t0: time.Now(),
56 values: newExpDecaySampleHeap(reservoirSize),
57 }
58 s.t1 = s.t0.Add(rescaleThreshold)
59 return s
60}
61
62// Clear clears all samples.
63func (s *ExpDecaySample) Clear() {
64 s.mutex.Lock()
65 defer s.mutex.Unlock()
66 s.count = 0
67 s.t0 = time.Now()
68 s.t1 = s.t0.Add(rescaleThreshold)
69 s.values.Clear()
70}
71
72// Count returns the number of samples recorded, which may exceed the
73// reservoir size.
74func (s *ExpDecaySample) Count() int64 {
75 s.mutex.Lock()
76 defer s.mutex.Unlock()
77 return s.count
78}
79
80// Max returns the maximum value in the sample, which may not be the maximum
81// value ever to be part of the sample.
82func (s *ExpDecaySample) Max() int64 {
83 return SampleMax(s.Values())
84}
85
86// Mean returns the mean of the values in the sample.
87func (s *ExpDecaySample) Mean() float64 {
88 return SampleMean(s.Values())
89}
90
91// Min returns the minimum value in the sample, which may not be the minimum
92// value ever to be part of the sample.
93func (s *ExpDecaySample) Min() int64 {
94 return SampleMin(s.Values())
95}
96
97// Percentile returns an arbitrary percentile of values in the sample.
98func (s *ExpDecaySample) Percentile(p float64) float64 {
99 return SamplePercentile(s.Values(), p)
100}
101
102// Percentiles returns a slice of arbitrary percentiles of values in the
103// sample.
104func (s *ExpDecaySample) Percentiles(ps []float64) []float64 {
105 return SamplePercentiles(s.Values(), ps)
106}
107
108// Size returns the size of the sample, which is at most the reservoir size.
109func (s *ExpDecaySample) Size() int {
110 s.mutex.Lock()
111 defer s.mutex.Unlock()
112 return s.values.Size()
113}
114
115// Snapshot returns a read-only copy of the sample.
116func (s *ExpDecaySample) Snapshot() Sample {
117 s.mutex.Lock()
118 defer s.mutex.Unlock()
119 vals := s.values.Values()
120 values := make([]int64, len(vals))
121 for i, v := range vals {
122 values[i] = v.v
123 }
124 return &SampleSnapshot{
125 count: s.count,
126 values: values,
127 }
128}
129
130// StdDev returns the standard deviation of the values in the sample.
131func (s *ExpDecaySample) StdDev() float64 {
132 return SampleStdDev(s.Values())
133}
134
135// Sum returns the sum of the values in the sample.
136func (s *ExpDecaySample) Sum() int64 {
137 return SampleSum(s.Values())
138}
139
140// Update samples a new value.
141func (s *ExpDecaySample) Update(v int64) {
142 s.update(time.Now(), v)
143}
144
145// Values returns a copy of the values in the sample.
146func (s *ExpDecaySample) Values() []int64 {
147 s.mutex.Lock()
148 defer s.mutex.Unlock()
149 vals := s.values.Values()
150 values := make([]int64, len(vals))
151 for i, v := range vals {
152 values[i] = v.v
153 }
154 return values
155}
156
157// Variance returns the variance of the values in the sample.
158func (s *ExpDecaySample) Variance() float64 {
159 return SampleVariance(s.Values())
160}
161
162// update samples a new value at a particular timestamp. This is a method all
163// its own to facilitate testing.
164func (s *ExpDecaySample) update(t time.Time, v int64) {
165 s.mutex.Lock()
166 defer s.mutex.Unlock()
167 s.count++
168 if s.values.Size() == s.reservoirSize {
169 s.values.Pop()
170 }
171 s.values.Push(expDecaySample{
172 k: math.Exp(t.Sub(s.t0).Seconds()*s.alpha) / rand.Float64(),
173 v: v,
174 })
175 if t.After(s.t1) {
176 values := s.values.Values()
177 t0 := s.t0
178 s.values.Clear()
179 s.t0 = t
180 s.t1 = s.t0.Add(rescaleThreshold)
181 for _, v := range values {
182 v.k = v.k * math.Exp(-s.alpha*s.t0.Sub(t0).Seconds())
183 s.values.Push(v)
184 }
185 }
186}
187
188// NilSample is a no-op Sample.
189type NilSample struct{}
190
191// Clear is a no-op.
192func (NilSample) Clear() {}
193
194// Count is a no-op.
195func (NilSample) Count() int64 { return 0 }
196
197// Max is a no-op.
198func (NilSample) Max() int64 { return 0 }
199
200// Mean is a no-op.
201func (NilSample) Mean() float64 { return 0.0 }
202
203// Min is a no-op.
204func (NilSample) Min() int64 { return 0 }
205
206// Percentile is a no-op.
207func (NilSample) Percentile(p float64) float64 { return 0.0 }
208
209// Percentiles is a no-op.
210func (NilSample) Percentiles(ps []float64) []float64 {
211 return make([]float64, len(ps))
212}
213
214// Size is a no-op.
215func (NilSample) Size() int { return 0 }
216
217// Sample is a no-op.
218func (NilSample) Snapshot() Sample { return NilSample{} }
219
220// StdDev is a no-op.
221func (NilSample) StdDev() float64 { return 0.0 }
222
223// Sum is a no-op.
224func (NilSample) Sum() int64 { return 0 }
225
226// Update is a no-op.
227func (NilSample) Update(v int64) {}
228
229// Values is a no-op.
230func (NilSample) Values() []int64 { return []int64{} }
231
232// Variance is a no-op.
233func (NilSample) Variance() float64 { return 0.0 }
234
// SampleMax returns the largest element of values, or 0 when values is
// empty.
func SampleMax(values []int64) int64 {
	if len(values) == 0 {
		return 0
	}
	// Start from the smallest representable value so any element replaces it.
	largest := int64(math.MinInt64)
	for i := range values {
		if values[i] > largest {
			largest = values[i]
		}
	}
	return largest
}
248
// SampleMean returns the arithmetic mean of values, or 0 when values is
// empty.
//
// The total is accumulated in float64 rather than int64, so a slice whose
// integer sum would overflow int64 still produces a meaningful mean instead
// of a wrapped-around result. For totals below 2^53 every partial sum is
// exactly representable, so the result equals the integer sum divided by the
// length.
func SampleMean(values []int64) float64 {
	if len(values) == 0 {
		return 0.0
	}
	var total float64
	for _, v := range values {
		total += float64(v)
	}
	return total / float64(len(values))
}
256
// SampleMin returns the smallest element of values, or 0 when values is
// empty.
func SampleMin(values []int64) int64 {
	if len(values) == 0 {
		return 0
	}
	// Start from the largest representable value so any element replaces it.
	smallest := int64(math.MaxInt64)
	for i := range values {
		if values[i] < smallest {
			smallest = values[i]
		}
	}
	return smallest
}
270
271// SamplePercentiles returns an arbitrary percentile of the slice of int64.
272func SamplePercentile(values int64Slice, p float64) float64 {
273 return SamplePercentiles(values, []float64{p})[0]
274}
275
276// SamplePercentiles returns a slice of arbitrary percentiles of the slice of
277// int64.
278func SamplePercentiles(values int64Slice, ps []float64) []float64 {
279 scores := make([]float64, len(ps))
280 size := len(values)
281 if size > 0 {
282 sort.Sort(values)
283 for i, p := range ps {
284 pos := p * float64(size+1)
285 if pos < 1.0 {
286 scores[i] = float64(values[0])
287 } else if pos >= float64(size) {
288 scores[i] = float64(values[size-1])
289 } else {
290 lower := float64(values[int(pos)-1])
291 upper := float64(values[int(pos)])
292 scores[i] = lower + (pos-math.Floor(pos))*(upper-lower)
293 }
294 }
295 }
296 return scores
297}
298
// SampleSnapshot is a read-only copy of another Sample, frozen at the moment
// the snapshot was taken.
type SampleSnapshot struct {
	// count is the number of values the source sample had recorded.
	count int64
	// values holds the values the source sample retained.
	values []int64
}
304
305func NewSampleSnapshot(count int64, values []int64) *SampleSnapshot {
306 return &SampleSnapshot{
307 count: count,
308 values: values,
309 }
310}
311
312// Clear panics.
313func (*SampleSnapshot) Clear() {
314 panic("Clear called on a SampleSnapshot")
315}
316
317// Count returns the count of inputs at the time the snapshot was taken.
318func (s *SampleSnapshot) Count() int64 { return s.count }
319
320// Max returns the maximal value at the time the snapshot was taken.
321func (s *SampleSnapshot) Max() int64 { return SampleMax(s.values) }
322
323// Mean returns the mean value at the time the snapshot was taken.
324func (s *SampleSnapshot) Mean() float64 { return SampleMean(s.values) }
325
326// Min returns the minimal value at the time the snapshot was taken.
327func (s *SampleSnapshot) Min() int64 { return SampleMin(s.values) }
328
329// Percentile returns an arbitrary percentile of values at the time the
330// snapshot was taken.
331func (s *SampleSnapshot) Percentile(p float64) float64 {
332 return SamplePercentile(s.values, p)
333}
334
335// Percentiles returns a slice of arbitrary percentiles of values at the time
336// the snapshot was taken.
337func (s *SampleSnapshot) Percentiles(ps []float64) []float64 {
338 return SamplePercentiles(s.values, ps)
339}
340
341// Size returns the size of the sample at the time the snapshot was taken.
342func (s *SampleSnapshot) Size() int { return len(s.values) }
343
344// Snapshot returns the snapshot.
345func (s *SampleSnapshot) Snapshot() Sample { return s }
346
347// StdDev returns the standard deviation of values at the time the snapshot was
348// taken.
349func (s *SampleSnapshot) StdDev() float64 { return SampleStdDev(s.values) }
350
351// Sum returns the sum of values at the time the snapshot was taken.
352func (s *SampleSnapshot) Sum() int64 { return SampleSum(s.values) }
353
354// Update panics.
355func (*SampleSnapshot) Update(int64) {
356 panic("Update called on a SampleSnapshot")
357}
358
359// Values returns a copy of the values in the sample.
360func (s *SampleSnapshot) Values() []int64 {
361 values := make([]int64, len(s.values))
362 copy(values, s.values)
363 return values
364}
365
366// Variance returns the variance of values at the time the snapshot was taken.
367func (s *SampleSnapshot) Variance() float64 { return SampleVariance(s.values) }
368
369// SampleStdDev returns the standard deviation of the slice of int64.
370func SampleStdDev(values []int64) float64 {
371 return math.Sqrt(SampleVariance(values))
372}
373
// SampleSum returns the sum of values, or 0 for an empty slice. The total is
// accumulated in an int64.
func SampleSum(values []int64) int64 {
	total := int64(0)
	for i := range values {
		total += values[i]
	}
	return total
}
382
383// SampleVariance returns the variance of the slice of int64.
384func SampleVariance(values []int64) float64 {
385 if 0 == len(values) {
386 return 0.0
387 }
388 m := SampleMean(values)
389 var sum float64
390 for _, v := range values {
391 d := float64(v) - m
392 sum += d * d
393 }
394 return sum / float64(len(values))
395}
396
// UniformSample is a uniform sample using Vitter's Algorithm R.
//
// <http://www.cs.umd.edu/~samir/498/vitter.pdf>
type UniformSample struct {
	// count is the total number of values recorded since the last Clear.
	count int64
	// mutex guards every other field.
	mutex sync.Mutex
	// reservoirSize is the maximum number of values kept in values.
	reservoirSize int
	// values is the reservoir of retained values.
	values []int64
}
406
407// NewUniformSample constructs a new uniform sample with the given reservoir
408// size.
409func NewUniformSample(reservoirSize int) Sample {
410 if UseNilMetrics {
411 return NilSample{}
412 }
413 return &UniformSample{
414 reservoirSize: reservoirSize,
415 values: make([]int64, 0, reservoirSize),
416 }
417}
418
419// Clear clears all samples.
420func (s *UniformSample) Clear() {
421 s.mutex.Lock()
422 defer s.mutex.Unlock()
423 s.count = 0
424 s.values = make([]int64, 0, s.reservoirSize)
425}
426
427// Count returns the number of samples recorded, which may exceed the
428// reservoir size.
429func (s *UniformSample) Count() int64 {
430 s.mutex.Lock()
431 defer s.mutex.Unlock()
432 return s.count
433}
434
435// Max returns the maximum value in the sample, which may not be the maximum
436// value ever to be part of the sample.
437func (s *UniformSample) Max() int64 {
438 s.mutex.Lock()
439 defer s.mutex.Unlock()
440 return SampleMax(s.values)
441}
442
443// Mean returns the mean of the values in the sample.
444func (s *UniformSample) Mean() float64 {
445 s.mutex.Lock()
446 defer s.mutex.Unlock()
447 return SampleMean(s.values)
448}
449
450// Min returns the minimum value in the sample, which may not be the minimum
451// value ever to be part of the sample.
452func (s *UniformSample) Min() int64 {
453 s.mutex.Lock()
454 defer s.mutex.Unlock()
455 return SampleMin(s.values)
456}
457
458// Percentile returns an arbitrary percentile of values in the sample.
459func (s *UniformSample) Percentile(p float64) float64 {
460 s.mutex.Lock()
461 defer s.mutex.Unlock()
462 return SamplePercentile(s.values, p)
463}
464
465// Percentiles returns a slice of arbitrary percentiles of values in the
466// sample.
467func (s *UniformSample) Percentiles(ps []float64) []float64 {
468 s.mutex.Lock()
469 defer s.mutex.Unlock()
470 return SamplePercentiles(s.values, ps)
471}
472
473// Size returns the size of the sample, which is at most the reservoir size.
474func (s *UniformSample) Size() int {
475 s.mutex.Lock()
476 defer s.mutex.Unlock()
477 return len(s.values)
478}
479
480// Snapshot returns a read-only copy of the sample.
481func (s *UniformSample) Snapshot() Sample {
482 s.mutex.Lock()
483 defer s.mutex.Unlock()
484 values := make([]int64, len(s.values))
485 copy(values, s.values)
486 return &SampleSnapshot{
487 count: s.count,
488 values: values,
489 }
490}
491
492// StdDev returns the standard deviation of the values in the sample.
493func (s *UniformSample) StdDev() float64 {
494 s.mutex.Lock()
495 defer s.mutex.Unlock()
496 return SampleStdDev(s.values)
497}
498
499// Sum returns the sum of the values in the sample.
500func (s *UniformSample) Sum() int64 {
501 s.mutex.Lock()
502 defer s.mutex.Unlock()
503 return SampleSum(s.values)
504}
505
506// Update samples a new value.
507func (s *UniformSample) Update(v int64) {
508 s.mutex.Lock()
509 defer s.mutex.Unlock()
510 s.count++
511 if len(s.values) < s.reservoirSize {
512 s.values = append(s.values, v)
513 } else {
514 r := rand.Int63n(s.count)
515 if r < int64(len(s.values)) {
516 s.values[int(r)] = v
517 }
518 }
519}
520
521// Values returns a copy of the values in the sample.
522func (s *UniformSample) Values() []int64 {
523 s.mutex.Lock()
524 defer s.mutex.Unlock()
525 values := make([]int64, len(s.values))
526 copy(values, s.values)
527 return values
528}
529
530// Variance returns the variance of the values in the sample.
531func (s *UniformSample) Variance() float64 {
532 s.mutex.Lock()
533 defer s.mutex.Unlock()
534 return SampleVariance(s.values)
535}
536
// expDecaySample is one entry of an expDecaySampleHeap: a sampled value
// together with the priority key the heap orders it by.
type expDecaySample struct {
	// k is the priority key; the heap keeps the smallest k at the root.
	k float64
	// v is the sampled value.
	v int64
}
542
543func newExpDecaySampleHeap(reservoirSize int) *expDecaySampleHeap {
544 return &expDecaySampleHeap{make([]expDecaySample, 0, reservoirSize)}
545}
546
547// expDecaySampleHeap is a min-heap of expDecaySamples.
548// The internal implementation is copied from the standard library's container/heap
549type expDecaySampleHeap struct {
550 s []expDecaySample
551}
552
553func (h *expDecaySampleHeap) Clear() {
554 h.s = h.s[:0]
555}
556
557func (h *expDecaySampleHeap) Push(s expDecaySample) {
558 n := len(h.s)
559 h.s = h.s[0 : n+1]
560 h.s[n] = s
561 h.up(n)
562}
563
564func (h *expDecaySampleHeap) Pop() expDecaySample {
565 n := len(h.s) - 1
566 h.s[0], h.s[n] = h.s[n], h.s[0]
567 h.down(0, n)
568
569 n = len(h.s)
570 s := h.s[n-1]
571 h.s = h.s[0 : n-1]
572 return s
573}
574
575func (h *expDecaySampleHeap) Size() int {
576 return len(h.s)
577}
578
579func (h *expDecaySampleHeap) Values() []expDecaySample {
580 return h.s
581}
582
583func (h *expDecaySampleHeap) up(j int) {
584 for {
585 i := (j - 1) / 2 // parent
586 if i == j || !(h.s[j].k < h.s[i].k) {
587 break
588 }
589 h.s[i], h.s[j] = h.s[j], h.s[i]
590 j = i
591 }
592}
593
594func (h *expDecaySampleHeap) down(i, n int) {
595 for {
596 j1 := 2*i + 1
597 if j1 >= n || j1 < 0 { // j1 < 0 after int overflow
598 break
599 }
600 j := j1 // left child
601 if j2 := j1 + 1; j2 < n && !(h.s[j1].k < h.s[j2].k) {
602 j = j2 // = 2*i + 2 // right child
603 }
604 if !(h.s[j].k < h.s[i].k) {
605 break
606 }
607 h.s[i], h.s[j] = h.s[j], h.s[i]
608 i = j
609 }
610}
611
// int64Slice is a slice of int64 that provides the Len, Less and Swap
// methods needed to sort it in ascending order.
type int64Slice []int64

// Len returns the number of elements.
func (p int64Slice) Len() int {
	return len(p)
}

// Less reports whether element i is smaller than element j.
func (p int64Slice) Less(i, j int) bool {
	return p[i] < p[j]
}

// Swap exchanges elements i and j.
func (p int64Slice) Swap(i, j int) {
	p[i], p[j] = p[j], p[i]
}