// Source blob: f893aeff7d525aeb310c369072046561bd32ac12

package sarama
2
// fetchRequestBlock is the per-partition entry of a FetchRequest. Which of
// its fields are put on the wire depends on the request version; see the
// encode and decode methods.
type fetchRequestBlock struct {
	// Version is the request version this block belongs to. It is set by
	// AddBlock and overwritten by every encode and decode call.
	Version int16
	// currentLeaderEpoch is only encoded and decoded when Version >= 9.
	currentLeaderEpoch int32
	// fetchOffset is encoded and decoded for every version.
	fetchOffset int64
	// logStartOffset is only encoded and decoded when Version >= 5.
	logStartOffset int64
	// maxBytes is encoded and decoded for every version.
	maxBytes int32
}
10
khenaidood948f772021-08-11 17:49:24 -040011func (b *fetchRequestBlock) encode(pe packetEncoder, version int16) error {
12 b.Version = version
13 if b.Version >= 9 {
14 pe.putInt32(b.currentLeaderEpoch)
15 }
khenaidooac637102019-01-14 15:44:34 -050016 pe.putInt64(b.fetchOffset)
khenaidood948f772021-08-11 17:49:24 -040017 if b.Version >= 5 {
18 pe.putInt64(b.logStartOffset)
19 }
khenaidooac637102019-01-14 15:44:34 -050020 pe.putInt32(b.maxBytes)
21 return nil
22}
23
khenaidood948f772021-08-11 17:49:24 -040024func (b *fetchRequestBlock) decode(pd packetDecoder, version int16) (err error) {
25 b.Version = version
26 if b.Version >= 9 {
27 if b.currentLeaderEpoch, err = pd.getInt32(); err != nil {
28 return err
29 }
30 }
khenaidooac637102019-01-14 15:44:34 -050031 if b.fetchOffset, err = pd.getInt64(); err != nil {
32 return err
33 }
khenaidood948f772021-08-11 17:49:24 -040034 if b.Version >= 5 {
35 if b.logStartOffset, err = pd.getInt64(); err != nil {
36 return err
37 }
38 }
khenaidooac637102019-01-14 15:44:34 -050039 if b.maxBytes, err = pd.getInt32(); err != nil {
40 return err
41 }
42 return nil
43}
44
45// FetchRequest (API key 1) will fetch Kafka messages. Version 3 introduced the MaxBytes field. See
46// https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at
47// https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
48type FetchRequest struct {
khenaidood948f772021-08-11 17:49:24 -040049 MaxWaitTime int32
50 MinBytes int32
51 MaxBytes int32
52 Version int16
53 Isolation IsolationLevel
54 SessionID int32
55 SessionEpoch int32
56 blocks map[string]map[int32]*fetchRequestBlock
57 forgotten map[string][]int32
58 RackID string
khenaidooac637102019-01-14 15:44:34 -050059}
60
// IsolationLevel is the isolation setting carried by a FetchRequest. It is
// put on the wire as a single int8 for request versions >= 4.
type IsolationLevel int8

const (
	// ReadUncommitted is isolation level 0, the zero value of IsolationLevel.
	ReadUncommitted IsolationLevel = iota
	// ReadCommitted is isolation level 1.
	ReadCommitted
)
67
68func (r *FetchRequest) encode(pe packetEncoder) (err error) {
69 pe.putInt32(-1) // replica ID is always -1 for clients
70 pe.putInt32(r.MaxWaitTime)
71 pe.putInt32(r.MinBytes)
72 if r.Version >= 3 {
73 pe.putInt32(r.MaxBytes)
74 }
75 if r.Version >= 4 {
76 pe.putInt8(int8(r.Isolation))
77 }
khenaidood948f772021-08-11 17:49:24 -040078 if r.Version >= 7 {
79 pe.putInt32(r.SessionID)
80 pe.putInt32(r.SessionEpoch)
81 }
khenaidooac637102019-01-14 15:44:34 -050082 err = pe.putArrayLength(len(r.blocks))
83 if err != nil {
84 return err
85 }
86 for topic, blocks := range r.blocks {
87 err = pe.putString(topic)
88 if err != nil {
89 return err
90 }
91 err = pe.putArrayLength(len(blocks))
92 if err != nil {
93 return err
94 }
95 for partition, block := range blocks {
96 pe.putInt32(partition)
khenaidood948f772021-08-11 17:49:24 -040097 err = block.encode(pe, r.Version)
khenaidooac637102019-01-14 15:44:34 -050098 if err != nil {
99 return err
100 }
101 }
102 }
khenaidood948f772021-08-11 17:49:24 -0400103 if r.Version >= 7 {
104 err = pe.putArrayLength(len(r.forgotten))
105 if err != nil {
106 return err
107 }
108 for topic, partitions := range r.forgotten {
109 err = pe.putString(topic)
110 if err != nil {
111 return err
112 }
113 err = pe.putArrayLength(len(partitions))
114 if err != nil {
115 return err
116 }
117 for _, partition := range partitions {
118 pe.putInt32(partition)
119 }
120 }
121 }
122 if r.Version >= 11 {
123 err = pe.putString(r.RackID)
124 if err != nil {
125 return err
126 }
127 }
128
khenaidooac637102019-01-14 15:44:34 -0500129 return nil
130}
131
132func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
133 r.Version = version
khenaidood948f772021-08-11 17:49:24 -0400134
khenaidooac637102019-01-14 15:44:34 -0500135 if _, err = pd.getInt32(); err != nil {
136 return err
137 }
138 if r.MaxWaitTime, err = pd.getInt32(); err != nil {
139 return err
140 }
141 if r.MinBytes, err = pd.getInt32(); err != nil {
142 return err
143 }
144 if r.Version >= 3 {
145 if r.MaxBytes, err = pd.getInt32(); err != nil {
146 return err
147 }
148 }
149 if r.Version >= 4 {
150 isolation, err := pd.getInt8()
151 if err != nil {
152 return err
153 }
154 r.Isolation = IsolationLevel(isolation)
155 }
khenaidood948f772021-08-11 17:49:24 -0400156 if r.Version >= 7 {
157 r.SessionID, err = pd.getInt32()
158 if err != nil {
159 return err
160 }
161 r.SessionEpoch, err = pd.getInt32()
162 if err != nil {
163 return err
164 }
165 }
khenaidooac637102019-01-14 15:44:34 -0500166 topicCount, err := pd.getArrayLength()
167 if err != nil {
168 return err
169 }
170 if topicCount == 0 {
171 return nil
172 }
173 r.blocks = make(map[string]map[int32]*fetchRequestBlock)
174 for i := 0; i < topicCount; i++ {
175 topic, err := pd.getString()
176 if err != nil {
177 return err
178 }
179 partitionCount, err := pd.getArrayLength()
180 if err != nil {
181 return err
182 }
183 r.blocks[topic] = make(map[int32]*fetchRequestBlock)
184 for j := 0; j < partitionCount; j++ {
185 partition, err := pd.getInt32()
186 if err != nil {
187 return err
188 }
189 fetchBlock := &fetchRequestBlock{}
khenaidood948f772021-08-11 17:49:24 -0400190 if err = fetchBlock.decode(pd, r.Version); err != nil {
khenaidooac637102019-01-14 15:44:34 -0500191 return err
192 }
193 r.blocks[topic][partition] = fetchBlock
194 }
195 }
khenaidood948f772021-08-11 17:49:24 -0400196
197 if r.Version >= 7 {
198 forgottenCount, err := pd.getArrayLength()
199 if err != nil {
200 return err
201 }
202 r.forgotten = make(map[string][]int32)
203 for i := 0; i < forgottenCount; i++ {
204 topic, err := pd.getString()
205 if err != nil {
206 return err
207 }
208 partitionCount, err := pd.getArrayLength()
209 if err != nil {
210 return err
211 }
212 r.forgotten[topic] = make([]int32, partitionCount)
213
214 for j := 0; j < partitionCount; j++ {
215 partition, err := pd.getInt32()
216 if err != nil {
217 return err
218 }
219 r.forgotten[topic][j] = partition
220 }
221 }
222 }
223
224 if r.Version >= 11 {
225 r.RackID, err = pd.getString()
226 if err != nil {
227 return err
228 }
229 }
230
khenaidooac637102019-01-14 15:44:34 -0500231 return nil
232}
233
234func (r *FetchRequest) key() int16 {
235 return 1
236}
237
238func (r *FetchRequest) version() int16 {
239 return r.Version
240}
241
khenaidood948f772021-08-11 17:49:24 -0400242func (r *FetchRequest) headerVersion() int16 {
243 return 1
244}
245
khenaidooac637102019-01-14 15:44:34 -0500246func (r *FetchRequest) requiredVersion() KafkaVersion {
247 switch r.Version {
khenaidood948f772021-08-11 17:49:24 -0400248 case 0:
249 return MinVersion
khenaidooac637102019-01-14 15:44:34 -0500250 case 1:
251 return V0_9_0_0
252 case 2:
253 return V0_10_0_0
254 case 3:
255 return V0_10_1_0
khenaidood948f772021-08-11 17:49:24 -0400256 case 4, 5:
khenaidooac637102019-01-14 15:44:34 -0500257 return V0_11_0_0
khenaidood948f772021-08-11 17:49:24 -0400258 case 6:
259 return V1_0_0_0
260 case 7:
261 return V1_1_0_0
262 case 8:
263 return V2_0_0_0
264 case 9, 10:
265 return V2_1_0_0
266 case 11:
267 return V2_3_0_0
khenaidooac637102019-01-14 15:44:34 -0500268 default:
khenaidood948f772021-08-11 17:49:24 -0400269 return MaxVersion
khenaidooac637102019-01-14 15:44:34 -0500270 }
271}
272
273func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) {
274 if r.blocks == nil {
275 r.blocks = make(map[string]map[int32]*fetchRequestBlock)
276 }
277
khenaidood948f772021-08-11 17:49:24 -0400278 if r.Version >= 7 && r.forgotten == nil {
279 r.forgotten = make(map[string][]int32)
280 }
281
khenaidooac637102019-01-14 15:44:34 -0500282 if r.blocks[topic] == nil {
283 r.blocks[topic] = make(map[int32]*fetchRequestBlock)
284 }
285
286 tmp := new(fetchRequestBlock)
khenaidood948f772021-08-11 17:49:24 -0400287 tmp.Version = r.Version
khenaidooac637102019-01-14 15:44:34 -0500288 tmp.maxBytes = maxBytes
289 tmp.fetchOffset = fetchOffset
khenaidood948f772021-08-11 17:49:24 -0400290 if r.Version >= 9 {
291 tmp.currentLeaderEpoch = int32(-1)
292 }
khenaidooac637102019-01-14 15:44:34 -0500293
294 r.blocks[topic][partitionID] = tmp
295}