blob: 928f5a52ab2d7ec5d8c66506fc5ebe998ef16630 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "fmt"
5 "time"
6)
7
8type ConfigSource int8
9
10func (s ConfigSource) String() string {
11 switch s {
12 case SourceUnknown:
13 return "Unknown"
14 case SourceTopic:
15 return "Topic"
16 case SourceDynamicBroker:
17 return "DynamicBroker"
18 case SourceDynamicDefaultBroker:
19 return "DynamicDefaultBroker"
20 case SourceStaticBroker:
21 return "StaticBroker"
22 case SourceDefault:
23 return "Default"
24 }
25 return fmt.Sprintf("Source Invalid: %d", int(s))
26}
27
28const (
Scott Baker8461e152019-10-01 14:44:30 -070029 SourceUnknown ConfigSource = iota
30 SourceTopic
31 SourceDynamicBroker
32 SourceDynamicDefaultBroker
33 SourceStaticBroker
34 SourceDefault
khenaidooac637102019-01-14 15:44:34 -050035)
36
37type DescribeConfigsResponse struct {
38 Version int16
39 ThrottleTime time.Duration
40 Resources []*ResourceResponse
41}
42
43type ResourceResponse struct {
44 ErrorCode int16
45 ErrorMsg string
46 Type ConfigResourceType
47 Name string
48 Configs []*ConfigEntry
49}
50
51type ConfigEntry struct {
52 Name string
53 Value string
54 ReadOnly bool
55 Default bool
56 Source ConfigSource
57 Sensitive bool
58 Synonyms []*ConfigSynonym
59}
60
61type ConfigSynonym struct {
62 ConfigName string
63 ConfigValue string
64 Source ConfigSource
65}
66
67func (r *DescribeConfigsResponse) encode(pe packetEncoder) (err error) {
68 pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
69 if err = pe.putArrayLength(len(r.Resources)); err != nil {
70 return err
71 }
72
73 for _, c := range r.Resources {
74 if err = c.encode(pe, r.Version); err != nil {
75 return err
76 }
77 }
78
79 return nil
80}
81
82func (r *DescribeConfigsResponse) decode(pd packetDecoder, version int16) (err error) {
83 r.Version = version
84 throttleTime, err := pd.getInt32()
85 if err != nil {
86 return err
87 }
88 r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
89
90 n, err := pd.getArrayLength()
91 if err != nil {
92 return err
93 }
94
95 r.Resources = make([]*ResourceResponse, n)
96 for i := 0; i < n; i++ {
97 rr := &ResourceResponse{}
98 if err := rr.decode(pd, version); err != nil {
99 return err
100 }
101 r.Resources[i] = rr
102 }
103
104 return nil
105}
106
107func (r *DescribeConfigsResponse) key() int16 {
108 return 32
109}
110
111func (r *DescribeConfigsResponse) version() int16 {
112 return r.Version
113}
114
khenaidood948f772021-08-11 17:49:24 -0400115func (r *DescribeConfigsResponse) headerVersion() int16 {
116 return 0
117}
118
khenaidooac637102019-01-14 15:44:34 -0500119func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion {
120 switch r.Version {
121 case 1:
122 return V1_0_0_0
123 case 2:
124 return V2_0_0_0
125 default:
126 return V0_11_0_0
127 }
128}
129
130func (r *ResourceResponse) encode(pe packetEncoder, version int16) (err error) {
131 pe.putInt16(r.ErrorCode)
132
133 if err = pe.putString(r.ErrorMsg); err != nil {
134 return err
135 }
136
137 pe.putInt8(int8(r.Type))
138
139 if err = pe.putString(r.Name); err != nil {
140 return err
141 }
142
143 if err = pe.putArrayLength(len(r.Configs)); err != nil {
144 return err
145 }
146
147 for _, c := range r.Configs {
148 if err = c.encode(pe, version); err != nil {
149 return err
150 }
151 }
152 return nil
153}
154
155func (r *ResourceResponse) decode(pd packetDecoder, version int16) (err error) {
156 ec, err := pd.getInt16()
157 if err != nil {
158 return err
159 }
160 r.ErrorCode = ec
161
162 em, err := pd.getString()
163 if err != nil {
164 return err
165 }
166 r.ErrorMsg = em
167
168 t, err := pd.getInt8()
169 if err != nil {
170 return err
171 }
172 r.Type = ConfigResourceType(t)
173
174 name, err := pd.getString()
175 if err != nil {
176 return err
177 }
178 r.Name = name
179
180 n, err := pd.getArrayLength()
181 if err != nil {
182 return err
183 }
184
185 r.Configs = make([]*ConfigEntry, n)
186 for i := 0; i < n; i++ {
187 c := &ConfigEntry{}
188 if err := c.decode(pd, version); err != nil {
189 return err
190 }
191 r.Configs[i] = c
192 }
193 return nil
194}
195
196func (r *ConfigEntry) encode(pe packetEncoder, version int16) (err error) {
197 if err = pe.putString(r.Name); err != nil {
198 return err
199 }
200
201 if err = pe.putString(r.Value); err != nil {
202 return err
203 }
204
205 pe.putBool(r.ReadOnly)
206
207 if version <= 0 {
208 pe.putBool(r.Default)
209 pe.putBool(r.Sensitive)
210 } else {
211 pe.putInt8(int8(r.Source))
212 pe.putBool(r.Sensitive)
213
214 if err := pe.putArrayLength(len(r.Synonyms)); err != nil {
215 return err
216 }
217 for _, c := range r.Synonyms {
218 if err = c.encode(pe, version); err != nil {
219 return err
220 }
221 }
222 }
223
224 return nil
225}
226
khenaidood948f772021-08-11 17:49:24 -0400227// https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration
khenaidooac637102019-01-14 15:44:34 -0500228func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) {
229 if version == 0 {
230 r.Source = SourceUnknown
231 }
232 name, err := pd.getString()
233 if err != nil {
234 return err
235 }
236 r.Name = name
237
238 value, err := pd.getString()
239 if err != nil {
240 return err
241 }
242 r.Value = value
243
244 read, err := pd.getBool()
245 if err != nil {
246 return err
247 }
248 r.ReadOnly = read
249
250 if version == 0 {
251 defaultB, err := pd.getBool()
252 if err != nil {
253 return err
254 }
255 r.Default = defaultB
khenaidood948f772021-08-11 17:49:24 -0400256 if defaultB {
257 r.Source = SourceDefault
258 }
khenaidooac637102019-01-14 15:44:34 -0500259 } else {
260 source, err := pd.getInt8()
261 if err != nil {
262 return err
263 }
264 r.Source = ConfigSource(source)
khenaidood948f772021-08-11 17:49:24 -0400265 r.Default = r.Source == SourceDefault
khenaidooac637102019-01-14 15:44:34 -0500266 }
267
268 sensitive, err := pd.getBool()
269 if err != nil {
270 return err
271 }
272 r.Sensitive = sensitive
273
274 if version > 0 {
275 n, err := pd.getArrayLength()
276 if err != nil {
277 return err
278 }
279 r.Synonyms = make([]*ConfigSynonym, n)
280
281 for i := 0; i < n; i++ {
282 s := &ConfigSynonym{}
283 if err := s.decode(pd, version); err != nil {
284 return err
285 }
286 r.Synonyms[i] = s
287 }
khenaidooac637102019-01-14 15:44:34 -0500288 }
289 return nil
290}
291
292func (c *ConfigSynonym) encode(pe packetEncoder, version int16) (err error) {
293 err = pe.putString(c.ConfigName)
294 if err != nil {
295 return err
296 }
297
298 err = pe.putString(c.ConfigValue)
299 if err != nil {
300 return err
301 }
302
303 pe.putInt8(int8(c.Source))
304
305 return nil
306}
307
308func (c *ConfigSynonym) decode(pd packetDecoder, version int16) error {
309 name, err := pd.getString()
310 if err != nil {
311 return nil
312 }
313 c.ConfigName = name
314
315 value, err := pd.getString()
316 if err != nil {
317 return nil
318 }
319 c.ConfigValue = value
320
321 source, err := pd.getInt8()
322 if err != nil {
323 return nil
324 }
325 c.Source = ConfigSource(source)
326 return nil
327}