blob: 367e124abd8dd2ce71867db22216bac9618e2d88 [file] [log] [blame]
kesavand2cde6582020-06-22 04:56:23 -04001package sarama
2
3import (
4 "errors"
kesavandc71914f2022-03-25 11:19:03 +05305 "fmt"
kesavand2cde6582020-06-22 04:56:23 -04006 "math/rand"
kesavandc71914f2022-03-25 11:19:03 +05307 "strconv"
kesavand2cde6582020-06-22 04:56:23 -04008 "sync"
kesavandc71914f2022-03-25 11:19:03 +05309 "time"
kesavand2cde6582020-06-22 04:56:23 -040010)
11
12// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
13// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
14// Methods with stricter requirements will specify the minimum broker version required.
15// You MUST call Close() on a client to avoid leaks
16type ClusterAdmin interface {
17 // Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
18 // It may take several seconds after CreateTopic returns success for all the brokers
19 // to become aware that the topic has been created. During this time, listTopics
20 // may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
21 CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
22
23 // List the topics available in the cluster with the default options.
24 ListTopics() (map[string]TopicDetail, error)
25
26 // Describe some topics in the cluster.
27 DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
28
29 // Delete a topic. It may take several seconds after the DeleteTopic to returns success
30 // and for all the brokers to become aware that the topics are gone.
31 // During this time, listTopics may continue to return information about the deleted topic.
32 // If delete.topic.enable is false on the brokers, deleteTopic will mark
33 // the topic for deletion, but not actually delete them.
34 // This operation is supported by brokers with version 0.10.1.0 or higher.
35 DeleteTopic(topic string) error
36
37 // Increase the number of partitions of the topics according to the corresponding values.
38 // If partitions are increased for a topic that has a key, the partition logic or ordering of
39 // the messages will be affected. It may take several seconds after this method returns
40 // success for all the brokers to become aware that the partitions have been created.
41 // During this time, ClusterAdmin#describeTopics may not return information about the
42 // new partitions. This operation is supported by brokers with version 1.0.0 or higher.
43 CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
44
kesavandc71914f2022-03-25 11:19:03 +053045 // Alter the replica assignment for partitions.
46 // This operation is supported by brokers with version 2.4.0.0 or higher.
47 AlterPartitionReassignments(topic string, assignment [][]int32) error
48
49 // Provides info on ongoing partitions replica reassignments.
50 // This operation is supported by brokers with version 2.4.0.0 or higher.
51 ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)
52
kesavand2cde6582020-06-22 04:56:23 -040053 // Delete records whose offset is smaller than the given offset of the corresponding partition.
54 // This operation is supported by brokers with version 0.11.0.0 or higher.
55 DeleteRecords(topic string, partitionOffsets map[int32]int64) error
56
57 // Get the configuration for the specified resources.
58 // The returned configuration includes default values and the Default is true
59 // can be used to distinguish them from user supplied values.
60 // Config entries where ReadOnly is true cannot be updated.
61 // The value of config entries where Sensitive is true is always nil so
62 // sensitive information is not disclosed.
63 // This operation is supported by brokers with version 0.11.0.0 or higher.
64 DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
65
66 // Update the configuration for the specified resources with the default options.
67 // This operation is supported by brokers with version 0.11.0.0 or higher.
68 // The resources with their configs (topic is the only resource type with configs
69 // that can be updated currently Updates are not transactional so they may succeed
70 // for some resources while fail for others. The configs for a particular resource are updated automatically.
71 AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
72
kesavandc71914f2022-03-25 11:19:03 +053073 // IncrementalAlterConfig Incrementally Update the configuration for the specified resources with the default options.
74 // This operation is supported by brokers with version 2.3.0.0 or higher.
75 // Updates are not transactional so they may succeed for some resources while fail for others.
76 // The configs for a particular resource are updated automatically.
77 IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error
78
kesavand2cde6582020-06-22 04:56:23 -040079 // Creates access control lists (ACLs) which are bound to specific resources.
80 // This operation is not transactional so it may succeed for some ACLs while fail for others.
81 // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
82 // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
83 CreateACL(resource Resource, acl Acl) error
84
85 // Lists access control lists (ACLs) according to the supplied filter.
86 // it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
87 // This operation is supported by brokers with version 0.11.0.0 or higher.
88 ListAcls(filter AclFilter) ([]ResourceAcls, error)
89
90 // Deletes access control lists (ACLs) according to the supplied filters.
91 // This operation is not transactional so it may succeed for some ACLs while fail for others.
92 // This operation is supported by brokers with version 0.11.0.0 or higher.
93 DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
94
95 // List the consumer groups available in the cluster.
96 ListConsumerGroups() (map[string]string, error)
97
98 // Describe the given consumer groups.
99 DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
100
101 // List the consumer group offsets available in the cluster.
102 ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
103
kesavandc71914f2022-03-25 11:19:03 +0530104 // Deletes a consumer group offset
105 DeleteConsumerGroupOffset(group string, topic string, partition int32) error
106
kesavand2cde6582020-06-22 04:56:23 -0400107 // Delete a consumer group.
108 DeleteConsumerGroup(group string) error
109
110 // Get information about the nodes in the cluster
111 DescribeCluster() (brokers []*Broker, controllerID int32, err error)
112
kesavandc71914f2022-03-25 11:19:03 +0530113 // Get information about all log directories on the given set of brokers
114 DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)
115
116 // Get information about SCRAM users
117 DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error)
118
119 // Delete SCRAM users
120 DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error)
121
122 // Upsert SCRAM users
123 UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error)
124
125 // Get client quota configurations corresponding to the specified filter.
126 // This operation is supported by brokers with version 2.6.0.0 or higher.
127 DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error)
128
129 // Alters client quota configurations with the specified alterations.
130 // This operation is supported by brokers with version 2.6.0.0 or higher.
131 AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error
132
133 // Controller returns the cluster controller broker. It will return a
134 // locally cached value if it's available.
135 Controller() (*Broker, error)
136
kesavand2cde6582020-06-22 04:56:23 -0400137 // Close shuts down the admin and closes underlying client.
138 Close() error
139}
140
141type clusterAdmin struct {
142 client Client
143 conf *Config
144}
145
146// NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
147func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
148 client, err := NewClient(addrs, conf)
149 if err != nil {
150 return nil, err
151 }
kesavandc71914f2022-03-25 11:19:03 +0530152 return NewClusterAdminFromClient(client)
153}
kesavand2cde6582020-06-22 04:56:23 -0400154
kesavandc71914f2022-03-25 11:19:03 +0530155// NewClusterAdminFromClient creates a new ClusterAdmin using the given client.
156// Note that underlying client will also be closed on admin's Close() call.
157func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) {
158 // make sure we can retrieve the controller
159 _, err := client.Controller()
kesavand2cde6582020-06-22 04:56:23 -0400160 if err != nil {
161 return nil, err
162 }
163
164 ca := &clusterAdmin{
165 client: client,
166 conf: client.Config(),
167 }
168 return ca, nil
169}
170
171func (ca *clusterAdmin) Close() error {
172 return ca.client.Close()
173}
174
175func (ca *clusterAdmin) Controller() (*Broker, error) {
176 return ca.client.Controller()
177}
178
kesavandc71914f2022-03-25 11:19:03 +0530179func (ca *clusterAdmin) refreshController() (*Broker, error) {
180 return ca.client.RefreshController()
181}
kesavand2cde6582020-06-22 04:56:23 -0400182
kesavandc71914f2022-03-25 11:19:03 +0530183// isErrNoController returns `true` if the given error type unwraps to an
184// `ErrNotController` response from Kafka
185func isErrNoController(err error) bool {
186 switch e := err.(type) {
187 case *TopicError:
188 return e.Err == ErrNotController
189 case *TopicPartitionError:
190 return e.Err == ErrNotController
191 case KError:
192 return e == ErrNotController
193 }
194 return false
195}
196
197// retryOnError will repeatedly call the given (error-returning) func in the
198// case that its response is non-nil and retryable (as determined by the
199// provided retryable func) up to the maximum number of tries permitted by
200// the admin client configuration
201func (ca *clusterAdmin) retryOnError(retryable func(error) bool, fn func() error) error {
202 var err error
203 for attempt := 0; attempt < ca.conf.Admin.Retry.Max; attempt++ {
204 err = fn()
205 if err == nil || !retryable(err) {
206 return err
207 }
208 Logger.Printf(
209 "admin/request retrying after %dms... (%d attempts remaining)\n",
210 ca.conf.Admin.Retry.Backoff/time.Millisecond, ca.conf.Admin.Retry.Max-attempt)
211 time.Sleep(ca.conf.Admin.Retry.Backoff)
212 continue
213 }
214 return err
215}
216
217func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
kesavand2cde6582020-06-22 04:56:23 -0400218 if topic == "" {
219 return ErrInvalidTopic
220 }
221
222 if detail == nil {
223 return errors.New("you must specify topic details")
224 }
225
226 topicDetails := make(map[string]*TopicDetail)
227 topicDetails[topic] = detail
228
229 request := &CreateTopicsRequest{
230 TopicDetails: topicDetails,
231 ValidateOnly: validateOnly,
232 Timeout: ca.conf.Admin.Timeout,
233 }
234
235 if ca.conf.Version.IsAtLeast(V0_11_0_0) {
236 request.Version = 1
237 }
238 if ca.conf.Version.IsAtLeast(V1_0_0_0) {
239 request.Version = 2
240 }
241
kesavandc71914f2022-03-25 11:19:03 +0530242 return ca.retryOnError(isErrNoController, func() error {
243 b, err := ca.Controller()
244 if err != nil {
245 return err
246 }
kesavand2cde6582020-06-22 04:56:23 -0400247
kesavandc71914f2022-03-25 11:19:03 +0530248 rsp, err := b.CreateTopics(request)
249 if err != nil {
250 return err
251 }
kesavand2cde6582020-06-22 04:56:23 -0400252
kesavandc71914f2022-03-25 11:19:03 +0530253 topicErr, ok := rsp.TopicErrors[topic]
254 if !ok {
255 return ErrIncompleteResponse
256 }
kesavand2cde6582020-06-22 04:56:23 -0400257
kesavandc71914f2022-03-25 11:19:03 +0530258 if topicErr.Err != ErrNoError {
259 if topicErr.Err == ErrNotController {
260 _, _ = ca.refreshController()
261 }
262 return topicErr
263 }
kesavand2cde6582020-06-22 04:56:23 -0400264
kesavandc71914f2022-03-25 11:19:03 +0530265 return nil
266 })
kesavand2cde6582020-06-22 04:56:23 -0400267}
268
269func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
270 controller, err := ca.Controller()
271 if err != nil {
272 return nil, err
273 }
274
275 request := &MetadataRequest{
276 Topics: topics,
277 AllowAutoTopicCreation: false,
278 }
279
280 if ca.conf.Version.IsAtLeast(V1_0_0_0) {
281 request.Version = 5
282 } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
283 request.Version = 4
284 }
285
286 response, err := controller.GetMetadata(request)
287 if err != nil {
288 return nil, err
289 }
290 return response.Topics, nil
291}
292
293func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
294 controller, err := ca.Controller()
295 if err != nil {
296 return nil, int32(0), err
297 }
298
299 request := &MetadataRequest{
300 Topics: []string{},
301 }
302
kesavandc71914f2022-03-25 11:19:03 +0530303 if ca.conf.Version.IsAtLeast(V0_10_0_0) {
304 request.Version = 1
305 }
306
kesavand2cde6582020-06-22 04:56:23 -0400307 response, err := controller.GetMetadata(request)
308 if err != nil {
309 return nil, int32(0), err
310 }
311
312 return response.Brokers, response.ControllerID, nil
313}
314
kesavandc71914f2022-03-25 11:19:03 +0530315func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
316 brokers := ca.client.Brokers()
317 for _, b := range brokers {
318 if b.ID() == id {
319 return b, nil
320 }
321 }
322 return nil, fmt.Errorf("could not find broker id %d", id)
323}
324
kesavand2cde6582020-06-22 04:56:23 -0400325func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
326 brokers := ca.client.Brokers()
327 if len(brokers) > 0 {
328 index := rand.Intn(len(brokers))
329 return brokers[index], nil
330 }
331 return nil, errors.New("no available broker")
332}
333
334func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
335 // In order to build TopicDetails we need to first get the list of all
336 // topics using a MetadataRequest and then get their configs using a
337 // DescribeConfigsRequest request. To avoid sending many requests to the
338 // broker, we use a single DescribeConfigsRequest.
339
340 // Send the all-topic MetadataRequest
341 b, err := ca.findAnyBroker()
342 if err != nil {
343 return nil, err
344 }
345 _ = b.Open(ca.client.Config())
346
347 metadataReq := &MetadataRequest{}
348 metadataResp, err := b.GetMetadata(metadataReq)
349 if err != nil {
350 return nil, err
351 }
352
353 topicsDetailsMap := make(map[string]TopicDetail)
354
355 var describeConfigsResources []*ConfigResource
356
357 for _, topic := range metadataResp.Topics {
358 topicDetails := TopicDetail{
359 NumPartitions: int32(len(topic.Partitions)),
360 }
361 if len(topic.Partitions) > 0 {
362 topicDetails.ReplicaAssignment = map[int32][]int32{}
363 for _, partition := range topic.Partitions {
364 topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
365 }
366 topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
367 }
368 topicsDetailsMap[topic.Name] = topicDetails
369
370 // we populate the resources we want to describe from the MetadataResponse
371 topicResource := ConfigResource{
372 Type: TopicResource,
373 Name: topic.Name,
374 }
375 describeConfigsResources = append(describeConfigsResources, &topicResource)
376 }
377
378 // Send the DescribeConfigsRequest
379 describeConfigsReq := &DescribeConfigsRequest{
380 Resources: describeConfigsResources,
381 }
kesavandc71914f2022-03-25 11:19:03 +0530382
383 if ca.conf.Version.IsAtLeast(V1_1_0_0) {
384 describeConfigsReq.Version = 1
385 }
386
387 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
388 describeConfigsReq.Version = 2
389 }
390
kesavand2cde6582020-06-22 04:56:23 -0400391 describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
392 if err != nil {
393 return nil, err
394 }
395
396 for _, resource := range describeConfigsResp.Resources {
397 topicDetails := topicsDetailsMap[resource.Name]
398 topicDetails.ConfigEntries = make(map[string]*string)
399
400 for _, entry := range resource.Configs {
401 // only include non-default non-sensitive config
402 // (don't actually think topic config will ever be sensitive)
403 if entry.Default || entry.Sensitive {
404 continue
405 }
406 topicDetails.ConfigEntries[entry.Name] = &entry.Value
407 }
408
409 topicsDetailsMap[resource.Name] = topicDetails
410 }
411
412 return topicsDetailsMap, nil
413}
414
415func (ca *clusterAdmin) DeleteTopic(topic string) error {
kesavand2cde6582020-06-22 04:56:23 -0400416 if topic == "" {
417 return ErrInvalidTopic
418 }
419
420 request := &DeleteTopicsRequest{
421 Topics: []string{topic},
422 Timeout: ca.conf.Admin.Timeout,
423 }
424
425 if ca.conf.Version.IsAtLeast(V0_11_0_0) {
426 request.Version = 1
427 }
428
kesavandc71914f2022-03-25 11:19:03 +0530429 return ca.retryOnError(isErrNoController, func() error {
430 b, err := ca.Controller()
431 if err != nil {
432 return err
433 }
kesavand2cde6582020-06-22 04:56:23 -0400434
kesavandc71914f2022-03-25 11:19:03 +0530435 rsp, err := b.DeleteTopics(request)
436 if err != nil {
437 return err
438 }
kesavand2cde6582020-06-22 04:56:23 -0400439
kesavandc71914f2022-03-25 11:19:03 +0530440 topicErr, ok := rsp.TopicErrorCodes[topic]
441 if !ok {
442 return ErrIncompleteResponse
443 }
kesavand2cde6582020-06-22 04:56:23 -0400444
kesavandc71914f2022-03-25 11:19:03 +0530445 if topicErr != ErrNoError {
446 if topicErr == ErrNotController {
447 _, _ = ca.refreshController()
448 }
449 return topicErr
450 }
451
452 return nil
453 })
kesavand2cde6582020-06-22 04:56:23 -0400454}
455
456func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
457 if topic == "" {
458 return ErrInvalidTopic
459 }
460
461 topicPartitions := make(map[string]*TopicPartition)
462 topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}
463
464 request := &CreatePartitionsRequest{
465 TopicPartitions: topicPartitions,
466 Timeout: ca.conf.Admin.Timeout,
kesavandc71914f2022-03-25 11:19:03 +0530467 ValidateOnly: validateOnly,
kesavand2cde6582020-06-22 04:56:23 -0400468 }
469
kesavandc71914f2022-03-25 11:19:03 +0530470 return ca.retryOnError(isErrNoController, func() error {
471 b, err := ca.Controller()
472 if err != nil {
473 return err
474 }
kesavand2cde6582020-06-22 04:56:23 -0400475
kesavandc71914f2022-03-25 11:19:03 +0530476 rsp, err := b.CreatePartitions(request)
477 if err != nil {
478 return err
479 }
kesavand2cde6582020-06-22 04:56:23 -0400480
kesavandc71914f2022-03-25 11:19:03 +0530481 topicErr, ok := rsp.TopicPartitionErrors[topic]
482 if !ok {
483 return ErrIncompleteResponse
484 }
kesavand2cde6582020-06-22 04:56:23 -0400485
kesavandc71914f2022-03-25 11:19:03 +0530486 if topicErr.Err != ErrNoError {
487 if topicErr.Err == ErrNotController {
488 _, _ = ca.refreshController()
489 }
490 return topicErr
491 }
kesavand2cde6582020-06-22 04:56:23 -0400492
kesavandc71914f2022-03-25 11:19:03 +0530493 return nil
494 })
kesavand2cde6582020-06-22 04:56:23 -0400495}
496
kesavandc71914f2022-03-25 11:19:03 +0530497func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
kesavand2cde6582020-06-22 04:56:23 -0400498 if topic == "" {
499 return ErrInvalidTopic
500 }
501
kesavandc71914f2022-03-25 11:19:03 +0530502 request := &AlterPartitionReassignmentsRequest{
503 TimeoutMs: int32(60000),
504 Version: int16(0),
kesavand2cde6582020-06-22 04:56:23 -0400505 }
506
kesavandc71914f2022-03-25 11:19:03 +0530507 for i := 0; i < len(assignment); i++ {
508 request.AddBlock(topic, int32(i), assignment[i])
509 }
510
511 return ca.retryOnError(isErrNoController, func() error {
512 b, err := ca.Controller()
513 if err != nil {
514 return err
515 }
516
517 errs := make([]error, 0)
518
519 rsp, err := b.AlterPartitionReassignments(request)
520
521 if err != nil {
522 errs = append(errs, err)
523 } else {
524 if rsp.ErrorCode > 0 {
525 errs = append(errs, errors.New(rsp.ErrorCode.Error()))
526 }
527
528 for topic, topicErrors := range rsp.Errors {
529 for partition, partitionError := range topicErrors {
530 if partitionError.errorCode != ErrNoError {
531 errStr := fmt.Sprintf("[%s-%d]: %s", topic, partition, partitionError.errorCode.Error())
532 errs = append(errs, errors.New(errStr))
533 }
534 }
535 }
536 }
537
538 if len(errs) > 0 {
539 return ErrReassignPartitions{MultiError{&errs}}
540 }
541
542 return nil
543 })
544}
545
546func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) {
547 if topic == "" {
548 return nil, ErrInvalidTopic
549 }
550
551 request := &ListPartitionReassignmentsRequest{
552 TimeoutMs: int32(60000),
553 Version: int16(0),
554 }
555
556 request.AddBlock(topic, partitions)
557
kesavand2cde6582020-06-22 04:56:23 -0400558 b, err := ca.Controller()
559 if err != nil {
kesavandc71914f2022-03-25 11:19:03 +0530560 return nil, err
kesavand2cde6582020-06-22 04:56:23 -0400561 }
kesavandc71914f2022-03-25 11:19:03 +0530562 _ = b.Open(ca.client.Config())
kesavand2cde6582020-06-22 04:56:23 -0400563
kesavandc71914f2022-03-25 11:19:03 +0530564 rsp, err := b.ListPartitionReassignments(request)
565
566 if err == nil && rsp != nil {
567 return rsp.TopicStatus, nil
568 } else {
569 return nil, err
kesavand2cde6582020-06-22 04:56:23 -0400570 }
kesavandc71914f2022-03-25 11:19:03 +0530571}
kesavand2cde6582020-06-22 04:56:23 -0400572
kesavandc71914f2022-03-25 11:19:03 +0530573func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
574 if topic == "" {
575 return ErrInvalidTopic
kesavand2cde6582020-06-22 04:56:23 -0400576 }
kesavandc71914f2022-03-25 11:19:03 +0530577 partitionPerBroker := make(map[*Broker][]int32)
578 for partition := range partitionOffsets {
579 broker, err := ca.client.Leader(topic, partition)
580 if err != nil {
581 return err
582 }
583 partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
584 }
585 errs := make([]error, 0)
586 for broker, partitions := range partitionPerBroker {
587 topics := make(map[string]*DeleteRecordsRequestTopic)
588 recordsToDelete := make(map[int32]int64)
589 for _, p := range partitions {
590 recordsToDelete[p] = partitionOffsets[p]
591 }
592 topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: recordsToDelete}
593 request := &DeleteRecordsRequest{
594 Topics: topics,
595 Timeout: ca.conf.Admin.Timeout,
596 }
kesavand2cde6582020-06-22 04:56:23 -0400597
kesavandc71914f2022-03-25 11:19:03 +0530598 rsp, err := broker.DeleteRecords(request)
599 if err != nil {
600 errs = append(errs, err)
601 } else {
602 deleteRecordsResponseTopic, ok := rsp.Topics[topic]
603 if !ok {
604 errs = append(errs, ErrIncompleteResponse)
605 } else {
606 for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
607 if deleteRecordsResponsePartition.Err != ErrNoError {
608 errs = append(errs, errors.New(deleteRecordsResponsePartition.Err.Error()))
609 }
610 }
611 }
612 }
613 }
614 if len(errs) > 0 {
615 return ErrDeleteRecords{MultiError{&errs}}
616 }
617 // todo since we are dealing with couple of partitions it would be good if we return slice of errors
618 // for each partition instead of one error
kesavand2cde6582020-06-22 04:56:23 -0400619 return nil
620}
621
kesavandc71914f2022-03-25 11:19:03 +0530622// Returns a bool indicating whether the resource request needs to go to a
623// specific broker
624func dependsOnSpecificNode(resource ConfigResource) bool {
625 return (resource.Type == BrokerResource && resource.Name != "") ||
626 resource.Type == BrokerLoggerResource
627}
kesavand2cde6582020-06-22 04:56:23 -0400628
kesavandc71914f2022-03-25 11:19:03 +0530629func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
kesavand2cde6582020-06-22 04:56:23 -0400630 var entries []ConfigEntry
631 var resources []*ConfigResource
632 resources = append(resources, &resource)
633
634 request := &DescribeConfigsRequest{
635 Resources: resources,
636 }
637
kesavandc71914f2022-03-25 11:19:03 +0530638 if ca.conf.Version.IsAtLeast(V1_1_0_0) {
639 request.Version = 1
640 }
641
642 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
643 request.Version = 2
644 }
645
646 var (
647 b *Broker
648 err error
649 )
650
651 // DescribeConfig of broker/broker logger must be sent to the broker in question
652 if dependsOnSpecificNode(resource) {
653 var id int64
654 id, err = strconv.ParseInt(resource.Name, 10, 32)
655 if err != nil {
656 return nil, err
657 }
658 b, err = ca.findBroker(int32(id))
659 } else {
660 b, err = ca.findAnyBroker()
661 }
kesavand2cde6582020-06-22 04:56:23 -0400662 if err != nil {
663 return nil, err
664 }
665
kesavandc71914f2022-03-25 11:19:03 +0530666 _ = b.Open(ca.client.Config())
kesavand2cde6582020-06-22 04:56:23 -0400667 rsp, err := b.DescribeConfigs(request)
668 if err != nil {
669 return nil, err
670 }
671
672 for _, rspResource := range rsp.Resources {
673 if rspResource.Name == resource.Name {
674 if rspResource.ErrorMsg != "" {
675 return nil, errors.New(rspResource.ErrorMsg)
676 }
kesavandc71914f2022-03-25 11:19:03 +0530677 if rspResource.ErrorCode != 0 {
678 return nil, KError(rspResource.ErrorCode)
679 }
kesavand2cde6582020-06-22 04:56:23 -0400680 for _, cfgEntry := range rspResource.Configs {
681 entries = append(entries, *cfgEntry)
682 }
683 }
684 }
685 return entries, nil
686}
687
688func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
kesavand2cde6582020-06-22 04:56:23 -0400689 var resources []*AlterConfigsResource
690 resources = append(resources, &AlterConfigsResource{
691 Type: resourceType,
692 Name: name,
693 ConfigEntries: entries,
694 })
695
696 request := &AlterConfigsRequest{
697 Resources: resources,
698 ValidateOnly: validateOnly,
699 }
700
kesavandc71914f2022-03-25 11:19:03 +0530701 var (
702 b *Broker
703 err error
704 )
705
706 // AlterConfig of broker/broker logger must be sent to the broker in question
707 if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
708 var id int64
709 id, err = strconv.ParseInt(name, 10, 32)
710 if err != nil {
711 return err
712 }
713 b, err = ca.findBroker(int32(id))
714 } else {
715 b, err = ca.findAnyBroker()
716 }
kesavand2cde6582020-06-22 04:56:23 -0400717 if err != nil {
718 return err
719 }
720
kesavandc71914f2022-03-25 11:19:03 +0530721 _ = b.Open(ca.client.Config())
kesavand2cde6582020-06-22 04:56:23 -0400722 rsp, err := b.AlterConfigs(request)
723 if err != nil {
724 return err
725 }
726
727 for _, rspResource := range rsp.Resources {
728 if rspResource.Name == name {
729 if rspResource.ErrorMsg != "" {
730 return errors.New(rspResource.ErrorMsg)
731 }
kesavandc71914f2022-03-25 11:19:03 +0530732 if rspResource.ErrorCode != 0 {
733 return KError(rspResource.ErrorCode)
734 }
735 }
736 }
737 return nil
738}
739
740func (ca *clusterAdmin) IncrementalAlterConfig(resourceType ConfigResourceType, name string, entries map[string]IncrementalAlterConfigsEntry, validateOnly bool) error {
741 var resources []*IncrementalAlterConfigsResource
742 resources = append(resources, &IncrementalAlterConfigsResource{
743 Type: resourceType,
744 Name: name,
745 ConfigEntries: entries,
746 })
747
748 request := &IncrementalAlterConfigsRequest{
749 Resources: resources,
750 ValidateOnly: validateOnly,
751 }
752
753 var (
754 b *Broker
755 err error
756 )
757
758 // AlterConfig of broker/broker logger must be sent to the broker in question
759 if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
760 var id int64
761 id, err = strconv.ParseInt(name, 10, 32)
762 if err != nil {
763 return err
764 }
765 b, err = ca.findBroker(int32(id))
766 } else {
767 b, err = ca.findAnyBroker()
768 }
769 if err != nil {
770 return err
771 }
772
773 _ = b.Open(ca.client.Config())
774 rsp, err := b.IncrementalAlterConfigs(request)
775 if err != nil {
776 return err
777 }
778
779 for _, rspResource := range rsp.Resources {
780 if rspResource.Name == name {
781 if rspResource.ErrorMsg != "" {
782 return errors.New(rspResource.ErrorMsg)
783 }
784 if rspResource.ErrorCode != 0 {
785 return KError(rspResource.ErrorCode)
786 }
kesavand2cde6582020-06-22 04:56:23 -0400787 }
788 }
789 return nil
790}
791
792func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
793 var acls []*AclCreation
794 acls = append(acls, &AclCreation{resource, acl})
795 request := &CreateAclsRequest{AclCreations: acls}
796
kesavandc71914f2022-03-25 11:19:03 +0530797 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
798 request.Version = 1
799 }
800
kesavand2cde6582020-06-22 04:56:23 -0400801 b, err := ca.Controller()
802 if err != nil {
803 return err
804 }
805
806 _, err = b.CreateAcls(request)
807 return err
808}
809
810func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
kesavand2cde6582020-06-22 04:56:23 -0400811 request := &DescribeAclsRequest{AclFilter: filter}
812
kesavandc71914f2022-03-25 11:19:03 +0530813 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
814 request.Version = 1
815 }
816
kesavand2cde6582020-06-22 04:56:23 -0400817 b, err := ca.Controller()
818 if err != nil {
819 return nil, err
820 }
821
822 rsp, err := b.DescribeAcls(request)
823 if err != nil {
824 return nil, err
825 }
826
827 var lAcls []ResourceAcls
828 for _, rAcl := range rsp.ResourceAcls {
829 lAcls = append(lAcls, *rAcl)
830 }
831 return lAcls, nil
832}
833
834func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
835 var filters []*AclFilter
836 filters = append(filters, &filter)
837 request := &DeleteAclsRequest{Filters: filters}
838
kesavandc71914f2022-03-25 11:19:03 +0530839 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
840 request.Version = 1
841 }
842
kesavand2cde6582020-06-22 04:56:23 -0400843 b, err := ca.Controller()
844 if err != nil {
845 return nil, err
846 }
847
848 rsp, err := b.DeleteAcls(request)
849 if err != nil {
850 return nil, err
851 }
852
853 var mAcls []MatchingAcl
854 for _, fr := range rsp.FilterResponses {
855 for _, mACL := range fr.MatchingAcls {
856 mAcls = append(mAcls, *mACL)
857 }
kesavand2cde6582020-06-22 04:56:23 -0400858 }
859 return mAcls, nil
860}
861
862func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
863 groupsPerBroker := make(map[*Broker][]string)
864
865 for _, group := range groups {
866 controller, err := ca.client.Coordinator(group)
867 if err != nil {
868 return nil, err
869 }
870 groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
kesavand2cde6582020-06-22 04:56:23 -0400871 }
872
873 for broker, brokerGroups := range groupsPerBroker {
874 response, err := broker.DescribeGroups(&DescribeGroupsRequest{
875 Groups: brokerGroups,
876 })
877 if err != nil {
878 return nil, err
879 }
880
881 result = append(result, response.Groups...)
882 }
883 return result, nil
884}
885
886func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
887 allGroups = make(map[string]string)
888
889 // Query brokers in parallel, since we have to query *all* brokers
890 brokers := ca.client.Brokers()
891 groupMaps := make(chan map[string]string, len(brokers))
kesavandc71914f2022-03-25 11:19:03 +0530892 errChan := make(chan error, len(brokers))
kesavand2cde6582020-06-22 04:56:23 -0400893 wg := sync.WaitGroup{}
894
895 for _, b := range brokers {
896 wg.Add(1)
897 go func(b *Broker, conf *Config) {
898 defer wg.Done()
899 _ = b.Open(conf) // Ensure that broker is opened
900
901 response, err := b.ListGroups(&ListGroupsRequest{})
902 if err != nil {
kesavandc71914f2022-03-25 11:19:03 +0530903 errChan <- err
kesavand2cde6582020-06-22 04:56:23 -0400904 return
905 }
906
907 groups := make(map[string]string)
908 for group, typ := range response.Groups {
909 groups[group] = typ
910 }
911
912 groupMaps <- groups
kesavand2cde6582020-06-22 04:56:23 -0400913 }(b, ca.conf)
914 }
915
916 wg.Wait()
917 close(groupMaps)
kesavandc71914f2022-03-25 11:19:03 +0530918 close(errChan)
kesavand2cde6582020-06-22 04:56:23 -0400919
920 for groupMap := range groupMaps {
921 for group, protocolType := range groupMap {
922 allGroups[group] = protocolType
923 }
924 }
925
926 // Intentionally return only the first error for simplicity
kesavandc71914f2022-03-25 11:19:03 +0530927 err = <-errChan
kesavand2cde6582020-06-22 04:56:23 -0400928 return
929}
930
931func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
932 coordinator, err := ca.client.Coordinator(group)
933 if err != nil {
934 return nil, err
935 }
936
937 request := &OffsetFetchRequest{
938 ConsumerGroup: group,
939 partitions: topicPartitions,
940 }
941
942 if ca.conf.Version.IsAtLeast(V0_10_2_0) {
943 request.Version = 2
944 } else if ca.conf.Version.IsAtLeast(V0_8_2_2) {
945 request.Version = 1
946 }
947
948 return coordinator.FetchOffset(request)
949}
950
kesavandc71914f2022-03-25 11:19:03 +0530951func (ca *clusterAdmin) DeleteConsumerGroupOffset(group string, topic string, partition int32) error {
952 coordinator, err := ca.client.Coordinator(group)
953 if err != nil {
954 return err
955 }
956
957 request := &DeleteOffsetsRequest{
958 Group: group,
959 partitions: map[string][]int32{
960 topic: {partition},
961 },
962 }
963
964 resp, err := coordinator.DeleteOffsets(request)
965 if err != nil {
966 return err
967 }
968
969 if resp.ErrorCode != ErrNoError {
970 return resp.ErrorCode
971 }
972
973 if resp.Errors[topic][partition] != ErrNoError {
974 return resp.Errors[topic][partition]
975 }
976 return nil
977}
978
kesavand2cde6582020-06-22 04:56:23 -0400979func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
980 coordinator, err := ca.client.Coordinator(group)
981 if err != nil {
982 return err
983 }
984
985 request := &DeleteGroupsRequest{
986 Groups: []string{group},
987 }
988
989 resp, err := coordinator.DeleteGroups(request)
990 if err != nil {
991 return err
992 }
993
994 groupErr, ok := resp.GroupErrorCodes[group]
995 if !ok {
996 return ErrIncompleteResponse
997 }
998
999 if groupErr != ErrNoError {
1000 return groupErr
1001 }
1002
1003 return nil
1004}
kesavandc71914f2022-03-25 11:19:03 +05301005
1006func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) {
1007 allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata)
1008
1009 // Query brokers in parallel, since we may have to query multiple brokers
1010 logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
1011 errChan := make(chan error, len(brokerIds))
1012 wg := sync.WaitGroup{}
1013
1014 for _, b := range brokerIds {
1015 wg.Add(1)
1016 broker, err := ca.findBroker(b)
1017 if err != nil {
1018 Logger.Printf("Unable to find broker with ID = %v\n", b)
1019 continue
1020 }
1021 go func(b *Broker, conf *Config) {
1022 defer wg.Done()
1023 _ = b.Open(conf) // Ensure that broker is opened
1024
1025 response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{})
1026 if err != nil {
1027 errChan <- err
1028 return
1029 }
1030 logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata)
1031 logDirs[b.ID()] = response.LogDirs
1032 logDirsMaps <- logDirs
1033 }(broker, ca.conf)
1034 }
1035
1036 wg.Wait()
1037 close(logDirsMaps)
1038 close(errChan)
1039
1040 for logDirsMap := range logDirsMaps {
1041 for id, logDirs := range logDirsMap {
1042 allLogDirs[id] = logDirs
1043 }
1044 }
1045
1046 // Intentionally return only the first error for simplicity
1047 err = <-errChan
1048 return
1049}
1050
1051func (ca *clusterAdmin) DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error) {
1052 req := &DescribeUserScramCredentialsRequest{}
1053 for _, u := range users {
1054 req.DescribeUsers = append(req.DescribeUsers, DescribeUserScramCredentialsRequestUser{
1055 Name: u,
1056 })
1057 }
1058
1059 b, err := ca.Controller()
1060 if err != nil {
1061 return nil, err
1062 }
1063
1064 rsp, err := b.DescribeUserScramCredentials(req)
1065 if err != nil {
1066 return nil, err
1067 }
1068
1069 return rsp.Results, nil
1070}
1071
1072func (ca *clusterAdmin) UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error) {
1073 res, err := ca.AlterUserScramCredentials(upsert, nil)
1074 if err != nil {
1075 return nil, err
1076 }
1077
1078 return res, nil
1079}
1080
1081func (ca *clusterAdmin) DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
1082 res, err := ca.AlterUserScramCredentials(nil, delete)
1083 if err != nil {
1084 return nil, err
1085 }
1086
1087 return res, nil
1088}
1089
1090func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsUpsert, d []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
1091 req := &AlterUserScramCredentialsRequest{
1092 Deletions: d,
1093 Upsertions: u,
1094 }
1095
1096 b, err := ca.Controller()
1097 if err != nil {
1098 return nil, err
1099 }
1100
1101 rsp, err := b.AlterUserScramCredentials(req)
1102 if err != nil {
1103 return nil, err
1104 }
1105
1106 return rsp.Results, nil
1107}
1108
1109// Describe All : use an empty/nil components slice + strict = false
1110// Contains components: strict = false
1111// Contains only components: strict = true
1112func (ca *clusterAdmin) DescribeClientQuotas(components []QuotaFilterComponent, strict bool) ([]DescribeClientQuotasEntry, error) {
1113 request := &DescribeClientQuotasRequest{
1114 Components: components,
1115 Strict: strict,
1116 }
1117
1118 b, err := ca.Controller()
1119 if err != nil {
1120 return nil, err
1121 }
1122
1123 rsp, err := b.DescribeClientQuotas(request)
1124 if err != nil {
1125 return nil, err
1126 }
1127
1128 if rsp.ErrorMsg != nil && len(*rsp.ErrorMsg) > 0 {
1129 return nil, errors.New(*rsp.ErrorMsg)
1130 }
1131 if rsp.ErrorCode != ErrNoError {
1132 return nil, rsp.ErrorCode
1133 }
1134
1135 return rsp.Entries, nil
1136}
1137
1138func (ca *clusterAdmin) AlterClientQuotas(entity []QuotaEntityComponent, op ClientQuotasOp, validateOnly bool) error {
1139 entry := AlterClientQuotasEntry{
1140 Entity: entity,
1141 Ops: []ClientQuotasOp{op},
1142 }
1143
1144 request := &AlterClientQuotasRequest{
1145 Entries: []AlterClientQuotasEntry{entry},
1146 ValidateOnly: validateOnly,
1147 }
1148
1149 b, err := ca.Controller()
1150 if err != nil {
1151 return err
1152 }
1153
1154 rsp, err := b.AlterClientQuotas(request)
1155 if err != nil {
1156 return err
1157 }
1158
1159 for _, entry := range rsp.Entries {
1160 if entry.ErrorMsg != nil && len(*entry.ErrorMsg) > 0 {
1161 return errors.New(*entry.ErrorMsg)
1162 }
1163 if entry.ErrorCode != ErrNoError {
1164 return entry.ErrorCode
1165 }
1166 }
1167
1168 return nil
1169}