package sarama

import (
	"sync"
	"time"
)

// Offset Manager

// OffsetManager uses Kafka to store and fetch consumed partition offsets.
type OffsetManager interface {
	// ManagePartition creates a PartitionOffsetManager on the given topic/partition.
	// It will return an error if this OffsetManager is already managing the given
	// topic/partition.
	ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)

	// Close stops the OffsetManager from managing offsets. It is required to call
	// this function before an OffsetManager object passes out of scope, as it
	// will otherwise leak memory. You must call this after all the
	// PartitionOffsetManagers are closed.
	Close() error
}

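// The following is an illustrative usage sketch, not part of the original file:
// it shows how an OffsetManager is typically created from a Client, how a single
// topic/partition is managed, and the documented shutdown order (close every
// PartitionOffsetManager first, then the OffsetManager, then the Client). The
// broker address, group name, topic and partition below are assumptions.
func exampleOffsetManagerUsage() error {
	client, err := NewClient([]string{"localhost:9092"}, NewConfig())
	if err != nil {
		return err
	}
	defer client.Close()

	om, err := NewOffsetManagerFromClient("example-group", client)
	if err != nil {
		return err
	}

	pom, err := om.ManagePartition("example-topic", 0)
	if err != nil {
		return err
	}

	// NextOffset falls back to Consumer.Offsets.Initial (and empty metadata)
	// if nothing has been committed for this partition yet.
	offset, metadata := pom.NextOffset()
	Logger.Printf("resuming at offset %d with metadata %q", offset, metadata)

	// ... consume; after processing a message, mark the offset of the *next*
	// message to read, e.g. pom.MarkOffset(msg.Offset+1, "").

	// Shutdown order: PartitionOffsetManagers first, then the OffsetManager.
	if err := pom.Close(); err != nil {
		return err
	}
	return om.Close()
}
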
type offsetManager struct {
	client Client
	conf   *Config
	group  string
	ticker *time.Ticker

	memberID   string
	generation int32

	broker     *Broker
	brokerLock sync.RWMutex

	poms     map[string]map[int32]*partitionOffsetManager
	pomsLock sync.RWMutex

	closeOnce sync.Once
	closing   chan none
	closed    chan none
}

// NewOffsetManagerFromClient creates a new OffsetManager from the given client.
// It is still necessary to call Close() on the underlying client when finished with the partition manager.
func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) {
	return newOffsetManagerFromClient(group, "", GroupGenerationUndefined, client)
}

func newOffsetManagerFromClient(group, memberID string, generation int32, client Client) (*offsetManager, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	conf := client.Config()
	om := &offsetManager{
		client: client,
		conf:   conf,
		group:  group,
		ticker: time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval),
		poms:   make(map[string]map[int32]*partitionOffsetManager),

		memberID:   memberID,
		generation: generation,

		closing: make(chan none),
		closed:  make(chan none),
	}
	go withRecover(om.mainLoop)

	return om, nil
}

func (om *offsetManager) ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) {
	pom, err := om.newPartitionOffsetManager(topic, partition)
	if err != nil {
		return nil, err
	}

	om.pomsLock.Lock()
	defer om.pomsLock.Unlock()

	topicManagers := om.poms[topic]
	if topicManagers == nil {
		topicManagers = make(map[int32]*partitionOffsetManager)
		om.poms[topic] = topicManagers
	}

	if topicManagers[partition] != nil {
		return nil, ConfigurationError("That topic/partition is already being managed")
	}

	topicManagers[partition] = pom
	return pom, nil
}

func (om *offsetManager) Close() error {
	om.closeOnce.Do(func() {
		// exit the mainLoop
		close(om.closing)
		<-om.closed

		// mark all POMs as closed
		om.asyncClosePOMs()

		// flush one last time
		for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ {
			om.flushToBroker()
			if om.releasePOMs(false) == 0 {
				break
			}
		}

		om.releasePOMs(true)
		om.brokerLock.Lock()
		om.broker = nil
		om.brokerLock.Unlock()
	})
	return nil
}

func (om *offsetManager) computeBackoff(retries int) time.Duration {
	if om.conf.Metadata.Retry.BackoffFunc != nil {
		return om.conf.Metadata.Retry.BackoffFunc(retries, om.conf.Metadata.Retry.Max)
	}
	return om.conf.Metadata.Retry.Backoff
}

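// A hedged configuration sketch, not part of the original file: computeBackoff
// above returns the flat Metadata.Retry.Backoff unless Metadata.Retry.BackoffFunc
// is set, in which case that function decides the delay. The linear step used
// here is purely illustrative.
func exampleRetryBackoffConfig() *Config {
	conf := NewConfig()
	conf.Metadata.Retry.Max = 5
	conf.Metadata.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
		// Wait longer as fewer retries remain (retries counts down in this file).
		return time.Duration(maxRetries-retries+1) * 250 * time.Millisecond
	}
	return conf
}
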
func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retries int) (int64, string, error) {
	broker, err := om.coordinator()
	if err != nil {
		if retries <= 0 {
			return 0, "", err
		}
		return om.fetchInitialOffset(topic, partition, retries-1)
	}

	req := new(OffsetFetchRequest)
	req.Version = 1
	req.ConsumerGroup = om.group
	req.AddPartition(topic, partition)

	resp, err := broker.FetchOffset(req)
	if err != nil {
		if retries <= 0 {
			return 0, "", err
		}
		om.releaseCoordinator(broker)
		return om.fetchInitialOffset(topic, partition, retries-1)
	}

	block := resp.GetBlock(topic, partition)
	if block == nil {
		return 0, "", ErrIncompleteResponse
	}

	switch block.Err {
	case ErrNoError:
		return block.Offset, block.Metadata, nil
	case ErrNotCoordinatorForConsumer:
		if retries <= 0 {
			return 0, "", block.Err
		}
		om.releaseCoordinator(broker)
		return om.fetchInitialOffset(topic, partition, retries-1)
	case ErrOffsetsLoadInProgress:
		if retries <= 0 {
			return 0, "", block.Err
		}
		backoff := om.computeBackoff(retries)
		select {
		case <-om.closing:
			return 0, "", block.Err
		case <-time.After(backoff):
		}
		return om.fetchInitialOffset(topic, partition, retries-1)
	default:
		return 0, "", block.Err
	}
}

func (om *offsetManager) coordinator() (*Broker, error) {
	om.brokerLock.RLock()
	broker := om.broker
	om.brokerLock.RUnlock()

	if broker != nil {
		return broker, nil
	}

	om.brokerLock.Lock()
	defer om.brokerLock.Unlock()

	if broker := om.broker; broker != nil {
		return broker, nil
	}

	if err := om.client.RefreshCoordinator(om.group); err != nil {
		return nil, err
	}

	broker, err := om.client.Coordinator(om.group)
	if err != nil {
		return nil, err
	}

	om.broker = broker
	return broker, nil
}

func (om *offsetManager) releaseCoordinator(b *Broker) {
	om.brokerLock.Lock()
	if om.broker == b {
		om.broker = nil
	}
	om.brokerLock.Unlock()
}

func (om *offsetManager) mainLoop() {
	defer om.ticker.Stop()
	defer close(om.closed)

	for {
		select {
		case <-om.ticker.C:
			om.flushToBroker()
			om.releasePOMs(false)
		case <-om.closing:
			return
		}
	}
}

// flushToBroker is a no-op if automatic offset committing is disabled
func (om *offsetManager) flushToBroker() {
	if !om.conf.Consumer.Offsets.AutoCommit.Enable {
		return
	}

	req := om.constructRequest()
	if req == nil {
		return
	}

	broker, err := om.coordinator()
	if err != nil {
		om.handleError(err)
		return
	}

	resp, err := broker.CommitOffset(req)
	if err != nil {
		om.handleError(err)
		om.releaseCoordinator(broker)
		_ = broker.Close()
		return
	}

	om.handleResponse(broker, req, resp)
}

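// A hedged configuration sketch, not part of the original file: the settings
// below control the ticker that drives mainLoop and flushToBroker. The interval
// value is an illustrative assumption; enabling Return.Errors routes commit
// failures to the PartitionOffsetManager's Errors() channel instead of the logger.
func exampleAutoCommitConfig() *Config {
	conf := NewConfig()
	conf.Consumer.Offsets.AutoCommit.Enable = true              // commit marked offsets in the background
	conf.Consumer.Offsets.AutoCommit.Interval = 5 * time.Second // how often flushToBroker runs
	conf.Consumer.Return.Errors = true                          // surface commit errors on Errors()
	return conf
}
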
func (om *offsetManager) constructRequest() *OffsetCommitRequest {
	var r *OffsetCommitRequest
	var perPartitionTimestamp int64
	if om.conf.Consumer.Offsets.Retention == 0 {
		perPartitionTimestamp = ReceiveTime
		r = &OffsetCommitRequest{
			Version:                 1,
			ConsumerGroup:           om.group,
			ConsumerID:              om.memberID,
			ConsumerGroupGeneration: om.generation,
		}
	} else {
		r = &OffsetCommitRequest{
			Version:                 2,
			RetentionTime:           int64(om.conf.Consumer.Offsets.Retention / time.Millisecond),
			ConsumerGroup:           om.group,
			ConsumerID:              om.memberID,
			ConsumerGroupGeneration: om.generation,
		}
	}

	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.lock.Lock()
			if pom.dirty {
				r.AddBlock(pom.topic, pom.partition, pom.offset, perPartitionTimestamp, pom.metadata)
			}
			pom.lock.Unlock()
		}
	}

	if len(r.blocks) > 0 {
		return r
	}

	return nil
}

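// A hedged configuration sketch, not part of the original file: constructRequest
// above issues a v1 OffsetCommitRequest (broker-default retention) when
// Consumer.Offsets.Retention is zero, and a v2 request carrying RetentionTime
// otherwise. The 24-hour value is purely illustrative.
func exampleRetentionConfig() *Config {
	conf := NewConfig()
	conf.Consumer.Offsets.Retention = 24 * time.Hour // committed offsets expire after ~24h
	return conf
}
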
func (om *offsetManager) handleResponse(broker *Broker, req *OffsetCommitRequest, resp *OffsetCommitResponse) {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			if req.blocks[pom.topic] == nil || req.blocks[pom.topic][pom.partition] == nil {
				continue
			}

			var err KError
			var ok bool

			if resp.Errors[pom.topic] == nil {
				pom.handleError(ErrIncompleteResponse)
				continue
			}
			if err, ok = resp.Errors[pom.topic][pom.partition]; !ok {
				pom.handleError(ErrIncompleteResponse)
				continue
			}

			switch err {
			case ErrNoError:
				block := req.blocks[pom.topic][pom.partition]
				pom.updateCommitted(block.offset, block.metadata)
			case ErrNotLeaderForPartition, ErrLeaderNotAvailable,
				ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
				// not a critical error, we just need to redispatch
				om.releaseCoordinator(broker)
			case ErrOffsetMetadataTooLarge, ErrInvalidCommitOffsetSize:
				// nothing we can do about this, just tell the user and carry on
				pom.handleError(err)
			case ErrOffsetsLoadInProgress:
				// nothing wrong but we didn't commit, we'll get it next time round
			case ErrUnknownTopicOrPartition:
				// let the user know *and* try redispatching - if topic-auto-create is
				// enabled, redispatching should trigger a metadata req and create the
				// topic; if not then re-dispatching won't help, but we've let the user
				// know and it shouldn't hurt either (see https://github.com/Shopify/sarama/issues/706)
				fallthrough
			default:
				// dunno, tell the user and try redispatching
				pom.handleError(err)
				om.releaseCoordinator(broker)
			}
		}
	}
}

func (om *offsetManager) handleError(err error) {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.handleError(err)
		}
	}
}

func (om *offsetManager) asyncClosePOMs() {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.AsyncClose()
		}
	}
}

// Releases/removes closed POMs once they are clean (or when forced)
func (om *offsetManager) releasePOMs(force bool) (remaining int) {
	om.pomsLock.Lock()
	defer om.pomsLock.Unlock()

	for topic, topicManagers := range om.poms {
		for partition, pom := range topicManagers {
			pom.lock.Lock()
			releaseDue := pom.done && (force || !pom.dirty)
			pom.lock.Unlock()

			if releaseDue {
				pom.release()

				delete(om.poms[topic], partition)
				if len(om.poms[topic]) == 0 {
					delete(om.poms, topic)
				}
			}
		}
		remaining += len(om.poms[topic])
	}
	return
}

func (om *offsetManager) findPOM(topic string, partition int32) *partitionOffsetManager {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	if partitions, ok := om.poms[topic]; ok {
		if pom, ok := partitions[partition]; ok {
			return pom
		}
	}
	return nil
}

// Partition Offset Manager

// PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
// on a partition offset manager to avoid leaks; it will not be garbage-collected automatically when it passes
// out of scope.
type PartitionOffsetManager interface {
	// NextOffset returns the next offset that should be consumed for the managed
	// partition, accompanied by metadata which can be used to reconstruct the state
	// of the partition consumer when it resumes. NextOffset() will return
	// `config.Consumer.Offsets.Initial` and an empty metadata string if no offset
	// was committed for this partition yet.
	NextOffset() (int64, string)

	// MarkOffset marks the provided offset, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// To follow upstream conventions, you are expected to mark the offset of the
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
	// you should typically add one to the offset of the last consumed message.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
	MarkOffset(offset int64, metadata string)

	// ResetOffset resets to the provided offset, alongside a metadata string that
	// represents the state of the partition consumer at that point in time. Reset
	// acts as a counterpart to MarkOffset, the difference being that it allows
	// resetting an offset to an earlier or smaller value, whereas MarkOffset only
	// allows incrementing the offset. See MarkOffset for more details.
	ResetOffset(offset int64, metadata string)

	// Errors returns a read channel of errors that occur during offset management, if
	// enabled. By default, errors are logged and not returned over this channel. If
	// you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will
	// return immediately, after which you should wait until the 'errors' channel has
	// been drained and closed. It is required to call this function (or Close) before
	// a PartitionOffsetManager object passes out of scope, as it will otherwise leak
	// memory. You must call this before calling Close on the underlying client.
	AsyncClose()

	// Close stops the PartitionOffsetManager from managing offsets. It is required to
	// call this function (or AsyncClose) before a PartitionOffsetManager object
	// passes out of scope, as it will otherwise leak memory. You must call this
	// before calling Close on the underlying client.
	Close() error
}

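// The following is an illustrative sketch, not part of the original file: it
// pairs a PartitionConsumer with a PartitionOffsetManager, resuming from the
// stored offset and marking the offset of the *next* message to read, as the
// MarkOffset documentation above recommends. The topic, partition and message
// count are assumptions.
func exampleConsumeAndMark(client Client, pom PartitionOffsetManager) error {
	consumer, err := NewConsumerFromClient(client)
	if err != nil {
		return err
	}
	defer consumer.Close()

	// NextOffset returns Consumer.Offsets.Initial (OffsetNewest or OffsetOldest)
	// if nothing was committed yet; either value is accepted by ConsumePartition.
	offset, _ := pom.NextOffset()
	pc, err := consumer.ConsumePartition("example-topic", 0, offset)
	if err != nil {
		return err
	}
	defer pc.Close()

	for i := 0; i < 10; i++ {
		msg := <-pc.Messages()
		// ... process msg, then mark the offset of the next message to read.
		pom.MarkOffset(msg.Offset+1, "")
	}
	return nil
}
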
type partitionOffsetManager struct {
	parent    *offsetManager
	topic     string
	partition int32

	lock     sync.Mutex
	offset   int64
	metadata string
	dirty    bool
	done     bool

	releaseOnce sync.Once
	errors      chan *ConsumerError
}

func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) {
	offset, metadata, err := om.fetchInitialOffset(topic, partition, om.conf.Metadata.Retry.Max)
	if err != nil {
		return nil, err
	}

	return &partitionOffsetManager{
		parent:    om,
		topic:     topic,
		partition: partition,
		errors:    make(chan *ConsumerError, om.conf.ChannelBufferSize),
		offset:    offset,
		metadata:  metadata,
	}, nil
}

func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
	return pom.errors
}

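// A hedged sketch, not part of the original file: when Consumer.Return.Errors is
// enabled, offset-management errors are delivered on Errors() and must be drained;
// otherwise handleError below can block once the channel buffer fills. The logging
// action here is only an example of what a caller might do.
func exampleDrainErrors(pom PartitionOffsetManager) {
	go func() {
		for err := range pom.Errors() {
			Logger.Println("offset management error:", err)
		}
	}()
}
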
func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if offset > pom.offset {
		pom.offset = offset
		pom.metadata = metadata
		pom.dirty = true
	}
}

func (pom *partitionOffsetManager) ResetOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if offset <= pom.offset {
		pom.offset = offset
		pom.metadata = metadata
		pom.dirty = true
	}
}

func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset == offset && pom.metadata == metadata {
		pom.dirty = false
	}
}

func (pom *partitionOffsetManager) NextOffset() (int64, string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset >= 0 {
		return pom.offset, pom.metadata
	}

	return pom.parent.conf.Consumer.Offsets.Initial, ""
}

func (pom *partitionOffsetManager) AsyncClose() {
	pom.lock.Lock()
	pom.done = true
	pom.lock.Unlock()
}

func (pom *partitionOffsetManager) Close() error {
	pom.AsyncClose()

	var errors ConsumerErrors
	for err := range pom.errors {
		errors = append(errors, err)
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

func (pom *partitionOffsetManager) handleError(err error) {
	cErr := &ConsumerError{
		Topic:     pom.topic,
		Partition: pom.partition,
		Err:       err,
	}

	if pom.parent.conf.Consumer.Return.Errors {
		pom.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

func (pom *partitionOffsetManager) release() {
	pom.releaseOnce.Do(func() {
		close(pom.errors)
	})
}