package sarama

import (
    "context"
    "errors"
    "fmt"
    "sort"
    "sync"
    "time"
)

// ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.
var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")

// ConsumerGroup is responsible for dividing up processing of topics and partitions
// over a collection of processes (the members of the consumer group).
type ConsumerGroup interface {
    // Consume joins a cluster of consumers for a given list of topics and
    // starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
    //
    // The life-cycle of a session is represented by the following steps:
    //
    // 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
    //    and are assigned their "fair share" of partitions, aka 'claims'.
    // 2. Before processing starts, the handler's Setup() hook is called to notify the user
    //    of the claims and allow any necessary preparation or alteration of state.
    // 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
    //    in a separate goroutine, which requires it to be thread-safe. Any state must be carefully protected
    //    from concurrent reads/writes.
    // 4. The session will persist until one of the ConsumeClaim() functions exits. This can happen either when the
    //    parent context is cancelled or when a server-side rebalance cycle is initiated.
    // 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
    //    to allow the user to perform any final tasks before a rebalance.
    // 6. Finally, marked offsets are committed one last time before claims are released.
    //
    // Please note that once a rebalance is triggered, sessions must be completed within
    // Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
    // as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
    // is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
    // commit failures. A usage sketch follows this interface definition.
    Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

    // Errors returns a read channel of errors that occurred during the consumer life-cycle.
    // By default, errors are logged and not returned over this channel.
    // If you want to implement any custom error handling, set your config's
    // Consumer.Return.Errors setting to true, and read from this channel.
    Errors() <-chan error

    // Close stops the ConsumerGroup and detaches any running sessions. It is required to call
    // this function before the object passes out of scope, as it will otherwise leak memory.
    Close() error
}
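
// The following is an illustrative sketch, not part of the original file: a
// typical Consume loop that re-joins the group after each rebalance.
// exampleHandler is a hypothetical ConsumerGroupHandler implementation (a
// matching sketch appears after the ConsumerGroupHandler interface below).
//
//    func consumeLoop(ctx context.Context, group ConsumerGroup, topics []string) {
//        for {
//            // Consume blocks for the lifetime of one session and returns when the
//            // session ends (rebalance) or an error occurs; it must be called again
//            // to re-join the group.
//            if err := group.Consume(ctx, topics, exampleHandler{}); err != nil {
//                Logger.Println("consume error:", err)
//                return
//            }
//            if ctx.Err() != nil {
//                return // parent context cancelled: stop re-joining
//            }
//        }
//    }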

type consumerGroup struct {
    client Client

    config   *Config
    consumer Consumer
    groupID  string
    memberID string
    errors   chan error

    lock      sync.Mutex
    closed    chan none
    closeOnce sync.Once

    userData []byte
}

// NewConsumerGroup creates a new consumer group from the given broker addresses and configuration.
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
    client, err := NewClient(addrs, config)
    if err != nil {
        return nil, err
    }

    c, err := newConsumerGroup(groupID, client)
    if err != nil {
        _ = client.Close()
    }
    return c, err
}
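
// A minimal construction sketch (illustrative; the broker address and group
// name are assumptions):
//
//    cfg := NewConfig()
//    cfg.Version = V2_0_0_0 // consumer groups require at least V0_10_2_0
//    cfg.Consumer.Return.Errors = true
//    group, err := NewConsumerGroup([]string{"localhost:9092"}, "example-group", cfg)
//    if err != nil {
//        panic(err)
//    }
//    defer func() { _ = group.Close() }()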

// NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
// PLEASE NOTE: consumer groups can only re-use but not share clients.
func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) {
    // For clients passed in by the caller, ensure we don't
    // call Close() on them.
    cli := &nopCloserClient{client}
    return newConsumerGroup(groupID, cli)
}
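
// Sketch: re-using an existing client (illustrative; the client must not be
// shared with another consumer group, and closing it remains the caller's job):
//
//    cfg := NewConfig()
//    cfg.Version = V2_0_0_0
//    client, err := NewClient([]string{"localhost:9092"}, cfg)
//    if err != nil {
//        panic(err)
//    }
//    defer func() { _ = client.Close() }() // still required with FromClient
//    group, err := NewConsumerGroupFromClient("example-group", client)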

func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
    config := client.Config()
    if !config.Version.IsAtLeast(V0_10_2_0) {
        return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
    }

    consumer, err := NewConsumerFromClient(client)
    if err != nil {
        return nil, err
    }

    return &consumerGroup{
        client:   client,
        consumer: consumer,
        config:   config,
        groupID:  groupID,
        errors:   make(chan error, config.ChannelBufferSize),
        closed:   make(chan none),
    }, nil
}

// Errors implements ConsumerGroup.
func (c *consumerGroup) Errors() <-chan error { return c.errors }
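
// Sketch: draining the error channel (illustrative; requires the config's
// Consumer.Return.Errors to be set to true, otherwise errors are only logged):
//
//    go func() {
//        for err := range group.Errors() {
//            Logger.Println("consumer group error:", err)
//        }
//    }()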

// Close implements ConsumerGroup.
func (c *consumerGroup) Close() (err error) {
    c.closeOnce.Do(func() {
        close(c.closed)

        // leave group
        if e := c.leave(); e != nil {
            err = e
        }

        // drain errors
        go func() {
            close(c.errors)
        }()
        for e := range c.errors {
            err = e
        }

        if e := c.client.Close(); e != nil {
            err = e
        }
    })
    return
}

// Consume implements ConsumerGroup.
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
    // Ensure group is not closed
    select {
    case <-c.closed:
        return ErrClosedConsumerGroup
    default:
    }

    c.lock.Lock()
    defer c.lock.Unlock()

    // Quick exit when no topics are provided
    if len(topics) == 0 {
        return fmt.Errorf("no topics provided")
    }

    // Refresh metadata for requested topics
    if err := c.client.RefreshMetadata(topics...); err != nil {
        return err
    }

    // Init session
    sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
    if err == ErrClosedClient {
        return ErrClosedConsumerGroup
    } else if err != nil {
        return err
    }

    // Periodically check whether the partition count of any subscribed topic
    // has changed, and trigger a rebalance (by ending the session) when it has.
    // The goroutine exits with the session, so repeated Consume calls do not
    // accumulate loopCheckPartitionNumbers goroutines.
    go c.loopCheckPartitionNumbers(topics, sess)

    // Wait for session exit signal
    <-sess.ctx.Done()

    // Gracefully release session claims
    return sess.release(true)
}

func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
    select {
    case <-c.closed:
        return nil, ErrClosedConsumerGroup
    case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
    }

    if refreshCoordinator {
        err := c.client.RefreshCoordinator(c.groupID)
        if err != nil {
            return c.retryNewSession(ctx, topics, handler, retries, true)
        }
    }

    return c.newSession(ctx, topics, handler, retries-1)
}

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
    coordinator, err := c.client.Coordinator(c.groupID)
    if err != nil {
        if retries <= 0 {
            return nil, err
        }

        return c.retryNewSession(ctx, topics, handler, retries, true)
    }

    // Join consumer group
    join, err := c.joinGroupRequest(coordinator, topics)
    if err != nil {
        _ = coordinator.Close()
        return nil, err
    }
    switch join.Err {
    case ErrNoError:
        c.memberID = join.MemberId
    case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
        c.memberID = ""
        return c.newSession(ctx, topics, handler, retries)
    case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
        if retries <= 0 {
            return nil, join.Err
        }

        return c.retryNewSession(ctx, topics, handler, retries, true)
    case ErrRebalanceInProgress: // retry after backoff
        if retries <= 0 {
            return nil, join.Err
        }

        return c.retryNewSession(ctx, topics, handler, retries, false)
    default:
        return nil, join.Err
    }

    // Prepare distribution plan if we joined as the leader
    var plan BalanceStrategyPlan
    if join.LeaderId == join.MemberId {
        members, err := join.GetMembers()
        if err != nil {
            return nil, err
        }

        plan, err = c.balance(members)
        if err != nil {
            return nil, err
        }
    }

    // Sync consumer group
    sync, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
    if err != nil {
        _ = coordinator.Close()
        return nil, err
    }
    switch sync.Err {
    case ErrNoError:
    case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
        c.memberID = ""
        return c.newSession(ctx, topics, handler, retries)
    case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
        if retries <= 0 {
            return nil, sync.Err
        }

        return c.retryNewSession(ctx, topics, handler, retries, true)
    case ErrRebalanceInProgress: // retry after backoff
        if retries <= 0 {
            return nil, sync.Err
        }

        return c.retryNewSession(ctx, topics, handler, retries, false)
    default:
        return nil, sync.Err
    }

    // Retrieve and sort claims
    var claims map[string][]int32
    if len(sync.MemberAssignment) > 0 {
        members, err := sync.GetMemberAssignment()
        if err != nil {
            return nil, err
        }
        claims = members.Topics
        c.userData = members.UserData

        for _, partitions := range claims {
            sort.Sort(int32Slice(partitions))
        }
    }

    return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}

func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (*JoinGroupResponse, error) {
    req := &JoinGroupRequest{
        GroupId:        c.groupID,
        MemberId:       c.memberID,
        SessionTimeout: int32(c.config.Consumer.Group.Session.Timeout / time.Millisecond),
        ProtocolType:   "consumer",
    }
    if c.config.Version.IsAtLeast(V0_10_1_0) {
        req.Version = 1
        req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond)
    }

    // use static user-data if configured, otherwise use consumer-group userdata from the last sync
    userData := c.config.Consumer.Group.Member.UserData
    if len(userData) == 0 {
        userData = c.userData
    }
    meta := &ConsumerGroupMemberMetadata{
        Topics:   topics,
        UserData: userData,
    }
    strategy := c.config.Consumer.Group.Rebalance.Strategy
    if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
        return nil, err
    }

    return coordinator.JoinGroup(req)
}

func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrategyPlan, generationID int32) (*SyncGroupResponse, error) {
    req := &SyncGroupRequest{
        GroupId:      c.groupID,
        MemberId:     c.memberID,
        GenerationId: generationID,
    }
    strategy := c.config.Consumer.Group.Rebalance.Strategy
    for memberID, topics := range plan {
        assignment := &ConsumerGroupMemberAssignment{Topics: topics}
        userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID)
        if err != nil {
            return nil, err
        }
        assignment.UserData = userDataBytes
        if err := req.AddGroupAssignmentMember(memberID, assignment); err != nil {
            return nil, err
        }
    }
    return coordinator.SyncGroup(req)
}

func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, generationID int32) (*HeartbeatResponse, error) {
    req := &HeartbeatRequest{
        GroupId:      c.groupID,
        MemberId:     memberID,
        GenerationId: generationID,
    }

    return coordinator.Heartbeat(req)
}

func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
    topics := make(map[string][]int32)
    for _, meta := range members {
        for _, topic := range meta.Topics {
            topics[topic] = nil
        }
    }

    for topic := range topics {
        partitions, err := c.client.Partitions(topic)
        if err != nil {
            return nil, err
        }
        topics[topic] = partitions
    }

    strategy := c.config.Consumer.Group.Rebalance.Strategy
    return strategy.Plan(members, topics)
}

// Leaves the cluster, called by Close.
func (c *consumerGroup) leave() error {
    c.lock.Lock()
    defer c.lock.Unlock()
    if c.memberID == "" {
        return nil
    }

    coordinator, err := c.client.Coordinator(c.groupID)
    if err != nil {
        return err
    }

    resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{
        GroupId:  c.groupID,
        MemberId: c.memberID,
    })
    if err != nil {
        _ = coordinator.Close()
        return err
    }

    // Unset memberID
    c.memberID = ""

    // Check response
    switch resp.Err {
    case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
        return nil
    default:
        return resp.Err
    }
}

func (c *consumerGroup) handleError(err error, topic string, partition int32) {
    if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 {
        err = &ConsumerError{
            Topic:     topic,
            Partition: partition,
            Err:       err,
        }
    }

    if !c.config.Consumer.Return.Errors {
        Logger.Println(err)
        return
    }

    select {
    case <-c.closed:
        // consumer is closed
        return
    default:
    }

    select {
    case c.errors <- err:
    default:
        // no error listener
    }
}

func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) {
    pause := time.NewTicker(c.config.Metadata.RefreshFrequency)
    defer session.cancel()
    defer pause.Stop()
    var oldTopicToPartitionNum map[string]int
    var err error
    if oldTopicToPartitionNum, err = c.topicToPartitionNumbers(topics); err != nil {
        return
    }
    for {
        if newTopicToPartitionNum, err := c.topicToPartitionNumbers(topics); err != nil {
            return
        } else {
            for topic, num := range oldTopicToPartitionNum {
                if newTopicToPartitionNum[topic] != num {
                    return // trigger the end of the session on exit
                }
            }
        }
        select {
        case <-pause.C:
        case <-session.ctx.Done():
            Logger.Printf("partition number checker exiting, topics %s", topics)
            // exit if the session was closed elsewhere
            return
        case <-c.closed:
            return
        }
    }
}

func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) {
    topicToPartitionNum := make(map[string]int, len(topics))
    for _, topic := range topics {
        if partitions, err := c.client.Partitions(topic); err != nil {
            Logger.Printf("Consumer Group failed to get the partition number of topic %s: %v", topic, err)
            return nil, err
        } else {
            topicToPartitionNum[topic] = len(partitions)
        }
    }
    return topicToPartitionNum, nil
}

// --------------------------------------------------------------------

// ConsumerGroupSession represents a consumer group member session.
type ConsumerGroupSession interface {
    // Claims returns information about the claimed partitions by topic.
    Claims() map[string][]int32

    // MemberID returns the cluster member ID.
    MemberID() string

    // GenerationID returns the current generation ID.
    GenerationID() int32

    // MarkOffset marks the provided offset, alongside a metadata string
    // that represents the state of the partition consumer at that point in time. The
    // metadata string can be used by another consumer to restore that state, so it
    // can resume consumption.
    //
    // To follow upstream conventions, you are expected to mark the offset of the
    // next message to read, not the last message read. Thus, when calling `MarkOffset`
    // you should typically add one to the offset of the last consumed message.
    //
    // Note: calling MarkOffset does not necessarily commit the offset to the backend
    // store immediately for efficiency reasons, and it may never be committed if
    // your application crashes. This means that you may end up processing the same
    // message twice, and your processing should ideally be idempotent.
    MarkOffset(topic string, partition int32, offset int64, metadata string)

    // ResetOffset resets to the provided offset, alongside a metadata string that
    // represents the state of the partition consumer at that point in time. Reset
    // acts as a counterpart to MarkOffset, the difference being that it allows
    // resetting an offset to an earlier or smaller value, whereas MarkOffset only
    // allows incrementing the offset. See MarkOffset for more details.
    ResetOffset(topic string, partition int32, offset int64, metadata string)

    // MarkMessage marks a message as consumed.
    MarkMessage(msg *ConsumerMessage, metadata string)

    // Context returns the session context.
    Context() context.Context
}
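
// Sketch: committing progress from inside ConsumeClaim (illustrative; sess and
// claim are the arguments passed to the handler's ConsumeClaim):
//
//    for msg := range claim.Messages() {
//        // process msg here ...
//        // MarkMessage marks msg.Offset+1, per the upstream convention that
//        // the committed offset is that of the next message to read.
//        sess.MarkMessage(msg, "")
//    }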

type consumerGroupSession struct {
    parent       *consumerGroup
    memberID     string
    generationID int32
    handler      ConsumerGroupHandler

    claims  map[string][]int32
    offsets *offsetManager
    ctx     context.Context
    cancel  func()

    waitGroup       sync.WaitGroup
    releaseOnce     sync.Once
    hbDying, hbDead chan none
}

func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
    // init offset manager
    offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
    if err != nil {
        return nil, err
    }

    // init context
    ctx, cancel := context.WithCancel(ctx)

    // init session
    sess := &consumerGroupSession{
        parent:       parent,
        memberID:     memberID,
        generationID: generationID,
        handler:      handler,
        offsets:      offsets,
        claims:       claims,
        ctx:          ctx,
        cancel:       cancel,
        hbDying:      make(chan none),
        hbDead:       make(chan none),
    }

    // start heartbeat loop
    go sess.heartbeatLoop()

    // create a POM for each claim
    for topic, partitions := range claims {
        for _, partition := range partitions {
            pom, err := offsets.ManagePartition(topic, partition)
            if err != nil {
                _ = sess.release(false)
                return nil, err
            }

            // handle POM errors
            go func(topic string, partition int32) {
                for err := range pom.Errors() {
                    sess.parent.handleError(err, topic, partition)
                }
            }(topic, partition)
        }
    }

    // perform setup
    if err := handler.Setup(sess); err != nil {
        _ = sess.release(true)
        return nil, err
    }

    // start consuming
    for topic, partitions := range claims {
        for _, partition := range partitions {
            sess.waitGroup.Add(1)

            go func(topic string, partition int32) {
                defer sess.waitGroup.Done()

                // cancel the session as soon as the first
                // goroutine exits
                defer sess.cancel()

                // consume a single topic/partition, blocking
                sess.consume(topic, partition)
            }(topic, partition)
        }
    }
    return sess, nil
}

func (s *consumerGroupSession) Claims() map[string][]int32 { return s.claims }
func (s *consumerGroupSession) MemberID() string           { return s.memberID }
func (s *consumerGroupSession) GenerationID() int32        { return s.generationID }

func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {
    if pom := s.offsets.findPOM(topic, partition); pom != nil {
        pom.MarkOffset(offset, metadata)
    }
}

func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
    if pom := s.offsets.findPOM(topic, partition); pom != nil {
        pom.ResetOffset(offset, metadata)
    }
}

func (s *consumerGroupSession) MarkMessage(msg *ConsumerMessage, metadata string) {
    s.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, metadata)
}

func (s *consumerGroupSession) Context() context.Context {
    return s.ctx
}

func (s *consumerGroupSession) consume(topic string, partition int32) {
    // quick exit if rebalance is due
    select {
    case <-s.ctx.Done():
        return
    case <-s.parent.closed:
        return
    default:
    }

    // get next offset
    offset := s.parent.config.Consumer.Offsets.Initial
    if pom := s.offsets.findPOM(topic, partition); pom != nil {
        offset, _ = pom.NextOffset()
    }

    // create new claim
    claim, err := newConsumerGroupClaim(s, topic, partition, offset)
    if err != nil {
        s.parent.handleError(err, topic, partition)
        return
    }

    // handle errors
    go func() {
        for err := range claim.Errors() {
            s.parent.handleError(err, topic, partition)
        }
    }()

    // trigger close when session is done
    go func() {
        select {
        case <-s.ctx.Done():
        case <-s.parent.closed:
        }
        claim.AsyncClose()
    }()

    // start processing
    if err := s.handler.ConsumeClaim(s, claim); err != nil {
        s.parent.handleError(err, topic, partition)
    }

    // ensure consumer is closed & drained
    claim.AsyncClose()
    for _, err := range claim.waitClosed() {
        s.parent.handleError(err, topic, partition)
    }
}

func (s *consumerGroupSession) release(withCleanup bool) (err error) {
    // signal release, stop heartbeat
    s.cancel()

    // wait for consumers to exit
    s.waitGroup.Wait()

    // perform release
    s.releaseOnce.Do(func() {
        if withCleanup {
            if e := s.handler.Cleanup(s); e != nil {
                s.parent.handleError(e, "", -1)
                err = e
            }
        }

        if e := s.offsets.Close(); e != nil {
            err = e
        }

        close(s.hbDying)
        <-s.hbDead
    })

    return
}

func (s *consumerGroupSession) heartbeatLoop() {
    defer close(s.hbDead)
    defer s.cancel() // trigger the end of the session on exit

    pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
    defer pause.Stop()

    retries := s.parent.config.Metadata.Retry.Max
    for {
        coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
        if err != nil {
            if retries <= 0 {
                s.parent.handleError(err, "", -1)
                return
            }

            select {
            case <-s.hbDying:
                return
            case <-time.After(s.parent.config.Metadata.Retry.Backoff):
                retries--
            }
            continue
        }

        resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID)
        if err != nil {
            _ = coordinator.Close()

            if retries <= 0 {
                s.parent.handleError(err, "", -1)
                return
            }

            retries--
            continue
        }

        switch resp.Err {
        case ErrNoError:
            retries = s.parent.config.Metadata.Retry.Max
        case ErrRebalanceInProgress, ErrUnknownMemberId, ErrIllegalGeneration:
            return
        default:
            // report the broker-side heartbeat error (err is nil at this point)
            s.parent.handleError(resp.Err, "", -1)
            return
        }

        select {
        case <-pause.C:
        case <-s.hbDying:
            return
        }
    }
}

// --------------------------------------------------------------------

// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
// It also provides hooks for your consumer group session life-cycle and allows you to
// trigger logic before or after the consume loop(s).
//
// PLEASE NOTE that handlers are likely to be called from several goroutines concurrently,
// so ensure that all state is safely protected against race conditions. A minimal
// implementation sketch follows this interface.
type ConsumerGroupHandler interface {
    // Setup is run at the beginning of a new session, before ConsumeClaim.
    Setup(ConsumerGroupSession) error

    // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
    // but before the offsets are committed for the very last time.
    Cleanup(ConsumerGroupSession) error

    // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
    // Once the Messages() channel is closed, the Handler must finish its processing
    // loop and exit.
    ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
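
// A minimal handler sketch (illustrative; exampleHandler is a hypothetical
// implementation, and real handlers usually carry application state):
//
//    type exampleHandler struct{}
//
//    func (exampleHandler) Setup(ConsumerGroupSession) error   { return nil }
//    func (exampleHandler) Cleanup(ConsumerGroupSession) error { return nil }
//
//    func (exampleHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
//        // The loop ends when Messages() is closed at the start of a rebalance;
//        // returning promptly leaves time for Cleanup() and the final commit.
//        for msg := range claim.Messages() {
//            sess.MarkMessage(msg, "")
//        }
//        return nil
//    }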

// ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.
type ConsumerGroupClaim interface {
    // Topic returns the consumed topic name.
    Topic() string

    // Partition returns the consumed partition.
    Partition() int32

    // InitialOffset returns the initial offset that was used as a starting point for this claim.
    InitialOffset() int64

    // HighWaterMarkOffset returns the high water mark offset of the partition,
    // i.e. the offset that will be used for the next message that will be produced.
    // You can use this to determine how far behind the processing is.
    HighWaterMarkOffset() int64

    // Messages returns the read channel for the messages that are returned by
    // the broker. The messages channel will be closed when a new rebalance cycle
    // is due. You must finish processing and mark offsets within
    // Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
    // re-assigned to another group member.
    Messages() <-chan *ConsumerMessage
}
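
// Sketch: estimating consumer lag from inside ConsumeClaim (illustrative; msg
// is the most recently received *ConsumerMessage):
//
//    // HighWaterMarkOffset is the offset of the next message to be produced,
//    // so the number of messages still to be read is roughly:
//    lag := claim.HighWaterMarkOffset() - msg.Offset - 1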

type consumerGroupClaim struct {
    topic     string
    partition int32
    offset    int64
    PartitionConsumer
}

func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
    pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
    if err == ErrOffsetOutOfRange {
        offset = sess.parent.config.Consumer.Offsets.Initial
        pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset)
    }
    if err != nil {
        return nil, err
    }

    go func() {
        for err := range pcm.Errors() {
            sess.parent.handleError(err, topic, partition)
        }
    }()

    return &consumerGroupClaim{
        topic:             topic,
        partition:         partition,
        offset:            offset,
        PartitionConsumer: pcm,
    }, nil
}

func (c *consumerGroupClaim) Topic() string        { return c.topic }
func (c *consumerGroupClaim) Partition() int32     { return c.partition }
func (c *consumerGroupClaim) InitialOffset() int64 { return c.offset }

// Drains messages and errors, ensures the claim is fully closed.
func (c *consumerGroupClaim) waitClosed() (errs ConsumerErrors) {
    go func() {
        for range c.Messages() {
        }
    }()

    for err := range c.Errors() {
        errs = append(errs, err)
    }
    return
}