package sarama

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"sync"
	"time"
)

// ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.
var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")

// ConsumerGroup is responsible for dividing up processing of topics and partitions
// over a collection of processes (the members of the consumer group).
type ConsumerGroup interface {
	// Consume joins a cluster of consumers for a given list of topics and
	// starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
	//
	// The life-cycle of a session is represented by the following steps:
	//
	// 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
	//    and are assigned their "fair share" of partitions, aka 'claims'.
	// 2. Before processing starts, the handler's Setup() hook is called to notify the user
	//    of the claims and allow any necessary preparation or alteration of state.
	// 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
	//    in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected
	//    from concurrent reads/writes.
	// 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
	//    parent context is cancelled or when a server-side rebalance cycle is initiated.
	// 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
	//    to allow the user to perform any final tasks before a rebalance.
	// 6. Finally, marked offsets are committed one last time before claims are released.
	//
	// Please note that once a rebalance is triggered, sessions must be completed within
	// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
	// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
	// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
	// commit failures.
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

	// Errors returns a read channel of errors that occurred during the consumer life-cycle.
	// By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan error

	// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
	// this function before the object passes out of scope, as it will otherwise leak memory.
	Close() error
}
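
// A minimal usage sketch of the interface above. The broker address, group and
// topic names, and the exampleHandler type are hypothetical (exampleHandler is
// assumed to implement ConsumerGroupHandler, as sketched further below):
//
//	config := NewConfig()
//	config.Version = V2_0_0_0
//	config.Consumer.Return.Errors = true
//
//	group, err := NewConsumerGroup([]string{"localhost:9092"}, "my-group", config)
//	if err != nil {
//		panic(err)
//	}
//	defer func() { _ = group.Close() }()
//
//	// Read life-cycle errors (enabled via Consumer.Return.Errors above).
//	go func() {
//		for err := range group.Errors() {
//			fmt.Println("ERROR", err)
//		}
//	}()
//
//	// Consume exits on every server-side rebalance, so call it in a loop
//	// to re-join the group and obtain a fresh session.
//	ctx := context.Background()
//	for {
//		if err := group.Consume(ctx, []string{"my-topic"}, exampleHandler{}); err != nil {
//			panic(err)
//		}
//	}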

type consumerGroup struct {
	client Client

	config   *Config
	consumer Consumer
	groupID  string
	memberID string
	errors   chan error

	lock      sync.Mutex
	closed    chan none
	closeOnce sync.Once
}

// NewConsumerGroup creates a new consumer group for the given broker addresses and configuration.
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}

	c, err := newConsumerGroup(groupID, client)
	if err != nil {
		_ = client.Close()
	}
	return c, err
}

// NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
// PLEASE NOTE: consumer groups can only re-use but not share clients.
func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) {
	// For clients passed in by the caller, ensure we don't
	// call Close() on the underlying client.
	cli := &nopCloserClient{client}
	return newConsumerGroup(groupID, cli)
}
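
// A sketch of client re-use (broker address and names are hypothetical). The
// caller remains responsible for closing the client; note the doc comment
// above: a client may be re-used, but not shared by concurrently running
// consumer groups:
//
//	client, err := NewClient([]string{"localhost:9092"}, NewConfig())
//	if err != nil {
//		panic(err)
//	}
//	defer func() { _ = client.Close() }()
//
//	group, err := NewConsumerGroupFromClient("my-group", client)
//	if err != nil {
//		panic(err)
//	}
//	defer func() { _ = group.Close() }()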

func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
	config := client.Config()
	if !config.Version.IsAtLeast(V0_10_2_0) {
		return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
	}

	consumer, err := NewConsumerFromClient(client)
	if err != nil {
		return nil, err
	}

	return &consumerGroup{
		client:   client,
		consumer: consumer,
		config:   config,
		groupID:  groupID,
		errors:   make(chan error, config.ChannelBufferSize),
		closed:   make(chan none),
	}, nil
}

// Errors implements ConsumerGroup.
func (c *consumerGroup) Errors() <-chan error { return c.errors }

// Close implements ConsumerGroup.
func (c *consumerGroup) Close() (err error) {
	c.closeOnce.Do(func() {
		close(c.closed)

		c.lock.Lock()
		defer c.lock.Unlock()

		// leave group
		if e := c.leave(); e != nil {
			err = e
		}

		// drain errors
		go func() {
			close(c.errors)
		}()
		for e := range c.errors {
			err = e
		}

		if e := c.client.Close(); e != nil {
			err = e
		}
	})
	return
}

// Consume implements ConsumerGroup.
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
	// Ensure group is not closed
	select {
	case <-c.closed:
		return ErrClosedConsumerGroup
	default:
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	// Quick exit when no topics are provided
	if len(topics) == 0 {
		return fmt.Errorf("no topics provided")
	}

	// Refresh metadata for requested topics
	if err := c.client.RefreshMetadata(topics...); err != nil {
		return err
	}

	// Init session
	sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
	if err == ErrClosedClient {
		return ErrClosedConsumerGroup
	} else if err != nil {
		return err
	}

	// Wait for session exit signal
	<-sess.ctx.Done()

	// Gracefully release session claims
	return sess.release(true)
}

func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
	select {
	case <-c.closed:
		return nil, ErrClosedConsumerGroup
	case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
	}

	if refreshCoordinator {
		err := c.client.RefreshCoordinator(c.groupID)
		if err != nil {
			return c.retryNewSession(ctx, topics, handler, retries, true)
		}
	}

	return c.newSession(ctx, topics, handler, retries-1)
}

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		if retries <= 0 {
			return nil, err
		}

		return c.retryNewSession(ctx, topics, handler, retries, true)
	}

	// Join consumer group
	join, err := c.joinGroupRequest(coordinator, topics)
	if err != nil {
		_ = coordinator.Close()
		return nil, err
	}
	switch join.Err {
	case ErrNoError:
		c.memberID = join.MemberId
	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
		if retries <= 0 {
			return nil, join.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrRebalanceInProgress: // retry after backoff
		if retries <= 0 {
			return nil, join.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, false)
	default:
		return nil, join.Err
	}

	// Prepare distribution plan if we joined as the leader
	var plan BalanceStrategyPlan
	if join.LeaderId == join.MemberId {
		members, err := join.GetMembers()
		if err != nil {
			return nil, err
		}

		plan, err = c.balance(members)
		if err != nil {
			return nil, err
		}
	}

	// Sync consumer group
	sync, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
	if err != nil {
		_ = coordinator.Close()
		return nil, err
	}
	switch sync.Err {
	case ErrNoError:
	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
		if retries <= 0 {
			return nil, sync.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrRebalanceInProgress: // retry after backoff
		if retries <= 0 {
			return nil, sync.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, false)
	default:
		return nil, sync.Err
	}

	// Retrieve and sort claims
	var claims map[string][]int32
	if len(sync.MemberAssignment) > 0 {
		members, err := sync.GetMemberAssignment()
		if err != nil {
			return nil, err
		}
		claims = members.Topics

		for _, partitions := range claims {
			sort.Sort(int32Slice(partitions))
		}
	}

	return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}

func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (*JoinGroupResponse, error) {
	req := &JoinGroupRequest{
		GroupId:        c.groupID,
		MemberId:       c.memberID,
		SessionTimeout: int32(c.config.Consumer.Group.Session.Timeout / time.Millisecond),
		ProtocolType:   "consumer",
	}
	if c.config.Version.IsAtLeast(V0_10_1_0) {
		req.Version = 1
		req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond)
	}

	meta := &ConsumerGroupMemberMetadata{
		Topics:   topics,
		UserData: c.config.Consumer.Group.Member.UserData,
	}
	strategy := c.config.Consumer.Group.Rebalance.Strategy
	if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
		return nil, err
	}

	return coordinator.JoinGroup(req)
}

func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrategyPlan, generationID int32) (*SyncGroupResponse, error) {
	req := &SyncGroupRequest{
		GroupId:      c.groupID,
		MemberId:     c.memberID,
		GenerationId: generationID,
	}
	for memberID, topics := range plan {
		err := req.AddGroupAssignmentMember(memberID, &ConsumerGroupMemberAssignment{
			Topics: topics,
		})
		if err != nil {
			return nil, err
		}
	}
	return coordinator.SyncGroup(req)
}

func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, generationID int32) (*HeartbeatResponse, error) {
	req := &HeartbeatRequest{
		GroupId:      c.groupID,
		MemberId:     memberID,
		GenerationId: generationID,
	}

	return coordinator.Heartbeat(req)
}

func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
	topics := make(map[string][]int32)
	for _, meta := range members {
		for _, topic := range meta.Topics {
			topics[topic] = nil
		}
	}

	for topic := range topics {
		partitions, err := c.client.Partitions(topic)
		if err != nil {
			return nil, err
		}
		topics[topic] = partitions
	}

	strategy := c.config.Consumer.Group.Rebalance.Strategy
	return strategy.Plan(members, topics)
}

// Leaves the cluster, called by Close, protected by lock.
func (c *consumerGroup) leave() error {
	if c.memberID == "" {
		return nil
	}

	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		return err
	}

	resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{
		GroupId:  c.groupID,
		MemberId: c.memberID,
	})
	if err != nil {
		_ = coordinator.Close()
		return err
	}

	// Unset memberID
	c.memberID = ""

	// Check response
	switch resp.Err {
	case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
		return nil
	default:
		return resp.Err
	}
}

func (c *consumerGroup) handleError(err error, topic string, partition int32) {
	select {
	case <-c.closed:
		return
	default:
	}

	if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 {
		err = &ConsumerError{
			Topic:     topic,
			Partition: partition,
			Err:       err,
		}
	}

	if c.config.Consumer.Return.Errors {
		select {
		case c.errors <- err:
		default:
		}
	} else {
		Logger.Println(err)
	}
}

// --------------------------------------------------------------------

// ConsumerGroupSession represents a consumer group member session.
type ConsumerGroupSession interface {
	// Claims returns information about the claimed partitions by topic.
	Claims() map[string][]int32

	// MemberID returns the cluster member ID.
	MemberID() string

	// GenerationID returns the current generation ID.
	GenerationID() int32

	// MarkOffset marks the provided offset, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// To follow upstream conventions, you are expected to mark the offset of the
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
	// you should typically add one to the offset of the last consumed message.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
	MarkOffset(topic string, partition int32, offset int64, metadata string)

	// ResetOffset resets to the provided offset, alongside a metadata string that
	// represents the state of the partition consumer at that point in time. Reset
	// acts as a counterpart to MarkOffset, the difference being that it allows
	// resetting an offset to an earlier or smaller value, where MarkOffset only
	// allows incrementing the offset. See MarkOffset for more details.
	ResetOffset(topic string, partition int32, offset int64, metadata string)

	// MarkMessage marks a message as consumed.
	MarkMessage(msg *ConsumerMessage, metadata string)

	// Context returns the session context.
	Context() context.Context
}
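
// A sketch of the offset-marking convention described above, as it would
// appear inside a handler's ConsumeClaim (sess and msg are assumed to be the
// ConsumerGroupSession and a *ConsumerMessage received from the claim):
//
//	// Equivalent ways to mark msg as consumed: mark the offset of the
//	// *next* message to read, or let MarkMessage add the +1 for you.
//	sess.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, "")
//	sess.MarkMessage(msg, "")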

type consumerGroupSession struct {
	parent       *consumerGroup
	memberID     string
	generationID int32
	handler      ConsumerGroupHandler

	claims  map[string][]int32
	offsets *offsetManager
	ctx     context.Context
	cancel  func()

	waitGroup       sync.WaitGroup
	releaseOnce     sync.Once
	hbDying, hbDead chan none
}

func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
	// init offset manager
	offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
	if err != nil {
		return nil, err
	}

	// init context
	ctx, cancel := context.WithCancel(ctx)

	// init session
	sess := &consumerGroupSession{
		parent:       parent,
		memberID:     memberID,
		generationID: generationID,
		handler:      handler,
		offsets:      offsets,
		claims:       claims,
		ctx:          ctx,
		cancel:       cancel,
		hbDying:      make(chan none),
		hbDead:       make(chan none),
	}

	// start heartbeat loop
	go sess.heartbeatLoop()

	// create a POM for each claim
	for topic, partitions := range claims {
		for _, partition := range partitions {
			pom, err := offsets.ManagePartition(topic, partition)
			if err != nil {
				_ = sess.release(false)
				return nil, err
			}

			// handle POM errors
			go func(topic string, partition int32) {
				for err := range pom.Errors() {
					sess.parent.handleError(err, topic, partition)
				}
			}(topic, partition)
		}
	}

	// perform setup
	if err := handler.Setup(sess); err != nil {
		_ = sess.release(true)
		return nil, err
	}

	// start consuming
	for topic, partitions := range claims {
		for _, partition := range partitions {
			sess.waitGroup.Add(1)

			go func(topic string, partition int32) {
				defer sess.waitGroup.Done()

				// cancel the session as soon as the first
				// goroutine exits
				defer sess.cancel()

				// consume a single topic/partition, blocking
				sess.consume(topic, partition)
			}(topic, partition)
		}
	}
	return sess, nil
}

func (s *consumerGroupSession) Claims() map[string][]int32 { return s.claims }
func (s *consumerGroupSession) MemberID() string           { return s.memberID }
func (s *consumerGroupSession) GenerationID() int32        { return s.generationID }

func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		pom.MarkOffset(offset, metadata)
	}
}

func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		pom.ResetOffset(offset, metadata)
	}
}

func (s *consumerGroupSession) MarkMessage(msg *ConsumerMessage, metadata string) {
	s.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, metadata)
}

func (s *consumerGroupSession) Context() context.Context {
	return s.ctx
}

func (s *consumerGroupSession) consume(topic string, partition int32) {
	// quick exit if rebalance is due
	select {
	case <-s.ctx.Done():
		return
	case <-s.parent.closed:
		return
	default:
	}

	// get next offset
	offset := s.parent.config.Consumer.Offsets.Initial
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		offset, _ = pom.NextOffset()
	}

	// create new claim
	claim, err := newConsumerGroupClaim(s, topic, partition, offset)
	if err != nil {
		s.parent.handleError(err, topic, partition)
		return
	}

	// handle errors
	go func() {
		for err := range claim.Errors() {
			s.parent.handleError(err, topic, partition)
		}
	}()

	// trigger close when session is done
	go func() {
		select {
		case <-s.ctx.Done():
		case <-s.parent.closed:
		}
		claim.AsyncClose()
	}()

	// start processing
	if err := s.handler.ConsumeClaim(s, claim); err != nil {
		s.parent.handleError(err, topic, partition)
	}

	// ensure consumer is closed & drained
	claim.AsyncClose()
	for _, err := range claim.waitClosed() {
		s.parent.handleError(err, topic, partition)
	}
}

func (s *consumerGroupSession) release(withCleanup bool) (err error) {
	// signal release, stop heartbeat
	s.cancel()

	// wait for consumers to exit
	s.waitGroup.Wait()

	// perform release
	s.releaseOnce.Do(func() {
		if withCleanup {
			if e := s.handler.Cleanup(s); e != nil {
				s.parent.handleError(e, "", -1)
				err = e
			}
		}

		if e := s.offsets.Close(); e != nil {
			err = e
		}

		close(s.hbDying)
		<-s.hbDead
	})

	return
}

func (s *consumerGroupSession) heartbeatLoop() {
	defer close(s.hbDead)
	defer s.cancel() // trigger the end of the session on exit

	pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
	defer pause.Stop()

	retries := s.parent.config.Metadata.Retry.Max
	for {
		coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
		if err != nil {
			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}

			select {
			case <-s.hbDying:
				return
			case <-time.After(s.parent.config.Metadata.Retry.Backoff):
				retries--
			}
			continue
		}

		resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID)
		if err != nil {
			_ = coordinator.Close()

			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}

			retries--
			continue
		}

		switch resp.Err {
		case ErrNoError:
			retries = s.parent.config.Metadata.Retry.Max
		case ErrRebalanceInProgress, ErrUnknownMemberId, ErrIllegalGeneration:
			return
		default:
			// report the broker-side error; err is nil here since the
			// heartbeat request itself succeeded
			s.parent.handleError(resp.Err, "", -1)
			return
		}

		select {
		case <-pause.C:
		case <-s.hbDying:
			return
		}
	}
}

// --------------------------------------------------------------------

// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
// It also provides hooks for your consumer group session life-cycle and allows you to
// trigger logic before or after the consume loop(s).
//
// PLEASE NOTE that handlers are likely to be called from several goroutines concurrently,
// so ensure that all state is safely protected against race conditions.
type ConsumerGroupHandler interface {
	// Setup is run at the beginning of a new session, before ConsumeClaim.
	Setup(ConsumerGroupSession) error

	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
	// but before the offsets are committed for the very last time.
	Cleanup(ConsumerGroupSession) error

	// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
	// Once the Messages() channel is closed, the Handler must finish its processing
	// loop and exit.
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
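
// A minimal handler sketch satisfying ConsumerGroupHandler (the exampleHandler
// name is hypothetical; it is the handler type assumed in the ConsumerGroup
// usage sketch above):
//
//	type exampleHandler struct{}
//
//	func (exampleHandler) Setup(ConsumerGroupSession) error   { return nil }
//	func (exampleHandler) Cleanup(ConsumerGroupSession) error { return nil }
//
//	func (h exampleHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
//		// Messages() is closed when a rebalance is due; returning from
//		// ConsumeClaim at that point lets the session shut down in time.
//		for msg := range claim.Messages() {
//			fmt.Printf("topic=%s partition=%d offset=%d\n", msg.Topic, msg.Partition, msg.Offset)
//			sess.MarkMessage(msg, "")
//		}
//		return nil
//	}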

// ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.
type ConsumerGroupClaim interface {
	// Topic returns the consumed topic name.
	Topic() string

	// Partition returns the consumed partition.
	Partition() int32

	// InitialOffset returns the initial offset that was used as a starting point for this claim.
	InitialOffset() int64

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64

	// Messages returns the read channel for the messages that are returned by
	// the broker. The messages channel will be closed when a new rebalance cycle
	// is due. You must finish processing and mark offsets within
	// Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
	// re-assigned to another group member.
	Messages() <-chan *ConsumerMessage
}
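
// A sketch of using HighWaterMarkOffset to estimate consumer lag from inside
// ConsumeClaim (msg is assumed to be the message just received from the claim):
//
//	// The high water mark is the offset of the *next* message to be
//	// produced, so the number of messages still to be read is:
//	lag := claim.HighWaterMarkOffset() - msg.Offset - 1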

type consumerGroupClaim struct {
	topic     string
	partition int32
	offset    int64
	PartitionConsumer
}

func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
	pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
	if err == ErrOffsetOutOfRange {
		offset = sess.parent.config.Consumer.Offsets.Initial
		pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset)
	}
	if err != nil {
		return nil, err
	}

	go func() {
		for err := range pcm.Errors() {
			sess.parent.handleError(err, topic, partition)
		}
	}()

	return &consumerGroupClaim{
		topic:             topic,
		partition:         partition,
		offset:            offset,
		PartitionConsumer: pcm,
	}, nil
}

func (c *consumerGroupClaim) Topic() string        { return c.topic }
func (c *consumerGroupClaim) Partition() int32     { return c.partition }
func (c *consumerGroupClaim) InitialOffset() int64 { return c.offset }

// Drains messages and errors, ensures the claim is fully closed.
func (c *consumerGroupClaim) waitClosed() (errs ConsumerErrors) {
	go func() {
		for range c.Messages() {
		}
	}()

	for err := range c.Errors() {
		errs = append(errs, err)
	}
	return
}