blob: a493e02accfaad294876d78da3bbdd84f70b4128 [file] [log] [blame]
Scott Bakered4efab2020-01-13 19:12:25 -08001package sarama
2
3import (
4 "fmt"
5 "time"
6)
7
// CreateTopicsResponse carries the fields of a create-topics response that
// this file encodes to and decodes from the wire.
type CreateTopicsResponse struct {
	// Version is the protocol version; it selects which optional fields are
	// present when encoding and decoding.
	Version int16
	// ThrottleTime is only written and read for Version >= 2. It travels on
	// the wire as a 32-bit count of milliseconds.
	ThrottleTime time.Duration
	// TopicErrors maps each topic name to the per-topic result entry.
	TopicErrors map[string]*TopicError
}
13
14func (c *CreateTopicsResponse) encode(pe packetEncoder) error {
15 if c.Version >= 2 {
16 pe.putInt32(int32(c.ThrottleTime / time.Millisecond))
17 }
18
19 if err := pe.putArrayLength(len(c.TopicErrors)); err != nil {
20 return err
21 }
22 for topic, topicError := range c.TopicErrors {
23 if err := pe.putString(topic); err != nil {
24 return err
25 }
26 if err := topicError.encode(pe, c.Version); err != nil {
27 return err
28 }
29 }
30
31 return nil
32}
33
34func (c *CreateTopicsResponse) decode(pd packetDecoder, version int16) (err error) {
35 c.Version = version
36
37 if version >= 2 {
38 throttleTime, err := pd.getInt32()
39 if err != nil {
40 return err
41 }
42 c.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
43 }
44
45 n, err := pd.getArrayLength()
46 if err != nil {
47 return err
48 }
49
50 c.TopicErrors = make(map[string]*TopicError, n)
51 for i := 0; i < n; i++ {
52 topic, err := pd.getString()
53 if err != nil {
54 return err
55 }
56 c.TopicErrors[topic] = new(TopicError)
57 if err := c.TopicErrors[topic].decode(pd, version); err != nil {
58 return err
59 }
60 }
61
62 return nil
63}
64
65func (c *CreateTopicsResponse) key() int16 {
66 return 19
67}
68
// version returns the protocol version stored in c.Version.
func (c *CreateTopicsResponse) version() int16 {
	return c.Version
}
72
73func (c *CreateTopicsResponse) requiredVersion() KafkaVersion {
74 switch c.Version {
75 case 2:
76 return V1_0_0_0
77 case 1:
78 return V0_11_0_0
79 default:
80 return V0_10_1_0
81 }
82}
83
// TopicError is the per-topic result carried in a CreateTopicsResponse. It
// has an Error method, so a *TopicError can be used as an error value.
type TopicError struct {
	// Err is the error code; it is sent on the wire as an int16.
	Err KError
	// ErrMsg is an optional message, only encoded and decoded for
	// version >= 1. A nil pointer means no message.
	ErrMsg *string
}
88
89func (t *TopicError) Error() string {
90 text := t.Err.Error()
91 if t.ErrMsg != nil {
92 text = fmt.Sprintf("%s - %s", text, *t.ErrMsg)
93 }
94 return text
95}
96
97func (t *TopicError) encode(pe packetEncoder, version int16) error {
98 pe.putInt16(int16(t.Err))
99
100 if version >= 1 {
101 if err := pe.putNullableString(t.ErrMsg); err != nil {
102 return err
103 }
104 }
105
106 return nil
107}
108
109func (t *TopicError) decode(pd packetDecoder, version int16) (err error) {
110 kErr, err := pd.getInt16()
111 if err != nil {
112 return err
113 }
114 t.Err = KError(kErr)
115
116 if version >= 1 {
117 if t.ErrMsg, err = pd.getNullableString(); err != nil {
118 return err
119 }
120 }
121
122 return nil
123}