[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/Shopify/sarama/offset_fetch_response.go b/vendor/github.com/Shopify/sarama/offset_fetch_response.go
index 9e25702..1944922 100644
--- a/vendor/github.com/Shopify/sarama/offset_fetch_response.go
+++ b/vendor/github.com/Shopify/sarama/offset_fetch_response.go
@@ -8,6 +8,8 @@
 }
 
 func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
+	isFlexible := version >= 6
+
 	b.Offset, err = pd.getInt64()
 	if err != nil {
 		return err
@@ -20,7 +22,11 @@
 		}
 	}
 
-	b.Metadata, err = pd.getString()
+	if isFlexible {
+		b.Metadata, err = pd.getCompactString()
+	} else {
+		b.Metadata, err = pd.getString()
+	}
 	if err != nil {
 		return err
 	}
@@ -31,23 +37,37 @@
 	}
 	b.Err = KError(tmp)
 
+	if isFlexible {
+		if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
+			return err
+		}
+	}
+
 	return nil
 }
 
 func (b *OffsetFetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
+	isFlexible := version >= 6
 	pe.putInt64(b.Offset)
 
 	if version >= 5 {
 		pe.putInt32(b.LeaderEpoch)
 	}
-
-	err = pe.putString(b.Metadata)
+	if isFlexible {
+		err = pe.putCompactString(b.Metadata)
+	} else {
+		err = pe.putString(b.Metadata)
+	}
 	if err != nil {
 		return err
 	}
 
 	pe.putInt16(int16(b.Err))
 
+	if isFlexible {
+		pe.putEmptyTaggedFieldArray()
+	}
+
 	return nil
 }
 
@@ -58,19 +78,37 @@
 	Err            KError
 }
 
-func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
+func (r *OffsetFetchResponse) encode(pe packetEncoder) (err error) {
+	isFlexible := r.Version >= 6
+
 	if r.Version >= 3 {
 		pe.putInt32(r.ThrottleTimeMs)
 	}
-
-	if err := pe.putArrayLength(len(r.Blocks)); err != nil {
+	if isFlexible {
+		pe.putCompactArrayLength(len(r.Blocks))
+	} else {
+		err = pe.putArrayLength(len(r.Blocks))
+	}
+	if err != nil {
 		return err
 	}
+
 	for topic, partitions := range r.Blocks {
-		if err := pe.putString(topic); err != nil {
+		if isFlexible {
+			err = pe.putCompactString(topic)
+		} else {
+			err = pe.putString(topic)
+		}
+		if err != nil {
 			return err
 		}
-		if err := pe.putArrayLength(len(partitions)); err != nil {
+
+		if isFlexible {
+			pe.putCompactArrayLength(len(partitions))
+		} else {
+			err = pe.putArrayLength(len(partitions))
+		}
+		if err != nil {
 			return err
 		}
 		for partition, block := range partitions {
@@ -79,15 +117,22 @@
 				return err
 			}
 		}
+		if isFlexible {
+			pe.putEmptyTaggedFieldArray()
+		}
 	}
 	if r.Version >= 2 {
 		pe.putInt16(int16(r.Err))
 	}
+	if isFlexible {
+		pe.putEmptyTaggedFieldArray()
+	}
 	return nil
 }
 
 func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error) {
 	r.Version = version
+	isFlexible := version >= 6
 
 	if version >= 3 {
 		r.ThrottleTimeMs, err = pd.getInt32()
@@ -96,7 +141,12 @@
 		}
 	}
 
-	numTopics, err := pd.getArrayLength()
+	var numTopics int
+	if isFlexible {
+		numTopics, err = pd.getCompactArrayLength()
+	} else {
+		numTopics, err = pd.getArrayLength()
+	}
 	if err != nil {
 		return err
 	}
@@ -104,22 +154,30 @@
 	if numTopics > 0 {
 		r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics)
 		for i := 0; i < numTopics; i++ {
-			name, err := pd.getString()
+			var name string
+			if isFlexible {
+				name, err = pd.getCompactString()
+			} else {
+				name, err = pd.getString()
+			}
 			if err != nil {
 				return err
 			}
 
-			numBlocks, err := pd.getArrayLength()
+			var numBlocks int
+			if isFlexible {
+				numBlocks, err = pd.getCompactArrayLength()
+			} else {
+				numBlocks, err = pd.getArrayLength()
+			}
 			if err != nil {
 				return err
 			}
 
-			if numBlocks == 0 {
-				r.Blocks[name] = nil
-				continue
+			r.Blocks[name] = nil
+			if numBlocks > 0 {
+				r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
 			}
-			r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
-
 			for j := 0; j < numBlocks; j++ {
 				id, err := pd.getInt32()
 				if err != nil {
@@ -131,8 +189,15 @@
 				if err != nil {
 					return err
 				}
+
 				r.Blocks[name][id] = block
 			}
+
+			if isFlexible {
+				if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
+					return err
+				}
+			}
 		}
 	}
 
@@ -144,6 +209,12 @@
 		r.Err = KError(kerr)
 	}
 
+	if isFlexible {
+		if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
+			return err
+		}
+	}
+
 	return nil
 }
 
@@ -155,6 +226,14 @@
 	return r.Version
 }
 
+func (r *OffsetFetchResponse) headerVersion() int16 {
+	if r.Version >= 6 {
+		return 1
+	}
+
+	return 0
+}
+
 func (r *OffsetFetchResponse) requiredVersion() KafkaVersion {
 	switch r.Version {
 	case 1:
@@ -167,6 +246,10 @@
 		return V2_0_0_0
 	case 5:
 		return V2_1_0_0
+	case 6:
+		return V2_4_0_0
+	case 7:
+		return V2_5_0_0
 	default:
 		return MinVersion
 	}