blob: 54b88284ad98f69212ba462fa61c7b377de4d7dd [file] [log] [blame]
Scott Bakered4efab2020-01-13 19:12:25 -08001package sarama
2
3import (
4 "sort"
5 "time"
6)
7
// AbortedTransaction is one entry of the aborted-transactions array carried by
// a FetchResponseBlock. On the wire it is two consecutive int64 values, in the
// order of the fields below.
type AbortedTransaction struct {
	// ProducerID is the first int64 of the entry.
	ProducerID int64
	// FirstOffset is the second int64 of the entry; it is the key used when
	// the aborted transactions of a block are sorted.
	FirstOffset int64
}
12
13func (t *AbortedTransaction) decode(pd packetDecoder) (err error) {
14 if t.ProducerID, err = pd.getInt64(); err != nil {
15 return err
16 }
17
18 if t.FirstOffset, err = pd.getInt64(); err != nil {
19 return err
20 }
21
22 return nil
23}
24
25func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
26 pe.putInt64(t.ProducerID)
27 pe.putInt64(t.FirstOffset)
28
29 return nil
30}
31
32type FetchResponseBlock struct {
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000033 Err KError
34 HighWaterMarkOffset int64
35 LastStableOffset int64
36 LogStartOffset int64
37 AbortedTransactions []*AbortedTransaction
38 PreferredReadReplica int32
39 Records *Records // deprecated: use FetchResponseBlock.RecordsSet
40 RecordsSet []*Records
41 Partial bool
Scott Bakered4efab2020-01-13 19:12:25 -080042}
43
44func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
45 tmp, err := pd.getInt16()
46 if err != nil {
47 return err
48 }
49 b.Err = KError(tmp)
50
51 b.HighWaterMarkOffset, err = pd.getInt64()
52 if err != nil {
53 return err
54 }
55
56 if version >= 4 {
57 b.LastStableOffset, err = pd.getInt64()
58 if err != nil {
59 return err
60 }
61
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000062 if version >= 5 {
63 b.LogStartOffset, err = pd.getInt64()
64 if err != nil {
65 return err
66 }
67 }
68
Scott Bakered4efab2020-01-13 19:12:25 -080069 numTransact, err := pd.getArrayLength()
70 if err != nil {
71 return err
72 }
73
74 if numTransact >= 0 {
75 b.AbortedTransactions = make([]*AbortedTransaction, numTransact)
76 }
77
78 for i := 0; i < numTransact; i++ {
79 transact := new(AbortedTransaction)
80 if err = transact.decode(pd); err != nil {
81 return err
82 }
83 b.AbortedTransactions[i] = transact
84 }
85 }
86
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000087 if version >= 11 {
88 b.PreferredReadReplica, err = pd.getInt32()
89 if err != nil {
90 return err
91 }
92 } else {
93 b.PreferredReadReplica = -1
94 }
95
Scott Bakered4efab2020-01-13 19:12:25 -080096 recordsSize, err := pd.getInt32()
97 if err != nil {
98 return err
99 }
100
101 recordsDecoder, err := pd.getSubset(int(recordsSize))
102 if err != nil {
103 return err
104 }
105
106 b.RecordsSet = []*Records{}
107
108 for recordsDecoder.remaining() > 0 {
109 records := &Records{}
110 if err := records.decode(recordsDecoder); err != nil {
111 // If we have at least one decoded records, this is not an error
112 if err == ErrInsufficientData {
113 if len(b.RecordsSet) == 0 {
114 b.Partial = true
115 }
116 break
117 }
118 return err
119 }
120
121 partial, err := records.isPartial()
122 if err != nil {
123 return err
124 }
125
126 n, err := records.numRecords()
127 if err != nil {
128 return err
129 }
130
131 if n > 0 || (partial && len(b.RecordsSet) == 0) {
132 b.RecordsSet = append(b.RecordsSet, records)
133
134 if b.Records == nil {
135 b.Records = records
136 }
137 }
138
139 overflow, err := records.isOverflow()
140 if err != nil {
141 return err
142 }
143
144 if partial || overflow {
145 break
146 }
147 }
148
149 return nil
150}
151
152func (b *FetchResponseBlock) numRecords() (int, error) {
153 sum := 0
154
155 for _, records := range b.RecordsSet {
156 count, err := records.numRecords()
157 if err != nil {
158 return 0, err
159 }
160
161 sum += count
162 }
163
164 return sum, nil
165}
166
167func (b *FetchResponseBlock) isPartial() (bool, error) {
168 if b.Partial {
169 return true, nil
170 }
171
172 if len(b.RecordsSet) == 1 {
173 return b.RecordsSet[0].isPartial()
174 }
175
176 return false, nil
177}
178
179func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
180 pe.putInt16(int16(b.Err))
181
182 pe.putInt64(b.HighWaterMarkOffset)
183
184 if version >= 4 {
185 pe.putInt64(b.LastStableOffset)
186
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000187 if version >= 5 {
188 pe.putInt64(b.LogStartOffset)
189 }
190
Scott Bakered4efab2020-01-13 19:12:25 -0800191 if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
192 return err
193 }
194 for _, transact := range b.AbortedTransactions {
195 if err = transact.encode(pe); err != nil {
196 return err
197 }
198 }
199 }
200
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000201 if version >= 11 {
202 pe.putInt32(b.PreferredReadReplica)
203 }
204
Scott Bakered4efab2020-01-13 19:12:25 -0800205 pe.push(&lengthField{})
206 for _, records := range b.RecordsSet {
207 err = records.encode(pe)
208 if err != nil {
209 return err
210 }
211 }
212 return pe.pop()
213}
214
215func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction {
216 // I can't find any doc that guarantee the field `fetchResponse.AbortedTransactions` is ordered
217 // plus Java implementation use a PriorityQueue based on `FirstOffset`. I guess we have to order it ourself
218 at := b.AbortedTransactions
219 sort.Slice(
220 at,
221 func(i, j int) bool { return at[i].FirstOffset < at[j].FirstOffset },
222 )
223 return at
224}
225
226type FetchResponse struct {
227 Blocks map[string]map[int32]*FetchResponseBlock
228 ThrottleTime time.Duration
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000229 ErrorCode int16
230 SessionID int32
231 Version int16
Scott Bakered4efab2020-01-13 19:12:25 -0800232 LogAppendTime bool
233 Timestamp time.Time
234}
235
236func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
237 r.Version = version
238
239 if r.Version >= 1 {
240 throttle, err := pd.getInt32()
241 if err != nil {
242 return err
243 }
244 r.ThrottleTime = time.Duration(throttle) * time.Millisecond
245 }
246
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000247 if r.Version >= 7 {
248 r.ErrorCode, err = pd.getInt16()
249 if err != nil {
250 return err
251 }
252 r.SessionID, err = pd.getInt32()
253 if err != nil {
254 return err
255 }
256 }
257
Scott Bakered4efab2020-01-13 19:12:25 -0800258 numTopics, err := pd.getArrayLength()
259 if err != nil {
260 return err
261 }
262
263 r.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics)
264 for i := 0; i < numTopics; i++ {
265 name, err := pd.getString()
266 if err != nil {
267 return err
268 }
269
270 numBlocks, err := pd.getArrayLength()
271 if err != nil {
272 return err
273 }
274
275 r.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)
276
277 for j := 0; j < numBlocks; j++ {
278 id, err := pd.getInt32()
279 if err != nil {
280 return err
281 }
282
283 block := new(FetchResponseBlock)
284 err = block.decode(pd, version)
285 if err != nil {
286 return err
287 }
288 r.Blocks[name][id] = block
289 }
290 }
291
292 return nil
293}
294
295func (r *FetchResponse) encode(pe packetEncoder) (err error) {
296 if r.Version >= 1 {
297 pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
298 }
299
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000300 if r.Version >= 7 {
301 pe.putInt16(r.ErrorCode)
302 pe.putInt32(r.SessionID)
303 }
304
Scott Bakered4efab2020-01-13 19:12:25 -0800305 err = pe.putArrayLength(len(r.Blocks))
306 if err != nil {
307 return err
308 }
309
310 for topic, partitions := range r.Blocks {
311 err = pe.putString(topic)
312 if err != nil {
313 return err
314 }
315
316 err = pe.putArrayLength(len(partitions))
317 if err != nil {
318 return err
319 }
320
321 for id, block := range partitions {
322 pe.putInt32(id)
323 err = block.encode(pe, r.Version)
324 if err != nil {
325 return err
326 }
327 }
Scott Bakered4efab2020-01-13 19:12:25 -0800328 }
329 return nil
330}
331
332func (r *FetchResponse) key() int16 {
333 return 1
334}
335
336func (r *FetchResponse) version() int16 {
337 return r.Version
338}
339
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000340func (r *FetchResponse) headerVersion() int16 {
341 return 0
342}
343
Scott Bakered4efab2020-01-13 19:12:25 -0800344func (r *FetchResponse) requiredVersion() KafkaVersion {
345 switch r.Version {
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000346 case 0:
347 return MinVersion
Scott Bakered4efab2020-01-13 19:12:25 -0800348 case 1:
349 return V0_9_0_0
350 case 2:
351 return V0_10_0_0
352 case 3:
353 return V0_10_1_0
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000354 case 4, 5:
Scott Bakered4efab2020-01-13 19:12:25 -0800355 return V0_11_0_0
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000356 case 6:
357 return V1_0_0_0
358 case 7:
359 return V1_1_0_0
360 case 8:
361 return V2_0_0_0
362 case 9, 10:
363 return V2_1_0_0
364 case 11:
365 return V2_3_0_0
Scott Bakered4efab2020-01-13 19:12:25 -0800366 default:
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000367 return MaxVersion
Scott Bakered4efab2020-01-13 19:12:25 -0800368 }
369}
370
371func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
372 if r.Blocks == nil {
373 return nil
374 }
375
376 if r.Blocks[topic] == nil {
377 return nil
378 }
379
380 return r.Blocks[topic][partition]
381}
382
383func (r *FetchResponse) AddError(topic string, partition int32, err KError) {
384 if r.Blocks == nil {
385 r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
386 }
387 partitions, ok := r.Blocks[topic]
388 if !ok {
389 partitions = make(map[int32]*FetchResponseBlock)
390 r.Blocks[topic] = partitions
391 }
392 frb, ok := partitions[partition]
393 if !ok {
394 frb = new(FetchResponseBlock)
395 partitions[partition] = frb
396 }
397 frb.Err = err
398}
399
400func (r *FetchResponse) getOrCreateBlock(topic string, partition int32) *FetchResponseBlock {
401 if r.Blocks == nil {
402 r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
403 }
404 partitions, ok := r.Blocks[topic]
405 if !ok {
406 partitions = make(map[int32]*FetchResponseBlock)
407 r.Blocks[topic] = partitions
408 }
409 frb, ok := partitions[partition]
410 if !ok {
411 frb = new(FetchResponseBlock)
412 partitions[partition] = frb
413 }
414
415 return frb
416}
417
418func encodeKV(key, value Encoder) ([]byte, []byte) {
419 var kb []byte
420 var vb []byte
421 if key != nil {
422 kb, _ = key.Encode()
423 }
424 if value != nil {
425 vb, _ = value.Encode()
426 }
427
428 return kb, vb
429}
430
431func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) {
432 frb := r.getOrCreateBlock(topic, partition)
433 kb, vb := encodeKV(key, value)
434 if r.LogAppendTime {
435 timestamp = r.Timestamp
436 }
437 msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version}
438 msgBlock := &MessageBlock{Msg: msg, Offset: offset}
439 if len(frb.RecordsSet) == 0 {
440 records := newLegacyRecords(&MessageSet{})
441 frb.RecordsSet = []*Records{&records}
442 }
443 set := frb.RecordsSet[0].MsgSet
444 set.Messages = append(set.Messages, msgBlock)
445}
446
447func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
448 frb := r.getOrCreateBlock(topic, partition)
449 kb, vb := encodeKV(key, value)
450 if len(frb.RecordsSet) == 0 {
451 records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
452 frb.RecordsSet = []*Records{&records}
453 }
454 batch := frb.RecordsSet[0].RecordBatch
455 rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
456 batch.addRecord(rec)
457}
458
459// AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp
460// But instead of appending 1 record to a batch, it append a new batch containing 1 record to the fetchResponse
461// Since transaction are handled on batch level (the whole batch is either committed or aborted), use this to test transactions
462func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time) {
463 frb := r.getOrCreateBlock(topic, partition)
464 kb, vb := encodeKV(key, value)
465
466 records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
467 batch := &RecordBatch{
468 Version: 2,
469 LogAppendTime: r.LogAppendTime,
470 FirstTimestamp: timestamp,
471 MaxTimestamp: r.Timestamp,
472 FirstOffset: offset,
473 LastOffsetDelta: 0,
474 ProducerID: producerID,
475 IsTransactional: isTransactional,
476 }
477 rec := &Record{Key: kb, Value: vb, OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
478 batch.addRecord(rec)
479 records.RecordBatch = batch
480
481 frb.RecordsSet = append(frb.RecordsSet, &records)
482}
483
484func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType, timestamp time.Time) {
485 frb := r.getOrCreateBlock(topic, partition)
486
487 // batch
488 batch := &RecordBatch{
489 Version: 2,
490 LogAppendTime: r.LogAppendTime,
491 FirstTimestamp: timestamp,
492 MaxTimestamp: r.Timestamp,
493 FirstOffset: offset,
494 LastOffsetDelta: 0,
495 ProducerID: producerID,
496 IsTransactional: true,
497 Control: true,
498 }
499
500 // records
501 records := newDefaultRecords(nil)
502 records.RecordBatch = batch
503
504 // record
505 crAbort := ControlRecord{
506 Version: 0,
507 Type: recordType,
508 }
509 crKey := &realEncoder{raw: make([]byte, 4)}
510 crValue := &realEncoder{raw: make([]byte, 6)}
511 crAbort.encode(crKey, crValue)
512 rec := &Record{Key: ByteEncoder(crKey.raw), Value: ByteEncoder(crValue.raw), OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
513 batch.addRecord(rec)
514
515 frb.RecordsSet = append(frb.RecordsSet, &records)
516}
517
518func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
519 r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
520}
521
522func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
523 r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
524}
525
526func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool) {
527 r.AddRecordBatchWithTimestamp(topic, partition, key, value, offset, producerID, isTransactional, time.Time{})
528}
529
530func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType) {
531 // define controlRecord key and value
532 r.AddControlRecordWithTimestamp(topic, partition, offset, producerID, recordType, time.Time{})
533}
534
535func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
536 frb := r.getOrCreateBlock(topic, partition)
537 if len(frb.RecordsSet) == 0 {
538 records := newDefaultRecords(&RecordBatch{Version: 2})
539 frb.RecordsSet = []*Records{&records}
540 }
541 batch := frb.RecordsSet[0].RecordBatch
542 batch.LastOffsetDelta = offset
543}
544
545func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) {
546 frb := r.getOrCreateBlock(topic, partition)
547 frb.LastStableOffset = offset
548}