/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package workqueue

import (
	"math"
	"sync"
	"time"

	"golang.org/x/time/rate"
)

type RateLimiter interface {
	// When returns how long the given item should wait before it is processed again.
	When(item interface{}) time.Duration
	// Forget indicates that an item is finished being retried. It does not matter whether
	// the item failed permanently or succeeded; we simply stop tracking it.
	Forget(item interface{})
	// NumRequeues returns how many failures the item has had so far.
	NumRequeues(item interface{}) int
}
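
// Illustrative sketch of how a caller typically drives a RateLimiter; the item key
// "pod/foo" and the retry cutoff of 5 are arbitrary examples, not values from this file:
//
//	var rl RateLimiter = DefaultControllerRateLimiter()
//	delay := rl.When("pod/foo")        // how long to wait before retrying this item
//	_ = delay                          // e.g. hand the delay to a delaying queue's AddAfter
//	if rl.NumRequeues("pod/foo") > 5 { // give up after too many failures
//		rl.Forget("pod/foo") // stop tracking the item entirely
//	}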

// DefaultControllerRateLimiter is a no-arg constructor for a default rate limiter for a workqueue. It has
// both overall and per-item rate limiting. The overall limiter is a token bucket and the per-item limiter
// uses exponential backoff.
func DefaultControllerRateLimiter() RateLimiter {
	return NewMaxOfRateLimiter(
		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
		// 10 qps, 100 bucket size. This only affects retry speed and is the overall factor (not per item).
		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)
}
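
// A minimal usage sketch for the default limiter, assuming the package's
// NewRateLimitingQueue constructor (defined in a sibling file of this package); the
// key "node/worker-1" is a hypothetical example:
//
//	queue := NewRateLimitingQueue(DefaultControllerRateLimiter())
//	queue.AddRateLimited("node/worker-1") // requeued after the computed delay
//
// The MaxOf combination means an item waits for whichever delay is longer: its own
// exponential backoff (5ms, 10ms, 20ms, ... capped at 1000s) or the shared 10 qps bucket.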

// BucketRateLimiter adapts a standard token bucket (rate.Limiter) to the workqueue RateLimiter API.
type BucketRateLimiter struct {
	*rate.Limiter
}

var _ RateLimiter = &BucketRateLimiter{}

func (r *BucketRateLimiter) When(item interface{}) time.Duration {
	return r.Limiter.Reserve().Delay()
}

func (r *BucketRateLimiter) NumRequeues(item interface{}) int {
	return 0
}

func (r *BucketRateLimiter) Forget(item interface{}) {
}
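
// A minimal sketch of using BucketRateLimiter on its own; the 50 qps / burst 500 numbers
// are arbitrary, not defaults taken from this file:
//
//	bucket := &BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(50), 500)}
//	wait := bucket.When("any-item") // one shared token bucket governs every item
//	_ = wait
//
// Because the bucket is shared rather than per-item, NumRequeues always reports 0 and
// Forget is a no-op.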

// ItemExponentialFailureRateLimiter does a simple baseDelay*2^<num-failures> limit.
// Dealing with max failures and expiration is up to the caller.
type ItemExponentialFailureRateLimiter struct {
	failuresLock sync.Mutex
	failures     map[interface{}]int

	baseDelay time.Duration
	maxDelay  time.Duration
}

var _ RateLimiter = &ItemExponentialFailureRateLimiter{}

func NewItemExponentialFailureRateLimiter(baseDelay time.Duration, maxDelay time.Duration) RateLimiter {
	return &ItemExponentialFailureRateLimiter{
		failures:  map[interface{}]int{},
		baseDelay: baseDelay,
		maxDelay:  maxDelay,
	}
}

func DefaultItemBasedRateLimiter() RateLimiter {
	return NewItemExponentialFailureRateLimiter(time.Millisecond, 1000*time.Second)
}
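
// Illustrative backoff progression using the DefaultItemBasedRateLimiter settings
// (base 1ms, cap 1000s); the key "job/42" is an arbitrary example:
//
//	rl := DefaultItemBasedRateLimiter()
//	d1 := rl.When("job/42") // 1ms (2^0 * 1ms)
//	d2 := rl.When("job/42") // 2ms (2^1 * 1ms)
//	d3 := rl.When("job/42") // 4ms (2^2 * 1ms)
//	rl.Forget("job/42")     // resets the failure count; the next When returns 1ms again
//	_, _, _ = d1, d2, d3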

func (r *ItemExponentialFailureRateLimiter) When(item interface{}) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	exp := r.failures[item]
	r.failures[item] = r.failures[item] + 1

	// The backoff is capped such that 'calculated' value never overflows.
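	// For example, with the 5ms base used in DefaultControllerRateLimiter, once the exponent
	// reaches 41 the product 5e6ns * 2^41 ≈ 1.1e19ns would exceed math.MaxInt64 (≈9.2e18ns);
	// in practice the maxDelay check below usually triggers long before this guard does.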
	backoff := float64(r.baseDelay.Nanoseconds()) * math.Pow(2, float64(exp))
	if backoff > math.MaxInt64 {
		return r.maxDelay
	}

	calculated := time.Duration(backoff)
	if calculated > r.maxDelay {
		return r.maxDelay
	}

	return calculated
}

func (r *ItemExponentialFailureRateLimiter) NumRequeues(item interface{}) int {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	return r.failures[item]
}

func (r *ItemExponentialFailureRateLimiter) Forget(item interface{}) {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	delete(r.failures, item)
}

// ItemFastSlowRateLimiter retries quickly (fastDelay) for the first maxFastAttempts attempts,
// then slowly (slowDelay) for every attempt after that.
type ItemFastSlowRateLimiter struct {
	failuresLock sync.Mutex
	failures     map[interface{}]int

	maxFastAttempts int
	fastDelay       time.Duration
	slowDelay       time.Duration
}

var _ RateLimiter = &ItemFastSlowRateLimiter{}

func NewItemFastSlowRateLimiter(fastDelay, slowDelay time.Duration, maxFastAttempts int) RateLimiter {
	return &ItemFastSlowRateLimiter{
		failures:        map[interface{}]int{},
		fastDelay:       fastDelay,
		slowDelay:       slowDelay,
		maxFastAttempts: maxFastAttempts,
	}
}
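
// A minimal sketch of the fast/slow behavior; the 5ms/10s/3 parameters are arbitrary
// example values:
//
//	rl := NewItemFastSlowRateLimiter(5*time.Millisecond, 10*time.Second, 3)
//	rl.When("svc/db") // 5ms (attempt 1)
//	rl.When("svc/db") // 5ms (attempt 2)
//	rl.When("svc/db") // 5ms (attempt 3)
//	rl.When("svc/db") // 10s (attempt 4 and later use the slow delay)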

func (r *ItemFastSlowRateLimiter) When(item interface{}) time.Duration {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	r.failures[item] = r.failures[item] + 1

	if r.failures[item] <= r.maxFastAttempts {
		return r.fastDelay
	}

	return r.slowDelay
}

func (r *ItemFastSlowRateLimiter) NumRequeues(item interface{}) int {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	return r.failures[item]
}

func (r *ItemFastSlowRateLimiter) Forget(item interface{}) {
	r.failuresLock.Lock()
	defer r.failuresLock.Unlock()

	delete(r.failures, item)
}

// MaxOfRateLimiter calls every RateLimiter and returns the worst-case (longest) response.
// When used together with a token bucket limiter, the burst may appear to be exceeded
// because particular items can be separately delayed for a longer time.
type MaxOfRateLimiter struct {
	limiters []RateLimiter
}

func (r *MaxOfRateLimiter) When(item interface{}) time.Duration {
	ret := time.Duration(0)
	for _, limiter := range r.limiters {
		curr := limiter.When(item)
		if curr > ret {
			ret = curr
		}
	}

	return ret
}

func NewMaxOfRateLimiter(limiters ...RateLimiter) RateLimiter {
	return &MaxOfRateLimiter{limiters: limiters}
}
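
// A minimal sketch of combining limiters; the parameters mirror DefaultControllerRateLimiter
// above:
//
//	rl := NewMaxOfRateLimiter(
//		NewItemExponentialFailureRateLimiter(5*time.Millisecond, 1000*time.Second),
//		&BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
//	)
//	// Each When call returns the largest delay among the wrapped limiters, while
//	// Forget and NumRequeues fan out to every one of them.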

func (r *MaxOfRateLimiter) NumRequeues(item interface{}) int {
	ret := 0
	for _, limiter := range r.limiters {
		curr := limiter.NumRequeues(item)
		if curr > ret {
			ret = curr
		}
	}

	return ret
}

func (r *MaxOfRateLimiter) Forget(item interface{}) {
	for _, limiter := range r.limiters {
		limiter.Forget(item)
	}
}