blob: b974dd9af4fc5e836a63453b324fda71dc344076 [file] [log] [blame]
Scott Bakered4efab2020-01-13 19:12:25 -08001package sarama
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "sort"
8 "sync"
9 "time"
10)
11
var (
	// ErrClosedConsumerGroup is returned by consumer group methods that are
	// invoked after the group has already been closed.
	ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")
)
14
15// ConsumerGroup is responsible for dividing up processing of topics and partitions
16// over a collection of processes (the members of the consumer group).
17type ConsumerGroup interface {
18 // Consume joins a cluster of consumers for a given list of topics and
19 // starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
20 //
21 // The life-cycle of a session is represented by the following steps:
22 //
23 // 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
24 // and is assigned their "fair share" of partitions, aka 'claims'.
25 // 2. Before processing starts, the handler's Setup() hook is called to notify the user
26 // of the claims and allow any necessary preparation or alteration of state.
27 // 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
28 // in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected
29 // from concurrent reads/writes.
30 // 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
31 // parent context is cancelled or when a server-side rebalance cycle is initiated.
32 // 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
33 // to allow the user to perform any final tasks before a rebalance.
34 // 6. Finally, marked offsets are committed one last time before claims are released.
35 //
36 // Please note, that once a rebalance is triggered, sessions must be completed within
37 // Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
38 // as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
39 // is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
40 // commit failures.
41 Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error
42
43 // Errors returns a read channel of errors that occurred during the consumer life-cycle.
44 // By default, errors are logged and not returned over this channel.
45 // If you want to implement any custom error handling, set your config's
46 // Consumer.Return.Errors setting to true, and read from this channel.
47 Errors() <-chan error
48
49 // Close stops the ConsumerGroup and detaches any running sessions. It is required to call
50 // this function before the object passes out of scope, as it will otherwise leak memory.
51 Close() error
52}
53
54type consumerGroup struct {
55 client Client
56
57 config *Config
58 consumer Consumer
59 groupID string
60 memberID string
61 errors chan error
62
63 lock sync.Mutex
64 closed chan none
65 closeOnce sync.Once
66
67 userData []byte
68}
69
70// NewConsumerGroup creates a new consumer group the given broker addresses and configuration.
71func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
72 client, err := NewClient(addrs, config)
73 if err != nil {
74 return nil, err
75 }
76
77 c, err := newConsumerGroup(groupID, client)
78 if err != nil {
79 _ = client.Close()
80 }
81 return c, err
82}
83
84// NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
85// necessary to call Close() on the underlying client when shutting down this consumer.
86// PLEASE NOTE: consumer groups can only re-use but not share clients.
87func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) {
88 // For clients passed in by the client, ensure we don't
89 // call Close() on it.
90 cli := &nopCloserClient{client}
91 return newConsumerGroup(groupID, cli)
92}
93
94func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
95 config := client.Config()
96 if !config.Version.IsAtLeast(V0_10_2_0) {
97 return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
98 }
99
100 consumer, err := NewConsumerFromClient(client)
101 if err != nil {
102 return nil, err
103 }
104
105 return &consumerGroup{
106 client: client,
107 consumer: consumer,
108 config: config,
109 groupID: groupID,
110 errors: make(chan error, config.ChannelBufferSize),
111 closed: make(chan none),
112 }, nil
113}
114
115// Errors implements ConsumerGroup.
116func (c *consumerGroup) Errors() <-chan error { return c.errors }
117
118// Close implements ConsumerGroup.
119func (c *consumerGroup) Close() (err error) {
120 c.closeOnce.Do(func() {
121 close(c.closed)
122
123 c.lock.Lock()
124 defer c.lock.Unlock()
125
126 // leave group
127 if e := c.leave(); e != nil {
128 err = e
129 }
130
131 // drain errors
132 go func() {
133 close(c.errors)
134 }()
135 for e := range c.errors {
136 err = e
137 }
138
139 if e := c.client.Close(); e != nil {
140 err = e
141 }
142 })
143 return
144}
145
146// Consume implements ConsumerGroup.
147func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
148 // Ensure group is not closed
149 select {
150 case <-c.closed:
151 return ErrClosedConsumerGroup
152 default:
153 }
154
155 c.lock.Lock()
156 defer c.lock.Unlock()
157
158 // Quick exit when no topics are provided
159 if len(topics) == 0 {
160 return fmt.Errorf("no topics provided")
161 }
162
163 // Refresh metadata for requested topics
164 if err := c.client.RefreshMetadata(topics...); err != nil {
165 return err
166 }
167
168 // Init session
169 sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
170 if err == ErrClosedClient {
171 return ErrClosedConsumerGroup
172 } else if err != nil {
173 return err
174 }
175
176 // loop check topic partition numbers changed
177 // will trigger rebalance when any topic partitions number had changed
178 go c.loopCheckPartitionNumbers(topics, sess)
179
180 // Wait for session exit signal
181 <-sess.ctx.Done()
182
183 // Gracefully release session claims
184 return sess.release(true)
185}
186
187func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
188 select {
189 case <-c.closed:
190 return nil, ErrClosedConsumerGroup
191 case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
192 }
193
194 if refreshCoordinator {
195 err := c.client.RefreshCoordinator(c.groupID)
196 if err != nil {
197 return c.retryNewSession(ctx, topics, handler, retries, true)
198 }
199 }
200
201 return c.newSession(ctx, topics, handler, retries-1)
202}
203
204func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
205 coordinator, err := c.client.Coordinator(c.groupID)
206 if err != nil {
207 if retries <= 0 {
208 return nil, err
209 }
210
211 return c.retryNewSession(ctx, topics, handler, retries, true)
212 }
213
214 // Join consumer group
215 join, err := c.joinGroupRequest(coordinator, topics)
216 if err != nil {
217 _ = coordinator.Close()
218 return nil, err
219 }
220 switch join.Err {
221 case ErrNoError:
222 c.memberID = join.MemberId
223 case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
224 c.memberID = ""
225 return c.newSession(ctx, topics, handler, retries)
226 case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
227 if retries <= 0 {
228 return nil, join.Err
229 }
230
231 return c.retryNewSession(ctx, topics, handler, retries, true)
232 case ErrRebalanceInProgress: // retry after backoff
233 if retries <= 0 {
234 return nil, join.Err
235 }
236
237 return c.retryNewSession(ctx, topics, handler, retries, false)
238 default:
239 return nil, join.Err
240 }
241
242 // Prepare distribution plan if we joined as the leader
243 var plan BalanceStrategyPlan
244 if join.LeaderId == join.MemberId {
245 members, err := join.GetMembers()
246 if err != nil {
247 return nil, err
248 }
249
250 plan, err = c.balance(members)
251 if err != nil {
252 return nil, err
253 }
254 }
255
256 // Sync consumer group
257 sync, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
258 if err != nil {
259 _ = coordinator.Close()
260 return nil, err
261 }
262 switch sync.Err {
263 case ErrNoError:
264 case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
265 c.memberID = ""
266 return c.newSession(ctx, topics, handler, retries)
267 case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
268 if retries <= 0 {
269 return nil, sync.Err
270 }
271
272 return c.retryNewSession(ctx, topics, handler, retries, true)
273 case ErrRebalanceInProgress: // retry after backoff
274 if retries <= 0 {
275 return nil, sync.Err
276 }
277
278 return c.retryNewSession(ctx, topics, handler, retries, false)
279 default:
280 return nil, sync.Err
281 }
282
283 // Retrieve and sort claims
284 var claims map[string][]int32
285 if len(sync.MemberAssignment) > 0 {
286 members, err := sync.GetMemberAssignment()
287 if err != nil {
288 return nil, err
289 }
290 claims = members.Topics
291 c.userData = members.UserData
292
293 for _, partitions := range claims {
294 sort.Sort(int32Slice(partitions))
295 }
296 }
297
298 return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
299}
300
301func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (*JoinGroupResponse, error) {
302 req := &JoinGroupRequest{
303 GroupId: c.groupID,
304 MemberId: c.memberID,
305 SessionTimeout: int32(c.config.Consumer.Group.Session.Timeout / time.Millisecond),
306 ProtocolType: "consumer",
307 }
308 if c.config.Version.IsAtLeast(V0_10_1_0) {
309 req.Version = 1
310 req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond)
311 }
312
313 // use static user-data if configured, otherwise use consumer-group userdata from the last sync
314 userData := c.config.Consumer.Group.Member.UserData
315 if len(userData) == 0 {
316 userData = c.userData
317 }
318 meta := &ConsumerGroupMemberMetadata{
319 Topics: topics,
320 UserData: userData,
321 }
322 strategy := c.config.Consumer.Group.Rebalance.Strategy
323 if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
324 return nil, err
325 }
326
327 return coordinator.JoinGroup(req)
328}
329
330func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrategyPlan, generationID int32) (*SyncGroupResponse, error) {
331 req := &SyncGroupRequest{
332 GroupId: c.groupID,
333 MemberId: c.memberID,
334 GenerationId: generationID,
335 }
336 for memberID, topics := range plan {
337 assignment := &ConsumerGroupMemberAssignment{Topics: topics}
338
339 // Include topic assignments in group-assignment userdata for each consumer-group member
340 if c.config.Consumer.Group.Rebalance.Strategy.Name() == StickyBalanceStrategyName {
341 userDataBytes, err := encode(&StickyAssignorUserDataV1{
342 Topics: topics,
343 Generation: generationID,
344 }, nil)
345 if err != nil {
346 return nil, err
347 }
348 assignment.UserData = userDataBytes
349 }
350 if err := req.AddGroupAssignmentMember(memberID, assignment); err != nil {
351 return nil, err
352 }
353 }
354 return coordinator.SyncGroup(req)
355}
356
357func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, generationID int32) (*HeartbeatResponse, error) {
358 req := &HeartbeatRequest{
359 GroupId: c.groupID,
360 MemberId: memberID,
361 GenerationId: generationID,
362 }
363
364 return coordinator.Heartbeat(req)
365}
366
367func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
368 topics := make(map[string][]int32)
369 for _, meta := range members {
370 for _, topic := range meta.Topics {
371 topics[topic] = nil
372 }
373 }
374
375 for topic := range topics {
376 partitions, err := c.client.Partitions(topic)
377 if err != nil {
378 return nil, err
379 }
380 topics[topic] = partitions
381 }
382
383 strategy := c.config.Consumer.Group.Rebalance.Strategy
384 return strategy.Plan(members, topics)
385}
386
387// Leaves the cluster, called by Close, protected by lock.
388func (c *consumerGroup) leave() error {
389 if c.memberID == "" {
390 return nil
391 }
392
393 coordinator, err := c.client.Coordinator(c.groupID)
394 if err != nil {
395 return err
396 }
397
398 resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{
399 GroupId: c.groupID,
400 MemberId: c.memberID,
401 })
402 if err != nil {
403 _ = coordinator.Close()
404 return err
405 }
406
407 // Unset memberID
408 c.memberID = ""
409
410 // Check response
411 switch resp.Err {
412 case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
413 return nil
414 default:
415 return resp.Err
416 }
417}
418
419func (c *consumerGroup) handleError(err error, topic string, partition int32) {
420 if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 {
421 err = &ConsumerError{
422 Topic: topic,
423 Partition: partition,
424 Err: err,
425 }
426 }
427
428 if !c.config.Consumer.Return.Errors {
429 Logger.Println(err)
430 return
431 }
432
433 c.lock.Lock()
434 defer c.lock.Unlock()
435
436 select {
437 case <-c.closed:
438 //consumer is closed
439 return
440 default:
441 }
442
443 select {
444 case c.errors <- err:
445 default:
446 // no error listener
447 }
448}
449
450func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) {
451 pause := time.NewTicker(c.config.Consumer.Group.Heartbeat.Interval * 2)
452 defer session.cancel()
453 defer pause.Stop()
454 var oldTopicToPartitionNum map[string]int
455 var err error
456 if oldTopicToPartitionNum, err = c.topicToPartitionNumbers(topics); err != nil {
457 return
458 }
459 for {
460 if newTopicToPartitionNum, err := c.topicToPartitionNumbers(topics); err != nil {
461 return
462 } else {
463 for topic, num := range oldTopicToPartitionNum {
464 if newTopicToPartitionNum[topic] != num {
465 return // trigger the end of the session on exit
466 }
467 }
468 }
469 select {
470 case <-pause.C:
471 case <-c.closed:
472 return
473 }
474 }
475}
476
477func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) {
478 if err := c.client.RefreshMetadata(topics...); err != nil {
479 Logger.Printf("Consumer Group refresh metadata failed %v", err)
480 return nil, err
481 }
482 topicToPartitionNum := make(map[string]int, len(topics))
483 for _, topic := range topics {
484 if partitionNum, err := c.client.Partitions(topic); err != nil {
485 Logger.Printf("Consumer Group topic %s get partition number failed %v", topic, err)
486 return nil, err
487 } else {
488 topicToPartitionNum[topic] = len(partitionNum)
489 }
490 }
491 return topicToPartitionNum, nil
492}
493
494// --------------------------------------------------------------------
495
496// ConsumerGroupSession represents a consumer group member session.
497type ConsumerGroupSession interface {
498 // Claims returns information about the claimed partitions by topic.
499 Claims() map[string][]int32
500
501 // MemberID returns the cluster member ID.
502 MemberID() string
503
504 // GenerationID returns the current generation ID.
505 GenerationID() int32
506
507 // MarkOffset marks the provided offset, alongside a metadata string
508 // that represents the state of the partition consumer at that point in time. The
509 // metadata string can be used by another consumer to restore that state, so it
510 // can resume consumption.
511 //
512 // To follow upstream conventions, you are expected to mark the offset of the
513 // next message to read, not the last message read. Thus, when calling `MarkOffset`
514 // you should typically add one to the offset of the last consumed message.
515 //
516 // Note: calling MarkOffset does not necessarily commit the offset to the backend
517 // store immediately for efficiency reasons, and it may never be committed if
518 // your application crashes. This means that you may end up processing the same
519 // message twice, and your processing should ideally be idempotent.
520 MarkOffset(topic string, partition int32, offset int64, metadata string)
521
522 // ResetOffset resets to the provided offset, alongside a metadata string that
523 // represents the state of the partition consumer at that point in time. Reset
524 // acts as a counterpart to MarkOffset, the difference being that it allows to
525 // reset an offset to an earlier or smaller value, where MarkOffset only
526 // allows incrementing the offset. cf MarkOffset for more details.
527 ResetOffset(topic string, partition int32, offset int64, metadata string)
528
529 // MarkMessage marks a message as consumed.
530 MarkMessage(msg *ConsumerMessage, metadata string)
531
532 // Context returns the session context.
533 Context() context.Context
534}
535
536type consumerGroupSession struct {
537 parent *consumerGroup
538 memberID string
539 generationID int32
540 handler ConsumerGroupHandler
541
542 claims map[string][]int32
543 offsets *offsetManager
544 ctx context.Context
545 cancel func()
546
547 waitGroup sync.WaitGroup
548 releaseOnce sync.Once
549 hbDying, hbDead chan none
550}
551
552func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
553 // init offset manager
554 offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
555 if err != nil {
556 return nil, err
557 }
558
559 // init context
560 ctx, cancel := context.WithCancel(ctx)
561
562 // init session
563 sess := &consumerGroupSession{
564 parent: parent,
565 memberID: memberID,
566 generationID: generationID,
567 handler: handler,
568 offsets: offsets,
569 claims: claims,
570 ctx: ctx,
571 cancel: cancel,
572 hbDying: make(chan none),
573 hbDead: make(chan none),
574 }
575
576 // start heartbeat loop
577 go sess.heartbeatLoop()
578
579 // create a POM for each claim
580 for topic, partitions := range claims {
581 for _, partition := range partitions {
582 pom, err := offsets.ManagePartition(topic, partition)
583 if err != nil {
584 _ = sess.release(false)
585 return nil, err
586 }
587
588 // handle POM errors
589 go func(topic string, partition int32) {
590 for err := range pom.Errors() {
591 sess.parent.handleError(err, topic, partition)
592 }
593 }(topic, partition)
594 }
595 }
596
597 // perform setup
598 if err := handler.Setup(sess); err != nil {
599 _ = sess.release(true)
600 return nil, err
601 }
602
603 // start consuming
604 for topic, partitions := range claims {
605 for _, partition := range partitions {
606 sess.waitGroup.Add(1)
607
608 go func(topic string, partition int32) {
609 defer sess.waitGroup.Done()
610
611 // cancel the as session as soon as the first
612 // goroutine exits
613 defer sess.cancel()
614
615 // consume a single topic/partition, blocking
616 sess.consume(topic, partition)
617 }(topic, partition)
618 }
619 }
620 return sess, nil
621}
622
623func (s *consumerGroupSession) Claims() map[string][]int32 { return s.claims }
624func (s *consumerGroupSession) MemberID() string { return s.memberID }
625func (s *consumerGroupSession) GenerationID() int32 { return s.generationID }
626
627func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {
628 if pom := s.offsets.findPOM(topic, partition); pom != nil {
629 pom.MarkOffset(offset, metadata)
630 }
631}
632
633func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
634 if pom := s.offsets.findPOM(topic, partition); pom != nil {
635 pom.ResetOffset(offset, metadata)
636 }
637}
638
639func (s *consumerGroupSession) MarkMessage(msg *ConsumerMessage, metadata string) {
640 s.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, metadata)
641}
642
643func (s *consumerGroupSession) Context() context.Context {
644 return s.ctx
645}
646
647func (s *consumerGroupSession) consume(topic string, partition int32) {
648 // quick exit if rebalance is due
649 select {
650 case <-s.ctx.Done():
651 return
652 case <-s.parent.closed:
653 return
654 default:
655 }
656
657 // get next offset
658 offset := s.parent.config.Consumer.Offsets.Initial
659 if pom := s.offsets.findPOM(topic, partition); pom != nil {
660 offset, _ = pom.NextOffset()
661 }
662
663 // create new claim
664 claim, err := newConsumerGroupClaim(s, topic, partition, offset)
665 if err != nil {
666 s.parent.handleError(err, topic, partition)
667 return
668 }
669
670 // handle errors
671 go func() {
672 for err := range claim.Errors() {
673 s.parent.handleError(err, topic, partition)
674 }
675 }()
676
677 // trigger close when session is done
678 go func() {
679 select {
680 case <-s.ctx.Done():
681 case <-s.parent.closed:
682 }
683 claim.AsyncClose()
684 }()
685
686 // start processing
687 if err := s.handler.ConsumeClaim(s, claim); err != nil {
688 s.parent.handleError(err, topic, partition)
689 }
690
691 // ensure consumer is closed & drained
692 claim.AsyncClose()
693 for _, err := range claim.waitClosed() {
694 s.parent.handleError(err, topic, partition)
695 }
696}
697
698func (s *consumerGroupSession) release(withCleanup bool) (err error) {
699 // signal release, stop heartbeat
700 s.cancel()
701
702 // wait for consumers to exit
703 s.waitGroup.Wait()
704
705 // perform release
706 s.releaseOnce.Do(func() {
707 if withCleanup {
708 if e := s.handler.Cleanup(s); e != nil {
709 s.parent.handleError(e, "", -1)
710 err = e
711 }
712 }
713
714 if e := s.offsets.Close(); e != nil {
715 err = e
716 }
717
718 close(s.hbDying)
719 <-s.hbDead
720 })
721
722 return
723}
724
725func (s *consumerGroupSession) heartbeatLoop() {
726 defer close(s.hbDead)
727 defer s.cancel() // trigger the end of the session on exit
728
729 pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
730 defer pause.Stop()
731
732 retries := s.parent.config.Metadata.Retry.Max
733 for {
734 coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
735 if err != nil {
736 if retries <= 0 {
737 s.parent.handleError(err, "", -1)
738 return
739 }
740
741 select {
742 case <-s.hbDying:
743 return
744 case <-time.After(s.parent.config.Metadata.Retry.Backoff):
745 retries--
746 }
747 continue
748 }
749
750 resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID)
751 if err != nil {
752 _ = coordinator.Close()
753
754 if retries <= 0 {
755 s.parent.handleError(err, "", -1)
756 return
757 }
758
759 retries--
760 continue
761 }
762
763 switch resp.Err {
764 case ErrNoError:
765 retries = s.parent.config.Metadata.Retry.Max
766 case ErrRebalanceInProgress, ErrUnknownMemberId, ErrIllegalGeneration:
767 return
768 default:
769 s.parent.handleError(err, "", -1)
770 return
771 }
772
773 select {
774 case <-pause.C:
775 case <-s.hbDying:
776 return
777 }
778 }
779}
780
781// --------------------------------------------------------------------
782
783// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
784// It also provides hooks for your consumer group session life-cycle and allow you to
785// trigger logic before or after the consume loop(s).
786//
787// PLEASE NOTE that handlers are likely be called from several goroutines concurrently,
788// ensure that all state is safely protected against race conditions.
789type ConsumerGroupHandler interface {
790 // Setup is run at the beginning of a new session, before ConsumeClaim.
791 Setup(ConsumerGroupSession) error
792
793 // Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
794 // but before the offsets are committed for the very last time.
795 Cleanup(ConsumerGroupSession) error
796
797 // ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
798 // Once the Messages() channel is closed, the Handler must finish its processing
799 // loop and exit.
800 ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
801}
802
803// ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.
804type ConsumerGroupClaim interface {
805 // Topic returns the consumed topic name.
806 Topic() string
807
808 // Partition returns the consumed partition.
809 Partition() int32
810
811 // InitialOffset returns the initial offset that was used as a starting point for this claim.
812 InitialOffset() int64
813
814 // HighWaterMarkOffset returns the high water mark offset of the partition,
815 // i.e. the offset that will be used for the next message that will be produced.
816 // You can use this to determine how far behind the processing is.
817 HighWaterMarkOffset() int64
818
819 // Messages returns the read channel for the messages that are returned by
820 // the broker. The messages channel will be closed when a new rebalance cycle
821 // is due. You must finish processing and mark offsets within
822 // Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
823 // re-assigned to another group member.
824 Messages() <-chan *ConsumerMessage
825}
826
827type consumerGroupClaim struct {
828 topic string
829 partition int32
830 offset int64
831 PartitionConsumer
832}
833
834func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
835 pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
836 if err == ErrOffsetOutOfRange {
837 offset = sess.parent.config.Consumer.Offsets.Initial
838 pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset)
839 }
840 if err != nil {
841 return nil, err
842 }
843
844 go func() {
845 for err := range pcm.Errors() {
846 sess.parent.handleError(err, topic, partition)
847 }
848 }()
849
850 return &consumerGroupClaim{
851 topic: topic,
852 partition: partition,
853 offset: offset,
854 PartitionConsumer: pcm,
855 }, nil
856}
857
858func (c *consumerGroupClaim) Topic() string { return c.topic }
859func (c *consumerGroupClaim) Partition() int32 { return c.partition }
860func (c *consumerGroupClaim) InitialOffset() int64 { return c.offset }
861
862// Drains messages and errors, ensures the claim is fully closed.
863func (c *consumerGroupClaim) waitClosed() (errs ConsumerErrors) {
864 go func() {
865 for range c.Messages() {
866 }
867 }()
868
869 for err := range c.Errors() {
870 errs = append(errs, err)
871 }
872 return
873}