| package cluster |
| |
| import ( |
| "sort" |
| "sync" |
| "time" |
| |
| "github.com/Shopify/sarama" |
| ) |
| |
| // PartitionConsumer allows code to consume individual partitions from the cluster. |
| // |
| // See docs for Consumer.Partitions() for more on how to implement this. |
| type PartitionConsumer interface { |
| sarama.PartitionConsumer |
| |
| // Topic returns the consumed topic name |
| Topic() string |
| |
| // Partition returns the consumed partition |
| Partition() int32 |
| |
| // InitialOffset returns the offset used for creating the PartitionConsumer instance. |
| // The returned offset can be a literal offset, or OffsetNewest, or OffsetOldest |
| InitialOffset() int64 |
| |
| // MarkOffset marks the offset of a message as preocessed. |
| MarkOffset(offset int64, metadata string) |
| |
| // ResetOffset resets the offset to a previously processed message. |
| ResetOffset(offset int64, metadata string) |
| } |
| |
| type partitionConsumer struct { |
| sarama.PartitionConsumer |
| |
| state partitionState |
| mu sync.Mutex |
| |
| topic string |
| partition int32 |
| initialOffset int64 |
| |
| closeOnce sync.Once |
| closeErr error |
| |
| dying, dead chan none |
| } |
| |
| func newPartitionConsumer(manager sarama.Consumer, topic string, partition int32, info offsetInfo, defaultOffset int64) (*partitionConsumer, error) { |
| offset := info.NextOffset(defaultOffset) |
| pcm, err := manager.ConsumePartition(topic, partition, offset) |
| |
| // Resume from default offset, if requested offset is out-of-range |
| if err == sarama.ErrOffsetOutOfRange { |
| info.Offset = -1 |
| offset = defaultOffset |
| pcm, err = manager.ConsumePartition(topic, partition, offset) |
| } |
| if err != nil { |
| return nil, err |
| } |
| |
| return &partitionConsumer{ |
| PartitionConsumer: pcm, |
| state: partitionState{Info: info}, |
| |
| topic: topic, |
| partition: partition, |
| initialOffset: offset, |
| |
| dying: make(chan none), |
| dead: make(chan none), |
| }, nil |
| } |
| |
| // Topic implements PartitionConsumer |
| func (c *partitionConsumer) Topic() string { return c.topic } |
| |
| // Partition implements PartitionConsumer |
| func (c *partitionConsumer) Partition() int32 { return c.partition } |
| |
| // InitialOffset implements PartitionConsumer |
| func (c *partitionConsumer) InitialOffset() int64 { return c.initialOffset } |
| |
| // AsyncClose implements PartitionConsumer |
| func (c *partitionConsumer) AsyncClose() { |
| c.closeOnce.Do(func() { |
| c.closeErr = c.PartitionConsumer.Close() |
| close(c.dying) |
| }) |
| } |
| |
| // Close implements PartitionConsumer |
| func (c *partitionConsumer) Close() error { |
| c.AsyncClose() |
| <-c.dead |
| return c.closeErr |
| } |
| |
| func (c *partitionConsumer) waitFor(stopper <-chan none, errors chan<- error) { |
| defer close(c.dead) |
| |
| for { |
| select { |
| case err, ok := <-c.Errors(): |
| if !ok { |
| return |
| } |
| select { |
| case errors <- err: |
| case <-stopper: |
| return |
| case <-c.dying: |
| return |
| } |
| case <-stopper: |
| return |
| case <-c.dying: |
| return |
| } |
| } |
| } |
| |
| func (c *partitionConsumer) multiplex(stopper <-chan none, messages chan<- *sarama.ConsumerMessage, errors chan<- error) { |
| defer close(c.dead) |
| |
| for { |
| select { |
| case msg, ok := <-c.Messages(): |
| if !ok { |
| return |
| } |
| select { |
| case messages <- msg: |
| case <-stopper: |
| return |
| case <-c.dying: |
| return |
| } |
| case err, ok := <-c.Errors(): |
| if !ok { |
| return |
| } |
| select { |
| case errors <- err: |
| case <-stopper: |
| return |
| case <-c.dying: |
| return |
| } |
| case <-stopper: |
| return |
| case <-c.dying: |
| return |
| } |
| } |
| } |
| |
| func (c *partitionConsumer) getState() partitionState { |
| c.mu.Lock() |
| state := c.state |
| c.mu.Unlock() |
| |
| return state |
| } |
| |
| func (c *partitionConsumer) markCommitted(offset int64) { |
| c.mu.Lock() |
| if offset == c.state.Info.Offset { |
| c.state.Dirty = false |
| } |
| c.mu.Unlock() |
| } |
| |
| // MarkOffset implements PartitionConsumer |
| func (c *partitionConsumer) MarkOffset(offset int64, metadata string) { |
| c.mu.Lock() |
| if next := offset + 1; next > c.state.Info.Offset { |
| c.state.Info.Offset = next |
| c.state.Info.Metadata = metadata |
| c.state.Dirty = true |
| } |
| c.mu.Unlock() |
| } |
| |
| // ResetOffset implements PartitionConsumer |
| func (c *partitionConsumer) ResetOffset(offset int64, metadata string) { |
| c.mu.Lock() |
| if next := offset + 1; next <= c.state.Info.Offset { |
| c.state.Info.Offset = next |
| c.state.Info.Metadata = metadata |
| c.state.Dirty = true |
| } |
| c.mu.Unlock() |
| } |
| |
| // -------------------------------------------------------------------- |
| |
| type partitionState struct { |
| Info offsetInfo |
| Dirty bool |
| LastCommit time.Time |
| } |
| |
| // -------------------------------------------------------------------- |
| |
| type partitionMap struct { |
| data map[topicPartition]*partitionConsumer |
| mu sync.RWMutex |
| } |
| |
| func newPartitionMap() *partitionMap { |
| return &partitionMap{ |
| data: make(map[topicPartition]*partitionConsumer), |
| } |
| } |
| |
| func (m *partitionMap) IsSubscribedTo(topic string) bool { |
| m.mu.RLock() |
| defer m.mu.RUnlock() |
| |
| for tp := range m.data { |
| if tp.Topic == topic { |
| return true |
| } |
| } |
| return false |
| } |
| |
| func (m *partitionMap) Fetch(topic string, partition int32) *partitionConsumer { |
| m.mu.RLock() |
| pc, _ := m.data[topicPartition{topic, partition}] |
| m.mu.RUnlock() |
| return pc |
| } |
| |
| func (m *partitionMap) Store(topic string, partition int32, pc *partitionConsumer) { |
| m.mu.Lock() |
| m.data[topicPartition{topic, partition}] = pc |
| m.mu.Unlock() |
| } |
| |
| func (m *partitionMap) Snapshot() map[topicPartition]partitionState { |
| m.mu.RLock() |
| defer m.mu.RUnlock() |
| |
| snap := make(map[topicPartition]partitionState, len(m.data)) |
| for tp, pc := range m.data { |
| snap[tp] = pc.getState() |
| } |
| return snap |
| } |
| |
| func (m *partitionMap) Stop() { |
| m.mu.RLock() |
| defer m.mu.RUnlock() |
| |
| var wg sync.WaitGroup |
| for tp := range m.data { |
| wg.Add(1) |
| go func(p *partitionConsumer) { |
| _ = p.Close() |
| wg.Done() |
| }(m.data[tp]) |
| } |
| wg.Wait() |
| } |
| |
| func (m *partitionMap) Clear() { |
| m.mu.Lock() |
| for tp := range m.data { |
| delete(m.data, tp) |
| } |
| m.mu.Unlock() |
| } |
| |
| func (m *partitionMap) Info() map[string][]int32 { |
| info := make(map[string][]int32) |
| m.mu.RLock() |
| for tp := range m.data { |
| info[tp.Topic] = append(info[tp.Topic], tp.Partition) |
| } |
| m.mu.RUnlock() |
| |
| for topic := range info { |
| sort.Sort(int32Slice(info[topic])) |
| } |
| return info |
| } |