blob: 1db6a0ee5c29ccdd95a1de824fad41344060900d [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
3import (
4 "errors"
5 "math/rand"
6 "sync"
7)
8
9// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
10// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
11// Methods with stricter requirements will specify the minimum broker version required.
12// You MUST call Close() on a client to avoid leaks
13type ClusterAdmin interface {
14 // Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
15 // It may take several seconds after CreateTopic returns success for all the brokers
16 // to become aware that the topic has been created. During this time, listTopics
17 // may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
18 CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
19
20 // List the topics available in the cluster with the default options.
21 ListTopics() (map[string]TopicDetail, error)
22
Abhilash S.L3b494632019-07-16 15:51:09 +053023 // Describe some topics in the cluster.
William Kurkianea869482019-04-09 15:16:11 -040024 DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
25
26 // Delete a topic. It may take several seconds after the DeleteTopic to returns success
27 // and for all the brokers to become aware that the topics are gone.
28 // During this time, listTopics may continue to return information about the deleted topic.
29 // If delete.topic.enable is false on the brokers, deleteTopic will mark
30 // the topic for deletion, but not actually delete them.
31 // This operation is supported by brokers with version 0.10.1.0 or higher.
32 DeleteTopic(topic string) error
33
34 // Increase the number of partitions of the topics according to the corresponding values.
35 // If partitions are increased for a topic that has a key, the partition logic or ordering of
36 // the messages will be affected. It may take several seconds after this method returns
37 // success for all the brokers to become aware that the partitions have been created.
38 // During this time, ClusterAdmin#describeTopics may not return information about the
39 // new partitions. This operation is supported by brokers with version 1.0.0 or higher.
40 CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
41
42 // Delete records whose offset is smaller than the given offset of the corresponding partition.
43 // This operation is supported by brokers with version 0.11.0.0 or higher.
44 DeleteRecords(topic string, partitionOffsets map[int32]int64) error
45
46 // Get the configuration for the specified resources.
47 // The returned configuration includes default values and the Default is true
48 // can be used to distinguish them from user supplied values.
49 // Config entries where ReadOnly is true cannot be updated.
50 // The value of config entries where Sensitive is true is always nil so
51 // sensitive information is not disclosed.
52 // This operation is supported by brokers with version 0.11.0.0 or higher.
53 DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
54
55 // Update the configuration for the specified resources with the default options.
56 // This operation is supported by brokers with version 0.11.0.0 or higher.
57 // The resources with their configs (topic is the only resource type with configs
58 // that can be updated currently Updates are not transactional so they may succeed
59 // for some resources while fail for others. The configs for a particular resource are updated automatically.
60 AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
61
62 // Creates access control lists (ACLs) which are bound to specific resources.
63 // This operation is not transactional so it may succeed for some ACLs while fail for others.
64 // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
65 // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
66 CreateACL(resource Resource, acl Acl) error
67
68 // Lists access control lists (ACLs) according to the supplied filter.
69 // it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
70 // This operation is supported by brokers with version 0.11.0.0 or higher.
71 ListAcls(filter AclFilter) ([]ResourceAcls, error)
72
73 // Deletes access control lists (ACLs) according to the supplied filters.
74 // This operation is not transactional so it may succeed for some ACLs while fail for others.
75 // This operation is supported by brokers with version 0.11.0.0 or higher.
76 DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
77
78 // List the consumer groups available in the cluster.
79 ListConsumerGroups() (map[string]string, error)
80
Abhilash S.L3b494632019-07-16 15:51:09 +053081 // Describe the given consumer groups.
William Kurkianea869482019-04-09 15:16:11 -040082 DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
83
84 // List the consumer group offsets available in the cluster.
85 ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
86
Abhilash S.L3b494632019-07-16 15:51:09 +053087 // Delete a consumer group.
88 DeleteConsumerGroup(group string) error
89
William Kurkianea869482019-04-09 15:16:11 -040090 // Get information about the nodes in the cluster
91 DescribeCluster() (brokers []*Broker, controllerID int32, err error)
92
93 // Close shuts down the admin and closes underlying client.
94 Close() error
95}
96
97type clusterAdmin struct {
98 client Client
99 conf *Config
100}
101
102// NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
103func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
104 client, err := NewClient(addrs, conf)
105 if err != nil {
106 return nil, err
107 }
108
109 //make sure we can retrieve the controller
110 _, err = client.Controller()
111 if err != nil {
112 return nil, err
113 }
114
115 ca := &clusterAdmin{
116 client: client,
117 conf: client.Config(),
118 }
119 return ca, nil
120}
121
122func (ca *clusterAdmin) Close() error {
123 return ca.client.Close()
124}
125
126func (ca *clusterAdmin) Controller() (*Broker, error) {
127 return ca.client.Controller()
128}
129
130func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
131
132 if topic == "" {
133 return ErrInvalidTopic
134 }
135
136 if detail == nil {
Abhilash S.L3b494632019-07-16 15:51:09 +0530137 return errors.New("you must specify topic details")
William Kurkianea869482019-04-09 15:16:11 -0400138 }
139
140 topicDetails := make(map[string]*TopicDetail)
141 topicDetails[topic] = detail
142
143 request := &CreateTopicsRequest{
144 TopicDetails: topicDetails,
145 ValidateOnly: validateOnly,
146 Timeout: ca.conf.Admin.Timeout,
147 }
148
149 if ca.conf.Version.IsAtLeast(V0_11_0_0) {
150 request.Version = 1
151 }
152 if ca.conf.Version.IsAtLeast(V1_0_0_0) {
153 request.Version = 2
154 }
155
156 b, err := ca.Controller()
157 if err != nil {
158 return err
159 }
160
161 rsp, err := b.CreateTopics(request)
162 if err != nil {
163 return err
164 }
165
166 topicErr, ok := rsp.TopicErrors[topic]
167 if !ok {
168 return ErrIncompleteResponse
169 }
170
171 if topicErr.Err != ErrNoError {
Abhilash S.L3b494632019-07-16 15:51:09 +0530172 return topicErr
William Kurkianea869482019-04-09 15:16:11 -0400173 }
174
175 return nil
176}
177
178func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
179 controller, err := ca.Controller()
180 if err != nil {
181 return nil, err
182 }
183
184 request := &MetadataRequest{
185 Topics: topics,
186 AllowAutoTopicCreation: false,
187 }
188
Abhilash S.L3b494632019-07-16 15:51:09 +0530189 if ca.conf.Version.IsAtLeast(V1_0_0_0) {
190 request.Version = 5
191 } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
William Kurkianea869482019-04-09 15:16:11 -0400192 request.Version = 4
193 }
194
195 response, err := controller.GetMetadata(request)
196 if err != nil {
197 return nil, err
198 }
199 return response.Topics, nil
200}
201
202func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
203 controller, err := ca.Controller()
204 if err != nil {
205 return nil, int32(0), err
206 }
207
208 request := &MetadataRequest{
209 Topics: []string{},
210 }
211
212 response, err := controller.GetMetadata(request)
213 if err != nil {
214 return nil, int32(0), err
215 }
216
217 return response.Brokers, response.ControllerID, nil
218}
219
220func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
221 brokers := ca.client.Brokers()
222 if len(brokers) > 0 {
223 index := rand.Intn(len(brokers))
224 return brokers[index], nil
225 }
226 return nil, errors.New("no available broker")
227}
228
229func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
230 // In order to build TopicDetails we need to first get the list of all
231 // topics using a MetadataRequest and then get their configs using a
232 // DescribeConfigsRequest request. To avoid sending many requests to the
233 // broker, we use a single DescribeConfigsRequest.
234
235 // Send the all-topic MetadataRequest
236 b, err := ca.findAnyBroker()
237 if err != nil {
238 return nil, err
239 }
240 _ = b.Open(ca.client.Config())
241
242 metadataReq := &MetadataRequest{}
243 metadataResp, err := b.GetMetadata(metadataReq)
244 if err != nil {
245 return nil, err
246 }
247
248 topicsDetailsMap := make(map[string]TopicDetail)
249
250 var describeConfigsResources []*ConfigResource
251
252 for _, topic := range metadataResp.Topics {
253 topicDetails := TopicDetail{
254 NumPartitions: int32(len(topic.Partitions)),
255 }
256 if len(topic.Partitions) > 0 {
257 topicDetails.ReplicaAssignment = map[int32][]int32{}
258 for _, partition := range topic.Partitions {
259 topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
260 }
261 topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
262 }
263 topicsDetailsMap[topic.Name] = topicDetails
264
265 // we populate the resources we want to describe from the MetadataResponse
266 topicResource := ConfigResource{
267 Type: TopicResource,
268 Name: topic.Name,
269 }
270 describeConfigsResources = append(describeConfigsResources, &topicResource)
271 }
272
273 // Send the DescribeConfigsRequest
274 describeConfigsReq := &DescribeConfigsRequest{
275 Resources: describeConfigsResources,
276 }
277 describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
278 if err != nil {
279 return nil, err
280 }
281
282 for _, resource := range describeConfigsResp.Resources {
283 topicDetails := topicsDetailsMap[resource.Name]
284 topicDetails.ConfigEntries = make(map[string]*string)
285
286 for _, entry := range resource.Configs {
287 // only include non-default non-sensitive config
288 // (don't actually think topic config will ever be sensitive)
289 if entry.Default || entry.Sensitive {
290 continue
291 }
292 topicDetails.ConfigEntries[entry.Name] = &entry.Value
293 }
294
295 topicsDetailsMap[resource.Name] = topicDetails
296 }
297
298 return topicsDetailsMap, nil
299}
300
301func (ca *clusterAdmin) DeleteTopic(topic string) error {
302
303 if topic == "" {
304 return ErrInvalidTopic
305 }
306
307 request := &DeleteTopicsRequest{
308 Topics: []string{topic},
309 Timeout: ca.conf.Admin.Timeout,
310 }
311
312 if ca.conf.Version.IsAtLeast(V0_11_0_0) {
313 request.Version = 1
314 }
315
316 b, err := ca.Controller()
317 if err != nil {
318 return err
319 }
320
321 rsp, err := b.DeleteTopics(request)
322 if err != nil {
323 return err
324 }
325
326 topicErr, ok := rsp.TopicErrorCodes[topic]
327 if !ok {
328 return ErrIncompleteResponse
329 }
330
331 if topicErr != ErrNoError {
332 return topicErr
333 }
334 return nil
335}
336
337func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
338 if topic == "" {
339 return ErrInvalidTopic
340 }
341
342 topicPartitions := make(map[string]*TopicPartition)
343 topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}
344
345 request := &CreatePartitionsRequest{
346 TopicPartitions: topicPartitions,
347 Timeout: ca.conf.Admin.Timeout,
348 }
349
350 b, err := ca.Controller()
351 if err != nil {
352 return err
353 }
354
355 rsp, err := b.CreatePartitions(request)
356 if err != nil {
357 return err
358 }
359
360 topicErr, ok := rsp.TopicPartitionErrors[topic]
361 if !ok {
362 return ErrIncompleteResponse
363 }
364
365 if topicErr.Err != ErrNoError {
Abhilash S.L3b494632019-07-16 15:51:09 +0530366 return topicErr
William Kurkianea869482019-04-09 15:16:11 -0400367 }
368
369 return nil
370}
371
372func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
373
374 if topic == "" {
375 return ErrInvalidTopic
376 }
Devmalya Pauleb2e9552019-08-27 19:42:00 -0400377 partitionPerBroker := make(map[*Broker][]int32)
378 for partition := range partitionOffsets {
379 broker, err := ca.client.Leader(topic, partition)
380 if err != nil {
381 return err
382 }
383 if _, ok := partitionPerBroker[broker]; ok {
384 partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
385 } else {
386 partitionPerBroker[broker] = []int32{partition}
387 }
William Kurkianea869482019-04-09 15:16:11 -0400388 }
Devmalya Pauleb2e9552019-08-27 19:42:00 -0400389 errs := make([]error, 0)
390 for broker, partitions := range partitionPerBroker {
391 topics := make(map[string]*DeleteRecordsRequestTopic)
392 recordsToDelete := make(map[int32]int64)
393 for _, p := range partitions {
394 recordsToDelete[p] = partitionOffsets[p]
395 }
396 topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: recordsToDelete}
397 request := &DeleteRecordsRequest{
398 Topics: topics,
399 Timeout: ca.conf.Admin.Timeout,
400 }
William Kurkianea869482019-04-09 15:16:11 -0400401
Devmalya Pauleb2e9552019-08-27 19:42:00 -0400402 rsp, err := broker.DeleteRecords(request)
403 if err != nil {
404 errs = append(errs, err)
405 } else {
406 deleteRecordsResponseTopic, ok := rsp.Topics[topic]
407 if !ok {
408 errs = append(errs, ErrIncompleteResponse)
409 } else {
410 for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
411 if deleteRecordsResponsePartition.Err != ErrNoError {
412 errs = append(errs, errors.New(deleteRecordsResponsePartition.Err.Error()))
413 }
414 }
415 }
416 }
William Kurkianea869482019-04-09 15:16:11 -0400417 }
Devmalya Pauleb2e9552019-08-27 19:42:00 -0400418 if len(errs) > 0 {
419 return ErrDeleteRecords{MultiError{&errs}}
William Kurkianea869482019-04-09 15:16:11 -0400420 }
William Kurkianea869482019-04-09 15:16:11 -0400421 //todo since we are dealing with couple of partitions it would be good if we return slice of errors
422 //for each partition instead of one error
423 return nil
424}
425
426func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
427
428 var entries []ConfigEntry
429 var resources []*ConfigResource
430 resources = append(resources, &resource)
431
432 request := &DescribeConfigsRequest{
433 Resources: resources,
434 }
435
436 b, err := ca.Controller()
437 if err != nil {
438 return nil, err
439 }
440
441 rsp, err := b.DescribeConfigs(request)
442 if err != nil {
443 return nil, err
444 }
445
446 for _, rspResource := range rsp.Resources {
447 if rspResource.Name == resource.Name {
448 if rspResource.ErrorMsg != "" {
449 return nil, errors.New(rspResource.ErrorMsg)
450 }
451 for _, cfgEntry := range rspResource.Configs {
452 entries = append(entries, *cfgEntry)
453 }
454 }
455 }
456 return entries, nil
457}
458
459func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
460
461 var resources []*AlterConfigsResource
462 resources = append(resources, &AlterConfigsResource{
463 Type: resourceType,
464 Name: name,
465 ConfigEntries: entries,
466 })
467
468 request := &AlterConfigsRequest{
469 Resources: resources,
470 ValidateOnly: validateOnly,
471 }
472
473 b, err := ca.Controller()
474 if err != nil {
475 return err
476 }
477
478 rsp, err := b.AlterConfigs(request)
479 if err != nil {
480 return err
481 }
482
483 for _, rspResource := range rsp.Resources {
484 if rspResource.Name == name {
485 if rspResource.ErrorMsg != "" {
486 return errors.New(rspResource.ErrorMsg)
487 }
488 }
489 }
490 return nil
491}
492
493func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
494 var acls []*AclCreation
495 acls = append(acls, &AclCreation{resource, acl})
496 request := &CreateAclsRequest{AclCreations: acls}
497
498 b, err := ca.Controller()
499 if err != nil {
500 return err
501 }
502
503 _, err = b.CreateAcls(request)
504 return err
505}
506
507func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
508
509 request := &DescribeAclsRequest{AclFilter: filter}
510
511 b, err := ca.Controller()
512 if err != nil {
513 return nil, err
514 }
515
516 rsp, err := b.DescribeAcls(request)
517 if err != nil {
518 return nil, err
519 }
520
521 var lAcls []ResourceAcls
522 for _, rAcl := range rsp.ResourceAcls {
523 lAcls = append(lAcls, *rAcl)
524 }
525 return lAcls, nil
526}
527
528func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
529 var filters []*AclFilter
530 filters = append(filters, &filter)
531 request := &DeleteAclsRequest{Filters: filters}
532
533 b, err := ca.Controller()
534 if err != nil {
535 return nil, err
536 }
537
538 rsp, err := b.DeleteAcls(request)
539 if err != nil {
540 return nil, err
541 }
542
543 var mAcls []MatchingAcl
544 for _, fr := range rsp.FilterResponses {
545 for _, mACL := range fr.MatchingAcls {
546 mAcls = append(mAcls, *mACL)
547 }
548
549 }
550 return mAcls, nil
551}
552
553func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
554 groupsPerBroker := make(map[*Broker][]string)
555
556 for _, group := range groups {
557 controller, err := ca.client.Coordinator(group)
558 if err != nil {
559 return nil, err
560 }
561 groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
562
563 }
564
565 for broker, brokerGroups := range groupsPerBroker {
566 response, err := broker.DescribeGroups(&DescribeGroupsRequest{
567 Groups: brokerGroups,
568 })
569 if err != nil {
570 return nil, err
571 }
572
573 result = append(result, response.Groups...)
574 }
575 return result, nil
576}
577
578func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
579 allGroups = make(map[string]string)
580
581 // Query brokers in parallel, since we have to query *all* brokers
582 brokers := ca.client.Brokers()
583 groupMaps := make(chan map[string]string, len(brokers))
584 errors := make(chan error, len(brokers))
585 wg := sync.WaitGroup{}
586
587 for _, b := range brokers {
588 wg.Add(1)
589 go func(b *Broker, conf *Config) {
590 defer wg.Done()
591 _ = b.Open(conf) // Ensure that broker is opened
592
593 response, err := b.ListGroups(&ListGroupsRequest{})
594 if err != nil {
595 errors <- err
596 return
597 }
598
599 groups := make(map[string]string)
600 for group, typ := range response.Groups {
601 groups[group] = typ
602 }
603
604 groupMaps <- groups
605
606 }(b, ca.conf)
607 }
608
609 wg.Wait()
610 close(groupMaps)
611 close(errors)
612
613 for groupMap := range groupMaps {
614 for group, protocolType := range groupMap {
615 allGroups[group] = protocolType
616 }
617 }
618
619 // Intentionally return only the first error for simplicity
620 err = <-errors
621 return
622}
623
624func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
625 coordinator, err := ca.client.Coordinator(group)
626 if err != nil {
627 return nil, err
628 }
629
630 request := &OffsetFetchRequest{
631 ConsumerGroup: group,
632 partitions: topicPartitions,
633 }
634
Abhilash S.L3b494632019-07-16 15:51:09 +0530635 if ca.conf.Version.IsAtLeast(V0_10_2_0) {
636 request.Version = 2
637 } else if ca.conf.Version.IsAtLeast(V0_8_2_2) {
William Kurkianea869482019-04-09 15:16:11 -0400638 request.Version = 1
639 }
640
641 return coordinator.FetchOffset(request)
642}
Abhilash S.L3b494632019-07-16 15:51:09 +0530643
644func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
645 coordinator, err := ca.client.Coordinator(group)
646 if err != nil {
647 return err
648 }
649
650 request := &DeleteGroupsRequest{
651 Groups: []string{group},
652 }
653
654 resp, err := coordinator.DeleteGroups(request)
655 if err != nil {
656 return err
657 }
658
659 groupErr, ok := resp.GroupErrorCodes[group]
660 if !ok {
661 return ErrIncompleteResponse
662 }
663
664 if groupErr != ErrNoError {
665 return groupErr
666 }
667
668 return nil
669}