package sarama

import (
	"encoding/binary"
	"fmt"
	"sync"
	"time"

	"github.com/eapache/go-resiliency/breaker"
	"github.com/eapache/queue"
)

// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
// leaks: it will not be garbage-collected automatically when it passes out of
// scope.
type AsyncProducer interface {

	// AsyncClose triggers a shutdown of the producer. The shutdown has completed
	// when both the Errors and Successes channels have been closed. When calling
	// AsyncClose, you *must* continue to read from those channels in order to
	// drain the results of any messages in flight.
	AsyncClose()

	// Close shuts down the producer and waits for any buffered messages to be
	// flushed. You must call this function before a producer object passes out of
	// scope, as it may otherwise leak memory. You must call this before calling
	// Close on the underlying client.
	Close() error

	// Input is the input channel for the user to write the messages they wish
	// to send.
	Input() chan<- *ProducerMessage

	// Successes is the success output channel back to the user when Return.Successes is
	// enabled. If Return.Successes is true, you MUST read from this channel or the
	// Producer will deadlock. It is suggested that you send and read messages
	// together in a single select statement.
	Successes() <-chan *ProducerMessage

	// Errors is the error output channel back to the user. You MUST read from this
	// channel or the Producer will deadlock when the channel is full. Alternatively,
	// you can set Producer.Return.Errors in your config to false, which prevents
	// errors from being returned.
	Errors() <-chan *ProducerError
}
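
// The sketch below is illustrative only and not part of the original file. It shows one way a
// caller can satisfy the contract documented above: messages are enqueued and results drained
// in the same select loop, so neither Successes nor Errors can back up and deadlock the
// producer. The function name, broker address, and topic are placeholders.
func exampleAsyncProducerLoop(done <-chan struct{}) error {
	config := NewConfig()
	config.Producer.Return.Successes = true // ask for per-message delivery confirmations

	producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		return err
	}

	for {
		msg := &ProducerMessage{Topic: "example-topic", Value: StringEncoder("ping")}
		select {
		case producer.Input() <- msg: // enqueue the next message
		case pm := <-producer.Successes():
			Logger.Printf("delivered %s/%d@%d\n", pm.Topic, pm.Partition, pm.Offset)
		case perr := <-producer.Errors():
			Logger.Println("delivery failed:", perr)
		case <-done:
			return producer.Close() // flush buffered messages and drain the result channels
		}
	}
}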

// transactionManager keeps the state necessary to ensure idempotent production
type transactionManager struct {
	producerID      int64
	producerEpoch   int16
	sequenceNumbers map[string]int32
	mutex           sync.Mutex
}

const (
	noProducerID    = -1
	noProducerEpoch = -1
)

func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) int32 {
	key := fmt.Sprintf("%s-%d", topic, partition)
	t.mutex.Lock()
	defer t.mutex.Unlock()
	sequence := t.sequenceNumbers[key]
	t.sequenceNumbers[key] = sequence + 1
	return sequence
}

func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
	txnmgr := &transactionManager{
		producerID:    noProducerID,
		producerEpoch: noProducerEpoch,
	}

	if conf.Producer.Idempotent {
		initProducerIDResponse, err := client.InitProducerID()
		if err != nil {
			return nil, err
		}
		txnmgr.producerID = initProducerIDResponse.ProducerID
		txnmgr.producerEpoch = initProducerIDResponse.ProducerEpoch
		txnmgr.sequenceNumbers = make(map[string]int32)
		txnmgr.mutex = sync.Mutex{}

		Logger.Printf("Obtained a ProducerId: %d and ProducerEpoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch)
	}

	return txnmgr, nil
}
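
// A minimal sketch, not part of the original file, of the configuration a caller would set
// before the transaction manager above obtains a producer ID. The exact validation rules
// live in the Config type; this assumes the usual requirements for idempotent production
// (brokers at v0.11+, acks from all in-sync replicas, one in-flight request per connection).
func exampleIdempotentConfig() *Config {
	config := NewConfig()
	config.Version = V0_11_0_0              // idempotence relies on the v0.11 record format
	config.Producer.Idempotent = true       // request a producer ID via InitProducerID
	config.Producer.RequiredAcks = WaitForAll
	config.Producer.Retry.Max = 10
	config.Net.MaxOpenRequests = 1          // preserve per-partition sequence ordering
	return config
}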

type asyncProducer struct {
	client    Client
	conf      *Config
	ownClient bool

	errors                    chan *ProducerError
	input, successes, retries chan *ProducerMessage
	inFlight                  sync.WaitGroup

	brokers    map[*Broker]*brokerProducer
	brokerRefs map[*brokerProducer]int
	brokerLock sync.Mutex

	txnmgr *transactionManager
}

// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
	client, err := NewClient(addrs, conf)
	if err != nil {
		return nil, err
	}

	p, err := NewAsyncProducerFromClient(client)
	if err != nil {
		return nil, err
	}
	p.(*asyncProducer).ownClient = true
	return p, nil
}

// NewAsyncProducerFromClient creates a new Producer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this producer.
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	txnmgr, err := newTransactionManager(client.Config(), client)
	if err != nil {
		return nil, err
	}

	p := &asyncProducer{
		client:     client,
		conf:       client.Config(),
		errors:     make(chan *ProducerError),
		input:      make(chan *ProducerMessage),
		successes:  make(chan *ProducerMessage),
		retries:    make(chan *ProducerMessage),
		brokers:    make(map[*Broker]*brokerProducer),
		brokerRefs: make(map[*brokerProducer]int),
		txnmgr:     txnmgr,
	}

	// launch our singleton dispatchers
	go withRecover(p.dispatcher)
	go withRecover(p.retryHandler)

	return p, nil
}
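
// Illustrative sketch, not part of the original file: constructing a producer from an
// existing client. Because ownClient stays false on this path, the caller remains
// responsible for closing the client, and must do so only after closing the producer.
func exampleProducerFromClient(addrs []string) error {
	client, err := NewClient(addrs, NewConfig())
	if err != nil {
		return err
	}
	defer client.Close() // runs after the producer has been closed below

	producer, err := NewAsyncProducerFromClient(client)
	if err != nil {
		return err
	}
	return producer.Close()
}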

type flagSet int8

const (
	syn      flagSet = 1 << iota // first message from partitionProducer to brokerProducer
	fin                          // final message from partitionProducer to brokerProducer and back
	shutdown                     // start the shutdown process
)

// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type ProducerMessage struct {
	Topic string // The Kafka topic for this message.
	// The partitioning key for this message. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Key Encoder
	// The actual message to store in Kafka. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Value Encoder

	// The headers are key-value pairs that are transparently passed
	// by Kafka between producers and consumers.
	Headers []RecordHeader

	// This field is used to hold arbitrary data you wish to include, so it
	// will be available when receiving on the Successes and Errors channels.
	// Sarama completely ignores this field; it is only to be used for
	// pass-through data.
	Metadata interface{}

	// The fields below this point are filled in by the producer as the message is processed

	// Offset is the offset of the message stored on the broker. This is only
	// guaranteed to be defined if the message was successfully delivered and
	// RequiredAcks is not NoResponse.
	Offset int64
	// Partition is the partition that the message was sent to. This is only
	// guaranteed to be defined if the message was successfully delivered.
	Partition int32
	// Timestamp is the timestamp assigned to the message by the broker. This
	// is only guaranteed to be defined if the message was successfully
	// delivered, RequiredAcks is not NoResponse, and the Kafka broker is at
	// least version 0.10.0.
	Timestamp time.Time

	retries        int
	flags          flagSet
	expectation    chan *ProducerError
	sequenceNumber int32
}
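
// A small sketch, not part of the original file, of how the user-settable fields above are
// typically populated; the topic, key, payload, and metadata values are placeholders, and the
// remaining fields are filled in by the producer itself as the message moves through the pipeline.
func exampleProducerMessage() *ProducerMessage {
	return &ProducerMessage{
		Topic:    "example-topic",
		Key:      StringEncoder("user-42"), // with the default hash partitioner, a key pins the message to one partition
		Value:    StringEncoder(`{"action":"login"}`),
		Headers:  []RecordHeader{{Key: []byte("origin"), Value: []byte("auth-service")}},
		Metadata: "request-1234", // passed back untouched on the Successes/Errors channels
	}
}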

const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.

func (m *ProducerMessage) byteSize(version int) int {
	var size int
	if version >= 2 {
		size = maximumRecordOverhead
		for _, h := range m.Headers {
			size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
		}
	} else {
		size = producerMessageOverhead
	}
	if m.Key != nil {
		size += m.Key.Length()
	}
	if m.Value != nil {
		size += m.Value.Length()
	}
	return size
}

func (m *ProducerMessage) clear() {
	m.flags = 0
	m.retries = 0
}

// ProducerError is the type of error generated when the producer fails to deliver a message.
// It contains the original ProducerMessage as well as the actual error value.
type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

func (pe ProducerError) Error() string {
	return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
}

// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
type ProducerErrors []*ProducerError

func (pe ProducerErrors) Error() string {
	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}
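
// Illustrative sketch, not part of the original file: Close may return a ProducerErrors
// batch, so a caller that cares about individual failures can unwrap it with a type
// assertion rather than draining Errors() by hand.
func exampleCloseAndReport(producer AsyncProducer) {
	if err := producer.Close(); err != nil {
		if pErrs, ok := err.(ProducerErrors); ok {
			for _, pErr := range pErrs {
				Logger.Printf("failed to deliver to %s: %v\n", pErr.Msg.Topic, pErr.Err)
			}
			return
		}
		Logger.Println("close failed:", err)
	}
}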

func (p *asyncProducer) Errors() <-chan *ProducerError {
	return p.errors
}

func (p *asyncProducer) Successes() <-chan *ProducerMessage {
	return p.successes
}

func (p *asyncProducer) Input() chan<- *ProducerMessage {
	return p.input
}

func (p *asyncProducer) Close() error {
	p.AsyncClose()

	if p.conf.Producer.Return.Successes {
		go withRecover(func() {
			for range p.successes {
			}
		})
	}

	var errors ProducerErrors
	if p.conf.Producer.Return.Errors {
		for event := range p.errors {
			errors = append(errors, event)
		}
	} else {
		<-p.errors
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

func (p *asyncProducer) AsyncClose() {
	go withRecover(p.shutdown)
}

// singleton
// dispatches messages by topic
func (p *asyncProducer) dispatcher() {
	handlers := make(map[string]chan<- *ProducerMessage)
	shuttingDown := false

	for msg := range p.input {
		if msg == nil {
			Logger.Println("Something tried to send a nil message, it was ignored.")
			continue
		}

		if msg.flags&shutdown != 0 {
			shuttingDown = true
			p.inFlight.Done()
			continue
		} else if msg.retries == 0 {
			if shuttingDown {
				// we can't just call returnError here because that decrements the wait group,
				// which hasn't been incremented yet for this message, and shouldn't be
				pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
				if p.conf.Producer.Return.Errors {
					p.errors <- pErr
				} else {
					Logger.Println(pErr)
				}
				continue
			}
			p.inFlight.Add(1)
		}

		version := 1
		if p.conf.Version.IsAtLeast(V0_11_0_0) {
			version = 2
		} else if msg.Headers != nil {
			p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11"))
			continue
		}
		if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
			p.returnError(msg, ErrMessageSizeTooLarge)
			continue
		}

		handler := handlers[msg.Topic]
		if handler == nil {
			handler = p.newTopicProducer(msg.Topic)
			handlers[msg.Topic] = handler
		}

		handler <- msg
	}

	for _, handler := range handlers {
		close(handler)
	}
}

// one per topic
// partitions messages, then dispatches them by partition
type topicProducer struct {
	parent *asyncProducer
	topic  string
	input  <-chan *ProducerMessage

	breaker     *breaker.Breaker
	handlers    map[int32]chan<- *ProducerMessage
	partitioner Partitioner
}

func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	tp := &topicProducer{
		parent:      p,
		topic:       topic,
		input:       input,
		breaker:     breaker.New(3, 1, 10*time.Second),
		handlers:    make(map[int32]chan<- *ProducerMessage),
		partitioner: p.conf.Producer.Partitioner(topic),
	}
	go withRecover(tp.dispatch)
	return input
}

func (tp *topicProducer) dispatch() {
	for msg := range tp.input {
		if msg.retries == 0 {
			if err := tp.partitionMessage(msg); err != nil {
				tp.parent.returnError(msg, err)
				continue
			}
		}
		// All messages being retried (sent or not) have already had their retry count updated
		if tp.parent.conf.Producer.Idempotent && msg.retries == 0 {
			msg.sequenceNumber = tp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
		}

		handler := tp.handlers[msg.Partition]
		if handler == nil {
			handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
			tp.handlers[msg.Partition] = handler
		}

		handler <- msg
	}

	for _, handler := range tp.handlers {
		close(handler)
	}
}

func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
	var partitions []int32

	err := tp.breaker.Run(func() (err error) {
		var requiresConsistency = false
		if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok {
			requiresConsistency = ep.MessageRequiresConsistency(msg)
		} else {
			requiresConsistency = tp.partitioner.RequiresConsistency()
		}

		if requiresConsistency {
			partitions, err = tp.parent.client.Partitions(msg.Topic)
		} else {
			partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
		}
		return
	})

	if err != nil {
		return err
	}

	numPartitions := int32(len(partitions))

	if numPartitions == 0 {
		return ErrLeaderNotAvailable
	}

	choice, err := tp.partitioner.Partition(msg, numPartitions)

	if err != nil {
		return err
	} else if choice < 0 || choice >= numPartitions {
		return ErrInvalidPartition
	}

	msg.Partition = partitions[choice]

	return nil
}
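
// A brief sketch, not part of the original file, of how the partitioner consulted above is
// chosen: the config supplies a PartitionerConstructor, so a caller who sets msg.Partition
// explicitly would pair that with the manual partitioner, while key-based routing uses
// hashing. The function name and the boolean switch are illustrative only.
func examplePartitionerConfig(manual bool) *Config {
	config := NewConfig()
	if manual {
		config.Producer.Partitioner = NewManualPartitioner // msg.Partition is used as-is
	} else {
		config.Producer.Partitioner = NewHashPartitioner // partition derived from msg.Key
	}
	return config
}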

// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
type partitionProducer struct {
	parent    *asyncProducer
	topic     string
	partition int32
	input     <-chan *ProducerMessage

	leader         *Broker
	breaker        *breaker.Breaker
	brokerProducer *brokerProducer

	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
	// retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
	// therefore whether our buffer is complete and safe to flush)
	highWatermark int
	retryState    []partitionRetryState
}

type partitionRetryState struct {
	buf          []*ProducerMessage
	expectChaser bool
}

func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	pp := &partitionProducer{
		parent:    p,
		topic:     topic,
		partition: partition,
		input:     input,

		breaker:    breaker.New(3, 1, 10*time.Second),
		retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
	}
	go withRecover(pp.dispatch)
	return input
}

func (pp *partitionProducer) backoff(retries int) {
	var backoff time.Duration
	if pp.parent.conf.Producer.Retry.BackoffFunc != nil {
		maxRetries := pp.parent.conf.Producer.Retry.Max
		backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries)
	} else {
		backoff = pp.parent.conf.Producer.Retry.Backoff
	}
	if backoff > 0 {
		time.Sleep(backoff)
	}
}
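
// Illustrative sketch, not part of the original file: the BackoffFunc hook consulted above
// allows per-attempt backoff. This example doubles the delay on every retry and caps it at
// ten seconds instead of using the flat Retry.Backoff; the chosen durations are placeholders
// and assume a small Retry.Max so the shift cannot overflow.
func exampleExponentialBackoffConfig() *Config {
	config := NewConfig()
	config.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
		backoff := 100 * time.Millisecond << uint(retries) // 100ms, 200ms, 400ms, ...
		if backoff > 10*time.Second {
			backoff = 10 * time.Second
		}
		return backoff
	}
	return config
}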
498
khenaidooac637102019-01-14 15:44:34 -0500499func (pp *partitionProducer) dispatch() {
500 // try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
501 // on the first message
502 pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
503 if pp.leader != nil {
504 pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
505 pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
506 pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
507 }
508
William Kurkiandaa6bb22019-03-07 12:26:28 -0500509 defer func() {
510 if pp.brokerProducer != nil {
511 pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
512 }
513 }()
514
khenaidooac637102019-01-14 15:44:34 -0500515 for msg := range pp.input {
William Kurkiandaa6bb22019-03-07 12:26:28 -0500516
517 if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
518 select {
519 case <-pp.brokerProducer.abandoned:
520 // a message on the abandoned channel means that our current broker selection is out of date
521 Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
522 pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
523 pp.brokerProducer = nil
524 time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
525 default:
526 // producer connection is still open.
527 }
528 }
529
khenaidooac637102019-01-14 15:44:34 -0500530 if msg.retries > pp.highWatermark {
531 // a new, higher, retry level; handle it and then back off
532 pp.newHighWatermark(msg.retries)
William Kurkiandaa6bb22019-03-07 12:26:28 -0500533 pp.backoff(msg.retries)
khenaidooac637102019-01-14 15:44:34 -0500534 } else if pp.highWatermark > 0 {
535 // we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
536 if msg.retries < pp.highWatermark {
537 // in fact this message is not even the current retry level, so buffer it for now (unless it's a just a fin)
538 if msg.flags&fin == fin {
539 pp.retryState[msg.retries].expectChaser = false
540 pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
541 } else {
542 pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
543 }
544 continue
545 } else if msg.flags&fin == fin {
546 // this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
547 // meaning this retry level is done and we can go down (at least) one level and flush that
548 pp.retryState[pp.highWatermark].expectChaser = false
549 pp.flushRetryBuffers()
550 pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
551 continue
552 }
553 }
554
555 // if we made it this far then the current msg contains real data, and can be sent to the next goroutine
556 // without breaking any of our ordering guarantees
557
558 if pp.brokerProducer == nil {
559 if err := pp.updateLeader(); err != nil {
560 pp.parent.returnError(msg, err)
William Kurkiandaa6bb22019-03-07 12:26:28 -0500561 pp.backoff(msg.retries)
khenaidooac637102019-01-14 15:44:34 -0500562 continue
563 }
564 Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
565 }
566
567 pp.brokerProducer.input <- msg
568 }
khenaidooac637102019-01-14 15:44:34 -0500569}

func (pp *partitionProducer) newHighWatermark(hwm int) {
	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
	pp.highWatermark = hwm

	// send off a fin so that we know when everything "in between" has made it
	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
	pp.retryState[pp.highWatermark].expectChaser = true
	pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
	pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}

	// a new HWM means that our current broker selection is out of date
	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
	pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
	pp.brokerProducer = nil
}

func (pp *partitionProducer) flushRetryBuffers() {
	Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
	for {
		pp.highWatermark--

		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
				goto flushDone
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		for _, msg := range pp.retryState[pp.highWatermark].buf {
			pp.brokerProducer.input <- msg
		}

	flushDone:
		pp.retryState[pp.highWatermark].buf = nil
		if pp.retryState[pp.highWatermark].expectChaser {
			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
			break
		} else if pp.highWatermark == 0 {
			Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
			break
		}
	}
}

func (pp *partitionProducer) updateLeader() error {
	return pp.breaker.Run(func() (err error) {
		if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
			return err
		}

		if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
			return err
		}

		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}

		return nil
	})
}

// one per broker; also constructs an associated flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
	var (
		input     = make(chan *ProducerMessage)
		bridge    = make(chan *produceSet)
		responses = make(chan *brokerProducerResponse)
	)

	bp := &brokerProducer{
		parent:         p,
		broker:         broker,
		input:          input,
		output:         bridge,
		responses:      responses,
		buffer:         newProduceSet(p),
		currentRetries: make(map[string]map[int32]error),
	}
	go withRecover(bp.run)

	// minimal bridge to make the network response `select`able
	go withRecover(func() {
		for set := range bridge {
			request := set.buildRequest()

			response, err := broker.Produce(request)

			responses <- &brokerProducerResponse{
				set: set,
				err: err,
				res: response,
			}
		}
		close(responses)
	})

	if p.conf.Producer.Retry.Max <= 0 {
		bp.abandoned = make(chan struct{})
	}

	return bp
}

type brokerProducerResponse struct {
	set *produceSet
	err error
	res *ProduceResponse
}

// groups messages together into appropriately-sized batches for sending to the broker
// handles state related to retries etc
type brokerProducer struct {
	parent *asyncProducer
	broker *Broker

	input     chan *ProducerMessage
	output    chan<- *produceSet
	responses <-chan *brokerProducerResponse
	abandoned chan struct{}

	buffer     *produceSet
	timer      <-chan time.Time
	timerFired bool

	closing        error
	currentRetries map[string]map[int32]error
}

func (bp *brokerProducer) run() {
	var output chan<- *produceSet
	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())

	for {
		select {
		case msg := <-bp.input:
			if msg == nil {
				bp.shutdown()
				return
			}

			if msg.flags&syn == syn {
				Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
					bp.broker.ID(), msg.Topic, msg.Partition)
				if bp.currentRetries[msg.Topic] == nil {
					bp.currentRetries[msg.Topic] = make(map[int32]error)
				}
				bp.currentRetries[msg.Topic][msg.Partition] = nil
				bp.parent.inFlight.Done()
				continue
			}

			if reason := bp.needsRetry(msg); reason != nil {
				bp.parent.retryMessage(msg, reason)

				if bp.closing == nil && msg.flags&fin == fin {
					// we were retrying this partition but we can start processing again
					delete(bp.currentRetries[msg.Topic], msg.Partition)
					Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
						bp.broker.ID(), msg.Topic, msg.Partition)
				}

				continue
			}

			if bp.buffer.wouldOverflow(msg) {
				if err := bp.waitForSpace(msg); err != nil {
					bp.parent.retryMessage(msg, err)
					continue
				}
			}

			if err := bp.buffer.add(msg); err != nil {
				bp.parent.returnError(msg, err)
				continue
			}

			if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
				bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
			}
		case <-bp.timer:
			bp.timerFired = true
		case output <- bp.buffer:
			bp.rollOver()
		case response := <-bp.responses:
			bp.handleResponse(response)
		}

		if bp.timerFired || bp.buffer.readyToFlush() {
			output = bp.output
		} else {
			output = nil
		}
	}
}
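
// A small sketch, not part of the original file, of the Flush settings that drive the batching
// loop above: the timer case fires after Flush.Frequency, while the message and byte thresholds
// in the same struct influence when the buffer reports itself ready to flush or full. The
// concrete values here are placeholders, not recommendations.
func exampleFlushConfig() *Config {
	config := NewConfig()
	config.Producer.Flush.Frequency = 500 * time.Millisecond // flush at least twice per second
	config.Producer.Flush.Messages = 100                     // try to batch around this many messages
	config.Producer.Flush.MaxMessages = 1000                 // never hold more than this many in one request
	return config
}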

func (bp *brokerProducer) shutdown() {
	for !bp.buffer.empty() {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
		case bp.output <- bp.buffer:
			bp.rollOver()
		}
	}
	close(bp.output)
	for response := range bp.responses {
		bp.handleResponse(response)
	}

	Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
}

func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
	if bp.closing != nil {
		return bp.closing
	}

	return bp.currentRetries[msg.Topic][msg.Partition]
}

func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
	Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())

	for {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
			// handling a response can change our state, so re-check some things
			if reason := bp.needsRetry(msg); reason != nil {
				return reason
			} else if !bp.buffer.wouldOverflow(msg) {
				return nil
			}
		case bp.output <- bp.buffer:
			bp.rollOver()
			return nil
		}
	}
}

func (bp *brokerProducer) rollOver() {
	bp.timer = nil
	bp.timerFired = false
	bp.buffer = newProduceSet(bp.parent)
}

func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
	if response.err != nil {
		bp.handleError(response.set, response.err)
	} else {
		bp.handleSuccess(response.set, response.res)
	}

	if bp.buffer.empty() {
		bp.rollOver() // this can happen if the response invalidated our buffer
	}
}

func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
	// we iterate through the blocks in the request set, not the response, so that we notice
	// if the response is missing a block completely
	var retryTopics []string
	sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
		if response == nil {
			// this only happens when RequiredAcks is NoResponse, so we have to assume success
			bp.parent.returnSuccesses(pSet.msgs)
			return
		}

		block := response.GetBlock(topic, partition)
		if block == nil {
			bp.parent.returnErrors(pSet.msgs, ErrIncompleteResponse)
			return
		}

		switch block.Err {
		// Success
		case ErrNoError:
			if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() {
				for _, msg := range pSet.msgs {
					msg.Timestamp = block.Timestamp
				}
			}
			for i, msg := range pSet.msgs {
				msg.Offset = block.Offset + int64(i)
			}
			bp.parent.returnSuccesses(pSet.msgs)
		// Duplicate
		case ErrDuplicateSequenceNumber:
			bp.parent.returnSuccesses(pSet.msgs)
		// Retriable errors
		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
			if bp.parent.conf.Producer.Retry.Max <= 0 {
				bp.parent.abandonBrokerConnection(bp.broker)
				bp.parent.returnErrors(pSet.msgs, block.Err)
			} else {
				retryTopics = append(retryTopics, topic)
			}
		// Other non-retriable errors
		default:
			if bp.parent.conf.Producer.Retry.Max <= 0 {
				bp.parent.abandonBrokerConnection(bp.broker)
			}
			bp.parent.returnErrors(pSet.msgs, block.Err)
		}
	})

	if len(retryTopics) > 0 {
		if bp.parent.conf.Producer.Idempotent {
			err := bp.parent.client.RefreshMetadata(retryTopics...)
			if err != nil {
				Logger.Printf("Failed refreshing metadata because of %v\n", err)
			}
		}

		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			block := response.GetBlock(topic, partition)
			if block == nil {
				// handled in the previous "eachPartition" loop
				return
			}

			switch block.Err {
			case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
				ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
				Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
					bp.broker.ID(), topic, partition, block.Err)
				if bp.currentRetries[topic] == nil {
					bp.currentRetries[topic] = make(map[int32]error)
				}
				bp.currentRetries[topic][partition] = block.Err
				if bp.parent.conf.Producer.Idempotent {
					go bp.parent.retryBatch(topic, partition, pSet, block.Err)
				} else {
					bp.parent.retryMessages(pSet.msgs, block.Err)
				}
				// dropping the following messages has the side effect of incrementing their retry count
				bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
			}
		})
	}
}

func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) {
	Logger.Printf("Retrying batch for %v-%d because of %s\n", topic, partition, kerr)
	produceSet := newProduceSet(p)
	produceSet.msgs[topic] = make(map[int32]*partitionSet)
	produceSet.msgs[topic][partition] = pSet
	produceSet.bufferBytes += pSet.bufferBytes
	produceSet.bufferCount += len(pSet.msgs)
	for _, msg := range pSet.msgs {
		if msg.retries >= p.conf.Producer.Retry.Max {
			p.returnError(msg, kerr)
			return
		}
		msg.retries++
	}

	// it's expected that a metadata refresh has been requested prior to calling retryBatch
	leader, err := p.client.Leader(topic, partition)
	if err != nil {
		Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, err)
		for _, msg := range pSet.msgs {
			p.returnError(msg, kerr)
		}
		return
	}
	bp := p.getBrokerProducer(leader)
	bp.output <- produceSet
}

func (bp *brokerProducer) handleError(sent *produceSet, err error) {
	switch err.(type) {
	case PacketEncodingError:
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.returnErrors(pSet.msgs, err)
		})
	default:
		Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
		bp.parent.abandonBrokerConnection(bp.broker)
		_ = bp.broker.Close()
		bp.closing = err
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.retryMessages(pSet.msgs, err)
		})
		bp.buffer.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.retryMessages(pSet.msgs, err)
		})
		bp.rollOver()
	}
}

// singleton
// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
	var msg *ProducerMessage
	buf := queue.New()

	for {
		if buf.Length() == 0 {
			msg = <-p.retries
		} else {
			select {
			case msg = <-p.retries:
			case p.input <- buf.Peek().(*ProducerMessage):
				buf.Remove()
				continue
			}
		}

		if msg == nil {
			return
		}

		buf.Add(msg)
	}
}

// utility functions

func (p *asyncProducer) shutdown() {
	Logger.Println("Producer shutting down.")
	p.inFlight.Add(1)
	p.input <- &ProducerMessage{flags: shutdown}

	p.inFlight.Wait()

	if p.ownClient {
		err := p.client.Close()
		if err != nil {
			Logger.Println("producer/shutdown failed to close the embedded client:", err)
		}
	}

	close(p.input)
	close(p.retries)
	close(p.errors)
	close(p.successes)
}

func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
	msg.clear()
	pErr := &ProducerError{Msg: msg, Err: err}
	if p.conf.Producer.Return.Errors {
		p.errors <- pErr
	} else {
		Logger.Println(pErr)
	}
	p.inFlight.Done()
}

func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		p.returnError(msg, err)
	}
}

func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
	for _, msg := range batch {
		if p.conf.Producer.Return.Successes {
			msg.clear()
			p.successes <- msg
		}
		p.inFlight.Done()
	}
}

func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
	if msg.retries >= p.conf.Producer.Retry.Max {
		p.returnError(msg, err)
	} else {
		msg.retries++
		p.retries <- msg
	}
}

func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		p.retryMessage(msg, err)
	}
}

func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bp := p.brokers[broker]

	if bp == nil {
		bp = p.newBrokerProducer(broker)
		p.brokers[broker] = bp
		p.brokerRefs[bp] = 0
	}

	p.brokerRefs[bp]++

	return bp
}

func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp *brokerProducer) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	p.brokerRefs[bp]--
	if p.brokerRefs[bp] == 0 {
		close(bp.input)
		delete(p.brokerRefs, bp)

		if p.brokers[broker] == bp {
			delete(p.brokers, broker)
		}
	}
}

func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bc, ok := p.brokers[broker]
	if ok && bc.abandoned != nil {
		close(bc.abandoned)
	}

	delete(p.brokers, broker)
}
1099}