gRPC migration update

Change-Id: Icdd1a824948fa994cd36bd121c962f5ecf74e3cf
diff --git a/vendor/github.com/Shopify/sarama/produce_set.go b/vendor/github.com/Shopify/sarama/produce_set.go
index b684aa4..9c70f81 100644
--- a/vendor/github.com/Shopify/sarama/produce_set.go
+++ b/vendor/github.com/Shopify/sarama/produce_set.go
@@ -13,17 +13,22 @@
 }
 
 type produceSet struct {
-	parent *asyncProducer
-	msgs   map[string]map[int32]*partitionSet
+	parent        *asyncProducer
+	msgs          map[string]map[int32]*partitionSet
+	producerID    int64
+	producerEpoch int16
 
 	bufferBytes int
 	bufferCount int
 }
 
 func newProduceSet(parent *asyncProducer) *produceSet {
+	pid, epoch := parent.txnmgr.getProducerID()
 	return &produceSet{
-		msgs:   make(map[string]map[int32]*partitionSet),
-		parent: parent,
+		msgs:          make(map[string]map[int32]*partitionSet),
+		parent:        parent,
+		producerID:    pid,
+		producerEpoch: epoch,
 	}
 }
 
@@ -65,8 +70,8 @@
 				Version:          2,
 				Codec:            ps.parent.conf.Producer.Compression,
 				CompressionLevel: ps.parent.conf.Producer.CompressionLevel,
-				ProducerID:       ps.parent.txnmgr.producerID,
-				ProducerEpoch:    ps.parent.txnmgr.producerEpoch,
+				ProducerID:       ps.producerID,
+				ProducerEpoch:    ps.producerEpoch,
 			}
 			if ps.parent.conf.Producer.Idempotent {
 				batch.FirstSequence = msg.sequenceNumber
@@ -78,12 +83,17 @@
 		}
 		partitions[msg.Partition] = set
 	}
-	set.msgs = append(set.msgs, msg)
 
 	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
 		if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
 			return errors.New("assertion failed: message out of sequence added to a batch")
 		}
+	}
+
+	// Past this point we can't return an error, because we've already added the message to the set.
+	set.msgs = append(set.msgs, msg)
+
+	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
 		// We are being conservative here to avoid having to prep encode the record
 		size += maximumRecordOverhead
 		rec := &Record{
@@ -129,6 +139,10 @@
 		req.Version = 3
 	}
 
+	if ps.parent.conf.Producer.Compression == CompressionZSTD && ps.parent.conf.Version.IsAtLeast(V2_1_0_0) {
+		req.Version = 7
+	}
+
 	for topic, partitionSets := range ps.msgs {
 		for partition, set := range partitionSets {
 			if req.Version >= 3 {