blob: af321e99466e5a3b955f4878d9ca982ab290cd32 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
3import "time"
4
5type CreatePartitionsRequest struct {
6 TopicPartitions map[string]*TopicPartition
7 Timeout time.Duration
8 ValidateOnly bool
9}
10
11func (c *CreatePartitionsRequest) encode(pe packetEncoder) error {
12 if err := pe.putArrayLength(len(c.TopicPartitions)); err != nil {
13 return err
14 }
15
16 for topic, partition := range c.TopicPartitions {
17 if err := pe.putString(topic); err != nil {
18 return err
19 }
20 if err := partition.encode(pe); err != nil {
21 return err
22 }
23 }
24
25 pe.putInt32(int32(c.Timeout / time.Millisecond))
26
27 pe.putBool(c.ValidateOnly)
28
29 return nil
30}
31
32func (c *CreatePartitionsRequest) decode(pd packetDecoder, version int16) (err error) {
33 n, err := pd.getArrayLength()
34 if err != nil {
35 return err
36 }
37 c.TopicPartitions = make(map[string]*TopicPartition, n)
38 for i := 0; i < n; i++ {
39 topic, err := pd.getString()
40 if err != nil {
41 return err
42 }
43 c.TopicPartitions[topic] = new(TopicPartition)
44 if err := c.TopicPartitions[topic].decode(pd, version); err != nil {
45 return err
46 }
47 }
48
49 timeout, err := pd.getInt32()
50 if err != nil {
51 return err
52 }
53 c.Timeout = time.Duration(timeout) * time.Millisecond
54
55 if c.ValidateOnly, err = pd.getBool(); err != nil {
56 return err
57 }
58
59 return nil
60}
61
62func (r *CreatePartitionsRequest) key() int16 {
63 return 37
64}
65
66func (r *CreatePartitionsRequest) version() int16 {
67 return 0
68}
69
70func (r *CreatePartitionsRequest) requiredVersion() KafkaVersion {
71 return V1_0_0_0
72}
73
74type TopicPartition struct {
75 Count int32
76 Assignment [][]int32
77}
78
79func (t *TopicPartition) encode(pe packetEncoder) error {
80 pe.putInt32(t.Count)
81
82 if len(t.Assignment) == 0 {
83 pe.putInt32(-1)
84 return nil
85 }
86
87 if err := pe.putArrayLength(len(t.Assignment)); err != nil {
88 return err
89 }
90
91 for _, assign := range t.Assignment {
92 if err := pe.putInt32Array(assign); err != nil {
93 return err
94 }
95 }
96
97 return nil
98}
99
100func (t *TopicPartition) decode(pd packetDecoder, version int16) (err error) {
101 if t.Count, err = pd.getInt32(); err != nil {
102 return err
103 }
104
105 n, err := pd.getInt32()
106 if err != nil {
107 return err
108 }
109 if n <= 0 {
110 return nil
111 }
112 t.Assignment = make([][]int32, n)
113
114 for i := 0; i < int(n); i++ {
115 if t.Assignment[i], err = pd.getInt32Array(); err != nil {
116 return err
117 }
118 }
119
120 return nil
121}