blob: 46fb0440249c58c3f5a758eb48616c6a6f5c68ec [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001package sarama
2
3import "time"
4
5type CreatePartitionsRequest struct {
6 TopicPartitions map[string]*TopicPartition
7 Timeout time.Duration
8 ValidateOnly bool
9}
10
11func (c *CreatePartitionsRequest) encode(pe packetEncoder) error {
12 if err := pe.putArrayLength(len(c.TopicPartitions)); err != nil {
13 return err
14 }
15
16 for topic, partition := range c.TopicPartitions {
17 if err := pe.putString(topic); err != nil {
18 return err
19 }
20 if err := partition.encode(pe); err != nil {
21 return err
22 }
23 }
24
25 pe.putInt32(int32(c.Timeout / time.Millisecond))
26
27 pe.putBool(c.ValidateOnly)
28
29 return nil
30}
31
32func (c *CreatePartitionsRequest) decode(pd packetDecoder, version int16) (err error) {
33 n, err := pd.getArrayLength()
34 if err != nil {
35 return err
36 }
37 c.TopicPartitions = make(map[string]*TopicPartition, n)
38 for i := 0; i < n; i++ {
39 topic, err := pd.getString()
40 if err != nil {
41 return err
42 }
43 c.TopicPartitions[topic] = new(TopicPartition)
44 if err := c.TopicPartitions[topic].decode(pd, version); err != nil {
45 return err
46 }
47 }
48
49 timeout, err := pd.getInt32()
50 if err != nil {
51 return err
52 }
53 c.Timeout = time.Duration(timeout) * time.Millisecond
54
55 if c.ValidateOnly, err = pd.getBool(); err != nil {
56 return err
57 }
58
59 return nil
60}
61
62func (r *CreatePartitionsRequest) key() int16 {
63 return 37
64}
65
66func (r *CreatePartitionsRequest) version() int16 {
67 return 0
68}
69
khenaidoo7d3c5582021-08-11 18:09:44 -040070func (r *CreatePartitionsRequest) headerVersion() int16 {
71 return 1
72}
73
Holger Hildebrandtfa074992020-03-27 15:42:06 +000074func (r *CreatePartitionsRequest) requiredVersion() KafkaVersion {
75 return V1_0_0_0
76}
77
// TopicPartition holds the per-topic payload of a CreatePartitionsRequest.
type TopicPartition struct {
	// Count is written to the wire as an int32 ahead of the assignment.
	// NOTE(review): presumably the desired total partition count for the
	// topic — confirm against the Kafka protocol documentation.
	Count int32
	// Assignment is a list of int32 arrays. A nil or empty Assignment is
	// encoded as a length of -1 rather than as an empty array.
	// NOTE(review): presumably one broker-ID list per new partition — confirm.
	Assignment [][]int32
}
82
83func (t *TopicPartition) encode(pe packetEncoder) error {
84 pe.putInt32(t.Count)
85
86 if len(t.Assignment) == 0 {
87 pe.putInt32(-1)
88 return nil
89 }
90
91 if err := pe.putArrayLength(len(t.Assignment)); err != nil {
92 return err
93 }
94
95 for _, assign := range t.Assignment {
96 if err := pe.putInt32Array(assign); err != nil {
97 return err
98 }
99 }
100
101 return nil
102}
103
104func (t *TopicPartition) decode(pd packetDecoder, version int16) (err error) {
105 if t.Count, err = pd.getInt32(); err != nil {
106 return err
107 }
108
109 n, err := pd.getInt32()
110 if err != nil {
111 return err
112 }
113 if n <= 0 {
114 return nil
115 }
116 t.Assignment = make([][]int32, n)
117
118 for i := 0; i < int(n); i++ {
119 if t.Assignment[i], err = pd.getInt32Array(); err != nil {
120 return err
121 }
122 }
123
124 return nil
125}