VOL-1691 Fix openolt adapter getting stuck while registartion with core
Change-Id: I2e1635b4245fcc0059f5b0a601fb7a0ab9ada1c0
diff --git a/kafka/client.go b/kafka/client.go
old mode 100644
new mode 100755
index 3d37f6e..36c1ede
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -16,8 +16,9 @@
package kafka
import (
- ca "github.com/opencord/voltha-protos/go/inter_container"
"time"
+
+ ca "github.com/opencord/voltha-protos/go/inter_container"
)
const (
@@ -53,6 +54,7 @@
DefaultNumberPartitions = 3
DefaultNumberReplicas = 1
DefaultAutoCreateTopic = false
+ DefaultMetadataMaxRetry = 3
)
// MsgClient represents the set of APIs a Kafka MsgClient must implement
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
old mode 100644
new mode 100755
index e920a83..0576da9
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -18,15 +18,16 @@
import (
"errors"
"fmt"
+ "strings"
+ "sync"
+ "time"
+
scc "github.com/bsm/sarama-cluster"
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
ic "github.com/opencord/voltha-protos/go/inter_container"
"gopkg.in/Shopify/sarama.v1"
- "strings"
- "sync"
- "time"
)
func init() {
@@ -73,6 +74,7 @@
lockTopicToConsumerChannelMap sync.RWMutex
topicLockMap map[string]*sync.RWMutex
lockOfTopicLockMap sync.RWMutex
+ metadataMaxRetry int
}
type SaramaClientOption func(*SaramaClient)
@@ -179,6 +181,12 @@
}
}
+func MetadatMaxRetries(retry int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.metadataMaxRetry = retry
+ }
+}
+
func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
client := &SaramaClient{
KafkaHost: DefaultKafkaHost,
@@ -197,6 +205,7 @@
client.numPartitions = DefaultNumberPartitions
client.numReplicas = DefaultNumberReplicas
client.autoCreateTopic = DefaultAutoCreateTopic
+ client.metadataMaxRetry = DefaultMetadataMaxRetry
for _, option := range opts {
option(client)
@@ -679,6 +688,7 @@
config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
config.Consumer.Offsets.Initial = sarama.OffsetNewest
+ config.Metadata.Retry.Max = sc.metadataMaxRetry
kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
brokers := []string{kafkaFullAddr}