blob: 9df99c17eb4b56d303a5fab09434274cc8631585 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "time"
5)
6
// AbortedTransaction is one entry of the aborted-transaction list carried by
// a FetchResponseBlock. On the wire it is two consecutive int64 values:
// the producer ID followed by the first offset.
type AbortedTransaction struct {
	ProducerID, FirstOffset int64
}
11
12func (t *AbortedTransaction) decode(pd packetDecoder) (err error) {
13 if t.ProducerID, err = pd.getInt64(); err != nil {
14 return err
15 }
16
17 if t.FirstOffset, err = pd.getInt64(); err != nil {
18 return err
19 }
20
21 return nil
22}
23
24func (t *AbortedTransaction) encode(pe packetEncoder) (err error) {
25 pe.putInt64(t.ProducerID)
26 pe.putInt64(t.FirstOffset)
27
28 return nil
29}
30
31type FetchResponseBlock struct {
32 Err KError
33 HighWaterMarkOffset int64
34 LastStableOffset int64
35 AbortedTransactions []*AbortedTransaction
36 Records *Records // deprecated: use FetchResponseBlock.RecordsSet
37 RecordsSet []*Records
38 Partial bool
39}
40
41func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
42 tmp, err := pd.getInt16()
43 if err != nil {
44 return err
45 }
46 b.Err = KError(tmp)
47
48 b.HighWaterMarkOffset, err = pd.getInt64()
49 if err != nil {
50 return err
51 }
52
53 if version >= 4 {
54 b.LastStableOffset, err = pd.getInt64()
55 if err != nil {
56 return err
57 }
58
59 numTransact, err := pd.getArrayLength()
60 if err != nil {
61 return err
62 }
63
64 if numTransact >= 0 {
65 b.AbortedTransactions = make([]*AbortedTransaction, numTransact)
66 }
67
68 for i := 0; i < numTransact; i++ {
69 transact := new(AbortedTransaction)
70 if err = transact.decode(pd); err != nil {
71 return err
72 }
73 b.AbortedTransactions[i] = transact
74 }
75 }
76
77 recordsSize, err := pd.getInt32()
78 if err != nil {
79 return err
80 }
81
82 recordsDecoder, err := pd.getSubset(int(recordsSize))
83 if err != nil {
84 return err
85 }
86
87 b.RecordsSet = []*Records{}
88
89 for recordsDecoder.remaining() > 0 {
90 records := &Records{}
91 if err := records.decode(recordsDecoder); err != nil {
92 // If we have at least one decoded records, this is not an error
93 if err == ErrInsufficientData {
94 if len(b.RecordsSet) == 0 {
95 b.Partial = true
96 }
97 break
98 }
99 return err
100 }
101
102 partial, err := records.isPartial()
103 if err != nil {
104 return err
105 }
106
107 n, err := records.numRecords()
108 if err != nil {
109 return err
110 }
111
112 if n > 0 || (partial && len(b.RecordsSet) == 0) {
113 b.RecordsSet = append(b.RecordsSet, records)
114
115 if b.Records == nil {
116 b.Records = records
117 }
118 }
119
120 overflow, err := records.isOverflow()
121 if err != nil {
122 return err
123 }
124
125 if partial || overflow {
126 break
127 }
128 }
129
130 return nil
131}
132
133func (b *FetchResponseBlock) numRecords() (int, error) {
134 sum := 0
135
136 for _, records := range b.RecordsSet {
137 count, err := records.numRecords()
138 if err != nil {
139 return 0, err
140 }
141
142 sum += count
143 }
144
145 return sum, nil
146}
147
148func (b *FetchResponseBlock) isPartial() (bool, error) {
149 if b.Partial {
150 return true, nil
151 }
152
153 if len(b.RecordsSet) == 1 {
154 return b.RecordsSet[0].isPartial()
155 }
156
157 return false, nil
158}
159
160func (b *FetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
161 pe.putInt16(int16(b.Err))
162
163 pe.putInt64(b.HighWaterMarkOffset)
164
165 if version >= 4 {
166 pe.putInt64(b.LastStableOffset)
167
168 if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
169 return err
170 }
171 for _, transact := range b.AbortedTransactions {
172 if err = transact.encode(pe); err != nil {
173 return err
174 }
175 }
176 }
177
178 pe.push(&lengthField{})
179 for _, records := range b.RecordsSet {
180 err = records.encode(pe)
181 if err != nil {
182 return err
183 }
184 }
185 return pe.pop()
186}
187
// FetchResponse holds the per-partition blocks of a fetch response together
// with its response-level fields.
type FetchResponse struct {
	// Blocks maps topic name to partition ID to that partition's block.
	Blocks map[string]map[int32]*FetchResponseBlock

	// ThrottleTime is only read from and written to the wire when
	// Version >= 1, as an int32 count of milliseconds.
	ThrottleTime time.Duration

	Version int16 // v1 requires 0.9+, v2 requires 0.10+, v3 requires 0.10.1+, v4 requires 0.11+

	// LogAppendTime, when true, makes AddMessageWithTimestamp stamp messages
	// with Timestamp instead of the caller-supplied time; it is also copied
	// onto messages and record batches created by the Add* helpers.
	LogAppendTime bool

	// Timestamp is the time used when LogAppendTime is set, and the
	// MaxTimestamp given to record batches created by AddRecordWithTimestamp.
	Timestamp time.Time
}
195
196func (r *FetchResponse) decode(pd packetDecoder, version int16) (err error) {
197 r.Version = version
198
199 if r.Version >= 1 {
200 throttle, err := pd.getInt32()
201 if err != nil {
202 return err
203 }
204 r.ThrottleTime = time.Duration(throttle) * time.Millisecond
205 }
206
207 numTopics, err := pd.getArrayLength()
208 if err != nil {
209 return err
210 }
211
212 r.Blocks = make(map[string]map[int32]*FetchResponseBlock, numTopics)
213 for i := 0; i < numTopics; i++ {
214 name, err := pd.getString()
215 if err != nil {
216 return err
217 }
218
219 numBlocks, err := pd.getArrayLength()
220 if err != nil {
221 return err
222 }
223
224 r.Blocks[name] = make(map[int32]*FetchResponseBlock, numBlocks)
225
226 for j := 0; j < numBlocks; j++ {
227 id, err := pd.getInt32()
228 if err != nil {
229 return err
230 }
231
232 block := new(FetchResponseBlock)
233 err = block.decode(pd, version)
234 if err != nil {
235 return err
236 }
237 r.Blocks[name][id] = block
238 }
239 }
240
241 return nil
242}
243
244func (r *FetchResponse) encode(pe packetEncoder) (err error) {
245 if r.Version >= 1 {
246 pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
247 }
248
249 err = pe.putArrayLength(len(r.Blocks))
250 if err != nil {
251 return err
252 }
253
254 for topic, partitions := range r.Blocks {
255 err = pe.putString(topic)
256 if err != nil {
257 return err
258 }
259
260 err = pe.putArrayLength(len(partitions))
261 if err != nil {
262 return err
263 }
264
265 for id, block := range partitions {
266 pe.putInt32(id)
267 err = block.encode(pe, r.Version)
268 if err != nil {
269 return err
270 }
271 }
272
273 }
274 return nil
275}
276
277func (r *FetchResponse) key() int16 {
278 return 1
279}
280
// version returns the protocol version stored in r.Version, which decode
// sets from its version argument.
func (r *FetchResponse) version() int16 {
	return r.Version
}
284
285func (r *FetchResponse) requiredVersion() KafkaVersion {
286 switch r.Version {
287 case 1:
288 return V0_9_0_0
289 case 2:
290 return V0_10_0_0
291 case 3:
292 return V0_10_1_0
293 case 4:
294 return V0_11_0_0
295 default:
296 return MinVersion
297 }
298}
299
300func (r *FetchResponse) GetBlock(topic string, partition int32) *FetchResponseBlock {
301 if r.Blocks == nil {
302 return nil
303 }
304
305 if r.Blocks[topic] == nil {
306 return nil
307 }
308
309 return r.Blocks[topic][partition]
310}
311
312func (r *FetchResponse) AddError(topic string, partition int32, err KError) {
313 if r.Blocks == nil {
314 r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
315 }
316 partitions, ok := r.Blocks[topic]
317 if !ok {
318 partitions = make(map[int32]*FetchResponseBlock)
319 r.Blocks[topic] = partitions
320 }
321 frb, ok := partitions[partition]
322 if !ok {
323 frb = new(FetchResponseBlock)
324 partitions[partition] = frb
325 }
326 frb.Err = err
327}
328
329func (r *FetchResponse) getOrCreateBlock(topic string, partition int32) *FetchResponseBlock {
330 if r.Blocks == nil {
331 r.Blocks = make(map[string]map[int32]*FetchResponseBlock)
332 }
333 partitions, ok := r.Blocks[topic]
334 if !ok {
335 partitions = make(map[int32]*FetchResponseBlock)
336 r.Blocks[topic] = partitions
337 }
338 frb, ok := partitions[partition]
339 if !ok {
340 frb = new(FetchResponseBlock)
341 partitions[partition] = frb
342 }
343
344 return frb
345}
346
347func encodeKV(key, value Encoder) ([]byte, []byte) {
348 var kb []byte
349 var vb []byte
350 if key != nil {
351 kb, _ = key.Encode()
352 }
353 if value != nil {
354 vb, _ = value.Encode()
355 }
356
357 return kb, vb
358}
359
William Kurkiandaa6bb22019-03-07 12:26:28 -0500360func (r *FetchResponse) AddMessageWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time, version int8) {
khenaidooac637102019-01-14 15:44:34 -0500361 frb := r.getOrCreateBlock(topic, partition)
362 kb, vb := encodeKV(key, value)
William Kurkiandaa6bb22019-03-07 12:26:28 -0500363 if r.LogAppendTime {
364 timestamp = r.Timestamp
365 }
366 msg := &Message{Key: kb, Value: vb, LogAppendTime: r.LogAppendTime, Timestamp: timestamp, Version: version}
khenaidooac637102019-01-14 15:44:34 -0500367 msgBlock := &MessageBlock{Msg: msg, Offset: offset}
368 if len(frb.RecordsSet) == 0 {
369 records := newLegacyRecords(&MessageSet{})
370 frb.RecordsSet = []*Records{&records}
371 }
372 set := frb.RecordsSet[0].MsgSet
373 set.Messages = append(set.Messages, msgBlock)
374}
375
William Kurkiandaa6bb22019-03-07 12:26:28 -0500376func (r *FetchResponse) AddRecordWithTimestamp(topic string, partition int32, key, value Encoder, offset int64, timestamp time.Time) {
khenaidooac637102019-01-14 15:44:34 -0500377 frb := r.getOrCreateBlock(topic, partition)
378 kb, vb := encodeKV(key, value)
khenaidooac637102019-01-14 15:44:34 -0500379 if len(frb.RecordsSet) == 0 {
William Kurkiandaa6bb22019-03-07 12:26:28 -0500380 records := newDefaultRecords(&RecordBatch{Version: 2, LogAppendTime: r.LogAppendTime, FirstTimestamp: timestamp, MaxTimestamp: r.Timestamp})
khenaidooac637102019-01-14 15:44:34 -0500381 frb.RecordsSet = []*Records{&records}
382 }
383 batch := frb.RecordsSet[0].RecordBatch
William Kurkiandaa6bb22019-03-07 12:26:28 -0500384 rec := &Record{Key: kb, Value: vb, OffsetDelta: offset, TimestampDelta: timestamp.Sub(batch.FirstTimestamp)}
khenaidooac637102019-01-14 15:44:34 -0500385 batch.addRecord(rec)
386}
387
William Kurkiandaa6bb22019-03-07 12:26:28 -0500388func (r *FetchResponse) AddMessage(topic string, partition int32, key, value Encoder, offset int64) {
389 r.AddMessageWithTimestamp(topic, partition, key, value, offset, time.Time{}, 0)
390}
391
392func (r *FetchResponse) AddRecord(topic string, partition int32, key, value Encoder, offset int64) {
393 r.AddRecordWithTimestamp(topic, partition, key, value, offset, time.Time{})
394}
395
khenaidooac637102019-01-14 15:44:34 -0500396func (r *FetchResponse) SetLastOffsetDelta(topic string, partition int32, offset int32) {
397 frb := r.getOrCreateBlock(topic, partition)
398 if len(frb.RecordsSet) == 0 {
399 records := newDefaultRecords(&RecordBatch{Version: 2})
400 frb.RecordsSet = []*Records{&records}
401 }
402 batch := frb.RecordsSet[0].RecordBatch
403 batch.LastOffsetDelta = offset
404}
405
406func (r *FetchResponse) SetLastStableOffset(topic string, partition int32, offset int64) {
407 frb := r.getOrCreateBlock(topic, partition)
408 frb.LastStableOffset = offset
409}