blob: a36f7e6296eb7bf599e60c53fc9fe4bb4e7cdabf [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "fmt"
5 "time"
6)
7
// recordBatchOverhead is subtracted from the batch length read off the wire
// in RecordBatch.decode to obtain the size of the (possibly compressed)
// records payload.
// NOTE(review): 49 presumably equals the combined size of the fixed header
// fields that follow the length field (leader epoch, version, CRC,
// attributes, last offset delta, two timestamps, producer ID/epoch, first
// sequence and the record count) — confirm against the Kafka format spec.
const recordBatchOverhead = 49
9
// recordsArray is a slice of records that is encoded and decoded as a unit:
// its methods process the elements one after another and do not write or
// read any element count of their own.
type recordsArray []*Record
11
12func (e recordsArray) encode(pe packetEncoder) error {
13 for _, r := range e {
14 if err := r.encode(pe); err != nil {
15 return err
16 }
17 }
18 return nil
19}
20
21func (e recordsArray) decode(pd packetDecoder) error {
22 for i := range e {
23 rec := &Record{}
24 if err := rec.decode(pd); err != nil {
25 return err
26 }
27 e[i] = rec
28 }
29 return nil
30}
31
// RecordBatch holds the header fields and the records of one record batch
// as written by encode and read by decode.
type RecordBatch struct {
	// FirstOffset is the first field on the wire (int64).
	FirstOffset int64
	// PartitionLeaderEpoch follows the batch length on the wire (int32).
	PartitionLeaderEpoch int32
	// Version is the batch format version; encode rejects anything but 2.
	Version int8
	// Codec selects the compression applied to the records payload; it is
	// stored in the low bits of the attributes field.
	Codec CompressionCodec
	// CompressionLevel is passed to compress when the records are encoded.
	// It is not part of the wire format.
	CompressionLevel int
	// Control mirrors the controlMask bit of the attributes field.
	Control bool
	// LogAppendTime mirrors the timestampTypeMask bit of the attributes field.
	LogAppendTime   bool
	LastOffsetDelta int32
	FirstTimestamp  time.Time
	MaxTimestamp    time.Time
	ProducerID      int64
	ProducerEpoch   int16
	FirstSequence   int32
	// Records are the decoded (or to-be-encoded) records of the batch.
	Records []*Record
	// PartialTrailingRecord is set by decode when the input ends before the
	// whole batch could be read; Records is nil in that case.
	PartialTrailingRecord bool

	// compressedRecords caches the encoded and compressed records payload
	// produced by encodeRecords; encode reuses it when it is non-nil.
	compressedRecords []byte
	recordsLen        int // uncompressed records size
}
52
53func (b *RecordBatch) encode(pe packetEncoder) error {
54 if b.Version != 2 {
55 return PacketEncodingError{fmt.Sprintf("unsupported compression codec (%d)", b.Codec)}
56 }
57 pe.putInt64(b.FirstOffset)
58 pe.push(&lengthField{})
59 pe.putInt32(b.PartitionLeaderEpoch)
60 pe.putInt8(b.Version)
61 pe.push(newCRC32Field(crcCastagnoli))
62 pe.putInt16(b.computeAttributes())
63 pe.putInt32(b.LastOffsetDelta)
64
65 if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
66 return err
67 }
68
69 if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
70 return err
71 }
72
73 pe.putInt64(b.ProducerID)
74 pe.putInt16(b.ProducerEpoch)
75 pe.putInt32(b.FirstSequence)
76
77 if err := pe.putArrayLength(len(b.Records)); err != nil {
78 return err
79 }
80
81 if b.compressedRecords == nil {
82 if err := b.encodeRecords(pe); err != nil {
83 return err
84 }
85 }
86 if err := pe.putRawBytes(b.compressedRecords); err != nil {
87 return err
88 }
89
90 if err := pe.pop(); err != nil {
91 return err
92 }
93 return pe.pop()
94}
95
96func (b *RecordBatch) decode(pd packetDecoder) (err error) {
97 if b.FirstOffset, err = pd.getInt64(); err != nil {
98 return err
99 }
100
101 batchLen, err := pd.getInt32()
102 if err != nil {
103 return err
104 }
105
106 if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
107 return err
108 }
109
110 if b.Version, err = pd.getInt8(); err != nil {
111 return err
112 }
113
114 if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
115 return err
116 }
117
118 attributes, err := pd.getInt16()
119 if err != nil {
120 return err
121 }
122 b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
123 b.Control = attributes&controlMask == controlMask
William Kurkiandaa6bb22019-03-07 12:26:28 -0500124 b.LogAppendTime = attributes&timestampTypeMask == timestampTypeMask
khenaidooac637102019-01-14 15:44:34 -0500125
126 if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
127 return err
128 }
129
130 if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
131 return err
132 }
133
134 if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
135 return err
136 }
137
138 if b.ProducerID, err = pd.getInt64(); err != nil {
139 return err
140 }
141
142 if b.ProducerEpoch, err = pd.getInt16(); err != nil {
143 return err
144 }
145
146 if b.FirstSequence, err = pd.getInt32(); err != nil {
147 return err
148 }
149
150 numRecs, err := pd.getArrayLength()
151 if err != nil {
152 return err
153 }
154 if numRecs >= 0 {
155 b.Records = make([]*Record, numRecs)
156 }
157
158 bufSize := int(batchLen) - recordBatchOverhead
159 recBuffer, err := pd.getRawBytes(bufSize)
160 if err != nil {
161 if err == ErrInsufficientData {
162 b.PartialTrailingRecord = true
163 b.Records = nil
164 return nil
165 }
166 return err
167 }
168
169 if err = pd.pop(); err != nil {
170 return err
171 }
172
173 recBuffer, err = decompress(b.Codec, recBuffer)
174 if err != nil {
175 return err
176 }
177
178 b.recordsLen = len(recBuffer)
179 err = decode(recBuffer, recordsArray(b.Records))
180 if err == ErrInsufficientData {
181 b.PartialTrailingRecord = true
182 b.Records = nil
183 return nil
184 }
185 return err
186}
187
188func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
189 var raw []byte
190 var err error
191 if raw, err = encode(recordsArray(b.Records), pe.metricRegistry()); err != nil {
192 return err
193 }
194 b.recordsLen = len(raw)
195
196 b.compressedRecords, err = compress(b.Codec, b.CompressionLevel, raw)
197 return err
198}
199
200func (b *RecordBatch) computeAttributes() int16 {
201 attr := int16(b.Codec) & int16(compressionCodecMask)
202 if b.Control {
203 attr |= controlMask
204 }
William Kurkiandaa6bb22019-03-07 12:26:28 -0500205 if b.LogAppendTime {
206 attr |= timestampTypeMask
207 }
khenaidooac637102019-01-14 15:44:34 -0500208 return attr
209}
210
211func (b *RecordBatch) addRecord(r *Record) {
212 b.Records = append(b.Records, r)
213}