| package metrics |
| |
| import ( |
| "math" |
| "math/rand" |
| "sort" |
| "sync" |
| "time" |
| ) |
| |
// rescaleThreshold is the span added to an ExpDecaySample's landmark time t0
// to obtain t1; an update stamped after t1 triggers a rescale of the stored
// priorities.
const rescaleThreshold time.Duration = time.Hour
| |
// Sample maintains a statistically-significant selection of values from
// a stream.
type Sample interface {
	// Recording and resetting.
	Update(int64)
	Clear()

	// Inspection of the retained values.
	Count() int64
	Size() int
	Values() []int64
	Snapshot() Sample

	// Summary statistics.
	Min() int64
	Max() int64
	Sum() int64
	Mean() float64
	Variance() float64
	StdDev() float64
	Percentile(float64) float64
	Percentiles([]float64) []float64
}
| |
| // ExpDecaySample is an exponentially-decaying sample using a forward-decaying |
| // priority reservoir. See Cormode et al's "Forward Decay: A Practical Time |
| // Decay Model for Streaming Systems". |
| // |
| // <http://dimacs.rutgers.edu/~graham/pubs/papers/fwddecay.pdf> |
| type ExpDecaySample struct { |
| alpha float64 |
| count int64 |
| mutex sync.Mutex |
| reservoirSize int |
| t0, t1 time.Time |
| values *expDecaySampleHeap |
| } |
| |
| // NewExpDecaySample constructs a new exponentially-decaying sample with the |
| // given reservoir size and alpha. |
| func NewExpDecaySample(reservoirSize int, alpha float64) Sample { |
| if UseNilMetrics { |
| return NilSample{} |
| } |
| s := &ExpDecaySample{ |
| alpha: alpha, |
| reservoirSize: reservoirSize, |
| t0: time.Now(), |
| values: newExpDecaySampleHeap(reservoirSize), |
| } |
| s.t1 = s.t0.Add(rescaleThreshold) |
| return s |
| } |
| |
| // Clear clears all samples. |
| func (s *ExpDecaySample) Clear() { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| s.count = 0 |
| s.t0 = time.Now() |
| s.t1 = s.t0.Add(rescaleThreshold) |
| s.values.Clear() |
| } |
| |
| // Count returns the number of samples recorded, which may exceed the |
| // reservoir size. |
| func (s *ExpDecaySample) Count() int64 { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| return s.count |
| } |
| |
| // Max returns the maximum value in the sample, which may not be the maximum |
| // value ever to be part of the sample. |
| func (s *ExpDecaySample) Max() int64 { |
| return SampleMax(s.Values()) |
| } |
| |
| // Mean returns the mean of the values in the sample. |
| func (s *ExpDecaySample) Mean() float64 { |
| return SampleMean(s.Values()) |
| } |
| |
| // Min returns the minimum value in the sample, which may not be the minimum |
| // value ever to be part of the sample. |
| func (s *ExpDecaySample) Min() int64 { |
| return SampleMin(s.Values()) |
| } |
| |
| // Percentile returns an arbitrary percentile of values in the sample. |
| func (s *ExpDecaySample) Percentile(p float64) float64 { |
| return SamplePercentile(s.Values(), p) |
| } |
| |
| // Percentiles returns a slice of arbitrary percentiles of values in the |
| // sample. |
| func (s *ExpDecaySample) Percentiles(ps []float64) []float64 { |
| return SamplePercentiles(s.Values(), ps) |
| } |
| |
| // Size returns the size of the sample, which is at most the reservoir size. |
| func (s *ExpDecaySample) Size() int { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| return s.values.Size() |
| } |
| |
| // Snapshot returns a read-only copy of the sample. |
| func (s *ExpDecaySample) Snapshot() Sample { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| vals := s.values.Values() |
| values := make([]int64, len(vals)) |
| for i, v := range vals { |
| values[i] = v.v |
| } |
| return &SampleSnapshot{ |
| count: s.count, |
| values: values, |
| } |
| } |
| |
| // StdDev returns the standard deviation of the values in the sample. |
| func (s *ExpDecaySample) StdDev() float64 { |
| return SampleStdDev(s.Values()) |
| } |
| |
| // Sum returns the sum of the values in the sample. |
| func (s *ExpDecaySample) Sum() int64 { |
| return SampleSum(s.Values()) |
| } |
| |
| // Update samples a new value. |
| func (s *ExpDecaySample) Update(v int64) { |
| s.update(time.Now(), v) |
| } |
| |
| // Values returns a copy of the values in the sample. |
| func (s *ExpDecaySample) Values() []int64 { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| vals := s.values.Values() |
| values := make([]int64, len(vals)) |
| for i, v := range vals { |
| values[i] = v.v |
| } |
| return values |
| } |
| |
| // Variance returns the variance of the values in the sample. |
| func (s *ExpDecaySample) Variance() float64 { |
| return SampleVariance(s.Values()) |
| } |
| |
| // update samples a new value at a particular timestamp. This is a method all |
| // its own to facilitate testing. |
| func (s *ExpDecaySample) update(t time.Time, v int64) { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| s.count++ |
| if s.values.Size() == s.reservoirSize { |
| s.values.Pop() |
| } |
| s.values.Push(expDecaySample{ |
| k: math.Exp(t.Sub(s.t0).Seconds()*s.alpha) / rand.Float64(), |
| v: v, |
| }) |
| if t.After(s.t1) { |
| values := s.values.Values() |
| t0 := s.t0 |
| s.values.Clear() |
| s.t0 = t |
| s.t1 = s.t0.Add(rescaleThreshold) |
| for _, v := range values { |
| v.k = v.k * math.Exp(-s.alpha*s.t0.Sub(t0).Seconds()) |
| s.values.Push(v) |
| } |
| } |
| } |
| |
| // NilSample is a no-op Sample. |
| type NilSample struct{} |
| |
| // Clear is a no-op. |
| func (NilSample) Clear() {} |
| |
| // Count is a no-op. |
| func (NilSample) Count() int64 { return 0 } |
| |
| // Max is a no-op. |
| func (NilSample) Max() int64 { return 0 } |
| |
| // Mean is a no-op. |
| func (NilSample) Mean() float64 { return 0.0 } |
| |
| // Min is a no-op. |
| func (NilSample) Min() int64 { return 0 } |
| |
| // Percentile is a no-op. |
| func (NilSample) Percentile(p float64) float64 { return 0.0 } |
| |
| // Percentiles is a no-op. |
| func (NilSample) Percentiles(ps []float64) []float64 { |
| return make([]float64, len(ps)) |
| } |
| |
| // Size is a no-op. |
| func (NilSample) Size() int { return 0 } |
| |
| // Sample is a no-op. |
| func (NilSample) Snapshot() Sample { return NilSample{} } |
| |
| // StdDev is a no-op. |
| func (NilSample) StdDev() float64 { return 0.0 } |
| |
| // Sum is a no-op. |
| func (NilSample) Sum() int64 { return 0 } |
| |
| // Update is a no-op. |
| func (NilSample) Update(v int64) {} |
| |
| // Values is a no-op. |
| func (NilSample) Values() []int64 { return []int64{} } |
| |
| // Variance is a no-op. |
| func (NilSample) Variance() float64 { return 0.0 } |
| |
// SampleMax returns the largest element of values, or 0 when values is
// empty.
func SampleMax(values []int64) int64 {
	if len(values) == 0 {
		return 0
	}
	// Start from the smallest representable value so any element replaces it.
	largest := int64(math.MinInt64)
	for i := range values {
		if values[i] > largest {
			largest = values[i]
		}
	}
	return largest
}
| |
| // SampleMean returns the mean value of the slice of int64. |
| func SampleMean(values []int64) float64 { |
| if 0 == len(values) { |
| return 0.0 |
| } |
| return float64(SampleSum(values)) / float64(len(values)) |
| } |
| |
// SampleMin returns the smallest element of values, or 0 when values is
// empty.
func SampleMin(values []int64) int64 {
	if len(values) == 0 {
		return 0
	}
	// Start from the largest representable value so any element replaces it.
	smallest := int64(math.MaxInt64)
	for i := range values {
		if values[i] < smallest {
			smallest = values[i]
		}
	}
	return smallest
}
| |
| // SamplePercentiles returns an arbitrary percentile of the slice of int64. |
| func SamplePercentile(values int64Slice, p float64) float64 { |
| return SamplePercentiles(values, []float64{p})[0] |
| } |
| |
| // SamplePercentiles returns a slice of arbitrary percentiles of the slice of |
| // int64. |
| func SamplePercentiles(values int64Slice, ps []float64) []float64 { |
| scores := make([]float64, len(ps)) |
| size := len(values) |
| if size > 0 { |
| sort.Sort(values) |
| for i, p := range ps { |
| pos := p * float64(size+1) |
| if pos < 1.0 { |
| scores[i] = float64(values[0]) |
| } else if pos >= float64(size) { |
| scores[i] = float64(values[size-1]) |
| } else { |
| lower := float64(values[int(pos)-1]) |
| upper := float64(values[int(pos)]) |
| scores[i] = lower + (pos-math.Floor(pos))*(upper-lower) |
| } |
| } |
| } |
| return scores |
| } |
| |
| // SampleSnapshot is a read-only copy of another Sample. |
| type SampleSnapshot struct { |
| count int64 |
| values []int64 |
| } |
| |
| func NewSampleSnapshot(count int64, values []int64) *SampleSnapshot { |
| return &SampleSnapshot{ |
| count: count, |
| values: values, |
| } |
| } |
| |
| // Clear panics. |
| func (*SampleSnapshot) Clear() { |
| panic("Clear called on a SampleSnapshot") |
| } |
| |
| // Count returns the count of inputs at the time the snapshot was taken. |
| func (s *SampleSnapshot) Count() int64 { return s.count } |
| |
| // Max returns the maximal value at the time the snapshot was taken. |
| func (s *SampleSnapshot) Max() int64 { return SampleMax(s.values) } |
| |
| // Mean returns the mean value at the time the snapshot was taken. |
| func (s *SampleSnapshot) Mean() float64 { return SampleMean(s.values) } |
| |
| // Min returns the minimal value at the time the snapshot was taken. |
| func (s *SampleSnapshot) Min() int64 { return SampleMin(s.values) } |
| |
| // Percentile returns an arbitrary percentile of values at the time the |
| // snapshot was taken. |
| func (s *SampleSnapshot) Percentile(p float64) float64 { |
| return SamplePercentile(s.values, p) |
| } |
| |
| // Percentiles returns a slice of arbitrary percentiles of values at the time |
| // the snapshot was taken. |
| func (s *SampleSnapshot) Percentiles(ps []float64) []float64 { |
| return SamplePercentiles(s.values, ps) |
| } |
| |
| // Size returns the size of the sample at the time the snapshot was taken. |
| func (s *SampleSnapshot) Size() int { return len(s.values) } |
| |
| // Snapshot returns the snapshot. |
| func (s *SampleSnapshot) Snapshot() Sample { return s } |
| |
| // StdDev returns the standard deviation of values at the time the snapshot was |
| // taken. |
| func (s *SampleSnapshot) StdDev() float64 { return SampleStdDev(s.values) } |
| |
| // Sum returns the sum of values at the time the snapshot was taken. |
| func (s *SampleSnapshot) Sum() int64 { return SampleSum(s.values) } |
| |
| // Update panics. |
| func (*SampleSnapshot) Update(int64) { |
| panic("Update called on a SampleSnapshot") |
| } |
| |
| // Values returns a copy of the values in the sample. |
| func (s *SampleSnapshot) Values() []int64 { |
| values := make([]int64, len(s.values)) |
| copy(values, s.values) |
| return values |
| } |
| |
| // Variance returns the variance of values at the time the snapshot was taken. |
| func (s *SampleSnapshot) Variance() float64 { return SampleVariance(s.values) } |
| |
| // SampleStdDev returns the standard deviation of the slice of int64. |
| func SampleStdDev(values []int64) float64 { |
| return math.Sqrt(SampleVariance(values)) |
| } |
| |
// SampleSum returns the sum of values as an int64; an empty slice sums to 0.
func SampleSum(values []int64) int64 {
	total := int64(0)
	for i := range values {
		total += values[i]
	}
	return total
}
| |
| // SampleVariance returns the variance of the slice of int64. |
| func SampleVariance(values []int64) float64 { |
| if 0 == len(values) { |
| return 0.0 |
| } |
| m := SampleMean(values) |
| var sum float64 |
| for _, v := range values { |
| d := float64(v) - m |
| sum += d * d |
| } |
| return sum / float64(len(values)) |
| } |
| |
// UniformSample is a uniform sample using Vitter's Algorithm R.
//
// <http://www.cs.umd.edu/~samir/498/vitter.pdf>
type UniformSample struct {
	// count is the number of updates recorded since construction or the
	// last Clear.
	count int64
	// mutex is locked by the methods that touch the fields of this struct.
	mutex sync.Mutex
	// reservoirSize is the maximum number of values retained.
	reservoirSize int
	// values is the reservoir itself.
	values []int64
}
| |
| // NewUniformSample constructs a new uniform sample with the given reservoir |
| // size. |
| func NewUniformSample(reservoirSize int) Sample { |
| if UseNilMetrics { |
| return NilSample{} |
| } |
| return &UniformSample{ |
| reservoirSize: reservoirSize, |
| values: make([]int64, 0, reservoirSize), |
| } |
| } |
| |
| // Clear clears all samples. |
| func (s *UniformSample) Clear() { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| s.count = 0 |
| s.values = make([]int64, 0, s.reservoirSize) |
| } |
| |
| // Count returns the number of samples recorded, which may exceed the |
| // reservoir size. |
| func (s *UniformSample) Count() int64 { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| return s.count |
| } |
| |
| // Max returns the maximum value in the sample, which may not be the maximum |
| // value ever to be part of the sample. |
| func (s *UniformSample) Max() int64 { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| return SampleMax(s.values) |
| } |
| |
| // Mean returns the mean of the values in the sample. |
| func (s *UniformSample) Mean() float64 { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| return SampleMean(s.values) |
| } |
| |
| // Min returns the minimum value in the sample, which may not be the minimum |
| // value ever to be part of the sample. |
| func (s *UniformSample) Min() int64 { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| return SampleMin(s.values) |
| } |
| |
| // Percentile returns an arbitrary percentile of values in the sample. |
| func (s *UniformSample) Percentile(p float64) float64 { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| return SamplePercentile(s.values, p) |
| } |
| |
| // Percentiles returns a slice of arbitrary percentiles of values in the |
| // sample. |
| func (s *UniformSample) Percentiles(ps []float64) []float64 { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| return SamplePercentiles(s.values, ps) |
| } |
| |
| // Size returns the size of the sample, which is at most the reservoir size. |
| func (s *UniformSample) Size() int { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| return len(s.values) |
| } |
| |
| // Snapshot returns a read-only copy of the sample. |
| func (s *UniformSample) Snapshot() Sample { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| values := make([]int64, len(s.values)) |
| copy(values, s.values) |
| return &SampleSnapshot{ |
| count: s.count, |
| values: values, |
| } |
| } |
| |
| // StdDev returns the standard deviation of the values in the sample. |
| func (s *UniformSample) StdDev() float64 { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| return SampleStdDev(s.values) |
| } |
| |
| // Sum returns the sum of the values in the sample. |
| func (s *UniformSample) Sum() int64 { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| return SampleSum(s.values) |
| } |
| |
| // Update samples a new value. |
| func (s *UniformSample) Update(v int64) { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| s.count++ |
| if len(s.values) < s.reservoirSize { |
| s.values = append(s.values, v) |
| } else { |
| r := rand.Int63n(s.count) |
| if r < int64(len(s.values)) { |
| s.values[int(r)] = v |
| } |
| } |
| } |
| |
| // Values returns a copy of the values in the sample. |
| func (s *UniformSample) Values() []int64 { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| values := make([]int64, len(s.values)) |
| copy(values, s.values) |
| return values |
| } |
| |
| // Variance returns the variance of the values in the sample. |
| func (s *UniformSample) Variance() float64 { |
| s.mutex.Lock() |
| defer s.mutex.Unlock() |
| return SampleVariance(s.values) |
| } |
| |
// expDecaySample represents an individual sample in a heap: v is the
// recorded value and k is the key the heap orders by.
type expDecaySample struct {
	k float64
	v int64
}

// newExpDecaySampleHeap returns an empty heap whose backing slice has
// capacity reservoirSize.
func newExpDecaySampleHeap(reservoirSize int) *expDecaySampleHeap {
	backing := make([]expDecaySample, 0, reservoirSize)
	return &expDecaySampleHeap{s: backing}
}

// expDecaySampleHeap is a min-heap of expDecaySamples, ordered by k.
// The internal implementation is copied from the standard library's container/heap
type expDecaySampleHeap struct {
	s []expDecaySample
}

// Clear empties the heap while keeping its backing storage.
func (h *expDecaySampleHeap) Clear() {
	h.s = h.s[:0]
}

// Push inserts s into the heap. The slice is extended by reslicing rather
// than appending, so pushing beyond the backing capacity panics.
func (h *expDecaySampleHeap) Push(s expDecaySample) {
	end := len(h.s)
	h.s = h.s[:end+1]
	h.s[end] = s
	h.up(end)
}

// Pop removes and returns the entry with the smallest k. It panics when the
// heap is empty.
func (h *expDecaySampleHeap) Pop() expDecaySample {
	last := len(h.s) - 1
	// Move the root to the end, then restore heap order over the rest.
	h.swap(0, last)
	h.down(0, last)

	popped := h.s[last]
	h.s = h.s[:last]
	return popped
}

// Size returns the number of entries in the heap.
func (h *expDecaySampleHeap) Size() int {
	return len(h.s)
}

// Values returns the heap's own backing slice, in heap order; it is not a
// copy.
func (h *expDecaySampleHeap) Values() []expDecaySample {
	return h.s
}

// less reports whether entry a has a strictly smaller key than entry b.
func (h *expDecaySampleHeap) less(a, b int) bool {
	return h.s[a].k < h.s[b].k
}

// swap exchanges entries a and b.
func (h *expDecaySampleHeap) swap(a, b int) {
	h.s[a], h.s[b] = h.s[b], h.s[a]
}

// up sifts entry j towards the root until its parent is no larger.
func (h *expDecaySampleHeap) up(j int) {
	for j > 0 {
		parent := (j - 1) / 2
		if !h.less(j, parent) {
			return
		}
		h.swap(parent, j)
		j = parent
	}
}

// down sifts entry i towards the leaves, considering only the first n
// entries, until neither child is smaller.
func (h *expDecaySampleHeap) down(i, n int) {
	for {
		left := 2*i + 1
		if left >= n || left < 0 { // left < 0 after int overflow
			return
		}
		child := left
		if right := left + 1; right < n && !h.less(left, right) {
			child = right
		}
		if !h.less(child, i) {
			return
		}
		h.swap(i, child)
		i = child
	}
}
| |
// int64Slice attaches the Len, Less and Swap methods required by sort.Sort
// to []int64, ordering elements in increasing order.
type int64Slice []int64

// Len returns the number of elements.
func (xs int64Slice) Len() int {
	return len(xs)
}

// Less reports whether element i is strictly smaller than element j.
func (xs int64Slice) Less(i, j int) bool {
	return xs[i] < xs[j]
}

// Swap exchanges elements i and j.
func (xs int64Slice) Swap(i, j int) {
	xs[i], xs[j] = xs[j], xs[i]
}