package sarama

import (
	"fmt"
	"time"
)

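// recordBatchOverhead is the byte size of the v2 batch header fields that are
// covered by the batch length but precede the records payload: partition
// leader epoch (4) + magic (1) + CRC (4) + attributes (2) + last offset delta
// (4) + first and max timestamps (8 + 8) + producer ID (8) + producer epoch
// (2) + first sequence (4) + record count (4) = 49 bytes.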
const recordBatchOverhead = 49

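// recordsArray wraps a slice of Records so the whole set can be passed through
// the generic encode/decode helpers; decode expects the slice to already be
// allocated to the number of records in the batch.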
type recordsArray []*Record

func (e recordsArray) encode(pe packetEncoder) error {
	for _, r := range e {
		if err := r.encode(pe); err != nil {
			return err
		}
	}
	return nil
}

func (e recordsArray) decode(pd packetDecoder) error {
	for i := range e {
		rec := &Record{}
		if err := rec.decode(pd); err != nil {
			return err
		}
		e[i] = rec
	}
	return nil
}

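// RecordBatch is the v2 (magic byte 2) Kafka message format: a batch header
// carrying compression, control, producer and timestamp metadata, followed by
// a set of Records. PartialTrailingRecord is set by decode when the raw record
// data is shorter than the batch header claims, typically because the broker
// truncated the response at the fetch size limit.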
type RecordBatch struct {
	FirstOffset           int64
	PartitionLeaderEpoch  int32
	Version               int8
	Codec                 CompressionCodec
	CompressionLevel      int
	Control               bool
	LastOffsetDelta       int32
	FirstTimestamp        time.Time
	MaxTimestamp          time.Time
	ProducerID            int64
	ProducerEpoch         int16
	FirstSequence         int32
	Records               []*Record
	PartialTrailingRecord bool

	compressedRecords []byte
	recordsLen        int // uncompressed records size
}

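// encode writes the batch in the v2 wire format. The batch length and the
// CRC-32C over the header and records are filled in by the pushed lengthField
// and crc32Field when they are popped. Records are encoded and compressed
// lazily on the first call, then the cached compressedRecords are reused.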
func (b *RecordBatch) encode(pe packetEncoder) error {
	if b.Version != 2 {
		return PacketEncodingError{fmt.Sprintf("unsupported record batch version (%d)", b.Version)}
	}
	pe.putInt64(b.FirstOffset)
	pe.push(&lengthField{})
	pe.putInt32(b.PartitionLeaderEpoch)
	pe.putInt8(b.Version)
	pe.push(newCRC32Field(crcCastagnoli))
	pe.putInt16(b.computeAttributes())
	pe.putInt32(b.LastOffsetDelta)

	if err := (Timestamp{&b.FirstTimestamp}).encode(pe); err != nil {
		return err
	}

	if err := (Timestamp{&b.MaxTimestamp}).encode(pe); err != nil {
		return err
	}

	pe.putInt64(b.ProducerID)
	pe.putInt16(b.ProducerEpoch)
	pe.putInt32(b.FirstSequence)

	if err := pe.putArrayLength(len(b.Records)); err != nil {
		return err
	}

	if b.compressedRecords == nil {
		if err := b.encodeRecords(pe); err != nil {
			return err
		}
	}
	if err := pe.putRawBytes(b.compressedRecords); err != nil {
		return err
	}

	if err := pe.pop(); err != nil {
		return err
	}
	return pe.pop()
}

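// decode reads a v2 batch: it verifies the CRC-32C, pulls the codec and
// control flag out of the attributes, decompresses the raw record data and
// then decodes the individual records. A truncated records section
// (ErrInsufficientData) is not an error: the batch is flagged with
// PartialTrailingRecord and its records are discarded.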
func (b *RecordBatch) decode(pd packetDecoder) (err error) {
	if b.FirstOffset, err = pd.getInt64(); err != nil {
		return err
	}

	batchLen, err := pd.getInt32()
	if err != nil {
		return err
	}

	if b.PartitionLeaderEpoch, err = pd.getInt32(); err != nil {
		return err
	}

	if b.Version, err = pd.getInt8(); err != nil {
		return err
	}

	if err = pd.push(&crc32Field{polynomial: crcCastagnoli}); err != nil {
		return err
	}

	attributes, err := pd.getInt16()
	if err != nil {
		return err
	}
	b.Codec = CompressionCodec(int8(attributes) & compressionCodecMask)
	b.Control = attributes&controlMask == controlMask

	if b.LastOffsetDelta, err = pd.getInt32(); err != nil {
		return err
	}

	if err = (Timestamp{&b.FirstTimestamp}).decode(pd); err != nil {
		return err
	}

	if err = (Timestamp{&b.MaxTimestamp}).decode(pd); err != nil {
		return err
	}

	if b.ProducerID, err = pd.getInt64(); err != nil {
		return err
	}

	if b.ProducerEpoch, err = pd.getInt16(); err != nil {
		return err
	}

	if b.FirstSequence, err = pd.getInt32(); err != nil {
		return err
	}

	numRecs, err := pd.getArrayLength()
	if err != nil {
		return err
	}
	if numRecs >= 0 {
		b.Records = make([]*Record, numRecs)
	}

	bufSize := int(batchLen) - recordBatchOverhead
	recBuffer, err := pd.getRawBytes(bufSize)
	if err != nil {
		if err == ErrInsufficientData {
			b.PartialTrailingRecord = true
			b.Records = nil
			return nil
		}
		return err
	}

	if err = pd.pop(); err != nil {
		return err
	}

	recBuffer, err = decompress(b.Codec, recBuffer)
	if err != nil {
		return err
	}

	b.recordsLen = len(recBuffer)
	err = decode(recBuffer, recordsArray(b.Records))
	if err == ErrInsufficientData {
		b.PartialTrailingRecord = true
		b.Records = nil
		return nil
	}
	return err
}

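// encodeRecords serializes Records into their uncompressed wire form, records
// that size in recordsLen, and caches the result, compressed with the batch's
// codec and compression level, in compressedRecords.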
func (b *RecordBatch) encodeRecords(pe packetEncoder) error {
	var raw []byte
	var err error
	if raw, err = encode(recordsArray(b.Records), pe.metricRegistry()); err != nil {
		return err
	}
	b.recordsLen = len(raw)

	b.compressedRecords, err = compress(b.Codec, b.CompressionLevel, raw)
	return err
}

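// computeAttributes packs the compression codec into the low bits of the
// attributes field and sets the control flag when this is a control batch.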
func (b *RecordBatch) computeAttributes() int16 {
	attr := int16(b.Codec) & int16(compressionCodecMask)
	if b.Control {
		attr |= controlMask
	}
	return attr
}

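// addRecord appends r to the batch's record set.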
func (b *RecordBatch) addRecord(r *Record) {
	b.Records = append(b.Records, r)
}