/*
Copyright 2014 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
17package flowcontrol
18
19import (
20 "sync"
21 "time"
22
23 "golang.org/x/time/rate"
24)
25
// RateLimiter is the contract shared by every limiter in this package.
// Callers either poll with TryAccept or block in Accept until a token is
// available.
type RateLimiter interface {
	// TryAccept returns true if a token is taken immediately. Otherwise,
	// it returns false.
	TryAccept() bool
	// Accept returns once a token becomes available.
	Accept()
	// Stop stops the rate limiter; subsequent calls to TryAccept are
	// expected to return false.
	// NOTE(review): not every implementation in this file enforces that
	// after Stop - confirm the intended contract before relying on it.
	Stop()
	// QPS returns the QPS of this rate limiter.
	QPS() float32
}
37
38type tokenBucketRateLimiter struct {
39 limiter *rate.Limiter
40 clock Clock
41 qps float32
42}
43
44// NewTokenBucketRateLimiter creates a rate limiter which implements a token bucket approach.
45// The rate limiter allows bursts of up to 'burst' to exceed the QPS, while still maintaining a
46// smoothed qps rate of 'qps'.
47// The bucket is initially filled with 'burst' tokens, and refills at a rate of 'qps'.
48// The maximum number of tokens in the bucket is capped at 'burst'.
49func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
50 limiter := rate.NewLimiter(rate.Limit(qps), burst)
51 return newTokenBucketRateLimiter(limiter, realClock{}, qps)
52}
53
// Clock is an injectable, mockable clock interface: it reports the current
// time and blocks the caller for a requested duration.
type Clock interface {
	// Now returns the current time as seen by this clock.
	Now() time.Time
	// Sleep blocks the caller for duration d.
	Sleep(d time.Duration)
}
59
// realClock is the Clock implementation that delegates to the standard time
// package. It carries no state, so the zero value is ready to use.
type realClock struct{}

// Now returns the current time as reported by time.Now.
func (realClock) Now() time.Time { return time.Now() }

// Sleep pauses the calling goroutine for d using time.Sleep.
func (realClock) Sleep(d time.Duration) { time.Sleep(d) }
68
69// NewTokenBucketRateLimiterWithClock is identical to NewTokenBucketRateLimiter
70// but allows an injectable clock, for testing.
71func NewTokenBucketRateLimiterWithClock(qps float32, burst int, c Clock) RateLimiter {
72 limiter := rate.NewLimiter(rate.Limit(qps), burst)
73 return newTokenBucketRateLimiter(limiter, c, qps)
74}
75
76func newTokenBucketRateLimiter(limiter *rate.Limiter, c Clock, qps float32) RateLimiter {
77 return &tokenBucketRateLimiter{
78 limiter: limiter,
79 clock: c,
80 qps: qps,
81 }
82}
83
84func (t *tokenBucketRateLimiter) TryAccept() bool {
85 return t.limiter.AllowN(t.clock.Now(), 1)
86}
87
88// Accept will block until a token becomes available
89func (t *tokenBucketRateLimiter) Accept() {
90 now := t.clock.Now()
91 t.clock.Sleep(t.limiter.ReserveN(now, 1).DelayFrom(now))
92}
93
94func (t *tokenBucketRateLimiter) Stop() {
95}
96
97func (t *tokenBucketRateLimiter) QPS() float32 {
98 return t.qps
99}
100
101type fakeAlwaysRateLimiter struct{}
102
103func NewFakeAlwaysRateLimiter() RateLimiter {
104 return &fakeAlwaysRateLimiter{}
105}
106
107func (t *fakeAlwaysRateLimiter) TryAccept() bool {
108 return true
109}
110
111func (t *fakeAlwaysRateLimiter) Stop() {}
112
113func (t *fakeAlwaysRateLimiter) Accept() {}
114
115func (t *fakeAlwaysRateLimiter) QPS() float32 {
116 return 1
117}
118
119type fakeNeverRateLimiter struct {
120 wg sync.WaitGroup
121}
122
123func NewFakeNeverRateLimiter() RateLimiter {
124 rl := fakeNeverRateLimiter{}
125 rl.wg.Add(1)
126 return &rl
127}
128
129func (t *fakeNeverRateLimiter) TryAccept() bool {
130 return false
131}
132
133func (t *fakeNeverRateLimiter) Stop() {
134 t.wg.Done()
135}
136
137func (t *fakeNeverRateLimiter) Accept() {
138 t.wg.Wait()
139}
140
141func (t *fakeNeverRateLimiter) QPS() float32 {
142 return 1
143}