blob: 3ccfd575d94824fe24569935d57e4b1cc19bf1f5 [file] [log] [blame]
/*
Copyright 2014 Workiva, LLC
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
The priority queue is almost a spitting image of the logic
used for a regular queue. In order to keep the logic fast,
this code is repeated instead of using casts to cast to interface{}
back and forth. If Go had inheritance and generics, this problem
would be easier to solve.
*/
package queue
import (
"sort"
"sync"
)
// Item is an item that can be added to the priority queue.
type Item interface {
// Compare returns a bool that can be used to determine
// ordering in the priority queue. Assuming the queue
// is in ascending order, this should return > logic.
// Return 1 to indicate this object is greater than the
// the other logic, 0 to indicate equality, and -1 to indicate
// less than other.
Compare(other Item) int
}
type priorityItems []Item
func (items *priorityItems) get(number int) []Item {
returnItems := make([]Item, 0, number)
index := 0
for i := 0; i < number; i++ {
if i >= len(*items) {
break
}
returnItems = append(returnItems, (*items)[i])
(*items)[i] = nil
index++
}
*items = (*items)[index:]
return returnItems
}
func (items *priorityItems) insert(item Item) {
if len(*items) == 0 {
*items = append(*items, item)
return
}
equalFound := false
i := sort.Search(len(*items), func(i int) bool {
result := (*items)[i].Compare(item)
if result == 0 {
equalFound = true
}
return result >= 0
})
if equalFound {
return
}
if i == len(*items) {
*items = append(*items, item)
return
}
*items = append(*items, nil)
copy((*items)[i+1:], (*items)[i:])
(*items)[i] = item
}
// PriorityQueue is similar to queue except that it takes
// items that implement the Item interface and adds them
// to the queue in priority order.
type PriorityQueue struct {
waiters waiters
items priorityItems
lock sync.Mutex
disposeLock sync.Mutex
disposed bool
}
// Put adds items to the queue.
func (pq *PriorityQueue) Put(items ...Item) error {
if len(items) == 0 {
return nil
}
pq.lock.Lock()
if pq.disposed {
pq.lock.Unlock()
return disposedError
}
for _, item := range items {
pq.items.insert(item)
}
for {
sema := pq.waiters.get()
if sema == nil {
break
}
sema.response.Add(1)
sema.wg.Done()
sema.response.Wait()
if len(pq.items) == 0 {
break
}
}
pq.lock.Unlock()
return nil
}
// Get retrieves items from the queue. If the queue is empty,
// this call blocks until the next item is added to the queue. This
// will attempt to retrieve number of items.
func (pq *PriorityQueue) Get(number int) ([]Item, error) {
if number < 1 {
return nil, nil
}
pq.lock.Lock()
if pq.disposed {
pq.lock.Unlock()
return nil, disposedError
}
var items []Item
if len(pq.items) == 0 {
sema := newSema()
pq.waiters.put(sema)
sema.wg.Add(1)
pq.lock.Unlock()
sema.wg.Wait()
pq.disposeLock.Lock()
if pq.disposed {
pq.disposeLock.Unlock()
return nil, disposedError
}
pq.disposeLock.Unlock()
items = pq.items.get(number)
sema.response.Done()
return items, nil
}
items = pq.items.get(number)
pq.lock.Unlock()
return items, nil
}
// Peek will look at the next item without removing it from the queue.
func (pq *PriorityQueue) Peek() Item {
pq.lock.Lock()
defer pq.lock.Unlock()
if len(pq.items) > 0 {
return pq.items[0]
}
return nil
}
// Empty returns a bool indicating if there are any items left
// in the queue.
func (pq *PriorityQueue) Empty() bool {
pq.lock.Lock()
defer pq.lock.Unlock()
return len(pq.items) == 0
}
// Len returns a number indicating how many items are in the queue.
func (pq *PriorityQueue) Len() int {
pq.lock.Lock()
defer pq.lock.Unlock()
return len(pq.items)
}
// Disposed returns a bool indicating if this queue has been disposed.
func (pq *PriorityQueue) Disposed() bool {
pq.lock.Lock()
defer pq.lock.Unlock()
return pq.disposed
}
// Dispose will prevent any further reads/writes to this queue
// and frees available resources.
func (pq *PriorityQueue) Dispose() {
pq.lock.Lock()
defer pq.lock.Unlock()
pq.disposeLock.Lock()
defer pq.disposeLock.Unlock()
pq.disposed = true
for _, waiter := range pq.waiters {
waiter.response.Add(1)
waiter.wg.Done()
}
pq.items = nil
pq.waiters = nil
}
// NewPriorityQueue is the constructor for a priority queue.
func NewPriorityQueue(hint int) *PriorityQueue {
return &PriorityQueue{
items: make(priorityItems, 0, hint),
}
}