VOL-2859 Added test cases for RequestQueue.
Also moved RequestQueue into its own file.
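
For reference, a minimal usage sketch (the process function below is
illustrative and not part of this change): a caller waits for its turn,
does its work, then releases the queue so the next request can proceed.

    func process(ctx context.Context, rq *RequestQueue) error {
        if err := rq.WaitForGreenLight(ctx); err != nil {
            return err // canceled or timed out while waiting in the queue
        }
        defer rq.RequestComplete()
        // ... handle the request to completion; no other request is given
        // the green light until this function returns ...
        return nil
    }
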
Change-Id: Iadac5fcbaeb9e7e18b74bd7d3c04a5ec6a397ada
diff --git a/rw_core/utils/core_utils.go b/rw_core/utils/core_utils.go
index aacb8ae..798b4d8 100644
--- a/rw_core/utils/core_utils.go
+++ b/rw_core/utils/core_utils.go
@@ -17,9 +17,7 @@
package utils
import (
- "context"
"os"
- "sync"
"time"
"google.golang.org/grpc/codes"
@@ -44,97 +42,6 @@
return os.Getenv("HOSTNAME")
}
-type request struct {
- prev, next *request
- notifyOnComplete chan<- struct{}
-}
-
-// RequestQueue represents a request processing queue where each request is processed to completion before another
-// request is given the green light to proceed.
-type RequestQueue struct {
- mutex sync.Mutex
-
- last, current *request
- lastCompleteCh <-chan struct{}
-}
-
-// NewRequestQueue creates a new request queue
-func NewRequestQueue() *RequestQueue {
- ch := make(chan struct{})
- close(ch) // assume the "current" request is already complete
- return &RequestQueue{lastCompleteCh: ch}
-}
-
-// WaitForGreenLight is invoked by a function processing a request to receive the green light before
-// proceeding. The caller can also provide a context with timeout. The timeout will be triggered if the wait is
-// too long (previous requests taking too long)
-func (rq *RequestQueue) WaitForGreenLight(ctx context.Context) error {
- // add ourselves to the end of the queue
- rq.mutex.Lock()
- waitingOn := rq.lastCompleteCh
-
- ch := make(chan struct{})
- rq.lastCompleteCh = ch
- r := &request{notifyOnComplete: ch}
-
- if rq.last != nil {
- rq.last.next, r.prev = r, rq.last
- }
- rq.last = r
- rq.mutex.Unlock()
-
- // wait for our turn
- select {
- case <-ctx.Done():
- // canceled, so cleanup
- rq.mutex.Lock()
- defer rq.mutex.Unlock()
-
- select {
- case <-waitingOn:
- // chan has been closed, so the lock has been acquired
- // context is canceled, so just release the lock immediately
- rq.current = r
- rq.releaseWithoutLock()
- default:
- // on abort, skip our position in the queue
- r.prev.notifyOnComplete = r.notifyOnComplete
- // and remove ourselves from the queue
- if r.next != nil { // if we are somewhere in the middle of the queue
- r.prev.next = r.next
- r.next.prev = r.prev
- } else { // if we are at the end of the queue
- rq.last = r.prev
- r.prev.next = nil
- }
- }
- return ctx.Err()
-
- case <-waitingOn:
- // lock is acquired
- rq.current = r
- return nil
- }
-}
-
-// RequestComplete must be invoked by a process when it completes processing the request. That process must have
-// invoked WaitForGreenLight() before.
-func (rq *RequestQueue) RequestComplete() {
- rq.mutex.Lock()
- defer rq.mutex.Unlock()
-
- rq.releaseWithoutLock()
-}
-
-func (rq *RequestQueue) releaseWithoutLock() {
- // Notify the next waiting request. This will panic if the lock is released more than once.
- close(rq.current.notifyOnComplete)
-
- if rq.current.next != nil {
- rq.current.next.prev = nil
- }
-}
-
// Response -
type Response struct {
*response
diff --git a/rw_core/utils/request_queue.go b/rw_core/utils/request_queue.go
new file mode 100644
index 0000000..2c95e23
--- /dev/null
+++ b/rw_core/utils/request_queue.go
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+ "context"
+ "sync"
+)
+
+type request struct {
+ prev, next *request
+ notifyOnComplete chan<- struct{}
+}
+
+// RequestQueue represents a request processing queue where each request is processed to completion before another
+// request is given the green light to proceed.
+type RequestQueue struct {
+ mutex sync.Mutex
+
+ last, current *request
+ lastCompleteCh <-chan struct{}
+}
+
+// NewRequestQueue creates a new request queue
+func NewRequestQueue() *RequestQueue {
+ ch := make(chan struct{})
+ close(ch) // assume the "current" request is already complete
+ return &RequestQueue{lastCompleteCh: ch}
+}
+
+// WaitForGreenLight is invoked by a function processing a request to wait for its turn (the green light) before
+// proceeding. The caller may provide a context with a timeout or cancellation; if earlier requests take too long
+// and the context expires, the wait is aborted and the context's error is returned.
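+//
+// A minimal, illustrative caller (the timeout value is arbitrary):
+//
+//	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+//	defer cancel()
+//	if err := rq.WaitForGreenLight(ctx); err != nil {
+//		return err // the queued slot is abandoned; do not call RequestComplete
+//	}
+//	defer rq.RequestComplete()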
+func (rq *RequestQueue) WaitForGreenLight(ctx context.Context) error {
+ // add ourselves to the end of the queue
+ rq.mutex.Lock()
+ waitingOn := rq.lastCompleteCh
+
+ ch := make(chan struct{})
+ rq.lastCompleteCh = ch
+ r := &request{notifyOnComplete: ch}
+
+ if rq.last != nil {
+ rq.last.next, r.prev = r, rq.last
+ }
+ rq.last = r
+ rq.mutex.Unlock()
+
+ // wait for our turn
+ select {
+ case <-ctx.Done():
+ // canceled, so cleanup
+ rq.mutex.Lock()
+ defer rq.mutex.Unlock()
+
+ select {
+ case <-waitingOn:
+ // chan has been closed, so the lock has been acquired
+ // context is canceled, so just release the lock immediately
+ rq.current = r
+ rq.releaseWithoutLock()
+ default:
+ // on abort, skip our position in the queue
+ r.prev.notifyOnComplete = r.notifyOnComplete
+ // and remove ourselves from the queue
+ if r.next != nil { // if we are somewhere in the middle of the queue
+ r.prev.next = r.next
+ r.next.prev = r.prev
+ } else { // if we are at the end of the queue
+ rq.last = r.prev
+ r.prev.next = nil
+ }
+ }
+ return ctx.Err()
+
+ case <-waitingOn:
+ // lock is acquired
+ rq.current = r
+ return nil
+ }
+}
+
+// RequestComplete must be invoked by a process once it has finished processing its request. That process must
+// have previously invoked WaitForGreenLight().
+func (rq *RequestQueue) RequestComplete() {
+ rq.mutex.Lock()
+ defer rq.mutex.Unlock()
+
+ rq.releaseWithoutLock()
+}
+
+func (rq *RequestQueue) releaseWithoutLock() {
+ // Notify the next waiting request. This will panic if the lock is released more than once.
+ close(rq.current.notifyOnComplete)
+
+ if rq.current.next != nil {
+ rq.current.next.prev = nil
+ }
+}
diff --git a/rw_core/utils/request_queue_test.go b/rw_core/utils/request_queue_test.go
new file mode 100644
index 0000000..007d375
--- /dev/null
+++ b/rw_core/utils/request_queue_test.go
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+ "context"
+ "sync"
+ "testing"
+ "time"
+)
+
+func TestRequestQueueOrdering(t *testing.T) {
+ rq := NewRequestQueue()
+ // acquire lock immediately, so our requests will queue up
+ if err := rq.WaitForGreenLight(context.Background()); err != nil {
+ t.Error(err)
+ return
+ }
+
+ doneOrder := make([]int, 0, 10)
+
+ wg := sync.WaitGroup{}
+ wg.Add(10)
+
+ // queue up 10 requests
+ for i := 0; i < 10; i++ {
+ go func(i int) {
+ if err := rq.WaitForGreenLight(context.Background()); err != nil {
+ t.Error(err)
+ }
+ doneOrder = append(doneOrder, i)
+ rq.RequestComplete()
+
+ wg.Done()
+ }(i)
+
+ // ensure the request just started has been queued before starting the next one
+ time.Sleep(time.Millisecond)
+ }
+
+ // complete the first process
+ rq.RequestComplete()
+
+ wg.Wait()
+
+ // verify that the processes completed in the correct order
+ for i := 0; i < 10; i++ {
+ if doneOrder[i] != i {
+ t.Errorf("Goroutine %d completed at position %d, expected position %d", doneOrder[i], i, doneOrder[i])
+ }
+ }
+}
+
+func TestRequestQueueCancellation(t *testing.T) {
+ rq := NewRequestQueue()
+ // acquire lock immediately, so our requests will queue up
+ if err := rq.WaitForGreenLight(context.Background()); err != nil {
+ t.Error(err)
+ return
+ }
+
+ wg := sync.WaitGroup{}
+ wg.Add(10)
+
+ willCancelContext, cancel := context.WithCancel(context.Background())
+
+ // queue up 10 requests
+ for i := 0; i < 10; i++ {
+ go func(i int) {
+ // will cancel processes 0, 1, 4, 5, 8, 9
+ willCancel := (i/2)%2 == 0
+
+ ctx := context.Background()
+ if willCancel {
+ ctx = willCancelContext
+ }
+
+ if err := rq.WaitForGreenLight(ctx); err != nil {
+ if !willCancel || err != context.Canceled {
+ t.Errorf("wait gave unexpected error %s", err)
+ } // else cancellation was expected
+ } else {
+ if willCancel {
+ t.Error("this should have been canceled")
+ } // else completed as expected
+ rq.RequestComplete()
+ }
+ wg.Done()
+ }(i)
+ }
+
+ // cancel processes
+ cancel()
+
+ // wait a moment for the cancellations to go through
+ time.Sleep(time.Millisecond)
+
+ // release the lock, and allow the processes to complete
+ rq.RequestComplete()
+
+ // wait for all processes to complete
+ wg.Wait()
+}