blob: 0bb8702cc37df67b33a0b9807790e33b8ca2f462 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3type PartitionMetadata struct {
4 Err KError
5 ID int32
6 Leader int32
7 Replicas []int32
8 Isr []int32
9 OfflineReplicas []int32
10}
11
12func (pm *PartitionMetadata) decode(pd packetDecoder, version int16) (err error) {
13 tmp, err := pd.getInt16()
14 if err != nil {
15 return err
16 }
17 pm.Err = KError(tmp)
18
19 pm.ID, err = pd.getInt32()
20 if err != nil {
21 return err
22 }
23
24 pm.Leader, err = pd.getInt32()
25 if err != nil {
26 return err
27 }
28
29 pm.Replicas, err = pd.getInt32Array()
30 if err != nil {
31 return err
32 }
33
34 pm.Isr, err = pd.getInt32Array()
35 if err != nil {
36 return err
37 }
38
39 if version >= 5 {
40 pm.OfflineReplicas, err = pd.getInt32Array()
41 if err != nil {
42 return err
43 }
44 }
45
46 return nil
47}
48
49func (pm *PartitionMetadata) encode(pe packetEncoder, version int16) (err error) {
50 pe.putInt16(int16(pm.Err))
51 pe.putInt32(pm.ID)
52 pe.putInt32(pm.Leader)
53
54 err = pe.putInt32Array(pm.Replicas)
55 if err != nil {
56 return err
57 }
58
59 err = pe.putInt32Array(pm.Isr)
60 if err != nil {
61 return err
62 }
63
64 if version >= 5 {
65 err = pe.putInt32Array(pm.OfflineReplicas)
66 if err != nil {
67 return err
68 }
69 }
70
71 return nil
72}
73
74type TopicMetadata struct {
75 Err KError
76 Name string
77 IsInternal bool // Only valid for Version >= 1
78 Partitions []*PartitionMetadata
79}
80
81func (tm *TopicMetadata) decode(pd packetDecoder, version int16) (err error) {
82 tmp, err := pd.getInt16()
83 if err != nil {
84 return err
85 }
86 tm.Err = KError(tmp)
87
88 tm.Name, err = pd.getString()
89 if err != nil {
90 return err
91 }
92
93 if version >= 1 {
94 tm.IsInternal, err = pd.getBool()
95 if err != nil {
96 return err
97 }
98 }
99
100 n, err := pd.getArrayLength()
101 if err != nil {
102 return err
103 }
104 tm.Partitions = make([]*PartitionMetadata, n)
105 for i := 0; i < n; i++ {
106 tm.Partitions[i] = new(PartitionMetadata)
107 err = tm.Partitions[i].decode(pd, version)
108 if err != nil {
109 return err
110 }
111 }
112
113 return nil
114}
115
116func (tm *TopicMetadata) encode(pe packetEncoder, version int16) (err error) {
117 pe.putInt16(int16(tm.Err))
118
119 err = pe.putString(tm.Name)
120 if err != nil {
121 return err
122 }
123
124 if version >= 1 {
125 pe.putBool(tm.IsInternal)
126 }
127
128 err = pe.putArrayLength(len(tm.Partitions))
129 if err != nil {
130 return err
131 }
132
133 for _, pm := range tm.Partitions {
134 err = pm.encode(pe, version)
135 if err != nil {
136 return err
137 }
138 }
139
140 return nil
141}
142
143type MetadataResponse struct {
144 Version int16
145 ThrottleTimeMs int32
146 Brokers []*Broker
147 ClusterID *string
148 ControllerID int32
149 Topics []*TopicMetadata
150}
151
152func (r *MetadataResponse) decode(pd packetDecoder, version int16) (err error) {
153 r.Version = version
154
155 if version >= 3 {
156 r.ThrottleTimeMs, err = pd.getInt32()
157 if err != nil {
158 return err
159 }
160 }
161
162 n, err := pd.getArrayLength()
163 if err != nil {
164 return err
165 }
166
167 r.Brokers = make([]*Broker, n)
168 for i := 0; i < n; i++ {
169 r.Brokers[i] = new(Broker)
170 err = r.Brokers[i].decode(pd, version)
171 if err != nil {
172 return err
173 }
174 }
175
176 if version >= 2 {
177 r.ClusterID, err = pd.getNullableString()
178 if err != nil {
179 return err
180 }
181 }
182
183 if version >= 1 {
184 r.ControllerID, err = pd.getInt32()
185 if err != nil {
186 return err
187 }
188 } else {
189 r.ControllerID = -1
190 }
191
192 n, err = pd.getArrayLength()
193 if err != nil {
194 return err
195 }
196
197 r.Topics = make([]*TopicMetadata, n)
198 for i := 0; i < n; i++ {
199 r.Topics[i] = new(TopicMetadata)
200 err = r.Topics[i].decode(pd, version)
201 if err != nil {
202 return err
203 }
204 }
205
206 return nil
207}
208
209func (r *MetadataResponse) encode(pe packetEncoder) error {
210 if r.Version >= 3 {
211 pe.putInt32(r.ThrottleTimeMs)
212 }
213
214 err := pe.putArrayLength(len(r.Brokers))
215 if err != nil {
216 return err
217 }
218 for _, broker := range r.Brokers {
219 err = broker.encode(pe, r.Version)
220 if err != nil {
221 return err
222 }
223 }
224
225 if r.Version >= 2 {
226 err := pe.putNullableString(r.ClusterID)
227 if err != nil {
228 return err
229 }
230 }
231
232 if r.Version >= 1 {
233 pe.putInt32(r.ControllerID)
234 }
235
236 err = pe.putArrayLength(len(r.Topics))
237 if err != nil {
238 return err
239 }
240 for _, tm := range r.Topics {
241 err = tm.encode(pe, r.Version)
242 if err != nil {
243 return err
244 }
245 }
246
247 return nil
248}
249
250func (r *MetadataResponse) key() int16 {
251 return 3
252}
253
254func (r *MetadataResponse) version() int16 {
255 return r.Version
256}
257
khenaidood948f772021-08-11 17:49:24 -0400258func (r *MetadataResponse) headerVersion() int16 {
259 return 0
260}
261
khenaidooac637102019-01-14 15:44:34 -0500262func (r *MetadataResponse) requiredVersion() KafkaVersion {
263 switch r.Version {
264 case 1:
265 return V0_10_0_0
266 case 2:
267 return V0_10_1_0
268 case 3, 4:
269 return V0_11_0_0
270 case 5:
271 return V1_0_0_0
272 default:
273 return MinVersion
274 }
275}
276
277// testing API
278
279func (r *MetadataResponse) AddBroker(addr string, id int32) {
280 r.Brokers = append(r.Brokers, &Broker{id: id, addr: addr})
281}
282
283func (r *MetadataResponse) AddTopic(topic string, err KError) *TopicMetadata {
284 var tmatch *TopicMetadata
285
286 for _, tm := range r.Topics {
287 if tm.Name == topic {
288 tmatch = tm
289 goto foundTopic
290 }
291 }
292
293 tmatch = new(TopicMetadata)
294 tmatch.Name = topic
295 r.Topics = append(r.Topics, tmatch)
296
297foundTopic:
298
299 tmatch.Err = err
300 return tmatch
301}
302
Scott Baker8461e152019-10-01 14:44:30 -0700303func (r *MetadataResponse) AddTopicPartition(topic string, partition, brokerID int32, replicas, isr []int32, offline []int32, err KError) {
khenaidooac637102019-01-14 15:44:34 -0500304 tmatch := r.AddTopic(topic, ErrNoError)
305 var pmatch *PartitionMetadata
306
307 for _, pm := range tmatch.Partitions {
308 if pm.ID == partition {
309 pmatch = pm
310 goto foundPartition
311 }
312 }
313
314 pmatch = new(PartitionMetadata)
315 pmatch.ID = partition
316 tmatch.Partitions = append(tmatch.Partitions, pmatch)
317
318foundPartition:
319
320 pmatch.Leader = brokerID
321 pmatch.Replicas = replicas
322 pmatch.Isr = isr
Scott Baker8461e152019-10-01 14:44:30 -0700323 pmatch.OfflineReplicas = offline
khenaidooac637102019-01-14 15:44:34 -0500324 pmatch.Err = err
khenaidooac637102019-01-14 15:44:34 -0500325}