[VOL-4293] OpenONU Adapter update for gRPC migration
Change-Id: I05300d3b95b878f44576a99a05f53f52fdc0cda1
diff --git a/vendor/github.com/Shopify/sarama/consumer.go b/vendor/github.com/Shopify/sarama/consumer.go
index 72c4d7c..f9cd172 100644
--- a/vendor/github.com/Shopify/sarama/consumer.go
+++ b/vendor/github.com/Shopify/sarama/consumer.go
@@ -35,6 +35,10 @@
return fmt.Sprintf("kafka: error while consuming %s/%d: %s", ce.Topic, ce.Partition, ce.Err)
}
+func (ce ConsumerError) Unwrap() error {
+ return ce.Err
+}
+
// ConsumerErrors is a type that wraps a batch of errors and implements the Error interface.
// It can be returned from the PartitionConsumer's Close methods to avoid the need to manually drain errors
// when stopping.
@@ -299,6 +303,8 @@
errors chan *ConsumerError
feeder chan *FetchResponse
+ preferredReadReplica int32
+
trigger, dying chan none
closeOnce sync.Once
topic string
@@ -359,18 +365,29 @@
close(child.feeder)
}
+func (child *partitionConsumer) preferredBroker() (*Broker, error) {
+ if child.preferredReadReplica >= 0 {
+ broker, err := child.consumer.client.Broker(child.preferredReadReplica)
+ if err == nil {
+ return broker, nil
+ }
+ }
+
+ // if prefered replica cannot be found fallback to leader
+ return child.consumer.client.Leader(child.topic, child.partition)
+}
+
func (child *partitionConsumer) dispatch() error {
if err := child.consumer.client.RefreshMetadata(child.topic); err != nil {
return err
}
- var leader *Broker
- var err error
- if leader, err = child.consumer.client.Leader(child.topic, child.partition); err != nil {
+ broker, err := child.preferredBroker()
+ if err != nil {
return err
}
- child.broker = child.consumer.refBrokerConsumer(leader)
+ child.broker = child.consumer.refBrokerConsumer(broker)
child.broker.input <- child
@@ -422,13 +439,13 @@
func (child *partitionConsumer) Close() error {
child.AsyncClose()
- var errors ConsumerErrors
+ var consumerErrors ConsumerErrors
for err := range child.errors {
- errors = append(errors, err)
+ consumerErrors = append(consumerErrors, err)
}
- if len(errors) > 0 {
- return errors
+ if len(consumerErrors) > 0 {
+ return consumerErrors
}
return nil
}
@@ -451,6 +468,7 @@
}
for i, msg := range msgs {
+ child.interceptors(msg)
messageSelect:
select {
case <-child.dying:
@@ -464,6 +482,7 @@
child.broker.acks.Done()
remainingLoop:
for _, msg = range msgs[i:] {
+ child.interceptors(msg)
select {
case child.messages <- msg:
case <-child.dying:
@@ -586,6 +605,8 @@
consumerBatchSizeMetric.Update(int64(nRecs))
+ child.preferredReadReplica = block.PreferredReadReplica
+
if nRecs == 0 {
partialTrailingMessage, err := block.isPartial()
if err != nil {
@@ -623,7 +644,7 @@
abortedProducerIDs := make(map[int64]struct{}, len(block.AbortedTransactions))
abortedTransactions := block.getAbortedTransactions()
- messages := []*ConsumerMessage{}
+ var messages []*ConsumerMessage
for _, records := range block.RecordsSet {
switch records.recordsType {
case legacyRecords:
@@ -693,6 +714,12 @@
return messages, nil
}
+func (child *partitionConsumer) interceptors(msg *ConsumerMessage) {
+ for _, interceptor := range child.conf.Consumer.Interceptors {
+ msg.safelyApplyInterceptor(interceptor)
+ }
+}
+
type brokerConsumer struct {
consumer *consumer
broker *Broker
@@ -761,7 +788,7 @@
close(bc.newSubscriptions)
}
-//subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
+// subscriptionConsumer ensures we will get nil right away if no new subscriptions is available
func (bc *brokerConsumer) subscriptionConsumer() {
<-bc.wait // wait for our first piece of work
@@ -776,7 +803,6 @@
}
response, err := bc.fetchNewMessages()
-
if err != nil {
Logger.Printf("consumer/broker/%d disconnecting due to error processing FetchRequest: %s\n", bc.broker.ID(), err)
bc.abort(err)
@@ -810,15 +836,27 @@
}
}
-//handleResponses handles the response codes left for us by our subscriptions, and abandons ones that have been closed
+// handleResponses handles the response codes left for us by our subscriptions, and abandons ones that have been closed
func (bc *brokerConsumer) handleResponses() {
for child := range bc.subscriptions {
result := child.responseResult
child.responseResult = nil
+ if result == nil {
+ if preferredBroker, err := child.preferredBroker(); err == nil {
+ if bc.broker.ID() != preferredBroker.ID() {
+ // not an error but needs redispatching to consume from prefered replica
+ child.trigger <- none{}
+ delete(bc.subscriptions, child)
+ }
+ }
+ continue
+ }
+
+ // Discard any replica preference.
+ child.preferredReadReplica = -1
+
switch result {
- case nil:
- // no-op
case errTimedOut:
Logger.Printf("consumer/broker/%d abandoned subscription to %s/%d because consuming was taking too long\n",
bc.broker.ID(), child.topic, child.partition)
@@ -887,6 +925,21 @@
request.Version = 4
request.Isolation = bc.consumer.conf.Consumer.IsolationLevel
}
+ if bc.consumer.conf.Version.IsAtLeast(V1_1_0_0) {
+ request.Version = 7
+ // We do not currently implement KIP-227 FetchSessions. Setting the id to 0
+ // and the epoch to -1 tells the broker not to generate as session ID we're going
+ // to just ignore anyway.
+ request.SessionID = 0
+ request.SessionEpoch = -1
+ }
+ if bc.consumer.conf.Version.IsAtLeast(V2_1_0_0) {
+ request.Version = 10
+ }
+ if bc.consumer.conf.Version.IsAtLeast(V2_3_0_0) {
+ request.Version = 11
+ request.RackID = bc.consumer.conf.RackID
+ }
for child := range bc.subscriptions {
request.AddBlock(child.topic, child.partition, child.offset, child.fetchSize)