VOL-2112 move to voltha-lib-go
Change-Id: Ic1af08003c1d2c698c0cce371e64f47b47b8d875
diff --git a/vendor/github.com/Shopify/sarama/client.go b/vendor/github.com/Shopify/sarama/client.go
index 0016f8f..c4c54b2 100644
--- a/vendor/github.com/Shopify/sarama/client.go
+++ b/vendor/github.com/Shopify/sarama/client.go
@@ -46,6 +46,10 @@
// the partition leader.
InSyncReplicas(topic string, partitionID int32) ([]int32, error)
+ // OfflineReplicas returns the set of all offline replica IDs for the given
+ // partition, as recorded in the client's cached metadata for that partition.
+ OfflineReplicas(topic string, partitionID int32) ([]int32, error)
+
// RefreshMetadata takes a list of topics and queries the cluster to refresh the
// available metadata for those topics. If no topics are provided, it will refresh
// metadata for all topics.
@@ -288,7 +292,8 @@
partitions = client.cachedPartitions(topic, allPartitions)
}
- if partitions == nil {
+ // no partitions found after refresh metadata
+ if len(partitions) == 0 {
return nil, ErrUnknownTopicOrPartition
}
@@ -373,6 +378,31 @@
return dupInt32Slice(metadata.Isr), nil
}
+// OfflineReplicas returns the offline replica IDs cached for the given topic
+// partition (duplicated via dupInt32Slice), refreshing that topic's metadata
+// once on a cache miss. It fails with ErrClosedClient on a closed client and
+// with ErrUnknownTopicOrPartition if the partition is still unknown afterwards.
+func (client *client) OfflineReplicas(topic string, partitionID int32) ([]int32, error) {
+	if client.Closed() {
+		return nil, ErrClosedClient
+	}
+	partition := client.cachedMetadata(topic, partitionID)
+	if partition == nil {
+		if err := client.RefreshMetadata(topic); err != nil {
+			return nil, err
+		}
+		if partition = client.cachedMetadata(topic, partitionID); partition == nil {
+			return nil, ErrUnknownTopicOrPartition
+		}
+	}
+	offline := dupInt32Slice(partition.OfflineReplicas)
+	if partition.Err != ErrReplicaNotAvailable {
+		return offline, nil
+	}
+	// The replica list is still returned alongside the partition-level error.
+	return offline, partition.Err
+}
+
func (client *client) Leader(topic string, partitionID int32) (*Broker, error) {
if client.Closed() {
return nil, ErrClosedClient
@@ -405,7 +435,11 @@
}
}
- return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max)
+ deadline := time.Time{}
+ if client.conf.Metadata.Timeout > 0 {
+ deadline = time.Now().Add(client.conf.Metadata.Timeout)
+ }
+ return client.tryRefreshMetadata(topics, client.conf.Metadata.Retry.Max, deadline)
}
func (client *client) GetOffset(topic string, partitionID int32, time int64) (int64, error) {
@@ -707,32 +741,47 @@
return nil
}
-func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int) error {
+func (client *client) tryRefreshMetadata(topics []string, attemptsRemaining int, deadline time.Time) error {
+ pastDeadline := func(backoff time.Duration) bool {
+ if !deadline.IsZero() && time.Now().Add(backoff).After(deadline) {
+ // we are past the deadline
+ return true
+ }
+ return false
+ }
retry := func(err error) error {
if attemptsRemaining > 0 {
backoff := client.computeBackoff(attemptsRemaining)
+ if pastDeadline(backoff) {
+ Logger.Println("client/metadata skipping last retries as we would go past the metadata timeout")
+ return err
+ }
Logger.Printf("client/metadata retrying after %dms... (%d attempts remaining)\n", client.conf.Metadata.Retry.Backoff/time.Millisecond, attemptsRemaining)
if backoff > 0 {
time.Sleep(backoff)
}
- return client.tryRefreshMetadata(topics, attemptsRemaining-1)
+ return client.tryRefreshMetadata(topics, attemptsRemaining-1, deadline)
}
return err
}
- for broker := client.any(); broker != nil; broker = client.any() {
+ broker := client.any()
+ for ; broker != nil && !pastDeadline(0); broker = client.any() {
+ allowAutoTopicCreation := true
if len(topics) > 0 {
Logger.Printf("client/metadata fetching metadata for %v from broker %s\n", topics, broker.addr)
} else {
+ allowAutoTopicCreation = false
Logger.Printf("client/metadata fetching metadata for all topics from broker %s\n", broker.addr)
}
- req := &MetadataRequest{Topics: topics}
- if client.conf.Version.IsAtLeast(V0_10_0_0) {
+ req := &MetadataRequest{Topics: topics, AllowAutoTopicCreation: allowAutoTopicCreation}
+ if client.conf.Version.IsAtLeast(V1_0_0_0) {
+ req.Version = 5
+ } else if client.conf.Version.IsAtLeast(V0_10_0_0) {
req.Version = 1
}
response, err := broker.GetMetadata(req)
-
switch err.(type) {
case nil:
allKnownMetaData := len(topics) == 0
@@ -747,6 +796,18 @@
case PacketEncodingError:
// didn't even send, return the error
return err
+
+ case KError:
+ // if SASL auth error return as this _should_ be a non retryable err for all brokers
+ if err.(KError) == ErrSASLAuthenticationFailed {
+ Logger.Println("client/metadata failed SASL authentication")
+ return err
+ }
+ // else remove that broker and try again
+ Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
+ _ = broker.Close()
+ client.deregisterBroker(broker)
+
default:
// some other error, remove that broker and try again
Logger.Printf("client/metadata got error from broker %d while fetching metadata: %v\n", broker.ID(), err)
@@ -755,6 +816,11 @@
}
}
+	if broker != nil {
+		// A broker was still selected when the loop ended, so report that the
+		// request is being skipped because of the metadata timeout. Printf is
+		// required here: Println would print the %s verb literally.
+		Logger.Printf("client/metadata not fetching metadata from broker %s as we would go past the metadata timeout\n", broker.addr)
+		return retry(ErrOutOfBrokers)
+	}
+
Logger.Println("client/metadata no available broker to send metadata request to")
client.resurrectDeadBrokers()
return retry(ErrOutOfBrokers)
@@ -792,7 +858,7 @@
switch topic.Err {
case ErrNoError:
- break
+ // no-op
case ErrInvalidTopic, ErrTopicAuthorizationFailed: // don't retry, don't store partial results
err = topic.Err
continue
@@ -802,7 +868,6 @@
continue
case ErrLeaderNotAvailable: // retry, but store partial partition results
retry = true
- break
default: // don't retry, don't store partial results
Logger.Printf("Unexpected topic-level metadata error: %s", topic.Err)
err = topic.Err
@@ -847,9 +912,8 @@
maxRetries := client.conf.Metadata.Retry.Max
retries := maxRetries - attemptsRemaining
return client.conf.Metadata.Retry.BackoffFunc(retries, maxRetries)
- } else {
- return client.conf.Metadata.Retry.Backoff
}
+ return client.conf.Metadata.Retry.Backoff
}
func (client *client) getConsumerMetadata(consumerGroup string, attemptsRemaining int) (*FindCoordinatorResponse, error) {
@@ -911,3 +975,18 @@
client.resurrectDeadBrokers()
return retry(ErrOutOfBrokers)
}
+
+// nopCloserClient wraps an existing Client by embedding it, so every
+// method is promoted unchanged except Close, which this type overrides
+// with a no-op. It is meant for use in larger structs that hold a
+// client passed in by the caller and where closing that client would
+// be undesirable.
+type nopCloserClient struct {
+ Client
+}
+
+// Close is a deliberate no-op: it never forwards to the embedded Client's
+// Close method and always returns a nil error.
+func (*nopCloserClient) Close() error {
+	return nil
+}