[VOL-3069]Pass Context down the execution call hierarchy across voltha codebase
Change-Id: I97a2630d9a4fe5dc3161113539edda476534f186
diff --git a/rw_core/utils/core_utils.go b/rw_core/utils/core_utils.go
index 798b4d8..141ab2b 100644
--- a/rw_core/utils/core_utils.go
+++ b/rw_core/utils/core_utils.go
@@ -17,6 +17,7 @@
package utils
import (
+ "context"
"os"
"time"
@@ -25,7 +26,7 @@
)
// ResponseCallback is the function signature for callbacks to execute after a response is received.
-type ResponseCallback func(rpc string, response interface{}, reqArgs ...interface{})
+type ResponseCallback func(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{})
// DeviceID represent device id attribute
type DeviceID struct {
@@ -38,7 +39,7 @@
}
// GetHostName returns host name
-func GetHostName() string {
+func GetHostName(ctx context.Context) string {
return os.Getenv("HOSTNAME")
}
@@ -53,7 +54,7 @@
}
// NewResponse -
-func NewResponse() Response {
+func NewResponse(ctx context.Context) Response {
return Response{
&response{
ch: make(chan struct{}),
@@ -62,7 +63,7 @@
}
// Fake a completed response.
-func DoneResponse() Response {
+func DoneResponse(ctx context.Context) Response {
r := Response{
&response{
err: nil,
@@ -75,7 +76,7 @@
}
// Error sends a response with the given error. It may only be called once.
-func (r Response) Error(err error) {
+func (r Response) Error(ctx context.Context, err error) {
// if this is called twice, it will panic; this is intentional
r.err = err
r.done = true
@@ -83,7 +84,7 @@
}
// Done sends a non-error response unless Error has already been called, in which case this is a no-op.
-func (r Response) Done() {
+func (r Response) Done(ctx context.Context) {
if !r.done {
close(r.ch)
}
@@ -94,7 +95,7 @@
//The error will be at the index corresponding to the order in which the channel appear in the parameter list.
//If no errors is found then nil is returned. This method also takes in a timeout in milliseconds. If a
//timeout is obtained then this function will stop waiting for the remaining responses and abort.
-func WaitForNilOrErrorResponses(timeout time.Duration, responses ...Response) []error {
+func WaitForNilOrErrorResponses(ctx context.Context, timeout time.Duration, responses ...Response) []error {
timedOut := make(chan struct{})
timer := time.AfterFunc(timeout, func() { close(timedOut) })
defer timer.Stop()
diff --git a/rw_core/utils/core_utils_test.go b/rw_core/utils/core_utils_test.go
index e55b38c..4cc11f6 100644
--- a/rw_core/utils/core_utils_test.go
+++ b/rw_core/utils/core_utils_test.go
@@ -16,13 +16,13 @@
package utils
import (
- "math/rand"
- "testing"
- "time"
-
+ "context"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "math/rand"
+ "testing"
+ "time"
)
var (
@@ -37,12 +37,12 @@
func runSuccessfulTask(response Response, durationRange int) {
time.Sleep(time.Duration(rand.Intn(durationRange)) * time.Millisecond)
- response.Done()
+ response.Done(context.Background())
}
func runFailureTask(response Response, durationRange int) {
time.Sleep(time.Duration(rand.Intn(durationRange)) * time.Millisecond)
- response.Error(taskFailureError)
+ response.Error(context.Background(), taskFailureError)
}
func runMultipleTasks(timeout time.Duration, numTasks, taskDurationRange, numSuccessfulTask, numFailuretask int) []error {
@@ -52,7 +52,7 @@
numSuccessfulTaskCreated := 0
responses := make([]Response, numTasks)
for i := 0; i < numTasks; i++ {
- responses[i] = NewResponse()
+ responses[i] = NewResponse(context.Background())
if numSuccessfulTaskCreated < numSuccessfulTask {
go runSuccessfulTask(responses[i], taskDurationRange)
numSuccessfulTaskCreated++
@@ -60,7 +60,7 @@
}
go runFailureTask(responses[i], taskDurationRange)
}
- return WaitForNilOrErrorResponses(timeout, responses...)
+ return WaitForNilOrErrorResponses(context.Background(), timeout, responses...)
}
func getNumSuccessFailure(inputs []error) (numSuccess, numFailure, numTimeout int) {
diff --git a/rw_core/utils/id.go b/rw_core/utils/id.go
index 862b909..259c831 100644
--- a/rw_core/utils/id.go
+++ b/rw_core/utils/id.go
@@ -17,6 +17,7 @@
package utils
import (
+ "context"
"errors"
"fmt"
"math/rand"
@@ -26,23 +27,23 @@
)
// CreateDeviceID produces a device ID. The device ID is a UUID
-func CreateDeviceID() string {
+func CreateDeviceID(ctx context.Context) string {
return uuid.New().String()
}
// CreateLogicalDeviceID produces a logical device ID. The logical device ID is a UUID
-func CreateLogicalDeviceID() string {
+func CreateLogicalDeviceID(ctx context.Context) string {
return uuid.New().String()
}
// CreateLogicalPortID produces a random port ID for a logical device.
-func CreateLogicalPortID() uint32 {
+func CreateLogicalPortID(ctx context.Context) uint32 {
// A logical port is a uint32
return rand.Uint32()
}
// CreateDataPathID creates uint64 pathid from string pathid
-func CreateDataPathID(idInHexString string) (uint64, error) {
+func CreateDataPathID(ctx context.Context, idInHexString string) (uint64, error) {
if idInHexString == "" {
return 0, errors.New("id-empty")
}
diff --git a/rw_core/utils/request_queue.go b/rw_core/utils/request_queue.go
index 2c95e23..1e92690 100644
--- a/rw_core/utils/request_queue.go
+++ b/rw_core/utils/request_queue.go
@@ -36,7 +36,7 @@
}
// NewRequestQueue creates a new request queue
-func NewRequestQueue() *RequestQueue {
+func NewRequestQueue(ctx context.Context) *RequestQueue {
ch := make(chan struct{})
close(ch) // assume the "current" request is already complete
return &RequestQueue{lastCompleteCh: ch}
@@ -72,7 +72,7 @@
// chan has been closed, so the lock has been acquired
// context is canceled, so just release the lock immediately
rq.current = r
- rq.releaseWithoutLock()
+ rq.releaseWithoutLock(ctx)
default:
// on abort, skip our position in the queue
r.prev.notifyOnComplete = r.notifyOnComplete
@@ -96,14 +96,14 @@
// RequestComplete must be invoked by a process when it completes processing the request. That process must have
// invoked WaitForGreenLight() before.
-func (rq *RequestQueue) RequestComplete() {
+func (rq *RequestQueue) RequestComplete(ctx context.Context) {
rq.mutex.Lock()
defer rq.mutex.Unlock()
- rq.releaseWithoutLock()
+ rq.releaseWithoutLock(ctx)
}
-func (rq *RequestQueue) releaseWithoutLock() {
+func (rq *RequestQueue) releaseWithoutLock(ctx context.Context) {
// Notify the next waiting request. This will panic if the lock is released more than once.
close(rq.current.notifyOnComplete)
diff --git a/rw_core/utils/request_queue_test.go b/rw_core/utils/request_queue_test.go
index 007d375..a7a20b7 100644
--- a/rw_core/utils/request_queue_test.go
+++ b/rw_core/utils/request_queue_test.go
@@ -24,7 +24,7 @@
)
func TestRequestQueueOrdering(t *testing.T) {
- rq := NewRequestQueue()
+ rq := NewRequestQueue(context.Background())
// acquire lock immediately, so our requests will queue up
if err := rq.WaitForGreenLight(context.Background()); err != nil {
t.Error(err)
@@ -43,7 +43,7 @@
t.Error(err)
}
doneOrder = append(doneOrder, i)
- rq.RequestComplete()
+ rq.RequestComplete(context.Background())
wg.Done()
}(i)
@@ -53,7 +53,7 @@
}
// complete the first process
- rq.RequestComplete()
+ rq.RequestComplete(context.Background())
wg.Wait()
@@ -66,7 +66,7 @@
}
func TestRequestQueueCancellation(t *testing.T) {
- rq := NewRequestQueue()
+ rq := NewRequestQueue(context.Background())
// acquire lock immediately, so our requests will queue up
if err := rq.WaitForGreenLight(context.Background()); err != nil {
t.Error(err)
@@ -97,7 +97,7 @@
if willCancel {
t.Error("this should have been canceled")
} //else completed as expected
- rq.RequestComplete()
+ rq.RequestComplete(context.Background())
}
wg.Done()
}(i)
@@ -110,7 +110,7 @@
time.Sleep(time.Millisecond)
// release the lock, and allow the processes to complete
- rq.RequestComplete()
+ rq.RequestComplete(context.Background())
// wait for all processes to complete
wg.Wait()