blob: 72c4d7cd8ffd93bc6dc25f66beb2b907ebc26534 [file] [log] [blame]
Scott Bakere7144bc2019-10-01 14:16:47 -07001package sarama
2
3import (
4 "errors"
5 "fmt"
Scott Bakerf579f132019-10-24 14:31:41 -07006 "math"
Scott Bakere7144bc2019-10-01 14:16:47 -07007 "sync"
8 "sync/atomic"
9 "time"
Scott Bakerf579f132019-10-24 14:31:41 -070010
11 "github.com/rcrowley/go-metrics"
Scott Bakere7144bc2019-10-01 14:16:47 -070012)
13
14// ConsumerMessage encapsulates a Kafka message returned by the consumer.
15type ConsumerMessage struct {
Scott Bakerf579f132019-10-24 14:31:41 -070016 Headers []*RecordHeader // only set if kafka is version 0.11+
Scott Bakere7144bc2019-10-01 14:16:47 -070017 Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp
18 BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
Scott Bakerf579f132019-10-24 14:31:41 -070019
20 Key, Value []byte
21 Topic string
22 Partition int32
23 Offset int64
Scott Bakere7144bc2019-10-01 14:16:47 -070024}
25
26// ConsumerError is what is provided to the user when an error occurs.
27// It wraps an error and includes the topic and partition.
28type ConsumerError struct {
29 Topic string
30 Partition int32
31 Err error
32}
33
34func (ce ConsumerError) Error() string {
35 return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err)
36}
37
38// ConsumerErrors is a type that wraps a batch of errors and implements the Error interface.
39// It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors
40// when stopping.
41type ConsumerErrors []*ConsumerError
42
43func (ce ConsumerErrors) Error() string {
44 return fmt.Sprintf("kafka: %d errors while consuming", len(ce))
45}
46
47// Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close()
48// on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
49// scope.
Scott Bakere7144bc2019-10-01 14:16:47 -070050type Consumer interface {
Scott Bakere7144bc2019-10-01 14:16:47 -070051 // Topics returns the set of available topics as retrieved from the cluster
52 // metadata. This method is the same as Client.Topics(), and is provided for
53 // convenience.
54 Topics() ([]string, error)
55
56 // Partitions returns the sorted list of all partition IDs for the given topic.
57 // This method is the same as Client.Partitions(), and is provided for convenience.
58 Partitions(topic string) ([]int32, error)
59
60 // ConsumePartition creates a PartitionConsumer on the given topic/partition with
61 // the given offset. It will return an error if this Consumer is already consuming
62 // on the given topic/partition. Offset can be a literal offset, or OffsetNewest
63 // or OffsetOldest
64 ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)
65
66 // HighWaterMarks returns the current high water marks for each topic and partition.
67 // Consistency between partitions is not guaranteed since high water marks are updated separately.
68 HighWaterMarks() map[string]map[int32]int64
69
70 // Close shuts down the consumer. It must be called after all child
71 // PartitionConsumers have already been closed.
72 Close() error
73}
74
75type consumer struct {
Scott Bakerf579f132019-10-24 14:31:41 -070076 conf *Config
Scott Bakere7144bc2019-10-01 14:16:47 -070077 children map[string]map[int32]*partitionConsumer
78 brokerConsumers map[*Broker]*brokerConsumer
Scott Bakerf579f132019-10-24 14:31:41 -070079 client Client
80 lock sync.Mutex
Scott Bakere7144bc2019-10-01 14:16:47 -070081}
82
83// NewConsumer creates a new consumer using the given broker addresses and configuration.
84func NewConsumer(addrs []string, config *Config) (Consumer, error) {
85 client, err := NewClient(addrs, config)
86 if err != nil {
87 return nil, err
88 }
Scott Bakerf579f132019-10-24 14:31:41 -070089 return newConsumer(client)
Scott Bakere7144bc2019-10-01 14:16:47 -070090}
91
92// NewConsumerFromClient creates a new consumer using the given client. It is still
93// necessary to call Close() on the underlying client when shutting down this consumer.
94func NewConsumerFromClient(client Client) (Consumer, error) {
Scott Bakerf579f132019-10-24 14:31:41 -070095 // For clients passed in by the client, ensure we don't
96 // call Close() on it.
97 cli := &nopCloserClient{client}
98 return newConsumer(cli)
99}
100
101func newConsumer(client Client) (Consumer, error) {
Scott Bakere7144bc2019-10-01 14:16:47 -0700102 // Check that we are not dealing with a closed Client before processing any other arguments
103 if client.Closed() {
104 return nil, ErrClosedClient
105 }
106
107 c := &consumer{
108 client: client,
109 conf: client.Config(),
110 children: make(map[string]map[int32]*partitionConsumer),
111 brokerConsumers: make(map[*Broker]*brokerConsumer),
112 }
113
114 return c, nil
115}
116
117func (c *consumer) Close() error {
Scott Bakerf579f132019-10-24 14:31:41 -0700118 return c.client.Close()
Scott Bakere7144bc2019-10-01 14:16:47 -0700119}
120
121func (c *consumer) Topics() ([]string, error) {
122 return c.client.Topics()
123}
124
125func (c *consumer) Partitions(topic string) ([]int32, error) {
126 return c.client.Partitions(topic)
127}
128
129func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
130 child := &partitionConsumer{
131 consumer: c,
132 conf: c.conf,
133 topic: topic,
134 partition: partition,
135 messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
136 errors: make(chan *ConsumerError, c.conf.ChannelBufferSize),
137 feeder: make(chan *FetchResponse, 1),
138 trigger: make(chan none, 1),
139 dying: make(chan none),
140 fetchSize: c.conf.Consumer.Fetch.Default,
141 }
142
143 if err := child.chooseStartingOffset(offset); err != nil {
144 return nil, err
145 }
146
147 var leader *Broker
148 var err error
149 if leader, err = c.client.Leader(child.topic, child.partition); err != nil {
150 return nil, err
151 }
152
153 if err := c.addChild(child); err != nil {
154 return nil, err
155 }
156
157 go withRecover(child.dispatcher)
158 go withRecover(child.responseFeeder)
159
160 child.broker = c.refBrokerConsumer(leader)
161 child.broker.input <- child
162
163 return child, nil
164}
165
166func (c *consumer) HighWaterMarks() map[string]map[int32]int64 {
167 c.lock.Lock()
168 defer c.lock.Unlock()
169
170 hwms := make(map[string]map[int32]int64)
171 for topic, p := range c.children {
172 hwm := make(map[int32]int64, len(p))
173 for partition, pc := range p {
174 hwm[partition] = pc.HighWaterMarkOffset()
175 }
176 hwms[topic] = hwm
177 }
178
179 return hwms
180}
181
182func (c *consumer) addChild(child *partitionConsumer) error {
183 c.lock.Lock()
184 defer c.lock.Unlock()
185
186 topicChildren := c.children[child.topic]
187 if topicChildren == nil {
188 topicChildren = make(map[int32]*partitionConsumer)
189 c.children[child.topic] = topicChildren
190 }
191
192 if topicChildren[child.partition] != nil {
193 return ConfigurationError("That topic/partition is already being consumed")
194 }
195
196 topicChildren[child.partition] = child
197 return nil
198}
199
200func (c *consumer) removeChild(child *partitionConsumer) {
201 c.lock.Lock()
202 defer c.lock.Unlock()
203
204 delete(c.children[child.topic], child.partition)
205}
206
207func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
208 c.lock.Lock()
209 defer c.lock.Unlock()
210
211 bc := c.brokerConsumers[broker]
212 if bc == nil {
213 bc = c.newBrokerConsumer(broker)
214 c.brokerConsumers[broker] = bc
215 }
216
217 bc.refs++
218
219 return bc
220}
221
222func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) {
223 c.lock.Lock()
224 defer c.lock.Unlock()
225
226 brokerWorker.refs--
227
228 if brokerWorker.refs == 0 {
229 close(brokerWorker.input)
230 if c.brokerConsumers[brokerWorker.broker] == brokerWorker {
231 delete(c.brokerConsumers, brokerWorker.broker)
232 }
233 }
234}
235
236func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
237 c.lock.Lock()
238 defer c.lock.Unlock()
239
240 delete(c.brokerConsumers, brokerWorker.broker)
241}
242
243// PartitionConsumer
244
245// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or
246// AsyncClose() on a PartitionConsumer to avoid leaks; it will not be garbage-collected automatically when it passes out
247// of scope.
248//
249// The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range
250// loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported
251// as out of range by the brokers. In this case you should decide what you want to do (try a different offset,
252// notify a human, etc) and handle it appropriately. For all other error cases, it will just keep retrying.
253// By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set
254// your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement
255// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
256//
257// To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of
Scott Bakerf579f132019-10-24 14:31:41 -0700258// consumer tear-down & return immediately. Continue to loop, servicing the Messages channel until the teardown process
Scott Bakere7144bc2019-10-01 14:16:47 -0700259// AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call
260// Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will
261// also drain the Messages channel, harvest all errors & return them once cleanup has completed.
262type PartitionConsumer interface {
Scott Bakere7144bc2019-10-01 14:16:47 -0700263 // AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you
264 // should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this
265 // function, or Close before a consumer object passes out of scope, as it will otherwise leak memory. You must call
266 // this before calling Close on the underlying client.
267 AsyncClose()
268
269 // Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown just like AsyncClose, drain
270 // the Messages channel, harvest any errors & return them to the caller. Note that if you are continuing to service
271 // the Messages channel when this function is called, you will be competing with Close for messages; consider
272 // calling AsyncClose, instead. It is required to call this function (or AsyncClose) before a consumer object passes
273 // out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.
274 Close() error
275
276 // Messages returns the read channel for the messages that are returned by
277 // the broker.
278 Messages() <-chan *ConsumerMessage
279
280 // Errors returns a read channel of errors that occurred during consuming, if
281 // enabled. By default, errors are logged and not returned over this channel.
282 // If you want to implement any custom error handling, set your config's
283 // Consumer.Return.Errors setting to true, and read from this channel.
284 Errors() <-chan *ConsumerError
285
286 // HighWaterMarkOffset returns the high water mark offset of the partition,
287 // i.e. the offset that will be used for the next message that will be produced.
288 // You can use this to determine how far behind the processing is.
289 HighWaterMarkOffset() int64
290}
291
292type partitionConsumer struct {
293 highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG
Scott Bakere7144bc2019-10-01 14:16:47 -0700294
Scott Bakerf579f132019-10-24 14:31:41 -0700295 consumer *consumer
296 conf *Config
Scott Bakere7144bc2019-10-01 14:16:47 -0700297 broker *brokerConsumer
298 messages chan *ConsumerMessage
299 errors chan *ConsumerError
300 feeder chan *FetchResponse
301
302 trigger, dying chan none
Scott Bakere7144bc2019-10-01 14:16:47 -0700303 closeOnce sync.Once
Scott Bakerf579f132019-10-24 14:31:41 -0700304 topic string
305 partition int32
306 responseResult error
307 fetchSize int32
308 offset int64
309 retries int32
Scott Bakere7144bc2019-10-01 14:16:47 -0700310}
311
312var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
313
314func (child *partitionConsumer) sendError(err error) {
315 cErr := &ConsumerError{
316 Topic: child.topic,
317 Partition: child.partition,
318 Err: err,
319 }
320
321 if child.conf.Consumer.Return.Errors {
322 child.errors <- cErr
323 } else {
324 Logger.Println(cErr)
325 }
326}
327
328func (child *partitionConsumer) computeBackoff() time.Duration {
329 if child.conf.Consumer.Retry.BackoffFunc != nil {
330 retries := atomic.AddInt32(&child.retries, 1)
331 return child.conf.Consumer.Retry.BackoffFunc(int(retries))
Scott Bakere7144bc2019-10-01 14:16:47 -0700332 }
Scott Bakerf579f132019-10-24 14:31:41 -0700333 return child.conf.Consumer.Retry.Backoff
Scott Bakere7144bc2019-10-01 14:16:47 -0700334}
335
336func (child *partitionConsumer) dispatcher() {
337 for range child.trigger {
338 select {
339 case <-child.dying:
340 close(child.trigger)
341 case <-time.After(child.computeBackoff()):
342 if child.broker != nil {
343 child.consumer.unrefBrokerConsumer(child.broker)
344 child.broker = nil
345 }
346
347 Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition)
348 if err := child.dispatch(); err != nil {
349 child.sendError(err)
350 child.trigger <- none{}
351 }
352 }
353 }
354
355 if child.broker != nil {
356 child.consumer.unrefBrokerConsumer(child.broker)
357 }
358 child.consumer.removeChild(child)
359 close(child.feeder)
360}
361
362func (child *partitionConsumer) dispatch() error {
363 if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
364 return err
365 }
366
367 var leader *Broker
368 var err error
369 if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
370 return err
371 }
372
373 child.broker = child.consumer.refBrokerConsumer(leader)
374
375 child.broker.input <- child
376
377 return nil
378}
379
380func (child *partitionConsumer) chooseStartingOffset(offset int64) error {
381 newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest)
382 if err != nil {
383 return err
384 }
385 oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest)
386 if err != nil {
387 return err
388 }
389
390 switch {
391 case offset == OffsetNewest:
392 child.offset = newestOffset
393 case offset == OffsetOldest:
394 child.offset = oldestOffset
395 case offset >= oldestOffset && offset <= newestOffset:
396 child.offset = offset
397 default:
398 return ErrOffsetOutOfRange
399 }
400
401 return nil
402}
403
404func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
405 return child.messages
406}
407
408func (child *partitionConsumer) Errors() <-chan *ConsumerError {
409 return child.errors
410}
411
412func (child *partitionConsumer) AsyncClose() {
413 // this triggers whatever broker owns this child to abandon it and close its trigger channel, which causes
414 // the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
415 // 'errors' channel (alternatively, if the child is already at the dispatcher for some reason, that will
416 // also just close itself)
417 child.closeOnce.Do(func() {
418 close(child.dying)
419 })
420}
421
422func (child *partitionConsumer) Close() error {
423 child.AsyncClose()
424
Scott Bakere7144bc2019-10-01 14:16:47 -0700425 var errors ConsumerErrors
426 for err := range child.errors {
427 errors = append(errors, err)
428 }
429
430 if len(errors) > 0 {
431 return errors
432 }
433 return nil
434}
435
436func (child *partitionConsumer) HighWaterMarkOffset() int64 {
437 return atomic.LoadInt64(&child.highWaterMarkOffset)
438}
439
440func (child *partitionConsumer) responseFeeder() {
441 var msgs []*ConsumerMessage
442 expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
443 firstAttempt := true
444
445feederLoop:
446 for response := range child.feeder {
447 msgs, child.responseResult = child.parseResponse(response)
448
449 if child.responseResult == nil {
450 atomic.StoreInt32(&child.retries, 0)
451 }
452
453 for i, msg := range msgs {
454 messageSelect:
455 select {
Scott Bakerf579f132019-10-24 14:31:41 -0700456 case <-child.dying:
457 child.broker.acks.Done()
458 continue feederLoop
Scott Bakere7144bc2019-10-01 14:16:47 -0700459 case child.messages <- msg:
460 firstAttempt = true
461 case <-expiryTicker.C:
462 if !firstAttempt {
463 child.responseResult = errTimedOut
464 child.broker.acks.Done()
Scott Bakerf579f132019-10-24 14:31:41 -0700465 remainingLoop:
Scott Bakere7144bc2019-10-01 14:16:47 -0700466 for _, msg = range msgs[i:] {
Scott Bakerf579f132019-10-24 14:31:41 -0700467 select {
468 case child.messages <- msg:
469 case <-child.dying:
470 break remainingLoop
471 }
Scott Bakere7144bc2019-10-01 14:16:47 -0700472 }
473 child.broker.input <- child
474 continue feederLoop
475 } else {
476 // current message has not been sent, return to select
477 // statement
478 firstAttempt = false
479 goto messageSelect
480 }
481 }
482 }
483
484 child.broker.acks.Done()
485 }
486
487 expiryTicker.Stop()
488 close(child.messages)
489 close(child.errors)
490}
491
492func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) {
493 var messages []*ConsumerMessage
494 for _, msgBlock := range msgSet.Messages {
495 for _, msg := range msgBlock.Messages() {
496 offset := msg.Offset
497 timestamp := msg.Msg.Timestamp
498 if msg.Msg.Version >= 1 {
499 baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
500 offset += baseOffset
501 if msg.Msg.LogAppendTime {
502 timestamp = msgBlock.Msg.Timestamp
503 }
504 }
505 if offset < child.offset {
506 continue
507 }
508 messages = append(messages, &ConsumerMessage{
509 Topic: child.topic,
510 Partition: child.partition,
511 Key: msg.Msg.Key,
512 Value: msg.Msg.Value,
513 Offset: offset,
514 Timestamp: timestamp,
515 BlockTimestamp: msgBlock.Msg.Timestamp,
516 })
517 child.offset = offset + 1
518 }
519 }
520 if len(messages) == 0 {
521 child.offset++
522 }
523 return messages, nil
524}
525
526func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
Scott Bakerf579f132019-10-24 14:31:41 -0700527 messages := make([]*ConsumerMessage, 0, len(batch.Records))
528
Scott Bakere7144bc2019-10-01 14:16:47 -0700529 for _, rec := range batch.Records {
530 offset := batch.FirstOffset + rec.OffsetDelta
531 if offset < child.offset {
532 continue
533 }
534 timestamp := batch.FirstTimestamp.Add(rec.TimestampDelta)
535 if batch.LogAppendTime {
536 timestamp = batch.MaxTimestamp
537 }
538 messages = append(messages, &ConsumerMessage{
539 Topic: child.topic,
540 Partition: child.partition,
541 Key: rec.Key,
542 Value: rec.Value,
543 Offset: offset,
544 Timestamp: timestamp,
545 Headers: rec.Headers,
546 })
547 child.offset = offset + 1
548 }
549 if len(messages) == 0 {
550 child.offset++
551 }
552 return messages, nil
553}
554
555func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
Scott Bakerf579f132019-10-24 14:31:41 -0700556 var (
557 metricRegistry = child.conf.MetricRegistry
558 consumerBatchSizeMetric metrics.Histogram
559 )
560
561 if metricRegistry != nil {
562 consumerBatchSizeMetric = getOrRegisterHistogram("consumer-batch-size", metricRegistry)
563 }
564
565 // If request was throttled and empty we log and return without error
566 if response.ThrottleTime != time.Duration(0) && len(response.Blocks) == 0 {
567 Logger.Printf(
568 "consumer/broker/%d FetchResponse throttled %v\n",
569 child.broker.broker.ID(), response.ThrottleTime)
570 return nil, nil
571 }
572
Scott Bakere7144bc2019-10-01 14:16:47 -0700573 block := response.GetBlock(child.topic, child.partition)
574 if block == nil {
575 return nil, ErrIncompleteResponse
576 }
577
578 if block.Err != ErrNoError {
579 return nil, block.Err
580 }
581
582 nRecs, err := block.numRecords()
583 if err != nil {
584 return nil, err
585 }
Scott Bakerf579f132019-10-24 14:31:41 -0700586
587 consumerBatchSizeMetric.Update(int64(nRecs))
588
Scott Bakere7144bc2019-10-01 14:16:47 -0700589 if nRecs == 0 {
590 partialTrailingMessage, err := block.isPartial()
591 if err != nil {
592 return nil, err
593 }
594 // We got no messages. If we got a trailing one then we need to ask for more data.
595 // Otherwise we just poll again and wait for one to be produced...
596 if partialTrailingMessage {
597 if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
598 // we can't ask for more data, we've hit the configured limit
599 child.sendError(ErrMessageTooLarge)
600 child.offset++ // skip this one so we can keep processing future messages
601 } else {
602 child.fetchSize *= 2
Scott Bakerf579f132019-10-24 14:31:41 -0700603 // check int32 overflow
604 if child.fetchSize < 0 {
605 child.fetchSize = math.MaxInt32
606 }
Scott Bakere7144bc2019-10-01 14:16:47 -0700607 if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
608 child.fetchSize = child.conf.Consumer.Fetch.Max
609 }
610 }
611 }
612
613 return nil, nil
614 }
615
616 // we got messages, reset our fetch size in case it was increased for a previous request
617 child.fetchSize = child.conf.Consumer.Fetch.Default
618 atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)
619
Scott Bakerf579f132019-10-24 14:31:41 -0700620 // abortedProducerIDs contains producerID which message should be ignored as uncommitted
621 // - producerID are added when the partitionConsumer iterate over the offset at which an aborted transaction begins (abortedTransaction.FirstOffset)
622 // - producerID are removed when partitionConsumer iterate over an aborted controlRecord, meaning the aborted transaction for this producer is over
623 abortedProducerIDs := make(map[int64]struct{}, len(block.AbortedTransactions))
624 abortedTransactions := block.getAbortedTransactions()
625
Scott Bakere7144bc2019-10-01 14:16:47 -0700626 messages := []*ConsumerMessage{}
627 for _, records := range block.RecordsSet {
628 switch records.recordsType {
629 case legacyRecords:
630 messageSetMessages, err := child.parseMessages(records.MsgSet)
631 if err != nil {
632 return nil, err
633 }
634
635 messages = append(messages, messageSetMessages...)
636 case defaultRecords:
Scott Bakerf579f132019-10-24 14:31:41 -0700637 // Consume remaining abortedTransaction up to last offset of current batch
638 for _, txn := range abortedTransactions {
639 if txn.FirstOffset > records.RecordBatch.LastOffset() {
640 break
641 }
642 abortedProducerIDs[txn.ProducerID] = struct{}{}
643 // Pop abortedTransactions so that we never add it again
644 abortedTransactions = abortedTransactions[1:]
645 }
646
Scott Bakere7144bc2019-10-01 14:16:47 -0700647 recordBatchMessages, err := child.parseRecords(records.RecordBatch)
648 if err != nil {
649 return nil, err
650 }
Scott Bakerf579f132019-10-24 14:31:41 -0700651
652 // Parse and commit offset but do not expose messages that are:
653 // - control records
654 // - part of an aborted transaction when set to `ReadCommitted`
655
656 // control record
657 isControl, err := records.isControl()
658 if err != nil {
659 // I don't know why there is this continue in case of error to begin with
660 // Safe bet is to ignore control messages if ReadUncommitted
661 // and block on them in case of error and ReadCommitted
662 if child.conf.Consumer.IsolationLevel == ReadCommitted {
663 return nil, err
664 }
Scott Bakere7144bc2019-10-01 14:16:47 -0700665 continue
666 }
Scott Bakerf579f132019-10-24 14:31:41 -0700667 if isControl {
668 controlRecord, err := records.getControlRecord()
669 if err != nil {
670 return nil, err
671 }
672
673 if controlRecord.Type == ControlRecordAbort {
674 delete(abortedProducerIDs, records.RecordBatch.ProducerID)
675 }
676 continue
677 }
678
679 // filter aborted transactions
680 if child.conf.Consumer.IsolationLevel == ReadCommitted {
681 _, isAborted := abortedProducerIDs[records.RecordBatch.ProducerID]
682 if records.RecordBatch.IsTransactional && isAborted {
683 continue
684 }
685 }
Scott Bakere7144bc2019-10-01 14:16:47 -0700686
687 messages = append(messages, recordBatchMessages...)
688 default:
689 return nil, fmt.Errorf("unknown records type: %v", records.recordsType)
690 }
691 }
692
693 return messages, nil
694}
695
Scott Bakere7144bc2019-10-01 14:16:47 -0700696type brokerConsumer struct {
697 consumer *consumer
698 broker *Broker
699 input chan *partitionConsumer
700 newSubscriptions chan []*partitionConsumer
Scott Bakere7144bc2019-10-01 14:16:47 -0700701 subscriptions map[*partitionConsumer]none
Scott Bakerf579f132019-10-24 14:31:41 -0700702 wait chan none
Scott Bakere7144bc2019-10-01 14:16:47 -0700703 acks sync.WaitGroup
704 refs int
705}
706
707func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
708 bc := &brokerConsumer{
709 consumer: c,
710 broker: broker,
711 input: make(chan *partitionConsumer),
712 newSubscriptions: make(chan []*partitionConsumer),
713 wait: make(chan none),
714 subscriptions: make(map[*partitionConsumer]none),
715 refs: 0,
716 }
717
718 go withRecover(bc.subscriptionManager)
719 go withRecover(bc.subscriptionConsumer)
720
721 return bc
722}
723
Scott Bakerf579f132019-10-24 14:31:41 -0700724// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
725// goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
726// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
727// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
728// so the main goroutine can block waiting for work if it has none.
Scott Bakere7144bc2019-10-01 14:16:47 -0700729func (bc *brokerConsumer) subscriptionManager() {
730 var buffer []*partitionConsumer
731
Scott Bakere7144bc2019-10-01 14:16:47 -0700732 for {
733 if len(buffer) > 0 {
734 select {
735 case event, ok := <-bc.input:
736 if !ok {
737 goto done
738 }
739 buffer = append(buffer, event)
740 case bc.newSubscriptions <- buffer:
741 buffer = nil
742 case bc.wait <- none{}:
743 }
744 } else {
745 select {
746 case event, ok := <-bc.input:
747 if !ok {
748 goto done
749 }
750 buffer = append(buffer, event)
751 case bc.newSubscriptions <- nil:
752 }
753 }
754 }
755
756done:
757 close(bc.wait)
758 if len(buffer) > 0 {
759 bc.newSubscriptions <- buffer
760 }
761 close(bc.newSubscriptions)
762}
763
Scott Bakerf579f132019-10-24 14:31:41 -0700764//subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
Scott Bakere7144bc2019-10-01 14:16:47 -0700765func (bc *brokerConsumer) subscriptionConsumer() {
766 <-bc.wait // wait for our first piece of work
767
Scott Bakere7144bc2019-10-01 14:16:47 -0700768 for newSubscriptions := range bc.newSubscriptions {
769 bc.updateSubscriptions(newSubscriptions)
770
771 if len(bc.subscriptions) == 0 {
772 // We're about to be shut down or we're about to receive more subscriptions.
773 // Either way, the signal just hasn't propagated to our goroutine yet.
774 <-bc.wait
775 continue
776 }
777
778 response, err := bc.fetchNewMessages()
779
780 if err != nil {
781 Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
782 bc.abort(err)
783 return
784 }
785
786 bc.acks.Add(len(bc.subscriptions))
787 for child := range bc.subscriptions {
788 child.feeder <- response
789 }
790 bc.acks.Wait()
791 bc.handleResponses()
792 }
793}
794
795func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) {
796 for _, child := range newSubscriptions {
797 bc.subscriptions[child] = none{}
798 Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
799 }
800
801 for child := range bc.subscriptions {
802 select {
803 case <-child.dying:
804 Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
805 close(child.trigger)
806 delete(bc.subscriptions, child)
807 default:
Scott Bakerf579f132019-10-24 14:31:41 -0700808 // no-op
Scott Bakere7144bc2019-10-01 14:16:47 -0700809 }
810 }
811}
812
Scott Bakerf579f132019-10-24 14:31:41 -0700813//handleResponses handles the response codes left for us by our subscriptions, and abandons ones that have been closed
Scott Bakere7144bc2019-10-01 14:16:47 -0700814func (bc *brokerConsumer) handleResponses() {
Scott Bakere7144bc2019-10-01 14:16:47 -0700815 for child := range bc.subscriptions {
816 result := child.responseResult
817 child.responseResult = nil
818
819 switch result {
820 case nil:
Scott Bakerf579f132019-10-24 14:31:41 -0700821 // no-op
Scott Bakere7144bc2019-10-01 14:16:47 -0700822 case errTimedOut:
823 Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
824 bc.broker.ID(), child.topic, child.partition)
825 delete(bc.subscriptions, child)
826 case ErrOffsetOutOfRange:
827 // there's no point in retrying this it will just fail the same way again
828 // shut it down and force the user to choose what to do
829 child.sendError(result)
830 Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
831 close(child.trigger)
832 delete(bc.subscriptions, child)
833 case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrReplicaNotAvailable:
834 // not an error, but does need redispatching
835 Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
836 bc.broker.ID(), child.topic, child.partition, result)
837 child.trigger <- none{}
838 delete(bc.subscriptions, child)
839 default:
840 // dunno, tell the user and try redispatching
841 child.sendError(result)
842 Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
843 bc.broker.ID(), child.topic, child.partition, result)
844 child.trigger <- none{}
845 delete(bc.subscriptions, child)
846 }
847 }
848}
849
850func (bc *brokerConsumer) abort(err error) {
851 bc.consumer.abandonBrokerConsumer(bc)
852 _ = bc.broker.Close() // we don't care about the error this might return, we already have one
853
854 for child := range bc.subscriptions {
855 child.sendError(err)
856 child.trigger <- none{}
857 }
858
859 for newSubscriptions := range bc.newSubscriptions {
860 if len(newSubscriptions) == 0 {
861 <-bc.wait
862 continue
863 }
864 for _, child := range newSubscriptions {
865 child.sendError(err)
866 child.trigger <- none{}
867 }
868 }
869}
870
871func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
872 request := &FetchRequest{
873 MinBytes: bc.consumer.conf.Consumer.Fetch.Min,
874 MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
875 }
876 if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) {
877 request.Version = 1
878 }
879 if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
880 request.Version = 2
881 }
882 if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
883 request.Version = 3
884 request.MaxBytes = MaxResponseSize
885 }
886 if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
887 request.Version = 4
Scott Bakerf579f132019-10-24 14:31:41 -0700888 request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
Scott Bakere7144bc2019-10-01 14:16:47 -0700889 }
890
891 for child := range bc.subscriptions {
892 request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
893 }
894
895 return bc.broker.Fetch(request)
896}