package sarama

import (
	"errors"
	"fmt"
	"math"
	"sync"
	"sync/atomic"
	"time"

	"github.com/rcrowley/go-metrics"
)

// ConsumerMessage encapsulates a Kafka message returned by the consumer.
type ConsumerMessage struct {
	Headers        []*RecordHeader // only set if kafka is version 0.11+
	Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
	BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp

	Key, Value []byte
	Topic      string
	Partition  int32
	Offset     int64
}

// ConsumerError is what is provided to the user when an error occurs.
// It wraps an error and includes the topic and partition.
type ConsumerError struct {
	Topic     string
	Partition int32
	Err       error
}

func (ce ConsumerError) Error() string {
	return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err)
}

func (ce ConsumerError) Unwrap() error {
	return ce.Err
}

// ConsumerErrors is a type that wraps a batch of errors and implements the error interface.
// It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors
// when stopping.
type ConsumerErrors []*ConsumerError

func (ce ConsumerErrors) Error() string {
	return fmt.Sprintf("kafka: %d errors while consuming", len(ce))
}
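
// Illustrative sketch (not part of the library API): because ConsumerError implements
// Unwrap and ConsumerErrors implements error, the batch returned by a PartitionConsumer's
// Close can be unpacked with the standard errors package. The variable pc and the use of
// Logger here are assumptions for the example.
//
//	if err := pc.Close(); err != nil {
//		var cErrs ConsumerErrors
//		if errors.As(err, &cErrs) {
//			for _, cErr := range cErrs {
//				Logger.Printf("error consuming %s/%d: %v", cErr.Topic, cErr.Partition, cErr.Err)
//			}
//		}
//	}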

// Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close()
// on a consumer to avoid leaks; it will not be garbage-collected automatically when it passes out of
// scope.
type Consumer interface {
	// Topics returns the set of available topics as retrieved from the cluster
	// metadata. This method is the same as Client.Topics(), and is provided for
	// convenience.
	Topics() ([]string, error)

	// Partitions returns the sorted list of all partition IDs for the given topic.
	// This method is the same as Client.Partitions(), and is provided for convenience.
	Partitions(topic string) ([]int32, error)

	// ConsumePartition creates a PartitionConsumer on the given topic/partition with
	// the given offset. It will return an error if this Consumer is already consuming
	// on the given topic/partition. Offset can be a literal offset, or OffsetNewest
	// or OffsetOldest.
	ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)

	// HighWaterMarks returns the current high water marks for each topic and partition.
	// Consistency between partitions is not guaranteed since high water marks are updated separately.
	HighWaterMarks() map[string]map[int32]int64

	// Close shuts down the consumer. It must be called after all child
	// PartitionConsumers have already been closed.
	Close() error

	// Pause suspends fetching from the requested partitions. Future calls to the broker will not return any
	// records from these partitions until they have been resumed using Resume()/ResumeAll().
	// Note that this method does not affect partition subscription.
	// In particular, it does not cause a group rebalance when automatic assignment is used.
	Pause(topicPartitions map[string][]int32)

	// Resume resumes specified partitions which have been paused with Pause()/PauseAll().
	// New calls to the broker will return records from these partitions if there are any to be fetched.
	Resume(topicPartitions map[string][]int32)

	// PauseAll suspends fetching from all partitions. Future calls to the broker will not return any
	// records from these partitions until they have been resumed using Resume()/ResumeAll().
	// Note that this method does not affect partition subscription.
	// In particular, it does not cause a group rebalance when automatic assignment is used.
	PauseAll()

	// ResumeAll resumes all partitions which have been paused with Pause()/PauseAll().
	// New calls to the broker will return records from these partitions if there are any to be fetched.
	ResumeAll()
}
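
// A minimal usage sketch of the interface above (assumptions: a broker at
// "localhost:9092" and a topic named "my-topic"; error handling is abbreviated).
// It shows the intended call sequence: create a Consumer, open a PartitionConsumer,
// and close both when done.
//
//	config := NewConfig()
//	consumer, err := NewConsumer([]string{"localhost:9092"}, config)
//	if err != nil {
//		panic(err)
//	}
//	defer consumer.Close()
//
//	pc, err := consumer.ConsumePartition("my-topic", 0, OffsetNewest)
//	if err != nil {
//		panic(err)
//	}
//	defer pc.Close()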

type consumer struct {
	conf            *Config
	children        map[string]map[int32]*partitionConsumer
	brokerConsumers map[*Broker]*brokerConsumer
	client          Client
	lock            sync.Mutex
}

// NewConsumer creates a new consumer using the given broker addresses and configuration.
func NewConsumer(addrs []string, config *Config) (Consumer, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}
	return newConsumer(client)
}

// NewConsumerFromClient creates a new consumer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
func NewConsumerFromClient(client Client) (Consumer, error) {
	// For a client passed in by the caller, ensure we don't
	// call Close() on it.
	cli := &nopCloserClient{client}
	return newConsumer(cli)
}

func newConsumer(client Client) (Consumer, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	c := &consumer{
		client:          client,
		conf:            client.Config(),
		children:        make(map[string]map[int32]*partitionConsumer),
		brokerConsumers: make(map[*Broker]*brokerConsumer),
	}

	return c, nil
}

func (c *consumer) Close() error {
	return c.client.Close()
}

func (c *consumer) Topics() ([]string, error) {
	return c.client.Topics()
}

func (c *consumer) Partitions(topic string) ([]int32, error) {
	return c.client.Partitions(topic)
}

func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
	child := &partitionConsumer{
		consumer:             c,
		conf:                 c.conf,
		topic:                topic,
		partition:            partition,
		messages:             make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
		errors:               make(chan *ConsumerError, c.conf.ChannelBufferSize),
		feeder:               make(chan *FetchResponse, 1),
		preferredReadReplica: invalidPreferredReplicaID,
		trigger:              make(chan none, 1),
		dying:                make(chan none),
		fetchSize:            c.conf.Consumer.Fetch.Default,
	}

	if err := child.chooseStartingOffset(offset); err != nil {
		return nil, err
	}

	var leader *Broker
	var err error
	if leader, err = c.client.Leader(child.topic, child.partition); err != nil {
		return nil, err
	}

	if err := c.addChild(child); err != nil {
		return nil, err
	}

	go withRecover(child.dispatcher)
	go withRecover(child.responseFeeder)

	child.broker = c.refBrokerConsumer(leader)
	child.broker.input <- child

	return child, nil
}

func (c *consumer) HighWaterMarks() map[string]map[int32]int64 {
	c.lock.Lock()
	defer c.lock.Unlock()

	hwms := make(map[string]map[int32]int64)
	for topic, p := range c.children {
		hwm := make(map[int32]int64, len(p))
		for partition, pc := range p {
			hwm[partition] = pc.HighWaterMarkOffset()
		}
		hwms[topic] = hwm
	}

	return hwms
}

func (c *consumer) addChild(child *partitionConsumer) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	topicChildren := c.children[child.topic]
	if topicChildren == nil {
		topicChildren = make(map[int32]*partitionConsumer)
		c.children[child.topic] = topicChildren
	}

	if topicChildren[child.partition] != nil {
		return ConfigurationError("That topic/partition is already being consumed")
	}

	topicChildren[child.partition] = child
	return nil
}

func (c *consumer) removeChild(child *partitionConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	delete(c.children[child.topic], child.partition)
}

func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
	c.lock.Lock()
	defer c.lock.Unlock()

	bc := c.brokerConsumers[broker]
	if bc == nil {
		bc = c.newBrokerConsumer(broker)
		c.brokerConsumers[broker] = bc
	}

	bc.refs++

	return bc
}

func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	brokerWorker.refs--

	if brokerWorker.refs == 0 {
		close(brokerWorker.input)
		if c.brokerConsumers[brokerWorker.broker] == brokerWorker {
			delete(c.brokerConsumers, brokerWorker.broker)
		}
	}
}

func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	delete(c.brokerConsumers, brokerWorker.broker)
}

// Pause implements Consumer.
func (c *consumer) Pause(topicPartitions map[string][]int32) {
	c.lock.Lock()
	defer c.lock.Unlock()

	for topic, partitions := range topicPartitions {
		for _, partition := range partitions {
			if topicConsumers, ok := c.children[topic]; ok {
				if partitionConsumer, ok := topicConsumers[partition]; ok {
					partitionConsumer.Pause()
				}
			}
		}
	}
}

// Resume implements Consumer.
func (c *consumer) Resume(topicPartitions map[string][]int32) {
	c.lock.Lock()
	defer c.lock.Unlock()

	for topic, partitions := range topicPartitions {
		for _, partition := range partitions {
			if topicConsumers, ok := c.children[topic]; ok {
				if partitionConsumer, ok := topicConsumers[partition]; ok {
					partitionConsumer.Resume()
				}
			}
		}
	}
}

// PauseAll implements Consumer.
func (c *consumer) PauseAll() {
	c.lock.Lock()
	defer c.lock.Unlock()

	for _, partitions := range c.children {
		for _, partitionConsumer := range partitions {
			partitionConsumer.Pause()
		}
	}
}

// ResumeAll implements Consumer.
func (c *consumer) ResumeAll() {
	c.lock.Lock()
	defer c.lock.Unlock()

	for _, partitions := range c.children {
		for _, partitionConsumer := range partitions {
			partitionConsumer.Resume()
		}
	}
}

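// A brief, hedged sketch of flow control with the methods above (the consumer and
// topic name carry over from the earlier sketch and are assumptions): pausing stops
// new fetches for the affected partitions without dropping the subscription.
//
//	consumer.Pause(map[string][]int32{"my-topic": {0}}) // stop fetching partition 0
//	// ... catch up on processing ...
//	consumer.Resume(map[string][]int32{"my-topic": {0}})
//
//	consumer.PauseAll()  // or pause every subscribed partition at once
//	consumer.ResumeAll()
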
// PartitionConsumer

// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or
// AsyncClose() on a PartitionConsumer to avoid leaks; it will not be garbage-collected automatically when it passes out
// of scope.
//
// The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range
// loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported
// as out of range by the brokers. In this case you should decide what you want to do (try a different offset,
// notify a human, etc.) and handle it appropriately. For all other error cases, it will just keep retrying.
// By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set
// your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement
// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
//
// To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of
// consumer tear-down & return immediately. Continue to loop, servicing the Messages channel until the teardown process
// AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call
// Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will
// also drain the Messages channel, harvest all errors & return them once cleanup has completed.
type PartitionConsumer interface {
	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you
	// should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this
	// function, or Close, before a consumer object passes out of scope, as it will otherwise leak memory. You must call
	// this before calling Close on the underlying client.
	AsyncClose()

	// Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown just like AsyncClose, drain
	// the Messages channel, harvest any errors & return them to the caller. Note that if you are continuing to service
	// the Messages channel when this function is called, you will be competing with Close for messages; consider
	// calling AsyncClose, instead. It is required to call this function (or AsyncClose) before a consumer object passes
	// out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.
	Close() error

	// Messages returns the read channel for the messages that are returned by
	// the broker.
	Messages() <-chan *ConsumerMessage

	// Errors returns a read channel of errors that occurred during consuming, if
	// enabled. By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64

	// Pause suspends fetching from this partition. Future calls to the broker will not return
	// any records from this partition until it has been resumed using Resume().
	// Note that this method does not affect partition subscription.
	// In particular, it does not cause a group rebalance when automatic assignment is used.
	Pause()

	// Resume resumes this partition which has been paused with Pause().
	// New calls to the broker will return records from this partition if there are any to be fetched.
	// If the partition was not previously paused, this method is a no-op.
	Resume()

	// IsPaused indicates if this partition consumer is paused or not.
	IsPaused() bool
}
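
// Hedged sketch of the consumption pattern described above (pc is assumed to be a
// PartitionConsumer obtained from ConsumePartition; processing is a placeholder).
// With Consumer.Return.Errors enabled, the Errors channel must also be drained.
//
//	go func() {
//		for err := range pc.Errors() {
//			Logger.Println(err) // requires config.Consumer.Return.Errors = true
//		}
//	}()
//
//	for msg := range pc.Messages() {
//		Logger.Printf("offset %d: %s", msg.Offset, string(msg.Value))
//	}
//
//	// Elsewhere, call pc.AsyncClose() to end the range loop once the Messages
//	// channel has been closed and drained.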

type partitionConsumer struct {
	highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG

	consumer             *consumer
	conf                 *Config
	broker               *brokerConsumer
	messages             chan *ConsumerMessage
	errors               chan *ConsumerError
	feeder               chan *FetchResponse
	preferredReadReplica int32

	trigger, dying chan none
	closeOnce      sync.Once
	topic          string
	partition      int32
	responseResult error
	fetchSize      int32
	offset         int64
	retries        int32

	paused int32
}

var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing

func (child *partitionConsumer) sendError(err error) {
	cErr := &ConsumerError{
		Topic:     child.topic,
		Partition: child.partition,
		Err:       err,
	}

	if child.conf.Consumer.Return.Errors {
		child.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

func (child *partitionConsumer) computeBackoff() time.Duration {
	if child.conf.Consumer.Retry.BackoffFunc != nil {
		retries := atomic.AddInt32(&child.retries, 1)
		return child.conf.Consumer.Retry.BackoffFunc(int(retries))
	}
	return child.conf.Consumer.Retry.Backoff
}
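
// A small, hedged configuration sketch for the retry backoff used above: if
// Consumer.Retry.BackoffFunc is set it takes precedence over the static
// Consumer.Retry.Backoff, and it receives this partition's retry counter. The
// linear policy below is only an illustration, not a recommendation.
//
//	config := NewConfig()
//	config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {
//		return time.Duration(retries) * 250 * time.Millisecond
//	}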

func (child *partitionConsumer) dispatcher() {
	for range child.trigger {
		select {
		case <-child.dying:
			close(child.trigger)
		case <-time.After(child.computeBackoff()):
			if child.broker != nil {
				child.consumer.unrefBrokerConsumer(child.broker)
				child.broker = nil
			}

			if err := child.dispatch(); err != nil {
				child.sendError(err)
				child.trigger <- none{}
			}
		}
	}

	if child.broker != nil {
		child.consumer.unrefBrokerConsumer(child.broker)
	}
	child.consumer.removeChild(child)
	close(child.feeder)
}

func (child *partitionConsumer) preferredBroker() (*Broker, error) {
	if child.preferredReadReplica >= 0 {
		broker, err := child.consumer.client.Broker(child.preferredReadReplica)
		if err == nil {
			return broker, nil
		}
		Logger.Printf(
			"consumer/%s/%d failed to find active broker for preferred read replica %d - will fallback to leader",
			child.topic, child.partition, child.preferredReadReplica)

		// if we couldn't find it, discard the replica preference and trigger a
		// metadata refresh whilst falling back to consuming from the leader again
		child.preferredReadReplica = invalidPreferredReplicaID
		_ = child.consumer.client.RefreshMetadata(child.topic)
	}

	// if preferred replica cannot be found fallback to leader
	return child.consumer.client.Leader(child.topic, child.partition)
}

func (child *partitionConsumer) dispatch() error {
	if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
		return err
	}

	broker, err := child.preferredBroker()
	if err != nil {
		return err
	}

	child.broker = child.consumer.refBrokerConsumer(broker)

	child.broker.input <- child

	return nil
}

func (child *partitionConsumer) chooseStartingOffset(offset int64) error {
	newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest)
	if err != nil {
		return err
	}

	child.highWaterMarkOffset = newestOffset

	oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest)
	if err != nil {
		return err
	}

	switch {
	case offset == OffsetNewest:
		child.offset = newestOffset
	case offset == OffsetOldest:
		child.offset = oldestOffset
	case offset >= oldestOffset && offset <= newestOffset:
		child.offset = offset
	default:
		return ErrOffsetOutOfRange
	}

	return nil
}
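
// Hedged illustration of the offset validation above: a literal starting offset is
// accepted only if it falls within [oldestOffset, newestOffset]; anything else fails
// with ErrOffsetOutOfRange. The topic name and the literal offset are assumptions.
//
//	pc, err := consumer.ConsumePartition("my-topic", 0, 42) // 42 must still be retained by the broker
//	if errors.Is(err, ErrOffsetOutOfRange) {
//		pc, err = consumer.ConsumePartition("my-topic", 0, OffsetOldest)
//	}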

func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
	return child.messages
}

func (child *partitionConsumer) Errors() <-chan *ConsumerError {
	return child.errors
}

func (child *partitionConsumer) AsyncClose() {
	// this triggers whatever broker owns this child to abandon it and close its trigger channel, which causes
	// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
	// 'errors' channel (alternatively, if the child is already at the dispatcher for some reason, that will
	// also just close itself)
	child.closeOnce.Do(func() {
		close(child.dying)
	})
}

func (child *partitionConsumer) Close() error {
	child.AsyncClose()

	var consumerErrors ConsumerErrors
	for err := range child.errors {
		consumerErrors = append(consumerErrors, err)
	}

	if len(consumerErrors) > 0 {
		return consumerErrors
	}
	return nil
}

func (child *partitionConsumer) HighWaterMarkOffset() int64 {
	return atomic.LoadInt64(&child.highWaterMarkOffset)
}

func (child *partitionConsumer) responseFeeder() {
	var msgs []*ConsumerMessage
	expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
	firstAttempt := true

feederLoop:
	for response := range child.feeder {
		msgs, child.responseResult = child.parseResponse(response)

		if child.responseResult == nil {
			atomic.StoreInt32(&child.retries, 0)
		}

		for i, msg := range msgs {
			child.interceptors(msg)
		messageSelect:
			select {
			case <-child.dying:
				child.broker.acks.Done()
				continue feederLoop
			case child.messages <- msg:
				firstAttempt = true
			case <-expiryTicker.C:
				if !firstAttempt {
					child.responseResult = errTimedOut
					child.broker.acks.Done()
				remainingLoop:
					for _, msg = range msgs[i:] {
						child.interceptors(msg)
						select {
						case child.messages <- msg:
						case <-child.dying:
							break remainingLoop
						}
					}
					child.broker.input <- child
					continue feederLoop
				} else {
					// current message has not been sent, return to select
					// statement
					firstAttempt = false
					goto messageSelect
				}
			}
		}

		child.broker.acks.Done()
	}

	expiryTicker.Stop()
	close(child.messages)
	close(child.errors)
}

func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) {
	var messages []*ConsumerMessage
	for _, msgBlock := range msgSet.Messages {
		for _, msg := range msgBlock.Messages() {
			offset := msg.Offset
			timestamp := msg.Msg.Timestamp
			if msg.Msg.Version >= 1 {
				baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
				offset += baseOffset
				if msg.Msg.LogAppendTime {
					timestamp = msgBlock.Msg.Timestamp
				}
			}
			if offset < child.offset {
				continue
			}
			messages = append(messages, &ConsumerMessage{
				Topic:          child.topic,
				Partition:      child.partition,
				Key:            msg.Msg.Key,
				Value:          msg.Msg.Value,
				Offset:         offset,
				Timestamp:      timestamp,
				BlockTimestamp: msgBlock.Msg.Timestamp,
			})
			child.offset = offset + 1
		}
	}
	if len(messages) == 0 {
		child.offset++
	}
	return messages, nil
}

func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
	messages := make([]*ConsumerMessage, 0, len(batch.Records))

	for _, rec := range batch.Records {
		offset := batch.FirstOffset + rec.OffsetDelta
		if offset < child.offset {
			continue
		}
		timestamp := batch.FirstTimestamp.Add(rec.TimestampDelta)
		if batch.LogAppendTime {
			timestamp = batch.MaxTimestamp
		}
		messages = append(messages, &ConsumerMessage{
			Topic:     child.topic,
			Partition: child.partition,
			Key:       rec.Key,
			Value:     rec.Value,
			Offset:    offset,
			Timestamp: timestamp,
			Headers:   rec.Headers,
		})
		child.offset = offset + 1
	}
	if len(messages) == 0 {
		child.offset++
	}
	return messages, nil
}

func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
	var (
		metricRegistry          = child.conf.MetricRegistry
		consumerBatchSizeMetric metrics.Histogram
	)

	if metricRegistry != nil {
		consumerBatchSizeMetric = getOrRegisterHistogram("consumer-batch-size", metricRegistry)
	}

	// If request was throttled and empty we log and return without error
	if response.ThrottleTime != time.Duration(0) && len(response.Blocks) == 0 {
		Logger.Printf(
			"consumer/broker/%d FetchResponse throttled %v\n",
			child.broker.broker.ID(), response.ThrottleTime)
		return nil, nil
	}

	block := response.GetBlock(child.topic, child.partition)
	if block == nil {
		return nil, ErrIncompleteResponse
	}

	if block.Err != ErrNoError {
		return nil, block.Err
	}

	nRecs, err := block.numRecords()
	if err != nil {
		return nil, err
	}

	consumerBatchSizeMetric.Update(int64(nRecs))

	if block.PreferredReadReplica != invalidPreferredReplicaID {
		child.preferredReadReplica = block.PreferredReadReplica
	}

	if nRecs == 0 {
		partialTrailingMessage, err := block.isPartial()
		if err != nil {
			return nil, err
		}
		// We got no messages. If we got a trailing one then we need to ask for more data.
		// Otherwise we just poll again and wait for one to be produced...
		if partialTrailingMessage {
			if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
				// we can't ask for more data, we've hit the configured limit
				child.sendError(ErrMessageTooLarge)
				child.offset++ // skip this one so we can keep processing future messages
			} else {
				child.fetchSize *= 2
				// check int32 overflow
				if child.fetchSize < 0 {
					child.fetchSize = math.MaxInt32
				}
				if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
					child.fetchSize = child.conf.Consumer.Fetch.Max
				}
			}
		} else if block.LastRecordsBatchOffset != nil && *block.LastRecordsBatchOffset < block.HighWaterMarkOffset {
			// check the last record offset to avoid getting stuck if the high watermark was not reached
			Logger.Printf("consumer/broker/%d received batch with zero records but high watermark was not reached, topic %s, partition %d, offset %d\n", child.broker.broker.ID(), child.topic, child.partition, *block.LastRecordsBatchOffset)
			child.offset = *block.LastRecordsBatchOffset + 1
		}

		return nil, nil
	}

	// we got messages, reset our fetch size in case it was increased for a previous request
	child.fetchSize = child.conf.Consumer.Fetch.Default
	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

	// abortedProducerIDs contains the producer IDs whose messages should be ignored as uncommitted:
	// - a producer ID is added when the partitionConsumer iterates over the offset at which an aborted transaction begins (abortedTransaction.FirstOffset)
	// - a producer ID is removed when the partitionConsumer iterates over an aborted controlRecord, meaning the aborted transaction for this producer is over
	abortedProducerIDs := make(map[int64]struct{}, len(block.AbortedTransactions))
	abortedTransactions := block.getAbortedTransactions()

	var messages []*ConsumerMessage
	for _, records := range block.RecordsSet {
		switch records.recordsType {
		case legacyRecords:
			messageSetMessages, err := child.parseMessages(records.MsgSet)
			if err != nil {
				return nil, err
			}

			messages = append(messages, messageSetMessages...)
		case defaultRecords:
			// Consume remaining abortedTransaction up to last offset of current batch
			for _, txn := range abortedTransactions {
				if txn.FirstOffset > records.RecordBatch.LastOffset() {
					break
				}
				abortedProducerIDs[txn.ProducerID] = struct{}{}
				// Pop abortedTransactions so that we never add it again
				abortedTransactions = abortedTransactions[1:]
			}

			recordBatchMessages, err := child.parseRecords(records.RecordBatch)
			if err != nil {
				return nil, err
			}

			// Parse and commit offset but do not expose messages that are:
			// - control records
			// - part of an aborted transaction when set to `ReadCommitted`

			// control record
			isControl, err := records.isControl()
			if err != nil {
				// I don't know why there is this continue in case of error to begin with
				// Safe bet is to ignore control messages if ReadUncommitted
				// and block on them in case of error and ReadCommitted
				if child.conf.Consumer.IsolationLevel == ReadCommitted {
					return nil, err
				}
				continue
			}
			if isControl {
				controlRecord, err := records.getControlRecord()
				if err != nil {
					return nil, err
				}

				if controlRecord.Type == ControlRecordAbort {
					delete(abortedProducerIDs, records.RecordBatch.ProducerID)
				}
				continue
			}

			// filter aborted transactions
			if child.conf.Consumer.IsolationLevel == ReadCommitted {
				_, isAborted := abortedProducerIDs[records.RecordBatch.ProducerID]
				if records.RecordBatch.IsTransactional && isAborted {
					continue
				}
			}

			messages = append(messages, recordBatchMessages...)
		default:
			return nil, fmt.Errorf("unknown records type: %v", records.recordsType)
		}
	}

	return messages, nil
}

func (child *partitionConsumer) interceptors(msg *ConsumerMessage) {
	for _, interceptor := range child.conf.Consumer.Interceptors {
		msg.safelyApplyInterceptor(interceptor)
	}
}

// Pause implements PartitionConsumer.
func (child *partitionConsumer) Pause() {
	atomic.StoreInt32(&child.paused, 1)
}

// Resume implements PartitionConsumer.
func (child *partitionConsumer) Resume() {
	atomic.StoreInt32(&child.paused, 0)
}

// IsPaused implements PartitionConsumer.
func (child *partitionConsumer) IsPaused() bool {
	return atomic.LoadInt32(&child.paused) == 1
}

type brokerConsumer struct {
	consumer         *consumer
	broker           *Broker
	input            chan *partitionConsumer
	newSubscriptions chan []*partitionConsumer
	subscriptions    map[*partitionConsumer]none
	wait             chan none
	acks             sync.WaitGroup
	refs             int
}

func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
	bc := &brokerConsumer{
		consumer:         c,
		broker:           broker,
		input:            make(chan *partitionConsumer),
		newSubscriptions: make(chan []*partitionConsumer),
		wait:             make(chan none),
		subscriptions:    make(map[*partitionConsumer]none),
		refs:             0,
	}

	go withRecover(bc.subscriptionManager)
	go withRecover(bc.subscriptionConsumer)

	return bc
}

// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
// goroutine is in the middle of a network request) and batches them up. The main worker goroutine picks
// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions are available,
// so the main goroutine can block waiting for work if it has none.
func (bc *brokerConsumer) subscriptionManager() {
	var buffer []*partitionConsumer

	for {
		if len(buffer) > 0 {
			select {
			case event, ok := <-bc.input:
				if !ok {
					goto done
				}
				buffer = append(buffer, event)
			case bc.newSubscriptions <- buffer:
				buffer = nil
			case bc.wait <- none{}:
			}
		} else {
			select {
			case event, ok := <-bc.input:
				if !ok {
					goto done
				}
				buffer = append(buffer, event)
			case bc.newSubscriptions <- nil:
			}
		}
	}

done:
	close(bc.wait)
	if len(buffer) > 0 {
		bc.newSubscriptions <- buffer
	}
	close(bc.newSubscriptions)
}

// subscriptionConsumer ensures we will get nil right away if no new subscriptions are available
func (bc *brokerConsumer) subscriptionConsumer() {
	<-bc.wait // wait for our first piece of work

	for newSubscriptions := range bc.newSubscriptions {
		bc.updateSubscriptions(newSubscriptions)

		if len(bc.subscriptions) == 0 {
			// We're about to be shut down or we're about to receive more subscriptions.
			// Either way, the signal just hasn't propagated to our goroutine yet.
			<-bc.wait
			continue
		}

		response, err := bc.fetchNewMessages()
		if err != nil {
			Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
			bc.abort(err)
			return
		}

		bc.acks.Add(len(bc.subscriptions))
		for child := range bc.subscriptions {
			child.feeder <- response
		}
		bc.acks.Wait()
		bc.handleResponses()
	}
}

func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) {
	for _, child := range newSubscriptions {
		bc.subscriptions[child] = none{}
		Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
	}

	for child := range bc.subscriptions {
		select {
		case <-child.dying:
			Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
			close(child.trigger)
			delete(bc.subscriptions, child)
		default:
			// no-op
		}
	}
}

// handleResponses handles the response codes left for us by our subscriptions, and abandons ones that have been closed
func (bc *brokerConsumer) handleResponses() {
	for child := range bc.subscriptions {
		result := child.responseResult
		child.responseResult = nil

		if result == nil {
			if preferredBroker, err := child.preferredBroker(); err == nil {
				if bc.broker.ID() != preferredBroker.ID() {
					// not an error but needs redispatching to consume from preferred replica
					Logger.Printf(
						"consumer/broker/%d abandoned in favor of preferred replica broker/%d\n",
						bc.broker.ID(), preferredBroker.ID())
					child.trigger <- none{}
					delete(bc.subscriptions, child)
				}
			}
			continue
		}

		// Discard any replica preference.
		child.preferredReadReplica = invalidPreferredReplicaID

		switch result {
		case errTimedOut:
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
				bc.broker.ID(), child.topic, child.partition)
			delete(bc.subscriptions, child)
		case ErrOffsetOutOfRange:
			// there's no point in retrying this; it will just fail the same way again,
			// so shut it down and force the user to choose what to do
			child.sendError(result)
			Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
			close(child.trigger)
			delete(bc.subscriptions, child)
		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrReplicaNotAvailable:
			// not an error, but does need redispatching
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
				bc.broker.ID(), child.topic, child.partition, result)
			child.trigger <- none{}
			delete(bc.subscriptions, child)
		default:
			// dunno, tell the user and try redispatching
			child.sendError(result)
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
				bc.broker.ID(), child.topic, child.partition, result)
			child.trigger <- none{}
			delete(bc.subscriptions, child)
		}
	}
}

func (bc *brokerConsumer) abort(err error) {
	bc.consumer.abandonBrokerConsumer(bc)
	_ = bc.broker.Close() // we don't care about the error this might return, we already have one

	for child := range bc.subscriptions {
		child.sendError(err)
		child.trigger <- none{}
	}

	for newSubscriptions := range bc.newSubscriptions {
		if len(newSubscriptions) == 0 {
			<-bc.wait
			continue
		}
		for _, child := range newSubscriptions {
			child.sendError(err)
			child.trigger <- none{}
		}
	}
}

func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
	request := &FetchRequest{
		MinBytes:    bc.consumer.conf.Consumer.Fetch.Min,
		MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) {
		request.Version = 1
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
		request.Version = 2
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
		request.Version = 3
		request.MaxBytes = MaxResponseSize
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
		request.Version = 4
		request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
	}
	if bc.consumer.conf.Version.IsAtLeast(V1_1_0_0) {
		request.Version = 7
		// We do not currently implement KIP-227 FetchSessions. Setting the id to 0
		// and the epoch to -1 tells the broker not to generate a session ID we're
		// just going to ignore anyway.
		request.SessionID = 0
		request.SessionEpoch = -1
	}
	if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) {
		request.Version = 10
	}
	if bc.consumer.conf.Version.IsAtLeast(V2_3_0_0) {
		request.Version = 11
		request.RackID = bc.consumer.conf.RackID
	}

	for child := range bc.subscriptions {
		if !child.IsPaused() {
			request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
		}
	}

	return bc.broker.Fetch(request)
}
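
// A hedged configuration note for the version ladder above: the FetchRequest version
// is derived from Config.Version, and the consumer's RackID is only included on
// v11+ requests (V2_3_0_0 and newer). The values below are illustrative assumptions.
//
//	config := NewConfig()
//	config.Version = V2_3_0_0 // fetch v11 requests will carry the rack ID
//	config.RackID = "us-east-1a"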