blob: 021c5a010323e64da764a4ede5dfc43e4f0834b4 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
3import "sync"
4
5// SyncProducer publishes Kafka messages, blocking until they have been acknowledged. It routes messages to the correct
6// broker, refreshing metadata as appropriate, and parses responses for errors. You must call Close() on a producer
7// to avoid leaks, it may not be garbage-collected automatically when it passes out of scope.
8//
9// The SyncProducer comes with two caveats: it will generally be less efficient than the AsyncProducer, and the actual
10// durability guarantee provided when a message is acknowledged depend on the configured value of `Producer.RequiredAcks`.
11// There are configurations where a message acknowledged by the SyncProducer can still sometimes be lost.
12//
13// For implementation reasons, the SyncProducer requires `Producer.Return.Errors` and `Producer.Return.Successes` to
14// be set to true in its configuration.
15type SyncProducer interface {
16
17 // SendMessage produces a given message, and returns only when it either has
18 // succeeded or failed to produce. It will return the partition and the offset
19 // of the produced message, or an error if the message failed to produce.
20 SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error)
21
22 // SendMessages produces a given set of messages, and returns only when all
23 // messages in the set have either succeeded or failed. Note that messages
24 // can succeed and fail individually; if some succeed and some fail,
25 // SendMessages will return an error.
26 SendMessages(msgs []*ProducerMessage) error
27
28 // Close shuts down the producer and waits for any buffered messages to be
29 // flushed. You must call this function before a producer object passes out of
30 // scope, as it may otherwise leak memory. You must call this before calling
31 // Close on the underlying client.
32 Close() error
33}
34
35type syncProducer struct {
36 producer *asyncProducer
37 wg sync.WaitGroup
38}
39
40// NewSyncProducer creates a new SyncProducer using the given broker addresses and configuration.
41func NewSyncProducer(addrs []string, config *Config) (SyncProducer, error) {
42 if config == nil {
43 config = NewConfig()
44 config.Producer.Return.Successes = true
45 }
46
47 if err := verifyProducerConfig(config); err != nil {
48 return nil, err
49 }
50
51 p, err := NewAsyncProducer(addrs, config)
52 if err != nil {
53 return nil, err
54 }
55 return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
56}
57
58// NewSyncProducerFromClient creates a new SyncProducer using the given client. It is still
59// necessary to call Close() on the underlying client when shutting down this producer.
60func NewSyncProducerFromClient(client Client) (SyncProducer, error) {
61 if err := verifyProducerConfig(client.Config()); err != nil {
62 return nil, err
63 }
64
65 p, err := NewAsyncProducerFromClient(client)
66 if err != nil {
67 return nil, err
68 }
69 return newSyncProducerFromAsyncProducer(p.(*asyncProducer)), nil
70}
71
72func newSyncProducerFromAsyncProducer(p *asyncProducer) *syncProducer {
73 sp := &syncProducer{producer: p}
74
75 sp.wg.Add(2)
76 go withRecover(sp.handleSuccesses)
77 go withRecover(sp.handleErrors)
78
79 return sp
80}
81
82func verifyProducerConfig(config *Config) error {
83 if !config.Producer.Return.Errors {
84 return ConfigurationError("Producer.Return.Errors must be true to be used in a SyncProducer")
85 }
86 if !config.Producer.Return.Successes {
87 return ConfigurationError("Producer.Return.Successes must be true to be used in a SyncProducer")
88 }
89 return nil
90}
91
92func (sp *syncProducer) SendMessage(msg *ProducerMessage) (partition int32, offset int64, err error) {
93 expectation := make(chan *ProducerError, 1)
94 msg.expectation = expectation
95 sp.producer.Input() <- msg
96
97 if err := <-expectation; err != nil {
98 return -1, -1, err.Err
99 }
100
101 return msg.Partition, msg.Offset, nil
102}
103
104func (sp *syncProducer) SendMessages(msgs []*ProducerMessage) error {
105 expectations := make(chan chan *ProducerError, len(msgs))
106 go func() {
107 for _, msg := range msgs {
108 expectation := make(chan *ProducerError, 1)
109 msg.expectation = expectation
110 sp.producer.Input() <- msg
111 expectations <- expectation
112 }
113 close(expectations)
114 }()
115
116 var errors ProducerErrors
117 for expectation := range expectations {
118 if err := <-expectation; err != nil {
119 errors = append(errors, err)
120 }
121 }
122
123 if len(errors) > 0 {
124 return errors
125 }
126 return nil
127}
128
129func (sp *syncProducer) handleSuccesses() {
130 defer sp.wg.Done()
131 for msg := range sp.producer.Successes() {
132 expectation := msg.expectation
133 expectation <- nil
134 }
135}
136
137func (sp *syncProducer) handleErrors() {
138 defer sp.wg.Done()
139 for err := range sp.producer.Errors() {
140 expectation := err.Msg.expectation
141 expectation <- err
142 }
143}
144
145func (sp *syncProducer) Close() error {
146 sp.producer.AsyncClose()
147 sp.wg.Wait()
148 return nil
149}