blob: dd634846d9e87874921be431be8f67343a0faebb [file] [log] [blame]
Pragya Arya324337e2020-02-20 14:35:08 +05301package sarama
2
3import (
4 "errors"
5 "fmt"
6 "math/rand"
7 "strconv"
8 "sync"
9 "time"
10)
11
12// ClusterAdmin is the administrative client for Kafka, which supports managing and inspecting topics,
13// brokers, configurations and ACLs. The minimum broker version required is 0.10.0.0.
14// Methods with stricter requirements will specify the minimum broker version required.
15// You MUST call Close() on a client to avoid leaks
16type ClusterAdmin interface {
17 // Creates a new topic. This operation is supported by brokers with version 0.10.1.0 or higher.
18 // It may take several seconds after CreateTopic returns success for all the brokers
19 // to become aware that the topic has been created. During this time, listTopics
20 // may not return information about the new topic.The validateOnly option is supported from version 0.10.2.0.
21 CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error
22
23 // List the topics available in the cluster with the default options.
24 ListTopics() (map[string]TopicDetail, error)
25
26 // Describe some topics in the cluster.
27 DescribeTopics(topics []string) (metadata []*TopicMetadata, err error)
28
29 // Delete a topic. It may take several seconds after the DeleteTopic to returns success
30 // and for all the brokers to become aware that the topics are gone.
31 // During this time, listTopics may continue to return information about the deleted topic.
32 // If delete.topic.enable is false on the brokers, deleteTopic will mark
33 // the topic for deletion, but not actually delete them.
34 // This operation is supported by brokers with version 0.10.1.0 or higher.
35 DeleteTopic(topic string) error
36
37 // Increase the number of partitions of the topics according to the corresponding values.
38 // If partitions are increased for a topic that has a key, the partition logic or ordering of
39 // the messages will be affected. It may take several seconds after this method returns
40 // success for all the brokers to become aware that the partitions have been created.
41 // During this time, ClusterAdmin#describeTopics may not return information about the
42 // new partitions. This operation is supported by brokers with version 1.0.0 or higher.
43 CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error
44
45 // Delete records whose offset is smaller than the given offset of the corresponding partition.
46 // This operation is supported by brokers with version 0.11.0.0 or higher.
47 DeleteRecords(topic string, partitionOffsets map[int32]int64) error
48
49 // Get the configuration for the specified resources.
50 // The returned configuration includes default values and the Default is true
51 // can be used to distinguish them from user supplied values.
52 // Config entries where ReadOnly is true cannot be updated.
53 // The value of config entries where Sensitive is true is always nil so
54 // sensitive information is not disclosed.
55 // This operation is supported by brokers with version 0.11.0.0 or higher.
56 DescribeConfig(resource ConfigResource) ([]ConfigEntry, error)
57
58 // Update the configuration for the specified resources with the default options.
59 // This operation is supported by brokers with version 0.11.0.0 or higher.
60 // The resources with their configs (topic is the only resource type with configs
61 // that can be updated currently Updates are not transactional so they may succeed
62 // for some resources while fail for others. The configs for a particular resource are updated automatically.
63 AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error
64
65 // Creates access control lists (ACLs) which are bound to specific resources.
66 // This operation is not transactional so it may succeed for some ACLs while fail for others.
67 // If you attempt to add an ACL that duplicates an existing ACL, no error will be raised, but
68 // no changes will be made. This operation is supported by brokers with version 0.11.0.0 or higher.
69 CreateACL(resource Resource, acl Acl) error
70
71 // Lists access control lists (ACLs) according to the supplied filter.
72 // it may take some time for changes made by createAcls or deleteAcls to be reflected in the output of ListAcls
73 // This operation is supported by brokers with version 0.11.0.0 or higher.
74 ListAcls(filter AclFilter) ([]ResourceAcls, error)
75
76 // Deletes access control lists (ACLs) according to the supplied filters.
77 // This operation is not transactional so it may succeed for some ACLs while fail for others.
78 // This operation is supported by brokers with version 0.11.0.0 or higher.
79 DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error)
80
81 // List the consumer groups available in the cluster.
82 ListConsumerGroups() (map[string]string, error)
83
84 // Describe the given consumer groups.
85 DescribeConsumerGroups(groups []string) ([]*GroupDescription, error)
86
87 // List the consumer group offsets available in the cluster.
88 ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error)
89
90 // Delete a consumer group.
91 DeleteConsumerGroup(group string) error
92
93 // Get information about the nodes in the cluster
94 DescribeCluster() (brokers []*Broker, controllerID int32, err error)
95
96 // Close shuts down the admin and closes underlying client.
97 Close() error
98}
99
100type clusterAdmin struct {
101 client Client
102 conf *Config
103}
104
105// NewClusterAdmin creates a new ClusterAdmin using the given broker addresses and configuration.
106func NewClusterAdmin(addrs []string, conf *Config) (ClusterAdmin, error) {
107 client, err := NewClient(addrs, conf)
108 if err != nil {
109 return nil, err
110 }
111 return NewClusterAdminFromClient(client)
112}
113
114// NewClusterAdminFromClient creates a new ClusterAdmin using the given client.
115// Note that underlying client will also be closed on admin's Close() call.
116func NewClusterAdminFromClient(client Client) (ClusterAdmin, error) {
117 //make sure we can retrieve the controller
118 _, err := client.Controller()
119 if err != nil {
120 return nil, err
121 }
122
123 ca := &clusterAdmin{
124 client: client,
125 conf: client.Config(),
126 }
127 return ca, nil
128}
129
130func (ca *clusterAdmin) Close() error {
131 return ca.client.Close()
132}
133
134func (ca *clusterAdmin) Controller() (*Broker, error) {
135 return ca.client.Controller()
136}
137
138func (ca *clusterAdmin) refreshController() (*Broker, error) {
139 return ca.client.RefreshController()
140}
141
142// isErrNoController returns `true` if the given error type unwraps to an
143// `ErrNotController` response from Kafka
144func isErrNoController(err error) bool {
145 switch e := err.(type) {
146 case *TopicError:
147 return e.Err == ErrNotController
148 case *TopicPartitionError:
149 return e.Err == ErrNotController
150 case KError:
151 return e == ErrNotController
152 }
153 return false
154}
155
156// retryOnError will repeatedly call the given (error-returning) func in the
157// case that its response is non-nil and retriable (as determined by the
158// provided retriable func) up to the maximum number of tries permitted by
159// the admin client configuration
160func (ca *clusterAdmin) retryOnError(retriable func(error) bool, fn func() error) error {
161 var err error
162 for attempt := 0; attempt < ca.conf.Admin.Retry.Max; attempt++ {
163 err = fn()
164 if err == nil || !retriable(err) {
165 return err
166 }
167 Logger.Printf(
168 "admin/request retrying after %dms... (%d attempts remaining)\n",
169 ca.conf.Admin.Retry.Backoff/time.Millisecond, ca.conf.Admin.Retry.Max-attempt)
170 time.Sleep(ca.conf.Admin.Retry.Backoff)
171 continue
172 }
173 return err
174}
175
176func (ca *clusterAdmin) CreateTopic(topic string, detail *TopicDetail, validateOnly bool) error {
177 if topic == "" {
178 return ErrInvalidTopic
179 }
180
181 if detail == nil {
182 return errors.New("you must specify topic details")
183 }
184
185 topicDetails := make(map[string]*TopicDetail)
186 topicDetails[topic] = detail
187
188 request := &CreateTopicsRequest{
189 TopicDetails: topicDetails,
190 ValidateOnly: validateOnly,
191 Timeout: ca.conf.Admin.Timeout,
192 }
193
194 if ca.conf.Version.IsAtLeast(V0_11_0_0) {
195 request.Version = 1
196 }
197 if ca.conf.Version.IsAtLeast(V1_0_0_0) {
198 request.Version = 2
199 }
200
201 return ca.retryOnError(isErrNoController, func() error {
202 b, err := ca.Controller()
203 if err != nil {
204 return err
205 }
206
207 rsp, err := b.CreateTopics(request)
208 if err != nil {
209 return err
210 }
211
212 topicErr, ok := rsp.TopicErrors[topic]
213 if !ok {
214 return ErrIncompleteResponse
215 }
216
217 if topicErr.Err != ErrNoError {
218 if topicErr.Err == ErrNotController {
219 _, _ = ca.refreshController()
220 }
221 return topicErr
222 }
223
224 return nil
225 })
226}
227
228func (ca *clusterAdmin) DescribeTopics(topics []string) (metadata []*TopicMetadata, err error) {
229 controller, err := ca.Controller()
230 if err != nil {
231 return nil, err
232 }
233
234 request := &MetadataRequest{
235 Topics: topics,
236 AllowAutoTopicCreation: false,
237 }
238
239 if ca.conf.Version.IsAtLeast(V1_0_0_0) {
240 request.Version = 5
241 } else if ca.conf.Version.IsAtLeast(V0_11_0_0) {
242 request.Version = 4
243 }
244
245 response, err := controller.GetMetadata(request)
246 if err != nil {
247 return nil, err
248 }
249 return response.Topics, nil
250}
251
252func (ca *clusterAdmin) DescribeCluster() (brokers []*Broker, controllerID int32, err error) {
253 controller, err := ca.Controller()
254 if err != nil {
255 return nil, int32(0), err
256 }
257
258 request := &MetadataRequest{
259 Topics: []string{},
260 }
261
262 if ca.conf.Version.IsAtLeast(V0_10_0_0) {
263 request.Version = 1
264 }
265
266 response, err := controller.GetMetadata(request)
267 if err != nil {
268 return nil, int32(0), err
269 }
270
271 return response.Brokers, response.ControllerID, nil
272}
273
274func (ca *clusterAdmin) findBroker(id int32) (*Broker, error) {
275 brokers := ca.client.Brokers()
276 for _, b := range brokers {
277 if b.ID() == id {
278 return b, nil
279 }
280 }
281 return nil, fmt.Errorf("could not find broker id %d", id)
282}
283
284func (ca *clusterAdmin) findAnyBroker() (*Broker, error) {
285 brokers := ca.client.Brokers()
286 if len(brokers) > 0 {
287 index := rand.Intn(len(brokers))
288 return brokers[index], nil
289 }
290 return nil, errors.New("no available broker")
291}
292
293func (ca *clusterAdmin) ListTopics() (map[string]TopicDetail, error) {
294 // In order to build TopicDetails we need to first get the list of all
295 // topics using a MetadataRequest and then get their configs using a
296 // DescribeConfigsRequest request. To avoid sending many requests to the
297 // broker, we use a single DescribeConfigsRequest.
298
299 // Send the all-topic MetadataRequest
300 b, err := ca.findAnyBroker()
301 if err != nil {
302 return nil, err
303 }
304 _ = b.Open(ca.client.Config())
305
306 metadataReq := &MetadataRequest{}
307 metadataResp, err := b.GetMetadata(metadataReq)
308 if err != nil {
309 return nil, err
310 }
311
312 topicsDetailsMap := make(map[string]TopicDetail)
313
314 var describeConfigsResources []*ConfigResource
315
316 for _, topic := range metadataResp.Topics {
317 topicDetails := TopicDetail{
318 NumPartitions: int32(len(topic.Partitions)),
319 }
320 if len(topic.Partitions) > 0 {
321 topicDetails.ReplicaAssignment = map[int32][]int32{}
322 for _, partition := range topic.Partitions {
323 topicDetails.ReplicaAssignment[partition.ID] = partition.Replicas
324 }
325 topicDetails.ReplicationFactor = int16(len(topic.Partitions[0].Replicas))
326 }
327 topicsDetailsMap[topic.Name] = topicDetails
328
329 // we populate the resources we want to describe from the MetadataResponse
330 topicResource := ConfigResource{
331 Type: TopicResource,
332 Name: topic.Name,
333 }
334 describeConfigsResources = append(describeConfigsResources, &topicResource)
335 }
336
337 // Send the DescribeConfigsRequest
338 describeConfigsReq := &DescribeConfigsRequest{
339 Resources: describeConfigsResources,
340 }
341 describeConfigsResp, err := b.DescribeConfigs(describeConfigsReq)
342 if err != nil {
343 return nil, err
344 }
345
346 for _, resource := range describeConfigsResp.Resources {
347 topicDetails := topicsDetailsMap[resource.Name]
348 topicDetails.ConfigEntries = make(map[string]*string)
349
350 for _, entry := range resource.Configs {
351 // only include non-default non-sensitive config
352 // (don't actually think topic config will ever be sensitive)
353 if entry.Default || entry.Sensitive {
354 continue
355 }
356 topicDetails.ConfigEntries[entry.Name] = &entry.Value
357 }
358
359 topicsDetailsMap[resource.Name] = topicDetails
360 }
361
362 return topicsDetailsMap, nil
363}
364
365func (ca *clusterAdmin) DeleteTopic(topic string) error {
366 if topic == "" {
367 return ErrInvalidTopic
368 }
369
370 request := &DeleteTopicsRequest{
371 Topics: []string{topic},
372 Timeout: ca.conf.Admin.Timeout,
373 }
374
375 if ca.conf.Version.IsAtLeast(V0_11_0_0) {
376 request.Version = 1
377 }
378
379 return ca.retryOnError(isErrNoController, func() error {
380 b, err := ca.Controller()
381 if err != nil {
382 return err
383 }
384
385 rsp, err := b.DeleteTopics(request)
386 if err != nil {
387 return err
388 }
389
390 topicErr, ok := rsp.TopicErrorCodes[topic]
391 if !ok {
392 return ErrIncompleteResponse
393 }
394
395 if topicErr != ErrNoError {
396 if topicErr == ErrNotController {
397 _, _ = ca.refreshController()
398 }
399 return topicErr
400 }
401
402 return nil
403 })
404}
405
406func (ca *clusterAdmin) CreatePartitions(topic string, count int32, assignment [][]int32, validateOnly bool) error {
407 if topic == "" {
408 return ErrInvalidTopic
409 }
410
411 topicPartitions := make(map[string]*TopicPartition)
412 topicPartitions[topic] = &TopicPartition{Count: count, Assignment: assignment}
413
414 request := &CreatePartitionsRequest{
415 TopicPartitions: topicPartitions,
416 Timeout: ca.conf.Admin.Timeout,
417 }
418
419 return ca.retryOnError(isErrNoController, func() error {
420 b, err := ca.Controller()
421 if err != nil {
422 return err
423 }
424
425 rsp, err := b.CreatePartitions(request)
426 if err != nil {
427 return err
428 }
429
430 topicErr, ok := rsp.TopicPartitionErrors[topic]
431 if !ok {
432 return ErrIncompleteResponse
433 }
434
435 if topicErr.Err != ErrNoError {
436 if topicErr.Err == ErrNotController {
437 _, _ = ca.refreshController()
438 }
439 return topicErr
440 }
441
442 return nil
443 })
444}
445
446func (ca *clusterAdmin) DeleteRecords(topic string, partitionOffsets map[int32]int64) error {
447 if topic == "" {
448 return ErrInvalidTopic
449 }
450 partitionPerBroker := make(map[*Broker][]int32)
451 for partition := range partitionOffsets {
452 broker, err := ca.client.Leader(topic, partition)
453 if err != nil {
454 return err
455 }
456 if _, ok := partitionPerBroker[broker]; ok {
457 partitionPerBroker[broker] = append(partitionPerBroker[broker], partition)
458 } else {
459 partitionPerBroker[broker] = []int32{partition}
460 }
461 }
462 errs := make([]error, 0)
463 for broker, partitions := range partitionPerBroker {
464 topics := make(map[string]*DeleteRecordsRequestTopic)
465 recordsToDelete := make(map[int32]int64)
466 for _, p := range partitions {
467 recordsToDelete[p] = partitionOffsets[p]
468 }
469 topics[topic] = &DeleteRecordsRequestTopic{PartitionOffsets: recordsToDelete}
470 request := &DeleteRecordsRequest{
471 Topics: topics,
472 Timeout: ca.conf.Admin.Timeout,
473 }
474
475 rsp, err := broker.DeleteRecords(request)
476 if err != nil {
477 errs = append(errs, err)
478 } else {
479 deleteRecordsResponseTopic, ok := rsp.Topics[topic]
480 if !ok {
481 errs = append(errs, ErrIncompleteResponse)
482 } else {
483 for _, deleteRecordsResponsePartition := range deleteRecordsResponseTopic.Partitions {
484 if deleteRecordsResponsePartition.Err != ErrNoError {
485 errs = append(errs, errors.New(deleteRecordsResponsePartition.Err.Error()))
486 }
487 }
488 }
489 }
490 }
491 if len(errs) > 0 {
492 return ErrDeleteRecords{MultiError{&errs}}
493 }
494 //todo since we are dealing with couple of partitions it would be good if we return slice of errors
495 //for each partition instead of one error
496 return nil
497}
498
499// Returns a bool indicating whether the resource request needs to go to a
500// specific broker
501func dependsOnSpecificNode(resource ConfigResource) bool {
502 return (resource.Type == BrokerResource && resource.Name != "") ||
503 resource.Type == BrokerLoggerResource
504}
505
506func (ca *clusterAdmin) DescribeConfig(resource ConfigResource) ([]ConfigEntry, error) {
507 var entries []ConfigEntry
508 var resources []*ConfigResource
509 resources = append(resources, &resource)
510
511 request := &DescribeConfigsRequest{
512 Resources: resources,
513 }
514
515 if ca.conf.Version.IsAtLeast(V1_1_0_0) {
516 request.Version = 1
517 }
518
519 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
520 request.Version = 2
521 }
522
523 var (
524 b *Broker
525 err error
526 )
527
528 // DescribeConfig of broker/broker logger must be sent to the broker in question
529 if dependsOnSpecificNode(resource) {
530 id, _ := strconv.Atoi(resource.Name)
531 b, err = ca.findBroker(int32(id))
532 } else {
533 b, err = ca.findAnyBroker()
534 }
535 if err != nil {
536 return nil, err
537 }
538
539 _ = b.Open(ca.client.Config())
540 rsp, err := b.DescribeConfigs(request)
541 if err != nil {
542 return nil, err
543 }
544
545 for _, rspResource := range rsp.Resources {
546 if rspResource.Name == resource.Name {
547 if rspResource.ErrorMsg != "" {
548 return nil, errors.New(rspResource.ErrorMsg)
549 }
550 for _, cfgEntry := range rspResource.Configs {
551 entries = append(entries, *cfgEntry)
552 }
553 }
554 }
555 return entries, nil
556}
557
558func (ca *clusterAdmin) AlterConfig(resourceType ConfigResourceType, name string, entries map[string]*string, validateOnly bool) error {
559 var resources []*AlterConfigsResource
560 resources = append(resources, &AlterConfigsResource{
561 Type: resourceType,
562 Name: name,
563 ConfigEntries: entries,
564 })
565
566 request := &AlterConfigsRequest{
567 Resources: resources,
568 ValidateOnly: validateOnly,
569 }
570
571 var (
572 b *Broker
573 err error
574 )
575
576 // AlterConfig of broker/broker logger must be sent to the broker in question
577 if dependsOnSpecificNode(ConfigResource{Name: name, Type: resourceType}) {
578 id, _ := strconv.Atoi(name)
579 b, err = ca.findBroker(int32(id))
580 } else {
581 b, err = ca.findAnyBroker()
582 }
583 if err != nil {
584 return err
585 }
586
587 _ = b.Open(ca.client.Config())
588 rsp, err := b.AlterConfigs(request)
589 if err != nil {
590 return err
591 }
592
593 for _, rspResource := range rsp.Resources {
594 if rspResource.Name == name {
595 if rspResource.ErrorMsg != "" {
596 return errors.New(rspResource.ErrorMsg)
597 }
598 }
599 }
600 return nil
601}
602
603func (ca *clusterAdmin) CreateACL(resource Resource, acl Acl) error {
604 var acls []*AclCreation
605 acls = append(acls, &AclCreation{resource, acl})
606 request := &CreateAclsRequest{AclCreations: acls}
607
608 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
609 request.Version = 1
610 }
611
612 b, err := ca.Controller()
613 if err != nil {
614 return err
615 }
616
617 _, err = b.CreateAcls(request)
618 return err
619}
620
621func (ca *clusterAdmin) ListAcls(filter AclFilter) ([]ResourceAcls, error) {
622 request := &DescribeAclsRequest{AclFilter: filter}
623
624 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
625 request.Version = 1
626 }
627
628 b, err := ca.Controller()
629 if err != nil {
630 return nil, err
631 }
632
633 rsp, err := b.DescribeAcls(request)
634 if err != nil {
635 return nil, err
636 }
637
638 var lAcls []ResourceAcls
639 for _, rAcl := range rsp.ResourceAcls {
640 lAcls = append(lAcls, *rAcl)
641 }
642 return lAcls, nil
643}
644
645func (ca *clusterAdmin) DeleteACL(filter AclFilter, validateOnly bool) ([]MatchingAcl, error) {
646 var filters []*AclFilter
647 filters = append(filters, &filter)
648 request := &DeleteAclsRequest{Filters: filters}
649
650 if ca.conf.Version.IsAtLeast(V2_0_0_0) {
651 request.Version = 1
652 }
653
654 b, err := ca.Controller()
655 if err != nil {
656 return nil, err
657 }
658
659 rsp, err := b.DeleteAcls(request)
660 if err != nil {
661 return nil, err
662 }
663
664 var mAcls []MatchingAcl
665 for _, fr := range rsp.FilterResponses {
666 for _, mACL := range fr.MatchingAcls {
667 mAcls = append(mAcls, *mACL)
668 }
669 }
670 return mAcls, nil
671}
672
673func (ca *clusterAdmin) DescribeConsumerGroups(groups []string) (result []*GroupDescription, err error) {
674 groupsPerBroker := make(map[*Broker][]string)
675
676 for _, group := range groups {
677 controller, err := ca.client.Coordinator(group)
678 if err != nil {
679 return nil, err
680 }
681 groupsPerBroker[controller] = append(groupsPerBroker[controller], group)
682 }
683
684 for broker, brokerGroups := range groupsPerBroker {
685 response, err := broker.DescribeGroups(&DescribeGroupsRequest{
686 Groups: brokerGroups,
687 })
688 if err != nil {
689 return nil, err
690 }
691
692 result = append(result, response.Groups...)
693 }
694 return result, nil
695}
696
697func (ca *clusterAdmin) ListConsumerGroups() (allGroups map[string]string, err error) {
698 allGroups = make(map[string]string)
699
700 // Query brokers in parallel, since we have to query *all* brokers
701 brokers := ca.client.Brokers()
702 groupMaps := make(chan map[string]string, len(brokers))
703 errors := make(chan error, len(brokers))
704 wg := sync.WaitGroup{}
705
706 for _, b := range brokers {
707 wg.Add(1)
708 go func(b *Broker, conf *Config) {
709 defer wg.Done()
710 _ = b.Open(conf) // Ensure that broker is opened
711
712 response, err := b.ListGroups(&ListGroupsRequest{})
713 if err != nil {
714 errors <- err
715 return
716 }
717
718 groups := make(map[string]string)
719 for group, typ := range response.Groups {
720 groups[group] = typ
721 }
722
723 groupMaps <- groups
724 }(b, ca.conf)
725 }
726
727 wg.Wait()
728 close(groupMaps)
729 close(errors)
730
731 for groupMap := range groupMaps {
732 for group, protocolType := range groupMap {
733 allGroups[group] = protocolType
734 }
735 }
736
737 // Intentionally return only the first error for simplicity
738 err = <-errors
739 return
740}
741
742func (ca *clusterAdmin) ListConsumerGroupOffsets(group string, topicPartitions map[string][]int32) (*OffsetFetchResponse, error) {
743 coordinator, err := ca.client.Coordinator(group)
744 if err != nil {
745 return nil, err
746 }
747
748 request := &OffsetFetchRequest{
749 ConsumerGroup: group,
750 partitions: topicPartitions,
751 }
752
753 if ca.conf.Version.IsAtLeast(V0_10_2_0) {
754 request.Version = 2
755 } else if ca.conf.Version.IsAtLeast(V0_8_2_2) {
756 request.Version = 1
757 }
758
759 return coordinator.FetchOffset(request)
760}
761
762func (ca *clusterAdmin) DeleteConsumerGroup(group string) error {
763 coordinator, err := ca.client.Coordinator(group)
764 if err != nil {
765 return err
766 }
767
768 request := &DeleteGroupsRequest{
769 Groups: []string{group},
770 }
771
772 resp, err := coordinator.DeleteGroups(request)
773 if err != nil {
774 return err
775 }
776
777 groupErr, ok := resp.GroupErrorCodes[group]
778 if !ok {
779 return ErrIncompleteResponse
780 }
781
782 if groupErr != ErrNoError {
783 return groupErr
784 }
785
786 return nil
787}