blob: 6c9b1e9e7311b74916173d55a198ed0428e51bbe [file] [log] [blame]
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001package sarama
2
3import (
4 "errors"
5 "math/rand"
6 "sync"
7)
8
9// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
10// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
11// Methods with stricter requirements will specify the minimum broker version required.
12// You MUST call Close() on a client to avoid leaks
13type ClusterAdmin interface {
14 // Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
15 // It may take several seconds after CreateTopic returns success for all the brokers
16 // to become aware that the topic has been created. During this time, listTopics
17 // may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
18 CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
19
20 // List the topics available in the cluster with the default options.
21 ListTopics() (map[string]TopicDetail, error)
22
23 // Describe some topics in the cluster.
24 DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
25
26 // Delete a topic. It may take several seconds after the DeleteTopic to returns success
27 // and for all the brokers to become aware that the topics are gone.
28 // During this time, listTopics may continue to return information about the deleted topic.
29 // If delete.topic.enable is false on the brokers, deleteTopic will mark
30 // the topic for deletion, but not actually delete them.
31 // This operation is supported by brokers with version 0.10.1.0 or higher.
32 DeleteTopic(topic string) error
33
34 // Increase the number of partitions of the topics according to the corresponding values.
35 // If partitions are increased for a topic that has a key, the partition logic or ordering of
36 // the messages will be affected. It may take several seconds after this method returns
37 // success for all the brokers to become aware that the partitions have been created.
38 // During this time, ClusterAdmin#describeTopics may not return information about the
39 // new partitions. This operation is supported by brokers with version 1.0.0 or higher.
40 CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
41
42 // Delete records whose offset is smaller than the given offset of the corresponding partition.
43 // This operation is supported by brokers with version 0.11.0.0 or higher.
44 DeleteRecords(topic string, partitionOffsets map[int32]int64) error
45
46 // Get the configuration for the specified resources.
47 // The returned configuration includes default values and the Default is true
48 // can be used to distinguish them from user supplied values.
49 // Config entries where ReadOnly is true cannot be updated.
50 // The value of config entries where Sensitive is true is always nil so
51 // sensitive information is not disclosed.
52 // This operation is supported by brokers with version 0.11.0.0 or higher.
53 DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
54
55 // Update the configuration for the specified resources with the default options.
56 // This operation is supported by brokers with version 0.11.0.0 or higher.
57 // The resources with their configs (topic is the only resource type with configs
58 // that can be updated currently Updates are not transactional so they may succeed
59 // for some resources while fail for others. The configs for a particular resource are updated automatically.
60 AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
61
62 // Creates access control lists (ACLs) which are bound to specific resources.
63 // This operation is not transactional so it may succeed for some ACLs while fail for others.
64 // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
65 // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
66 CreateACL(resource Resource, acl Acl) error
67
68 // Lists access control lists (ACLs) according to the supplied filter.
69 // it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
70 // This operation is supported by brokers with version 0.11.0.0 or higher.
71 ListAcls(filter AclFilter) ([]ResourceAcls, error)
72
73 // Deletes access control lists (ACLs) according to the supplied filters.
74 // This operation is not transactional so it may succeed for some ACLs while fail for others.
75 // This operation is supported by brokers with version 0.11.0.0 or higher.
76 DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
77
78 // List the consumer groups available in the cluster.
79 ListConsumerGroups() (map[string]string, error)
80
81 // Describe the given consumer groups.
82 DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
83
84 // List the consumer group offsets available in the cluster.
85 ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
86
87 // Delete a consumer group.
88 DeleteConsumerGroup(group string) error
89
90 // Get information about the nodes in the cluster
91 DescribeCluster() (brokers []*Broker, controllerID int32, err error)
92
93 // Close shuts down the admin and closes underlying client.
94 Close() error
95}
96
97type clusterAdmin struct {
98 client Client
99 conf *Config
100}
101
102// NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
103func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
104 client, err := NewClient(addrs, conf)
105 if err != nil {
106 return nil, err
107 }
108 return NewClusterAdminFromClient(client)
109}
110
111// NewClusterAdminFromClient creates a new ClusterAdmin using the given client.
112// Note that underlying client will also be closed on admin's Close() call.
113func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) {
114 //make sure we can retrieve the controller
115 _, err := client.Controller()
116 if err != nil {
117 return nil, err
118 }
119
120 ca := &clusterAdmin{
121 client: client,
122 conf: client.Config(),
123 }
124 return ca, nil
125}
126
127func (ca *clusterAdmin) Close() error {
128 return ca.client.Close()
129}
130
131func (ca *clusterAdmin) Controller() (*Broker, error) {
132 return ca.client.Controller()
133}
134
135func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
136
137 if topic == "" {
138 return ErrInvalidTopic
139 }
140
141 if detail == nil {
142 return errors.New("you must specify topic details")
143 }
144
145 topicDetails := make(map[string]*TopicDetail)
146 topicDetails[topic] = detail
147
148 request := &CreateTopicsRequest{
149 TopicDetails: topicDetails,
150 ValidateOnly: validateOnly,
151 Timeout: ca.conf.Admin.Timeout,
152 }
153
154 if ca.conf.Version.IsAtLeast(V0_11_0_0) {
155 request.Version = 1
156 }
157 if ca.conf.Version.IsAtLeast(V1_0_0_0) {
158 request.Version = 2
159 }
160
161 b, err := ca.Controller()
162 if err != nil {
163 return err
164 }
165
166 rsp, err := b.CreateTopics(request)
167 if err != nil {
168 return err
169 }
170
171 topicErr, ok := rsp.TopicErrors[topic]
172 if !ok {
173 return ErrIncompleteResponse
174 }
175
176 if topicErr.Err != ErrNoError {
177 return topicErr
178 }
179
180 return nil
181}
182
183func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
184 controller, err := ca.Controller()
185 if err != nil {
186 return nil, err
187 }
188
189 request := &MetadataRequest{
190 Topics: topics,
191 AllowAutoTopicCreation: false,
192 }
193
194 if ca.conf.Version.IsAtLeast(V1_0_0_0) {
195 request.Version = 5
196 } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
197 request.Version = 4
198 }
199
200 response, err := controller.GetMetadata(request)
201 if err != nil {
202 return nil, err
203 }
204 return response.Topics, nil
205}
206
207func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
208 controller, err := ca.Controller()
209 if err != nil {
210 return nil, int32(0), err
211 }
212
213 request := &MetadataRequest{
214 Topics: []string{},
215 }
216
217 if ca.conf.Version.IsAtLeast(V0_11_0_0) {
218 request.Version = 1
219 }
220
221 response, err := controller.GetMetadata(request)
222 if err != nil {
223 return nil, int32(0), err
224 }
225
226 return response.Brokers, response.ControllerID, nil
227}
228
229func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
230 brokers := ca.client.Brokers()
231 if len(brokers) > 0 {
232 index := rand.Intn(len(brokers))
233 return brokers[index], nil
234 }
235 return nil, errors.New("no available broker")
236}
237
238func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
239 // In order to build TopicDetails we need to first get the list of all
240 // topics using a MetadataRequest and then get their configs using a
241 // DescribeConfigsRequest request. To avoid sending many requests to the
242 // broker, we use a single DescribeConfigsRequest.
243
244 // Send the all-topic MetadataRequest
245 b, err := ca.findAnyBroker()
246 if err != nil {
247 return nil, err
248 }
249 _ = b.Open(ca.client.Config())
250
251 metadataReq := &MetadataRequest{}
252 metadataResp, err := b.GetMetadata(metadataReq)
253 if err != nil {
254 return nil, err
255 }
256
257 topicsDetailsMap := make(map[string]TopicDetail)
258
259 var describeConfigsResources []*ConfigResource
260
261 for _, topic := range metadataResp.Topics {
262 topicDetails := TopicDetail{
263 NumPartitions: int32(len(topic.Partitions)),
264 }
265 if len(topic.Partitions) > 0 {
266 topicDetails.ReplicaAssignment = map[int32][]int32{}
267 for _, partition := range topic.Partitions {
268 topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
269 }
270 topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
271 }
272 topicsDetailsMap[topic.Name] = topicDetails
273
274 // we populate the resources we want to describe from the MetadataResponse
275 topicResource := ConfigResource{
276 Type: TopicResource,
277 Name: topic.Name,
278 }
279 describeConfigsResources = append(describeConfigsResources, &topicResource)
280 }
281
282 // Send the DescribeConfigsRequest
283 describeConfigsReq := &DescribeConfigsRequest{
284 Resources: describeConfigsResources,
285 }
286 describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
287 if err != nil {
288 return nil, err
289 }
290
291 for _, resource := range describeConfigsResp.Resources {
292 topicDetails := topicsDetailsMap[resource.Name]
293 topicDetails.ConfigEntries = make(map[string]*string)
294
295 for _, entry := range resource.Configs {
296 // only include non-default non-sensitive config
297 // (don't actually think topic config will ever be sensitive)
298 if entry.Default || entry.Sensitive {
299 continue
300 }
301 topicDetails.ConfigEntries[entry.Name] = &entry.Value
302 }
303
304 topicsDetailsMap[resource.Name] = topicDetails
305 }
306
307 return topicsDetailsMap, nil
308}
309
310func (ca *clusterAdmin) DeleteTopic(topic string) error {
311
312 if topic == "" {
313 return ErrInvalidTopic
314 }
315
316 request := &DeleteTopicsRequest{
317 Topics: []string{topic},
318 Timeout: ca.conf.Admin.Timeout,
319 }
320
321 if ca.conf.Version.IsAtLeast(V0_11_0_0) {
322 request.Version = 1
323 }
324
325 b, err := ca.Controller()
326 if err != nil {
327 return err
328 }
329
330 rsp, err := b.DeleteTopics(request)
331 if err != nil {
332 return err
333 }
334
335 topicErr, ok := rsp.TopicErrorCodes[topic]
336 if !ok {
337 return ErrIncompleteResponse
338 }
339
340 if topicErr != ErrNoError {
341 return topicErr
342 }
343 return nil
344}
345
346func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
347 if topic == "" {
348 return ErrInvalidTopic
349 }
350
351 topicPartitions := make(map[string]*TopicPartition)
352 topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}
353
354 request := &CreatePartitionsRequest{
355 TopicPartitions: topicPartitions,
356 Timeout: ca.conf.Admin.Timeout,
357 }
358
359 b, err := ca.Controller()
360 if err != nil {
361 return err
362 }
363
364 rsp, err := b.CreatePartitions(request)
365 if err != nil {
366 return err
367 }
368
369 topicErr, ok := rsp.TopicPartitionErrors[topic]
370 if !ok {
371 return ErrIncompleteResponse
372 }
373
374 if topicErr.Err != ErrNoError {
375 return topicErr
376 }
377
378 return nil
379}
380
381func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
382
383 if topic == "" {
384 return ErrInvalidTopic
385 }
386 partitionPerBroker := make(map[*Broker][]int32)
387 for partition := range partitionOffsets {
388 broker, err := ca.client.Leader(topic, partition)
389 if err != nil {
390 return err
391 }
392 if _, ok := partitionPerBroker[broker]; ok {
393 partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
394 } else {
395 partitionPerBroker[broker] = []int32{partition}
396 }
397 }
398 errs := make([]error, 0)
399 for broker, partitions := range partitionPerBroker {
400 topics := make(map[string]*DeleteRecordsRequestTopic)
401 recordsToDelete := make(map[int32]int64)
402 for _, p := range partitions {
403 recordsToDelete[p] = partitionOffsets[p]
404 }
405 topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: recordsToDelete}
406 request := &DeleteRecordsRequest{
407 Topics: topics,
408 Timeout: ca.conf.Admin.Timeout,
409 }
410
411 rsp, err := broker.DeleteRecords(request)
412 if err != nil {
413 errs = append(errs, err)
414 } else {
415 deleteRecordsResponseTopic, ok := rsp.Topics[topic]
416 if !ok {
417 errs = append(errs, ErrIncompleteResponse)
418 } else {
419 for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
420 if deleteRecordsResponsePartition.Err != ErrNoError {
421 errs = append(errs, errors.New(deleteRecordsResponsePartition.Err.Error()))
422 }
423 }
424 }
425 }
426 }
427 if len(errs) > 0 {
428 return ErrDeleteRecords{MultiError{&errs}}
429 }
430 //todo since we are dealing with couple of partitions it would be good if we return slice of errors
431 //for each partition instead of one error
432 return nil
433}
434
435func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
436
437 var entries []ConfigEntry
438 var resources []*ConfigResource
439 resources = append(resources, &resource)
440
441 request := &DescribeConfigsRequest{
442 Resources: resources,
443 }
444
445 b, err := ca.Controller()
446 if err != nil {
447 return nil, err
448 }
449
450 rsp, err := b.DescribeConfigs(request)
451 if err != nil {
452 return nil, err
453 }
454
455 for _, rspResource := range rsp.Resources {
456 if rspResource.Name == resource.Name {
457 if rspResource.ErrorMsg != "" {
458 return nil, errors.New(rspResource.ErrorMsg)
459 }
460 for _, cfgEntry := range rspResource.Configs {
461 entries = append(entries, *cfgEntry)
462 }
463 }
464 }
465 return entries, nil
466}
467
468func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
469
470 var resources []*AlterConfigsResource
471 resources = append(resources, &AlterConfigsResource{
472 Type: resourceType,
473 Name: name,
474 ConfigEntries: entries,
475 })
476
477 request := &AlterConfigsRequest{
478 Resources: resources,
479 ValidateOnly: validateOnly,
480 }
481
482 b, err := ca.Controller()
483 if err != nil {
484 return err
485 }
486
487 rsp, err := b.AlterConfigs(request)
488 if err != nil {
489 return err
490 }
491
492 for _, rspResource := range rsp.Resources {
493 if rspResource.Name == name {
494 if rspResource.ErrorMsg != "" {
495 return errors.New(rspResource.ErrorMsg)
496 }
497 }
498 }
499 return nil
500}
501
502func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
503 var acls []*AclCreation
504 acls = append(acls, &AclCreation{resource, acl})
505 request := &CreateAclsRequest{AclCreations: acls}
506
507 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
508 request.Version = 1
509 }
510
511 b, err := ca.Controller()
512 if err != nil {
513 return err
514 }
515
516 _, err = b.CreateAcls(request)
517 return err
518}
519
520func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
521
522 request := &DescribeAclsRequest{AclFilter: filter}
523
524 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
525 request.Version = 1
526 }
527
528 b, err := ca.Controller()
529 if err != nil {
530 return nil, err
531 }
532
533 rsp, err := b.DescribeAcls(request)
534 if err != nil {
535 return nil, err
536 }
537
538 var lAcls []ResourceAcls
539 for _, rAcl := range rsp.ResourceAcls {
540 lAcls = append(lAcls, *rAcl)
541 }
542 return lAcls, nil
543}
544
545func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
546 var filters []*AclFilter
547 filters = append(filters, &filter)
548 request := &DeleteAclsRequest{Filters: filters}
549
550 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
551 request.Version = 1
552 }
553
554 b, err := ca.Controller()
555 if err != nil {
556 return nil, err
557 }
558
559 rsp, err := b.DeleteAcls(request)
560 if err != nil {
561 return nil, err
562 }
563
564 var mAcls []MatchingAcl
565 for _, fr := range rsp.FilterResponses {
566 for _, mACL := range fr.MatchingAcls {
567 mAcls = append(mAcls, *mACL)
568 }
569
570 }
571 return mAcls, nil
572}
573
574func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
575 groupsPerBroker := make(map[*Broker][]string)
576
577 for _, group := range groups {
578 controller, err := ca.client.Coordinator(group)
579 if err != nil {
580 return nil, err
581 }
582 groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
583
584 }
585
586 for broker, brokerGroups := range groupsPerBroker {
587 response, err := broker.DescribeGroups(&DescribeGroupsRequest{
588 Groups: brokerGroups,
589 })
590 if err != nil {
591 return nil, err
592 }
593
594 result = append(result, response.Groups...)
595 }
596 return result, nil
597}
598
599func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
600 allGroups = make(map[string]string)
601
602 // Query brokers in parallel, since we have to query *all* brokers
603 brokers := ca.client.Brokers()
604 groupMaps := make(chan map[string]string, len(brokers))
605 errors := make(chan error, len(brokers))
606 wg := sync.WaitGroup{}
607
608 for _, b := range brokers {
609 wg.Add(1)
610 go func(b *Broker, conf *Config) {
611 defer wg.Done()
612 _ = b.Open(conf) // Ensure that broker is opened
613
614 response, err := b.ListGroups(&ListGroupsRequest{})
615 if err != nil {
616 errors <- err
617 return
618 }
619
620 groups := make(map[string]string)
621 for group, typ := range response.Groups {
622 groups[group] = typ
623 }
624
625 groupMaps <- groups
626
627 }(b, ca.conf)
628 }
629
630 wg.Wait()
631 close(groupMaps)
632 close(errors)
633
634 for groupMap := range groupMaps {
635 for group, protocolType := range groupMap {
636 allGroups[group] = protocolType
637 }
638 }
639
640 // Intentionally return only the first error for simplicity
641 err = <-errors
642 return
643}
644
645func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
646 coordinator, err := ca.client.Coordinator(group)
647 if err != nil {
648 return nil, err
649 }
650
651 request := &OffsetFetchRequest{
652 ConsumerGroup: group,
653 partitions: topicPartitions,
654 }
655
656 if ca.conf.Version.IsAtLeast(V0_10_2_0) {
657 request.Version = 2
658 } else if ca.conf.Version.IsAtLeast(V0_8_2_2) {
659 request.Version = 1
660 }
661
662 return coordinator.FetchOffset(request)
663}
664
665func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
666 coordinator, err := ca.client.Coordinator(group)
667 if err != nil {
668 return err
669 }
670
671 request := &DeleteGroupsRequest{
672 Groups: []string{group},
673 }
674
675 resp, err := coordinator.DeleteGroups(request)
676 if err != nil {
677 return err
678 }
679
680 groupErr, ok := resp.GroupErrorCodes[group]
681 if !ok {
682 return ErrIncompleteResponse
683 }
684
685 if groupErr != ErrNoError {
686 return groupErr
687 }
688
689 return nil
690}