package sarama

import (
	"encoding/binary"
	"fmt"
	"sync"
	"time"

	"github.com/eapache/go-resiliency/breaker"
	"github.com/eapache/queue"
)

// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
// leaks and message loss: it will not be garbage-collected automatically when it passes
// out of scope, and buffered messages may not be flushed.
type AsyncProducer interface {

	// AsyncClose triggers a shutdown of the producer. The shutdown has completed
	// when both the Errors and Successes channels have been closed. When calling
	// AsyncClose, you *must* continue to read from those channels in order to
	// drain the results of any messages in flight.
	AsyncClose()

	// Close shuts down the producer and waits for any buffered messages to be
	// flushed. You must call this function before a producer object passes out of
	// scope, as it may otherwise leak memory. You must call this before the process
	// shuts down, or you may lose messages. You must call this before calling
	// Close on the underlying client.
	Close() error

	// Input is the input channel for the user to write messages that they
	// wish to send.
	Input() chan<- *ProducerMessage

	// Successes is the success output channel back to the user when Return.Successes is
	// enabled. If Return.Successes is true, you MUST read from this channel or the
	// Producer will deadlock. It is suggested that you send and read messages
	// together in a single select statement.
	Successes() <-chan *ProducerMessage

	// Errors is the error output channel back to the user. You MUST read from this
	// channel or the Producer will deadlock when the channel is full. Alternatively,
	// you can set Producer.Return.Errors in your config to false, which prevents
	// errors from being returned.
	Errors() <-chan *ProducerError
}
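
// The sketch below is illustrative only (not part of the original file): a
// minimal produce loop showing the select over Input and Errors that the
// documentation above requires. The broker address, topic, and message value
// are assumed example values; Return.Successes is left disabled so only the
// Errors channel needs to be drained here, and Close handles the final drain.
//
//	config := NewConfig()
//	config.Producer.Return.Successes = false // rely on Errors only in this sketch
//	producer, err := NewAsyncProducer([]string{"localhost:9092"}, config)
//	if err != nil {
//		panic(err)
//	}
//	defer func() {
//		if err := producer.Close(); err != nil {
//			Logger.Println("failed to close producer:", err)
//		}
//	}()
//
//	for i := 0; i < 10; i++ {
//		select {
//		case producer.Input() <- &ProducerMessage{Topic: "example-topic", Value: StringEncoder("hello")}:
//		case err := <-producer.Errors():
//			Logger.Println("failed to produce message:", err)
//		}
//	}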

// transactionManager keeps the state necessary to ensure idempotent production
type transactionManager struct {
	producerID      int64
	producerEpoch   int16
	sequenceNumbers map[string]int32
	mutex           sync.Mutex
}

const (
	noProducerID    = -1
	noProducerEpoch = -1
)

func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
	key := fmt.Sprintf("%s-%d", topic, partition)
	t.mutex.Lock()
	defer t.mutex.Unlock()
	sequence := t.sequenceNumbers[key]
	t.sequenceNumbers[key] = sequence + 1
	return sequence, t.producerEpoch
}

func (t *transactionManager) bumpEpoch() {
	t.mutex.Lock()
	defer t.mutex.Unlock()
	t.producerEpoch++
	for k := range t.sequenceNumbers {
		t.sequenceNumbers[k] = 0
	}
}

func (t *transactionManager) getProducerID() (int64, int16) {
	t.mutex.Lock()
	defer t.mutex.Unlock()
	return t.producerID, t.producerEpoch
}

func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
	txnmgr := &transactionManager{
		producerID:    noProducerID,
		producerEpoch: noProducerEpoch,
	}

	if conf.Producer.Idempotent {
		initProducerIDResponse, err := client.InitProducerID()
		if err != nil {
			return nil, err
		}
		txnmgr.producerID = initProducerIDResponse.ProducerID
		txnmgr.producerEpoch = initProducerIDResponse.ProducerEpoch
		txnmgr.sequenceNumbers = make(map[string]int32)
		txnmgr.mutex = sync.Mutex{}

		Logger.Printf("Obtained a ProducerId: %d and ProducerEpoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch)
	}

	return txnmgr, nil
}

type asyncProducer struct {
	client Client
	conf   *Config

	errors                    chan *ProducerError
	input, successes, retries chan *ProducerMessage
	inFlight                  sync.WaitGroup

	brokers    map[*Broker]*brokerProducer
	brokerRefs map[*brokerProducer]int
	brokerLock sync.Mutex

	txnmgr *transactionManager
}

// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
	client, err := NewClient(addrs, conf)
	if err != nil {
		return nil, err
	}
	return newAsyncProducer(client)
}

// NewAsyncProducerFromClient creates a new Producer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this producer.
func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
	// For clients passed in by the caller, ensure we don't
	// call Close() on them.
	cli := &nopCloserClient{client}
	return newAsyncProducer(cli)
}

func newAsyncProducer(client Client) (AsyncProducer, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	txnmgr, err := newTransactionManager(client.Config(), client)
	if err != nil {
		return nil, err
	}

	p := &asyncProducer{
		client:     client,
		conf:       client.Config(),
		errors:     make(chan *ProducerError),
		input:      make(chan *ProducerMessage),
		successes:  make(chan *ProducerMessage),
		retries:    make(chan *ProducerMessage),
		brokers:    make(map[*Broker]*brokerProducer),
		brokerRefs: make(map[*brokerProducer]int),
		txnmgr:     txnmgr,
	}

	// launch our singleton dispatchers
	go withRecover(p.dispatcher)
	go withRecover(p.retryHandler)

	return p, nil
}

type flagSet int8

const (
	syn      flagSet = 1 << iota // first message from partitionProducer to brokerProducer
	fin                          // final message from partitionProducer to brokerProducer and back
	shutdown                     // start the shutdown process
)

// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type ProducerMessage struct {
	Topic string // The Kafka topic for this message.
	// The partitioning key for this message. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Key Encoder
	// The actual message to store in Kafka. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Value Encoder

	// The headers are key-value pairs that are transparently passed
	// by Kafka between producers and consumers.
	Headers []RecordHeader

	// This field is used to hold arbitrary data you wish to include so it
	// will be available when receiving on the Successes and Errors channels.
	// Sarama completely ignores this field; it is only to be used for
	// pass-through data.
	Metadata interface{}

	// Below this point are fields filled in by the producer as the message is processed

	// Offset is the offset of the message stored on the broker. This is only
	// guaranteed to be defined if the message was successfully delivered and
	// RequiredAcks is not NoResponse.
	Offset int64
	// Partition is the partition that the message was sent to. This is only
	// guaranteed to be defined if the message was successfully delivered.
	Partition int32
	// Timestamp can vary in behavior depending on broker configuration, being
	// in either one of the CreateTime or LogAppendTime modes (default CreateTime),
	// and requiring version at least 0.10.0.
	//
	// When configured to CreateTime, the timestamp is specified by the producer
	// either by explicitly setting this field, or when the message is added
	// to a produce set.
	//
	// When configured to LogAppendTime, the timestamp is assigned to the message
	// by the broker. This is only guaranteed to be defined if the message was
	// successfully delivered and RequiredAcks is not NoResponse.
	Timestamp time.Time

	retries        int
	flags          flagSet
	expectation    chan *ProducerError
	sequenceNumber int32
	producerEpoch  int16
	hasSequence    bool
}
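
// The snippet below is an illustrative sketch (not part of the original file)
// showing how a message is typically constructed and handed to a producer
// created as in the earlier example; the topic, key, value, and metadata are
// assumed example values.
//
//	msg := &ProducerMessage{
//		Topic:    "example-topic",
//		Key:      StringEncoder("user-42"),
//		Value:    StringEncoder(`{"action":"login"}`),
//		Metadata: "request-1234", // returned untouched on the Successes/Errors channels
//	}
//	producer.Input() <- msg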

const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.

func (m *ProducerMessage) byteSize(version int) int {
	var size int
	if version >= 2 {
		size = maximumRecordOverhead
		for _, h := range m.Headers {
			size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
		}
	} else {
		size = producerMessageOverhead
	}
	if m.Key != nil {
		size += m.Key.Length()
	}
	if m.Value != nil {
		size += m.Value.Length()
	}
	return size
}

func (m *ProducerMessage) clear() {
	m.flags = 0
	m.retries = 0
	m.sequenceNumber = 0
	m.producerEpoch = 0
	m.hasSequence = false
}

// ProducerError is the type of error generated when the producer fails to deliver a message.
// It contains the original ProducerMessage as well as the actual error value.
type ProducerError struct {
	Msg *ProducerMessage
	Err error
}

func (pe ProducerError) Error() string {
	return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
}

func (pe ProducerError) Unwrap() error {
	return pe.Err
}

// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
type ProducerErrors []*ProducerError

func (pe ProducerErrors) Error() string {
	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}
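
// The check below is an illustrative sketch (not part of the original file)
// showing how the error returned from Close can be unpacked into the
// individual delivery failures it wraps; `producer` is assumed to have been
// created as in the earlier example.
//
//	if err := producer.Close(); err != nil {
//		if errs, ok := err.(ProducerErrors); ok {
//			for _, e := range errs {
//				Logger.Println("failed to deliver message:", e.Err)
//			}
//		}
//	}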

func (p *asyncProducer) Errors() <-chan *ProducerError {
	return p.errors
}

func (p *asyncProducer) Successes() <-chan *ProducerMessage {
	return p.successes
}

func (p *asyncProducer) Input() chan<- *ProducerMessage {
	return p.input
}

func (p *asyncProducer) Close() error {
	p.AsyncClose()

	if p.conf.Producer.Return.Successes {
		go withRecover(func() {
			for range p.successes {
			}
		})
	}

	var errors ProducerErrors
	if p.conf.Producer.Return.Errors {
		for event := range p.errors {
			errors = append(errors, event)
		}
	} else {
		<-p.errors
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

func (p *asyncProducer) AsyncClose() {
	go withRecover(p.shutdown)
}

// singleton
// dispatches messages by topic
func (p *asyncProducer) dispatcher() {
	handlers := make(map[string]chan<- *ProducerMessage)
	shuttingDown := false

	for msg := range p.input {
		if msg == nil {
			Logger.Println("Something tried to send a nil message, it was ignored.")
			continue
		}

		if msg.flags&shutdown != 0 {
			shuttingDown = true
			p.inFlight.Done()
			continue
		} else if msg.retries == 0 {
			if shuttingDown {
				// we can't just call returnError here because that decrements the wait group,
				// which hasn't been incremented yet for this message, and shouldn't be
				pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
				if p.conf.Producer.Return.Errors {
					p.errors <- pErr
				} else {
					Logger.Println(pErr)
				}
				continue
			}
			p.inFlight.Add(1)
		}

		for _, interceptor := range p.conf.Producer.Interceptors {
			msg.safelyApplyInterceptor(interceptor)
		}

		version := 1
		if p.conf.Version.IsAtLeast(V0_11_0_0) {
			version = 2
		} else if msg.Headers != nil {
			p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11"))
			continue
		}
		if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
			p.returnError(msg, ErrMessageSizeTooLarge)
			continue
		}

		handler := handlers[msg.Topic]
		if handler == nil {
			handler = p.newTopicProducer(msg.Topic)
			handlers[msg.Topic] = handler
		}

		handler <- msg
	}

	for _, handler := range handlers {
		close(handler)
	}
}

// one per topic
// partitions messages, then dispatches them by partition
type topicProducer struct {
	parent *asyncProducer
	topic  string
	input  <-chan *ProducerMessage

	breaker     *breaker.Breaker
	handlers    map[int32]chan<- *ProducerMessage
	partitioner Partitioner
}

func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	tp := &topicProducer{
		parent:      p,
		topic:       topic,
		input:       input,
		breaker:     breaker.New(3, 1, 10*time.Second),
		handlers:    make(map[int32]chan<- *ProducerMessage),
		partitioner: p.conf.Producer.Partitioner(topic),
	}
	go withRecover(tp.dispatch)
	return input
}

func (tp *topicProducer) dispatch() {
	for msg := range tp.input {
		if msg.retries == 0 {
			if err := tp.partitionMessage(msg); err != nil {
				tp.parent.returnError(msg, err)
				continue
			}
		}

		handler := tp.handlers[msg.Partition]
		if handler == nil {
			handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
			tp.handlers[msg.Partition] = handler
		}

		handler <- msg
	}

	for _, handler := range tp.handlers {
		close(handler)
	}
}

func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
	var partitions []int32

	err := tp.breaker.Run(func() (err error) {
		requiresConsistency := false
		if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok {
			requiresConsistency = ep.MessageRequiresConsistency(msg)
		} else {
			requiresConsistency = tp.partitioner.RequiresConsistency()
		}

		if requiresConsistency {
			partitions, err = tp.parent.client.Partitions(msg.Topic)
		} else {
			partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
		}
		return
	})
	if err != nil {
		return err
	}

	numPartitions := int32(len(partitions))

	if numPartitions == 0 {
		return ErrLeaderNotAvailable
	}

	choice, err := tp.partitioner.Partition(msg, numPartitions)

	if err != nil {
		return err
	} else if choice < 0 || choice >= numPartitions {
		return ErrInvalidPartition
	}

	msg.Partition = partitions[choice]

	return nil
}

// one per partition per topic
// dispatches messages to the appropriate broker
// also responsible for maintaining message order during retries
type partitionProducer struct {
	parent    *asyncProducer
	topic     string
	partition int32
	input     <-chan *ProducerMessage

	leader         *Broker
	breaker        *breaker.Breaker
	brokerProducer *brokerProducer

	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
	// retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
	// therefore whether our buffer is complete and safe to flush)
	highWatermark int
	retryState    []partitionRetryState
}

type partitionRetryState struct {
	buf          []*ProducerMessage
	expectChaser bool
}

func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
	input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
	pp := &partitionProducer{
		parent:    p,
		topic:     topic,
		partition: partition,
		input:     input,

		breaker:    breaker.New(3, 1, 10*time.Second),
		retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
	}
	go withRecover(pp.dispatch)
	return input
}

func (pp *partitionProducer) backoff(retries int) {
	var backoff time.Duration
	if pp.parent.conf.Producer.Retry.BackoffFunc != nil {
		maxRetries := pp.parent.conf.Producer.Retry.Max
		backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries)
	} else {
		backoff = pp.parent.conf.Producer.Retry.Backoff
	}
	if backoff > 0 {
		time.Sleep(backoff)
	}
}

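// The snippet below is an illustrative sketch (not part of the original file)
// of how a caller might supply the custom BackoffFunc consumed above instead
// of the static Retry.Backoff; the base delay is an assumed example value.
//
//	config := NewConfig()
//	config.Producer.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
//		// simple exponential backoff: 100ms, 200ms, 400ms, ...
//		return time.Duration(100<<uint(retries)) * time.Millisecond
//	}
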
func (pp *partitionProducer) dispatch() {
	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
	// on the first message
	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
	if pp.leader != nil {
		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
	}

	defer func() {
		if pp.brokerProducer != nil {
			pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
		}
	}()

	for msg := range pp.input {
		if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
			select {
			case <-pp.brokerProducer.abandoned:
				// a message on the abandoned channel means that our current broker selection is out of date
				Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
				pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
				pp.brokerProducer = nil
				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
			default:
				// producer connection is still open.
			}
		}

		if msg.retries > pp.highWatermark {
			// a new, higher, retry level; handle it and then back off
			pp.newHighWatermark(msg.retries)
			pp.backoff(msg.retries)
		} else if pp.highWatermark > 0 {
			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
			if msg.retries < pp.highWatermark {
				// in fact this message is not even the current retry level, so buffer it for now (unless it's just a fin)
				if msg.flags&fin == fin {
					pp.retryState[msg.retries].expectChaser = false
					pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				} else {
					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
				}
				continue
			} else if msg.flags&fin == fin {
				// this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
				// meaning this retry level is done and we can go down (at least) one level and flush that
				pp.retryState[pp.highWatermark].expectChaser = false
				pp.flushRetryBuffers()
				pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				continue
			}
		}

		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
		// without breaking any of our ordering guarantees

		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnError(msg, err)
				pp.backoff(msg.retries)
				continue
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		// Now that we know we have a broker to actually try and send this message to, generate the sequence
		// number for it.
		// All messages being retried (sent or not) have already had their retry count updated.
		// Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
		if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
			msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
			msg.hasSequence = true
		}

		pp.brokerProducer.input <- msg
	}
}

func (pp *partitionProducer) newHighWatermark(hwm int) {
	Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
	pp.highWatermark = hwm

	// send off a fin so that we know when everything "in between" has made it
	// back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
	pp.retryState[pp.highWatermark].expectChaser = true
	pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
	pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}

	// a new HWM means that our current broker selection is out of date
	Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
	pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
	pp.brokerProducer = nil
}

func (pp *partitionProducer) flushRetryBuffers() {
	Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
	for {
		pp.highWatermark--

		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
				goto flushDone
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		for _, msg := range pp.retryState[pp.highWatermark].buf {
			pp.brokerProducer.input <- msg
		}

	flushDone:
		pp.retryState[pp.highWatermark].buf = nil
		if pp.retryState[pp.highWatermark].expectChaser {
			Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
			break
		} else if pp.highWatermark == 0 {
			Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
			break
		}
	}
}

func (pp *partitionProducer) updateLeader() error {
	return pp.breaker.Run(func() (err error) {
		if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
			return err
		}

		if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
			return err
		}

		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}

		return nil
	})
}

// one per broker; also constructs an associated flusher
func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
	var (
		input     = make(chan *ProducerMessage)
		bridge    = make(chan *produceSet)
		pending   = make(chan *brokerProducerResponse)
		responses = make(chan *brokerProducerResponse)
	)

	bp := &brokerProducer{
		parent:         p,
		broker:         broker,
		input:          input,
		output:         bridge,
		responses:      responses,
		buffer:         newProduceSet(p),
		currentRetries: make(map[string]map[int32]error),
	}
	go withRecover(bp.run)

	// minimal bridge to make the network response `select`able
	go withRecover(func() {
		// Use a wait group to know if we still have in flight requests
		var wg sync.WaitGroup

		for set := range bridge {
			request := set.buildRequest()

			// Count the in flight requests to know when we can close the pending channel safely
			wg.Add(1)
			// Capture the current set to forward in the callback
			sendResponse := func(set *produceSet) ProduceCallback {
				return func(response *ProduceResponse, err error) {
					// Forward the response to make sure we do not block the responseReceiver
					pending <- &brokerProducerResponse{
						set: set,
						err: err,
						res: response,
					}
					wg.Done()
				}
			}(set)

			// Use AsyncProduce vs Produce to not block waiting for the response
			// so that we can pipeline multiple produce requests and achieve higher throughput, see:
			// https://kafka.apache.org/protocol#protocol_network
			err := broker.AsyncProduce(request, sendResponse)
			if err != nil {
				// Request failed to be sent
				sendResponse(nil, err)
				continue
			}
			// Callback is not called when using NoResponse
			if p.conf.Producer.RequiredAcks == NoResponse {
				// Provide the expected nil response
				sendResponse(nil, nil)
			}
		}
		// Wait for all in flight requests to close the pending channel safely
		wg.Wait()
		close(pending)
	})

	// In order to avoid a deadlock when closing the broker on network or malformed response error
	// we use an intermediate channel to buffer and send pending responses in order
	// This is because the AsyncProduce callback inside the bridge is invoked from the broker
	// responseReceiver goroutine and closing the broker requires such goroutine to be finished
	go withRecover(func() {
		buf := queue.New()
		for {
			if buf.Length() == 0 {
				res, ok := <-pending
				if !ok {
					// We are done forwarding the last pending response
					close(responses)
					return
				}
				buf.Add(res)
			}
			// Send the head pending response or buffer another one
			// so that we never block the callback
			headRes := buf.Peek().(*brokerProducerResponse)
			select {
			case res, ok := <-pending:
				if !ok {
					continue
				}
				buf.Add(res)
				continue
			case responses <- headRes:
				buf.Remove()
				continue
			}
		}
	})

	if p.conf.Producer.Retry.Max <= 0 {
		bp.abandoned = make(chan struct{})
	}

	return bp
}

type brokerProducerResponse struct {
	set *produceSet
	err error
	res *ProduceResponse
}

// groups messages together into appropriately-sized batches for sending to the broker
// handles state related to retries etc
type brokerProducer struct {
	parent *asyncProducer
	broker *Broker

	input     chan *ProducerMessage
	output    chan<- *produceSet
	responses <-chan *brokerProducerResponse
	abandoned chan struct{}

	buffer     *produceSet
	timer      <-chan time.Time
	timerFired bool

	closing        error
	currentRetries map[string]map[int32]error
}

func (bp *brokerProducer) run() {
	var output chan<- *produceSet
	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())

	for {
		select {
		case msg, ok := <-bp.input:
			if !ok {
				Logger.Printf("producer/broker/%d input chan closed\n", bp.broker.ID())
				bp.shutdown()
				return
			}

			if msg == nil {
				continue
			}

			if msg.flags&syn == syn {
				Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
					bp.broker.ID(), msg.Topic, msg.Partition)
				if bp.currentRetries[msg.Topic] == nil {
					bp.currentRetries[msg.Topic] = make(map[int32]error)
				}
				bp.currentRetries[msg.Topic][msg.Partition] = nil
				bp.parent.inFlight.Done()
				continue
			}

			if reason := bp.needsRetry(msg); reason != nil {
				bp.parent.retryMessage(msg, reason)

				if bp.closing == nil && msg.flags&fin == fin {
					// we were retrying this partition but we can start processing again
					delete(bp.currentRetries[msg.Topic], msg.Partition)
					Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
						bp.broker.ID(), msg.Topic, msg.Partition)
				}

				continue
			}

			if bp.buffer.wouldOverflow(msg) {
				Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
				if err := bp.waitForSpace(msg, false); err != nil {
					bp.parent.retryMessage(msg, err)
					continue
				}
			}

			if bp.parent.txnmgr.producerID != noProducerID && bp.buffer.producerEpoch != msg.producerEpoch {
				// The epoch was reset, need to roll the buffer over
				Logger.Printf("producer/broker/%d detected epoch rollover, waiting for new buffer\n", bp.broker.ID())
				if err := bp.waitForSpace(msg, true); err != nil {
					bp.parent.retryMessage(msg, err)
					continue
				}
			}
			if err := bp.buffer.add(msg); err != nil {
				bp.parent.returnError(msg, err)
				continue
			}

			if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
				bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
			}
		case <-bp.timer:
			bp.timerFired = true
		case output <- bp.buffer:
			bp.rollOver()
		case response, ok := <-bp.responses:
			if ok {
				bp.handleResponse(response)
			}
		}

		if bp.timerFired || bp.buffer.readyToFlush() {
			output = bp.output
		} else {
			output = nil
		}
	}
}

func (bp *brokerProducer) shutdown() {
	for !bp.buffer.empty() {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
		case bp.output <- bp.buffer:
			bp.rollOver()
		}
	}
	close(bp.output)
	// Drain responses from the bridge goroutine
	for response := range bp.responses {
		bp.handleResponse(response)
	}
	// No more brokerProducer related goroutine should be running
	Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
}

func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
	if bp.closing != nil {
		return bp.closing
	}

	return bp.currentRetries[msg.Topic][msg.Partition]
}

func (bp *brokerProducer) waitForSpace(msg *ProducerMessage, forceRollover bool) error {
	for {
		select {
		case response := <-bp.responses:
			bp.handleResponse(response)
			// handling a response can change our state, so re-check some things
			if reason := bp.needsRetry(msg); reason != nil {
				return reason
			} else if !bp.buffer.wouldOverflow(msg) && !forceRollover {
				return nil
			}
		case bp.output <- bp.buffer:
			bp.rollOver()
			return nil
		}
	}
}

func (bp *brokerProducer) rollOver() {
	bp.timer = nil
	bp.timerFired = false
	bp.buffer = newProduceSet(bp.parent)
}

func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
	if response.err != nil {
		bp.handleError(response.set, response.err)
	} else {
		bp.handleSuccess(response.set, response.res)
	}

	if bp.buffer.empty() {
		bp.rollOver() // this can happen if the response invalidated our buffer
	}
}

func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
	// we iterate through the blocks in the request set, not the response, so that we notice
	// if the response is missing a block completely
	var retryTopics []string
	sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
		if response == nil {
			// this only happens when RequiredAcks is NoResponse, so we have to assume success
			bp.parent.returnSuccesses(pSet.msgs)
			return
		}

		block := response.GetBlock(topic, partition)
		if block == nil {
			bp.parent.returnErrors(pSet.msgs, ErrIncompleteResponse)
			return
		}

		switch block.Err {
		// Success
		case ErrNoError:
			if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() {
				for _, msg := range pSet.msgs {
					msg.Timestamp = block.Timestamp
				}
			}
			for i, msg := range pSet.msgs {
				msg.Offset = block.Offset + int64(i)
			}
			bp.parent.returnSuccesses(pSet.msgs)
		// Duplicate
		case ErrDuplicateSequenceNumber:
			bp.parent.returnSuccesses(pSet.msgs)
		// Retriable errors
		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
			if bp.parent.conf.Producer.Retry.Max <= 0 {
				bp.parent.abandonBrokerConnection(bp.broker)
				bp.parent.returnErrors(pSet.msgs, block.Err)
			} else {
				retryTopics = append(retryTopics, topic)
			}
		// Other non-retriable errors
		default:
			if bp.parent.conf.Producer.Retry.Max <= 0 {
				bp.parent.abandonBrokerConnection(bp.broker)
			}
			bp.parent.returnErrors(pSet.msgs, block.Err)
		}
	})

	if len(retryTopics) > 0 {
		if bp.parent.conf.Producer.Idempotent {
			err := bp.parent.client.RefreshMetadata(retryTopics...)
			if err != nil {
				Logger.Printf("Failed refreshing metadata because of %v\n", err)
			}
		}

		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			block := response.GetBlock(topic, partition)
			if block == nil {
				// handled in the previous "eachPartition" loop
				return
			}

			switch block.Err {
			case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
				ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
				Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
					bp.broker.ID(), topic, partition, block.Err)
				if bp.currentRetries[topic] == nil {
					bp.currentRetries[topic] = make(map[int32]error)
				}
				bp.currentRetries[topic][partition] = block.Err
				if bp.parent.conf.Producer.Idempotent {
					go bp.parent.retryBatch(topic, partition, pSet, block.Err)
				} else {
					bp.parent.retryMessages(pSet.msgs, block.Err)
				}
				// dropping the following messages has the side effect of incrementing their retry count
				bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
			}
		})
	}
}

func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) {
	Logger.Printf("Retrying batch for %v-%d because of %s\n", topic, partition, kerr)
	produceSet := newProduceSet(p)
	produceSet.msgs[topic] = make(map[int32]*partitionSet)
	produceSet.msgs[topic][partition] = pSet
	produceSet.bufferBytes += pSet.bufferBytes
	produceSet.bufferCount += len(pSet.msgs)
	for _, msg := range pSet.msgs {
		if msg.retries >= p.conf.Producer.Retry.Max {
			p.returnError(msg, kerr)
			return
		}
		msg.retries++
	}

	// it's expected that a metadata refresh has been requested prior to calling retryBatch
	leader, err := p.client.Leader(topic, partition)
	if err != nil {
		Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, err)
		for _, msg := range pSet.msgs {
			p.returnError(msg, kerr)
		}
		return
	}
	bp := p.getBrokerProducer(leader)
	bp.output <- produceSet
}

func (bp *brokerProducer) handleError(sent *produceSet, err error) {
	switch err.(type) {
	case PacketEncodingError:
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.returnErrors(pSet.msgs, err)
		})
	default:
		Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
		bp.parent.abandonBrokerConnection(bp.broker)
		_ = bp.broker.Close()
		bp.closing = err
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.retryMessages(pSet.msgs, err)
		})
		bp.buffer.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			bp.parent.retryMessages(pSet.msgs, err)
		})
		bp.rollOver()
	}
}

// singleton
// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
func (p *asyncProducer) retryHandler() {
	var msg *ProducerMessage
	buf := queue.New()

	for {
		if buf.Length() == 0 {
			msg = <-p.retries
		} else {
			select {
			case msg = <-p.retries:
			case p.input <- buf.Peek().(*ProducerMessage):
				buf.Remove()
				continue
			}
		}

		if msg == nil {
			return
		}

		buf.Add(msg)
	}
}

// utility functions

func (p *asyncProducer) shutdown() {
	Logger.Println("Producer shutting down.")
	p.inFlight.Add(1)
	p.input <- &ProducerMessage{flags: shutdown}

	p.inFlight.Wait()

	err := p.client.Close()
	if err != nil {
		Logger.Println("producer/shutdown failed to close the embedded client:", err)
	}

	close(p.input)
	close(p.retries)
	close(p.errors)
	close(p.successes)
}

func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
	// We need to reset the producer ID epoch if we set a sequence number on it, because the broker
	// will never see a message with this number, so we can never continue the sequence.
	if msg.hasSequence {
		Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
		p.txnmgr.bumpEpoch()
	}
	msg.clear()
	pErr := &ProducerError{Msg: msg, Err: err}
	if p.conf.Producer.Return.Errors {
		p.errors <- pErr
	} else {
		Logger.Println(pErr)
	}
	p.inFlight.Done()
}

func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		p.returnError(msg, err)
	}
}

func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
	for _, msg := range batch {
		if p.conf.Producer.Return.Successes {
			msg.clear()
			p.successes <- msg
		}
		p.inFlight.Done()
	}
}

func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
	if msg.retries >= p.conf.Producer.Retry.Max {
		p.returnError(msg, err)
	} else {
		msg.retries++
		p.retries <- msg
	}
}

func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
	for _, msg := range batch {
		p.retryMessage(msg, err)
	}
}

func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bp := p.brokers[broker]

	if bp == nil {
		bp = p.newBrokerProducer(broker)
		p.brokers[broker] = bp
		p.brokerRefs[bp] = 0
	}

	p.brokerRefs[bp]++

	return bp
}

func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp *brokerProducer) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	p.brokerRefs[bp]--
	if p.brokerRefs[bp] == 0 {
		close(bp.input)
		delete(p.brokerRefs, bp)

		if p.brokers[broker] == bp {
			delete(p.brokers, broker)
		}
	}
}

func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
	p.brokerLock.Lock()
	defer p.brokerLock.Unlock()

	bc, ok := p.brokers[broker]
	if ok && bc.abandoned != nil {
		close(bc.abandoned)
	}

	delete(p.brokers, broker)
}