package sarama

import (
	"encoding/binary"
	"fmt"
	"sync"
	"time"

	"github.com/eapache/go-resiliency/breaker"
	"github.com/eapache/queue"
)

// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
// leaks: it will not be garbage-collected automatically when it passes out of
// scope.
type AsyncProducer interface {

	// AsyncClose triggers a shutdown of the producer. The shutdown has completed
	// when both the Errors and Successes channels have been closed. When calling
	// AsyncClose, you *must* continue to read from those channels in order to
	// drain the results of any messages in flight.
	AsyncClose()

	// Close shuts down the producer and waits for any buffered messages to be
	// flushed. You must call this function before a producer object passes out of
	// scope, as it may otherwise leak memory. You must call this before calling
	// Close on the underlying client.
	Close() error

	// Input is the input channel for the user to write messages that they
	// wish to send.
	Input() chan<- *ProducerMessage

	// Successes is the success output channel back to the user when Return.Successes is
	// enabled. If Return.Successes is true, you MUST read from this channel or the
	// Producer will deadlock. It is suggested that you send and read messages
	// together in a single select statement.
	Successes() <-chan *ProducerMessage

	// Errors is the error output channel back to the user. You MUST read from this
	// channel or the Producer will deadlock when the channel is full. Alternatively,
	// you can set Producer.Return.Errors in your config to false, which prevents
	// errors from being returned.
	Errors() <-chan *ProducerError
}
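
// exampleAsyncProducerUsage is an illustrative sketch only, not part of the
// library API: it shows the send/receive pattern the interface comments above
// describe, assuming a broker is reachable at "localhost:9092" and that
// Return.Successes is enabled. Sending and reading results in a single select
// keeps the producer from deadlocking on a full channel.
func exampleAsyncProducerUsage() error {
	conf := NewConfig()
	conf.Producer.Return.Successes = true

	producer, err := NewAsyncProducer([]string{"localhost:9092"}, conf)
	if err != nil {
		return err
	}
	// Close flushes any buffered messages and must be called before the
	// producer goes out of scope.
	defer func() {
		if err := producer.Close(); err != nil {
			Logger.Println("failed to close producer:", err)
		}
	}()

	msg := &ProducerMessage{Topic: "example-topic", Value: StringEncoder("hello")}
	for sent := false; !sent; {
		select {
		case producer.Input() <- msg:
			sent = true
		case perr := <-producer.Errors():
			Logger.Println("produce error:", perr)
		case <-producer.Successes():
			// earlier messages acknowledged; keep draining to avoid deadlock
		}
	}
	return nil
}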

// transactionManager keeps the state necessary to ensure idempotent production
type transactionManager struct {
	producerID      int64
	producerEpoch   int16
	sequenceNumbers map[string]int32
	mutex           sync.Mutex
}

const (
	noProducerID    = -1
	noProducerEpoch = -1
)

func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) int32 {
	key := fmt.Sprintf("%s-%d", topic, partition)
	t.mutex.Lock()
	defer t.mutex.Unlock()
	sequence := t.sequenceNumbers[key]
	t.sequenceNumbers[key] = sequence + 1
	return sequence
}

func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
	txnmgr := &transactionManager{
		producerID:    noProducerID,
		producerEpoch: noProducerEpoch,
	}

	if conf.Producer.Idempotent {
		initProducerIDResponse, err := client.InitProducerID()
		if err != nil {
			return nil, err
		}
		txnmgr.producerID = initProducerIDResponse.ProducerID
		txnmgr.producerEpoch = initProducerIDResponse.ProducerEpoch
		txnmgr.sequenceNumbers = make(map[string]int32)
		txnmgr.mutex = sync.Mutex{}

		Logger.Printf("Obtained a ProducerId: %d and ProducerEpoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch)
	}

	return txnmgr, nil
}
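
// exampleIdempotentConfig is an illustrative sketch only: it shows the kind of
// configuration under which the transactionManager above is actually created
// with a producer ID and epoch. The authoritative validation rules live in
// config.go; the settings below reflect them as understood at the time of
// writing (acks from all replicas, a single in-flight request, and a broker
// version of at least 0.11).
func exampleIdempotentConfig() *Config {
	conf := NewConfig()
	conf.Producer.Idempotent = true         // request a producer ID/epoch on startup
	conf.Producer.RequiredAcks = WaitForAll // idempotence relies on acks from all in-sync replicas
	conf.Producer.Retry.Max = 5             // retries are what the sequence numbers protect against
	conf.Net.MaxOpenRequests = 1            // keep ordering guarantees with a single in-flight request
	conf.Version = V0_11_0_0                // InitProducerID requires at least Kafka 0.11
	return conf
}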

type asyncProducer struct {
	client Client
	conf   *Config

	errors                    chan *ProducerError
	input, successes, retries chan *ProducerMessage
	inFlight                  sync.WaitGroup

	brokers    map[*Broker]*brokerProducer
	brokerRefs map[*brokerProducer]int
	brokerLock sync.Mutex

	txnmgr *transactionManager
}

// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
	client, err := NewClient(addrs, conf)
	if err != nil {
		return nil, err
	}
	return newAsyncProducer(client)
}

// NewAsyncProducerFromClient creates a new Producer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this producer.
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
	// For clients passed in by the caller, ensure we don't
	// call Close() on them.
	cli := &nopCloserClient{client}
	return newAsyncProducer(cli)
}

func newAsyncProducer(client Client) (AsyncProducer, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	txnmgr, err := newTransactionManager(client.Config(), client)
	if err != nil {
		return nil, err
	}

	p := &asyncProducer{
		client:     client,
		conf:       client.Config(),
		errors:     make(chan *ProducerError),
		input:      make(chan *ProducerMessage),
		successes:  make(chan *ProducerMessage),
		retries:    make(chan *ProducerMessage),
		brokers:    make(map[*Broker]*brokerProducer),
		brokerRefs: make(map[*brokerProducer]int),
		txnmgr:     txnmgr,
	}

	// launch our singleton dispatchers
	go withRecover(p.dispatcher)
	go withRecover(p.retryHandler)

	return p, nil
}

type flagSet int8

const (
	syn      flagSet = 1 << iota // first message from partitionProducer to brokerProducer
	fin                          // final message from partitionProducer to brokerProducer and back
	shutdown                     // start the shutdown process
)

// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type ProducerMessage struct {
	Topic string // The Kafka topic for this message.
	// The partitioning key for this message. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Key Encoder
	// The actual message to store in Kafka. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Value Encoder

	// The headers are key-value pairs that are transparently passed
	// by Kafka between producers and consumers.
	Headers []RecordHeader

	// This field is used to hold arbitrary data you wish to include so it
	// will be available when receiving on the Successes and Errors channels.
	// Sarama completely ignores this field; it is only to be used for
	// pass-through data.
	Metadata interface{}

	// The fields below this point are filled in by the producer as the message is processed

	// Offset is the offset of the message stored on the broker. This is only
	// guaranteed to be defined if the message was successfully delivered and
	// RequiredAcks is not NoResponse.
	Offset int64
	// Partition is the partition that the message was sent to. This is only
	// guaranteed to be defined if the message was successfully delivered.
	Partition int32
	// Timestamp can vary in behaviour depending on broker configuration, being
	// in either one of the CreateTime or LogAppendTime modes (default CreateTime),
	// and requiring version at least 0.10.0.
	//
	// When configured to CreateTime, the timestamp is specified by the producer
	// either by explicitly setting this field, or when the message is added
	// to a produce set.
	//
	// When configured to LogAppendTime, the timestamp is assigned to the message
	// by the broker. This is only guaranteed to be defined if the message was
	// successfully delivered and RequiredAcks is not NoResponse.
	Timestamp time.Time

	retries        int
	flags          flagSet
	expectation    chan *ProducerError
	sequenceNumber int32
}
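
// exampleProducerMessage is an illustrative sketch only: it shows how the
// user-settable fields above are typically populated. The topic and header
// names are made up for the example; Metadata is opaque to Sarama and is
// simply echoed back on the Successes/Errors channels.
func exampleProducerMessage(correlationID string) *ProducerMessage {
	return &ProducerMessage{
		Topic: "example-topic",
		Key:   StringEncoder("user-42"), // with the default hash partitioner, equal keys map to the same partition
		Value: StringEncoder(`{"action":"signup"}`),
		Headers: []RecordHeader{
			{Key: []byte("trace-id"), Value: []byte(correlationID)},
		},
		Metadata: correlationID, // available again when this message comes back on Successes/Errors
	}
}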

const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.

func (m *ProducerMessage) byteSize(version int) int {
	var size int
	if version >= 2 {
		size = maximumRecordOverhead
		for _, h := range m.Headers {
			size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
		}
	} else {
		size = producerMessageOverhead
	}
	if m.Key != nil {
		size += m.Key.Length()
	}
	if m.Value != nil {
		size += m.Value.Length()
	}
	return size
}

func (m *ProducerMessage) clear() {
	m.flags = 0
	m.retries = 0
}

// ProducerError is the type of error generated when the producer fails to deliver a message.
// It contains the original ProducerMessage as well as the actual error value.
type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

func (pe ProducerError) Error() string {
	return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
}

// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
type ProducerErrors []*ProducerError

func (pe ProducerErrors) Error() string {
	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}
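
// exampleCloseErrorHandling is an illustrative sketch only: because Close can
// return a ProducerErrors value wrapping every undelivered message, callers
// that care about individual failures can type-assert the returned error as
// shown here.
func exampleCloseErrorHandling(producer AsyncProducer) {
	if err := producer.Close(); err != nil {
		if pErrs, ok := err.(ProducerErrors); ok {
			for _, pErr := range pErrs {
				Logger.Printf("failed to deliver message to %s: %v\n", pErr.Msg.Topic, pErr.Err)
			}
			return
		}
		Logger.Println("close failed:", err)
	}
}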

func (p *asyncProducer) Errors() <-chan *ProducerError {
	return p.errors
}

func (p *asyncProducer) Successes() <-chan *ProducerMessage {
	return p.successes
}

func (p *asyncProducer) Input() chan<- *ProducerMessage {
	return p.input
}

func (p *asyncProducer) Close() error {
	p.AsyncClose()

	if p.conf.Producer.Return.Successes {
		go withRecover(func() {
			for range p.successes {
			}
		})
	}

	var errors ProducerErrors
	if p.conf.Producer.Return.Errors {
		for event := range p.errors {
			errors = append(errors, event)
		}
	} else {
		<-p.errors
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

func (p *asyncProducer) AsyncClose() {
	go withRecover(p.shutdown)
}
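
// exampleAsyncCloseDrain is an illustrative sketch only: AsyncClose returns
// immediately, so the caller must keep draining both Successes and Errors
// until they are closed to let in-flight messages finish, as the AsyncClose
// documentation above requires.
func exampleAsyncCloseDrain(producer AsyncProducer) {
	producer.AsyncClose()

	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		for range producer.Successes() {
			// drain acknowledged messages
		}
	}()
	go func() {
		defer wg.Done()
		for pErr := range producer.Errors() {
			Logger.Println("undelivered message:", pErr)
		}
	}()
	wg.Wait() // both channels closed: shutdown is complete
}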

// singleton
// dispatches messages by topic
func (p *asyncProducer) dispatcher() {
	handlers := make(map[string]chan<- *ProducerMessage)
	shuttingDown := false

	for msg := range p.input {
		if msg == nil {
			Logger.Println("Something tried to send a nil message, it was ignored.")
			continue
		}

		if msg.flags&shutdown != 0 {
			shuttingDown = true
			p.inFlight.Done()
			continue
		} else if msg.retries == 0 {
			if shuttingDown {
				// we can't just call returnError here because that decrements the wait group,
				// which hasn't been incremented yet for this message, and shouldn't be
				pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
				if p.conf.Producer.Return.Errors {
					p.errors <- pErr
				} else {
					Logger.Println(pErr)
				}
				continue
			}
			p.inFlight.Add(1)
		}

		version := 1
		if p.conf.Version.IsAtLeast(V0_11_0_0) {
			version = 2
		} else if msg.Headers != nil {
			p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11"))
			continue
		}
		if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
			p.returnError(msg, ErrMessageSizeTooLarge)
			continue
		}

		handler := handlers[msg.Topic]
		if handler == nil {
			handler = p.newTopicProducer(msg.Topic)
			handlers[msg.Topic] = handler
		}

		handler <- msg
	}

	for _, handler := range handlers {
		close(handler)
	}
}

// one per topic
// partitions messages, then dispatches them by partition
type topicProducer struct {
	parent *asyncProducer
	topic  string
	input  <-chan *ProducerMessage

	breaker     *breaker.Breaker
	handlers    map[int32]chan<- *ProducerMessage
	partitioner Partitioner
}

func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	tp := &topicProducer{
		parent:      p,
		topic:       topic,
		input:       input,
		breaker:     breaker.New(3, 1, 10*time.Second),
		handlers:    make(map[int32]chan<- *ProducerMessage),
		partitioner: p.conf.Producer.Partitioner(topic),
	}
	go withRecover(tp.dispatch)
	return input
}

func (tp *topicProducer) dispatch() {
	for msg := range tp.input {
		if msg.retries == 0 {
			if err := tp.partitionMessage(msg); err != nil {
				tp.parent.returnError(msg, err)
				continue
			}
		}
		// All messages being retried (sent or not) have already had their retry count updated
		if tp.parent.conf.Producer.Idempotent && msg.retries == 0 {
			msg.sequenceNumber = tp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
		}

		handler := tp.handlers[msg.Partition]
		if handler == nil {
			handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
			tp.handlers[msg.Partition] = handler
		}

		handler <- msg
	}

	for _, handler := range tp.handlers {
		close(handler)
	}
}

func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
	var partitions []int32

	err := tp.breaker.Run(func() (err error) {
		var requiresConsistency = false
		if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok {
			requiresConsistency = ep.MessageRequiresConsistency(msg)
		} else {
			requiresConsistency = tp.partitioner.RequiresConsistency()
		}

		if requiresConsistency {
			partitions, err = tp.parent.client.Partitions(msg.Topic)
		} else {
			partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
		}
		return
	})

	if err != nil {
		return err
	}

	numPartitions := int32(len(partitions))

	if numPartitions == 0 {
		return ErrLeaderNotAvailable
	}

	choice, err := tp.partitioner.Partition(msg, numPartitions)

	if err != nil {
		return err
	} else if choice < 0 || choice >= numPartitions {
		return ErrInvalidPartition
	}

	msg.Partition = partitions[choice]

	return nil
}
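
// examplePartitionerChoice is an illustrative sketch only: the partitioner
// consulted by partitionMessage above is chosen via the config. With the
// manual partitioner the caller sets ProducerMessage.Partition explicitly,
// while the default hash partitioner derives the partition from the message Key.
func examplePartitionerChoice() *Config {
	conf := NewConfig()
	conf.Producer.Partitioner = NewManualPartitioner // or NewHashPartitioner (default), NewRandomPartitioner, ...
	return conf
}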

// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
type partitionProducer struct {
	parent    *asyncProducer
	topic     string
	partition int32
	input     <-chan *ProducerMessage

	leader         *Broker
	breaker        *breaker.Breaker
	brokerProducer *brokerProducer

	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
	// retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
	// therefore whether our buffer is complete and safe to flush)
	highWatermark int
	retryState    []partitionRetryState
}

type partitionRetryState struct {
	buf          []*ProducerMessage
	expectChaser bool
}

func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	pp := &partitionProducer{
		parent:    p,
		topic:     topic,
		partition: partition,
		input:     input,

		breaker:    breaker.New(3, 1, 10*time.Second),
		retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
	}
	go withRecover(pp.dispatch)
	return input
}

func (pp *partitionProducer) backoff(retries int) {
	var backoff time.Duration
	if pp.parent.conf.Producer.Retry.BackoffFunc != nil {
		maxRetries := pp.parent.conf.Producer.Retry.Max
		backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries)
	} else {
		backoff = pp.parent.conf.Producer.Retry.Backoff
	}
	if backoff > 0 {
		time.Sleep(backoff)
	}
}
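
// exampleRetryBackoff is an illustrative sketch only: when Retry.BackoffFunc
// is set it takes precedence over the fixed Retry.Backoff, as implemented in
// backoff above. This version doubles the delay with each attempt and caps it.
func exampleRetryBackoff() *Config {
	conf := NewConfig()
	conf.Producer.Retry.Max = 5
	conf.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
		backoff := (100 * time.Millisecond) << uint(retries)
		if limit := 10 * time.Second; backoff > limit {
			return limit
		}
		return backoff
	}
	return conf
}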

func (pp *partitionProducer) dispatch() {
	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
	// on the first message
	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
	if pp.leader != nil {
		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
	}

	defer func() {
		if pp.brokerProducer != nil {
			pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
		}
	}()

	for msg := range pp.input {

		if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
			select {
			case <-pp.brokerProducer.abandoned:
				// a message on the abandoned channel means that our current broker selection is out of date
				Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
				pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
				pp.brokerProducer = nil
				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
			default:
				// producer connection is still open.
			}
		}

		if msg.retries > pp.highWatermark {
			// a new, higher, retry level; handle it and then back off
			pp.newHighWatermark(msg.retries)
			pp.backoff(msg.retries)
		} else if pp.highWatermark > 0 {
			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
			if msg.retries < pp.highWatermark {
				// in fact this message is not even the current retry level, so buffer it for now (unless it's just a fin)
				if msg.flags&fin == fin {
					pp.retryState[msg.retries].expectChaser = false
					pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				} else {
					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
				}
				continue
			} else if msg.flags&fin == fin {
				// this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
				// meaning this retry level is done and we can go down (at least) one level and flush that
				pp.retryState[pp.highWatermark].expectChaser = false
				pp.flushRetryBuffers()
				pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				continue
			}
		}

		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
		// without breaking any of our ordering guarantees

		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnError(msg, err)
				pp.backoff(msg.retries)
				continue
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		pp.brokerProducer.input <- msg
	}
}

func (pp *partitionProducer) newHighWatermark(hwm int) {
	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
	pp.highWatermark = hwm

	// send off a fin so that we know when everything "in between" has made it
	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
	pp.retryState[pp.highWatermark].expectChaser = true
	pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
	pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}

	// a new HWM means that our current broker selection is out of date
	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
	pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
	pp.brokerProducer = nil
}

func (pp *partitionProducer) flushRetryBuffers() {
	Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
	for {
		pp.highWatermark--

		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
				goto flushDone
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		for _, msg := range pp.retryState[pp.highWatermark].buf {
			pp.brokerProducer.input <- msg
		}

	flushDone:
		pp.retryState[pp.highWatermark].buf = nil
		if pp.retryState[pp.highWatermark].expectChaser {
			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
			break
		} else if pp.highWatermark == 0 {
			Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
			break
		}
	}
}

func (pp *partitionProducer) updateLeader() error {
	return pp.breaker.Run(func() (err error) {
		if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
			return err
		}

		if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
			return err
		}

		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}

		return nil
	})
}

// one per broker; also constructs an associated flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
	var (
		input     = make(chan *ProducerMessage)
		bridge    = make(chan *produceSet)
		responses = make(chan *brokerProducerResponse)
	)

	bp := &brokerProducer{
		parent:         p,
		broker:         broker,
		input:          input,
		output:         bridge,
		responses:      responses,
		buffer:         newProduceSet(p),
		currentRetries: make(map[string]map[int32]error),
	}
	go withRecover(bp.run)

	// minimal bridge to make the network response `select`able
	go withRecover(func() {
		for set := range bridge {
			request := set.buildRequest()

			response, err := broker.Produce(request)

			responses <- &brokerProducerResponse{
				set: set,
				err: err,
				res: response,
			}
		}
		close(responses)
	})

	if p.conf.Producer.Retry.Max <= 0 {
		bp.abandoned = make(chan struct{})
	}

	return bp
}

type brokerProducerResponse struct {
	set *produceSet
	err error
	res *ProduceResponse
}

// groups messages together into appropriately-sized batches for sending to the broker
// handles state related to retries etc
type brokerProducer struct {
	parent *asyncProducer
	broker *Broker

	input     chan *ProducerMessage
	output    chan<- *produceSet
	responses <-chan *brokerProducerResponse
	abandoned chan struct{}

	buffer     *produceSet
	timer      <-chan time.Time
	timerFired bool

	closing        error
	currentRetries map[string]map[int32]error
}

func (bp *brokerProducer) run() {
	var output chan<- *produceSet
	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())

	for {
		select {
		case msg := <-bp.input:
			if msg == nil {
				bp.shutdown()
				return
			}

			if msg.flags&syn == syn {
				Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
					bp.broker.ID(), msg.Topic, msg.Partition)
				if bp.currentRetries[msg.Topic] == nil {
					bp.currentRetries[msg.Topic] = make(map[int32]error)
				}
				bp.currentRetries[msg.Topic][msg.Partition] = nil
				bp.parent.inFlight.Done()
				continue
			}

			if reason := bp.needsRetry(msg); reason != nil {
				bp.parent.retryMessage(msg, reason)

				if bp.closing == nil && msg.flags&fin == fin {
					// we were retrying this partition but we can start processing again
					delete(bp.currentRetries[msg.Topic], msg.Partition)
					Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
						bp.broker.ID(), msg.Topic, msg.Partition)
				}

				continue
			}

			if bp.buffer.wouldOverflow(msg) {
				if err := bp.waitForSpace(msg); err != nil {
					bp.parent.retryMessage(msg, err)
					continue
				}
			}

			if err := bp.buffer.add(msg); err != nil {
				bp.parent.returnError(msg, err)
				continue
			}

			if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
				bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
			}
		case <-bp.timer:
			bp.timerFired = true
		case output <- bp.buffer:
			bp.rollOver()
		case response := <-bp.responses:
			bp.handleResponse(response)
		}

		if bp.timerFired || bp.buffer.readyToFlush() {
			output = bp.output
		} else {
			output = nil
		}
	}
}
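
// exampleFlushTuning is an illustrative sketch only: the run loop above
// flushes a batch when readyToFlush reports it is full enough or when the
// Flush.Frequency timer fires, so these knobs trade latency against batch
// size. The values shown here are arbitrary examples, not recommendations.
func exampleFlushTuning() *Config {
	conf := NewConfig()
	conf.Producer.Flush.Frequency = 50 * time.Millisecond // flush at least this often
	conf.Producer.Flush.Messages = 100                    // best-effort message count that triggers a flush
	conf.Producer.Flush.Bytes = 64 * 1024                 // best-effort byte count that triggers a flush
	conf.Producer.Flush.MaxMessages = 1000                // hard cap on messages per broker request
	return conf
}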

func (bp *brokerProducer) shutdown() {
	for !bp.buffer.empty() {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
		case bp.output <- bp.buffer:
			bp.rollOver()
		}
	}
	close(bp.output)
	for response := range bp.responses {
		bp.handleResponse(response)
	}

	Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
}

func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
	if bp.closing != nil {
		return bp.closing
	}

	return bp.currentRetries[msg.Topic][msg.Partition]
}

func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
	Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())

	for {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
			// handling a response can change our state, so re-check some things
			if reason := bp.needsRetry(msg); reason != nil {
				return reason
			} else if !bp.buffer.wouldOverflow(msg) {
				return nil
			}
		case bp.output <- bp.buffer:
			bp.rollOver()
			return nil
		}
	}
}

func (bp *brokerProducer) rollOver() {
	bp.timer = nil
	bp.timerFired = false
	bp.buffer = newProduceSet(bp.parent)
}

func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
	if response.err != nil {
		bp.handleError(response.set, response.err)
	} else {
		bp.handleSuccess(response.set, response.res)
	}

	if bp.buffer.empty() {
		bp.rollOver() // this can happen if the response invalidated our buffer
	}
}

func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
	// we iterate through the blocks in the request set, not the response, so that we notice
	// if the response is missing a block completely
	var retryTopics []string
	sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
		if response == nil {
			// this only happens when RequiredAcks is NoResponse, so we have to assume success
			bp.parent.returnSuccesses(pSet.msgs)
			return
		}

		block := response.GetBlock(topic, partition)
		if block == nil {
			bp.parent.returnErrors(pSet.msgs, ErrIncompleteResponse)
			return
		}

		switch block.Err {
		// Success
		case ErrNoError:
			if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() {
				for _, msg := range pSet.msgs {
					msg.Timestamp = block.Timestamp
				}
			}
			for i, msg := range pSet.msgs {
				msg.Offset = block.Offset + int64(i)
			}
			bp.parent.returnSuccesses(pSet.msgs)
		// Duplicate
		case ErrDuplicateSequenceNumber:
			bp.parent.returnSuccesses(pSet.msgs)
		// Retriable errors
		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
			if bp.parent.conf.Producer.Retry.Max <= 0 {
				bp.parent.abandonBrokerConnection(bp.broker)
				bp.parent.returnErrors(pSet.msgs, block.Err)
			} else {
				retryTopics = append(retryTopics, topic)
			}
		// Other non-retriable errors
		default:
			if bp.parent.conf.Producer.Retry.Max <= 0 {
				bp.parent.abandonBrokerConnection(bp.broker)
			}
			bp.parent.returnErrors(pSet.msgs, block.Err)
		}
	})

	if len(retryTopics) > 0 {
		if bp.parent.conf.Producer.Idempotent {
			err := bp.parent.client.RefreshMetadata(retryTopics...)
			if err != nil {
				Logger.Printf("Failed refreshing metadata because of %v\n", err)
			}
		}

		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			block := response.GetBlock(topic, partition)
			if block == nil {
				// handled in the previous "eachPartition" loop
				return
			}

			switch block.Err {
			case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
				ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
				Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
					bp.broker.ID(), topic, partition, block.Err)
				if bp.currentRetries[topic] == nil {
					bp.currentRetries[topic] = make(map[int32]error)
				}
				bp.currentRetries[topic][partition] = block.Err
				if bp.parent.conf.Producer.Idempotent {
					go bp.parent.retryBatch(topic, partition, pSet, block.Err)
				} else {
					bp.parent.retryMessages(pSet.msgs, block.Err)
				}
				// dropping the following messages has the side effect of incrementing their retry count
				bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
			}
		})
	}
}

func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) {
	Logger.Printf("Retrying batch for %v-%d because of %s\n", topic, partition, kerr)
	produceSet := newProduceSet(p)
	produceSet.msgs[topic] = make(map[int32]*partitionSet)
	produceSet.msgs[topic][partition] = pSet
	produceSet.bufferBytes += pSet.bufferBytes
	produceSet.bufferCount += len(pSet.msgs)
	for _, msg := range pSet.msgs {
		if msg.retries >= p.conf.Producer.Retry.Max {
			p.returnError(msg, kerr)
			return
		}
		msg.retries++
	}

	// it's expected that a metadata refresh has been requested prior to calling retryBatch
	leader, err := p.client.Leader(topic, partition)
	if err != nil {
		Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up the new leader\n", topic, partition, err)
		for _, msg := range pSet.msgs {
			p.returnError(msg, kerr)
		}
		return
	}
	bp := p.getBrokerProducer(leader)
	bp.output <- produceSet
}

func (bp *brokerProducer) handleError(sent *produceSet, err error) {
	switch err.(type) {
	case PacketEncodingError:
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.returnErrors(pSet.msgs, err)
		})
	default:
		Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
		bp.parent.abandonBrokerConnection(bp.broker)
		_ = bp.broker.Close()
		bp.closing = err
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.retryMessages(pSet.msgs, err)
		})
		bp.buffer.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.retryMessages(pSet.msgs, err)
		})
		bp.rollOver()
	}
}

// singleton
// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
	var msg *ProducerMessage
	buf := queue.New()

	for {
		if buf.Length() == 0 {
			msg = <-p.retries
		} else {
			select {
			case msg = <-p.retries:
			case p.input <- buf.Peek().(*ProducerMessage):
				buf.Remove()
				continue
			}
		}

		if msg == nil {
			return
		}

		buf.Add(msg)
	}
}

// utility functions

func (p *asyncProducer) shutdown() {
	Logger.Println("Producer shutting down.")
	p.inFlight.Add(1)
	p.input <- &ProducerMessage{flags: shutdown}

	p.inFlight.Wait()

	err := p.client.Close()
	if err != nil {
		Logger.Println("producer/shutdown failed to close the embedded client:", err)
	}

	close(p.input)
	close(p.retries)
	close(p.errors)
	close(p.successes)
}

func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
	msg.clear()
	pErr := &ProducerError{Msg: msg, Err: err}
	if p.conf.Producer.Return.Errors {
		p.errors <- pErr
	} else {
		Logger.Println(pErr)
	}
	p.inFlight.Done()
}

func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		p.returnError(msg, err)
	}
}

func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
	for _, msg := range batch {
		if p.conf.Producer.Return.Successes {
			msg.clear()
			p.successes <- msg
		}
		p.inFlight.Done()
	}
}

func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
	if msg.retries >= p.conf.Producer.Retry.Max {
		p.returnError(msg, err)
	} else {
		msg.retries++
		p.retries <- msg
	}
}

func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		p.retryMessage(msg, err)
	}
}

func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bp := p.brokers[broker]

	if bp == nil {
		bp = p.newBrokerProducer(broker)
		p.brokers[broker] = bp
		p.brokerRefs[bp] = 0
	}

	p.brokerRefs[bp]++

	return bp
}

func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp *brokerProducer) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	p.brokerRefs[bp]--
	if p.brokerRefs[bp] == 0 {
		close(bp.input)
		delete(p.brokerRefs, bp)

		if p.brokers[broker] == bp {
			delete(p.brokers, broker)
		}
	}
}

func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bc, ok := p.brokers[broker]
	if ok && bc.abandoned != nil {
		close(bc.abandoned)
	}

	delete(p.brokers, broker)
}