| // Package quantile computes approximate quantiles over an unbounded data |
| // stream within low memory and CPU bounds. |
| // |
| // A small amount of accuracy is traded to achieve the above properties. |
| // |
| // Multiple streams can be merged before calling Query to generate a single set |
| // of results. This is meaningful when the streams represent the same type of |
| // data. See Merge and Samples. |
| // |
| // For more detailed information about the algorithm used, see: |
| // |
| // Effective Computation of Biased Quantiles over Data Streams |
| // |
| // http://www.cs.rutgers.edu/~muthu/bquant.pdf |
| package quantile |
| |
| import ( |
| "math" |
| "sort" |
| ) |
| |
// Sample is one entry of a stream summary: an observed value plus the Width
// and Delta bookkeeping that merging, compression and queries operate on. The
// JSON tags make every field encode as a string.
type Sample struct {
	Value float64 `json:",string"`
	Width float64 `json:",string"`
	Delta float64 `json:",string"`
}

// Samples is a list of Sample entries. It satisfies sort.Interface and orders
// entries by ascending Value.
type Samples []Sample

// Len reports how many samples the list holds.
func (a Samples) Len() int {
	return len(a)
}

// Less reports whether the sample at index i has a smaller Value than the
// sample at index j.
func (a Samples) Less(i, j int) bool {
	return a[j].Value > a[i].Value
}

// Swap exchanges the samples at indexes i and j.
func (a Samples) Swap(i, j int) {
	a[j], a[i] = a[i], a[j]
}
| |
| type invariant func(s *stream, r float64) float64 |
| |
| // NewLowBiased returns an initialized Stream for low-biased quantiles |
| // (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but |
| // error guarantees can still be given even for the lower ranks of the data |
| // distribution. |
| // |
| // The provided epsilon is a relative error, i.e. the true quantile of a value |
| // returned by a query is guaranteed to be within (1±Epsilon)*Quantile. |
| // |
| // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error |
| // properties. |
| func NewLowBiased(epsilon float64) *Stream { |
| ƒ := func(s *stream, r float64) float64 { |
| return 2 * epsilon * r |
| } |
| return newStream(ƒ) |
| } |
| |
| // NewHighBiased returns an initialized Stream for high-biased quantiles |
| // (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but |
| // error guarantees can still be given even for the higher ranks of the data |
| // distribution. |
| // |
| // The provided epsilon is a relative error, i.e. the true quantile of a value |
| // returned by a query is guaranteed to be within 1-(1±Epsilon)*(1-Quantile). |
| // |
| // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error |
| // properties. |
| func NewHighBiased(epsilon float64) *Stream { |
| ƒ := func(s *stream, r float64) float64 { |
| return 2 * epsilon * (s.n - r) |
| } |
| return newStream(ƒ) |
| } |
| |
| // NewTargeted returns an initialized Stream concerned with a particular set of |
| // quantile values that are supplied a priori. Knowing these a priori reduces |
| // space and computation time. The targets map maps the desired quantiles to |
| // their absolute errors, i.e. the true quantile of a value returned by a query |
| // is guaranteed to be within (Quantile±Epsilon). |
| // |
| // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties. |
| func NewTargeted(targetMap map[float64]float64) *Stream { |
| // Convert map to slice to avoid slow iterations on a map. |
| // ƒ is called on the hot path, so converting the map to a slice |
| // beforehand results in significant CPU savings. |
| targets := targetMapToSlice(targetMap) |
| |
| ƒ := func(s *stream, r float64) float64 { |
| var m = math.MaxFloat64 |
| var f float64 |
| for _, t := range targets { |
| if t.quantile*s.n <= r { |
| f = (2 * t.epsilon * r) / t.quantile |
| } else { |
| f = (2 * t.epsilon * (s.n - r)) / (1 - t.quantile) |
| } |
| if f < m { |
| m = f |
| } |
| } |
| return m |
| } |
| return newStream(ƒ) |
| } |
| |
// target pairs one requested quantile with the absolute error allowed for it.
type target struct {
	quantile float64
	epsilon  float64
}

// targetMapToSlice flattens a quantile→epsilon map into a slice of targets.
// The result has one element per map entry; element order follows map
// iteration order and is therefore unspecified.
func targetMapToSlice(targetMap map[float64]float64) []target {
	out := make([]target, len(targetMap))

	next := 0
	for q, eps := range targetMap {
		out[next] = target{quantile: q, epsilon: eps}
		next++
	}

	return out
}
| |
| // Stream computes quantiles for a stream of float64s. It is not thread-safe by |
| // design. Take care when using across multiple goroutines. |
| type Stream struct { |
| *stream |
| b Samples |
| sorted bool |
| } |
| |
| func newStream(ƒ invariant) *Stream { |
| x := &stream{ƒ: ƒ} |
| return &Stream{x, make(Samples, 0, 500), true} |
| } |
| |
| // Insert inserts v into the stream. |
| func (s *Stream) Insert(v float64) { |
| s.insert(Sample{Value: v, Width: 1}) |
| } |
| |
| func (s *Stream) insert(sample Sample) { |
| s.b = append(s.b, sample) |
| s.sorted = false |
| if len(s.b) == cap(s.b) { |
| s.flush() |
| } |
| } |
| |
| // Query returns the computed qth percentiles value. If s was created with |
| // NewTargeted, and q is not in the set of quantiles provided a priori, Query |
| // will return an unspecified result. |
| func (s *Stream) Query(q float64) float64 { |
| if !s.flushed() { |
| // Fast path when there hasn't been enough data for a flush; |
| // this also yields better accuracy for small sets of data. |
| l := len(s.b) |
| if l == 0 { |
| return 0 |
| } |
| i := int(math.Ceil(float64(l) * q)) |
| if i > 0 { |
| i -= 1 |
| } |
| s.maybeSort() |
| return s.b[i].Value |
| } |
| s.flush() |
| return s.stream.query(q) |
| } |
| |
| // Merge merges samples into the underlying streams samples. This is handy when |
| // merging multiple streams from separate threads, database shards, etc. |
| // |
| // ATTENTION: This method is broken and does not yield correct results. The |
| // underlying algorithm is not capable of merging streams correctly. |
| func (s *Stream) Merge(samples Samples) { |
| sort.Sort(samples) |
| s.stream.merge(samples) |
| } |
| |
| // Reset reinitializes and clears the list reusing the samples buffer memory. |
| func (s *Stream) Reset() { |
| s.stream.reset() |
| s.b = s.b[:0] |
| } |
| |
| // Samples returns stream samples held by s. |
| func (s *Stream) Samples() Samples { |
| if !s.flushed() { |
| return s.b |
| } |
| s.flush() |
| return s.stream.samples() |
| } |
| |
| // Count returns the total number of samples observed in the stream |
| // since initialization. |
| func (s *Stream) Count() int { |
| return len(s.b) + s.stream.count() |
| } |
| |
| func (s *Stream) flush() { |
| s.maybeSort() |
| s.stream.merge(s.b) |
| s.b = s.b[:0] |
| } |
| |
| func (s *Stream) maybeSort() { |
| if !s.sorted { |
| s.sorted = true |
| sort.Sort(s.b) |
| } |
| } |
| |
| func (s *Stream) flushed() bool { |
| return len(s.stream.l) > 0 |
| } |
| |
| type stream struct { |
| n float64 |
| l []Sample |
| ƒ invariant |
| } |
| |
| func (s *stream) reset() { |
| s.l = s.l[:0] |
| s.n = 0 |
| } |
| |
| func (s *stream) insert(v float64) { |
| s.merge(Samples{{v, 1, 0}}) |
| } |
| |
| func (s *stream) merge(samples Samples) { |
| // TODO(beorn7): This tries to merge not only individual samples, but |
| // whole summaries. The paper doesn't mention merging summaries at |
| // all. Unittests show that the merging is inaccurate. Find out how to |
| // do merges properly. |
| var r float64 |
| i := 0 |
| for _, sample := range samples { |
| for ; i < len(s.l); i++ { |
| c := s.l[i] |
| if c.Value > sample.Value { |
| // Insert at position i. |
| s.l = append(s.l, Sample{}) |
| copy(s.l[i+1:], s.l[i:]) |
| s.l[i] = Sample{ |
| sample.Value, |
| sample.Width, |
| math.Max(sample.Delta, math.Floor(s.ƒ(s, r))-1), |
| // TODO(beorn7): How to calculate delta correctly? |
| } |
| i++ |
| goto inserted |
| } |
| r += c.Width |
| } |
| s.l = append(s.l, Sample{sample.Value, sample.Width, 0}) |
| i++ |
| inserted: |
| s.n += sample.Width |
| r += sample.Width |
| } |
| s.compress() |
| } |
| |
| func (s *stream) count() int { |
| return int(s.n) |
| } |
| |
| func (s *stream) query(q float64) float64 { |
| t := math.Ceil(q * s.n) |
| t += math.Ceil(s.ƒ(s, t) / 2) |
| p := s.l[0] |
| var r float64 |
| for _, c := range s.l[1:] { |
| r += p.Width |
| if r+c.Width+c.Delta > t { |
| return p.Value |
| } |
| p = c |
| } |
| return p.Value |
| } |
| |
| func (s *stream) compress() { |
| if len(s.l) < 2 { |
| return |
| } |
| x := s.l[len(s.l)-1] |
| xi := len(s.l) - 1 |
| r := s.n - 1 - x.Width |
| |
| for i := len(s.l) - 2; i >= 0; i-- { |
| c := s.l[i] |
| if c.Width+x.Width+x.Delta <= s.ƒ(s, r) { |
| x.Width += c.Width |
| s.l[xi] = x |
| // Remove element at i. |
| copy(s.l[i:], s.l[i+1:]) |
| s.l = s.l[:len(s.l)-1] |
| xi -= 1 |
| } else { |
| x = c |
| xi = i |
| } |
| r -= c.Width |
| } |
| } |
| |
| func (s *stream) samples() Samples { |
| samples := make(Samples, len(s.l)) |
| copy(samples, s.l) |
| return samples |
| } |