blob: a36f7e6296eb7bf599e60c53fc9fe4bb4e7cdabf [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
3import (
4 "fmt"
5 "time"
6)
7
8const recordBatchOverhead = 49
9
10type recordsArray []*Record
11
12func (e recordsArray) encode(pe packetEncoder) error {
13 for _, r := range e {
14 if err := r.encode(pe); err != nil {
15 return err
16 }
17 }
18 return nil
19}
20
21func (e recordsArray) decode(pd packetDecoder) error {
22 for i := range e {
23 rec := &Record{}
24 if err := rec.decode(pd); err != nil {
25 return err
26 }
27 e[i] = rec
28 }
29 return nil
30}
31
32type RecordBatch struct {
33 FirstOffset int64
34 PartitionLeaderEpoch int32
35 Version int8
36 Codec CompressionCodec
37 CompressionLevel int
38 Control bool
39 LogAppendTime bool
40 LastOffsetDelta int32
41 FirstTimestamp time.Time
42 MaxTimestamp time.Time
43 ProducerID int64
44 ProducerEpoch int16
45 FirstSequence int32
46 Records []*Record
47 PartialTrailingRecord bool
48
49 compressedRecords []byte
50 recordsLen int // uncompressed records size
51}
52
53func (b *RecordBatch) encode(pe packetEncoder) error {
54 if b.Version != 2 {
55 return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
56 }
57 pe.putInt64(b.FirstOffset)
58 pe.push(&lengthField{})
59 pe.putInt32(b.PartitionLeaderEpoch)
60 pe.putInt8(b.Version)
61 pe.push(newCRC32Field(crcCastagnoli))
62 pe.putInt16(b.computeAttributes())
63 pe.putInt32(b.LastOffsetDelta)
64
65 if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
66 return err
67 }
68
69 if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
70 return err
71 }
72
73 pe.putInt64(b.ProducerID)
74 pe.putInt16(b.ProducerEpoch)
75 pe.putInt32(b.FirstSequence)
76
77 if err := pe.putArrayLength(len(b.Records)); err != nil {
78 return err
79 }
80
81 if b.compressedRecords == nil {
82 if err := b.encodeRecords(pe); err != nil {
83 return err
84 }
85 }
86 if err := pe.putRawBytes(b.compressedRecords); err != nil {
87 return err
88 }
89
90 if err := pe.pop(); err != nil {
91 return err
92 }
93 return pe.pop()
94}
95
96func (b *RecordBatch) decode(pd packetDecoder) (err error) {
97 if b.FirstOffset, err = pd.getInt64(); err != nil {
98 return err
99 }
100
101 batchLen, err := pd.getInt32()
102 if err != nil {
103 return err
104 }
105
106 if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
107 return err
108 }
109
110 if b.Version, err = pd.getInt8(); err != nil {
111 return err
112 }
113
114 if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
115 return err
116 }
117
118 attributes, err := pd.getInt16()
119 if err != nil {
120 return err
121 }
122 b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
123 b.Control = attributes&controlMask == controlMask
124 b.LogAppendTime = attributes&timestampTypeMask == timestampTypeMask
125
126 if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
127 return err
128 }
129
130 if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
131 return err
132 }
133
134 if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
135 return err
136 }
137
138 if b.ProducerID, err = pd.getInt64(); err != nil {
139 return err
140 }
141
142 if b.ProducerEpoch, err = pd.getInt16(); err != nil {
143 return err
144 }
145
146 if b.FirstSequence, err = pd.getInt32(); err != nil {
147 return err
148 }
149
150 numRecs, err := pd.getArrayLength()
151 if err != nil {
152 return err
153 }
154 if numRecs >= 0 {
155 b.Records = make([]*Record, numRecs)
156 }
157
158 bufSize := int(batchLen) - recordBatchOverhead
159 recBuffer, err := pd.getRawBytes(bufSize)
160 if err != nil {
161 if err == ErrInsufficientData {
162 b.PartialTrailingRecord = true
163 b.Records = nil
164 return nil
165 }
166 return err
167 }
168
169 if err = pd.pop(); err != nil {
170 return err
171 }
172
173 recBuffer, err = decompress(b.Codec, recBuffer)
174 if err != nil {
175 return err
176 }
177
178 b.recordsLen = len(recBuffer)
179 err = decode(recBuffer, recordsArray(b.Records))
180 if err == ErrInsufficientData {
181 b.PartialTrailingRecord = true
182 b.Records = nil
183 return nil
184 }
185 return err
186}
187
188func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
189 var raw []byte
190 var err error
191 if raw, err = encode(recordsArray(b.Records), pe.metricRegistry()); err != nil {
192 return err
193 }
194 b.recordsLen = len(raw)
195
196 b.compressedRecords, err = compress(b.Codec, b.CompressionLevel, raw)
197 return err
198}
199
200func (b *RecordBatch) computeAttributes() int16 {
201 attr := int16(b.Codec) & int16(compressionCodecMask)
202 if b.Control {
203 attr |= controlMask
204 }
205 if b.LogAppendTime {
206 attr |= timestampTypeMask
207 }
208 return attr
209}
210
211func (b *RecordBatch) addRecord(r *Record) {
212 b.Records = append(b.Records, r)
213}