[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/Shopify/sarama/produce_response.go b/vendor/github.com/Shopify/sarama/produce_response.go
index 4c5cd35..edf9787 100644
--- a/vendor/github.com/Shopify/sarama/produce_response.go
+++ b/vendor/github.com/Shopify/sarama/produce_response.go
@@ -5,11 +5,27 @@
 	"time"
 )
 
+// Protocol, http://kafka.apache.org/protocol.html
+// v1
+// v2 = v3 = v4
+// v5 = v6 = v7
+// Produce Response (Version: 7) => [responses] throttle_time_ms
+//   responses => topic [partition_responses]
+//     topic => STRING
+//     partition_responses => partition error_code base_offset log_append_time log_start_offset
+//       partition => INT32
+//       error_code => INT16
+//       base_offset => INT64
+//       log_append_time => INT64
+//       log_start_offset => INT64
+//   throttle_time_ms => INT32
+
+// partition_responses in protocol
 type ProduceResponseBlock struct {
-	Err    KError
-	Offset int64
-	// only provided if Version >= 2 and the broker is configured with `LogAppendTime`
-	Timestamp time.Time
+	Err         KError    // v0, error_code
+	Offset      int64     // v0, base_offset
+	Timestamp   time.Time // v2, log_append_time, and the broker is configured with `LogAppendTime`
+	StartOffset int64     // v5, log_start_offset
 }
 
 func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err error) {
@@ -32,6 +48,13 @@
 		}
 	}
 
+	if version >= 5 {
+		b.StartOffset, err = pd.getInt64()
+		if err != nil {
+			return err
+		}
+	}
+
 	return nil
 }
 
@@ -49,13 +72,17 @@
 		pe.putInt64(timestamp)
 	}
 
+	if version >= 5 {
+		pe.putInt64(b.StartOffset)
+	}
+
 	return nil
 }
 
 type ProduceResponse struct {
-	Blocks       map[string]map[int32]*ProduceResponseBlock
+	Blocks       map[string]map[int32]*ProduceResponseBlock // v0, responses
 	Version      int16
-	ThrottleTime time.Duration // only provided if Version >= 1
+	ThrottleTime time.Duration // v1, throttle_time_ms
 }
 
 func (r *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
@@ -129,6 +156,7 @@
 			}
 		}
 	}
+
 	if r.Version >= 1 {
 		pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
 	}
@@ -143,17 +171,12 @@
 	return r.Version
 }
 
+func (r *ProduceResponse) headerVersion() int16 {
+	return 0
+}
+
 func (r *ProduceResponse) requiredVersion() KafkaVersion {
-	switch r.Version {
-	case 1:
-		return V0_9_0_0
-	case 2:
-		return V0_10_0_0
-	case 3:
-		return V0_11_0_0
-	default:
-		return MinVersion
-	}
+	return MinVersion
 }
 
 func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {