WIP [VOL-2811] - Incorporate preliminary onu-adapter-go code into opencord repo

- reason "discovery-mibsync-complete" reached (via full MibUpload only, received data won't be stored yet)
- first review comments of patchset #4 considered
 (please have a look into our inline-comments in Gerrit to know more about the current state)
- no refactoring done yet

Change-Id: Iac47817f8ce4bd28dd8132f530b0570d57ae99b8
Signed-off-by: Holger Hildebrandt <holger.hildebrandt@adtran.com>
diff --git a/vendor/github.com/Shopify/sarama/consumer.go b/vendor/github.com/Shopify/sarama/consumer.go
new file mode 100644
index 0000000..72c4d7c
--- /dev/null
+++ b/vendor/github.com/Shopify/sarama/consumer.go
@@ -0,0 +1,896 @@
+package sarama
+
+import (
+	"errors"
+	"fmt"
+	"math"
+	"sync"
+	"sync/atomic"
+	"time"
+
+	"github.com/rcrowley/go-metrics"
+)
+
+// ConsumerMessage encapsulates a Kafka message returned by the consumer.
+type ConsumerMessage struct {
+	Headers        []*RecordHeader // only set if kafka is version 0.11+
+	Timestamp      time.Time       // only set if kafka is version 0.10+, inner message timestamp
+	BlockTimestamp time.Time       // only set if kafka is version 0.10+, outer (compressed) block timestamp
+
+	Key, Value []byte
+	Topic      string
+	Partition  int32
+	Offset     int64
+}
+
+// ConsumerError is what is provided to the user when an error occurs.
+// It wraps an error and includes the topic and partition.
+type ConsumerError struct {
+	Topic     string
+	Partition int32
+	Err       error
+}
+
+func (ce ConsumerError) Error() string {
+	return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err)
+}
+
+// ConsumerErrors is a type that wraps a batch of errors and implements the Error interface.
+// It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors
+// when stopping.
+type ConsumerErrors []*ConsumerError
+
+func (ce ConsumerErrors) Error() string {
+	return fmt.Sprintf("kafka: %d errors while consuming", len(ce))
+}
+
+// Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close()
+// on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
+// scope.
+type Consumer interface {
+	// Topics returns the set of available topics as retrieved from the cluster
+	// metadata. This method is the same as Client.Topics(), and is provided for
+	// convenience.
+	Topics() ([]string, error)
+
+	// Partitions returns the sorted list of all partition IDs for the given topic.
+	// This method is the same as Client.Partitions(), and is provided for convenience.
+	Partitions(topic string) ([]int32, error)
+
+	// ConsumePartition creates a PartitionConsumer on the given topic/partition with
+	// the given offset. It will return an error if this Consumer is already consuming
+	// on the given topic/partition. Offset can be a literal offset, or OffsetNewest
+	// or OffsetOldest
+	ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error)
+
+	// HighWaterMarks returns the current high water marks for each topic and partition.
+	// Consistency between partitions is not guaranteed since high water marks are updated separately.
+	HighWaterMarks() map[string]map[int32]int64
+
+	// Close shuts down the consumer. It must be called after all child
+	// PartitionConsumers have already been closed.
+	Close() error
+}
+
+type consumer struct {
+	conf            *Config
+	children        map[string]map[int32]*partitionConsumer
+	brokerConsumers map[*Broker]*brokerConsumer
+	client          Client
+	lock            sync.Mutex
+}
+
+// NewConsumer creates a new consumer using the given broker addresses and configuration.
+func NewConsumer(addrs []string, config *Config) (Consumer, error) {
+	client, err := NewClient(addrs, config)
+	if err != nil {
+		return nil, err
+	}
+	return newConsumer(client)
+}
+
+// NewConsumerFromClient creates a new consumer using the given client. It is still
+// necessary to call Close() on the underlying client when shutting down this consumer.
+func NewConsumerFromClient(client Client) (Consumer, error) {
+	// For clients passed in by the client, ensure we don't
+	// call Close() on it.
+	cli := &nopCloserClient{client}
+	return newConsumer(cli)
+}
+
+func newConsumer(client Client) (Consumer, error) {
+	// Check that we are not dealing with a closed Client before processing any other arguments
+	if client.Closed() {
+		return nil, ErrClosedClient
+	}
+
+	c := &consumer{
+		client:          client,
+		conf:            client.Config(),
+		children:        make(map[string]map[int32]*partitionConsumer),
+		brokerConsumers: make(map[*Broker]*brokerConsumer),
+	}
+
+	return c, nil
+}
+
+func (c *consumer) Close() error {
+	return c.client.Close()
+}
+
+func (c *consumer) Topics() ([]string, error) {
+	return c.client.Topics()
+}
+
+func (c *consumer) Partitions(topic string) ([]int32, error) {
+	return c.client.Partitions(topic)
+}
+
+func (c *consumer) ConsumePartition(topic string, partition int32, offset int64) (PartitionConsumer, error) {
+	child := &partitionConsumer{
+		consumer:  c,
+		conf:      c.conf,
+		topic:     topic,
+		partition: partition,
+		messages:  make(chan *ConsumerMessage, c.conf.ChannelBufferSize),
+		errors:    make(chan *ConsumerError, c.conf.ChannelBufferSize),
+		feeder:    make(chan *FetchResponse, 1),
+		trigger:   make(chan none, 1),
+		dying:     make(chan none),
+		fetchSize: c.conf.Consumer.Fetch.Default,
+	}
+
+	if err := child.chooseStartingOffset(offset); err != nil {
+		return nil, err
+	}
+
+	var leader *Broker
+	var err error
+	if leader, err = c.client.Leader(child.topic, child.partition); err != nil {
+		return nil, err
+	}
+
+	if err := c.addChild(child); err != nil {
+		return nil, err
+	}
+
+	go withRecover(child.dispatcher)
+	go withRecover(child.responseFeeder)
+
+	child.broker = c.refBrokerConsumer(leader)
+	child.broker.input <- child
+
+	return child, nil
+}
+
+func (c *consumer) HighWaterMarks() map[string]map[int32]int64 {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	hwms := make(map[string]map[int32]int64)
+	for topic, p := range c.children {
+		hwm := make(map[int32]int64, len(p))
+		for partition, pc := range p {
+			hwm[partition] = pc.HighWaterMarkOffset()
+		}
+		hwms[topic] = hwm
+	}
+
+	return hwms
+}
+
+func (c *consumer) addChild(child *partitionConsumer) error {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	topicChildren := c.children[child.topic]
+	if topicChildren == nil {
+		topicChildren = make(map[int32]*partitionConsumer)
+		c.children[child.topic] = topicChildren
+	}
+
+	if topicChildren[child.partition] != nil {
+		return ConfigurationError("That topic/partition is already being consumed")
+	}
+
+	topicChildren[child.partition] = child
+	return nil
+}
+
+func (c *consumer) removeChild(child *partitionConsumer) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	delete(c.children[child.topic], child.partition)
+}
+
+func (c *consumer) refBrokerConsumer(broker *Broker) *brokerConsumer {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	bc := c.brokerConsumers[broker]
+	if bc == nil {
+		bc = c.newBrokerConsumer(broker)
+		c.brokerConsumers[broker] = bc
+	}
+
+	bc.refs++
+
+	return bc
+}
+
+func (c *consumer) unrefBrokerConsumer(brokerWorker *brokerConsumer) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	brokerWorker.refs--
+
+	if brokerWorker.refs == 0 {
+		close(brokerWorker.input)
+		if c.brokerConsumers[brokerWorker.broker] == brokerWorker {
+			delete(c.brokerConsumers, brokerWorker.broker)
+		}
+	}
+}
+
+func (c *consumer) abandonBrokerConsumer(brokerWorker *brokerConsumer) {
+	c.lock.Lock()
+	defer c.lock.Unlock()
+
+	delete(c.brokerConsumers, brokerWorker.broker)
+}
+
+// PartitionConsumer
+
+// PartitionConsumer processes Kafka messages from a given topic and partition. You MUST call one of Close() or
+// AsyncClose() on a PartitionConsumer to avoid leaks; it will not be garbage-collected automatically when it passes out
+// of scope.
+//
+// The simplest way of using a PartitionConsumer is to loop over its Messages channel using a for/range
+// loop. The PartitionConsumer will only stop itself in one case: when the offset being consumed is reported
+// as out of range by the brokers. In this case you should decide what you want to do (try a different offset,
+// notify a human, etc) and handle it appropriately. For all other error cases, it will just keep retrying.
+// By default, it logs these errors to sarama.Logger; if you want to be notified directly of all errors, set
+// your config's Consumer.Return.Errors to true and read from the Errors channel, using a select statement
+// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
+//
+// To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of
+// consumer tear-down & return immediately. Continue to loop, servicing the Messages channel until the teardown process
+// AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call
+// Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will
+// also drain the Messages channel, harvest all errors & return them once cleanup has completed.
+type PartitionConsumer interface {
+	// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you
+	// should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this
+	// function, or Close before a consumer object passes out of scope, as it will otherwise leak memory. You must call
+	// this before calling Close on the underlying client.
+	AsyncClose()
+
+	// Close stops the PartitionConsumer from fetching messages. It will initiate a shutdown just like AsyncClose, drain
+	// the Messages channel, harvest any errors & return them to the caller. Note that if you are continuing to service
+	// the Messages channel when this function is called, you will be competing with Close for messages; consider
+	// calling AsyncClose, instead. It is required to call this function (or AsyncClose) before a consumer object passes
+	// out of scope, as it will otherwise leak memory. You must call this before calling Close on the underlying client.
+	Close() error
+
+	// Messages returns the read channel for the messages that are returned by
+	// the broker.
+	Messages() <-chan *ConsumerMessage
+
+	// Errors returns a read channel of errors that occurred during consuming, if
+	// enabled. By default, errors are logged and not returned over this channel.
+	// If you want to implement any custom error handling, set your config's
+	// Consumer.Return.Errors setting to true, and read from this channel.
+	Errors() <-chan *ConsumerError
+
+	// HighWaterMarkOffset returns the high water mark offset of the partition,
+	// i.e. the offset that will be used for the next message that will be produced.
+	// You can use this to determine how far behind the processing is.
+	HighWaterMarkOffset() int64
+}
+
+type partitionConsumer struct {
+	highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG
+
+	consumer *consumer
+	conf     *Config
+	broker   *brokerConsumer
+	messages chan *ConsumerMessage
+	errors   chan *ConsumerError
+	feeder   chan *FetchResponse
+
+	trigger, dying chan none
+	closeOnce      sync.Once
+	topic          string
+	partition      int32
+	responseResult error
+	fetchSize      int32
+	offset         int64
+	retries        int32
+}
+
+var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
+
+func (child *partitionConsumer) sendError(err error) {
+	cErr := &ConsumerError{
+		Topic:     child.topic,
+		Partition: child.partition,
+		Err:       err,
+	}
+
+	if child.conf.Consumer.Return.Errors {
+		child.errors <- cErr
+	} else {
+		Logger.Println(cErr)
+	}
+}
+
+func (child *partitionConsumer) computeBackoff() time.Duration {
+	if child.conf.Consumer.Retry.BackoffFunc != nil {
+		retries := atomic.AddInt32(&child.retries, 1)
+		return child.conf.Consumer.Retry.BackoffFunc(int(retries))
+	}
+	return child.conf.Consumer.Retry.Backoff
+}
+
+func (child *partitionConsumer) dispatcher() {
+	for range child.trigger {
+		select {
+		case <-child.dying:
+			close(child.trigger)
+		case <-time.After(child.computeBackoff()):
+			if child.broker != nil {
+				child.consumer.unrefBrokerConsumer(child.broker)
+				child.broker = nil
+			}
+
+			Logger.Printf("consumer/%s/%d finding new broker\n", child.topic, child.partition)
+			if err := child.dispatch(); err != nil {
+				child.sendError(err)
+				child.trigger <- none{}
+			}
+		}
+	}
+
+	if child.broker != nil {
+		child.consumer.unrefBrokerConsumer(child.broker)
+	}
+	child.consumer.removeChild(child)
+	close(child.feeder)
+}
+
+func (child *partitionConsumer) dispatch() error {
+	if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
+		return err
+	}
+
+	var leader *Broker
+	var err error
+	if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
+		return err
+	}
+
+	child.broker = child.consumer.refBrokerConsumer(leader)
+
+	child.broker.input <- child
+
+	return nil
+}
+
+func (child *partitionConsumer) chooseStartingOffset(offset int64) error {
+	newestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetNewest)
+	if err != nil {
+		return err
+	}
+	oldestOffset, err := child.consumer.client.GetOffset(child.topic, child.partition, OffsetOldest)
+	if err != nil {
+		return err
+	}
+
+	switch {
+	case offset == OffsetNewest:
+		child.offset = newestOffset
+	case offset == OffsetOldest:
+		child.offset = oldestOffset
+	case offset >= oldestOffset && offset <= newestOffset:
+		child.offset = offset
+	default:
+		return ErrOffsetOutOfRange
+	}
+
+	return nil
+}
+
+func (child *partitionConsumer) Messages() <-chan *ConsumerMessage {
+	return child.messages
+}
+
+func (child *partitionConsumer) Errors() <-chan *ConsumerError {
+	return child.errors
+}
+
+func (child *partitionConsumer) AsyncClose() {
+	// this triggers whatever broker owns this child to abandon it and close its trigger channel, which causes
+	// the dispatcher to exit its loop, which removes it from the consumer then closes its 'messages' and
+	// 'errors' channel (alternatively, if the child is already at the dispatcher for some reason, that will
+	// also just close itself)
+	child.closeOnce.Do(func() {
+		close(child.dying)
+	})
+}
+
+func (child *partitionConsumer) Close() error {
+	child.AsyncClose()
+
+	var errors ConsumerErrors
+	for err := range child.errors {
+		errors = append(errors, err)
+	}
+
+	if len(errors) > 0 {
+		return errors
+	}
+	return nil
+}
+
+func (child *partitionConsumer) HighWaterMarkOffset() int64 {
+	return atomic.LoadInt64(&child.highWaterMarkOffset)
+}
+
+func (child *partitionConsumer) responseFeeder() {
+	var msgs []*ConsumerMessage
+	expiryTicker := time.NewTicker(child.conf.Consumer.MaxProcessingTime)
+	firstAttempt := true
+
+feederLoop:
+	for response := range child.feeder {
+		msgs, child.responseResult = child.parseResponse(response)
+
+		if child.responseResult == nil {
+			atomic.StoreInt32(&child.retries, 0)
+		}
+
+		for i, msg := range msgs {
+		messageSelect:
+			select {
+			case <-child.dying:
+				child.broker.acks.Done()
+				continue feederLoop
+			case child.messages <- msg:
+				firstAttempt = true
+			case <-expiryTicker.C:
+				if !firstAttempt {
+					child.responseResult = errTimedOut
+					child.broker.acks.Done()
+				remainingLoop:
+					for _, msg = range msgs[i:] {
+						select {
+						case child.messages <- msg:
+						case <-child.dying:
+							break remainingLoop
+						}
+					}
+					child.broker.input <- child
+					continue feederLoop
+				} else {
+					// current message has not been sent, return to select
+					// statement
+					firstAttempt = false
+					goto messageSelect
+				}
+			}
+		}
+
+		child.broker.acks.Done()
+	}
+
+	expiryTicker.Stop()
+	close(child.messages)
+	close(child.errors)
+}
+
+func (child *partitionConsumer) parseMessages(msgSet *MessageSet) ([]*ConsumerMessage, error) {
+	var messages []*ConsumerMessage
+	for _, msgBlock := range msgSet.Messages {
+		for _, msg := range msgBlock.Messages() {
+			offset := msg.Offset
+			timestamp := msg.Msg.Timestamp
+			if msg.Msg.Version >= 1 {
+				baseOffset := msgBlock.Offset - msgBlock.Messages()[len(msgBlock.Messages())-1].Offset
+				offset += baseOffset
+				if msg.Msg.LogAppendTime {
+					timestamp = msgBlock.Msg.Timestamp
+				}
+			}
+			if offset < child.offset {
+				continue
+			}
+			messages = append(messages, &ConsumerMessage{
+				Topic:          child.topic,
+				Partition:      child.partition,
+				Key:            msg.Msg.Key,
+				Value:          msg.Msg.Value,
+				Offset:         offset,
+				Timestamp:      timestamp,
+				BlockTimestamp: msgBlock.Msg.Timestamp,
+			})
+			child.offset = offset + 1
+		}
+	}
+	if len(messages) == 0 {
+		child.offset++
+	}
+	return messages, nil
+}
+
+func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
+	messages := make([]*ConsumerMessage, 0, len(batch.Records))
+
+	for _, rec := range batch.Records {
+		offset := batch.FirstOffset + rec.OffsetDelta
+		if offset < child.offset {
+			continue
+		}
+		timestamp := batch.FirstTimestamp.Add(rec.TimestampDelta)
+		if batch.LogAppendTime {
+			timestamp = batch.MaxTimestamp
+		}
+		messages = append(messages, &ConsumerMessage{
+			Topic:     child.topic,
+			Partition: child.partition,
+			Key:       rec.Key,
+			Value:     rec.Value,
+			Offset:    offset,
+			Timestamp: timestamp,
+			Headers:   rec.Headers,
+		})
+		child.offset = offset + 1
+	}
+	if len(messages) == 0 {
+		child.offset++
+	}
+	return messages, nil
+}
+
+func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
+	var (
+		metricRegistry          = child.conf.MetricRegistry
+		consumerBatchSizeMetric metrics.Histogram
+	)
+
+	if metricRegistry != nil {
+		consumerBatchSizeMetric = getOrRegisterHistogram("consumer-batch-size", metricRegistry)
+	}
+
+	// If request was throttled and empty we log and return without error
+	if response.ThrottleTime != time.Duration(0) && len(response.Blocks) == 0 {
+		Logger.Printf(
+			"consumer/broker/%d FetchResponse throttled %v\n",
+			child.broker.broker.ID(), response.ThrottleTime)
+		return nil, nil
+	}
+
+	block := response.GetBlock(child.topic, child.partition)
+	if block == nil {
+		return nil, ErrIncompleteResponse
+	}
+
+	if block.Err != ErrNoError {
+		return nil, block.Err
+	}
+
+	nRecs, err := block.numRecords()
+	if err != nil {
+		return nil, err
+	}
+
+	consumerBatchSizeMetric.Update(int64(nRecs))
+
+	if nRecs == 0 {
+		partialTrailingMessage, err := block.isPartial()
+		if err != nil {
+			return nil, err
+		}
+		// We got no messages. If we got a trailing one then we need to ask for more data.
+		// Otherwise we just poll again and wait for one to be produced...
+		if partialTrailingMessage {
+			if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize == child.conf.Consumer.Fetch.Max {
+				// we can't ask for more data, we've hit the configured limit
+				child.sendError(ErrMessageTooLarge)
+				child.offset++ // skip this one so we can keep processing future messages
+			} else {
+				child.fetchSize *= 2
+				// check int32 overflow
+				if child.fetchSize < 0 {
+					child.fetchSize = math.MaxInt32
+				}
+				if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
+					child.fetchSize = child.conf.Consumer.Fetch.Max
+				}
+			}
+		}
+
+		return nil, nil
+	}
+
+	// we got messages, reset our fetch size in case it was increased for a previous request
+	child.fetchSize = child.conf.Consumer.Fetch.Default
+	atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)
+
+	// abortedProducerIDs contains producerID which message should be ignored as uncommitted
+	// - producerID are added when the partitionConsumer iterate over the offset at which an aborted transaction begins (abortedTransaction.FirstOffset)
+	// - producerID are removed when partitionConsumer iterate over an aborted controlRecord, meaning the aborted transaction for this producer is over
+	abortedProducerIDs := make(map[int64]struct{}, len(block.AbortedTransactions))
+	abortedTransactions := block.getAbortedTransactions()
+
+	messages := []*ConsumerMessage{}
+	for _, records := range block.RecordsSet {
+		switch records.recordsType {
+		case legacyRecords:
+			messageSetMessages, err := child.parseMessages(records.MsgSet)
+			if err != nil {
+				return nil, err
+			}
+
+			messages = append(messages, messageSetMessages...)
+		case defaultRecords:
+			// Consume remaining abortedTransaction up to last offset of current batch
+			for _, txn := range abortedTransactions {
+				if txn.FirstOffset > records.RecordBatch.LastOffset() {
+					break
+				}
+				abortedProducerIDs[txn.ProducerID] = struct{}{}
+				// Pop abortedTransactions so that we never add it again
+				abortedTransactions = abortedTransactions[1:]
+			}
+
+			recordBatchMessages, err := child.parseRecords(records.RecordBatch)
+			if err != nil {
+				return nil, err
+			}
+
+			// Parse and commit offset but do not expose messages that are:
+			// - control records
+			// - part of an aborted transaction when set to `ReadCommitted`
+
+			// control record
+			isControl, err := records.isControl()
+			if err != nil {
+				// I don't know why there is this continue in case of error to begin with
+				// Safe bet is to ignore control messages if ReadUncommitted
+				// and block on them in case of error and ReadCommitted
+				if child.conf.Consumer.IsolationLevel == ReadCommitted {
+					return nil, err
+				}
+				continue
+			}
+			if isControl {
+				controlRecord, err := records.getControlRecord()
+				if err != nil {
+					return nil, err
+				}
+
+				if controlRecord.Type == ControlRecordAbort {
+					delete(abortedProducerIDs, records.RecordBatch.ProducerID)
+				}
+				continue
+			}
+
+			// filter aborted transactions
+			if child.conf.Consumer.IsolationLevel == ReadCommitted {
+				_, isAborted := abortedProducerIDs[records.RecordBatch.ProducerID]
+				if records.RecordBatch.IsTransactional && isAborted {
+					continue
+				}
+			}
+
+			messages = append(messages, recordBatchMessages...)
+		default:
+			return nil, fmt.Errorf("unknown records type: %v", records.recordsType)
+		}
+	}
+
+	return messages, nil
+}
+
+type brokerConsumer struct {
+	consumer         *consumer
+	broker           *Broker
+	input            chan *partitionConsumer
+	newSubscriptions chan []*partitionConsumer
+	subscriptions    map[*partitionConsumer]none
+	wait             chan none
+	acks             sync.WaitGroup
+	refs             int
+}
+
+func (c *consumer) newBrokerConsumer(broker *Broker) *brokerConsumer {
+	bc := &brokerConsumer{
+		consumer:         c,
+		broker:           broker,
+		input:            make(chan *partitionConsumer),
+		newSubscriptions: make(chan []*partitionConsumer),
+		wait:             make(chan none),
+		subscriptions:    make(map[*partitionConsumer]none),
+		refs:             0,
+	}
+
+	go withRecover(bc.subscriptionManager)
+	go withRecover(bc.subscriptionConsumer)
+
+	return bc
+}
+
+// The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
+// goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
+// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
+// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
+// so the main goroutine can block waiting for work if it has none.
+func (bc *brokerConsumer) subscriptionManager() {
+	var buffer []*partitionConsumer
+
+	for {
+		if len(buffer) > 0 {
+			select {
+			case event, ok := <-bc.input:
+				if !ok {
+					goto done
+				}
+				buffer = append(buffer, event)
+			case bc.newSubscriptions <- buffer:
+				buffer = nil
+			case bc.wait <- none{}:
+			}
+		} else {
+			select {
+			case event, ok := <-bc.input:
+				if !ok {
+					goto done
+				}
+				buffer = append(buffer, event)
+			case bc.newSubscriptions <- nil:
+			}
+		}
+	}
+
+done:
+	close(bc.wait)
+	if len(buffer) > 0 {
+		bc.newSubscriptions <- buffer
+	}
+	close(bc.newSubscriptions)
+}
+
+//subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
+func (bc *brokerConsumer) subscriptionConsumer() {
+	<-bc.wait // wait for our first piece of work
+
+	for newSubscriptions := range bc.newSubscriptions {
+		bc.updateSubscriptions(newSubscriptions)
+
+		if len(bc.subscriptions) == 0 {
+			// We're about to be shut down or we're about to receive more subscriptions.
+			// Either way, the signal just hasn't propagated to our goroutine yet.
+			<-bc.wait
+			continue
+		}
+
+		response, err := bc.fetchNewMessages()
+
+		if err != nil {
+			Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
+			bc.abort(err)
+			return
+		}
+
+		bc.acks.Add(len(bc.subscriptions))
+		for child := range bc.subscriptions {
+			child.feeder <- response
+		}
+		bc.acks.Wait()
+		bc.handleResponses()
+	}
+}
+
+func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) {
+	for _, child := range newSubscriptions {
+		bc.subscriptions[child] = none{}
+		Logger.Printf("consumer/broker/%d added subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
+	}
+
+	for child := range bc.subscriptions {
+		select {
+		case <-child.dying:
+			Logger.Printf("consumer/broker/%d closed dead subscription to %s/%d\n", bc.broker.ID(), child.topic, child.partition)
+			close(child.trigger)
+			delete(bc.subscriptions, child)
+		default:
+			// no-op
+		}
+	}
+}
+
+//handleResponses handles the response codes left for us by our subscriptions, and abandons ones that have been closed
+func (bc *brokerConsumer) handleResponses() {
+	for child := range bc.subscriptions {
+		result := child.responseResult
+		child.responseResult = nil
+
+		switch result {
+		case nil:
+			// no-op
+		case errTimedOut:
+			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
+				bc.broker.ID(), child.topic, child.partition)
+			delete(bc.subscriptions, child)
+		case ErrOffsetOutOfRange:
+			// there's no point in retrying this it will just fail the same way again
+			// shut it down and force the user to choose what to do
+			child.sendError(result)
+			Logger.Printf("consumer/%s/%d shutting down because %s\n", child.topic, child.partition, result)
+			close(child.trigger)
+			delete(bc.subscriptions, child)
+		case ErrUnknownTopicOrPartition, ErrNotLeaderForPartition, ErrLeaderNotAvailable, ErrReplicaNotAvailable:
+			// not an error, but does need redispatching
+			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
+				bc.broker.ID(), child.topic, child.partition, result)
+			child.trigger <- none{}
+			delete(bc.subscriptions, child)
+		default:
+			// dunno, tell the user and try redispatching
+			child.sendError(result)
+			Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because %s\n",
+				bc.broker.ID(), child.topic, child.partition, result)
+			child.trigger <- none{}
+			delete(bc.subscriptions, child)
+		}
+	}
+}
+
+func (bc *brokerConsumer) abort(err error) {
+	bc.consumer.abandonBrokerConsumer(bc)
+	_ = bc.broker.Close() // we don't care about the error this might return, we already have one
+
+	for child := range bc.subscriptions {
+		child.sendError(err)
+		child.trigger <- none{}
+	}
+
+	for newSubscriptions := range bc.newSubscriptions {
+		if len(newSubscriptions) == 0 {
+			<-bc.wait
+			continue
+		}
+		for _, child := range newSubscriptions {
+			child.sendError(err)
+			child.trigger <- none{}
+		}
+	}
+}
+
+func (bc *brokerConsumer) fetchNewMessages() (*FetchResponse, error) {
+	request := &FetchRequest{
+		MinBytes:    bc.consumer.conf.Consumer.Fetch.Min,
+		MaxWaitTime: int32(bc.consumer.conf.Consumer.MaxWaitTime / time.Millisecond),
+	}
+	if bc.consumer.conf.Version.IsAtLeast(V0_9_0_0) {
+		request.Version = 1
+	}
+	if bc.consumer.conf.Version.IsAtLeast(V0_10_0_0) {
+		request.Version = 2
+	}
+	if bc.consumer.conf.Version.IsAtLeast(V0_10_1_0) {
+		request.Version = 3
+		request.MaxBytes = MaxResponseSize
+	}
+	if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
+		request.Version = 4
+		request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
+	}
+
+	for child := range bc.subscriptions {
+		request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)
+	}
+
+	return bc.broker.Fetch(request)
+}