Fix for timestamps missing from Kafka messages
Change-Id: Iab4b396f76d018a9b310b1aff833b8aa7659638a
diff --git a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore/etcdclient.go b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore/etcdclient.go
index 868b301..98f0559 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore/etcdclient.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore/etcdclient.go
@@ -24,6 +24,7 @@
"github.com/opencord/voltha-lib-go/v4/pkg/log"
v3Client "go.etcd.io/etcd/clientv3"
+
v3Concurrency "go.etcd.io/etcd/clientv3/concurrency"
v3rpcTypes "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
)
@@ -39,15 +40,10 @@
lockToMutexLock sync.Mutex
}
-// NewEtcdClient returns a new client for the Etcd KV store
-func NewEtcdClient(ctx context.Context, addr string, timeout time.Duration, level log.LogLevel) (*EtcdClient, error) {
- logconfig := log.ConstructZapConfig(log.JSON, level, log.Fields{})
-
- c, err := v3Client.New(v3Client.Config{
- Endpoints: []string{addr},
- DialTimeout: timeout,
- LogConfig: &logconfig,
- })
+// NewEtcdCustomClient returns a new client for the Etcd KV store allowing
+// the caller to specify etcd client configuration
+func NewEtcdCustomClient(ctx context.Context, config *v3Client.Config) (*EtcdClient, error) {
+ c, err := v3Client.New(*config)
if err != nil {
logger.Error(ctx, err)
return nil, err
@@ -61,6 +57,18 @@
lockToSessionMap: lockSessionMap}, nil
}
+// NewEtcdClient returns a new client for the Etcd KV store
+func NewEtcdClient(ctx context.Context, addr string, timeout time.Duration, level log.LogLevel) (*EtcdClient, error) {
+ logconfig := log.ConstructZapConfig(log.JSON, level, log.Fields{})
+
+ return NewEtcdCustomClient(
+ ctx,
+ &v3Client.Config{
+ Endpoints: []string{addr},
+ DialTimeout: timeout,
+ LogConfig: &logconfig})
+}
+
// IsConnectionUp returns whether the connection to the Etcd KV store is up. If a timeout occurs then
// it is assumed the connection is down or unreachable.
func (c *EtcdClient) IsConnectionUp(ctx context.Context) bool {
diff --git a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/grpc/server.go b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/grpc/server.go
index ed2d5ec..ea4573f 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/grpc/server.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/grpc/server.go
@@ -17,14 +17,16 @@
import (
"context"
+ "net"
+
grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
grpc_opentracing "github.com/grpc-ecosystem/go-grpc-middleware/tracing/opentracing"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
+ "google.golang.org/grpc/reflection"
"google.golang.org/grpc/status"
- "net"
)
/*
@@ -126,6 +128,7 @@
for _, service := range s.services {
service(s.gs)
}
+ reflection.Register(s.gs)
if err := s.gs.Serve(lis); err != nil {
logger.Fatalf(ctx, "failed to serve: %v\n", err)
diff --git a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/kafka/sarama_client.go
index 1e4efae..cd6d27b 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/kafka/sarama_client.go
@@ -833,6 +833,7 @@
func (sc *SaramaClient) createPublisher(ctx context.Context) error {
// This Creates the publisher
config := sarama.NewConfig()
+ config.Version = sarama.V1_0_0_0
config.Producer.Partitioner = sarama.NewRandomPartitioner
config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
config.Producer.Flush.Messages = sc.producerFlushMessages
@@ -856,6 +857,7 @@
func (sc *SaramaClient) createConsumer(ctx context.Context) error {
config := sarama.NewConfig()
+ config.Version = sarama.V1_0_0_0
config.Consumer.Return.Errors = true
config.Consumer.Fetch.Min = 1
config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
@@ -877,6 +879,7 @@
// createGroupConsumer creates a consumers group
func (sc *SaramaClient) createGroupConsumer(ctx context.Context, topic *Topic, groupId string, initialOffset int64, retries int) (*scc.Consumer, error) {
config := scc.NewConfig()
+ config.Version = sarama.V1_0_0_0
config.ClientID = uuid.New().String()
config.Group.Mode = scc.ConsumerModeMultiplex
config.Consumer.Group.Heartbeat.Interval, _ = time.ParseDuration("1s")