blob: bb18204a7c204d92752bfb9a34a146a125780f99 [file] [log] [blame]
Scott Bakered4efab2020-01-13 19:12:25 -08001package sarama
2
3import (
4 "fmt"
5 "time"
6)
7
8type CreatePartitionsResponse struct {
9 ThrottleTime time.Duration
10 TopicPartitionErrors map[string]*TopicPartitionError
11}
12
13func (c *CreatePartitionsResponse) encode(pe packetEncoder) error {
14 pe.putInt32(int32(c.ThrottleTime / time.Millisecond))
15 if err := pe.putArrayLength(len(c.TopicPartitionErrors)); err != nil {
16 return err
17 }
18
19 for topic, partitionError := range c.TopicPartitionErrors {
20 if err := pe.putString(topic); err != nil {
21 return err
22 }
23 if err := partitionError.encode(pe); err != nil {
24 return err
25 }
26 }
27
28 return nil
29}
30
31func (c *CreatePartitionsResponse) decode(pd packetDecoder, version int16) (err error) {
32 throttleTime, err := pd.getInt32()
33 if err != nil {
34 return err
35 }
36 c.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
37
38 n, err := pd.getArrayLength()
39 if err != nil {
40 return err
41 }
42
43 c.TopicPartitionErrors = make(map[string]*TopicPartitionError, n)
44 for i := 0; i < n; i++ {
45 topic, err := pd.getString()
46 if err != nil {
47 return err
48 }
49 c.TopicPartitionErrors[topic] = new(TopicPartitionError)
50 if err := c.TopicPartitionErrors[topic].decode(pd, version); err != nil {
51 return err
52 }
53 }
54
55 return nil
56}
57
58func (r *CreatePartitionsResponse) key() int16 {
59 return 37
60}
61
62func (r *CreatePartitionsResponse) version() int16 {
63 return 0
64}
65
66func (r *CreatePartitionsResponse) requiredVersion() KafkaVersion {
67 return V1_0_0_0
68}
69
70type TopicPartitionError struct {
71 Err KError
72 ErrMsg *string
73}
74
75func (t *TopicPartitionError) Error() string {
76 text := t.Err.Error()
77 if t.ErrMsg != nil {
78 text = fmt.Sprintf("%s - %s", text, *t.ErrMsg)
79 }
80 return text
81}
82
83func (t *TopicPartitionError) encode(pe packetEncoder) error {
84 pe.putInt16(int16(t.Err))
85
86 if err := pe.putNullableString(t.ErrMsg); err != nil {
87 return err
88 }
89
90 return nil
91}
92
93func (t *TopicPartitionError) decode(pd packetDecoder, version int16) (err error) {
94 kerr, err := pd.getInt16()
95 if err != nil {
96 return err
97 }
98 t.Err = KError(kerr)
99
100 if t.ErrMsg, err = pd.getNullableString(); err != nil {
101 return err
102 }
103
104 return nil
105}