Girish Gowdra | 631ef3d | 2020-06-15 10:45:52 -0700 | [diff] [blame] | 1 | // Copyright (c) 2017 Uber Technologies, Inc. |
| 2 | // |
| 3 | // Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | // you may not use this file except in compliance with the License. |
| 5 | // You may obtain a copy of the License at |
| 6 | // |
| 7 | // http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | // |
| 9 | // Unless required by applicable law or agreed to in writing, software |
| 10 | // distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | // See the License for the specific language governing permissions and |
| 13 | // limitations under the License. |
| 14 | |
| 15 | package utils |
| 16 | |
| 17 | import ( |
| 18 | "sync" |
| 19 | "time" |
| 20 | ) |
| 21 | |
// RateLimiter is a filter used to check if a message that is worth itemCost units is within the rate limits.
// CheckCredit reports whether an item of the given cost is allowed.
//
// TODO (breaking change) remove this interface in favor of public struct below
//
// Deprecated: use ReconfigurableRateLimiter.
type RateLimiter interface {
	CheckCredit(itemCost float64) bool
}
| 30 | |
// ReconfigurableRateLimiter is a rate limiter based on leaky bucket algorithm, formulated in terms of a
// credits balance. The balance is replenished lazily: when a CheckCredit() call finds the balance too low
// (or when Update() is called), it is increased by an amount proportional to the time elapsed since the
// last replenishment (tick), at creditsPerSecond, and capped at maxBalance. A call to CheckCredit() takes
// a cost of an item we want to pay with the balance. If the balance is at least the cost of the item, the
// item is "purchased" and the balance reduced, indicated by returned value of true. Otherwise the item is
// rejected and false is returned.
//
// This can be used to limit a rate of messages emitted by a service by instantiating the Rate Limiter with the
// max number of messages a service is allowed to emit per second, and calling CheckCredit(1.0) for each message
// to determine if the message is within the rate limit.
//
// It can also be used to limit the rate of traffic in bytes, by setting creditsPerSecond to desired throughput
// as bytes/second, and calling CheckCredit() with the actual message size.
//
// The exported methods hold an internal mutex while they read or modify the limiter state.
//
// TODO (breaking change) rename to RateLimiter once the interface is removed
type ReconfigurableRateLimiter struct {
	lock sync.Mutex

	creditsPerSecond float64   // replenishment rate
	balance          float64   // credits currently available
	maxBalance       float64   // upper bound applied to balance on replenishment
	lastTick         time.Time // time of the last replenishment

	timeNow func() time.Time // clock used by updateBalance; replaceable for tests
}

// NewRateLimiter creates a new ReconfigurableRateLimiter that replenishes creditsPerSecond credits
// per second up to maxBalance. The limiter starts with a full balance (maxBalance).
func NewRateLimiter(creditsPerSecond, maxBalance float64) *ReconfigurableRateLimiter {
	return &ReconfigurableRateLimiter{
		creditsPerSecond: creditsPerSecond,
		balance:          maxBalance,
		maxBalance:       maxBalance,
		lastTick:         time.Now(),
		timeNow:          time.Now,
	}
}

// CheckCredit tries to reduce the current balance by itemCost provided that the current balance
// is not less than itemCost. It returns true if the item was paid for, false otherwise.
func (rl *ReconfigurableRateLimiter) CheckCredit(itemCost float64) bool {
	rl.lock.Lock()
	defer rl.lock.Unlock()

	// if we have enough credits to pay for current item, then reduce balance and allow
	if rl.balance >= itemCost {
		rl.balance -= itemCost
		return true
	}
	// otherwise check if balance can be increased due to time elapsed, and try again
	rl.updateBalance()
	if rl.balance >= itemCost {
		rl.balance -= itemCost
		return true
	}
	return false
}

// updateBalance recalculates current balance based on time elapsed. Must be called while holding a lock.
func (rl *ReconfigurableRateLimiter) updateBalance() {
	// calculate how much time passed since the last tick, and update current tick
	currentTime := rl.timeNow()
	elapsedTime := currentTime.Sub(rl.lastTick)
	rl.lastTick = currentTime
	// calculate how much credit have we accumulated since the last tick
	rl.balance += elapsedTime.Seconds() * rl.creditsPerSecond
	if rl.balance > rl.maxBalance {
		rl.balance = rl.maxBalance
	}
}

// Update changes the main parameters of the rate limiter in-place, while retaining
// the current accumulated balance (pro-rated to the new maxBalance value). Using this method
// instead of creating a new rate limiter helps to avoid thundering herd when sampling
// strategies are updated.
//
// If the previous maxBalance was zero there is no fill ratio to preserve, so the balance is
// left as it is and the limiter refills from there at the new rate.
func (rl *ReconfigurableRateLimiter) Update(creditsPerSecond, maxBalance float64) {
	rl.lock.Lock()
	defer rl.lock.Unlock()

	rl.updateBalance() // get up to date balance
	// Pro-rate only when the old capacity is non-zero: with a zero capacity the balance is
	// also zero and 0/0 would yield NaN, which never compares >= any cost and never recovers,
	// permanently disabling the limiter.
	if rl.maxBalance != 0 {
		rl.balance = rl.balance * maxBalance / rl.maxBalance
	}
	rl.creditsPerSecond = creditsPerSecond
	rl.maxBalance = maxBalance
}