package sarama

import (
	"encoding/binary"
	"fmt"
	"sync"
	"time"

	"github.com/eapache/go-resiliency/breaker"
	"github.com/eapache/queue"
)

// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
// leaks: it will not be garbage-collected automatically when it passes out of
// scope.
type AsyncProducer interface {

	// AsyncClose triggers a shutdown of the producer. The shutdown has completed
	// when both the Errors and Successes channels have been closed. When calling
	// AsyncClose, you *must* continue to read from those channels in order to
	// drain the results of any messages in flight.
	AsyncClose()

	// Close shuts down the producer and waits for any buffered messages to be
	// flushed. You must call this function before a producer object passes out of
	// scope, as it may otherwise leak memory. You must call this before calling
	// Close on the underlying client.
	Close() error

	// Input is the input channel for the user to write messages to that they
	// wish to send.
	Input() chan<- *ProducerMessage

	// Successes is the success output channel back to the user when Return.Successes is
	// enabled. If Return.Successes is true, you MUST read from this channel or the
	// Producer will deadlock. It is suggested that you send and read messages
	// together in a single select statement.
	Successes() <-chan *ProducerMessage

	// Errors is the error output channel back to the user. You MUST read from this
	// channel or the Producer will deadlock when the channel is full. Alternatively,
	// you can set Producer.Return.Errors in your config to false, which prevents
	// errors from being returned.
	Errors() <-chan *ProducerError
}
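
// The doc comments above suggest sending and reading in a single select statement.
// A minimal usage sketch follows; the broker address, topic name, and message count
// are illustrative only and not part of this package:
//
//	config := NewConfig()
//	config.Producer.Return.Successes = true
//	producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
//	if err != nil {
//		panic(err)
//	}
//
//	for i := 0; i < 10; i++ {
//		select {
//		case producer.Input() <- &ProducerMessage{Topic: "my_topic", Value: StringEncoder("hello")}:
//		case msg := <-producer.Successes():
//			Logger.Println("delivered to partition", msg.Partition, "at offset", msg.Offset)
//		case err := <-producer.Errors():
//			Logger.Println("failed to deliver:", err)
//		}
//	}
//
//	if err := producer.Close(); err != nil {
//		Logger.Println(err)
//	}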

// transactionManager keeps the state necessary to ensure idempotent production
type transactionManager struct {
	producerID      int64
	producerEpoch   int16
	sequenceNumbers map[string]int32
	mutex           sync.Mutex
}

const (
	noProducerID    = -1
	noProducerEpoch = -1
)

func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) int32 {
	key := fmt.Sprintf("%s-%d", topic, partition)
	t.mutex.Lock()
	defer t.mutex.Unlock()
	sequence := t.sequenceNumbers[key]
	t.sequenceNumbers[key] = sequence + 1
	return sequence
}

func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
	txnmgr := &transactionManager{
		producerID:    noProducerID,
		producerEpoch: noProducerEpoch,
	}

	if conf.Producer.Idempotent {
		initProducerIDResponse, err := client.InitProducerID()
		if err != nil {
			return nil, err
		}
		txnmgr.producerID = initProducerIDResponse.ProducerID
		txnmgr.producerEpoch = initProducerIDResponse.ProducerEpoch
		txnmgr.sequenceNumbers = make(map[string]int32)
		txnmgr.mutex = sync.Mutex{}

		Logger.Printf("Obtained a ProducerId: %d and ProducerEpoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch)
	}

	return txnmgr, nil
}
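
// newTransactionManager only fetches a producer ID and epoch when idempotence is
// requested. A hedged configuration sketch for enabling it; the exact set of
// companion settings required by Config validation is assumed here rather than
// guaranteed by this file:
//
//	config := NewConfig()
//	config.Producer.Idempotent = true
//	config.Producer.RequiredAcks = WaitForAll // assumed: idempotence needs acks from all in-sync replicas
//	config.Producer.Retry.Max = 5
//	config.Net.MaxOpenRequests = 1            // assumed: one in-flight request to preserve ordering
//	config.Version = V0_11_0_0                // InitProducerID requires Kafka >= 0.11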

type asyncProducer struct {
	client Client
	conf   *Config

	errors                    chan *ProducerError
	input, successes, retries chan *ProducerMessage
	inFlight                  sync.WaitGroup

	brokers    map[*Broker]*brokerProducer
	brokerRefs map[*brokerProducer]int
	brokerLock sync.Mutex

	txnmgr *transactionManager
}

// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
	client, err := NewClient(addrs, conf)
	if err != nil {
		return nil, err
	}
	return newAsyncProducer(client)
}

// NewAsyncProducerFromClient creates a new Producer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this producer.
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
	// For clients passed in by the caller, ensure we don't
	// call Close() on them.
	cli := &nopCloserClient{client}
	return newAsyncProducer(cli)
}

func newAsyncProducer(client Client) (AsyncProducer, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	txnmgr, err := newTransactionManager(client.Config(), client)
	if err != nil {
		return nil, err
	}

	p := &asyncProducer{
		client:     client,
		conf:       client.Config(),
		errors:     make(chan *ProducerError),
		input:      make(chan *ProducerMessage),
		successes:  make(chan *ProducerMessage),
		retries:    make(chan *ProducerMessage),
		brokers:    make(map[*Broker]*brokerProducer),
		brokerRefs: make(map[*brokerProducer]int),
		txnmgr:     txnmgr,
	}

	// launch our singleton dispatchers
	go withRecover(p.dispatcher)
	go withRecover(p.retryHandler)

	return p, nil
}

type flagSet int8

const (
	syn      flagSet = 1 << iota // first message from partitionProducer to brokerProducer
	fin                          // final message from partitionProducer to brokerProducer and back
	shutdown                     // start the shutdown process
)

// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type ProducerMessage struct {
	Topic string // The Kafka topic for this message.
	// The partitioning key for this message. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Key Encoder
	// The actual message to store in Kafka. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Value Encoder

	// The headers are key-value pairs that are transparently passed
	// by Kafka between producers and consumers.
	Headers []RecordHeader

	// This field is used to hold arbitrary data you wish to include so it
	// will be available when receiving on the Successes and Errors channels.
	// Sarama completely ignores this field; it is only to be used for
	// pass-through data.
	Metadata interface{}

	// The fields below this point are filled in by the producer as the message is processed

	// Offset is the offset of the message stored on the broker. This is only
	// guaranteed to be defined if the message was successfully delivered and
	// RequiredAcks is not NoResponse.
	Offset int64
	// Partition is the partition that the message was sent to. This is only
	// guaranteed to be defined if the message was successfully delivered.
	Partition int32
	// Timestamp can vary in behaviour depending on broker configuration, being
	// in either one of the CreateTime or LogAppendTime modes (default CreateTime),
	// and requiring version at least 0.10.0.
	//
	// When configured to CreateTime, the timestamp is specified by the producer
	// either by explicitly setting this field, or when the message is added
	// to a produce set.
	//
	// When configured to LogAppendTime, the timestamp is assigned to the message
	// by the broker. This is only guaranteed to be defined if the message was
	// successfully delivered and RequiredAcks is not NoResponse.
	Timestamp time.Time

	retries        int
	flags          flagSet
	expectation    chan *ProducerError
	sequenceNumber int32
}
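
// A sketch of constructing a message with the fields a caller typically sets;
// the topic name, key, payload, and metadata value are placeholders:
//
//	msg := &ProducerMessage{
//		Topic:    "my_topic",
//		Key:      StringEncoder("user-42"), // optional; drives partitioning
//		Value:    ByteEncoder([]byte("payload")),
//		Headers:  []RecordHeader{{Key: []byte("k"), Value: []byte("v")}},
//		Metadata: "request-123", // passed through untouched to Successes/Errors
//	}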

const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.

func (m *ProducerMessage) byteSize(version int) int {
	var size int
	if version >= 2 {
		size = maximumRecordOverhead
		for _, h := range m.Headers {
			size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
		}
	} else {
		size = producerMessageOverhead
	}
	if m.Key != nil {
		size += m.Key.Length()
	}
	if m.Value != nil {
		size += m.Value.Length()
	}
	return size
}

func (m *ProducerMessage) clear() {
	m.flags = 0
	m.retries = 0
}

// ProducerError is the type of error generated when the producer fails to deliver a message.
// It contains the original ProducerMessage as well as the actual error value.
type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

func (pe ProducerError) Error() string {
	return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
}

// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
type ProducerErrors []*ProducerError

func (pe ProducerErrors) Error() string {
	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}
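
// As described above, Close may return the undelivered messages wrapped in a
// ProducerErrors value. A short sketch of unwrapping it at shutdown:
//
//	if err := producer.Close(); err != nil {
//		if pErrs, ok := err.(ProducerErrors); ok {
//			for _, pErr := range pErrs {
//				Logger.Println("failed to deliver:", pErr.Msg.Topic, pErr.Err)
//			}
//		}
//	}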

func (p *asyncProducer) Errors() <-chan *ProducerError {
	return p.errors
}

func (p *asyncProducer) Successes() <-chan *ProducerMessage {
	return p.successes
}

func (p *asyncProducer) Input() chan<- *ProducerMessage {
	return p.input
}

func (p *asyncProducer) Close() error {
	p.AsyncClose()

	if p.conf.Producer.Return.Successes {
		go withRecover(func() {
			for range p.successes {
			}
		})
	}

	var errors ProducerErrors
	if p.conf.Producer.Return.Errors {
		for event := range p.errors {
			errors = append(errors, event)
		}
	} else {
		<-p.errors
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

func (p *asyncProducer) AsyncClose() {
	go withRecover(p.shutdown)
}

// singleton
// dispatches messages by topic
func (p *asyncProducer) dispatcher() {
	handlers := make(map[string]chan<- *ProducerMessage)
	shuttingDown := false

	for msg := range p.input {
		if msg == nil {
			Logger.Println("Something tried to send a nil message, it was ignored.")
			continue
		}

		if msg.flags&shutdown != 0 {
			shuttingDown = true
			p.inFlight.Done()
			continue
		} else if msg.retries == 0 {
			if shuttingDown {
				// we can't just call returnError here because that decrements the wait group,
				// which hasn't been incremented yet for this message, and shouldn't be
				pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
				if p.conf.Producer.Return.Errors {
					p.errors <- pErr
				} else {
					Logger.Println(pErr)
				}
				continue
			}
			p.inFlight.Add(1)
		}

		version := 1
		if p.conf.Version.IsAtLeast(V0_11_0_0) {
			version = 2
		} else if msg.Headers != nil {
			p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11"))
			continue
		}
		if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
			p.returnError(msg, ErrMessageSizeTooLarge)
			continue
		}

		handler := handlers[msg.Topic]
		if handler == nil {
			handler = p.newTopicProducer(msg.Topic)
			handlers[msg.Topic] = handler
		}

		handler <- msg
	}

	for _, handler := range handlers {
		close(handler)
	}
}

// one per topic
// partitions messages, then dispatches them by partition
type topicProducer struct {
	parent *asyncProducer
	topic  string
	input  <-chan *ProducerMessage

	breaker     *breaker.Breaker
	handlers    map[int32]chan<- *ProducerMessage
	partitioner Partitioner
}

func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	tp := &topicProducer{
		parent:      p,
		topic:       topic,
		input:       input,
		breaker:     breaker.New(3, 1, 10*time.Second),
		handlers:    make(map[int32]chan<- *ProducerMessage),
		partitioner: p.conf.Producer.Partitioner(topic),
	}
	go withRecover(tp.dispatch)
	return input
}

func (tp *topicProducer) dispatch() {
	for msg := range tp.input {
		if msg.retries == 0 {
			if err := tp.partitionMessage(msg); err != nil {
				tp.parent.returnError(msg, err)
				continue
			}
		}
		// All messages being retried (sent or not) have already had their retry count updated
		if tp.parent.conf.Producer.Idempotent && msg.retries == 0 {
			msg.sequenceNumber = tp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
		}

		handler := tp.handlers[msg.Partition]
		if handler == nil {
			handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
			tp.handlers[msg.Partition] = handler
		}

		handler <- msg
	}

	for _, handler := range tp.handlers {
		close(handler)
	}
}

func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
	var partitions []int32

	err := tp.breaker.Run(func() (err error) {
		var requiresConsistency = false
		if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok {
			requiresConsistency = ep.MessageRequiresConsistency(msg)
		} else {
			requiresConsistency = tp.partitioner.RequiresConsistency()
		}

		if requiresConsistency {
			partitions, err = tp.parent.client.Partitions(msg.Topic)
		} else {
			partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
		}
		return
	})

	if err != nil {
		return err
	}

	numPartitions := int32(len(partitions))

	if numPartitions == 0 {
		return ErrLeaderNotAvailable
	}

	choice, err := tp.partitioner.Partition(msg, numPartitions)

	if err != nil {
		return err
	} else if choice < 0 || choice >= numPartitions {
		return ErrInvalidPartition
	}

	msg.Partition = partitions[choice]

	return nil
}

// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
type partitionProducer struct {
	parent    *asyncProducer
	topic     string
	partition int32
	input     <-chan *ProducerMessage

	leader         *Broker
	breaker        *breaker.Breaker
	brokerProducer *brokerProducer

	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
	// retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
	// therefore whether our buffer is complete and safe to flush)
	highWatermark int
	retryState    []partitionRetryState
}

type partitionRetryState struct {
	buf          []*ProducerMessage
	expectChaser bool
}

func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	pp := &partitionProducer{
		parent:    p,
		topic:     topic,
		partition: partition,
		input:     input,

		breaker:    breaker.New(3, 1, 10*time.Second),
		retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
	}
	go withRecover(pp.dispatch)
	return input
}

func (pp *partitionProducer) backoff(retries int) {
	var backoff time.Duration
	if pp.parent.conf.Producer.Retry.BackoffFunc != nil {
		maxRetries := pp.parent.conf.Producer.Retry.Max
		backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries)
	} else {
		backoff = pp.parent.conf.Producer.Retry.Backoff
	}
	if backoff > 0 {
		time.Sleep(backoff)
	}
}
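
// backoff consults Producer.Retry.BackoffFunc when it is set, falling back to the
// fixed Producer.Retry.Backoff otherwise. A sketch of plugging in a per-retry
// doubling policy; the base duration and doubling choice are illustrative:
//
//	config := NewConfig()
//	config.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
//		return time.Duration(1<<uint(retries)) * 100 * time.Millisecond
//	}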

func (pp *partitionProducer) dispatch() {
	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
	// on the first message
	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
	if pp.leader != nil {
		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
	}

	defer func() {
		if pp.brokerProducer != nil {
			pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
		}
	}()

	for msg := range pp.input {
		if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
			select {
			case <-pp.brokerProducer.abandoned:
				// a message on the abandoned channel means that our current broker selection is out of date
				Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
				pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
				pp.brokerProducer = nil
				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
			default:
				// producer connection is still open.
			}
		}

		if msg.retries > pp.highWatermark {
			// a new, higher, retry level; handle it and then back off
			pp.newHighWatermark(msg.retries)
			pp.backoff(msg.retries)
		} else if pp.highWatermark > 0 {
			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
			if msg.retries < pp.highWatermark {
				// in fact this message is not even the current retry level, so buffer it for now (unless it's just a fin)
				if msg.flags&fin == fin {
					pp.retryState[msg.retries].expectChaser = false
					pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				} else {
					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
				}
				continue
			} else if msg.flags&fin == fin {
				// this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
				// meaning this retry level is done and we can go down (at least) one level and flush that
				pp.retryState[pp.highWatermark].expectChaser = false
				pp.flushRetryBuffers()
				pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				continue
			}
		}

		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
		// without breaking any of our ordering guarantees

		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnError(msg, err)
				pp.backoff(msg.retries)
				continue
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		pp.brokerProducer.input <- msg
	}
}

func (pp *partitionProducer) newHighWatermark(hwm int) {
	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
	pp.highWatermark = hwm

	// send off a fin so that we know when everything "in between" has made it
	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
	pp.retryState[pp.highWatermark].expectChaser = true
	pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
	pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}

	// a new HWM means that our current broker selection is out of date
	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
	pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
	pp.brokerProducer = nil
}

func (pp *partitionProducer) flushRetryBuffers() {
	Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
	for {
		pp.highWatermark--

		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
				goto flushDone
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		for _, msg := range pp.retryState[pp.highWatermark].buf {
			pp.brokerProducer.input <- msg
		}

	flushDone:
		pp.retryState[pp.highWatermark].buf = nil
		if pp.retryState[pp.highWatermark].expectChaser {
			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
			break
		} else if pp.highWatermark == 0 {
			Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
			break
		}
	}
}

func (pp *partitionProducer) updateLeader() error {
	return pp.breaker.Run(func() (err error) {
		if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
			return err
		}

		if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
			return err
		}

		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}

		return nil
	})
}

// one per broker; also constructs an associated flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
	var (
		input     = make(chan *ProducerMessage)
		bridge    = make(chan *produceSet)
		responses = make(chan *brokerProducerResponse)
	)

	bp := &brokerProducer{
		parent:         p,
		broker:         broker,
		input:          input,
		output:         bridge,
		responses:      responses,
		stopchan:       make(chan struct{}),
		buffer:         newProduceSet(p),
		currentRetries: make(map[string]map[int32]error),
	}
	go withRecover(bp.run)

	// minimal bridge to make the network response `select`able
	go withRecover(func() {
		for set := range bridge {
			request := set.buildRequest()

			response, err := broker.Produce(request)

			responses <- &brokerProducerResponse{
				set: set,
				err: err,
				res: response,
			}
		}
		close(responses)
	})

	if p.conf.Producer.Retry.Max <= 0 {
		bp.abandoned = make(chan struct{})
	}

	return bp
}

type brokerProducerResponse struct {
	set *produceSet
	err error
	res *ProduceResponse
}

// groups messages together into appropriately-sized batches for sending to the broker
// handles state related to retries etc
type brokerProducer struct {
	parent *asyncProducer
	broker *Broker

	input     chan *ProducerMessage
	output    chan<- *produceSet
	responses <-chan *brokerProducerResponse
	abandoned chan struct{}
	stopchan  chan struct{}

	buffer     *produceSet
	timer      <-chan time.Time
	timerFired bool

	closing        error
	currentRetries map[string]map[int32]error
}

func (bp *brokerProducer) run() {
	var output chan<- *produceSet
	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())

	for {
		select {
		case msg, ok := <-bp.input:
			if !ok {
				Logger.Printf("producer/broker/%d input chan closed\n", bp.broker.ID())
				bp.shutdown()
				return
			}

			if msg == nil {
				continue
			}

			if msg.flags&syn == syn {
				Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
					bp.broker.ID(), msg.Topic, msg.Partition)
				if bp.currentRetries[msg.Topic] == nil {
					bp.currentRetries[msg.Topic] = make(map[int32]error)
				}
				bp.currentRetries[msg.Topic][msg.Partition] = nil
				bp.parent.inFlight.Done()
				continue
			}

			if reason := bp.needsRetry(msg); reason != nil {
				bp.parent.retryMessage(msg, reason)

				if bp.closing == nil && msg.flags&fin == fin {
					// we were retrying this partition but we can start processing again
					delete(bp.currentRetries[msg.Topic], msg.Partition)
					Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
						bp.broker.ID(), msg.Topic, msg.Partition)
				}

				continue
			}

			if bp.buffer.wouldOverflow(msg) {
				if err := bp.waitForSpace(msg); err != nil {
					bp.parent.retryMessage(msg, err)
					continue
				}
			}

			if err := bp.buffer.add(msg); err != nil {
				bp.parent.returnError(msg, err)
				continue
			}

			if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
				bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
			}
		case <-bp.timer:
			bp.timerFired = true
		case output <- bp.buffer:
			bp.rollOver()
		case response, ok := <-bp.responses:
			if ok {
				bp.handleResponse(response)
			}
		case <-bp.stopchan:
			Logger.Printf(
				"producer/broker/%d run loop asked to stop\n", bp.broker.ID())
			return
		}

		if bp.timerFired || bp.buffer.readyToFlush() {
			output = bp.output
		} else {
			output = nil
		}
	}
}

func (bp *brokerProducer) shutdown() {
	for !bp.buffer.empty() {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
		case bp.output <- bp.buffer:
			bp.rollOver()
		}
	}
	close(bp.output)
	for response := range bp.responses {
		bp.handleResponse(response)
	}
	close(bp.stopchan)
	Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
}

func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
	if bp.closing != nil {
		return bp.closing
	}

	return bp.currentRetries[msg.Topic][msg.Partition]
}

func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
	Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())

	for {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
			// handling a response can change our state, so re-check some things
			if reason := bp.needsRetry(msg); reason != nil {
				return reason
			} else if !bp.buffer.wouldOverflow(msg) {
				return nil
			}
		case bp.output <- bp.buffer:
			bp.rollOver()
			return nil
		}
	}
}

func (bp *brokerProducer) rollOver() {
	bp.timer = nil
	bp.timerFired = false
	bp.buffer = newProduceSet(bp.parent)
}

func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
	if response.err != nil {
		bp.handleError(response.set, response.err)
	} else {
		bp.handleSuccess(response.set, response.res)
	}

	if bp.buffer.empty() {
		bp.rollOver() // this can happen if the response invalidated our buffer
	}
}

func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
	// we iterate through the blocks in the request set, not the response, so that we notice
	// if the response is missing a block completely
	var retryTopics []string
	sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
		if response == nil {
			// this only happens when RequiredAcks is NoResponse, so we have to assume success
			bp.parent.returnSuccesses(pSet.msgs)
			return
		}

		block := response.GetBlock(topic, partition)
		if block == nil {
			bp.parent.returnErrors(pSet.msgs, ErrIncompleteResponse)
			return
		}

		switch block.Err {
		// Success
		case ErrNoError:
			if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() {
				for _, msg := range pSet.msgs {
					msg.Timestamp = block.Timestamp
				}
			}
			for i, msg := range pSet.msgs {
				msg.Offset = block.Offset + int64(i)
			}
			bp.parent.returnSuccesses(pSet.msgs)
		// Duplicate
		case ErrDuplicateSequenceNumber:
			bp.parent.returnSuccesses(pSet.msgs)
		// Retriable errors
		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
			if bp.parent.conf.Producer.Retry.Max <= 0 {
				bp.parent.abandonBrokerConnection(bp.broker)
				bp.parent.returnErrors(pSet.msgs, block.Err)
			} else {
				retryTopics = append(retryTopics, topic)
			}
		// Other non-retriable errors
		default:
			if bp.parent.conf.Producer.Retry.Max <= 0 {
				bp.parent.abandonBrokerConnection(bp.broker)
			}
			bp.parent.returnErrors(pSet.msgs, block.Err)
		}
	})

	if len(retryTopics) > 0 {
		if bp.parent.conf.Producer.Idempotent {
			err := bp.parent.client.RefreshMetadata(retryTopics...)
			if err != nil {
				Logger.Printf("Failed refreshing metadata because of %v\n", err)
			}
		}

		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			block := response.GetBlock(topic, partition)
			if block == nil {
				// handled in the previous "eachPartition" loop
				return
			}

			switch block.Err {
			case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
				ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
				Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
					bp.broker.ID(), topic, partition, block.Err)
				if bp.currentRetries[topic] == nil {
					bp.currentRetries[topic] = make(map[int32]error)
				}
				bp.currentRetries[topic][partition] = block.Err
				if bp.parent.conf.Producer.Idempotent {
					go bp.parent.retryBatch(topic, partition, pSet, block.Err)
				} else {
					bp.parent.retryMessages(pSet.msgs, block.Err)
				}
				// dropping the following messages has the side effect of incrementing their retry count
				bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
			}
		})
	}
}

func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) {
	Logger.Printf("Retrying batch for %v-%d because of %s\n", topic, partition, kerr)
	produceSet := newProduceSet(p)
	produceSet.msgs[topic] = make(map[int32]*partitionSet)
	produceSet.msgs[topic][partition] = pSet
	produceSet.bufferBytes += pSet.bufferBytes
	produceSet.bufferCount += len(pSet.msgs)
	for _, msg := range pSet.msgs {
		if msg.retries >= p.conf.Producer.Retry.Max {
			p.returnError(msg, kerr)
			return
		}
		msg.retries++
	}

	// it's expected that a metadata refresh has been requested prior to calling retryBatch
	leader, err := p.client.Leader(topic, partition)
	if err != nil {
		Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, err)
		for _, msg := range pSet.msgs {
			p.returnError(msg, kerr)
		}
		return
	}
	bp := p.getBrokerProducer(leader)
	bp.output <- produceSet
}

func (bp *brokerProducer) handleError(sent *produceSet, err error) {
	switch err.(type) {
	case PacketEncodingError:
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.returnErrors(pSet.msgs, err)
		})
	default:
		Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
		bp.parent.abandonBrokerConnection(bp.broker)
		_ = bp.broker.Close()
		bp.closing = err
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.retryMessages(pSet.msgs, err)
		})
		bp.buffer.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.retryMessages(pSet.msgs, err)
		})
		bp.rollOver()
	}
}

// singleton
// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
	var msg *ProducerMessage
	buf := queue.New()

	for {
		if buf.Length() == 0 {
			msg = <-p.retries
		} else {
			select {
			case msg = <-p.retries:
			case p.input <- buf.Peek().(*ProducerMessage):
				buf.Remove()
				continue
			}
		}

		if msg == nil {
			return
		}

		buf.Add(msg)
	}
}

// utility functions

func (p *asyncProducer) shutdown() {
	Logger.Println("Producer shutting down.")
	p.inFlight.Add(1)
	p.input <- &ProducerMessage{flags: shutdown}

	p.inFlight.Wait()

	err := p.client.Close()
	if err != nil {
		Logger.Println("producer/shutdown failed to close the embedded client:", err)
	}

	close(p.input)
	close(p.retries)
	close(p.errors)
	close(p.successes)
}

func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
	msg.clear()
	pErr := &ProducerError{Msg: msg, Err: err}
	if p.conf.Producer.Return.Errors {
		p.errors <- pErr
	} else {
		Logger.Println(pErr)
	}
	p.inFlight.Done()
}

func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		p.returnError(msg, err)
	}
}

func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
	for _, msg := range batch {
		if p.conf.Producer.Return.Successes {
			msg.clear()
			p.successes <- msg
		}
		p.inFlight.Done()
	}
}

func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
	if msg.retries >= p.conf.Producer.Retry.Max {
		p.returnError(msg, err)
	} else {
		msg.retries++
		p.retries <- msg
	}
}

func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		p.retryMessage(msg, err)
	}
}

func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bp := p.brokers[broker]

	if bp == nil {
		bp = p.newBrokerProducer(broker)
		p.brokers[broker] = bp
		p.brokerRefs[bp] = 0
	}

	p.brokerRefs[bp]++

	return bp
}

func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp *brokerProducer) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	p.brokerRefs[bp]--
	if p.brokerRefs[bp] == 0 {
		close(bp.input)
		delete(p.brokerRefs, bp)

		if p.brokers[broker] == bp {
			delete(p.brokers, broker)
		}
	}
}

func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bc, ok := p.brokers[broker]
	if ok && bc.abandoned != nil {
		close(bc.abandoned)
	}

	delete(p.brokers, broker)
}