/*
Copyright 2014 Workiva, LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

/*
Package queue includes a regular queue and a priority queue.
These queues rely on wait groups to pause listening threads
on empty queues until a message is received. If any thread
calls Dispose on the queue, any blocked listeners immediately return
with an error (a short sketch of this follows the import block below).
Any subsequent put to the queue will return an error rather than
panicking the way a send on a closed channel does. Queues grow without
bound, unlike channels, which can be buffered but will block a sender
once the buffer is full.

Recently added is a lockless ring buffer using the same basic C design as
found here:

http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue

Modified for use with Go with the addition of some dispose semantics providing
the capability to release blocked threads. This works for both puts
and gets: either will return an error if it is blocked when the buffer
is disposed. This could serve as a signal to kill a goroutine. All thread
safety is achieved using CAS operations, making this buffer pretty quick.

Benchmarks:
BenchmarkPriorityQueue-8         2000000      782 ns/op
BenchmarkQueue-8                 2000000      671 ns/op
BenchmarkChannel-8               1000000     2083 ns/op
BenchmarkQueuePut-8                20000    84299 ns/op
BenchmarkQueueGet-8                20000    80753 ns/op
BenchmarkExecuteInParallel-8       20000    68891 ns/op
BenchmarkRBLifeCycle-8          10000000      177 ns/op
BenchmarkRBPut-8                30000000     58.1 ns/op
BenchmarkRBGet-8                50000000     26.8 ns/op

TODO: We really need a Fibonacci heap for the priority queue.
TODO: Unify the types of queue to the same interface.
*/
package queue

import (
    "runtime"
    "sync"
    "sync/atomic"
)
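
// A minimal illustrative sketch of the dispose semantics described in the
// package comment above: a Get blocked on an empty queue is released with an
// error as soon as Dispose is called, which a worker goroutine can treat as
// its signal to stop.
//
//    q := New(10)
//    go func() {
//        if _, err := q.Get(1); err != nil { // blocks until Put or Dispose
//            return // queue disposed: stop the goroutine
//        }
//    }()
//    q.Dispose() // releases the blocked Get above with an error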

type waiters []*sema

func (w *waiters) get() *sema {
    if len(*w) == 0 {
        return nil
    }

    sema := (*w)[0]
    copy((*w)[0:], (*w)[1:])
    (*w)[len(*w)-1] = nil // or the zero value of T
    *w = (*w)[:len(*w)-1]
    return sema
}

func (w *waiters) put(sema *sema) {
    *w = append(*w, sema)
}

type items []interface{}

func (items *items) get(number int64) []interface{} {
    returnItems := make([]interface{}, 0, number)
    index := int64(0)
    for i := int64(0); i < number; i++ {
        if i >= int64(len(*items)) {
            break
        }

        returnItems = append(returnItems, (*items)[i])
        (*items)[i] = nil
        index++
    }

    *items = (*items)[index:]
    return returnItems
}

func (items *items) getUntil(checker func(item interface{}) bool) []interface{} {
    length := len(*items)

    if len(*items) == 0 {
        // return an empty, non-nil slice so callers can range over it safely
        return []interface{}{}
    }

    returnItems := make([]interface{}, 0, length)
    index := -1
    for i, item := range *items {
        if !checker(item) {
            break
        }

        returnItems = append(returnItems, item)
        index = i
        (*items)[i] = nil // release the reference held by the queue
    }

    // drop everything up to and including the last accepted item
    *items = (*items)[index+1:]
    return returnItems
}

// sema is the handshake a blocked Get leaves for Put: Put signals wg to wake
// the waiter, then blocks on response until the waiter has taken its items,
// so the hand-off completes while Put still holds the queue lock.
type sema struct {
    wg       *sync.WaitGroup
    response *sync.WaitGroup
}

func newSema() *sema {
    return &sema{
        wg:       &sync.WaitGroup{},
        response: &sync.WaitGroup{},
    }
}

// Queue is the struct responsible for tracking the state
// of the queue.
type Queue struct {
    waiters  waiters
    items    items
    lock     sync.Mutex
    disposed bool
}

// Put will add the specified items to the queue.
func (q *Queue) Put(items ...interface{}) error {
    if len(items) == 0 {
        return nil
    }

    q.lock.Lock()

    if q.disposed {
        q.lock.Unlock()
        return disposedError
    }

    q.items = append(q.items, items...)
    for {
        sema := q.waiters.get()
        if sema == nil {
            break
        }
        // wake the oldest waiter and wait until it has taken its items;
        // the hand-off happens while we still hold the queue lock
        sema.response.Add(1)
        sema.wg.Done()
        sema.response.Wait()
        if len(q.items) == 0 {
            break
        }
    }

    q.lock.Unlock()
    return nil
}

// Get retrieves items from the queue. If there are items in the
// queue, Get will return a number of items UP TO the number passed in as a
// parameter. If no items are in the queue, this method will block
// until items are added to the queue.
func (q *Queue) Get(number int64) ([]interface{}, error) {
    if number < 1 {
        // a non-positive count yields nothing; return an empty, non-nil slice
        return []interface{}{}, nil
    }

    q.lock.Lock()

    if q.disposed {
        q.lock.Unlock()
        return nil, disposedError
    }

    var items []interface{}

    if len(q.items) == 0 {
        sema := newSema()
        q.waiters.put(sema)
        sema.wg.Add(1)
        q.lock.Unlock()

        sema.wg.Wait()
        // we are now inside the put's lock
        if q.disposed {
            return nil, disposedError
        }
        items = q.items.get(number)
        sema.response.Done()
        return items, nil
    }

    items = q.items.get(number)
    q.lock.Unlock()
    return items, nil
}
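
// An illustrative sketch of Get's "up to" semantics: when items are already
// queued, Get returns immediately with at most the requested number.
//
//    q := New(10)
//    _ = q.Put("a", "b", "c")
//    items, _ := q.Get(10) // returns all three items without blocking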

// TakeUntil takes a checker function and returns the leading items for
// which the checker returns true, stopping at the first item it rejects.
// This does not wait if there are no items in the queue.
func (q *Queue) TakeUntil(checker func(item interface{}) bool) ([]interface{}, error) {
    if checker == nil {
        return nil, nil
    }

    q.lock.Lock()

    if q.disposed {
        q.lock.Unlock()
        return nil, disposedError
    }

    result := q.items.getUntil(checker)
    q.lock.Unlock()
    return result, nil
}
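
// An illustrative sketch of TakeUntil: the checker is applied from the front
// of the queue and collection stops at the first item it rejects, which stays
// queued along with everything behind it.
//
//    q := New(10)
//    _ = q.Put(1, 2, 7, 3)
//    small, _ := q.TakeUntil(func(item interface{}) bool {
//        return item.(int) < 5
//    })
//    // small is [1 2]; 7 failed the check, so 7 and 3 remain in the queue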

// Empty returns a bool indicating if this queue is empty.
func (q *Queue) Empty() bool {
    q.lock.Lock()
    defer q.lock.Unlock()

    return len(q.items) == 0
}

// Len returns the number of items in this queue.
func (q *Queue) Len() int64 {
    q.lock.Lock()
    defer q.lock.Unlock()

    return int64(len(q.items))
}

// Disposed returns a bool indicating if this queue
// has had Dispose called on it.
func (q *Queue) Disposed() bool {
    q.lock.Lock()
    defer q.lock.Unlock()

    return q.disposed
}

// Dispose will dispose of this queue. Any subsequent
// calls to Get or Put will return an error.
func (q *Queue) Dispose() {
    q.lock.Lock()
    defer q.lock.Unlock()

    q.disposed = true
    for _, waiter := range q.waiters {
        waiter.response.Add(1)
        waiter.wg.Done()
    }

    q.items = nil
    q.waiters = nil
}

// New is a constructor for a new threadsafe queue.
func New(hint int64) *Queue {
    return &Queue{
        items: make([]interface{}, 0, hint),
    }
}
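
// A minimal producer/consumer sketch using this queue (illustrative only):
// the consumer blocks in Get while the queue is empty, and Dispose is what
// eventually unblocks it with an error.
//
//    q := New(10)
//    go func() {
//        for {
//            items, err := q.Get(5) // blocks while the queue is empty
//            if err != nil {
//                return // the queue was disposed
//            }
//            for _, item := range items {
//                _ = item // process the item
//            }
//        }
//    }()
//    _ = q.Put("job-1", "job-2")
//    q.Dispose() // any blocked or later Get in the consumer now errors out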

// ExecuteInParallel will (in parallel) call the provided function
// with each item in the queue until the queue is exhausted. When the queue
// is exhausted execution is complete and all goroutines will be killed.
// This means that the queue will be disposed so cannot be used again.
func ExecuteInParallel(q *Queue, fn func(interface{})) {
    if q == nil {
        return
    }

    q.lock.Lock() // so no one touches anything in the middle
    // of this process
    todo, done := uint64(len(q.items)), int64(-1)
    // this is important or we might face an infinite loop
    if todo == 0 {
        q.lock.Unlock() // nothing to do; don't leave the queue locked
        return
    }

    numCPU := 1
    if runtime.NumCPU() > 1 {
        numCPU = runtime.NumCPU() - 1
    }

    var wg sync.WaitGroup
    wg.Add(numCPU)
    items := q.items

    for i := 0; i < numCPU; i++ {
        go func() {
            for {
                index := atomic.AddInt64(&done, 1)
                if index >= int64(todo) {
                    wg.Done()
                    break
                }

                fn(items[index])
                items[index] = 0 // drop the reference to the processed item
            }
        }()
    }
    wg.Wait()
    q.lock.Unlock()
    q.Dispose()
}
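
// An illustrative sketch of ExecuteInParallel: fn is applied to every queued
// item across several goroutines, and the queue is disposed once it returns.
//
//    q := New(4)
//    _ = q.Put(1, 2, 3, 4)
//    ExecuteInParallel(q, func(item interface{}) {
//        _ = item.(int) * 2 // do some work with each item
//    })
//    // q.Disposed() is now true; the queue cannot be reused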