package sarama

import (
	"encoding/binary"
	"errors"
	"time"
)

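// partitionSet holds the messages buffered for a single topic/partition, the
// Records structure they will be encoded into, and the running byte size of
// that buffer.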
type partitionSet struct {
	msgs          []*ProducerMessage
	recordsToSend Records
	bufferBytes   int
}

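// produceSet buffers messages destined for a single broker, grouped by topic
// and partition, and tracks the total byte size and message count so the
// producer can decide when to flush.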
type produceSet struct {
	parent *asyncProducer
	msgs   map[string]map[int32]*partitionSet

	bufferBytes int
	bufferCount int
}

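// newProduceSet returns an empty produceSet for the given parent producer.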
func newProduceSet(parent *asyncProducer) *produceSet {
	return &produceSet{
		msgs:   make(map[string]map[int32]*partitionSet),
		parent: parent,
	}
}

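// add encodes the message's key and value and appends it to the buffered set
// for its topic/partition, creating the per-partition batch on first use and
// updating the set's size accounting. It returns any encoding error.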
func (ps *produceSet) add(msg *ProducerMessage) error {
	var err error
	var key, val []byte

	if msg.Key != nil {
		if key, err = msg.Key.Encode(); err != nil {
			return err
		}
	}

	if msg.Value != nil {
		if val, err = msg.Value.Encode(); err != nil {
			return err
		}
	}

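	// Kafka timestamps carry millisecond precision, so default a zero timestamp
	// to the current time and truncate it before per-record deltas are computed.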
	timestamp := msg.Timestamp
	if timestamp.IsZero() {
		timestamp = time.Now()
	}
	timestamp = timestamp.Truncate(time.Millisecond)

	partitions := ps.msgs[msg.Topic]
	if partitions == nil {
		partitions = make(map[int32]*partitionSet)
		ps.msgs[msg.Topic] = partitions
	}

	var size int

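	// Look up the partition's buffered set, creating it on first use with a
	// container appropriate to the broker version: a v2 RecordBatch for 0.11+,
	// a legacy MessageSet otherwise.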
	set := partitions[msg.Partition]
	if set == nil {
		if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
			batch := &RecordBatch{
				FirstTimestamp:   timestamp,
				Version:          2,
				Codec:            ps.parent.conf.Producer.Compression,
				CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
				ProducerID:       ps.parent.txnmgr.producerID,
				ProducerEpoch:    ps.parent.txnmgr.producerEpoch,
			}
			if ps.parent.conf.Producer.Idempotent {
				batch.FirstSequence = msg.sequenceNumber
			}
			set = &partitionSet{recordsToSend: newDefaultRecords(batch)}
			size = recordBatchOverhead
		} else {
			set = &partitionSet{recordsToSend: newLegacyRecords(new(MessageSet))}
		}
		partitions[msg.Partition] = set
	}
	set.msgs = append(set.msgs, msg)

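	// Kafka 0.11+ frames each message as a record in a v2 RecordBatch; earlier
	// versions use the legacy MessageSet format.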
	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
		if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
			return errors.New("assertion failed: message out of sequence added to a batch")
		}
		// Be conservative here and assume the maximum record overhead, to avoid
		// having to pre-encode the record just to measure it
		size += maximumRecordOverhead
		rec := &Record{
			Key:            key,
			Value:          val,
			TimestampDelta: timestamp.Sub(set.recordsToSend.RecordBatch.FirstTimestamp),
		}
		size += len(key) + len(val)
		if len(msg.Headers) > 0 {
			rec.Headers = make([]*RecordHeader, len(msg.Headers))
			for i := range msg.Headers {
				rec.Headers[i] = &msg.Headers[i]
				size += len(rec.Headers[i].Key) + len(rec.Headers[i].Value) + 2*binary.MaxVarintLen32
			}
		}
		set.recordsToSend.RecordBatch.addRecord(rec)
	} else {
		msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val}
		if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
			msgToSend.Timestamp = timestamp
			msgToSend.Version = 1
		}
		set.recordsToSend.MsgSet.addMessage(msgToSend)
		size = producerMessageOverhead + len(key) + len(val)
	}

	set.bufferBytes += size
	ps.bufferBytes += size
	ps.bufferCount++

	return nil
}

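// buildRequest assembles a single ProduceRequest from everything currently
// buffered in the set, choosing the request version and record format that
// match the configured Kafka version.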
func (ps *produceSet) buildRequest() *ProduceRequest {
	req := &ProduceRequest{
		RequiredAcks: ps.parent.conf.Producer.RequiredAcks,
		Timeout:      int32(ps.parent.conf.Producer.Timeout / time.Millisecond),
	}
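	// Use the highest request version the configured Kafka version supports:
	// v2 for 0.10 (timestamps), v3 for 0.11 (record batches), and v7 for 2.1
	// (needed for zstd compression).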
	if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
		req.Version = 2
	}
	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
		req.Version = 3
	}

	if ps.parent.conf.Producer.Compression == CompressionZSTD && ps.parent.conf.Version.IsAtLeast(V2_1_0_0) {
		req.Version = 7
	}

	for topic, partitionSets := range ps.msgs {
		for partition, set := range partitionSets {
			if req.Version >= 3 {
				// If the API version we're hitting is 3 or greater, we need to calculate
				// offsets for each record in the batch relative to FirstOffset.
				// Additionally, we must set LastOffsetDelta to the value of the last offset
				// in the batch. Since the OffsetDelta of the first record is 0, we know that the
				// final record of any batch will have an offset of (# of records in batch) - 1.
				// (See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
				// under the RecordBatch section for details.)
				rb := set.recordsToSend.RecordBatch
				if len(rb.Records) > 0 {
					rb.LastOffsetDelta = int32(len(rb.Records) - 1)
					for i, record := range rb.Records {
						record.OffsetDelta = int64(i)
					}
				}
				req.AddBatch(topic, partition, rb)
				continue
			}
			if ps.parent.conf.Producer.Compression == CompressionNone {
				req.AddSet(topic, partition, set.recordsToSend.MsgSet)
			} else {
				// When compression is enabled, the entire set for each partition is compressed
				// and sent as the payload of a single fake "message" with the appropriate codec
				// set and no key. When the server sees a message with a compression codec, it
				// decompresses the payload and treats the result as its message set.

				if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
					// If our version is 0.10 or later, assign relative offsets
					// to the inner messages. This lets the broker avoid
					// recompressing the message set.
					// (See https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets
					// for details on relative offsets.)
					for i, msg := range set.recordsToSend.MsgSet.Messages {
						msg.Offset = int64(i)
					}
				}
				payload, err := encode(set.recordsToSend.MsgSet, ps.parent.conf.MetricRegistry)
				if err != nil {
					Logger.Println(err) // if this happens, it's basically our fault.
					panic(err)
				}
				compMsg := &Message{
					Codec:            ps.parent.conf.Producer.Compression,
					CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
					Key:              nil,
					Value:            payload,
					Set:              set.recordsToSend.MsgSet, // Provide the underlying message set for accurate metrics
				}
				if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
					compMsg.Version = 1
					compMsg.Timestamp = set.recordsToSend.MsgSet.Messages[0].Msg.Timestamp
				}
				req.AddMessage(topic, partition, compMsg)
			}
		}
	}

	return req
}

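// eachPartition invokes cb once for every topic/partition that currently has
// buffered messages.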
func (ps *produceSet) eachPartition(cb func(topic string, partition int32, pSet *partitionSet)) {
	for topic, partitionSets := range ps.msgs {
		for partition, set := range partitionSets {
			cb(topic, partition, set)
		}
	}
}

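// dropPartition removes all buffered messages for the given topic/partition
// from the set, adjusts the size accounting, and returns the dropped messages.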
func (ps *produceSet) dropPartition(topic string, partition int32) []*ProducerMessage {
	if ps.msgs[topic] == nil {
		return nil
	}
	set := ps.msgs[topic][partition]
	if set == nil {
		return nil
	}
	ps.bufferBytes -= set.bufferBytes
	ps.bufferCount -= len(set.msgs)
	delete(ps.msgs[topic], partition)
	return set.msgs
}

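// wouldOverflow reports whether adding msg would exceed the maximum request
// size on the wire, the size limit of the partition's batch, or the configured
// maximum number of buffered messages.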
func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool {
	version := 1
	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
		version = 2
	}

	switch {
	// Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
	case ps.bufferBytes+msg.byteSize(version) >= int(MaxRequestSize-(10*1024)):
		return true
	// Would we overflow the size-limit of a message-batch for this partition?
	case ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil &&
		ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes:
		return true
	// Would we overflow simply in number of messages?
	case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages:
		return true
	default:
		return false
	}
}

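// readyToFlush reports whether the set has hit a flush trigger: it flushes
// immediately when no flush thresholds are configured, or once the buffered
// message count or byte size passes its configured trigger point.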
func (ps *produceSet) readyToFlush() bool {
	switch {
	// If we don't have any messages, nothing else matters
	case ps.empty():
		return false
	// If all three config values are 0, we always flush as-fast-as-possible
	case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0:
		return true
	// If we've passed the message trigger-point
	case ps.parent.conf.Producer.Flush.Messages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.Messages:
		return true
	// If we've passed the byte trigger-point
	case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes:
		return true
	default:
		return false
	}
}

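// empty reports whether the set contains no buffered messages.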
func (ps *produceSet) empty() bool {
	return ps.bufferCount == 0
}