blob: b684aa4dca43702e93ed69eccbd7b561b6bfaca8 [file] [log] [blame]
Scott Bakered4efab2020-01-13 19:12:25 -08001package sarama
2
3import (
4 "encoding/binary"
5 "errors"
6 "time"
7)
8
// partitionSet holds everything buffered for a single topic/partition pair.
type partitionSet struct {
	// msgs lists the producer messages added to this partition, in the order
	// they were added.
	msgs []*ProducerMessage
	// recordsToSend is the wire-level container built up alongside msgs; add
	// fills either its RecordBatch or its MsgSet depending on the configured
	// protocol version.
	recordsToSend Records
	// bufferBytes is the running size estimate of this partition's data, as
	// accumulated by add.
	bufferBytes int
}
14
// produceSet accumulates producer messages, grouped by topic and partition,
// until they are turned into a single produce request.
type produceSet struct {
	// parent is the producer whose configuration (and transaction manager)
	// this set reads.
	parent *asyncProducer
	// msgs maps topic -> partition -> the data buffered for that partition.
	msgs map[string]map[int32]*partitionSet

	// bufferBytes is the sum of the size estimates of all buffered messages.
	bufferBytes int
	// bufferCount is the number of buffered messages across all partitions.
	bufferCount int
}
22
23func newProduceSet(parent *asyncProducer) *produceSet {
24 return &produceSet{
25 msgs: make(map[string]map[int32]*partitionSet),
26 parent: parent,
27 }
28}
29
30func (ps *produceSet) add(msg *ProducerMessage) error {
31 var err error
32 var key, val []byte
33
34 if msg.Key != nil {
35 if key, err = msg.Key.Encode(); err != nil {
36 return err
37 }
38 }
39
40 if msg.Value != nil {
41 if val, err = msg.Value.Encode(); err != nil {
42 return err
43 }
44 }
45
46 timestamp := msg.Timestamp
47 if timestamp.IsZero() {
48 timestamp = time.Now()
49 }
50 timestamp = timestamp.Truncate(time.Millisecond)
51
52 partitions := ps.msgs[msg.Topic]
53 if partitions == nil {
54 partitions = make(map[int32]*partitionSet)
55 ps.msgs[msg.Topic] = partitions
56 }
57
58 var size int
59
60 set := partitions[msg.Partition]
61 if set == nil {
62 if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
63 batch := &RecordBatch{
64 FirstTimestamp: timestamp,
65 Version: 2,
66 Codec: ps.parent.conf.Producer.Compression,
67 CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
68 ProducerID: ps.parent.txnmgr.producerID,
69 ProducerEpoch: ps.parent.txnmgr.producerEpoch,
70 }
71 if ps.parent.conf.Producer.Idempotent {
72 batch.FirstSequence = msg.sequenceNumber
73 }
74 set = &partitionSet{recordsToSend: newDefaultRecords(batch)}
75 size = recordBatchOverhead
76 } else {
77 set = &partitionSet{recordsToSend: newLegacyRecords(new(MessageSet))}
78 }
79 partitions[msg.Partition] = set
80 }
81 set.msgs = append(set.msgs, msg)
82
83 if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
84 if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
85 return errors.New("assertion failed: message out of sequence added to a batch")
86 }
87 // We are being conservative here to avoid having to prep encode the record
88 size += maximumRecordOverhead
89 rec := &Record{
90 Key: key,
91 Value: val,
92 TimestampDelta: timestamp.Sub(set.recordsToSend.RecordBatch.FirstTimestamp),
93 }
94 size += len(key) + len(val)
95 if len(msg.Headers) > 0 {
96 rec.Headers = make([]*RecordHeader, len(msg.Headers))
97 for i := range msg.Headers {
98 rec.Headers[i] = &msg.Headers[i]
99 size += len(rec.Headers[i].Key) + len(rec.Headers[i].Value) + 2*binary.MaxVarintLen32
100 }
101 }
102 set.recordsToSend.RecordBatch.addRecord(rec)
103 } else {
104 msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val}
105 if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
106 msgToSend.Timestamp = timestamp
107 msgToSend.Version = 1
108 }
109 set.recordsToSend.MsgSet.addMessage(msgToSend)
110 size = producerMessageOverhead + len(key) + len(val)
111 }
112
113 set.bufferBytes += size
114 ps.bufferBytes += size
115 ps.bufferCount++
116
117 return nil
118}
119
120func (ps *produceSet) buildRequest() *ProduceRequest {
121 req := &ProduceRequest{
122 RequiredAcks: ps.parent.conf.Producer.RequiredAcks,
123 Timeout: int32(ps.parent.conf.Producer.Timeout / time.Millisecond),
124 }
125 if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
126 req.Version = 2
127 }
128 if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
129 req.Version = 3
130 }
131
132 for topic, partitionSets := range ps.msgs {
133 for partition, set := range partitionSets {
134 if req.Version >= 3 {
135 // If the API version we're hitting is 3 or greater, we need to calculate
136 // offsets for each record in the batch relative to FirstOffset.
137 // Additionally, we must set LastOffsetDelta to the value of the last offset
138 // in the batch. Since the OffsetDelta of the first record is 0, we know that the
139 // final record of any batch will have an offset of (# of records in batch) - 1.
140 // (See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
141 // under the RecordBatch section for details.)
142 rb := set.recordsToSend.RecordBatch
143 if len(rb.Records) > 0 {
144 rb.LastOffsetDelta = int32(len(rb.Records) - 1)
145 for i, record := range rb.Records {
146 record.OffsetDelta = int64(i)
147 }
148 }
149 req.AddBatch(topic, partition, rb)
150 continue
151 }
152 if ps.parent.conf.Producer.Compression == CompressionNone {
153 req.AddSet(topic, partition, set.recordsToSend.MsgSet)
154 } else {
155 // When compression is enabled, the entire set for each partition is compressed
156 // and sent as the payload of a single fake "message" with the appropriate codec
157 // set and no key. When the server sees a message with a compression codec, it
158 // decompresses the payload and treats the result as its message set.
159
160 if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
161 // If our version is 0.10 or later, assign relative offsets
162 // to the inner messages. This lets the broker avoid
163 // recompressing the message set.
164 // (See https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets
165 // for details on relative offsets.)
166 for i, msg := range set.recordsToSend.MsgSet.Messages {
167 msg.Offset = int64(i)
168 }
169 }
170 payload, err := encode(set.recordsToSend.MsgSet, ps.parent.conf.MetricRegistry)
171 if err != nil {
172 Logger.Println(err) // if this happens, it's basically our fault.
173 panic(err)
174 }
175 compMsg := &Message{
176 Codec: ps.parent.conf.Producer.Compression,
177 CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
178 Key: nil,
179 Value: payload,
180 Set: set.recordsToSend.MsgSet, // Provide the underlying message set for accurate metrics
181 }
182 if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
183 compMsg.Version = 1
184 compMsg.Timestamp = set.recordsToSend.MsgSet.Messages[0].Msg.Timestamp
185 }
186 req.AddMessage(topic, partition, compMsg)
187 }
188 }
189 }
190
191 return req
192}
193
194func (ps *produceSet) eachPartition(cb func(topic string, partition int32, pSet *partitionSet)) {
195 for topic, partitionSet := range ps.msgs {
196 for partition, set := range partitionSet {
197 cb(topic, partition, set)
198 }
199 }
200}
201
202func (ps *produceSet) dropPartition(topic string, partition int32) []*ProducerMessage {
203 if ps.msgs[topic] == nil {
204 return nil
205 }
206 set := ps.msgs[topic][partition]
207 if set == nil {
208 return nil
209 }
210 ps.bufferBytes -= set.bufferBytes
211 ps.bufferCount -= len(set.msgs)
212 delete(ps.msgs[topic], partition)
213 return set.msgs
214}
215
216func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool {
217 version := 1
218 if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
219 version = 2
220 }
221
222 switch {
223 // Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
224 case ps.bufferBytes+msg.byteSize(version) >= int(MaxRequestSize-(10*1024)):
225 return true
226 // Would we overflow the size-limit of a message-batch for this partition?
227 case ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil &&
228 ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes:
229 return true
230 // Would we overflow simply in number of messages?
231 case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages:
232 return true
233 default:
234 return false
235 }
236}
237
238func (ps *produceSet) readyToFlush() bool {
239 switch {
240 // If we don't have any messages, nothing else matters
241 case ps.empty():
242 return false
243 // If all three config values are 0, we always flush as-fast-as-possible
244 case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0:
245 return true
246 // If we've passed the message trigger-point
247 case ps.parent.conf.Producer.Flush.Messages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.Messages:
248 return true
249 // If we've passed the byte trigger-point
250 case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes:
251 return true
252 default:
253 return false
254 }
255}
256
// empty reports whether the set currently holds no messages, judged by the
// running message counter rather than by inspecting the topic map.
func (ps *produceSet) empty() bool {
	return ps.bufferCount == 0
}