blob: 2bf236ae53c2579dfe1e76f85f75c05add99cd24 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "context"
5 "errors"
6 "fmt"
7 "sort"
8 "sync"
9 "time"
10)
11
// ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.
var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")
14
// ConsumerGroup is responsible for dividing up processing of topics and partitions
// over a collection of processes (the members of the consumer group).
type ConsumerGroup interface {
	// Consume joins a cluster of consumers for a given list of topics and
	// starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
	//
	// The life-cycle of a session is represented by the following steps:
	//
	// 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
	//    and is assigned their "fair share" of partitions, aka 'claims'.
	// 2. Before processing starts, the handler's Setup() hook is called to notify the user
	//    of the claims and allow any necessary preparation or alteration of state.
	// 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
	//    in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected
	//    from concurrent reads/writes.
	// 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
	//    parent context is cancelled or when a server-side rebalance cycle is initiated.
	// 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
	//    to allow the user to perform any final tasks before a rebalance.
	// 6. Finally, marked offsets are committed one last time before claims are released.
	//
	// Please note, that once a rebalance is triggered, sessions must be completed within
	// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
	// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
	// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
	// commit failures.
	//
	// This method should be called inside an infinite loop; when a
	// server-side rebalance happens, the consumer session will need to be
	// recreated to get the new claims.
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

	// Errors returns a read channel of errors that occurred during the consumer life-cycle.
	// By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan error

	// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
	// this function before the object passes out of scope, as it will otherwise leak memory.
	Close() error
}
56
// consumerGroup is the default ConsumerGroup implementation.
type consumerGroup struct {
	client Client // underlying cluster client (may be a nopCloserClient wrapper)

	config   *Config
	consumer Consumer   // partition consumer used to serve claims
	groupID  string     // consumer group identifier
	memberID string     // member ID assigned by the group coordinator; "" before first join
	errors   chan error // error channel exposed via Errors(); closed during Close()

	lock      sync.Mutex // serializes Consume() sessions and leave()
	closed    chan none  // closed exactly once to broadcast shutdown
	closeOnce sync.Once  // guards the Close() teardown sequence

	userData []byte // user data from the last sync-group assignment, re-sent on rejoin
}
72
73// NewConsumerGroup creates a new consumer group the given broker addresses and configuration.
74func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
75 client, err := NewClient(addrs, config)
76 if err != nil {
77 return nil, err
78 }
79
Scott Baker8461e152019-10-01 14:44:30 -070080 c, err := newConsumerGroup(groupID, client)
khenaidooac637102019-01-14 15:44:34 -050081 if err != nil {
82 _ = client.Close()
khenaidooac637102019-01-14 15:44:34 -050083 }
Scott Baker8461e152019-10-01 14:44:30 -070084 return c, err
khenaidooac637102019-01-14 15:44:34 -050085}
86
87// NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
88// necessary to call Close() on the underlying client when shutting down this consumer.
89// PLEASE NOTE: consumer groups can only re-use but not share clients.
90func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) {
Scott Baker8461e152019-10-01 14:44:30 -070091 // For clients passed in by the client, ensure we don't
92 // call Close() on it.
93 cli := &nopCloserClient{client}
94 return newConsumerGroup(groupID, cli)
95}
96
97func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
khenaidooac637102019-01-14 15:44:34 -050098 config := client.Config()
99 if !config.Version.IsAtLeast(V0_10_2_0) {
100 return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
101 }
102
103 consumer, err := NewConsumerFromClient(client)
104 if err != nil {
105 return nil, err
106 }
107
108 return &consumerGroup{
109 client: client,
110 consumer: consumer,
111 config: config,
112 groupID: groupID,
113 errors: make(chan error, config.ChannelBufferSize),
114 closed: make(chan none),
115 }, nil
116}
117
118// Errors implements ConsumerGroup.
119func (c *consumerGroup) Errors() <-chan error { return c.errors }
120
// Close implements ConsumerGroup. The teardown sequence is order-sensitive:
// broadcast shutdown, leave the group, drain the error channel, then close
// the client. The last error seen at each stage wins.
func (c *consumerGroup) Close() (err error) {
	c.closeOnce.Do(func() {
		// Broadcast shutdown to all sessions and helper goroutines.
		close(c.closed)

		// leave group
		if e := c.leave(); e != nil {
			err = e
		}

		// drain errors
		// Closing c.errors in a goroutine lets the range below terminate
		// while still allowing in-flight handleError sends to land first.
		go func() {
			close(c.errors)
		}()
		for e := range c.errors {
			err = e
		}

		// Close the client last; a no-op if it was a caller-owned
		// client wrapped in nopCloserClient.
		if e := c.client.Close(); e != nil {
			err = e
		}
	})
	return
}
145
// Consume implements ConsumerGroup. It blocks for the lifetime of one
// session: join the group, receive claims, run the handler, and release
// the claims when the session ends (rebalance or context cancellation).
// c.lock guarantees at most one session per group instance at a time.
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
	// Ensure group is not closed
	select {
	case <-c.closed:
		return ErrClosedConsumerGroup
	default:
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	// Quick exit when no topics are provided
	if len(topics) == 0 {
		return fmt.Errorf("no topics provided")
	}

	// Refresh metadata for requested topics
	if err := c.client.RefreshMetadata(topics...); err != nil {
		return err
	}

	// Init session
	sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
	if err == ErrClosedClient {
		return ErrClosedConsumerGroup
	} else if err != nil {
		return err
	}

	// loop check topic partition numbers changed
	// will trigger rebalance when any topic partitions number had changed
	// avoid Consume function called again that will generate more than loopCheckPartitionNumbers coroutine
	go c.loopCheckPartitionNumbers(topics, sess)

	// Wait for session exit signal
	<-sess.ctx.Done()

	// Gracefully release session claims
	return sess.release(true)
}
187
Scott Baker8461e152019-10-01 14:44:30 -0700188func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
189 select {
190 case <-c.closed:
191 return nil, ErrClosedConsumerGroup
192 case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
193 }
194
195 if refreshCoordinator {
196 err := c.client.RefreshCoordinator(c.groupID)
197 if err != nil {
198 return c.retryNewSession(ctx, topics, handler, retries, true)
199 }
200 }
201
202 return c.newSession(ctx, topics, handler, retries-1)
203}
204
// newSession runs one full join/sync cycle against the group coordinator
// and, on success, starts a consumerGroupSession for the received claims.
// retries bounds the number of backoff-retry attempts; protocol-level
// "retry immediately" errors (unknown member / illegal generation) do not
// consume a retry.
func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		if retries <= 0 {
			return nil, err
		}

		return c.retryNewSession(ctx, topics, handler, retries, true)
	}

	// Join consumer group
	join, err := c.joinGroupRequest(coordinator, topics)
	if err != nil {
		_ = coordinator.Close()
		return nil, err
	}
	switch join.Err {
	case ErrNoError:
		c.memberID = join.MemberId
	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
		if retries <= 0 {
			return nil, join.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrRebalanceInProgress: // retry after backoff
		if retries <= 0 {
			return nil, join.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, false)
	default:
		return nil, join.Err
	}

	// Prepare distribution plan if we joined as the leader
	var plan BalanceStrategyPlan
	if join.LeaderId == join.MemberId {
		members, err := join.GetMembers()
		if err != nil {
			return nil, err
		}

		plan, err = c.balance(members)
		if err != nil {
			return nil, err
		}
	}

	// Sync consumer group (followers send a nil plan; only the leader
	// distributes assignments).
	groupRequest, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
	if err != nil {
		_ = coordinator.Close()
		return nil, err
	}
	switch groupRequest.Err {
	case ErrNoError:
	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
		if retries <= 0 {
			return nil, groupRequest.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrRebalanceInProgress: // retry after backoff
		if retries <= 0 {
			return nil, groupRequest.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, false)
	default:
		return nil, groupRequest.Err
	}

	// Retrieve and sort claims
	var claims map[string][]int32
	if len(groupRequest.MemberAssignment) > 0 {
		members, err := groupRequest.GetMemberAssignment()
		if err != nil {
			return nil, err
		}
		claims = members.Topics
		// Remember the assignment user data so it can be echoed back in
		// the next join request.
		c.userData = members.UserData

		for _, partitions := range claims {
			sort.Sort(int32Slice(partitions))
		}
	}

	return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}
301
302func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (*JoinGroupResponse, error) {
303 req := &JoinGroupRequest{
304 GroupId: c.groupID,
305 MemberId: c.memberID,
306 SessionTimeout: int32(c.config.Consumer.Group.Session.Timeout / time.Millisecond),
307 ProtocolType: "consumer",
308 }
309 if c.config.Version.IsAtLeast(V0_10_1_0) {
310 req.Version = 1
311 req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond)
312 }
313
khenaidood948f772021-08-11 17:49:24 -0400314 // use static user-data if configured, otherwise use consumer-group userdata from the last sync
315 userData := c.config.Consumer.Group.Member.UserData
316 if len(userData) == 0 {
317 userData = c.userData
318 }
khenaidooac637102019-01-14 15:44:34 -0500319 meta := &ConsumerGroupMemberMetadata{
320 Topics: topics,
khenaidood948f772021-08-11 17:49:24 -0400321 UserData: userData,
khenaidooac637102019-01-14 15:44:34 -0500322 }
323 strategy := c.config.Consumer.Group.Rebalance.Strategy
324 if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
325 return nil, err
326 }
327
328 return coordinator.JoinGroup(req)
329}
330
331func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrategyPlan, generationID int32) (*SyncGroupResponse, error) {
332 req := &SyncGroupRequest{
333 GroupId: c.groupID,
334 MemberId: c.memberID,
335 GenerationId: generationID,
336 }
khenaidood948f772021-08-11 17:49:24 -0400337 strategy := c.config.Consumer.Group.Rebalance.Strategy
khenaidooac637102019-01-14 15:44:34 -0500338 for memberID, topics := range plan {
khenaidood948f772021-08-11 17:49:24 -0400339 assignment := &ConsumerGroupMemberAssignment{Topics: topics}
340 userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID)
khenaidooac637102019-01-14 15:44:34 -0500341 if err != nil {
342 return nil, err
343 }
khenaidood948f772021-08-11 17:49:24 -0400344 assignment.UserData = userDataBytes
345 if err := req.AddGroupAssignmentMember(memberID, assignment); err != nil {
346 return nil, err
347 }
khenaidooac637102019-01-14 15:44:34 -0500348 }
349 return coordinator.SyncGroup(req)
350}
351
352func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, generationID int32) (*HeartbeatResponse, error) {
353 req := &HeartbeatRequest{
354 GroupId: c.groupID,
355 MemberId: memberID,
356 GenerationId: generationID,
357 }
358
359 return coordinator.Heartbeat(req)
360}
361
362func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
363 topics := make(map[string][]int32)
364 for _, meta := range members {
365 for _, topic := range meta.Topics {
366 topics[topic] = nil
367 }
368 }
369
370 for topic := range topics {
371 partitions, err := c.client.Partitions(topic)
372 if err != nil {
373 return nil, err
374 }
375 topics[topic] = partitions
376 }
377
378 strategy := c.config.Consumer.Group.Rebalance.Strategy
379 return strategy.Plan(members, topics)
380}
381
// Leaves the cluster, called by Close. Holds c.lock so it cannot race with
// an in-flight Consume() session mutating c.memberID.
func (c *consumerGroup) leave() error {
	c.lock.Lock()
	defer c.lock.Unlock()
	// Never joined (or already left): nothing to do.
	if c.memberID == "" {
		return nil
	}

	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		return err
	}

	resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{
		GroupId:  c.groupID,
		MemberId: c.memberID,
	})
	if err != nil {
		_ = coordinator.Close()
		return err
	}

	// Unset memberID
	c.memberID = ""

	// Check response: these coordinator states all mean we are effectively
	// out of the group, so they are treated as success.
	switch resp.Err {
	case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
		return nil
	default:
		return resp.Err
	}
}
415
// handleError routes an error either to the log (default) or to the Errors()
// channel when Consumer.Return.Errors is enabled. Errors carrying a valid
// topic/partition are wrapped in a ConsumerError first.
func (c *consumerGroup) handleError(err error, topic string, partition int32) {
	if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 {
		err = &ConsumerError{
			Topic:     topic,
			Partition: partition,
			Err:       err,
		}
	}

	if !c.config.Consumer.Return.Errors {
		Logger.Println(err)
		return
	}

	// Best-effort guard against sending after shutdown began.
	// NOTE(review): this check and the send below are not atomic, so a
	// concurrent Close() between them could still close c.errors first —
	// confirm whether that window matters for callers.
	select {
	case <-c.closed:
		// consumer is closed
		return
	default:
	}

	// Non-blocking send: drop the error if nobody is reading the channel.
	select {
	case c.errors <- err:
	default:
		// no error listener
	}
}
443
444func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) {
445 pause := time.NewTicker(c.config.Metadata.RefreshFrequency)
446 defer session.cancel()
447 defer pause.Stop()
448 var oldTopicToPartitionNum map[string]int
449 var err error
450 if oldTopicToPartitionNum, err = c.topicToPartitionNumbers(topics); err != nil {
451 return
452 }
453 for {
454 if newTopicToPartitionNum, err := c.topicToPartitionNumbers(topics); err != nil {
455 return
456 } else {
457 for topic, num := range oldTopicToPartitionNum {
458 if newTopicToPartitionNum[topic] != num {
459 return // trigger the end of the session on exit
460 }
461 }
462 }
463 select {
464 case <-pause.C:
465 case <-session.ctx.Done():
466 Logger.Printf("loop check partition number coroutine will exit, topics %s", topics)
467 // if session closed by other, should be exited
468 return
469 case <-c.closed:
470 return
471 }
472 }
473}
474
475func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) {
476 topicToPartitionNum := make(map[string]int, len(topics))
477 for _, topic := range topics {
478 if partitionNum, err := c.client.Partitions(topic); err != nil {
479 Logger.Printf("Consumer Group topic %s get partition number failed %v", topic, err)
480 return nil, err
481 } else {
482 topicToPartitionNum[topic] = len(partitionNum)
483 }
484 }
485 return topicToPartitionNum, nil
khenaidooac637102019-01-14 15:44:34 -0500486}
487
488// --------------------------------------------------------------------
489
// ConsumerGroupSession represents a consumer group member session.
type ConsumerGroupSession interface {
	// Claims returns information about the claimed partitions by topic.
	Claims() map[string][]int32

	// MemberID returns the cluster member ID.
	MemberID() string

	// GenerationID returns the current generation ID.
	GenerationID() int32

	// MarkOffset marks the provided offset, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// To follow upstream conventions, you are expected to mark the offset of the
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
	// you should typically add one to the offset of the last consumed message.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
	MarkOffset(topic string, partition int32, offset int64, metadata string)

	// Commit the offset to the backend
	//
	// Note: calling Commit performs a blocking synchronous operation.
	Commit()

	// ResetOffset resets to the provided offset, alongside a metadata string that
	// represents the state of the partition consumer at that point in time. Reset
	// acts as a counterpart to MarkOffset, the difference being that it allows to
	// reset an offset to an earlier or smaller value, where MarkOffset only
	// allows incrementing the offset. cf MarkOffset for more details.
	ResetOffset(topic string, partition int32, offset int64, metadata string)

	// MarkMessage marks a message as consumed.
	MarkMessage(msg *ConsumerMessage, metadata string)

	// Context returns the session context.
	Context() context.Context
}
534
// consumerGroupSession is the default ConsumerGroupSession implementation,
// tracking the state of a single join/consume/release cycle.
type consumerGroupSession struct {
	parent       *consumerGroup
	memberID     string // member ID for this generation
	generationID int32  // group generation this session belongs to
	handler      ConsumerGroupHandler

	claims  map[string][]int32 // topic -> claimed partitions
	offsets *offsetManager     // offset bookkeeping for the claims
	ctx     context.Context    // done => session is ending
	cancel  func()             // ends the session

	waitGroup   sync.WaitGroup // tracks per-claim consume goroutines
	releaseOnce sync.Once      // guards the release() teardown

	// hbDying is closed to ask the heartbeat loop to stop;
	// hbDead is closed by the heartbeat loop once it has exited.
	hbDying, hbDead chan none
}
550
// newConsumerGroupSession wires up a session: offset manager, heartbeat
// loop, one partition-offset-manager (POM) per claim, the handler's Setup()
// hook, and finally one consuming goroutine per claim. On any failure it
// releases whatever was already started.
func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
	// init offset manager
	offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
	if err != nil {
		return nil, err
	}

	// init context
	ctx, cancel := context.WithCancel(ctx)

	// init session
	sess := &consumerGroupSession{
		parent:       parent,
		memberID:     memberID,
		generationID: generationID,
		handler:      handler,
		offsets:      offsets,
		claims:       claims,
		ctx:          ctx,
		cancel:       cancel,
		hbDying:      make(chan none),
		hbDead:       make(chan none),
	}

	// start heartbeat loop
	go sess.heartbeatLoop()

	// create a POM for each claim
	for topic, partitions := range claims {
		for _, partition := range partitions {
			pom, err := offsets.ManagePartition(topic, partition)
			if err != nil {
				// Setup() has not run yet, so skip Cleanup().
				_ = sess.release(false)
				return nil, err
			}

			// handle POM errors
			go func(topic string, partition int32) {
				for err := range pom.Errors() {
					sess.parent.handleError(err, topic, partition)
				}
			}(topic, partition)
		}
	}

	// perform setup
	if err := handler.Setup(sess); err != nil {
		_ = sess.release(true)
		return nil, err
	}

	// start consuming
	for topic, partitions := range claims {
		for _, partition := range partitions {
			sess.waitGroup.Add(1)

			go func(topic string, partition int32) {
				defer sess.waitGroup.Done()

				// cancel the session as soon as the first
				// goroutine exits
				defer sess.cancel()

				// consume a single topic/partition, blocking
				sess.consume(topic, partition)
			}(topic, partition)
		}
	}
	return sess, nil
}
621
// Claims returns information about the claimed partitions by topic.
func (s *consumerGroupSession) Claims() map[string][]int32 { return s.claims }

// MemberID returns the cluster member ID.
func (s *consumerGroupSession) MemberID() string { return s.memberID }

// GenerationID returns the current generation ID.
func (s *consumerGroupSession) GenerationID() int32 { return s.generationID }
625
626func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {
627 if pom := s.offsets.findPOM(topic, partition); pom != nil {
628 pom.MarkOffset(offset, metadata)
629 }
630}
631
// Commit synchronously flushes all marked offsets to the backend.
func (s *consumerGroupSession) Commit() {
	s.offsets.Commit()
}
635
khenaidooac637102019-01-14 15:44:34 -0500636func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
637 if pom := s.offsets.findPOM(topic, partition); pom != nil {
638 pom.ResetOffset(offset, metadata)
639 }
640}
641
// MarkMessage marks a message as consumed by marking offset+1, i.e. the
// next offset to read, per upstream convention.
func (s *consumerGroupSession) MarkMessage(msg *ConsumerMessage, metadata string) {
	s.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, metadata)
}
645
// Context returns the session context; it is done once the session ends.
func (s *consumerGroupSession) Context() context.Context {
	return s.ctx
}
649
// consume serves a single claimed topic/partition: it resolves the starting
// offset, creates a claim, runs the handler's ConsumeClaim, and finally
// drains and closes the claim. It blocks until the claim ends.
func (s *consumerGroupSession) consume(topic string, partition int32) {
	// quick exit if rebalance is due
	select {
	case <-s.ctx.Done():
		return
	case <-s.parent.closed:
		return
	default:
	}

	// get next offset (fall back to the configured initial offset when no
	// POM/stored offset exists)
	offset := s.parent.config.Consumer.Offsets.Initial
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		offset, _ = pom.NextOffset()
	}

	// create new claim
	claim, err := newConsumerGroupClaim(s, topic, partition, offset)
	if err != nil {
		s.parent.handleError(err, topic, partition)
		return
	}

	// handle errors
	go func() {
		for err := range claim.Errors() {
			s.parent.handleError(err, topic, partition)
		}
	}()

	// trigger close when session is done
	go func() {
		select {
		case <-s.ctx.Done():
		case <-s.parent.closed:
		}
		claim.AsyncClose()
	}()

	// start processing
	if err := s.handler.ConsumeClaim(s, claim); err != nil {
		s.parent.handleError(err, topic, partition)
	}

	// ensure consumer is closed & drained
	claim.AsyncClose()
	for _, err := range claim.waitClosed() {
		s.parent.handleError(err, topic, partition)
	}
}
700
// release ends the session: cancel the context, wait for all consume
// goroutines, then (once) run Cleanup if requested, close the offset
// manager (final commit), and stop the heartbeat loop. The last error
// encountered wins.
func (s *consumerGroupSession) release(withCleanup bool) (err error) {
	// signal release, stop heartbeat
	s.cancel()

	// wait for consumers to exit
	s.waitGroup.Wait()

	// perform release
	s.releaseOnce.Do(func() {
		if withCleanup {
			if e := s.handler.Cleanup(s); e != nil {
				s.parent.handleError(e, "", -1)
				err = e
			}
		}

		if e := s.offsets.Close(); e != nil {
			err = e
		}

		// Stop the heartbeat loop and wait for it to finish.
		close(s.hbDying)
		<-s.hbDead
	})

	return
}
727
// heartbeatLoop periodically heartbeats the coordinator to keep this member
// alive in the group. It exits — cancelling the session — on shutdown, on a
// rebalance-related response, or after exhausting Metadata.Retry.Max
// consecutive failures.
func (s *consumerGroupSession) heartbeatLoop() {
	defer close(s.hbDead)
	defer s.cancel() // trigger the end of the session on exit

	pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
	defer pause.Stop()

	// Reused timer for coordinator-lookup backoff (avoids a fresh timer
	// per retry).
	retryBackoff := time.NewTimer(s.parent.config.Metadata.Retry.Backoff)
	defer retryBackoff.Stop()

	retries := s.parent.config.Metadata.Retry.Max
	for {
		coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
		if err != nil {
			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}
			retryBackoff.Reset(s.parent.config.Metadata.Retry.Backoff)
			select {
			case <-s.hbDying:
				return
			case <-retryBackoff.C:
				retries--
			}
			continue
		}

		resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID)
		if err != nil {
			_ = coordinator.Close()

			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}

			retries--
			continue
		}

		switch resp.Err {
		case ErrNoError:
			// Successful heartbeat: restore the retry budget.
			retries = s.parent.config.Metadata.Retry.Max
		case ErrRebalanceInProgress, ErrUnknownMemberId, ErrIllegalGeneration:
			// Group membership changed: end the session so it can rejoin.
			return
		default:
			s.parent.handleError(resp.Err, "", -1)
			return
		}

		select {
		case <-pause.C:
		case <-s.hbDying:
			return
		}
	}
}
786
787// --------------------------------------------------------------------
788
// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
// It also provides hooks for your consumer group session life-cycle and allow you to
// trigger logic before or after the consume loop(s).
//
// PLEASE NOTE that handlers are likely be called from several goroutines concurrently,
// ensure that all state is safely protected against race conditions.
type ConsumerGroupHandler interface {
	// Setup is run at the beginning of a new session, before ConsumeClaim.
	Setup(ConsumerGroupSession) error

	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
	// but before the offsets are committed for the very last time.
	Cleanup(ConsumerGroupSession) error

	// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
	// Once the Messages() channel is closed, the Handler must finish its processing
	// loop and exit.
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
808
// ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.
type ConsumerGroupClaim interface {
	// Topic returns the consumed topic name.
	Topic() string

	// Partition returns the consumed partition.
	Partition() int32

	// InitialOffset returns the initial offset that was used as a starting point for this claim.
	InitialOffset() int64

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64

	// Messages returns the read channel for the messages that are returned by
	// the broker. The messages channel will be closed when a new rebalance cycle
	// is due. You must finish processing and mark offsets within
	// Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
	// re-assigned to another group member.
	Messages() <-chan *ConsumerMessage
}
832
// consumerGroupClaim is the default ConsumerGroupClaim implementation; it
// embeds the PartitionConsumer that actually delivers the messages.
type consumerGroupClaim struct {
	topic     string
	partition int32
	offset    int64 // offset the claim actually started from
	PartitionConsumer
}
839
840func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
841 pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
842 if err == ErrOffsetOutOfRange {
843 offset = sess.parent.config.Consumer.Offsets.Initial
844 pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset)
845 }
846 if err != nil {
847 return nil, err
848 }
849
850 go func() {
851 for err := range pcm.Errors() {
852 sess.parent.handleError(err, topic, partition)
853 }
854 }()
855
856 return &consumerGroupClaim{
857 topic: topic,
858 partition: partition,
859 offset: offset,
860 PartitionConsumer: pcm,
861 }, nil
862}
863
// Topic returns the consumed topic name.
func (c *consumerGroupClaim) Topic() string { return c.topic }

// Partition returns the consumed partition.
func (c *consumerGroupClaim) Partition() int32 { return c.partition }

// InitialOffset returns the offset this claim actually started from.
func (c *consumerGroupClaim) InitialOffset() int64 { return c.offset }
867
// Drains messages and errors, ensures the claim is fully closed.
func (c *consumerGroupClaim) waitClosed() (errs ConsumerErrors) {
	// Discard remaining messages in the background so the error channel
	// below can drain and close.
	go func() {
		for range c.Messages() {
		}
	}()

	// Collect any errors emitted during shutdown; the range ends when the
	// partition consumer closes its error channel.
	for err := range c.Errors() {
		errs = append(errs, err)
	}
	return
}