blob: c653763eca835a7b89030c8bd6c48b6007c5d3c9 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "fmt"
5 "time"
6)
7
// recordBatchOverhead is the number of bytes subtracted from the decoded
// batch length to obtain the size of the raw records payload (see
// RecordBatch.decode).
//
// NOTE(review): this presumably equals the summed size of the fixed header
// fields that sit between the batch length and the records payload
// (4+1+4+2+4+8+8+8+2+4+4 = 49) - confirm against the Kafka record batch
// layout before changing it.
const recordBatchOverhead = 49
9
10type recordsArray []*Record
11
12func (e recordsArray) encode(pe packetEncoder) error {
13 for _, r := range e {
14 if err := r.encode(pe); err != nil {
15 return err
16 }
17 }
18 return nil
19}
20
21func (e recordsArray) decode(pd packetDecoder) error {
22 for i := range e {
23 rec := &Record{}
24 if err := rec.decode(pd); err != nil {
25 return err
26 }
27 e[i] = rec
28 }
29 return nil
30}
31
32type RecordBatch struct {
33 FirstOffset int64
34 PartitionLeaderEpoch int32
35 Version int8
36 Codec CompressionCodec
37 CompressionLevel int
38 Control bool
William Kurkiandaa6bb22019-03-07 12:26:28 -050039 LogAppendTime bool
khenaidooac637102019-01-14 15:44:34 -050040 LastOffsetDelta int32
41 FirstTimestamp time.Time
42 MaxTimestamp time.Time
43 ProducerID int64
44 ProducerEpoch int16
45 FirstSequence int32
46 Records []*Record
47 PartialTrailingRecord bool
Scott Baker8461e152019-10-01 14:44:30 -070048 IsTransactional bool
khenaidooac637102019-01-14 15:44:34 -050049
50 compressedRecords []byte
51 recordsLen int // uncompressed records size
52}
53
Scott Baker8461e152019-10-01 14:44:30 -070054func (b *RecordBatch) LastOffset() int64 {
55 return b.FirstOffset + int64(b.LastOffsetDelta)
56}
57
khenaidooac637102019-01-14 15:44:34 -050058func (b *RecordBatch) encode(pe packetEncoder) error {
59 if b.Version != 2 {
60 return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
61 }
62 pe.putInt64(b.FirstOffset)
63 pe.push(&lengthField{})
64 pe.putInt32(b.PartitionLeaderEpoch)
65 pe.putInt8(b.Version)
66 pe.push(newCRC32Field(crcCastagnoli))
67 pe.putInt16(b.computeAttributes())
68 pe.putInt32(b.LastOffsetDelta)
69
70 if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
71 return err
72 }
73
74 if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
75 return err
76 }
77
78 pe.putInt64(b.ProducerID)
79 pe.putInt16(b.ProducerEpoch)
80 pe.putInt32(b.FirstSequence)
81
82 if err := pe.putArrayLength(len(b.Records)); err != nil {
83 return err
84 }
85
86 if b.compressedRecords == nil {
87 if err := b.encodeRecords(pe); err != nil {
88 return err
89 }
90 }
91 if err := pe.putRawBytes(b.compressedRecords); err != nil {
92 return err
93 }
94
95 if err := pe.pop(); err != nil {
96 return err
97 }
98 return pe.pop()
99}
100
101func (b *RecordBatch) decode(pd packetDecoder) (err error) {
102 if b.FirstOffset, err = pd.getInt64(); err != nil {
103 return err
104 }
105
106 batchLen, err := pd.getInt32()
107 if err != nil {
108 return err
109 }
110
111 if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
112 return err
113 }
114
115 if b.Version, err = pd.getInt8(); err != nil {
116 return err
117 }
118
Scott Baker8461e152019-10-01 14:44:30 -0700119 crc32Decoder := acquireCrc32Field(crcCastagnoli)
120 defer releaseCrc32Field(crc32Decoder)
121
122 if err = pd.push(crc32Decoder); err != nil {
khenaidooac637102019-01-14 15:44:34 -0500123 return err
124 }
125
126 attributes, err := pd.getInt16()
127 if err != nil {
128 return err
129 }
130 b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
131 b.Control = attributes&controlMask == controlMask
William Kurkiandaa6bb22019-03-07 12:26:28 -0500132 b.LogAppendTime = attributes&timestampTypeMask == timestampTypeMask
Scott Baker8461e152019-10-01 14:44:30 -0700133 b.IsTransactional = attributes&isTransactionalMask == isTransactionalMask
khenaidooac637102019-01-14 15:44:34 -0500134
135 if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
136 return err
137 }
138
139 if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
140 return err
141 }
142
143 if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
144 return err
145 }
146
147 if b.ProducerID, err = pd.getInt64(); err != nil {
148 return err
149 }
150
151 if b.ProducerEpoch, err = pd.getInt16(); err != nil {
152 return err
153 }
154
155 if b.FirstSequence, err = pd.getInt32(); err != nil {
156 return err
157 }
158
159 numRecs, err := pd.getArrayLength()
160 if err != nil {
161 return err
162 }
163 if numRecs >= 0 {
164 b.Records = make([]*Record, numRecs)
165 }
166
167 bufSize := int(batchLen) - recordBatchOverhead
168 recBuffer, err := pd.getRawBytes(bufSize)
169 if err != nil {
170 if err == ErrInsufficientData {
171 b.PartialTrailingRecord = true
172 b.Records = nil
173 return nil
174 }
175 return err
176 }
177
178 if err = pd.pop(); err != nil {
179 return err
180 }
181
182 recBuffer, err = decompress(b.Codec, recBuffer)
183 if err != nil {
184 return err
185 }
186
187 b.recordsLen = len(recBuffer)
188 err = decode(recBuffer, recordsArray(b.Records))
189 if err == ErrInsufficientData {
190 b.PartialTrailingRecord = true
191 b.Records = nil
192 return nil
193 }
194 return err
195}
196
197func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
198 var raw []byte
199 var err error
200 if raw, err = encode(recordsArray(b.Records), pe.metricRegistry()); err != nil {
201 return err
202 }
203 b.recordsLen = len(raw)
204
205 b.compressedRecords, err = compress(b.Codec, b.CompressionLevel, raw)
206 return err
207}
208
209func (b *RecordBatch) computeAttributes() int16 {
210 attr := int16(b.Codec) & int16(compressionCodecMask)
211 if b.Control {
212 attr |= controlMask
213 }
William Kurkiandaa6bb22019-03-07 12:26:28 -0500214 if b.LogAppendTime {
215 attr |= timestampTypeMask
216 }
Scott Baker8461e152019-10-01 14:44:30 -0700217 if b.IsTransactional {
218 attr |= isTransactionalMask
219 }
khenaidooac637102019-01-14 15:44:34 -0500220 return attr
221}
222
223func (b *RecordBatch) addRecord(r *Record) {
224 b.Records = append(b.Records, r)
225}