gRPC migration update
Change-Id: Icdd1a824948fa994cd36bd121c962f5ecf74e3cf
diff --git a/vendor/github.com/Shopify/sarama/request.go b/vendor/github.com/Shopify/sarama/request.go
index 97437d6..d899df5 100644
--- a/vendor/github.com/Shopify/sarama/request.go
+++ b/vendor/github.com/Shopify/sarama/request.go
@@ -11,6 +11,7 @@
versionedDecoder
key() int16
version() int16
+ headerVersion() int16
requiredVersion() KafkaVersion
}
@@ -26,12 +27,19 @@
pe.putInt16(r.body.version())
pe.putInt32(r.correlationID)
- err := pe.putString(r.clientID)
- if err != nil {
- return err
+ if r.body.headerVersion() >= 1 {
+ err := pe.putString(r.clientID)
+ if err != nil {
+ return err
+ }
}
- err = r.body.encode(pe)
+ if r.body.headerVersion() >= 2 {
+ // tagged fields are not used at the moment, so we just write an empty tagged-field section (count 0)
+ pe.putUVarint(0)
+ }
+
+ err := r.body.encode(pe)
if err != nil {
return err
}
@@ -65,6 +73,14 @@
return PacketDecodingError{fmt.Sprintf("unknown request key (%d)", key)}
}
+ if r.body.headerVersion() >= 2 {
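+ // tagged fields: read and discard the count. NOTE(review): a non-zero count is not skipped over before the body is decoded - confirm peers never send tagged fields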
+ _, err = pd.getUVarint()
+ if err != nil {
+ return err
+ }
+ }
+
return r.body.decode(pd, version)
}
@@ -105,7 +121,7 @@
case 0:
return &ProduceRequest{}
case 1:
- return &FetchRequest{}
+ return &FetchRequest{Version: version}
case 2:
return &OffsetRequest{Version: version}
case 3:
@@ -113,7 +129,7 @@
case 8:
return &OffsetCommitRequest{Version: version}
case 9:
- return &OffsetFetchRequest{}
+ return &OffsetFetchRequest{Version: version}
case 10:
return &FindCoordinatorRequest{}
case 11:
@@ -166,6 +182,16 @@
return &CreatePartitionsRequest{}
case 42:
return &DeleteGroupsRequest{}
+ case 44:
+ return &IncrementalAlterConfigsRequest{}
+ case 45:
+ return &AlterPartitionReassignmentsRequest{}
+ case 46:
+ return &ListPartitionReassignmentsRequest{}
+ case 50:
+ return &DescribeUserScramCredentialsRequest{}
+ case 51:
+ return &AlterUserScramCredentialsRequest{}
}
return nil
}