/*
Copyright 2016 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
| 16 | |
| 17 | package workqueue |
| 18 | |
| 19 | import ( |
| 20 | "container/heap" |
| 21 | "sync" |
| 22 | "time" |
| 23 | |
| 24 | "k8s.io/apimachinery/pkg/util/clock" |
| 25 | utilruntime "k8s.io/apimachinery/pkg/util/runtime" |
| 26 | ) |
| 27 | |
// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type DelayingInterface interface {
	// Interface is embedded, so every method of the plain work queue is available as well.
	Interface
	// AddAfter adds an item to the workqueue after the indicated duration has passed.
	// The delayingType implementation in this file adds the item immediately when the
	// duration is zero or negative.
	AddAfter(item interface{}, duration time.Duration)
}
| 35 | |
| 36 | // NewDelayingQueue constructs a new workqueue with delayed queuing ability |
| 37 | func NewDelayingQueue() DelayingInterface { |
| 38 | return NewDelayingQueueWithCustomClock(clock.RealClock{}, "") |
| 39 | } |
| 40 | |
| 41 | // NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to |
| 42 | // inject custom queue Interface instead of the default one |
| 43 | func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface { |
| 44 | return newDelayingQueue(clock.RealClock{}, q, name) |
| 45 | } |
| 46 | |
| 47 | // NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability |
| 48 | func NewNamedDelayingQueue(name string) DelayingInterface { |
| 49 | return NewDelayingQueueWithCustomClock(clock.RealClock{}, name) |
| 50 | } |
| 51 | |
| 52 | // NewDelayingQueueWithCustomClock constructs a new named workqueue |
| 53 | // with ability to inject real or fake clock for testing purposes |
| 54 | func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface { |
| 55 | return newDelayingQueue(clock, NewNamed(name), name) |
| 56 | } |
| 57 | |
| 58 | func newDelayingQueue(clock clock.Clock, q Interface, name string) *delayingType { |
| 59 | ret := &delayingType{ |
| 60 | Interface: q, |
| 61 | clock: clock, |
| 62 | heartbeat: clock.NewTicker(maxWait), |
| 63 | stopCh: make(chan struct{}), |
| 64 | waitingForAddCh: make(chan *waitFor, 1000), |
| 65 | metrics: newRetryMetrics(name), |
| 66 | } |
| 67 | |
| 68 | go ret.waitingLoop() |
| 69 | return ret |
| 70 | } |
| 71 | |
// delayingType wraps an Interface and provides delayed re-enqueuing.
type delayingType struct {
	// Interface is the wrapped queue; entries are handed to it via Add once they are due.
	Interface

	// clock tracks time for delayed firing; it supplies Now() and the timers/tickers used
	// by the waiting loop.
	clock clock.Clock

	// stopCh lets us signal a shutdown to the waiting loop; ShutDown closes it.
	stopCh chan struct{}
	// stopOnce guarantees we only signal shutdown a single time
	stopOnce sync.Once

	// heartbeat ensures we wait no more than maxWait before firing
	heartbeat clock.Ticker

	// waitingForAddCh is a buffered channel: AddAfter sends delayed entries on it and
	// waitingLoop receives them.
	waitingForAddCh chan *waitFor

	// metrics counts the number of retries; AddAfter calls retry() on it.
	metrics retryMetrics
}
| 93 | |
// waitFor holds the data to add and the time it should be added
type waitFor struct {
	// data is the item that will be passed to Add; it is also the key under which
	// waitingLoop tracks the entry to merge duplicates.
	data t
	// readyAt is the time at or after which data is moved into the wrapped queue.
	readyAt time.Time
	// index in the priority queue (heap); kept current by the queue's Push and Swap
	// and set to -1 by Pop once the entry has left the queue.
	index int
}
| 101 | |
| 102 | // waitForPriorityQueue implements a priority queue for waitFor items. |
| 103 | // |
| 104 | // waitForPriorityQueue implements heap.Interface. The item occurring next in |
| 105 | // time (i.e., the item with the smallest readyAt) is at the root (index 0). |
| 106 | // Peek returns this minimum item at index 0. Pop returns the minimum item after |
| 107 | // it has been removed from the queue and placed at index Len()-1 by |
| 108 | // container/heap. Push adds an item at index Len(), and container/heap |
| 109 | // percolates it into the correct location. |
| 110 | type waitForPriorityQueue []*waitFor |
| 111 | |
| 112 | func (pq waitForPriorityQueue) Len() int { |
| 113 | return len(pq) |
| 114 | } |
| 115 | func (pq waitForPriorityQueue) Less(i, j int) bool { |
| 116 | return pq[i].readyAt.Before(pq[j].readyAt) |
| 117 | } |
| 118 | func (pq waitForPriorityQueue) Swap(i, j int) { |
| 119 | pq[i], pq[j] = pq[j], pq[i] |
| 120 | pq[i].index = i |
| 121 | pq[j].index = j |
| 122 | } |
| 123 | |
| 124 | // Push adds an item to the queue. Push should not be called directly; instead, |
| 125 | // use `heap.Push`. |
| 126 | func (pq *waitForPriorityQueue) Push(x interface{}) { |
| 127 | n := len(*pq) |
| 128 | item := x.(*waitFor) |
| 129 | item.index = n |
| 130 | *pq = append(*pq, item) |
| 131 | } |
| 132 | |
| 133 | // Pop removes an item from the queue. Pop should not be called directly; |
| 134 | // instead, use `heap.Pop`. |
| 135 | func (pq *waitForPriorityQueue) Pop() interface{} { |
| 136 | n := len(*pq) |
| 137 | item := (*pq)[n-1] |
| 138 | item.index = -1 |
| 139 | *pq = (*pq)[0:(n - 1)] |
| 140 | return item |
| 141 | } |
| 142 | |
| 143 | // Peek returns the item at the beginning of the queue, without removing the |
| 144 | // item or otherwise mutating the queue. It is safe to call directly. |
| 145 | func (pq waitForPriorityQueue) Peek() interface{} { |
| 146 | return pq[0] |
| 147 | } |
| 148 | |
| 149 | // ShutDown stops the queue. After the queue drains, the returned shutdown bool |
| 150 | // on Get() will be true. This method may be invoked more than once. |
| 151 | func (q *delayingType) ShutDown() { |
| 152 | q.stopOnce.Do(func() { |
| 153 | q.Interface.ShutDown() |
| 154 | close(q.stopCh) |
| 155 | q.heartbeat.Stop() |
| 156 | }) |
| 157 | } |
| 158 | |
| 159 | // AddAfter adds the given item to the work queue after the given delay |
| 160 | func (q *delayingType) AddAfter(item interface{}, duration time.Duration) { |
| 161 | // don't add if we're already shutting down |
| 162 | if q.ShuttingDown() { |
| 163 | return |
| 164 | } |
| 165 | |
| 166 | q.metrics.retry() |
| 167 | |
| 168 | // immediately add things with no delay |
| 169 | if duration <= 0 { |
| 170 | q.Add(item) |
| 171 | return |
| 172 | } |
| 173 | |
| 174 | select { |
| 175 | case <-q.stopCh: |
| 176 | // unblock if ShutDown() is called |
| 177 | case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}: |
| 178 | } |
| 179 | } |
| 180 | |
// maxWait is the upper bound on how long the waiting loop goes without
// re-checking its entries; it is the period of the heartbeat ticker. It is
// insurance against unexpected timer behavior: a check every 10 seconds is
// cheap, and it means an expired item never sits for more than 10 seconds.
const maxWait time.Duration = 10 * time.Second
| 185 | |
| 186 | // waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added. |
| 187 | func (q *delayingType) waitingLoop() { |
| 188 | defer utilruntime.HandleCrash() |
| 189 | |
| 190 | // Make a placeholder channel to use when there are no items in our list |
| 191 | never := make(<-chan time.Time) |
| 192 | |
| 193 | // Make a timer that expires when the item at the head of the waiting queue is ready |
| 194 | var nextReadyAtTimer clock.Timer |
| 195 | |
| 196 | waitingForQueue := &waitForPriorityQueue{} |
| 197 | heap.Init(waitingForQueue) |
| 198 | |
| 199 | waitingEntryByData := map[t]*waitFor{} |
| 200 | |
| 201 | for { |
| 202 | if q.Interface.ShuttingDown() { |
| 203 | return |
| 204 | } |
| 205 | |
| 206 | now := q.clock.Now() |
| 207 | |
| 208 | // Add ready entries |
| 209 | for waitingForQueue.Len() > 0 { |
| 210 | entry := waitingForQueue.Peek().(*waitFor) |
| 211 | if entry.readyAt.After(now) { |
| 212 | break |
| 213 | } |
| 214 | |
| 215 | entry = heap.Pop(waitingForQueue).(*waitFor) |
| 216 | q.Add(entry.data) |
| 217 | delete(waitingEntryByData, entry.data) |
| 218 | } |
| 219 | |
| 220 | // Set up a wait for the first item's readyAt (if one exists) |
| 221 | nextReadyAt := never |
| 222 | if waitingForQueue.Len() > 0 { |
| 223 | if nextReadyAtTimer != nil { |
| 224 | nextReadyAtTimer.Stop() |
| 225 | } |
| 226 | entry := waitingForQueue.Peek().(*waitFor) |
| 227 | nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now)) |
| 228 | nextReadyAt = nextReadyAtTimer.C() |
| 229 | } |
| 230 | |
| 231 | select { |
| 232 | case <-q.stopCh: |
| 233 | return |
| 234 | |
| 235 | case <-q.heartbeat.C(): |
| 236 | // continue the loop, which will add ready items |
| 237 | |
| 238 | case <-nextReadyAt: |
| 239 | // continue the loop, which will add ready items |
| 240 | |
| 241 | case waitEntry := <-q.waitingForAddCh: |
| 242 | if waitEntry.readyAt.After(q.clock.Now()) { |
| 243 | insert(waitingForQueue, waitingEntryByData, waitEntry) |
| 244 | } else { |
| 245 | q.Add(waitEntry.data) |
| 246 | } |
| 247 | |
| 248 | drained := false |
| 249 | for !drained { |
| 250 | select { |
| 251 | case waitEntry := <-q.waitingForAddCh: |
| 252 | if waitEntry.readyAt.After(q.clock.Now()) { |
| 253 | insert(waitingForQueue, waitingEntryByData, waitEntry) |
| 254 | } else { |
| 255 | q.Add(waitEntry.data) |
| 256 | } |
| 257 | default: |
| 258 | drained = true |
| 259 | } |
| 260 | } |
| 261 | } |
| 262 | } |
| 263 | } |
| 264 | |
| 265 | // insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue |
| 266 | func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) { |
| 267 | // if the entry already exists, update the time only if it would cause the item to be queued sooner |
| 268 | existing, exists := knownEntries[entry.data] |
| 269 | if exists { |
| 270 | if existing.readyAt.After(entry.readyAt) { |
| 271 | existing.readyAt = entry.readyAt |
| 272 | heap.Fix(q, existing.index) |
| 273 | } |
| 274 | |
| 275 | return |
| 276 | } |
| 277 | |
| 278 | heap.Push(q, entry) |
| 279 | knownEntries[entry.data] = entry |
| 280 | } |