khenaidoo | ab1f7bd | 2019-11-14 14:00:27 -0500 | [diff] [blame] | 1 | // Copyright 2015 The Go Authors. All rights reserved. |
| 2 | // Use of this source code is governed by a BSD-style |
| 3 | // license that can be found in the LICENSE file. |
| 4 | |
| 5 | // Package rate provides a rate limiter. |
| 6 | package rate |
| 7 | |
| 8 | import ( |
| 9 | "fmt" |
| 10 | "math" |
| 11 | "sync" |
| 12 | "time" |
| 13 | ) |
| 14 | |
| 15 | // Limit defines the maximum frequency of some events. |
| 16 | // Limit is represented as number of events per second. |
| 17 | // A zero Limit allows no events. |
| 18 | type Limit float64 |
| 19 | |
| 20 | // Inf is the infinite rate limit; it allows all events (even if burst is zero). |
| 21 | const Inf = Limit(math.MaxFloat64) |
| 22 | |
| 23 | // Every converts a minimum time interval between events to a Limit. |
| 24 | func Every(interval time.Duration) Limit { |
| 25 | if interval <= 0 { |
| 26 | return Inf |
| 27 | } |
| 28 | return 1 / Limit(interval.Seconds()) |
| 29 | } |
| 30 | |
| 31 | // A Limiter controls how frequently events are allowed to happen. |
| 32 | // It implements a "token bucket" of size b, initially full and refilled |
| 33 | // at rate r tokens per second. |
| 34 | // Informally, in any large enough time interval, the Limiter limits the |
| 35 | // rate to r tokens per second, with a maximum burst size of b events. |
| 36 | // As a special case, if r == Inf (the infinite rate), b is ignored. |
| 37 | // See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets. |
| 38 | // |
| 39 | // The zero value is a valid Limiter, but it will reject all events. |
| 40 | // Use NewLimiter to create non-zero Limiters. |
| 41 | // |
| 42 | // Limiter has three main methods, Allow, Reserve, and Wait. |
| 43 | // Most callers should use Wait. |
| 44 | // |
| 45 | // Each of the three methods consumes a single token. |
| 46 | // They differ in their behavior when no token is available. |
| 47 | // If no token is available, Allow returns false. |
| 48 | // If no token is available, Reserve returns a reservation for a future token |
| 49 | // and the amount of time the caller must wait before using it. |
| 50 | // If no token is available, Wait blocks until one can be obtained |
| 51 | // or its associated context.Context is canceled. |
| 52 | // |
| 53 | // The methods AllowN, ReserveN, and WaitN consume n tokens. |
| 54 | type Limiter struct { |
| 55 | limit Limit |
| 56 | burst int |
| 57 | |
| 58 | mu sync.Mutex |
| 59 | tokens float64 |
| 60 | // last is the last time the limiter's tokens field was updated |
| 61 | last time.Time |
| 62 | // lastEvent is the latest time of a rate-limited event (past or future) |
| 63 | lastEvent time.Time |
| 64 | } |
| 65 | |
| 66 | // Limit returns the maximum overall event rate. |
| 67 | func (lim *Limiter) Limit() Limit { |
| 68 | lim.mu.Lock() |
| 69 | defer lim.mu.Unlock() |
| 70 | return lim.limit |
| 71 | } |
| 72 | |
| 73 | // Burst returns the maximum burst size. Burst is the maximum number of tokens |
| 74 | // that can be consumed in a single call to Allow, Reserve, or Wait, so higher |
| 75 | // Burst values allow more events to happen at once. |
| 76 | // A zero Burst allows no events, unless limit == Inf. |
| 77 | func (lim *Limiter) Burst() int { |
| 78 | return lim.burst |
| 79 | } |
| 80 | |
| 81 | // NewLimiter returns a new Limiter that allows events up to rate r and permits |
| 82 | // bursts of at most b tokens. |
| 83 | func NewLimiter(r Limit, b int) *Limiter { |
| 84 | return &Limiter{ |
| 85 | limit: r, |
| 86 | burst: b, |
| 87 | } |
| 88 | } |
| 89 | |
| 90 | // Allow is shorthand for AllowN(time.Now(), 1). |
| 91 | func (lim *Limiter) Allow() bool { |
| 92 | return lim.AllowN(time.Now(), 1) |
| 93 | } |
| 94 | |
| 95 | // AllowN reports whether n events may happen at time now. |
| 96 | // Use this method if you intend to drop / skip events that exceed the rate limit. |
| 97 | // Otherwise use Reserve or Wait. |
| 98 | func (lim *Limiter) AllowN(now time.Time, n int) bool { |
| 99 | return lim.reserveN(now, n, 0).ok |
| 100 | } |
| 101 | |
| 102 | // A Reservation holds information about events that are permitted by a Limiter to happen after a delay. |
| 103 | // A Reservation may be canceled, which may enable the Limiter to permit additional events. |
| 104 | type Reservation struct { |
| 105 | ok bool |
| 106 | lim *Limiter |
| 107 | tokens int |
| 108 | timeToAct time.Time |
| 109 | // This is the Limit at reservation time, it can change later. |
| 110 | limit Limit |
| 111 | } |
| 112 | |
| 113 | // OK returns whether the limiter can provide the requested number of tokens |
| 114 | // within the maximum wait time. If OK is false, Delay returns InfDuration, and |
| 115 | // Cancel does nothing. |
| 116 | func (r *Reservation) OK() bool { |
| 117 | return r.ok |
| 118 | } |
| 119 | |
| 120 | // Delay is shorthand for DelayFrom(time.Now()). |
| 121 | func (r *Reservation) Delay() time.Duration { |
| 122 | return r.DelayFrom(time.Now()) |
| 123 | } |
| 124 | |
| 125 | // InfDuration is the duration returned by Delay when a Reservation is not OK. |
| 126 | const InfDuration = time.Duration(1<<63 - 1) |
| 127 | |
| 128 | // DelayFrom returns the duration for which the reservation holder must wait |
| 129 | // before taking the reserved action. Zero duration means act immediately. |
| 130 | // InfDuration means the limiter cannot grant the tokens requested in this |
| 131 | // Reservation within the maximum wait time. |
| 132 | func (r *Reservation) DelayFrom(now time.Time) time.Duration { |
| 133 | if !r.ok { |
| 134 | return InfDuration |
| 135 | } |
| 136 | delay := r.timeToAct.Sub(now) |
| 137 | if delay < 0 { |
| 138 | return 0 |
| 139 | } |
| 140 | return delay |
| 141 | } |
| 142 | |
| 143 | // Cancel is shorthand for CancelAt(time.Now()). |
| 144 | func (r *Reservation) Cancel() { |
| 145 | r.CancelAt(time.Now()) |
| 146 | return |
| 147 | } |
| 148 | |
| 149 | // CancelAt indicates that the reservation holder will not perform the reserved action |
| 150 | // and reverses the effects of this Reservation on the rate limit as much as possible, |
| 151 | // considering that other reservations may have already been made. |
| 152 | func (r *Reservation) CancelAt(now time.Time) { |
| 153 | if !r.ok { |
| 154 | return |
| 155 | } |
| 156 | |
| 157 | r.lim.mu.Lock() |
| 158 | defer r.lim.mu.Unlock() |
| 159 | |
| 160 | if r.lim.limit == Inf || r.tokens == 0 || r.timeToAct.Before(now) { |
| 161 | return |
| 162 | } |
| 163 | |
| 164 | // calculate tokens to restore |
| 165 | // The duration between lim.lastEvent and r.timeToAct tells us how many tokens were reserved |
| 166 | // after r was obtained. These tokens should not be restored. |
| 167 | restoreTokens := float64(r.tokens) - r.limit.tokensFromDuration(r.lim.lastEvent.Sub(r.timeToAct)) |
| 168 | if restoreTokens <= 0 { |
| 169 | return |
| 170 | } |
| 171 | // advance time to now |
| 172 | now, _, tokens := r.lim.advance(now) |
| 173 | // calculate new number of tokens |
| 174 | tokens += restoreTokens |
| 175 | if burst := float64(r.lim.burst); tokens > burst { |
| 176 | tokens = burst |
| 177 | } |
| 178 | // update state |
| 179 | r.lim.last = now |
| 180 | r.lim.tokens = tokens |
| 181 | if r.timeToAct == r.lim.lastEvent { |
| 182 | prevEvent := r.timeToAct.Add(r.limit.durationFromTokens(float64(-r.tokens))) |
| 183 | if !prevEvent.Before(now) { |
| 184 | r.lim.lastEvent = prevEvent |
| 185 | } |
| 186 | } |
| 187 | |
| 188 | return |
| 189 | } |
| 190 | |
| 191 | // Reserve is shorthand for ReserveN(time.Now(), 1). |
| 192 | func (lim *Limiter) Reserve() *Reservation { |
| 193 | return lim.ReserveN(time.Now(), 1) |
| 194 | } |
| 195 | |
| 196 | // ReserveN returns a Reservation that indicates how long the caller must wait before n events happen. |
| 197 | // The Limiter takes this Reservation into account when allowing future events. |
| 198 | // ReserveN returns false if n exceeds the Limiter's burst size. |
| 199 | // Usage example: |
| 200 | // r := lim.ReserveN(time.Now(), 1) |
| 201 | // if !r.OK() { |
| 202 | // // Not allowed to act! Did you remember to set lim.burst to be > 0 ? |
| 203 | // return |
| 204 | // } |
| 205 | // time.Sleep(r.Delay()) |
| 206 | // Act() |
| 207 | // Use this method if you wish to wait and slow down in accordance with the rate limit without dropping events. |
| 208 | // If you need to respect a deadline or cancel the delay, use Wait instead. |
| 209 | // To drop or skip events exceeding rate limit, use Allow instead. |
| 210 | func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation { |
| 211 | r := lim.reserveN(now, n, InfDuration) |
| 212 | return &r |
| 213 | } |
| 214 | |
| 215 | // contextContext is a temporary(?) copy of the context.Context type |
| 216 | // to support both Go 1.6 using golang.org/x/net/context and Go 1.7+ |
| 217 | // with the built-in context package. If people ever stop using Go 1.6 |
| 218 | // we can remove this. |
| 219 | type contextContext interface { |
| 220 | Deadline() (deadline time.Time, ok bool) |
| 221 | Done() <-chan struct{} |
| 222 | Err() error |
| 223 | Value(key interface{}) interface{} |
| 224 | } |
| 225 | |
| 226 | // Wait is shorthand for WaitN(ctx, 1). |
| 227 | func (lim *Limiter) wait(ctx contextContext) (err error) { |
| 228 | return lim.WaitN(ctx, 1) |
| 229 | } |
| 230 | |
| 231 | // WaitN blocks until lim permits n events to happen. |
| 232 | // It returns an error if n exceeds the Limiter's burst size, the Context is |
| 233 | // canceled, or the expected wait time exceeds the Context's Deadline. |
| 234 | // The burst limit is ignored if the rate limit is Inf. |
| 235 | func (lim *Limiter) waitN(ctx contextContext, n int) (err error) { |
| 236 | if n > lim.burst && lim.limit != Inf { |
| 237 | return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, lim.burst) |
| 238 | } |
| 239 | // Check if ctx is already cancelled |
| 240 | select { |
| 241 | case <-ctx.Done(): |
| 242 | return ctx.Err() |
| 243 | default: |
| 244 | } |
| 245 | // Determine wait limit |
| 246 | now := time.Now() |
| 247 | waitLimit := InfDuration |
| 248 | if deadline, ok := ctx.Deadline(); ok { |
| 249 | waitLimit = deadline.Sub(now) |
| 250 | } |
| 251 | // Reserve |
| 252 | r := lim.reserveN(now, n, waitLimit) |
| 253 | if !r.ok { |
| 254 | return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n) |
| 255 | } |
| 256 | // Wait if necessary |
| 257 | delay := r.DelayFrom(now) |
| 258 | if delay == 0 { |
| 259 | return nil |
| 260 | } |
| 261 | t := time.NewTimer(delay) |
| 262 | defer t.Stop() |
| 263 | select { |
| 264 | case <-t.C: |
| 265 | // We can proceed. |
| 266 | return nil |
| 267 | case <-ctx.Done(): |
| 268 | // Context was canceled before we could proceed. Cancel the |
| 269 | // reservation, which may permit other events to proceed sooner. |
| 270 | r.Cancel() |
| 271 | return ctx.Err() |
| 272 | } |
| 273 | } |
| 274 | |
| 275 | // SetLimit is shorthand for SetLimitAt(time.Now(), newLimit). |
| 276 | func (lim *Limiter) SetLimit(newLimit Limit) { |
| 277 | lim.SetLimitAt(time.Now(), newLimit) |
| 278 | } |
| 279 | |
| 280 | // SetLimitAt sets a new Limit for the limiter. The new Limit, and Burst, may be violated |
| 281 | // or underutilized by those which reserved (using Reserve or Wait) but did not yet act |
| 282 | // before SetLimitAt was called. |
| 283 | func (lim *Limiter) SetLimitAt(now time.Time, newLimit Limit) { |
| 284 | lim.mu.Lock() |
| 285 | defer lim.mu.Unlock() |
| 286 | |
| 287 | now, _, tokens := lim.advance(now) |
| 288 | |
| 289 | lim.last = now |
| 290 | lim.tokens = tokens |
| 291 | lim.limit = newLimit |
| 292 | } |
| 293 | |
| 294 | // reserveN is a helper method for AllowN, ReserveN, and WaitN. |
| 295 | // maxFutureReserve specifies the maximum reservation wait duration allowed. |
| 296 | // reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN. |
| 297 | func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation { |
| 298 | lim.mu.Lock() |
| 299 | |
| 300 | if lim.limit == Inf { |
| 301 | lim.mu.Unlock() |
| 302 | return Reservation{ |
| 303 | ok: true, |
| 304 | lim: lim, |
| 305 | tokens: n, |
| 306 | timeToAct: now, |
| 307 | } |
| 308 | } |
| 309 | |
| 310 | now, last, tokens := lim.advance(now) |
| 311 | |
| 312 | // Calculate the remaining number of tokens resulting from the request. |
| 313 | tokens -= float64(n) |
| 314 | |
| 315 | // Calculate the wait duration |
| 316 | var waitDuration time.Duration |
| 317 | if tokens < 0 { |
| 318 | waitDuration = lim.limit.durationFromTokens(-tokens) |
| 319 | } |
| 320 | |
| 321 | // Decide result |
| 322 | ok := n <= lim.burst && waitDuration <= maxFutureReserve |
| 323 | |
| 324 | // Prepare reservation |
| 325 | r := Reservation{ |
| 326 | ok: ok, |
| 327 | lim: lim, |
| 328 | limit: lim.limit, |
| 329 | } |
| 330 | if ok { |
| 331 | r.tokens = n |
| 332 | r.timeToAct = now.Add(waitDuration) |
| 333 | } |
| 334 | |
| 335 | // Update state |
| 336 | if ok { |
| 337 | lim.last = now |
| 338 | lim.tokens = tokens |
| 339 | lim.lastEvent = r.timeToAct |
| 340 | } else { |
| 341 | lim.last = last |
| 342 | } |
| 343 | |
| 344 | lim.mu.Unlock() |
| 345 | return r |
| 346 | } |
| 347 | |
| 348 | // advance calculates and returns an updated state for lim resulting from the passage of time. |
| 349 | // lim is not changed. |
| 350 | func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) { |
| 351 | last := lim.last |
| 352 | if now.Before(last) { |
| 353 | last = now |
| 354 | } |
| 355 | |
| 356 | // Avoid making delta overflow below when last is very old. |
| 357 | maxElapsed := lim.limit.durationFromTokens(float64(lim.burst) - lim.tokens) |
| 358 | elapsed := now.Sub(last) |
| 359 | if elapsed > maxElapsed { |
| 360 | elapsed = maxElapsed |
| 361 | } |
| 362 | |
| 363 | // Calculate the new number of tokens, due to time that passed. |
| 364 | delta := lim.limit.tokensFromDuration(elapsed) |
| 365 | tokens := lim.tokens + delta |
| 366 | if burst := float64(lim.burst); tokens > burst { |
| 367 | tokens = burst |
| 368 | } |
| 369 | |
| 370 | return now, last, tokens |
| 371 | } |
| 372 | |
| 373 | // durationFromTokens is a unit conversion function from the number of tokens to the duration |
| 374 | // of time it takes to accumulate them at a rate of limit tokens per second. |
| 375 | func (limit Limit) durationFromTokens(tokens float64) time.Duration { |
| 376 | seconds := tokens / float64(limit) |
| 377 | return time.Nanosecond * time.Duration(1e9*seconds) |
| 378 | } |
| 379 | |
| 380 | // tokensFromDuration is a unit conversion function from a time duration to the number of tokens |
| 381 | // which could be accumulated during that duration at a rate of limit tokens per second. |
| 382 | func (limit Limit) tokensFromDuration(d time.Duration) float64 { |
| 383 | return d.Seconds() * float64(limit) |
| 384 | } |