| /* |
| Copyright 2016 The Kubernetes Authors. |
| |
| Licensed under the Apache License, Version 2.0 (the "License"); |
| you may not use this file except in compliance with the License. |
| You may obtain a copy of the License at |
| |
| http://www.apache.org/licenses/LICENSE-2.0 |
| |
| Unless required by applicable law or agreed to in writing, software |
| distributed under the License is distributed on an "AS IS" BASIS, |
| WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| See the License for the specific language governing permissions and |
| limitations under the License. |
| */ |
| |
| package workqueue |
| |
| import ( |
| "container/heap" |
| "sync" |
| "time" |
| |
| "k8s.io/apimachinery/pkg/util/clock" |
| utilruntime "k8s.io/apimachinery/pkg/util/runtime" |
| ) |
| |
| // DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to |
| // requeue items after failures without ending up in a hot-loop. |
| type DelayingInterface interface { |
| Interface |
| // AddAfter adds an item to the workqueue after the indicated duration has passed |
| AddAfter(item interface{}, duration time.Duration) |
| } |
| |
| // NewDelayingQueue constructs a new workqueue with delayed queuing ability |
| func NewDelayingQueue() DelayingInterface { |
| return NewDelayingQueueWithCustomClock(clock.RealClock{}, "") |
| } |
| |
| // NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to |
| // inject custom queue Interface instead of the default one |
| func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface { |
| return newDelayingQueue(clock.RealClock{}, q, name) |
| } |
| |
| // NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability |
| func NewNamedDelayingQueue(name string) DelayingInterface { |
| return NewDelayingQueueWithCustomClock(clock.RealClock{}, name) |
| } |
| |
| // NewDelayingQueueWithCustomClock constructs a new named workqueue |
| // with ability to inject real or fake clock for testing purposes |
| func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface { |
| return newDelayingQueue(clock, NewNamed(name), name) |
| } |
| |
| func newDelayingQueue(clock clock.Clock, q Interface, name string) *delayingType { |
| ret := &delayingType{ |
| Interface: q, |
| clock: clock, |
| heartbeat: clock.NewTicker(maxWait), |
| stopCh: make(chan struct{}), |
| waitingForAddCh: make(chan *waitFor, 1000), |
| metrics: newRetryMetrics(name), |
| } |
| |
| go ret.waitingLoop() |
| return ret |
| } |
| |
| // delayingType wraps an Interface and provides delayed re-enquing |
| type delayingType struct { |
| Interface |
| |
| // clock tracks time for delayed firing |
| clock clock.Clock |
| |
| // stopCh lets us signal a shutdown to the waiting loop |
| stopCh chan struct{} |
| // stopOnce guarantees we only signal shutdown a single time |
| stopOnce sync.Once |
| |
| // heartbeat ensures we wait no more than maxWait before firing |
| heartbeat clock.Ticker |
| |
| // waitingForAddCh is a buffered channel that feeds waitingForAdd |
| waitingForAddCh chan *waitFor |
| |
| // metrics counts the number of retries |
| metrics retryMetrics |
| } |
| |
| // waitFor holds the data to add and the time it should be added |
| type waitFor struct { |
| data t |
| readyAt time.Time |
| // index in the priority queue (heap) |
| index int |
| } |
| |
| // waitForPriorityQueue implements a priority queue for waitFor items. |
| // |
| // waitForPriorityQueue implements heap.Interface. The item occurring next in |
| // time (i.e., the item with the smallest readyAt) is at the root (index 0). |
| // Peek returns this minimum item at index 0. Pop returns the minimum item after |
| // it has been removed from the queue and placed at index Len()-1 by |
| // container/heap. Push adds an item at index Len(), and container/heap |
| // percolates it into the correct location. |
| type waitForPriorityQueue []*waitFor |
| |
| func (pq waitForPriorityQueue) Len() int { |
| return len(pq) |
| } |
| func (pq waitForPriorityQueue) Less(i, j int) bool { |
| return pq[i].readyAt.Before(pq[j].readyAt) |
| } |
| func (pq waitForPriorityQueue) Swap(i, j int) { |
| pq[i], pq[j] = pq[j], pq[i] |
| pq[i].index = i |
| pq[j].index = j |
| } |
| |
| // Push adds an item to the queue. Push should not be called directly; instead, |
| // use `heap.Push`. |
| func (pq *waitForPriorityQueue) Push(x interface{}) { |
| n := len(*pq) |
| item := x.(*waitFor) |
| item.index = n |
| *pq = append(*pq, item) |
| } |
| |
| // Pop removes an item from the queue. Pop should not be called directly; |
| // instead, use `heap.Pop`. |
| func (pq *waitForPriorityQueue) Pop() interface{} { |
| n := len(*pq) |
| item := (*pq)[n-1] |
| item.index = -1 |
| *pq = (*pq)[0:(n - 1)] |
| return item |
| } |
| |
| // Peek returns the item at the beginning of the queue, without removing the |
| // item or otherwise mutating the queue. It is safe to call directly. |
| func (pq waitForPriorityQueue) Peek() interface{} { |
| return pq[0] |
| } |
| |
| // ShutDown stops the queue. After the queue drains, the returned shutdown bool |
| // on Get() will be true. This method may be invoked more than once. |
| func (q *delayingType) ShutDown() { |
| q.stopOnce.Do(func() { |
| q.Interface.ShutDown() |
| close(q.stopCh) |
| q.heartbeat.Stop() |
| }) |
| } |
| |
| // AddAfter adds the given item to the work queue after the given delay |
| func (q *delayingType) AddAfter(item interface{}, duration time.Duration) { |
| // don't add if we're already shutting down |
| if q.ShuttingDown() { |
| return |
| } |
| |
| q.metrics.retry() |
| |
| // immediately add things with no delay |
| if duration <= 0 { |
| q.Add(item) |
| return |
| } |
| |
| select { |
| case <-q.stopCh: |
| // unblock if ShutDown() is called |
| case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}: |
| } |
| } |
| |
| // maxWait keeps a max bound on the wait time. It's just insurance against weird things happening. |
| // Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an |
| // expired item sitting for more than 10 seconds. |
| const maxWait = 10 * time.Second |
| |
| // waitingLoop runs until the workqueue is shutdown and keeps a check on the list of items to be added. |
| func (q *delayingType) waitingLoop() { |
| defer utilruntime.HandleCrash() |
| |
| // Make a placeholder channel to use when there are no items in our list |
| never := make(<-chan time.Time) |
| |
| // Make a timer that expires when the item at the head of the waiting queue is ready |
| var nextReadyAtTimer clock.Timer |
| |
| waitingForQueue := &waitForPriorityQueue{} |
| heap.Init(waitingForQueue) |
| |
| waitingEntryByData := map[t]*waitFor{} |
| |
| for { |
| if q.Interface.ShuttingDown() { |
| return |
| } |
| |
| now := q.clock.Now() |
| |
| // Add ready entries |
| for waitingForQueue.Len() > 0 { |
| entry := waitingForQueue.Peek().(*waitFor) |
| if entry.readyAt.After(now) { |
| break |
| } |
| |
| entry = heap.Pop(waitingForQueue).(*waitFor) |
| q.Add(entry.data) |
| delete(waitingEntryByData, entry.data) |
| } |
| |
| // Set up a wait for the first item's readyAt (if one exists) |
| nextReadyAt := never |
| if waitingForQueue.Len() > 0 { |
| if nextReadyAtTimer != nil { |
| nextReadyAtTimer.Stop() |
| } |
| entry := waitingForQueue.Peek().(*waitFor) |
| nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now)) |
| nextReadyAt = nextReadyAtTimer.C() |
| } |
| |
| select { |
| case <-q.stopCh: |
| return |
| |
| case <-q.heartbeat.C(): |
| // continue the loop, which will add ready items |
| |
| case <-nextReadyAt: |
| // continue the loop, which will add ready items |
| |
| case waitEntry := <-q.waitingForAddCh: |
| if waitEntry.readyAt.After(q.clock.Now()) { |
| insert(waitingForQueue, waitingEntryByData, waitEntry) |
| } else { |
| q.Add(waitEntry.data) |
| } |
| |
| drained := false |
| for !drained { |
| select { |
| case waitEntry := <-q.waitingForAddCh: |
| if waitEntry.readyAt.After(q.clock.Now()) { |
| insert(waitingForQueue, waitingEntryByData, waitEntry) |
| } else { |
| q.Add(waitEntry.data) |
| } |
| default: |
| drained = true |
| } |
| } |
| } |
| } |
| } |
| |
| // insert adds the entry to the priority queue, or updates the readyAt if it already exists in the queue |
| func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) { |
| // if the entry already exists, update the time only if it would cause the item to be queued sooner |
| existing, exists := knownEntries[entry.data] |
| if exists { |
| if existing.readyAt.After(entry.readyAt) { |
| existing.readyAt = entry.readyAt |
| heap.Fix(q, existing.index) |
| } |
| |
| return |
| } |
| |
| heap.Push(q, entry) |
| knownEntries[entry.data] = entry |
| } |