blob: bfaa587830d729b03a3a612e50d9b389f1e76080 [file] [log] [blame]
package cluster
import (
"sort"
"sync"
"time"
"github.com/Shopify/sarama"
)
// PartitionConsumer allows code to consume individual partitions from the cluster.
// It extends sarama.PartitionConsumer with accessors for the consumed
// topic/partition and with methods for tracking processed offsets.
//
// See docs for Consumer.Partitions() for more on how to implement this.
type PartitionConsumer interface {
sarama.PartitionConsumer
// Topic returns the consumed topic name.
Topic() string
// Partition returns the consumed partition.
Partition() int32
// InitialOffset returns the offset used for creating the PartitionConsumer instance.
// The returned offset can be a literal offset, or OffsetNewest, or OffsetOldest.
InitialOffset() int64
// MarkOffset marks the offset of a message as processed.
MarkOffset(offset int64, metadata string)
// ResetOffset resets the offset to a previously processed message.
ResetOffset(offset int64, metadata string)
}
// partitionConsumer is this package's PartitionConsumer implementation. It
// embeds the underlying sarama.PartitionConsumer and adds offset state and
// shutdown signalling on top of it.
type partitionConsumer struct {
sarama.PartitionConsumer
// state holds the tracked offset info and dirty flag; it is accessed under mu.
state partitionState
// mu guards state.
mu sync.Mutex
// topic and partition identify what is being consumed.
topic string
partition int32
// initialOffset is the offset that was handed to ConsumePartition.
initialOffset int64
// closeOnce makes AsyncClose run its shutdown logic only once.
closeOnce sync.Once
// closeErr is the result of closing the embedded consumer; Close returns it.
closeErr error
// dying is closed by AsyncClose; dead is closed when waitFor or multiplex returns.
dying, dead chan none
}
// newPartitionConsumer starts consuming topic/partition through manager and
// wraps the resulting sarama.PartitionConsumer.
//
// Consumption starts at info.NextOffset(defaultOffset). When ConsumePartition
// answers with sarama.ErrOffsetOutOfRange, the stored offset is discarded
// (info.Offset becomes -1) and a second attempt is made at defaultOffset. Any
// other error, or a failure of the second attempt, is returned as-is together
// with a nil consumer.
func newPartitionConsumer(manager sarama.Consumer, topic string, partition int32, info offsetInfo, defaultOffset int64) (*partitionConsumer, error) {
	start := info.NextOffset(defaultOffset)

	upstream, err := manager.ConsumePartition(topic, partition, start)
	if err == sarama.ErrOffsetOutOfRange {
		// The requested offset is out of range: resume from the default instead.
		info.Offset = -1
		start = defaultOffset
		upstream, err = manager.ConsumePartition(topic, partition, start)
	}
	if err != nil {
		return nil, err
	}

	pc := &partitionConsumer{
		PartitionConsumer: upstream,
		topic:             topic,
		partition:         partition,
		initialOffset:     start,
		dying:             make(chan none),
		dead:              make(chan none),
	}
	// Only Info is seeded; the rest of the state keeps its zero value.
	pc.state.Info = info
	return pc, nil
}
// Topic reports the name of the topic this consumer reads from.
// It implements PartitionConsumer.
func (c *partitionConsumer) Topic() string {
	return c.topic
}
// Partition reports the partition number this consumer reads from.
// It implements PartitionConsumer.
func (c *partitionConsumer) Partition() int32 {
	return c.partition
}
// InitialOffset reports the offset that was used when this consumer was
// created. It implements PartitionConsumer.
func (c *partitionConsumer) InitialOffset() int64 {
	return c.initialOffset
}
// AsyncClose implements PartitionConsumer. On the first call it closes the
// embedded consumer, records the resulting error in closeErr and then closes
// the dying channel; later calls do nothing.
func (c *partitionConsumer) AsyncClose() {
	shutdown := func() {
		c.closeErr = c.PartitionConsumer.Close()
		// Signal the forwarding loop that the consumer is going away.
		close(c.dying)
	}
	c.closeOnce.Do(shutdown)
}
// Close implements PartitionConsumer. It triggers AsyncClose and then blocks
// until the dead channel is closed (which happens when waitFor or multiplex
// returns), finally returning the error recorded while closing the embedded
// consumer.
func (c *partitionConsumer) Close() error {
c.AsyncClose()
// Wait for the forwarding loop (waitFor/multiplex) to exit.
<-c.dead
return c.closeErr
}
// waitFor forwards errors from the embedded consumer's Errors() channel to
// the errors channel. It does not read from Messages(). The loop ends when
// Errors() is closed or when a receive on stopper or c.dying succeeds; the
// dead channel is closed on return, which is what Close waits for.
func (c *partitionConsumer) waitFor(stopper <-chan none, errors chan<- error) {
defer close(c.dead)
for {
select {
case err, ok := <-c.Errors():
if !ok {
return
}
// Hand the error over, but give up if asked to stop while the send is blocked.
select {
case errors <- err:
case <-stopper:
return
case <-c.dying:
return
}
case <-stopper:
return
case <-c.dying:
return
}
}
}
// multiplex forwards messages and errors from the embedded consumer to the
// given messages and errors channels. The loop ends when either Messages() or
// Errors() is closed or when a receive on stopper or c.dying succeeds; the
// dead channel is closed on return, which is what Close waits for.
func (c *partitionConsumer) multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) {
defer close(c.dead)
for {
select {
case msg, ok := <-c.Messages():
if !ok {
return
}
// Hand the message over, but give up if asked to stop while the send is blocked.
select {
case messages <- msg:
case <-stopper:
return
case <-c.dying:
return
}
case err, ok := <-c.Errors():
if !ok {
return
}
// Same hand-over pattern for errors.
select {
case errors <- err:
case <-stopper:
return
case <-c.dying:
return
}
case <-stopper:
return
case <-c.dying:
return
}
}
}
// getState returns a copy of the current partition state, taken while
// holding the consumer's mutex.
func (c *partitionConsumer) getState() partitionState {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.state
}
// markCommitted clears the dirty flag, but only when offset equals the
// currently tracked offset. If the tracked offset has moved on in the
// meantime the state stays dirty.
func (c *partitionConsumer) markCommitted(offset int64) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.state.Info.Offset != offset {
		return
	}
	c.state.Dirty = false
}
// MarkOffset implements PartitionConsumer. It records offset+1 as the tracked
// offset together with metadata and flags the state as dirty, but only when
// that moves the tracked offset forward; otherwise the call is ignored.
func (c *partitionConsumer) MarkOffset(offset int64, metadata string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	next := offset + 1
	if next <= c.state.Info.Offset {
		// Never move backwards here; that is what ResetOffset is for.
		return
	}
	c.state.Info.Offset = next
	c.state.Info.Metadata = metadata
	c.state.Dirty = true
}
// ResetOffset implements PartitionConsumer. It records offset+1 as the
// tracked offset together with metadata and flags the state as dirty, but
// only when offset+1 does not exceed the currently tracked offset; attempts
// to move forward are ignored.
func (c *partitionConsumer) ResetOffset(offset int64, metadata string) {
	c.mu.Lock()
	defer c.mu.Unlock()

	next := offset + 1
	if next > c.state.Info.Offset {
		// Moving forward is MarkOffset's job.
		return
	}
	c.state.Info.Offset = next
	c.state.Info.Metadata = metadata
	c.state.Dirty = true
}
// --------------------------------------------------------------------
// partitionState is the offset-tracking state of a single partition. It is
// copied by value when read through getState.
type partitionState struct {
// Info is the tracked offset and its metadata.
Info offsetInfo
// Dirty is set when the offset is marked or reset and cleared by markCommitted.
Dirty bool
// LastCommit is not read or written in this part of the file — presumably
// the time of the last successful commit; confirm against callers.
LastCommit time.Time
}
// --------------------------------------------------------------------
// partitionMap is a collection of partition consumers keyed by
// topic/partition and guarded by a read-write mutex.
type partitionMap struct {
// data maps each topic/partition to its consumer; it is accessed under mu.
data map[topicPartition]*partitionConsumer
// mu guards data.
mu sync.RWMutex
}
// newPartitionMap returns an empty partitionMap whose underlying map is
// already allocated and ready for Store.
func newPartitionMap() *partitionMap {
	m := new(partitionMap)
	m.data = make(map[topicPartition]*partitionConsumer)
	return m
}
// IsSubscribedTo reports whether the map holds at least one partition of the
// given topic.
func (m *partitionMap) IsSubscribedTo(topic string) bool {
	m.mu.RLock()
	subscribed := false
	for key := range m.data {
		if key.Topic == topic {
			subscribed = true
			break
		}
	}
	m.mu.RUnlock()
	return subscribed
}
// Fetch returns the consumer stored for topic/partition, or nil when there is
// no such entry. The lookup happens under the read lock.
func (m *partitionMap) Fetch(topic string, partition int32) *partitionConsumer {
	m.mu.RLock()
	defer m.mu.RUnlock()
	// A plain map read already yields the zero value (nil) for missing keys,
	// so no comma-ok form is needed.
	return m.data[topicPartition{Topic: topic, Partition: partition}]
}
// Store registers pc as the consumer for topic/partition, replacing any
// previous entry for the same key. The write happens under the write lock.
func (m *partitionMap) Store(topic string, partition int32, pc *partitionConsumer) {
	key := topicPartition{Topic: topic, Partition: partition}

	m.mu.Lock()
	defer m.mu.Unlock()
	m.data[key] = pc
}
// Snapshot returns a new map holding a copy of every consumer's current
// state, keyed by topic/partition. The read lock is held while the states are
// collected.
func (m *partitionMap) Snapshot() map[topicPartition]partitionState {
	m.mu.RLock()
	defer m.mu.RUnlock()

	states := make(map[topicPartition]partitionState, len(m.data))
	for key, consumer := range m.data {
		states[key] = consumer.getState()
	}
	return states
}
// Stop closes every consumer in the map concurrently and returns once all of
// them have finished closing. Errors returned by the individual Close calls
// are discarded. The read lock is held for the whole duration, and the
// entries themselves stay in the map.
func (m *partitionMap) Stop() {
	m.mu.RLock()
	defer m.mu.RUnlock()

	var wg sync.WaitGroup
	// Iterate over the values directly instead of re-indexing the map per key.
	for _, pc := range m.data {
		wg.Add(1)
		go func(p *partitionConsumer) {
			defer wg.Done()
			// Best-effort shutdown: the close error is intentionally ignored.
			_ = p.Close()
		}(pc)
	}
	wg.Wait()
}
// Clear removes every entry from the map under the write lock. The entries
// are deleted in place; the consumers themselves are not closed here.
func (m *partitionMap) Clear() {
	m.mu.Lock()
	defer m.mu.Unlock()

	for key := range m.data {
		delete(m.data, key)
	}
}
// Info returns the partitions currently held in the map, grouped by topic.
// Each topic's partition list is sorted via int32Slice. The read lock is only
// held while the keys are collected, not while sorting.
func (m *partitionMap) Info() map[string][]int32 {
	byTopic := make(map[string][]int32)

	m.mu.RLock()
	for key := range m.data {
		byTopic[key.Topic] = append(byTopic[key.Topic], key.Partition)
	}
	m.mu.RUnlock()

	// Sorting works in place on each slice's backing array.
	for _, partitions := range byTopic {
		sort.Sort(int32Slice(partitions))
	}
	return byTopic
}