blob: 9e257028040ab02531e72ac31f6240c83ffc4800 [file] [log] [blame]
Scott Bakered4efab2020-01-13 19:12:25 -08001package sarama
2
3type OffsetFetchResponseBlock struct {
4 Offset int64
5 LeaderEpoch int32
6 Metadata string
7 Err KError
8}
9
10func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
11 b.Offset, err = pd.getInt64()
12 if err != nil {
13 return err
14 }
15
16 if version >= 5 {
17 b.LeaderEpoch, err = pd.getInt32()
18 if err != nil {
19 return err
20 }
21 }
22
23 b.Metadata, err = pd.getString()
24 if err != nil {
25 return err
26 }
27
28 tmp, err := pd.getInt16()
29 if err != nil {
30 return err
31 }
32 b.Err = KError(tmp)
33
34 return nil
35}
36
37func (b *OffsetFetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
38 pe.putInt64(b.Offset)
39
40 if version >= 5 {
41 pe.putInt32(b.LeaderEpoch)
42 }
43
44 err = pe.putString(b.Metadata)
45 if err != nil {
46 return err
47 }
48
49 pe.putInt16(int16(b.Err))
50
51 return nil
52}
53
54type OffsetFetchResponse struct {
55 Version int16
56 ThrottleTimeMs int32
57 Blocks map[string]map[int32]*OffsetFetchResponseBlock
58 Err KError
59}
60
61func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
62 if r.Version >= 3 {
63 pe.putInt32(r.ThrottleTimeMs)
64 }
65
66 if err := pe.putArrayLength(len(r.Blocks)); err != nil {
67 return err
68 }
69 for topic, partitions := range r.Blocks {
70 if err := pe.putString(topic); err != nil {
71 return err
72 }
73 if err := pe.putArrayLength(len(partitions)); err != nil {
74 return err
75 }
76 for partition, block := range partitions {
77 pe.putInt32(partition)
78 if err := block.encode(pe, r.Version); err != nil {
79 return err
80 }
81 }
82 }
83 if r.Version >= 2 {
84 pe.putInt16(int16(r.Err))
85 }
86 return nil
87}
88
89func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error) {
90 r.Version = version
91
92 if version >= 3 {
93 r.ThrottleTimeMs, err = pd.getInt32()
94 if err != nil {
95 return err
96 }
97 }
98
99 numTopics, err := pd.getArrayLength()
100 if err != nil {
101 return err
102 }
103
104 if numTopics > 0 {
105 r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics)
106 for i := 0; i < numTopics; i++ {
107 name, err := pd.getString()
108 if err != nil {
109 return err
110 }
111
112 numBlocks, err := pd.getArrayLength()
113 if err != nil {
114 return err
115 }
116
117 if numBlocks == 0 {
118 r.Blocks[name] = nil
119 continue
120 }
121 r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
122
123 for j := 0; j < numBlocks; j++ {
124 id, err := pd.getInt32()
125 if err != nil {
126 return err
127 }
128
129 block := new(OffsetFetchResponseBlock)
130 err = block.decode(pd, version)
131 if err != nil {
132 return err
133 }
134 r.Blocks[name][id] = block
135 }
136 }
137 }
138
139 if version >= 2 {
140 kerr, err := pd.getInt16()
141 if err != nil {
142 return err
143 }
144 r.Err = KError(kerr)
145 }
146
147 return nil
148}
149
150func (r *OffsetFetchResponse) key() int16 {
151 return 9
152}
153
154func (r *OffsetFetchResponse) version() int16 {
155 return r.Version
156}
157
158func (r *OffsetFetchResponse) requiredVersion() KafkaVersion {
159 switch r.Version {
160 case 1:
161 return V0_8_2_0
162 case 2:
163 return V0_10_2_0
164 case 3:
165 return V0_11_0_0
166 case 4:
167 return V2_0_0_0
168 case 5:
169 return V2_1_0_0
170 default:
171 return MinVersion
172 }
173}
174
175func (r *OffsetFetchResponse) GetBlock(topic string, partition int32) *OffsetFetchResponseBlock {
176 if r.Blocks == nil {
177 return nil
178 }
179
180 if r.Blocks[topic] == nil {
181 return nil
182 }
183
184 return r.Blocks[topic][partition]
185}
186
187func (r *OffsetFetchResponse) AddBlock(topic string, partition int32, block *OffsetFetchResponseBlock) {
188 if r.Blocks == nil {
189 r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock)
190 }
191 partitions := r.Blocks[topic]
192 if partitions == nil {
193 partitions = make(map[int32]*OffsetFetchResponseBlock)
194 r.Blocks[topic] = partitions
195 }
196 partitions[partition] = block
197}