package sarama

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"sync"
	"time"
)

// ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.
var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")

// ConsumerGroup is responsible for dividing up processing of topics and partitions
// over a collection of processes (the members of the consumer group).
type ConsumerGroup interface {
	// Consume joins a cluster of consumers for a given list of topics and
	// starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
	//
	// The life-cycle of a session is represented by the following steps:
	//
	// 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
	//    and are assigned their "fair share" of partitions, aka 'claims'.
	// 2. Before processing starts, the handler's Setup() hook is called to notify the user
	//    of the claims and allow any necessary preparation or alteration of state.
	// 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
	//    in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected
	//    from concurrent reads/writes.
	// 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
	//    parent context is cancelled or when a server-side rebalance cycle is initiated.
	// 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
	//    to allow the user to perform any final tasks before a rebalance.
	// 6. Finally, marked offsets are committed one last time before claims are released.
	//
	// Please note that once a rebalance is triggered, sessions must be completed within
	// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
	// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
	// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
	// commit failures.
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

	// Errors returns a read channel of errors that occurred during the consumer life-cycle.
	// By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan error

	// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
	// this function before the object passes out of scope, as it will otherwise leak memory.
	Close() error
}
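
// exampleConsumeLoop is an illustrative sketch (not part of the original file) showing how
// the ConsumerGroup interface above is typically driven: Consume blocks for the lifetime of
// one session and returns when a server-side rebalance ends it, so it must be called in a
// loop until the parent context is cancelled. The broker address, group ID and topic name
// below are placeholders.
func exampleConsumeLoop(ctx context.Context, handler ConsumerGroupHandler) error {
	config := NewConfig()
	config.Version = V0_10_2_0 // consumer groups require at least this version (see newConsumerGroup)

	group, err := NewConsumerGroup([]string{"localhost:9092"}, "example-group", config)
	if err != nil {
		return err
	}
	defer func() { _ = group.Close() }()

	for {
		// Each call spans one session; a rebalance returns control here so we can re-join.
		if err := group.Consume(ctx, []string{"example-topic"}, handler); err != nil {
			return err
		}
		if ctx.Err() != nil {
			return ctx.Err()
		}
	}
}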

type consumerGroup struct {
	client Client

	config   *Config
	consumer Consumer
	groupID  string
	memberID string
	errors   chan error

	lock      sync.Mutex
	closed    chan none
	closeOnce sync.Once

	userData []byte
}

// NewConsumerGroup creates a new consumer group with the given broker addresses and configuration.
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}

	c, err := newConsumerGroup(groupID, client)
	if err != nil {
		_ = client.Close()
	}
	return c, err
}

// NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
// PLEASE NOTE: consumer groups can only re-use but not share clients.
func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) {
	// For clients passed in by the caller, ensure we don't
	// call Close() on it.
	cli := &nopCloserClient{client}
	return newConsumerGroup(groupID, cli)
}

func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
	config := client.Config()
	if !config.Version.IsAtLeast(V0_10_2_0) {
		return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
	}

	consumer, err := NewConsumerFromClient(client)
	if err != nil {
		return nil, err
	}

	return &consumerGroup{
		client:   client,
		consumer: consumer,
		config:   config,
		groupID:  groupID,
		errors:   make(chan error, config.ChannelBufferSize),
		closed:   make(chan none),
	}, nil
}

// Errors implements ConsumerGroup.
func (c *consumerGroup) Errors() <-chan error { return c.errors }

// Close implements ConsumerGroup.
func (c *consumerGroup) Close() (err error) {
	c.closeOnce.Do(func() {
		close(c.closed)

		c.lock.Lock()
		defer c.lock.Unlock()

		// leave group
		if e := c.leave(); e != nil {
			err = e
		}

		// drain errors
		go func() {
			close(c.errors)
		}()
		for e := range c.errors {
			err = e
		}

		if e := c.client.Close(); e != nil {
			err = e
		}
	})
	return
}

// Consume implements ConsumerGroup.
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
	// Ensure group is not closed
	select {
	case <-c.closed:
		return ErrClosedConsumerGroup
	default:
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	// Quick exit when no topics are provided
	if len(topics) == 0 {
		return fmt.Errorf("no topics provided")
	}

	// Refresh metadata for requested topics
	if err := c.client.RefreshMetadata(topics...); err != nil {
		return err
	}

	// Init session
	sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
	if err == ErrClosedClient {
		return ErrClosedConsumerGroup
	} else if err != nil {
		return err
	}

	// Loop to check whether the number of partitions of any subscribed topic has changed;
	// a change ends the session and thereby triggers a rebalance.
	go c.loopCheckPartitionNumbers(topics, sess)

	// Wait for session exit signal
	<-sess.ctx.Done()

	// Gracefully release session claims
	return sess.release(true)
}

func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
	select {
	case <-c.closed:
		return nil, ErrClosedConsumerGroup
	case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
	}

	if refreshCoordinator {
		err := c.client.RefreshCoordinator(c.groupID)
		if err != nil {
			return c.retryNewSession(ctx, topics, handler, retries, true)
		}
	}

	return c.newSession(ctx, topics, handler, retries-1)
}

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		if retries <= 0 {
			return nil, err
		}

		return c.retryNewSession(ctx, topics, handler, retries, true)
	}

	// Join consumer group
	join, err := c.joinGroupRequest(coordinator, topics)
	if err != nil {
		_ = coordinator.Close()
		return nil, err
	}
	switch join.Err {
	case ErrNoError:
		c.memberID = join.MemberId
	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
		if retries <= 0 {
			return nil, join.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrRebalanceInProgress: // retry after backoff
		if retries <= 0 {
			return nil, join.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, false)
	default:
		return nil, join.Err
	}

	// Prepare distribution plan if we joined as the leader
	var plan BalanceStrategyPlan
	if join.LeaderId == join.MemberId {
		members, err := join.GetMembers()
		if err != nil {
			return nil, err
		}

		plan, err = c.balance(members)
		if err != nil {
			return nil, err
		}
	}

	// Sync consumer group
	sync, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
	if err != nil {
		_ = coordinator.Close()
		return nil, err
	}
	switch sync.Err {
	case ErrNoError:
	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
		if retries <= 0 {
			return nil, sync.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrRebalanceInProgress: // retry after backoff
		if retries <= 0 {
			return nil, sync.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, false)
	default:
		return nil, sync.Err
	}

	// Retrieve and sort claims
	var claims map[string][]int32
	if len(sync.MemberAssignment) > 0 {
		members, err := sync.GetMemberAssignment()
		if err != nil {
			return nil, err
		}
		claims = members.Topics
		c.userData = members.UserData

		for _, partitions := range claims {
			sort.Sort(int32Slice(partitions))
		}
	}

	return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}

func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (*JoinGroupResponse, error) {
	req := &JoinGroupRequest{
		GroupId:        c.groupID,
		MemberId:       c.memberID,
		SessionTimeout: int32(c.config.Consumer.Group.Session.Timeout / time.Millisecond),
		ProtocolType:   "consumer",
	}
	if c.config.Version.IsAtLeast(V0_10_1_0) {
		req.Version = 1
		req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond)
	}

	// use static user-data if configured, otherwise use consumer-group userdata from the last sync
	userData := c.config.Consumer.Group.Member.UserData
	if len(userData) == 0 {
		userData = c.userData
	}
	meta := &ConsumerGroupMemberMetadata{
		Topics:   topics,
		UserData: userData,
	}
	strategy := c.config.Consumer.Group.Rebalance.Strategy
	if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
		return nil, err
	}

	return coordinator.JoinGroup(req)
}

func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrategyPlan, generationID int32) (*SyncGroupResponse, error) {
	req := &SyncGroupRequest{
		GroupId:      c.groupID,
		MemberId:     c.memberID,
		GenerationId: generationID,
	}
	for memberID, topics := range plan {
		assignment := &ConsumerGroupMemberAssignment{Topics: topics}

		// Include topic assignments in group-assignment userdata for each consumer-group member
		if c.config.Consumer.Group.Rebalance.Strategy.Name() == StickyBalanceStrategyName {
			userDataBytes, err := encode(&StickyAssignorUserDataV1{
				Topics:     topics,
				Generation: generationID,
			}, nil)
			if err != nil {
				return nil, err
			}
			assignment.UserData = userDataBytes
		}
		if err := req.AddGroupAssignmentMember(memberID, assignment); err != nil {
			return nil, err
		}
	}
	return coordinator.SyncGroup(req)
}

func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, generationID int32) (*HeartbeatResponse, error) {
	req := &HeartbeatRequest{
		GroupId:      c.groupID,
		MemberId:     memberID,
		GenerationId: generationID,
	}

	return coordinator.Heartbeat(req)
}

func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
	topics := make(map[string][]int32)
	for _, meta := range members {
		for _, topic := range meta.Topics {
			topics[topic] = nil
		}
	}

	for topic := range topics {
		partitions, err := c.client.Partitions(topic)
		if err != nil {
			return nil, err
		}
		topics[topic] = partitions
	}

	strategy := c.config.Consumer.Group.Rebalance.Strategy
	return strategy.Plan(members, topics)
}

// Leaves the cluster, called by Close, protected by lock.
func (c *consumerGroup) leave() error {
	if c.memberID == "" {
		return nil
	}

	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		return err
	}

	resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{
		GroupId:  c.groupID,
		MemberId: c.memberID,
	})
	if err != nil {
		_ = coordinator.Close()
		return err
	}

	// Unset memberID
	c.memberID = ""

	// Check response
	switch resp.Err {
	case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
		return nil
	default:
		return resp.Err
	}
}

func (c *consumerGroup) handleError(err error, topic string, partition int32) {
	select {
	case <-c.closed:
		return
	default:
	}

	if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 {
		err = &ConsumerError{
			Topic:     topic,
			Partition: partition,
			Err:       err,
		}
	}

	if c.config.Consumer.Return.Errors {
		select {
		case c.errors <- err:
		default:
		}
	} else {
		Logger.Println(err)
	}
}

func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) {
	pause := time.NewTicker(c.config.Consumer.Group.Heartbeat.Interval * 2)
	defer session.cancel()
	defer pause.Stop()
	var oldTopicToPartitionNum map[string]int
	var err error
	if oldTopicToPartitionNum, err = c.topicToPartitionNumbers(topics); err != nil {
		return
	}
	for {
		if newTopicToPartitionNum, err := c.topicToPartitionNumbers(topics); err != nil {
			return
		} else {
			for topic, num := range oldTopicToPartitionNum {
				if newTopicToPartitionNum[topic] != num {
					return // trigger the end of the session on exit
				}
			}
		}
		select {
		case <-pause.C:
		case <-c.closed:
			return
		}
	}
}

func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) {
	if err := c.client.RefreshMetadata(topics...); err != nil {
		Logger.Printf("Consumer Group refresh metadata failed %v", err)
		return nil, err
	}
	topicToPartitionNum := make(map[string]int, len(topics))
	for _, topic := range topics {
		if partitionNum, err := c.client.Partitions(topic); err != nil {
			Logger.Printf("Consumer Group topic %s get partition number failed %v", topic, err)
			return nil, err
		} else {
			topicToPartitionNum[topic] = len(partitionNum)
		}
	}
	return topicToPartitionNum, nil
}

// --------------------------------------------------------------------

// ConsumerGroupSession represents a consumer group member session.
type ConsumerGroupSession interface {
	// Claims returns information about the claimed partitions by topic.
	Claims() map[string][]int32

	// MemberID returns the cluster member ID.
	MemberID() string

	// GenerationID returns the current generation ID.
	GenerationID() int32

	// MarkOffset marks the provided offset, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// To follow upstream conventions, you are expected to mark the offset of the
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
	// you should typically add one to the offset of the last consumed message.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
	MarkOffset(topic string, partition int32, offset int64, metadata string)

	// ResetOffset resets to the provided offset, alongside a metadata string that
	// represents the state of the partition consumer at that point in time. Reset
	// acts as a counterpart to MarkOffset, the difference being that it allows
	// resetting an offset to an earlier or smaller value, where MarkOffset only
	// allows incrementing the offset. See MarkOffset for more details.
	ResetOffset(topic string, partition int32, offset int64, metadata string)

	// MarkMessage marks a message as consumed.
	MarkMessage(msg *ConsumerMessage, metadata string)

	// Context returns the session context.
	Context() context.Context
}
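
// exampleMarkAndReset is an illustrative sketch (not part of the original file) of the offset
// conventions described above: after handling msg you mark msg.Offset+1, the next offset to
// read (MarkMessage does exactly this), while ResetOffset can move the marked position back
// to an earlier offset, e.g. a previously saved checkpoint.
func exampleMarkAndReset(sess ConsumerGroupSession, msg *ConsumerMessage, checkpoint int64) {
	// Mark the offset of the next message to read.
	sess.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, "")

	// Rewind to an earlier offset; MarkOffset alone never moves the mark backwards.
	sess.ResetOffset(msg.Topic, msg.Partition, checkpoint, "")
}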

type consumerGroupSession struct {
	parent       *consumerGroup
	memberID     string
	generationID int32
	handler      ConsumerGroupHandler

	claims  map[string][]int32
	offsets *offsetManager
	ctx     context.Context
	cancel  func()

	waitGroup       sync.WaitGroup
	releaseOnce     sync.Once
	hbDying, hbDead chan none
}

func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
	// init offset manager
	offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
	if err != nil {
		return nil, err
	}

	// init context
	ctx, cancel := context.WithCancel(ctx)

	// init session
	sess := &consumerGroupSession{
		parent:       parent,
		memberID:     memberID,
		generationID: generationID,
		handler:      handler,
		offsets:      offsets,
		claims:       claims,
		ctx:          ctx,
		cancel:       cancel,
		hbDying:      make(chan none),
		hbDead:       make(chan none),
	}

	// start heartbeat loop
	go sess.heartbeatLoop()

	// create a POM for each claim
	for topic, partitions := range claims {
		for _, partition := range partitions {
			pom, err := offsets.ManagePartition(topic, partition)
			if err != nil {
				_ = sess.release(false)
				return nil, err
			}

			// handle POM errors
			go func(topic string, partition int32) {
				for err := range pom.Errors() {
					sess.parent.handleError(err, topic, partition)
				}
			}(topic, partition)
		}
	}

	// perform setup
	if err := handler.Setup(sess); err != nil {
		_ = sess.release(true)
		return nil, err
	}

	// start consuming
	for topic, partitions := range claims {
		for _, partition := range partitions {
			sess.waitGroup.Add(1)

			go func(topic string, partition int32) {
				defer sess.waitGroup.Done()

				// cancel the session as soon as the first
				// goroutine exits
				defer sess.cancel()

				// consume a single topic/partition, blocking
				sess.consume(topic, partition)
			}(topic, partition)
		}
	}
	return sess, nil
}

func (s *consumerGroupSession) Claims() map[string][]int32 { return s.claims }
func (s *consumerGroupSession) MemberID() string           { return s.memberID }
func (s *consumerGroupSession) GenerationID() int32        { return s.generationID }

func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		pom.MarkOffset(offset, metadata)
	}
}

func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		pom.ResetOffset(offset, metadata)
	}
}

func (s *consumerGroupSession) MarkMessage(msg *ConsumerMessage, metadata string) {
	s.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, metadata)
}

func (s *consumerGroupSession) Context() context.Context {
	return s.ctx
}

func (s *consumerGroupSession) consume(topic string, partition int32) {
	// quick exit if rebalance is due
	select {
	case <-s.ctx.Done():
		return
	case <-s.parent.closed:
		return
	default:
	}

	// get next offset
	offset := s.parent.config.Consumer.Offsets.Initial
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		offset, _ = pom.NextOffset()
	}

	// create new claim
	claim, err := newConsumerGroupClaim(s, topic, partition, offset)
	if err != nil {
		s.parent.handleError(err, topic, partition)
		return
	}

	// handle errors
	go func() {
		for err := range claim.Errors() {
			s.parent.handleError(err, topic, partition)
		}
	}()

	// trigger close when session is done
	go func() {
		select {
		case <-s.ctx.Done():
		case <-s.parent.closed:
		}
		claim.AsyncClose()
	}()

	// start processing
	if err := s.handler.ConsumeClaim(s, claim); err != nil {
		s.parent.handleError(err, topic, partition)
	}

	// ensure consumer is closed & drained
	claim.AsyncClose()
	for _, err := range claim.waitClosed() {
		s.parent.handleError(err, topic, partition)
	}
}

func (s *consumerGroupSession) release(withCleanup bool) (err error) {
	// signal release, stop heartbeat
	s.cancel()

	// wait for consumers to exit
	s.waitGroup.Wait()

	// perform release
	s.releaseOnce.Do(func() {
		if withCleanup {
			if e := s.handler.Cleanup(s); e != nil {
				s.parent.handleError(e, "", -1)
				err = e
			}
		}

		if e := s.offsets.Close(); e != nil {
			err = e
		}

		close(s.hbDying)
		<-s.hbDead
	})

	return
}

func (s *consumerGroupSession) heartbeatLoop() {
	defer close(s.hbDead)
	defer s.cancel() // trigger the end of the session on exit

	pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
	defer pause.Stop()

	retries := s.parent.config.Metadata.Retry.Max
	for {
		coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
		if err != nil {
			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}

			select {
			case <-s.hbDying:
				return
			case <-time.After(s.parent.config.Metadata.Retry.Backoff):
				retries--
			}
			continue
		}

		resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID)
		if err != nil {
			_ = coordinator.Close()

			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}

			retries--
			continue
		}

		switch resp.Err {
		case ErrNoError:
			retries = s.parent.config.Metadata.Retry.Max
		case ErrRebalanceInProgress, ErrUnknownMemberId, ErrIllegalGeneration:
			return
		default:
			// report the broker-side error; err is nil at this point
			s.parent.handleError(resp.Err, "", -1)
			return
		}

		select {
		case <-pause.C:
		case <-s.hbDying:
			return
		}
	}
}

// --------------------------------------------------------------------

// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
// It also provides hooks for your consumer group session life-cycle and allows you to
// trigger logic before or after the consume loop(s).
//
// PLEASE NOTE that handlers are likely to be called from several goroutines concurrently;
// ensure that all state is safely protected against race conditions.
type ConsumerGroupHandler interface {
	// Setup is run at the beginning of a new session, before ConsumeClaim.
	Setup(ConsumerGroupSession) error

	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
	// but before the offsets are committed for the very last time.
	Cleanup(ConsumerGroupSession) error

	// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
	// Once the Messages() channel is closed, the Handler must finish its processing
	// loop and exit.
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
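
// exampleHandler is an illustrative sketch (not part of the original file) of a minimal
// ConsumerGroupHandler: Setup and Cleanup are no-ops, and ConsumeClaim ranges over the
// claim's Messages() channel until it is closed at rebalance time, marking each message
// as consumed so its offset can be committed.
type exampleHandler struct{}

func (exampleHandler) Setup(ConsumerGroupSession) error   { return nil }
func (exampleHandler) Cleanup(ConsumerGroupSession) error { return nil }

func (exampleHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		// Process msg here, then mark it; MarkMessage records msg.Offset+1 as consumed.
		sess.MarkMessage(msg, "")
	}
	return nil
}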

// ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.
type ConsumerGroupClaim interface {
	// Topic returns the consumed topic name.
	Topic() string

	// Partition returns the consumed partition.
	Partition() int32

	// InitialOffset returns the initial offset that was used as a starting point for this claim.
	InitialOffset() int64

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64

	// Messages returns the read channel for the messages that are returned by
	// the broker. The messages channel will be closed when a new rebalance cycle
	// is due. You must finish processing and mark offsets within
	// Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
	// re-assigned to another group member.
	Messages() <-chan *ConsumerMessage
}

type consumerGroupClaim struct {
	topic     string
	partition int32
	offset    int64
	PartitionConsumer
}

func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
	pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
	if err == ErrOffsetOutOfRange {
		offset = sess.parent.config.Consumer.Offsets.Initial
		pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset)
	}
	if err != nil {
		return nil, err
	}

	go func() {
		for err := range pcm.Errors() {
			sess.parent.handleError(err, topic, partition)
		}
	}()

	return &consumerGroupClaim{
		topic:             topic,
		partition:         partition,
		offset:            offset,
		PartitionConsumer: pcm,
	}, nil
}

func (c *consumerGroupClaim) Topic() string        { return c.topic }
func (c *consumerGroupClaim) Partition() int32     { return c.partition }
func (c *consumerGroupClaim) InitialOffset() int64 { return c.offset }

// Drains messages and errors, ensures the claim is fully closed.
func (c *consumerGroupClaim) waitClosed() (errs ConsumerErrors) {
	go func() {
		for range c.Messages() {
		}
	}()

	for err := range c.Errors() {
		errs = append(errs, err)
	}
	return
}