VOL-1691 Fix openolt adapter getting stuck while registartion with core

Change-Id: I2e1635b4245fcc0059f5b0a601fb7a0ab9ada1c0
diff --git a/kafka/client.go b/kafka/client.go
old mode 100644
new mode 100755
index 3d37f6e..36c1ede
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -16,8 +16,9 @@
 package kafka
 
 import (
-	ca "github.com/opencord/voltha-protos/go/inter_container"
 	"time"
+
+	ca "github.com/opencord/voltha-protos/go/inter_container"
 )
 
 const (
@@ -53,6 +54,7 @@
 	DefaultNumberPartitions         = 3
 	DefaultNumberReplicas           = 1
 	DefaultAutoCreateTopic          = false
+	DefaultMetadataMaxRetry         = 3
 )
 
 // MsgClient represents the set of APIs  a Kafka MsgClient must implement
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
old mode 100644
new mode 100755
index e920a83..0576da9
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -18,15 +18,16 @@
 import (
 	"errors"
 	"fmt"
+	"strings"
+	"sync"
+	"time"
+
 	scc "github.com/bsm/sarama-cluster"
 	"github.com/golang/protobuf/proto"
 	"github.com/google/uuid"
 	"github.com/opencord/voltha-go/common/log"
 	ic "github.com/opencord/voltha-protos/go/inter_container"
 	"gopkg.in/Shopify/sarama.v1"
-	"strings"
-	"sync"
-	"time"
 )
 
 func init() {
@@ -73,6 +74,7 @@
 	lockTopicToConsumerChannelMap sync.RWMutex
 	topicLockMap                  map[string]*sync.RWMutex
 	lockOfTopicLockMap            sync.RWMutex
+	metadataMaxRetry              int
 }
 
 type SaramaClientOption func(*SaramaClient)
@@ -179,6 +181,12 @@
 	}
 }
 
+func MetadatMaxRetries(retry int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.metadataMaxRetry = retry
+	}
+}
+
 func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
 	client := &SaramaClient{
 		KafkaHost: DefaultKafkaHost,
@@ -197,6 +205,7 @@
 	client.numPartitions = DefaultNumberPartitions
 	client.numReplicas = DefaultNumberReplicas
 	client.autoCreateTopic = DefaultAutoCreateTopic
+	client.metadataMaxRetry = DefaultMetadataMaxRetry
 
 	for _, option := range opts {
 		option(client)
@@ -679,6 +688,7 @@
 	config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
 	config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
 	config.Consumer.Offsets.Initial = sarama.OffsetNewest
+	config.Metadata.Retry.Max = sc.metadataMaxRetry
 	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
 	brokers := []string{kafkaFullAddr}