blob: 342260ef5995285b8b40ca5d56b2733861a8332e [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3type OffsetCommitResponse struct {
4 Version int16
5 ThrottleTimeMs int32
6 Errors map[string]map[int32]KError
7}
8
9func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError) {
10 if r.Errors == nil {
11 r.Errors = make(map[string]map[int32]KError)
12 }
13 partitions := r.Errors[topic]
14 if partitions == nil {
15 partitions = make(map[int32]KError)
16 r.Errors[topic] = partitions
17 }
18 partitions[partition] = kerror
19}
20
21func (r *OffsetCommitResponse) encode(pe packetEncoder) error {
22 if r.Version >= 3 {
23 pe.putInt32(r.ThrottleTimeMs)
24 }
25 if err := pe.putArrayLength(len(r.Errors)); err != nil {
26 return err
27 }
28 for topic, partitions := range r.Errors {
29 if err := pe.putString(topic); err != nil {
30 return err
31 }
32 if err := pe.putArrayLength(len(partitions)); err != nil {
33 return err
34 }
35 for partition, kerror := range partitions {
36 pe.putInt32(partition)
37 pe.putInt16(int16(kerror))
38 }
39 }
40 return nil
41}
42
43func (r *OffsetCommitResponse) decode(pd packetDecoder, version int16) (err error) {
44 r.Version = version
45
46 if version >= 3 {
47 r.ThrottleTimeMs, err = pd.getInt32()
48 if err != nil {
49 return err
50 }
51 }
52
53 numTopics, err := pd.getArrayLength()
54 if err != nil || numTopics == 0 {
55 return err
56 }
57
58 r.Errors = make(map[string]map[int32]KError, numTopics)
59 for i := 0; i < numTopics; i++ {
60 name, err := pd.getString()
61 if err != nil {
62 return err
63 }
64
65 numErrors, err := pd.getArrayLength()
66 if err != nil {
67 return err
68 }
69
70 r.Errors[name] = make(map[int32]KError, numErrors)
71
72 for j := 0; j < numErrors; j++ {
73 id, err := pd.getInt32()
74 if err != nil {
75 return err
76 }
77
78 tmp, err := pd.getInt16()
79 if err != nil {
80 return err
81 }
82 r.Errors[name][id] = KError(tmp)
83 }
84 }
85
86 return nil
87}
88
89func (r *OffsetCommitResponse) key() int16 {
90 return 8
91}
92
93func (r *OffsetCommitResponse) version() int16 {
94 return r.Version
95}
96
khenaidood948f772021-08-11 17:49:24 -040097func (r *OffsetCommitResponse) headerVersion() int16 {
98 return 0
99}
100
khenaidooac637102019-01-14 15:44:34 -0500101func (r *OffsetCommitResponse) requiredVersion() KafkaVersion {
102 switch r.Version {
103 case 1:
104 return V0_8_2_0
105 case 2:
106 return V0_9_0_0
107 case 3:
108 return V0_11_0_0
109 case 4:
110 return V2_0_0_0
111 default:
112 return MinVersion
113 }
114}