[VOL-4293] OpenONU Adapter update for gRPC migration
Change-Id: I05300d3b95b878f44576a99a05f53f52fdc0cda1
diff --git a/vendor/github.com/Shopify/sarama/offset_request.go b/vendor/github.com/Shopify/sarama/offset_request.go
index 326c372..4c9ce4d 100644
--- a/vendor/github.com/Shopify/sarama/offset_request.go
+++ b/vendor/github.com/Shopify/sarama/offset_request.go
@@ -6,7 +6,7 @@
}
func (b *offsetRequestBlock) encode(pe packetEncoder, version int16) error {
- pe.putInt64(int64(b.time))
+ pe.putInt64(b.time)
if version == 0 {
pe.putInt32(b.maxOffsets)
}
@@ -28,6 +28,7 @@
type OffsetRequest struct {
Version int16
+ IsolationLevel IsolationLevel
replicaID int32
isReplicaIDSet bool
blocks map[string]map[int32]*offsetRequestBlock
@@ -41,6 +42,10 @@
pe.putInt32(-1)
}
+ if r.Version >= 2 {
+ pe.putBool(r.IsolationLevel == ReadCommitted)
+ }
+
err := pe.putArrayLength(len(r.blocks))
if err != nil {
return err
@@ -75,6 +80,18 @@
r.SetReplicaID(replicaID)
}
+ if r.Version >= 2 {
+ tmp, err := pd.getBool()
+ if err != nil {
+ return err
+ }
+
+ r.IsolationLevel = ReadUncommitted
+ if tmp {
+ r.IsolationLevel = ReadCommitted
+ }
+ }
+
blockCount, err := pd.getArrayLength()
if err != nil {
return err
@@ -116,10 +133,16 @@
return r.Version
}
+func (r *OffsetRequest) headerVersion() int16 {
+ return 1
+}
+
func (r *OffsetRequest) requiredVersion() KafkaVersion {
switch r.Version {
case 1:
return V0_10_1_0
+ case 2:
+ return V0_11_0_0
default:
return MinVersion
}