VOL-1967 move api-server to separate repository
Change-Id: I21b85be74205805be15f8a85e53a903d16785671
diff --git a/vendor/github.com/Shopify/sarama/admin.go b/vendor/github.com/Shopify/sarama/admin.go
index 18b055a..1db6a0e 100644
--- a/vendor/github.com/Shopify/sarama/admin.go
+++ b/vendor/github.com/Shopify/sarama/admin.go
@@ -20,7 +20,7 @@
// List the topics available in the cluster with the default options.
ListTopics() (map[string]TopicDetail, error)
- // Describe some topics in the cluster
+ // Describe some topics in the cluster.
DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
// Delete a topic. It may take several seconds after the DeleteTopic to returns success
@@ -78,12 +78,15 @@
// List the consumer groups available in the cluster.
ListConsumerGroups() (map[string]string, error)
- // Describe the given consumer group
+ // Describe the given consumer groups.
DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
// List the consumer group offsets available in the cluster.
ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
+ // Delete a consumer group.
+ DeleteConsumerGroup(group string) error
+
// Get information about the nodes in the cluster
DescribeCluster() (brokers []*Broker, controllerID int32, err error)
@@ -131,7 +134,7 @@
}
if detail == nil {
- return errors.New("You must specify topic details")
+ return errors.New("you must specify topic details")
}
topicDetails := make(map[string]*TopicDetail)
@@ -166,7 +169,7 @@
}
if topicErr.Err != ErrNoError {
- return topicErr.Err
+ return topicErr
}
return nil
@@ -183,7 +186,9 @@
AllowAutoTopicCreation: false,
}
- if ca.conf.Version.IsAtLeast(V0_11_0_0) {
+ if ca.conf.Version.IsAtLeast(V1_0_0_0) {
+ request.Version = 5
+ } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
request.Version = 4
}
@@ -358,7 +363,7 @@
}
if topicErr.Err != ErrNoError {
- return topicErr.Err
+ return topicErr
}
return nil
@@ -369,29 +374,50 @@
if topic == "" {
return ErrInvalidTopic
}
-
- topics := make(map[string]*DeleteRecordsRequestTopic)
- topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: partitionOffsets}
- request := &DeleteRecordsRequest{
- Topics: topics,
- Timeout: ca.conf.Admin.Timeout,
+ partitionPerBroker := make(map[*Broker][]int32)
+ for partition := range partitionOffsets {
+ broker, err := ca.client.Leader(topic, partition)
+ if err != nil {
+ return err
+ }
+ if _, ok := partitionPerBroker[broker]; ok {
+ partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
+ } else {
+ partitionPerBroker[broker] = []int32{partition}
+ }
}
+ errs := make([]error, 0)
+ for broker, partitions := range partitionPerBroker {
+ topics := make(map[string]*DeleteRecordsRequestTopic)
+ recordsToDelete := make(map[int32]int64)
+ for _, p := range partitions {
+ recordsToDelete[p] = partitionOffsets[p]
+ }
+ topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: recordsToDelete}
+ request := &DeleteRecordsRequest{
+ Topics: topics,
+ Timeout: ca.conf.Admin.Timeout,
+ }
- b, err := ca.Controller()
- if err != nil {
- return err
+ rsp, err := broker.DeleteRecords(request)
+ if err != nil {
+ errs = append(errs, err)
+ } else {
+ deleteRecordsResponseTopic, ok := rsp.Topics[topic]
+ if !ok {
+ errs = append(errs, ErrIncompleteResponse)
+ } else {
+ for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
+ if deleteRecordsResponsePartition.Err != ErrNoError {
+ errs = append(errs, errors.New(deleteRecordsResponsePartition.Err.Error()))
+ }
+ }
+ }
+ }
}
-
- rsp, err := b.DeleteRecords(request)
- if err != nil {
- return err
+ if len(errs) > 0 {
+ return ErrDeleteRecords{MultiError{&errs}}
}
-
- _, ok := rsp.Topics[topic]
- if !ok {
- return ErrIncompleteResponse
- }
-
//todo since we are dealing with couple of partitions it would be good if we return slice of errors
//for each partition instead of one error
return nil
@@ -606,9 +632,38 @@
partitions: topicPartitions,
}
- if ca.conf.Version.IsAtLeast(V0_8_2_2) {
+ if ca.conf.Version.IsAtLeast(V0_10_2_0) {
+ request.Version = 2
+ } else if ca.conf.Version.IsAtLeast(V0_8_2_2) {
request.Version = 1
}
return coordinator.FetchOffset(request)
}
+
+func (ca *clusterAdmin) DeleteConsumerGroup(group string) error { // DeleteConsumerGroup deletes the named consumer group through that group's coordinator; returns nil on success.
+ coordinator, err := ca.client.Coordinator(group) // look up the coordinator for this group via the admin's client
+ if err != nil {
+ return err
+ }
+
+ request := &DeleteGroupsRequest{
+ Groups: []string{group}, // the request names only the single group being deleted
+ }
+
+ resp, err := coordinator.DeleteGroups(request)
+ if err != nil {
+ return err // the DeleteGroups call itself failed; its error is returned unchanged
+ }
+
+ groupErr, ok := resp.GroupErrorCodes[group] // per-group result, looked up by the group name that was sent
+ if !ok {
+ return ErrIncompleteResponse // the response carries no entry for the requested group
+ }
+
+ if groupErr != ErrNoError {
+ return groupErr // the group's own error code is returned as the error
+ }
+
+ return nil
+}