package sarama

import (
	"context"
	"errors"
	"fmt"
	"sort"
	"sync"
	"time"

	"github.com/rcrowley/go-metrics"
)

// ErrClosedConsumerGroup is the error returned when a method is called on a consumer group that has been closed.
var ErrClosedConsumerGroup = errors.New("kafka: tried to use a consumer group that was closed")

// ConsumerGroup is responsible for dividing up processing of topics and partitions
// over a collection of processes (the members of the consumer group).
type ConsumerGroup interface {
	// Consume joins a cluster of consumers for a given list of topics and
	// starts a blocking ConsumerGroupSession through the ConsumerGroupHandler.
	//
	// The life-cycle of a session is represented by the following steps:
	//
	// 1. The consumers join the group (as explained in https://kafka.apache.org/documentation/#intro_consumers)
	//    and are assigned their "fair share" of partitions, aka 'claims'.
	// 2. Before processing starts, the handler's Setup() hook is called to notify the user
	//    of the claims and allow any necessary preparation or alteration of state.
	// 3. For each of the assigned claims the handler's ConsumeClaim() function is then called
	//    in a separate goroutine which requires it to be thread-safe. Any state must be carefully protected
	//    from concurrent reads/writes.
	// 4. The session will persist until one of the ConsumeClaim() functions exits. This can be either when the
	//    parent context is canceled or when a server-side rebalance cycle is initiated.
	// 5. Once all the ConsumeClaim() loops have exited, the handler's Cleanup() hook is called
	//    to allow the user to perform any final tasks before a rebalance.
	// 6. Finally, marked offsets are committed one last time before claims are released.
	//
	// Please note that once a rebalance is triggered, sessions must be completed within
	// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
	// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
	// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
	// commit failures.
	//
	// This method should be called inside an infinite loop: when a server-side rebalance
	// happens, the consumer session will need to be recreated to get the new claims, as
	// shown in the sketch below.
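	//
	// A minimal sketch of that loop (group is a ConsumerGroup and myHandler is a
	// hypothetical ConsumerGroupHandler implementation, not part of this package):
	//
	//	for {
	//		if err := group.Consume(ctx, []string{"my-topic"}, myHandler); err != nil {
	//			// handle the error; on ErrClosedConsumerGroup, stop consuming
	//		}
	//		if ctx.Err() != nil {
	//			return // parent context was canceled
	//		}
	//	}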
	Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error

	// Errors returns a read channel of errors that occurred during the consumer life-cycle.
	// By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
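	//
	// A minimal sketch of draining this channel (assumes the group's config was
	// built with Consumer.Return.Errors = true):
	//
	//	go func() {
	//		for err := range group.Errors() {
	//			log.Println("consumer group error:", err)
	//		}
	//	}()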
	Errors() <-chan error

	// Close stops the ConsumerGroup and detaches any running sessions. It is required to call
	// this function before the object passes out of scope, as it will otherwise leak memory.
	Close() error

	// Pause suspends fetching from the requested partitions. Future calls to the broker will not return any
	// records from these partitions until they have been resumed using Resume()/ResumeAll().
	// Note that this method does not affect partition subscription.
	// In particular, it does not cause a group rebalance when automatic assignment is used.
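	//
	// For example, to pause partitions 0 and 1 of "my-topic" (hypothetical values):
	//
	//	group.Pause(map[string][]int32{"my-topic": {0, 1}})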
	Pause(partitions map[string][]int32)

	// Resume resumes specified partitions which have been paused with Pause()/PauseAll().
	// New calls to the broker will return records from these partitions if there are any to be fetched.
	Resume(partitions map[string][]int32)

	// PauseAll suspends fetching from all partitions. Future calls to the broker will not return any
	// records from these partitions until they have been resumed using Resume()/ResumeAll().
	// Note that this method does not affect partition subscription.
	// In particular, it does not cause a group rebalance when automatic assignment is used.
	PauseAll()

	// ResumeAll resumes all partitions which have been paused with Pause()/PauseAll().
	// New calls to the broker will return records from these partitions if there are any to be fetched.
	ResumeAll()
}

type consumerGroup struct {
	client Client

	config   *Config
	consumer Consumer
	groupID  string
	memberID string
	errors   chan error

	lock      sync.Mutex
	closed    chan none
	closeOnce sync.Once

	userData []byte
}

// NewConsumerGroup creates a new consumer group with the given broker addresses and configuration.
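//
// A minimal sketch from a caller's perspective (broker address and group name
// are placeholders):
//
//	config := sarama.NewConfig()
//	config.Version = sarama.V0_10_2_0 // consumer groups require at least this version
//	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "example-group", config)
//	if err != nil {
//		// handle the error
//	}
//	defer func() { _ = group.Close() }()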
func NewConsumerGroup(addrs []string, groupID string, config *Config) (ConsumerGroup, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}

	c, err := newConsumerGroup(groupID, client)
	if err != nil {
		_ = client.Close()
	}
	return c, err
}

// NewConsumerGroupFromClient creates a new consumer group using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
// PLEASE NOTE: consumer groups can only re-use but not share clients.
func NewConsumerGroupFromClient(groupID string, client Client) (ConsumerGroup, error) {
	// For clients passed in by the caller, ensure we don't
	// call Close() on them.
	cli := &nopCloserClient{client}
	return newConsumerGroup(groupID, cli)
}

func newConsumerGroup(groupID string, client Client) (ConsumerGroup, error) {
	config := client.Config()
	if !config.Version.IsAtLeast(V0_10_2_0) {
		return nil, ConfigurationError("consumer groups require Version to be >= V0_10_2_0")
	}

	consumer, err := NewConsumerFromClient(client)
	if err != nil {
		return nil, err
	}

	return &consumerGroup{
		client:   client,
		consumer: consumer,
		config:   config,
		groupID:  groupID,
		errors:   make(chan error, config.ChannelBufferSize),
		closed:   make(chan none),
		userData: config.Consumer.Group.Member.UserData,
	}, nil
}

// Errors implements ConsumerGroup.
func (c *consumerGroup) Errors() <-chan error { return c.errors }

// Close implements ConsumerGroup.
func (c *consumerGroup) Close() (err error) {
	c.closeOnce.Do(func() {
		close(c.closed)

		// leave group
		if e := c.leave(); e != nil {
			err = e
		}

		// drain errors
		go func() {
			close(c.errors)
		}()
		for e := range c.errors {
			err = e
		}

		if e := c.client.Close(); e != nil {
			err = e
		}
	})
	return
}

// Consume implements ConsumerGroup.
func (c *consumerGroup) Consume(ctx context.Context, topics []string, handler ConsumerGroupHandler) error {
	// Ensure group is not closed
	select {
	case <-c.closed:
		return ErrClosedConsumerGroup
	default:
	}

	c.lock.Lock()
	defer c.lock.Unlock()

	// Quick exit when no topics are provided
	if len(topics) == 0 {
		return fmt.Errorf("no topics provided")
	}

	// Refresh metadata for requested topics
	if err := c.client.RefreshMetadata(topics...); err != nil {
		return err
	}

	// Init session
	sess, err := c.newSession(ctx, topics, handler, c.config.Consumer.Group.Rebalance.Retry.Max)
	if err == ErrClosedClient {
		return ErrClosedConsumerGroup
	} else if err != nil {
		return err
	}

	// Check the topics' partition counts in a loop; a change to any topic's
	// partition count triggers a rebalance by ending the session.
	// The goroutine exits together with the session, so calling Consume again
	// does not accumulate extra loopCheckPartitionNumbers goroutines.
	go c.loopCheckPartitionNumbers(topics, sess)

	// Wait for session exit signal
	<-sess.ctx.Done()

	// Gracefully release session claims
	return sess.release(true)
}

// Pause implements ConsumerGroup.
func (c *consumerGroup) Pause(partitions map[string][]int32) {
	c.consumer.Pause(partitions)
}

// Resume implements ConsumerGroup.
func (c *consumerGroup) Resume(partitions map[string][]int32) {
	c.consumer.Resume(partitions)
}

// PauseAll implements ConsumerGroup.
func (c *consumerGroup) PauseAll() {
	c.consumer.PauseAll()
}

// ResumeAll implements ConsumerGroup.
func (c *consumerGroup) ResumeAll() {
	c.consumer.ResumeAll()
}

func (c *consumerGroup) retryNewSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int, refreshCoordinator bool) (*consumerGroupSession, error) {
	select {
	case <-c.closed:
		return nil, ErrClosedConsumerGroup
	case <-time.After(c.config.Consumer.Group.Rebalance.Retry.Backoff):
	}

	if refreshCoordinator {
		err := c.client.RefreshCoordinator(c.groupID)
		if err != nil {
			return c.retryNewSession(ctx, topics, handler, retries, true)
		}
	}

	return c.newSession(ctx, topics, handler, retries-1)
}

func (c *consumerGroup) newSession(ctx context.Context, topics []string, handler ConsumerGroupHandler, retries int) (*consumerGroupSession, error) {
	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		if retries <= 0 {
			return nil, err
		}

		return c.retryNewSession(ctx, topics, handler, retries, true)
	}

	var (
		metricRegistry          = c.config.MetricRegistry
		consumerGroupJoinTotal  metrics.Counter
		consumerGroupJoinFailed metrics.Counter
		consumerGroupSyncTotal  metrics.Counter
		consumerGroupSyncFailed metrics.Counter
	)

	if metricRegistry != nil {
		consumerGroupJoinTotal = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-join-total-%s", c.groupID), metricRegistry)
		consumerGroupJoinFailed = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-join-failed-%s", c.groupID), metricRegistry)
		consumerGroupSyncTotal = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-sync-total-%s", c.groupID), metricRegistry)
		consumerGroupSyncFailed = metrics.GetOrRegisterCounter(fmt.Sprintf("consumer-group-sync-failed-%s", c.groupID), metricRegistry)
	}

	// Join consumer group
	join, err := c.joinGroupRequest(coordinator, topics)
	if consumerGroupJoinTotal != nil {
		consumerGroupJoinTotal.Inc(1)
	}
	if err != nil {
		_ = coordinator.Close()
		if consumerGroupJoinFailed != nil {
			consumerGroupJoinFailed.Inc(1)
		}
		return nil, err
	}
	if join.Err != ErrNoError {
		if consumerGroupJoinFailed != nil {
			consumerGroupJoinFailed.Inc(1)
		}
	}
	switch join.Err {
	case ErrNoError:
		c.memberID = join.MemberId
	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
		if retries <= 0 {
			return nil, join.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrRebalanceInProgress: // retry after backoff
		if retries <= 0 {
			return nil, join.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, false)
	default:
		return nil, join.Err
	}

	// Prepare distribution plan if we joined as the leader
	var plan BalanceStrategyPlan
	if join.LeaderId == join.MemberId {
		members, err := join.GetMembers()
		if err != nil {
			return nil, err
		}

		plan, err = c.balance(members)
		if err != nil {
			return nil, err
		}
	}

	// Sync consumer group
	syncGroupResponse, err := c.syncGroupRequest(coordinator, plan, join.GenerationId)
	if consumerGroupSyncTotal != nil {
		consumerGroupSyncTotal.Inc(1)
	}
	if err != nil {
		_ = coordinator.Close()
		if consumerGroupSyncFailed != nil {
			consumerGroupSyncFailed.Inc(1)
		}
		return nil, err
	}
	if syncGroupResponse.Err != ErrNoError {
		if consumerGroupSyncFailed != nil {
			consumerGroupSyncFailed.Inc(1)
		}
	}

	switch syncGroupResponse.Err {
	case ErrNoError:
	case ErrUnknownMemberId, ErrIllegalGeneration: // reset member ID and retry immediately
		c.memberID = ""
		return c.newSession(ctx, topics, handler, retries)
	case ErrNotCoordinatorForConsumer: // retry after backoff with coordinator refresh
		if retries <= 0 {
			return nil, syncGroupResponse.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, true)
	case ErrRebalanceInProgress: // retry after backoff
		if retries <= 0 {
			return nil, syncGroupResponse.Err
		}

		return c.retryNewSession(ctx, topics, handler, retries, false)
	default:
		return nil, syncGroupResponse.Err
	}

	// Retrieve and sort claims
	var claims map[string][]int32
	if len(syncGroupResponse.MemberAssignment) > 0 {
		members, err := syncGroupResponse.GetMemberAssignment()
		if err != nil {
			return nil, err
		}
		claims = members.Topics

		// in the case of stateful balance strategies, hold on to the returned
		// assignment metadata; otherwise, reset the statically defined consumer
		// group metadata
		if members.UserData != nil {
			c.userData = members.UserData
		} else {
			c.userData = c.config.Consumer.Group.Member.UserData
		}

		for _, partitions := range claims {
			sort.Sort(int32Slice(partitions))
		}
	}

	return newConsumerGroupSession(ctx, c, claims, join.MemberId, join.GenerationId, handler)
}

func (c *consumerGroup) joinGroupRequest(coordinator *Broker, topics []string) (*JoinGroupResponse, error) {
	req := &JoinGroupRequest{
		GroupId:        c.groupID,
		MemberId:       c.memberID,
		SessionTimeout: int32(c.config.Consumer.Group.Session.Timeout / time.Millisecond),
		ProtocolType:   "consumer",
	}
	if c.config.Version.IsAtLeast(V0_10_1_0) {
		req.Version = 1
		req.RebalanceTimeout = int32(c.config.Consumer.Group.Rebalance.Timeout / time.Millisecond)
	}

	meta := &ConsumerGroupMemberMetadata{
		Topics:   topics,
		UserData: c.userData,
	}
	strategy := c.config.Consumer.Group.Rebalance.Strategy
	if err := req.AddGroupProtocolMetadata(strategy.Name(), meta); err != nil {
		return nil, err
	}

	return coordinator.JoinGroup(req)
}

func (c *consumerGroup) syncGroupRequest(coordinator *Broker, plan BalanceStrategyPlan, generationID int32) (*SyncGroupResponse, error) {
	req := &SyncGroupRequest{
		GroupId:      c.groupID,
		MemberId:     c.memberID,
		GenerationId: generationID,
	}
	strategy := c.config.Consumer.Group.Rebalance.Strategy
	for memberID, topics := range plan {
		assignment := &ConsumerGroupMemberAssignment{Topics: topics}
		userDataBytes, err := strategy.AssignmentData(memberID, topics, generationID)
		if err != nil {
			return nil, err
		}
		assignment.UserData = userDataBytes
		if err := req.AddGroupAssignmentMember(memberID, assignment); err != nil {
			return nil, err
		}
	}
	return coordinator.SyncGroup(req)
}

func (c *consumerGroup) heartbeatRequest(coordinator *Broker, memberID string, generationID int32) (*HeartbeatResponse, error) {
	req := &HeartbeatRequest{
		GroupId:      c.groupID,
		MemberId:     memberID,
		GenerationId: generationID,
	}

	return coordinator.Heartbeat(req)
}

func (c *consumerGroup) balance(members map[string]ConsumerGroupMemberMetadata) (BalanceStrategyPlan, error) {
	topics := make(map[string][]int32)
	for _, meta := range members {
		for _, topic := range meta.Topics {
			topics[topic] = nil
		}
	}

	for topic := range topics {
		partitions, err := c.client.Partitions(topic)
		if err != nil {
			return nil, err
		}
		topics[topic] = partitions
	}

	strategy := c.config.Consumer.Group.Rebalance.Strategy
	return strategy.Plan(members, topics)
}

// Leaves the consumer group, called by Close.
func (c *consumerGroup) leave() error {
	c.lock.Lock()
	defer c.lock.Unlock()
	if c.memberID == "" {
		return nil
	}

	coordinator, err := c.client.Coordinator(c.groupID)
	if err != nil {
		return err
	}

	resp, err := coordinator.LeaveGroup(&LeaveGroupRequest{
		GroupId:  c.groupID,
		MemberId: c.memberID,
	})
	if err != nil {
		_ = coordinator.Close()
		return err
	}

	// Unset memberID
	c.memberID = ""

	// Check response
	switch resp.Err {
	case ErrRebalanceInProgress, ErrUnknownMemberId, ErrNoError:
		return nil
	default:
		return resp.Err
	}
}

func (c *consumerGroup) handleError(err error, topic string, partition int32) {
	if _, ok := err.(*ConsumerError); !ok && topic != "" && partition > -1 {
		err = &ConsumerError{
			Topic:     topic,
			Partition: partition,
			Err:       err,
		}
	}

	if !c.config.Consumer.Return.Errors {
		Logger.Println(err)
		return
	}

	select {
	case <-c.closed:
		// consumer is closed
		return
	default:
	}

	select {
	case c.errors <- err:
	default:
		// no error listener
	}
}

func (c *consumerGroup) loopCheckPartitionNumbers(topics []string, session *consumerGroupSession) {
	pause := time.NewTicker(c.config.Metadata.RefreshFrequency)
	defer session.cancel()
	defer pause.Stop()
	var oldTopicToPartitionNum map[string]int
	var err error
	if oldTopicToPartitionNum, err = c.topicToPartitionNumbers(topics); err != nil {
		return
	}
	for {
		if newTopicToPartitionNum, err := c.topicToPartitionNumbers(topics); err != nil {
			return
		} else {
			for topic, num := range oldTopicToPartitionNum {
				if newTopicToPartitionNum[topic] != num {
					return // trigger the end of the session on exit
				}
			}
		}
		select {
		case <-pause.C:
		case <-session.ctx.Done():
			Logger.Printf(
				"consumergroup/%s loop check partition number goroutine will exit, topics %s\n",
				c.groupID, topics)
			// exit if the session was closed elsewhere
			return
		case <-c.closed:
			return
		}
	}
}

func (c *consumerGroup) topicToPartitionNumbers(topics []string) (map[string]int, error) {
	topicToPartitionNum := make(map[string]int, len(topics))
	for _, topic := range topics {
		if partitions, err := c.client.Partitions(topic); err != nil {
			Logger.Printf(
				"consumergroup/%s topic %s get partition number failed %v\n",
				c.groupID, topic, err)
			return nil, err
		} else {
			topicToPartitionNum[topic] = len(partitions)
		}
	}
	return topicToPartitionNum, nil
}

// --------------------------------------------------------------------

// ConsumerGroupSession represents a consumer group member session.
type ConsumerGroupSession interface {
	// Claims returns information about the claimed partitions by topic.
	Claims() map[string][]int32

	// MemberID returns the cluster member ID.
	MemberID() string

	// GenerationID returns the current generation ID.
	GenerationID() int32

	// MarkOffset marks the provided offset, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// To follow upstream conventions, you are expected to mark the offset of the
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
	// you should typically add one to the offset of the last consumed message.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
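	//
	// For example, after processing msg (this is exactly what MarkMessage does):
	//
	//	sess.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, "")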
	MarkOffset(topic string, partition int32, offset int64, metadata string)

	// Commit the offset to the backend
	//
	// Note: calling Commit performs a blocking synchronous operation.
	Commit()

	// ResetOffset resets to the provided offset, alongside a metadata string that
	// represents the state of the partition consumer at that point in time. Reset
	// acts as a counterpart to MarkOffset, the difference being that it allows
	// resetting an offset to an earlier or smaller value, where MarkOffset only
	// allows incrementing the offset. cf MarkOffset for more details.
	ResetOffset(topic string, partition int32, offset int64, metadata string)

	// MarkMessage marks a message as consumed.
	MarkMessage(msg *ConsumerMessage, metadata string)

	// Context returns the session context.
	Context() context.Context
}

type consumerGroupSession struct {
	parent       *consumerGroup
	memberID     string
	generationID int32
	handler      ConsumerGroupHandler

	claims  map[string][]int32
	offsets *offsetManager
	ctx     context.Context
	cancel  func()

	waitGroup       sync.WaitGroup
	releaseOnce     sync.Once
	hbDying, hbDead chan none
}

func newConsumerGroupSession(ctx context.Context, parent *consumerGroup, claims map[string][]int32, memberID string, generationID int32, handler ConsumerGroupHandler) (*consumerGroupSession, error) {
	// init offset manager
	offsets, err := newOffsetManagerFromClient(parent.groupID, memberID, generationID, parent.client)
	if err != nil {
		return nil, err
	}

	// init context
	ctx, cancel := context.WithCancel(ctx)

	// init session
	sess := &consumerGroupSession{
		parent:       parent,
		memberID:     memberID,
		generationID: generationID,
		handler:      handler,
		offsets:      offsets,
		claims:       claims,
		ctx:          ctx,
		cancel:       cancel,
		hbDying:      make(chan none),
		hbDead:       make(chan none),
	}

	// start heartbeat loop
	go sess.heartbeatLoop()

	// create a POM for each claim
	for topic, partitions := range claims {
		for _, partition := range partitions {
			pom, err := offsets.ManagePartition(topic, partition)
			if err != nil {
				_ = sess.release(false)
				return nil, err
			}

			// handle POM errors
			go func(topic string, partition int32) {
				for err := range pom.Errors() {
					sess.parent.handleError(err, topic, partition)
				}
			}(topic, partition)
		}
	}

	// perform setup
	if err := handler.Setup(sess); err != nil {
		_ = sess.release(true)
		return nil, err
	}

	// start consuming
	for topic, partitions := range claims {
		for _, partition := range partitions {
			sess.waitGroup.Add(1)

			go func(topic string, partition int32) {
				defer sess.waitGroup.Done()

				// cancel the session as soon as the first
				// goroutine exits
				defer sess.cancel()

				// consume a single topic/partition, blocking
				sess.consume(topic, partition)
			}(topic, partition)
		}
	}
	return sess, nil
}

func (s *consumerGroupSession) Claims() map[string][]int32 { return s.claims }
func (s *consumerGroupSession) MemberID() string           { return s.memberID }
func (s *consumerGroupSession) GenerationID() int32        { return s.generationID }

func (s *consumerGroupSession) MarkOffset(topic string, partition int32, offset int64, metadata string) {
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		pom.MarkOffset(offset, metadata)
	}
}

func (s *consumerGroupSession) Commit() {
	s.offsets.Commit()
}

func (s *consumerGroupSession) ResetOffset(topic string, partition int32, offset int64, metadata string) {
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		pom.ResetOffset(offset, metadata)
	}
}

func (s *consumerGroupSession) MarkMessage(msg *ConsumerMessage, metadata string) {
	s.MarkOffset(msg.Topic, msg.Partition, msg.Offset+1, metadata)
}

func (s *consumerGroupSession) Context() context.Context {
	return s.ctx
}

func (s *consumerGroupSession) consume(topic string, partition int32) {
	// quick exit if rebalance is due
	select {
	case <-s.ctx.Done():
		return
	case <-s.parent.closed:
		return
	default:
	}

	// get next offset
	offset := s.parent.config.Consumer.Offsets.Initial
	if pom := s.offsets.findPOM(topic, partition); pom != nil {
		offset, _ = pom.NextOffset()
	}

	// create new claim
	claim, err := newConsumerGroupClaim(s, topic, partition, offset)
	if err != nil {
		s.parent.handleError(err, topic, partition)
		return
	}

	// handle errors
	go func() {
		for err := range claim.Errors() {
			s.parent.handleError(err, topic, partition)
		}
	}()

	// trigger close when session is done
	go func() {
		select {
		case <-s.ctx.Done():
		case <-s.parent.closed:
		}
		claim.AsyncClose()
	}()

	// start processing
	if err := s.handler.ConsumeClaim(s, claim); err != nil {
		s.parent.handleError(err, topic, partition)
	}

	// ensure consumer is closed & drained
	claim.AsyncClose()
	for _, err := range claim.waitClosed() {
		s.parent.handleError(err, topic, partition)
	}
}

func (s *consumerGroupSession) release(withCleanup bool) (err error) {
	// signal release, stop heartbeat
	s.cancel()

	// wait for consumers to exit
	s.waitGroup.Wait()

	// perform release
	s.releaseOnce.Do(func() {
		if withCleanup {
			if e := s.handler.Cleanup(s); e != nil {
				s.parent.handleError(e, "", -1)
				err = e
			}
		}

		if e := s.offsets.Close(); e != nil {
			err = e
		}

		close(s.hbDying)
		<-s.hbDead
	})

	Logger.Printf(
		"consumergroup/session/%s/%d released\n",
		s.MemberID(), s.GenerationID())

	return
}

func (s *consumerGroupSession) heartbeatLoop() {
	defer close(s.hbDead)
	defer s.cancel() // trigger the end of the session on exit
	defer func() {
		Logger.Printf(
			"consumergroup/session/%s/%d heartbeat loop stopped\n",
			s.MemberID(), s.GenerationID())
	}()

	pause := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
	defer pause.Stop()

	retryBackoff := time.NewTimer(s.parent.config.Metadata.Retry.Backoff)
	defer retryBackoff.Stop()

	retries := s.parent.config.Metadata.Retry.Max
	for {
		coordinator, err := s.parent.client.Coordinator(s.parent.groupID)
		if err != nil {
			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}
			retryBackoff.Reset(s.parent.config.Metadata.Retry.Backoff)
			select {
			case <-s.hbDying:
				return
			case <-retryBackoff.C:
				retries--
			}
			continue
		}

		resp, err := s.parent.heartbeatRequest(coordinator, s.memberID, s.generationID)
		if err != nil {
			_ = coordinator.Close()

			if retries <= 0 {
				s.parent.handleError(err, "", -1)
				return
			}

			retries--
			continue
		}

		switch resp.Err {
		case ErrNoError:
			retries = s.parent.config.Metadata.Retry.Max
		case ErrRebalanceInProgress:
			retries = s.parent.config.Metadata.Retry.Max
			s.cancel()
		case ErrUnknownMemberId, ErrIllegalGeneration:
			return
		default:
			s.parent.handleError(resp.Err, "", -1)
			return
		}

		select {
		case <-pause.C:
		case <-s.hbDying:
			return
		}
	}
}

// --------------------------------------------------------------------

// ConsumerGroupHandler instances are used to handle individual topic/partition claims.
// It also provides hooks for your consumer group session life-cycle and allows you to
// trigger logic before or after the consume loop(s).
//
// PLEASE NOTE that handlers are likely to be called from several goroutines concurrently,
// ensure that all state is safely protected against race conditions.
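//
// A minimal sketch of a handler (exampleHandler is hypothetical and not part of
// this package):
//
//	type exampleHandler struct{}
//
//	func (exampleHandler) Setup(sarama.ConsumerGroupSession) error   { return nil }
//	func (exampleHandler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
//
//	func (exampleHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
//		for msg := range claim.Messages() {
//			// process msg, then mark it consumed; processing should be idempotent
//			sess.MarkMessage(msg, "")
//		}
//		return nil
//	}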
type ConsumerGroupHandler interface {
	// Setup is run at the beginning of a new session, before ConsumeClaim.
	Setup(ConsumerGroupSession) error

	// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited
	// but before the offsets are committed for the very last time.
	Cleanup(ConsumerGroupSession) error

	// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
	// Once the Messages() channel is closed, the Handler must finish its processing
	// loop and exit.
	ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}

// ConsumerGroupClaim processes Kafka messages from a given topic and partition within a consumer group.
type ConsumerGroupClaim interface {
	// Topic returns the consumed topic name.
	Topic() string

	// Partition returns the consumed partition.
	Partition() int32

	// InitialOffset returns the initial offset that was used as a starting point for this claim.
	InitialOffset() int64

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64

	// Messages returns the read channel for the messages that are returned by
	// the broker. The messages channel will be closed when a new rebalance cycle
	// is due. You must finish processing and mark offsets within
	// Config.Consumer.Group.Session.Timeout before the topic/partition is eventually
	// re-assigned to another group member.
	Messages() <-chan *ConsumerMessage
}

type consumerGroupClaim struct {
	topic     string
	partition int32
	offset    int64
	PartitionConsumer
}

func newConsumerGroupClaim(sess *consumerGroupSession, topic string, partition int32, offset int64) (*consumerGroupClaim, error) {
	pcm, err := sess.parent.consumer.ConsumePartition(topic, partition, offset)
	if err == ErrOffsetOutOfRange {
		offset = sess.parent.config.Consumer.Offsets.Initial
		pcm, err = sess.parent.consumer.ConsumePartition(topic, partition, offset)
	}
	if err != nil {
		return nil, err
	}

	go func() {
		for err := range pcm.Errors() {
			sess.parent.handleError(err, topic, partition)
		}
	}()

	return &consumerGroupClaim{
		topic:             topic,
		partition:         partition,
		offset:            offset,
		PartitionConsumer: pcm,
	}, nil
}

func (c *consumerGroupClaim) Topic() string        { return c.topic }
func (c *consumerGroupClaim) Partition() int32     { return c.partition }
func (c *consumerGroupClaim) InitialOffset() int64 { return c.offset }

// Drains messages and errors, ensures the claim is fully closed.
func (c *consumerGroupClaim) waitClosed() (errs ConsumerErrors) {
	go func() {
		for range c.Messages() {
		}
	}()

	for err := range c.Errors() {
		errs = append(errs, err)
	}
	return
}