[VOL-1349] EPON ONU adapter (package B)

Change-Id: I609ba349c429bc7e87c74b66bb1121841f9caef6
diff --git a/vendor/github.com/bsm/sarama-cluster/consumer.go b/vendor/github.com/bsm/sarama-cluster/consumer.go
new file mode 100644
index 0000000..e7a67da
--- /dev/null
+++ b/vendor/github.com/bsm/sarama-cluster/consumer.go
@@ -0,0 +1,919 @@
+package cluster
+
+import (
+	"sort"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/Shopify/sarama"
+)
+
+// Consumer is a cluster group consumer
+type Consumer struct {
+	// client is the cluster client used for coordinator and metadata lookups.
+	// ownClient is set by NewConsumer and makes Close() close the client too.
+	client    *Client
+	ownClient bool
+
+	// consumer is the underlying sarama consumer; subs holds the partition
+	// consumers stored by createConsumer() for the current subscriptions.
+	consumer sarama.Consumer
+	subs     *partitionMap
+
+	consumerID string
+	groupID    string
+
+	// memberID and generationID are written by joinGroup()/rebalance() and
+	// read through membership(); both are guarded by membershipMu.
+	memberID     string
+	generationID int32
+	membershipMu sync.RWMutex
+
+	// coreTopics are the (sorted) topics handed to the constructor,
+	// extraTopics are re-selected and sorted on every rebalance().
+	coreTopics  []string
+	extraTopics []string
+
+	// dying is closed by Close() to request shutdown; dead is closed by
+	// mainLoop() once it has returned. closeOnce guards the Close() sequence.
+	dying, dead chan none
+	closeOnce   sync.Once
+
+	// consuming is accessed atomically: nextTick() stores 1 once all loops
+	// of a cycle are started, mainLoop() stores 0 otherwise.
+	consuming     int32
+	messages      chan *sarama.ConsumerMessage
+	errors        chan error
+	partitions    chan PartitionConsumer
+	notifications chan *Notification
+
+	// commitMu serializes calls to CommitOffsets().
+	commitMu sync.Mutex
+}
+
+// NewConsumer initializes a new consumer together with a dedicated client
+// created from addrs and config. The returned consumer owns that client and
+// closes it as part of Close().
+//
+// If the consumer cannot be created, the client created here is closed before
+// the error is returned, so that it is not leaked.
+func NewConsumer(addrs []string, groupID string, topics []string, config *Config) (*Consumer, error) {
+	client, err := NewClient(addrs, config)
+	if err != nil {
+		return nil, err
+	}
+
+	consumer, err := NewConsumerFromClient(client, groupID, topics)
+	if err != nil {
+		// The client is unreachable for the caller at this point; close it
+		// (best effort) and report the original error.
+		_ = client.Close()
+		return nil, err
+	}
+	consumer.ownClient = true
+	return consumer, nil
+}
+
+// NewConsumerFromClient initializes a new consumer from an existing client.
+//
+// Please note that clients cannot be shared between consumers (due to Kafka internals),
+// they can only be re-used which requires the user to call Close() on the first consumer
+// before using this method again to initialize another one. Attempts to use a client with
+// more than one consumer at a time will return errors.
+//
+// The topics slice is copied; the caller's slice is neither reordered nor
+// retained. On success the consumer's main loop is started in its own goroutine.
+// On failure the claim on the client is released again.
+func NewConsumerFromClient(client *Client, groupID string, topics []string) (*Consumer, error) {
+	if !client.claim() {
+		return nil, errClientInUse
+	}
+
+	consumer, err := sarama.NewConsumerFromClient(client.Client)
+	if err != nil {
+		client.release()
+		return nil, err
+	}
+
+	// Work on a private, sorted copy: sorting in place would reorder the
+	// caller's slice, and later modifications by the caller would break the
+	// binary search performed by isKnownCoreTopic().
+	coreTopics := make([]string, len(topics))
+	copy(coreTopics, topics)
+	sort.Strings(coreTopics)
+
+	c := &Consumer{
+		client:   client,
+		consumer: consumer,
+		subs:     newPartitionMap(),
+		groupID:  groupID,
+
+		coreTopics: coreTopics,
+
+		dying: make(chan none),
+		dead:  make(chan none),
+
+		messages:      make(chan *sarama.ConsumerMessage),
+		errors:        make(chan error, client.config.ChannelBufferSize),
+		partitions:    make(chan PartitionConsumer, 1),
+		notifications: make(chan *Notification),
+	}
+	if err := c.client.RefreshCoordinator(groupID); err != nil {
+		client.release()
+		return nil, err
+	}
+
+	go c.mainLoop()
+	return c, nil
+}
+
+// Messages returns the read channel for the messages that are returned by
+// the broker.
+//
+// Messages are only delivered on this channel if the Config.Group.Mode option
+// is set to ConsumerModeMultiplex (default).
+func (c *Consumer) Messages() <-chan *sarama.ConsumerMessage {
+	return c.messages
+}
+
+// Partitions returns the read channel for the individual partition consumers
+// of this consumer.
+//
+// Partition consumers are only delivered on this channel if the
+// Config.Group.Mode option is set to ConsumerModePartitions.
+//
+// The Partitions() channel must be listened to for the life of this consumer;
+// when a rebalance happens old partitions will be closed (naturally come to
+// completion) and new ones will be emitted. The returned channel will only close
+// when the consumer is completely shut down.
+func (c *Consumer) Partitions() <-chan PartitionConsumer {
+	return c.partitions
+}
+
+// Errors returns a read channel of errors that occur during offset management, if
+// enabled. By default, errors are logged and not returned over this channel. To
+// implement custom error handling, set your config's Consumer.Return.Errors
+// setting to true, and read from this channel.
+func (c *Consumer) Errors() <-chan error {
+	return c.errors
+}
+
+// Notifications returns a channel of Notifications that occur during consumer
+// rebalancing. Notifications are only emitted over this channel if your config's
+// Group.Return.Notifications setting is set to true.
+func (c *Consumer) Notifications() <-chan *Notification {
+	return c.notifications
+}
+
+// HighWaterMarks returns the current high water marks for each topic and partition,
+// as reported by the underlying sarama consumer.
+// Consistency between partitions is not guaranteed since high water marks are updated separately.
+func (c *Consumer) HighWaterMarks() map[string]map[int32]int64 {
+	marks := c.consumer.HighWaterMarks()
+	return marks
+}
+
+// MarkOffset marks the provided message as processed, alongside a metadata string
+// that represents the state of the partition consumer at that point in time. The
+// metadata string can be used by another consumer to restore that state, so it
+// can resume consumption.
+//
+// Nothing happens if this consumer holds no subscription for the message's
+// topic/partition.
+//
+// Note: calling MarkOffset does not necessarily commit the offset to the backend
+// store immediately for efficiency reasons, and it may never be committed if
+// your application crashes. This means that you may end up processing the same
+// message twice, and your processing should ideally be idempotent.
+func (c *Consumer) MarkOffset(msg *sarama.ConsumerMessage, metadata string) {
+	sub := c.subs.Fetch(msg.Topic, msg.Partition)
+	if sub == nil {
+		return
+	}
+	sub.MarkOffset(msg.Offset, metadata)
+}
+
+// MarkPartitionOffset marks an offset of the provided topic/partition as processed.
+// It is a no-op if the topic/partition is not currently subscribed.
+// See MarkOffset for additional explanation.
+func (c *Consumer) MarkPartitionOffset(topic string, partition int32, offset int64, metadata string) {
+	sub := c.subs.Fetch(topic, partition)
+	if sub == nil {
+		return
+	}
+	sub.MarkOffset(offset, metadata)
+}
+
+// MarkOffsets marks stashed offsets as processed and empties the stash.
+// Entries for topic/partitions without a current subscription are dropped
+// without being marked.
+// See MarkOffset for additional explanation.
+func (c *Consumer) MarkOffsets(s *OffsetStash) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	for key := range s.offsets {
+		entry := s.offsets[key]
+		sub := c.subs.Fetch(key.Topic, key.Partition)
+		if sub != nil {
+			sub.MarkOffset(entry.Offset, entry.Metadata)
+		}
+		delete(s.offsets, key)
+	}
+}
+
+// ResetOffset marks the provided message as processed, alongside a metadata string
+// that represents the state of the partition consumer at that point in time. The
+// metadata string can be used by another consumer to restore that state, so it
+// can resume consumption.
+//
+// The difference between ResetOffset and MarkOffset is that ResetOffset allows
+// rewinding to an earlier offset.
+func (c *Consumer) ResetOffset(msg *sarama.ConsumerMessage, metadata string) {
+	sub := c.subs.Fetch(msg.Topic, msg.Partition)
+	if sub == nil {
+		return
+	}
+	sub.ResetOffset(msg.Offset, metadata)
+}
+
+// ResetPartitionOffset resets the offset of the provided topic/partition.
+// It is a no-op if the topic/partition is not currently subscribed.
+// See ResetOffset for additional explanation.
+func (c *Consumer) ResetPartitionOffset(topic string, partition int32, offset int64, metadata string) {
+	if sub := c.subs.Fetch(topic, partition); sub != nil {
+		sub.ResetOffset(offset, metadata)
+	}
+}
+
+// ResetOffsets applies the stashed offsets via ResetOffset and empties the
+// stash. Entries for topic/partitions without a current subscription are
+// dropped without being applied.
+// See ResetOffset for additional explanation.
+func (c *Consumer) ResetOffsets(s *OffsetStash) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	for key := range s.offsets {
+		entry := s.offsets[key]
+		sub := c.subs.Fetch(key.Topic, key.Partition)
+		if sub != nil {
+			sub.ResetOffset(entry.Offset, entry.Metadata)
+		}
+		delete(s.offsets, key)
+	}
+}
+
+// Subscriptions returns the currently consumed topics, each with its partitions
+func (c *Consumer) Subscriptions() map[string][]int32 {
+	info := c.subs.Info()
+	return info
+}
+
+// CommitOffsets allows to manually commit previously marked offsets. By default there is no
+// need to call this function as the consumer will commit offsets automatically
+// using the Config.Consumer.Offsets.CommitInterval setting.
+//
+// Only partitions whose state is dirty are sent; if there is none, no request
+// is issued and nil is returned. If the broker reports errors for several
+// partitions, one of them is returned.
+//
+// Please be aware that calling this function during an internal rebalance cycle may return
+// broker errors (e.g. sarama.ErrUnknownMemberId or sarama.ErrIllegalGeneration).
+func (c *Consumer) CommitOffsets() error {
+	c.commitMu.Lock()
+	defer c.commitMu.Unlock()
+
+	memberID, generationID := c.membership()
+
+	// -1 is sent unless a retention period is configured
+	retention := int64(-1)
+	if ns := c.client.config.Consumer.Offsets.Retention; ns != 0 {
+		retention = int64(ns / time.Millisecond)
+	}
+
+	req := &sarama.OffsetCommitRequest{
+		Version:                 2,
+		ConsumerGroup:           c.groupID,
+		ConsumerGroupGeneration: generationID,
+		ConsumerID:              memberID,
+		RetentionTime:           retention,
+	}
+
+	// Add one block per dirty partition
+	snap := c.subs.Snapshot()
+	pending := 0
+	for tp, state := range snap {
+		if !state.Dirty {
+			continue
+		}
+		pending++
+		req.AddBlock(tp.Topic, tp.Partition, state.Info.Offset, 0, state.Info.Metadata)
+	}
+	if pending == 0 {
+		return nil
+	}
+
+	broker, err := c.client.Coordinator(c.groupID)
+	if err != nil {
+		c.closeCoordinator(broker, err)
+		return err
+	}
+
+	resp, err := broker.CommitOffset(req)
+	if err != nil {
+		c.closeCoordinator(broker, err)
+		return err
+	}
+
+	// Remember failures, mark successfully committed partitions
+	for topic, perPartition := range resp.Errors {
+		for partition, kerr := range perPartition {
+			if kerr != sarama.ErrNoError {
+				err = kerr
+				continue
+			}
+
+			state, ok := snap[topicPartition{topic, partition}]
+			if !ok {
+				continue
+			}
+			if sub := c.subs.Fetch(topic, partition); sub != nil {
+				sub.markCommitted(state.Info.Offset)
+			}
+		}
+	}
+	return err
+}
+
+// Close safely closes the consumer and releases all resources
+//
+// The shutdown sequence runs at most once (guarded by closeOnce); any later
+// call returns nil. If several steps fail, the error of the last failing step
+// is returned.
+func (c *Consumer) Close() (err error) {
+	c.closeOnce.Do(func() {
+		// Request shutdown and wait until mainLoop() has returned
+		close(c.dying)
+		<-c.dead
+
+		// Stop partition consumers and commit offsets
+		if e := c.release(); e != nil {
+			err = e
+		}
+		// Close the underlying sarama consumer
+		if e := c.consumer.Close(); e != nil {
+			err = e
+		}
+		close(c.messages)
+		close(c.errors)
+
+		// Leave the consumer group
+		if e := c.leaveGroup(); e != nil {
+			err = e
+		}
+		close(c.partitions)
+		close(c.notifications)
+
+		// drain
+		// (the channels are closed above, so each loop ends once the
+		// remaining buffered items, if any, have been received)
+		for range c.messages {
+		}
+		for range c.errors {
+		}
+		for p := range c.partitions {
+			_ = p.Close()
+		}
+		for range c.notifications {
+		}
+
+		// Give up the claim on the client; close it only if this consumer
+		// owns it (see NewConsumer)
+		c.client.release()
+		if c.ownClient {
+			if e := c.client.Close(); e != nil {
+				err = e
+			}
+		}
+	})
+	return
+}
+
+// mainLoop runs consume cycles back to back until Close() was requested.
+// On exit it resets the consuming flag and closes the dead channel.
+func (c *Consumer) mainLoop() {
+	defer close(c.dead)
+	defer atomic.StoreInt32(&c.consuming, 0)
+
+	for {
+		atomic.StoreInt32(&c.consuming, 0)
+
+		select {
+		case <-c.dying:
+			// Close was requested
+			return
+		default:
+			// Start next consume cycle
+			c.nextTick()
+		}
+	}
+}
+
+// nextTick performs one consume cycle: it refreshes the coordinator, releases
+// the previous subscriptions, rebalances, subscribes to the newly assigned
+// partitions and starts the heartbeat, topic watcher and commit loops. It
+// then blocks until either the loop tomb is dying or Close() was requested.
+// Any failure during setup is passed to rebalanceError() and ends the cycle.
+func (c *Consumer) nextTick() {
+	// Remember previous subscriptions
+	var notification *Notification
+	if c.client.config.Group.Return.Notifications {
+		notification = newNotification(c.subs.Info())
+	}
+
+	// Refresh coordinator
+	if err := c.refreshCoordinator(); err != nil {
+		c.rebalanceError(err, nil)
+		return
+	}
+
+	// Release subscriptions
+	if err := c.release(); err != nil {
+		c.rebalanceError(err, nil)
+		return
+	}
+
+	// Issue rebalance start notification
+	if c.client.config.Group.Return.Notifications {
+		c.handleNotification(notification)
+	}
+
+	// Rebalance, fetch new subscriptions
+	subs, err := c.rebalance()
+	if err != nil {
+		c.rebalanceError(err, notification)
+		return
+	}
+
+	// Coordinate loops, make sure everything is
+	// stopped on exit
+	tomb := newLoopTomb()
+	defer tomb.Close()
+
+	// Start the heartbeat
+	tomb.Go(c.hbLoop)
+
+	// Subscribe to topic/partitions
+	if err := c.subscribe(tomb, subs); err != nil {
+		c.rebalanceError(err, notification)
+		return
+	}
+
+	// Update/issue notification with new claims
+	if c.client.config.Group.Return.Notifications {
+		notification = notification.success(subs)
+		c.handleNotification(notification)
+	}
+
+	// Start topic watcher loop
+	tomb.Go(c.twLoop)
+
+	// Start consuming and committing offsets
+	tomb.Go(c.cmLoop)
+	// Flag the consumer as consuming; mainLoop() resets the flag before the
+	// next cycle
+	atomic.StoreInt32(&c.consuming, 1)
+
+	// Wait for signals
+	select {
+	case <-tomb.Dying():
+	case <-c.dying:
+	}
+}
+
+// heartbeat loop, triggered by the mainLoop
+//
+// Sends a heartbeat on every tick of Group.Heartbeat.Interval. The loop ends
+// on the first heartbeat that does not succeed; errors other than "not
+// coordinator" and "rebalance in progress" are reported via handleError().
+func (c *Consumer) hbLoop(stopped <-chan none) {
+	ticker := time.NewTicker(c.client.config.Group.Heartbeat.Interval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-stopped:
+			return
+		case <-c.dying:
+			return
+		case <-ticker.C:
+			err := c.heartbeat()
+			if err == nil || err == sarama.ErrNoError {
+				continue
+			}
+			if err != sarama.ErrNotCoordinatorForConsumer && err != sarama.ErrRebalanceInProgress {
+				c.handleError(&Error{Ctx: "heartbeat", error: err})
+			}
+			return
+		}
+	}
+}
+
+// topic watcher loop, triggered by the mainLoop
+//
+// Lists the topics at half the metadata refresh frequency and ends as soon as
+// a topic shows up that is not yet known but qualifies as an extra topic, or
+// when listing the topics fails (reported via handleError()).
+func (c *Consumer) twLoop(stopped <-chan none) {
+	interval := c.client.config.Metadata.RefreshFrequency / 2
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-stopped:
+			return
+		case <-c.dying:
+			return
+		case <-ticker.C:
+			topics, err := c.client.Topics()
+			if err != nil {
+				c.handleError(&Error{Ctx: "topics", error: err})
+				return
+			}
+
+			for _, topic := range topics {
+				if c.isKnownCoreTopic(topic) || c.isKnownExtraTopic(topic) {
+					continue
+				}
+				if c.isPotentialExtraTopic(topic) {
+					return
+				}
+			}
+		}
+	}
+}
+
+// commit loop, triggered by the mainLoop
+//
+// Commits offsets (with retries) on every tick of
+// Consumer.Offsets.CommitInterval; the first failing commit is reported via
+// handleError() and ends the loop.
+func (c *Consumer) cmLoop(stopped <-chan none) {
+	ticker := time.NewTicker(c.client.config.Consumer.Offsets.CommitInterval)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-stopped:
+			return
+		case <-c.dying:
+			return
+		case <-ticker.C:
+			err := c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max)
+			if err != nil {
+				c.handleError(&Error{Ctx: "commit", error: err})
+				return
+			}
+		}
+	}
+}
+
+// rebalanceError handles a failed consume cycle: it emits n (if given) as a
+// RebalanceError notification, reports err unless it merely signals that a
+// rebalance is in progress, and then backs off for Metadata.Retry.Backoff
+// or until Close() is requested.
+func (c *Consumer) rebalanceError(err error, n *Notification) {
+	if n != nil {
+		n.Type = RebalanceError
+		c.handleNotification(n)
+	}
+
+	if err != sarama.ErrRebalanceInProgress {
+		c.handleError(&Error{Ctx: "rebalance", error: err})
+	}
+
+	backoff := time.After(c.client.config.Metadata.Retry.Backoff)
+	select {
+	case <-c.dying:
+	case <-backoff:
+	}
+}
+
+// handleNotification delivers n on the notifications channel if
+// Group.Return.Notifications is enabled. The send is abandoned once Close()
+// has been requested.
+func (c *Consumer) handleNotification(n *Notification) {
+	if !c.client.config.Group.Return.Notifications {
+		return
+	}
+
+	select {
+	case c.notifications <- n:
+	case <-c.dying:
+	}
+}
+
+// handleError delivers e on the errors channel if Consumer.Return.Errors is
+// enabled (giving up once Close() has been requested); otherwise the error is
+// written to the sarama logger.
+func (c *Consumer) handleError(e *Error) {
+	if !c.client.config.Consumer.Return.Errors {
+		sarama.Logger.Printf("%s error: %s\n", e.Ctx, e.Error())
+		return
+	}
+
+	select {
+	case c.errors <- e:
+	case <-c.dying:
+	}
+}
+
+// Releases the consumer and commits offsets, called from nextTick(),
+// subscribe() and Close()
+//
+// Stops all partition consumers, waits for the configured dwell time (or
+// until Close() is requested), commits offsets with retries and finally
+// clears the subscriptions. The commit error, if any, is returned.
+func (c *Consumer) release() (err error) {
+	// Stop all consumers
+	c.subs.Stop()
+
+	// Clear subscriptions on exit
+	defer c.subs.Clear()
+
+	// Wait for messages to be processed
+	dwell := time.NewTimer(c.client.config.Group.Offsets.Synchronization.DwellTime)
+	defer dwell.Stop()
+
+	select {
+	case <-c.dying:
+	case <-dwell.C:
+	}
+
+	// Commit offsets, the result is reported to the caller
+	return c.commitOffsetsWithRetry(c.client.config.Group.Offsets.Retry.Max)
+}
+
+// --------------------------------------------------------------------
+
+// Performs a heartbeat, part of the mainLoop()
+//
+// Returns the transport error if the request could not be sent, otherwise
+// the error code contained in the broker's response.
+func (c *Consumer) heartbeat() error {
+	broker, err := c.client.Coordinator(c.groupID)
+	if err != nil {
+		c.closeCoordinator(broker, err)
+		return err
+	}
+
+	memberID, generationID := c.membership()
+	req := &sarama.HeartbeatRequest{
+		GroupId:      c.groupID,
+		MemberId:     memberID,
+		GenerationId: generationID,
+	}
+
+	resp, err := broker.Heartbeat(req)
+	if err != nil {
+		c.closeCoordinator(broker, err)
+		return err
+	}
+	return resp.Err
+}
+
+// Performs a rebalance, part of the mainLoop()
+//
+// Re-selects the extra topics, joins the consumer group and syncs the group
+// state. Returns the assigned partitions per topic.
+func (c *Consumer) rebalance() (map[string][]int32, error) {
+	memberID, _ := c.membership()
+	sarama.Logger.Printf("cluster/consumer %s rebalance\n", memberID)
+
+	allTopics, err := c.client.Topics()
+	if err != nil {
+		return nil, err
+	}
+	c.extraTopics = c.selectExtraTopics(allTopics)
+	sort.Strings(c.extraTopics)
+
+	// Re-join consumer group
+	strategy, err := c.joinGroup()
+	if err != nil {
+		if err == sarama.ErrUnknownMemberId {
+			// Forget the member ID, so that the next attempt joins without one
+			c.membershipMu.Lock()
+			c.memberID = ""
+			c.membershipMu.Unlock()
+		}
+		return nil, err
+	}
+
+	// Sync consumer group state, fetch subscriptions
+	subs, err := c.syncGroup(strategy)
+	if err != nil {
+		// Leave the group, unless another rebalance is already under way
+		if err != sarama.ErrRebalanceInProgress {
+			_ = c.leaveGroup()
+		}
+		return nil, err
+	}
+	return subs, nil
+}
+
+// Performs the subscription, part of the mainLoop()
+//
+// Fetches the committed offsets for subs and creates one partition consumer
+// per assigned partition, all in parallel. If any of them fails, the
+// subscriptions are released, the group is left and one of the errors is
+// returned.
+func (c *Consumer) subscribe(tomb *loopTomb, subs map[string][]int32) error {
+	// fetch offsets
+	offsets, err := c.fetchOffsets(subs)
+	if err != nil {
+		_ = c.leaveGroup()
+		return err
+	}
+
+	// create consumers in parallel
+	var (
+		errMu sync.Mutex
+		group sync.WaitGroup
+	)
+	for topic, partitions := range subs {
+		for _, partition := range partitions {
+			group.Add(1)
+
+			go func(topic string, partition int32, info offsetInfo) {
+				defer group.Done()
+
+				if e := c.createConsumer(tomb, topic, partition, info); e != nil {
+					errMu.Lock()
+					err = e
+					errMu.Unlock()
+				}
+			}(topic, partition, offsets[topic][partition])
+		}
+	}
+	group.Wait()
+
+	if err == nil {
+		return nil
+	}
+
+	_ = c.release()
+	_ = c.leaveGroup()
+	return err
+}
+
+// --------------------------------------------------------------------
+
+// Send a request to the broker to join group on rebalance()
+//
+// Announces the core and extra topics under both the range and the
+// round-robin protocol. On success the member and generation IDs assigned by
+// the broker are stored. A balancer is returned only if this member was
+// elected group leader; otherwise the returned balancer is nil.
+func (c *Consumer) joinGroup() (*balancer, error) {
+	memberID, _ := c.membership()
+	req := &sarama.JoinGroupRequest{
+		GroupId:        c.groupID,
+		MemberId:       memberID,
+		SessionTimeout: int32(c.client.config.Group.Session.Timeout / time.Millisecond),
+		ProtocolType:   "consumer",
+	}
+
+	// Build the topic list in a fresh slice: appending to c.coreTopics
+	// directly could write into spare capacity of its backing array.
+	topics := make([]string, 0, len(c.coreTopics)+len(c.extraTopics))
+	topics = append(topics, c.coreTopics...)
+	topics = append(topics, c.extraTopics...)
+
+	meta := &sarama.ConsumerGroupMemberMetadata{
+		Version:  1,
+		Topics:   topics,
+		UserData: c.client.config.Group.Member.UserData,
+	}
+	err := req.AddGroupProtocolMetadata(string(StrategyRange), meta)
+	if err != nil {
+		return nil, err
+	}
+	err = req.AddGroupProtocolMetadata(string(StrategyRoundRobin), meta)
+	if err != nil {
+		return nil, err
+	}
+
+	broker, err := c.client.Coordinator(c.groupID)
+	if err != nil {
+		c.closeCoordinator(broker, err)
+		return nil, err
+	}
+
+	resp, err := broker.JoinGroup(req)
+	if err != nil {
+		c.closeCoordinator(broker, err)
+		return nil, err
+	} else if resp.Err != sarama.ErrNoError {
+		c.closeCoordinator(broker, resp.Err)
+		return nil, resp.Err
+	}
+
+	// Only the group leader computes the partition assignment
+	var strategy *balancer
+	if resp.LeaderId == resp.MemberId {
+		members, err := resp.GetMembers()
+		if err != nil {
+			return nil, err
+		}
+
+		strategy, err = newBalancerFromMeta(c.client, members)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	c.membershipMu.Lock()
+	c.memberID = resp.MemberId
+	c.generationID = resp.GenerationId
+	c.membershipMu.Unlock()
+
+	return strategy, nil
+}
+
+// Send a request to the broker to sync the group on rebalance().
+// Returns a list of topics and partitions to consume.
+//
+// If a strategy is given, the assignments it computes for all members are
+// included in the request. The result is nil if the broker assigned nothing
+// to this member; otherwise the partitions of each topic are sorted.
+func (c *Consumer) syncGroup(strategy *balancer) (map[string][]int32, error) {
+	memberID, generationID := c.membership()
+	req := &sarama.SyncGroupRequest{
+		GroupId:      c.groupID,
+		MemberId:     memberID,
+		GenerationId: generationID,
+	}
+
+	if strategy != nil {
+		assignments := strategy.Perform(c.client.config.Group.PartitionStrategy)
+		for member, topics := range assignments {
+			assignment := &sarama.ConsumerGroupMemberAssignment{Topics: topics}
+			if err := req.AddGroupAssignmentMember(member, assignment); err != nil {
+				return nil, err
+			}
+		}
+	}
+
+	broker, err := c.client.Coordinator(c.groupID)
+	if err != nil {
+		c.closeCoordinator(broker, err)
+		return nil, err
+	}
+
+	resp, err := broker.SyncGroup(req)
+	if err != nil {
+		c.closeCoordinator(broker, err)
+		return nil, err
+	}
+	if resp.Err != sarama.ErrNoError {
+		c.closeCoordinator(broker, resp.Err)
+		return nil, resp.Err
+	}
+
+	// Return if there is nothing to subscribe to
+	if len(resp.MemberAssignment) == 0 {
+		return nil, nil
+	}
+
+	// Get assigned subscriptions
+	assigned, err := resp.GetMemberAssignment()
+	if err != nil {
+		return nil, err
+	}
+
+	// Sort partitions, for each topic
+	for _, partitions := range assigned.Topics {
+		sort.Sort(int32Slice(partitions))
+	}
+	return assigned.Topics, nil
+}
+
+// Fetches latest committed offsets for all subscriptions
+//
+// Returns sarama.ErrIncompleteResponse if the response lacks a block for one
+// of the requested partitions, or the first block error encountered.
+func (c *Consumer) fetchOffsets(subs map[string][]int32) (map[string]map[int32]offsetInfo, error) {
+	offsets := make(map[string]map[int32]offsetInfo, len(subs))
+	req := &sarama.OffsetFetchRequest{
+		Version:       1,
+		ConsumerGroup: c.groupID,
+	}
+
+	// Request every subscribed partition, defaulting its offset to -1
+	for topic, partitions := range subs {
+		perTopic := make(map[int32]offsetInfo, len(partitions))
+		offsets[topic] = perTopic
+		for _, partition := range partitions {
+			perTopic[partition] = offsetInfo{Offset: -1}
+			req.AddPartition(topic, partition)
+		}
+	}
+
+	broker, err := c.client.Coordinator(c.groupID)
+	if err != nil {
+		c.closeCoordinator(broker, err)
+		return nil, err
+	}
+
+	resp, err := broker.FetchOffset(req)
+	if err != nil {
+		c.closeCoordinator(broker, err)
+		return nil, err
+	}
+
+	// Copy the returned offsets and metadata
+	for topic, partitions := range subs {
+		for _, partition := range partitions {
+			block := resp.GetBlock(topic, partition)
+			if block == nil {
+				return nil, sarama.ErrIncompleteResponse
+			}
+			if block.Err != sarama.ErrNoError {
+				return nil, block.Err
+			}
+			offsets[topic][partition] = offsetInfo{Offset: block.Offset, Metadata: block.Metadata}
+		}
+	}
+	return offsets, nil
+}
+
+// Send a request to the broker to leave the group on failed rebalance() and on Close()
+//
+// Only request/transport errors are returned; the content of the broker's
+// response is not inspected.
+func (c *Consumer) leaveGroup() error {
+	broker, err := c.client.Coordinator(c.groupID)
+	if err != nil {
+		c.closeCoordinator(broker, err)
+		return err
+	}
+
+	memberID, _ := c.membership()
+	req := &sarama.LeaveGroupRequest{
+		GroupId:  c.groupID,
+		MemberId: memberID,
+	}
+
+	_, err = broker.LeaveGroup(req)
+	if err != nil {
+		c.closeCoordinator(broker, err)
+	}
+	return err
+}
+
+// --------------------------------------------------------------------
+
+// createConsumer creates the partition consumer for topic/partition, stores
+// it in the subscriptions and starts its goroutine on tomb. In
+// ConsumerModePartitions the partition consumer is additionally emitted on
+// the partitions channel; in any other mode its messages are multiplexed
+// onto the shared messages channel.
+func (c *Consumer) createConsumer(tomb *loopTomb, topic string, partition int32, info offsetInfo) error {
+	memberID, _ := c.membership()
+	sarama.Logger.Printf("cluster/consumer %s consume %s/%d from %d\n", memberID, topic, partition, info.NextOffset(c.client.config.Consumer.Offsets.Initial))
+
+	// Create partitionConsumer
+	pc, err := newPartitionConsumer(c.consumer, topic, partition, info, c.client.config.Consumer.Offsets.Initial)
+	if err != nil {
+		return err
+	}
+
+	// Store in subscriptions
+	c.subs.Store(topic, partition, pc)
+
+	// Start partition consumer goroutine
+	tomb.Go(func(stopper <-chan none) {
+		if c.client.config.Group.Mode != ConsumerModePartitions {
+			pc.multiplex(stopper, c.messages, c.errors)
+			return
+		}
+		pc.waitFor(stopper, c.errors)
+	})
+
+	// Hand the partition consumer over to the user, if requested
+	if c.client.config.Group.Mode != ConsumerModePartitions {
+		return nil
+	}
+	c.partitions <- pc
+	return nil
+}
+
+// commitOffsetsWithRetry commits offsets, repeating a failed commit up to
+// retries more times. Returns nil on the first success, otherwise the error
+// of the last attempt.
+func (c *Consumer) commitOffsetsWithRetry(retries int) error {
+	for {
+		err := c.CommitOffsets()
+		if err == nil || retries <= 0 {
+			return err
+		}
+		retries--
+	}
+}
+
+// closeCoordinator closes the connection to broker (if any) and refreshes
+// the group coordinator when err indicates that the coordinator is not
+// available or has moved.
+func (c *Consumer) closeCoordinator(broker *sarama.Broker, err error) {
+	if broker != nil {
+		_ = broker.Close()
+	}
+
+	if err == sarama.ErrConsumerCoordinatorNotAvailable || err == sarama.ErrNotCoordinatorForConsumer {
+		_ = c.client.RefreshCoordinator(c.groupID)
+	}
+}
+
+// selectExtraTopics filters allTopics down to those that are not core topics
+// but qualify as extra topics. The filtering is done in place: the result
+// shares (and overwrites) the backing array of allTopics.
+func (c *Consumer) selectExtraTopics(allTopics []string) []string {
+	kept := 0
+	for _, topic := range allTopics {
+		if c.isKnownCoreTopic(topic) || !c.isPotentialExtraTopic(topic) {
+			continue
+		}
+		allTopics[kept] = topic
+		kept++
+	}
+	return allTopics[:kept]
+}
+
+// isKnownCoreTopic reports whether topic is one of the core topics, using a
+// binary search over the sorted coreTopics slice.
+func (c *Consumer) isKnownCoreTopic(topic string) bool {
+	idx := sort.SearchStrings(c.coreTopics, topic)
+	if idx >= len(c.coreTopics) {
+		return false
+	}
+	return c.coreTopics[idx] == topic
+}
+
+// isKnownExtraTopic reports whether topic is one of the currently selected
+// extra topics, using a binary search over the sorted extraTopics slice.
+func (c *Consumer) isKnownExtraTopic(topic string) bool {
+	idx := sort.SearchStrings(c.extraTopics, topic)
+	if idx >= len(c.extraTopics) {
+		return false
+	}
+	return c.extraTopics[idx] == topic
+}
+
+// isPotentialExtraTopic reports whether topic qualifies as an extra topic:
+// it must match the configured whitelist and must not match the configured
+// blacklist. Without a whitelist no topic qualifies.
+func (c *Consumer) isPotentialExtraTopic(topic string) bool {
+	filters := c.client.config.Group.Topics
+	if filters.Blacklist != nil && filters.Blacklist.MatchString(topic) {
+		return false
+	}
+	return filters.Whitelist != nil && filters.Whitelist.MatchString(topic)
+}
+
+// refreshCoordinator refreshes the metadata first and, if that succeeds, the
+// coordinator of the consumer group.
+func (c *Consumer) refreshCoordinator() error {
+	err := c.refreshMetadata()
+	if err != nil {
+		return err
+	}
+	return c.client.RefreshCoordinator(c.groupID)
+}
+
+// refreshMetadata refreshes the metadata of all topics (Metadata.Full) or of
+// the topics currently listed by the client. If that fails with a topic
+// authorization error, only the core topics are refreshed instead.
+func (c *Consumer) refreshMetadata() (err error) {
+	if c.client.config.Metadata.Full {
+		err = c.client.RefreshMetadata()
+	} else {
+		var known []string
+		known, err = c.client.Topics()
+		if err == nil && len(known) != 0 {
+			err = c.client.RefreshMetadata(known...)
+		}
+	}
+
+	// maybe we didn't have authorization to describe all topics
+	if err == sarama.ErrTopicAuthorizationFailed {
+		err = c.client.RefreshMetadata(c.coreTopics...)
+	}
+	return err
+}
+
+// membership returns the current member and generation IDs, read together
+// under the membership read lock.
+func (c *Consumer) membership() (memberID string, generationID int32) {
+	c.membershipMu.RLock()
+	defer c.membershipMu.RUnlock()
+
+	return c.memberID, c.generationID
+}