blob: 1b02b00703f72d1e48734647228b0236d685d1a7 [file] [log] [blame]
kesavand2cde6582020-06-22 04:56:23 -04001package sarama
2
3import (
4 "sort"
5 "time"
6)
7
// invalidPreferredReplicaID marks the absence of a preferred read replica:
// used for responses older than v11 (field not on the wire) or when the
// broker assigns none.
const invalidPreferredReplicaID = -1
9
// AbortedTransaction identifies one aborted transaction within a fetched
// partition: the producer that started it and the first offset it covers.
type AbortedTransaction struct {
	// ProducerID is the id of the producer that aborted the transaction.
	ProducerID int64
	// FirstOffset is the first offset in the aborted transaction.
	FirstOffset int64
}
14
15func (t *AbortedTransaction) decode(pd packetDecoder) (err error) {
16 if t.ProducerID, err = pd.getInt64(); err != nil {
17 return err
18 }
19
20 if t.FirstOffset, err = pd.getInt64(); err != nil {
21 return err
22 }
23
24 return nil
25}
26
27func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
28 pe.putInt64(t.ProducerID)
29 pe.putInt64(t.FirstOffset)
30
31 return nil
32}
33
// FetchResponseBlock holds the per-partition payload of a fetch response.
type FetchResponseBlock struct {
	// Err is the partition-level error code returned by the broker.
	Err                    KError
	HighWaterMarkOffset    int64
	// LastStableOffset is only present for response version >= 4.
	LastStableOffset       int64
	// LastRecordsBatchOffset is the offset of the last record batch decoded
	// from this block; nil if no batch was decoded.
	LastRecordsBatchOffset *int64
	// LogStartOffset is only present for response version >= 5.
	LogStartOffset         int64
	AbortedTransactions    []*AbortedTransaction
	// PreferredReadReplica is only present for response version >= 11;
	// otherwise it is set to -1 during decode.
	PreferredReadReplica   int32
	Records                *Records // deprecated: use FetchResponseBlock.RecordsSet
	RecordsSet             []*Records
	// Partial is true when the block contained only an incomplete
	// (truncated) record set.
	Partial                bool
}
46
47func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
48 tmp, err := pd.getInt16()
49 if err != nil {
50 return err
51 }
52 b.Err = KError(tmp)
53
54 b.HighWaterMarkOffset, err = pd.getInt64()
55 if err != nil {
56 return err
57 }
58
59 if version >= 4 {
60 b.LastStableOffset, err = pd.getInt64()
61 if err != nil {
62 return err
63 }
64
kesavandc71914f2022-03-25 11:19:03 +053065 if version >= 5 {
66 b.LogStartOffset, err = pd.getInt64()
67 if err != nil {
68 return err
69 }
70 }
71
kesavand2cde6582020-06-22 04:56:23 -040072 numTransact, err := pd.getArrayLength()
73 if err != nil {
74 return err
75 }
76
77 if numTransact >= 0 {
78 b.AbortedTransactions = make([]*AbortedTransaction, numTransact)
79 }
80
81 for i := 0; i < numTransact; i++ {
82 transact := new(AbortedTransaction)
83 if err = transact.decode(pd); err != nil {
84 return err
85 }
86 b.AbortedTransactions[i] = transact
87 }
88 }
89
kesavandc71914f2022-03-25 11:19:03 +053090 if version >= 11 {
91 b.PreferredReadReplica, err = pd.getInt32()
92 if err != nil {
93 return err
94 }
95 } else {
96 b.PreferredReadReplica = -1
97 }
98
kesavand2cde6582020-06-22 04:56:23 -040099 recordsSize, err := pd.getInt32()
100 if err != nil {
101 return err
102 }
103
104 recordsDecoder, err := pd.getSubset(int(recordsSize))
105 if err != nil {
106 return err
107 }
108
109 b.RecordsSet = []*Records{}
110
111 for recordsDecoder.remaining() > 0 {
112 records := &Records{}
113 if err := records.decode(recordsDecoder); err != nil {
114 // If we have at least one decoded records, this is not an error
115 if err == ErrInsufficientData {
116 if len(b.RecordsSet) == 0 {
117 b.Partial = true
118 }
119 break
120 }
121 return err
122 }
123
kesavandc71914f2022-03-25 11:19:03 +0530124 b.LastRecordsBatchOffset, err = records.recordsOffset()
125 if err != nil {
126 return err
127 }
128
kesavand2cde6582020-06-22 04:56:23 -0400129 partial, err := records.isPartial()
130 if err != nil {
131 return err
132 }
133
134 n, err := records.numRecords()
135 if err != nil {
136 return err
137 }
138
139 if n > 0 || (partial && len(b.RecordsSet) == 0) {
140 b.RecordsSet = append(b.RecordsSet, records)
141
142 if b.Records == nil {
143 b.Records = records
144 }
145 }
146
147 overflow, err := records.isOverflow()
148 if err != nil {
149 return err
150 }
151
152 if partial || overflow {
153 break
154 }
155 }
156
157 return nil
158}
159
160func (b *FetchResponseBlock) numRecords() (int, error) {
161 sum := 0
162
163 for _, records := range b.RecordsSet {
164 count, err := records.numRecords()
165 if err != nil {
166 return 0, err
167 }
168
169 sum += count
170 }
171
172 return sum, nil
173}
174
175func (b *FetchResponseBlock) isPartial() (bool, error) {
176 if b.Partial {
177 return true, nil
178 }
179
180 if len(b.RecordsSet) == 1 {
181 return b.RecordsSet[0].isPartial()
182 }
183
184 return false, nil
185}
186
187func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
188 pe.putInt16(int16(b.Err))
189
190 pe.putInt64(b.HighWaterMarkOffset)
191
192 if version >= 4 {
193 pe.putInt64(b.LastStableOffset)
194
kesavandc71914f2022-03-25 11:19:03 +0530195 if version >= 5 {
196 pe.putInt64(b.LogStartOffset)
197 }
198
kesavand2cde6582020-06-22 04:56:23 -0400199 if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
200 return err
201 }
202 for _, transact := range b.AbortedTransactions {
203 if err = transact.encode(pe); err != nil {
204 return err
205 }
206 }
207 }
208
kesavandc71914f2022-03-25 11:19:03 +0530209 if version >= 11 {
210 pe.putInt32(b.PreferredReadReplica)
211 }
212
kesavand2cde6582020-06-22 04:56:23 -0400213 pe.push(&lengthField{})
214 for _, records := range b.RecordsSet {
215 err = records.encode(pe)
216 if err != nil {
217 return err
218 }
219 }
220 return pe.pop()
221}
222
223func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction {
224 // I can't find any doc that guarantee the field `fetchResponse.AbortedTransactions` is ordered
225 // plus Java implementation use a PriorityQueue based on `FirstOffset`. I guess we have to order it ourself
226 at := b.AbortedTransactions
227 sort.Slice(
228 at,
229 func(i, j int) bool { return at[i].FirstOffset < at[j].FirstOffset },
230 )
231 return at
232}
233
// FetchResponse models a Kafka Fetch response across protocol versions
// 0 through 11, keyed by topic then partition.
type FetchResponse struct {
	Blocks        map[string]map[int32]*FetchResponseBlock
	// ThrottleTime is only present for response version >= 1.
	ThrottleTime  time.Duration
	// ErrorCode and SessionID are only present for response version >= 7.
	ErrorCode     int16
	SessionID     int32
	Version       int16
	// LogAppendTime and Timestamp are used by the Add*WithTimestamp test
	// helpers below to stamp generated messages/records.
	LogAppendTime bool
	Timestamp     time.Time
}
243
244func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
245 r.Version = version
246
247 if r.Version >= 1 {
248 throttle, err := pd.getInt32()
249 if err != nil {
250 return err
251 }
252 r.ThrottleTime = time.Duration(throttle) * time.Millisecond
253 }
254
kesavandc71914f2022-03-25 11:19:03 +0530255 if r.Version >= 7 {
256 r.ErrorCode, err = pd.getInt16()
257 if err != nil {
258 return err
259 }
260 r.SessionID, err = pd.getInt32()
261 if err != nil {
262 return err
263 }
264 }
265
kesavand2cde6582020-06-22 04:56:23 -0400266 numTopics, err := pd.getArrayLength()
267 if err != nil {
268 return err
269 }
270
271 r.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics)
272 for i := 0; i < numTopics; i++ {
273 name, err := pd.getString()
274 if err != nil {
275 return err
276 }
277
278 numBlocks, err := pd.getArrayLength()
279 if err != nil {
280 return err
281 }
282
283 r.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)
284
285 for j := 0; j < numBlocks; j++ {
286 id, err := pd.getInt32()
287 if err != nil {
288 return err
289 }
290
291 block := new(FetchResponseBlock)
292 err = block.decode(pd, version)
293 if err != nil {
294 return err
295 }
296 r.Blocks[name][id] = block
297 }
298 }
299
300 return nil
301}
302
303func (r *FetchResponse) encode(pe packetEncoder) (err error) {
304 if r.Version >= 1 {
305 pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
306 }
307
kesavandc71914f2022-03-25 11:19:03 +0530308 if r.Version >= 7 {
309 pe.putInt16(r.ErrorCode)
310 pe.putInt32(r.SessionID)
311 }
312
kesavand2cde6582020-06-22 04:56:23 -0400313 err = pe.putArrayLength(len(r.Blocks))
314 if err != nil {
315 return err
316 }
317
318 for topic, partitions := range r.Blocks {
319 err = pe.putString(topic)
320 if err != nil {
321 return err
322 }
323
324 err = pe.putArrayLength(len(partitions))
325 if err != nil {
326 return err
327 }
328
329 for id, block := range partitions {
330 pe.putInt32(id)
331 err = block.encode(pe, r.Version)
332 if err != nil {
333 return err
334 }
335 }
kesavand2cde6582020-06-22 04:56:23 -0400336 }
337 return nil
338}
339
// key returns the Kafka API key for Fetch requests/responses (1).
func (r *FetchResponse) key() int16 {
	return 1
}
343
// version returns the protocol version this response was decoded with or
// will be encoded as.
func (r *FetchResponse) version() int16 {
	return r.Version
}
347
// headerVersion returns the response header version (always 0 for the
// Fetch versions supported here).
func (r *FetchResponse) headerVersion() int16 {
	return 0
}
351
kesavand2cde6582020-06-22 04:56:23 -0400352func (r *FetchResponse) requiredVersion() KafkaVersion {
353 switch r.Version {
kesavandc71914f2022-03-25 11:19:03 +0530354 case 0:
355 return MinVersion
kesavand2cde6582020-06-22 04:56:23 -0400356 case 1:
357 return V0_9_0_0
358 case 2:
359 return V0_10_0_0
360 case 3:
361 return V0_10_1_0
kesavandc71914f2022-03-25 11:19:03 +0530362 case 4, 5:
kesavand2cde6582020-06-22 04:56:23 -0400363 return V0_11_0_0
kesavandc71914f2022-03-25 11:19:03 +0530364 case 6:
365 return V1_0_0_0
366 case 7:
367 return V1_1_0_0
368 case 8:
369 return V2_0_0_0
370 case 9, 10:
371 return V2_1_0_0
372 case 11:
373 return V2_3_0_0
kesavand2cde6582020-06-22 04:56:23 -0400374 default:
kesavandc71914f2022-03-25 11:19:03 +0530375 return MaxVersion
kesavand2cde6582020-06-22 04:56:23 -0400376 }
377}
378
379func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
380 if r.Blocks == nil {
381 return nil
382 }
383
384 if r.Blocks[topic] == nil {
385 return nil
386 }
387
388 return r.Blocks[topic][partition]
389}
390
391func (r *FetchResponse) AddError(topic string, partition int32, err KError) {
392 if r.Blocks == nil {
393 r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
394 }
395 partitions, ok := r.Blocks[topic]
396 if !ok {
397 partitions = make(map[int32]*FetchResponseBlock)
398 r.Blocks[topic] = partitions
399 }
400 frb, ok := partitions[partition]
401 if !ok {
402 frb = new(FetchResponseBlock)
403 partitions[partition] = frb
404 }
405 frb.Err = err
406}
407
408func (r *FetchResponse) getOrCreateBlock(topic string, partition int32) *FetchResponseBlock {
409 if r.Blocks == nil {
410 r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
411 }
412 partitions, ok := r.Blocks[topic]
413 if !ok {
414 partitions = make(map[int32]*FetchResponseBlock)
415 r.Blocks[topic] = partitions
416 }
417 frb, ok := partitions[partition]
418 if !ok {
419 frb = new(FetchResponseBlock)
420 partitions[partition] = frb
421 }
422
423 return frb
424}
425
426func encodeKV(key, value Encoder) ([]byte, []byte) {
427 var kb []byte
428 var vb []byte
429 if key != nil {
430 kb, _ = key.Encode()
431 }
432 if value != nil {
433 vb, _ = value.Encode()
434 }
435
436 return kb, vb
437}
438
439func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) {
440 frb := r.getOrCreateBlock(topic, partition)
441 kb, vb := encodeKV(key, value)
442 if r.LogAppendTime {
443 timestamp = r.Timestamp
444 }
445 msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version}
446 msgBlock := &MessageBlock{Msg: msg, Offset: offset}
447 if len(frb.RecordsSet) == 0 {
448 records := newLegacyRecords(&MessageSet{})
449 frb.RecordsSet = []*Records{&records}
450 }
451 set := frb.RecordsSet[0].MsgSet
452 set.Messages = append(set.Messages, msgBlock)
453}
454
455func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
456 frb := r.getOrCreateBlock(topic, partition)
457 kb, vb := encodeKV(key, value)
458 if len(frb.RecordsSet) == 0 {
459 records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
460 frb.RecordsSet = []*Records{&records}
461 }
462 batch := frb.RecordsSet[0].RecordBatch
463 rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
464 batch.addRecord(rec)
465}
466
467// AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp
468// But instead of appending 1 record to a batch, it append a new batch containing 1 record to the fetchResponse
469// Since transaction are handled on batch level (the whole batch is either committed or aborted), use this to test transactions
470func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time) {
471 frb := r.getOrCreateBlock(topic, partition)
472 kb, vb := encodeKV(key, value)
473
474 records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
475 batch := &RecordBatch{
476 Version: 2,
477 LogAppendTime: r.LogAppendTime,
478 FirstTimestamp: timestamp,
479 MaxTimestamp: r.Timestamp,
480 FirstOffset: offset,
481 LastOffsetDelta: 0,
482 ProducerID: producerID,
483 IsTransactional: isTransactional,
484 }
485 rec := &Record{Key: kb, Value: vb, OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
486 batch.addRecord(rec)
487 records.RecordBatch = batch
488
489 frb.RecordsSet = append(frb.RecordsSet, &records)
490}
491
// AddControlRecordWithTimestamp appends a new transactional control batch
// (e.g. commit/abort marker) containing a single control record to the
// block for topic/partition. Used to simulate transaction markers in tests.
func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType, timestamp time.Time) {
	frb := r.getOrCreateBlock(topic, partition)

	// batch: v2, transactional, flagged as a control batch
	batch := &RecordBatch{
		Version:         2,
		LogAppendTime:   r.LogAppendTime,
		FirstTimestamp:  timestamp,
		MaxTimestamp:    r.Timestamp,
		FirstOffset:     offset,
		LastOffsetDelta: 0,
		ProducerID:      producerID,
		IsTransactional: true,
		Control:         true,
	}

	// records
	records := newDefaultRecords(nil)
	records.RecordBatch = batch

	// record: the control record's key and value are encoded into
	// fixed-size buffers (4 bytes for the key, 6 for the value —
	// presumably matching the control-record wire layout; confirm
	// against ControlRecord.encode)
	crAbort := ControlRecord{
		Version: 0,
		Type:    recordType,
	}
	crKey := &realEncoder{raw: make([]byte, 4)}
	crValue := &realEncoder{raw: make([]byte, 6)}
	crAbort.encode(crKey, crValue)
	rec := &Record{Key: ByteEncoder(crKey.raw), Value: ByteEncoder(crValue.raw), OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
	batch.addRecord(rec)

	frb.RecordsSet = append(frb.RecordsSet, &records)
}
525
// AddMessage appends a legacy message with a zero timestamp and message
// version 0.
func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
	r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
}
529
// AddRecord appends a v2 record with a zero timestamp.
func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
	r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
}
533
// AddRecordBatch appends a new single-record batch with a zero timestamp.
func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool) {
	r.AddRecordBatchWithTimestamp(topic, partition, key, value, offset, producerID, isTransactional, time.Time{})
}
537
// AddControlRecord appends a control-record batch with a zero timestamp.
func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType) {
	// define controlRecord key and value
	r.AddControlRecordWithTimestamp(topic, partition, offset, producerID, recordType, time.Time{})
}
542
543func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
544 frb := r.getOrCreateBlock(topic, partition)
545 if len(frb.RecordsSet) == 0 {
546 records := newDefaultRecords(&RecordBatch{Version: 2})
547 frb.RecordsSet = []*Records{&records}
548 }
549 batch := frb.RecordsSet[0].RecordBatch
550 batch.LastOffsetDelta = offset
551}
552
553func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) {
554 frb := r.getOrCreateBlock(topic, partition)
555 frb.LastStableOffset = offset
556}