/*
Copyright 2014 Workiva, LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16
/*
The priority queue is almost a spitting image of the logic
used for a regular queue. In order to keep the logic fast,
this code is repeated instead of casting to and from interface{}.
If Go had inheritance and generics, this problem
would be easier to solve.
*/
24package queue
25
26import (
27 "sort"
28 "sync"
29)
30
// Item is the contract a value must satisfy to be stored in the
// priority queue.
type Item interface {
	// Compare reports how the receiver orders relative to other:
	// 1 when the receiver is greater than other, 0 when the two are
	// equal, and -1 when the receiver is less than other. The queue
	// keeps its items in ascending order according to this result.
	Compare(other Item) int
}
41
42type priorityItems []Item
43
44func (items *priorityItems) get(number int) []Item {
45 returnItems := make([]Item, 0, number)
46 index := 0
47 for i := 0; i < number; i++ {
48 if i >= len(*items) {
49 break
50 }
51
52 returnItems = append(returnItems, (*items)[i])
53 (*items)[i] = nil
54 index++
55 }
56
57 *items = (*items)[index:]
58 return returnItems
59}
60
61func (items *priorityItems) insert(item Item) {
62 if len(*items) == 0 {
63 *items = append(*items, item)
64 return
65 }
66
67 equalFound := false
68 i := sort.Search(len(*items), func(i int) bool {
69 result := (*items)[i].Compare(item)
70 if result == 0 {
71 equalFound = true
72 }
73 return result >= 0
74 })
75
76 if equalFound {
77 return
78 }
79
80 if i == len(*items) {
81 *items = append(*items, item)
82 return
83 }
84
85 *items = append(*items, nil)
86 copy((*items)[i+1:], (*items)[i:])
87 (*items)[i] = item
88}
89
90// PriorityQueue is similar to queue except that it takes
91// items that implement the Item interface and adds them
92// to the queue in priority order.
93type PriorityQueue struct {
94 waiters waiters
95 items priorityItems
96 lock sync.Mutex
97 disposeLock sync.Mutex
98 disposed bool
99}
100
101// Put adds items to the queue.
102func (pq *PriorityQueue) Put(items ...Item) error {
103 if len(items) == 0 {
104 return nil
105 }
106
107 pq.lock.Lock()
108 if pq.disposed {
109 pq.lock.Unlock()
110 return disposedError
111 }
112
113 for _, item := range items {
114 pq.items.insert(item)
115 }
116
117 for {
118 sema := pq.waiters.get()
119 if sema == nil {
120 break
121 }
122
123 sema.response.Add(1)
124 sema.wg.Done()
125 sema.response.Wait()
126 if len(pq.items) == 0 {
127 break
128 }
129 }
130
131 pq.lock.Unlock()
132 return nil
133}
134
135// Get retrieves items from the queue. If the queue is empty,
136// this call blocks until the next item is added to the queue. This
137// will attempt to retrieve number of items.
138func (pq *PriorityQueue) Get(number int) ([]Item, error) {
139 if number < 1 {
140 return nil, nil
141 }
142
143 pq.lock.Lock()
144
145 if pq.disposed {
146 pq.lock.Unlock()
147 return nil, disposedError
148 }
149
150 var items []Item
151
152 if len(pq.items) == 0 {
153 sema := newSema()
154 pq.waiters.put(sema)
155 sema.wg.Add(1)
156 pq.lock.Unlock()
157
158 sema.wg.Wait()
159 pq.disposeLock.Lock()
160 if pq.disposed {
161 pq.disposeLock.Unlock()
162 return nil, disposedError
163 }
164 pq.disposeLock.Unlock()
165
166 items = pq.items.get(number)
167 sema.response.Done()
168 return items, nil
169 }
170
171 items = pq.items.get(number)
172 pq.lock.Unlock()
173 return items, nil
174}
175
176// Peek will look at the next item without removing it from the queue.
177func (pq *PriorityQueue) Peek() Item {
178 pq.lock.Lock()
179 defer pq.lock.Unlock()
180 if len(pq.items) > 0 {
181 return pq.items[0]
182 }
183 return nil
184}
185
186// Empty returns a bool indicating if there are any items left
187// in the queue.
188func (pq *PriorityQueue) Empty() bool {
189 pq.lock.Lock()
190 defer pq.lock.Unlock()
191
192 return len(pq.items) == 0
193}
194
195// Len returns a number indicating how many items are in the queue.
196func (pq *PriorityQueue) Len() int {
197 pq.lock.Lock()
198 defer pq.lock.Unlock()
199
200 return len(pq.items)
201}
202
203// Disposed returns a bool indicating if this queue has been disposed.
204func (pq *PriorityQueue) Disposed() bool {
205 pq.lock.Lock()
206 defer pq.lock.Unlock()
207
208 return pq.disposed
209}
210
211// Dispose will prevent any further reads/writes to this queue
212// and frees available resources.
213func (pq *PriorityQueue) Dispose() {
214 pq.lock.Lock()
215 defer pq.lock.Unlock()
216
217 pq.disposeLock.Lock()
218 defer pq.disposeLock.Unlock()
219
220 pq.disposed = true
221 for _, waiter := range pq.waiters {
222 waiter.response.Add(1)
223 waiter.wg.Done()
224 }
225
226 pq.items = nil
227 pq.waiters = nil
228}
229
230// NewPriorityQueue is the constructor for a priority queue.
231func NewPriorityQueue(hint int) *PriorityQueue {
232 return &PriorityQueue{
233 items: make(priorityItems, 0, hint),
234 }
235}