blob: 4b42d9c32935929dd7491e2b36cb5be44323ae00 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
3import (
4 "encoding/binary"
5 "errors"
6 "time"
7)
8
9type partitionSet struct {
10 msgs []*ProducerMessage
11 recordsToSend Records
12 bufferBytes int
13}
14
15type produceSet struct {
16 parent *asyncProducer
17 msgs map[string]map[int32]*partitionSet
18
19 bufferBytes int
20 bufferCount int
21}
22
23func newProduceSet(parent *asyncProducer) *produceSet {
24 return &produceSet{
25 msgs: make(map[string]map[int32]*partitionSet),
26 parent: parent,
27 }
28}
29
30func (ps *produceSet) add(msg *ProducerMessage) error {
31 var err error
32 var key, val []byte
33
34 if msg.Key != nil {
35 if key, err = msg.Key.Encode(); err != nil {
36 return err
37 }
38 }
39
40 if msg.Value != nil {
41 if val, err = msg.Value.Encode(); err != nil {
42 return err
43 }
44 }
45
46 timestamp := msg.Timestamp
47 if msg.Timestamp.IsZero() {
48 timestamp = time.Now()
49 }
50
51 partitions := ps.msgs[msg.Topic]
52 if partitions == nil {
53 partitions = make(map[int32]*partitionSet)
54 ps.msgs[msg.Topic] = partitions
55 }
56
57 var size int
58
59 set := partitions[msg.Partition]
60 if set == nil {
61 if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
62 batch := &RecordBatch{
63 FirstTimestamp: timestamp,
64 Version: 2,
65 Codec: ps.parent.conf.Producer.Compression,
66 CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
67 ProducerID: ps.parent.txnmgr.producerID,
68 ProducerEpoch: ps.parent.txnmgr.producerEpoch,
69 }
70 if ps.parent.conf.Producer.Idempotent {
71 batch.FirstSequence = msg.sequenceNumber
72 }
73 set = &partitionSet{recordsToSend: newDefaultRecords(batch)}
74 size = recordBatchOverhead
75 } else {
76 set = &partitionSet{recordsToSend: newLegacyRecords(new(MessageSet))}
77 }
78 partitions[msg.Partition] = set
79 }
80 set.msgs = append(set.msgs, msg)
81
82 if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
83 if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
84 return errors.New("Assertion failed: Message out of sequence added to a batch")
85 }
86 // We are being conservative here to avoid having to prep encode the record
87 size += maximumRecordOverhead
88 rec := &Record{
89 Key: key,
90 Value: val,
91 TimestampDelta: timestamp.Sub(set.recordsToSend.RecordBatch.FirstTimestamp),
92 }
93 size += len(key) + len(val)
94 if len(msg.Headers) > 0 {
95 rec.Headers = make([]*RecordHeader, len(msg.Headers))
96 for i := range msg.Headers {
97 rec.Headers[i] = &msg.Headers[i]
98 size += len(rec.Headers[i].Key) + len(rec.Headers[i].Value) + 2*binary.MaxVarintLen32
99 }
100 }
101 set.recordsToSend.RecordBatch.addRecord(rec)
102 } else {
103 msgToSend := &Message{Codec: CompressionNone, Key: key, Value: val}
104 if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
105 msgToSend.Timestamp = timestamp
106 msgToSend.Version = 1
107 }
108 set.recordsToSend.MsgSet.addMessage(msgToSend)
109 size = producerMessageOverhead + len(key) + len(val)
110 }
111
112 set.bufferBytes += size
113 ps.bufferBytes += size
114 ps.bufferCount++
115
116 return nil
117}
118
119func (ps *produceSet) buildRequest() *ProduceRequest {
120 req := &ProduceRequest{
121 RequiredAcks: ps.parent.conf.Producer.RequiredAcks,
122 Timeout: int32(ps.parent.conf.Producer.Timeout / time.Millisecond),
123 }
124 if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
125 req.Version = 2
126 }
127 if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
128 req.Version = 3
129 }
130
131 for topic, partitionSets := range ps.msgs {
132 for partition, set := range partitionSets {
133 if req.Version >= 3 {
134 // If the API version we're hitting is 3 or greater, we need to calculate
135 // offsets for each record in the batch relative to FirstOffset.
136 // Additionally, we must set LastOffsetDelta to the value of the last offset
137 // in the batch. Since the OffsetDelta of the first record is 0, we know that the
138 // final record of any batch will have an offset of (# of records in batch) - 1.
139 // (See https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-Messagesets
140 // under the RecordBatch section for details.)
141 rb := set.recordsToSend.RecordBatch
142 if len(rb.Records) > 0 {
143 rb.LastOffsetDelta = int32(len(rb.Records) - 1)
144 for i, record := range rb.Records {
145 record.OffsetDelta = int64(i)
146 }
147 }
148 req.AddBatch(topic, partition, rb)
149 continue
150 }
151 if ps.parent.conf.Producer.Compression == CompressionNone {
152 req.AddSet(topic, partition, set.recordsToSend.MsgSet)
153 } else {
154 // When compression is enabled, the entire set for each partition is compressed
155 // and sent as the payload of a single fake "message" with the appropriate codec
156 // set and no key. When the server sees a message with a compression codec, it
157 // decompresses the payload and treats the result as its message set.
158
159 if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
160 // If our version is 0.10 or later, assign relative offsets
161 // to the inner messages. This lets the broker avoid
162 // recompressing the message set.
163 // (See https://cwiki.apache.org/confluence/display/KAFKA/KIP-31+-+Move+to+relative+offsets+in+compressed+message+sets
164 // for details on relative offsets.)
165 for i, msg := range set.recordsToSend.MsgSet.Messages {
166 msg.Offset = int64(i)
167 }
168 }
169 payload, err := encode(set.recordsToSend.MsgSet, ps.parent.conf.MetricRegistry)
170 if err != nil {
171 Logger.Println(err) // if this happens, it's basically our fault.
172 panic(err)
173 }
174 compMsg := &Message{
175 Codec: ps.parent.conf.Producer.Compression,
176 CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
177 Key: nil,
178 Value: payload,
179 Set: set.recordsToSend.MsgSet, // Provide the underlying message set for accurate metrics
180 }
181 if ps.parent.conf.Version.IsAtLeast(V0_10_0_0) {
182 compMsg.Version = 1
183 compMsg.Timestamp = set.recordsToSend.MsgSet.Messages[0].Msg.Timestamp
184 }
185 req.AddMessage(topic, partition, compMsg)
186 }
187 }
188 }
189
190 return req
191}
192
193func (ps *produceSet) eachPartition(cb func(topic string, partition int32, pSet *partitionSet)) {
194 for topic, partitionSet := range ps.msgs {
195 for partition, set := range partitionSet {
196 cb(topic, partition, set)
197 }
198 }
199}
200
201func (ps *produceSet) dropPartition(topic string, partition int32) []*ProducerMessage {
202 if ps.msgs[topic] == nil {
203 return nil
204 }
205 set := ps.msgs[topic][partition]
206 if set == nil {
207 return nil
208 }
209 ps.bufferBytes -= set.bufferBytes
210 ps.bufferCount -= len(set.msgs)
211 delete(ps.msgs[topic], partition)
212 return set.msgs
213}
214
215func (ps *produceSet) wouldOverflow(msg *ProducerMessage) bool {
216 version := 1
217 if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
218 version = 2
219 }
220
221 switch {
222 // Would we overflow our maximum possible size-on-the-wire? 10KiB is arbitrary overhead for safety.
223 case ps.bufferBytes+msg.byteSize(version) >= int(MaxRequestSize-(10*1024)):
224 return true
225 // Would we overflow the size-limit of a message-batch for this partition?
226 case ps.msgs[msg.Topic] != nil && ps.msgs[msg.Topic][msg.Partition] != nil &&
227 ps.msgs[msg.Topic][msg.Partition].bufferBytes+msg.byteSize(version) >= ps.parent.conf.Producer.MaxMessageBytes:
228 return true
229 // Would we overflow simply in number of messages?
230 case ps.parent.conf.Producer.Flush.MaxMessages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.MaxMessages:
231 return true
232 default:
233 return false
234 }
235}
236
237func (ps *produceSet) readyToFlush() bool {
238 switch {
239 // If we don't have any messages, nothing else matters
240 case ps.empty():
241 return false
242 // If all three config values are 0, we always flush as-fast-as-possible
243 case ps.parent.conf.Producer.Flush.Frequency == 0 && ps.parent.conf.Producer.Flush.Bytes == 0 && ps.parent.conf.Producer.Flush.Messages == 0:
244 return true
245 // If we've passed the message trigger-point
246 case ps.parent.conf.Producer.Flush.Messages > 0 && ps.bufferCount >= ps.parent.conf.Producer.Flush.Messages:
247 return true
248 // If we've passed the byte trigger-point
249 case ps.parent.conf.Producer.Flush.Bytes > 0 && ps.bufferBytes >= ps.parent.conf.Producer.Flush.Bytes:
250 return true
251 default:
252 return false
253 }
254}
255
256func (ps *produceSet) empty() bool {
257 return ps.bufferCount == 0
258}