blob: 68608241ff64ddaf0d4fa7b135f49d5328faa568 [file] [log] [blame]
William Kurkianea869482019-04-09 15:16:11 -04001package sarama
2
// OffsetFetchRequest carries the data that encode writes to, and decode reads
// from, the wire for an offset fetch request.
type OffsetFetchRequest struct {
	// Version is the request version. encode rejects values outside 0..5.
	Version int16

	// ConsumerGroup is the group name; it is the first field written to and
	// read from the request body.
	ConsumerGroup string

	// partitions maps a topic name to the partition IDs requested for that
	// topic. When it is nil and Version >= 2, encode writes an array length
	// of -1 instead of a topic list.
	partitions map[string][]int32
}
8
9func (r *OffsetFetchRequest) encode(pe packetEncoder) (err error) {
10 if r.Version < 0 || r.Version > 5 {
11 return PacketEncodingError{"invalid or unsupported OffsetFetchRequest version field"}
12 }
13
14 if err = pe.putString(r.ConsumerGroup); err != nil {
15 return err
16 }
17
18 if r.Version >= 2 && r.partitions == nil {
19 pe.putInt32(-1)
20 } else {
21 if err = pe.putArrayLength(len(r.partitions)); err != nil {
22 return err
23 }
24 for topic, partitions := range r.partitions {
25 if err = pe.putString(topic); err != nil {
26 return err
27 }
28 if err = pe.putInt32Array(partitions); err != nil {
29 return err
30 }
31 }
32 }
33 return nil
34}
35
36func (r *OffsetFetchRequest) decode(pd packetDecoder, version int16) (err error) {
37 r.Version = version
38 if r.ConsumerGroup, err = pd.getString(); err != nil {
39 return err
40 }
41 partitionCount, err := pd.getArrayLength()
42 if err != nil {
43 return err
44 }
45 if (partitionCount == 0 && version < 2) || partitionCount < 0 {
46 return nil
47 }
48 r.partitions = make(map[string][]int32)
49 for i := 0; i < partitionCount; i++ {
50 topic, err := pd.getString()
51 if err != nil {
52 return err
53 }
54 partitions, err := pd.getInt32Array()
55 if err != nil {
56 return err
57 }
58 r.partitions[topic] = partitions
59 }
60 return nil
61}
62
63func (r *OffsetFetchRequest) key() int16 {
64 return 9
65}
66
67func (r *OffsetFetchRequest) version() int16 {
68 return r.Version
69}
70
71func (r *OffsetFetchRequest) requiredVersion() KafkaVersion {
72 switch r.Version {
73 case 1:
74 return V0_8_2_0
75 case 2:
76 return V0_10_2_0
77 case 3:
78 return V0_11_0_0
79 case 4:
80 return V2_0_0_0
81 case 5:
82 return V2_1_0_0
83 default:
84 return MinVersion
85 }
86}
87
88func (r *OffsetFetchRequest) ZeroPartitions() {
89 if r.partitions == nil && r.Version >= 2 {
90 r.partitions = make(map[string][]int32)
91 }
92}
93
94func (r *OffsetFetchRequest) AddPartition(topic string, partitionID int32) {
95 if r.partitions == nil {
96 r.partitions = make(map[string][]int32)
97 }
98
99 r.partitions[topic] = append(r.partitions[topic], partitionID)
100}