[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/Shopify/sarama/offset_fetch_response.go b/vendor/github.com/Shopify/sarama/offset_fetch_response.go
index 9e25702..1944922 100644
--- a/vendor/github.com/Shopify/sarama/offset_fetch_response.go
+++ b/vendor/github.com/Shopify/sarama/offset_fetch_response.go
@@ -8,6 +8,8 @@
}
func (b *OffsetFetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
+ isFlexible := version >= 6
+
b.Offset, err = pd.getInt64()
if err != nil {
return err
@@ -20,7 +22,11 @@
}
}
- b.Metadata, err = pd.getString()
+ if isFlexible {
+ b.Metadata, err = pd.getCompactString()
+ } else {
+ b.Metadata, err = pd.getString()
+ }
if err != nil {
return err
}
@@ -31,23 +37,37 @@
}
b.Err = KError(tmp)
+ if isFlexible {
+ if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
+ return err
+ }
+ }
+
return nil
}
func (b *OffsetFetchResponseBlock) encode(pe packetEncoder, version int16) (err error) {
+ isFlexible := version >= 6
pe.putInt64(b.Offset)
if version >= 5 {
pe.putInt32(b.LeaderEpoch)
}
-
- err = pe.putString(b.Metadata)
+ if isFlexible {
+ err = pe.putCompactString(b.Metadata)
+ } else {
+ err = pe.putString(b.Metadata)
+ }
if err != nil {
return err
}
pe.putInt16(int16(b.Err))
+ if isFlexible {
+ pe.putEmptyTaggedFieldArray()
+ }
+
return nil
}
@@ -58,19 +78,37 @@
Err KError
}
-func (r *OffsetFetchResponse) encode(pe packetEncoder) error {
+func (r *OffsetFetchResponse) encode(pe packetEncoder) (err error) {
+ isFlexible := r.Version >= 6
+
if r.Version >= 3 {
pe.putInt32(r.ThrottleTimeMs)
}
-
- if err := pe.putArrayLength(len(r.Blocks)); err != nil {
+ if isFlexible {
+ pe.putCompactArrayLength(len(r.Blocks))
+ } else {
+ err = pe.putArrayLength(len(r.Blocks))
+ }
+ if err != nil {
return err
}
+
for topic, partitions := range r.Blocks {
- if err := pe.putString(topic); err != nil {
+ if isFlexible {
+ err = pe.putCompactString(topic)
+ } else {
+ err = pe.putString(topic)
+ }
+ if err != nil {
return err
}
- if err := pe.putArrayLength(len(partitions)); err != nil {
+
+ if isFlexible {
+ pe.putCompactArrayLength(len(partitions))
+ } else {
+ err = pe.putArrayLength(len(partitions))
+ }
+ if err != nil {
return err
}
for partition, block := range partitions {
@@ -79,15 +117,22 @@
return err
}
}
+ if isFlexible {
+ pe.putEmptyTaggedFieldArray()
+ }
}
if r.Version >= 2 {
pe.putInt16(int16(r.Err))
}
+ if isFlexible {
+ pe.putEmptyTaggedFieldArray()
+ }
return nil
}
func (r *OffsetFetchResponse) decode(pd packetDecoder, version int16) (err error) {
r.Version = version
+ isFlexible := version >= 6
if version >= 3 {
r.ThrottleTimeMs, err = pd.getInt32()
@@ -96,7 +141,12 @@
}
}
- numTopics, err := pd.getArrayLength()
+ var numTopics int
+ if isFlexible {
+ numTopics, err = pd.getCompactArrayLength()
+ } else {
+ numTopics, err = pd.getArrayLength()
+ }
if err != nil {
return err
}
@@ -104,22 +154,30 @@
if numTopics > 0 {
r.Blocks = make(map[string]map[int32]*OffsetFetchResponseBlock, numTopics)
for i := 0; i < numTopics; i++ {
- name, err := pd.getString()
+ var name string
+ if isFlexible {
+ name, err = pd.getCompactString()
+ } else {
+ name, err = pd.getString()
+ }
if err != nil {
return err
}
- numBlocks, err := pd.getArrayLength()
+ var numBlocks int
+ if isFlexible {
+ numBlocks, err = pd.getCompactArrayLength()
+ } else {
+ numBlocks, err = pd.getArrayLength()
+ }
if err != nil {
return err
}
- if numBlocks == 0 {
- r.Blocks[name] = nil
- continue
+ r.Blocks[name] = nil
+ if numBlocks > 0 {
+ r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
}
- r.Blocks[name] = make(map[int32]*OffsetFetchResponseBlock, numBlocks)
-
for j := 0; j < numBlocks; j++ {
id, err := pd.getInt32()
if err != nil {
@@ -131,8 +189,15 @@
if err != nil {
return err
}
+
r.Blocks[name][id] = block
}
+
+ if isFlexible {
+ if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
+ return err
+ }
+ }
}
}
@@ -144,6 +209,12 @@
r.Err = KError(kerr)
}
+ if isFlexible {
+ if _, err := pd.getEmptyTaggedFieldArray(); err != nil {
+ return err
+ }
+ }
+
return nil
}
@@ -155,6 +226,14 @@
return r.Version
}
+func (r *OffsetFetchResponse) headerVersion() int16 {
+ if r.Version >= 6 {
+ return 1
+ }
+
+ return 0
+}
+
func (r *OffsetFetchResponse) requiredVersion() KafkaVersion {
switch r.Version {
case 1:
@@ -167,6 +246,10 @@
return V2_0_0_0
case 5:
return V2_1_0_0
+ case 6:
+ return V2_4_0_0
+ case 7:
+ return V2_5_0_0
default:
return MinVersion
}