blob: 54b88284ad98f69212ba462fa61c7b377de4d7dd [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
Scott Baker8461e152019-10-01 14:44:30 -07004 "sort"
khenaidooac637102019-01-14 15:44:34 -05005 "time"
6)
7
8type AbortedTransaction struct {
9 ProducerID int64
10 FirstOffset int64
11}
12
13func (t *AbortedTransaction) decode(pd packetDecoder) (err error) {
14 if t.ProducerID, err = pd.getInt64(); err != nil {
15 return err
16 }
17
18 if t.FirstOffset, err = pd.getInt64(); err != nil {
19 return err
20 }
21
22 return nil
23}
24
25func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
26 pe.putInt64(t.ProducerID)
27 pe.putInt64(t.FirstOffset)
28
29 return nil
30}
31
32type FetchResponseBlock struct {
khenaidood948f772021-08-11 17:49:24 -040033 Err KError
34 HighWaterMarkOffset int64
35 LastStableOffset int64
36 LogStartOffset int64
37 AbortedTransactions []*AbortedTransaction
38 PreferredReadReplica int32
39 Records *Records // deprecated: use FetchResponseBlock.RecordsSet
40 RecordsSet []*Records
41 Partial bool
khenaidooac637102019-01-14 15:44:34 -050042}
43
44func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
45 tmp, err := pd.getInt16()
46 if err != nil {
47 return err
48 }
49 b.Err = KError(tmp)
50
51 b.HighWaterMarkOffset, err = pd.getInt64()
52 if err != nil {
53 return err
54 }
55
56 if version >= 4 {
57 b.LastStableOffset, err = pd.getInt64()
58 if err != nil {
59 return err
60 }
61
khenaidood948f772021-08-11 17:49:24 -040062 if version >= 5 {
63 b.LogStartOffset, err = pd.getInt64()
64 if err != nil {
65 return err
66 }
67 }
68
khenaidooac637102019-01-14 15:44:34 -050069 numTransact, err := pd.getArrayLength()
70 if err != nil {
71 return err
72 }
73
74 if numTransact >= 0 {
75 b.AbortedTransactions = make([]*AbortedTransaction, numTransact)
76 }
77
78 for i := 0; i < numTransact; i++ {
79 transact := new(AbortedTransaction)
80 if err = transact.decode(pd); err != nil {
81 return err
82 }
83 b.AbortedTransactions[i] = transact
84 }
85 }
86
khenaidood948f772021-08-11 17:49:24 -040087 if version >= 11 {
88 b.PreferredReadReplica, err = pd.getInt32()
89 if err != nil {
90 return err
91 }
92 } else {
93 b.PreferredReadReplica = -1
94 }
95
khenaidooac637102019-01-14 15:44:34 -050096 recordsSize, err := pd.getInt32()
97 if err != nil {
98 return err
99 }
100
101 recordsDecoder, err := pd.getSubset(int(recordsSize))
102 if err != nil {
103 return err
104 }
105
106 b.RecordsSet = []*Records{}
107
108 for recordsDecoder.remaining() > 0 {
109 records := &Records{}
110 if err := records.decode(recordsDecoder); err != nil {
111 // If we have at least one decoded records, this is not an error
112 if err == ErrInsufficientData {
113 if len(b.RecordsSet) == 0 {
114 b.Partial = true
115 }
116 break
117 }
118 return err
119 }
120
121 partial, err := records.isPartial()
122 if err != nil {
123 return err
124 }
125
126 n, err := records.numRecords()
127 if err != nil {
128 return err
129 }
130
131 if n > 0 || (partial && len(b.RecordsSet) == 0) {
132 b.RecordsSet = append(b.RecordsSet, records)
133
134 if b.Records == nil {
135 b.Records = records
136 }
137 }
138
139 overflow, err := records.isOverflow()
140 if err != nil {
141 return err
142 }
143
144 if partial || overflow {
145 break
146 }
147 }
148
149 return nil
150}
151
152func (b *FetchResponseBlock) numRecords() (int, error) {
153 sum := 0
154
155 for _, records := range b.RecordsSet {
156 count, err := records.numRecords()
157 if err != nil {
158 return 0, err
159 }
160
161 sum += count
162 }
163
164 return sum, nil
165}
166
167func (b *FetchResponseBlock) isPartial() (bool, error) {
168 if b.Partial {
169 return true, nil
170 }
171
172 if len(b.RecordsSet) == 1 {
173 return b.RecordsSet[0].isPartial()
174 }
175
176 return false, nil
177}
178
179func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
180 pe.putInt16(int16(b.Err))
181
182 pe.putInt64(b.HighWaterMarkOffset)
183
184 if version >= 4 {
185 pe.putInt64(b.LastStableOffset)
186
khenaidood948f772021-08-11 17:49:24 -0400187 if version >= 5 {
188 pe.putInt64(b.LogStartOffset)
189 }
190
khenaidooac637102019-01-14 15:44:34 -0500191 if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
192 return err
193 }
194 for _, transact := range b.AbortedTransactions {
195 if err = transact.encode(pe); err != nil {
196 return err
197 }
198 }
199 }
200
khenaidood948f772021-08-11 17:49:24 -0400201 if version >= 11 {
202 pe.putInt32(b.PreferredReadReplica)
203 }
204
khenaidooac637102019-01-14 15:44:34 -0500205 pe.push(&lengthField{})
206 for _, records := range b.RecordsSet {
207 err = records.encode(pe)
208 if err != nil {
209 return err
210 }
211 }
212 return pe.pop()
213}
214
Scott Baker8461e152019-10-01 14:44:30 -0700215func (b *FetchResponseBlock) getAbortedTransactions() []*AbortedTransaction {
216 // I can't find any doc that guarantee the field `fetchResponse.AbortedTransactions` is ordered
217 // plus Java implementation use a PriorityQueue based on `FirstOffset`. I guess we have to order it ourself
218 at := b.AbortedTransactions
219 sort.Slice(
220 at,
221 func(i, j int) bool { return at[i].FirstOffset < at[j].FirstOffset },
222 )
223 return at
224}
225
khenaidooac637102019-01-14 15:44:34 -0500226type FetchResponse struct {
William Kurkiandaa6bb22019-03-07 12:26:28 -0500227 Blocks map[string]map[int32]*FetchResponseBlock
228 ThrottleTime time.Duration
khenaidood948f772021-08-11 17:49:24 -0400229 ErrorCode int16
230 SessionID int32
231 Version int16
William Kurkiandaa6bb22019-03-07 12:26:28 -0500232 LogAppendTime bool
233 Timestamp time.Time
khenaidooac637102019-01-14 15:44:34 -0500234}
235
236func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
237 r.Version = version
238
239 if r.Version >= 1 {
240 throttle, err := pd.getInt32()
241 if err != nil {
242 return err
243 }
244 r.ThrottleTime = time.Duration(throttle) * time.Millisecond
245 }
246
khenaidood948f772021-08-11 17:49:24 -0400247 if r.Version >= 7 {
248 r.ErrorCode, err = pd.getInt16()
249 if err != nil {
250 return err
251 }
252 r.SessionID, err = pd.getInt32()
253 if err != nil {
254 return err
255 }
256 }
257
khenaidooac637102019-01-14 15:44:34 -0500258 numTopics, err := pd.getArrayLength()
259 if err != nil {
260 return err
261 }
262
263 r.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics)
264 for i := 0; i < numTopics; i++ {
265 name, err := pd.getString()
266 if err != nil {
267 return err
268 }
269
270 numBlocks, err := pd.getArrayLength()
271 if err != nil {
272 return err
273 }
274
275 r.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)
276
277 for j := 0; j < numBlocks; j++ {
278 id, err := pd.getInt32()
279 if err != nil {
280 return err
281 }
282
283 block := new(FetchResponseBlock)
284 err = block.decode(pd, version)
285 if err != nil {
286 return err
287 }
288 r.Blocks[name][id] = block
289 }
290 }
291
292 return nil
293}
294
295func (r *FetchResponse) encode(pe packetEncoder) (err error) {
296 if r.Version >= 1 {
297 pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
298 }
299
khenaidood948f772021-08-11 17:49:24 -0400300 if r.Version >= 7 {
301 pe.putInt16(r.ErrorCode)
302 pe.putInt32(r.SessionID)
303 }
304
khenaidooac637102019-01-14 15:44:34 -0500305 err = pe.putArrayLength(len(r.Blocks))
306 if err != nil {
307 return err
308 }
309
310 for topic, partitions := range r.Blocks {
311 err = pe.putString(topic)
312 if err != nil {
313 return err
314 }
315
316 err = pe.putArrayLength(len(partitions))
317 if err != nil {
318 return err
319 }
320
321 for id, block := range partitions {
322 pe.putInt32(id)
323 err = block.encode(pe, r.Version)
324 if err != nil {
325 return err
326 }
327 }
khenaidooac637102019-01-14 15:44:34 -0500328 }
329 return nil
330}
331
332func (r *FetchResponse) key() int16 {
333 return 1
334}
335
336func (r *FetchResponse) version() int16 {
337 return r.Version
338}
339
khenaidood948f772021-08-11 17:49:24 -0400340func (r *FetchResponse) headerVersion() int16 {
341 return 0
342}
343
khenaidooac637102019-01-14 15:44:34 -0500344func (r *FetchResponse) requiredVersion() KafkaVersion {
345 switch r.Version {
khenaidood948f772021-08-11 17:49:24 -0400346 case 0:
347 return MinVersion
khenaidooac637102019-01-14 15:44:34 -0500348 case 1:
349 return V0_9_0_0
350 case 2:
351 return V0_10_0_0
352 case 3:
353 return V0_10_1_0
khenaidood948f772021-08-11 17:49:24 -0400354 case 4, 5:
khenaidooac637102019-01-14 15:44:34 -0500355 return V0_11_0_0
khenaidood948f772021-08-11 17:49:24 -0400356 case 6:
357 return V1_0_0_0
358 case 7:
359 return V1_1_0_0
360 case 8:
361 return V2_0_0_0
362 case 9, 10:
363 return V2_1_0_0
364 case 11:
365 return V2_3_0_0
khenaidooac637102019-01-14 15:44:34 -0500366 default:
khenaidood948f772021-08-11 17:49:24 -0400367 return MaxVersion
khenaidooac637102019-01-14 15:44:34 -0500368 }
369}
370
371func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
372 if r.Blocks == nil {
373 return nil
374 }
375
376 if r.Blocks[topic] == nil {
377 return nil
378 }
379
380 return r.Blocks[topic][partition]
381}
382
383func (r *FetchResponse) AddError(topic string, partition int32, err KError) {
384 if r.Blocks == nil {
385 r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
386 }
387 partitions, ok := r.Blocks[topic]
388 if !ok {
389 partitions = make(map[int32]*FetchResponseBlock)
390 r.Blocks[topic] = partitions
391 }
392 frb, ok := partitions[partition]
393 if !ok {
394 frb = new(FetchResponseBlock)
395 partitions[partition] = frb
396 }
397 frb.Err = err
398}
399
400func (r *FetchResponse) getOrCreateBlock(topic string, partition int32) *FetchResponseBlock {
401 if r.Blocks == nil {
402 r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
403 }
404 partitions, ok := r.Blocks[topic]
405 if !ok {
406 partitions = make(map[int32]*FetchResponseBlock)
407 r.Blocks[topic] = partitions
408 }
409 frb, ok := partitions[partition]
410 if !ok {
411 frb = new(FetchResponseBlock)
412 partitions[partition] = frb
413 }
414
415 return frb
416}
417
418func encodeKV(key, value Encoder) ([]byte, []byte) {
419 var kb []byte
420 var vb []byte
421 if key != nil {
422 kb, _ = key.Encode()
423 }
424 if value != nil {
425 vb, _ = value.Encode()
426 }
427
428 return kb, vb
429}
430
William Kurkiandaa6bb22019-03-07 12:26:28 -0500431func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) {
khenaidooac637102019-01-14 15:44:34 -0500432 frb := r.getOrCreateBlock(topic, partition)
433 kb, vb := encodeKV(key, value)
William Kurkiandaa6bb22019-03-07 12:26:28 -0500434 if r.LogAppendTime {
435 timestamp = r.Timestamp
436 }
437 msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version}
khenaidooac637102019-01-14 15:44:34 -0500438 msgBlock := &MessageBlock{Msg: msg, Offset: offset}
439 if len(frb.RecordsSet) == 0 {
440 records := newLegacyRecords(&MessageSet{})
441 frb.RecordsSet = []*Records{&records}
442 }
443 set := frb.RecordsSet[0].MsgSet
444 set.Messages = append(set.Messages, msgBlock)
445}
446
William Kurkiandaa6bb22019-03-07 12:26:28 -0500447func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
khenaidooac637102019-01-14 15:44:34 -0500448 frb := r.getOrCreateBlock(topic, partition)
449 kb, vb := encodeKV(key, value)
khenaidooac637102019-01-14 15:44:34 -0500450 if len(frb.RecordsSet) == 0 {
William Kurkiandaa6bb22019-03-07 12:26:28 -0500451 records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
khenaidooac637102019-01-14 15:44:34 -0500452 frb.RecordsSet = []*Records{&records}
453 }
454 batch := frb.RecordsSet[0].RecordBatch
William Kurkiandaa6bb22019-03-07 12:26:28 -0500455 rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
khenaidooac637102019-01-14 15:44:34 -0500456 batch.addRecord(rec)
457}
458
Scott Baker8461e152019-10-01 14:44:30 -0700459// AddRecordBatchWithTimestamp is similar to AddRecordWithTimestamp
460// But instead of appending 1 record to a batch, it append a new batch containing 1 record to the fetchResponse
461// Since transaction are handled on batch level (the whole batch is either committed or aborted), use this to test transactions
462func (r *FetchResponse) AddRecordBatchWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool, timestamp time.Time) {
463 frb := r.getOrCreateBlock(topic, partition)
464 kb, vb := encodeKV(key, value)
465
466 records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
467 batch := &RecordBatch{
468 Version: 2,
469 LogAppendTime: r.LogAppendTime,
470 FirstTimestamp: timestamp,
471 MaxTimestamp: r.Timestamp,
472 FirstOffset: offset,
473 LastOffsetDelta: 0,
474 ProducerID: producerID,
475 IsTransactional: isTransactional,
476 }
477 rec := &Record{Key: kb, Value: vb, OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
478 batch.addRecord(rec)
479 records.RecordBatch = batch
480
481 frb.RecordsSet = append(frb.RecordsSet, &records)
482}
483
484func (r *FetchResponse) AddControlRecordWithTimestamp(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType, timestamp time.Time) {
485 frb := r.getOrCreateBlock(topic, partition)
486
487 // batch
488 batch := &RecordBatch{
489 Version: 2,
490 LogAppendTime: r.LogAppendTime,
491 FirstTimestamp: timestamp,
492 MaxTimestamp: r.Timestamp,
493 FirstOffset: offset,
494 LastOffsetDelta: 0,
495 ProducerID: producerID,
496 IsTransactional: true,
497 Control: true,
498 }
499
500 // records
501 records := newDefaultRecords(nil)
502 records.RecordBatch = batch
503
504 // record
505 crAbort := ControlRecord{
506 Version: 0,
507 Type: recordType,
508 }
509 crKey := &realEncoder{raw: make([]byte, 4)}
510 crValue := &realEncoder{raw: make([]byte, 6)}
511 crAbort.encode(crKey, crValue)
512 rec := &Record{Key: ByteEncoder(crKey.raw), Value: ByteEncoder(crValue.raw), OffsetDelta: 0, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
513 batch.addRecord(rec)
514
515 frb.RecordsSet = append(frb.RecordsSet, &records)
516}
517
William Kurkiandaa6bb22019-03-07 12:26:28 -0500518func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
519 r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
520}
521
522func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
523 r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
524}
525
Scott Baker8461e152019-10-01 14:44:30 -0700526func (r *FetchResponse) AddRecordBatch(topic string, partition int32, key, value Encoder, offset int64, producerID int64, isTransactional bool) {
527 r.AddRecordBatchWithTimestamp(topic, partition, key, value, offset, producerID, isTransactional, time.Time{})
528}
529
530func (r *FetchResponse) AddControlRecord(topic string, partition int32, offset int64, producerID int64, recordType ControlRecordType) {
531 // define controlRecord key and value
532 r.AddControlRecordWithTimestamp(topic, partition, offset, producerID, recordType, time.Time{})
533}
534
khenaidooac637102019-01-14 15:44:34 -0500535func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
536 frb := r.getOrCreateBlock(topic, partition)
537 if len(frb.RecordsSet) == 0 {
538 records := newDefaultRecords(&RecordBatch{Version: 2})
539 frb.RecordsSet = []*Records{&records}
540 }
541 batch := frb.RecordsSet[0].RecordBatch
542 batch.LastOffsetDelta = offset
543}
544
545func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) {
546 frb := r.getOrCreateBlock(topic, partition)
547 frb.LastStableOffset = offset
548}