package sarama

import (
	"sync"
	"time"
)

// Offset Manager

// OffsetManager uses Kafka to store and fetch consumed partition offsets.
type OffsetManager interface {
	// ManagePartition creates a PartitionOffsetManager on the given topic/partition.
	// It will return an error if this OffsetManager is already managing the given
	// topic/partition.
	ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)

	// Close stops the OffsetManager from managing offsets. It is required to call
	// this function before an OffsetManager object passes out of scope, as it
	// will otherwise leak memory. You must call this after all the
	// PartitionOffsetManagers are closed.
	Close() error

	// Commit commits the offsets. This method can be used if AutoCommit.Enable is
	// set to false.
	Commit()
}
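
// A rough usage sketch of the interface above, assuming an already-configured
// Client named client, a topic "my-topic", and a hypothetical
// lastConsumedOffset; error handling is elided. The explicit Commit is only
// needed when Consumer.Offsets.AutoCommit.Enable is false, otherwise the
// manager flushes on its own ticker.
//
//	om, _ := NewOffsetManagerFromClient("my-group", client)
//	pom, _ := om.ManagePartition("my-topic", 0)
//	pom.MarkOffset(lastConsumedOffset+1, "")
//	om.Commit()
//	pom.AsyncClose()
//	_ = om.Close()
//	_ = client.Close()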

type offsetManager struct {
	client Client
	conf   *Config
	group  string
	ticker *time.Ticker

	memberID   string
	generation int32

	broker     *Broker
	brokerLock sync.RWMutex

	poms     map[string]map[int32]*partitionOffsetManager
	pomsLock sync.RWMutex

	closeOnce sync.Once
	closing   chan none
	closed    chan none
}

// NewOffsetManagerFromClient creates a new OffsetManager from the given client.
// It is still necessary to call Close() on the underlying client when finished with the partition manager.
func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) {
	return newOffsetManagerFromClient(group, "", GroupGenerationUndefined, client)
}
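
// A minimal configuration sketch touching two of the offset-related knobs
// consulted in this file; everything else keeps its sarama default, and the
// broker address is a placeholder.
//
//	conf := NewConfig()
//	conf.Consumer.Offsets.AutoCommit.Enable = false // commit manually via OffsetManager.Commit
//	conf.Consumer.Return.Errors = true              // receive errors on PartitionOffsetManager.Errors()
//	client, _ := NewClient([]string{"localhost:9092"}, conf)
//	om, _ := NewOffsetManagerFromClient("my-group", client)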

func newOffsetManagerFromClient(group, memberID string, generation int32, client Client) (*offsetManager, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	conf := client.Config()
	om := &offsetManager{
		client: client,
		conf:   conf,
		group:  group,
		poms:   make(map[string]map[int32]*partitionOffsetManager),

		memberID:   memberID,
		generation: generation,

		closing: make(chan none),
		closed:  make(chan none),
	}
	if conf.Consumer.Offsets.AutoCommit.Enable {
		om.ticker = time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval)
		go withRecover(om.mainLoop)
	}

	return om, nil
}

func (om *offsetManager) ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) {
	pom, err := om.newPartitionOffsetManager(topic, partition)
	if err != nil {
		return nil, err
	}

	om.pomsLock.Lock()
	defer om.pomsLock.Unlock()

	topicManagers := om.poms[topic]
	if topicManagers == nil {
		topicManagers = make(map[int32]*partitionOffsetManager)
		om.poms[topic] = topicManagers
	}

	if topicManagers[partition] != nil {
		return nil, ConfigurationError("That topic/partition is already being managed")
	}

	topicManagers[partition] = pom
	return pom, nil
}

func (om *offsetManager) Close() error {
	om.closeOnce.Do(func() {
		// exit the mainLoop
		close(om.closing)
		if om.conf.Consumer.Offsets.AutoCommit.Enable {
			<-om.closed
		}

		// mark all POMs as closed
		om.asyncClosePOMs()

		// flush one last time
		if om.conf.Consumer.Offsets.AutoCommit.Enable {
			for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ {
				om.flushToBroker()
				if om.releasePOMs(false) == 0 {
					break
				}
			}
		}

		om.releasePOMs(true)
		om.brokerLock.Lock()
		om.broker = nil
		om.brokerLock.Unlock()
	})
	return nil
}

func (om *offsetManager) computeBackoff(retries int) time.Duration {
	if om.conf.Metadata.Retry.BackoffFunc != nil {
		return om.conf.Metadata.Retry.BackoffFunc(retries, om.conf.Metadata.Retry.Max)
	} else {
		return om.conf.Metadata.Retry.Backoff
	}
}

func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retries int) (int64, string, error) {
	broker, err := om.coordinator()
	if err != nil {
		if retries <= 0 {
			return 0, "", err
		}
		return om.fetchInitialOffset(topic, partition, retries-1)
	}

	req := new(OffsetFetchRequest)
	req.Version = 1
	req.ConsumerGroup = om.group
	req.AddPartition(topic, partition)

	resp, err := broker.FetchOffset(req)
	if err != nil {
		if retries <= 0 {
			return 0, "", err
		}
		om.releaseCoordinator(broker)
		return om.fetchInitialOffset(topic, partition, retries-1)
	}

	block := resp.GetBlock(topic, partition)
	if block == nil {
		return 0, "", ErrIncompleteResponse
	}

	switch block.Err {
	case ErrNoError:
		return block.Offset, block.Metadata, nil
	case ErrNotCoordinatorForConsumer:
		if retries <= 0 {
			return 0, "", block.Err
		}
		om.releaseCoordinator(broker)
		return om.fetchInitialOffset(topic, partition, retries-1)
	case ErrOffsetsLoadInProgress:
		if retries <= 0 {
			return 0, "", block.Err
		}
		backoff := om.computeBackoff(retries)
		select {
		case <-om.closing:
			return 0, "", block.Err
		case <-time.After(backoff):
		}
		return om.fetchInitialOffset(topic, partition, retries-1)
	default:
		return 0, "", block.Err
	}
}

func (om *offsetManager) coordinator() (*Broker, error) {
	om.brokerLock.RLock()
	broker := om.broker
	om.brokerLock.RUnlock()

	if broker != nil {
		return broker, nil
	}

	om.brokerLock.Lock()
	defer om.brokerLock.Unlock()

	if broker := om.broker; broker != nil {
		return broker, nil
	}

	if err := om.client.RefreshCoordinator(om.group); err != nil {
		return nil, err
	}

	broker, err := om.client.Coordinator(om.group)
	if err != nil {
		return nil, err
	}

	om.broker = broker
	return broker, nil
}

func (om *offsetManager) releaseCoordinator(b *Broker) {
	om.brokerLock.Lock()
	if om.broker == b {
		om.broker = nil
	}
	om.brokerLock.Unlock()
}

func (om *offsetManager) mainLoop() {
	defer om.ticker.Stop()
	defer close(om.closed)

	for {
		select {
		case <-om.ticker.C:
			om.Commit()
		case <-om.closing:
			return
		}
	}
}

func (om *offsetManager) Commit() {
	om.flushToBroker()
	om.releasePOMs(false)
}

func (om *offsetManager) flushToBroker() {
	req := om.constructRequest()
	if req == nil {
		return
	}

	broker, err := om.coordinator()
	if err != nil {
		om.handleError(err)
		return
	}

	resp, err := broker.CommitOffset(req)
	if err != nil {
		om.handleError(err)
		om.releaseCoordinator(broker)
		_ = broker.Close()
		return
	}

	om.handleResponse(broker, req, resp)
}

func (om *offsetManager) constructRequest() *OffsetCommitRequest {
	var r *OffsetCommitRequest
	var perPartitionTimestamp int64
	if om.conf.Consumer.Offsets.Retention == 0 {
		perPartitionTimestamp = ReceiveTime
		r = &OffsetCommitRequest{
			Version:                 1,
			ConsumerGroup:           om.group,
			ConsumerID:              om.memberID,
			ConsumerGroupGeneration: om.generation,
		}
	} else {
		r = &OffsetCommitRequest{
			Version:                 2,
			RetentionTime:           int64(om.conf.Consumer.Offsets.Retention / time.Millisecond),
			ConsumerGroup:           om.group,
			ConsumerID:              om.memberID,
			ConsumerGroupGeneration: om.generation,
		}
	}

	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.lock.Lock()
			if pom.dirty {
				r.AddBlock(pom.topic, pom.partition, pom.offset, perPartitionTimestamp, pom.metadata)
			}
			pom.lock.Unlock()
		}
	}

	if len(r.blocks) > 0 {
		return r
	}

	return nil
}

func (om *offsetManager) handleResponse(broker *Broker, req *OffsetCommitRequest, resp *OffsetCommitResponse) {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			if req.blocks[pom.topic] == nil || req.blocks[pom.topic][pom.partition] == nil {
				continue
			}

			var err KError
			var ok bool

			if resp.Errors[pom.topic] == nil {
				pom.handleError(ErrIncompleteResponse)
				continue
			}
			if err, ok = resp.Errors[pom.topic][pom.partition]; !ok {
				pom.handleError(ErrIncompleteResponse)
				continue
			}

			switch err {
			case ErrNoError:
				block := req.blocks[pom.topic][pom.partition]
				pom.updateCommitted(block.offset, block.metadata)
			case ErrNotLeaderForPartition, ErrLeaderNotAvailable,
				ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
				// not a critical error, we just need to redispatch
				om.releaseCoordinator(broker)
			case ErrOffsetMetadataTooLarge, ErrInvalidCommitOffsetSize:
				// nothing we can do about this, just tell the user and carry on
				pom.handleError(err)
			case ErrOffsetsLoadInProgress:
				// nothing wrong but we didn't commit, we'll get it next time round
			case ErrUnknownTopicOrPartition:
				// let the user know *and* try redispatching - if topic-auto-create is
				// enabled, redispatching should trigger a metadata req and create the
				// topic; if not then re-dispatching won't help, but we've let the user
				// know and it shouldn't hurt either (see https://github.com/Shopify/sarama/issues/706)
				fallthrough
			default:
				// dunno, tell the user and try redispatching
				pom.handleError(err)
				om.releaseCoordinator(broker)
			}
		}
	}
}

func (om *offsetManager) handleError(err error) {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.handleError(err)
		}
	}
}

func (om *offsetManager) asyncClosePOMs() {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.AsyncClose()
		}
	}
}

// Releases/removes closed POMs once they are clean (or when forced)
func (om *offsetManager) releasePOMs(force bool) (remaining int) {
	om.pomsLock.Lock()
	defer om.pomsLock.Unlock()

	for topic, topicManagers := range om.poms {
		for partition, pom := range topicManagers {
			pom.lock.Lock()
			releaseDue := pom.done && (force || !pom.dirty)
			pom.lock.Unlock()

			if releaseDue {
				pom.release()

				delete(om.poms[topic], partition)
				if len(om.poms[topic]) == 0 {
					delete(om.poms, topic)
				}
			}
		}
		remaining += len(om.poms[topic])
	}
	return
}

func (om *offsetManager) findPOM(topic string, partition int32) *partitionOffsetManager {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	if partitions, ok := om.poms[topic]; ok {
		if pom, ok := partitions[partition]; ok {
			return pom
		}
	}
	return nil
}

// Partition Offset Manager

// PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
// on a partition offset manager to avoid leaks; it will not be garbage-collected automatically when it
// passes out of scope.
type PartitionOffsetManager interface {
	// NextOffset returns the next offset that should be consumed for the managed
	// partition, accompanied by metadata which can be used to reconstruct the state
	// of the partition consumer when it resumes. NextOffset() will return
	// `config.Consumer.Offsets.Initial` and an empty metadata string if no offset
	// was committed for this partition yet.
	NextOffset() (int64, string)

	// MarkOffset marks the provided offset, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// To follow upstream conventions, you are expected to mark the offset of the
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
	// you should typically add one to the offset of the last consumed message.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
	MarkOffset(offset int64, metadata string)

	// ResetOffset resets to the provided offset, alongside a metadata string that
	// represents the state of the partition consumer at that point in time. Reset
	// acts as a counterpart to MarkOffset, the difference being that it allows
	// resetting the offset to an earlier or smaller value, whereas MarkOffset only
	// allows incrementing the offset. See MarkOffset for more details.
	ResetOffset(offset int64, metadata string)

	// Errors returns a read channel of errors that occur during offset management, if
	// enabled. By default, errors are logged and not returned over this channel. If
	// you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will
	// return immediately, after which you should wait until the 'errors' channel has
	// been drained and closed. It is required to call this function, or Close, before
	// a PartitionOffsetManager object passes out of scope, as it will otherwise leak
	// memory. You must call this before calling Close on the underlying client.
	AsyncClose()

	// Close stops the PartitionOffsetManager from managing offsets. It is required to
	// call this function (or AsyncClose) before a PartitionOffsetManager object
	// passes out of scope, as it will otherwise leak memory. You must call this
	// before calling Close on the underlying client.
	Close() error
}
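
// A rough consumption sketch for one partition, assuming a Consumer named
// consumer built from the same client, a hypothetical process handler, and a
// pom obtained from OffsetManager.ManagePartition; the +1 follows the
// "mark the next offset to read" convention documented on MarkOffset.
//
//	offset, _ := pom.NextOffset() // resume point, or Consumer.Offsets.Initial on the first run
//	pc, _ := consumer.ConsumePartition("my-topic", 0, offset)
//	for msg := range pc.Messages() {
//		process(msg)
//		pom.MarkOffset(msg.Offset+1, "")
//	}
//	pc.AsyncClose()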

type partitionOffsetManager struct {
	parent    *offsetManager
	topic     string
	partition int32

	lock     sync.Mutex
	offset   int64
	metadata string
	dirty    bool
	done     bool

	releaseOnce sync.Once
	errors      chan *ConsumerError
}

func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) {
	offset, metadata, err := om.fetchInitialOffset(topic, partition, om.conf.Metadata.Retry.Max)
	if err != nil {
		return nil, err
	}

	return &partitionOffsetManager{
		parent:    om,
		topic:     topic,
		partition: partition,
		errors:    make(chan *ConsumerError, om.conf.ChannelBufferSize),
		offset:    offset,
		metadata:  metadata,
	}, nil
}

func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
	return pom.errors
}

func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if offset > pom.offset {
		pom.offset = offset
		pom.metadata = metadata
		pom.dirty = true
	}
}

func (pom *partitionOffsetManager) ResetOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if offset <= pom.offset {
		pom.offset = offset
		pom.metadata = metadata
		pom.dirty = true
	}
}

func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset == offset && pom.metadata == metadata {
		pom.dirty = false
	}
}

func (pom *partitionOffsetManager) NextOffset() (int64, string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset >= 0 {
		return pom.offset, pom.metadata
	}

	return pom.parent.conf.Consumer.Offsets.Initial, ""
}

func (pom *partitionOffsetManager) AsyncClose() {
	pom.lock.Lock()
	pom.done = true
	pom.lock.Unlock()
}

func (pom *partitionOffsetManager) Close() error {
	pom.AsyncClose()

	var errors ConsumerErrors
	for err := range pom.errors {
		errors = append(errors, err)
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

func (pom *partitionOffsetManager) handleError(err error) {
	cErr := &ConsumerError{
		Topic:     pom.topic,
		Partition: pom.partition,
		Err:       err,
	}

	if pom.parent.conf.Consumer.Return.Errors {
		pom.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

func (pom *partitionOffsetManager) release() {
	pom.releaseOnce.Do(func() {
		close(pom.errors)
	})
}