blob: 39009b8e79a3ca02beaccc7f1d46ce5ca41cb0d4 [file] [log] [blame]
Matteo Scandoloa4285862020-12-01 18:10:10 -08001/*
2Copyright 2015 The Kubernetes Authors.
3
4Licensed under the Apache License, Version 2.0 (the "License");
5you may not use this file except in compliance with the License.
6You may obtain a copy of the License at
7
8 http://www.apache.org/licenses/LICENSE-2.0
9
10Unless required by applicable law or agreed to in writing, software
11distributed under the License is distributed on an "AS IS" BASIS,
12WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13See the License for the specific language governing permissions and
14limitations under the License.
15*/
16
17package workqueue
18
19import (
20 "sync"
21 "time"
22
23 "k8s.io/apimachinery/pkg/util/clock"
24)
25
26type Interface interface {
27 Add(item interface{})
28 Len() int
29 Get() (item interface{}, shutdown bool)
30 Done(item interface{})
31 ShutDown()
32 ShuttingDown() bool
33}
34
35// New constructs a new work queue (see the package comment).
36func New() *Type {
37 return NewNamed("")
38}
39
40func NewNamed(name string) *Type {
41 rc := clock.RealClock{}
42 return newQueue(
43 rc,
44 globalMetricsFactory.newQueueMetrics(name, rc),
45 defaultUnfinishedWorkUpdatePeriod,
46 )
47}
48
49func newQueue(c clock.Clock, metrics queueMetrics, updatePeriod time.Duration) *Type {
50 t := &Type{
51 clock: c,
52 dirty: set{},
53 processing: set{},
54 cond: sync.NewCond(&sync.Mutex{}),
55 metrics: metrics,
56 unfinishedWorkUpdatePeriod: updatePeriod,
57 }
58 go t.updateUnfinishedWorkLoop()
59 return t
60}
61
62const defaultUnfinishedWorkUpdatePeriod = 500 * time.Millisecond
63
64// Type is a work queue (see the package comment).
65type Type struct {
66 // queue defines the order in which we will work on items. Every
67 // element of queue should be in the dirty set and not in the
68 // processing set.
69 queue []t
70
71 // dirty defines all of the items that need to be processed.
72 dirty set
73
74 // Things that are currently being processed are in the processing set.
75 // These things may be simultaneously in the dirty set. When we finish
76 // processing something and remove it from this set, we'll check if
77 // it's in the dirty set, and if so, add it to the queue.
78 processing set
79
80 cond *sync.Cond
81
82 shuttingDown bool
83
84 metrics queueMetrics
85
86 unfinishedWorkUpdatePeriod time.Duration
87 clock clock.Clock
88}
89
90type empty struct{}
91type t interface{}
92type set map[t]empty
93
94func (s set) has(item t) bool {
95 _, exists := s[item]
96 return exists
97}
98
99func (s set) insert(item t) {
100 s[item] = empty{}
101}
102
103func (s set) delete(item t) {
104 delete(s, item)
105}
106
107// Add marks item as needing processing.
108func (q *Type) Add(item interface{}) {
109 q.cond.L.Lock()
110 defer q.cond.L.Unlock()
111 if q.shuttingDown {
112 return
113 }
114 if q.dirty.has(item) {
115 return
116 }
117
118 q.metrics.add(item)
119
120 q.dirty.insert(item)
121 if q.processing.has(item) {
122 return
123 }
124
125 q.queue = append(q.queue, item)
126 q.cond.Signal()
127}
128
129// Len returns the current queue length, for informational purposes only. You
130// shouldn't e.g. gate a call to Add() or Get() on Len() being a particular
131// value, that can't be synchronized properly.
132func (q *Type) Len() int {
133 q.cond.L.Lock()
134 defer q.cond.L.Unlock()
135 return len(q.queue)
136}
137
138// Get blocks until it can return an item to be processed. If shutdown = true,
139// the caller should end their goroutine. You must call Done with item when you
140// have finished processing it.
141func (q *Type) Get() (item interface{}, shutdown bool) {
142 q.cond.L.Lock()
143 defer q.cond.L.Unlock()
144 for len(q.queue) == 0 && !q.shuttingDown {
145 q.cond.Wait()
146 }
147 if len(q.queue) == 0 {
148 // We must be shutting down.
149 return nil, true
150 }
151
152 item, q.queue = q.queue[0], q.queue[1:]
153
154 q.metrics.get(item)
155
156 q.processing.insert(item)
157 q.dirty.delete(item)
158
159 return item, false
160}
161
162// Done marks item as done processing, and if it has been marked as dirty again
163// while it was being processed, it will be re-added to the queue for
164// re-processing.
165func (q *Type) Done(item interface{}) {
166 q.cond.L.Lock()
167 defer q.cond.L.Unlock()
168
169 q.metrics.done(item)
170
171 q.processing.delete(item)
172 if q.dirty.has(item) {
173 q.queue = append(q.queue, item)
174 q.cond.Signal()
175 }
176}
177
178// ShutDown will cause q to ignore all new items added to it. As soon as the
179// worker goroutines have drained the existing items in the queue, they will be
180// instructed to exit.
181func (q *Type) ShutDown() {
182 q.cond.L.Lock()
183 defer q.cond.L.Unlock()
184 q.shuttingDown = true
185 q.cond.Broadcast()
186}
187
188func (q *Type) ShuttingDown() bool {
189 q.cond.L.Lock()
190 defer q.cond.L.Unlock()
191
192 return q.shuttingDown
193}
194
195func (q *Type) updateUnfinishedWorkLoop() {
196 t := q.clock.NewTicker(q.unfinishedWorkUpdatePeriod)
197 defer t.Stop()
198 for range t.C() {
199 if !func() bool {
200 q.cond.L.Lock()
201 defer q.cond.L.Unlock()
202 if !q.shuttingDown {
203 q.metrics.updateUnfinishedWork()
204 return true
205 }
206 return false
207
208 }() {
209 return
210 }
211 }
212}