blob: c653763eca835a7b89030c8bd6c48b6007c5d3c9 [file] [log] [blame]
Scott Bakered4efab2020-01-13 19:12:25 -08001package sarama
2
3import (
4 "fmt"
5 "time"
6)
7
// recordBatchOverhead is the number of bytes that sit between the batch
// length field and the start of the records payload. decode subtracts it from
// the batch length to obtain the size of the (possibly compressed) payload.
//
// The widths below follow the order in which decode reads the header fields.
// NOTE(review): the CRC, timestamp and record-count widths are taken from the
// Kafka record batch format rather than from code visible in this file.
const recordBatchOverhead = 4 + // partition leader epoch (int32)
	1 + // version (int8)
	4 + // CRC32 checksum
	2 + // attributes (int16)
	4 + // last offset delta (int32)
	8 + // first timestamp
	8 + // max timestamp
	8 + // producer ID (int64)
	2 + // producer epoch (int16)
	4 + // first sequence (int32)
	4 // record count
9
10type recordsArray []*Record
11
12func (e recordsArray) encode(pe packetEncoder) error {
13 for _, r := range e {
14 if err := r.encode(pe); err != nil {
15 return err
16 }
17 }
18 return nil
19}
20
21func (e recordsArray) decode(pd packetDecoder) error {
22 for i := range e {
23 rec := &Record{}
24 if err := rec.decode(pd); err != nil {
25 return err
26 }
27 e[i] = rec
28 }
29 return nil
30}
31
32type RecordBatch struct {
33 FirstOffset int64
34 PartitionLeaderEpoch int32
35 Version int8
36 Codec CompressionCodec
37 CompressionLevel int
38 Control bool
39 LogAppendTime bool
40 LastOffsetDelta int32
41 FirstTimestamp time.Time
42 MaxTimestamp time.Time
43 ProducerID int64
44 ProducerEpoch int16
45 FirstSequence int32
46 Records []*Record
47 PartialTrailingRecord bool
48 IsTransactional bool
49
50 compressedRecords []byte
51 recordsLen int // uncompressed records size
52}
53
54func (b *RecordBatch) LastOffset() int64 {
55 return b.FirstOffset + int64(b.LastOffsetDelta)
56}
57
58func (b *RecordBatch) encode(pe packetEncoder) error {
59 if b.Version != 2 {
60 return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
61 }
62 pe.putInt64(b.FirstOffset)
63 pe.push(&lengthField{})
64 pe.putInt32(b.PartitionLeaderEpoch)
65 pe.putInt8(b.Version)
66 pe.push(newCRC32Field(crcCastagnoli))
67 pe.putInt16(b.computeAttributes())
68 pe.putInt32(b.LastOffsetDelta)
69
70 if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
71 return err
72 }
73
74 if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
75 return err
76 }
77
78 pe.putInt64(b.ProducerID)
79 pe.putInt16(b.ProducerEpoch)
80 pe.putInt32(b.FirstSequence)
81
82 if err := pe.putArrayLength(len(b.Records)); err != nil {
83 return err
84 }
85
86 if b.compressedRecords == nil {
87 if err := b.encodeRecords(pe); err != nil {
88 return err
89 }
90 }
91 if err := pe.putRawBytes(b.compressedRecords); err != nil {
92 return err
93 }
94
95 if err := pe.pop(); err != nil {
96 return err
97 }
98 return pe.pop()
99}
100
101func (b *RecordBatch) decode(pd packetDecoder) (err error) {
102 if b.FirstOffset, err = pd.getInt64(); err != nil {
103 return err
104 }
105
106 batchLen, err := pd.getInt32()
107 if err != nil {
108 return err
109 }
110
111 if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
112 return err
113 }
114
115 if b.Version, err = pd.getInt8(); err != nil {
116 return err
117 }
118
119 crc32Decoder := acquireCrc32Field(crcCastagnoli)
120 defer releaseCrc32Field(crc32Decoder)
121
122 if err = pd.push(crc32Decoder); err != nil {
123 return err
124 }
125
126 attributes, err := pd.getInt16()
127 if err != nil {
128 return err
129 }
130 b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
131 b.Control = attributes&controlMask == controlMask
132 b.LogAppendTime = attributes&timestampTypeMask == timestampTypeMask
133 b.IsTransactional = attributes&isTransactionalMask == isTransactionalMask
134
135 if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
136 return err
137 }
138
139 if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
140 return err
141 }
142
143 if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
144 return err
145 }
146
147 if b.ProducerID, err = pd.getInt64(); err != nil {
148 return err
149 }
150
151 if b.ProducerEpoch, err = pd.getInt16(); err != nil {
152 return err
153 }
154
155 if b.FirstSequence, err = pd.getInt32(); err != nil {
156 return err
157 }
158
159 numRecs, err := pd.getArrayLength()
160 if err != nil {
161 return err
162 }
163 if numRecs >= 0 {
164 b.Records = make([]*Record, numRecs)
165 }
166
167 bufSize := int(batchLen) - recordBatchOverhead
168 recBuffer, err := pd.getRawBytes(bufSize)
169 if err != nil {
170 if err == ErrInsufficientData {
171 b.PartialTrailingRecord = true
172 b.Records = nil
173 return nil
174 }
175 return err
176 }
177
178 if err = pd.pop(); err != nil {
179 return err
180 }
181
182 recBuffer, err = decompress(b.Codec, recBuffer)
183 if err != nil {
184 return err
185 }
186
187 b.recordsLen = len(recBuffer)
188 err = decode(recBuffer, recordsArray(b.Records))
189 if err == ErrInsufficientData {
190 b.PartialTrailingRecord = true
191 b.Records = nil
192 return nil
193 }
194 return err
195}
196
197func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
198 var raw []byte
199 var err error
200 if raw, err = encode(recordsArray(b.Records), pe.metricRegistry()); err != nil {
201 return err
202 }
203 b.recordsLen = len(raw)
204
205 b.compressedRecords, err = compress(b.Codec, b.CompressionLevel, raw)
206 return err
207}
208
209func (b *RecordBatch) computeAttributes() int16 {
210 attr := int16(b.Codec) & int16(compressionCodecMask)
211 if b.Control {
212 attr |= controlMask
213 }
214 if b.LogAppendTime {
215 attr |= timestampTypeMask
216 }
217 if b.IsTransactional {
218 attr |= isTransactionalMask
219 }
220 return attr
221}
222
223func (b *RecordBatch) addRecord(r *Record) {
224 b.Records = append(b.Records, r)
225}