blob: 7e1448a6692d085f2c161cc75d330aaa174425a5 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
Scott Baker8461e152019-10-01 14:44:30 -07003import (
4 "fmt"
5 "time"
6)
khenaidooac637102019-01-14 15:44:34 -05007
8type CreateTopicsResponse struct {
9 Version int16
10 ThrottleTime time.Duration
11 TopicErrors map[string]*TopicError
12}
13
14func (c *CreateTopicsResponse) encode(pe packetEncoder) error {
15 if c.Version >= 2 {
16 pe.putInt32(int32(c.ThrottleTime / time.Millisecond))
17 }
18
19 if err := pe.putArrayLength(len(c.TopicErrors)); err != nil {
20 return err
21 }
22 for topic, topicError := range c.TopicErrors {
23 if err := pe.putString(topic); err != nil {
24 return err
25 }
26 if err := topicError.encode(pe, c.Version); err != nil {
27 return err
28 }
29 }
30
31 return nil
32}
33
34func (c *CreateTopicsResponse) decode(pd packetDecoder, version int16) (err error) {
35 c.Version = version
36
37 if version >= 2 {
38 throttleTime, err := pd.getInt32()
39 if err != nil {
40 return err
41 }
42 c.ThrottleTime = time.Duration(throttleTime) * time.Millisecond
43 }
44
45 n, err := pd.getArrayLength()
46 if err != nil {
47 return err
48 }
49
50 c.TopicErrors = make(map[string]*TopicError, n)
51 for i := 0; i < n; i++ {
52 topic, err := pd.getString()
53 if err != nil {
54 return err
55 }
56 c.TopicErrors[topic] = new(TopicError)
57 if err := c.TopicErrors[topic].decode(pd, version); err != nil {
58 return err
59 }
60 }
61
62 return nil
63}
64
65func (c *CreateTopicsResponse) key() int16 {
66 return 19
67}
68
69func (c *CreateTopicsResponse) version() int16 {
70 return c.Version
71}
72
khenaidood948f772021-08-11 17:49:24 -040073func (c *CreateTopicsResponse) headerVersion() int16 {
74 return 0
75}
76
khenaidooac637102019-01-14 15:44:34 -050077func (c *CreateTopicsResponse) requiredVersion() KafkaVersion {
78 switch c.Version {
79 case 2:
80 return V1_0_0_0
81 case 1:
82 return V0_11_0_0
83 default:
84 return V0_10_1_0
85 }
86}
87
88type TopicError struct {
89 Err KError
90 ErrMsg *string
91}
92
Scott Baker8461e152019-10-01 14:44:30 -070093func (t *TopicError) Error() string {
94 text := t.Err.Error()
95 if t.ErrMsg != nil {
96 text = fmt.Sprintf("%s - %s", text, *t.ErrMsg)
97 }
98 return text
99}
100
khenaidooac637102019-01-14 15:44:34 -0500101func (t *TopicError) encode(pe packetEncoder, version int16) error {
102 pe.putInt16(int16(t.Err))
103
104 if version >= 1 {
105 if err := pe.putNullableString(t.ErrMsg); err != nil {
106 return err
107 }
108 }
109
110 return nil
111}
112
113func (t *TopicError) decode(pd packetDecoder, version int16) (err error) {
114 kErr, err := pd.getInt16()
115 if err != nil {
116 return err
117 }
118 t.Err = KError(kErr)
119
120 if version >= 1 {
121 if t.ErrMsg, err = pd.getNullableString(); err != nil {
122 return err
123 }
124 }
125
126 return nil
127}