blob: e842298dbb683a62eb82c8045b244016937cc930 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
// OffsetCommitResponse carries the per-topic, per-partition error codes that
// encode writes to, and decode reads from, the wire.
type OffsetCommitResponse struct {
	// Version is the protocol version of this response. encode and decode
	// only handle ThrottleTimeMs when it is 3 or higher.
	Version int16
	// ThrottleTimeMs is the throttle time in milliseconds (per the field
	// name); it is present on the wire only for Version >= 3.
	ThrottleTimeMs int32
	// Errors maps topic name -> partition ID -> error code. It stays nil
	// until AddError is called or decode reads at least one topic.
	Errors map[string]map[int32]KError
}
8
9func (r *OffsetCommitResponse) AddError(topic string, partition int32, kerror KError) {
10 if r.Errors == nil {
11 r.Errors = make(map[string]map[int32]KError)
12 }
13 partitions := r.Errors[topic]
14 if partitions == nil {
15 partitions = make(map[int32]KError)
16 r.Errors[topic] = partitions
17 }
18 partitions[partition] = kerror
19}
20
21func (r *OffsetCommitResponse) encode(pe packetEncoder) error {
22 if r.Version >= 3 {
23 pe.putInt32(r.ThrottleTimeMs)
24 }
25 if err := pe.putArrayLength(len(r.Errors)); err != nil {
26 return err
27 }
28 for topic, partitions := range r.Errors {
29 if err := pe.putString(topic); err != nil {
30 return err
31 }
32 if err := pe.putArrayLength(len(partitions)); err != nil {
33 return err
34 }
35 for partition, kerror := range partitions {
36 pe.putInt32(partition)
37 pe.putInt16(int16(kerror))
38 }
39 }
40 return nil
41}
42
43func (r *OffsetCommitResponse) decode(pd packetDecoder, version int16) (err error) {
44 r.Version = version
45
46 if version >= 3 {
47 r.ThrottleTimeMs, err = pd.getInt32()
48 if err != nil {
49 return err
50 }
51 }
52
53 numTopics, err := pd.getArrayLength()
54 if err != nil || numTopics == 0 {
55 return err
56 }
57
58 r.Errors = make(map[string]map[int32]KError, numTopics)
59 for i := 0; i < numTopics; i++ {
60 name, err := pd.getString()
61 if err != nil {
62 return err
63 }
64
65 numErrors, err := pd.getArrayLength()
66 if err != nil {
67 return err
68 }
69
70 r.Errors[name] = make(map[int32]KError, numErrors)
71
72 for j := 0; j < numErrors; j++ {
73 id, err := pd.getInt32()
74 if err != nil {
75 return err
76 }
77
78 tmp, err := pd.getInt16()
79 if err != nil {
80 return err
81 }
82 r.Errors[name][id] = KError(tmp)
83 }
84 }
85
86 return nil
87}
88
89func (r *OffsetCommitResponse) key() int16 {
90 return 8
91}
92
93func (r *OffsetCommitResponse) version() int16 {
94 return r.Version
95}
96
97func (r *OffsetCommitResponse) requiredVersion() KafkaVersion {
98 switch r.Version {
99 case 1:
100 return V0_8_2_0
101 case 2:
102 return V0_9_0_0
103 case 3:
104 return V0_11_0_0
105 case 4:
106 return V2_0_0_0
107 default:
108 return MinVersion
109 }
110}