| package cluster |
| |
| import ( |
| "sync" |
| |
| "github.com/Shopify/sarama" |
| ) |
| |
| // OffsetStash allows to accumulate offsets and |
| // mark them as processed in a bulk |
| type OffsetStash struct { |
| offsets map[topicPartition]offsetInfo |
| mu sync.Mutex |
| } |
| |
| // NewOffsetStash inits a blank stash |
| func NewOffsetStash() *OffsetStash { |
| return &OffsetStash{offsets: make(map[topicPartition]offsetInfo)} |
| } |
| |
| // MarkOffset stashes the provided message offset |
| func (s *OffsetStash) MarkOffset(msg *sarama.ConsumerMessage, metadata string) { |
| s.MarkPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata) |
| } |
| |
| // MarkPartitionOffset stashes the offset for the provided topic/partition combination |
| func (s *OffsetStash) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| |
| key := topicPartition{Topic: topic, Partition: partition} |
| if info := s.offsets[key]; offset >= info.Offset { |
| info.Offset = offset |
| info.Metadata = metadata |
| s.offsets[key] = info |
| } |
| } |
| |
| // ResetPartitionOffset stashes the offset for the provided topic/partition combination. |
| // Difference between ResetPartitionOffset and MarkPartitionOffset is that, ResetPartitionOffset supports earlier offsets |
| func (s *OffsetStash) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| |
| key := topicPartition{Topic: topic, Partition: partition} |
| if info := s.offsets[key]; offset <= info.Offset { |
| info.Offset = offset |
| info.Metadata = metadata |
| s.offsets[key] = info |
| } |
| } |
| |
| // ResetOffset stashes the provided message offset |
| // See ResetPartitionOffset for explanation |
| func (s *OffsetStash) ResetOffset(msg *sarama.ConsumerMessage, metadata string) { |
| s.ResetPartitionOffset(msg.Topic, msg.Partition, msg.Offset, metadata) |
| } |
| |
| // Offsets returns the latest stashed offsets by topic-partition |
| func (s *OffsetStash) Offsets() map[string]int64 { |
| s.mu.Lock() |
| defer s.mu.Unlock() |
| |
| res := make(map[string]int64, len(s.offsets)) |
| for tp, info := range s.offsets { |
| res[tp.String()] = info.Offset |
| } |
| return res |
| } |