VOL-2112 move to voltha-lib-go
Change-Id: Ic1af08003c1d2c698c0cce371e64f47b47b8d875
diff --git a/vendor/github.com/Shopify/sarama/consumer.go b/vendor/github.com/Shopify/sarama/consumer.go
index ce72ff1..72c4d7c 100644
--- a/vendor/github.com/Shopify/sarama/consumer.go
+++ b/vendor/github.com/Shopify/sarama/consumer.go
@@ -3,20 +3,24 @@
import (
"errors"
"fmt"
+ "math"
"sync"
"sync/atomic"
"time"
+
+ "github.com/rcrowley/go-metrics"
)
// ConsumerMessage encapsulates a Kafka message returned by the consumer.
type ConsumerMessage struct {
- Key, Value []byte
- Topic string
- Partition int32
- Offset int64
+ Headers []*RecordHeader // only set if kafka is version 0.11+
Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp
BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
- Headers []*RecordHeader // only set if kafka is version 0.11+
+
+ Key, Value []byte
+ Topic string
+ Partition int32
+ Offset int64
}
// ConsumerError is what is provided to the user when an error occurs.
@@ -43,13 +47,7 @@
// Consumer manages PartitionConsumers which process Kafka messages from brokers. You MUST call Close()
// on a consumer to avoid leaks, it will not be garbage-collected automatically when it passes out of
// scope.
-//
-// Sarama's Consumer type does not currently support automatic consumer-group rebalancing and offset tracking.
-// For Zookeeper-based tracking (Kafka 0.8.2 and earlier), the https://github.com/wvanbergen/kafka library
-// builds on Sarama to add this support. For Kafka-based tracking (Kafka 0.9 and later), the
-// https://github.com/bsm/sarama-cluster library builds on Sarama to add this support.
type Consumer interface {
-
// Topics returns the set of available topics as retrieved from the cluster
// metadata. This method is the same as Client.Topics(), and is provided for
// convenience.
@@ -75,13 +73,11 @@
}
type consumer struct {
- client Client
- conf *Config
- ownClient bool
-
- lock sync.Mutex
+ conf *Config
children map[string]map[int32]*partitionConsumer
brokerConsumers map[*Broker]*brokerConsumer
+ client Client
+ lock sync.Mutex
}
// NewConsumer creates a new consumer using the given broker addresses and configuration.
@@ -90,18 +86,19 @@
if err != nil {
return nil, err
}
-
- c, err := NewConsumerFromClient(client)
- if err != nil {
- return nil, err
- }
- c.(*consumer).ownClient = true
- return c, nil
+ return newConsumer(client)
}
// NewConsumerFromClient creates a new consumer using the given client. It is still
// necessary to call Close() on the underlying client when shutting down this consumer.
func NewConsumerFromClient(client Client) (Consumer, error) {
+ // The caller owns this client, so wrap it to ensure that
+ // closing the consumer does not call Close() on it.
+ cli := &nopCloserClient{client}
+ return newConsumer(cli)
+}
+
+func newConsumer(client Client) (Consumer, error) {
// Check that we are not dealing with a closed Client before processing any other arguments
if client.Closed() {
return nil, ErrClosedClient
@@ -118,10 +115,7 @@
}
func (c *consumer) Close() error {
- if c.ownClient {
- return c.client.Close()
- }
- return nil
+ return c.client.Close()
}
func (c *consumer) Topics() ([]string, error) {
@@ -261,12 +255,11 @@
// or a separate goroutine. Check out the Consumer examples to see implementations of these different approaches.
//
// To terminate such a for/range loop while the loop is executing, call AsyncClose. This will kick off the process of
-// consumer tear-down & return imediately. Continue to loop, servicing the Messages channel until the teardown process
+// consumer tear-down & return immediately. Continue to loop, servicing the Messages channel until the teardown process
// AsyncClose initiated closes it (thus terminating the for/range loop). If you've already ceased reading Messages, call
// Close; this will signal the PartitionConsumer's goroutines to begin shutting down (just like AsyncClose), but will
// also drain the Messages channel, harvest all errors & return them once cleanup has completed.
type PartitionConsumer interface {
-
// AsyncClose initiates a shutdown of the PartitionConsumer. This method will return immediately, after which you
// should continue to service the 'Messages' and 'Errors' channels until they are empty. It is required to call this
// function, or Close before a consumer object passes out of scope, as it will otherwise leak memory. You must call
@@ -298,24 +291,22 @@
type partitionConsumer struct {
highWaterMarkOffset int64 // must be at the top of the struct because https://golang.org/pkg/sync/atomic/#pkg-note-BUG
- consumer *consumer
- conf *Config
- topic string
- partition int32
+ consumer *consumer
+ conf *Config
broker *brokerConsumer
messages chan *ConsumerMessage
errors chan *ConsumerError
feeder chan *FetchResponse
trigger, dying chan none
- responseResult error
closeOnce sync.Once
-
- fetchSize int32
- offset int64
-
- retries int32
+ topic string
+ partition int32
+ responseResult error
+ fetchSize int32
+ offset int64
+ retries int32
}
var errTimedOut = errors.New("timed out feeding messages to the user") // not user-facing
@@ -338,9 +329,8 @@
if child.conf.Consumer.Retry.BackoffFunc != nil {
retries := atomic.AddInt32(&child.retries, 1)
return child.conf.Consumer.Retry.BackoffFunc(int(retries))
- } else {
- return child.conf.Consumer.Retry.Backoff
}
+ return child.conf.Consumer.Retry.Backoff
}
func (child *partitionConsumer) dispatcher() {
@@ -432,12 +422,6 @@
func (child *partitionConsumer) Close() error {
child.AsyncClose()
- go withRecover(func() {
- for range child.messages {
- // drain
- }
- })
-
var errors ConsumerErrors
for err := range child.errors {
errors = append(errors, err)
@@ -469,14 +453,22 @@
for i, msg := range msgs {
messageSelect:
select {
+ case <-child.dying:
+ child.broker.acks.Done()
+ continue feederLoop
case child.messages <- msg:
firstAttempt = true
case <-expiryTicker.C:
if !firstAttempt {
child.responseResult = errTimedOut
child.broker.acks.Done()
+ remainingLoop:
for _, msg = range msgs[i:] {
- child.messages <- msg
+ select {
+ case child.messages <- msg:
+ case <-child.dying:
+ break remainingLoop
+ }
}
child.broker.input <- child
continue feederLoop
@@ -532,7 +524,8 @@
}
func (child *partitionConsumer) parseRecords(batch *RecordBatch) ([]*ConsumerMessage, error) {
- var messages []*ConsumerMessage
+ messages := make([]*ConsumerMessage, 0, len(batch.Records))
+
for _, rec := range batch.Records {
offset := batch.FirstOffset + rec.OffsetDelta
if offset < child.offset {
@@ -560,6 +553,24 @@
}
func (child *partitionConsumer) parseResponse(response *FetchResponse) ([]*ConsumerMessage, error) {
+ var (
+ metricRegistry = child.conf.MetricRegistry
+ // Default to go-metrics' no-op histogram so the batch-size Update is safe when MetricRegistry is nil.
+ consumerBatchSizeMetric metrics.Histogram = metrics.NilHistogram{}
+ )
+
+ if metricRegistry != nil {
+ consumerBatchSizeMetric = getOrRegisterHistogram("consumer-batch-size", metricRegistry)
+ }
+
+ // If request was throttled and empty we log and return without error
+ if response.ThrottleTime != time.Duration(0) && len(response.Blocks) == 0 {
+ Logger.Printf(
+ "consumer/broker/%d FetchResponse throttled %v\n",
+ child.broker.broker.ID(), response.ThrottleTime)
+ return nil, nil
+ }
+
block := response.GetBlock(child.topic, child.partition)
if block == nil {
return nil, ErrIncompleteResponse
@@ -573,6 +583,9 @@
if err != nil {
return nil, err
}
+
+ consumerBatchSizeMetric.Update(int64(nRecs))
+
if nRecs == 0 {
partialTrailingMessage, err := block.isPartial()
if err != nil {
@@ -587,6 +600,10 @@
child.offset++ // skip this one so we can keep processing future messages
} else {
child.fetchSize *= 2
+ // Doubling can overflow int32 (wrapping to a negative value); clamp to math.MaxInt32.
+ if child.fetchSize < 0 {
+ child.fetchSize = math.MaxInt32
+ }
if child.conf.Consumer.Fetch.Max > 0 && child.fetchSize > child.conf.Consumer.Fetch.Max {
child.fetchSize = child.conf.Consumer.Fetch.Max
}
@@ -600,6 +617,12 @@
child.fetchSize = child.conf.Consumer.Fetch.Default
atomic.StoreInt64(&child.highWaterMarkOffset, block.HighWaterMarkOffset)
+ // abortedProducerIDs holds the IDs of producers whose messages are dropped as uncommitted under ReadCommitted:
+ // - a producerID is added once the partitionConsumer reaches the offset at which an aborted transaction begins (abortedTransaction.FirstOffset)
+ // - a producerID is removed once the partitionConsumer reaches that producer's ABORT control record, meaning its aborted transaction is over
+ abortedProducerIDs := make(map[int64]struct{}, len(block.AbortedTransactions))
+ abortedTransactions := block.getAbortedTransactions()
+
messages := []*ConsumerMessage{}
for _, records := range block.RecordsSet {
switch records.recordsType {
@@ -611,13 +634,55 @@
messages = append(messages, messageSetMessages...)
case defaultRecords:
+ // Consume aborted transactions starting at or before this batch's last offset (the early break assumes they are sorted by FirstOffset)
+ for _, txn := range abortedTransactions {
+ if txn.FirstOffset > records.RecordBatch.LastOffset() {
+ break
+ }
+ abortedProducerIDs[txn.ProducerID] = struct{}{}
+ // Pop the consumed transaction so it is never added again
+ abortedTransactions = abortedTransactions[1:]
+ }
+
recordBatchMessages, err := child.parseRecords(records.RecordBatch)
if err != nil {
return nil, err
}
- if control, err := records.isControl(); err != nil || control {
+
+ // Parse and commit offset but do not expose messages that are:
+ // - control records
+ // - part of an aborted transaction when set to `ReadCommitted`
+
+ // control record
+ isControl, err := records.isControl()
+ if err != nil {
+ // Older code silently skipped the batch when isControl() failed. Keep doing so for
+ // ReadUncommitted, but return the error for ReadCommitted: the unreadable batch may be
+ // a control record that the aborted-transaction tracking below depends on.
+ if child.conf.Consumer.IsolationLevel == ReadCommitted {
+ return nil, err
+ }
continue
}
+ if isControl {
+ controlRecord, err := records.getControlRecord()
+ if err != nil {
+ return nil, err
+ }
+
+ if controlRecord.Type == ControlRecordAbort {
+ delete(abortedProducerIDs, records.RecordBatch.ProducerID)
+ }
+ continue
+ }
+
+ // filter aborted transactions
+ if child.conf.Consumer.IsolationLevel == ReadCommitted {
+ _, isAborted := abortedProducerIDs[records.RecordBatch.ProducerID]
+ if records.RecordBatch.IsTransactional && isAborted {
+ continue
+ }
+ }
messages = append(messages, recordBatchMessages...)
default:
@@ -628,15 +693,13 @@
return messages, nil
}
-// brokerConsumer
-
type brokerConsumer struct {
consumer *consumer
broker *Broker
input chan *partitionConsumer
newSubscriptions chan []*partitionConsumer
- wait chan none
subscriptions map[*partitionConsumer]none
+ wait chan none
acks sync.WaitGroup
refs int
}
@@ -658,14 +721,14 @@
return bc
}
+// subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
+// goroutine is in the middle of a network request) and batches them up. The main worker goroutine picks
+// up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
+// it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions are available,
+// so the main goroutine can block waiting for work if it has none.
func (bc *brokerConsumer) subscriptionManager() {
var buffer []*partitionConsumer
- // The subscriptionManager constantly accepts new subscriptions on `input` (even when the main subscriptionConsumer
- // goroutine is in the middle of a network request) and batches it up. The main worker goroutine picks
- // up a batch of new subscriptions between every network request by reading from `newSubscriptions`, so we give
- // it nil if no new subscriptions are available. We also write to `wait` only when new subscriptions is available,
- // so the main goroutine can block waiting for work if it has none.
for {
if len(buffer) > 0 {
select {
@@ -698,10 +761,10 @@
close(bc.newSubscriptions)
}
+// subscriptionConsumer blocks on `wait` for its first piece of work, then loops over `newSubscriptions`, which yields nil right away when no new subscriptions are available.
func (bc *brokerConsumer) subscriptionConsumer() {
<-bc.wait // wait for our first piece of work
- // the subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
for newSubscriptions := range bc.newSubscriptions {
bc.updateSubscriptions(newSubscriptions)
@@ -742,20 +805,20 @@
close(child.trigger)
delete(bc.subscriptions, child)
default:
- break
+ // no-op
}
}
}
+// handleResponses handles the response codes left for us by our subscriptions, and abandons ones that have been closed or have timed out.
func (bc *brokerConsumer) handleResponses() {
- // handles the response codes left for us by our subscriptions, and abandons ones that have been closed
for child := range bc.subscriptions {
result := child.responseResult
child.responseResult = nil
switch result {
case nil:
- break
+ // no-op
case errTimedOut:
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
bc.broker.ID(), child.topic, child.partition)
@@ -822,7 +885,7 @@
}
if bc.consumer.conf.Version.IsAtLeast(V0_11_0_0) {
request.Version = 4
- request.Isolation = ReadUncommitted // We don't support yet transactions.
+ request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
}
for child := range bc.subscriptions {