blob: e7a67dac8b6ef68fe39db3955b46ac4e392978b9 [file] [log] [blame]
Takahiro Suzuki241c10e2020-12-17 20:17:57 +09001package cluster
2
3import (
4 "sort"
5 "sync"
6 "sync/atomic"
7 "time"
8
9 "github.com/Shopify/sarama"
10)
11
12// Consumer is a cluster group consumer
13type Consumer struct {
14 client *Client
15 ownClient bool
16
17 consumer sarama.Consumer
18 subs *partitionMap
19
20 consumerID string
21 groupID string
22
23 memberID string
24 generationID int32
25 membershipMu sync.RWMutex
26
27 coreTopics []string
28 extraTopics []string
29
30 dying, dead chan none
31 closeOnce sync.Once
32
33 consuming int32
34 messages chan *sarama.ConsumerMessage
35 errors chan error
36 partitions chan PartitionConsumer
37 notifications chan *Notification
38
39 commitMu sync.Mutex
40}
41
42// NewConsumer initializes a new consumer
43func NewConsumer(addrs []string, groupID string, topics []string, config *Config) (*Consumer, error) {
44 client, err := NewClient(addrs, config)
45 if err != nil {
46 return nil, err
47 }
48
49 consumer, err := NewConsumerFromClient(client, groupID, topics)
50 if err != nil {
51 return nil, err
52 }
53 consumer.ownClient = true
54 return consumer, nil
55}
56
57// NewConsumerFromClient initializes a new consumer from an existing client.
58//
59// Please note that clients cannot be shared between consumers (due to Kafka internals),
60// they can only be re-used which requires the user to call Close() on the first consumer
61// before using this method again to initialize another one. Attempts to use a client with
62// more than one consumer at a time will return errors.
63func NewConsumerFromClient(client *Client, groupID string, topics []string) (*Consumer, error) {
64 if !client.claim() {
65 return nil, errClientInUse
66 }
67
68 consumer, err := sarama.NewConsumerFromClient(client.Client)
69 if err != nil {
70 client.release()
71 return nil, err
72 }
73
74 sort.Strings(topics)
75 c := &Consumer{
76 client: client,
77 consumer: consumer,
78 subs: newPartitionMap(),
79 groupID: groupID,
80
81 coreTopics: topics,
82
83 dying: make(chan none),
84 dead: make(chan none),
85
86 messages: make(chan *sarama.ConsumerMessage),
87 errors: make(chan error, client.config.ChannelBufferSize),
88 partitions: make(chan PartitionConsumer, 1),
89 notifications: make(chan *Notification),
90 }
91 if err := c.client.RefreshCoordinator(groupID); err != nil {
92 client.release()
93 return nil, err
94 }
95
96 go c.mainLoop()
97 return c, nil
98}
99
100// Messages returns the read channel for the messages that are returned by
101// the broker.
102//
103// This channel will only return if Config.Group.Mode option is set to
104// ConsumerModeMultiplex (default).
105func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage { return c.messages }
106
107// Partitions returns the read channels for individual partitions of this broker.
108//
109// This will channel will only return if Config.Group.Mode option is set to
110// ConsumerModePartitions.
111//
112// The Partitions() channel must be listened to for the life of this consumer;
113// when a rebalance happens old partitions will be closed (naturally come to
114// completion) and new ones will be emitted. The returned channel will only close
115// when the consumer is completely shut down.
116func (c *Consumer) Partitions() <-chan PartitionConsumer { return c.partitions }
117
118// Errors returns a read channel of errors that occur during offset management, if
119// enabled. By default, errors are logged and not returned over this channel. If
120// you want to implement any custom error handling, set your config's
121// Consumer.Return.Errors setting to true, and read from this channel.
122func (c *Consumer) Errors() <-chan error { return c.errors }
123
124// Notifications returns a channel of Notifications that occur during consumer
125// rebalancing. Notifications will only be emitted over this channel, if your config's
126// Group.Return.Notifications setting to true.
127func (c *Consumer) Notifications() <-chan *Notification { return c.notifications }
128
129// HighWaterMarks returns the current high water marks for each topic and partition
130// Consistency between partitions is not guaranteed since high water marks are updated separately.
131func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 { return c.consumer.HighWaterMarks() }
132
133// MarkOffset marks the provided message as processed, alongside a metadata string
134// that represents the state of the partition consumer at that point in time. The
135// metadata string can be used by another consumer to restore that state, so it
136// can resume consumption.
137//
138// Note: calling MarkOffset does not necessarily commit the offset to the backend
139// store immediately for efficiency reasons, and it may never be committed if
140// your application crashes. This means that you may end up processing the same
141// message twice, and your processing should ideally be idempotent.
142func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
143 if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil {
144 sub.MarkOffset(msg.Offset, metadata)
145 }
146}
147
148// MarkPartitionOffset marks an offset of the provided topic/partition as processed.
149// See MarkOffset for additional explanation.
150func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
151 if sub := c.subs.Fetch(topic, partition); sub != nil {
152 sub.MarkOffset(offset, metadata)
153 }
154}
155
156// MarkOffsets marks stashed offsets as processed.
157// See MarkOffset for additional explanation.
158func (c *Consumer) MarkOffsets(s *OffsetStash) {
159 s.mu.Lock()
160 defer s.mu.Unlock()
161
162 for tp, info := range s.offsets {
163 if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil {
164 sub.MarkOffset(info.Offset, info.Metadata)
165 }
166 delete(s.offsets, tp)
167 }
168}
169
170// ResetOffsets marks the provided message as processed, alongside a metadata string
171// that represents the state of the partition consumer at that point in time. The
172// metadata string can be used by another consumer to restore that state, so it
173// can resume consumption.
174//
175// Difference between ResetOffset and MarkOffset is that it allows to rewind to an earlier offset
176func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
177 if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil {
178 sub.ResetOffset(msg.Offset, metadata)
179 }
180}
181
182// ResetPartitionOffset marks an offset of the provided topic/partition as processed.
183// See ResetOffset for additional explanation.
184func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
185 sub := c.subs.Fetch(topic, partition)
186 if sub != nil {
187 sub.ResetOffset(offset, metadata)
188 }
189}
190
191// ResetOffsets marks stashed offsets as processed.
192// See ResetOffset for additional explanation.
193func (c *Consumer) ResetOffsets(s *OffsetStash) {
194 s.mu.Lock()
195 defer s.mu.Unlock()
196
197 for tp, info := range s.offsets {
198 if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil {
199 sub.ResetOffset(info.Offset, info.Metadata)
200 }
201 delete(s.offsets, tp)
202 }
203}
204
205// Subscriptions returns the consumed topics and partitions
206func (c *Consumer) Subscriptions() map[string][]int32 {
207 return c.subs.Info()
208}
209
210// CommitOffsets allows to manually commit previously marked offsets. By default there is no
211// need to call this function as the consumer will commit offsets automatically
212// using the Config.Consumer.Offsets.CommitInterval setting.
213//
214// Please be aware that calling this function during an internal rebalance cycle may return
215// broker errors (e.g. sarama.ErrUnknownMemberId or sarama.ErrIllegalGeneration).
216func (c *Consumer) CommitOffsets() error {
217 c.commitMu.Lock()
218 defer c.commitMu.Unlock()
219
220 memberID, generationID := c.membership()
221 req := &sarama.OffsetCommitRequest{
222 Version: 2,
223 ConsumerGroup: c.groupID,
224 ConsumerGroupGeneration: generationID,
225 ConsumerID: memberID,
226 RetentionTime: -1,
227 }
228
229 if ns := c.client.config.Consumer.Offsets.Retention; ns != 0 {
230 req.RetentionTime = int64(ns / time.Millisecond)
231 }
232
233 snap := c.subs.Snapshot()
234 dirty := false
235 for tp, state := range snap {
236 if state.Dirty {
237 dirty = true
238 req.AddBlock(tp.Topic, tp.Partition, state.Info.Offset, 0, state.Info.Metadata)
239 }
240 }
241 if !dirty {
242 return nil
243 }
244
245 broker, err := c.client.Coordinator(c.groupID)
246 if err != nil {
247 c.closeCoordinator(broker, err)
248 return err
249 }
250
251 resp, err := broker.CommitOffset(req)
252 if err != nil {
253 c.closeCoordinator(broker, err)
254 return err
255 }
256
257 for topic, errs := range resp.Errors {
258 for partition, kerr := range errs {
259 if kerr != sarama.ErrNoError {
260 err = kerr
261 } else if state, ok := snap[topicPartition{topic, partition}]; ok {
262 if sub := c.subs.Fetch(topic, partition); sub != nil {
263 sub.markCommitted(state.Info.Offset)
264 }
265 }
266 }
267 }
268 return err
269}
270
271// Close safely closes the consumer and releases all resources
272func (c *Consumer) Close() (err error) {
273 c.closeOnce.Do(func() {
274 close(c.dying)
275 <-c.dead
276
277 if e := c.release(); e != nil {
278 err = e
279 }
280 if e := c.consumer.Close(); e != nil {
281 err = e
282 }
283 close(c.messages)
284 close(c.errors)
285
286 if e := c.leaveGroup(); e != nil {
287 err = e
288 }
289 close(c.partitions)
290 close(c.notifications)
291
292 // drain
293 for range c.messages {
294 }
295 for range c.errors {
296 }
297 for p := range c.partitions {
298 _ = p.Close()
299 }
300 for range c.notifications {
301 }
302
303 c.client.release()
304 if c.ownClient {
305 if e := c.client.Close(); e != nil {
306 err = e
307 }
308 }
309 })
310 return
311}
312
313func (c *Consumer) mainLoop() {
314 defer close(c.dead)
315 defer atomic.StoreInt32(&c.consuming, 0)
316
317 for {
318 atomic.StoreInt32(&c.consuming, 0)
319
320 // Check if close was requested
321 select {
322 case <-c.dying:
323 return
324 default:
325 }
326
327 // Start next consume cycle
328 c.nextTick()
329 }
330}
331
332func (c *Consumer) nextTick() {
333 // Remember previous subscriptions
334 var notification *Notification
335 if c.client.config.Group.Return.Notifications {
336 notification = newNotification(c.subs.Info())
337 }
338
339 // Refresh coordinator
340 if err := c.refreshCoordinator(); err != nil {
341 c.rebalanceError(err, nil)
342 return
343 }
344
345 // Release subscriptions
346 if err := c.release(); err != nil {
347 c.rebalanceError(err, nil)
348 return
349 }
350
351 // Issue rebalance start notification
352 if c.client.config.Group.Return.Notifications {
353 c.handleNotification(notification)
354 }
355
356 // Rebalance, fetch new subscriptions
357 subs, err := c.rebalance()
358 if err != nil {
359 c.rebalanceError(err, notification)
360 return
361 }
362
363 // Coordinate loops, make sure everything is
364 // stopped on exit
365 tomb := newLoopTomb()
366 defer tomb.Close()
367
368 // Start the heartbeat
369 tomb.Go(c.hbLoop)
370
371 // Subscribe to topic/partitions
372 if err := c.subscribe(tomb, subs); err != nil {
373 c.rebalanceError(err, notification)
374 return
375 }
376
377 // Update/issue notification with new claims
378 if c.client.config.Group.Return.Notifications {
379 notification = notification.success(subs)
380 c.handleNotification(notification)
381 }
382
383 // Start topic watcher loop
384 tomb.Go(c.twLoop)
385
386 // Start consuming and committing offsets
387 tomb.Go(c.cmLoop)
388 atomic.StoreInt32(&c.consuming, 1)
389
390 // Wait for signals
391 select {
392 case <-tomb.Dying():
393 case <-c.dying:
394 }
395}
396
397// heartbeat loop, triggered by the mainLoop
398func (c *Consumer) hbLoop(stopped <-chan none) {
399 ticker := time.NewTicker(c.client.config.Group.Heartbeat.Interval)
400 defer ticker.Stop()
401
402 for {
403 select {
404 case <-ticker.C:
405 switch err := c.heartbeat(); err {
406 case nil, sarama.ErrNoError:
407 case sarama.ErrNotCoordinatorForConsumer, sarama.ErrRebalanceInProgress:
408 return
409 default:
410 c.handleError(&Error{Ctx: "heartbeat", error: err})
411 return
412 }
413 case <-stopped:
414 return
415 case <-c.dying:
416 return
417 }
418 }
419}
420
421// topic watcher loop, triggered by the mainLoop
422func (c *Consumer) twLoop(stopped <-chan none) {
423 ticker := time.NewTicker(c.client.config.Metadata.RefreshFrequency / 2)
424 defer ticker.Stop()
425
426 for {
427 select {
428 case <-ticker.C:
429 topics, err := c.client.Topics()
430 if err != nil {
431 c.handleError(&Error{Ctx: "topics", error: err})
432 return
433 }
434
435 for _, topic := range topics {
436 if !c.isKnownCoreTopic(topic) &&
437 !c.isKnownExtraTopic(topic) &&
438 c.isPotentialExtraTopic(topic) {
439 return
440 }
441 }
442 case <-stopped:
443 return
444 case <-c.dying:
445 return
446 }
447 }
448}
449
450// commit loop, triggered by the mainLoop
451func (c *Consumer) cmLoop(stopped <-chan none) {
452 ticker := time.NewTicker(c.client.config.Consumer.Offsets.CommitInterval)
453 defer ticker.Stop()
454
455 for {
456 select {
457 case <-ticker.C:
458 if err := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); err != nil {
459 c.handleError(&Error{Ctx: "commit", error: err})
460 return
461 }
462 case <-stopped:
463 return
464 case <-c.dying:
465 return
466 }
467 }
468}
469
470func (c *Consumer) rebalanceError(err error, n *Notification) {
471 if n != nil {
472 n.Type = RebalanceError
473 c.handleNotification(n)
474 }
475
476 switch err {
477 case sarama.ErrRebalanceInProgress:
478 default:
479 c.handleError(&Error{Ctx: "rebalance", error: err})
480 }
481
482 select {
483 case <-c.dying:
484 case <-time.After(c.client.config.Metadata.Retry.Backoff):
485 }
486}
487
488func (c *Consumer) handleNotification(n *Notification) {
489 if c.client.config.Group.Return.Notifications {
490 select {
491 case c.notifications <- n:
492 case <-c.dying:
493 return
494 }
495 }
496}
497
498func (c *Consumer) handleError(e *Error) {
499 if c.client.config.Consumer.Return.Errors {
500 select {
501 case c.errors <- e:
502 case <-c.dying:
503 return
504 }
505 } else {
506 sarama.Logger.Printf("%s error: %s\n", e.Ctx, e.Error())
507 }
508}
509
510// Releases the consumer and commits offsets, called from rebalance() and Close()
511func (c *Consumer) release() (err error) {
512 // Stop all consumers
513 c.subs.Stop()
514
515 // Clear subscriptions on exit
516 defer c.subs.Clear()
517
518 // Wait for messages to be processed
519 timeout := time.NewTimer(c.client.config.Group.Offsets.Synchronization.DwellTime)
520 defer timeout.Stop()
521
522 select {
523 case <-c.dying:
524 case <-timeout.C:
525 }
526
527 // Commit offsets, continue on errors
528 if e := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); e != nil {
529 err = e
530 }
531
532 return
533}
534
535// --------------------------------------------------------------------
536
537// Performs a heartbeat, part of the mainLoop()
538func (c *Consumer) heartbeat() error {
539 broker, err := c.client.Coordinator(c.groupID)
540 if err != nil {
541 c.closeCoordinator(broker, err)
542 return err
543 }
544
545 memberID, generationID := c.membership()
546 resp, err := broker.Heartbeat(&sarama.HeartbeatRequest{
547 GroupId: c.groupID,
548 MemberId: memberID,
549 GenerationId: generationID,
550 })
551 if err != nil {
552 c.closeCoordinator(broker, err)
553 return err
554 }
555 return resp.Err
556}
557
558// Performs a rebalance, part of the mainLoop()
559func (c *Consumer) rebalance() (map[string][]int32, error) {
560 memberID, _ := c.membership()
561 sarama.Logger.Printf("cluster/consumer %s rebalance\n", memberID)
562
563 allTopics, err := c.client.Topics()
564 if err != nil {
565 return nil, err
566 }
567 c.extraTopics = c.selectExtraTopics(allTopics)
568 sort.Strings(c.extraTopics)
569
570 // Re-join consumer group
571 strategy, err := c.joinGroup()
572 switch {
573 case err == sarama.ErrUnknownMemberId:
574 c.membershipMu.Lock()
575 c.memberID = ""
576 c.membershipMu.Unlock()
577 return nil, err
578 case err != nil:
579 return nil, err
580 }
581
582 // Sync consumer group state, fetch subscriptions
583 subs, err := c.syncGroup(strategy)
584 switch {
585 case err == sarama.ErrRebalanceInProgress:
586 return nil, err
587 case err != nil:
588 _ = c.leaveGroup()
589 return nil, err
590 }
591 return subs, nil
592}
593
594// Performs the subscription, part of the mainLoop()
595func (c *Consumer) subscribe(tomb *loopTomb, subs map[string][]int32) error {
596 // fetch offsets
597 offsets, err := c.fetchOffsets(subs)
598 if err != nil {
599 _ = c.leaveGroup()
600 return err
601 }
602
603 // create consumers in parallel
604 var mu sync.Mutex
605 var wg sync.WaitGroup
606
607 for topic, partitions := range subs {
608 for _, partition := range partitions {
609 wg.Add(1)
610
611 info := offsets[topic][partition]
612 go func(topic string, partition int32) {
613 if e := c.createConsumer(tomb, topic, partition, info); e != nil {
614 mu.Lock()
615 err = e
616 mu.Unlock()
617 }
618 wg.Done()
619 }(topic, partition)
620 }
621 }
622 wg.Wait()
623
624 if err != nil {
625 _ = c.release()
626 _ = c.leaveGroup()
627 }
628 return err
629}
630
631// --------------------------------------------------------------------
632
633// Send a request to the broker to join group on rebalance()
634func (c *Consumer) joinGroup() (*balancer, error) {
635 memberID, _ := c.membership()
636 req := &sarama.JoinGroupRequest{
637 GroupId: c.groupID,
638 MemberId: memberID,
639 SessionTimeout: int32(c.client.config.Group.Session.Timeout / time.Millisecond),
640 ProtocolType: "consumer",
641 }
642
643 meta := &sarama.ConsumerGroupMemberMetadata{
644 Version: 1,
645 Topics: append(c.coreTopics, c.extraTopics...),
646 UserData: c.client.config.Group.Member.UserData,
647 }
648 err := req.AddGroupProtocolMetadata(string(StrategyRange), meta)
649 if err != nil {
650 return nil, err
651 }
652 err = req.AddGroupProtocolMetadata(string(StrategyRoundRobin), meta)
653 if err != nil {
654 return nil, err
655 }
656
657 broker, err := c.client.Coordinator(c.groupID)
658 if err != nil {
659 c.closeCoordinator(broker, err)
660 return nil, err
661 }
662
663 resp, err := broker.JoinGroup(req)
664 if err != nil {
665 c.closeCoordinator(broker, err)
666 return nil, err
667 } else if resp.Err != sarama.ErrNoError {
668 c.closeCoordinator(broker, resp.Err)
669 return nil, resp.Err
670 }
671
672 var strategy *balancer
673 if resp.LeaderId == resp.MemberId {
674 members, err := resp.GetMembers()
675 if err != nil {
676 return nil, err
677 }
678
679 strategy, err = newBalancerFromMeta(c.client, members)
680 if err != nil {
681 return nil, err
682 }
683 }
684
685 c.membershipMu.Lock()
686 c.memberID = resp.MemberId
687 c.generationID = resp.GenerationId
688 c.membershipMu.Unlock()
689
690 return strategy, nil
691}
692
693// Send a request to the broker to sync the group on rebalance().
694// Returns a list of topics and partitions to consume.
695func (c *Consumer) syncGroup(strategy *balancer) (map[string][]int32, error) {
696 memberID, generationID := c.membership()
697 req := &sarama.SyncGroupRequest{
698 GroupId: c.groupID,
699 MemberId: memberID,
700 GenerationId: generationID,
701 }
702
703 if strategy != nil {
704 for memberID, topics := range strategy.Perform(c.client.config.Group.PartitionStrategy) {
705 if err := req.AddGroupAssignmentMember(memberID, &sarama.ConsumerGroupMemberAssignment{
706 Topics: topics,
707 }); err != nil {
708 return nil, err
709 }
710 }
711 }
712
713 broker, err := c.client.Coordinator(c.groupID)
714 if err != nil {
715 c.closeCoordinator(broker, err)
716 return nil, err
717 }
718
719 resp, err := broker.SyncGroup(req)
720 if err != nil {
721 c.closeCoordinator(broker, err)
722 return nil, err
723 } else if resp.Err != sarama.ErrNoError {
724 c.closeCoordinator(broker, resp.Err)
725 return nil, resp.Err
726 }
727
728 // Return if there is nothing to subscribe to
729 if len(resp.MemberAssignment) == 0 {
730 return nil, nil
731 }
732
733 // Get assigned subscriptions
734 members, err := resp.GetMemberAssignment()
735 if err != nil {
736 return nil, err
737 }
738
739 // Sort partitions, for each topic
740 for topic := range members.Topics {
741 sort.Sort(int32Slice(members.Topics[topic]))
742 }
743 return members.Topics, nil
744}
745
746// Fetches latest committed offsets for all subscriptions
747func (c *Consumer) fetchOffsets(subs map[string][]int32) (map[string]map[int32]offsetInfo, error) {
748 offsets := make(map[string]map[int32]offsetInfo, len(subs))
749 req := &sarama.OffsetFetchRequest{
750 Version: 1,
751 ConsumerGroup: c.groupID,
752 }
753
754 for topic, partitions := range subs {
755 offsets[topic] = make(map[int32]offsetInfo, len(partitions))
756 for _, partition := range partitions {
757 offsets[topic][partition] = offsetInfo{Offset: -1}
758 req.AddPartition(topic, partition)
759 }
760 }
761
762 broker, err := c.client.Coordinator(c.groupID)
763 if err != nil {
764 c.closeCoordinator(broker, err)
765 return nil, err
766 }
767
768 resp, err := broker.FetchOffset(req)
769 if err != nil {
770 c.closeCoordinator(broker, err)
771 return nil, err
772 }
773
774 for topic, partitions := range subs {
775 for _, partition := range partitions {
776 block := resp.GetBlock(topic, partition)
777 if block == nil {
778 return nil, sarama.ErrIncompleteResponse
779 }
780
781 if block.Err == sarama.ErrNoError {
782 offsets[topic][partition] = offsetInfo{Offset: block.Offset, Metadata: block.Metadata}
783 } else {
784 return nil, block.Err
785 }
786 }
787 }
788 return offsets, nil
789}
790
791// Send a request to the broker to leave the group on failes rebalance() and on Close()
792func (c *Consumer) leaveGroup() error {
793 broker, err := c.client.Coordinator(c.groupID)
794 if err != nil {
795 c.closeCoordinator(broker, err)
796 return err
797 }
798
799 memberID, _ := c.membership()
800 if _, err = broker.LeaveGroup(&sarama.LeaveGroupRequest{
801 GroupId: c.groupID,
802 MemberId: memberID,
803 }); err != nil {
804 c.closeCoordinator(broker, err)
805 }
806 return err
807}
808
809// --------------------------------------------------------------------
810
811func (c *Consumer) createConsumer(tomb *loopTomb, topic string, partition int32, info offsetInfo) error {
812 memberID, _ := c.membership()
813 sarama.Logger.Printf("cluster/consumer %s consume %s/%d from %d\n", memberID, topic, partition, info.NextOffset(c.client.config.Consumer.Offsets.Initial))
814
815 // Create partitionConsumer
816 pc, err := newPartitionConsumer(c.consumer, topic, partition, info, c.client.config.Consumer.Offsets.Initial)
817 if err != nil {
818 return err
819 }
820
821 // Store in subscriptions
822 c.subs.Store(topic, partition, pc)
823
824 // Start partition consumer goroutine
825 tomb.Go(func(stopper <-chan none) {
826 if c.client.config.Group.Mode == ConsumerModePartitions {
827 pc.waitFor(stopper, c.errors)
828 } else {
829 pc.multiplex(stopper, c.messages, c.errors)
830 }
831 })
832
833 if c.client.config.Group.Mode == ConsumerModePartitions {
834 c.partitions <- pc
835 }
836 return nil
837}
838
839func (c *Consumer) commitOffsetsWithRetry(retries int) error {
840 err := c.CommitOffsets()
841 if err != nil && retries > 0 {
842 return c.commitOffsetsWithRetry(retries - 1)
843 }
844 return err
845}
846
847func (c *Consumer) closeCoordinator(broker *sarama.Broker, err error) {
848 if broker != nil {
849 _ = broker.Close()
850 }
851
852 switch err {
853 case sarama.ErrConsumerCoordinatorNotAvailable, sarama.ErrNotCoordinatorForConsumer:
854 _ = c.client.RefreshCoordinator(c.groupID)
855 }
856}
857
858func (c *Consumer) selectExtraTopics(allTopics []string) []string {
859 extra := allTopics[:0]
860 for _, topic := range allTopics {
861 if !c.isKnownCoreTopic(topic) && c.isPotentialExtraTopic(topic) {
862 extra = append(extra, topic)
863 }
864 }
865 return extra
866}
867
868func (c *Consumer) isKnownCoreTopic(topic string) bool {
869 pos := sort.SearchStrings(c.coreTopics, topic)
870 return pos < len(c.coreTopics) && c.coreTopics[pos] == topic
871}
872
873func (c *Consumer) isKnownExtraTopic(topic string) bool {
874 pos := sort.SearchStrings(c.extraTopics, topic)
875 return pos < len(c.extraTopics) && c.extraTopics[pos] == topic
876}
877
878func (c *Consumer) isPotentialExtraTopic(topic string) bool {
879 rx := c.client.config.Group.Topics
880 if rx.Blacklist != nil && rx.Blacklist.MatchString(topic) {
881 return false
882 }
883 if rx.Whitelist != nil && rx.Whitelist.MatchString(topic) {
884 return true
885 }
886 return false
887}
888
889func (c *Consumer) refreshCoordinator() error {
890 if err := c.refreshMetadata(); err != nil {
891 return err
892 }
893 return c.client.RefreshCoordinator(c.groupID)
894}
895
896func (c *Consumer) refreshMetadata() (err error) {
897 if c.client.config.Metadata.Full {
898 err = c.client.RefreshMetadata()
899 } else {
900 var topics []string
901 if topics, err = c.client.Topics(); err == nil && len(topics) != 0 {
902 err = c.client.RefreshMetadata(topics...)
903 }
904 }
905
906 // maybe we didn't have authorization to describe all topics
907 switch err {
908 case sarama.ErrTopicAuthorizationFailed:
909 err = c.client.RefreshMetadata(c.coreTopics...)
910 }
911 return
912}
913
914func (c *Consumer) membership() (memberID string, generationID int32) {
915 c.membershipMu.RLock()
916 memberID, generationID = c.memberID, c.generationID
917 c.membershipMu.RUnlock()
918 return
919}