blob: 17dc4289a3a9bb5a9acdd3c721f3105155ee2985 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
// MetadataRequest holds the fields of a metadata request as they are written
// by encode and read back by decode.
type MetadataRequest struct {
	// Version is the protocol version of the request. encode rejects values
	// outside the range 0 through 5.
	Version int16
	// Topics is the list of topic names carried in the request's topic array.
	Topics []string
	// AllowAutoTopicCreation is only put on the wire for versions above 3.
	AllowAutoTopicCreation bool
}
8
9func (r *MetadataRequest) encode(pe packetEncoder) error {
10 if r.Version < 0 || r.Version > 5 {
11 return PacketEncodingError{"invalid or unsupported MetadataRequest version field"}
12 }
13 if r.Version == 0 || len(r.Topics) > 0 {
14 err := pe.putArrayLength(len(r.Topics))
15 if err != nil {
16 return err
17 }
18
19 for i := range r.Topics {
20 err = pe.putString(r.Topics[i])
21 if err != nil {
22 return err
23 }
24 }
25 } else {
26 pe.putInt32(-1)
27 }
28 if r.Version > 3 {
29 pe.putBool(r.AllowAutoTopicCreation)
30 }
31 return nil
32}
33
34func (r *MetadataRequest) decode(pd packetDecoder, version int16) error {
35 r.Version = version
36 size, err := pd.getInt32()
37 if err != nil {
38 return err
39 }
40 if size < 0 {
41 return nil
42 } else {
43 topicCount := size
44 if topicCount == 0 {
45 return nil
46 }
47
48 r.Topics = make([]string, topicCount)
49 for i := range r.Topics {
50 topic, err := pd.getString()
51 if err != nil {
52 return err
53 }
54 r.Topics[i] = topic
55 }
56 }
57 if r.Version > 3 {
58 autoCreation, err := pd.getBool()
59 if err != nil {
60 return err
61 }
62 r.AllowAutoTopicCreation = autoCreation
63 }
64 return nil
65}
66
67func (r *MetadataRequest) key() int16 {
68 return 3
69}
70
71func (r *MetadataRequest) version() int16 {
72 return r.Version
73}
74
75func (r *MetadataRequest) requiredVersion() KafkaVersion {
76 switch r.Version {
77 case 1:
78 return V0_10_0_0
79 case 2:
80 return V0_10_1_0
81 case 3, 4:
82 return V0_11_0_0
83 case 5:
84 return V1_0_0_0
85 default:
86 return MinVersion
87 }
88}