package sarama

import (
	"sync"
	"time"
)

// Offset Manager

// OffsetManager uses Kafka to store and fetch consumed partition offsets.
type OffsetManager interface {
	// ManagePartition creates a PartitionOffsetManager on the given topic/partition.
	// It will return an error if this OffsetManager is already managing the given
	// topic/partition.
	ManagePartition(topic string, partition int32) (PartitionOffsetManager, error)

	// Close stops the OffsetManager from managing offsets. It is required to call
	// this function before an OffsetManager object passes out of scope, as it
	// will otherwise leak memory. You must call this after all the
	// PartitionOffsetManagers are closed.
	Close() error
}
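// A minimal usage sketch (illustrative only, not part of this file), assuming a
// broker at localhost:9092 and a consumer group named "my-group"; error handling
// is elided:
//
//	client, _ := sarama.NewClient([]string{"localhost:9092"}, sarama.NewConfig())
//	om, _ := sarama.NewOffsetManagerFromClient("my-group", client)
//	pom, _ := om.ManagePartition("my-topic", 0)
//
//	offset, _ := pom.NextOffset()
//	// ... consume from `offset`, calling pom.MarkOffset(msg.Offset+1, "") as you go ...
//
//	// Shut down in this order: partition offset managers first, then the
//	// offset manager, then the underlying client.
//	_ = pom.Close()
//	_ = om.Close()
//	_ = client.Close()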

type offsetManager struct {
	client Client
	conf   *Config
	group  string
	ticker *time.Ticker

	memberID   string
	generation int32

	broker     *Broker
	brokerLock sync.RWMutex

	poms     map[string]map[int32]*partitionOffsetManager
	pomsLock sync.RWMutex

	closeOnce sync.Once
	closing   chan none
	closed    chan none
}

// NewOffsetManagerFromClient creates a new OffsetManager from the given client.
// It is still necessary to call Close() on the underlying client when finished
// with the offset manager.
func NewOffsetManagerFromClient(group string, client Client) (OffsetManager, error) {
	return newOffsetManagerFromClient(group, "", GroupGenerationUndefined, client)
}

func newOffsetManagerFromClient(group, memberID string, generation int32, client Client) (*offsetManager, error) {
	// Check that we are not dealing with a closed Client before processing any other arguments
	if client.Closed() {
		return nil, ErrClosedClient
	}

	conf := client.Config()
	om := &offsetManager{
		client: client,
		conf:   conf,
		group:  group,
		ticker: time.NewTicker(conf.Consumer.Offsets.CommitInterval),
		poms:   make(map[string]map[int32]*partitionOffsetManager),

		memberID:   memberID,
		generation: generation,

		closing: make(chan none),
		closed:  make(chan none),
	}
	go withRecover(om.mainLoop)

	return om, nil
}

func (om *offsetManager) ManagePartition(topic string, partition int32) (PartitionOffsetManager, error) {
	pom, err := om.newPartitionOffsetManager(topic, partition)
	if err != nil {
		return nil, err
	}

	om.pomsLock.Lock()
	defer om.pomsLock.Unlock()

	topicManagers := om.poms[topic]
	if topicManagers == nil {
		topicManagers = make(map[int32]*partitionOffsetManager)
		om.poms[topic] = topicManagers
	}

	if topicManagers[partition] != nil {
		return nil, ConfigurationError("That topic/partition is already being managed")
	}

	topicManagers[partition] = pom
	return pom, nil
}

func (om *offsetManager) Close() error {
	om.closeOnce.Do(func() {
		// exit the mainLoop
		close(om.closing)
		<-om.closed

		// mark all POMs as closed
		om.asyncClosePOMs()

		// flush one last time
		for attempt := 0; attempt <= om.conf.Consumer.Offsets.Retry.Max; attempt++ {
			om.flushToBroker()
			if om.releasePOMs(false) == 0 {
				break
			}
		}

		om.releasePOMs(true)
		om.brokerLock.Lock()
		om.broker = nil
		om.brokerLock.Unlock()
	})
	return nil
}

func (om *offsetManager) fetchInitialOffset(topic string, partition int32, retries int) (int64, string, error) {
	broker, err := om.coordinator()
	if err != nil {
		if retries <= 0 {
			return 0, "", err
		}
		return om.fetchInitialOffset(topic, partition, retries-1)
	}

	req := new(OffsetFetchRequest)
	// version 1 and above fetch offsets from Kafka itself rather than ZooKeeper
	req.Version = 1
	req.ConsumerGroup = om.group
	req.AddPartition(topic, partition)

	resp, err := broker.FetchOffset(req)
	if err != nil {
		if retries <= 0 {
			return 0, "", err
		}
		om.releaseCoordinator(broker)
		return om.fetchInitialOffset(topic, partition, retries-1)
	}

	block := resp.GetBlock(topic, partition)
	if block == nil {
		return 0, "", ErrIncompleteResponse
	}

	switch block.Err {
	case ErrNoError:
		return block.Offset, block.Metadata, nil
	case ErrNotCoordinatorForConsumer:
		if retries <= 0 {
			return 0, "", block.Err
		}
		om.releaseCoordinator(broker)
		return om.fetchInitialOffset(topic, partition, retries-1)
	case ErrOffsetsLoadInProgress:
		if retries <= 0 {
			return 0, "", block.Err
		}
		select {
		case <-om.closing:
			return 0, "", block.Err
		case <-time.After(om.conf.Metadata.Retry.Backoff):
		}
		return om.fetchInitialOffset(topic, partition, retries-1)
	default:
		return 0, "", block.Err
	}
}

func (om *offsetManager) coordinator() (*Broker, error) {
	om.brokerLock.RLock()
	broker := om.broker
	om.brokerLock.RUnlock()

	if broker != nil {
		return broker, nil
	}

	om.brokerLock.Lock()
	defer om.brokerLock.Unlock()

	// double-check under the write lock in case another goroutine
	// resolved the coordinator while we were waiting
	if broker := om.broker; broker != nil {
		return broker, nil
	}

	if err := om.client.RefreshCoordinator(om.group); err != nil {
		return nil, err
	}

	broker, err := om.client.Coordinator(om.group)
	if err != nil {
		return nil, err
	}

	om.broker = broker
	return broker, nil
}

func (om *offsetManager) releaseCoordinator(b *Broker) {
	om.brokerLock.Lock()
	if om.broker == b {
		om.broker = nil
	}
	om.brokerLock.Unlock()
}

func (om *offsetManager) mainLoop() {
	defer om.ticker.Stop()
	defer close(om.closed)

	for {
		select {
		case <-om.ticker.C:
			om.flushToBroker()
			om.releasePOMs(false)
		case <-om.closing:
			return
		}
	}
}

func (om *offsetManager) flushToBroker() {
	req := om.constructRequest()
	if req == nil {
		return
	}

	broker, err := om.coordinator()
	if err != nil {
		om.handleError(err)
		return
	}

	resp, err := broker.CommitOffset(req)
	if err != nil {
		om.handleError(err)
		om.releaseCoordinator(broker)
		_ = broker.Close()
		return
	}

	om.handleResponse(broker, req, resp)
}

func (om *offsetManager) constructRequest() *OffsetCommitRequest {
	var r *OffsetCommitRequest
	var perPartitionTimestamp int64
	if om.conf.Consumer.Offsets.Retention == 0 {
		// version 1 carries no retention field, so the broker applies its default
		// retention; ReceiveTime tells it to timestamp the commit on receipt
		perPartitionTimestamp = ReceiveTime
		r = &OffsetCommitRequest{
			Version:                 1,
			ConsumerGroup:           om.group,
			ConsumerID:              om.memberID,
			ConsumerGroupGeneration: om.generation,
		}
	} else {
		// version 2 drops per-partition timestamps and carries an explicit retention time
		r = &OffsetCommitRequest{
			Version:                 2,
			RetentionTime:           int64(om.conf.Consumer.Offsets.Retention / time.Millisecond),
			ConsumerGroup:           om.group,
			ConsumerID:              om.memberID,
			ConsumerGroupGeneration: om.generation,
		}
	}

	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.lock.Lock()
			if pom.dirty {
				r.AddBlock(pom.topic, pom.partition, pom.offset, perPartitionTimestamp, pom.metadata)
			}
			pom.lock.Unlock()
		}
	}

	if len(r.blocks) > 0 {
		return r
	}

	return nil
}
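// A sketch of the configuration that drives the commit path above (illustrative,
// with assumed values): Consumer.Offsets.CommitInterval controls how often
// mainLoop flushes, and a non-zero Consumer.Offsets.Retention selects request
// version 2 with an explicit retention time.
//
//	config := sarama.NewConfig()
//	config.Consumer.Offsets.CommitInterval = 1 * time.Second
//	config.Consumer.Offsets.Retention = 24 * time.Hour // 0 keeps the broker default
//	config.Consumer.Offsets.Retry.Max = 3              // final-flush attempts in Close()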

func (om *offsetManager) handleResponse(broker *Broker, req *OffsetCommitRequest, resp *OffsetCommitResponse) {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			if req.blocks[pom.topic] == nil || req.blocks[pom.topic][pom.partition] == nil {
				continue
			}

			var err KError
			var ok bool

			if resp.Errors[pom.topic] == nil {
				pom.handleError(ErrIncompleteResponse)
				continue
			}
			if err, ok = resp.Errors[pom.topic][pom.partition]; !ok {
				pom.handleError(ErrIncompleteResponse)
				continue
			}

			switch err {
			case ErrNoError:
				block := req.blocks[pom.topic][pom.partition]
				pom.updateCommitted(block.offset, block.metadata)
			case ErrNotLeaderForPartition, ErrLeaderNotAvailable,
				ErrConsumerCoordinatorNotAvailable, ErrNotCoordinatorForConsumer:
				// not a critical error, we just need to redispatch
				om.releaseCoordinator(broker)
			case ErrOffsetMetadataTooLarge, ErrInvalidCommitOffsetSize:
				// nothing we can do about this, just tell the user and carry on
				pom.handleError(err)
			case ErrOffsetsLoadInProgress:
				// nothing wrong but we didn't commit, we'll get it next time round
				break
			case ErrUnknownTopicOrPartition:
				// let the user know *and* try redispatching - if topic-auto-create is
				// enabled, redispatching should trigger a metadata req and create the
				// topic; if not then re-dispatching won't help, but we've let the user
				// know and it shouldn't hurt either (see https://github.com/Shopify/sarama/issues/706)
				fallthrough
			default:
				// dunno, tell the user and try redispatching
				pom.handleError(err)
				om.releaseCoordinator(broker)
			}
		}
	}
}

func (om *offsetManager) handleError(err error) {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.handleError(err)
		}
	}
}

func (om *offsetManager) asyncClosePOMs() {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	for _, topicManagers := range om.poms {
		for _, pom := range topicManagers {
			pom.AsyncClose()
		}
	}
}

// Releases/removes closed POMs once they are clean (or when forced)
func (om *offsetManager) releasePOMs(force bool) (remaining int) {
	om.pomsLock.Lock()
	defer om.pomsLock.Unlock()

	for topic, topicManagers := range om.poms {
		for partition, pom := range topicManagers {
			pom.lock.Lock()
			releaseDue := pom.done && (force || !pom.dirty)
			pom.lock.Unlock()

			if releaseDue {
				pom.release()

				delete(om.poms[topic], partition)
				if len(om.poms[topic]) == 0 {
					delete(om.poms, topic)
				}
			}
		}
		remaining += len(om.poms[topic])
	}
	return
}

func (om *offsetManager) findPOM(topic string, partition int32) *partitionOffsetManager {
	om.pomsLock.RLock()
	defer om.pomsLock.RUnlock()

	if partitions, ok := om.poms[topic]; ok {
		if pom, ok := partitions[partition]; ok {
			return pom
		}
	}
	return nil
}

// Partition Offset Manager

// PartitionOffsetManager uses Kafka to store and fetch consumed partition offsets. You MUST call Close()
// on a partition offset manager to avoid leaks; it will not be garbage-collected automatically when it passes
// out of scope.
type PartitionOffsetManager interface {
	// NextOffset returns the next offset that should be consumed for the managed
	// partition, accompanied by metadata which can be used to reconstruct the state
	// of the partition consumer when it resumes. NextOffset() will return
	// `config.Consumer.Offsets.Initial` and an empty metadata string if no offset
	// was committed for this partition yet.
	NextOffset() (int64, string)

	// MarkOffset marks the provided offset, alongside a metadata string
	// that represents the state of the partition consumer at that point in time. The
	// metadata string can be used by another consumer to restore that state, so it
	// can resume consumption.
	//
	// To follow upstream conventions, you are expected to mark the offset of the
	// next message to read, not the last message read. Thus, when calling `MarkOffset`
	// you should typically add one to the offset of the last consumed message.
	//
	// Note: calling MarkOffset does not necessarily commit the offset to the backend
	// store immediately for efficiency reasons, and it may never be committed if
	// your application crashes. This means that you may end up processing the same
	// message twice, and your processing should ideally be idempotent.
	MarkOffset(offset int64, metadata string)

	// ResetOffset resets to the provided offset, alongside a metadata string that
	// represents the state of the partition consumer at that point in time. Reset
	// acts as a counterpart to MarkOffset, the difference being that it allows
	// resetting the offset to an earlier or smaller value, whereas MarkOffset only
	// allows incrementing it. See MarkOffset for more details.
	ResetOffset(offset int64, metadata string)

	// Errors returns a read channel of errors that occur during offset management, if
	// enabled. By default, errors are logged and not returned over this channel. If
	// you want to implement any custom error handling, set your config's
	// Consumer.Return.Errors setting to true, and read from this channel.
	Errors() <-chan *ConsumerError

	// AsyncClose initiates a shutdown of the PartitionOffsetManager. This method will
	// return immediately, after which you should wait until the 'errors' channel has
	// been drained and closed. It is required to call this function, or Close, before
	// a PartitionOffsetManager object passes out of scope, as it will otherwise leak
	// memory. You must call this before calling Close on the underlying client.
	AsyncClose()

	// Close stops the PartitionOffsetManager from managing offsets. It is required to
	// call this function (or AsyncClose) before a PartitionOffsetManager object
	// passes out of scope, as it will otherwise leak memory. You must call this
	// before calling Close on the underlying client.
	Close() error
}
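// A rough consumption-loop sketch (illustrative only), assuming a PartitionConsumer
// `pc` from this package and a PartitionOffsetManager `pom` obtained via
// ManagePartition; it follows the "mark the next offset to read" convention and
// drains Errors() after AsyncClose when Consumer.Return.Errors is enabled:
//
//	for msg := range pc.Messages() {
//		process(msg) // hypothetical application handler
//		pom.MarkOffset(msg.Offset+1, "")
//	}
//
//	pom.AsyncClose()
//	for err := range pom.Errors() {
//		sarama.Logger.Println(err)
//	}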

type partitionOffsetManager struct {
	parent    *offsetManager
	topic     string
	partition int32

	lock     sync.Mutex
	offset   int64
	metadata string
	dirty    bool
	done     bool

	releaseOnce sync.Once
	errors      chan *ConsumerError
}

func (om *offsetManager) newPartitionOffsetManager(topic string, partition int32) (*partitionOffsetManager, error) {
	offset, metadata, err := om.fetchInitialOffset(topic, partition, om.conf.Metadata.Retry.Max)
	if err != nil {
		return nil, err
	}

	return &partitionOffsetManager{
		parent:    om,
		topic:     topic,
		partition: partition,
		errors:    make(chan *ConsumerError, om.conf.ChannelBufferSize),
		offset:    offset,
		metadata:  metadata,
	}, nil
}

func (pom *partitionOffsetManager) Errors() <-chan *ConsumerError {
	return pom.errors
}

func (pom *partitionOffsetManager) MarkOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if offset > pom.offset {
		pom.offset = offset
		pom.metadata = metadata
		pom.dirty = true
	}
}

func (pom *partitionOffsetManager) ResetOffset(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if offset <= pom.offset {
		pom.offset = offset
		pom.metadata = metadata
		pom.dirty = true
	}
}

func (pom *partitionOffsetManager) updateCommitted(offset int64, metadata string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset == offset && pom.metadata == metadata {
		pom.dirty = false
	}
}

func (pom *partitionOffsetManager) NextOffset() (int64, string) {
	pom.lock.Lock()
	defer pom.lock.Unlock()

	if pom.offset >= 0 {
		return pom.offset, pom.metadata
	}

	return pom.parent.conf.Consumer.Offsets.Initial, ""
}
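// When no offset has been committed yet, NextOffset falls back to the configured
// initial offset. A minimal sketch of that setting (assumed values):
//
//	config := sarama.NewConfig()
//	config.Consumer.Offsets.Initial = sarama.OffsetOldest // or sarama.OffsetNewest (the default)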

func (pom *partitionOffsetManager) AsyncClose() {
	pom.lock.Lock()
	pom.done = true
	pom.lock.Unlock()
}

func (pom *partitionOffsetManager) Close() error {
	pom.AsyncClose()

	var errors ConsumerErrors
	for err := range pom.errors {
		errors = append(errors, err)
	}

	if len(errors) > 0 {
		return errors
	}
	return nil
}

func (pom *partitionOffsetManager) handleError(err error) {
	cErr := &ConsumerError{
		Topic:     pom.topic,
		Partition: pom.partition,
		Err:       err,
	}

	if pom.parent.conf.Consumer.Return.Errors {
		pom.errors <- cErr
	} else {
		Logger.Println(cErr)
	}
}

func (pom *partitionOffsetManager) release() {
	pom.releaseOnce.Do(func() {
		go close(pom.errors)
	})
}