[VOL-1349] EPON ONU adapter (package B)
Change-Id: I609ba349c429bc7e87c74b66bb1121841f9caef6
diff --git a/vendor/github.com/bsm/sarama-cluster/consumer.go b/vendor/github.com/bsm/sarama-cluster/consumer.go
new file mode 100644
index 0000000..e7a67da
--- /dev/null
+++ b/vendor/github.com/bsm/sarama-cluster/consumer.go
@@ -0,0 +1,919 @@
+package cluster
+
+import (
+ "sort"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "github.com/Shopify/sarama"
+)
+
+// Consumer is a cluster group consumer
+type Consumer struct {
+ client *Client
+ ownClient bool
+
+ consumer sarama.Consumer
+ subs *partitionMap
+
+ consumerID string
+ groupID string
+
+ memberID string
+ generationID int32
+ membershipMu sync.RWMutex
+
+ coreTopics []string
+ extraTopics []string
+
+ dying, dead chan none
+ closeOnce sync.Once
+
+ consuming int32
+ messages chan *sarama.ConsumerMessage
+ errors chan error
+ partitions chan PartitionConsumer
+ notifications chan *Notification
+
+ commitMu sync.Mutex
+}
+
+// NewConsumer initializes a new consumer
+func NewConsumer(addrs []string, groupID string, topics []string, config *Config) (*Consumer, error) {
+ client, err := NewClient(addrs, config)
+ if err != nil {
+ return nil, err
+ }
+
+ consumer, err := NewConsumerFromClient(client, groupID, topics)
+ if err != nil {
+ return nil, err
+ }
+ consumer.ownClient = true
+ return consumer, nil
+}
+
+// NewConsumerFromClient initializes a new consumer from an existing client.
+//
+// Please note that clients cannot be shared between consumers (due to Kafka internals),
+// they can only be re-used which requires the user to call Close() on the first consumer
+// before using this method again to initialize another one. Attempts to use a client with
+// more than one consumer at a time will return errors.
+func NewConsumerFromClient(client *Client, groupID string, topics []string) (*Consumer, error) {
+ if !client.claim() {
+ return nil, errClientInUse
+ }
+
+ consumer, err := sarama.NewConsumerFromClient(client.Client)
+ if err != nil {
+ client.release()
+ return nil, err
+ }
+
+ sort.Strings(topics)
+ c := &Consumer{
+ client: client,
+ consumer: consumer,
+ subs: newPartitionMap(),
+ groupID: groupID,
+
+ coreTopics: topics,
+
+ dying: make(chan none),
+ dead: make(chan none),
+
+ messages: make(chan *sarama.ConsumerMessage),
+ errors: make(chan error, client.config.ChannelBufferSize),
+ partitions: make(chan PartitionConsumer, 1),
+ notifications: make(chan *Notification),
+ }
+ if err := c.client.RefreshCoordinator(groupID); err != nil {
+ client.release()
+ return nil, err
+ }
+
+ go c.mainLoop()
+ return c, nil
+}
+
+// Messages returns the read channel for the messages that are returned by
+// the broker.
+//
+// This channel will only return if Config.Group.Mode option is set to
+// ConsumerModeMultiplex (default).
+func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage { return c.messages }
+
+// Partitions returns the read channels for individual partitions of this broker.
+//
+// This will channel will only return if Config.Group.Mode option is set to
+// ConsumerModePartitions.
+//
+// The Partitions() channel must be listened to for the life of this consumer;
+// when a rebalance happens old partitions will be closed (naturally come to
+// completion) and new ones will be emitted. The returned channel will only close
+// when the consumer is completely shut down.
+func (c *Consumer) Partitions() <-chan PartitionConsumer { return c.partitions }
+
+// Errors returns a read channel of errors that occur during offset management, if
+// enabled. By default, errors are logged and not returned over this channel. If
+// you want to implement any custom error handling, set your config's
+// Consumer.Return.Errors setting to true, and read from this channel.
+func (c *Consumer) Errors() <-chan error { return c.errors }
+
+// Notifications returns a channel of Notifications that occur during consumer
+// rebalancing. Notifications will only be emitted over this channel, if your config's
+// Group.Return.Notifications setting to true.
+func (c *Consumer) Notifications() <-chan *Notification { return c.notifications }
+
+// HighWaterMarks returns the current high water marks for each topic and partition
+// Consistency between partitions is not guaranteed since high water marks are updated separately.
+func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 { return c.consumer.HighWaterMarks() }
+
+// MarkOffset marks the provided message as processed, alongside a metadata string
+// that represents the state of the partition consumer at that point in time. The
+// metadata string can be used by another consumer to restore that state, so it
+// can resume consumption.
+//
+// Note: calling MarkOffset does not necessarily commit the offset to the backend
+// store immediately for efficiency reasons, and it may never be committed if
+// your application crashes. This means that you may end up processing the same
+// message twice, and your processing should ideally be idempotent.
+func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
+ if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil {
+ sub.MarkOffset(msg.Offset, metadata)
+ }
+}
+
+// MarkPartitionOffset marks an offset of the provided topic/partition as processed.
+// See MarkOffset for additional explanation.
+func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
+ if sub := c.subs.Fetch(topic, partition); sub != nil {
+ sub.MarkOffset(offset, metadata)
+ }
+}
+
+// MarkOffsets marks stashed offsets as processed.
+// See MarkOffset for additional explanation.
+func (c *Consumer) MarkOffsets(s *OffsetStash) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ for tp, info := range s.offsets {
+ if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil {
+ sub.MarkOffset(info.Offset, info.Metadata)
+ }
+ delete(s.offsets, tp)
+ }
+}
+
+// ResetOffsets marks the provided message as processed, alongside a metadata string
+// that represents the state of the partition consumer at that point in time. The
+// metadata string can be used by another consumer to restore that state, so it
+// can resume consumption.
+//
+// Difference between ResetOffset and MarkOffset is that it allows to rewind to an earlier offset
+func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
+ if sub := c.subs.Fetch(msg.Topic, msg.Partition); sub != nil {
+ sub.ResetOffset(msg.Offset, metadata)
+ }
+}
+
+// ResetPartitionOffset marks an offset of the provided topic/partition as processed.
+// See ResetOffset for additional explanation.
+func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
+ sub := c.subs.Fetch(topic, partition)
+ if sub != nil {
+ sub.ResetOffset(offset, metadata)
+ }
+}
+
+// ResetOffsets marks stashed offsets as processed.
+// See ResetOffset for additional explanation.
+func (c *Consumer) ResetOffsets(s *OffsetStash) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ for tp, info := range s.offsets {
+ if sub := c.subs.Fetch(tp.Topic, tp.Partition); sub != nil {
+ sub.ResetOffset(info.Offset, info.Metadata)
+ }
+ delete(s.offsets, tp)
+ }
+}
+
+// Subscriptions returns the consumed topics and partitions
+func (c *Consumer) Subscriptions() map[string][]int32 {
+ return c.subs.Info()
+}
+
+// CommitOffsets allows to manually commit previously marked offsets. By default there is no
+// need to call this function as the consumer will commit offsets automatically
+// using the Config.Consumer.Offsets.CommitInterval setting.
+//
+// Please be aware that calling this function during an internal rebalance cycle may return
+// broker errors (e.g. sarama.ErrUnknownMemberId or sarama.ErrIllegalGeneration).
+func (c *Consumer) CommitOffsets() error {
+ c.commitMu.Lock()
+ defer c.commitMu.Unlock()
+
+ memberID, generationID := c.membership()
+ req := &sarama.OffsetCommitRequest{
+ Version: 2,
+ ConsumerGroup: c.groupID,
+ ConsumerGroupGeneration: generationID,
+ ConsumerID: memberID,
+ RetentionTime: -1,
+ }
+
+ if ns := c.client.config.Consumer.Offsets.Retention; ns != 0 {
+ req.RetentionTime = int64(ns / time.Millisecond)
+ }
+
+ snap := c.subs.Snapshot()
+ dirty := false
+ for tp, state := range snap {
+ if state.Dirty {
+ dirty = true
+ req.AddBlock(tp.Topic, tp.Partition, state.Info.Offset, 0, state.Info.Metadata)
+ }
+ }
+ if !dirty {
+ return nil
+ }
+
+ broker, err := c.client.Coordinator(c.groupID)
+ if err != nil {
+ c.closeCoordinator(broker, err)
+ return err
+ }
+
+ resp, err := broker.CommitOffset(req)
+ if err != nil {
+ c.closeCoordinator(broker, err)
+ return err
+ }
+
+ for topic, errs := range resp.Errors {
+ for partition, kerr := range errs {
+ if kerr != sarama.ErrNoError {
+ err = kerr
+ } else if state, ok := snap[topicPartition{topic, partition}]; ok {
+ if sub := c.subs.Fetch(topic, partition); sub != nil {
+ sub.markCommitted(state.Info.Offset)
+ }
+ }
+ }
+ }
+ return err
+}
+
+// Close safely closes the consumer and releases all resources
+func (c *Consumer) Close() (err error) {
+ c.closeOnce.Do(func() {
+ close(c.dying)
+ <-c.dead
+
+ if e := c.release(); e != nil {
+ err = e
+ }
+ if e := c.consumer.Close(); e != nil {
+ err = e
+ }
+ close(c.messages)
+ close(c.errors)
+
+ if e := c.leaveGroup(); e != nil {
+ err = e
+ }
+ close(c.partitions)
+ close(c.notifications)
+
+ // drain
+ for range c.messages {
+ }
+ for range c.errors {
+ }
+ for p := range c.partitions {
+ _ = p.Close()
+ }
+ for range c.notifications {
+ }
+
+ c.client.release()
+ if c.ownClient {
+ if e := c.client.Close(); e != nil {
+ err = e
+ }
+ }
+ })
+ return
+}
+
+func (c *Consumer) mainLoop() {
+ defer close(c.dead)
+ defer atomic.StoreInt32(&c.consuming, 0)
+
+ for {
+ atomic.StoreInt32(&c.consuming, 0)
+
+ // Check if close was requested
+ select {
+ case <-c.dying:
+ return
+ default:
+ }
+
+ // Start next consume cycle
+ c.nextTick()
+ }
+}
+
+func (c *Consumer) nextTick() {
+ // Remember previous subscriptions
+ var notification *Notification
+ if c.client.config.Group.Return.Notifications {
+ notification = newNotification(c.subs.Info())
+ }
+
+ // Refresh coordinator
+ if err := c.refreshCoordinator(); err != nil {
+ c.rebalanceError(err, nil)
+ return
+ }
+
+ // Release subscriptions
+ if err := c.release(); err != nil {
+ c.rebalanceError(err, nil)
+ return
+ }
+
+ // Issue rebalance start notification
+ if c.client.config.Group.Return.Notifications {
+ c.handleNotification(notification)
+ }
+
+ // Rebalance, fetch new subscriptions
+ subs, err := c.rebalance()
+ if err != nil {
+ c.rebalanceError(err, notification)
+ return
+ }
+
+ // Coordinate loops, make sure everything is
+ // stopped on exit
+ tomb := newLoopTomb()
+ defer tomb.Close()
+
+ // Start the heartbeat
+ tomb.Go(c.hbLoop)
+
+ // Subscribe to topic/partitions
+ if err := c.subscribe(tomb, subs); err != nil {
+ c.rebalanceError(err, notification)
+ return
+ }
+
+ // Update/issue notification with new claims
+ if c.client.config.Group.Return.Notifications {
+ notification = notification.success(subs)
+ c.handleNotification(notification)
+ }
+
+ // Start topic watcher loop
+ tomb.Go(c.twLoop)
+
+ // Start consuming and committing offsets
+ tomb.Go(c.cmLoop)
+ atomic.StoreInt32(&c.consuming, 1)
+
+ // Wait for signals
+ select {
+ case <-tomb.Dying():
+ case <-c.dying:
+ }
+}
+
+// heartbeat loop, triggered by the mainLoop
+func (c *Consumer) hbLoop(stopped <-chan none) {
+ ticker := time.NewTicker(c.client.config.Group.Heartbeat.Interval)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ticker.C:
+ switch err := c.heartbeat(); err {
+ case nil, sarama.ErrNoError:
+ case sarama.ErrNotCoordinatorForConsumer, sarama.ErrRebalanceInProgress:
+ return
+ default:
+ c.handleError(&Error{Ctx: "heartbeat", error: err})
+ return
+ }
+ case <-stopped:
+ return
+ case <-c.dying:
+ return
+ }
+ }
+}
+
+// topic watcher loop, triggered by the mainLoop
+func (c *Consumer) twLoop(stopped <-chan none) {
+ ticker := time.NewTicker(c.client.config.Metadata.RefreshFrequency / 2)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ticker.C:
+ topics, err := c.client.Topics()
+ if err != nil {
+ c.handleError(&Error{Ctx: "topics", error: err})
+ return
+ }
+
+ for _, topic := range topics {
+ if !c.isKnownCoreTopic(topic) &&
+ !c.isKnownExtraTopic(topic) &&
+ c.isPotentialExtraTopic(topic) {
+ return
+ }
+ }
+ case <-stopped:
+ return
+ case <-c.dying:
+ return
+ }
+ }
+}
+
+// commit loop, triggered by the mainLoop
+func (c *Consumer) cmLoop(stopped <-chan none) {
+ ticker := time.NewTicker(c.client.config.Consumer.Offsets.CommitInterval)
+ defer ticker.Stop()
+
+ for {
+ select {
+ case <-ticker.C:
+ if err := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); err != nil {
+ c.handleError(&Error{Ctx: "commit", error: err})
+ return
+ }
+ case <-stopped:
+ return
+ case <-c.dying:
+ return
+ }
+ }
+}
+
+func (c *Consumer) rebalanceError(err error, n *Notification) {
+ if n != nil {
+ n.Type = RebalanceError
+ c.handleNotification(n)
+ }
+
+ switch err {
+ case sarama.ErrRebalanceInProgress:
+ default:
+ c.handleError(&Error{Ctx: "rebalance", error: err})
+ }
+
+ select {
+ case <-c.dying:
+ case <-time.After(c.client.config.Metadata.Retry.Backoff):
+ }
+}
+
+func (c *Consumer) handleNotification(n *Notification) {
+ if c.client.config.Group.Return.Notifications {
+ select {
+ case c.notifications <- n:
+ case <-c.dying:
+ return
+ }
+ }
+}
+
+func (c *Consumer) handleError(e *Error) {
+ if c.client.config.Consumer.Return.Errors {
+ select {
+ case c.errors <- e:
+ case <-c.dying:
+ return
+ }
+ } else {
+ sarama.Logger.Printf("%s error: %s\n", e.Ctx, e.Error())
+ }
+}
+
+// Releases the consumer and commits offsets, called from rebalance() and Close()
+func (c *Consumer) release() (err error) {
+ // Stop all consumers
+ c.subs.Stop()
+
+ // Clear subscriptions on exit
+ defer c.subs.Clear()
+
+ // Wait for messages to be processed
+ timeout := time.NewTimer(c.client.config.Group.Offsets.Synchronization.DwellTime)
+ defer timeout.Stop()
+
+ select {
+ case <-c.dying:
+ case <-timeout.C:
+ }
+
+ // Commit offsets, continue on errors
+ if e := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max); e != nil {
+ err = e
+ }
+
+ return
+}
+
+// --------------------------------------------------------------------
+
+// Performs a heartbeat, part of the mainLoop()
+func (c *Consumer) heartbeat() error {
+ broker, err := c.client.Coordinator(c.groupID)
+ if err != nil {
+ c.closeCoordinator(broker, err)
+ return err
+ }
+
+ memberID, generationID := c.membership()
+ resp, err := broker.Heartbeat(&sarama.HeartbeatRequest{
+ GroupId: c.groupID,
+ MemberId: memberID,
+ GenerationId: generationID,
+ })
+ if err != nil {
+ c.closeCoordinator(broker, err)
+ return err
+ }
+ return resp.Err
+}
+
+// Performs a rebalance, part of the mainLoop()
+func (c *Consumer) rebalance() (map[string][]int32, error) {
+ memberID, _ := c.membership()
+ sarama.Logger.Printf("cluster/consumer %s rebalance\n", memberID)
+
+ allTopics, err := c.client.Topics()
+ if err != nil {
+ return nil, err
+ }
+ c.extraTopics = c.selectExtraTopics(allTopics)
+ sort.Strings(c.extraTopics)
+
+ // Re-join consumer group
+ strategy, err := c.joinGroup()
+ switch {
+ case err == sarama.ErrUnknownMemberId:
+ c.membershipMu.Lock()
+ c.memberID = ""
+ c.membershipMu.Unlock()
+ return nil, err
+ case err != nil:
+ return nil, err
+ }
+
+ // Sync consumer group state, fetch subscriptions
+ subs, err := c.syncGroup(strategy)
+ switch {
+ case err == sarama.ErrRebalanceInProgress:
+ return nil, err
+ case err != nil:
+ _ = c.leaveGroup()
+ return nil, err
+ }
+ return subs, nil
+}
+
+// Performs the subscription, part of the mainLoop()
+func (c *Consumer) subscribe(tomb *loopTomb, subs map[string][]int32) error {
+ // fetch offsets
+ offsets, err := c.fetchOffsets(subs)
+ if err != nil {
+ _ = c.leaveGroup()
+ return err
+ }
+
+ // create consumers in parallel
+ var mu sync.Mutex
+ var wg sync.WaitGroup
+
+ for topic, partitions := range subs {
+ for _, partition := range partitions {
+ wg.Add(1)
+
+ info := offsets[topic][partition]
+ go func(topic string, partition int32) {
+ if e := c.createConsumer(tomb, topic, partition, info); e != nil {
+ mu.Lock()
+ err = e
+ mu.Unlock()
+ }
+ wg.Done()
+ }(topic, partition)
+ }
+ }
+ wg.Wait()
+
+ if err != nil {
+ _ = c.release()
+ _ = c.leaveGroup()
+ }
+ return err
+}
+
+// --------------------------------------------------------------------
+
+// Send a request to the broker to join group on rebalance()
+func (c *Consumer) joinGroup() (*balancer, error) {
+ memberID, _ := c.membership()
+ req := &sarama.JoinGroupRequest{
+ GroupId: c.groupID,
+ MemberId: memberID,
+ SessionTimeout: int32(c.client.config.Group.Session.Timeout / time.Millisecond),
+ ProtocolType: "consumer",
+ }
+
+ meta := &sarama.ConsumerGroupMemberMetadata{
+ Version: 1,
+ Topics: append(c.coreTopics, c.extraTopics...),
+ UserData: c.client.config.Group.Member.UserData,
+ }
+ err := req.AddGroupProtocolMetadata(string(StrategyRange), meta)
+ if err != nil {
+ return nil, err
+ }
+ err = req.AddGroupProtocolMetadata(string(StrategyRoundRobin), meta)
+ if err != nil {
+ return nil, err
+ }
+
+ broker, err := c.client.Coordinator(c.groupID)
+ if err != nil {
+ c.closeCoordinator(broker, err)
+ return nil, err
+ }
+
+ resp, err := broker.JoinGroup(req)
+ if err != nil {
+ c.closeCoordinator(broker, err)
+ return nil, err
+ } else if resp.Err != sarama.ErrNoError {
+ c.closeCoordinator(broker, resp.Err)
+ return nil, resp.Err
+ }
+
+ var strategy *balancer
+ if resp.LeaderId == resp.MemberId {
+ members, err := resp.GetMembers()
+ if err != nil {
+ return nil, err
+ }
+
+ strategy, err = newBalancerFromMeta(c.client, members)
+ if err != nil {
+ return nil, err
+ }
+ }
+
+ c.membershipMu.Lock()
+ c.memberID = resp.MemberId
+ c.generationID = resp.GenerationId
+ c.membershipMu.Unlock()
+
+ return strategy, nil
+}
+
+// Send a request to the broker to sync the group on rebalance().
+// Returns a list of topics and partitions to consume.
+func (c *Consumer) syncGroup(strategy *balancer) (map[string][]int32, error) {
+ memberID, generationID := c.membership()
+ req := &sarama.SyncGroupRequest{
+ GroupId: c.groupID,
+ MemberId: memberID,
+ GenerationId: generationID,
+ }
+
+ if strategy != nil {
+ for memberID, topics := range strategy.Perform(c.client.config.Group.PartitionStrategy) {
+ if err := req.AddGroupAssignmentMember(memberID, &sarama.ConsumerGroupMemberAssignment{
+ Topics: topics,
+ }); err != nil {
+ return nil, err
+ }
+ }
+ }
+
+ broker, err := c.client.Coordinator(c.groupID)
+ if err != nil {
+ c.closeCoordinator(broker, err)
+ return nil, err
+ }
+
+ resp, err := broker.SyncGroup(req)
+ if err != nil {
+ c.closeCoordinator(broker, err)
+ return nil, err
+ } else if resp.Err != sarama.ErrNoError {
+ c.closeCoordinator(broker, resp.Err)
+ return nil, resp.Err
+ }
+
+ // Return if there is nothing to subscribe to
+ if len(resp.MemberAssignment) == 0 {
+ return nil, nil
+ }
+
+ // Get assigned subscriptions
+ members, err := resp.GetMemberAssignment()
+ if err != nil {
+ return nil, err
+ }
+
+ // Sort partitions, for each topic
+ for topic := range members.Topics {
+ sort.Sort(int32Slice(members.Topics[topic]))
+ }
+ return members.Topics, nil
+}
+
+// Fetches latest committed offsets for all subscriptions
+func (c *Consumer) fetchOffsets(subs map[string][]int32) (map[string]map[int32]offsetInfo, error) {
+ offsets := make(map[string]map[int32]offsetInfo, len(subs))
+ req := &sarama.OffsetFetchRequest{
+ Version: 1,
+ ConsumerGroup: c.groupID,
+ }
+
+ for topic, partitions := range subs {
+ offsets[topic] = make(map[int32]offsetInfo, len(partitions))
+ for _, partition := range partitions {
+ offsets[topic][partition] = offsetInfo{Offset: -1}
+ req.AddPartition(topic, partition)
+ }
+ }
+
+ broker, err := c.client.Coordinator(c.groupID)
+ if err != nil {
+ c.closeCoordinator(broker, err)
+ return nil, err
+ }
+
+ resp, err := broker.FetchOffset(req)
+ if err != nil {
+ c.closeCoordinator(broker, err)
+ return nil, err
+ }
+
+ for topic, partitions := range subs {
+ for _, partition := range partitions {
+ block := resp.GetBlock(topic, partition)
+ if block == nil {
+ return nil, sarama.ErrIncompleteResponse
+ }
+
+ if block.Err == sarama.ErrNoError {
+ offsets[topic][partition] = offsetInfo{Offset: block.Offset, Metadata: block.Metadata}
+ } else {
+ return nil, block.Err
+ }
+ }
+ }
+ return offsets, nil
+}
+
+// Send a request to the broker to leave the group on failes rebalance() and on Close()
+func (c *Consumer) leaveGroup() error {
+ broker, err := c.client.Coordinator(c.groupID)
+ if err != nil {
+ c.closeCoordinator(broker, err)
+ return err
+ }
+
+ memberID, _ := c.membership()
+ if _, err = broker.LeaveGroup(&sarama.LeaveGroupRequest{
+ GroupId: c.groupID,
+ MemberId: memberID,
+ }); err != nil {
+ c.closeCoordinator(broker, err)
+ }
+ return err
+}
+
+// --------------------------------------------------------------------
+
+func (c *Consumer) createConsumer(tomb *loopTomb, topic string, partition int32, info offsetInfo) error {
+ memberID, _ := c.membership()
+ sarama.Logger.Printf("cluster/consumer %s consume %s/%d from %d\n", memberID, topic, partition, info.NextOffset(c.client.config.Consumer.Offsets.Initial))
+
+ // Create partitionConsumer
+ pc, err := newPartitionConsumer(c.consumer, topic, partition, info, c.client.config.Consumer.Offsets.Initial)
+ if err != nil {
+ return err
+ }
+
+ // Store in subscriptions
+ c.subs.Store(topic, partition, pc)
+
+ // Start partition consumer goroutine
+ tomb.Go(func(stopper <-chan none) {
+ if c.client.config.Group.Mode == ConsumerModePartitions {
+ pc.waitFor(stopper, c.errors)
+ } else {
+ pc.multiplex(stopper, c.messages, c.errors)
+ }
+ })
+
+ if c.client.config.Group.Mode == ConsumerModePartitions {
+ c.partitions <- pc
+ }
+ return nil
+}
+
+func (c *Consumer) commitOffsetsWithRetry(retries int) error {
+ err := c.CommitOffsets()
+ if err != nil && retries > 0 {
+ return c.commitOffsetsWithRetry(retries - 1)
+ }
+ return err
+}
+
+func (c *Consumer) closeCoordinator(broker *sarama.Broker, err error) {
+ if broker != nil {
+ _ = broker.Close()
+ }
+
+ switch err {
+ case sarama.ErrConsumerCoordinatorNotAvailable, sarama.ErrNotCoordinatorForConsumer:
+ _ = c.client.RefreshCoordinator(c.groupID)
+ }
+}
+
+func (c *Consumer) selectExtraTopics(allTopics []string) []string {
+ extra := allTopics[:0]
+ for _, topic := range allTopics {
+ if !c.isKnownCoreTopic(topic) && c.isPotentialExtraTopic(topic) {
+ extra = append(extra, topic)
+ }
+ }
+ return extra
+}
+
+func (c *Consumer) isKnownCoreTopic(topic string) bool {
+ pos := sort.SearchStrings(c.coreTopics, topic)
+ return pos < len(c.coreTopics) && c.coreTopics[pos] == topic
+}
+
+func (c *Consumer) isKnownExtraTopic(topic string) bool {
+ pos := sort.SearchStrings(c.extraTopics, topic)
+ return pos < len(c.extraTopics) && c.extraTopics[pos] == topic
+}
+
+func (c *Consumer) isPotentialExtraTopic(topic string) bool {
+ rx := c.client.config.Group.Topics
+ if rx.Blacklist != nil && rx.Blacklist.MatchString(topic) {
+ return false
+ }
+ if rx.Whitelist != nil && rx.Whitelist.MatchString(topic) {
+ return true
+ }
+ return false
+}
+
+func (c *Consumer) refreshCoordinator() error {
+ if err := c.refreshMetadata(); err != nil {
+ return err
+ }
+ return c.client.RefreshCoordinator(c.groupID)
+}
+
+func (c *Consumer) refreshMetadata() (err error) {
+ if c.client.config.Metadata.Full {
+ err = c.client.RefreshMetadata()
+ } else {
+ var topics []string
+ if topics, err = c.client.Topics(); err == nil && len(topics) != 0 {
+ err = c.client.RefreshMetadata(topics...)
+ }
+ }
+
+ // maybe we didn't have authorization to describe all topics
+ switch err {
+ case sarama.ErrTopicAuthorizationFailed:
+ err = c.client.RefreshMetadata(c.coreTopics...)
+ }
+ return
+}
+
+func (c *Consumer) membership() (memberID string, generationID int32) {
+ c.membershipMu.RLock()
+ memberID, generationID = c.memberID, c.generationID
+ c.membershipMu.RUnlock()
+ return
+}