package sarama

import (
	"encoding/binary"
	"fmt"
	"sync"
	"time"

	"github.com/eapache/go-resiliency/breaker"
	"github.com/eapache/queue"
)

// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
// leaks: it will not be garbage-collected automatically when it passes out of
// scope.
type AsyncProducer interface {

	// AsyncClose triggers a shutdown of the producer. The shutdown has completed
	// when both the Errors and Successes channels have been closed. When calling
	// AsyncClose, you *must* continue to read from those channels in order to
	// drain the results of any messages in flight.
	AsyncClose()

	// Close shuts down the producer and waits for any buffered messages to be
	// flushed. You must call this function before a producer object passes out of
	// scope, as it may otherwise leak memory. You must call this before calling
	// Close on the underlying client.
	Close() error

	// Input is the channel on which the user writes messages that they wish
	// to send.
	Input() chan<- *ProducerMessage

	// Successes is the success output channel back to the user when Return.Successes is
	// enabled. If Return.Successes is true, you MUST read from this channel or the
	// Producer will deadlock. It is suggested that you send and read messages
	// together in a single select statement.
	Successes() <-chan *ProducerMessage

	// Errors is the error output channel back to the user. You MUST read from this
	// channel or the Producer will deadlock when the channel is full. Alternatively,
	// you can set Producer.Return.Errors in your config to false, which prevents
	// errors from being returned.
	Errors() <-chan *ProducerError
}
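
// An illustrative sketch of the send/receive pattern recommended above: reading
// Successes and Errors in the same select as the send, so neither channel can back
// up. The broker address, topic, and value are placeholders; shutdown/draining is
// omitted for brevity.
//
//	config := NewConfig()
//	config.Producer.Return.Successes = true
//	producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
//	if err != nil {
//		panic(err)
//	}
//
//	msg := &ProducerMessage{Topic: "my-topic", Value: StringEncoder("hello")}
//	for sent := false; !sent; {
//		select {
//		case producer.Input() <- msg:
//			sent = true
//		case err := <-producer.Errors():
//			Logger.Println("failed to deliver:", err)
//		case success := <-producer.Successes():
//			Logger.Println("delivered to partition", success.Partition)
//		}
//	}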

// transactionManager keeps the state necessary to ensure idempotent production
type transactionManager struct {
	producerID      int64
	producerEpoch   int16
	sequenceNumbers map[string]int32
	mutex           sync.Mutex
}

const (
	noProducerID    = -1
	noProducerEpoch = -1
)

func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) int32 {
	key := fmt.Sprintf("%s-%d", topic, partition)
	t.mutex.Lock()
	defer t.mutex.Unlock()
	sequence := t.sequenceNumbers[key]
	t.sequenceNumbers[key] = sequence + 1
	return sequence
}

func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
	txnmgr := &transactionManager{
		producerID:    noProducerID,
		producerEpoch: noProducerEpoch,
	}

	if conf.Producer.Idempotent {
		initProducerIDResponse, err := client.InitProducerID()
		if err != nil {
			return nil, err
		}
		txnmgr.producerID = initProducerIDResponse.ProducerID
		txnmgr.producerEpoch = initProducerIDResponse.ProducerEpoch
		txnmgr.sequenceNumbers = make(map[string]int32)
		txnmgr.mutex = sync.Mutex{}

		Logger.Printf("Obtained a ProducerId: %d and ProducerEpoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch)
	}

	return txnmgr, nil
}
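
// An illustrative configuration sketch for the idempotent path handled by
// newTransactionManager. The exact requirements are enforced by Config.Validate;
// the settings below reflect the usual prerequisites for this code path (protocol
// version, acks, retries, and a single in-flight request) and are an assumption,
// not a definitive list.
//
//	config := NewConfig()
//	config.Version = V0_11_0_0                // idempotence needs the >= 0.11 record format
//	config.Producer.Idempotent = true         // triggers the InitProducerID call above
//	config.Producer.RequiredAcks = WaitForAll // wait for all in-sync replicas
//	config.Producer.Retry.Max = 5             // retries must be enabled
//	config.Net.MaxOpenRequests = 1            // keep ordering with a single in-flight request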

type asyncProducer struct {
	client    Client
	conf      *Config
	ownClient bool

	errors                    chan *ProducerError
	input, successes, retries chan *ProducerMessage
	inFlight                  sync.WaitGroup

	brokers    map[*Broker]*brokerProducer
	brokerRefs map[*brokerProducer]int
	brokerLock sync.Mutex

	txnmgr *transactionManager
}

// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
	client, err := NewClient(addrs, conf)
	if err != nil {
		return nil, err
	}

	p, err := NewAsyncProducerFromClient(client)
	if err != nil {
		return nil, err
	}
	p.(*asyncProducer).ownClient = true
	return p, nil
}

// NewAsyncProducerFromClient creates a new Producer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this producer.
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	txnmgr, err := newTransactionManager(client.Config(), client)
	if err != nil {
		return nil, err
	}

	p := &asyncProducer{
		client:     client,
		conf:       client.Config(),
		errors:     make(chan *ProducerError),
		input:      make(chan *ProducerMessage),
		successes:  make(chan *ProducerMessage),
		retries:    make(chan *ProducerMessage),
		brokers:    make(map[*Broker]*brokerProducer),
		brokerRefs: make(map[*brokerProducer]int),
		txnmgr:     txnmgr,
	}

	// launch our singleton dispatchers
	go withRecover(p.dispatcher)
	go withRecover(p.retryHandler)

	return p, nil
}
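
// An illustrative sketch of sharing one Client between a producer and other
// components. As the comment above notes, the producer must be closed before the
// client it was built from; the broker address is a placeholder.
//
//	client, err := NewClient([]string{"localhost:9092"}, NewConfig())
//	if err != nil {
//		panic(err)
//	}
//	defer client.Close() // runs last: the client must outlive the producer
//
//	producer, err := NewAsyncProducerFromClient(client)
//	if err != nil {
//		panic(err)
//	}
//	defer producer.Close() // runs first, before client.Close()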

type flagSet int8

const (
	syn      flagSet = 1 << iota // first message from partitionProducer to brokerProducer
	fin                          // final message from partitionProducer to brokerProducer and back
	shutdown                     // start the shutdown process
)
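
// flagSet is a plain bit mask, so the control messages exchanged between the
// goroutines below are built and tested the same way throughout this file. A
// minimal sketch:
//
//	chaser := &ProducerMessage{Topic: "t", Partition: 0, flags: fin} // carries no user payload
//	if chaser.flags&fin == fin {
//		// final message for its retry level
//	}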

// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type ProducerMessage struct {
	Topic string // The Kafka topic for this message.
	// The partitioning key for this message. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Key Encoder
	// The actual message to store in Kafka. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Value Encoder

	// The headers are key-value pairs that are transparently passed
	// by Kafka between producers and consumers.
	Headers []RecordHeader

	// This field is used to hold arbitrary data you wish to include so it
	// will be available when receiving on the Successes and Errors channels.
	// Sarama completely ignores this field; it is only to be used for
	// pass-through data.
	Metadata interface{}

	// The fields below this point are filled in by the producer as the message is processed

	// Offset is the offset of the message stored on the broker. This is only
	// guaranteed to be defined if the message was successfully delivered and
	// RequiredAcks is not NoResponse.
	Offset int64
	// Partition is the partition that the message was sent to. This is only
	// guaranteed to be defined if the message was successfully delivered.
	Partition int32
	// Timestamp is the timestamp assigned to the message by the broker. This
	// is only guaranteed to be defined if the message was successfully
	// delivered, RequiredAcks is not NoResponse, and the Kafka broker is at
	// least version 0.10.0.
	Timestamp time.Time

	retries        int
	flags          flagSet
	expectation    chan *ProducerError
	sequenceNumber int32
}
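
// An illustrative sketch of populating the user-facing fields, including Metadata
// for pass-through correlation and a record header; the values are placeholders.
//
//	msg := &ProducerMessage{
//		Topic:    "audit-log",
//		Key:      StringEncoder("user-42"),
//		Value:    ByteEncoder([]byte(`{"action":"login"}`)),
//		Headers:  []RecordHeader{{Key: []byte("source"), Value: []byte("web")}},
//		Metadata: "request-7f3a", // echoed back untouched on the Successes/Errors channels
//	}
//	producer.Input() <- msg
//	// later, on Successes: success.Metadata.(string) == "request-7f3a"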

const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.

func (m *ProducerMessage) byteSize(version int) int {
	var size int
	if version >= 2 {
		size = maximumRecordOverhead
		for _, h := range m.Headers {
			size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
		}
	} else {
		size = producerMessageOverhead
	}
	if m.Key != nil {
		size += m.Key.Length()
	}
	if m.Value != nil {
		size += m.Value.Length()
	}
	return size
}
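
// A worked example under the pre-v2 message format: a message with a 10-byte key and
// a 100-byte value is estimated at producerMessageOverhead + 10 + 100 = 136 bytes.
// The dispatcher below rejects a message with ErrMessageSizeTooLarge when this
// estimate exceeds Producer.MaxMessageBytes.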

func (m *ProducerMessage) clear() {
	m.flags = 0
	m.retries = 0
}

// ProducerError is the type of error generated when the producer fails to deliver a message.
// It contains the original ProducerMessage as well as the actual error value.
type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

func (pe ProducerError) Error() string {
	return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
}

// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
type ProducerErrors []*ProducerError

func (pe ProducerErrors) Error() string {
	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}

func (p *asyncProducer) Errors() <-chan *ProducerError {
	return p.errors
}

func (p *asyncProducer) Successes() <-chan *ProducerMessage {
	return p.successes
}

func (p *asyncProducer) Input() chan<- *ProducerMessage {
	return p.input
}

func (p *asyncProducer) Close() error {
	p.AsyncClose()

	if p.conf.Producer.Return.Successes {
		go withRecover(func() {
			for range p.successes {
			}
		})
	}

	var errors ProducerErrors
	if p.conf.Producer.Return.Errors {
		for event := range p.errors {
			errors = append(errors, event)
		}
	} else {
		<-p.errors
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}
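
// An illustrative sketch of inspecting the batched errors that Close can return,
// instead of draining the Errors channel by hand.
//
//	if err := producer.Close(); err != nil {
//		if pErrs, ok := err.(ProducerErrors); ok {
//			for _, pErr := range pErrs {
//				Logger.Printf("failed to deliver to %s: %v\n", pErr.Msg.Topic, pErr.Err)
//			}
//		}
//	}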

func (p *asyncProducer) AsyncClose() {
	go withRecover(p.shutdown)
}

// singleton
// dispatches messages by topic
func (p *asyncProducer) dispatcher() {
	handlers := make(map[string]chan<- *ProducerMessage)
	shuttingDown := false

	for msg := range p.input {
		if msg == nil {
			Logger.Println("Something tried to send a nil message, it was ignored.")
			continue
		}

		if msg.flags&shutdown != 0 {
			shuttingDown = true
			p.inFlight.Done()
			continue
		} else if msg.retries == 0 {
			if shuttingDown {
				// we can't just call returnError here because that decrements the wait group,
				// which hasn't been incremented yet for this message, and shouldn't be
				pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
				if p.conf.Producer.Return.Errors {
					p.errors <- pErr
				} else {
					Logger.Println(pErr)
				}
				continue
			}
			p.inFlight.Add(1)
		}

		version := 1
		if p.conf.Version.IsAtLeast(V0_11_0_0) {
			version = 2
		} else if msg.Headers != nil {
			p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11"))
			continue
		}
		if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
			p.returnError(msg, ErrMessageSizeTooLarge)
			continue
		}

		handler := handlers[msg.Topic]
		if handler == nil {
			handler = p.newTopicProducer(msg.Topic)
			handlers[msg.Topic] = handler
		}

		handler <- msg
	}

	for _, handler := range handlers {
		close(handler)
	}
}

// one per topic
// partitions messages, then dispatches them by partition
type topicProducer struct {
	parent *asyncProducer
	topic  string
	input  <-chan *ProducerMessage

	breaker     *breaker.Breaker
	handlers    map[int32]chan<- *ProducerMessage
	partitioner Partitioner
}

func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	tp := &topicProducer{
		parent:      p,
		topic:       topic,
		input:       input,
		breaker:     breaker.New(3, 1, 10*time.Second),
		handlers:    make(map[int32]chan<- *ProducerMessage),
		partitioner: p.conf.Producer.Partitioner(topic),
	}
	go withRecover(tp.dispatch)
	return input
}

func (tp *topicProducer) dispatch() {
	for msg := range tp.input {
		if msg.retries == 0 {
			if err := tp.partitionMessage(msg); err != nil {
				tp.parent.returnError(msg, err)
				continue
			}
		}
		// All messages being retried (sent or not) have already had their retry count updated
		if tp.parent.conf.Producer.Idempotent && msg.retries == 0 {
			msg.sequenceNumber = tp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
		}

		handler := tp.handlers[msg.Partition]
		if handler == nil {
			handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
			tp.handlers[msg.Partition] = handler
		}

		handler <- msg
	}

	for _, handler := range tp.handlers {
		close(handler)
	}
}

func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
	var partitions []int32

	err := tp.breaker.Run(func() (err error) {
		var requiresConsistency = false
		if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok {
			requiresConsistency = ep.MessageRequiresConsistency(msg)
		} else {
			requiresConsistency = tp.partitioner.RequiresConsistency()
		}

		if requiresConsistency {
			partitions, err = tp.parent.client.Partitions(msg.Topic)
		} else {
			partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
		}
		return
	})

	if err != nil {
		return err
	}

	numPartitions := int32(len(partitions))

	if numPartitions == 0 {
		return ErrLeaderNotAvailable
	}

	choice, err := tp.partitioner.Partition(msg, numPartitions)

	if err != nil {
		return err
	} else if choice < 0 || choice >= numPartitions {
		return ErrInvalidPartition
	}

	msg.Partition = partitions[choice]

	return nil
}
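
// An illustrative sketch of choosing the Partitioner that partitionMessage consults;
// it comes from the producer configuration, one instance per topic. The constructors
// named below are this library's built-in partitioners.
//
//	config := NewConfig()
//	config.Producer.Partitioner = NewHashPartitioner // stable partition per message Key
//	// or: config.Producer.Partitioner = NewManualPartitioner // honour msg.Partition as set by the caller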

// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
type partitionProducer struct {
	parent    *asyncProducer
	topic     string
	partition int32
	input     <-chan *ProducerMessage

	leader         *Broker
	breaker        *breaker.Breaker
	brokerProducer *brokerProducer

	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
	// retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
	// therefore whether our buffer is complete and safe to flush)
	highWatermark int
	retryState    []partitionRetryState
}

type partitionRetryState struct {
	buf          []*ProducerMessage
	expectChaser bool
}

func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	pp := &partitionProducer{
		parent:    p,
		topic:     topic,
		partition: partition,
		input:     input,

		breaker:    breaker.New(3, 1, 10*time.Second),
		retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
	}
	go withRecover(pp.dispatch)
	return input
}

func (pp *partitionProducer) dispatch() {
	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
	// on the first message
	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
	if pp.leader != nil {
		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
	}

	for msg := range pp.input {
		if msg.retries > pp.highWatermark {
			// a new, higher, retry level; handle it and then back off
			pp.newHighWatermark(msg.retries)
			time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
		} else if pp.highWatermark > 0 {
			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
			if msg.retries < pp.highWatermark {
				// in fact this message is not even the current retry level, so buffer it for now (unless it's just a fin)
				if msg.flags&fin == fin {
					pp.retryState[msg.retries].expectChaser = false
					pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				} else {
					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
				}
				continue
			} else if msg.flags&fin == fin {
				// this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
				// meaning this retry level is done and we can go down (at least) one level and flush that
				pp.retryState[pp.highWatermark].expectChaser = false
				pp.flushRetryBuffers()
				pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				continue
			}
		}

		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
		// without breaking any of our ordering guarantees

		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnError(msg, err)
				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
				continue
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		pp.brokerProducer.input <- msg
	}

	if pp.brokerProducer != nil {
		pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
	}
}

func (pp *partitionProducer) newHighWatermark(hwm int) {
	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
	pp.highWatermark = hwm

	// send off a fin so that we know when everything "in between" has made it
	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
	pp.retryState[pp.highWatermark].expectChaser = true
	pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
	pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}

	// a new HWM means that our current broker selection is out of date
	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
	pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
	pp.brokerProducer = nil
}

func (pp *partitionProducer) flushRetryBuffers() {
	Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
	for {
		pp.highWatermark--

		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
				goto flushDone
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		for _, msg := range pp.retryState[pp.highWatermark].buf {
			pp.brokerProducer.input <- msg
		}

	flushDone:
		pp.retryState[pp.highWatermark].buf = nil
		if pp.retryState[pp.highWatermark].expectChaser {
			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
			break
		} else if pp.highWatermark == 0 {
			Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
			break
		}
	}
}

func (pp *partitionProducer) updateLeader() error {
	return pp.breaker.Run(func() (err error) {
		if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
			return err
		}

		if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
			return err
		}

		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}

		return nil
	})
}

// one per broker; also constructs an associated flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
	var (
		input     = make(chan *ProducerMessage)
		bridge    = make(chan *produceSet)
		responses = make(chan *brokerProducerResponse)
	)

	bp := &brokerProducer{
		parent:         p,
		broker:         broker,
		input:          input,
		output:         bridge,
		responses:      responses,
		buffer:         newProduceSet(p),
		currentRetries: make(map[string]map[int32]error),
	}
	go withRecover(bp.run)

	// minimal bridge to make the network response `select`able
	go withRecover(func() {
		for set := range bridge {
			request := set.buildRequest()

			response, err := broker.Produce(request)

			responses <- &brokerProducerResponse{
				set: set,
				err: err,
				res: response,
			}
		}
		close(responses)
	})

	return bp
}

type brokerProducerResponse struct {
	set *produceSet
	err error
	res *ProduceResponse
}

// groups messages together into appropriately-sized batches for sending to the broker
// handles state related to retries etc
type brokerProducer struct {
	parent *asyncProducer
	broker *Broker

	input     chan *ProducerMessage
	output    chan<- *produceSet
	responses <-chan *brokerProducerResponse

	buffer     *produceSet
	timer      <-chan time.Time
	timerFired bool

	closing        error
	currentRetries map[string]map[int32]error
}

func (bp *brokerProducer) run() {
	var output chan<- *produceSet
	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())

	for {
		select {
		case msg := <-bp.input:
			if msg == nil {
				bp.shutdown()
				return
			}

			if msg.flags&syn == syn {
				Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
					bp.broker.ID(), msg.Topic, msg.Partition)
				if bp.currentRetries[msg.Topic] == nil {
					bp.currentRetries[msg.Topic] = make(map[int32]error)
				}
				bp.currentRetries[msg.Topic][msg.Partition] = nil
				bp.parent.inFlight.Done()
				continue
			}

			if reason := bp.needsRetry(msg); reason != nil {
				bp.parent.retryMessage(msg, reason)

				if bp.closing == nil && msg.flags&fin == fin {
					// we were retrying this partition but we can start processing again
					delete(bp.currentRetries[msg.Topic], msg.Partition)
					Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
						bp.broker.ID(), msg.Topic, msg.Partition)
				}

				continue
			}

			if bp.buffer.wouldOverflow(msg) {
				if err := bp.waitForSpace(msg); err != nil {
					bp.parent.retryMessage(msg, err)
					continue
				}
			}

			if err := bp.buffer.add(msg); err != nil {
				bp.parent.returnError(msg, err)
				continue
			}

			if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
				bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
			}
		case <-bp.timer:
			bp.timerFired = true
		case output <- bp.buffer:
			bp.rollOver()
		case response := <-bp.responses:
			bp.handleResponse(response)
		}

		if bp.timerFired || bp.buffer.readyToFlush() {
			output = bp.output
		} else {
			output = nil
		}
	}
}

func (bp *brokerProducer) shutdown() {
	for !bp.buffer.empty() {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
		case bp.output <- bp.buffer:
			bp.rollOver()
		}
	}
	close(bp.output)
	for response := range bp.responses {
		bp.handleResponse(response)
	}

	Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
}

func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
	if bp.closing != nil {
		return bp.closing
	}

	return bp.currentRetries[msg.Topic][msg.Partition]
}

func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
	Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())

	for {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
			// handling a response can change our state, so re-check some things
			if reason := bp.needsRetry(msg); reason != nil {
				return reason
			} else if !bp.buffer.wouldOverflow(msg) {
				return nil
			}
		case bp.output <- bp.buffer:
			bp.rollOver()
			return nil
		}
	}
}

func (bp *brokerProducer) rollOver() {
	bp.timer = nil
	bp.timerFired = false
	bp.buffer = newProduceSet(bp.parent)
}

func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
	if response.err != nil {
		bp.handleError(response.set, response.err)
	} else {
		bp.handleSuccess(response.set, response.res)
	}

	if bp.buffer.empty() {
		bp.rollOver() // this can happen if the response invalidated our buffer
	}
}

func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
	// we iterate through the blocks in the request set, not the response, so that we notice
	// if the response is missing a block completely
	var retryTopics []string
	sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
		if response == nil {
			// this only happens when RequiredAcks is NoResponse, so we have to assume success
			bp.parent.returnSuccesses(pSet.msgs)
			return
		}

		block := response.GetBlock(topic, partition)
		if block == nil {
			bp.parent.returnErrors(pSet.msgs, ErrIncompleteResponse)
			return
		}

		switch block.Err {
		// Success
		case ErrNoError:
			if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() {
				for _, msg := range pSet.msgs {
					msg.Timestamp = block.Timestamp
				}
			}
			for i, msg := range pSet.msgs {
				msg.Offset = block.Offset + int64(i)
			}
			bp.parent.returnSuccesses(pSet.msgs)
		// Duplicate
		case ErrDuplicateSequenceNumber:
			bp.parent.returnSuccesses(pSet.msgs)
		// Retriable errors
		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
			retryTopics = append(retryTopics, topic)
		// Other non-retriable errors
		default:
			bp.parent.returnErrors(pSet.msgs, block.Err)
		}
	})

	if len(retryTopics) > 0 {
		if bp.parent.conf.Producer.Idempotent {
			err := bp.parent.client.RefreshMetadata(retryTopics...)
			if err != nil {
				Logger.Printf("Failed refreshing metadata because of %v\n", err)
			}
		}

		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			block := response.GetBlock(topic, partition)
			if block == nil {
				// handled in the previous "eachPartition" loop
				return
			}

			switch block.Err {
			case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
				ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
				Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
					bp.broker.ID(), topic, partition, block.Err)
				if bp.currentRetries[topic] == nil {
					bp.currentRetries[topic] = make(map[int32]error)
				}
				bp.currentRetries[topic][partition] = block.Err
				if bp.parent.conf.Producer.Idempotent {
					go bp.parent.retryBatch(topic, partition, pSet, block.Err)
				} else {
					bp.parent.retryMessages(pSet.msgs, block.Err)
				}
				// dropping the following messages has the side effect of incrementing their retry count
				bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
			}
		})
	}
}

func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) {
	Logger.Printf("Retrying batch for %v-%d because of %s\n", topic, partition, kerr)
	produceSet := newProduceSet(p)
	produceSet.msgs[topic] = make(map[int32]*partitionSet)
	produceSet.msgs[topic][partition] = pSet
	produceSet.bufferBytes += pSet.bufferBytes
	produceSet.bufferCount += len(pSet.msgs)
	for _, msg := range pSet.msgs {
		if msg.retries >= p.conf.Producer.Retry.Max {
			p.returnError(msg, kerr)
			return
		}
		msg.retries++
	}

	// it's expected that a metadata refresh has been requested prior to calling retryBatch
	leader, err := p.client.Leader(topic, partition)
	if err != nil {
		Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up the new leader\n", topic, partition, err)
		for _, msg := range pSet.msgs {
			p.returnError(msg, kerr)
		}
		return
	}
	bp := p.getBrokerProducer(leader)
	bp.output <- produceSet
}

func (bp *brokerProducer) handleError(sent *produceSet, err error) {
	switch err.(type) {
	case PacketEncodingError:
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.returnErrors(pSet.msgs, err)
		})
	default:
		Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
		bp.parent.abandonBrokerConnection(bp.broker)
		_ = bp.broker.Close()
		bp.closing = err
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.retryMessages(pSet.msgs, err)
		})
		bp.buffer.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.retryMessages(pSet.msgs, err)
		})
		bp.rollOver()
	}
}

// singleton
// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
	var msg *ProducerMessage
	buf := queue.New()

	for {
		if buf.Length() == 0 {
			msg = <-p.retries
		} else {
			select {
			case msg = <-p.retries:
			case p.input <- buf.Peek().(*ProducerMessage):
				buf.Remove()
				continue
			}
		}

		if msg == nil {
			return
		}

		buf.Add(msg)
	}
}
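
// retryHandler is the classic unbounded-buffer bridge between two channels: writers to
// p.retries never block waiting on the dispatcher, because anything that cannot be
// forwarded to p.input immediately is parked in an in-memory queue. A stripped-down
// sketch of the same pattern, with hypothetical channel names:
//
//	func bridge(in <-chan *ProducerMessage, out chan<- *ProducerMessage) {
//		buf := queue.New()
//		for {
//			if buf.Length() == 0 {
//				buf.Add(<-in) // nothing buffered: block on input only
//				continue
//			}
//			select {
//			case msg := <-in:
//				buf.Add(msg) // keep accepting while the output is busy
//			case out <- buf.Peek().(*ProducerMessage):
//				buf.Remove() // oldest buffered message was forwarded
//			}
//		}
//	}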

// utility functions

func (p *asyncProducer) shutdown() {
	Logger.Println("Producer shutting down.")
	p.inFlight.Add(1)
	p.input <- &ProducerMessage{flags: shutdown}

	p.inFlight.Wait()

	if p.ownClient {
		err := p.client.Close()
		if err != nil {
			Logger.Println("producer/shutdown failed to close the embedded client:", err)
		}
	}

	close(p.input)
	close(p.retries)
	close(p.errors)
	close(p.successes)
}

func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
	msg.clear()
	pErr := &ProducerError{Msg: msg, Err: err}
	if p.conf.Producer.Return.Errors {
		p.errors <- pErr
	} else {
		Logger.Println(pErr)
	}
	p.inFlight.Done()
}

func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		p.returnError(msg, err)
	}
}

func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
	for _, msg := range batch {
		if p.conf.Producer.Return.Successes {
			msg.clear()
			p.successes <- msg
		}
		p.inFlight.Done()
	}
}

func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
	if msg.retries >= p.conf.Producer.Retry.Max {
		p.returnError(msg, err)
	} else {
		msg.retries++
		p.retries <- msg
	}
}

func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		p.retryMessage(msg, err)
	}
}

func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bp := p.brokers[broker]

	if bp == nil {
		bp = p.newBrokerProducer(broker)
		p.brokers[broker] = bp
		p.brokerRefs[bp] = 0
	}

	p.brokerRefs[bp]++

	return bp
}

func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp *brokerProducer) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	p.brokerRefs[bp]--
	if p.brokerRefs[bp] == 0 {
		close(bp.input)
		delete(p.brokerRefs, bp)

		if p.brokers[broker] == bp {
			delete(p.brokers, broker)
		}
	}
}

func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	delete(p.brokers, broker)
}