[VOL-3678] First implementation of the BBSim-sadis-server
Change-Id: I5077a8f861f4cc6af9759f31a4a415042c05eba3
diff --git a/vendor/k8s.io/client-go/util/workqueue/delaying_queue.go b/vendor/k8s.io/client-go/util/workqueue/delaying_queue.go
new file mode 100644
index 0000000..31d9182
--- /dev/null
+++ b/vendor/k8s.io/client-go/util/workqueue/delaying_queue.go
@@ -0,0 +1,280 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package workqueue
+
+import (
+ "container/heap"
+ "sync"
+ "time"
+
+ "k8s.io/apimachinery/pkg/util/clock"
+ utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+)
+
+// DelayingInterface is an Interface that can additionally Add an item at a later time. This makes it easier to
+// requeue items after failures without ending up in a hot-loop.
+type DelayingInterface interface {
+ Interface // embedded: every method of Interface is also part of DelayingInterface
+ // AddAfter adds an item to the workqueue after the indicated duration has passed.
+ AddAfter(item interface{}, duration time.Duration)
+}
+
+// NewDelayingQueue constructs a new workqueue with delayed queuing ability, using clock.RealClock and an empty name.
+func NewDelayingQueue() DelayingInterface {
+ return NewDelayingQueueWithCustomClock(clock.RealClock{}, "")
+}
+
+// NewDelayingQueueWithCustomQueue constructs a new workqueue with ability to
+// inject a custom queue Interface q instead of the default one; it uses clock.RealClock and forwards name unchanged.
+func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface {
+ return newDelayingQueue(clock.RealClock{}, q, name)
+}
+
+// NewNamedDelayingQueue constructs a new named workqueue with delayed queuing ability, using clock.RealClock.
+func NewNamedDelayingQueue(name string) DelayingInterface {
+ return NewDelayingQueueWithCustomClock(clock.RealClock{}, name)
+}
+
+// NewDelayingQueueWithCustomClock constructs a new named workqueue (backed by NewNamed(name))
+// with ability to inject real or fake clock for testing purposes.
+func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface {
+ return newDelayingQueue(clock, NewNamed(name), name)
+}
+
+func newDelayingQueue(clock clock.Clock, q Interface, name string) *delayingType { // shared constructor: wraps q and starts the background waiting loop
+ ret := &delayingType{
+ Interface: q,
+ clock: clock,
+ heartbeat: clock.NewTicker(maxWait), // ticks every maxWait so waitingLoop re-checks waiting items periodically
+ stopCh: make(chan struct{}),
+ waitingForAddCh: make(chan *waitFor, 1000), // buffered: AddAfter only blocks once 1000 entries are pending
+ metrics: newRetryMetrics(name),
+ }
+
+ go ret.waitingLoop() // exits when stopCh is closed or the wrapped queue reports ShuttingDown
+ return ret
+}
+
+// delayingType wraps an Interface and provides delayed re-enqueuing
+type delayingType struct {
+ Interface // the wrapped queue; its methods are promoted, except ShutDown which delayingType overrides
+
+ // clock tracks time for delayed firing
+ clock clock.Clock
+
+ // stopCh lets us signal a shutdown to the waiting loop; it is closed, never sent on
+ stopCh chan struct{}
+ // stopOnce guarantees we only signal shutdown a single time
+ stopOnce sync.Once
+
+ // heartbeat ensures we wait no more than maxWait before firing
+ heartbeat clock.Ticker
+
+ // waitingForAddCh is a buffered channel that carries entries from AddAfter to waitingLoop
+ waitingForAddCh chan *waitFor
+
+ // metrics counts the number of retries (incremented by AddAfter)
+ metrics retryMetrics
+}
+
+// waitFor holds the data to add and the time it should be added
+type waitFor struct {
+ data t // item passed to Add once readyAt has passed; also the key used to de-duplicate waiting entries
+ readyAt time.Time // earliest time at which data is added to the queue
+ // index in the priority queue (heap); kept current by Swap and Push, set to -1 by Pop
+ index int
+}
+
+// waitForPriorityQueue implements a priority queue for waitFor items.
+//
+// waitForPriorityQueue implements heap.Interface. The item occurring next in
+// time (i.e., the item with the smallest readyAt) is at the root (index 0).
+// Peek returns this minimum item at index 0 without removing it. Pop returns the minimum item after
+// it has been removed from the queue and placed at index Len()-1 by
+// container/heap. Push adds an item at index Len(), and container/heap
+// percolates it into the correct location.
+type waitForPriorityQueue []*waitFor // each element's index field mirrors its position in this slice
+
+func (pq waitForPriorityQueue) Len() int { // Len reports the number of waiting entries (heap.Interface)
+ return len(pq)
+}
+func (pq waitForPriorityQueue) Less(i, j int) bool { // Less orders entries by readyAt, earliest first (heap.Interface)
+ return pq[i].readyAt.Before(pq[j].readyAt)
+}
+func (pq waitForPriorityQueue) Swap(i, j int) { // Swap exchanges the entries at i and j (heap.Interface)
+ pq[i], pq[j] = pq[j], pq[i]
+ pq[i].index = i // keep each entry's recorded index in sync with its new slot
+ pq[j].index = j
+}
+
+// Push appends an item to the end of the queue and records its index. Push should not be called directly; instead,
+// use `heap.Push`, which also restores heap order.
+func (pq *waitForPriorityQueue) Push(x interface{}) {
+ n := len(*pq)
+ item := x.(*waitFor) // unchecked assertion: panics if x is not a *waitFor
+ item.index = n
+ *pq = append(*pq, item)
+}
+
+// Pop removes and returns the last element of the underlying slice. Pop should not be called directly;
+// instead, use `heap.Pop`, which first moves the minimum item to that last position.
+func (pq *waitForPriorityQueue) Pop() interface{} {
+ n := len(*pq)
+ item := (*pq)[n-1] // panics if the queue is empty
+ item.index = -1 // mark the entry as no longer being in the heap
+ *pq = (*pq)[0:(n - 1)]
+ return item
+}
+
+// Peek returns the item at the beginning of the queue, without removing the
+// item or otherwise mutating the queue. It may be called directly, but only on a non-empty queue.
+func (pq waitForPriorityQueue) Peek() interface{} {
+ return pq[0] // panics on an empty queue; waitingLoop checks Len() > 0 before calling
+}
+
+// ShutDown stops the queue. After the queue drains, the returned shutdown bool
+// on Get() will be true. This method may be invoked more than once; only the first call has any effect.
+func (q *delayingType) ShutDown() {
+ q.stopOnce.Do(func() { // sync.Once also prevents closing stopCh twice
+ q.Interface.ShutDown()
+ close(q.stopCh) // wakes waitingLoop and any AddAfter blocked on waitingForAddCh
+ q.heartbeat.Stop()
+ })
+}
+
+// AddAfter adds the given item to the work queue after the given delay. A non-positive duration adds the item immediately.
+func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
+ // don't add if we're already shutting down
+ if q.ShuttingDown() {
+ return
+ }
+
+ q.metrics.retry() // counted for every call that passes the shutdown check, including zero-delay adds
+
+ // immediately add things with no delay
+ if duration <= 0 {
+ q.Add(item)
+ return
+ }
+
+ select {
+ case <-q.stopCh:
+ // unblock if ShutDown() is called; the item is dropped in that case
+ case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}: // readyAt is fixed here, at enqueue time
+ }
+}
+
+// maxWait keeps a max bound on the wait time; it is the period of the heartbeat ticker. It's just insurance against weird things happening.
+// Checking the queue every 10 seconds isn't expensive and we know that we'll never end up with an
+// expired item sitting for more than 10 seconds.
+const maxWait = 10 * time.Second
+
+// waitingLoop runs until the workqueue is shut down and keeps a check on the list of items to be added, adding each once its readyAt has passed.
+func (q *delayingType) waitingLoop() {
+ defer utilruntime.HandleCrash()
+
+ // Make a placeholder channel to use when there are no items in our list; nothing is ever sent on it, so receiving from it blocks forever
+ never := make(<-chan time.Time)
+
+ // Make a timer that expires when the item at the head of the waiting queue is ready
+ var nextReadyAtTimer clock.Timer
+
+ waitingForQueue := &waitForPriorityQueue{}
+ heap.Init(waitingForQueue)
+
+ waitingEntryByData := map[t]*waitFor{} // waiting entries keyed by item, so insert can de-duplicate
+
+ for {
+ if q.Interface.ShuttingDown() {
+ return
+ }
+
+ now := q.clock.Now() // one timestamp for both the ready check and the timer duration below
+
+ // Add ready entries, earliest first, until the head of the heap is still in the future
+ for waitingForQueue.Len() > 0 {
+ entry := waitingForQueue.Peek().(*waitFor)
+ if entry.readyAt.After(now) {
+ break
+ }
+
+ entry = heap.Pop(waitingForQueue).(*waitFor)
+ q.Add(entry.data)
+ delete(waitingEntryByData, entry.data)
+ }
+
+ // Set up a wait for the first item's readyAt (if one exists); otherwise nextReadyAt stays on the never channel
+ nextReadyAt := never
+ if waitingForQueue.Len() > 0 {
+ if nextReadyAtTimer != nil {
+ nextReadyAtTimer.Stop() // stop the previous timer before replacing it
+ }
+ entry := waitingForQueue.Peek().(*waitFor)
+ nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
+ nextReadyAt = nextReadyAtTimer.C()
+ }
+
+ select {
+ case <-q.stopCh:
+ return
+
+ case <-q.heartbeat.C():
+ // continue the loop, which will add ready items
+
+ case <-nextReadyAt:
+ // continue the loop, which will add ready items
+
+ case waitEntry := <-q.waitingForAddCh:
+ if waitEntry.readyAt.After(q.clock.Now()) {
+ insert(waitingForQueue, waitingEntryByData, waitEntry)
+ } else {
+ q.Add(waitEntry.data) // already due by the time it was received: skip the heap
+ }
+
+ drained := false // then consume everything already buffered, without blocking
+ for !drained {
+ select {
+ case waitEntry := <-q.waitingForAddCh:
+ if waitEntry.readyAt.After(q.clock.Now()) {
+ insert(waitingForQueue, waitingEntryByData, waitEntry)
+ } else {
+ q.Add(waitEntry.data)
+ }
+ default:
+ drained = true
+ }
+ }
+ }
+ }
+}
+
+// insert adds the entry to the priority queue, or updates the readyAt if an entry with the same data already exists in the queue
+func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
+ // if the entry already exists, update the time only if it would cause the item to be queued sooner
+ existing, exists := knownEntries[entry.data]
+ if exists {
+ if existing.readyAt.After(entry.readyAt) {
+ existing.readyAt = entry.readyAt
+ heap.Fix(q, existing.index) // re-establish heap order after moving the entry earlier
+ }
+
+ return // the new entry is discarded; the existing one stays queued
+ }
+
+ heap.Push(q, entry)
+ knownEntries[entry.data] = entry
+}