package sarama

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"sync"
	"time"
)

// ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.
var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")

// ConsumerGroup is responsible for dividing up processing of topics and partitions
// over a collection of processes (the members of the consumer group).
type ConsumerGroup interface {
	// Consume joins a cluster of consumers for a given list of topics and
	// starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
	//
	// The life-cycle of a session is represented by the following steps:
	//
	// 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
	//    and are assigned their "fair share" of partitions, aka 'claims'.
	// 2. Before processing starts, the handler's Setup() hook is called to notify the user
	//    of the claims and allow any necessary preparation or alteration of state.
	// 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
	//    in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected
	//    from concurrent reads/writes.
	// 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
	//    parent context is cancelled or when a server-side rebalance cycle is initiated.
	// 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
	//    to allow the user to perform any final tasks before a rebalance.
	// 6. Finally, marked offsets are committed one last time before claims are released.
	//
	// Please note that once a rebalance is triggered, sessions must be completed within
	// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
	// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
	// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
	// commit failures.
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

	// Errors returns a read channel of errors that occurred during the consumer life-cycle.
	// By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan error

	// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
	// this function before the object passes out of scope, as it will otherwise leak memory.
	Close() error
}
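
// A minimal usage sketch of the interface above. The broker address, the topic,
// and the exampleHandler type (any ConsumerGroupHandler implementation, such as
// the sketch below the ConsumerGroupHandler interface) are placeholders:
//
//	config := NewConfig()
//	config.Version = V1_0_0_0 // consumer groups require at least V0_10_2_0
//	group, err := NewConsumerGroup([]string{"localhost:9092"}, "example-group", config)
//	if err != nil {
//		panic(err)
//	}
//	defer func() { _ = group.Close() }()
//
//	ctx := context.Background()
//	for {
//		// Consume blocks for the duration of a session; calling it in a loop
//		// lets the member re-join the group after each server-side rebalance.
//		if err := group.Consume(ctx, []string{"example-topic"}, &exampleHandler{}); err != nil {
//			break
//		}
//	}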

type consumerGroup struct {
	client    Client
	ownClient bool

	config   *Config
	consumer Consumer
	groupID  string
	memberID string
	errors   chan error

	lock      sync.Mutex
	closed    chan none
	closeOnce sync.Once
}

// NewConsumerGroup creates a new consumer group with the given broker addresses and configuration.
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}

	c, err := NewConsumerGroupFromClient(groupID, client)
	if err != nil {
		_ = client.Close()
		return nil, err
	}

	c.(*consumerGroup).ownClient = true
	return c, nil
}

// NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
// PLEASE NOTE: a client can be re-used by a consumer group, but it cannot be shared between
// multiple consumer groups at the same time.
func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) {
	config := client.Config()
	if !config.Version.IsAtLeast(V0_10_2_0) {
		return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
	}

	consumer, err := NewConsumerFromClient(client)
	if err != nil {
		return nil, err
	}

	return &consumerGroup{
		client:   client,
		consumer: consumer,
		config:   config,
		groupID:  groupID,
		errors:   make(chan error, config.ChannelBufferSize),
		closed:   make(chan none),
	}, nil
}
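
// A sketch of re-using an existing client; the broker address is a placeholder:
//
//	config := NewConfig()
//	config.Version = V0_10_2_0 // minimum version for consumer groups
//	client, err := NewClient([]string{"localhost:9092"}, config)
//	if err != nil {
//		panic(err)
//	}
//	defer func() { _ = client.Close() }() // the caller keeps ownership of the client
//
//	group, err := NewConsumerGroupFromClient("example-group", client)
//	if err != nil {
//		panic(err)
//	}
//	defer func() { _ = group.Close() }()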

// Errors implements ConsumerGroup.
func (c *consumerGroup) Errors() <-chan error { return c.errors }

// Close implements ConsumerGroup.
func (c *consumerGroup) Close() (err error) {
	c.closeOnce.Do(func() {
		close(c.closed)

		c.lock.Lock()
		defer c.lock.Unlock()

		// leave group
		if e := c.leave(); e != nil {
			err = e
		}

		// drain errors
		go func() {
			close(c.errors)
		}()
		for e := range c.errors {
			err = e
		}

		if c.ownClient {
			if e := c.client.Close(); e != nil {
				err = e
			}
		}
	})
	return
}

// Consume implements ConsumerGroup.
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
	// Ensure group is not closed
	select {
	case <-c.closed:
		return ErrClosedConsumerGroup
	default:
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	// Quick exit when no topics are provided
	if len(topics) == 0 {
		return fmt.Errorf("no topics provided")
	}

	// Refresh metadata for requested topics
	if err := c.client.RefreshMetadata(topics...); err != nil {
		return err
	}

	// Get coordinator
	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		return err
	}

	// Init session
	sess, err := c.newSession(ctx, coordinator, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
	if err == ErrClosedClient {
		return ErrClosedConsumerGroup
	} else if err != nil {
		return err
	}

	// Wait for session exit signal
	<-sess.ctx.Done()

	// Gracefully release session claims
	return sess.release(true)
}

func (c *consumerGroup) newSession(ctx context.Context, coordinator *Broker, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
	// Join consumer group
	join, err := c.joinGroupRequest(coordinator, topics)
	if err != nil {
		_ = coordinator.Close()
		return nil, err
	}
	switch join.Err {
	case ErrNoError:
		c.memberID = join.MemberId
	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, coordinator, topics, handler, retries)
	case ErrRebalanceInProgress: // retry after backoff
		if retries <= 0 {
			return nil, join.Err
		}

		select {
		case <-c.closed:
			return nil, ErrClosedConsumerGroup
		case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
		}

		return c.newSession(ctx, coordinator, topics, handler, retries-1)
	default:
		return nil, join.Err
	}

	// Prepare distribution plan if we joined as the leader
	var plan BalanceStrategyPlan
	if join.LeaderId == join.MemberId {
		members, err := join.GetMembers()
		if err != nil {
			return nil, err
		}

		plan, err = c.balance(members)
		if err != nil {
			return nil, err
		}
	}

	// Sync consumer group
	sync, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
	if err != nil {
		_ = coordinator.Close()
		return nil, err
	}
	switch sync.Err {
	case ErrNoError:
	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, coordinator, topics, handler, retries)
	case ErrRebalanceInProgress: // retry after backoff
		if retries <= 0 {
			return nil, sync.Err
		}

		select {
		case <-c.closed:
			return nil, ErrClosedConsumerGroup
		case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
		}

		return c.newSession(ctx, coordinator, topics, handler, retries-1)
	default:
		return nil, sync.Err
	}

	// Retrieve and sort claims
	var claims map[string][]int32
	if len(sync.MemberAssignment) > 0 {
		members, err := sync.GetMemberAssignment()
		if err != nil {
			return nil, err
		}
		claims = members.Topics

		for _, partitions := range claims {
			sort.Sort(int32Slice(partitions))
		}
	}

	return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}

func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (*JoinGroupResponse, error) {
	req := &JoinGroupRequest{
		GroupId:        c.groupID,
		MemberId:       c.memberID,
		SessionTimeout: int32(c.config.Consumer.Group.Session.Timeout / time.Millisecond),
		ProtocolType:   "consumer",
	}
	if c.config.Version.IsAtLeast(V0_10_1_0) {
		req.Version = 1
		req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond)
	}

	meta := &ConsumerGroupMemberMetadata{
		Topics:   topics,
		UserData: c.config.Consumer.Group.Member.UserData,
	}
	strategy := c.config.Consumer.Group.Rebalance.Strategy
	if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
		return nil, err
	}

	return coordinator.JoinGroup(req)
}

func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrategyPlan, generationID int32) (*SyncGroupResponse, error) {
	req := &SyncGroupRequest{
		GroupId:      c.groupID,
		MemberId:     c.memberID,
		GenerationId: generationID,
	}
	for memberID, topics := range plan {
		err := req.AddGroupAssignmentMember(memberID, &ConsumerGroupMemberAssignment{
			Topics: topics,
		})
		if err != nil {
			return nil, err
		}
	}
	return coordinator.SyncGroup(req)
}

func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, generationID int32) (*HeartbeatResponse, error) {
	req := &HeartbeatRequest{
		GroupId:      c.groupID,
		MemberId:     memberID,
		GenerationId: generationID,
	}

	return coordinator.Heartbeat(req)
}

func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
	topics := make(map[string][]int32)
	for _, meta := range members {
		for _, topic := range meta.Topics {
			topics[topic] = nil
		}
	}

	for topic := range topics {
		partitions, err := c.client.Partitions(topic)
		if err != nil {
			return nil, err
		}
		topics[topic] = partitions
	}

	strategy := c.config.Consumer.Group.Rebalance.Strategy
	return strategy.Plan(members, topics)
}

// Leaves the cluster, called by Close, protected by lock.
func (c *consumerGroup) leave() error {
	if c.memberID == "" {
		return nil
	}

	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		return err
	}

	resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{
		GroupId:  c.groupID,
		MemberId: c.memberID,
	})
	if err != nil {
		_ = coordinator.Close()
		return err
	}

	// Unset memberID
	c.memberID = ""

	// Check response
	switch resp.Err {
	case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
		return nil
	default:
		return resp.Err
	}
}

func (c *consumerGroup) handleError(err error, topic string, partition int32) {
	select {
	case <-c.closed:
		return
	default:
	}

	if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 {
		err = &ConsumerError{
			Topic:     topic,
			Partition: partition,
			Err:       err,
		}
	}

	if c.config.Consumer.Return.Errors {
		select {
		case c.errors <- err:
		default:
		}
	} else {
		Logger.Println(err)
	}
}

// --------------------------------------------------------------------

// ConsumerGroupSession represents a consumer group member session.
type ConsumerGroupSession interface {
	// Claims returns information about the claimed partitions by topic.
	Claims() map[string][]int32

	// MemberID returns the cluster member ID.
	MemberID() string

	// GenerationID returns the current generation ID.
	GenerationID() int32

	// MarkOffset marks the provided offset, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// To follow upstream conventions, you are expected to mark the offset of the
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
	// you should typically add one to the offset of the last consumed message.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
	MarkOffset(topic string, partition int32, offset int64, metadata string)

	// ResetOffset resets to the provided offset, alongside a metadata string that
	// represents the state of the partition consumer at that point in time. Reset
	// acts as a counterpart to MarkOffset, the difference being that it allows
	// resetting an offset to an earlier or smaller value, where MarkOffset only
	// allows incrementing the offset. See MarkOffset for more details.
	ResetOffset(topic string, partition int32, offset int64, metadata string)

	// MarkMessage marks a message as consumed.
	MarkMessage(msg *ConsumerMessage, metadata string)

	// Context returns the session context.
	Context() context.Context
}
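
// A sketch of committing progress from within a handler's ConsumeClaim; sess,
// claim, and process() are placeholders for the session, the claim, and the
// application logic:
//
//	for msg := range claim.Messages() {
//		process(msg)
//
//		// Mark the offset of the *next* message to read; MarkMessage adds
//		// the +1 for you:
//		sess.MarkMessage(msg, "")
//		// ...which is equivalent to:
//		// sess.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, "")
//	}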

type consumerGroupSession struct {
	parent       *consumerGroup
	memberID     string
	generationID int32
	handler      ConsumerGroupHandler

	claims  map[string][]int32
	offsets *offsetManager
	ctx     context.Context
	cancel  func()

	waitGroup       sync.WaitGroup
	releaseOnce     sync.Once
	hbDying, hbDead chan none
}

func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
	// init offset manager
	offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
	if err != nil {
		return nil, err
	}

	// init context
	ctx, cancel := context.WithCancel(ctx)

	// init session
	sess := &consumerGroupSession{
		parent:       parent,
		memberID:     memberID,
		generationID: generationID,
		handler:      handler,
		offsets:      offsets,
		claims:       claims,
		ctx:          ctx,
		cancel:       cancel,
		hbDying:      make(chan none),
		hbDead:       make(chan none),
	}

	// start heartbeat loop
	go sess.heartbeatLoop()

	// create a POM for each claim
	for topic, partitions := range claims {
		for _, partition := range partitions {
			pom, err := offsets.ManagePartition(topic, partition)
			if err != nil {
				_ = sess.release(false)
				return nil, err
			}

			// handle POM errors
			go func(topic string, partition int32) {
				for err := range pom.Errors() {
					sess.parent.handleError(err, topic, partition)
				}
			}(topic, partition)
		}
	}

	// perform setup
	if err := handler.Setup(sess); err != nil {
		_ = sess.release(true)
		return nil, err
	}

	// start consuming
	for topic, partitions := range claims {
		for _, partition := range partitions {
			sess.waitGroup.Add(1)

			go func(topic string, partition int32) {
				defer sess.waitGroup.Done()

				// cancel the session as soon as the first
				// goroutine exits
				defer sess.cancel()

				// consume a single topic/partition, blocking
				sess.consume(topic, partition)
			}(topic, partition)
		}
	}
	return sess, nil
}

func (s *consumerGroupSession) Claims() map[string][]int32 { return s.claims }
func (s *consumerGroupSession) MemberID() string           { return s.memberID }
func (s *consumerGroupSession) GenerationID() int32        { return s.generationID }

func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		pom.MarkOffset(offset, metadata)
	}
}

func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		pom.ResetOffset(offset, metadata)
	}
}

func (s *consumerGroupSession) MarkMessage(msg *ConsumerMessage, metadata string) {
	s.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, metadata)
}

func (s *consumerGroupSession) Context() context.Context {
	return s.ctx
}

func (s *consumerGroupSession) consume(topic string, partition int32) {
	// quick exit if rebalance is due
	select {
	case <-s.ctx.Done():
		return
	case <-s.parent.closed:
		return
	default:
	}

	// get next offset
	offset := s.parent.config.Consumer.Offsets.Initial
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		offset, _ = pom.NextOffset()
	}

	// create new claim
	claim, err := newConsumerGroupClaim(s, topic, partition, offset)
	if err != nil {
		s.parent.handleError(err, topic, partition)
		return
	}

	// handle errors
	go func() {
		for err := range claim.Errors() {
			s.parent.handleError(err, topic, partition)
		}
	}()

	// trigger close when session is done
	go func() {
		select {
		case <-s.ctx.Done():
		case <-s.parent.closed:
		}
		claim.AsyncClose()
	}()

	// start processing
	if err := s.handler.ConsumeClaim(s, claim); err != nil {
		s.parent.handleError(err, topic, partition)
	}

	// ensure consumer is closed & drained
	claim.AsyncClose()
	for _, err := range claim.waitClosed() {
		s.parent.handleError(err, topic, partition)
	}
}

func (s *consumerGroupSession) release(withCleanup bool) (err error) {
	// signal release, stop heartbeat
	s.cancel()

	// wait for consumers to exit
	s.waitGroup.Wait()

	// perform release
	s.releaseOnce.Do(func() {
		if withCleanup {
			if e := s.handler.Cleanup(s); e != nil {
				s.parent.handleError(e, "", -1)
				err = e
			}
		}

		if e := s.offsets.Close(); e != nil {
			err = e
		}

		close(s.hbDying)
		<-s.hbDead
	})

	return
}

func (s *consumerGroupSession) heartbeatLoop() {
	defer close(s.hbDead)
	defer s.cancel() // trigger the end of the session on exit

	pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
	defer pause.Stop()

	retries := s.parent.config.Metadata.Retry.Max
	for {
		coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
		if err != nil {
			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}

			select {
			case <-s.hbDying:
				return
			case <-time.After(s.parent.config.Metadata.Retry.Backoff):
				retries--
			}
			continue
		}

		resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID)
		if err != nil {
			_ = coordinator.Close()

			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}

			retries--
			continue
		}

		switch resp.Err {
		case ErrNoError:
			retries = s.parent.config.Metadata.Retry.Max
		case ErrRebalanceInProgress, ErrUnknownMemberId, ErrIllegalGeneration:
			return
		default:
			s.parent.handleError(resp.Err, "", -1)
			return
		}

		select {
		case <-pause.C:
		case <-s.hbDying:
			return
		}
	}
}

// --------------------------------------------------------------------

// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
// The interface also provides hooks into the consumer group session life-cycle, allowing
// you to trigger logic before or after the consume loop(s).
//
// PLEASE NOTE that handlers are likely to be called from several goroutines concurrently;
// ensure that all state is safely protected against race conditions.
type ConsumerGroupHandler interface {
	// Setup is run at the beginning of a new session, before ConsumeClaim.
	Setup(ConsumerGroupSession) error

	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
	// but before the offsets are committed for the very last time.
	Cleanup(ConsumerGroupSession) error

	// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
	// Once the Messages() channel is closed, the Handler must finish its processing
	// loop and exit.
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
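
// A minimal sketch of a ConsumerGroupHandler implementation; the exampleHandler
// name and its processing logic are placeholders:
//
//	type exampleHandler struct{}
//
//	func (*exampleHandler) Setup(ConsumerGroupSession) error   { return nil }
//	func (*exampleHandler) Cleanup(ConsumerGroupSession) error { return nil }
//
//	func (*exampleHandler) ConsumeClaim(sess ConsumerGroupSession, claim ConsumerGroupClaim) error {
//		// Messages() is closed when a rebalance is due; returning promptly
//		// leaves time for Cleanup() and the final offset commit.
//		for msg := range claim.Messages() {
//			sess.MarkMessage(msg, "")
//		}
//		return nil
//	}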

// ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.
type ConsumerGroupClaim interface {
	// Topic returns the consumed topic name.
	Topic() string

	// Partition returns the consumed partition.
	Partition() int32

	// InitialOffset returns the initial offset that was used as a starting point for this claim.
	InitialOffset() int64

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64

	// Messages returns the read channel for the messages that are returned by
	// the broker. The messages channel will be closed when a new rebalance cycle
	// is due. You must finish processing and mark offsets within
	// Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
	// re-assigned to another group member.
	Messages() <-chan *ConsumerMessage
}
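
// A sketch of estimating how far behind this claim is, using the high water
// mark; msg is the message that was just consumed:
//
//	lag := claim.HighWaterMarkOffset() - msg.Offset - 1
//	// lag == 0 means msg was the newest message available in the partition.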

type consumerGroupClaim struct {
	topic     string
	partition int32
	offset    int64
	PartitionConsumer
}

func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
	pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
	if err == ErrOffsetOutOfRange {
		offset = sess.parent.config.Consumer.Offsets.Initial
		pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset)
	}
	if err != nil {
		return nil, err
	}

	go func() {
		for err := range pcm.Errors() {
			sess.parent.handleError(err, topic, partition)
		}
	}()

	return &consumerGroupClaim{
		topic:             topic,
		partition:         partition,
		offset:            offset,
		PartitionConsumer: pcm,
	}, nil
}

func (c *consumerGroupClaim) Topic() string        { return c.topic }
func (c *consumerGroupClaim) Partition() int32     { return c.partition }
func (c *consumerGroupClaim) InitialOffset() int64 { return c.offset }

// Drains messages and errors, ensures the claim is fully closed.
func (c *consumerGroupClaim) waitClosed() (errs ConsumerErrors) {
	go func() {
		for range c.Messages() {
		}
	}()

	for err := range c.Errors() {
		errs = append(errs, err)
	}
	return
}