[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/vendor/github.com/Shopify/sarama/produce_response.go b/vendor/github.com/Shopify/sarama/produce_response.go
index 4c5cd35..edf9787 100644
--- a/vendor/github.com/Shopify/sarama/produce_response.go
+++ b/vendor/github.com/Shopify/sarama/produce_response.go
@@ -5,11 +5,27 @@
"time"
)
+// Protocol, http://kafka.apache.org/protocol.html
+// v1
+// v2 = v3 = v4
+// v5 = v6 = v7
+// Produce Response (Version: 7) => [responses] throttle_time_ms
+// responses => topic [partition_responses]
+// topic => STRING
+// partition_responses => partition error_code base_offset log_append_time log_start_offset
+// partition => INT32
+// error_code => INT16
+// base_offset => INT64
+// log_append_time => INT64
+// log_start_offset => INT64
+// throttle_time_ms => INT32
+
+// partition_responses in protocol
type ProduceResponseBlock struct {
- Err KError
- Offset int64
- // only provided if Version >= 2 and the broker is configured with `LogAppendTime`
- Timestamp time.Time
+ Err KError // v0, error_code
+ Offset int64 // v0, base_offset
+ Timestamp time.Time // v2, log_append_time, and the broker is configured with `LogAppendTime`
+ StartOffset int64 // v5, log_start_offset
}
func (b *ProduceResponseBlock) decode(pd packetDecoder, version int16) (err error) {
@@ -32,6 +48,13 @@
}
}
+ if version >= 5 {
+ b.StartOffset, err = pd.getInt64()
+ if err != nil {
+ return err
+ }
+ }
+
return nil
}
@@ -49,13 +72,17 @@
pe.putInt64(timestamp)
}
+ if version >= 5 {
+ pe.putInt64(b.StartOffset)
+ }
+
return nil
}
type ProduceResponse struct {
- Blocks map[string]map[int32]*ProduceResponseBlock
+ Blocks map[string]map[int32]*ProduceResponseBlock // v0, responses
Version int16
- ThrottleTime time.Duration // only provided if Version >= 1
+ ThrottleTime time.Duration // v1, throttle_time_ms
}
func (r *ProduceResponse) decode(pd packetDecoder, version int16) (err error) {
@@ -129,6 +156,7 @@
}
}
}
+
if r.Version >= 1 {
pe.putInt32(int32(r.ThrottleTime / time.Millisecond))
}
@@ -143,17 +171,12 @@
return r.Version
}
+func (r *ProduceResponse) headerVersion() int16 {
+ return 0
+}
+
func (r *ProduceResponse) requiredVersion() KafkaVersion {
- switch r.Version {
- case 1:
- return V0_9_0_0
- case 2:
- return V0_10_0_0
- case 3:
- return V0_11_0_0
- default:
- return MinVersion
- }
+ return MinVersion
}
func (r *ProduceResponse) GetBlock(topic string, partition int32) *ProduceResponseBlock {