blob: 709c0a44e71330263ea237b67b6e00ab544fcf69 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
3import (
4 "time"
5)
6
7type CreateTopicsRequest struct {
8 Version int16
9
10 TopicDetails map[string]*TopicDetail
11 Timeout time.Duration
12 ValidateOnly bool
13}
14
15func (c *CreateTopicsRequest) encode(pe packetEncoder) error {
16 if err := pe.putArrayLength(len(c.TopicDetails)); err != nil {
17 return err
18 }
19 for topic, detail := range c.TopicDetails {
20 if err := pe.putString(topic); err != nil {
21 return err
22 }
23 if err := detail.encode(pe); err != nil {
24 return err
25 }
26 }
27
28 pe.putInt32(int32(c.Timeout / time.Millisecond))
29
30 if c.Version >= 1 {
31 pe.putBool(c.ValidateOnly)
32 }
33
34 return nil
35}
36
37func (c *CreateTopicsRequest) decode(pd packetDecoder, version int16) (err error) {
38 n, err := pd.getArrayLength()
39 if err != nil {
40 return err
41 }
42
43 c.TopicDetails = make(map[string]*TopicDetail, n)
44
45 for i := 0; i < n; i++ {
46 topic, err := pd.getString()
47 if err != nil {
48 return err
49 }
50 c.TopicDetails[topic] = new(TopicDetail)
51 if err = c.TopicDetails[topic].decode(pd, version); err != nil {
52 return err
53 }
54 }
55
56 timeout, err := pd.getInt32()
57 if err != nil {
58 return err
59 }
60 c.Timeout = time.Duration(timeout) * time.Millisecond
61
62 if version >= 1 {
63 c.ValidateOnly, err = pd.getBool()
64 if err != nil {
65 return err
66 }
67
68 c.Version = version
69 }
70
71 return nil
72}
73
74func (c *CreateTopicsRequest) key() int16 {
75 return 19
76}
77
78func (c *CreateTopicsRequest) version() int16 {
79 return c.Version
80}
81
82func (c *CreateTopicsRequest) requiredVersion() KafkaVersion {
83 switch c.Version {
84 case 2:
85 return V1_0_0_0
86 case 1:
87 return V0_11_0_0
88 default:
89 return V0_10_1_0
90 }
91}
92
93type TopicDetail struct {
94 NumPartitions int32
95 ReplicationFactor int16
96 ReplicaAssignment map[int32][]int32
97 ConfigEntries map[string]*string
98}
99
100func (t *TopicDetail) encode(pe packetEncoder) error {
101 pe.putInt32(t.NumPartitions)
102 pe.putInt16(t.ReplicationFactor)
103
104 if err := pe.putArrayLength(len(t.ReplicaAssignment)); err != nil {
105 return err
106 }
107 for partition, assignment := range t.ReplicaAssignment {
108 pe.putInt32(partition)
109 if err := pe.putInt32Array(assignment); err != nil {
110 return err
111 }
112 }
113
114 if err := pe.putArrayLength(len(t.ConfigEntries)); err != nil {
115 return err
116 }
117 for configKey, configValue := range t.ConfigEntries {
118 if err := pe.putString(configKey); err != nil {
119 return err
120 }
121 if err := pe.putNullableString(configValue); err != nil {
122 return err
123 }
124 }
125
126 return nil
127}
128
129func (t *TopicDetail) decode(pd packetDecoder, version int16) (err error) {
130 if t.NumPartitions, err = pd.getInt32(); err != nil {
131 return err
132 }
133 if t.ReplicationFactor, err = pd.getInt16(); err != nil {
134 return err
135 }
136
137 n, err := pd.getArrayLength()
138 if err != nil {
139 return err
140 }
141
142 if n > 0 {
143 t.ReplicaAssignment = make(map[int32][]int32, n)
144 for i := 0; i < n; i++ {
145 replica, err := pd.getInt32()
146 if err != nil {
147 return err
148 }
149 if t.ReplicaAssignment[replica], err = pd.getInt32Array(); err != nil {
150 return err
151 }
152 }
153 }
154
155 n, err = pd.getArrayLength()
156 if err != nil {
157 return err
158 }
159
160 if n > 0 {
161 t.ConfigEntries = make(map[string]*string, n)
162 for i := 0; i < n; i++ {
163 configKey, err := pd.getString()
164 if err != nil {
165 return err
166 }
167 if t.ConfigEntries[configKey], err = pd.getNullableString(); err != nil {
168 return err
169 }
170 }
171 }
172
173 return nil
174}