Revert "[VOL-3069]Pass Context in methods which are performing logging and need the context"
This reverts commit 3c425fbeabed17ec8dad437678b4d105deaf2fbe.
Reason for revert: Merging higher-priority patches first.
Change-Id: Iaa03a5977357dcd86de358d76e90cc54cd6b1fa5
diff --git a/pkg/mocks/etcd/common.go b/pkg/mocks/etcd/common.go
index 63d4ab0..a45b4b2 100644
--- a/pkg/mocks/etcd/common.go
+++ b/pkg/mocks/etcd/common.go
@@ -19,12 +19,12 @@
"github.com/opencord/voltha-lib-go/v3/pkg/log"
)
-var logger log.CLogger
+var logger log.Logger
func init() {
// Set up this package so that its log level can be modified at run time
var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "mocks"})
+ logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "mocks"})
if err != nil {
panic(err)
}
diff --git a/pkg/mocks/etcd/etcd_server.go b/pkg/mocks/etcd/etcd_server.go
index 6113b3a..b4e201d 100644
--- a/pkg/mocks/etcd/etcd_server.go
+++ b/pkg/mocks/etcd/etcd_server.go
@@ -16,7 +16,6 @@
package etcd
import (
- "context"
"fmt"
"go.etcd.io/etcd/embed"
"net/url"
@@ -52,25 +51,25 @@
* :param localPersistentStorageDir: The name of a local directory which will hold the Etcd server data
* :param logLevel: One of debug, info, warn, error, panic, or fatal. Default 'info'.
*/
-func MKConfig(ctx context.Context, configName string, clientPort, peerPort int, localPersistentStorageDir string, logLevel string) *embed.Config {
+func MKConfig(configName string, clientPort, peerPort int, localPersistentStorageDir string, logLevel string) *embed.Config {
cfg := embed.NewConfig()
cfg.Name = configName
cfg.Dir = localPersistentStorageDir
cfg.Logger = "zap"
if !islogLevelValid(logLevel) {
- logger.Fatalf(ctx, "Invalid log level -%s", logLevel)
+ logger.Fatalf("Invalid log level -%s", logLevel)
}
cfg.LogLevel = logLevel
acurl, err := url.Parse(fmt.Sprintf("http://localhost:%d", clientPort))
if err != nil {
- logger.Fatalf(ctx, "Invalid client port -%d", clientPort)
+ logger.Fatalf("Invalid client port -%d", clientPort)
}
cfg.ACUrls = []url.URL{*acurl}
cfg.LCUrls = []url.URL{*acurl}
apurl, err := url.Parse(fmt.Sprintf("http://localhost:%d", peerPort))
if err != nil {
- logger.Fatalf(ctx, "Invalid peer port -%d", peerPort)
+ logger.Fatalf("Invalid peer port -%d", peerPort)
}
cfg.LPUrls = []url.URL{*apurl}
cfg.APUrls = []url.URL{*apurl}
@@ -92,7 +91,7 @@
//StartEtcdServer creates and starts an embedded Etcd server. A local directory to store data is created for the
//embedded server lifetime (for the duration of a unit test). The server runs at localhost:2379.
-func StartEtcdServer(ctx context.Context, cfg *embed.Config) *EtcdServer {
+func StartEtcdServer(cfg *embed.Config) *EtcdServer {
// If the server is already running, just return
if cfg == nil {
cfg = getDefaultCfg()
@@ -100,35 +99,35 @@
// Remove the local directory as
// a safeguard for the case where a prior test failed
if err := os.RemoveAll(cfg.Dir); err != nil {
- logger.Fatalf(ctx, "Failure removing local directory %s", cfg.Dir)
+ logger.Fatalf("Failure removing local directory %s", cfg.Dir)
}
e, err := embed.StartEtcd(cfg)
if err != nil {
- logger.Fatal(ctx, err)
+ logger.Fatal(err)
}
select {
case <-e.Server.ReadyNotify():
- logger.Debug(ctx, "Embedded Etcd server is ready!")
+ logger.Debug("Embedded Etcd server is ready!")
case <-time.After(serverStartUpTimeout):
e.Server.HardStop() // trigger a shutdown
e.Close()
- logger.Fatal(ctx, "Embedded Etcd server took too long to start!")
+ logger.Fatal("Embedded Etcd server took too long to start!")
case err := <-e.Err():
e.Server.HardStop() // trigger a shutdown
e.Close()
- logger.Fatalf(ctx, "Embedded Etcd server errored out - %s", err)
+ logger.Fatalf("Embedded Etcd server errored out - %s", err)
}
return &EtcdServer{server: e}
}
//Stop closes the embedded Etcd server and removes the local data directory as well
-func (es *EtcdServer) Stop(ctx context.Context) {
+func (es *EtcdServer) Stop() {
if es != nil {
storage := es.server.Config().Dir
es.server.Server.HardStop()
es.server.Close()
if err := os.RemoveAll(storage); err != nil {
- logger.Fatalf(ctx, "Failure removing local directory %s", es.server.Config().Dir)
+ logger.Fatalf("Failure removing local directory %s", es.server.Config().Dir)
}
}
}
diff --git a/pkg/mocks/etcd/etcd_server_test.go b/pkg/mocks/etcd/etcd_server_test.go
index b26b262..43c7a42 100644
--- a/pkg/mocks/etcd/etcd_server_test.go
+++ b/pkg/mocks/etcd/etcd_server_test.go
@@ -32,24 +32,23 @@
var client *kvstore.EtcdClient
func setup() {
- ctx := context.Background()
clientPort, err := freeport.GetFreePort()
if err != nil {
- logger.Fatal(ctx, err)
+ logger.Fatal(err)
}
peerPort, err := freeport.GetFreePort()
if err != nil {
- logger.Fatal(ctx, err)
+ logger.Fatal(err)
}
- etcdServer = StartEtcdServer(ctx, MKConfig(ctx, "voltha.mock.test", clientPort, peerPort, "voltha.lib.mocks.etcd", "error"))
+ etcdServer = StartEtcdServer(MKConfig("voltha.mock.test", clientPort, peerPort, "voltha.lib.mocks.etcd", "error"))
if etcdServer == nil {
- logger.Fatal(ctx, "Embedded server failed to start")
+ logger.Fatal("Embedded server failed to start")
}
clientAddr := fmt.Sprintf("localhost:%d", clientPort)
- client, err = kvstore.NewEtcdClient(ctx, clientAddr, 10*time.Second, log.WarnLevel)
+ client, err = kvstore.NewEtcdClient(clientAddr, 10*time.Second, log.WarnLevel)
if err != nil || client == nil {
- etcdServer.Stop(ctx)
- logger.Fatal(ctx, "Failed to create an Etcd client")
+ etcdServer.Stop()
+ logger.Fatal("Failed to create an Etcd client")
}
}
@@ -78,10 +77,10 @@
func shutdown() {
if client != nil {
- client.Close(context.Background())
+ client.Close()
}
if etcdServer != nil {
- etcdServer.Stop(context.Background())
+ etcdServer.Stop()
}
}
diff --git a/pkg/mocks/kafka/common.go b/pkg/mocks/kafka/common.go
index e980b05..05bc5f9 100644
--- a/pkg/mocks/kafka/common.go
+++ b/pkg/mocks/kafka/common.go
@@ -19,12 +19,12 @@
"github.com/opencord/voltha-lib-go/v3/pkg/log"
)
-var logger log.CLogger
+var logger log.Logger
func init() {
// Set up this package so that its log level can be modified at run time
var err error
- logger, err = log.RegisterPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "mocks"})
+ logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "mocks"})
if err != nil {
panic(err)
}
diff --git a/pkg/mocks/kafka/endpoint_manager.go b/pkg/mocks/kafka/endpoint_manager.go
index 8b8e7f6..fedbebf 100644
--- a/pkg/mocks/kafka/endpoint_manager.go
+++ b/pkg/mocks/kafka/endpoint_manager.go
@@ -17,7 +17,6 @@
package kafka
import (
- "context"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
)
@@ -28,16 +27,16 @@
return mock
}
-func (em *EndpointManager) GetEndpoint(ctx context.Context, deviceID string, serviceType string) (kafka.Endpoint, error) {
+func (em *EndpointManager) GetEndpoint(deviceID string, serviceType string) (kafka.Endpoint, error) {
// TODO add mocks call and args
return kafka.Endpoint(serviceType), nil
}
-func (em *EndpointManager) IsDeviceOwnedByService(ctx context.Context, deviceID string, serviceType string, replicaNumber int32) (bool, error) {
+func (em *EndpointManager) IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error) {
// TODO add mocks call and args
return true, nil
}
-func (em *EndpointManager) GetReplicaAssignment(ctx context.Context, deviceID string, serviceType string) (kafka.ReplicaID, error) {
+func (em *EndpointManager) GetReplicaAssignment(deviceID string, serviceType string) (kafka.ReplicaID, error) {
return kafka.ReplicaID(1), nil
}
diff --git a/pkg/mocks/kafka/kafka_client.go b/pkg/mocks/kafka/kafka_client.go
index 9d6f50c..7c5508b 100644
--- a/pkg/mocks/kafka/kafka_client.go
+++ b/pkg/mocks/kafka/kafka_client.go
@@ -16,7 +16,6 @@
package kafka
import (
- "context"
"fmt"
"sync"
"time"
@@ -43,12 +42,12 @@
}
}
-func (kc *KafkaClient) Start(ctx context.Context) error {
- logger.Debug(ctx, "kafka-client-started")
+func (kc *KafkaClient) Start() error {
+ logger.Debug("kafka-client-started")
return nil
}
-func (kc *KafkaClient) Stop(ctx context.Context) {
+func (kc *KafkaClient) Stop() {
kc.lock.Lock()
defer kc.lock.Unlock()
for topic, chnls := range kc.topicsChannelMap {
@@ -57,11 +56,11 @@
}
delete(kc.topicsChannelMap, topic)
}
- logger.Debug(ctx, "kafka-client-stopped")
+ logger.Debug("kafka-client-stopped")
}
-func (kc *KafkaClient) CreateTopic(ctx context.Context, topic *kafka.Topic, numPartition int, repFactor int) error {
- logger.Debugw(ctx, "CreatingTopic", log.Fields{"topic": topic.Name, "numPartition": numPartition, "replicationFactor": repFactor})
+func (kc *KafkaClient) CreateTopic(topic *kafka.Topic, numPartition int, repFactor int) error {
+ logger.Debugw("CreatingTopic", log.Fields{"topic": topic.Name, "numPartition": numPartition, "replicationFactor": repFactor})
kc.lock.Lock()
defer kc.lock.Unlock()
if _, ok := kc.topicsChannelMap[topic.Name]; ok {
@@ -72,16 +71,16 @@
return nil
}
-func (kc *KafkaClient) DeleteTopic(ctx context.Context, topic *kafka.Topic) error {
- logger.Debugw(ctx, "DeleteTopic", log.Fields{"topic": topic.Name})
+func (kc *KafkaClient) DeleteTopic(topic *kafka.Topic) error {
+ logger.Debugw("DeleteTopic", log.Fields{"topic": topic.Name})
kc.lock.Lock()
defer kc.lock.Unlock()
delete(kc.topicsChannelMap, topic.Name)
return nil
}
-func (kc *KafkaClient) Subscribe(ctx context.Context, topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ic.InterContainerMessage, error) {
- logger.Debugw(ctx, "Subscribe", log.Fields{"topic": topic.Name, "args": kvArgs})
+func (kc *KafkaClient) Subscribe(topic *kafka.Topic, kvArgs ...*kafka.KVArg) (<-chan *ic.InterContainerMessage, error) {
+ logger.Debugw("Subscribe", log.Fields{"topic": topic.Name, "args": kvArgs})
kc.lock.Lock()
defer kc.lock.Unlock()
ch := make(chan *ic.InterContainerMessage)
@@ -94,8 +93,8 @@
return s[:len(s)-1]
}
-func (kc *KafkaClient) UnSubscribe(ctx context.Context, topic *kafka.Topic, ch <-chan *ic.InterContainerMessage) error {
- logger.Debugw(ctx, "UnSubscribe", log.Fields{"topic": topic.Name})
+func (kc *KafkaClient) UnSubscribe(topic *kafka.Topic, ch <-chan *ic.InterContainerMessage) error {
+ logger.Debugw("UnSubscribe", log.Fields{"topic": topic.Name})
kc.lock.Lock()
defer kc.lock.Unlock()
if chnls, ok := kc.topicsChannelMap[topic.Name]; ok {
@@ -113,11 +112,11 @@
return nil
}
-func (kc *KafkaClient) SubscribeForMetadata(ctx context.Context, _ func(fromTopic string, timestamp time.Time)) {
- logger.Debug(ctx, "SubscribeForMetadata - unimplemented")
+func (kc *KafkaClient) SubscribeForMetadata(_ func(fromTopic string, timestamp time.Time)) {
+ logger.Debug("SubscribeForMetadata - unimplemented")
}
-func (kc *KafkaClient) Send(ctx context.Context, msg interface{}, topic *kafka.Topic, keys ...string) error {
+func (kc *KafkaClient) Send(msg interface{}, topic *kafka.Topic, keys ...string) error {
req, ok := msg.(*ic.InterContainerMessage)
if !ok {
return status.Error(codes.InvalidArgument, "msg-not-InterContainerMessage-type")
@@ -128,22 +127,22 @@
kc.lock.RLock()
defer kc.lock.RUnlock()
for _, ch := range kc.topicsChannelMap[topic.Name] {
- logger.Debugw(ctx, "Publishing", log.Fields{"fromTopic": req.Header.FromTopic, "toTopic": topic.Name, "id": req.Header.Id})
+ logger.Debugw("Publishing", log.Fields{"fromTopic": req.Header.FromTopic, "toTopic": topic.Name, "id": req.Header.Id})
ch <- req
}
return nil
}
-func (kc *KafkaClient) SendLiveness(ctx context.Context) error {
+func (kc *KafkaClient) SendLiveness() error {
return status.Error(codes.Unimplemented, "SendLiveness")
}
-func (kc *KafkaClient) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
- logger.Debug(ctx, "EnableLivenessChannel - unimplemented")
+func (kc *KafkaClient) EnableLivenessChannel(enable bool) chan bool {
+ logger.Debug("EnableLivenessChannel - unimplemented")
return nil
}
-func (kc *KafkaClient) EnableHealthinessChannel(ctx context.Context, enable bool) chan bool {
- logger.Debug(ctx, "EnableHealthinessChannel - unimplemented")
+func (kc *KafkaClient) EnableHealthinessChannel(enable bool) chan bool {
+ logger.Debug("EnableHealthinessChannel - unimplemented")
return nil
}
diff --git a/pkg/mocks/kafka/kafka_client_test.go b/pkg/mocks/kafka/kafka_client_test.go
index 7753d66..0e35ec1 100644
--- a/pkg/mocks/kafka/kafka_client_test.go
+++ b/pkg/mocks/kafka/kafka_client_test.go
@@ -17,35 +17,34 @@
package kafka
import (
- "context"
+ "testing"
+ "time"
+
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
"github.com/stretchr/testify/assert"
- "testing"
- "time"
)
func TestKafkaClientCreateTopic(t *testing.T) {
- ctx := context.Background()
cTkc := NewKafkaClient()
topic := kafka.Topic{Name: "myTopic"}
- err := cTkc.CreateTopic(ctx, &topic, 1, 1)
+ err := cTkc.CreateTopic(&topic, 1, 1)
assert.Nil(t, err)
- err = cTkc.CreateTopic(ctx, &topic, 1, 1)
+ err = cTkc.CreateTopic(&topic, 1, 1)
assert.NotNil(t, err)
}
func TestKafkaClientDeleteTopic(t *testing.T) {
cTkc := NewKafkaClient()
topic := kafka.Topic{Name: "myTopic"}
- err := cTkc.DeleteTopic(context.Background(), &topic)
+ err := cTkc.DeleteTopic(&topic)
assert.Nil(t, err)
}
func TestKafkaClientSubscribeSend(t *testing.T) {
cTkc := NewKafkaClient()
topic := kafka.Topic{Name: "myTopic"}
- ch, err := cTkc.Subscribe(context.Background(), &topic)
+ ch, err := cTkc.Subscribe(&topic)
assert.Nil(t, err)
assert.NotNil(t, ch)
testCh := make(chan bool)
@@ -66,7 +65,7 @@
testCh <- false
}
}()
- err = cTkc.Send(context.Background(), msg, &topic)
+ err = cTkc.Send(msg, &topic)
assert.Nil(t, err)
res := <-testCh
assert.True(t, res)
@@ -75,20 +74,20 @@
func TestKafkaClientUnSubscribe(t *testing.T) {
cTkc := NewKafkaClient()
topic := kafka.Topic{Name: "myTopic"}
- ch, err := cTkc.Subscribe(context.Background(), &topic)
+ ch, err := cTkc.Subscribe(&topic)
assert.Nil(t, err)
assert.NotNil(t, ch)
- err = cTkc.UnSubscribe(context.Background(), &topic, ch)
+ err = cTkc.UnSubscribe(&topic, ch)
assert.Nil(t, err)
}
func TestKafkaClientStop(t *testing.T) {
cTkc := NewKafkaClient()
topic := kafka.Topic{Name: "myTopic"}
- ch, err := cTkc.Subscribe(context.Background(), &topic)
+ ch, err := cTkc.Subscribe(&topic)
assert.Nil(t, err)
assert.NotNil(t, ch)
- err = cTkc.UnSubscribe(context.Background(), &topic, ch)
+ err = cTkc.UnSubscribe(&topic, ch)
assert.Nil(t, err)
- cTkc.Stop(context.Background())
+ cTkc.Stop()
}
diff --git a/pkg/mocks/kafka/kafka_inter_container_proxy.go b/pkg/mocks/kafka/kafka_inter_container_proxy.go
index 2a1b5a1..34aec95 100644
--- a/pkg/mocks/kafka/kafka_inter_container_proxy.go
+++ b/pkg/mocks/kafka/kafka_inter_container_proxy.go
@@ -53,18 +53,18 @@
InvokeRpcSpy InvokeRpcSpy
}
-func (s *MockKafkaICProxy) Start(ctx context.Context) error { return nil }
+func (s *MockKafkaICProxy) Start() error { return nil }
func (s *MockKafkaICProxy) GetDefaultTopic() *kafka.Topic {
t := kafka.Topic{
Name: "test-topic",
}
return &t
}
-func (s *MockKafkaICProxy) DeleteTopic(ctx context.Context, topic kafka.Topic) error { return nil }
-func (s *MockKafkaICProxy) DeviceDiscovered(ctx context.Context, deviceId string, deviceType string, parentId string, publisher string) error {
+func (s *MockKafkaICProxy) DeleteTopic(topic kafka.Topic) error { return nil }
+func (s *MockKafkaICProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
return nil
}
-func (s *MockKafkaICProxy) Stop(ctx context.Context) {}
+func (s *MockKafkaICProxy) Stop() {}
func (s *MockKafkaICProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
waitForResponse bool, key string, kvArgs ...*kafka.KVArg) chan *kafka.RpcResponse {
@@ -122,16 +122,12 @@
return success, &response
}
-func (s *MockKafkaICProxy) SubscribeWithRequestHandlerInterface(ctx context.Context, topic kafka.Topic, handler interface{}) error {
+func (s *MockKafkaICProxy) SubscribeWithRequestHandlerInterface(topic kafka.Topic, handler interface{}) error {
return nil
}
-func (s *MockKafkaICProxy) SubscribeWithDefaultRequestHandler(ctx context.Context, topic kafka.Topic, initialOffset int64) error {
+func (s *MockKafkaICProxy) SubscribeWithDefaultRequestHandler(topic kafka.Topic, initialOffset int64) error {
return nil
}
-func (s *MockKafkaICProxy) UnSubscribeFromRequestHandler(ctx context.Context, topic kafka.Topic) error {
- return nil
-}
-func (s *MockKafkaICProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
- return nil
-}
-func (s *MockKafkaICProxy) SendLiveness(ctx context.Context) error { return nil }
+func (s *MockKafkaICProxy) UnSubscribeFromRequestHandler(topic kafka.Topic) error { return nil }
+func (s *MockKafkaICProxy) EnableLivenessChannel(enable bool) chan bool { return nil }
+func (s *MockKafkaICProxy) SendLiveness() error { return nil }