XVOL-1689 : ONU stays in DISCOVERED state
VOL-1586 : Possible race condition in openolt python adapter during onu discovery
1) gets Device in response of ChildDeviceDetected.
This avoids race and also removes the need for GetChildDevice.
2)Puts the Device Id into cache to use in future requests,
especially avoid the fail when calling GetChildDevice
in onuIndication because of race.
Change-Id: I60944a6ee0e2ffad80a31ef93f72b55b0b136284
diff --git a/vendor/github.com/opencord/voltha-go/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-go/kafka/sarama_client.go
old mode 100644
new mode 100755
index e920a83..0576da9
--- a/vendor/github.com/opencord/voltha-go/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-go/kafka/sarama_client.go
@@ -18,15 +18,16 @@
import (
"errors"
"fmt"
+ "strings"
+ "sync"
+ "time"
+
scc "github.com/bsm/sarama-cluster"
"github.com/golang/protobuf/proto"
"github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
ic "github.com/opencord/voltha-protos/go/inter_container"
"gopkg.in/Shopify/sarama.v1"
- "strings"
- "sync"
- "time"
)
func init() {
@@ -73,6 +74,7 @@
lockTopicToConsumerChannelMap sync.RWMutex
topicLockMap map[string]*sync.RWMutex
lockOfTopicLockMap sync.RWMutex
+ metadataMaxRetry int
}
type SaramaClientOption func(*SaramaClient)
@@ -179,6 +181,12 @@
}
}
+func MetadatMaxRetries(retry int) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.metadataMaxRetry = retry
+ }
+}
+
func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
client := &SaramaClient{
KafkaHost: DefaultKafkaHost,
@@ -197,6 +205,7 @@
client.numPartitions = DefaultNumberPartitions
client.numReplicas = DefaultNumberReplicas
client.autoCreateTopic = DefaultAutoCreateTopic
+ client.metadataMaxRetry = DefaultMetadataMaxRetry
for _, option := range opts {
option(client)
@@ -679,6 +688,7 @@
config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
config.Consumer.Offsets.Initial = sarama.OffsetNewest
+ config.Metadata.Retry.Max = sc.metadataMaxRetry
kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
brokers := []string{kafkaFullAddr}