VOL-2859 Added test cases for RequestQueue.

Also moved RequestQueue into its own file.

Change-Id: Iadac5fcbaeb9e7e18b74bd7d3c04a5ec6a397ada
diff --git a/rw_core/utils/core_utils.go b/rw_core/utils/core_utils.go
index aacb8ae..798b4d8 100644
--- a/rw_core/utils/core_utils.go
+++ b/rw_core/utils/core_utils.go
@@ -17,9 +17,7 @@
 package utils
 
 import (
-	"context"
 	"os"
-	"sync"
 	"time"
 
 	"google.golang.org/grpc/codes"
@@ -44,97 +42,6 @@
 	return os.Getenv("HOSTNAME")
 }
 
-type request struct {
-	prev, next       *request
-	notifyOnComplete chan<- struct{}
-}
-
-// RequestQueue represents a request processing queue where each request is processed to completion before another
-// request is given the green light to proceed.
-type RequestQueue struct {
-	mutex sync.Mutex
-
-	last, current  *request
-	lastCompleteCh <-chan struct{}
-}
-
-// NewRequestQueue creates a new request queue
-func NewRequestQueue() *RequestQueue {
-	ch := make(chan struct{})
-	close(ch) // assume the "current" request is already complete
-	return &RequestQueue{lastCompleteCh: ch}
-}
-
-// WaitForGreenLight is invoked by a function processing a request to receive the green light before
-// proceeding.  The caller can also provide a context with timeout.  The timeout will be triggered if the wait is
-// too long (previous requests taking too long)
-func (rq *RequestQueue) WaitForGreenLight(ctx context.Context) error {
-	// add ourselves to the end of the queue
-	rq.mutex.Lock()
-	waitingOn := rq.lastCompleteCh
-
-	ch := make(chan struct{})
-	rq.lastCompleteCh = ch
-	r := &request{notifyOnComplete: ch}
-
-	if rq.last != nil {
-		rq.last.next, r.prev = r, rq.last
-	}
-	rq.last = r
-	rq.mutex.Unlock()
-
-	// wait for our turn
-	select {
-	case <-ctx.Done():
-		// canceled, so cleanup
-		rq.mutex.Lock()
-		defer rq.mutex.Unlock()
-
-		select {
-		case <-waitingOn:
-			// chan has been closed, so the lock has been acquired
-			// context is canceled, so just release the lock immediately
-			rq.current = r
-			rq.releaseWithoutLock()
-		default:
-			// on abort, skip our position in the queue
-			r.prev.notifyOnComplete = r.notifyOnComplete
-			// and remove ourselves from the queue
-			if r.next != nil { // if we are somewhere in the middle of the queue
-				r.prev.next = r.next
-				r.next.prev = r.prev
-			} else { // if we are at the end of the queue
-				rq.last = r.prev
-				r.prev.next = nil
-			}
-		}
-		return ctx.Err()
-
-	case <-waitingOn:
-		// lock is acquired
-		rq.current = r
-		return nil
-	}
-}
-
-// RequestComplete must be invoked by a process when it completes processing the request.  That process must have
-// invoked WaitForGreenLight() before.
-func (rq *RequestQueue) RequestComplete() {
-	rq.mutex.Lock()
-	defer rq.mutex.Unlock()
-
-	rq.releaseWithoutLock()
-}
-
-func (rq *RequestQueue) releaseWithoutLock() {
-	// Notify the next waiting request.  This will panic if the lock is released more than once.
-	close(rq.current.notifyOnComplete)
-
-	if rq.current.next != nil {
-		rq.current.next.prev = nil
-	}
-}
-
 // Response -
 type Response struct {
 	*response
diff --git a/rw_core/utils/request_queue.go b/rw_core/utils/request_queue.go
new file mode 100644
index 0000000..2c95e23
--- /dev/null
+++ b/rw_core/utils/request_queue.go
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+	"context"
+	"sync"
+)
+
+type request struct {
+	prev, next       *request
+	notifyOnComplete chan<- struct{}
+}
+
+// RequestQueue represents a request processing queue where each request is processed to completion before another
+// request is given the green light to proceed.
+type RequestQueue struct {
+	mutex sync.Mutex
+
+	last, current  *request
+	lastCompleteCh <-chan struct{}
+}
+
+// NewRequestQueue creates a new request queue
+func NewRequestQueue() *RequestQueue {
+	ch := make(chan struct{})
+	close(ch) // assume the "current" request is already complete
+	return &RequestQueue{lastCompleteCh: ch}
+}
+
+// WaitForGreenLight is invoked by a function processing a request to receive the green light before
+// proceeding.  The caller can also provide a context with timeout.  The timeout will be triggered if the wait is
+// too long (previous requests taking too long)
+func (rq *RequestQueue) WaitForGreenLight(ctx context.Context) error {
+	// add ourselves to the end of the queue
+	rq.mutex.Lock()
+	waitingOn := rq.lastCompleteCh
+
+	ch := make(chan struct{})
+	rq.lastCompleteCh = ch
+	r := &request{notifyOnComplete: ch}
+
+	if rq.last != nil {
+		rq.last.next, r.prev = r, rq.last
+	}
+	rq.last = r
+	rq.mutex.Unlock()
+
+	// wait for our turn
+	select {
+	case <-ctx.Done():
+		// canceled, so cleanup
+		rq.mutex.Lock()
+		defer rq.mutex.Unlock()
+
+		select {
+		case <-waitingOn:
+			// chan has been closed, so the lock has been acquired
+			// context is canceled, so just release the lock immediately
+			rq.current = r
+			rq.releaseWithoutLock()
+		default:
+			// on abort, skip our position in the queue
+			r.prev.notifyOnComplete = r.notifyOnComplete
+			// and remove ourselves from the queue
+			if r.next != nil { // if we are somewhere in the middle of the queue
+				r.prev.next = r.next
+				r.next.prev = r.prev
+			} else { // if we are at the end of the queue
+				rq.last = r.prev
+				r.prev.next = nil
+			}
+		}
+		return ctx.Err()
+
+	case <-waitingOn:
+		// lock is acquired
+		rq.current = r
+		return nil
+	}
+}
+
+// RequestComplete must be invoked by a process when it completes processing the request.  That process must have
+// invoked WaitForGreenLight() before.
+func (rq *RequestQueue) RequestComplete() {
+	rq.mutex.Lock()
+	defer rq.mutex.Unlock()
+
+	rq.releaseWithoutLock()
+}
+
+func (rq *RequestQueue) releaseWithoutLock() {
+	// Notify the next waiting request.  This will panic if the lock is released more than once.
+	close(rq.current.notifyOnComplete)
+
+	if rq.current.next != nil {
+		rq.current.next.prev = nil
+	}
+}
diff --git a/rw_core/utils/request_queue_test.go b/rw_core/utils/request_queue_test.go
new file mode 100644
index 0000000..007d375
--- /dev/null
+++ b/rw_core/utils/request_queue_test.go
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package utils
+
+import (
+	"context"
+	"sync"
+	"testing"
+	"time"
+)
+
+func TestRequestQueueOrdering(t *testing.T) {
+	rq := NewRequestQueue()
+	// acquire lock immediately, so our requests will queue up
+	if err := rq.WaitForGreenLight(context.Background()); err != nil {
+		t.Error(err)
+		return
+	}
+
+	doneOrder := make([]int, 0, 10)
+
+	wg := sync.WaitGroup{}
+	wg.Add(10)
+
+	// queue up 10 requests
+	for i := 0; i < 10; i++ {
+		go func(i int) {
+			if err := rq.WaitForGreenLight(context.Background()); err != nil {
+				t.Error(err)
+			}
+			doneOrder = append(doneOrder, i)
+			rq.RequestComplete()
+
+			wg.Done()
+		}(i)
+
+		// ensure that the last request is queued before starting the next one
+		time.Sleep(time.Millisecond)
+	}
+
+	// complete the first process
+	rq.RequestComplete()
+
+	wg.Wait()
+
+	// verify that the processes completed in the correct order
+	for i := 0; i < 10; i++ {
+		if doneOrder[i] != i {
+			t.Errorf("Thread %d executed at time %d, should have been %d", doneOrder[i], i, doneOrder[i])
+		}
+	}
+}
+
+func TestRequestQueueCancellation(t *testing.T) {
+	rq := NewRequestQueue()
+	// acquire lock immediately, so our requests will queue up
+	if err := rq.WaitForGreenLight(context.Background()); err != nil {
+		t.Error(err)
+		return
+	}
+
+	wg := sync.WaitGroup{}
+	wg.Add(10)
+
+	willCancelContext, cancel := context.WithCancel(context.Background())
+
+	// queue up 10 requests
+	for i := 0; i < 10; i++ {
+		go func(i int) {
+			// will cancel processes 0, 1, 4, 5, 8, 9
+			willCancel := (i/2)%2 == 0
+
+			ctx := context.Background()
+			if willCancel {
+				ctx = willCancelContext
+			}
+
+			if err := rq.WaitForGreenLight(ctx); err != nil {
+				if !willCancel || err != context.Canceled {
+					t.Errorf("wait gave unexpected error %s", err)
+				} //else cancellation was expected
+			} else {
+				if willCancel {
+					t.Error("this should have been canceled")
+				} //else completed as expected
+				rq.RequestComplete()
+			}
+			wg.Done()
+		}(i)
+	}
+
+	// cancel processes
+	cancel()
+
+	// wait a moment for the cancellations to go through
+	time.Sleep(time.Millisecond)
+
+	// release the lock, and allow the processes to complete
+	rq.RequestComplete()
+
+	// wait for all processes to complete
+	wg.Wait()
+}