[VOL-3187]Pass Context down the execution call hierarchy across voltha-go codebase
Change-Id: I6bc2a0f7226c1beed4ae01a15d7b5c4dc04358d8
diff --git a/tests/kafka/kafka_inter_container_messaging_test.go b/tests/kafka/kafka_inter_container_messaging_test.go
index b44bc96..d0bd26e 100644
--- a/tests/kafka/kafka_inter_container_messaging_test.go
+++ b/tests/kafka/kafka_inter_container_messaging_test.go
@@ -78,18 +78,18 @@
coreKafkaProxy.Start()
adapterKafkaProxy.Start()
- subscribeTarget(coreKafkaProxy)
+ subscribeTarget(context.Background(), coreKafkaProxy)
}
-func subscribeTarget(kmp *kk.InterContainerProxy) {
+func subscribeTarget(ctx context.Context, kmp *kk.InterContainerProxy) {
topic := kk.Topic{Name: "Core"}
requestProxy := &api.AdapterRequestHandlerProxy{TestMode: true}
- kmp.SubscribeWithRequestHandlerInterface(topic, requestProxy)
+ kmp.SubscribeWithRequestHandlerInterface(ctx, topic, requestProxy)
}
-func waitForRPCMessage(topic kk.Topic, ch <-chan *ic.InterContainerMessage, doneCh chan string) {
+func waitForRPCMessage(ctx context.Context, topic kk.Topic, ch <-chan *ic.InterContainerMessage, doneCh chan string) {
for msg := range ch {
- logger.Debugw("Got-RPC-message", log.Fields{"msg": msg})
+ logger.Debugw(ctx, "Got-RPC-message", log.Fields{"msg": msg})
// Unpack message
requestBody := &ic.InterContainerRequestBody{}
if err := ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
@@ -176,12 +176,12 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, false)
//Unpack the result into the actual proto object
unpackResult := &ic.Error{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
- logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
}
assert.NotNil(t, unpackResult)
}
@@ -199,12 +199,12 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, false)
//Unpack the result into the actual proto object
unpackResult := &ic.Error{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
- logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
}
assert.NotNil(t, unpackResult)
}
@@ -226,11 +226,11 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
unpackResult := &voltha.Device{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
- logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
}
assert.Equal(t, unpackResult, expectedResponse)
}
@@ -251,11 +251,11 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, false)
unpackResult := &ic.Error{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
- logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
}
assert.NotNil(t, unpackResult)
}
@@ -274,11 +274,11 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
unpackResult := &voltha.Device{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
- logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
}
assert.Equal(t, unpackResult, expectedResponse)
}
@@ -297,11 +297,11 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
unpackResult := &voltha.Device{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
- logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
}
assert.Equal(t, unpackResult, expectedResponse)
}
@@ -324,11 +324,11 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
unpackResult := &voltha.Ports{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
- logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
}
expectedLen := len(unpackResult.Items) >= 1
assert.Equal(t, true, expectedLen)
@@ -347,12 +347,12 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, false)
//Unpack the result into the actual proto object
unpackResult := &ic.Error{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
- logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
}
assert.NotNil(t, unpackResult)
}
@@ -391,7 +391,7 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
assert.Nil(t, result)
}
@@ -430,7 +430,7 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, false, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
assert.Nil(t, result)
}
@@ -459,11 +459,11 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, false)
unpackResult := &ic.Error{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
- logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
}
assert.NotNil(t, unpackResult)
}
@@ -493,7 +493,7 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow(ctx, "Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
assert.Nil(t, result)
}
@@ -506,7 +506,7 @@
}
msg := <-ch
- logger.Debugw("msg-received", log.Fields{"msg": msg})
+ logger.Debugw(ctx, "msg-received", log.Fields{"msg": msg})
waitingChannel <- msg
return nil
}
@@ -541,7 +541,7 @@
assert.Equal(t, dd.DeviceType, "TestDevicetype")
assert.Equal(t, dd.ParentId, "TestParentId")
assert.Equal(t, dd.Publisher, "myPODName")
- logger.Debugw("TotalTime", log.Fields{"time": totalTime})
+ logger.Debugw(ctx, "TotalTime", log.Fields{"time": totalTime})
}
func TestStopKafkaProxy(t *testing.T) {
@@ -550,5 +550,5 @@
}
//func TestMain(m *testing.T) {
-// logger.Info("Main")
+// logger.Info(ctx, "Main")
//}