[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/Shopify/sarama/fetch_response.go b/vendor/github.com/Shopify/sarama/fetch_response.go
index 3afc187..54b8828 100644
--- a/vendor/github.com/Shopify/sarama/fetch_response.go
+++ b/vendor/github.com/Shopify/sarama/fetch_response.go
@@ -30,13 +30,15 @@
}
type FetchResponseBlock struct {
- Err KError
- HighWaterMarkOffset int64
- LastStableOffset int64
- AbortedTransactions []*AbortedTransaction
- Records *Records // deprecated: use FetchResponseBlock.RecordsSet
- RecordsSet []*Records
- Partial bool
+ Err KError
+ HighWaterMarkOffset int64
+ LastStableOffset int64
+ LogStartOffset int64
+ AbortedTransactions []*AbortedTransaction
+ PreferredReadReplica int32
+ Records *Records // deprecated: use FetchResponseBlock.RecordsSet
+ RecordsSet []*Records
+ Partial bool
}
func (b *FetchResponseBlock) decode(pd packetDecoder, version int16) (err error) {
@@ -57,6 +59,13 @@
return err
}
+ if version >= 5 {
+ b.LogStartOffset, err = pd.getInt64()
+ if err != nil {
+ return err
+ }
+ }
+
numTransact, err := pd.getArrayLength()
if err != nil {
return err
@@ -75,6 +84,15 @@
}
}
+ if version >= 11 {
+ b.PreferredReadReplica, err = pd.getInt32()
+ if err != nil {
+ return err
+ }
+ } else {
+ b.PreferredReadReplica = -1
+ }
+
recordsSize, err := pd.getInt32()
if err != nil {
return err
@@ -166,6 +184,10 @@
if version >= 4 {
pe.putInt64(b.LastStableOffset)
+ if version >= 5 {
+ pe.putInt64(b.LogStartOffset)
+ }
+
if err = pe.putArrayLength(len(b.AbortedTransactions)); err != nil {
return err
}
@@ -176,6 +198,10 @@
}
}
+ if version >= 11 {
+ pe.putInt32(b.PreferredReadReplica)
+ }
+
pe.push(&lengthField{})
for _, records := range b.RecordsSet {
err = records.encode(pe)
@@ -200,7 +226,9 @@
type FetchResponse struct {
Blocks map[string]map[int32]*FetchResponseBlock
ThrottleTime time.Duration
- Version int16 // v1 requires 0.9+, v2 requires 0.10+
+ ErrorCode int16
+ SessionID int32
+ Version int16
LogAppendTime bool
Timestamp time.Time
}
@@ -216,6 +244,17 @@
r.ThrottleTime = time.Duration(throttle) * time.Millisecond
}
+ if r.Version >= 7 {
+ r.ErrorCode, err = pd.getInt16()
+ if err != nil {
+ return err
+ }
+ r.SessionID, err = pd.getInt32()
+ if err != nil {
+ return err
+ }
+ }
+
numTopics, err := pd.getArrayLength()
if err != nil {
return err
@@ -258,6 +297,11 @@
pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
}
+ if r.Version >= 7 {
+ pe.putInt16(r.ErrorCode)
+ pe.putInt32(r.SessionID)
+ }
+
err = pe.putArrayLength(len(r.Blocks))
if err != nil {
return err
@@ -281,7 +325,6 @@
return err
}
}
-
}
return nil
}
@@ -294,18 +337,34 @@
return r.Version
}
+func (r *FetchResponse) headerVersion() int16 {
+ return 0
+}
+
func (r *FetchResponse) requiredVersion() KafkaVersion {
switch r.Version {
+ case 0:
+ return MinVersion
case 1:
return V0_9_0_0
case 2:
return V0_10_0_0
case 3:
return V0_10_1_0
- case 4:
+ case 4, 5:
return V0_11_0_0
+ case 6:
+ return V1_0_0_0
+ case 7:
+ return V1_1_0_0
+ case 8:
+ return V2_0_0_0
+ case 9, 10:
+ return V2_1_0_0
+ case 11:
+ return V2_3_0_0
default:
- return MinVersion
+ return MaxVersion
}
}