blob: 4c9ce4df552cac7f10cf435354d51345bb695e48 [file] [log] [blame]
Scott Bakered4efab2020-01-13 19:12:25 -08001package sarama
2
3type offsetRequestBlock struct {
4 time int64
5 maxOffsets int32 // Only used in version 0
6}
7
8func (b *offsetRequestBlock) encode(pe packetEncoder, version int16) error {
David K. Bainbridgebd6b2882021-08-26 13:31:02 +00009 pe.putInt64(b.time)
Scott Bakered4efab2020-01-13 19:12:25 -080010 if version == 0 {
11 pe.putInt32(b.maxOffsets)
12 }
13
14 return nil
15}
16
17func (b *offsetRequestBlock) decode(pd packetDecoder, version int16) (err error) {
18 if b.time, err = pd.getInt64(); err != nil {
19 return err
20 }
21 if version == 0 {
22 if b.maxOffsets, err = pd.getInt32(); err != nil {
23 return err
24 }
25 }
26 return nil
27}
28
29type OffsetRequest struct {
30 Version int16
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000031 IsolationLevel IsolationLevel
Scott Bakered4efab2020-01-13 19:12:25 -080032 replicaID int32
33 isReplicaIDSet bool
34 blocks map[string]map[int32]*offsetRequestBlock
35}
36
37func (r *OffsetRequest) encode(pe packetEncoder) error {
38 if r.isReplicaIDSet {
39 pe.putInt32(r.replicaID)
40 } else {
41 // default replica ID is always -1 for clients
42 pe.putInt32(-1)
43 }
44
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000045 if r.Version >= 2 {
46 pe.putBool(r.IsolationLevel == ReadCommitted)
47 }
48
Scott Bakered4efab2020-01-13 19:12:25 -080049 err := pe.putArrayLength(len(r.blocks))
50 if err != nil {
51 return err
52 }
53 for topic, partitions := range r.blocks {
54 err = pe.putString(topic)
55 if err != nil {
56 return err
57 }
58 err = pe.putArrayLength(len(partitions))
59 if err != nil {
60 return err
61 }
62 for partition, block := range partitions {
63 pe.putInt32(partition)
64 if err = block.encode(pe, r.Version); err != nil {
65 return err
66 }
67 }
68 }
69 return nil
70}
71
72func (r *OffsetRequest) decode(pd packetDecoder, version int16) error {
73 r.Version = version
74
75 replicaID, err := pd.getInt32()
76 if err != nil {
77 return err
78 }
79 if replicaID >= 0 {
80 r.SetReplicaID(replicaID)
81 }
82
David K. Bainbridgebd6b2882021-08-26 13:31:02 +000083 if r.Version >= 2 {
84 tmp, err := pd.getBool()
85 if err != nil {
86 return err
87 }
88
89 r.IsolationLevel = ReadUncommitted
90 if tmp {
91 r.IsolationLevel = ReadCommitted
92 }
93 }
94
Scott Bakered4efab2020-01-13 19:12:25 -080095 blockCount, err := pd.getArrayLength()
96 if err != nil {
97 return err
98 }
99 if blockCount == 0 {
100 return nil
101 }
102 r.blocks = make(map[string]map[int32]*offsetRequestBlock)
103 for i := 0; i < blockCount; i++ {
104 topic, err := pd.getString()
105 if err != nil {
106 return err
107 }
108 partitionCount, err := pd.getArrayLength()
109 if err != nil {
110 return err
111 }
112 r.blocks[topic] = make(map[int32]*offsetRequestBlock)
113 for j := 0; j < partitionCount; j++ {
114 partition, err := pd.getInt32()
115 if err != nil {
116 return err
117 }
118 block := &offsetRequestBlock{}
119 if err := block.decode(pd, version); err != nil {
120 return err
121 }
122 r.blocks[topic][partition] = block
123 }
124 }
125 return nil
126}
127
128func (r *OffsetRequest) key() int16 {
129 return 2
130}
131
132func (r *OffsetRequest) version() int16 {
133 return r.Version
134}
135
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000136func (r *OffsetRequest) headerVersion() int16 {
137 return 1
138}
139
Scott Bakered4efab2020-01-13 19:12:25 -0800140func (r *OffsetRequest) requiredVersion() KafkaVersion {
141 switch r.Version {
142 case 1:
143 return V0_10_1_0
David K. Bainbridgebd6b2882021-08-26 13:31:02 +0000144 case 2:
145 return V0_11_0_0
Scott Bakered4efab2020-01-13 19:12:25 -0800146 default:
147 return MinVersion
148 }
149}
150
151func (r *OffsetRequest) SetReplicaID(id int32) {
152 r.replicaID = id
153 r.isReplicaIDSet = true
154}
155
156func (r *OffsetRequest) ReplicaID() int32 {
157 if r.isReplicaIDSet {
158 return r.replicaID
159 }
160 return -1
161}
162
163func (r *OffsetRequest) AddBlock(topic string, partitionID int32, time int64, maxOffsets int32) {
164 if r.blocks == nil {
165 r.blocks = make(map[string]map[int32]*offsetRequestBlock)
166 }
167
168 if r.blocks[topic] == nil {
169 r.blocks[topic] = make(map[int32]*offsetRequestBlock)
170 }
171
172 tmp := new(offsetRequestBlock)
173 tmp.time = time
174 if r.Version == 0 {
175 tmp.maxOffsets = maxOffsets
176 }
177
178 r.blocks[topic][partitionID] = tmp
179}