blob: 287acd069b6ff42a5fb82ca08d50f2a4ee453253 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import (
4 "time"
5)
6
7type CreateTopicsRequest struct {
8 Version int16
9
10 TopicDetails map[string]*TopicDetail
11 Timeout time.Duration
12 ValidateOnly bool
13}
14
15func (c *CreateTopicsRequest) encode(pe packetEncoder) error {
16 if err := pe.putArrayLength(len(c.TopicDetails)); err != nil {
17 return err
18 }
19 for topic, detail := range c.TopicDetails {
20 if err := pe.putString(topic); err != nil {
21 return err
22 }
23 if err := detail.encode(pe); err != nil {
24 return err
25 }
26 }
27
28 pe.putInt32(int32(c.Timeout / time.Millisecond))
29
30 if c.Version >= 1 {
31 pe.putBool(c.ValidateOnly)
32 }
33
34 return nil
35}
36
37func (c *CreateTopicsRequest) decode(pd packetDecoder, version int16) (err error) {
38 n, err := pd.getArrayLength()
39 if err != nil {
40 return err
41 }
42
43 c.TopicDetails = make(map[string]*TopicDetail, n)
44
45 for i := 0; i < n; i++ {
46 topic, err := pd.getString()
47 if err != nil {
48 return err
49 }
50 c.TopicDetails[topic] = new(TopicDetail)
51 if err = c.TopicDetails[topic].decode(pd, version); err != nil {
52 return err
53 }
54 }
55
56 timeout, err := pd.getInt32()
57 if err != nil {
58 return err
59 }
60 c.Timeout = time.Duration(timeout) * time.Millisecond
61
62 if version >= 1 {
63 c.ValidateOnly, err = pd.getBool()
64 if err != nil {
65 return err
66 }
67
68 c.Version = version
69 }
70
71 return nil
72}
73
74func (c *CreateTopicsRequest) key() int16 {
75 return 19
76}
77
78func (c *CreateTopicsRequest) version() int16 {
79 return c.Version
80}
81
khenaidood948f772021-08-11 17:49:24 -040082func (r *CreateTopicsRequest) headerVersion() int16 {
83 return 1
84}
85
khenaidooac637102019-01-14 15:44:34 -050086func (c *CreateTopicsRequest) requiredVersion() KafkaVersion {
87 switch c.Version {
88 case 2:
89 return V1_0_0_0
90 case 1:
91 return V0_11_0_0
92 default:
93 return V0_10_1_0
94 }
95}
96
// TopicDetail holds the per-topic settings carried in a topic creation
// request.
type TopicDetail struct {
	// NumPartitions is encoded as a 32-bit integer.
	NumPartitions int32
	// ReplicationFactor is encoded as a 16-bit integer.
	ReplicationFactor int16
	// ReplicaAssignment maps a partition number to an array of 32-bit
	// integers; each entry is encoded as the key followed by its array.
	ReplicaAssignment map[int32][]int32
	// ConfigEntries maps a configuration key to its value. A nil value is
	// encoded as a nullable string.
	ConfigEntries map[string]*string
}
103
104func (t *TopicDetail) encode(pe packetEncoder) error {
105 pe.putInt32(t.NumPartitions)
106 pe.putInt16(t.ReplicationFactor)
107
108 if err := pe.putArrayLength(len(t.ReplicaAssignment)); err != nil {
109 return err
110 }
111 for partition, assignment := range t.ReplicaAssignment {
112 pe.putInt32(partition)
113 if err := pe.putInt32Array(assignment); err != nil {
114 return err
115 }
116 }
117
118 if err := pe.putArrayLength(len(t.ConfigEntries)); err != nil {
119 return err
120 }
121 for configKey, configValue := range t.ConfigEntries {
122 if err := pe.putString(configKey); err != nil {
123 return err
124 }
125 if err := pe.putNullableString(configValue); err != nil {
126 return err
127 }
128 }
129
130 return nil
131}
132
133func (t *TopicDetail) decode(pd packetDecoder, version int16) (err error) {
134 if t.NumPartitions, err = pd.getInt32(); err != nil {
135 return err
136 }
137 if t.ReplicationFactor, err = pd.getInt16(); err != nil {
138 return err
139 }
140
141 n, err := pd.getArrayLength()
142 if err != nil {
143 return err
144 }
145
146 if n > 0 {
147 t.ReplicaAssignment = make(map[int32][]int32, n)
148 for i := 0; i < n; i++ {
149 replica, err := pd.getInt32()
150 if err != nil {
151 return err
152 }
153 if t.ReplicaAssignment[replica], err = pd.getInt32Array(); err != nil {
154 return err
155 }
156 }
157 }
158
159 n, err = pd.getArrayLength()
160 if err != nil {
161 return err
162 }
163
164 if n > 0 {
165 t.ConfigEntries = make(map[string]*string, n)
166 for i := 0; i < n; i++ {
167 configKey, err := pd.getString()
168 if err != nil {
169 return err
170 }
171 if t.ConfigEntries[configKey], err = pd.getNullableString(); err != nil {
172 return err
173 }
174 }
175 }
176
177 return nil
178}