package sarama

import (
	"sync"
	"time"
)

// Offset Manager

// OffsetManager uses Kafka to store and fetch consumed partition offsets.
type OffsetManager interface {
	// ManagePartition creates a PartitionOffsetManager on the given topic/partition.
	// It will return an error if this OffsetManager is already managing the given
	// topic/partition.
	ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)

	// Close stops the OffsetManager from managing offsets. It is required to call
	// this function before an OffsetManager object passes out of scope, as it
	// will otherwise leak memory. You must call this after all the
	// PartitionOffsetManagers are closed.
	Close() error
}

type offsetManager struct {
	client Client
	conf   *Config
	group  string
	ticker *time.Ticker

	memberID   string
	generation int32

	broker     *Broker
	brokerLock sync.RWMutex

	poms     map[string]map[int32]*partitionOffsetManager
	pomsLock sync.RWMutex

	closeOnce sync.Once
	closing   chan none
	closed    chan none
}

// NewOffsetManagerFromClient creates a new OffsetManager from the given client.
// It is still necessary to call Close() on the underlying client when finished with the offset manager.
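//
// A minimal usage sketch (not part of the original file; the broker address and
// the "my-group" name are placeholders, and error handling is elided for brevity):
//
//	config := NewConfig()
//	client, err := NewClient([]string{"localhost:9092"}, config)
//	if err != nil {
//		panic(err)
//	}
//	defer client.Close() // close the client last
//
//	om, err := NewOffsetManagerFromClient("my-group", client)
//	if err != nil {
//		panic(err)
//	}
//	defer om.Close() // runs before client.Close, after all POMs are closed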
func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) {
	return newOffsetManagerFromClient(group, "", GroupGenerationUndefined, client)
}

func newOffsetManagerFromClient(group, memberID string, generation int32, client Client) (*offsetManager, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	conf := client.Config()
	om := &offsetManager{
		client: client,
		conf:   conf,
		group:  group,
		ticker: time.NewTicker(conf.Consumer.Offsets.CommitInterval),
		poms:   make(map[string]map[int32]*partitionOffsetManager),

		memberID:   memberID,
		generation: generation,

		closing: make(chan none),
		closed:  make(chan none),
	}
	go withRecover(om.mainLoop)

	return om, nil
}

func (om *offsetManager) ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) {
	pom, err := om.newPartitionOffsetManager(topic, partition)
	if err != nil {
		return nil, err
	}

	om.pomsLock.Lock()
	defer om.pomsLock.Unlock()

	topicManagers := om.poms[topic]
	if topicManagers == nil {
		topicManagers = make(map[int32]*partitionOffsetManager)
		om.poms[topic] = topicManagers
	}

	if topicManagers[partition] != nil {
		return nil, ConfigurationError("That topic/partition is already being managed")
	}

	topicManagers[partition] = pom
	return pom, nil
}

func (om *offsetManager) Close() error {
	om.closeOnce.Do(func() {
		// exit the mainLoop
		close(om.closing)
		<-om.closed

		// mark all POMs as closed
		om.asyncClosePOMs()

		// flush one last time
		for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ {
			om.flushToBroker()
			if om.releasePOMs(false) == 0 {
				break
			}
		}

		om.releasePOMs(true)
		om.brokerLock.Lock()
		om.broker = nil
		om.brokerLock.Unlock()
	})
	return nil
}

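// computeBackoff returns the delay to wait before retrying an offset fetch:
// the user-supplied Metadata.Retry.BackoffFunc when one is configured,
// otherwise the static Metadata.Retry.Backoff.
//
// A hedged configuration sketch (not part of the original file; the growing
// delay below is purely illustrative):
//
//	config := NewConfig()
//	config.Metadata.Retry.BackoffFunc = func(retries, maxRetries int) time.Duration {
//		// retries counts down from Metadata.Retry.Max in fetchInitialOffset
//		return time.Duration(100*(maxRetries-retries+1)) * time.Millisecond
//	}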
func (om *offsetManager) computeBackoff(retries int) time.Duration {
	if om.conf.Metadata.Retry.BackoffFunc != nil {
		return om.conf.Metadata.Retry.BackoffFunc(retries, om.conf.Metadata.Retry.Max)
	}
	return om.conf.Metadata.Retry.Backoff
}

func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retries int) (int64, string, error) {
	broker, err := om.coordinator()
	if err != nil {
		if retries <= 0 {
			return 0, "", err
		}
		return om.fetchInitialOffset(topic, partition, retries-1)
	}

	req := new(OffsetFetchRequest)
	req.Version = 1
	req.ConsumerGroup = om.group
	req.AddPartition(topic, partition)

	resp, err := broker.FetchOffset(req)
	if err != nil {
		if retries <= 0 {
			return 0, "", err
		}
		om.releaseCoordinator(broker)
		return om.fetchInitialOffset(topic, partition, retries-1)
	}

	block := resp.GetBlock(topic, partition)
	if block == nil {
		return 0, "", ErrIncompleteResponse
	}

	switch block.Err {
	case ErrNoError:
		return block.Offset, block.Metadata, nil
	case ErrNotCoordinatorForConsumer:
		if retries <= 0 {
			return 0, "", block.Err
		}
		om.releaseCoordinator(broker)
		return om.fetchInitialOffset(topic, partition, retries-1)
	case ErrOffsetsLoadInProgress:
		if retries <= 0 {
			return 0, "", block.Err
		}
		backoff := om.computeBackoff(retries)
		select {
		case <-om.closing:
			return 0, "", block.Err
		case <-time.After(backoff):
		}
		return om.fetchInitialOffset(topic, partition, retries-1)
	default:
		return 0, "", block.Err
	}
}

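// coordinator returns the group's coordinating broker, looking it up (and
// caching it) via the client on first use. releaseCoordinator drops the cached
// broker so the next call performs a fresh lookup.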
func (om *offsetManager) coordinator() (*Broker, error) {
	om.brokerLock.RLock()
	broker := om.broker
	om.brokerLock.RUnlock()

	if broker != nil {
		return broker, nil
	}

	om.brokerLock.Lock()
	defer om.brokerLock.Unlock()

	if broker := om.broker; broker != nil {
		return broker, nil
	}

	if err := om.client.RefreshCoordinator(om.group); err != nil {
		return nil, err
	}

	broker, err := om.client.Coordinator(om.group)
	if err != nil {
		return nil, err
	}

	om.broker = broker
	return broker, nil
}

func (om *offsetManager) releaseCoordinator(b *Broker) {
	om.brokerLock.Lock()
	if om.broker == b {
		om.broker = nil
	}
	om.brokerLock.Unlock()
}

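// mainLoop flushes dirty offsets to the coordinator on every tick of the
// commit ticker and prunes released POMs, until Close is called.
//
// The cadence is set by Consumer.Offsets.CommitInterval; a hedged
// configuration sketch (values are illustrative, not defaults):
//
//	config := NewConfig()
//	config.Consumer.Offsets.CommitInterval = 5 * time.Second // flush dirty offsets every 5s
//	config.Consumer.Offsets.Retention = 24 * time.Hour       // non-zero selects the v2 commit request (see constructRequest)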
func (om *offsetManager) mainLoop() {
	defer om.ticker.Stop()
	defer close(om.closed)

	for {
		select {
		case <-om.ticker.C:
			om.flushToBroker()
			om.releasePOMs(false)
		case <-om.closing:
			return
		}
	}
}

func (om *offsetManager) flushToBroker() {
	req := om.constructRequest()
	if req == nil {
		return
	}

	broker, err := om.coordinator()
	if err != nil {
		om.handleError(err)
		return
	}

	resp, err := broker.CommitOffset(req)
	if err != nil {
		om.handleError(err)
		om.releaseCoordinator(broker)
		_ = broker.Close()
		return
	}

	om.handleResponse(broker, req, resp)
}

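// constructRequest gathers every dirty POM into a single OffsetCommitRequest,
// or returns nil when there is nothing to commit. With Consumer.Offsets.Retention
// left at zero it issues a v1 request stamped with ReceiveTime; otherwise it
// issues a v2 request carrying the configured retention time.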
func (om *offsetManager) constructRequest() *OffsetCommitRequest {
	var r *OffsetCommitRequest
	var perPartitionTimestamp int64
	if om.conf.Consumer.Offsets.Retention == 0 {
		perPartitionTimestamp = ReceiveTime
		r = &OffsetCommitRequest{
			Version:                 1,
			ConsumerGroup:           om.group,
			ConsumerID:              om.memberID,
			ConsumerGroupGeneration: om.generation,
		}
	} else {
		r = &OffsetCommitRequest{
			Version:                 2,
			RetentionTime:           int64(om.conf.Consumer.Offsets.Retention / time.Millisecond),
			ConsumerGroup:           om.group,
			ConsumerID:              om.memberID,
			ConsumerGroupGeneration: om.generation,
		}
	}

	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.lock.Lock()
			if pom.dirty {
				r.AddBlock(pom.topic, pom.partition, pom.offset, perPartitionTimestamp, pom.metadata)
			}
			pom.lock.Unlock()
		}
	}

	if len(r.blocks) > 0 {
		return r
	}

	return nil
}

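// handleResponse relays the per-partition results of a commit back to the POMs
// that had blocks in the request: successful partitions are marked clean,
// coordinator/leadership errors drop the cached coordinator so the next flush
// re-resolves it, ErrOffsetsLoadInProgress is left to retry on the next tick,
// and other errors are reported through the POM's error handling.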
func (om *offsetManager) handleResponse(broker *Broker, req *OffsetCommitRequest, resp *OffsetCommitResponse) {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			if req.blocks[pom.topic] == nil || req.blocks[pom.topic][pom.partition] == nil {
				continue
			}

			var err KError
			var ok bool

			if resp.Errors[pom.topic] == nil {
				pom.handleError(ErrIncompleteResponse)
				continue
			}
			if err, ok = resp.Errors[pom.topic][pom.partition]; !ok {
				pom.handleError(ErrIncompleteResponse)
				continue
			}

			switch err {
			case ErrNoError:
				block := req.blocks[pom.topic][pom.partition]
				pom.updateCommitted(block.offset, block.metadata)
			case ErrNotLeaderForPartition, ErrLeaderNotAvailable,
				ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
				// not a critical error, we just need to redispatch
				om.releaseCoordinator(broker)
			case ErrOffsetMetadataTooLarge, ErrInvalidCommitOffsetSize:
				// nothing we can do about this, just tell the user and carry on
				pom.handleError(err)
			case ErrOffsetsLoadInProgress:
				// nothing wrong but we didn't commit, we'll get it next time round
			case ErrUnknownTopicOrPartition:
				// let the user know *and* try redispatching - if topic-auto-create is
				// enabled, redispatching should trigger a metadata req and create the
				// topic; if not then re-dispatching won't help, but we've let the user
				// know and it shouldn't hurt either (see https://github.com/Shopify/sarama/issues/706)
				fallthrough
			default:
				// dunno, tell the user and try redispatching
				pom.handleError(err)
				om.releaseCoordinator(broker)
			}
		}
	}
}

func (om *offsetManager) handleError(err error) {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.handleError(err)
		}
	}
}

func (om *offsetManager) asyncClosePOMs() {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.AsyncClose()
		}
	}
}

// releasePOMs releases/removes closed POMs once they are clean (or when forced).
func (om *offsetManager) releasePOMs(force bool) (remaining int) {
	om.pomsLock.Lock()
	defer om.pomsLock.Unlock()

	for topic, topicManagers := range om.poms {
		for partition, pom := range topicManagers {
			pom.lock.Lock()
			releaseDue := pom.done && (force || !pom.dirty)
			pom.lock.Unlock()

			if releaseDue {
				pom.release()

				delete(om.poms[topic], partition)
				if len(om.poms[topic]) == 0 {
					delete(om.poms, topic)
				}
			}
		}
		remaining += len(om.poms[topic])
	}
	return
}

func (om *offsetManager) findPOM(topic string, partition int32) *partitionOffsetManager {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	if partitions, ok := om.poms[topic]; ok {
		if pom, ok := partitions[partition]; ok {
			return pom
		}
	}
	return nil
}

// Partition Offset Manager

// PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
// on a partition offset manager to avoid leaks; it will not be garbage-collected automatically when it
// passes out of scope.
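//
// A minimal sketch of the mark/commit cycle (not part of the original file;
// it assumes an OffsetManager `om` as created above, "my-topic"/partition 0
// are placeholders, and `msg` stands for a message obtained from a partition
// consumer started at the returned offset):
//
//	pom, err := om.ManagePartition("my-topic", 0)
//	if err != nil {
//		panic(err)
//	}
//	defer pom.Close()
//
//	offset, metadata := pom.NextOffset() // where to resume consuming
//	_ = metadata                         // metadata can seed consumer state
//
//	// ... consume `msg` from a PartitionConsumer started at `offset` ...
//
//	pom.MarkOffset(msg.Offset+1, "") // mark the NEXT offset to read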
type PartitionOffsetManager interface {
	// NextOffset returns the next offset that should be consumed for the managed
	// partition, accompanied by metadata which can be used to reconstruct the state
	// of the partition consumer when it resumes. NextOffset() will return
	// `config.Consumer.Offsets.Initial` and an empty metadata string if no offset
	// was committed for this partition yet.
	NextOffset() (int64, string)

	// MarkOffset marks the provided offset, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// To follow upstream conventions, you are expected to mark the offset of the
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
	// you should typically add one to the offset of the last consumed message.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
	MarkOffset(offset int64, metadata string)

	// ResetOffset resets to the provided offset, alongside a metadata string that
	// represents the state of the partition consumer at that point in time. Reset
	// acts as a counterpart to MarkOffset, the difference being that it allows
	// resetting an offset to an earlier or smaller value, whereas MarkOffset only
	// allows incrementing the offset. See MarkOffset for more details.
	ResetOffset(offset int64, metadata string)

	// Errors returns a read channel of errors that occur during offset management, if
	// enabled. By default, errors are logged and not returned over this channel. If
	// you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will
	// return immediately, after which you should wait until the 'errors' channel has
	// been drained and closed. It is required to call this function (or Close) before
	// a PartitionOffsetManager object passes out of scope, as it will otherwise leak
	// memory. You must call this before calling Close on the underlying client.
	AsyncClose()

	// Close stops the PartitionOffsetManager from managing offsets. It is required to
	// call this function (or AsyncClose) before a PartitionOffsetManager object
	// passes out of scope, as it will otherwise leak memory. You must call this
	// before calling Close on the underlying client.
	Close() error
}

type partitionOffsetManager struct {
	parent    *offsetManager
	topic     string
	partition int32

	lock     sync.Mutex
	offset   int64
	metadata string
	dirty    bool
	done     bool

	releaseOnce sync.Once
	errors      chan *ConsumerError
}

func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) {
	offset, metadata, err := om.fetchInitialOffset(topic, partition, om.conf.Metadata.Retry.Max)
	if err != nil {
		return nil, err
	}

	return &partitionOffsetManager{
		parent:    om,
		topic:     topic,
		partition: partition,
		errors:    make(chan *ConsumerError, om.conf.ChannelBufferSize),
		offset:    offset,
		metadata:  metadata,
	}, nil
}

func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
	return pom.errors
}

func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if offset > pom.offset {
		pom.offset = offset
		pom.metadata = metadata
		pom.dirty = true
	}
}

func (pom *partitionOffsetManager) ResetOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if offset <= pom.offset {
		pom.offset = offset
		pom.metadata = metadata
		pom.dirty = true
	}
}

func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset == offset && pom.metadata == metadata {
		pom.dirty = false
	}
}

func (pom *partitionOffsetManager) NextOffset() (int64, string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset >= 0 {
		return pom.offset, pom.metadata
	}

	return pom.parent.conf.Consumer.Offsets.Initial, ""
}

func (pom *partitionOffsetManager) AsyncClose() {
	pom.lock.Lock()
	pom.done = true
	pom.lock.Unlock()
}

func (pom *partitionOffsetManager) Close() error {
	pom.AsyncClose()

	var errors ConsumerErrors
	for err := range pom.errors {
		errors = append(errors, err)
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

func (pom *partitionOffsetManager) handleError(err error) {
	cErr := &ConsumerError{
		Topic:     pom.topic,
		Partition: pom.partition,
		Err:       err,
	}

	if pom.parent.conf.Consumer.Return.Errors {
		pom.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

func (pom *partitionOffsetManager) release() {
	pom.releaseOnce.Do(func() {
		close(pom.errors)
	})
}