blob: 26936d9688fc53ea8bf6166386e4585eff623a61 [file] [log] [blame]
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +00001package sarama
2
3import (
4 "sort"
5 "time"
6)
7
// AbortedTransaction is one entry of the aborted-transactions list
// carried by fetch responses (version 4 and later).
type AbortedTransaction struct {
	// ProducerID identifies the producer whose transaction was aborted.
	ProducerID int64
	// FirstOffset is the first offset of the aborted transaction within
	// the partition; getAbortedTransactions sorts entries by this field.
	FirstOffset int64
}
12
13func (t *AbortedTransaction) decode(pd packetDecoder) (err error) {
14 if t.ProducerID, err = pd.getInt64(); err != nil {
15 return err
16 }
17
18 if t.FirstOffset, err = pd.getInt64(); err != nil {
19 return err
20 }
21
22 return nil
23}
24
25func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
26 pe.putInt64(t.ProducerID)
27 pe.putInt64(t.FirstOffset)
28
29 return nil
30}
31
// FetchResponseBlock holds the data returned for a single partition in a
// fetch response.
type FetchResponseBlock struct {
	// Err is the partition-level error code.
	Err KError
	// HighWaterMarkOffset is the partition high watermark sent by the broker.
	HighWaterMarkOffset int64
	// LastStableOffset is only present on the wire for versions >= 4.
	LastStableOffset int64
	// LogStartOffset is only present on the wire for versions >= 5.
	LogStartOffset int64
	// AbortedTransactions lists aborted transactions (versions >= 4);
	// nil when the broker sent a null array.
	AbortedTransactions []*AbortedTransaction
	Records             *Records // deprecated: use FetchResponseBlock.RecordsSet
	// RecordsSet holds all decoded record sets for the partition.
	RecordsSet []*Records
	// Partial is set when the very first record set could not be fully
	// decoded (ErrInsufficientData with nothing decoded yet).
	Partial bool
}
42
// decode reads one partition's fetch-response data from pd.
//
// Wire layout (version-dependent):
//   error code (int16), high watermark offset (int64),
//   [v4+] last stable offset (int64),
//   [v5+] log start offset (int64),
//   [v4+] aborted-transaction array,
//   record-set size (int32) followed by that many bytes of record sets.
func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
	tmp, err := pd.getInt16()
	if err != nil {
		return err
	}
	b.Err = KError(tmp)

	b.HighWaterMarkOffset, err = pd.getInt64()
	if err != nil {
		return err
	}

	if version >= 4 {
		b.LastStableOffset, err = pd.getInt64()
		if err != nil {
			return err
		}

		if version >= 5 {
			b.LogStartOffset, err = pd.getInt64()
			if err != nil {
				return err
			}
		}

		numTransact, err := pd.getArrayLength()
		if err != nil {
			return err
		}

		// A negative length denotes a null array; leave the slice nil then.
		if numTransact >= 0 {
			b.AbortedTransactions = make([]*AbortedTransaction, numTransact)
		}

		for i := 0; i < numTransact; i++ {
			transact := new(AbortedTransaction)
			if err = transact.decode(pd); err != nil {
				return err
			}
			b.AbortedTransactions[i] = transact
		}
	}

	recordsSize, err := pd.getInt32()
	if err != nil {
		return err
	}

	// Restrict decoding of the record sets to exactly recordsSize bytes.
	recordsDecoder, err := pd.getSubset(int(recordsSize))
	if err != nil {
		return err
	}

	b.RecordsSet = []*Records{}

	for recordsDecoder.remaining() > 0 {
		records := &Records{}
		if err := records.decode(recordsDecoder); err != nil {
			// If we have at least one decoded records, this is not an error
			if err == ErrInsufficientData {
				if len(b.RecordsSet) == 0 {
					b.Partial = true
				}
				break
			}
			return err
		}

		partial, err := records.isPartial()
		if err != nil {
			return err
		}

		n, err := records.numRecords()
		if err != nil {
			return err
		}

		// Keep the set if it holds records, or if it is the first set and
		// merely partial — a lone truncated set still carries data.
		if n > 0 || (partial && len(b.RecordsSet) == 0) {
			b.RecordsSet = append(b.RecordsSet, records)

			if b.Records == nil {
				b.Records = records // keep the deprecated single-set field in sync
			}
		}

		overflow, err := records.isOverflow()
		if err != nil {
			return err
		}

		// A partial or overflowing set is necessarily the last one.
		if partial || overflow {
			break
		}
	}

	return nil
}
141
142func (b *FetchResponseBlock) numRecords() (int, error) {
143 sum := 0
144
145 for _, records := range b.RecordsSet {
146 count, err := records.numRecords()
147 if err != nil {
148 return 0, err
149 }
150
151 sum += count
152 }
153
154 return sum, nil
155}
156
157func (b *FetchResponseBlock) isPartial() (bool, error) {
158 if b.Partial {
159 return true, nil
160 }
161
162 if len(b.RecordsSet) == 1 {
163 return b.RecordsSet[0].isPartial()
164 }
165
166 return false, nil
167}
168
// encode writes this partition block to pe, mirroring the layout that
// decode reads: error code, high watermark, then for versions >= 4 the
// last stable offset, (v5+) log start offset and the aborted-transaction
// array, and finally the length-prefixed record sets.
func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
	pe.putInt16(int16(b.Err))

	pe.putInt64(b.HighWaterMarkOffset)

	if version >= 4 {
		pe.putInt64(b.LastStableOffset)

		if version >= 5 {
			pe.putInt64(b.LogStartOffset)
		}

		if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
			return err
		}
		for _, transact := range b.AbortedTransactions {
			if err = transact.encode(pe); err != nil {
				return err
			}
		}
	}

	// The record sets are preceded by their total byte size; the pushed
	// lengthField back-fills that size when popped.
	pe.push(&lengthField{})
	for _, records := range b.RecordsSet {
		err = records.encode(pe)
		if err != nil {
			return err
		}
	}
	return pe.pop()
}
200
201func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction {
202 // I can't find any doc that guarantee the field `fetchResponse.AbortedTransactions` is ordered
203 // plus Java implementation use a PriorityQueue based on `FirstOffset`. I guess we have to order it ourself
204 at := b.AbortedTransactions
205 sort.Slice(
206 at,
207 func(i, j int) bool { return at[i].FirstOffset < at[j].FirstOffset },
208 )
209 return at
210}
211
// FetchResponse models a Kafka fetch response across all supported
// protocol versions.
type FetchResponse struct {
	// Blocks maps topic name, then partition id, to the per-partition data.
	Blocks       map[string]map[int32]*FetchResponseBlock
	// ThrottleTime is only present on the wire for versions >= 1.
	ThrottleTime time.Duration
	// ErrorCode and SessionID are only present on the wire for versions >= 7.
	ErrorCode int16
	SessionID int32
	// Version is the protocol version this response was (or will be)
	// encoded with.
	Version int16
	// LogAppendTime and Timestamp are used by the Add*WithTimestamp test
	// helpers: when LogAppendTime is set, Timestamp overrides per-message
	// timestamps.
	LogAppendTime bool
	Timestamp     time.Time
}
221
// decode reads a complete fetch response from pd using the given
// protocol version: optional throttle time (v1+), optional error code
// and session id (v7+), then the topic/partition block array.
func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
	r.Version = version

	if r.Version >= 1 {
		throttle, err := pd.getInt32()
		if err != nil {
			return err
		}
		// Wire value is milliseconds.
		r.ThrottleTime = time.Duration(throttle) * time.Millisecond
	}

	if r.Version >= 7 {
		r.ErrorCode, err = pd.getInt16()
		if err != nil {
			return err
		}
		r.SessionID, err = pd.getInt32()
		if err != nil {
			return err
		}
	}

	numTopics, err := pd.getArrayLength()
	if err != nil {
		return err
	}

	r.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics)
	for i := 0; i < numTopics; i++ {
		name, err := pd.getString()
		if err != nil {
			return err
		}

		numBlocks, err := pd.getArrayLength()
		if err != nil {
			return err
		}

		r.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)

		for j := 0; j < numBlocks; j++ {
			id, err := pd.getInt32()
			if err != nil {
				return err
			}

			block := new(FetchResponseBlock)
			err = block.decode(pd, version)
			if err != nil {
				return err
			}
			r.Blocks[name][id] = block
		}
	}

	return nil
}
280
// encode writes the response to pe in the layout that decode reads,
// gated on r.Version: throttle time (v1+), error code and session id
// (v7+), then every topic and partition block.
//
// NOTE(review): map iteration order is random, so topic/partition order
// on the wire varies between encodings; presumably no consumer relies
// on it.
func (r *FetchResponse) encode(pe packetEncoder) (err error) {
	if r.Version >= 1 {
		// Encoded as milliseconds.
		pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
	}

	if r.Version >= 7 {
		pe.putInt16(r.ErrorCode)
		pe.putInt32(r.SessionID)
	}

	err = pe.putArrayLength(len(r.Blocks))
	if err != nil {
		return err
	}

	for topic, partitions := range r.Blocks {
		err = pe.putString(topic)
		if err != nil {
			return err
		}

		err = pe.putArrayLength(len(partitions))
		if err != nil {
			return err
		}

		for id, block := range partitions {
			pe.putInt32(id)
			err = block.encode(pe, r.Version)
			if err != nil {
				return err
			}
		}
	}
	return nil
}
317
318func (r *FetchResponse) key() int16 {
319 return 1
320}
321
322func (r *FetchResponse) version() int16 {
323 return r.Version
324}
325
326func (r *FetchResponse) requiredVersion() KafkaVersion {
327 switch r.Version {
Scott Baker105df152020-04-13 15:55:14 -0700328 case 0:
329 return MinVersion
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000330 case 1:
331 return V0_9_0_0
332 case 2:
333 return V0_10_0_0
334 case 3:
335 return V0_10_1_0
Scott Baker105df152020-04-13 15:55:14 -0700336 case 4, 5:
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000337 return V0_11_0_0
Scott Baker105df152020-04-13 15:55:14 -0700338 case 6:
339 return V1_0_0_0
340 case 7:
341 return V1_1_0_0
342 case 8:
343 return V2_0_0_0
344 case 9, 10:
345 return V2_1_0_0
346 case 11:
347 return V2_3_0_0
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000348 default:
Scott Baker105df152020-04-13 15:55:14 -0700349 return MaxVersion
Dinesh Belwalkare63f7f92019-11-22 23:11:16 +0000350 }
351}
352
353func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
354 if r.Blocks == nil {
355 return nil
356 }
357
358 if r.Blocks[topic] == nil {
359 return nil
360 }
361
362 return r.Blocks[topic][partition]
363}
364
365func (r *FetchResponse) AddError(topic string, partition int32, err KError) {
366 if r.Blocks == nil {
367 r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
368 }
369 partitions, ok := r.Blocks[topic]
370 if !ok {
371 partitions = make(map[int32]*FetchResponseBlock)
372 r.Blocks[topic] = partitions
373 }
374 frb, ok := partitions[partition]
375 if !ok {
376 frb = new(FetchResponseBlock)
377 partitions[partition] = frb
378 }
379 frb.Err = err
380}
381
382func (r *FetchResponse) getOrCreateBlock(topic string, partition int32) *FetchResponseBlock {
383 if r.Blocks == nil {
384 r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
385 }
386 partitions, ok := r.Blocks[topic]
387 if !ok {
388 partitions = make(map[int32]*FetchResponseBlock)
389 r.Blocks[topic] = partitions
390 }
391 frb, ok := partitions[partition]
392 if !ok {
393 frb = new(FetchResponseBlock)
394 partitions[partition] = frb
395 }
396
397 return frb
398}
399
400func encodeKV(key, value Encoder) ([]byte, []byte) {
401 var kb []byte
402 var vb []byte
403 if key != nil {
404 kb, _ = key.Encode()
405 }
406 if value != nil {
407 vb, _ = value.Encode()
408 }
409
410 return kb, vb
411}
412
// AddMessageWithTimestamp appends one legacy (MessageSet) message to the
// topic/partition block, creating the block and record set as needed.
// version is the message format version. When r.LogAppendTime is set,
// the given timestamp is replaced by r.Timestamp.
func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) {
	frb := r.getOrCreateBlock(topic, partition)
	kb, vb := encodeKV(key, value)
	if r.LogAppendTime {
		timestamp = r.Timestamp
	}
	msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version}
	msgBlock := &MessageBlock{Msg: msg, Offset: offset}
	// All messages are accumulated into the first record set, which is
	// created lazily as a legacy MessageSet.
	if len(frb.RecordsSet) == 0 {
		records := newLegacyRecords(&MessageSet{})
		frb.RecordsSet = []*Records{&records}
	}
	set := frb.RecordsSet[0].MsgSet
	set.Messages = append(set.Messages, msgBlock)
}
428
// AddRecordWithTimestamp appends one record to the topic/partition
// block's first record batch (format v2), creating the block and batch
// as needed. The batch's FirstTimestamp is set from the first record
// added; later records store a delta against it. offset is stored as
// the record's OffsetDelta.
func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
	frb := r.getOrCreateBlock(topic, partition)
	kb, vb := encodeKV(key, value)
	if len(frb.RecordsSet) == 0 {
		records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
		frb.RecordsSet = []*Records{&records}
	}
	batch := frb.RecordsSet[0].RecordBatch
	rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
	batch.addRecord(rec)
}
440
441// AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp
442// But instead of appending 1 record to a batch, it append a new batch containing 1 record to the fetchResponse
443// Since transaction are handled on batch level (the whole batch is either committed or aborted), use this to test transactions
444func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time) {
445 frb := r.getOrCreateBlock(topic, partition)
446 kb, vb := encodeKV(key, value)
447
448 records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
449 batch := &RecordBatch{
450 Version: 2,
451 LogAppendTime: r.LogAppendTime,
452 FirstTimestamp: timestamp,
453 MaxTimestamp: r.Timestamp,
454 FirstOffset: offset,
455 LastOffsetDelta: 0,
456 ProducerID: producerID,
457 IsTransactional: isTransactional,
458 }
459 rec := &Record{Key: kb, Value: vb, OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
460 batch.addRecord(rec)
461 records.RecordBatch = batch
462
463 frb.RecordsSet = append(frb.RecordsSet, &records)
464}
465
// AddControlRecordWithTimestamp appends a new transactional control
// batch (format v2) containing a single control record of the given
// type to the topic/partition block.
func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType, timestamp time.Time) {
	frb := r.getOrCreateBlock(topic, partition)

	// batch: marked Control and IsTransactional, as control records only
	// appear in transactional control batches.
	batch := &RecordBatch{
		Version:         2,
		LogAppendTime:   r.LogAppendTime,
		FirstTimestamp:  timestamp,
		MaxTimestamp:    r.Timestamp,
		FirstOffset:     offset,
		LastOffsetDelta: 0,
		ProducerID:      producerID,
		IsTransactional: true,
		Control:         true,
	}

	// records
	records := newDefaultRecords(nil)
	records.RecordBatch = batch

	// record: the control record's key/value are encoded into fixed-size
	// buffers (4-byte key, 6-byte value) as expected by crAbort.encode.
	crAbort := ControlRecord{
		Version: 0,
		Type:    recordType,
	}
	crKey := &realEncoder{raw: make([]byte, 4)}
	crValue := &realEncoder{raw: make([]byte, 6)}
	crAbort.encode(crKey, crValue)
	rec := &Record{Key: ByteEncoder(crKey.raw), Value: ByteEncoder(crValue.raw), OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
	batch.addRecord(rec)

	frb.RecordsSet = append(frb.RecordsSet, &records)
}
499
500func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
501 r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
502}
503
504func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
505 r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
506}
507
508func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool) {
509 r.AddRecordBatchWithTimestamp(topic, partition, key, value, offset, producerID, isTransactional, time.Time{})
510}
511
512func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType) {
513 // define controlRecord key and value
514 r.AddControlRecordWithTimestamp(topic, partition, offset, producerID, recordType, time.Time{})
515}
516
517func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
518 frb := r.getOrCreateBlock(topic, partition)
519 if len(frb.RecordsSet) == 0 {
520 records := newDefaultRecords(&RecordBatch{Version: 2})
521 frb.RecordsSet = []*Records{&records}
522 }
523 batch := frb.RecordsSet[0].RecordBatch
524 batch.LastOffsetDelta = offset
525}
526
527func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) {
528 frb := r.getOrCreateBlock(topic, partition)
529 frb.LastStableOffset = offset
530}