[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/Shopify/sarama/async_producer.go b/vendor/github.com/Shopify/sarama/async_producer.go
index 11e0849..5911f7b 100644
--- a/vendor/github.com/Shopify/sarama/async_producer.go
+++ b/vendor/github.com/Shopify/sarama/async_producer.go
@@ -60,13 +60,28 @@
noProducerEpoch = -1
)
-func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) int32 {
+func (t *transactionManager) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
key := fmt.Sprintf("%s-%d", topic, partition)
t.mutex.Lock()
defer t.mutex.Unlock()
sequence := t.sequenceNumbers[key]
t.sequenceNumbers[key] = sequence + 1
- return sequence
+ return sequence, t.producerEpoch
+}
+
+func (t *transactionManager) bumpEpoch() {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+ t.producerEpoch++
+ for k := range t.sequenceNumbers {
+ t.sequenceNumbers[k] = 0
+ }
+}
+
+func (t *transactionManager) getProducerID() (int64, int16) {
+ t.mutex.Lock()
+ defer t.mutex.Unlock()
+ return t.producerID, t.producerEpoch
}
func newTransactionManager(conf *Config, client Client) (*transactionManager, error) {
@@ -208,6 +223,8 @@
flags flagSet
expectation chan *ProducerError
sequenceNumber int32
+ producerEpoch int16
+ hasSequence bool
}
const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
@@ -234,6 +251,9 @@
func (m *ProducerMessage) clear() {
m.flags = 0
m.retries = 0
+ m.sequenceNumber = 0
+ m.producerEpoch = 0
+ m.hasSequence = false
}
// ProducerError is the type of error generated when the producer fails to deliver a message.
@@ -247,6 +267,10 @@
return fmt.Sprintf("kafka: Failed to produce message to topic %s: %s", pe.Msg.Topic, pe.Err)
}
+func (pe ProducerError) Unwrap() error {
+ return pe.Err
+}
+
// ProducerErrors is a type that wraps a batch of "ProducerError"s and implements the Error interface.
// It can be returned from the Producer's Close method to avoid the need to manually drain the Errors channel
// when closing a producer.
@@ -328,6 +352,10 @@
p.inFlight.Add(1)
}
+ for _, interceptor := range p.conf.Producer.Interceptors {
+ msg.safelyApplyInterceptor(interceptor)
+ }
+
version := 1
if p.conf.Version.IsAtLeast(V0_11_0_0) {
version = 2
@@ -388,10 +416,6 @@
continue
}
}
- // All messages being retried (sent or not) have already had their retry count updated
- if tp.parent.conf.Producer.Idempotent && msg.retries == 0 {
- msg.sequenceNumber = tp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
- }
handler := tp.handlers[msg.Partition]
if handler == nil {
@@ -411,7 +435,7 @@
var partitions []int32
err := tp.breaker.Run(func() (err error) {
- var requiresConsistency = false
+ requiresConsistency := false
if ep, ok := tp.partitioner.(DynamicConsistencyPartitioner); ok {
requiresConsistency = ep.MessageRequiresConsistency(msg)
} else {
@@ -425,7 +449,6 @@
}
return
})
-
if err != nil {
return err
}
@@ -520,7 +543,6 @@
}()
for msg := range pp.input {
-
if pp.brokerProducer != nil && pp.brokerProducer.abandoned != nil {
select {
case <-pp.brokerProducer.abandoned:
@@ -571,6 +593,15 @@
Logger.Printf("producer/leader/%s/%d selected broker %d\n", pp.topic, pp.partition, pp.leader.ID())
}
+ // Now that we know we have a broker to actually try and send this message to, generate the sequence
+ // number for it.
+ // Messages being retried (sent or not) have already had their retry count updated, so only first attempts get one.
+ // Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
+ if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
+ msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
+ msg.hasSequence = true
+ }
+
pp.brokerProducer.input <- msg
}
}
@@ -652,6 +683,7 @@
input: input,
output: bridge,
responses: responses,
+ stopchan: make(chan struct{}),
buffer: newProduceSet(p),
currentRetries: make(map[string]map[int32]error),
}
@@ -696,6 +728,7 @@
output chan<- *produceSet
responses <-chan *brokerProducerResponse
abandoned chan struct{}
+ stopchan chan struct{}
buffer *produceSet
timer <-chan time.Time
@@ -711,12 +744,17 @@
for {
select {
- case msg := <-bp.input:
- if msg == nil {
+ case msg, ok := <-bp.input:
+ if !ok {
+ Logger.Printf("producer/broker/%d input chan closed\n", bp.broker.ID())
bp.shutdown()
return
}
+ if msg == nil {
+ continue
+ }
+
if msg.flags&syn == syn {
Logger.Printf("producer/broker/%d state change to [open] on %s/%d\n",
bp.broker.ID(), msg.Topic, msg.Partition)
@@ -742,12 +780,21 @@
}
if bp.buffer.wouldOverflow(msg) {
- if err := bp.waitForSpace(msg); err != nil {
+ Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
+ if err := bp.waitForSpace(msg, false); err != nil {
bp.parent.retryMessage(msg, err)
continue
}
}
+ if bp.parent.txnmgr.producerID != noProducerID && bp.buffer.producerEpoch != msg.producerEpoch {
+ // The message carries a different epoch than the buffer, so the buffer must be rolled over first.
+ Logger.Printf("producer/broker/%d detected epoch rollover, waiting for new buffer\n", bp.broker.ID())
+ if err := bp.waitForSpace(msg, true); err != nil {
+ bp.parent.retryMessage(msg, err)
+ continue
+ }
+ }
if err := bp.buffer.add(msg); err != nil {
bp.parent.returnError(msg, err)
continue
@@ -760,8 +807,14 @@
bp.timerFired = true
case output <- bp.buffer:
bp.rollOver()
- case response := <-bp.responses:
- bp.handleResponse(response)
+ case response, ok := <-bp.responses:
+ if ok {
+ bp.handleResponse(response)
+ }
+ case <-bp.stopchan:
+ Logger.Printf(
+ "producer/broker/%d run loop asked to stop\n", bp.broker.ID())
+ return
}
if bp.timerFired || bp.buffer.readyToFlush() {
@@ -785,7 +838,7 @@
for response := range bp.responses {
bp.handleResponse(response)
}
-
+ close(bp.stopchan)
Logger.Printf("producer/broker/%d shut down\n", bp.broker.ID())
}
@@ -797,9 +850,7 @@
return bp.currentRetries[msg.Topic][msg.Partition]
}
-func (bp *brokerProducer) waitForSpace(msg *ProducerMessage) error {
- Logger.Printf("producer/broker/%d maximum request accumulated, waiting for space\n", bp.broker.ID())
-
+func (bp *brokerProducer) waitForSpace(msg *ProducerMessage, forceRollover bool) error {
for {
select {
case response := <-bp.responses:
@@ -807,7 +858,7 @@
// handling a response can change our state, so re-check some things
if reason := bp.needsRetry(msg); reason != nil {
return reason
- } else if !bp.buffer.wouldOverflow(msg) {
+ } else if !bp.buffer.wouldOverflow(msg) && !forceRollover {
return nil
}
case bp.output <- bp.buffer:
@@ -1018,6 +1069,12 @@
}
func (p *asyncProducer) returnError(msg *ProducerMessage, err error) {
+ // We need to bump the producer epoch if a sequence number was assigned to this message, because the
+ // broker will never see a message with this number, so we can never continue the sequence.
+ if msg.hasSequence {
+ Logger.Printf("producer/txnmanager rolling over epoch due to publish failure on %s/%d", msg.Topic, msg.Partition)
+ p.txnmgr.bumpEpoch()
+ }
msg.clear()
pErr := &ProducerError{Msg: msg, Err: err}
if p.conf.Producer.Return.Errors {