blob: 31d9182dea6f092287c7c690909be4fb6e25623c [file] [log] [blame]
/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
17package workqueue
18
19import (
20 "container/heap"
21 "sync"
22 "time"
23
24 "k8s.io/apimachinery/pkg/util/clock"
25 utilruntime "k8s.io/apimachinery/pkg/util/runtime"
26)
27
28// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
29// requeue items after failures without ending up in a hot-loop.
30type DelayingInterface interface {
31 Interface
32 // AddAfter adds an item to the workqueue after the indicated duration has passed
33 AddAfter(item interface{}, duration time.Duration)
34}
35
36// NewDelayingQueue constructs a new workqueue with delayed queuing ability
37func NewDelayingQueue() DelayingInterface {
38 return NewDelayingQueueWithCustomClock(clock.RealClock{}, "")
39}
40
41// NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to
42// inject custom queue Interface instead of the default one
43func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface {
44 return newDelayingQueue(clock.RealClock{}, q, name)
45}
46
47// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability
48func NewNamedDelayingQueue(name string) DelayingInterface {
49 return NewDelayingQueueWithCustomClock(clock.RealClock{}, name)
50}
51
52// NewDelayingQueueWithCustomClock constructs a new named workqueue
53// with ability to inject real or fake clock for testing purposes
54func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface {
55 return newDelayingQueue(clock, NewNamed(name), name)
56}
57
58func newDelayingQueue(clock clock.Clock, q Interface, name string) *delayingType {
59 ret := &delayingType{
60 Interface: q,
61 clock: clock,
62 heartbeat: clock.NewTicker(maxWait),
63 stopCh: make(chan struct{}),
64 waitingForAddCh: make(chan *waitFor, 1000),
65 metrics: newRetryMetrics(name),
66 }
67
68 go ret.waitingLoop()
69 return ret
70}
71
72// delayingType wraps an Interface and provides delayed re-enquing
73type delayingType struct {
74 Interface
75
76 // clock tracks time for delayed firing
77 clock clock.Clock
78
79 // stopCh lets us signal a shutdown to the waiting loop
80 stopCh chan struct{}
81 // stopOnce guarantees we only signal shutdown a single time
82 stopOnce sync.Once
83
84 // heartbeat ensures we wait no more than maxWait before firing
85 heartbeat clock.Ticker
86
87 // waitingForAddCh is a buffered channel that feeds waitingForAdd
88 waitingForAddCh chan *waitFor
89
90 // metrics counts the number of retries
91 metrics retryMetrics
92}
93
94// waitFor holds the data to add and the time it should be added
95type waitFor struct {
96 data t
97 readyAt time.Time
98 // index in the priority queue (heap)
99 index int
100}
101
102// waitForPriorityQueue implements a priority queue for waitFor items.
103//
104// waitForPriorityQueue implements heap.Interface. The item occurring next in
105// time (i.e., the item with the smallest readyAt) is at the root (index 0).
106// Peek returns this minimum item at index 0. Pop returns the minimum item after
107// it has been removed from the queue and placed at index Len()-1 by
108// container/heap. Push adds an item at index Len(), and container/heap
109// percolates it into the correct location.
110type waitForPriorityQueue []*waitFor
111
112func (pq waitForPriorityQueue) Len() int {
113 return len(pq)
114}
115func (pq waitForPriorityQueue) Less(i, j int) bool {
116 return pq[i].readyAt.Before(pq[j].readyAt)
117}
118func (pq waitForPriorityQueue) Swap(i, j int) {
119 pq[i], pq[j] = pq[j], pq[i]
120 pq[i].index = i
121 pq[j].index = j
122}
123
124// Push adds an item to the queue. Push should not be called directly; instead,
125// use `heap.Push`.
126func (pq *waitForPriorityQueue) Push(x interface{}) {
127 n := len(*pq)
128 item := x.(*waitFor)
129 item.index = n
130 *pq = append(*pq, item)
131}
132
133// Pop removes an item from the queue. Pop should not be called directly;
134// instead, use `heap.Pop`.
135func (pq *waitForPriorityQueue) Pop() interface{} {
136 n := len(*pq)
137 item := (*pq)[n-1]
138 item.index = -1
139 *pq = (*pq)[0:(n - 1)]
140 return item
141}
142
143// Peek returns the item at the beginning of the queue, without removing the
144// item or otherwise mutating the queue. It is safe to call directly.
145func (pq waitForPriorityQueue) Peek() interface{} {
146 return pq[0]
147}
148
149// ShutDown stops the queue. After the queue drains, the returned shutdown bool
150// on Get() will be true. This method may be invoked more than once.
151func (q *delayingType) ShutDown() {
152 q.stopOnce.Do(func() {
153 q.Interface.ShutDown()
154 close(q.stopCh)
155 q.heartbeat.Stop()
156 })
157}
158
159// AddAfter adds the given item to the work queue after the given delay
160func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
161 // don't add if we're already shutting down
162 if q.ShuttingDown() {
163 return
164 }
165
166 q.metrics.retry()
167
168 // immediately add things with no delay
169 if duration <= 0 {
170 q.Add(item)
171 return
172 }
173
174 select {
175 case <-q.stopCh:
176 // unblock if ShutDown() is called
177 case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
178 }
179}
180
// maxWait is the upper bound on how long the waiting loop sleeps between
// checks of its pending items. It is purely a safety net: re-checking the
// queue every 10 seconds is cheap, and it guarantees that an expired item
// never sits unnoticed for more than 10 seconds.
const maxWait time.Duration = 10 * time.Second
185
186// waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added.
187func (q *delayingType) waitingLoop() {
188 defer utilruntime.HandleCrash()
189
190 // Make a placeholder channel to use when there are no items in our list
191 never := make(<-chan time.Time)
192
193 // Make a timer that expires when the item at the head of the waiting queue is ready
194 var nextReadyAtTimer clock.Timer
195
196 waitingForQueue := &waitForPriorityQueue{}
197 heap.Init(waitingForQueue)
198
199 waitingEntryByData := map[t]*waitFor{}
200
201 for {
202 if q.Interface.ShuttingDown() {
203 return
204 }
205
206 now := q.clock.Now()
207
208 // Add ready entries
209 for waitingForQueue.Len() > 0 {
210 entry := waitingForQueue.Peek().(*waitFor)
211 if entry.readyAt.After(now) {
212 break
213 }
214
215 entry = heap.Pop(waitingForQueue).(*waitFor)
216 q.Add(entry.data)
217 delete(waitingEntryByData, entry.data)
218 }
219
220 // Set up a wait for the first item's readyAt (if one exists)
221 nextReadyAt := never
222 if waitingForQueue.Len() > 0 {
223 if nextReadyAtTimer != nil {
224 nextReadyAtTimer.Stop()
225 }
226 entry := waitingForQueue.Peek().(*waitFor)
227 nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
228 nextReadyAt = nextReadyAtTimer.C()
229 }
230
231 select {
232 case <-q.stopCh:
233 return
234
235 case <-q.heartbeat.C():
236 // continue the loop, which will add ready items
237
238 case <-nextReadyAt:
239 // continue the loop, which will add ready items
240
241 case waitEntry := <-q.waitingForAddCh:
242 if waitEntry.readyAt.After(q.clock.Now()) {
243 insert(waitingForQueue, waitingEntryByData, waitEntry)
244 } else {
245 q.Add(waitEntry.data)
246 }
247
248 drained := false
249 for !drained {
250 select {
251 case waitEntry := <-q.waitingForAddCh:
252 if waitEntry.readyAt.After(q.clock.Now()) {
253 insert(waitingForQueue, waitingEntryByData, waitEntry)
254 } else {
255 q.Add(waitEntry.data)
256 }
257 default:
258 drained = true
259 }
260 }
261 }
262 }
263}
264
265// insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue
266func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
267 // if the entry already exists, update the time only if it would cause the item to be queued sooner
268 existing, exists := knownEntries[entry.data]
269 if exists {
270 if existing.readyAt.After(entry.readyAt) {
271 existing.readyAt = entry.readyAt
272 heap.Fix(q, existing.index)
273 }
274
275 return
276 }
277
278 heap.Push(q, entry)
279 knownEntries[entry.data] = entry
280}