[VOL-3678] First implementation of the BBSim-sadis-server

+Copyright 2016 The Kubernetes Authors.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+    http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+package workqueue
+import (
+	"container/heap"
+	"sync"
+	"time"
+	"k8s.io/apimachinery/pkg/util/clock"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+// DelayingInterface is an Interface that can additionally Add an item at a later time. This makes it
+// easier to requeue items after failures without ending up in a hot-loop.
+type DelayingInterface interface {
+	Interface // embedded base queue contract; its declaration is outside this excerpt
+	// AddAfter adds item to the workqueue once the indicated duration has passed.
+	AddAfter(item interface{}, duration time.Duration)
+// NewDelayingQueue constructs a new, unnamed workqueue with delayed queuing ability, using clock.RealClock.
+func NewDelayingQueue() DelayingInterface {
+	return NewDelayingQueueWithCustomClock(clock.RealClock{}, "")
+// NewDelayingQueueWithCustomQueue constructs a new delaying workqueue that wraps the caller-supplied
+// queue q instead of the default one. It uses clock.RealClock; name is passed on to newDelayingQueue.
+func NewDelayingQueueWithCustomQueue(q Interface, name string) DelayingInterface {
+	return newDelayingQueue(clock.RealClock{}, q, name)
+// NewNamedDelayingQueue constructs a new workqueue called name with delayed queuing ability, using clock.RealClock.
+func NewNamedDelayingQueue(name string) DelayingInterface {
+	return NewDelayingQueueWithCustomClock(clock.RealClock{}, name)
+// NewDelayingQueueWithCustomClock constructs a new named delaying workqueue on top of NewNamed(name),
+// with the ability to inject a real or fake clock for testing purposes.
+func NewDelayingQueueWithCustomClock(clock clock.Clock, name string) DelayingInterface {
+	return newDelayingQueue(clock, NewNamed(name), name)
+func newDelayingQueue(clock clock.Clock, q Interface, name string) *delayingType { // wraps q in a delayingType and starts its waiting loop; note the clock parameter shadows the clock package in this body
+	ret := &delayingType{
+		Interface:       q,
+		clock:           clock,
+		heartbeat:       clock.NewTicker(maxWait), // ticker created with period maxWait; waitingLoop wakes on it
+		stopCh:          make(chan struct{}),
+		waitingForAddCh: make(chan *waitFor, 1000), // buffered: senders in AddAfter block only once 1000 entries are pending
+		metrics:         newRetryMetrics(name),
+	}
+	go ret.waitingLoop() // background goroutine; it returns once stopCh is closed or the wrapped queue reports ShuttingDown
+	return ret
+// delayingType wraps an Interface and provides delayed re-enqueuing.
+type delayingType struct {
+	Interface
+	// clock tracks time for delayed firing; this type uses its Now, NewTicker and NewTimer methods
+	clock clock.Clock
+	// stopCh lets us signal a shutdown to the waiting loop; ShutDown closes it
+	stopCh chan struct{}
+	// stopOnce guarantees we only signal shutdown a single time, so stopCh is closed at most once
+	stopOnce sync.Once
+	// heartbeat ensures we wait no more than maxWait before re-checking for ready items
+	heartbeat clock.Ticker
+	// waitingForAddCh is a buffered channel that carries AddAfter requests to waitingLoop
+	waitingForAddCh chan *waitFor
+	// metrics counts the number of retries; AddAfter calls metrics.retry()
+	metrics retryMetrics
+// waitFor holds the data to add and the time at which it should be added.
+type waitFor struct {
+	data    t // the item to enqueue; type t is declared outside this excerpt
+	readyAt time.Time // earliest time at which data should be handed to the queue
+	// index of this entry in the priority queue (heap); kept current by Swap and Push, set to -1 by Pop
+	index int
+// waitForPriorityQueue implements a priority queue for waitFor items.
+// waitForPriorityQueue implements heap.Interface. The item occurring next in
+// time (i.e., the item with the smallest readyAt) is at the root (index 0).
+// Peek returns this minimum item at index 0 without removing it. Pop returns
+// the minimum item after it has been removed from the queue and placed at
+// index Len()-1 by container/heap. Push adds an item at index Len(), and
+// container/heap percolates it into the correct location.
+type waitForPriorityQueue []*waitFor
+func (pq waitForPriorityQueue) Len() int { // Len reports how many entries are waiting; part of heap.Interface
+	return len(pq)
+func (pq waitForPriorityQueue) Less(i, j int) bool { // Less orders entries by readyAt, earliest first; part of heap.Interface
+	return pq[i].readyAt.Before(pq[j].readyAt) // strict comparison: entries with equal readyAt are not ordered relative to each other
+func (pq waitForPriorityQueue) Swap(i, j int) { // Swap exchanges two entries and refreshes their stored positions; part of heap.Interface
+	pq[i], pq[j] = pq[j], pq[i]
+	pq[i].index = i // keep each entry's index in sync with its slot so insert can pass it to heap.Fix
+	pq[j].index = j
+// Push appends x, which must be a *waitFor, to the end of the queue and records its index there. Push
+// should not be called directly; instead, use `heap.Push`.
+func (pq *waitForPriorityQueue) Push(x interface{}) {
+	n := len(*pq)
+	item := x.(*waitFor) // unchecked type assertion: panics if x is not a *waitFor
+	item.index = n
+	*pq = append(*pq, item)
+// Pop removes and returns the last element of the underlying slice. Pop should not be called directly;
+// instead, use `heap.Pop`.
+func (pq *waitForPriorityQueue) Pop() interface{} {
+	n := len(*pq)
+	item := (*pq)[n-1]
+	item.index = -1 // mark the entry as no longer being in the heap
+	*pq = (*pq)[0:(n - 1)]
+	return item
+// Peek returns the item at the beginning of the queue, without removing the item or otherwise mutating
+// the queue. It is safe to call directly, but it indexes element 0 and so panics on an empty queue.
+func (pq waitForPriorityQueue) Peek() interface{} {
+	return pq[0]
+// ShutDown stops the queue. After the queue drains, the returned shutdown bool on Get() will be true.
+// This method may be invoked more than once; stopOnce makes every call after the first a no-op.
+func (q *delayingType) ShutDown() {
+	q.stopOnce.Do(func() {
+		q.Interface.ShutDown() // shut down the wrapped queue first
+		close(q.stopCh) // wakes waitingLoop and any AddAfter blocked on a full waitingForAddCh
+		q.heartbeat.Stop()
+	})
+// AddAfter adds the given item to the work queue after the given delay; a delay <= 0 adds it at once.
+func (q *delayingType) AddAfter(item interface{}, duration time.Duration) {
+	// don't add if we're already shutting down
+	if q.ShuttingDown() {
+		return
+	}
+	q.metrics.retry() // counted for every call that gets past the shutdown check, zero-delay ones included
+	// immediately add things with no delay
+	if duration <= 0 {
+		q.Add(item)
+		return
+	}
+	select {
+	case <-q.stopCh:
+		// unblock if ShutDown() is called; the item is dropped in that case
+	case q.waitingForAddCh <- &waitFor{data: item, readyAt: q.clock.Now().Add(duration)}:
+	}
+// maxWait keeps a max bound on the wait time; it is the period of the heartbeat ticker. It's just
+// insurance against weird things happening. Checking the queue every 10 seconds isn't expensive and we
+// know that we'll never end up with an expired item sitting for more than 10 seconds.
+const maxWait = 10 * time.Second
+// waitingLoop runs until the workqueue is shut down; it owns the heap of delayed entries and adds each one to the queue once its readyAt has passed.
+func (q *delayingType) waitingLoop() {
+	defer utilruntime.HandleCrash() // crash handling is delegated to utilruntime (behavior defined outside this file)
+	// Make a placeholder channel to use when there are no items in our list; nothing ever sends on it, so receiving from it blocks forever
+	never := make(<-chan time.Time)
+	// Make a timer that expires when the item at the head of the waiting queue is ready. NOTE(review): the last timer is not stopped when the loop returns.
+	var nextReadyAtTimer clock.Timer
+	waitingForQueue := &waitForPriorityQueue{}
+	heap.Init(waitingForQueue)
+	waitingEntryByData := map[t]*waitFor{} // index from item to its pending heap entry, used by insert to de-duplicate
+	for {
+		if q.Interface.ShuttingDown() {
+			return
+		}
+		now := q.clock.Now()
+		// Add ready entries: pop every heap entry whose readyAt is not after now
+		for waitingForQueue.Len() > 0 {
+			entry := waitingForQueue.Peek().(*waitFor)
+			if entry.readyAt.After(now) {
+				break
+			}
+			entry = heap.Pop(waitingForQueue).(*waitFor)
+			q.Add(entry.data)
+			delete(waitingEntryByData, entry.data)
+		}
+		// Set up a wait for the first item's readyAt (if one exists); otherwise wait on the never channel
+		nextReadyAt := never
+		if waitingForQueue.Len() > 0 {
+			if nextReadyAtTimer != nil {
+				nextReadyAtTimer.Stop() // discard the timer from the previous iteration before creating a new one
+			}
+			entry := waitingForQueue.Peek().(*waitFor)
+			nextReadyAtTimer = q.clock.NewTimer(entry.readyAt.Sub(now))
+			nextReadyAt = nextReadyAtTimer.C()
+		}
+		select {
+		case <-q.stopCh:
+			return
+		case <-q.heartbeat.C():
+			// continue the loop, which will add ready items (periodic wake-up, at most maxWait apart)
+		case <-nextReadyAt:
+			// continue the loop, which will add ready items (the head entry's timer fired)
+		case waitEntry := <-q.waitingForAddCh:
+			if waitEntry.readyAt.After(q.clock.Now()) {
+				insert(waitingForQueue, waitingEntryByData, waitEntry)
+			} else {
+				q.Add(waitEntry.data) // already due by the time it was received: skip the heap
+			}
+			drained := false
+			for !drained { // non-blocking drain of whatever else is already buffered in waitingForAddCh
+				select {
+				case waitEntry := <-q.waitingForAddCh:
+					if waitEntry.readyAt.After(q.clock.Now()) {
+						insert(waitingForQueue, waitingEntryByData, waitEntry)
+					} else {
+						q.Add(waitEntry.data)
+					}
+				default:
+					drained = true
+				}
+			}
+		}
+	}
+// insert adds entry to the priority queue q and to knownEntries, or, if an entry for the same data is already waiting, only moves that entry's readyAt earlier.
+func insert(q *waitForPriorityQueue, knownEntries map[t]*waitFor, entry *waitFor) {
+	// if the entry already exists, update the time only if it would cause the item to be queued sooner; the new entry itself is discarded
+	existing, exists := knownEntries[entry.data]
+	if exists {
+		if existing.readyAt.After(entry.readyAt) {
+			existing.readyAt = entry.readyAt
+			heap.Fix(q, existing.index) // re-establish heap order after changing the key in place
+		}
+		return
+	}
+	heap.Push(q, entry)
+	knownEntries[entry.data] = entry