blob: 66207e00c5d4fb0a509e1c93b6f65fbcdfd7e084 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
3import "time"
4
5type CreateTopicsResponse struct {
6 Version int16
7 ThrottleTime time.Duration
8 TopicErrors map[string]*TopicError
9}
10
11func (c *CreateTopicsResponse) encode(pe packetEncoder) error {
12 if c.Version >= 2 {
13 pe.putInt32(int32(c.ThrottleTime / time.Millisecond))
14 }
15
16 if err := pe.putArrayLength(len(c.TopicErrors)); err != nil {
17 return err
18 }
19 for topic, topicError := range c.TopicErrors {
20 if err := pe.putString(topic); err != nil {
21 return err
22 }
23 if err := topicError.encode(pe, c.Version); err != nil {
24 return err
25 }
26 }
27
28 return nil
29}
30
31func (c *CreateTopicsResponse) decode(pd packetDecoder, version int16) (err error) {
32 c.Version = version
33
34 if version >= 2 {
35 throttleTime, err := pd.getInt32()
36 if err != nil {
37 return err
38 }
39 c.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
40 }
41
42 n, err := pd.getArrayLength()
43 if err != nil {
44 return err
45 }
46
47 c.TopicErrors = make(map[string]*TopicError, n)
48 for i := 0; i < n; i++ {
49 topic, err := pd.getString()
50 if err != nil {
51 return err
52 }
53 c.TopicErrors[topic] = new(TopicError)
54 if err := c.TopicErrors[topic].decode(pd, version); err != nil {
55 return err
56 }
57 }
58
59 return nil
60}
61
62func (c *CreateTopicsResponse) key() int16 {
63 return 19
64}
65
66func (c *CreateTopicsResponse) version() int16 {
67 return c.Version
68}
69
70func (c *CreateTopicsResponse) requiredVersion() KafkaVersion {
71 switch c.Version {
72 case 2:
73 return V1_0_0_0
74 case 1:
75 return V0_11_0_0
76 default:
77 return V0_10_1_0
78 }
79}
80
81type TopicError struct {
82 Err KError
83 ErrMsg *string
84}
85
86func (t *TopicError) encode(pe packetEncoder, version int16) error {
87 pe.putInt16(int16(t.Err))
88
89 if version >= 1 {
90 if err := pe.putNullableString(t.ErrMsg); err != nil {
91 return err
92 }
93 }
94
95 return nil
96}
97
98func (t *TopicError) decode(pd packetDecoder, version int16) (err error) {
99 kErr, err := pd.getInt16()
100 if err != nil {
101 return err
102 }
103 t.Err = KError(kErr)
104
105 if version >= 1 {
106 if t.ErrMsg, err = pd.getNullableString(); err != nil {
107 return err
108 }
109 }
110
111 return nil
112}