blob: abe18b19f04f1d48952f4874c473ba1116db7648 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
3import (
4 "errors"
khenaidoo106c61a2021-08-11 18:05:46 -04005 "fmt"
William Kurkianea869482019-04-09 15:16:11 -04006 "math/rand"
khenaidoo106c61a2021-08-11 18:05:46 -04007 "strconv"
William Kurkianea869482019-04-09 15:16:11 -04008 "sync"
khenaidoo106c61a2021-08-11 18:05:46 -04009 "time"
William Kurkianea869482019-04-09 15:16:11 -040010)
11
12// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
13// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
14// Methods with stricter requirements will specify the minimum broker version required.
15// You MUST call Close() on a client to avoid leaks
16type ClusterAdmin interface {
17 // Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
18 // It may take several seconds after CreateTopic returns success for all the brokers
19 // to become aware that the topic has been created. During this time, listTopics
20 // may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
21 CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
22
23 // List the topics available in the cluster with the default options.
24 ListTopics() (map[string]TopicDetail, error)
25
Abhilash S.L3b494632019-07-16 15:51:09 +053026 // Describe some topics in the cluster.
William Kurkianea869482019-04-09 15:16:11 -040027 DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
28
29 // Delete a topic. It may take several seconds after the DeleteTopic to returns success
30 // and for all the brokers to become aware that the topics are gone.
31 // During this time, listTopics may continue to return information about the deleted topic.
32 // If delete.topic.enable is false on the brokers, deleteTopic will mark
33 // the topic for deletion, but not actually delete them.
34 // This operation is supported by brokers with version 0.10.1.0 or higher.
35 DeleteTopic(topic string) error
36
37 // Increase the number of partitions of the topics according to the corresponding values.
38 // If partitions are increased for a topic that has a key, the partition logic or ordering of
39 // the messages will be affected. It may take several seconds after this method returns
40 // success for all the brokers to become aware that the partitions have been created.
41 // During this time, ClusterAdmin#describeTopics may not return information about the
42 // new partitions. This operation is supported by brokers with version 1.0.0 or higher.
43 CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
44
khenaidoo106c61a2021-08-11 18:05:46 -040045 // Alter the replica assignment for partitions.
46 // This operation is supported by brokers with version 2.4.0.0 or higher.
47 AlterPartitionReassignments(topic string, assignment [][]int32) error
48
49 // Provides info on ongoing partitions replica reassignments.
50 // This operation is supported by brokers with version 2.4.0.0 or higher.
51 ListPartitionReassignments(topics string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error)
52
William Kurkianea869482019-04-09 15:16:11 -040053 // Delete records whose offset is smaller than the given offset of the corresponding partition.
54 // This operation is supported by brokers with version 0.11.0.0 or higher.
55 DeleteRecords(topic string, partitionOffsets map[int32]int64) error
56
57 // Get the configuration for the specified resources.
58 // The returned configuration includes default values and the Default is true
59 // can be used to distinguish them from user supplied values.
60 // Config entries where ReadOnly is true cannot be updated.
61 // The value of config entries where Sensitive is true is always nil so
62 // sensitive information is not disclosed.
63 // This operation is supported by brokers with version 0.11.0.0 or higher.
64 DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
65
66 // Update the configuration for the specified resources with the default options.
67 // This operation is supported by brokers with version 0.11.0.0 or higher.
68 // The resources with their configs (topic is the only resource type with configs
69 // that can be updated currently Updates are not transactional so they may succeed
70 // for some resources while fail for others. The configs for a particular resource are updated automatically.
71 AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
72
73 // Creates access control lists (ACLs) which are bound to specific resources.
74 // This operation is not transactional so it may succeed for some ACLs while fail for others.
75 // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
76 // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
77 CreateACL(resource Resource, acl Acl) error
78
79 // Lists access control lists (ACLs) according to the supplied filter.
80 // it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
81 // This operation is supported by brokers with version 0.11.0.0 or higher.
82 ListAcls(filter AclFilter) ([]ResourceAcls, error)
83
84 // Deletes access control lists (ACLs) according to the supplied filters.
85 // This operation is not transactional so it may succeed for some ACLs while fail for others.
86 // This operation is supported by brokers with version 0.11.0.0 or higher.
87 DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
88
89 // List the consumer groups available in the cluster.
90 ListConsumerGroups() (map[string]string, error)
91
Abhilash S.L3b494632019-07-16 15:51:09 +053092 // Describe the given consumer groups.
William Kurkianea869482019-04-09 15:16:11 -040093 DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
94
95 // List the consumer group offsets available in the cluster.
96 ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
97
Abhilash S.L3b494632019-07-16 15:51:09 +053098 // Delete a consumer group.
99 DeleteConsumerGroup(group string) error
100
William Kurkianea869482019-04-09 15:16:11 -0400101 // Get information about the nodes in the cluster
102 DescribeCluster() (brokers []*Broker, controllerID int32, err error)
103
khenaidoo106c61a2021-08-11 18:05:46 -0400104 // Get information about all log directories on the given set of brokers
105 DescribeLogDirs(brokers []int32) (map[int32][]DescribeLogDirsResponseDirMetadata, error)
106
107 // Get information about SCRAM users
108 DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error)
109
110 // Delete SCRAM users
111 DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error)
112
113 // Upsert SCRAM users
114 UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error)
115
William Kurkianea869482019-04-09 15:16:11 -0400116 // Close shuts down the admin and closes underlying client.
117 Close() error
118}
119
120type clusterAdmin struct {
121 client Client
122 conf *Config
123}
124
125// NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
126func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
127 client, err := NewClient(addrs, conf)
128 if err != nil {
129 return nil, err
130 }
khenaidoo106c61a2021-08-11 18:05:46 -0400131 return NewClusterAdminFromClient(client)
132}
William Kurkianea869482019-04-09 15:16:11 -0400133
khenaidoo106c61a2021-08-11 18:05:46 -0400134// NewClusterAdminFromClient creates a new ClusterAdmin using the given client.
135// Note that underlying client will also be closed on admin's Close() call.
136func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) {
137 // make sure we can retrieve the controller
138 _, err := client.Controller()
William Kurkianea869482019-04-09 15:16:11 -0400139 if err != nil {
140 return nil, err
141 }
142
143 ca := &clusterAdmin{
144 client: client,
145 conf: client.Config(),
146 }
147 return ca, nil
148}
149
150func (ca *clusterAdmin) Close() error {
151 return ca.client.Close()
152}
153
154func (ca *clusterAdmin) Controller() (*Broker, error) {
155 return ca.client.Controller()
156}
157
khenaidoo106c61a2021-08-11 18:05:46 -0400158func (ca *clusterAdmin) refreshController() (*Broker, error) {
159 return ca.client.RefreshController()
160}
William Kurkianea869482019-04-09 15:16:11 -0400161
khenaidoo106c61a2021-08-11 18:05:46 -0400162// isErrNoController returns `true` if the given error type unwraps to an
163// `ErrNotController` response from Kafka
164func isErrNoController(err error) bool {
165 switch e := err.(type) {
166 case *TopicError:
167 return e.Err == ErrNotController
168 case *TopicPartitionError:
169 return e.Err == ErrNotController
170 case KError:
171 return e == ErrNotController
172 }
173 return false
174}
175
176// retryOnError will repeatedly call the given (error-returning) func in the
177// case that its response is non-nil and retryable (as determined by the
178// provided retryable func) up to the maximum number of tries permitted by
179// the admin client configuration
180func (ca *clusterAdmin) retryOnError(retryable func(error) bool, fn func() error) error {
181 var err error
182 for attempt := 0; attempt < ca.conf.Admin.Retry.Max; attempt++ {
183 err = fn()
184 if err == nil || !retryable(err) {
185 return err
186 }
187 Logger.Printf(
188 "admin/request retrying after %dms... (%d attempts remaining)\n",
189 ca.conf.Admin.Retry.Backoff/time.Millisecond, ca.conf.Admin.Retry.Max-attempt)
190 time.Sleep(ca.conf.Admin.Retry.Backoff)
191 continue
192 }
193 return err
194}
195
196func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
William Kurkianea869482019-04-09 15:16:11 -0400197 if topic == "" {
198 return ErrInvalidTopic
199 }
200
201 if detail == nil {
Abhilash S.L3b494632019-07-16 15:51:09 +0530202 return errors.New("you must specify topic details")
William Kurkianea869482019-04-09 15:16:11 -0400203 }
204
205 topicDetails := make(map[string]*TopicDetail)
206 topicDetails[topic] = detail
207
208 request := &CreateTopicsRequest{
209 TopicDetails: topicDetails,
210 ValidateOnly: validateOnly,
211 Timeout: ca.conf.Admin.Timeout,
212 }
213
214 if ca.conf.Version.IsAtLeast(V0_11_0_0) {
215 request.Version = 1
216 }
217 if ca.conf.Version.IsAtLeast(V1_0_0_0) {
218 request.Version = 2
219 }
220
khenaidoo106c61a2021-08-11 18:05:46 -0400221 return ca.retryOnError(isErrNoController, func() error {
222 b, err := ca.Controller()
223 if err != nil {
224 return err
225 }
William Kurkianea869482019-04-09 15:16:11 -0400226
khenaidoo106c61a2021-08-11 18:05:46 -0400227 rsp, err := b.CreateTopics(request)
228 if err != nil {
229 return err
230 }
William Kurkianea869482019-04-09 15:16:11 -0400231
khenaidoo106c61a2021-08-11 18:05:46 -0400232 topicErr, ok := rsp.TopicErrors[topic]
233 if !ok {
234 return ErrIncompleteResponse
235 }
William Kurkianea869482019-04-09 15:16:11 -0400236
khenaidoo106c61a2021-08-11 18:05:46 -0400237 if topicErr.Err != ErrNoError {
238 if topicErr.Err == ErrNotController {
239 _, _ = ca.refreshController()
240 }
241 return topicErr
242 }
William Kurkianea869482019-04-09 15:16:11 -0400243
khenaidoo106c61a2021-08-11 18:05:46 -0400244 return nil
245 })
William Kurkianea869482019-04-09 15:16:11 -0400246}
247
248func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
249 controller, err := ca.Controller()
250 if err != nil {
251 return nil, err
252 }
253
254 request := &MetadataRequest{
255 Topics: topics,
256 AllowAutoTopicCreation: false,
257 }
258
Abhilash S.L3b494632019-07-16 15:51:09 +0530259 if ca.conf.Version.IsAtLeast(V1_0_0_0) {
260 request.Version = 5
261 } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
William Kurkianea869482019-04-09 15:16:11 -0400262 request.Version = 4
263 }
264
265 response, err := controller.GetMetadata(request)
266 if err != nil {
267 return nil, err
268 }
269 return response.Topics, nil
270}
271
272func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
273 controller, err := ca.Controller()
274 if err != nil {
275 return nil, int32(0), err
276 }
277
278 request := &MetadataRequest{
279 Topics: []string{},
280 }
281
khenaidoo106c61a2021-08-11 18:05:46 -0400282 if ca.conf.Version.IsAtLeast(V0_10_0_0) {
283 request.Version = 1
284 }
285
William Kurkianea869482019-04-09 15:16:11 -0400286 response, err := controller.GetMetadata(request)
287 if err != nil {
288 return nil, int32(0), err
289 }
290
291 return response.Brokers, response.ControllerID, nil
292}
293
khenaidoo106c61a2021-08-11 18:05:46 -0400294func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
295 brokers := ca.client.Brokers()
296 for _, b := range brokers {
297 if b.ID() == id {
298 return b, nil
299 }
300 }
301 return nil, fmt.Errorf("could not find broker id %d", id)
302}
303
William Kurkianea869482019-04-09 15:16:11 -0400304func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
305 brokers := ca.client.Brokers()
306 if len(brokers) > 0 {
307 index := rand.Intn(len(brokers))
308 return brokers[index], nil
309 }
310 return nil, errors.New("no available broker")
311}
312
313func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
314 // In order to build TopicDetails we need to first get the list of all
315 // topics using a MetadataRequest and then get their configs using a
316 // DescribeConfigsRequest request. To avoid sending many requests to the
317 // broker, we use a single DescribeConfigsRequest.
318
319 // Send the all-topic MetadataRequest
320 b, err := ca.findAnyBroker()
321 if err != nil {
322 return nil, err
323 }
324 _ = b.Open(ca.client.Config())
325
326 metadataReq := &MetadataRequest{}
327 metadataResp, err := b.GetMetadata(metadataReq)
328 if err != nil {
329 return nil, err
330 }
331
332 topicsDetailsMap := make(map[string]TopicDetail)
333
334 var describeConfigsResources []*ConfigResource
335
336 for _, topic := range metadataResp.Topics {
337 topicDetails := TopicDetail{
338 NumPartitions: int32(len(topic.Partitions)),
339 }
340 if len(topic.Partitions) > 0 {
341 topicDetails.ReplicaAssignment = map[int32][]int32{}
342 for _, partition := range topic.Partitions {
343 topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
344 }
345 topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
346 }
347 topicsDetailsMap[topic.Name] = topicDetails
348
349 // we populate the resources we want to describe from the MetadataResponse
350 topicResource := ConfigResource{
351 Type: TopicResource,
352 Name: topic.Name,
353 }
354 describeConfigsResources = append(describeConfigsResources, &topicResource)
355 }
356
357 // Send the DescribeConfigsRequest
358 describeConfigsReq := &DescribeConfigsRequest{
359 Resources: describeConfigsResources,
360 }
khenaidoo106c61a2021-08-11 18:05:46 -0400361
362 if ca.conf.Version.IsAtLeast(V1_1_0_0) {
363 describeConfigsReq.Version = 1
364 }
365
366 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
367 describeConfigsReq.Version = 2
368 }
369
William Kurkianea869482019-04-09 15:16:11 -0400370 describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
371 if err != nil {
372 return nil, err
373 }
374
375 for _, resource := range describeConfigsResp.Resources {
376 topicDetails := topicsDetailsMap[resource.Name]
377 topicDetails.ConfigEntries = make(map[string]*string)
378
379 for _, entry := range resource.Configs {
380 // only include non-default non-sensitive config
381 // (don't actually think topic config will ever be sensitive)
382 if entry.Default || entry.Sensitive {
383 continue
384 }
385 topicDetails.ConfigEntries[entry.Name] = &entry.Value
386 }
387
388 topicsDetailsMap[resource.Name] = topicDetails
389 }
390
391 return topicsDetailsMap, nil
392}
393
394func (ca *clusterAdmin) DeleteTopic(topic string) error {
William Kurkianea869482019-04-09 15:16:11 -0400395 if topic == "" {
396 return ErrInvalidTopic
397 }
398
399 request := &DeleteTopicsRequest{
400 Topics: []string{topic},
401 Timeout: ca.conf.Admin.Timeout,
402 }
403
404 if ca.conf.Version.IsAtLeast(V0_11_0_0) {
405 request.Version = 1
406 }
407
khenaidoo106c61a2021-08-11 18:05:46 -0400408 return ca.retryOnError(isErrNoController, func() error {
409 b, err := ca.Controller()
410 if err != nil {
411 return err
412 }
William Kurkianea869482019-04-09 15:16:11 -0400413
khenaidoo106c61a2021-08-11 18:05:46 -0400414 rsp, err := b.DeleteTopics(request)
415 if err != nil {
416 return err
417 }
William Kurkianea869482019-04-09 15:16:11 -0400418
khenaidoo106c61a2021-08-11 18:05:46 -0400419 topicErr, ok := rsp.TopicErrorCodes[topic]
420 if !ok {
421 return ErrIncompleteResponse
422 }
William Kurkianea869482019-04-09 15:16:11 -0400423
khenaidoo106c61a2021-08-11 18:05:46 -0400424 if topicErr != ErrNoError {
425 if topicErr == ErrNotController {
426 _, _ = ca.refreshController()
427 }
428 return topicErr
429 }
430
431 return nil
432 })
William Kurkianea869482019-04-09 15:16:11 -0400433}
434
435func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
436 if topic == "" {
437 return ErrInvalidTopic
438 }
439
440 topicPartitions := make(map[string]*TopicPartition)
441 topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}
442
443 request := &CreatePartitionsRequest{
444 TopicPartitions: topicPartitions,
445 Timeout: ca.conf.Admin.Timeout,
446 }
447
khenaidoo106c61a2021-08-11 18:05:46 -0400448 return ca.retryOnError(isErrNoController, func() error {
449 b, err := ca.Controller()
450 if err != nil {
451 return err
452 }
453
454 rsp, err := b.CreatePartitions(request)
455 if err != nil {
456 return err
457 }
458
459 topicErr, ok := rsp.TopicPartitionErrors[topic]
460 if !ok {
461 return ErrIncompleteResponse
462 }
463
464 if topicErr.Err != ErrNoError {
465 if topicErr.Err == ErrNotController {
466 _, _ = ca.refreshController()
467 }
468 return topicErr
469 }
470
471 return nil
472 })
473}
474
475func (ca *clusterAdmin) AlterPartitionReassignments(topic string, assignment [][]int32) error {
476 if topic == "" {
477 return ErrInvalidTopic
478 }
479
480 request := &AlterPartitionReassignmentsRequest{
481 TimeoutMs: int32(60000),
482 Version: int16(0),
483 }
484
485 for i := 0; i < len(assignment); i++ {
486 request.AddBlock(topic, int32(i), assignment[i])
487 }
488
489 return ca.retryOnError(isErrNoController, func() error {
490 b, err := ca.Controller()
491 if err != nil {
492 return err
493 }
494
495 errs := make([]error, 0)
496
497 rsp, err := b.AlterPartitionReassignments(request)
498
499 if err != nil {
500 errs = append(errs, err)
501 } else {
502 if rsp.ErrorCode > 0 {
503 errs = append(errs, errors.New(rsp.ErrorCode.Error()))
504 }
505
506 for topic, topicErrors := range rsp.Errors {
507 for partition, partitionError := range topicErrors {
508 if partitionError.errorCode != ErrNoError {
509 errStr := fmt.Sprintf("[%s-%d]: %s", topic, partition, partitionError.errorCode.Error())
510 errs = append(errs, errors.New(errStr))
511 }
512 }
513 }
514 }
515
516 if len(errs) > 0 {
517 return ErrReassignPartitions{MultiError{&errs}}
518 }
519
520 return nil
521 })
522}
523
524func (ca *clusterAdmin) ListPartitionReassignments(topic string, partitions []int32) (topicStatus map[string]map[int32]*PartitionReplicaReassignmentsStatus, err error) {
525 if topic == "" {
526 return nil, ErrInvalidTopic
527 }
528
529 request := &ListPartitionReassignmentsRequest{
530 TimeoutMs: int32(60000),
531 Version: int16(0),
532 }
533
534 request.AddBlock(topic, partitions)
535
William Kurkianea869482019-04-09 15:16:11 -0400536 b, err := ca.Controller()
537 if err != nil {
khenaidoo106c61a2021-08-11 18:05:46 -0400538 return nil, err
William Kurkianea869482019-04-09 15:16:11 -0400539 }
khenaidoo106c61a2021-08-11 18:05:46 -0400540 _ = b.Open(ca.client.Config())
William Kurkianea869482019-04-09 15:16:11 -0400541
khenaidoo106c61a2021-08-11 18:05:46 -0400542 rsp, err := b.ListPartitionReassignments(request)
543
544 if err == nil && rsp != nil {
545 return rsp.TopicStatus, nil
546 } else {
547 return nil, err
William Kurkianea869482019-04-09 15:16:11 -0400548 }
William Kurkianea869482019-04-09 15:16:11 -0400549}
550
551func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
William Kurkianea869482019-04-09 15:16:11 -0400552 if topic == "" {
553 return ErrInvalidTopic
554 }
David Bainbridge788e5202019-10-21 18:49:40 +0000555 partitionPerBroker := make(map[*Broker][]int32)
556 for partition := range partitionOffsets {
557 broker, err := ca.client.Leader(topic, partition)
558 if err != nil {
559 return err
560 }
khenaidoo106c61a2021-08-11 18:05:46 -0400561 partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
William Kurkianea869482019-04-09 15:16:11 -0400562 }
David Bainbridge788e5202019-10-21 18:49:40 +0000563 errs := make([]error, 0)
564 for broker, partitions := range partitionPerBroker {
565 topics := make(map[string]*DeleteRecordsRequestTopic)
566 recordsToDelete := make(map[int32]int64)
567 for _, p := range partitions {
568 recordsToDelete[p] = partitionOffsets[p]
569 }
570 topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: recordsToDelete}
571 request := &DeleteRecordsRequest{
572 Topics: topics,
573 Timeout: ca.conf.Admin.Timeout,
574 }
William Kurkianea869482019-04-09 15:16:11 -0400575
David Bainbridge788e5202019-10-21 18:49:40 +0000576 rsp, err := broker.DeleteRecords(request)
577 if err != nil {
578 errs = append(errs, err)
579 } else {
580 deleteRecordsResponseTopic, ok := rsp.Topics[topic]
581 if !ok {
582 errs = append(errs, ErrIncompleteResponse)
583 } else {
584 for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
585 if deleteRecordsResponsePartition.Err != ErrNoError {
586 errs = append(errs, errors.New(deleteRecordsResponsePartition.Err.Error()))
587 }
588 }
589 }
590 }
William Kurkianea869482019-04-09 15:16:11 -0400591 }
David Bainbridge788e5202019-10-21 18:49:40 +0000592 if len(errs) > 0 {
593 return ErrDeleteRecords{MultiError{&errs}}
William Kurkianea869482019-04-09 15:16:11 -0400594 }
khenaidoo106c61a2021-08-11 18:05:46 -0400595 // todo since we are dealing with couple of partitions it would be good if we return slice of errors
596 // for each partition instead of one error
William Kurkianea869482019-04-09 15:16:11 -0400597 return nil
598}
599
khenaidoo106c61a2021-08-11 18:05:46 -0400600// Returns a bool indicating whether the resource request needs to go to a
601// specific broker
602func dependsOnSpecificNode(resource ConfigResource) bool {
603 return (resource.Type == BrokerResource && resource.Name != "") ||
604 resource.Type == BrokerLoggerResource
605}
William Kurkianea869482019-04-09 15:16:11 -0400606
khenaidoo106c61a2021-08-11 18:05:46 -0400607func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
William Kurkianea869482019-04-09 15:16:11 -0400608 var entries []ConfigEntry
609 var resources []*ConfigResource
610 resources = append(resources, &resource)
611
612 request := &DescribeConfigsRequest{
613 Resources: resources,
614 }
615
khenaidoo106c61a2021-08-11 18:05:46 -0400616 if ca.conf.Version.IsAtLeast(V1_1_0_0) {
617 request.Version = 1
618 }
619
620 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
621 request.Version = 2
622 }
623
624 var (
625 b *Broker
626 err error
627 )
628
629 // DescribeConfig of broker/broker logger must be sent to the broker in question
630 if dependsOnSpecificNode(resource) {
631 var id int64
632 id, err = strconv.ParseInt(resource.Name, 10, 32)
633 if err != nil {
634 return nil, err
635 }
636 b, err = ca.findBroker(int32(id))
637 } else {
638 b, err = ca.findAnyBroker()
639 }
William Kurkianea869482019-04-09 15:16:11 -0400640 if err != nil {
641 return nil, err
642 }
643
khenaidoo106c61a2021-08-11 18:05:46 -0400644 _ = b.Open(ca.client.Config())
William Kurkianea869482019-04-09 15:16:11 -0400645 rsp, err := b.DescribeConfigs(request)
646 if err != nil {
647 return nil, err
648 }
649
650 for _, rspResource := range rsp.Resources {
651 if rspResource.Name == resource.Name {
652 if rspResource.ErrorMsg != "" {
653 return nil, errors.New(rspResource.ErrorMsg)
654 }
khenaidoo106c61a2021-08-11 18:05:46 -0400655 if rspResource.ErrorCode != 0 {
656 return nil, KError(rspResource.ErrorCode)
657 }
William Kurkianea869482019-04-09 15:16:11 -0400658 for _, cfgEntry := range rspResource.Configs {
659 entries = append(entries, *cfgEntry)
660 }
661 }
662 }
663 return entries, nil
664}
665
666func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
William Kurkianea869482019-04-09 15:16:11 -0400667 var resources []*AlterConfigsResource
668 resources = append(resources, &AlterConfigsResource{
669 Type: resourceType,
670 Name: name,
671 ConfigEntries: entries,
672 })
673
674 request := &AlterConfigsRequest{
675 Resources: resources,
676 ValidateOnly: validateOnly,
677 }
678
khenaidoo106c61a2021-08-11 18:05:46 -0400679 var (
680 b *Broker
681 err error
682 )
683
684 // AlterConfig of broker/broker logger must be sent to the broker in question
685 if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
686 var id int64
687 id, err = strconv.ParseInt(name, 10, 32)
688 if err != nil {
689 return err
690 }
691 b, err = ca.findBroker(int32(id))
692 } else {
693 b, err = ca.findAnyBroker()
694 }
William Kurkianea869482019-04-09 15:16:11 -0400695 if err != nil {
696 return err
697 }
698
khenaidoo106c61a2021-08-11 18:05:46 -0400699 _ = b.Open(ca.client.Config())
William Kurkianea869482019-04-09 15:16:11 -0400700 rsp, err := b.AlterConfigs(request)
701 if err != nil {
702 return err
703 }
704
705 for _, rspResource := range rsp.Resources {
706 if rspResource.Name == name {
707 if rspResource.ErrorMsg != "" {
708 return errors.New(rspResource.ErrorMsg)
709 }
khenaidoo106c61a2021-08-11 18:05:46 -0400710 if rspResource.ErrorCode != 0 {
711 return KError(rspResource.ErrorCode)
712 }
William Kurkianea869482019-04-09 15:16:11 -0400713 }
714 }
715 return nil
716}
717
718func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
719 var acls []*AclCreation
720 acls = append(acls, &AclCreation{resource, acl})
721 request := &CreateAclsRequest{AclCreations: acls}
722
khenaidoo106c61a2021-08-11 18:05:46 -0400723 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
724 request.Version = 1
725 }
726
William Kurkianea869482019-04-09 15:16:11 -0400727 b, err := ca.Controller()
728 if err != nil {
729 return err
730 }
731
732 _, err = b.CreateAcls(request)
733 return err
734}
735
736func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
William Kurkianea869482019-04-09 15:16:11 -0400737 request := &DescribeAclsRequest{AclFilter: filter}
738
khenaidoo106c61a2021-08-11 18:05:46 -0400739 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
740 request.Version = 1
741 }
742
William Kurkianea869482019-04-09 15:16:11 -0400743 b, err := ca.Controller()
744 if err != nil {
745 return nil, err
746 }
747
748 rsp, err := b.DescribeAcls(request)
749 if err != nil {
750 return nil, err
751 }
752
753 var lAcls []ResourceAcls
754 for _, rAcl := range rsp.ResourceAcls {
755 lAcls = append(lAcls, *rAcl)
756 }
757 return lAcls, nil
758}
759
760func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
761 var filters []*AclFilter
762 filters = append(filters, &filter)
763 request := &DeleteAclsRequest{Filters: filters}
764
khenaidoo106c61a2021-08-11 18:05:46 -0400765 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
766 request.Version = 1
767 }
768
William Kurkianea869482019-04-09 15:16:11 -0400769 b, err := ca.Controller()
770 if err != nil {
771 return nil, err
772 }
773
774 rsp, err := b.DeleteAcls(request)
775 if err != nil {
776 return nil, err
777 }
778
779 var mAcls []MatchingAcl
780 for _, fr := range rsp.FilterResponses {
781 for _, mACL := range fr.MatchingAcls {
782 mAcls = append(mAcls, *mACL)
783 }
William Kurkianea869482019-04-09 15:16:11 -0400784 }
785 return mAcls, nil
786}
787
788func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
789 groupsPerBroker := make(map[*Broker][]string)
790
791 for _, group := range groups {
792 controller, err := ca.client.Coordinator(group)
793 if err != nil {
794 return nil, err
795 }
796 groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
William Kurkianea869482019-04-09 15:16:11 -0400797 }
798
799 for broker, brokerGroups := range groupsPerBroker {
800 response, err := broker.DescribeGroups(&DescribeGroupsRequest{
801 Groups: brokerGroups,
802 })
803 if err != nil {
804 return nil, err
805 }
806
807 result = append(result, response.Groups...)
808 }
809 return result, nil
810}
811
812func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
813 allGroups = make(map[string]string)
814
815 // Query brokers in parallel, since we have to query *all* brokers
816 brokers := ca.client.Brokers()
817 groupMaps := make(chan map[string]string, len(brokers))
khenaidoo106c61a2021-08-11 18:05:46 -0400818 errChan := make(chan error, len(brokers))
William Kurkianea869482019-04-09 15:16:11 -0400819 wg := sync.WaitGroup{}
820
821 for _, b := range brokers {
822 wg.Add(1)
823 go func(b *Broker, conf *Config) {
824 defer wg.Done()
825 _ = b.Open(conf) // Ensure that broker is opened
826
827 response, err := b.ListGroups(&ListGroupsRequest{})
828 if err != nil {
khenaidoo106c61a2021-08-11 18:05:46 -0400829 errChan <- err
William Kurkianea869482019-04-09 15:16:11 -0400830 return
831 }
832
833 groups := make(map[string]string)
834 for group, typ := range response.Groups {
835 groups[group] = typ
836 }
837
838 groupMaps <- groups
William Kurkianea869482019-04-09 15:16:11 -0400839 }(b, ca.conf)
840 }
841
842 wg.Wait()
843 close(groupMaps)
khenaidoo106c61a2021-08-11 18:05:46 -0400844 close(errChan)
William Kurkianea869482019-04-09 15:16:11 -0400845
846 for groupMap := range groupMaps {
847 for group, protocolType := range groupMap {
848 allGroups[group] = protocolType
849 }
850 }
851
852 // Intentionally return only the first error for simplicity
khenaidoo106c61a2021-08-11 18:05:46 -0400853 err = <-errChan
William Kurkianea869482019-04-09 15:16:11 -0400854 return
855}
856
857func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
858 coordinator, err := ca.client.Coordinator(group)
859 if err != nil {
860 return nil, err
861 }
862
863 request := &OffsetFetchRequest{
864 ConsumerGroup: group,
865 partitions: topicPartitions,
866 }
867
Abhilash S.L3b494632019-07-16 15:51:09 +0530868 if ca.conf.Version.IsAtLeast(V0_10_2_0) {
869 request.Version = 2
870 } else if ca.conf.Version.IsAtLeast(V0_8_2_2) {
William Kurkianea869482019-04-09 15:16:11 -0400871 request.Version = 1
872 }
873
874 return coordinator.FetchOffset(request)
875}
Abhilash S.L3b494632019-07-16 15:51:09 +0530876
877func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
878 coordinator, err := ca.client.Coordinator(group)
879 if err != nil {
880 return err
881 }
882
883 request := &DeleteGroupsRequest{
884 Groups: []string{group},
885 }
886
887 resp, err := coordinator.DeleteGroups(request)
888 if err != nil {
889 return err
890 }
891
892 groupErr, ok := resp.GroupErrorCodes[group]
893 if !ok {
894 return ErrIncompleteResponse
895 }
896
897 if groupErr != ErrNoError {
898 return groupErr
899 }
900
901 return nil
902}
khenaidoo106c61a2021-08-11 18:05:46 -0400903
904func (ca *clusterAdmin) DescribeLogDirs(brokerIds []int32) (allLogDirs map[int32][]DescribeLogDirsResponseDirMetadata, err error) {
905 allLogDirs = make(map[int32][]DescribeLogDirsResponseDirMetadata)
906
907 // Query brokers in parallel, since we may have to query multiple brokers
908 logDirsMaps := make(chan map[int32][]DescribeLogDirsResponseDirMetadata, len(brokerIds))
909 errChan := make(chan error, len(brokerIds))
910 wg := sync.WaitGroup{}
911
912 for _, b := range brokerIds {
913 wg.Add(1)
914 broker, err := ca.findBroker(b)
915 if err != nil {
916 Logger.Printf("Unable to find broker with ID = %v\n", b)
917 continue
918 }
919 go func(b *Broker, conf *Config) {
920 defer wg.Done()
921 _ = b.Open(conf) // Ensure that broker is opened
922
923 response, err := b.DescribeLogDirs(&DescribeLogDirsRequest{})
924 if err != nil {
925 errChan <- err
926 return
927 }
928 logDirs := make(map[int32][]DescribeLogDirsResponseDirMetadata)
929 logDirs[b.ID()] = response.LogDirs
930 logDirsMaps <- logDirs
931 }(broker, ca.conf)
932 }
933
934 wg.Wait()
935 close(logDirsMaps)
936 close(errChan)
937
938 for logDirsMap := range logDirsMaps {
939 for id, logDirs := range logDirsMap {
940 allLogDirs[id] = logDirs
941 }
942 }
943
944 // Intentionally return only the first error for simplicity
945 err = <-errChan
946 return
947}
948
949func (ca *clusterAdmin) DescribeUserScramCredentials(users []string) ([]*DescribeUserScramCredentialsResult, error) {
950 req := &DescribeUserScramCredentialsRequest{}
951 for _, u := range users {
952 req.DescribeUsers = append(req.DescribeUsers, DescribeUserScramCredentialsRequestUser{
953 Name: u,
954 })
955 }
956
957 b, err := ca.Controller()
958 if err != nil {
959 return nil, err
960 }
961
962 rsp, err := b.DescribeUserScramCredentials(req)
963 if err != nil {
964 return nil, err
965 }
966
967 return rsp.Results, nil
968}
969
970func (ca *clusterAdmin) UpsertUserScramCredentials(upsert []AlterUserScramCredentialsUpsert) ([]*AlterUserScramCredentialsResult, error) {
971 res, err := ca.AlterUserScramCredentials(upsert, nil)
972 if err != nil {
973 return nil, err
974 }
975
976 return res, nil
977}
978
979func (ca *clusterAdmin) DeleteUserScramCredentials(delete []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
980 res, err := ca.AlterUserScramCredentials(nil, delete)
981 if err != nil {
982 return nil, err
983 }
984
985 return res, nil
986}
987
988func (ca *clusterAdmin) AlterUserScramCredentials(u []AlterUserScramCredentialsUpsert, d []AlterUserScramCredentialsDelete) ([]*AlterUserScramCredentialsResult, error) {
989 req := &AlterUserScramCredentialsRequest{
990 Deletions: d,
991 Upsertions: u,
992 }
993
994 b, err := ca.Controller()
995 if err != nil {
996 return nil, err
997 }
998
999 rsp, err := b.AlterUserScramCredentials(req)
1000 if err != nil {
1001 return nil, err
1002 }
1003
1004 return rsp.Results, nil
1005}