blob: a4d1bc5106e8649cf94529ae813b7a9c7973013e [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
3import (
4 "errors"
5 "math/rand"
6 "sync"
7)
8
9// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
10// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
11// Methods with stricter requirements will specify the minimum broker version required.
12// You MUST call Close() on a client to avoid leaks
13type ClusterAdmin interface {
14 // Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
15 // It may take several seconds after CreateTopic returns success for all the brokers
16 // to become aware that the topic has been created. During this time, listTopics
17 // may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
18 CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
19
20 // List the topics available in the cluster with the default options.
21 ListTopics() (map[string]TopicDetail, error)
22
Abhilash S.L3b494632019-07-16 15:51:09 +053023 // Describe some topics in the cluster.
William Kurkianea869482019-04-09 15:16:11 -040024 DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
25
26 // Delete a topic. It may take several seconds after the DeleteTopic to returns success
27 // and for all the brokers to become aware that the topics are gone.
28 // During this time, listTopics may continue to return information about the deleted topic.
29 // If delete.topic.enable is false on the brokers, deleteTopic will mark
30 // the topic for deletion, but not actually delete them.
31 // This operation is supported by brokers with version 0.10.1.0 or higher.
32 DeleteTopic(topic string) error
33
34 // Increase the number of partitions of the topics according to the corresponding values.
35 // If partitions are increased for a topic that has a key, the partition logic or ordering of
36 // the messages will be affected. It may take several seconds after this method returns
37 // success for all the brokers to become aware that the partitions have been created.
38 // During this time, ClusterAdmin#describeTopics may not return information about the
39 // new partitions. This operation is supported by brokers with version 1.0.0 or higher.
40 CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
41
42 // Delete records whose offset is smaller than the given offset of the corresponding partition.
43 // This operation is supported by brokers with version 0.11.0.0 or higher.
44 DeleteRecords(topic string, partitionOffsets map[int32]int64) error
45
46 // Get the configuration for the specified resources.
47 // The returned configuration includes default values and the Default is true
48 // can be used to distinguish them from user supplied values.
49 // Config entries where ReadOnly is true cannot be updated.
50 // The value of config entries where Sensitive is true is always nil so
51 // sensitive information is not disclosed.
52 // This operation is supported by brokers with version 0.11.0.0 or higher.
53 DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
54
55 // Update the configuration for the specified resources with the default options.
56 // This operation is supported by brokers with version 0.11.0.0 or higher.
57 // The resources with their configs (topic is the only resource type with configs
58 // that can be updated currently Updates are not transactional so they may succeed
59 // for some resources while fail for others. The configs for a particular resource are updated automatically.
60 AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
61
62 // Creates access control lists (ACLs) which are bound to specific resources.
63 // This operation is not transactional so it may succeed for some ACLs while fail for others.
64 // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
65 // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
66 CreateACL(resource Resource, acl Acl) error
67
68 // Lists access control lists (ACLs) according to the supplied filter.
69 // it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
70 // This operation is supported by brokers with version 0.11.0.0 or higher.
71 ListAcls(filter AclFilter) ([]ResourceAcls, error)
72
73 // Deletes access control lists (ACLs) according to the supplied filters.
74 // This operation is not transactional so it may succeed for some ACLs while fail for others.
75 // This operation is supported by brokers with version 0.11.0.0 or higher.
76 DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
77
78 // List the consumer groups available in the cluster.
79 ListConsumerGroups() (map[string]string, error)
80
Abhilash S.L3b494632019-07-16 15:51:09 +053081 // Describe the given consumer groups.
William Kurkianea869482019-04-09 15:16:11 -040082 DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
83
84 // List the consumer group offsets available in the cluster.
85 ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
86
Abhilash S.L3b494632019-07-16 15:51:09 +053087 // Delete a consumer group.
88 DeleteConsumerGroup(group string) error
89
William Kurkianea869482019-04-09 15:16:11 -040090 // Get information about the nodes in the cluster
91 DescribeCluster() (brokers []*Broker, controllerID int32, err error)
92
93 // Close shuts down the admin and closes underlying client.
94 Close() error
95}
96
97type clusterAdmin struct {
98 client Client
99 conf *Config
100}
101
102// NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
103func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
104 client, err := NewClient(addrs, conf)
105 if err != nil {
106 return nil, err
107 }
108
109 //make sure we can retrieve the controller
110 _, err = client.Controller()
111 if err != nil {
112 return nil, err
113 }
114
115 ca := &clusterAdmin{
116 client: client,
117 conf: client.Config(),
118 }
119 return ca, nil
120}
121
122func (ca *clusterAdmin) Close() error {
123 return ca.client.Close()
124}
125
126func (ca *clusterAdmin) Controller() (*Broker, error) {
127 return ca.client.Controller()
128}
129
130func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
131
132 if topic == "" {
133 return ErrInvalidTopic
134 }
135
136 if detail == nil {
Abhilash S.L3b494632019-07-16 15:51:09 +0530137 return errors.New("you must specify topic details")
William Kurkianea869482019-04-09 15:16:11 -0400138 }
139
140 topicDetails := make(map[string]*TopicDetail)
141 topicDetails[topic] = detail
142
143 request := &CreateTopicsRequest{
144 TopicDetails: topicDetails,
145 ValidateOnly: validateOnly,
146 Timeout: ca.conf.Admin.Timeout,
147 }
148
149 if ca.conf.Version.IsAtLeast(V0_11_0_0) {
150 request.Version = 1
151 }
152 if ca.conf.Version.IsAtLeast(V1_0_0_0) {
153 request.Version = 2
154 }
155
156 b, err := ca.Controller()
157 if err != nil {
158 return err
159 }
160
161 rsp, err := b.CreateTopics(request)
162 if err != nil {
163 return err
164 }
165
166 topicErr, ok := rsp.TopicErrors[topic]
167 if !ok {
168 return ErrIncompleteResponse
169 }
170
171 if topicErr.Err != ErrNoError {
Abhilash S.L3b494632019-07-16 15:51:09 +0530172 return topicErr
William Kurkianea869482019-04-09 15:16:11 -0400173 }
174
175 return nil
176}
177
178func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
179 controller, err := ca.Controller()
180 if err != nil {
181 return nil, err
182 }
183
184 request := &MetadataRequest{
185 Topics: topics,
186 AllowAutoTopicCreation: false,
187 }
188
Abhilash S.L3b494632019-07-16 15:51:09 +0530189 if ca.conf.Version.IsAtLeast(V1_0_0_0) {
190 request.Version = 5
191 } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
William Kurkianea869482019-04-09 15:16:11 -0400192 request.Version = 4
193 }
194
195 response, err := controller.GetMetadata(request)
196 if err != nil {
197 return nil, err
198 }
199 return response.Topics, nil
200}
201
202func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
203 controller, err := ca.Controller()
204 if err != nil {
205 return nil, int32(0), err
206 }
207
208 request := &MetadataRequest{
209 Topics: []string{},
210 }
211
212 response, err := controller.GetMetadata(request)
213 if err != nil {
214 return nil, int32(0), err
215 }
216
217 return response.Brokers, response.ControllerID, nil
218}
219
220func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
221 brokers := ca.client.Brokers()
222 if len(brokers) > 0 {
223 index := rand.Intn(len(brokers))
224 return brokers[index], nil
225 }
226 return nil, errors.New("no available broker")
227}
228
229func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
230 // In order to build TopicDetails we need to first get the list of all
231 // topics using a MetadataRequest and then get their configs using a
232 // DescribeConfigsRequest request. To avoid sending many requests to the
233 // broker, we use a single DescribeConfigsRequest.
234
235 // Send the all-topic MetadataRequest
236 b, err := ca.findAnyBroker()
237 if err != nil {
238 return nil, err
239 }
240 _ = b.Open(ca.client.Config())
241
242 metadataReq := &MetadataRequest{}
243 metadataResp, err := b.GetMetadata(metadataReq)
244 if err != nil {
245 return nil, err
246 }
247
248 topicsDetailsMap := make(map[string]TopicDetail)
249
250 var describeConfigsResources []*ConfigResource
251
252 for _, topic := range metadataResp.Topics {
253 topicDetails := TopicDetail{
254 NumPartitions: int32(len(topic.Partitions)),
255 }
256 if len(topic.Partitions) > 0 {
257 topicDetails.ReplicaAssignment = map[int32][]int32{}
258 for _, partition := range topic.Partitions {
259 topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
260 }
261 topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
262 }
263 topicsDetailsMap[topic.Name] = topicDetails
264
265 // we populate the resources we want to describe from the MetadataResponse
266 topicResource := ConfigResource{
267 Type: TopicResource,
268 Name: topic.Name,
269 }
270 describeConfigsResources = append(describeConfigsResources, &topicResource)
271 }
272
273 // Send the DescribeConfigsRequest
274 describeConfigsReq := &DescribeConfigsRequest{
275 Resources: describeConfigsResources,
276 }
277 describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
278 if err != nil {
279 return nil, err
280 }
281
282 for _, resource := range describeConfigsResp.Resources {
283 topicDetails := topicsDetailsMap[resource.Name]
284 topicDetails.ConfigEntries = make(map[string]*string)
285
286 for _, entry := range resource.Configs {
287 // only include non-default non-sensitive config
288 // (don't actually think topic config will ever be sensitive)
289 if entry.Default || entry.Sensitive {
290 continue
291 }
292 topicDetails.ConfigEntries[entry.Name] = &entry.Value
293 }
294
295 topicsDetailsMap[resource.Name] = topicDetails
296 }
297
298 return topicsDetailsMap, nil
299}
300
301func (ca *clusterAdmin) DeleteTopic(topic string) error {
302
303 if topic == "" {
304 return ErrInvalidTopic
305 }
306
307 request := &DeleteTopicsRequest{
308 Topics: []string{topic},
309 Timeout: ca.conf.Admin.Timeout,
310 }
311
312 if ca.conf.Version.IsAtLeast(V0_11_0_0) {
313 request.Version = 1
314 }
315
316 b, err := ca.Controller()
317 if err != nil {
318 return err
319 }
320
321 rsp, err := b.DeleteTopics(request)
322 if err != nil {
323 return err
324 }
325
326 topicErr, ok := rsp.TopicErrorCodes[topic]
327 if !ok {
328 return ErrIncompleteResponse
329 }
330
331 if topicErr != ErrNoError {
332 return topicErr
333 }
334 return nil
335}
336
337func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
338 if topic == "" {
339 return ErrInvalidTopic
340 }
341
342 topicPartitions := make(map[string]*TopicPartition)
343 topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}
344
345 request := &CreatePartitionsRequest{
346 TopicPartitions: topicPartitions,
347 Timeout: ca.conf.Admin.Timeout,
348 }
349
350 b, err := ca.Controller()
351 if err != nil {
352 return err
353 }
354
355 rsp, err := b.CreatePartitions(request)
356 if err != nil {
357 return err
358 }
359
360 topicErr, ok := rsp.TopicPartitionErrors[topic]
361 if !ok {
362 return ErrIncompleteResponse
363 }
364
365 if topicErr.Err != ErrNoError {
Abhilash S.L3b494632019-07-16 15:51:09 +0530366 return topicErr
William Kurkianea869482019-04-09 15:16:11 -0400367 }
368
369 return nil
370}
371
372func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
373
374 if topic == "" {
375 return ErrInvalidTopic
376 }
377
378 topics := make(map[string]*DeleteRecordsRequestTopic)
379 topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: partitionOffsets}
380 request := &DeleteRecordsRequest{
381 Topics: topics,
382 Timeout: ca.conf.Admin.Timeout,
383 }
384
385 b, err := ca.Controller()
386 if err != nil {
387 return err
388 }
389
390 rsp, err := b.DeleteRecords(request)
391 if err != nil {
392 return err
393 }
394
395 _, ok := rsp.Topics[topic]
396 if !ok {
397 return ErrIncompleteResponse
398 }
399
400 //todo since we are dealing with couple of partitions it would be good if we return slice of errors
401 //for each partition instead of one error
402 return nil
403}
404
405func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
406
407 var entries []ConfigEntry
408 var resources []*ConfigResource
409 resources = append(resources, &resource)
410
411 request := &DescribeConfigsRequest{
412 Resources: resources,
413 }
414
415 b, err := ca.Controller()
416 if err != nil {
417 return nil, err
418 }
419
420 rsp, err := b.DescribeConfigs(request)
421 if err != nil {
422 return nil, err
423 }
424
425 for _, rspResource := range rsp.Resources {
426 if rspResource.Name == resource.Name {
427 if rspResource.ErrorMsg != "" {
428 return nil, errors.New(rspResource.ErrorMsg)
429 }
430 for _, cfgEntry := range rspResource.Configs {
431 entries = append(entries, *cfgEntry)
432 }
433 }
434 }
435 return entries, nil
436}
437
438func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
439
440 var resources []*AlterConfigsResource
441 resources = append(resources, &AlterConfigsResource{
442 Type: resourceType,
443 Name: name,
444 ConfigEntries: entries,
445 })
446
447 request := &AlterConfigsRequest{
448 Resources: resources,
449 ValidateOnly: validateOnly,
450 }
451
452 b, err := ca.Controller()
453 if err != nil {
454 return err
455 }
456
457 rsp, err := b.AlterConfigs(request)
458 if err != nil {
459 return err
460 }
461
462 for _, rspResource := range rsp.Resources {
463 if rspResource.Name == name {
464 if rspResource.ErrorMsg != "" {
465 return errors.New(rspResource.ErrorMsg)
466 }
467 }
468 }
469 return nil
470}
471
472func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
473 var acls []*AclCreation
474 acls = append(acls, &AclCreation{resource, acl})
475 request := &CreateAclsRequest{AclCreations: acls}
476
477 b, err := ca.Controller()
478 if err != nil {
479 return err
480 }
481
482 _, err = b.CreateAcls(request)
483 return err
484}
485
486func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
487
488 request := &DescribeAclsRequest{AclFilter: filter}
489
490 b, err := ca.Controller()
491 if err != nil {
492 return nil, err
493 }
494
495 rsp, err := b.DescribeAcls(request)
496 if err != nil {
497 return nil, err
498 }
499
500 var lAcls []ResourceAcls
501 for _, rAcl := range rsp.ResourceAcls {
502 lAcls = append(lAcls, *rAcl)
503 }
504 return lAcls, nil
505}
506
507func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
508 var filters []*AclFilter
509 filters = append(filters, &filter)
510 request := &DeleteAclsRequest{Filters: filters}
511
512 b, err := ca.Controller()
513 if err != nil {
514 return nil, err
515 }
516
517 rsp, err := b.DeleteAcls(request)
518 if err != nil {
519 return nil, err
520 }
521
522 var mAcls []MatchingAcl
523 for _, fr := range rsp.FilterResponses {
524 for _, mACL := range fr.MatchingAcls {
525 mAcls = append(mAcls, *mACL)
526 }
527
528 }
529 return mAcls, nil
530}
531
532func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
533 groupsPerBroker := make(map[*Broker][]string)
534
535 for _, group := range groups {
536 controller, err := ca.client.Coordinator(group)
537 if err != nil {
538 return nil, err
539 }
540 groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
541
542 }
543
544 for broker, brokerGroups := range groupsPerBroker {
545 response, err := broker.DescribeGroups(&DescribeGroupsRequest{
546 Groups: brokerGroups,
547 })
548 if err != nil {
549 return nil, err
550 }
551
552 result = append(result, response.Groups...)
553 }
554 return result, nil
555}
556
557func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
558 allGroups = make(map[string]string)
559
560 // Query brokers in parallel, since we have to query *all* brokers
561 brokers := ca.client.Brokers()
562 groupMaps := make(chan map[string]string, len(brokers))
563 errors := make(chan error, len(brokers))
564 wg := sync.WaitGroup{}
565
566 for _, b := range brokers {
567 wg.Add(1)
568 go func(b *Broker, conf *Config) {
569 defer wg.Done()
570 _ = b.Open(conf) // Ensure that broker is opened
571
572 response, err := b.ListGroups(&ListGroupsRequest{})
573 if err != nil {
574 errors <- err
575 return
576 }
577
578 groups := make(map[string]string)
579 for group, typ := range response.Groups {
580 groups[group] = typ
581 }
582
583 groupMaps <- groups
584
585 }(b, ca.conf)
586 }
587
588 wg.Wait()
589 close(groupMaps)
590 close(errors)
591
592 for groupMap := range groupMaps {
593 for group, protocolType := range groupMap {
594 allGroups[group] = protocolType
595 }
596 }
597
598 // Intentionally return only the first error for simplicity
599 err = <-errors
600 return
601}
602
603func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
604 coordinator, err := ca.client.Coordinator(group)
605 if err != nil {
606 return nil, err
607 }
608
609 request := &OffsetFetchRequest{
610 ConsumerGroup: group,
611 partitions: topicPartitions,
612 }
613
Abhilash S.L3b494632019-07-16 15:51:09 +0530614 if ca.conf.Version.IsAtLeast(V0_10_2_0) {
615 request.Version = 2
616 } else if ca.conf.Version.IsAtLeast(V0_8_2_2) {
William Kurkianea869482019-04-09 15:16:11 -0400617 request.Version = 1
618 }
619
620 return coordinator.FetchOffset(request)
621}
Abhilash S.L3b494632019-07-16 15:51:09 +0530622
623func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
624 coordinator, err := ca.client.Coordinator(group)
625 if err != nil {
626 return err
627 }
628
629 request := &DeleteGroupsRequest{
630 Groups: []string{group},
631 }
632
633 resp, err := coordinator.DeleteGroups(request)
634 if err != nil {
635 return err
636 }
637
638 groupErr, ok := resp.GroupErrorCodes[group]
639 if !ok {
640 return ErrIncompleteResponse
641 }
642
643 if groupErr != ErrNoError {
644 return groupErr
645 }
646
647 return nil
648}