// Copyright 2015 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
| 4 | |
| 5 | // Package rate provides a rate limiter. |
| 6 | package rate |
| 7 | |
| 8 | import ( |
| 9 | "context" |
| 10 | "fmt" |
| 11 | "math" |
| 12 | "sync" |
| 13 | "time" |
| 14 | ) |
| 15 | |
// Limit is the maximum frequency of some kind of event, expressed as a
// number of events per second.
// A Limit of zero allows no events.
type Limit float64
| 20 | |
| 21 | // Inf is the infinite rate limit; it allows all events (even if burst is zero). |
| 22 | const Inf = Limit(math.MaxFloat64) |
| 23 | |
| 24 | // Every converts a minimum time interval between events to a Limit. |
| 25 | func Every(interval time.Duration) Limit { |
| 26 | if interval <= 0 { |
| 27 | return Inf |
| 28 | } |
| 29 | return 1 / Limit(interval.Seconds()) |
| 30 | } |
| 31 | |
| 32 | // A Limiter controls how frequently events are allowed to happen. |
| 33 | // It implements a "token bucket" of size b, initially full and refilled |
| 34 | // at rate r tokens per second. |
| 35 | // Informally, in any large enough time interval, the Limiter limits the |
| 36 | // rate to r tokens per second, with a maximum burst size of b events. |
| 37 | // As a special case, if r == Inf (the infinite rate), b is ignored. |
| 38 | // See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets. |
| 39 | // |
| 40 | // The zero value is a valid Limiter, but it will reject all events. |
| 41 | // Use NewLimiter to create non-zero Limiters. |
| 42 | // |
| 43 | // Limiter has three main methods, Allow, Reserve, and Wait. |
| 44 | // Most callers should use Wait. |
| 45 | // |
| 46 | // Each of the three methods consumes a single token. |
| 47 | // They differ in their behavior when no token is available. |
| 48 | // If no token is available, Allow returns false. |
| 49 | // If no token is available, Reserve returns a reservation for a future token |
| 50 | // and the amount of time the caller must wait before using it. |
| 51 | // If no token is available, Wait blocks until one can be obtained |
| 52 | // or its associated context.Context is canceled. |
| 53 | // |
| 54 | // The methods AllowN, ReserveN, and WaitN consume n tokens. |
| 55 | type Limiter struct { |
| 56 | limit Limit |
| 57 | burst int |
| 58 | |
| 59 | mu sync.Mutex |
| 60 | tokens float64 |
| 61 | // last is the last time the limiter's tokens field was updated |
| 62 | last time.Time |
| 63 | // lastEvent is the latest time of a rate-limited event (past or future) |
| 64 | lastEvent time.Time |
| 65 | } |
| 66 | |
| 67 | // Limit returns the maximum overall event rate. |
| 68 | func (lim *Limiter) Limit() Limit { |
| 69 | lim.mu.Lock() |
| 70 | defer lim.mu.Unlock() |
| 71 | return lim.limit |
| 72 | } |
| 73 | |
| 74 | // Burst returns the maximum burst size. Burst is the maximum number of tokens |
| 75 | // that can be consumed in a single call to Allow, Reserve, or Wait, so higher |
| 76 | // Burst values allow more events to happen at once. |
| 77 | // A zero Burst allows no events, unless limit == Inf. |
| 78 | func (lim *Limiter) Burst() int { |
| 79 | return lim.burst |
| 80 | } |
| 81 | |
| 82 | // NewLimiter returns a new Limiter that allows events up to rate r and permits |
| 83 | // bursts of at most b tokens. |
| 84 | func NewLimiter(r Limit, b int) *Limiter { |
| 85 | return &Limiter{ |
| 86 | limit: r, |
| 87 | burst: b, |
| 88 | } |
| 89 | } |
| 90 | |
| 91 | // Allow is shorthand for AllowN(time.Now(), 1). |
| 92 | func (lim *Limiter) Allow() bool { |
| 93 | return lim.AllowN(time.Now(), 1) |
| 94 | } |
| 95 | |
| 96 | // AllowN reports whether n events may happen at time now. |
| 97 | // Use this method if you intend to drop / skip events that exceed the rate limit. |
| 98 | // Otherwise use Reserve or Wait. |
| 99 | func (lim *Limiter) AllowN(now time.Time, n int) bool { |
| 100 | return lim.reserveN(now, n, 0).ok |
| 101 | } |
| 102 | |
| 103 | // A Reservation holds information about events that are permitted by a Limiter to happen after a delay. |
| 104 | // A Reservation may be canceled, which may enable the Limiter to permit additional events. |
| 105 | type Reservation struct { |
| 106 | ok bool |
| 107 | lim *Limiter |
| 108 | tokens int |
| 109 | timeToAct time.Time |
| 110 | // This is the Limit at reservation time, it can change later. |
| 111 | limit Limit |
| 112 | } |
| 113 | |
| 114 | // OK returns whether the limiter can provide the requested number of tokens |
| 115 | // within the maximum wait time. If OK is false, Delay returns InfDuration, and |
| 116 | // Cancel does nothing. |
| 117 | func (r *Reservation) OK() bool { |
| 118 | return r.ok |
| 119 | } |
| 120 | |
| 121 | // Delay is shorthand for DelayFrom(time.Now()). |
| 122 | func (r *Reservation) Delay() time.Duration { |
| 123 | return r.DelayFrom(time.Now()) |
| 124 | } |
| 125 | |
// InfDuration is the duration returned by Delay when a Reservation is not OK.
// It is the largest value a time.Duration can hold.
const InfDuration = time.Duration(math.MaxInt64)
| 128 | |
| 129 | // DelayFrom returns the duration for which the reservation holder must wait |
| 130 | // before taking the reserved action. Zero duration means act immediately. |
| 131 | // InfDuration means the limiter cannot grant the tokens requested in this |
| 132 | // Reservation within the maximum wait time. |
| 133 | func (r *Reservation) DelayFrom(now time.Time) time.Duration { |
| 134 | if !r.ok { |
| 135 | return InfDuration |
| 136 | } |
| 137 | delay := r.timeToAct.Sub(now) |
| 138 | if delay < 0 { |
| 139 | return 0 |
| 140 | } |
| 141 | return delay |
| 142 | } |
| 143 | |
| 144 | // Cancel is shorthand for CancelAt(time.Now()). |
| 145 | func (r *Reservation) Cancel() { |
| 146 | r.CancelAt(time.Now()) |
| 147 | return |
| 148 | } |
| 149 | |
| 150 | // CancelAt indicates that the reservation holder will not perform the reserved action |
| 151 | // and reverses the effects of this Reservation on the rate limit as much as possible, |
| 152 | // considering that other reservations may have already been made. |
| 153 | func (r *Reservation) CancelAt(now time.Time) { |
| 154 | if !r.ok { |
| 155 | return |
| 156 | } |
| 157 | |
| 158 | r.lim.mu.Lock() |
| 159 | defer r.lim.mu.Unlock() |
| 160 | |
| 161 | if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) { |
| 162 | return |
| 163 | } |
| 164 | |
| 165 | // calculate tokens to restore |
| 166 | // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved |
| 167 | // after r was obtained. These tokens should not be restored. |
| 168 | restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) |
| 169 | if restoreTokens <= 0 { |
| 170 | return |
| 171 | } |
| 172 | // advance time to now |
| 173 | now, _, tokens := r.lim.advance(now) |
| 174 | // calculate new number of tokens |
| 175 | tokens += restoreTokens |
| 176 | if burst := float64(r.lim.burst); tokens > burst { |
| 177 | tokens = burst |
| 178 | } |
| 179 | // update state |
| 180 | r.lim.last = now |
| 181 | r.lim.tokens = tokens |
| 182 | if r.timeToAct == r.lim.lastEvent { |
| 183 | prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) |
| 184 | if !prevEvent.Before(now) { |
| 185 | r.lim.lastEvent = prevEvent |
| 186 | } |
| 187 | } |
| 188 | |
| 189 | return |
| 190 | } |
| 191 | |
| 192 | // Reserve is shorthand for ReserveN(time.Now(), 1). |
| 193 | func (lim *Limiter) Reserve() *Reservation { |
| 194 | return lim.ReserveN(time.Now(), 1) |
| 195 | } |
| 196 | |
| 197 | // ReserveN returns a Reservation that indicates how long the caller must wait before n events happen. |
| 198 | // The Limiter takes this Reservation into account when allowing future events. |
| 199 | // ReserveN returns false if n exceeds the Limiter's burst size. |
| 200 | // Usage example: |
| 201 | // r := lim.ReserveN(time.Now(), 1) |
| 202 | // if !r.OK() { |
| 203 | // // Not allowed to act! Did you remember to set lim.burst to be > 0 ? |
| 204 | // return |
| 205 | // } |
| 206 | // time.Sleep(r.Delay()) |
| 207 | // Act() |
| 208 | // Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events. |
| 209 | // If you need to respect a deadline or cancel the delay, use Wait instead. |
| 210 | // To drop or skip events exceeding rate limit, use Allow instead. |
| 211 | func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation { |
| 212 | r := lim.reserveN(now, n, InfDuration) |
| 213 | return &r |
| 214 | } |
| 215 | |
| 216 | // Wait is shorthand for WaitN(ctx, 1). |
| 217 | func (lim *Limiter) Wait(ctx context.Context) (err error) { |
| 218 | return lim.WaitN(ctx, 1) |
| 219 | } |
| 220 | |
| 221 | // WaitN blocks until lim permits n events to happen. |
| 222 | // It returns an error if n exceeds the Limiter's burst size, the Context is |
| 223 | // canceled, or the expected wait time exceeds the Context's Deadline. |
| 224 | // The burst limit is ignored if the rate limit is Inf. |
| 225 | func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) { |
| 226 | lim.mu.Lock() |
| 227 | burst := lim.burst |
| 228 | limit := lim.limit |
| 229 | lim.mu.Unlock() |
| 230 | |
| 231 | if n > burst && limit != Inf { |
| 232 | return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst) |
| 233 | } |
| 234 | // Check if ctx is already cancelled |
| 235 | select { |
| 236 | case <-ctx.Done(): |
| 237 | return ctx.Err() |
| 238 | default: |
| 239 | } |
| 240 | // Determine wait limit |
| 241 | now := time.Now() |
| 242 | waitLimit := InfDuration |
| 243 | if deadline, ok := ctx.Deadline(); ok { |
| 244 | waitLimit = deadline.Sub(now) |
| 245 | } |
| 246 | // Reserve |
| 247 | r := lim.reserveN(now, n, waitLimit) |
| 248 | if !r.ok { |
| 249 | return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) |
| 250 | } |
| 251 | // Wait if necessary |
| 252 | delay := r.DelayFrom(now) |
| 253 | if delay == 0 { |
| 254 | return nil |
| 255 | } |
| 256 | t := time.NewTimer(delay) |
| 257 | defer t.Stop() |
| 258 | select { |
| 259 | case <-t.C: |
| 260 | // We can proceed. |
| 261 | return nil |
| 262 | case <-ctx.Done(): |
| 263 | // Context was canceled before we could proceed. Cancel the |
| 264 | // reservation, which may permit other events to proceed sooner. |
| 265 | r.Cancel() |
| 266 | return ctx.Err() |
| 267 | } |
| 268 | } |
| 269 | |
| 270 | // SetLimit is shorthand for SetLimitAt(time.Now(), newLimit). |
| 271 | func (lim *Limiter) SetLimit(newLimit Limit) { |
| 272 | lim.SetLimitAt(time.Now(), newLimit) |
| 273 | } |
| 274 | |
| 275 | // SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated |
| 276 | // or underutilized by those which reserved (using Reserve or Wait) but did not yet act |
| 277 | // before SetLimitAt was called. |
| 278 | func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) { |
| 279 | lim.mu.Lock() |
| 280 | defer lim.mu.Unlock() |
| 281 | |
| 282 | now, _, tokens := lim.advance(now) |
| 283 | |
| 284 | lim.last = now |
| 285 | lim.tokens = tokens |
| 286 | lim.limit = newLimit |
| 287 | } |
| 288 | |
| 289 | // SetBurst is shorthand for SetBurstAt(time.Now(), newBurst). |
| 290 | func (lim *Limiter) SetBurst(newBurst int) { |
| 291 | lim.SetBurstAt(time.Now(), newBurst) |
| 292 | } |
| 293 | |
| 294 | // SetBurstAt sets a new burst size for the limiter. |
| 295 | func (lim *Limiter) SetBurstAt(now time.Time, newBurst int) { |
| 296 | lim.mu.Lock() |
| 297 | defer lim.mu.Unlock() |
| 298 | |
| 299 | now, _, tokens := lim.advance(now) |
| 300 | |
| 301 | lim.last = now |
| 302 | lim.tokens = tokens |
| 303 | lim.burst = newBurst |
| 304 | } |
| 305 | |
| 306 | // reserveN is a helper method for AllowN, ReserveN, and WaitN. |
| 307 | // maxFutureReserve specifies the maximum reservation wait duration allowed. |
| 308 | // reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN. |
| 309 | func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { |
| 310 | lim.mu.Lock() |
| 311 | |
| 312 | if lim.limit == Inf { |
| 313 | lim.mu.Unlock() |
| 314 | return Reservation{ |
| 315 | ok: true, |
| 316 | lim: lim, |
| 317 | tokens: n, |
| 318 | timeToAct: now, |
| 319 | } |
| 320 | } |
| 321 | |
| 322 | now, last, tokens := lim.advance(now) |
| 323 | |
| 324 | // Calculate the remaining number of tokens resulting from the request. |
| 325 | tokens -= float64(n) |
| 326 | |
| 327 | // Calculate the wait duration |
| 328 | var waitDuration time.Duration |
| 329 | if tokens < 0 { |
| 330 | waitDuration = lim.limit.durationFromTokens(-tokens) |
| 331 | } |
| 332 | |
| 333 | // Decide result |
| 334 | ok := n <= lim.burst && waitDuration <= maxFutureReserve |
| 335 | |
| 336 | // Prepare reservation |
| 337 | r := Reservation{ |
| 338 | ok: ok, |
| 339 | lim: lim, |
| 340 | limit: lim.limit, |
| 341 | } |
| 342 | if ok { |
| 343 | r.tokens = n |
| 344 | r.timeToAct = now.Add(waitDuration) |
| 345 | } |
| 346 | |
| 347 | // Update state |
| 348 | if ok { |
| 349 | lim.last = now |
| 350 | lim.tokens = tokens |
| 351 | lim.lastEvent = r.timeToAct |
| 352 | } else { |
| 353 | lim.last = last |
| 354 | } |
| 355 | |
| 356 | lim.mu.Unlock() |
| 357 | return r |
| 358 | } |
| 359 | |
| 360 | // advance calculates and returns an updated state for lim resulting from the passage of time. |
| 361 | // lim is not changed. |
| 362 | func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { |
| 363 | last := lim.last |
| 364 | if now.Before(last) { |
| 365 | last = now |
| 366 | } |
| 367 | |
| 368 | // Avoid making delta overflow below when last is very old. |
| 369 | maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens) |
| 370 | elapsed := now.Sub(last) |
| 371 | if elapsed > maxElapsed { |
| 372 | elapsed = maxElapsed |
| 373 | } |
| 374 | |
| 375 | // Calculate the new number of tokens, due to time that passed. |
| 376 | delta := lim.limit.tokensFromDuration(elapsed) |
| 377 | tokens := lim.tokens + delta |
| 378 | if burst := float64(lim.burst); tokens > burst { |
| 379 | tokens = burst |
| 380 | } |
| 381 | |
| 382 | return now, last, tokens |
| 383 | } |
| 384 | |
| 385 | // durationFromTokens is a unit conversion function from the number of tokens to the duration |
| 386 | // of time it takes to accumulate them at a rate of limit tokens per second. |
| 387 | func (limit Limit) durationFromTokens(tokens float64) time.Duration { |
| 388 | seconds := tokens / float64(limit) |
| 389 | return time.Nanosecond * time.Duration(1e9*seconds) |
| 390 | } |
| 391 | |
| 392 | // tokensFromDuration is a unit conversion function from a time duration to the number of tokens |
| 393 | // which could be accumulated during that duration at a rate of limit tokens per second. |
| 394 | func (limit Limit) tokensFromDuration(d time.Duration) float64 { |
| 395 | // Split the integer and fractional parts ourself to minimize rounding errors. |
| 396 | // See golang.org/issues/34861. |
| 397 | sec := float64(d/time.Second) * float64(limit) |
| 398 | nsec := float64(d%time.Second) * float64(limit) |
| 399 | return sec + nsec/1e9 |
| 400 | } |