package sarama

import (
	"errors"
	"fmt"
	"math"
	"sync"
	"sync/atomic"
	"time"

	"github.com/rcrowley/go-metrics"
)

// ConsumerMessage encapsulates a Kafka message returned by the consumer.
type ConsumerMessage struct {
	Headers        []*RecordHeader // only set if kafka is version 0.11+
	Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
	BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp

	Key, Value []byte
	Topic      string
	Partition  int32
	Offset     int64
}

// ConsumerError is what is provided to the user when an error occurs.
// It wraps an error and includes the topic and partition.
type ConsumerError struct {
	Topic     string
	Partition int32
	Err       error
}

func (ce ConsumerError) Error() string {
	return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err)
}

func (ce ConsumerError) Unwrap() error {
	return ce.Err
}

// ConsumerErrors is a type that wraps a batch of errors and implements the Error interface.
// It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors
// when stopping.
type ConsumerErrors []*ConsumerError

func (ce ConsumerErrors) Error() string {
	return fmt.Sprintf("kafka: %d errors while consuming", len(ce))
}

// Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close()
// on a consumer to avoid leaks; it will not be garbage-collected automatically when it passes out of
// scope.
type Consumer interface {
	// Topics returns the set of available topics as retrieved from the cluster
	// metadata. This method is the same as Client.Topics(), and is provided for
	// convenience.
	Topics() ([]string, error)

	// Partitions returns the sorted list of all partition IDs for the given topic.
	// This method is the same as Client.Partitions(), and is provided for convenience.
	Partitions(topic string) ([]int32, error)

	// ConsumePartition creates a PartitionConsumer on the given topic/partition with
	// the given offset. It will return an error if this Consumer is already consuming
	// on the given topic/partition. Offset can be a literal offset, or OffsetNewest
	// or OffsetOldest.
	ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)

	// HighWaterMarks returns the current high water marks for each topic and partition.
	// Consistency between partitions is not guaranteed since high water marks are updated separately.
	HighWaterMarks() map[string]map[int32]int64

	// Close shuts down the consumer. It must be called after all child
	// PartitionConsumers have already been closed.
	Close() error
}
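
// Illustrative sketch (not part of the package API) of the simplest way to use
// the Consumer interface above. The broker address, topic name, and partition
// number are placeholder assumptions for this example.
//
//	consumer, err := NewConsumer([]string{"localhost:9092"}, NewConfig())
//	if err != nil {
//		panic(err)
//	}
//	defer func() { _ = consumer.Close() }()
//
//	// Offset can be OffsetOldest, OffsetNewest, or a literal offset.
//	pc, err := consumer.ConsumePartition("my-topic", 0, OffsetOldest)
//	if err != nil {
//		panic(err)
//	}
//	defer func() { _ = pc.Close() }()
//
//	for msg := range pc.Messages() {
//		Logger.Printf("%s/%d offset %d: %s", msg.Topic, msg.Partition, msg.Offset, string(msg.Value))
//	}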

type consumer struct {
	conf            *Config
	children        map[string]map[int32]*partitionConsumer
	brokerConsumers map[*Broker]*brokerConsumer
	client          Client
	lock            sync.Mutex
}

// NewConsumer creates a new consumer using the given broker addresses and configuration.
func NewConsumer(addrs []string, config *Config) (Consumer, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}
	return newConsumer(client)
}

// NewConsumerFromClient creates a new consumer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
func NewConsumerFromClient(client Client) (Consumer, error) {
	// For a client passed in by the caller, ensure we don't
	// call Close() on it.
	cli := &nopCloserClient{client}
	return newConsumer(cli)
}
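
// Minimal sketch of the shared-client pattern described above (broker address
// and error handling are assumptions): the consumer does not own the client,
// so the client must be closed separately once the consumer is done.
//
//	client, err := NewClient([]string{"localhost:9092"}, NewConfig())
//	if err != nil {
//		panic(err)
//	}
//	defer func() { _ = client.Close() }()
//
//	consumer, err := NewConsumerFromClient(client)
//	if err != nil {
//		panic(err)
//	}
//	defer func() { _ = consumer.Close() }()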

func newConsumer(client Client) (Consumer, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	c := &consumer{
		client:          client,
		conf:            client.Config(),
		children:        make(map[string]map[int32]*partitionConsumer),
		brokerConsumers: make(map[*Broker]*brokerConsumer),
	}

	return c, nil
}

func (c *consumer) Close() error {
	return c.client.Close()
}

func (c *consumer) Topics() ([]string, error) {
	return c.client.Topics()
}

func (c *consumer) Partitions(topic string) ([]int32, error) {
	return c.client.Partitions(topic)
}

func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
	child := &partitionConsumer{
		consumer:  c,
		conf:      c.conf,
		topic:     topic,
		partition: partition,
		messages:  make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
		errors:    make(chan *ConsumerError, c.conf.ChannelBufferSize),
		feeder:    make(chan *FetchResponse, 1),
		trigger:   make(chan none, 1),
		dying:     make(chan none),
		fetchSize: c.conf.Consumer.Fetch.Default,
	}

	if err := child.chooseStartingOffset(offset); err != nil {
		return nil, err
	}

	var leader *Broker
	var err error
	if leader, err = c.client.Leader(child.topic, child.partition); err != nil {
		return nil, err
	}

	if err := c.addChild(child); err != nil {
		return nil, err
	}

	go withRecover(child.dispatcher)
	go withRecover(child.responseFeeder)

	child.broker = c.refBrokerConsumer(leader)
	child.broker.input <- child

	return child, nil
}

func (c *consumer) HighWaterMarks() map[string]map[int32]int64 {
	c.lock.Lock()
	defer c.lock.Unlock()

	hwms := make(map[string]map[int32]int64)
	for topic, p := range c.children {
		hwm := make(map[int32]int64, len(p))
		for partition, pc := range p {
			hwm[partition] = pc.HighWaterMarkOffset()
		}
		hwms[topic] = hwm
	}

	return hwms
}

func (c *consumer) addChild(child *partitionConsumer) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	topicChildren := c.children[child.topic]
	if topicChildren == nil {
		topicChildren = make(map[int32]*partitionConsumer)
		c.children[child.topic] = topicChildren
	}

	if topicChildren[child.partition] != nil {
		return ConfigurationError("That topic/partition is already being consumed")
	}

	topicChildren[child.partition] = child
	return nil
}

func (c *consumer) removeChild(child *partitionConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	delete(c.children[child.topic], child.partition)
}

func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
	c.lock.Lock()
	defer c.lock.Unlock()

	bc := c.brokerConsumers[broker]
	if bc == nil {
		bc = c.newBrokerConsumer(broker)
		c.brokerConsumers[broker] = bc
	}

	bc.refs++

	return bc
}

func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	brokerWorker.refs--

	if brokerWorker.refs == 0 {
		close(brokerWorker.input)
		if c.brokerConsumers[brokerWorker.broker] == brokerWorker {
			delete(c.brokerConsumers, brokerWorker.broker)
		}
	}
}

func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	delete(c.brokerConsumers, brokerWorker.broker)
}

// PartitionConsumer

// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or
// AsyncClose() on a PartitionConsumer to avoid leaks; it will not be garbage-collected automatically when it passes out
// of scope.
//
// The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range
// loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported
// as out of range by the brokers. In this case you should decide what you want to do (try a different offset,
// notify a human, etc.) and handle it appropriately. For all other error cases, it will just keep retrying.
// By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set
// your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement
// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
//
// To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of
// consumer tear-down & return immediately. Continue to loop, servicing the Messages channel until the teardown process
// AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call
// Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will
// also drain the Messages channel, harvest all errors & return them once cleanup has completed.
type PartitionConsumer interface {
	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you
	// should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this
	// function, or Close, before a consumer object passes out of scope, as it will otherwise leak memory. You must call
	// this before calling Close on the underlying client.
	AsyncClose()

	// Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown just like AsyncClose, drain
	// the Messages channel, harvest any errors & return them to the caller. Note that if you are continuing to service
	// the Messages channel when this function is called, you will be competing with Close for messages; consider
	// calling AsyncClose instead. It is required to call this function (or AsyncClose) before a consumer object passes
	// out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.
	Close() error

	// Messages returns the read channel for the messages that are returned by
	// the broker.
	Messages() <-chan *ConsumerMessage

	// Errors returns a read channel of errors that occurred during consuming, if
	// enabled. By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64
}
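
// Illustrative sketch of the shutdown and error-handling pattern described
// above, assuming Consumer.Return.Errors has been set to true and that `pc` is
// a PartitionConsumer and `shutdown` a caller-provided signal channel (both
// placeholders for this example):
//
//	go func() {
//		<-shutdown
//		pc.AsyncClose() // Messages and Errors will be drained and closed
//	}()
//
//	go func() {
//		for consumerErr := range pc.Errors() {
//			Logger.Println(consumerErr)
//		}
//	}()
//
//	for msg := range pc.Messages() {
//		// Approximate lag: messages still to be consumed on this partition.
//		lag := pc.HighWaterMarkOffset() - msg.Offset - 1
//		Logger.Printf("offset %d (lag %d): %s", msg.Offset, lag, string(msg.Value))
//	}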

type partitionConsumer struct {
	highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG

	consumer *consumer
	conf     *Config
	broker   *brokerConsumer
	messages chan *ConsumerMessage
	errors   chan *ConsumerError
	feeder   chan *FetchResponse

	preferredReadReplica int32

	trigger, dying chan none
	closeOnce      sync.Once
	topic          string
	partition      int32
	responseResult error
	fetchSize      int32
	offset         int64
	retries        int32
}

var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing

func (child *partitionConsumer) sendError(err error) {
	cErr := &ConsumerError{
		Topic:     child.topic,
		Partition: child.partition,
		Err:       err,
	}

	if child.conf.Consumer.Return.Errors {
		child.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

func (child *partitionConsumer) computeBackoff() time.Duration {
	if child.conf.Consumer.Retry.BackoffFunc != nil {
		retries := atomic.AddInt32(&child.retries, 1)
		return child.conf.Consumer.Retry.BackoffFunc(int(retries))
	}
	return child.conf.Consumer.Retry.Backoff
}
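
// A minimal sketch of plugging a custom backoff into the hook consulted by
// computeBackoff above; the linear-with-cap policy and the cap value are
// assumptions chosen purely for illustration.
//
//	config := NewConfig()
//	config.Consumer.Retry.BackoffFunc = func(retries int) time.Duration {
//		backoff := time.Duration(retries) * 100 * time.Millisecond
//		if backoff > 2*time.Second {
//			backoff = 2 * time.Second
//		}
//		return backoff
//	}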

func (child *partitionConsumer) dispatcher() {
	for range child.trigger {
		select {
		case <-child.dying:
			close(child.trigger)
		case <-time.After(child.computeBackoff()):
			if child.broker != nil {
				child.consumer.unrefBrokerConsumer(child.broker)
				child.broker = nil
			}

			Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition)
			if err := child.dispatch(); err != nil {
				child.sendError(err)
				child.trigger <- none{}
			}
		}
	}

	if child.broker != nil {
		child.consumer.unrefBrokerConsumer(child.broker)
	}
	child.consumer.removeChild(child)
	close(child.feeder)
}

func (child *partitionConsumer) preferredBroker() (*Broker, error) {
	if child.preferredReadReplica >= 0 {
		broker, err := child.consumer.client.Broker(child.preferredReadReplica)
		if err == nil {
			return broker, nil
		}
	}

	// if the preferred replica cannot be found, fall back to the leader
	return child.consumer.client.Leader(child.topic, child.partition)
}

func (child *partitionConsumer) dispatch() error {
	if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
		return err
	}

	broker, err := child.preferredBroker()
	if err != nil {
		return err
	}

	child.broker = child.consumer.refBrokerConsumer(broker)

	child.broker.input <- child

	return nil
}

func (child *partitionConsumer) chooseStartingOffset(offset int64) error {
	newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest)
	if err != nil {
		return err
	}
	oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest)
	if err != nil {
		return err
	}

	switch {
	case offset == OffsetNewest:
		child.offset = newestOffset
	case offset == OffsetOldest:
		child.offset = oldestOffset
	case offset >= oldestOffset && offset <= newestOffset:
		child.offset = offset
	default:
		return ErrOffsetOutOfRange
	}

	return nil
}

func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
	return child.messages
}

func (child *partitionConsumer) Errors() <-chan *ConsumerError {
	return child.errors
}

func (child *partitionConsumer) AsyncClose() {
	// this triggers whatever broker owns this child to abandon it and close its trigger channel, which causes
	// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
	// 'errors' channels (alternatively, if the child is already at the dispatcher for some reason, that will
	// also just close itself)
	child.closeOnce.Do(func() {
		close(child.dying)
	})
}

func (child *partitionConsumer) Close() error {
	child.AsyncClose()

	var consumerErrors ConsumerErrors
	for err := range child.errors {
		consumerErrors = append(consumerErrors, err)
	}

	if len(consumerErrors) > 0 {
		return consumerErrors
	}
	return nil
}

func (child *partitionConsumer) HighWaterMarkOffset() int64 {
	return atomic.LoadInt64(&child.highWaterMarkOffset)
}

func (child *partitionConsumer) responseFeeder() {
	var msgs []*ConsumerMessage
	expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
	firstAttempt := true

feederLoop:
	for response := range child.feeder {
		msgs, child.responseResult = child.parseResponse(response)

		if child.responseResult == nil {
			atomic.StoreInt32(&child.retries, 0)
		}

		for i, msg := range msgs {
			child.interceptors(msg)
		messageSelect:
			select {
			case <-child.dying:
				child.broker.acks.Done()
				continue feederLoop
			case child.messages <- msg:
				firstAttempt = true
			case <-expiryTicker.C:
				if !firstAttempt {
					child.responseResult = errTimedOut
					child.broker.acks.Done()
				remainingLoop:
					for _, msg = range msgs[i:] {
						child.interceptors(msg)
						select {
						case child.messages <- msg:
						case <-child.dying:
							break remainingLoop
						}
					}
					child.broker.input <- child
					continue feederLoop
				} else {
					// current message has not been sent, return to select
					// statement
					firstAttempt = false
					goto messageSelect
				}
			}
		}

		child.broker.acks.Done()
	}

	expiryTicker.Stop()
	close(child.messages)
	close(child.errors)
}

func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) {
	var messages []*ConsumerMessage
	for _, msgBlock := range msgSet.Messages {
		for _, msg := range msgBlock.Messages() {
			offset := msg.Offset
			timestamp := msg.Msg.Timestamp
			if msg.Msg.Version >= 1 {
				baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
				offset += baseOffset
				if msg.Msg.LogAppendTime {
					timestamp = msgBlock.Msg.Timestamp
				}
			}
			if offset < child.offset {
				continue
			}
			messages = append(messages, &ConsumerMessage{
				Topic:          child.topic,
				Partition:      child.partition,
				Key:            msg.Msg.Key,
				Value:          msg.Msg.Value,
				Offset:         offset,
				Timestamp:      timestamp,
				BlockTimestamp: msgBlock.Msg.Timestamp,
			})
			child.offset = offset + 1
		}
	}
	if len(messages) == 0 {
		child.offset++
	}
	return messages, nil
}

func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
	messages := make([]*ConsumerMessage, 0, len(batch.Records))

	for _, rec := range batch.Records {
		offset := batch.FirstOffset + rec.OffsetDelta
		if offset < child.offset {
			continue
		}
		timestamp := batch.FirstTimestamp.Add(rec.TimestampDelta)
		if batch.LogAppendTime {
			timestamp = batch.MaxTimestamp
		}
		messages = append(messages, &ConsumerMessage{
			Topic:     child.topic,
			Partition: child.partition,
			Key:       rec.Key,
			Value:     rec.Value,
			Offset:    offset,
			Timestamp: timestamp,
			Headers:   rec.Headers,
		})
		child.offset = offset + 1
	}
	if len(messages) == 0 {
		child.offset++
	}
	return messages, nil
}

func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
	var (
		metricRegistry          = child.conf.MetricRegistry
		consumerBatchSizeMetric metrics.Histogram
	)

	if metricRegistry != nil {
		consumerBatchSizeMetric = getOrRegisterHistogram("consumer-batch-size", metricRegistry)
	}

	// If the request was throttled and empty, we log and return without error
	if response.ThrottleTime != time.Duration(0) && len(response.Blocks) == 0 {
		Logger.Printf(
			"consumer/broker/%d FetchResponse throttled %v\n",
			child.broker.broker.ID(), response.ThrottleTime)
		return nil, nil
	}

	block := response.GetBlock(child.topic, child.partition)
	if block == nil {
		return nil, ErrIncompleteResponse
	}

	if block.Err != ErrNoError {
		return nil, block.Err
	}

	nRecs, err := block.numRecords()
	if err != nil {
		return nil, err
	}

	// guard against a nil histogram when no metric registry is configured
	if consumerBatchSizeMetric != nil {
		consumerBatchSizeMetric.Update(int64(nRecs))
	}

	child.preferredReadReplica = block.PreferredReadReplica

	if nRecs == 0 {
		partialTrailingMessage, err := block.isPartial()
		if err != nil {
			return nil, err
		}
		// We got no messages. If we got a trailing one then we need to ask for more data.
		// Otherwise we just poll again and wait for one to be produced...
		if partialTrailingMessage {
			if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
				// we can't ask for more data, we've hit the configured limit
				child.sendError(ErrMessageTooLarge)
				child.offset++ // skip this one so we can keep processing future messages
			} else {
				child.fetchSize *= 2
				// check int32 overflow
				if child.fetchSize < 0 {
					child.fetchSize = math.MaxInt32
				}
				if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
					child.fetchSize = child.conf.Consumer.Fetch.Max
				}
			}
		}

		return nil, nil
	}

	// we got messages, reset our fetch size in case it was increased for a previous request
	child.fetchSize = child.conf.Consumer.Fetch.Default
	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

	// abortedProducerIDs contains the producer IDs whose messages should be ignored as uncommitted:
	// - a producer ID is added when the partitionConsumer iterates over the offset at which an aborted transaction begins (abortedTransaction.FirstOffset)
	// - a producer ID is removed when the partitionConsumer iterates over an abort control record, meaning the aborted transaction for this producer is over
	abortedProducerIDs := make(map[int64]struct{}, len(block.AbortedTransactions))
	abortedTransactions := block.getAbortedTransactions()

	var messages []*ConsumerMessage
	for _, records := range block.RecordsSet {
		switch records.recordsType {
		case legacyRecords:
			messageSetMessages, err := child.parseMessages(records.MsgSet)
			if err != nil {
				return nil, err
			}

			messages = append(messages, messageSetMessages...)
		case defaultRecords:
			// Consume remaining abortedTransaction up to last offset of current batch
			for _, txn := range abortedTransactions {
				if txn.FirstOffset > records.RecordBatch.LastOffset() {
					break
				}
				abortedProducerIDs[txn.ProducerID] = struct{}{}
				// Pop abortedTransactions so that we never add it again
				abortedTransactions = abortedTransactions[1:]
			}

			recordBatchMessages, err := child.parseRecords(records.RecordBatch)
			if err != nil {
				return nil, err
			}

			// Parse and commit offset but do not expose messages that are:
			// - control records
			// - part of an aborted transaction when set to `ReadCommitted`

			// control record
			isControl, err := records.isControl()
			if err != nil {
				// I don't know why there is this continue in case of error to begin with
				// Safe bet is to ignore control messages if ReadUncommitted
				// and block on them in case of error and ReadCommitted
				if child.conf.Consumer.IsolationLevel == ReadCommitted {
					return nil, err
				}
				continue
			}
			if isControl {
				controlRecord, err := records.getControlRecord()
				if err != nil {
					return nil, err
				}

				if controlRecord.Type == ControlRecordAbort {
					delete(abortedProducerIDs, records.RecordBatch.ProducerID)
				}
				continue
			}

			// filter aborted transactions
			if child.conf.Consumer.IsolationLevel == ReadCommitted {
				_, isAborted := abortedProducerIDs[records.RecordBatch.ProducerID]
				if records.RecordBatch.IsTransactional && isAborted {
					continue
				}
			}

			messages = append(messages, recordBatchMessages...)
		default:
			return nil, fmt.Errorf("unknown records type: %v", records.recordsType)
		}
	}

	return messages, nil
}

func (child *partitionConsumer) interceptors(msg *ConsumerMessage) {
	for _, interceptor := range child.conf.Consumer.Interceptors {
		msg.safelyApplyInterceptor(interceptor)
	}
}

type brokerConsumer struct {
	consumer         *consumer
	broker           *Broker
	input            chan *partitionConsumer
	newSubscriptions chan []*partitionConsumer
	subscriptions    map[*partitionConsumer]none
	wait             chan none
	acks             sync.WaitGroup
	refs             int
}

func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
	bc := &brokerConsumer{
		consumer:         c,
		broker:           broker,
		input:            make(chan *partitionConsumer),
		newSubscriptions: make(chan []*partitionConsumer),
		wait:             make(chan none),
		subscriptions:    make(map[*partitionConsumer]none),
		refs:             0,
	}

	go withRecover(bc.subscriptionManager)
	go withRecover(bc.subscriptionConsumer)

	return bc
}

// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
// goroutine is in the middle of a network request) and batches them up. The main worker goroutine picks
// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions are available,
// so the main goroutine can block waiting for work if it has none.
func (bc *brokerConsumer) subscriptionManager() {
	var buffer []*partitionConsumer

	for {
		if len(buffer) > 0 {
			select {
			case event, ok := <-bc.input:
				if !ok {
					goto done
				}
				buffer = append(buffer, event)
			case bc.newSubscriptions <- buffer:
				buffer = nil
			case bc.wait <- none{}:
			}
		} else {
			select {
			case event, ok := <-bc.input:
				if !ok {
					goto done
				}
				buffer = append(buffer, event)
			case bc.newSubscriptions <- nil:
			}
		}
	}

done:
	close(bc.wait)
	if len(buffer) > 0 {
		bc.newSubscriptions <- buffer
	}
	close(bc.newSubscriptions)
}

// subscriptionConsumer ensures we will get nil right away if no new subscriptions are available
func (bc *brokerConsumer) subscriptionConsumer() {
	<-bc.wait // wait for our first piece of work

	for newSubscriptions := range bc.newSubscriptions {
		bc.updateSubscriptions(newSubscriptions)

		if len(bc.subscriptions) == 0 {
			// We're about to be shut down or we're about to receive more subscriptions.
			// Either way, the signal just hasn't propagated to our goroutine yet.
			<-bc.wait
			continue
		}

		response, err := bc.fetchNewMessages()
		if err != nil {
			Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
			bc.abort(err)
			return
		}

		bc.acks.Add(len(bc.subscriptions))
		for child := range bc.subscriptions {
			child.feeder <- response
		}
		bc.acks.Wait()
		bc.handleResponses()
	}
}

func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) {
	for _, child := range newSubscriptions {
		bc.subscriptions[child] = none{}
		Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
	}

	for child := range bc.subscriptions {
		select {
		case <-child.dying:
			Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
			close(child.trigger)
			delete(bc.subscriptions, child)
		default:
			// no-op
		}
	}
}

// handleResponses handles the response codes left for us by our subscriptions, and abandons ones that have been closed
func (bc *brokerConsumer) handleResponses() {
	for child := range bc.subscriptions {
		result := child.responseResult
		child.responseResult = nil

		if result == nil {
			if preferredBroker, err := child.preferredBroker(); err == nil {
				if bc.broker.ID() != preferredBroker.ID() {
					// not an error, but needs redispatching to consume from the preferred replica
					child.trigger <- none{}
					delete(bc.subscriptions, child)
				}
			}
			continue
		}

		// Discard any replica preference.
		child.preferredReadReplica = -1

		switch result {
		case errTimedOut:
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
				bc.broker.ID(), child.topic, child.partition)
			delete(bc.subscriptions, child)
		case ErrOffsetOutOfRange:
			// there's no point in retrying this; it will just fail the same way again
			// shut it down and force the user to choose what to do
			child.sendError(result)
			Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
			close(child.trigger)
			delete(bc.subscriptions, child)
		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrReplicaNotAvailable:
			// not an error, but does need redispatching
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
				bc.broker.ID(), child.topic, child.partition, result)
			child.trigger <- none{}
			delete(bc.subscriptions, child)
		default:
			// dunno, tell the user and try redispatching
			child.sendError(result)
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
				bc.broker.ID(), child.topic, child.partition, result)
			child.trigger <- none{}
			delete(bc.subscriptions, child)
		}
	}
}

func (bc *brokerConsumer) abort(err error) {
	bc.consumer.abandonBrokerConsumer(bc)
	_ = bc.broker.Close() // we don't care about the error this might return, we already have one

	for child := range bc.subscriptions {
		child.sendError(err)
		child.trigger <- none{}
	}

	for newSubscriptions := range bc.newSubscriptions {
		if len(newSubscriptions) == 0 {
			<-bc.wait
			continue
		}
		for _, child := range newSubscriptions {
			child.sendError(err)
			child.trigger <- none{}
		}
	}
}

func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
	request := &FetchRequest{
		MinBytes:    bc.consumer.conf.Consumer.Fetch.Min,
		MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) {
		request.Version = 1
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
		request.Version = 2
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
		request.Version = 3
		request.MaxBytes = MaxResponseSize
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
		request.Version = 4
		request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
	}
	if bc.consumer.conf.Version.IsAtLeast(V1_1_0_0) {
		request.Version = 7
		// We do not currently implement KIP-227 FetchSessions. Setting the id to 0
		// and the epoch to -1 tells the broker not to generate a session ID we're
		// just going to ignore anyway.
		request.SessionID = 0
		request.SessionEpoch = -1
	}
	if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) {
		request.Version = 10
	}
	if bc.consumer.conf.Version.IsAtLeast(V2_3_0_0) {
		request.Version = 11
		request.RackID = bc.consumer.conf.RackID
	}

	for child := range bc.subscriptions {
		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
	}

	return bc.broker.Fetch(request)
}