blob: 7e147eb60c1924fdc6a328e4bebab6bee8e2e4e7 [file] [log] [blame]
Holger Hildebrandtfa074992020-03-27 15:42:06 +00001package sarama
2
3type OffsetFetchRequest struct {
4 Version int16
5 ConsumerGroup string
khenaidoo7d3c5582021-08-11 18:09:44 -04006 RequireStable bool // requires v7+
Holger Hildebrandtfa074992020-03-27 15:42:06 +00007 partitions map[string][]int32
8}
9
10func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
khenaidoo7d3c5582021-08-11 18:09:44 -040011 if r.Version < 0 || r.Version > 7 {
Holger Hildebrandtfa074992020-03-27 15:42:06 +000012 return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
13 }
14
khenaidoo7d3c5582021-08-11 18:09:44 -040015 isFlexible := r.Version >= 6
16
17 if isFlexible {
18 err = pe.putCompactString(r.ConsumerGroup)
19 } else {
20 err = pe.putString(r.ConsumerGroup)
21 }
22 if err != nil {
Holger Hildebrandtfa074992020-03-27 15:42:06 +000023 return err
24 }
25
khenaidoo7d3c5582021-08-11 18:09:44 -040026 if isFlexible {
27 if r.partitions == nil {
28 pe.putUVarint(0)
29 } else {
30 pe.putCompactArrayLength(len(r.partitions))
Holger Hildebrandtfa074992020-03-27 15:42:06 +000031 }
khenaidoo7d3c5582021-08-11 18:09:44 -040032 } else {
33 if r.partitions == nil && r.Version >= 2 {
34 pe.putInt32(-1)
35 } else {
36 if err = pe.putArrayLength(len(r.partitions)); err != nil {
Holger Hildebrandtfa074992020-03-27 15:42:06 +000037 return err
38 }
39 }
40 }
khenaidoo7d3c5582021-08-11 18:09:44 -040041
42 for topic, partitions := range r.partitions {
43 if isFlexible {
44 err = pe.putCompactString(topic)
45 } else {
46 err = pe.putString(topic)
47 }
48 if err != nil {
49 return err
50 }
51
52 //
53
54 if isFlexible {
55 err = pe.putCompactInt32Array(partitions)
56 } else {
57 err = pe.putInt32Array(partitions)
58 }
59 if err != nil {
60 return err
61 }
62
63 if isFlexible {
64 pe.putEmptyTaggedFieldArray()
65 }
66 }
67
68 if r.RequireStable && r.Version < 7 {
69 return PacketEncodingError{"requireStable is not supported. use version 7 or later"}
70 }
71
72 if r.Version >= 7 {
73 pe.putBool(r.RequireStable)
74 }
75
76 if isFlexible {
77 pe.putEmptyTaggedFieldArray()
78 }
79
Holger Hildebrandtfa074992020-03-27 15:42:06 +000080 return nil
81}
82
83func (r *OffsetFetchRequest) decode(pd packetDecoder, version int16) (err error) {
84 r.Version = version
khenaidoo7d3c5582021-08-11 18:09:44 -040085 isFlexible := r.Version >= 6
86 if isFlexible {
87 r.ConsumerGroup, err = pd.getCompactString()
88 } else {
89 r.ConsumerGroup, err = pd.getString()
Holger Hildebrandtfa074992020-03-27 15:42:06 +000090 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +000091 if err != nil {
92 return err
93 }
khenaidoo7d3c5582021-08-11 18:09:44 -040094
95 var partitionCount int
96
97 if isFlexible {
98 partitionCount, err = pd.getCompactArrayLength()
99 } else {
100 partitionCount, err = pd.getArrayLength()
101 }
102 if err != nil {
103 return err
104 }
105
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000106 if (partitionCount == 0 && version < 2) || partitionCount < 0 {
107 return nil
108 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400109
110 r.partitions = make(map[string][]int32, partitionCount)
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000111 for i := 0; i < partitionCount; i++ {
khenaidoo7d3c5582021-08-11 18:09:44 -0400112 var topic string
113 if isFlexible {
114 topic, err = pd.getCompactString()
115 } else {
116 topic, err = pd.getString()
117 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000118 if err != nil {
119 return err
120 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400121
122 var partitions []int32
123 if isFlexible {
124 partitions, err = pd.getCompactInt32Array()
125 } else {
126 partitions, err = pd.getInt32Array()
127 }
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000128 if err != nil {
129 return err
130 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400131 if isFlexible {
132 _, err = pd.getEmptyTaggedFieldArray()
133 if err != nil {
134 return err
135 }
136 }
137
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000138 r.partitions[topic] = partitions
139 }
khenaidoo7d3c5582021-08-11 18:09:44 -0400140
141 if r.Version >= 7 {
142 r.RequireStable, err = pd.getBool()
143 if err != nil {
144 return err
145 }
146 }
147
148 if isFlexible {
149 _, err = pd.getEmptyTaggedFieldArray()
150 if err != nil {
151 return err
152 }
153 }
154
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000155 return nil
156}
157
158func (r *OffsetFetchRequest) key() int16 {
159 return 9
160}
161
162func (r *OffsetFetchRequest) version() int16 {
163 return r.Version
164}
165
khenaidoo7d3c5582021-08-11 18:09:44 -0400166func (r *OffsetFetchRequest) headerVersion() int16 {
167 if r.Version >= 6 {
168 return 2
169 }
170
171 return 1
172}
173
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000174func (r *OffsetFetchRequest) requiredVersion() KafkaVersion {
175 switch r.Version {
176 case 1:
177 return V0_8_2_0
178 case 2:
179 return V0_10_2_0
180 case 3:
181 return V0_11_0_0
182 case 4:
183 return V2_0_0_0
184 case 5:
185 return V2_1_0_0
khenaidoo7d3c5582021-08-11 18:09:44 -0400186 case 6:
187 return V2_4_0_0
188 case 7:
189 return V2_5_0_0
Holger Hildebrandtfa074992020-03-27 15:42:06 +0000190 default:
191 return MinVersion
192 }
193}
194
195func (r *OffsetFetchRequest) ZeroPartitions() {
196 if r.partitions == nil && r.Version >= 2 {
197 r.partitions = make(map[string][]int32)
198 }
199}
200
201func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) {
202 if r.partitions == nil {
203 r.partitions = make(map[string][]int32)
204 }
205
206 r.partitions[topic] = append(r.partitions[topic], partitionID)
207}