blob: ffd912c56022ea5ee6cbec44676a46b809962866 [file] [log] [blame]
Matteo Scandoloa4285862020-12-01 18:10:10 -08001/*
2Copyright 2014 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package flowcontrol
18
19import (
20 "context"
21 "errors"
22 "sync"
23 "time"
24
25 "golang.org/x/time/rate"
26)
27
28type RateLimiter interface {
29 // TryAccept returns true if a token is taken immediately. Otherwise,
30 // it returns false.
31 TryAccept() bool
32 // Accept returns once a token becomes available.
33 Accept()
34 // Stop stops the rate limiter, subsequent calls to CanAccept will return false
35 Stop()
36 // QPS returns QPS of this rate limiter
37 QPS() float32
38 // Wait returns nil if a token is taken before the Context is done.
39 Wait(ctx context.Context) error
40}
41
42type tokenBucketRateLimiter struct {
43 limiter *rate.Limiter
44 clock Clock
45 qps float32
46}
47
48// NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach.
49// The rate limiter allows bursts of up to 'burst' to exceed the QPS, while still maintaining a
50// smoothed qps rate of 'qps'.
51// The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'.
52// The maximum number of tokens in the bucket is capped at 'burst'.
53func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
54 limiter := rate.NewLimiter(rate.Limit(qps), burst)
55 return newTokenBucketRateLimiter(limiter, realClock{}, qps)
56}
57
58// An injectable, mockable clock interface.
59type Clock interface {
60 Now() time.Time
61 Sleep(time.Duration)
62}
63
64type realClock struct{}
65
66func (realClock) Now() time.Time {
67 return time.Now()
68}
69func (realClock) Sleep(d time.Duration) {
70 time.Sleep(d)
71}
72
73// NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter
74// but allows an injectable clock, for testing.
75func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter {
76 limiter := rate.NewLimiter(rate.Limit(qps), burst)
77 return newTokenBucketRateLimiter(limiter, c, qps)
78}
79
80func newTokenBucketRateLimiter(limiter *rate.Limiter, c Clock, qps float32) RateLimiter {
81 return &tokenBucketRateLimiter{
82 limiter: limiter,
83 clock: c,
84 qps: qps,
85 }
86}
87
88func (t *tokenBucketRateLimiter) TryAccept() bool {
89 return t.limiter.AllowN(t.clock.Now(), 1)
90}
91
92// Accept will block until a token becomes available
93func (t *tokenBucketRateLimiter) Accept() {
94 now := t.clock.Now()
95 t.clock.Sleep(t.limiter.ReserveN(now, 1).DelayFrom(now))
96}
97
98func (t *tokenBucketRateLimiter) Stop() {
99}
100
101func (t *tokenBucketRateLimiter) QPS() float32 {
102 return t.qps
103}
104
105func (t *tokenBucketRateLimiter) Wait(ctx context.Context) error {
106 return t.limiter.Wait(ctx)
107}
108
109type fakeAlwaysRateLimiter struct{}
110
111func NewFakeAlwaysRateLimiter() RateLimiter {
112 return &fakeAlwaysRateLimiter{}
113}
114
115func (t *fakeAlwaysRateLimiter) TryAccept() bool {
116 return true
117}
118
119func (t *fakeAlwaysRateLimiter) Stop() {}
120
121func (t *fakeAlwaysRateLimiter) Accept() {}
122
123func (t *fakeAlwaysRateLimiter) QPS() float32 {
124 return 1
125}
126
127func (t *fakeAlwaysRateLimiter) Wait(ctx context.Context) error {
128 return nil
129}
130
131type fakeNeverRateLimiter struct {
132 wg sync.WaitGroup
133}
134
135func NewFakeNeverRateLimiter() RateLimiter {
136 rl := fakeNeverRateLimiter{}
137 rl.wg.Add(1)
138 return &rl
139}
140
141func (t *fakeNeverRateLimiter) TryAccept() bool {
142 return false
143}
144
145func (t *fakeNeverRateLimiter) Stop() {
146 t.wg.Done()
147}
148
149func (t *fakeNeverRateLimiter) Accept() {
150 t.wg.Wait()
151}
152
153func (t *fakeNeverRateLimiter) QPS() float32 {
154 return 1
155}
156
157func (t *fakeNeverRateLimiter) Wait(ctx context.Context) error {
158 return errors.New("can not be accept")
159}