khenaidoo | ab1f7bd | 2019-11-14 14:00:27 -0500 | [diff] [blame] | 1 | // Package quantile computes approximate quantiles over an unbounded data |
| 2 | // stream within low memory and CPU bounds. |
| 3 | // |
| 4 | // A small amount of accuracy is traded to achieve the above properties. |
| 5 | // |
| 6 | // Multiple streams can be merged before calling Query to generate a single set |
| 7 | // of results. This is meaningful when the streams represent the same type of |
| 8 | // data. See Merge and Samples. |
| 9 | // |
| 10 | // For more detailed information about the algorithm used, see: |
| 11 | // |
| 12 | // Effective Computation of Biased Quantiles over Data Streams |
| 13 | // |
| 14 | // http://www.cs.rutgers.edu/~muthu/bquant.pdf |
| 15 | package quantile |
| 16 | |
| 17 | import ( |
| 18 | "math" |
| 19 | "sort" |
| 20 | ) |
| 21 | |
// Sample is one entry of a quantile summary: an observed value together with
// the bookkeeping the stream uses to compress and query it. Each field has a
// `json:",string"` tag for convenience when serializing samples.
type Sample struct {
	// Value is the observed value.
	Value float64 `json:",string"`
	// Width is the weight carried by this sample. Insert creates samples of
	// width 1, and compression adds the widths of samples it folds together.
	Width float64 `json:",string"`
	// Delta is the extra rank allowance attached to this sample; it is
	// consulted alongside Width when compressing and querying.
	Delta float64 `json:",string"`
}
| 29 | |
| 30 | // Samples represents a slice of samples. It implements sort.Interface. |
| 31 | type Samples []Sample |
| 32 | |
| 33 | func (a Samples) Len() int { return len(a) } |
| 34 | func (a Samples) Less(i, j int) bool { return a[i].Value < a[j].Value } |
| 35 | func (a Samples) Swap(i, j int) { a[i], a[j] = a[j], a[i] } |
| 36 | |
| 37 | type invariant func(s *stream, r float64) float64 |
| 38 | |
| 39 | // NewLowBiased returns an initialized Stream for low-biased quantiles |
| 40 | // (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but |
| 41 | // error guarantees can still be given even for the lower ranks of the data |
| 42 | // distribution. |
| 43 | // |
| 44 | // The provided epsilon is a relative error, i.e. the true quantile of a value |
| 45 | // returned by a query is guaranteed to be within (1±Epsilon)*Quantile. |
| 46 | // |
| 47 | // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error |
| 48 | // properties. |
| 49 | func NewLowBiased(epsilon float64) *Stream { |
| 50 | ƒ := func(s *stream, r float64) float64 { |
| 51 | return 2 * epsilon * r |
| 52 | } |
| 53 | return newStream(ƒ) |
| 54 | } |
| 55 | |
| 56 | // NewHighBiased returns an initialized Stream for high-biased quantiles |
| 57 | // (e.g. 0.01, 0.1, 0.5) where the needed quantiles are not known a priori, but |
| 58 | // error guarantees can still be given even for the higher ranks of the data |
| 59 | // distribution. |
| 60 | // |
| 61 | // The provided epsilon is a relative error, i.e. the true quantile of a value |
| 62 | // returned by a query is guaranteed to be within 1-(1±Epsilon)*(1-Quantile). |
| 63 | // |
| 64 | // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error |
| 65 | // properties. |
| 66 | func NewHighBiased(epsilon float64) *Stream { |
| 67 | ƒ := func(s *stream, r float64) float64 { |
| 68 | return 2 * epsilon * (s.n - r) |
| 69 | } |
| 70 | return newStream(ƒ) |
| 71 | } |
| 72 | |
| 73 | // NewTargeted returns an initialized Stream concerned with a particular set of |
| 74 | // quantile values that are supplied a priori. Knowing these a priori reduces |
| 75 | // space and computation time. The targets map maps the desired quantiles to |
| 76 | // their absolute errors, i.e. the true quantile of a value returned by a query |
| 77 | // is guaranteed to be within (Quantile±Epsilon). |
| 78 | // |
| 79 | // See http://www.cs.rutgers.edu/~muthu/bquant.pdf for time, space, and error properties. |
| 80 | func NewTargeted(targetMap map[float64]float64) *Stream { |
| 81 | // Convert map to slice to avoid slow iterations on a map. |
| 82 | // ƒ is called on the hot path, so converting the map to a slice |
| 83 | // beforehand results in significant CPU savings. |
| 84 | targets := targetMapToSlice(targetMap) |
| 85 | |
| 86 | ƒ := func(s *stream, r float64) float64 { |
| 87 | var m = math.MaxFloat64 |
| 88 | var f float64 |
| 89 | for _, t := range targets { |
| 90 | if t.quantile*s.n <= r { |
| 91 | f = (2 * t.epsilon * r) / t.quantile |
| 92 | } else { |
| 93 | f = (2 * t.epsilon * (s.n - r)) / (1 - t.quantile) |
| 94 | } |
| 95 | if f < m { |
| 96 | m = f |
| 97 | } |
| 98 | } |
| 99 | return m |
| 100 | } |
| 101 | return newStream(ƒ) |
| 102 | } |
| 103 | |
// target pairs one quantile requested through NewTargeted with the error
// allowed for it.
type target struct {
	// quantile is the requested quantile (a key of the targets map).
	quantile float64
	// epsilon is the error allowed for that quantile (the matching map value).
	epsilon float64
}
| 108 | |
| 109 | func targetMapToSlice(targetMap map[float64]float64) []target { |
| 110 | targets := make([]target, 0, len(targetMap)) |
| 111 | |
| 112 | for quantile, epsilon := range targetMap { |
| 113 | t := target{ |
| 114 | quantile: quantile, |
| 115 | epsilon: epsilon, |
| 116 | } |
| 117 | targets = append(targets, t) |
| 118 | } |
| 119 | |
| 120 | return targets |
| 121 | } |
| 122 | |
| 123 | // Stream computes quantiles for a stream of float64s. It is not thread-safe by |
| 124 | // design. Take care when using across multiple goroutines. |
| 125 | type Stream struct { |
| 126 | *stream |
| 127 | b Samples |
| 128 | sorted bool |
| 129 | } |
| 130 | |
| 131 | func newStream(ƒ invariant) *Stream { |
| 132 | x := &stream{ƒ: ƒ} |
| 133 | return &Stream{x, make(Samples, 0, 500), true} |
| 134 | } |
| 135 | |
| 136 | // Insert inserts v into the stream. |
| 137 | func (s *Stream) Insert(v float64) { |
| 138 | s.insert(Sample{Value: v, Width: 1}) |
| 139 | } |
| 140 | |
| 141 | func (s *Stream) insert(sample Sample) { |
| 142 | s.b = append(s.b, sample) |
| 143 | s.sorted = false |
| 144 | if len(s.b) == cap(s.b) { |
| 145 | s.flush() |
| 146 | } |
| 147 | } |
| 148 | |
| 149 | // Query returns the computed qth percentiles value. If s was created with |
| 150 | // NewTargeted, and q is not in the set of quantiles provided a priori, Query |
| 151 | // will return an unspecified result. |
| 152 | func (s *Stream) Query(q float64) float64 { |
| 153 | if !s.flushed() { |
| 154 | // Fast path when there hasn't been enough data for a flush; |
| 155 | // this also yields better accuracy for small sets of data. |
| 156 | l := len(s.b) |
| 157 | if l == 0 { |
| 158 | return 0 |
| 159 | } |
| 160 | i := int(math.Ceil(float64(l) * q)) |
| 161 | if i > 0 { |
| 162 | i -= 1 |
| 163 | } |
| 164 | s.maybeSort() |
| 165 | return s.b[i].Value |
| 166 | } |
| 167 | s.flush() |
| 168 | return s.stream.query(q) |
| 169 | } |
| 170 | |
| 171 | // Merge merges samples into the underlying streams samples. This is handy when |
| 172 | // merging multiple streams from separate threads, database shards, etc. |
| 173 | // |
| 174 | // ATTENTION: This method is broken and does not yield correct results. The |
| 175 | // underlying algorithm is not capable of merging streams correctly. |
| 176 | func (s *Stream) Merge(samples Samples) { |
| 177 | sort.Sort(samples) |
| 178 | s.stream.merge(samples) |
| 179 | } |
| 180 | |
| 181 | // Reset reinitializes and clears the list reusing the samples buffer memory. |
| 182 | func (s *Stream) Reset() { |
| 183 | s.stream.reset() |
| 184 | s.b = s.b[:0] |
| 185 | } |
| 186 | |
| 187 | // Samples returns stream samples held by s. |
| 188 | func (s *Stream) Samples() Samples { |
| 189 | if !s.flushed() { |
| 190 | return s.b |
| 191 | } |
| 192 | s.flush() |
| 193 | return s.stream.samples() |
| 194 | } |
| 195 | |
| 196 | // Count returns the total number of samples observed in the stream |
| 197 | // since initialization. |
| 198 | func (s *Stream) Count() int { |
| 199 | return len(s.b) + s.stream.count() |
| 200 | } |
| 201 | |
| 202 | func (s *Stream) flush() { |
| 203 | s.maybeSort() |
| 204 | s.stream.merge(s.b) |
| 205 | s.b = s.b[:0] |
| 206 | } |
| 207 | |
| 208 | func (s *Stream) maybeSort() { |
| 209 | if !s.sorted { |
| 210 | s.sorted = true |
| 211 | sort.Sort(s.b) |
| 212 | } |
| 213 | } |
| 214 | |
| 215 | func (s *Stream) flushed() bool { |
| 216 | return len(s.stream.l) > 0 |
| 217 | } |
| 218 | |
| 219 | type stream struct { |
| 220 | n float64 |
| 221 | l []Sample |
| 222 | ƒ invariant |
| 223 | } |
| 224 | |
| 225 | func (s *stream) reset() { |
| 226 | s.l = s.l[:0] |
| 227 | s.n = 0 |
| 228 | } |
| 229 | |
| 230 | func (s *stream) insert(v float64) { |
| 231 | s.merge(Samples{{v, 1, 0}}) |
| 232 | } |
| 233 | |
| 234 | func (s *stream) merge(samples Samples) { |
| 235 | // TODO(beorn7): This tries to merge not only individual samples, but |
| 236 | // whole summaries. The paper doesn't mention merging summaries at |
| 237 | // all. Unittests show that the merging is inaccurate. Find out how to |
| 238 | // do merges properly. |
| 239 | var r float64 |
| 240 | i := 0 |
| 241 | for _, sample := range samples { |
| 242 | for ; i < len(s.l); i++ { |
| 243 | c := s.l[i] |
| 244 | if c.Value > sample.Value { |
| 245 | // Insert at position i. |
| 246 | s.l = append(s.l, Sample{}) |
| 247 | copy(s.l[i+1:], s.l[i:]) |
| 248 | s.l[i] = Sample{ |
| 249 | sample.Value, |
| 250 | sample.Width, |
| 251 | math.Max(sample.Delta, math.Floor(s.ƒ(s, r))-1), |
| 252 | // TODO(beorn7): How to calculate delta correctly? |
| 253 | } |
| 254 | i++ |
| 255 | goto inserted |
| 256 | } |
| 257 | r += c.Width |
| 258 | } |
| 259 | s.l = append(s.l, Sample{sample.Value, sample.Width, 0}) |
| 260 | i++ |
| 261 | inserted: |
| 262 | s.n += sample.Width |
| 263 | r += sample.Width |
| 264 | } |
| 265 | s.compress() |
| 266 | } |
| 267 | |
| 268 | func (s *stream) count() int { |
| 269 | return int(s.n) |
| 270 | } |
| 271 | |
| 272 | func (s *stream) query(q float64) float64 { |
| 273 | t := math.Ceil(q * s.n) |
| 274 | t += math.Ceil(s.ƒ(s, t) / 2) |
| 275 | p := s.l[0] |
| 276 | var r float64 |
| 277 | for _, c := range s.l[1:] { |
| 278 | r += p.Width |
| 279 | if r+c.Width+c.Delta > t { |
| 280 | return p.Value |
| 281 | } |
| 282 | p = c |
| 283 | } |
| 284 | return p.Value |
| 285 | } |
| 286 | |
| 287 | func (s *stream) compress() { |
| 288 | if len(s.l) < 2 { |
| 289 | return |
| 290 | } |
| 291 | x := s.l[len(s.l)-1] |
| 292 | xi := len(s.l) - 1 |
| 293 | r := s.n - 1 - x.Width |
| 294 | |
| 295 | for i := len(s.l) - 2; i >= 0; i-- { |
| 296 | c := s.l[i] |
| 297 | if c.Width+x.Width+x.Delta <= s.ƒ(s, r) { |
| 298 | x.Width += c.Width |
| 299 | s.l[xi] = x |
| 300 | // Remove element at i. |
| 301 | copy(s.l[i:], s.l[i+1:]) |
| 302 | s.l = s.l[:len(s.l)-1] |
| 303 | xi -= 1 |
| 304 | } else { |
| 305 | x = c |
| 306 | xi = i |
| 307 | } |
| 308 | r -= c.Width |
| 309 | } |
| 310 | } |
| 311 | |
| 312 | func (s *stream) samples() Samples { |
| 313 | samples := make(Samples, len(s.l)) |
| 314 | copy(samples, s.l) |
| 315 | return samples |
| 316 | } |