package sarama

import (
	"encoding/binary"
	"errors"
	"time"
)
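
// partitionSet accumulates the messages destined for a single topic/partition,
// together with the wire-format Records that will carry them and a running
// count of the buffered bytes.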
type partitionSet struct {
	msgs          []*ProducerMessage
	recordsToSend Records
	bufferBytes   int
}
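
// produceSet buffers messages for a single ProduceRequest, grouped by topic
// and partition, and tracks total buffered bytes and message count so the
// producer can decide when to flush.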
type produceSet struct {
	parent        *asyncProducer
	msgs          map[string]map[int32]*partitionSet
	producerID    int64
	producerEpoch int16

	bufferBytes int
	bufferCount int
}
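
// newProduceSet returns an empty produceSet, stamped with the producer ID and
// epoch obtained from the parent's transaction manager (relevant when
// Producer.Idempotent is enabled).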
func newProduceSet(parent *asyncProducer) *produceSet {
	pid, epoch := parent.txnmgr.getProducerID()
	return &produceSet{
		msgs:          make(map[string]map[int32]*partitionSet),
		parent:        parent,
		producerID:    pid,
		producerEpoch: epoch,
	}
}
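
// add encodes msg's key and value, assigns it a timestamp, and appends it to
// the batch for its topic/partition, creating a RecordBatch (Kafka >= 0.11) or
// a legacy MessageSet as needed. It returns an error only if encoding fails or
// an out-of-sequence message is detected under idempotent production. The size
// accounting is a conservative estimate, not an exact encoded length.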
func (ps *produceSet) add(msg *ProducerMessage) error {
	var err error
	var key, val []byte

	if msg.Key != nil {
		if key, err = msg.Key.Encode(); err != nil {
			return err
		}
	}

	if msg.Value != nil {
		if val, err = msg.Value.Encode(); err != nil {
			return err
		}
	}

	timestamp := msg.Timestamp
	if timestamp.IsZero() {
		timestamp = time.Now()
	}
	timestamp = timestamp.Truncate(time.Millisecond)

	partitions := ps.msgs[msg.Topic]
	if partitions == nil {
		partitions = make(map[int32]*partitionSet)
		ps.msgs[msg.Topic] = partitions
	}

	var size int

	set := partitions[msg.Partition]
	if set == nil {
		if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
			batch := &RecordBatch{
				FirstTimestamp:   timestamp,
				Version:          2,
				Codec:            ps.parent.conf.Producer.Compression,
				CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
				ProducerID:       ps.producerID,
				ProducerEpoch:    ps.producerEpoch,
			}
			if ps.parent.conf.Producer.Idempotent {
				batch.FirstSequence = msg.sequenceNumber
			}
			set = &partitionSet{recordsToSend: newDefaultRecords(batch)}
			size = recordBatchOverhead
		} else {
			set = &partitionSet{recordsToSend: newLegacyRecords(new(MessageSet))}
		}
		partitions[msg.Partition] = set
	}

	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
		if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
			return errors.New("assertion failed: message out of sequence added to a batch")
		}
	}

	// Past this point we can't return an error, because we've already added the message to the set.
	set.msgs = append(set.msgs, msg)

	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
		// We are being conservative here to avoid having to prep encode the record
		size += maximumRecordOverhead
		rec := &Record{
			Key:            key,
			Value:          val,
			TimestampDelta: timestamp.Sub(set.recordsToSend.RecordBatch.FirstTimestamp),
		}
		size += len(key) + len(val)
		if len(msg.Headers) > 0 {
			rec.Headers = make([]*RecordHeader, len(msg.Headers))
			for i := range msg.Headers {
				rec.Headers[i] = &msg.Headers[i]
				size += len(rec.Headers[i].Key) + len(rec.Headers[i].Value) + 2*binary.MaxVarintLen32
			}
		}
		set.recordsToSend.RecordBatch.addRecord(rec)
	} else {
		msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val}
		if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
			msgToSend.Timestamp = timestamp
			msgToSend.Version = 1
		}
		set.recordsToSend.MsgSet.addMessage(msgToSend)
		size = producerMessageOverhead + len(key) + len(val)
	}

	set.bufferBytes += size
	ps.bufferBytes += size
	ps.bufferCount++

	return nil
}
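
// buildRequest assembles a ProduceRequest from everything buffered so far,
// picking the request version from the configured Kafka version and fixing up
// per-batch bookkeeping: relative offsets and LastOffsetDelta for record
// batches, or the compressed wrapper message for legacy message sets.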
func (ps *produceSet) buildRequest() *ProduceRequest {
	req := &ProduceRequest{
		RequiredAcks: ps.parent.conf.Producer.RequiredAcks,
		Timeout:      int32(ps.parent.conf.Producer.Timeout / time.Millisecond),
	}
	if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
		req.Version = 2
	}
	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
		req.Version = 3
	}

	if ps.parent.conf.Producer.Compression == CompressionZSTD && ps.parent.conf.Version.IsAtLeast(V2_1_0_0) {
		req.Version = 7
	}

	for topic, partitionSets := range ps.msgs {
		for partition, set := range partitionSets {
			if req.Version >= 3 {
				// If the API version we're hitting is 3 or greater, we need to calculate
				// offsets for each record in the batch relative to FirstOffset.
				// Additionally, we must set LastOffsetDelta to the value of the last offset
				// in the batch. Since the OffsetDelta of the first record is 0, we know that the
				// final record of any batch will have an offset of (# of records in batch) - 1.
				// (See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
				// under the RecordBatch section for details.)
				rb := set.recordsToSend.RecordBatch
				if len(rb.Records) > 0 {
					rb.LastOffsetDelta = int32(len(rb.Records) - 1)
					for i, record := range rb.Records {
						record.OffsetDelta = int64(i)
					}
				}
				req.AddBatch(topic, partition, rb)
				continue
			}
			if ps.parent.conf.Producer.Compression == CompressionNone {
				req.AddSet(topic, partition, set.recordsToSend.MsgSet)
			} else {
				// When compression is enabled, the entire set for each partition is compressed
				// and sent as the payload of a single fake "message" with the appropriate codec
				// set and no key. When the server sees a message with a compression codec, it
				// decompresses the payload and treats the result as its message set.

				if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
					// If our version is 0.10 or later, assign relative offsets
					// to the inner messages. This lets the broker avoid
					// recompressing the message set.
					// (See https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets
					// for details on relative offsets.)
					for i, msg := range set.recordsToSend.MsgSet.Messages {
						msg.Offset = int64(i)
					}
				}
				payload, err := encode(set.recordsToSend.MsgSet, ps.parent.conf.MetricRegistry)
				if err != nil {
					Logger.Println(err) // if this happens, it's basically our fault.
					panic(err)
				}
				compMsg := &Message{
					Codec:            ps.parent.conf.Producer.Compression,
					CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
					Key:              nil,
					Value:            payload,
					Set:              set.recordsToSend.MsgSet, // Provide the underlying message set for accurate metrics
				}
				if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
					compMsg.Version = 1
					compMsg.Timestamp = set.recordsToSend.MsgSet.Messages[0].Msg.Timestamp
				}
				req.AddMessage(topic, partition, compMsg)
			}
		}
	}

	return req
}
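
// eachPartition invokes cb once for every topic/partition that currently has
// buffered messages.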
func (ps *produceSet) eachPartition(cb func(topic string, partition int32, pSet *partitionSet)) {
	for topic, partitionSet := range ps.msgs {
		for partition, set := range partitionSet {
			cb(topic, partition, set)
		}
	}
}
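
// dropPartition removes all buffered messages for the given topic/partition,
// adjusts the byte and message counters accordingly, and returns the dropped
// messages so the caller can retry or fail them.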
func (ps *produceSet) dropPartition(topic string, partition int32) []*ProducerMessage {
	if ps.msgs[topic] == nil {
		return nil
	}
	set := ps.msgs[topic][partition]
	if set == nil {
		return nil
	}
	ps.bufferBytes -= set.bufferBytes
	ps.bufferCount -= len(set.msgs)
	delete(ps.msgs[topic], partition)
	return set.msgs
}
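
// wouldOverflow reports whether adding msg would push the set past the
// request-size, per-partition batch-size, or message-count limits, meaning
// the current set should be flushed before msg is added.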
func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool {
	version := 1
	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
		version = 2
	}

	switch {
	// Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
	case ps.bufferBytes+msg.byteSize(version) >= int(MaxRequestSize-(10*1024)):
		return true
	// Would we overflow the size-limit of a message-batch for this partition?
	case ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil &&
		ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes:
		return true
	// Would we overflow simply in number of messages?
	case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages:
		return true
	default:
		return false
	}
}
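
// readyToFlush reports whether the buffered contents have hit a flush
// trigger: it returns true when all flush thresholds are unset, or when the
// message-count or byte thresholds are met. The time-based Flush.Frequency
// trigger is handled by the caller's timer, not here.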
func (ps *produceSet) readyToFlush() bool {
	switch {
	// If we don't have any messages, nothing else matters
	case ps.empty():
		return false
	// If all three config values are 0, we always flush as-fast-as-possible
	case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0:
		return true
	// If we've passed the message trigger-point
	case ps.parent.conf.Producer.Flush.Messages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.Messages:
		return true
	// If we've passed the byte trigger-point
	case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes:
		return true
	default:
		return false
	}
}
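
// empty reports whether no messages are currently buffered.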
func (ps *produceSet) empty() bool {
	return ps.bufferCount == 0
}
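
// Illustrative usage sketch (not part of this file): the async producer's
// broker loop drives a produceSet roughly like this, where p, buf, and send
// are hypothetical placeholders for the parent producer, an incoming message
// channel, and a network send:
//
//	set := newProduceSet(p)
//	for msg := range buf {
//		if set.wouldOverflow(msg) {
//			send(set.buildRequest()) // flush first, to respect size limits
//			set = newProduceSet(p)
//		}
//		if err := set.add(msg); err != nil {
//			continue // encoding failed; the message never entered the set
//		}
//		if set.readyToFlush() {
//			send(set.buildRequest())
//			set = newProduceSet(p)
//		}
//	}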