blob: 9931cade512d61da1c734340bfaf5ee360df208e [file] [log] [blame]
khenaidooac637102019-01-14 15:44:34 -05001package sarama
2
3import "errors"
4
const (
	// ReceiveTime is the sentinel timestamp for Offset Commit Requests: it asks
	// the broker to stamp the commit with the time the request was received.
	// The timestamp field is only sent with message version 1 (kafka 0.8.2+).
	ReceiveTime int64 = -1

	// GroupGenerationUndefined is the sentinel group generation for Offset
	// Commit Requests issued by a consumer group that does not rely on Kafka
	// for partition management.
	GroupGenerationUndefined = -1
)
14
// offsetCommitRequestBlock holds the values committed for a single
// topic/partition inside an OffsetCommitRequest.
type offsetCommitRequestBlock struct {
	// offset is the offset being committed.
	offset int64
	// timestamp is only put on the wire when the request version is 1.
	timestamp int64
	// metadata is an arbitrary string stored alongside the offset.
	metadata string
}
20
21func (b *offsetCommitRequestBlock) encode(pe packetEncoder, version int16) error {
22 pe.putInt64(b.offset)
23 if version == 1 {
24 pe.putInt64(b.timestamp)
25 } else if b.timestamp != 0 {
26 Logger.Println("Non-zero timestamp specified for OffsetCommitRequest not v1, it will be ignored")
27 }
28
29 return pe.putString(b.metadata)
30}
31
32func (b *offsetCommitRequestBlock) decode(pd packetDecoder, version int16) (err error) {
33 if b.offset, err = pd.getInt64(); err != nil {
34 return err
35 }
36 if version == 1 {
37 if b.timestamp, err = pd.getInt64(); err != nil {
38 return err
39 }
40 }
41 b.metadata, err = pd.getString()
42 return err
43}
44
45type OffsetCommitRequest struct {
46 ConsumerGroup string
47 ConsumerGroupGeneration int32 // v1 or later
48 ConsumerID string // v1 or later
49 RetentionTime int64 // v2 or later
50
51 // Version can be:
52 // - 0 (kafka 0.8.1 and later)
53 // - 1 (kafka 0.8.2 and later)
54 // - 2 (kafka 0.9.0 and later)
55 // - 3 (kafka 0.11.0 and later)
56 // - 4 (kafka 2.0.0 and later)
57 Version int16
58 blocks map[string]map[int32]*offsetCommitRequestBlock
59}
60
61func (r *OffsetCommitRequest) encode(pe packetEncoder) error {
62 if r.Version < 0 || r.Version > 4 {
63 return PacketEncodingError{"invalid or unsupported OffsetCommitRequest version field"}
64 }
65
66 if err := pe.putString(r.ConsumerGroup); err != nil {
67 return err
68 }
69
70 if r.Version >= 1 {
71 pe.putInt32(r.ConsumerGroupGeneration)
72 if err := pe.putString(r.ConsumerID); err != nil {
73 return err
74 }
75 } else {
76 if r.ConsumerGroupGeneration != 0 {
77 Logger.Println("Non-zero ConsumerGroupGeneration specified for OffsetCommitRequest v0, it will be ignored")
78 }
79 if r.ConsumerID != "" {
80 Logger.Println("Non-empty ConsumerID specified for OffsetCommitRequest v0, it will be ignored")
81 }
82 }
83
84 if r.Version >= 2 {
85 pe.putInt64(r.RetentionTime)
86 } else if r.RetentionTime != 0 {
87 Logger.Println("Non-zero RetentionTime specified for OffsetCommitRequest version <2, it will be ignored")
88 }
89
90 if err := pe.putArrayLength(len(r.blocks)); err != nil {
91 return err
92 }
93 for topic, partitions := range r.blocks {
94 if err := pe.putString(topic); err != nil {
95 return err
96 }
97 if err := pe.putArrayLength(len(partitions)); err != nil {
98 return err
99 }
100 for partition, block := range partitions {
101 pe.putInt32(partition)
102 if err := block.encode(pe, r.Version); err != nil {
103 return err
104 }
105 }
106 }
107 return nil
108}
109
110func (r *OffsetCommitRequest) decode(pd packetDecoder, version int16) (err error) {
111 r.Version = version
112
113 if r.ConsumerGroup, err = pd.getString(); err != nil {
114 return err
115 }
116
117 if r.Version >= 1 {
118 if r.ConsumerGroupGeneration, err = pd.getInt32(); err != nil {
119 return err
120 }
121 if r.ConsumerID, err = pd.getString(); err != nil {
122 return err
123 }
124 }
125
126 if r.Version >= 2 {
127 if r.RetentionTime, err = pd.getInt64(); err != nil {
128 return err
129 }
130 }
131
132 topicCount, err := pd.getArrayLength()
133 if err != nil {
134 return err
135 }
136 if topicCount == 0 {
137 return nil
138 }
139 r.blocks = make(map[string]map[int32]*offsetCommitRequestBlock)
140 for i := 0; i < topicCount; i++ {
141 topic, err := pd.getString()
142 if err != nil {
143 return err
144 }
145 partitionCount, err := pd.getArrayLength()
146 if err != nil {
147 return err
148 }
149 r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock)
150 for j := 0; j < partitionCount; j++ {
151 partition, err := pd.getInt32()
152 if err != nil {
153 return err
154 }
155 block := &offsetCommitRequestBlock{}
156 if err := block.decode(pd, r.Version); err != nil {
157 return err
158 }
159 r.blocks[topic][partition] = block
160 }
161 }
162 return nil
163}
164
165func (r *OffsetCommitRequest) key() int16 {
166 return 8
167}
168
169func (r *OffsetCommitRequest) version() int16 {
170 return r.Version
171}
172
khenaidood948f772021-08-11 17:49:24 -0400173func (r *OffsetCommitRequest) headerVersion() int16 {
174 return 1
175}
176
khenaidooac637102019-01-14 15:44:34 -0500177func (r *OffsetCommitRequest) requiredVersion() KafkaVersion {
178 switch r.Version {
179 case 1:
180 return V0_8_2_0
181 case 2:
182 return V0_9_0_0
183 case 3:
184 return V0_11_0_0
185 case 4:
186 return V2_0_0_0
187 default:
188 return MinVersion
189 }
190}
191
192func (r *OffsetCommitRequest) AddBlock(topic string, partitionID int32, offset int64, timestamp int64, metadata string) {
193 if r.blocks == nil {
194 r.blocks = make(map[string]map[int32]*offsetCommitRequestBlock)
195 }
196
197 if r.blocks[topic] == nil {
198 r.blocks[topic] = make(map[int32]*offsetCommitRequestBlock)
199 }
200
201 r.blocks[topic][partitionID] = &offsetCommitRequestBlock{offset, timestamp, metadata}
202}
203
204func (r *OffsetCommitRequest) Offset(topic string, partitionID int32) (int64, string, error) {
205 partitions := r.blocks[topic]
206 if partitions == nil {
Scott Baker8461e152019-10-01 14:44:30 -0700207 return 0, "", errors.New("no such offset")
khenaidooac637102019-01-14 15:44:34 -0500208 }
209 block := partitions[partitionID]
210 if block == nil {
Scott Baker8461e152019-10-01 14:44:30 -0700211 return 0, "", errors.New("no such offset")
khenaidooac637102019-01-14 15:44:34 -0500212 }
213 return block.offset, block.metadata, nil
214}