[VOL-4293] OpenONU Adapter update for gRPC migration
Change-Id: I05300d3b95b878f44576a99a05f53f52fdc0cda1
diff --git a/vendor/github.com/Shopify/sarama/produce_set.go b/vendor/github.com/Shopify/sarama/produce_set.go
index bba0f7e..9c70f81 100644
--- a/vendor/github.com/Shopify/sarama/produce_set.go
+++ b/vendor/github.com/Shopify/sarama/produce_set.go
@@ -13,17 +13,22 @@
}
type produceSet struct {
- parent *asyncProducer
- msgs map[string]map[int32]*partitionSet
+ parent *asyncProducer
+ msgs map[string]map[int32]*partitionSet
+ producerID int64
+ producerEpoch int16
bufferBytes int
bufferCount int
}
func newProduceSet(parent *asyncProducer) *produceSet {
+ pid, epoch := parent.txnmgr.getProducerID()
return &produceSet{
- msgs: make(map[string]map[int32]*partitionSet),
- parent: parent,
+ msgs: make(map[string]map[int32]*partitionSet),
+ parent: parent,
+ producerID: pid,
+ producerEpoch: epoch,
}
}
@@ -44,9 +49,10 @@
}
timestamp := msg.Timestamp
- if msg.Timestamp.IsZero() {
+ if timestamp.IsZero() {
timestamp = time.Now()
}
+ timestamp = timestamp.Truncate(time.Millisecond)
partitions := ps.msgs[msg.Topic]
if partitions == nil {
@@ -64,8 +70,8 @@
Version: 2,
Codec: ps.parent.conf.Producer.Compression,
CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
- ProducerID: ps.parent.txnmgr.producerID,
- ProducerEpoch: ps.parent.txnmgr.producerEpoch,
+ ProducerID: ps.producerID,
+ ProducerEpoch: ps.producerEpoch,
}
if ps.parent.conf.Producer.Idempotent {
batch.FirstSequence = msg.sequenceNumber
@@ -77,12 +83,17 @@
}
partitions[msg.Partition] = set
}
- set.msgs = append(set.msgs, msg)
if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
return errors.New("assertion failed: message out of sequence added to a batch")
}
+ }
+
+ // Past this point we can't return an error, because we've already added the message to the set.
+ set.msgs = append(set.msgs, msg)
+
+ if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
// We are being conservative here to avoid having to prep encode the record
size += maximumRecordOverhead
rec := &Record{
@@ -128,6 +139,10 @@
req.Version = 3
}
+ if ps.parent.conf.Producer.Compression == CompressionZSTD && ps.parent.conf.Version.IsAtLeast(V2_1_0_0) {
+ req.Version = 7
+ }
+
for topic, partitionSets := range ps.msgs {
for partition, set := range partitionSets {
if req.Version >= 3 {