blob: ce72ff1d62e8bdfb73ac3ebc6dfa6e29e7c42068 [file] [log] [blame]
Scott Bakere7144bc2019-10-01 14:16:47 -07001package sarama
2
3import (
4 "errors"
5 "fmt"
6 "sync"
7 "sync/atomic"
8 "time"
9)
10
// ConsumerMessage encapsulates a Kafka message returned by the consumer.
// Values of this type are delivered on PartitionConsumer.Messages().
type ConsumerMessage struct {
	// Key and Value are the message key and payload as copied from the fetched record.
	Key, Value []byte
	// Topic and Partition identify the partition consumer that produced this message.
	Topic     string
	Partition int32
	// Offset is the message's offset within its partition.
	Offset         int64
	Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
	BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp
	Headers        []*RecordHeader // only set if kafka is version 0.11+
}
21
// ConsumerError is what is provided to the user when an error occurs.
// It wraps an error and includes the topic and partition.
type ConsumerError struct {
	Topic     string
	Partition int32
	Err       error
}

// Error implements the error interface, reporting the topic, the partition
// and the text of the wrapped error.
func (ce ConsumerError) Error() string {
	return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err)
}

// Unwrap returns the wrapped error so that errors.Is and errors.As can see
// through a ConsumerError to the underlying cause.
func (ce ConsumerError) Unwrap() error {
	return ce.Err
}
33
34// ConsumerErrors is a type that wraps a batch of errors and implements the Error interface.
35// It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors
36// when stopping.
37type ConsumerErrors []*ConsumerError
38
39func (ce ConsumerErrors) Error() string {
40 return fmt.Sprintf("kafka: %d errors while consuming", len(ce))
41}
42
// Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close()
// on a consumer to avoid leaks; it will not be garbage-collected automatically when it passes out of
// scope.
//
// Sarama's Consumer type does not currently support automatic consumer-group rebalancing and offset tracking.
// For Zookeeper-based tracking (Kafka 0.8.2 and earlier), the https://github.com/wvanbergen/kafka library
// builds on Sarama to add this support. For Kafka-based tracking (Kafka 0.9 and later), the
// https://github.com/bsm/sarama-cluster library builds on Sarama to add this support.
type Consumer interface {

	// Topics returns the set of available topics as retrieved from the cluster
	// metadata. This method is the same as Client.Topics(), and is provided for
	// convenience.
	Topics() ([]string, error)

	// Partitions returns the sorted list of all partition IDs for the given topic.
	// This method is the same as Client.Partitions(), and is provided for convenience.
	Partitions(topic string) ([]int32, error)

	// ConsumePartition creates a PartitionConsumer on the given topic/partition with
	// the given offset. It will return an error if this Consumer is already consuming
	// on the given topic/partition. Offset can be a literal offset, or OffsetNewest
	// or OffsetOldest.
	ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)

	// HighWaterMarks returns the current high water marks for each topic and partition.
	// Consistency between partitions is not guaranteed since high water marks are updated separately.
	HighWaterMarks() map[string]map[int32]int64

	// Close shuts down the consumer. It must be called after all child
	// PartitionConsumers have already been closed.
	Close() error
}
76
// consumer is the Consumer implementation in this file. It tracks one
// partitionConsumer per topic/partition and one brokerConsumer per broker.
type consumer struct {
	client Client
	conf   *Config
	// ownClient is set by NewConsumer, which creates the client itself; Close
	// only closes the client when it is true.
	ownClient bool

	// lock is held around every access to children and brokerConsumers (and
	// around updates of brokerConsumer.refs) in this file.
	lock            sync.Mutex
	children        map[string]map[int32]*partitionConsumer // topic -> partition -> consumer
	brokerConsumers map[*Broker]*brokerConsumer
}
86
87// NewConsumer creates a new consumer using the given broker addresses and configuration.
88func NewConsumer(addrs []string, config *Config) (Consumer, error) {
89 client, err := NewClient(addrs, config)
90 if err != nil {
91 return nil, err
92 }
93
94 c, err := NewConsumerFromClient(client)
95 if err != nil {
96 return nil, err
97 }
98 c.(*consumer).ownClient = true
99 return c, nil
100}
101
102// NewConsumerFromClient creates a new consumer using the given client. It is still
103// necessary to call Close() on the underlying client when shutting down this consumer.
104func NewConsumerFromClient(client Client) (Consumer, error) {
105 // Check that we are not dealing with a closed Client before processing any other arguments
106 if client.Closed() {
107 return nil, ErrClosedClient
108 }
109
110 c := &consumer{
111 client: client,
112 conf: client.Config(),
113 children: make(map[string]map[int32]*partitionConsumer),
114 brokerConsumers: make(map[*Broker]*brokerConsumer),
115 }
116
117 return c, nil
118}
119
120func (c *consumer) Close() error {
121 if c.ownClient {
122 return c.client.Close()
123 }
124 return nil
125}
126
127func (c *consumer) Topics() ([]string, error) {
128 return c.client.Topics()
129}
130
131func (c *consumer) Partitions(topic string) ([]int32, error) {
132 return c.client.Partitions(topic)
133}
134
135func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
136 child := &partitionConsumer{
137 consumer: c,
138 conf: c.conf,
139 topic: topic,
140 partition: partition,
141 messages: make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
142 errors: make(chan *ConsumerError, c.conf.ChannelBufferSize),
143 feeder: make(chan *FetchResponse, 1),
144 trigger: make(chan none, 1),
145 dying: make(chan none),
146 fetchSize: c.conf.Consumer.Fetch.Default,
147 }
148
149 if err := child.chooseStartingOffset(offset); err != nil {
150 return nil, err
151 }
152
153 var leader *Broker
154 var err error
155 if leader, err = c.client.Leader(child.topic, child.partition); err != nil {
156 return nil, err
157 }
158
159 if err := c.addChild(child); err != nil {
160 return nil, err
161 }
162
163 go withRecover(child.dispatcher)
164 go withRecover(child.responseFeeder)
165
166 child.broker = c.refBrokerConsumer(leader)
167 child.broker.input <- child
168
169 return child, nil
170}
171
172func (c *consumer) HighWaterMarks() map[string]map[int32]int64 {
173 c.lock.Lock()
174 defer c.lock.Unlock()
175
176 hwms := make(map[string]map[int32]int64)
177 for topic, p := range c.children {
178 hwm := make(map[int32]int64, len(p))
179 for partition, pc := range p {
180 hwm[partition] = pc.HighWaterMarkOffset()
181 }
182 hwms[topic] = hwm
183 }
184
185 return hwms
186}
187
188func (c *consumer) addChild(child *partitionConsumer) error {
189 c.lock.Lock()
190 defer c.lock.Unlock()
191
192 topicChildren := c.children[child.topic]
193 if topicChildren == nil {
194 topicChildren = make(map[int32]*partitionConsumer)
195 c.children[child.topic] = topicChildren
196 }
197
198 if topicChildren[child.partition] != nil {
199 return ConfigurationError("That topic/partition is already being consumed")
200 }
201
202 topicChildren[child.partition] = child
203 return nil
204}
205
206func (c *consumer) removeChild(child *partitionConsumer) {
207 c.lock.Lock()
208 defer c.lock.Unlock()
209
210 delete(c.children[child.topic], child.partition)
211}
212
213func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
214 c.lock.Lock()
215 defer c.lock.Unlock()
216
217 bc := c.brokerConsumers[broker]
218 if bc == nil {
219 bc = c.newBrokerConsumer(broker)
220 c.brokerConsumers[broker] = bc
221 }
222
223 bc.refs++
224
225 return bc
226}
227
228func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) {
229 c.lock.Lock()
230 defer c.lock.Unlock()
231
232 brokerWorker.refs--
233
234 if brokerWorker.refs == 0 {
235 close(brokerWorker.input)
236 if c.brokerConsumers[brokerWorker.broker] == brokerWorker {
237 delete(c.brokerConsumers, brokerWorker.broker)
238 }
239 }
240}
241
242func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
243 c.lock.Lock()
244 defer c.lock.Unlock()
245
246 delete(c.brokerConsumers, brokerWorker.broker)
247}
248
249// PartitionConsumer
250
// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or
// AsyncClose() on a PartitionConsumer to avoid leaks; it will not be garbage-collected automatically when it passes out
// of scope.
//
// The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range
// loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported
// as out of range by the brokers. In this case you should decide what you want to do (try a different offset,
// notify a human, etc) and handle it appropriately. For all other error cases, it will just keep retrying.
// By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set
// your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement
// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
//
// To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of
// consumer tear-down & return immediately. Continue to loop, servicing the Messages channel until the teardown process
// AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call
// Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will
// also drain the Messages channel, harvest all errors & return them once cleanup has completed.
type PartitionConsumer interface {

	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you
	// should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this
	// function, or Close before a consumer object passes out of scope, as it will otherwise leak memory. You must call
	// this before calling Close on the underlying client.
	AsyncClose()

	// Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown just like AsyncClose, drain
	// the Messages channel, harvest any errors & return them to the caller. Note that if you are continuing to service
	// the Messages channel when this function is called, you will be competing with Close for messages; consider
	// calling AsyncClose, instead. It is required to call this function (or AsyncClose) before a consumer object passes
	// out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.
	Close() error

	// Messages returns the read channel for the messages that are returned by
	// the broker.
	Messages() <-chan *ConsumerMessage

	// Errors returns a read channel of errors that occurred during consuming, if
	// enabled. By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64
}
298
// partitionConsumer is the PartitionConsumer implementation in this file. Each
// instance is serviced by two goroutines started in ConsumePartition:
// dispatcher and responseFeeder.
type partitionConsumer struct {
	highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG
	consumer            *consumer
	conf                *Config
	topic               string
	partition           int32

	// broker is the brokerConsumer this child is currently subscribed to; the
	// dispatcher sets it to nil while it looks for a new leader.
	broker *brokerConsumer
	// messages and errors are the user-facing channels, buffered to
	// Config.ChannelBufferSize; responseFeeder closes both when it exits.
	messages chan *ConsumerMessage
	errors   chan *ConsumerError
	// feeder carries fetch responses from the broker consumer to
	// responseFeeder (buffer of 1); the dispatcher closes it on shutdown.
	feeder chan *FetchResponse

	// trigger (buffer of 1) wakes the dispatcher to find a new broker, and is
	// closed to make the dispatcher exit. dying is closed by AsyncClose.
	trigger, dying chan none
	// responseResult is written by responseFeeder for each response and read
	// and cleared by brokerConsumer.handleResponses.
	responseResult error
	closeOnce      sync.Once // makes AsyncClose safe to call more than once

	fetchSize int32 // bytes requested per fetch; doubled when a message does not fit
	offset    int64 // next offset to request from the broker

	retries int32 // attempt counter passed to Consumer.Retry.BackoffFunc; accessed atomically
}
320
// errTimedOut is stored in partitionConsumer.responseResult by responseFeeder
// when the user does not drain the Messages channel in time; handleResponses
// reacts to it by dropping the subscription.
var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
322
323func (child *partitionConsumer) sendError(err error) {
324 cErr := &ConsumerError{
325 Topic: child.topic,
326 Partition: child.partition,
327 Err: err,
328 }
329
330 if child.conf.Consumer.Return.Errors {
331 child.errors <- cErr
332 } else {
333 Logger.Println(cErr)
334 }
335}
336
337func (child *partitionConsumer) computeBackoff() time.Duration {
338 if child.conf.Consumer.Retry.BackoffFunc != nil {
339 retries := atomic.AddInt32(&child.retries, 1)
340 return child.conf.Consumer.Retry.BackoffFunc(int(retries))
341 } else {
342 return child.conf.Consumer.Retry.Backoff
343 }
344}
345
// dispatcher runs in its own goroutine (started by ConsumePartition) for the
// lifetime of the partition consumer. Every value received on trigger asks it
// to re-subscribe the child to whichever broker currently leads the partition.
// When trigger is closed the loop ends and the child is torn down.
func (child *partitionConsumer) dispatcher() {
	for range child.trigger {
		select {
		case <-child.dying:
			// Shutdown was requested while we were asked to redispatch:
			// closing trigger ends the surrounding range loop.
			close(child.trigger)
		case <-time.After(child.computeBackoff()):
			// Backoff elapsed: release the old broker consumer, if any,
			// before looking for the new leader.
			if child.broker != nil {
				child.consumer.unrefBrokerConsumer(child.broker)
				child.broker = nil
			}

			Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition)
			if err := child.dispatch(); err != nil {
				child.sendError(err)
				// Re-arm ourselves so the next loop iteration retries.
				child.trigger <- none{}
			}
		}
	}

	// trigger was closed: drop our broker reference, deregister from the
	// consumer, and close feeder so responseFeeder's range loop terminates.
	if child.broker != nil {
		child.consumer.unrefBrokerConsumer(child.broker)
	}
	child.consumer.removeChild(child)
	close(child.feeder)
}
371
372func (child *partitionConsumer) dispatch() error {
373 if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
374 return err
375 }
376
377 var leader *Broker
378 var err error
379 if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
380 return err
381 }
382
383 child.broker = child.consumer.refBrokerConsumer(leader)
384
385 child.broker.input <- child
386
387 return nil
388}
389
390func (child *partitionConsumer) chooseStartingOffset(offset int64) error {
391 newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest)
392 if err != nil {
393 return err
394 }
395 oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest)
396 if err != nil {
397 return err
398 }
399
400 switch {
401 case offset == OffsetNewest:
402 child.offset = newestOffset
403 case offset == OffsetOldest:
404 child.offset = oldestOffset
405 case offset >= oldestOffset && offset <= newestOffset:
406 child.offset = offset
407 default:
408 return ErrOffsetOutOfRange
409 }
410
411 return nil
412}
413
414func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
415 return child.messages
416}
417
418func (child *partitionConsumer) Errors() <-chan *ConsumerError {
419 return child.errors
420}
421
422func (child *partitionConsumer) AsyncClose() {
423 // this triggers whatever broker owns this child to abandon it and close its trigger channel, which causes
424 // the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
425 // 'errors' channel (alternatively, if the child is already at the dispatcher for some reason, that will
426 // also just close itself)
427 child.closeOnce.Do(func() {
428 close(child.dying)
429 })
430}
431
432func (child *partitionConsumer) Close() error {
433 child.AsyncClose()
434
435 go withRecover(func() {
436 for range child.messages {
437 // drain
438 }
439 })
440
441 var errors ConsumerErrors
442 for err := range child.errors {
443 errors = append(errors, err)
444 }
445
446 if len(errors) > 0 {
447 return errors
448 }
449 return nil
450}
451
452func (child *partitionConsumer) HighWaterMarkOffset() int64 {
453 return atomic.LoadInt64(&child.highWaterMarkOffset)
454}
455
// responseFeeder runs in its own goroutine (started by ConsumePartition). It
// receives fetch responses on feeder, parses them, and forwards the resulting
// messages to the user-facing messages channel. The outcome of each response
// is left in child.responseResult for the broker consumer to inspect after
// this goroutine acknowledges the response via child.broker.acks.Done().
//
// When feeder is closed it stops the ticker and closes the messages and
// errors channels.
func (child *partitionConsumer) responseFeeder() {
	var msgs []*ConsumerMessage
	expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
	firstAttempt := true

feederLoop:
	for response := range child.feeder {
		msgs, child.responseResult = child.parseResponse(response)

		// A successfully parsed response resets the retry counter used by computeBackoff.
		if child.responseResult == nil {
			atomic.StoreInt32(&child.retries, 0)
		}

		for i, msg := range msgs {
		messageSelect:
			select {
			case child.messages <- msg:
				firstAttempt = true
			case <-expiryTicker.C:
				if !firstAttempt {
					// Second tick without a successful send: report a timeout
					// and acknowledge the response so the broker consumer is
					// released, then deliver the rest with plain blocking
					// sends and re-subscribe to the broker consumer.
					child.responseResult = errTimedOut
					child.broker.acks.Done()
					for _, msg = range msgs[i:] {
						child.messages <- msg
					}
					child.broker.input <- child
					continue feederLoop
				} else {
					// current message has not been sent, return to select
					// statement
					firstAttempt = false
					goto messageSelect
				}
			}
		}

		// Every message was handed over; acknowledge the response.
		child.broker.acks.Done()
	}

	expiryTicker.Stop()
	close(child.messages)
	close(child.errors)
}
499
500func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) {
501 var messages []*ConsumerMessage
502 for _, msgBlock := range msgSet.Messages {
503 for _, msg := range msgBlock.Messages() {
504 offset := msg.Offset
505 timestamp := msg.Msg.Timestamp
506 if msg.Msg.Version >= 1 {
507 baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
508 offset += baseOffset
509 if msg.Msg.LogAppendTime {
510 timestamp = msgBlock.Msg.Timestamp
511 }
512 }
513 if offset < child.offset {
514 continue
515 }
516 messages = append(messages, &ConsumerMessage{
517 Topic: child.topic,
518 Partition: child.partition,
519 Key: msg.Msg.Key,
520 Value: msg.Msg.Value,
521 Offset: offset,
522 Timestamp: timestamp,
523 BlockTimestamp: msgBlock.Msg.Timestamp,
524 })
525 child.offset = offset + 1
526 }
527 }
528 if len(messages) == 0 {
529 child.offset++
530 }
531 return messages, nil
532}
533
534func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
535 var messages []*ConsumerMessage
536 for _, rec := range batch.Records {
537 offset := batch.FirstOffset + rec.OffsetDelta
538 if offset < child.offset {
539 continue
540 }
541 timestamp := batch.FirstTimestamp.Add(rec.TimestampDelta)
542 if batch.LogAppendTime {
543 timestamp = batch.MaxTimestamp
544 }
545 messages = append(messages, &ConsumerMessage{
546 Topic: child.topic,
547 Partition: child.partition,
548 Key: rec.Key,
549 Value: rec.Value,
550 Offset: offset,
551 Timestamp: timestamp,
552 Headers: rec.Headers,
553 })
554 child.offset = offset + 1
555 }
556 if len(messages) == 0 {
557 child.offset++
558 }
559 return messages, nil
560}
561
562func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
563 block := response.GetBlock(child.topic, child.partition)
564 if block == nil {
565 return nil, ErrIncompleteResponse
566 }
567
568 if block.Err != ErrNoError {
569 return nil, block.Err
570 }
571
572 nRecs, err := block.numRecords()
573 if err != nil {
574 return nil, err
575 }
576 if nRecs == 0 {
577 partialTrailingMessage, err := block.isPartial()
578 if err != nil {
579 return nil, err
580 }
581 // We got no messages. If we got a trailing one then we need to ask for more data.
582 // Otherwise we just poll again and wait for one to be produced...
583 if partialTrailingMessage {
584 if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
585 // we can't ask for more data, we've hit the configured limit
586 child.sendError(ErrMessageTooLarge)
587 child.offset++ // skip this one so we can keep processing future messages
588 } else {
589 child.fetchSize *= 2
590 if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
591 child.fetchSize = child.conf.Consumer.Fetch.Max
592 }
593 }
594 }
595
596 return nil, nil
597 }
598
599 // we got messages, reset our fetch size in case it was increased for a previous request
600 child.fetchSize = child.conf.Consumer.Fetch.Default
601 atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)
602
603 messages := []*ConsumerMessage{}
604 for _, records := range block.RecordsSet {
605 switch records.recordsType {
606 case legacyRecords:
607 messageSetMessages, err := child.parseMessages(records.MsgSet)
608 if err != nil {
609 return nil, err
610 }
611
612 messages = append(messages, messageSetMessages...)
613 case defaultRecords:
614 recordBatchMessages, err := child.parseRecords(records.RecordBatch)
615 if err != nil {
616 return nil, err
617 }
618 if control, err := records.isControl(); err != nil || control {
619 continue
620 }
621
622 messages = append(messages, recordBatchMessages...)
623 default:
624 return nil, fmt.Errorf("unknown records type: %v", records.recordsType)
625 }
626 }
627
628 return messages, nil
629}
630
631// brokerConsumer
632
// brokerConsumer fetches on behalf of every partitionConsumer subscribed to
// one broker. It is serviced by two goroutines started in newBrokerConsumer:
// subscriptionManager and subscriptionConsumer.
type brokerConsumer struct {
	consumer *consumer
	broker   *Broker
	// input is where partition consumers subscribe by sending themselves; it
	// is closed by unrefBrokerConsumer when the last reference is released.
	input chan *partitionConsumer
	// newSubscriptions hands batches of new subscribers (or nil when there are
	// none) from subscriptionManager to subscriptionConsumer.
	newSubscriptions chan []*partitionConsumer
	// wait is written by subscriptionManager only while it holds buffered
	// subscriptions, letting subscriptionConsumer block until there is work.
	wait chan none
	// subscriptions is the set of partition consumers currently being fetched for.
	subscriptions map[*partitionConsumer]none
	// acks counts subscribers that have not yet finished with the current response.
	acks sync.WaitGroup
	// refs is the reference count maintained by refBrokerConsumer and
	// unrefBrokerConsumer under consumer.lock.
	refs int
}
643
644func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
645 bc := &brokerConsumer{
646 consumer: c,
647 broker: broker,
648 input: make(chan *partitionConsumer),
649 newSubscriptions: make(chan []*partitionConsumer),
650 wait: make(chan none),
651 subscriptions: make(map[*partitionConsumer]none),
652 refs: 0,
653 }
654
655 go withRecover(bc.subscriptionManager)
656 go withRecover(bc.subscriptionConsumer)
657
658 return bc
659}
660
// subscriptionManager runs in its own goroutine (started by
// newBrokerConsumer) and batches incoming subscriptions for
// subscriptionConsumer. It exits once input has been closed.
func (bc *brokerConsumer) subscriptionManager() {
	var buffer []*partitionConsumer

	// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
	// goroutine is in the middle of a network request) and batches them up. The main worker goroutine picks
	// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
	// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions are available,
	// so the main goroutine can block waiting for work if it has none.
	for {
		if len(buffer) > 0 {
			select {
			case event, ok := <-bc.input:
				if !ok {
					goto done
				}
				buffer = append(buffer, event)
			case bc.newSubscriptions <- buffer:
				// The batch was handed over; start collecting a new one.
				buffer = nil
			case bc.wait <- none{}:
			}
		} else {
			select {
			case event, ok := <-bc.input:
				if !ok {
					goto done
				}
				buffer = append(buffer, event)
			case bc.newSubscriptions <- nil:
			}
		}
	}

done:
	// input was closed: close wait, flush whatever is still buffered, then
	// close newSubscriptions so the receiving range loop terminates.
	close(bc.wait)
	if len(buffer) > 0 {
		bc.newSubscriptions <- buffer
	}
	close(bc.newSubscriptions)
}
700
// subscriptionConsumer is the main fetch loop of a brokerConsumer and runs in
// its own goroutine (started by newBrokerConsumer). Each iteration takes the
// latest batch of subscriptions, issues one fetch covering all subscribers,
// fans the response out to them, waits for every subscriber to acknowledge it
// and then processes their results. The loop ends when newSubscriptions is
// closed or when a fetch fails (see abort).
func (bc *brokerConsumer) subscriptionConsumer() {
	<-bc.wait // wait for our first piece of work

	// the subscriptionManager ensures we will get nil right away if no new subscriptions are available
	for newSubscriptions := range bc.newSubscriptions {
		bc.updateSubscriptions(newSubscriptions)

		if len(bc.subscriptions) == 0 {
			// We're about to be shut down or we're about to receive more subscriptions.
			// Either way, the signal just hasn't propagated to our goroutine yet.
			<-bc.wait
			continue
		}

		response, err := bc.fetchNewMessages()

		if err != nil {
			Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
			bc.abort(err)
			return
		}

		// One ack is expected per subscriber; each responseFeeder calls
		// acks.Done() when it is finished with the response.
		bc.acks.Add(len(bc.subscriptions))
		for child := range bc.subscriptions {
			child.feeder <- response
		}
		bc.acks.Wait()
		bc.handleResponses()
	}
}
731
732func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) {
733 for _, child := range newSubscriptions {
734 bc.subscriptions[child] = none{}
735 Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
736 }
737
738 for child := range bc.subscriptions {
739 select {
740 case <-child.dying:
741 Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
742 close(child.trigger)
743 delete(bc.subscriptions, child)
744 default:
745 break
746 }
747 }
748}
749
750func (bc *brokerConsumer) handleResponses() {
751 // handles the response codes left for us by our subscriptions, and abandons ones that have been closed
752 for child := range bc.subscriptions {
753 result := child.responseResult
754 child.responseResult = nil
755
756 switch result {
757 case nil:
758 break
759 case errTimedOut:
760 Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
761 bc.broker.ID(), child.topic, child.partition)
762 delete(bc.subscriptions, child)
763 case ErrOffsetOutOfRange:
764 // there's no point in retrying this it will just fail the same way again
765 // shut it down and force the user to choose what to do
766 child.sendError(result)
767 Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
768 close(child.trigger)
769 delete(bc.subscriptions, child)
770 case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrReplicaNotAvailable:
771 // not an error, but does need redispatching
772 Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
773 bc.broker.ID(), child.topic, child.partition, result)
774 child.trigger <- none{}
775 delete(bc.subscriptions, child)
776 default:
777 // dunno, tell the user and try redispatching
778 child.sendError(result)
779 Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
780 bc.broker.ID(), child.topic, child.partition, result)
781 child.trigger <- none{}
782 delete(bc.subscriptions, child)
783 }
784 }
785}
786
787func (bc *brokerConsumer) abort(err error) {
788 bc.consumer.abandonBrokerConsumer(bc)
789 _ = bc.broker.Close() // we don't care about the error this might return, we already have one
790
791 for child := range bc.subscriptions {
792 child.sendError(err)
793 child.trigger <- none{}
794 }
795
796 for newSubscriptions := range bc.newSubscriptions {
797 if len(newSubscriptions) == 0 {
798 <-bc.wait
799 continue
800 }
801 for _, child := range newSubscriptions {
802 child.sendError(err)
803 child.trigger <- none{}
804 }
805 }
806}
807
808func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
809 request := &FetchRequest{
810 MinBytes: bc.consumer.conf.Consumer.Fetch.Min,
811 MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
812 }
813 if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) {
814 request.Version = 1
815 }
816 if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
817 request.Version = 2
818 }
819 if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
820 request.Version = 3
821 request.MaxBytes = MaxResponseSize
822 }
823 if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
824 request.Version = 4
825 request.Isolation = ReadUncommitted // We don't support yet transactions.
826 }
827
828 for child := range bc.subscriptions {
829 request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
830 }
831
832 return bc.broker.Fetch(request)
833}