// blob: 19449220f28539242b2707f8b49254c88d1556eb

package sarama
2
3type OffsetFetchResponseBlock struct {
4 Offset int64
5 LeaderEpoch int32
6 Metadata string
7 Err KError
8}
9
10func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000011 isFlexible := version >= 6
12
Scott Bakered4efab2020-01-13 19:12:25 -080013 b.Offset, err = pd.getInt64()
14 if err != nil {
15 return err
16 }
17
18 if version >= 5 {
19 b.LeaderEpoch, err = pd.getInt32()
20 if err != nil {
21 return err
22 }
23 }
24
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000025 if isFlexible {
26 b.Metadata, err = pd.getCompactString()
27 } else {
28 b.Metadata, err = pd.getString()
29 }
Scott Bakered4efab2020-01-13 19:12:25 -080030 if err != nil {
31 return err
32 }
33
34 tmp, err := pd.getInt16()
35 if err != nil {
36 return err
37 }
38 b.Err = KError(tmp)
39
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000040 if isFlexible {
41 if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
42 return err
43 }
44 }
45
Scott Bakered4efab2020-01-13 19:12:25 -080046 return nil
47}
48
49func (b *OffsetFetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000050 isFlexible := version >= 6
Scott Bakered4efab2020-01-13 19:12:25 -080051 pe.putInt64(b.Offset)
52
53 if version >= 5 {
54 pe.putInt32(b.LeaderEpoch)
55 }
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000056 if isFlexible {
57 err = pe.putCompactString(b.Metadata)
58 } else {
59 err = pe.putString(b.Metadata)
60 }
Scott Bakered4efab2020-01-13 19:12:25 -080061 if err != nil {
62 return err
63 }
64
65 pe.putInt16(int16(b.Err))
66
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000067 if isFlexible {
68 pe.putEmptyTaggedFieldArray()
69 }
70
Scott Bakered4efab2020-01-13 19:12:25 -080071 return nil
72}
73
74type OffsetFetchResponse struct {
75 Version int16
76 ThrottleTimeMs int32
77 Blocks map[string]map[int32]*OffsetFetchResponseBlock
78 Err KError
79}
80
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000081func (r *OffsetFetchResponse) encode(pe packetEncoder) (err error) {
82 isFlexible := r.Version >= 6
83
Scott Bakered4efab2020-01-13 19:12:25 -080084 if r.Version >= 3 {
85 pe.putInt32(r.ThrottleTimeMs)
86 }
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000087 if isFlexible {
88 pe.putCompactArrayLength(len(r.Blocks))
89 } else {
90 err = pe.putArrayLength(len(r.Blocks))
91 }
92 if err != nil {
Scott Bakered4efab2020-01-13 19:12:25 -080093 return err
94 }
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000095
Scott Bakered4efab2020-01-13 19:12:25 -080096 for topic, partitions := range r.Blocks {
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000097 if isFlexible {
98 err = pe.putCompactString(topic)
99 } else {
100 err = pe.putString(topic)
101 }
102 if err != nil {
Scott Bakered4efab2020-01-13 19:12:25 -0800103 return err
104 }
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000105
106 if isFlexible {
107 pe.putCompactArrayLength(len(partitions))
108 } else {
109 err = pe.putArrayLength(len(partitions))
110 }
111 if err != nil {
Scott Bakered4efab2020-01-13 19:12:25 -0800112 return err
113 }
114 for partition, block := range partitions {
115 pe.putInt32(partition)
116 if err := block.encode(pe, r.Version); err != nil {
117 return err
118 }
119 }
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000120 if isFlexible {
121 pe.putEmptyTaggedFieldArray()
122 }
Scott Bakered4efab2020-01-13 19:12:25 -0800123 }
124 if r.Version >= 2 {
125 pe.putInt16(int16(r.Err))
126 }
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000127 if isFlexible {
128 pe.putEmptyTaggedFieldArray()
129 }
Scott Bakered4efab2020-01-13 19:12:25 -0800130 return nil
131}
132
133func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error) {
134 r.Version = version
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000135 isFlexible := version >= 6
Scott Bakered4efab2020-01-13 19:12:25 -0800136
137 if version >= 3 {
138 r.ThrottleTimeMs, err = pd.getInt32()
139 if err != nil {
140 return err
141 }
142 }
143
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000144 var numTopics int
145 if isFlexible {
146 numTopics, err = pd.getCompactArrayLength()
147 } else {
148 numTopics, err = pd.getArrayLength()
149 }
Scott Bakered4efab2020-01-13 19:12:25 -0800150 if err != nil {
151 return err
152 }
153
154 if numTopics > 0 {
155 r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics)
156 for i := 0; i < numTopics; i++ {
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000157 var name string
158 if isFlexible {
159 name, err = pd.getCompactString()
160 } else {
161 name, err = pd.getString()
162 }
Scott Bakered4efab2020-01-13 19:12:25 -0800163 if err != nil {
164 return err
165 }
166
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000167 var numBlocks int
168 if isFlexible {
169 numBlocks, err = pd.getCompactArrayLength()
170 } else {
171 numBlocks, err = pd.getArrayLength()
172 }
Scott Bakered4efab2020-01-13 19:12:25 -0800173 if err != nil {
174 return err
175 }
176
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000177 r.Blocks[name] = nil
178 if numBlocks > 0 {
179 r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
Scott Bakered4efab2020-01-13 19:12:25 -0800180 }
Scott Bakered4efab2020-01-13 19:12:25 -0800181 for j := 0; j < numBlocks; j++ {
182 id, err := pd.getInt32()
183 if err != nil {
184 return err
185 }
186
187 block := new(OffsetFetchResponseBlock)
188 err = block.decode(pd, version)
189 if err != nil {
190 return err
191 }
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000192
Scott Bakered4efab2020-01-13 19:12:25 -0800193 r.Blocks[name][id] = block
194 }
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000195
196 if isFlexible {
197 if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
198 return err
199 }
200 }
Scott Bakered4efab2020-01-13 19:12:25 -0800201 }
202 }
203
204 if version >= 2 {
205 kerr, err := pd.getInt16()
206 if err != nil {
207 return err
208 }
209 r.Err = KError(kerr)
210 }
211
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000212 if isFlexible {
213 if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
214 return err
215 }
216 }
217
Scott Bakered4efab2020-01-13 19:12:25 -0800218 return nil
219}
220
221func (r *OffsetFetchResponse) key() int16 {
222 return 9
223}
224
225func (r *OffsetFetchResponse) version() int16 {
226 return r.Version
227}
228
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000229func (r *OffsetFetchResponse) headerVersion() int16 {
230 if r.Version >= 6 {
231 return 1
232 }
233
234 return 0
235}
236
Scott Bakered4efab2020-01-13 19:12:25 -0800237func (r *OffsetFetchResponse) requiredVersion() KafkaVersion {
238 switch r.Version {
239 case 1:
240 return V0_8_2_0
241 case 2:
242 return V0_10_2_0
243 case 3:
244 return V0_11_0_0
245 case 4:
246 return V2_0_0_0
247 case 5:
248 return V2_1_0_0
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000249 case 6:
250 return V2_4_0_0
251 case 7:
252 return V2_5_0_0
Scott Bakered4efab2020-01-13 19:12:25 -0800253 default:
254 return MinVersion
255 }
256}
257
258func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock {
259 if r.Blocks == nil {
260 return nil
261 }
262
263 if r.Blocks[topic] == nil {
264 return nil
265 }
266
267 return r.Blocks[topic][partition]
268}
269
270func (r *OffsetFetchResponse) AddBlock(topic string, partition int32, block *OffsetFetchResponseBlock) {
271 if r.Blocks == nil {
272 r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock)
273 }
274 partitions := r.Blocks[topic]
275 if partitions == nil {
276 partitions = make(map[int32]*OffsetFetchResponseBlock)
277 r.Blocks[topic] = partitions
278 }
279 partitions[partition] = block
280}