blob: 90acfc28029fab2e6afa3a4340901f7a46d797b9 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "time"
5)
6
7type AbortedTransaction struct {
8 ProducerID int64
9 FirstOffset int64
10}
11
12func (t *AbortedTransaction) decode(pd packetDecoder) (err error) {
13 if t.ProducerID, err = pd.getInt64(); err != nil {
14 return err
15 }
16
17 if t.FirstOffset, err = pd.getInt64(); err != nil {
18 return err
19 }
20
21 return nil
22}
23
24func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
25 pe.putInt64(t.ProducerID)
26 pe.putInt64(t.FirstOffset)
27
28 return nil
29}
30
31type FetchResponseBlock struct {
32 Err KError
33 HighWaterMarkOffset int64
34 LastStableOffset int64
35 AbortedTransactions []*AbortedTransaction
36 Records *Records // deprecated: use FetchResponseBlock.RecordsSet
37 RecordsSet []*Records
38 Partial bool
39}
40
41func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
42 tmp, err := pd.getInt16()
43 if err != nil {
44 return err
45 }
46 b.Err = KError(tmp)
47
48 b.HighWaterMarkOffset, err = pd.getInt64()
49 if err != nil {
50 return err
51 }
52
53 if version >= 4 {
54 b.LastStableOffset, err = pd.getInt64()
55 if err != nil {
56 return err
57 }
58
59 numTransact, err := pd.getArrayLength()
60 if err != nil {
61 return err
62 }
63
64 if numTransact >= 0 {
65 b.AbortedTransactions = make([]*AbortedTransaction, numTransact)
66 }
67
68 for i := 0; i < numTransact; i++ {
69 transact := new(AbortedTransaction)
70 if err = transact.decode(pd); err != nil {
71 return err
72 }
73 b.AbortedTransactions[i] = transact
74 }
75 }
76
77 recordsSize, err := pd.getInt32()
78 if err != nil {
79 return err
80 }
81
82 recordsDecoder, err := pd.getSubset(int(recordsSize))
83 if err != nil {
84 return err
85 }
86
87 b.RecordsSet = []*Records{}
88
89 for recordsDecoder.remaining() > 0 {
90 records := &Records{}
91 if err := records.decode(recordsDecoder); err != nil {
92 // If we have at least one decoded records, this is not an error
93 if err == ErrInsufficientData {
94 if len(b.RecordsSet) == 0 {
95 b.Partial = true
96 }
97 break
98 }
99 return err
100 }
101
102 partial, err := records.isPartial()
103 if err != nil {
104 return err
105 }
106
107 n, err := records.numRecords()
108 if err != nil {
109 return err
110 }
111
112 if n > 0 || (partial && len(b.RecordsSet) == 0) {
113 b.RecordsSet = append(b.RecordsSet, records)
114
115 if b.Records == nil {
116 b.Records = records
117 }
118 }
119
120 overflow, err := records.isOverflow()
121 if err != nil {
122 return err
123 }
124
125 if partial || overflow {
126 break
127 }
128 }
129
130 return nil
131}
132
133func (b *FetchResponseBlock) numRecords() (int, error) {
134 sum := 0
135
136 for _, records := range b.RecordsSet {
137 count, err := records.numRecords()
138 if err != nil {
139 return 0, err
140 }
141
142 sum += count
143 }
144
145 return sum, nil
146}
147
148func (b *FetchResponseBlock) isPartial() (bool, error) {
149 if b.Partial {
150 return true, nil
151 }
152
153 if len(b.RecordsSet) == 1 {
154 return b.RecordsSet[0].isPartial()
155 }
156
157 return false, nil
158}
159
160func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
161 pe.putInt16(int16(b.Err))
162
163 pe.putInt64(b.HighWaterMarkOffset)
164
165 if version >= 4 {
166 pe.putInt64(b.LastStableOffset)
167
168 if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
169 return err
170 }
171 for _, transact := range b.AbortedTransactions {
172 if err = transact.encode(pe); err != nil {
173 return err
174 }
175 }
176 }
177
178 pe.push(&lengthField{})
179 for _, records := range b.RecordsSet {
180 err = records.encode(pe)
181 if err != nil {
182 return err
183 }
184 }
185 return pe.pop()
186}
187
188type FetchResponse struct {
189 Blocks map[string]map[int32]*FetchResponseBlock
190 ThrottleTime time.Duration
191 Version int16 // v1 requires 0.9+, v2 requires 0.10+
192}
193
194func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
195 r.Version = version
196
197 if r.Version >= 1 {
198 throttle, err := pd.getInt32()
199 if err != nil {
200 return err
201 }
202 r.ThrottleTime = time.Duration(throttle) * time.Millisecond
203 }
204
205 numTopics, err := pd.getArrayLength()
206 if err != nil {
207 return err
208 }
209
210 r.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics)
211 for i := 0; i < numTopics; i++ {
212 name, err := pd.getString()
213 if err != nil {
214 return err
215 }
216
217 numBlocks, err := pd.getArrayLength()
218 if err != nil {
219 return err
220 }
221
222 r.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)
223
224 for j := 0; j < numBlocks; j++ {
225 id, err := pd.getInt32()
226 if err != nil {
227 return err
228 }
229
230 block := new(FetchResponseBlock)
231 err = block.decode(pd, version)
232 if err != nil {
233 return err
234 }
235 r.Blocks[name][id] = block
236 }
237 }
238
239 return nil
240}
241
242func (r *FetchResponse) encode(pe packetEncoder) (err error) {
243 if r.Version >= 1 {
244 pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
245 }
246
247 err = pe.putArrayLength(len(r.Blocks))
248 if err != nil {
249 return err
250 }
251
252 for topic, partitions := range r.Blocks {
253 err = pe.putString(topic)
254 if err != nil {
255 return err
256 }
257
258 err = pe.putArrayLength(len(partitions))
259 if err != nil {
260 return err
261 }
262
263 for id, block := range partitions {
264 pe.putInt32(id)
265 err = block.encode(pe, r.Version)
266 if err != nil {
267 return err
268 }
269 }
270
271 }
272 return nil
273}
274
275func (r *FetchResponse) key() int16 {
276 return 1
277}
278
279func (r *FetchResponse) version() int16 {
280 return r.Version
281}
282
283func (r *FetchResponse) requiredVersion() KafkaVersion {
284 switch r.Version {
285 case 1:
286 return V0_9_0_0
287 case 2:
288 return V0_10_0_0
289 case 3:
290 return V0_10_1_0
291 case 4:
292 return V0_11_0_0
293 default:
294 return MinVersion
295 }
296}
297
298func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
299 if r.Blocks == nil {
300 return nil
301 }
302
303 if r.Blocks[topic] == nil {
304 return nil
305 }
306
307 return r.Blocks[topic][partition]
308}
309
310func (r *FetchResponse) AddError(topic string, partition int32, err KError) {
311 if r.Blocks == nil {
312 r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
313 }
314 partitions, ok := r.Blocks[topic]
315 if !ok {
316 partitions = make(map[int32]*FetchResponseBlock)
317 r.Blocks[topic] = partitions
318 }
319 frb, ok := partitions[partition]
320 if !ok {
321 frb = new(FetchResponseBlock)
322 partitions[partition] = frb
323 }
324 frb.Err = err
325}
326
327func (r *FetchResponse) getOrCreateBlock(topic string, partition int32) *FetchResponseBlock {
328 if r.Blocks == nil {
329 r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
330 }
331 partitions, ok := r.Blocks[topic]
332 if !ok {
333 partitions = make(map[int32]*FetchResponseBlock)
334 r.Blocks[topic] = partitions
335 }
336 frb, ok := partitions[partition]
337 if !ok {
338 frb = new(FetchResponseBlock)
339 partitions[partition] = frb
340 }
341
342 return frb
343}
344
345func encodeKV(key, value Encoder) ([]byte, []byte) {
346 var kb []byte
347 var vb []byte
348 if key != nil {
349 kb, _ = key.Encode()
350 }
351 if value != nil {
352 vb, _ = value.Encode()
353 }
354
355 return kb, vb
356}
357
358func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
359 frb := r.getOrCreateBlock(topic, partition)
360 kb, vb := encodeKV(key, value)
361 msg := &Message{Key: kb, Value: vb}
362 msgBlock := &MessageBlock{Msg: msg, Offset: offset}
363 if len(frb.RecordsSet) == 0 {
364 records := newLegacyRecords(&MessageSet{})
365 frb.RecordsSet = []*Records{&records}
366 }
367 set := frb.RecordsSet[0].MsgSet
368 set.Messages = append(set.Messages, msgBlock)
369}
370
371func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
372 frb := r.getOrCreateBlock(topic, partition)
373 kb, vb := encodeKV(key, value)
374 rec := &Record{Key: kb, Value: vb, OffsetDelta: offset}
375 if len(frb.RecordsSet) == 0 {
376 records := newDefaultRecords(&RecordBatch{Version: 2})
377 frb.RecordsSet = []*Records{&records}
378 }
379 batch := frb.RecordsSet[0].RecordBatch
380 batch.addRecord(rec)
381}
382
383func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
384 frb := r.getOrCreateBlock(topic, partition)
385 if len(frb.RecordsSet) == 0 {
386 records := newDefaultRecords(&RecordBatch{Version: 2})
387 frb.RecordsSet = []*Records{&records}
388 }
389 batch := frb.RecordsSet[0].RecordBatch
390 batch.LastOffsetDelta = offset
391}
392
393func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) {
394 frb := r.getOrCreateBlock(topic, partition)
395 frb.LastStableOffset = offset
396}