blob: 63fb6ea811dbe981871fefa5601485917ffc3102 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "fmt"
5 "time"
6)
7
// ConfigSource is the int8 tag stored in the Source field of ConfigEntry and
// ConfigSynonym.
type ConfigSource int8

// Known ConfigSource values. The numbers are written to and read from the
// wire as a single int8 by the ConfigEntry/ConfigSynonym codecs, so they are
// spelled out explicitly rather than derived.
const (
	SourceUnknown              ConfigSource = 0
	SourceTopic                ConfigSource = 1
	SourceDynamicBroker        ConfigSource = 2
	SourceDynamicDefaultBroker ConfigSource = 3
	SourceStaticBroker         ConfigSource = 4
	SourceDefault              ConfigSource = 5
)

// String returns the symbolic name of s. Values that do not match one of the
// declared constants are rendered as "Source Invalid: <n>".
func (s ConfigSource) String() string {
	var label string
	switch s {
	case SourceUnknown:
		label = "Unknown"
	case SourceTopic:
		label = "Topic"
	case SourceDynamicBroker:
		label = "DynamicBroker"
	case SourceDynamicDefaultBroker:
		label = "DynamicDefaultBroker"
	case SourceStaticBroker:
		label = "StaticBroker"
	case SourceDefault:
		label = "Default"
	default:
		label = fmt.Sprintf("Source Invalid: %d", int(s))
	}
	return label
}
36
// DescribeConfigsResponse is the decoded form of a DescribeConfigs response.
type DescribeConfigsResponse struct {
	// Version is the protocol version. decode stores the version it was
	// called with here, and encode passes it down to every resource.
	Version int16
	// ThrottleTime is carried on the wire as an int32 count of milliseconds.
	ThrottleTime time.Duration
	// Resources holds one entry per element of the response's resource array.
	Resources []*ResourceResponse
}
42
// ResourceResponse is one element of DescribeConfigsResponse.Resources. Its
// fields are encoded in declaration order.
type ResourceResponse struct {
	// ErrorCode is written as an int16.
	ErrorCode int16
	// ErrorMsg is written as a string immediately after ErrorCode.
	ErrorMsg string
	// Type is written as a single int8.
	Type ConfigResourceType
	// Name is written as a string.
	Name string
	// Configs is written as a length-prefixed array of ConfigEntry.
	Configs []*ConfigEntry
}
50
// ConfigEntry is one element of ResourceResponse.Configs. Which fields travel
// on the wire depends on the protocol version used by encode/decode.
type ConfigEntry struct {
	// Name, Value and ReadOnly are present in every version.
	Name     string
	Value    string
	ReadOnly bool
	// Default is only encoded and decoded for version 0.
	Default bool
	// Source is only encoded and decoded for versions above 0; decode sets it
	// to SourceUnknown for version 0.
	Source ConfigSource
	// Sensitive is present in every version.
	Sensitive bool
	// Synonyms is only encoded and decoded for versions above 0.
	Synonyms []*ConfigSynonym
}
60
// ConfigSynonym is one element of ConfigEntry.Synonyms. It is encoded as two
// strings (name, value) followed by the source as an int8.
type ConfigSynonym struct {
	ConfigName  string
	ConfigValue string
	Source      ConfigSource
}
66
67func (r *DescribeConfigsResponse) encode(pe packetEncoder) (err error) {
68 pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
69 if err = pe.putArrayLength(len(r.Resources)); err != nil {
70 return err
71 }
72
73 for _, c := range r.Resources {
74 if err = c.encode(pe, r.Version); err != nil {
75 return err
76 }
77 }
78
79 return nil
80}
81
82func (r *DescribeConfigsResponse) decode(pd packetDecoder, version int16) (err error) {
83 r.Version = version
84 throttleTime, err := pd.getInt32()
85 if err != nil {
86 return err
87 }
88 r.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
89
90 n, err := pd.getArrayLength()
91 if err != nil {
92 return err
93 }
94
95 r.Resources = make([]*ResourceResponse, n)
96 for i := 0; i < n; i++ {
97 rr := &ResourceResponse{}
98 if err := rr.decode(pd, version); err != nil {
99 return err
100 }
101 r.Resources[i] = rr
102 }
103
104 return nil
105}
106
107func (r *DescribeConfigsResponse) key() int16 {
108 return 32
109}
110
// version returns the protocol version stored in r.Version.
func (r *DescribeConfigsResponse) version() int16 {
	return r.Version
}
114
115func (r *DescribeConfigsResponse) requiredVersion() KafkaVersion {
116 switch r.Version {
117 case 1:
118 return V1_0_0_0
119 case 2:
120 return V2_0_0_0
121 default:
122 return V0_11_0_0
123 }
124}
125
126func (r *ResourceResponse) encode(pe packetEncoder, version int16) (err error) {
127 pe.putInt16(r.ErrorCode)
128
129 if err = pe.putString(r.ErrorMsg); err != nil {
130 return err
131 }
132
133 pe.putInt8(int8(r.Type))
134
135 if err = pe.putString(r.Name); err != nil {
136 return err
137 }
138
139 if err = pe.putArrayLength(len(r.Configs)); err != nil {
140 return err
141 }
142
143 for _, c := range r.Configs {
144 if err = c.encode(pe, version); err != nil {
145 return err
146 }
147 }
148 return nil
149}
150
151func (r *ResourceResponse) decode(pd packetDecoder, version int16) (err error) {
152 ec, err := pd.getInt16()
153 if err != nil {
154 return err
155 }
156 r.ErrorCode = ec
157
158 em, err := pd.getString()
159 if err != nil {
160 return err
161 }
162 r.ErrorMsg = em
163
164 t, err := pd.getInt8()
165 if err != nil {
166 return err
167 }
168 r.Type = ConfigResourceType(t)
169
170 name, err := pd.getString()
171 if err != nil {
172 return err
173 }
174 r.Name = name
175
176 n, err := pd.getArrayLength()
177 if err != nil {
178 return err
179 }
180
181 r.Configs = make([]*ConfigEntry, n)
182 for i := 0; i < n; i++ {
183 c := &ConfigEntry{}
184 if err := c.decode(pd, version); err != nil {
185 return err
186 }
187 r.Configs[i] = c
188 }
189 return nil
190}
191
192func (r *ConfigEntry) encode(pe packetEncoder, version int16) (err error) {
193 if err = pe.putString(r.Name); err != nil {
194 return err
195 }
196
197 if err = pe.putString(r.Value); err != nil {
198 return err
199 }
200
201 pe.putBool(r.ReadOnly)
202
203 if version <= 0 {
204 pe.putBool(r.Default)
205 pe.putBool(r.Sensitive)
206 } else {
207 pe.putInt8(int8(r.Source))
208 pe.putBool(r.Sensitive)
209
210 if err := pe.putArrayLength(len(r.Synonyms)); err != nil {
211 return err
212 }
213 for _, c := range r.Synonyms {
214 if err = c.encode(pe, version); err != nil {
215 return err
216 }
217 }
218 }
219
220 return nil
221}
222
223//https://cwiki.apache.org/confluence/display/KAFKA/KIP-226+-+Dynamic+Broker+Configuration
224func (r *ConfigEntry) decode(pd packetDecoder, version int16) (err error) {
225 if version == 0 {
226 r.Source = SourceUnknown
227 }
228 name, err := pd.getString()
229 if err != nil {
230 return err
231 }
232 r.Name = name
233
234 value, err := pd.getString()
235 if err != nil {
236 return err
237 }
238 r.Value = value
239
240 read, err := pd.getBool()
241 if err != nil {
242 return err
243 }
244 r.ReadOnly = read
245
246 if version == 0 {
247 defaultB, err := pd.getBool()
248 if err != nil {
249 return err
250 }
251 r.Default = defaultB
252 } else {
253 source, err := pd.getInt8()
254 if err != nil {
255 return err
256 }
257 r.Source = ConfigSource(source)
258 }
259
260 sensitive, err := pd.getBool()
261 if err != nil {
262 return err
263 }
264 r.Sensitive = sensitive
265
266 if version > 0 {
267 n, err := pd.getArrayLength()
268 if err != nil {
269 return err
270 }
271 r.Synonyms = make([]*ConfigSynonym, n)
272
273 for i := 0; i < n; i++ {
274 s := &ConfigSynonym{}
275 if err := s.decode(pd, version); err != nil {
276 return err
277 }
278 r.Synonyms[i] = s
279 }
280
281 }
282 return nil
283}
284
285func (c *ConfigSynonym) encode(pe packetEncoder, version int16) (err error) {
286 err = pe.putString(c.ConfigName)
287 if err != nil {
288 return err
289 }
290
291 err = pe.putString(c.ConfigValue)
292 if err != nil {
293 return err
294 }
295
296 pe.putInt8(int8(c.Source))
297
298 return nil
299}
300
301func (c *ConfigSynonym) decode(pd packetDecoder, version int16) error {
302 name, err := pd.getString()
303 if err != nil {
304 return nil
305 }
306 c.ConfigName = name
307
308 value, err := pd.getString()
309 if err != nil {
310 return nil
311 }
312 c.ConfigValue = value
313
314 source, err := pd.getInt8()
315 if err != nil {
316 return nil
317 }
318 c.Source = ConfigSource(source)
319 return nil
320}