package sarama

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// ConsumerMessage encapsulates a Kafka message returned by the consumer.
type ConsumerMessage struct {
	Key, Value     []byte
	Topic          string
	Partition      int32
	Offset         int64
	Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
	BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp
	Headers        []*RecordHeader // only set if kafka is version 0.11+
}

// ConsumerError is what is provided to the user when an error occurs.
// It wraps an error and includes the topic and partition.
type ConsumerError struct {
	Topic     string
	Partition int32
	Err       error
}

func (ce ConsumerError) Error() string {
	return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err)
}

// ConsumerErrors is a type that wraps a batch of errors and implements the Error interface.
// It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors
// when stopping.
type ConsumerErrors []*ConsumerError

func (ce ConsumerErrors) Error() string {
	return fmt.Sprintf("kafka: %d errors while consuming", len(ce))
}

// Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close()
// on a consumer to avoid leaks; it will not be garbage-collected automatically when it passes out of
// scope.
//
// Sarama's Consumer type does not currently support automatic consumer-group rebalancing and offset tracking.
// For Zookeeper-based tracking (Kafka 0.8.2 and earlier), the https://github.com/wvanbergen/kafka library
// builds on Sarama to add this support. For Kafka-based tracking (Kafka 0.9 and later), the
// https://github.com/bsm/sarama-cluster library builds on Sarama to add this support.
type Consumer interface {

	// Topics returns the set of available topics as retrieved from the cluster
	// metadata. This method is the same as Client.Topics(), and is provided for
	// convenience.
	Topics() ([]string, error)

	// Partitions returns the sorted list of all partition IDs for the given topic.
	// This method is the same as Client.Partitions(), and is provided for convenience.
	Partitions(topic string) ([]int32, error)

	// ConsumePartition creates a PartitionConsumer on the given topic/partition with
	// the given offset. It will return an error if this Consumer is already consuming
	// on the given topic/partition. The offset can be a literal offset, or OffsetNewest
	// or OffsetOldest.
	ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)

	// HighWaterMarks returns the current high water marks for each topic and partition.
	// Consistency between partitions is not guaranteed since high water marks are updated separately.
	HighWaterMarks() map[string]map[int32]int64

	// Close shuts down the consumer. It must be called after all child
	// PartitionConsumers have already been closed.
	Close() error
}
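
// consumePartitionExample is a hedged usage sketch, not part of the original
// file: it walks through the lifecycle the Consumer documentation above
// describes (create a Consumer, consume one partition, close children before
// the parent). The broker address ("localhost:9092"), topic ("my-topic"), and
// partition (0) are placeholder assumptions.
func consumePartitionExample() error {
	consumer, err := NewConsumer([]string{"localhost:9092"}, NewConfig())
	if err != nil {
		return err
	}
	// Consumers MUST be closed; deferred calls run LIFO, so the partition
	// consumer deferred below is closed before its parent.
	defer func() { _ = consumer.Close() }()

	pc, err := consumer.ConsumePartition("my-topic", 0, OffsetOldest)
	if err != nil {
		return err
	}
	defer func() { _ = pc.Close() }()

	// Range over Messages until AsyncClose/Close terminates the loop by
	// closing the channel.
	for msg := range pc.Messages() {
		Logger.Printf("consumed message at offset %d\n", msg.Offset)
	}
	return nil
}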

type consumer struct {
	client    Client
	conf      *Config
	ownClient bool

	lock            sync.Mutex
	children        map[string]map[int32]*partitionConsumer
	brokerConsumers map[*Broker]*brokerConsumer
}

// NewConsumer creates a new consumer using the given broker addresses and configuration.
func NewConsumer(addrs []string, config *Config) (Consumer, error) {
	client, err := NewClient(addrs, config)
	if err != nil {
		return nil, err
	}

	c, err := NewConsumerFromClient(client)
	if err != nil {
		return nil, err
	}
	c.(*consumer).ownClient = true
	return c, nil
}

// NewConsumerFromClient creates a new consumer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
func NewConsumerFromClient(client Client) (Consumer, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	c := &consumer{
		client:          client,
		conf:            client.Config(),
		children:        make(map[string]map[int32]*partitionConsumer),
		brokerConsumers: make(map[*Broker]*brokerConsumer),
	}

	return c, nil
}
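
// sharedClientExample is a hedged sketch, not part of the original file: it
// shows NewConsumerFromClient with a caller-owned Client. Closing the consumer
// does not close the shared client, so the caller must close the client itself
// afterwards, as the documentation above stresses. The broker address is a
// placeholder assumption.
func sharedClientExample() error {
	client, err := NewClient([]string{"localhost:9092"}, NewConfig())
	if err != nil {
		return err
	}
	defer func() { _ = client.Close() }() // the client remains the caller's responsibility

	consumer, err := NewConsumerFromClient(client)
	if err != nil {
		return err
	}
	// For a consumer built from a shared client, Close does not touch the client.
	return consumer.Close()
}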

func (c *consumer) Close() error {
	if c.ownClient {
		return c.client.Close()
	}
	return nil
}

func (c *consumer) Topics() ([]string, error) {
	return c.client.Topics()
}

func (c *consumer) Partitions(topic string) ([]int32, error) {
	return c.client.Partitions(topic)
}

func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
	child := &partitionConsumer{
		consumer:  c,
		conf:      c.conf,
		topic:     topic,
		partition: partition,
		messages:  make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
		errors:    make(chan *ConsumerError, c.conf.ChannelBufferSize),
		feeder:    make(chan *FetchResponse, 1),
		trigger:   make(chan none, 1),
		dying:     make(chan none),
		fetchSize: c.conf.Consumer.Fetch.Default,
	}

	if err := child.chooseStartingOffset(offset); err != nil {
		return nil, err
	}

	var leader *Broker
	var err error
	if leader, err = c.client.Leader(child.topic, child.partition); err != nil {
		return nil, err
	}

	if err := c.addChild(child); err != nil {
		return nil, err
	}

	go withRecover(child.dispatcher)
	go withRecover(child.responseFeeder)

	child.broker = c.refBrokerConsumer(leader)
	child.broker.input <- child

	return child, nil
}

func (c *consumer) HighWaterMarks() map[string]map[int32]int64 {
	c.lock.Lock()
	defer c.lock.Unlock()

	hwms := make(map[string]map[int32]int64)
	for topic, p := range c.children {
		hwm := make(map[int32]int64, len(p))
		for partition, pc := range p {
			hwm[partition] = pc.HighWaterMarkOffset()
		}
		hwms[topic] = hwm
	}

	return hwms
}

func (c *consumer) addChild(child *partitionConsumer) error {
	c.lock.Lock()
	defer c.lock.Unlock()

	topicChildren := c.children[child.topic]
	if topicChildren == nil {
		topicChildren = make(map[int32]*partitionConsumer)
		c.children[child.topic] = topicChildren
	}

	if topicChildren[child.partition] != nil {
		return ConfigurationError("That topic/partition is already being consumed")
	}

	topicChildren[child.partition] = child
	return nil
}

func (c *consumer) removeChild(child *partitionConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	delete(c.children[child.topic], child.partition)
}

func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
	c.lock.Lock()
	defer c.lock.Unlock()

	bc := c.brokerConsumers[broker]
	if bc == nil {
		bc = c.newBrokerConsumer(broker)
		c.brokerConsumers[broker] = bc
	}

	bc.refs++

	return bc
}

func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	brokerWorker.refs--

	if brokerWorker.refs == 0 {
		close(brokerWorker.input)
		if c.brokerConsumers[brokerWorker.broker] == brokerWorker {
			delete(c.brokerConsumers, brokerWorker.broker)
		}
	}
}

func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
	c.lock.Lock()
	defer c.lock.Unlock()

	delete(c.brokerConsumers, brokerWorker.broker)
}

// PartitionConsumer

// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or
// AsyncClose() on a PartitionConsumer to avoid leaks; it will not be garbage-collected automatically when it passes out
// of scope.
//
// The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range
// loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported
// as out of range by the brokers. In this case you should decide what you want to do (try a different offset,
// notify a human, etc) and handle it appropriately. For all other error cases, it will just keep retrying.
// By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set
// your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement
// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
//
// To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of
// consumer tear-down & return immediately. Continue to loop, servicing the Messages channel until the teardown process
// AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call
// Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will
// also drain the Messages channel, harvest all errors & return them once cleanup has completed.
type PartitionConsumer interface {

	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you
	// should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this
	// function (or Close) before a consumer object passes out of scope, as it will otherwise leak memory. You must call
	// this before calling Close on the underlying client.
	AsyncClose()

	// Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown just like AsyncClose, drain
	// the Messages channel, harvest any errors & return them to the caller. Note that if you are continuing to service
	// the Messages channel when this function is called, you will be competing with Close for messages; consider
	// calling AsyncClose instead. It is required to call this function (or AsyncClose) before a consumer object passes
	// out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.
	Close() error

	// Messages returns the read channel for the messages that are returned by
	// the broker.
	Messages() <-chan *ConsumerMessage

	// Errors returns a read channel of errors that occurred during consuming, if
	// enabled. By default, errors are logged and not returned over this channel.
	// If you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// HighWaterMarkOffset returns the high water mark offset of the partition,
	// i.e. the offset that will be used for the next message that will be produced.
	// You can use this to determine how far behind the processing is.
	HighWaterMarkOffset() int64
}
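
// serviceChannelsExample is a hedged sketch, not part of the original file: it
// shows the AsyncClose pattern described above, assuming the caller set
// Consumer.Return.Errors to true so errors arrive on the Errors channel. The
// shutdown channel is a placeholder for whatever signal the application uses.
func serviceChannelsExample(pc PartitionConsumer, shutdown <-chan struct{}) {
	messages, errs := pc.Messages(), pc.Errors()
	for messages != nil || errs != nil {
		select {
		case msg, ok := <-messages:
			if !ok {
				messages = nil // teardown finished; a nil channel blocks, disabling this case
				continue
			}
			Logger.Printf("consumed message at offset %d\n", msg.Offset)
		case consumerErr, ok := <-errs:
			if !ok {
				errs = nil // errors are drained; disable this case too
				continue
			}
			Logger.Println(consumerErr)
		case <-shutdown:
			pc.AsyncClose() // returns immediately; keep servicing the channels
			shutdown = nil
		}
	}
}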

type partitionConsumer struct {
	highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG
	consumer            *consumer
	conf                *Config
	topic               string
	partition           int32

	broker   *brokerConsumer
	messages chan *ConsumerMessage
	errors   chan *ConsumerError
	feeder   chan *FetchResponse

	trigger, dying chan none
	responseResult error
	closeOnce      sync.Once

	fetchSize int32
	offset    int64
}

var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing

func (child *partitionConsumer) sendError(err error) {
	cErr := &ConsumerError{
		Topic:     child.topic,
		Partition: child.partition,
		Err:       err,
	}

	if child.conf.Consumer.Return.Errors {
		child.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

func (child *partitionConsumer) dispatcher() {
	for range child.trigger {
		select {
		case <-child.dying:
			close(child.trigger)
		case <-time.After(child.conf.Consumer.Retry.Backoff):
			if child.broker != nil {
				child.consumer.unrefBrokerConsumer(child.broker)
				child.broker = nil
			}

			Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition)
			if err := child.dispatch(); err != nil {
				child.sendError(err)
				child.trigger <- none{}
			}
		}
	}

	if child.broker != nil {
		child.consumer.unrefBrokerConsumer(child.broker)
	}
	child.consumer.removeChild(child)
	close(child.feeder)
}

func (child *partitionConsumer) dispatch() error {
	if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
		return err
	}

	var leader *Broker
	var err error
	if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
		return err
	}

	child.broker = child.consumer.refBrokerConsumer(leader)

	child.broker.input <- child

	return nil
}

func (child *partitionConsumer) chooseStartingOffset(offset int64) error {
	newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest)
	if err != nil {
		return err
	}
	oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest)
	if err != nil {
		return err
	}

	switch {
	case offset == OffsetNewest:
		child.offset = newestOffset
	case offset == OffsetOldest:
		child.offset = oldestOffset
	case offset >= oldestOffset && offset <= newestOffset:
		child.offset = offset
	default:
		return ErrOffsetOutOfRange
	}

	return nil
}

func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
	return child.messages
}

func (child *partitionConsumer) Errors() <-chan *ConsumerError {
	return child.errors
}

func (child *partitionConsumer) AsyncClose() {
	// this triggers whatever broker owns this child to abandon it and close its trigger channel, which causes
	// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
	// 'errors' channels (alternatively, if the child is already at the dispatcher for some reason, that will
	// also just close itself)
	child.closeOnce.Do(func() {
		close(child.dying)
	})
}

func (child *partitionConsumer) Close() error {
	child.AsyncClose()

	go withRecover(func() {
		for range child.messages {
			// drain
		}
	})

	var errors ConsumerErrors
	for err := range child.errors {
		errors = append(errors, err)
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

func (child *partitionConsumer) HighWaterMarkOffset() int64 {
	return atomic.LoadInt64(&child.highWaterMarkOffset)
}
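
// consumerLagExample is a hedged sketch, not part of the original file: it uses
// HighWaterMarkOffset to estimate how far behind a processing loop is, as the
// interface documentation above suggests. nextOffset is assumed to be tracked
// by the caller as the last consumed message's Offset + 1.
func consumerLagExample(pc PartitionConsumer, nextOffset int64) int64 {
	// The high water mark is the offset of the next message that will be
	// produced, so the difference is the number of messages not yet consumed.
	return pc.HighWaterMarkOffset() - nextOffset
}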

func (child *partitionConsumer) responseFeeder() {
	var msgs []*ConsumerMessage
	expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
	firstAttempt := true

feederLoop:
	for response := range child.feeder {
		msgs, child.responseResult = child.parseResponse(response)

		for i, msg := range msgs {
		messageSelect:
			select {
			case child.messages <- msg:
				firstAttempt = true
			case <-expiryTicker.C:
				if !firstAttempt {
					child.responseResult = errTimedOut
					child.broker.acks.Done()
					for _, msg = range msgs[i:] {
						child.messages <- msg
					}
					child.broker.input <- child
					continue feederLoop
				} else {
					// current message has not been sent, return to select
					// statement
					firstAttempt = false
					goto messageSelect
				}
			}
		}

		child.broker.acks.Done()
	}

	expiryTicker.Stop()
	close(child.messages)
	close(child.errors)
}

func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) {
	var messages []*ConsumerMessage
	for _, msgBlock := range msgSet.Messages {
		for _, msg := range msgBlock.Messages() {
			offset := msg.Offset
			if msg.Msg.Version >= 1 {
				baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
				offset += baseOffset
			}
			if offset < child.offset {
				continue
			}
			messages = append(messages, &ConsumerMessage{
				Topic:          child.topic,
				Partition:      child.partition,
				Key:            msg.Msg.Key,
				Value:          msg.Msg.Value,
				Offset:         offset,
				Timestamp:      msg.Msg.Timestamp,
				BlockTimestamp: msgBlock.Msg.Timestamp,
			})
			child.offset = offset + 1
		}
	}
	if len(messages) == 0 {
		return nil, ErrIncompleteResponse
	}
	return messages, nil
}

func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
	var messages []*ConsumerMessage
	for _, rec := range batch.Records {
		offset := batch.FirstOffset + rec.OffsetDelta
		if offset < child.offset {
			continue
		}
		messages = append(messages, &ConsumerMessage{
			Topic:     child.topic,
			Partition: child.partition,
			Key:       rec.Key,
			Value:     rec.Value,
			Offset:    offset,
			Timestamp: batch.FirstTimestamp.Add(rec.TimestampDelta),
			Headers:   rec.Headers,
		})
		child.offset = offset + 1
	}
	if len(messages) == 0 {
		child.offset += 1
	}
	return messages, nil
}

func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
	block := response.GetBlock(child.topic, child.partition)
	if block == nil {
		return nil, ErrIncompleteResponse
	}

	if block.Err != ErrNoError {
		return nil, block.Err
	}

	nRecs, err := block.numRecords()
	if err != nil {
		return nil, err
	}
	if nRecs == 0 {
		partialTrailingMessage, err := block.isPartial()
		if err != nil {
			return nil, err
		}
		// We got no messages. If we got a partial trailing message then we need to ask for more data.
		// Otherwise we just poll again and wait for one to be produced...
		if partialTrailingMessage {
			if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
				// we can't ask for more data, we've hit the configured limit
				child.sendError(ErrMessageTooLarge)
				child.offset++ // skip this one so we can keep processing future messages
			} else {
				child.fetchSize *= 2
				if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
					child.fetchSize = child.conf.Consumer.Fetch.Max
				}
			}
		}

		return nil, nil
	}

	// we got messages, reset our fetch size in case it was increased for a previous request
	child.fetchSize = child.conf.Consumer.Fetch.Default
	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)

	messages := []*ConsumerMessage{}
	for _, records := range block.RecordsSet {
		switch records.recordsType {
		case legacyRecords:
			messageSetMessages, err := child.parseMessages(records.MsgSet)
			if err != nil {
				return nil, err
			}

			messages = append(messages, messageSetMessages...)
		case defaultRecords:
			recordBatchMessages, err := child.parseRecords(records.RecordBatch)
			if err != nil {
				return nil, err
			}
			if control, err := records.isControl(); err != nil || control {
				continue
			}

			messages = append(messages, recordBatchMessages...)
		default:
			return nil, fmt.Errorf("unknown records type: %v", records.recordsType)
		}
	}

	return messages, nil
}

// brokerConsumer

type brokerConsumer struct {
	consumer         *consumer
	broker           *Broker
	input            chan *partitionConsumer
	newSubscriptions chan []*partitionConsumer
	wait             chan none
	subscriptions    map[*partitionConsumer]none
	acks             sync.WaitGroup
	refs             int
}

func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
	bc := &brokerConsumer{
		consumer:         c,
		broker:           broker,
		input:            make(chan *partitionConsumer),
		newSubscriptions: make(chan []*partitionConsumer),
		wait:             make(chan none),
		subscriptions:    make(map[*partitionConsumer]none),
		refs:             0,
	}

	go withRecover(bc.subscriptionManager)
	go withRecover(bc.subscriptionConsumer)

	return bc
}

func (bc *brokerConsumer) subscriptionManager() {
	var buffer []*partitionConsumer

	// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
	// goroutine is in the middle of a network request) and batches them up. The main worker goroutine picks
	// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
	// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions are available,
	// so the main goroutine can block waiting for work if it has none.
	for {
		if len(buffer) > 0 {
			select {
			case event, ok := <-bc.input:
				if !ok {
					goto done
				}
				buffer = append(buffer, event)
			case bc.newSubscriptions <- buffer:
				buffer = nil
			case bc.wait <- none{}:
			}
		} else {
			select {
			case event, ok := <-bc.input:
				if !ok {
					goto done
				}
				buffer = append(buffer, event)
			case bc.newSubscriptions <- nil:
			}
		}
	}

done:
	close(bc.wait)
	if len(buffer) > 0 {
		bc.newSubscriptions <- buffer
	}
	close(bc.newSubscriptions)
}
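
// batchingBridgeExample is a hedged, generic sketch, not part of the original
// file: it mirrors the subscriptionManager channel protocol above using plain
// ints, to make the pattern easier to see in isolation. One goroutine
// accumulates items from `in` and hands the whole batch (or nil when there is
// nothing new) to a worker over `out`, so producers are never blocked while
// the worker is busy. The channel names are placeholder assumptions.
func batchingBridgeExample(in <-chan int, out chan<- []int) {
	var buffer []int
loop:
	for {
		if len(buffer) > 0 {
			select {
			case item, ok := <-in:
				if !ok {
					break loop
				}
				buffer = append(buffer, item)
			case out <- buffer: // worker picked up the batch
				buffer = nil
			}
		} else {
			select {
			case item, ok := <-in:
				if !ok {
					break loop
				}
				buffer = append(buffer, item)
			case out <- nil: // nothing new; let the worker proceed without blocking
			}
		}
	}
	if len(buffer) > 0 {
		out <- buffer // flush anything left, mirroring the done: path above
	}
	close(out)
}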

func (bc *brokerConsumer) subscriptionConsumer() {
	<-bc.wait // wait for our first piece of work

	// the subscriptionManager ensures we will get nil right away if no new subscriptions are available
	for newSubscriptions := range bc.newSubscriptions {
		bc.updateSubscriptions(newSubscriptions)

		if len(bc.subscriptions) == 0 {
			// We're about to be shut down or we're about to receive more subscriptions.
			// Either way, the signal just hasn't propagated to our goroutine yet.
			<-bc.wait
			continue
		}

		response, err := bc.fetchNewMessages()

		if err != nil {
			Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
			bc.abort(err)
			return
		}

		bc.acks.Add(len(bc.subscriptions))
		for child := range bc.subscriptions {
			child.feeder <- response
		}
		bc.acks.Wait()
		bc.handleResponses()
	}
}

func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) {
	for _, child := range newSubscriptions {
		bc.subscriptions[child] = none{}
		Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
	}

	for child := range bc.subscriptions {
		select {
		case <-child.dying:
			Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
			close(child.trigger)
			delete(bc.subscriptions, child)
		default:
			break
		}
	}
}

func (bc *brokerConsumer) handleResponses() {
	// handles the response codes left for us by our subscriptions, and abandons ones that have been closed
	for child := range bc.subscriptions {
		result := child.responseResult
		child.responseResult = nil

		switch result {
		case nil:
			break
		case errTimedOut:
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
				bc.broker.ID(), child.topic, child.partition)
			delete(bc.subscriptions, child)
		case ErrOffsetOutOfRange:
			// there's no point in retrying this; it will just fail the same way again,
			// so shut it down and force the user to choose what to do
			child.sendError(result)
			Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
			close(child.trigger)
			delete(bc.subscriptions, child)
		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrReplicaNotAvailable:
			// not an error, but does need redispatching
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
				bc.broker.ID(), child.topic, child.partition, result)
			child.trigger <- none{}
			delete(bc.subscriptions, child)
		default:
			// dunno, tell the user and try redispatching
			child.sendError(result)
			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
				bc.broker.ID(), child.topic, child.partition, result)
			child.trigger <- none{}
			delete(bc.subscriptions, child)
		}
	}
}

func (bc *brokerConsumer) abort(err error) {
	bc.consumer.abandonBrokerConsumer(bc)
	_ = bc.broker.Close() // we don't care about the error this might return, we already have one

	for child := range bc.subscriptions {
		child.sendError(err)
		child.trigger <- none{}
	}

	for newSubscriptions := range bc.newSubscriptions {
		if len(newSubscriptions) == 0 {
			<-bc.wait
			continue
		}
		for _, child := range newSubscriptions {
			child.sendError(err)
			child.trigger <- none{}
		}
	}
}

func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
	request := &FetchRequest{
		MinBytes:    bc.consumer.conf.Consumer.Fetch.Min,
		MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
		request.Version = 2
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
		request.Version = 3
		request.MaxBytes = MaxResponseSize
	}
	if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
		request.Version = 4
		request.Isolation = ReadUncommitted // we don't yet support transactions
	}

	for child := range bc.subscriptions {
		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
	}

	return bc.broker.Fetch(request)
}