[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/Shopify/sarama/offset_response.go b/vendor/github.com/Shopify/sarama/offset_response.go
index 8b2193f..69349ef 100644
--- a/vendor/github.com/Shopify/sarama/offset_response.go
+++ b/vendor/github.com/Shopify/sarama/offset_response.go
@@ -50,11 +50,19 @@
}
type OffsetResponse struct {
- Version int16
- Blocks map[string]map[int32]*OffsetResponseBlock
+ Version int16
+ ThrottleTimeMs int32
+ Blocks map[string]map[int32]*OffsetResponseBlock
}
func (r *OffsetResponse) decode(pd packetDecoder, version int16) (err error) {
+ if version >= 2 {
+ r.ThrottleTimeMs, err = pd.getInt32()
+ if err != nil {
+ return err
+ }
+ }
+
numTopics, err := pd.getArrayLength()
if err != nil {
return err
@@ -120,6 +128,10 @@
*/
func (r *OffsetResponse) encode(pe packetEncoder) (err error) {
+ if r.Version >= 2 {
+ pe.putInt32(r.ThrottleTimeMs)
+ }
+
if err = pe.putArrayLength(len(r.Blocks)); err != nil {
return err
}
@@ -150,10 +162,16 @@
return r.Version
}
+func (r *OffsetResponse) headerVersion() int16 {
+ return 0
+}
+
func (r *OffsetResponse) requiredVersion() KafkaVersion {
switch r.Version {
case 1:
return V0_10_1_0
+ case 2:
+ return V0_11_0_0
default:
return MinVersion
}