/*
Copyright 2014 Workiva, LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

 http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
16package queue
17
18import (
19 "runtime"
20 "sync/atomic"
21)
22
// roundUp returns the smallest power of two that is greater than or
// equal to v. An input of 0 yields 1, so callers sizing a buffer never
// end up with zero slots. Inputs above 1<<63 have no representable
// answer in a uint64 and wrap around to 0.
func roundUp(v uint64) uint64 {
	if v == 0 {
		// Without this guard v-- wraps to MaxUint64 and the final v++
		// wraps straight back to 0.
		return 1
	}

	// Subtract one first so that exact powers of two map to themselves.
	v--
	// Smear the highest set bit into every lower bit position...
	v |= v >> 1
	v |= v >> 2
	v |= v >> 4
	v |= v >> 8
	v |= v >> 16
	v |= v >> 32
	// ...then add one to carry into the next power of two.
	v++
	return v
}
36
// node is a single slot of the ring.
//
// position is the slot's sequence number. Put and Get compare it with
// their own cursor to decide whether the slot may be claimed, and it is
// only read and written through sync/atomic. data carries the queued
// item while the slot is occupied.
type node struct {
	position uint64
	data     interface{}
}

// nodes is the slot storage of a ring buffer: one *node per slot.
type nodes []*node
43
44// RingBuffer is a MPMC buffer that achieves threadsafety with CAS operations
45// only. A put on full or get on empty call will block until an item
46// is put or retrieved. Calling Dispose on the RingBuffer will unblock
47// any blocked threads with an error. This buffer is similar to the buffer
48// described here: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
49// with some minor additions.
50type RingBuffer struct {
51 nodes nodes
52 queue, dequeue, mask, disposed uint64
53}
54
55func (rb *RingBuffer) init(size uint64) {
56 size = roundUp(size)
57 rb.nodes = make(nodes, size)
58 for i := uint64(0); i < size; i++ {
59 rb.nodes[i] = &node{position: i}
60 }
61 rb.mask = size - 1 // so we don't have to do this with every put/get operation
62}
63
64// Put adds the provided item to the queue. If the queue is full, this
65// call will block until an item is added to the queue or Dispose is called
66// on the queue. An error will be returned if the queue is disposed.
67func (rb *RingBuffer) Put(item interface{}) error {
68 var n *node
69 pos := atomic.LoadUint64(&rb.queue)
70L:
71 for {
72 if atomic.LoadUint64(&rb.disposed) == 1 {
73 return disposedError
74 }
75
76 n = rb.nodes[pos&rb.mask]
77 seq := atomic.LoadUint64(&n.position)
78 switch dif := seq - pos; {
79 case dif == 0:
80 if atomic.CompareAndSwapUint64(&rb.queue, pos, pos+1) {
81 break L
82 }
83 case dif < 0:
84 panic(`Ring buffer in a compromised state during a put operation.`)
85 default:
86 pos = atomic.LoadUint64(&rb.queue)
87 }
88 runtime.Gosched() // free up the cpu before the next iteration
89 }
90
91 n.data = item
92 atomic.StoreUint64(&n.position, pos+1)
93 return nil
94}
95
96// Get will return the next item in the queue. This call will block
97// if the queue is empty. This call will unblock when an item is added
98// to the queue or Dispose is called on the queue. An error will be returned
99// if the queue is disposed.
100func (rb *RingBuffer) Get() (interface{}, error) {
101 var n *node
102 pos := atomic.LoadUint64(&rb.dequeue)
103L:
104 for {
105 if atomic.LoadUint64(&rb.disposed) == 1 {
106 return nil, disposedError
107 }
108
109 n = rb.nodes[pos&rb.mask]
110 seq := atomic.LoadUint64(&n.position)
111 switch dif := seq - (pos + 1); {
112 case dif == 0:
113 if atomic.CompareAndSwapUint64(&rb.dequeue, pos, pos+1) {
114 break L
115 }
116 case dif < 0:
117 panic(`Ring buffer in compromised state during a get operation.`)
118 default:
119 pos = atomic.LoadUint64(&rb.dequeue)
120 }
121 runtime.Gosched() // free up cpu before next iteration
122 }
123 data := n.data
124 n.data = nil
125 atomic.StoreUint64(&n.position, pos+rb.mask+1)
126 return data, nil
127}
128
129// Len returns the number of items in the queue.
130func (rb *RingBuffer) Len() uint64 {
131 return atomic.LoadUint64(&rb.queue) - atomic.LoadUint64(&rb.dequeue)
132}
133
134// Cap returns the capacity of this ring buffer.
135func (rb *RingBuffer) Cap() uint64 {
136 return uint64(len(rb.nodes))
137}
138
139// Dispose will dispose of this queue and free any blocked threads
140// in the Put and/or Get methods. Calling those methods on a disposed
141// queue will return an error.
142func (rb *RingBuffer) Dispose() {
143 atomic.CompareAndSwapUint64(&rb.disposed, 0, 1)
144}
145
146// IsDisposed will return a bool indicating if this queue has been
147// disposed.
148func (rb *RingBuffer) IsDisposed() bool {
149 return atomic.LoadUint64(&rb.disposed) == 1
150}
151
152// NewRingBuffer will allocate, initialize, and return a ring buffer
153// with the specified size.
154func NewRingBuffer(size uint64) *RingBuffer {
155 rb := &RingBuffer{}
156 rb.init(size)
157 return rb
158}