[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/Shopify/sarama/admin.go b/vendor/github.com/Shopify/sarama/admin.go
index 1db6a0e..abe18b1 100644
--- a/vendor/github.com/Shopify/sarama/admin.go
+++ b/vendor/github.com/Shopify/sarama/admin.go
@@ -2,8 +2,11 @@
 
 import (
 	"errors"
+	"fmt"
 	"math/rand"
+	"strconv"
 	"sync"
+	"time"
 )
 
 // ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
@@ -39,6 +42,14 @@
 	// new partitions. This operation is supported by brokers with version 1.0.0 or higher.
 	CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
 
+	// Alter the replica assignment for partitions.
+	// This operation is supported by brokers with version 2.4.0.0 or higher.
+	AlterPartitionReassignments(topic string, assignment [][]int32) error
+
+	// Provides info on ongoing partitions replica reassignments.
+	// This operation is supported by brokers with version 2.4.0.0 or higher.
+	ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)
+
 	// Delete records whose offset is smaller than the given offset of the corresponding partition.
 	// This operation is supported by brokers with version 0.11.0.0 or higher.
 	DeleteRecords(topic string, partitionOffsets map[int32]int64) error
@@ -90,6 +101,18 @@
 	// Get information about the nodes in the cluster
 	DescribeCluster() (brokers []*Broker, controllerID int32, err error)
 
+	// Get information about all log directories on the given set of brokers
+	DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)
+
+	// Get information about SCRAM users
+	DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error)
+
+	// Delete SCRAM users
+	DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error)
+
+	// Upsert SCRAM users
+	UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error)
+
 	// Close shuts down the admin and closes underlying client.
 	Close() error
 }
@@ -105,9 +128,14 @@
 	if err != nil {
 		return nil, err
 	}
+	return NewClusterAdminFromClient(client)
+}
 
-	//make sure we can retrieve the controller
-	_, err = client.Controller()
+// NewClusterAdminFromClient creates a new ClusterAdmin using the given client.
+// Note that underlying client will also be closed on admin's Close() call.
+func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) {
+	// make sure we can retrieve the controller
+	_, err := client.Controller()
 	if err != nil {
 		return nil, err
 	}
@@ -127,8 +155,45 @@
 	return ca.client.Controller()
 }
 
-func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
+func (ca *clusterAdmin) refreshController() (*Broker, error) {
+	return ca.client.RefreshController()
+}
 
+// isErrNoController returns `true` if the given error type unwraps to an
+// `ErrNotController` response from Kafka
+func isErrNoController(err error) bool {
+	switch e := err.(type) {
+	case *TopicError:
+		return e.Err == ErrNotController
+	case *TopicPartitionError:
+		return e.Err == ErrNotController
+	case KError:
+		return e == ErrNotController
+	}
+	return false
+}
+
+// retryOnError will repeatedly call the given (error-returning) func in the
+// case that its response is non-nil and retryable (as determined by the
+// provided retryable func) up to the maximum number of tries permitted by
+// the admin client configuration
+func (ca *clusterAdmin) retryOnError(retryable func(error) bool, fn func() error) error {
+	var err error
+	for attempt := 0; attempt < ca.conf.Admin.Retry.Max; attempt++ {
+		err = fn()
+		if err == nil || !retryable(err) {
+			return err
+		}
+		Logger.Printf(
+			"admin/request retrying after %dms... (%d attempts remaining)\n",
+			ca.conf.Admin.Retry.Backoff/time.Millisecond, ca.conf.Admin.Retry.Max-attempt)
+		time.Sleep(ca.conf.Admin.Retry.Backoff)
+		continue
+	}
+	return err
+}
+
+func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
 	if topic == "" {
 		return ErrInvalidTopic
 	}
@@ -153,26 +218,31 @@
 		request.Version = 2
 	}
 
-	b, err := ca.Controller()
-	if err != nil {
-		return err
-	}
+	return ca.retryOnError(isErrNoController, func() error {
+		b, err := ca.Controller()
+		if err != nil {
+			return err
+		}
 
-	rsp, err := b.CreateTopics(request)
-	if err != nil {
-		return err
-	}
+		rsp, err := b.CreateTopics(request)
+		if err != nil {
+			return err
+		}
 
-	topicErr, ok := rsp.TopicErrors[topic]
-	if !ok {
-		return ErrIncompleteResponse
-	}
+		topicErr, ok := rsp.TopicErrors[topic]
+		if !ok {
+			return ErrIncompleteResponse
+		}
 
-	if topicErr.Err != ErrNoError {
-		return topicErr
-	}
+		if topicErr.Err != ErrNoError {
+			if topicErr.Err == ErrNotController {
+				_, _ = ca.refreshController()
+			}
+			return topicErr
+		}
 
-	return nil
+		return nil
+	})
 }
 
 func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
@@ -209,6 +279,10 @@
 		Topics: []string{},
 	}
 
+	if ca.conf.Version.IsAtLeast(V0_10_0_0) {
+		request.Version = 1
+	}
+
 	response, err := controller.GetMetadata(request)
 	if err != nil {
 		return nil, int32(0), err
@@ -217,6 +291,16 @@
 	return response.Brokers, response.ControllerID, nil
 }
 
+func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
+	brokers := ca.client.Brokers()
+	for _, b := range brokers {
+		if b.ID() == id {
+			return b, nil
+		}
+	}
+	return nil, fmt.Errorf("could not find broker id %d", id)
+}
+
 func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
 	brokers := ca.client.Brokers()
 	if len(brokers) > 0 {
@@ -274,6 +358,15 @@
 	describeConfigsReq := &DescribeConfigsRequest{
 		Resources: describeConfigsResources,
 	}
+
+	if ca.conf.Version.IsAtLeast(V1_1_0_0) {
+		describeConfigsReq.Version = 1
+	}
+
+	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+		describeConfigsReq.Version = 2
+	}
+
 	describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
 	if err != nil {
 		return nil, err
@@ -299,7 +392,6 @@
 }
 
 func (ca *clusterAdmin) DeleteTopic(topic string) error {
-
 	if topic == "" {
 		return ErrInvalidTopic
 	}
@@ -313,25 +405,31 @@
 		request.Version = 1
 	}
 
-	b, err := ca.Controller()
-	if err != nil {
-		return err
-	}
+	return ca.retryOnError(isErrNoController, func() error {
+		b, err := ca.Controller()
+		if err != nil {
+			return err
+		}
 
-	rsp, err := b.DeleteTopics(request)
-	if err != nil {
-		return err
-	}
+		rsp, err := b.DeleteTopics(request)
+		if err != nil {
+			return err
+		}
 
-	topicErr, ok := rsp.TopicErrorCodes[topic]
-	if !ok {
-		return ErrIncompleteResponse
-	}
+		topicErr, ok := rsp.TopicErrorCodes[topic]
+		if !ok {
+			return ErrIncompleteResponse
+		}
 
-	if topicErr != ErrNoError {
-		return topicErr
-	}
-	return nil
+		if topicErr != ErrNoError {
+			if topicErr == ErrNotController {
+				_, _ = ca.refreshController()
+			}
+			return topicErr
+		}
+
+		return nil
+	})
 }
 
 func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
@@ -347,30 +445,110 @@
 		Timeout:         ca.conf.Admin.Timeout,
 	}
 
+	return ca.retryOnError(isErrNoController, func() error {
+		b, err := ca.Controller()
+		if err != nil {
+			return err
+		}
+
+		rsp, err := b.CreatePartitions(request)
+		if err != nil {
+			return err
+		}
+
+		topicErr, ok := rsp.TopicPartitionErrors[topic]
+		if !ok {
+			return ErrIncompleteResponse
+		}
+
+		if topicErr.Err != ErrNoError {
+			if topicErr.Err == ErrNotController {
+				_, _ = ca.refreshController()
+			}
+			return topicErr
+		}
+
+		return nil
+	})
+}
+
+func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
+	if topic == "" {
+		return ErrInvalidTopic
+	}
+
+	request := &AlterPartitionReassignmentsRequest{
+		TimeoutMs: int32(60000),
+		Version:   int16(0),
+	}
+
+	for i := 0; i < len(assignment); i++ {
+		request.AddBlock(topic, int32(i), assignment[i])
+	}
+
+	return ca.retryOnError(isErrNoController, func() error {
+		b, err := ca.Controller()
+		if err != nil {
+			return err
+		}
+
+		errs := make([]error, 0)
+
+		rsp, err := b.AlterPartitionReassignments(request)
+
+		if err != nil {
+			errs = append(errs, err)
+		} else {
+			if rsp.ErrorCode > 0 {
+				errs = append(errs, errors.New(rsp.ErrorCode.Error()))
+			}
+
+			for topic, topicErrors := range rsp.Errors {
+				for partition, partitionError := range topicErrors {
+					if partitionError.errorCode != ErrNoError {
+						errStr := fmt.Sprintf("[%s-%d]: %s", topic, partition, partitionError.errorCode.Error())
+						errs = append(errs, errors.New(errStr))
+					}
+				}
+			}
+		}
+
+		if len(errs) > 0 {
+			return ErrReassignPartitions{MultiError{&errs}}
+		}
+
+		return nil
+	})
+}
+
+func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) {
+	if topic == "" {
+		return nil, ErrInvalidTopic
+	}
+
+	request := &ListPartitionReassignmentsRequest{
+		TimeoutMs: int32(60000),
+		Version:   int16(0),
+	}
+
+	request.AddBlock(topic, partitions)
+
 	b, err := ca.Controller()
 	if err != nil {
-		return err
+		return nil, err
 	}
+	_ = b.Open(ca.client.Config())
 
-	rsp, err := b.CreatePartitions(request)
-	if err != nil {
-		return err
+	rsp, err := b.ListPartitionReassignments(request)
+
+	if err == nil && rsp != nil {
+		return rsp.TopicStatus, nil
+	} else {
+		return nil, err
 	}
-
-	topicErr, ok := rsp.TopicPartitionErrors[topic]
-	if !ok {
-		return ErrIncompleteResponse
-	}
-
-	if topicErr.Err != ErrNoError {
-		return topicErr
-	}
-
-	return nil
 }
 
 func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
-
 	if topic == "" {
 		return ErrInvalidTopic
 	}
@@ -380,11 +558,7 @@
 		if err != nil {
 			return err
 		}
-		if _, ok := partitionPerBroker[broker]; ok {
-			partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
-		} else {
-			partitionPerBroker[broker] = []int32{partition}
-		}
+		partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
 	}
 	errs := make([]error, 0)
 	for broker, partitions := range partitionPerBroker {
@@ -418,13 +592,19 @@
 	if len(errs) > 0 {
 		return ErrDeleteRecords{MultiError{&errs}}
 	}
-	//todo since we are dealing with couple of partitions it would be good if we return slice of errors
-	//for each partition instead of one error
+	// todo since we are dealing with couple of partitions it would be good if we return slice of errors
+	// for each partition instead of one error
 	return nil
 }
 
-func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
+// Returns a bool indicating whether the resource request needs to go to a
+// specific broker
+func dependsOnSpecificNode(resource ConfigResource) bool {
+	return (resource.Type == BrokerResource && resource.Name != "") ||
+		resource.Type == BrokerLoggerResource
+}
 
+func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
 	var entries []ConfigEntry
 	var resources []*ConfigResource
 	resources = append(resources, &resource)
@@ -433,11 +613,35 @@
 		Resources: resources,
 	}
 
-	b, err := ca.Controller()
+	if ca.conf.Version.IsAtLeast(V1_1_0_0) {
+		request.Version = 1
+	}
+
+	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+		request.Version = 2
+	}
+
+	var (
+		b   *Broker
+		err error
+	)
+
+	// DescribeConfig of broker/broker logger must be sent to the broker in question
+	if dependsOnSpecificNode(resource) {
+		var id int64
+		id, err = strconv.ParseInt(resource.Name, 10, 32)
+		if err != nil {
+			return nil, err
+		}
+		b, err = ca.findBroker(int32(id))
+	} else {
+		b, err = ca.findAnyBroker()
+	}
 	if err != nil {
 		return nil, err
 	}
 
+	_ = b.Open(ca.client.Config())
 	rsp, err := b.DescribeConfigs(request)
 	if err != nil {
 		return nil, err
@@ -448,6 +652,9 @@
 			if rspResource.ErrorMsg != "" {
 				return nil, errors.New(rspResource.ErrorMsg)
 			}
+			if rspResource.ErrorCode != 0 {
+				return nil, KError(rspResource.ErrorCode)
+			}
 			for _, cfgEntry := range rspResource.Configs {
 				entries = append(entries, *cfgEntry)
 			}
@@ -457,7 +664,6 @@
 }
 
 func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
-
 	var resources []*AlterConfigsResource
 	resources = append(resources, &AlterConfigsResource{
 		Type:          resourceType,
@@ -470,11 +676,27 @@
 		ValidateOnly: validateOnly,
 	}
 
-	b, err := ca.Controller()
+	var (
+		b   *Broker
+		err error
+	)
+
+	// AlterConfig of broker/broker logger must be sent to the broker in question
+	if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
+		var id int64
+		id, err = strconv.ParseInt(name, 10, 32)
+		if err != nil {
+			return err
+		}
+		b, err = ca.findBroker(int32(id))
+	} else {
+		b, err = ca.findAnyBroker()
+	}
 	if err != nil {
 		return err
 	}
 
+	_ = b.Open(ca.client.Config())
 	rsp, err := b.AlterConfigs(request)
 	if err != nil {
 		return err
@@ -485,6 +707,9 @@
 			if rspResource.ErrorMsg != "" {
 				return errors.New(rspResource.ErrorMsg)
 			}
+			if rspResource.ErrorCode != 0 {
+				return KError(rspResource.ErrorCode)
+			}
 		}
 	}
 	return nil
@@ -495,6 +720,10 @@
 	acls = append(acls, &AclCreation{resource, acl})
 	request := &CreateAclsRequest{AclCreations: acls}
 
+	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+		request.Version = 1
+	}
+
 	b, err := ca.Controller()
 	if err != nil {
 		return err
@@ -505,9 +734,12 @@
 }
 
 func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
-
 	request := &DescribeAclsRequest{AclFilter: filter}
 
+	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+		request.Version = 1
+	}
+
 	b, err := ca.Controller()
 	if err != nil {
 		return nil, err
@@ -530,6 +762,10 @@
 	filters = append(filters, &filter)
 	request := &DeleteAclsRequest{Filters: filters}
 
+	if ca.conf.Version.IsAtLeast(V2_0_0_0) {
+		request.Version = 1
+	}
+
 	b, err := ca.Controller()
 	if err != nil {
 		return nil, err
@@ -545,7 +781,6 @@
 		for _, mACL := range fr.MatchingAcls {
 			mAcls = append(mAcls, *mACL)
 		}
-
 	}
 	return mAcls, nil
 }
@@ -559,7 +794,6 @@
 			return nil, err
 		}
 		groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
-
 	}
 
 	for broker, brokerGroups := range groupsPerBroker {
@@ -581,7 +815,7 @@
 	// Query brokers in parallel, since we have to query *all* brokers
 	brokers := ca.client.Brokers()
 	groupMaps := make(chan map[string]string, len(brokers))
-	errors := make(chan error, len(brokers))
+	errChan := make(chan error, len(brokers))
 	wg := sync.WaitGroup{}
 
 	for _, b := range brokers {
@@ -592,7 +826,7 @@
 
 			response, err := b.ListGroups(&ListGroupsRequest{})
 			if err != nil {
-				errors <- err
+				errChan <- err
 				return
 			}
 
@@ -602,13 +836,12 @@
 			}
 
 			groupMaps <- groups
-
 		}(b, ca.conf)
 	}
 
 	wg.Wait()
 	close(groupMaps)
-	close(errors)
+	close(errChan)
 
 	for groupMap := range groupMaps {
 		for group, protocolType := range groupMap {
@@ -617,7 +850,7 @@
 	}
 
 	// Intentionally return only the first error for simplicity
-	err = <-errors
+	err = <-errChan
 	return
 }
 
@@ -667,3 +900,106 @@
 
 	return nil
 }
+
+func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) {
+	allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata)
+
+	// Query brokers in parallel, since we may have to query multiple brokers
+	logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
+	errChan := make(chan error, len(brokerIds))
+	wg := sync.WaitGroup{}
+
+	for _, b := range brokerIds {
+		wg.Add(1)
+		broker, err := ca.findBroker(b)
+		if err != nil {
+			Logger.Printf("Unable to find broker with ID = %v\n", b)
+			continue
+		}
+		go func(b *Broker, conf *Config) {
+			defer wg.Done()
+			_ = b.Open(conf) // Ensure that broker is opened
+
+			response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{})
+			if err != nil {
+				errChan <- err
+				return
+			}
+			logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata)
+			logDirs[b.ID()] = response.LogDirs
+			logDirsMaps <- logDirs
+		}(broker, ca.conf)
+	}
+
+	wg.Wait()
+	close(logDirsMaps)
+	close(errChan)
+
+	for logDirsMap := range logDirsMaps {
+		for id, logDirs := range logDirsMap {
+			allLogDirs[id] = logDirs
+		}
+	}
+
+	// Intentionally return only the first error for simplicity
+	err = <-errChan
+	return
+}
+
+func (ca *clusterAdmin) DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error) {
+	req := &DescribeUserScramCredentialsRequest{}
+	for _, u := range users {
+		req.DescribeUsers = append(req.DescribeUsers, DescribeUserScramCredentialsRequestUser{
+			Name: u,
+		})
+	}
+
+	b, err := ca.Controller()
+	if err != nil {
+		return nil, err
+	}
+
+	rsp, err := b.DescribeUserScramCredentials(req)
+	if err != nil {
+		return nil, err
+	}
+
+	return rsp.Results, nil
+}
+
+func (ca *clusterAdmin) UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error) {
+	res, err := ca.AlterUserScramCredentials(upsert, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	return res, nil
+}
+
+func (ca *clusterAdmin) DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
+	res, err := ca.AlterUserScramCredentials(nil, delete)
+	if err != nil {
+		return nil, err
+	}
+
+	return res, nil
+}
+
+func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsUpsert, d []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
+	req := &AlterUserScramCredentialsRequest{
+		Deletions:  d,
+		Upsertions: u,
+	}
+
+	b, err := ca.Controller()
+	if err != nil {
+		return nil, err
+	}
+
+	rsp, err := b.AlterUserScramCredentials(req)
+	if err != nil {
+		return nil, err
+	}
+
+	return rsp.Results, nil
+}