blob: 5911f7b58cf84b6e4548d0aa6ff1f97589397458 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "encoding/binary"
5 "fmt"
6 "sync"
7 "time"
8
9 "github.com/eapache/go-resiliency/breaker"
10 "github.com/eapache/queue"
11)
12
// AsyncProducer publishes Kafka messages using a non-blocking API. It routes messages
// to the correct broker for the provided topic-partition, refreshing metadata as appropriate,
// and parses responses for errors. You must read from the Errors() channel or the
// producer will deadlock. You must call Close() or AsyncClose() on a producer to avoid
// leaks: it will not be garbage-collected automatically when it passes out of
// scope.
type AsyncProducer interface {

	// AsyncClose triggers a shutdown of the producer. The shutdown has completed
	// when both the Errors and Successes channels have been closed. When calling
	// AsyncClose, you *must* continue to read from those channels in order to
	// drain the results of any messages in flight.
	AsyncClose()

	// Close shuts down the producer and waits for any buffered messages to be
	// flushed. You must call this function before a producer object passes out of
	// scope, as it may otherwise leak memory. You must call this before calling
	// Close on the underlying client.
	Close() error

	// Input is the input channel for the user to write messages to that they
	// wish to send.
	Input() chan<- *ProducerMessage

	// Successes is the success output channel back to the user when Return.Successes is
	// enabled. If Return.Successes is true, you MUST read from this channel or the
	// Producer will deadlock. It is suggested that you send and read messages
	// together in a single select statement.
	Successes() <-chan *ProducerMessage

	// Errors is the error output channel back to the user. You MUST read from this
	// channel or the Producer will deadlock when the channel is full. Alternatively,
	// you can set Producer.Return.Errors in your config to false, which prevents
	// errors from being returned.
	Errors() <-chan *ProducerError
}
49
// transactionManager keeps the state necessary to ensure idempotent production.
// mutex guards producerEpoch and sequenceNumbers in the methods of this type.
type transactionManager struct {
	producerID      int64            // noProducerID unless the producer is idempotent
	producerEpoch   int16            // noProducerEpoch unless the producer is idempotent
	sequenceNumbers map[string]int32 // next sequence number, keyed by "<topic>-<partition>"
	mutex           sync.Mutex
}
57
// Placeholder values stored in a transactionManager when the producer is not
// idempotent and no producer ID has been obtained from the cluster.
const (
	noProducerID    = -1
	noProducerEpoch = -1
)
62
khenaidood948f772021-08-11 17:49:24 -040063func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
khenaidooac637102019-01-14 15:44:34 -050064 key := fmt.Sprintf("%s-%d", topic, partition)
65 t.mutex.Lock()
66 defer t.mutex.Unlock()
67 sequence := t.sequenceNumbers[key]
68 t.sequenceNumbers[key] = sequence + 1
khenaidood948f772021-08-11 17:49:24 -040069 return sequence, t.producerEpoch
70}
71
72func (t *transactionManager) bumpEpoch() {
73 t.mutex.Lock()
74 defer t.mutex.Unlock()
75 t.producerEpoch++
76 for k := range t.sequenceNumbers {
77 t.sequenceNumbers[k] = 0
78 }
79}
80
81func (t *transactionManager) getProducerID() (int64, int16) {
82 t.mutex.Lock()
83 defer t.mutex.Unlock()
84 return t.producerID, t.producerEpoch
khenaidooac637102019-01-14 15:44:34 -050085}
86
87func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
88 txnmgr := &transactionManager{
89 producerID: noProducerID,
90 producerEpoch: noProducerEpoch,
91 }
92
93 if conf.Producer.Idempotent {
94 initProducerIDResponse, err := client.InitProducerID()
95 if err != nil {
96 return nil, err
97 }
98 txnmgr.producerID = initProducerIDResponse.ProducerID
99 txnmgr.producerEpoch = initProducerIDResponse.ProducerEpoch
100 txnmgr.sequenceNumbers = make(map[string]int32)
101 txnmgr.mutex = sync.Mutex{}
102
103 Logger.Printf("Obtained a ProducerId: %d and ProducerEpoch: %d\n", txnmgr.producerID, txnmgr.producerEpoch)
104 }
105
106 return txnmgr, nil
107}
108
109type asyncProducer struct {
Scott Baker8461e152019-10-01 14:44:30 -0700110 client Client
111 conf *Config
khenaidooac637102019-01-14 15:44:34 -0500112
113 errors chan *ProducerError
114 input, successes, retries chan *ProducerMessage
115 inFlight sync.WaitGroup
116
117 brokers map[*Broker]*brokerProducer
118 brokerRefs map[*brokerProducer]int
119 brokerLock sync.Mutex
120
121 txnmgr *transactionManager
122}
123
124// NewAsyncProducer creates a new AsyncProducer using the given broker addresses and configuration.
125func NewAsyncProducer(addrs []string, conf *Config) (AsyncProducer, error) {
126 client, err := NewClient(addrs, conf)
127 if err != nil {
128 return nil, err
129 }
Scott Baker8461e152019-10-01 14:44:30 -0700130 return newAsyncProducer(client)
khenaidooac637102019-01-14 15:44:34 -0500131}
132
133// NewAsyncProducerFromClient creates a new Producer using the given client. It is still
134// necessary to call Close() on the underlying client when shutting down this producer.
135func NewAsyncProducerFromClient(client Client) (AsyncProducer, error) {
Scott Baker8461e152019-10-01 14:44:30 -0700136 // For clients passed in by the client, ensure we don't
137 // call Close() on it.
138 cli := &nopCloserClient{client}
139 return newAsyncProducer(cli)
140}
141
142func newAsyncProducer(client Client) (AsyncProducer, error) {
khenaidooac637102019-01-14 15:44:34 -0500143 // Check that we are not dealing with a closed Client before processing any other arguments
144 if client.Closed() {
145 return nil, ErrClosedClient
146 }
147
148 txnmgr, err := newTransactionManager(client.Config(), client)
149 if err != nil {
150 return nil, err
151 }
152
153 p := &asyncProducer{
154 client: client,
155 conf: client.Config(),
156 errors: make(chan *ProducerError),
157 input: make(chan *ProducerMessage),
158 successes: make(chan *ProducerMessage),
159 retries: make(chan *ProducerMessage),
160 brokers: make(map[*Broker]*brokerProducer),
161 brokerRefs: make(map[*brokerProducer]int),
162 txnmgr: txnmgr,
163 }
164
165 // launch our singleton dispatchers
166 go withRecover(p.dispatcher)
167 go withRecover(p.retryHandler)
168
169 return p, nil
170}
171
// flagSet is a bit set of internal control markers carried on a ProducerMessage.
type flagSet int8

const (
	syn      flagSet = 1 << 0 // first message from partitionProducer to brokerProducer
	fin      flagSet = 1 << 1 // final message from partitionProducer to brokerProducer and back
	shutdown flagSet = 1 << 2 // start the shutdown process
)
179
// ProducerMessage is the collection of elements passed to the Producer in order to send a message.
type ProducerMessage struct {
	Topic string // The Kafka topic for this message.
	// The partitioning key for this message. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Key Encoder
	// The actual message to store in Kafka. Pre-existing Encoders include
	// StringEncoder and ByteEncoder.
	Value Encoder

	// The headers are key-value pairs that are transparently passed
	// by Kafka between producers and consumers.
	Headers []RecordHeader

	// This field is used to hold arbitrary data you wish to include so it
	// will be available when receiving on the Successes and Errors channels.
	// Sarama completely ignores this field and is only to be used for
	// pass-through data.
	Metadata interface{}

	// Below this point are filled in by the producer as the message is processed

	// Offset is the offset of the message stored on the broker. This is only
	// guaranteed to be defined if the message was successfully delivered and
	// RequiredAcks is not NoResponse.
	Offset int64
	// Partition is the partition that the message was sent to. This is only
	// guaranteed to be defined if the message was successfully delivered.
	Partition int32
	// Timestamp can vary in behaviour depending on broker configuration, being
	// in either one of the CreateTime or LogAppendTime modes (default CreateTime),
	// and requiring version at least 0.10.0.
	//
	// When configured to CreateTime, the timestamp is specified by the producer
	// either by explicitly setting this field, or when the message is added
	// to a produce set.
	//
	// When configured to LogAppendTime, the timestamp assigned to the message
	// by the broker. This is only guaranteed to be defined if the message was
	// successfully delivered and RequiredAcks is not NoResponse.
	Timestamp time.Time

	// retries is the retry level of the message; the dispatchers treat 0 as
	// "not yet retried".
	retries int
	// flags carries the internal syn/fin/shutdown markers.
	flags flagSet
	// NOTE(review): expectation is not referenced in this part of the file;
	// presumably it is used by the synchronous producer - confirm.
	expectation chan *ProducerError
	// sequenceNumber, producerEpoch and hasSequence are assigned by
	// partitionProducer.dispatch for idempotent producers and reset by clear.
	sequenceNumber int32
	producerEpoch  int16
	hasSequence    bool
}
229
230const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
231
232func (m *ProducerMessage) byteSize(version int) int {
233 var size int
234 if version >= 2 {
235 size = maximumRecordOverhead
236 for _, h := range m.Headers {
237 size += len(h.Key) + len(h.Value) + 2*binary.MaxVarintLen32
238 }
239 } else {
240 size = producerMessageOverhead
241 }
242 if m.Key != nil {
243 size += m.Key.Length()
244 }
245 if m.Value != nil {
246 size += m.Value.Length()
247 }
248 return size
249}
250
251func (m *ProducerMessage) clear() {
252 m.flags = 0
253 m.retries = 0
khenaidood948f772021-08-11 17:49:24 -0400254 m.sequenceNumber = 0
255 m.producerEpoch = 0
256 m.hasSequence = false
khenaidooac637102019-01-14 15:44:34 -0500257}
258
259// ProducerError is the type of error generated when the producer fails to deliver a message.
260// It contains the original ProducerMessage as well as the actual error value.
261type ProducerError struct {
262 Msg *ProducerMessage
263 Err error
264}
265
266func (pe ProducerError) Error() string {
267 return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
268}
269
khenaidood948f772021-08-11 17:49:24 -0400270func (pe ProducerError) Unwrap() error {
271 return pe.Err
272}
273
khenaidooac637102019-01-14 15:44:34 -0500274// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
275// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
276// when closing a producer.
277type ProducerErrors []*ProducerError
278
279func (pe ProducerErrors) Error() string {
280 return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
281}
282
// Errors returns the receive-only view of the producer's errors channel.
func (p *asyncProducer) Errors() <-chan *ProducerError {
	return p.errors
}
286
// Successes returns the receive-only view of the producer's successes channel.
func (p *asyncProducer) Successes() <-chan *ProducerMessage {
	return p.successes
}
290
// Input returns the send-only view of the producer's input channel.
func (p *asyncProducer) Input() chan<- *ProducerMessage {
	return p.input
}
294
295func (p *asyncProducer) Close() error {
296 p.AsyncClose()
297
298 if p.conf.Producer.Return.Successes {
299 go withRecover(func() {
300 for range p.successes {
301 }
302 })
303 }
304
305 var errors ProducerErrors
306 if p.conf.Producer.Return.Errors {
307 for event := range p.errors {
308 errors = append(errors, event)
309 }
310 } else {
311 <-p.errors
312 }
313
314 if len(errors) > 0 {
315 return errors
316 }
317 return nil
318}
319
// AsyncClose starts p.shutdown in its own goroutine and returns immediately.
func (p *asyncProducer) AsyncClose() {
	go withRecover(p.shutdown)
}
323
// dispatcher is a singleton goroutine that dispatches messages by topic.
//
// It reads every message from p.input, validates it, and forwards it to the
// handler channel of its topic, creating the topicProducer for a topic the
// first time that topic is seen. When p.input is closed it closes all topic
// handlers and returns.
func (p *asyncProducer) dispatcher() {
	handlers := make(map[string]chan<- *ProducerMessage)
	shuttingDown := false

	for msg := range p.input {
		if msg == nil {
			Logger.Println("Something tried to send a nil message, it was ignored.")
			continue
		}

		if msg.flags&shutdown != 0 {
			// the shutdown marker: from now on new messages are rejected.
			// NOTE(review): the matching inFlight.Add is presumably done by
			// p.shutdown, which is not visible here.
			shuttingDown = true
			p.inFlight.Done()
			continue
		} else if msg.retries == 0 {
			// a message that has not been retried, i.e. fresh user input
			if shuttingDown {
				// we can't just call returnError here because that decrements the wait group,
				// which hasn't been incremented yet for this message, and shouldn't be
				pErr := &ProducerError{Msg: msg, Err: ErrShuttingDown}
				if p.conf.Producer.Return.Errors {
					p.errors <- pErr
				} else {
					Logger.Println(pErr)
				}
				continue
			}
			p.inFlight.Add(1)
		}

		for _, interceptor := range p.conf.Producer.Interceptors {
			msg.safelyApplyInterceptor(interceptor)
		}

		// pick the message format version used for the size estimate; headers
		// are rejected when the configured Kafka version is below 0.11
		version := 1
		if p.conf.Version.IsAtLeast(V0_11_0_0) {
			version = 2
		} else if msg.Headers != nil {
			p.returnError(msg, ConfigurationError("Producing headers requires Kafka at least v0.11"))
			continue
		}
		if msg.byteSize(version) > p.conf.Producer.MaxMessageBytes {
			p.returnError(msg, ErrMessageSizeTooLarge)
			continue
		}

		handler := handlers[msg.Topic]
		if handler == nil {
			handler = p.newTopicProducer(msg.Topic)
			handlers[msg.Topic] = handler
		}

		handler <- msg
	}

	// input is closed: propagate the close to every topic producer
	for _, handler := range handlers {
		close(handler)
	}
}
384
// topicProducer exists once per topic. It partitions messages, then
// dispatches them by partition.
type topicProducer struct {
	parent *asyncProducer
	topic  string
	input  <-chan *ProducerMessage

	breaker     *breaker.Breaker                  // wraps the partition lookups in partitionMessage
	handlers    map[int32]chan<- *ProducerMessage // one input channel per partition producer
	partitioner Partitioner
}
396
397func (p *asyncProducer) newTopicProducer(topic string) chan<- *ProducerMessage {
398 input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
399 tp := &topicProducer{
400 parent: p,
401 topic: topic,
402 input: input,
403 breaker: breaker.New(3, 1, 10*time.Second),
404 handlers: make(map[int32]chan<- *ProducerMessage),
405 partitioner: p.conf.Producer.Partitioner(topic),
406 }
407 go withRecover(tp.dispatch)
408 return input
409}
410
411func (tp *topicProducer) dispatch() {
412 for msg := range tp.input {
413 if msg.retries == 0 {
414 if err := tp.partitionMessage(msg); err != nil {
415 tp.parent.returnError(msg, err)
416 continue
417 }
418 }
khenaidooac637102019-01-14 15:44:34 -0500419
420 handler := tp.handlers[msg.Partition]
421 if handler == nil {
422 handler = tp.parent.newPartitionProducer(msg.Topic, msg.Partition)
423 tp.handlers[msg.Partition] = handler
424 }
425
426 handler <- msg
427 }
428
429 for _, handler := range tp.handlers {
430 close(handler)
431 }
432}
433
434func (tp *topicProducer) partitionMessage(msg *ProducerMessage) error {
435 var partitions []int32
436
437 err := tp.breaker.Run(func() (err error) {
khenaidood948f772021-08-11 17:49:24 -0400438 requiresConsistency := false
khenaidooac637102019-01-14 15:44:34 -0500439 if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok {
440 requiresConsistency = ep.MessageRequiresConsistency(msg)
441 } else {
442 requiresConsistency = tp.partitioner.RequiresConsistency()
443 }
444
445 if requiresConsistency {
446 partitions, err = tp.parent.client.Partitions(msg.Topic)
447 } else {
448 partitions, err = tp.parent.client.WritablePartitions(msg.Topic)
449 }
450 return
451 })
khenaidooac637102019-01-14 15:44:34 -0500452 if err != nil {
453 return err
454 }
455
456 numPartitions := int32(len(partitions))
457
458 if numPartitions == 0 {
459 return ErrLeaderNotAvailable
460 }
461
462 choice, err := tp.partitioner.Partition(msg, numPartitions)
463
464 if err != nil {
465 return err
466 } else if choice < 0 || choice >= numPartitions {
467 return ErrInvalidPartition
468 }
469
470 msg.Partition = partitions[choice]
471
472 return nil
473}
474
// partitionProducer exists once per partition per topic. It dispatches
// messages to the appropriate broker and is also responsible for maintaining
// message order during retries.
type partitionProducer struct {
	parent    *asyncProducer
	topic     string
	partition int32
	input     <-chan *ProducerMessage

	leader         *Broker
	breaker        *breaker.Breaker // wraps the metadata refresh in updateLeader
	brokerProducer *brokerProducer  // nil while no broker is currently selected

	// highWatermark tracks the "current" retry level, which is the only one where we actually let messages through,
	// all other messages get buffered in retryState[msg.retries].buf to preserve ordering
	// retryState[msg.retries].expectChaser simply tracks whether we've seen a fin message for a given level (and
	// therefore whether our buffer is complete and safe to flush)
	highWatermark int
	retryState    []partitionRetryState
}
495
// partitionRetryState is the per-retry-level bookkeeping of a partitionProducer.
type partitionRetryState struct {
	buf          []*ProducerMessage // messages held back for this retry level
	expectChaser bool               // true while the fin for this level is still outstanding
}
500
501func (p *asyncProducer) newPartitionProducer(topic string, partition int32) chan<- *ProducerMessage {
502 input := make(chan *ProducerMessage, p.conf.ChannelBufferSize)
503 pp := &partitionProducer{
504 parent: p,
505 topic: topic,
506 partition: partition,
507 input: input,
508
509 breaker: breaker.New(3, 1, 10*time.Second),
510 retryState: make([]partitionRetryState, p.conf.Producer.Retry.Max+1),
511 }
512 go withRecover(pp.dispatch)
513 return input
514}
515
William Kurkiandaa6bb22019-03-07 12:26:28 -0500516func (pp *partitionProducer) backoff(retries int) {
517 var backoff time.Duration
518 if pp.parent.conf.Producer.Retry.BackoffFunc != nil {
519 maxRetries := pp.parent.conf.Producer.Retry.Max
520 backoff = pp.parent.conf.Producer.Retry.BackoffFunc(retries, maxRetries)
521 } else {
522 backoff = pp.parent.conf.Producer.Retry.Backoff
523 }
524 if backoff > 0 {
525 time.Sleep(backoff)
526 }
527}
528
// dispatch is the partitionProducer goroutine. It forwards the messages of one
// topic/partition to the brokerProducer of the current partition leader and
// keeps them ordered across retries: only messages of the current retry level
// (highWatermark) are let through, lower levels are buffered until the fin
// marker of the current level arrives. It returns when pp.input is closed and
// then drops its reference to the broker producer, if it holds one.
func (pp *partitionProducer) dispatch() {
	// try to prefetch the leader; if this doesn't work, we'll do a proper call to `updateLeader`
	// on the first message
	pp.leader, _ = pp.parent.client.Leader(pp.topic, pp.partition)
	if pp.leader != nil {
		pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
		pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
		pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
	}

	defer func() {
		if pp.brokerProducer != nil {
			pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
		}
	}()

	for msg := range pp.input {
		// the abandoned channel only exists when retries are disabled (see newBrokerProducer)
		if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
			select {
			case <-pp.brokerProducer.abandoned:
				// a message on the abandoned channel means that our current broker selection is out of date
				Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
				pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
				pp.brokerProducer = nil
				// NOTE(review): this sleeps for the fixed Retry.Backoff and does not
				// consult Retry.BackoffFunc the way pp.backoff does - confirm intended.
				time.Sleep(pp.parent.conf.Producer.Retry.Backoff)
			default:
				// producer connection is still open.
			}
		}

		if msg.retries > pp.highWatermark {
			// a new, higher, retry level; handle it and then back off
			pp.newHighWatermark(msg.retries)
			pp.backoff(msg.retries)
		} else if pp.highWatermark > 0 {
			// we are retrying something (else highWatermark would be 0) but this message is not a *new* retry level
			if msg.retries < pp.highWatermark {
				// in fact this message is not even the current retry level, so buffer it for now (unless it's a just a fin)
				if msg.flags&fin == fin {
					pp.retryState[msg.retries].expectChaser = false
					pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				} else {
					pp.retryState[msg.retries].buf = append(pp.retryState[msg.retries].buf, msg)
				}
				continue
			} else if msg.flags&fin == fin {
				// this message is of the current retry level (msg.retries == highWatermark) and the fin flag is set,
				// meaning this retry level is done and we can go down (at least) one level and flush that
				pp.retryState[pp.highWatermark].expectChaser = false
				pp.flushRetryBuffers()
				pp.parent.inFlight.Done() // this fin is now handled and will be garbage collected
				continue
			}
		}

		// if we made it this far then the current msg contains real data, and can be sent to the next goroutine
		// without breaking any of our ordering guarantees

		if pp.brokerProducer == nil {
			if err := pp.updateLeader(); err != nil {
				pp.parent.returnError(msg, err)
				pp.backoff(msg.retries)
				continue
			}
			Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
		}

		// Now that we know we have a broker to actually try and send this message to, generate the sequence
		// number for it.
		// All messages being retried (sent or not) have already had their retry count updated
		// Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
		if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
			msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
			msg.hasSequence = true
		}

		pp.brokerProducer.input <- msg
	}
}
608
609func (pp *partitionProducer) newHighWatermark(hwm int) {
610 Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, hwm)
611 pp.highWatermark = hwm
612
613 // send off a fin so that we know when everything "in between" has made it
614 // back to us and we can safely flush the backlog (otherwise we risk re-ordering messages)
615 pp.retryState[pp.highWatermark].expectChaser = true
616 pp.parent.inFlight.Add(1) // we're generating a fin message; track it so we don't shut down while it's still inflight
617 pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: fin, retries: pp.highWatermark - 1}
618
619 // a new HWM means that our current broker selection is out of date
620 Logger.Printf("producer/leader/%s/%d abandoning broker %d\n", pp.topic, pp.partition, pp.leader.ID())
621 pp.parent.unrefBrokerProducer(pp.leader, pp.brokerProducer)
622 pp.brokerProducer = nil
623}
624
625func (pp *partitionProducer) flushRetryBuffers() {
626 Logger.Printf("producer/leader/%s/%d state change to [flushing-%d]\n", pp.topic, pp.partition, pp.highWatermark)
627 for {
628 pp.highWatermark--
629
630 if pp.brokerProducer == nil {
631 if err := pp.updateLeader(); err != nil {
632 pp.parent.returnErrors(pp.retryState[pp.highWatermark].buf, err)
633 goto flushDone
634 }
635 Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
636 }
637
638 for _, msg := range pp.retryState[pp.highWatermark].buf {
639 pp.brokerProducer.input <- msg
640 }
641
642 flushDone:
643 pp.retryState[pp.highWatermark].buf = nil
644 if pp.retryState[pp.highWatermark].expectChaser {
645 Logger.Printf("producer/leader/%s/%d state change to [retrying-%d]\n", pp.topic, pp.partition, pp.highWatermark)
646 break
647 } else if pp.highWatermark == 0 {
648 Logger.Printf("producer/leader/%s/%d state change to [normal]\n", pp.topic, pp.partition)
649 break
650 }
651 }
652}
653
654func (pp *partitionProducer) updateLeader() error {
655 return pp.breaker.Run(func() (err error) {
656 if err = pp.parent.client.RefreshMetadata(pp.topic); err != nil {
657 return err
658 }
659
660 if pp.leader, err = pp.parent.client.Leader(pp.topic, pp.partition); err != nil {
661 return err
662 }
663
664 pp.brokerProducer = pp.parent.getBrokerProducer(pp.leader)
665 pp.parent.inFlight.Add(1) // we're generating a syn message; track it so we don't shut down while it's still inflight
666 pp.brokerProducer.input <- &ProducerMessage{Topic: pp.topic, Partition: pp.partition, flags: syn}
667
668 return nil
669 })
670}
671
672// one per broker; also constructs an associated flusher
673func (p *asyncProducer) newBrokerProducer(broker *Broker) *brokerProducer {
674 var (
675 input = make(chan *ProducerMessage)
676 bridge = make(chan *produceSet)
677 responses = make(chan *brokerProducerResponse)
678 )
679
680 bp := &brokerProducer{
681 parent: p,
682 broker: broker,
683 input: input,
684 output: bridge,
685 responses: responses,
khenaidood948f772021-08-11 17:49:24 -0400686 stopchan: make(chan struct{}),
khenaidooac637102019-01-14 15:44:34 -0500687 buffer: newProduceSet(p),
688 currentRetries: make(map[string]map[int32]error),
689 }
690 go withRecover(bp.run)
691
692 // minimal bridge to make the network response `select`able
693 go withRecover(func() {
694 for set := range bridge {
695 request := set.buildRequest()
696
697 response, err := broker.Produce(request)
698
699 responses <- &brokerProducerResponse{
700 set: set,
701 err: err,
702 res: response,
703 }
704 }
705 close(responses)
706 })
707
William Kurkiandaa6bb22019-03-07 12:26:28 -0500708 if p.conf.Producer.Retry.Max <= 0 {
709 bp.abandoned = make(chan struct{})
710 }
711
khenaidooac637102019-01-14 15:44:34 -0500712 return bp
713}
714
// brokerProducerResponse is the outcome of one produce request: the set that
// was sent together with the error and response returned by broker.Produce.
type brokerProducerResponse struct {
	set *produceSet
	err error
	res *ProduceResponse
}
720
// brokerProducer groups messages together into appropriately-sized batches for
// sending to the broker and handles state related to retries etc.
type brokerProducer struct {
	parent *asyncProducer
	broker *Broker

	input     chan *ProducerMessage
	output    chan<- *produceSet             // finished sets, consumed by the bridge goroutine
	responses <-chan *brokerProducerResponse // results coming back from the bridge goroutine
	abandoned chan struct{}                  // only non-nil when Producer.Retry.Max <= 0
	stopchan  chan struct{}                  // closed at the end of shutdown

	buffer     *produceSet      // the batch currently being filled
	timer      <-chan time.Time // started when Flush.Frequency > 0 and a message is buffered
	timerFired bool

	closing        error                      // when non-nil, every message needs a retry with this reason
	currentRetries map[string]map[int32]error // per topic/partition retry reason, nil when open
}
740
// run is the brokerProducer goroutine. In a single select loop it accepts
// messages into the current buffer, hands the buffer to the bridge when it is
// ready to flush, processes produce responses, and reacts to the flush timer.
// It returns after shutdown when the input channel is closed, or when stopchan
// is closed.
func (bp *brokerProducer) run() {
	// output stays nil (so its select case never fires) until the buffer is ready to flush
	var output chan<- *produceSet
	Logger.Printf("producer/broker/%d starting up\n", bp.broker.ID())

	for {
		select {
		case msg, ok := <-bp.input:
			if !ok {
				Logger.Printf("producer/broker/%d input chan closed\n", bp.broker.ID())
				bp.shutdown()
				return
			}

			if msg == nil {
				continue
			}

			if msg.flags&syn == syn {
				// a partition producer announces itself: mark the partition as open (no retry reason)
				Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
					bp.broker.ID(), msg.Topic, msg.Partition)
				if bp.currentRetries[msg.Topic] == nil {
					bp.currentRetries[msg.Topic] = make(map[int32]error)
				}
				bp.currentRetries[msg.Topic][msg.Partition] = nil
				bp.parent.inFlight.Done()
				continue
			}

			if reason := bp.needsRetry(msg); reason != nil {
				bp.parent.retryMessage(msg, reason)

				if bp.closing == nil && msg.flags&fin == fin {
					// we were retrying this partition but we can start processing again
					delete(bp.currentRetries[msg.Topic], msg.Partition)
					Logger.Printf("producer/broker/%d state change to [closed] on %s/%d\n",
						bp.broker.ID(), msg.Topic, msg.Partition)
				}

				continue
			}

			if bp.buffer.wouldOverflow(msg) {
				Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
				if err := bp.waitForSpace(msg, false); err != nil {
					bp.parent.retryMessage(msg, err)
					continue
				}
			}

			if bp.parent.txnmgr.producerID != noProducerID && bp.buffer.producerEpoch != msg.producerEpoch {
				// The epoch was reset, need to roll the buffer over
				Logger.Printf("producer/broker/%d detected epoch rollover, waiting for new buffer\n", bp.broker.ID())
				if err := bp.waitForSpace(msg, true); err != nil {
					bp.parent.retryMessage(msg, err)
					continue
				}
			}
			if err := bp.buffer.add(msg); err != nil {
				bp.parent.returnError(msg, err)
				continue
			}

			// start the flush timer with the first message buffered after a roll-over
			if bp.parent.conf.Producer.Flush.Frequency > 0 && bp.timer == nil {
				bp.timer = time.After(bp.parent.conf.Producer.Flush.Frequency)
			}
		case <-bp.timer:
			bp.timerFired = true
		case output <- bp.buffer:
			bp.rollOver()
		case response, ok := <-bp.responses:
			if ok {
				bp.handleResponse(response)
			}
		case <-bp.stopchan:
			Logger.Printf(
				"producer/broker/%d run loop asked to stop\n", bp.broker.ID())
			return
		}

		// enable or disable the send case for the next iteration
		if bp.timerFired || bp.buffer.readyToFlush() {
			output = bp.output
		} else {
			output = nil
		}
	}
}
827
828func (bp *brokerProducer) shutdown() {
829 for !bp.buffer.empty() {
830 select {
831 case response := <-bp.responses:
832 bp.handleResponse(response)
833 case bp.output <- bp.buffer:
834 bp.rollOver()
835 }
836 }
837 close(bp.output)
838 for response := range bp.responses {
839 bp.handleResponse(response)
840 }
khenaidood948f772021-08-11 17:49:24 -0400841 close(bp.stopchan)
khenaidooac637102019-01-14 15:44:34 -0500842 Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
843}
844
845func (bp *brokerProducer) needsRetry(msg *ProducerMessage) error {
846 if bp.closing != nil {
847 return bp.closing
848 }
849
850 return bp.currentRetries[msg.Topic][msg.Partition]
851}
852
khenaidood948f772021-08-11 17:49:24 -0400853func (bp *brokerProducer) waitForSpace(msg *ProducerMessage, forceRollover bool) error {
khenaidooac637102019-01-14 15:44:34 -0500854 for {
855 select {
856 case response := <-bp.responses:
857 bp.handleResponse(response)
858 // handling a response can change our state, so re-check some things
859 if reason := bp.needsRetry(msg); reason != nil {
860 return reason
khenaidood948f772021-08-11 17:49:24 -0400861 } else if !bp.buffer.wouldOverflow(msg) && !forceRollover {
khenaidooac637102019-01-14 15:44:34 -0500862 return nil
863 }
864 case bp.output <- bp.buffer:
865 bp.rollOver()
866 return nil
867 }
868 }
869}
870
871func (bp *brokerProducer) rollOver() {
872 bp.timer = nil
873 bp.timerFired = false
874 bp.buffer = newProduceSet(bp.parent)
875}
876
877func (bp *brokerProducer) handleResponse(response *brokerProducerResponse) {
878 if response.err != nil {
879 bp.handleError(response.set, response.err)
880 } else {
881 bp.handleSuccess(response.set, response.res)
882 }
883
884 if bp.buffer.empty() {
885 bp.rollOver() // this can happen if the response invalidated our buffer
886 }
887}
888
// handleSuccess processes a produce response that arrived without a transport
// error. In a first pass over the partitions of the sent set it reports
// successes and non-retriable errors and collects the topics that had a
// retriable error. In a second pass (only if there was at least one such
// topic) it marks the affected partitions as retrying and re-queues their
// messages, including those already buffered for the same partition.
func (bp *brokerProducer) handleSuccess(sent *produceSet, response *ProduceResponse) {
	// we iterate through the blocks in the request set, not the response, so that we notice
	// if the response is missing a block completely
	var retryTopics []string
	sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
		if response == nil {
			// this only happens when RequiredAcks is NoResponse, so we have to assume success
			bp.parent.returnSuccesses(pSet.msgs)
			return
		}

		block := response.GetBlock(topic, partition)
		if block == nil {
			bp.parent.returnErrors(pSet.msgs, ErrIncompleteResponse)
			return
		}

		switch block.Err {
		// Success
		case ErrNoError:
			if bp.parent.conf.Version.IsAtLeast(V0_10_0_0) && !block.Timestamp.IsZero() {
				for _, msg := range pSet.msgs {
					msg.Timestamp = block.Timestamp
				}
			}
			// offsets are assigned consecutively, starting at the block's offset
			for i, msg := range pSet.msgs {
				msg.Offset = block.Offset + int64(i)
			}
			bp.parent.returnSuccesses(pSet.msgs)
		// Duplicate
		case ErrDuplicateSequenceNumber:
			bp.parent.returnSuccesses(pSet.msgs)
		// Retriable errors
		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
			if bp.parent.conf.Producer.Retry.Max <= 0 {
				// retries are disabled: fail the messages right away
				bp.parent.abandonBrokerConnection(bp.broker)
				bp.parent.returnErrors(pSet.msgs, block.Err)
			} else {
				retryTopics = append(retryTopics, topic)
			}
		// Other non-retriable errors
		default:
			if bp.parent.conf.Producer.Retry.Max <= 0 {
				bp.parent.abandonBrokerConnection(bp.broker)
			}
			bp.parent.returnErrors(pSet.msgs, block.Err)
		}
	})

	if len(retryTopics) > 0 {
		if bp.parent.conf.Producer.Idempotent {
			err := bp.parent.client.RefreshMetadata(retryTopics...)
			if err != nil {
				Logger.Printf("Failed refreshing metadata because of %v\n", err)
			}
		}

		// response cannot be nil here: retryTopics is only filled from response blocks
		sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
			block := response.GetBlock(topic, partition)
			if block == nil {
				// handled in the previous "eachPartition" loop
				return
			}

			switch block.Err {
			case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
				ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
				Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
					bp.broker.ID(), topic, partition, block.Err)
				if bp.currentRetries[topic] == nil {
					bp.currentRetries[topic] = make(map[int32]error)
				}
				bp.currentRetries[topic][partition] = block.Err
				if bp.parent.conf.Producer.Idempotent {
					// idempotent producers retry the batch as a whole, in a separate goroutine
					go bp.parent.retryBatch(topic, partition, pSet, block.Err)
				} else {
					bp.parent.retryMessages(pSet.msgs, block.Err)
				}
				// dropping the following messages has the side effect of incrementing their retry count
				bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
			}
		})
	}
}
974
975func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) {
976 Logger.Printf("Retrying batch for %v-%d because of %s\n", topic, partition, kerr)
977 produceSet := newProduceSet(p)
978 produceSet.msgs[topic] = make(map[int32]*partitionSet)
979 produceSet.msgs[topic][partition] = pSet
980 produceSet.bufferBytes += pSet.bufferBytes
981 produceSet.bufferCount += len(pSet.msgs)
982 for _, msg := range pSet.msgs {
983 if msg.retries >= p.conf.Producer.Retry.Max {
984 p.returnError(msg, kerr)
985 return
986 }
987 msg.retries++
988 }
989
990 // it's expected that a metadata refresh has been requested prior to calling retryBatch
991 leader, err := p.client.Leader(topic, partition)
992 if err != nil {
993 Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, err)
994 for _, msg := range pSet.msgs {
995 p.returnError(msg, kerr)
996 }
997 return
998 }
999 bp := p.getBrokerProducer(leader)
1000 bp.output <- produceSet
1001}
1002
1003func (bp *brokerProducer) handleError(sent *produceSet, err error) {
1004 switch err.(type) {
1005 case PacketEncodingError:
1006 sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
1007 bp.parent.returnErrors(pSet.msgs, err)
1008 })
1009 default:
1010 Logger.Printf("producer/broker/%d state change to [closing] because %s\n", bp.broker.ID(), err)
1011 bp.parent.abandonBrokerConnection(bp.broker)
1012 _ = bp.broker.Close()
1013 bp.closing = err
1014 sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
1015 bp.parent.retryMessages(pSet.msgs, err)
1016 })
1017 bp.buffer.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
1018 bp.parent.retryMessages(pSet.msgs, err)
1019 })
1020 bp.rollOver()
1021 }
1022}
1023
1024// singleton
1025// effectively a "bridge" between the flushers and the dispatcher in order to avoid deadlock
1026// based on https://godoc.org/github.com/eapache/channels#InfiniteChannel
1027func (p *asyncProducer) retryHandler() {
1028 var msg *ProducerMessage
1029 buf := queue.New()
1030
1031 for {
1032 if buf.Length() == 0 {
1033 msg = <-p.retries
1034 } else {
1035 select {
1036 case msg = <-p.retries:
1037 case p.input <- buf.Peek().(*ProducerMessage):
1038 buf.Remove()
1039 continue
1040 }
1041 }
1042
1043 if msg == nil {
1044 return
1045 }
1046
1047 buf.Add(msg)
1048 }
1049}
1050
1051// utility functions
1052
1053func (p *asyncProducer) shutdown() {
1054 Logger.Println("Producer shutting down.")
1055 p.inFlight.Add(1)
1056 p.input <- &ProducerMessage{flags: shutdown}
1057
1058 p.inFlight.Wait()
1059
Scott Baker8461e152019-10-01 14:44:30 -07001060 err := p.client.Close()
1061 if err != nil {
1062 Logger.Println("producer/shutdown failed to close the embedded client:", err)
khenaidooac637102019-01-14 15:44:34 -05001063 }
1064
1065 close(p.input)
1066 close(p.retries)
1067 close(p.errors)
1068 close(p.successes)
1069}
1070
1071func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
khenaidood948f772021-08-11 17:49:24 -04001072 // We need to reset the producer ID epoch if we set a sequence number on it, because the broker
1073 // will never see a message with this number, so we can never continue the sequence.
1074 if msg.hasSequence {
1075 Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
1076 p.txnmgr.bumpEpoch()
1077 }
khenaidooac637102019-01-14 15:44:34 -05001078 msg.clear()
1079 pErr := &ProducerError{Msg: msg, Err: err}
1080 if p.conf.Producer.Return.Errors {
1081 p.errors <- pErr
1082 } else {
1083 Logger.Println(pErr)
1084 }
1085 p.inFlight.Done()
1086}
1087
1088func (p *asyncProducer) returnErrors(batch []*ProducerMessage, err error) {
1089 for _, msg := range batch {
1090 p.returnError(msg, err)
1091 }
1092}
1093
1094func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
1095 for _, msg := range batch {
1096 if p.conf.Producer.Return.Successes {
1097 msg.clear()
1098 p.successes <- msg
1099 }
1100 p.inFlight.Done()
1101 }
1102}
1103
1104func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
1105 if msg.retries >= p.conf.Producer.Retry.Max {
1106 p.returnError(msg, err)
1107 } else {
1108 msg.retries++
1109 p.retries <- msg
1110 }
1111}
1112
1113func (p *asyncProducer) retryMessages(batch []*ProducerMessage, err error) {
1114 for _, msg := range batch {
1115 p.retryMessage(msg, err)
1116 }
1117}
1118
1119func (p *asyncProducer) getBrokerProducer(broker *Broker) *brokerProducer {
1120 p.brokerLock.Lock()
1121 defer p.brokerLock.Unlock()
1122
1123 bp := p.brokers[broker]
1124
1125 if bp == nil {
1126 bp = p.newBrokerProducer(broker)
1127 p.brokers[broker] = bp
1128 p.brokerRefs[bp] = 0
1129 }
1130
1131 p.brokerRefs[bp]++
1132
1133 return bp
1134}
1135
1136func (p *asyncProducer) unrefBrokerProducer(broker *Broker, bp *brokerProducer) {
1137 p.brokerLock.Lock()
1138 defer p.brokerLock.Unlock()
1139
1140 p.brokerRefs[bp]--
1141 if p.brokerRefs[bp] == 0 {
1142 close(bp.input)
1143 delete(p.brokerRefs, bp)
1144
1145 if p.brokers[broker] == bp {
1146 delete(p.brokers, broker)
1147 }
1148 }
1149}
1150
1151func (p *asyncProducer) abandonBrokerConnection(broker *Broker) {
1152 p.brokerLock.Lock()
1153 defer p.brokerLock.Unlock()
1154
William Kurkiandaa6bb22019-03-07 12:26:28 -05001155 bc, ok := p.brokers[broker]
1156 if ok && bc.abandoned != nil {
1157 close(bc.abandoned)
1158 }
1159
khenaidooac637102019-01-14 15:44:34 -05001160 delete(p.brokers, broker)
1161}