package sarama

import (
	"sync"
	"time"
)

// Offset Manager

// OffsetManager uses Kafka to store and fetch consumed partition offsets.
type OffsetManager interface {
	// ManagePartition creates a PartitionOffsetManager on the given topic/partition.
	// It will return an error if this OffsetManager is already managing the given
	// topic/partition.
	ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)

	// Close stops the OffsetManager from managing offsets. It is required to call
	// this function before an OffsetManager object passes out of scope, as it
	// will otherwise leak memory. You must call this after all the
	// PartitionOffsetManagers are closed.
	Close() error
}

type offsetManager struct {
	client Client
	conf   *Config
	group  string
	ticker *time.Ticker

	memberID   string
	generation int32

	broker     *Broker
	brokerLock sync.RWMutex

	poms     map[string]map[int32]*partitionOffsetManager
	pomsLock sync.RWMutex

	closeOnce sync.Once
	closing   chan none
	closed    chan none
}

// NewOffsetManagerFromClient creates a new OffsetManager from the given client.
// It is still necessary to call Close() on the underlying client when finished with the offset manager.
func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) {
	return newOffsetManagerFromClient(group, "", GroupGenerationUndefined, client)
}
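
// exampleOffsetManagerLifecycle is a minimal usage sketch, not part of the
// package API: the group name, topic, and partition literals below are
// placeholders. It shows the intended lifecycle: build an OffsetManager from
// an existing Client, manage a partition, mark offsets as messages are
// processed, and close the PartitionOffsetManager before closing the
// OffsetManager (and, last of all, the Client itself).
func exampleOffsetManagerLifecycle(client Client) error {
	om, err := NewOffsetManagerFromClient("example-group", client)
	if err != nil {
		return err
	}

	pom, err := om.ManagePartition("example-topic", 0)
	if err != nil {
		return err
	}

	// NextOffset says where consumption should resume. After processing the
	// message at `offset`, mark the offset of the *next* message to read.
	offset, _ := pom.NextOffset()
	pom.MarkOffset(offset+1, "")

	// Close order matters: POMs first, then the OffsetManager, then the Client.
	if err := pom.Close(); err != nil {
		return err
	}
	return om.Close()
}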

func newOffsetManagerFromClient(group, memberID string, generation int32, client Client) (*offsetManager, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	conf := client.Config()
	om := &offsetManager{
		client: client,
		conf:   conf,
		group:  group,
		ticker: time.NewTicker(conf.Consumer.Offsets.CommitInterval),
		poms:   make(map[string]map[int32]*partitionOffsetManager),

		memberID:   memberID,
		generation: generation,

		closing: make(chan none),
		closed:  make(chan none),
	}
	go withRecover(om.mainLoop)

	return om, nil
}

func (om *offsetManager) ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) {
	pom, err := om.newPartitionOffsetManager(topic, partition)
	if err != nil {
		return nil, err
	}

	om.pomsLock.Lock()
	defer om.pomsLock.Unlock()

	topicManagers := om.poms[topic]
	if topicManagers == nil {
		topicManagers = make(map[int32]*partitionOffsetManager)
		om.poms[topic] = topicManagers
	}

	if topicManagers[partition] != nil {
		return nil, ConfigurationError("That topic/partition is already being managed")
	}

	topicManagers[partition] = pom
	return pom, nil
}

func (om *offsetManager) Close() error {
	om.closeOnce.Do(func() {
		// exit the mainLoop
		close(om.closing)
		<-om.closed

		// mark all POMs as closed
		om.asyncClosePOMs()

		// flush one last time
		for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ {
			om.flushToBroker()
			if om.releasePOMs(false) == 0 {
				break
			}
		}

		om.releasePOMs(true)
		om.brokerLock.Lock()
		om.broker = nil
		om.brokerLock.Unlock()
	})
	return nil
}

func (om *offsetManager) computeBackoff(retries int) time.Duration {
	if om.conf.Metadata.Retry.BackoffFunc != nil {
		return om.conf.Metadata.Retry.BackoffFunc(retries, om.conf.Metadata.Retry.Max)
	} else {
		return om.conf.Metadata.Retry.Backoff
	}
}

func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retries int) (int64, string, error) {
	broker, err := om.coordinator()
	if err != nil {
		if retries <= 0 {
			return 0, "", err
		}
		return om.fetchInitialOffset(topic, partition, retries-1)
	}

	req := new(OffsetFetchRequest)
	req.Version = 1
	req.ConsumerGroup = om.group
	req.AddPartition(topic, partition)

	resp, err := broker.FetchOffset(req)
	if err != nil {
		if retries <= 0 {
			return 0, "", err
		}
		om.releaseCoordinator(broker)
		return om.fetchInitialOffset(topic, partition, retries-1)
	}

	block := resp.GetBlock(topic, partition)
	if block == nil {
		return 0, "", ErrIncompleteResponse
	}

	switch block.Err {
	case ErrNoError:
		return block.Offset, block.Metadata, nil
	case ErrNotCoordinatorForConsumer:
		if retries <= 0 {
			return 0, "", block.Err
		}
		om.releaseCoordinator(broker)
		return om.fetchInitialOffset(topic, partition, retries-1)
	case ErrOffsetsLoadInProgress:
		if retries <= 0 {
			return 0, "", block.Err
		}
		backoff := om.computeBackoff(retries)
		select {
		case <-om.closing:
			return 0, "", block.Err
		case <-time.After(backoff):
		}
		return om.fetchInitialOffset(topic, partition, retries-1)
	default:
		return 0, "", block.Err
	}
}

func (om *offsetManager) coordinator() (*Broker, error) {
	om.brokerLock.RLock()
	broker := om.broker
	om.brokerLock.RUnlock()

	if broker != nil {
		return broker, nil
	}

	om.brokerLock.Lock()
	defer om.brokerLock.Unlock()

	if broker := om.broker; broker != nil {
		return broker, nil
	}

	if err := om.client.RefreshCoordinator(om.group); err != nil {
		return nil, err
	}

	broker, err := om.client.Coordinator(om.group)
	if err != nil {
		return nil, err
	}

	om.broker = broker
	return broker, nil
}

func (om *offsetManager) releaseCoordinator(b *Broker) {
	om.brokerLock.Lock()
	if om.broker == b {
		om.broker = nil
	}
	om.brokerLock.Unlock()
}

func (om *offsetManager) mainLoop() {
	defer om.ticker.Stop()
	defer close(om.closed)

	for {
		select {
		case <-om.ticker.C:
			om.flushToBroker()
			om.releasePOMs(false)
		case <-om.closing:
			return
		}
	}
}

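// flushToBroker commits all dirty offsets to the coordinating broker in a
// single OffsetCommitRequest. On a transport error the cached coordinator is
// dropped and its connection closed, so a fresh one is resolved on the next tick.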
func (om *offsetManager) flushToBroker() {
	req := om.constructRequest()
	if req == nil {
		return
	}

	broker, err := om.coordinator()
	if err != nil {
		om.handleError(err)
		return
	}

	resp, err := broker.CommitOffset(req)
	if err != nil {
		om.handleError(err)
		om.releaseCoordinator(broker)
		_ = broker.Close()
		return
	}

	om.handleResponse(broker, req, resp)
}

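// constructRequest builds an OffsetCommitRequest covering every dirty POM.
// When Consumer.Offsets.Retention is zero it issues a v1 request with a
// per-partition ReceiveTime timestamp; otherwise it issues a v2 request
// carrying the configured retention time. It returns nil if there is nothing
// to commit.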
func (om *offsetManager) constructRequest() *OffsetCommitRequest {
	var r *OffsetCommitRequest
	var perPartitionTimestamp int64
	if om.conf.Consumer.Offsets.Retention == 0 {
		perPartitionTimestamp = ReceiveTime
		r = &OffsetCommitRequest{
			Version:                 1,
			ConsumerGroup:           om.group,
			ConsumerID:              om.memberID,
			ConsumerGroupGeneration: om.generation,
		}
	} else {
		r = &OffsetCommitRequest{
			Version:                 2,
			RetentionTime:           int64(om.conf.Consumer.Offsets.Retention / time.Millisecond),
			ConsumerGroup:           om.group,
			ConsumerID:              om.memberID,
			ConsumerGroupGeneration: om.generation,
		}
	}

	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.lock.Lock()
			if pom.dirty {
				r.AddBlock(pom.topic, pom.partition, pom.offset, perPartitionTimestamp, pom.metadata)
			}
			pom.lock.Unlock()
		}
	}

	if len(r.blocks) > 0 {
		return r
	}

	return nil
}

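// handleResponse matches an OffsetCommitResponse back to the POMs whose blocks
// were in the request: successfully committed blocks are marked as such,
// coordinator-related errors release the cached coordinator so the next flush
// redispatches, and other errors are forwarded to the affected POM.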
func (om *offsetManager) handleResponse(broker *Broker, req *OffsetCommitRequest, resp *OffsetCommitResponse) {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			if req.blocks[pom.topic] == nil || req.blocks[pom.topic][pom.partition] == nil {
				continue
			}

			var err KError
			var ok bool

			if resp.Errors[pom.topic] == nil {
				pom.handleError(ErrIncompleteResponse)
				continue
			}
			if err, ok = resp.Errors[pom.topic][pom.partition]; !ok {
				pom.handleError(ErrIncompleteResponse)
				continue
			}

			switch err {
			case ErrNoError:
				block := req.blocks[pom.topic][pom.partition]
				pom.updateCommitted(block.offset, block.metadata)
			case ErrNotLeaderForPartition, ErrLeaderNotAvailable,
				ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
				// not a critical error, we just need to redispatch
				om.releaseCoordinator(broker)
			case ErrOffsetMetadataTooLarge, ErrInvalidCommitOffsetSize:
				// nothing we can do about this, just tell the user and carry on
				pom.handleError(err)
			case ErrOffsetsLoadInProgress:
				// nothing wrong but we didn't commit, we'll get it next time round
				break
			case ErrUnknownTopicOrPartition:
				// let the user know *and* try redispatching - if topic-auto-create is
				// enabled, redispatching should trigger a metadata req and create the
				// topic; if not then re-dispatching won't help, but we've let the user
				// know and it shouldn't hurt either (see https://github.com/Shopify/sarama/issues/706)
				fallthrough
			default:
				// dunno, tell the user and try redispatching
				pom.handleError(err)
				om.releaseCoordinator(broker)
			}
		}
	}
}

func (om *offsetManager) handleError(err error) {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.handleError(err)
		}
	}
}

func (om *offsetManager) asyncClosePOMs() {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.AsyncClose()
		}
	}
}

// Releases/removes closed POMs once they are clean (or when forced)
func (om *offsetManager) releasePOMs(force bool) (remaining int) {
	om.pomsLock.Lock()
	defer om.pomsLock.Unlock()

	for topic, topicManagers := range om.poms {
		for partition, pom := range topicManagers {
			pom.lock.Lock()
			releaseDue := pom.done && (force || !pom.dirty)
			pom.lock.Unlock()

			if releaseDue {
				pom.release()

				delete(om.poms[topic], partition)
				if len(om.poms[topic]) == 0 {
					delete(om.poms, topic)
				}
			}
		}
		remaining += len(om.poms[topic])
	}
	return
}

func (om *offsetManager) findPOM(topic string, partition int32) *partitionOffsetManager {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	if partitions, ok := om.poms[topic]; ok {
		if pom, ok := partitions[partition]; ok {
			return pom
		}
	}
	return nil
}

// Partition Offset Manager

// PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
// on a partition offset manager to avoid leaks; it will not be garbage-collected automatically when it passes
// out of scope.
type PartitionOffsetManager interface {
	// NextOffset returns the next offset that should be consumed for the managed
	// partition, accompanied by metadata which can be used to reconstruct the state
	// of the partition consumer when it resumes. NextOffset() will return
	// `config.Consumer.Offsets.Initial` and an empty metadata string if no offset
	// was committed for this partition yet.
	NextOffset() (int64, string)

	// MarkOffset marks the provided offset, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// To follow upstream conventions, you are expected to mark the offset of the
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
	// you should typically add one to the offset of the last consumed message.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
	MarkOffset(offset int64, metadata string)

	// ResetOffset resets to the provided offset, alongside a metadata string that
	// represents the state of the partition consumer at that point in time. Reset
	// acts as a counterpart to MarkOffset, the difference being that it allows
	// resetting an offset to an earlier or smaller value, whereas MarkOffset only
	// allows incrementing the offset. See MarkOffset for more details.
	ResetOffset(offset int64, metadata string)

	// Errors returns a read channel of errors that occur during offset management, if
	// enabled. By default, errors are logged and not returned over this channel. If
	// you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will
	// return immediately, after which you should wait until the 'errors' channel has
	// been drained and closed. It is required to call this function, or Close, before
	// a PartitionOffsetManager object passes out of scope, as it will otherwise leak
	// memory. You must call this before calling Close on the underlying client.
	AsyncClose()

	// Close stops the PartitionOffsetManager from managing offsets. It is required to
	// call this function (or AsyncClose) before a PartitionOffsetManager object
	// passes out of scope, as it will otherwise leak memory. You must call this
	// before calling Close on the underlying client.
	Close() error
}
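
// exampleAsyncShutdown is a hedged usage sketch, not part of the package API.
// It shows the AsyncClose contract described above when the configuration has
// Consumer.Return.Errors set to true: initiate the shutdown, then drain the
// Errors() channel until it is closed so no error (or goroutine) is leaked.
func exampleAsyncShutdown(pom PartitionOffsetManager) {
	pom.AsyncClose()

	// With Consumer.Return.Errors enabled, errors arrive here instead of being
	// logged; the channel is closed once the manager has released this POM.
	for err := range pom.Errors() {
		Logger.Println("offset management error:", err)
	}
}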

type partitionOffsetManager struct {
	parent    *offsetManager
	topic     string
	partition int32

	lock     sync.Mutex
	offset   int64
	metadata string
	dirty    bool
	done     bool

	releaseOnce sync.Once
	errors      chan *ConsumerError
}

func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) {
	offset, metadata, err := om.fetchInitialOffset(topic, partition, om.conf.Metadata.Retry.Max)
	if err != nil {
		return nil, err
	}

	return &partitionOffsetManager{
		parent:    om,
		topic:     topic,
		partition: partition,
		errors:    make(chan *ConsumerError, om.conf.ChannelBufferSize),
		offset:    offset,
		metadata:  metadata,
	}, nil
}

func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
	return pom.errors
}

func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if offset > pom.offset {
		pom.offset = offset
		pom.metadata = metadata
		pom.dirty = true
	}
}

func (pom *partitionOffsetManager) ResetOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if offset <= pom.offset {
		pom.offset = offset
		pom.metadata = metadata
		pom.dirty = true
	}
}
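
// exampleRewind is a hedged sketch with a hypothetical helper signature, not
// part of the package API. It contrasts the two methods above: MarkOffset only
// ever moves the in-memory offset forward, so rewinding to re-process earlier
// messages has to go through ResetOffset.
func exampleRewind(pom PartitionOffsetManager, replayFrom int64) {
	pom.MarkOffset(replayFrom, "")  // no-op if replayFrom is at or behind the current offset
	pom.ResetOffset(replayFrom, "") // takes effect whenever replayFrom is at or behind the current offset
}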

func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset == offset && pom.metadata == metadata {
		pom.dirty = false
	}
}

func (pom *partitionOffsetManager) NextOffset() (int64, string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset >= 0 {
		return pom.offset, pom.metadata
	}

	return pom.parent.conf.Consumer.Offsets.Initial, ""
}

func (pom *partitionOffsetManager) AsyncClose() {
	pom.lock.Lock()
	pom.done = true
	pom.lock.Unlock()
}

func (pom *partitionOffsetManager) Close() error {
	pom.AsyncClose()

	var errors ConsumerErrors
	for err := range pom.errors {
		errors = append(errors, err)
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

func (pom *partitionOffsetManager) handleError(err error) {
	cErr := &ConsumerError{
		Topic:     pom.topic,
		Partition: pom.partition,
		Err:       err,
	}

	if pom.parent.conf.Consumer.Return.Errors {
		pom.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

func (pom *partitionOffsetManager) release() {
	pom.releaseOnce.Do(func() {
		go close(pom.errors)
	})
}