package sarama

import (
	"sync"
	"time"
)

// Offset Manager

// OffsetManager uses Kafka to store and fetch consumed partition offsets.
type OffsetManager interface {
	// ManagePartition creates a PartitionOffsetManager on the given topic/partition.
	// It will return an error if this OffsetManager is already managing the given
	// topic/partition.
	ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)

	// Close stops the OffsetManager from managing offsets. It is required to call
	// this function before an OffsetManager object passes out of scope, as it
	// will otherwise leak memory. You must call this after all the
	// PartitionOffsetManagers are closed.
	Close() error
}
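
// The function below is an illustrative sketch only and is not part of the
// upstream file: it shows the intended wiring of Client, OffsetManager and
// PartitionOffsetManager, including the required close ordering (partition
// offset managers first, then the OffsetManager, then the Client). The broker
// address, group name and topic name are placeholders.
func exampleOffsetManagerSetup() error {
	config := NewConfig()
	config.Consumer.Offsets.AutoCommit.Interval = time.Second // commit cadence used by mainLoop

	client, err := NewClient([]string{"localhost:9092"}, config)
	if err != nil {
		return err
	}
	defer client.Close() // runs last (defers are LIFO)

	om, err := NewOffsetManagerFromClient("example-group", client)
	if err != nil {
		return err
	}
	defer om.Close() // runs after the partition offset manager is closed

	pom, err := om.ManagePartition("example-topic", 0)
	if err != nil {
		return err
	}
	defer pom.Close() // runs first

	// NextOffset yields the committed offset (or Consumer.Offsets.Initial if
	// nothing has been committed yet) plus any metadata stored alongside it.
	offset, metadata := pom.NextOffset()
	Logger.Println("resuming at offset", offset, "metadata", metadata)
	return nil
}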

type offsetManager struct {
	client Client
	conf   *Config
	group  string
	ticker *time.Ticker

	memberID   string
	generation int32

	broker     *Broker
	brokerLock sync.RWMutex

	poms     map[string]map[int32]*partitionOffsetManager
	pomsLock sync.RWMutex

	closeOnce sync.Once
	closing   chan none
	closed    chan none
}

// NewOffsetManagerFromClient creates a new OffsetManager from the given client.
// It is still necessary to call Close() on the underlying client when finished with the offset manager.
func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) {
	return newOffsetManagerFromClient(group, "", GroupGenerationUndefined, client)
}

func newOffsetManagerFromClient(group, memberID string, generation int32, client Client) (*offsetManager, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	conf := client.Config()
	om := &offsetManager{
		client: client,
		conf:   conf,
		group:  group,
		ticker: time.NewTicker(conf.Consumer.Offsets.AutoCommit.Interval),
		poms:   make(map[string]map[int32]*partitionOffsetManager),

		memberID:   memberID,
		generation: generation,

		closing: make(chan none),
		closed:  make(chan none),
	}
	go withRecover(om.mainLoop)

	return om, nil
}

func (om *offsetManager) ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) {
	pom, err := om.newPartitionOffsetManager(topic, partition)
	if err != nil {
		return nil, err
	}

	om.pomsLock.Lock()
	defer om.pomsLock.Unlock()

	topicManagers := om.poms[topic]
	if topicManagers == nil {
		topicManagers = make(map[int32]*partitionOffsetManager)
		om.poms[topic] = topicManagers
	}

	if topicManagers[partition] != nil {
		return nil, ConfigurationError("That topic/partition is already being managed")
	}

	topicManagers[partition] = pom
	return pom, nil
}

func (om *offsetManager) Close() error {
	om.closeOnce.Do(func() {
		// exit the mainLoop
		close(om.closing)
		<-om.closed

		// mark all POMs as closed
		om.asyncClosePOMs()

		// flush one last time
		for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ {
			om.flushToBroker()
			if om.releasePOMs(false) == 0 {
				break
			}
		}

		om.releasePOMs(true)
		om.brokerLock.Lock()
		om.broker = nil
		om.brokerLock.Unlock()
	})
	return nil
}

func (om *offsetManager) computeBackoff(retries int) time.Duration {
	if om.conf.Metadata.Retry.BackoffFunc != nil {
		return om.conf.Metadata.Retry.BackoffFunc(retries, om.conf.Metadata.Retry.Max)
	} else {
		return om.conf.Metadata.Retry.Backoff
	}
}

func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retries int) (int64, string, error) {
	broker, err := om.coordinator()
	if err != nil {
		if retries <= 0 {
			return 0, "", err
		}
		return om.fetchInitialOffset(topic, partition, retries-1)
	}

	req := new(OffsetFetchRequest)
	req.Version = 1
	req.ConsumerGroup = om.group
	req.AddPartition(topic, partition)

	resp, err := broker.FetchOffset(req)
	if err != nil {
		if retries <= 0 {
			return 0, "", err
		}
		om.releaseCoordinator(broker)
		return om.fetchInitialOffset(topic, partition, retries-1)
	}

	block := resp.GetBlock(topic, partition)
	if block == nil {
		return 0, "", ErrIncompleteResponse
	}

	switch block.Err {
	case ErrNoError:
		return block.Offset, block.Metadata, nil
	case ErrNotCoordinatorForConsumer:
		if retries <= 0 {
			return 0, "", block.Err
		}
		om.releaseCoordinator(broker)
		return om.fetchInitialOffset(topic, partition, retries-1)
	case ErrOffsetsLoadInProgress:
		if retries <= 0 {
			return 0, "", block.Err
		}
		backoff := om.computeBackoff(retries)
		select {
		case <-om.closing:
			return 0, "", block.Err
		case <-time.After(backoff):
		}
		return om.fetchInitialOffset(topic, partition, retries-1)
	default:
		return 0, "", block.Err
	}
}

func (om *offsetManager) coordinator() (*Broker, error) {
	om.brokerLock.RLock()
	broker := om.broker
	om.brokerLock.RUnlock()

	if broker != nil {
		return broker, nil
	}

	om.brokerLock.Lock()
	defer om.brokerLock.Unlock()

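	// Re-check under the write lock: another goroutine may have set the
	// coordinator while this one was waiting.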
	if broker := om.broker; broker != nil {
		return broker, nil
	}

	if err := om.client.RefreshCoordinator(om.group); err != nil {
		return nil, err
	}

	broker, err := om.client.Coordinator(om.group)
	if err != nil {
		return nil, err
	}

	om.broker = broker
	return broker, nil
}

func (om *offsetManager) releaseCoordinator(b *Broker) {
	om.brokerLock.Lock()
	if om.broker == b {
		om.broker = nil
	}
	om.brokerLock.Unlock()
}

func (om *offsetManager) mainLoop() {
	defer om.ticker.Stop()
	defer close(om.closed)

	for {
		select {
		case <-om.ticker.C:
			om.flushToBroker()
			om.releasePOMs(false)
		case <-om.closing:
			return
		}
	}
}

// flushToBroker is a no-op when automatic offset committing is disabled.
func (om *offsetManager) flushToBroker() {
	if !om.conf.Consumer.Offsets.AutoCommit.Enable {
		return
	}

	req := om.constructRequest()
	if req == nil {
		return
	}

	broker, err := om.coordinator()
	if err != nil {
		om.handleError(err)
		return
	}

	resp, err := broker.CommitOffset(req)
	if err != nil {
		om.handleError(err)
		om.releaseCoordinator(broker)
		_ = broker.Close()
		return
	}

	om.handleResponse(broker, req, resp)
}

func (om *offsetManager) constructRequest() *OffsetCommitRequest {
	var r *OffsetCommitRequest
	var perPartitionTimestamp int64
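	// When no offset retention is configured, fall back to a v1
	// OffsetCommitRequest and stamp each block with ReceiveTime so the broker
	// applies its own default retention; otherwise send a v2 request carrying
	// an explicit RetentionTime.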
	if om.conf.Consumer.Offsets.Retention == 0 {
		perPartitionTimestamp = ReceiveTime
		r = &OffsetCommitRequest{
			Version:                 1,
			ConsumerGroup:           om.group,
			ConsumerID:              om.memberID,
			ConsumerGroupGeneration: om.generation,
		}
	} else {
		r = &OffsetCommitRequest{
			Version:                 2,
			RetentionTime:           int64(om.conf.Consumer.Offsets.Retention / time.Millisecond),
			ConsumerGroup:           om.group,
			ConsumerID:              om.memberID,
			ConsumerGroupGeneration: om.generation,
		}
	}

	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.lock.Lock()
			if pom.dirty {
				r.AddBlock(pom.topic, pom.partition, pom.offset, perPartitionTimestamp, pom.metadata)
			}
			pom.lock.Unlock()
		}
	}

	if len(r.blocks) > 0 {
		return r
	}

	return nil
}

func (om *offsetManager) handleResponse(broker *Broker, req *OffsetCommitRequest, resp *OffsetCommitResponse) {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			if req.blocks[pom.topic] == nil || req.blocks[pom.topic][pom.partition] == nil {
				continue
			}

			var err KError
			var ok bool

			if resp.Errors[pom.topic] == nil {
				pom.handleError(ErrIncompleteResponse)
				continue
			}
			if err, ok = resp.Errors[pom.topic][pom.partition]; !ok {
				pom.handleError(ErrIncompleteResponse)
				continue
			}

			switch err {
			case ErrNoError:
				block := req.blocks[pom.topic][pom.partition]
				pom.updateCommitted(block.offset, block.metadata)
			case ErrNotLeaderForPartition, ErrLeaderNotAvailable,
				ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
				// not a critical error, we just need to redispatch
				om.releaseCoordinator(broker)
			case ErrOffsetMetadataTooLarge, ErrInvalidCommitOffsetSize:
				// nothing we can do about this, just tell the user and carry on
				pom.handleError(err)
			case ErrOffsetsLoadInProgress:
				// nothing wrong but we didn't commit, we'll get it next time round
			case ErrUnknownTopicOrPartition:
				// let the user know *and* try redispatching - if topic-auto-create is
				// enabled, redispatching should trigger a metadata req and create the
				// topic; if not then re-dispatching won't help, but we've let the user
				// know and it shouldn't hurt either (see https://github.com/Shopify/sarama/issues/706)
				fallthrough
			default:
				// dunno, tell the user and try redispatching
				pom.handleError(err)
				om.releaseCoordinator(broker)
			}
		}
	}
}

func (om *offsetManager) handleError(err error) {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.handleError(err)
		}
	}
}

func (om *offsetManager) asyncClosePOMs() {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.AsyncClose()
		}
	}
}

// Releases/removes closed POMs once they are clean (or when forced)
func (om *offsetManager) releasePOMs(force bool) (remaining int) {
	om.pomsLock.Lock()
	defer om.pomsLock.Unlock()

	for topic, topicManagers := range om.poms {
		for partition, pom := range topicManagers {
			pom.lock.Lock()
			releaseDue := pom.done && (force || !pom.dirty)
			pom.lock.Unlock()

			if releaseDue {
				pom.release()

				delete(om.poms[topic], partition)
				if len(om.poms[topic]) == 0 {
					delete(om.poms, topic)
				}
			}
		}
		remaining += len(om.poms[topic])
	}
	return
}

func (om *offsetManager) findPOM(topic string, partition int32) *partitionOffsetManager {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	if partitions, ok := om.poms[topic]; ok {
		if pom, ok := partitions[partition]; ok {
			return pom
		}
	}
	return nil
}

// Partition Offset Manager

// PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
// on a partition offset manager to avoid leaks; it will not be garbage-collected automatically when it passes
// out of scope.
type PartitionOffsetManager interface {
	// NextOffset returns the next offset that should be consumed for the managed
	// partition, accompanied by metadata which can be used to reconstruct the state
	// of the partition consumer when it resumes. NextOffset() will return
	// `config.Consumer.Offsets.Initial` and an empty metadata string if no offset
	// was committed for this partition yet.
	NextOffset() (int64, string)

	// MarkOffset marks the provided offset, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// To follow upstream conventions, you are expected to mark the offset of the
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
	// you should typically add one to the offset of the last consumed message.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
	MarkOffset(offset int64, metadata string)

	// ResetOffset resets to the provided offset, alongside a metadata string that
	// represents the state of the partition consumer at that point in time. Reset
	// acts as a counterpart to MarkOffset, the difference being that it allows
	// resetting an offset to an earlier or smaller value, whereas MarkOffset only
	// allows incrementing the offset. See MarkOffset for more details.
	ResetOffset(offset int64, metadata string)

	// Errors returns a read channel of errors that occur during offset management, if
	// enabled. By default, errors are logged and not returned over this channel. If
	// you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will
	// return immediately, after which you should wait until the 'errors' channel has
	// been drained and closed. It is required to call this function (or Close) before
	// a PartitionOffsetManager object passes out of scope, as it will otherwise leak
	// memory. You must call this before calling Close on the underlying client.
	AsyncClose()

	// Close stops the PartitionOffsetManager from managing offsets. It is required to
	// call this function (or AsyncClose) before a PartitionOffsetManager object
	// passes out of scope, as it will otherwise leak memory. You must call this
	// before calling Close on the underlying client.
	Close() error
}
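
// Illustrative sketch only, not part of the upstream file: a typical
// mark-and-close cycle against a PartitionOffsetManager. The pom and messages
// arguments are placeholders supplied by the caller; per the MarkOffset
// contract, the offset of the *next* message to read is marked, hence Offset+1.
func examplePartitionOffsetLoop(pom PartitionOffsetManager, messages <-chan *ConsumerMessage) {
	// Drain Errors() so error delivery never blocks when
	// Consumer.Return.Errors is enabled; the channel is closed once the
	// parent OffsetManager releases this partition, ending the goroutine.
	go func() {
		for err := range pom.Errors() {
			Logger.Println("offset manager error:", err)
		}
	}()

	for msg := range messages {
		// ... process msg, then record progress ...
		pom.MarkOffset(msg.Offset+1, "")
	}

	// AsyncClose returns immediately; any still-dirty offset is flushed by
	// the parent OffsetManager before the partition is released.
	pom.AsyncClose()
}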

type partitionOffsetManager struct {
	parent    *offsetManager
	topic     string
	partition int32

	lock     sync.Mutex
	offset   int64
	metadata string
	dirty    bool // offset/metadata have been marked but not yet committed
	done     bool // AsyncClose has been called

	releaseOnce sync.Once
	errors      chan *ConsumerError
}

func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) {
	offset, metadata, err := om.fetchInitialOffset(topic, partition, om.conf.Metadata.Retry.Max)
	if err != nil {
		return nil, err
	}

	return &partitionOffsetManager{
		parent:    om,
		topic:     topic,
		partition: partition,
		errors:    make(chan *ConsumerError, om.conf.ChannelBufferSize),
		offset:    offset,
		metadata:  metadata,
	}, nil
}

func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
	return pom.errors
}

func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if offset > pom.offset {
		pom.offset = offset
		pom.metadata = metadata
		pom.dirty = true
	}
}

func (pom *partitionOffsetManager) ResetOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if offset <= pom.offset {
		pom.offset = offset
		pom.metadata = metadata
		pom.dirty = true
	}
}

func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset == offset && pom.metadata == metadata {
		pom.dirty = false
	}
}

func (pom *partitionOffsetManager) NextOffset() (int64, string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset >= 0 {
		return pom.offset, pom.metadata
	}

	return pom.parent.conf.Consumer.Offsets.Initial, ""
}

func (pom *partitionOffsetManager) AsyncClose() {
	pom.lock.Lock()
	pom.done = true
	pom.lock.Unlock()
}

func (pom *partitionOffsetManager) Close() error {
	pom.AsyncClose()

	var errors ConsumerErrors
	for err := range pom.errors {
		errors = append(errors, err)
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

func (pom *partitionOffsetManager) handleError(err error) {
	cErr := &ConsumerError{
		Topic:     pom.topic,
		Partition: pom.partition,
		Err:       err,
	}

	if pom.parent.conf.Consumer.Return.Errors {
		pom.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

func (pom *partitionOffsetManager) release() {
	pom.releaseOnce.Do(func() {
		close(pom.errors)
	})
}