blob: d7d14f8eb63d01039c945d7b8b263a3bedc545dc [file] [log] [blame]
khenaidooab1f7bd2019-11-14 14:00:27 -05001// Package quantile computes approximate quantiles over an unbounded data
2// stream within low memory and CPU bounds.
3//
4// A small amount of accuracy is traded to achieve the above properties.
5//
6// Multiple streams can be merged before calling Query to generate a single set
7// of results. This is meaningful when the streams represent the same type of
8// data. See Merge and Samples.
9//
10// For more detailed information about the algorithm used, see:
11//
12// Effective Computation of Biased Quantiles over Data Streams
13//
14// http://www.cs.rutgers.edu/~muthu/bquant.pdf
15package quantile
16
17import (
18 "math"
19 "sort"
20)
21
22// Sample holds an observed value and meta information for compression. JSON
23// tags have been added for convenience.
24type Sample struct {
25 Value float64 `json:",string"`
26 Width float64 `json:",string"`
27 Delta float64 `json:",string"`
28}
29
30// Samples represents a slice of samples. It implements sort.Interface.
31type Samples []Sample
32
33func (a Samples) Len() int { return len(a) }
34func (a Samples) Less(i, j int) bool { return a[i].Value < a[j].Value }
35func (a Samples) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
36
37type invariant func(s *stream, r float64) float64
38
39// NewLowBiased returns an initialized Stream for low-biased quantiles
40// (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but
41// error guarantees can still be given even for the lower ranks of the data
42// distribution.
43//
44// The provided epsilon is a relative error, i.e. the true quantile of a value
45// returned by a query is guaranteed to be within (1±Epsilon)*Quantile.
46//
47// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error
48// properties.
49func NewLowBiased(epsilon float64) *Stream {
50 ƒ := func(s *stream, r float64) float64 {
51 return 2 * epsilon * r
52 }
53 return newStream(ƒ)
54}
55
56// NewHighBiased returns an initialized Stream for high-biased quantiles
57// (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but
58// error guarantees can still be given even for the higher ranks of the data
59// distribution.
60//
61// The provided epsilon is a relative error, i.e. the true quantile of a value
62// returned by a query is guaranteed to be within 1-(1±Epsilon)*(1-Quantile).
63//
64// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error
65// properties.
66func NewHighBiased(epsilon float64) *Stream {
67 ƒ := func(s *stream, r float64) float64 {
68 return 2 * epsilon * (s.n - r)
69 }
70 return newStream(ƒ)
71}
72
73// NewTargeted returns an initialized Stream concerned with a particular set of
74// quantile values that are supplied a priori. Knowing these a priori reduces
75// space and computation time. The targets map maps the desired quantiles to
76// their absolute errors, i.e. the true quantile of a value returned by a query
77// is guaranteed to be within (Quantile±Epsilon).
78//
79// See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties.
80func NewTargeted(targetMap map[float64]float64) *Stream {
81 // Convert map to slice to avoid slow iterations on a map.
82 // ƒ is called on the hot path, so converting the map to a slice
83 // beforehand results in significant CPU savings.
84 targets := targetMapToSlice(targetMap)
85
86 ƒ := func(s *stream, r float64) float64 {
87 var m = math.MaxFloat64
88 var f float64
89 for _, t := range targets {
90 if t.quantile*s.n <= r {
91 f = (2 * t.epsilon * r) / t.quantile
92 } else {
93 f = (2 * t.epsilon * (s.n - r)) / (1 - t.quantile)
94 }
95 if f < m {
96 m = f
97 }
98 }
99 return m
100 }
101 return newStream(ƒ)
102}
103
104type target struct {
105 quantile float64
106 epsilon float64
107}
108
109func targetMapToSlice(targetMap map[float64]float64) []target {
110 targets := make([]target, 0, len(targetMap))
111
112 for quantile, epsilon := range targetMap {
113 t := target{
114 quantile: quantile,
115 epsilon: epsilon,
116 }
117 targets = append(targets, t)
118 }
119
120 return targets
121}
122
123// Stream computes quantiles for a stream of float64s. It is not thread-safe by
124// design. Take care when using across multiple goroutines.
125type Stream struct {
126 *stream
127 b Samples
128 sorted bool
129}
130
131func newStream(ƒ invariant) *Stream {
132 x := &stream{ƒ: ƒ}
133 return &Stream{x, make(Samples, 0, 500), true}
134}
135
136// Insert inserts v into the stream.
137func (s *Stream) Insert(v float64) {
138 s.insert(Sample{Value: v, Width: 1})
139}
140
141func (s *Stream) insert(sample Sample) {
142 s.b = append(s.b, sample)
143 s.sorted = false
144 if len(s.b) == cap(s.b) {
145 s.flush()
146 }
147}
148
149// Query returns the computed qth percentiles value. If s was created with
150// NewTargeted, and q is not in the set of quantiles provided a priori, Query
151// will return an unspecified result.
152func (s *Stream) Query(q float64) float64 {
153 if !s.flushed() {
154 // Fast path when there hasn't been enough data for a flush;
155 // this also yields better accuracy for small sets of data.
156 l := len(s.b)
157 if l == 0 {
158 return 0
159 }
160 i := int(math.Ceil(float64(l) * q))
161 if i > 0 {
162 i -= 1
163 }
164 s.maybeSort()
165 return s.b[i].Value
166 }
167 s.flush()
168 return s.stream.query(q)
169}
170
171// Merge merges samples into the underlying streams samples. This is handy when
172// merging multiple streams from separate threads, database shards, etc.
173//
174// ATTENTION: This method is broken and does not yield correct results. The
175// underlying algorithm is not capable of merging streams correctly.
176func (s *Stream) Merge(samples Samples) {
177 sort.Sort(samples)
178 s.stream.merge(samples)
179}
180
181// Reset reinitializes and clears the list reusing the samples buffer memory.
182func (s *Stream) Reset() {
183 s.stream.reset()
184 s.b = s.b[:0]
185}
186
187// Samples returns stream samples held by s.
188func (s *Stream) Samples() Samples {
189 if !s.flushed() {
190 return s.b
191 }
192 s.flush()
193 return s.stream.samples()
194}
195
196// Count returns the total number of samples observed in the stream
197// since initialization.
198func (s *Stream) Count() int {
199 return len(s.b) + s.stream.count()
200}
201
202func (s *Stream) flush() {
203 s.maybeSort()
204 s.stream.merge(s.b)
205 s.b = s.b[:0]
206}
207
208func (s *Stream) maybeSort() {
209 if !s.sorted {
210 s.sorted = true
211 sort.Sort(s.b)
212 }
213}
214
215func (s *Stream) flushed() bool {
216 return len(s.stream.l) > 0
217}
218
219type stream struct {
220 n float64
221 l []Sample
222 ƒ invariant
223}
224
225func (s *stream) reset() {
226 s.l = s.l[:0]
227 s.n = 0
228}
229
230func (s *stream) insert(v float64) {
231 s.merge(Samples{{v, 1, 0}})
232}
233
234func (s *stream) merge(samples Samples) {
235 // TODO(beorn7): This tries to merge not only individual samples, but
236 // whole summaries. The paper doesn't mention merging summaries at
237 // all. Unittests show that the merging is inaccurate. Find out how to
238 // do merges properly.
239 var r float64
240 i := 0
241 for _, sample := range samples {
242 for ; i < len(s.l); i++ {
243 c := s.l[i]
244 if c.Value > sample.Value {
245 // Insert at position i.
246 s.l = append(s.l, Sample{})
247 copy(s.l[i+1:], s.l[i:])
248 s.l[i] = Sample{
249 sample.Value,
250 sample.Width,
251 math.Max(sample.Delta, math.Floor(s.ƒ(s, r))-1),
252 // TODO(beorn7): How to calculate delta correctly?
253 }
254 i++
255 goto inserted
256 }
257 r += c.Width
258 }
259 s.l = append(s.l, Sample{sample.Value, sample.Width, 0})
260 i++
261 inserted:
262 s.n += sample.Width
263 r += sample.Width
264 }
265 s.compress()
266}
267
268func (s *stream) count() int {
269 return int(s.n)
270}
271
272func (s *stream) query(q float64) float64 {
273 t := math.Ceil(q * s.n)
274 t += math.Ceil(s.ƒ(s, t) / 2)
275 p := s.l[0]
276 var r float64
277 for _, c := range s.l[1:] {
278 r += p.Width
279 if r+c.Width+c.Delta > t {
280 return p.Value
281 }
282 p = c
283 }
284 return p.Value
285}
286
287func (s *stream) compress() {
288 if len(s.l) < 2 {
289 return
290 }
291 x := s.l[len(s.l)-1]
292 xi := len(s.l) - 1
293 r := s.n - 1 - x.Width
294
295 for i := len(s.l) - 2; i >= 0; i-- {
296 c := s.l[i]
297 if c.Width+x.Width+x.Delta <= s.ƒ(s, r) {
298 x.Width += c.Width
299 s.l[xi] = x
300 // Remove element at i.
301 copy(s.l[i:], s.l[i+1:])
302 s.l = s.l[:len(s.l)-1]
303 xi -= 1
304 } else {
305 x = c
306 xi = i
307 }
308 r -= c.Width
309 }
310}
311
312func (s *stream) samples() Samples {
313 samples := make(Samples, len(s.l))
314 copy(samples, s.l)
315 return samples
316}