gRPC migration update
Change-Id: Icdd1a824948fa994cd36bd121c962f5ecf74e3cf
diff --git a/vendor/github.com/Shopify/sarama/async_producer.go b/vendor/github.com/Shopify/sarama/async_producer.go
index 9b15cd1..5911f7b 100644
--- a/vendor/github.com/Shopify/sarama/async_producer.go
+++ b/vendor/github.com/Shopify/sarama/async_producer.go
@@ -60,13 +60,28 @@
noProducerEpoch = -1
)
-func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) int32 {
+func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
key := fmt.Sprintf("%s-%d", topic, partition)
t.mutex.Lock()
defer t.mutex.Unlock()
sequence := t.sequenceNumbers[key]
t.sequenceNumbers[key] = sequence + 1
- return sequence
+ return sequence, t.producerEpoch
+}
+
+func (t *transactionManager) bumpEpoch() {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+ t.producerEpoch++
+ for k := range t.sequenceNumbers {
+ t.sequenceNumbers[k] = 0
+ }
+}
+
+func (t *transactionManager) getProducerID() (int64, int16) {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+ return t.producerID, t.producerEpoch
}
func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
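The three methods above keep the per-partition sequence counters and the producer epoch under one mutex: getAndIncrementSequenceNumber now hands back the epoch alongside the sequence, and bumpEpoch zeroes every counter because a new epoch must restart each partition's sequence at 0. A minimal standalone sketch of that bookkeeping (illustrative names only, not the vendored type):

    package main

    import (
        "fmt"
        "sync"
    )

    // seqTracker mirrors the epoch/sequence bookkeeping added above.
    type seqTracker struct {
        mu    sync.Mutex
        epoch int16
        seqs  map[string]int32
    }

    func (t *seqTracker) next(topicPartition string) (int32, int16) {
        t.mu.Lock()
        defer t.mu.Unlock()
        seq := t.seqs[topicPartition]
        t.seqs[topicPartition] = seq + 1
        return seq, t.epoch
    }

    func (t *seqTracker) bump() {
        t.mu.Lock()
        defer t.mu.Unlock()
        t.epoch++
        for k := range t.seqs {
            t.seqs[k] = 0 // a new epoch restarts every partition's sequence
        }
    }

    func main() {
        t := &seqTracker{seqs: map[string]int32{}}
        fmt.Println(t.next("events-0")) // 0 0
        fmt.Println(t.next("events-0")) // 1 0
        t.bump()
        fmt.Println(t.next("events-0")) // 0 1
    }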
@@ -208,6 +223,8 @@
flags flagSet
expectation chan *ProducerError
sequenceNumber int32
+ producerEpoch int16
+ hasSequence bool
}
const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
@@ -234,6 +251,9 @@
func (m *ProducerMessage) clear() {
m.flags = 0
m.retries = 0
+ m.sequenceNumber = 0
+ m.producerEpoch = 0
+ m.hasSequence = false
}
// ProducerError is the type of error generated when the producer fails to deliver a message.
@@ -247,6 +267,10 @@
return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
}
+func (pe ProducerError) Unwrap() error {
+ return pe.Err
+}
+
// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
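With Unwrap in place a ProducerError participates in the standard errors chain, so callers can match on the Kafka error wrapped inside it. A sketch of a consumer of the producer's error channel; the specific KError and the log messages are illustrative:

    import (
        "errors"
        "log"

        "github.com/Shopify/sarama"
    )

    // drainErrors relies on Unwrap to make the underlying Kafka error
    // reachable through errors.Is.
    func drainErrors(producer sarama.AsyncProducer) {
        for perr := range producer.Errors() {
            if errors.Is(perr, sarama.ErrMessageSizeTooLarge) {
                log.Printf("dropping oversized message for topic %s", perr.Msg.Topic)
                continue
            }
            log.Printf("produce failed: %v", perr)
        }
    }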
@@ -328,6 +352,10 @@
p.inFlight.Add(1)
}
+ for _, interceptor := range p.conf.Producer.Interceptors {
+ msg.safelyApplyInterceptor(interceptor)
+ }
+
version := 1
if p.conf.Version.IsAtLeast(V0_11_0_0) {
version = 2
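The loop above runs every configured ProducerInterceptor against a message as it enters the producer (safelyApplyInterceptor shields the producer from a panicking interceptor). A sketch of an interceptor that stamps a header on each outgoing message; the header key and value are illustrative:

    // headerInterceptor implements sarama.ProducerInterceptor.
    type headerInterceptor struct{}

    func (headerInterceptor) OnSend(msg *sarama.ProducerMessage) {
        msg.Headers = append(msg.Headers, sarama.RecordHeader{
            Key:   []byte("produced-by"),
            Value: []byte("example-service"),
        })
    }

It would be registered through the producer config, e.g. config.Producer.Interceptors = []sarama.ProducerInterceptor{headerInterceptor{}}.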
@@ -388,10 +416,6 @@
continue
}
}
- // All messages being retried (sent or not) have already had their retry count updated
- if tp.parent.conf.Producer.Idempotent && msg.retries == 0 {
- msg.sequenceNumber = tp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
- }
handler := tp.handlers[msg.Partition]
if handler == nil {
@@ -411,7 +435,7 @@
var partitions []int32
err := tp.breaker.Run(func() (err error) {
- var requiresConsistency = false
+ requiresConsistency := false
if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok {
requiresConsistency = ep.MessageRequiresConsistency(msg)
} else {
@@ -425,7 +449,6 @@
}
return
})
-
if err != nil {
return err
}
@@ -570,6 +593,15 @@
Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
}
+ // Now that we know we have a broker to actually try and send this message to, generate the sequence
+ // number for it.
+ // All messages being retried (sent or not) have already had their retry count updated
+ // Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
+ if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
+ msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
+ msg.hasSequence = true
+ }
+
pp.brokerProducer.input <- msg
}
}
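The sequence/epoch pair is stamped here only for idempotent producers, and only on real messages (the syn/fin control messages carry flags and are skipped). A sketch of a configuration that exercises this path; the broker address is illustrative, and the remaining settings are the ones Sarama requires before it will enable idempotent production:

    config := sarama.NewConfig()
    config.Version = sarama.V0_11_0_0                // idempotence needs the v2 record format
    config.Producer.Idempotent = true
    config.Producer.RequiredAcks = sarama.WaitForAll // acks from all in-sync replicas
    config.Producer.Retry.Max = 5                    // must be at least 1
    config.Net.MaxOpenRequests = 1                   // keeps sequences ordered per broker

    producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()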
@@ -748,12 +780,21 @@
}
if bp.buffer.wouldOverflow(msg) {
- if err := bp.waitForSpace(msg); err != nil {
+ Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
+ if err := bp.waitForSpace(msg, false); err != nil {
bp.parent.retryMessage(msg, err)
continue
}
}
+ if bp.parent.txnmgr.producerID != noProducerID && bp.buffer.producerEpoch != msg.producerEpoch {
+ // The epoch was reset, need to roll the buffer over
+ Logger.Printf("producer/broker/%d detected epoch rollover, waiting for new buffer\n", bp.broker.ID())
+ if err := bp.waitForSpace(msg, true); err != nil {
+ bp.parent.retryMessage(msg, err)
+ continue
+ }
+ }
if err := bp.buffer.add(msg); err != nil {
bp.parent.returnError(msg, err)
continue
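This check keeps one pending buffer from mixing messages stamped with different epochs: if the epoch was bumped after the buffer was started, the old buffer is forced out (waitForSpace with forceRollover set) before the newer message is accepted. A standalone sketch of that rule, using illustrative types that are not part of sarama:

    // batch stands in for the broker producer's pending buffer.
    type batch struct {
        epoch int16
        msgs  []string
    }

    // addOrRoll flushes the current batch whenever the incoming epoch differs,
    // so a single batch never spans two epochs.
    func addOrRoll(buf *batch, epoch int16, msg string, flush func(batch)) {
        if len(buf.msgs) > 0 && buf.epoch != epoch {
            flush(*buf) // same effect as waitForSpace(msg, true) above
            *buf = batch{}
        }
        if len(buf.msgs) == 0 {
            buf.epoch = epoch
        }
        buf.msgs = append(buf.msgs, msg)
    }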
@@ -809,9 +850,7 @@
return bp.currentRetries[msg.Topic][msg.Partition]
}
-func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
- Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
-
+func (bp *brokerProducer) waitForSpace(msg *ProducerMessage, forceRollover bool) error {
for {
select {
case response := <-bp.responses:
@@ -819,7 +858,7 @@
// handling a response can change our state, so re-check some things
if reason := bp.needsRetry(msg); reason != nil {
return reason
- } else if !bp.buffer.wouldOverflow(msg) {
+ } else if !bp.buffer.wouldOverflow(msg) && !forceRollover {
return nil
}
case bp.output <- bp.buffer:
@@ -1030,6 +1069,12 @@
}
func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
+ // We need to reset the producer ID epoch if we set a sequence number on it, because the broker
+ // will never see a message with this number, so we can never continue the sequence.
+ if msg.hasSequence {
+ Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
+ p.txnmgr.bumpEpoch()
+ }
msg.clear()
pErr := &ProducerError{Msg: msg, Err: err}
if p.conf.Producer.Return.Errors {