blob: eedece6b4a97f5cc7f59ac6fa08051b7bda66773 [file] [log] [blame]
kesavand2cde6582020-06-22 04:56:23 -04001package sarama
2
3import "sync"
4
5// SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct
6// broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer
7// to avoid leaks, it may not be garbage-collected automatically when it passes out of scope.
8//
9// The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual
10// durability guarantee provided when a message is acknowledged depend on the configured value of `Producer.RequiredAcks`.
11// There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
12//
13// For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to
14// be set to true in its configuration.
15type SyncProducer interface {
16
17 // SendMessage produces a given message, and returns only when it either has
18 // succeeded or failed to produce. It will return the partition and the offset
19 // of the produced message, or an error if the message failed to produce.
20 SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
21
22 // SendMessages produces a given set of messages, and returns only when all
23 // messages in the set have either succeeded or failed. Note that messages
24 // can succeed and fail individually; if some succeed and some fail,
25 // SendMessages will return an error.
26 SendMessages(msgs []*ProducerMessage) error
27
kesavandc71914f2022-03-25 11:19:03 +053028 // Close shuts down the producer; you must call this function before a producer
29 // object passes out of scope, as it may otherwise leak memory.
30 // You must call this before calling Close on the underlying client.
kesavand2cde6582020-06-22 04:56:23 -040031 Close() error
32}
33
34type syncProducer struct {
35 producer *asyncProducer
36 wg sync.WaitGroup
37}
38
39// NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
40func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
41 if config == nil {
42 config = NewConfig()
43 config.Producer.Return.Successes = true
44 }
45
46 if err := verifyProducerConfig(config); err != nil {
47 return nil, err
48 }
49
50 p, err := NewAsyncProducer(addrs, config)
51 if err != nil {
52 return nil, err
53 }
54 return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
55}
56
57// NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still
58// necessary to call Close() on the underlying client when shutting down this producer.
59func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
60 if err := verifyProducerConfig(client.Config()); err != nil {
61 return nil, err
62 }
63
64 p, err := NewAsyncProducerFromClient(client)
65 if err != nil {
66 return nil, err
67 }
68 return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
69}
70
71func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
72 sp := &syncProducer{producer: p}
73
74 sp.wg.Add(2)
75 go withRecover(sp.handleSuccesses)
76 go withRecover(sp.handleErrors)
77
78 return sp
79}
80
81func verifyProducerConfig(config *Config) error {
82 if !config.Producer.Return.Errors {
83 return ConfigurationError("Producer.Return.Errors must be true to be used in a SyncProducer")
84 }
85 if !config.Producer.Return.Successes {
86 return ConfigurationError("Producer.Return.Successes must be true to be used in a SyncProducer")
87 }
88 return nil
89}
90
91func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
92 expectation := make(chan *ProducerError, 1)
93 msg.expectation = expectation
94 sp.producer.Input() <- msg
95
kesavandc71914f2022-03-25 11:19:03 +053096 if pErr := <-expectation; pErr != nil {
97 return -1, -1, pErr.Err
kesavand2cde6582020-06-22 04:56:23 -040098 }
99
100 return msg.Partition, msg.Offset, nil
101}
102
103func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
104 expectations := make(chan chan *ProducerError, len(msgs))
105 go func() {
106 for _, msg := range msgs {
107 expectation := make(chan *ProducerError, 1)
108 msg.expectation = expectation
109 sp.producer.Input() <- msg
110 expectations <- expectation
111 }
112 close(expectations)
113 }()
114
115 var errors ProducerErrors
116 for expectation := range expectations {
kesavandc71914f2022-03-25 11:19:03 +0530117 if pErr := <-expectation; pErr != nil {
118 errors = append(errors, pErr)
kesavand2cde6582020-06-22 04:56:23 -0400119 }
120 }
121
122 if len(errors) > 0 {
123 return errors
124 }
125 return nil
126}
127
128func (sp *syncProducer) handleSuccesses() {
129 defer sp.wg.Done()
130 for msg := range sp.producer.Successes() {
131 expectation := msg.expectation
132 expectation <- nil
133 }
134}
135
136func (sp *syncProducer) handleErrors() {
137 defer sp.wg.Done()
138 for err := range sp.producer.Errors() {
139 expectation := err.Msg.expectation
140 expectation <- err
141 }
142}
143
144func (sp *syncProducer) Close() error {
145 sp.producer.AsyncClose()
146 sp.wg.Wait()
147 return nil
148}