blob: 4223ac5e012b1392e9a7d679788bfbfaf3795a6d [file] [log] [blame]
Scott Bakereee8dd82019-09-24 12:52:34 -07001package cluster
2
3import (
4 "sync"
5
6 "github.com/Shopify/sarama"
7)
8
9// OffsetStash allows to accumulate offsets and
10// mark them as processed in a bulk
11type OffsetStash struct {
12 offsets map[topicPartition]offsetInfo
13 mu sync.Mutex
14}
15
16// NewOffsetStash inits a blank stash
17func NewOffsetStash() *OffsetStash {
18 return &OffsetStash{offsets: make(map[topicPartition]offsetInfo)}
19}
20
21// MarkOffset stashes the provided message offset
22func (s *OffsetStash) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
23 s.MarkPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
24}
25
26// MarkPartitionOffset stashes the offset for the provided topic/partition combination
27func (s *OffsetStash) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
28 s.mu.Lock()
29 defer s.mu.Unlock()
30
31 key := topicPartition{Topic: topic, Partition: partition}
32 if info := s.offsets[key]; offset >= info.Offset {
33 info.Offset = offset
34 info.Metadata = metadata
35 s.offsets[key] = info
36 }
37}
38
39// ResetPartitionOffset stashes the offset for the provided topic/partition combination.
40// Difference between ResetPartitionOffset and MarkPartitionOffset is that, ResetPartitionOffset supports earlier offsets
41func (s *OffsetStash) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
42 s.mu.Lock()
43 defer s.mu.Unlock()
44
45 key := topicPartition{Topic: topic, Partition: partition}
46 if info := s.offsets[key]; offset <= info.Offset {
47 info.Offset = offset
48 info.Metadata = metadata
49 s.offsets[key] = info
50 }
51}
52
53// ResetOffset stashes the provided message offset
54// See ResetPartitionOffset for explanation
55func (s *OffsetStash) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
56 s.ResetPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata)
57}
58
59// Offsets returns the latest stashed offsets by topic-partition
60func (s *OffsetStash) Offsets() map[string]int64 {
61 s.mu.Lock()
62 defer s.mu.Unlock()
63
64 res := make(map[string]int64, len(s.offsets))
65 for tp, info := range s.offsets {
66 res[tp.String()] = info.Offset
67 }
68 return res
69}