blob: 836e6dec1c2088c2b7de88c15544a5544066a1e2 [file] [log] [blame]
Scott Baker105df152020-04-13 15:55:14 -07001package sarama
2
// fetchRequestBlock carries the per-partition portion of a fetch request.
// Which of its fields are put on the wire depends on the request version;
// the encode and decode methods apply those version gates.
type fetchRequestBlock struct {
	// Version is the request version the block was created, encoded or
	// decoded with.
	Version int16

	// currentLeaderEpoch is only serialized for versions >= 9.
	currentLeaderEpoch int32

	// fetchOffset is serialized for every version; logStartOffset only for
	// versions >= 5.
	fetchOffset, logStartOffset int64

	// maxBytes is serialized for every version.
	maxBytes int32
}
10
11func (b *fetchRequestBlock) encode(pe packetEncoder, version int16) error {
12 b.Version = version
13 if b.Version >= 9 {
14 pe.putInt32(b.currentLeaderEpoch)
15 }
16 pe.putInt64(b.fetchOffset)
17 if b.Version >= 5 {
18 pe.putInt64(b.logStartOffset)
19 }
20 pe.putInt32(b.maxBytes)
21 return nil
22}
23
24func (b *fetchRequestBlock) decode(pd packetDecoder, version int16) (err error) {
25 b.Version = version
26 if b.Version >= 9 {
27 if b.currentLeaderEpoch, err = pd.getInt32(); err != nil {
28 return err
29 }
30 }
31 if b.fetchOffset, err = pd.getInt64(); err != nil {
32 return err
33 }
34 if b.Version >= 5 {
35 if b.logStartOffset, err = pd.getInt64(); err != nil {
36 return err
37 }
38 }
39 if b.maxBytes, err = pd.getInt32(); err != nil {
40 return err
41 }
42 return nil
43}
44
45// FetchRequest (API key 1) will fetch Kafka messages. Version 3 introduced the MaxBytes field. See
46// https://issues.apache.org/jira/browse/KAFKA-2063 for a discussion of the issues leading up to that. The KIP is at
47// https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes
48type FetchRequest struct {
49 MaxWaitTime int32
50 MinBytes int32
51 MaxBytes int32
52 Version int16
53 Isolation IsolationLevel
54 SessionID int32
55 SessionEpoch int32
56 blocks map[string]map[int32]*fetchRequestBlock
57 forgotten map[string][]int32
58}
59
// IsolationLevel is the isolation setting of a FetchRequest. It is a plain
// int8 and is written to the wire as such.
type IsolationLevel int8

// Known isolation levels. ReadUncommitted is the zero value of
// IsolationLevel.
const (
	ReadUncommitted IsolationLevel = 0
	ReadCommitted   IsolationLevel = 1
)
66
67func (r *FetchRequest) encode(pe packetEncoder) (err error) {
68 pe.putInt32(-1) // replica ID is always -1 for clients
69 pe.putInt32(r.MaxWaitTime)
70 pe.putInt32(r.MinBytes)
71 if r.Version >= 3 {
72 pe.putInt32(r.MaxBytes)
73 }
74 if r.Version >= 4 {
75 pe.putInt8(int8(r.Isolation))
76 }
77 if r.Version >= 7 {
78 pe.putInt32(r.SessionID)
79 pe.putInt32(r.SessionEpoch)
80 }
81 err = pe.putArrayLength(len(r.blocks))
82 if err != nil {
83 return err
84 }
85 for topic, blocks := range r.blocks {
86 err = pe.putString(topic)
87 if err != nil {
88 return err
89 }
90 err = pe.putArrayLength(len(blocks))
91 if err != nil {
92 return err
93 }
94 for partition, block := range blocks {
95 pe.putInt32(partition)
96 err = block.encode(pe, r.Version)
97 if err != nil {
98 return err
99 }
100 }
101 }
102 if r.Version >= 7 {
103 err = pe.putArrayLength(len(r.forgotten))
104 if err != nil {
105 return err
106 }
107 for topic, partitions := range r.forgotten {
108 err = pe.putString(topic)
109 if err != nil {
110 return err
111 }
112 err = pe.putArrayLength(len(partitions))
113 if err != nil {
114 return err
115 }
116 for _, partition := range partitions {
117 pe.putInt32(partition)
118 }
119 }
120 }
121
122 return nil
123}
124
125func (r *FetchRequest) decode(pd packetDecoder, version int16) (err error) {
126 r.Version = version
127
128 if _, err = pd.getInt32(); err != nil {
129 return err
130 }
131 if r.MaxWaitTime, err = pd.getInt32(); err != nil {
132 return err
133 }
134 if r.MinBytes, err = pd.getInt32(); err != nil {
135 return err
136 }
137 if r.Version >= 3 {
138 if r.MaxBytes, err = pd.getInt32(); err != nil {
139 return err
140 }
141 }
142 if r.Version >= 4 {
143 isolation, err := pd.getInt8()
144 if err != nil {
145 return err
146 }
147 r.Isolation = IsolationLevel(isolation)
148 }
149 if r.Version >= 7 {
150 r.SessionID, err = pd.getInt32()
151 if err != nil {
152 return err
153 }
154 r.SessionEpoch, err = pd.getInt32()
155 if err != nil {
156 return err
157 }
158 }
159 topicCount, err := pd.getArrayLength()
160 if err != nil {
161 return err
162 }
163 if topicCount == 0 {
164 return nil
165 }
166 r.blocks = make(map[string]map[int32]*fetchRequestBlock)
167 for i := 0; i < topicCount; i++ {
168 topic, err := pd.getString()
169 if err != nil {
170 return err
171 }
172 partitionCount, err := pd.getArrayLength()
173 if err != nil {
174 return err
175 }
176 r.blocks[topic] = make(map[int32]*fetchRequestBlock)
177 for j := 0; j < partitionCount; j++ {
178 partition, err := pd.getInt32()
179 if err != nil {
180 return err
181 }
182 fetchBlock := &fetchRequestBlock{}
183 if err = fetchBlock.decode(pd, r.Version); err != nil {
184 return err
185 }
186 r.blocks[topic][partition] = fetchBlock
187 }
188 }
189
190 if r.Version >= 7 {
191 forgottenCount, err := pd.getArrayLength()
192 if err != nil {
193 return err
194 }
195 if forgottenCount == 0 {
196 return nil
197 }
198 r.forgotten = make(map[string][]int32)
199 for i := 0; i < forgottenCount; i++ {
200 topic, err := pd.getString()
201 if err != nil {
202 return err
203 }
204 partitionCount, err := pd.getArrayLength()
205 if err != nil {
206 return err
207 }
208 r.forgotten[topic] = make([]int32, partitionCount)
209
210 for j := 0; j < partitionCount; j++ {
211 partition, err := pd.getInt32()
212 if err != nil {
213 return err
214 }
215 r.forgotten[topic][j] = partition
216 }
217 }
218 }
219
220 return nil
221}
222
223func (r *FetchRequest) key() int16 {
224 return 1
225}
226
227func (r *FetchRequest) version() int16 {
228 return r.Version
229}
230
231func (r *FetchRequest) requiredVersion() KafkaVersion {
232 switch r.Version {
233 case 0:
234 return MinVersion
235 case 1:
236 return V0_9_0_0
237 case 2:
238 return V0_10_0_0
239 case 3:
240 return V0_10_1_0
241 case 4, 5:
242 return V0_11_0_0
243 case 6:
244 return V1_0_0_0
245 case 7:
246 return V1_1_0_0
247 case 8:
248 return V2_0_0_0
249 case 9, 10:
250 return V2_1_0_0
251 case 11:
252 return V2_3_0_0
253 default:
254 return MaxVersion
255 }
256}
257
258func (r *FetchRequest) AddBlock(topic string, partitionID int32, fetchOffset int64, maxBytes int32) {
259 if r.blocks == nil {
260 r.blocks = make(map[string]map[int32]*fetchRequestBlock)
261 }
262
263 if r.Version >= 7 && r.forgotten == nil {
264 r.forgotten = make(map[string][]int32)
265 }
266
267 if r.blocks[topic] == nil {
268 r.blocks[topic] = make(map[int32]*fetchRequestBlock)
269 }
270
271 tmp := new(fetchRequestBlock)
272 tmp.Version = r.Version
273 tmp.maxBytes = maxBytes
274 tmp.fetchOffset = fetchOffset
275 if r.Version >= 9 {
276 tmp.currentLeaderEpoch = int32(-1)
277 }
278
279 r.blocks[topic][partitionID] = tmp
280}