package sarama

import (
	"errors"
	"fmt"
	"math"
	"sync"
	"sync/atomic"
	"time"

	"github.com/rcrowley/go-metrics"
)

// ConsumerMessage encapsulates a Kafka message returned by the consumer.
type ConsumerMessage struct {
	Headers        []*RecordHeader // only set if kafka is version 0.11+
	Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
	BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp

	Key, Value []byte
	Topic      string
	Partition  int32
	Offset     int64
}

// ConsumerError is what is provided to the user when an error occurs.
// It wraps an error and includes the topic and partition.
type ConsumerError struct {
	Topic     string
	Partition int32
	Err       error
}

func (ce ConsumerError) Error() string {
	return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err)
}

func (ce ConsumerError) Unwrap() error {
	return ce.Err
}

// ConsumerErrors is a type that wraps a batch of errors and implements the Error interface.
// It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors
// when stopping.
type ConsumerErrors []*ConsumerError

func (ce ConsumerErrors) Error() string {
	return fmt.Sprintf("kafka: %d errors while consuming", len(ce))
}

// Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close()
// on a consumer to avoid leaks; it will not be garbage-collected automatically when it passes out of
// scope.
type Consumer interface {
	// Topics returns the set of available topics as retrieved from the cluster
	// metadata. This method is the same as Client.Topics(), and is provided for
	// convenience.
	Topics() ([]string, error)

	// Partitions returns the sorted list of all partition IDs for the given topic.
	// This method is the same as Client.Partitions(), and is provided for convenience.
	Partitions(topic string) ([]int32, error)

	// ConsumePartition creates a PartitionConsumer on the given topic/partition with
	// the given offset. It will return an error if this Consumer is already consuming
	// on the given topic/partition. Offset can be a literal offset, or OffsetNewest
	// or OffsetOldest.
	ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)

	// HighWaterMarks returns the current high water marks for each topic and partition.
	// Consistency between partitions is not guaranteed since high water marks are updated separately.
	HighWaterMarks() map[string]map[int32]int64

	// Close shuts down the consumer. It must be called after all child
	// PartitionConsumers have already been closed.
	Close() error
}
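
// consumePartitionSketch is an editor's illustration, not part of the sarama
// API: a minimal sketch of the intended call sequence for the Consumer
// interface above. The broker address, topic name, and message count are
// hypothetical placeholders.
func consumePartitionSketch() error {
	config := NewConfig()
	config.Consumer.Return.Errors = true // surface consume errors on the Errors channel

	consumer, err := NewConsumer([]string{"localhost:9092"}, config)
	if err != nil {
		return err
	}
	defer consumer.Close() // required: the consumer is not cleaned up by the GC

	// OffsetOldest replays from the start of retention; OffsetNewest would
	// start at the next message produced after the consumer attaches.
	pc, err := consumer.ConsumePartition("my-topic", 0, OffsetOldest)
	if err != nil {
		return err
	}
	defer pc.Close()

	// Read a fixed number of messages; this blocks until they arrive.
	for i := 0; i < 10; i++ {
		msg := <-pc.Messages()
		fmt.Printf("offset %d: %s\n", msg.Offset, string(msg.Value))
	}
	return nil
}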

type consumer struct {
	conf            *Config
	children        map[string]map[int32]*partitionConsumer
	brokerConsumers map[*Broker]*brokerConsumer
	client          Client
	lock            sync.Mutex
}

// NewConsumer creates a new consumer using the given broker addresses and configuration.
func NewConsumer(addrs []string, config *Config) (Consumer, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}
	return newConsumer(client)
}

// NewConsumerFromClient creates a new consumer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
func NewConsumerFromClient(client Client) (Consumer, error) {
	// For clients passed in by the caller, ensure we don't
	// call Close() on them when this consumer is closed.
	cli := &nopCloserClient{client}
	return newConsumer(cli)
}

func newConsumer(client Client) (Consumer, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	c := &consumer{
		client:          client,
		conf:            client.Config(),
		children:        make(map[string]map[int32]*partitionConsumer),
		brokerConsumers: make(map[*Broker]*brokerConsumer),
	}

	return c, nil
}

func (c *consumer) Close() error {
	return c.client.Close()
}

func (c *consumer) Topics() ([]string, error) {
	return c.client.Topics()
}

func (c *consumer) Partitions(topic string) ([]int32, error) {
	return c.client.Partitions(topic)
}

func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
	child := &partitionConsumer{
		consumer:  c,
		conf:      c.conf,
		topic:     topic,
		partition: partition,
		messages:  make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
		errors:    make(chan *ConsumerError, c.conf.ChannelBufferSize),
		feeder:    make(chan *FetchResponse, 1),
		trigger:   make(chan none, 1),
		dying:     make(chan none),
		fetchSize: c.conf.Consumer.Fetch.Default,
	}

	if err := child.chooseStartingOffset(offset); err != nil {
		return nil, err
	}

	var leader *Broker
	var err error
	if leader, err = c.client.Leader(child.topic, child.partition); err != nil {
		return nil, err
	}

	if err := c.addChild(child); err != nil {
		return nil, err
	}

	go withRecover(child.dispatcher)
	go withRecover(child.responseFeeder)

	child.broker = c.refBrokerConsumer(leader)
	child.broker.input <- child

	return child, nil
}

func (c *consumer) HighWaterMarks() map[string]map[int32]int64 {
	c.lock.Lock()
	defer c.lock.Unlock()

	hwms := make(map[string]map[int32]int64)
	for topic, p := range c.children {
		hwm := make(map[int32]int64, len(p))
		for partition, pc := range p {
			hwm[partition] = pc.HighWaterMarkOffset()
		}
		hwms[topic] = hwm
	}

	return hwms
}

func (c *consumer) addChild(child *partitionConsumer) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	topicChildren := c.children[child.topic]
	if topicChildren == nil {
		topicChildren = make(map[int32]*partitionConsumer)
		c.children[child.topic] = topicChildren
	}

	if topicChildren[child.partition] != nil {
		return ConfigurationError("That topic/partition is already being consumed")
	}

	topicChildren[child.partition] = child
	return nil
}

func (c *consumer) removeChild(child *partitionConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	delete(c.children[child.topic], child.partition)
}

func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
	c.lock.Lock()
	defer c.lock.Unlock()

	bc := c.brokerConsumers[broker]
	if bc == nil {
		bc = c.newBrokerConsumer(broker)
		c.brokerConsumers[broker] = bc
	}

	bc.refs++

	return bc
}

func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	brokerWorker.refs--

	if brokerWorker.refs == 0 {
		close(brokerWorker.input)
		if c.brokerConsumers[brokerWorker.broker] == brokerWorker {
			delete(c.brokerConsumers, brokerWorker.broker)
		}
	}
}

func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	delete(c.brokerConsumers, brokerWorker.broker)
}

// PartitionConsumer

// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or
// AsyncClose() on a PartitionConsumer to avoid leaks; it will not be garbage-collected automatically when it passes out
// of scope.
//
// The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range
// loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported
// as out of range by the brokers. In this case you should decide what you want to do (try a different offset,
// notify a human, etc.) and handle it appropriately. For all other error cases, it will just keep retrying.
// By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set
// your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement
// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
//
// To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of
// consumer tear-down & return immediately. Continue to loop, servicing the Messages channel until the teardown process
// AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call
// Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will
// also drain the Messages channel, harvest all errors & return them once cleanup has completed.
type PartitionConsumer interface {
	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you
	// should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this
	// function, or Close, before a consumer object passes out of scope, as it will otherwise leak memory. You must call
	// this before calling Close on the underlying client.
	AsyncClose()

	// Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown just like AsyncClose, drain
	// the Messages channel, harvest any errors & return them to the caller. Note that if you are continuing to service
	// the Messages channel when this function is called, you will be competing with Close for messages; consider
	// calling AsyncClose instead. It is required to call this function (or AsyncClose) before a consumer object passes
	// out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.
	Close() error

	// Messages returns the read channel for the messages that are returned by
	// the broker.
	Messages() <-chan *ConsumerMessage

	// Errors returns a read channel of errors that occurred during consuming, if
	// enabled. By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64
}
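
// asyncCloseSketch is an editor's illustration of the AsyncClose teardown
// described above, not part of the sarama API: drain Messages and Errors
// until the teardown closes them. The stop channel is a hypothetical signal
// supplied by the caller; Consumer.Return.Errors is assumed to be true.
func asyncCloseSketch(pc PartitionConsumer, stop <-chan struct{}) {
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for msg := range pc.Messages() { // ends when teardown closes the channel
			fmt.Printf("consumed %s/%d offset %d\n", msg.Topic, msg.Partition, msg.Offset)
		}
	}()
	go func() {
		defer wg.Done()
		for err := range pc.Errors() { // ends when teardown closes the channel
			Logger.Println(err)
		}
	}()

	<-stop
	pc.AsyncClose() // returns immediately; the drains above finish the shutdown
	wg.Wait()
}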

type partitionConsumer struct {
	highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG

	consumer *consumer
	conf     *Config
	broker   *brokerConsumer
	messages chan *ConsumerMessage
	errors   chan *ConsumerError
	feeder   chan *FetchResponse

	preferredReadReplica int32

	trigger, dying chan none
	closeOnce      sync.Once
	topic          string
	partition      int32
	responseResult error
	fetchSize      int32
	offset         int64
	retries        int32
}

var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing

func (child *partitionConsumer) sendError(err error) {
	cErr := &ConsumerError{
		Topic:     child.topic,
		Partition: child.partition,
		Err:       err,
	}

	if child.conf.Consumer.Return.Errors {
		child.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

func (child *partitionConsumer) computeBackoff() time.Duration {
	if child.conf.Consumer.Retry.BackoffFunc != nil {
		retries := atomic.AddInt32(&child.retries, 1)
		return child.conf.Consumer.Retry.BackoffFunc(int(retries))
	}
	return child.conf.Consumer.Retry.Backoff
}
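
// exponentialBackoffSketch is an editor's illustration of plugging into the
// computeBackoff hook above via config. The 100ms doubling base and the 8s
// cap are arbitrary choices, not sarama defaults.
func exponentialBackoffSketch(config *Config) {
	config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {
		// retries starts at 1, so this yields 200ms, 400ms, 800ms, ...
		// capped at 8s to avoid overflowing the shift on long outages.
		if retries > 6 {
			return 8 * time.Second
		}
		return 100 * time.Millisecond << uint(retries)
	}
}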

func (child *partitionConsumer) dispatcher() {
	for range child.trigger {
		select {
		case <-child.dying:
			close(child.trigger)
		case <-time.After(child.computeBackoff()):
			if child.broker != nil {
				child.consumer.unrefBrokerConsumer(child.broker)
				child.broker = nil
			}

			Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition)
			if err := child.dispatch(); err != nil {
				child.sendError(err)
				child.trigger <- none{}
			}
		}
	}

	if child.broker != nil {
		child.consumer.unrefBrokerConsumer(child.broker)
	}
	child.consumer.removeChild(child)
	close(child.feeder)
}
367
khenaidoo7d3c5582021-08-11 18:09:44 -0400368func (child *partitionConsumer) preferredBroker() (*Broker, error) {
369 if child.preferredReadReplica >= 0 {
370 broker, err := child.consumer.client.Broker(child.preferredReadReplica)
371 if err == nil {
372 return broker, nil
373 }
374 }
375
376 // if prefered replica cannot be found fallback to leader
377 return child.consumer.client.Leader(child.topic, child.partition)
378}

func (child *partitionConsumer) dispatch() error {
	if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
		return err
	}

	broker, err := child.preferredBroker()
	if err != nil {
		return err
	}

	child.broker = child.consumer.refBrokerConsumer(broker)

	child.broker.input <- child

	return nil
}

func (child *partitionConsumer) chooseStartingOffset(offset int64) error {
	newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest)
	if err != nil {
		return err
	}
	oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest)
	if err != nil {
		return err
	}

	switch {
	case offset == OffsetNewest:
		child.offset = newestOffset
	case offset == OffsetOldest:
		child.offset = oldestOffset
	case offset >= oldestOffset && offset <= newestOffset:
		child.offset = offset
	default:
		return ErrOffsetOutOfRange
	}

	return nil
}
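
// resumeFromStoredOffsetSketch is an editor's illustration of the validation
// contract enforced by chooseStartingOffset above: a literal offset must lie
// within [oldest, newest], otherwise ConsumePartition fails with
// ErrOffsetOutOfRange. The stored offset is assumed to come from the
// caller's own checkpointing.
func resumeFromStoredOffsetSketch(c Consumer, topic string, partition int32, stored int64) (PartitionConsumer, error) {
	pc, err := c.ConsumePartition(topic, partition, stored)
	if err == ErrOffsetOutOfRange {
		// The checkpoint has aged out of retention; replay from the oldest
		// available offset instead of failing outright.
		return c.ConsumePartition(topic, partition, OffsetOldest)
	}
	return pc, err
}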

func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
	return child.messages
}

func (child *partitionConsumer) Errors() <-chan *ConsumerError {
	return child.errors
}

func (child *partitionConsumer) AsyncClose() {
	// this triggers whatever broker owns this child to abandon it and close its trigger channel, which causes
	// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
	// 'errors' channels (alternatively, if the child is already at the dispatcher for some reason, that will
	// also just close itself)
	child.closeOnce.Do(func() {
		close(child.dying)
	})
}

func (child *partitionConsumer) Close() error {
	child.AsyncClose()

	var consumerErrors ConsumerErrors
	for err := range child.errors {
		consumerErrors = append(consumerErrors, err)
	}

	if len(consumerErrors) > 0 {
		return consumerErrors
	}
	return nil
}

func (child *partitionConsumer) HighWaterMarkOffset() int64 {
	return atomic.LoadInt64(&child.highWaterMarkOffset)
}
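
// consumerLagSketch is an editor's illustration of the lag calculation hinted
// at in the HighWaterMarkOffset docs: since the high water mark is the offset
// of the *next* message to be produced, lag is its distance from the next
// offset this consumer would read (last consumed offset + 1).
func consumerLagSketch(pc PartitionConsumer, lastConsumedOffset int64) int64 {
	return pc.HighWaterMarkOffset() - (lastConsumedOffset + 1)
}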

func (child *partitionConsumer) responseFeeder() {
	var msgs []*ConsumerMessage
	expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
	firstAttempt := true

feederLoop:
	for response := range child.feeder {
		msgs, child.responseResult = child.parseResponse(response)

		if child.responseResult == nil {
			atomic.StoreInt32(&child.retries, 0)
		}

		for i, msg := range msgs {
			child.interceptors(msg)
		messageSelect:
			select {
			case <-child.dying:
				child.broker.acks.Done()
				continue feederLoop
			case child.messages <- msg:
				firstAttempt = true
			case <-expiryTicker.C:
				if !firstAttempt {
					child.responseResult = errTimedOut
					child.broker.acks.Done()
				remainingLoop:
					for _, msg = range msgs[i:] {
						child.interceptors(msg)
						select {
						case child.messages <- msg:
						case <-child.dying:
							break remainingLoop
						}
					}
					child.broker.input <- child
					continue feederLoop
				} else {
					// current message has not been sent, return to select
					// statement
					firstAttempt = false
					goto messageSelect
				}
			}
		}

		child.broker.acks.Done()
	}

	expiryTicker.Stop()
	close(child.messages)
	close(child.errors)
}
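
// maxProcessingTimeSketch is an editor's note on the expiryTicker used by
// responseFeeder above: a handler that holds a message longer than
// Consumer.MaxProcessingTime across two consecutive ticks detaches this
// partition from its broker worker until its buffered messages drain. The
// 500ms value is a hypothetical choice for a slow handler, not a default.
func maxProcessingTimeSketch(config *Config) {
	config.Consumer.MaxProcessingTime = 500 * time.Millisecond
}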

func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) {
	var messages []*ConsumerMessage
	for _, msgBlock := range msgSet.Messages {
		for _, msg := range msgBlock.Messages() {
			offset := msg.Offset
			timestamp := msg.Msg.Timestamp
			if msg.Msg.Version >= 1 {
				baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
				offset += baseOffset
				if msg.Msg.LogAppendTime {
					timestamp = msgBlock.Msg.Timestamp
				}
			}
			if offset < child.offset {
				continue
			}
			messages = append(messages, &ConsumerMessage{
				Topic:          child.topic,
				Partition:      child.partition,
				Key:            msg.Msg.Key,
				Value:          msg.Msg.Value,
				Offset:         offset,
				Timestamp:      timestamp,
				BlockTimestamp: msgBlock.Msg.Timestamp,
			})
			child.offset = offset + 1
		}
	}
	if len(messages) == 0 {
		child.offset++
	}
	return messages, nil
}

func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
	messages := make([]*ConsumerMessage, 0, len(batch.Records))

	for _, rec := range batch.Records {
		offset := batch.FirstOffset + rec.OffsetDelta
		if offset < child.offset {
			continue
		}
		timestamp := batch.FirstTimestamp.Add(rec.TimestampDelta)
		if batch.LogAppendTime {
			timestamp = batch.MaxTimestamp
		}
		messages = append(messages, &ConsumerMessage{
			Topic:     child.topic,
			Partition: child.partition,
			Key:       rec.Key,
			Value:     rec.Value,
			Offset:    offset,
			Timestamp: timestamp,
			Headers:   rec.Headers,
		})
		child.offset = offset + 1
	}
	if len(messages) == 0 {
		child.offset++
	}
	return messages, nil
}

func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
	var (
		metricRegistry          = child.conf.MetricRegistry
		consumerBatchSizeMetric metrics.Histogram
	)

	if metricRegistry != nil {
		consumerBatchSizeMetric = getOrRegisterHistogram("consumer-batch-size", metricRegistry)
	}

	// If the request was throttled and empty, we log and return without error
	if response.ThrottleTime != time.Duration(0) && len(response.Blocks) == 0 {
		Logger.Printf(
			"consumer/broker/%d FetchResponse throttled %v\n",
			child.broker.broker.ID(), response.ThrottleTime)
		return nil, nil
	}

	block := response.GetBlock(child.topic, child.partition)
	if block == nil {
		return nil, ErrIncompleteResponse
	}

	if block.Err != ErrNoError {
		return nil, block.Err
	}

	nRecs, err := block.numRecords()
	if err != nil {
		return nil, err
	}

	if consumerBatchSizeMetric != nil { // guard: no-op when no MetricRegistry was configured
		consumerBatchSizeMetric.Update(int64(nRecs))
	}

	child.preferredReadReplica = block.PreferredReadReplica

	if nRecs == 0 {
		partialTrailingMessage, err := block.isPartial()
		if err != nil {
			return nil, err
		}
		// We got no messages. If we got a trailing one then we need to ask for more data.
		// Otherwise we just poll again and wait for one to be produced...
		if partialTrailingMessage {
			if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
				// we can't ask for more data, we've hit the configured limit
				child.sendError(ErrMessageTooLarge)
				child.offset++ // skip this one so we can keep processing future messages
			} else {
				child.fetchSize *= 2
				// check int32 overflow
				if child.fetchSize < 0 {
					child.fetchSize = math.MaxInt32
				}
				if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
					child.fetchSize = child.conf.Consumer.Fetch.Max
				}
			}
		}

		return nil, nil
	}

	// we got messages, reset our fetch size in case it was increased for a previous request
	child.fetchSize = child.conf.Consumer.Fetch.Default
	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

	// abortedProducerIDs contains producer IDs whose messages should be ignored as uncommitted:
	// - a producerID is added when the partitionConsumer iterates over the offset at which an aborted transaction begins (abortedTransaction.FirstOffset)
	// - a producerID is removed when the partitionConsumer iterates over an abort controlRecord, meaning the aborted transaction for this producer is over
	abortedProducerIDs := make(map[int64]struct{}, len(block.AbortedTransactions))
	abortedTransactions := block.getAbortedTransactions()

	var messages []*ConsumerMessage
	for _, records := range block.RecordsSet {
		switch records.recordsType {
		case legacyRecords:
			messageSetMessages, err := child.parseMessages(records.MsgSet)
			if err != nil {
				return nil, err
			}

			messages = append(messages, messageSetMessages...)
		case defaultRecords:
			// Consume remaining abortedTransactions up to the last offset of the current batch
			for _, txn := range abortedTransactions {
				if txn.FirstOffset > records.RecordBatch.LastOffset() {
					break
				}
				abortedProducerIDs[txn.ProducerID] = struct{}{}
				// Pop the transaction so that we never add it again
				abortedTransactions = abortedTransactions[1:]
			}

			recordBatchMessages, err := child.parseRecords(records.RecordBatch)
			if err != nil {
				return nil, err
			}

			// Parse and commit offset but do not expose messages that are:
			// - control records
			// - part of an aborted transaction when set to `ReadCommitted`

			// control record
			isControl, err := records.isControl()
			if err != nil {
				// I don't know why there is this continue in case of error to begin with
				// Safe bet is to ignore control messages if ReadUncommitted
				// and block on them in case of error and ReadCommitted
				if child.conf.Consumer.IsolationLevel == ReadCommitted {
					return nil, err
				}
				continue
			}
			if isControl {
				controlRecord, err := records.getControlRecord()
				if err != nil {
					return nil, err
				}

				if controlRecord.Type == ControlRecordAbort {
					delete(abortedProducerIDs, records.RecordBatch.ProducerID)
				}
				continue
			}

			// filter aborted transactions
			if child.conf.Consumer.IsolationLevel == ReadCommitted {
				_, isAborted := abortedProducerIDs[records.RecordBatch.ProducerID]
				if records.RecordBatch.IsTransactional && isAborted {
					continue
				}
			}

			messages = append(messages, recordBatchMessages...)
		default:
			return nil, fmt.Errorf("unknown records type: %v", records.recordsType)
		}
	}

	return messages, nil
}

func (child *partitionConsumer) interceptors(msg *ConsumerMessage) {
	for _, interceptor := range child.conf.Consumer.Interceptors {
		msg.safelyApplyInterceptor(interceptor)
	}
}
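
// headerTaggingInterceptor is an editor's sketch of a consumer interceptor
// fed through the interceptors hook above, assuming sarama's
// ConsumerInterceptor interface (a single OnConsume method); the header key
// and value are hypothetical. Panics inside OnConsume are contained by
// safelyApplyInterceptor rather than crashing the feeder.
type headerTaggingInterceptor struct{}

func (headerTaggingInterceptor) OnConsume(msg *ConsumerMessage) {
	// Tag each message with a header before it reaches the Messages channel.
	msg.Headers = append(msg.Headers, &RecordHeader{
		Key:   []byte("seen-by"),
		Value: []byte("header-tagging-interceptor"),
	})
}

// Enable with: config.Consumer.Interceptors = []ConsumerInterceptor{headerTaggingInterceptor{}}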

type brokerConsumer struct {
	consumer         *consumer
	broker           *Broker
	input            chan *partitionConsumer
	newSubscriptions chan []*partitionConsumer
	subscriptions    map[*partitionConsumer]none
	wait             chan none
	acks             sync.WaitGroup
	refs             int
}

func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
	bc := &brokerConsumer{
		consumer:         c,
		broker:           broker,
		input:            make(chan *partitionConsumer),
		newSubscriptions: make(chan []*partitionConsumer),
		wait:             make(chan none),
		subscriptions:    make(map[*partitionConsumer]none),
		refs:             0,
	}

	go withRecover(bc.subscriptionManager)
	go withRecover(bc.subscriptionConsumer)

	return bc
}

// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
// goroutine is in the middle of a network request) and batches them up. The main worker goroutine picks
// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions are available,
// so the main goroutine can block waiting for work if it has none.
func (bc *brokerConsumer) subscriptionManager() {
	var buffer []*partitionConsumer

	for {
		if len(buffer) > 0 {
			select {
			case event, ok := <-bc.input:
				if !ok {
					goto done
				}
				buffer = append(buffer, event)
			case bc.newSubscriptions <- buffer:
				buffer = nil
			case bc.wait <- none{}:
			}
		} else {
			select {
			case event, ok := <-bc.input:
				if !ok {
					goto done
				}
				buffer = append(buffer, event)
			case bc.newSubscriptions <- nil:
			}
		}
	}

done:
	close(bc.wait)
	if len(buffer) > 0 {
		bc.newSubscriptions <- buffer
	}
	close(bc.newSubscriptions)
}

// subscriptionConsumer ensures we will get nil right away if no new subscriptions are available
func (bc *brokerConsumer) subscriptionConsumer() {
	<-bc.wait // wait for our first piece of work

	for newSubscriptions := range bc.newSubscriptions {
		bc.updateSubscriptions(newSubscriptions)

		if len(bc.subscriptions) == 0 {
			// We're about to be shut down or we're about to receive more subscriptions.
			// Either way, the signal just hasn't propagated to our goroutine yet.
			<-bc.wait
			continue
		}

		response, err := bc.fetchNewMessages()
		if err != nil {
			Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
			bc.abort(err)
			return
		}

		bc.acks.Add(len(bc.subscriptions))
		for child := range bc.subscriptions {
			child.feeder <- response
		}
		bc.acks.Wait()
		bc.handleResponses()
	}
}

func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) {
	for _, child := range newSubscriptions {
		bc.subscriptions[child] = none{}
		Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
	}

	for child := range bc.subscriptions {
		select {
		case <-child.dying:
			Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
			close(child.trigger)
			delete(bc.subscriptions, child)
		default:
			// no-op
		}
	}
}

// handleResponses handles the response codes left for us by our subscriptions, and abandons ones that have been closed
func (bc *brokerConsumer) handleResponses() {
	for child := range bc.subscriptions {
		result := child.responseResult
		child.responseResult = nil

		if result == nil {
			if preferredBroker, err := child.preferredBroker(); err == nil {
				if bc.broker.ID() != preferredBroker.ID() {
					// not an error, but needs redispatching to consume from the preferred replica
					child.trigger <- none{}
					delete(bc.subscriptions, child)
				}
			}
			continue
		}

		// Discard any replica preference.
		child.preferredReadReplica = -1

		switch result {
		case errTimedOut:
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
				bc.broker.ID(), child.topic, child.partition)
			delete(bc.subscriptions, child)
		case ErrOffsetOutOfRange:
			// there's no point in retrying this; it will just fail the same way again
			// shut it down and force the user to choose what to do
			child.sendError(result)
			Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
			close(child.trigger)
			delete(bc.subscriptions, child)
		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrReplicaNotAvailable:
			// not an error, but does need redispatching
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
				bc.broker.ID(), child.topic, child.partition, result)
			child.trigger <- none{}
			delete(bc.subscriptions, child)
		default:
			// dunno, tell the user and try redispatching
			child.sendError(result)
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
				bc.broker.ID(), child.topic, child.partition, result)
			child.trigger <- none{}
			delete(bc.subscriptions, child)
		}
	}
}

func (bc *brokerConsumer) abort(err error) {
	bc.consumer.abandonBrokerConsumer(bc)
	_ = bc.broker.Close() // we don't care about the error this might return, we already have one

	for child := range bc.subscriptions {
		child.sendError(err)
		child.trigger <- none{}
	}

	for newSubscriptions := range bc.newSubscriptions {
		if len(newSubscriptions) == 0 {
			<-bc.wait
			continue
		}
		for _, child := range newSubscriptions {
			child.sendError(err)
			child.trigger <- none{}
		}
	}
}

func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
	request := &FetchRequest{
		MinBytes:    bc.consumer.conf.Consumer.Fetch.Min,
		MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) {
		request.Version = 1
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
		request.Version = 2
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
		request.Version = 3
		request.MaxBytes = MaxResponseSize
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
		request.Version = 4
		request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
	}
	if bc.consumer.conf.Version.IsAtLeast(V1_1_0_0) {
		request.Version = 7
		// We do not currently implement KIP-227 FetchSessions. Setting the id to 0
		// and the epoch to -1 tells the broker not to generate a session ID we're
		// just going to ignore anyway.
		request.SessionID = 0
		request.SessionEpoch = -1
	}
	if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) {
		request.Version = 10
	}
	if bc.consumer.conf.Version.IsAtLeast(V2_3_0_0) {
		request.Version = 11
		request.RackID = bc.consumer.conf.RackID
	}

	for child := range bc.subscriptions {
		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
	}

	return bc.broker.Fetch(request)
}
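
// fetchVersionSketch is an editor's illustration of how the version ladder in
// fetchNewMessages above is driven entirely by config: pinning Config.Version
// picks the FetchRequest version. The rack name is a hypothetical
// placeholder; setting RackID against a 2.3+ cluster enables consuming from a
// nearby replica rather than the leader.
func fetchVersionSketch() *Config {
	config := NewConfig()
	config.Version = V2_3_0_0    // yields FetchRequest version 11 above
	config.RackID = "us-east-1a" // matched against the brokers' rack for follower fetching
	return config
}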
949}