blob: 18b055a4656a1044482d1791219186ed28a64911 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
3import (
4 "errors"
5 "math/rand"
6 "sync"
7)
8
9// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
10// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
11// Methods with stricter requirements will specify the minimum broker version required.
12// You MUST call Close() on a client to avoid leaks
13type ClusterAdmin interface {
14 // Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
15 // It may take several seconds after CreateTopic returns success for all the brokers
16 // to become aware that the topic has been created. During this time, listTopics
17 // may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
18 CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
19
20 // List the topics available in the cluster with the default options.
21 ListTopics() (map[string]TopicDetail, error)
22
23 // Describe some topics in the cluster
24 DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
25
26 // Delete a topic. It may take several seconds after the DeleteTopic to returns success
27 // and for all the brokers to become aware that the topics are gone.
28 // During this time, listTopics may continue to return information about the deleted topic.
29 // If delete.topic.enable is false on the brokers, deleteTopic will mark
30 // the topic for deletion, but not actually delete them.
31 // This operation is supported by brokers with version 0.10.1.0 or higher.
32 DeleteTopic(topic string) error
33
34 // Increase the number of partitions of the topics according to the corresponding values.
35 // If partitions are increased for a topic that has a key, the partition logic or ordering of
36 // the messages will be affected. It may take several seconds after this method returns
37 // success for all the brokers to become aware that the partitions have been created.
38 // During this time, ClusterAdmin#describeTopics may not return information about the
39 // new partitions. This operation is supported by brokers with version 1.0.0 or higher.
40 CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
41
42 // Delete records whose offset is smaller than the given offset of the corresponding partition.
43 // This operation is supported by brokers with version 0.11.0.0 or higher.
44 DeleteRecords(topic string, partitionOffsets map[int32]int64) error
45
46 // Get the configuration for the specified resources.
47 // The returned configuration includes default values and the Default is true
48 // can be used to distinguish them from user supplied values.
49 // Config entries where ReadOnly is true cannot be updated.
50 // The value of config entries where Sensitive is true is always nil so
51 // sensitive information is not disclosed.
52 // This operation is supported by brokers with version 0.11.0.0 or higher.
53 DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
54
55 // Update the configuration for the specified resources with the default options.
56 // This operation is supported by brokers with version 0.11.0.0 or higher.
57 // The resources with their configs (topic is the only resource type with configs
58 // that can be updated currently Updates are not transactional so they may succeed
59 // for some resources while fail for others. The configs for a particular resource are updated automatically.
60 AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
61
62 // Creates access control lists (ACLs) which are bound to specific resources.
63 // This operation is not transactional so it may succeed for some ACLs while fail for others.
64 // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
65 // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
66 CreateACL(resource Resource, acl Acl) error
67
68 // Lists access control lists (ACLs) according to the supplied filter.
69 // it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
70 // This operation is supported by brokers with version 0.11.0.0 or higher.
71 ListAcls(filter AclFilter) ([]ResourceAcls, error)
72
73 // Deletes access control lists (ACLs) according to the supplied filters.
74 // This operation is not transactional so it may succeed for some ACLs while fail for others.
75 // This operation is supported by brokers with version 0.11.0.0 or higher.
76 DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
77
78 // List the consumer groups available in the cluster.
79 ListConsumerGroups() (map[string]string, error)
80
81 // Describe the given consumer group
82 DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
83
84 // List the consumer group offsets available in the cluster.
85 ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
86
87 // Get information about the nodes in the cluster
88 DescribeCluster() (brokers []*Broker, controllerID int32, err error)
89
90 // Close shuts down the admin and closes underlying client.
91 Close() error
92}
93
94type clusterAdmin struct {
95 client Client
96 conf *Config
97}
98
99// NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
100func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
101 client, err := NewClient(addrs, conf)
102 if err != nil {
103 return nil, err
104 }
105
106 //make sure we can retrieve the controller
107 _, err = client.Controller()
108 if err != nil {
109 return nil, err
110 }
111
112 ca := &clusterAdmin{
113 client: client,
114 conf: client.Config(),
115 }
116 return ca, nil
117}
118
119func (ca *clusterAdmin) Close() error {
120 return ca.client.Close()
121}
122
123func (ca *clusterAdmin) Controller() (*Broker, error) {
124 return ca.client.Controller()
125}
126
127func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
128
129 if topic == "" {
130 return ErrInvalidTopic
131 }
132
133 if detail == nil {
134 return errors.New("You must specify topic details")
135 }
136
137 topicDetails := make(map[string]*TopicDetail)
138 topicDetails[topic] = detail
139
140 request := &CreateTopicsRequest{
141 TopicDetails: topicDetails,
142 ValidateOnly: validateOnly,
143 Timeout: ca.conf.Admin.Timeout,
144 }
145
146 if ca.conf.Version.IsAtLeast(V0_11_0_0) {
147 request.Version = 1
148 }
149 if ca.conf.Version.IsAtLeast(V1_0_0_0) {
150 request.Version = 2
151 }
152
153 b, err := ca.Controller()
154 if err != nil {
155 return err
156 }
157
158 rsp, err := b.CreateTopics(request)
159 if err != nil {
160 return err
161 }
162
163 topicErr, ok := rsp.TopicErrors[topic]
164 if !ok {
165 return ErrIncompleteResponse
166 }
167
168 if topicErr.Err != ErrNoError {
169 return topicErr.Err
170 }
171
172 return nil
173}
174
175func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
176 controller, err := ca.Controller()
177 if err != nil {
178 return nil, err
179 }
180
181 request := &MetadataRequest{
182 Topics: topics,
183 AllowAutoTopicCreation: false,
184 }
185
186 if ca.conf.Version.IsAtLeast(V0_11_0_0) {
187 request.Version = 4
188 }
189
190 response, err := controller.GetMetadata(request)
191 if err != nil {
192 return nil, err
193 }
194 return response.Topics, nil
195}
196
197func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
198 controller, err := ca.Controller()
199 if err != nil {
200 return nil, int32(0), err
201 }
202
203 request := &MetadataRequest{
204 Topics: []string{},
205 }
206
207 response, err := controller.GetMetadata(request)
208 if err != nil {
209 return nil, int32(0), err
210 }
211
212 return response.Brokers, response.ControllerID, nil
213}
214
215func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
216 brokers := ca.client.Brokers()
217 if len(brokers) > 0 {
218 index := rand.Intn(len(brokers))
219 return brokers[index], nil
220 }
221 return nil, errors.New("no available broker")
222}
223
224func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
225 // In order to build TopicDetails we need to first get the list of all
226 // topics using a MetadataRequest and then get their configs using a
227 // DescribeConfigsRequest request. To avoid sending many requests to the
228 // broker, we use a single DescribeConfigsRequest.
229
230 // Send the all-topic MetadataRequest
231 b, err := ca.findAnyBroker()
232 if err != nil {
233 return nil, err
234 }
235 _ = b.Open(ca.client.Config())
236
237 metadataReq := &MetadataRequest{}
238 metadataResp, err := b.GetMetadata(metadataReq)
239 if err != nil {
240 return nil, err
241 }
242
243 topicsDetailsMap := make(map[string]TopicDetail)
244
245 var describeConfigsResources []*ConfigResource
246
247 for _, topic := range metadataResp.Topics {
248 topicDetails := TopicDetail{
249 NumPartitions: int32(len(topic.Partitions)),
250 }
251 if len(topic.Partitions) > 0 {
252 topicDetails.ReplicaAssignment = map[int32][]int32{}
253 for _, partition := range topic.Partitions {
254 topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
255 }
256 topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
257 }
258 topicsDetailsMap[topic.Name] = topicDetails
259
260 // we populate the resources we want to describe from the MetadataResponse
261 topicResource := ConfigResource{
262 Type: TopicResource,
263 Name: topic.Name,
264 }
265 describeConfigsResources = append(describeConfigsResources, &topicResource)
266 }
267
268 // Send the DescribeConfigsRequest
269 describeConfigsReq := &DescribeConfigsRequest{
270 Resources: describeConfigsResources,
271 }
272 describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
273 if err != nil {
274 return nil, err
275 }
276
277 for _, resource := range describeConfigsResp.Resources {
278 topicDetails := topicsDetailsMap[resource.Name]
279 topicDetails.ConfigEntries = make(map[string]*string)
280
281 for _, entry := range resource.Configs {
282 // only include non-default non-sensitive config
283 // (don't actually think topic config will ever be sensitive)
284 if entry.Default || entry.Sensitive {
285 continue
286 }
287 topicDetails.ConfigEntries[entry.Name] = &entry.Value
288 }
289
290 topicsDetailsMap[resource.Name] = topicDetails
291 }
292
293 return topicsDetailsMap, nil
294}
295
296func (ca *clusterAdmin) DeleteTopic(topic string) error {
297
298 if topic == "" {
299 return ErrInvalidTopic
300 }
301
302 request := &DeleteTopicsRequest{
303 Topics: []string{topic},
304 Timeout: ca.conf.Admin.Timeout,
305 }
306
307 if ca.conf.Version.IsAtLeast(V0_11_0_0) {
308 request.Version = 1
309 }
310
311 b, err := ca.Controller()
312 if err != nil {
313 return err
314 }
315
316 rsp, err := b.DeleteTopics(request)
317 if err != nil {
318 return err
319 }
320
321 topicErr, ok := rsp.TopicErrorCodes[topic]
322 if !ok {
323 return ErrIncompleteResponse
324 }
325
326 if topicErr != ErrNoError {
327 return topicErr
328 }
329 return nil
330}
331
332func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
333 if topic == "" {
334 return ErrInvalidTopic
335 }
336
337 topicPartitions := make(map[string]*TopicPartition)
338 topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}
339
340 request := &CreatePartitionsRequest{
341 TopicPartitions: topicPartitions,
342 Timeout: ca.conf.Admin.Timeout,
343 }
344
345 b, err := ca.Controller()
346 if err != nil {
347 return err
348 }
349
350 rsp, err := b.CreatePartitions(request)
351 if err != nil {
352 return err
353 }
354
355 topicErr, ok := rsp.TopicPartitionErrors[topic]
356 if !ok {
357 return ErrIncompleteResponse
358 }
359
360 if topicErr.Err != ErrNoError {
361 return topicErr.Err
362 }
363
364 return nil
365}
366
367func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
368
369 if topic == "" {
370 return ErrInvalidTopic
371 }
372
373 topics := make(map[string]*DeleteRecordsRequestTopic)
374 topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: partitionOffsets}
375 request := &DeleteRecordsRequest{
376 Topics: topics,
377 Timeout: ca.conf.Admin.Timeout,
378 }
379
380 b, err := ca.Controller()
381 if err != nil {
382 return err
383 }
384
385 rsp, err := b.DeleteRecords(request)
386 if err != nil {
387 return err
388 }
389
390 _, ok := rsp.Topics[topic]
391 if !ok {
392 return ErrIncompleteResponse
393 }
394
395 //todo since we are dealing with couple of partitions it would be good if we return slice of errors
396 //for each partition instead of one error
397 return nil
398}
399
400func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
401
402 var entries []ConfigEntry
403 var resources []*ConfigResource
404 resources = append(resources, &resource)
405
406 request := &DescribeConfigsRequest{
407 Resources: resources,
408 }
409
410 b, err := ca.Controller()
411 if err != nil {
412 return nil, err
413 }
414
415 rsp, err := b.DescribeConfigs(request)
416 if err != nil {
417 return nil, err
418 }
419
420 for _, rspResource := range rsp.Resources {
421 if rspResource.Name == resource.Name {
422 if rspResource.ErrorMsg != "" {
423 return nil, errors.New(rspResource.ErrorMsg)
424 }
425 for _, cfgEntry := range rspResource.Configs {
426 entries = append(entries, *cfgEntry)
427 }
428 }
429 }
430 return entries, nil
431}
432
433func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
434
435 var resources []*AlterConfigsResource
436 resources = append(resources, &AlterConfigsResource{
437 Type: resourceType,
438 Name: name,
439 ConfigEntries: entries,
440 })
441
442 request := &AlterConfigsRequest{
443 Resources: resources,
444 ValidateOnly: validateOnly,
445 }
446
447 b, err := ca.Controller()
448 if err != nil {
449 return err
450 }
451
452 rsp, err := b.AlterConfigs(request)
453 if err != nil {
454 return err
455 }
456
457 for _, rspResource := range rsp.Resources {
458 if rspResource.Name == name {
459 if rspResource.ErrorMsg != "" {
460 return errors.New(rspResource.ErrorMsg)
461 }
462 }
463 }
464 return nil
465}
466
467func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
468 var acls []*AclCreation
469 acls = append(acls, &AclCreation{resource, acl})
470 request := &CreateAclsRequest{AclCreations: acls}
471
472 b, err := ca.Controller()
473 if err != nil {
474 return err
475 }
476
477 _, err = b.CreateAcls(request)
478 return err
479}
480
481func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
482
483 request := &DescribeAclsRequest{AclFilter: filter}
484
485 b, err := ca.Controller()
486 if err != nil {
487 return nil, err
488 }
489
490 rsp, err := b.DescribeAcls(request)
491 if err != nil {
492 return nil, err
493 }
494
495 var lAcls []ResourceAcls
496 for _, rAcl := range rsp.ResourceAcls {
497 lAcls = append(lAcls, *rAcl)
498 }
499 return lAcls, nil
500}
501
502func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
503 var filters []*AclFilter
504 filters = append(filters, &filter)
505 request := &DeleteAclsRequest{Filters: filters}
506
507 b, err := ca.Controller()
508 if err != nil {
509 return nil, err
510 }
511
512 rsp, err := b.DeleteAcls(request)
513 if err != nil {
514 return nil, err
515 }
516
517 var mAcls []MatchingAcl
518 for _, fr := range rsp.FilterResponses {
519 for _, mACL := range fr.MatchingAcls {
520 mAcls = append(mAcls, *mACL)
521 }
522
523 }
524 return mAcls, nil
525}
526
527func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
528 groupsPerBroker := make(map[*Broker][]string)
529
530 for _, group := range groups {
531 controller, err := ca.client.Coordinator(group)
532 if err != nil {
533 return nil, err
534 }
535 groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
536
537 }
538
539 for broker, brokerGroups := range groupsPerBroker {
540 response, err := broker.DescribeGroups(&DescribeGroupsRequest{
541 Groups: brokerGroups,
542 })
543 if err != nil {
544 return nil, err
545 }
546
547 result = append(result, response.Groups...)
548 }
549 return result, nil
550}
551
552func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
553 allGroups = make(map[string]string)
554
555 // Query brokers in parallel, since we have to query *all* brokers
556 brokers := ca.client.Brokers()
557 groupMaps := make(chan map[string]string, len(brokers))
558 errors := make(chan error, len(brokers))
559 wg := sync.WaitGroup{}
560
561 for _, b := range brokers {
562 wg.Add(1)
563 go func(b *Broker, conf *Config) {
564 defer wg.Done()
565 _ = b.Open(conf) // Ensure that broker is opened
566
567 response, err := b.ListGroups(&ListGroupsRequest{})
568 if err != nil {
569 errors <- err
570 return
571 }
572
573 groups := make(map[string]string)
574 for group, typ := range response.Groups {
575 groups[group] = typ
576 }
577
578 groupMaps <- groups
579
580 }(b, ca.conf)
581 }
582
583 wg.Wait()
584 close(groupMaps)
585 close(errors)
586
587 for groupMap := range groupMaps {
588 for group, protocolType := range groupMap {
589 allGroups[group] = protocolType
590 }
591 }
592
593 // Intentionally return only the first error for simplicity
594 err = <-errors
595 return
596}
597
598func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
599 coordinator, err := ca.client.Coordinator(group)
600 if err != nil {
601 return nil, err
602 }
603
604 request := &OffsetFetchRequest{
605 ConsumerGroup: group,
606 partitions: topicPartitions,
607 }
608
609 if ca.conf.Version.IsAtLeast(V0_8_2_2) {
610 request.Version = 1
611 }
612
613 return coordinator.FetchOffset(request)
614}