[VOL-2694] Use package specific logger instance in all log statements
Change-Id: Icf1cb5ade42e42179aed7731b767af2f52481e3d
diff --git a/tests/kafka/common.go b/tests/kafka/common.go
new file mode 100644
index 0000000..ef4d06a
--- /dev/null
+++ b/tests/kafka/common.go
@@ -0,0 +1,33 @@
+// +build integration
+
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+var logger log.Logger
+
+func init() {
+ // Set up this package's logger so that its log level can be modified at run time; panic if registration fails.
+ var err error
+ logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "kafka"})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/tests/kafka/kafka_client_test.go b/tests/kafka/kafka_client_test.go
index a553283..cdc2827 100644
--- a/tests/kafka/kafka_client_test.go
+++ b/tests/kafka/kafka_client_test.go
@@ -45,10 +45,7 @@
type sendToKafka func(interface{}, *kk.Topic, ...string) error
func init() {
- log.AddPackage(log.JSON, log.ErrorLevel, nil)
hostIP := os.Getenv("DOCKER_HOST_IP")
- log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
- log.SetAllLogLevel(log.ErrorLevel)
partionClient = kk.NewSaramaClient(
kk.ConsumerType(kk.PartitionConsumer),
kk.Host(hostIP),
@@ -78,7 +75,7 @@
mytime = time.Now()
}
totalTime = totalTime + (time.Now().UnixNano()-msg.Header.Timestamp)/int64(time.Millisecond)
- //log.Debugw("msg-received", log.Fields{"msg":msg})
+ //logger.Debugw("msg-received", log.Fields{"msg":msg})
totalMessageReceived = totalMessageReceived + 1
if totalMessageReceived == maxMessages {
doneCh <- "All received"
@@ -89,7 +86,7 @@
}
}
}
- log.Infow("Received all messages", log.Fields{"total": time.Since(mytime)})
+ logger.Infow("Received all messages", log.Fields{"total": time.Since(mytime)})
}
func sendMessages(topic *kk.Topic, numMessages int, fn sendToKafka) error {
@@ -107,7 +104,7 @@
var err error
body := &ic.InterContainerRequestBody{Rpc: "testRPC", Args: []*ic.Argument{}}
if marshalledArg, err = ptypes.MarshalAny(body); err != nil {
- log.Warnw("cannot-marshal-request", log.Fields{"error": err})
+ logger.Warnw("cannot-marshal-request", log.Fields{"error": err})
return err
}
msg.Body = marshalledArg
@@ -157,7 +154,7 @@
assert.Nil(t, err)
partionClient.Stop()
assert.Equal(t, numMessageToSend, totalMessageReceived)
- log.Infow("Partition consumer completed", log.Fields{"TotalMesages": totalMessageReceived, "TotalTime": totalTime, "val": val, "AverageTime": totalTime / int64(totalMessageReceived), "execTime": time.Since(start)})
+ logger.Infow("Partition consumer completed", log.Fields{"TotalMesages": totalMessageReceived, "TotalTime": totalTime, "val": val, "AverageTime": totalTime / int64(totalMessageReceived), "execTime": time.Since(start)})
}
func TestGroupConsumer(t *testing.T) {
@@ -171,7 +168,7 @@
assert.Nil(t, err)
groupClient.Stop()
assert.Equal(t, numMessageToSend, totalMessageReceived)
- log.Infow("Group consumer completed", log.Fields{"TotalMesages": totalMessageReceived, "TotalTime": totalTime, "val": val, "AverageTime": totalTime / int64(totalMessageReceived), "execTime": time.Since(start)})
+ logger.Infow("Group consumer completed", log.Fields{"TotalMesages": totalMessageReceived, "TotalTime": totalTime, "val": val, "AverageTime": totalTime / int64(totalMessageReceived), "execTime": time.Since(start)})
}
diff --git a/tests/kafka/kafka_inter_container_messaging_test.go b/tests/kafka/kafka_inter_container_messaging_test.go
index 7a7a9d6..9836793 100644
--- a/tests/kafka/kafka_inter_container_messaging_test.go
+++ b/tests/kafka/kafka_inter_container_messaging_test.go
@@ -48,9 +48,6 @@
var kafkaClient kk.Client
func init() {
- log.AddPackage(log.JSON, log.ErrorLevel, nil)
- log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
- log.SetAllLogLevel(log.ErrorLevel)
affinityRouterTopic = "AffinityRouter"
hostIP = os.Getenv("DOCKER_HOST_IP")
kafkaClient = kk.NewSaramaClient(
@@ -91,7 +88,7 @@
func waitForRPCMessage(topic kk.Topic, ch <-chan *ic.InterContainerMessage, doneCh chan string) {
for msg := range ch {
- log.Debugw("Got-RPC-message", log.Fields{"msg": msg})
+ logger.Debugw("Got-RPC-message", log.Fields{"msg": msg})
// Unpack message
requestBody := &ic.InterContainerRequestBody{}
if err := ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
@@ -178,12 +175,12 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, false)
//Unpack the result into the actual proto object
unpackResult := &ic.Error{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
assert.NotNil(t, unpackResult)
}
@@ -201,12 +198,12 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, false)
//Unpack the result into the actual proto object
unpackResult := &ic.Error{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
assert.NotNil(t, unpackResult)
}
@@ -228,11 +225,11 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
unpackResult := &voltha.Device{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
assert.Equal(t, unpackResult, expectedResponse)
}
@@ -253,11 +250,11 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, false)
unpackResult := &ic.Error{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
assert.NotNil(t, unpackResult)
}
@@ -276,11 +273,11 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
unpackResult := &voltha.Device{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
assert.Equal(t, unpackResult, expectedResponse)
}
@@ -299,11 +296,11 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
unpackResult := &voltha.Device{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
assert.Equal(t, unpackResult, expectedResponse)
}
@@ -326,11 +323,11 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
unpackResult := &voltha.Ports{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
expectedLen := len(unpackResult.Items) >= 1
assert.Equal(t, true, expectedLen)
@@ -349,12 +346,12 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, false)
//Unpack the result into the actual proto object
unpackResult := &ic.Error{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
assert.NotNil(t, unpackResult)
}
@@ -393,7 +390,7 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
assert.Nil(t, result)
}
@@ -432,7 +429,7 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, false, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
assert.Nil(t, result)
}
@@ -461,11 +458,11 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, false)
unpackResult := &ic.Error{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
- log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+ logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
assert.NotNil(t, unpackResult)
}
@@ -495,7 +492,7 @@
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, TEST_RPC_KEY, args...)
elapsed := time.Since(start)
- log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+ logger.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
assert.Nil(t, result)
}
@@ -508,7 +505,7 @@
}
msg := <-ch
- log.Debugw("msg-received", log.Fields{"msg": msg})
+ logger.Debugw("msg-received", log.Fields{"msg": msg})
waitingChannel <- msg
return nil
}
@@ -543,7 +540,7 @@
assert.Equal(t, dd.DeviceType, "TestDevicetype")
assert.Equal(t, dd.ParentId, "TestParentId")
assert.Equal(t, dd.Publisher, "myPODName")
- log.Debugw("TotalTime", log.Fields{"time": totalTime})
+ logger.Debugw("TotalTime", log.Fields{"time": totalTime})
}
func TestStopKafkaProxy(t *testing.T) {
@@ -552,5 +549,5 @@
}
//func TestMain(m *testing.T) {
-// log.Info("Main")
+// logger.Info("Main")
//}