VOL-1859 : If streaming calls: ReceivePacketsIn/ChangeEvents are received
multiple times from ofagent while it tries to subscribe to the cores,
just retain the most recent one and exit the last receiver whose stream won't be valid.
Just retain one receiver for streaming packetins/changeevents in order to avoid
packet drops.
Also removing useless debug logs
Change-Id: I18950a2970044e341912b0b71243206c8488c3b7
diff --git a/Gopkg.lock b/Gopkg.lock
index 3105662..f5825ef 100644
--- a/Gopkg.lock
+++ b/Gopkg.lock
@@ -119,14 +119,6 @@
version = "v1.2.1"
[[projects]]
- branch = "master"
- digest = "1:d6abd135858a47cf5863d9f095bfa88c43f8f18dcf43c94498c2af7c6d72f6c2"
- name = "github.com/golang-collections/go-datastructures"
- packages = ["queue"]
- pruneopts = "UT"
- revision = "59788d5eb2591d3497ffb8fafed2f16fe00e7775"
-
-[[projects]]
digest = "1:c33a34578570e05f7aab50030acfbeee2d95ee7690bfb803a08bbd9a7b820e4e"
name = "github.com/golang/protobuf"
packages = [
@@ -780,7 +772,6 @@
"github.com/bsm/sarama-cluster",
"github.com/cevaris/ordered_map",
"github.com/gogo/protobuf/proto",
- "github.com/golang-collections/go-datastructures/queue",
"github.com/golang/protobuf/descriptor",
"github.com/golang/protobuf/proto",
"github.com/golang/protobuf/protoc-gen-go",
diff --git a/Gopkg.toml b/Gopkg.toml
index 8ffc5b0..b9190ae 100644
--- a/Gopkg.toml
+++ b/Gopkg.toml
@@ -56,10 +56,6 @@
version = "1.2.0"
[[constraint]]
- branch = "master"
- name = "github.com/golang-collections/go-datastructures"
-
-[[constraint]]
name = "github.com/golang/protobuf"
version = "=1.3.1"
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 55d3855..f2c16e7 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -18,7 +18,6 @@
import (
"context"
"errors"
- "github.com/golang-collections/go-datastructures/queue"
"github.com/golang/protobuf/ptypes/empty"
da "github.com/opencord/voltha-go/common/core/northbound/grpc"
"github.com/opencord/voltha-go/common/log"
@@ -31,6 +30,7 @@
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/status"
"io"
+ "sync"
"time"
)
@@ -45,8 +45,10 @@
deviceMgr *DeviceManager
logicalDeviceMgr *LogicalDeviceManager
adapterMgr *AdapterManager
- packetInQueue *queue.Queue
- changeEventQueue *queue.Queue
+ packetInQueue chan openflow_13.PacketIn
+ changeEventQueue chan openflow_13.ChangeEvent
+ packetInQueueDone chan bool
+ changeEventQueueDone chan bool
coreInCompetingMode bool
longRunningRequestTimeout int64
defaultRequestTimeout int64
@@ -62,10 +64,11 @@
coreInCompetingMode: core.config.InCompetingMode,
longRunningRequestTimeout: core.config.LongRunningRequestTimeout,
defaultRequestTimeout: core.config.DefaultRequestTimeout,
- // TODO: Figure out what the 'hint' parameter to queue.New does
- packetInQueue: queue.New(10),
- changeEventQueue: queue.New(10),
- core: core,
+ packetInQueue: make(chan openflow_13.PacketIn, 100),
+ changeEventQueue: make(chan openflow_13.ChangeEvent, 100),
+ packetInQueueDone: make(chan bool, 1),
+ changeEventQueueDone: make(chan bool, 1),
+ core: core,
}
return handler
}
@@ -805,31 +808,79 @@
// TODO: Augment the OF PacketIn to include the transactionId
packetIn := openflow_13.PacketIn{Id: deviceId, PacketIn: packet}
log.Debugw("sendPacketIn", log.Fields{"packetIn": packetIn})
- // Enqueue the packet
- if err := handler.packetInQueue.Put(packetIn); err != nil {
- log.Errorw("failed-to-enqueue-packet", log.Fields{"error": err})
+ handler.packetInQueue <- packetIn
+}
+
+type callTracker struct {
+ failedPacket interface{}
+}
+type streamTracker struct {
+ calls map[string]*callTracker
+ sync.Mutex
+}
+
+var streamingTracker = &streamTracker{calls: make(map[string]*callTracker)}
+
+func (handler *APIHandler) getStreamingTracker(method string, done chan<- bool) *callTracker {
+ streamingTracker.Lock()
+ defer streamingTracker.Unlock()
+ if _, ok := streamingTracker.calls[method]; ok {
+ // bail out the other packet in thread
+ log.Debugf("%s streaming call already running. Exiting it", method)
+ done <- true
+ log.Debugf("Last %s exited. Continuing ...", method)
+ } else {
+ streamingTracker.calls[method] = &callTracker{failedPacket: nil}
}
+ return streamingTracker.calls[method]
+}
+
+func (handler *APIHandler) flushFailedPackets(tracker *callTracker) error {
+ if tracker.failedPacket != nil {
+ switch tracker.failedPacket.(type) {
+ case openflow_13.PacketIn:
+ log.Debug("Enqueueing last failed packetIn")
+ handler.packetInQueue <- tracker.failedPacket.(openflow_13.PacketIn)
+ case openflow_13.ChangeEvent:
+ log.Debug("Enqueueing last failed changeEvent")
+ handler.changeEventQueue <- tracker.failedPacket.(openflow_13.ChangeEvent)
+ }
+ }
+ return nil
}
func (handler *APIHandler) ReceivePacketsIn(
empty *empty.Empty,
packetsIn voltha.VolthaService_ReceivePacketsInServer,
) error {
+ var streamingTracker = handler.getStreamingTracker("ReceivePacketsIn", handler.packetInQueueDone)
log.Debugw("ReceivePacketsIn-request", log.Fields{"packetsIn": packetsIn})
+ handler.flushFailedPackets(streamingTracker)
+
+loop:
for {
- // Dequeue a packet
- if packets, err := handler.packetInQueue.Get(1); err == nil {
- log.Debugw("dequeued-packet", log.Fields{"packet": packets[0]})
- if packet, ok := packets[0].(openflow_13.PacketIn); ok {
- log.Debugw("sending-packet-in", log.Fields{"packet": packet})
- if err := packetsIn.Send(&packet); err != nil {
- log.Errorw("failed-to-send-packet", log.Fields{"error": err})
+ select {
+ case packet := <-handler.packetInQueue:
+ log.Debugw("sending-packet-in", log.Fields{"packet": packet})
+ if err := packetsIn.Send(&packet); err != nil {
+ log.Errorw("failed-to-send-packet", log.Fields{"error": err})
+ // save the last failed packet in
+ streamingTracker.failedPacket = packet
+ } else {
+ if streamingTracker.failedPacket != nil {
+ // reset last failed packet saved to avoid flush
+ streamingTracker.failedPacket = nil
}
}
+ case <-handler.packetInQueueDone:
+ log.Debug("Another ReceivePacketsIn running. Bailing out ...")
+ break loop
}
}
- //TODO: FInd an elegant way to get out of the above loop when the Core is stopped
+
+ //TODO: Find an elegant way to get out of the above loop when the Core is stopped
+ return nil
}
func (handler *APIHandler) sendChangeEvent(deviceId string, portStatus *openflow_13.OfpPortStatus) {
@@ -838,29 +889,41 @@
//}
event := openflow_13.ChangeEvent{Id: deviceId, Event: &openflow_13.ChangeEvent_PortStatus{PortStatus: portStatus}}
log.Debugw("sendChangeEvent", log.Fields{"event": event})
- // Enqueue the change event
- if err := handler.changeEventQueue.Put(event); err != nil {
- log.Errorw("failed-to-enqueue-change-event", log.Fields{"error": err})
- }
+ handler.changeEventQueue <- event
}
func (handler *APIHandler) ReceiveChangeEvents(
empty *empty.Empty,
changeEvents voltha.VolthaService_ReceiveChangeEventsServer,
) error {
+ var streamingTracker = handler.getStreamingTracker("ReceiveChangeEvents", handler.changeEventQueueDone)
log.Debugw("ReceiveChangeEvents-request", log.Fields{"changeEvents": changeEvents})
+
+ handler.flushFailedPackets(streamingTracker)
+
+loop:
for {
+ select {
// Dequeue a change event
- if events, err := handler.changeEventQueue.Get(1); err == nil {
- log.Debugw("dequeued-change-event", log.Fields{"event": events[0]})
- if event, ok := events[0].(openflow_13.ChangeEvent); ok {
- log.Debugw("sending-change-event", log.Fields{"event": event})
- if err := changeEvents.Send(&event); err != nil {
- log.Errorw("failed-to-send-change-event", log.Fields{"error": err})
+ case event := <-handler.changeEventQueue:
+ log.Debugw("sending-change-event", log.Fields{"event": event})
+ if err := changeEvents.Send(&event); err != nil {
+ log.Errorw("failed-to-send-change-event", log.Fields{"error": err})
+ // save last failed changeevent
+ streamingTracker.failedPacket = event
+ } else {
+ if streamingTracker.failedPacket != nil {
+ // reset last failed event saved on success to avoid flushing
+ streamingTracker.failedPacket = nil
}
}
+ case <-handler.changeEventQueueDone:
+ log.Debug("Another ReceiveChangeEvents already running. Bailing out ...")
+ break loop
}
}
+
+ return nil
}
func (handler *APIHandler) Subscribe(
diff --git a/vendor/github.com/golang-collections/go-datastructures/LICENSE b/vendor/github.com/golang-collections/go-datastructures/LICENSE
deleted file mode 100644
index 7a4a3ea..0000000
--- a/vendor/github.com/golang-collections/go-datastructures/LICENSE
+++ /dev/null
@@ -1,202 +0,0 @@
-
- Apache License
- Version 2.0, January 2004
- http://www.apache.org/licenses/
-
- TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
-
- 1. Definitions.
-
- "License" shall mean the terms and conditions for use, reproduction,
- and distribution as defined by Sections 1 through 9 of this document.
-
- "Licensor" shall mean the copyright owner or entity authorized by
- the copyright owner that is granting the License.
-
- "Legal Entity" shall mean the union of the acting entity and all
- other entities that control, are controlled by, or are under common
- control with that entity. For the purposes of this definition,
- "control" means (i) the power, direct or indirect, to cause the
- direction or management of such entity, whether by contract or
- otherwise, or (ii) ownership of fifty percent (50%) or more of the
- outstanding shares, or (iii) beneficial ownership of such entity.
-
- "You" (or "Your") shall mean an individual or Legal Entity
- exercising permissions granted by this License.
-
- "Source" form shall mean the preferred form for making modifications,
- including but not limited to software source code, documentation
- source, and configuration files.
-
- "Object" form shall mean any form resulting from mechanical
- transformation or translation of a Source form, including but
- not limited to compiled object code, generated documentation,
- and conversions to other media types.
-
- "Work" shall mean the work of authorship, whether in Source or
- Object form, made available under the License, as indicated by a
- copyright notice that is included in or attached to the work
- (an example is provided in the Appendix below).
-
- "Derivative Works" shall mean any work, whether in Source or Object
- form, that is based on (or derived from) the Work and for which the
- editorial revisions, annotations, elaborations, or other modifications
- represent, as a whole, an original work of authorship. For the purposes
- of this License, Derivative Works shall not include works that remain
- separable from, or merely link (or bind by name) to the interfaces of,
- the Work and Derivative Works thereof.
-
- "Contribution" shall mean any work of authorship, including
- the original version of the Work and any modifications or additions
- to that Work or Derivative Works thereof, that is intentionally
- submitted to Licensor for inclusion in the Work by the copyright owner
- or by an individual or Legal Entity authorized to submit on behalf of
- the copyright owner. For the purposes of this definition, "submitted"
- means any form of electronic, verbal, or written communication sent
- to the Licensor or its representatives, including but not limited to
- communication on electronic mailing lists, source code control systems,
- and issue tracking systems that are managed by, or on behalf of, the
- Licensor for the purpose of discussing and improving the Work, but
- excluding communication that is conspicuously marked or otherwise
- designated in writing by the copyright owner as "Not a Contribution."
-
- "Contributor" shall mean Licensor and any individual or Legal Entity
- on behalf of whom a Contribution has been received by Licensor and
- subsequently incorporated within the Work.
-
- 2. Grant of Copyright License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- copyright license to reproduce, prepare Derivative Works of,
- publicly display, publicly perform, sublicense, and distribute the
- Work and such Derivative Works in Source or Object form.
-
- 3. Grant of Patent License. Subject to the terms and conditions of
- this License, each Contributor hereby grants to You a perpetual,
- worldwide, non-exclusive, no-charge, royalty-free, irrevocable
- (except as stated in this section) patent license to make, have made,
- use, offer to sell, sell, import, and otherwise transfer the Work,
- where such license applies only to those patent claims licensable
- by such Contributor that are necessarily infringed by their
- Contribution(s) alone or by combination of their Contribution(s)
- with the Work to which such Contribution(s) was submitted. If You
- institute patent litigation against any entity (including a
- cross-claim or counterclaim in a lawsuit) alleging that the Work
- or a Contribution incorporated within the Work constitutes direct
- or contributory patent infringement, then any patent licenses
- granted to You under this License for that Work shall terminate
- as of the date such litigation is filed.
-
- 4. Redistribution. You may reproduce and distribute copies of the
- Work or Derivative Works thereof in any medium, with or without
- modifications, and in Source or Object form, provided that You
- meet the following conditions:
-
- (a) You must give any other recipients of the Work or
- Derivative Works a copy of this License; and
-
- (b) You must cause any modified files to carry prominent notices
- stating that You changed the files; and
-
- (c) You must retain, in the Source form of any Derivative Works
- that You distribute, all copyright, patent, trademark, and
- attribution notices from the Source form of the Work,
- excluding those notices that do not pertain to any part of
- the Derivative Works; and
-
- (d) If the Work includes a "NOTICE" text file as part of its
- distribution, then any Derivative Works that You distribute must
- include a readable copy of the attribution notices contained
- within such NOTICE file, excluding those notices that do not
- pertain to any part of the Derivative Works, in at least one
- of the following places: within a NOTICE text file distributed
- as part of the Derivative Works; within the Source form or
- documentation, if provided along with the Derivative Works; or,
- within a display generated by the Derivative Works, if and
- wherever such third-party notices normally appear. The contents
- of the NOTICE file are for informational purposes only and
- do not modify the License. You may add Your own attribution
- notices within Derivative Works that You distribute, alongside
- or as an addendum to the NOTICE text from the Work, provided
- that such additional attribution notices cannot be construed
- as modifying the License.
-
- You may add Your own copyright statement to Your modifications and
- may provide additional or different license terms and conditions
- for use, reproduction, or distribution of Your modifications, or
- for any such Derivative Works as a whole, provided Your use,
- reproduction, and distribution of the Work otherwise complies with
- the conditions stated in this License.
-
- 5. Submission of Contributions. Unless You explicitly state otherwise,
- any Contribution intentionally submitted for inclusion in the Work
- by You to the Licensor shall be under the terms and conditions of
- this License, without any additional terms or conditions.
- Notwithstanding the above, nothing herein shall supersede or modify
- the terms of any separate license agreement you may have executed
- with Licensor regarding such Contributions.
-
- 6. Trademarks. This License does not grant permission to use the trade
- names, trademarks, service marks, or product names of the Licensor,
- except as required for reasonable and customary use in describing the
- origin of the Work and reproducing the content of the NOTICE file.
-
- 7. Disclaimer of Warranty. Unless required by applicable law or
- agreed to in writing, Licensor provides the Work (and each
- Contributor provides its Contributions) on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- implied, including, without limitation, any warranties or conditions
- of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
- PARTICULAR PURPOSE. You are solely responsible for determining the
- appropriateness of using or redistributing the Work and assume any
- risks associated with Your exercise of permissions under this License.
-
- 8. Limitation of Liability. In no event and under no legal theory,
- whether in tort (including negligence), contract, or otherwise,
- unless required by applicable law (such as deliberate and grossly
- negligent acts) or agreed to in writing, shall any Contributor be
- liable to You for damages, including any direct, indirect, special,
- incidental, or consequential damages of any character arising as a
- result of this License or out of the use or inability to use the
- Work (including but not limited to damages for loss of goodwill,
- work stoppage, computer failure or malfunction, or any and all
- other commercial damages or losses), even if such Contributor
- has been advised of the possibility of such damages.
-
- 9. Accepting Warranty or Additional Liability. While redistributing
- the Work or Derivative Works thereof, You may choose to offer,
- and charge a fee for, acceptance of support, warranty, indemnity,
- or other liability obligations and/or rights consistent with this
- License. However, in accepting such obligations, You may act only
- on Your own behalf and on Your sole responsibility, not on behalf
- of any other Contributor, and only if You agree to indemnify,
- defend, and hold each Contributor harmless for any liability
- incurred by, or claims asserted against, such Contributor by reason
- of your accepting any such warranty or additional liability.
-
- END OF TERMS AND CONDITIONS
-
- APPENDIX: How to apply the Apache License to your work.
-
- To apply the Apache License to your work, attach the following
- boilerplate notice, with the fields enclosed by brackets "[]"
- replaced with your own identifying information. (Don't include
- the brackets!) The text should be enclosed in the appropriate
- comment syntax for the file format. We also recommend that a
- file or class name and description of purpose be included on the
- same "printed page" as the copyright notice for easier
- identification within third-party archives.
-
- Copyright [yyyy] [name of copyright owner]
-
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
\ No newline at end of file
diff --git a/vendor/github.com/golang-collections/go-datastructures/queue/error.go b/vendor/github.com/golang-collections/go-datastructures/queue/error.go
deleted file mode 100644
index 29c062c..0000000
--- a/vendor/github.com/golang-collections/go-datastructures/queue/error.go
+++ /dev/null
@@ -1,21 +0,0 @@
-/*
-Copyright 2014 Workiva, LLC
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-package queue
-
-import "errors"
-
-var disposedError = errors.New(`Queue has been disposed.`)
diff --git a/vendor/github.com/golang-collections/go-datastructures/queue/priority_queue.go b/vendor/github.com/golang-collections/go-datastructures/queue/priority_queue.go
deleted file mode 100644
index 3ccfd57..0000000
--- a/vendor/github.com/golang-collections/go-datastructures/queue/priority_queue.go
+++ /dev/null
@@ -1,235 +0,0 @@
-/*
-Copyright 2014 Workiva, LLC
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-/*
-The priority queue is almost a spitting image of the logic
-used for a regular queue. In order to keep the logic fast,
-this code is repeated instead of using casts to cast to interface{}
-back and forth. If Go had inheritance and generics, this problem
-would be easier to solve.
-*/
-package queue
-
-import (
- "sort"
- "sync"
-)
-
-// Item is an item that can be added to the priority queue.
-type Item interface {
- // Compare returns a bool that can be used to determine
- // ordering in the priority queue. Assuming the queue
- // is in ascending order, this should return > logic.
- // Return 1 to indicate this object is greater than the
- // the other logic, 0 to indicate equality, and -1 to indicate
- // less than other.
- Compare(other Item) int
-}
-
-type priorityItems []Item
-
-func (items *priorityItems) get(number int) []Item {
- returnItems := make([]Item, 0, number)
- index := 0
- for i := 0; i < number; i++ {
- if i >= len(*items) {
- break
- }
-
- returnItems = append(returnItems, (*items)[i])
- (*items)[i] = nil
- index++
- }
-
- *items = (*items)[index:]
- return returnItems
-}
-
-func (items *priorityItems) insert(item Item) {
- if len(*items) == 0 {
- *items = append(*items, item)
- return
- }
-
- equalFound := false
- i := sort.Search(len(*items), func(i int) bool {
- result := (*items)[i].Compare(item)
- if result == 0 {
- equalFound = true
- }
- return result >= 0
- })
-
- if equalFound {
- return
- }
-
- if i == len(*items) {
- *items = append(*items, item)
- return
- }
-
- *items = append(*items, nil)
- copy((*items)[i+1:], (*items)[i:])
- (*items)[i] = item
-}
-
-// PriorityQueue is similar to queue except that it takes
-// items that implement the Item interface and adds them
-// to the queue in priority order.
-type PriorityQueue struct {
- waiters waiters
- items priorityItems
- lock sync.Mutex
- disposeLock sync.Mutex
- disposed bool
-}
-
-// Put adds items to the queue.
-func (pq *PriorityQueue) Put(items ...Item) error {
- if len(items) == 0 {
- return nil
- }
-
- pq.lock.Lock()
- if pq.disposed {
- pq.lock.Unlock()
- return disposedError
- }
-
- for _, item := range items {
- pq.items.insert(item)
- }
-
- for {
- sema := pq.waiters.get()
- if sema == nil {
- break
- }
-
- sema.response.Add(1)
- sema.wg.Done()
- sema.response.Wait()
- if len(pq.items) == 0 {
- break
- }
- }
-
- pq.lock.Unlock()
- return nil
-}
-
-// Get retrieves items from the queue. If the queue is empty,
-// this call blocks until the next item is added to the queue. This
-// will attempt to retrieve number of items.
-func (pq *PriorityQueue) Get(number int) ([]Item, error) {
- if number < 1 {
- return nil, nil
- }
-
- pq.lock.Lock()
-
- if pq.disposed {
- pq.lock.Unlock()
- return nil, disposedError
- }
-
- var items []Item
-
- if len(pq.items) == 0 {
- sema := newSema()
- pq.waiters.put(sema)
- sema.wg.Add(1)
- pq.lock.Unlock()
-
- sema.wg.Wait()
- pq.disposeLock.Lock()
- if pq.disposed {
- pq.disposeLock.Unlock()
- return nil, disposedError
- }
- pq.disposeLock.Unlock()
-
- items = pq.items.get(number)
- sema.response.Done()
- return items, nil
- }
-
- items = pq.items.get(number)
- pq.lock.Unlock()
- return items, nil
-}
-
-// Peek will look at the next item without removing it from the queue.
-func (pq *PriorityQueue) Peek() Item {
- pq.lock.Lock()
- defer pq.lock.Unlock()
- if len(pq.items) > 0 {
- return pq.items[0]
- }
- return nil
-}
-
-// Empty returns a bool indicating if there are any items left
-// in the queue.
-func (pq *PriorityQueue) Empty() bool {
- pq.lock.Lock()
- defer pq.lock.Unlock()
-
- return len(pq.items) == 0
-}
-
-// Len returns a number indicating how many items are in the queue.
-func (pq *PriorityQueue) Len() int {
- pq.lock.Lock()
- defer pq.lock.Unlock()
-
- return len(pq.items)
-}
-
-// Disposed returns a bool indicating if this queue has been disposed.
-func (pq *PriorityQueue) Disposed() bool {
- pq.lock.Lock()
- defer pq.lock.Unlock()
-
- return pq.disposed
-}
-
-// Dispose will prevent any further reads/writes to this queue
-// and frees available resources.
-func (pq *PriorityQueue) Dispose() {
- pq.lock.Lock()
- defer pq.lock.Unlock()
-
- pq.disposeLock.Lock()
- defer pq.disposeLock.Unlock()
-
- pq.disposed = true
- for _, waiter := range pq.waiters {
- waiter.response.Add(1)
- waiter.wg.Done()
- }
-
- pq.items = nil
- pq.waiters = nil
-}
-
-// NewPriorityQueue is the constructor for a priority queue.
-func NewPriorityQueue(hint int) *PriorityQueue {
- return &PriorityQueue{
- items: make(priorityItems, 0, hint),
- }
-}
diff --git a/vendor/github.com/golang-collections/go-datastructures/queue/queue.go b/vendor/github.com/golang-collections/go-datastructures/queue/queue.go
deleted file mode 100644
index 856ae3e..0000000
--- a/vendor/github.com/golang-collections/go-datastructures/queue/queue.go
+++ /dev/null
@@ -1,324 +0,0 @@
-/*
-Copyright 2014 Workiva, LLC
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-
-/*
-Package queue includes a regular queue and a priority queue.
-These queues rely on waitgroups to pause listening threads
-on empty queues until a message is received. If any thread
-calls Dispose on the queue, any listeners are immediately returned
-with an error. Any subsequent put to the queue will return an error
-as opposed to panicking as with channels. Queues will grow with unbounded
-behavior as opposed to channels which can be buffered but will pause
-while a thread attempts to put to a full channel.
-
-Recently added is a lockless ring buffer using the same basic C design as
-found here:
-
-http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
-
-Modified for use with Go with the addition of some dispose semantics providing
-the capability to release blocked threads. This works for both puts
-and gets, either will return an error if they are blocked and the buffer
-is disposed. This could serve as a signal to kill a goroutine. All threadsafety
-is acheived using CAS operations, making this buffer pretty quick.
-
-Benchmarks:
-BenchmarkPriorityQueue-8 2000000 782 ns/op
-BenchmarkQueue-8 2000000 671 ns/op
-BenchmarkChannel-8 1000000 2083 ns/op
-BenchmarkQueuePut-8 20000 84299 ns/op
-BenchmarkQueueGet-8 20000 80753 ns/op
-BenchmarkExecuteInParallel-8 20000 68891 ns/op
-BenchmarkRBLifeCycle-8 10000000 177 ns/op
-BenchmarkRBPut-8 30000000 58.1 ns/op
-BenchmarkRBGet-8 50000000 26.8 ns/op
-
-TODO: We really need a Fibonacci heap for the priority queue.
-TODO: Unify the types of queue to the same interface.
-*/
-package queue
-
-import (
- "runtime"
- "sync"
- "sync/atomic"
-)
-
-type waiters []*sema
-
-func (w *waiters) get() *sema {
- if len(*w) == 0 {
- return nil
- }
-
- sema := (*w)[0]
- copy((*w)[0:], (*w)[1:])
- (*w)[len(*w)-1] = nil // or the zero value of T
- *w = (*w)[:len(*w)-1]
- return sema
-}
-
-func (w *waiters) put(sema *sema) {
- *w = append(*w, sema)
-}
-
-type items []interface{}
-
-func (items *items) get(number int64) []interface{} {
- returnItems := make([]interface{}, 0, number)
- index := int64(0)
- for i := int64(0); i < number; i++ {
- if i >= int64(len(*items)) {
- break
- }
-
- returnItems = append(returnItems, (*items)[i])
- (*items)[i] = nil
- index++
- }
-
- *items = (*items)[index:]
- return returnItems
-}
-
-func (items *items) getUntil(checker func(item interface{}) bool) []interface{} {
- length := len(*items)
-
- if len(*items) == 0 {
- // returning nil here actually wraps that nil in a list
- // of interfaces... thanks go
- return []interface{}{}
- }
-
- returnItems := make([]interface{}, 0, length)
- index := 0
- for i, item := range *items {
- if !checker(item) {
- break
- }
-
- returnItems = append(returnItems, item)
- index = i
- }
-
- *items = (*items)[index:]
- return returnItems
-}
-
-type sema struct {
- wg *sync.WaitGroup
- response *sync.WaitGroup
-}
-
-func newSema() *sema {
- return &sema{
- wg: &sync.WaitGroup{},
- response: &sync.WaitGroup{},
- }
-}
-
-// Queue is the struct responsible for tracking the state
-// of the queue.
-type Queue struct {
- waiters waiters
- items items
- lock sync.Mutex
- disposed bool
-}
-
-// Put will add the specified items to the queue.
-func (q *Queue) Put(items ...interface{}) error {
- if len(items) == 0 {
- return nil
- }
-
- q.lock.Lock()
-
- if q.disposed {
- q.lock.Unlock()
- return disposedError
- }
-
- q.items = append(q.items, items...)
- for {
- sema := q.waiters.get()
- if sema == nil {
- break
- }
- sema.response.Add(1)
- sema.wg.Done()
- sema.response.Wait()
- if len(q.items) == 0 {
- break
- }
- }
-
- q.lock.Unlock()
- return nil
-}
-
-// Get will add an item to the queue. If there are some items in the
-// queue, get will return a number UP TO the number passed in as a
-// parameter. If no items are in the queue, this method will pause
-// until items are added to the queue.
-func (q *Queue) Get(number int64) ([]interface{}, error) {
- if number < 1 {
- // thanks again go
- return []interface{}{}, nil
- }
-
- q.lock.Lock()
-
- if q.disposed {
- q.lock.Unlock()
- return nil, disposedError
- }
-
- var items []interface{}
-
- if len(q.items) == 0 {
- sema := newSema()
- q.waiters.put(sema)
- sema.wg.Add(1)
- q.lock.Unlock()
-
- sema.wg.Wait()
- // we are now inside the put's lock
- if q.disposed {
- return nil, disposedError
- }
- items = q.items.get(number)
- sema.response.Done()
- return items, nil
- }
-
- items = q.items.get(number)
- q.lock.Unlock()
- return items, nil
-}
-
-// TakeUntil takes a function and returns a list of items that
-// match the checker until the checker returns false. This does not
-// wait if there are no items in the queue.
-func (q *Queue) TakeUntil(checker func(item interface{}) bool) ([]interface{}, error) {
- if checker == nil {
- return nil, nil
- }
-
- q.lock.Lock()
-
- if q.disposed {
- q.lock.Unlock()
- return nil, disposedError
- }
-
- result := q.items.getUntil(checker)
- q.lock.Unlock()
- return result, nil
-}
-
-// Empty returns a bool indicating if this bool is empty.
-func (q *Queue) Empty() bool {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- return len(q.items) == 0
-}
-
-// Len returns the number of items in this queue.
-func (q *Queue) Len() int64 {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- return int64(len(q.items))
-}
-
-// Disposed returns a bool indicating if this queue
-// has had disposed called on it.
-func (q *Queue) Disposed() bool {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- return q.disposed
-}
-
-// Dispose will dispose of this queue. Any subsequent
-// calls to Get or Put will return an error.
-func (q *Queue) Dispose() {
- q.lock.Lock()
- defer q.lock.Unlock()
-
- q.disposed = true
- for _, waiter := range q.waiters {
- waiter.response.Add(1)
- waiter.wg.Done()
- }
-
- q.items = nil
- q.waiters = nil
-}
-
-// New is a constructor for a new threadsafe queue.
-func New(hint int64) *Queue {
- return &Queue{
- items: make([]interface{}, 0, hint),
- }
-}
-
-// ExecuteInParallel will (in parallel) call the provided function
-// with each item in the queue until the queue is exhausted. When the queue
-// is exhausted execution is complete and all goroutines will be killed.
-// This means that the queue will be disposed so cannot be used again.
-func ExecuteInParallel(q *Queue, fn func(interface{})) {
- if q == nil {
- return
- }
-
- q.lock.Lock() // so no one touches anything in the middle
- // of this process
- todo, done := uint64(len(q.items)), int64(-1)
- // this is important or we might face an infinite loop
- if todo == 0 {
- return
- }
-
- numCPU := 1
- if runtime.NumCPU() > 1 {
- numCPU = runtime.NumCPU() - 1
- }
-
- var wg sync.WaitGroup
- wg.Add(numCPU)
- items := q.items
-
- for i := 0; i < numCPU; i++ {
- go func() {
- for {
- index := atomic.AddInt64(&done, 1)
- if index >= int64(todo) {
- wg.Done()
- break
- }
-
- fn(items[index])
- items[index] = 0
- }
- }()
- }
- wg.Wait()
- q.lock.Unlock()
- q.Dispose()
-}
diff --git a/vendor/github.com/golang-collections/go-datastructures/queue/ring.go b/vendor/github.com/golang-collections/go-datastructures/queue/ring.go
deleted file mode 100644
index 9c137a9..0000000
--- a/vendor/github.com/golang-collections/go-datastructures/queue/ring.go
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
-Copyright 2014 Workiva, LLC
-
-Licensed under the Apache License, Version 2.0 (the "License");
-you may not use this file except in compliance with the License.
-You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
-Unless required by applicable law or agreed to in writing, software
-distributed under the License is distributed on an "AS IS" BASIS,
-WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-See the License for the specific language governing permissions and
-limitations under the License.
-*/
-package queue
-
-import (
- "runtime"
- "sync/atomic"
-)
-
-// roundUp takes a uint64 greater than 0 and rounds it up to the next
-// power of 2.
-func roundUp(v uint64) uint64 {
- v--
- v |= v >> 1
- v |= v >> 2
- v |= v >> 4
- v |= v >> 8
- v |= v >> 16
- v |= v >> 32
- v++
- return v
-}
-
-type node struct {
- position uint64
- data interface{}
-}
-
-type nodes []*node
-
-// RingBuffer is a MPMC buffer that achieves threadsafety with CAS operations
-// only. A put on full or get on empty call will block until an item
-// is put or retrieved. Calling Dispose on the RingBuffer will unblock
-// any blocked threads with an error. This buffer is similar to the buffer
-// described here: http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
-// with some minor additions.
-type RingBuffer struct {
- nodes nodes
- queue, dequeue, mask, disposed uint64
-}
-
-func (rb *RingBuffer) init(size uint64) {
- size = roundUp(size)
- rb.nodes = make(nodes, size)
- for i := uint64(0); i < size; i++ {
- rb.nodes[i] = &node{position: i}
- }
- rb.mask = size - 1 // so we don't have to do this with every put/get operation
-}
-
-// Put adds the provided item to the queue. If the queue is full, this
-// call will block until an item is added to the queue or Dispose is called
-// on the queue. An error will be returned if the queue is disposed.
-func (rb *RingBuffer) Put(item interface{}) error {
- var n *node
- pos := atomic.LoadUint64(&rb.queue)
-L:
- for {
- if atomic.LoadUint64(&rb.disposed) == 1 {
- return disposedError
- }
-
- n = rb.nodes[pos&rb.mask]
- seq := atomic.LoadUint64(&n.position)
- switch dif := seq - pos; {
- case dif == 0:
- if atomic.CompareAndSwapUint64(&rb.queue, pos, pos+1) {
- break L
- }
- case dif < 0:
- panic(`Ring buffer in a compromised state during a put operation.`)
- default:
- pos = atomic.LoadUint64(&rb.queue)
- }
- runtime.Gosched() // free up the cpu before the next iteration
- }
-
- n.data = item
- atomic.StoreUint64(&n.position, pos+1)
- return nil
-}
-
-// Get will return the next item in the queue. This call will block
-// if the queue is empty. This call will unblock when an item is added
-// to the queue or Dispose is called on the queue. An error will be returned
-// if the queue is disposed.
-func (rb *RingBuffer) Get() (interface{}, error) {
- var n *node
- pos := atomic.LoadUint64(&rb.dequeue)
-L:
- for {
- if atomic.LoadUint64(&rb.disposed) == 1 {
- return nil, disposedError
- }
-
- n = rb.nodes[pos&rb.mask]
- seq := atomic.LoadUint64(&n.position)
- switch dif := seq - (pos + 1); {
- case dif == 0:
- if atomic.CompareAndSwapUint64(&rb.dequeue, pos, pos+1) {
- break L
- }
- case dif < 0:
- panic(`Ring buffer in compromised state during a get operation.`)
- default:
- pos = atomic.LoadUint64(&rb.dequeue)
- }
- runtime.Gosched() // free up cpu before next iteration
- }
- data := n.data
- n.data = nil
- atomic.StoreUint64(&n.position, pos+rb.mask+1)
- return data, nil
-}
-
-// Len returns the number of items in the queue.
-func (rb *RingBuffer) Len() uint64 {
- return atomic.LoadUint64(&rb.queue) - atomic.LoadUint64(&rb.dequeue)
-}
-
-// Cap returns the capacity of this ring buffer.
-func (rb *RingBuffer) Cap() uint64 {
- return uint64(len(rb.nodes))
-}
-
-// Dispose will dispose of this queue and free any blocked threads
-// in the Put and/or Get methods. Calling those methods on a disposed
-// queue will return an error.
-func (rb *RingBuffer) Dispose() {
- atomic.CompareAndSwapUint64(&rb.disposed, 0, 1)
-}
-
-// IsDisposed will return a bool indicating if this queue has been
-// disposed.
-func (rb *RingBuffer) IsDisposed() bool {
- return atomic.LoadUint64(&rb.disposed) == 1
-}
-
-// NewRingBuffer will allocate, initialize, and return a ring buffer
-// with the specified size.
-func NewRingBuffer(size uint64) *RingBuffer {
- rb := &RingBuffer{}
- rb.init(size)
- return rb
-}