gRPC migration update
Change-Id: Icdd1a824948fa994cd36bd121c962f5ecf74e3cf
diff --git a/vendor/github.com/Shopify/sarama/produce_set.go b/vendor/github.com/Shopify/sarama/produce_set.go
index b684aa4..9c70f81 100644
--- a/vendor/github.com/Shopify/sarama/produce_set.go
+++ b/vendor/github.com/Shopify/sarama/produce_set.go
@@ -13,17 +13,22 @@
}
type produceSet struct {
- parent *asyncProducer
- msgs map[string]map[int32]*partitionSet
+ parent *asyncProducer
+ msgs map[string]map[int32]*partitionSet
+ producerID int64
+ producerEpoch int16
bufferBytes int
bufferCount int
}
func newProduceSet(parent *asyncProducer) *produceSet {
+ pid, epoch := parent.txnmgr.getProducerID()
return &produceSet{
- msgs: make(map[string]map[int32]*partitionSet),
- parent: parent,
+ msgs: make(map[string]map[int32]*partitionSet),
+ parent: parent,
+ producerID: pid,
+ producerEpoch: epoch,
}
}
@@ -65,8 +70,8 @@
Version: 2,
Codec: ps.parent.conf.Producer.Compression,
CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
- ProducerID: ps.parent.txnmgr.producerID,
- ProducerEpoch: ps.parent.txnmgr.producerEpoch,
+ ProducerID: ps.producerID,
+ ProducerEpoch: ps.producerEpoch,
}
if ps.parent.conf.Producer.Idempotent {
batch.FirstSequence = msg.sequenceNumber
@@ -78,12 +83,17 @@
}
partitions[msg.Partition] = set
}
- set.msgs = append(set.msgs, msg)
if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
return errors.New("assertion failed: message out of sequence added to a batch")
}
+ }
+
+ // Past this point we can't return an error, because we've already added the message to the set.
+ set.msgs = append(set.msgs, msg)
+
+ if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
// We are being conservative here to avoid having to prep encode the record
size += maximumRecordOverhead
rec := &Record{
@@ -129,6 +139,10 @@
req.Version = 3
}
+ if ps.parent.conf.Producer.Compression == CompressionZSTD && ps.parent.conf.Version.IsAtLeast(V2_1_0_0) {
+ req.Version = 7
+ }
+
for topic, partitionSets := range ps.msgs {
for partition, set := range partitionSets {
if req.Version >= 3 {