[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/Shopify/sarama/admin.go b/vendor/github.com/Shopify/sarama/admin.go
index 1db6a0e..abe18b1 100644
--- a/vendor/github.com/Shopify/sarama/admin.go
+++ b/vendor/github.com/Shopify/sarama/admin.go
@@ -2,8 +2,11 @@
import (
"errors"
+ "fmt"
"math/rand"
+ "strconv"
"sync"
+ "time"
)
// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
@@ -39,6 +42,14 @@
// new partitions. This operation is supported by brokers with version 1.0.0 or higher.
CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
+ // Alter the replica assignment for partitions.
+ // This operation is supported by brokers with version 2.4.0.0 or higher.
+ AlterPartitionReassignments(topic string, assignment [][]int32) error
+
+ // Provides info on ongoing partition replica reassignments.
+ // This operation is supported by brokers with version 2.4.0.0 or higher.
+ ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)
+
// Delete records whose offset is smaller than the given offset of the corresponding partition.
// This operation is supported by brokers with version 0.11.0.0 or higher.
DeleteRecords(topic string, partitionOffsets map[int32]int64) error
@@ -90,6 +101,18 @@
// Get information about the nodes in the cluster
DescribeCluster() (brokers []*Broker, controllerID int32, err error)
+ // Get information about all log directories on the given set of brokers
+ DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)
+
+ // Get information about SCRAM users
+ DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error)
+
+ // Delete SCRAM users
+ DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error)
+
+ // Upsert SCRAM users
+ UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error)
+
// Close shuts down the admin and closes underlying client.
Close() error
}
@@ -105,9 +128,14 @@
if err != nil {
return nil, err
}
+ return NewClusterAdminFromClient(client)
+}
- //make sure we can retrieve the controller
- _, err = client.Controller()
+// NewClusterAdminFromClient creates a new ClusterAdmin using the given client.
+// Note that the underlying client will also be closed on the admin's Close() call.
+func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) {
+ // make sure we can retrieve the controller
+ _, err := client.Controller()
if err != nil {
return nil, err
}
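// [Editor's sketch, not part of the upstream patch] Minimal usage of the new
// NewClusterAdminFromClient constructor: one Client can be shared with other
// components, and, per the note above, admin.Close() also closes it. The
// broker address and function name are placeholders; assumes
// import "github.com/Shopify/sarama".
func exampleSharedClientAdmin() error {
	client, err := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
	if err != nil {
		return err
	}
	admin, err := sarama.NewClusterAdminFromClient(client)
	if err != nil {
		_ = client.Close()
		return err
	}
	defer admin.Close() // also closes the shared client
	return nil
}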
@@ -127,8 +155,45 @@
return ca.client.Controller()
}
-func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
+func (ca *clusterAdmin) refreshController() (*Broker, error) {
+ return ca.client.RefreshController()
+}
+// isErrNoController returns `true` if the given error type unwraps to an
+// `ErrNotController` response from Kafka
+func isErrNoController(err error) bool {
+ switch e := err.(type) {
+ case *TopicError:
+ return e.Err == ErrNotController
+ case *TopicPartitionError:
+ return e.Err == ErrNotController
+ case KError:
+ return e == ErrNotController
+ }
+ return false
+}
+
+// retryOnError repeatedly calls the given (error-returning) func for as long
+// as it returns a non-nil error that is retryable (as determined by the
+// provided retryable func), up to the maximum number of tries permitted by
+// the admin client configuration.
+func (ca *clusterAdmin) retryOnError(retryable func(error) bool, fn func() error) error {
+ var err error
+ for attempt := 0; attempt < ca.conf.Admin.Retry.Max; attempt++ {
+ err = fn()
+ if err == nil || !retryable(err) {
+ return err
+ }
+ Logger.Printf(
+ "admin/request retrying after %dms... (%d attempts remaining)\n",
+ ca.conf.Admin.Retry.Backoff/time.Millisecond, ca.conf.Admin.Retry.Max-attempt)
+ time.Sleep(ca.conf.Admin.Retry.Backoff)
+ continue
+ }
+ return err
+}
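// [Editor's sketch, not part of the upstream patch] The retry helper above in
// isolation: call fn up to max times, sleeping backoff between attempts, and
// stop early on success or on a non-retryable error. Assumes import "time";
// the function name is a placeholder.
func retrySketch(max int, backoff time.Duration, retryable func(error) bool, fn func() error) error {
	var err error
	for attempt := 0; attempt < max; attempt++ {
		if err = fn(); err == nil || !retryable(err) {
			return err
		}
		time.Sleep(backoff)
	}
	return err
}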
+
+func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
if topic == "" {
return ErrInvalidTopic
}
@@ -153,26 +218,31 @@
request.Version = 2
}
- b, err := ca.Controller()
- if err != nil {
- return err
- }
+ return ca.retryOnError(isErrNoController, func() error {
+ b, err := ca.Controller()
+ if err != nil {
+ return err
+ }
- rsp, err := b.CreateTopics(request)
- if err != nil {
- return err
- }
+ rsp, err := b.CreateTopics(request)
+ if err != nil {
+ return err
+ }
- topicErr, ok := rsp.TopicErrors[topic]
- if !ok {
- return ErrIncompleteResponse
- }
+ topicErr, ok := rsp.TopicErrors[topic]
+ if !ok {
+ return ErrIncompleteResponse
+ }
- if topicErr.Err != ErrNoError {
- return topicErr
- }
+ if topicErr.Err != ErrNoError {
+ if topicErr.Err == ErrNotController {
+ _, _ = ca.refreshController()
+ }
+ return topicErr
+ }
- return nil
+ return nil
+ })
}
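// [Editor's sketch, not part of the upstream patch] A typical CreateTopic
// call through the retry wrapper above; topic name and sizing are
// placeholders, and admin is any ClusterAdmin.
func exampleCreateTopic(admin sarama.ClusterAdmin) error {
	return admin.CreateTopic("events", &sarama.TopicDetail{
		NumPartitions:     3,
		ReplicationFactor: 2,
	}, false) // validateOnly=false: actually create the topic
}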
func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
@@ -209,6 +279,10 @@
Topics: []string{},
}
+ if ca.conf.Version.IsAtLeast(V0_10_0_0) {
+ request.Version = 1
+ }
+
response, err := controller.GetMetadata(request)
if err != nil {
return nil, int32(0), err
@@ -217,6 +291,16 @@
return response.Brokers, response.ControllerID, nil
}
+func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
+ brokers := ca.client.Brokers()
+ for _, b := range brokers {
+ if b.ID() == id {
+ return b, nil
+ }
+ }
+ return nil, fmt.Errorf("could not find broker id %d", id)
+}
+
func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
brokers := ca.client.Brokers()
if len(brokers) > 0 {
@@ -274,6 +358,15 @@
describeConfigsReq := &DescribeConfigsRequest{
Resources: describeConfigsResources,
}
+
+ if ca.conf.Version.IsAtLeast(V1_1_0_0) {
+ describeConfigsReq.Version = 1
+ }
+
+ if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+ describeConfigsReq.Version = 2
+ }
+
describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
if err != nil {
return nil, err
@@ -299,7 +392,6 @@
}
func (ca *clusterAdmin) DeleteTopic(topic string) error {
-
if topic == "" {
return ErrInvalidTopic
}
@@ -313,25 +405,31 @@
request.Version = 1
}
- b, err := ca.Controller()
- if err != nil {
- return err
- }
+ return ca.retryOnError(isErrNoController, func() error {
+ b, err := ca.Controller()
+ if err != nil {
+ return err
+ }
- rsp, err := b.DeleteTopics(request)
- if err != nil {
- return err
- }
+ rsp, err := b.DeleteTopics(request)
+ if err != nil {
+ return err
+ }
- topicErr, ok := rsp.TopicErrorCodes[topic]
- if !ok {
- return ErrIncompleteResponse
- }
+ topicErr, ok := rsp.TopicErrorCodes[topic]
+ if !ok {
+ return ErrIncompleteResponse
+ }
- if topicErr != ErrNoError {
- return topicErr
- }
- return nil
+ if topicErr != ErrNoError {
+ if topicErr == ErrNotController {
+ _, _ = ca.refreshController()
+ }
+ return topicErr
+ }
+
+ return nil
+ })
}
func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
@@ -347,30 +445,110 @@
Timeout: ca.conf.Admin.Timeout,
}
+ return ca.retryOnError(isErrNoController, func() error {
+ b, err := ca.Controller()
+ if err != nil {
+ return err
+ }
+
+ rsp, err := b.CreatePartitions(request)
+ if err != nil {
+ return err
+ }
+
+ topicErr, ok := rsp.TopicPartitionErrors[topic]
+ if !ok {
+ return ErrIncompleteResponse
+ }
+
+ if topicErr.Err != ErrNoError {
+ if topicErr.Err == ErrNotController {
+ _, _ = ca.refreshController()
+ }
+ return topicErr
+ }
+
+ return nil
+ })
+}
+
+func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
+ if topic == "" {
+ return ErrInvalidTopic
+ }
+
+ request := &AlterPartitionReassignmentsRequest{
+ TimeoutMs: int32(60000),
+ Version: int16(0),
+ }
+
+ for i := 0; i < len(assignment); i++ {
+ request.AddBlock(topic, int32(i), assignment[i])
+ }
+
+ return ca.retryOnError(isErrNoController, func() error {
+ b, err := ca.Controller()
+ if err != nil {
+ return err
+ }
+
+ errs := make([]error, 0)
+
+ rsp, err := b.AlterPartitionReassignments(request)
+
+ if err != nil {
+ errs = append(errs, err)
+ } else {
+ if rsp.ErrorCode > 0 {
+ errs = append(errs, errors.New(rsp.ErrorCode.Error()))
+ }
+
+ for topic, topicErrors := range rsp.Errors {
+ for partition, partitionError := range topicErrors {
+ if partitionError.errorCode != ErrNoError {
+ errStr := fmt.Sprintf("[%s-%d]: %s", topic, partition, partitionError.errorCode.Error())
+ errs = append(errs, errors.New(errStr))
+ }
+ }
+ }
+ }
+
+ if len(errs) > 0 {
+ return ErrReassignPartitions{MultiError{&errs}}
+ }
+
+ return nil
+ })
+}
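// [Editor's sketch, not part of the upstream patch] The assignment slice is
// indexed by partition number: assignment[i] lists the target replica broker
// IDs for partition i. Topic name and broker IDs are placeholders.
func exampleReassign(admin sarama.ClusterAdmin) error {
	return admin.AlterPartitionReassignments("events", [][]int32{
		{1, 2}, // partition 0 -> brokers 1 and 2
		{2, 3}, // partition 1 -> brokers 2 and 3
	})
}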
+
+func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) {
+ if topic == "" {
+ return nil, ErrInvalidTopic
+ }
+
+ request := &ListPartitionReassignmentsRequest{
+ TimeoutMs: int32(60000),
+ Version: int16(0),
+ }
+
+ request.AddBlock(topic, partitions)
+
b, err := ca.Controller()
if err != nil {
- return err
+ return nil, err
}
+ _ = b.Open(ca.client.Config()) // ensure the connection is open; ErrAlreadyConnected is deliberately ignored
- rsp, err := b.CreatePartitions(request)
- if err != nil {
- return err
+ rsp, err := b.ListPartitionReassignments(request)
+
+ if err == nil && rsp != nil {
+ return rsp.TopicStatus, nil
+ } else {
+ return nil, err
}
-
- topicErr, ok := rsp.TopicPartitionErrors[topic]
- if !ok {
- return ErrIncompleteResponse
- }
-
- if topicErr.Err != ErrNoError {
- return topicErr
- }
-
- return nil
}
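// [Editor's sketch, not part of the upstream patch] Polling an ongoing
// reassignment; the field names are assumed from sarama's
// PartitionReplicaReassignmentsStatus type. Assumes import "fmt".
func examplePollReassignments(admin sarama.ClusterAdmin) error {
	status, err := admin.ListPartitionReassignments("events", []int32{0, 1})
	if err != nil {
		return err
	}
	for topic, partitions := range status {
		for partition, s := range partitions {
			fmt.Printf("%s-%d: replicas=%v adding=%v removing=%v\n",
				topic, partition, s.Replicas, s.AddingReplicas, s.RemovingReplicas)
		}
	}
	return nil
}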
func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
-
if topic == "" {
return ErrInvalidTopic
}
@@ -380,11 +558,7 @@
if err != nil {
return err
}
- if _, ok := partitionPerBroker[broker]; ok {
- partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
- } else {
- partitionPerBroker[broker] = []int32{partition}
- }
+ partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
}
errs := make([]error, 0)
for broker, partitions := range partitionPerBroker {
@@ -418,13 +592,19 @@
if len(errs) > 0 {
return ErrDeleteRecords{MultiError{&errs}}
}
- //todo since we are dealing with couple of partitions it would be good if we return slice of errors
- //for each partition instead of one error
+ // TODO: since we are dealing with several partitions it would be better to
+ // return a slice of errors, one per partition, instead of a single error
return nil
}
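// [Editor's note, not part of the upstream patch] The partitionPerBroker
// simplification above relies on a Go idiom: indexing a map with a missing
// key yields the zero value, and a nil slice is a valid first argument to
// append, so no presence check is needed:
//
//	perBroker := map[int32][]int32{}
//	perBroker[1] = append(perBroker[1], 42) // key 1 is created on first append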
-func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
+// dependsOnSpecificNode reports whether the config request must be routed to
+// a specific broker: a named BrokerResource or any BrokerLoggerResource
+func dependsOnSpecificNode(resource ConfigResource) bool {
+ return (resource.Type == BrokerResource && resource.Name != "") ||
+ resource.Type == BrokerLoggerResource
+}
+func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
var entries []ConfigEntry
var resources []*ConfigResource
resources = append(resources, &resource)
@@ -433,11 +613,35 @@
Resources: resources,
}
- b, err := ca.Controller()
+ if ca.conf.Version.IsAtLeast(V1_1_0_0) {
+ request.Version = 1
+ }
+
+ if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+ request.Version = 2
+ }
+
+ var (
+ b *Broker
+ err error
+ )
+
+ // DescribeConfig of broker/broker logger must be sent to the broker in question
+ if dependsOnSpecificNode(resource) {
+ var id int64
+ id, err = strconv.ParseInt(resource.Name, 10, 32)
+ if err != nil {
+ return nil, err
+ }
+ b, err = ca.findBroker(int32(id))
+ } else {
+ b, err = ca.findAnyBroker()
+ }
if err != nil {
return nil, err
}
+ _ = b.Open(ca.client.Config()) // ensure the connection is open; ErrAlreadyConnected is deliberately ignored
rsp, err := b.DescribeConfigs(request)
if err != nil {
return nil, err
@@ -448,6 +652,9 @@
if rspResource.ErrorMsg != "" {
return nil, errors.New(rspResource.ErrorMsg)
}
+ if rspResource.ErrorCode != 0 {
+ return nil, KError(rspResource.ErrorCode)
+ }
for _, cfgEntry := range rspResource.Configs {
entries = append(entries, *cfgEntry)
}
@@ -457,7 +664,6 @@
}
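// [Editor's sketch, not part of the upstream patch] For BrokerResource and
// BrokerLoggerResource the resource Name must be the numeric broker ID, since
// the request is routed to that exact broker (see dependsOnSpecificNode); the
// ID here is a placeholder.
func exampleDescribeBrokerConfig(admin sarama.ClusterAdmin) ([]sarama.ConfigEntry, error) {
	return admin.DescribeConfig(sarama.ConfigResource{
		Type: sarama.BrokerResource,
		Name: "1", // broker ID, parsed with strconv.ParseInt above
	})
}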
func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
-
var resources []*AlterConfigsResource
resources = append(resources, &AlterConfigsResource{
Type: resourceType,
@@ -470,11 +676,27 @@
ValidateOnly: validateOnly,
}
- b, err := ca.Controller()
+ var (
+ b *Broker
+ err error
+ )
+
+ // AlterConfig of broker/broker logger must be sent to the broker in question
+ if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
+ var id int64
+ id, err = strconv.ParseInt(name, 10, 32)
+ if err != nil {
+ return err
+ }
+ b, err = ca.findBroker(int32(id))
+ } else {
+ b, err = ca.findAnyBroker()
+ }
if err != nil {
return err
}
+ _ = b.Open(ca.client.Config()) // ensure the connection is open; ErrAlreadyConnected is deliberately ignored
rsp, err := b.AlterConfigs(request)
if err != nil {
return err
@@ -485,6 +707,9 @@
if rspResource.ErrorMsg != "" {
return errors.New(rspResource.ErrorMsg)
}
+ if rspResource.ErrorCode != 0 {
+ return KError(rspResource.ErrorCode)
+ }
}
}
return nil
@@ -495,6 +720,10 @@
acls = append(acls, &AclCreation{resource, acl})
request := &CreateAclsRequest{AclCreations: acls}
+ if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+ request.Version = 1
+ }
+
b, err := ca.Controller()
if err != nil {
return err
@@ -505,9 +734,12 @@
}
func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
-
request := &DescribeAclsRequest{AclFilter: filter}
+ if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+ request.Version = 1
+ }
+
b, err := ca.Controller()
if err != nil {
return nil, err
@@ -530,6 +762,10 @@
filters = append(filters, &filter)
request := &DeleteAclsRequest{Filters: filters}
+ if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+ request.Version = 1
+ }
+
b, err := ca.Controller()
if err != nil {
return nil, err
@@ -545,7 +781,6 @@
for _, mACL := range fr.MatchingAcls {
mAcls = append(mAcls, *mACL)
}
-
}
return mAcls, nil
}
@@ -559,7 +794,6 @@
return nil, err
}
groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
-
}
for broker, brokerGroups := range groupsPerBroker {
@@ -581,7 +815,7 @@
// Query brokers in parallel, since we have to query *all* brokers
brokers := ca.client.Brokers()
groupMaps := make(chan map[string]string, len(brokers))
- errors := make(chan error, len(brokers))
+ errChan := make(chan error, len(brokers))
wg := sync.WaitGroup{}
for _, b := range brokers {
@@ -592,7 +826,7 @@
response, err := b.ListGroups(&ListGroupsRequest{})
if err != nil {
- errors <- err
+ errChan <- err
return
}
@@ -602,13 +836,12 @@
}
groupMaps <- groups
-
}(b, ca.conf)
}
wg.Wait()
close(groupMaps)
- close(errors)
+ close(errChan)
for groupMap := range groupMaps {
for group, protocolType := range groupMap {
@@ -617,7 +850,7 @@
}
// Intentionally return only the first error for simplicity
- err = <-errors
+ err = <-errChan
return
}
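// [Editor's note, not part of the upstream patch] `err = <-errChan` is safe
// even when every goroutine succeeded: the channel is closed, so the receive
// yields the zero value (a nil error). The pattern, stripped down (work and
// inputs are hypothetical; assumes import "sync"):
func firstErrorSketch(inputs []int, work func(int) error) error {
	errChan := make(chan error, len(inputs))
	var wg sync.WaitGroup
	for _, in := range inputs {
		wg.Add(1)
		go func(in int) {
			defer wg.Done()
			if err := work(in); err != nil {
				errChan <- err
			}
		}(in)
	}
	wg.Wait()
	close(errChan)
	return <-errChan // nil when no goroutine failed
}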
@@ -667,3 +900,106 @@
return nil
}
+
+func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) {
+ allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata)
+
+ // Query brokers in parallel, since we may have to query multiple brokers
+ logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
+ errChan := make(chan error, len(brokerIds))
+ wg := sync.WaitGroup{}
+
+ for _, b := range brokerIds {
+ broker, err := ca.findBroker(b)
+ if err != nil {
+ Logger.Printf("Unable to find broker with ID = %v\n", b)
+ continue
+ }
+ wg.Add(1) // add only after the lookup succeeds; adding before `continue` would leak a count and hang wg.Wait()
+ go func(b *Broker, conf *Config) {
+ defer wg.Done()
+ _ = b.Open(conf) // Ensure that broker is opened
+
+ response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{})
+ if err != nil {
+ errChan <- err
+ return
+ }
+ logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata)
+ logDirs[b.ID()] = response.LogDirs
+ logDirsMaps <- logDirs
+ }(broker, ca.conf)
+ }
+
+ wg.Wait()
+ close(logDirsMaps)
+ close(errChan)
+
+ for logDirsMap := range logDirsMaps {
+ for id, logDirs := range logDirsMap {
+ allLogDirs[id] = logDirs
+ }
+ }
+
+ // Intentionally return only the first error for simplicity
+ err = <-errChan
+ return
+}
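// [Editor's sketch, not part of the upstream patch] Querying log directories
// for a set of brokers; broker IDs are placeholders and Path is assumed to be
// a field of DescribeLogDirsResponseDirMetadata. Assumes import "fmt".
func exampleDescribeLogDirs(admin sarama.ClusterAdmin) error {
	logDirs, err := admin.DescribeLogDirs([]int32{1, 2, 3})
	if err != nil {
		return err
	}
	for brokerID, dirs := range logDirs {
		for _, dir := range dirs {
			fmt.Printf("broker %d: log dir %s\n", brokerID, dir.Path)
		}
	}
	return nil
}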
+
+func (ca *clusterAdmin) DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error) {
+ req := &DescribeUserScramCredentialsRequest{}
+ for _, u := range users {
+ req.DescribeUsers = append(req.DescribeUsers, DescribeUserScramCredentialsRequestUser{
+ Name: u,
+ })
+ }
+
+ b, err := ca.Controller()
+ if err != nil {
+ return nil, err
+ }
+
+ rsp, err := b.DescribeUserScramCredentials(req)
+ if err != nil {
+ return nil, err
+ }
+
+ return rsp.Results, nil
+}
+
+func (ca *clusterAdmin) UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error) {
+ res, err := ca.AlterUserScramCredentials(upsert, nil)
+ if err != nil {
+ return nil, err
+ }
+
+ return res, nil
+}
+
+func (ca *clusterAdmin) DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
+ res, err := ca.AlterUserScramCredentials(nil, delete)
+ if err != nil {
+ return nil, err
+ }
+
+ return res, nil
+}
+
+func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsUpsert, d []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
+ req := &AlterUserScramCredentialsRequest{
+ Deletions: d,
+ Upsertions: u,
+ }
+
+ b, err := ca.Controller()
+ if err != nil {
+ return nil, err
+ }
+
+ rsp, err := b.AlterUserScramCredentials(req)
+ if err != nil {
+ return nil, err
+ }
+
+ return rsp.Results, nil
+}
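// [Editor's sketch, not part of the upstream patch] Upserting and then
// deleting a SCRAM credential. The mechanism constant and struct fields are
// assumed from sarama's AlterUserScramCredentials types
// (SCRAM_MECHANISM_SHA_256, Name, Iterations, Salt, Password); user name,
// password, and salt are placeholders.
func exampleScram(admin sarama.ClusterAdmin) error {
	_, err := admin.UpsertUserScramCredentials([]sarama.AlterUserScramCredentialsUpsert{{
		Name:       "alice",
		Mechanism:  sarama.SCRAM_MECHANISM_SHA_256,
		Iterations: 8192,
		Salt:       []byte("random-salt"),
		Password:   []byte("alice-secret"),
	}})
	if err != nil {
		return err
	}
	_, err = admin.DeleteUserScramCredentials([]sarama.AlterUserScramCredentialsDelete{{
		Name:      "alice",
		Mechanism: sarama.SCRAM_MECHANISM_SHA_256,
	}})
	return err
}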