blob: 69349efe2ba0e42c722152cb20e4ca6f6fdc0403 [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3type OffsetResponseBlock struct {
4 Err KError
5 Offsets []int64 // Version 0
6 Offset int64 // Version 1
7 Timestamp int64 // Version 1
8}
9
10func (b *OffsetResponseBlock) decode(pd packetDecoder, version int16) (err error) {
11 tmp, err := pd.getInt16()
12 if err != nil {
13 return err
14 }
15 b.Err = KError(tmp)
16
17 if version == 0 {
18 b.Offsets, err = pd.getInt64Array()
19
20 return err
21 }
22
23 b.Timestamp, err = pd.getInt64()
24 if err != nil {
25 return err
26 }
27
28 b.Offset, err = pd.getInt64()
29 if err != nil {
30 return err
31 }
32
33 // For backwards compatibility put the offset in the offsets array too
34 b.Offsets = []int64{b.Offset}
35
36 return nil
37}
38
39func (b *OffsetResponseBlock) encode(pe packetEncoder, version int16) (err error) {
40 pe.putInt16(int16(b.Err))
41
42 if version == 0 {
43 return pe.putInt64Array(b.Offsets)
44 }
45
46 pe.putInt64(b.Timestamp)
47 pe.putInt64(b.Offset)
48
49 return nil
50}
51
52type OffsetResponse struct {
khenaidood948f772021-08-11 17:49:24 -040053 Version int16
54 ThrottleTimeMs int32
55 Blocks map[string]map[int32]*OffsetResponseBlock
khenaidooac637102019-01-14 15:44:34 -050056}
57
58func (r *OffsetResponse) decode(pd packetDecoder, version int16) (err error) {
khenaidood948f772021-08-11 17:49:24 -040059 if version >= 2 {
60 r.ThrottleTimeMs, err = pd.getInt32()
61 if err != nil {
62 return err
63 }
64 }
65
khenaidooac637102019-01-14 15:44:34 -050066 numTopics, err := pd.getArrayLength()
67 if err != nil {
68 return err
69 }
70
71 r.Blocks = make(map[string]map[int32]*OffsetResponseBlock, numTopics)
72 for i := 0; i < numTopics; i++ {
73 name, err := pd.getString()
74 if err != nil {
75 return err
76 }
77
78 numBlocks, err := pd.getArrayLength()
79 if err != nil {
80 return err
81 }
82
83 r.Blocks[name] = make(map[int32]*OffsetResponseBlock, numBlocks)
84
85 for j := 0; j < numBlocks; j++ {
86 id, err := pd.getInt32()
87 if err != nil {
88 return err
89 }
90
91 block := new(OffsetResponseBlock)
92 err = block.decode(pd, version)
93 if err != nil {
94 return err
95 }
96 r.Blocks[name][id] = block
97 }
98 }
99
100 return nil
101}
102
103func (r *OffsetResponse) GetBlock(topic string, partition int32) *OffsetResponseBlock {
104 if r.Blocks == nil {
105 return nil
106 }
107
108 if r.Blocks[topic] == nil {
109 return nil
110 }
111
112 return r.Blocks[topic][partition]
113}
114
115/*
116// [0 0 0 1 ntopics
1170 8 109 121 95 116 111 112 105 99 topic
1180 0 0 1 npartitions
1190 0 0 0 id
1200 0
121
1220 0 0 1 0 0 0 0
1230 1 1 1 0 0 0 1
1240 8 109 121 95 116 111 112
125105 99 0 0 0 1 0 0
1260 0 0 0 0 0 0 1
1270 0 0 0 0 1 1 1] <nil>
128
129*/
130func (r *OffsetResponse) encode(pe packetEncoder) (err error) {
khenaidood948f772021-08-11 17:49:24 -0400131 if r.Version >= 2 {
132 pe.putInt32(r.ThrottleTimeMs)
133 }
134
khenaidooac637102019-01-14 15:44:34 -0500135 if err = pe.putArrayLength(len(r.Blocks)); err != nil {
136 return err
137 }
138
139 for topic, partitions := range r.Blocks {
140 if err = pe.putString(topic); err != nil {
141 return err
142 }
143 if err = pe.putArrayLength(len(partitions)); err != nil {
144 return err
145 }
146 for partition, block := range partitions {
147 pe.putInt32(partition)
148 if err = block.encode(pe, r.version()); err != nil {
149 return err
150 }
151 }
152 }
153
154 return nil
155}
156
157func (r *OffsetResponse) key() int16 {
158 return 2
159}
160
161func (r *OffsetResponse) version() int16 {
162 return r.Version
163}
164
khenaidood948f772021-08-11 17:49:24 -0400165func (r *OffsetResponse) headerVersion() int16 {
166 return 0
167}
168
khenaidooac637102019-01-14 15:44:34 -0500169func (r *OffsetResponse) requiredVersion() KafkaVersion {
170 switch r.Version {
171 case 1:
172 return V0_10_1_0
khenaidood948f772021-08-11 17:49:24 -0400173 case 2:
174 return V0_11_0_0
khenaidooac637102019-01-14 15:44:34 -0500175 default:
176 return MinVersion
177 }
178}
179
180// testing API
181
182func (r *OffsetResponse) AddTopicPartition(topic string, partition int32, offset int64) {
183 if r.Blocks == nil {
184 r.Blocks = make(map[string]map[int32]*OffsetResponseBlock)
185 }
186 byTopic, ok := r.Blocks[topic]
187 if !ok {
188 byTopic = make(map[int32]*OffsetResponseBlock)
189 r.Blocks[topic] = byTopic
190 }
191 byTopic[partition] = &OffsetResponseBlock{Offsets: []int64{offset}, Offset: offset}
192}