package cluster
import (
"sort"
"sync"
"sync/atomic"
"time"
"github.com/Shopify/sarama"
)
// Consumer is a cluster group consumer
type Consumer struct {
client *Client
ownClient bool
consumer sarama.Consumer
subs *partitionMap
consumerID string
groupID string
memberID string
generationID int32
membershipMu sync.RWMutex
coreTopics []string
extraTopics []string
dying, dead chan none
closeOnce sync.Once
consuming int32
messages chan *sarama.ConsumerMessage
errors chan error
partitions chan PartitionConsumer
notifications chan *Notification
commitMu sync.Mutex
}
// NewConsumer initializes a new consumer
func NewConsumer(addrs []string, groupID string, topics []string, config *Config) (*Consumer, error) {
client, err := NewClient(addrs, config)
if err != nil {
return nil, err
}
consumer, err := NewConsumerFromClient(client, groupID, topics)
if err != nil {
return nil, err
}
consumer.ownClient = true
return consumer, nil
}
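// Example usage (an illustrative sketch, not part of the original file):
// consuming in the default multiplex mode via Messages(). The broker address,
// group ID, topic name and the process() handler are placeholders.
//
//	config := cluster.NewConfig()
//	config.Consumer.Return.Errors = true
//	config.Group.Return.Notifications = true
//
//	consumer, err := cluster.NewConsumer([]string{"127.0.0.1:9092"}, "my-group", []string{"my-topic"}, config)
//	if err != nil {
//		panic(err)
//	}
//	defer consumer.Close()
//
//	for {
//		select {
//		case msg, ok := <-consumer.Messages():
//			if !ok {
//				return // channel closed, consumer shut down
//			}
//			process(msg)                 // hypothetical message handler
//			consumer.MarkOffset(msg, "") // mark message as processed
//		case err := <-consumer.Errors():
//			log.Println("error:", err)
//		case ntf := <-consumer.Notifications():
//			log.Println("rebalanced:", ntf)
//		}
//	}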
// NewConsumerFromClient initializes a new consumer from an existing client.
//
// Please note that clients cannot be shared between consumers (due to Kafka internals);
// they can only be re-used, which requires the user to call Close() on the first consumer
// before using this method again to initialize another one. Attempts to use a client with
// more than one consumer at a time will return errors.
func NewConsumerFromClient(client *Client, groupID string, topics []string) (*Consumer, error) {
if !client.claim() {
return nil, errClientInUse
}
consumer, err := sarama.NewConsumerFromClient(client.Client)
if err != nil {
client.release()
return nil, err
}
sort.Strings(topics)
c := &Consumer{
client: client,
consumer: consumer,
subs: newPartitionMap(),
groupID: groupID,
coreTopics: topics,
dying: make(chan none),
dead: make(chan none),
messages: make(chan *sarama.ConsumerMessage),
errors: make(chan error, client.config.ChannelBufferSize),
partitions: make(chan PartitionConsumer, 1),
notifications: make(chan *Notification),
}
if err := c.client.RefreshCoordinator(groupID); err != nil {
client.release()
return nil, err
}
go c.mainLoop()
return c, nil
}
// Messages returns the read channel for the messages that are returned by
// the broker.
//
// Messages will only be emitted on this channel if the Config.Group.Mode
// option is set to ConsumerModeMultiplex (the default).
func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage { return c.messages }
// Partitions returns a channel of PartitionConsumer instances, one for each
// partition claimed by this consumer.
//
// Partition consumers will only be emitted on this channel if the
// Config.Group.Mode option is set to ConsumerModePartitions.
//
// The Partitions() channel must be listened to for the life of this consumer;
// when a rebalance happens old partitions will be closed (naturally come to
// completion) and new ones will be emitted. The returned channel will only close
// when the consumer is completely shut down.
func (c *Consumer) Partitions() <-chan PartitionConsumer { return c.partitions }
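// Example usage (sketch, assuming Config.Group.Mode is set to
// ConsumerModePartitions; process() and signals are placeholders):
//
//	for {
//		select {
//		case part, ok := <-consumer.Partitions():
//			if !ok {
//				return // consumer shut down
//			}
//			// consume each claimed partition in its own goroutine
//			go func(pc cluster.PartitionConsumer) {
//				for msg := range pc.Messages() {
//					process(msg)
//					consumer.MarkOffset(msg, "")
//				}
//			}(part)
//		case <-signals:
//			return
//		}
//	}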
// Errors returns a read channel of errors that occur during offset management, if
// enabled. By default, errors are logged and not returned over this channel. If
// you want to implement any custom error handling, set your config's
// Consumer.Return.Errors setting to true, and read from this channel.
func (c *Consumer) Errors() <-chan error { return c.errors }
// Notifications returns a channel of Notifications that occur during consumer
// rebalancing. Notifications will only be emitted over this channel if your config's
// Group.Return.Notifications setting is set to true.
func (c *Consumer) Notifications() <-chan *Notification { return c.notifications }
// HighWaterMarks returns the current high water marks for each topic and partition.
// Consistency between partitions is not guaranteed since high water marks are updated separately.
func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 { return c.consumer.HighWaterMarks() }
// MarkOffset marks the provided message as processed, alongside a metadata string
// that represents the state of the partition consumer at that point in time. The
// metadata string can be used by another consumer to restore that state, so it
// can resume consumption.
//
// Note: calling MarkOffset does not necessarily commit the offset to the backend
// store immediately for efficiency reasons, and it may never be committed if
// your application crashes. This means that you may end up processing the same
// message twice, and your processing should ideally be idempotent.
func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil {
sub.MarkOffset(msg.Offset, metadata)
}
}
// MarkPartitionOffset marks an offset of the provided topic/partition as processed.
// See MarkOffset for additional explanation.
func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
if sub := c.subs.Fetch(topic, partition); sub != nil {
sub.MarkOffset(offset, metadata)
}
}
// MarkOffsets marks stashed offsets as processed.
// See MarkOffset for additional explanation.
func (c *Consumer) MarkOffsets(s *OffsetStash) {
s.mu.Lock()
defer s.mu.Unlock()
for tp, info := range s.offsets {
if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil {
sub.MarkOffset(info.Offset, info.Metadata)
}
delete(s.offsets, tp)
}
}
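// Example usage (sketch): collecting marks in an OffsetStash and applying them
// in one batch; NewOffsetStash is part of this package, batchDone() is a
// placeholder:
//
//	stash := cluster.NewOffsetStash()
//	for msg := range consumer.Messages() {
//		stash.MarkOffset(msg, "")
//		if batchDone(msg) {
//			consumer.MarkOffsets(stash) // drains the stash
//		}
//	}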
// ResetOffset marks the provided message as processed, alongside a metadata string
// that represents the state of the partition consumer at that point in time. The
// metadata string can be used by another consumer to restore that state, so it
// can resume consumption.
//
// The difference between ResetOffset and MarkOffset is that ResetOffset allows
// rewinding to an earlier offset.
func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil {
sub.ResetOffset(msg.Offset, metadata)
}
}
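// Example usage (sketch): rewinding to a previously retained message after a
// downstream failure; savedMsg is a placeholder for an earlier message:
//
//	consumer.ResetOffset(savedMsg, "") // the next commit rewinds the group's position to just after savedMsg.Offset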
// ResetPartitionOffset resets the offset of the provided topic/partition.
// See ResetOffset for additional explanation.
func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
sub := c.subs.Fetch(topic, partition)
if sub != nil {
sub.ResetOffset(offset, metadata)
}
}
// ResetOffsets resets stashed offsets.
// See ResetOffset for additional explanation.
func (c *Consumer) ResetOffsets(s *OffsetStash) {
s.mu.Lock()
defer s.mu.Unlock()
for tp, info := range s.offsets {
if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil {
sub.ResetOffset(info.Offset, info.Metadata)
}
delete(s.offsets, tp)
}
}
// Subscriptions returns the consumed topics and partitions
func (c *Consumer) Subscriptions() map[string][]int32 {
return c.subs.Info()
}
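// Example usage (sketch): logging the partitions claimed after a rebalance:
//
//	for topic, partitions := range consumer.Subscriptions() {
//		log.Printf("claimed %s: %v", topic, partitions)
//	}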
// CommitOffsets manually commits previously marked offsets. By default there is no
// need to call this function as the consumer will commit offsets automatically
// using the Config.Consumer.Offsets.CommitInterval setting.
//
// Please be aware that calling this function during an internal rebalance cycle may return
// broker errors (e.g. sarama.ErrUnknownMemberId or sarama.ErrIllegalGeneration).
func (c *Consumer) CommitOffsets() error {
c.commitMu.Lock()
defer c.commitMu.Unlock()
memberID, generationID := c.membership()
req := &sarama.OffsetCommitRequest{
Version: 2,
ConsumerGroup: c.groupID,
ConsumerGroupGeneration: generationID,
ConsumerID: memberID,
RetentionTime: -1,
}
if ns := c.client.config.Consumer.Offsets.Retention; ns != 0 {
req.RetentionTime = int64(ns / time.Millisecond)
}
snap := c.subs.Snapshot()
dirty := false
for tp, state := range snap {
if state.Dirty {
dirty = true
req.AddBlock(tp.Topic, tp.Partition, state.Info.Offset, 0, state.Info.Metadata)
}
}
if !dirty {
return nil
}
broker, err := c.client.Coordinator(c.groupID)
if err != nil {
c.closeCoordinator(broker, err)
return err
}
resp, err := broker.CommitOffset(req)
if err != nil {
c.closeCoordinator(broker, err)
return err
}
for topic, errs := range resp.Errors {
for partition, kerr := range errs {
if kerr != sarama.ErrNoError {
err = kerr
} else if state, ok := snap[topicPartition{topic, partition}]; ok {
if sub := c.subs.Fetch(topic, partition); sub != nil {
sub.markCommitted(state.Info.Offset)
}
}
}
}
return err
}
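// Example usage (sketch): forcing a synchronous commit, e.g. before a controlled
// shutdown, rather than waiting for the next periodic auto-commit:
//
//	consumer.MarkOffset(msg, "")
//	if err := consumer.CommitOffsets(); err != nil {
//		log.Println("commit failed:", err)
//	}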
// Close safely closes the consumer and releases all resources
func (c *Consumer) Close() (err error) {
c.closeOnce.Do(func() {
close(c.dying)
<-c.dead
if e := c.release(); e != nil {
err = e
}
if e := c.consumer.Close(); e != nil {
err = e
}
close(c.messages)
close(c.errors)
if e := c.leaveGroup(); e != nil {
err = e
}
close(c.partitions)
close(c.notifications)
// drain
for range c.messages {
}
for range c.errors {
}
for p := range c.partitions {
_ = p.Close()
}
for range c.notifications {
}
c.client.release()
if c.ownClient {
if e := c.client.Close(); e != nil {
err = e
}
}
})
return
}
func (c *Consumer) mainLoop() {
defer close(c.dead)
defer atomic.StoreInt32(&c.consuming, 0)
for {
atomic.StoreInt32(&c.consuming, 0)
// Check if close was requested
select {
case <-c.dying:
return
default:
}
// Start next consume cycle
c.nextTick()
}
}
func (c *Consumer) nextTick() {
// Remember previous subscriptions
var notification *Notification
if c.client.config.Group.Return.Notifications {
notification = newNotification(c.subs.Info())
}
// Refresh coordinator
if err := c.refreshCoordinator(); err != nil {
c.rebalanceError(err, nil)
return
}
// Release subscriptions
if err := c.release(); err != nil {
c.rebalanceError(err, nil)
return
}
// Issue rebalance start notification
if c.client.config.Group.Return.Notifications {
c.handleNotification(notification)
}
// Rebalance, fetch new subscriptions
subs, err := c.rebalance()
if err != nil {
c.rebalanceError(err, notification)
return
}
// Coordinate loops, make sure everything is
// stopped on exit
tomb := newLoopTomb()
defer tomb.Close()
// Start the heartbeat
tomb.Go(c.hbLoop)
// Subscribe to topic/partitions
if err := c.subscribe(tomb, subs); err != nil {
c.rebalanceError(err, notification)
return
}
// Update/issue notification with new claims
if c.client.config.Group.Return.Notifications {
notification = notification.success(subs)
c.handleNotification(notification)
}
// Start topic watcher loop
tomb.Go(c.twLoop)
// Start consuming and committing offsets
tomb.Go(c.cmLoop)
atomic.StoreInt32(&c.consuming, 1)
// Wait for signals
select {
case <-tomb.Dying():
case <-c.dying:
}
}
// heartbeat loop, triggered by the mainLoop
func (c *Consumer) hbLoop(stopped <-chan none) {
ticker := time.NewTicker(c.client.config.Group.Heartbeat.Interval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
switch err := c.heartbeat(); err {
case nil, sarama.ErrNoError:
case sarama.ErrNotCoordinatorForConsumer, sarama.ErrRebalanceInProgress:
return
default:
c.handleError(&Error{Ctx: "heartbeat", error: err})
return
}
case <-stopped:
return
case <-c.dying:
return
}
}
}
// topic watcher loop, triggered by the mainLoop
func (c *Consumer) twLoop(stopped <-chan none) {
ticker := time.NewTicker(c.client.config.Metadata.RefreshFrequency / 2)
defer ticker.Stop()
for {
select {
case <-ticker.C:
topics, err := c.client.Topics()
if err != nil {
c.handleError(&Error{Ctx: "topics", error: err})
return
}
for _, topic := range topics {
if !c.isKnownCoreTopic(topic) &&
!c.isKnownExtraTopic(topic) &&
c.isPotentialExtraTopic(topic) {
return
}
}
case <-stopped:
return
case <-c.dying:
return
}
}
}
// commit loop, triggered by the mainLoop
func (c *Consumer) cmLoop(stopped <-chan none) {
ticker := time.NewTicker(c.client.config.Consumer.Offsets.CommitInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
if err := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); err != nil {
c.handleError(&Error{Ctx: "commit", error: err})
return
}
case <-stopped:
return
case <-c.dying:
return
}
}
}
func (c *Consumer) rebalanceError(err error, n *Notification) {
if n != nil {
n.Type = RebalanceError
c.handleNotification(n)
}
switch err {
case sarama.ErrRebalanceInProgress:
default:
c.handleError(&Error{Ctx: "rebalance", error: err})
}
select {
case <-c.dying:
case <-time.After(c.client.config.Metadata.Retry.Backoff):
}
}
func (c *Consumer) handleNotification(n *Notification) {
if c.client.config.Group.Return.Notifications {
select {
case c.notifications <- n:
case <-c.dying:
return
}
}
}
func (c *Consumer) handleError(e *Error) {
if c.client.config.Consumer.Return.Errors {
select {
case c.errors <- e:
case <-c.dying:
return
}
} else {
sarama.Logger.Printf("%s error: %s\n", e.Ctx, e.Error())
}
}
// Releases the consumer and commits offsets, called from rebalance() and Close()
func (c *Consumer) release() (err error) {
// Stop all consumers
c.subs.Stop()
// Clear subscriptions on exit
defer c.subs.Clear()
// Wait for messages to be processed
timeout := time.NewTimer(c.client.config.Group.Offsets.Synchronization.DwellTime)
defer timeout.Stop()
select {
case <-c.dying:
case <-timeout.C:
}
// Commit offsets, continue on errors
if e := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); e != nil {
err = e
}
return
}
// --------------------------------------------------------------------
// Performs a heartbeat, part of the mainLoop()
func (c *Consumer) heartbeat() error {
broker, err := c.client.Coordinator(c.groupID)
if err != nil {
c.closeCoordinator(broker, err)
return err
}
memberID, generationID := c.membership()
resp, err := broker.Heartbeat(&sarama.HeartbeatRequest{
GroupId: c.groupID,
MemberId: memberID,
GenerationId: generationID,
})
if err != nil {
c.closeCoordinator(broker, err)
return err
}
return resp.Err
}
// Performs a rebalance, part of the mainLoop()
func (c *Consumer) rebalance() (map[string][]int32, error) {
memberID, _ := c.membership()
sarama.Logger.Printf("cluster/consumer %s rebalance\n", memberID)
allTopics, err := c.client.Topics()
if err != nil {
return nil, err
}
c.extraTopics = c.selectExtraTopics(allTopics)
sort.Strings(c.extraTopics)
// Re-join consumer group
strategy, err := c.joinGroup()
switch {
case err == sarama.ErrUnknownMemberId:
c.membershipMu.Lock()
c.memberID = ""
c.membershipMu.Unlock()
return nil, err
case err != nil:
return nil, err
}
// Sync consumer group state, fetch subscriptions
subs, err := c.syncGroup(strategy)
switch {
case err == sarama.ErrRebalanceInProgress:
return nil, err
case err != nil:
_ = c.leaveGroup()
return nil, err
}
return subs, nil
}
// Performs the subscription, part of the mainLoop()
func (c *Consumer) subscribe(tomb *loopTomb, subs map[string][]int32) error {
// fetch offsets
offsets, err := c.fetchOffsets(subs)
if err != nil {
_ = c.leaveGroup()
return err
}
// create consumers in parallel
var mu sync.Mutex
var wg sync.WaitGroup
for topic, partitions := range subs {
for _, partition := range partitions {
wg.Add(1)
info := offsets[topic][partition]
go func(topic string, partition int32) {
if e := c.createConsumer(tomb, topic, partition, info); e != nil {
mu.Lock()
err = e
mu.Unlock()
}
wg.Done()
}(topic, partition)
}
}
wg.Wait()
if err != nil {
_ = c.release()
_ = c.leaveGroup()
}
return err
}
// --------------------------------------------------------------------
// Send a request to the broker to join the group on rebalance()
func (c *Consumer) joinGroup() (*balancer, error) {
memberID, _ := c.membership()
req := &sarama.JoinGroupRequest{
GroupId: c.groupID,
MemberId: memberID,
SessionTimeout: int32(c.client.config.Group.Session.Timeout / time.Millisecond),
ProtocolType: "consumer",
}
meta := &sarama.ConsumerGroupMemberMetadata{
Version: 1,
Topics: append(c.coreTopics, c.extraTopics...),
UserData: c.client.config.Group.Member.UserData,
}
err := req.AddGroupProtocolMetadata(string(StrategyRange), meta)
if err != nil {
return nil, err
}
err = req.AddGroupProtocolMetadata(string(StrategyRoundRobin), meta)
if err != nil {
return nil, err
}
broker, err := c.client.Coordinator(c.groupID)
if err != nil {
c.closeCoordinator(broker, err)
return nil, err
}
resp, err := broker.JoinGroup(req)
if err != nil {
c.closeCoordinator(broker, err)
return nil, err
} else if resp.Err != sarama.ErrNoError {
c.closeCoordinator(broker, resp.Err)
return nil, resp.Err
}
var strategy *balancer
if resp.LeaderId == resp.MemberId {
members, err := resp.GetMembers()
if err != nil {
return nil, err
}
strategy, err = newBalancerFromMeta(c.client, members)
if err != nil {
return nil, err
}
}
c.membershipMu.Lock()
c.memberID = resp.MemberId
c.generationID = resp.GenerationId
c.membershipMu.Unlock()
return strategy, nil
}
// Send a request to the broker to sync the group on rebalance().
// Returns a list of topics and partitions to consume.
func (c *Consumer) syncGroup(strategy *balancer) (map[string][]int32, error) {
memberID, generationID := c.membership()
req := &sarama.SyncGroupRequest{
GroupId: c.groupID,
MemberId: memberID,
GenerationId: generationID,
}
if strategy != nil {
for memberID, topics := range strategy.Perform(c.client.config.Group.PartitionStrategy) {
if err := req.AddGroupAssignmentMember(memberID, &sarama.ConsumerGroupMemberAssignment{
Topics: topics,
}); err != nil {
return nil, err
}
}
}
broker, err := c.client.Coordinator(c.groupID)
if err != nil {
c.closeCoordinator(broker, err)
return nil, err
}
resp, err := broker.SyncGroup(req)
if err != nil {
c.closeCoordinator(broker, err)
return nil, err
} else if resp.Err != sarama.ErrNoError {
c.closeCoordinator(broker, resp.Err)
return nil, resp.Err
}
// Return if there is nothing to subscribe to
if len(resp.MemberAssignment) == 0 {
return nil, nil
}
// Get assigned subscriptions
members, err := resp.GetMemberAssignment()
if err != nil {
return nil, err
}
// Sort partitions, for each topic
for topic := range members.Topics {
sort.Sort(int32Slice(members.Topics[topic]))
}
return members.Topics, nil
}
// Fetches latest committed offsets for all subscriptions
func (c *Consumer) fetchOffsets(subs map[string][]int32) (map[string]map[int32]offsetInfo, error) {
offsets := make(map[string]map[int32]offsetInfo, len(subs))
req := &sarama.OffsetFetchRequest{
Version: 1,
ConsumerGroup: c.groupID,
}
for topic, partitions := range subs {
offsets[topic] = make(map[int32]offsetInfo, len(partitions))
for _, partition := range partitions {
offsets[topic][partition] = offsetInfo{Offset: -1}
req.AddPartition(topic, partition)
}
}
broker, err := c.client.Coordinator(c.groupID)
if err != nil {
c.closeCoordinator(broker, err)
return nil, err
}
resp, err := broker.FetchOffset(req)
if err != nil {
c.closeCoordinator(broker, err)
return nil, err
}
for topic, partitions := range subs {
for _, partition := range partitions {
block := resp.GetBlock(topic, partition)
if block == nil {
return nil, sarama.ErrIncompleteResponse
}
if block.Err == sarama.ErrNoError {
offsets[topic][partition] = offsetInfo{Offset: block.Offset, Metadata: block.Metadata}
} else {
return nil, block.Err
}
}
}
return offsets, nil
}
// Send a request to the broker to leave the group on failed rebalance() and on Close()
func (c *Consumer) leaveGroup() error {
broker, err := c.client.Coordinator(c.groupID)
if err != nil {
c.closeCoordinator(broker, err)
return err
}
memberID, _ := c.membership()
if _, err = broker.LeaveGroup(&sarama.LeaveGroupRequest{
GroupId: c.groupID,
MemberId: memberID,
}); err != nil {
c.closeCoordinator(broker, err)
}
return err
}
// --------------------------------------------------------------------
func (c *Consumer) createConsumer(tomb *loopTomb, topic string, partition int32, info offsetInfo) error {
memberID, _ := c.membership()
sarama.Logger.Printf("cluster/consumer %s consume %s/%d from %d\n", memberID, topic, partition, info.NextOffset(c.client.config.Consumer.Offsets.Initial))
// Create partitionConsumer
pc, err := newPartitionConsumer(c.consumer, topic, partition, info, c.client.config.Consumer.Offsets.Initial)
if err != nil {
return err
}
// Store in subscriptions
c.subs.Store(topic, partition, pc)
// Start partition consumer goroutine
tomb.Go(func(stopper <-chan none) {
if c.client.config.Group.Mode == ConsumerModePartitions {
pc.waitFor(stopper, c.errors)
} else {
pc.multiplex(stopper, c.messages, c.errors)
}
})
if c.client.config.Group.Mode == ConsumerModePartitions {
c.partitions <- pc
}
return nil
}
func (c *Consumer) commitOffsetsWithRetry(retries int) error {
err := c.CommitOffsets()
if err != nil && retries > 0 {
return c.commitOffsetsWithRetry(retries - 1)
}
return err
}
func (c *Consumer) closeCoordinator(broker *sarama.Broker, err error) {
if broker != nil {
_ = broker.Close()
}
switch err {
case sarama.ErrConsumerCoordinatorNotAvailable, sarama.ErrNotCoordinatorForConsumer:
_ = c.client.RefreshCoordinator(c.groupID)
}
}
func (c *Consumer) selectExtraTopics(allTopics []string) []string {
extra := allTopics[:0]
for _, topic := range allTopics {
if !c.isKnownCoreTopic(topic) && c.isPotentialExtraTopic(topic) {
extra = append(extra, topic)
}
}
return extra
}
func (c *Consumer) isKnownCoreTopic(topic string) bool {
pos := sort.SearchStrings(c.coreTopics, topic)
return pos < len(c.coreTopics) && c.coreTopics[pos] == topic
}
func (c *Consumer) isKnownExtraTopic(topic string) bool {
pos := sort.SearchStrings(c.extraTopics, topic)
return pos < len(c.extraTopics) && c.extraTopics[pos] == topic
}
func (c *Consumer) isPotentialExtraTopic(topic string) bool {
rx := c.client.config.Group.Topics
if rx.Blacklist != nil && rx.Blacklist.MatchString(topic) {
return false
}
if rx.Whitelist != nil && rx.Whitelist.MatchString(topic) {
return true
}
return false
}
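// Example usage (sketch): the whitelist/blacklist matchers consulted above are
// plain *regexp.Regexp values on the config; the patterns are placeholders:
//
//	config := cluster.NewConfig()
//	config.Group.Topics.Whitelist = regexp.MustCompile(`^events\..*`)
//	config.Group.Topics.Blacklist = regexp.MustCompile(`\.internal$`)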
func (c *Consumer) refreshCoordinator() error {
if err := c.refreshMetadata(); err != nil {
return err
}
return c.client.RefreshCoordinator(c.groupID)
}
func (c *Consumer) refreshMetadata() (err error) {
if c.client.config.Metadata.Full {
err = c.client.RefreshMetadata()
} else {
var topics []string
if topics, err = c.client.Topics(); err == nil && len(topics) != 0 {
err = c.client.RefreshMetadata(topics...)
}
}
// maybe we didn't have authorization to describe all topics
switch err {
case sarama.ErrTopicAuthorizationFailed:
err = c.client.RefreshMetadata(c.coreTopics...)
}
return
}
func (c *Consumer) membership() (memberID string, generationID int32) {
c.membershipMu.RLock()
memberID, generationID = c.memberID, c.generationID
c.membershipMu.RUnlock()
return
}