VOL-1921 - updated to use go mod
Change-Id: I8d5187fa91fa619494f972bc29d3bd61e5be3a82
diff --git a/vendor/github.com/Shopify/sarama/admin.go b/vendor/github.com/Shopify/sarama/admin.go
index a4d1bc5..1db6a0e 100644
--- a/vendor/github.com/Shopify/sarama/admin.go
+++ b/vendor/github.com/Shopify/sarama/admin.go
@@ -374,29 +374,50 @@
if topic == "" {
return ErrInvalidTopic
}
-
- topics := make(map[string]*DeleteRecordsRequestTopic)
- topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: partitionOffsets}
- request := &DeleteRecordsRequest{
- Topics: topics,
- Timeout: ca.conf.Admin.Timeout,
+ partitionPerBroker := make(map[*Broker][]int32)
+ for partition := range partitionOffsets {
+ broker, err := ca.client.Leader(topic, partition)
+ if err != nil {
+ return err
+ }
+ if _, ok := partitionPerBroker[broker]; ok {
+ partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
+ } else {
+ partitionPerBroker[broker] = []int32{partition}
+ }
}
+ errs := make([]error, 0)
+ for broker, partitions := range partitionPerBroker {
+ topics := make(map[string]*DeleteRecordsRequestTopic)
+ recordsToDelete := make(map[int32]int64)
+ for _, p := range partitions {
+ recordsToDelete[p] = partitionOffsets[p]
+ }
+ topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: recordsToDelete}
+ request := &DeleteRecordsRequest{
+ Topics: topics,
+ Timeout: ca.conf.Admin.Timeout,
+ }
- b, err := ca.Controller()
- if err != nil {
- return err
+ rsp, err := broker.DeleteRecords(request)
+ if err != nil {
+ errs = append(errs, err)
+ } else {
+ deleteRecordsResponseTopic, ok := rsp.Topics[topic]
+ if !ok {
+ errs = append(errs, ErrIncompleteResponse)
+ } else {
+ for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
+ if deleteRecordsResponsePartition.Err != ErrNoError {
+ errs = append(errs, errors.New(deleteRecordsResponsePartition.Err.Error()))
+ }
+ }
+ }
+ }
}
-
- rsp, err := b.DeleteRecords(request)
- if err != nil {
- return err
+ if len(errs) > 0 {
+ return ErrDeleteRecords{MultiError{&errs}}
}
-
- _, ok := rsp.Topics[topic]
- if !ok {
- return ErrIncompleteResponse
- }
-
//todo since we are dealing with couple of partitions it would be good if we return slice of errors
//for each partition instead of one error
return nil