blob: 19449220f28539242b2707f8b49254c88d1556eb [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3type OffsetFetchResponseBlock struct {
4 Offset int64
5 LeaderEpoch int32
6 Metadata string
7 Err KError
8}
9
10func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
khenaidood948f772021-08-11 17:49:24 -040011 isFlexible := version >= 6
12
khenaidooac637102019-01-14 15:44:34 -050013 b.Offset, err = pd.getInt64()
14 if err != nil {
15 return err
16 }
17
18 if version >= 5 {
19 b.LeaderEpoch, err = pd.getInt32()
20 if err != nil {
21 return err
22 }
23 }
24
khenaidood948f772021-08-11 17:49:24 -040025 if isFlexible {
26 b.Metadata, err = pd.getCompactString()
27 } else {
28 b.Metadata, err = pd.getString()
29 }
khenaidooac637102019-01-14 15:44:34 -050030 if err != nil {
31 return err
32 }
33
34 tmp, err := pd.getInt16()
35 if err != nil {
36 return err
37 }
38 b.Err = KError(tmp)
39
khenaidood948f772021-08-11 17:49:24 -040040 if isFlexible {
41 if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
42 return err
43 }
44 }
45
khenaidooac637102019-01-14 15:44:34 -050046 return nil
47}
48
49func (b *OffsetFetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
khenaidood948f772021-08-11 17:49:24 -040050 isFlexible := version >= 6
khenaidooac637102019-01-14 15:44:34 -050051 pe.putInt64(b.Offset)
52
53 if version >= 5 {
54 pe.putInt32(b.LeaderEpoch)
55 }
khenaidood948f772021-08-11 17:49:24 -040056 if isFlexible {
57 err = pe.putCompactString(b.Metadata)
58 } else {
59 err = pe.putString(b.Metadata)
60 }
khenaidooac637102019-01-14 15:44:34 -050061 if err != nil {
62 return err
63 }
64
65 pe.putInt16(int16(b.Err))
66
khenaidood948f772021-08-11 17:49:24 -040067 if isFlexible {
68 pe.putEmptyTaggedFieldArray()
69 }
70
khenaidooac637102019-01-14 15:44:34 -050071 return nil
72}
73
74type OffsetFetchResponse struct {
75 Version int16
76 ThrottleTimeMs int32
77 Blocks map[string]map[int32]*OffsetFetchResponseBlock
78 Err KError
79}
80
khenaidood948f772021-08-11 17:49:24 -040081func (r *OffsetFetchResponse) encode(pe packetEncoder) (err error) {
82 isFlexible := r.Version >= 6
83
khenaidooac637102019-01-14 15:44:34 -050084 if r.Version >= 3 {
85 pe.putInt32(r.ThrottleTimeMs)
86 }
khenaidood948f772021-08-11 17:49:24 -040087 if isFlexible {
88 pe.putCompactArrayLength(len(r.Blocks))
89 } else {
90 err = pe.putArrayLength(len(r.Blocks))
91 }
92 if err != nil {
khenaidooac637102019-01-14 15:44:34 -050093 return err
94 }
khenaidood948f772021-08-11 17:49:24 -040095
khenaidooac637102019-01-14 15:44:34 -050096 for topic, partitions := range r.Blocks {
khenaidood948f772021-08-11 17:49:24 -040097 if isFlexible {
98 err = pe.putCompactString(topic)
99 } else {
100 err = pe.putString(topic)
101 }
102 if err != nil {
khenaidooac637102019-01-14 15:44:34 -0500103 return err
104 }
khenaidood948f772021-08-11 17:49:24 -0400105
106 if isFlexible {
107 pe.putCompactArrayLength(len(partitions))
108 } else {
109 err = pe.putArrayLength(len(partitions))
110 }
111 if err != nil {
khenaidooac637102019-01-14 15:44:34 -0500112 return err
113 }
114 for partition, block := range partitions {
115 pe.putInt32(partition)
116 if err := block.encode(pe, r.Version); err != nil {
117 return err
118 }
119 }
khenaidood948f772021-08-11 17:49:24 -0400120 if isFlexible {
121 pe.putEmptyTaggedFieldArray()
122 }
khenaidooac637102019-01-14 15:44:34 -0500123 }
124 if r.Version >= 2 {
125 pe.putInt16(int16(r.Err))
126 }
khenaidood948f772021-08-11 17:49:24 -0400127 if isFlexible {
128 pe.putEmptyTaggedFieldArray()
129 }
khenaidooac637102019-01-14 15:44:34 -0500130 return nil
131}
132
133func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error) {
134 r.Version = version
khenaidood948f772021-08-11 17:49:24 -0400135 isFlexible := version >= 6
khenaidooac637102019-01-14 15:44:34 -0500136
137 if version >= 3 {
138 r.ThrottleTimeMs, err = pd.getInt32()
139 if err != nil {
140 return err
141 }
142 }
143
khenaidood948f772021-08-11 17:49:24 -0400144 var numTopics int
145 if isFlexible {
146 numTopics, err = pd.getCompactArrayLength()
147 } else {
148 numTopics, err = pd.getArrayLength()
149 }
khenaidooac637102019-01-14 15:44:34 -0500150 if err != nil {
151 return err
152 }
153
154 if numTopics > 0 {
155 r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics)
156 for i := 0; i < numTopics; i++ {
khenaidood948f772021-08-11 17:49:24 -0400157 var name string
158 if isFlexible {
159 name, err = pd.getCompactString()
160 } else {
161 name, err = pd.getString()
162 }
khenaidooac637102019-01-14 15:44:34 -0500163 if err != nil {
164 return err
165 }
166
khenaidood948f772021-08-11 17:49:24 -0400167 var numBlocks int
168 if isFlexible {
169 numBlocks, err = pd.getCompactArrayLength()
170 } else {
171 numBlocks, err = pd.getArrayLength()
172 }
khenaidooac637102019-01-14 15:44:34 -0500173 if err != nil {
174 return err
175 }
176
khenaidood948f772021-08-11 17:49:24 -0400177 r.Blocks[name] = nil
178 if numBlocks > 0 {
179 r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
khenaidooac637102019-01-14 15:44:34 -0500180 }
khenaidooac637102019-01-14 15:44:34 -0500181 for j := 0; j < numBlocks; j++ {
182 id, err := pd.getInt32()
183 if err != nil {
184 return err
185 }
186
187 block := new(OffsetFetchResponseBlock)
188 err = block.decode(pd, version)
189 if err != nil {
190 return err
191 }
khenaidood948f772021-08-11 17:49:24 -0400192
khenaidooac637102019-01-14 15:44:34 -0500193 r.Blocks[name][id] = block
194 }
khenaidood948f772021-08-11 17:49:24 -0400195
196 if isFlexible {
197 if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
198 return err
199 }
200 }
khenaidooac637102019-01-14 15:44:34 -0500201 }
202 }
203
204 if version >= 2 {
205 kerr, err := pd.getInt16()
206 if err != nil {
207 return err
208 }
209 r.Err = KError(kerr)
210 }
211
khenaidood948f772021-08-11 17:49:24 -0400212 if isFlexible {
213 if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
214 return err
215 }
216 }
217
khenaidooac637102019-01-14 15:44:34 -0500218 return nil
219}
220
221func (r *OffsetFetchResponse) key() int16 {
222 return 9
223}
224
225func (r *OffsetFetchResponse) version() int16 {
226 return r.Version
227}
228
khenaidood948f772021-08-11 17:49:24 -0400229func (r *OffsetFetchResponse) headerVersion() int16 {
230 if r.Version >= 6 {
231 return 1
232 }
233
234 return 0
235}
236
khenaidooac637102019-01-14 15:44:34 -0500237func (r *OffsetFetchResponse) requiredVersion() KafkaVersion {
238 switch r.Version {
239 case 1:
240 return V0_8_2_0
241 case 2:
242 return V0_10_2_0
243 case 3:
244 return V0_11_0_0
245 case 4:
246 return V2_0_0_0
247 case 5:
248 return V2_1_0_0
khenaidood948f772021-08-11 17:49:24 -0400249 case 6:
250 return V2_4_0_0
251 case 7:
252 return V2_5_0_0
khenaidooac637102019-01-14 15:44:34 -0500253 default:
254 return MinVersion
255 }
256}
257
258func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock {
259 if r.Blocks == nil {
260 return nil
261 }
262
263 if r.Blocks[topic] == nil {
264 return nil
265 }
266
267 return r.Blocks[topic][partition]
268}
269
270func (r *OffsetFetchResponse) AddBlock(topic string, partition int32, block *OffsetFetchResponseBlock) {
271 if r.Blocks == nil {
272 r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock)
273 }
274 partitions := r.Blocks[topic]
275 if partitions == nil {
276 partitions = make(map[int32]*OffsetFetchResponseBlock)
277 r.Blocks[topic] = partitions
278 }
279 partitions[partition] = block
280}